fix(api): batch truth chain quality summary
Some checks failed
CD Pipeline / tests (push) Successful in 1m19s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-01 16:12:42 +08:00
parent 617ca6ed70
commit a31e7bbd29
3 changed files with 530 additions and 23 deletions

View File

@@ -7,7 +7,6 @@ Telegram cards can be audited without guessing which subsystem owns the truth.
from __future__ import annotations
import asyncio
import json
import os
import shutil
@@ -35,7 +34,6 @@ logger = structlog.get_logger(__name__)
# Per-incident row cap; the batched summary queries scale it by the number of incidents.
_MAX_ROWS = 100
# NOTE(review): presumably the columns delivered as JSON text that need decoding — confirm against usage.
_JSON_TEXT_FIELDS = {"gate_result", "source_envelope"}
# Size of the asyncio.Semaphore that bounds concurrent per-incident truth-chain fetches.
_QUALITY_SUMMARY_CONCURRENCY = 8
# Read once at import time from the environment; falls back to "30" when the variable is unset.
# NOTE(review): by its name this is the quality-summary cache lifetime in seconds — confirm at the cache call site.
_QUALITY_SUMMARY_CACHE_TTL_SECONDS = int(
os.getenv("AWOOOP_QUALITY_SUMMARY_CACHE_TTL_SECONDS", "30")
)
@@ -1919,6 +1917,160 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
return result
def _empty_incident_groups(incident_ids: list[str]) -> dict[str, list[dict[str, Any]]]:
    """Return a mapping holding one fresh, empty row list per incident id."""
    groups: dict[str, list[dict[str, Any]]] = {}
    for incident_id in incident_ids:
        # A repeated id simply resets its bucket to a new empty list.
        groups[incident_id] = []
    return groups
def _append_unique_row(group: list[dict[str, Any]], row: dict[str, Any]) -> None:
    """Append ``row`` to ``group`` in place unless an equal row is already there."""
    if row in group:
        return
    group.append(row)
def _row_source_refs(row: dict[str, Any]) -> dict[str, Any]:
    """Return the ``source_refs`` dict nested inside the row's ``source_envelope``.

    An empty dict is returned whenever the envelope, or its ``source_refs``
    entry, is missing or is not a dict.
    """
    envelope = row.get("source_envelope")
    if isinstance(envelope, dict):
        source_refs = envelope.get("source_refs")
        if isinstance(source_refs, dict):
            return source_refs
    return {}
def _source_ref_contains(row: dict[str, Any], keys: tuple[str, ...], incident_id: str) -> bool:
    """Tell whether any of ``keys`` in the row's source refs names ``incident_id``.

    A list value matches when one of its items, converted to ``str``, equals
    ``incident_id``; any value matches when its own string form (with falsy
    values treated as the empty string) equals ``incident_id``.
    """
    source_refs = _row_source_refs(row)

    def _names_incident(value: Any) -> bool:
        if isinstance(value, list) and incident_id in {str(item) for item in value}:
            return True
        return str(value or "") == incident_id

    return any(_names_incident(source_refs.get(key)) for key in keys)
def _group_rows_by_incident_reference(
    *,
    rows: list[dict[str, Any]],
    incident_ids: list[str],
    direct_fields: tuple[str, ...],
    text_fields: tuple[str, ...] = (),
    source_ref_keys: tuple[str, ...] = (),
) -> dict[str, list[dict[str, Any]]]:
    """Distribute batch-fetched rows to the incidents they reference.

    Matching happens in two passes per row:

    1. Exact match: the string form of any ``direct_fields`` value equals a
       requested incident id.  A row that matches directly is filed under every
       directly matched incident and the second pass is skipped.
    2. Loose match: the incident id occurs as a substring of the joined
       ``text_fields`` values, or is named by one of ``source_ref_keys`` in the
       row's source refs.  Rows are de-duplicated per incident in this pass.

    Returns a dict with one list per requested incident id (possibly empty).
    """
    known_ids = set(incident_ids)
    grouped = _empty_incident_groups(incident_ids)
    for row in rows:
        direct_hits = set()
        for field in direct_fields:
            candidate = str(row.get(field) or "")
            if candidate in known_ids:
                direct_hits.add(candidate)
        if direct_hits:
            for matched_id in direct_hits:
                grouped[matched_id].append(row)
            continue

        haystack = "\n".join(str(row.get(field) or "") for field in text_fields)
        for candidate_id in incident_ids:
            found_in_text = bool(haystack) and candidate_id in haystack
            if found_in_text or (
                source_ref_keys
                and _source_ref_contains(row, source_ref_keys, candidate_id)
            ):
                _append_unique_row(grouped[candidate_id], row)
    return grouped
def _group_timeline_events_by_incident(
    *,
    rows: list[dict[str, Any]],
    incident_ids: list[str],
    approval_to_incident: dict[str, str],
) -> dict[str, list[dict[str, Any]]]:
    """Group batch-fetched timeline events under the incidents they belong to.

    A row is filed under its own ``incident_id`` when that id is one of
    ``incident_ids``.  Otherwise its ``approval_id`` is looked up in
    ``approval_to_incident`` and the row is filed under the mapped incident.
    Rows that resolve to no requested incident are dropped.

    Args:
        rows: Timeline event rows; only ``incident_id`` and ``approval_id``
            are read here.
        incident_ids: Incident ids to group by; each gets a (possibly empty)
            list in the result.
        approval_to_incident: Maps an approval id (as text) to its incident id.

    Returns:
        ``{incident_id: [row, ...]}`` with rows kept in input order.
    """
    groups = _empty_incident_groups(incident_ids)
    incident_set = set(incident_ids)
    for row in rows:
        incident_id = str(row.get("incident_id") or "")
        if incident_id in incident_set:
            groups[incident_id].append(row)
            continue
        approval_incident_id = approval_to_incident.get(str(row.get("approval_id") or ""))
        # Guard the lookup: a mapping entry that points at an incident outside
        # ``incident_ids`` has no bucket, and indexing ``groups`` with it would
        # raise KeyError and abort the whole grouping pass.
        if approval_incident_id and approval_incident_id in incident_set:
            groups[approval_incident_id].append(row)
    return groups
def _build_summary_quality_records(
    *,
    incidents: list[dict[str, Any]],
    approvals_by_incident: dict[str, list[dict[str, Any]]],
    evidence_by_incident: dict[str, list[dict[str, Any]]],
    timeline_by_incident: dict[str, list[dict[str, Any]]],
    legacy_mcp_by_incident: dict[str, list[dict[str, Any]]],
    automation_ops_by_incident: dict[str, list[dict[str, Any]]],
    auto_repair_by_incident: dict[str, list[dict[str, Any]]],
    km_by_incident: dict[str, list[dict[str, Any]]],
    gateway_mcp_by_incident: dict[str, list[dict[str, Any]]],
    outbound_by_incident: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Assemble one quality record per incident from pre-grouped batch rows.

    Every ``*_by_incident`` argument maps an incident id to the rows fetched
    for it; an incident missing from a mapping is treated as having no rows.
    Incidents without an ``incident_id`` are skipped.  The summary path passes
    no drift information (``drift=None``, ``drift_repeat_count=0``) and reports
    ``inbound_visible_total=0``.

    Returns:
        A list of dicts with the keys ``incident``, ``truth_status``,
        ``automation_quality`` and ``execution``, in the order of ``incidents``.
    """

    def _rows_for(grouping: dict[str, list[dict[str, Any]]], key: str) -> list[dict[str, Any]]:
        return grouping.get(key, [])

    summary_records: list[dict[str, Any]] = []
    for incident in incidents:
        source_id = str(incident.get("incident_id") or "")
        if not source_id:
            continue

        approval_rows = _rows_for(approvals_by_incident, source_id)
        evidence = _rows_for(evidence_by_incident, source_id)
        timeline = _rows_for(timeline_by_incident, source_id)
        legacy_mcp = _rows_for(legacy_mcp_by_incident, source_id)
        ops = _rows_for(automation_ops_by_incident, source_id)
        repairs = _rows_for(auto_repair_by_incident, source_id)
        knowledge = _rows_for(km_by_incident, source_id)
        gateway_mcp = _rows_for(gateway_mcp_by_incident, source_id)
        outbound = _rows_for(outbound_by_incident, source_id)

        legacy_summary = _summarize_mcp(legacy_mcp)
        gateway_summary = _summarize_gateway_mcp(gateway_mcp)

        status = _truth_status(
            incident=incident,
            approvals=approval_rows,
            evidence_rows=evidence,
            automation_ops=ops,
            drift=None,
            drift_repeat_count=0,
            gateway_mcp_total=len(gateway_mcp),
            legacy_mcp_total=legacy_summary["total"],
            outbound_visible_total=len(outbound),
            inbound_visible_total=0,
            auto_repair_executions=repairs,
        )
        quality = build_automation_quality(
            incident=incident,
            approvals=approval_rows,
            evidence_rows=evidence,
            automation_ops=ops,
            auto_repair_executions=repairs,
            gateway_mcp_summary=gateway_summary,
            legacy_mcp_summary=legacy_summary,
            outbound_rows=outbound,
            km_entries=knowledge,
            timeline_events=timeline,
        )
        # The operator outcome is derived from the quality dict it is stored in.
        quality["operator_outcome"] = build_operator_outcome(
            truth_status=status,
            automation_quality=quality,
            source_id=source_id,
        )

        execution = {
            "automation_operation_log": ops,
            "auto_repair_executions": repairs,
            "ansible": build_ansible_truth(ops, incident=incident, drift=None),
        }
        summary_records.append(
            {
                "incident": incident,
                "truth_status": status,
                "automation_quality": quality,
                "execution": execution,
            }
        )
    return summary_records
async def fetch_automation_quality_summary(
*,
project_id: str = "awoooi",
@@ -2015,29 +2167,347 @@ async def fetch_automation_quality_summary(
},
)
semaphore = asyncio.Semaphore(_QUALITY_SUMMARY_CONCURRENCY)
async def _quality_record(incident: dict[str, Any]) -> dict[str, Any] | None:
incident_id = str(incident.get("incident_id") or "")
if not incident_id:
return None
async with semaphore:
truth_chain = await fetch_truth_chain(
source_id=incident_id,
project_id=normalized_project_id,
incident_ids = [
str(incident.get("incident_id") or "")
for incident in incidents
if incident.get("incident_id")
]
if not incident_ids:
records = []
else:
batch_params = {"incident_ids": incident_ids, "limit": _MAX_ROWS * len(incident_ids)}
approvals = await _fetch_all(
db,
"""
SELECT
id,
incident_id,
status::text AS status,
risk_level::text AS risk_level,
action,
description,
hit_count,
created_at,
updated_at,
resolved_at,
matched_playbook_id,
extra_metadata,
decision_fusion_details
FROM approval_records
WHERE incident_id = ANY(CAST(:incident_ids AS text[]))
ORDER BY created_at DESC
LIMIT :limit
""",
batch_params,
)
approval_to_incident = {
str(row.get("id")): str(row.get("incident_id"))
for row in approvals
if row.get("id") and row.get("incident_id")
}
approval_ids = list(approval_to_incident.keys()) or ["__none__"]
evidence_rows = await _fetch_all(
db,
"""
SELECT
id,
incident_id,
matched_playbook_id,
collected_at,
collection_duration_ms,
sensors_attempted,
sensors_succeeded,
evidence_summary,
metrics_snapshot,
mcp_health,
verification_result,
pre_execution_state,
post_execution_state,
self_healing_score,
self_healing_detail
FROM incident_evidence
WHERE incident_id = ANY(CAST(:incident_ids AS text[]))
ORDER BY collected_at DESC
LIMIT :limit
""",
batch_params,
)
timeline_events = await _fetch_all(
db,
"""
SELECT
id,
event_type,
status,
title,
description,
actor,
actor_role,
risk_level,
approval_id,
incident_id,
created_at
FROM timeline_events
WHERE incident_id = ANY(CAST(:incident_ids AS text[]))
OR approval_id::text = ANY(CAST(:approval_ids AS text[]))
ORDER BY created_at ASC
LIMIT :limit
""",
{**batch_params, "approval_ids": approval_ids},
)
legacy_mcp_rows = await _fetch_all(
db,
"""
SELECT
id,
session_id,
flywheel_node,
mcp_server,
tool_name,
duration_ms,
success,
error_message,
incident_id,
agent_role,
created_at
FROM mcp_audit_log
WHERE incident_id = ANY(CAST(:incident_ids AS text[]))
ORDER BY created_at ASC
LIMIT :limit
""",
batch_params,
)
automation_ops = await _fetch_all(
db,
"""
SELECT
op_id,
operation_type,
status,
incident_id,
run_id,
parent_op_id,
actor,
dry_run_result,
error,
duration_ms,
tags,
input ->> 'action' AS input_action,
input ->> 'executor' AS input_executor,
input ->> 'execution_backend' AS input_execution_backend,
input ->> 'catalog_id' AS input_catalog_id,
input ->> 'execution_mode' AS input_execution_mode,
input ->> 'approval_source' AS input_approval_source,
input ->> 'apply_enabled' AS input_apply_enabled,
input ->> 'apply_executed' AS input_apply_executed,
input ->> 'check_mode_executed' AS input_check_mode_executed,
input ->> 'returncode' AS input_returncode,
input ->> 'playbook_id' AS input_playbook_id,
input ->> 'playbook_path' AS input_playbook_path,
input ->> 'ansible_playbook_path' AS input_ansible_playbook_path,
input ->> 'check_mode' AS input_check_mode,
input ->> 'not_used_reason' AS input_not_used_reason,
output ->> 'action' AS output_action,
output ->> 'reason' AS output_reason,
output ->> 'executor' AS output_executor,
output ->> 'execution_backend' AS output_execution_backend,
output ->> 'catalog_id' AS output_catalog_id,
output ->> 'execution_mode' AS output_execution_mode,
output ->> 'approval_source' AS output_approval_source,
output ->> 'apply_enabled' AS output_apply_enabled,
output ->> 'apply_executed' AS output_apply_executed,
output ->> 'check_mode_executed' AS output_check_mode_executed,
output ->> 'returncode' AS output_returncode,
output ->> 'playbook_id' AS output_playbook_id,
output ->> 'playbook_path' AS output_playbook_path,
output ->> 'ansible_playbook_path' AS output_ansible_playbook_path,
output ->> 'check_mode' AS output_check_mode,
output ->> 'not_used_reason' AS output_not_used_reason,
dry_run_result ->> 'returncode' AS dry_run_returncode,
dry_run_result ->> 'apply_executed' AS dry_run_apply_executed,
dry_run_result ->> 'check_mode_executed' AS dry_run_check_mode_executed,
coalesce(input::text, '') AS input_text,
coalesce(output::text, '') AS output_text,
coalesce(array_to_string(tags, ','), '') AS tags_text,
created_at
FROM automation_operation_log
WHERE incident_id::text = ANY(CAST(:incident_ids AS text[]))
OR EXISTS (
SELECT 1
FROM unnest(CAST(:incident_ids AS text[])) AS source_ids(incident_id)
WHERE coalesce(input::text, '') LIKE '%' || source_ids.incident_id || '%'
OR coalesce(output::text, '') LIKE '%' || source_ids.incident_id || '%'
OR coalesce(array_to_string(tags, ','), '') LIKE '%' || source_ids.incident_id || '%'
)
ORDER BY created_at DESC
LIMIT :limit
""",
batch_params,
)
auto_repair_executions = await _fetch_all(
db,
"""
SELECT
id,
incident_id,
playbook_id,
playbook_name,
success,
executed_steps,
error_message,
triggered_by,
similarity_score,
risk_level,
execution_time_ms,
created_at
FROM auto_repair_executions
WHERE incident_id = ANY(CAST(:incident_ids AS text[]))
ORDER BY created_at DESC
LIMIT :limit
""",
batch_params,
)
km_entries = await _fetch_all(
db,
"""
SELECT
id,
title,
entry_type::text AS entry_type,
status::text AS status,
related_incident_id,
related_approval_id,
created_at
FROM knowledge_entries
WHERE related_incident_id = ANY(CAST(:incident_ids AS text[]))
ORDER BY created_at DESC
LIMIT :limit
""",
batch_params,
)
gateway_mcp_rows = await _fetch_all(
db,
"""
SELECT
call_id,
project_id,
run_id,
trace_id,
agent_id,
tool_name,
gate_result,
result_status,
block_gate,
block_reason,
latency_ms,
created_at
FROM awooop_mcp_gateway_audit
WHERE project_id = :project_id
AND (
trace_id = ANY(CAST(:incident_ids AS text[]))
OR run_id::text = ANY(CAST(:incident_ids AS text[]))
)
ORDER BY created_at ASC
LIMIT :limit
""",
{**batch_params, "project_id": normalized_project_id},
)
outbound_rows = await _fetch_all(
db,
"""
SELECT
message_id,
project_id,
run_id,
channel_type,
message_type,
content_hash,
content_preview,
content_redacted,
redaction_version,
source_envelope,
provider_message_id,
send_status,
queued_at,
sent_at,
triggered_by_state
FROM awooop_outbound_message
WHERE project_id = :project_id
AND (
run_id::text = ANY(CAST(:incident_ids AS text[]))
OR EXISTS (
SELECT 1
FROM unnest(CAST(:incident_ids AS text[])) AS source_ids(incident_id)
WHERE coalesce(content_preview, '') ILIKE '%' || source_ids.incident_id || '%'
OR coalesce(source_envelope::text, '') LIKE '%' || source_ids.incident_id || '%'
)
)
ORDER BY queued_at DESC
LIMIT :limit
""",
{**batch_params, "project_id": normalized_project_id},
)
return {
"incident": truth_chain.get("incident") or incident,
"truth_status": truth_chain.get("truth_status") or {},
"automation_quality": truth_chain.get("automation_quality") or {},
"execution": truth_chain.get("execution") or {},
}
records = [
record
for record in await asyncio.gather(*(_quality_record(incident) for incident in incidents))
if record is not None
]
approvals_by_incident = _group_rows_by_incident_reference(
rows=approvals,
incident_ids=incident_ids,
direct_fields=("incident_id",),
)
evidence_by_incident = _group_rows_by_incident_reference(
rows=evidence_rows,
incident_ids=incident_ids,
direct_fields=("incident_id",),
)
timeline_by_incident = _group_timeline_events_by_incident(
rows=timeline_events,
incident_ids=incident_ids,
approval_to_incident=approval_to_incident,
)
legacy_mcp_by_incident = _group_rows_by_incident_reference(
rows=legacy_mcp_rows,
incident_ids=incident_ids,
direct_fields=("incident_id",),
)
automation_ops_by_incident = _group_rows_by_incident_reference(
rows=automation_ops,
incident_ids=incident_ids,
direct_fields=("incident_id",),
text_fields=("input_text", "output_text", "tags_text"),
)
auto_repair_by_incident = _group_rows_by_incident_reference(
rows=auto_repair_executions,
incident_ids=incident_ids,
direct_fields=("incident_id",),
)
km_by_incident = _group_rows_by_incident_reference(
rows=km_entries,
incident_ids=incident_ids,
direct_fields=("related_incident_id",),
)
gateway_mcp_by_incident = _group_rows_by_incident_reference(
rows=gateway_mcp_rows,
incident_ids=incident_ids,
direct_fields=("trace_id", "run_id"),
)
outbound_by_incident = _group_rows_by_incident_reference(
rows=outbound_rows,
incident_ids=incident_ids,
direct_fields=("run_id",),
text_fields=("content_preview",),
source_ref_keys=("incident_ids", "code_refs"),
)
records = _build_summary_quality_records(
incidents=incidents,
approvals_by_incident=approvals_by_incident,
evidence_by_incident=evidence_by_incident,
timeline_by_incident=timeline_by_incident,
legacy_mcp_by_incident=legacy_mcp_by_incident,
automation_ops_by_incident=automation_ops_by_incident,
auto_repair_by_incident=auto_repair_by_incident,
km_by_incident=km_by_incident,
gateway_mcp_by_incident=gateway_mcp_by_incident,
outbound_by_incident=outbound_by_incident,
)
summary = summarize_automation_quality_records(
project_id=normalized_project_id,

View File

@@ -83,6 +83,16 @@ def test_quality_summary_includes_recent_ansible_operation_incidents() -> None:
assert "source_ids.recent_evidence_at DESC" in source
def test_quality_summary_uses_batched_truth_chain_inputs() -> None:
    """Guard against regressing to the per-incident ``fetch_truth_chain`` fan-out."""
    summary_source = inspect.getsource(fetch_automation_quality_summary)
    assert "fetch_truth_chain(" not in summary_source
    # The batched path must still read these tables and delegate record assembly.
    required_fragments = (
        "approval_records",
        "incident_evidence",
        "awooop_outbound_message",
        "_build_summary_quality_records",
    )
    for fragment in required_fragments:
        assert fragment in summary_source
def test_ansible_audit_keeps_external_incident_id_in_json_not_bigint_column() -> None:
decision_source = inspect.getsource(record_ansible_decision_audit)
claim_source = inspect.getsource(claim_pending_check_modes)

View File

@@ -1,3 +1,30 @@
## 2026-06-01｜truth-chain quality summary 批次化查詢
**背景**
- 首頁已加上 8 秒降級保護,但正式環境 `/api/v1/platform/truth-chain/quality/summary` 快取未命中時仍可能需要數秒到數十秒。
- 根因是 summary 端點會對每個 recent incident 再呼叫一次完整 `fetch_truth_chain()`,造成 N+1 truth-chain fanout;每筆 incident 又會查 approvals、evidence、timeline、MCP、execution、KM、outbound channel 等多張表。
**本次調整**
- `apps/api/src/services/awooop_truth_chain_service.py`
- `fetch_automation_quality_summary()` 改為同一個 DB context 批次抓最近 incident 的 approvals、evidence、timeline、legacy MCP、automation ops、auto-repair executions、KM、AwoooP MCP gateway、outbound messages。
- 新增 `_build_summary_quality_records()`,沿用既有 `_truth_status()`、`build_automation_quality()`、`build_ansible_truth()`,避免重寫判斷規則。
- 移除 summary path 對 `fetch_truth_chain()` 的逐筆 fanout。
- `apps/api/tests/test_awooop_truth_chain_service.py`
- 新增防回歸測試,確認 quality summary 不再呼叫 `fetch_truth_chain()`,且保留 batch input 來源表。
**驗證**
- `python3 -m py_compile apps/api/src/services/awooop_truth_chain_service.py`
- `python3 scripts/security/security-mirror-progress-guard.py --root .`
- `DATABASE_URL=postgresql://test:test@localhost:5432/test PYTHONPATH=apps/api /Users/ogt/.pyenv/shims/pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_operator_summary_cache.py -q` → `53 passed`
**進度邊界**
- 整體 AI 自動化飛輪進度仍維持 `61%`;這輪是 truth-chain summary 效能修復,不直接提高自動修復成功率。
- 下一個正式環境驗證點:`refresh=true&limit=8/30` 的 quality summary 應明顯低於上一輪約 9-27 秒的快取未命中時間,且首頁仍能顯示 live quality data。
## 2026-06-01｜IwoooS 首屏繁中文案收斂
**背景**