428 lines
14 KiB
Python
428 lines
14 KiB
Python
import asyncio
|
||
import logging
|
||
|
||
|
||
def test_run_with_timeout_supports_sync_function():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41))
|
||
|
||
assert result == 42
|
||
|
||
|
||
def test_execute_step_rejects_unknown_action():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
|
||
try:
|
||
asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"}))
|
||
except ValueError as exc:
|
||
assert "Unrecognized step" in str(exc)
|
||
else:
|
||
raise AssertionError("unknown action should fail")
|
||
|
||
|
||
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
calls = []
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
monkeypatch.setattr(
|
||
engine,
|
||
"_run_auto_heal",
|
||
lambda error_type, context: calls.append((error_type, context)) or {"ok": True},
|
||
)
|
||
|
||
asyncio.run(engine._execute_step({
|
||
"agent": "elephant_alpha",
|
||
"action": "code_fix",
|
||
"parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
|
||
}))
|
||
|
||
assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})]
|
||
|
||
|
||
def test_execute_step_routes_price_adjustment_to_human_review(monkeypatch):
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
calls = []
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
monkeypatch.setattr(
|
||
engine,
|
||
"_record_price_adjustment_review",
|
||
lambda step: calls.append(step) or {"status": "pending_review", "sku": "SKU-9"},
|
||
)
|
||
|
||
result = asyncio.run(engine._execute_step({
|
||
"agent": "nemotron",
|
||
"action": "execute_price_adjustment",
|
||
"parameters": {"sku": "SKU-9", "recommended_price": 1280},
|
||
}))
|
||
|
||
assert result == {"status": "pending_review", "sku": "SKU-9"}
|
||
assert calls[0]["parameters"]["recommended_price"] == 1280
|
||
|
||
|
||
def test_execute_step_skips_legacy_openclaw_resource_strategy():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
|
||
result = asyncio.run(engine._execute_step({
|
||
"agent": "openclaw",
|
||
"action": "generate_resource_optimization_strategy",
|
||
}))
|
||
|
||
assert result is None
|
||
|
||
|
||
def test_autoheal_derives_python_exception_from_traceback():
|
||
from services.auto_heal_service import AutoHealService
|
||
|
||
svc = AutoHealService()
|
||
|
||
assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"
|
||
|
||
|
||
def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkeypatch, caplog):
|
||
from services.elephant_alpha_autonomous_engine import (
|
||
AutonomousTrigger,
|
||
ElephantAlphaAutonomousEngine,
|
||
)
|
||
import services.ai_call_logger as ai_call_logger
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
|
||
async def _no_hermes_threats(top_n=5):
|
||
return None
|
||
|
||
def _broken_log_ai_call(*args, **kwargs):
|
||
raise RuntimeError("ai telemetry unavailable")
|
||
|
||
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _no_hermes_threats)
|
||
monkeypatch.setattr(ai_call_logger, "log_ai_call", _broken_log_ai_call)
|
||
caplog.set_level(logging.WARNING, logger="services.elephant_alpha_autonomous_engine")
|
||
|
||
trigger = AutonomousTrigger(
|
||
trigger_type="price_drop_alert",
|
||
conditions={},
|
||
threshold=0.8,
|
||
enabled=True,
|
||
)
|
||
|
||
asyncio.run(engine._execute_autonomous_decision(trigger))
|
||
|
||
assert "EA short-circuit telemetry failed" in caplog.text
|
||
|
||
|
||
def test_competitor_db_evidence_actions_are_concrete():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
actions = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions(
|
||
[
|
||
{
|
||
"sku": "SKU-1",
|
||
"name": "測試商品",
|
||
"momo_price": 1200,
|
||
"competitor_price": 990,
|
||
"price_gap_pct": 17.5,
|
||
"competitor_product_id": "D123456",
|
||
}
|
||
],
|
||
trigger_type="price_drop_alert",
|
||
)
|
||
|
||
assert actions == [
|
||
"[SKU-1] 測試商品|MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210|建議人工確認 PChome identity_v2 後評估跟價或促銷|PChome D123456"
|
||
]
|
||
|
||
|
||
def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch):
|
||
import services.elephant_alpha_autonomous_engine as engine_module
|
||
from services.elephant_alpha_autonomous_engine import (
|
||
AutonomousTrigger,
|
||
ElephantAlphaAutonomousEngine,
|
||
)
|
||
from services.elephant_alpha_orchestrator import StrategicDecision
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
contexts = []
|
||
notified = []
|
||
|
||
async def _capture_context(context):
|
||
contexts.append(context)
|
||
return StrategicDecision(
|
||
priority="high",
|
||
agents_required=["elephant_alpha"],
|
||
reasoning="已有 DB 價差實證,允許進入決策流程。",
|
||
expected_outcome="生成可稽核行動",
|
||
confidence=0.95,
|
||
execution_plan=[],
|
||
resource_requirements={},
|
||
)
|
||
|
||
async def _fetch_should_not_run(top_n=5):
|
||
raise AssertionError("DB evidence should avoid Hermes LLM prefetch")
|
||
|
||
async def _capture_notify(decision, trigger):
|
||
notified.append(trigger.trigger_type)
|
||
|
||
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context)
|
||
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _fetch_should_not_run)
|
||
monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
|
||
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: None)
|
||
|
||
trigger = AutonomousTrigger(
|
||
trigger_type="price_drop_alert",
|
||
conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
|
||
threshold=0.8,
|
||
enabled=True,
|
||
)
|
||
|
||
asyncio.run(engine._execute_autonomous_decision(trigger))
|
||
|
||
assert contexts[0]["conditions"]["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"]
|
||
assert notified == ["price_drop_alert"]
|
||
|
||
|
||
def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch):
|
||
import services.elephant_alpha_autonomous_engine as engine_module
|
||
from services.elephant_alpha_autonomous_engine import (
|
||
AutonomousTrigger,
|
||
ElephantAlphaAutonomousEngine,
|
||
)
|
||
from services.elephant_alpha_orchestrator import StrategicDecision
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
suppressed = []
|
||
cooldown = []
|
||
|
||
def _raise_if_db_opened():
|
||
raise AssertionError("no-concrete resource escalation should not write human_review")
|
||
|
||
monkeypatch.setattr(engine_module, "get_session", _raise_if_db_opened)
|
||
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: cooldown.append(trigger_type))
|
||
monkeypatch.setattr(
|
||
engine,
|
||
"_record_suppressed_escalation",
|
||
lambda decision, trigger, reason: suppressed.append((trigger.trigger_type, reason)),
|
||
)
|
||
|
||
decision = StrategicDecision(
|
||
priority="medium",
|
||
agents_required=["openclaw"],
|
||
reasoning="資源調配建議信心不足",
|
||
expected_outcome="待人工確認",
|
||
confidence=0.60,
|
||
execution_plan=[],
|
||
resource_requirements={},
|
||
)
|
||
trigger = AutonomousTrigger(
|
||
trigger_type="resource_optimization",
|
||
conditions={"_resource_metrics": {"action_queue_size": 14, "system_load_pct": 52.0}},
|
||
threshold=0.6,
|
||
enabled=True,
|
||
)
|
||
|
||
asyncio.run(engine._escalate_to_human(decision, trigger))
|
||
|
||
assert cooldown == ["resource_optimization"]
|
||
assert suppressed == [("resource_optimization", "no_concrete_evidence")]
|
||
|
||
|
||
def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
|
||
"action_queue_size": 34,
|
||
"high_priority_count": 0,
|
||
"human_review_count": 0,
|
||
"stale_count": 0,
|
||
"system_load_pct": 19.2,
|
||
"queue_threshold": 10,
|
||
"load_threshold_pct": 80,
|
||
"high_priority_threshold": 5,
|
||
"stale_threshold": 5,
|
||
})
|
||
|
||
assert metrics["pressure_level"] == "backlog_only"
|
||
assert metrics["should_alert"] is False
|
||
assert metrics["load_pressure"] is False
|
||
assert "system_load=19.2%/80%" in metrics["evidence"]
|
||
|
||
|
||
def test_resource_pressure_classifier_alerts_on_actionable_review_backlog():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
|
||
"action_queue_size": 34,
|
||
"high_priority_count": 8,
|
||
"human_review_count": 6,
|
||
"stale_count": 0,
|
||
"system_load_pct": 22.0,
|
||
"queue_threshold": 10,
|
||
"load_threshold_pct": 80,
|
||
"high_priority_threshold": 5,
|
||
"stale_threshold": 5,
|
||
})
|
||
|
||
assert metrics["pressure_level"] == "warning"
|
||
assert metrics["should_alert"] is True
|
||
assert metrics["high_priority_pressure"] is True
|
||
assert metrics["load_pressure"] is False
|
||
|
||
|
||
def test_resource_pressure_message_is_measurement_based_not_llm_theatre():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
|
||
"action_queue_size": 34,
|
||
"high_priority_count": 8,
|
||
"human_review_count": 6,
|
||
"stale_count": 5,
|
||
"system_load_pct": 22.0,
|
||
"queue_threshold": 10,
|
||
"load_threshold_pct": 80,
|
||
"high_priority_threshold": 5,
|
||
"stale_threshold": 5,
|
||
"stale_hours": 24,
|
||
})
|
||
|
||
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
|
||
metrics,
|
||
insight_id=123,
|
||
previous_limit=10,
|
||
new_limit=8,
|
||
)
|
||
|
||
assert "量測指標" in msg
|
||
assert "系統處置紀錄" in msg
|
||
assert "主機 CPU 未達高負載門檻" in msg
|
||
assert "未啟動 Hermes/NemoTron 價格分析" in msg
|
||
assert "預期效益" not in msg
|
||
assert "48小時" not in msg
|
||
assert "48 小時效益預測" in msg
|
||
|
||
|
||
def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
|
||
import services.elephant_alpha_autonomous_engine as engine_module
|
||
from services.elephant_alpha_autonomous_engine import (
|
||
AutonomousTrigger,
|
||
ElephantAlphaAutonomousEngine,
|
||
)
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
sent = []
|
||
stored = []
|
||
|
||
async def _boom(*args, **kwargs):
|
||
raise AssertionError("resource_optimization must not call LLM orchestrator")
|
||
|
||
async def _capture_send(*args, **kwargs):
|
||
sent.append(args)
|
||
|
||
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom)
|
||
monkeypatch.setattr(engine, "_run_action_plan_hygiene", lambda: {"updated_count": 0})
|
||
monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77)
|
||
monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send)
|
||
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
|
||
|
||
trigger = AutonomousTrigger(
|
||
trigger_type="resource_optimization",
|
||
conditions={
|
||
"_resource_metrics": {
|
||
"action_queue_size": 34,
|
||
"high_priority_count": 9,
|
||
"human_review_count": 6,
|
||
"stale_count": 0,
|
||
"system_load_pct": 20.0,
|
||
"queue_threshold": 10,
|
||
"load_threshold_pct": 80,
|
||
"high_priority_threshold": 5,
|
||
"stale_threshold": 5,
|
||
}
|
||
},
|
||
threshold=0.6,
|
||
enabled=True,
|
||
)
|
||
|
||
asyncio.run(engine._execute_autonomous_decision(trigger))
|
||
|
||
assert stored == ["resource_optimization"]
|
||
assert len(sent) == 1
|
||
assert engine.max_autonomous_decisions_per_hour == 8
|
||
|
||
|
||
def test_resource_pressure_message_reports_hygiene_result():
|
||
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
||
|
||
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
|
||
"action_queue_size": 9,
|
||
"high_priority_count": 0,
|
||
"human_review_count": 0,
|
||
"stale_count": 0,
|
||
"system_load_pct": 20.0,
|
||
"queue_threshold": 10,
|
||
"load_threshold_pct": 80,
|
||
"high_priority_threshold": 5,
|
||
"stale_threshold": 5,
|
||
})
|
||
metrics["pre_hygiene"] = {
|
||
"action_queue_size": 100,
|
||
"high_priority_count": 47,
|
||
"stale_count": 99,
|
||
}
|
||
metrics["hygiene_result"] = {
|
||
"updated_count": 91,
|
||
"by_source": {"code_review_fix": 66, "openclaw_recommendation": 25},
|
||
}
|
||
|
||
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
|
||
metrics,
|
||
insight_id=124,
|
||
previous_limit=10,
|
||
new_limit=10,
|
||
)
|
||
|
||
assert "P4 resolved" in msg
|
||
assert "清理前 Action queue:100" in msg
|
||
assert "已自動關閉過期 action_plans 91 筆" in msg
|
||
assert "只改 status/metadata,不刪除資料" in msg
|
||
|
||
|
||
def test_resource_optimization_cannot_use_legacy_autonomous_execution_template(monkeypatch):
|
||
from services.elephant_alpha_autonomous_engine import (
|
||
AutonomousTrigger,
|
||
ElephantAlphaAutonomousEngine,
|
||
)
|
||
from services.elephant_alpha_orchestrator import StrategicDecision
|
||
|
||
engine = ElephantAlphaAutonomousEngine()
|
||
sent = []
|
||
|
||
async def _capture_send(message):
|
||
sent.append(message)
|
||
|
||
monkeypatch.setattr("services.telegram_templates._send_telegram_raw", _capture_send)
|
||
|
||
decision = StrategicDecision(
|
||
priority="medium",
|
||
agents_required=["elephant_alpha"],
|
||
reasoning="resource pressure test",
|
||
expected_outcome="old style outcome must not be sent",
|
||
confidence=0.8,
|
||
execution_plan=[{"agent": "Hermes", "action": "generate_resource_optimization_strategy"}],
|
||
resource_requirements={},
|
||
)
|
||
trigger = AutonomousTrigger(
|
||
trigger_type="resource_optimization",
|
||
conditions={},
|
||
threshold=0.6,
|
||
enabled=True,
|
||
)
|
||
|
||
asyncio.run(engine._notify_telegram_executed(decision, trigger))
|
||
|
||
assert sent == []
|