import json
from datetime import datetime

import numpy as np
import pandas as pd
from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.tools import ComponentTool, tool

@tool
def sql_investigate(query: str) -> dict:
    """Run a SQL query against the DuckDB connection `con` and return a small preview."""
    try:
        df = con.execute(query).df()
        head = df.head(30)
        return {
            "rows": int(len(df)),
            "columns": list(df.columns),
            "preview": head.to_dict(orient="records"),
        }
    except Exception as e:
        return {"error": str(e)}
@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
    """Scan logs in a time window for dominant error kinds, services, endpoints, and regions."""
    ws = pd.to_datetime(window_start_iso)
    we = pd.to_datetime(window_end_iso)
    df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
    if df.empty:
        return {"rows": 0, "top_error_kinds": {}, "top_services": {}, "top_endpoints": {}}
    df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
    err = df[df["level"].isin(["WARN", "ERROR"])].copy()
    top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
    top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
    top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
    by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
    p95_latency = float(np.percentile(df["latency_ms"].values, 95))
    return {
        "rows": int(len(df)),
        "warn_error_rows": int(len(err)),
        "p95_latency_ms": p95_latency,
        "top_error_kinds": top_err,
        "top_services": top_svc,
        "top_endpoints": top_ep,
        "error_by_region": by_region,
    }
@tool
def propose_mitigations(hypothesis: str) -> dict:
    """Map a root-cause hypothesis to a short list of concrete, owned mitigations."""
    h = hypothesis.lower()
    mitigations = []
    if "conn" in h or "pool" in h or "db" in h:
        mitigations += [
            {"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
            {"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
            {"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
        ]
    if "timeout" in h or "upstream" in h:
        mitigations += [
            {"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
            {"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
        ]
    if "cache" in h:
        mitigations += [
            {"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
            {"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
        ]
    if not mitigations:
        mitigations += [
            {"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
            {"action": "Run a controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
        ]
    mitigations = mitigations[:10]
    return {"hypothesis": hypothesis, "mitigations": mitigations}
@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
    """Assemble a structured postmortem JSON from the evidence gathered so far."""
    try:
        facts = json.loads(key_facts_json)
    except Exception:
        facts = {"note": "key_facts_json was not valid JSON"}
    try:
        mits = json.loads(mitigations_json)
    except Exception:
        mits = {"note": "mitigations_json was not valid JSON"}
    doc = {
        "title": title,
        "date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
        "incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
        "customer_impact": customer_impact,
        "suspected_root_cause": suspected_root_cause,
        "detection": {
            "how_detected": "Automated anomaly detection + error-rate spike triage",
            "gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
        },
        "timeline": [
            {"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
            {"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
            {"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
            {"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
        ],
        "key_facts": facts,
        "corrective_actions": mits.get("mitigations", mits),
        "followups": [
            {"area": "Reliability", "task": "Add saturation alerts + budget-based retries", "priority": "P1"},
            {"area": "Observability", "task": "Add golden alerts per service/endpoint", "priority": "P1"},
            {"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
        ],
        "appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
    }
    return {"postmortem_json": doc}
llm = OpenAIChatGenerator(model="gpt-4o-mini")

state_schema = {
    "metrics_csv_path": {"type": str},
    "logs_csv_path": {"type": str},
    "metrics_summary": {"type": dict},
    "logs_summary": {"type": dict},
    "incident_window": {"type": dict},
    "investigation_notes": {"type": list, "handler": merge_lists},
    "hypothesis": {"type": str},
    "key_facts": {"type": dict},
    "mitigation_plan": {"type": dict},
    "postmortem": {"type": dict},
}
profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- The hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""
writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific, not generic.
- Ensure corrective actions have owners and eta_days.
"""
coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and a log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""
profiler_agent = Agent(
    chat_generator=llm,
    tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
    system_prompt=profiler_prompt,
    exit_conditions=["text"],
    state_schema=state_schema,
)

writer_agent = Agent(
    chat_generator=llm,
    tools=[draft_postmortem],
    system_prompt=writer_prompt,
    exit_conditions=["text"],
    state_schema=state_schema,
)
profiler_tool = ComponentTool(
    component=profiler_agent,
    name="profiler_specialist",
    description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
    outputs_to_string={"source": "last_message"},
)

writer_tool = ComponentTool(
    component=writer_agent,
    name="postmortem_writer_specialist",
    description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
    outputs_to_string={"source": "last_message"},
)
coordinator_agent = Agent(
    chat_generator=llm,
    tools=[
        load_inputs,
        detect_incident_window,
        sql_investigate,
        log_pattern_scan,
        propose_mitigations,
        profiler_tool,
        writer_tool,
        draft_postmortem,
    ],
    system_prompt=coordinator_prompt,
    exit_conditions=["text"],
    state_schema=state_schema,
)
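
# End-to-end usage: a minimal sketch. The task text is a hypothetical example;
# in recent Haystack releases Agent.run(...) returns a dict whose "messages"
# list ends with the final assistant reply (check your version's return shape).
from haystack.dataclasses import ChatMessage

coordinator_agent.warm_up()  # required before a standalone run in some versions
task = ("Investigate the incident in the loaded metrics/logs, then produce "
        "the postmortem JSON and a runbook checklist.")
result = coordinator_agent.run(messages=[ChatMessage.from_user(task)])
print(result["messages"][-1].text)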
