From a156566b1758b6bfbf7b484e0ea8b809013bf343 Mon Sep 17 00:00:00 2001 From: OG T Date: Sat, 18 Apr 2026 16:04:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(drift-narrator):=20ADR-090-C=20L4=20?= =?UTF-8?q?=E7=A8=BD=E6=A0=B8=E9=96=89=E7=92=B0=20=E2=80=94=20notification?= =?UTF-8?q?=5Fformatted=20op=20=E5=85=A5=E5=BA=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2026-04-18 下午(台北時區)—— ogt + Claude Opus 4.7 (1M) 架構鐵律執行: 「沒有被記錄的 AI 決策,就等於沒有發生過。」 drift_narrator 每次呼叫 LLM 生成摘要,必須完整寫入 automation_operation_log + ai_collaboration_trace,形成 L4 稽核 + RLHF 語料。 本 commit 兩件事: 1. apps/api/migrations/adr090c_notification_formatted_op_type.sql - 擴充 automation_operation_log.operation_type CHECK 加 'notification_formatted' - DROP + ADD CONSTRAINT idempotent 模式 - 已用 awoooi(表 owner)apply 進 prod 驗證通過 2. apps/api/src/services/drift_narrator_service.py - 新增 _log_ai_action_to_db() 負責 DB 稽核寫入 - 在 _generate_narrative_and_items() 結尾(success / fallback 都寫)呼叫 - automation_operation_log: * operation_type='notification_formatted' * actor='drift_narrator' * input = {report_id, namespace, counts, items_scanned} * output = {narrative, items, items_count} * duration_ms, tags=['drift','type4d','llm_summary'] * parent_op_id 查詢 alert_fired 鏈路(未來 drift → alert 關聯) - ai_collaboration_trace: * agent='drift_narrator', model=provider (ollama / nemotron / 等) * prompt(限 8000 字)+ response(JSONB) * accepted = LLM JSON 解析成功 flag(未來 RLHF 訓料金礦) - 錯誤處理: DB 寫入 try/except 包住,永不破壞 Telegram 通知主流程 P2.4 事件關聯: - SELECT parent op via input->>'report_id' 或 'drift_report_id' - 若找到則綁定 parent_op_id(形成 alert_fired → notification_formatted 追溯鏈) - 目前 drift 本身不經 alert_fired,parent 為 NULL(等未來鏈路接通) P2.5 RLHF 語料: - ai_collaboration_trace.accepted=true 的紀錄即為「LLM 解析成功」樣本 - 未來統帥按 Telegram [✅ 採納變更] / [⏪ 回滾] 時,對應 trace 也可更新 outcome flag,形成完整 Human-in-the-loop 語料 技術細節: - get_db_context() auto-commit(src/db/base.py:128),無需手動 commit - prompt 最長 8000 字(一般 drift 約 2-3k) - raw_response 保留前 500 字在 trace.response JSON 中 相關: - feedback_ai_autonomous_direction.md L4 北極星 - feedback_secrets_leak_incidents_2026-04-18.md L1-L4 分層 - ADR-090 11 張神經網路表 - commit fb88512(B 方案視覺層) IDE 可能顯示 src.db.base 找不到 —— 那是誤報(drift_repository.py 用同一條路徑)。 Co-Authored-By: Claude Opus 4.7 (1M context) --- ...adr090c_notification_formatted_op_type.sql | 42 +++++ .../src/services/drift_narrator_service.py | 172 +++++++++++++++++- 2 files changed, 204 insertions(+), 10 deletions(-) create mode 100644 apps/api/migrations/adr090c_notification_formatted_op_type.sql diff --git a/apps/api/migrations/adr090c_notification_formatted_op_type.sql b/apps/api/migrations/adr090c_notification_formatted_op_type.sql new file mode 100644 index 00000000..fe2b3cdd --- /dev/null +++ b/apps/api/migrations/adr090c_notification_formatted_op_type.sql @@ -0,0 +1,42 @@ +-- ADR-090-C: automation_operation_log.operation_type 擴充 notification_formatted +-- 建立時間: 2026-04-18 下午 (台北時區) +-- 建立者: ogt + Claude Opus 4.7 (1M) +-- +-- 上游: +-- - ADR-090 主 schema (adr090_asset_inventory_foundation.sql) +-- - drift_narrator_service B 方案(LLM 摘要取代 str()[:30]) +-- +-- 目的: +-- drift_narrator 每次呼叫 LLM 生成摘要 + 寫 Telegram, +-- 這是一個 AI 動作,必須在 automation_operation_log 留痕。 +-- 現有 CHECK 沒有合適的 operation_type,新增 notification_formatted。 +-- +-- Idempotent: +-- 先 DROP CONSTRAINT IF EXISTS 再 ADD,重複執行安全。 +-- +-- 執行: PGPASSWORD="$MIGRATOR_PWD" psql -U awoooi_migrator -d awoooi_prod -f 本檔 +-- 回滾: 把 notification_formatted 從 IN 清單移除後重跑。 + +-- ============================================================================ + +ALTER TABLE automation_operation_log + DROP CONSTRAINT IF EXISTS automation_operation_log_type_valid; + +ALTER TABLE automation_operation_log + ADD CONSTRAINT automation_operation_log_type_valid CHECK (operation_type IN ( + 'monitor_configured','monitor_removed', + 'alert_fired','alert_suppressed','alert_routed', + 'rule_created','rule_updated','rule_matched','rule_rejected','rule_deprecated', + 'playbook_generated','playbook_updated','playbook_executed', + 'remediation_executed','remediation_verified','remediation_rolled_back', + 'self_correction_attempted', + 'km_created','km_updated','km_linked', + 'asset_discovered','coverage_recalculated', + 'capacity_recommendation','quota_enforced', + 'notification_formatted' -- ADR-090-C 新增 (drift_narrator / 未來其他通知格式化 AI 動作) + )); + +-- 驗收查詢 (apply 後可手動跑): +-- SELECT pg_get_constraintdef(oid) FROM pg_constraint +-- WHERE conname='automation_operation_log_type_valid'; +-- 應包含 'notification_formatted' diff --git a/apps/api/src/services/drift_narrator_service.py b/apps/api/src/services/drift_narrator_service.py index c0bcef03..18a8a162 100644 --- a/apps/api/src/services/drift_narrator_service.py +++ b/apps/api/src/services/drift_narrator_service.py @@ -171,8 +171,12 @@ class DriftNarratorService: items: [{level, field, summary}, ...] 最多 5 筆 LLM 失敗則 fallback 到 Python 智能截斷(不是 str()[:30] 暴力砍) + + 2026-04-18 ADR-090-C: 每次呼叫同步寫入 automation_operation_log + + ai_collaboration_trace(不論成功或 fallback),完整 L4 稽核。 """ import json as _json + import time drift_items_json = self._format_drift_for_llm(report) intent_summary = self._format_intent_summary(interpretation) @@ -182,24 +186,32 @@ class DriftNarratorService: intent_summary=intent_summary, ) + started_ms = time.time() + narrative: str = "" + items: list[dict] = [] + raw_response: str | None = None + provider: str = "unknown" + status: str = "failed" + llm_accepted: bool = False + try: openclaw = get_openclaw() text, _provider, success = await openclaw.call(prompt) + provider = _provider or "unknown" + raw_response = text if text else None if success and text and text.strip(): _raw = text.strip() - # 嘗試剝 code fence if _raw.startswith("```"): _raw = _raw.strip("`").lstrip("json").strip() try: _parsed = _json.loads(_raw) if isinstance(_parsed, dict): - narrative = str(_parsed.get("narrative", "")).strip() - items = _parsed.get("items", []) - if isinstance(items, list) and narrative: - # 驗證 item 結構 + _narrative = str(_parsed.get("narrative", "")).strip() + _items = _parsed.get("items", []) + if isinstance(_items, list) and _narrative: clean_items = [] - for it in items[:5]: + for it in _items[:5]: if isinstance(it, dict) and it.get("field") and it.get("summary"): clean_items.append({ "level": it.get("level", "medium"), @@ -207,17 +219,157 @@ class DriftNarratorService: "summary": str(it["summary"])[:80], }) if clean_items: - return narrative, clean_items + narrative = _narrative + items = clean_items + status = "success" + llm_accepted = True except (_json.JSONDecodeError, ValueError) as e: logger.warning("drift_narrator_json_parse_fail", err=str(e), raw_prefix=_raw[:80]) - logger.warning("drift_narrator_openclaw_failed", provider=_provider) + if not llm_accepted: + logger.warning("drift_narrator_openclaw_failed", provider=provider) except Exception as e: logger.warning("drift_narrator_llm_error", error=str(e)) - # Fallback:Python 智能截斷(不是 str()[:30]) - return self._fallback_narrative(report, interpretation), self._fallback_items(report) + # Fallback + if not llm_accepted: + narrative = self._fallback_narrative(report, interpretation) + items = self._fallback_items(report) + status = "failed" + + # ADR-090-C: 同步寫 DB 稽核(永不 propagate error,保護主流程) + duration_ms = int((time.time() - started_ms) * 1000) + try: + await self._log_ai_action_to_db( + report=report, + prompt=prompt, + raw_response=raw_response, + narrative=narrative, + items=items, + provider=provider, + status=status, + llm_accepted=llm_accepted, + duration_ms=duration_ms, + ) + except Exception as e: + logger.warning("drift_narrator_audit_write_failed", error=str(e)) + + return narrative, items + + async def _log_ai_action_to_db( + self, + report: "DriftReport", + prompt: str, + raw_response: str | None, + narrative: str, + items: list[dict], + provider: str, + status: str, + llm_accepted: bool, + duration_ms: int, + ) -> None: + """ + ADR-090-C: 把 drift narrator 的 AI 動作寫入 automation_operation_log + + ai_collaboration_trace(L4 稽核 + 未來 RLHF 語料) + + - op_type='notification_formatted' + - actor='drift_narrator' + - 若能找到該 drift 的 incident 關聯,設 parent_op_id + """ + import json as _json + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + input_json = _json.dumps({ + "report_id": report.report_id, + "namespace": report.namespace, + "high_count": report.high_count, + "medium_count": report.medium_count, + "items_scanned": len(report.items), + }) + output_json = _json.dumps({ + "narrative": narrative, + "items": items, + "items_count": len(items), + }, ensure_ascii=False) + trace_response = _json.dumps({ + "narrative": narrative, + "items": items, + "raw_prefix": (raw_response or "")[:500], + }, ensure_ascii=False) + + async with get_db_context() as db: + # P2.4: 嘗試找 parent_op_id(若未來有 drift→alert_fired 鏈路) + parent_row = await db.execute( + _sql(""" + SELECT op_id FROM automation_operation_log + WHERE operation_type='alert_fired' + AND (input::jsonb->>'report_id' = :rid + OR input::jsonb->>'drift_report_id' = :rid) + ORDER BY created_at DESC LIMIT 1 + """), + {"rid": report.report_id}, + ) + parent_op_id = parent_row.scalar() if parent_row else None + + # 寫 automation_operation_log + op_row = await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, + duration_ms, parent_op_id, tags + ) VALUES ( + 'notification_formatted', + 'drift_narrator', + :status, + CAST(:input AS jsonb), + CAST(:output AS jsonb), + :duration_ms, :parent_op_id, + CAST(:tags AS text[]) + ) + RETURNING op_id + """), + { + "status": status, + "input": input_json, + "output": output_json, + "duration_ms": duration_ms, + "parent_op_id": parent_op_id, + "tags": "{drift,type4d,llm_summary}", + }, + ) + op_id = op_row.scalar() + + # 寫 ai_collaboration_trace + await db.execute( + _sql(""" + INSERT INTO ai_collaboration_trace ( + op_id, step_order, agent, model, + prompt, response, duration_ms, accepted + ) VALUES ( + :op_id, 1, 'drift_narrator', :model, + :prompt, CAST(:response AS jsonb), :duration_ms, :accepted + ) + """), + { + "op_id": op_id, + "model": provider, + "prompt": prompt[:8000], + "response": trace_response, + "duration_ms": duration_ms, + "accepted": llm_accepted, + }, + ) + # get_db_context() 在 exit 時 auto-commit(src/db/base.py:128) + logger.info( + "drift_narrator_audit_written", + op_id=str(op_id), + parent_op_id=str(parent_op_id) if parent_op_id else None, + status=status, + items_count=len(items), + ) def _format_drift_for_llm(self, report: "DriftReport") -> str: """