Files
awoooi/apps/api/src/services/playbook_evolver.py
OG T 01fb531c02 fix(Phase 3): Evolver force=True bypass flag + 清理未使用 import
- run_evolver(force=True):管理員手動端點可繞過 feature flag
- 移除 typing.Any 未使用 import
- 移除 _merge_similar 中冗餘的 calculate_jaccard_similarity import

ADR-083 Phase 3 — 2026-04-15 ogt + Claude Sonnet 4.6(亞太)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 21:09:01 +08:00

378 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
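"""
AWOOOI AIOps Phase 3 — Playbook Evolver Agent (knowledge evolution officer)
==========================================================
Responsibilities: automatic playbook merging, low-trust archiving, knowledge-base refinement.
Trigger: scheduled (daily) or a manual call to `run_evolver()`.
Three functions:
1. Low-trust archive — trust_score < 0.1 → DEPRECATED (automatic retirement)
2. Dormant archive — unused for 30 days AND trust_score < 0.5 → DEPRECATED
3. Similar merge — symptom Jaccard similarity >= 0.9 → merged into one (the higher-trust playbook is kept)
Design principles:
- Purely static rules (no LLM dependency) — guarantees determinism
- Circuit-breaker protection: a failure on one playbook does not affect the others
- Best-effort auditing: merges/archives are written via logger.info
- Feature-flag protection: silently skipped when AIOPS_P3_EVOLVER_ENABLED=False
ADR-083: Phase 3 learning-loop rebuild
2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 initial creation
"""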
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
import structlog
from src.models.playbook import Playbook, PlaybookStatus
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ── Threshold constants ─────────────────────────────────────────────────────
TRUST_ARCHIVE_THRESHOLD = 0.1 # a playbook whose trust_score is below this value is archived
DORMANT_TRUST_THRESHOLD = 0.5 # dormant archive: trust below this value AND not used within DORMANT_DAYS (or never used)
DORMANT_DAYS = 30 # number of days without use before a playbook counts as dormant
MERGE_SIMILARITY_THRESHOLD = 0.9 # symptom similarity at or above this value makes a pair a merge candidate
MAX_MERGE_PER_RUN = 10 # upper bound on merges per run (keeps a single run from changing too much)
# ─────────────────────────────────────────────────────────────────────────────
# Result Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class EvolverReport:
    """Outcome of a single Evolver run.

    The step functions in this module fill the counters and lists in as they
    archive, merge or skip playbooks.
    """

    # Counters.
    archived_count: int = 0  # playbooks set to DEPRECATED by an archive step
    merged_count: int = 0  # successful merges
    skipped_count: int = 0  # playbooks the dormant step decided to keep

    # Details backing the counters.
    archived_ids: list[str] = field(default_factory=list)
    # Each pair is (deprecated_id, kept_id).
    merged_pairs: list[tuple[str, str]] = field(default_factory=list)
    # One message per failed operation.
    errors: list[str] = field(default_factory=list)

    @property
    def total_affected(self) -> int:
        """Return how many playbooks were archived or merged in this run."""
        return sum((self.archived_count, self.merged_count))
# ─────────────────────────────────────────────────────────────────────────────
# Main Entry Point
# ─────────────────────────────────────────────────────────────────────────────
async def run_evolver(force: bool = False) -> EvolverReport:
    """Run the whole Evolver pipeline: low-trust archive, dormant archive, merge.

    Args:
        force: When True the AIOPS_P3_EVOLVER_ENABLED feature flag is not
            consulted (meant for the admin manual-trigger endpoint).

    Returns:
        An EvolverReport with the results of every step. The report is empty
        when the run is disabled by the flag or when no playbooks were fetched.

    Raises:
        Nothing. An exception escaping any step is logged as ``evolver_fatal``
        and the report accumulated so far is returned.
    """
    from src.core.feature_flags import aiops_flags

    enabled = force or aiops_flags.AIOPS_P3_EVOLVER_ENABLED
    if not enabled:
        logger.debug("evolver_skipped", reason="AIOPS_P3_EVOLVER_ENABLED=False")
        return EvolverReport()

    def _not_deprecated(candidates):
        # The steps flip ``status`` on the objects they archive, so re-filtering
        # between steps removes whatever the previous step retired.
        return [c for c in candidates if c.status != PlaybookStatus.DEPRECATED]

    outcome = EvolverReport()
    try:
        fetched = await _fetch_all_active_playbooks()
        if fetched:
            # Step 1: archive playbooks with very low trust.
            await _archive_low_trust(fetched, outcome)

            # Step 2: archive dormant playbooks among the ones still live.
            survivors = _not_deprecated(fetched)
            await _archive_dormant(survivors, outcome)

            # Step 3: merge near-duplicate playbooks among what is left.
            await _merge_similar(_not_deprecated(survivors), outcome)

            logger.info(
                "evolver_done",
                archived=outcome.archived_count,
                merged=outcome.merged_count,
                skipped=outcome.skipped_count,
                errors=len(outcome.errors),
            )
        else:
            logger.info("evolver_no_playbooks")
    except Exception:
        logger.exception("evolver_fatal")
    return outcome
# ─────────────────────────────────────────────────────────────────────────────
# Step Implementations
# ─────────────────────────────────────────────────────────────────────────────
async def _archive_low_trust(playbooks: list[Playbook], report: EvolverReport) -> None:
    """Step 1: set every playbook with trust_score < TRUST_ARCHIVE_THRESHOLD to DEPRECATED.

    Args:
        playbooks: Candidates to inspect; already-DEPRECATED entries are ignored.
            Archived entries get their ``status`` updated in place.
        report: Run report; archived ids/counters and per-playbook errors are
            recorded on it.

    A failure on one playbook is recorded and logged, then the loop moves on.
    """
    from src.services.playbook_service import get_playbook_service

    svc = get_playbook_service()
    deprecated = PlaybookStatus.DEPRECATED

    for playbook in playbooks:
        if playbook.status == deprecated:
            continue
        if not playbook.trust_score < TRUST_ARCHIVE_THRESHOLD:
            continue

        pid = playbook.playbook_id
        try:
            await svc.update_with_validation(pid, {"status": PlaybookStatus.DEPRECATED.value})
            logger.info(
                "evolver_archived_low_trust",
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                trust_score=playbook.trust_score,
            )
            report.archived_count += 1
            report.archived_ids.append(playbook.playbook_id)
            # Mirror the new status locally so later steps do not process it again.
            playbook.status = deprecated
        except Exception as exc:
            report.errors.append(f"archive_low_trust:{playbook.playbook_id}:{exc}")
            logger.warning(
                "evolver_archive_failed",
                playbook_id=playbook.playbook_id,
                error=str(exc),
            )
async def _archive_dormant(playbooks: list[Playbook], report: EvolverReport) -> None:
    """Step 2: retire dormant playbooks whose trust is below DORMANT_TRUST_THRESHOLD.

    A playbook is archived when its trust_score is below the threshold and it
    was either never used (``last_used_at`` is None) or last used at or before
    ``now_taipei() - DORMANT_DAYS``. Everything else that is not already
    DEPRECATED is counted in ``report.skipped_count``.

    Args:
        playbooks: Candidates to inspect; archived entries get their ``status``
            updated in place.
        report: Run report receiving counters, archived ids and error messages.
    """
    from src.services.playbook_service import get_playbook_service

    svc = get_playbook_service()
    oldest_allowed_use = now_taipei() - timedelta(days=DORMANT_DAYS)

    for playbook in playbooks:
        if playbook.status == PlaybookStatus.DEPRECATED:
            continue

        last_used = playbook.last_used_at
        # NOTE(review): a playbook that has never been used is treated like a
        # dormant one regardless of its age — confirm this is intended for
        # freshly created playbooks.
        used_recently = last_used is not None and last_used > oldest_allowed_use
        if used_recently or playbook.trust_score >= DORMANT_TRUST_THRESHOLD:
            # Either used inside the window or trusted enough to keep around.
            report.skipped_count += 1
            continue

        try:
            await svc.update_with_validation(
                playbook.playbook_id,
                {"status": PlaybookStatus.DEPRECATED.value},
            )
            logger.info(
                "evolver_archived_dormant",
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                trust_score=playbook.trust_score,
                last_used_at=str(playbook.last_used_at),
            )
            report.archived_count += 1
            report.archived_ids.append(playbook.playbook_id)
            playbook.status = PlaybookStatus.DEPRECATED
        except Exception as exc:
            report.errors.append(f"archive_dormant:{playbook.playbook_id}:{exc}")
            logger.warning(
                "evolver_dormant_archive_failed",
                playbook_id=playbook.playbook_id,
                error=str(exc),
            )
async def _merge_similar(playbooks: list[Playbook], report: EvolverReport) -> None:
    """Step 3: merge playbooks whose symptom similarity is >= MERGE_SIMILARITY_THRESHOLD.

    For each similar pair the playbook with the higher trust_score is kept
    (the earlier one wins a tie). The other one's source incident ids are
    folded into the keeper and it is then set to DEPRECATED. At most
    MAX_MERGE_PER_RUN merges are performed per call.

    Args:
        playbooks: Candidates; DEPRECATED entries are ignored. Merged entries
            are updated in place (``status`` on the dropped playbook,
            ``source_incident_ids`` on the kept one).
        report: Run report receiving merge counters, pairs and error messages.

    A failure on one pair is recorded and logged, then the scan continues.
    """
    from src.services.playbook_service import get_playbook_service

    service = get_playbook_service()
    merged_set: set[str] = set()  # ids of playbooks that were merged away
    active = [p for p in playbooks if p.status not in (PlaybookStatus.DEPRECATED,)]
    merge_count = 0

    for i, pb_a in enumerate(active):
        if pb_a.playbook_id in merged_set:
            continue
        if merge_count >= MAX_MERGE_PER_RUN:
            break
        for pb_b in active[i + 1:]:
            if pb_b.playbook_id in merged_set:
                continue
            if merge_count >= MAX_MERGE_PER_RUN:
                break
            sim = _compute_symptom_similarity(pb_a, pb_b)
            if sim < MERGE_SIMILARITY_THRESHOLD:
                continue

            # Similar enough: keep the more trusted one, retire the other.
            keep, drop = (
                (pb_a, pb_b) if pb_a.trust_score >= pb_b.trust_score else (pb_b, pb_a)
            )
            try:
                # Order-preserving de-duplication keeps the stored list
                # deterministic (a plain set would shuffle it between runs).
                merged_source_ids = list(
                    dict.fromkeys(
                        [
                            *(keep.source_incident_ids or ()),
                            *(drop.source_incident_ids or ()),
                        ]
                    )
                )
                await service.update_with_validation(
                    keep.playbook_id,
                    {"source_incident_ids": merged_source_ids},
                )
                # Keep the in-memory keeper in sync with what was just stored;
                # otherwise a later merge into the same keeper would overwrite
                # the stored list and lose the ids folded in here.
                keep.source_incident_ids = merged_source_ids

                # Retire the playbook that was merged away.
                await service.update_with_validation(
                    drop.playbook_id,
                    {"status": PlaybookStatus.DEPRECATED.value},
                )
                logger.info(
                    "evolver_merged",
                    kept_id=keep.playbook_id,
                    dropped_id=drop.playbook_id,
                    similarity=f"{sim:.2f}",
                    kept_trust=keep.trust_score,
                    dropped_trust=drop.trust_score,
                )
                merged_set.add(drop.playbook_id)
                report.merged_count += 1
                report.merged_pairs.append((drop.playbook_id, keep.playbook_id))
                drop.status = PlaybookStatus.DEPRECATED
                merge_count += 1
            except Exception as e:
                report.errors.append(f"merge:{keep.playbook_id}+{drop.playbook_id}:{e}")
                logger.warning(
                    "evolver_merge_failed",
                    keep_id=keep.playbook_id,
                    drop_id=drop.playbook_id,
                    error=str(e),
                )
                continue

            if drop is pb_a:
                # pb_a has just been retired; it must not be compared with (or
                # chosen as the keeper for) any further playbook.
                break
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
async def _fetch_all_active_playbooks() -> list[Playbook]:
    """Fetch the playbooks the Evolver scans (one page of at most 500).

    No status filter is passed to ``list_playbooks`` because the Evolver needs
    to see DRAFT as well as APPROVED playbooks; the step functions skip
    DEPRECATED entries themselves.

    Returns:
        The fetched page, or an empty list when the fetch fails (the failure
        is logged as ``evolver_fetch_playbooks_failed``).
    """
    page_limit = 500
    try:
        from src.services.playbook_service import get_playbook_service

        service = get_playbook_service()
        playbooks_page, total = await service.list_playbooks(limit=page_limit)
    except Exception as e:
        logger.warning("evolver_fetch_playbooks_failed", error=str(e))
        return []

    # Only a single page is read, so make truncation visible instead of
    # silently scanning a subset. Presumably ``total`` is the overall match
    # count reported by the service — the type guards keep this best-effort.
    if (
        isinstance(total, int)
        and isinstance(playbooks_page, list)
        and total > len(playbooks_page)
    ):
        logger.warning(
            "evolver_fetch_playbooks_truncated",
            fetched=len(playbooks_page),
            total=total,
            limit=page_limit,
        )
    return playbooks_page
def _compute_symptom_similarity(pb_a: Playbook, pb_b: Playbook) -> float:
    """Return a weighted similarity score for two playbooks' symptom patterns.

    The score combines three components:
    - alert_names, via calculate_jaccard_similarity (weight 0.5)
    - keywords, via calculate_jaccard_similarity (weight 0.3)
    - affected_services (weight 0.2): 1.0 when both are empty, 0.5 when exactly
      one is empty, otherwise calculate_jaccard_similarity of the two sets
    """
    from src.utils.similarity import calculate_jaccard_similarity as jaccard

    left, right = pb_a.symptom_pattern, pb_b.symptom_pattern

    alert_score = jaccard(set(left.alert_names), set(right.alert_names))
    keyword_score = jaccard(set(left.keywords), set(right.keywords))

    left_services = left.affected_services
    right_services = right.affected_services
    if left_services and right_services:
        service_score = jaccard(set(left_services), set(right_services))
    elif left_services or right_services:
        # One generic playbook and one service-specific playbook: middling match.
        service_score = 0.5
    else:
        # No services on either side means both are generic: full match.
        service_score = 1.0

    return 0.5 * alert_score + 0.3 * keyword_score + 0.2 * service_score
# ─────────────────────────────────────────────────────────────────────────────
# Singleton / Scheduling Hook
# ─────────────────────────────────────────────────────────────────────────────
async def schedule_daily_evolver() -> None:
    """Trigger one Evolver run; entry point for startup code or a scheduler.

    Does nothing when AIOPS_P3_EVOLVER_ENABLED is off. Otherwise it awaits
    ``run_evolver()`` and logs a summary; an exception from the run is logged
    as ``evolver_daily_failed`` instead of being raised.

    Usage (main.py lifespan or a scheduler):
        asyncio.create_task(schedule_daily_evolver())
    """
    from src.core.feature_flags import aiops_flags

    if aiops_flags.AIOPS_P3_EVOLVER_ENABLED:
        logger.info("evolver_daily_scheduled")
        try:
            outcome = await run_evolver()
            logger.info(
                "evolver_daily_done",
                archived=outcome.archived_count,
                merged=outcome.merged_count,
            )
        except Exception:
            logger.exception("evolver_daily_failed")
DAILY_INTERVAL_SEC = 24 * 60 * 60  # seconds in 24 hours


async def run_evolver_loop() -> None:
    """Run the Evolver forever, once every DAILY_INTERVAL_SEC seconds.

    Each iteration awaits ``schedule_daily_evolver()`` and then sleeps for the
    interval; the first run happens as soon as the loop starts. An exception
    from an iteration is logged as ``evolver_loop_error`` and the loop goes on.
    Intended to be mounted from main.py startup with ``asyncio.create_task``.

    ADR-083 Phase 3: daily Evolver scan.
    """
    while True:
        try:
            await schedule_daily_evolver()
        except Exception as exc:
            logger.error("evolver_loop_error", error=str(exc))
        await asyncio.sleep(DAILY_INTERVAL_SEC)