補強 5.5 自癒安全回看
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s

This commit is contained in:
OoO
2026-04-29 22:48:24 +08:00
parent 779b27f676
commit 0875dd8fda
10 changed files with 217 additions and 66 deletions

1
.gitignore vendored
View File

@@ -68,6 +68,7 @@ database/*.db-shm
database/*.db-wal
database/*.sqlite
database/*.sqlite3
sqlite:/
# 備份檔案
backups/

View File

@@ -121,14 +121,14 @@ class AIInsight(Base):
product_sku = Column(String(50))
content = Column(Text, nullable=False)
metadata_json = Column(Text) # JSON extra payload
avg_quality = Column(Float, default=0.0)
status = Column(String(20), default='active') # active / archived
avg_quality = Column(Float, default=0.5)
status = Column(String(20), default='approved') # approved / pending / rejected / archived
decay_exempt = Column(Boolean, default=False)
ai_model = Column(String(50))
feedback_up = Column(Integer, default=0)
feedback_down = Column(Integer, default=0)
confidence = Column(Float, default=1.0)
created_by = Column(String(50))
confidence = Column(Float, default=0.5)
created_by = Column(String(50), default='system')
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# embedding 欄位為 pgvector 型別,透過 raw SQL 寫入,此處不聲明以避免型別衝突

View File

@@ -2,6 +2,7 @@ import os
import re
import threading
from sqlalchemy import create_engine, desc, select, text, literal
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from .models import Base, Category, Product, PriceRecord, MonthlySummaryAnalysis
@@ -112,8 +113,14 @@ class DatabaseManager:
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
db_path = os.path.join(base_dir, 'data', 'momo_database.db')
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
if str(db_path).startswith('sqlite://'):
sqlite_db_file = make_url(db_path).database
if sqlite_db_file:
os.makedirs(os.path.dirname(sqlite_db_file), exist_ok=True)
self.engine = create_engine(db_path, echo=False)
else:
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self._check_and_fix_schema()

View File

@@ -16,6 +16,7 @@
| `credentials_passbook.md` | 伺服器、帳密、埠位對照 | 需要維運、部署、憑證核對時 |
| `feedback_db_metadata_import.md` | SQLAlchemy metadata / `create_all()` 漏表鐵律 | 新增 model、修 schema、排查 fresh env 漏表時 |
| `project_phase3f_cleanup_roadmap.md` | ADR-017 執行矩陣與階段紅線 | 正在做 3f 模組化收尾時 |
| `schema_inventory_baseline.md` | DB 表分類與 drift 基線 | 要收斂 migration / ORM / raw SQL 真相時 |
## 關聯 Guide

View File

@@ -0,0 +1,75 @@
# Schema Inventory Baseline
> 目的:提供之後做 DB 收斂、migration 對齊、fresh env 驗證時的最小入口。
## 何時閱讀
- 在修 ORM / migration drift 時
- 在新增資料表或欄位時
- 在排查「哪張表才是真正 source of truth」時
## 目前已確認的三類表
### 1. ORM + metadata 已納管
- `categories`
- `products`
- `price_records`
- `monthly_summary_analysis`
- `users`
- `login_history`
- `permissions`
- `user_permissions`
- `promo_products`
- `trend_records`
- `trend_keywords`
- `trend_analysis`
- `web_search_cache`
- `telegram_users`
- `ai_generation_history`
- `ai_prompt_templates`
- `ai_usage_tracking`
- `ai_insights`
- `agent_context`
- `action_plans`
- `action_outcomes`
- `agent_strategy_weights`
- `incidents`
- `playbooks`
- `heal_logs`
- `import_jobs`
- `import_config`
- `notification_templates`
- `ppt_reports`
- `vendor_stockout`
- `vendor_list`
- `vendor_emails`
- `email_send_log`
- `realtime_sales_monthly`
### 2. SQL migration / raw SQL 仍在用,但未見完整 ORM source of truth
- `ai_price_recommendations`
- `competitor_prices`
- `embedding_retry_queue`
- `backup_log`
### 3. 目前最需要優先對齊的 drift
- `incidents`
ORM:`traceback_str`, `matched_playbook_id`
migration 013:`error_traceback`, `playbook_id`
- `ai_insights`
migration 010 預設:`avg_quality=0.5`, `status='approved'`
migration 015 預設:`confidence=0.5`, `created_by='system'`
ORM 已在 2026-04-29 P1-1 對齊這組預設
## 下一步建議
1. 為每張 live table 標記:
`ORM-first`
`migration-first`
`legacy/raw-sql`
2. 為 drift 最嚴重的表補正式 migration 修正腳本
3. 建立 fresh env schema smoke test

View File

@@ -73,16 +73,16 @@ SELECT * FROM (VALUES
'Docker DNS 解析失敗修復',
'DNS_FAIL',
'["name resolution", "could not translate host name", "Temporary failure in name resolution"]',
'P2', 'DOCKER_RESTART',
'{"container": "momo-db"}',
'P2', 'ALERT_ONLY',
'{"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db轉人工檢查。"}',
30, 3
),
(
'DB 連線被拒修復',
'DB_UNREACHABLE',
'["connection refused", "Connection reset by peer", "could not connect to server"]',
'P2', 'DOCKER_RESTART',
'{"container": "momo-db", "compose": true}',
'P2', 'ALERT_ONLY',
'{"message": "DB 無法連線,依 ADR-011 不重啟 momo-db轉人工檢查。"}',
30, 3
),
(

View File

@@ -0,0 +1,9 @@
-- Migration 021: Forbid AutoHeal playbooks from restarting momo-db.
-- Reason: ADR-011 / the AGENTS.md production red line require that the
-- momo-db lifecycle is never affected.
-- Effect: every playbook whose action is DOCKER_RESTART and whose action_params
-- text mentions momo-db is rewritten to ALERT_ONLY with a manual-check message,
-- and its updated_at is refreshed.
UPDATE playbooks
SET action_type = 'ALERT_ONLY',
action_params = '{"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db轉人工檢查。"}',
updated_at = NOW()
WHERE action_type = 'DOCKER_RESTART'
AND action_params LIKE '%momo-db%';

View File

@@ -18,7 +18,7 @@ import os
import re
import sqlite3
import threading
from dataclasses import dataclass, asdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
@@ -42,6 +42,25 @@ ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN"
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"})
_PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"})
# Built-in fallback playbooks, keyed by error type, used when no playbook can
# be obtained from the database lookup. Every entry is ALERT_ONLY and uses the
# placeholder id 0 because it has no row in the playbooks table.
_OFFLINE_PLAYBOOKS = {
    error_type: {
        "id": 0,
        "name": name,
        "action_type": "ALERT_ONLY",
        "action_params": {"message": message},
        "max_retries": 1,
        "cooldown_min": 30,
    }
    for error_type, name, message in (
        (
            "DNS_FAIL",
            "Offline DNS/DB guard",
            "DB/DNS 異常,依 ADR-011 不重啟 momo-db轉人工檢查。",
        ),
        (
            "DB_UNREACHABLE",
            "Offline DB guard",
            "DB 無法連線,依 ADR-011 不重啟 momo-db轉人工檢查。",
        ),
    )
}
# ---- DB / dedup ----
_dedup_lock = threading.Lock()
@@ -139,6 +158,7 @@ def _ssh_exec(
# ---- PlayBook ----
def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]:
session = None
try:
session = get_session()
row = session.execute(
@@ -155,8 +175,12 @@ def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]:
"cooldown_min": row.cooldown_min,
}
return None
except Exception as exc:
logger.warning("[AutoHeal] playbook lookup failed for %s: %s", error_type, exc)
return None
finally:
session.close()
if session is not None:
session.close()
# ---- Executor ----
@@ -195,7 +219,7 @@ class AutoHealService:
if incident_id is not None:
context["incident_id"] = incident_id
playbook = _find_playbook(error_type)
playbook = _find_playbook(error_type) or self._offline_playbook(error_type)
if not playbook:
msg = f"No playbook matched for error_type={error_type}"
self._log.info("[AutoHeal] %s", msg)
@@ -210,7 +234,8 @@ class AutoHealService:
return self._handle_code_fix(playbook, context)
# cooldown guard
last = _load_escalation(playbook["action_type"])
cooldown_key = self._cooldown_key(playbook)
last = _load_escalation(cooldown_key)
if last and (datetime.now().timestamp() - last) / 60 < playbook["cooldown_min"]:
msg = f"Cooldown active for {playbook['action_type']}"
self._log.info("[AutoHeal] %s", msg)
@@ -224,8 +249,15 @@ class AutoHealService:
traceback_text = str(context.get("traceback_str") or "").lower()
combined = f"{error_text}\n{traceback_text}"
if "could not translate host name" in combined or "db_connection_error" in combined:
return "db_connection_error"
if "could not translate host name" in combined or "temporary failure in name resolution" in combined:
return "DNS_FAIL"
if (
"db_connection_error" in combined
or "connection refused" in combined
or "could not connect to server" in combined
or "operationalerror" in combined
):
return "DB_UNREACHABLE"
if "timeout" in combined:
return "crawler_timeout"
if "nvidia" in combined and "quota" in combined:
@@ -234,11 +266,23 @@ class AutoHealService:
return "embedding_failure"
return "scheduler_task_failure"
def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]:
from database.autoheal_models import Incident
def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]:
    """Return the built-in fallback playbook for ``error_type``, if any.

    Used as a fallback when the primary DB is unavailable during DB/DNS
    incidents, so a playbook can still be resolved without a database lookup.

    Args:
        error_type: Classified error type used as the lookup key.

    Returns:
        An independent copy of the matching ``_OFFLINE_PLAYBOOKS`` entry, or
        ``None`` when no fallback is defined for ``error_type``.
    """
    import copy

    playbook = _OFFLINE_PLAYBOOKS.get(error_type)
    if not playbook:
        return None
    # Deep copy: a shallow dict() copy would still share the nested
    # "action_params" dict with the module-level constant, so a caller that
    # mutates it would silently alter the fallback for every later incident.
    return copy.deepcopy(playbook)
session = get_session()
def _cooldown_key(self, playbook: Dict[str, Any]) -> str:
    """Build the key under which a playbook's cooldown timestamp is tracked.

    Args:
        playbook: Playbook dict; must contain ``"action_type"`` and may
            contain ``"id"`` and ``"name"``.

    Returns:
        ``"playbook:<id>:<action_type>"`` when the playbook has a truthy id,
        otherwise ``"offline:<name>:<action_type>"`` where the name falls back
        to the action type if the ``"name"`` key is absent.

    Raises:
        KeyError: If ``playbook`` has no ``"action_type"`` entry.
    """
    ident = playbook.get("id")
    action = playbook["action_type"]
    if not ident:
        # No usable id (missing, None or 0): key on the playbook's name instead.
        label = playbook.get("name", action)
        return f"offline:{label}:{action}"
    return f"playbook:{ident}:{action}"
def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]:
session = None
try:
from database.autoheal_models import Incident
session = get_session()
incident = Incident(
task_name=str(context.get("task_name") or context.get("source") or "unknown_task"),
error_type=error_type,
@@ -255,11 +299,13 @@ class AutoHealService:
self._log.info("[AutoHeal] incident created: id=%s error_type=%s", incident_id, error_type)
return incident_id
except Exception as exc:
session.rollback()
if session is not None:
session.rollback()
self._log.error("[AutoHeal] incident create failed: %s", exc)
return None
finally:
session.close()
if session is not None:
session.close()
def _handle_code_fix(self, playbook: Dict[str, Any], context: Dict[str, Any]) -> AutoHealResult:
from services.aider_heal_executor import execute_code_fix
@@ -310,6 +356,12 @@ class AutoHealService:
safe_container = re.sub(r"[^a-zA-Z0-9._-]", "", container)
if safe_container != container:
return AutoHealResult(success=False, action="DOCKER_RESTART", message=f"unsafe container: {container}")
if safe_container in _PROTECTED_CONTAINERS:
return AutoHealResult(
success=False,
action="DOCKER_RESTART",
message=f"protected container blocked by ADR-011: {safe_container}",
)
result = _ssh_exec(
jump_host=SSH_JUMP_HOST,
@@ -365,7 +417,7 @@ class AutoHealService:
result: "AutoHealResult",
duration_ms: float = 0.0,
) -> None:
_store_escalation(playbook["action_type"])
_store_escalation(self._cooldown_key(playbook))
self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"])
self._write_heal_log(playbook, context, result, duration_ms)
self._write_ai_insight(playbook, context, result, duration_ms)
@@ -384,11 +436,13 @@ class AutoHealService:
if not incident_id:
self._log.warning("[AutoHeal] _write_heal_log: incident_id missing in context, skipping DB write")
return
session = get_session()
session = None
try:
session = get_session()
playbook_id = int(playbook["id"]) if int(playbook.get("id") or 0) > 0 else None
log_entry = HealLog(
incident_id=int(incident_id),
playbook_id=int(playbook["id"]),
playbook_id=playbook_id,
action_type=playbook["action_type"],
action_detail=json.dumps(playbook.get("action_params", {}), ensure_ascii=False),
result="success" if result.success else "failed",
@@ -403,9 +457,11 @@ class AutoHealService:
)
except Exception as exc:
self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc)
session.rollback()
if session is not None:
session.rollback()
finally:
session.close()
if session is not None:
session.close()
def _write_ai_insight(
self,
@@ -416,8 +472,9 @@ class AutoHealService:
) -> None:
from database.ai_models import AIInsight
session = get_session()
session = None
try:
session = get_session()
payload = {
"incident_id": context.get("incident_id"),
"task_name": context.get("task_name"),
@@ -440,23 +497,26 @@ class AutoHealService:
confidence=1.0 if result.success else 0.4,
created_by="auto_heal_service",
ai_model="rule_based",
status="active",
status="approved",
)
session.add(insight)
session.commit()
except Exception as exc:
self._log.error("[AutoHeal] ai_insight write failed: %s", exc)
session.rollback()
if session is not None:
session.rollback()
finally:
session.close()
if session is not None:
session.close()
def _update_incident_status(self, context: Dict[str, Any], result: "AutoHealResult") -> None:
incident_id = context.get("incident_id")
if not incident_id:
return
session = get_session()
session = None
try:
session = get_session()
session.execute(
text("""
UPDATE incidents
@@ -471,10 +531,12 @@ class AutoHealService:
)
session.commit()
except Exception as exc:
session.rollback()
if session is not None:
session.rollback()
self._log.error("[AutoHeal] incident status update failed: %s", exc)
finally:
session.close()
if session is not None:
session.close()
auto_heal_service = AutoHealService()

View File

@@ -10,6 +10,7 @@ from collections import defaultdict
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from services.telegram_templates import send_telegram_with_result
def _format_product_changes_message(products, title="WOOO TECH 商品異動通知", public_url=None):
"""
@@ -219,36 +220,27 @@ class NotificationManager:
return False
success = True
for chat_id in self.telegram_chat_ids:
for message in messages:
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
# Telegram 的 Markdown V2 需要對特殊字元進行轉義
safe_message = re.sub(r'([_*\\~`>#\+\-=|{}.!()\[\]])', r'\\\1', message)
payload = {
'chat_id': chat_id,
'text': safe_message,
'parse_mode': 'MarkdownV2',
'disable_web_page_preview': True
}
try:
resp = requests.post(url, json=payload, timeout=10)
if resp.status_code != 200:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 通知發送失敗 | ChatID: {chat_id} | Code: {resp.status_code} | Body: {resp.text}")
success = False
# V-New: 發送圖片 (僅在最後一則訊息後發送)
# V-Fix: 改用 sendDocument 以保留高解析度,避免 Telegram 自動壓縮
if image_path and message == messages[-1] and os.path.exists(image_path):
try:
with open(image_path, 'rb') as photo:
# 使用 sendDocument 發送完整解析度截圖(不壓縮)
requests.post(f"https://api.telegram.org/bot{self.telegram_token}/sendDocument",
data={'chat_id': chat_id}, files={'document': photo}, timeout=20)
except Exception as e:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | Error: {e}")
for message in messages:
result = send_telegram_with_result(message, chat_ids=self.telegram_chat_ids, parse_mode=None)
if result["failed"] > 0 or result["sent"] == 0:
self.logger.error(
"[Notification] [Telegram] ❌ Telegram 通知發送失敗 | Sent: %s | Failed: %s | Errors: %s",
result["sent"], result["failed"], result["errors"]
)
success = False
if image_path and os.path.exists(image_path):
for chat_id in self.telegram_chat_ids:
try:
with open(image_path, 'rb') as photo:
requests.post(
f"https://api.telegram.org/bot{self.telegram_token}/sendDocument",
data={'chat_id': chat_id},
files={'document': photo},
timeout=20
)
except Exception as e:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 連線錯誤 | ChatID: {chat_id} | Error: {e}")
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | ChatID: {chat_id} | Error: {e}")
success = False
if success: self.logger.info("[Notification] [Telegram] ✅ Telegram 通知發送成功")
@@ -384,4 +376,4 @@ class NotificationManager:
self.logger.warning(f"[Notification] [Whitepage] 🚨 準備發送白頁告警 | URL: {url} | Error: {error_msg}")
self._send_line_messages([msg])
self._send_telegram_messages([msg])
self._send_email_message(f"🚨 網頁異常告警 - {url}", msg)
self._send_email_message(f"🚨 網頁異常告警 - {url}", msg)

View File

@@ -56,7 +56,7 @@ def _get_chat_ids() -> list:
def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None,
parse_mode: str = "HTML") -> bool:
parse_mode: Optional[str] = "HTML") -> bool:
"""發送純文字訊息"""
import requests
token = _get_bot_token()
@@ -68,7 +68,9 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
chat_ids = [-1003940688311] # fallback
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {"chat_id": chat_ids[0], "text": text, "parse_mode": parse_mode}
payload = {"chat_id": chat_ids[0], "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
if reply_markup:
payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
try:
@@ -84,7 +86,7 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None,
parse_mode: str = "HTML") -> Dict[str, Any]:
parse_mode: Optional[str] = "HTML") -> Dict[str, Any]:
"""發送 Telegram 並回傳結果明細,供 EventRouter / AIOps 使用。"""
token = _get_bot_token()
if not token:
@@ -103,7 +105,9 @@ def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
errors: List[str] = []
for chat_id in chat_ids:
payload = {"chat_id": chat_id, "text": text, "parse_mode": parse_mode}
payload = {"chat_id": chat_id, "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
if reply_markup:
payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
try: