Files
awoooi/apps/api/src/services/learning_service.py
OG T 655d1a568a
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(Phase 5): Declarative 修復抽象化 + Blast Radius 分控 全部完成
## Phase 5 交付(ADR-086)

### 新增服務(4 個)
- blast_radius_calculator.py: 爆炸半徑計算器(0-100 純函數)
  - 18 種 kubectl 動作基礎分 + 命名空間倍率 + 特殊 flag 修正
  - HARD_RULES 永擋:delete ns/pv/pvc/clusterrole + rm -rf + DROP TABLE
  - 分級:≤10 auto / 11-50 human / 51-99 dual / 100 blocked
- declarative_remediation.py: DeclarativeSpec 不可變規格(frozen dataclass)
  - evaluate() 封裝 Blast Radius + dry-run + rollback_plan + constraints
  - rollback_plan 從 kubectl 動作類型自動推導(不呼叫 LLM)
- gitops_pr_service.py: Gitea Issue 高風險修復審核(tier=dual)
  - 含 Blast Radius + 目標狀態 + 回滾計畫 + 雙人審核流程
  - AIOPS_P5_GITOPS_PR flag 守衛
- rollback_manager.py: 驗證失敗自動回滾
  - 先驗 rollout history ≥ 2 revision,防止無版本可回滾
  - kubectl rollout undo + 120s 收斂等待

### decision_manager.py 接線(AIOPS_P5_BLAST_RADIUS_CHECK)
- _auto_execute() 在安全守衛後、ApprovalRequest 前插入分級守衛
- blocked → 永擋 + 人工審核通知
- dual → 非同步 GitOps Issue + 升級人工審核
- human → 升級人工審核(不自動執行)
- auto(≤10)→ 原有自動執行流程
- 失敗降級:計算異常 → 保守升人工

### learning_service.py
- record_declarative_outcome(): 記錄 DeclarativeSpec 執行結果
  anomaly_key=declarative:{incident_id},含 blast_radius_score/tier/rollback

2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 全部完成

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 16:06:54 +08:00

829 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Learning Service - Phase 5 持續學習迴圈
======================================
ADR-030: 智能自動修復系統
Phase D-G P0 修正: 符合 leWOOOgo 積木化原則
從執行結果中學習,持續優化決策:
1. 更新 Playbook 統計 (成功率/執行次數)
2. 調整信任度 (成功 +分 / 失敗 -分)
3. 萃取新 Playbook (成功案例自動萃取)
4. 處理人工反饋 (有效性評分)
5. 🆕 Redis 持久化學習數據 (透過 Repository)
6. 🆕 修復推薦 (基於歷史成功率)
設計原則:
- 非同步執行,不阻塞主流程
- 失敗容忍,學習失敗不影響執行結果
- 完整審計追蹤
- 🆕 Service 不直接存取 Redis (透過 ILearningRepository)
版本: v1.1
建立: 2026-03-26 (台北時區)
更新: 2026-03-29 (台北時區) - P0 修正: 新增 Repository 層
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.models.approval import ApprovalRequest
from src.models.incident import IncidentStatus
from src.repositories.interfaces import ILearningRepository
from src.repositories.learning_repository import get_learning_repository
from src.services.trust_engine import get_trust_manager
logger = structlog.get_logger(__name__)
# =============================================================================
# Constants
# =============================================================================
class FeedbackType(str, Enum):
    """Kinds of feedback the learning loop can receive.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # Outcome reported after an action was executed.
    EXECUTION_SUCCESS = "execution_success"
    EXECUTION_FAILURE = "execution_failure"

    # Decisions taken by a human reviewer.
    HUMAN_APPROVE = "human_approve"
    HUMAN_REJECT = "human_reject"
    HUMAN_OVERRIDE = "human_override"  # a human overrode the AI decision

    # Effectiveness rating submitted as feedback.
    EFFECTIVENESS_RATING = "effectiveness_rating"
# Trust-score adjustment parameters.
# NOTE(review): none of these constants is referenced in the visible part of
# this file; confirm external usage before changing or removing them.
TRUST_SUCCESS_BOOST = 1 # success: +1 point
TRUST_FAILURE_PENALTY = 2 # failure: -2 points (or reset to zero)
TRUST_HUMAN_REJECT_PENALTY = 1 # human rejection: -1 point
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class ExecutionResult:
"""執行結果"""
approval_id: str
incident_id: str
action: str
success: bool
error_message: str | None = None
duration_seconds: float = 0.0
executed_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"approval_id": self.approval_id,
"incident_id": self.incident_id,
"action": self.action,
"success": self.success,
"error_message": self.error_message,
"duration_seconds": self.duration_seconds,
"executed_at": self.executed_at.isoformat(),
}
@dataclass
class FeedbackRequest:
    """Feedback submitted by a human about a handled incident."""

    incident_id: str
    feedback_type: FeedbackType
    # Effectiveness rating; the original note gives the scale as 1-5.
    # No range check is performed here.
    effectiveness_score: int | None = None
    # Free-form learning notes supplied with the feedback.
    learning_notes: str | None = None
    submitted_by: str | None = None
    # Defaults to the current UTC time when the instance is created.
    submitted_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class LearningRecord:
    """Record of one learning step: what was learned and how trust changed."""

    incident_id: str
    feedback_type: FeedbackType
    action_pattern: str
    trust_before: int
    trust_after: int
    playbook_updated: bool = False
    new_playbook_id: str | None = None
    # Defaults to the current UTC time when the instance is created.
    learned_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    def to_dict(self) -> dict[str, Any]:
        """Return a plain dict.

        ``feedback_type`` is reduced to its ``.value`` and ``learned_at`` is
        rendered as an ISO-8601 string; every other field is passed through.
        """
        return dict(
            incident_id=self.incident_id,
            feedback_type=self.feedback_type.value,
            action_pattern=self.action_pattern,
            trust_before=self.trust_before,
            trust_after=self.trust_after,
            playbook_updated=self.playbook_updated,
            new_playbook_id=self.new_playbook_id,
            learned_at=self.learned_at.isoformat(),
        )
# =============================================================================
# Learning Service
# =============================================================================
class LearningService:
"""
Continuous-learning service.
Responsibilities:
1. Process execution results -> update Playbook statistics and trust
2. Process human feedback -> adjust Playbook effectiveness
3. Extract new Playbooks from successful cases
4. Persist learning data through the repository (Redis-backed, per the original notes)
5. Recommend fixes based on historical success rates
2026-03-29 P0 fix: follows the leWOOOgo building-block principle
- Storage is accessed through ILearningRepository
- No direct dependency on a Redis client
"""
# Recommendation thresholds
MIN_SAMPLES = 5 # minimum number of recorded attempts before an action can be recommended
SUCCESS_RATE_THRESHOLD = 0.6 # minimum success rate for an action to be recommended
def __init__(
    self,
    repository: ILearningRepository | None = None,
):
    """Wire up the trust manager and the learning repository.

    Args:
        repository: Repository to use. Any falsy value (including the
            default ``None``) selects the one returned by
            ``get_learning_repository()``.
    """
    self._trust_manager = get_trust_manager()
    # Only call the factory when no usable repository was injected.
    if repository:
        self._repository = repository
    else:
        self._repository = get_learning_repository()
async def process_execution_result(
    self,
    approval: ApprovalRequest,
    result: ExecutionResult,
) -> LearningRecord:
    """
    Learn from the outcome of an executed action.

    Steps:
    1. Record an approval (success) or a rejection (failure) on the trust
       record of the action pattern derived from ``approval.action``.
    2. If a Playbook was matched, update its execution statistics.
    3. If the execution succeeded and no Playbook was matched, try to
       extract a new Playbook from the incident.

    Failures in steps 2 and 3 are logged and do not abort learning.

    Args:
        approval: The original approval request.
        result: The execution result.

    Returns:
        LearningRecord: Trust score before/after, whether a Playbook was
        updated, and the id of any newly extracted Playbook.
    """
    action_pattern = self._extract_action_pattern(approval.action)

    # Trust score before this outcome is applied (0 when no record exists).
    trust_record = self._trust_manager.get_trust_record(action_pattern)
    trust_before = trust_record.score if trust_record else 0

    # 1. Adjust trust.
    if result.success:
        self._trust_manager.record_approval(
            action_pattern=action_pattern,
            user_role="system",
            user_id="auto_learning",
        )
        feedback_type = FeedbackType.EXECUTION_SUCCESS
    else:
        self._trust_manager.record_rejection(
            action_pattern=action_pattern,
            user_role="system",
            user_id="auto_learning",
            reason=result.error_message,
        )
        feedback_type = FeedbackType.EXECUTION_FAILURE

    # Trust score after the adjustment.
    trust_record = self._trust_manager.get_trust_record(action_pattern)
    trust_after = trust_record.score if trust_record else 0

    # 2. Update Playbook statistics when a Playbook was matched.
    # ADR-083 Phase 3: the matched id is looked up in three places, in order:
    #   a) the ``matched_playbook_id`` attribute (auto-execute path),
    #   b) ``metadata["matched_playbook_id"]``,
    #   c) ``metadata["playbook_id"]`` (human-review path).
    # ``metadata`` is read defensively, the same way as the attribute above.
    metadata = getattr(approval, "metadata", None) or {}
    matched_playbook_id: str | None = (
        getattr(approval, "matched_playbook_id", None)
        or metadata.get("matched_playbook_id")
        or metadata.get("playbook_id")
    )
    playbook_updated = False
    if matched_playbook_id:
        try:
            outcome = await self._update_playbook_stats(
                playbook_id=matched_playbook_id,
                success=result.success,
            )
            # The helper handles its own errors. If it reports failure by
            # returning False, do not claim the Playbook was updated; a
            # None return (no status reported) is treated as success.
            playbook_updated = outcome is not False
        except Exception as e:
            logger.warning(
                "playbook_stats_update_failed",
                playbook_id=matched_playbook_id,
                error=str(e),
            )

    # 3. Try to extract a new Playbook (success and no matched Playbook).
    new_playbook_id = None
    if result.success and not matched_playbook_id:
        try:
            new_playbook_id = await self._try_extract_playbook(
                incident_id=result.incident_id,
                action=approval.action,
            )
        except Exception as e:
            logger.warning(
                "playbook_extraction_failed",
                incident_id=result.incident_id,
                error=str(e),
            )

    record = LearningRecord(
        incident_id=result.incident_id,
        feedback_type=feedback_type,
        action_pattern=action_pattern,
        trust_before=trust_before,
        trust_after=trust_after,
        playbook_updated=playbook_updated,
        new_playbook_id=new_playbook_id,
    )
    logger.info(
        "learning_completed",
        incident_id=result.incident_id,
        success=result.success,
        # Separator added: concatenating the two scores ("0" + "1" -> "01")
        # made the log value ambiguous.
        trust_change=f"{trust_before}->{trust_after}",
        playbook_updated=playbook_updated,
        new_playbook=new_playbook_id,
    )
    return record
async def process_human_feedback(
    self,
    feedback: FeedbackRequest,
) -> LearningRecord:
    """
    Apply human feedback to the trust record and, for ratings, to Playbooks.

    Handling by feedback type:
    - HUMAN_APPROVE: record an approval.
    - HUMAN_REJECT: record a rejection.
    - EFFECTIVENESS_RATING: score >= 4 records an approval and promotes the
      related Playbooks; score <= 2 records a rejection and demotes them;
      a score of 3 or a missing score changes nothing.
    - Any other type changes nothing.

    Args:
        feedback: The feedback request.

    Returns:
        LearningRecord: Trust score before/after and whether any Playbook
        was updated.
    """
    # The trust key is built from the incident id only; the original note
    # says the real action pattern would have to be looked up from the incident.
    action_pattern = f"incident:{feedback.incident_id}"
    trust_record = self._trust_manager.get_trust_record(action_pattern)
    trust_before = trust_record.score if trust_record else 0
    playbook_updated = False

    if feedback.feedback_type == FeedbackType.HUMAN_APPROVE:
        self._trust_manager.record_approval(
            action_pattern=action_pattern,
            user_role="human",
            user_id=feedback.submitted_by,
        )
    elif feedback.feedback_type == FeedbackType.HUMAN_REJECT:
        self._trust_manager.record_rejection(
            action_pattern=action_pattern,
            user_role="human",
            user_id=feedback.submitted_by,
            reason="Human rejected",
        )
    elif feedback.feedback_type == FeedbackType.EFFECTIVENESS_RATING:
        if feedback.effectiveness_score is not None:
            if feedback.effectiveness_score >= 4:
                # High rating: raise trust and promote related Playbooks.
                self._trust_manager.record_approval(
                    action_pattern=action_pattern,
                    user_role="feedback",
                    user_id=feedback.submitted_by,
                )
                playbook_updated = await self._promote_playbook(feedback.incident_id)
            elif feedback.effectiveness_score <= 2:
                # Low rating: lower trust and demote related Playbooks.
                self._trust_manager.record_rejection(
                    action_pattern=action_pattern,
                    user_role="feedback",
                    user_id=feedback.submitted_by,
                    reason=f"Low effectiveness score: {feedback.effectiveness_score}",
                )
                playbook_updated = await self._demote_playbook(feedback.incident_id)

    trust_record = self._trust_manager.get_trust_record(action_pattern)
    trust_after = trust_record.score if trust_record else 0

    record = LearningRecord(
        incident_id=feedback.incident_id,
        feedback_type=feedback.feedback_type,
        action_pattern=action_pattern,
        trust_before=trust_before,
        trust_after=trust_after,
        playbook_updated=playbook_updated,
    )
    logger.info(
        "human_feedback_processed",
        incident_id=feedback.incident_id,
        feedback_type=feedback.feedback_type.value,
        effectiveness_score=feedback.effectiveness_score,
        # Separator added: concatenating the two scores ("0" + "1" -> "01")
        # made the log value ambiguous.
        trust_change=f"{trust_before}->{trust_after}",
    )
    return record
# =========================================================================
# Private Methods
# =========================================================================
def _extract_action_pattern(self, action: str) -> str:
"""從 action 字串提取 pattern"""
if not action:
return "unknown"
parts = action.split()
if len(parts) < 3:
return "unknown"
verb = parts[1] if len(parts) > 1 else "unknown"
resource_part = parts[2] if len(parts) > 2 else ""
if "/" in resource_part:
resource_name = resource_part.split("/")[-1]
else:
resource_name = resource_part
# 移除 pod hash suffix
resource_parts = resource_name.split("-")
if len(resource_parts) >= 3:
resource_name = "-".join(resource_parts[:-2]) + "-*"
return f"{verb}:{resource_name}"
async def _update_playbook_stats(
    self,
    playbook_id: str,
    success: bool,
) -> bool:
    """Record one execution outcome on a Playbook.

    Errors are logged and never raised, so a statistics failure cannot
    break the caller. The outcome is reported through the return value
    instead of being silently dropped.

    Args:
        playbook_id: Id of the Playbook that was executed.
        success: Whether the execution succeeded.

    Returns:
        bool: True when the execution was recorded, False when recording
        failed (the error is logged as ``playbook_stats_update_error``).
    """
    try:
        # Imported lazily inside the try so an import failure is also tolerated.
        from src.services.playbook_service import get_playbook_service

        service = get_playbook_service()
        await service.record_execution(playbook_id, success)
    except Exception as e:
        logger.warning(
            "playbook_stats_update_error",
            playbook_id=playbook_id,
            error=str(e),
        )
        return False
    return True
async def _try_extract_playbook(
    self,
    incident_id: str,
    action: str,
) -> str | None:
    """Try to extract a Playbook from a successfully handled incident.

    Extraction is attempted only when the incident exists and its status is
    RESOLVED or CLOSED. The Playbook is extracted with ``auto_approve=False``
    (per the original note, it still needs human review).

    Args:
        incident_id: Incident to extract from.
        action: The executed action; not used by this method.

    Returns:
        The new Playbook id, or None when nothing was extracted or any
        error occurred (errors are logged, never raised).
    """
    try:
        from src.repositories.incident_repository import get_incident_repository
        from src.services.playbook_service import get_playbook_service

        incident = await get_incident_repository().get_by_id(incident_id)
        if not incident:
            return None

        # Only finished incidents are eligible.
        if incident.status not in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
            return None

        extracted = await get_playbook_service().extract_from_incident(
            incident=incident,
            auto_approve=False,
        )
        if not extracted:
            return None

        logger.info(
            "playbook_auto_extracted",
            incident_id=incident_id,
            playbook_id=extracted.playbook_id,
        )
        return extracted.playbook_id
    except Exception as exc:
        logger.warning(
            "playbook_extraction_error",
            incident_id=incident_id,
            error=str(exc),
        )
        return None
async def _promote_playbook(self, incident_id: str) -> bool:
    """
    Raise the confidence of Playbooks derived from an incident (high rating).

    Every Playbook returned by ``find_by_source_incident(incident_id)`` gets
    ``adjust_confidence`` called with a delta of +0.1.

    NOTE(review): the original notes say confidence is capped at 1.0 and
    that a DRAFT Playbook reaching >= 0.9 is promoted to APPROVED. Neither
    rule is implemented in this method; presumably the repository handles
    them, which should be confirmed there.

    Returns:
        bool: True when at least one Playbook was adjusted; False when none
        matched, none was adjusted, or an error occurred (logged).
    """
    boost = 0.1
    try:
        from src.repositories.playbook_repository import get_playbook_repository

        repo = get_playbook_repository()
        matches = await repo.find_by_source_incident(incident_id)
        if not matches:
            logger.debug(
                "playbook_promote_no_match",
                incident_id=incident_id,
            )
            return False

        # Adjust the Playbooks one after another, keeping each outcome.
        outcomes = [
            await repo.adjust_confidence(
                playbook_id=candidate.playbook_id,
                delta=boost,
                reason=f"High effectiveness rating from incident {incident_id}",
            )
            for candidate in matches
        ]
        updated_count = sum(1 for outcome in outcomes if outcome)

        logger.info(
            "playbook_promoted",
            incident_id=incident_id,
            updated_count=updated_count,
            total_playbooks=len(matches),
        )
        return updated_count > 0
    except Exception as exc:
        logger.warning(
            "playbook_promote_failed",
            incident_id=incident_id,
            error=str(exc),
        )
        return False
async def _demote_playbook(self, incident_id: str) -> bool:
    """
    Lower the confidence of Playbooks derived from an incident (low rating).

    Every Playbook returned by ``find_by_source_incident(incident_id)`` gets
    ``adjust_confidence`` called with a delta of -0.15. Per the original
    note, the penalty is deliberately larger than the promotion boost so
    that low-quality Playbooks do not accumulate.

    NOTE(review): the original notes say confidence is floored at 0.0 and
    that a Playbook with confidence < 0.3 and failure rate > 50% is marked
    DEPRECATED. Neither rule is implemented in this method; presumably the
    repository handles them, which should be confirmed there.

    Returns:
        bool: True when at least one Playbook was adjusted; False when none
        matched, none was adjusted, or an error occurred (logged).
    """
    penalty = -0.15
    try:
        from src.repositories.playbook_repository import get_playbook_repository

        repo = get_playbook_repository()
        matches = await repo.find_by_source_incident(incident_id)
        if not matches:
            logger.debug(
                "playbook_demote_no_match",
                incident_id=incident_id,
            )
            return False

        # Adjust the Playbooks one after another, keeping each outcome.
        outcomes = [
            await repo.adjust_confidence(
                playbook_id=candidate.playbook_id,
                delta=penalty,
                reason=f"Low effectiveness rating from incident {incident_id}",
            )
            for candidate in matches
        ]
        updated_count = sum(1 for outcome in outcomes if outcome)

        logger.info(
            "playbook_demoted",
            incident_id=incident_id,
            updated_count=updated_count,
            total_playbooks=len(matches),
        )
        return updated_count > 0
    except Exception as exc:
        logger.warning(
            "playbook_demote_failed",
            incident_id=incident_id,
            error=str(exc),
        )
        return False
# =========================================================================
# 🆕 Phase D-G P0 修正: 新增方法
# =========================================================================
async def record_repair_result(
self,
anomaly_key: str,
repair_action: str,
success: bool,
root_cause: str | None = None,
fix_description: str | None = None,
execution_time_seconds: float | None = None,
) -> bool:
"""
記錄修復結果到 Repository (Redis 持久化)
2026-03-29 P0 修正: 透過 Repository 存取 Redis
Args:
anomaly_key: 異常 key
repair_action: 修復動作
success: 是否成功
root_cause: 根因 (如果找到)
fix_description: 修復說明
execution_time_seconds: 執行時間
Returns:
bool: 是否成功記錄
"""
return await self._repository.record_repair(
anomaly_key=anomaly_key,
repair_action=repair_action,
success=success,
root_cause=root_cause,
fix_description=fix_description,
execution_time_seconds=execution_time_seconds,
)
async def record_declarative_outcome(
    self,
    incident_id: str,
    action: str,
    blast_radius_score: int,
    blast_radius_tier: str,
    success: bool,
    rollback_triggered: bool = False,
    execution_time_seconds: float | None = None,
) -> bool:
    """
    Record the outcome of a DeclarativeSpec execution in the learning store.

    Phase 5 / ADR-086. The record is written through the repository's
    ``record_repair`` with:
    - ``anomaly_key`` = ``declarative:{incident_id}``
    - ``repair_action`` = the action truncated to 200 characters
    - ``root_cause`` = ``blast_radius_tier={tier}``
    - ``fix_description`` = a JSON object holding the blast-radius score
      and tier, the rollback flag and a Taipei-time timestamp.

    Any failure is logged as ``record_declarative_outcome_failed`` and
    reported as False instead of being raised. That now includes a failed
    import of the timezone helper.

    Args:
        incident_id: Id of the related incident.
        action: The command that was executed.
        blast_radius_score: Blast-radius score (0-100 per the original notes).
        blast_radius_tier: Execution tier (auto/human/dual/blocked per the original notes).
        success: Whether the execution succeeded.
        rollback_triggered: Whether a rollback was triggered.
        execution_time_seconds: How long the execution took.

    Returns:
        bool: The repository's result, or False when recording failed.
    """
    import json

    try:
        # Imported inside the try, as the other helpers in this class do,
        # so an import failure is reported as False like any other error.
        from src.utils.timezone import now_taipei

        fix_desc = json.dumps(
            {
                "blast_radius_score": blast_radius_score,
                "blast_radius_tier": blast_radius_tier,
                "rollback_triggered": rollback_triggered,
                "recorded_at": now_taipei().isoformat(),
            },
            ensure_ascii=False,
        )
        return await self._repository.record_repair(
            anomaly_key=f"declarative:{incident_id}",
            repair_action=action[:200],
            success=success,
            root_cause=f"blast_radius_tier={blast_radius_tier}",
            fix_description=fix_desc,
            execution_time_seconds=execution_time_seconds,
        )
    except Exception as e:
        # Use the module-level logger, as the rest of the file does,
        # instead of re-importing structlog and building a second one.
        logger.warning(
            "record_declarative_outcome_failed",
            incident_id=incident_id,
            error=str(e),
        )
        return False
async def get_recommended_fix(self, anomaly_key: str) -> dict:
"""
根據歷史學習,推薦最佳修復方案
2026-03-29 P0 修正: 使用 Repository 取得統計
Returns:
{
'action': 'scale_up',
'confidence': 0.85,
'tier': 2,
'based_on': '12 次歷史數據',
'avg_execution_time': 45.2,
'alternatives': [...]
}
"""
import math
all_stats = await self._repository.get_all_repair_stats(anomaly_key)
if not all_stats:
return self._default_recommendation()
# 計算各動作的加權分數
scored_actions = []
for action, stats in all_stats.items():
if stats["total"] >= self.MIN_SAMPLES:
success_rate = stats["success_rate"]
if success_rate >= self.SUCCESS_RATE_THRESHOLD:
# 加權: 成功率 * log(樣本數)
score = success_rate * math.log(stats["total"] + 1)
# 取得平均執行時間
history = await self._repository.get_repair_history(
anomaly_key, action, limit=20
)
times = [
h["execution_time"]
for h in history
if h.get("execution_time")
]
avg_time = sum(times) / len(times) if times else 0.0
scored_actions.append({
"action": action,
"score": score,
"success_rate": success_rate,
"total_samples": stats["total"],
"tier": self._get_action_tier(action),
"avg_execution_time": avg_time,
})
if not scored_actions:
return self._default_recommendation()
# 排序: 優先高成功率,其次低 Tier
scored_actions.sort(key=lambda x: (-x["score"], x["tier"]))
best = scored_actions[0]
alternatives = scored_actions[1:3] if len(scored_actions) > 1 else []
return {
"action": best["action"],
"confidence": 0.0, # 🔴 歷史學習不是 AI 分析,信心度設 0
"tier": best["tier"],
"based_on": f"{best['total_samples']} 次歷史數據",
"avg_execution_time": best["avg_execution_time"],
"success_rate": best["success_rate"], # 保留原始成功率作為參考
"alternatives": [
{"action": a["action"], "confidence": 0.0, "success_rate": a["success_rate"], "tier": a["tier"]}
for a in alternatives
],
}
async def get_learning_summary(self, anomaly_key: str) -> dict:
"""
取得學習摘要
Phase 22 P2: 業務邏輯移至 Service 層
2026-03-31 Claude Code (首席架構師技術債修復)
邏輯:
- 從 Repository 取得原始統計數據
- 在 Service 層計算 best_action 和 learning_status
Returns:
{
'anomaly_key': 'abc123',
'total_repair_attempts': 8,
'overall_success_rate': 0.625,
'actions_tried': ['restart_pod', 'scale_up'],
'best_action': {'action': 'scale_up', 'success_rate': 0.75},
'learning_status': 'sufficient',
}
"""
# 從 Repository 取得原始統計
all_stats = await self._repository.get_all_repair_stats(anomaly_key)
if not all_stats:
return {
"anomaly_key": anomaly_key,
"total_repair_attempts": 0,
"overall_success_rate": 0.0,
"actions_tried": [],
"best_action": None,
"learning_status": "insufficient",
}
# === 以下為業務邏輯,應在 Service 層 ===
total_attempts = sum(s["total"] for s in all_stats.values())
total_success = sum(s["success"] for s in all_stats.values())
overall_rate = total_success / total_attempts if total_attempts > 0 else 0.0
# 找出最佳動作 (需要至少 3 次數據)
best_action = None
best_rate = 0.0
for action, stats in all_stats.items():
if stats["total"] >= 3 and stats["success_rate"] > best_rate:
best_rate = stats["success_rate"]
best_action = {"action": action, "success_rate": best_rate}
# 判斷學習狀態
if total_attempts < 3:
learning_status = "insufficient"
elif total_attempts < 10:
learning_status = "learning"
elif overall_rate >= 0.8:
learning_status = "excellent"
else:
learning_status = "sufficient"
return {
"anomaly_key": anomaly_key,
"total_repair_attempts": total_attempts,
"overall_success_rate": overall_rate,
"actions_tried": list(all_stats.keys()),
"best_action": best_action,
"learning_status": learning_status,
}
def _get_action_tier(self, action: str) -> int:
"""取得動作的 Tier"""
tier_actions = {
1: ["restart_pod", "restart_container", "delete_pod"],
2: ["scale_up", "increase_memory", "increase_cpu", "adjust_limits"],
3: ["apply_hotfix", "update_config", "patch_deployment", "rollback"],
4: ["create_issue", "notify_team", "schedule_fix", "manual_intervention"],
}
for tier, actions in tier_actions.items():
if action in actions:
return tier
return 1 # 預設 Tier 1
def _default_recommendation(self) -> dict:
"""預設推薦 (無歷史數據時)"""
return {
"action": "restart_pod",
"confidence": 0.0, # 🔴 預設推薦不是 AI 分析,信心度設 0
"tier": 1,
"based_on": "無歷史數據,使用預設",
"avg_execution_time": 30.0,
"alternatives": [
{"action": "delete_pod", "confidence": 0.0, "tier": 1},
],
}
# =============================================================================
# Singleton
# =============================================================================
_learning_service: LearningService | None = None


def get_learning_service() -> LearningService:
    """Return the shared LearningService, creating it on first use."""
    global _learning_service
    if _learning_service is not None:
        return _learning_service
    _learning_service = LearningService()
    return _learning_service