import asyncio
import logging


def test_run_with_timeout_supports_sync_function():
    """A plain synchronous callable handed to ``_run_with_timeout`` yields its result."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    def _increment(value):
        return value + 1

    outcome = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(_increment, 41))

    assert outcome == 42


def test_execute_step_rejects_unknown_action():
    """An unrecognised agent/action pair must raise ``ValueError``."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    engine = ElephantAlphaAutonomousEngine()
    unknown_step = {"agent": "mystery", "action": "do_anything"}

    caught = None
    try:
        asyncio.run(engine._execute_step(unknown_step))
    except ValueError as exc:
        caught = exc

    # Completing without a ValueError is itself a test failure.
    if caught is None:
        raise AssertionError("unknown action should fail")
    assert "Unrecognized step" in str(caught)


def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
    """A ``code_fix`` step is forwarded to ``_run_auto_heal`` with its parameters."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    calls = []
    engine = ElephantAlphaAutonomousEngine()

    def _fake_auto_heal(error_type, context):
        calls.append((error_type, context))
        return {"ok": True}

    monkeypatch.setattr(engine, "_run_auto_heal", _fake_auto_heal)

    step = {
        "agent": "elephant_alpha",
        "action": "code_fix",
        "parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
    }
    asyncio.run(engine._execute_step(step))

    expected_call = (
        "python_exception",
        {"target_file": "services/example.py", "error_message": "Traceback"},
    )
    assert calls == [expected_call]


def test_execute_step_routes_price_adjustment_to_human_review(monkeypatch):
    """An ``execute_price_adjustment`` step is recorded for review and its result returned."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    calls = []
    engine = ElephantAlphaAutonomousEngine()

    def _fake_record_review(step):
        calls.append(step)
        return {"status": "pending_review", "sku": "SKU-9"}

    monkeypatch.setattr(engine, "_record_price_adjustment_review", _fake_record_review)

    price_step = {
        "agent": "nemotron",
        "action": "execute_price_adjustment",
        "parameters": {"sku": "SKU-9", "recommended_price": 1280},
    }
    result = asyncio.run(engine._execute_step(price_step))

    assert result == {"status": "pending_review", "sku": "SKU-9"}
    assert calls[0]["parameters"]["recommended_price"] == 1280


def test_execute_step_skips_legacy_openclaw_strategy_actions():
    """Each legacy openclaw strategy action makes ``_execute_step`` return ``None``."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    engine = ElephantAlphaAutonomousEngine()
    legacy_actions = {
        "generate_market_strategy",
        "generate_dynamic_pricing_strategy",
        "generate_resource_optimization_strategy",
    }

    for action in legacy_actions:
        step = {
            "agent": "openclaw",
            "action": action,
        }
        result = asyncio.run(engine._execute_step(step))
        assert result is None


def test_autoheal_derives_python_exception_from_traceback():
    """A context carrying a Python traceback string is classified as ``python_exception``."""
    from services.auto_heal_service import AutoHealService

    svc = AutoHealService()
    context = {"traceback_str": "Traceback (most recent call last):\nNameError"}

    assert svc._derive_error_type(context) == "python_exception"


def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkeypatch, caplog):
    """A failing ``log_ai_call`` during the decision run shows up as a logged warning."""
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    import services.ai_call_logger as ai_call_logger

    engine = ElephantAlphaAutonomousEngine()

    async def _no_hermes_threats(top_n=5):
        return None

    def _broken_log_ai_call(*args, **kwargs):
        raise RuntimeError("ai telemetry unavailable")

    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _no_hermes_threats)
    monkeypatch.setattr(ai_call_logger, "log_ai_call", _broken_log_ai_call)
    caplog.set_level(logging.WARNING, logger="services.elephant_alpha_autonomous_engine")

    trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(trigger))

    assert "EA short-circuit telemetry failed" in caplog.text


def test_competitor_db_evidence_actions_are_concrete():
    """One competitor evidence row is formatted into the exact expected action line."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    evidence_row = {
        "sku": "SKU-1",
        "name": "測試商品",
        "momo_price": 1200,
        "competitor_price": 990,
        "price_gap_pct": 17.5,
        "competitor_product_id": "D123456",
        "match_score": 0.84,
        "tags": [
            "identity_v2",
            "match_type_exact",
            "price_basis_total_price",
            "alert_tier_price_alert_exact",
        ],
    }

    actions = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions(
        [evidence_row],
        trigger_type="price_drop_alert",
    )

    assert actions == [
        "[SKU-1] 測試商品|MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210|證據:高信心同款 / 總價可比 / 可直接價格告警 / score 0.84|建議人工確認 PChome identity_v2 後評估跟價或促銷|PChome D123456"
    ]


def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch):
    """DB evidence on the trigger reaches the orchestrator without a Hermes prefetch."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    engine = ElephantAlphaAutonomousEngine()
    contexts = []
    notified = []

    async def _capture_context(context):
        contexts.append(context)
        return StrategicDecision(
            priority="high",
            agents_required=["elephant_alpha"],
            reasoning="已有 DB 價差實證,允許進入決策流程。",
            expected_outcome="生成可稽核行動",
            confidence=0.95,
            execution_plan=[],
            resource_requirements={},
        )

    async def _fetch_should_not_run(top_n=5):
        raise AssertionError("DB evidence should avoid Hermes LLM prefetch")

    async def _capture_notify(decision, trigger):
        notified.append(trigger.trigger_type)

    def _ignore_escalation(trigger_type):
        return None

    monkeypatch.setattr(
        engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context
    )
    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _fetch_should_not_run)
    monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
    monkeypatch.setattr(engine, "_store_escalation", _ignore_escalation)

    trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(trigger))

    first_conditions = contexts[0]["conditions"]
    assert first_conditions["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"]
    assert notified == ["price_drop_alert"]


def test_high_confidence_price_decision_skips_execution_plan_and_goes_hitl(monkeypatch):
    """A confident price decision notifies and stores an escalation without executing its plan."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    engine = ElephantAlphaAutonomousEngine()
    notified = []
    stored = []

    async def _capture_context(context):
        return StrategicDecision(
            priority="high",
            agents_required=["hermes", "nemotron"],
            reasoning="已有 DB 價格比對實證,轉人工覆核。",
            expected_outcome="人工確認價格策略",
            confidence=0.96,
            execution_plan=[
                {"agent": "hermes", "action": "analyze_price_competition"},
                {"agent": "nemotron", "action": "dispatch_alert", "parameters": {"threats": []}},
            ],
            resource_requirements={},
        )

    async def _execution_should_not_run(decision):
        raise AssertionError("price decisions must not execute long-running autonomous steps")

    async def _capture_notify(decision, trigger):
        notified.append((trigger.trigger_type, len(decision.execution_plan)))

    def _capture_store(trigger_type):
        stored.append(trigger_type)

    monkeypatch.setattr(
        engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context
    )
    monkeypatch.setattr(engine, "_execute_decision", _execution_should_not_run)
    monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
    monkeypatch.setattr(engine, "_store_escalation", _capture_store)

    trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(trigger))

    assert notified == [("price_drop_alert", 2)]
    assert stored == ["price_drop_alert"]


def test_price_review_decision_envelope_blocks_auto_execution():
    """The price-review envelope requires HITL and forbids auto execution."""
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    decision = StrategicDecision(
        priority="high",
        agents_required=["elephant_alpha"],
        reasoning="已有價格比對實證。",
        expected_outcome="人工確認價格策略。",
        confidence=0.96,
        execution_plan=[{"agent": "hermes", "action": "analyze_price_competition"}],
        resource_requirements={},
    )
    trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )

    envelope = ElephantAlphaAutonomousEngine._build_price_review_decision_envelope(
        decision,
        trigger,
        concrete_actions=["[SKU-1] MOMO $1,200 vs PChome $990"],
    )

    assert envelope["decision_type"] == "price_decision_review"
    assert envelope["source_agent"] == "elephant_alpha"
    assert envelope["recommended_action"]["requires_hitl"] is True
    assert envelope["guardrails"]["can_auto_execute"] is False
    assert (
        envelope["guardrails"]["blocked_reason"]
        == "price decisions require HITL; execution_plan skipped"
    )
    assert envelope["guardrails"]["data_quality"] == "complete"


def test_price_trigger_queries_use_lateral_latest_price_lookup(monkeypatch):
    """All three price queries use ``JOIN LATERAL`` and avoid the older SQL shapes."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )

    captured_sql = []

    class _FakeResult:
        """Result stand-in that always yields zero rows."""

        def mappings(self):
            return self

        def fetchall(self):
            return []

    class _FakeSession:
        """Session stand-in that records the text of every executed statement."""

        def execute(self, statement, params=None):
            captured_sql.append(str(statement))
            return _FakeResult()

        def close(self):
            return None

    def _open_fake_session():
        return _FakeSession()

    monkeypatch.setattr(engine_module, "get_session", _open_fake_session)
    engine = ElephantAlphaAutonomousEngine()

    price_drop_trigger = AutonomousTrigger("price_drop_alert", {}, 0.8, True)
    asyncio.run(engine._check_price_drop_trigger(price_drop_trigger))
    market_trigger = AutonomousTrigger("market_opportunity", {}, 0.8, True)
    asyncio.run(engine._check_market_opportunity_trigger(market_trigger))

    assert engine._fetch_recent_competitor_evidence_actions(top_n=2) is None
    assert len(captured_sql) == 3
    assert all("JOIN LATERAL" in sql for sql in captured_sql)
    assert all("SELECT DISTINCT ON (product_id)" not in sql for sql in captured_sql)
    assert all("latest_momo" not in sql for sql in captured_sql)


def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch):
    """A resource escalation with no concrete evidence is suppressed without opening the DB."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    engine = ElephantAlphaAutonomousEngine()
    suppressed = []
    cooldown = []

    def _raise_if_db_opened():
        raise AssertionError("no-concrete resource escalation should not write human_review")

    def _capture_cooldown(trigger_type):
        cooldown.append(trigger_type)

    def _capture_suppressed(decision, trigger, reason):
        suppressed.append((trigger.trigger_type, reason))

    monkeypatch.setattr(engine_module, "get_session", _raise_if_db_opened)
    monkeypatch.setattr(engine, "_store_escalation", _capture_cooldown)
    monkeypatch.setattr(engine, "_record_suppressed_escalation", _capture_suppressed)

    decision = StrategicDecision(
        priority="medium",
        agents_required=["openclaw"],
        reasoning="資源調配建議信心不足",
        expected_outcome="待人工確認",
        confidence=0.60,
        execution_plan=[],
        resource_requirements={},
    )
    trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={"_resource_metrics": {"action_queue_size": 14, "system_load_pct": 52.0}},
        threshold=0.6,
        enabled=True,
    )
    asyncio.run(engine._escalate_to_human(decision, trigger))

    assert cooldown == ["resource_optimization"]
    assert suppressed == [("resource_optimization", "no_concrete_evidence")]


def test_escalate_price_alert_without_evidence_is_suppressed(monkeypatch):
    """A price escalation with no concrete evidence is suppressed without opening the DB."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    engine = ElephantAlphaAutonomousEngine()
    suppressed = []
    cooldown = []

    async def _no_concrete_actions(top_n=5):
        return None

    def _raise_if_db_opened():
        raise AssertionError("no-concrete price escalation should not write human_review")

    def _capture_cooldown(trigger_type):
        cooldown.append(trigger_type)

    def _capture_suppressed(decision, trigger, reason):
        suppressed.append((trigger.trigger_type, reason))

    monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _no_concrete_actions)
    monkeypatch.setattr(engine_module, "get_session", _raise_if_db_opened)
    monkeypatch.setattr(engine, "_store_escalation", _capture_cooldown)
    monkeypatch.setattr(engine, "_record_suppressed_escalation", _capture_suppressed)

    decision = StrategicDecision(
        priority="medium",
        agents_required=["openclaw"],
        reasoning="價格調整建議信心不足",
        expected_outcome="待人工確認",
        confidence=0.62,
        execution_plan=[],
        resource_requirements={},
    )
    trigger = AutonomousTrigger(
        trigger_type="price_drop_alert",
        conditions={},
        threshold=0.8,
        enabled=True,
    )
    asyncio.run(engine._escalate_to_human(decision, trigger))

    assert cooldown == ["price_drop_alert"]
    assert suppressed == [("price_drop_alert", "no_concrete_evidence")]


def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load():
    """A large queue with low CPU load is classified ``backlog_only`` and does not alert."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 0,
        "human_review_count": 0,
        "stale_count": 0,
        "system_load_pct": 19.2,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }

    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    assert metrics["pressure_level"] == "backlog_only"
    assert metrics["should_alert"] is False
    assert metrics["load_pressure"] is False
    assert "system_load=19.2%/80%" in metrics["evidence"]


def test_resource_pressure_classifier_alerts_on_actionable_review_backlog():
    """High-priority and human-review counts over threshold produce a ``warning`` alert."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 0,
        "system_load_pct": 22.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }

    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    assert metrics["pressure_level"] == "warning"
    assert metrics["should_alert"] is True
    assert metrics["high_priority_pressure"] is True
    assert metrics["load_pressure"] is False


def test_resource_pressure_message_is_measurement_based_not_llm_theatre():
    """The Telegram message reports measured sections and omits the forecast wording."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    focus_item = {
        "id": 42,
        "status": "pending",
        "priority": 1,
        "age_hours": 31.5,
        "type": "code_review_fix",
        "description": "修正高優先錯誤",
    }
    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 5,
        "system_load_pct": 22.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
        "stale_hours": 24,
        "top_action_items": [focus_item],
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=123,
        previous_limit=10,
        new_limit=8,
    )

    assert "量測指標" in msg
    assert "系統處置紀錄" in msg
    assert "待處理焦點" in msg
    assert "#42 P1 pending code_review_fix" in msg
    assert "主機 CPU 未達高負載門檻" in msg
    assert "未啟動 Hermes/NemoTron 價格分析" in msg
    assert "預期效益" not in msg
    assert "48小時" not in msg
    assert "48 小時效益預測" in msg


def test_resource_pressure_decision_envelope_is_measurement_based():
    """The resource-pressure envelope and its rendered message carry the expected fields."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    raw_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 8,
        "human_review_count": 6,
        "stale_count": 5,
        "system_load_pct": 22.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
        "stale_hours": 24,
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)

    envelope = ElephantAlphaAutonomousEngine._build_resource_pressure_decision_envelope(
        metrics,
        insight_id=1942,
        previous_limit=10,
        new_limit=8,
    )
    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=1942,
        previous_limit=10,
        new_limit=8,
        decision_envelope=envelope,
    )

    assert envelope["decision_id"] == "ea_resource_pressure_1942"
    assert envelope["source_agent"] == "elephant_alpha"
    assert envelope["decision_type"] == "resource_optimization"
    assert envelope["severity"] == "P2"
    assert envelope["guardrails"]["can_auto_execute"] is False
    assert envelope["guardrails"]["llm_used"] is False
    assert envelope["trace"]["provider"] == "action_plans_cpu_probe"
    assert "🧭 決策信封" in msg
    assert "資料品質:complete 自動執行:不允許" in msg
    assert "不採用 LLM 生成的 48 小時效益預測" in msg
    assert "Gemini" not in msg


def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
    """A ``resource_optimization`` trigger is handled without calling the orchestrator."""
    import services.elephant_alpha_autonomous_engine as engine_module
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )

    engine = ElephantAlphaAutonomousEngine()
    sent = []
    stored = []

    async def _boom(*args, **kwargs):
        raise AssertionError("resource_optimization must not call LLM orchestrator")

    async def _capture_send(*args, **kwargs):
        sent.append(args)

    def _fake_hygiene():
        return {"updated_count": 0}

    def _fake_record_insight(*args, **kwargs):
        return 77

    def _capture_store(trigger_type):
        stored.append(trigger_type)

    monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom)
    monkeypatch.setattr(engine, "_run_action_plan_hygiene", _fake_hygiene)
    monkeypatch.setattr(engine, "_record_resource_pressure_insight", _fake_record_insight)
    monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send)
    monkeypatch.setattr(engine, "_store_escalation", _capture_store)

    resource_metrics = {
        "action_queue_size": 34,
        "high_priority_count": 9,
        "human_review_count": 6,
        "stale_count": 0,
        "system_load_pct": 20.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={"_resource_metrics": resource_metrics},
        threshold=0.6,
        enabled=True,
    )
    asyncio.run(engine._execute_autonomous_decision(trigger))

    assert stored == ["resource_optimization"]
    assert len(sent) == 1
    assert engine.max_autonomous_decisions_per_hour == 8


def test_resource_pressure_message_reports_hygiene_result():
    """Pre-hygiene counts and the hygiene outcome appear in the rendered message."""
    from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine

    raw_metrics = {
        "action_queue_size": 9,
        "high_priority_count": 0,
        "human_review_count": 0,
        "stale_count": 0,
        "system_load_pct": 20.0,
        "queue_threshold": 10,
        "load_threshold_pct": 80,
        "high_priority_threshold": 5,
        "stale_threshold": 5,
    }
    metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure(raw_metrics)
    # Attach the cleanup bookkeeping after classification, as the original test does.
    metrics["pre_hygiene"] = {
        "action_queue_size": 100,
        "high_priority_count": 47,
        "stale_count": 99,
    }
    metrics["hygiene_result"] = {
        "updated_count": 91,
        "by_source": {"code_review_fix": 66, "openclaw_recommendation": 25},
    }

    msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
        metrics,
        insight_id=124,
        previous_limit=10,
        new_limit=10,
    )

    assert "P4 resolved" in msg
    assert "清理前 Action queue:100" in msg
    assert "已自動關閉過期 action_plans 91 筆" in msg
    assert "只改 status/metadata,不刪除資料" in msg


def test_resource_optimization_cannot_use_legacy_autonomous_execution_template(monkeypatch):
    """``_notify_telegram_executed`` sends nothing for a ``resource_optimization`` trigger."""
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    engine = ElephantAlphaAutonomousEngine()
    sent = []

    async def _capture_send(message):
        sent.append(message)

    monkeypatch.setattr("services.telegram_templates._send_telegram_raw", _capture_send)

    decision = StrategicDecision(
        priority="medium",
        agents_required=["elephant_alpha"],
        reasoning="resource pressure test",
        expected_outcome="old style outcome must not be sent",
        confidence=0.8,
        execution_plan=[{"agent": "Hermes", "action": "generate_resource_optimization_strategy"}],
        resource_requirements={},
    )
    trigger = AutonomousTrigger(
        trigger_type="resource_optimization",
        conditions={},
        threshold=0.6,
        enabled=True,
    )
    asyncio.run(engine._notify_telegram_executed(decision, trigger))

    assert sent == []


def test_human_escalation_decision_envelope_blocks_auto_execution():
    """The human-escalation envelope requires HITL and reports partial data quality."""
    from services.elephant_alpha_autonomous_engine import (
        AutonomousTrigger,
        ElephantAlphaAutonomousEngine,
    )
    from services.elephant_alpha_orchestrator import StrategicDecision

    decision = StrategicDecision(
        priority="medium",
        agents_required=["elephant_alpha"],
        reasoning="需要人工判讀。",
        expected_outcome="不自動執行。",
        confidence=0.62,
        execution_plan=[],
        resource_requirements={},
    )
    trigger = AutonomousTrigger(
        trigger_type="code_exception",
        conditions={"scan_containers": ["momo-pro-system"]},
        threshold=1.0,
        enabled=True,
    )

    envelope = ElephantAlphaAutonomousEngine._build_human_escalation_decision_envelope(
        decision,
        trigger,
        insight_id=77,
        concrete_actions=[],
    )

    assert envelope["decision_id"] == "ea_review_77"
    assert envelope["decision_type"] == "ea_escalation"
    assert envelope["guardrails"]["can_auto_execute"] is False
    assert envelope["guardrails"]["data_quality"] == "partial"
    assert envelope["recommended_action"]["requires_hitl"] is True
    assert envelope["trace"]["provider"] == "elephant_alpha"


def test_elephant_alpha_openclaw_registry_is_ollama_first():
    """The openclaw agent uses a non-Gemini model and the system prompt states the policy."""
    from services.elephant_alpha_orchestrator import ElephantAlphaOrchestrator

    orchestrator = ElephantAlphaOrchestrator()
    openclaw = orchestrator.agents["openclaw"]
    system_prompt = orchestrator.system_prompt

    assert not openclaw.model.startswith("gemini")
    assert openclaw.model == "qwen2.5-coder:7b"
    assert "Gemini is not a primary agent model" in system_prompt
    assert "Ollama cascade" in system_prompt