diff --git a/apps/api/src/services/chat_manager.py b/apps/api/src/services/chat_manager.py new file mode 100644 index 00000000..3030bd53 --- /dev/null +++ b/apps/api/src/services/chat_manager.py @@ -0,0 +1,117 @@ +""" +AWOOOI Chat Manager - 統帥對話核心 +=================================== +Phase 21.5: 實作 Telegram 互動對話功能 + +職責: +1. 整合系統上下文 (K3s 狀態, 最近告警, 目前時間) +2. 決定對話風格 (OpenClaw 專業風 vs Nemo 參謀風) +3. 調用 LLM (Nemo-4B / Gemini) 產出回應 +4. 遵守 SOUL.md Nothing.tech 純淨美學 + +2026-03-31 ogt: 初版建立 +""" + +import structlog +from datetime import datetime +from src.utils.timezone import now_taipei +from src.services.nvidia_provider import get_nvidia_provider +from src.repositories.k8s_repository import get_k8s_repository +from src.repositories.incident_repository import get_incident_repository + +logger = structlog.get_logger(__name__) + +class ChatManager: + """ + AWOOOI 對話管理器 - 系統的大腦與聲帶 + """ + + def __init__(self): + self.nvidia = get_nvidia_provider() + self.k8s = get_k8s_repository() + self.incidents = get_incident_repository() + + async def get_system_context(self) -> str: + """ + 收集系統即時上下文,供 LLM 參考 + """ + now = now_taipei() + + # 1. K3s 狀態 + k8s_status = await self.k8s.get_pod_status_summary(namespace="awoooi-prod") + cluster_info = f"Cluster: {k8s_status['running']}/{k8s_status['total']} Pods Running" + if k8s_status['problem_pods']: + cluster_info += f", {len(k8s_status['problem_pods'])} anomalies detected." + + # 2. 最近告警 (取 3 筆) + active_incidents = await self.incidents.get_active() + incident_summary = "None" + if active_incidents: + lines = [] + for inc in active_incidents[:3]: + lines.append(f"- {inc.incident_id}: {inc.status.value} (Severity: {inc.severity.value})") + incident_summary = "\n".join(lines) + + context = f""" +## Current System Context (Taipei Time: {now.strftime('%Y-%m-%d %H:%M:%S')}) +- Environment: AWOOOI Production (K3s) +- {cluster_info} +- Active Incidents: +{incident_summary} +""" + return context + + async def generate_response( + self, + user_id: int, + username: str, + message_text: str + ) -> str: + """ + 根據統帥訊息產生回覆 + """ + system_context = await self.get_system_context() + + # 判定是否在跟 Nemo 對話 + is_asking_nemo = "nemo" in message_text.lower() + + role_description = "You are OpenClaw, the AI operations assistant for AWOOOI platform." + if is_asking_nemo: + role_description = "You are Nemo-4B, the elite AI tactical advisor for AWOOOI. Address the user as 'Supreme Commander' (統帥)." + + system_prompt = f"""{role_description} +{system_context} + +## Guidelines: +1. Keep responses extremely concise and professional (Nothing.tech aesthetic). +2. For status queries, provide precise data. +3. For general chat, be supportive but focused on operations. +4. Language: Preferred Traditional Chinese (繁體中文). +5. No emojis except for functional ones (🚨, ✅, 📊). +""" + + try: + # 優先使用 NVIDIA Nemo-4B + response, provider, success, tokens, cost = await self.nvidia.chat( + prompt=f"{system_prompt}\n\nCommander's Message: {message_text}", + model="nvidia/nemotron-mini-4b-instruct", + max_tokens=1024 + ) + + if not success: + return "🛸 抱歉統帥,Nemo 參謀暫時離線。請稍後再試。" + + return response.strip() + + except Exception as e: + logger.exception("chat_generation_error", error=str(e)) + return "⚠️ 通訊鏈路異常,無法聯繫 AI 腦區。" + +# Singleton +_chat_manager = None + +def get_chat_manager() -> ChatManager: + global _chat_manager + if _chat_manager is None: + _chat_manager = ChatManager() + return _chat_manager diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 4745b226..c200bb2d 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -38,6 +38,7 @@ from src.services.security_interceptor import ( UserNotWhitelistedError, get_security_interceptor, ) +from src.services.chat_manager import get_chat_manager # ============================================================================= # Snooze/Silence Redis Keys (2026-03-27 P1 優化) @@ -2259,7 +2260,7 @@ class TelegramGateway: payload = { "offset": self._last_update_id + 1, "timeout": LONG_POLLING_TIMEOUT, - "allowed_updates": ["callback_query"], # 僅監聽按鈕點擊 + "allowed_updates": ["callback_query", "message"], # 監聽按鈕與文字訊息 } response = await self._http_client.post( @@ -2290,25 +2291,32 @@ class TelegramGateway: """ update_id = update.get("update_id") callback_query = update.get("callback_query") + message = update.get("message") - if not callback_query: - logger.debug("telegram_update_ignored", update_id=update_id, reason="not callback_query") + if not callback_query and not message: + logger.debug("telegram_update_ignored", update_id=update_id, reason="unsupported update type") return - # 解析 callback_query + if callback_query: + await self._handle_callback_query(update_id, callback_query) + elif message: + await self._handle_chat_message(update_id, message) + + async def _handle_callback_query(self, update_id: int, callback_query: dict) -> None: + """處理按鈕點擊更新""" callback_query_id = callback_query.get("id") callback_data = callback_query.get("data") user = callback_query.get("from", {}) user_id = user.get("id") - username = user.get("username") or user.get("first_name") or str(user_id) - message = callback_query.get("message", {}) - message_id = message.get("message_id") - original_text = message.get("text", "") if not all([callback_query_id, callback_data, user_id]): logger.warning("telegram_callback_invalid", update_id=update_id) return + username = user.get("username") or user.get("first_name") or str(user_id) + original_text = callback_query.get("message", {}).get("text", "") + message_id = callback_query.get("message", {}).get("message_id") + logger.info( "telegram_callback_received", update_id=update_id, @@ -2336,6 +2344,55 @@ class TelegramGateway: message_id=message_id, ) + async def _handle_chat_message(self, update_id: int, message: dict) -> None: + """處理统帥的文字訊息""" + text = message.get("text") + user = message.get("from", {}) + user_id = user.get("id") + chat_id = message.get("chat", {}).get("id") + username = user.get("username") or user.get("first_name") or str(user_id) + + if not text or not user_id: + return + + logger.info( + "telegram_chat_received", + update_id=update_id, + user_id=user_id, + username=username, + text=text[:50], + ) + + # 1. 安全檢查 (ADR-012) + try: + interceptor = get_security_interceptor() + await interceptor.intercept_telegram(user_id) + except Exception as e: + logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(e)) + return + + # 2. 顯示 "正在輸入中..." + await self._send_chat_action(chat_id, "typing") + + # 3. 呼叫 ChatManager 處理 + chat_manager = get_chat_manager() + response = await chat_manager.generate_response( + user_id=user_id, + username=username, + message_text=text, + ) + + # 4. 回覆統帥 + await self.send_notification(response, parse_mode="HTML") + + async def _send_chat_action(self, chat_id: int, action: str) -> None: + """發送聊天狀態 (e.g., typing)""" + if not self._http_client: return + try: + url = f"{self.api_url}/sendChatAction" + await self._http_client.post(url, json={"chat_id": chat_id, "action": action}) + except: pass + async def _execute_approval_action( self, action: str,