diff --git a/apps/api/src/services/ai_control.py b/apps/api/src/services/ai_control.py new file mode 100644 index 00000000..5ff7d678 --- /dev/null +++ b/apps/api/src/services/ai_control.py @@ -0,0 +1,284 @@ +""" +AI Control Service — Phase 24 C (2026-04-03 ogt) + +Telegram /ai 指令動態控制 AI Router 狀態 +- Redis 狀態優先於 env var +- OPENCLAW_TG_USER_WHITELIST 白名單保護 + +Redis Keys: + ai:control:use_router "true"/"false" (覆蓋 USE_AI_ROUTER env) + ai:control:primary_provider "openclaw_nemo"/"gemini"/"ollama"/"claude" + ai:control:disabled: "1" (TTL: 永久或手動清除) + +Usage: + /ai status — 顯示所有 Provider 狀態 + 當前路由模式 + /ai primary

— 設定主要 Provider (openclaw_nemo/gemini/ollama/claude) + /ai enable

— 啟用特定 Provider + /ai disable

— 停用特定 Provider + /ai cost — 顯示費用統計 + /ai router on — 啟用 AIRouter (覆蓋 env var) + /ai router off — 停用 AIRouter (回滾到舊 fallback chain) +""" + +import structlog + +logger = structlog.get_logger(__name__) + +# Redis Key 常數 +_AI_ROUTER_KEY = "ai:control:use_router" +_PRIMARY_PROVIDER_KEY = "ai:control:primary_provider" +_DISABLED_KEY_PREFIX = "ai:control:disabled:" + +VALID_PROVIDERS = {"openclaw_nemo", "gemini", "ollama", "claude", "nemotron"} + + +async def _get_redis(): + from src.core.redis_client import get_redis + return get_redis() + + +async def get_ai_router_enabled() -> bool | None: + """ + 從 Redis 取得 AIRouter 啟用狀態。 + 回傳 None 表示未設定,使用 env var (USE_AI_ROUTER)。 + """ + try: + r = await _get_redis() + val = await r.get(_AI_ROUTER_KEY) + if val is None: + return None + return val.decode() == "true" if isinstance(val, bytes) else val == "true" + except Exception as e: + logger.warning("ai_control_redis_read_failed", key=_AI_ROUTER_KEY, error=str(e)) + return None + + +async def set_ai_router_enabled(enabled: bool) -> bool: + """設定 AIRouter 啟用狀態到 Redis""" + try: + r = await _get_redis() + await r.set(_AI_ROUTER_KEY, "true" if enabled else "false") + logger.info("ai_control_router_set", enabled=enabled) + return True + except Exception as e: + logger.error("ai_control_router_set_failed", error=str(e)) + return False + + +async def get_primary_provider() -> str | None: + """從 Redis 取得主要 Provider,None 表示使用 Router 智慧路由""" + try: + r = await _get_redis() + val = await r.get(_PRIMARY_PROVIDER_KEY) + if val is None: + return None + return val.decode() if isinstance(val, bytes) else val + except Exception: + return None + + +async def set_primary_provider(provider: str) -> bool: + """設定主要 Provider 到 Redis""" + if provider not in VALID_PROVIDERS: + return False + try: + r = await _get_redis() + await r.set(_PRIMARY_PROVIDER_KEY, provider) + logger.info("ai_control_primary_set", provider=provider) + return True + except Exception as e: + logger.error("ai_control_primary_set_failed", error=str(e)) + return False + + +async def clear_primary_provider() -> bool: + """清除主要 Provider 設定(回歸智慧路由)""" + try: + r = await _get_redis() + await r.delete(_PRIMARY_PROVIDER_KEY) + return True + except Exception: + return False + + +async def is_provider_disabled(provider: str) -> bool: + """檢查 Provider 是否被停用""" + try: + r = await _get_redis() + val = await r.get(f"{_DISABLED_KEY_PREFIX}{provider}") + return val is not None + except Exception: + return False + + +async def set_provider_disabled(provider: str, disabled: bool) -> bool: + """設定 Provider 啟用/停用""" + if provider not in VALID_PROVIDERS: + return False + try: + r = await _get_redis() + key = f"{_DISABLED_KEY_PREFIX}{provider}" + if disabled: + await r.set(key, "1") + else: + await r.delete(key) + logger.info("ai_control_provider_set", provider=provider, disabled=disabled) + return True + except Exception as e: + logger.error("ai_control_provider_set_failed", error=str(e)) + return False + + +async def get_status_summary() -> str: + """ + 取得 AI 控制狀態摘要 (Telegram 格式 HTML) + """ + from src.core.config import get_settings + settings = get_settings() + + # AIRouter 狀態 + redis_router = await get_ai_router_enabled() + if redis_router is not None: + router_status = "✅ ON (Redis 覆蓋)" if redis_router else "❌ OFF (Redis 覆蓋)" + else: + router_status = "✅ ON (env)" if settings.USE_AI_ROUTER else "❌ OFF (env)" + + # Primary Provider + primary = await get_primary_provider() + primary_str = f"🎯 {primary}" if primary else "🤖 智慧路由 (AIRouter 決定)" + + # Provider 狀態 + provider_lines = [] + for p in ["openclaw_nemo", "gemini", "ollama", "claude", "nemotron"]: + disabled = await is_provider_disabled(p) + icon = "❌" if disabled else "✅" + provider_lines.append(f" {icon} {p}") + + # 費用統計 + cost_lines = [] + try: + r = await _get_redis() + for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]: + val = await r.get(f"ai_rate:total_cost:{p}") + if val: + cost = float(val) + if cost > 0: + cost_lines.append(f" 💰 {p}: ${cost:.4f}") + except Exception: + cost_lines.append(" (費用統計不可用)") + + providers_str = "\n".join(provider_lines) + costs_str = "\n".join(cost_lines) if cost_lines else " (尚無費用記錄)" + + return ( + f"🤖 AI Router 控制面板\n\n" + f"路由模式: {router_status}\n" + f"主要 Provider: {primary_str}\n\n" + f"Provider 狀態:\n{providers_str}\n\n" + f"費用統計:\n{costs_str}\n\n" + f"/ai primary <provider> 切換\n" + f"/ai router on/off 開關 AIRouter\n" + f"/ai enable/disable <provider> 控制 Provider" + ) + + +async def handle_ai_command(text: str) -> str: + """ + 處理 /ai 指令,回傳 Telegram HTML 回應。 + + Args: + text: 完整訊息文字 (e.g., "/ai status", "/ai primary gemini") + + Returns: + HTML 格式回應字串 + """ + parts = text.strip().split() + if len(parts) < 2: + return await get_status_summary() + + sub = parts[1].lower() + + if sub == "status": + return await get_status_summary() + + elif sub == "router": + if len(parts) < 3: + return "用法: /ai router on/ai router off" + action = parts[2].lower() + if action in ("on", "true", "enable"): + ok = await set_ai_router_enabled(True) + return "✅ AIRouter 已啟用 (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗" + elif action in ("off", "false", "disable"): + ok = await set_ai_router_enabled(False) + return "⚠️ AIRouter 已停用,回滾舊 fallback chain (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗" + else: + return f"❌ 未知動作: {action},請用 on/off" + + elif sub == "primary": + if len(parts) < 3: + current = await get_primary_provider() + if current: + return f"當前主要 Provider: {current}\n用法: /ai primary <provider>/ai primary auto" + return "當前: 🤖 智慧路由\n用法: /ai primary <provider>" + provider = parts[2].lower() + if provider == "auto": + ok = await clear_primary_provider() + return "✅ 已清除主要 Provider,恢復智慧路由" if ok else "❌ 操作失敗" + if provider not in VALID_PROVIDERS: + return f"❌ 未知 Provider: {provider}\n可用: {', '.join(sorted(VALID_PROVIDERS))}" + ok = await set_primary_provider(provider) + return f"✅ 主要 Provider 已設為 {provider}" if ok else "❌ 寫入 Redis 失敗" + + elif sub == "enable": + if len(parts) < 3: + return "用法: /ai enable <provider>" + provider = parts[2].lower() + if provider not in VALID_PROVIDERS: + return f"❌ 未知 Provider: {provider}" + ok = await set_provider_disabled(provider, False) + return f"✅ {provider} 已啟用" if ok else "❌ 操作失敗" + + elif sub == "disable": + if len(parts) < 3: + return "用法: /ai disable <provider>" + provider = parts[2].lower() + if provider not in VALID_PROVIDERS: + return f"❌ 未知 Provider: {provider}" + ok = await set_provider_disabled(provider, True) + return f"⚠️ {provider} 已停用" if ok else "❌ 操作失敗" + + elif sub == "cost": + lines = ["💰 AI 費用統計\n"] + try: + r = await _get_redis() + total = 0.0 + for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]: + val = await r.get(f"ai_rate:total_cost:{p}") + if val: + cost = float(val) + total += cost + if cost > 0: + lines.append(f" {p}: ${cost:.4f}") + if total > 0: + lines.append(f"\n 總計: ${total:.4f}") + else: + lines.append(" (尚無費用記錄)") + except Exception as e: + lines.append(f" ⚠️ 費用查詢失敗: {e}") + return "\n".join(lines) + + elif sub == "help": + return ( + "/ai 指令說明\n\n" + "/ai status — 顯示所有狀態\n" + "/ai router on/off — 開關 AIRouter\n" + "/ai primary <p> — 設主要 Provider\n" + "/ai primary auto — 恢復智慧路由\n" + "/ai enable <p> — 啟用 Provider\n" + "/ai disable <p> — 停用 Provider\n" + "/ai cost — 費用統計\n\n" + f"可用 Provider: {', '.join(sorted(VALID_PROVIDERS))}" + ) + + else: + return f"❌ 未知子指令: {sub}\n輸入 /ai help 查看說明" diff --git a/apps/api/src/services/openclaw.py b/apps/api/src/services/openclaw.py index 448f92f0..325da25e 100644 --- a/apps/api/src/services/openclaw.py +++ b/apps/api/src/services/openclaw.py @@ -924,8 +924,19 @@ class OpenClawService: # USE_AI_ROUTER=true → 新 AIRouterExecutor 路由 # USE_AI_ROUTER=false → 舊 if/else fallback chain (現狀) # 回滾: kubectl set env deployment/awoooi-api USE_AI_ROUTER=false + # Phase 24 C: Redis 狀態覆蓋 env var (/ai router on/off) # ================================================================= - if settings.USE_AI_ROUTER: + # Redis 狀態優先 (Phase 24 C — 2026-04-03 ogt) + _use_ai_router = settings.USE_AI_ROUTER + try: + from src.services.ai_control import get_ai_router_enabled + _redis_override = await get_ai_router_enabled() + if _redis_override is not None: + _use_ai_router = _redis_override + except Exception: + pass + + if _use_ai_router: try: # 2026-04-02 ogt: C2 修復 — 呼叫 AIRouter.route() 智慧路由 (非靜態 order) # D1 意圖分類路由、D7 隱私保護 (DIAGNOSE/CODE_REVIEW 強制 local) 生效 @@ -937,10 +948,23 @@ class OpenClawService: decision = await router.route(prompt, alert_context) # Step 2: 從 RoutingDecision 建立 provider_order (主 + fallback) + # Phase 24 C: Redis primary_provider 覆蓋路由決策 provider_order = [decision.selected_provider.value] + [ p.value for p, _ in decision.fallback_chain if p.value != decision.selected_provider.value ] + try: + from src.services.ai_control import get_primary_provider, is_provider_disabled + _primary = await get_primary_provider() + if _primary and _primary != decision.selected_provider.value: + # 把 primary 移到首位 (保留原始 fallback) + provider_order = [_primary] + [p for p in provider_order if p != _primary] + # 過濾被停用的 Provider + _filtered = [p for p in provider_order if not await is_provider_disabled(p)] + if _filtered: + provider_order = _filtered + except Exception: + pass # Step 3: D7 隱私 — DIAGNOSE/CODE_REVIEW 強制 local require_local = decision.intent in (IntentType.DIAGNOSE, IntentType.CODE_REVIEW) diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 13a9708b..105fead3 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -2797,10 +2797,28 @@ class TelegramGateway: logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(e)) return - # 2. 顯示 "正在輸入中..." + # 2. /ai 指令攔截 (Phase 24 C — 2026-04-03 ogt) + # 白名單: OPENCLAW_TG_USER_WHITELIST (與審核白名單共用) + if text.strip().lower().startswith("/ai"): + whitelist = settings.get_tg_user_whitelist() + if whitelist and user_id not in whitelist: + logger.warning("telegram_ai_command_unauthorized", user_id=user_id) + await self.send_notification( + "⛔ 未授權:/ai 指令僅限白名單用戶", + parse_mode="HTML", + chat_id=chat_id, + ) + return + from src.services.ai_control import handle_ai_command + response = await handle_ai_command(text.strip()) + await self.send_notification(response, parse_mode="HTML", chat_id=chat_id) + logger.info("telegram_ai_command_handled", user_id=user_id, text=text[:50]) + return + + # 3. 顯示 "正在輸入中..." await self._send_chat_action(chat_id, "typing") - # 3. 呼叫 ChatManager 處理 + # 4. 呼叫 ChatManager 處理 chat_manager = get_chat_manager() response = await chat_manager.generate_response( user_id=user_id, @@ -2808,7 +2826,7 @@ class TelegramGateway: message_text=text, ) - # 4. 回覆統帥 (定向回傳) + # 5. 回覆統帥 (定向回傳) await self.send_notification(response, parse_mode="HTML", chat_id=chat_id) async def _send_chat_action(self, chat_id: int, action: str) -> None: