feat(ai): Phase 24 C — Telegram /ai 動態控制 + Redis 狀態管理
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
新增 ai_control.py: - /ai status: 所有 Provider 狀態 + 路由模式 - /ai router on/off: 動態切換 AIRouter (覆蓋 env var) - /ai primary <provider>: 設定主要 Provider - /ai enable/disable <provider>: 控制 Provider 啟停 - /ai cost: 費用統計 - 白名單: OPENCLAW_TG_USER_WHITELIST 保護 telegram_gateway.py: - _handle_chat_message 加入 /ai 指令攔截路由 - 白名單未授權返回警告 openclaw.py: - Redis 狀態覆蓋 env USE_AI_ROUTER (/ai router on/off 生效) - Redis primary_provider 覆蓋路由決策 (/ai primary 生效) - Redis disabled provider 過濾 (/ai disable 生效) Redis Keys: ai:control:use_router ai:control:primary_provider ai:control:disabled:<provider> Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
284
apps/api/src/services/ai_control.py
Normal file
284
apps/api/src/services/ai_control.py
Normal file
@@ -0,0 +1,284 @@
|
||||
"""
|
||||
AI Control Service — Phase 24 C (2026-04-03 ogt)
|
||||
|
||||
Telegram /ai 指令動態控制 AI Router 狀態
|
||||
- Redis 狀態優先於 env var
|
||||
- OPENCLAW_TG_USER_WHITELIST 白名單保護
|
||||
|
||||
Redis Keys:
|
||||
ai:control:use_router "true"/"false" (覆蓋 USE_AI_ROUTER env)
|
||||
ai:control:primary_provider "openclaw_nemo"/"gemini"/"ollama"/"claude"
|
||||
ai:control:disabled:<provider> "1" (TTL: 永久或手動清除)
|
||||
|
||||
Usage:
|
||||
/ai status — 顯示所有 Provider 狀態 + 當前路由模式
|
||||
/ai primary <p> — 設定主要 Provider (openclaw_nemo/gemini/ollama/claude)
|
||||
/ai enable <p> — 啟用特定 Provider
|
||||
/ai disable <p> — 停用特定 Provider
|
||||
/ai cost — 顯示費用統計
|
||||
/ai router on — 啟用 AIRouter (覆蓋 env var)
|
||||
/ai router off — 停用 AIRouter (回滾到舊 fallback chain)
|
||||
"""
|
||||
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# Redis Key 常數
|
||||
_AI_ROUTER_KEY = "ai:control:use_router"
|
||||
_PRIMARY_PROVIDER_KEY = "ai:control:primary_provider"
|
||||
_DISABLED_KEY_PREFIX = "ai:control:disabled:"
|
||||
|
||||
VALID_PROVIDERS = {"openclaw_nemo", "gemini", "ollama", "claude", "nemotron"}
|
||||
|
||||
|
||||
async def _get_redis():
|
||||
from src.core.redis_client import get_redis
|
||||
return get_redis()
|
||||
|
||||
|
||||
async def get_ai_router_enabled() -> bool | None:
|
||||
"""
|
||||
從 Redis 取得 AIRouter 啟用狀態。
|
||||
回傳 None 表示未設定,使用 env var (USE_AI_ROUTER)。
|
||||
"""
|
||||
try:
|
||||
r = await _get_redis()
|
||||
val = await r.get(_AI_ROUTER_KEY)
|
||||
if val is None:
|
||||
return None
|
||||
return val.decode() == "true" if isinstance(val, bytes) else val == "true"
|
||||
except Exception as e:
|
||||
logger.warning("ai_control_redis_read_failed", key=_AI_ROUTER_KEY, error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
async def set_ai_router_enabled(enabled: bool) -> bool:
|
||||
"""設定 AIRouter 啟用狀態到 Redis"""
|
||||
try:
|
||||
r = await _get_redis()
|
||||
await r.set(_AI_ROUTER_KEY, "true" if enabled else "false")
|
||||
logger.info("ai_control_router_set", enabled=enabled)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("ai_control_router_set_failed", error=str(e))
|
||||
return False
|
||||
|
||||
|
||||
async def get_primary_provider() -> str | None:
|
||||
"""從 Redis 取得主要 Provider,None 表示使用 Router 智慧路由"""
|
||||
try:
|
||||
r = await _get_redis()
|
||||
val = await r.get(_PRIMARY_PROVIDER_KEY)
|
||||
if val is None:
|
||||
return None
|
||||
return val.decode() if isinstance(val, bytes) else val
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
async def set_primary_provider(provider: str) -> bool:
|
||||
"""設定主要 Provider 到 Redis"""
|
||||
if provider not in VALID_PROVIDERS:
|
||||
return False
|
||||
try:
|
||||
r = await _get_redis()
|
||||
await r.set(_PRIMARY_PROVIDER_KEY, provider)
|
||||
logger.info("ai_control_primary_set", provider=provider)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("ai_control_primary_set_failed", error=str(e))
|
||||
return False
|
||||
|
||||
|
||||
async def clear_primary_provider() -> bool:
|
||||
"""清除主要 Provider 設定(回歸智慧路由)"""
|
||||
try:
|
||||
r = await _get_redis()
|
||||
await r.delete(_PRIMARY_PROVIDER_KEY)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
async def is_provider_disabled(provider: str) -> bool:
|
||||
"""檢查 Provider 是否被停用"""
|
||||
try:
|
||||
r = await _get_redis()
|
||||
val = await r.get(f"{_DISABLED_KEY_PREFIX}{provider}")
|
||||
return val is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
async def set_provider_disabled(provider: str, disabled: bool) -> bool:
|
||||
"""設定 Provider 啟用/停用"""
|
||||
if provider not in VALID_PROVIDERS:
|
||||
return False
|
||||
try:
|
||||
r = await _get_redis()
|
||||
key = f"{_DISABLED_KEY_PREFIX}{provider}"
|
||||
if disabled:
|
||||
await r.set(key, "1")
|
||||
else:
|
||||
await r.delete(key)
|
||||
logger.info("ai_control_provider_set", provider=provider, disabled=disabled)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("ai_control_provider_set_failed", error=str(e))
|
||||
return False
|
||||
|
||||
|
||||
async def get_status_summary() -> str:
|
||||
"""
|
||||
取得 AI 控制狀態摘要 (Telegram 格式 HTML)
|
||||
"""
|
||||
from src.core.config import get_settings
|
||||
settings = get_settings()
|
||||
|
||||
# AIRouter 狀態
|
||||
redis_router = await get_ai_router_enabled()
|
||||
if redis_router is not None:
|
||||
router_status = "✅ ON (Redis 覆蓋)" if redis_router else "❌ OFF (Redis 覆蓋)"
|
||||
else:
|
||||
router_status = "✅ ON (env)" if settings.USE_AI_ROUTER else "❌ OFF (env)"
|
||||
|
||||
# Primary Provider
|
||||
primary = await get_primary_provider()
|
||||
primary_str = f"🎯 {primary}" if primary else "🤖 智慧路由 (AIRouter 決定)"
|
||||
|
||||
# Provider 狀態
|
||||
provider_lines = []
|
||||
for p in ["openclaw_nemo", "gemini", "ollama", "claude", "nemotron"]:
|
||||
disabled = await is_provider_disabled(p)
|
||||
icon = "❌" if disabled else "✅"
|
||||
provider_lines.append(f" {icon} {p}")
|
||||
|
||||
# 費用統計
|
||||
cost_lines = []
|
||||
try:
|
||||
r = await _get_redis()
|
||||
for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]:
|
||||
val = await r.get(f"ai_rate:total_cost:{p}")
|
||||
if val:
|
||||
cost = float(val)
|
||||
if cost > 0:
|
||||
cost_lines.append(f" 💰 {p}: ${cost:.4f}")
|
||||
except Exception:
|
||||
cost_lines.append(" (費用統計不可用)")
|
||||
|
||||
providers_str = "\n".join(provider_lines)
|
||||
costs_str = "\n".join(cost_lines) if cost_lines else " (尚無費用記錄)"
|
||||
|
||||
return (
|
||||
f"<b>🤖 AI Router 控制面板</b>\n\n"
|
||||
f"<b>路由模式</b>: {router_status}\n"
|
||||
f"<b>主要 Provider</b>: {primary_str}\n\n"
|
||||
f"<b>Provider 狀態</b>:\n{providers_str}\n\n"
|
||||
f"<b>費用統計</b>:\n{costs_str}\n\n"
|
||||
f"<code>/ai primary <provider></code> 切換\n"
|
||||
f"<code>/ai router on/off</code> 開關 AIRouter\n"
|
||||
f"<code>/ai enable/disable <provider></code> 控制 Provider"
|
||||
)
|
||||
|
||||
|
||||
async def handle_ai_command(text: str) -> str:
|
||||
"""
|
||||
處理 /ai 指令,回傳 Telegram HTML 回應。
|
||||
|
||||
Args:
|
||||
text: 完整訊息文字 (e.g., "/ai status", "/ai primary gemini")
|
||||
|
||||
Returns:
|
||||
HTML 格式回應字串
|
||||
"""
|
||||
parts = text.strip().split()
|
||||
if len(parts) < 2:
|
||||
return await get_status_summary()
|
||||
|
||||
sub = parts[1].lower()
|
||||
|
||||
if sub == "status":
|
||||
return await get_status_summary()
|
||||
|
||||
elif sub == "router":
|
||||
if len(parts) < 3:
|
||||
return "用法: <code>/ai router on</code> 或 <code>/ai router off</code>"
|
||||
action = parts[2].lower()
|
||||
if action in ("on", "true", "enable"):
|
||||
ok = await set_ai_router_enabled(True)
|
||||
return "✅ AIRouter 已啟用 (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗"
|
||||
elif action in ("off", "false", "disable"):
|
||||
ok = await set_ai_router_enabled(False)
|
||||
return "⚠️ AIRouter 已停用,回滾舊 fallback chain (Redis 覆蓋)" if ok else "❌ 寫入 Redis 失敗"
|
||||
else:
|
||||
return f"❌ 未知動作: {action},請用 on/off"
|
||||
|
||||
elif sub == "primary":
|
||||
if len(parts) < 3:
|
||||
current = await get_primary_provider()
|
||||
if current:
|
||||
return f"當前主要 Provider: <b>{current}</b>\n用法: <code>/ai primary <provider></code> 或 <code>/ai primary auto</code>"
|
||||
return "當前: 🤖 智慧路由\n用法: <code>/ai primary <provider></code>"
|
||||
provider = parts[2].lower()
|
||||
if provider == "auto":
|
||||
ok = await clear_primary_provider()
|
||||
return "✅ 已清除主要 Provider,恢復智慧路由" if ok else "❌ 操作失敗"
|
||||
if provider not in VALID_PROVIDERS:
|
||||
return f"❌ 未知 Provider: {provider}\n可用: {', '.join(sorted(VALID_PROVIDERS))}"
|
||||
ok = await set_primary_provider(provider)
|
||||
return f"✅ 主要 Provider 已設為 <b>{provider}</b>" if ok else "❌ 寫入 Redis 失敗"
|
||||
|
||||
elif sub == "enable":
|
||||
if len(parts) < 3:
|
||||
return "用法: <code>/ai enable <provider></code>"
|
||||
provider = parts[2].lower()
|
||||
if provider not in VALID_PROVIDERS:
|
||||
return f"❌ 未知 Provider: {provider}"
|
||||
ok = await set_provider_disabled(provider, False)
|
||||
return f"✅ {provider} 已啟用" if ok else "❌ 操作失敗"
|
||||
|
||||
elif sub == "disable":
|
||||
if len(parts) < 3:
|
||||
return "用法: <code>/ai disable <provider></code>"
|
||||
provider = parts[2].lower()
|
||||
if provider not in VALID_PROVIDERS:
|
||||
return f"❌ 未知 Provider: {provider}"
|
||||
ok = await set_provider_disabled(provider, True)
|
||||
return f"⚠️ {provider} 已停用" if ok else "❌ 操作失敗"
|
||||
|
||||
elif sub == "cost":
|
||||
lines = ["<b>💰 AI 費用統計</b>\n"]
|
||||
try:
|
||||
r = await _get_redis()
|
||||
total = 0.0
|
||||
for p in ["openclaw_nemo", "gemini", "claude", "nemotron"]:
|
||||
val = await r.get(f"ai_rate:total_cost:{p}")
|
||||
if val:
|
||||
cost = float(val)
|
||||
total += cost
|
||||
if cost > 0:
|
||||
lines.append(f" {p}: ${cost:.4f}")
|
||||
if total > 0:
|
||||
lines.append(f"\n <b>總計: ${total:.4f}</b>")
|
||||
else:
|
||||
lines.append(" (尚無費用記錄)")
|
||||
except Exception as e:
|
||||
lines.append(f" ⚠️ 費用查詢失敗: {e}")
|
||||
return "\n".join(lines)
|
||||
|
||||
elif sub == "help":
|
||||
return (
|
||||
"<b>/ai 指令說明</b>\n\n"
|
||||
"<code>/ai status</code> — 顯示所有狀態\n"
|
||||
"<code>/ai router on/off</code> — 開關 AIRouter\n"
|
||||
"<code>/ai primary <p></code> — 設主要 Provider\n"
|
||||
"<code>/ai primary auto</code> — 恢復智慧路由\n"
|
||||
"<code>/ai enable <p></code> — 啟用 Provider\n"
|
||||
"<code>/ai disable <p></code> — 停用 Provider\n"
|
||||
"<code>/ai cost</code> — 費用統計\n\n"
|
||||
f"可用 Provider: {', '.join(sorted(VALID_PROVIDERS))}"
|
||||
)
|
||||
|
||||
else:
|
||||
return f"❌ 未知子指令: {sub}\n輸入 <code>/ai help</code> 查看說明"
|
||||
@@ -924,8 +924,19 @@ class OpenClawService:
|
||||
# USE_AI_ROUTER=true → 新 AIRouterExecutor 路由
|
||||
# USE_AI_ROUTER=false → 舊 if/else fallback chain (現狀)
|
||||
# 回滾: kubectl set env deployment/awoooi-api USE_AI_ROUTER=false
|
||||
# Phase 24 C: Redis 狀態覆蓋 env var (/ai router on/off)
|
||||
# =================================================================
|
||||
if settings.USE_AI_ROUTER:
|
||||
# Redis 狀態優先 (Phase 24 C — 2026-04-03 ogt)
|
||||
_use_ai_router = settings.USE_AI_ROUTER
|
||||
try:
|
||||
from src.services.ai_control import get_ai_router_enabled
|
||||
_redis_override = await get_ai_router_enabled()
|
||||
if _redis_override is not None:
|
||||
_use_ai_router = _redis_override
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _use_ai_router:
|
||||
try:
|
||||
# 2026-04-02 ogt: C2 修復 — 呼叫 AIRouter.route() 智慧路由 (非靜態 order)
|
||||
# D1 意圖分類路由、D7 隱私保護 (DIAGNOSE/CODE_REVIEW 強制 local) 生效
|
||||
@@ -937,10 +948,23 @@ class OpenClawService:
|
||||
decision = await router.route(prompt, alert_context)
|
||||
|
||||
# Step 2: 從 RoutingDecision 建立 provider_order (主 + fallback)
|
||||
# Phase 24 C: Redis primary_provider 覆蓋路由決策
|
||||
provider_order = [decision.selected_provider.value] + [
|
||||
p.value for p, _ in decision.fallback_chain
|
||||
if p.value != decision.selected_provider.value
|
||||
]
|
||||
try:
|
||||
from src.services.ai_control import get_primary_provider, is_provider_disabled
|
||||
_primary = await get_primary_provider()
|
||||
if _primary and _primary != decision.selected_provider.value:
|
||||
# 把 primary 移到首位 (保留原始 fallback)
|
||||
provider_order = [_primary] + [p for p in provider_order if p != _primary]
|
||||
# 過濾被停用的 Provider
|
||||
_filtered = [p for p in provider_order if not await is_provider_disabled(p)]
|
||||
if _filtered:
|
||||
provider_order = _filtered
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Step 3: D7 隱私 — DIAGNOSE/CODE_REVIEW 強制 local
|
||||
require_local = decision.intent in (IntentType.DIAGNOSE, IntentType.CODE_REVIEW)
|
||||
|
||||
@@ -2797,10 +2797,28 @@ class TelegramGateway:
|
||||
logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(e))
|
||||
return
|
||||
|
||||
# 2. 顯示 "正在輸入中..."
|
||||
# 2. /ai 指令攔截 (Phase 24 C — 2026-04-03 ogt)
|
||||
# 白名單: OPENCLAW_TG_USER_WHITELIST (與審核白名單共用)
|
||||
if text.strip().lower().startswith("/ai"):
|
||||
whitelist = settings.get_tg_user_whitelist()
|
||||
if whitelist and user_id not in whitelist:
|
||||
logger.warning("telegram_ai_command_unauthorized", user_id=user_id)
|
||||
await self.send_notification(
|
||||
"⛔ 未授權:/ai 指令僅限白名單用戶",
|
||||
parse_mode="HTML",
|
||||
chat_id=chat_id,
|
||||
)
|
||||
return
|
||||
from src.services.ai_control import handle_ai_command
|
||||
response = await handle_ai_command(text.strip())
|
||||
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
|
||||
logger.info("telegram_ai_command_handled", user_id=user_id, text=text[:50])
|
||||
return
|
||||
|
||||
# 3. 顯示 "正在輸入中..."
|
||||
await self._send_chat_action(chat_id, "typing")
|
||||
|
||||
# 3. 呼叫 ChatManager 處理
|
||||
# 4. 呼叫 ChatManager 處理
|
||||
chat_manager = get_chat_manager()
|
||||
response = await chat_manager.generate_response(
|
||||
user_id=user_id,
|
||||
@@ -2808,7 +2826,7 @@ class TelegramGateway:
|
||||
message_text=text,
|
||||
)
|
||||
|
||||
# 4. 回覆統帥 (定向回傳)
|
||||
# 5. 回覆統帥 (定向回傳)
|
||||
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
|
||||
|
||||
async def _send_chat_action(self, chat_id: int, action: str) -> None:
|
||||
|
||||
Reference in New Issue
Block a user