Files
ewoooc/scripts/e2e_test_ai_pipeline.py
ogt 0099543c05
Some checks failed
CD Pipeline / deploy (push) Failing after 5m18s
fix(security): 全域健檢 — 40 項安全/Bug/品質修復
🔴 Critical
- auto_heal_service: 補 import re + sqlalchemy.text + 修正 orchestrator 變數名
  + autoheal_playbook→playbooks 表名 + _alert_and_store cooldown 修復
- aider_heal_executor: shell injection 改 shell=False + list 參數
- docker-compose: DISABLE_LOGIN 改 env var + 移除密碼 fallback + POSTGRES_HOST 修正
- app.py: /api/backup /api/run_task 等 6 個管理 API 加 @login_required
- config.py + pg_sync + e2e_test: 移除 wooo_pg_2026 hardcoded 密碼 fallback
- pg_backup.sh: 移除 TELEGRAM_TOKEN= 中間變數,直接用 $TELEGRAM_BOT_TOKEN
- migration 014: trigger_pattern→match_pattern + 補 error_type NOT NULL 欄位

🟡 High
- telegram_bot_service: str(e) 改通用訊息 + session try/finally + 移除 pa:/pr: 舊 callback
- run_scheduler: ElephantAlpha thread 死亡監控 + 自動重啟 + Telegram 告警
  + agent_context 03:30 TTL 定時清理任務
- openclaw_learning_service: build_rag_context 兩路徑加 .limit(200)
- hooks: commit-quality + momo-prod-guard 空 catch 改 stderr+exit(1)
- scripts/code_review: auto_yes 預設改 false
- db_backup_service: PGPASSWORD 透過 env dict 傳遞

📦 Migrations
- 013_autoheal: 修正建表順序 playbooks→incidents(外鍵前向引用)
- 018_add_missing_indexes: heal_logs/incidents 外鍵索引 + cleanup_expired_agent_context()

🟢 Infrastructure
- requirements.txt: 加版本下界 Flask>=2.3 SQLAlchemy>=1.4 等
- cd.yaml: 新增 run_scheduler.py + run_telegram_bot.py 監聽路徑
- .gitignore: insert_playbook_local.py 加入忽略

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:12:23 +08:00

544 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
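"""
E2E integration test for the AI competitive-pricing intelligence module.

Pass checklist (three gates):
  Gate 1: Feeder semantic extraction - PChome search + fuzzy matching + tag generation
  Gate 2: Hermes situational awareness - JSON quality after competitor_tags injection
  Gate 3: NemoTron + Telegram - DB write + semantic formatting + compute footprint

Test strategy: uses an SQLite in-memory DB with realistic fake data; it does not
depend on Docker/PostgreSQL.
NIM and Telegram can optionally be skipped (set the SKIP_NIM=1 / SKIP_TELEGRAM=1
environment variables).

Run:
    cd /Users/ogt/momo-pro-system
    python3 scripts/e2e_test_ai_pipeline.py
    # Skip the real APIs; verify local logic only
    SKIP_NIM=1 SKIP_TELEGRAM=1 python3 scripts/e2e_test_ai_pipeline.py
    # Enable Hermes (requires 192.168.0.111 to be reachable)
    SKIP_NIM=1 python3 scripts/e2e_test_ai_pipeline.py
"""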
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
# ── Make sure the script resolves imports from the project root ──────────────
# The root is taken to be the parent of the directory that contains this file.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Prepend the root so the function-level `from services...` imports resolve.
sys.path.insert(0, PROJECT_ROOT)
from dotenv import load_dotenv
# Load <root>/.env before the os.getenv() calls below read the environment.
load_dotenv(os.path.join(PROJECT_ROOT, ".env"))
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("e2e")
# Feature switches: each one is on only when the variable is exactly "1";
# any other value (including "true") leaves it off.
SKIP_NIM = os.getenv("SKIP_NIM", "0") == "1"
SKIP_TELEGRAM = os.getenv("SKIP_TELEGRAM", "0") == "1"
SKIP_HERMES = os.getenv("SKIP_HERMES", "0") == "1"
SKIP_FEEDER = os.getenv("SKIP_FEEDER", "0") == "1"
USE_POSTGRESQL = os.getenv("USE_POSTGRESQL", "0") == "1"
# PostgreSQL connection parameters, read from the environment.
# NOTE(review): the defaults are meant to mirror docker-compose.yml — confirm
# they still match.
PG_HOST = os.getenv("POSTGRES_HOST", "127.0.0.1")
PG_PORT = os.getenv("POSTGRES_PORT", "5432")
PG_USER = os.getenv("POSTGRES_USER", "momo")
# No default on purpose: this is None when POSTGRES_PASSWORD is not set.
PG_PASSWORD = os.getenv("POSTGRES_PASSWORD")
PG_DB = os.getenv("POSTGRES_DB", "momo_analytics")
# ═══════════════════════════════════════════════════════════
# Test infrastructure: SQLite in-memory + table creation + fake data
# ═══════════════════════════════════════════════════════════
def _build_test_engine():
    """
    Build the SQLAlchemy engine used by this test run.

    - Default: SQLite in-memory; no external service is needed and the five
      tables the gates touch are created here.
    - USE_POSTGRESQL=1: connect to PostgreSQL using the PG_* settings. The
      tables are expected to exist already (run the migrations first), so the
      engine is returned without creating anything.

    Returns:
        The SQLAlchemy engine.

    Raises:
        ValueError: if POSTGRES_PORT is not an integer (PostgreSQL mode only).
    """
    from sqlalchemy import create_engine, text
    from sqlalchemy.engine import URL

    if USE_POSTGRESQL:
        if not PG_PASSWORD:
            logger.warning(
                "[Engine] POSTGRES_PASSWORD is not set; connecting without a "
                "password in the URL"
            )
        # Build the URL from components instead of an f-string: a missing
        # password is omitted rather than rendered as the text "None", and
        # special characters in the credentials are escaped correctly.
        dsn = URL.create(
            drivername="postgresql+psycopg2",
            username=PG_USER,
            password=PG_PASSWORD or None,
            host=PG_HOST,
            port=int(PG_PORT),
            database=PG_DB,
        )
        logger.info(f"[Engine] 使用 PostgreSQL: {PG_HOST}:{PG_PORT}/{PG_DB}")
        # PostgreSQL mode: tables were created by the migrations.
        return create_engine(dsn, echo=False)

    # SQLite in-memory: create every table ourselves.
    schema = (
        # products
        """
        CREATE TABLE products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            i_code TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            category TEXT,
            status TEXT DEFAULT 'ACTIVE',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # price_records
        """
        CREATE TABLE price_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER NOT NULL,
            price REAL NOT NULL,
            timestamp TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # daily_sales_snapshot (column names follow the actual MOMO Excel headers)
        """
        CREATE TABLE daily_sales_snapshot (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            "商品ID" TEXT,
            "商品名稱" TEXT,
            "銷售金額" REAL,
            snapshot_date TEXT
        )
        """,
        # competitor_prices
        """
        CREATE TABLE competitor_prices (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sku TEXT NOT NULL,
            source TEXT NOT NULL DEFAULT 'pchome',
            price REAL NOT NULL,
            original_price REAL,
            discount_pct INTEGER,
            competitor_product_id TEXT,
            competitor_product_name TEXT,
            match_score REAL,
            tags TEXT DEFAULT '[]',
            crawled_at TEXT DEFAULT CURRENT_TIMESTAMP,
            expires_at TEXT,
            UNIQUE(sku, source)
        )
        """,
        # ai_price_recommendations
        """
        CREATE TABLE ai_price_recommendations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sku TEXT NOT NULL UNIQUE,
            name TEXT NOT NULL,
            reason TEXT,
            strategy TEXT DEFAULT 'promote',
            confidence REAL,
            momo_price REAL,
            pchome_price REAL,
            gap_pct REAL,
            sales_7d_delta REAL,
            model_footprint TEXT DEFAULT '{}',
            status TEXT DEFAULT 'pending',
            reviewed_by TEXT,
            reviewed_at TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
    )
    engine = create_engine("sqlite:///:memory:", echo=False)
    with engine.begin() as conn:
        for ddl in schema:
            conn.execute(text(ddl))
    return engine
def _seed_test_data(engine):
    """
    Insert the fixture rows that the gates read.

    Writes, in one transaction: three monitored products, one MOMO price per
    product, sales snapshots for today and for eight days ago, and one
    pre-filled PChome competitor row per product.

    Args:
        engine: SQLAlchemy engine whose schema already contains the tables.
    """
    from sqlalchemy import text

    today = datetime.now().strftime("%Y-%m-%d")
    week_ago = (datetime.now() - timedelta(days=8)).strftime("%Y-%m-%d")

    product_sql = text("""
        INSERT INTO products (i_code, name, category, status)
        VALUES (:i_code, :name, :cat, 'ACTIVE')
        ON CONFLICT (i_code) DO NOTHING
    """)
    # product_id is resolved from i_code by the SELECT.
    price_sql = text("""
        INSERT INTO price_records (product_id, price, timestamp)
        SELECT id, :price, CURRENT_TIMESTAMP
        FROM products WHERE i_code = :i_code
        LIMIT 1
    """)
    sales_sql = text("""
        INSERT INTO daily_sales_snapshot ("商品ID", "商品名稱", "銷售金額", snapshot_date)
        VALUES (:sku, :name, :amount, :date)
    """)
    competitor_sql = text("""
        INSERT INTO competitor_prices
            (sku, source, price, original_price, discount_pct,
             competitor_product_id, competitor_product_name,
             match_score, tags, crawled_at, expires_at)
        VALUES
            (:sku, 'pchome', :price, :orig, :disc,
             :comp_id, :comp_name,
             :score, :tags, CURRENT_TIMESTAMP, :expires)
        ON CONFLICT(sku, source) DO NOTHING
    """)

    # The three monitored products, in insertion order.
    catalogue = {
        "A003": "舒特膚AD乳液200ml",
        "A001": "玻尿酸面膜10片裝",
        "A009": "美白化妝水150ml",
    }
    # Latest MOMO selling price per product.
    momo_prices = {"A003": 1200.0, "A001": 320.0, "A009": 420.0}
    # Today's sales; each is below the 100000 baseline written for week_ago
    # (original note: 7-day sales drop > 10%, the funnel condition).
    current_sales = {"A003": 65000, "A001": 58000, "A009": 78000}

    with engine.begin() as conn:
        for code, title in catalogue.items():
            conn.execute(
                product_sql,
                {"i_code": code, "name": title, "cat": "美妝保養"},
            )

        for code, amount in momo_prices.items():
            conn.execute(price_sql, {"i_code": code, "price": amount})

        sales_rows = [
            {"sku": code, "name": catalogue[code], "amount": amount, "date": today}
            for code, amount in current_sales.items()
        ]
        # Previous-week baseline rows, dated week_ago.
        sales_rows += [
            {"sku": code, "name": title, "amount": 100000, "date": week_ago}
            for code, title in catalogue.items()
        ]
        for params in sales_rows:
            conn.execute(sales_sql, params)

        # Pre-filled PChome rows, simulating a Feeder run that already happened.
        expires = (datetime.now() + timedelta(hours=6)).strftime("%Y-%m-%d %H:%M:%S")
        competitor_rows = [
            {
                "sku": "A003", "price": 980.0, "orig": None, "disc": None,
                "comp_id": "DDABSD-A003FAKE", "comp_name": "舒特膚 AD 乳液 200ml",
                "score": 0.82, "tags": '["on_sale","discount_10pct"]',
            },
            {
                "sku": "A001", "price": 280.0, "orig": 320.0, "disc": 12,
                "comp_id": "DDABSD-A001FAKE", "comp_name": "玻尿酸保濕面膜10入",
                "score": 0.76, "tags": '["discount_10pct"]',
            },
            {
                "sku": "A009", "price": 350.0, "orig": 400.0, "disc": 12,
                "comp_id": "DDABSD-A009FAKE", "comp_name": "美白化妝水 150ml",
                "score": 0.71, "tags": '["on_sale"]',
            },
        ]
        for params in competitor_rows:
            conn.execute(competitor_sql, {**params, "expires": expires})

    logger.info("✅ 測試資料填充完成")
# ═══════════════════════════════════════════════════════════
# Gate 1: Feeder semantic extraction (CLI mode, no DB required)
# ═══════════════════════════════════════════════════════════
def gate1_feeder():
    """
    Gate 1: search PChome for two test SKUs, pick the best match, extract tags.

    Returns:
        True when SKIP_FEEDER is set, otherwise True if at least one SKU
        matched with a score of MIN_MATCH_SCORE or higher.
    """
    # NOTE(review): the separator literal is empty in this copy of the file;
    # a rule character may have been lost in extraction — confirm.
    print("\n" + ""*60)
    print("【關卡一】Feeder 語意萃取 — PChome 搜尋 + 比對 + tags")
    print(""*60)
    if SKIP_FEEDER:
        print("⏭️ SKIP_FEEDER=1跳過假設 competitor_prices 已由 seed 填入)")
        return True

    from services.pchome_crawler import PChomeCrawler
    from services.competitor_price_feeder import _find_best_match, _extract_tags, MIN_MATCH_SCORE

    test_skus = [
        {"sku": "A003", "name": "舒特膚AD乳液200ml"},
        {"sku": "A001", "name": "玻尿酸面膜10片裝"},
    ]
    crawler = PChomeCrawler(delay=0.8)
    matched = 0
    for entry in test_skus:
        sku = entry["sku"]
        full_name = entry["name"]
        # Only the first 20 characters of the name are used as the keyword.
        found, _, hits = crawler.search_products(full_name[:20], limit=10)
        if not (found and hits):
            print(f" ⚠️ [{sku}] PChome 搜尋無結果(網路問題或無此商品)")
            continue

        match = _find_best_match(full_name, hits)
        if not match:
            print(f" ⚠️ [{sku}] 無法找到比對結果")
            continue

        candidate, similarity = match
        labels = _extract_tags(candidate)
        passed = similarity >= MIN_MATCH_SCORE
        marker = "" if passed else "⚠️ 低分"
        print(f" {marker} [{sku}] {full_name[:25]}")
        print(f" PChome: {candidate.name[:40]}")
        print(f" 售價 ${candidate.price} | 分數 {similarity:.3f} | 標籤 {labels}")
        if passed:
            matched += 1

    print(f"\n → 關卡一結果:{matched}/{len(test_skus)} 支成功比對")
    return matched > 0
# ═══════════════════════════════════════════════════════════
# Gate 2: Hermes situational awareness
# ═══════════════════════════════════════════════════════════
def gate2_hermes(engine):
    """
    Gate 2: build candidates from the test DB and have Hermes analyse them.

    Args:
        engine: SQLAlchemy engine holding the seeded test data.

    Returns:
        A ``(threats, stats)`` tuple. The canned result from ``_mock_threats()``
        is returned when SKIP_HERMES is set or Hermes returns nothing. When the
        query yields no candidates a bare empty list is returned instead; the
        caller handles both shapes.
    """
    # NOTE(review): the separator literal is empty in this copy of the file;
    # a rule character may have been lost in extraction — confirm.
    print("\n" + ""*60)
    print("【關卡二】Hermes 情境認知 — competitor_tags 注入後 JSON 品質")
    print(""*60)
    if SKIP_HERMES:
        print("⏭️ SKIP_HERMES=1跳過")
        return _mock_threats()

    from services.hermes_analyst_service import HermesAnalystService
    from sqlalchemy import text

    # The candidates are fetched with a direct query here rather than through
    # the service, because SQLite does not support the PostgreSQL-specific
    # syntax.
    service = HermesAnalystService(engine=engine)

    # The seed data stamps snapshots with the *local* date (datetime.now()),
    # whereas SQLite's date('now') is UTC. Binding the local date keeps the
    # "current" and "previous" buckets correct in any timezone and at any
    # hour of the day.
    today = datetime.now().strftime("%Y-%m-%d")
    with engine.connect() as conn:
        rows = conn.execute(text("""
            SELECT
                p.i_code AS sku, p.name, p.category,
                pr.price AS momo_price,
                (SELECT SUM(CAST("銷售金額" AS REAL))
                   FROM daily_sales_snapshot WHERE "商品ID" = p.i_code
                    AND date(snapshot_date) = :today) AS sales_7d_curr,
                (SELECT SUM(CAST("銷售金額" AS REAL))
                   FROM daily_sales_snapshot WHERE "商品ID" = p.i_code
                    AND date(snapshot_date) < :today) AS sales_7d_prev,
                cp.price AS pchome_price,
                cp.tags AS competitor_tags
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            LEFT JOIN competitor_prices cp ON cp.sku = p.i_code AND cp.source = 'pchome'
            WHERE p.status = 'ACTIVE'
        """), {"today": today}).fetchall()
    candidates = [dict(r._mapping) for r in rows]
    logger.info(f"[關卡二] 候選商品 {len(candidates)}")
    if not candidates:
        print(" ❌ 沒有候選商品,測試資料可能有誤")
        return []

    # Print the candidates so the injected competitor tags can be checked by eye.
    for c in candidates:
        tags = c.get("competitor_tags") or ""
        print(f"{c['sku']} | MOMO ${c['momo_price']} | PChome ${c.get('pchome_price','?')} "
              f"| tags={tags}")

    # Call Hermes and time the batch.
    start = time.time()
    raw = service._batch_analyze(candidates)
    duration = round(time.time() - start, 2)
    if not raw:
        print(" ⚠️ Hermes 回傳空結果Ollama 未啟動或逾時)")
        return _mock_threats()

    print(f"\n ✅ Hermes 推理完成 ({duration}s),回傳 {len(raw)}")
    for item in raw[:3]:
        # Coerce to str so a None / non-string action cannot break the slice.
        action = str(item.get("recommended_action") or "")
        print(f" [{item.get('sku', '?')}] {item.get('risk','?')} | "
              f"confidence={item.get('confidence','?')} | "
              f"action={action[:50]}")
        if "competitor_tags" in action:
            print(" 🎯 AI 洞察中包含競品標籤資訊")

    from services.hermes_analyst_service import PriceThreat
    threats = [
        PriceThreat(
            sku=t["sku"], name=t["name"],
            category=t.get("category", ""),
            momo_price=t.get("momo_price", 0),
            pchome_price=t.get("pchome_price", 0),
            gap_pct=t.get("gap_pct", 0),
            sales_7d_delta_pct=t.get("sales_7d_delta_pct", 0),
            risk=t.get("risk", "LOW"),
            recommended_action=t.get("recommended_action", ""),
            confidence=t.get("confidence", 0.5),
        )
        for t in raw
    ]
    # NOTE(review): the token count is a fixed placeholder, not a measured value.
    return threats, {"duration_sec": duration, "tokens": 512}
def _mock_threats():
    """
    Return the canned threat list used when Hermes is skipped or returns nothing.

    Returns:
        ``(threats, stats)``: three HIGH-risk ``PriceThreat`` objects and a
        stats dict with zero duration and zero tokens.
    """
    from services.hermes_analyst_service import PriceThreat

    # Each row is passed positionally to PriceThreat.
    # NOTE(review): presumably the order is sku, name, category, momo_price,
    # pchome_price, gap_pct, sales_7d_delta_pct, risk, recommended_action,
    # confidence — confirm against the PriceThreat definition.
    canned = (
        ("A003", "舒特膚AD乳液200ml", "美妝保養",
         1200, 980, 22.4, -35.0, "HIGH",
         "建議立即降價至 $1,000競品有 on_sale 促銷標籤)", 0.85),
        ("A001", "玻尿酸面膜10片裝", "美妝保養",
         320, 280, 14.3, -42.0, "HIGH",
         "跟進降價至 $285競品有 discount_10pct", 0.78),
        ("A009", "美白化妝水150ml", "美妝保養",
         420, 350, 20.0, -22.0, "HIGH",
         "價格差距大,建議促銷或捆包", 0.45),
    )
    threats = [PriceThreat(*fields) for fields in canned]
    stats = {"duration_sec": 0, "tokens": 0}
    return threats, stats
# ═══════════════════════════════════════════════════════════
# Gate 3: NemoTron + DB write + Telegram
# ═══════════════════════════════════════════════════════════
def gate3_dispatcher(engine, threats, hermes_stats):
    """
    Gate 3: dispatch the threats, then verify what was written to the DB.

    With SKIP_NIM set, the dispatcher's private executors are called directly
    for every HIGH threat (plus a recommendation for A009) using fake NIM
    stats; otherwise ``dispatcher.dispatch()`` is used.

    Args:
        engine: SQLAlchemy engine handed to the dispatcher and to the DB check.
        threats: threat objects produced by gate 2.
        hermes_stats: stats dict from gate 2, used for the footprint.

    Returns:
        The result dict with ``dispatched``, ``skipped`` and ``errors`` keys.
    """
    # NOTE(review): the separator literal is empty in this copy of the file;
    # a rule character may have been lost in extraction — confirm.
    print("\n" + ""*60)
    print("【關卡三】NemoTron 決策 + DB 寫入 + Telegram 排版")
    print(""*60)
    from services.nemoton_dispatcher_service import NemotronDispatcher

    class MockNotificationManager:
        """Stand-in notifier: prints each message instead of sending it."""

        def _send_telegram_messages(self, msgs):
            for body in msgs:
                print("\n 📩 [Telegram 預覽]")
                print(" " + body.replace("\n", "\n "))

    # The mock is used only when SKIP_TELEGRAM is set; otherwise None is passed.
    nm = MockNotificationManager() if SKIP_TELEGRAM else None

    if not SKIP_NIM:
        dispatcher = NemotronDispatcher(notification_manager=nm, engine=engine)
        result = dispatcher.dispatch(threats, hermes_stats=hermes_stats)
    else:
        print(" ⏭️ SKIP_NIM=1使用 Fallback 路徑直接派發 HIGH 威脅")
        # Drive the dispatch fallback path by hand.
        dispatcher = NemotronDispatcher(notification_manager=nm, engine=engine)
        from services.nemoton_dispatcher_service import (
            _build_footprint_block, _build_footprint_json, NIM_DAILY_LIMIT
        )
        fake_nim_stats = {"total_tokens": 185, "quota_used": 1}
        fp_text = _build_footprint_block(hermes_stats, fake_nim_stats)
        fp_data = _build_footprint_json(hermes_stats, fake_nim_stats)

        dispatched = 0
        for threat in threats:
            if threat.risk == "HIGH":
                dispatcher._exec_trigger_price_alert(
                    threat.sku, threat.name, threat.gap_pct, threat.sales_7d_delta_pct,
                    threat.recommended_action, threat.confidence,
                    momo_price=threat.momo_price, comp_price=threat.pchome_price,
                    footprint=fp_text,
                )
                dispatched += 1
            # Exercise add_to_recommendation for A009.
            # NOTE(review): the original nesting of this check (inside or
            # beside the HIGH branch) is not recoverable from this copy —
            # confirm.
            if threat.sku == "A009":
                dispatcher._exec_add_to_recommendation(
                    threat.sku, threat.name,
                    "我方價格低於市場近7天流量上升建議提升曝光",
                    threat.confidence,
                    footprint=fp_text,
                    footprint_data=fp_data,
                    threat=threat,
                )
        result = {
            "dispatched": dispatched,
            "skipped": 0,
            "errors": [],
            "nim_stats": fake_nim_stats,
        }

    print(f"\n → dispatched={result['dispatched']} | "
          f"skipped={result['skipped']} | errors={result['errors']}")
    # Check what landed in the DB.
    _verify_db(engine)
    return result
def _verify_db(engine):
    """
    Check that every recommendation row carries a real model_footprint.

    A footprint passes when it is a JSON object (given as a string or as an
    already-decoded dict) containing an ``analyst`` or ``dispatcher`` key;
    an empty ``{}`` fails. Results are printed; nothing is returned or raised.

    Args:
        engine: SQLAlchemy engine to read ``ai_price_recommendations`` from.
    """
    from sqlalchemy import text

    print("\n 🔍 DB 寫入驗證:")
    with engine.connect() as conn:
        recs = conn.execute(text(
            "SELECT sku, name, confidence, model_footprint, status FROM ai_price_recommendations"
        )).fetchall()
    if not recs:
        print(" ⚠️ ai_price_recommendations 無資料add_to_recommendation 未被觸發)")
        return

    all_ok = True
    for r in recs:
        row = r._mapping
        fp = row.get("model_footprint", "{}")
        # The column may come back as a JSON string or as a dict, depending
        # on the backend/driver, so decode only when it is a string.
        try:
            fp_dict = json.loads(fp) if isinstance(fp, str) else fp
        except ValueError:
            fp_dict = None
        fp_ok = isinstance(fp_dict, dict) and (
            "analyst" in fp_dict or "dispatcher" in fp_dict
        )
        # Reuse the decoded value: calling json.loads(fp) again would raise
        # TypeError when fp is already a dict.
        keys = list(fp_dict.keys()) if fp_ok else '[]'
        status = "" if fp_ok else "❌ model_footprint 為空 {}"
        print(f" {status} [{row['sku']}] "
              f"confidence={row['confidence']} "
              f"status={row['status']} "
              f"footprint_keys={keys}")
        if not fp_ok:
            all_ok = False

    if all_ok:
        print(" ✅ 所有推薦記錄的 model_footprint 均已正確寫入 DB")
    else:
        print(" ❌ 部分記錄 model_footprint 為空,請檢查 footprint_data 傳遞路徑")
# ═══════════════════════════════════════════════════════════
# 主程式
# ═══════════════════════════════════════════════════════════
def main():
    """
    Run the three gates in order against a freshly built and seeded test DB.

    Prints the active mode for each switch, runs gate 1 (its result is not
    used further), feeds gate 2's threats into gate 3, and finally prints the
    follow-up steps for running against PostgreSQL.
    """
    print("=" * 60)
    print("🚀 AI 競價情報模組 E2E 整合測試")
    print(f" 模式NIM={'跳過' if SKIP_NIM else '真實'} | "
          f"Hermes={'跳過' if SKIP_HERMES else '真實'} | "
          f"Telegram={'Mock' if SKIP_TELEGRAM else '真實'} | "
          f"Feeder={'跳過' if SKIP_FEEDER else '真實'}")
    print("=" * 60)

    # Build and seed the test DB.
    engine = _build_test_engine()
    _seed_test_data(engine)

    # Gate 1
    gate1_feeder()

    # Gate 2 returns either (threats, stats) or a bare list of threats.
    result2 = gate2_hermes(engine)
    if isinstance(result2, tuple):
        threats, hermes_stats = result2
    else:
        threats, hermes_stats = result2, {"duration_sec": 0, "tokens": 0}

    # Gate 3
    gate3_dispatcher(engine, threats, hermes_stats)

    # NOTE(review): the separator literal is empty in this copy of the file;
    # a rule character may have been lost in extraction — confirm.
    print("\n" + ""*60)
    print("✅ E2E 測試完成")
    print(""*60)
    print()
    print("下一步:")
    print(" 1. 啟動 Docker Desktop → docker compose -f docker-compose.yml up -d postgres")
    print(" 2. psql -U momo -h 127.0.0.1 -d momo_analytics -f migrations/003_ai_price_recommendations.sql")
    print(" 3. psql -U momo -h 127.0.0.1 -d momo_analytics -f migrations/004_competitor_prices.sql")
    # The flag is only enabled by the exact value "1"; the hint used to say
    # "true", which silently left the run on SQLite.
    print(" 4. USE_POSTGRESQL=1 python3 scripts/e2e_test_ai_pipeline.py")
# Run the gates only when executed as a script, not when imported.
if __name__ == "__main__":
    main()