diff --git a/TODO_NEXT_STEPS.txt b/TODO_NEXT_STEPS.txt index fd61cb7..6b327f1 100644 --- a/TODO_NEXT_STEPS.txt +++ b/TODO_NEXT_STEPS.txt @@ -4,6 +4,7 @@ ================================================================================ 【已完成】 + - V10.276 修正 ElephantAlpha 價格類 Hermes prefetch timeout:`price_drop` / `market_opportunity` trigger 直接把 SQL 命中的 MOMO / PChome 價差實證轉成 HITL action lines,完整 Hermes LLM prefetch 預設關閉;無 DB 實證仍只記 suppressed telemetry / cooldown,不寫 `human_review`、不發空 Telegram。 - V10.266 強化核心 MOMO/PChome 比價鏈路:新增 `marketplace_product_matcher.py` 身份比對、只讓 `identity_v2` 且分數 ≥ 0.76 的高信心配對進 Dashboard/AI/Excel/Daily/Growth/PPT,並建立 `competitor_intel_repository.py` 統一圖表與簡報資料出口;同品牌但不同型號、不同組數、套組/單品或多品項不一致會進待審,不進正式比價。 - V10.267 專業化 ElephantAlpha `resource_optimization` 告警:不再讓 LLM 生成「48 小時預期效益 / 已執行」敘事,改由程式量測 action queue、P1/P2、pending_review、逾時項目與 CPU load;單純 backlog 不發 Telegram,只有可行動資源壓力才寫 `ai_insights(resource_pressure)` 並發送量測型告警。 - V10.254 續補 `/growth_analysis` 快取命中效能:PostgreSQL source fingerprint 加 60 秒短 TTL,匯入 realtime_sales_monthly 後同步清除 growth shared cache 與短快取,避免快取命中仍頻繁掃大表 COUNT。 diff --git a/config.py b/config.py index d647840..f08fe7e 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.275" +SYSTEM_VERSION = "V10.276" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index ab90548..d0fcc94 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -2,7 +2,7 @@ > **最後更新**: 2026-05-19 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯,Gemini 僅備援 / 鎖定場景 -> **適用版本**: V10.274 +> **適用版本**: V10.276 --- @@ -106,6 +106,7 @@ SQL漏斗(~300筆) - CD rebuild 模式必須先 build image 成功,再短暫 stop/rm/recreate 三應用容器,避免 no-cache build 造成長時間 502。 - ElephantAlpha 使用 NVIDIA NIM hosted API;production 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5`,`ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援;403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。 - ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown,不寫 pending `human_review`,不發 Telegram 空告警。 +- ElephantAlpha 價格類 trigger 的 HITL / 決策 prefetch 必須先使用觸發 SQL 與 `competitor_prices` / `price_records` 的 DB 實證生成 SKU、MOMO / PChome 價差與建議 action lines;完整 Hermes LLM prefetch 預設關閉(`ELEPHANT_ALPHA_HERMES_LLM_PREFETCH_ENABLED=false`),避免 5s timeout 後落入無實證摘要或雲端備援。若無 DB 實證,只記錄 suppressed telemetry / cooldown,不發 Telegram 空告警。 - `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load;只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry,不派發 Hermes/NemoTron、不宣稱 48 小時效益。 - `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans,以及 NemoTron `direct_response/reply_simple` 舊聊天回覆計畫;將狀態改為 `auto_disabled` 或 `rejected` 並寫入 `metadata_json.hygiene_history`。不刪資料,也不碰 NemoTron human_review / pricing / tool action 類業務行動。 - `momo-scheduler` 每 6 小時固定執行 `run_action_plan_hygiene_task()`,讓過期 advisory action_plans 的關閉不再依賴 `resource_optimization` 告警觸發;排程失敗會經 EventRouter 發送 `action_plan_hygiene_failure`。 diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index c99b64d..69e4ed4 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -53,6 +53,7 @@ RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_P RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5")) RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24")) RESOURCE_HYGIENE_ENABLED = os.getenv("ELEPHANT_ALPHA_RESOURCE_HYGIENE_ENABLED", "true").lower() in {"1", "true", "yes", "on"} +HERMES_LLM_PREFETCH_ENABLED = os.getenv("ELEPHANT_ALPHA_HERMES_LLM_PREFETCH_ENABLED", "false").lower() in {"1", "true", "yes", "on"} # ---- Constants ---- _ALLOWED_ACTION_TYPES = frozenset({ @@ -373,7 +374,10 @@ class ElephantAlphaAutonomousEngine: text(""" SELECT p.i_code AS sku, p.name, p.category, cp.price AS competitor_price, pr.price AS momo_price, - ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct + ((pr.price - cp.price) / NULLIF(pr.price, 0) * 100) AS price_gap_pct, + cp.competitor_product_id, + cp.competitor_product_name, + cp.crawled_at FROM products p JOIN ( SELECT DISTINCT ON (product_id) product_id, price @@ -388,7 +392,14 @@ class ElephantAlphaAutonomousEngine: AND cp.crawled_at >= NOW() - INTERVAL '2 hours' LIMIT 10 """) - ).fetchall() + ).mappings().fetchall() + trigger.conditions = dict(trigger.conditions or {}) + trigger.conditions["_db_evidence_actions"] = self._format_competitor_evidence_actions( + rows, + top_n=5, + trigger_type=trigger.trigger_type, + ) + trigger.conditions["db_evidence_count"] = len(rows) return len(rows) >= 3 finally: session.close() @@ -398,7 +409,12 @@ class ElephantAlphaAutonomousEngine: try: rows = session.execute( text(""" - SELECT p.i_code AS sku + SELECT p.i_code AS sku, p.name, p.category, + cp.price AS competitor_price, pr.price AS momo_price, + ((pr.price - cp.price) / NULLIF(pr.price, 0) * 100) AS price_gap_pct, + cp.competitor_product_id, + cp.competitor_product_name, + cp.crawled_at FROM products p JOIN ( SELECT DISTINCT ON (product_id) product_id, price @@ -413,7 +429,14 @@ class ElephantAlphaAutonomousEngine: AND cp.crawled_at >= NOW() - INTERVAL '1 hour' LIMIT 5 """) - ).fetchall() + ).mappings().fetchall() + trigger.conditions = dict(trigger.conditions or {}) + trigger.conditions["_db_evidence_actions"] = self._format_competitor_evidence_actions( + rows, + top_n=5, + trigger_type=trigger.trigger_type, + ) + trigger.conditions["db_evidence_count"] = len(rows) return bool(rows) finally: session.close() @@ -574,7 +597,10 @@ class ElephantAlphaAutonomousEngine: # 預期:>70% 的 trigger 在 Hermes 階段就被攔截,月省 Gemini ~3-5M tokens if trigger.trigger_type in _PRICE_RELATED_TRIGGERS: try: - hermes_threats = await self._fetch_hermes_threats_summary(top_n=5) + hermes_threats = ( + self._get_trigger_db_concrete_actions(trigger) + or await self._fetch_hermes_threats_summary(top_n=5) + ) except Exception as e: self._log.warning("Hermes pre-flight check 失敗 (non-blocking): %s", e) hermes_threats = None @@ -1134,6 +1160,115 @@ class ElephantAlphaAutonomousEngine: self._log.error("Resource pressure Telegram failed (non-blocking): %s", e) # ---- Sub-services ---- + @classmethod + def _format_competitor_evidence_actions( + cls, + rows: Any, + *, + top_n: int = 5, + trigger_type: str = "", + ) -> List[str]: + actions: List[str] = [] + for row in list(rows or [])[:top_n]: + sku = str(cls._row_get(row, "sku") or "").strip() + name = str(cls._row_get(row, "name") or "")[:24] + momo = cls._to_float(cls._row_get(row, "momo_price")) or 0.0 + pchome = cls._to_float(cls._row_get(row, "competitor_price")) or 0.0 + if not sku or momo <= 0 or pchome <= 0: + continue + gap_pct = cls._to_float(cls._row_get(row, "price_gap_pct")) + if gap_pct is None: + gap_pct = (momo - pchome) / momo * 100 if momo else 0.0 + gap_amount = abs(momo - pchome) + if gap_amount <= 0: + continue + + competitor_id = str(cls._row_get(row, "competitor_product_id") or "").strip() + comparison = f"MOMO ${momo:,.0f} vs PChome ${pchome:,.0f} ({gap_pct:+.1f}%)" + if trigger_type == "market_opportunity" or pchome > momo: + action = "建議加強曝光或列入 AI 挑品,不需降價" + impact = f"MOMO 每件價格優勢 NT$ {gap_amount:,.0f}" + else: + action = "建議人工確認 PChome identity_v2 後評估跟價或促銷" + impact = f"每件價差 NT$ {gap_amount:,.0f}" + + parts = [f"[{sku}] {name}", comparison, impact, action] + if competitor_id: + parts.append(f"PChome {competitor_id}") + actions.append("|".join(parts)) + return actions + + @staticmethod + def _to_float(value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + def _fetch_recent_competitor_evidence_actions(self, top_n: int = 5) -> Optional[List[str]]: + """用最新 DB 價差產生 EA HITL 實證,不啟動完整 Hermes LLM。""" + session = get_session() + try: + rows = session.execute( + text(""" + WITH latest_momo AS ( + SELECT DISTINCT ON (p.i_code) + p.i_code AS sku, + p.name, + p.category, + pr.price AS momo_price, + pr.timestamp + FROM products p + JOIN price_records pr ON pr.product_id = p.id + WHERE p.status = 'ACTIVE' + AND pr.price IS NOT NULL + AND pr.price > 0 + ORDER BY p.i_code, pr.timestamp DESC, pr.id DESC + ), + latest_competitor AS ( + SELECT DISTINCT ON (cp.sku) + cp.sku, + cp.price AS competitor_price, + cp.competitor_product_id, + cp.competitor_product_name, + cp.crawled_at + FROM competitor_prices cp + WHERE cp.source = 'pchome' + AND (cp.expires_at IS NULL OR cp.expires_at > NOW()) + AND cp.price IS NOT NULL + AND cp.price > 0 + AND COALESCE(cp.match_score, 0) >= 0.76 + AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2' + AND cp.crawled_at >= NOW() - INTERVAL '2 hours' + ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST + ) + SELECT lm.sku, lm.name, lm.category, + lm.momo_price, + lc.competitor_price, + ((lm.momo_price - lc.competitor_price) / NULLIF(lm.momo_price, 0) * 100) AS price_gap_pct, + lc.competitor_product_id, + lc.competitor_product_name, + lc.crawled_at + FROM latest_momo lm + JOIN latest_competitor lc ON lc.sku = lm.sku + WHERE lc.competitor_price < lm.momo_price * 0.85 + OR lc.competitor_price > lm.momo_price * 1.05 + ORDER BY ABS((lm.momo_price - lc.competitor_price) / NULLIF(lm.momo_price, 0)) DESC NULLS LAST, + lc.crawled_at DESC NULLS LAST + LIMIT :limit + """), + {"limit": int(top_n or 5)}, + ).mappings().fetchall() + actions = self._format_competitor_evidence_actions(rows, top_n=top_n) + return actions or None + except Exception as exc: + self._log.warning("EA DB evidence prefetch failed (non-blocking): %s", exc) + return None + finally: + session.close() + async def _hermes_analyze(self) -> Any: from services.hermes_analyst_service import HermesAnalystService return await self._run_with_timeout( @@ -1153,6 +1288,20 @@ class ElephantAlphaAutonomousEngine: (Hermes 完整 run 可能 30-60s,HITL 訊息應快速送出) Critic High-2 fix: 若每筆都缺 loss/rec_price,視同無料、return None 觸發 fallback """ + db_actions = self._fetch_recent_competitor_evidence_actions(top_n=top_n) + if db_actions: + self._log.info( + "EA prefetch DB evidence produced %d concrete actions", + len(db_actions), + ) + return db_actions + + if not HERMES_LLM_PREFETCH_ENABLED: + self._log.info( + "EA Hermes LLM prefetch disabled; no recent DB evidence actions" + ) + return None + # 使用 5s 短超時:Hermes 熱駐留時實測 < 10s,但若需冷啟動會拖到 30s+ # HITL 訊息延遲不可大於 10s(影響統帥決策時效性),寧可 fallback 到原 plan 文字 try: @@ -1339,6 +1488,14 @@ class ElephantAlphaAutonomousEngine: cleaned = [str(action).strip() for action in actions if str(action).strip()] return cleaned[:5] or None + @staticmethod + def _get_trigger_db_concrete_actions(trigger: AutonomousTrigger) -> Optional[List[str]]: + actions = (trigger.conditions or {}).get("_db_evidence_actions") + if not isinstance(actions, list): + return None + cleaned = [str(action).strip() for action in actions if str(action).strip()] + return cleaned[:5] or None + @staticmethod def _should_suppress_no_concrete_escalation(trigger: AutonomousTrigger) -> bool: return ( @@ -1385,7 +1542,10 @@ class ElephantAlphaAutonomousEngine: async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None: self._log.warning("Escalating to human: %s", trigger.trigger_type) - concrete_actions = self._get_prefetched_concrete_actions(trigger) + concrete_actions = ( + self._get_prefetched_concrete_actions(trigger) + or self._get_trigger_db_concrete_actions(trigger) + ) if not concrete_actions and trigger.trigger_type in _PRICE_RELATED_TRIGGERS: try: concrete_actions = await self._fetch_hermes_threats_summary(top_n=5) diff --git a/tests/test_elephant_alpha_engine.py b/tests/test_elephant_alpha_engine.py index 542ff6e..ad53da3 100644 --- a/tests/test_elephant_alpha_engine.py +++ b/tests/test_elephant_alpha_engine.py @@ -116,6 +116,76 @@ def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkey assert "EA short-circuit telemetry failed" in caplog.text +def test_competitor_db_evidence_actions_are_concrete(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + actions = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions( + [ + { + "sku": "SKU-1", + "name": "測試商品", + "momo_price": 1200, + "competitor_price": 990, + "price_gap_pct": 17.5, + "competitor_product_id": "D123456", + } + ], + trigger_type="price_drop_alert", + ) + + assert actions == [ + "[SKU-1] 測試商品|MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210|建議人工確認 PChome identity_v2 後評估跟價或促銷|PChome D123456" + ] + + +def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch): + import services.elephant_alpha_autonomous_engine as engine_module + from services.elephant_alpha_autonomous_engine import ( + AutonomousTrigger, + ElephantAlphaAutonomousEngine, + ) + from services.elephant_alpha_orchestrator import StrategicDecision + + engine = ElephantAlphaAutonomousEngine() + contexts = [] + notified = [] + + async def _capture_context(context): + contexts.append(context) + return StrategicDecision( + priority="high", + agents_required=["elephant_alpha"], + reasoning="已有 DB 價差實證,允許進入決策流程。", + expected_outcome="生成可稽核行動", + confidence=0.95, + execution_plan=[], + resource_requirements={}, + ) + + async def _fetch_should_not_run(top_n=5): + raise AssertionError("DB evidence should avoid Hermes LLM prefetch") + + async def _capture_notify(decision, trigger): + notified.append(trigger.trigger_type) + + monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context) + monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _fetch_should_not_run) + monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify) + monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: None) + + trigger = AutonomousTrigger( + trigger_type="price_drop_alert", + conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]}, + threshold=0.8, + enabled=True, + ) + + asyncio.run(engine._execute_autonomous_decision(trigger)) + + assert contexts[0]["conditions"]["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"] + assert notified == ["price_drop_alert"] + + def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch): import services.elephant_alpha_autonomous_engine as engine_module from services.elephant_alpha_autonomous_engine import (