# File viewer header: 56 lines, 1.8 KiB, Python
from types import SimpleNamespace
|
|
|
|
from src.agents.protocol import AgentSessionStatus
|
|
from src.services import decision_manager as decision_manager_module
|
|
from src.services.decision_manager import (
|
|
_incident_llm_timeout_seconds,
|
|
_phase2_fallback_reason,
|
|
)
|
|
|
|
|
|
def _package(**kwargs):
    """Build a stand-in package object for the phase-2 fallback tests.

    The defaults are a COMPLETED session, agents not all degraded, a
    rollout-restart recommended action and no human approval required.
    Any keyword argument overrides (or adds to) those attributes.

    Returns:
        A ``SimpleNamespace`` exposing the merged values as attributes.
    """
    defaults = dict(
        session_status=AgentSessionStatus.COMPLETED,
        all_agents_degraded=False,
        recommended_action="kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
        requires_human_approval=False,
    )
    # Caller-supplied values win over the defaults.
    merged = {**defaults, **kwargs}
    return SimpleNamespace(**merged)
|
|
|
|
|
|
def test_phase2_timeout_continues_to_fallback() -> None:
    """A TIMEOUT package with no action and approval required reports "session_timeout"."""
    overrides = {
        "session_status": AgentSessionStatus.TIMEOUT,
        "recommended_action": None,
        "requires_human_approval": True,
    }
    timed_out = _package(**overrides)

    reason = _phase2_fallback_reason(timed_out)

    assert reason == "session_timeout"
|
|
|
|
|
|
def test_phase2_empty_manual_gate_continues_to_fallback() -> None:
    """An empty action combined with required approval reports "empty_action_requires_human"."""
    overrides = {
        "recommended_action": "",
        "requires_human_approval": True,
    }
    gated = _package(**overrides)

    reason = _phase2_fallback_reason(gated)

    assert reason == "empty_action_requires_human"
|
|
|
|
|
|
def test_phase2_actionable_package_stays_primary() -> None:
    """The default package built by ``_package`` yields no fallback reason."""
    actionable = _package()

    reason = _phase2_fallback_reason(actionable)

    assert reason is None
|
|
|
|
|
|
def test_incident_llm_timeout_uses_configured_value(monkeypatch) -> None:
    """With the incident timeout (240) above OPENCLAW_TIMEOUT (120), 240.0 is returned."""
    patched_settings = (
        ("INCIDENT_LLM_TIMEOUT_SECONDS", 240),
        ("OPENCLAW_TIMEOUT", 120),
    )
    for setting_name, setting_value in patched_settings:
        monkeypatch.setattr(decision_manager_module.settings, setting_name, setting_value)

    timeout = _incident_llm_timeout_seconds()

    assert timeout == 240.0
|
|
|
|
|
|
def test_incident_llm_timeout_never_below_openclaw_timeout(monkeypatch) -> None:
    """With the incident timeout (60) below OPENCLAW_TIMEOUT (120), 120.0 is returned."""
    patched_settings = (
        ("INCIDENT_LLM_TIMEOUT_SECONDS", 60),
        ("OPENCLAW_TIMEOUT", 120),
    )
    for setting_name, setting_value in patched_settings:
        monkeypatch.setattr(decision_manager_module.settings, setting_name, setting_value)

    timeout = _incident_llm_timeout_seconds()

    assert timeout == 120.0
|