From 4d5a995718e6898dfbc073dce9ed2a80ca4f9db4 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 21:46:24 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E5=88=AA=E9=99=A4=E5=AD=A4=E5=85=92?= =?UTF-8?q?=20AI=20service=20=E4=B8=A6=E8=A3=9C=E9=BD=8A=20env=20=E7=AF=84?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ADR-017 Phase 3f-5:刪除未被 runtime 引用的 elephant_alpha_decision_router、telegram_ai_integration、watcher_agent;補 .env.example 的 Aider/AutoHeal/NVIDIA/OpenClaw/backup/report/PG sync 等實際讀取變數。 --- .env.example | 95 ++++ services/elephant_alpha_decision_router.py | 625 --------------------- services/telegram_ai_integration.py | 282 ---------- services/watcher_agent.py | 351 ------------ 4 files changed, 95 insertions(+), 1258 deletions(-) delete mode 100644 services/elephant_alpha_decision_router.py delete mode 100644 services/telegram_ai_integration.py delete mode 100644 services/watcher_agent.py diff --git a/.env.example b/.env.example index a8d162d..081c3f3 100644 --- a/.env.example +++ b/.env.example @@ -166,3 +166,98 @@ SSH_JUMP_HOST=192.168.0.110 SSH_JUMP_USER=wooo SSH_TARGET_HOST=192.168.0.188 SSH_TARGET_USER=ollama + +# ────────────────────────────────────────────────────────────────────────── +# AIOps / Autonomous Code Repair(ADR-014) +# ────────────────────────────────────────────────────────────────────────── + +# [選填] Aider 自動修復執行所在 SSH 主機(預設 110 Gateway) +HEAL_SSH_HOST=192.168.0.110 +HEAL_SSH_USER=wooo +HEAL_SSH_PORT=22 + +# [選填] SSH private key 路徑;未設定則使用 services/aider_heal_executor.py 預設值 +DEPLOY_SSH_KEY_PATH=/home/wooo/.ssh/id_ed25519 + +# [選填] 110 主機上的 repo 路徑 +AIDER_REPO_PATH=/home/wooo/ewoooc + +# [選填] Aider 使用的模型與 Ollama API endpoint +AIDER_MODEL=ollama/qwen2.5-coder:7b +OLLAMA_API_BASE=http://192.168.0.111:11434 + +# [選填] 自動修復安全閥 +AIDER_MAX_DIFF_LINES=50 +AIDER_MAX_HOURLY_FIX=5 +MOMO_BASE_URL=https://mo.wooo.work + +# ────────────────────────────────────────────────────────────────────────── +# Elephant Alpha / AutoHeal SSH 控制 +# ────────────────────────────────────────────────────────────────────────── + +# [選填] AutoHeal/Elephant Alpha SSH 跳板設定 +ELEPHANT_ALPHA_JUMP_HOST=192.168.0.110 +ELEPHANT_ALPHA_JUMP_USER=wooo +ELEPHANT_ALPHA_SSH_KEY_PATH=config/autoheal_id_ed25519 +ELEPHANT_ALPHA_SSH_PORT=22 +ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT=10 +ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT=60 + +# [選填] 自愈節流與狀態快取 +ELEPHANT_ALPHA_CACHE_DB=:memory: +ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN=30 +ELEPHANT_TIMEOUT=120 + +# ────────────────────────────────────────────────────────────────────────── +# NVIDIA NIM / OpenClaw / Internal Webhook +# ────────────────────────────────────────────────────────────────────────── + +# [選填] NemoTron / NIM / OpenClaw 相關服務共用 +NVIDIA_API_KEY=your_nvidia_api_key_here +INTERNAL_WEBHOOK_TOKEN=your_internal_webhook_token_here + +# [選填] OpenClaw Telegram bot +OPENCLAW_BOT_TOKEN=your_openclaw_bot_token_here +OPENCLAW_GROUP_ID=-1003940688311 +OPENCLAW_ALLOWED_USERS= + +# [選填] AI provider 選擇與外部資料源 +AI_PROVIDER=ollama +YOUTUBE_API_KEY= +GEMINI_TIMEOUT=60 + +# ────────────────────────────────────────────────────────────────────────── +# Ollama / MCP / 密碼政策 +# ────────────────────────────────────────────────────────────────────────── + +OLLAMA_HOST=https://ollama.wooo.work/ollama +OLLAMA_MODEL=gemma3:4b +OLLAMA_TIMEOUT=120 +OLLAMA_COPY_TIMEOUT=180 +MCP_CACHE_TTL_HOURS=24 +MCP_GEMINI_MODEL=gemini-2.0-flash + +PASSWORD_MIN_LENGTH=8 +PASSWORD_REQUIRE_UPPERCASE=true +PASSWORD_REQUIRE_LOWERCASE=true +PASSWORD_REQUIRE_DIGIT=true +PASSWORD_REQUIRE_SPECIAL=false +PASSWORD_SPECIAL_CHARS='!@#$%^&*()_+-=[]{}|;:,.<>?' +PASSWORD_EXPIRY_DAYS=90 + +# ────────────────────────────────────────────────────────────────────────── +# 備份 / 報表 / 同步 +# ────────────────────────────────────────────────────────────────────────── + +BACKUP_DIR=/app/data/db_backups +BACKUP_RETENTION_DAYS=7 +DB_CONTAINER=momo-db +REPORTS_DIR=/app/data/reports +DATABASE_PATH=data/momo_database.db + +PG_SYNC_ENABLED=false +PG_SYNC_INTERVAL=300 + +# [選填] 外部 BI 連結(模板全域變數) +METABASE_URL=https://mo.wooo.work/metabase +GRIST_URL=https://grist.wooo.work diff --git a/services/elephant_alpha_decision_router.py b/services/elephant_alpha_decision_router.py deleted file mode 100644 index deeafa3..0000000 --- a/services/elephant_alpha_decision_router.py +++ /dev/null @@ -1,625 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Elephant Alpha Intelligent Decision Router - -AI 3.0 Decision Intelligence: -- Multi-agent coordination routing -- Dynamic task allocation -- Performance-based routing -- Adaptive decision flows -""" - -import asyncio -import json -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional, Tuple -from dataclasses import dataclass -from enum import Enum -import numpy as np -from services.logger_manager import SystemLogger -from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision -from services.elephant_alpha_autonomous_engine import autonomous_engine -from database.manager import get_session -from sqlalchemy import text - -logger = SystemLogger("ElephantAlphaRouter").get_logger() - -class RoutingStrategy(Enum): - PERFORMANCE_BASED = "performance_based" - COST_OPTIMIZED = "cost_optimized" - SPEED_PRIORITY = "speed_priority" - QUALITY_FOCUS = "quality_focus" - ADAPTIVE = "adaptive" - -@dataclass -class AgentPerformance: - """Track agent performance metrics""" - agent_name: str - success_rate: float - avg_response_time: float - cost_per_decision: float - quality_score: float - reliability_score: float - last_updated: datetime - -@dataclass -class RoutingDecision: - """Routing decision metadata""" - task_id: str - task_type: str - selected_agents: List[str] - routing_strategy: RoutingStrategy - confidence: float - expected_duration: timedelta - estimated_cost: float - reasoning: str - -class ElephantAlphaDecisionRouter: - """ - Intelligent Decision Router for AI Agent Orchestration - - Features: - - Dynamic agent selection based on performance - - Multi-agent task coordination - - Adaptive routing strategies - - Performance monitoring and optimization - - Cost-aware routing decisions - """ - - def __init__(self): - self.agent_performance: Dict[str, AgentPerformance] = {} - self.routing_strategy = RoutingStrategy.ADAPTIVE - self.performance_history: List[Dict[str, Any]] = [] - - # Initialize agent performance metrics - self._initialize_agent_performance() - - def _initialize_agent_performance(self): - """Initialize baseline performance metrics for all agents""" - self.agent_performance = { - "hermes": AgentPerformance( - agent_name="hermes", - success_rate=0.85, - avg_response_time=120.0, # 2 minutes - cost_per_decision=0.0, - quality_score=0.88, - reliability_score=0.92, - last_updated=datetime.now() - ), - "nemotron": AgentPerformance( - agent_name="nemotron", - success_rate=0.90, - avg_response_time=30.0, # 30 seconds - cost_per_decision=0.0, - quality_score=0.82, - reliability_score=0.95, - last_updated=datetime.now() - ), - "openclaw": AgentPerformance( - agent_name="openclaw", - success_rate=0.87, - avg_response_time=90.0, # 1.5 minutes - cost_per_decision=0.0, - quality_score=0.91, - reliability_score=0.89, - last_updated=datetime.now() - ) - } - - async def route_decision_request(self, request: Dict[str, Any]) -> RoutingDecision: - """ - Route decision request to optimal agent combination - - Args: - request: Decision request with context and requirements - - Returns: - RoutingDecision: Optimal routing decision - """ - - # Analyze request requirements - task_analysis = self._analyze_task_requirements(request) - - # Select optimal routing strategy - strategy = self._select_routing_strategy(task_analysis) - - # Select best agents for the task - selected_agents = await self._select_agents(task_analysis, strategy) - - # Calculate routing metrics - duration, cost, confidence = self._calculate_routing_metrics( - selected_agents, task_analysis - ) - - # Generate routing decision - routing_decision = RoutingDecision( - task_id=request.get("task_id", f"task_{datetime.now().timestamp()}"), - task_type=task_analysis["task_type"], - selected_agents=selected_agents, - routing_strategy=strategy, - confidence=confidence, - expected_duration=duration, - estimated_cost=cost, - reasoning=self._generate_routing_reasoning( - selected_agents, strategy, task_analysis - ) - ) - - # Log routing decision for learning - await self._log_routing_decision(routing_decision, request) - - return routing_decision - - def _analyze_task_requirements(self, request: Dict[str, Any]) -> Dict[str, Any]: - """Analyze task requirements and characteristics""" - - task_type = request.get("task_type", "general_analysis") - urgency = request.get("urgency", "normal") - complexity = request.get("complexity", "medium") - quality_requirement = request.get("quality_requirement", "standard") - budget_constraint = request.get("budget_constraint", "none") - - # Determine required capabilities - required_capabilities = [] - - if "price" in task_type.lower() or "competition" in task_type.lower(): - required_capabilities.extend(["market_analysis", "price_intelligence"]) - - if "threat" in task_type.lower() or "alert" in task_type.lower(): - required_capabilities.extend(["threat_detection", "rapid_response"]) - - if "strategy" in task_type.lower() or "planning" in task_type.lower(): - required_capabilities.extend(["strategic_thinking", "long_term_planning"]) - - if "action" in task_type.lower() or "dispatch" in task_type.lower(): - required_capabilities.extend(["tool_calling", "execution"]) - - return { - "task_type": task_type, - "urgency": urgency, - "complexity": complexity, - "quality_requirement": quality_requirement, - "budget_constraint": budget_constraint, - "required_capabilities": required_capabilities, - "estimated_data_size": request.get("data_size", "medium"), - "requires_human_oversight": request.get("requires_human_oversight", False) - } - - def _select_routing_strategy(self, task_analysis: Dict[str, Any]) -> RoutingStrategy: - """Select optimal routing strategy based on task requirements""" - - urgency = task_analysis["urgency"] - quality = task_analysis["quality_requirement"] - budget = task_analysis["budget_constraint"] - - if urgency == "critical": - return RoutingStrategy.SPEED_PRIORITY - elif quality == "premium": - return RoutingStrategy.QUALITY_FOCUS - elif budget != "none": - return RoutingStrategy.COST_OPTIMIZED - else: - return RoutingStrategy.ADAPTIVE - - async def _select_agents(self, task_analysis: Dict[str, Any], - strategy: RoutingStrategy) -> List[str]: - """Select optimal agents based on strategy and task requirements""" - - required_capabilities = task_analysis["required_capabilities"] - complexity = task_analysis["complexity"] - - # Score each agent for this task - agent_scores = {} - - for agent_name, performance in self.agent_performance.items(): - score = self._calculate_agent_score( - agent_name, performance, task_analysis, strategy - ) - agent_scores[agent_name] = score - - # Sort agents by score - sorted_agents = sorted(agent_scores.items(), key=lambda x: x[1], reverse=True) - - # Select agents based on complexity and requirements - if complexity == "simple": - selected = [sorted_agents[0][0]] # Best single agent - elif complexity == "medium": - selected = [sorted_agents[0][0], sorted_agents[1][0]] # Top 2 agents - else: # complex - selected = [agent for agent, _ in sorted_agents] # All agents - - return selected - - def _calculate_agent_score(self, agent_name: str, performance: AgentPerformance, - task_analysis: Dict[str, Any], - strategy: RoutingStrategy) -> float: - """Calculate score for agent based on strategy and task""" - - base_score = 0.0 - - # Strategy-specific scoring - if strategy == RoutingStrategy.SPEED_PRIORITY: - base_score = (1.0 / performance.avg_response_time) * 100 - base_score *= performance.reliability_score - - elif strategy == RoutingStrategy.QUALITY_FOCUS: - base_score = performance.quality_score * 100 - base_score *= performance.success_rate - - elif strategy == RoutingStrategy.COST_OPTIMIZED: - base_score = (1.0 / max(performance.cost_per_decision, 0.01)) * 100 - base_score *= performance.success_rate - - elif strategy == RoutingStrategy.PERFORMANCE_BASED: - base_score = (performance.success_rate * performance.quality_score) * 100 - - else: # ADAPTIVE - # Balanced scoring - base_score = ( - performance.success_rate * 30 + - performance.quality_score * 25 + - performance.reliability_score * 25 + - (1.0 / performance.avg_response_time) * 20 - ) - - # Capability matching bonus - capability_bonus = self._calculate_capability_bonus( - agent_name, task_analysis["required_capabilities"] - ) - - return base_score + capability_bonus - - def _calculate_capability_bonus(self, agent_name: str, - required_capabilities: List[str]) -> float: - """Calculate bonus for agent capabilities matching requirements""" - - agent_capabilities = { - "hermes": ["market_analysis", "price_intelligence", "threat_detection"], - "nemotron": ["tool_calling", "execution", "rapid_response"], - "openclaw": ["strategic_thinking", "long_term_planning", "insight_generation"] - } - - if agent_name not in agent_capabilities: - return 0.0 - - capabilities = agent_capabilities[agent_name] - matches = sum(1 for cap in required_capabilities if cap in capabilities) - - if not required_capabilities: - return 0.0 - - return (matches / len(required_capabilities)) * 50 # Up to 50 point bonus - - def _calculate_routing_metrics(self, selected_agents: List[str], - task_analysis: Dict[str, Any]) -> Tuple[timedelta, float, float]: - """Calculate routing decision metrics""" - - # Duration estimation - total_response_time = sum( - self.agent_performance[agent].avg_response_time - for agent in selected_agents - ) - - # Add coordination overhead for multiple agents - if len(selected_agents) > 1: - coordination_overhead = (len(selected_agents) - 1) * 30 # 30s per additional agent - total_response_time += coordination_overhead - - duration = timedelta(seconds=total_response_time) - - # Cost estimation - total_cost = sum( - self.agent_performance[agent].cost_per_decision - for agent in selected_agents - ) - - # Confidence calculation - avg_success_rate = np.mean([ - self.agent_performance[agent].success_rate - for agent in selected_agents - ]) - - # Adjust confidence based on task complexity - complexity_penalty = { - "simple": 0.0, - "medium": -0.1, - "complex": -0.2 - }.get(task_analysis["complexity"], 0.0) - - confidence = max(0.5, min(0.95, avg_success_rate + complexity_penalty)) - - return duration, total_cost, confidence - - def _generate_routing_reasoning(self, selected_agents: List[str], - strategy: RoutingStrategy, - task_analysis: Dict[str, Any]) -> str: - """Generate human-readable reasoning for routing decision""" - - reasoning_parts = [] - - # Strategy explanation - strategy_explanations = { - RoutingStrategy.SPEED_PRIORITY: "Prioritized speed due to urgent requirements", - RoutingStrategy.QUALITY_FOCUS: "Prioritized quality for premium requirements", - RoutingStrategy.COST_OPTIMIZED: "Optimized for cost efficiency", - RoutingStrategy.PERFORMANCE_BASED: "Selected based on historical performance", - RoutingStrategy.ADAPTIVE: "Adaptive selection balancing multiple factors" - } - - reasoning_parts.append(f"Strategy: {strategy_explanations[strategy]}") - - # Agent selection reasoning - agent_reasoning = [] - for agent in selected_agents: - perf = self.agent_performance[agent] - agent_reasoning.append( - f"{agent}: {perf.success_rate:.1%} success rate, " - f"{perf.avg_response_time:.0f}s avg response" - ) - - reasoning_parts.append(f"Selected agents: {', '.join(agent_reasoning)}") - - # Task complexity consideration - complexity_notes = { - "simple": "Single agent sufficient for straightforward task", - "medium": "Two agents provide balanced capability for moderate complexity", - "complex": "Full agent coordination required for comprehensive analysis" - } - - reasoning_parts.append(f"Complexity: {complexity_notes[task_analysis['complexity']]}") - - return " | ".join(reasoning_parts) - - async def _log_routing_decision(self, decision: RoutingDecision, - request: Dict[str, Any]): - """Log routing decision for performance tracking""" - - log_entry = { - "timestamp": datetime.now().isoformat(), - "task_id": decision.task_id, - "task_type": decision.task_type, - "selected_agents": decision.selected_agents, - "strategy": decision.routing_strategy.value, - "confidence": decision.confidence, - "estimated_duration": decision.expected_duration.total_seconds(), - "estimated_cost": decision.estimated_cost, - "reasoning": decision.reasoning - } - - self.performance_history.append(log_entry) - - # Keep only last 1000 entries - if len(self.performance_history) > 1000: - self.performance_history = self.performance_history[-1000:] - - # W4: ADR-007 dual-write → ai_insights DB (non-fatal) - try: - with get_session() as session: - session.execute(text(""" - INSERT INTO ai_insights - (insight_type, content, confidence, created_by, status, metadata_json) - VALUES - (:itype, :content, :confidence, 'elephant_router', 'active', :metadata) - """), { - "itype": "routing_decision", - "content": f"[{decision.routing_strategy.value}] {decision.task_type} → {decision.selected_agents}", - "confidence": decision.confidence, - "metadata": json.dumps(log_entry), - }) - session.commit() - except Exception as e: - logger.warning("[ElephantAlphaRouter] DB routing log failed (non-fatal): %s", e) - - async def execute_routed_decision(self, routing_decision: RoutingDecision, - request: Dict[str, Any]) -> Dict[str, Any]: - """Execute decision through routed agents""" - - start_time = datetime.now() - results = {} - - try: - # Build business context for Elephant Alpha - context = { - **request, - "routing_decision": { - "selected_agents": routing_decision.selected_agents, - "strategy": routing_decision.routing_strategy.value, - "confidence": routing_decision.confidence - } - } - - # Get strategic coordination from Elephant Alpha - strategic_decision = await elephant_orchestrator.analyze_and_coordinate(context) - - # Execute through selected agents - for agent_name in routing_decision.selected_agents: - agent_result = await self._execute_agent_task( - agent_name, strategic_decision, request - ) - results[agent_name] = agent_result - - # Aggregate results - final_result = self._aggregate_agent_results(results, strategic_decision) - - # Update performance metrics - await self._update_performance_metrics( - routing_decision, final_result, start_time - ) - - return final_result - - except Exception as e: - logger.error(f"[ElephantAlphaRouter] Execution failed: {e}") - return { - "success": False, - "error": str(e), - "routing_decision": routing_decision.__dict__ - } - - async def _execute_agent_task(self, agent_name: str, - strategic_decision: StrategicDecision, - request: Dict[str, Any]) -> Dict[str, Any]: - """Execute task through specific agent""" - - try: - if agent_name == "hermes": - return await self._execute_hermes_task(strategic_decision, request) - elif agent_name == "nemotron": - return await self._execute_nemotron_task(strategic_decision, request) - elif agent_name == "openclaw": - return await self._execute_openclaw_task(strategic_decision, request) - else: - raise ValueError(f"Unknown agent: {agent_name}") - - except Exception as e: - logger.error(f"[ElephantAlphaRouter] Agent {agent_name} execution failed: {e}") - return {"success": False, "error": str(e), "agent": agent_name} - - # B7 FIX: all three execute methods must be async (called with await above) - async def _execute_hermes_task(self, strategic_decision: StrategicDecision, - request: Dict[str, Any]) -> Dict[str, Any]: - """Execute task through Hermes agent""" - from services.hermes_analyst_service import HermesAnalystService - hermes = HermesAnalystService() - try: - result = hermes.run() # sync call inside async is fine - return { - "success": result.success, - "agent": "hermes", - "result": result.__dict__ if hasattr(result, '__dict__') else str(result), - "execution_time": result.analysis_duration_sec if hasattr(result, 'analysis_duration_sec') else 0 - } - except Exception as e: - return {"success": False, "agent": "hermes", "error": str(e)} - - async def _execute_nemotron_task(self, strategic_decision: StrategicDecision, - request: Dict[str, Any]) -> Dict[str, Any]: - """Execute task through NemoTron agent (routed via Orchestrator plan)""" - return { - "success": True, - "agent": "nemotron", - "message": "NemoTron action dispatched via Orchestrator plan", - "execution_time": 5.0 - } - - async def _execute_openclaw_task(self, strategic_decision: StrategicDecision, - request: Dict[str, Any]) -> Dict[str, Any]: - """Execute task through OpenClaw agent""" - from services.openclaw_strategist_service import generate_weekly_strategy_report - try: - report = generate_weekly_strategy_report() - return { - "success": True, - "agent": "openclaw", - "result": {"report_length": len(report)}, - "execution_time": 45.0 - } - except Exception as e: - return {"success": False, "agent": "openclaw", "error": str(e)} - - def _aggregate_agent_results(self, results: Dict[str, Any], - strategic_decision: StrategicDecision) -> Dict[str, Any]: - """Aggregate results from multiple agents""" - - successful_agents = [ - agent for agent, result in results.items() - if result.get("success", False) - ] - - failed_agents = [ - agent for agent, result in results.items() - if not result.get("success", False) - ] - - # Calculate overall success - overall_success = len(successful_agents) > 0 and len(failed_agents) == 0 - - # Aggregate execution times - total_execution_time = sum( - result.get("execution_time", 0) - for result in results.values() - ) - - return { - "success": overall_success, - "strategic_decision": strategic_decision.__dict__, - "agent_results": results, - "successful_agents": successful_agents, - "failed_agents": failed_agents, - "total_execution_time": total_execution_time, - "agents_used": len(successful_agents) + len(failed_agents) - } - - async def _update_performance_metrics(self, routing_decision: RoutingDecision, - result: Dict[str, Any], start_time: datetime): - """Update agent performance metrics based on execution results""" - - execution_time = (datetime.now() - start_time).total_seconds() - success = result.get("success", False) - - # Update metrics for each used agent - for agent_name in routing_decision.selected_agents: - if agent_name in result.get("agent_results", {}): - await self._update_single_agent_performance( - agent_name, success, execution_time - ) - - async def _update_single_agent_performance(self, agent_name: str, - success: bool, execution_time: float): - """Update performance metrics for a single agent""" - - if agent_name not in self.agent_performance: - return - - performance = self.agent_performance[agent_name] - - # Exponential moving average updates - alpha = 0.1 # Learning rate - - # Update success rate - new_success = 1.0 if success else 0.0 - performance.success_rate = ( - alpha * new_success + - (1 - alpha) * performance.success_rate - ) - - # Update response time - performance.avg_response_time = ( - alpha * execution_time + - (1 - alpha) * performance.avg_response_time - ) - - # Update timestamp - performance.last_updated = datetime.now() - - logger.info("[ElephantAlphaRouter] Updated %s performance: success_rate=%.2f, response_time=%.1fs", - agent_name, performance.success_rate, performance.avg_response_time) - - # W4: ADR-007 — persist agent performance snapshot to DB (non-fatal) - try: - with get_session() as session: - session.execute(text(""" - INSERT INTO ai_insights - (insight_type, content, confidence, created_by, status, metadata_json) - VALUES - (:itype, :content, :confidence, 'elephant_router', 'active', :metadata) - """), { - "itype": "agent_performance_snapshot", - "content": f"{agent_name}: success={performance.success_rate:.3f} rt={performance.avg_response_time:.1f}s", - "confidence": performance.success_rate, - "metadata": json.dumps({ - "agent": agent_name, - "success_rate": performance.success_rate, - "avg_response_time": performance.avg_response_time, - "quality_score": performance.quality_score, - "reliability_score": performance.reliability_score, - "timestamp": datetime.now().isoformat(), - }), - }) - session.commit() - except Exception as e: - logger.warning("[ElephantAlphaRouter] DB perf snapshot failed (non-fatal): %s", e) - -# Global router instance -decision_router = ElephantAlphaDecisionRouter() diff --git a/services/telegram_ai_integration.py b/services/telegram_ai_integration.py deleted file mode 100644 index a96c9e4..0000000 --- a/services/telegram_ai_integration.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python3 -""" -Telegram Bot AI Integration -Integrate existing AI Orchestrator for natural language processing -All responses in Traditional Chinese -""" - -import asyncio -import logging -from typing import Dict, Any, Optional -from services.ai_orchestrator import AIOrchestrator -from services import openclaw_strategist_service -from datetime import datetime - -logger = logging.getLogger(__name__) - -class TelegramAIIntegration: - """Telegram Bot AI Integration for natural language understanding""" - - def __init__(self): - self.orchestrator = AIOrchestrator() - - async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: - """ - Process natural language query using existing AI infrastructure - - Args: - user_message: User's message in Traditional Chinese - user_id: Telegram user ID - chat_id: Telegram chat ID - - Returns: - Response dictionary with Traditional Chinese content - """ - try: - # Create session ID based on user and chat - session_id = f"tg_{user_id}_{chat_id}" - - # Prepare event for AI processing - event = { - "type": "telegram_query", - "source": "telegram_bot", - "timestamp": datetime.now().isoformat(), - "user_id": user_id, - "chat_id": chat_id, - "message": user_message, - "language": "zh-TW", # Traditional Chinese - "context": "telegram_group_chat" - } - - # L1: Semantic understanding (Hermes) - l1_result = await self.orchestrator.handle_l1(event, session_id) - if not l1_result or l1_result.get("metadata", {}).get("source") != "hermes_llm": - logger.warning( - f"[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級" - f"(session={session_id} source={( l1_result or {}).get('metadata', {}).get('source', 'none')})" - ) - - # Check if this is a complex query requiring L2 processing - if self._is_complex_query(user_message, l1_result): - # L2: Planning and execution (Nemotron) - l2_result = await self.orchestrator.handle_l2(event, session_id) - - dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response") - complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0) - - # 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察 - if dispatch_to == "openclaw" or complexity >= 0.7: - strategist_text = "" - try: - # 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop - loop = asyncio.get_running_loop() - strategist_text = await loop.run_in_executor( - None, - openclaw_strategist_service.generate_strategy_response, - user_message, - { - "intent": (l1_result or {}).get("intent"), - "user_id": user_id, - "chat_id": chat_id, - }, - ) - except Exception as e: - logger.error( - f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗" - f"({type(e).__name__}: {e})", - exc_info=True, - ) - strategist_text = "" - - response = self._format_complex_response(l1_result, l2_result, user_message) - if strategist_text: - response["response_text"] = strategist_text - response["strategist_used"] = True - return response - - return self._format_complex_response(l1_result, l2_result, user_message) - else: - # Simple query, handle directly - return self._format_simple_response(l1_result, user_message) - - except Exception as e: - logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True) - return self._format_error_response(user_message) - - def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool: - """Determine if query requires complex processing""" - complex_indicators = [ - "momo", " momo", "momo ", - "2026", "2025", "2024", # Date ranges - "brand", "brands", "brand:", "brands:", # Brand queries - "category", "categories", "category:", # Category queries - "report", "analysis", "ppt", "presentation", # Report generation - "compare", "comparison", "vs", "versus" # Comparison queries - ] - - message_lower = message.lower() - - # Check for complex indicators - for indicator in complex_indicators: - if indicator in message_lower: - return True - - # Check L1 analysis result - if l1_result.get("complexity_score", 0) > 0.7: - return True - - if l1_result.get("requires_data_fetch", False): - return True - - return False - - def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: - """Format response for simple queries""" - intent = l1_result.get("intent", "unknown") - confidence = l1_result.get("confidence", 0.0) - - # 繁體中文(台灣)回應模板 - responses = { - "greeting": { - "zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?", - "suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"], - }, - "help": { - "zh_tw": ( - "我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n" - "請直接輸入問題,或使用選單選擇功能。" - ), - "suggestions": ["業績表現", "商品趨勢", "市場分析"], - }, - "unknown": { - "zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。", - "suggestions": ["顯示主選單", "查看業績數據", "商品分析"], - }, - } - - # 若 L1 已帶 preliminary_answer(Hermes 生成或規則引擎),優先採用 - preliminary = (l1_result or {}).get("preliminary_answer", "") or "" - response_data = responses.get(intent, responses["unknown"]) - if preliminary: - response_data = {**response_data, "zh_tw": preliminary} - - return { - "success": True, - "type": "simple_response", - "intent": intent, - "confidence": confidence, - "response_text": response_data["zh_tw"], - "suggestions": response_data["suggestions"], - "show_menu": intent == "unknown" - } - - def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: - """Format response for complex queries requiring data fetching""" - action_plan = l2_result.get("action_plan", {}) - - # Extract relevant information - query_type = self._extract_query_type(original_message) - date_range = self._extract_date_range(original_message) - brands = self._extract_brands(original_message) - - # 繁體中文(台灣)回應 - query_type_zh = { - "sales analysis": "業績分析", - "product analysis": "商品分析", - "market intelligence": "市場情報", - "report generation": "報告產製", - "comparative analysis": "比較分析", - "general query": "一般查詢", - }.get(query_type, query_type) - - response_text = f"正在處理您的「{query_type_zh}」需求" - if date_range: - response_text += f",期間:{date_range}" - if brands: - response_text += f",品牌:{', '.join(brands)}" - response_text += "。分析準備中,稍候片刻…" - - return { - "success": True, - "type": "complex_response", - "query_type": query_type, - "date_range": date_range, - "brands": brands, - "action_plan": action_plan, - "response_text": response_text, - "requires_processing": True, - "processing_status": "queued" - } - - def _format_error_response(self, original_message: str) -> Dict[str, Any]: - """Format error response in Traditional Chinese""" - return { - "success": False, - "type": "error_response", - "response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。", - "error_suggestions": [ - "查看今日業績", - "商品排行榜", - "市場情報摘要", - "使用 /help 查看可用指令", - ], - "show_menu": True, - } - - def _extract_query_type(self, message: str) -> str: - """Extract type of query from message""" - if any(word in message.lower() for word in ["sales", "revenue", "performance"]): - return "sales analysis" - elif any(word in message.lower() for word in ["product", "brand", "item"]): - return "product analysis" - elif any(word in message.lower() for word in ["market", "trend", "intelligence"]): - return "market intelligence" - elif any(word in message.lower() for word in ["report", "ppt", "presentation"]): - return "report generation" - elif any(word in message.lower() for word in ["compare", "comparison", "vs"]): - return "comparative analysis" - else: - return "general query" - - def _extract_date_range(self, message: str) -> Optional[str]: - """Extract date range from message""" - import re - date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})' - match = re.search(date_pattern, message) - - if match: - start = match.group(1).replace('/', '-').replace('.', '-') - end = match.group(2).replace('/', '-').replace('.', '-') - return f"{start} 至 {end}" - - return None - - def _extract_brands(self, message: str) -> list: - """Extract brand names from message (Chinese and English)""" - brand_mapping = { - "nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II", - "kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior", - "chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy", - "gucci": "Gucci", "prada": "Prada", "versace": "Versace", - "armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors", - "neutrogena": "Neutrogena", "aveeno": "Aveeno", - "estee lauder": "Estee Lauder", "lancome": "Lancome", - "biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS", - "bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi", - "samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido" - } - - message_lower = message.lower() - found_brands = [] - for brand_key, brand_name in brand_mapping.items(): - if brand_key in message_lower and brand_name not in found_brands: - found_brands.append(brand_name) - return found_brands - - -# Global instance for use in telegram bot service -telegram_ai_integration = TelegramAIIntegration() - -async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: - """Convenience function for processing telegram queries""" - return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id) diff --git a/services/watcher_agent.py b/services/watcher_agent.py deleted file mode 100644 index 8a50be5..0000000 --- a/services/watcher_agent.py +++ /dev/null @@ -1,351 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Watcher Agent — 主動偵測與觸發 - -角色: -- 定期輪詢 sales snapshot,檢查銷量下滑或競品價格突漲 -- 發生異常時構建 event 並 dispatch 到 EventRouter -- 與 ActionPlanner 配合生成後續計畫 - -設計: -- 輕量級,無需額外 infra(僅用 PostgreSQL) -- 異常閾值可透過 env 調整 -- 與 OpenClaw 共享 agent_context 記憶 -""" - -import json -import logging -import os -import time -from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional - -import requests -from sqlalchemy import text - -from database.manager import get_session -from services.ai_orchestrator import AIOrchestrator -from services.event_router import dispatch - -sys_log = logging.getLogger(__name__) - -# ─── 環境設定 ──────────────────────────────────────────────── -SALES_SNAPSHOT_TABLE = os.getenv("WATCHER_SNAPSHOT_TABLE", "daily_sales_snapshot") -SALES_DROP_THRESHOLD = float(os.getenv("WATCHER_SALES_DROP_THRESHOLD", "0.20")) # 20% -PRICE_SURGE_THRESHOLD = float(os.getenv("WATCHER_PRICE_SURGE_THRESHOLD", "0.15")) # 15% -CACHE_TTL_MIN = int(os.getenv("WATCHER_CACHE_TTL_MIN", "30")) # 輪詻間隔 - -# ─── 共享上下文鍵 ──────────────────────────────────────────── -WATCHER_CTX_NS = "watcher" - - -class WatcherAgent: - """ - 主動偵測 Agent - 流程: - 1) 載入最近兩週銷售快照 - 2) 計算環比變化 - 3) 篩選異常 SKU - 4) 構建 event 並 dispatch - 5) 寫入 agent_context 供後續 Agent 使用 - """ - - def __init__(self, orchestrator: Optional[AIOrchestrator] = None): - self.orchestrator = orchestrator or AIOrchestrator() - - async def scan(self) -> int: - """執行一次掃描,回傳觸發的異常數""" - rows = await self._fetch_sales_snapshot() - if not rows: - sys_log.info("[Watcher] 無銷售快照,跳過掃描") - return 0 - - anomalies = self._detect_anomalies(rows) - if not anomalies: - sys_log.info("[Watcher] 未檢測到異常") - return 0 - - sys_log.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch") - triggered = 0 - for an in anomalies: - if await self._dispatch_anomaly(an): - triggered += 1 - return triggered - - async def track_outcomes(self, days: int = 7) -> None: - """ - 排程回撥:執行後 days 天後檢查 action_outcomes, - 並將結果回饋給 OpenClaw 學習。 - 這裡僅作佈署示意;實際排程由外部 scheduler 負責。 - """ - sys_log.info(f"[Watcher] 排程 outcome 回撥({days} 天後)") - # 範例: - # await outcome_tracker.schedule_follow_up(plan_id, sku, metric) - - # ── 內部方法 ──────────────────────────────────────────────── - - async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]: - """ - 讀取銷售快照。 - 欄位假設: - - sku - - name - - category - - sales_curr (最近7天銷售金額) - - sales_prev (前7天銷售金額) - - price_momo (MOMO 價格) - - price_pchome (PChome 價格) - - stock_status (庫存狀態) - 若實際欄位名不同,請依實際調整。 - """ - session = get_session() - try: - sql = text(f""" - SELECT sku, name, category, - COALESCE(sales_curr, 0) AS sales_curr, - COALESCE(sales_prev, 0) AS sales_prev, - price_momo, price_pchome, stock_status - FROM {SALES_SNAPSHOT_TABLE} - WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day' - LIMIT 500 - """) - result = session.execute(sql).fetchall() - return [dict(row._mapping) for row in result] - except Exception as e: - sys_log.error(f"[Watcher] 無法讀取快照: {e}") - return [] - finally: - session.close() - - def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - anomalies: List[Dict[str, Any]] = [] - for r in rows: - sku = r["sku"] - name = r["name"] - curr = float(r["sales_curr"] or 0) - prev = float(r["sales_prev"] or 1) # 避免除以 0 - pchome = r["price_pchome"] - momo = r["price_momo"] - stock = r.get("stock_status", "unknown") - - drop_pct = (curr - prev) / prev if prev else 0.0 - price_gap_pct = ((momo - pchome) / pchome * 100) if pchome else 0.0 - - reasons = [] - - # 銷量下滑異常 - if drop_pct <= -SALES_DROP_THRESHOLD: - reasons.append( - f"銷量下滑 {drop_pct:+.1%}(閾值 {SALES_DROP_THRESHOLD:+.0%})" - ) - - # 競品價格突漲(若我方價格低且差距擴大) - if price_gap_pct > PRICE_SURGE_THRESHOLD: - reasons.append( - f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差" - ) - - # 庫存危機(可擴充) - if stock in ("out_of_stock", "low_stock"): - reasons.append(f"庫存狀態: {stock}") - - if not reasons: - continue - - anomalies.append({ - "sku": sku, - "name": name, - "category": r.get("category", ""), - "drop_pct": drop_pct, - "price_gap_pct": price_gap_pct, - "reasons": reasons, - "stock": stock, - "momo_price": momo, - "pchome_price": pchome, - }) - return anomalies - - async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool: - """ - 依異常類型決定路由: - - 銷量下滑 + 價差微小 → L1(分析原因) - - 銷量下滑 + 價差大 → L2(規劃 + 審核) - - 競品價格突漲 → L2(防範被動) - """ - drop = anom["drop_pct"] - gap = anom["price_gap_pct"] - sku = anom["sku"] - name = anom["name"] - session_id = self._ensure_session(sku) - - # 構建 event payload(與 EventRouter 對齊) - event = { - "source": "watcher", - "event_type": "sales_anomaly", - "severity": "alert", - "title": f"銷售異常偵測 — {sku} {name}", - "summary": "; ".join(anom["reasons"]), - "payload": { - "sku": sku, - "name": name, - "category": anom["category"], - "drop_pct": anom["drop_pct"], - "price_gap_pct": anom["price_gap_pct"], - "stock": anom["stock"], - "momo_price": anom["momo_price"], - "pchome_price": anom["pchome_price"], - "sales_prev": anom.get("sales_prev"), - "sales_curr": anom.get("sales_curr"), - }, - "impact": "銷量下滑可能導致收入損失", - "status": "open", - } - - # 決策路由 - if drop <= -SALES_DROP_THRESHOLD and abs(gap) < PRICE_SURGE_THRESHOLD: - # 銷量下滑但價差微小 → 檢查是否非價格因素(缺貨/流量) - event["severity"] = "alert" - event["payload"]["non_price_factor"] = True - # 交由 L1 分析原因 - return await self._route_l1(event, session_id) - else: - # 銷量下滑 + 價差大 或 競品價格突漲 → L2 規劃 - event["severity"] = "alert" - return await self._route_l2(event, session_id) - - async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool: - """L1:Hermes 分析下滑原因""" - try: - result = await self.orchestrator.handle_l1(event, session_id) - sys_log.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}") - # 寫入共享上下文 - await self._save_context(session_id, "hermes", { - "summary": result.get("summary"), - "probable_cause": result.get("probable_cause"), - "actions": result.get("actions", []), - }) - return True - except Exception as e: - sys_log.error(f"[Watcher] L1 dispatch failed: {e}") - # 保底:直接通知 - await self._fallback_notify(event) - return False - - async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool: - """L2:NemoTron 規劃 + 審核閘""" - try: - result = await self.orchestrator.handle_l2(event, session_id) - sys_log.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}") - # 寫入共享上下文與 action_plans - await self._save_context(session_id, "nemotron", { - "plan": result.get("plan"), - "actions_taken": result.get("actions_taken", []), - }) - await self._save_action_plan(event, result.get("plan")) - return True - except Exception as e: - sys_log.error(f"[Watcher] L2 dispatch failed: {e}") - # 保底通知 - await self._fallback_notify(event) - return False - - async def _fallback_notify(self, event: Dict[str, Any]) -> None: - """當 AI 失敗時,直接通知並記錄原因""" - sku = event["payload"]["sku"] - name = event["payload"]["name"] - text = ( - f"⚠️ [Watcher Fallback] {sku} {name}\n" - f"原因:{event['summary']}\n" - f"建議:立即人工檢查銷售與庫存狀態。" - ) - await self._notify_telegram(text) - - async def _notify_telegram(self, text: str) -> bool: - """透過 Telegram 發送訊息""" - from services.telegram_templates import alert as render_alert - bot_token = os.getenv("TELEGRAM_BOT_TOKEN") - if not bot_token: - sys_log.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定") - return False - chat_ids_raw = os.getenv("TELEGRAM_CHAT_IDS", "[]") - try: - chat_ids = json.loads(chat_ids_raw) - except json.JSONDecodeError: - chat_ids = [] - url = f"https://api.telegram.org/bot{bot_token}/sendMessage" - payload = { - "chat_id": chat_ids[0] if chat_ids else -1003940688311, - "text": render_alert(title="銷售異常通知", content=text), - "parse_mode": "HTML", - } - try: - r = requests.post(url, json=payload, timeout=10) - return r.ok - except Exception as e: - sys_log.error(f"[Watcher] Telegram 通知失敗: {e}") - return False - - def _ensure_session(self, sku: str) -> str: - """保證 session_id 存在(簡化:skuid 作為 session)""" - return f"session:{sku}" - - async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None: - """寫入 agent_context(共享記憶)""" - session = get_session() - try: - # 刪除舊的 key - session.execute( - text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"), - {"sid": session_id, "ag": agent}, - ) - # 寫入新 context - session.execute( - text(""" - INSERT INTO agent_context - (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) - VALUES - (:sid, :ag, :ck, :cv, NOW(), :ttl) - """), - { - "sid": session_id, - "ag": agent, - "ck": "latest", - "cv": json.dumps(data, ensure_ascii=False), - "ttl": CACHE_TTL_MIN * 2, - }, - ) - session.commit() - except Exception as e: - session.rollback() - sys_log.warning(f"[Watcher] 寫入 context 失敗: {e}") - finally: - session.close() - - async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None: - """將 NemoTron 的 plan 寫入 action_plans""" - if not plan: - return - session = get_session() - try: - sku = event["payload"]["sku"] - session.execute( - text(""" - INSERT INTO action_plans - (session_id, plan_type, sku, payload, status, created_by) - VALUES - (:sid, :pt, :sku, :pl, 'pending', 'nemotron') - """), - { - "sid": f"session:{sku}", - "pt": plan.get("type", "price_adjust"), - "sku": sku, - "pl": json.dumps(plan, ensure_ascii=False), - }, - ) - session.commit() - except Exception as e: - session.rollback() - sys_log.warning(f"[Watcher] 寫入 action_plan 失敗: {e}") - finally: - session.close()