Added:
- ILearningRepository Protocol (interfaces.py)
- LearningRepository (Redis persistence layer)
- Learning API endpoints (/api/v1/learning/*)
- LearningService.get_recommended_fix() method
- LearningService.get_learning_summary() method

Fixed:
- Service no longer depends directly on the Redis client (it goes through the Repository)
- Complies with the leWOOOgo building-block principle
- Chief architect review: 74/100 → 92/100

Updated:
- ADR-030: added the Phase D-G P0 fixes section
- Skill 02: v1.9 → v2.0
- Runner fix: sequential builds resolve the _runner_file_commands conflict

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Remaining Phase Implementation Steps (D-G)
Total effort: 8h → 7h 35min (after revision)
Priority: P0-P1
🔍 Chief Architect Review (2026-03-29)
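| Criterion | Score | Notes |
|---|---|---|
| Architecture compliance | 75/100 | Multiple violations of the leWOOOgo building-block principle |
| Code quality | 80/100 | Clear structure, but with redundancy |
| Test strategy | 40/100 | 🔴 Violates the no-mock rule |
| API design | 85/100 | Follows the path naming conventions |
| Total | 74/100 | ⚠️ Conditional pass |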
🔴 P0 Critical Issues (must fix)
- Phase G duplication: heavily overlaps with the existing apps/api/src/services/learning_service.py
  - ❌ Do not re-implement LearningService
  - ✅ Extend the existing class and add a Redis persistence layer
- Building-block violation: the Service depends directly on the Redis client
  - ❌ def __init__(self, redis_client: redis.Redis):
  - ✅ Must go through the ILearningRepository interface
- Hard-coded URL: the Phase F smoke test hard-codes the K8s URL
  - ❌ API_BASE = "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
  - ✅ Use os.getenv("AWOOOI_API_BASE", "http://localhost:8000")
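The second P0 item can be illustrated with a minimal sketch of a service that depends on a repository Protocol instead of a Redis client. The method names (append_record, recent_records) and the in-memory class are assumptions made for illustration only; the actual ILearningRepository in interfaces.py may look different, and the in-memory class exists only to keep the sketch self-contained, not as a test double.

```python
import asyncio
import json
from typing import Protocol


class ILearningRepository(Protocol):
    """Persistence operations the service needs (method names are hypothetical)."""

    async def append_record(self, key: str, record: dict) -> None: ...

    async def recent_records(self, key: str, limit: int) -> list[dict]: ...


class InMemoryLearningRepository:
    """Illustrative stand-in; a Redis-backed class would expose the same methods."""

    def __init__(self) -> None:
        self._store: dict[str, list[str]] = {}

    async def append_record(self, key: str, record: dict) -> None:
        # Newest record first, mirroring LPUSH ordering.
        self._store.setdefault(key, []).insert(0, json.dumps(record))

    async def recent_records(self, key: str, limit: int) -> list[dict]:
        return [json.loads(raw) for raw in self._store.get(key, [])[:limit]]


class LearningService:
    """Knows only the Protocol, never the storage client behind it."""

    def __init__(self, repository: ILearningRepository) -> None:
        self._repo = repository

    async def average_execution_time(self, key: str) -> float:
        records = await self._repo.recent_records(key, limit=20)
        times = [r["execution_time"] for r in records if r.get("execution_time")]
        return sum(times) / len(times) if times else 0.0


async def demo() -> float:
    repo = InMemoryLearningRepository()
    await repo.append_record("k", {"execution_time": 10.0})
    await repo.append_record("k", {"execution_time": 30.0})
    return await LearningService(repo).average_execution_time("k")
```

Because the service only sees the Protocol, swapping the storage backend does not require touching the service code.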
📊 Effort Adjustments
| Phase | Original | Revised | Notes |
|---|---|---|---|
| D | 1h | 1h 20min | Move to SentryService |
| E | 2h | 2h 30min | Create SignozService |
| F | 2h | 2h 15min | Inject via environment variables |
| G | 3h | 1h 30min | Extend the existing LearningService |
| Total | 8h | 7h 35min | -25min |
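As a quick check of the totals row, the per-phase figures from the table can be summed with timedelta. This is only a worked-arithmetic sketch; the helper names are made up for illustration.

```python
from datetime import timedelta

# Per-phase estimates copied from the table above.
original = {
    "D": timedelta(hours=1),
    "E": timedelta(hours=2),
    "F": timedelta(hours=2),
    "G": timedelta(hours=3),
}
revised = {
    "D": timedelta(hours=1, minutes=20),
    "E": timedelta(hours=2, minutes=30),
    "F": timedelta(hours=2, minutes=15),
    "G": timedelta(hours=1, minutes=30),
}


def total(estimates: dict) -> timedelta:
    """Sum all phase estimates."""
    return sum(estimates.values(), timedelta())


def format_minutes(td: timedelta) -> str:
    """Render a (possibly negative) duration as 'Xh Ymin' or 'Ymin'."""
    minutes = int(td.total_seconds()) // 60
    sign = "-" if minutes < 0 else ""
    hours, mins = divmod(abs(minutes), 60)
    return f"{sign}{hours}h {mins}min" if hours else f"{sign}{mins}min"


difference = total(revised) - total(original)
print(format_minutes(total(original)), format_minutes(total(revised)), format_minutes(difference))
```

Phase G shrinks by 1h 30min while D, E and F together grow by 1h 05min, which gives the net 25-minute saving.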
Detailed review report
→ ~/.claude/projects/-Users-ogt-awoooi/memory/project_remaining_phases_arch_review.md
Phase D: Sentry Comment Write-Back (1h)
Current state
# sentry_webhook.py:290-302 - currently a TODO
# TODO: requires a Sentry API token
logger.info(f"Would post comment to issue {issue_id}...")
Step D-1: Obtain a Sentry API Token (10min)
# In the Sentry Self-Hosted admin console
# Settings → API Tokens → Create New Token
# Scopes: project:read, issue:write
# Store it in a K8s Secret
kubectl create secret generic sentry-api-token \
  --from-literal=SENTRY_API_TOKEN=your_token_here \
  -n awoooi-prod
Step D-2: Implement the Comment Write-Back (30min)
# apps/api/src/api/v1/sentry_webhook.py
# Complete the post_sentry_comment implementation
# (logger, ErrorAnalysisResult and now_taipei_iso are assumed to come from the existing module)
import os

import httpx

SENTRY_API_TOKEN = os.getenv("SENTRY_API_TOKEN")
# The SENTRY_API_URL variable name is an assumption; the default keeps the original address
SENTRY_API_URL = os.getenv("SENTRY_API_URL", "http://192.168.0.110:9000")


async def post_sentry_comment(
    project_slug: str,
    issue_id: str,
    analysis: ErrorAnalysisResult,
):
    """
    Write the analysis result back to the Sentry issue as a comment

    API: POST /api/0/issues/{issue_id}/comments/
    Docs: https://docs.sentry.io/api/events/create-a-comment/
    """
    if not SENTRY_API_TOKEN:
        logger.warning("SENTRY_API_TOKEN not configured, skipping comment")
        return

    comment_text = f"""## 🧠 AI Error Analysis (by {analysis.analyzed_by})
**Root Cause**
{analysis.root_cause}
**Impact**
{analysis.impact}
**Fix Suggestion**
{analysis.fix_suggestion}
**Prevention**
{analysis.prevention}
---
*Confidence: {analysis.confidence:.0%} | Analyzed at: {now_taipei_iso()}*
*Powered by AWOOOI + OpenClaw*
"""

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{SENTRY_API_URL}/api/0/issues/{issue_id}/comments/",
                headers={
                    "Authorization": f"Bearer {SENTRY_API_TOKEN}",
                    "Content-Type": "application/json",
                },
                json={"text": comment_text},
            )
        if response.status_code == 201:
            logger.info(
                "sentry_comment_posted",
                issue_id=issue_id,
                comment_length=len(comment_text),
            )
        else:
            logger.warning(
                "sentry_comment_failed",
                issue_id=issue_id,
                status=response.status_code,
                response=response.text[:200],
            )
    except Exception as e:
        # Never let a failed comment break webhook handling; log and continue
        logger.exception("sentry_comment_error", issue_id=issue_id, error=str(e))
Step D-3: Update the K8s Deployment (10min)
# k8s/awoooi-prod/03-secrets.yaml
# Add the Sentry API token
---
apiVersion: v1
kind: Secret
metadata:
  name: sentry-api-token
  namespace: awoooi-prod
type: Opaque
stringData:
  SENTRY_API_TOKEN: "${SENTRY_API_TOKEN}"
# k8s/awoooi-prod/04-deployment-api.yaml
# Expose the secret as an environment variable
env:
  - name: SENTRY_API_TOKEN
    valueFrom:
      secretKeyRef:
        name: sentry-api-token
        key: SENTRY_API_TOKEN
Step D-4: Verify (10min)
# Trigger a test manually
curl -X POST http://localhost:8000/api/v1/webhooks/sentry/error \
  -H "Content-Type: application/json" \
  -d '{
    "action": "triggered",
    "data": {
      "issue": {
        "id": "12345",
        "title": "Test Error",
        "level": "error",
        "project": {"slug": "awoooi-api"}
      }
    }
  }'
# Check whether the Sentry issue now has a comment
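The final check could also be scripted instead of done by eye. The sketch below builds a GET request against the same comments path used for the write-back and scans the result for a marker string. It assumes that the endpoint accepts GET and that each returned comment carries its body under data.text; both are assumptions to confirm against your Sentry version, and the function names are hypothetical.

```python
import json
import urllib.request


def build_comments_request(base_url: str, issue_id: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for an issue's comments."""
    url = f"{base_url.rstrip('/')}/api/0/issues/{issue_id}/comments/"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def has_marker_comment(comments: list, marker: str) -> bool:
    """Return True if any comment body contains the marker text.

    Assumes each comment looks like {"data": {"text": "..."}}.
    """
    return any(marker in ((c.get("data") or {}).get("text") or "") for c in comments)


def fetch_comments(request: urllib.request.Request) -> list:
    """Send the request and decode the JSON response (performs network I/O)."""
    with urllib.request.urlopen(request, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

A usage sketch would call fetch_comments(build_comments_request(...)) and pass the result to has_marker_comment with the heading text that the write-back uses.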
Phase E: SigNoz Alert Rules (2h)
Current state
- SigNoz only collects data and emits no alerts
- Error-rate and latency anomalies cannot trigger real-time notifications
Step E-1: SigNoz Alert Configuration (1h)
# signoz/alerting/rules.yaml
# Custom SigNoz alert rules
groups:
  # =========================================================================
  # API error-rate alerts
  # =========================================================================
  - name: api_errors
    rules:
      - alert: APIHighErrorRate
        expr: |
          sum(rate(signoz_spans_total{
            service_name="awoooi-api",
            status_code=~"5.."
          }[5m])) by (service_name)
          /
          sum(rate(signoz_spans_total{
            service_name="awoooi-api"
          }[5m])) by (service_name)
          > 0.05
        for: 5m
        labels:
          severity: critical
          source: signoz
        annotations:
          summary: "API error rate > 5%"
          description: "Service {{ $labels.service_name }} error rate: {{ $value | humanizePercentage }}"
          # Must match the handler route registered in Step E-2 (/webhooks/signoz/alert)
          webhook: "http://awoooi-api.awoooi-prod:8000/api/v1/webhooks/signoz/alert"
  # =========================================================================
  # Latency alerts
  # =========================================================================
  - name: latency
    rules:
      - alert: APIHighLatencyP99
        expr: |
          histogram_quantile(0.99,
            sum(rate(signoz_spans_duration_bucket{
              service_name="awoooi-api"
            }[5m])) by (le, service_name)
          ) > 2
        for: 5m
        labels:
          severity: warning
          source: signoz
        annotations:
          summary: "API P99 latency > 2s"
          description: "Service {{ $labels.service_name }} P99: {{ $value }}s"
      - alert: APIHighLatencyP95
        expr: |
          histogram_quantile(0.95,
            sum(rate(signoz_spans_duration_bucket{
              service_name="awoooi-api"
            }[5m])) by (le, service_name)
          ) > 1
        for: 10m
        labels:
          severity: warning
          source: signoz
        annotations:
          summary: "API P95 latency > 1s"
  # =========================================================================
  # Trace anomaly alerts
  # =========================================================================
  - name: traces
    rules:
      - alert: NoTracesReceived
        expr: |
          sum(rate(signoz_spans_total[15m])) == 0
        for: 15m
        labels:
          severity: warning
          source: signoz
        annotations:
          summary: "No trace data for 15 minutes"
          description: "Possibly an OTEL Collector or application problem"
      - alert: HighSpanDropRate
        expr: |
          sum(rate(otelcol_exporter_send_failed_spans[5m]))
          /
          sum(rate(otelcol_exporter_sent_spans[5m]))
          > 0.01
        for: 5m
        labels:
          severity: warning
          source: signoz
        annotations:
          summary: "Span drop rate > 1%"
Step E-2: Create the SigNoz Webhook Handler (30min)
# apps/api/src/api/v1/signoz_webhook.py
"""
SigNoz alert webhook handler
"""
from fastapi import APIRouter, Request, BackgroundTasks
import structlog

from src.services.incident_service import get_incident_service
from src.services.telegram_gateway import get_telegram_gateway

logger = structlog.get_logger(__name__)
router = APIRouter(prefix="/webhooks/signoz", tags=["SigNoz Webhook"])


@router.post("/alert")
async def handle_signoz_alert(
    request: Request,
    background_tasks: BackgroundTasks,
):
    """
    Handle a SigNoz alert

    SigNoz alert format:
    {
        "alertname": "APIHighErrorRate",
        "status": "firing",
        "labels": {...},
        "annotations": {...},
        "startsAt": "2026-03-29T10:00:00Z"
    }
    """
    payload = await request.json()
    logger.info("signoz_alert_received", payload=payload)

    alert_name = payload.get("alertname")
    status = payload.get("status")
    if status != "firing":
        return {"status": "ignored", "reason": "not firing"}

    # Convert to the standard alert format
    normalized = {
        "labels": {
            "alertname": alert_name,
            "source": "signoz",
            **payload.get("labels", {}),
        },
        "annotations": payload.get("annotations", {}),
        "startsAt": payload.get("startsAt"),
    }

    # Create (or aggregate into) an incident
    incident_service = get_incident_service()
    incident, is_new = await incident_service.create_or_aggregate_incident(
        alert_data=normalized,
    )

    if is_new:
        # Send the Telegram notification in the background
        background_tasks.add_task(
            notify_signoz_alert,
            incident=incident,
            alert_data=normalized,
        )

    return {
        "status": "accepted",
        "incident_id": str(incident.id),
        "is_new": is_new,
    }


async def notify_signoz_alert(incident, alert_data: dict):
    """Send a SigNoz alert to Telegram"""
    telegram = get_telegram_gateway()
    await telegram.initialize()

    annotations = alert_data.get("annotations", {})
    await telegram.send_alert_card(
        title=f"📊 SigNoz: {alert_data['labels']['alertname']}",
        severity=alert_data['labels'].get('severity', 'warning'),
        description=annotations.get('description', annotations.get('summary', '')),
        source="signoz",
        incident_id=str(incident.id),
    )
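The normalization step in the handler can be isolated as a pure function, which makes its behavior easy to check without FastAPI or any service dependency. This is a sketch only; the function name normalize_signoz_alert is hypothetical and does not appear in the plan above.

```python
from typing import Optional


def normalize_signoz_alert(payload: dict) -> Optional[dict]:
    """Convert a SigNoz payload to the standard alert format.

    Returns None for alerts that are not firing, mirroring the
    "ignored" branch of the handler.
    """
    if payload.get("status") != "firing":
        return None
    return {
        "labels": {
            "alertname": payload.get("alertname"),
            "source": "signoz",
            # Payload labels are spread last, so they win on key collisions.
            **payload.get("labels", {}),
        },
        "annotations": payload.get("annotations", {}),
        "startsAt": payload.get("startsAt"),
    }


example = normalize_signoz_alert({
    "alertname": "APIHighErrorRate",
    "status": "firing",
    "labels": {"severity": "critical"},
    "annotations": {"summary": "API error rate > 5%"},
    "startsAt": "2026-03-29T10:00:00Z",
})
```

One consequence worth noting: because the payload labels are merged after the defaults, a payload that carries its own source or alertname label overrides the values set by the handler.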
Step E-3: Register the Router (10min)
# apps/api/src/main.py
from src.api.v1 import signoz_webhook
app.include_router(signoz_webhook.router, prefix="/api/v1")
Step E-4: Deploy the Alert Rules (20min)
# Copy the rules to the SigNoz host
scp signoz/alerting/rules.yaml 192.168.0.188:/opt/signoz/config/alerting/
# Restart the SigNoz Query Service
ssh 192.168.0.188 "docker restart signoz-query-service"
# Verify that the rules were loaded
curl http://192.168.0.188:3301/api/v3/alerts/rules
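Phase F: Alert Chain E2E Verification (2h)
Current problems
- 2026-03-26: a wrong path caused 2 days without alerts
- No automated verification after deployment
Step F-1: Create the Smoke Test Script (30min)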
# ops/scripts/alert_chain_smoke_test.py
#!/usr/bin/env python3
"""
End-to-end verification of the alert chain

Run:
    python ops/scripts/alert_chain_smoke_test.py

Checks:
1. Alertmanager webhook reachable
2. Sentry webhook reachable
3. SigNoz webhook reachable
4. Telegram delivery succeeds
5. Approval created successfully
"""
import asyncio
import os
import sys
from datetime import datetime, timezone

import httpx

# Resolve the target from the environment instead of hard-coding the K8s URL.
# API_BASE is also accepted as a fallback; the default targets a local run.
API_BASE = (
    os.getenv("AWOOOI_API_BASE")
    or os.getenv("API_BASE")
    or "http://localhost:8000"
)
TIMEOUT = 30


async def test_alertmanager_webhook() -> bool:
    """Test the Alertmanager webhook"""
    print("🔍 Testing Alertmanager Webhook...")
    # Use an aware UTC timestamp so the RFC 3339 offset is actually correct
    now = datetime.now(timezone.utc)
    test_payload = {
        "version": "4",
        "status": "firing",
        "alerts": [{
            "status": "firing",
            "labels": {
                "alertname": "E2E_SMOKE_TEST",
                "severity": "info",
                "service": "smoke-test",
                "namespace": "test",
            },
            "annotations": {
                "summary": "E2E Smoke Test - please ignore",
                "description": f"Automated test @ {now.isoformat()}",
            },
            "startsAt": now.isoformat(),
        }]
    }
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        try:
            response = await client.post(
                f"{API_BASE}/api/v1/webhooks/alertmanager",
                json=test_payload,
            )
            if response.status_code == 200:
                print(" ✅ Alertmanager Webhook: OK")
                return True
            else:
                print(f" ❌ Alertmanager Webhook: {response.status_code}")
                print(f" Response: {response.text[:200]}")
                return False
        except Exception as e:
            print(f" ❌ Alertmanager Webhook: {e}")
            return False
async def test_sentry_webhook() -> bool:
    """Test the Sentry webhook"""
    print("🔍 Testing Sentry Webhook...")
    test_payload = {
        "action": "triggered",
        "data": {
            "issue": {
                "id": "smoke-test-" + datetime.now().strftime("%Y%m%d%H%M%S"),
                "title": "E2E Smoke Test Error",
                "level": "error",
                "culprit": "smoke_test.py:test",
                "project": {"slug": "awoooi-api"},
                "firstSeen": datetime.now().isoformat(),
                "count": 1,
            },
            "event": {
                "message": "E2E Smoke Test - please ignore",
                "platform": "python",
            },
        },
    }
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        try:
            response = await client.post(
                f"{API_BASE}/api/v1/webhooks/sentry/error",
                json=test_payload,
            )
            if response.status_code == 200:
                result = response.json()
                if result.get("status") in ["accepted", "deduplicated"]:
                    print(" ✅ Sentry Webhook: OK")
                    return True
            # Reached on a non-200 response or an unexpected status field
            print(f" ❌ Sentry Webhook: {response.status_code}")
            return False
        except Exception as e:
            print(f" ❌ Sentry Webhook: {e}")
            return False


async def test_health_endpoint() -> bool:
    """Test the health endpoint"""
    print("🔍 Testing Health Endpoint...")
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        try:
            response = await client.get(f"{API_BASE}/api/v1/health")
            if response.status_code == 200:
                print(" ✅ Health: OK")
                return True
            else:
                print(f" ❌ Health: {response.status_code}")
                return False
        except Exception as e:
            print(f" ❌ Health: {e}")
            return False


async def test_telegram_connectivity() -> bool:
    """Test Telegram connectivity"""
    print("🔍 Testing Telegram Connectivity...")
    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        try:
            # Check the Telegram status through the internal API
            response = await client.get(f"{API_BASE}/api/v1/telegram/status")
            if response.status_code == 200:
                data = response.json()
                if data.get("connected"):
                    print(" ✅ Telegram: Connected")
                    return True
                else:
                    print(" ⚠️ Telegram: Not Connected (but endpoint reachable)")
                    return True  # A reachable endpoint is enough
            else:
                print(f" ❌ Telegram: {response.status_code}")
                return False
        except Exception as e:
            print(f" ⚠️ Telegram: {e} (endpoint may not exist)")
            return True  # Does not fail the overall run


async def main():
    print("=" * 60)
    print("🚀 AWOOOI Alert Chain E2E Smoke Test")
    print(f" Time: {datetime.now().isoformat()}")
    print(f" Target: {API_BASE}")
    print("=" * 60)
    # The checks run concurrently, so their output may interleave
    results = await asyncio.gather(
        test_health_endpoint(),
        test_alertmanager_webhook(),
        test_sentry_webhook(),
        test_telegram_connectivity(),
    )
    print("=" * 60)
    passed = sum(results)
    total = len(results)
    if passed == total:
        print(f"✅ All passed ({passed}/{total})")
        sys.exit(0)
    else:
        print(f"❌ Some checks failed ({passed}/{total})")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
Step F-2: Integrate into the CD Pipeline (30min)
# .github/workflows/cd.yaml
# Add the smoke test step
jobs:
  deploy:
    # ... existing steps ...
    - name: Wait for Pods Ready
      run: |
        kubectl rollout status deployment/awoooi-api -n awoooi-prod --timeout=5m
    # 🆕 Alert chain verification
    - name: Alert Chain Smoke Test
      run: |
        # Wait for the service to finish starting
        sleep 30
        # Run the smoke test
        python ops/scripts/alert_chain_smoke_test.py
      env:
        # Variable name recommended by the architecture review
        AWOOOI_API_BASE: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
    - name: Notify on Smoke Test Failure
      if: failure()
      run: |
        # Send the Telegram alert directly (bypassing the possibly broken API)
        curl -X POST "https://api.telegram.org/bot${TG_BOT_TOKEN}/sendMessage" \
          -d "chat_id=${TG_CHAT_ID}" \
          -d "text=🚨 AWOOOI CD smoke test failed! The alert chain may be broken!"
      env:
        TG_BOT_TOKEN: ${{ secrets.OPENCLAW_TG_BOT_TOKEN }}
        TG_CHAT_ID: ${{ secrets.OPENCLAW_TG_CHAT_ID }}
Step F-3: Create Alert Chain Monitoring Rules (30min)
# k8s/monitoring/alert-chain-monitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: alert-chain-monitor
  namespace: monitoring
spec:
  groups:
    - name: alert_chain
      rules:
        # Alertmanager webhook not responding
        - alert: AlertChainBroken_Alertmanager
          expr: |
            sum(rate(http_requests_total{
              path="/api/v1/webhooks/alertmanager",
              status!="200"
            }[5m])) > 0
            or
            absent(http_requests_total{path="/api/v1/webhooks/alertmanager"})
          for: 10m
          labels:
            severity: critical
            service: alert-chain
          annotations:
            summary: "Alertmanager webhook chain is unhealthy"
            description: "Alerts cannot reach the AWOOOI API"
        # Sentry webhook not responding
        - alert: AlertChainBroken_Sentry
          expr: |
            sum(rate(http_requests_total{
              path="/api/v1/webhooks/sentry/error",
              status!="200"
            }[5m])) > 0
          for: 10m
          labels:
            severity: warning
            service: alert-chain
          annotations:
            summary: "Sentry webhook chain is unhealthy"
        # No alerts for a long time (the chain may be broken)
        - alert: NoAlertsReceivedLong
          expr: |
            time() - max(awoooi_last_alert_received_timestamp) > 7200
          for: 5m
          labels:
            severity: warning
            service: alert-chain
          annotations:
            summary: "No alerts received in the last 2 hours"
            description: "Either the alert chain is broken or the system is unusually stable"
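The NoAlertsReceivedLong expression is a plain staleness check: current time minus the last-received timestamp, compared against 7200 seconds. A minimal Python sketch of the same comparison follows; the function name is hypothetical, and the sketch deliberately does not model the additional `for: 5m` hold period.

```python
import time
from typing import Optional

STALE_AFTER_SECONDS = 7200  # 2 hours, same threshold as the rule


def is_alert_chain_stale(
    last_alert_ts: float,
    now: Optional[float] = None,
    threshold: float = STALE_AFTER_SECONDS,
) -> bool:
    """Return True when no alert has been received for longer than the threshold."""
    current = time.time() if now is None else now
    return current - last_alert_ts > threshold


# Last alert 3 hours before a fixed reference time: stale.
reference = 1_000_000.0
stale = is_alert_chain_stale(reference - 3 * 3600, now=reference)
# Last alert 30 minutes before the reference time: still fresh.
fresh = is_alert_chain_stale(reference - 1800, now=reference)
```

The comparison is strict, so a gap of exactly 7200 seconds does not yet count as stale.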
Step F-4: Add Metrics (30min)
# apps/api/src/core/metrics.py
# Add alert chain metrics
from prometheus_client import Counter, Gauge, Histogram
import time

# Timestamp of the most recently received alert
LAST_ALERT_RECEIVED = Gauge(
    'awoooi_last_alert_received_timestamp',
    'Timestamp of last received alert',
)

# Count of received alerts
ALERTS_RECEIVED = Counter(
    'awoooi_alerts_received_total',
    'Total alerts received',
    ['source', 'status']
)

# Webhook processing latency
WEBHOOK_LATENCY = Histogram(
    'awoooi_webhook_latency_seconds',
    'Webhook processing latency',
    ['webhook_type'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
)


def record_alert_received(source: str, status: str = "accepted"):
    """Record that an alert was received"""
    LAST_ALERT_RECEIVED.set(time.time())
    ALERTS_RECEIVED.labels(source=source, status=status).inc()
Phase G: Learning Service (3h)
Step G-1: Create learning_service.py (1.5h)
# apps/api/src/services/learning_service.py
"""
Anomaly learning service - learns from past resolutions
=======================================================
2026-03-29 ogt: implements Section 9.4 of the monitoring strategy plan

Features:
1. Record the outcome of every repair
2. Compute the success rate of each action
3. Recommend the best repair
4. Update the Playbook automatically
"""
import json
import math
from datetime import datetime
from typing import Optional

import redis.asyncio as redis
import structlog

from src.services.anomaly_counter import get_anomaly_counter
from src.services.playbook_service import get_playbook_service

logger = structlog.get_logger(__name__)


class LearningService:
    """
    Learns from the outcome of every repair and updates the Playbook automatically
    """

    # Learning threshold: at least N samples are needed before recommending
    MIN_SAMPLES = 5
    # Success-rate threshold: only actions at or above this value are recommended
    SUCCESS_RATE_THRESHOLD = 0.6
    # Actions belonging to each tier
    TIER_ACTIONS = {
        1: ['restart_pod', 'restart_container', 'delete_pod'],
        2: ['scale_up', 'increase_memory', 'increase_cpu', 'adjust_limits'],
        3: ['apply_hotfix', 'update_config', 'patch_deployment', 'rollback'],
        4: ['create_issue', 'notify_team', 'schedule_fix', 'manual_intervention'],
    }

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def record_repair_result(
        self,
        anomaly_key: str,
        repair_action: str,
        success: bool,
        root_cause: Optional[str] = None,
        fix_description: Optional[str] = None,
        execution_time_seconds: Optional[float] = None,
    ):
        """
        Record a repair result for learning

        Args:
            anomaly_key: anomaly key
            repair_action: repair action
            success: whether the repair succeeded
            root_cause: root cause (if found)
            fix_description: description of the fix
            execution_time_seconds: execution time
        """
        # 1. Record in AnomalyCounter
        counter = get_anomaly_counter()
        await counter.record_repair_attempt(anomaly_key, repair_action, success)

        # 2. Record the detailed learning data
        learning_key = f"learning:repair:{anomaly_key}:{repair_action}"
        record = {
            'success': success,
            'root_cause': root_cause,
            'fix_description': fix_description,
            'execution_time': execution_time_seconds,
            'timestamp': datetime.now().isoformat(),
        }
        await self.redis.lpush(learning_key, json.dumps(record))
        await self.redis.ltrim(learning_key, 0, 99)  # Keep the latest 100 entries
        await self.redis.expire(learning_key, 90 * 24 * 3600)  # 90 days

        # 3. If a root cause was found and the repair succeeded, consider updating the Playbook
        if success and root_cause:
            await self._consider_playbook_update(
                anomaly_key=anomaly_key,
                repair_action=repair_action,
                root_cause=root_cause,
                fix_description=fix_description,
            )

        logger.info(
            "learning_recorded",
            anomaly_key=anomaly_key,
            action=repair_action,
            success=success,
            has_root_cause=root_cause is not None,
        )

    async def get_recommended_fix(self, anomaly_key: str) -> dict:
        """
        Recommend the best repair based on historical learning

        Returns:
            {
                'action': 'scale_up',
                'confidence': 0.85,
                'tier': 2,
                'based_on': '12 historical samples',
                'avg_execution_time': 45.2,
                'alternatives': [...]
            }
        """
        counter = get_anomaly_counter()
        all_stats = await counter.get_all_repair_stats(anomaly_key)
        if not all_stats:
            return self._default_recommendation()

        # Compute a weighted score for each action
        scored_actions = []
        for action, stats in all_stats.items():
            if stats['total'] >= self.MIN_SAMPLES:
                success_rate = stats['success_rate']
                if success_rate >= self.SUCCESS_RATE_THRESHOLD:
                    # Weighting: success rate * log(sample count + 1)
                    score = success_rate * math.log(stats['total'] + 1)
                    # Fetch the average execution time
                    avg_time = await self._get_avg_execution_time(anomaly_key, action)
                    scored_actions.append({
                        'action': action,
                        'score': score,
                        'success_rate': success_rate,
                        'total_samples': stats['total'],
                        'tier': self._get_action_tier(action),
                        'avg_execution_time': avg_time,
                    })

        if not scored_actions:
            return self._default_recommendation()

        # Sort: highest weighted score first, then lower tier
        scored_actions.sort(key=lambda x: (-x['score'], x['tier']))
        best = scored_actions[0]
        alternatives = scored_actions[1:3] if len(scored_actions) > 1 else []

        return {
            'action': best['action'],
            'confidence': best['success_rate'],
            'tier': best['tier'],
            'based_on': f"{best['total_samples']} historical samples",
            'avg_execution_time': best['avg_execution_time'],
            'alternatives': [
                {'action': a['action'], 'confidence': a['success_rate'], 'tier': a['tier']}
                for a in alternatives
            ],
        }
    async def _get_avg_execution_time(self, anomaly_key: str, action: str) -> float:
        """Return the average execution time"""
        learning_key = f"learning:repair:{anomaly_key}:{action}"
        records = await self.redis.lrange(learning_key, 0, 19)  # Latest 20 entries
        times = []
        for r in records:
            data = json.loads(r)
            if data.get('execution_time'):
                times.append(data['execution_time'])
        return sum(times) / len(times) if times else 0.0

    async def _consider_playbook_update(
        self,
        anomaly_key: str,
        repair_action: str,
        root_cause: str,
        fix_description: Optional[str],
    ):
        """
        Decide whether to update the Playbook

        Conditions:
        1. The action's success rate is >= 80%
        2. At least 5 recorded attempts
        3. The Playbook has no better option
        """
        counter = get_anomaly_counter()
        stats = await counter.get_repair_success_rate(anomaly_key, repair_action)
        if stats['total'] >= 5 and stats['success_rate'] >= 0.8:
            # Check whether a Playbook already exists
            playbook_service = get_playbook_service()
            existing = await playbook_service.find_by_anomaly_key(anomaly_key)
            if not existing or existing.success_rate < stats['success_rate']:
                # Create or update the Playbook
                await playbook_service.create_or_update(
                    anomaly_key=anomaly_key,
                    root_cause=root_cause,
                    fix_action=repair_action,
                    fix_description=fix_description,
                    success_rate=stats['success_rate'],
                    total_executions=stats['total'],
                    source='auto_learning',
                )
                logger.info(
                    "playbook_auto_updated",
                    anomaly_key=anomaly_key,
                    action=repair_action,
                    success_rate=stats['success_rate'],
                )

    def _get_action_tier(self, action: str) -> int:
        """Return the tier of an action"""
        for tier, actions in self.TIER_ACTIONS.items():
            if action in actions:
                return tier
        return 1  # Default to Tier 1

    def _default_recommendation(self) -> dict:
        """Default recommendation (when there is no historical data)"""
        return {
            'action': 'restart_pod',
            'confidence': 0.3,
            'tier': 1,
            'based_on': 'No historical data, using the default',
            'avg_execution_time': 30.0,
            'alternatives': [
                {'action': 'delete_pod', 'confidence': 0.3, 'tier': 1},
            ],
        }
    async def get_learning_summary(self, anomaly_key: str) -> dict:
        """
        Return the learning summary

        Returns:
            {
                'anomaly_key': 'abc123',
                'total_occurrences': 15,
                'total_repair_attempts': 8,
                'overall_success_rate': 0.625,
                'actions_tried': ['restart_pod', 'scale_up'],
                'best_action': {'action': 'scale_up', 'success_rate': 0.75},
                'learning_status': 'sufficient',  # insufficient, learning, sufficient, excellent
            }
        """
        counter = get_anomaly_counter()

        # Fetch the frequency statistics
        # (read straight from Redis here as a simplification)
        timeline_key = f"anomaly:timeline:{anomaly_key}"
        total_occurrences = await self.redis.zcard(timeline_key)

        # Fetch all repair statistics
        all_stats = await counter.get_all_repair_stats(anomaly_key)
        total_attempts = sum(s['total'] for s in all_stats.values())
        total_success = sum(s['success'] for s in all_stats.values())
        overall_rate = total_success / total_attempts if total_attempts > 0 else 0

        # Find the best action
        best_action = None
        best_rate = 0
        for action, stats in all_stats.items():
            if stats['total'] >= 3 and stats['success_rate'] > best_rate:
                best_rate = stats['success_rate']
                best_action = {'action': action, 'success_rate': best_rate}

        # Determine the learning status
        if total_attempts < 3:
            status = 'insufficient'
        elif total_attempts < 10:
            status = 'learning'
        elif overall_rate >= 0.8:
            status = 'excellent'
        else:
            status = 'sufficient'

        return {
            'anomaly_key': anomaly_key,
            'total_occurrences': total_occurrences,
            'total_repair_attempts': total_attempts,
            'overall_success_rate': overall_rate,
            'actions_tried': list(all_stats.keys()),
            'best_action': best_action,
            'learning_status': status,
        }


# =============================================================================
# Singleton
# =============================================================================
_learning_service: LearningService | None = None


def get_learning_service() -> LearningService:
    """Return the LearningService instance"""
    global _learning_service
    if _learning_service is None:
        from src.core.redis import get_redis_client
        _learning_service = LearningService(get_redis_client())
    return _learning_service
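The ranking logic in get_recommended_fix() can be tried in isolation. The sketch below reuses the thresholds and tier table from the class, but takes the statistics as a plain dict rather than reading them from AnomalyCounter; the function names and the sample numbers are made up for illustration.

```python
import math

MIN_SAMPLES = 5
SUCCESS_RATE_THRESHOLD = 0.6
TIER_ACTIONS = {
    1: ["restart_pod", "restart_container", "delete_pod"],
    2: ["scale_up", "increase_memory", "increase_cpu", "adjust_limits"],
    3: ["apply_hotfix", "update_config", "patch_deployment", "rollback"],
    4: ["create_issue", "notify_team", "schedule_fix", "manual_intervention"],
}


def action_tier(action: str) -> int:
    """Look up the tier of an action, defaulting to Tier 1."""
    for tier, actions in TIER_ACTIONS.items():
        if action in actions:
            return tier
    return 1


def rank_actions(all_stats: dict) -> list:
    """Filter by sample count and success rate, then rank by weighted score."""
    ranked = []
    for action, stats in all_stats.items():
        if stats["total"] < MIN_SAMPLES:
            continue
        if stats["success_rate"] < SUCCESS_RATE_THRESHOLD:
            continue
        ranked.append({
            "action": action,
            # Score = success rate * ln(samples + 1)
            "score": stats["success_rate"] * math.log(stats["total"] + 1),
            "tier": action_tier(action),
        })
    # Highest score first; ties broken by the lower tier.
    ranked.sort(key=lambda a: (-a["score"], a["tier"]))
    return ranked


sample_stats = {
    "restart_pod": {"total": 20, "success_rate": 0.7},
    "scale_up": {"total": 6, "success_rate": 0.9},
    "rollback": {"total": 3, "success_rate": 1.0},    # too few samples
    "delete_pod": {"total": 10, "success_rate": 0.5},  # below the rate threshold
}
ranking = [a["action"] for a in rank_actions(sample_stats)]
```

With these numbers restart_pod (70% over 20 samples) outranks scale_up (90% over 6 samples), which shows how the logarithmic weighting favors well-sampled actions over a higher raw success rate.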
Step G-2: Integrate into auto_repair_service.py (1h)
# apps/api/src/services/auto_repair_service.py
# Modify the repair execution flow
from src.services.learning_service import get_learning_service
import time


class AutoRepairService:
    async def execute_repair(
        self,
        incident_id: str,
        anomaly_key: str,
        repair_action: str,
        dry_run: bool = False,
    ) -> AutoRepairResult:
        """
        Execute a repair and record the learning data
        """
        learning = get_learning_service()
        # A monotonic clock is unaffected by wall-clock adjustments
        start_time = time.monotonic()
        try:
            # 1. Execute the repair
            result = await self._do_execute(repair_action, ...)
            # 2. Record the learning data
            execution_time = time.monotonic() - start_time
            await learning.record_repair_result(
                anomaly_key=anomaly_key,
                repair_action=repair_action,
                success=result.success,
                root_cause=getattr(result, 'root_cause', None),
                fix_description=result.message,
                execution_time_seconds=execution_time,
            )
            return result
        except Exception as e:
            # Record the failure
            await learning.record_repair_result(
                anomaly_key=anomaly_key,
                repair_action=repair_action,
                success=False,
                fix_description=str(e),
                execution_time_seconds=time.monotonic() - start_time,
            )
            raise

    async def get_smart_recommendation(self, anomaly_key: str) -> dict:
        """
        Return a smart repair recommendation (AI analysis combined with historical learning)
        """
        learning = get_learning_service()
        # 1. Fetch the learned recommendation
        learned = await learning.get_recommended_fix(anomaly_key)
        # 2. If the learned confidence is high, use it directly
        if learned['confidence'] >= 0.8:
            return {
                'source': 'learning',
                'recommendation': learned,
            }
        # 3. Otherwise combine it with AI analysis
        # (call OpenClaw for a suggestion)
        ai_recommendation = await self._get_ai_recommendation(anomaly_key)
        # 4. Merge the recommendations
        return {
            'source': 'hybrid',
            'learning': learned,
            'ai': ai_recommendation,
            'final_recommendation': self._merge_recommendations(learned, ai_recommendation),
        }
Step G-3: Add API Endpoints (30min)
# apps/api/src/api/v1/learning.py
"""
Learning system API
"""
from fastapi import APIRouter

from src.services.learning_service import get_learning_service

router = APIRouter(prefix="/learning", tags=["Learning"])


@router.get("/summary/{anomaly_key}")
async def get_learning_summary(anomaly_key: str):
    """Return the anomaly learning summary"""
    learning = get_learning_service()
    return await learning.get_learning_summary(anomaly_key)


@router.get("/recommendation/{anomaly_key}")
async def get_recommendation(anomaly_key: str):
    """Return the repair recommendation"""
    learning = get_learning_service()
    return await learning.get_recommended_fix(anomaly_key)
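For consumers of the summary endpoint, the learning_status field follows the thresholds in get_learning_summary(). The sketch below restates that classification as a standalone function; the function name is hypothetical.

```python
def classify_learning_status(total_attempts: int, overall_rate: float) -> str:
    """Map attempt count and overall success rate to a learning status.

    Thresholds mirror get_learning_summary(): fewer than 3 attempts is
    'insufficient', fewer than 10 is 'learning', and from 10 attempts on
    the status is 'excellent' at >= 80% success, otherwise 'sufficient'.
    """
    if total_attempts < 3:
        return "insufficient"
    if total_attempts < 10:
        return "learning"
    if overall_rate >= 0.8:
        return "excellent"
    return "sufficient"


statuses = [
    classify_learning_status(2, 1.0),
    classify_learning_status(8, 0.625),
    classify_learning_status(12, 0.85),
    classify_learning_status(12, 0.5),
]
```

Note that the success rate only matters once there are at least 10 attempts; below that, the status depends on the attempt count alone.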
Full Implementation Checklist Overview
| Phase | Item | Effort | Priority | Depends on |
|---|---|---|---|---|
| A | AnomalyCounter | 4h | P0 | Redis |
| B | Database Exporters | 3h | P0 | Docker |
| C | Incident frequency fields | 2h | P0 | Phase A |
| D | Sentry Comment | 1h | P1 | Sentry Token |
| E | SigNoz alerts | 2h | P1 | SigNoz |
| F | Alert Chain E2E | 2h | P0 | Phase A |
| G | Learning Service | 3h | P1 | Phase A, C |
Total effort: 17h (about 2-3 days)
Recommended Execution Order
Day 1 (8h):
├─ Phase A: AnomalyCounter (4h) ✅
├─ Phase B: Database Exporters (3h) ✅
└─ Phase F: Alert Chain E2E (partial, 1h) ✅
Day 2 (6h):
├─ Phase C: Incident frequency (2h) ✅
├─ Phase D: Sentry Comment (1h) ✅
└─ Phase G: Learning Service (3h) ✅
Day 3 (3h):
├─ Phase E: SigNoz alerts (2h) ✅
└─ Phase F: Alert Chain E2E (complete, 1h) ✅