diff --git a/.claude/settings.json b/.claude/settings.json
index c59ece2..8eb928a 100644
--- a/.claude/settings.json
+++ b/.claude/settings.json
@@ -37,7 +37,19 @@
"Bash(python3 -c \"import py_compile; py_compile.compile\\('services/daily_sales_service.py', doraise=True\\); py_compile.compile\\('utils/df_helpers.py', doraise=True\\); print\\('ALL SYNTAX OK'\\)\")",
"Bash(ssh wooo@192.168.0.110 \"ssh ollama@192.168.0.188 'ls -la /home/ollama/momo-pro/scripts/tools/sanitize_momo_urls.py /home/ollama/momo-pro/utils/momo_url_utils.py 2>&1'\")",
"Bash(ssh wooo@192.168.0.110 \"ssh ollama@192.168.0.188 'cd /home/ollama/momo-pro && git pull 2>&1 | tail -15'\")",
- "Bash(ssh wooo@192.168.0.110 \"ssh ollama@192.168.0.188 'docker exec -e PYTHONPATH=/app -w /app momo-pro-system python3 /tmp/sanitize_momo_urls.py --commit 2>&1 | tail -8'\")"
+ "Bash(ssh wooo@192.168.0.110 \"ssh ollama@192.168.0.188 'docker exec -e PYTHONPATH=/app -w /app momo-pro-system python3 /tmp/sanitize_momo_urls.py --commit 2>&1 | tail -8'\")",
+ "Bash(python3 -m pytest tests/ --collect-only -q)",
+ "Bash(python3 -c \"import sys; print\\(sys.executable\\)\")",
+ "Bash(ls venv/bin/python3)",
+ "Bash(ls .venv/bin/python3)",
+ "Bash(awk '{print $2\"/pytest\"}')",
+ "Bash(/Users/ooo/Documents/momo_pro_system/venv/bin/pytest tests/ --collect-only -q)",
+ "Bash(git -C \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system\" ls-files k8s/01-secrets.yaml k8s/03-secrets.yaml k8s/08-google-drive-secret.yaml)",
+ "Bash(git -C \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system\" log --oneline --all -- k8s/01-secrets.yaml k8s/03-secrets.yaml k8s/08-google-drive-secret.yaml)",
+ "Bash(git -C \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system\" ls-files scripts/docker_health_monitor.sh scripts/cicd_auto_repair.sh)",
+ "Bash(git -C \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system\" show 1b4f3a7:k8s/01-secrets.yaml)",
+ "Bash(sed -n '515,522p' \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system/scheduler.py\")",
+ "Bash(sed -n '1108,1114p' \"/Users/ooo/Library/Mobile Documents/com~apple~CloudDocs/momo-pro-system/scheduler.py\")"
],
"defaultMode": "bypassPermissions",
"additionalDirectories": [
diff --git a/CONSTITUTION.md b/CONSTITUTION.md
index 3866b74..1be21c7 100644
--- a/CONSTITUTION.md
+++ b/CONSTITUTION.md
@@ -332,7 +332,7 @@
## 第十三章:AI 四 Agent 自主學習與自動化架構規範(2026-04-29 修訂)
### 第 40 條:四 Agent 分工架構(絕對禁止違反)
-- **Hermes(採集層)**: `192.168.0.111` Ollama,負責 embedding、去重、品質分數計算。成本 = $0
+- **Hermes(採集層)**: Primary `34.143.170.20` (GCP SSD) / Secondary `34.21.145.224` (GCP SSD) / Fallback `192.168.0.111` Ollama,負責 embedding、去重、品質分數計算。成本 = $0
- **NemoTron(處理層)**: NVIDIA NIM Llama 3.1 8B,負責 tool calling 邏輯路由與 DB 寫入。限額 80 次/天
- **OpenClaw / Gemini(應用層)**: 負責最終 PPT 生成、洞察報告對外輸出。成本最高,最後動用
- **ElephantAlpha(編排層)**: 負責跨 Agent orchestration、HITL、AutoHeal bridge 與受控執行計畫,不可繞過安全入口
@@ -353,7 +353,7 @@
- **理由**:混合查詢(`WHERE` 結構化 + `ORDER BY embedding <->` 語意)只有 pgvector 能一條 SQL 搞定(ADR-002)
### 第 43 條:Embedding 本地化(強制要求)
-- ✅ **正確**:使用 `bge-m3`(或 `nomic-embed-text`)掛載在 Hermes 主機 `192.168.0.111` Ollama
+- ✅ **正確**:使用 `bge-m3` 掛載在 Primary `34.143.170.20` (GCP SSD) 或 Fallback `192.168.0.111` Ollama
- ❌ **禁止**:呼叫外部 Embedding API(成本與隱私雙重問題)
- **維度**:1024 dim(`vector(1024)` 欄位)(ADR-003)
diff --git a/TODO_NEXT_STEPS.txt b/TODO_NEXT_STEPS.txt
index 115ee9c..133f502 100644
--- a/TODO_NEXT_STEPS.txt
+++ b/TODO_NEXT_STEPS.txt
@@ -1,4 +1,78 @@
+================================================================================
+ 分析報表 V2 視覺統一 (2026-05-05) [DONE]
+================================================================================
+
+【已完成】
+ - 2026-05-06 視覺複核:重做 `/daily_sales` 行事曆元件視覺,移除舊紫藍系,改為 V2 暖紙、暖墨、焦糖、點陣、等寬數字與高對比資料列。
+ - 2026-05-06 視覺複核:提高四個分析頁共用字級、表格、small/text-muted、badge、card header 對比,避免文字過淡或過小。
+ - `app.py` 與 `config.py` 版本更新至 V10.81。
+ - 新增 `web/static/css/analysis-workbench.css`,統一分析報表頁的工作台底色、卡片、表格、表單、徽章與分頁樣式。
+ - 新版 sidebar 的「分析報表」改為可展開第二層,直接露出業績分析、當日業績、成長分析、月份總表與外部 Metabase/Grist。
+ - `/sales_analysis` 與 `/growth_analysis` 已從舊上方 navbar 轉入新版 `ewoooc_base.html` shell。
+ - `/daily_sales` 與 `/monthly_summary_analysis` 新增一致的分析報表分頁捷徑。
+ - 新增 `templates/components/_analysis_report_tabs.html`,把四頁共用的分析報表分頁抽成單一元件,並補上 Metabase/Grist 外部入口。
+ - 報表表格與 DataTables 容器補行動版橫向捲動,避免手機版表格撐破版面。
+ - 修復報表前端例外:`/daily_sales` 補 Chart.js 載入,`/sales_analysis` 在無圖表 canvas 的狀態下不再硬初始化圖表。
+ - `/sales_analysis` 首次進頁預設導到最近 1 個月資料,避免只顯示引導卡而沒有圖表。
+ - 新增 `web/static/js/analysis-chart-theme.js`,把 Chart.js / ECharts 統一套用 EwoooC 暖紙、暖墨、焦糖、等寬數字、點陣背景與專業 tooltip/legend/axis 主題。
+ - 第二波圖表優化:ECharts 軸線、legend、tooltip、label、visualMap 深度套用 V2 主題;手機版自動降低圖上 label 密度。
+ - 第二波圖表優化:Chart.js 增加一致 hover/tooltip interaction、柱狀圓角、手機 tick auto-skip 與 resize delay。
+ - 第二波圖表優化:ECharts chart 容器接入 ResizeObserver / IntersectionObserver,進入視窗或容器變動時自動 resize 重繪。
+ - 報表主題 CSS/JS 改帶 `system_version` 快取參數,避免使用者瀏覽器沿用舊版視覺檔案。
+ - 補 `/favicon.ico` 導向站台 logo,避免分析頁瀏覽器巡檢出現無關 404 noise。
+ - 2026-05-06 複核:生產目錄曾回到舊版 `V10.76` 模板;已重新同步 bind-mounted routes/templates/static/config,並把 `system_version` 注入移到 sales/daily blueprint,避免依賴未 bind mount 的 `app.py`。
+ - `analysis-workbench.css` 補 chart canvas 點陣紙感背景,讓圖表更貼近 V2 視覺規範。
+ - `app.py` 與 `config.py` 版本更新至 V10.79,並把 `system_version` 注入全域模板變數。
+ - 線上 188 已熱修並只重建 `momo-app`,未碰 `momo-db`。
+
+【驗證】
+ - `sales_analysis.html`、`growth_analysis.html`、`daily_sales.html`、`monthly_summary_analysis.html`、`ewoooc_base.html` Jinja 編譯通過。
+ - `components/_analysis_report_tabs.html` Jinja 編譯通過。
+ - Python route 語法檢查通過。
+ - 線上 `/sales_analysis`、`/daily_sales`、`/growth_analysis`、`/monthly_summary_analysis` 皆回 HTTP 200。
+ - 線上 `/static/css/analysis-workbench.css` 回 HTTP 200,報表頁 HTML 已載入新版 shell 與報表分頁。
+ - Playwright 桌機/手機 viewport 巡檢:四頁皆無整頁橫向爆版;分頁在手機版可橫向滑動。
+ - Playwright 圖表像素巡檢通過:`/sales_analysis` 15 張 Chart.js、`/daily_sales` 6 張 Chart.js、`/growth_analysis` 4 張 Chart.js、`/monthly_summary_analysis` 13 個 ECharts canvas 皆有實際非空像素。
+ - 圖表主題巡檢通過:前三頁 `analysis-chart-theme.js` / Chart.js wrapper 生效,月份總表 ECharts wrapper 生效。
+ - 月份總表手機版已逐張 scroll into view 後複驗 13 張 ECharts,避免離屏 canvas 尚未 rasterize 時被誤判為空白。
+ - 第二波線上巡檢通過:四頁桌機/手機皆載入 `V10.79` 主題資源,console 無圖表相關錯誤,body 無橫向爆版。
+ - `analysis-chart-theme.js` Node 語法檢查通過。
+
+【後續可優化】
+ - 用瀏覽器截圖巡檢桌面 / 手機 viewport,微調 DataTables、Chart.js 區塊高度與行動版橫向捲動。
+
+
+================================================================================
+ 當日業績匯入驗證修復 (2026-05-05) [DONE]
+================================================================================
+
+【已完成】
+ - 2026-05-06 複核:線上 `momo-scheduler` 曾仍綁在舊 `scheduler.py` inode,導致 Telegram 繼續用全表筆數比對誤報;已重新同步 `scheduler.py` / `services/import_service.py` 並重建 compose service `scheduler`。
+ - 2026-05-06 複核:線上 scoped 驗證 `2026-05-01 ~ 2026-05-05` 通過,`daily_sales_snapshot=3271`、`realtime_sales_monthly=3271`。
+ - 修復 `scheduler.py` 匯入後驗證:只比對本次匯入日期範圍,不再用 `daily_sales_snapshot` 與 `realtime_sales_monthly` 全表筆數硬比。
+ - 修復 0 筆 / 日期未知誤報:缺少匯入日期範圍或本次匯入 0 筆時,直接回報可診斷錯誤,不跑全表驗證。
+ - 強化 `services/import_service.py`:有檔案但 0 個成功匯入時改回報失敗,避免 Telegram 顯示「成功但 0 筆」。
+ - 新增 Excel 表頭自動偵測:可處理前幾列為說明文字、真正表頭不在第一列的 MOMO 匯出檔。
+ - 日期清單 SQL 改為 expanding bind 參數化,移除匯入日期 `IN (...)` 字串拼接。
+ - 成功摘要新增 `imported_dates` 與 `data_lag_days`,自動匯入通知會顯示最新資料落後天數。
+ - 匯入失敗檔案會嘗試移至 Google Drive `匯入失敗` 資料夾,避免壞檔在待匯入資料夾中反覆觸發。
+ - daily snapshot 與 monthly sync 已改成同一個 DB transaction,任一步失敗會一起 rollback,避免半套資料。
+ - 修復 `/daily_sales` 500:`templates/daily_sales.html` 補上缺失的 Jinja `{% endblock %}`。
+ - 線上 188 已熱修並只重建 `momo-app` / `momo-scheduler`,未碰 `momo-db`。
+
+【驗證】
+ - 線上 `2026-04-01 ~ 2026-05-03` scoped 驗證通過:兩表皆 19,934 筆。
+ - 線上 `/daily_sales` 回復 HTTP 200。
+ - 線上表頭偵測探針通過:真正表頭位於第 2 列時可正確讀取 `日期 / 商品名稱 / 銷售金額`。
+ - 線上臨時 SQLite 原子匯入 smoke 通過:舊資料被覆蓋,兩表皆為新資料 2 筆。
+ - 本機與線上 `python3 -m py_compile scheduler.py services/import_service.py` 通過。
+
+【後續可優化】
+ - 若未來匯入量再放大,可再升級為 staging table + merge/upsert,降低大批量 delete/append 鎖表時間。
+ - 補正式 pytest 匯入測試環境,目前本機系統 Python 缺 `pytest`。
+
+
================================================================================
AI 自動化閉環治理同步 (2026-04-29) [DONE]
================================================================================
diff --git a/app.py b/app.py
index 7f811ce..9db298d 100644
--- a/app.py
+++ b/app.py
@@ -96,7 +96,7 @@ except Exception as e:
# 🚩 系統版本定義 (備份與顯示用)
# 🚩 2026-05-01 V10.76: Move monthly analysis report onto V2 shell
-SYSTEM_VERSION = "V10.77"
+SYSTEM_VERSION = "V10.83"  # NOTE(review): config.py defines its own SYSTEM_VERSION, and that one is what gets injected into templates; keep the two values in sync
# ==========================================
# 🔒 SQL Injection 防護函數
@@ -142,6 +142,12 @@ app = Flask(__name__,
template_folder=TEMPLATE_DIR,
static_folder=STATIC_DIR)
+
+@app.route('/favicon.ico')
+def favicon():
+ """Redirect /favicon.ico to the static logo so browser favicon probes do not create noisy 404s."""
+ return redirect(url_for('static', filename='images/logo_circle.svg'))  # NOTE(review): assumes images/logo_circle.svg exists in the static folder — confirm
+
# ==========================================
# 🔒 Flask 安全配置
# ==========================================
@@ -412,7 +418,7 @@ verify_metadata_tables()
# ==========================================
# 🔧 全域模板變數注入 (Context Processor)
# ==========================================
-from config import METABASE_URL, GRIST_URL
+from config import METABASE_URL, GRIST_URL, SYSTEM_VERSION as CONFIG_SYSTEM_VERSION
@app.context_processor
def inject_global_vars():
@@ -420,6 +426,7 @@ def inject_global_vars():
return {
'metabase_url': METABASE_URL,
'grist_url': GRIST_URL,
+ 'system_version': CONFIG_SYSTEM_VERSION,
'datetime_now': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'),
}
diff --git a/config.py b/config.py
index 1c0b1f7..8d309f8 100644
--- a/config.py
+++ b/config.py
@@ -307,7 +307,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
-SYSTEM_VERSION = "V10.76"
+SYSTEM_VERSION = "V10.83"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示
diff --git a/docker-compose.yml b/docker-compose.yml
index b530041..8779696 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -88,7 +88,7 @@ services:
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB:-momo_analytics}
# Ollama 主機:GCP 優先 / 111 自動備援(ADR-003)
- - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://34.21.145.224:11434}
+ - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://192.168.0.110:11435}  # NOTE(review): default is now LAN host 192.168.0.110:11435 while the comment above still says "GCP 優先"; presumably a gateway proxy to GCP — confirm (same default repeats in the other service env blocks)
- OLLAMA_HOST_FALLBACK=${OLLAMA_HOST_FALLBACK:-http://192.168.0.111:11434}
# EMBEDDING_HOST 若未設定,由 resolve_ollama_host() 自動決定(GCP 優先)
- EMBEDDING_HOST=${EMBEDDING_HOST:-}
@@ -215,7 +215,7 @@ services:
- USE_POSTGRESQL=true
- POSTGRES_PORT=5432
# Ollama 主機:GCP 優先 / 111 自動備援(ADR-003)
- - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://34.21.145.224:11434}
+ - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://192.168.0.110:11435}
- OLLAMA_HOST_FALLBACK=${OLLAMA_HOST_FALLBACK:-http://192.168.0.111:11434}
- EMBEDDING_HOST=${EMBEDDING_HOST:-}
env_file:
@@ -272,7 +272,7 @@ services:
- USE_POSTGRESQL=true
- POSTGRES_PORT=5432
# Ollama 主機:GCP 優先 / 111 自動備援(ADR-003)
- - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://34.21.145.224:11434}
+ - OLLAMA_HOST_PRIMARY=${OLLAMA_HOST_PRIMARY:-http://192.168.0.110:11435}
- OLLAMA_HOST_FALLBACK=${OLLAMA_HOST_FALLBACK:-http://192.168.0.111:11434}
- EMBEDDING_HOST=${EMBEDDING_HOST:-}
env_file:
diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md
index b863e8a..e015dd1 100644
--- a/docs/AI_INTELLIGENCE_MODULE_SOT.md
+++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md
@@ -1,7 +1,7 @@
# MOMO PRO — AI 競價情報模組 Single Source of Truth
-> **最後更新**: 2026-05-01 (台北時間)
-> **狀態**: 🟢 四 AI Agent 自動化閉環已落地 — EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge / Prometheus metrics / Smoke Dashboard / Smoke Trend Management / Telegram Summary / Grafana provisioning / Prometheus scrape / CD Gunicorn 掛載具測試覆蓋
+> **最後更新**: 2026-05-05 (台北時間)
+> **狀態**: 🟢 雙 GCP SSD 節點建置完成 (34.143.170.20 & 34.21.145.224),實現高可用高效能推理。
> **適用版本**: V10.22 Legacy 5888 入口清理版
---
@@ -11,8 +11,9 @@
```
SQL漏斗(~300筆)
↓
-[Hermes 3 8B] — 分析師 (本地 Ollama, 零成本)
- 模型: hermes3:latest @ 192.168.0.111:11434
+[Hermes 3 8B] — 分析師 (GCP / 本地)
+ 模型: hermes3:latest, qwen2.5-coder:7b, llama3.2, gemma3, deepseek-r1:14b 等全量模型
+ 主機: 34.143.170.20:11434 (Primary - SSD) / 34.21.145.224:11434 (Secondary - SSD) / 192.168.0.111:11434 (Fallback)
任務: 競價威脅分類 → TOP 20 HIGH/MED/LOW
↓
[NemoTron NIM] — 派發器 (雲端, 免費配額)
@@ -41,7 +42,7 @@ SQL漏斗(~300筆)
| 角色 | 模型 | 主機 | 成本 | 每日限額 |
|------|------|------|------|---------|
-| Hermes 分析師 | hermes3:latest / embedding model | 192.168.0.111:11434 或 188 Ollama | 零 | 無限 |
+| Hermes 分析師 | 全量模型 (hermes3, qwen2.5-coder, llama3.2, deepseek-r1 等) | 34.143.170.20 (Primary-SSD) / 34.21.145.224 (Secondary-SSD) / 192.168.0.111 (Fallback) | 零 | 無限 |
| NemoTron 派發器 | meta/llama-3.1-8b-instruct | NVIDIA NIM | 免費 80/天 | 80 |
| OpenClaw 策略師 | Gemini | 雲端 | 需審批 | — |
| ElephantAlpha 編排者 | ElephantAlpha | 依部署環境 | 受控 | HITL / 任務制 |
@@ -67,6 +68,9 @@ SQL漏斗(~300筆)
- `momo-db` / `momo-postgres` 不可被 AI 自動 restart / stop / recreate。
- raw `ai_insights` insert 必須接 `enqueue_insight_embedding()` 或可被 backfill。
- ElephantAlpha 只做編排與 bridge,不可繞過 ADR-011 / ADR-012 / ADR-013。
+- ElephantAlpha `resource_optimization` 只可由 operational safe-action queue 或真實系統負載觸發;`openclaw_recommendation`、`human_review` 等人工/商業待辦不得視為系統資源壓力,也不得產生「自主執行」噪音告警。
+- ElephantAlpha trigger cooldown 必須使用 `data/elephant_alpha_cache.db` 持久化;thread 或容器重啟後仍不得在冷卻期重複推送同類告警。
+- ElephantAlpha L3 HITL 告警必須有可驗證資料:價格類需 Hermes 具體 SKU/金額,資源類需 action queue 或 CPU load 超標,程式碼類需容器 log 例外;無實證低信心升級只能記錄 suppressed cooldown,不得發 Telegram 或寫入 `human_review` 噪音待辦。
- ElephantAlpha / NemoTron 不可直接執行商品價格調整;`execute_price_adjustment`、`adjust_price` 等動作必須攔截並寫入 `human_review`,等待人工核准。
可觀測性:
diff --git a/docs/adr/ADR-021-ea-hitl-prefetch-and-alert-impact.md b/docs/adr/ADR-021-ea-hitl-prefetch-and-alert-impact.md
index 1d2f902..d36ecc7 100644
--- a/docs/adr/ADR-021-ea-hitl-prefetch-and-alert-impact.md
+++ b/docs/adr/ADR-021-ea-hitl-prefetch-and-alert-impact.md
@@ -45,8 +45,8 @@
蓋掉原本的 plan 元流程文字。**強制配套限制**:
- `asyncio.wait_for(timeout=5)` 短超時:Hermes 熱駐留 < 10s,但冷啟動會拖到 30s+,HITL 訊息延遲不可大於 10s
-- Pre-fetch 失敗(timeout / 0 threats / 全部缺金額)→ fallback 回原 plan 文字,**不中斷 escalation 主流程**
-- 「全部行皆缺金額」也視同無料 fallback,避免「乾巴巴兩行 MOMO/PChome 比價」比 plan 文字更空泛
+- Pre-fetch 失敗(timeout / 0 threats / 全部缺金額)→ 視為無實證升級,寫入 suppressed cooldown,**不得 fallback 回 plan 文字、不得發 Telegram、不得寫入 `human_review` 噪音待辦**
+- 「全部行皆缺金額」也視同無料 suppressed,避免「乾巴巴兩行 MOMO/PChome 比價」比 plan 文字更空泛
### 規則 2 — NemoTron 告警必填金額影響量化
@@ -78,7 +78,7 @@
|------|--------|------|------|
| Critical-1 | CRITICAL | `user_label` 直接 HTML 拼接 → username 注入 `/` 破版 | `html.escape()` 雙重 escape |
| High-1 | HIGH | Pre-fetch Hermes 同步阻塞 escalation cooldown 視窗(30-60s) | `asyncio.wait_for(timeout=5)` |
-| High-2 | HIGH | Hermes 有 threats 但全部缺金額時 → 兩行乾巴巴比價反而更空泛 | `any_concrete` 判斷,全缺則 `return None` 觸發 plan fallback |
+| High-2 | HIGH | Hermes 有 threats 但全部缺金額時 → 兩行乾巴巴比價反而更空泛 | `any_concrete` 判斷,全缺則 `return None` 觸發 suppressed cooldown |
| Medium-2 | MEDIUM | 空 `event_id` callback 寫入 `'unknown'` 污染 audit | prefix 解析後即拒絕 |
| Medium-3 | MEDIUM | `gap_pct ≤ 0` 但 `prev > curr` 仍顯示「流失」誤導降價 | `revenue_loss_7d` 條件改為 `if gap_pct > 0` |
@@ -98,12 +98,12 @@
- EA 升級審核 Telegram 內容從元流程描述變為「具體 SKU + 價格 + 金額流失 + 建議調價」,HITL 真正可決策
- NemoTron 既有告警再升級,每筆都帶可批准/駁回的金額判斷依據
- `momo:eig:` 按鈕首次有對應 handler,HITL 流程閉環完整
-- pre-fetch 改用 5s 短超時 + fallback,最壞情況退回原 plan 文字,不破壞既有行為
+- pre-fetch 改用 5s 短超時;最壞情況靜默 suppressed cooldown,不再推送無實證 HITL 噪音
### 負面 / 風險
- 每次價格類 escalation 多花 ≤ 5s(Hermes 熱駐留實測 < 10s 但有 timeout),整體告警延遲略增
-- Hermes 在 5s 內若沒回應,告警內容降級回 plan 文字(仍維持原行為,無新增風險)
+- Hermes 在 5s 內若沒回應,價格類升級不發 Telegram;由 suppressed cooldown 防止重複 pre-fetch / LLM 成本
- `gap_pct ≤ 0` 案例的銷量下滑(非價格因素)將完全不顯示流失金額——若統帥需追蹤「非價格流失」需另開告警類型(待後續 ADR)
### 監控指標
@@ -116,7 +116,7 @@
- [x] `services/hermes_analyst_service.py` PriceThreat 新增絕對金額欄位
- [x] `services/nemoton_dispatcher_service.py` `_compute_business_impact` helper + 三條 dispatch 路徑注入
-- [x] `services/elephant_alpha_autonomous_engine.py` `_fetch_hermes_threats_summary` + 5s timeout + fallback
+- [x] `services/elephant_alpha_autonomous_engine.py` `_fetch_hermes_threats_summary` + 5s timeout + suppressed cooldown
- [x] `services/telegram_bot_service.py` `_handle_event_ignore_callback` + HTML escape + 空 id 拒絕
- [x] Critic 審查通過(Critical-1 / High-1 / High-2 / Medium-2 / Medium-3 全修)
- [x] Smoke test:`_compute_business_impact` 對 gap≤0 / gap=0 / 銷量回升 / bogus type 四案例驗證
diff --git a/docs/memory/credentials_passbook.md b/docs/memory/credentials_passbook.md
index 7bf268f..dbcffd8 100644
--- a/docs/memory/credentials_passbook.md
+++ b/docs/memory/credentials_passbook.md
@@ -6,7 +6,9 @@
| 主機別名 | 內網 IP | 角色 | 部署方式 |
|---------|---------|------|---------|
| **UAT / Gateway** | `192.168.0.110` | Nginx 反向代理、Registry、n8n、Superset | Docker |
-| **Production / AI** | `192.168.0.188` | EwoooC App、Clawdbot、Ollama、pgvector | Docker Compose |
+| **Production / AI** | `192.168.0.188` | EwoooC App、Clawdbot、Ollama (Fallback)、pgvector | Docker Compose |
+| **GCP / SSD 1** | `34.143.170.20` | Primary Ollama (SSD Optimized, All Models) | Standalone |
+| **GCP / SSD 2** | `34.21.145.224` | Secondary Ollama (SSD Optimized, Redundancy) | Standalone / Docker |
| **DevSecOps** | `192.168.0.112` | Kali Linux (掃描主機)、WG-Easy | Docker |
## 🔑 關鍵認證
@@ -27,8 +29,8 @@
- **5002**: Private Registry
- **5678**: n8n
- **8088**: Apache Superset
-- **11434**: Ollama API (188)
-- **3000**: Open WebUI (188)
+- **11434**: Ollama API (Primary: 34.143.170.20, Secondary: 34.21.145.224, Fallback: 188/111)
+- **3000/3010**: Open WebUI (188)
- **51820**: WireGuard VPN
---
diff --git a/routes/daily_sales_routes.py b/routes/daily_sales_routes.py
index c78d130..0aecb75 100644
--- a/routes/daily_sales_routes.py
+++ b/routes/daily_sales_routes.py
@@ -14,7 +14,7 @@ from auth import login_required
from sqlalchemy import inspect, text
import pandas as pd
-from config import BASE_DIR
+from config import BASE_DIR, SYSTEM_VERSION
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
from utils.df_helpers import find_col
@@ -40,6 +40,11 @@ daily_sales_bp = Blueprint('daily_sales', __name__)
_CACHE_EXPIRY_SECONDS = 300 # 5 分鐘緩存過期
+@daily_sales_bp.context_processor
+def inject_daily_sales_template_vars():  # Blueprint-level context processor: exposes system_version to templates rendered by daily_sales_bp
+ return {'system_version': SYSTEM_VERSION}  # SYSTEM_VERSION is the value imported from config
+
+
def clear_daily_sales_cache():
"""清除當日業績緩存(供匯入服務調用)"""
_clear_daily_sales_cache()
diff --git a/routes/sales_routes.py b/routes/sales_routes.py
index aace760..32a84ce 100644
--- a/routes/sales_routes.py
+++ b/routes/sales_routes.py
@@ -20,7 +20,7 @@ from sqlalchemy import inspect, text
import pandas as pd
import numpy as np
-from config import BASE_DIR, DATABASE_TYPE
+from config import BASE_DIR, DATABASE_TYPE, SYSTEM_VERSION
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
from services.daily_sales_service import prepare_marketing_summary
@@ -43,6 +43,11 @@ _TABLE_DATA_CACHE = {}
_TABLE_DATA_CACHE_TTL = 60
+@sales_bp.context_processor
+def inject_sales_template_vars():  # Blueprint-level context processor: exposes system_version to templates rendered by sales_bp
+ return {'system_version': SYSTEM_VERSION}  # SYSTEM_VERSION is the value imported from config
+
+
# ==========================================
# 輔助函數
# ==========================================
@@ -251,6 +256,10 @@ def sales_analysis():
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
+ # V-UX 2026-05-05: 報表頁預設直接載入最近 1 個月,避免首頁只有引導卡而沒有圖表。注意:此 redirect 與下方「按需載入」引導頁的判斷條件完全相同,引導頁分支因此不再可達(待清理)。
+ if not data_range_param and not start_date and not end_date:
+ return redirect(url_for('sales.sales_analysis', data_range='1', metric=request.args.get('metric', 'amount')))
+
# V-New: 按需載入 - 如果沒有任何篩選條件,顯示引導頁面
if not data_range_param and not start_date and not end_date:
sys_log.info("[Sales Analysis] 👋 首次進入頁面,等待用戶選擇篩選條件")
diff --git a/scripts/quick_review.sh b/scripts/quick_review.sh
index 76d5ee5..da3b470 100755
--- a/scripts/quick_review.sh
+++ b/scripts/quick_review.sh
@@ -20,6 +20,7 @@ OBSERVABILITY_UI_GUARD="$PROJECT_ROOT/scripts/check_observability_ui.py"
OBSERVABILITY_PAGE_SMOKE="$PROJECT_ROOT/scripts/check_observability_pages.py"
OBSERVABILITY_QA_SUITE="$PROJECT_ROOT/scripts/check_observability_suite.sh"
OBSERVABILITY_CSS_SYNC="$PROJECT_ROOT/scripts/sync_observability_css.py"
+REVIEW_REPORT_HINT=0
# 顯示標題
echo -e "${BLUE}========================================${NC}"
@@ -151,10 +152,12 @@ if [ $# -eq 0 ]; then
case $choice in
1)
echo -e "${GREEN}🚀 開始自動Review暫存檔案...${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --auto --type basic
;;
2)
echo -e "${GREEN}🚀 開始Review所有變更檔案...${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --type basic
;;
3)
@@ -162,6 +165,7 @@ if [ $# -eq 0 ]; then
read -r files_input
if [ -n "$files_input" ]; then
echo -e "${GREEN}🚀 開始Review指定檔案...${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --files $files_input --type basic
else
echo -e "${RED}❌ 未指定檔案${NC}"
@@ -170,10 +174,12 @@ if [ $# -eq 0 ]; then
;;
4)
echo -e "${GREEN}🛡️ 開始安全檢查...${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --auto --type security
;;
5)
echo -e "${GREEN}⚡ 開始效能檢查...${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --auto --type performance
;;
6)
@@ -197,14 +203,17 @@ else
# 有指定檔案,直接Review
echo -e "${GREEN}🚀 開始Review指定檔案...${NC}"
echo -e "${BLUE}檔案:$@${NC}"
+ REVIEW_REPORT_HINT=1
run_code_review --files "$@" --type basic
fi
# 檢查執行結果
if [ $? -eq 0 ]; then
- echo -e "${GREEN}✅ Code Review 完成!${NC}"
- echo -e "${BLUE}📄 Review報告位置:$PROJECT_ROOT/logs/${NC}"
+ echo -e "${GREEN}✅ Quick Review / QA 完成!${NC}"
+ if [ "$REVIEW_REPORT_HINT" -eq 1 ]; then
+ echo -e "${BLUE}📄 Review報告位置:$PROJECT_ROOT/logs/${NC}"
+ fi
else
- echo -e "${RED}❌ Code Review 失敗!${NC}"
+ echo -e "${RED}❌ Quick Review / QA 失敗!${NC}"
exit 1
fi
diff --git a/services/ai_automation_smoke_service.py b/services/ai_automation_smoke_service.py
index 44a51e9..5cb1bd8 100644
--- a/services/ai_automation_smoke_service.py
+++ b/services/ai_automation_smoke_service.py
@@ -17,6 +17,7 @@ from sqlalchemy import text
from config import SYSTEM_VERSION
from database.manager import get_session
+from services.ollama_service import resolve_ollama_host, get_host_label, OLLAMA_HOST_PRIMARY, OLLAMA_HOST_FALLBACK
STATUS_RANK = {"ok": 0, "warning": 1, "critical": 2}
@@ -357,8 +358,36 @@ def _elephant_hitl_check() -> Dict[str, Any]:
return _check("ElephantAlpha HITL", "critical", f"ElephantAlpha smoke 失敗:{exc}")
+def _ollama_status_check() -> Dict[str, Any]:  # Smoke check: reports which Ollama host is currently resolved and whether it is the configured primary
+ try:
+ current_host = resolve_ollama_host()  # NOTE(review): summary below says "運作中"; presumably resolve_ollama_host() only returns a reachable host — confirm
+ label = get_host_label(current_host)
+ is_primary = current_host == OLLAMA_HOST_PRIMARY  # exact string comparison against the configured primary URL
+
+ status = "ok" if is_primary else "warning"  # running on a non-primary host degrades the check to "warning", not "critical"
+ summary = f"Ollama 運作中 (主機: {label})"
+ if not is_primary:
+ summary += " [備援模式]"
+
+ return _check(
+ "Ollama 服務狀態",
+ status,
+ summary,
+ {  # detail payload: resolved host plus both configured endpoints for debugging
+ "current_host": current_host,
+ "host_label": label,
+ "is_primary": is_primary,
+ "primary_config": OLLAMA_HOST_PRIMARY,
+ "fallback_config": OLLAMA_HOST_FALLBACK
+ }
+ )
+ except Exception as exc:  # any failure while resolving or labelling the host is reported as a "critical" check
+ return _check("Ollama 服務狀態", "critical", f"Ollama 狀態檢查失敗:{exc}")
+
+
def collect_ai_automation_smoke(*, record_history: bool = True, history_limit: int = 20) -> Dict[str, Any]:
checks: List[Dict[str, Any]] = [
+ _ollama_status_check(),
_event_router_check(),
_autoheal_check(),
_nemotron_check(),
diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py
index f16c842..e116fba 100644
--- a/services/elephant_alpha_autonomous_engine.py
+++ b/services/elephant_alpha_autonomous_engine.py
@@ -8,8 +8,8 @@ AI 3.0 Autonomous Operations:
- Continuous improvement loop
ADR-012 Compliance:
- §③ 單一 audit trail — 所有基行完畢後必發 triaged_alert Telegram
- §⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram
+ §③ 單一 audit trail — 有實證的行動/升級需寫入 ai_insights 並發 triaged_alert Telegram
+ §⑤ 雙寫強制 — 有實證時 ai_insights (由 orchestrator._log_decision) + Telegram 雙寫;例外:無實證低信心升級只寫 suppressed cooldown,避免 human_review 噪音
ADR-013 Compliance:
resource_optimization trigger → auto_heal_service.handle_exception
"""
@@ -42,10 +42,18 @@ SSH_PORT = int(os.getenv("ELEPHANT_ALPHA_SSH_PORT", "22"))
SSH_CONNECT_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
SSH_COMMAND_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))
-CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
+_DEFAULT_CACHE_DB_PATH = os.path.join(
+ os.path.dirname(os.path.dirname(__file__)),
+ "data",
+ "elephant_alpha_cache.db",
+)
+CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", _DEFAULT_CACHE_DB_PATH)
ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
+NO_EVIDENCE_ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_NO_EVIDENCE_COOLDOWN_MIN", "360"))
CONFIDENCE_THRESHOLD = float(os.getenv("ELEPHANT_ALPHA_CONFIDENCE_THRESHOLD", "0.7"))
MAX_AUTONOMOUS_DECISIONS_PER_HOUR = int(os.getenv("ELEPHANT_ALPHA_MAX_AUTONOMOUS_DECISIONS_PER_HOUR", "10"))
+RESOURCE_QUEUE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_QUEUE_THRESHOLD", "10"))
+RESOURCE_LOAD_THRESHOLD = float(os.getenv("ELEPHANT_ALPHA_RESOURCE_LOAD_THRESHOLD", "80"))
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({
@@ -123,6 +131,15 @@ _PRICE_RELATED_TRIGGERS = frozenset({
"threat_escalation",
})
+_OPERATIONAL_PENDING_ACTION_TYPES = frozenset({
+ "agent_safe_action",
+ "auto_heal",
+ "resource_optimization",
+ "scheduler_retry",
+})
+
+_NO_EVIDENCE_DEDUP_PREFIX = "no_evidence:"
+
def _zh_trigger(trigger_type: str) -> str:
return _TRIGGER_ZH.get(trigger_type, trigger_type)
@@ -210,6 +227,8 @@ class ElephantAlphaAutonomousEngine:
# ---- DB ----
def _init_cache_db(self) -> None:
self._db_lock = threading.Lock()
+ if CACHE_DB_PATH != ":memory:":
+ os.makedirs(os.path.dirname(CACHE_DB_PATH), exist_ok=True)
self._conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)
self._conn.execute("""
CREATE TABLE IF NOT EXISTS escalation_dedup (
@@ -315,9 +334,7 @@ class ElephantAlphaAutonomousEngine:
for trigger in self.triggers:
if not trigger.enabled:
continue
- cooldown_min = self._get_cooldown(trigger.trigger_type)
- last = trigger.last_triggered
- if last and (datetime.now() - last).total_seconds() / 60 < cooldown_min:
+ if self._is_trigger_in_cooldown(trigger):
continue
if await self._evaluate_trigger(trigger):
await self._execute_autonomous_decision(trigger)
@@ -327,8 +344,32 @@ class ElephantAlphaAutonomousEngine:
return self._get_cooldown_min(trigger_type)
def _get_cooldown_min(self, trigger_type: str) -> int:
+ if trigger_type.startswith(_NO_EVIDENCE_DEDUP_PREFIX):  # keys prefixed "no_evidence:" are suppression records, not real escalations
+ return NO_EVIDENCE_ESCALATION_COOLDOWN_MIN  # separate window for suppressed escalations (env default 360 min vs 30 min)
return ESCALATION_COOLDOWN_MIN
+ def _is_trigger_in_cooldown(self, trigger: AutonomousTrigger) -> bool:
+ """Check both the in-memory and the persisted cooldown so noisy alerts are not re-sent after a thread/container restart."""
+ def _within_cooldown(trigger_key: str, last_ts: Optional[float]) -> bool:  # True while the newest known timestamp for trigger_key is inside its cooldown window
+ stored_ts = self._load_escalation(trigger_key)  # persisted value; presumably epoch seconds, falsy when absent — confirm against _store_escalation
+ if stored_ts:
+ last_ts = max(last_ts or 0, float(stored_ts))  # use whichever of in-memory / persisted is more recent
+
+ if not last_ts:
+ return False  # never triggered (or no record): not in cooldown
+
+ cooldown_sec = self._get_cooldown_min(trigger_key) * 60  # window length depends on the key prefix
+ return (datetime.now().timestamp() - last_ts) < cooldown_sec
+
+ last_ts: Optional[float] = None
+ if trigger.last_triggered:
+ last_ts = trigger.last_triggered.timestamp()  # in-memory timestamp of the last firing
+
+ if _within_cooldown(trigger.trigger_type, last_ts):  # regular escalation cooldown
+ return True
+
+ return _within_cooldown(f"{_NO_EVIDENCE_DEDUP_PREFIX}{trigger.trigger_type}", None)  # also honour the no-evidence suppression record for this trigger type
+
async def _evaluate_trigger(self, trigger: AutonomousTrigger) -> bool:
try:
if trigger.trigger_type == "price_drop_alert":
@@ -415,8 +456,10 @@ class ElephantAlphaAutonomousEngine:
session.close()
async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool:
- return (self._get_action_queue_size() > 10
- or self._get_system_load_percentage() > 80)
+ operational_queue_size = self._get_action_queue_size()
+ system_load_pct = self._get_system_load_percentage()  # now always sampled; the previous expression short-circuited when the queue check was already true
+ return (operational_queue_size > RESOURCE_QUEUE_THRESHOLD  # thresholds are env-configurable; defaults (10 / 80) match the old literals
+ or system_load_pct > RESOURCE_LOAD_THRESHOLD)
async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool:
containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"])
@@ -696,19 +739,19 @@ class ElephantAlphaAutonomousEngine:
將「步驟 1: [OpenClaw] 生成策略」這類元流程文字換成
「[SKU] 商品|MOMO $X / PChome $Y|流失 NT$ Z|建議 NT$ W」具體可決策行動。
- 失敗回 None,由呼叫端 fallback 至既有 execution_plan 文字。
- 本方法為 best-effort:任何例外都不阻斷 escalation 主流程。
+ 失敗回 None,由呼叫端判斷是否 suppressed;不得 fallback 至 LLM plan 文字。
+ 本方法為 best-effort:任何例外都不阻斷 escalation 判斷流程。
Critic High-1 fix: 加 5 秒短超時防止阻塞 escalation cooldown 視窗
(Hermes 完整 run 可能 30-60s,HITL 訊息應快速送出)
- Critic High-2 fix: 若每筆都缺 loss/rec_price,視同無料、return None 觸發 fallback
+ Critic High-2 fix: 若每筆都缺 loss/rec_price,視同無料、return None 觸發 suppressed
"""
# 使用 5s 短超時:Hermes 熱駐留時實測 < 10s,但若需冷啟動會拖到 30s+
- # HITL 訊息延遲不可大於 10s(影響統帥決策時效性),寧可 fallback 到原 plan 文字
+ # HITL 訊息延遲不可大於 10s;timeout 後由呼叫端 suppressed,避免無實證文字洗版
try:
result = await asyncio.wait_for(self._hermes_analyze(), timeout=5)
except asyncio.TimeoutError:
- self._log.warning("Pre-fetch Hermes 5s timeout; falling back to plan text")
+ self._log.warning("Pre-fetch Hermes 5s timeout; no concrete data for escalation")
return None
except Exception as e:
self._log.warning("Pre-fetch Hermes threats failed (non-blocking): %s", e)
@@ -716,7 +759,7 @@ class ElephantAlphaAutonomousEngine:
threats = getattr(result, "threats", None) or []
if not threats:
- self._log.info("Pre-fetch Hermes returned 0 threats; falling back to plan text")
+ self._log.info("Pre-fetch Hermes returned 0 threats; no concrete data for escalation")
return None
# 模組頂部 import 較乾淨,但這裡保留 lazy import 避免兩服務循環依賴
@@ -756,8 +799,8 @@ class ElephantAlphaAutonomousEngine:
if not any_concrete:
# Critic High-2: 全部都只有「MOMO $X vs PChome $Y」乾巴巴兩行,
- # 比原本「步驟 1:OpenClaw 生成策略」更空泛。返回 None 觸發 plan fallback
- self._log.info("Pre-fetch threats lacked impact figures on all rows; falling back")
+ # 比原本「步驟 1:OpenClaw 生成策略」更空泛。返回 None 觸發 suppressed
+ self._log.info("Pre-fetch threats lacked impact figures on all rows; no concrete data for escalation")
return None
self._log.info("Pre-fetch Hermes threats produced %d concrete actions", len(lines))
@@ -881,8 +924,106 @@ class ElephantAlphaAutonomousEngine:
except Exception as e:
self._log.error("Telegram audit failed (non-blocking): %s", e)
+ def _build_resource_escalation_actions(self) -> Optional[List[str]]:  # Build HITL evidence lines from freshly sampled queue/load metrics; None when neither exceeds its threshold
+ queue_size = self._safe_metric(self._get_action_queue_size, default=0)  # presumably _safe_metric returns `default` when the metric call fails — confirm
+ system_load_pct = self._safe_metric(self._get_system_load_percentage, default=0.0)  # re-sampled here, so values may differ from those seen when the trigger was evaluated
+
+ evidence = []
+ if queue_size > RESOURCE_QUEUE_THRESHOLD:
+ evidence.append(f"auto action queue {queue_size} 筆 > 門檻 {RESOURCE_QUEUE_THRESHOLD} 筆")
+ if system_load_pct > RESOURCE_LOAD_THRESHOLD:
+ evidence.append(f"CPU load {system_load_pct:.1f}% > 門檻 {RESOURCE_LOAD_THRESHOLD:.0f}%")
+
+ if not evidence:
+ return None  # no measurable pressure: the caller treats None as "no evidence"
+
+ return [  # first line carries the measured evidence; the remaining lines are fixed operator guidance
+ f"📊 實測資源訊號:{';'.join(evidence)}",
+ "🔎 優先檢查 action_plans 的 auto_pending/running 任務是否卡住",
+ "🔧 僅允許處置 app/scheduler 類服務;禁止操作 momo-db 容器生命週期",
+ ]
+
+ @staticmethod
+ def _build_code_exception_escalation_actions(trigger: AutonomousTrigger) -> Optional[List[str]]:  # Build HITL evidence lines from the exception text stored on the trigger; None when there is none
+ error_msg = (trigger.temp_error_msg or "").strip()
+ if not error_msg:
+ return None  # no captured log text: nothing concrete to show a human
+
+ first_line = next((line.strip() for line in error_msg.splitlines() if line.strip()), "容器日誌偵測到 Python 例外")  # first non-blank line; the default is purely defensive because error_msg is already non-empty after strip()
+ target = trigger.temp_target_file or "未能自動定位檔案"
+ return [
+ f"🧾 例外摘要:{first_line[:180]}",  # summary capped at 180 characters
+ f"📍 疑似檔案:{target}",
+ "🔎 建議先查最近 5 分鐘 app/scheduler logs,確認是否仍持續重現",
+ ]
+
+ def _suppress_no_evidence_escalation(  # Record a suppression marker for this trigger type and log why the escalation was dropped; sends nothing itself
+ self,
+ decision: StrategicDecision,
+ trigger: AutonomousTrigger,
+ reason: str,
+ ) -> None:
+ dedup_key = f"{_NO_EVIDENCE_DEDUP_PREFIX}{trigger.trigger_type}"  # keyed per trigger type, not per reason
+ self._store_escalation(dedup_key)  # presumably persists a timestamp that _load_escalation reads back during the cooldown check — confirm
+ self._log.warning(
+ "EA no-evidence escalation suppressed: trigger=%s confidence=%.2f reason=%s cooldown_min=%s",
+ trigger.trigger_type,
+ decision.confidence,
+ reason,
+ NO_EVIDENCE_ESCALATION_COOLDOWN_MIN,
+ )
+
async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None:
self._log.warning("Escalating to human: %s", trigger.trigger_type)
+ concrete_actions: Optional[List[str]] = None
+ ai_summary_text = (decision.reasoning or "")[:300]
+ ai_cause_text = (
+ f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
+ f"信心度:{decision.confidence:.2f} | "
+ f"參與模組:{', '.join(_AGENT_LABEL.get(a.lower(), a) for a in decision.agents_required)}"
+ )
+
+ if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
+ concrete_actions = (trigger.conditions or {}).get("_prefetched_hermes_threats")
+ if not concrete_actions:
+ try:
+ concrete_actions = await self._fetch_hermes_threats_summary(top_n=5)
+ except Exception as e:
+ self._log.warning("Pre-fetch threats raised (non-blocking): %s", e)
+ concrete_actions = None
+ if not concrete_actions:
+ self._suppress_no_evidence_escalation(decision, trigger, "price_trigger_without_hermes_threats")
+ return
+ elif trigger.trigger_type == "resource_optimization":
+ concrete_actions = self._build_resource_escalation_actions()
+ if concrete_actions:
+ ai_summary_text = "資源類升級含實測 queue/load 訊號,請依安全邊界判斷是否處置。"
+ ai_cause_text = (
+ f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
+ f"信心度:{decision.confidence:.2f} | "
+ "證據來源:action_plans queue / host CPU load"
+ )
+ else:
+ self._suppress_no_evidence_escalation(decision, trigger, "resource_trigger_without_operational_metrics")
+ return
+ elif trigger.trigger_type == "code_exception":
+ concrete_actions = self._build_code_exception_escalation_actions(trigger)
+ if concrete_actions:
+ ai_summary_text = "容器日誌偵測到具體例外,已轉人工審核避免自動修復風險擴大。"
+ ai_cause_text = (
+ f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
+ f"信心度:{decision.confidence:.2f} | "
+ "證據來源:最近容器 logs"
+ )
+ else:
+ self._suppress_no_evidence_escalation(decision, trigger, "code_exception_without_log_context")
+ return
+ else:
+ if not decision.execution_plan and not (decision.reasoning or "").strip():
+ self._suppress_no_evidence_escalation(decision, trigger, "empty_decision_payload")
+ return
+ concrete_actions = [_zh_step(s) for s in decision.execution_plan[:3]]
+
session = get_session()
try:
row = session.execute(
@@ -930,68 +1071,16 @@ class ElephantAlphaAutonomousEngine:
if not dedup_ts or (datetime.now().timestamp() - dedup_ts) / 60 >= cooldown_min:
self._store_escalation(trigger.trigger_type)
- # A' 軌:價格類觸發前 pre-fetch Hermes 具體威脅清單,
- # 取代「步驟 1:[OpenClaw] 生成策略」這類元流程文字。
- # — Claude Opus 4.7 (2026-05-02)
- concrete_actions: Optional[List[str]] = None
- if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
- try:
- concrete_actions = await self._fetch_hermes_threats_summary(top_n=5)
- except Exception as e:
- self._log.warning("Pre-fetch threats raised (non-blocking): %s", e)
- concrete_actions = None
-
- # ─── Operation Ollama-First v5.0 修補:消除空泛幻覺訊息 ───
- # 統帥反饋(2026-05-03):fallback 路徑帶 OpenClaw Gemini plan 文字 +
- # decision.reasoning 全是「312 SKU / 23% / 14 項任務」幻覺數字,無 DB 鉤住,
- # 嚴重誤導決策。修法:concrete=Hermes 實證 vs concrete=None 兩條路徑徹底分離。
- # - 有實證 → 完整訊息(含 SKU 流失金額)
- # - 無實證 → 極簡訊息「Hermes 即時數據不可用」+ 不再灌 LLM 幻覺
-
from services.telegram_templates import triaged_alert, _send_telegram_raw
- if concrete_actions:
- # 有實證數據路徑:保留完整訊息
- ai_actions_payload = concrete_actions
- ai_summary_text = (decision.reasoning or "")[:300]
- ai_cause_text = (
- f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
- f"信心度:{decision.confidence:.2f} | "
- f"參與模組:{', '.join(_AGENT_LABEL.get(a.lower(), a) for a in decision.agents_required)}"
- )
- else:
- # 無實證數據路徑:極簡訊息,明確標註無數據
- self._log.warning(
- "EA escalation 落入 no-concrete-data fallback (trigger=%s);"
- "送極簡訊息避免 LLM 幻覺數字誤導統帥",
- trigger.trigger_type
- )
- ai_actions_payload = [
- "⚠️ Hermes 即時威脅清單不可用(5s timeout 或無 SKU 命中)",
- "📋 建議:手動下 SQL 查詢過去 24h competitor_price_history 確認狀況",
- "🔧 或:SSH 188 跑 docker exec momo-pro-system python -c "
- "'from services.hermes_analyst_service import HermesAnalystService;"
- " print(HermesAnalystService().run().threats[:5])'",
- ]
- ai_summary_text = (
- f"⚠️ 本訊息為**無實證**告警:Hermes pre-fetch 失敗,"
- f"以下原始決策內容含 LLM 自由發揮數字(非 DB 數據),請審慎參考。"
- )
- ai_cause_text = (
- f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
- f"信心度:{decision.confidence:.2f} | "
- f"⚠️ 無 Hermes SKU 數據(不顯示 LLM 幻覺 plan 文字)"
- )
+ ai_actions_payload = concrete_actions
try:
msg, keyboard = triaged_alert(
base_event={
"event_type": "ea_escalation",
"title": f"🐘 EA 升級審核 · {_zh_trigger(trigger.trigger_type)}",
- "summary": (
- f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准"
- + ("" if concrete_actions else "(⚠️ 無實證數據)")
- ),
+ "summary": f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准",
"id": f"ea_review_{int(datetime.now().timestamp())}",
},
tier_label="🐘 Elephant Alpha · L3 HITL",
@@ -1059,10 +1148,19 @@ class ElephantAlphaAutonomousEngine:
def _get_action_queue_size(self) -> int:
session = get_session()
try:
+ operational_types = "', '".join(sorted(_OPERATIONAL_PENDING_ACTION_TYPES))
row = session.execute(
- text("SELECT COUNT(*) AS count FROM action_plans WHERE status = 'pending'")
+ text(f"""
+ SELECT COUNT(*) AS count
+ FROM action_plans
+ WHERE status IN ('auto_pending', 'running')
+ OR (
+ status = 'pending'
+ AND COALESCE(action_type, '') IN ('{operational_types}')
+ )
+ """)
).fetchone()
- return row.count if row else 0
+ return int(row[0] or 0) if row else 0
finally:
session.close()
diff --git a/services/elephant_alpha_orchestrator.py b/services/elephant_alpha_orchestrator.py
index 8b6d80f..a7b55e5 100644
--- a/services/elephant_alpha_orchestrator.py
+++ b/services/elephant_alpha_orchestrator.py
@@ -28,6 +28,14 @@ from services.elephant_service import elephant_service
logger = SystemLogger("ElephantAlphaOrchestrator").get_logger()
+_OPERATIONAL_PENDING_ACTION_TYPES = frozenset({
+ "agent_safe_action",
+ "auto_heal",
+ "resource_optimization",
+ "scheduler_retry",
+})
+
+
@dataclass
class AgentCapability:
"""AI Agent capability definition"""
@@ -273,7 +281,7 @@ CURRENT AGENT STATUS:
CURRENT SITUATION:
- Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- System Load: {self._get_system_load()}
-- Pending Actions: {self._get_pending_actions_count()}
+- Operational Pending Actions: {self._get_pending_actions_count()}
REQUIRED DECISION:
Based on the current business context and system state, determine the optimal strategy and agent coordination. Consider:
@@ -376,16 +384,21 @@ Provide your strategic decision in the specified JSON format.
return "normal" # Placeholder - could integrate with monitoring
def _get_pending_actions_count(self) -> int:
- """Get count of pending actions"""
+ """Get count of auto-safe operational actions, not human/business backlog."""
session = get_session()
try:
- row = session.execute(text("""
+ operational_types = "', '".join(sorted(_OPERATIONAL_PENDING_ACTION_TYPES))
+ row = session.execute(text(f"""
SELECT COUNT(*) as count
FROM action_plans
- WHERE status = 'pending'
+ WHERE status IN ('auto_pending', 'running')
+ OR (
+ status = 'pending'
+ AND COALESCE(action_type, '') IN ('{operational_types}')
+ )
""")).fetchone()
- return row.count if row else 0
+ return int(row[0] or 0) if row else 0
finally:
session.close()
diff --git a/services/import_service.py b/services/import_service.py
index e789725..5b4a954 100644
--- a/services/import_service.py
+++ b/services/import_service.py
@@ -10,7 +10,7 @@ import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any
-from sqlalchemy import create_engine
+from sqlalchemy import bindparam, create_engine
from sqlalchemy.orm import sessionmaker
import pandas as pd
import pytz
@@ -298,6 +298,95 @@ class ImportService:
finally:
session.close()
+ @staticmethod
+ def _has_any_column(cols, keywords):
+ """檢查欄位中是否包含任一關鍵字。"""
+ normalized_cols = [str(col).strip() for col in cols]
+ return any(kw in col for col in normalized_cols for kw in keywords)
+
+ def _validate_daily_sales_columns(self, df: pd.DataFrame) -> list:
+ """回傳 daily sales Excel 缺少的必要欄位分類。"""
+ required_groups = {
+ "商品名稱類": ["商品名稱", "品名", "Product", "Name"],
+ "業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
+ }
+ return [
+ label
+ for label, keywords in required_groups.items()
+ if not self._has_any_column(df.columns, keywords)
+ ]
+
+ @staticmethod
+ def _cleanup_excel_dataframe(df: pd.DataFrame) -> pd.DataFrame:
+ """清理 Excel 讀取後的全空列欄與欄位名稱。"""
+ df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
+ df.columns = [str(col).strip() for col in df.columns]
+ return df
+
+ def _read_daily_sales_excel(self, file_path: str) -> pd.DataFrame:
+ """
+ 讀取當日業績 Excel,若預設第一列不是表頭,會掃描前 20 列尋找真正表頭。
+ """
+ df = self._cleanup_excel_dataframe(pd.read_excel(file_path, engine='openpyxl', dtype=str))
+ if not df.empty and not self._validate_daily_sales_columns(df):
+ return df
+
+ excel = pd.ExcelFile(file_path, engine='openpyxl')
+ for sheet_name in excel.sheet_names:
+ preview = pd.read_excel(
+ file_path,
+ sheet_name=sheet_name,
+ header=None,
+ nrows=20,
+ engine='openpyxl',
+ dtype=str,
+ )
+ for header_row in range(len(preview.index)):
+ candidate_columns = preview.iloc[header_row].dropna().astype(str).str.strip().tolist()
+ if not candidate_columns:
+ continue
+ candidate_df = pd.DataFrame(columns=candidate_columns)
+ if self._validate_daily_sales_columns(candidate_df):
+ continue
+
+ detected_df = pd.read_excel(
+ file_path,
+ sheet_name=sheet_name,
+ header=header_row,
+ engine='openpyxl',
+ dtype=str,
+ )
+ detected_df = self._cleanup_excel_dataframe(detected_df)
+ logger.info(
+ f"Excel 表頭自動偵測成功: sheet={sheet_name}, header_row={header_row + 1}"
+ )
+ return detected_df
+
+ return df
+
+ @staticmethod
+ def _normalize_dates_for_sql(date_values) -> list:
+ """將日期值正規化成 YYYY-MM-DD 字串,供 SQL expanding bind 使用。"""
+ normalized_dates = []
+ for value in date_values:
+ if value is None or pd.isna(value):
+ continue
+ parsed = pd.to_datetime(value, errors='coerce')
+ if pd.notna(parsed):
+ normalized_dates.append(str(parsed.date()))
+ return sorted(set(normalized_dates))
+
+ @staticmethod
+ def _calculate_data_lag_days(date_max: str) -> Optional[int]:
+ """計算匯入資料最大日期距今天數。"""
+ if not date_max:
+ return None
+ parsed = pd.to_datetime(date_max, errors='coerce')
+ if pd.isna(parsed):
+ return None
+ today = datetime.now(TAIPEI_TZ).date()
+ return max((today - parsed.date()).days, 0)
+
def process_daily_sales_import(self, job_id: int, file_path: str) -> bool:
"""
處理當日業績匯入
@@ -314,7 +403,7 @@ class ImportService:
# 讀取 Excel 檔案
logger.info(f"開始讀取 Excel 檔案: {file_path}")
- df = pd.read_excel(file_path, engine='openpyxl', dtype=str)
+ df = self._read_daily_sales_excel(file_path)
if df.empty:
error_msg = "Excel 檔案為空"
@@ -326,15 +415,7 @@ class ImportService:
# 原因:若 Excel 欄位名靜默變更,匯入會成功但 Hermes SQL JOIN 會找不到數據 → 告警管線失真
# 規則:至少需偵測到「商品名稱」與「銷售金額」類欄位 (容忍多種別名)
# ─────────────────────────────────────────────
- def _has_any(cols, keywords):
- return any(kw in c for c in cols for kw in keywords)
-
- required_groups = {
- "商品名稱類": ["商品名稱", "品名", "Product", "Name"],
- "業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
- }
- missing = [label for label, kws in required_groups.items()
- if not _has_any(df.columns, kws)]
+ missing = self._validate_daily_sales_columns(df)
if missing:
error_msg = (
f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}。"
@@ -372,211 +453,178 @@ class ImportService:
self.update_job_progress(job_id, total_rows=total_rows, processed_rows=0)
# 取得此次匯入的日期範圍
- import_dates = df['snapshot_date'].unique()
+ import_dates = self._normalize_dates_for_sql(df['snapshot_date'].unique())
logger.info(f"本次匯入包含 {len(import_dates)} 個日期的資料")
- # 刪除資料庫中相同日期的舊資料(覆蓋邏輯)
- if len(import_dates) > 0:
- # 過濾掉 None 值
- valid_dates = [d for d in import_dates if d is not None]
-
- if valid_dates:
- # 將日期轉換為字串格式用於 SQL 查詢
- date_list = ', '.join([f"'{d}'" for d in valid_dates])
-
- with engine.connect() as conn:
- # 刪除相同日期的舊資料
- delete_query = text(f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_list})")
- result = conn.execute(delete_query)
- deleted_count = result.rowcount
- conn.commit()
-
- if deleted_count > 0:
- logger.info(f"已刪除 {deleted_count} 筆舊資料(覆蓋模式)")
-
- # 寫入資料庫(帶驗證和重試機制)
- max_retries = 2
- retry_count = 0
- write_success = False
-
- while retry_count <= max_retries and not write_success:
- try:
- if retry_count > 0:
- logger.warning(f"任務 {job_id} 第 {retry_count} 次重試寫入...")
- self.update_job_status(job_id, 'importing', 60, f'重試寫入中 ({retry_count}/{max_retries})...')
-
- df.to_sql(
- table_name,
- engine,
- if_exists='append',
- index=False,
- method='multi',
- chunksize=1000
- )
-
- # V-Fix: 匯入後驗證 - 確認資料已正確寫入資料庫
- self.update_job_status(job_id, 'importing', 85, '驗證資料寫入...')
-
- # 取得本次匯入的日期
- import_dates = df['snapshot_date'].dropna().unique()
- if len(import_dates) > 0:
- # 查詢資料庫中這些日期的資料筆數
- from sqlalchemy import text
- valid_dates = [str(d) for d in import_dates if d is not None]
- date_list = ', '.join([f"'{d}'" for d in valid_dates])
-
- with engine.connect() as conn:
- verify_query = text(f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_list})")
- result = conn.execute(verify_query)
- db_count = result.scalar()
-
- # 驗證:資料庫筆數應該 >= 本次匯入筆數(可能有其他日期的舊資料)
- expected_count = len(df[df['snapshot_date'].isin(valid_dates)])
-
- if db_count >= expected_count:
- logger.info(f"任務 {job_id} 驗證成功: 預期 {expected_count} 筆, 資料庫有 {db_count} 筆")
- write_success = True
- else:
- logger.warning(f"任務 {job_id} 驗證失敗: 預期 {expected_count} 筆, 資料庫只有 {db_count} 筆")
- retry_count += 1
- else:
- # 沒有有效日期,跳過驗證
- logger.warning(f"任務 {job_id} 無法驗證: 沒有有效的 snapshot_date")
- write_success = True
-
- except Exception as write_error:
- logger.error(f"任務 {job_id} 寫入失敗 (嘗試 {retry_count + 1}): {str(write_error)}")
- retry_count += 1
- if retry_count > max_retries:
- raise write_error
-
- if not write_success:
- error_msg = f"資料寫入驗證失敗,已重試 {max_retries} 次"
- self.update_job_status(job_id, 'failed', 85, '驗證失敗', error_msg)
+ if not import_dates:
+ error_msg = "匯入資料缺少有效日期,拒絕寫入以避免日期未知資料污染"
+ self.update_job_status(job_id, 'failed', 55, '日期驗證失敗', error_msg)
logger.error(f"任務 {job_id} {error_msg}")
return False
# === V-New 2026-01-15: 同步寫入 realtime_sales_monthly ===
# 目的:讓當日業績 raw data 同時呈現在「業績分析儀表板」
- # 2026-01-30 修復:加強欄位驗證、同步狀態追蹤、失敗告警
- self.update_job_status(job_id, 'importing', 90, '同步至業績分析儀表板...')
+ # 2026-05-05 修復:daily 與 monthly 改成同一個 transaction,避免半成功。
+ self.update_job_status(job_id, 'importing', 80, '準備同步至業績分析儀表板...')
sync_success = False
sync_error_msg = None
monthly_table = 'realtime_sales_monthly'
- try:
- # 準備資料:移除 snapshot_date 欄位(realtime_sales_monthly 不需要此欄位)
- df_monthly = df.drop(columns=['snapshot_date'], errors='ignore')
+ # 準備資料:移除 snapshot_date 欄位(realtime_sales_monthly 不需要此欄位)
+ df_monthly = df.drop(columns=['snapshot_date'], errors='ignore')
- # 2026-01-30 修正:強化欄位名稱轉換
- # 將特殊字符轉換為 PostgreSQL 安全格式
- column_mapping = {}
- for col in df_monthly.columns:
- new_col = col.replace('%', '_pct').replace('(', '_').replace(')', '_')
- column_mapping[col] = new_col
- df_monthly = df_monthly.rename(columns=column_mapping)
+ # 2026-01-30 修正:強化欄位名稱轉換
+ # 將特殊字符轉換為 PostgreSQL 安全格式
+ column_mapping = {}
+ for col in df_monthly.columns:
+ new_col = col.replace('%', '_pct').replace('(', '_').replace(')', '_')
+ column_mapping[col] = new_col
+ df_monthly = df_monthly.rename(columns=column_mapping)
- # 記錄轉換的欄位
- converted_cols = [f"'{k}' -> '{v}'" for k, v in column_mapping.items() if k != v]
- if converted_cols:
- logger.info(f"任務 {job_id} 欄位名稱轉換: {', '.join(converted_cols)}")
- logger.info(f"任務 {job_id} 欄位轉換完成,共 {len(df_monthly.columns)} 個欄位")
+ converted_cols = [f"'{k}' -> '{v}'" for k, v in column_mapping.items() if k != v]
+ if converted_cols:
+ logger.info(f"任務 {job_id} 欄位名稱轉換: {', '.join(converted_cols)}")
+ logger.info(f"任務 {job_id} 欄位轉換完成,共 {len(df_monthly.columns)} 個欄位")
- # 2026-01-30 新增:驗證 DataFrame 欄位和目標表欄位是否一致
- with engine.connect() as conn:
- col_query = text(f"""
+ # 2026-01-30 新增:驗證 DataFrame 欄位和目標表欄位是否一致
+ with engine.connect() as conn:
+ if engine.dialect.name == 'sqlite':
+ col_query = text(f'PRAGMA table_info("{monthly_table}")')
+ target_columns = {row[1] for row in conn.execute(col_query) if row[1] != 'id'}
+ else:
+ col_query = text("""
SELECT column_name FROM information_schema.columns
- WHERE table_name = '{monthly_table}' AND column_name != 'id'
+ WHERE table_name = :table_name AND column_name != 'id'
ORDER BY ordinal_position
""")
- result = conn.execute(col_query)
- target_columns = set([row[0] for row in result])
+ target_columns = {
+ row[0] for row in conn.execute(col_query, {'table_name': monthly_table})
+ }
- df_columns = set(df_monthly.columns)
- missing_in_table = df_columns - target_columns
- missing_in_df = target_columns - df_columns
+ df_columns = set(df_monthly.columns)
+ missing_in_table = df_columns - target_columns
+ missing_in_df = target_columns - df_columns
- if missing_in_table:
- logger.warning(f"任務 {job_id} 欄位警告: DataFrame 有但表中沒有: {missing_in_table}")
- # 移除表中沒有的欄位,避免 INSERT 失敗
- df_monthly = df_monthly.drop(columns=list(missing_in_table), errors='ignore')
- logger.info(f"任務 {job_id} 已移除多餘欄位,剩餘 {len(df_monthly.columns)} 個欄位")
+ if missing_in_table:
+ logger.warning(f"任務 {job_id} 欄位警告: DataFrame 有但表中沒有: {missing_in_table}")
+ # 移除表中沒有的欄位,避免 INSERT 失敗
+ df_monthly = df_monthly.drop(columns=list(missing_in_table), errors='ignore')
+ logger.info(f"任務 {job_id} 已移除多餘欄位,剩餘 {len(df_monthly.columns)} 個欄位")
- if missing_in_df:
- logger.warning(f"任務 {job_id} 欄位警告: 表中有但 DataFrame 沒有: {missing_in_df}")
+ if missing_in_df:
+ logger.warning(f"任務 {job_id} 欄位警告: 表中有但 DataFrame 沒有: {missing_in_df}")
- # 取得本次匯入的日期列表(使用原始「日期」欄位)
- unique_dates = []
- if '日期' in df.columns:
- unique_dates = df['日期'].dropna().unique().tolist()
- logger.info(f"任務 {job_id} 準備同步 {len(unique_dates)} 個日期的資料")
+ unique_dates = self._normalize_dates_for_sql(
+ df[date_col].dropna().unique() if date_col and date_col in df.columns else df['snapshot_date'].dropna().unique()
+ )
+ logger.info(f"任務 {job_id} 準備同步 {len(unique_dates)} 個日期的資料")
+ if not unique_dates:
+ error_msg = "realtime_sales_monthly 同步缺少有效日期,拒絕寫入"
+ self.update_job_status(job_id, 'failed', 85, '同步日期驗證失敗', error_msg)
+ logger.error(f"任務 {job_id} {error_msg}")
+ return False
- if len(unique_dates) > 0:
- # 刪除 realtime_sales_monthly 中相同日期的舊資料(去重)
- date_list_monthly = ', '.join([f"'{d}'" for d in unique_dates])
+ snapshot_date_expr = 'date(snapshot_date)' if engine.dialect.name == 'sqlite' else 'snapshot_date::date'
+ monthly_date_expr = 'date("日期")' if engine.dialect.name == 'sqlite' else '"日期"::date'
+ expected_daily_count = len(df[df['snapshot_date'].astype(str).isin(import_dates)])
+ expected_monthly_count = len(df_monthly)
- with engine.connect() as conn:
- delete_monthly_query = text(f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_list_monthly})')
- result = conn.execute(delete_monthly_query)
- deleted_monthly = result.rowcount
- conn.commit()
+ max_retries = 2
+ retry_count = 0
+ while retry_count <= max_retries and not sync_success:
+ try:
+ if retry_count > 0:
+ logger.warning(f"任務 {job_id} 第 {retry_count} 次重試原子匯入...")
+ self.update_job_status(job_id, 'importing', 82, f'重試原子匯入中 ({retry_count}/{max_retries})...')
- if deleted_monthly > 0:
- logger.info(f"任務 {job_id} 已從 {monthly_table} 刪除 {deleted_monthly} 筆同日期舊資料")
+ self.update_job_status(job_id, 'importing', 85, '原子寫入兩張業績表...')
+ with engine.begin() as conn:
+ delete_snapshot_query = text(
+ f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN :dates"
+ ).bindparams(bindparam('dates', expanding=True))
+ deleted_snapshot = conn.execute(delete_snapshot_query, {'dates': import_dates}).rowcount
+ if deleted_snapshot > 0:
+ logger.info(f"已刪除 {deleted_snapshot} 筆 daily_sales_snapshot 舊資料(覆蓋模式)")
- # 寫入 realtime_sales_monthly
- df_monthly.to_sql(
- monthly_table,
- engine,
- if_exists='append',
- index=False,
- method='multi',
- chunksize=1000
- )
+ df.to_sql(
+ table_name,
+ conn,
+ if_exists='append',
+ index=False,
+ method='multi',
+ chunksize=1000
+ )
- logger.info(f"任務 {job_id} 已同步 {len(df_monthly)} 筆資料至 {monthly_table}")
+ verify_snapshot_query = text(
+ f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN :dates"
+ ).bindparams(bindparam('dates', expanding=True))
+ daily_count = conn.execute(verify_snapshot_query, {'dates': import_dates}).scalar()
+ if daily_count < expected_daily_count:
+ raise RuntimeError(
+ f"daily_sales_snapshot 寫入驗證失敗: 預期 {expected_daily_count} 筆, 實際 {daily_count} 筆"
+ )
- # 驗證同步結果
- if len(unique_dates) > 0:
- with engine.connect() as conn:
- date_list_verify = ', '.join([f"'{d}'" for d in unique_dates])
- verify_query = text(f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_list_verify})')
- verify_count = conn.execute(verify_query).scalar()
+ delete_monthly_query = text(
+ f'DELETE FROM {monthly_table} WHERE {monthly_date_expr} IN :dates'
+ ).bindparams(bindparam('dates', expanding=True))
+ deleted_monthly = conn.execute(delete_monthly_query, {'dates': unique_dates}).rowcount
+ if deleted_monthly > 0:
+ logger.info(f"任務 {job_id} 已從 {monthly_table} 刪除 {deleted_monthly} 筆同日期舊資料")
- if verify_count >= len(df_monthly):
- logger.info(f"任務 {job_id} 同步驗證成功: {monthly_table} 現有 {verify_count} 筆資料")
- sync_success = True
- else:
- sync_error_msg = f"同步驗證失敗: 預期 {len(df_monthly)} 筆, 實際 {verify_count} 筆"
- logger.error(f"任務 {job_id} {sync_error_msg}")
- else:
- sync_success = True # 沒有日期資料時視為成功
+ df_monthly.to_sql(
+ monthly_table,
+ conn,
+ if_exists='append',
+ index=False,
+ method='multi',
+ chunksize=1000
+ )
- except Exception as sync_error:
- # 同步失敗,記錄完整錯誤
- import traceback
- sync_error_msg = str(sync_error)
- logger.error(f"任務 {job_id} 同步至 {monthly_table} 失敗: {sync_error_msg}")
- logger.error(f"任務 {job_id} 同步錯誤堆疊:\n{traceback.format_exc()}")
+ verify_monthly_query = text(
+ f'SELECT COUNT(*) FROM {monthly_table} WHERE {monthly_date_expr} IN :dates'
+ ).bindparams(bindparam('dates', expanding=True))
+ monthly_count = conn.execute(verify_monthly_query, {'dates': unique_dates}).scalar()
+ if monthly_count < expected_monthly_count:
+ raise RuntimeError(
+ f"{monthly_table} 寫入驗證失敗: 預期 {expected_monthly_count} 筆, 實際 {monthly_count} 筆"
+ )
- # 2026-01-30 新增:發送同步失敗告警
+ sync_success = True
+ logger.info(
+ f"任務 {job_id} 原子匯入成功: daily={expected_daily_count} 筆, "
+ f"monthly={expected_monthly_count} 筆"
+ )
+ except Exception as transaction_error:
+ retry_count += 1
+ sync_error_msg = str(transaction_error)
+ logger.error(
+ f"任務 {job_id} 原子匯入失敗 (嘗試 {retry_count}/{max_retries + 1}): {sync_error_msg}",
+ exc_info=True,
+ )
+ if retry_count > max_retries:
+ break
+
+ if not sync_success:
+ error_msg = f"原子匯入失敗,兩張表已回滾: {sync_error_msg}"
+ self.update_job_status(job_id, 'failed', 90, '原子匯入失敗', error_msg)
+ logger.error(f"任務 {job_id} {error_msg}")
try:
from services.notification_manager import NotificationManager
notifier = NotificationManager()
alert_msg = (
- f"⚠️ 業績資料同步失敗告警\n"
+ f"⚠️ 業績資料原子匯入失敗告警\n"
f"{'='*30}\n"
f"任務 ID: {job_id}\n"
- f"目標表: {monthly_table}\n"
f"錯誤: {sync_error_msg[:200]}\n"
f"{'='*30}\n"
- f"daily_sales_snapshot 已匯入成功,但業績分析儀表板需要手動同步"
+ f"本次 daily_sales_snapshot / realtime_sales_monthly 已一起 rollback,請檢查匯入檔案"
)
notifier._send_telegram_messages([alert_msg])
- logger.info(f"任務 {job_id} 已發送同步失敗告警")
+ logger.info(f"任務 {job_id} 已發送原子匯入失敗告警")
except Exception as notify_error:
logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}")
+ return False
# 更新成功資訊
@@ -602,12 +650,15 @@ class ImportService:
# 計算日期範圍
date_min = None
date_max = None
- valid_dates = df['snapshot_date'].dropna().unique()
+ imported_dates = self._normalize_dates_for_sql(df['snapshot_date'].dropna().unique())
+ data_lag_days = None
+ valid_dates = imported_dates
if len(valid_dates) > 0:
- sorted_dates = sorted([d for d in valid_dates if d is not None])
+ sorted_dates = sorted(valid_dates)
if sorted_dates:
- date_min = str(sorted_dates[0])
- date_max = str(sorted_dates[-1])
+ date_min = sorted_dates[0]
+ date_max = sorted_dates[-1]
+ data_lag_days = self._calculate_data_lag_days(date_max)
logger.info(f"任務 {job_id} 日期範圍: {date_min} ~ {date_max}")
# 更新匯入摘要 (2026-01-30 修正:加入同步狀態)
@@ -625,6 +676,8 @@ class ImportService:
'verified': True, # daily_sales_snapshot 驗證
'date_min': date_min,
'date_max': date_max,
+ 'imported_dates': imported_dates,
+ 'data_lag_days': data_lag_days,
'message': sync_message
}
@@ -714,6 +767,8 @@ class ImportService:
imported_count = 0
total_rows = 0
all_dates = [] # 收集所有匯入的日期
+ failed_files = []
+ data_lag_days = None
for file in files:
file_id = file['id']
@@ -725,6 +780,8 @@ class ImportService:
# 建立匯入任務
job_id = self.create_import_job('daily_sales', file_id, file_name, file_size)
if not job_id:
+ failed_files.append(file_name)
+ logger.error(f"建立匯入任務失敗,跳過檔案: {file_name}")
continue
# 下載檔案
@@ -736,6 +793,8 @@ class ImportService:
if not drive_service.download_file(file_id, local_path):
self.update_job_status(job_id, 'failed', 10, '下載失敗', '無法從 Google Drive 下載檔案')
+ failed_files.append(file_name)
+ logger.error(f"Google Drive 檔案下載失敗: {file_name}")
continue
# 更新本地路徑
@@ -777,6 +836,19 @@ class ImportService:
all_dates.append(summary['date_min'])
if summary.get('date_max'):
all_dates.append(summary['date_max'])
+ all_dates.extend(summary.get('imported_dates') or [])
+ if summary.get('data_lag_days') is not None:
+ lag_value = summary.get('data_lag_days')
+ data_lag_days = lag_value if data_lag_days is None else max(data_lag_days, lag_value)
+ elif job:
+ # V-Fix: 防止摘要缺失時通知顯示 0 筆、日期未知。
+ # summary 是驗證與通知的主要來源;若缺失,至少回退到進度欄位並留下告警。
+ fallback_rows = job.success_rows or job.total_rows or 0
+ total_rows += fallback_rows
+ logger.warning(
+ f"任務 {job_id} 匯入成功但缺少 import_summary,"
+ f"已使用 job 進度欄位回補筆數: {fallback_rows}"
+ )
finally:
session.close()
@@ -786,6 +858,14 @@ class ImportService:
logger.info(f"已清理本地檔案: {local_path}")
except Exception as e:
logger.warning(f"清理本地檔案失敗: {str(e)}")
+ else:
+ failed_files.append(file_name)
+ logger.error(f"檔案匯入失敗,準備移至失敗資料夾: {file_name}")
+ failed_folder = self.get_config('gdrive_failed_folder', '匯入失敗')
+ if drive_service.move_file(file_id, failed_folder):
+ logger.info(f"已移動失敗檔案到「{failed_folder}」: {file_name}")
+ else:
+ logger.warning(f"無法移動失敗檔案,保留於原資料夾待人工檢查: {file_name}")
# 計算日期範圍
date_range = None
@@ -797,13 +877,32 @@ class ImportService:
'max': sorted_dates[-1]
}
+ if failed_files or imported_count == 0:
+ failed_count = len(files) - imported_count
+ failed_label = '、'.join(failed_files[:5]) if failed_files else '無成功匯入檔案'
+ return {
+ 'success': False,
+ 'message': (
+ f'找到 {len(files)} 個檔案,但成功匯入 {imported_count} 個、'
+ f'失敗 {failed_count} 個:{failed_label}'
+ ),
+ 'file_count': len(files),
+ 'imported_count': imported_count,
+ 'failed_count': failed_count,
+ 'failed_files': failed_files,
+ 'total_rows': total_rows,
+ 'date_range': date_range,
+ 'data_lag_days': data_lag_days
+ }
+
return {
'success': True,
'message': f'成功匯入 {imported_count} 個檔案',
'file_count': len(files),
'imported_count': imported_count,
'total_rows': total_rows,
- 'date_range': date_range
+ 'date_range': date_range,
+ 'data_lag_days': data_lag_days
}
except Exception as e:
diff --git a/templates/components/_ewoooc_shell.html b/templates/components/_ewoooc_shell.html
index f62d016..3d90399 100644
--- a/templates/components/_ewoooc_shell.html
+++ b/templates/components/_ewoooc_shell.html
@@ -194,11 +194,37 @@
活動看板
02
-
-
- 分析報表
- 03
-
+
+
+ 分析報表
+ 03
+
+