chore: 刪除孤兒 AI service 並補齊 env 範例

ADR-017 Phase 3f-5:刪除未被 runtime 引用的 elephant_alpha_decision_router、telegram_ai_integration、watcher_agent;補 .env.example 的 Aider/AutoHeal/NVIDIA/OpenClaw/backup/report/PG sync 等實際讀取變數。
This commit is contained in:
OoO
2026-04-29 21:46:24 +08:00
parent 53edcc0077
commit 4d5a995718
4 changed files with 95 additions and 1258 deletions

View File

@@ -166,3 +166,98 @@ SSH_JUMP_HOST=192.168.0.110
SSH_JUMP_USER=wooo
SSH_TARGET_HOST=192.168.0.188
SSH_TARGET_USER=ollama
# ──────────────────────────────────────────────────────────────────────────
# AIOps / Autonomous Code RepairADR-014
# ──────────────────────────────────────────────────────────────────────────
# [選填] Aider 自動修復執行所在 SSH 主機(預設 110 Gateway
HEAL_SSH_HOST=192.168.0.110
HEAL_SSH_USER=wooo
HEAL_SSH_PORT=22
# [選填] SSH private key 路徑;未設定則使用 services/aider_heal_executor.py 預設值
DEPLOY_SSH_KEY_PATH=/home/wooo/.ssh/id_ed25519
# [選填] 110 主機上的 repo 路徑
AIDER_REPO_PATH=/home/wooo/ewoooc
# [選填] Aider 使用的模型與 Ollama API endpoint
AIDER_MODEL=ollama/qwen2.5-coder:7b
OLLAMA_API_BASE=http://192.168.0.111:11434
# [選填] 自動修復安全閥
AIDER_MAX_DIFF_LINES=50
AIDER_MAX_HOURLY_FIX=5
MOMO_BASE_URL=https://mo.wooo.work
# ──────────────────────────────────────────────────────────────────────────
# Elephant Alpha / AutoHeal SSH 控制
# ──────────────────────────────────────────────────────────────────────────
# [選填] AutoHeal/Elephant Alpha SSH 跳板設定
ELEPHANT_ALPHA_JUMP_HOST=192.168.0.110
ELEPHANT_ALPHA_JUMP_USER=wooo
ELEPHANT_ALPHA_SSH_KEY_PATH=config/autoheal_id_ed25519
ELEPHANT_ALPHA_SSH_PORT=22
ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT=10
ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT=60
# [選填] 自愈節流與狀態快取
ELEPHANT_ALPHA_CACHE_DB=:memory:
ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN=30
ELEPHANT_TIMEOUT=120
# ──────────────────────────────────────────────────────────────────────────
# NVIDIA NIM / OpenClaw / Internal Webhook
# ──────────────────────────────────────────────────────────────────────────
# [選填] NemoTron / NIM / OpenClaw 相關服務共用
NVIDIA_API_KEY=your_nvidia_api_key_here
INTERNAL_WEBHOOK_TOKEN=your_internal_webhook_token_here
# [選填] OpenClaw Telegram bot
OPENCLAW_BOT_TOKEN=your_openclaw_bot_token_here
OPENCLAW_GROUP_ID=-1003940688311
OPENCLAW_ALLOWED_USERS=
# [選填] AI provider 選擇與外部資料源
AI_PROVIDER=ollama
YOUTUBE_API_KEY=
GEMINI_TIMEOUT=60
# ──────────────────────────────────────────────────────────────────────────
# Ollama / MCP / 密碼政策
# ──────────────────────────────────────────────────────────────────────────
OLLAMA_HOST=https://ollama.wooo.work/ollama
OLLAMA_MODEL=gemma3:4b
OLLAMA_TIMEOUT=120
OLLAMA_COPY_TIMEOUT=180
MCP_CACHE_TTL_HOURS=24
MCP_GEMINI_MODEL=gemini-2.0-flash
PASSWORD_MIN_LENGTH=8
PASSWORD_REQUIRE_UPPERCASE=true
PASSWORD_REQUIRE_LOWERCASE=true
PASSWORD_REQUIRE_DIGIT=true
PASSWORD_REQUIRE_SPECIAL=false
PASSWORD_SPECIAL_CHARS='!@#$%^&*()_+-=[]{}|;:,.<>?'
PASSWORD_EXPIRY_DAYS=90
# ──────────────────────────────────────────────────────────────────────────
# 備份 / 報表 / 同步
# ──────────────────────────────────────────────────────────────────────────
BACKUP_DIR=/app/data/db_backups
BACKUP_RETENTION_DAYS=7
DB_CONTAINER=momo-db
REPORTS_DIR=/app/data/reports
DATABASE_PATH=data/momo_database.db
PG_SYNC_ENABLED=false
PG_SYNC_INTERVAL=300
# [選填] 外部 BI 連結(模板全域變數)
METABASE_URL=https://mo.wooo.work/metabase
GRIST_URL=https://grist.wooo.work

View File

@@ -1,625 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Elephant Alpha Intelligent Decision Router
AI 3.0 Decision Intelligence:
- Multi-agent coordination routing
- Dynamic task allocation
- Performance-based routing
- Adaptive decision flows
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import numpy as np
from services.logger_manager import SystemLogger
from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision
from services.elephant_alpha_autonomous_engine import autonomous_engine
from database.manager import get_session
from sqlalchemy import text
logger = SystemLogger("ElephantAlphaRouter").get_logger()
class RoutingStrategy(Enum):
PERFORMANCE_BASED = "performance_based"
COST_OPTIMIZED = "cost_optimized"
SPEED_PRIORITY = "speed_priority"
QUALITY_FOCUS = "quality_focus"
ADAPTIVE = "adaptive"
@dataclass
class AgentPerformance:
"""Track agent performance metrics"""
agent_name: str
success_rate: float
avg_response_time: float
cost_per_decision: float
quality_score: float
reliability_score: float
last_updated: datetime
@dataclass
class RoutingDecision:
"""Routing decision metadata"""
task_id: str
task_type: str
selected_agents: List[str]
routing_strategy: RoutingStrategy
confidence: float
expected_duration: timedelta
estimated_cost: float
reasoning: str
class ElephantAlphaDecisionRouter:
"""
Intelligent Decision Router for AI Agent Orchestration
Features:
- Dynamic agent selection based on performance
- Multi-agent task coordination
- Adaptive routing strategies
- Performance monitoring and optimization
- Cost-aware routing decisions
"""
def __init__(self):
self.agent_performance: Dict[str, AgentPerformance] = {}
self.routing_strategy = RoutingStrategy.ADAPTIVE
self.performance_history: List[Dict[str, Any]] = []
# Initialize agent performance metrics
self._initialize_agent_performance()
def _initialize_agent_performance(self):
"""Initialize baseline performance metrics for all agents"""
self.agent_performance = {
"hermes": AgentPerformance(
agent_name="hermes",
success_rate=0.85,
avg_response_time=120.0, # 2 minutes
cost_per_decision=0.0,
quality_score=0.88,
reliability_score=0.92,
last_updated=datetime.now()
),
"nemotron": AgentPerformance(
agent_name="nemotron",
success_rate=0.90,
avg_response_time=30.0, # 30 seconds
cost_per_decision=0.0,
quality_score=0.82,
reliability_score=0.95,
last_updated=datetime.now()
),
"openclaw": AgentPerformance(
agent_name="openclaw",
success_rate=0.87,
avg_response_time=90.0, # 1.5 minutes
cost_per_decision=0.0,
quality_score=0.91,
reliability_score=0.89,
last_updated=datetime.now()
)
}
async def route_decision_request(self, request: Dict[str, Any]) -> RoutingDecision:
"""
Route decision request to optimal agent combination
Args:
request: Decision request with context and requirements
Returns:
RoutingDecision: Optimal routing decision
"""
# Analyze request requirements
task_analysis = self._analyze_task_requirements(request)
# Select optimal routing strategy
strategy = self._select_routing_strategy(task_analysis)
# Select best agents for the task
selected_agents = await self._select_agents(task_analysis, strategy)
# Calculate routing metrics
duration, cost, confidence = self._calculate_routing_metrics(
selected_agents, task_analysis
)
# Generate routing decision
routing_decision = RoutingDecision(
task_id=request.get("task_id", f"task_{datetime.now().timestamp()}"),
task_type=task_analysis["task_type"],
selected_agents=selected_agents,
routing_strategy=strategy,
confidence=confidence,
expected_duration=duration,
estimated_cost=cost,
reasoning=self._generate_routing_reasoning(
selected_agents, strategy, task_analysis
)
)
# Log routing decision for learning
await self._log_routing_decision(routing_decision, request)
return routing_decision
def _analyze_task_requirements(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze task requirements and characteristics"""
task_type = request.get("task_type", "general_analysis")
urgency = request.get("urgency", "normal")
complexity = request.get("complexity", "medium")
quality_requirement = request.get("quality_requirement", "standard")
budget_constraint = request.get("budget_constraint", "none")
# Determine required capabilities
required_capabilities = []
if "price" in task_type.lower() or "competition" in task_type.lower():
required_capabilities.extend(["market_analysis", "price_intelligence"])
if "threat" in task_type.lower() or "alert" in task_type.lower():
required_capabilities.extend(["threat_detection", "rapid_response"])
if "strategy" in task_type.lower() or "planning" in task_type.lower():
required_capabilities.extend(["strategic_thinking", "long_term_planning"])
if "action" in task_type.lower() or "dispatch" in task_type.lower():
required_capabilities.extend(["tool_calling", "execution"])
return {
"task_type": task_type,
"urgency": urgency,
"complexity": complexity,
"quality_requirement": quality_requirement,
"budget_constraint": budget_constraint,
"required_capabilities": required_capabilities,
"estimated_data_size": request.get("data_size", "medium"),
"requires_human_oversight": request.get("requires_human_oversight", False)
}
def _select_routing_strategy(self, task_analysis: Dict[str, Any]) -> RoutingStrategy:
"""Select optimal routing strategy based on task requirements"""
urgency = task_analysis["urgency"]
quality = task_analysis["quality_requirement"]
budget = task_analysis["budget_constraint"]
if urgency == "critical":
return RoutingStrategy.SPEED_PRIORITY
elif quality == "premium":
return RoutingStrategy.QUALITY_FOCUS
elif budget != "none":
return RoutingStrategy.COST_OPTIMIZED
else:
return RoutingStrategy.ADAPTIVE
async def _select_agents(self, task_analysis: Dict[str, Any],
strategy: RoutingStrategy) -> List[str]:
"""Select optimal agents based on strategy and task requirements"""
required_capabilities = task_analysis["required_capabilities"]
complexity = task_analysis["complexity"]
# Score each agent for this task
agent_scores = {}
for agent_name, performance in self.agent_performance.items():
score = self._calculate_agent_score(
agent_name, performance, task_analysis, strategy
)
agent_scores[agent_name] = score
# Sort agents by score
sorted_agents = sorted(agent_scores.items(), key=lambda x: x[1], reverse=True)
# Select agents based on complexity and requirements
if complexity == "simple":
selected = [sorted_agents[0][0]] # Best single agent
elif complexity == "medium":
selected = [sorted_agents[0][0], sorted_agents[1][0]] # Top 2 agents
else: # complex
selected = [agent for agent, _ in sorted_agents] # All agents
return selected
def _calculate_agent_score(self, agent_name: str, performance: AgentPerformance,
task_analysis: Dict[str, Any],
strategy: RoutingStrategy) -> float:
"""Calculate score for agent based on strategy and task"""
base_score = 0.0
# Strategy-specific scoring
if strategy == RoutingStrategy.SPEED_PRIORITY:
base_score = (1.0 / performance.avg_response_time) * 100
base_score *= performance.reliability_score
elif strategy == RoutingStrategy.QUALITY_FOCUS:
base_score = performance.quality_score * 100
base_score *= performance.success_rate
elif strategy == RoutingStrategy.COST_OPTIMIZED:
base_score = (1.0 / max(performance.cost_per_decision, 0.01)) * 100
base_score *= performance.success_rate
elif strategy == RoutingStrategy.PERFORMANCE_BASED:
base_score = (performance.success_rate * performance.quality_score) * 100
else: # ADAPTIVE
# Balanced scoring
base_score = (
performance.success_rate * 30 +
performance.quality_score * 25 +
performance.reliability_score * 25 +
(1.0 / performance.avg_response_time) * 20
)
# Capability matching bonus
capability_bonus = self._calculate_capability_bonus(
agent_name, task_analysis["required_capabilities"]
)
return base_score + capability_bonus
def _calculate_capability_bonus(self, agent_name: str,
required_capabilities: List[str]) -> float:
"""Calculate bonus for agent capabilities matching requirements"""
agent_capabilities = {
"hermes": ["market_analysis", "price_intelligence", "threat_detection"],
"nemotron": ["tool_calling", "execution", "rapid_response"],
"openclaw": ["strategic_thinking", "long_term_planning", "insight_generation"]
}
if agent_name not in agent_capabilities:
return 0.0
capabilities = agent_capabilities[agent_name]
matches = sum(1 for cap in required_capabilities if cap in capabilities)
if not required_capabilities:
return 0.0
return (matches / len(required_capabilities)) * 50 # Up to 50 point bonus
def _calculate_routing_metrics(self, selected_agents: List[str],
task_analysis: Dict[str, Any]) -> Tuple[timedelta, float, float]:
"""Calculate routing decision metrics"""
# Duration estimation
total_response_time = sum(
self.agent_performance[agent].avg_response_time
for agent in selected_agents
)
# Add coordination overhead for multiple agents
if len(selected_agents) > 1:
coordination_overhead = (len(selected_agents) - 1) * 30 # 30s per additional agent
total_response_time += coordination_overhead
duration = timedelta(seconds=total_response_time)
# Cost estimation
total_cost = sum(
self.agent_performance[agent].cost_per_decision
for agent in selected_agents
)
# Confidence calculation
avg_success_rate = np.mean([
self.agent_performance[agent].success_rate
for agent in selected_agents
])
# Adjust confidence based on task complexity
complexity_penalty = {
"simple": 0.0,
"medium": -0.1,
"complex": -0.2
}.get(task_analysis["complexity"], 0.0)
confidence = max(0.5, min(0.95, avg_success_rate + complexity_penalty))
return duration, total_cost, confidence
def _generate_routing_reasoning(self, selected_agents: List[str],
strategy: RoutingStrategy,
task_analysis: Dict[str, Any]) -> str:
"""Generate human-readable reasoning for routing decision"""
reasoning_parts = []
# Strategy explanation
strategy_explanations = {
RoutingStrategy.SPEED_PRIORITY: "Prioritized speed due to urgent requirements",
RoutingStrategy.QUALITY_FOCUS: "Prioritized quality for premium requirements",
RoutingStrategy.COST_OPTIMIZED: "Optimized for cost efficiency",
RoutingStrategy.PERFORMANCE_BASED: "Selected based on historical performance",
RoutingStrategy.ADAPTIVE: "Adaptive selection balancing multiple factors"
}
reasoning_parts.append(f"Strategy: {strategy_explanations[strategy]}")
# Agent selection reasoning
agent_reasoning = []
for agent in selected_agents:
perf = self.agent_performance[agent]
agent_reasoning.append(
f"{agent}: {perf.success_rate:.1%} success rate, "
f"{perf.avg_response_time:.0f}s avg response"
)
reasoning_parts.append(f"Selected agents: {', '.join(agent_reasoning)}")
# Task complexity consideration
complexity_notes = {
"simple": "Single agent sufficient for straightforward task",
"medium": "Two agents provide balanced capability for moderate complexity",
"complex": "Full agent coordination required for comprehensive analysis"
}
reasoning_parts.append(f"Complexity: {complexity_notes[task_analysis['complexity']]}")
return " | ".join(reasoning_parts)
async def _log_routing_decision(self, decision: RoutingDecision,
request: Dict[str, Any]):
"""Log routing decision for performance tracking"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"task_id": decision.task_id,
"task_type": decision.task_type,
"selected_agents": decision.selected_agents,
"strategy": decision.routing_strategy.value,
"confidence": decision.confidence,
"estimated_duration": decision.expected_duration.total_seconds(),
"estimated_cost": decision.estimated_cost,
"reasoning": decision.reasoning
}
self.performance_history.append(log_entry)
# Keep only last 1000 entries
if len(self.performance_history) > 1000:
self.performance_history = self.performance_history[-1000:]
# W4: ADR-007 dual-write → ai_insights DB (non-fatal)
try:
with get_session() as session:
session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES
(:itype, :content, :confidence, 'elephant_router', 'active', :metadata)
"""), {
"itype": "routing_decision",
"content": f"[{decision.routing_strategy.value}] {decision.task_type}{decision.selected_agents}",
"confidence": decision.confidence,
"metadata": json.dumps(log_entry),
})
session.commit()
except Exception as e:
logger.warning("[ElephantAlphaRouter] DB routing log failed (non-fatal): %s", e)
async def execute_routed_decision(self, routing_decision: RoutingDecision,
request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute decision through routed agents"""
start_time = datetime.now()
results = {}
try:
# Build business context for Elephant Alpha
context = {
**request,
"routing_decision": {
"selected_agents": routing_decision.selected_agents,
"strategy": routing_decision.routing_strategy.value,
"confidence": routing_decision.confidence
}
}
# Get strategic coordination from Elephant Alpha
strategic_decision = await elephant_orchestrator.analyze_and_coordinate(context)
# Execute through selected agents
for agent_name in routing_decision.selected_agents:
agent_result = await self._execute_agent_task(
agent_name, strategic_decision, request
)
results[agent_name] = agent_result
# Aggregate results
final_result = self._aggregate_agent_results(results, strategic_decision)
# Update performance metrics
await self._update_performance_metrics(
routing_decision, final_result, start_time
)
return final_result
except Exception as e:
logger.error(f"[ElephantAlphaRouter] Execution failed: {e}")
return {
"success": False,
"error": str(e),
"routing_decision": routing_decision.__dict__
}
async def _execute_agent_task(self, agent_name: str,
strategic_decision: StrategicDecision,
request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task through specific agent"""
try:
if agent_name == "hermes":
return await self._execute_hermes_task(strategic_decision, request)
elif agent_name == "nemotron":
return await self._execute_nemotron_task(strategic_decision, request)
elif agent_name == "openclaw":
return await self._execute_openclaw_task(strategic_decision, request)
else:
raise ValueError(f"Unknown agent: {agent_name}")
except Exception as e:
logger.error(f"[ElephantAlphaRouter] Agent {agent_name} execution failed: {e}")
return {"success": False, "error": str(e), "agent": agent_name}
# B7 FIX: all three execute methods must be async (called with await above)
async def _execute_hermes_task(self, strategic_decision: StrategicDecision,
request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task through Hermes agent"""
from services.hermes_analyst_service import HermesAnalystService
hermes = HermesAnalystService()
try:
result = hermes.run() # sync call inside async is fine
return {
"success": result.success,
"agent": "hermes",
"result": result.__dict__ if hasattr(result, '__dict__') else str(result),
"execution_time": result.analysis_duration_sec if hasattr(result, 'analysis_duration_sec') else 0
}
except Exception as e:
return {"success": False, "agent": "hermes", "error": str(e)}
async def _execute_nemotron_task(self, strategic_decision: StrategicDecision,
request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task through NemoTron agent (routed via Orchestrator plan)"""
return {
"success": True,
"agent": "nemotron",
"message": "NemoTron action dispatched via Orchestrator plan",
"execution_time": 5.0
}
async def _execute_openclaw_task(self, strategic_decision: StrategicDecision,
request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task through OpenClaw agent"""
from services.openclaw_strategist_service import generate_weekly_strategy_report
try:
report = generate_weekly_strategy_report()
return {
"success": True,
"agent": "openclaw",
"result": {"report_length": len(report)},
"execution_time": 45.0
}
except Exception as e:
return {"success": False, "agent": "openclaw", "error": str(e)}
def _aggregate_agent_results(self, results: Dict[str, Any],
strategic_decision: StrategicDecision) -> Dict[str, Any]:
"""Aggregate results from multiple agents"""
successful_agents = [
agent for agent, result in results.items()
if result.get("success", False)
]
failed_agents = [
agent for agent, result in results.items()
if not result.get("success", False)
]
# Calculate overall success
overall_success = len(successful_agents) > 0 and len(failed_agents) == 0
# Aggregate execution times
total_execution_time = sum(
result.get("execution_time", 0)
for result in results.values()
)
return {
"success": overall_success,
"strategic_decision": strategic_decision.__dict__,
"agent_results": results,
"successful_agents": successful_agents,
"failed_agents": failed_agents,
"total_execution_time": total_execution_time,
"agents_used": len(successful_agents) + len(failed_agents)
}
async def _update_performance_metrics(self, routing_decision: RoutingDecision,
result: Dict[str, Any], start_time: datetime):
"""Update agent performance metrics based on execution results"""
execution_time = (datetime.now() - start_time).total_seconds()
success = result.get("success", False)
# Update metrics for each used agent
for agent_name in routing_decision.selected_agents:
if agent_name in result.get("agent_results", {}):
await self._update_single_agent_performance(
agent_name, success, execution_time
)
async def _update_single_agent_performance(self, agent_name: str,
success: bool, execution_time: float):
"""Update performance metrics for a single agent"""
if agent_name not in self.agent_performance:
return
performance = self.agent_performance[agent_name]
# Exponential moving average updates
alpha = 0.1 # Learning rate
# Update success rate
new_success = 1.0 if success else 0.0
performance.success_rate = (
alpha * new_success +
(1 - alpha) * performance.success_rate
)
# Update response time
performance.avg_response_time = (
alpha * execution_time +
(1 - alpha) * performance.avg_response_time
)
# Update timestamp
performance.last_updated = datetime.now()
logger.info("[ElephantAlphaRouter] Updated %s performance: success_rate=%.2f, response_time=%.1fs",
agent_name, performance.success_rate, performance.avg_response_time)
# W4: ADR-007 — persist agent performance snapshot to DB (non-fatal)
try:
with get_session() as session:
session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES
(:itype, :content, :confidence, 'elephant_router', 'active', :metadata)
"""), {
"itype": "agent_performance_snapshot",
"content": f"{agent_name}: success={performance.success_rate:.3f} rt={performance.avg_response_time:.1f}s",
"confidence": performance.success_rate,
"metadata": json.dumps({
"agent": agent_name,
"success_rate": performance.success_rate,
"avg_response_time": performance.avg_response_time,
"quality_score": performance.quality_score,
"reliability_score": performance.reliability_score,
"timestamp": datetime.now().isoformat(),
}),
})
session.commit()
except Exception as e:
logger.warning("[ElephantAlphaRouter] DB perf snapshot failed (non-fatal): %s", e)
# Global router instance
decision_router = ElephantAlphaDecisionRouter()

View File

@@ -1,282 +0,0 @@
#!/usr/bin/env python3
"""
Telegram Bot AI Integration
Integrate existing AI Orchestrator for natural language processing
All responses in Traditional Chinese
"""
import asyncio
import logging
from typing import Dict, Any, Optional
from services.ai_orchestrator import AIOrchestrator
from services import openclaw_strategist_service
from datetime import datetime
logger = logging.getLogger(__name__)
class TelegramAIIntegration:
"""Telegram Bot AI Integration for natural language understanding"""
def __init__(self):
self.orchestrator = AIOrchestrator()
async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""
Process natural language query using existing AI infrastructure
Args:
user_message: User's message in Traditional Chinese
user_id: Telegram user ID
chat_id: Telegram chat ID
Returns:
Response dictionary with Traditional Chinese content
"""
try:
# Create session ID based on user and chat
session_id = f"tg_{user_id}_{chat_id}"
# Prepare event for AI processing
event = {
"type": "telegram_query",
"source": "telegram_bot",
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"chat_id": chat_id,
"message": user_message,
"language": "zh-TW", # Traditional Chinese
"context": "telegram_group_chat"
}
# L1: Semantic understanding (Hermes)
l1_result = await self.orchestrator.handle_l1(event, session_id)
if not l1_result or l1_result.get("metadata", {}).get("source") != "hermes_llm":
logger.warning(
f"[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級"
f"session={session_id} source={( l1_result or {}).get('metadata', {}).get('source', 'none')}"
)
# Check if this is a complex query requiring L2 processing
if self._is_complex_query(user_message, l1_result):
# L2: Planning and execution (Nemotron)
l2_result = await self.orchestrator.handle_l2(event, session_id)
dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0)
# 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察
if dispatch_to == "openclaw" or complexity >= 0.7:
strategist_text = ""
try:
# 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop
loop = asyncio.get_running_loop()
strategist_text = await loop.run_in_executor(
None,
openclaw_strategist_service.generate_strategy_response,
user_message,
{
"intent": (l1_result or {}).get("intent"),
"user_id": user_id,
"chat_id": chat_id,
},
)
except Exception as e:
logger.error(
f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗"
f"{type(e).__name__}: {e}",
exc_info=True,
)
strategist_text = ""
response = self._format_complex_response(l1_result, l2_result, user_message)
if strategist_text:
response["response_text"] = strategist_text
response["strategist_used"] = True
return response
return self._format_complex_response(l1_result, l2_result, user_message)
else:
# Simple query, handle directly
return self._format_simple_response(l1_result, user_message)
except Exception as e:
logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True)
return self._format_error_response(user_message)
def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool:
"""Determine if query requires complex processing"""
complex_indicators = [
"momo", " momo", "momo ",
"2026", "2025", "2024", # Date ranges
"brand", "brands", "brand:", "brands:", # Brand queries
"category", "categories", "category:", # Category queries
"report", "analysis", "ppt", "presentation", # Report generation
"compare", "comparison", "vs", "versus" # Comparison queries
]
message_lower = message.lower()
# Check for complex indicators
for indicator in complex_indicators:
if indicator in message_lower:
return True
# Check L1 analysis result
if l1_result.get("complexity_score", 0) > 0.7:
return True
if l1_result.get("requires_data_fetch", False):
return True
return False
def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for simple queries"""
intent = l1_result.get("intent", "unknown")
confidence = l1_result.get("confidence", 0.0)
# 繁體中文(台灣)回應模板
responses = {
"greeting": {
"zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
"suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
},
"help": {
"zh_tw": (
"我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
"請直接輸入問題,或使用選單選擇功能。"
),
"suggestions": ["業績表現", "商品趨勢", "市場分析"],
},
"unknown": {
"zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
"suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
},
}
# 若 L1 已帶 preliminary_answerHermes 生成或規則引擎),優先採用
preliminary = (l1_result or {}).get("preliminary_answer", "") or ""
response_data = responses.get(intent, responses["unknown"])
if preliminary:
response_data = {**response_data, "zh_tw": preliminary}
return {
"success": True,
"type": "simple_response",
"intent": intent,
"confidence": confidence,
"response_text": response_data["zh_tw"],
"suggestions": response_data["suggestions"],
"show_menu": intent == "unknown"
}
def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for complex queries requiring data fetching"""
action_plan = l2_result.get("action_plan", {})
# Extract relevant information
query_type = self._extract_query_type(original_message)
date_range = self._extract_date_range(original_message)
brands = self._extract_brands(original_message)
# 繁體中文(台灣)回應
query_type_zh = {
"sales analysis": "業績分析",
"product analysis": "商品分析",
"market intelligence": "市場情報",
"report generation": "報告產製",
"comparative analysis": "比較分析",
"general query": "一般查詢",
}.get(query_type, query_type)
response_text = f"正在處理您的「{query_type_zh}」需求"
if date_range:
response_text += f",期間:{date_range}"
if brands:
response_text += f",品牌:{', '.join(brands)}"
response_text += "。分析準備中,稍候片刻…"
return {
"success": True,
"type": "complex_response",
"query_type": query_type,
"date_range": date_range,
"brands": brands,
"action_plan": action_plan,
"response_text": response_text,
"requires_processing": True,
"processing_status": "queued"
}
def _format_error_response(self, original_message: str) -> Dict[str, Any]:
"""Format error response in Traditional Chinese"""
return {
"success": False,
"type": "error_response",
"response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
"error_suggestions": [
"查看今日業績",
"商品排行榜",
"市場情報摘要",
"使用 /help 查看可用指令",
],
"show_menu": True,
}
def _extract_query_type(self, message: str) -> str:
"""Extract type of query from message"""
if any(word in message.lower() for word in ["sales", "revenue", "performance"]):
return "sales analysis"
elif any(word in message.lower() for word in ["product", "brand", "item"]):
return "product analysis"
elif any(word in message.lower() for word in ["market", "trend", "intelligence"]):
return "market intelligence"
elif any(word in message.lower() for word in ["report", "ppt", "presentation"]):
return "report generation"
elif any(word in message.lower() for word in ["compare", "comparison", "vs"]):
return "comparative analysis"
else:
return "general query"
def _extract_date_range(self, message: str) -> Optional[str]:
"""Extract date range from message"""
import re
date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})'
match = re.search(date_pattern, message)
if match:
start = match.group(1).replace('/', '-').replace('.', '-')
end = match.group(2).replace('/', '-').replace('.', '-')
return f"{start}{end}"
return None
def _extract_brands(self, message: str) -> list:
"""Extract brand names from message (Chinese and English)"""
brand_mapping = {
"nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II",
"kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior",
"chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy",
"gucci": "Gucci", "prada": "Prada", "versace": "Versace",
"armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors",
"neutrogena": "Neutrogena", "aveeno": "Aveeno",
"estee lauder": "Estee Lauder", "lancome": "Lancome",
"biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS",
"bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi",
"samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido"
}
message_lower = message.lower()
found_brands = []
for brand_key, brand_name in brand_mapping.items():
if brand_key in message_lower and brand_name not in found_brands:
found_brands.append(brand_name)
return found_brands
# Global instance for use in telegram bot service
telegram_ai_integration = TelegramAIIntegration()
async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""Convenience function for processing telegram queries"""
return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id)

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Watcher Agent — 主動偵測與觸發
角色:
- 定期輪詢 sales snapshot檢查銷量下滑或競品價格突漲
- 發生異常時構建 event 並 dispatch 到 EventRouter
- 與 ActionPlanner 配合生成後續計畫
設計:
- 輕量級,無需額外 infra僅用 PostgreSQL
- 異常閾值可透過 env 調整
- 與 OpenClaw 共享 agent_context 記憶
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import requests
from sqlalchemy import text
from database.manager import get_session
from services.ai_orchestrator import AIOrchestrator
from services.event_router import dispatch
sys_log = logging.getLogger(__name__)
# ─── 環境設定 ────────────────────────────────────────────────
SALES_SNAPSHOT_TABLE = os.getenv("WATCHER_SNAPSHOT_TABLE", "daily_sales_snapshot")
SALES_DROP_THRESHOLD = float(os.getenv("WATCHER_SALES_DROP_THRESHOLD", "0.20")) # 20%
PRICE_SURGE_THRESHOLD = float(os.getenv("WATCHER_PRICE_SURGE_THRESHOLD", "0.15")) # 15%
CACHE_TTL_MIN = int(os.getenv("WATCHER_CACHE_TTL_MIN", "30")) # 輪詻間隔
# ─── 共享上下文鍵 ────────────────────────────────────────────
WATCHER_CTX_NS = "watcher"
class WatcherAgent:
"""
主動偵測 Agent
流程:
1) 載入最近兩週銷售快照
2) 計算環比變化
3) 篩選異常 SKU
4) 構建 event 並 dispatch
5) 寫入 agent_context 供後續 Agent 使用
"""
def __init__(self, orchestrator: Optional[AIOrchestrator] = None):
self.orchestrator = orchestrator or AIOrchestrator()
async def scan(self) -> int:
"""執行一次掃描,回傳觸發的異常數"""
rows = await self._fetch_sales_snapshot()
if not rows:
sys_log.info("[Watcher] 無銷售快照,跳過掃描")
return 0
anomalies = self._detect_anomalies(rows)
if not anomalies:
sys_log.info("[Watcher] 未檢測到異常")
return 0
sys_log.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch")
triggered = 0
for an in anomalies:
if await self._dispatch_anomaly(an):
triggered += 1
return triggered
async def track_outcomes(self, days: int = 7) -> None:
"""
排程回撥:執行後 days 天後檢查 action_outcomes
並將結果回饋給 OpenClaw 學習。
這裡僅作佈署示意;實際排程由外部 scheduler 負責。
"""
sys_log.info(f"[Watcher] 排程 outcome 回撥({days} 天後)")
# 範例:
# await outcome_tracker.schedule_follow_up(plan_id, sku, metric)
# ── 內部方法 ────────────────────────────────────────────────
async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]:
"""
讀取銷售快照。
欄位假設:
- sku
- name
- category
- sales_curr (最近7天銷售金額)
- sales_prev (前7天銷售金額)
- price_momo (MOMO 價格)
- price_pchome (PChome 價格)
- stock_status (庫存狀態)
若實際欄位名不同,請依實際調整。
"""
session = get_session()
try:
sql = text(f"""
SELECT sku, name, category,
COALESCE(sales_curr, 0) AS sales_curr,
COALESCE(sales_prev, 0) AS sales_prev,
price_momo, price_pchome, stock_status
FROM {SALES_SNAPSHOT_TABLE}
WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day'
LIMIT 500
""")
result = session.execute(sql).fetchall()
return [dict(row._mapping) for row in result]
except Exception as e:
sys_log.error(f"[Watcher] 無法讀取快照: {e}")
return []
finally:
session.close()
def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
anomalies: List[Dict[str, Any]] = []
for r in rows:
sku = r["sku"]
name = r["name"]
curr = float(r["sales_curr"] or 0)
prev = float(r["sales_prev"] or 1) # 避免除以 0
pchome = r["price_pchome"]
momo = r["price_momo"]
stock = r.get("stock_status", "unknown")
drop_pct = (curr - prev) / prev if prev else 0.0
price_gap_pct = ((momo - pchome) / pchome * 100) if pchome else 0.0
reasons = []
# 銷量下滑異常
if drop_pct <= -SALES_DROP_THRESHOLD:
reasons.append(
f"銷量下滑 {drop_pct:+.1%}(閾值 {SALES_DROP_THRESHOLD:+.0%}"
)
# 競品價格突漲(若我方價格低且差距擴大)
if price_gap_pct > PRICE_SURGE_THRESHOLD:
reasons.append(
f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差"
)
# 庫存危機(可擴充)
if stock in ("out_of_stock", "low_stock"):
reasons.append(f"庫存狀態: {stock}")
if not reasons:
continue
anomalies.append({
"sku": sku,
"name": name,
"category": r.get("category", ""),
"drop_pct": drop_pct,
"price_gap_pct": price_gap_pct,
"reasons": reasons,
"stock": stock,
"momo_price": momo,
"pchome_price": pchome,
})
return anomalies
async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool:
"""
依異常類型決定路由:
- 銷量下滑 + 價差微小 → L1分析原因
- 銷量下滑 + 價差大 → L2規劃 + 審核)
- 競品價格突漲 → L2防範被動
"""
drop = anom["drop_pct"]
gap = anom["price_gap_pct"]
sku = anom["sku"]
name = anom["name"]
session_id = self._ensure_session(sku)
# 構建 event payload與 EventRouter 對齊)
event = {
"source": "watcher",
"event_type": "sales_anomaly",
"severity": "alert",
"title": f"銷售異常偵測 — {sku} {name}",
"summary": "; ".join(anom["reasons"]),
"payload": {
"sku": sku,
"name": name,
"category": anom["category"],
"drop_pct": anom["drop_pct"],
"price_gap_pct": anom["price_gap_pct"],
"stock": anom["stock"],
"momo_price": anom["momo_price"],
"pchome_price": anom["pchome_price"],
"sales_prev": anom.get("sales_prev"),
"sales_curr": anom.get("sales_curr"),
},
"impact": "銷量下滑可能導致收入損失",
"status": "open",
}
# 決策路由
if drop <= -SALES_DROP_THRESHOLD and abs(gap) < PRICE_SURGE_THRESHOLD:
# 銷量下滑但價差微小 → 檢查是否非價格因素(缺貨/流量)
event["severity"] = "alert"
event["payload"]["non_price_factor"] = True
# 交由 L1 分析原因
return await self._route_l1(event, session_id)
else:
# 銷量下滑 + 價差大 或 競品價格突漲 → L2 規劃
event["severity"] = "alert"
return await self._route_l2(event, session_id)
async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool:
"""L1Hermes 分析下滑原因"""
try:
result = await self.orchestrator.handle_l1(event, session_id)
sys_log.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}")
# 寫入共享上下文
await self._save_context(session_id, "hermes", {
"summary": result.get("summary"),
"probable_cause": result.get("probable_cause"),
"actions": result.get("actions", []),
})
return True
except Exception as e:
sys_log.error(f"[Watcher] L1 dispatch failed: {e}")
# 保底:直接通知
await self._fallback_notify(event)
return False
async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool:
"""L2NemoTron 規劃 + 審核閘"""
try:
result = await self.orchestrator.handle_l2(event, session_id)
sys_log.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}")
# 寫入共享上下文與 action_plans
await self._save_context(session_id, "nemotron", {
"plan": result.get("plan"),
"actions_taken": result.get("actions_taken", []),
})
await self._save_action_plan(event, result.get("plan"))
return True
except Exception as e:
sys_log.error(f"[Watcher] L2 dispatch failed: {e}")
# 保底通知
await self._fallback_notify(event)
return False
async def _fallback_notify(self, event: Dict[str, Any]) -> None:
"""當 AI 失敗時,直接通知並記錄原因"""
sku = event["payload"]["sku"]
name = event["payload"]["name"]
text = (
f"⚠️ [Watcher Fallback] {sku} {name}\n"
f"原因:{event['summary']}\n"
f"建議:立即人工檢查銷售與庫存狀態。"
)
await self._notify_telegram(text)
async def _notify_telegram(self, text: str) -> bool:
"""透過 Telegram 發送訊息"""
from services.telegram_templates import alert as render_alert
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
if not bot_token:
sys_log.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定")
return False
chat_ids_raw = os.getenv("TELEGRAM_CHAT_IDS", "[]")
try:
chat_ids = json.loads(chat_ids_raw)
except json.JSONDecodeError:
chat_ids = []
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {
"chat_id": chat_ids[0] if chat_ids else -1003940688311,
"text": render_alert(title="銷售異常通知", content=text),
"parse_mode": "HTML",
}
try:
r = requests.post(url, json=payload, timeout=10)
return r.ok
except Exception as e:
sys_log.error(f"[Watcher] Telegram 通知失敗: {e}")
return False
def _ensure_session(self, sku: str) -> str:
"""保證 session_id 存在簡化skuid 作為 session"""
return f"session:{sku}"
async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
"""寫入 agent_context共享記憶"""
session = get_session()
try:
# 刪除舊的 key
session.execute(
text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"),
{"sid": session_id, "ag": agent},
)
# 寫入新 context
session.execute(
text("""
INSERT INTO agent_context
(session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
VALUES
(:sid, :ag, :ck, :cv, NOW(), :ttl)
"""),
{
"sid": session_id,
"ag": agent,
"ck": "latest",
"cv": json.dumps(data, ensure_ascii=False),
"ttl": CACHE_TTL_MIN * 2,
},
)
session.commit()
except Exception as e:
session.rollback()
sys_log.warning(f"[Watcher] 寫入 context 失敗: {e}")
finally:
session.close()
async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None:
"""將 NemoTron 的 plan 寫入 action_plans"""
if not plan:
return
session = get_session()
try:
sku = event["payload"]["sku"]
session.execute(
text("""
INSERT INTO action_plans
(session_id, plan_type, sku, payload, status, created_by)
VALUES
(:sid, :pt, :sku, :pl, 'pending', 'nemotron')
"""),
{
"sid": f"session:{sku}",
"pt": plan.get("type", "price_adjust"),
"sku": sku,
"pl": json.dumps(plan, ensure_ascii=False),
},
)
session.commit()
except Exception as e:
session.rollback()
sys_log.warning(f"[Watcher] 寫入 action_plan 失敗: {e}")
finally:
session.close()