Files
awoooi/apps/api/src/api/v1/sentry_webhook.py
Your Name f3fbd39898
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 5m50s
CD Pipeline / build-and-deploy (push) Successful in 3m58s
CD Pipeline / post-deploy-checks (push) Successful in 1m48s
feat(awooop): add provider upstream canary
2026-05-20 20:48:36 +08:00

764 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AWOOOI API - Sentry Webhook Handler
====================================
接收 Sentry Issue Alert,轉發給 OpenClaw 進行 AI 分析
整合流程:
1. Sentry Alert → AWOOOI API Webhook
2. 組裝錯誤上下文
3. 呼叫 OpenClaw Error Analyzer Agent
4. 結果回寫 Sentry Issue Comment
5. 發送 Telegram 告警 (含截圖)
6. 建立 Approval 供人工審核
🔴 HARD RULE: 時間顯示使用 Asia/Taipei (UTC+8)
"""
import json
import uuid
from typing import Any
import structlog
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel
from src.core.awooop_operator_auth import authenticate_awooop_operator_headers
from src.core.circuit_breaker import get_openclaw_guard
from src.core.metrics import (
record_alert_chain_failure,
record_alert_chain_success,
record_alert_processed,
record_anomaly,
)
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
DataImpact,
RiskLevel,
)
from src.services.anomaly_counter import get_anomaly_counter
from src.services.approval_db import get_approval_service
from src.services.channel_hub import record_external_alert_event
from src.services.openclaw_http_service import get_openclaw_http_service
from src.services.sentry_service import get_sentry_service
# 2026-04-27 P3.1-T2 by Claude — Tier-2 三服務感知強化:補 SentryWebhookService 簽章驗證
from src.services.sentry_webhook_service import (
SentrySignatureError,
verify_sentry_signature,
)
from src.services.telegram_gateway import get_telegram_gateway
from src.utils.timezone import now_taipei_iso
logger = structlog.get_logger(__name__)
router = APIRouter(prefix="/webhooks/sentry", tags=["Sentry Webhook"])
# OpenClaw configuration is read from settings (P1-1 fix, 2026-03-29).
# 2026-03-29: SENTRY_API_URL moved to settings.SENTRY_SELF_HOSTED_URL (Wave A.1).
# Mapping from Sentry issue level to approval risk level.
SENTRY_LEVEL_TO_RISK = {
    "fatal": RiskLevel.CRITICAL,
    "error": RiskLevel.HIGH,
    "warning": RiskLevel.MEDIUM,
    "info": RiskLevel.LOW,
}
# Deduplication settings (Phase 10.2.1 - 2026-03-27).
SENTRY_DEDUP_TTL = 600  # 10 minutes: the same issue is not re-sent within this window
class SentryIssuePayload(BaseModel):
    """Sentry Issue Alert payload (simplified).

    Declares only the top-level fields of the webhook body.
    """

    # Webhook action, e.g. "created", "resolved", etc.
    action: str
    # Raw "data" object of the webhook body.
    data: dict
    # Optional "actor" object; defaults to None when absent.
    actor: dict | None = None
class ErrorAnalysisResult(BaseModel):
    """Result of an AI error analysis."""

    root_cause: str
    impact: str
    fix_suggestion: str
    prevention: str
    # Rendered as a percentage in the Sentry comment (presumably 0.0-1.0 — confirm).
    confidence: float
    # Name of the analysing provider, e.g. "ollama" or "claude".
    analyzed_by: str
@router.get("/health")
async def sentry_webhook_health() -> dict:
    """Wave A.6 smoke test: reachability probe for the Sentry webhook router."""
    return dict(status="ok", webhook="sentry")
def _sentry_event_tag(event_data: dict[str, Any], key: str) -> str | None:
tags = event_data.get("tags") or []
for tag in tags:
if isinstance(tag, list | tuple) and len(tag) >= 2 and str(tag[0]) == key:
return str(tag[1])
if isinstance(tag, dict) and str(tag.get("key")) == key:
value = tag.get("value")
return str(value) if value is not None else None
return None
def _is_sentry_upstream_canary(payload: dict[str, Any]) -> bool:
data = payload.get("data") if isinstance(payload, dict) else None
if not isinstance(data, dict) or payload.get("action") != "triggered":
return False
issue_data = data.get("issue") if isinstance(data.get("issue"), dict) else {}
event_data = data.get("event") if isinstance(data.get("event"), dict) else {}
issue_id = str(issue_data.get("id") or "")
short_id = str(issue_data.get("shortId") or "")
title = str(issue_data.get("title") or "")
return (
issue_id.startswith("awoooi-canary-")
or short_id.upper().startswith("AWOOOI-CANARY")
or title == "AwoooPSourceProviderCanary"
or (_sentry_event_tag(event_data, "awoooi_canary") or "").lower() == "true"
)
async def _record_sentry_upstream_canary(
    payload: dict[str, Any],
    request: Request,
) -> dict[str, Any]:
    """Record an operator-signed Sentry canary as an external alert event.

    The operator is authenticated from the ``x-awooop-operator-id`` and
    ``x-awooop-operator-key`` request headers (presumably invalid credentials
    are rejected inside ``authenticate_awooop_operator_headers`` — confirm).
    The event is stored with stage ``upstream_canary``; this function itself
    creates no incident or approval, sends no Telegram message and does not
    call OpenClaw.

    Args:
        payload: Parsed webhook body already classified as a canary.
        request: Incoming request, used only for the operator headers.

    Returns:
        A summary dict with status ``canary_recorded``.

    Raises:
        HTTPException: 500 when the event recorder returns ``None``.
    """
    headers = request.headers
    operator = authenticate_awooop_operator_headers(
        headers.get("x-awooop-operator-id"),
        headers.get("x-awooop-operator-key"),
    )

    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}
    issue = data.get("issue")
    if not isinstance(issue, dict):
        issue = {}
    event = data.get("event")
    if not isinstance(event, dict):
        event = {}

    # First available identifier wins; the literal fallback keeps the
    # fingerprint well-formed when nothing identifies the canary.
    issue_id = str(
        issue.get("id")
        or issue.get("shortId")
        or _sentry_event_tag(event, "run_ref")
        or "awoooi-canary-unknown"
    )
    source_url = issue.get("permalink") or issue.get("web_url") or issue.get("url")

    def _no_side_effects() -> dict[str, bool]:
        # Fresh dict per use so the stored payload and the HTTP response
        # never share one mutable object.
        return {
            "incident_created": False,
            "approval_created": False,
            "telegram_sent": False,
            "openclaw_called": False,
        }

    labels = {
        "project": issue.get("project", {}),
        "level": issue.get("level", "info"),
        "awoooi_canary": "true",
        "operator_id": operator.operator_id,
        "telegram": "not_sent",
        "incident": "not_created",
        "approval": "not_created",
    }
    annotations = {
        "message": event.get("message"),
        "summary": (
            "Operator-signed Sentry webhook canary; records upstream "
            "source evidence without creating incident, approval, or Telegram."
        ),
    }
    stored_payload = {
        "raw_canary": payload,
        "operator_id": operator.operator_id,
        "auth_method": operator.auth_method,
        "side_effects": _no_side_effects(),
    }

    event_uuid = await record_external_alert_event(
        project_id="awoooi",
        provider="sentry",
        event_id=issue_id,
        stage="upstream_canary",
        title=str(issue.get("title") or "AwoooPSourceProviderCanary"),
        severity=str(issue.get("level") or "info"),
        namespace="awoooi-prod",
        target_resource=str(issue.get("culprit") or "source-provider-ingestion"),
        fingerprint=f"source-provider-canary:sentry:{issue_id}",
        source_url=source_url,
        labels=labels,
        annotations=annotations,
        payload=stored_payload,
    )
    if event_uuid is None:
        raise HTTPException(
            status_code=500,
            detail="sentry upstream canary was not recorded",
        )

    return {
        "status": "canary_recorded",
        "provider": "sentry",
        "event_id": issue_id,
        "conversation_event_id": str(event_uuid),
        "side_effects": _no_side_effects(),
    }
@router.post("/error")
async def handle_sentry_error(
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Sentry issue webhook handler.

    Only payloads whose ``action`` is ``"triggered"`` are processed, and AI
    analysis is scheduled only for ``error`` / ``fatal`` levels.

    Flow:
    1. Read the raw body once and parse it as JSON.
    2. Upstream-canary payloads are handed to ``_record_sentry_upstream_canary``
       before the Sentry signature check.
    3. Verify the ``sentry-hook-signature`` header against the raw body.
    4. Record the "received" alert event and apply deduplication
       (``SENTRY_DEDUP_TTL``).
    5. Schedule ``analyze_and_comment`` as a background task.

    Returns:
        A status dict whose ``status`` is ``canary_recorded``, ``ignored``,
        ``deduplicated`` or ``accepted``.

    Raises:
        HTTPException: 401 when the signature is rejected, 400 when the body
            is not a JSON object, 500 for any other processing failure.
    """
    try:
        # 2026-04-27 P3.1-T2 by Claude — Tier-2: Sentry signature verification.
        body = await request.body()
        try:
            # Parse once and reuse. ValueError covers both JSONDecodeError and
            # UnicodeDecodeError, so a malformed body cannot fail before the
            # signature check runs.
            parsed_body = json.loads(body)
        except ValueError:
            parsed_body = None

        if isinstance(parsed_body, dict) and _is_sentry_upstream_canary(parsed_body):
            return await _record_sentry_upstream_canary(parsed_body, request)

        sig_header = request.headers.get("sentry-hook-signature", "")
        try:
            verify_sentry_signature(body, sig_header)
        except SentrySignatureError as sig_err:
            logger.warning("sentry_signature_rejected", error=str(sig_err))
            raise HTTPException(status_code=401, detail=str(sig_err)) from sig_err

        # Rejected only after the signature check so that unauthenticated
        # callers always see 401 first.
        if not isinstance(parsed_body, dict):
            raise HTTPException(
                status_code=400,
                detail="request body must be a JSON object",
            )
        payload = parsed_body
        logger.info(f"Received Sentry webhook: action={payload.get('action')}")

        # Only "triggered" alerts are processed.
        if payload.get("action") != "triggered":
            return {"status": "ignored", "reason": "action is not triggered"}

        # Null-safe extraction: explicit nulls are treated like missing keys.
        data = _sentry_dict_or_empty(payload.get("data"))
        issue_data = _sentry_dict_or_empty(data.get("issue"))
        event_data = _sentry_dict_or_empty(data.get("event"))
        project_slug = _sentry_dict_or_empty(issue_data.get("project")).get("slug")
        issue_id = issue_data.get("id")
        event_ref = issue_id or issue_data.get("shortId") or "unknown"
        source_url = (
            issue_data.get("permalink")
            or issue_data.get("web_url")
            or issue_data.get("url")
        )

        # Fields shared by the "received" and "deduplicated" event records.
        alert_common = {
            "project_id": "awoooi",
            "provider": "sentry",
            "event_id": str(event_ref),
            "title": str(issue_data.get("title") or "Sentry issue"),
            "severity": str(issue_data.get("level") or "error"),
            "namespace": "sentry",
            "target_resource": str(issue_data.get("culprit") or project_slug or "unknown"),
            "fingerprint": f"sentry-{event_ref}",
            "source_url": source_url,
        }
        background_tasks.add_task(
            record_external_alert_event,
            **alert_common,
            stage="received",
            labels={
                "project": issue_data.get("project", {}),
                "level": issue_data.get("level"),
                "culprit": issue_data.get("culprit"),
            },
            annotations={"message": event_data.get("message")},
            payload=payload,
        )

        # Phase 10.2.1: deduplication check. A falsy result is handled as
        # "already seen within the TTL".
        sentry_service = get_sentry_service()
        if not await sentry_service.check_dedup(issue_id, ttl=SENTRY_DEDUP_TTL):
            background_tasks.add_task(
                record_external_alert_event,
                **alert_common,
                stage="deduplicated",
                labels={
                    "project": issue_data.get("project", {}),
                    "level": issue_data.get("level"),
                },
                annotations={"message": event_data.get("message")},
                payload={"dedup_ttl": SENTRY_DEDUP_TTL},
                is_duplicate=True,
            )
            return {"status": "deduplicated", "issue_id": issue_id, "ttl": SENTRY_DEDUP_TTL}

        # Decide whether AI analysis is needed before building the context.
        level = issue_data.get("level", "error")
        if level not in ["error", "fatal"]:
            return {"status": "ignored", "reason": f"level {level} does not require analysis"}

        error_context = {
            "issue_id": issue_id,
            "source_url": source_url,
            "title": issue_data.get("title"),
            "culprit": issue_data.get("culprit"),
            # Use the effective level that passed the filter above, so
            # downstream consumers never see None for a missing level.
            "level": level,
            "first_seen": issue_data.get("firstSeen"),
            "count": issue_data.get("count"),
            "project": project_slug,
            # Event details
            "message": event_data.get("message"),
            "platform": event_data.get("platform"),
            "tags": event_data.get("tags", []),
            # Stack trace (last 5 frames)
            "stacktrace": _extract_stacktrace(event_data),
        }

        # Run the analysis in the background.
        background_tasks.add_task(
            analyze_and_comment,
            error_context=error_context,
            issue_id=issue_id,
            project_slug=project_slug,
        )
        return {
            "status": "accepted",
            "issue_id": error_context["issue_id"],
            "message": "Analysis scheduled"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Sentry webhook processing failed")
        raise HTTPException(status_code=500, detail=str(e)) from e


def _sentry_dict_or_empty(value: Any) -> dict[str, Any]:
    """Return ``value`` when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}
def _extract_stacktrace(event_data: dict) -> list[dict]:
"""提取 Stack Trace (最後5個 frame)"""
try:
exception = event_data.get("exception", {})
values = exception.get("values", [])
if not values:
return []
stacktrace = values[0].get("stacktrace", {})
frames = stacktrace.get("frames", [])
# 取最後5個 frame只保留關鍵資訊
return [
{
"filename": f.get("filename"),
"function": f.get("function"),
"lineno": f.get("lineno"),
"context_line": f.get("context_line"),
}
for f in frames[-5:]
]
except Exception:
return []
async def analyze_and_comment(
    error_context: dict,
    issue_id: str,
    project_slug: str
):
    """
    Background task: analyse the error, create an Approval, alert Telegram.

    Phase 10: Sentry + OpenClaw AI integration.
    Phase 21 (ADR-037): anomaly frequency statistics.

    Execution order (kept fixed to avoid logic conflicts):
    1. Record the anomaly frequency (AnomalyCounter) and its metric
    2. Call the OpenClaw analyzer
    3. Create the Approval (with frequency info) and record the link event
    4. Send the Telegram alert (with frequency info)
    5. Write the Sentry comment (with frequency info), only if analysis exists

    Any exception is logged and counted as an alert-chain failure; nothing
    is re-raised.
    """
    try:
        logger.info("sentry_analysis_started", issue_id=issue_id)

        # 1. Record the anomaly frequency (ADR-037).
        service_name = error_context.get("project", "unknown")
        counter = get_anomaly_counter()
        frequency = await counter.record_anomaly(
            {
                "alert_name": "sentry_error",
                "service": service_name,
                "error_type": error_context.get("title", "unknown"),
                "namespace": "sentry",  # uniform marker for Sentry-sourced anomalies
            }
        )
        frequency_dict = frequency.to_dict()
        escalation_level = frequency.escalation_level
        logger.info(
            "anomaly_frequency_recorded",
            issue_id=issue_id,
            anomaly_key=frequency.anomaly_key,
            count_24h=frequency.count_24h,
            escalation_level=escalation_level,
        )
        # Wave A.5: anomaly metric (ADR-037).
        record_anomaly(
            alert_name="sentry_error",
            service=service_name,
            frequency_24h=frequency.count_24h,
            escalation_level=escalation_level,
        )

        # 2. Call the OpenClaw analyzer.
        analysis = await call_openclaw_analyzer(error_context)
        has_analysis = analysis is not None

        # 3. Create the Approval (with frequency info).
        approval_id = await create_sentry_approval(
            error_context=error_context,
            analysis=analysis,
            anomaly_frequency=frequency_dict,
        )
        raw_ref = issue_id or error_context.get("issue_id") or "unknown"
        await record_external_alert_event(
            project_id="awoooi",
            provider="sentry",
            event_id=str(raw_ref),
            stage="approval_linked",
            title=str(error_context.get("title") or "Sentry issue"),
            severity=str(error_context.get("level") or "error"),
            namespace="sentry",
            target_resource=str(
                error_context.get("culprit") or error_context.get("project") or "unknown"
            ),
            fingerprint=f"sentry-{raw_ref}",
            approval_id=approval_id,
            source_url=error_context.get("source_url"),
            labels={
                "project": error_context.get("project"),
                "level": error_context.get("level"),
            },
            annotations={"message": error_context.get("message")},
            payload={
                "anomaly_frequency": frequency_dict,
                "ai_analyzed": has_analysis,
                "ai_provider": analysis.analyzed_by if analysis else None,
            },
        )

        # 4. Send the Telegram alert (with frequency info).
        await send_sentry_telegram_alert(
            error_context=error_context,
            analysis=analysis,
            approval_id=approval_id,
            anomaly_frequency=frequency_dict,
        )

        # 5. Write the Sentry comment when an analysis is available.
        if analysis:
            await post_sentry_comment(
                project_slug=project_slug,
                issue_id=issue_id,
                analysis=analysis,
                anomaly_frequency=frequency_dict,
            )

        logger.info(
            "sentry_analysis_completed",
            issue_id=issue_id,
            approval_id=approval_id,
            has_analysis=analysis is not None,
            escalation_level=frequency.escalation_level,
        )
        # Wave A.5: alert chain success (ADR-037).
        record_alert_chain_success("sentry")
        record_alert_processed(
            source="sentry",
            severity=error_context.get("level", "error"),
            outcome="incident_created",
        )
    except Exception as e:
        logger.exception("sentry_analysis_failed", issue_id=issue_id, error=str(e))
        # Wave A.5: alert chain failure (ADR-037).
        record_alert_chain_failure("sentry")
async def call_openclaw_analyzer(error_context: dict) -> ErrorAnalysisResult | None:
    """
    Call the OpenClaw Error Analyzer Agent.

    ADR-038: two layers of protection
    - Layer 1: circuit breaker (per the original note: 5 consecutive
      failures open the circuit for 60 seconds)
    - Layer 2: concurrency semaphore (per the original note: at most 3
      concurrent calls)

    The request is sent with ``prefer_local=True`` and a 60 second timeout.
    Phase 22 P0 fix: uses OpenClawHttpService (2026-03-31).

    Args:
        error_context: Error context dict forwarded to the analyzer.

    Returns:
        The validated analysis, or ``None`` when the circuit is open, the
        service returns no data, the data fails validation, or the call
        raises.
    """
    guard = get_openclaw_guard()

    # ADR-038 Layer 1: fail fast while the circuit is open.
    if guard.is_circuit_open():
        logger.warning(
            "openclaw_circuit_open",
            metrics=guard.get_metrics(),
        )
        return None

    # ADR-038 Layer 2: queue behind the concurrency semaphore.
    async with guard.semaphore:
        try:
            # Phase 22 P0: go through the service layer instead of raw httpx.
            service = get_openclaw_http_service()
            data = await service.analyze_error(
                error_context=error_context,
                prefer_local=True,
                timeout=60.0,
            )
            if not data:
                guard.record_failure()
                return None
            # Validate BEFORE recording success: a malformed response must
            # count as a failure only, not as a success followed by a failure.
            result = ErrorAnalysisResult(**data)
            guard.record_success()
            return result
        except Exception as e:
            logger.exception(f"OpenClaw call failed: {e}")
            guard.record_failure()
            return None
async def post_sentry_comment(
    project_slug: str,
    issue_id: str,
    analysis: ErrorAnalysisResult,
    anomaly_frequency: dict | None = None,
):
    """
    Write the analysis result back as a Sentry issue comment.

    API: POST /api/0/issues/{issue_id}/comments/
    Phase 21 (ADR-037): includes anomaly frequency statistics when the
    24-hour count is greater than 1.

    Args:
        project_slug: Accepted for interface compatibility; not used here.
        issue_id: Sentry issue that receives the comment.
        analysis: Analysis rendered into the comment body.
        anomaly_frequency: Optional frequency statistics dict.

    Failures while posting are logged and never re-raised.
    """
    # Frequency statistics section (ADR-037).
    frequency_section = ""
    stats = anomaly_frequency
    if stats and stats.get("count_24h", 0) > 1:
        emoji_by_level = {
            None: "",
            "REPEAT": ":warning:",
            "ESCALATE": ":red_circle:",
            "PERMANENT_FIX": ":rotating_light:",
        }
        level = stats.get("escalation_level")
        escalation_emoji = emoji_by_level.get(level, "")
        frequency_section = f"""
## 頻率統計 {escalation_emoji}
| 時間窗口 | 次數 |
|---------|------|
| 1 小時 | {stats.get('count_1h', 0)} |
| 24 小時 | {stats.get('count_24h', 0)} |
| 7 天 | {stats.get('count_7d', 0)} |
| 30 天 | {stats.get('count_30d', 0)} |
**修復嘗試**: {stats.get('auto_repair_count', 0)}
"""
        if level:
            frequency_section += f"**升級建議**: {level}\n"

    comment_text = f"""## AI 錯誤分析 (by {analysis.analyzed_by})
**根本原因 (Root Cause)**
{analysis.root_cause}
**影響範圍 (Impact)**
{analysis.impact}
**建議修復 (Fix Suggestion)**
{analysis.fix_suggestion}
**預防措施 (Prevention)**
{analysis.prevention}
{frequency_section}
---
*分析信心度: {analysis.confidence:.0%} | 分析時間: {now_taipei_iso()}*
"""

    try:
        # Wave A.4: the comment is written through SentryService (ADR-037,
        # 2026-03-29), so the router reaches the external API via the
        # service layer (leWOOOgo modularity principle).
        posted = await get_sentry_service().post_issue_comment(issue_id, comment_text)
        if not posted:
            # Token not configured or API failure: log only, do not interrupt.
            logger.warning("sentry_comment_skipped_or_failed", issue_id=issue_id)
        else:
            logger.info(
                "sentry_comment_success",
                issue_id=issue_id,
                comment_id=posted.get("id"),
            )
    except Exception as e:
        logger.exception("sentry_comment_failed", issue_id=issue_id, error=str(e))
# =============================================================================
# Telegram 告警 (Phase 10: Sentry + OpenClaw)
# =============================================================================
async def send_sentry_telegram_alert(
    error_context: dict,
    analysis: ErrorAnalysisResult | None,
    approval_id: str,
    anomaly_frequency: dict | None = None,
):
    """
    Send a Sentry error alert card to Telegram.

    Phase 21 (ADR-037): includes anomaly frequency statistics.
    The card layout follows project_sentry_openclaw_v2.md; the card is sent
    through the gateway's ``send_approval_card``.

    Args:
        error_context: Error context; ``title``, ``culprit`` and ``level``
            are read and may be missing or null.
        analysis: AI analysis, or ``None`` when no analysis is available.
        approval_id: Approval the card refers to.
        anomaly_frequency: Optional frequency statistics dict.

    Failures are logged and never re-raised.
    """
    try:
        telegram = get_telegram_gateway()
        await telegram.initialize()

        # The context may carry these keys with a null value, in which case
        # dict.get(key, default) would return None. Fall back with `or` so a
        # missing culprit cannot abort the alert with a TypeError.
        title = str(error_context.get("title") or "Unknown Error")
        culprit = str(error_context.get("culprit") or "unknown")
        level = error_context.get("level") or "error"
        is_frontend = any(marker in culprit for marker in ("tsx", "jsx"))

        # Send the Sentry alert card (with Y/n buttons).
        # TODO(2026-04-05): the Sentry path has no incident_id; pass it once
        # Sentry→Incident linking exists.
        await telegram.send_approval_card(
            approval_id=approval_id,
            risk_level="high" if level in ["fatal", "error"] else "medium",
            resource_name=culprit,
            root_cause=analysis.root_cause if analysis else title,
            suggested_action=analysis.fix_suggestion if analysis else "待分析",
            primary_responsibility="FE" if is_frontend else "BE",
            confidence=analysis.confidence if analysis else 0.0,
            namespace="sentry",
            anomaly_frequency=anomaly_frequency,
            # 2026-04-02 ogt: pass ai_provider so Telegram shows the concrete
            # model name instead of the generic arbitration label.
            ai_provider=analysis.analyzed_by if analysis else "",
        )
        logger.info(
            "sentry_telegram_sent",
            approval_id=approval_id,
            escalation_level=anomaly_frequency.get("escalation_level") if anomaly_frequency else None,
        )
    except Exception as e:
        logger.exception("sentry_telegram_failed", error=str(e))
# =============================================================================
# Approval 創建 (Phase 10: Sentry + OpenClaw)
# =============================================================================
async def create_sentry_approval(
    error_context: dict,
    analysis: ErrorAnalysisResult | None,
    anomaly_frequency: dict | None = None,
) -> str:
    """
    Create an Approval record for a Sentry error.

    Phase 21 (ADR-037): includes anomaly frequency statistics, and the
    escalation level can raise the risk level.

    Args:
        error_context: Error context; its values may be missing or null.
        analysis: AI analysis, or ``None`` when no analysis is available.
        anomaly_frequency: Optional frequency statistics dict.

    Returns:
        str: The Approval ID. If creation fails, a temporary ``temp-`` ID is
        returned instead of raising.
    """
    try:
        approval_service = get_approval_service()

        # The context may carry these keys with a null value, in which case
        # dict.get(key, default) would return None. Fall back with `or` so a
        # null level cannot break `level.upper()` and skip the approval.
        level = str(error_context.get("level") or "error")
        title = error_context.get("title") or "Unknown Error"
        culprit = error_context.get("culprit") or "unknown"
        project = error_context.get("project") or "unknown"
        issue_id = error_context.get("issue_id") or "unknown"
        error_count = error_context.get("count")
        if error_count is None:
            error_count = 1

        # Base risk level from the Sentry level.
        risk_level = SENTRY_LEVEL_TO_RISK.get(level, RiskLevel.MEDIUM)
        # ADR-037: escalate the risk level based on frequency.
        if anomaly_frequency:
            escalation = anomaly_frequency.get("escalation_level")
            if escalation == "PERMANENT_FIX":
                risk_level = RiskLevel.CRITICAL
            elif escalation == "ESCALATE" and risk_level != RiskLevel.CRITICAL:
                risk_level = RiskLevel.HIGH

        # Metadata (with frequency info when available).
        metadata = {
            "source": "sentry",
            "alert_type": f"sentry_{level}",
            "sentry_issue_id": issue_id,
            "sentry_project": project,
            "culprit": culprit,
            "error_count": error_count,
            "first_seen": error_context.get("first_seen"),
            "stacktrace": error_context.get("stacktrace") or [],
            "llm_provider": analysis.analyzed_by if analysis else "pending",
            "llm_confidence": analysis.confidence if analysis else 0.0,
        }
        # ADR-037: attach the frequency info to the metadata.
        if anomaly_frequency:
            metadata["anomaly_frequency"] = anomaly_frequency

        # Approval request (conforms to the ApprovalRequestBase schema).
        approval_request = ApprovalRequestCreate(
            action=f"Sentry {level.upper()} Alert: {culprit}",
            description=f"{title}\n\nRoot Cause: {analysis.root_cause if analysis else '待分析'}\nSuggestion: {analysis.fix_suggestion if analysis else '待 AI 分析'}",
            risk_level=risk_level,
            blast_radius=BlastRadius(
                affected_pods=1,
                estimated_downtime="0",
                related_services=[project],
                data_impact=DataImpact.READ_ONLY,
            ),
            dry_run_checks=[],  # Sentry alerts have no dry-run
            requested_by="sentry-webhook",
            metadata=metadata,
        )

        # Create the Approval (the ID is generated by the service).
        approval = await approval_service.create_approval(request=approval_request)
        approval_id = str(approval.id)
        logger.info(
            "sentry_approval_created",
            approval_id=approval_id,
            issue_id=issue_id,
            risk_level=risk_level.value,
        )
        return approval_id
    except Exception as e:
        logger.exception("sentry_approval_creation_failed", error=str(e))
        # Even when creation fails, return a temporary ID.
        return f"temp-{uuid.uuid4().hex[:8]}"