diff --git a/apps/api/src/agents/diagnostician_agent.py b/apps/api/src/agents/diagnostician_agent.py index f824f42d..0a65630f 100644 --- a/apps/api/src/agents/diagnostician_agent.py +++ b/apps/api/src/agents/diagnostician_agent.py @@ -227,12 +227,13 @@ Phase 4 動態異常偵測(AI 主動巡檢結果,可作為高信心佐證) latency_ms: int, reason: str = "unknown", ) -> DiagnosisReport: - """熔斷降級:rule-based mock(用 alert_category 作簡單假設)""" + """熔斷降級:只保留已知告警事實,不把 Docker/host memory 誤寫成 K8s OOM。""" category = _guess_category_from_snapshot(snapshot) + description = _build_degraded_description(snapshot, reason, category) return DiagnosisReport( hypotheses=[ Hypothesis( - description=f"[降級] 無法完成 LLM 分析(原因: {reason})。基於告警類別推測: {category}", + description=description, confidence=0.2, evidence_chain=[], category=category, @@ -300,11 +301,48 @@ def _extract_hypotheses(parsed: dict[str, Any]) -> list[Hypothesis]: return hypotheses +def _build_degraded_description( + snapshot: "EvidenceSnapshot", + reason: str, + category: str, +) -> str: + """組裝降級診斷文案,明確標示這不是 LLM 根因判定。""" + alert_name, labels = _alert_identity(snapshot) + parts = [f"[降級] 無法完成 LLM 分析(原因: {reason})"] + if alert_name: + parts.append(f"保留原始告警: {alert_name}") + target = _first_label(labels, "container_name", "name", "pod", "resource", "service") + host = _first_label(labels, "host", "exported_host", "instance") + if target: + parts.append(f"target={target}") + if host: + parts.append(f"host={host}") + parts.append(f"降級分類: {category}") + return ";".join(parts) + + def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str: - """降級時從 snapshot 猜測告警類別(最粗粒度兜底)。""" + """降級時從 snapshot 推導保守分類,優先保留原始 alertname。""" + alert_name, labels = _alert_identity(snapshot) + if alert_name: + return alert_name + summary = (snapshot.evidence_summary or "").lower() - if "oom" in summary or "memory" in summary: + layer = str(labels.get("layer") or "").lower() + job = str(labels.get("job") or "").lower() + has_container = bool(_first_label(labels, "container_name", "container", "name")) + has_k8s_pod = bool(_first_label(labels, "pod")) or "k8s" in summary or "kubernetes" in summary + + has_memory_signal = _contains_memory_signal(summary) + + if has_memory_signal and ( + layer == "docker" or "cadvisor" in job or has_container + ): + return "DockerContainerMemoryPressure" + if "oom" in summary and has_k8s_pod: return "KubePodOOM" + if has_memory_signal: + return "MemoryPressure" if "crashloop" in summary: return "KubePodCrashLoop" if "disk" in summary: @@ -316,6 +354,56 @@ def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str: return "Unknown" +def _alert_identity(snapshot: "EvidenceSnapshot") -> tuple[str, dict[str, Any]]: + """Extract alertname and labels from structured alert_info when available.""" + info = getattr(snapshot, "alert_info", None) or {} + labels = info.get("labels") if isinstance(info, dict) else {} + if not isinstance(labels, dict): + labels = {} + + alert_name = "" + if isinstance(info, dict): + alert_name = str(info.get("alert_name") or "").strip() + if not alert_name: + alert_name = str(labels.get("alertname") or "").strip() + if not alert_name: + alert_name = _extract_alertname_from_summary(getattr(snapshot, "evidence_summary", "") or "") + return alert_name, labels + + +def _contains_memory_signal(summary: str) -> bool: + return any(term in summary for term in ("memory", "mem", "記憶體", "內存")) + + +def _extract_alertname_from_summary(summary: str) -> str: + """Best-effort parse for older snapshots whose structured alert_info is absent.""" + marker = "'alert_name': '" + if marker in summary: + after = summary.split(marker, 1)[1] + return after.split("'", 1)[0].strip() + marker = '"alert_name": "' + if marker in summary: + after = summary.split(marker, 1)[1] + return after.split('"', 1)[0].strip() + marker = "'alertname': '" + if marker in summary: + after = summary.split(marker, 1)[1] + return after.split("'", 1)[0].strip() + marker = '"alertname": "' + if marker in summary: + after = summary.split(marker, 1)[1] + return after.split('"', 1)[0].strip() + return "" + + +def _first_label(labels: dict[str, Any], *keys: str) -> str: + for key in keys: + value = labels.get(key) + if value: + return str(value).strip() + return "" + + def compute_input_hash(snapshot: "EvidenceSnapshot") -> str: """計算 Diagnostician 輸入的 fingerprint(用於 AgentSession input_hash)。""" key = (snapshot.snapshot_id or "") + (snapshot.evidence_summary or "")[:100] diff --git a/apps/api/src/repositories/approval_repository.py b/apps/api/src/repositories/approval_repository.py index 496bc9c5..a25782af 100644 --- a/apps/api/src/repositories/approval_repository.py +++ b/apps/api/src/repositories/approval_repository.py @@ -16,7 +16,7 @@ from typing import Any from uuid import UUID import structlog -from sqlalchemy import select +from sqlalchemy import select, update from src.db.base import get_db_context from src.db.models import ApprovalRecord @@ -106,8 +106,6 @@ def _record_to_request(record: ApprovalRecord) -> ApprovalRequest: # B4 fix 2026-04-24 ogt + Claude Sonnet 4.6: 補回 DB 欄位(人工審核路徑讀回必要) incident_id=getattr(record, "incident_id", None), matched_playbook_id=getattr(record, "matched_playbook_id", None), - telegram_message_id=getattr(record, "telegram_message_id", None), - telegram_chat_id=getattr(record, "telegram_chat_id", None), ) @@ -153,7 +151,15 @@ class ApprovalDBRepository(IApprovalRepository): async def get_pending(self) -> list[ApprovalRequest]: """取得所有待審核的 Approval""" + now = datetime.now(UTC) async with get_db_context() as db: + await db.execute( + update(ApprovalRecord) + .where(ApprovalRecord.status == ApprovalStatus.PENDING) + .where(ApprovalRecord.expires_at < now) + .values(status=ApprovalStatus.EXPIRED, resolved_at=now) + ) + result = await db.execute( select(ApprovalRecord) .where(ApprovalRecord.status == ApprovalStatus.PENDING) diff --git a/apps/api/tests/test_approval_repository_pending_expiry.py b/apps/api/tests/test_approval_repository_pending_expiry.py new file mode 100644 index 00000000..196993b7 --- /dev/null +++ b/apps/api/tests/test_approval_repository_pending_expiry.py @@ -0,0 +1,46 @@ +import pytest + +from src.repositories import approval_repository as repo_mod + + +class _FakeScalars: + def all(self) -> list: + return [] + + +class _FakeResult: + def scalars(self) -> _FakeScalars: + return _FakeScalars() + + +class _FakeDb: + def __init__(self) -> None: + self.statements = [] + + async def execute(self, statement): + self.statements.append(statement) + return _FakeResult() + + +class _FakeDbContext: + def __init__(self, db: _FakeDb) -> None: + self.db = db + + async def __aenter__(self) -> _FakeDb: + return self.db + + async def __aexit__(self, exc_type, exc, tb) -> bool: + return False + + +@pytest.mark.asyncio +async def test_repository_get_pending_expires_old_pending_before_select(monkeypatch) -> None: + fake_db = _FakeDb() + monkeypatch.setattr(repo_mod, "get_db_context", lambda: _FakeDbContext(fake_db)) + + pending = await repo_mod.ApprovalDBRepository().get_pending() + + assert pending == [] + assert len(fake_db.statements) == 2 + assert fake_db.statements[0].__class__.__name__ == "Update" + assert fake_db.statements[1].__class__.__name__ == "Select" diff --git a/apps/api/tests/test_diagnostician_degraded_fallback.py b/apps/api/tests/test_diagnostician_degraded_fallback.py new file mode 100644 index 00000000..3076159b --- /dev/null +++ b/apps/api/tests/test_diagnostician_degraded_fallback.py @@ -0,0 +1,73 @@ +from types import SimpleNamespace + +from src.agents.diagnostician_agent import ( + _build_degraded_description, + _guess_category_from_snapshot, +) + + +def test_degraded_fallback_preserves_docker_memory_alertname() -> None: + snapshot = SimpleNamespace( + alert_info={ + "alert_name": "DockerContainerMemoryLimitPressure", + "labels": { + "alertname": "DockerContainerMemoryLimitPressure", + "layer": "docker", + "container_name": "gitea", + "host": "110", + }, + }, + evidence_summary="容器 gitea 記憶體超過 limit 85%", + ) + + category = _guess_category_from_snapshot(snapshot) + description = _build_degraded_description(snapshot, "step_timeout", category) + + assert category == "DockerContainerMemoryLimitPressure" + assert "KubePodOOM" not in description + assert "保留原始告警: DockerContainerMemoryLimitPressure" in description + assert "target=gitea" in description + assert "host=110" in description + + +def test_degraded_fallback_preserves_gitea_memory_pressure() -> None: + snapshot = SimpleNamespace( + alert_info={ + "alert_name": "GiteaMemoryPressure", + "labels": { + "alertname": "GiteaMemoryPressure", + "name": "gitea", + "host": "110", + }, + }, + evidence_summary="Gitea 記憶體工作集 > 85% limit", + ) + + assert _guess_category_from_snapshot(snapshot) == "GiteaMemoryPressure" + + +def test_degraded_fallback_uses_generic_memory_pressure_without_alertname() -> None: + snapshot = SimpleNamespace( + alert_info={"labels": {"layer": "docker", "container_name": "momo-pro-system"}}, + evidence_summary="container_name=momo-pro-system layer=docker memory limit pressure", + ) + + assert _guess_category_from_snapshot(snapshot) == "DockerContainerMemoryPressure" + + +def test_degraded_fallback_handles_chinese_memory_summary_without_alertname() -> None: + snapshot = SimpleNamespace( + alert_info={"labels": {"layer": "docker", "container_name": "gitea"}}, + evidence_summary="gitea 容器記憶體超過 limit 85%", + ) + + assert _guess_category_from_snapshot(snapshot) == "DockerContainerMemoryPressure" + + +def test_degraded_fallback_only_uses_kube_oom_for_k8s_oom_context() -> None: + snapshot = SimpleNamespace( + alert_info={"labels": {"pod": "awoooi-api-123"}}, + evidence_summary="Kubernetes pod terminated with OOMKilled", + ) + + assert _guess_category_from_snapshot(snapshot) == "KubePodOOM"