497 lines
20 KiB
Python
497 lines
20 KiB
Python
"""
|
||
Agent Step Timeout 拆分 + Metric 測試
|
||
======================================
|
||
# 2026-04-27 Claude Sonnet 4.6: A1 — 三段 timeout 拆分 + step metric (北極星 §1.2 Observable by Default)
|
||
|
||
測試範圍:
|
||
1. 三個 Agent 的 timeout default 值正確(Diagnostician=30 / Solver=20 / Critic=15)
|
||
2. env override 生效(monkeypatch 模擬不同環境配置)
|
||
3. Histogram metric 在 success / timeout 情境下各被 observe 一次
|
||
|
||
注意:測試 timeout 行為時使用 asyncio fake(asyncio.sleep mock),
|
||
符合 feedback_no_mock_testing:這是測試時序行為,不是測試 LLM 推理。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import importlib
|
||
import sys
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
from prometheus_client import CollectorRegistry, Histogram
|
||
|
||
|
||
# =============================================================================
# Section 1: Timeout default values
# =============================================================================
|
||
|
||
class TestTimeoutDefaults:
    """The per-agent timeouts must default to 30/20/15 s when no env override is set."""

    @staticmethod
    def _fresh_import(monkeypatch, module_name: str):
        """Execute ``module_name`` from scratch and return the new module object.

        The timeout constants are read from the environment at import time, so
        the module has to be executed again after the environment was changed.

        The previous ``sys.modules`` entry and the previous attribute on the
        parent package are recorded through ``monkeypatch``; both are put back
        at test teardown, so a module built under this test's environment does
        not stay visible to later tests.

        Args:
            monkeypatch: the pytest ``monkeypatch`` fixture of the calling test.
            module_name: dotted name of the module to re-execute.

        Returns:
            The freshly executed module.
        """
        # setitem() is used only to record the current entry (or its absence)
        # for teardown; the entry itself is dropped right away so that the
        # import below really re-executes the module.
        monkeypatch.setitem(sys.modules, module_name, sys.modules.get(module_name))
        del sys.modules[module_name]

        package_name, _, attr_name = module_name.rpartition(".")
        if package_name:
            package = importlib.import_module(package_name)
            # The import system rebinds this attribute to the new module;
            # recording it here lets teardown restore the previous binding.
            monkeypatch.setattr(
                package, attr_name, getattr(package, attr_name, None), raising=False
            )

        return importlib.import_module(module_name)

    def test_diagnostician_default_timeout_is_30(self, monkeypatch):
        """Diagnostician defaults to 30.0 s (main NIM consumer, gets the largest budget)."""
        # Make sure no leftover value in the environment shadows the default.
        monkeypatch.delenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.diagnostician_agent")

        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 30.0, (
            f"Diagnostician default timeout 期望 30.0,實際 {mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC}"
        )

    def test_solver_default_timeout_is_20(self, monkeypatch):
        """Solver defaults to 20.0 s (medium-sized prompt)."""
        monkeypatch.delenv("AGENT_SOLVER_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.solver_agent")

        assert mod.AGENT_SOLVER_TIMEOUT_SEC == 20.0, (
            f"Solver default timeout 期望 20.0,實際 {mod.AGENT_SOLVER_TIMEOUT_SEC}"
        )

    def test_critic_default_timeout_is_15(self, monkeypatch):
        """Critic defaults to 15.0 s (shortest output; budget is kept for the other two)."""
        monkeypatch.delenv("AGENT_CRITIC_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.critic_agent")

        assert mod.AGENT_CRITIC_TIMEOUT_SEC == 15.0, (
            f"Critic default timeout 期望 15.0,實際 {mod.AGENT_CRITIC_TIMEOUT_SEC}"
        )

    def test_agent_debate_global_timeout_default_is_420(self, monkeypatch):
        """Agent debate global timeout defaults to the direct GCP qwen3 budget."""
        monkeypatch.delenv("AGENT_DEBATE_GLOBAL_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.services.agent_orchestrator")

        assert mod.GLOBAL_TIMEOUT_SEC == 420.0

    def test_deprecated_alias_matches_new_constant_diagnostician(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC must equal AGENT_DIAGNOSTICIAN_TIMEOUT_SEC (compatibility)."""
        monkeypatch.delenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.diagnostician_agent")

        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC

    def test_deprecated_alias_matches_new_constant_solver(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC must equal AGENT_SOLVER_TIMEOUT_SEC (compatibility)."""
        monkeypatch.delenv("AGENT_SOLVER_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.solver_agent")

        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_SOLVER_TIMEOUT_SEC

    def test_deprecated_alias_matches_new_constant_critic(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC must equal AGENT_CRITIC_TIMEOUT_SEC (compatibility)."""
        monkeypatch.delenv("AGENT_CRITIC_TIMEOUT_SEC", raising=False)

        mod = self._fresh_import(monkeypatch, "src.agents.critic_agent")

        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_CRITIC_TIMEOUT_SEC
|
||
|
||
|
||
# =============================================================================
# Section 2: Environment overrides take effect
# =============================================================================
|
||
|
||
class TestEnvOverride:
    """Environment variables override the defaults (simulates a K8s ConfigMap change)."""

    @staticmethod
    def _fresh_import(monkeypatch, module_name: str):
        """Execute ``module_name`` from scratch and return the new module object.

        The timeout constants are read from the environment at import time, so
        the module has to be executed again after the environment was changed.

        The previous ``sys.modules`` entry and the previous attribute on the
        parent package are recorded through ``monkeypatch``; both are put back
        at test teardown. Without that, a module carrying the overridden
        timeout would outlive the test even though the environment variable
        itself is restored.

        Args:
            monkeypatch: the pytest ``monkeypatch`` fixture of the calling test.
            module_name: dotted name of the module to re-execute.

        Returns:
            The freshly executed module.
        """
        # setitem() is used only to record the current entry (or its absence)
        # for teardown; the entry itself is dropped right away so that the
        # import below really re-executes the module.
        monkeypatch.setitem(sys.modules, module_name, sys.modules.get(module_name))
        del sys.modules[module_name]

        package_name, _, attr_name = module_name.rpartition(".")
        if package_name:
            package = importlib.import_module(package_name)
            # The import system rebinds this attribute to the new module;
            # recording it here lets teardown restore the previous binding.
            monkeypatch.setattr(
                package, attr_name, getattr(package, attr_name, None), raising=False
            )

        return importlib.import_module(module_name)

    def test_diagnostician_env_override(self, monkeypatch):
        """AGENT_DIAGNOSTICIAN_TIMEOUT_SEC=45.0 replaces the 30.0 default."""
        monkeypatch.setenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", "45.0")

        mod = self._fresh_import(monkeypatch, "src.agents.diagnostician_agent")

        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 45.0, (
            f"env override 期望 45.0,實際 {mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC}"
        )

    def test_solver_env_override(self, monkeypatch):
        """AGENT_SOLVER_TIMEOUT_SEC=25.0 replaces the 20.0 default."""
        monkeypatch.setenv("AGENT_SOLVER_TIMEOUT_SEC", "25.0")

        mod = self._fresh_import(monkeypatch, "src.agents.solver_agent")

        assert mod.AGENT_SOLVER_TIMEOUT_SEC == 25.0

    def test_critic_env_override(self, monkeypatch):
        """AGENT_CRITIC_TIMEOUT_SEC=10.0 replaces the 15.0 default."""
        monkeypatch.setenv("AGENT_CRITIC_TIMEOUT_SEC", "10.0")

        mod = self._fresh_import(monkeypatch, "src.agents.critic_agent")

        assert mod.AGENT_CRITIC_TIMEOUT_SEC == 10.0

    def test_env_override_integer_string(self, monkeypatch):
        """An integer string (no decimal point) must still end up as a float."""
        monkeypatch.setenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", "60")

        mod = self._fresh_import(monkeypatch, "src.agents.diagnostician_agent")

        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 60.0
        assert isinstance(mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC, float)

    def test_env_override_updates_deprecated_alias(self, monkeypatch):
        """The PHASE2_STEP_TIMEOUT_SEC compatibility alias follows the override."""
        monkeypatch.setenv("AGENT_CRITIC_TIMEOUT_SEC", "8.0")

        mod = self._fresh_import(monkeypatch, "src.agents.critic_agent")

        assert mod.PHASE2_STEP_TIMEOUT_SEC == 8.0
        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_CRITIC_TIMEOUT_SEC

    def test_agent_debate_global_timeout_env_override(self, monkeypatch):
        """AGENT_DEBATE_GLOBAL_TIMEOUT_SEC=300 replaces the 420.0 default."""
        monkeypatch.setenv("AGENT_DEBATE_GLOBAL_TIMEOUT_SEC", "300")

        mod = self._fresh_import(monkeypatch, "src.services.agent_orchestrator")

        assert mod.GLOBAL_TIMEOUT_SEC == 300.0
|
||
|
||
|
||
# =============================================================================
# Section 3: Histogram metric observe verification
# =============================================================================
|
||
|
||
class TestAgentStepMetrics:
    """
    Checks around the agent step duration histogram.

    The synchronous tests observe values on a throwaway Histogram that is bound
    to its own CollectorRegistry (so nothing touches the global registry) and
    read the resulting ``_count`` / ``_sum`` samples back.

    The async tests patch ``observe_agent_step`` inside an agent module, run
    one agent step, and assert on the arguments of the recorded call.
    """

    # Name of the throwaway histogram used by the registry-based tests.
    _TEST_METRIC = "aiops_agent_step_duration_seconds_test"

    def _make_isolated_histogram(self) -> tuple[Histogram, CollectorRegistry]:
        """Return a new Histogram together with the private registry it lives in."""
        private_registry = CollectorRegistry()
        return (
            Histogram(
                "aiops_agent_step_duration_seconds_test",
                "test histogram",
                ["agent", "outcome"],
                buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 20.0, 30.0, 45.0, 60.0],
                registry=private_registry,
            ),
            private_registry,
        )

    def _get_sample_value(
        self,
        registry: CollectorRegistry,
        metric_name: str,
        labels: dict,
        suffix: str = "_count",
    ) -> float:
        """Return the first sample ``metric_name + suffix`` whose labels equal ``labels``.

        Falls back to ``0.0`` when the registry holds no such sample.
        """
        wanted_sample = metric_name + suffix
        hits = (
            sample.value
            for family in registry.collect()
            if family.name == metric_name
            for sample in family.samples
            if sample.name == wanted_sample and sample.labels == labels
        )
        return next(hits, 0.0)

    def test_observe_agent_step_success(self):
        """One success observation yields _count == 1 and _sum == the observed value."""
        histogram, private_registry = self._make_isolated_histogram()
        labels = {"agent": "diagnostician", "outcome": "success"}

        # Observed on the local histogram; the global REGISTRY is not involved.
        histogram.labels(agent="diagnostician", outcome="success").observe(1.5)

        count = self._get_sample_value(
            private_registry, self._TEST_METRIC, labels, "_count"
        )
        total = self._get_sample_value(
            private_registry, self._TEST_METRIC, labels, "_sum"
        )

        assert count == 1.0, f"expect _count=1, got {count}"
        assert total == pytest.approx(1.5), f"expect _sum=1.5, got {total}"

    def test_observe_agent_step_timeout(self):
        """One timeout observation yields _count == 1."""
        histogram, private_registry = self._make_isolated_histogram()

        histogram.labels(agent="solver", outcome="timeout").observe(20.1)

        count = self._get_sample_value(
            private_registry,
            self._TEST_METRIC,
            {"agent": "solver", "outcome": "timeout"},
            "_count",
        )
        assert count == 1.0, f"expect _count=1 for timeout, got {count}"

    def test_observe_agent_step_error(self):
        """One error observation yields _count == 1."""
        histogram, private_registry = self._make_isolated_histogram()

        histogram.labels(agent="critic", outcome="error").observe(0.05)

        count = self._get_sample_value(
            private_registry,
            self._TEST_METRIC,
            {"agent": "critic", "outcome": "error"},
            "_count",
        )
        assert count == 1.0, f"expect _count=1 for error, got {count}"

    def test_observe_multiple_agents_independent(self):
        """Each agent/outcome label pair keeps its own count."""
        histogram, private_registry = self._make_isolated_histogram()

        histogram.labels(agent="diagnostician", outcome="success").observe(2.0)
        histogram.labels(agent="solver", outcome="success").observe(3.0)
        histogram.labels(agent="critic", outcome="timeout").observe(15.5)

        def count_of(agent, outcome):
            # Small local shorthand for the three identical lookups below.
            return self._get_sample_value(
                private_registry,
                self._TEST_METRIC,
                {"agent": agent, "outcome": outcome},
                "_count",
            )

        diag_count = count_of("diagnostician", "success")
        solver_count = count_of("solver", "success")
        critic_count = count_of("critic", "timeout")

        assert diag_count == 1.0
        assert solver_count == 1.0
        assert critic_count == 1.0

    @pytest.mark.asyncio
    async def test_observe_called_on_success_via_mock(self):
        """
        Diagnostician ``_analyze`` reports ("diagnostician", "success", duration) on success.

        ``get_openclaw`` is patched so that ``call`` returns a valid JSON
        payload, and ``observe_agent_step`` is patched so the single call can
        be inspected. Only the transport layer's return value is faked.
        """
        import src.agents.diagnostician_agent as diagnostician

        fake_response = '{"hypotheses": [{"description": "CPU 高", "confidence": 0.8, "evidence_chain": [], "category": "HostCpuHigh"}]}'
        snapshot = MagicMock(
            snapshot_id="test-snap-001",
            evidence_summary="CPU 95%",
            anomaly_context=None,
        )

        with patch("src.agents.diagnostician_agent.observe_agent_step") as observe_spy, \
                patch("src.services.openclaw.get_openclaw") as openclaw_factory:
            openclaw_factory.return_value = MagicMock(
                call=AsyncMock(return_value=(fake_response, "nim", True))
            )

            await diagnostician.DiagnosticianAgent()._analyze(snapshot)

        observe_spy.assert_called_once()
        positional = observe_spy.call_args.args
        assert positional[0] == "diagnostician", f"expect agent='diagnostician', got {positional[0]}"
        assert positional[1] == "success", f"expect outcome='success', got {positional[1]}"
        assert isinstance(positional[2], float), "duration_sec 必須是 float"
        assert positional[2] >= 0.0, "duration_sec 不能為負"

    @pytest.mark.asyncio
    async def test_observe_called_on_timeout_via_mock(self):
        """
        Diagnostician ``_analyze`` reports ("diagnostician", "timeout", ...) on timeout.

        ``asyncio.wait_for`` as seen from the agent module is patched to raise
        ``asyncio.TimeoutError``; the step must record the timeout and return a
        degraded result.
        """
        import src.agents.diagnostician_agent as diagnostician

        snapshot = MagicMock(
            snapshot_id="test-snap-timeout",
            evidence_summary="NIM 無回應",
            anomaly_context=None,
        )

        with patch("src.agents.diagnostician_agent.observe_agent_step") as observe_spy, \
                patch(
                    "src.agents.diagnostician_agent.asyncio.wait_for",
                    side_effect=asyncio.TimeoutError(),
                ):
            result = await diagnostician.DiagnosticianAgent()._analyze(snapshot)

        observe_spy.assert_called_once()
        positional = observe_spy.call_args.args
        assert positional[0] == "diagnostician"
        assert positional[1] == "timeout"
        # The step is expected to hand back a degraded report.
        assert result.degraded is True

    @pytest.mark.asyncio
    async def test_observe_called_on_solver_success(self):
        """Solver ``_solve`` reports ("solver", "success", ...) on the success path."""
        import src.agents.solver_agent as solver
        from src.agents.protocol import AgentVote, DiagnosisReport, Hypothesis

        hypothesis = Hypothesis(
            description="CPU 高負載",
            confidence=0.85,
            evidence_chain=[],
            category="HostCpuHigh",
        )
        diagnosis = DiagnosisReport(
            hypotheses=[hypothesis],
            evidence_snapshot_id="snap-solver-001",
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )
        fake_response = '{"candidates": [{"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", "blast_radius": 10, "rollback_cost": 5, "confidence": 0.8, "rationale": "重啟清除碎片"}]}'

        with patch("src.agents.solver_agent.observe_agent_step") as observe_spy, \
                patch("src.services.openclaw.get_openclaw") as openclaw_factory, \
                patch(
                    "src.agents.solver_agent._fetch_k8s_inventory",
                    return_value="awoooi-api",
                ):
            openclaw_factory.return_value = MagicMock(
                call=AsyncMock(return_value=(fake_response, "nim", True))
            )

            await solver.SolverAgent()._solve(diagnosis)

        observe_spy.assert_called_once()
        positional = observe_spy.call_args.args
        assert positional[0] == "solver"
        assert positional[1] == "success"

    @pytest.mark.asyncio
    async def test_observe_called_on_critic_timeout(self):
        """Critic ``_critique`` reports ("critic", "timeout", ...) when wait_for times out."""
        import src.agents.critic_agent as critic
        from src.agents.protocol import (
            ActionPlan,
            AgentVote,
            CandidateAction,
            DiagnosisReport,
            Hypothesis,
        )

        hypothesis = Hypothesis(
            description="Memory Leak",
            confidence=0.75,
            evidence_chain=[],
            category="KubePodOOM",
        )
        diagnosis = DiagnosisReport(
            hypotheses=[hypothesis],
            evidence_snapshot_id="snap-critic-001",
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )
        candidate = CandidateAction(
            action="kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
            blast_radius=10,
            rollback_cost=5,
            confidence=0.8,
            rationale="重啟",
        )
        plan = ActionPlan(
            candidates=[candidate],
            diagnosis_report=diagnosis,
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )

        with patch("src.agents.critic_agent.observe_agent_step") as observe_spy, \
                patch(
                    "src.agents.critic_agent.asyncio.wait_for",
                    side_effect=asyncio.TimeoutError(),
                ):
            result = await critic.CriticAgent()._critique(diagnosis, plan)

        observe_spy.assert_called_once()
        positional = observe_spy.call_args.args
        assert positional[0] == "critic"
        assert positional[1] == "timeout"
        assert result.degraded is True
|
||
|
||
|
||
# =============================================================================
# Section 4: Histogram bucket verification
# =============================================================================
|
||
|
||
class TestHistogramBuckets:
    """Sanity checks on the bucket boundaries configured for the step histogram."""

    def test_expected_buckets(self):
        """The 15 s, 20 s and 30 s boundaries (the three agent timeouts) must be present."""
        from src.observability.agent_step_metrics import _AGENT_STEP_BUCKETS

        assert 15.0 in _AGENT_STEP_BUCKETS, "15s bucket 必須存在(Critic timeout 邊界)"
        assert 20.0 in _AGENT_STEP_BUCKETS, "20s bucket 必須存在(Solver timeout 邊界)"
        assert 30.0 in _AGENT_STEP_BUCKETS, "30s bucket 必須存在(Diagnostician timeout 邊界)"

    def test_buckets_are_sorted_ascending(self):
        """Buckets must be in ascending order, whatever sequence type holds them."""
        from src.observability.agent_step_metrics import _AGENT_STEP_BUCKETS

        # sorted() always returns a list; normalise first so that a correctly
        # ordered tuple is not reported as unsorted by a tuple-vs-list compare.
        bucket_list = list(_AGENT_STEP_BUCKETS)
        assert bucket_list == sorted(bucket_list), (
            f"buckets 必須升序:{_AGENT_STEP_BUCKETS}"
        )
|