Files
awoooi/apps/api/tests/test_consensus_integration.py
Your Name cc547736ab feat(wave6-8): P2.1 fusion + P2.2 governance + P2.4 consensus + Wave 7/8 BLOCKER 修復
承接 Wave 6/7/8 多 engineer 在 agent 限額前完成的代碼,補 commit 解 production
HEAD 隱性 import error(decision_fusion 已被 decision_manager 引用但檔案 untracked)。

新增(後端核心):
- decision_fusion.py (562 行) — P2.1 方法 III(OpenClaw + Hermes + Elephant 三 LLM 融合)
- aiops_timeline.py + aiops_timeline_service.py — critic B4 修復
  /api/v1/aiops/timeline endpoint,DB 存取抽到 service 層遵守 leWOOOgo 積木化
- migrations/p2_decision_fusion_columns.sql + rollback — approval_records fusion 欄位

修改(後端整合):
- decision_manager.py — fusion 三斷鏈修補(critic B1+B2+B3):
  · B1: 寫 _evidence_snapshot_ref 到 token.proposal_data
  · B2: fusion 前計算 complexity_score 並寫 token
  · B3: fusion composite 寫 token.proposal_data["decision_fusion"]
- auto_approve.py — fusion + consensus 認識(critic B3+B5):
  · composite > 0.7 → auto_execute_eligible bypass min_confidence
  · source=consensus_engine + score>=0.6 → 規則可信路徑
- consensus_engine.py — db-fix _save_consensus 重用 agent_sessions
- governance_agent.py — db-fix _alert PG 寫入 ai_governance_events
- approval_db.py — fusion 3 欄位 + 2 partial index + CheckConstraint
- db/models.py — schema 對齊 migration
- core/config.py — vuln #1 修復:OLLAMA_URL/_FALLBACK_URL field_validator
  拒絕公網 IP + 外部域名,僅允許私網/loopback/K8s SVC 白名單
- core/feature_flags.py — P2 fusion + consensus flags
- main.py — governance_agent lifespan 啟動
- failover_alerter.py — Wave8-X2: in-memory dedup fallback(Redis 拒絕後不 fail-open)
- ollama_*.py — metrics 整合 + recovery 改善
- auto_repair_service.py — verifier 接線

新增(測試 2438 行):
- test_decision_fusion.py / test_governance_agent.py / test_consensus_integration.py
- test_p2_db_fixes.py / test_wave8_fusion_fixes.py
- test_config_url_validation.py(vuln #1 12 tests)
- test_failover_alerter.py +Wave8-X2 in-memory dedup 補測

驗收: 116 tests pass (decision_fusion + wave8_fusion + config_url + consensus +
                      governance + p2_db_fixes + failover_alerter)

Conflict resolution:
- 3 檔(config.py + auto_approve.py + decision_manager.py)git stash pop 衝突
  保留 stashed (engineer 最終版),補回 ValueError 「公網 IP」字樣對齊 test

Note: 此 commit 解 production HEAD 隱性 import error
仍未修: vuln #4 prompt injection / debugger B14 quota fail-closed
       / B25-B26 drain_pending_tasks / B8 governance fail alert

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Multiple Engineers (Wave 6/7/8) <noreply@anthropic.com>
2026-04-27 08:11:40 +08:00

273 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
test_consensus_integration.py — P2.4 12-Agent Consensus 整合測試
================================================================
測試覆蓋:
1. ENABLE_12AGENT_CONSENSUS=False → 不走 Consensus走原有雙軌路徑
2. ENABLE_12AGENT_CONSENSUS=True + P0 → Consensus 被呼叫
3. ENABLE_12AGENT_CONSENSUS=True + P2非 P0/P1→ 不走 Consensus
4. Consensus 共識分數 ≥0.6 → 回傳 READY tokenrisk_level="medium"
5. Consensus 拋例外 → fallback 到 expert_analyze不阻斷主路由
測試類型: unitmock ConsensusEnginelazy import 透過 patch consensus_engine 模組)
2026-04-26 P2.4 by Claude — 12-Agent Consensus 整合測試
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.models.incident import Incident, Severity, Signal
from src.services.consensus_engine import ConsensusResult
# =============================================================================
# Fixtures
# =============================================================================
def _make_signal(alert_name: str = "HighCPUUsage") -> Signal:
return Signal(
alert_name=alert_name,
severity=Severity.P0,
source="prometheus",
fired_at=datetime.now(timezone.utc),
)
def _make_incident(severity: Severity = Severity.P0) -> Incident:
return Incident(
severity=severity,
signals=[_make_signal()],
affected_services=["api"],
)
def _make_consensus_result(consensus_score: float = 0.75) -> ConsensusResult:
from src.services.consensus_engine import AgentOpinion, AgentType
opinion = AgentOpinion(
agent_type=AgentType.SRE,
action="重新啟動服務",
reasoning="SRE 分析: 需要重啟",
confidence=0.8,
risk_assessment="medium",
kubectl_command="kubectl rollout restart deployment/api",
priority=8,
)
return ConsensusResult(
consensus_id="CON-TEST-001",
incident_id="INC-TEST-001",
opinions=[opinion],
consensus_score=consensus_score,
recommended_action="重新啟動服務",
recommended_kubectl="kubectl rollout restart deployment/api",
final_reasoning="整合意見: 重啟",
risk_level="medium" if consensus_score >= 0.6 else "critical",
dissenting_opinions=[],
)
def _make_dm_with_mocks():
"""建立帶有必要 mock 的 DecisionManager不觸發 __init__"""
from src.services.decision_manager import DecisionManager
dm = DecisionManager.__new__(DecisionManager)
dm._save_token = AsyncMock()
dm._find_existing_token = AsyncMock(return_value=None)
return dm
# =============================================================================
# Test 1: ENABLE_12AGENT_CONSENSUS=False → 不走 Consensus
# =============================================================================
@pytest.mark.asyncio
async def test_consensus_disabled_skips_consensus_engine():
"""flag=False 時get_or_create_decision_with_consensus 應直接走 get_or_create_decision"""
incident = _make_incident(Severity.P0)
with patch("src.services.decision_manager.settings") as mock_settings:
mock_settings.ENABLE_12AGENT_CONSENSUS = False
dm = _make_dm_with_mocks()
mock_token = MagicMock()
dm.get_or_create_decision = AsyncMock(return_value=mock_token)
result = await dm.get_or_create_decision_with_consensus(
incident=incident, timeout_sec=30.0
)
dm.get_or_create_decision.assert_awaited_once_with(incident, 30.0)
assert result is mock_token
# =============================================================================
# Test 2: ENABLE_12AGENT_CONSENSUS=True + P0 → Consensus 被呼叫
# =============================================================================
@pytest.mark.asyncio
async def test_consensus_enabled_p0_calls_consensus_engine():
"""flag=True + P0 事件 → ConsensusEngine.run_consensus 必須被呼叫"""
from src.services.decision_manager import DecisionState
incident = _make_incident(Severity.P0)
consensus_result = _make_consensus_result(consensus_score=0.75)
mock_consensus_engine = AsyncMock()
mock_consensus_engine.run_consensus = AsyncMock(return_value=consensus_result)
with (
patch("src.services.decision_manager.settings") as mock_settings,
# decision_manager 用 lazy importfrom src.services.consensus_engine import get_consensus_engine
# 所以要 patch consensus_engine 模組本身的 singleton
patch(
"src.services.consensus_engine._consensus_engine",
new=mock_consensus_engine,
),
patch("src.services.consensus_engine.get_redis") as mock_redis_factory,
):
mock_settings.ENABLE_12AGENT_CONSENSUS = True
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=None)
mock_redis.set = AsyncMock(return_value=True)
mock_redis_factory.return_value = mock_redis
dm = _make_dm_with_mocks()
token = await dm.get_or_create_decision_with_consensus(
incident=incident, timeout_sec=30.0
)
mock_consensus_engine.run_consensus.assert_awaited_once()
assert token.state == DecisionState.READY
assert token.proposal_data["source"] == "consensus_engine"
assert token.proposal_data["consensus_score"] == 0.75
# =============================================================================
# Test 3: ENABLE_12AGENT_CONSENSUS=True + P2 → 不走 Consensus
# =============================================================================
@pytest.mark.asyncio
async def test_consensus_enabled_p2_skips_consensus():
"""flag=True 但 P2 事件(非 P0/P1→ 走 get_or_create_decision不呼叫 Consensus"""
incident = _make_incident(Severity.P2)
mock_consensus_engine = AsyncMock()
mock_consensus_engine.run_consensus = AsyncMock()
with (
patch("src.services.decision_manager.settings") as mock_settings,
patch(
"src.services.consensus_engine._consensus_engine",
new=mock_consensus_engine,
),
):
mock_settings.ENABLE_12AGENT_CONSENSUS = True
dm = _make_dm_with_mocks()
mock_token = MagicMock()
dm.get_or_create_decision = AsyncMock(return_value=mock_token)
result = await dm.get_or_create_decision_with_consensus(
incident=incident, timeout_sec=30.0
)
dm.get_or_create_decision.assert_awaited_once()
mock_consensus_engine.run_consensus.assert_not_awaited()
assert result is mock_token
# =============================================================================
# Test 4: Consensus 共識分數 ≥0.6 → token.state == READYrisk_level="medium"
# =============================================================================
@pytest.mark.asyncio
async def test_high_consensus_score_results_in_ready_token():
"""共識分數 0.8≥0.6 閾值)→ token 狀態應為 READYrisk_level="medium" """
from src.services.decision_manager import DecisionState
incident = _make_incident(Severity.P1)
consensus_result = _make_consensus_result(consensus_score=0.80)
mock_consensus_engine = AsyncMock()
mock_consensus_engine.run_consensus = AsyncMock(return_value=consensus_result)
with (
patch("src.services.decision_manager.settings") as mock_settings,
patch(
"src.services.consensus_engine._consensus_engine",
new=mock_consensus_engine,
),
patch("src.services.consensus_engine.get_redis") as mock_redis_factory,
):
mock_settings.ENABLE_12AGENT_CONSENSUS = True
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=None)
mock_redis.set = AsyncMock(return_value=True)
mock_redis_factory.return_value = mock_redis
dm = _make_dm_with_mocks()
token = await dm.get_or_create_decision_with_consensus(
incident=incident, timeout_sec=30.0
)
assert token.state == DecisionState.READY
assert token.proposal_data["consensus_score"] == 0.80
assert token.proposal_data["risk_level"] == "medium"
# =============================================================================
# Test 5: Consensus 拋例外 → fallback 到 expert_analyze不阻斷主路由
# =============================================================================
@pytest.mark.asyncio
async def test_consensus_exception_falls_back_to_expert():
"""ConsensusEngine.run_consensus 拋出例外 → fallback 到 expert_analyzetoken 仍為 READY"""
from src.services.decision_manager import DecisionState
incident = _make_incident(Severity.P0)
mock_consensus_engine = AsyncMock()
mock_consensus_engine.run_consensus = AsyncMock(
side_effect=RuntimeError("Ollama 111 offline")
)
with (
patch("src.services.decision_manager.settings") as mock_settings,
patch(
"src.services.consensus_engine._consensus_engine",
new=mock_consensus_engine,
),
patch("src.services.decision_manager.expert_analyze") as mock_expert,
):
mock_settings.ENABLE_12AGENT_CONSENSUS = True
mock_expert.return_value = {
"source": "expert_system",
"action": "fallback_action",
"confidence": 0.5,
}
dm = _make_dm_with_mocks()
token = await dm.get_or_create_decision_with_consensus(
incident=incident, timeout_sec=30.0
)
# Consensus 失敗後應 fallback 到 experttoken 仍為 READY不阻斷
assert token.state == DecisionState.READY
assert token.error is not None # 錯誤訊息被記錄
mock_expert.assert_called_once_with(incident)