feat(Phase 3): complete the learning loop: Root cause 3 + diagnosis feedback + knowledge decay + fine-tune pipeline

- approval_execution.py: _run_post_execution_verify() now calls record_verification_result()
  Closes Root cause 3: environment verification results (success/degraded/failed/timeout) are no longer orphaned
- learning_service.py: add record_verification_result(), which writes verification results to Redis and the Playbook EWMA
- learning_service.py: add record_diagnosis_outcome(), which writes back a negative signal on misdiagnosis (L3×D4)
- jobs/knowledge_decay_job.py: new 30-day knowledge decay job (unreferenced draft/review entries → archived)
- services/finetune_exporter.py: new weekly JSONL export (EvidenceSnapshot × AgentSession)
- main.py: mount knowledge_decay_loop (24h) + finetune_export_loop (7d)
- MASTER §8: record that all Phase 3 core change items have landed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
OG T
2026-04-15 20:57:33 +08:00
parent e23e49c13b
commit fb1bbd0e20
6 changed files with 598 additions and 0 deletions


@@ -0,0 +1,180 @@
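"""
AWOOOI AIOps Phase 3: Knowledge Decay Job
=========================================

Responsibility: once a day, scan the knowledge base (knowledge_entries) for
draft/review entries that have gone unreferenced for 30 days (view_count = 0
and updated_at < now-30d) and mark them as archived (knowledge decay).

Why is knowledge decay needed?
    Short-term learning bias: repair patterns the AI learned from early cases
    may be outdated (K8s version or service names have changed).
    Without decay, old zero-evidence entries keep polluting RAG retrieval,
    lowering Playbook match precision and raising the misdiagnosis rate.

Decay policy:
    - Target: status in (draft, review) and view_count = 0 and updated_at < now-30d
    - Action: status → archived; append 'kb_decay_30d' to tags
    - Exempt: status = approved (must be archived manually); status = archived (already archived)
    - Each run logs a summary to structlog; no governance event is written (to avoid noise)

Design principles:
    1. Mark only, never delete (follows the archive_not_delete rule)
    2. Batch operation, at most 200 entries per run (avoids long transactions)
    3. A job failure only logs an error and does not affect the main path

ADR-083 Phase 3: Knowledge decay (L7×D4)
2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 initial version
"""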
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
import structlog
from sqlalchemy import and_, select, update
from src.db.base import get_session_factory
from src.db.models import KnowledgeEntryRecord
from src.models.knowledge import EntryStatus
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
DECAY_AGE_DAYS = 30
DECAY_TAG = "kb_decay_30d"
BATCH_LIMIT = 200
DAILY_INTERVAL_SEC = 86_400 # 24h
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class DecayScanResult:
    """Result of a knowledge decay scan."""

    total_scanned: int
    decayed_ids: list[str] = field(default_factory=list)
    scanned_at: str = field(default_factory=lambda: now_taipei().isoformat())

    @property
    def decayed_count(self) -> int:
        return len(self.decayed_ids)

    def to_dict(self) -> dict:
        return {
            "total_scanned": self.total_scanned,
            "decayed_count": self.decayed_count,
            "decayed_ids_sample": self.decayed_ids[:20],
            "scanned_at": self.scanned_at,
        }
# ─────────────────────────────────────────────────────────────────────────────
# Main Job
# ─────────────────────────────────────────────────────────────────────────────
class KnowledgeDecayJob:
    """
    Knowledge decay job (runs daily).

    Usage:
        job = KnowledgeDecayJob()
        result = await job.run()
    """

    async def run(self) -> DecayScanResult:
        """
        Full run: scan → mark as archived (knowledge decay).

        Returns:
            DecayScanResult
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P3_ENABLED:
            logger.debug("knowledge_decay_job_skipped_feature_flag")
            return DecayScanResult(total_scanned=0)
        try:
            return await self._run_scan()
        except Exception as e:
            logger.error(
                "knowledge_decay_job_error",
                error=str(e),
            )
            return DecayScanResult(total_scanned=0)
    async def _run_scan(self) -> DecayScanResult:
        cutoff = now_taipei() - timedelta(days=DECAY_AGE_DAYS)
        decayable_statuses = [EntryStatus.DRAFT.value, EntryStatus.REVIEW.value]
        session_factory = get_session_factory()
        async with session_factory() as db:
            # Query: draft/review entries unreferenced for 30 days (view_count=0) with updated_at < cutoff
            stmt = select(KnowledgeEntryRecord).where(
                and_(
                    KnowledgeEntryRecord.status.in_(decayable_statuses),
                    KnowledgeEntryRecord.view_count == 0,
                    KnowledgeEntryRecord.updated_at < cutoff,
                )
            ).limit(BATCH_LIMIT)
            result = await db.execute(stmt)
            entries = result.scalars().all()
            total_scanned = len(entries)
            if not entries:
                logger.debug("knowledge_decay_nothing_to_decay")
                return DecayScanResult(total_scanned=0)
            decayed_ids = []
            for entry in entries:
                # Append the decay tag (no duplicates)
                current_tags: list[str] = list(entry.tags or [])
                if DECAY_TAG not in current_tags:
                    current_tags.append(DECAY_TAG)
                entry.status = EntryStatus.ARCHIVED.value
                entry.tags = current_tags
                decayed_ids.append(entry.id)
            await db.commit()
        result = DecayScanResult(
            total_scanned=total_scanned,
            decayed_ids=decayed_ids,
        )
        logger.info(
            "knowledge_decay_job_done",
            **result.to_dict(),
        )
        return result
# ─────────────────────────────────────────────────────────────────────────────
# Loop (mounted in main.py)
# ─────────────────────────────────────────────────────────────────────────────
async def run_knowledge_decay_loop() -> None:
    """
    Infinite loop: run the knowledge decay scan every 24h.

    Mounted at main.py startup via asyncio.create_task.
    """
    job = KnowledgeDecayJob()
    while True:
        try:
            result = await job.run()
            if result.decayed_count > 0:
                logger.info(
                    "knowledge_decay_loop_tick",
                    decayed=result.decayed_count,
                )
        except Exception as e:
            logger.error("knowledge_decay_loop_error", error=str(e))
        await asyncio.sleep(DAILY_INTERVAL_SEC)
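
The decay policy in this file reduces to a pure predicate plus a tagging step. The following is a minimal sketch of that rule on an in-memory record, not the job's actual code: `Entry`, `is_decayable`, and `apply_decay` are hypothetical names, and the status strings `"draft"`, `"review"`, and `"archived"` are assumed values of `EntryStatus`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List

DECAY_AGE_DAYS = 30
DECAY_TAG = "kb_decay_30d"
# Assumption: these strings mirror EntryStatus.DRAFT / EntryStatus.REVIEW.
DECAYABLE_STATUSES = {"draft", "review"}


@dataclass
class Entry:
    """Hypothetical in-memory stand-in for KnowledgeEntryRecord."""

    id: str
    status: str
    view_count: int
    updated_at: datetime
    tags: List[str] = field(default_factory=list)


def is_decayable(entry: Entry, now: datetime) -> bool:
    """True when the entry is draft/review, never viewed, and older than the cutoff."""
    cutoff = now - timedelta(days=DECAY_AGE_DAYS)
    return (
        entry.status in DECAYABLE_STATUSES
        and entry.view_count == 0
        and entry.updated_at < cutoff
    )


def apply_decay(entry: Entry) -> None:
    """Archive the entry and add the decay tag once (mark only, never delete)."""
    if DECAY_TAG not in entry.tags:
        entry.tags.append(DECAY_TAG)
    entry.status = "archived"
```

Keeping the rule as a pure function of `(entry, now)` would make the 30-day boundary testable without a database session.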


@@ -345,6 +345,24 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    except Exception as e:
        logger.warning("approval_timeout_resolver_schedule_failed", error=str(e))

    # ADR-083 Phase 3: knowledge decay job (daily): mark KB entries unreferenced for 30d as archived
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 initial version
    try:
        from src.jobs.knowledge_decay_job import run_knowledge_decay_loop
        asyncio.create_task(run_knowledge_decay_loop())
        logger.info("knowledge_decay_loop_scheduled", interval_sec=86400)
    except Exception as e:
        logger.warning("knowledge_decay_loop_schedule_failed", error=str(e))

    # ADR-083 Phase 3: fine-tune JSONL export (weekly): EvidenceSnapshot × AgentSession → JSONL
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 initial version
    try:
        from src.services.finetune_exporter import run_finetune_export_loop
        asyncio.create_task(run_finetune_export_loop())
        logger.info("finetune_export_loop_scheduled", interval_sec=604800)
    except Exception as e:
        logger.warning("finetune_export_loop_schedule_failed", error=str(e))

    # Phase 4 ADR-084: proactive inspection, runs every 5 minutes
    # Coordinates DynamicBaselineService + LogAnomalyDetector + TrendPredictor
    # Shadow Mode control: when AIOPS_P4_SHADOW_MODE=True, only record and do not trigger alerts


@@ -568,6 +568,25 @@ class ApprovalExecutionService:
                action=action_taken,
            )

            # ADR-083 Phase 3 Root cause 3: wire verification results into the learning service
            # Environment verification (Pod Running / metrics recovered) is a more precise learning signal than the execution exit code
            # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC)
            try:
                from src.services.learning_service import get_learning_service
                _matched_pb_id = getattr(approval, "matched_playbook_id", None)
                await get_learning_service().record_verification_result(
                    incident_id=approval.incident_id,
                    action_taken=action_taken,
                    verification_result=verification_result,
                    matched_playbook_id=_matched_pb_id,
                )
            except Exception as _lerr:
                logger.warning(
                    "post_verify_learning_failed",
                    approval_id=str(approval.id),
                    error=str(_lerr),
                )

        except Exception as _e:
            # A verification failure does not affect the execution result
            logger.warning(
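
The hunk above wraps the learning call in its own `try/except` so that a learning failure cannot change the execution result. As an illustration of that isolation pattern only, here is a small sketch; `best_effort` is a hypothetical helper and it uses the standard `logging` module rather than the project's structlog logger.

```python
import asyncio
import logging

logger = logging.getLogger("post_verify")


async def best_effort(awaitable, event: str) -> bool:
    """Await a side-effect call; log and swallow any exception it raises.

    Returns True when the call succeeded and False when it failed, so the
    caller can continue on its main path either way.
    """
    try:
        await awaitable
        return True
    except Exception as err:  # deliberately broad: the side effect must never propagate
        logger.warning("%s: %s", event, err)
        return False
```

A caller could then write `await best_effort(service.record_verification_result(...), "post_verify_learning_failed")`, where `service` stands for whatever learning service instance is in scope.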


@@ -0,0 +1,247 @@
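"""
AWOOOI AIOps Phase 3: Fine-tune JSONL Exporter
==============================================

Responsibility: once a week, combine EvidenceSnapshot × AgentSession × AutoRepairExecution
into training pairs (instruction, input, output) and export them as a JSONL file for LLM fine-tuning.

Why is a fine-tune pipeline needed?
    EWMA Playbook trust only adjusts "which Playbook to pick";
    the LLM's own reasoning patterns (symptom recognition, root cause analysis,
    action description format) cannot be learned from EWMA.
    The fine-tune data pipeline lets the AI learn "how to reason" from real
    successful cases, not only "which Playbook to trust".

Export policy:
    - Query incident_evidence for records with verification_result = 'success' and an evidence_summary
    - Join the AgentSession (coordinator turn) with the same incident_id to get the reasoning decision
    - Join auto_repair_executions to get the action actually executed
    - Assemble into JSONL (Alpaca instruction-input-output format)
    - Write to FINETUNE_EXPORT_PATH (default /tmp/finetune/; MinIO support not yet configured)

JSONL format (one JSON object per line):
    {
        "instruction": "Based on the AIOps evidence summary, analyze the alert's root cause and propose a fix",
        "input": "<evidence_summary>",
        "output": "<coordinator reasoning decision + executed action>",
        "metadata": {
            "incident_id": "...",
            "alertname": "...",
            "verification_result": "success",
            "collected_at": "...",
            "schema_version": "v1",
            "matched_playbook_id": "..."
        }
    }

Design principles:
    1. Only export records with verification_result = 'success' (negative cases stay out of the
       training set to avoid reinforcing wrong patterns)
    2. Each export file name includes a timestamp (old files are not overwritten)
    3. At most 500 rows per batch (large training sets need to be split into batches)
    4. Failures only log an error and do not affect the main path

ADR-083 Phase 3: Fine-tune pipeline (L7×D4)
2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 initial version
"""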
from __future__ import annotations
import asyncio
import json
import os
from datetime import timedelta
from pathlib import Path
import structlog
from sqlalchemy import and_, select
from src.db.base import get_session_factory
from src.db.models import AgentSession, AutoRepairExecution, IncidentEvidence
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
FINETUNE_EXPORT_PATH = os.getenv("FINETUNE_EXPORT_PATH", "/tmp/finetune")
BATCH_LIMIT = 500
EXPORT_LOOKBACK_DAYS = 7  # only export data from the past N days
WEEKLY_INTERVAL_SEC = 7 * 86_400
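# English gloss: "Based on the following AIOps evidence summary (EvidenceSnapshot),
# analyze the alert's root cause and propose a concrete fix, explaining the rationale for the repair action."
INSTRUCTION = (
    "根據以下 AIOps 情報摘要(EvidenceSnapshot),"
    "分析告警根因並提出具體的修復建議,說明修復動作的理由。"
)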
# ─────────────────────────────────────────────────────────────────────────────
# Fine-tune Exporter
# ─────────────────────────────────────────────────────────────────────────────
class FineTuneExporter:
    """
    Fine-tune JSONL exporter (runs weekly).

    Usage:
        exporter = FineTuneExporter()
        path, count = await exporter.export()
    """

    async def export(self) -> tuple[str | None, int]:
        """
        Export the training data.

        Returns:
            (output_file_path, row_count)
            Returns (None, 0) when there is no data or the feature is disabled.
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P3_FINETUNE_EXPORT:
            logger.debug("finetune_exporter_skipped_feature_flag")
            return None, 0
        try:
            return await self._run_export()
        except Exception as e:
            logger.error("finetune_exporter_error", error=str(e))
            return None, 0
    async def _run_export(self) -> tuple[str | None, int]:
        cutoff = now_taipei() - timedelta(days=EXPORT_LOOKBACK_DAYS)
        session_factory = get_session_factory()
        async with session_factory() as db:
            # 1. Fetch successfully verified EvidenceSnapshots
            #    (evidence_summary present + verification_result='success')
            stmt = select(IncidentEvidence).where(
                and_(
                    IncidentEvidence.verification_result == "success",
                    IncidentEvidence.evidence_summary.isnot(None),
                    IncidentEvidence.collected_at >= cutoff,
                )
            ).limit(BATCH_LIMIT)
            result = await db.execute(stmt)
            evidences = result.scalars().all()
            if not evidences:
                logger.info("finetune_exporter_no_data", lookback_days=EXPORT_LOOKBACK_DAYS)
                return None, 0
            # 2. For each evidence, fetch the matching coordinator AgentSession + AutoRepairExecution
            rows: list[dict] = []
            for ev in evidences:
                row = await self._build_row(db, ev)
                if row:
                    rows.append(row)
        if not rows:
            return None, 0
        # 3. Write the JSONL file
        output_path = await self._write_jsonl(rows)
        logger.info(
            "finetune_export_done",
            row_count=len(rows),
            path=output_path,
        )
        return output_path, len(rows)
    async def _build_row(self, db, ev: IncidentEvidence) -> dict | None:
        """Assemble a single training pair."""
        # Fetch the coordinator agent turn, if any
        agent_stmt = select(AgentSession).where(
            and_(
                AgentSession.incident_id == ev.incident_id,
                AgentSession.agent_role == "coordinator",
            )
        ).order_by(AgentSession.created_at.desc()).limit(1)
        agent_result = await db.execute(agent_stmt)
        coordinator = agent_result.scalar_one_or_none()
        # Fetch the latest execution record
        exec_stmt = select(AutoRepairExecution).where(
            AutoRepairExecution.incident_id == ev.incident_id,
        ).order_by(AutoRepairExecution.created_at.desc()).limit(1)
        exec_result = await db.execute(exec_stmt)
        execution = exec_result.scalar_one_or_none()
        # Assemble the output text
        output_parts: list[str] = []
        if coordinator and coordinator.output_json:
            coord_out = coordinator.output_json
            if isinstance(coord_out, dict):
                # Use the reasoning or decision field
                reasoning = (
                    coord_out.get("reasoning")
                    or coord_out.get("decision")
                    or str(coord_out)[:500]
                )
                output_parts.append(f"[AI 決策]\n{reasoning}")  # "[AI decision]"
        if execution:
            action_desc = execution.playbook_name or "未知"  # "unknown"
            if execution.executed_steps:
                steps = execution.executed_steps
                if isinstance(steps, list) and steps:
                    first = steps[0]
                    if isinstance(first, dict):
                        action_desc = first.get("action") or first.get("step") or action_desc
            output_parts.append(f"[執行動作]\n{action_desc}")  # "[Executed action]"
            output_parts.append(
                # "[Execution result] success / failure"
                f"[執行結果] {'成功' if execution.success else '失敗'}"
            )
        if not output_parts:
            return None  # no output data, skip
        # alertname: preferably taken from the signal labels of the incident linked to ev
        alertname = ev.incident_id  # fallback to incident_id
        return {
            "instruction": INSTRUCTION,
            "input": (ev.evidence_summary or "").strip(),
            "output": "\n\n".join(output_parts),
            "metadata": {
                "incident_id": ev.incident_id,
                "alertname": alertname,
                "verification_result": ev.verification_result,
                "collected_at": ev.collected_at.isoformat() if ev.collected_at else None,
                "schema_version": ev.schema_version,
                "matched_playbook_id": ev.matched_playbook_id,
            },
        }
    async def _write_jsonl(self, rows: list[dict]) -> str:
        """Write the JSONL file under FINETUNE_EXPORT_PATH."""
        export_dir = Path(FINETUNE_EXPORT_PATH)
        export_dir.mkdir(parents=True, exist_ok=True)
        ts = now_taipei().strftime("%Y%m%d-%H%M%S")
        filename = f"finetune-{ts}.jsonl"
        output_path = export_dir / filename
        with open(output_path, "w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row, ensure_ascii=False) + "\n")
        return str(output_path)
# ─────────────────────────────────────────────────────────────────────────────
# Loop (mounted in main.py)
# ─────────────────────────────────────────────────────────────────────────────
async def run_finetune_export_loop() -> None:
    """
    Infinite loop: run the fine-tune data export every 7 days.

    Mounted at main.py startup via asyncio.create_task.
    """
    exporter = FineTuneExporter()
    while True:
        try:
            path, count = await exporter.export()
            if count > 0:
                logger.info("finetune_export_loop_tick", rows=count, path=path)
        except Exception as e:
            logger.error("finetune_export_loop_error", error=str(e))
        await asyncio.sleep(WEEKLY_INTERVAL_SEC)
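
The exporter writes one JSON object per line with `ensure_ascii=False`, so non-ASCII text is stored as UTF-8 rather than `\uXXXX` escapes. A minimal round-trip sketch of that file format is shown below; `write_jsonl` and `read_jsonl` are hypothetical standalone helpers for checking an export, not functions from this commit.

```python
import json
from pathlib import Path
from typing import Dict, List


def write_jsonl(rows: List[Dict], path: Path) -> int:
    """Write rows as JSON Lines (one object per line, UTF-8) and return the row count."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            # json.dumps escapes embedded newlines, so each row stays on one line.
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    return len(rows)


def read_jsonl(path: Path) -> List[Dict]:
    """Read a JSON Lines file back into a list of dicts, skipping blank lines."""
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Reading a file back this way is also a quick check for the "Fine-tune JSONL ≥ 10 rows" exit criterion, since `len(read_jsonl(path))` gives the row count.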


@@ -645,6 +645,110 @@ class LearningService:
            )
            return False

    async def record_diagnosis_outcome(
        self,
        incident_id: str,
        matched_playbook_id: str | None,
        was_correct: bool,
        actual_fix: str | None = None,
    ) -> None:
        """
        Record whether the AI diagnosis was correct.

        ADR-083 Phase 3: write misdiagnosis back as playbook_diagnosis_feedback (L3×D4).
        When an AI proposal is rejected by a human, or verification fails after execution,
        the diagnosis may have been wrong; a negative signal is written back so that the
        matching Playbook's trust_score EWMA shrinks.

        Args:
            incident_id: ID of the related Incident
            matched_playbook_id: ID of the Playbook used for this diagnosis (if any)
            was_correct: whether the diagnosis was correct (False = misdiagnosis)
            actual_fix: the repair action that actually worked (available for the Evolver to learn from)

        2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 misdiagnosis feedback wiring
        """
        # 1. Record to the Repository (the diag: prefix distinguishes it from exec: / verify:)
        try:
            await self._repository.record_repair(
                anomaly_key=f"diag:{incident_id}",
                repair_action=actual_fix or "unknown",
                success=was_correct,
                fix_description=f"diagnosis_correct={was_correct}",
            )
        except Exception as e:
            logger.warning(
                "record_diagnosis_to_repo_failed",
                incident_id=incident_id,
                error=str(e),
            )
        # 2. On misdiagnosis, reinforce negative Playbook learning (the 2x EWMA decay factor already exists)
        if matched_playbook_id and not was_correct:
            await self._update_playbook_stats(
                playbook_id=matched_playbook_id,
                success=False,
            )
        logger.info(
            "diagnosis_outcome_recorded",
            incident_id=incident_id,
            was_correct=was_correct,
            matched_playbook_id=matched_playbook_id,
        )

    async def record_verification_result(
        self,
        incident_id: str,
        action_taken: str,
        verification_result: str,
        matched_playbook_id: str | None = None,
    ) -> None:
        """
        Record an environment verification result in the learning system.

        ADR-083 Phase 3 Root cause 3: fixes the post_execution_verifier → learning wiring.
        Environment verification (Pod Running / metrics recovered) is more precise than the
        exit code of the executed command, so it is stored as a separate record with a
        verify: prefix and also updates the Playbook EWMA stats.

        Args:
            incident_id: ID of the related Incident
            action_taken: description of the executed action (e.g. "restart_service:awoooi-api")
            verification_result: "success" | "degraded" | "failed" | "timeout"
            matched_playbook_id: ID of the matched Playbook (EWMA stats are updated when present)

        2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 Root cause 3 fix
        """
        success = (verification_result == "success")
        # 1. Record the verification result to the Repository
        #    (the verify: prefix on anomaly_key distinguishes it from execution records)
        try:
            await self._repository.record_repair(
                anomaly_key=f"verify:{incident_id}",
                repair_action=action_taken,
                success=success,
                fix_description=verification_result,
            )
        except Exception as e:
            logger.warning(
                "record_verification_to_repo_failed",
                incident_id=incident_id,
                error=str(e),
            )
        # 2. Update the Playbook EWMA stats (a more precise signal than the execution exit code)
        if matched_playbook_id:
            await self._update_playbook_stats(
                playbook_id=matched_playbook_id,
                success=success,
            )
        logger.info(
            "verification_result_recorded",
            incident_id=incident_id,
            verification_result=verification_result,
            matched_playbook_id=matched_playbook_id,
        )

    async def get_recommended_fix(self, anomaly_key: str) -> dict:
        """
        Recommend the best fix based on historical learning
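
The two new methods feed a Playbook trust-score EWMA that, per the comments, applies a 2x factor to negative outcomes. The update itself lives in `playbook_repository.py` and is not shown in this diff. Purely as an illustration, one way such an asymmetric EWMA could look is sketched below; `alpha = 0.2`, the function names, and the exact formula are assumptions, not the repository's implementation.

```python
def verification_to_success(verification_result: str) -> bool:
    """Only "success" counts as positive; degraded/failed/timeout are all negative."""
    return verification_result == "success"


def ewma_update(
    trust: float,
    success: bool,
    alpha: float = 0.2,  # assumed smoothing factor
    negative_multiplier: float = 2.0,  # assumed form of the "2x" negative decay
) -> float:
    """Move trust toward 1.0 on success and toward 0.0 on failure.

    Failures use a doubled (capped at 1.0) weight, so trust drops faster
    than it recovers.
    """
    target = 1.0 if success else 0.0
    weight = alpha if success else min(1.0, alpha * negative_multiplier)
    return (1.0 - weight) * trust + weight * target
```

With these assumed values, a trust score of 0.5 rises to 0.6 after one success but falls to 0.3 after one failure.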


@@ -1529,3 +1529,33 @@ After Phase 6 is complete
- [ ] Production verification (observe after 3ce5025 / the Phase 6 image is deployed)

**commit chain**: fab65e7 → f31b4e3 → f045506 → f9ba200 → 3ce5025 → (Phase 6 REST API commit)

---

### 2026-04-15 late night (Taipei), Phase 3 learning loop completion: Root cause 3 + diagnosis feedback + knowledge decay + fine-tune pipeline

**Completed in this change: all Phase 3 "core change items" have landed**

| File | Change | Item |
|------|--------|------|
| `services/approval_execution.py` | `_run_post_execution_verify()` now calls `record_verification_result()` (closes Root cause 3) | Verification result → learning wiring |
| `services/learning_service.py` | Add `record_verification_result()`: writes environment verification results (success/degraded/failed/timeout) back to Redis and updates the Playbook EWMA | Root cause 3 wiring |
| `services/learning_service.py` | Add `record_diagnosis_outcome()`: writes a negative Playbook signal on misdiagnosis (human rejection / verification failure) | Diagnosis feedback (L3×D4) |
| `jobs/knowledge_decay_job.py` | **New** daily job: draft/review KB entries unreferenced for 30 days (view_count=0) are marked archived and tagged `kb_decay_30d` | Knowledge decay (L7×D4) |
| `services/finetune_exporter.py` | **New** weekly job: EvidenceSnapshot × AgentSession × AutoRepairExecution with verification_result='success' → Alpaca JSONL → `/tmp/finetune/` | Fine-tune pipeline (L7×D4) |
| `main.py` | Mount `run_knowledge_decay_loop` (every 24h) + `run_finetune_export_loop` (every 7d) | Job scheduling |

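**Phase 3 exit criteria update:**
- [x] Root cause 1: fire-and-forget → await (7da64ea)
- [x] Root cause 2: matched_playbook_id never populated (7da64ea)
- [x] Root cause 3: verification results not passed to learning (this change)
- [x] 2x EWMA negative decay (7da64ea, playbook_repository.py)
- [x] Evolver Agent (7da64ea, playbook_evolver.py)
- [x] Diagnosis feedback (this change, record_diagnosis_outcome)
- [x] Knowledge decay job (this change, knowledge_decay_job.py)
- [x] Fine-tune pipeline (this change, finetune_exporter.py)
- [ ] `matched_playbook_id` null rate = 0 (production verification, needs 7 days of monitoring)
- [ ] Playbook trust_score has ≥ 1 dynamic update within 24h (production verification)
- [ ] Fine-tune JSONL ≥ 10 rows (to be verified after EvidenceSnapshot has accumulated for 7 days)

**Next steps:** push to Gitea → CD deploy → observe the Phase 3 exit criteria in production for 7 days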