Files
ewoooc/tests/test_elephant_alpha_engine.py
OoO 56cd883148
All checks were successful
CD Pipeline / deploy (push) Successful in 2m11s
fix: gate elephant price decisions to HITL
2026-06-18 10:14:46 +08:00

718 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
def test_run_with_timeout_supports_sync_function():
    """Check that ``_run_with_timeout`` returns the result of a plain sync callable."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    def _increment(value):
        return value + 1

    outcome = asyncio.run(
        ElephantAlphaAutonomousEngine._run_with_timeout(_increment, 41)
    )

    assert outcome == 42
def test_execute_step_rejects_unknown_action():
    """Check that an unknown agent/action pair makes ``_execute_step`` raise ``ValueError``."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    engine = ElephantAlphaAutonomousEngine()
    unknown_step = {"agent": "mystery", "action": "do_anything"}

    caught = None
    try:
        asyncio.run(engine._execute_step(unknown_step))
    except ValueError as exc:
        caught = exc

    # No exception at all is a failure; otherwise the message must name the problem.
    if caught is None:
        raise AssertionError("unknown action should fail")
    assert "Unrecognized step" in str(caught)
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
    """Check that a ``code_fix`` step calls ``_run_auto_heal`` once with ``python_exception``.

    ``_run_auto_heal`` is replaced by a recorder, and the recorded
    ``(error_type, context)`` pair is compared against the step parameters.
    """
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    heal_calls = []
    expected_context = {"target_file": "services/example.py", "error_message": "Traceback"}

    def _fake_auto_heal(error_type, context):
        heal_calls.append((error_type, context))
        return {"ok": True}

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine, "_run_auto_heal", _fake_auto_heal)

    code_fix_step = {
        "agent": "elephant_alpha",
        "action": "code_fix",
        # Fresh copy so the expectation is independent of the dict handed to the engine.
        "parameters": dict(expected_context),
    }
    asyncio.run(engine._execute_step(code_fix_step))

    assert heal_calls == [("python_exception", expected_context)]
def test_execute_step_routes_price_adjustment_to_human_review(monkeypatch):
    """Check that ``execute_price_adjustment`` is handed to ``_record_price_adjustment_review``.

    The recorder's return value must come back from ``_execute_step`` unchanged,
    and the step it received must carry the recommended price.
    """
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    recorded_steps = []

    def _fake_review(step):
        recorded_steps.append(step)
        return {"status": "pending_review", "sku": "SKU-9"}

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine, "_record_price_adjustment_review", _fake_review)

    price_step = {
        "agent": "nemotron",
        "action": "execute_price_adjustment",
        "parameters": {"sku": "SKU-9", "recommended_price": 1280},
    }
    outcome = asyncio.run(engine._execute_step(price_step))

    assert outcome == {"status": "pending_review", "sku": "SKU-9"}
    first_step = recorded_steps[0]
    assert first_step["parameters"]["recommended_price"] == 1280
def test_execute_step_skips_legacy_openclaw_strategy_actions():
    """Check that each legacy OpenClaw strategy action makes ``_execute_step`` return ``None``."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    engine = ElephantAlphaAutonomousEngine()

    # A tuple, not a set: string hashing is randomised per process, so a set
    # would visit the actions in a different order on each run and make a
    # failure harder to reproduce.
    legacy_actions = (
        "generate_market_strategy",
        "generate_dynamic_pricing_strategy",
        "generate_resource_optimization_strategy",
    )

    for action in legacy_actions:
        result = asyncio.run(engine._execute_step({
            "agent": "openclaw",
            "action": action,
        }))
        # Name the offending action so a failure is self-explanatory.
        assert result is None, (
            f"legacy action {action!r} should be skipped, got {result!r}"
        )
def test_autoheal_derives_python_exception_from_traceback():
    """Check that a context holding a traceback string is classified as ``python_exception``."""
    from services.auto_heal_service import AutoHealService

    error_context = {"traceback_str": "Traceback (most recent call last):\nNameError"}

    derived = AutoHealService()._derive_error_type(error_context)

    assert derived == "python_exception"
def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkeypatch, caplog):
    """Check that a failing ``log_ai_call`` is logged as a warning, not raised.

    Hermes is stubbed to return no threats and ``ai_call_logger.log_ai_call``
    is replaced by a function that raises. The run must complete and leave
    the telemetry-failure message in the captured log.
    """
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    import services.ai_call_logger as ai_call_logger

    async def _hermes_returns_nothing(top_n=5):
        return None

    def _telemetry_raises(*args, **kwargs):
        raise RuntimeError("ai telemetry unavailable")

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _hermes_returns_nothing)
    monkeypatch.setattr(ai_call_logger, "log_ai_call", _telemetry_raises)
    caplog.set_level(logging.WARNING, logger="services.elephant_alpha_autonomous_engine")

    price_trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(price_trigger))

    assert "EA short-circuit telemetry failed" in caplog.text
def test_competitor_db_evidence_actions_are_concrete():
    """Check the exact action line rendered for a single competitor evidence row."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    evidence_row = {
        "sku": "SKU-1",
        "name": "測試商品",
        "momo_price": 1200,
        "competitor_price": 990,
        "price_gap_pct": 17.5,
        "competitor_product_id": "D123456",
        "match_score": 0.84,
        "tags": [
            "identity_v2",
            "match_type_exact",
            "price_basis_total_price",
            "alert_tier_price_alert_exact",
        ],
    }
    expected_line = "[SKU-1] 測試商品MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210證據高信心同款 / 總價可比 / 可直接價格告警 / score 0.84|建議人工確認 PChome identity_v2 後評估跟價或促銷PChome D123456"

    rendered = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions(
        [evidence_row],
        trigger_type="price_drop_alert",
    )

    assert rendered == [expected_line]
def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch):
    """Check that DB evidence on the trigger replaces the Hermes prefetch.

    The Hermes fetcher is stubbed to raise if called. The context handed to
    the orchestrator must carry the trigger's ``_db_evidence_actions`` under
    ``_prefetched_hermes_threats``, and the executed-notification hook must
    fire once for the trigger.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    seen_contexts = []
    notified_triggers = []

    async def _record_context(context):
        seen_contexts.append(context)
        return StrategicDecision(
            priority="high",
            agents_required=["elephant_alpha"],
            reasoning="已有 DB 價差實證,允許進入決策流程。",
            expected_outcome="生成可稽核行動",
            confidence=0.95,
            execution_plan=[],
            resource_requirements={},
        )

    async def _hermes_must_not_run(top_n=5):
        raise AssertionError("DB evidence should avoid Hermes LLM prefetch")

    async def _record_notification(decision, trigger):
        notified_triggers.append(trigger.trigger_type)

    def _skip_store(trigger_type):
        return None

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _record_context)
    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _hermes_must_not_run)
    monkeypatch.setattr(engine, "_notify_telegram_executed", _record_notification)
    monkeypatch.setattr(engine, "_store_escalation", _skip_store)

    price_trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(price_trigger))

    first_conditions = seen_contexts[0]["conditions"]
    assert first_conditions["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"]
    assert notified_triggers == ["price_drop_alert"]
def test_high_confidence_price_decision_skips_execution_plan_and_goes_hitl(monkeypatch):
    """Check that a high-confidence price decision is notified, not executed.

    ``_execute_decision`` is stubbed to raise if it is ever called. The
    notification hook must see the trigger with its two-step plan intact, and
    ``_store_escalation`` must be called once for the trigger type.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    notifications = []
    stored_escalations = []

    async def _orchestrator_decision(context):
        two_step_plan = [
            {"agent": "hermes", "action": "analyze_price_competition"},
            {"agent": "nemotron", "action": "dispatch_alert", "parameters": {"threats": []}},
        ]
        return StrategicDecision(
            priority="high",
            agents_required=["hermes", "nemotron"],
            reasoning="已有 DB 價格比對實證,轉人工覆核。",
            expected_outcome="人工確認價格策略",
            confidence=0.96,
            execution_plan=two_step_plan,
            resource_requirements={},
        )

    async def _execution_forbidden(decision):
        raise AssertionError("price decisions must not execute long-running autonomous steps")

    async def _record_notification(decision, trigger):
        notifications.append((trigger.trigger_type, len(decision.execution_plan)))

    def _record_store(trigger_type):
        return stored_escalations.append(trigger_type)

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _orchestrator_decision)
    monkeypatch.setattr(engine, "_execute_decision", _execution_forbidden)
    monkeypatch.setattr(engine, "_notify_telegram_executed", _record_notification)
    monkeypatch.setattr(engine, "_store_escalation", _record_store)

    price_trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(price_trigger))

    assert notifications == [("price_drop_alert", 2)]
    assert stored_escalations == ["price_drop_alert"]
def test_price_review_decision_envelope_blocks_auto_execution():
    """Check the price-review envelope: HITL required, auto-execution blocked, data complete."""
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    price_trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )
    price_decision = StrategicDecision(
        priority="high",
        agents_required=["elephant_alpha"],
        reasoning="已有價格比對實證。",
        expected_outcome="人工確認價格策略。",
        confidence=0.96,
        execution_plan=[{"agent": "hermes", "action": "analyze_price_competition"}],
        resource_requirements={},
    )

    envelope = ElephantAlphaAutonomousEngine._build_price_review_decision_envelope(
        price_decision,
        price_trigger,
        concrete_actions=["[SKU-1] MOMO $1,200 vs PChome $990"],
    )

    assert envelope["decision_type"] == "price_decision_review"
    assert envelope["source_agent"] == "elephant_alpha"
    assert envelope["recommended_action"]["requires_hitl"] is True

    guardrails = envelope["guardrails"]
    assert guardrails["can_auto_execute"] is False
    assert guardrails["blocked_reason"] == "price decisions require HITL; execution_plan skipped"
    assert guardrails["data_quality"] == "complete"
def test_price_trigger_queries_use_lateral_latest_price_lookup(monkeypatch):
    """Check the SQL text issued by the two price triggers and the evidence fetch.

    ``get_session`` is replaced by a fake session that records every
    statement and returns no rows. Exactly three statements are expected;
    each must contain ``JOIN LATERAL`` and must not contain
    ``SELECT DISTINCT ON (product_id)`` or ``latest_momo``.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )

    captured_sql = []

    class _FakeResult:
        """Empty result set; ``mappings()`` returns the same object."""

        def mappings(self):
            return self

        def fetchall(self):
            return []

    class _FakeSession:
        """Session stand-in that records the text of each executed statement."""

        def execute(self, statement, params=None):
            captured_sql.append(str(statement))
            return _FakeResult()

        def close(self):
            return None

    def _open_fake_session():
        return _FakeSession()

    monkeypatch.setattr(engine_module, "get_session", _open_fake_session)
    engine = ElephantAlphaAutonomousEngine()

    price_drop = AutonomousTrigger("price_drop_alert", {}, 0.8, True)
    market_opportunity = AutonomousTrigger("market_opportunity", {}, 0.8, True)
    asyncio.run(engine._check_price_drop_trigger(price_drop))
    asyncio.run(engine._check_market_opportunity_trigger(market_opportunity))
    assert engine._fetch_recent_competitor_evidence_actions(top_n=2) is None

    assert len(captured_sql) == 3
    for sql in captured_sql:
        assert "JOIN LATERAL" in sql
    for sql in captured_sql:
        assert "SELECT DISTINCT ON (product_id)" not in sql
    for sql in captured_sql:
        assert "latest_momo" not in sql
def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch):
    """Check that a resource escalation with no concrete evidence is suppressed.

    ``get_session`` is stubbed to raise, so any attempt to open the DB fails
    the test. The escalation must store the cooldown and record a suppression
    with reason ``no_concrete_evidence``.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    suppressed_events = []
    cooldown_events = []

    def _db_must_not_open():
        raise AssertionError("no-concrete resource escalation should not write human_review")

    def _record_cooldown(trigger_type):
        return cooldown_events.append(trigger_type)

    def _record_suppression(decision, trigger, reason):
        return suppressed_events.append((trigger.trigger_type, reason))

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine_module, "get_session", _db_must_not_open)
    monkeypatch.setattr(engine, "_store_escalation", _record_cooldown)
    monkeypatch.setattr(engine, "_record_suppressed_escalation", _record_suppression)

    low_confidence_decision = StrategicDecision(
        priority="medium",
        agents_required=["openclaw"],
        reasoning="資源調配建議信心不足",
        expected_outcome="待人工確認",
        confidence=0.60,
        execution_plan=[],
        resource_requirements={},
    )
    resource_trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={"_resource_metrics": {"action_queue_size": 14, "system_load_pct": 52.0}},
        threshold=0.6,
        enabled=True,
    )

    asyncio.run(engine._escalate_to_human(low_confidence_decision, resource_trigger))

    assert cooldown_events == ["resource_optimization"]
    assert suppressed_events == [("resource_optimization", "no_concrete_evidence")]
def test_escalate_price_alert_without_evidence_is_suppressed(monkeypatch):
    """Check that a price escalation with no concrete evidence is suppressed.

    Hermes is stubbed to return nothing and ``get_session`` to raise if it is
    called. The escalation must store the cooldown and record a suppression
    with reason ``no_concrete_evidence``.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    suppressed_events = []
    cooldown_events = []

    async def _hermes_returns_nothing(top_n=5):
        return None

    def _db_must_not_open():
        raise AssertionError("no-concrete price escalation should not write human_review")

    def _record_cooldown(trigger_type):
        return cooldown_events.append(trigger_type)

    def _record_suppression(decision, trigger, reason):
        return suppressed_events.append((trigger.trigger_type, reason))

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _hermes_returns_nothing)
    monkeypatch.setattr(engine_module, "get_session", _db_must_not_open)
    monkeypatch.setattr(engine, "_store_escalation", _record_cooldown)
    monkeypatch.setattr(engine, "_record_suppressed_escalation", _record_suppression)

    low_confidence_decision = StrategicDecision(
        priority="medium",
        agents_required=["openclaw"],
        reasoning="價格調整建議信心不足",
        expected_outcome="待人工確認",
        confidence=0.62,
        execution_plan=[],
        resource_requirements={},
    )
    price_trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )

    asyncio.run(engine._escalate_to_human(low_confidence_decision, price_trigger))

    assert cooldown_events == ["price_drop_alert"]
    assert suppressed_events == [("price_drop_alert", "no_concrete_evidence")]
def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load():
    """Check that a large queue with low CPU load is ``backlog_only`` and raises no alert."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    thresholds = {
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    observed = {
        "action_queue_size": 34,
        "high_priority_count": 0,
        "human_review_count": 0,
        "stale_count": 0,
        "system_load_pct": 19.2,
    }

    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({**observed, **thresholds})

    assert metrics["pressure_level"] == "backlog_only"
    assert metrics["should_alert"] is False
    assert metrics["load_pressure"] is False
    assert "system_load=19.2%/80%" in metrics["evidence"]
def test_resource_pressure_classifier_alerts_on_actionable_review_backlog():
    """Check that high-priority counts above threshold give a ``warning`` without load pressure."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    thresholds = {
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    observed = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 0,
        "system_load_pct": 22.0,
    }

    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({**observed, **thresholds})

    assert metrics["pressure_level"] == "warning"
    assert metrics["should_alert"] is True
    assert metrics["high_priority_pressure"] is True
    assert metrics["load_pressure"] is False
def test_resource_pressure_message_is_measurement_based_not_llm_theatre():
    """Check which fragments the resource-pressure Telegram message contains and omits.

    The message is built from classified metrics that include one top action
    item. It is then checked for a set of required fragments and for two
    fragments that must be absent.
    """
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    focus_item = {
        "id": 42,
        "status": "pending",
        "priority": 1,
        "age_hours": 31.5,
        "type": "code_review_fix",
        "description": "修正高優先錯誤",
    }
    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 5,
        "system_load_pct": 22.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
        "stale_hours": 24,
        "top_action_items": [focus_item],
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=123,
        previous_limit=10,
        new_limit=8,
    )

    required_fragments = (
        "量測指標",
        "系統處置紀錄",
        "待處理焦點",
        "#42 P1 pending code_review_fix",
        "主機 CPU 未達高負載門檻",
        "未啟動 Hermes/NemoTron 價格分析",
    )
    for fragment in required_fragments:
        assert fragment in msg

    forbidden_fragments = ("預期效益", "48小時")
    for fragment in forbidden_fragments:
        assert fragment not in msg

    assert "48 小時效益預測" in msg
def test_resource_pressure_decision_envelope_is_measurement_based():
    """Check the resource-pressure envelope fields and how the message renders them.

    The envelope is built from classified metrics and then passed to the
    Telegram message builder. Both the envelope contents and the resulting
    message text are asserted.
    """
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 5,
        "system_load_pct": 22.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
        "stale_hours": 24,
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    envelope = ElephantAlphaAutonomousEngine._build_resource_pressure_decision_envelope(
        metrics,
        insight_id=1942,
        previous_limit=10,
        new_limit=8,
    )
    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=1942,
        previous_limit=10,
        new_limit=8,
        decision_envelope=envelope,
    )

    expected_top_level = (
        ("decision_id", "ea_resource_pressure_1942"),
        ("source_agent", "elephant_alpha"),
        ("decision_type", "resource_optimization"),
        ("severity", "P2"),
    )
    for key, expected_value in expected_top_level:
        assert envelope[key] == expected_value

    assert envelope["guardrails"]["can_auto_execute"] is False
    assert envelope["guardrails"]["llm_used"] is False
    assert envelope["trace"]["provider"] == "action_plans_cpu_probe"

    required_fragments = (
        "🧭 <b>決策信封</b>",
        "資料品質:<code>complete</code> 自動執行:<b>不允許</b>",
        "不採用 LLM 生成的 48 小時效益預測",
    )
    for fragment in required_fragments:
        assert fragment in msg
    assert "Gemini" not in msg
def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
    """Check that a resource_optimization trigger runs without calling the orchestrator.

    The orchestrator entry point is stubbed to raise if called. Hygiene,
    insight recording, Telegram sending and escalation storage are replaced
    by recorders or constants. One Telegram send, one stored escalation and
    a per-hour decision limit of 8 are expected afterwards.
    """
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )

    telegram_sends = []
    stored_escalations = []

    async def _orchestrator_forbidden(*args, **kwargs):
        raise AssertionError("resource_optimization must not call LLM orchestrator")

    async def _record_send(*args, **kwargs):
        telegram_sends.append(args)

    def _no_hygiene_updates():
        return {"updated_count": 0}

    def _fixed_insight_id(*args, **kwargs):
        return 77

    def _record_store(trigger_type):
        return stored_escalations.append(trigger_type)

    engine = ElephantAlphaAutonomousEngine()
    monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _orchestrator_forbidden)
    monkeypatch.setattr(engine, "_run_action_plan_hygiene", _no_hygiene_updates)
    monkeypatch.setattr(engine, "_record_resource_pressure_insight", _fixed_insight_id)
    monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _record_send)
    monkeypatch.setattr(engine, "_store_escalation", _record_store)

    resource_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 9,
        "human_review_count": 6,
        "stale_count": 0,
        "system_load_pct": 20.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    resource_trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={"_resource_metrics": resource_metrics},
        threshold=0.6,
        enabled=True,
    )

    asyncio.run(engine._execute_autonomous_decision(resource_trigger))

    assert stored_escalations == ["resource_optimization"]
    assert len(telegram_sends) == 1
    assert engine.max_autonomous_decisions_per_hour == 8
def test_resource_pressure_message_reports_hygiene_result():
    """Check that the message reports pre-hygiene counts and the hygiene outcome.

    ``pre_hygiene`` and ``hygiene_result`` are added to the classified metrics
    before the message is built.
    """
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    post_hygiene_snapshot = {
        "action_queue_size": 9,
        "high_priority_count": 0,
        "human_review_count": 0,
        "stale_count": 0,
        "system_load_pct": 20.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(post_hygiene_snapshot)
    metrics["pre_hygiene"] = {
        "action_queue_size": 100,
        "high_priority_count": 47,
        "stale_count": 99,
    }
    metrics["hygiene_result"] = {
        "updated_count": 91,
        "by_source": {"code_review_fix": 66, "openclaw_recommendation": 25},
    }

    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=124,
        previous_limit=10,
        new_limit=10,
    )

    required_fragments = (
        "P4 resolved",
        "清理前 Action queue100",
        "已自動關閉過期 action_plans 91 筆",
        "只改 status/metadata不刪除資料",
    )
    for fragment in required_fragments:
        assert fragment in msg
def test_resource_optimization_cannot_use_legacy_autonomous_execution_template(monkeypatch):
    """Check that ``_notify_telegram_executed`` sends nothing for resource_optimization.

    ``services.telegram_templates._send_telegram_raw`` is replaced by a
    recorder, and the recorder must stay empty after the notification call.
    """
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    raw_sends = []

    async def _record_raw_send(message):
        raw_sends.append(message)

    monkeypatch.setattr("services.telegram_templates._send_telegram_raw", _record_raw_send)
    engine = ElephantAlphaAutonomousEngine()

    resource_trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={},
        threshold=0.6,
        enabled=True,
    )
    legacy_style_decision = StrategicDecision(
        priority="medium",
        agents_required=["elephant_alpha"],
        reasoning="resource pressure test",
        expected_outcome="old style outcome must not be sent",
        confidence=0.8,
        execution_plan=[{"agent": "Hermes", "action": "generate_resource_optimization_strategy"}],
        resource_requirements={},
    )

    asyncio.run(engine._notify_telegram_executed(legacy_style_decision, resource_trigger))

    assert raw_sends == []
def test_human_escalation_decision_envelope_blocks_auto_execution():
    """Check the human-escalation envelope built with no concrete actions.

    The envelope must block auto-execution, require HITL, report ``partial``
    data quality and derive its id from the insight id.
    """
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    exception_trigger = AutonomousTrigger(
        trigger_type="code_exception",
        conditions={"scan_containers": ["momo-pro-system"]},
        threshold=1.0,
        enabled=True,
    )
    review_decision = StrategicDecision(
        priority="medium",
        agents_required=["elephant_alpha"],
        reasoning="需要人工判讀。",
        expected_outcome="不自動執行。",
        confidence=0.62,
        execution_plan=[],
        resource_requirements={},
    )

    envelope = ElephantAlphaAutonomousEngine._build_human_escalation_decision_envelope(
        review_decision,
        exception_trigger,
        insight_id=77,
        concrete_actions=[],
    )

    assert envelope["decision_id"] == "ea_review_77"
    assert envelope["decision_type"] == "ea_escalation"

    guardrails = envelope["guardrails"]
    assert guardrails["can_auto_execute"] is False
    assert guardrails["data_quality"] == "partial"

    assert envelope["recommended_action"]["requires_hitl"] is True
    assert envelope["trace"]["provider"] == "elephant_alpha"
def test_elephant_alpha_openclaw_registry_is_ollama_first():
    """Check the registered OpenClaw model name and two required system-prompt phrases."""
    from services.elephant_alpha_orchestrator import ElephantAlphaOrchestrator

    orchestrator = ElephantAlphaOrchestrator()
    openclaw_model = orchestrator.agents["openclaw"].model

    assert not openclaw_model.startswith("gemini")
    assert openclaw_model == "qwen2.5-coder:7b"

    system_prompt = orchestrator.system_prompt
    for phrase in ("Gemini is not a primary agent model", "Ollama cascade"):
        assert phrase in system_prompt