fix(alerts): guard approval actions and wire playbook learning
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 42s
CD Pipeline / build-and-deploy (push) Successful in 3m31s
CD Pipeline / post-deploy-checks (push) Successful in 1m18s

This commit is contained in:
Your Name
2026-05-06 03:34:24 +08:00
parent 5890fffd7f
commit 3b64d66836
5 changed files with 568 additions and 31 deletions

View File

@@ -33,14 +33,8 @@ from pydantic import BaseModel, Field
from src.core.config import settings
from src.core.constants import is_cicd_alertname, is_heartbeat_alertname
from src.services.alert_rule_engine import get_incident_type, match_rule
from src.services.action_parser import is_safe_kubectl_action
from src.services.security_interceptor import check_webhook_nonce # P0-06: nonce dedup via Service 層
from src.core.logging import get_logger
from src.core.metrics import record_alert_chain_success
# Phase 15.2: Trace Context (moved to SignalProducerService)
# get_trace_context 已移至 Service 層
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
@@ -48,31 +42,39 @@ from src.models.approval import (
DryRunCheck,
RiskLevel,
)
# R4 #129 (2026-04-01 ogt): AlertPayload/AlertResponse 移至 models 層AlertAnalyzer 移至 services 層
# ogt 更新 v1.1 2026-04-01 台北時間: generate_alert_fingerprint 移至 alert_analyzer_service (ADR-024)
# [首席架構師] 移除 generate_alert_fingerprint 直接 import改用 AlertAnalyzer.generate_fingerprint v1.2 2026-04-01 Asia/Taipei
from src.models.webhook import AlertPayload, AlertResponse
from src.services.action_parser import is_safe_kubectl_action
from src.services.alert_analyzer_service import AlertAnalyzer
from src.services.alert_approval_guard import guard_alert_approval_action
from src.services.alert_grouping_service import get_alert_grouping_service
from src.services.alert_rule_engine import get_incident_type, match_rule
from src.services.alertmanager_llm_guard import (
ALERTMANAGER_LLM_INFLIGHT_LOCK_TTL_SECONDS,
try_acquire_alertmanager_llm_lock,
)
from src.services.approval_db import get_approval_service
from src.services.auto_approve import get_auto_approve_policy
from src.services.auto_repair_service import AutoRepairService
# Phase 15.2: Trace Context (moved to SignalProducerService)
# get_trace_context 已移至 Service 層
# R4 #129 (2026-04-01 ogt): AlertPayload/AlertResponse 移至 models 層AlertAnalyzer 移至 services 層
# ogt 更新 v1.1 2026-04-01 台北時間: generate_alert_fingerprint 移至 alert_analyzer_service (ADR-024)
# [首席架構師] 移除 generate_alert_fingerprint 直接 import改用 AlertAnalyzer.generate_fingerprint v1.2 2026-04-01 Asia/Taipei
# Phase 17 P0: Service 層 (消除 Router 直接存取 Redis)
# C2 修正 (首席架構師審查 2026-04-10): create_incident_for_approval + extract_affected_services 已移入 Service 層
from src.services.incident_service import (
classify_alert_early,
create_incident_for_approval,
extract_affected_services,
get_incident_service,
)
from src.services.auto_approve import get_auto_approve_policy
from src.services.auto_repair_service import AutoRepairService
# Phase 5: OpenClaw AI Engine
from src.services.openclaw import get_openclaw
from src.services.playbook_match_resolver import resolve_playbook_id_for_alert
from src.services.security_interceptor import check_webhook_nonce # P0-06: nonce dedup via Service 層
from src.services.signal_producer import SignalData, get_signal_producer
# Phase 5: Telegram Gateway (行動戰情室)
@@ -81,9 +83,6 @@ from src.services.telegram_gateway import TelegramGatewayError, get_telegram_gat
# Phase 18.1.7: K8s 資源名稱正規化 已移至 alert_analyzer_service (R4 #129)
from src.utils.timezone import now_taipei
# ADR-076: 告警聚合引擎 (2026-04-14 Claude Haiku 4.5 Asia/Taipei)
from src.services.alert_grouping_service import get_alert_grouping_service
router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
logger = get_logger("awoooi.webhooks")
@@ -1147,15 +1146,33 @@ async def receive_alert(
data_impact = impact_mapping.get(blast.data_impact.value, DataImpact.NONE)
# 2026-04-27 Claude Sonnet 4.6: shadow-run Step1 — 補 metadata kwarg讓 extra_metadata 可觀測
_cmd_cs1 = (analysis_result.kubectl_command or "").strip()
_alertname_cs1 = str((alert.labels or {}).get("alertname") or alert.alert_type or "")
_guarded_action_cs1 = await guard_alert_approval_action(
action=(_cmd_cs1 or f"{analysis_result.action_title} | NO_ACTION"),
alert_namespace=alert.namespace,
alertname=_alertname_cs1,
alert_category=get_incident_type(_alertname_cs1),
)
_matched_playbook_id_cs1 = await resolve_playbook_id_for_alert(
alertname=_alertname_cs1,
affected_services=analysis_result.affected_services
or ([alert.target_resource] if alert.target_resource else []),
severity=risk_level.value,
)
if _guarded_action_cs1.blocked:
risk_level = RiskLevel.LOW
_cmd_cs1 = ""
_approval_metadata_cs1 = {
"source": ai_provider,
"confidence_score": analysis_result.confidence,
"is_rule_based": False,
"playbook_id": None,
"playbook_id": _matched_playbook_id_cs1,
**_guarded_action_cs1.metadata,
}
_cmd_cs1 = (analysis_result.kubectl_command or "").strip()
approval_create = ApprovalRequestCreate(
action=(_cmd_cs1 or f"{analysis_result.action_title} | NO_ACTION"),
action=_guarded_action_cs1.action,
description=f"[AI: {ai_provider}] {analysis_result.action_title} | {analysis_result.description}",
risk_level=risk_level,
blast_radius=BlastRadius(
@@ -1172,6 +1189,7 @@ async def receive_alert(
],
requested_by=f"OpenClaw ({ai_provider})",
metadata=_approval_metadata_cs1,
matched_playbook_id=_matched_playbook_id_cs1,
)
suggested_action = analysis_result.kubectl_command
else:
@@ -1218,7 +1236,7 @@ async def receive_alert(
# 設計confidence ≥ 0.85 + 非 CRITICAL + 非破壞性 + 有 kubectl 指令 → 直接執行
# 安全防線CRITICAL / destructive patterns / NO_ACTION/INVESTIGATE/OBSERVE / 空 kubectl → 降級 PENDING
if analysis_result:
_cs1_kubectl = analysis_result.kubectl_command.strip() if analysis_result.kubectl_command else ""
_cs1_kubectl = _cmd_cs1
_cs1_can_auto = (
bool(_cs1_kubectl)
and analysis_result.confidence >= 0.85
@@ -1239,7 +1257,7 @@ async def receive_alert(
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=risk_level.value,
matched_playbook_id=None,
matched_playbook_id=_matched_playbook_id_cs1,
metadata={
**_approval_metadata_cs1,
"is_high_confidence": True,
@@ -1489,7 +1507,6 @@ async def _process_new_alert_background(
str(blast.get("data_impact", "NONE")).upper(),
DataImpact.NONE,
)
rule_action_title = str(rule_response.get("action_title", "人工排查主機告警"))
rule_kubectl = str(rule_response.get("kubectl_command", "")).strip()
rule_description = str(rule_response.get("description", message))
rule_action = (
@@ -1497,13 +1514,31 @@ async def _process_new_alert_background(
if rule_kubectl else
f"NO_ACTION - {rule_description[:120]}"
)
_matched_playbook_id_cs2 = await resolve_playbook_id_for_alert(
rule_id=str(rule_response.get("rule_id", "")),
alertname=alertname,
affected_services=[target_resource] if target_resource else [],
severity=rule_risk.value,
)
_guarded_action_cs2 = await guard_alert_approval_action(
action=rule_action,
alert_namespace=namespace,
alertname=alertname,
alert_category=alert_category,
)
if _guarded_action_cs2.blocked:
rule_action = _guarded_action_cs2.action
rule_kubectl = ""
rule_risk = RiskLevel.LOW
# 2026-04-27 Claude Sonnet 4.6: shadow-run Step1 — 補 metadata kwarg讓 extra_metadata 可觀測
_approval_metadata_cs2 = {
"source": "rule_engine",
"confidence_score": float(rule_response.get("confidence", 0.0) or 0.0),
"is_rule_based": True,
"playbook_id": str(rule_response.get("rule_id", "")) or None,
"rule_id": str(rule_response.get("rule_id", "")) or None,
"playbook_id": _matched_playbook_id_cs2,
**_guarded_action_cs2.metadata,
}
approval_create = ApprovalRequestCreate(
action=rule_action,
@@ -1534,6 +1569,7 @@ async def _process_new_alert_background(
],
requested_by="OpenClaw (rule-engine)",
metadata=_approval_metadata_cs2,
matched_playbook_id=_matched_playbook_id_cs2,
)
approval = await service.create_approval_with_fingerprint(
@@ -1584,7 +1620,7 @@ async def _process_new_alert_background(
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=rule_risk.value,
matched_playbook_id=_approval_metadata_cs2.get("playbook_id"),
matched_playbook_id=_matched_playbook_id_cs2,
)
# 使用 DB 中剛建立的 approval.id 讓 executor 可回寫
_auto_approval.id = approval.id
@@ -1724,15 +1760,34 @@ async def _process_new_alert_background(
data_impact = impact_mapping.get(blast.data_impact.value, DataImpact.NONE) if blast else DataImpact.NONE
# 2026-04-27 Claude Sonnet 4.6: shadow-run Step1 — 補 metadata kwarg讓 extra_metadata 可觀測
_cmd_cs3 = (analysis_result.kubectl_command or "").strip()
_guarded_action_cs3 = await guard_alert_approval_action(
action=(_cmd_cs3 or f"{analysis_result.action_title} | NO_ACTION"),
alert_namespace=namespace,
alertname=alertname,
alert_category=alert_category,
)
_matched_playbook_id_cs3 = await resolve_playbook_id_for_alert(
rule_id=str(rule_response.get("rule_id", "")),
alertname=alertname,
affected_services=analysis_result.affected_services
or ([target_resource] if target_resource else []),
severity=risk_level.value,
)
if _guarded_action_cs3.blocked:
risk_level = RiskLevel.LOW
_cmd_cs3 = ""
_approval_metadata_cs3 = {
"source": ai_provider,
"confidence_score": analysis_result.confidence,
"is_rule_based": False,
"playbook_id": None,
"rule_id": str(rule_response.get("rule_id", "")) or None,
"playbook_id": _matched_playbook_id_cs3,
**_guarded_action_cs3.metadata,
}
_cmd_cs3 = (analysis_result.kubectl_command or "").strip()
approval_create = ApprovalRequestCreate(
action=(_cmd_cs3 or f"{analysis_result.action_title} | NO_ACTION"),
action=_guarded_action_cs3.action,
description=f"[AI: {ai_provider}] {analysis_result.action_title} | {analysis_result.description}",
risk_level=risk_level,
blast_radius=BlastRadius(
@@ -1747,6 +1802,7 @@ async def _process_new_alert_background(
],
requested_by=f"OpenClaw ({ai_provider})",
metadata=_approval_metadata_cs3,
matched_playbook_id=_matched_playbook_id_cs3,
)
approval = await service.create_approval_with_fingerprint(
@@ -1760,7 +1816,7 @@ async def _process_new_alert_background(
"risk_level": risk_level.value,
"confidence": analysis_result.confidence,
"action": approval_create.action,
"kubectl_command": analysis_result.kubectl_command,
"kubectl_command": _cmd_cs3,
"is_rule_based": False,
"source": ai_provider,
}
@@ -1776,7 +1832,7 @@ async def _process_new_alert_background(
logger.warning("shadow_auto_approve_failed", error=str(_shadow_err_cs3))
# 2026-04-27 Claude Sonnet 4.6: CS3 LLM 高信心自動執行修法3擴展
_cs3_kubectl = (analysis_result.kubectl_command or "").strip()
_cs3_kubectl = _cmd_cs3
_cs3_can_auto = (
bool(_cs3_kubectl)
and analysis_result.confidence >= 0.85
@@ -1793,7 +1849,7 @@ async def _process_new_alert_background(
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=risk_level.value,
matched_playbook_id=None,
matched_playbook_id=_matched_playbook_id_cs3,
metadata={
**_approval_metadata_cs3,
"is_high_confidence": True,
@@ -1895,7 +1951,7 @@ async def _process_new_alert_background(
risk_level=risk_level.value,
resource_name=target_resource,
root_cause=root_cause,
suggested_action=(analysis_result.kubectl_command or "").strip() or analysis_result.suggested_action.value,
suggested_action=approval_create.action,
estimated_downtime=estimated_downtime,
hit_count=1,
primary_responsibility=primary_responsibility,

View File

@@ -0,0 +1,151 @@
"""Alert approval guardrails for AI-generated remediation actions.
This service runs before an Alertmanager-derived action becomes an
ApprovalRecord. It prevents a known failure mode: an LLM invents a kubectl
target that does not belong to the current alert domain, then the approval
pipeline faithfully executes or displays that bad command.
"""
from __future__ import annotations
from dataclasses import dataclass, field
import structlog
from src.services.action_parser import ActionKind, parse_kubectl_action
# Module-level structured logger used for guard warnings and block decisions.
logger = structlog.get_logger(__name__)
# Namespaces a kubectl action may explicitly target. An explicit namespace
# outside this set is blocked by guard_alert_approval_action.
_ALLOWED_K8S_NAMESPACES = frozenset({"awoooi-prod", "observability", "signoz", "langfuse"})
@dataclass(frozen=True)
class ApprovalActionGuardResult:
"""Guarded action payload returned to approval creation."""
action: str
blocked: bool = False
reason: str | None = None
metadata: dict[str, object] = field(default_factory=dict)
async def guard_alert_approval_action(
    *,
    action: str,
    alert_namespace: str | None,
    alertname: str,
    alert_category: str,
) -> ApprovalActionGuardResult:
    """Vet an AI/rule-proposed action before it is stored as an approval.

    Checks run in this order:

    1. Actions that do not start with ``kubectl`` pass through untouched.
    2. The command must be accepted by ``parse_kubectl_action``.
    3. An explicit namespace must be in ``_ALLOWED_K8S_NAMESPACES``.
    4. An explicit namespace must equal the alert's namespace (defaulting to
       ``awoooi-prod``) or be ``observability``; this check only applies when
       the alert's namespace is itself an allowed namespace.
    5. Read-only ``get`` / ``version`` commands pass at this point.
    6. For deployment/statefulset/daemonset/pod/service targets the resource
       resolver must find the named resource.

    Args:
        action: Proposed action text.
        alert_namespace: Namespace reported by the alert, if any.
        alertname: Alert name, used for logging.
        alert_category: Alert category, used for logging.

    Returns:
        The original action when it passes, or a blocked result built by
        ``_blocked``. If the resource lookup itself raises, the action is let
        through with an ``action_guard_warning`` metadata entry.
    """
    passthrough = ApprovalActionGuardResult(action=action)

    command = (action or "").strip()
    if not command.lower().startswith("kubectl"):
        return passthrough

    parsed = parse_kubectl_action(command)
    if not parsed.ok:
        return _blocked(command, f"invalid_kubectl:{parsed.reason}", alertname)

    target_ns = parsed.namespace
    alert_ns = (alert_namespace or "awoooi-prod").strip() or "awoooi-prod"

    if target_ns:
        if target_ns not in _ALLOWED_K8S_NAMESPACES:
            return _blocked(
                command,
                f"namespace_not_allowed:{target_ns}",
                alertname,
                expected_namespace=alert_ns,
            )
        # "observability" may be targeted from any alert namespace.
        leaves_alert_domain = target_ns not in (alert_ns, "observability")
        if leaves_alert_domain and alert_ns in _ALLOWED_K8S_NAMESPACES:
            return _blocked(
                command,
                f"namespace_mismatch:{target_ns}!={alert_ns}",
                alertname,
                expected_namespace=alert_ns,
            )

    # Read-only get/version commands are fine to show once the namespace is
    # sane; everything else continues to the resource existence check.
    if parsed.kind == ActionKind.READONLY and parsed.verb in {"get", "version"}:
        return passthrough

    resolvable_kinds = {"deployment", "statefulset", "daemonset", "pod", "service"}
    if not (parsed.resource_name and parsed.resource_type in resolvable_kinds):
        return passthrough

    try:
        from src.services.resource_resolver import get_resource_resolver

        lookup = await get_resource_resolver().resolve(
            raw_resource=parsed.resource_name,
            namespace=target_ns or alert_ns,
            resource_kind=parsed.resource_type,
        )
        if not lookup.success:
            return _blocked(
                command,
                f"k8s_resource_not_found:{parsed.resource_type}/{parsed.resource_name}",
                alertname,
                expected_namespace=alert_ns,
                candidates=lookup.candidates,
            )
    except Exception as exc:
        # Resolver failure does not block the action; it is only flagged.
        logger.warning(
            "approval_action_resource_guard_unavailable",
            alertname=alertname,
            alert_category=alert_category,
            action=command[:160],
            error=str(exc),
        )
        return ApprovalActionGuardResult(
            action=action,
            metadata={"action_guard_warning": "resource_guard_unavailable"},
        )

    return passthrough
def _blocked(
    raw_action: str,
    reason: str,
    alertname: str,
    *,
    expected_namespace: str | None = None,
    candidates: list[str] | None = None,
) -> ApprovalActionGuardResult:
    """Log a rejected action and build the blocked guard result.

    Args:
        raw_action: The rejected action text (truncated in log and output).
        reason: Machine-readable rejection reason.
        alertname: Alert name, used for logging.
        expected_namespace: Namespace the alert was expected to act in.
        candidates: Alternative resource names, if the resolver offered any.

    Returns:
        A blocked result whose action is a ``NO_ACTION - INVALID_TARGET``
        message and whose metadata records what was blocked and why.
    """
    alternatives = candidates or []
    logger.warning(
        "approval_action_blocked_before_persist",
        alertname=alertname,
        reason=reason,
        action=raw_action[:160],
        expected_namespace=expected_namespace,
        candidates=alternatives,
    )
    replacement = f"NO_ACTION - INVALID_TARGET: {reason}; original={raw_action[:180]}"
    guard_details: dict[str, object] = {
        "action_guard": "blocked_before_persist",
        "blocked_action": raw_action[:300],
        "blocked_reason": reason,
        "expected_namespace": expected_namespace,
        "candidates": alternatives,
    }
    return ApprovalActionGuardResult(
        action=replacement,
        blocked=True,
        reason=reason,
        metadata=guard_details,
    )

View File

@@ -0,0 +1,189 @@
"""Resolve alert/rule context to a real Playbook ID.
The learning loop updates EWMA trust only when ``approval_records`` carries the
actual ``playbooks.playbook_id``. YAML rule IDs such as ``host_resource_alert``
are not Playbook IDs, so this resolver bridges rule/alert context to the
canonical DB identity before an ApprovalRecord is created.
"""
from __future__ import annotations
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
import structlog
import yaml
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
logger = structlog.get_logger(__name__)
@dataclass(frozen=True)
class PlaybookMatch:
    """Immutable result of resolving alert context to a playbook.

    Attributes:
        playbook_id: The resolved ``playbooks.playbook_id`` value.
        source: Label of the strategy that produced the match, e.g.
            ``exact_yaml_rule``, ``exact_alertname`` or
            ``symptom_recommendation``.
    """

    playbook_id: str
    source: str
async def resolve_playbook_id_for_alert(
    *,
    rule_id: str | None = None,
    alertname: str | None = None,
    affected_services: list[str] | None = None,
    severity: str | None = None,
) -> str | None:
    """Resolve alert context to a ``playbooks.playbook_id``, if one matches.

    The deterministic rule/alertname lookup is tried first; the
    recommendation-based lookup is only consulted when that finds nothing.

    Args:
        rule_id: YAML rule identifier, when the alert came from a rule.
        alertname: Alert name.
        affected_services: Services touched by the alert; ``None`` is treated
            as an empty list.
        severity: Severity label forwarded to the recommendation lookup.

    Returns:
        The matched playbook ID, or ``None`` when neither strategy matches.
    """
    exact = await _resolve_exact_yaml_rule(rule_id=rule_id, alertname=alertname)
    if exact:
        return exact.playbook_id

    recommended = await _resolve_by_recommendation(
        alertname=alertname,
        affected_services=affected_services or [],
        severity=severity,
    )
    if not recommended:
        return None
    return recommended.playbook_id
async def _resolve_exact_yaml_rule(
    *,
    rule_id: str | None,
    alertname: str | None,
) -> PlaybookMatch | None:
    """Look up an approved ``yaml_rule`` playbook by rule ID, then by alert name.

    Deterministic DB fields are consulted before any fuzzy recommendation:

    1. By rule ID: the playbook name equals ``AutoMigrated: <rule_id>`` or its
       notes contain ``rule.id=<rule_id>``.
    2. By alert name: the given alert name, plus any alert names the YAML rule
       file lists for ``rule_id``, is checked against the playbook's
       ``symptom_pattern`` ``alert_names`` array.

    Args:
        rule_id: YAML rule identifier; blank/``None`` skips step 1.
        alertname: Alert name; blank/``None`` contributes no candidate.

    Returns:
        The most recently updated matching playbook, or ``None`` when nothing
        matches or the lookup fails (failures are logged, not raised).
    """
    rule_id = (rule_id or "").strip()
    alertname = (alertname or "").strip()
    if not rule_id and not alertname:
        return None

    alertname_candidates = [alertname]
    alertname_candidates.extend(_alertnames_for_rule_id(rule_id))
    # Drop blanks and duplicates while keeping first-seen order.
    alertname_candidates = list(dict.fromkeys(name for name in alertname_candidates if name))

    # Rule IDs routinely contain "_" (e.g. host_resource_alert), which LIKE
    # treats as a single-character wildcard; "%" is a multi-character one.
    # Escape them with backslash, the default LIKE escape character in
    # PostgreSQL, so the notes match is literal. The backslash itself is
    # escaped first.
    # NOTE(review): this is still a substring match, so rule ID "foo" also
    # matches notes containing "rule.id=foo_bar" — confirm the notes format
    # before tightening further.
    rule_id_like = (
        rule_id.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )

    try:
        async with get_db_context() as db:
            if rule_id:
                row = (
                    await db.execute(
                        sa_text(
                            """
                            SELECT playbook_id
                            FROM playbooks
                            WHERE source = 'yaml_rule'
                            AND status = 'approved'
                            AND (
                            name = ('AutoMigrated: ' || :rule_id)
                            OR notes ILIKE ('%rule.id=' || :rule_id_like || '%')
                            )
                            ORDER BY updated_at DESC
                            LIMIT 1
                            """
                        ),
                        {"rule_id": rule_id, "rule_id_like": rule_id_like},
                    )
                ).first()
                if row:
                    return PlaybookMatch(playbook_id=str(row[0]), source="exact_yaml_rule")

            for candidate in alertname_candidates:
                row = (
                    await db.execute(
                        sa_text(
                            """
                            SELECT playbook_id
                            FROM playbooks
                            WHERE source = 'yaml_rule'
                            AND status = 'approved'
                            AND (symptom_pattern::jsonb->'alert_names') ? :alertname
                            ORDER BY updated_at DESC
                            LIMIT 1
                            """
                        ),
                        {"alertname": candidate},
                    )
                ).first()
                if row:
                    return PlaybookMatch(playbook_id=str(row[0]), source="exact_alertname")
            return None
    except Exception as exc:
        # Best-effort lookup: a DB problem must not break approval creation.
        logger.warning(
            "playbook_exact_match_failed",
            rule_id=rule_id,
            alertname=alertname,
            error=str(exc),
        )
        return None
@lru_cache(maxsize=1)
def _rule_alertname_index() -> dict[str, tuple[str, ...]]:
    """Build a ``rule_id -> alert names`` index from ``alert_rules.yaml``.

    The file is read from ``parents[2]`` of this module's resolved path. The
    result is cached by ``lru_cache``, so the file is parsed at most once per
    process and an empty fallback is cached as well.

    Returns:
        Mapping from each rule's ``id`` to the alert names listed under its
        ``match.alertname``. Empty when the file cannot be loaded or does not
        have the expected shape; malformed individual rules are skipped.
    """
    rules_path = Path(__file__).resolve().parents[2] / "alert_rules.yaml"
    try:
        data = yaml.safe_load(rules_path.read_text(encoding="utf-8")) or {}
    except Exception as exc:
        logger.debug("playbook_rule_index_load_failed", path=str(rules_path), error=str(exc))
        return {}

    # safe_load returns whatever the document root is, and YAML keys with no
    # value load as None. Validate every level instead of raising, because
    # this runs on the alert-handling path.
    rules = data.get("rules") if isinstance(data, dict) else None
    if not isinstance(rules, (list, tuple)):
        return {}

    index: dict[str, tuple[str, ...]] = {}
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        rule_id = str(rule.get("id") or "").strip()
        match = rule.get("match")
        alertnames = match.get("alertname") if isinstance(match, dict) else None
        if rule_id and isinstance(alertnames, list):
            index[rule_id] = tuple(str(name) for name in alertnames if name)
    return index
def _alertnames_for_rule_id(rule_id: str) -> tuple[str, ...]:
    """Return the alert names the YAML rule index lists for ``rule_id``.

    An empty ``rule_id`` or an unknown rule yields an empty tuple; the index
    is not consulted for an empty ``rule_id``.
    """
    return _rule_alertname_index().get(rule_id, ()) if rule_id else ()
async def _resolve_by_recommendation(
    *,
    alertname: str | None,
    affected_services: list[str],
    severity: str | None,
) -> PlaybookMatch | None:
    """Fall back to the playbook service's top symptom recommendation.

    Args:
        alertname: Alert name; blank/``None`` contributes no alert name.
        affected_services: Services touched by the alert.
        severity: Severity label; ``"P2"`` is used when it is falsy.

    Returns:
        A match for the single best recommendation when its similarity score
        is at least 0.5; otherwise ``None``. Any error is logged at debug
        level and also yields ``None``.
    """
    name = (alertname or "").strip()
    if not (name or affected_services):
        return None

    try:
        from src.models.playbook import SymptomPattern
        from src.services.playbook_service import get_playbook_service

        pattern = SymptomPattern(
            alert_names=[name] if name else [],
            affected_services=affected_services,
            severity_range=[severity or "P2"],
        )
        service = get_playbook_service()
        ranked = await service.get_recommendations(
            symptoms=pattern,
            top_k=1,
            use_rag=False,
        )
        # Nothing returned, or the best candidate is too dissimilar.
        if not ranked or ranked[0].similarity_score < 0.5:
            return None
        return PlaybookMatch(
            playbook_id=ranked[0].playbook.playbook_id,
            source="symptom_recommendation",
        )
    except Exception as exc:
        logger.debug(
            "playbook_recommendation_match_skipped",
            alertname=name,
            affected_services=affected_services,
            error=str(exc),
        )
        return None

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import pytest
from src.services.alert_approval_guard import guard_alert_approval_action
from src.services.resource_resolver import ResolveResult, set_resource_resolver
from src.utils.k8s_naming import ResourceType
class StubResolver:
    """Test double for the resource resolver with a fixed outcome.

    Every ``resolve`` call is recorded in ``calls`` so tests can assert on the
    arguments the guard passed in.
    """

    def __init__(self, *, success: bool, candidates: list[str] | None = None) -> None:
        self.success = success
        self.candidates = candidates or []
        self.calls: list[dict[str, str]] = []

    async def resolve(
        self,
        raw_resource: str,
        namespace: str = "awoooi-prod",
        resource_kind: str = "deployment",
    ) -> ResolveResult:
        """Record the call and return a result reflecting ``self.success``."""
        self.calls.append(
            dict(
                raw_resource=raw_resource,
                namespace=namespace,
                resource_kind=resource_kind,
            )
        )

        found = self.success
        # Kinds that are not ResourceType values map to UNKNOWN.
        if resource_kind in ResourceType._value2member_map_:
            kind = ResourceType(resource_kind)
        else:
            kind = ResourceType.UNKNOWN

        return ResolveResult(
            success=found,
            resource_name=raw_resource if found else None,
            namespace=namespace,
            resource_type=kind,
            confidence=1.0 if found else 0.0,
            candidates=self.candidates,
            original_input=raw_resource,
        )
@pytest.mark.asyncio
async def test_blocks_llm_kubectl_default_namespace_for_prod_alert() -> None:
    """A kubectl command aimed at ``default`` is blocked for a prod alert."""
    proposed = "kubectl logs deployment/sentry-self-hosted-snuba-metrics-consumer-1 -n default"

    outcome = await guard_alert_approval_action(
        action=proposed,
        alert_namespace="awoooi-prod",
        alertname="SentryRpsZero",
        alert_category="infrastructure",
    )

    assert outcome.blocked is True
    assert outcome.action.startswith("NO_ACTION - INVALID_TARGET")
    assert outcome.reason == "namespace_not_allowed:default"
@pytest.mark.asyncio
async def test_blocks_hallucinated_mutating_k8s_resource() -> None:
    """A mutating command on a resource the resolver cannot find is blocked."""
    stub = StubResolver(success=False, candidates=["awoooi-api"])
    proposed = "kubectl scale deployment flywheelexecutionratemissing --replicas=5 -n awoooi-prod"

    set_resource_resolver(stub)
    try:
        outcome = await guard_alert_approval_action(
            action=proposed,
            alert_namespace="awoooi-prod",
            alertname="FlywheelExecutionRateMissing",
            alert_category="infrastructure",
        )
    finally:
        # Always restore the resolver so other tests are unaffected.
        set_resource_resolver(None)

    expected_call = {
        "raw_resource": "flywheelexecutionratemissing",
        "namespace": "awoooi-prod",
        "resource_kind": "deployment",
    }
    assert outcome.blocked is True
    assert outcome.reason == "k8s_resource_not_found:deployment/flywheelexecutionratemissing"
    assert outcome.metadata["candidates"] == ["awoooi-api"]
    assert stub.calls == [expected_call]
@pytest.mark.asyncio
async def test_allows_sane_readonly_cluster_inventory() -> None:
    """A read-only ``get`` in the alert's own namespace passes unchanged."""
    proposed = "kubectl get pods -n awoooi-prod"

    outcome = await guard_alert_approval_action(
        action=proposed,
        alert_namespace="awoooi-prod",
        alertname="ColdStartCheck",
        alert_category="infrastructure",
    )

    assert outcome.blocked is False
    assert outcome.action == "kubectl get pods -n awoooi-prod"

View File

@@ -3423,3 +3423,52 @@ curl -fsS 'https://awoooi.wooo.work/api/v1/platform/tenants' | jq '{total, tenan
# awoooi = legacy_awoooi_default
# ewoooc = shadow
```
---
## 2026-05-06（台北）— Alert Approval Guard + Playbook Trust 接線
**觸發**：Telegram 告警仍出現 Gemini/LLM 產生的錯域 `kubectl` 指令,例如把 Sentry Docker container 當成 K8s deployment，或把 `FlywheelExecutionRateMissing` 當 deployment scale；同時 production 24h `approval_records.matched_playbook_id` 為 0，導致 learning service 無法更新 Playbook trust。
### 已修正
| 範圍 | 結果 |
|------|------|
| Approval 前置閘門 | 新增 `alert_approval_guard.py`，AI/rule action 寫入 `ApprovalRecord` 前先檢查 kubectl grammar、namespace 與 K8s resource target |
| 錯域動作處理 | `default` / `production` / 不存在 deployment 會降級為 `NO_ACTION - INVALID_TARGET`,避免錯誤命令進入批准與執行 |
| Playbook 真 ID | 新增 `playbook_match_resolver.py`,將 YAML `rule_id` / alertname 解析成真正 `PB-...`,不再把 rule id 偽裝成 playbook id |
| Alertmanager 入口 | CS1 / CS2 / CS3 建立 approval 時填入 `matched_playbook_id`，auto-execute 也沿用同一個 PB ID |
| Telegram 顯示 | 被 guard 擋下的建議動作顯示為明確 `INVALID_TARGET`,不再把幻覺 kubectl 當成可執行建議 |
### 驗證
```bash
DATABASE_URL=postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test \
/Users/ogt/awoooi/apps/api/.venv/bin/python -m pytest \
apps/api/tests/test_alert_approval_guard.py \
apps/api/tests/test_action_parser_safety.py \
apps/api/tests/test_rule_engine_auto_execute.py \
apps/api/tests/test_matched_playbook_id_e2e.py \
apps/api/tests/test_learning_chain_e2e.py -q
# 59 passed
/Users/ogt/awoooi/apps/api/.venv/bin/python -m ruff check --select E9,F401,F821,F841 \
apps/api/src/services/alert_approval_guard.py \
apps/api/src/services/playbook_match_resolver.py \
apps/api/src/api/v1/webhooks.py \
apps/api/tests/test_alert_approval_guard.py
# All checks passed
```
線上只讀 resolver spot check：
```text
HostHighCpuLoad -> PB-20260427-C29FE4
NodeExporterDown -> PB-20260420-282F79
DockerContainerCpuSustainedHigh -> PB-20260505-F4197B
```
### 後續
- 部署後觀察 24h：`approval_records.matched_playbook_id IS NOT NULL` 必須從 0 開始增加。
- 若 guard 擋下大量 LLM 動作，下一步不是放寬 guard，而是讓 PreDecision/MCP 先收 evidence，再產生 domain-correct SSH/K8s action。