feat(ai): Phase 24 C — Telegram /ai 動態控制 + Redis 狀態管理
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

新增 ai_control.py:
- /ai status: 所有 Provider 狀態 + 路由模式
- /ai router on/off: 動態切換 AIRouter (覆蓋 env var)
- /ai primary <provider>: 設定主要 Provider
- /ai enable/disable <provider>: 控制 Provider 啟停
- /ai cost: 費用統計
- 白名單: OPENCLAW_TG_USER_WHITELIST 保護

telegram_gateway.py:
- _handle_chat_message 加入 /ai 指令攔截路由
- 白名單未授權返回警告

openclaw.py:
- Redis 狀態覆蓋 env USE_AI_ROUTER (/ai router on/off 生效)
- Redis primary_provider 覆蓋路由決策 (/ai primary 生效)
- Redis disabled provider 過濾 (/ai disable 生效)

Redis Keys:
  ai:control:use_router
  ai:control:primary_provider
  ai:control:disabled:<provider>

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-03 00:34:14 +08:00
parent b4b3a457c5
commit dbe71f82e3
3 changed files with 330 additions and 4 deletions

View File

@@ -0,0 +1,284 @@
"""
AI Control Service — Phase 24 C (2026-04-03 ogt)
Telegram /ai 指令動態控制 AI Router 狀態
- Redis 狀態優先於 env var
- OPENCLAW_TG_USER_WHITELIST 白名單保護
Redis Keys:
ai:control:use_router "true"/"false" (覆蓋 USE_AI_ROUTER env)
ai:control:primary_provider "openclaw_nemo"/"gemini"/"ollama"/"claude"
ai:control:disabled:<provider> "1" (TTL: 永久或手動清除)
Usage:
/ai status — 顯示所有 Provider 狀態 + 當前路由模式
/ai primary <p> — 設定主要 Provider (openclaw_nemo/gemini/ollama/claude)
/ai enable <p> — 啟用特定 Provider
/ai disable <p> — 停用特定 Provider
/ai cost — 顯示費用統計
/ai router on — 啟用 AIRouter (覆蓋 env var)
/ai router off — 停用 AIRouter (回滾到舊 fallback chain)
"""
import structlog
logger = structlog.get_logger(__name__)
# Redis Key 常數
_AI_ROUTER_KEY = "ai:control:use_router"
_PRIMARY_PROVIDER_KEY = "ai:control:primary_provider"
_DISABLED_KEY_PREFIX = "ai:control:disabled:"
VALID_PROVIDERS = {"openclaw_nemo", "gemini", "ollama", "claude", "nemotron"}
async def _get_redis():
from src.core.redis_client import get_redis
return get_redis()
async def get_ai_router_enabled() -> bool | None:
"""
從 Redis 取得 AIRouter 啟用狀態。
回傳 None 表示未設定,使用 env var (USE_AI_ROUTER)。
"""
try:
r = await _get_redis()
val = await r.get(_AI_ROUTER_KEY)
if val is None:
return None
return val.decode() == "true" if isinstance(val, bytes) else val == "true"
except Exception as e:
logger.warning("ai_control_redis_read_failed", key=_AI_ROUTER_KEY, error=str(e))
return None
async def set_ai_router_enabled(enabled: bool) -> bool:
"""設定 AIRouter 啟用狀態到 Redis"""
try:
r = await _get_redis()
await r.set(_AI_ROUTER_KEY, "true" if enabled else "false")
logger.info("ai_control_router_set", enabled=enabled)
return True
except Exception as e:
logger.error("ai_control_router_set_failed", error=str(e))
return False
async def get_primary_provider() -> str | None:
"""從 Redis 取得主要 ProviderNone 表示使用 Router 智慧路由"""
try:
r = await _get_redis()
val = await r.get(_PRIMARY_PROVIDER_KEY)
if val is None:
return None
return val.decode() if isinstance(val, bytes) else val
except Exception:
return None
async def set_primary_provider(provider: str) -> bool:
"""設定主要 Provider 到 Redis"""
if provider not in VALID_PROVIDERS:
return False
try:
r = await _get_redis()
await r.set(_PRIMARY_PROVIDER_KEY, provider)
logger.info("ai_control_primary_set", provider=provider)
return True
except Exception as e:
logger.error("ai_control_primary_set_failed", error=str(e))
return False
async def clear_primary_provider() -> bool:
"""清除主要 Provider 設定(回歸智慧路由)"""
try:
r = await _get_redis()
await r.delete(_PRIMARY_PROVIDER_KEY)
return True
except Exception:
return False
async def is_provider_disabled(provider: str) -> bool:
"""檢查 Provider 是否被停用"""
try:
r = await _get_redis()
val = await r.get(f"{_DISABLED_KEY_PREFIX}{provider}")
return val is not None
except Exception:
return False
async def set_provider_disabled(provider: str, disabled: bool) -> bool:
"""設定 Provider 啟用/停用"""
if provider not in VALID_PROVIDERS:
return False
try:
r = await _get_redis()
key = f"{_DISABLED_KEY_PREFIX}{provider}"
if disabled:
await r.set(key, "1")
else:
await r.delete(key)
logger.info("ai_control_provider_set", provider=provider, disabled=disabled)
return True
except Exception as e:
logger.error("ai_control_provider_set_failed", error=str(e))
return False
async def get_status_summary() -> str:
"""
取得 AI 控制狀態摘要 (Telegram 格式 HTML)
"""
from src.core.config import get_settings
settings = get_settings()
# AIRouter 狀態
redis_router = await get_ai_router_enabled()
if redis_router is not None:
router_status = "✅ ON (Redis 覆蓋)" if redis_router else "❌ OFF (Redis 覆蓋)"
else:
router_status = "✅ ON (env)" if settings.USE_AI_ROUTER else "❌ OFF (env)"
# Primary Provider
primary = await get_primary_provider()
primary_str = f"🎯 {primary}" if primary else "🤖 智慧路由 (AIRouter 決定)"
# Provider 狀態
provider_lines = []
for p in ["openclaw_nemo", "gemini", "ollama", "claude", "nemotron"]:
disabled = await is_provider_disabled(p)
icon = "" if disabled else ""
provider_lines.append(f" {icon} {p}")
# 費用統計
cost_lines = []
try:
r = await _get_redis()
for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]:
val = await r.get(f"ai_rate:total_cost:{p}")
if val:
cost = float(val)
if cost > 0:
cost_lines.append(f" 💰 {p}: ${cost:.4f}")
except Exception:
cost_lines.append(" (費用統計不可用)")
providers_str = "\n".join(provider_lines)
costs_str = "\n".join(cost_lines) if cost_lines else " (尚無費用記錄)"
return (
f"<b>🤖 AI Router 控制面板</b>\n\n"
f"<b>路由模式</b>: {router_status}\n"
f"<b>主要 Provider</b>: {primary_str}\n\n"
f"<b>Provider 狀態</b>:\n{providers_str}\n\n"
f"<b>費用統計</b>:\n{costs_str}\n\n"
f"<code>/ai primary &lt;provider&gt;</code> 切換\n"
f"<code>/ai router on/off</code> 開關 AIRouter\n"
f"<code>/ai enable/disable &lt;provider&gt;</code> 控制 Provider"
)
async def handle_ai_command(text: str) -> str:
"""
處理 /ai 指令,回傳 Telegram HTML 回應。
Args:
text: 完整訊息文字 (e.g., "/ai status", "/ai primary gemini")
Returns:
HTML 格式回應字串
"""
parts = text.strip().split()
if len(parts) < 2:
return await get_status_summary()
sub = parts[1].lower()
if sub == "status":
return await get_status_summary()
elif sub == "router":
if len(parts) < 3:
return "用法: <code>/ai router on</code> 或 <code>/ai router off</code>"
action = parts[2].lower()
if action in ("on", "true", "enable"):
ok = await set_ai_router_enabled(True)
return "✅ AIRouter 已啟用 (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗"
elif action in ("off", "false", "disable"):
ok = await set_ai_router_enabled(False)
return "⚠️ AIRouter 已停用,回滾舊 fallback chain (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗"
else:
return f"❌ 未知動作: {action},請用 on/off"
elif sub == "primary":
if len(parts) < 3:
current = await get_primary_provider()
if current:
return f"當前主要 Provider: <b>{current}</b>\n用法: <code>/ai primary &lt;provider&gt;</code> 或 <code>/ai primary auto</code>"
return "當前: 🤖 智慧路由\n用法: <code>/ai primary &lt;provider&gt;</code>"
provider = parts[2].lower()
if provider == "auto":
ok = await clear_primary_provider()
return "✅ 已清除主要 Provider恢復智慧路由" if ok else "❌ 操作失敗"
if provider not in VALID_PROVIDERS:
return f"❌ 未知 Provider: {provider}\n可用: {', '.join(sorted(VALID_PROVIDERS))}"
ok = await set_primary_provider(provider)
return f"✅ 主要 Provider 已設為 <b>{provider}</b>" if ok else "❌ 寫入 Redis 失敗"
elif sub == "enable":
if len(parts) < 3:
return "用法: <code>/ai enable &lt;provider&gt;</code>"
provider = parts[2].lower()
if provider not in VALID_PROVIDERS:
return f"❌ 未知 Provider: {provider}"
ok = await set_provider_disabled(provider, False)
return f"{provider} 已啟用" if ok else "❌ 操作失敗"
elif sub == "disable":
if len(parts) < 3:
return "用法: <code>/ai disable &lt;provider&gt;</code>"
provider = parts[2].lower()
if provider not in VALID_PROVIDERS:
return f"❌ 未知 Provider: {provider}"
ok = await set_provider_disabled(provider, True)
return f"⚠️ {provider} 已停用" if ok else "❌ 操作失敗"
elif sub == "cost":
lines = ["<b>💰 AI 費用統計</b>\n"]
try:
r = await _get_redis()
total = 0.0
for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]:
val = await r.get(f"ai_rate:total_cost:{p}")
if val:
cost = float(val)
total += cost
if cost > 0:
lines.append(f" {p}: ${cost:.4f}")
if total > 0:
lines.append(f"\n <b>總計: ${total:.4f}</b>")
else:
lines.append(" (尚無費用記錄)")
except Exception as e:
lines.append(f" ⚠️ 費用查詢失敗: {e}")
return "\n".join(lines)
elif sub == "help":
return (
"<b>/ai 指令說明</b>\n\n"
"<code>/ai status</code> — 顯示所有狀態\n"
"<code>/ai router on/off</code> — 開關 AIRouter\n"
"<code>/ai primary &lt;p&gt;</code> — 設主要 Provider\n"
"<code>/ai primary auto</code> — 恢復智慧路由\n"
"<code>/ai enable &lt;p&gt;</code> — 啟用 Provider\n"
"<code>/ai disable &lt;p&gt;</code> — 停用 Provider\n"
"<code>/ai cost</code> — 費用統計\n\n"
f"可用 Provider: {', '.join(sorted(VALID_PROVIDERS))}"
)
else:
return f"❌ 未知子指令: {sub}\n輸入 <code>/ai help</code> 查看說明"

View File

@@ -924,8 +924,19 @@ class OpenClawService:
# USE_AI_ROUTER=true → 新 AIRouterExecutor 路由
# USE_AI_ROUTER=false → 舊 if/else fallback chain (現狀)
# 回滾: kubectl set env deployment/awoooi-api USE_AI_ROUTER=false
# Phase 24 C: Redis 狀態覆蓋 env var (/ai router on/off)
# =================================================================
if settings.USE_AI_ROUTER:
# Redis 狀態優先 (Phase 24 C — 2026-04-03 ogt)
_use_ai_router = settings.USE_AI_ROUTER
try:
from src.services.ai_control import get_ai_router_enabled
_redis_override = await get_ai_router_enabled()
if _redis_override is not None:
_use_ai_router = _redis_override
except Exception:
pass
if _use_ai_router:
try:
# 2026-04-02 ogt: C2 修復 — 呼叫 AIRouter.route() 智慧路由 (非靜態 order)
# D1 意圖分類路由、D7 隱私保護 (DIAGNOSE/CODE_REVIEW 強制 local) 生效
@@ -937,10 +948,23 @@ class OpenClawService:
decision = await router.route(prompt, alert_context)
# Step 2: 從 RoutingDecision 建立 provider_order (主 + fallback)
# Phase 24 C: Redis primary_provider 覆蓋路由決策
provider_order = [decision.selected_provider.value] + [
p.value for p, _ in decision.fallback_chain
if p.value != decision.selected_provider.value
]
try:
from src.services.ai_control import get_primary_provider, is_provider_disabled
_primary = await get_primary_provider()
if _primary and _primary != decision.selected_provider.value:
# 把 primary 移到首位 (保留原始 fallback)
provider_order = [_primary] + [p for p in provider_order if p != _primary]
# 過濾被停用的 Provider
_filtered = [p for p in provider_order if not await is_provider_disabled(p)]
if _filtered:
provider_order = _filtered
except Exception:
pass
# Step 3: D7 隱私 — DIAGNOSE/CODE_REVIEW 強制 local
require_local = decision.intent in (IntentType.DIAGNOSE, IntentType.CODE_REVIEW)

View File

@@ -2797,10 +2797,28 @@ class TelegramGateway:
logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(e))
return
# 2. 顯示 "正在輸入中..."
# 2. /ai 指令攔截 (Phase 24 C — 2026-04-03 ogt)
# 白名單: OPENCLAW_TG_USER_WHITELIST (與審核白名單共用)
if text.strip().lower().startswith("/ai"):
whitelist = settings.get_tg_user_whitelist()
if whitelist and user_id not in whitelist:
logger.warning("telegram_ai_command_unauthorized", user_id=user_id)
await self.send_notification(
"⛔ 未授權:/ai 指令僅限白名單用戶",
parse_mode="HTML",
chat_id=chat_id,
)
return
from src.services.ai_control import handle_ai_command
response = await handle_ai_command(text.strip())
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
logger.info("telegram_ai_command_handled", user_id=user_id, text=text[:50])
return
# 3. 顯示 "正在輸入中..."
await self._send_chat_action(chat_id, "typing")
# 3. 呼叫 ChatManager 處理
# 4. 呼叫 ChatManager 處理
chat_manager = get_chat_manager()
response = await chat_manager.generate_response(
user_id=user_id,
@@ -2808,7 +2826,7 @@ class TelegramGateway:
message_text=text,
)
# 4. 回覆統帥 (定向回傳)
# 5. 回覆統帥 (定向回傳)
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
async def _send_chat_action(self, chat_id: int, action: str) -> None: