diff --git a/apps/api/migrations/adr090c_notification_formatted_op_type.sql b/apps/api/migrations/adr090c_notification_formatted_op_type.sql new file mode 100644 index 00000000..fe2b3cdd --- /dev/null +++ b/apps/api/migrations/adr090c_notification_formatted_op_type.sql @@ -0,0 +1,42 @@ +-- ADR-090-C: automation_operation_log.operation_type 擴充 notification_formatted +-- 建立時間: 2026-04-18 下午 (台北時區) +-- 建立者: ogt + Claude Opus 4.7 (1M) +-- +-- 上游: +-- - ADR-090 主 schema (adr090_asset_inventory_foundation.sql) +-- - drift_narrator_service B 方案(LLM 摘要取代 str()[:30]) +-- +-- 目的: +-- drift_narrator 每次呼叫 LLM 生成摘要 + 寫 Telegram, +-- 這是一個 AI 動作,必須在 automation_operation_log 留痕。 +-- 現有 CHECK 沒有合適的 operation_type,新增 notification_formatted。 +-- +-- Idempotent: +-- 先 DROP CONSTRAINT IF EXISTS 再 ADD,重複執行安全。 +-- +-- 執行: PGPASSWORD="$MIGRATOR_PWD" psql -U awoooi_migrator -d awoooi_prod -f 本檔 +-- 回滾: 把 notification_formatted 從 IN 清單移除後重跑。 + +-- ============================================================================ + +ALTER TABLE automation_operation_log + DROP CONSTRAINT IF EXISTS automation_operation_log_type_valid; + +ALTER TABLE automation_operation_log + ADD CONSTRAINT automation_operation_log_type_valid CHECK (operation_type IN ( + 'monitor_configured','monitor_removed', + 'alert_fired','alert_suppressed','alert_routed', + 'rule_created','rule_updated','rule_matched','rule_rejected','rule_deprecated', + 'playbook_generated','playbook_updated','playbook_executed', + 'remediation_executed','remediation_verified','remediation_rolled_back', + 'self_correction_attempted', + 'km_created','km_updated','km_linked', + 'asset_discovered','coverage_recalculated', + 'capacity_recommendation','quota_enforced', + 'notification_formatted' -- ADR-090-C 新增 (drift_narrator / 未來其他通知格式化 AI 動作) + )); + +-- 驗收查詢 (apply 後可手動跑): +-- SELECT pg_get_constraintdef(oid) FROM pg_constraint +-- WHERE conname='automation_operation_log_type_valid'; +-- 應包含 'notification_formatted' diff --git a/apps/api/src/services/drift_narrator_service.py b/apps/api/src/services/drift_narrator_service.py index c0bcef03..18a8a162 100644 --- a/apps/api/src/services/drift_narrator_service.py +++ b/apps/api/src/services/drift_narrator_service.py @@ -171,8 +171,12 @@ class DriftNarratorService: items: [{level, field, summary}, ...] 最多 5 筆 LLM 失敗則 fallback 到 Python 智能截斷(不是 str()[:30] 暴力砍) + + 2026-04-18 ADR-090-C: 每次呼叫同步寫入 automation_operation_log + + ai_collaboration_trace(不論成功或 fallback),完整 L4 稽核。 """ import json as _json + import time drift_items_json = self._format_drift_for_llm(report) intent_summary = self._format_intent_summary(interpretation) @@ -182,24 +186,32 @@ class DriftNarratorService: intent_summary=intent_summary, ) + started_ms = time.time() + narrative: str = "" + items: list[dict] = [] + raw_response: str | None = None + provider: str = "unknown" + status: str = "failed" + llm_accepted: bool = False + try: openclaw = get_openclaw() text, _provider, success = await openclaw.call(prompt) + provider = _provider or "unknown" + raw_response = text if text else None if success and text and text.strip(): _raw = text.strip() - # 嘗試剝 code fence if _raw.startswith("```"): _raw = _raw.strip("`").lstrip("json").strip() try: _parsed = _json.loads(_raw) if isinstance(_parsed, dict): - narrative = str(_parsed.get("narrative", "")).strip() - items = _parsed.get("items", []) - if isinstance(items, list) and narrative: - # 驗證 item 結構 + _narrative = str(_parsed.get("narrative", "")).strip() + _items = _parsed.get("items", []) + if isinstance(_items, list) and _narrative: clean_items = [] - for it in items[:5]: + for it in _items[:5]: if isinstance(it, dict) and it.get("field") and it.get("summary"): clean_items.append({ "level": it.get("level", "medium"), @@ -207,17 +219,157 @@ class DriftNarratorService: "summary": str(it["summary"])[:80], }) if clean_items: - return narrative, clean_items + narrative = _narrative + items = clean_items + status = "success" + llm_accepted = True except (_json.JSONDecodeError, ValueError) as e: logger.warning("drift_narrator_json_parse_fail", err=str(e), raw_prefix=_raw[:80]) - logger.warning("drift_narrator_openclaw_failed", provider=_provider) + if not llm_accepted: + logger.warning("drift_narrator_openclaw_failed", provider=provider) except Exception as e: logger.warning("drift_narrator_llm_error", error=str(e)) - # Fallback:Python 智能截斷(不是 str()[:30]) - return self._fallback_narrative(report, interpretation), self._fallback_items(report) + # Fallback + if not llm_accepted: + narrative = self._fallback_narrative(report, interpretation) + items = self._fallback_items(report) + status = "failed" + + # ADR-090-C: 同步寫 DB 稽核(永不 propagate error,保護主流程) + duration_ms = int((time.time() - started_ms) * 1000) + try: + await self._log_ai_action_to_db( + report=report, + prompt=prompt, + raw_response=raw_response, + narrative=narrative, + items=items, + provider=provider, + status=status, + llm_accepted=llm_accepted, + duration_ms=duration_ms, + ) + except Exception as e: + logger.warning("drift_narrator_audit_write_failed", error=str(e)) + + return narrative, items + + async def _log_ai_action_to_db( + self, + report: "DriftReport", + prompt: str, + raw_response: str | None, + narrative: str, + items: list[dict], + provider: str, + status: str, + llm_accepted: bool, + duration_ms: int, + ) -> None: + """ + ADR-090-C: 把 drift narrator 的 AI 動作寫入 automation_operation_log + + ai_collaboration_trace(L4 稽核 + 未來 RLHF 語料) + + - op_type='notification_formatted' + - actor='drift_narrator' + - 若能找到該 drift 的 incident 關聯,設 parent_op_id + """ + import json as _json + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + input_json = _json.dumps({ + "report_id": report.report_id, + "namespace": report.namespace, + "high_count": report.high_count, + "medium_count": report.medium_count, + "items_scanned": len(report.items), + }) + output_json = _json.dumps({ + "narrative": narrative, + "items": items, + "items_count": len(items), + }, ensure_ascii=False) + trace_response = _json.dumps({ + "narrative": narrative, + "items": items, + "raw_prefix": (raw_response or "")[:500], + }, ensure_ascii=False) + + async with get_db_context() as db: + # P2.4: 嘗試找 parent_op_id(若未來有 drift→alert_fired 鏈路) + parent_row = await db.execute( + _sql(""" + SELECT op_id FROM automation_operation_log + WHERE operation_type='alert_fired' + AND (input::jsonb->>'report_id' = :rid + OR input::jsonb->>'drift_report_id' = :rid) + ORDER BY created_at DESC LIMIT 1 + """), + {"rid": report.report_id}, + ) + parent_op_id = parent_row.scalar() if parent_row else None + + # 寫 automation_operation_log + op_row = await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, + duration_ms, parent_op_id, tags + ) VALUES ( + 'notification_formatted', + 'drift_narrator', + :status, + CAST(:input AS jsonb), + CAST(:output AS jsonb), + :duration_ms, :parent_op_id, + CAST(:tags AS text[]) + ) + RETURNING op_id + """), + { + "status": status, + "input": input_json, + "output": output_json, + "duration_ms": duration_ms, + "parent_op_id": parent_op_id, + "tags": "{drift,type4d,llm_summary}", + }, + ) + op_id = op_row.scalar() + + # 寫 ai_collaboration_trace + await db.execute( + _sql(""" + INSERT INTO ai_collaboration_trace ( + op_id, step_order, agent, model, + prompt, response, duration_ms, accepted + ) VALUES ( + :op_id, 1, 'drift_narrator', :model, + :prompt, CAST(:response AS jsonb), :duration_ms, :accepted + ) + """), + { + "op_id": op_id, + "model": provider, + "prompt": prompt[:8000], + "response": trace_response, + "duration_ms": duration_ms, + "accepted": llm_accepted, + }, + ) + # get_db_context() 在 exit 時 auto-commit(src/db/base.py:128) + logger.info( + "drift_narrator_audit_written", + op_id=str(op_id), + parent_op_id=str(parent_op_id) if parent_op_id else None, + status=status, + items_count=len(items), + ) def _format_drift_for_llm(self, report: "DriftReport") -> str: """