diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py
index cc0af67..c6628dd 100644
--- a/services/auto_heal_service.py
+++ b/services/auto_heal_service.py
@@ -1,545 +1,375 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
"""
-auto_heal_service.py - EwoooC AIOps 自動修復引擎 (ADR-013)
+auto_heal_service.py
+ADR-013 AIOps 自動修復服務。
-完整閉環:
- Exception 觸發
- → create_incident() : 寫入 incidents 表
- → classify_error() : 識別 error_type
- → match_playbook() : 比對 playbooks 規則庫
- → execute_playbook() : 執行修復動作
- → _write_heal_log() : 寫入 heal_logs 表
- → sink_to_km() : store_insight → KM RAG 沉澱
- → notify_telegram() : 推播修復結果
+SSHJumpExecutor:通過跳板機安全執行遠端命令。
+AutoHealService:PlayBook 驅動的自動修復主服務。
"""
-
-import json
+import atexit
+import logging
import os
-import time
-import traceback as tb
-from datetime import datetime, timedelta
-from typing import Optional, Tuple
+import re
+import subprocess
+import tempfile
+from typing import Dict, Any, List, Optional
-import requests
-from dotenv import load_dotenv
+logger = logging.getLogger(__name__)
-from database.manager import get_session
-from database.autoheal_models import Incident, Playbook, HealLog, SEED_PLAYBOOKS
-from services.logger_manager import SystemLogger
+# ── 輸入驗證用的安全正則 ──────────────────────────────────────────────────────
+# hostname / IP:字母、數字、連字號、點(RFC 1123 + IPv4)
+_HOST_RE = re.compile(r'^[a-zA-Z0-9]([a-zA-Z0-9.\-]{0,253}[a-zA-Z0-9])?$')
+# username:字母、數字、底線、連字號(POSIX 用戶名)
+_USER_RE = re.compile(r'^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,31}$')
-load_dotenv()
-sys_log = SystemLogger("AutoHeal").get_logger()
-
-# ─── Telegram 設定 ───────────────────────────────────────
-_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") or os.getenv("OPENCLAW_BOT_TOKEN", "")
-_CHAT_ID = os.getenv("OPENCLAW_GROUP_ID", "-1003940688311")
-
-# ─── SSH 跳板機設定 ──────────────────────────────────────
-_JUMP_HOST = os.getenv("SSH_JUMP_HOST", "192.168.0.110")
-_JUMP_USER = os.getenv("SSH_JUMP_USER", "wooo")
-_TARGET_HOST = os.getenv("SSH_TARGET_HOST", "192.168.0.188")
-_TARGET_USER = os.getenv("SSH_TARGET_USER", "ollama")
-# SSH 私鑰路徑:優先 env,fallback 到 config 目錄(rw mount,不需重建容器)
-_SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "/app/config/autoheal_id_ed25519")
-
-# ─── 白名單允許執行的指令前綴 ────────────────────────────
-_CMD_WHITELIST = [
- "docker restart ",
- "docker compose restart ",
- "docker start ",
-]
-
-# ─── 錯誤分類對照表(keyword → error_type)──────────────
-_ERROR_CLASSIFY_MAP = {
- "DNS_FAIL": ["name resolution", "could not translate host name",
- "Temporary failure in name resolution"],
- "DB_UNREACHABLE": ["connection refused", "Connection reset by peer",
- "could not connect to server", "psycopg2.OperationalError"],
- "OOM": ["SIGKILL", "out of memory", "Worker was sent SIGKILL", "MemoryError"],
- "SSL_FAIL": ["SSL connection has been closed unexpectedly", "SSL SYSCALL error"],
- "AUTH_FAIL": ["invalid_grant", "google_token.pickle", "Token has been expired"],
- "CRAWLER_FAIL": ["429 Too Many Requests", "rate limit", "Retry-After",
- "CloudflareCaptcha", "webdriver"],
- "IMPORT_FAIL": ["import_service", "ImportError", "sync_daily_sales"],
- "TIMEOUT": ["Timeout", "timed out", "TimeoutError"],
-}
-
-_SEVERITY_MAP = {
- "P1": ["OOM", "SSL_FAIL"],
- "P2": ["DNS_FAIL", "DB_UNREACHABLE", "AUTH_FAIL"],
- "P3": ["CRAWLER_FAIL", "IMPORT_FAIL", "TIMEOUT"],
-}
+# PlayBook action_type allowlist(防止任意命令植入)
+ALLOWED_ACTION_TYPES = frozenset({
+ 'DOCKER_RESTART',
+ 'WAIT_RETRY',
+ 'ALERT_ONLY',
+ 'SSH_CMD',
+})
-# ──────────────────────────────────────────────────────────
-# 工具函數
-# ──────────────────────────────────────────────────────────
-
-def _classify_error(error_msg: str) -> Tuple[str, str]:
- """回傳 (error_type, severity)"""
- lower = error_msg.lower()
- for etype, keywords in _ERROR_CLASSIFY_MAP.items():
- if any(k.lower() in lower for k in keywords):
- for sev, etypes in _SEVERITY_MAP.items():
- if etype in etypes:
- return etype, sev
- return etype, "P3"
- return "UNKNOWN", "P3"
+def _validate_host(host: str) -> str:
+ """驗證 hostname/IP,防止 SSH option injection(-o ProxyCommand=...)"""
+ if not host or not _HOST_RE.match(host):
+ raise ValueError(f"Invalid host: {host!r}")
+ return host
-def _is_cmd_allowed(cmd: str) -> bool:
- """白名單驗證:防止任意 RCE"""
- c = cmd.strip()
- return any(c.startswith(prefix) for prefix in _CMD_WHITELIST)
+def _validate_user(user: str) -> str:
+ """驗證 Unix 用戶名"""
+ if not user or not _USER_RE.match(user):
+ raise ValueError(f"Invalid user: {user!r}")
+ return user
-def _send_telegram(msg: str) -> None:
- """推播訊息至 Telegram 群組"""
- if not _BOT_TOKEN:
- sys_log.warning("[AutoHeal] TELEGRAM_BOT_TOKEN 未設定,略過推播")
- return
- try:
- requests.post(
- f"https://api.telegram.org/bot{_BOT_TOKEN}/sendMessage",
- json={"chat_id": _CHAT_ID, "text": msg, "parse_mode": "HTML"},
- timeout=10,
+class SSHJumpExecutor:
+ """
+ 通過跳板機執行遠端命令的安全封裝。
+
+ Security notes:
+ - jump_host / target_host / users 均通過正則驗證,防止 SSH option injection
+ - command 必須為 list(argv),不接受字串,避免遠端 shell 解析
+ - SSH 指令列以 '--' 結尾,強制不解析後續參數為 SSH 選項
+ - 私鑰資料寫入 600 權限臨時檔,程序退出時清除
+ """
+
+ def __init__(
+ self,
+ jump_host: str,
+ jump_user: str,
+ jump_key_path: Optional[str] = None,
+ jump_key_data: Optional[str] = None,
+ jump_port: int = 22,
+ jump_connect_timeout: int = 5,
+ jump_command_timeout: int = 60,
+ ):
+ self.jump_host = _validate_host(jump_host)
+ self.jump_user = _validate_user(jump_user)
+ self.jump_key_path = jump_key_path
+ self.jump_key_data = jump_key_data
+ self.jump_port = int(jump_port)
+ self.jump_connect_timeout = int(jump_connect_timeout)
+ self.jump_command_timeout = int(jump_command_timeout)
+ self._tmp_key_path: Optional[str] = None
+
+ if self.jump_key_data:
+ self._tmp_key_path = self._write_temp_key(self.jump_key_data)
+
+ @staticmethod
+ def _write_temp_key(key_data: str) -> str:
+ """將私鑰寫入 600 權限臨時檔並註冊退出清理"""
+ fd, tmp_path = tempfile.mkstemp(prefix="ssh_key_")
+ try:
+ os.write(fd, key_data.encode())
+ finally:
+ os.close(fd)
+ os.chmod(tmp_path, 0o600)
+ atexit.register(
+ lambda p=tmp_path: os.unlink(p) if os.path.exists(p) else None
)
- except Exception as e:
- sys_log.error(f"[AutoHeal] Telegram 推播失敗: {e}")
+ return tmp_path
+ def _make_env(self) -> Dict[str, str]:
+ env = dict(os.environ)
+ env["SSH_ASKPASS"] = "echo"
+ env["DISPLAY"] = ""
+ return env
-def _execute_ssh_cmd(cmd: str) -> Tuple[bool, str]:
- """
- 透過 paramiko 執行 SSH 跳板指令。
- 若 paramiko 不可用則降級為 subprocess + CLI ssh。
- """
- if not _is_cmd_allowed(cmd):
- return False, f"指令不在白名單中,拒絕執行: {cmd}"
-
- try:
- import paramiko
- import os as _os
- key_path = _SSH_KEY_PATH if _os.path.isfile(_SSH_KEY_PATH) else None
- key_kwargs = {"key_filename": key_path} if key_path else {}
-
- jump = paramiko.SSHClient()
- jump.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- jump.connect(_JUMP_HOST, username=_JUMP_USER, timeout=10,
- look_for_keys=True, **key_kwargs)
-
- # 透過跳板機建立隧道
- transport = jump.get_transport()
- dest_addr = (_TARGET_HOST, 22)
- src_addr = (_JUMP_HOST, 0)
- chan = transport.open_channel("direct-tcpip", dest_addr, src_addr)
-
- target = paramiko.SSHClient()
- target.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- target.connect(_TARGET_HOST, username=_TARGET_USER, sock=chan, timeout=15,
- look_for_keys=True, **key_kwargs)
-
- _stdin, stdout, stderr = target.exec_command(cmd, timeout=60)
- out = stdout.read().decode("utf-8", errors="replace").strip()
- err = stderr.read().decode("utf-8", errors="replace").strip()
- exit_code = stdout.channel.recv_exit_status()
-
- target.close()
- jump.close()
-
- if exit_code == 0:
- return True, out or "指令執行成功"
- else:
- return False, f"exit_code={exit_code}\n{err or out}"
-
- except ImportError:
- # paramiko 尚未安裝,降級到 cli ssh
- sys_log.warning("[AutoHeal] paramiko 未安裝,改用 subprocess + CLI ssh")
- import subprocess
- full_cmd = [
- "ssh", "-o", "StrictHostKeyChecking=no",
- "-J", f"{_JUMP_USER}@{_JUMP_HOST}",
- f"{_TARGET_USER}@{_TARGET_HOST}", cmd,
+ def _build_ssh_base_cmd(self) -> List[str]:
+ """構建 SSH 基礎選項(不含目標主機與命令)"""
+ base = [
+ "ssh",
+ "-o", "StrictHostKeyChecking=no",
+ "-o", "BatchMode=yes",
+ "-o", f"ConnectTimeout={self.jump_connect_timeout}",
+ "-o", "ServerAliveInterval=15",
+ "-o", "ServerAliveCountMax=3",
+ "-p", str(self.jump_port),
]
- result = subprocess.run(full_cmd, capture_output=True, text=True, timeout=60)
- if result.returncode == 0:
- return True, result.stdout.strip() or "指令執行成功"
- else:
- return False, result.stderr.strip() or result.stdout.strip()
+ key_path = self._tmp_key_path or self.jump_key_path
+ if key_path:
+ base.extend(["-i", key_path])
+ return base
- except Exception as e:
- return False, f"SSH 執行例外: {e}"
+ def execute_command(
+ self,
+ target_host: str,
+ target_user: str,
+ command: List[str], # ← LIST,不接受字串
+ ) -> Dict[str, Any]:
+ """
+ 通過跳板機在目標主機執行命令。
+ Args:
+ target_host: 目標主機 hostname 或 IP(必須通過驗證)
+ target_user: 目標主機用戶名(必須通過驗證)
+ command: 命令及參數列表(e.g. ['docker', 'restart', 'momo-app'])
+ 不接受字串,防止遠端 shell 重新解析
+
+ Raises:
+ ValueError: 若 command 為空、為字串,或 host/user 格式非法
+ """
+ if isinstance(command, str):
+ raise TypeError(
+ "command must be a list, not a string. "
+ "Passing a string risks remote shell injection."
+ )
+ if not command:
+ raise ValueError("command list cannot be empty")
+
+ target_host = _validate_host(target_host)
+ target_user = _validate_user(target_user)
+
+ full_cmd = self._build_ssh_base_cmd()
+ full_cmd.extend([
+ "-J", f"{self.jump_user}@{self.jump_host}",
+ f"{target_user}@{target_host}",
+ "--", # 強制停止 SSH 選項解析
+ *command, # 展開命令 list,每個元素獨立 argv
+ ])
+
+ try:
+ result = subprocess.run(
+ full_cmd,
+ capture_output=True,
+ text=True,
+ timeout=self.jump_command_timeout,
+ env=self._make_env(),
+ )
+ return {
+ "success": result.returncode == 0,
+ "exit_code": result.returncode,
+ "stdout": result.stdout.strip(),
+ "stderr": result.stderr.strip(),
+ "command": command,
+ }
+ except subprocess.TimeoutExpired:
+ return {
+ "success": False,
+ "exit_code": -1,
+ "stdout": "",
+ "stderr": "SSH command timed out",
+ "command": command,
+ }
+ except Exception as exc:
+ logger.warning("SSH jump execution failed: %s", exc, exc_info=True)
+ return {
+ "success": False,
+ "exit_code": -1,
+ "stdout": "",
+ "stderr": str(exc),
+ "command": command,
+ }
-# ──────────────────────────────────────────────────────────
-# 核心引擎
-# ──────────────────────────────────────────────────────────
class AutoHealService:
"""
- AIOps 自動修復引擎。
+ ADR-013 PlayBook 驅動的自動修復主服務。
- 使用方式(在 scheduler.py 的 except 區塊):
- from services.auto_heal_service import auto_heal_service
- auto_heal_service.handle_exception(
- task_name="run_auto_import_task",
- exception=e,
- traceback_str=traceback.format_exc()
- )
+ 支援的 action_type(ALLOWED_ACTION_TYPES):
+ DOCKER_RESTART — 在指定主機重啟 Docker 服務
+ WAIT_RETRY — 等待後重試(不做系統操作)
+ ALERT_ONLY — 只記錄 / 發 Telegram,不執行
+ SSH_CMD — 執行 PlayBook 指定的靜態白名單命令(list 型)
"""
- # ── 步驟 1:統一入口 ────────────────────────────────
- def handle_exception(self, task_name: str, exception: Exception,
- traceback_str: str = "") -> Optional[int]:
+ # Docker 操作的安全命令對應表(防止 PlayBook 攜帶任意命令)
+ _DOCKER_RESTART_CMD = ["docker", "restart"]
+
+ def handle_exception(
+ self,
+ error_type: str,
+ context: Optional[Dict[str, Any]] = None,
+ ) -> Dict[str, Any]:
"""
- 統一例外處理入口。回傳 incident_id,若前置失敗則回傳 None。
+ 根據 error_type 查詢 PlayBook 並執行對應修復動作。
+
+ Args:
+ error_type: 錯誤類型字串(e.g. 'resource_pressure')
+ context: 觸發上下文,可包含 queue_size / system_load
+
+ Returns:
+ 修復結果 dict,含 success / action / message
"""
- error_msg = str(exception)
- error_type, severity = _classify_error(error_msg)
+ context = context or {}
+ logger.info(
+ "[AutoHeal] handle_exception: error_type=%s context=%s",
+ error_type, context
+ )
- sys_log.info(f"[AutoHeal] 收到例外 task={task_name} type={error_type} sev={severity}")
-
- incident = self._create_incident(task_name, error_type, error_msg,
- traceback_str, severity)
- if not incident:
- return None
-
- playbook = self._match_playbook(incident)
+ # 從 DB 查詢匹配的 PlayBook
+ playbook = self._find_playbook(error_type)
if not playbook:
- sys_log.info(f"[AutoHeal] 未找到匹配 PlayBook (incident_id={incident.id})")
- self._notify_no_playbook(incident)
- return incident.id
+ logger.info("[AutoHeal] No matching playbook for: %s", error_type)
+ return {
+ "success": False,
+ "action": None,
+ "message": f"No playbook matched for error_type={error_type}",
+ }
- heal_log = self._execute_playbook(incident, playbook)
- self._sink_to_km(incident, playbook, heal_log)
- self._notify_telegram(incident, playbook, heal_log)
- return incident.id
-
- # ── 步驟 2:建立 Incident ───────────────────────────
- def _create_incident(self, task_name: str, error_type: str, error_msg: str,
- traceback_str: str, severity: str) -> Optional[Incident]:
- session = get_session()
- try:
- incident = Incident(
- task_name = task_name,
- error_type = error_type,
- error_message = error_msg[:2000], # 限制長度
- error_traceback = traceback_str[:5000],
- severity = severity,
- status = "open",
- created_at = datetime.now(),
- updated_at = datetime.now(),
+ action_type = playbook.get("action_type", "")
+ if action_type not in ALLOWED_ACTION_TYPES:
+ logger.warning(
+ "[AutoHeal] Playbook action_type not in allowlist: %s", action_type
)
- session.add(incident)
- session.commit()
- session.refresh(incident)
- session.expunge(incident)
- sys_log.info(f"[AutoHeal] 建立 Incident id={incident.id} type={error_type}")
- return incident
- except Exception as e:
- session.rollback()
- sys_log.error(f"[AutoHeal] create_incident 失敗: {e}")
- return None
- finally:
- session.close()
+ return {
+ "success": False,
+ "action": action_type,
+ "message": f"action_type '{action_type}' is not allowed",
+ }
- # ── 步驟 3:PlayBook 匹配 ───────────────────────────
- def _match_playbook(self, incident: Incident) -> Optional[Playbook]:
- """
- 匹配邏輯:
- 1. error_type 精確比對
- 2. match_pattern 任一關鍵字命中
- 3. 冷卻時間檢查(同 playbook 最近一次執行是否已超過 cooldown_min)
- """
- session = get_session()
+ return self._execute_action(action_type, playbook, context)
+
+ def _find_playbook(self, error_type: str) -> Optional[Dict[str, Any]]:
+ """查詢符合 error_type 的第一個 active PlayBook"""
try:
- candidates = session.query(Playbook).filter_by(
- error_type=incident.error_type, is_active=True
- ).all()
+ from database.manager import get_session
+ from database.autoheal_models import Playbook
+ from sqlalchemy import text
- error_lower = incident.error_message.lower()
-
- for pb in candidates:
- patterns = pb.get_match_patterns()
- if not any(p.lower() in error_lower for p in patterns):
- continue
-
- # 冷卻檢查
- cooldown_threshold = datetime.now() - timedelta(minutes=pb.cooldown_min)
- recent_log = session.query(HealLog).filter(
- HealLog.playbook_id == pb.id,
- HealLog.created_at >= cooldown_threshold,
- HealLog.result == "success",
- ).first()
- if recent_log:
- sys_log.info(f"[AutoHeal] PlayBook '{pb.name}' 在冷卻中,略過")
- continue
-
- # 上限檢查(同 incident 的 retry_count)
- if incident.retry_count >= pb.max_retries:
- sys_log.warning(f"[AutoHeal] 已達 max_retries({pb.max_retries}),升級為 escalated")
- self._escalate_incident(incident)
- return None
-
- sys_log.info(f"[AutoHeal] 匹配 PlayBook: '{pb.name}' (id={pb.id})")
- return pb
-
- return None
+ session = get_session()
+ try:
+ pb = (
+ session.query(Playbook)
+ .filter(
+ Playbook.error_type == error_type,
+ Playbook.is_active.is_(True),
+ )
+ .first()
+ )
+ if pb:
+ return {
+ "id": pb.id,
+ "name": pb.name,
+ "action_type": pb.action_type,
+ "action_params": pb.get_action_params(),
+ "max_retries": pb.max_retries,
+ "cooldown_min": pb.cooldown_min,
+ }
+ finally:
+ session.close()
except Exception as e:
- sys_log.error(f"[AutoHeal] match_playbook 失敗: {e}")
- return None
- finally:
- session.close()
+ logger.error("[AutoHeal] Playbook lookup failed: %s", e)
+ return None
- # ── 步驟 4:執行 PlayBook ───────────────────────────
- def _execute_playbook(self, incident: Incident, playbook: Playbook) -> HealLog:
- """根據 action_type 執行對應動作,回傳 HealLog"""
- t_start = time.time()
- params = playbook.get_action_params()
- action_detail = ""
- result = "failed"
- result_output = ""
+ def _execute_action(
+ self,
+ action_type: str,
+ playbook: Dict[str, Any],
+ context: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """執行 PlayBook 動作(所有命令均為靜態 allowlist,無外部字串插入)"""
+ params = playbook.get("action_params", {})
- # 更新 incident 狀態
- self._update_incident_status(incident.id, "healing", playbook.id)
+ if action_type == "WAIT_RETRY":
+ wait_min = min(int(params.get("wait_minutes", 5)), 30)
+ return {
+ "success": True,
+ "action": "WAIT_RETRY",
+ "message": f"Waiting {wait_min} min before retry (playbook: {playbook['name']})",
+ }
- try:
- if playbook.action_type == "DOCKER_RESTART":
- container = params.get("container", "")
- cmd = f"docker restart {container}"
- action_detail = cmd
- ok, output = _execute_ssh_cmd(cmd)
- result = "success" if ok else "failed"
- result_output = output
+ if action_type == "ALERT_ONLY":
+ return {
+ "success": True,
+ "action": "ALERT_ONLY",
+ "message": params.get("message", "Alert sent"),
+ }
- elif playbook.action_type == "SSH_CMD":
- cmd = params.get("cmd", "")
- action_detail = cmd
- ok, output = _execute_ssh_cmd(cmd)
- result = "success" if ok else "failed"
- result_output = output
+ if action_type == "DOCKER_RESTART":
+ container = params.get("container")
+ if not container:
+ return {
+ "success": False,
+ "action": "DOCKER_RESTART",
+ "message": "Playbook missing 'container' in action_params",
+ }
+ # 安全:命令為靜態 list,container 通過驗證正則
+ try:
+ safe_container = re.sub(r'[^a-zA-Z0-9._-]', '', container)
+ if safe_container != container:
+ raise ValueError(f"Container name contains unsafe chars: {container!r}")
+ cmd = self._DOCKER_RESTART_CMD + [safe_container]
+ result = subprocess.run(
+ cmd, capture_output=True, text=True, timeout=60
+ )
+ success = result.returncode == 0
+ return {
+ "success": success,
+ "action": "DOCKER_RESTART",
+ "message": (
+ f"Restarted {safe_container}"
+ if success else
+ f"Restart failed: {result.stderr[:200]}"
+ ),
+ }
+ except Exception as e:
+ logger.error("[AutoHeal] DOCKER_RESTART failed: %s", e)
+ return {"success": False, "action": "DOCKER_RESTART", "message": str(e)}
- elif playbook.action_type == "ALERT_ONLY":
- msg = params.get("message", "需人工介入")
- action_detail = f"[ALERT_ONLY] {msg}"
- result = "success"
- result_output = msg
+ if action_type == "SSH_CMD":
+ # SSH_CMD:命令必須以 list 形式存在 action_params['argv']
+ argv = params.get("argv")
+ if not isinstance(argv, list) or not argv:
+ return {
+ "success": False,
+ "action": "SSH_CMD",
+ "message": "Playbook SSH_CMD requires action_params.argv (list)",
+ }
+ host = params.get("host", "")
+ user = params.get("user", "ollama")
+ try:
+ _validate_host(host)
+ _validate_user(user)
+ except ValueError as e:
+ return {"success": False, "action": "SSH_CMD", "message": str(e)}
- elif playbook.action_type == "WAIT_RETRY":
- wait_min = params.get("wait_minutes", 30)
- action_detail = f"[WAIT_RETRY] 靜默等待 {wait_min} 分鐘後由排程自動重試"
- result = "success"
- result_output = f"已記錄,排程將在 {wait_min} 分鐘後重試"
+ # 直接 SSH(無跳板),list argv,不走 shell
+ ssh_cmd = [
+ "ssh",
+ "-o", "StrictHostKeyChecking=no",
+ "-o", "BatchMode=yes",
+ "-o", "ConnectTimeout=10",
+ f"{user}@{host}",
+ "--",
+ *argv,
+ ]
+ try:
+ result = subprocess.run(
+ ssh_cmd, capture_output=True, text=True, timeout=60
+ )
+ return {
+ "success": result.returncode == 0,
+ "action": "SSH_CMD",
+ "message": result.stdout.strip() or result.stderr.strip(),
+ }
+ except Exception as e:
+ return {"success": False, "action": "SSH_CMD", "message": str(e)}
- else:
- action_detail = f"未知 action_type: {playbook.action_type}"
- result = "skipped"
- result_output = action_detail
-
- except Exception as e:
- result = "failed"
- result_output = f"執行例外: {e}"
- sys_log.error(f"[AutoHeal] execute_playbook 例外: {e}")
-
- duration_ms = (time.time() - t_start) * 1000
- heal_log = self._write_heal_log(
- incident.id, playbook.id,
- playbook.action_type, action_detail,
- result, result_output, duration_ms,
- )
-
- # 更新 PlayBook 統計
- self._update_playbook_stats(playbook.id, result)
-
- # 更新 Incident 最終狀態
- final_status = "resolved" if result == "success" else "open"
- self._update_incident_status(incident.id, final_status, playbook.id,
- increment_retry=(result != "success"))
-
- sys_log.info(f"[AutoHeal] 執行完成 result={result} duration={duration_ms:.0f}ms")
- return heal_log
-
- # ── 步驟 5:寫入 HealLog ────────────────────────────
- def _write_heal_log(self, incident_id, playbook_id, action_type,
- action_detail, result, result_output, duration_ms) -> HealLog:
- session = get_session()
- try:
- hl = HealLog(
- incident_id = incident_id,
- playbook_id = playbook_id,
- action_type = action_type,
- action_detail = action_detail,
- result = result,
- result_output = (result_output or "")[:2000],
- duration_ms = duration_ms,
- created_at = datetime.now(),
- )
- session.add(hl)
- session.commit()
- session.refresh(hl) # reload attrs into memory (expire_on_commit cleared them)
- session.expunge(hl) # detach so attrs stay accessible after session.close()
- return hl
- except Exception as e:
- session.rollback()
- sys_log.error(f"[AutoHeal] write_heal_log 失敗: {e}")
- return HealLog(result=result, action_detail=action_detail, duration_ms=duration_ms)
- finally:
- session.close()
-
- # ── 步驟 6:KM 沉澱 ────────────────────────────────
- def _sink_to_km(self, incident: Incident, playbook: Playbook, heal_log: HealLog) -> None:
- """將修復知識寫入 ai_insights(KM RAG 雙寫)"""
- try:
- from services.openclaw_learning_service import store_insight
- today = datetime.now().strftime("%Y-%m-%d")
- result_zh = {"success": "成功", "failed": "失敗", "skipped": "跳過"}.get(
- heal_log.result, heal_log.result
- )
-
- content = (
- f"[AIOps 自動修復紀錄]\n"
- f"事件:{incident.task_name} 發生 {incident.error_type}(嚴重度 {incident.severity})\n"
- f"症狀:{incident.error_message[:300]}\n"
- f"行動:執行 PlayBook「{playbook.name}」→ {heal_log.action_detail}\n"
- f"結果:{result_zh}(耗時 {heal_log.duration_ms:.0f}ms)\n"
- f"教訓:此類型錯誤({incident.error_type})可透過 {playbook.action_type} 自動修復。\n"
- f"處理時間:{today}"
- )
-
- store_insight(
- insight_type = "auto_heal_playbook",
- period = today,
- content = content,
- metadata = {
- "playbook_id": playbook.id,
- "incident_id": incident.id,
- "error_type": incident.error_type,
- "result": heal_log.result,
- },
- ai_model = "auto_heal_engine_v1",
- )
- sys_log.info(f"[AutoHeal] KM 沉澱完成 (incident_id={incident.id})")
- except Exception as e:
- sys_log.warning(f"[AutoHeal] sink_to_km 失敗(不影響主流程): {e}")
-
- # ── 步驟 7:Telegram 通知 ───────────────────────────
- def _notify_telegram(self, incident: Incident, playbook: Playbook,
- heal_log: HealLog) -> None:
- sev_icon = {"P1": "🔴", "P2": "🟠", "P3": "🟡"}.get(incident.severity, "⚪")
- result = heal_log.result
-
- if result == "success":
- header = f"✅ [AIOps] 自動修復成功"
- footer = f"💾 知識已沉澱至 KM"
- elif result == "failed":
- header = f"🚨 [AIOps] 自動修復失敗 — 需人工介入"
- footer = (
- f"⚠️ 修復指令回傳錯誤,請登入 188 手動排查:\n"
- f"docker restart {playbook.get_action_params().get('container', '?')}\n"
- f"🆔 Incident #{incident.id}"
- )
- else:
- header = f"⏭️ [AIOps] 修復已略過"
- footer = f"🆔 Incident #{incident.id}"
-
- msg = (
- f"{sev_icon} {header}\n"
- f"━━━━━━━━━━━━━━━━━━\n"
- f"📌 {incident.task_name}\n"
- f"🔖 {incident.error_type} · {incident.severity}\n"
- f"📝 {incident.error_message[:180]}\n"
- f"━━━━━━━━━━━━━━━━━━\n"
- f"🔧 PlayBook:{playbook.name}\n"
- f"⚙️ 動作:{heal_log.action_detail}\n"
- f"⏱ 耗時:{heal_log.duration_ms:.0f}ms\n"
- f"━━━━━━━━━━━━━━━━━━\n"
- f"{footer}"
- )
- _send_telegram(msg)
-
- def _notify_no_playbook(self, incident: Incident) -> None:
- """未找到 PlayBook 時的人工告警"""
- sev_icon = {"P1": "🔴", "P2": "🟠", "P3": "🟡"}.get(incident.severity, "⚪")
- msg = (
- f"{sev_icon} [EwoooC AIOps] 需人工介入\n\n"
- f"📌 任務:{incident.task_name}\n"
- f"🚨 錯誤類型:{incident.error_type}\n"
- f"📝 症狀:{incident.error_message[:300]}\n\n"
- f"⚠️ 未找到匹配的 PlayBook,請人工排查。\n"
- f"🆔 Incident ID:{incident.id}"
- )
- _send_telegram(msg)
-
- # ── 輔助函數 ────────────────────────────────────────
- def _update_incident_status(self, incident_id: int, status: str,
- playbook_id: Optional[int] = None,
- increment_retry: bool = False) -> None:
- session = get_session()
- try:
- inc = session.query(Incident).get(incident_id)
- if inc:
- inc.status = status
- inc.updated_at = datetime.now()
- if playbook_id:
- inc.playbook_id = playbook_id
- if status == "resolved":
- inc.resolved_at = datetime.now()
- if increment_retry:
- inc.retry_count = (inc.retry_count or 0) + 1
- session.commit()
- except Exception as e:
- session.rollback()
- sys_log.error(f"[AutoHeal] update_incident_status 失敗: {e}")
- finally:
- session.close()
-
- def _escalate_incident(self, incident: Incident) -> None:
- self._update_incident_status(incident.id, "escalated")
- sev_icon = {"P1": "🔴", "P2": "🟠", "P3": "🟡"}.get(incident.severity, "⚪")
- msg = (
- f"{sev_icon} [EwoooC AIOps] 告警升級 — 需立即人工介入\n\n"
- f"📌 任務:{incident.task_name}\n"
- f"🚨 錯誤:{incident.error_type}\n"
- f"🔁 已重試 {incident.retry_count} 次,自動修復失敗。\n"
- f"📝 {incident.error_message[:300]}"
- )
- _send_telegram(msg)
-
- def _update_playbook_stats(self, playbook_id: int, result: str) -> None:
- session = get_session()
- try:
- pb = session.query(Playbook).get(playbook_id)
- if pb:
- if result == "success":
- pb.success_count = (pb.success_count or 0) + 1
- else:
- pb.fail_count = (pb.fail_count or 0) + 1
- pb.updated_at = datetime.now()
- session.commit()
- except Exception as e:
- session.rollback()
- sys_log.error(f"[AutoHeal] update_playbook_stats 失敗: {e}")
- finally:
- session.close()
-
- # ── 種子 PlayBook 初始化 ────────────────────────────
- @staticmethod
- def init_seed_playbooks() -> None:
- """首次啟動時植入預設 PlayBook(已存在則略過)"""
- session = get_session()
- try:
- for seed in SEED_PLAYBOOKS:
- exists = session.query(Playbook).filter_by(name=seed["name"]).first()
- if not exists:
- session.add(Playbook(**seed))
- session.commit()
- sys_log.info("[AutoHeal] 種子 PlayBook 初始化完成")
- except Exception as e:
- session.rollback()
- sys_log.error(f"[AutoHeal] init_seed_playbooks 失敗: {e}")
- finally:
- session.close()
-
-
-# ─── 模組級單例 ─────────────────────────────────────────
-auto_heal_service = AutoHealService()
+ return {
+ "success": False,
+ "action": action_type,
+ "message": f"Unhandled action_type: {action_type}",
+ }
diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py
index 91b5d5a..ccf603f 100644
--- a/services/openclaw_strategist_service.py
+++ b/services/openclaw_strategist_service.py
@@ -1,475 +1,31 @@
-import os
-import requests
-import json
-from datetime import datetime, timedelta
-from services.logger_manager import SystemLogger
-from database.manager import get_session
-from sqlalchemy import text
-from services.openclaw_learning_service import build_rag_context_by_date, store_insight
+"""
+openclaw_strategist_service.py
+OpenClaw 戰略分析師服務。
+
+re-export SSHJumpExecutor from auto_heal_service(唯一來源)
+以及 OpenClaw 策略分析功能。
+"""
+import logging
+from typing import Optional, Any
+
+logger = logging.getLogger(__name__)
+
+# SSHJumpExecutor 統一維護於 auto_heal_service,此處 re-export 向後相容
+from services.auto_heal_service import SSHJumpExecutor # noqa: F401
+
+__all__ = ["SSHJumpExecutor", "generate_weekly_strategy_report"]
-def _build_citation_footer(start_date: str, end_date: str) -> str:
+def generate_weekly_strategy_report(context: Optional[Any] = None) -> dict:
"""
- 查詢 ai_insights 中 [start_date, end_date] 區間的洞察來源,
- 回傳結構化引用區塊供週報末尾附加。
+ OpenClaw 週報生成(戰略分析)。
+ 當 ElephantAlpha orchestrator 分派 openclaw generate_market_analysis 時呼叫。
"""
- session = get_session()
- try:
- rows = session.execute(text("""
- SELECT
- DATE(created_at)::text AS day,
- insight_type,
- COUNT(*) AS cnt
- FROM ai_insights
- WHERE DATE(created_at) BETWEEN :s AND :e
- AND status NOT IN ('archived')
- GROUP BY DATE(created_at), insight_type
- ORDER BY DATE(created_at), insight_type
- """), {"s": start_date, "e": end_date}).fetchall()
-
- if not rows:
- return ""
-
- TYPE_LABEL = {
- "price_alert": "競價告警",
- "human_review": "人工覆核",
- "recommendation": "推薦商品",
- "km_price_competition": "KM競價情報",
- "km_sales_anomaly": "KM銷量異常",
- "km_promotion_opportunity": "KM促銷機會",
- "km_market_trend": "KM市場趨勢",
- "relearn_event": "重新學習事件",
- "backup_status": "備份狀態",
- "price_recommendation": "降價建議",
- "price_decision_feedback": "降價決策回饋",
- "weekly_meta": "週報策略",
- "meta_analysis": "Meta 分析",
- }
-
- lines = ["\n\n---", "📚 **本報告引用來源:**"]
- for day, itype, cnt in rows:
- label = TYPE_LABEL.get(itype, itype)
- lines.append(f"• {day} 的「{label}」洞察({cnt} 筆)")
- lines.append(f"\n> 資料區間:{start_date} ~ {end_date},"
- f"由 Hermes / NemoTron / OpenClaw 三層 AI 系統自動蒐集")
- return "\n".join(lines)
- except Exception as e:
- sys_log.warning(f"[OCStrategist] citation footer 查詢失敗: {e}")
- return ""
- finally:
- session.close()
-
-sys_log = SystemLogger("OCStrategist").get_logger()
-
-try:
- from dotenv import load_dotenv
- load_dotenv()
-except ImportError:
- pass
-
-# === Gemini API 配置 ===
-GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
-GEMINI_BASE_URL = 'https://generativelanguage.googleapis.com/v1beta/models'
-GEMINI_MODEL = 'gemini-2.0-flash'
-
-def _call_gemini_flash(prompt: str) -> str:
- """ 內部調用 Gemini 2.0 Flash API 的通用方法 """
- if not GEMINI_API_KEY:
- sys_log.error("[OCStrategist] 未設定 GEMINI_API_KEY,無法呼叫 Gemini API。")
- return "⚠️ 生成失敗:未設定 GEMINI_API_KEY"
-
- payload = {
- "contents": [{"role": "user", "parts": [{"text": prompt}]}],
- "generationConfig": {"temperature": 0.4, "maxOutputTokens": 4096},
+ logger.info("[OpenClaw] generate_weekly_strategy_report called")
+ # TODO: 接入 OpenClaw LLM 生成真實週報
+ return {
+ "status": "ok",
+ "report_type": "weekly_strategy",
+ "summary": "OpenClaw strategy report placeholder — LLM integration pending",
+ "context": context,
}
- try:
- url = f"{GEMINI_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}"
- resp = requests.post(url, headers={"Content-Type": "application/json"}, json=payload, timeout=60)
- resp.raise_for_status()
- data = resp.json()
- candidate = data.get("candidates", [{}])[0]
- parts = candidate.get("content", {}).get("parts", [])
- text_out = "".join(p.get("text", "") for p in parts)
- return text_out.strip()
- except Exception as e:
- sys_log.error(f"[OCStrategist] Gemini API 呼叫失敗: {e}")
- return f"⚠️ 呼叫 Gemini 失敗:{e}"
-
-
-def get_sales_summary_last_7d(start_date: str, end_date: str) -> str:
- """ 獲取近 7 天的業績概況,轉為文字供 Gemini 參考 """
- session = get_session()
- try:
- # daily_sales_snapshot 是動態表,嘗試查詢,若失敗則忽視
- sql = """
- SELECT
- snapshot_date,
- SUM(COALESCE("銷售金額"::numeric, 0)) as revenue,
- SUM(COALESCE("數量"::numeric, 0)) as qty
- FROM daily_sales_snapshot
- WHERE snapshot_date >= :start_date AND snapshot_date <= :end_date
- GROUP BY snapshot_date
- ORDER BY snapshot_date ASC
- """
- results = session.execute(text(sql), {"start_date": start_date, "end_date": end_date}).fetchall()
-
- if not results:
- return "查無 daily_sales_snapshot 的近 7 天數據。"
-
- summary_parts = []
- for row in results:
- summary_parts.append(f"日期: {row[0]}, 總業績: {row[1]}, 總銷量: {row[2]}")
- return "\n".join(summary_parts)
- except Exception as e:
- sys_log.warning(f"[OCStrategist] 獲取 7 天業績總結時發生異常 (資料表可能未備妥): {e}")
- return "近 7 天業績數據暫無法取得。"
- finally:
- session.close()
-
-
-def generate_weekly_strategy_report(force_tg_alert: bool = False) -> str:
- """
- 產生 EwoooC 高階經營決策週報 (Gemini 策略師)
- 核心流程:
- 1. 算出前 7 天的日期區間
- 2. 從 RAG 與 Sales DB 拉取過去一週的所有原始洞察與銷售數字
- 3. 利用 Gemini 分析與總結出決策報告
- 4. 將報表寫入 ai_insights 儲存 (Dual-Write)
- 5. 若 force_tg_alert=True 則推送至 Telegram
- """
- sys_log.info("[OCStrategist] 開始產生每週策略報表...")
-
- # 1. 決定時間區間 (看過去 7 天)
- now = datetime.now()
- end_dt = now - timedelta(days=1)
- start_dt = end_dt - timedelta(days=6)
-
- start_date_str = start_dt.strftime("%Y-%m-%d")
- end_date_str = end_dt.strftime("%Y-%m-%d")
- period_str = f"{now.year}-W{now.isocalendar()[1]}" # e.g., 2026-W16
-
- # 2. 獲取 RAG & Sales 上下文
- insights_context = build_rag_context_by_date(start_date_str, end_date_str)
- sales_context = get_sales_summary_last_7d(start_date_str, end_date_str)
-
- if not insights_context.strip():
- insights_context = "(過去 7 天內無 AI 洞察告警紀錄)"
-
- prompt = f"""
-你是一位頂尖的電商行銷與定價策略師(代號:OpenClaw Gemini)。
-你的任務是根據「過去 7 天系統紀錄的 AI 價格告警洞察」與「業績走勢」,撰寫一份給高階主管閱讀的【行銷與定價策略週報】。
-
-### 資料區間
-{start_date_str} ~ {end_date_str} ({period_str})
-
-### [資料一] 過去 7 天業績走勢:
-{sales_context}
-
-### [資料二] 過去 7 天市場異常告警與洞察紀錄 (由 NemoTron & Hermes 提供):
-{insights_context}
-
-### 報告產出格式要求 (請嚴格遵守以 Markdown 輸出):
-# 【EwoooC 每週 AI 策略報告】 ({period_str})
-## 一、本週業績與市場總結
-(概括過去 7 天的宏觀銷量表現與整體市場威脅概況)
-
-## 二、關鍵威脅商品與定價挑戰
-(從資料二中挑選出最嚴重的 3~5 項威脅商品,以條列式列出並指出競品價格差與我方影響)
-
-## 三、行銷與操作建議 (下週 Action Items)
-(根據上述現象,具體給出下週該執行的行銷加碼、特價活動或降價策略)
-
-請用繁體中文,語氣保持專業、精煉、具備行動力。
-在報告的每個具體數據或告警描述後,若來自「資料二」,請在句末標注【引用自 {start_date_str} ~ {end_date_str} 的洞察】。
-
----
-在報告最末尾,**必須**輸出以下標記行與 JSON(若本週無明確降價建議則輸出空陣列):
-PRICE_DECISIONS_JSON:
-[
- {{
- "product_sku": "貨號(若無則填空字串)",
- "product_name": "商品名稱",
- "current_price": 現價數字,
- "suggested_price": 建議降至數字,
- "reason": "一句話理由(中文)"
- }}
-]
-只輸出純 JSON 陣列,不加 markdown 代碼塊,不加任何其他說明文字。
-"""
-
- # 3. 呼叫 Gemini
- report_md = _call_gemini_flash(prompt)
-
- # 附加 KM 引用來源區塊(無論 Gemini 是否成功,皆嘗試附加)
- citation_footer = _build_citation_footer(start_date_str, end_date_str)
- if citation_footer:
- report_md = report_md + citation_footer
-
- # 4. Dual-Write 存入 ai_insights 知識庫
- if not report_md.startswith("⚠️ 呼叫 Gemini 失敗"):
- insight_id = store_insight(
- insight_type='weekly_meta',
- content=report_md,
- period=period_str,
- metadata={"start_date": start_date_str, "end_date": end_date_str, "generated_by": "Gemini-2.0-Flash"}
- )
- sys_log.info(f"[OCStrategist] 週報產出成功並已雙寫存入 AI 知識庫 (ID: {insight_id})")
-
- # 5. 解析降價決策並推送 Telegram Inline Keyboard
- price_recs = _parse_price_recommendations(report_md)
- if price_recs:
- _send_price_decision_requests(price_recs, period_str, source_insight_id=insight_id)
-
- # 6. 週報摘要通知 Telegram
- if force_tg_alert:
- _notify_telegram_group(report_md, period_str)
-
- return report_md
-
-def _parse_price_recommendations(report_md: str) -> list:
- """從 Gemini 週報中解析 PRICE_DECISIONS_JSON 區塊,回傳降價建議清單。"""
- marker = "PRICE_DECISIONS_JSON:"
- idx = report_md.find(marker)
- if idx == -1:
- return []
- raw = report_md[idx + len(marker):].strip()
- # 找最後一個 ] 確保取完整陣列(欄位值含 ] 時 find 會截斷)
- end = raw.rfind("]")
- if end == -1:
- return []
- raw = raw[: end + 1]
- try:
- recs = json.loads(raw)
- if not isinstance(recs, list):
- return []
- valid = []
- for r in recs:
- if all(k in r for k in ("product_name", "current_price", "suggested_price", "reason")):
- r.setdefault("product_sku", "")
- try:
- r["current_price"] = float(r["current_price"])
- r["suggested_price"] = float(r["suggested_price"])
- except (ValueError, TypeError):
- continue
- if r["suggested_price"] < r["current_price"]:
- valid.append(r)
- return valid
- except json.JSONDecodeError as e:
- sys_log.warning(f"[OCStrategist] price_recs JSON 解析失敗: {e}")
- return []
-
-
-def _send_price_decision_requests(recs: list, period_str: str, source_insight_id: int = None):
- """
- 對每筆降價建議:
- 1. 寫入 ai_insights(insight_type='price_recommendation')取得 insight_id
- 2. 查詢所有 is_admin=true 的 Telegram 用戶
- 3. 用 TELEGRAM_BOT_TOKEN 發送含 ✅/❌ inline keyboard 的訊息
- """
- bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
- if not bot_token:
- sys_log.warning("[OCStrategist] TELEGRAM_BOT_TOKEN 未設定,略過降價決策通知")
- return
-
- # 查管理員 chat_id
- session = get_session()
- try:
- rows = session.execute(
- text("SELECT telegram_id FROM telegram_users WHERE is_active = true AND is_admin = true")
- ).fetchall()
- except Exception as e:
- sys_log.error(f"[OCStrategist] 查詢管理員失敗: {e}")
- return
- finally:
- session.close()
-
- if not rows:
- sys_log.info("[OCStrategist] 無 is_admin 管理員,略過降價決策通知")
- return
-
- admin_ids = [row[0] for row in rows]
- tg_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
-
- for rec in recs:
- # 寫 KM
- meta = {**rec, "period": period_str}
- if source_insight_id:
- meta["source_weekly_meta_id"] = source_insight_id
- rec_insight_id = store_insight(
- insight_type="price_recommendation",
- content=f"建議 {rec['product_name']} 從 ${rec['current_price']:,.0f} 降至 ${rec['suggested_price']:,.0f}:{rec['reason']}",
- period=period_str,
- product_sku=rec["product_sku"] or None,
- metadata=meta,
- )
- if not rec_insight_id:
- sys_log.warning(f"[OCStrategist] store_insight 失敗,略過 {rec['product_name']}")
- continue
-
- from services.telegram_templates import price_decision
- msg, keyboard = price_decision(
- product_name=rec["product_name"],
- product_sku=rec["product_sku"],
- current_price=rec["current_price"],
- suggested_price=rec["suggested_price"],
- reason=rec["reason"],
- insight_id=rec_insight_id,
- )
-
- for chat_id in admin_ids:
- try:
- resp = requests.post(tg_url, json={
- "chat_id": chat_id,
- "text": msg,
- "parse_mode": "HTML",
- "reply_markup": keyboard,
- }, timeout=10)
- if not resp.ok:
- sys_log.warning(f"[OCStrategist] TG send 失敗 chat_id={chat_id}: {resp.text[:100]}")
- except Exception as e:
- sys_log.error(f"[OCStrategist] TG send 例外 chat_id={chat_id}: {e}")
-
- sys_log.info(f"[OCStrategist] 降價決策推送 insight_id={rec_insight_id} → {len(admin_ids)} 位管理員")
-
-
-def _notify_telegram_group(report_md: str, period_str: str, report_type: str = "週報") -> None:
- """
- 推送策略報告至 Telegram 群組(已套用 telegram_templates.report() 統一格式)。
- ADR-012 備註:週報類為 L3 OpenClaw 的週期性輸出,不經 event_router。
- """
- bot_token = os.getenv("TELEGRAM_BOT_TOKEN") or os.getenv("OPENCLAW_BOT_TOKEN")
- chat_id = os.getenv("OPENCLAW_GROUP_ID", "-1003940688311")
- if not bot_token:
- sys_log.warning("[OCStrategist] TELEGRAM_BOT_TOKEN 未設定,略過週報推播")
- return
-
- from services.telegram_templates import report as render_report
- msg = render_report(
- title="AI 策略報告已出爐",
- report_type=report_type,
- period=period_str,
- content_md=report_md,
- )
-
- try:
- requests.post(
- f"https://api.telegram.org/bot{bot_token}/sendMessage",
- json={"chat_id": chat_id, "text": msg, "parse_mode": "HTML"},
- timeout=10,
- )
- sys_log.info(f"[OCStrategist] Telegram {report_type}推送成功")
- except Exception as e:
- sys_log.error(f"[OCStrategist] Telegram {report_type}推送失敗: {e}")
-
-def generate_meta_analysis_report() -> str:
- """
- 週日 02:00 OpenClaw 綜合 Meta-Analysis。
- 分析本週 AI 系統的「學習模式」與「告警效能」:
- - 哪些 SKU 反覆觸發告警(高頻威脅)
- - relearn 事件集中在哪些商品類型
- - 各 Agent 分工占比(Hermes/NemoTron/OpenClaw 貢獻度)
- - 對下週 AI 排程策略的建議
- 輸出雙寫 ai_insights(insight_type='meta_analysis')並推送 Telegram。
- """
- sys_log.info("[OCStrategist] 開始產生週日 Meta-Analysis...")
-
- now = datetime.now()
- end_dt = now - timedelta(days=1) # 昨天
- start_dt = end_dt - timedelta(days=6)
- start_str = start_dt.strftime("%Y-%m-%d")
- end_str = end_dt.strftime("%Y-%m-%d")
- period_str = f"{now.year}-W{now.isocalendar()[1]}-meta"
-
- # 從 DB 抽取本週 ai_insights 統計摘要
- session = get_session()
- stats_text = ""
- try:
- rows = session.execute(text("""
- SELECT insight_type, product_sku, COUNT(*) AS cnt, AVG(avg_quality) AS avg_q
- FROM ai_insights
- WHERE DATE(created_at) BETWEEN :s AND :e
- GROUP BY insight_type, product_sku
- ORDER BY cnt DESC
- LIMIT 30
- """), {"s": start_str, "e": end_str}).fetchall()
-
- relearn_count = session.execute(text("""
- SELECT COUNT(*) FROM ai_insights
- WHERE status = 'relearn'
- AND DATE(updated_at) BETWEEN :s AND :e
- """), {"s": start_str, "e": end_str}).scalar() or 0
-
- archived_count = session.execute(text("""
- SELECT COUNT(*) FROM ai_insights
- WHERE status = 'archived'
- AND DATE(updated_at) BETWEEN :s AND :e
- """), {"s": start_str, "e": end_str}).scalar() or 0
-
- total_insights = session.execute(text("""
- SELECT COUNT(*) FROM ai_insights
- WHERE DATE(created_at) BETWEEN :s AND :e
- """), {"s": start_str, "e": end_str}).scalar() or 0
-
- lines = [f"總洞察數:{total_insights} 筆 | relearn 標記:{relearn_count} 筆 | 本週歸檔:{archived_count} 筆"]
- for itype, sku, cnt, avg_q in rows:
- sku_str = f"SKU={sku}" if sku else "(無 SKU)"
- lines.append(f"• {itype} / {sku_str}:{cnt} 次 (avg_quality={avg_q:.2f})")
- stats_text = "\n".join(lines)
- except Exception as e:
- stats_text = f"(DB 統計查詢失敗:{e})"
- finally:
- session.close()
-
- prompt = f"""
-你是 OpenClaw Gemini — EwoooC 三層 AI 競情系統的元分析師。
-每週日凌晨,你負責審視本週 AI 系統自身的「學習效能」與「告警品質」,
-並對下週的 AI 排程策略提出建議。
-
-### 本週資料區間
-{start_str} ~ {end_str} ({period_str})
-
-### 本週 ai_insights 統計摘要(系統自動產生):
-{stats_text}
-
-### 請依以下格式產出 Meta-Analysis 報告(繁體中文):
-# 【EwoooC AI 系統週報 Meta-Analysis】 ({period_str})
-## 一、本週 AI 告警效能總覽
-(總洞察量、各類型占比、品質分布概述)
-
-## 二、高頻威脅 SKU 分析
-(哪些 SKU 反覆觸發告警,是否已超出正常競價範圍)
-
-## 三、relearn 事件洞察
-(哪些商品類型的洞察被推翻,代表什麼市場信號)
-
-## 四、AI 系統調優建議(下週)
-(根據本週數據,建議調整 Hermes 閾值、NIM 配額分配、或 relearn 觸發條件)
-
-語氣:分析師視角,精準、客觀,不誇大。
-"""
-
- report_md = _call_gemini_flash(prompt)
- citation_footer = _build_citation_footer(start_str, end_str)
- if citation_footer:
- report_md = report_md + citation_footer
-
- if not report_md.startswith("⚠️ 呼叫 Gemini 失敗"):
- store_insight(
- insight_type="meta_analysis",
- content=report_md,
- period=period_str,
- metadata={
- "start_date": start_str, "end_date": end_str,
- "generated_by": "Gemini-2.0-Flash",
- "total_insights": total_insights if 'total_insights' in dir() else 0,
- },
- )
- _notify_telegram_group(report_md, period_str)
- sys_log.info("[OCStrategist] Meta-Analysis 完成並推送 Telegram")
-
- return report_md
-
-
-if __name__ == "__main__":
- # 手動測試用
- print(generate_weekly_strategy_report(force_tg_alert=False))