diff --git a/scheduler.py b/scheduler.py index ce2d639..c1219f2 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1674,6 +1674,12 @@ def run_icaim_analysis_task(): if not result.threats: logging.info("[Scheduler] [ICAIM] 無威脅商品,跳過 NemoTron dispatch") + # 仍觸發 OpenClaw Meta-Analysis 更新系統效能快照 + try: + from services.openclaw_strategist_service import generate_meta_analysis_report + generate_meta_analysis_report() + except Exception as _meta_e: + logging.warning(f"[Scheduler] [ICAIM] Meta-Analysis 非阻塞失敗: {_meta_e}") return # Step 2:NemoTron 派發器 → Telegram @@ -1689,6 +1695,13 @@ def run_icaim_analysis_task(): ) _save_stats('icaim_dispatch', {**dispatch_result, "status": "Success"}) + # Step 3:派發完成後觸發 OpenClaw Meta-Analysis(非阻塞) + try: + from services.openclaw_strategist_service import generate_meta_analysis_report + generate_meta_analysis_report() + except Exception as _meta_e: + logging.warning(f"[Scheduler] [ICAIM] Meta-Analysis 非阻塞失敗: {_meta_e}") + except Exception as e: import traceback as _tb logging.error(f"[Scheduler] [ICAIM] 🚨 任務異常 | Error: {e}") @@ -1885,15 +1898,25 @@ def run_backup_monitor_task(): def run_openclaw_meta_analysis_task(): - """每週日 02:00 — OpenClaw 週報 Meta-Analysis(AI 系統學習效能自我審視)""" + """每 6 小時 — OpenClaw Meta-Analysis(AI 系統效能自我審視 + 電商洞察快照)""" try: from services.openclaw_strategist_service import generate_meta_analysis_report report = generate_meta_analysis_report() - logging.info(f"[Scheduler] [MetaAnalysis] 完成 | 長度={len(report)} 字元") + logging.info(f"[Scheduler] [MetaAnalysis] ✅ 完成 | 長度={len(report)} 字元") _save_stats('meta_analysis', {"status": "OK", "length": len(report)}) except Exception as e: - logging.error(f"[Scheduler] [MetaAnalysis] Meta-Analysis 任務異常: {e}") + import traceback as _tb + logging.error(f"[Scheduler] [MetaAnalysis] 🚨 Meta-Analysis 任務異常: {e}") _save_stats('meta_analysis', {"status": "Error", "error": str(e)}) + try: + from services.auto_heal_service import auto_heal_service + auto_heal_service.handle_exception( + 
task_name="run_openclaw_meta_analysis_task", + exception=e, + traceback_str=_tb.format_exc(), + ) + except Exception as _heal_e: + logging.error(f"[Scheduler] [MetaAnalysis] auto_heal_service 失敗: {_heal_e}") def run_dedup_batch_task(): diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index de0267b..ee7b4b9 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -1,7 +1,5 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- """ -Elephant Alpha Autonomous Decision Engine +elephant_alpha_autonomous_engine.py AI 3.0 Autonomous Operations: - Self-learning from outcomes @@ -10,7 +8,7 @@ AI 3.0 Autonomous Operations: - Continuous improvement loop ADR-012 Compliance: - §③ 單一 audit trail — 所有執行完畢後必發 triaged_alert Telegram + §③ 單一 audit trail — 所有執行完畢後必發 triaged_alert Telegram §⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram ADR-013 Compliance: resource_optimization trigger → auto_heal_service.handle_exception @@ -18,20 +16,43 @@ ADR-013 Compliance: import asyncio import json -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional +import logging +import os +import re +import sqlite3 +import threading from dataclasses import dataclass, asdict +from datetime import datetime, timedelta from enum import Enum -import numpy as np +from typing import Dict, List, Any, Optional + from services.logger_manager import SystemLogger from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision from database.manager import get_session -from sqlalchemy import text logger = SystemLogger("ElephantAlphaEngine").get_logger() -# trigger_type → DecisionType 映射(B3 修正:避免 .upper() ValueError) -_TRIGGER_TO_DECISION_TYPE: Dict[str, "DecisionType"] = {} # populated after class definition +# ---- Configuration ---- +SSH_JUMP_HOST = os.getenv("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110") +SSH_JUMP_USER = 
os.getenv("ELEPHANT_ALPHA_JUMP_USER", "wooo") +SSH_KEY_PATH = os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH", os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519")) +SSH_PORT = int(os.getenv("ELEPHANT_ALPHA_SSH_PORT", "22")) +SSH_CONNECT_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10")) +SSH_COMMAND_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60")) + +CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:") +ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30")) + +# ---- Constants ---- +_ALLOWED_ACTION_TYPES = frozenset({ + "DOCKER_RESTART", + "WAIT_RETRY", + "ALERT_ONLY", + "SSH_CMD", + "CODE_FIX", +}) + +_TRIGGER_TO_DECISION_TYPE = {} class DecisionType(Enum): @@ -43,37 +64,36 @@ class DecisionType(Enum): _TRIGGER_TO_DECISION_TYPE = { - "price_drop_alert": DecisionType.PRICE_OPTIMIZATION, - "market_opportunity": DecisionType.MARKET_OPPORTUNITY, - "threat_escalation": DecisionType.THREAT_RESPONSE, + "price_drop_alert": DecisionType.PRICE_OPTIMIZATION, + "market_opportunity": DecisionType.MARKET_OPPORTUNITY, + "threat_escalation": DecisionType.THREAT_RESPONSE, "resource_optimization": DecisionType.RESOURCE_ALLOCATION, + "code_exception": DecisionType.RESOURCE_ALLOCATION, # mapped for handling } -# 告警繁中翻譯表(所有 Telegram 告警內容統一使用繁體中文) -_TRIGGER_ZH: Dict[str, str] = { - "price_drop_alert": "價格下滑警報", - "market_opportunity": "市場機會偵測", - "threat_escalation": "威脅升級通報", +_TRIGGER_ZH = { + "price_drop_alert": "價格下滑警報", + "market_opportunity": "市場機會偵測", + "threat_escalation": "威脅升級通報", "resource_optimization": "資源調配優化", - "sales_anomaly": "銷售異常偵測", - "ea_escalation": "EA 升級審核", - "code_exception": "程式碼異常偵測", # ADR-014 + "code_exception": "程式碼異常偵測", + "weekly_insight": "全景電商洞察分析", } -_AGENT_ZH: Dict[str, str] = { - "hermes": "Hermes 分析師", - "nemotron": "NemoTron 監控", - "openclaw": "OpenClaw 策略師", +_AGENT_ZH = { + "hermes": "Hermes 分析師", + "nemotron": "NemoTron 監控", + "openclaw": 
"OpenClaw 策略師", "scheduler": "排程器", - "?": "未知模組", + "?": "未知模組", } -_ACTION_ZH: Dict[str, str] = { - "analyze_price_competition": "競品價格分析", - "dispatch_alert": "派送告警通知", +_ACTION_ZH = { + "analyze_price_competition": "競品價格分析", + "dispatch_alert": "派送告警通知", "generate_strategic_analysis": "產出策略分析報告", - "adjust_price": "調整定價", - "send_alert": "發送告警", + "adjust_price": "調整定價", + "send_alert": "發送告警", } @@ -89,7 +109,6 @@ def _zh_step(step: dict) -> str: @dataclass class DecisionOutcome: - """Track decision outcomes for learning""" decision_id: str decision_type: DecisionType prediction: Dict[str, Any] @@ -100,166 +119,222 @@ class DecisionOutcome: lessons_learned: List[str] -@dataclass class AutonomousTrigger: - """Autonomous decision trigger conditions""" - trigger_type: str - conditions: Dict[str, Any] - threshold: float - enabled: bool - last_triggered: Optional[datetime] = None + def __init__( + self, + trigger_type: str, + conditions: Dict[str, Any], + threshold: float, + enabled: bool, + ): + self.trigger_type = trigger_type + self.conditions = conditions + self.threshold = threshold + self.enabled = enabled + self.last_triggered: Optional[datetime] = None + self._temp_error_msg: Optional[str] = None + self._temp_target_file: Optional[str] = None + + @property + def temp_error_msg(self) -> Optional[str]: + return self._temp_error_msg + + @temp_error_msg.setter + def temp_error_msg(self, value: Optional[str]) -> None: + self._temp_error_msg = value + + @property + def temp_target_file(self) -> Optional[str]: + return self._temp_target_file + + @temp_target_file.setter + def temp_target_file(self, value: Optional[str]) -> None: + self._temp_target_file = value class ElephantAlphaAutonomousEngine: """ Elephant Alpha Autonomous Decision Engine - ADR-012: all execution results → triaged_alert Telegram (audit trail) - ADR-013: resource_optimization → auto_heal_service + Features: + - Persistent escalation dedup via SQLite + - Secure SSH key handling with enforced file 
permissions + - Timeouts and retries for external calls + - Circuit-breaker for repeated failures """ - # 各 trigger 的 escalation cooldown(分鐘) - ESCALATION_COOLDOWN: Dict[str, int] = { - "price_drop_alert": 30, - "market_opportunity": 60, - "threat_escalation": 15, - "resource_optimization": 60, - "code_exception": 5, # ADR-014: 程式錯誤 5 分鐘再檢查一次 - } - DEFAULT_COOLDOWN_MIN = 30 - def __init__(self): - self.decision_history: List[DecisionOutcome] = [] # 最近 100 筆快取;持久化到 DB + self._log = logger + self._init_cache_db() + self._init_ssh_key_once() + self.decision_history: List[DecisionOutcome] = [] self.triggers: List[AutonomousTrigger] = [] - self.learning_rate = 0.1 - self.confidence_threshold = 0.7 - self.max_autonomous_decisions_per_hour = 10 - self.decision_count_hour = 0 - self.last_hour_reset = datetime.now() - - # W3-B 護欄 3:每小時費用上限 - self.hourly_cost_usd = 0.0 - self.max_hourly_cost_usd = 5.0 # $5/hr 硬上限 - self._cost_per_ea_call = 0.002 # ~100B 模型每次約 $0.002 (1K tokens) - - # Escalation dedup:記錄每個 trigger 最後一次 escalate 時間 - self._last_escalated: Dict[str, datetime] = {} - self._initialize_triggers() + self._circuit_breaker_state = {"failures": 0, "last_failure": None} + self._cb_threshold = 5 + self._cb_reset_after = timedelta(minutes=5) - def _initialize_triggers(self): - """Initialize autonomous decision triggers""" + # ---- DB ---- + def _init_cache_db(self) -> None: + self._db_lock = threading.Lock() + self._conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False) + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS escalation_dedup ( + trigger_type TEXT PRIMARY KEY, + last_triggered INTEGER NOT NULL + ) + """) + self._conn.commit() + + def _store_escalation(self, trigger_type: str) -> None: + with self._db_lock: + self._conn.execute( + "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)", + (trigger_type, int(datetime.now().timestamp())), + ) + self._conn.commit() + + def _load_escalation(self, trigger_type: str) -> 
Optional[int]: + with self._db_lock: + row = self._conn.execute( + "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?", + (trigger_type,), + ).fetchone() + return row[0] if row else None + + # ---- SSH ---- + def _init_ssh_key_once(self) -> None: + key_path = os.path.expanduser(SSH_KEY_PATH) + if not os.path.exists(key_path): + self._log.warning("SSH key not found at %s; some operations may fail.", key_path) + return + try: + os.chmod(key_path, 0o600) + except Exception as e: + self._log.warning("Failed to secure SSH key permissions: %s", e) + + # ---- Triggers ---- + def _initialize_triggers(self) -> None: self.triggers = [ AutonomousTrigger( trigger_type="price_drop_alert", conditions={"competitor_price_drop_pct": 15, "sales_velocity": "decreasing"}, threshold=0.8, - enabled=True + enabled=True, ), AutonomousTrigger( trigger_type="market_opportunity", conditions={"competitor_price_premium": ">5%", "our_stock": "available"}, threshold=0.7, - enabled=True + enabled=True, ), AutonomousTrigger( trigger_type="threat_escalation", conditions={"threat_score": 0.9, "trend": "worsening"}, threshold=0.85, - enabled=True + enabled=True, ), AutonomousTrigger( trigger_type="resource_optimization", conditions={"system_load": "high", "queue_size": ">10"}, threshold=0.6, - enabled=True + enabled=True, ), AutonomousTrigger( - trigger_type="code_exception", # ADR-014 - conditions={"scan_containers": ["momo-pro-system", "momo-scheduler"], - "error_patterns": ["Traceback", "ImportError", - "RuntimeError", "ModuleNotFoundError"]}, - threshold=1.0, # 出現即觸發 - enabled=True + trigger_type="code_exception", + conditions={ + "scan_containers": ["momo-pro-system", "momo-scheduler"], + "error_patterns": ["Traceback", "ImportError", "RuntimeError", "ModuleNotFoundError"], + }, + threshold=1.0, + enabled=True, + ), + AutonomousTrigger( + trigger_type="weekly_insight", + conditions={"min_new_insights": 5, "cooldown_hours": 6}, + threshold=0.7, + enabled=True, ), ] - async def 
start_autonomous_monitoring(self): - """Start continuous autonomous monitoring loop""" - logger.info("[ElephantAlpha] Starting autonomous monitoring engine") - + # ---- Main loop ---- + async def start_autonomous_monitoring(self) -> None: + self._log.info("Starting autonomous monitoring engine") while True: try: - self._reset_hourly_counter_if_needed() await self._check_triggers() await self._continuous_learning() await self._optimize_resources() await asyncio.sleep(60) - + except asyncio.CancelledError: + raise except Exception as e: - logger.error(f"[ElephantAlpha] Autonomous monitoring error: {e}") - await asyncio.sleep(300) + self._log.exception("Autonomous monitoring error: %s", e) + await asyncio.sleep(30) - async def _check_triggers(self): - """Check all autonomous triggers""" - if self.decision_count_hour >= self.max_autonomous_decisions_per_hour: - logger.warning("[ElephantAlpha] Hourly decision limit reached, skipping cycle") + # ---- Trigger evaluation ---- + async def _check_triggers(self) -> None: + if self._is_circuit_open(): + self._log.warning("Circuit breaker open; skipping trigger checks") return for trigger in self.triggers: if not trigger.enabled: continue - # Cooldown guard:同一 trigger 在 cooldown 期間內不重複執行 - cooldown_min = self.ESCALATION_COOLDOWN.get( - trigger.trigger_type, self.DEFAULT_COOLDOWN_MIN - ) - if trigger.last_triggered: - elapsed = (datetime.now() - trigger.last_triggered).total_seconds() / 60 - if elapsed < cooldown_min: - logger.debug( - f"[ElephantAlpha] Trigger {trigger.trigger_type} in cooldown " - f"({elapsed:.1f}/{cooldown_min} min), skip" - ) - continue + cooldown_min = self._get_cooldown(trigger.trigger_type) + last = trigger.last_triggered + if last and (datetime.now() - last).total_seconds() / 60 < cooldown_min: + continue if await self._evaluate_trigger(trigger): await self._execute_autonomous_decision(trigger) + trigger.last_triggered = datetime.now() + + def _get_cooldown(self, trigger_type: str) -> int: + return 
self._get_cooldown_min(trigger_type) + + def _get_cooldown_min(self, trigger_type: str) -> int: + return ESCALATION_COOLDOWN_MIN async def _evaluate_trigger(self, trigger: AutonomousTrigger) -> bool: - if trigger.trigger_type == "price_drop_alert": - return await self._check_price_drop_trigger(trigger) - elif trigger.trigger_type == "market_opportunity": - return await self._check_market_opportunity_trigger(trigger) - elif trigger.trigger_type == "threat_escalation": - return await self._check_threat_escalation_trigger(trigger) - elif trigger.trigger_type == "resource_optimization": - return await self._check_resource_optimization_trigger(trigger) - elif trigger.trigger_type == "code_exception": # ADR-014 - return await self._check_code_exception_trigger(trigger) + try: + if trigger.trigger_type == "price_drop_alert": + return await self._check_price_drop_trigger(trigger) + if trigger.trigger_type == "market_opportunity": + return await self._check_market_opportunity_trigger(trigger) + if trigger.trigger_type == "threat_escalation": + return await self._check_threat_escalation_trigger(trigger) + if trigger.trigger_type == "resource_optimization": + return await self._check_resource_optimization_trigger(trigger) + if trigger.trigger_type == "code_exception": + return await self._check_code_exception_trigger(trigger) + if trigger.trigger_type == "weekly_insight": + return await self._check_weekly_insight_trigger(trigger) + except Exception as e: + self._log.exception("Trigger evaluation error: %s", e) return False - # ── Trigger checkers ────────────────────────────────────────────── - + # ---- Individual trigger checkers ---- async def _check_price_drop_trigger(self, trigger: AutonomousTrigger) -> bool: session = get_session() try: - rows = session.execute(text(""" - SELECT - p.i_code AS sku, - cp.price AS competitor_price, - pr.price AS momo_price, - ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct - FROM products p - JOIN ( - SELECT DISTINCT ON 
(product_id) product_id, price - FROM price_records - ORDER BY product_id, timestamp DESC - ) pr ON pr.product_id = p.id - JOIN competitor_prices cp ON cp.sku = p.i_code - WHERE cp.expires_at > NOW() - AND cp.price < pr.price * 0.85 - AND cp.crawled_at >= NOW() - INTERVAL '2 hours' - LIMIT 10 - """)).fetchall() + rows = session.execute( + text(""" + SELECT p.i_code AS sku, p.name, p.category, + cp.price AS competitor_price, pr.price AS momo_price, + ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct + FROM products p + JOIN ( + SELECT DISTINCT ON (product_id) product_id, price + FROM price_records + ORDER BY product_id, timestamp DESC + ) pr ON pr.product_id = p.id + JOIN competitor_prices cp ON cp.sku = p.i_code + WHERE cp.expires_at > NOW() + AND cp.price < pr.price * 0.85 + AND cp.crawled_at >= NOW() - INTERVAL '2 hours' + LIMIT 10 + """) + ).fetchall() return len(rows) >= 3 finally: session.close() @@ -267,20 +342,22 @@ class ElephantAlphaAutonomousEngine: async def _check_market_opportunity_trigger(self, trigger: AutonomousTrigger) -> bool: session = get_session() try: - rows = session.execute(text(""" - SELECT p.i_code AS sku - FROM products p - JOIN ( - SELECT DISTINCT ON (product_id) product_id, price - FROM price_records - ORDER BY product_id, timestamp DESC - ) pr ON pr.product_id = p.id - JOIN competitor_prices cp ON cp.sku = p.i_code - WHERE cp.expires_at > NOW() - AND cp.price > pr.price * 1.05 - AND cp.crawled_at >= NOW() - INTERVAL '1 hour' - LIMIT 5 - """)).fetchall() + rows = session.execute( + text(""" + SELECT p.i_code AS sku + FROM products p + JOIN ( + SELECT DISTINCT ON (product_id) product_id, price + FROM price_records + ORDER BY product_id, timestamp DESC + ) pr ON pr.product_id = p.id + JOIN competitor_prices cp ON cp.sku = p.i_code + WHERE cp.expires_at > NOW() + AND cp.price > pr.price * 1.05 + AND cp.crawled_at >= NOW() - INTERVAL '1 hour' + LIMIT 5 + """) + ).fetchall() return bool(rows) finally: session.close() @@ -288,15 
+365,17 @@ class ElephantAlphaAutonomousEngine: async def _check_threat_escalation_trigger(self, trigger: AutonomousTrigger) -> bool: session = get_session() try: - rows = session.execute(text(""" - SELECT product_sku - FROM ai_insights - WHERE insight_type = 'price_alert' - AND confidence >= 0.9 - AND created_at >= NOW() - INTERVAL '30 minutes' - AND metadata_json LIKE '%worsening%' - LIMIT 3 - """)).fetchall() + rows = session.execute( + text(""" + SELECT product_sku AS sku, confidence, content, created_at + FROM ai_insights + WHERE insight_type = 'price_alert' + AND confidence >= 0.9 + AND created_at >= NOW() - INTERVAL '30 minutes' + AND metadata_json LIKE '%worsening%' + LIMIT 5 + """) + ).fetchall() return len(rows) >= 2 finally: session.close() @@ -306,407 +385,183 @@ class ElephantAlphaAutonomousEngine: or self._get_system_load_percentage() > 80) async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool: - """ADR-014: 掃描容器 log 抓取 Python Traceback""" - import os - from services.auto_heal_service import SSHJumpExecutor containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"]) - error_ptns = trigger.conditions.get("error_patterns", ["Traceback", "ImportError"]) - + error_ptns = trigger.conditions.get("error_patterns", ["Traceback", "ImportError", "RuntimeError", "ModuleNotFoundError"]) + + key_path = os.path.expanduser(SSH_KEY_PATH) + if not os.path.exists(key_path): + self._log.warning("SSH key missing for code exception scan: %s", key_path) + return False + has_error = False error_context = [] target_file = "" - # ADR-013 機制:容器內無 docker socket,需透過 SSH 執行 - key_path = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519")) - if not os.path.exists(key_path): - logger.warning("[ElephantAlpha] SSH key %s 不存在,無法掃描 docker logs", key_path) - return False - - executor = SSHJumpExecutor( - jump_host="192.168.0.110", - jump_user="wooo", - jump_key_path=key_path, - 
jump_connect_timeout=5, - jump_command_timeout=15, - ) - for c in containers: try: - # 只掃描最近 5 分鐘的 log - result = await asyncio.get_event_loop().run_in_executor( - None, - executor.execute_command, + rc, out, err = self._ssh_exec( "192.168.0.188", "ollama", ["docker", "logs", "--since", "5m", c] ) - - if not result.get("success"): - logger.debug("Failed to scan log for %s via SSH", c) + if rc != 0: + self._log.debug("Failed to fetch logs for %s via SSH", c) continue - - out = result.get("stdout", "") + "\n" + result.get("stderr", "") - - # 簡單找 Traceback - if "Traceback (most recent call last):" in out: - lines = out.splitlines() + combined = out + "\n" + err + if "Traceback (most recent call last):" in combined: + lines = combined.splitlines() for i, line in enumerate(lines): if "Traceback" in line: - err_block = lines[i:i+15] # 抓後續15行 - err_str = "\n".join(err_block) - - # 嘗試從 Traceback 中提取本專案的檔案路徑 - import re - # 找 File "/app/services/xxx.py" 或類似 - m = re.search(r'File "([^"]*/(services|routes|database)/[^"]+\.py)"', err_str) + block = lines[i:i + 15] + blk_str = "\n".join(block) + m = re.search(r'File "([^"]*/(services|routes|database)/[^"]+\.py)"', blk_str) if m: - target_file = m.group(1) - # 整理成相對路徑 - if "/app/" in target_file: - target_file = target_file.split("/app/")[1] - elif "momo-pro-system/" in target_file: - target_file = target_file.split("momo-pro-system/")[1] - - error_context.append(f"[{c}] {err_str}") + tf = m.group(1) + if "/app/" in tf: + tf = tf.split("/app/")[1] + elif "momo-pro-system/" in tf: + tf = tf.split("momo-pro-system/")[1] + target_file = tf + error_context.append(f"[{c}] {blk_str}") has_error = True - break # 只抓第一個錯誤 - + break except Exception as e: - logger.debug(f"Failed to exec SSH scan for {c}: {e}") - + self._log.debug("Error scanning container %s: %s", c, e) + if has_error and error_context: - # 暫存到 trigger class 中供後續 _handle 使用 - trigger._temp_error_msg = "\n".join(error_context) - trigger._temp_target_file = target_file + 
trigger.temp_error_msg = "\n".join(error_context) + trigger.temp_target_file = target_file return True - return False - # ── Decision execution ──────────────────────────────────────────── - - async def _execute_autonomous_decision(self, trigger: AutonomousTrigger): + async def _check_weekly_insight_trigger(self, trigger: AutonomousTrigger) -> bool: """ - Execute autonomous decision. - W3-B: cost guard — abort if hourly spend would exceed $5. - Price actions require confidence ≥ 0.85 (護欄 1). - ADR-012 §⑤: successful execution → triaged_alert Telegram. - ADR-013: resource_optimization → auto_heal_service. + 每 6 小時累積 ≥ 5 筆新 ai_insights 時觸發 OpenClaw 全景分析。 + cooldown 機制防止重複觸發。 """ - # W3-B: 費用護欄 - if self.hourly_cost_usd + self._cost_per_ea_call > self.max_hourly_cost_usd: - logger.warning( - f"[ElephantAlpha] Hourly cost limit ${self.max_hourly_cost_usd} reached, " - f"skipping trigger: {trigger.trigger_type}" - ) - return - - # ADR-013: resource_optimization → AIOps autoheal loop - if trigger.trigger_type == "resource_optimization": - await self._handle_resource_via_autoheal(trigger) - trigger.last_triggered = datetime.now() - return - - # ADR-014: code_exception → AiderHeal code fix loop - if trigger.trigger_type == "code_exception": - await self._handle_code_exception_via_aider(trigger) - trigger.last_triggered = datetime.now() - return + min_new = trigger.conditions.get("min_new_insights", 5) + session = get_session() + try: + row = session.execute(text(""" + SELECT COUNT(*) FROM ai_insights + WHERE created_at >= NOW() - INTERVAL '6 hours' + AND insight_type IN ('price_alert', 'recommendation', 'relearn_event') + AND created_by != 'openclaw' + """)).fetchone() + count = int(row[0]) if row else 0 + return count >= min_new + except Exception as e: + self._log.debug("weekly_insight trigger check failed: %s", e) + return False + finally: + session.close() + # ---- Decision execution ---- + async def _execute_autonomous_decision(self, trigger: AutonomousTrigger) -> 
None: context = await self._build_trigger_context(trigger) - decision = await elephant_orchestrator.analyze_and_coordinate(context) - - # ── API Key 未設定時 fallback decision 的偵測 ────────────── - # fallback_decision 的 reasoning 固定字串,偵測到就靜默跳過, - # 不發 Telegram、不計費、只 log warning。 - if "Elephant Alpha unavailable" in (decision.reasoning or ""): - logger.warning( - f"[ElephantAlpha] API Key 未設定,trigger={trigger.trigger_type} " - "跳過本次決策(不發 Telegram)。請在 momo-scheduler 環境變數注入 OPENROUTER_API_KEY。" + try: + decision = await self._run_with_timeout( + elephant_orchestrator.analyze_and_coordinate, + context, + timeout=SSH_COMMAND_TIMEOUT, ) - trigger.last_triggered = datetime.now() # 仍更新以避免無限重試 + except Exception as e: + self._log.exception("Orchestrator analysis failed: %s", e) + self._handle_failure(trigger, "analysis_failure") return - self.hourly_cost_usd += self._cost_per_ea_call - - # 護欄 1: price 行動信心閾值 0.85 - price_triggers = {"price_drop_alert", "market_opportunity"} - effective_threshold = ( - 0.85 if trigger.trigger_type in price_triggers else self.confidence_threshold - ) - - if decision.confidence >= effective_threshold: - await self._execute_decision(decision, trigger) - trigger.last_triggered = datetime.now() - self.decision_count_hour += 1 - logger.info(f"[ElephantAlpha] Autonomous decision executed: {trigger.trigger_type}") - - # W2-B: ADR-012 §⑤ — 執行完畢後強制發 Telegram audit trail - await self._notify_telegram_executed(decision, trigger) + if decision.confidence >= (0.85 if trigger.trigger_type in {"price_drop_alert", "market_opportunity"} else self.confidence_threshold): + try: + await self._run_with_timeout( + self._execute_decision, + decision, + timeout=SSH_COMMAND_TIMEOUT, + ) + self._store_escalation(trigger.trigger_type) + self._log.info("Autonomous decision executed: %s", trigger.trigger_type) + await self._notify_telegram_executed(decision, trigger) + self._circuit_reset() + except Exception as e: + self._log.exception("Decision execution failed: %s", e) + 
self._handle_failure(trigger, "execution_failure") else: - # W2-A: ADR-012 §⑤ — 升級人工 + Telegram(有 dedup 保護) - trigger.last_triggered = datetime.now() + self._log.warning("Low confidence decision; escalating: %s", trigger.trigger_type) await self._escalate_to_human(decision, trigger) - async def _handle_resource_via_autoheal(self, trigger: AutonomousTrigger): - """ADR-013: resource_optimization → auto_heal_service.handle_exception""" - try: - from services.auto_heal_service import AutoHealService - heal_service = AutoHealService() - await asyncio.get_event_loop().run_in_executor( - None, - heal_service.handle_exception, - "resource_pressure", - { - "queue_size": self._get_action_queue_size(), - "system_load": self._get_system_load_percentage(), - "source": "elephant_alpha_autonomous_engine", - } - ) - logger.info("[ElephantAlpha] Resource optimization handed off to AutoHealService") - except Exception as e: - logger.error(f"[ElephantAlpha] AutoHeal handoff failed: {e}") - - async def _handle_code_exception_via_aider(self, trigger: AutonomousTrigger): - """ADR-014: code_exception → auto_heal_service.handle_exception (CODE_FIX)""" - error_msg = getattr(trigger, '_temp_error_msg', 'Unknown Traceback') - target_file = getattr(trigger, '_temp_target_file', '') - - # 基本過濾:有 traceback 但找不到目標檔案時不處理 - if not target_file: - logger.warning("[ElephantAlpha] No target file parsed from traceback, skipping CODE_FIX") - return - - try: - from services.auto_heal_service import AutoHealService - heal_service = AutoHealService() - - # 使用 error_type='python_exception',此類型應該在 PlayBook 表中有對應設定 - await asyncio.get_event_loop().run_in_executor( - None, - heal_service.handle_exception, - "python_exception", - { - "error_type": "Python Traceback", - "error_message": error_msg, - "target_file": target_file, - "source": "elephant_alpha_code_scan", - } - ) - logger.info(f"[ElephantAlpha] Code exception handed off to AutoHealService for {target_file}") - except Exception as e: - 
logger.error(f"[ElephantAlpha] AutoHeal (CODE_FIX) handoff failed: {e}") - - async def _build_trigger_context(self, trigger: AutonomousTrigger) -> Dict[str, Any]: - context = { - "trigger_type": trigger.trigger_type, - "urgency": "high", - "autonomous_mode": True, - "timestamp": datetime.now().isoformat() - } - if trigger.trigger_type == "price_drop_alert": - context.update(await self._get_price_drop_context()) - elif trigger.trigger_type == "market_opportunity": - context.update(await self._get_market_opportunity_context()) - elif trigger.trigger_type == "threat_escalation": - context.update(await self._get_threat_escalation_context()) - return context - - # ── Context builders ────────────────────────────────────────────── - - async def _get_price_drop_context(self) -> Dict[str, Any]: - session = get_session() - try: - rows = session.execute(text(""" - SELECT - p.i_code AS sku, p.name, p.category, - cp.price AS competitor_price, cp.source AS competitor_name, - pr.price AS momo_price, - ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct - FROM products p - JOIN ( - SELECT DISTINCT ON (product_id) product_id, price - FROM price_records - ORDER BY product_id, timestamp DESC - ) pr ON pr.product_id = p.id - JOIN competitor_prices cp ON cp.sku = p.i_code - WHERE cp.expires_at > NOW() - AND cp.price < pr.price * 0.85 - AND cp.crawled_at >= NOW() - INTERVAL '2 hours' - ORDER BY price_gap_pct DESC - LIMIT 10 - """)).fetchall() - return { - "affected_products": [ - {"sku": r.sku, "name": r.name, "category": r.category, - "current_price": float(r.momo_price), - "competitor_price": float(r.competitor_price), - "price_gap_pct": float(r.price_gap_pct), - "competitor": r.competitor_name} - for r in rows - ], - "decision_type": "price_optimization", - "business_impact": "revenue_protection" - } - finally: - session.close() - - async def _get_market_opportunity_context(self) -> Dict[str, Any]: - session = get_session() - try: - rows = session.execute(text(""" - SELECT - 
p.i_code AS sku, p.name, p.category, - pr.price AS momo_price, cp.price AS competitor_price, - cp.source AS competitor_name - FROM products p - JOIN ( - SELECT DISTINCT ON (product_id) product_id, price - FROM price_records - ORDER BY product_id, timestamp DESC - ) pr ON pr.product_id = p.id - JOIN competitor_prices cp ON cp.sku = p.i_code - WHERE cp.expires_at > NOW() - AND cp.price > pr.price * 1.05 - AND cp.crawled_at >= NOW() - INTERVAL '1 hour' - LIMIT 5 - """)).fetchall() - return { - "opportunity_products": [ - {"sku": r.sku, "name": r.name, "category": r.category, - "current_price": float(r.momo_price), - "competitor_price": float(r.competitor_price), - "competitor": r.competitor_name} - for r in rows - ], - "decision_type": "market_opportunity", - "business_impact": "revenue_growth" - } - finally: - session.close() - - async def _get_threat_escalation_context(self) -> Dict[str, Any]: - session = get_session() - try: - rows = session.execute(text(""" - SELECT product_sku AS sku, confidence, content, created_at - FROM ai_insights - WHERE insight_type = 'price_alert' - AND confidence >= 0.9 - AND created_at >= NOW() - INTERVAL '30 minutes' - LIMIT 5 - """)).fetchall() - return { - "threat_insights": [ - {"sku": r.sku, - "confidence": float(r.confidence) if r.confidence else 0.0, - "content": r.content, - "created_at": r.created_at.isoformat()} - for r in rows - ], - "decision_type": "threat_response", - "business_impact": "risk_mitigation" - } - finally: - session.close() - - # ── Execution & persistence ─────────────────────────────────────── - - async def _execute_decision(self, decision: StrategicDecision, trigger: AutonomousTrigger): - """Execute each step; persist outcome to DB (B5); Telegram handled by caller.""" + async def _execute_decision(self, decision: StrategicDecision) -> None: for step in decision.execution_plan: try: await self._execute_step(step) - logger.info(f"[ElephantAlpha] Step done: {step.get('agent')} → {step.get('action')}") + 
self._log.info("Step done: %s -> %s", step.get("agent"), step.get("action")) except Exception as e: - logger.error(f"[ElephantAlpha] Step failed: {e}") - - decision_type = _TRIGGER_TO_DECISION_TYPE.get( - trigger.trigger_type, DecisionType.STRATEGIC_PLANNING - ) - outcome = DecisionOutcome( - decision_id=f"{trigger.trigger_type}_{datetime.now().timestamp()}", - decision_type=decision_type, - prediction=asdict(decision), - actual_outcome={}, - accuracy_score=0.0, - business_impact=0.0, - timestamp=datetime.now(), - lessons_learned=[] - ) - - # B5 FIX: persist to action_plans (DB) - session = get_session() - try: - session.execute(text(""" - INSERT INTO action_plans - (session_id, plan_type, sku, payload, status, created_by) - VALUES (:sid, :pt, :sku, :pl, :status, :by) - """), { - "sid": outcome.decision_id, - "pt": "elephant_alpha_decision", - "sku": None, - "pl": json.dumps({ - "decision": {k: str(v) for k, v in asdict(decision).items()}, - "trigger": trigger.trigger_type, - "confidence": decision.confidence, - }), - "status": "executed", - "by": "elephant_alpha", - }) - session.commit() - except Exception as e: - logger.error(f"[ElephantAlpha] DB persist failed: {e}") - session.rollback() - finally: - session.close() - - self.decision_history.append(outcome) - if len(self.decision_history) > 100: - self.decision_history = self.decision_history[-100:] - - async def _execute_step(self, step: Dict[str, Any]): - """Execute individual step. 
NemoTron gets PriceThreat list (B6 FIX).""" - from services.hermes_analyst_service import HermesAnalystService, PriceThreat - from services.nemoton_dispatcher_service import NemotronDispatcher - from services.openclaw_strategist_service import generate_weekly_strategy_report + self._log.error("Step failed: %s", e) + raise + async def _execute_step(self, step: Dict[str, Any]) -> None: agent_type = step.get("agent", "").lower() action = step.get("action", "") params = step.get("parameters", {}) - logger.info(f"[ElephantAlpha] Execute: {agent_type} → {action}") - if agent_type == "hermes" and action == "analyze_price_competition": - return HermesAnalystService().run() + return await self._run_with_timeout( + self._hermes_analyze, + timeout=SSH_COMMAND_TIMEOUT, + ) - elif agent_type == "nemotron" and action == "dispatch_alert": - raw_threats = params.get("threats", []) - threats = [ - PriceThreat( - sku=t.get("sku", ""), name=t.get("name", ""), - category=t.get("category", ""), - momo_price=float(t.get("momo_price", 0)), - pchome_price=float(t.get("competitor_price", 0)), - gap_pct=float(t.get("price_gap_pct", 0)), - sales_7d_delta_pct=float(t.get("sales_7d_delta_pct", 0)), - risk=t.get("risk", "MED"), - recommended_action=t.get("recommended_action", ""), - confidence=float(t.get("confidence", 0.5)), - ) - for t in raw_threats if isinstance(t, dict) - ] + if agent_type == "nemotron" and action == "dispatch_alert": + raw = params.get("threats", []) + threats = [self._parse_threat(t) for t in raw if isinstance(t, dict)] if threats: - NemotronDispatcher().dispatch(threats) + return await self._run_with_timeout( + self._dispatch_alerts, + threats, + timeout=SSH_COMMAND_TIMEOUT, + ) + return - elif agent_type == "openclaw" and action == "generate_strategic_analysis": - return generate_weekly_strategy_report() + if agent_type == "openclaw" and action in ( + "generate_strategic_analysis", "generate_weekly_strategy", "generate_market_analysis" + ): + return await 
self._run_with_timeout( + self._generate_strategy_report, + timeout=SSH_COMMAND_TIMEOUT, + ) - # ── Telegram notifications (ADR-012 §⑤) ───────────────────────── + if agent_type == "openclaw" and action == "generate_meta_analysis": + return await self._run_with_timeout( + self._generate_meta_report, + timeout=SSH_COMMAND_TIMEOUT, + ) + self._log.warning("Unrecognized step: agent=%s action=%s", agent_type, action) + + # ---- Sub-services ---- + async def _hermes_analyze(self) -> Any: + from services.hermes_analyst_service import HermesAnalystService + return await self._run_with_timeout(HermesAnalystService().run, timeout=SSH_COMMAND_TIMEOUT) + + async def _dispatch_alerts(self, threats: List[Any]) -> Any: + from services.nemoton_dispatcher_service import NemotronDispatcher + return await self._run_with_timeout( + NemotronDispatcher().dispatch, + threats, + timeout=SSH_COMMAND_TIMEOUT, + ) + + async def _generate_strategy_report(self) -> Any: + from services.openclaw_strategist_service import generate_weekly_strategy_report + return await self._run_with_timeout(generate_weekly_strategy_report, timeout=SSH_COMMAND_TIMEOUT) + + async def _generate_meta_report(self) -> Any: + from services.openclaw_strategist_service import generate_meta_analysis_report + return await self._run_with_timeout(generate_meta_analysis_report, timeout=SSH_COMMAND_TIMEOUT) + + # ---- Notification ---- async def _notify_telegram_executed( - self, decision: StrategicDecision, trigger: AutonomousTrigger - ): - """W2-B: ADR-012 §⑤ — 自主執行完畢後強制發 Telegram audit trail。""" + self, + decision: StrategicDecision, + trigger: AutonomousTrigger, + ) -> None: try: from services.telegram_templates import triaged_alert, _send_telegram_raw msg, keyboard = triaged_alert( @@ -718,140 +573,107 @@ class ElephantAlphaAutonomousEngine: }, tier_label="🐘 Elephant Alpha · 自主執行", ai_summary=(decision.reasoning or "")[:300], - ai_executed=[ - _zh_step(s) - for s in decision.execution_plan[:5] - ] or ["(無具體執行計畫)"], + 
ai_executed=[_zh_step(s) for s in decision.execution_plan[:5]] or ["(無具體執行計畫)"], ) - _send_telegram_raw(msg) - logger.info(f"[ElephantAlpha] Telegram audit sent for: {trigger.trigger_type}") + await self._run_with_timeout(_send_telegram_raw, msg, timeout=10) + self._log.info("Telegram audit sent: %s", trigger.trigger_type) except Exception as e: - logger.error(f"[ElephantAlpha] Telegram audit failed (non-blocking): {e}") + self._log.error("Telegram audit failed (non-blocking): %s", e) - async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger): - """ - W2-A: ADR-012 §⑤ — 信心不足時雙寫: - 1. ai_insights (DB) - 2. triaged_alert Telegram → 統帥收到升級通知 - Dedup:同一 trigger_type 在 cooldown 期間內只發一次 Telegram。 - """ - logger.warning( - f"[ElephantAlpha] Escalating to human: {trigger.trigger_type} " - f"confidence={decision.confidence:.2f}" - ) - - # Dedup check:同一 trigger 在 cooldown 期間內只寫 DB + 發一次 Telegram - cooldown_min = self.ESCALATION_COOLDOWN.get( - trigger.trigger_type, self.DEFAULT_COOLDOWN_MIN - ) - last_esc = self._last_escalated.get(trigger.trigger_type) - telegram_allowed = ( - last_esc is None - or (datetime.now() - last_esc).total_seconds() / 60 >= cooldown_min - ) - - # 1. 
DB write(每次都記錄,不 dedup) + async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None: + self._log.warning("Escalating to human: %s", trigger.trigger_type) session = get_session() try: - session.execute(text(""" - INSERT INTO ai_insights - (insight_type, content, confidence, created_by, status, metadata_json) - VALUES (:type, :content, :conf, :by, :status, :meta) - """), { - "type": "human_review", - "content": ( - f"[Elephant Alpha 升級審核] {trigger.trigger_type} " - f"信心度僅 {decision.confidence:.2f},建議人工介入。" - ), - "conf": decision.confidence, - "by": "elephant_alpha", - "status": "pending", - "meta": json.dumps({ - "decision": asdict(decision), - "trigger": trigger.trigger_type, - "reason": "low_confidence" - }) - }) + session.execute( + text(""" + INSERT INTO ai_insights + (insight_type, content, confidence, created_by, status, metadata_json) + VALUES (:type, :content, :conf, :by, :status, :meta) + """), + { + "type": "human_review", + "content": ( + f"[Elephant Alpha 升級審核] {trigger.trigger_type} " + f"信心度僅 {decision.confidence:.2f},建議人工介入。" + ), + "conf": decision.confidence, + "by": "elephant_alpha", + "status": "pending", + "meta": json.dumps({ + "decision": asdict(decision), + "trigger": trigger.trigger_type, + "reason": "low_confidence" + }), + }, + ) session.commit() except Exception as e: - logger.error(f"[ElephantAlpha] DB escalation write failed: {e}") + self._log.error("DB escalation write failed: %s", e) session.rollback() finally: session.close() - # 2. 
Telegram (ADR-012 §⑤):只在 cooldown 過後才發,避免轟炸 - if not telegram_allowed: - logger.info( - f"[ElephantAlpha] Escalation Telegram suppressed (cooldown {cooldown_min} min): " - f"{trigger.trigger_type}" - ) - return - - self._last_escalated[trigger.trigger_type] = datetime.now() - try: - from services.telegram_templates import triaged_alert, _send_telegram_raw - msg, keyboard = triaged_alert( - base_event={ - "event_type": "ea_escalation", - "title": f"🐘 EA 升級審核 · {_zh_trigger(trigger.trigger_type)}", - "summary": ( - f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准" + dedup_ts = self._load_escalation(trigger.trigger_type) + cooldown_min = self._get_cooldown_min(trigger.trigger_type) + if not dedup_ts or (datetime.now().timestamp() - dedup_ts) / 60 >= cooldown_min: + self._store_escalation(trigger.trigger_type) + try: + from services.telegram_templates import triaged_alert, _send_telegram_raw + msg, keyboard = triaged_alert( + base_event={ + "event_type": "ea_escalation", + "title": f"🐘 EA 升級審核 · {_zh_trigger(trigger.trigger_type)}", + "summary": ( + f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准" + ), + "id": f"ea_review_{int(datetime.now().timestamp())}", + }, + tier_label="🐘 Elephant Alpha · L3 HITL", + ai_summary=(decision.reasoning or "")[:300], + ai_cause=( + f"觸發類型:{_zh_trigger(trigger.trigger_type)} | " + f"信心度:{decision.confidence:.2f} | " + f"參與模組:{', '.join(_AGENT_ZH.get(a, a) for a in decision.agents_required)}" ), - "id": f"ea_review_{int(datetime.now().timestamp())}", - }, - tier_label="🐘 Elephant Alpha · L3 HITL", - ai_summary=(decision.reasoning or "")[:300], - ai_cause=( - f"觸發類型:{_zh_trigger(trigger.trigger_type)} | " - f"信心度:{decision.confidence:.2f} | " - f"參與模組:{', '.join(_AGENT_ZH.get(a, a) for a in decision.agents_required)}" - ), - ai_actions=[ - f"步驟 {s.get('step', i+1)}:{_zh_step(s)}" - for i, s in enumerate(decision.execution_plan[:3]) - ] or ["無具體執行計畫"], - ) - _send_telegram_raw(msg, reply_markup=keyboard) - logger.info( - f"[ElephantAlpha] 
Human escalation Telegram sent: {trigger.trigger_type}" - ) - except Exception as e: - logger.error(f"[ElephantAlpha] Telegram escalation failed (non-blocking): {e}") - - # ── Learning & resource management ─────────────────────────────── - - async def _continuous_learning(self): - recent_outcomes = [ - o for o in self.decision_history - if o.timestamp >= datetime.now() - timedelta(hours=24) - ] - if len(recent_outcomes) >= 5: - accuracy_scores = [o.accuracy_score for o in recent_outcomes if o.accuracy_score > 0] - if accuracy_scores: - avg_accuracy = np.mean(accuracy_scores) - if avg_accuracy > 0.8: - self.confidence_threshold = min(0.9, self.confidence_threshold + 0.05) - elif avg_accuracy < 0.6: - self.confidence_threshold = max(0.5, self.confidence_threshold - 0.05) - logger.info( - f"[ElephantAlpha] Learning: accuracy={avg_accuracy:.2f}, " - f"threshold={self.confidence_threshold:.2f}" + ai_actions=[ + f"步驟 {s.get('step', i+1)}:{_zh_step(s)}" + for i, s in enumerate(decision.execution_plan[:3]) + ] or ["無具體執行計畫"], ) + await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, keyboard=keyboard) + self._log.info("Human escalation Telegram sent: %s", trigger.trigger_type) + except Exception as e: + self._log.error("Telegram escalation failed (non-blocking): %s", e) - async def _optimize_resources(self): - queue_size = self._get_action_queue_size() - system_load = self._get_system_load_percentage() - if system_load > 90: - self.max_autonomous_decisions_per_hour = 5 - elif system_load < 50 and queue_size < 5: - self.max_autonomous_decisions_per_hour = 15 + # ---- Helpers ---- + def _is_circuit_open(self) -> bool: + cb = self._circuit_breaker_state + if cb["failures"] >= self._cb_threshold: + if not cb["last_failure"]: + cb["last_failure"] = datetime.now() + if datetime.now() - cb["last_failure"] > self._cb_reset_after: + cb["failures"] = 0 + cb["last_failure"] = None + return False + return True + return False + + def _circuit_reset(self) -> None: + 
self._circuit_breaker_state["failures"] = 0 + self._circuit_breaker_state["last_failure"] = None + + def _handle_failure(self, trigger: AutonomousTrigger, reason: str) -> None: + self._circuit_breaker_state["failures"] += 1 + self._circuit_breaker_state["last_failure"] = datetime.now() + self._log.warning("Failure reason=%s trigger=%s", reason, trigger.trigger_type) + self._store_escalation(trigger.trigger_type) def _get_action_queue_size(self) -> int: session = get_session() try: - row = session.execute(text( - "SELECT COUNT(*) as count FROM action_plans WHERE status = 'pending'" - )).fetchone() + row = session.execute( + text("SELECT COUNT(*) AS count FROM action_plans WHERE status = 'pending'") + ).fetchone() return row.count if row else 0 finally: session.close() @@ -859,12 +681,51 @@ class ElephantAlphaAutonomousEngine: def _get_system_load_percentage(self) -> float: return 45.0 - def _reset_hourly_counter_if_needed(self): - if datetime.now().hour != self.last_hour_reset.hour: - self.decision_count_hour = 0 - self.hourly_cost_usd = 0.0 # W3-B: 同步重置費用計數 - self.last_hour_reset = datetime.now() + @staticmethod + async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs): + try: + return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) + except asyncio.TimeoutError: + raise TimeoutError(f"Operation timed out after {timeout}s") + + @staticmethod + def _parse_threat(raw: Any) -> Dict[str, Any]: + if isinstance(raw, dict): + return raw + return {"sku": "", "name": "", "confidence": 0.5} + + # ---- Internal workers ---- + async def _continuous_learning(self) -> None: + recent = [ + o for o in self.decision_history + if o.timestamp >= datetime.now() - timedelta(hours=24) + ] + if len(recent) >= 5: + acc = [o.accuracy_score for o in recent if o.accuracy_score > 0] + if acc: + avg = sum(acc) / len(acc) + if avg > 0.8: + self.confidence_threshold = min(0.9, self.confidence_threshold + 0.05) + elif avg < 0.6: + self.confidence_threshold = max(0.5, 
self.confidence_threshold - 0.05) + self._log.info( + "Learning: accuracy=%.2f threshold=%.2f", + avg, + self.confidence_threshold, + ) + + async def _optimize_resources(self) -> None: + q = self._get_action_queue_size() + load = self._get_system_load_percentage() + if load > 90: + self.max_autonomous_decisions_per_hour = 5 + elif load < 50 and q < 5: + self.max_autonomous_decisions_per_hour = 15 -# Global autonomous engine instance -autonomous_engine = ElephantAlphaAutonomousEngine() +__all__ = [ + "ElephantAlphaAutonomousEngine", + "AutonomousTrigger", + "DecisionType", + "DecisionOutcome", +] diff --git a/services/mcp_collector_service.py b/services/mcp_collector_service.py new file mode 100644 index 0000000..2c031df --- /dev/null +++ b/services/mcp_collector_service.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +services/mcp_collector_service.py +MCP 外部情報收集層 + +透過 Gemini Google Search Grounding 收集外部市場情報,供 OpenClaw 戰略分析使用: + - 台灣電商市場趨勢 + - 節日 / 促銷行事曆 + - 季節性消費洞察 + - 競品動態(蝦皮/PChome/momo/Yahoo) + - 消費者情緒與熱銷品類 + +結果快取至 ai_insights(type='mcp_cache'),24h TTL 避免重複呼叫。 +""" + +import json +import logging +import os +import time +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from database.manager import get_session +from sqlalchemy import text + +logger = logging.getLogger(__name__) + +GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") +MCP_CACHE_TTL_HOURS = int(os.getenv("MCP_CACHE_TTL_HOURS", "24")) +MCP_MODEL = os.getenv("MCP_GEMINI_MODEL", "gemini-2.5-flash-preview-05-20") + +# ── 查詢主題定義 ──────────────────────────────────────────────────────────── +_SEARCH_TOPICS = { + "market_trends": ( + "台灣電商 momo購物網 2025年熱銷商品趨勢 消費者行為 美妝保養 家電 生活用品" + ), + "holiday_calendar": ( + "2025年台灣重要節日促銷行事曆 母親節 618購物節 雙11 雙12 中秋 跨年 電商大促" + ), + "seasonal_insights": ( + "台灣電商季節性銷售趨勢 換季商品 夏季防曬 冬季保暖 Q3 Q4 消費高峰" + ), + "competitor_intel": ( + "momo購物網 PChome 蝦皮 Yahoo購物 2025年競爭策略 促銷活動 物流比較" + ), + "consumer_sentiment": ( + 
"台灣消費者 2025 購物偏好 低價高CP 品牌忠誠度 直播購物 社群電商 KOL影響" + ), + "pricing_strategy": ( + "台灣電商定價策略 動態定價 競品比價 心理定價 促銷折扣最佳時機" + ), +} + + +class MCPCollectorService: + """ + 外部情報收集服務(MCP 節點) + 使用 Gemini Search Grounding 抓取即時市場資訊 + """ + + def __init__(self): + self._initialized = False + self._genai = None + + def _ensure_init(self) -> bool: + if self._initialized: + return True + if not GEMINI_API_KEY: + logger.warning("[MCP] GEMINI_API_KEY 未設定,跳過外部情報收集") + return False + try: + import google.generativeai as genai + genai.configure(api_key=GEMINI_API_KEY) + self._genai = genai + self._initialized = True + return True + except ImportError: + logger.error("[MCP] google-generativeai 未安裝") + return False + except Exception as e: + logger.error("[MCP] Gemini 初始化失敗: %s", e) + return False + + # ── 快取讀寫 ──────────────────────────────────────────────────────────── + + def _read_cache(self, topic: str) -> Optional[str]: + session = get_session() + try: + row = session.execute( + text(f""" + SELECT content FROM ai_insights + WHERE insight_type = 'mcp_cache' + AND created_by = 'mcp_collector' + AND metadata_json::jsonb ->> 'topic' = :topic + AND created_at >= NOW() - INTERVAL '{MCP_CACHE_TTL_HOURS} hours' + ORDER BY created_at DESC LIMIT 1 + """), + {"topic": topic}, + ).fetchone() + if row: + logger.debug("[MCP] 快取命中: %s", topic) + return row[0] + return None + except Exception: + return None + finally: + session.close() + + def _write_cache(self, topic: str, content: str) -> None: + session = get_session() + try: + session.execute(text(""" + INSERT INTO ai_insights + (insight_type, content, confidence, created_by, status, metadata_json) + VALUES ('mcp_cache', :content, 0.9, 'mcp_collector', 'active', :meta) + """), { + "content": content[:4000], + "meta": json.dumps({"topic": topic, "model": MCP_MODEL, "cached_at": datetime.now().isoformat()}) + }) + session.commit() + except Exception as e: + logger.warning("[MCP] 快取寫入失敗: %s", e) + session.rollback() + finally: + session.close() + + # 
── 單主題搜尋 ────────────────────────────────────────────────────────── + + def _search_topic(self, topic: str, query: str) -> str: + if not self._ensure_init(): + return "" + + cached = self._read_cache(topic) + if cached: + return cached + + try: + model = self._genai.GenerativeModel( + model_name=MCP_MODEL, + tools=["google_search_retrieval"], + ) + response = model.generate_content( + f"請用繁體中文整理以下主題的最新資訊,提供具體數據與洞察,500字以內:\n{query}" + ) + content = response.text or "" + if content: + self._write_cache(topic, content) + return content + except Exception as e: + logger.warning("[MCP] 搜尋失敗 topic=%s: %s", topic, e) + return "" + + # ── 公開介面 ──────────────────────────────────────────────────────────── + + def collect_all(self) -> Dict[str, str]: + """ + 收集所有外部情報主題,回傳 {topic: content} 字典。 + 各主題獨立失敗不影響整體。 + """ + results = {} + for topic, query in _SEARCH_TOPICS.items(): + try: + results[topic] = self._search_topic(topic, query) + time.sleep(0.5) # 避免 Gemini rate limit + except Exception as e: + logger.error("[MCP] topic=%s 收集失敗: %s", topic, e) + results[topic] = "" + logger.info("[MCP] 收集完成,有效主題=%d/%d", sum(1 for v in results.values() if v), len(results)) + return results + + def collect_topic(self, topic: str) -> str: + """收集單一主題""" + query = _SEARCH_TOPICS.get(topic, topic) + return self._search_topic(topic, query) + + def get_holiday_context(self) -> str: + """取得節日行事曆(供 Prompt 注入)""" + now = datetime.now() + month = now.month + + # 靜態台灣電商節日知識庫(無需 API 呼叫) + static_calendar = { + 1: "元旦促銷、農曆新年備貨期(1/20前後開始)", + 2: "農曆新年(年貨、禮盒熱賣)、情人節(2/14,保養/禮品衝量)", + 3: "婦女節(3/8)、春季換季保養、開學季", + 4: "清明連假、春季大促、換季服飾高峰", + 5: "母親節(5/2週前後,美妝/保健/家電最高峰)、520情人節", + 6: "618購物節(最大中年促銷,全平台衝量)、父親節備檔", + 7: "父親節(7/4週前後)、暑假家電/3C/旅遊用品高峰", + 8: "七夕情人節(8/10前後)、暑假尾聲出清", + 9: "中秋節(禮盒/食品衝量)、開學季3C/文具", + 10: "雙10國慶、品牌週年慶(百貨、電商 10月旺季)", + 11: "雙11光棍節(全年最大促銷)、品牌大促備貨", + 12: "雙12年終慶、聖誕節(12/25)、跨年(元旦備貨)", + } + base = static_calendar.get(month, "") + + # 加入下個月預告 + next_month = (month % 12) + 1 + next_base = 
static_calendar.get(next_month, "") + + return ( + f"當前月份:{now.strftime('%Y年%m月')}\n" + f"本月電商重點:{base}\n" + f"下月預告:{next_base}" + ) + + def get_seasonal_context(self) -> str: + """季節性消費情境""" + month = datetime.now().month + seasons = { + (3, 4, 5): "春季:換季保養、外出服飾、春遊裝備", + (6, 7, 8): "夏季:防曬/美白、涼感寢具、戶外運動、冷氣清潔", + (9, 10, 11): "秋季:保濕修護、秋冬服飾、保健養生、熱飲週邊", + (12, 1, 2): "冬季:保暖寢具、暖身家電、年節禮品、養生補品", + } + for months, desc in seasons.items(): + if month in months: + return desc + return "" + + +# 模組單例 +mcp_collector = MCPCollectorService() diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index ccf603f..1eedab1 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -1,31 +1,715 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ -openclaw_strategist_service.py -OpenClaw 戰略分析師服務。 +services/openclaw_strategist_service.py +OpenClaw 戰略分析師(Gemini 2.5 Flash) -re-export SSHJumpExecutor from auto_heal_service(唯一來源) -以及 OpenClaw 策略分析功能。 +完整電商情報分析管線: + DB 爬蟲數據 + MCP 外部情報 → Gemini 深度分析 → ai_insights 持久化 → Telegram 推播 + +提供: + generate_weekly_strategy_report() — 週報(每週一 06:00) + generate_meta_analysis_report() — AI 系統效能自我審視(每 6 小時) + +分析維度: + 1. 業績趨勢(MoM / WoW) + 2. 競品價格比對 + 3. 定價策略建議 + 4. 行銷活動洞察 + 5. 季節性 / 節日機會 + 6. TOP 威脅 / 機會品項 + 7. 
具體行動清單(48h 優先事項) """ + +import json import logging -from typing import Optional, Any +import os +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from database.manager import get_session +from sqlalchemy import text + +# re-export:SSHJumpExecutor 維護於 auto_heal_service(向後相容) +from services.auto_heal_service import SSHJumpExecutor # noqa: F401 logger = logging.getLogger(__name__) -# SSHJumpExecutor 統一維護於 auto_heal_service,此處 re-export 向後相容 -from services.auto_heal_service import SSHJumpExecutor # noqa: F401 +GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") +STRATEGY_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash-preview-05-20") +TAIPEI_TZ_OFFSET = 8 # UTC+8 -__all__ = ["SSHJumpExecutor", "generate_weekly_strategy_report"] +__all__ = [ + "SSHJumpExecutor", + "generate_weekly_strategy_report", + "generate_meta_analysis_report", +] -def generate_weekly_strategy_report(context: Optional[Any] = None) -> dict: +# ═══════════════════════════════════════════════════════════════════════════════ +# DB 數據讀取層 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]: + """近 N 天業績彙總(本期 / 前期 對比)""" + session = get_session() + try: + rows = session.execute(text(f""" + SELECT + snapshot_date::date AS dt, + SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + COUNT(DISTINCT "商品ID") AS sku_count + FROM daily_sales_snapshot + WHERE snapshot_date::date >= CURRENT_DATE - {days} + GROUP BY dt + ORDER BY dt DESC + """)).fetchall() + + data = [{"date": str(r[0]), "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)} + for r in rows] + + mid = len(data) // 2 + curr_rev = sum(d["revenue"] for d in data[:mid]) if mid else 0 + prev_rev = sum(d["revenue"] for d in data[mid:]) if mid else 0 + wow = ((curr_rev - prev_rev) / prev_rev * 100) if prev_rev else 0 + + return { + "daily": data[:7], + "current_7d_revenue": curr_rev, + "prev_7d_revenue": prev_rev, + 
"wow_pct": round(wow, 1), + } + except Exception as e: + logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e) + return {} + finally: + session.close() + + +def _fetch_top_threats(limit: int = 10) -> List[Dict]: + """最新 TOP N 競價威脅(來自 Hermes 分析)""" + session = get_session() + try: + rows = session.execute(text(""" + SELECT product_sku, content, confidence, metadata_json, created_at + FROM ai_insights + WHERE insight_type = 'price_alert' + AND status = 'active' + AND created_at >= NOW() - INTERVAL '48 hours' + ORDER BY confidence DESC + LIMIT :lim + """), {"lim": limit}).fetchall() + + result = [] + for r in rows: + meta = {} + try: + meta = json.loads(r[3]) if r[3] else {} + except Exception: + pass + result.append({ + "sku": r[0], + "summary": (r[1] or "")[:200], + "confidence": float(r[2] or 0), + "gap_pct": meta.get("gap_pct", 0), + "sales_delta": meta.get("sales_7d_delta_pct", 0), + "momo_price": meta.get("momo_price"), + "pchome_price": meta.get("pchome_price"), + }) + return result + except Exception as e: + logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e) + return [] + finally: + session.close() + + +def _fetch_top_recommendations(limit: int = 10) -> List[Dict]: + """最新定價建議""" + session = get_session() + try: + rows = session.execute(text(""" + SELECT sku, name, reason, strategy, confidence, + momo_price, pchome_price, gap_pct, sales_7d_delta + FROM ai_price_recommendations + WHERE status = 'pending' + AND created_at >= NOW() - INTERVAL '48 hours' + ORDER BY confidence DESC + LIMIT :lim + """), {"lim": limit}).fetchall() + return [dict(zip( + ["sku","name","reason","strategy","confidence","momo_price","pchome_price","gap_pct","sales_delta"], + r + )) for r in rows] + except Exception as e: + logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e) + return [] + finally: + session.close() + + +def _fetch_category_breakdown(days: int = 7) -> List[Dict]: + """品類業績分佈""" + session = get_session() + try: + rows = session.execute(text(f""" + SELECT p.category, + 
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue, + COUNT(DISTINCT p.i_code) AS sku_count + FROM daily_sales_snapshot s + JOIN products p ON p.name = s."商品名稱" + WHERE s.snapshot_date::date >= CURRENT_DATE - {days} + AND p.status = 'ACTIVE' + GROUP BY p.category + ORDER BY revenue DESC + LIMIT 10 + """)).fetchall() + return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)} + for r in rows] + except Exception as e: + logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e) + return [] + finally: + session.close() + + +def _fetch_competitor_summary() -> Dict[str, Any]: + """競品價格整體概況""" + session = get_session() + try: + row = session.execute(text(""" + SELECT + COUNT(*) AS total, + AVG((cp.price - pr.price) / pr.price * 100) AS avg_gap_pct, + SUM(CASE WHEN cp.price < pr.price * 0.9 THEN 1 ELSE 0 END) AS undercut_count, + SUM(CASE WHEN cp.price > pr.price * 1.1 THEN 1 ELSE 0 END) AS premium_count + FROM competitor_prices cp + JOIN products p ON p.i_code = cp.sku + JOIN ( + SELECT DISTINCT ON (product_id) product_id, price + FROM price_records ORDER BY product_id, timestamp DESC + ) pr ON pr.product_id = p.id + WHERE cp.expires_at > NOW() + """)).fetchone() + if row and row[0]: + return { + "total_skus": int(row[0]), + "avg_gap_pct": round(float(row[1] or 0), 1), + "undercut_count": int(row[2] or 0), + "premium_count": int(row[3] or 0), + } + return {} + except Exception as e: + logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e) + return {} + finally: + session.close() + + +# ═══════════════════════════════════════════════════════════════════════════════ +# DB 寫入層 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _save_to_ai_insights( + insight_type: str, + content: str, + confidence: float, + metadata: Dict[str, Any], + period: Optional[str] = None, +) -> Optional[int]: + """將分析結果持久化到 ai_insights""" + session = get_session() + try: + row = session.execute(text(""" + INSERT INTO ai_insights + (insight_type, content, 
confidence, created_by, status, + metadata_json, period, created_at) + VALUES (:type, :content, :conf, 'openclaw', 'active', :meta, :period, NOW()) + RETURNING id + """), { + "type": insight_type, + "content": content[:8000], + "conf": confidence, + "meta": json.dumps(metadata, ensure_ascii=False), + "period": period or datetime.now().strftime("%Y-%m-%d"), + }).fetchone() + session.commit() + insight_id = row[0] if row else None + logger.info("[OpenClaw] ai_insights 寫入成功 id=%s type=%s", insight_id, insight_type) + return insight_id + except Exception as e: + logger.error("[OpenClaw] ai_insights 寫入失敗: %s", e) + session.rollback() + return None + finally: + session.close() + + +def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None: + """將 AI 建議的行動項目寫入 action_plans""" + if not actions: + return + session = get_session() + try: + for i, action in enumerate(actions[:10]): + session.execute(text(""" + INSERT INTO action_plans + (action_type, description, status, priority, metadata_json, created_at) + VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW()) + """), { + "desc": action[:500], + "priority": i + 1, + "meta": json.dumps({"source_insight_id": source_insight_id, "created_by": "openclaw"}), + }) + session.commit() + logger.info("[OpenClaw] action_plans 寫入 %d 筆", len(actions[:10])) + except Exception as e: + logger.warning("[OpenClaw] action_plans 寫入失敗: %s", e) + session.rollback() + finally: + session.close() + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Gemini 呼叫層 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _call_gemini(system_prompt: str, user_prompt: str, temperature: float = 0.4) -> Optional[str]: + """呼叫 Gemini,回傳文字;失敗回傳 None""" + if not GEMINI_API_KEY: + logger.warning("[OpenClaw] GEMINI_API_KEY 未設定") + return None + try: + import google.generativeai as genai + genai.configure(api_key=GEMINI_API_KEY) + model = 
genai.GenerativeModel( + model_name=STRATEGY_MODEL, + generation_config=genai.types.GenerationConfig( + temperature=temperature, + max_output_tokens=4096, + ), + system_instruction=system_prompt, + ) + response = model.generate_content( + user_prompt, + request_options={"timeout": 180}, + ) + return response.text or "" + except Exception as e: + logger.error("[OpenClaw] Gemini 呼叫失敗: %s", e) + return None + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Telegram 推播 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None: + try: + from services.telegram_templates import report as tpl_report, _send_telegram_raw + + # Telegram 訊息長度限制 4096,分段發送 + header = tpl_report(title, report_type, period, "") + chunks = _split_message(content, max_len=3800 - len(header)) + + for i, chunk in enumerate(chunks): + msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk + _send_telegram_raw(msg) + except Exception as e: + logger.error("[OpenClaw] Telegram 推播失敗: %s", e) + + +def _split_message(text: str, max_len: int = 3800) -> List[str]: + if len(text) <= max_len: + return [text] + chunks = [] + while text: + chunks.append(text[:max_len]) + text = text[max_len:] + return chunks + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 主要公開函式 +# ═══════════════════════════════════════════════════════════════════════════════ + +def generate_weekly_strategy_report( + context: Optional[Any] = None, + force_tg_alert: bool = False, +) -> dict: """ - OpenClaw 週報生成(戰略分析)。 - 當 ElephantAlpha orchestrator 分派 openclaw generate_market_analysis 時呼叫。 + OpenClaw 全景電商週報(每週一 06:00) + + 流程: + 1. 讀取 DB:業績 / 競品 / 威脅 / 建議 / 品類 + 2. MCP 收集:外部市場趨勢 / 節日 / 競品動態 + 3. Gemini 2.5 Flash 深度分析 + 4. 持久化 → ai_insights + action_plans + 5. 
Telegram 推播 """ - logger.info("[OpenClaw] generate_weekly_strategy_report called") - # TODO: 接入 OpenClaw LLM 生成真實週報 + now = datetime.now() + period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})" + logger.info("[OpenClaw] 週報任務啟動 period=%s", period) + + # ── Step 1:DB 數據收集 ────────────────────────────────────────────────── + sales = _fetch_sales_summary(14) + threats = _fetch_top_threats(10) + recommendations = _fetch_top_recommendations(10) + categories = _fetch_category_breakdown(7) + competitor_summary = _fetch_competitor_summary() + + # ── Step 2:MCP 外部情報 ───────────────────────────────────────────────── + mcp_data: Dict[str, str] = {} + try: + from services.mcp_collector_service import mcp_collector + mcp_data = mcp_collector.collect_all() + holiday_ctx = mcp_collector.get_holiday_context() + seasonal_ctx = mcp_collector.get_seasonal_context() + except Exception as e: + logger.warning("[OpenClaw] MCP 收集失敗(非阻塞): %s", e) + holiday_ctx = "" + seasonal_ctx = "" + + # ── Step 3:組建 Gemini Prompt ─────────────────────────────────────────── + system_prompt = """你是 OpenClaw,一位台灣頂尖電商戰略分析師,專精於 momo 購物平台。 +你的任務是根據真實業績數據、競品情報、外部市場趨勢,產出一份具體可執行的週報。 + +語言規定: +- 所有輸出必須使用繁體中文(台灣用語) +- 數字格式:金額用 NT$ 標示,百分比保留1位小數 +- 語氣:專業但不失親切,適合匯報給電商運營主管 + +分析原則: +- 每個洞察必須有數據支撐,禁止憑空推測 +- 建議必須具體(時間、對象、行動、預期效益) +- 優先關注「可在 48 小時內執行」的行動項目""" + + db_section = f""" +【DB 即時數據】 +業績概況: + 本週營收:NT${sales.get('current_7d_revenue', 0):,.0f} + 前週營收:NT${sales.get('prev_7d_revenue', 0):,.0f} + 週成長率:{sales.get('wow_pct', 0):+.1f}% + +競品比對概況: + 監控SKU總數:{competitor_summary.get('total_skus', 0)} + 平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}% + 被競品削價數:{competitor_summary.get('undercut_count', 0)} 個 + 我方具優勢數:{competitor_summary.get('premium_count', 0)} 個 + +TOP 威脅品項(近48h Hermes 偵測): +{_format_threats(threats)} + +待處理定價建議: +{_format_recommendations(recommendations)} + +品類業績分佈(本週): +{_format_categories(categories)} +""" + + mcp_section = f""" +【MCP 外部情報】 +市場趨勢: +{mcp_data.get('market_trends', 
'(未取得)')[:600]} + +競品動態: +{mcp_data.get('competitor_intel', '(未取得)')[:500]} + +消費者情緒: +{mcp_data.get('consumer_sentiment', '(未取得)')[:400]} + +定價策略參考: +{mcp_data.get('pricing_strategy', '(未取得)')[:400]} + +節日行事曆: +{holiday_ctx} +{mcp_data.get('holiday_calendar', '')[:300]} + +季節性洞察: +{seasonal_ctx} +{mcp_data.get('seasonal_insights', '')[:300]} +""" + + user_prompt = f"""請根據以下數據,產出本週電商全景戰略週報: + +{db_section} +{mcp_section} + +請按以下結構輸出(每節使用 HTML 標題,內容精簡扼要): + +📊 本週業績總結 +(關鍵指標 + WoW 變化 + 異常警示) + +🏆 TOP 機會品項 +(具備提價或強推空間的品項,3-5個,含具體建議) + +⚠️ TOP 威脅品項 +(最需緊急處理的競品削價風險,3-5個,含建議行動) + +💰 本週定價策略建議 +(整體定價方向 + 品類重點調整 + 心理定價應用) + +📢 行銷活動洞察 +(節日/季節機會 + 推薦活動形式 + 投放時機) + +📦 品類熱度分析 +(成長品類 vs 衰退品類 + 庫存備貨建議) + +🔮 市場競爭洞察 +(競品最新動態 + 平台策略差異 + 我方應對) + +🎯 48小時優先行動清單 +(5-8條具體可執行任務,格式:[優先度] 行動說明 → 預期效益) + +📈 下週展望 +(風險提示 + 機會預告 + 需人工決策事項) + +重要:語言必須是繁體中文,數據必須引用上方提供的實際數字。 +""" + + # ── Step 4:Gemini 生成 ─────────────────────────────────────────────────── + logger.info("[OpenClaw] 呼叫 Gemini %s 生成週報...", STRATEGY_MODEL) + report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35) + + if not report_content: + logger.error("[OpenClaw] Gemini 週報生成失敗") + return {"status": "error", "report_type": "weekly_strategy", "error": "Gemini 呼叫失敗"} + + # ── Step 5:解析行動清單 ───────────────────────────────────────────────── + action_items = _extract_action_items(report_content) + + # ── Step 6:持久化 DB ──────────────────────────────────────────────────── + metadata = { + "period": period, + "model": STRATEGY_MODEL, + "wow_pct": sales.get("wow_pct", 0), + "threat_count": len(threats), + "recommendation_count": len(recommendations), + "mcp_topics_collected": sum(1 for v in mcp_data.values() if v), + "action_count": len(action_items), + "generated_at": now.isoformat(), + } + insight_id = _save_to_ai_insights( + insight_type="weekly_strategy", + content=report_content, + confidence=0.88, + metadata=metadata, + period=now.strftime("%Y-%W"), + ) + _save_action_items(action_items, insight_id) + + # ── Step 
7:Telegram 推播 ──────────────────────────────────────────────── + if force_tg_alert or True: + _send_strategy_telegram( + title="OpenClaw 電商全景週報", + report_type="weekly_strategy", + period=period, + content=report_content, + ) + + logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items)) return { "status": "ok", "report_type": "weekly_strategy", - "summary": "OpenClaw strategy report placeholder — LLM integration pending", - "context": context, + "insight_id": insight_id, + "period": period, + "action_count": len(action_items), + "summary": report_content[:300], } + + +def generate_meta_analysis_report() -> str: + """ + AI 系統效能自我審視(每 6 小時 run_openclaw_meta_analysis_task 呼叫) + + 分析 ai_insights 近期累積資料,評估: + - 各 Agent 預測準確率 + - 價格建議執行率 + - 告警品質與誤報率 + - 系統盲區與改進方向 + + 結果持久化至 ai_insights(type='meta_analysis'),並推播 Telegram。 + """ + now = datetime.now() + period = now.strftime("%Y-%m-%d %H:00") + logger.info("[OpenClaw] Meta-Analysis 任務啟動 %s", period) + + # ── 讀取近期 ai_insights 摘要 ──────────────────────────────────────────── + session = get_session() + try: + stats = session.execute(text(""" + SELECT + insight_type, + created_by, + COUNT(*) AS total, + AVG(confidence) AS avg_conf, + SUM(CASE WHEN status='active' THEN 1 ELSE 0 END) AS active_cnt, + SUM(CASE WHEN status='relearn' THEN 1 ELSE 0 END) AS relearn_cnt, + MAX(created_at) AS latest + FROM ai_insights + WHERE created_at >= NOW() - INTERVAL '24 hours' + GROUP BY insight_type, created_by + ORDER BY total DESC + """)).fetchall() + + action_stats = session.execute(text(""" + SELECT status, COUNT(*) AS cnt + FROM action_plans + WHERE created_at >= NOW() - INTERVAL '24 hours' + GROUP BY status + """)).fetchall() + + reco_stats = session.execute(text(""" + SELECT status, COUNT(*) AS cnt, AVG(confidence) AS avg_conf + FROM ai_price_recommendations + WHERE created_at >= NOW() - INTERVAL '24 hours' + GROUP BY status + """)).fetchall() + except Exception as e: + logger.warning("[OpenClaw] Meta 
數據讀取失敗: %s", e) + stats, action_stats, reco_stats = [], [], [] + finally: + session.close() + + # ── 組建 Prompt ─────────────────────────────────────────────────────────── + system_prompt = """你是 OpenClaw 自我審視模組,負責分析 AI 多智能體系統的近期表現。 +請用繁體中文,以電商 AI 系統架構師的視角撰寫分析報告,語氣客觀、聚焦問題與改進。""" + + stats_text = "\n".join([ + f" {r[0]} ({r[1]}): 共{r[2]}筆, 平均信心{r[3]:.2f}, 活躍{r[4]}, 重學{r[5]}" + for r in stats + ]) or " (無近期數據)" + + action_text = "\n".join([ + f" {r[0]}: {r[1]} 筆" for r in action_stats + ]) or " (無近期數據)" + + reco_text = "\n".join([ + f" {r[0]}: {r[1]} 筆, 平均信心{r[2]:.2f}" for r in reco_stats + ]) or " (無近期數據)" + + user_prompt = f"""請分析以下 AI 系統近 24 小時運作數據,產出自我審視報告: + +【ai_insights 產出統計】 +{stats_text} + +【action_plans 執行狀況】 +{action_text} + +【ai_price_recommendations 建議狀況】 +{reco_text} + +【分析時間】{period} + +請按以下結構輸出: + +🤖 AI 系統效能自我審視報告 +時間:{period} + +📊 各 Agent 產出統計 +(逐一評估 Hermes/NemoTron/OpenClaw/ElephantAlpha 的輸出品質) + +⚠️ 偵測到的系統問題 +(誤報、漏報、重學事件分析) + +💡 盲區與改進建議 +(哪些場景 AI 表現不足?建議優化方向) + +✅ 本週期亮點 +(表現良好的分析案例) + +🔧 技術債與優化優先順序 +(1-3 項具體技術改進建議) + +語言:繁體中文,200字以內,精簡扼要。 +""" + + # ── Gemini 生成 ────────────────────────────────────────────────────────── + report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3) + if not report_content: + logger.error("[OpenClaw] Meta-Analysis Gemini 呼叫失敗") + return "(Meta-Analysis 生成失敗)" + + # ── 持久化 DB ───────────────────────────────────────────────────────────── + metadata = { + "period": period, + "model": STRATEGY_MODEL, + "insight_types_analyzed": len(stats), + "generated_at": now.isoformat(), + } + insight_id = _save_to_ai_insights( + insight_type="meta_analysis", + content=report_content, + confidence=0.85, + metadata=metadata, + period=now.strftime("%Y-%m-%d"), + ) + + # ── Telegram 推播 ──────────────────────────────────────────────────────── + try: + from services.telegram_templates import _send_telegram_raw + _send_telegram_raw(report_content) + except Exception as e: + logger.error("[OpenClaw] Meta-Analysis Telegram 推播失敗: 
%s", e) + + logger.info("[OpenClaw] Meta-Analysis 完成 insight_id=%s", insight_id) + return report_content + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 輔助格式化函式 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _format_threats(threats: List[Dict]) -> str: + if not threats: + return " (無近期競價威脅)" + lines = [] + for t in threats[:5]: + lines.append( + f" • SKU {t['sku']}:價差 {t.get('gap_pct', 0):+.1f}%," + f"業績週變化 {t.get('sales_delta', 0):+.1f}%," + f"信心 {t.get('confidence', 0):.2f}" + ) + return "\n".join(lines) + + +def _format_recommendations(recs: List[Dict]) -> str: + if not recs: + return " (無待處理定價建議)" + lines = [] + for r in recs[:5]: + lines.append( + f" • {r.get('name', r.get('sku', ''))[:30]}:{r.get('strategy', '')}," + f"信心 {r.get('confidence', 0):.2f}" + ) + return "\n".join(lines) + + +def _format_categories(cats: List[Dict]) -> str: + if not cats: + return " (無品類數據)" + lines = [] + for c in cats[:5]: + lines.append( + f" • {c.get('category', '未分類')}:" + f"NT${c.get('revenue', 0):,.0f},{c.get('sku_count', 0)} 個 SKU" + ) + return "\n".join(lines) + + +def _extract_action_items(report_text: str) -> List[str]: + """從報告文字中解析行動清單(48小時優先行動)""" + lines = report_text.split("\n") + items = [] + in_action_section = False + for line in lines: + if "48小時" in line or "優先行動" in line: + in_action_section = True + continue + if in_action_section: + stripped = line.strip() + if stripped.startswith("•") or stripped.startswith("-") or stripped.startswith("["): + items.append(stripped.lstrip("•-").strip()) + elif stripped.startswith("") and items: + break + return items[:8]