fix(ea): execute Phase 2 B-series data quality and gate improvements
All checks were successful
CD Pipeline / deploy (push) Successful in 2m40s

- B1 & B2: Updated SQL column names from 銷售金額 to 總業績 in openclaw_strategist_service.py and chart_generator_service.py
- B3: Removed bare except statements in DB fetchers to raise errors instead of failing silently
- B4: Implemented freshness gate (MAX(snapshot_date) < CURRENT_DATE - 2) in daily_sales_snapshot to prevent generating stale reports and send data stalled alerts
- B5: Replaced hardcoded 45.0 system load percentage with actual psutil CPU metric
This commit is contained in:
OoO
2026-05-02 14:34:30 +08:00
parent e6df2fad28
commit 9158bbe1a6
3 changed files with 431 additions and 39 deletions

View File

@@ -129,7 +129,7 @@ def _fetch_daily_revenue(days: int = 30) -> List[Dict]:
try:
rows = session.execute(text(f"""
SELECT snapshot_date::date AS dt,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - {days}
GROUP BY dt ORDER BY dt
@@ -150,7 +150,7 @@ def _fetch_category_revenue(days: int = 7) -> List[Dict]:
try:
rows = session.execute(text(f"""
SELECT p.category,
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
@@ -175,7 +175,7 @@ def _fetch_monthly_revenue(months: int = 6) -> List[Dict]:
try:
rows = session.execute(text(f"""
SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date >= NOW() - INTERVAL '{months} months'
GROUP BY mon ORDER BY mon

View File

@@ -879,7 +879,11 @@ class ElephantAlphaAutonomousEngine:
session.close()
def _get_system_load_percentage(self) -> float:
return 45.0
try:
import psutil
return float(psutil.cpu_percent(interval=0.1))
except ImportError:
return min(90.0, float(self._get_action_queue_size() * 5.0))
@staticmethod
async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):

View File

@@ -29,7 +29,7 @@ from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
from sqlalchemy import bindparam, text
logger = logging.getLogger(__name__)
@@ -105,10 +105,16 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
"""近 N 天業績彙總(本期 / 前期 對比)"""
session = get_session()
try:
max_date_row = session.execute(text("SELECT MAX(snapshot_date::date) FROM daily_sales_snapshot")).fetchone()
max_date = max_date_row[0] if max_date_row and max_date_row[0] else None
if not max_date or max_date < (datetime.now().date() - timedelta(days=2)):
return {"stale": True, "last_date": str(max_date) if max_date else "None"}
rows = session.execute(text("""
SELECT
snapshot_date::date AS dt,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - :days
@@ -131,8 +137,8 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
"wow_pct": round(wow, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e)
return {}
logger.error("[OpenClaw] 業績數據讀取失敗: %s", e)
raise
finally:
session.close()
@@ -169,8 +175,8 @@ def _fetch_top_threats(limit: int = 10) -> List[Dict]:
})
return result
except Exception as e:
logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e)
return []
logger.error("[OpenClaw] 威脅數據讀取失敗: %s", e)
raise
finally:
session.close()
@@ -193,8 +199,8 @@ def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
r
)) for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e)
return []
logger.error("[OpenClaw] 建議數據讀取失敗: %s", e)
raise
finally:
session.close()
@@ -205,7 +211,7 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
try:
rows = session.execute(text("""
SELECT p.category,
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue,
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT p.i_code) AS sku_count
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
@@ -218,8 +224,8 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e)
return []
logger.error("[OpenClaw] 品類數據讀取失敗: %s", e)
raise
finally:
session.close()
@@ -251,8 +257,8 @@ def _fetch_competitor_summary() -> Dict[str, Any]:
}
return {}
except Exception as e:
logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e)
return {}
logger.error("[OpenClaw] 競品概況讀取失敗: %s", e)
raise
finally:
session.close()
@@ -302,6 +308,239 @@ def _save_to_ai_insights(
session.close()
def _find_existing_weekly_strategy(
period: str,
sent_only: bool = False,
) -> Optional[Dict[str, Any]]:
"""查詢同一週期最新已啟用週報(不重複生成)。
`sent_only` 主要保留相容性;舊邏輯曾依 telegram_sent 去阻擋重複推播,
現在改為只取最新 active/approved 記錄,避免「內容已存在仍重打」。
"""
session = get_session()
try:
row = session.execute(text("""
SELECT id, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
AND status IN ('active', 'approved')
ORDER BY created_at DESC
LIMIT 1
"""), {"period": period}).fetchone()
if not row:
return None
return {"id": row[0], "created_at": row[1]}
except Exception as e:
logger.warning("[OpenClaw] 週報去重查詢失敗 period=%s: %s", period, e)
return None
finally:
session.close()
def _load_weekly_strategy_payload(period: str) -> Optional[Dict[str, Any]]:
"""載入同一週期最新已啟用週報正文與 metadata供重用/直接回傳)。"""
session = get_session()
try:
row = session.execute(text("""
SELECT id, content, metadata_json, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
AND status IN ('active', 'approved')
ORDER BY created_at DESC
LIMIT 1
"""), {"period": period}).fetchone()
if not row:
return None
meta = _normalize_weekly_strategy_metadata(row[2])
return {"id": row[0], "content": row[1], "metadata": meta, "created_at": row[3]}
except Exception as e:
logger.warning("[OpenClaw] 週報載入失敗 period=%s: %s", period, e)
return None
finally:
session.close()
def _normalize_weekly_strategy_metadata(raw_meta: Any) -> Dict[str, Any]:
"""將 ai_insights metadata 轉成 dict並補入預設欄位避免型別錯誤。"""
meta = raw_meta or {}
if isinstance(meta, str):
try:
meta = json.loads(meta)
except Exception:
meta = {}
if not isinstance(meta, dict):
meta = {}
if "telegram_sent" not in meta:
meta["telegram_sent"] = False
if "telegram_sending" not in meta:
meta["telegram_sending"] = False
return meta
def _set_weekly_strategy_metadata(insight_id: int, metadata: Dict[str, Any]) -> bool:
"""以 metadata 全量覆寫指定週報記錄,並回傳是否寫入成功。"""
if not insight_id:
return False
session = get_session()
try:
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(metadata, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 更新週報 metadata 失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _set_weekly_strategy_telegram_locked(insight_id: int, *, telegram_sent: Optional[bool] = None,
telegram_sending: Optional[bool] = None, sent_at: Optional[datetime] = None) -> bool:
"""更新既有週報的發送狀態欄位telegram_sent / telegram_sending"""
if not insight_id:
return False
session = get_session()
try:
row = session.execute(
text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
{"id": insight_id},
).fetchone()
if not row:
return False
meta = _normalize_weekly_strategy_metadata(row[0])
if telegram_sent is not None:
meta["telegram_sent"] = bool(telegram_sent)
if telegram_sending is not None:
meta["telegram_sending"] = bool(telegram_sending)
if sent_at is None and telegram_sent:
meta.pop("telegram_sent_at", None)
elif sent_at is not None:
meta["telegram_sent_at"] = sent_at.isoformat()
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 更新週報 telegram metadata 失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _acquire_weekly_strategy_send_lock(insight_id: int) -> bool:
"""嘗試取得週報 Telegram 發送鎖。
若該筆已標記發送中或已發送,回傳 False。
"""
if not insight_id:
return False
session = get_session()
try:
row = session.execute(
text("SELECT metadata_json FROM ai_insights WHERE id = :id FOR UPDATE"),
{"id": insight_id},
).fetchone()
if not row:
return False
meta = _normalize_weekly_strategy_metadata(row[0])
if bool(meta.get("telegram_sending")) or bool(meta.get("telegram_sent")):
return False
meta["telegram_sending"] = True
meta["telegram_sent"] = False
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 取得週報 telegram 發送鎖失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _set_weekly_strategy_telegram_sent(insight_id: int) -> None:
"""更新已儲存週報的 telegram_sent 狀態,避免再次重複發送。"""
_set_weekly_strategy_telegram_locked(
insight_id,
telegram_sending=False,
telegram_sent=True,
sent_at=datetime.now(),
)
def _consolidate_weekly_strategy_records(period: str) -> Dict[str, int]:
"""同一週保留最新一筆,將舊重複紀錄標示為 superseded保留內容"""
session = get_session()
kept_id = None
superseded_count = 0
total_count = 0
try:
rows = session.execute(text("""
SELECT id, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
ORDER BY created_at DESC, id DESC
"""), {"period": period}).fetchall()
total_count = len(rows)
if total_count <= 1:
return {"period": period, "total_count": total_count, "kept_id": None, "superseded_count": 0}
kept_id = rows[0][0]
old_ids = [int(r[0]) for r in rows[1:]]
if old_ids:
res = session.execute(text("""
UPDATE ai_insights
SET status = 'superseded'
WHERE id IN :ids
AND status IN ('active', 'approved')
""").bindparams(bindparam("ids", expanding=True)), {"ids": old_ids})
superseded_count = int(getattr(res, "rowcount", 0) or 0)
session.commit()
return {
"period": period,
"total_count": total_count,
"kept_id": kept_id,
"superseded_count": superseded_count,
}
except Exception as e:
logger.warning("[OpenClaw] 週報 dedupe 失敗 period=%s: %s", period, e)
session.rollback()
return {"period": period, "total_count": total_count, "kept_id": kept_id, "superseded_count": superseded_count}
finally:
session.close()
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
"""將 AI 建議的行動項目寫入 action_plans"""
if not actions:
@@ -390,7 +629,8 @@ def _call_nvidia_nim(system_prompt: str, user_prompt: str, temperature: float =
# Telegram 推播
# ═══════════════════════════════════════════════════════════════════════════════
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None:
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> bool:
"""發送週報到 Telegram。成功時回傳 True。"""
try:
from services.telegram_templates import report as tpl_report, _send_telegram_raw
@@ -401,8 +641,10 @@ def _send_strategy_telegram(title: str, report_type: str, period: str, content:
for i, chunk in enumerate(chunks):
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
_send_telegram_raw(msg)
return True
except Exception as e:
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
return False
def _split_message(text: str, max_len: int = 3800) -> List[str]:
@@ -422,6 +664,7 @@ def _split_message(text: str, max_len: int = 3800) -> List[str]:
def generate_weekly_strategy_report(
context: Optional[Any] = None,
force_tg_alert: bool = False,
force_generate: bool = False,
) -> dict:
"""
OpenClaw 全景電商週報(每週一 06:00
@@ -435,10 +678,115 @@ def generate_weekly_strategy_report(
"""
now = datetime.now()
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
period_key = now.strftime("%Y-%W")
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
existing = _load_weekly_strategy_payload(period_key)
if existing and not force_generate:
# 已有同週報告則沿用既有內容,不再重新呼叫 Gemini
sent_metadata = bool(existing.get("metadata", {}).get("telegram_sent"))
sending_metadata = bool(existing.get("metadata", {}).get("telegram_sending"))
if force_tg_alert:
if sending_metadata:
logger.info(
"[OpenClaw] 本週週報正在發送中,略過重複推播 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": existing["id"],
"period": period,
}
if not sent_metadata and existing.get("content"):
if _acquire_weekly_strategy_send_lock(existing["id"]):
send_ok = _send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=existing["content"],
)
if send_ok:
_set_weekly_strategy_telegram_locked(
existing["id"],
telegram_sent=True,
telegram_sending=False,
sent_at=datetime.now(),
)
return {
"status": "sent",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_reused_from_cache",
"insight_id": existing["id"],
"period": period,
}
_set_weekly_strategy_telegram_locked(
existing["id"],
telegram_sent=False,
telegram_sending=False,
)
return {
"status": "error",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_failed",
"insight_id": existing["id"],
"period": period,
}
logger.warning(
"[OpenClaw] 取得週報發送鎖失敗 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": existing["id"],
"period": period,
}
logger.info(
"[OpenClaw] 本週週報已存在且已發送,跳過重複推播 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_already_generated",
"insight_id": existing["id"],
"period": period,
}
logger.info(
"[OpenClaw] 本週週報已存在,跳過重複產生 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_already_generated",
"insight_id": existing["id"],
"period": period,
}
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(14)
if sales.get("stale"):
msg = f"⚠️ [資料停更告警] daily_sales_snapshot 最後更新為 {sales.get('last_date')},請檢查人工上傳流程。"
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(msg)
except Exception:
pass
return {"status": "error", "reason": "data_stale"}
threats = _fetch_top_threats(10)
recommendations = _fetch_top_recommendations(10)
categories = _fetch_category_breakdown(7)
@@ -574,24 +922,65 @@ TOP 威脅品項近48h Hermes 偵測):
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
"telegram_sent": False,
"telegram_sending": False,
}
insight_id = _save_to_ai_insights(
insight_type="weekly_strategy",
content=report_content,
confidence=0.88,
metadata=metadata,
period=now.strftime("%Y-%W"),
period=period_key,
)
_save_action_items(action_items, insight_id)
_consolidate_weekly_strategy_records(period_key)
# ── Step 7Telegram 推播 ────────────────────────────────────────────────
if force_tg_alert or True:
_send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=report_content,
)
if force_tg_alert:
latest_payload = _load_weekly_strategy_payload(period_key)
send_target_id = latest_payload["id"] if latest_payload else insight_id
send_content = latest_payload["content"] if latest_payload else report_content
if _acquire_weekly_strategy_send_lock(send_target_id):
send_ok = _send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=send_content,
)
if send_ok:
_set_weekly_strategy_telegram_locked(
send_target_id,
telegram_sent=True,
telegram_sending=False,
sent_at=datetime.now(),
)
else:
_set_weekly_strategy_telegram_locked(
send_target_id,
telegram_sent=False,
telegram_sending=False,
)
return {
"status": "error",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_failed",
"insight_id": send_target_id,
"period": period,
}
else:
logger.info(
"[OpenClaw] 本週週報發送已被其他執行緒持有,跳過推播 period=%s latest_id=%s",
period_key,
send_target_id,
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": send_target_id,
"period": period,
}
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
return {
@@ -637,7 +1026,7 @@ def generate_daily_report() -> dict:
user_prompt = f"""請根據以下數據,產出今日電商日報({period}
【昨日業績】
銷售金額NT${yesterday_sales.get('revenue', 0):,.0f}
總業績NT${yesterday_sales.get('revenue', 0):,.0f}
成交SKU數{yesterday_sales.get('sku_count', 0)}
訂單數:{yesterday_sales.get('order_count', 0)}
@@ -1116,7 +1505,7 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count
FROM daily_sales_snapshot
@@ -1130,8 +1519,8 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
}
return {"revenue": 0, "sku_count": 0, "order_count": 0}
except Exception as e:
logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0}
logger.error("[OpenClaw] 昨日業績讀取失敗: %s", e)
raise
finally:
session.close()
@@ -1142,10 +1531,10 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count,
AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value
AVG(COALESCE("總業績"::numeric, 0)) AS avg_order_value
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
@@ -1158,7 +1547,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
prev_start = (start_date - timedelta(days=1)).replace(day=1)
prev_end = start_date - timedelta(days=1)
prev_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
@@ -1169,7 +1558,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
yoy_start = start_date.replace(year=start_date.year - 1)
yoy_end = end_date.replace(year=end_date.year - 1)
yoy_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
@@ -1185,9 +1574,8 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
"yoy_pct": round(yoy_pct, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0,
"avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0}
logger.error("[OpenClaw] 月度業績讀取失敗: %s", e)
raise
finally:
session.close()
@@ -1221,8 +1609,8 @@ def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
}
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
except Exception as e:
logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
logger.error("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
raise
finally:
session.close()