feat(telegram): persist callback evidence source snapshots
All checks were successful
CD Pipeline / tests (push) Successful in 1m3s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Successful in 3m41s
CD Pipeline / post-deploy-checks (push) Successful in 1m54s

This commit is contained in:
Your Name
2026-05-25 10:28:29 +08:00
parent 0a845498ff
commit dd1c513841
5 changed files with 464 additions and 15 deletions

View File

@@ -912,11 +912,243 @@ def _callback_reply_km_stale_completion_snapshot(
return snapshot
def _callback_reply_source_ref_values(envelope: object, key: str) -> list[str]:
if not isinstance(envelope, dict):
return []
source_refs = envelope.get("source_refs")
if not isinstance(source_refs, dict):
return []
raw_values = source_refs.get(key)
if isinstance(raw_values, list):
return [str(item) for item in raw_values if str(item or "").strip()]
if raw_values not in (None, ""):
return [str(raw_values)]
return []
def _append_unique_text(values: list[str], value: object) -> None:
text = str(value or "").strip()
if text and text not in values:
values.append(text)
def _callback_reply_awooop_mcp_snapshot(
truth_chain: dict[str, object] | None,
) -> dict[str, object]:
mcp = truth_chain.get("mcp") if isinstance(truth_chain, dict) else {}
if not isinstance(mcp, dict):
mcp = {}
gateway = mcp.get("awooop_gateway") if isinstance(mcp.get("awooop_gateway"), dict) else {}
legacy = mcp.get("legacy") if isinstance(mcp.get("legacy"), dict) else {}
top_tools: list[dict[str, object]] = []
seen_tools: set[str] = set()
for source, summary in (("gateway", gateway), ("legacy", legacy)):
by_tool = summary.get("by_tool") if isinstance(summary, dict) else []
if not isinstance(by_tool, list):
continue
for item in by_tool:
if not isinstance(item, dict):
continue
tool_name = str(item.get("tool_name") or "unknown").strip() or "unknown"
key = f"{source}:{tool_name}"
if key in seen_tools:
continue
seen_tools.add(key)
top_tools.append({
"source": source,
"tool_name": tool_name,
"total": (
_safe_int(item.get("total"))
or _safe_int(item.get("success"))
+ _safe_int(item.get("failed"))
+ _safe_int(item.get("blocked"))
),
"success": _safe_int(item.get("success")),
"failed": _safe_int(item.get("failed")),
"blocked": _safe_int(item.get("blocked")),
"last_error": str(item.get("last_error") or "")[:240] or None,
})
if len(top_tools) >= 5:
break
if len(top_tools) >= 5:
break
return {
"gateway": {
"total": _safe_int(gateway.get("total")),
"success": _safe_int(gateway.get("success")),
"failed": _safe_int(gateway.get("failed")),
"blocked": _safe_int(gateway.get("blocked")),
"first_class_total": _safe_int(gateway.get("first_class_total")),
"legacy_bridge_total": _safe_int(gateway.get("legacy_bridge_total")),
"policy_enforced_total": _safe_int(gateway.get("policy_enforced_total")),
"stage": str(gateway.get("stage") or ""),
"stage_status": str(gateway.get("stage_status") or ""),
},
"legacy": {
"total": _safe_int(legacy.get("total")),
"success": _safe_int(legacy.get("success")),
"failed": _safe_int(legacy.get("failed")),
},
"top_tools": top_tools,
}
def _callback_reply_awooop_execution_snapshot(
truth_chain: dict[str, object] | None,
) -> dict[str, object]:
execution = truth_chain.get("execution") if isinstance(truth_chain, dict) else {}
if not isinstance(execution, dict):
execution = {}
ops = execution.get("automation_operation_log")
if not isinstance(ops, list):
ops = []
latest_op = ops[0] if ops and isinstance(ops[0], dict) else {}
playbook_ids: list[str] = []
playbook_paths: list[str] = []
for row in ops:
if not isinstance(row, dict):
continue
for key in ("matched_playbook_id", "input_playbook_id", "output_playbook_id"):
_append_unique_text(playbook_ids, row.get(key))
for key in (
"input_playbook_path",
"output_playbook_path",
"input_ansible_playbook_path",
"output_ansible_playbook_path",
):
_append_unique_text(playbook_paths, row.get(key))
ansible = execution.get("ansible") if isinstance(execution.get("ansible"), dict) else {}
ansible_records = ansible.get("records") if isinstance(ansible.get("records"), list) else []
latest_ansible = (
ansible_records[0]
if ansible_records and isinstance(ansible_records[0], dict)
else {}
)
candidate_catalog = (
ansible.get("candidate_catalog")
if isinstance(ansible.get("candidate_catalog"), dict)
else {}
)
candidates = (
candidate_catalog.get("candidates")
if isinstance(candidate_catalog.get("candidates"), list)
else []
)
return {
"operation_total": len(ops),
"latest_operation_type": latest_op.get("operation_type"),
"latest_status": latest_op.get("status"),
"latest_actor": latest_op.get("actor"),
"latest_action": (
latest_op.get("input_action") or latest_op.get("output_action")
),
"latest_executor": (
latest_op.get("input_executor")
or latest_op.get("output_executor")
or latest_op.get("input_execution_backend")
or latest_op.get("output_execution_backend")
),
"playbook_ids": playbook_ids[:5],
"playbook_paths": playbook_paths[:5],
"ansible": {
"considered": bool(ansible.get("considered")),
"record_total": len(ansible_records),
"candidate_count": len(candidates),
"not_used_reason": ansible.get("not_used_reason"),
"latest_operation_type": latest_ansible.get("operation_type"),
"latest_status": latest_ansible.get("status"),
"latest_playbook_path": latest_ansible.get("playbook_path"),
"latest_check_mode": latest_ansible.get("check_mode"),
"candidate_playbooks": [
{
"catalog_id": item.get("catalog_id"),
"playbook_path": item.get("playbook_path"),
"risk_level": item.get("risk_level"),
"match_score": item.get("match_score"),
}
for item in candidates[:3]
if isinstance(item, dict)
],
},
}
def _callback_reply_awooop_source_snapshot(
truth_chain: dict[str, object] | None,
source_correlation: dict[str, object] | None,
) -> dict[str, object]:
channel = truth_chain.get("channel") if isinstance(truth_chain, dict) else {}
if not isinstance(channel, dict):
channel = {}
inbound_events = channel.get("inbound_events")
outbound_messages = channel.get("outbound_messages")
if not isinstance(inbound_events, list):
inbound_events = []
if not isinstance(outbound_messages, list):
outbound_messages = []
source_refs: dict[str, list[str]] = {
"alert_ids": [],
"sentry_issue_ids": [],
"signoz_alerts": [],
"fingerprints": [],
"incident_ids": [],
}
inbound_channels: list[str] = []
for row in inbound_events:
if not isinstance(row, dict):
continue
_append_unique_text(inbound_channels, row.get("channel_type"))
envelope = row.get("source_envelope")
for key in source_refs:
for value in _callback_reply_source_ref_values(envelope, key):
_append_unique_text(source_refs[key], value)
latest_inbound = (
inbound_events[0]
if inbound_events and isinstance(inbound_events[0], dict)
else {}
)
latest_outbound = (
outbound_messages[0]
if outbound_messages and isinstance(outbound_messages[0], dict)
else {}
)
source_snapshot: dict[str, object] = {
"inbound_total": len(inbound_events),
"outbound_total": len(outbound_messages),
"inbound_channels": inbound_channels[:5],
"refs": {key: values[:5] for key, values in source_refs.items()},
"latest_inbound": {
"channel_type": latest_inbound.get("channel_type"),
"provider_event_id": latest_inbound.get("provider_event_id"),
"content_type": latest_inbound.get("content_type"),
"is_duplicate": latest_inbound.get("is_duplicate"),
"received_at": latest_inbound.get("received_at"),
},
"latest_outbound": {
"channel_type": latest_outbound.get("channel_type"),
"message_type": latest_outbound.get("message_type"),
"send_status": latest_outbound.get("send_status"),
"sent_at": latest_outbound.get("sent_at"),
},
}
if isinstance(source_correlation, dict):
source_snapshot["correlation"] = source_correlation
return source_snapshot
def _callback_reply_awooop_status_chain_snapshot(
*,
incident_id: str | None,
truth_chain: dict[str, object] | None = None,
remediation_history: dict[str, object] | None = None,
source_correlation: dict[str, object] | None = None,
) -> dict[str, object] | None:
"""Persist a compact AwoooP status-chain snapshot with callback evidence."""
if not incident_id or (not truth_chain and not remediation_history):
@@ -1034,6 +1266,12 @@ def _callback_reply_awooop_status_chain_snapshot(
"incident": latest.get("writes_incident_state"),
"auto_repair": latest.get("writes_auto_repair_result"),
},
"mcp": _callback_reply_awooop_mcp_snapshot(truth_chain),
"execution": _callback_reply_awooop_execution_snapshot(truth_chain),
"source_refs": _callback_reply_awooop_source_snapshot(
truth_chain,
source_correlation,
),
}
@@ -6256,6 +6494,7 @@ class TelegramGateway:
project_id=project_id,
)
truth_chain: dict[str, object] | None = None
source_correlation: dict[str, object] | None = None
try:
from src.services.awooop_truth_chain_service import fetch_truth_chain
@@ -6289,10 +6528,28 @@ class TelegramGateway:
lines += _format_km_stale_completion_lines(km_completion_summary)
lines += _format_remediation_history_lines(remediation_history)
if truth_chain is not None:
try:
from src.services.platform_operator_service import (
_fetch_source_correlation_summary,
)
source_correlation = await _fetch_source_correlation_summary(
project_id=project_id,
incident_ids=[incident_id],
)
except Exception as source_exc:
logger.warning(
"incident_detail_source_correlation_summary_failed",
incident_id=incident_id,
error=str(source_exc),
)
awooop_status_chain_snapshot = _callback_reply_awooop_status_chain_snapshot(
incident_id=incident_id,
truth_chain=truth_chain,
remediation_history=remediation_history,
source_correlation=source_correlation,
)
await self._send_html_line_message(
lines,
@@ -6421,6 +6678,7 @@ class TelegramGateway:
project_id=project_id,
)
truth_chain: dict[str, object] | None = None
source_correlation: dict[str, object] | None = None
try:
from src.services.awooop_truth_chain_service import fetch_truth_chain
@@ -6449,10 +6707,28 @@ class TelegramGateway:
lines += _format_km_stale_completion_lines(km_completion_summary)
lines += _format_remediation_history_lines(remediation_history)
if truth_chain is not None:
try:
from src.services.platform_operator_service import (
_fetch_source_correlation_summary,
)
source_correlation = await _fetch_source_correlation_summary(
project_id=project_id,
incident_ids=[incident_id],
)
except Exception as source_exc:
logger.warning(
"incident_history_source_correlation_summary_failed",
incident_id=incident_id,
error=str(source_exc),
)
awooop_status_chain_snapshot = _callback_reply_awooop_status_chain_snapshot(
incident_id=incident_id,
truth_chain=truth_chain,
remediation_history=remediation_history,
source_correlation=source_correlation,
)
await self._send_html_line_message(
lines,

View File

@@ -199,6 +199,80 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None
},
"blockers": [],
},
"mcp": {
"awooop_gateway": {
"total": 2,
"success": 1,
"failed": 1,
"blocked": 0,
"first_class_total": 2,
"legacy_bridge_total": 0,
"policy_enforced_total": 2,
"by_tool": [
{
"tool_name": "prometheus.query",
"total": 2,
"success": 1,
"failed": 1,
"blocked": 0,
}
],
},
"legacy": {
"total": 1,
"success": 1,
"failed": 0,
},
},
"execution": {
"automation_operation_log": [
{
"operation_type": "playbook_previewed",
"status": "waiting_approval",
"actor": "Hermes",
"input_executor": "ansible",
"input_playbook_id": "pb-host-restart",
"input_playbook_path": "infra/ansible/playbooks/188-ai-web.yml",
}
],
"ansible": {
"considered": True,
"records": [
{
"operation_type": "ansible_check_mode_previewed",
"status": "waiting_approval",
"playbook_path": "infra/ansible/playbooks/188-ai-web.yml",
"check_mode": True,
}
],
"candidate_catalog": {
"candidates": [
{
"catalog_id": "ansible:188-ai-web",
"playbook_path": "infra/ansible/playbooks/188-ai-web.yml",
"risk_level": "medium",
"match_score": 3,
}
]
},
},
},
"channel": {
"inbound_events": [
{
"channel_type": "sentry",
"provider_event_id": "sentry:ISSUE-1",
"source_envelope": {
"source_refs": {
"sentry_issue_ids": ["ISSUE-1"],
"signoz_alerts": ["signoz:alert-1"],
"fingerprints": ["fp-1"],
}
},
}
],
"outbound_messages": [],
},
},
remediation_history={
"total": 1,
@@ -214,6 +288,38 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None
}
],
},
source_correlation={
"schema_version": "source_provider_correlation_v1",
"status": "candidate_found",
"verification_status": "candidate_only",
"direct_ref_total": 0,
"candidate_total": 1,
"applied_link_total": 0,
"provider_event_total": 1,
"latest_applied_link_at": None,
"providers": {
"sentry": {
"direct_ref_total": 0,
"candidate_total": 1,
"applied_link_total": 0,
"latest_event_at": "2026-05-20T00:00:00Z",
},
"signoz": {
"direct_ref_total": 0,
"candidate_total": 0,
"applied_link_total": 0,
"latest_event_at": None,
},
},
"top_candidates": [
{
"provider": "sentry",
"provider_event_id": "sentry:ISSUE-1",
"score": 2,
"link_state": "candidate",
}
],
},
)
assert snapshot is not None
@@ -227,6 +333,16 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None
assert snapshot["evidence"]["latest_route"] == "investigator/ssh_diagnose/read"
assert snapshot["writes"]["incident"] is False
assert snapshot["blockers"] == ["pending_human_approval"]
assert snapshot["mcp"]["gateway"]["policy_enforced_total"] == 2
assert snapshot["mcp"]["top_tools"][0]["tool_name"] == "prometheus.query"
assert snapshot["execution"]["latest_executor"] == "ansible"
assert snapshot["execution"]["playbook_paths"] == [
"infra/ansible/playbooks/188-ai-web.yml"
]
assert snapshot["execution"]["ansible"]["candidate_count"] == 1
assert snapshot["source_refs"]["refs"]["sentry_issue_ids"] == ["ISSUE-1"]
assert snapshot["source_refs"]["refs"]["signoz_alerts"] == ["signoz:alert-1"]
assert snapshot["source_refs"]["correlation"]["status"] == "candidate_found"
def test_km_stale_completion_lines_show_owner_review_queue_state() -> None:

View File

@@ -2797,6 +2797,9 @@
"previewEmpty": "No preview",
"openRun": "Open Run",
"awooopSnapshotTitle": "Callback-time AwoooP Status Chain",
"awooopSnapshotMcp": "MCP: total {total} / success {success} / failed {failed} / blocked {blocked}; top {topTool}",
"awooopSnapshotExecution": "Execution: executor {executor}; playbook {playbook}; Ansible considered={ansible} / candidates={candidates}",
"awooopSnapshotSource": "Source: {status}; direct {direct} / candidate {candidate} / applied {applied}; {providers}",
"kmCompletion": {
"title": "KM Owner Review",
"status": "Status: {status}",

View File

@@ -2798,6 +2798,9 @@
"previewEmpty": "無摘要",
"openRun": "開啟 Run",
"awooopSnapshotTitle": "Callback 當下 AwoooP 狀態鏈",
"awooopSnapshotMcp": "MCPtotal {total} / success {success} / failed {failed} / blocked {blocked}top {topTool}",
"awooopSnapshotExecution": "Executionexecutor {executor}playbook {playbook}Ansible considered={ansible} / candidates={candidates}",
"awooopSnapshotSource": "Source{status}direct {direct} / candidate {candidate} / applied {applied}{providers}",
"kmCompletion": {
"title": "KM Owner Review",
"status": "狀態:{status}",

View File

@@ -1643,6 +1643,69 @@ function CallbackKmCompletionSnapshot({
);
}
function CallbackAwoooPStatusChainSnapshot({
chain,
}: {
chain?: AwoooPStatusChain | null;
}) {
const t = useTranslations("awooop.callbackReply.events");
if (!chain) return null;
const mcpGateway = chain.mcp?.gateway ?? {};
const topTool = chain.mcp?.top_tools?.[0];
const execution = chain.execution ?? {};
const ansible = execution.ansible ?? {};
const correlation = chain.source_refs?.correlation;
const providerSummary = ["sentry", "signoz"].map((provider) => {
const item = correlation?.providers?.[provider];
return `${provider} ${item?.direct_ref_total ?? 0}/${item?.candidate_total ?? 0}/${item?.applied_link_total ?? 0}`;
}).join(" · ");
return (
<div className="mt-3 border-t border-[#e0ddd4] pt-3">
<div className="flex items-center gap-2">
<Activity className="h-3.5 w-3.5 text-[#1f5b9b]" aria-hidden="true" />
<p className="text-xs font-semibold text-[#141413]">
{t("awooopSnapshotTitle")}
</p>
</div>
<AwoooPStatusChainPanel
chain={chain}
compact
className="mt-2"
/>
<div className="grid gap-px border border-t-0 border-[#e0ddd4] bg-[#e0ddd4] md:grid-cols-3">
<p className="truncate bg-white px-3 py-2 font-mono text-xs text-[#5f5b52]" title={topTool?.tool_name ?? "--"}>
{t("awooopSnapshotMcp", {
total: mcpGateway.total ?? 0,
success: mcpGateway.success ?? 0,
failed: mcpGateway.failed ?? 0,
blocked: mcpGateway.blocked ?? 0,
topTool: topTool?.tool_name ?? "--",
})}
</p>
<p className="truncate bg-white px-3 py-2 font-mono text-xs text-[#5f5b52]" title={execution.playbook_paths?.[0] ?? "--"}>
{t("awooopSnapshotExecution", {
executor: execution.latest_executor ?? "--",
playbook: execution.playbook_paths?.[0] ?? "--",
ansible: String(ansible.considered ?? false),
candidates: ansible.candidate_count ?? 0,
})}
</p>
<p className="truncate bg-white px-3 py-2 font-mono text-xs text-[#5f5b52]" title={providerSummary}>
{t("awooopSnapshotSource", {
status: correlation?.status ?? "--",
direct: correlation?.direct_ref_total ?? 0,
candidate: correlation?.candidate_total ?? 0,
applied: correlation?.applied_link_total ?? 0,
providers: providerSummary,
})}
</p>
</div>
</div>
);
}
function CallbackReplyEvidencePanel({
events,
total,
@@ -1733,21 +1796,9 @@ function CallbackReplyEvidencePanel({
compact
className="mt-3"
/>
{event.persisted_awooop_status_chain ? (
<div className="mt-3 border-t border-[#e0ddd4] pt-3">
<div className="flex items-center gap-2">
<Activity className="h-3.5 w-3.5 text-[#1f5b9b]" aria-hidden="true" />
<p className="text-xs font-semibold text-[#141413]">
{t("awooopSnapshotTitle")}
</p>
</div>
<AwoooPStatusChainPanel
chain={event.persisted_awooop_status_chain}
compact
className="mt-2"
/>
</div>
) : null}
<CallbackAwoooPStatusChainSnapshot
chain={event.persisted_awooop_status_chain}
/>
<CallbackKmCompletionSummary
summary={event.km_stale_completion_summary}
/>