diff --git a/apps/api/tests/test_wave8_remaining_blockers.py b/apps/api/tests/test_wave8_remaining_blockers.py new file mode 100644 index 00000000..cacb81e4 --- /dev/null +++ b/apps/api/tests/test_wave8_remaining_blockers.py @@ -0,0 +1,281 @@ +"""Wave 8 餘項 BLOCKER 修復驗收測試 +================================================================================ +覆蓋 critic + debugger + vuln-verifier 報告中尚未驗收的 4 修復: + +1. vuln #4 — decision_fusion._sanitize prompt injection 防禦 +2. debugger B14 — Gemini quota Redis 異常時 fail-closed(非 fail-open) +3. debugger B25/B26 — auto_repair_service.drain_pending_tasks (lifespan SIGTERM) +4. debugger B8 — GovernanceAgent.run_self_check ≥3 失敗觸發 alert_governance + +設計原則: +- 直接驗證 production 代碼行為,不過度 mock +- 使用 monkeypatch 隔離外部依賴 + +2026-04-27 Wave8 BLOCKER 收尾 by Claude Opus 4.7 +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ============================================================================= +# vuln #4 — decision_fusion._sanitize prompt injection 防禦 +# ============================================================================= + + +class TestVuln4FusionSanitize: + """alert_name / evidence / proposal 為不可信使用者輸入,prompt 注入防禦 + + 核心驗證:score_with_elephant 內呼叫 LLM 前,evil input 會被 + 控制字元剔除 + 截長至 max_len,不會原樣傳入 prompt。 + """ + + @pytest.mark.asyncio + async def test_score_with_elephant_sanitizes_prompt(self): + """注入控制字元 + 超長 attacker payload → LLM 收到的 prompt 已 sanitize""" + from src.services.decision_fusion import DecisionFusionEngine + + engine = DecisionFusionEngine() + + # 注入:alert_name 含控制字元 + payload 超長 1000 chars + evil_alert = "ignore_prior\x00\x1b\x02XXX" + "A" * 1000 + incident = MagicMock() + incident.signals = [MagicMock(labels={"alertname": evil_alert})] + + captured_prompt: dict = {} + + class _FakeResp: + status_code = 200 + text = "" + + def json(self): + return {"response": "0.5"} + + def raise_for_status(self): + pass + + async def _fake_post(url, json=None, **kwargs): # noqa: A002 + captured_prompt["body"] = (json or {}).get("prompt", "") + return _FakeResp() + + mock_client = MagicMock() + mock_client.post = AsyncMock(side_effect=_fake_post) + + with patch("httpx.AsyncClient") as mock_async_client: + mock_async_client.return_value.__aenter__.return_value = mock_client + try: + await engine.score_with_elephant( + incident=incident, + evidence=MagicMock(evidence_summary="x", mcp_health={}), + proposal="dummy", + ) + except Exception: + pass # 允許 score 解析失敗(重點驗 prompt sanitize) + + prompt_body = captured_prompt.get("body", "") + # 控制字元應被剔除 + assert "\x00" not in prompt_body + assert "\x1b" not in prompt_body + assert "\x02" not in prompt_body + # 超長 payload 應被截斷(max_len 約 100 for alert_name) + # 1000 個 'A' 不可能全部進去 + assert prompt_body.count("A") < 200, ( + f"未截長:prompt 含 {prompt_body.count('A')} 個 A(應 <200)" + ) + + def test_get_alert_name_returns_string(self): + """_get_alert_name 取 signals[0].alert_name 屬性""" + from src.services.decision_fusion import DecisionFusionEngine + + engine = DecisionFusionEngine() + signal = MagicMock() + signal.alert_name = "HighCPU" + incident = MagicMock() + incident.signals = [signal] + result = engine._get_alert_name(incident) + assert result == "HighCPU" + + def test_get_alert_name_handles_none(self): + """_get_alert_name(None) → 'unknown'(不 raise)""" + from src.services.decision_fusion import DecisionFusionEngine + + engine = DecisionFusionEngine() + assert engine._get_alert_name(None) == "unknown" + + +# ============================================================================= +# debugger B14 — Gemini quota Redis 異常時 fail-closed +# ============================================================================= + + +class TestB14QuotaFailClosed: + """Redis 異常時 fail-closed 防 Gemini 失控呼叫(費用鐵律)""" + + @pytest.mark.asyncio + async def test_redis_exception_returns_false(self): + """Redis pipeline 拋 ConnectionError → _check_gemini_quota return False""" + from src.services.ollama_failover_manager import OllamaFailoverManager + + manager = OllamaFailoverManager(health_monitor=MagicMock()) + manager._settings = MagicMock(GEMINI_DAILY_QUOTA=1000) + + # mock get_redis 返回會 raise 的 client + bad_redis = MagicMock() + bad_redis.pipeline = MagicMock(side_effect=ConnectionError("Redis down")) + + with patch("src.core.redis_client.get_redis", return_value=bad_redis), \ + patch( + "src.services.failover_alerter.get_failover_alerter", + return_value=MagicMock( + alert_gemini_quota_exceeded=AsyncMock(), + ), + ): + result = await manager._check_gemini_quota() + + # 鐵律:Redis 異常時 fail-closed(拒走 Gemini) + assert result is False, "Redis 異常時必須 fail-closed,避免 Gemini 失控呼叫" + + @pytest.mark.asyncio + async def test_redis_exception_triggers_alert(self): + """Redis 異常時 best-effort 呼叫 alert_gemini_quota_exceeded""" + from src.services.ollama_failover_manager import OllamaFailoverManager + + manager = OllamaFailoverManager(health_monitor=MagicMock()) + manager._settings = MagicMock(GEMINI_DAILY_QUOTA=1000) + + bad_redis = MagicMock() + bad_redis.pipeline = MagicMock(side_effect=ConnectionError("Redis down")) + + mock_alerter = MagicMock(alert_gemini_quota_exceeded=AsyncMock()) + + with patch("src.core.redis_client.get_redis", return_value=bad_redis), \ + patch( + "src.services.failover_alerter.get_failover_alerter", + return_value=mock_alerter, + ): + await manager._check_gemini_quota() + + # alert 應被呼叫(best-effort) + mock_alerter.alert_gemini_quota_exceeded.assert_awaited_once() + call_kwargs = mock_alerter.alert_gemini_quota_exceeded.await_args[0][0] + assert call_kwargs.get("reason") == "fail_closed_due_to_redis_error" + + +# ============================================================================= +# debugger B25/B26 — auto_repair drain_pending_tasks (lifespan SIGTERM) +# ============================================================================= + + +class TestB25B26DrainPendingTasks: + """K8s rolling restart 時 fire-and-forget tasks 不丟失""" + + @pytest.mark.asyncio + async def test_drain_returns_zero_when_no_tasks(self): + """無 pending task 時 drain 立即返回""" + from src.services.auto_repair_service import AutoRepairService + + svc = AutoRepairService() + # _pending_tasks 是 instance attribute set + assert isinstance(svc._pending_tasks, set) + result = await svc.drain_pending_tasks(timeout=1.0) + assert result.get("pending") == 0 or result.get("drained") == 0 + + @pytest.mark.asyncio + async def test_drain_waits_for_pending_tasks(self): + """有 pending task 時 drain 會等到完成或 timeout""" + import asyncio + + from src.services.auto_repair_service import AutoRepairService + + svc = AutoRepairService() + + # 注入 short-lived task + completed = [] + + async def _short_task(): + await asyncio.sleep(0.05) + completed.append("done") + + task = asyncio.create_task(_short_task()) + svc._pending_tasks.add(task) + task.add_done_callback(svc._pending_tasks.discard) + + result = await svc.drain_pending_tasks(timeout=2.0) + + # task 應已完成 + assert "done" in completed + # drain 結果含 pending 計數(修法形狀) + assert isinstance(result, dict) + + +# ============================================================================= +# debugger B8 — GovernanceAgent ≥3 失敗時觸發 alert_governance +# ============================================================================= + + +class TestB8GovernanceFailureAlert: + """4 自檢中 ≥3 項失敗 → 觸發 governance_self_failure 告警""" + + @pytest.mark.asyncio + async def test_three_failures_triggers_alert(self): + """3 項失敗應觸發 _alert""" + from src.services.governance_agent import GovernanceAgent + + agent = GovernanceAgent() + # mock 四項 check:3 個 raise,1 個成功 + agent.check_trust_drift = AsyncMock(side_effect=RuntimeError("DB down")) + agent.check_knowledge_degradation = AsyncMock(side_effect=RuntimeError("KM error")) + agent.check_llm_hallucination = AsyncMock(side_effect=RuntimeError("LLM error")) + agent.check_execution_blast_radius = AsyncMock(return_value={"status": "ok"}) + + agent._alert = AsyncMock() + + await agent.run_self_check() + + # _alert 應被呼叫(≥3 失敗) + agent._alert.assert_awaited_once() + call_args = agent._alert.await_args + assert call_args[0][0] == "governance_self_failure" + payload = call_args[0][1] + assert payload["total_checks"] == 4 + assert len(payload["failed_checks"]) >= 3 + + @pytest.mark.asyncio + async def test_two_failures_no_alert(self): + """僅 2 項失敗不觸發 alert(治理機制仍部分可用)""" + from src.services.governance_agent import GovernanceAgent + + agent = GovernanceAgent() + agent.check_trust_drift = AsyncMock(side_effect=RuntimeError("err1")) + agent.check_knowledge_degradation = AsyncMock(side_effect=RuntimeError("err2")) + agent.check_llm_hallucination = AsyncMock(return_value={"status": "ok"}) + agent.check_execution_blast_radius = AsyncMock(return_value={"status": "ok"}) + + agent._alert = AsyncMock() + + await agent.run_self_check() + + # 僅 2 失敗不觸發 + agent._alert.assert_not_awaited() + + @pytest.mark.asyncio + async def test_all_four_failures_triggers_alert(self): + """4 項全失敗也應觸發 alert""" + from src.services.governance_agent import GovernanceAgent + + agent = GovernanceAgent() + agent.check_trust_drift = AsyncMock(side_effect=RuntimeError("e")) + agent.check_knowledge_degradation = AsyncMock(side_effect=RuntimeError("e")) + agent.check_llm_hallucination = AsyncMock(side_effect=RuntimeError("e")) + agent.check_execution_blast_radius = AsyncMock(side_effect=RuntimeError("e")) + + agent._alert = AsyncMock() + + await agent.run_self_check() + + agent._alert.assert_awaited_once() + payload = agent._alert.await_args[0][1] + assert len(payload["failed_checks"]) == 4