Files
awoooi/apps/api/src/services/drift_narrator_service.py
Your Name 35fe37c82a
All checks were successful
Code Review / ai-code-review (push) Successful in 23s
CD Pipeline / tests (push) Successful in 5m51s
CD Pipeline / build-and-deploy (push) Successful in 3m29s
CD Pipeline / post-deploy-checks (push) Successful in 1m14s
fix(api): route direct ollama callers through ordered fallback
2026-05-19 12:56:13 +08:00

801 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Drift Narrator Service - Phase 30
===================================
職責:將 DriftReport 轉為繁體中文人話,推送 Telegram
設計邊界:
- 只負責「敘述」,不做分析、不生成修復指令
- 觸發條件high_count > 0 or medium_count > 2
- 模型qwen2.5:7b-instruct (Ollama 111, 90s timeout)
- Redis 快取drift_narrative:{report_id} TTL 1h避免重複推送
- HPA replicas 自動調整在白名單,不觸發摘要
版本: v1.0
建立: 2026-04-10 (台北時區)
建立者: Claude Code (Phase 30 ADR-067)
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import structlog
from src.core.redis_client import get_redis
from src.services.model_registry import get_model
from src.services.openclaw import get_openclaw
if TYPE_CHECKING:
from src.models.drift import DriftInterpretation, DriftReport
logger = structlog.get_logger(__name__)
# ============================================================
# 設定
# ============================================================
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.drift_summary 讀取
NARRATOR_MODEL = get_model("ollama", "drift_summary")
NARRATOR_TIMEOUT = 90.0 # seconds
CACHE_TTL = 3600 # 1 小時
CACHE_PREFIX = "drift_narrative:"
# HPA 自動調整白名單 field_path不納入敘述
_HPA_ALLOWLIST_PATHS = {
"spec.replicas",
}
# 觸發條件
TRIGGER_HIGH_MIN = 1
TRIGGER_MEDIUM_MIN = 3
# ============================================================
# Prompt
# ============================================================
# 2026-04-18 ogt + Claude Opus 4.7: B 方案 — LLM 驅動智能摘要(取代 Python str()[:30] 截斷)
# 架構鐵律: 捨棄 Python 寫死字串解析,結構化 diff 直接餵 LLM,由 LLM 產出繁中 Top 5 摘要
# 2026-04-20 P0.2 ogt + Claude Opus 4.7: 加 recommendation 輸出LLM 推薦該按哪顆按鈕
# - action ∈ {adopt, revert, ignore, investigate}
# - confidence 0.0-1.0(統帥指令:先不 auto-execute門檻 0.85 保留給未來)
# - reason 一行繁中解釋
_NARRATIVE_PROMPT = """你是 AWOOOI SRE 維運助理。以下是 K8s Config Drift 報告的原始結構化資料。
## 漂移項目原始資料JSON
{drift_items_json}
## 意圖分析
{intent_summary}
## 輸出規格(必須是合法 JSON不得有任何前後文字
{{
"narrative": "4-5 行繁體中文敘述,說明漂移了哪些資源/嚴重程度/可能原因/建議動作",
"items": [
{{
"level": "high 或 medium",
"field": "簡化後的欄位路徑 (40 字內)",
"summary": "30 字內繁體中文口語摘要,說明從什麼變成什麼"
}}
],
"recommendation": {{
"action": "adopt 或 revert 或 ignore 或 investigate",
"confidence": 0.85,
"reason": "一行繁體中文解釋為何推薦此動作(含關鍵證據)"
}}
}}
## recommendation action 語意
- adopt: 現狀合理,應把 K8s 狀態寫回 Git (例HPA 自動擴縮、緊急 hotfix 已驗證)
- revert: 漂移有風險,應回滾到 Git 狀態 (例image tag 被誤改、secret 被外部改)
- ignore: 噪音K8s controller 自動補齊 (例:空 list/dict 差異)
- investigate: 不確定,需要人工查清楚
## 規則
- 繁體中文
- items 最多挑 5 筆最重要的HIGH 優先)
- summary 要讓非技術人員看懂「改了什麼」,例如:
- "新增 repair-ssh-key secret 掛載"(而非 repr 一長串)
- "(未設) → awoooi-executor"
- "新增 pod anti-affinity 規則"
- 禁止 markdown、反引號、emoji
- 只輸出純 JSON,不要包在 code block 裡
- recommendation.confidence 要誠實HIGH drift 且意圖不明 → 0.3-0.5trivial noise → 0.9
"""
class DriftNarratorService:
"""
Drift 報告人話摘要服務
職責邊界:
✅ 呼叫 qwen2.5:7b-instruct 生成繁中摘要
✅ Redis 快取(避免重複推送)
✅ 推送 Telegram
❌ 不做漂移分析
❌ 不生成修復指令
"""
async def narrate_and_notify(
self,
report: DriftReport,
interpretation: DriftInterpretation | None = None,
) -> None:
"""
生成人話摘要並推送 Telegram
只在 high_count > 0 or medium_count >= TRIGGER_MEDIUM_MIN 時執行
"""
if not self._should_narrate(report):
logger.debug(
"drift_narrator_skip",
report_id=report.report_id,
high=report.high_count,
medium=report.medium_count,
)
return
# Redis 快取檢查(同 report_id 不重複推送)
cache_key = f"{CACHE_PREFIX}{report.report_id}"
redis = await get_redis()
if await redis.exists(cache_key):
logger.debug("drift_narrator_cache_hit", report_id=report.report_id)
return
# 2026-04-18 B 方案: LLM 同時產 narrative + 結構化 items取代 str()[:30]
# 2026-04-20 P0.2: 追加 recommendationaction/confidence/reason
narrative, items, recommendation = await self._generate_narrative_and_items(report, interpretation)
repeat_state = None
try:
from src.repositories.drift_repository import get_drift_repository
repeat_state = await get_drift_repository().get_repeat_state(report)
except Exception as e:
logger.warning("drift_repeat_state_lookup_failed", report_id=report.report_id, error=str(e))
await self._send_telegram(report, narrative, items, recommendation, repeat_state)
# 寫入 DB narrative_text (Phase 30 ADR-067)
try:
from src.repositories.drift_repository import get_drift_repository
await get_drift_repository().update_narrative(report.report_id, narrative)
except Exception as e:
logger.warning("drift_narrator_db_write_failed", error=str(e))
# 寫入快取
await redis.set(cache_key, narrative[:500], ex=CACHE_TTL)
logger.info(
"drift_narrator_sent",
report_id=report.report_id,
high=report.high_count,
medium=report.medium_count,
)
def _should_narrate(self, report: DriftReport) -> bool:
"""觸發條件high >= 1 or medium >= 3"""
# 過濾 HPA 白名單後重算
non_hpa_items = [
item for item in report.items
if item.field_path not in _HPA_ALLOWLIST_PATHS
and not item.is_allowlisted
]
high = sum(1 for i in non_hpa_items if i.drift_level.value == "high")
medium = sum(1 for i in non_hpa_items if i.drift_level.value == "medium")
return high >= TRIGGER_HIGH_MIN or medium >= TRIGGER_MEDIUM_MIN
async def _generate_narrative_and_items(
self,
report: DriftReport,
interpretation: DriftInterpretation | None,
) -> tuple[str, list[dict], dict]:
"""
2026-04-18 ogt + Claude Opus 4.7: B 方案 — LLM 產生 narrative + 結構化 items
2026-04-20 P0.2 ogt + Claude Opus 4.7: 追加 recommendationAI 推薦按鈕)
回傳 (narrative, items, recommendation):
narrative: 繁中 4-5 行敘述
items: [{level, field, summary}, ...] 最多 5 筆
recommendation: {action, confidence, reason}
action ∈ {adopt, revert, ignore, investigate}
confidence 0.0-1.0(統帥指令:先不 auto-execute僅顯示供統帥參考
LLM 失敗則 fallback 到 Python 智能截斷(不是 str()[:30] 暴力砍)
2026-04-18 ADR-090-C: 每次呼叫同步寫入 automation_operation_log +
ai_collaboration_trace(不論成功或 fallback),完整 L4 稽核。
"""
import json as _json
import time
drift_items_json = self._format_drift_for_llm(report)
intent_summary = self._format_intent_summary(interpretation)
prompt = _NARRATIVE_PROMPT.format(
drift_items_json=drift_items_json,
intent_summary=intent_summary,
)
started_ms = time.time()
narrative: str = ""
items: list[dict] = []
recommendation: dict = {} # 2026-04-20 P0.2
raw_response: str | None = None
provider: str = "unknown"
status: str = "failed"
llm_accepted: bool = False
try:
openclaw = get_openclaw()
text, _provider, success = await openclaw.call(prompt)
provider = _provider or "unknown"
raw_response = text if text else None
if success and text and text.strip():
_raw = text.strip()
if _raw.startswith("```"):
_raw = _raw.strip("`").lstrip("json").strip()
# 解析策略: 3 路 fallback
# Path 1: 直接我們的 {narrative, items} 結構 (純 Ollama 或 LLM 守規矩)
# Path 2: NEMOTRON wrapper {description,...} 且 description 內含我們的結構
# Path 3: NEMOTRON wrapper,description 是純敘述 → 用它當 narrative + Python fallback items
_parsed_narrative = None
_parsed_items = None
try:
_parsed = _json.loads(_raw)
if isinstance(_parsed, dict):
# Path 1
if "narrative" in _parsed and isinstance(_parsed.get("items"), list):
_parsed_narrative = str(_parsed["narrative"]).strip()
_parsed_items = _parsed["items"]
else:
# Path 2 / Path 3: NEMOTRON wrapper
_desc = (
_parsed.get("description")
or _parsed.get("action_title")
or _parsed.get("reasoning")
or ""
)
_desc = str(_desc).strip()
# Path 2: description 本身是巢狀 JSON 含我們結構?
if _desc.startswith("{") and "narrative" in _desc:
try:
_inner = _json.loads(_desc)
if isinstance(_inner, dict) and "narrative" in _inner:
_parsed_narrative = str(_inner.get("narrative", "")).strip()
_parsed_items = _inner.get("items", []) if isinstance(_inner.get("items"), list) else None
except (_json.JSONDecodeError, ValueError):
pass
# Path 3: 只有 narrative(來自 description),items 用 Python fallback
if _parsed_narrative is None and _desc:
_parsed_narrative = _desc
_parsed_items = None # 觸發下方 fallback_items
except (_json.JSONDecodeError, ValueError) as e:
logger.warning("drift_narrator_json_parse_fail", err=str(e),
raw_prefix=_raw[:100], provider=provider)
# 驗證 + 清洗
if _parsed_narrative:
# 清洗 items (若 LLM 有給)
clean_items = []
if isinstance(_parsed_items, list):
for it in _parsed_items[:5]:
if isinstance(it, dict) and it.get("field") and it.get("summary"):
clean_items.append({
"level": it.get("level", "medium"),
"field": str(it["field"])[:60],
"summary": str(it["summary"])[:80],
})
# items 空就用 Python smart fallback (不是 str()[:30])
if not clean_items:
clean_items = self._fallback_items(report)
# 2026-04-20 P0.2: 解析 recommendation若 LLM 給了)
_rec = None
try:
if isinstance(_parsed, dict):
_rec = _parsed.get("recommendation")
# Path 2 場景recommendation 也可能藏在 _inner
if _rec is None and _parsed.get("description", "").startswith("{"):
_inner_txt = str(_parsed["description"]).strip()
_inner = _json.loads(_inner_txt)
if isinstance(_inner, dict):
_rec = _inner.get("recommendation")
except (_json.JSONDecodeError, ValueError, KeyError):
_rec = None
if isinstance(_rec, dict) and _rec.get("action"):
_act = str(_rec.get("action", "")).strip().lower()
if _act in ("adopt", "revert", "ignore", "investigate"):
try:
_conf = float(_rec.get("confidence", 0.0))
except (TypeError, ValueError):
_conf = 0.0
_conf = max(0.0, min(1.0, _conf))
recommendation = {
"action": _act,
"confidence": _conf,
"reason": str(_rec.get("reason", ""))[:200],
}
narrative = _parsed_narrative
items = clean_items
status = "success"
llm_accepted = True
if not llm_accepted:
logger.warning("drift_narrator_openclaw_failed", provider=provider)
except Exception as e:
logger.warning("drift_narrator_llm_error", error=str(e))
# Fallback
if not llm_accepted:
narrative = self._fallback_narrative(report, interpretation)
items = self._fallback_items(report)
status = "failed"
# 2026-04-20 P0.2: LLM 未給 recommendation 就走 Python fallback
if not recommendation:
recommendation = self._fallback_recommendation(report, interpretation)
# ADR-090-C: 同步寫 DB 稽核(永不 propagate error,保護主流程)
duration_ms = int((time.time() - started_ms) * 1000)
try:
await self._log_ai_action_to_db(
report=report,
prompt=prompt,
raw_response=raw_response,
narrative=narrative,
items=items,
provider=provider,
status=status,
llm_accepted=llm_accepted,
duration_ms=duration_ms,
)
except Exception as e:
logger.warning("drift_narrator_audit_write_failed", error=str(e))
return narrative, items, recommendation
def _fallback_recommendation(
self,
report: DriftReport,
interpretation: DriftInterpretation | None,
) -> dict:
"""
2026-04-20 P0.2 ogt + Claude Opus 4.7: LLM 沒給 recommendation 時的 Python fallback
規則式推薦(保守):
- 全部 trivial/白名單 → ignore (0.8)
- 有 HIGH drift + intent=emergency_hotfix → adopt (0.5) (不確定,降信心)
- 有 HIGH drift + intent=human_error → revert (0.7)
- 其他 → investigate (0.4)(請人工介入)
"""
actionable = self._count_nontrivial_drift(report)
if actionable == 0:
return {
"action": "ignore",
"confidence": 0.8,
"reason": "全部為白名單或 K8s 預設值補齊,無實質變更。",
}
_has_high = report.high_count > 0
_intent = interpretation.intent.value if interpretation else "unknown"
if _has_high and _intent == "emergency_hotfix":
return {
"action": "adopt",
"confidence": 0.5,
"reason": "HIGH drift 但意圖分析為緊急 hotfix建議採納並補 Git請人工複核",
}
if _has_high and _intent == "human_error":
return {
"action": "revert",
"confidence": 0.7,
"reason": "HIGH drift 且意圖分析為人為誤操作,建議回滾 Git 狀態。",
}
return {
"action": "investigate",
"confidence": 0.4,
"reason": f"{actionable} 項可操作漂移,意圖={_intent},需人工查清楚再決定。",
}
async def _log_ai_action_to_db(
self,
report: DriftReport,
prompt: str,
raw_response: str | None,
narrative: str,
items: list[dict],
provider: str,
status: str,
llm_accepted: bool,
duration_ms: int,
) -> None:
"""
ADR-090-C: 把 drift narrator 的 AI 動作寫入 automation_operation_log +
ai_collaboration_trace(L4 稽核 + 未來 RLHF 語料)
- op_type='notification_formatted'
- actor='drift_narrator'
- 若能找到該 drift 的 incident 關聯,設 parent_op_id
"""
import json as _json
from sqlalchemy import text as _sql
from src.db.base import get_db_context
input_json = _json.dumps({
"report_id": report.report_id,
"namespace": report.namespace,
"high_count": report.high_count,
"medium_count": report.medium_count,
"items_scanned": len(report.items),
})
output_json = _json.dumps({
"narrative": narrative,
"items": items,
"items_count": len(items),
}, ensure_ascii=False)
trace_response = _json.dumps({
"narrative": narrative,
"items": items,
"raw_prefix": (raw_response or "")[:500],
}, ensure_ascii=False)
async with get_db_context() as db:
# P2.4: 嘗試找 parent_op_id若未來有 drift→alert_fired 鏈路)
parent_row = await db.execute(
_sql("""
SELECT op_id FROM automation_operation_log
WHERE operation_type='alert_fired'
AND (input::jsonb->>'report_id' = :rid
OR input::jsonb->>'drift_report_id' = :rid)
ORDER BY created_at DESC LIMIT 1
"""),
{"rid": report.report_id},
)
parent_op_id = parent_row.scalar() if parent_row else None
# 寫 automation_operation_log
# 2026-04-18 hotfix: tags 要傳 Python list不是 PG array literal 字串)
# 否則 asyncpg 會報 "a sized iterable container expected"
op_row = await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output,
duration_ms, parent_op_id, tags
) VALUES (
'notification_formatted',
'drift_narrator',
:status,
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:duration_ms, :parent_op_id,
:tags
)
RETURNING op_id
"""),
{
"status": status,
"input": input_json,
"output": output_json,
"duration_ms": duration_ms,
"parent_op_id": parent_op_id,
"tags": ["drift", "type4d", "llm_summary"],
},
)
op_id = op_row.scalar()
# 寫 ai_collaboration_trace
await db.execute(
_sql("""
INSERT INTO ai_collaboration_trace (
op_id, step_order, agent, model,
prompt, response, duration_ms, accepted
) VALUES (
:op_id, 1, 'drift_narrator', :model,
:prompt, CAST(:response AS jsonb), :duration_ms, :accepted
)
"""),
{
"op_id": op_id,
"model": provider,
"prompt": prompt[:8000],
"response": trace_response,
"duration_ms": duration_ms,
"accepted": llm_accepted,
},
)
# get_db_context() 在 exit 時 auto-commitsrc/db/base.py:128
logger.info(
"drift_narrator_audit_written",
op_id=str(op_id),
parent_op_id=str(parent_op_id) if parent_op_id else None,
status=status,
items_count=len(items),
)
def _format_drift_for_llm(self, report: DriftReport) -> str:
"""
2026-04-18 ogt + Claude Opus 4.7: B 方案 — 餵 LLM 用的 JSON 序列化
保留更多原始 context 給 LLM 推理,不做 30 字元暴力截斷
"""
import json as _json
items_for_llm = []
for item in report.items[:12]:
if item.is_allowlisted or item.field_path in _HPA_ALLOWLIST_PATHS:
continue
items_for_llm.append({
"level": item.drift_level.value,
"resource": f"{item.resource_kind}/{item.resource_name}",
"field": item.field_path,
"git_value": str(item.git_value)[:200] if item.git_value is not None else None,
"actual_value": str(item.actual_value)[:200] if item.actual_value is not None else None,
})
return _json.dumps(items_for_llm, ensure_ascii=False, indent=2)
def _smart_shorten(self, val) -> str:
"""型別安全摘要 — dict/list 顯示大小,字串保留頭尾,None 轉「未設」"""
if val is None:
return "(未設)"
s = str(val)
if s in ("{}", "[]"):
return ""
if s.startswith("[") and s.endswith("]"):
return f"[清單 {s.count(',')+1 if s != '[]' else 0} 項]"
if s.startswith("{") and s.endswith("}"):
return f"{{物件 {s.count(':')} 欄位}}"
if len(s) > 40:
return s[:37] + "..."
return s
def _is_trivial_drift(self, git_val, actual_val) -> bool:
"""
判斷是否為 K8s controller 自動補齊的噪音
(例: None ↔ {} / None ↔ [] / {} ↔ [] 等視為無實質變更)
"""
def _is_empty(v):
if v is None:
return True
s = str(v).strip()
return s in ("", "{}", "[]", "null", "None", "false", "False", "0")
return _is_empty(git_val) and _is_empty(actual_val)
def _summarize_item(self, item) -> str:
"""
生成一筆 drift 的人話摘要 (fallback 用)
- 空 vs 空 → 標註為 controller 自動補齊
- None → 新增 → 顯示新值摘要
- 有值 → 有值 → 顯示前後變化
"""
git_val = item.git_value
actual_val = item.actual_value
if self._is_trivial_drift(git_val, actual_val):
return "K8s 預設值補齊 (無實質變更)"
from_val = self._smart_shorten(git_val)
to_val = self._smart_shorten(actual_val)
# None → 有值: 新增
if git_val is None and actual_val is not None:
return f"新增 {to_val}"
# 有值 → None: 刪除
if git_val is not None and actual_val is None:
return f"已刪除 (原: {from_val})"
# 一般變化
return f"{from_val}{to_val}"
def _fallback_items(self, report: DriftReport) -> list[dict]:
"""
LLM 失敗時的 Python 智能摘要 (取代舊 str()[:30])
- 過濾白名單
- 優先 HIGH
- trivial drift 標註為「預設值補齊」
"""
# 按 level 排序 (HIGH 優先) 並過濾白名單
filtered = [
it for it in report.items
if not it.is_allowlisted and it.field_path not in _HPA_ALLOWLIST_PATHS
]
filtered.sort(key=lambda x: 0 if x.drift_level.value == "high" else 1)
items = []
for item in filtered[:5]:
items.append({
"level": item.drift_level.value,
"field": item.field_path[:60],
"summary": self._summarize_item(item),
})
return items
def _format_intent_summary(self, interpretation: DriftInterpretation | None) -> str:
if not interpretation:
return "無意圖分析"
return (
f"意圖: {interpretation.intent.value} | "
f"說明: {interpretation.explanation} | "
f"信心: {interpretation.confidence:.0%}"
)
def _fallback_narrative(
self,
report: DriftReport,
interpretation: DriftInterpretation | None,
) -> str:
"""LLM 失敗時的結構化 fallback"""
resources = list({
f"{i.resource_kind}/{i.resource_name}"
for i in report.items[:5]
if not i.is_allowlisted
})
resource_str = "".join(resources) if resources else "未知資源"
intent_str = interpretation.explanation if interpretation else "意圖分析不可用"
return (
f"偵測到 {resource_str} 等資源發生配置漂移。\n"
f"嚴重度HIGH {report.high_count} 項、MEDIUM {report.medium_count} 項。\n"
f"研判原因:{intent_str}\n"
f"建議:確認是否需要 rollback 回 Git 狀態。"
)
async def _send_telegram(
self,
report: DriftReport,
narrative: str,
items: list[dict],
recommendation: dict | None = None,
repeat_state: dict | None = None,
) -> None:
"""
推送 TYPE-4D Config Drift 卡片ADR-075+ B 方案智能摘要
2026-04-18 ogt + Claude Opus 4.7: 改用 LLM 產的結構化 items,
取代 str()[:30] 暴力截斷產生的亂碼
2026-04-20 P0.2 ogt + Claude Opus 4.7: recommendation 顯示在卡片頂部
(統帥指令:先不 auto-execute純顯示推薦讓人一眼知道按哪顆
"""
from src.services.telegram_gateway import get_telegram_gateway
diff_summary = self._render_telegram_body(report, narrative, items, recommendation, repeat_state)
try:
tg = get_telegram_gateway()
# 2026-04-20 P0.2: 500 → 1500 字上限,讓 AI 推薦 + narrative + items 都能容納
# send_drift_card 已同步放寬 HTML 顯示上限至 1500
await tg.send_drift_card(
incident_id=report.report_id,
approval_id=report.report_id,
resource_name=report.namespace,
diff_summary=diff_summary[:1500],
detected_at="",
)
except Exception as e:
logger.warning("drift_narrator_telegram_error", error=str(e))
def _count_nontrivial_drift(self, report: DriftReport) -> int:
"""
計算非白名單、非 trivial (K8s 自動補齊) 的 drift 數
用於 Telegram 底部「還有 N 項」顯示實際可操作數量
"""
n = 0
for item in report.items:
if item.is_allowlisted or item.field_path in _HPA_ALLOWLIST_PATHS:
continue
if self._is_trivial_drift(item.git_value, item.actual_value):
continue
n += 1
return n
def _shorten_field_path(self, field: str) -> str:
"""
砍掉常見冗長前綴,讓 Telegram 排版不換行
處理 2 種場景:
A. 開頭即前綴: 'spec.template.spec.volumes''volumes'
B. LLM 雞婆加資源識別符: 'Deployment/awoooi-web: spec.template.spec.containers'
'Deployment/awoooi-web: containers'
用 replace 比 startswith 更有韌性,包容 LLM 前綴幻覺。
"""
# 先移除 absolute prefix若開頭
for prefix in ("spec.template.spec.", "spec.template.", "spec."):
if field.startswith(prefix):
return field[len(prefix):]
# 中間出現LLM 加 'Resource/Name: spec.template.spec.xxx' 場景)
for prefix in ("spec.template.spec.", "spec.template."):
if prefix in field:
return field.replace(prefix, "")
return field
def _render_telegram_body(
self,
report: DriftReport,
narrative: str,
items: list[dict],
recommendation: dict | None = None,
repeat_state: dict | None = None,
) -> str:
"""
組裝 Telegram 卡片 bodyB 方案格式 + P0.2 AI 推薦)
範例輸出:
🎯 AI 建議:⏪ 回滾 (85%) — image tag 被手動改到未驗證版本
🤖 AI 研判
volumes 與 affinity 被手動修改...
📊 漂移明細 (HIGH: 1 | MEDIUM: 29)
🔴 spec.template.spec.volumes: 新增 2 項 repair-ssh-key 掛載
🟡 spec.template.spec.serviceAccount: (未設) → awoooi-executor
... 還有 27 項
"""
lines = []
# 2026-04-20 P0.2 AI 推薦(頂部,純推薦不自動執行)
if recommendation and recommendation.get("action"):
_act = recommendation["action"]
_conf = float(recommendation.get("confidence", 0.0))
_reason = recommendation.get("reason", "")
_emoji_action = {
"adopt": "✅ 採納",
"revert": "⏪ 回滾",
"ignore": "🔕 忽略",
"investigate": "🔍 人工調查",
}.get(_act, _act)
lines.append(f"🎯 AI 建議:{_emoji_action} ({int(_conf * 100)}%) — {_reason}\n")
repeat_line = self._render_repeat_state(repeat_state)
if repeat_line:
lines.append(f"{repeat_line}\n")
lines.append(f"🤖 AI 研判\n{narrative}\n")
# 用非 trivial + 非白名單 的實際可操作數顯示
actionable = self._count_nontrivial_drift(report)
lines.append(f"📊 漂移明細 (HIGH: {report.high_count} | MEDIUM: {report.medium_count} | 可操作: {actionable})")
if not items:
lines.append(" (全部為白名單或 K8s 預設值補齊,無實質變更)")
else:
for it in items:
emoji = "🔴" if it.get("level") == "high" else "🟡"
short_field = self._shorten_field_path(it['field'])
lines.append(f"{emoji} {short_field}: {it['summary']}")
shown = len(items)
if actionable > shown:
lines.append(f"... 還有 {actionable - shown} 項 (按 🔍 查看 Diff)")
return "\n".join(lines)
def _render_repeat_state(self, repeat_state: dict | None) -> str:
"""Render operator-visible repeat/stage metadata for Telegram."""
if not repeat_state:
return ""
fingerprint = str(repeat_state.get("fingerprint") or "unknown")
occurrences = int(repeat_state.get("occurrences_12h") or 0)
window_hours = int(repeat_state.get("window_hours") or 12)
stage = str(repeat_state.get("operator_stage") or "unknown")
if occurrences <= 1:
repeat_text = f"{window_hours}h 內首次出現"
else:
repeat_text = f"{window_hours}h 內第 {occurrences} 次同指紋"
return (
"流程: drift_scanned → ai_analyzed → "
f"{stage}\n重複: {repeat_text}\n指紋: {fingerprint}"
)
# ============================================================
# Singleton
# ============================================================
_narrator: DriftNarratorService | None = None
def get_drift_narrator_service() -> DriftNarratorService:
global _narrator
if _narrator is None:
_narrator = DriftNarratorService()
return _narrator