feat(adr-076): Task 3.3 — SSH 修復 KM 萃取(補齊飛輪雙手)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

動機: SSH MCP 修復(docker restart/systemctl)成功後,KM 無法學習
因為 _extract_repair_steps 只處理 kubectl,SSH 路徑完全漏失。

approval_execution.py:
  - _trigger_playbook_extraction: 成功執行後將 approval.action 寫入
    incident.outcome.learning_notes,供 Playbook 萃取器讀取

playbook_service.py:
  - _parse_ssh_command(): 新增模組函式,解析 ssh [user@]host 'cmd' 格式
  - _extract_repair_steps(): 步驟 2 擴充 SSH 路徑分支
      ssh ... → ActionType.SSH_COMMAND + host 記錄
      kubectl ... → ActionType.KUBECTL(保留原有邏輯)
  - _generate_name(): SSH 修復自動加 [SSH] 前綴
  - _extract_tags(): SSH 修復自動加 ssh + host_layer 標籤

test_playbook_ssh_extraction.py: 18 tests(100% 通過)

飛輪雙手對齊:
  kubectl 路徑: decision_chain.reasoning_steps → KM  (既有)
  SSH 路徑: approval.action → learning_notes → KM  (Task 3.3 新增)

測試: 794 passed, 26 skipped, 0 failed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-14 15:19:54 +08:00
parent cc42aa0bdb
commit aae7c12645
3 changed files with 389 additions and 14 deletions

View File

@@ -581,6 +581,11 @@ class ApprovalExecutionService:
if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
incident.status = IncidentStatus.RESOLVED
incident.resolved_at = now_taipei()
# Task 3.3 (2026-04-14): 記錄執行動作供 SSH 路徑 KM 萃取
# approval.action 含實際執行指令(可能是 kubectl 或 ssh ...)
# 寫入 learning_notes 供 playbook_service._extract_repair_steps 萃取 SSH RepairStep
if not incident.outcome.learning_notes and approval.action:
incident.outcome.learning_notes = approval.action
# 回存 Incident(fire-and-forget 路徑,失敗不影響主流程)
await incident_service.save_to_working_memory(incident)

View File

@@ -36,6 +36,33 @@ from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
import re as _re
def _parse_ssh_command(ssh_cmd: str) -> tuple[str, str]:
"""
從 SSH 指令字串中分離主機名與實際執行指令。
Task 3.3 (2026-04-14): SSH 修復 KM 萃取輔助函式
支援格式:
ssh 192.168.0.188 'docker restart minio'
ssh root@192.168.0.110 'systemctl restart ollama || docker restart ollama'
ssh {host} "cd /data/harbor && docker-compose up -d"
Returns:
(host, inner_command) — 無法解析時回傳 ("", original_cmd)
"""
m = _re.match(
r"ssh\s+(?:[a-zA-Z0-9_]+@)?([\w.\-:{}]+)\s+['\"](.+)['\"]",
ssh_cmd.strip(),
_re.DOTALL,
)
if m:
return m.group(1), m.group(2)
# fallback: 空 host保留完整命令
return "", ssh_cmd
class IPlaybookService(Protocol):
"""Playbook Service Interface"""
@@ -542,18 +569,25 @@ class PlaybookService:
)
def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]:
"""從 Incident 萃取修復步驟"""
"""
從 Incident 萃取修復步驟
Task 3.3 (2026-04-14): 補齊 SSH 修復路徑。原本只處理 kubectl,
現在 learning_notes 也可萃取 SSH 指令,支援 SSH_COMMAND 類型。
優先順序:
1. decision_chain.reasoning_steps — kubectl 命令(AI 推論步驟)
2. outcome.learning_notes — SSH 或 kubectl(人工補充,或 approval_execution 寫入的 approval.action;僅在步驟 1 無結果時使用)
"""
steps: list[RepairStep] = []
step_number = 1
# 從 decision_chain.reasoning_steps 提取 kubectl 命令
# 1. 從 decision_chain.reasoning_steps 提取 kubectl 命令
if incident.decision_chain and incident.decision_chain.reasoning_steps:
for reasoning in incident.decision_chain.reasoning_steps:
# 尋找包含 kubectl 的步驟
if "kubectl" in reasoning.lower():
# 嘗試提取 kubectl 命令
import re
kubectl_match = re.search(r"kubectl\s+\S+.*", reasoning)
kubectl_match = _re.search(r"kubectl\s+\S+.*", reasoning)
if kubectl_match:
steps.append(
RepairStep(
@@ -565,18 +599,49 @@ class PlaybookService:
)
step_number += 1
# 如果沒有從 reasoning_steps 取得,嘗試從 learning_notes 取得
# 2. Task 3.3: 從 learning_notes 萃取 kubectl 或 SSH 命令
# learning_notes 由兩個來源寫入:
# a. 人工補充筆記(既有邏輯)
# b. approval_execution._trigger_playbook_extraction 寫入 approval.action(Task 3.3 新增)
if not steps and incident.outcome and incident.outcome.learning_notes:
notes = incident.outcome.learning_notes
if "kubectl" in notes.lower():
notes = incident.outcome.learning_notes.strip()
if notes.startswith("ssh "):
# SSH 修復路徑Task 3.3 新增)
host, inner_cmd = _parse_ssh_command(notes)
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=notes,
action_type=ActionType.SSH_COMMAND,
command=inner_cmd or notes,
risk_level=RiskLevel.MEDIUM,
)
)
logger.info(
"playbook_ssh_step_extracted",
host=host or "unknown",
inner_cmd_preview=(inner_cmd or notes)[:60],
)
elif "kubectl" in notes.lower():
# kubectl 路徑(原有邏輯,移入此區塊統一處理)
kubectl_match = _re.search(r"kubectl\s+\S+.*", notes)
if kubectl_match:
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=kubectl_match.group(0).strip(),
risk_level=RiskLevel.MEDIUM,
)
)
else:
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=notes,
risk_level=RiskLevel.MEDIUM,
)
)
return steps
@@ -598,12 +663,16 @@ class PlaybookService:
return min(base_score + effectiveness_bonus, 1.0)
def _generate_name(self, incident: Incident) -> str:
"""生成 Playbook 名稱"""
"""生成 Playbook 名稱Task 3.3: SSH 修復加 [SSH] 前綴)"""
alert_name = incident.signals[0].alert_name if incident.signals else "Unknown"
services = incident.affected_services[:2] if incident.affected_services else []
service_str = "/".join(services) if services else "system"
return f"{alert_name} - {service_str} 修復劇本"
# 偵測 SSH 修復路徑 — 加前綴以利搜尋與過濾Task 3.3
notes = (incident.outcome.learning_notes or "") if incident.outcome else ""
prefix = "[SSH] " if notes.strip().startswith("ssh ") else ""
return f"{prefix}{alert_name} - {service_str} 修復劇本"
def _generate_description(self, incident: Incident) -> str:
"""生成 Playbook 描述"""
@@ -622,7 +691,7 @@ class PlaybookService:
return ". ".join(parts) if parts else "從成功案例自動萃取的修復劇本"
def _extract_tags(self, incident: Incident) -> list[str]:
"""萃取標籤"""
"""萃取標籤Task 3.3: SSH 修復自動加 ssh 標籤)"""
tags: set[str] = set()
# 從服務名稱提取
@@ -641,6 +710,12 @@ class PlaybookService:
if "network" in signal.alert_name.lower():
tags.add("network")
# Task 3.3: SSH 修復加標籤learning_notes 以 ssh 開頭 → 主機層修復)
notes = (incident.outcome.learning_notes or "") if incident.outcome else ""
if notes.strip().startswith("ssh "):
tags.add("ssh")
tags.add("host_layer")
return list(tags)[:10]
def _find_matched_symptoms(

View File

@@ -0,0 +1,295 @@
"""
Playbook SSH 修復 KM 萃取測試
=================================
Task 3.3: SSH 修復路徑 Playbook 萃取
測試範圍:
- _parse_ssh_command() — 各種 SSH 格式解析
- PlaybookService._extract_repair_steps() — SSH / kubectl 路徑
- PlaybookService._generate_name() — [SSH] 前綴
- PlaybookService._extract_tags() — ssh / host_layer 標籤
🔴 遵循「禁止 Mock 測試鐵律」
- 純 Python 邏輯:不需要 DB/Redis/Telegram
- 使用真實模型物件,不 Mock
建立: 2026-04-14 (台北時區) Claude Sonnet 4.6 (Task 3.3)
"""
from datetime import datetime, timezone
import pytest
from src.models.incident import AIDecisionChain, Incident, IncidentOutcome, Severity, Signal
from src.models.playbook import ActionType, RiskLevel
from src.services.playbook_service import PlaybookService, _parse_ssh_command
_TZ_TAIPEI = timezone.utc  # tests use UTC; the zone does not affect the logic under test


def _make_incident(**kwargs) -> Incident:
    """Build a minimal, purely in-memory ``Incident`` for the tests.

    Recognised overrides (all optional): ``alert_name``, ``incident_id``,
    ``title`` and ``affected_services``. Any remaining keyword arguments are
    forwarded unchanged to the ``Incident`` constructor.
    """
    alert_name = kwargs.pop("alert_name", "MinioDown")
    incident_id = kwargs.pop("incident_id", "INC-20260414-TEST")
    title = kwargs.pop("title", "Test Incident")
    affected_services = kwargs.pop("affected_services", ["minio"])

    # A single P2 alertmanager signal fired "now" is enough for the extractor.
    signals = [
        Signal(
            alert_name=alert_name,
            severity=Severity.P2,
            source="alertmanager",
            fired_at=datetime.now(_TZ_TAIPEI),
            labels={},
            annotations={},
        )
    ]
    return Incident(
        incident_id=incident_id,
        title=title,
        severity=Severity.P2,
        signals=signals,
        affected_services=affected_services,
        proposal_ids=[],
        **kwargs,
    )
# =============================================================================
# _parse_ssh_command
# =============================================================================
class TestParseSshCommand:
    """Parsing of SSH command strings by ``_parse_ssh_command``."""

    @pytest.mark.parametrize(
        "cmd,expected_host,expected_inner",
        [
            (
                "ssh 192.168.0.188 'docker restart minio'",
                "192.168.0.188",
                "docker restart minio",
            ),
            (
                "ssh root@192.168.0.110 'systemctl restart ollama || docker restart ollama'",
                "192.168.0.110",
                "systemctl restart ollama || docker restart ollama",
            ),
            (
                "ssh {host} \"cd /data/harbor && docker-compose up -d\"",
                "{host}",
                "cd /data/harbor && docker-compose up -d",
            ),
            (
                # Quotes of the other kind nested inside the remote command.
                "ssh host \"echo 'hi'\"",
                "host",
                "echo 'hi'",
            ),
        ],
    )
    def test_parse_standard_ssh(self, cmd, expected_host, expected_inner):
        """Standard ``ssh [user@]host '<cmd>'`` forms split into host and command."""
        host, inner = _parse_ssh_command(cmd)
        assert host == expected_host
        assert inner == expected_inner

    def test_parse_unrecognized_returns_empty_host(self):
        """An unparseable form yields an empty host and the original command."""
        cmd = "ssh host_without_quotes do_something"
        host, inner = _parse_ssh_command(cmd)
        assert host == ""
        assert inner == cmd

    def test_parse_empty_string(self):
        """An empty string does not crash and is returned unchanged."""
        host, inner = _parse_ssh_command("")
        assert host == ""
        # The fallback must hand back the input itself, not invent content.
        assert inner == ""
# =============================================================================
# _extract_repair_steps — SSH path (learning_notes)
# =============================================================================
class TestExtractRepairStepsSSH:
    """Repair-step extraction for the SSH path added in Task 3.3.

    Some test names still mention ``last_repair_action``; the value actually
    exercised by every test here is ``incident.outcome.learning_notes``.
    """

    def _svc(self) -> PlaybookService:
        # Allocate without running __init__, so no service dependencies are needed.
        return PlaybookService.__new__(PlaybookService)

    def test_ssh_command_in_last_repair_action(self):
        """learning_notes holding an SSH command yields one SSH_COMMAND step."""
        incident = _make_incident()
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh 192.168.0.188 'docker restart minio'"
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert len(steps) == 1
        assert steps[0].action_type == ActionType.SSH_COMMAND
        assert "docker restart minio" in steps[0].command
        # The stored command must be the remote command, not the ssh wrapper;
        # the containment check alone would pass even if parsing had failed.
        assert not steps[0].command.startswith("ssh ")

    def test_ssh_fallback_when_no_decision_chain(self):
        """Without a decision_chain, SSH learning_notes still produce a step."""
        incident = _make_incident()
        incident.decision_chain = None
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh root@192.168.0.110 'systemctl restart ollama'"
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert steps
        assert steps[0].action_type == ActionType.SSH_COMMAND

    def test_kubectl_in_last_repair_action(self):
        """learning_notes holding a kubectl command yields one KUBECTL step."""
        incident = _make_incident()
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = (
            "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
        )
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert len(steps) == 1
        assert steps[0].action_type == ActionType.KUBECTL
        assert "kubectl rollout restart" in steps[0].command

    def test_decision_chain_takes_priority_over_learning_notes(self):
        """kubectl steps from reasoning_steps are not overridden by SSH notes."""
        now = datetime.now(_TZ_TAIPEI)
        incident = _make_incident()
        incident.decision_chain = AIDecisionChain(
            model_used="deepseek-r1:14b",
            hypothesis="Pod crash",
            confidence=0.85,
            reasoning_steps=["kubectl rollout restart deployment/test -n prod"],
            inference_started_at=now,
            inference_completed_at=now,
            latency_ms=100,
        )
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh 192.168.0.188 'docker restart minio'"
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        # decision_chain wins ...
        assert steps[0].action_type == ActionType.KUBECTL
        # ... and the SSH note must not be appended as an additional step.
        assert all(step.action_type == ActionType.KUBECTL for step in steps)

    def test_no_action_returns_empty(self):
        """No repair source at all yields an empty list."""
        incident = _make_incident()
        incident.decision_chain = None
        incident.outcome = None
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert steps == []

    def test_ssh_step_risk_level_is_medium(self):
        """SSH steps default to MEDIUM risk."""
        incident = _make_incident()
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh {host} 'docker restart minio'"
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert steps[0].risk_level == RiskLevel.MEDIUM

    def test_ssh_step_number_is_1(self):
        """SSH step numbering starts at 1."""
        incident = _make_incident()
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh 192.168.0.188 'docker restart minio'"
        svc = self._svc()
        steps = svc._extract_repair_steps(incident)
        assert steps[0].step_number == 1
# =============================================================================
# _generate_name — [SSH] 前綴
# =============================================================================
class TestGenerateNameSSH:
    """Playbook names gain an ``[SSH]`` prefix only for SSH repairs."""

    def _svc(self) -> PlaybookService:
        # Allocate without running __init__.
        return PlaybookService.__new__(PlaybookService)

    @staticmethod
    def _attach_notes(incident: Incident, notes: str) -> Incident:
        """Give ``incident`` a fresh outcome whose learning_notes is ``notes``."""
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = notes
        return incident

    def test_ssh_name_has_prefix(self):
        """An SSH repair produces a name starting with ``[SSH]``."""
        incident = self._attach_notes(
            _make_incident(alert_name="MinioDown", affected_services=["minio"]),
            "ssh 192.168.0.188 'docker restart minio'",
        )
        generated = self._svc()._generate_name(incident)
        assert generated.startswith("[SSH]")
        assert "MinioDown" in generated

    def test_kubectl_name_no_prefix(self):
        """A kubectl repair produces a name without the ``[SSH]`` prefix."""
        incident = self._attach_notes(
            _make_incident(alert_name="KubePodCrashLooping"),
            "kubectl rollout restart deployment/awoooi-api -n prod",
        )
        generated = self._svc()._generate_name(incident)
        assert not generated.startswith("[SSH]")

    def test_no_outcome_no_prefix(self):
        """A missing outcome produces a name without the prefix."""
        incident = _make_incident()
        incident.outcome = None
        generated = self._svc()._generate_name(incident)
        assert not generated.startswith("[SSH]")
# =============================================================================
# _extract_tags — ssh / host_layer 標籤
# =============================================================================
class TestExtractTagsSSH:
    """``ssh`` / ``host_layer`` tags are added only for SSH repairs."""

    def _svc(self) -> PlaybookService:
        # Allocate without running __init__.
        return PlaybookService.__new__(PlaybookService)

    def _tags_for(self, incident: Incident) -> list[str]:
        """Run tag extraction on ``incident`` with a bare service instance."""
        return self._svc()._extract_tags(incident)

    def test_ssh_tags_added(self):
        """SSH learning_notes add both the ``ssh`` and ``host_layer`` tags."""
        incident = _make_incident(affected_services=["minio"])
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = "ssh 192.168.0.188 'docker restart minio'"
        extracted = self._tags_for(incident)
        assert "ssh" in extracted
        assert "host_layer" in extracted

    def test_non_ssh_no_ssh_tag(self):
        """A kubectl repair does not receive the ``ssh`` tag."""
        incident = _make_incident()
        incident.outcome = IncidentOutcome()
        incident.outcome.learning_notes = (
            "kubectl rollout restart deployment/awoooi-api -n prod"
        )
        extracted = self._tags_for(incident)
        assert "ssh" not in extracted

    def test_no_outcome_no_ssh_tag(self):
        """A missing outcome does not receive the ``ssh`` tag."""
        incident = _make_incident()
        incident.outcome = None
        extracted = self._tags_for(incident)
        assert "ssh" not in extracted