承接 Wave 6/7/8 多 engineer 在 agent 限額前完成的代碼,補 commit 解 production HEAD 隱性 import error(decision_fusion 已被 decision_manager 引用但檔案 untracked)。 新增(後端核心): - decision_fusion.py (562 行) — P2.1 方法 III(OpenClaw + Hermes + Elephant 三 LLM 融合) - aiops_timeline.py + aiops_timeline_service.py — critic B4 修復 /api/v1/aiops/timeline endpoint,DB 存取抽到 service 層遵守 leWOOOgo 積木化 - migrations/p2_decision_fusion_columns.sql + rollback — approval_records fusion 欄位 修改(後端整合): - decision_manager.py — fusion 三斷鏈修補(critic B1+B2+B3): · B1: 寫 _evidence_snapshot_ref 到 token.proposal_data · B2: fusion 前計算 complexity_score 並寫 token · B3: fusion composite 寫 token.proposal_data["decision_fusion"] - auto_approve.py — fusion + consensus 認識(critic B3+B5): · composite > 0.7 → auto_execute_eligible bypass min_confidence · source=consensus_engine + score>=0.6 → 規則可信路徑 - consensus_engine.py — db-fix _save_consensus 重用 agent_sessions - governance_agent.py — db-fix _alert PG 寫入 ai_governance_events - approval_db.py — fusion 3 欄位 + 2 partial index + CheckConstraint - db/models.py — schema 對齊 migration - core/config.py — vuln #1 修復:OLLAMA_URL/_FALLBACK_URL field_validator 拒絕公網 IP + 外部域名,僅允許私網/loopback/K8s SVC 白名單 - core/feature_flags.py — P2 fusion + consensus flags - main.py — governance_agent lifespan 啟動 - failover_alerter.py — Wave8-X2: in-memory dedup fallback(Redis 拒絕後不 fail-open) - ollama_*.py — metrics 整合 + recovery 改善 - auto_repair_service.py — verifier 接線 新增(測試 2438 行): - test_decision_fusion.py / test_governance_agent.py / test_consensus_integration.py - test_p2_db_fixes.py / test_wave8_fusion_fixes.py - test_config_url_validation.py(vuln #1 12 tests) - test_failover_alerter.py +Wave8-X2 in-memory dedup 補測 驗收: 116 tests pass (decision_fusion + wave8_fusion + config_url + consensus + governance + p2_db_fixes + failover_alerter) Conflict resolution: - 3 檔(config.py + auto_approve.py + decision_manager.py)git stash pop 衝突 保留 stashed (engineer 最終版),補回 ValueError 「公網 IP」字樣對齊 test Note: 此 commit 解 production HEAD 隱性 import error 仍未修: vuln #4 prompt injection / debugger B14 quota fail-closed / B25-B26 drain_pending_tasks / B8 governance fail alert Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-Authored-By: Multiple Engineers (Wave 6/7/8) <noreply@anthropic.com>
273 lines
9.7 KiB
Python
273 lines
9.7 KiB
Python
"""
|
||
test_consensus_integration.py — P2.4 12-Agent Consensus 整合測試
|
||
================================================================
|
||
測試覆蓋:
|
||
1. ENABLE_12AGENT_CONSENSUS=False → 不走 Consensus(走原有雙軌路徑)
|
||
2. ENABLE_12AGENT_CONSENSUS=True + P0 → Consensus 被呼叫
|
||
3. ENABLE_12AGENT_CONSENSUS=True + P2(非 P0/P1)→ 不走 Consensus
|
||
4. Consensus 共識分數 ≥0.6 → 回傳 READY token,risk_level="medium"
|
||
5. Consensus 拋例外 → fallback 到 expert_analyze,不阻斷主路由
|
||
|
||
測試類型: unit(mock ConsensusEngine,lazy import 透過 patch consensus_engine 模組)
|
||
|
||
2026-04-26 P2.4 by Claude — 12-Agent Consensus 整合測試
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime, timezone
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from src.models.incident import Incident, Severity, Signal
|
||
from src.services.consensus_engine import ConsensusResult
|
||
|
||
|
||
# =============================================================================
|
||
# Fixtures
|
||
# =============================================================================
|
||
|
||
|
||
def _make_signal(alert_name: str = "HighCPUUsage") -> Signal:
|
||
return Signal(
|
||
alert_name=alert_name,
|
||
severity=Severity.P0,
|
||
source="prometheus",
|
||
fired_at=datetime.now(timezone.utc),
|
||
)
|
||
|
||
|
||
def _make_incident(severity: Severity = Severity.P0) -> Incident:
|
||
return Incident(
|
||
severity=severity,
|
||
signals=[_make_signal()],
|
||
affected_services=["api"],
|
||
)
|
||
|
||
|
||
def _make_consensus_result(consensus_score: float = 0.75) -> ConsensusResult:
|
||
from src.services.consensus_engine import AgentOpinion, AgentType
|
||
|
||
opinion = AgentOpinion(
|
||
agent_type=AgentType.SRE,
|
||
action="重新啟動服務",
|
||
reasoning="SRE 分析: 需要重啟",
|
||
confidence=0.8,
|
||
risk_assessment="medium",
|
||
kubectl_command="kubectl rollout restart deployment/api",
|
||
priority=8,
|
||
)
|
||
return ConsensusResult(
|
||
consensus_id="CON-TEST-001",
|
||
incident_id="INC-TEST-001",
|
||
opinions=[opinion],
|
||
consensus_score=consensus_score,
|
||
recommended_action="重新啟動服務",
|
||
recommended_kubectl="kubectl rollout restart deployment/api",
|
||
final_reasoning="整合意見: 重啟",
|
||
risk_level="medium" if consensus_score >= 0.6 else "critical",
|
||
dissenting_opinions=[],
|
||
)
|
||
|
||
|
||
def _make_dm_with_mocks():
|
||
"""建立帶有必要 mock 的 DecisionManager(不觸發 __init__)"""
|
||
from src.services.decision_manager import DecisionManager
|
||
|
||
dm = DecisionManager.__new__(DecisionManager)
|
||
dm._save_token = AsyncMock()
|
||
dm._find_existing_token = AsyncMock(return_value=None)
|
||
return dm
|
||
|
||
|
||
# =============================================================================
|
||
# Test 1: ENABLE_12AGENT_CONSENSUS=False → 不走 Consensus
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_consensus_disabled_skips_consensus_engine():
|
||
"""flag=False 時,get_or_create_decision_with_consensus 應直接走 get_or_create_decision"""
|
||
incident = _make_incident(Severity.P0)
|
||
|
||
with patch("src.services.decision_manager.settings") as mock_settings:
|
||
mock_settings.ENABLE_12AGENT_CONSENSUS = False
|
||
|
||
dm = _make_dm_with_mocks()
|
||
mock_token = MagicMock()
|
||
dm.get_or_create_decision = AsyncMock(return_value=mock_token)
|
||
|
||
result = await dm.get_or_create_decision_with_consensus(
|
||
incident=incident, timeout_sec=30.0
|
||
)
|
||
|
||
dm.get_or_create_decision.assert_awaited_once_with(incident, 30.0)
|
||
assert result is mock_token
|
||
|
||
|
||
# =============================================================================
|
||
# Test 2: ENABLE_12AGENT_CONSENSUS=True + P0 → Consensus 被呼叫
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_consensus_enabled_p0_calls_consensus_engine():
|
||
"""flag=True + P0 事件 → ConsensusEngine.run_consensus 必須被呼叫"""
|
||
from src.services.decision_manager import DecisionState
|
||
|
||
incident = _make_incident(Severity.P0)
|
||
consensus_result = _make_consensus_result(consensus_score=0.75)
|
||
|
||
mock_consensus_engine = AsyncMock()
|
||
mock_consensus_engine.run_consensus = AsyncMock(return_value=consensus_result)
|
||
|
||
with (
|
||
patch("src.services.decision_manager.settings") as mock_settings,
|
||
# decision_manager 用 lazy import:from src.services.consensus_engine import get_consensus_engine
|
||
# 所以要 patch consensus_engine 模組本身的 singleton
|
||
patch(
|
||
"src.services.consensus_engine._consensus_engine",
|
||
new=mock_consensus_engine,
|
||
),
|
||
patch("src.services.consensus_engine.get_redis") as mock_redis_factory,
|
||
):
|
||
mock_settings.ENABLE_12AGENT_CONSENSUS = True
|
||
|
||
mock_redis = AsyncMock()
|
||
mock_redis.get = AsyncMock(return_value=None)
|
||
mock_redis.set = AsyncMock(return_value=True)
|
||
mock_redis_factory.return_value = mock_redis
|
||
|
||
dm = _make_dm_with_mocks()
|
||
|
||
token = await dm.get_or_create_decision_with_consensus(
|
||
incident=incident, timeout_sec=30.0
|
||
)
|
||
|
||
mock_consensus_engine.run_consensus.assert_awaited_once()
|
||
assert token.state == DecisionState.READY
|
||
assert token.proposal_data["source"] == "consensus_engine"
|
||
assert token.proposal_data["consensus_score"] == 0.75
|
||
|
||
|
||
# =============================================================================
|
||
# Test 3: ENABLE_12AGENT_CONSENSUS=True + P2 → 不走 Consensus
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_consensus_enabled_p2_skips_consensus():
|
||
"""flag=True 但 P2 事件(非 P0/P1)→ 走 get_or_create_decision,不呼叫 Consensus"""
|
||
incident = _make_incident(Severity.P2)
|
||
|
||
mock_consensus_engine = AsyncMock()
|
||
mock_consensus_engine.run_consensus = AsyncMock()
|
||
|
||
with (
|
||
patch("src.services.decision_manager.settings") as mock_settings,
|
||
patch(
|
||
"src.services.consensus_engine._consensus_engine",
|
||
new=mock_consensus_engine,
|
||
),
|
||
):
|
||
mock_settings.ENABLE_12AGENT_CONSENSUS = True
|
||
|
||
dm = _make_dm_with_mocks()
|
||
mock_token = MagicMock()
|
||
dm.get_or_create_decision = AsyncMock(return_value=mock_token)
|
||
|
||
result = await dm.get_or_create_decision_with_consensus(
|
||
incident=incident, timeout_sec=30.0
|
||
)
|
||
|
||
dm.get_or_create_decision.assert_awaited_once()
|
||
mock_consensus_engine.run_consensus.assert_not_awaited()
|
||
assert result is mock_token
|
||
|
||
|
||
# =============================================================================
|
||
# Test 4: Consensus 共識分數 ≥0.6 → token.state == READY,risk_level="medium"
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_high_consensus_score_results_in_ready_token():
|
||
"""共識分數 0.8(≥0.6 閾值)→ token 狀態應為 READY,risk_level="medium" """
|
||
from src.services.decision_manager import DecisionState
|
||
|
||
incident = _make_incident(Severity.P1)
|
||
consensus_result = _make_consensus_result(consensus_score=0.80)
|
||
|
||
mock_consensus_engine = AsyncMock()
|
||
mock_consensus_engine.run_consensus = AsyncMock(return_value=consensus_result)
|
||
|
||
with (
|
||
patch("src.services.decision_manager.settings") as mock_settings,
|
||
patch(
|
||
"src.services.consensus_engine._consensus_engine",
|
||
new=mock_consensus_engine,
|
||
),
|
||
patch("src.services.consensus_engine.get_redis") as mock_redis_factory,
|
||
):
|
||
mock_settings.ENABLE_12AGENT_CONSENSUS = True
|
||
|
||
mock_redis = AsyncMock()
|
||
mock_redis.get = AsyncMock(return_value=None)
|
||
mock_redis.set = AsyncMock(return_value=True)
|
||
mock_redis_factory.return_value = mock_redis
|
||
|
||
dm = _make_dm_with_mocks()
|
||
|
||
token = await dm.get_or_create_decision_with_consensus(
|
||
incident=incident, timeout_sec=30.0
|
||
)
|
||
|
||
assert token.state == DecisionState.READY
|
||
assert token.proposal_data["consensus_score"] == 0.80
|
||
assert token.proposal_data["risk_level"] == "medium"
|
||
|
||
|
||
# =============================================================================
|
||
# Test 5: Consensus 拋例外 → fallback 到 expert_analyze,不阻斷主路由
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_consensus_exception_falls_back_to_expert():
|
||
"""ConsensusEngine.run_consensus 拋出例外 → fallback 到 expert_analyze,token 仍為 READY"""
|
||
from src.services.decision_manager import DecisionState
|
||
|
||
incident = _make_incident(Severity.P0)
|
||
|
||
mock_consensus_engine = AsyncMock()
|
||
mock_consensus_engine.run_consensus = AsyncMock(
|
||
side_effect=RuntimeError("Ollama 111 offline")
|
||
)
|
||
|
||
with (
|
||
patch("src.services.decision_manager.settings") as mock_settings,
|
||
patch(
|
||
"src.services.consensus_engine._consensus_engine",
|
||
new=mock_consensus_engine,
|
||
),
|
||
patch("src.services.decision_manager.expert_analyze") as mock_expert,
|
||
):
|
||
mock_settings.ENABLE_12AGENT_CONSENSUS = True
|
||
mock_expert.return_value = {
|
||
"source": "expert_system",
|
||
"action": "fallback_action",
|
||
"confidence": 0.5,
|
||
}
|
||
|
||
dm = _make_dm_with_mocks()
|
||
|
||
token = await dm.get_or_create_decision_with_consensus(
|
||
incident=incident, timeout_sec=30.0
|
||
)
|
||
|
||
# Consensus 失敗後應 fallback 到 expert,token 仍為 READY(不阻斷)
|
||
assert token.state == DecisionState.READY
|
||
assert token.error is not None # 錯誤訊息被記錄
|
||
mock_expert.assert_called_once_with(incident)
|