[V10.276] 修正 EA 價格觸發 DB 實證 prefetch | elephant_alpha_autonomous_engine.py
All checks were successful
CD Pipeline / deploy (push) Successful in 1m5s

This commit is contained in:
OoO
2026-05-19 21:41:21 +08:00
parent 6461d2453f
commit e3b4ed9d1e
5 changed files with 240 additions and 8 deletions

View File

@@ -4,6 +4,7 @@
================================================================================
【已完成】
- V10.276 修正 ElephantAlpha 價格類 Hermes prefetch timeout`price_drop` / `market_opportunity` trigger 直接把 SQL 命中的 MOMO / PChome 價差實證轉成 HITL action lines完整 Hermes LLM prefetch 預設關閉;無 DB 實證仍只記 suppressed telemetry / cooldown不寫 `human_review`、不發空 Telegram。
- V10.266 強化核心 MOMO/PChome 比價鏈路:新增 `marketplace_product_matcher.py` 身份比對、只讓 `identity_v2` 且分數 ≥ 0.76 的高信心配對進 Dashboard/AI/Excel/Daily/Growth/PPT並建立 `competitor_intel_repository.py` 統一圖表與簡報資料出口;同品牌但不同型號、不同組數、套組/單品或多品項不一致會進待審,不進正式比價。
- V10.267 專業化 ElephantAlpha `resource_optimization` 告警:不再讓 LLM 生成「48 小時預期效益 / 已執行」敘事,改由程式量測 action queue、P1/P2、pending_review、逾時項目與 CPU load單純 backlog 不發 Telegram只有可行動資源壓力才寫 `ai_insights(resource_pressure)` 並發送量測型告警。
- V10.254 續補 `/growth_analysis` 快取命中效能PostgreSQL source fingerprint 加 60 秒短 TTL匯入 realtime_sales_monthly 後同步清除 growth shared cache 與短快取,避免快取命中仍頻繁掃大表 COUNT。

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.275"
SYSTEM_VERSION = "V10.276"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-05-19 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯Gemini 僅備援 / 鎖定場景
> **適用版本**: V10.274
> **適用版本**: V10.276
---
@@ -106,6 +106,7 @@ SQL漏斗(~300筆)
- CD rebuild 模式必須先 build image 成功,再短暫 stop/rm/recreate 三應用容器,避免 no-cache build 造成長時間 502。
- ElephantAlpha 使用 NVIDIA NIM hosted APIproduction 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5``ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。
- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown不寫 pending `human_review`,不發 Telegram 空告警。
- ElephantAlpha 價格類 trigger 的 HITL / 決策 prefetch 必須先使用觸發 SQL 與 `competitor_prices` / `price_records` 的 DB 實證生成 SKU、MOMO / PChome 價差與建議 action lines完整 Hermes LLM prefetch 預設關閉(`ELEPHANT_ALPHA_HERMES_LLM_PREFETCH_ENABLED=false`),避免 5s timeout 後落入無實證摘要或雲端備援。若無 DB 實證,只記錄 suppressed telemetry / cooldown不發 Telegram 空告警。
- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry不派發 Hermes/NemoTron、不宣稱 48 小時效益。
- `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans以及 NemoTron `direct_response/reply_simple` 舊聊天回覆計畫;將狀態改為 `auto_disabled``rejected` 並寫入 `metadata_json.hygiene_history`。不刪資料,也不碰 NemoTron human_review / pricing / tool action 類業務行動。
- `momo-scheduler` 每 6 小時固定執行 `run_action_plan_hygiene_task()`,讓過期 advisory action_plans 的關閉不再依賴 `resource_optimization` 告警觸發;排程失敗會經 EventRouter 發送 `action_plan_hygiene_failure`

View File

@@ -53,6 +53,7 @@ RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_P
RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5"))
RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24"))
RESOURCE_HYGIENE_ENABLED = os.getenv("ELEPHANT_ALPHA_RESOURCE_HYGIENE_ENABLED", "true").lower() in {"1", "true", "yes", "on"}
HERMES_LLM_PREFETCH_ENABLED = os.getenv("ELEPHANT_ALPHA_HERMES_LLM_PREFETCH_ENABLED", "false").lower() in {"1", "true", "yes", "on"}
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({
@@ -373,7 +374,10 @@ class ElephantAlphaAutonomousEngine:
text("""
SELECT p.i_code AS sku, p.name, p.category,
cp.price AS competitor_price, pr.price AS momo_price,
((pr.price - cp.price) / pr.price * 100) AS price_gap_pct
((pr.price - cp.price) / NULLIF(pr.price, 0) * 100) AS price_gap_pct,
cp.competitor_product_id,
cp.competitor_product_name,
cp.crawled_at
FROM products p
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
@@ -388,7 +392,14 @@ class ElephantAlphaAutonomousEngine:
AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
LIMIT 10
""")
).fetchall()
).mappings().fetchall()
trigger.conditions = dict(trigger.conditions or {})
trigger.conditions["_db_evidence_actions"] = self._format_competitor_evidence_actions(
rows,
top_n=5,
trigger_type=trigger.trigger_type,
)
trigger.conditions["db_evidence_count"] = len(rows)
return len(rows) >= 3
finally:
session.close()
@@ -398,7 +409,12 @@ class ElephantAlphaAutonomousEngine:
try:
rows = session.execute(
text("""
SELECT p.i_code AS sku
SELECT p.i_code AS sku, p.name, p.category,
cp.price AS competitor_price, pr.price AS momo_price,
((pr.price - cp.price) / NULLIF(pr.price, 0) * 100) AS price_gap_pct,
cp.competitor_product_id,
cp.competitor_product_name,
cp.crawled_at
FROM products p
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
@@ -413,7 +429,14 @@ class ElephantAlphaAutonomousEngine:
AND cp.crawled_at >= NOW() - INTERVAL '1 hour'
LIMIT 5
""")
).fetchall()
).mappings().fetchall()
trigger.conditions = dict(trigger.conditions or {})
trigger.conditions["_db_evidence_actions"] = self._format_competitor_evidence_actions(
rows,
top_n=5,
trigger_type=trigger.trigger_type,
)
trigger.conditions["db_evidence_count"] = len(rows)
return bool(rows)
finally:
session.close()
@@ -574,7 +597,10 @@ class ElephantAlphaAutonomousEngine:
# 預期:>70% 的 trigger 在 Hermes 階段就被攔截,月省 Gemini ~3-5M tokens
if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
try:
hermes_threats = await self._fetch_hermes_threats_summary(top_n=5)
hermes_threats = (
self._get_trigger_db_concrete_actions(trigger)
or await self._fetch_hermes_threats_summary(top_n=5)
)
except Exception as e:
self._log.warning("Hermes pre-flight check 失敗 (non-blocking): %s", e)
hermes_threats = None
@@ -1134,6 +1160,115 @@ class ElephantAlphaAutonomousEngine:
self._log.error("Resource pressure Telegram failed (non-blocking): %s", e)
# ---- Sub-services ----
@classmethod
def _format_competitor_evidence_actions(
cls,
rows: Any,
*,
top_n: int = 5,
trigger_type: str = "",
) -> List[str]:
actions: List[str] = []
for row in list(rows or [])[:top_n]:
sku = str(cls._row_get(row, "sku") or "").strip()
name = str(cls._row_get(row, "name") or "")[:24]
momo = cls._to_float(cls._row_get(row, "momo_price")) or 0.0
pchome = cls._to_float(cls._row_get(row, "competitor_price")) or 0.0
if not sku or momo <= 0 or pchome <= 0:
continue
gap_pct = cls._to_float(cls._row_get(row, "price_gap_pct"))
if gap_pct is None:
gap_pct = (momo - pchome) / momo * 100 if momo else 0.0
gap_amount = abs(momo - pchome)
if gap_amount <= 0:
continue
competitor_id = str(cls._row_get(row, "competitor_product_id") or "").strip()
comparison = f"MOMO ${momo:,.0f} vs PChome ${pchome:,.0f} ({gap_pct:+.1f}%)"
if trigger_type == "market_opportunity" or pchome > momo:
action = "建議加強曝光或列入 AI 挑品,不需降價"
impact = f"MOMO 每件價格優勢 NT$ {gap_amount:,.0f}"
else:
action = "建議人工確認 PChome identity_v2 後評估跟價或促銷"
impact = f"每件價差 NT$ {gap_amount:,.0f}"
parts = [f"[{sku}] {name}", comparison, impact, action]
if competitor_id:
parts.append(f"PChome {competitor_id}")
actions.append("".join(parts))
return actions
@staticmethod
def _to_float(value: Any) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _fetch_recent_competitor_evidence_actions(self, top_n: int = 5) -> Optional[List[str]]:
"""用最新 DB 價差產生 EA HITL 實證,不啟動完整 Hermes LLM。"""
session = get_session()
try:
rows = session.execute(
text("""
WITH latest_momo AS (
SELECT DISTINCT ON (p.i_code)
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
pr.timestamp
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
AND pr.price IS NOT NULL
AND pr.price > 0
ORDER BY p.i_code, pr.timestamp DESC, pr.id DESC
),
latest_competitor AS (
SELECT DISTINCT ON (cp.sku)
cp.sku,
cp.price AS competitor_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > NOW())
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
)
SELECT lm.sku, lm.name, lm.category,
lm.momo_price,
lc.competitor_price,
((lm.momo_price - lc.competitor_price) / NULLIF(lm.momo_price, 0) * 100) AS price_gap_pct,
lc.competitor_product_id,
lc.competitor_product_name,
lc.crawled_at
FROM latest_momo lm
JOIN latest_competitor lc ON lc.sku = lm.sku
WHERE lc.competitor_price < lm.momo_price * 0.85
OR lc.competitor_price > lm.momo_price * 1.05
ORDER BY ABS((lm.momo_price - lc.competitor_price) / NULLIF(lm.momo_price, 0)) DESC NULLS LAST,
lc.crawled_at DESC NULLS LAST
LIMIT :limit
"""),
{"limit": int(top_n or 5)},
).mappings().fetchall()
actions = self._format_competitor_evidence_actions(rows, top_n=top_n)
return actions or None
except Exception as exc:
self._log.warning("EA DB evidence prefetch failed (non-blocking): %s", exc)
return None
finally:
session.close()
async def _hermes_analyze(self) -> Any:
from services.hermes_analyst_service import HermesAnalystService
return await self._run_with_timeout(
@@ -1153,6 +1288,20 @@ class ElephantAlphaAutonomousEngine:
Hermes 完整 run 可能 30-60sHITL 訊息應快速送出)
Critic High-2 fix: 若每筆都缺 loss/rec_price視同無料、return None 觸發 fallback
"""
db_actions = self._fetch_recent_competitor_evidence_actions(top_n=top_n)
if db_actions:
self._log.info(
"EA prefetch DB evidence produced %d concrete actions",
len(db_actions),
)
return db_actions
if not HERMES_LLM_PREFETCH_ENABLED:
self._log.info(
"EA Hermes LLM prefetch disabled; no recent DB evidence actions"
)
return None
# 使用 5s 短超時Hermes 熱駐留時實測 < 10s但若需冷啟動會拖到 30s+
# HITL 訊息延遲不可大於 10s影響統帥決策時效性寧可 fallback 到原 plan 文字
try:
@@ -1339,6 +1488,14 @@ class ElephantAlphaAutonomousEngine:
cleaned = [str(action).strip() for action in actions if str(action).strip()]
return cleaned[:5] or None
@staticmethod
def _get_trigger_db_concrete_actions(trigger: AutonomousTrigger) -> Optional[List[str]]:
actions = (trigger.conditions or {}).get("_db_evidence_actions")
if not isinstance(actions, list):
return None
cleaned = [str(action).strip() for action in actions if str(action).strip()]
return cleaned[:5] or None
@staticmethod
def _should_suppress_no_concrete_escalation(trigger: AutonomousTrigger) -> bool:
return (
@@ -1385,7 +1542,10 @@ class ElephantAlphaAutonomousEngine:
async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None:
self._log.warning("Escalating to human: %s", trigger.trigger_type)
concrete_actions = self._get_prefetched_concrete_actions(trigger)
concrete_actions = (
self._get_prefetched_concrete_actions(trigger)
or self._get_trigger_db_concrete_actions(trigger)
)
if not concrete_actions and trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
try:
concrete_actions = await self._fetch_hermes_threats_summary(top_n=5)

View File

@@ -116,6 +116,76 @@ def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkey
assert "EA short-circuit telemetry failed" in caplog.text
def test_competitor_db_evidence_actions_are_concrete():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
actions = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions(
[
{
"sku": "SKU-1",
"name": "測試商品",
"momo_price": 1200,
"competitor_price": 990,
"price_gap_pct": 17.5,
"competitor_product_id": "D123456",
}
],
trigger_type="price_drop_alert",
)
assert actions == [
"[SKU-1] 測試商品MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210建議人工確認 PChome identity_v2 後評估跟價或促銷PChome D123456"
]
def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
contexts = []
notified = []
async def _capture_context(context):
contexts.append(context)
return StrategicDecision(
priority="high",
agents_required=["elephant_alpha"],
reasoning="已有 DB 價差實證,允許進入決策流程。",
expected_outcome="生成可稽核行動",
confidence=0.95,
execution_plan=[],
resource_requirements={},
)
async def _fetch_should_not_run(top_n=5):
raise AssertionError("DB evidence should avoid Hermes LLM prefetch")
async def _capture_notify(decision, trigger):
notified.append(trigger.trigger_type)
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context)
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _fetch_should_not_run)
monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: None)
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
threshold=0.8,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert contexts[0]["conditions"]["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"]
assert notified == ["price_drop_alert"]
def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (