From a31e7bbd29dc2f9b100304fa1a922b0049b9706d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 1 Jun 2026 16:12:42 +0800 Subject: [PATCH] fix(api): batch truth chain quality summary --- .../services/awooop_truth_chain_service.py | 516 +++++++++++++++++- .../tests/test_awooop_truth_chain_service.py | 10 + docs/LOGBOOK.md | 27 + 3 files changed, 530 insertions(+), 23 deletions(-) diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index c6ee9dbb..188ede64 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -7,7 +7,6 @@ Telegram cards can be audited without guessing which subsystem owns the truth. from __future__ import annotations -import asyncio import json import os import shutil @@ -35,7 +34,6 @@ logger = structlog.get_logger(__name__) _MAX_ROWS = 100 _JSON_TEXT_FIELDS = {"gate_result", "source_envelope"} -_QUALITY_SUMMARY_CONCURRENCY = 8 _QUALITY_SUMMARY_CACHE_TTL_SECONDS = int( os.getenv("AWOOOP_QUALITY_SUMMARY_CACHE_TTL_SECONDS", "30") ) @@ -1919,6 +1917,160 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ return result +def _empty_incident_groups(incident_ids: list[str]) -> dict[str, list[dict[str, Any]]]: + return {incident_id: [] for incident_id in incident_ids} + + +def _append_unique_row(group: list[dict[str, Any]], row: dict[str, Any]) -> None: + if row not in group: + group.append(row) + + +def _row_source_refs(row: dict[str, Any]) -> dict[str, Any]: + envelope = row.get("source_envelope") + if not isinstance(envelope, dict): + return {} + refs = envelope.get("source_refs") + return refs if isinstance(refs, dict) else {} + + +def _source_ref_contains(row: dict[str, Any], keys: tuple[str, ...], incident_id: str) -> bool: + refs = _row_source_refs(row) + for key in keys: + value = refs.get(key) + if isinstance(value, list) and incident_id in {str(item) for item in value}: + return True + if str(value or "") == incident_id: + return True + return False + + +def _group_rows_by_incident_reference( + *, + rows: list[dict[str, Any]], + incident_ids: list[str], + direct_fields: tuple[str, ...], + text_fields: tuple[str, ...] = (), + source_ref_keys: tuple[str, ...] = (), +) -> dict[str, list[dict[str, Any]]]: + """Group batch-fetched rows back to incidents using durable direct/source refs.""" + incident_set = set(incident_ids) + groups = _empty_incident_groups(incident_ids) + for row in rows: + direct_matches = { + str(row.get(field) or "") + for field in direct_fields + if str(row.get(field) or "") in incident_set + } + for incident_id in direct_matches: + groups[incident_id].append(row) + if direct_matches: + continue + + text_blob = "\n".join(str(row.get(field) or "") for field in text_fields) + for incident_id in incident_ids: + if ( + (text_blob and incident_id in text_blob) + or ( + source_ref_keys + and _source_ref_contains(row, source_ref_keys, incident_id) + ) + ): + _append_unique_row(groups[incident_id], row) + return groups + + +def _group_timeline_events_by_incident( + *, + rows: list[dict[str, Any]], + incident_ids: list[str], + approval_to_incident: dict[str, str], +) -> dict[str, list[dict[str, Any]]]: + groups = _empty_incident_groups(incident_ids) + incident_set = set(incident_ids) + for row in rows: + incident_id = str(row.get("incident_id") or "") + if incident_id in incident_set: + groups[incident_id].append(row) + continue + approval_incident_id = approval_to_incident.get(str(row.get("approval_id") or "")) + if approval_incident_id: + groups[approval_incident_id].append(row) + return groups + + +def _build_summary_quality_records( + *, + incidents: list[dict[str, Any]], + approvals_by_incident: dict[str, list[dict[str, Any]]], + evidence_by_incident: dict[str, list[dict[str, Any]]], + timeline_by_incident: dict[str, list[dict[str, Any]]], + legacy_mcp_by_incident: dict[str, list[dict[str, Any]]], + automation_ops_by_incident: dict[str, list[dict[str, Any]]], + auto_repair_by_incident: dict[str, list[dict[str, Any]]], + km_by_incident: dict[str, list[dict[str, Any]]], + gateway_mcp_by_incident: dict[str, list[dict[str, Any]]], + outbound_by_incident: dict[str, list[dict[str, Any]]], +) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + for incident in incidents: + incident_id = str(incident.get("incident_id") or "") + if not incident_id: + continue + approvals = approvals_by_incident.get(incident_id, []) + evidence_rows = evidence_by_incident.get(incident_id, []) + timeline_events = timeline_by_incident.get(incident_id, []) + legacy_mcp_rows = legacy_mcp_by_incident.get(incident_id, []) + automation_ops = automation_ops_by_incident.get(incident_id, []) + auto_repair_executions = auto_repair_by_incident.get(incident_id, []) + km_entries = km_by_incident.get(incident_id, []) + gateway_mcp_rows = gateway_mcp_by_incident.get(incident_id, []) + outbound_rows = outbound_by_incident.get(incident_id, []) + legacy_mcp_summary = _summarize_mcp(legacy_mcp_rows) + gateway_mcp_summary = _summarize_gateway_mcp(gateway_mcp_rows) + truth_status = _truth_status( + incident=incident, + approvals=approvals, + evidence_rows=evidence_rows, + automation_ops=automation_ops, + drift=None, + drift_repeat_count=0, + gateway_mcp_total=len(gateway_mcp_rows), + legacy_mcp_total=legacy_mcp_summary["total"], + outbound_visible_total=len(outbound_rows), + inbound_visible_total=0, + auto_repair_executions=auto_repair_executions, + ) + automation_quality = build_automation_quality( + incident=incident, + approvals=approvals, + evidence_rows=evidence_rows, + automation_ops=automation_ops, + auto_repair_executions=auto_repair_executions, + gateway_mcp_summary=gateway_mcp_summary, + legacy_mcp_summary=legacy_mcp_summary, + outbound_rows=outbound_rows, + km_entries=km_entries, + timeline_events=timeline_events, + ) + automation_quality["operator_outcome"] = build_operator_outcome( + truth_status=truth_status, + automation_quality=automation_quality, + source_id=incident_id, + ) + records.append({ + "incident": incident, + "truth_status": truth_status, + "automation_quality": automation_quality, + "execution": { + "automation_operation_log": automation_ops, + "auto_repair_executions": auto_repair_executions, + "ansible": build_ansible_truth(automation_ops, incident=incident, drift=None), + }, + }) + return records + + async def fetch_automation_quality_summary( *, project_id: str = "awoooi", @@ -2015,29 +2167,347 @@ async def fetch_automation_quality_summary( }, ) - semaphore = asyncio.Semaphore(_QUALITY_SUMMARY_CONCURRENCY) - - async def _quality_record(incident: dict[str, Any]) -> dict[str, Any] | None: - incident_id = str(incident.get("incident_id") or "") - if not incident_id: - return None - async with semaphore: - truth_chain = await fetch_truth_chain( - source_id=incident_id, - project_id=normalized_project_id, + incident_ids = [ + str(incident.get("incident_id") or "") + for incident in incidents + if incident.get("incident_id") + ] + if not incident_ids: + records = [] + else: + batch_params = {"incident_ids": incident_ids, "limit": _MAX_ROWS * len(incident_ids)} + approvals = await _fetch_all( + db, + """ + SELECT + id, + incident_id, + status::text AS status, + risk_level::text AS risk_level, + action, + description, + hit_count, + created_at, + updated_at, + resolved_at, + matched_playbook_id, + extra_metadata, + decision_fusion_details + FROM approval_records + WHERE incident_id = ANY(CAST(:incident_ids AS text[])) + ORDER BY created_at DESC + LIMIT :limit + """, + batch_params, + ) + approval_to_incident = { + str(row.get("id")): str(row.get("incident_id")) + for row in approvals + if row.get("id") and row.get("incident_id") + } + approval_ids = list(approval_to_incident.keys()) or ["__none__"] + evidence_rows = await _fetch_all( + db, + """ + SELECT + id, + incident_id, + matched_playbook_id, + collected_at, + collection_duration_ms, + sensors_attempted, + sensors_succeeded, + evidence_summary, + metrics_snapshot, + mcp_health, + verification_result, + pre_execution_state, + post_execution_state, + self_healing_score, + self_healing_detail + FROM incident_evidence + WHERE incident_id = ANY(CAST(:incident_ids AS text[])) + ORDER BY collected_at DESC + LIMIT :limit + """, + batch_params, + ) + timeline_events = await _fetch_all( + db, + """ + SELECT + id, + event_type, + status, + title, + description, + actor, + actor_role, + risk_level, + approval_id, + incident_id, + created_at + FROM timeline_events + WHERE incident_id = ANY(CAST(:incident_ids AS text[])) + OR approval_id::text = ANY(CAST(:approval_ids AS text[])) + ORDER BY created_at ASC + LIMIT :limit + """, + {**batch_params, "approval_ids": approval_ids}, + ) + legacy_mcp_rows = await _fetch_all( + db, + """ + SELECT + id, + session_id, + flywheel_node, + mcp_server, + tool_name, + duration_ms, + success, + error_message, + incident_id, + agent_role, + created_at + FROM mcp_audit_log + WHERE incident_id = ANY(CAST(:incident_ids AS text[])) + ORDER BY created_at ASC + LIMIT :limit + """, + batch_params, + ) + automation_ops = await _fetch_all( + db, + """ + SELECT + op_id, + operation_type, + status, + incident_id, + run_id, + parent_op_id, + actor, + dry_run_result, + error, + duration_ms, + tags, + input ->> 'action' AS input_action, + input ->> 'executor' AS input_executor, + input ->> 'execution_backend' AS input_execution_backend, + input ->> 'catalog_id' AS input_catalog_id, + input ->> 'execution_mode' AS input_execution_mode, + input ->> 'approval_source' AS input_approval_source, + input ->> 'apply_enabled' AS input_apply_enabled, + input ->> 'apply_executed' AS input_apply_executed, + input ->> 'check_mode_executed' AS input_check_mode_executed, + input ->> 'returncode' AS input_returncode, + input ->> 'playbook_id' AS input_playbook_id, + input ->> 'playbook_path' AS input_playbook_path, + input ->> 'ansible_playbook_path' AS input_ansible_playbook_path, + input ->> 'check_mode' AS input_check_mode, + input ->> 'not_used_reason' AS input_not_used_reason, + output ->> 'action' AS output_action, + output ->> 'reason' AS output_reason, + output ->> 'executor' AS output_executor, + output ->> 'execution_backend' AS output_execution_backend, + output ->> 'catalog_id' AS output_catalog_id, + output ->> 'execution_mode' AS output_execution_mode, + output ->> 'approval_source' AS output_approval_source, + output ->> 'apply_enabled' AS output_apply_enabled, + output ->> 'apply_executed' AS output_apply_executed, + output ->> 'check_mode_executed' AS output_check_mode_executed, + output ->> 'returncode' AS output_returncode, + output ->> 'playbook_id' AS output_playbook_id, + output ->> 'playbook_path' AS output_playbook_path, + output ->> 'ansible_playbook_path' AS output_ansible_playbook_path, + output ->> 'check_mode' AS output_check_mode, + output ->> 'not_used_reason' AS output_not_used_reason, + dry_run_result ->> 'returncode' AS dry_run_returncode, + dry_run_result ->> 'apply_executed' AS dry_run_apply_executed, + dry_run_result ->> 'check_mode_executed' AS dry_run_check_mode_executed, + coalesce(input::text, '') AS input_text, + coalesce(output::text, '') AS output_text, + coalesce(array_to_string(tags, ','), '') AS tags_text, + created_at + FROM automation_operation_log + WHERE incident_id::text = ANY(CAST(:incident_ids AS text[])) + OR EXISTS ( + SELECT 1 + FROM unnest(CAST(:incident_ids AS text[])) AS source_ids(incident_id) + WHERE coalesce(input::text, '') LIKE '%' || source_ids.incident_id || '%' + OR coalesce(output::text, '') LIKE '%' || source_ids.incident_id || '%' + OR coalesce(array_to_string(tags, ','), '') LIKE '%' || source_ids.incident_id || '%' + ) + ORDER BY created_at DESC + LIMIT :limit + """, + batch_params, + ) + auto_repair_executions = await _fetch_all( + db, + """ + SELECT + id, + incident_id, + playbook_id, + playbook_name, + success, + executed_steps, + error_message, + triggered_by, + similarity_score, + risk_level, + execution_time_ms, + created_at + FROM auto_repair_executions + WHERE incident_id = ANY(CAST(:incident_ids AS text[])) + ORDER BY created_at DESC + LIMIT :limit + """, + batch_params, + ) + km_entries = await _fetch_all( + db, + """ + SELECT + id, + title, + entry_type::text AS entry_type, + status::text AS status, + related_incident_id, + related_approval_id, + created_at + FROM knowledge_entries + WHERE related_incident_id = ANY(CAST(:incident_ids AS text[])) + ORDER BY created_at DESC + LIMIT :limit + """, + batch_params, + ) + gateway_mcp_rows = await _fetch_all( + db, + """ + SELECT + call_id, + project_id, + run_id, + trace_id, + agent_id, + tool_name, + gate_result, + result_status, + block_gate, + block_reason, + latency_ms, + created_at + FROM awooop_mcp_gateway_audit + WHERE project_id = :project_id + AND ( + trace_id = ANY(CAST(:incident_ids AS text[])) + OR run_id::text = ANY(CAST(:incident_ids AS text[])) + ) + ORDER BY created_at ASC + LIMIT :limit + """, + {**batch_params, "project_id": normalized_project_id}, + ) + outbound_rows = await _fetch_all( + db, + """ + SELECT + message_id, + project_id, + run_id, + channel_type, + message_type, + content_hash, + content_preview, + content_redacted, + redaction_version, + source_envelope, + provider_message_id, + send_status, + queued_at, + sent_at, + triggered_by_state + FROM awooop_outbound_message + WHERE project_id = :project_id + AND ( + run_id::text = ANY(CAST(:incident_ids AS text[])) + OR EXISTS ( + SELECT 1 + FROM unnest(CAST(:incident_ids AS text[])) AS source_ids(incident_id) + WHERE coalesce(content_preview, '') ILIKE '%' || source_ids.incident_id || '%' + OR coalesce(source_envelope::text, '') LIKE '%' || source_ids.incident_id || '%' + ) + ) + ORDER BY queued_at DESC + LIMIT :limit + """, + {**batch_params, "project_id": normalized_project_id}, ) - return { - "incident": truth_chain.get("incident") or incident, - "truth_status": truth_chain.get("truth_status") or {}, - "automation_quality": truth_chain.get("automation_quality") or {}, - "execution": truth_chain.get("execution") or {}, - } - records = [ - record - for record in await asyncio.gather(*(_quality_record(incident) for incident in incidents)) - if record is not None - ] + approvals_by_incident = _group_rows_by_incident_reference( + rows=approvals, + incident_ids=incident_ids, + direct_fields=("incident_id",), + ) + evidence_by_incident = _group_rows_by_incident_reference( + rows=evidence_rows, + incident_ids=incident_ids, + direct_fields=("incident_id",), + ) + timeline_by_incident = _group_timeline_events_by_incident( + rows=timeline_events, + incident_ids=incident_ids, + approval_to_incident=approval_to_incident, + ) + legacy_mcp_by_incident = _group_rows_by_incident_reference( + rows=legacy_mcp_rows, + incident_ids=incident_ids, + direct_fields=("incident_id",), + ) + automation_ops_by_incident = _group_rows_by_incident_reference( + rows=automation_ops, + incident_ids=incident_ids, + direct_fields=("incident_id",), + text_fields=("input_text", "output_text", "tags_text"), + ) + auto_repair_by_incident = _group_rows_by_incident_reference( + rows=auto_repair_executions, + incident_ids=incident_ids, + direct_fields=("incident_id",), + ) + km_by_incident = _group_rows_by_incident_reference( + rows=km_entries, + incident_ids=incident_ids, + direct_fields=("related_incident_id",), + ) + gateway_mcp_by_incident = _group_rows_by_incident_reference( + rows=gateway_mcp_rows, + incident_ids=incident_ids, + direct_fields=("trace_id", "run_id"), + ) + outbound_by_incident = _group_rows_by_incident_reference( + rows=outbound_rows, + incident_ids=incident_ids, + direct_fields=("run_id",), + text_fields=("content_preview",), + source_ref_keys=("incident_ids", "code_refs"), + ) + records = _build_summary_quality_records( + incidents=incidents, + approvals_by_incident=approvals_by_incident, + evidence_by_incident=evidence_by_incident, + timeline_by_incident=timeline_by_incident, + legacy_mcp_by_incident=legacy_mcp_by_incident, + automation_ops_by_incident=automation_ops_by_incident, + auto_repair_by_incident=auto_repair_by_incident, + km_by_incident=km_by_incident, + gateway_mcp_by_incident=gateway_mcp_by_incident, + outbound_by_incident=outbound_by_incident, + ) summary = summarize_automation_quality_records( project_id=normalized_project_id, diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py index bdf656e2..d9af182e 100644 --- a/apps/api/tests/test_awooop_truth_chain_service.py +++ b/apps/api/tests/test_awooop_truth_chain_service.py @@ -83,6 +83,16 @@ def test_quality_summary_includes_recent_ansible_operation_incidents() -> None: assert "source_ids.recent_evidence_at DESC" in source +def test_quality_summary_uses_batched_truth_chain_inputs() -> None: + source = inspect.getsource(fetch_automation_quality_summary) + + assert "fetch_truth_chain(" not in source + assert "approval_records" in source + assert "incident_evidence" in source + assert "awooop_outbound_message" in source + assert "_build_summary_quality_records" in source + + def test_ansible_audit_keeps_external_incident_id_in_json_not_bigint_column() -> None: decision_source = inspect.getsource(record_ansible_decision_audit) claim_source = inspect.getsource(claim_pending_check_modes) diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index 727d0996..d4921c72 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -1,3 +1,30 @@ +## 2026-06-01|truth-chain quality summary 批次化查詢 + +**背景**: + +- 首頁已加上 8 秒降級保護,但正式環境 `/api/v1/platform/truth-chain/quality/summary` 快取未命中時仍可能需要數秒到數十秒。 +- 根因是 summary 端點會對每個 recent incident 再呼叫一次完整 `fetch_truth_chain()`,造成 N+1 truth-chain fanout;每筆 incident 又會查 approvals、evidence、timeline、MCP、execution、KM、outbound channel 等多張表。 + +**本次調整**: + +- `apps/api/src/services/awooop_truth_chain_service.py`: + - `fetch_automation_quality_summary()` 改為同一個 DB context 批次抓最近 incident 的 approvals、evidence、timeline、legacy MCP、automation ops、auto-repair executions、KM、AwoooP MCP gateway、outbound messages。 + - 新增 `_build_summary_quality_records()`,沿用既有 `_truth_status()`、`build_automation_quality()`、`build_ansible_truth()`,避免重寫判斷規則。 + - 移除 summary path 對 `fetch_truth_chain()` 的逐筆 fanout。 +- `apps/api/tests/test_awooop_truth_chain_service.py`: + - 新增防回歸測試,確認 quality summary 不再呼叫 `fetch_truth_chain()`,且保留 batch input 來源表。 + +**驗證**: + +- `python3 -m py_compile apps/api/src/services/awooop_truth_chain_service.py` +- `python3 scripts/security/security-mirror-progress-guard.py --root .` +- `DATABASE_URL=postgresql://test:test@localhost:5432/test PYTHONPATH=apps/api /Users/ogt/.pyenv/shims/pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_operator_summary_cache.py -q` → `53 passed` + +**進度邊界**: + +- 整體 AI 自動化飛輪進度仍維持 `61%`;這輪是 truth-chain summary 效能修復,不直接提高自動修復成功率。 +- 下一個正式環境驗證點:`refresh=true&limit=8/30` 的 quality summary 應明顯低於上一輪約 9-27 秒的快取未命中時間,且首頁仍能顯示 live quality data。 + ## 2026-06-01|IwoooS 首屏繁中文案收斂 **背景**: