Files
awoooi/apps/api/tests/test_agent_step_timeouts.py
2026-05-06 08:55:14 +08:00

497 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent Step Timeout 拆分 + Metric 測試
======================================
# 2026-04-27 Claude Sonnet 4.6: A1 — 三段 timeout 拆分 + step metric (北極星 §1.2 Observable by Default)
測試範圍:
1. 三個 Agent 的 timeout default 值正確Diagnostician=30 / Solver=20 / Critic=15
2. env override 生效monkeypatch 模擬不同環境配置)
3. Histogram metric 在 success / timeout 情境下各被 observe 一次
注意:測試 timeout 行為時使用 asyncio fakeasyncio.sleep mock
符合 feedback_no_mock_testing這是測試時序行為不是測試 LLM 推理。
"""
from __future__ import annotations
import asyncio
import importlib
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prometheus_client import CollectorRegistry, Histogram
# =============================================================================
# Section 1: Timeout default values are correct
# =============================================================================
class TestTimeoutDefaults:
    """Default timeout values: 30/20/15s per agent and 420s for the debate.

    Every test removes the relevant environment variable and then imports the
    module under test afresh, because the tests rely on the constants being
    computed when the module is imported.
    """

    @pytest.fixture(autouse=True)
    def _restore_reimported_modules(self):
        """Re-import every module a test refreshed, once the test is over.

        A module imported while an environment variable was removed keeps the
        constants it computed at that moment, and it stays in ``sys.modules``
        after the test.  Importing it again during teardown lets the constants
        follow the restored environment, so later tests are not affected.

        NOTE(review): this relies on pytest setting up autouse fixtures before
        explicitly requested ones, so that ``monkeypatch`` has already been
        undone when the code after ``yield`` runs — confirm if a conftest
        autouse fixture also requests ``monkeypatch``.
        """
        self._reimported = []
        yield
        for module_name in self._reimported:
            sys.modules.pop(module_name, None)
            importlib.import_module(module_name)

    def _fresh_import(self, module_name):
        """Import ``module_name`` from scratch and return the new module.

        Dropping the ``sys.modules`` entry forces the module body to run
        again, exactly once, under the environment prepared by the test.
        """
        sys.modules.pop(module_name, None)
        module = importlib.import_module(module_name)
        # Recorded only after a successful import so teardown never retries
        # a module that could not be imported in the first place.
        self._reimported.append(module_name)
        return module

    def test_diagnostician_default_timeout_is_30(self, monkeypatch):
        """Diagnostician default timeout is 30.0s (primary NIM consumer, largest budget)."""
        # Make sure no leftover value from the environment interferes.
        monkeypatch.delenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.diagnostician_agent")
        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 30.0, (
            f"Diagnostician default timeout 期望 30.0,實際 {mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC}"
        )

    def test_solver_default_timeout_is_20(self, monkeypatch):
        """Solver default timeout is 20.0s (medium-sized prompt)."""
        monkeypatch.delenv("AGENT_SOLVER_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.solver_agent")
        assert mod.AGENT_SOLVER_TIMEOUT_SEC == 20.0, (
            f"Solver default timeout 期望 20.0,實際 {mod.AGENT_SOLVER_TIMEOUT_SEC}"
        )

    def test_critic_default_timeout_is_15(self, monkeypatch):
        """Critic default timeout is 15.0s (shortest output; leaves budget for Diagnostician/Solver)."""
        monkeypatch.delenv("AGENT_CRITIC_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.critic_agent")
        assert mod.AGENT_CRITIC_TIMEOUT_SEC == 15.0, (
            f"Critic default timeout 期望 15.0,實際 {mod.AGENT_CRITIC_TIMEOUT_SEC}"
        )

    def test_agent_debate_global_timeout_default_is_420(self, monkeypatch):
        """Agent debate global timeout defaults to the direct GCP qwen3 budget."""
        monkeypatch.delenv("AGENT_DEBATE_GLOBAL_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.services.agent_orchestrator")
        assert mod.GLOBAL_TIMEOUT_SEC == 420.0

    def test_deprecated_alias_matches_new_constant_diagnostician(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC alias equals AGENT_DIAGNOSTICIAN_TIMEOUT_SEC (backward compatibility)."""
        monkeypatch.delenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.diagnostician_agent")
        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC

    def test_deprecated_alias_matches_new_constant_solver(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC alias equals AGENT_SOLVER_TIMEOUT_SEC (backward compatibility)."""
        monkeypatch.delenv("AGENT_SOLVER_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.solver_agent")
        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_SOLVER_TIMEOUT_SEC

    def test_deprecated_alias_matches_new_constant_critic(self, monkeypatch):
        """PHASE2_STEP_TIMEOUT_SEC alias equals AGENT_CRITIC_TIMEOUT_SEC (backward compatibility)."""
        monkeypatch.delenv("AGENT_CRITIC_TIMEOUT_SEC", raising=False)
        mod = self._fresh_import("src.agents.critic_agent")
        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_CRITIC_TIMEOUT_SEC
# =============================================================================
# Section 2: Environment overrides take effect
# =============================================================================
class TestEnvOverride:
    """Environment variables override the defaults (simulates tuning via a K8s ConfigMap).

    Every test sets an environment variable and then imports the module under
    test afresh, because the tests rely on the constants being computed when
    the module is imported.
    """

    @pytest.fixture(autouse=True)
    def _restore_reimported_modules(self):
        """Re-import every module a test refreshed, once the test is over.

        A module imported while an override was active keeps the overridden
        constant, and it stays in ``sys.modules`` after the test.  Importing
        it again during teardown lets the constants follow the restored
        environment, so the override does not leak into later tests.

        NOTE(review): this relies on pytest setting up autouse fixtures before
        explicitly requested ones, so that ``monkeypatch`` has already been
        undone when the code after ``yield`` runs — confirm if a conftest
        autouse fixture also requests ``monkeypatch``.
        """
        self._reimported = []
        yield
        for module_name in self._reimported:
            sys.modules.pop(module_name, None)
            importlib.import_module(module_name)

    def _fresh_import(self, module_name):
        """Import ``module_name`` from scratch and return the new module.

        Dropping the ``sys.modules`` entry forces the module body to run
        again, exactly once, under the environment prepared by the test.
        """
        sys.modules.pop(module_name, None)
        module = importlib.import_module(module_name)
        # Recorded only after a successful import so teardown never retries
        # a module that could not be imported in the first place.
        self._reimported.append(module_name)
        return module

    def test_diagnostician_env_override(self, monkeypatch):
        """AGENT_DIAGNOSTICIAN_TIMEOUT_SEC=45.0 overrides the 30.0 default."""
        monkeypatch.setenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", "45.0")
        mod = self._fresh_import("src.agents.diagnostician_agent")
        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 45.0, (
            f"env override 期望 45.0,實際 {mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC}"
        )

    def test_solver_env_override(self, monkeypatch):
        """AGENT_SOLVER_TIMEOUT_SEC=25.0 overrides the 20.0 default."""
        monkeypatch.setenv("AGENT_SOLVER_TIMEOUT_SEC", "25.0")
        mod = self._fresh_import("src.agents.solver_agent")
        assert mod.AGENT_SOLVER_TIMEOUT_SEC == 25.0

    def test_critic_env_override(self, monkeypatch):
        """AGENT_CRITIC_TIMEOUT_SEC=10.0 overrides the 15.0 default."""
        monkeypatch.setenv("AGENT_CRITIC_TIMEOUT_SEC", "10.0")
        mod = self._fresh_import("src.agents.critic_agent")
        assert mod.AGENT_CRITIC_TIMEOUT_SEC == 10.0

    def test_env_override_integer_string(self, monkeypatch):
        """An integer string (no decimal point) is converted to a float."""
        monkeypatch.setenv("AGENT_DIAGNOSTICIAN_TIMEOUT_SEC", "60")
        mod = self._fresh_import("src.agents.diagnostician_agent")
        assert mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC == 60.0
        assert isinstance(mod.AGENT_DIAGNOSTICIAN_TIMEOUT_SEC, float)

    def test_env_override_updates_deprecated_alias(self, monkeypatch):
        """The compatibility alias PHASE2_STEP_TIMEOUT_SEC follows the override."""
        monkeypatch.setenv("AGENT_CRITIC_TIMEOUT_SEC", "8.0")
        mod = self._fresh_import("src.agents.critic_agent")
        assert mod.PHASE2_STEP_TIMEOUT_SEC == 8.0
        assert mod.PHASE2_STEP_TIMEOUT_SEC == mod.AGENT_CRITIC_TIMEOUT_SEC

    def test_agent_debate_global_timeout_env_override(self, monkeypatch):
        """AGENT_DEBATE_GLOBAL_TIMEOUT_SEC=300 overrides the 420.0 default."""
        monkeypatch.setenv("AGENT_DEBATE_GLOBAL_TIMEOUT_SEC", "300")
        mod = self._fresh_import("src.services.agent_orchestrator")
        assert mod.GLOBAL_TIMEOUT_SEC == 300.0
# =============================================================================
# Section 3: Histogram metric observe verification
# =============================================================================
class TestAgentStepMetrics:
    """Checks that agent step durations are recorded per (agent, outcome).

    The first group of tests observes values on a histogram bound to an
    isolated CollectorRegistry (so the global REGISTRY is not polluted and no
    "Duplicated timeseries" error can occur across tests) and reads back the
    _count / _sum samples.  The second group patches observe_agent_step inside
    the agent modules and checks the arguments the agents pass to it.
    """

    def _make_isolated_histogram(self) -> tuple[Histogram, CollectorRegistry]:
        """Return a test histogram together with the private registry it lives in."""
        private_registry = CollectorRegistry()
        histogram = Histogram(
            "aiops_agent_step_duration_seconds_test",
            "test histogram",
            ["agent", "outcome"],
            buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 20.0, 30.0, 45.0, 60.0],
            registry=private_registry,
        )
        return histogram, private_registry

    def _get_sample_value(
        self,
        registry: CollectorRegistry,
        metric_name: str,
        labels: dict,
        suffix: str = "_count",
    ) -> float:
        """Return the first sample named ``metric_name + suffix`` with exactly ``labels``.

        Falls back to 0.0 when the registry holds no such sample.
        """
        wanted_sample = metric_name + suffix
        matching_values = (
            sample.value
            for family in registry.collect()
            if family.name == metric_name
            for sample in family.samples
            if sample.name == wanted_sample and sample.labels == labels
        )
        return next(matching_values, 0.0)

    def test_observe_agent_step_success(self):
        """One success observation gives _count == 1 and _sum == the observed value."""
        hist, registry = self._make_isolated_histogram()
        labels = {"agent": "diagnostician", "outcome": "success"}
        # Observe on the isolated histogram, bypassing the global REGISTRY.
        hist.labels(**labels).observe(1.5)

        metric = "aiops_agent_step_duration_seconds_test"
        count = self._get_sample_value(registry, metric, labels, "_count")
        total = self._get_sample_value(registry, metric, labels, "_sum")

        assert count == 1.0, f"expect _count=1, got {count}"
        assert total == pytest.approx(1.5), f"expect _sum=1.5, got {total}"

    def test_observe_agent_step_timeout(self):
        """One timeout observation gives _count == 1."""
        hist, registry = self._make_isolated_histogram()
        labels = {"agent": "solver", "outcome": "timeout"}
        hist.labels(**labels).observe(20.1)

        count = self._get_sample_value(
            registry, "aiops_agent_step_duration_seconds_test", labels, "_count"
        )
        assert count == 1.0, f"expect _count=1 for timeout, got {count}"

    def test_observe_agent_step_error(self):
        """One error observation gives _count == 1."""
        hist, registry = self._make_isolated_histogram()
        labels = {"agent": "critic", "outcome": "error"}
        hist.labels(**labels).observe(0.05)

        count = self._get_sample_value(
            registry, "aiops_agent_step_duration_seconds_test", labels, "_count"
        )
        assert count == 1.0, f"expect _count=1 for error, got {count}"

    def test_observe_multiple_agents_independent(self):
        """Each agent's label set keeps its own count, independent of the others."""
        hist, registry = self._make_isolated_histogram()
        observations = (
            ("diagnostician", "success", 2.0),
            ("solver", "success", 3.0),
            ("critic", "timeout", 15.5),
        )
        # Record everything first, then verify, so cross-talk would be visible.
        for agent, outcome, seconds in observations:
            hist.labels(agent=agent, outcome=outcome).observe(seconds)

        for agent, outcome, _seconds in observations:
            observed_count = self._get_sample_value(
                registry,
                "aiops_agent_step_duration_seconds_test",
                {"agent": agent, "outcome": outcome},
                "_count",
            )
            assert observed_count == 1.0

    @pytest.mark.asyncio
    async def test_observe_called_on_success_via_mock(self):
        """Diagnostician ``_analyze`` reports ("diagnostician", "success", duration) on success.

        ``openclaw.call`` is stubbed to return a valid JSON payload and
        ``observe_agent_step`` is replaced by a mock; the test then checks the
        mock was called exactly once with the expected positional arguments.
        Only the response of the client is stubbed.
        """
        import src.agents.diagnostician_agent as diag_mod

        fake_response = '{"hypotheses": [{"description": "CPU 高", "confidence": 0.8, "evidence_chain": [], "category": "HostCpuHigh"}]}'
        snapshot = MagicMock(
            snapshot_id="test-snap-001",
            evidence_summary="CPU 95%",
            anomaly_context=None,
        )
        openclaw_stub = MagicMock()
        openclaw_stub.call = AsyncMock(return_value=(fake_response, "nim", True))

        observe_patch = patch("src.agents.diagnostician_agent.observe_agent_step")
        openclaw_patch = patch(
            "src.services.openclaw.get_openclaw", return_value=openclaw_stub
        )
        with observe_patch as mock_observe, openclaw_patch:
            await diag_mod.DiagnosticianAgent()._analyze(snapshot)

        mock_observe.assert_called_once()
        call_args = mock_observe.call_args.args
        assert call_args[0] == "diagnostician", f"expect agent='diagnostician', got {call_args[0]}"
        assert call_args[1] == "success", f"expect outcome='success', got {call_args[1]}"
        assert isinstance(call_args[2], float), "duration_sec 必須是 float"
        assert call_args[2] >= 0.0, "duration_sec 不能為負"

    @pytest.mark.asyncio
    async def test_observe_called_on_timeout_via_mock(self):
        """Diagnostician ``_analyze`` reports ("diagnostician", "timeout", ...) on timeout.

        ``asyncio.wait_for`` as seen from the agent module is patched to raise
        ``asyncio.TimeoutError``; the test checks ``observe_agent_step`` was
        called once with outcome "timeout" and that the result is degraded.
        """
        import src.agents.diagnostician_agent as diag_mod

        snapshot = MagicMock(
            snapshot_id="test-snap-timeout",
            evidence_summary="NIM 無回應",
            anomaly_context=None,
        )

        observe_patch = patch("src.agents.diagnostician_agent.observe_agent_step")
        timeout_patch = patch(
            "src.agents.diagnostician_agent.asyncio.wait_for",
            side_effect=asyncio.TimeoutError(),
        )
        with observe_patch as mock_observe, timeout_patch:
            result = await diag_mod.DiagnosticianAgent()._analyze(snapshot)

        mock_observe.assert_called_once()
        call_args = mock_observe.call_args.args
        assert call_args[0] == "diagnostician"
        assert call_args[1] == "timeout"
        # The result should be a degraded report.
        assert result.degraded is True

    @pytest.mark.asyncio
    async def test_observe_called_on_solver_success(self):
        """Solver ``_solve`` reports ("solver", "success", ...) on the success path."""
        import src.agents.solver_agent as solver_mod
        from src.agents.protocol import AgentVote, DiagnosisReport, Hypothesis

        hypothesis = Hypothesis(
            description="CPU 高負載",
            confidence=0.85,
            evidence_chain=[],
            category="HostCpuHigh",
        )
        fake_diag = DiagnosisReport(
            hypotheses=[hypothesis],
            evidence_snapshot_id="snap-solver-001",
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )
        fake_response = '{"candidates": [{"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", "blast_radius": 10, "rollback_cost": 5, "confidence": 0.8, "rationale": "重啟清除碎片"}]}'
        openclaw_stub = MagicMock()
        openclaw_stub.call = AsyncMock(return_value=(fake_response, "nim", True))

        observe_patch = patch("src.agents.solver_agent.observe_agent_step")
        openclaw_patch = patch(
            "src.services.openclaw.get_openclaw", return_value=openclaw_stub
        )
        inventory_patch = patch(
            "src.agents.solver_agent._fetch_k8s_inventory",
            return_value="awoooi-api",
        )
        with observe_patch as mock_observe, openclaw_patch, inventory_patch:
            await solver_mod.SolverAgent()._solve(fake_diag)

        mock_observe.assert_called_once()
        call_args = mock_observe.call_args.args
        assert call_args[0] == "solver"
        assert call_args[1] == "success"

    @pytest.mark.asyncio
    async def test_observe_called_on_critic_timeout(self):
        """Critic ``_critique`` reports ("critic", "timeout", ...) on the timeout path."""
        import src.agents.critic_agent as critic_mod
        from src.agents.protocol import (
            ActionPlan,
            AgentVote,
            CandidateAction,
            DiagnosisReport,
            Hypothesis,
        )

        hypothesis = Hypothesis(
            description="Memory Leak",
            confidence=0.75,
            evidence_chain=[],
            category="KubePodOOM",
        )
        fake_diag = DiagnosisReport(
            hypotheses=[hypothesis],
            evidence_snapshot_id="snap-critic-001",
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )
        candidate = CandidateAction(
            action="kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
            blast_radius=10,
            rollback_cost=5,
            confidence=0.8,
            rationale="重啟",
        )
        fake_plan = ActionPlan(
            candidates=[candidate],
            diagnosis_report=fake_diag,
            latency_ms=0,
            vote=AgentVote.APPROVE,
        )

        observe_patch = patch("src.agents.critic_agent.observe_agent_step")
        timeout_patch = patch(
            "src.agents.critic_agent.asyncio.wait_for",
            side_effect=asyncio.TimeoutError(),
        )
        with observe_patch as mock_observe, timeout_patch:
            result = await critic_mod.CriticAgent()._critique(fake_diag, fake_plan)

        mock_observe.assert_called_once()
        call_args = mock_observe.call_args.args
        assert call_args[0] == "critic"
        assert call_args[1] == "timeout"
        assert result.degraded is True
# =============================================================================
# Section 4: Histogram bucket verification
# =============================================================================
class TestHistogramBuckets:
    """Checks the bucket boundaries declared for aiops_agent_step_duration_seconds.

    The buckets are read from ``_AGENT_STEP_BUCKETS`` in
    ``src.observability.agent_step_metrics``.
    """

    def test_expected_buckets(self):
        """Buckets include the 15s / 20s / 30s agent timeout boundaries.

        These are the Critic, Solver and Diagnostician timeouts respectively.
        """
        from src.observability.agent_step_metrics import _AGENT_STEP_BUCKETS

        assert 15.0 in _AGENT_STEP_BUCKETS, "15s bucket 必須存在Critic timeout 邊界)"
        assert 20.0 in _AGENT_STEP_BUCKETS, "20s bucket 必須存在Solver timeout 邊界)"
        assert 30.0 in _AGENT_STEP_BUCKETS, "30s bucket 必須存在Diagnostician timeout 邊界)"

    def test_buckets_are_sorted_ascending(self):
        """Buckets are in ascending order."""
        from src.observability.agent_step_metrics import _AGENT_STEP_BUCKETS

        # sorted() always returns a list; normalise the constant to a list as
        # well so the comparison checks ordering only and does not fail merely
        # because the buckets are declared as a tuple.
        assert list(_AGENT_STEP_BUCKETS) == sorted(_AGENT_STEP_BUCKETS), (
            f"buckets 必須升序:{_AGENT_STEP_BUCKETS}"
        )