diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py index ffabcb3..283a53b 100644 --- a/services/ai_orchestrator.py +++ b/services/ai_orchestrator.py @@ -41,6 +41,7 @@ class AIOrchestrator: """ ctx = await self._get_context(session_id) # includes hermes analysis result = await self.nemotron.handle_l2(event, ctx) + result.setdefault("session_id", session_id) await self._save_action_plan(result) # review gate handled by routes/bot_api_routes callback return result @@ -92,13 +93,14 @@ class AIOrchestrator: INSERT INTO action_plans (session_id, plan_type, sku, payload, status, created_by) VALUES - (:sid, :pt, :sku, :pl, 'pending', 'nemotron') + (:sid, :pt, :sku, :pl, :status, 'nemotron') """), { "sid": plan.get("session_id"), "pt": plan.get("plan_type"), "sku": plan.get("sku"), "pl": json.dumps(plan, ensure_ascii=False), + "status": "auto_pending" if plan.get("auto_execute") else "pending", }, ) session.commit() diff --git a/services/code_review_pipeline_service.py b/services/code_review_pipeline_service.py index b7838fe..e1cf718 100644 --- a/services/code_review_pipeline_service.py +++ b/services/code_review_pipeline_service.py @@ -40,6 +40,8 @@ _pipeline_lock = threading.Lock() GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash") INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "") +AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "false").lower() == "true" +ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true" # ═══════════════════════════════════════════════════════════════════════════════ @@ -330,11 +332,12 @@ class CodeReviewPipeline: "auto_fix": true|false, "reasoning": "決策理由(繁體中文,一句話,需含具體數字)", "fix_files": ["需自動修復的檔案(最多5個,所有有問題的檔案)"], - "human_review_needed": false + "human_review_needed": true }} -規則(依 ADR-014,所有問題一律自動修復,安全網為 Git+Gitea CI/CD 回滾): -- 任何 finding ≥ 1 → auto_fix=true,human_review_needed=false +規則(依 ADR-012 L3 HITL,所有 code fix 預設需要人工審核): +- CRITICAL/HIGH → auto_fix=false,human_review_needed=true +- MEDIUM/LOW 只有在系統明確允許時才可 auto_fix=true - priority 按最嚴重 severity 決定:CRITICAL>HIGH>MEDIUM>LOW - fix_files 填入所有有問題的檔案(不限 CRITICAL/HIGH)""" @@ -345,19 +348,19 @@ class CodeReviewPipeline: timeout=60, ) if resp.success: - return json.loads(resp.content) + return self._guard_ea_decision(json.loads(resp.content), findings) except Exception as e: logger.warning("[CodeReview] ElephantAlpha 決策失敗,回退規則: %s", e) - # 規則 fallback(ADR-014:任何 finding 一律自動修復,回滾防線由 Git+CI/CD 負責) + # 規則 fallback:ADR-012 L3 邊界,code fix 預設走 HITL。 has_findings = len(findings) > 0 - auto_fix = has_findings priority = ( "critical" if critical_n > 0 else "high" if high_n > 0 else "medium" if sev["medium"] > 0 else "low" if sev["low"] > 0 else "low" ) + auto_fix = bool(has_findings and AUTO_FIX_ENABLED and priority not in {"critical", "high"}) fix_files = list({ f.get("file", "") for f in findings if f.get("file") })[:5] @@ -365,11 +368,38 @@ class CodeReviewPipeline: return { "priority": priority, "auto_fix": auto_fix, - "reasoning": f"ADR-014 規則:CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']},{'觸發自動修復' if auto_fix else '無問題無需修復'}", + "reasoning": f"ADR-012 HITL 規則:CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']},{'允許低風險自動修復' if auto_fix else '建立 action_plan 等待人工審核'}", "fix_files": fix_files, - "human_review_needed": False, + "human_review_needed": has_findings and not auto_fix, } + def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict: + """Apply local ADR-012 safety gates even if the LLM suggests auto-fix.""" + sev = self.state["severity_summary"] + priority = (decision.get("priority") or "").lower() or ( + "critical" if sev["critical"] > 0 else + "high" if sev["high"] > 0 else + "medium" if sev["medium"] > 0 else + "low" + ) + has_high_risk = sev["critical"] > 0 or sev["high"] > 0 or priority in {"critical", "high"} + wants_auto_fix = bool(decision.get("auto_fix")) + allowed_auto_fix = bool(wants_auto_fix and AUTO_FIX_ENABLED and not has_high_risk) + if wants_auto_fix and not allowed_auto_fix: + logger.warning( + "[CodeReview] EA auto_fix overridden by ADR-012 HITL gate priority=%s auto_fix_enabled=%s", + priority, AUTO_FIX_ENABLED, + ) + + decision["priority"] = priority + decision["auto_fix"] = allowed_auto_fix + decision["human_review_needed"] = bool(findings and not allowed_auto_fix) + decision["reasoning"] = ( + f"{decision.get('reasoning', '')} " + f"[ADR-012 gate: auto_fix={'enabled' if allowed_auto_fix else 'blocked'}, priority={priority}]" + ).strip() + return decision + # ── Step 5:NemoTron 派遣 ────────────────────────────────────────────────── def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict: @@ -627,7 +657,11 @@ def get_history(limit: int = 20) -> List[Dict]: def verify_internal_token(request_token: str) -> bool: - """驗證 CD webhook 來源 token。未設定 env 時直接放行(dev 環境)""" + """驗證 CD webhook 來源 token。Production 預設必填,避免外部觸發 auto-review/fix 鏈。""" if not INTERNAL_TOKEN: - return True + if ALLOW_INSECURE_WEBHOOK: + logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,僅因 dev override 放行") + return True + logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,拒絕 webhook") + return False return request_token == INTERNAL_TOKEN diff --git a/services/event_router.py b/services/event_router.py index e630a12..2342bf6 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -4,16 +4,24 @@ # W2-C: L2 優先走 Elephant Alpha Orchestrator;EA 不可用時 fallback AIOrchestrator # import asyncio +import json import logging +import os import threading import traceback import time +from datetime import datetime from typing import Any, Dict, Optional from services.ai_orchestrator import AIOrchestrator from services.telegram_templates import send_telegram_with_result, triaged_alert logger = logging.getLogger(__name__) +_QUEUE_PATH = os.getenv( + "MOMO_EVENT_ROUTER_QUEUE", + os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"), +) +_QUEUE_LOCK = threading.Lock() async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: @@ -69,6 +77,106 @@ async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]: return {"status": "ok", "echo": event.get("event_type")} +async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """Run AI tier with L0-safe degradation; handler failure must not break alerts.""" + try: + if tier == "L0": + return await _handle_l0(event) + if tier == "L1": + return await _handle_l1(event, session_id) + if tier == "L2": + return await _handle_l2(event, session_id) + return await _handle_l0(event) + except Exception as exc: + logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc) + return { + "status": "degraded", + "summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。", + "cause": f"{type(exc).__name__}: {str(exc)[:200]}", + "suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"], + "handler_error": str(exc)[:500], + } + + +def _event_key(event: Dict[str, Any]) -> str: + return f"{event.get('source', 'unknown')}:{event.get('event_type', 'unknown')}" + + +def _is_event_silenced(event: Dict[str, Any]) -> bool: + try: + from services.agent_actions import is_silenced + key = _event_key(event) + return is_silenced(key) or is_silenced(str(event.get("event_type", ""))) + except Exception as exc: + logger.warning("[EventRouter] silence check failed: %s", exc) + return False + + +def _queue_failed_delivery( + event: Dict[str, Any], + tier: str, + message: Optional[str], + errors: list, + reason: str, +) -> bool: + """Append failed notification delivery to a local JSONL queue for later replay.""" + record = { + "ts": datetime.now().isoformat(), + "reason": reason, + "tier": tier, + "event_key": _event_key(event), + "event": event, + "message": message, + "errors": errors, + } + try: + os.makedirs(os.path.dirname(_QUEUE_PATH), exist_ok=True) + with _QUEUE_LOCK: + with open(_QUEUE_PATH, "a", encoding="utf-8") as fh: + fh.write(json.dumps(record, ensure_ascii=False, default=str) + "\n") + return True + except Exception as exc: + logger.error("[EventRouter] failed to queue delivery fallback: %s", exc) + return False + + +def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]: + """ + Execute only ADR-012 L2 SAFE_ACTIONS when NemoTron explicitly marks the plan auto-safe. + All action functions own their own audit writes. + """ + if not isinstance(result, dict): + return [] + if not (result.get("auto_execute") or result.get("dispatch_to") in {"safe_action", "auto_execute"}): + return [] + + action_plan = result.get("action_plan") or result.get("execution_plan") or [] + if not isinstance(action_plan, list): + return [] + + try: + from services.agent_actions import SAFE_ACTIONS + except Exception as exc: + return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}] + + executed = [] + for step in action_plan[:3]: + if not isinstance(step, dict): + continue + action = step.get("action") + params = step.get("params") or {} + if action not in SAFE_ACTIONS: + executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"}) + continue + try: + action_result = SAFE_ACTIONS[action](**params) + executed.append({"action": action, "status": "ok", "result": action_result}) + except Exception as exc: + logger.exception("[EventRouter] safe action failed: %s", action) + executed.append({"action": action, "status": "error", "error": str(exc)[:300]}) + return executed + + async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]: """ Main event routing entry (ADR-012 §③ — 唯一入口). @@ -79,17 +187,28 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) started_at = time.perf_counter() try: - if tier == "L0": - result = await _handle_l0(event) - elif tier == "L1": - result = await _handle_l1(event, session_id) - elif tier == "L2": - result = await _handle_l2(event, session_id) - else: - result = await _handle_l0(event) + if _is_event_silenced(event): + return { + "tier": tier, + "sent": 0, + "errors": [], + "latency_ms": int((time.perf_counter() - started_at) * 1000), + "payload": {"status": "silenced", "event_key": _event_key(event)}, + "delivered": True, + "silenced": True, + "queued": False, + } + + result = await _run_tier_handler(tier, event, session_id) + executed_actions = _execute_safe_actions(result, event) + if executed_actions: + result["executed_actions"] = executed_actions message, reply_markup = _build_telegram_message(event, tier, result) send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup) + queued = False + if not send_result["ok"]: + queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed") latency_ms = int((time.perf_counter() - started_at) * 1000) return { @@ -99,9 +218,12 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "latency_ms": latency_ms, "payload": result, "delivered": send_result["ok"], + "silenced": False, + "queued": queued, } except Exception as e: logger.exception(f"[EventRouter] dispatch failed: {e}") + queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception") return { "tier": tier, "sent": 0, @@ -109,6 +231,8 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "latency_ms": int((time.perf_counter() - started_at) * 1000), "payload": None, "delivered": False, + "silenced": False, + "queued": queued, } @@ -125,6 +249,7 @@ def _run_coroutine_in_thread(coro) -> Dict[str, Any]: "errors": [str(e)], "latency_ms": 0, "payload": None, + "delivered": False, } thread = threading.Thread(target=runner, daemon=True) @@ -137,6 +262,7 @@ def _run_coroutine_in_thread(coro) -> Dict[str, Any]: "errors": ["dispatch_sync timed out"], "latency_ms": 15000, "payload": None, + "delivered": False, } return result["value"] @@ -188,7 +314,8 @@ def _classify(event: Dict[str, Any]) -> str: return "L1" if has_trace else "L0" if sev == "alert": if event_type in {"price_threat", "db_connection_error", "crawler_timeout", - "nim_quota_exhausted", "embedding_failure"}: + "nim_quota_exhausted", "embedding_failure", + "scheduler_task_failure"}: return "L2" return "L1" return "L0" diff --git a/services/nemoton_dispatcher_service.py b/services/nemoton_dispatcher_service.py index 0ba59a8..bf13c5a 100644 --- a/services/nemoton_dispatcher_service.py +++ b/services/nemoton_dispatcher_service.py @@ -366,7 +366,7 @@ class NemotronDispatcher: """ if not NIM_API_KEY: logger.warning("[NemotronDispatcher] NVIDIA_API_KEY 未設定,跳過 NIM 呼叫") - return [], {} + raise RuntimeError("NVIDIA_API_KEY not configured") threat_summary = json.dumps( [ @@ -943,6 +943,32 @@ class NemotronDispatcher: """ try: message = (event or {}).get("message", "") or "" + event_type = (event or {}).get("event_type", "") + payload = (event or {}).get("payload") or {} + task_name = payload.get("task_name") or (event or {}).get("task_name") + + if event_type == "scheduler_task_failure" and task_name: + try: + from services.agent_actions import ALLOWED_RETRY_TASKS + if task_name in ALLOWED_RETRY_TASKS: + return { + "session_id": f"evt:{event_type}:{(event or {}).get('source', 'unknown')}", + "plan_type": "retry_task", + "action_plan": [{ + "action": "retry_task", + "params": { + "task_name": task_name, + "max_attempts": 2, + "backoff_sec": 60, + }, + }], + "dispatch_to": "safe_action", + "auto_execute": True, + "metadata": {"event_type": event_type, "task_name": task_name}, + } + except Exception as action_err: + logger.warning("[NemotronDispatcher.handle_l2] retry_task 規劃跳過: %s", action_err) + # ctx 可能是 {"latest": {...}} 或已攤平的 intent 結果 hermes = {} if isinstance(ctx, dict): @@ -1070,29 +1096,37 @@ class NemotronDispatcher: "nim_stats": {}, } - if not _check_nim_quota(): - # 配額耗盡:fallback 直接派發 HIGH 威脅(不帶 NIM 足跡) - logger.warning("[Dispatcher] NIM 配額耗盡,fallback 直接派發 HIGH 威脅") - footprint = _build_footprint_block(hermes_stats, None) - for t in nim_candidates: - if t.risk == "HIGH": - self._exec_trigger_price_alert( - t.sku, t.name, - t.gap_pct, t.sales_7d_delta_pct, - t.recommended_action, t.confidence, - momo_price=t.momo_price, comp_price=t.pchome_price, - footprint=footprint, - ) - dispatched += 1 + if not NIM_API_KEY: + logger.warning("[Dispatcher][ADR-004] NVIDIA_API_KEY 未設定,啟動 Hermes 規則引擎降級") + fb = self._hermes_rule_fallback(nim_candidates, hermes_stats) return { - "dispatched": dispatched, - "skipped": len(threats) - dispatched + skipped, - "errors": errors, - "nim_stats": {}, + "dispatched": dispatched + fb["dispatched"], + "skipped": skipped + fb["skipped"], + "errors": errors + fb["errors"], + "nim_stats": fb["nim_stats"], + } + + if not _check_nim_quota(): + logger.warning("[Dispatcher][ADR-004] NIM 配額耗盡,啟動 Hermes 規則引擎降級") + fb = self._hermes_rule_fallback(nim_candidates, hermes_stats) + return { + "dispatched": dispatched + fb["dispatched"], + "skipped": skipped + fb["skipped"], + "errors": errors + fb["errors"], + "nim_stats": fb["nim_stats"], } try: tool_calls, nim_stats = self._call_nim(nim_candidates) + if not tool_calls: + logger.warning("[Dispatcher][ADR-004] NIM 0 tool_calls,啟動 Hermes 規則引擎降級") + fb = self._hermes_rule_fallback(nim_candidates, hermes_stats) + return { + "dispatched": dispatched + fb["dispatched"], + "skipped": skipped + fb["skipped"], + "errors": errors + fb["errors"], + "nim_stats": fb["nim_stats"], + } except requests.HTTPError as e: if e.response is not None and e.response.status_code == 429: logger.warning("[Dispatcher][ADR-004] NIM HTTP 429,啟動 Hermes 規則引擎降級") @@ -1103,20 +1137,22 @@ class NemotronDispatcher: "errors": errors + fb["errors"], "nim_stats": fb["nim_stats"], } - logger.error(f"[Dispatcher] NIM HTTP 錯誤: {e}") + logger.warning("[Dispatcher][ADR-004] NIM HTTP 錯誤,啟動 Hermes 規則引擎降級: %s", e) + fb = self._hermes_rule_fallback(nim_candidates, hermes_stats) return { - "dispatched": dispatched, - "skipped": len(nim_candidates), - "errors": errors + [str(e)], - "nim_stats": {}, + "dispatched": dispatched + fb["dispatched"], + "skipped": skipped + fb["skipped"], + "errors": errors + [str(e)] + fb["errors"], + "nim_stats": fb["nim_stats"], } except Exception as e: - logger.error(f"[Dispatcher] NIM 呼叫失敗: {e}") + logger.warning("[Dispatcher][ADR-004] NIM 呼叫失敗,啟動 Hermes 規則引擎降級: %s", e) + fb = self._hermes_rule_fallback(nim_candidates, hermes_stats) return { - "dispatched": dispatched, - "skipped": len(nim_candidates), - "errors": errors + [str(e)], - "nim_stats": {}, + "dispatched": dispatched + fb["dispatched"], + "skipped": skipped + fb["skipped"], + "errors": errors + [str(e)] + fb["errors"], + "nim_stats": fb["nim_stats"], } # 建立運算足跡(Telegram 顯示文字 + DB 結構化 JSON,共用同一份) diff --git a/tests/test_code_review_pipeline_security.py b/tests/test_code_review_pipeline_security.py new file mode 100644 index 0000000..4201ace --- /dev/null +++ b/tests/test_code_review_pipeline_security.py @@ -0,0 +1,49 @@ +def test_verify_internal_token_requires_env_by_default(monkeypatch): + import services.code_review_pipeline_service as module + + monkeypatch.setattr(module, "INTERNAL_TOKEN", "") + monkeypatch.setattr(module, "ALLOW_INSECURE_WEBHOOK", False) + + assert module.verify_internal_token("") is False + assert module.verify_internal_token("anything") is False + + +def test_verify_internal_token_allows_explicit_dev_override(monkeypatch): + import services.code_review_pipeline_service as module + + monkeypatch.setattr(module, "INTERNAL_TOKEN", "") + monkeypatch.setattr(module, "ALLOW_INSECURE_WEBHOOK", True) + + assert module.verify_internal_token("") is True + + +def test_code_review_guard_blocks_high_risk_auto_fix(monkeypatch): + import services.code_review_pipeline_service as module + + monkeypatch.setattr(module, "AUTO_FIX_ENABLED", True) + pipeline = module.CodeReviewPipeline("abcdef123456", ["services/example.py"]) + pipeline.state["severity_summary"] = {"critical": 0, "high": 1, "medium": 0, "low": 0} + + guarded = pipeline._guard_ea_decision( + {"priority": "high", "auto_fix": True, "reasoning": "建議修復", "fix_files": ["services/example.py"]}, + [{"severity": "HIGH", "file": "services/example.py"}], + ) + + assert guarded["auto_fix"] is False + assert guarded["human_review_needed"] is True + + +def test_code_review_guard_requires_auto_fix_feature_flag(monkeypatch): + import services.code_review_pipeline_service as module + + monkeypatch.setattr(module, "AUTO_FIX_ENABLED", False) + pipeline = module.CodeReviewPipeline("abcdef123456", ["services/example.py"]) + pipeline.state["severity_summary"] = {"critical": 0, "high": 0, "medium": 1, "low": 0} + + guarded = pipeline._guard_ea_decision( + {"priority": "medium", "auto_fix": True, "reasoning": "建議修復", "fix_files": ["services/example.py"]}, + [{"severity": "MEDIUM", "file": "services/example.py"}], + ) + + assert guarded["auto_fix"] is False + assert guarded["human_review_needed"] is True diff --git a/tests/test_event_router.py b/tests/test_event_router.py new file mode 100644 index 0000000..7b3da8b --- /dev/null +++ b/tests/test_event_router.py @@ -0,0 +1,87 @@ +import asyncio +import json + + +def test_dispatch_degrades_and_queues_when_ai_or_telegram_fails(tmp_path, monkeypatch): + import services.event_router as event_router + + queue_path = tmp_path / "event_router_failed.jsonl" + monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path)) + monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) + + async def broken_l1(event, session_id): + raise RuntimeError("hermes unavailable") + + monkeypatch.setattr(event_router, "_handle_l1", broken_l1) + monkeypatch.setattr( + event_router, + "send_telegram_with_result", + lambda *args, **kwargs: { + "ok": False, + "sent": 0, + "failed": 1, + "chat_ids": [123], + "errors": ["123:HTTP 500"], + }, + ) + + result = asyncio.run(event_router.dispatch({ + "source": "Scheduler.Test", + "event_type": "crawler_timeout", + "severity": "warning", + "title": "測試任務異常", + "summary": "timeout", + "trace": "Traceback...", + })) + + assert result["tier"] == "L1" + assert result["delivered"] is False + assert result["queued"] is True + assert result["payload"]["status"] == "degraded" + queued = json.loads(queue_path.read_text(encoding="utf-8").strip()) + assert queued["reason"] == "telegram_delivery_failed" + assert queued["event_key"] == "Scheduler.Test:crawler_timeout" + + +def test_dispatch_executes_only_auto_safe_actions(monkeypatch): + import services.agent_actions as agent_actions + import services.event_router as event_router + + calls = [] + monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) + monkeypatch.setitem( + agent_actions.SAFE_ACTIONS, + "retry_task", + lambda **params: calls.append(params) or {"status": "scheduled"}, + ) + + async def planned_l2(event, session_id): + return { + "dispatch_to": "safe_action", + "auto_execute": True, + "action_plan": [ + {"action": "retry_task", "params": {"task_name": "run_momo_task"}}, + {"action": "restart_container", "params": {"container": "momo-db"}}, + ], + } + + monkeypatch.setattr(event_router, "_handle_l2", planned_l2) + monkeypatch.setattr( + event_router, + "send_telegram_with_result", + lambda *args, **kwargs: {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}, + ) + + result = asyncio.run(event_router.dispatch({ + "source": "Scheduler.MOMO", + "event_type": "scheduler_task_failure", + "severity": "alert", + "title": "MOMO 任務異常", + "summary": "boom", + "payload": {"task_name": "run_momo_task"}, + })) + + assert result["tier"] == "L2" + assert calls == [{"task_name": "run_momo_task"}] + assert result["payload"]["executed_actions"][0]["status"] == "ok" + assert result["payload"]["executed_actions"][1]["status"] == "rejected" diff --git a/tests/test_nemotron_fallback.py b/tests/test_nemotron_fallback.py new file mode 100644 index 0000000..57d5c79 --- /dev/null +++ b/tests/test_nemotron_fallback.py @@ -0,0 +1,79 @@ +from dataclasses import dataclass + + +@dataclass +class FakeThreat: + sku: str + name: str + momo_price: float = 100.0 + pchome_price: float = 80.0 + gap_pct: float = 25.0 + sales_7d_delta_pct: float = -20.0 + risk: str = "HIGH" + recommended_action: str = "請評估價格策略" + confidence: float = 0.8 + + +def _patch_execution_methods(monkeypatch, dispatcher): + calls = [] + + def record(kind): + def _inner(*args, **kwargs): + calls.append({"kind": kind, "args": args, "kwargs": kwargs}) + return _inner + + monkeypatch.setattr(dispatcher, "_exec_trigger_price_alert", record("price_alert")) + monkeypatch.setattr(dispatcher, "_exec_add_to_recommendation", record("recommendation")) + monkeypatch.setattr(dispatcher, "_exec_flag_for_human_review", record("human_review")) + return calls + + +def test_dispatch_falls_back_to_hermes_rules_without_nim_api_key(monkeypatch): + import services.nemoton_dispatcher_service as module + + monkeypatch.setattr(module, "NIM_API_KEY", "") + dispatcher = module.NemotronDispatcher() + calls = _patch_execution_methods(monkeypatch, dispatcher) + + result = dispatcher.dispatch([FakeThreat("SKU-1", "測試品")], hermes_stats={"duration_sec": 1}) + + assert result["dispatched"] == 1 + assert result["skipped"] == 0 + assert result["nim_stats"]["degraded"] is True + assert calls[0]["kind"] == "price_alert" + + +def test_dispatch_falls_back_to_hermes_rules_on_nim_timeout(monkeypatch): + import requests + import services.nemoton_dispatcher_service as module + + monkeypatch.setattr(module, "NIM_API_KEY", "test-key") + monkeypatch.setattr(module, "_check_nim_quota", lambda: True) + dispatcher = module.NemotronDispatcher() + calls = _patch_execution_methods(monkeypatch, dispatcher) + monkeypatch.setattr(dispatcher, "_call_nim", lambda threats: (_ for _ in ()).throw(requests.Timeout("timeout"))) + + result = dispatcher.dispatch([FakeThreat("SKU-2", "測試品")], hermes_stats={"duration_sec": 1}) + + assert result["dispatched"] == 1 + assert result["skipped"] == 0 + assert result["nim_stats"]["degraded"] is True + assert result["errors"] == ["timeout"] + assert calls[0]["kind"] == "price_alert" + + +def test_dispatch_falls_back_to_hermes_rules_on_zero_tool_calls(monkeypatch): + import services.nemoton_dispatcher_service as module + + monkeypatch.setattr(module, "NIM_API_KEY", "test-key") + monkeypatch.setattr(module, "_check_nim_quota", lambda: True) + dispatcher = module.NemotronDispatcher() + calls = _patch_execution_methods(monkeypatch, dispatcher) + monkeypatch.setattr(dispatcher, "_call_nim", lambda threats: ([], {"total_tokens": 10})) + + result = dispatcher.dispatch([FakeThreat("SKU-3", "測試品")], hermes_stats={"duration_sec": 1}) + + assert result["dispatched"] == 1 + assert result["skipped"] == 0 + assert result["nim_stats"]["degraded"] is True + assert calls[0]["kind"] == "price_alert"