補強 AI 自動化閉環與安全降級
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s

This commit is contained in:
OoO
2026-04-29 22:56:00 +08:00
parent 0875dd8fda
commit 1c2dc6cd61
7 changed files with 463 additions and 49 deletions

View File

@@ -41,6 +41,7 @@ class AIOrchestrator:
"""
ctx = await self._get_context(session_id) # includes hermes analysis
result = await self.nemotron.handle_l2(event, ctx)
result.setdefault("session_id", session_id)
await self._save_action_plan(result)
# review gate handled by routes/bot_api_routes callback
return result
@@ -92,13 +93,14 @@ class AIOrchestrator:
INSERT INTO action_plans
(session_id, plan_type, sku, payload, status, created_by)
VALUES
(:sid, :pt, :sku, :pl, 'pending', 'nemotron')
(:sid, :pt, :sku, :pl, :status, 'nemotron')
"""),
{
"sid": plan.get("session_id"),
"pt": plan.get("plan_type"),
"sku": plan.get("sku"),
"pl": json.dumps(plan, ensure_ascii=False),
"status": "auto_pending" if plan.get("auto_execute") else "pending",
},
)
session.commit()

View File

@@ -40,6 +40,8 @@ _pipeline_lock = threading.Lock()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "false").lower() == "true"
ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true"
# ═══════════════════════════════════════════════════════════════════════════════
@@ -330,11 +332,12 @@ class CodeReviewPipeline:
"auto_fix": true|false,
"reasoning": "決策理由(繁體中文,一句話,需含具體數字)",
"fix_files": ["需自動修復的檔案最多5個所有有問題的檔案"],
"human_review_needed": false
"human_review_needed": true
}}
規則(依 ADR-014所有問題一律自動修復安全網為 Git+Gitea CI/CD 回滾
- 任何 finding ≥ 1 → auto_fix=truehuman_review_needed=false
規則(依 ADR-012 L3 HITL所有 code fix 預設需要人工審核
- CRITICAL/HIGH → auto_fix=falsehuman_review_needed=true
- MEDIUM/LOW 只有在系統明確允許時才可 auto_fix=true
- priority 按最嚴重 severity 決定CRITICAL>HIGH>MEDIUM>LOW
- fix_files 填入所有有問題的檔案(不限 CRITICAL/HIGH"""
@@ -345,19 +348,19 @@ class CodeReviewPipeline:
timeout=60,
)
if resp.success:
return json.loads(resp.content)
return self._guard_ea_decision(json.loads(resp.content), findings)
except Exception as e:
logger.warning("[CodeReview] ElephantAlpha 決策失敗,回退規則: %s", e)
# 規則 fallbackADR-014任何 finding 一律自動修復,回滾防線由 Git+CI/CD 負責)
# 規則 fallbackADR-012 L3 邊界code fix 預設走 HITL。
has_findings = len(findings) > 0
auto_fix = has_findings
priority = (
"critical" if critical_n > 0 else
"high" if high_n > 0 else
"medium" if sev["medium"] > 0 else
"low" if sev["low"] > 0 else "low"
)
auto_fix = bool(has_findings and AUTO_FIX_ENABLED and priority not in {"critical", "high"})
fix_files = list({
f.get("file", "") for f in findings if f.get("file")
})[:5]
@@ -365,11 +368,38 @@ class CodeReviewPipeline:
return {
"priority": priority,
"auto_fix": auto_fix,
"reasoning": f"ADR-014 規則CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}{'觸發自動修復' if auto_fix else '無問題無需修復'}",
"reasoning": f"ADR-012 HITL 規則CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}{'允許低風險自動修復' if auto_fix else '建立 action_plan 等待人工審核'}",
"fix_files": fix_files,
"human_review_needed": False,
"human_review_needed": has_findings and not auto_fix,
}
def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict:
"""Apply local ADR-012 safety gates even if the LLM suggests auto-fix."""
sev = self.state["severity_summary"]
priority = (decision.get("priority") or "").lower() or (
"critical" if sev["critical"] > 0 else
"high" if sev["high"] > 0 else
"medium" if sev["medium"] > 0 else
"low"
)
has_high_risk = sev["critical"] > 0 or sev["high"] > 0 or priority in {"critical", "high"}
wants_auto_fix = bool(decision.get("auto_fix"))
allowed_auto_fix = bool(wants_auto_fix and AUTO_FIX_ENABLED and not has_high_risk)
if wants_auto_fix and not allowed_auto_fix:
logger.warning(
"[CodeReview] EA auto_fix overridden by ADR-012 HITL gate priority=%s auto_fix_enabled=%s",
priority, AUTO_FIX_ENABLED,
)
decision["priority"] = priority
decision["auto_fix"] = allowed_auto_fix
decision["human_review_needed"] = bool(findings and not allowed_auto_fix)
decision["reasoning"] = (
f"{decision.get('reasoning', '')} "
f"[ADR-012 gate: auto_fix={'enabled' if allowed_auto_fix else 'blocked'}, priority={priority}]"
).strip()
return decision
# ── Step 5NemoTron 派遣 ──────────────────────────────────────────────────
def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict:
@@ -627,7 +657,11 @@ def get_history(limit: int = 20) -> List[Dict]:
def verify_internal_token(request_token: str) -> bool:
"""驗證 CD webhook 來源 token。未設定 env 時直接放行dev 環境)"""
"""驗證 CD webhook 來源 token。Production 預設必填,避免外部觸發 auto-review/fix 鏈。"""
if not INTERNAL_TOKEN:
return True
if ALLOW_INSECURE_WEBHOOK:
logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,僅因 dev override 放行")
return True
logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,拒絕 webhook")
return False
return request_token == INTERNAL_TOKEN

View File

@@ -4,16 +4,24 @@
# W2-C: L2 優先走 Elephant Alpha OrchestratorEA 不可用時 fallback AIOrchestrator
#
import asyncio
import json
import logging
import os
import threading
import traceback
import time
from datetime import datetime
from typing import Any, Dict, Optional
from services.ai_orchestrator import AIOrchestrator
from services.telegram_templates import send_telegram_with_result, triaged_alert
logger = logging.getLogger(__name__)
_QUEUE_PATH = os.getenv(
"MOMO_EVENT_ROUTER_QUEUE",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
)
_QUEUE_LOCK = threading.Lock()
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
@@ -69,6 +77,106 @@ async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "ok", "echo": event.get("event_type")}
async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""Run AI tier with L0-safe degradation; handler failure must not break alerts."""
try:
if tier == "L0":
return await _handle_l0(event)
if tier == "L1":
return await _handle_l1(event, session_id)
if tier == "L2":
return await _handle_l2(event, session_id)
return await _handle_l0(event)
except Exception as exc:
logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc)
return {
"status": "degraded",
"summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。",
"cause": f"{type(exc).__name__}: {str(exc)[:200]}",
"suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"],
"handler_error": str(exc)[:500],
}
def _event_key(event: Dict[str, Any]) -> str:
return f"{event.get('source', 'unknown')}:{event.get('event_type', 'unknown')}"
def _is_event_silenced(event: Dict[str, Any]) -> bool:
try:
from services.agent_actions import is_silenced
key = _event_key(event)
return is_silenced(key) or is_silenced(str(event.get("event_type", "")))
except Exception as exc:
logger.warning("[EventRouter] silence check failed: %s", exc)
return False
def _queue_failed_delivery(
event: Dict[str, Any],
tier: str,
message: Optional[str],
errors: list,
reason: str,
) -> bool:
"""Append failed notification delivery to a local JSONL queue for later replay."""
record = {
"ts": datetime.now().isoformat(),
"reason": reason,
"tier": tier,
"event_key": _event_key(event),
"event": event,
"message": message,
"errors": errors,
}
try:
os.makedirs(os.path.dirname(_QUEUE_PATH), exist_ok=True)
with _QUEUE_LOCK:
with open(_QUEUE_PATH, "a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
return True
except Exception as exc:
logger.error("[EventRouter] failed to queue delivery fallback: %s", exc)
return False
def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
"""
Execute only ADR-012 L2 SAFE_ACTIONS when NemoTron explicitly marks the plan auto-safe.
All action functions own their own audit writes.
"""
if not isinstance(result, dict):
return []
if not (result.get("auto_execute") or result.get("dispatch_to") in {"safe_action", "auto_execute"}):
return []
action_plan = result.get("action_plan") or result.get("execution_plan") or []
if not isinstance(action_plan, list):
return []
try:
from services.agent_actions import SAFE_ACTIONS
except Exception as exc:
return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}]
executed = []
for step in action_plan[:3]:
if not isinstance(step, dict):
continue
action = step.get("action")
params = step.get("params") or {}
if action not in SAFE_ACTIONS:
executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
continue
try:
action_result = SAFE_ACTIONS[action](**params)
executed.append({"action": action, "status": "ok", "result": action_result})
except Exception as exc:
logger.exception("[EventRouter] safe action failed: %s", action)
executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
return executed
async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
"""
Main event routing entry (ADR-012 §③ — 唯一入口).
@@ -79,17 +187,28 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
started_at = time.perf_counter()
try:
if tier == "L0":
result = await _handle_l0(event)
elif tier == "L1":
result = await _handle_l1(event, session_id)
elif tier == "L2":
result = await _handle_l2(event, session_id)
else:
result = await _handle_l0(event)
if _is_event_silenced(event):
return {
"tier": tier,
"sent": 0,
"errors": [],
"latency_ms": int((time.perf_counter() - started_at) * 1000),
"payload": {"status": "silenced", "event_key": _event_key(event)},
"delivered": True,
"silenced": True,
"queued": False,
}
result = await _run_tier_handler(tier, event, session_id)
executed_actions = _execute_safe_actions(result, event)
if executed_actions:
result["executed_actions"] = executed_actions
message, reply_markup = _build_telegram_message(event, tier, result)
send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
queued = False
if not send_result["ok"]:
queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
latency_ms = int((time.perf_counter() - started_at) * 1000)
return {
@@ -99,9 +218,12 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
"latency_ms": latency_ms,
"payload": result,
"delivered": send_result["ok"],
"silenced": False,
"queued": queued,
}
except Exception as e:
logger.exception(f"[EventRouter] dispatch failed: {e}")
queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception")
return {
"tier": tier,
"sent": 0,
@@ -109,6 +231,8 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
"latency_ms": int((time.perf_counter() - started_at) * 1000),
"payload": None,
"delivered": False,
"silenced": False,
"queued": queued,
}
@@ -125,6 +249,7 @@ def _run_coroutine_in_thread(coro) -> Dict[str, Any]:
"errors": [str(e)],
"latency_ms": 0,
"payload": None,
"delivered": False,
}
thread = threading.Thread(target=runner, daemon=True)
@@ -137,6 +262,7 @@ def _run_coroutine_in_thread(coro) -> Dict[str, Any]:
"errors": ["dispatch_sync timed out"],
"latency_ms": 15000,
"payload": None,
"delivered": False,
}
return result["value"]
@@ -188,7 +314,8 @@ def _classify(event: Dict[str, Any]) -> str:
return "L1" if has_trace else "L0"
if sev == "alert":
if event_type in {"price_threat", "db_connection_error", "crawler_timeout",
"nim_quota_exhausted", "embedding_failure"}:
"nim_quota_exhausted", "embedding_failure",
"scheduler_task_failure"}:
return "L2"
return "L1"
return "L0"

View File

@@ -366,7 +366,7 @@ class NemotronDispatcher:
"""
if not NIM_API_KEY:
logger.warning("[NemotronDispatcher] NVIDIA_API_KEY 未設定,跳過 NIM 呼叫")
return [], {}
raise RuntimeError("NVIDIA_API_KEY not configured")
threat_summary = json.dumps(
[
@@ -943,6 +943,32 @@ class NemotronDispatcher:
"""
try:
message = (event or {}).get("message", "") or ""
event_type = (event or {}).get("event_type", "")
payload = (event or {}).get("payload") or {}
task_name = payload.get("task_name") or (event or {}).get("task_name")
if event_type == "scheduler_task_failure" and task_name:
try:
from services.agent_actions import ALLOWED_RETRY_TASKS
if task_name in ALLOWED_RETRY_TASKS:
return {
"session_id": f"evt:{event_type}:{(event or {}).get('source', 'unknown')}",
"plan_type": "retry_task",
"action_plan": [{
"action": "retry_task",
"params": {
"task_name": task_name,
"max_attempts": 2,
"backoff_sec": 60,
},
}],
"dispatch_to": "safe_action",
"auto_execute": True,
"metadata": {"event_type": event_type, "task_name": task_name},
}
except Exception as action_err:
logger.warning("[NemotronDispatcher.handle_l2] retry_task 規劃跳過: %s", action_err)
# ctx 可能是 {"latest": {...}} 或已攤平的 intent 結果
hermes = {}
if isinstance(ctx, dict):
@@ -1070,29 +1096,37 @@ class NemotronDispatcher:
"nim_stats": {},
}
if not _check_nim_quota():
# 配額耗盡fallback 直接派發 HIGH 威脅(不帶 NIM 足跡)
logger.warning("[Dispatcher] NIM 配額耗盡fallback 直接派發 HIGH 威脅")
footprint = _build_footprint_block(hermes_stats, None)
for t in nim_candidates:
if t.risk == "HIGH":
self._exec_trigger_price_alert(
t.sku, t.name,
t.gap_pct, t.sales_7d_delta_pct,
t.recommended_action, t.confidence,
momo_price=t.momo_price, comp_price=t.pchome_price,
footprint=footprint,
)
dispatched += 1
if not NIM_API_KEY:
logger.warning("[Dispatcher][ADR-004] NVIDIA_API_KEY 未設定,啟動 Hermes 規則引擎降級")
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
return {
"dispatched": dispatched,
"skipped": len(threats) - dispatched + skipped,
"errors": errors,
"nim_stats": {},
"dispatched": dispatched + fb["dispatched"],
"skipped": skipped + fb["skipped"],
"errors": errors + fb["errors"],
"nim_stats": fb["nim_stats"],
}
if not _check_nim_quota():
logger.warning("[Dispatcher][ADR-004] NIM 配額耗盡,啟動 Hermes 規則引擎降級")
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
return {
"dispatched": dispatched + fb["dispatched"],
"skipped": skipped + fb["skipped"],
"errors": errors + fb["errors"],
"nim_stats": fb["nim_stats"],
}
try:
tool_calls, nim_stats = self._call_nim(nim_candidates)
if not tool_calls:
logger.warning("[Dispatcher][ADR-004] NIM 0 tool_calls啟動 Hermes 規則引擎降級")
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
return {
"dispatched": dispatched + fb["dispatched"],
"skipped": skipped + fb["skipped"],
"errors": errors + fb["errors"],
"nim_stats": fb["nim_stats"],
}
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 429:
logger.warning("[Dispatcher][ADR-004] NIM HTTP 429啟動 Hermes 規則引擎降級")
@@ -1103,20 +1137,22 @@ class NemotronDispatcher:
"errors": errors + fb["errors"],
"nim_stats": fb["nim_stats"],
}
logger.error(f"[Dispatcher] NIM HTTP 錯誤: {e}")
logger.warning("[Dispatcher][ADR-004] NIM HTTP 錯誤,啟動 Hermes 規則引擎降級: %s", e)
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
return {
"dispatched": dispatched,
"skipped": len(nim_candidates),
"errors": errors + [str(e)],
"nim_stats": {},
"dispatched": dispatched + fb["dispatched"],
"skipped": skipped + fb["skipped"],
"errors": errors + [str(e)] + fb["errors"],
"nim_stats": fb["nim_stats"],
}
except Exception as e:
logger.error(f"[Dispatcher] NIM 呼叫失敗: {e}")
logger.warning("[Dispatcher][ADR-004] NIM 呼叫失敗,啟動 Hermes 規則引擎降級: %s", e)
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
return {
"dispatched": dispatched,
"skipped": len(nim_candidates),
"errors": errors + [str(e)],
"nim_stats": {},
"dispatched": dispatched + fb["dispatched"],
"skipped": skipped + fb["skipped"],
"errors": errors + [str(e)] + fb["errors"],
"nim_stats": fb["nim_stats"],
}
# 建立運算足跡Telegram 顯示文字 + DB 結構化 JSON共用同一份

View File

@@ -0,0 +1,49 @@
def test_verify_internal_token_requires_env_by_default(monkeypatch):
import services.code_review_pipeline_service as module
monkeypatch.setattr(module, "INTERNAL_TOKEN", "")
monkeypatch.setattr(module, "ALLOW_INSECURE_WEBHOOK", False)
assert module.verify_internal_token("") is False
assert module.verify_internal_token("anything") is False
def test_verify_internal_token_allows_explicit_dev_override(monkeypatch):
import services.code_review_pipeline_service as module
monkeypatch.setattr(module, "INTERNAL_TOKEN", "")
monkeypatch.setattr(module, "ALLOW_INSECURE_WEBHOOK", True)
assert module.verify_internal_token("") is True
def test_code_review_guard_blocks_high_risk_auto_fix(monkeypatch):
import services.code_review_pipeline_service as module
monkeypatch.setattr(module, "AUTO_FIX_ENABLED", True)
pipeline = module.CodeReviewPipeline("abcdef123456", ["services/example.py"])
pipeline.state["severity_summary"] = {"critical": 0, "high": 1, "medium": 0, "low": 0}
guarded = pipeline._guard_ea_decision(
{"priority": "high", "auto_fix": True, "reasoning": "建議修復", "fix_files": ["services/example.py"]},
[{"severity": "HIGH", "file": "services/example.py"}],
)
assert guarded["auto_fix"] is False
assert guarded["human_review_needed"] is True
def test_code_review_guard_requires_auto_fix_feature_flag(monkeypatch):
import services.code_review_pipeline_service as module
monkeypatch.setattr(module, "AUTO_FIX_ENABLED", False)
pipeline = module.CodeReviewPipeline("abcdef123456", ["services/example.py"])
pipeline.state["severity_summary"] = {"critical": 0, "high": 0, "medium": 1, "low": 0}
guarded = pipeline._guard_ea_decision(
{"priority": "medium", "auto_fix": True, "reasoning": "建議修復", "fix_files": ["services/example.py"]},
[{"severity": "MEDIUM", "file": "services/example.py"}],
)
assert guarded["auto_fix"] is False
assert guarded["human_review_needed"] is True

View File

@@ -0,0 +1,87 @@
import asyncio
import json
def test_dispatch_degrades_and_queues_when_ai_or_telegram_fails(tmp_path, monkeypatch):
import services.event_router as event_router
queue_path = tmp_path / "event_router_failed.jsonl"
monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))
monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
async def broken_l1(event, session_id):
raise RuntimeError("hermes unavailable")
monkeypatch.setattr(event_router, "_handle_l1", broken_l1)
monkeypatch.setattr(
event_router,
"send_telegram_with_result",
lambda *args, **kwargs: {
"ok": False,
"sent": 0,
"failed": 1,
"chat_ids": [123],
"errors": ["123:HTTP 500"],
},
)
result = asyncio.run(event_router.dispatch({
"source": "Scheduler.Test",
"event_type": "crawler_timeout",
"severity": "warning",
"title": "測試任務異常",
"summary": "timeout",
"trace": "Traceback...",
}))
assert result["tier"] == "L1"
assert result["delivered"] is False
assert result["queued"] is True
assert result["payload"]["status"] == "degraded"
queued = json.loads(queue_path.read_text(encoding="utf-8").strip())
assert queued["reason"] == "telegram_delivery_failed"
assert queued["event_key"] == "Scheduler.Test:crawler_timeout"
def test_dispatch_executes_only_auto_safe_actions(monkeypatch):
import services.agent_actions as agent_actions
import services.event_router as event_router
calls = []
monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
monkeypatch.setitem(
agent_actions.SAFE_ACTIONS,
"retry_task",
lambda **params: calls.append(params) or {"status": "scheduled"},
)
async def planned_l2(event, session_id):
return {
"dispatch_to": "safe_action",
"auto_execute": True,
"action_plan": [
{"action": "retry_task", "params": {"task_name": "run_momo_task"}},
{"action": "restart_container", "params": {"container": "momo-db"}},
],
}
monkeypatch.setattr(event_router, "_handle_l2", planned_l2)
monkeypatch.setattr(
event_router,
"send_telegram_with_result",
lambda *args, **kwargs: {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []},
)
result = asyncio.run(event_router.dispatch({
"source": "Scheduler.MOMO",
"event_type": "scheduler_task_failure",
"severity": "alert",
"title": "MOMO 任務異常",
"summary": "boom",
"payload": {"task_name": "run_momo_task"},
}))
assert result["tier"] == "L2"
assert calls == [{"task_name": "run_momo_task"}]
assert result["payload"]["executed_actions"][0]["status"] == "ok"
assert result["payload"]["executed_actions"][1]["status"] == "rejected"

View File

@@ -0,0 +1,79 @@
from dataclasses import dataclass
@dataclass
class FakeThreat:
sku: str
name: str
momo_price: float = 100.0
pchome_price: float = 80.0
gap_pct: float = 25.0
sales_7d_delta_pct: float = -20.0
risk: str = "HIGH"
recommended_action: str = "請評估價格策略"
confidence: float = 0.8
def _patch_execution_methods(monkeypatch, dispatcher):
calls = []
def record(kind):
def _inner(*args, **kwargs):
calls.append({"kind": kind, "args": args, "kwargs": kwargs})
return _inner
monkeypatch.setattr(dispatcher, "_exec_trigger_price_alert", record("price_alert"))
monkeypatch.setattr(dispatcher, "_exec_add_to_recommendation", record("recommendation"))
monkeypatch.setattr(dispatcher, "_exec_flag_for_human_review", record("human_review"))
return calls
def test_dispatch_falls_back_to_hermes_rules_without_nim_api_key(monkeypatch):
import services.nemoton_dispatcher_service as module
monkeypatch.setattr(module, "NIM_API_KEY", "")
dispatcher = module.NemotronDispatcher()
calls = _patch_execution_methods(monkeypatch, dispatcher)
result = dispatcher.dispatch([FakeThreat("SKU-1", "測試品")], hermes_stats={"duration_sec": 1})
assert result["dispatched"] == 1
assert result["skipped"] == 0
assert result["nim_stats"]["degraded"] is True
assert calls[0]["kind"] == "price_alert"
def test_dispatch_falls_back_to_hermes_rules_on_nim_timeout(monkeypatch):
import requests
import services.nemoton_dispatcher_service as module
monkeypatch.setattr(module, "NIM_API_KEY", "test-key")
monkeypatch.setattr(module, "_check_nim_quota", lambda: True)
dispatcher = module.NemotronDispatcher()
calls = _patch_execution_methods(monkeypatch, dispatcher)
monkeypatch.setattr(dispatcher, "_call_nim", lambda threats: (_ for _ in ()).throw(requests.Timeout("timeout")))
result = dispatcher.dispatch([FakeThreat("SKU-2", "測試品")], hermes_stats={"duration_sec": 1})
assert result["dispatched"] == 1
assert result["skipped"] == 0
assert result["nim_stats"]["degraded"] is True
assert result["errors"] == ["timeout"]
assert calls[0]["kind"] == "price_alert"
def test_dispatch_falls_back_to_hermes_rules_on_zero_tool_calls(monkeypatch):
import services.nemoton_dispatcher_service as module
monkeypatch.setattr(module, "NIM_API_KEY", "test-key")
monkeypatch.setattr(module, "_check_nim_quota", lambda: True)
dispatcher = module.NemotronDispatcher()
calls = _patch_execution_methods(monkeypatch, dispatcher)
monkeypatch.setattr(dispatcher, "_call_nim", lambda threats: ([], {"total_tokens": 10}))
result = dispatcher.dispatch([FakeThreat("SKU-3", "測試品")], hermes_stats={"duration_sec": 1})
assert result["dispatched"] == 1
assert result["skipped"] == 0
assert result["nim_stats"]["degraded"] is True
assert calls[0]["kind"] == "price_alert"