feat(flywheel): Phase 3 — decision_manager Tier 3 七大修復 (首席架構師授權)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

ADR-073 Phase 3 全部完成:

3-1: TYPE-1 triage guard
- get_or_create_decision() 入口: notification_type=TYPE-1 直接 bypass LLM 分析
- classify_notification() 優先讀 incident.notification_type (早期分診結果)
- ConfigurationDrift/KubeConfigDrift 補入 TYPE-4D 匹配清單

3-2: infrastructure → SSH MCP routing
- _auto_execute() 中 alert_category=infrastructure + 非 kubectl action → _ssh_execute()
- _ssh_execute(): docker_restart / service_restart tool 路由
- 取 instance label 對應 SSH_MCP_ALLOWED_HOSTS 白名單主機

3-3: send_info_notification() TYPE-1 已存在,classify_notification 修復確保正確呼叫

3-4: Dynamic button builder 已存在 _build_inline_keyboard + _CATEGORY_BUTTONS

3-5: action | parse fix
- _auto_execute() 開頭: action 含 | 時取第一段 (LLM 有時輸出 "kubectl X | kubectl get")

3-6: risk_level YAML priority override LLM
- dual_engine_analyze() LLM 結果返回後,用 alert_rules.yaml 對應 rule.risk 覆蓋

3-7: send_drift_card() TYPE-4D 已存在,classify_notification 修復確保正確觸發

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-12 14:39:19 +08:00
parent 5b956a9a47
commit dbc77c5e62
2 changed files with 180 additions and 1 deletions

View File

@@ -1057,6 +1057,34 @@ class DecisionManager:
"""
_redis_client = get_redis()
# ADR-073 Phase 3-1: TYPE-1 triage guard — 純資訊告警跳過 LLM 分析
# classify_alert_early() 已在 webhook 入口設定 notification_type
# TYPE-1 (info/backup/heartbeat) 不需 AI 推理,直接推 Telegram 後返回
# 2026-04-12 ogt
if getattr(incident, "notification_type", None) == "TYPE-1":
_info_token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.COMPLETED,
proposal_data={
"source": "triage_guard",
"notification_type": "TYPE-1",
"decision_state": "COMPLETED",
"auto_executed": False,
"confidence": 1.0,
"risk_level": "low",
"description": "純資訊通知,無需操作",
},
)
await self._save_token(_info_token)
_fire_and_forget(_push_decision_to_telegram(incident, _info_token.proposal_data))
logger.info(
"decision_type1_bypass",
incident_id=incident.incident_id,
notification_type="TYPE-1",
)
return _info_token
# 1. 檢查現有 token
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token:
@@ -1176,6 +1204,13 @@ class DecisionManager:
"""
action = token.proposal_data.get("kubectl_command", "")
# ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
# LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y"
# | 後面是查詢指令,取第一個才是真正的修復操作
if action and "|" in action:
action = action.split("|")[0].strip()
logger.debug("action_pipe_stripped", incident_id=incident.incident_id, action=action)
# NO_ACTION 規則(備份失敗/E2E smoke test 等)— kubectl_command 為空,不執行,直接返回
# 2026-04-11 Claude Sonnet 4.6: 防止空 action 或 NO_ACTION 字串進入自動執行流程
_suggested_action = token.proposal_data.get("suggested_action", "")
@@ -1239,6 +1274,14 @@ class DecisionManager:
)
return
# ADR-073 Phase 3-2: infrastructure 告警 (Docker/Host) → SSH MCP routing (2026-04-12 ogt)
# alert_category = "infrastructure" 表示 Docker/Host 告警,不走 K8s executor
# action 格式應為 "docker restart <container>" 或 "systemctl restart <service>"
_alert_category = getattr(incident, "alert_category", None) or ""
if _alert_category == "infrastructure" and action and not action.startswith("kubectl"):
await self._ssh_execute(incident, token, action, _target)
return
# BUG-003 修復 2026-04-11: 加入 K8s deployment 存在性驗證,
# 避免 LLM 產生的無效 deployment name(<placeholder>/alertname/unknown)通過 safety guard
# 但仍對 K8s 發出錯誤指令
@@ -1550,6 +1593,30 @@ class DecisionManager:
)
result = {**llm_result, "source": f"llm_{provider}"}
# ADR-073 Phase 3-6: YAML rule risk_level 優先於 LLM 輸出 (2026-04-12 ogt)
# LLM 有時把 critical 告警估為 medium;YAML 規則是由人工審閱過的,優先採用
try:
from src.services.alert_rule_engine import _load_rules, _matches
_alertname_for_risk = (
incident.signals[0].labels.get("alertname", "")
if incident.signals else ""
)
if _alertname_for_risk:
for _rule in _load_rules():
if _matches(_rule, _alertname_for_risk, "", "", ""):
_yaml_risk = _rule.get("response", {}).get("risk")
if _yaml_risk and _yaml_risk != result.get("risk_level"):
logger.info(
"risk_level_yaml_override",
incident_id=incident.incident_id,
llm_risk=result.get("risk_level"),
yaml_risk=_yaml_risk,
)
result["risk_level"] = _yaml_risk
break
except Exception as _re:
logger.debug("risk_level_yaml_override_failed", error=str(_re))
# MCP Phase 4a: 信心 < 0.7 → NemoClaw second opinion (2026-04-11 Claude Sonnet 4.6)
_conf = float(result.get("confidence", 1.0))
if _conf < 0.7:
@@ -1946,6 +2013,110 @@ class DecisionManager:
logger.info("stale_ready_tokens_scan_done", resent=resent)
return resent
async def _ssh_execute(
    self,
    incident: "Incident",
    token: "DecisionToken",
    action: str,
    target: str,
) -> None:
    """
    Run an infrastructure (Docker/host) remediation through the SSH MCP provider.

    ADR-073 Phase 3-2: alerts whose category is "infrastructure" are routed
    here instead of the K8s executor (2026-04-12 ogt).

    Supported actions (matched case-insensitively on the action prefix):
      - ``docker restart <container>``   -> SSH tool ``docker_restart``
      - ``systemctl restart <service>``  -> SSH tool ``service_restart``

    Anything else (including ``docker rm -f``) is NOT executed; the token is
    downgraded to READY so a human can review it.

    Args:
        incident: Incident being remediated. Only the first signal's
            ``instance`` label is consulted to choose the SSH host.
        token: Decision token; its state and ``proposal_data`` are mutated
            in place and persisted with ``self._save_token``.
        action: Remediation command text, used to select the SSH tool.
        target: Container or service name passed to the SSH tool.

    Returns:
        None. The outcome is recorded on ``token`` and pushed to Telegram.
    """
    from src.plugins.mcp.providers.ssh_provider import SSHProvider
    import os as _os

    async def _degrade_to_manual(
        *, error: "str | None" = None, mcp_all_failed: bool = False
    ) -> None:
        # Single fallback path: mark the token READY (awaiting a human),
        # persist it and push the decision card.
        token.state = DecisionState.READY
        if error is not None:
            token.error = error
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        if mcp_all_failed:
            token.proposal_data["mcp_all_failed"] = True
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))

    # --- Resolve the SSH host -------------------------------------------
    # The instance label is usually "host:port"; keep only the host part.
    _instance = incident.signals[0].labels.get("instance", "") if incident.signals else ""
    _label_host = _instance.partition(":")[0]
    _allowed = [
        h.strip()
        for h in _os.environ.get("SSH_MCP_ALLOWED_HOSTS", "").split(",")
        if h.strip()
    ]
    if _label_host and _label_host in _allowed:
        _host = _label_host
    else:
        # NOTE(review): falling back to the first allowlisted host is the
        # original behavior; it presumably assumes a single-host setup.
        # Logged so that the host actually acted on is auditable.
        _host = _allowed[0] if _allowed else ""
        if _host:
            logger.info(
                "ssh_execute_host_fallback",
                incident_id=incident.incident_id,
                instance=_instance,
                host=_host,
            )

    if not _host:
        logger.warning(
            "ssh_execute_no_host",
            incident_id=incident.incident_id,
            reason="SSH_MCP_ALLOWED_HOSTS 未設定或 instance label 不在白名單",
        )
        await _degrade_to_manual(mcp_all_failed=True)
        return

    # --- Map the action text to an SSH tool + parameter name ------------
    _supported = (
        ("docker restart", "docker_restart", "container"),
        ("systemctl restart", "service_restart", "service"),
    )
    _action_lower = action.lower().strip()
    _match = next(
        (
            (tool, param)
            for prefix, tool, param in _supported
            if _action_lower.startswith(prefix)
        ),
        None,
    )
    if _match is None:
        logger.info(
            "ssh_execute_unknown_action",
            incident_id=incident.incident_id,
            action=action,
            reason="不支援的 SSH action 格式,降級為人工審核",
        )
        await _degrade_to_manual()
        return
    _tool, _param_name = _match

    # Never send an empty container/service name to the remote host.
    _target_name = (target or "").strip()
    if not _target_name:
        logger.warning(
            "ssh_execute_no_target",
            incident_id=incident.incident_id,
            action=action,
            tool=_tool,
        )
        await _degrade_to_manual()
        return

    ssh = SSHProvider()
    params: dict = {"host": _host, _param_name: _target_name}

    # Only the remote call is guarded: a later persistence error must not
    # relabel an already-executed restart as "not executed".
    try:
        result = await ssh.execute(tool_name=_tool, parameters=params)
    except Exception as e:
        logger.error(
            "ssh_execute_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        await _degrade_to_manual(error=str(e))
        return

    success = result.success
    _output = result.output[:200] if result.output else ""
    logger.info(
        "ssh_execute_result",
        incident_id=incident.incident_id,
        tool=_tool,
        host=_host,
        success=success,
        output=_output,
    )

    if not success:
        # The tool ran but reported failure: the repair did not happen, so
        # hand the decision back to a human instead of marking it done.
        await _degrade_to_manual(error=str(_output) or f"{_tool} reported failure")
        return

    token.state = DecisionState.COMPLETED
    # Keep proposal_data in sync with the token state, as every other branch does.
    token.proposal_data["decision_state"] = DecisionState.COMPLETED.value
    token.proposal_data["auto_executed"] = True
    await self._save_token(token)
    _fire_and_forget(
        _push_auto_repair_result(incident, action, success=success)
    )
# =============================================================================
# Singleton

View File

@@ -1133,12 +1133,20 @@ def classify_notification(
mcp_all_failed: 所有 MCP provider 是否全失敗
decision_state: DecisionState 字串 ("COMPLETED" / "ERROR" / ...)
"""
# ADR-073 Phase 3-1: 優先採用 classify_alert_early() 已設定的 notification_type
# 這樣 TYPE-1/TYPE-4D 告警不需進入 LLM 分析路徑 (2026-04-12 ogt)
_early_type = getattr(incident, "notification_type", None)
if _early_type == "TYPE-4D":
return NotificationType.TYPE_4_DRIFT
if _early_type == "TYPE-1":
return NotificationType.TYPE_1
labels = incident.signals[0].labels if incident.signals else {}
alertname = labels.get("alertname", "")
label_severity = labels.get("severity", "")
# TYPE-4D: Config Drift 專屬(最優先)
if alertname == "ConfigDrift":
if alertname in ("ConfigDrift", "ConfigurationDrift", "KubeConfigDrift"):
return NotificationType.TYPE_4_DRIFT
# TYPE-1: 純資訊 (severity=info + 成功類告警)