feat(phase30): Drift 報告 AI 人話摘要 (ADR-067)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

- 新增 DriftNarratorService — qwen2.5:7b-instruct (Ollama 111)
  - 觸發條件: high >= 1 or medium >= 3(HPA replicas 白名單)
  - Redis 快取: drift_narrative:{report_id} TTL 1h
  - LLM 失敗時 graceful fallback 結構化文字
- drift.py _analyze_and_notify: 接入 narrator(Phase 30 標記)
- Migration: drift_reports.narrative_text TEXT (已在 prod 執行)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-10 01:37:43 +08:00
parent 2065665c9b
commit 89015d4527
3 changed files with 280 additions and 3 deletions

View File

@@ -0,0 +1,15 @@
-- Phase 30: Drift 報告 AI 人話摘要欄位
-- 2026-04-10 Claude Code (ADR-067): DriftNarratorService 寫入 narrative_text
-- qwen2.5:7b-instruct 生成繁中摘要,儲存於 drift_reports 表
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'drift_reports' AND column_name = 'narrative_text'
) THEN
ALTER TABLE drift_reports ADD COLUMN narrative_text TEXT DEFAULT NULL;
COMMENT ON COLUMN drift_reports.narrative_text IS
'AI 生成的繁體中文人話摘要 (qwen2.5:7b-instruct, Phase 30 ADR-067)';
END IF;
END $$;

View File

@@ -153,7 +153,9 @@ async def internal_scan(background_tasks: BackgroundTasks) -> dict:
# =============================================================================
async def _analyze_and_notify(report: DriftReport) -> None:
"""背景Nemotron 意圖分析 + Telegram 推送"""
"""背景Nemotron 意圖分析 + Telegram 推送 + Phase 30 AI 人話摘要"""
import structlog as _structlog
_logger = _structlog.get_logger(__name__)
try:
interpreter = get_drift_interpreter()
analyzer = get_drift_analyzer()
@@ -186,8 +188,16 @@ async def _analyze_and_notify(report: DriftReport) -> None:
f"adopt 端點暫停開放,待 ADR-057 實作後啟用)"
)
except Exception as e:
import structlog
structlog.get_logger(__name__).warning("drift_telegram_failed", error=str(e))
_logger.warning("drift_telegram_failed", error=str(e))
# Phase 30 (2026-04-10 Claude Code ADR-067): AI 人話摘要推送
# 在技術格式訊息之後,額外推送 qwen2.5:7b-instruct 生成的繁中摘要
try:
from src.services.drift_narrator_service import get_drift_narrator_service
narrator = get_drift_narrator_service()
await narrator.narrate_and_notify(report, interpretation)
except Exception as e:
_logger.warning("drift_narrator_failed", error=str(e))
except Exception as e:
import structlog

View File

@@ -0,0 +1,252 @@
"""
Drift Narrator Service - Phase 30
===================================
職責:將 DriftReport 轉為繁體中文人話,推送 Telegram
設計邊界:
- 只負責「敘述」,不做分析、不生成修復指令
- 觸發條件high_count > 0 or medium_count > 2
- 模型qwen2.5:7b-instruct (Ollama 111, 90s timeout)
- Redis 快取drift_narrative:{report_id} TTL 1h避免重複推送
- HPA replicas 自動調整在白名單,不觸發摘要
版本: v1.0
建立: 2026-04-10 (台北時區)
建立者: Claude Code (Phase 30 ADR-067)
"""
from __future__ import annotations
import html
from typing import TYPE_CHECKING
import httpx
import structlog
from src.core.redis_client import get_redis
if TYPE_CHECKING:
from src.models.drift import DriftInterpretation, DriftReport
logger = structlog.get_logger(__name__)
# ============================================================
# 設定
# ============================================================
OLLAMA_URL = "http://192.168.0.111:11434"
NARRATOR_MODEL = "qwen2.5:7b-instruct"
NARRATOR_TIMEOUT = 90.0 # seconds
CACHE_TTL = 3600 # 1 小時
CACHE_PREFIX = "drift_narrative:"
# HPA 自動調整白名單 field_path不納入敘述
_HPA_ALLOWLIST_PATHS = {
"spec.replicas",
}
# 觸發條件
TRIGGER_HIGH_MIN = 1
TRIGGER_MEDIUM_MIN = 3
# ============================================================
# Prompt
# ============================================================
_NARRATIVE_PROMPT = """你是 AWOOOI SRE 維運助理,請將以下 K8s 配置漂移報告轉為繁體中文人話。
## 漂移摘要
{drift_summary}
## 意圖分析
{intent_summary}
## 要求
- 繁體中文5 行以內
- 第 1 行說明漂移了哪些資源resource name
- 第 2 行:說明嚴重程度和數量
- 第 3 行:最可能的原因(引用意圖分析)
- 第 4 行建議的運維動作rollback 或 adopt
- 避免技術術語,用平實口語
- 只輸出摘要文字,不要標題或 markdown
"""
class DriftNarratorService:
"""
Drift 報告人話摘要服務
職責邊界:
✅ 呼叫 qwen2.5:7b-instruct 生成繁中摘要
✅ Redis 快取(避免重複推送)
✅ 推送 Telegram
❌ 不做漂移分析
❌ 不生成修復指令
"""
async def narrate_and_notify(
self,
report: "DriftReport",
interpretation: "DriftInterpretation | None" = None,
) -> None:
"""
生成人話摘要並推送 Telegram
只在 high_count > 0 or medium_count >= TRIGGER_MEDIUM_MIN 時執行
"""
if not self._should_narrate(report):
logger.debug(
"drift_narrator_skip",
report_id=report.report_id,
high=report.high_count,
medium=report.medium_count,
)
return
# Redis 快取檢查(同 report_id 不重複推送)
cache_key = f"{CACHE_PREFIX}{report.report_id}"
redis = await get_redis()
if await redis.exists(cache_key):
logger.debug("drift_narrator_cache_hit", report_id=report.report_id)
return
narrative = await self._generate_narrative(report, interpretation)
await self._send_telegram(report, narrative)
# 寫入快取
await redis.set(cache_key, narrative[:500], ex=CACHE_TTL)
logger.info(
"drift_narrator_sent",
report_id=report.report_id,
high=report.high_count,
medium=report.medium_count,
)
def _should_narrate(self, report: "DriftReport") -> bool:
"""觸發條件high >= 1 or medium >= 3"""
# 過濾 HPA 白名單後重算
non_hpa_items = [
item for item in report.items
if item.field_path not in _HPA_ALLOWLIST_PATHS
and not item.is_allowlisted
]
high = sum(1 for i in non_hpa_items if i.severity == "HIGH")
medium = sum(1 for i in non_hpa_items if i.severity == "MEDIUM")
return high >= TRIGGER_HIGH_MIN or medium >= TRIGGER_MEDIUM_MIN
async def _generate_narrative(
self,
report: "DriftReport",
interpretation: "DriftInterpretation | None",
) -> str:
"""呼叫 Ollama qwen2.5:7b-instruct 生成摘要"""
drift_summary = self._format_drift_summary(report)
intent_summary = self._format_intent_summary(interpretation)
prompt = _NARRATIVE_PROMPT.format(
drift_summary=drift_summary,
intent_summary=intent_summary,
)
try:
async with httpx.AsyncClient(timeout=NARRATOR_TIMEOUT) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": NARRATOR_MODEL,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.3, "num_predict": 300},
},
)
resp.raise_for_status()
data = resp.json()
text = data.get("response", "").strip()
if text:
return text
except httpx.TimeoutException:
logger.warning("drift_narrator_timeout", model=NARRATOR_MODEL)
except Exception as e:
logger.warning("drift_narrator_llm_error", error=str(e))
# Fallback結構化文字摘要
return self._fallback_narrative(report, interpretation)
def _format_drift_summary(self, report: "DriftReport") -> str:
lines = []
for item in report.items[:8]:
if item.is_allowlisted or item.field_path in _HPA_ALLOWLIST_PATHS:
continue
lines.append(
f"- [{item.severity}] {item.resource_kind}/{item.resource_name}: "
f"{item.field_path} "
f"(Git: {str(item.git_value)[:30]} → K8s: {str(item.actual_value)[:30]})"
)
return "\n".join(lines) if lines else "(均為白名單欄位)"
def _format_intent_summary(self, interpretation: "DriftInterpretation | None") -> str:
if not interpretation:
return "無意圖分析"
return (
f"意圖: {interpretation.intent.value} | "
f"說明: {interpretation.explanation} | "
f"信心: {interpretation.confidence:.0%}"
)
def _fallback_narrative(
self,
report: "DriftReport",
interpretation: "DriftInterpretation | None",
) -> str:
"""LLM 失敗時的結構化 fallback"""
resources = list({
f"{i.resource_kind}/{i.resource_name}"
for i in report.items[:5]
if not i.is_allowlisted
})
resource_str = "".join(resources) if resources else "未知資源"
intent_str = interpretation.explanation if interpretation else "意圖分析不可用"
return (
f"偵測到 {resource_str} 等資源發生配置漂移。\n"
f"嚴重度HIGH {report.high_count} 項、MEDIUM {report.medium_count} 項。\n"
f"研判原因:{intent_str}\n"
f"建議:確認是否需要 rollback 回 Git 狀態。"
)
async def _send_telegram(self, report: "DriftReport", narrative: str) -> None:
"""推送 Telegram 人話摘要卡"""
from src.services.telegram_gateway import get_telegram_gateway
severity_icon = "🔴" if report.high_count > 0 else "🟡"
msg = (
f"{severity_icon} <b>K8s 配置漂移</b>\n"
f"Namespace: <code>{html.escape(report.namespace)}</code>\n"
f"HIGH: {report.high_count} | MEDIUM: {report.medium_count}\n"
f"\n"
f"🤖 <b>AI 研判</b>\n"
f"{html.escape(narrative)}\n"
f"\n"
f"📋 Report: <code>{html.escape(report.report_id)}</code>\n"
f"<i>qwen2.5:7b-instruct | 免費本地推理</i>"
)
try:
tg = get_telegram_gateway()
await tg.send_text(msg[:4096])
except Exception as e:
logger.warning("drift_narrator_telegram_error", error=str(e))
# ============================================================
# Singleton
# ============================================================
_narrator: DriftNarratorService | None = None
def get_drift_narrator_service() -> DriftNarratorService:
global _narrator
if _narrator is None:
_narrator = DriftNarratorService()
return _narrator