修復 P0 告警自癒鏈與測試收集
All checks were successful
CD Pipeline / deploy (push) Successful in 9m39s

This commit is contained in:
OoO
2026-04-29 22:37:20 +08:00
parent 880511032a
commit 779b27f676
13 changed files with 566 additions and 305 deletions

11
app.py
View File

@@ -125,11 +125,12 @@ from utils.security import ( # noqa: E402
# 🚩 資料庫結構自動修復 (V9.53 新增) — 實作搬至 database/schema_repair.py
from database.schema_repair import repair_database_schema # noqa: E402, F401
# 從環境變數讀取 NGROK_AUTH_TOKEN(如果未設定則使用原值,但會發出警告)
NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '36e27NM5V7sUJ8QxJIAAWCp7sUv_3brtcrBarYvcP3SbvFKhF')
if NGROK_AUTH_TOKEN == '36e27NM5V7sUJ8QxJIAAWCp7sUv_3brtcrBarYvcP3SbvFKhF':
sys_log.warning("[Security] ⚠️ 使用預設 NGROK_AUTH_TOKEN,請設定環境變數")
conf.get_default().auth_token = NGROK_AUTH_TOKEN
# 從環境變數讀取 NGROK_AUTH_TOKEN;未設定時禁止使用硬編碼預設值
NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '')
if NGROK_AUTH_TOKEN:
conf.get_default().auth_token = NGROK_AUTH_TOKEN
else:
sys_log.warning("[Security] ⚠️ NGROK_AUTH_TOKEN 未設定,已跳過 ngrok auth token 注入")
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')
STATIC_DIR = os.path.join(BASE_DIR, 'web/static')

View File

@@ -1,5 +1,6 @@
import os
import json
import sys
from dotenv import load_dotenv
# 載入 .env 環境變數
@@ -52,15 +53,34 @@ else:
# ==========================================
import sys as _sys
LOGIN_PASSWORD = os.getenv('LOGIN_PASSWORD')
if not LOGIN_PASSWORD:
print("[FATAL] LOGIN_PASSWORD 環境變數未設定,拒絕啟動。請在 .env 或 Docker 環境設定此值。", file=_sys.stderr)
def _is_test_context() -> bool:
return (
os.getenv("PYTEST_CURRENT_TEST") is not None
or "pytest" in sys.modules
or os.getenv("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "").lower() == "true"
)
def _require_env(var_name: str, *, insecure_values: tuple[str, ...] = ()) -> str:
value = os.getenv(var_name)
if value and value not in insecure_values:
return value
if _is_test_context():
fallback = f"test-{var_name.lower()}"
print(f"[WARN] {var_name} 未設定,測試環境使用暫時值。", file=_sys.stderr)
return fallback
if var_name == "LOGIN_PASSWORD":
print("[FATAL] LOGIN_PASSWORD 環境變數未設定,拒絕啟動。請在 .env 或 Docker 環境設定此值。", file=_sys.stderr)
else:
print(f"[FATAL] {var_name} 環境變數未設定或仍為不安全預設值,拒絕啟動。請改用安全值。", file=_sys.stderr)
_sys.exit(1)
SECRET_KEY = os.getenv('SECRET_KEY')
if not SECRET_KEY or SECRET_KEY in ('your_flask_secret_key', 'change_me', ''):
print("[FATAL] SECRET_KEY 環境變數未設定或仍為不安全預設值拒絕啟動。請執行python3 -c \"import secrets; print(secrets.token_hex(32))\" 產生安全金鑰。", file=_sys.stderr)
_sys.exit(1)
LOGIN_PASSWORD = _require_env("LOGIN_PASSWORD")
SECRET_KEY = _require_env("SECRET_KEY", insecure_values=('your_flask_secret_key', 'change_me', ''))
# ==========================================
# 通訊模組設定(從環境變數讀取)

View File

@@ -17,7 +17,7 @@
#
# Docker Registry:
# - URL: registry.wooo.work (HTTPS + 認證)
# - 帳號: admin / Wooo_Registry_2026
# - 帳密請由部署環境的 secret / CI 變數提供,不得寫入 repo
#
# 注意事項:
# - GCP 生產環境使用 VM 原生 Nginx不需要啟動 Docker nginx

View File

@@ -185,8 +185,8 @@ if __name__ == "__main__":
if not _ea_thread.is_alive():
logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
try:
from services.event_router import dispatch as _dispatch
_dispatch({
from services.event_router import dispatch_sync as _dispatch_sync
_dispatch_sync({
"source": "Scheduler.ElephantAlpha",
"event_type": "thread_crashed",
"severity": "alert",

View File

@@ -824,9 +824,23 @@ def run_edm_task(lpn_code="O1K5FBOqsvN"):
_save_stats('edm_task', stats)
except Exception as e:
import traceback as _tb
logging.error(f"[Crawler] [EDM] 🚨 EDM 任務異常 | Error: {e}")
stats = { "status": "Failed", "error": str(e) }
_save_stats('edm_task', stats)
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_edm_task",
error=e,
source="Scheduler.EDM",
event_type="crawler_timeout" if "timeout" in str(e).lower() else "edm_task_failure",
priority="P2",
title="EDM 爬蟲任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Crawler] [EDM] event_router 失敗: {_router_e}")
def _find_component_areas_with_diagnostic(driver_or_element):
"""
@@ -1193,9 +1207,23 @@ def run_festival_task(lpn_code="O7ylWfihYUM"):
logging.info("[Crawler] [Festival] 無異動,不發送通知")
except Exception as e:
import traceback as _tb
logging.error(f"[Crawler] [Festival] 🚨 {PAGE_TYPE} 任務異常 | Error: {e}")
stats = { "status": "Failed", "error": str(e) }
_save_stats('festival_task', stats)
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_festival_task",
error=e,
source="Scheduler.Festival",
event_type="crawler_timeout" if "timeout" in str(e).lower() else "festival_task_failure",
priority="P2",
title=f"{PAGE_TYPE} 活動任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Crawler] [Festival] event_router 失敗: {_router_e}")
def run_promo_event_task(lpn_code, page_type, activity_name):
"""
@@ -1505,9 +1533,23 @@ def run_promo_event_task(lpn_code, page_type, activity_name):
logging.info(f"[Crawler] [{page_type.upper()}] 無異動,不發送通知")
except Exception as e:
import traceback as _tb
logging.error(f"[Crawler] [{page_type.upper()}] 🚨 {page_type} 任務異常 | Error: {e}")
stats = { "status": "Failed", "error": str(e) }
_save_stats(f'{page_type}_task', stats)
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_promo_event_task",
error=e,
source=f"Scheduler.{page_type}",
event_type="crawler_timeout" if "timeout" in str(e).lower() else "promo_event_task_failure",
priority="P2",
title=f"{activity_name} 任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Crawler] [{page_type.upper()}] event_router 失敗: {_router_e}")
def run_whitepage_check():
@@ -1749,6 +1791,7 @@ def run_auto_import_task():
每半小時檢查一次 Google Drive 是否有新的 Excel 檔案
"""
# ADR-012 Phase 4: HITL 暫停檢查
notification_sent = False
try:
from services.agent_actions import is_task_paused
if is_task_paused("run_auto_import_task"):
@@ -1956,8 +1999,22 @@ def run_competitor_price_feeder_task():
_save_stats('competitor_price_feeder', stats)
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [Feeder] 🚨 任務異常 | Error: {e}")
_save_stats('competitor_price_feeder', {"status": "Failed", "error": str(e)})
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_competitor_price_feeder_task",
error=e,
source="Scheduler.Feeder",
event_type="competitor_price_feeder_failure",
priority="P2",
title="競品價格補給任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Scheduler] [Feeder] event_router 失敗: {_router_e}")
def run_icaim_analysis_task():
@@ -2038,6 +2095,19 @@ def run_icaim_analysis_task():
import traceback as _tb
logging.error(f"[Scheduler] [ICAIM] 🚨 任務異常 | Error: {e}")
_save_stats('icaim_analysis', {"status": "Failed", "error": str(e)})
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_icaim_analysis_task",
error=e,
source="Scheduler.ICAIM",
event_type="icaim_analysis_failure",
priority="P1",
title="ICAIM 競價分析任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Scheduler] [ICAIM] event_router 失敗: {_router_e}")
# ADR-013: AIOps 自動修復
try:
from services.auto_heal_service import auto_heal_service
@@ -2064,6 +2134,19 @@ def run_weekly_strategy_task():
import traceback as _tb
logging.error(f"[Scheduler] [Strategy] 🚨 任務異常 | Error: {e}")
_save_stats('weekly_strategy', {"status": "Failed", "error": str(e)})
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_weekly_strategy_task",
error=e,
source="Scheduler.Strategy",
event_type="weekly_strategy_failure",
priority="P2",
title="每週策略週報任務異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Scheduler] [Strategy] event_router 失敗: {_router_e}")
# ADR-013: AIOps 自動修復
try:
from services.auto_heal_service import auto_heal_service
@@ -2262,8 +2345,22 @@ def run_dedup_batch_task():
)
_save_stats('dedup_batch', result)
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [Dedup] 去重批次異常: {e}")
_save_stats('dedup_batch', {"status": "Error", "error": str(e)})
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_dedup_batch_task",
error=e,
source="Scheduler.Dedup",
event_type="dedup_batch_failure",
priority="P2",
title="ai_insights 去重批次異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Scheduler] [Dedup] event_router 失敗: {_router_e}")
def run_quality_rescore_task():
@@ -2277,8 +2374,22 @@ def run_quality_rescore_task():
)
_save_stats('quality_rescore', result)
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [Rescore] 品質分數重算異常: {e}")
_save_stats('quality_rescore', {"status": "Error", "error": str(e)})
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_quality_rescore_task",
error=e,
source="Scheduler.Rescore",
event_type="quality_rescore_failure",
priority="P2",
title="ai_insights 品質重算批次異常",
trace=_tb.format_exc(),
)
except Exception as _router_e:
logging.error(f"[Scheduler] [Rescore] event_router 失敗: {_router_e}")
def run_daily_report_task():

View File

@@ -179,12 +179,22 @@ class AutoHealService:
def handle_exception(
self,
error_type: str,
error_type: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
**legacy_kwargs: Any,
) -> AutoHealResult:
context = context or {}
context = dict(context or {})
if legacy_kwargs:
context.update({k: v for k, v in legacy_kwargs.items() if v is not None})
error_type = error_type or context.get("error_type") or self._derive_error_type(context)
context.setdefault("error_type", error_type)
self._log.info("[AutoHeal] handle_exception: error_type=%s context=%s", error_type, context)
incident_id = self._ensure_incident(error_type, context)
if incident_id is not None:
context["incident_id"] = incident_id
playbook = _find_playbook(error_type)
if not playbook:
msg = f"No playbook matched for error_type={error_type}"
@@ -209,6 +219,48 @@ class AutoHealService:
# generic action execution
return self._execute_playbook_action(playbook, context)
def _derive_error_type(self, context: Dict[str, Any]) -> str:
error_text = str(context.get("exception") or context.get("error_message") or "").lower()
traceback_text = str(context.get("traceback_str") or "").lower()
combined = f"{error_text}\n{traceback_text}"
if "could not translate host name" in combined or "db_connection_error" in combined:
return "db_connection_error"
if "timeout" in combined:
return "crawler_timeout"
if "nvidia" in combined and "quota" in combined:
return "nim_quota_exhausted"
if "embedding" in combined:
return "embedding_failure"
return "scheduler_task_failure"
def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]:
from database.autoheal_models import Incident
session = get_session()
try:
incident = Incident(
task_name=str(context.get("task_name") or context.get("source") or "unknown_task"),
error_type=error_type,
error_message=str(context.get("exception") or context.get("error_message") or error_type)[:2000],
traceback_str=str(context.get("traceback_str") or "")[:8000] or None,
severity=str(context.get("severity") or "medium"),
status="healing",
retry_count=0,
)
session.add(incident)
session.flush()
incident_id = incident.id
session.commit()
self._log.info("[AutoHeal] incident created: id=%s error_type=%s", incident_id, error_type)
return incident_id
except Exception as exc:
session.rollback()
self._log.error("[AutoHeal] incident create failed: %s", exc)
return None
finally:
session.close()
def _handle_code_fix(self, playbook: Dict[str, Any], context: Dict[str, Any]) -> AutoHealResult:
from services.aider_heal_executor import execute_code_fix
target_file = context.get("target_file", "")
@@ -316,6 +368,8 @@ class AutoHealService:
_store_escalation(playbook["action_type"])
self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"])
self._write_heal_log(playbook, context, result, duration_ms)
self._write_ai_insight(playbook, context, result, duration_ms)
self._update_incident_status(context, result)
def _write_heal_log(
self,
@@ -352,3 +406,75 @@ class AutoHealService:
session.rollback()
finally:
session.close()
def _write_ai_insight(
self,
playbook: Dict[str, Any],
context: Dict[str, Any],
result: "AutoHealResult",
duration_ms: float,
) -> None:
from database.ai_models import AIInsight
session = get_session()
try:
payload = {
"incident_id": context.get("incident_id"),
"task_name": context.get("task_name"),
"error_type": context.get("error_type"),
"playbook_id": playbook.get("id"),
"action_type": playbook.get("action_type"),
"success": result.success,
"duration_ms": duration_ms,
}
insight = AIInsight(
insight_type="auto_heal_playbook",
content=(
f"task={context.get('task_name', 'unknown')} "
f"error_type={context.get('error_type', 'unknown')} "
f"action={playbook.get('action_type')} "
f"result={'success' if result.success else 'failed'} "
f"message={str(result.message)[:500]}"
),
metadata_json=json.dumps(payload, ensure_ascii=False),
confidence=1.0 if result.success else 0.4,
created_by="auto_heal_service",
ai_model="rule_based",
status="active",
)
session.add(insight)
session.commit()
except Exception as exc:
self._log.error("[AutoHeal] ai_insight write failed: %s", exc)
session.rollback()
finally:
session.close()
def _update_incident_status(self, context: Dict[str, Any], result: "AutoHealResult") -> None:
incident_id = context.get("incident_id")
if not incident_id:
return
session = get_session()
try:
session.execute(
text("""
UPDATE incidents
SET status = :status,
updated_at = NOW()
WHERE id = :incident_id
"""),
{
"status": "closed" if result.success else "escalated",
"incident_id": int(incident_id),
},
)
session.commit()
except Exception as exc:
session.rollback()
self._log.error("[AutoHeal] incident status update failed: %s", exc)
finally:
session.close()
auto_heal_service = AutoHealService()

View File

@@ -7,11 +7,11 @@ import asyncio
import logging
import threading
import traceback
import time
from typing import Any, Dict, Optional
from services.ai_orchestrator import AIOrchestrator
from services.telegram_templates import alert
from database.manager import get_session
from services.telegram_templates import send_telegram_with_result, triaged_alert
logger = logging.getLogger(__name__)
@@ -76,6 +76,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
"""
tier = _classify(event)
session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
started_at = time.perf_counter()
try:
if tier == "L0":
@@ -87,12 +88,17 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
else:
result = await _handle_l0(event)
message, reply_markup = _build_telegram_message(event, tier, result)
send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
latency_ms = int((time.perf_counter() - started_at) * 1000)
return {
"tier": tier,
"sent": 1,
"errors": [],
"latency_ms": 0,
"sent": send_result["sent"],
"errors": send_result["errors"],
"latency_ms": latency_ms,
"payload": result,
"delivered": send_result["ok"],
}
except Exception as e:
logger.exception(f"[EventRouter] dispatch failed: {e}")
@@ -100,8 +106,9 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
"tier": tier,
"sent": 0,
"errors": [str(e)],
"latency_ms": 0,
"latency_ms": int((time.perf_counter() - started_at) * 1000),
"payload": None,
"delivered": False,
}
@@ -185,3 +192,42 @@ def _classify(event: Dict[str, Any]) -> str:
return "L2"
return "L1"
return "L0"
def _build_telegram_message(event: Dict[str, Any], tier: str, result: Optional[Dict[str, Any]]) -> tuple[str, Optional[Dict[str, Any]]]:
if tier == "L0":
title = event.get("title") or event.get("event_type", "system_event")
summary = event.get("summary") or event.get("status") or "系統事件"
body = event.get("impact") or event.get("source") or ""
message = (
f" <b>{title}</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"{summary}\n"
f"{body}\n"
)
return message, None
ai_summary = ""
ai_cause = None
ai_actions = None
ai_executed = None
if isinstance(result, dict):
ai_summary = (
result.get("summary")
or result.get("reasoning")
or result.get("message")
or str(result)[:400]
)
ai_cause = result.get("cause") or result.get("root_cause")
ai_actions = result.get("suggested_actions") or result.get("execution_plan")
ai_executed = result.get("executed_actions")
return triaged_alert(
event,
tier_label=tier,
ai_summary=ai_summary,
ai_cause=ai_cause,
ai_actions=ai_actions if isinstance(ai_actions, list) else ([str(ai_actions)] if ai_actions else None),
ai_executed=ai_executed if isinstance(ai_executed, list) else ([str(ai_executed)] if ai_executed else None),
)

View File

@@ -20,6 +20,7 @@ TAIPEI_TZ = pytz.timezone('Asia/Taipei')
from services.google_drive_service import drive_service
from database.import_models import ImportJob, ImportConfig, Base
from database.manager import ensure_metadata_initialized
# 設定日誌
logger = logging.getLogger(__name__)
@@ -68,7 +69,7 @@ class ImportService:
def _init_database(self):
"""初始化資料庫表"""
try:
Base.metadata.create_all(engine)
ensure_metadata_initialized(engine, use_postgres_lock=str(engine.url).startswith('postgresql'))
logger.info("匯入追蹤表已初始化")
except Exception as e:
logger.error(f"初始化資料庫表失敗: {str(e)}")

View File

@@ -9,6 +9,7 @@ import os
from sqlalchemy import create_engine
from database.manager import DatabaseManager
from database.manager import ensure_metadata_initialized
from database.notification_models import NotificationTemplate, DEFAULT_TEMPLATES, Base
from services.logger_manager import SystemLogger
@@ -29,7 +30,7 @@ except ImportError:
def _init_notification_tables():
"""初始化通知模板資料表"""
try:
Base.metadata.create_all(_engine)
ensure_metadata_initialized(_engine, use_postgres_lock=str(_engine.url).startswith('postgresql'))
sys_log.info("[Notification] 通知模板表已初始化")
except Exception as e:
sys_log.error(f"[Notification] 初始化表失敗: {e}")

View File

@@ -21,6 +21,7 @@ import io
import json
import logging
import os
from html import escape
from datetime import datetime
from typing import Any, Dict, List, Optional
@@ -81,6 +82,53 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
return False
def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None,
parse_mode: str = "HTML") -> Dict[str, Any]:
"""發送 Telegram 並回傳結果明細,供 EventRouter / AIOps 使用。"""
token = _get_bot_token()
if not token:
return {"ok": False, "sent": 0, "failed": 0, "chat_ids": [], "errors": ["token_missing"]}
if chat_ids is None:
chat_ids = _get_chat_ids()
if not chat_ids:
chat_ids = [-1003940688311]
import requests
url = f"https://api.telegram.org/bot{token}/sendMessage"
sent = 0
failed = 0
errors: List[str] = []
for chat_id in chat_ids:
payload = {"chat_id": chat_id, "text": text, "parse_mode": parse_mode}
if reply_markup:
payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
try:
response = requests.post(url, json=payload, timeout=10)
if response.ok:
sent += 1
else:
failed += 1
errors.append(f"{chat_id}:HTTP {response.status_code}")
sys_log.warning("[TelegramTpl] sendMessage chat=%s HTTP %s: %s",
chat_id, response.status_code, response.text[:200])
except Exception as e:
failed += 1
errors.append(f"{chat_id}:{type(e).__name__}")
sys_log.error("[TelegramTpl] send chat=%s 失敗: %s", chat_id, e)
return {
"ok": sent > 0 and failed == 0,
"sent": sent,
"failed": failed,
"chat_ids": list(chat_ids),
"errors": errors,
}
def send_photo(photo_bytes: bytes, caption: str = "",
chat_ids: Optional[list] = None,
parse_mode: str = "HTML") -> bool:
@@ -380,9 +428,13 @@ def triaged_alert(base_event: Dict[str, Any], tier_label: str,
ai_executed: Optional[list] = None) -> tuple:
"""EA L1/L2 自主執行通知(保留原有介面,升級排版)"""
event_type = base_event.get("event_type", "alert")
title = base_event.get("title", "")
summary = base_event.get("summary", "")
title = escape(str(base_event.get("title", "")))
summary = escape(str(base_event.get("summary", "")))
event_id = base_event.get("id", "unknown")
safe_ai_summary = escape(str(ai_summary or ""))
safe_ai_cause = escape(str(ai_cause or "")) if ai_cause else None
safe_actions = [escape(str(a)) for a in (ai_actions or [])]
safe_executed = [escape(str(a)) for a in (ai_executed or [])]
lines = [
f"⚡ <b>{tier_label} · {event_type}</b>",
@@ -391,14 +443,14 @@ def triaged_alert(base_event: Dict[str, Any], tier_label: str,
]
if summary:
lines += [f"🔍 <b>概要:</b>{summary}", ""]
if ai_summary:
lines += [f"🧠 <b>AI 摘要:</b>{ai_summary[:400]}", ""]
if ai_cause:
lines += [f"💡 <b>可能原因:</b>{ai_cause}", ""]
if ai_actions:
lines += ["<b>📋 建議行動:</b>"] + [f"{a}" for a in ai_actions] + [""]
if ai_executed:
lines += ["<b>✅ 已執行:</b>"] + [f"{a}" for a in ai_executed] + [""]
if safe_ai_summary:
lines += [f"🧠 <b>AI 摘要:</b>{safe_ai_summary[:400]}", ""]
if safe_ai_cause:
lines += [f"💡 <b>可能原因:</b>{safe_ai_cause}", ""]
if safe_actions:
lines += ["<b>📋 建議行動:</b>"] + [f"{a}" for a in safe_actions] + [""]
if safe_executed:
lines += ["<b>✅ 已執行:</b>"] + [f"{a}" for a in safe_executed] + [""]
trace = base_event.get("trace")
if trace:

View File

@@ -4,165 +4,106 @@
測試完整匯入流程
"""
import sys
import os
import sys
from datetime import date, datetime
import numpy as np
import pandas as pd
import pytest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import pandas as pd
import numpy as np
from datetime import date, datetime
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
# 讀取 Excel
excel_path = '/Users/ogt/Downloads/缺貨測試.xlsx'
df = pd.read_excel(excel_path)
EXCEL_PATH = "/Users/ogt/Downloads/缺貨測試.xlsx"
print("=" * 80)
print("測試完整匯入流程")
print("=" * 80)
# 檢查是否有當前日期欄位
has_date_column = '當前日期' in df.columns
print(f"\n是否有當前日期欄位: {has_date_column}")
@pytest.mark.skipif(not os.path.exists(EXCEL_PATH), reason="缺少本機測試 Excel 檔案")
def test_full_import():
df = pd.read_excel(EXCEL_PATH)
# 處理第一行
idx = 0
row = df.iloc[idx]
has_date_column = "當前日期" in df.columns
row = df.iloc[0]
print(f"\n處理第 {idx+1}")
print("-" * 80)
# 日期轉換
row_import_date = date.today()
if has_date_column:
date_value = row.get('當前日期')
print(f"當前日期原始值: {date_value}, 類型: {type(date_value).__name__}")
if pd.notna(date_value):
print(f" pd.notna: True")
if isinstance(date_value, (int, float, np.integer, np.floating)):
print(f" 是數字類型 (包括 numpy)")
parsed_date = pd.to_datetime(date_value, unit='D', origin='1899-12-30', errors='coerce')
print(f" parsed_date: {parsed_date}")
row_import_date = date.today()
if has_date_column:
date_value = row.get("當前日期")
if pd.notna(date_value) and isinstance(date_value, (int, float, np.integer, np.floating)):
parsed_date = pd.to_datetime(date_value, unit="D", origin="1899-12-30", errors="coerce")
if pd.notna(parsed_date):
row_import_date = parsed_date.date()
print(f" ✅ 轉換成功: {row_import_date}")
# 缺貨日期轉換
stockout_date_value = row.get('缺貨日期')
stockout_date_obj = None
print(f"\n缺貨日期原始值: {stockout_date_value}, 類型: {type(stockout_date_value).__name__}")
if pd.notna(stockout_date_value):
print(f" pd.notna: True")
if isinstance(stockout_date_value, (int, float, np.integer, np.floating)):
print(f" 是數字類型 (包括 numpy)")
stockout_parsed = pd.to_datetime(stockout_date_value, unit='D', origin='1899-12-30', errors='coerce')
print(f" stockout_parsed: {stockout_parsed}")
stockout_date_value = row.get("缺貨日期")
stockout_date_obj = None
if pd.notna(stockout_date_value) and isinstance(stockout_date_value, (int, float, np.integer, np.floating)):
stockout_parsed = pd.to_datetime(stockout_date_value, unit="D", origin="1899-12-30", errors="coerce")
if pd.notna(stockout_parsed):
stockout_date_obj = stockout_parsed.date()
print(f" ✅ 轉換成功: {stockout_date_obj}")
# 建立 record
print("\n建立 record:")
print("-" * 80)
record = {
"import_date": row_import_date,
"department": row.get("處別"),
"section": row.get("科別"),
"pm_name": row.get("PM姓名"),
"zone_id": row.get("區ID"),
"zone_name": row.get("區名稱"),
"product_code": str(row.get("商品ID", "")).strip(),
"product_name": str(row.get("商品名稱", "")).strip(),
"product_spec": row.get("單品/組合商品"),
"borrow_transfer": row.get("借採轉"),
"sale_price": None,
"cost_price": None,
"vendor_code": str(row.get("來源供應商編號", "")).strip(),
"vendor_name": str(row.get("來源供應商名稱", "")).strip(),
"current_stock": row.get("商品可賣量"),
"stockout_date": stockout_date_obj,
"stockout_days": row.get("缺貨天數"),
"monthly_sales_amount": row.get("缺貨商品前30天業績"),
"monthly_sales_qty": row.get("最近30天銷售量"),
"daily_avg_sales": None,
"safe_stock_days": row.get("庫存水位"),
"notes": None,
}
record = {
'import_date': row_import_date,
'department': row.get('處別'),
'section': row.get('科別'),
'pm_name': row.get('PM姓名'),
'zone_id': row.get('區ID'),
'zone_name': row.get('區名稱'),
'product_code': str(row.get('商品ID', '')).strip(),
'product_name': str(row.get('商品名稱', '')).strip(),
'product_spec': row.get('單品/組合商品'),
'borrow_transfer': row.get('借採轉'),
'sale_price': None,
'cost_price': None,
'vendor_code': str(row.get('來源供應商編號', '')).strip(),
'vendor_name': str(row.get('來源供應商名稱', '')).strip(),
'current_stock': row.get('商品可賣量'),
'stockout_date': stockout_date_obj,
'stockout_days': row.get('缺貨天數'),
'monthly_sales_amount': row.get('缺貨商品前30天業績'),
'monthly_sales_qty': row.get('最近30天銷售量'),
'daily_avg_sales': None,
'safe_stock_days': row.get('庫存水位'),
'notes': None
}
db = VendorDatabaseManager()
session = db.get_session()
try:
batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")
stockout_item = VendorStockout(
batch_id=batch_id,
import_date=record["import_date"],
import_time=datetime.now(),
department=record["department"],
section=record["section"],
pm_name=record["pm_name"],
zone_id=record["zone_id"],
zone_name=record["zone_name"],
product_code=record["product_code"],
product_name=record["product_name"],
product_spec=record["product_spec"],
borrow_transfer=record["borrow_transfer"],
sale_price=record["sale_price"],
cost_price=record["cost_price"],
vendor_code=record["vendor_code"],
vendor_name=record["vendor_name"],
monthly_sales_qty=record["monthly_sales_qty"],
monthly_sales_amount=record["monthly_sales_amount"],
daily_avg_sales=record["daily_avg_sales"],
current_stock=record["current_stock"],
stockout_date=record["stockout_date"],
stockout_days=record["stockout_days"],
safe_stock_days=record["safe_stock_days"],
notes=record["notes"],
status="pending",
)
session.add(stockout_item)
session.commit()
session.expire_all()
for key, value in record.items():
print(f" {key}: {value}")
# 寫入資料庫
print("\n" + "=" * 80)
print("寫入資料庫")
print("=" * 80)
db = VendorDatabaseManager()
session = db.get_session()
try:
# 生成批次ID
batch_id = datetime.now().strftime('%Y%m%d_%H%M%S')
stockout_item = VendorStockout(
batch_id=batch_id,
import_date=record['import_date'],
import_time=datetime.now(),
department=record['department'],
section=record['section'],
pm_name=record['pm_name'],
zone_id=record['zone_id'],
zone_name=record['zone_name'],
product_code=record['product_code'],
product_name=record['product_name'],
product_spec=record['product_spec'],
borrow_transfer=record['borrow_transfer'],
sale_price=record['sale_price'],
cost_price=record['cost_price'],
vendor_code=record['vendor_code'],
vendor_name=record['vendor_name'],
monthly_sales_qty=record['monthly_sales_qty'],
monthly_sales_amount=record['monthly_sales_amount'],
daily_avg_sales=record['daily_avg_sales'],
current_stock=record['current_stock'],
stockout_date=record['stockout_date'],
stockout_days=record['stockout_days'],
safe_stock_days=record['safe_stock_days'],
notes=record['notes'],
status='pending'
)
session.add(stockout_item)
session.commit()
print(f"✅ 寫入成功")
# 刷新 session
session.expire_all()
# 讀回資料驗證
print("\n讀回資料驗證:")
print("-" * 80)
saved = session.query(VendorStockout).filter_by(product_code=record['product_code']).first()
if saved:
print(f" 當前日期: {saved.import_date}")
print(f" 區ID: {saved.zone_id}")
print(f" 區名稱: {saved.zone_name}")
print(f" 借採轉: {saved.borrow_transfer}")
print(f" 缺貨日期: {saved.stockout_date}")
print(f" 缺貨天數: {saved.stockout_days}")
print(f" 商品可賣量: {saved.current_stock}")
print(f" 庫存水位: {saved.safe_stock_days}")
except Exception as e:
session.rollback()
print(f"❌ 寫入失敗: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
saved = session.query(VendorStockout).filter_by(product_code=record["product_code"]).first()
assert saved is not None
assert saved.product_code == record["product_code"]
finally:
session.close()

View File

@@ -4,87 +4,57 @@
測試匯入邏輯
"""
import os
import pandas as pd
from datetime import date
import pytest
# 讀取 Excel
excel_path = '/Users/ogt/Downloads/缺貨測試.xlsx'
df = pd.read_excel(excel_path)
EXCEL_PATH = "/Users/ogt/Downloads/缺貨測試.xlsx"
print("=" * 80)
print("測試讀取第一行")
print("=" * 80)
# 讀取第一行
row = df.iloc[0]
@pytest.mark.skipif(not os.path.exists(EXCEL_PATH), reason="缺少本機測試 Excel 檔案")
def test_import_logic():
df = pd.read_excel(EXCEL_PATH)
row = df.iloc[0]
print(f"\n區ID: {row.get('區ID')}")
print(f"區名稱: {row.get('區名稱')}")
print(f"借採轉: {row.get('借採轉')}")
print(f"缺貨天數: {row.get('缺貨天數')}")
print(f"商品可賣量: {row.get('商品可賣量')}")
print(f"庫存水位: {row.get('庫存水位')}")
# 測試日期轉換
print(f"\n當前日期原始值: {row.get('當前日期')} (類型: {type(row.get('當前日期')).__name__})")
date_value = row.get('當前日期')
row_import_date = None
if pd.notna(date_value):
print(f" pd.notna(date_value): True")
if isinstance(date_value, (int, float)):
print(f" 是數字類型")
parsed_date = pd.to_datetime(date_value, unit='D', origin='1899-12-30', errors='coerce')
print(f" parsed_date: {parsed_date}, pd.notna: {pd.notna(parsed_date)}")
date_value = row.get("當前日期")
row_import_date = None
if pd.notna(date_value) and isinstance(date_value, (int, float)):
parsed_date = pd.to_datetime(date_value, unit="D", origin="1899-12-30", errors="coerce")
if pd.notna(parsed_date):
row_import_date = parsed_date.date()
print(f" ✅ 轉換後: {row_import_date}")
else:
print(f" pd.notna(date_value): False")
# 測試缺貨日期轉換
print(f"\n缺貨日期原始值: {row.get('缺貨日期')} (類型: {type(row.get('缺貨日期')).__name__})")
stockout_date_value = row.get('缺貨日期')
stockout_date_obj = None
if pd.notna(stockout_date_value):
print(f" pd.notna(stockout_date_value): True")
if isinstance(stockout_date_value, (int, float)):
print(f" 是數字類型")
stockout_parsed = pd.to_datetime(stockout_date_value, unit='D', origin='1899-12-30', errors='coerce')
print(f" stockout_parsed: {stockout_parsed}, pd.notna: {pd.notna(stockout_parsed)}")
stockout_date_value = row.get("缺貨日期")
stockout_date_obj = None
if pd.notna(stockout_date_value) and isinstance(stockout_date_value, (int, float)):
stockout_parsed = pd.to_datetime(stockout_date_value, unit="D", origin="1899-12-30", errors="coerce")
if pd.notna(stockout_parsed):
stockout_date_obj = stockout_parsed.date()
print(f" ✅ 轉換後: {stockout_date_obj}")
else:
print(f" pd.notna(stockout_date_value): False")
# 模擬完整的 record 建立
record = {
'import_date': row_import_date if 'row_import_date' in locals() else None,
'department': row.get('處別'),
'section': row.get('科別'),
'pm_name': row.get('PM姓名'),
'zone_id': row.get('區ID'),
'zone_name': row.get('區名稱'),
'product_code': str(row.get('商品ID', '')).strip(),
'product_name': str(row.get('商品名稱', '')).strip(),
'product_spec': row.get('單品/組合商品'),
'borrow_transfer': row.get('借採轉'),
'sale_price': None,
'cost_price': None,
'vendor_code': str(row.get('來源供應商編號', '')).strip(),
'vendor_name': str(row.get('來源供應商名稱', '')).strip(),
'current_stock': row.get('商品可賣量'),
'stockout_date': stockout_date_obj,
'stockout_days': row.get('缺貨天數'),
'monthly_sales_amount': row.get('缺貨商品前30天業績'),
'monthly_sales_qty': row.get('最近30天銷售量'),
'daily_avg_sales': None,
'safe_stock_days': row.get('庫存水位'),
'notes': None
}
record = {
"import_date": row_import_date,
"department": row.get("處別"),
"section": row.get("科別"),
"pm_name": row.get("PM姓名"),
"zone_id": row.get("區ID"),
"zone_name": row.get("區名稱"),
"product_code": str(row.get("商品ID", "")).strip(),
"product_name": str(row.get("商品名稱", "")).strip(),
"product_spec": row.get("單品/組合商品"),
"borrow_transfer": row.get("借採轉"),
"sale_price": None,
"cost_price": None,
"vendor_code": str(row.get("來源供應商編號", "")).strip(),
"vendor_name": str(row.get("來源供應商名稱", "")).strip(),
"current_stock": row.get("商品可賣量"),
"stockout_date": stockout_date_obj,
"stockout_days": row.get("缺貨天數"),
"monthly_sales_amount": row.get("缺貨商品前30天業績"),
"monthly_sales_qty": row.get("最近30天銷售量"),
"daily_avg_sales": None,
"safe_stock_days": row.get("庫存水位"),
"notes": None,
}
print("\n" + "=" * 80)
print("建立的 record:")
print("=" * 80)
for key, value in record.items():
print(f" {key}: {value}")
assert record["product_code"] != ""
assert "product_name" in record

View File

@@ -1,65 +1,57 @@
#!/usr/bin/env python3
"""測試完整 76 欄位同步"""
import os
import sqlite3
import psycopg2
# 取得所有欄位
sqlite_conn = sqlite3.connect("data/momo_database.db")
cursor = sqlite_conn.cursor()
cursor.execute("PRAGMA table_info(realtime_sales_monthly)")
col_info = cursor.fetchall()
columns = [c[1] for c in col_info]
import pytest
print(f"{len(columns)} 個欄位")
psycopg2 = pytest.importorskip("psycopg2")
# 檢查特殊字元
special_chars = []
for i, col in enumerate(columns):
if "%" in col or "(" in col or ")" in col:
special_chars.append((i, col))
print(f"\n包含特殊字元的欄位: {len(special_chars)}")
for i, col in special_chars:
print(f" {i}: {col}")
# 測試完整 76 欄位同步
print("\n測試完整欄位同步...")
cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10")
rows = cursor.fetchall()
pg_conn = psycopg2.connect(
host="localhost", port="5432",
user="momo", password=os.environ.get("POSTGRES_PASSWORD"),
database="momo_analytics"
)
pg_cursor = pg_conn.cursor()
# 建立表格
pg_cursor.execute("DROP TABLE IF EXISTS test_full76 CASCADE")
cols_def = ", ".join([f'"{c}" TEXT' for c in columns])
pg_cursor.execute(f"CREATE TABLE test_full76 (id SERIAL PRIMARY KEY, {cols_def})")
pg_conn.commit()
print("表建立成功")
# 插入資料
cols_sql = ", ".join([f'"{c}"' for c in columns])
placeholders = ", ".join(["%s"] * len(columns))
sql = f"INSERT INTO test_full76 ({cols_sql}) VALUES ({placeholders})"
success_count = 0
for i, row in enumerate(rows):
def test_pg_sync():
sqlite_conn = sqlite3.connect("data/momo_database.db")
try:
pg_cursor.execute(sql, tuple(row))
success_count += 1
except Exception as e:
print(f"{i} 筆失敗: {e}")
pg_conn.rollback()
cursor = sqlite_conn.cursor()
cursor.execute("PRAGMA table_info(realtime_sales_monthly)")
col_info = cursor.fetchall()
columns = [c[1] for c in col_info]
assert len(columns) > 0
pg_conn.commit()
print(f"成功插入: {success_count}/{len(rows)}")
cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10")
rows = cursor.fetchall()
pg_cursor.execute("DROP TABLE test_full76 CASCADE")
pg_conn.commit()
pg_conn.close()
sqlite_conn.close()
pg_conn = psycopg2.connect(
host="localhost",
port="5432",
user="momo",
password=os.environ.get("POSTGRES_PASSWORD"),
database="momo_analytics",
)
try:
pg_cursor = pg_conn.cursor()
pg_cursor.execute("DROP TABLE IF EXISTS test_full76 CASCADE")
cols_def = ", ".join([f'"{c}" TEXT' for c in columns])
pg_cursor.execute(f"CREATE TABLE test_full76 (id SERIAL PRIMARY KEY, {cols_def})")
pg_conn.commit()
cols_sql = ", ".join([f'"{c}"' for c in columns])
placeholders = ", ".join(["%s"] * len(columns))
sql = f"INSERT INTO test_full76 ({cols_sql}) VALUES ({placeholders})"
success_count = 0
for row in rows:
try:
pg_cursor.execute(sql, tuple(row))
success_count += 1
except Exception:
pg_conn.rollback()
pg_conn.commit()
assert success_count >= 0
pg_cursor.execute("DROP TABLE test_full76 CASCADE")
pg_conn.commit()
finally:
pg_conn.close()
finally:
sqlite_conn.close()