feat(drift-narrator): ADR-090-C L4 稽核閉環 — notification_formatted op 入庫
Some checks failed
CD Pipeline / build-and-deploy (push) Successful in 10m47s
run-migration / migrate (push) Failing after 14s

2026-04-18 下午(台北時區)—— ogt + Claude Opus 4.7 (1M)

架構鐵律執行:
「沒有被記錄的 AI 決策,就等於沒有發生過。」
drift_narrator 每次呼叫 LLM 生成摘要,必須完整寫入
automation_operation_log + ai_collaboration_trace,形成 L4 稽核 + RLHF 語料。

本 commit 兩件事:

1. apps/api/migrations/adr090c_notification_formatted_op_type.sql
   - 擴充 automation_operation_log.operation_type CHECK 加 'notification_formatted'
   - DROP + ADD CONSTRAINT idempotent 模式
   - 已用 awoooi(表 owner)apply 進 prod 驗證通過

2. apps/api/src/services/drift_narrator_service.py
   - 新增 _log_ai_action_to_db() 負責 DB 稽核寫入
   - 在 _generate_narrative_and_items() 結尾(success / fallback 都寫)呼叫
   - automation_operation_log:
     * operation_type='notification_formatted'
     * actor='drift_narrator'
     * input = {report_id, namespace, counts, items_scanned}
     * output = {narrative, items, items_count}
     * duration_ms, tags=['drift','type4d','llm_summary']
     * parent_op_id 查詢 alert_fired 鏈路(未來 drift → alert 關聯)
   - ai_collaboration_trace:
     * agent='drift_narrator', model=provider (ollama / nemotron / 等)
     * prompt(限 8000 字)+ response(JSONB)
     * accepted = LLM JSON 解析成功 flag(未來 RLHF 訓料金礦)
   - 錯誤處理: DB 寫入 try/except 包住,永不破壞 Telegram 通知主流程

P2.4 事件關聯:
  - SELECT parent op via input->>'report_id' 或 'drift_report_id'
  - 若找到則綁定 parent_op_id(形成 alert_fired → notification_formatted 追溯鏈)
  - 目前 drift 本身不經 alert_fired,parent 為 NULL(等未來鏈路接通)

P2.5 RLHF 語料:
  - ai_collaboration_trace.accepted=true 的紀錄即為「LLM 解析成功」樣本
  - 未來統帥按 Telegram [ 採納變更] / [ 回滾] 時,對應 trace 也可更新
    outcome flag,形成完整 Human-in-the-loop 語料

技術細節:
  - get_db_context() auto-commit(src/db/base.py:128),無需手動 commit
  - prompt 最長 8000 字(一般 drift 約 2-3k)
  - raw_response 保留前 500 字在 trace.response JSON 中

相關:
  - feedback_ai_autonomous_direction.md L4 北極星
  - feedback_secrets_leak_incidents_2026-04-18.md L1-L4 分層
  - ADR-090 11 張神經網路表
  - commit fb88512(B 方案視覺層)

IDE 可能顯示 src.db.base 找不到 —— 那是誤報(drift_repository.py 用同一條路徑)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-18 16:04:22 +08:00
parent 4f70da027e
commit a156566b17
2 changed files with 204 additions and 10 deletions

View File

@@ -0,0 +1,42 @@
-- ADR-090-C: automation_operation_log.operation_type 擴充 notification_formatted
-- 建立時間: 2026-04-18 下午 (台北時區)
-- 建立者: ogt + Claude Opus 4.7 (1M)
--
-- 上游:
-- - ADR-090 主 schema (adr090_asset_inventory_foundation.sql)
-- - drift_narrator_service B 方案LLM 摘要取代 str()[:30]
--
-- 目的:
-- drift_narrator 每次呼叫 LLM 生成摘要 + 寫 Telegram,
-- 這是一個 AI 動作,必須在 automation_operation_log 留痕。
-- 現有 CHECK 沒有合適的 operation_type,新增 notification_formatted。
--
-- Idempotent:
-- 先 DROP CONSTRAINT IF EXISTS 再 ADD,重複執行安全。
--
-- 執行: PGPASSWORD="$MIGRATOR_PWD" psql -U awoooi_migrator -d awoooi_prod -f 本檔
-- 回滾: 把 notification_formatted 從 IN 清單移除後重跑。
-- ============================================================================
ALTER TABLE automation_operation_log
DROP CONSTRAINT IF EXISTS automation_operation_log_type_valid;
ALTER TABLE automation_operation_log
ADD CONSTRAINT automation_operation_log_type_valid CHECK (operation_type IN (
'monitor_configured','monitor_removed',
'alert_fired','alert_suppressed','alert_routed',
'rule_created','rule_updated','rule_matched','rule_rejected','rule_deprecated',
'playbook_generated','playbook_updated','playbook_executed',
'remediation_executed','remediation_verified','remediation_rolled_back',
'self_correction_attempted',
'km_created','km_updated','km_linked',
'asset_discovered','coverage_recalculated',
'capacity_recommendation','quota_enforced',
'notification_formatted' -- ADR-090-C 新增 (drift_narrator / 未來其他通知格式化 AI 動作)
));
-- 驗收查詢 (apply 後可手動跑):
-- SELECT pg_get_constraintdef(oid) FROM pg_constraint
-- WHERE conname='automation_operation_log_type_valid';
-- 應包含 'notification_formatted'

View File

@@ -171,8 +171,12 @@ class DriftNarratorService:
items: [{level, field, summary}, ...] 最多 5 筆
LLM 失敗則 fallback 到 Python 智能截斷(不是 str()[:30] 暴力砍)
2026-04-18 ADR-090-C: 每次呼叫同步寫入 automation_operation_log +
ai_collaboration_trace(不論成功或 fallback),完整 L4 稽核。
"""
import json as _json
import time
drift_items_json = self._format_drift_for_llm(report)
intent_summary = self._format_intent_summary(interpretation)
@@ -182,24 +186,32 @@ class DriftNarratorService:
intent_summary=intent_summary,
)
started_ms = time.time()
narrative: str = ""
items: list[dict] = []
raw_response: str | None = None
provider: str = "unknown"
status: str = "failed"
llm_accepted: bool = False
try:
openclaw = get_openclaw()
text, _provider, success = await openclaw.call(prompt)
provider = _provider or "unknown"
raw_response = text if text else None
if success and text and text.strip():
_raw = text.strip()
# 嘗試剝 code fence
if _raw.startswith("```"):
_raw = _raw.strip("`").lstrip("json").strip()
try:
_parsed = _json.loads(_raw)
if isinstance(_parsed, dict):
narrative = str(_parsed.get("narrative", "")).strip()
items = _parsed.get("items", [])
if isinstance(items, list) and narrative:
# 驗證 item 結構
_narrative = str(_parsed.get("narrative", "")).strip()
_items = _parsed.get("items", [])
if isinstance(_items, list) and _narrative:
clean_items = []
for it in items[:5]:
for it in _items[:5]:
if isinstance(it, dict) and it.get("field") and it.get("summary"):
clean_items.append({
"level": it.get("level", "medium"),
@@ -207,17 +219,157 @@ class DriftNarratorService:
"summary": str(it["summary"])[:80],
})
if clean_items:
return narrative, clean_items
narrative = _narrative
items = clean_items
status = "success"
llm_accepted = True
except (_json.JSONDecodeError, ValueError) as e:
logger.warning("drift_narrator_json_parse_fail", err=str(e), raw_prefix=_raw[:80])
logger.warning("drift_narrator_openclaw_failed", provider=_provider)
if not llm_accepted:
logger.warning("drift_narrator_openclaw_failed", provider=provider)
except Exception as e:
logger.warning("drift_narrator_llm_error", error=str(e))
# FallbackPython 智能截斷(不是 str()[:30]
return self._fallback_narrative(report, interpretation), self._fallback_items(report)
# Fallback
if not llm_accepted:
narrative = self._fallback_narrative(report, interpretation)
items = self._fallback_items(report)
status = "failed"
# ADR-090-C: 同步寫 DB 稽核(永不 propagate error,保護主流程)
duration_ms = int((time.time() - started_ms) * 1000)
try:
await self._log_ai_action_to_db(
report=report,
prompt=prompt,
raw_response=raw_response,
narrative=narrative,
items=items,
provider=provider,
status=status,
llm_accepted=llm_accepted,
duration_ms=duration_ms,
)
except Exception as e:
logger.warning("drift_narrator_audit_write_failed", error=str(e))
return narrative, items
async def _log_ai_action_to_db(
self,
report: "DriftReport",
prompt: str,
raw_response: str | None,
narrative: str,
items: list[dict],
provider: str,
status: str,
llm_accepted: bool,
duration_ms: int,
) -> None:
"""
ADR-090-C: 把 drift narrator 的 AI 動作寫入 automation_operation_log +
ai_collaboration_trace(L4 稽核 + 未來 RLHF 語料)
- op_type='notification_formatted'
- actor='drift_narrator'
- 若能找到該 drift 的 incident 關聯,設 parent_op_id
"""
import json as _json
from sqlalchemy import text as _sql
from src.db.base import get_db_context
input_json = _json.dumps({
"report_id": report.report_id,
"namespace": report.namespace,
"high_count": report.high_count,
"medium_count": report.medium_count,
"items_scanned": len(report.items),
})
output_json = _json.dumps({
"narrative": narrative,
"items": items,
"items_count": len(items),
}, ensure_ascii=False)
trace_response = _json.dumps({
"narrative": narrative,
"items": items,
"raw_prefix": (raw_response or "")[:500],
}, ensure_ascii=False)
async with get_db_context() as db:
# P2.4: 嘗試找 parent_op_id若未來有 drift→alert_fired 鏈路)
parent_row = await db.execute(
_sql("""
SELECT op_id FROM automation_operation_log
WHERE operation_type='alert_fired'
AND (input::jsonb->>'report_id' = :rid
OR input::jsonb->>'drift_report_id' = :rid)
ORDER BY created_at DESC LIMIT 1
"""),
{"rid": report.report_id},
)
parent_op_id = parent_row.scalar() if parent_row else None
# 寫 automation_operation_log
op_row = await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output,
duration_ms, parent_op_id, tags
) VALUES (
'notification_formatted',
'drift_narrator',
:status,
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:duration_ms, :parent_op_id,
CAST(:tags AS text[])
)
RETURNING op_id
"""),
{
"status": status,
"input": input_json,
"output": output_json,
"duration_ms": duration_ms,
"parent_op_id": parent_op_id,
"tags": "{drift,type4d,llm_summary}",
},
)
op_id = op_row.scalar()
# 寫 ai_collaboration_trace
await db.execute(
_sql("""
INSERT INTO ai_collaboration_trace (
op_id, step_order, agent, model,
prompt, response, duration_ms, accepted
) VALUES (
:op_id, 1, 'drift_narrator', :model,
:prompt, CAST(:response AS jsonb), :duration_ms, :accepted
)
"""),
{
"op_id": op_id,
"model": provider,
"prompt": prompt[:8000],
"response": trace_response,
"duration_ms": duration_ms,
"accepted": llm_accepted,
},
)
# get_db_context() 在 exit 時 auto-commitsrc/db/base.py:128
logger.info(
"drift_narrator_audit_written",
op_id=str(op_id),
parent_op_id=str(parent_op_id) if parent_op_id else None,
status=status,
items_count=len(items),
)
def _format_drift_for_llm(self, report: "DriftReport") -> str:
"""