Files
awoooi/apps/api/src/services/learning_service.py
OG T 7da64eaad2
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 19m7s
Type Sync Check / check-type-sync (push) Failing after 1m18s
feat(Phase 3): 學習閉環重建 — 三根因修復 + 2x EWMA + Evolver Agent
ADR-083 Phase 3 學習閉環重建:

**三根因修復**
- approval_execution.py: fire-and-forget create_task → await asyncio.wait_for(timeout=30) × 2
  (成功路徑 L265 + 失敗路徑 L353,超時記錄 learning_trigger_timeout metric,主流程不 crash)
- models/approval.py: ApprovalRequestBase 新增 matched_playbook_id 欄位
- decision_manager.py: _auto_execute 建立 ApprovalRequest 時填充 matched_playbook_id
- learning_service.py: 雙路徑查找 _matched_pb_id(matched_playbook_id + metadata fallback)

**2x EWMA 負向強化**
- models/playbook.py: 新增 trust_score: float = 0.3(EWMA 動態信任度欄位)
- repositories/playbook_repository.py: update_stats 加 EWMA
  成功: trust = 0.9 × old + 0.1 × 1.0
  失敗: trust = 0.8 × old + 0.2 × 0.0(衰減速度 2x)
  trust < 0.1 → log warning,等 Evolver 封存

**Evolver Agent(新建)**
- services/playbook_evolver.py: 三功能全靜態規則
  1. 低信任封存: trust < 0.1 → DEPRECATED
  2. 休眠封存: 30d 未使用 AND trust < 0.5 → DEPRECATED
  3. 相似合併: 症狀 Jaccard > 0.9 → 保留高 trust,封存低 trust
  AIOPS_P3_EVOLVER_ENABLED=False 預設關閉

**文件**
- ADR-083 學習閉環重建
- MASTER §8 Phase 3 完工記錄

AIOPS_P3_ENABLED=False(預設),骨架就位等統帥批准開啟

Co-Authored-By: Claude Sonnet 4.6(亞太)<noreply@anthropic.com>
2026-04-15 14:01:37 +08:00

770 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Learning Service - Phase 5 持續學習迴圈
======================================
ADR-030: 智能自動修復系統
Phase D-G P0 修正: 符合 leWOOOgo 積木化原則
從執行結果中學習,持續優化決策:
1. 更新 Playbook 統計 (成功率/執行次數)
2. 調整信任度 (成功 +分 / 失敗 -分)
3. 萃取新 Playbook (成功案例自動萃取)
4. 處理人工反饋 (有效性評分)
5. 🆕 Redis 持久化學習數據 (透過 Repository)
6. 🆕 修復推薦 (基於歷史成功率)
設計原則:
- 非同步執行,不阻塞主流程
- 失敗容忍,學習失敗不影響執行結果
- 完整審計追蹤
- 🆕 Service 不直接存取 Redis (透過 ILearningRepository)
版本: v1.1
建立: 2026-03-26 (台北時區)
更新: 2026-03-29 (台北時區) - P0 修正: 新增 Repository 層
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.models.approval import ApprovalRequest
from src.models.incident import IncidentStatus
from src.repositories.interfaces import ILearningRepository
from src.repositories.learning_repository import get_learning_repository
from src.services.trust_engine import get_trust_manager
logger = structlog.get_logger(__name__)
# =============================================================================
# Constants
# =============================================================================
class FeedbackType(str, Enum):
"""反饋類型"""
EXECUTION_SUCCESS = "execution_success" # 執行成功
EXECUTION_FAILURE = "execution_failure" # 執行失敗
HUMAN_APPROVE = "human_approve" # 人工批准
HUMAN_REJECT = "human_reject" # 人工拒絕
HUMAN_OVERRIDE = "human_override" # 人工覆蓋 AI 決策
EFFECTIVENESS_RATING = "effectiveness_rating" # 有效性評分
# 信任度調整參數
TRUST_SUCCESS_BOOST = 1 # 成功 +1 分
TRUST_FAILURE_PENALTY = 2 # 失敗 -2 分 (或歸零)
TRUST_HUMAN_REJECT_PENALTY = 1 # 人工拒絕 -1 分
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class ExecutionResult:
"""執行結果"""
approval_id: str
incident_id: str
action: str
success: bool
error_message: str | None = None
duration_seconds: float = 0.0
executed_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"approval_id": self.approval_id,
"incident_id": self.incident_id,
"action": self.action,
"success": self.success,
"error_message": self.error_message,
"duration_seconds": self.duration_seconds,
"executed_at": self.executed_at.isoformat(),
}
@dataclass
class FeedbackRequest:
"""人工反饋請求"""
incident_id: str
feedback_type: FeedbackType
effectiveness_score: int | None = None # 1-5 分
learning_notes: str | None = None # 學習筆記
submitted_by: str | None = None
submitted_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class LearningRecord:
"""學習記錄"""
incident_id: str
feedback_type: FeedbackType
action_pattern: str
trust_before: int
trust_after: int
playbook_updated: bool = False
new_playbook_id: str | None = None
learned_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"incident_id": self.incident_id,
"feedback_type": self.feedback_type.value,
"action_pattern": self.action_pattern,
"trust_before": self.trust_before,
"trust_after": self.trust_after,
"playbook_updated": self.playbook_updated,
"new_playbook_id": self.new_playbook_id,
"learned_at": self.learned_at.isoformat(),
}
# =============================================================================
# Learning Service
# =============================================================================
class LearningService:
"""
持續學習服務
職責:
1. 處理執行結果 → 更新 Playbook + 信任度
2. 處理人工反饋 → 調整 Playbook 有效性
3. 萃取新 Playbook (成功案例)
4. 🆕 Redis 持久化學習數據 (透過 Repository)
5. 🆕 修復推薦 (基於歷史成功率)
2026-03-29 P0 修正: 符合 leWOOOgo 積木化原則
- 透過 ILearningRepository 存取 Redis
- 不直接依賴 Redis Client
"""
# 推薦門檻
MIN_SAMPLES = 5 # 最少需要 N 次數據才能推薦
SUCCESS_RATE_THRESHOLD = 0.6 # 成功率門檻
def __init__(
self,
repository: ILearningRepository | None = None,
):
self._trust_manager = get_trust_manager()
self._repository = repository or get_learning_repository()
async def process_execution_result(
self,
approval: ApprovalRequest,
result: ExecutionResult,
) -> LearningRecord:
"""
處理執行結果,觸發學習
Args:
approval: 原始審批請求
result: 執行結果
Returns:
LearningRecord: 學習記錄
"""
action_pattern = self._extract_action_pattern(approval.action)
# 取得當前信任分數
trust_record = self._trust_manager.get_trust_record(action_pattern)
trust_before = trust_record.score if trust_record else 0
# 1. 調整信任度
if result.success:
# 成功: 記錄批准 (信任分數 +1)
self._trust_manager.record_approval(
action_pattern=action_pattern,
user_role="system",
user_id="auto_learning",
)
feedback_type = FeedbackType.EXECUTION_SUCCESS
else:
# 失敗: 記錄拒絕 (信任分數歸零)
self._trust_manager.record_rejection(
action_pattern=action_pattern,
user_role="system",
user_id="auto_learning",
reason=result.error_message,
)
feedback_type = FeedbackType.EXECUTION_FAILURE
# 取得更新後的信任分數
trust_record = self._trust_manager.get_trust_record(action_pattern)
trust_after = trust_record.score if trust_record else 0
# 2. 更新 Playbook 統計 (如果有匹配)
# ADR-083 Phase 3: 雙路徑查找 matched_playbook_id
# 路徑 A: ApprovalRequest.matched_playbook_idauto_execute 路徑Phase 3 修復)
# 路徑 B: approval.metadata["playbook_id"](人工審核路徑,透過 proposal_service 存入 metadata
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 Playbook EWMA 修復
_matched_pb_id: str | None = (
getattr(approval, "matched_playbook_id", None)
or (approval.metadata or {}).get("matched_playbook_id")
or (approval.metadata or {}).get("playbook_id")
)
playbook_updated = False
if _matched_pb_id:
try:
await self._update_playbook_stats(
playbook_id=_matched_pb_id,
success=result.success,
)
playbook_updated = True
except Exception as e:
logger.warning(
"playbook_stats_update_failed",
playbook_id=_matched_pb_id,
error=str(e),
)
# 3. 嘗試萃取新 Playbook (成功且無匹配 Playbook)
new_playbook_id = None
if result.success and not _matched_pb_id:
try:
new_playbook_id = await self._try_extract_playbook(
incident_id=result.incident_id,
action=approval.action,
)
except Exception as e:
logger.warning(
"playbook_extraction_failed",
incident_id=result.incident_id,
error=str(e),
)
# 建立學習記錄
record = LearningRecord(
incident_id=result.incident_id,
feedback_type=feedback_type,
action_pattern=action_pattern,
trust_before=trust_before,
trust_after=trust_after,
playbook_updated=playbook_updated,
new_playbook_id=new_playbook_id,
)
logger.info(
"learning_completed",
incident_id=result.incident_id,
success=result.success,
trust_change=f"{trust_before}{trust_after}",
playbook_updated=playbook_updated,
new_playbook=new_playbook_id,
)
return record
async def process_human_feedback(
self,
feedback: FeedbackRequest,
) -> LearningRecord:
"""
處理人工反饋
Args:
feedback: 反饋請求
Returns:
LearningRecord: 學習記錄
"""
# 從 incident 取得 action pattern (需查詢)
action_pattern = f"incident:{feedback.incident_id}"
trust_record = self._trust_manager.get_trust_record(action_pattern)
trust_before = trust_record.score if trust_record else 0
playbook_updated = False
if feedback.feedback_type == FeedbackType.HUMAN_APPROVE:
# 人工批准: 信任 +1
self._trust_manager.record_approval(
action_pattern=action_pattern,
user_role="human",
user_id=feedback.submitted_by,
)
elif feedback.feedback_type == FeedbackType.HUMAN_REJECT:
# 人工拒絕: 信任歸零
self._trust_manager.record_rejection(
action_pattern=action_pattern,
user_role="human",
user_id=feedback.submitted_by,
reason="Human rejected",
)
elif feedback.feedback_type == FeedbackType.EFFECTIVENESS_RATING:
# 有效性評分
if feedback.effectiveness_score is not None:
if feedback.effectiveness_score >= 4:
# 高評分: 增加信任
self._trust_manager.record_approval(
action_pattern=action_pattern,
user_role="feedback",
user_id=feedback.submitted_by,
)
playbook_updated = await self._promote_playbook(feedback.incident_id)
elif feedback.effectiveness_score <= 2:
# 低評分: 降低信任
self._trust_manager.record_rejection(
action_pattern=action_pattern,
user_role="feedback",
user_id=feedback.submitted_by,
reason=f"Low effectiveness score: {feedback.effectiveness_score}",
)
playbook_updated = await self._demote_playbook(feedback.incident_id)
trust_record = self._trust_manager.get_trust_record(action_pattern)
trust_after = trust_record.score if trust_record else 0
record = LearningRecord(
incident_id=feedback.incident_id,
feedback_type=feedback.feedback_type,
action_pattern=action_pattern,
trust_before=trust_before,
trust_after=trust_after,
playbook_updated=playbook_updated,
)
logger.info(
"human_feedback_processed",
incident_id=feedback.incident_id,
feedback_type=feedback.feedback_type.value,
effectiveness_score=feedback.effectiveness_score,
trust_change=f"{trust_before}{trust_after}",
)
return record
# =========================================================================
# Private Methods
# =========================================================================
def _extract_action_pattern(self, action: str) -> str:
"""從 action 字串提取 pattern"""
if not action:
return "unknown"
parts = action.split()
if len(parts) < 3:
return "unknown"
verb = parts[1] if len(parts) > 1 else "unknown"
resource_part = parts[2] if len(parts) > 2 else ""
if "/" in resource_part:
resource_name = resource_part.split("/")[-1]
else:
resource_name = resource_part
# 移除 pod hash suffix
resource_parts = resource_name.split("-")
if len(resource_parts) >= 3:
resource_name = "-".join(resource_parts[:-2]) + "-*"
return f"{verb}:{resource_name}"
async def _update_playbook_stats(
self,
playbook_id: str,
success: bool,
) -> None:
"""更新 Playbook 統計"""
try:
from src.services.playbook_service import get_playbook_service
service = get_playbook_service()
await service.record_execution(playbook_id, success)
except Exception as e:
logger.warning(
"playbook_stats_update_error",
playbook_id=playbook_id,
error=str(e),
)
async def _try_extract_playbook(
self,
incident_id: str,
action: str,
) -> str | None:
"""嘗試從成功案例萃取 Playbook"""
try:
from src.repositories.incident_repository import get_incident_repository
from src.services.playbook_service import get_playbook_service
# 取得 Incident
repo = get_incident_repository()
incident = await repo.get_by_id(incident_id)
if not incident:
return None
# 確認狀態為 RESOLVED
if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
return None
# 萃取 Playbook
service = get_playbook_service()
playbook = await service.extract_from_incident(
incident=incident,
auto_approve=False, # 需人工審核
)
if playbook:
logger.info(
"playbook_auto_extracted",
incident_id=incident_id,
playbook_id=playbook.playbook_id,
)
return playbook.playbook_id
return None
except Exception as e:
logger.warning(
"playbook_extraction_error",
incident_id=incident_id,
error=str(e),
)
return None
async def _promote_playbook(self, incident_id: str) -> bool:
"""
提升 Playbook 信心度 (高評分)
2026-03-30 Claude Code: 實作信心度提升邏輯
邏輯:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 提升 ai_confidence +0.1 (上限 1.0)
- 若信心度 >= 0.9 且 status == DRAFT → 自動升級為 APPROVED
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
repo = get_playbook_repository()
playbooks = await repo.find_by_source_incident(incident_id)
if not playbooks:
logger.debug(
"playbook_promote_no_match",
incident_id=incident_id,
)
return False
# 信心度提升參數
CONFIDENCE_BOOST = 0.1
updated_count = 0
for playbook in playbooks:
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_BOOST,
reason=f"High effectiveness rating from incident {incident_id}",
)
if result:
updated_count += 1
logger.info(
"playbook_promoted",
incident_id=incident_id,
updated_count=updated_count,
total_playbooks=len(playbooks),
)
return updated_count > 0
except Exception as e:
logger.warning(
"playbook_promote_failed",
incident_id=incident_id,
error=str(e),
)
return False
async def _demote_playbook(self, incident_id: str) -> bool:
"""
降低 Playbook 信心度 (低評分)
2026-03-30 Claude Code: 實作信心度降低邏輯
邏輯:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 降低 ai_confidence -0.15 (下限 0.0)
- 若信心度 < 0.3 且 failure_rate > 50% → 自動降級為 DEPRECATED
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
repo = get_playbook_repository()
playbooks = await repo.find_by_source_incident(incident_id)
if not playbooks:
logger.debug(
"playbook_demote_no_match",
incident_id=incident_id,
)
return False
# 信心度降低參數 (懲罰比獎勵更重,避免低品質 Playbook 累積)
CONFIDENCE_PENALTY = -0.15
updated_count = 0
for playbook in playbooks:
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_PENALTY,
reason=f"Low effectiveness rating from incident {incident_id}",
)
if result:
updated_count += 1
logger.info(
"playbook_demoted",
incident_id=incident_id,
updated_count=updated_count,
total_playbooks=len(playbooks),
)
return updated_count > 0
except Exception as e:
logger.warning(
"playbook_demote_failed",
incident_id=incident_id,
error=str(e),
)
return False
# =========================================================================
# 🆕 Phase D-G P0 修正: 新增方法
# =========================================================================
async def record_repair_result(
self,
anomaly_key: str,
repair_action: str,
success: bool,
root_cause: str | None = None,
fix_description: str | None = None,
execution_time_seconds: float | None = None,
) -> bool:
"""
記錄修復結果到 Repository (Redis 持久化)
2026-03-29 P0 修正: 透過 Repository 存取 Redis
Args:
anomaly_key: 異常 key
repair_action: 修復動作
success: 是否成功
root_cause: 根因 (如果找到)
fix_description: 修復說明
execution_time_seconds: 執行時間
Returns:
bool: 是否成功記錄
"""
return await self._repository.record_repair(
anomaly_key=anomaly_key,
repair_action=repair_action,
success=success,
root_cause=root_cause,
fix_description=fix_description,
execution_time_seconds=execution_time_seconds,
)
async def get_recommended_fix(self, anomaly_key: str) -> dict:
"""
根據歷史學習,推薦最佳修復方案
2026-03-29 P0 修正: 使用 Repository 取得統計
Returns:
{
'action': 'scale_up',
'confidence': 0.85,
'tier': 2,
'based_on': '12 次歷史數據',
'avg_execution_time': 45.2,
'alternatives': [...]
}
"""
import math
all_stats = await self._repository.get_all_repair_stats(anomaly_key)
if not all_stats:
return self._default_recommendation()
# 計算各動作的加權分數
scored_actions = []
for action, stats in all_stats.items():
if stats["total"] >= self.MIN_SAMPLES:
success_rate = stats["success_rate"]
if success_rate >= self.SUCCESS_RATE_THRESHOLD:
# 加權: 成功率 * log(樣本數)
score = success_rate * math.log(stats["total"] + 1)
# 取得平均執行時間
history = await self._repository.get_repair_history(
anomaly_key, action, limit=20
)
times = [
h["execution_time"]
for h in history
if h.get("execution_time")
]
avg_time = sum(times) / len(times) if times else 0.0
scored_actions.append({
"action": action,
"score": score,
"success_rate": success_rate,
"total_samples": stats["total"],
"tier": self._get_action_tier(action),
"avg_execution_time": avg_time,
})
if not scored_actions:
return self._default_recommendation()
# 排序: 優先高成功率,其次低 Tier
scored_actions.sort(key=lambda x: (-x["score"], x["tier"]))
best = scored_actions[0]
alternatives = scored_actions[1:3] if len(scored_actions) > 1 else []
return {
"action": best["action"],
"confidence": 0.0, # 🔴 歷史學習不是 AI 分析,信心度設 0
"tier": best["tier"],
"based_on": f"{best['total_samples']} 次歷史數據",
"avg_execution_time": best["avg_execution_time"],
"success_rate": best["success_rate"], # 保留原始成功率作為參考
"alternatives": [
{"action": a["action"], "confidence": 0.0, "success_rate": a["success_rate"], "tier": a["tier"]}
for a in alternatives
],
}
async def get_learning_summary(self, anomaly_key: str) -> dict:
"""
取得學習摘要
Phase 22 P2: 業務邏輯移至 Service 層
2026-03-31 Claude Code (首席架構師技術債修復)
邏輯:
- 從 Repository 取得原始統計數據
- 在 Service 層計算 best_action 和 learning_status
Returns:
{
'anomaly_key': 'abc123',
'total_repair_attempts': 8,
'overall_success_rate': 0.625,
'actions_tried': ['restart_pod', 'scale_up'],
'best_action': {'action': 'scale_up', 'success_rate': 0.75},
'learning_status': 'sufficient',
}
"""
# 從 Repository 取得原始統計
all_stats = await self._repository.get_all_repair_stats(anomaly_key)
if not all_stats:
return {
"anomaly_key": anomaly_key,
"total_repair_attempts": 0,
"overall_success_rate": 0.0,
"actions_tried": [],
"best_action": None,
"learning_status": "insufficient",
}
# === 以下為業務邏輯,應在 Service 層 ===
total_attempts = sum(s["total"] for s in all_stats.values())
total_success = sum(s["success"] for s in all_stats.values())
overall_rate = total_success / total_attempts if total_attempts > 0 else 0.0
# 找出最佳動作 (需要至少 3 次數據)
best_action = None
best_rate = 0.0
for action, stats in all_stats.items():
if stats["total"] >= 3 and stats["success_rate"] > best_rate:
best_rate = stats["success_rate"]
best_action = {"action": action, "success_rate": best_rate}
# 判斷學習狀態
if total_attempts < 3:
learning_status = "insufficient"
elif total_attempts < 10:
learning_status = "learning"
elif overall_rate >= 0.8:
learning_status = "excellent"
else:
learning_status = "sufficient"
return {
"anomaly_key": anomaly_key,
"total_repair_attempts": total_attempts,
"overall_success_rate": overall_rate,
"actions_tried": list(all_stats.keys()),
"best_action": best_action,
"learning_status": learning_status,
}
def _get_action_tier(self, action: str) -> int:
"""取得動作的 Tier"""
tier_actions = {
1: ["restart_pod", "restart_container", "delete_pod"],
2: ["scale_up", "increase_memory", "increase_cpu", "adjust_limits"],
3: ["apply_hotfix", "update_config", "patch_deployment", "rollback"],
4: ["create_issue", "notify_team", "schedule_fix", "manual_intervention"],
}
for tier, actions in tier_actions.items():
if action in actions:
return tier
return 1 # 預設 Tier 1
def _default_recommendation(self) -> dict:
"""預設推薦 (無歷史數據時)"""
return {
"action": "restart_pod",
"confidence": 0.0, # 🔴 預設推薦不是 AI 分析,信心度設 0
"tier": 1,
"based_on": "無歷史數據,使用預設",
"avg_execution_time": 30.0,
"alternatives": [
{"action": "delete_pod", "confidence": 0.0, "tier": 1},
],
}
# =============================================================================
# Singleton
# =============================================================================
_learning_service: LearningService | None = None
def get_learning_service() -> LearningService:
"""取得學習服務 singleton"""
global _learning_service
if _learning_service is None:
_learning_service = LearningService()
return _learning_service