fix(aiops): preserve alert identity in degraded diagnosis
Some checks failed
CD Pipeline / tests (push) Successful in 1m26s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-04 11:32:31 +08:00
parent d41360d8e7
commit 0bb4773b9e
4 changed files with 220 additions and 7 deletions

View File

@@ -227,12 +227,13 @@ Phase 4 動態異常偵測AI 主動巡檢結果,可作為高信心佐證)
latency_ms: int,
reason: str = "unknown",
) -> DiagnosisReport:
"""熔斷降級:rule-based mock用 alert_category 作簡單假設)"""
"""熔斷降級:只保留已知告警事實,不把 Docker/host memory 誤寫成 K8s OOM。"""
category = _guess_category_from_snapshot(snapshot)
description = _build_degraded_description(snapshot, reason, category)
return DiagnosisReport(
hypotheses=[
Hypothesis(
description=f"[降級] 無法完成 LLM 分析(原因: {reason})。基於告警類別推測: {category}",
description=description,
confidence=0.2,
evidence_chain=[],
category=category,
@@ -300,11 +301,48 @@ def _extract_hypotheses(parsed: dict[str, Any]) -> list[Hypothesis]:
return hypotheses
def _build_degraded_description(
    snapshot: "EvidenceSnapshot",
    reason: str,
    category: str,
) -> str:
    """Compose the degraded-diagnosis text, marked as not being an LLM root-cause verdict.

    Args:
        snapshot: Evidence snapshot; its alert name and labels are obtained
            through ``_alert_identity``.
        reason: Why the LLM analysis could not be completed.
        category: Fallback category to report at the end of the text.

    Returns:
        A single line consisting of the degradation notice, the original alert
        name (when known), ``target=``/``host=`` fields (when a matching label
        exists) and the fallback category, separated by ";".
    """
    alert_name, labels = _alert_identity(snapshot)

    # The parenthesis around the reason is closed here; leaving it open made
    # every following field read as part of the reason.
    parts = [f"[降級] 無法完成 LLM 分析(原因: {reason})"]
    if alert_name:
        parts.append(f"保留原始告警: {alert_name}")

    # First non-empty label wins, in the listed priority order.
    target = _first_label(labels, "container_name", "name", "pod", "resource", "service")
    host = _first_label(labels, "host", "exported_host", "instance")
    if target:
        parts.append(f"target={target}")
    if host:
        parts.append(f"host={host}")

    parts.append(f"降級分類: {category}")
    # An explicit separator keeps adjacent fields from fusing together
    # (e.g. "target=giteahost=110").
    return ";".join(parts)
def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str:
"""降級時從 snapshot 猜測告警類別(最粗粒度兜底)"""
"""降級時從 snapshot 推導保守分類,優先保留原始 alertname"""
alert_name, labels = _alert_identity(snapshot)
if alert_name:
return alert_name
summary = (snapshot.evidence_summary or "").lower()
if "oom" in summary or "memory" in summary:
layer = str(labels.get("layer") or "").lower()
job = str(labels.get("job") or "").lower()
has_container = bool(_first_label(labels, "container_name", "container", "name"))
has_k8s_pod = bool(_first_label(labels, "pod")) or "k8s" in summary or "kubernetes" in summary
has_memory_signal = _contains_memory_signal(summary)
if has_memory_signal and (
layer == "docker" or "cadvisor" in job or has_container
):
return "DockerContainerMemoryPressure"
if "oom" in summary and has_k8s_pod:
return "KubePodOOM"
if has_memory_signal:
return "MemoryPressure"
if "crashloop" in summary:
return "KubePodCrashLoop"
if "disk" in summary:
@@ -316,6 +354,56 @@ def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str:
return "Unknown"
def _alert_identity(snapshot: "EvidenceSnapshot") -> tuple[str, dict[str, Any]]:
    """Return ``(alert_name, labels)`` for a snapshot, preferring structured data.

    The name is taken from ``alert_info["alert_name"]``, then from the
    ``alertname`` label, and as a last resort parsed out of
    ``evidence_summary``. ``labels`` is always a dict; it is empty when
    ``alert_info`` or its ``labels`` entry is missing or not a dict.
    """
    alert_info = getattr(snapshot, "alert_info", None) or {}
    structured = isinstance(alert_info, dict)

    raw_labels = alert_info.get("labels") if structured else {}
    labels = raw_labels if isinstance(raw_labels, dict) else {}

    # Structured sources, highest priority first.
    candidates = (
        alert_info.get("alert_name") if structured else None,
        labels.get("alertname"),
    )
    for candidate in candidates:
        name = str(candidate or "").strip()
        if name:
            return name, labels

    # Nothing structured: fall back to parsing the free-text summary.
    summary_text = getattr(snapshot, "evidence_summary", "") or ""
    return _extract_alertname_from_summary(summary_text), labels
def _contains_memory_signal(summary: str) -> bool:
    """Return True when *summary* mentions memory, in English or Chinese.

    The check is a case-insensitive substring match, so the result no longer
    depends on the caller having lower-cased the text first. ``None`` or an
    empty string yields False.
    """
    text = (summary or "").lower()
    # NOTE(review): "mem" is a bare substring, so it also matches unrelated
    # words such as "member" or "memcached" — tighten if that causes
    # misclassification.
    return any(term in text for term in ("memory", "mem", "記憶體", "內存"))
def _extract_alertname_from_summary(summary: str) -> str:
    """Best-effort recovery of the alert name from a free-text summary.

    Used for older snapshots that carry no structured ``alert_info``. The
    summary is scanned for a dict-style ``alert_name`` entry (single-quoted,
    then double-quoted) and then for an ``alertname`` entry in the same two
    quoting styles. The first marker present decides the result: the text up
    to the next matching quote is returned, stripped. Returns "" when no
    marker is present.
    """
    # (key, quote) pairs in priority order.
    lookups = (
        ("alert_name", "'"),
        ("alert_name", '"'),
        ("alertname", "'"),
        ("alertname", '"'),
    )
    for key, quote in lookups:
        marker = f"{quote}{key}{quote}: {quote}"
        if marker not in summary:
            continue
        tail = summary.partition(marker)[2]
        return tail.partition(quote)[0].strip()
    return ""
def _first_label(labels: dict[str, Any], *keys: str) -> str:
    """Return the first usable label value among *keys*, as a stripped string.

    Keys are tried in the order given. Missing or falsy values are skipped,
    and so are values that are empty once stripped, so a whitespace-only
    label no longer hides a later key that has a real value.

    Args:
        labels: Alert label mapping.
        *keys: Label names in priority order.

    Returns:
        The stripped string form of the first usable value, or "" when none
        of the keys has one.
    """
    for key in keys:
        value = labels.get(key)
        if not value:
            continue
        text = str(value).strip()
        if text:
            return text
    return ""
def compute_input_hash(snapshot: "EvidenceSnapshot") -> str:
"""計算 Diagnostician 輸入的 fingerprint用於 AgentSession input_hash"""
key = (snapshot.snapshot_id or "") + (snapshot.evidence_summary or "")[:100]

View File

@@ -16,7 +16,7 @@ from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import select
from sqlalchemy import select, update
from src.db.base import get_db_context
from src.db.models import ApprovalRecord
@@ -106,8 +106,6 @@ def _record_to_request(record: ApprovalRecord) -> ApprovalRequest:
# B4 fix 2026-04-24 ogt + Claude Sonnet 4.6: 補回 DB 欄位(人工審核路徑讀回必要)
incident_id=getattr(record, "incident_id", None),
matched_playbook_id=getattr(record, "matched_playbook_id", None),
telegram_message_id=getattr(record, "telegram_message_id", None),
telegram_chat_id=getattr(record, "telegram_chat_id", None),
)
@@ -153,7 +151,15 @@ class ApprovalDBRepository(IApprovalRepository):
async def get_pending(self) -> list[ApprovalRequest]:
"""取得所有待審核的 Approval"""
now = datetime.now(UTC)
async with get_db_context() as db:
await db.execute(
update(ApprovalRecord)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)
.where(ApprovalRecord.expires_at < now)
.values(status=ApprovalStatus.EXPIRED, resolved_at=now)
)
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)

View File

@@ -0,0 +1,46 @@
import pytest
from src.repositories import approval_repository as repo_mod
class _FakeScalars:
    """Test double for the object returned by ``_FakeResult.scalars()``; holds no rows."""

    def all(self) -> list:
        """Return an empty row list."""
        rows: list = []
        return rows
class _FakeResult:
    """Test double for an executed-statement result; exposes only ``scalars()``."""

    def scalars(self) -> _FakeScalars:
        """Hand back a fresh, empty ``_FakeScalars``."""
        empty = _FakeScalars()
        return empty
class _FakeDb:
    """Test double for a DB session that records every statement it is asked to execute."""

    def __init__(self) -> None:
        # Statements in the order they were passed to execute().
        self.statements = list()

    async def execute(self, statement):
        """Record *statement* and answer with an empty ``_FakeResult``."""
        self.statements += [statement]
        return _FakeResult()
class _FakeDbContext:
    """Async context manager that yields a pre-built ``_FakeDb``."""

    def __init__(self, db: _FakeDb) -> None:
        self.db = db

    async def __aenter__(self) -> _FakeDb:
        """Enter the context and expose the wrapped fake session."""
        session = self.db
        return session

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        """Return False so exceptions raised inside the ``async with`` body propagate."""
        suppress = False
        return suppress
@pytest.mark.asyncio
async def test_repository_get_pending_expires_old_pending_before_select(monkeypatch) -> None:
    """get_pending() must execute an UPDATE statement first and a SELECT second."""
    session = _FakeDb()

    def _open_fake_context() -> _FakeDbContext:
        return _FakeDbContext(session)

    # Route the repository's DB access to the recording fake.
    monkeypatch.setattr(repo_mod, "get_db_context", _open_fake_context)

    result = await repo_mod.ApprovalDBRepository().get_pending()

    assert result == []
    executed = [stmt.__class__.__name__ for stmt in session.statements]
    assert len(executed) == 2
    assert executed[0] == "Update"
    assert executed[1] == "Select"

View File

@@ -0,0 +1,73 @@
from types import SimpleNamespace
from src.agents.diagnostician_agent import (
_build_degraded_description,
_guess_category_from_snapshot,
)
def test_degraded_fallback_preserves_docker_memory_alertname() -> None:
    """A Docker memory alert keeps its own alertname, target and host in the fallback text."""
    labels = {
        "alertname": "DockerContainerMemoryLimitPressure",
        "layer": "docker",
        "container_name": "gitea",
        "host": "110",
    }
    snapshot = SimpleNamespace(
        alert_info={"alert_name": "DockerContainerMemoryLimitPressure", "labels": labels},
        evidence_summary="容器 gitea 記憶體超過 limit 85%",
    )

    category = _guess_category_from_snapshot(snapshot)
    description = _build_degraded_description(snapshot, "step_timeout", category)

    assert category == "DockerContainerMemoryLimitPressure"
    assert "KubePodOOM" not in description
    expected_fragments = (
        "保留原始告警: DockerContainerMemoryLimitPressure",
        "target=gitea",
        "host=110",
    )
    for fragment in expected_fragments:
        assert fragment in description
def test_degraded_fallback_preserves_gitea_memory_pressure() -> None:
    """The structured alert name is returned unchanged as the fallback category."""
    alert_info = {
        "alert_name": "GiteaMemoryPressure",
        "labels": {
            "alertname": "GiteaMemoryPressure",
            "name": "gitea",
            "host": "110",
        },
    }
    snapshot = SimpleNamespace(
        alert_info=alert_info,
        evidence_summary="Gitea 記憶體工作集 > 85% limit",
    )

    category = _guess_category_from_snapshot(snapshot)

    assert category == "GiteaMemoryPressure"
def test_degraded_fallback_uses_generic_memory_pressure_without_alertname() -> None:
    """Without an alert name, Docker labels plus a memory summary give the Docker memory category."""
    docker_labels = {"layer": "docker", "container_name": "momo-pro-system"}
    snapshot = SimpleNamespace(
        alert_info={"labels": docker_labels},
        evidence_summary="container_name=momo-pro-system layer=docker memory limit pressure",
    )

    category = _guess_category_from_snapshot(snapshot)

    assert category == "DockerContainerMemoryPressure"
def test_degraded_fallback_handles_chinese_memory_summary_without_alertname() -> None:
    """A Chinese-language memory summary is also recognised as a memory signal."""
    docker_labels = {"layer": "docker", "container_name": "gitea"}
    snapshot = SimpleNamespace(
        alert_info={"labels": docker_labels},
        evidence_summary="gitea 容器記憶體超過 limit 85%",
    )

    category = _guess_category_from_snapshot(snapshot)

    assert category == "DockerContainerMemoryPressure"
def test_degraded_fallback_only_uses_kube_oom_for_k8s_oom_context() -> None:
    """A pod label together with an OOM summary yields the KubePodOOM category."""
    pod_labels = {"pod": "awoooi-api-123"}
    snapshot = SimpleNamespace(
        alert_info={"labels": pod_labels},
        evidence_summary="Kubernetes pod terminated with OOMKilled",
    )

    category = _guess_category_from_snapshot(snapshot)

    assert category == "KubePodOOM"