Review notes (free text ahead of the first "diff --git" header):
- Line breaks and indentation were rebuilt from a whitespace-collapsed copy of this
  patch. Context-line indentation is a best guess - verify it against the target
  files before applying.
- _ssh_execute: a failed SSH tool result is no longer recorded as COMPLETED with
  auto_executed=True; the last decision_manager.py hunk therefore grows from 110
  to 127 new-side lines.
- Comments on added lines are in English; context lines and runtime strings are
  unchanged.

diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py
index 4e0d5b99..fdc6a613 100644
--- a/apps/api/src/services/decision_manager.py
+++ b/apps/api/src/services/decision_manager.py
@@ -1057,6 +1057,34 @@ class DecisionManager:
         """
         _redis_client = get_redis()
 
+        # ADR-073 Phase 3-1: TYPE-1 triage guard — purely informational alerts skip LLM analysis
+        # classify_alert_early() has already set notification_type at the webhook entry point
+        # TYPE-1 (info/backup/heartbeat) needs no AI reasoning: push to Telegram and return
+        # 2026-04-12 ogt
+        if getattr(incident, "notification_type", None) == "TYPE-1":
+            _info_token = DecisionToken(
+                token=f"DEC-{uuid4().hex[:12].upper()}",
+                incident_id=incident.incident_id,
+                state=DecisionState.COMPLETED,
+                proposal_data={
+                    "source": "triage_guard",
+                    "notification_type": "TYPE-1",
+                    "decision_state": "COMPLETED",
+                    "auto_executed": False,
+                    "confidence": 1.0,
+                    "risk_level": "low",
+                    "description": "純資訊通知,無需操作",
+                },
+            )
+            await self._save_token(_info_token)
+            _fire_and_forget(_push_decision_to_telegram(incident, _info_token.proposal_data))
+            logger.info(
+                "decision_type1_bypass",
+                incident_id=incident.incident_id,
+                notification_type="TYPE-1",
+            )
+            return _info_token
+
         # 1. 檢查現有 token
         existing_token = await self._find_existing_token(incident.incident_id)
         if existing_token:
@@ -1176,6 +1204,13 @@ class DecisionManager:
         """
         action = token.proposal_data.get("kubectl_command", "")
 
+        # ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
+        # The LLM sometimes emits "kubectl rollout restart X | kubectl get pods -n Y"
+        # The part after | is a query command; only the first segment is the actual repair
+        if action and "|" in action:
+            action = action.split("|")[0].strip()
+            logger.debug("action_pipe_stripped", incident_id=incident.incident_id, action=action)
+
         # NO_ACTION 規則(備份失敗/E2E smoke test 等)— kubectl_command 為空,不執行,直接返回
         # 2026-04-11 Claude Sonnet 4.6: 防止空 action 或 NO_ACTION 字串進入自動執行流程
         _suggested_action = token.proposal_data.get("suggested_action", "")
@@ -1239,6 +1274,14 @@ class DecisionManager:
             )
             return
 
+        # ADR-073 Phase 3-2: infrastructure alerts (Docker/Host) → SSH MCP routing (2026-04-12 ogt)
+        # alert_category = "infrastructure" means a Docker/Host alert; it does not go through the K8s executor
+        # action is expected to look like "docker restart " or "systemctl restart "
+        _alert_category = getattr(incident, "alert_category", None) or ""
+        if _alert_category == "infrastructure" and action and not action.startswith("kubectl"):
+            await self._ssh_execute(incident, token, action, _target)
+            return
+
         # BUG-003 修復 2026-04-11: 加入 K8s deployment 存在性驗證,
         # 避免 LLM 產生的無效 deployment name(/alertname/unknown)通過 safety guard
         # 但仍對 K8s 發出錯誤指令
@@ -1550,6 +1593,30 @@ class DecisionManager:
             )
             result = {**llm_result, "source": f"llm_{provider}"}
 
+            # ADR-073 Phase 3-6: YAML rule risk_level takes precedence over the LLM output (2026-04-12 ogt)
+            # The LLM sometimes rates critical alerts as medium; YAML rules are human-reviewed, so prefer them
+            try:
+                from src.services.alert_rule_engine import _load_rules, _matches
+                _alertname_for_risk = (
+                    incident.signals[0].labels.get("alertname", "")
+                    if incident.signals else ""
+                )
+                if _alertname_for_risk:
+                    for _rule in _load_rules():
+                        if _matches(_rule, _alertname_for_risk, "", "", ""):
+                            _yaml_risk = _rule.get("response", {}).get("risk")
+                            if _yaml_risk and _yaml_risk != result.get("risk_level"):
+                                logger.info(
+                                    "risk_level_yaml_override",
+                                    incident_id=incident.incident_id,
+                                    llm_risk=result.get("risk_level"),
+                                    yaml_risk=_yaml_risk,
+                                )
+                                result["risk_level"] = _yaml_risk
+                            break
+            except Exception as _re:
+                logger.debug("risk_level_yaml_override_failed", error=str(_re))
+
             # MCP Phase 4a: 信心 < 0.7 → NemoClaw second opinion (2026-04-11 Claude Sonnet 4.6)
             _conf = float(result.get("confidence", 1.0))
             if _conf < 0.7:
@@ -1946,6 +2013,127 @@ class DecisionManager:
         logger.info("stale_ready_tokens_scan_done", resent=resent)
         return resent
 
+    async def _ssh_execute(
+        self,
+        incident: "Incident",
+        token: "DecisionToken",
+        action: str,
+        target: str,
+    ) -> None:
+        """
+        Run an infrastructure (Docker/Host) repair action through the SSH MCP provider.
+
+        ADR-073 Phase 3-2 (2026-04-12 ogt): Docker/Host alerts are routed here
+        instead of the K8s executor.
+
+        Supported actions (matched case-insensitively by prefix):
+          - docker restart    -> SSH tool "docker_restart", parameter "container"
+          - systemctl restart -> SSH tool "service_restart", parameter "service"
+        Any other action (including "docker rm -f") is not executed; the token is
+        left READY and pushed to Telegram for manual review.
+
+        Args:
+            incident: Incident being handled; the "instance" label of its first
+                signal selects the SSH host.
+            token: Decision token whose state/proposal_data are updated and saved.
+            action: Proposed command text.
+            target: Container or service name handed to the SSH tool.
+        """
+        from src.plugins.mcp.providers.ssh_provider import SSHProvider
+        import os as _os
+
+        async def _hand_to_human(error=None, **flags) -> None:
+            # Shared degrade path: keep the token READY and push it for manual review.
+            token.state = DecisionState.READY
+            if error is not None:
+                token.error = error
+            token.proposal_data["decision_state"] = DecisionState.READY.value
+            token.proposal_data["auto_executed"] = False
+            token.proposal_data.update(flags)
+            await self._save_token(token)
+            _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
+
+        # Host: "instance" label with any port stripped, else the first whitelisted host.
+        _instance = incident.signals[0].labels.get("instance", "") if incident.signals else ""
+        _host = _instance.partition(":")[0]
+        _allowed = [h.strip() for h in _os.environ.get("SSH_MCP_ALLOWED_HOSTS", "").split(",") if h.strip()]
+        if not _host or _host not in _allowed:
+            # NOTE(review): a non-whitelisted instance also falls back to the first
+            # allowed host, so the command may run on a different machine — confirm.
+            _host = _allowed[0] if _allowed else ""
+
+        if not _host:
+            logger.warning(
+                "ssh_execute_no_host",
+                incident_id=incident.incident_id,
+                reason="SSH_MCP_ALLOWED_HOSTS 未設定或 instance label 不在白名單",
+            )
+            await _hand_to_human(mcp_all_failed=True)
+            return
+
+        # Map the action prefix to (SSH tool name, name of the parameter carrying target).
+        _ssh_tools = (
+            ("docker restart", "docker_restart", "container"),
+            ("systemctl restart", "service_restart", "service"),
+        )
+        _action_lower = action.lower().strip()
+        _matched = next(
+            ((tool, key) for prefix, tool, key in _ssh_tools if _action_lower.startswith(prefix)),
+            None,
+        )
+        if _matched is None:
+            logger.info(
+                "ssh_execute_unknown_action",
+                incident_id=incident.incident_id,
+                action=action,
+                reason="不支援的 SSH action 格式,降級為人工審核",
+            )
+            await _hand_to_human()
+            return
+
+        _tool, _param_key = _matched
+        ssh = SSHProvider()
+        params: dict = {"host": _host, _param_key: target}
+
+        try:
+            result = await ssh.execute(tool_name=_tool, parameters=params)
+            success = result.success
+            _output = result.output[:200] if result.output else ""
+
+            logger.info(
+                "ssh_execute_result",
+                incident_id=incident.incident_id,
+                tool=_tool,
+                host=_host,
+                success=success,
+                output=_output,
+            )
+
+            if success:
+                token.state = DecisionState.COMPLETED
+                token.proposal_data["decision_state"] = DecisionState.COMPLETED.value
+                token.proposal_data["auto_executed"] = True
+            else:
+                # A failed tool run must not be recorded as a completed auto-repair:
+                # keep the token READY so a human can still act on it.
+                token.state = DecisionState.READY
+                token.error = _output or "ssh tool reported failure"
+                token.proposal_data["decision_state"] = DecisionState.READY.value
+                token.proposal_data["auto_executed"] = False
+            await self._save_token(token)
+
+            _fire_and_forget(
+                _push_auto_repair_result(incident, action, success=success)
+            )
+
+        except Exception as e:
+            logger.error(
+                "ssh_execute_failed",
+                incident_id=incident.incident_id,
+                error=str(e),
+            )
+            await _hand_to_human(error=str(e))
+
 
 # =============================================================================
 # Singleton
diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py
index 6f5f118e..07c794d2 100644
--- a/apps/api/src/services/telegram_gateway.py
+++ b/apps/api/src/services/telegram_gateway.py
@@ -1133,12 +1133,20 @@ def classify_notification(
         mcp_all_failed: 所有 MCP provider 是否全失敗
         decision_state: DecisionState 字串 ("COMPLETED" / "ERROR" / ...)
     """
+    # ADR-073 Phase 3-1: prefer the notification_type already set by classify_alert_early()
+    # so TYPE-1/TYPE-4D alerts do not need to enter the LLM analysis path (2026-04-12 ogt)
+    _early_type = getattr(incident, "notification_type", None)
+    if _early_type == "TYPE-4D":
+        return NotificationType.TYPE_4_DRIFT
+    if _early_type == "TYPE-1":
+        return NotificationType.TYPE_1
+
     labels = incident.signals[0].labels if incident.signals else {}
     alertname = labels.get("alertname", "")
     label_severity = labels.get("severity", "")
 
     # TYPE-4D:Config Drift 專屬(最優先)
-    if alertname == "ConfigDrift":
+    if alertname in ("ConfigDrift", "ConfigurationDrift", "KubeConfigDrift"):
         return NotificationType.TYPE_4_DRIFT
 
     # TYPE-1:純資訊(severity=info + 成功類告警)