diff --git a/.gitignore b/.gitignore index a3f7218..d0b005b 100644 --- a/.gitignore +++ b/.gitignore @@ -68,6 +68,7 @@ database/*.db-shm database/*.db-wal database/*.sqlite database/*.sqlite3 +sqlite:/ # 備份檔案 backups/ diff --git a/database/ai_models.py b/database/ai_models.py index ea3960d..112dc49 100644 --- a/database/ai_models.py +++ b/database/ai_models.py @@ -121,14 +121,14 @@ class AIInsight(Base): product_sku = Column(String(50)) content = Column(Text, nullable=False) metadata_json = Column(Text) # JSON extra payload - avg_quality = Column(Float, default=0.0) - status = Column(String(20), default='active') # active / archived + avg_quality = Column(Float, default=0.5) + status = Column(String(20), default='approved') # approved / pending / rejected / archived decay_exempt = Column(Boolean, default=False) ai_model = Column(String(50)) feedback_up = Column(Integer, default=0) feedback_down = Column(Integer, default=0) - confidence = Column(Float, default=1.0) - created_by = Column(String(50)) + confidence = Column(Float, default=0.5) + created_by = Column(String(50), default='system') created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # embedding 欄位為 pgvector 型別,透過 raw SQL 寫入,此處不聲明以避免型別衝突 diff --git a/database/manager.py b/database/manager.py index 84ff9ef..a4a23e8 100644 --- a/database/manager.py +++ b/database/manager.py @@ -2,6 +2,7 @@ import os import re import threading from sqlalchemy import create_engine, desc, select, text, literal +from sqlalchemy.engine.url import make_url from sqlalchemy.orm import sessionmaker from datetime import datetime from .models import Base, Category, Product, PriceRecord, MonthlySummaryAnalysis @@ -112,8 +113,14 @@ class DatabaseManager: base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) db_path = os.path.join(base_dir, 'data', 'momo_database.db') - os.makedirs(os.path.dirname(db_path), exist_ok=True) - self.engine = create_engine(f'sqlite:///{db_path}', echo=False) + if str(db_path).startswith('sqlite://'): + sqlite_db_file = make_url(db_path).database + if sqlite_db_file: + os.makedirs(os.path.dirname(sqlite_db_file), exist_ok=True) + self.engine = create_engine(db_path, echo=False) + else: + os.makedirs(os.path.dirname(db_path), exist_ok=True) + self.engine = create_engine(f'sqlite:///{db_path}', echo=False) Base.metadata.create_all(self.engine) self.Session = sessionmaker(bind=self.engine) self._check_and_fix_schema() diff --git a/docs/memory/README.md b/docs/memory/README.md index 255b7b4..0d67e16 100644 --- a/docs/memory/README.md +++ b/docs/memory/README.md @@ -16,6 +16,7 @@ | `credentials_passbook.md` | 伺服器、帳密、埠位對照 | 需要維運、部署、憑證核對時 | | `feedback_db_metadata_import.md` | SQLAlchemy metadata / `create_all()` 漏表鐵律 | 新增 model、修 schema、排查 fresh env 漏表時 | | `project_phase3f_cleanup_roadmap.md` | ADR-017 執行矩陣與階段紅線 | 正在做 3f 模組化收尾時 | +| `schema_inventory_baseline.md` | DB 表分類與 drift 基線 | 要收斂 migration / ORM / raw SQL 真相時 | ## 關聯 Guide diff --git a/docs/memory/schema_inventory_baseline.md b/docs/memory/schema_inventory_baseline.md new file mode 100644 index 0000000..b94442e --- /dev/null +++ b/docs/memory/schema_inventory_baseline.md @@ -0,0 +1,75 @@ +# Schema Inventory Baseline + +> 目的:提供之後做 DB 收斂、migration 對齊、fresh env 驗證時的最小入口。 + +## 何時閱讀 + +- 在修 ORM / migration drift 時 +- 在新增資料表或欄位時 +- 在排查「哪張表才是真正 source of truth」時 + +## 目前已確認的三類表 + +### 1. ORM + metadata 已納管 + +- `categories` +- `products` +- `price_records` +- `monthly_summary_analysis` +- `users` +- `login_history` +- `permissions` +- `user_permissions` +- `promo_products` +- `trend_records` +- `trend_keywords` +- `trend_analysis` +- `web_search_cache` +- `telegram_users` +- `ai_generation_history` +- `ai_prompt_templates` +- `ai_usage_tracking` +- `ai_insights` +- `agent_context` +- `action_plans` +- `action_outcomes` +- `agent_strategy_weights` +- `incidents` +- `playbooks` +- `heal_logs` +- `import_jobs` +- `import_config` +- `notification_templates` +- `ppt_reports` +- `vendor_stockout` +- `vendor_list` +- `vendor_emails` +- `email_send_log` +- `realtime_sales_monthly` + +### 2. SQL migration / raw SQL 仍在用,但未見完整 ORM source of truth + +- `ai_price_recommendations` +- `competitor_prices` +- `embedding_retry_queue` +- `backup_log` + +### 3. 目前最需要優先對齊的 drift + +- `incidents` + ORM:`traceback_str`, `matched_playbook_id` + migration 013:`error_traceback`, `playbook_id` + +- `ai_insights` + migration 010 預設:`avg_quality=0.5`, `status='approved'` + migration 015 預設:`confidence=0.5`, `created_by='system'` + ORM 已在 2026-04-29 P1-1 對齊這組預設 + +## 下一步建議 + +1. 為每張 live table 標記: + `ORM-first` + `migration-first` + `legacy/raw-sql` +2. 為 drift 最嚴重的表補正式 migration 修正腳本 +3. 建立 fresh env schema smoke test diff --git a/migrations/013_autoheal.sql b/migrations/013_autoheal.sql index a3b2269..cc3ffe9 100644 --- a/migrations/013_autoheal.sql +++ b/migrations/013_autoheal.sql @@ -73,16 +73,16 @@ SELECT * FROM (VALUES 'Docker DNS 解析失敗修復', 'DNS_FAIL', '["name resolution", "could not translate host name", "Temporary failure in name resolution"]', - 'P2', 'DOCKER_RESTART', - '{"container": "momo-db"}', + 'P2', 'ALERT_ONLY', + '{"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db,轉人工檢查。"}', 30, 3 ), ( 'DB 連線被拒修復', 'DB_UNREACHABLE', '["connection refused", "Connection reset by peer", "could not connect to server"]', - 'P2', 'DOCKER_RESTART', - '{"container": "momo-db", "compose": true}', + 'P2', 'ALERT_ONLY', + '{"message": "DB 無法連線,依 ADR-011 不重啟 momo-db,轉人工檢查。"}', 30, 3 ), ( diff --git a/migrations/021_disable_momo_db_autoheal_restart.sql b/migrations/021_disable_momo_db_autoheal_restart.sql new file mode 100644 index 0000000..9bbf3b1 --- /dev/null +++ b/migrations/021_disable_momo_db_autoheal_restart.sql @@ -0,0 +1,9 @@ +-- Migration 021: 禁止 AutoHeal PlayBook 重啟 momo-db +-- 原因:ADR-011 / AGENTS.md 生產紅線要求不得影響 momo-db 生命週期。 + +UPDATE playbooks + SET action_type = 'ALERT_ONLY', + action_params = '{"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db,轉人工檢查。"}', + updated_at = NOW() + WHERE action_type = 'DOCKER_RESTART' + AND action_params LIKE '%momo-db%'; diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py index 44277d6..0d6dc08 100644 --- a/services/auto_heal_service.py +++ b/services/auto_heal_service.py @@ -18,7 +18,7 @@ import os import re import sqlite3 import threading -from dataclasses import dataclass, asdict +from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, Any, List, Optional @@ -42,6 +42,25 @@ ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN" # ---- Constants ---- _ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"}) +_PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"}) +_OFFLINE_PLAYBOOKS = { + "DNS_FAIL": { + "id": 0, + "name": "Offline DNS/DB guard", + "action_type": "ALERT_ONLY", + "action_params": {"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db,轉人工檢查。"}, + "max_retries": 1, + "cooldown_min": 30, + }, + "DB_UNREACHABLE": { + "id": 0, + "name": "Offline DB guard", + "action_type": "ALERT_ONLY", + "action_params": {"message": "DB 無法連線,依 ADR-011 不重啟 momo-db,轉人工檢查。"}, + "max_retries": 1, + "cooldown_min": 30, + }, +} # ---- DB / dedup ---- _dedup_lock = threading.Lock() @@ -139,6 +158,7 @@ def _ssh_exec( # ---- PlayBook ---- def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]: + session = None try: session = get_session() row = session.execute( @@ -155,8 +175,12 @@ def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]: "cooldown_min": row.cooldown_min, } return None + except Exception as exc: + logger.warning("[AutoHeal] playbook lookup failed for %s: %s", error_type, exc) + return None finally: - session.close() + if session is not None: + session.close() # ---- Executor ---- @@ -195,7 +219,7 @@ class AutoHealService: if incident_id is not None: context["incident_id"] = incident_id - playbook = _find_playbook(error_type) + playbook = _find_playbook(error_type) or self._offline_playbook(error_type) if not playbook: msg = f"No playbook matched for error_type={error_type}" self._log.info("[AutoHeal] %s", msg) @@ -210,7 +234,8 @@ class AutoHealService: return self._handle_code_fix(playbook, context) # cooldown guard - last = _load_escalation(playbook["action_type"]) + cooldown_key = self._cooldown_key(playbook) + last = _load_escalation(cooldown_key) if last and (datetime.now().timestamp() - last) / 60 < playbook["cooldown_min"]: msg = f"Cooldown active for {playbook['action_type']}" self._log.info("[AutoHeal] %s", msg) @@ -224,8 +249,15 @@ class AutoHealService: traceback_text = str(context.get("traceback_str") or "").lower() combined = f"{error_text}\n{traceback_text}" - if "could not translate host name" in combined or "db_connection_error" in combined: - return "db_connection_error" + if "could not translate host name" in combined or "temporary failure in name resolution" in combined: + return "DNS_FAIL" + if ( + "db_connection_error" in combined + or "connection refused" in combined + or "could not connect to server" in combined + or "operationalerror" in combined + ): + return "DB_UNREACHABLE" if "timeout" in combined: return "crawler_timeout" if "nvidia" in combined and "quota" in combined: @@ -234,11 +266,23 @@ class AutoHealService: return "embedding_failure" return "scheduler_task_failure" - def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]: - from database.autoheal_models import Incident + def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]: + """Fallback when the primary DB is unavailable during DB/DNS incidents.""" + playbook = _OFFLINE_PLAYBOOKS.get(error_type) + return dict(playbook) if playbook else None - session = get_session() + def _cooldown_key(self, playbook: Dict[str, Any]) -> str: + playbook_id = playbook.get("id") + if playbook_id: + return f"playbook:{playbook_id}:{playbook['action_type']}" + return f"offline:{playbook.get('name', playbook['action_type'])}:{playbook['action_type']}" + + def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]: + session = None try: + from database.autoheal_models import Incident + + session = get_session() incident = Incident( task_name=str(context.get("task_name") or context.get("source") or "unknown_task"), error_type=error_type, @@ -255,11 +299,13 @@ class AutoHealService: self._log.info("[AutoHeal] incident created: id=%s error_type=%s", incident_id, error_type) return incident_id except Exception as exc: - session.rollback() + if session is not None: + session.rollback() self._log.error("[AutoHeal] incident create failed: %s", exc) return None finally: - session.close() + if session is not None: + session.close() def _handle_code_fix(self, playbook: Dict[str, Any], context: Dict[str, Any]) -> AutoHealResult: from services.aider_heal_executor import execute_code_fix @@ -310,6 +356,12 @@ class AutoHealService: safe_container = re.sub(r"[^a-zA-Z0-9._-]", "", container) if safe_container != container: return AutoHealResult(success=False, action="DOCKER_RESTART", message=f"unsafe container: {container}") + if safe_container in _PROTECTED_CONTAINERS: + return AutoHealResult( + success=False, + action="DOCKER_RESTART", + message=f"protected container blocked by ADR-011: {safe_container}", + ) result = _ssh_exec( jump_host=SSH_JUMP_HOST, @@ -365,7 +417,7 @@ class AutoHealService: result: "AutoHealResult", duration_ms: float = 0.0, ) -> None: - _store_escalation(playbook["action_type"]) + _store_escalation(self._cooldown_key(playbook)) self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"]) self._write_heal_log(playbook, context, result, duration_ms) self._write_ai_insight(playbook, context, result, duration_ms) @@ -384,11 +436,13 @@ class AutoHealService: if not incident_id: self._log.warning("[AutoHeal] _write_heal_log: incident_id missing in context, skipping DB write") return - session = get_session() + session = None try: + session = get_session() + playbook_id = int(playbook["id"]) if int(playbook.get("id") or 0) > 0 else None log_entry = HealLog( incident_id=int(incident_id), - playbook_id=int(playbook["id"]), + playbook_id=playbook_id, action_type=playbook["action_type"], action_detail=json.dumps(playbook.get("action_params", {}), ensure_ascii=False), result="success" if result.success else "failed", @@ -403,9 +457,11 @@ class AutoHealService: ) except Exception as exc: self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc) - session.rollback() + if session is not None: + session.rollback() finally: - session.close() + if session is not None: + session.close() def _write_ai_insight( self, @@ -416,8 +472,9 @@ class AutoHealService: ) -> None: from database.ai_models import AIInsight - session = get_session() + session = None try: + session = get_session() payload = { "incident_id": context.get("incident_id"), "task_name": context.get("task_name"), @@ -440,23 +497,26 @@ class AutoHealService: confidence=1.0 if result.success else 0.4, created_by="auto_heal_service", ai_model="rule_based", - status="active", + status="approved", ) session.add(insight) session.commit() except Exception as exc: self._log.error("[AutoHeal] ai_insight write failed: %s", exc) - session.rollback() + if session is not None: + session.rollback() finally: - session.close() + if session is not None: + session.close() def _update_incident_status(self, context: Dict[str, Any], result: "AutoHealResult") -> None: incident_id = context.get("incident_id") if not incident_id: return - session = get_session() + session = None try: + session = get_session() session.execute( text(""" UPDATE incidents @@ -471,10 +531,12 @@ class AutoHealService: ) session.commit() except Exception as exc: - session.rollback() + if session is not None: + session.rollback() self._log.error("[AutoHeal] incident status update failed: %s", exc) finally: - session.close() + if session is not None: + session.close() auto_heal_service = AutoHealService() diff --git a/services/notification_manager.py b/services/notification_manager.py index 03311f3..d473057 100644 --- a/services/notification_manager.py +++ b/services/notification_manager.py @@ -10,6 +10,7 @@ from collections import defaultdict from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.image import MIMEImage +from services.telegram_templates import send_telegram_with_result def _format_product_changes_message(products, title="WOOO TECH 商品異動通知", public_url=None): """ @@ -219,36 +220,27 @@ class NotificationManager: return False success = True - for chat_id in self.telegram_chat_ids: - for message in messages: - url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage" - # Telegram 的 Markdown V2 需要對特殊字元進行轉義 - safe_message = re.sub(r'([_*\\~`>#\+\-=|{}.!()\[\]])', r'\\\1', message) - payload = { - 'chat_id': chat_id, - 'text': safe_message, - 'parse_mode': 'MarkdownV2', - 'disable_web_page_preview': True - } - try: - resp = requests.post(url, json=payload, timeout=10) - if resp.status_code != 200: - self.logger.error(f"[Notification] [Telegram] ❌ Telegram 通知發送失敗 | ChatID: {chat_id} | Code: {resp.status_code} | Body: {resp.text}") - success = False - - # V-New: 發送圖片 (僅在最後一則訊息後發送) - # V-Fix: 改用 sendDocument 以保留高解析度,避免 Telegram 自動壓縮 - if image_path and message == messages[-1] and os.path.exists(image_path): - try: - with open(image_path, 'rb') as photo: - # 使用 sendDocument 發送完整解析度截圖(不壓縮) - requests.post(f"https://api.telegram.org/bot{self.telegram_token}/sendDocument", - data={'chat_id': chat_id}, files={'document': photo}, timeout=20) - except Exception as e: - self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | Error: {e}") + for message in messages: + result = send_telegram_with_result(message, chat_ids=self.telegram_chat_ids, parse_mode=None) + if result["failed"] > 0 or result["sent"] == 0: + self.logger.error( + "[Notification] [Telegram] ❌ Telegram 通知發送失敗 | Sent: %s | Failed: %s | Errors: %s", + result["sent"], result["failed"], result["errors"] + ) + success = False + if image_path and os.path.exists(image_path): + for chat_id in self.telegram_chat_ids: + try: + with open(image_path, 'rb') as photo: + requests.post( + f"https://api.telegram.org/bot{self.telegram_token}/sendDocument", + data={'chat_id': chat_id}, + files={'document': photo}, + timeout=20 + ) except Exception as e: - self.logger.error(f"[Notification] [Telegram] ❌ Telegram 連線錯誤 | ChatID: {chat_id} | Error: {e}") + self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | ChatID: {chat_id} | Error: {e}") success = False if success: self.logger.info("[Notification] [Telegram] ✅ Telegram 通知發送成功") @@ -384,4 +376,4 @@ class NotificationManager: self.logger.warning(f"[Notification] [Whitepage] 🚨 準備發送白頁告警 | URL: {url} | Error: {error_msg}") self._send_line_messages([msg]) self._send_telegram_messages([msg]) - self._send_email_message(f"🚨 網頁異常告警 - {url}", msg) \ No newline at end of file + self._send_email_message(f"🚨 網頁異常告警 - {url}", msg) diff --git a/services/telegram_templates.py b/services/telegram_templates.py index 7346e6c..b794f38 100644 --- a/services/telegram_templates.py +++ b/services/telegram_templates.py @@ -56,7 +56,7 @@ def _get_chat_ids() -> list: def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None, - parse_mode: str = "HTML") -> bool: + parse_mode: Optional[str] = "HTML") -> bool: """發送純文字訊息""" import requests token = _get_bot_token() @@ -68,7 +68,9 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, chat_ids = [-1003940688311] # fallback url = f"https://api.telegram.org/bot{token}/sendMessage" - payload = {"chat_id": chat_ids[0], "text": text, "parse_mode": parse_mode} + payload = {"chat_id": chat_ids[0], "text": text} + if parse_mode: + payload["parse_mode"] = parse_mode if reply_markup: payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False) try: @@ -84,7 +86,7 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, def send_telegram_with_result(text: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None, - parse_mode: str = "HTML") -> Dict[str, Any]: + parse_mode: Optional[str] = "HTML") -> Dict[str, Any]: """發送 Telegram 並回傳結果明細,供 EventRouter / AIOps 使用。""" token = _get_bot_token() if not token: @@ -103,7 +105,9 @@ def send_telegram_with_result(text: str, chat_ids: Optional[list] = None, errors: List[str] = [] for chat_id in chat_ids: - payload = {"chat_id": chat_id, "text": text, "parse_mode": parse_mode} + payload = {"chat_id": chat_id, "text": text} + if parse_mode: + payload["parse_mode"] = parse_mode if reply_markup: payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False) try: