Files
ewoooc/services/webcrumbs_host_data_service.py
ogt 3351f5f93e
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除產品表面 AI 自動化人工阻擋
2026-07-01 13:36:36 +08:00

207 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Host data adapter for Webcrumbs shared-ui plugins.
This module is intentionally read-only. It converts the canonical
MOMO/PChome competitor intelligence payload into the small
``window.StockPlatformSharedUI`` shape consumed by the shared plugin
runtime.
"""
from __future__ import annotations

import logging
import math
from typing import Any

from services.competitor_intel_repository import (
    fetch_competitor_coverage,
    fetch_competitor_review_queue,
    fetch_top_competitor_risks,
    summarize_review_decision_envelopes,
)
def _num(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _money(value: Any) -> str:
    """Render ``value`` as an ``NT$``-prefixed amount.

    The number is coerced with ``_num`` and formatted with thousands
    separators and no decimal places.
    """
    amount = _num(value)
    return "NT$" + format(amount, ",.0f")
def _short_text(value: Any, limit: int = 42) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[:limit - 1]}"
def _risk_level(gap_pct: float) -> str:
if gap_pct >= 15:
return "high"
if gap_pct >= 8:
return "medium"
return "watch"
def _is_direct_price_alert(item: dict) -> bool:
return (
str(item.get("match_type") or "") == "exact"
and str(item.get("price_basis") or "") == "total_price"
and str(item.get("alert_tier") or "") == "price_alert_exact"
)
def _empty_payload(reason: str = "no_price_alert_exact") -> dict:
return {
"marketSnapshot": [
{
"name": "MOMO/PChome exact price alert",
"price": "not_available",
"change_pct": "not_available",
"freshness_status": reason,
}
],
"aiCandidate": {
"ticker": "-",
"name": "目前沒有可直接告警的 exact 同款價差",
"thesis": "資料源已接入,但目前沒有符合 exact / total_price / price_alert_exact 的高風險候選;非同款、單位價或變體候選會進入 AI 例外決策隊列。",
"confidence_score": "not_available",
"risk_level": "none",
"release_status": "blocked",
"evidence_refs": ["competitor_prices", "price_records", reason],
"updated_at": "read_only",
},
}
def _coverage_metadata(coverage: dict, row_count: int) -> dict:
    """Flatten the coverage summary into the payload's ``metadata`` block.

    Args:
        coverage: Coverage mapping; several counters are looked up under more
            than one key name, and the first truthy value wins.
        row_count: Number of snapshot rows exposed in the payload.

    Returns:
        A dict of integer counts and float rates read from ``coverage``, plus
        ``row_count`` and three read-only flags that are always ``False``.
    """

    def count(*keys: str) -> int:
        # First truthy value among ``keys`` as an int; 0 when none is set.
        for key in keys:
            found = coverage.get(key)
            if found:
                return int(found)
        return 0

    def rate(key: str) -> float:
        return _num(coverage.get(key))

    return {
        "source": "competitor_intel_repository",
        "matched_count": count("valid_matches", "matched_count"),
        # Both rate fields are fed from the same ``match_rate`` value.
        "coverage_rate": rate("match_rate"),
        "identity_coverage_rate": rate("match_rate"),
        "decision_ready_count": count(
            "decision_ready_matches",
            "fresh_matches",
            "fresh_match_count",
        ),
        "decision_ready_rate": rate("decision_ready_rate"),
        "decision_support_count": count("decision_support_count"),
        "decision_support_rate": rate("decision_support_rate"),
        "decision_support_non_exact_count": count("decision_support_non_exact_count"),
        "catalog_comparable_count": count("catalog_comparable_count"),
        "catalog_comparable_rate": rate("catalog_comparable_rate"),
        "catalog_variant_review_count": count("catalog_variant_review_count"),
        "catalog_unit_review_count": count("catalog_unit_review_count"),
        "catalog_identity_review_count": count("catalog_identity_review_count"),
        "catalog_review_plan": coverage.get("catalog_review_plan") or {},
        "unit_comparable_count": count("unit_comparable_count"),
        "fresh_match_count": count("fresh_matches", "fresh_match_count"),
        "fresh_match_rate": rate("fresh_match_rate"),
        "stale_match_count": count("stale_matches", "stale_match_count"),
        "pending_match_count": count("pending", "pending_match_count"),
        "row_count": row_count,
        "writes_database": False,
        "calls_llm": False,
        "fetches_external": False,
    }
def _review_decision_brief(engine, limit: int) -> dict:
    """Read the shared review decision brief without blocking host-data preview.

    Args:
        engine: Database engine handed to ``fetch_competitor_review_queue``.
        limit: Requested item count; clamped to 1..8 for the queue fetch and
            to 1..5 for the summarized brief.

    Returns:
        The result of ``summarize_review_decision_envelopes``. When fetching
        or summarizing the queue fails, the summary of an empty queue is
        returned instead.
    """
    requested = max(limit, 1)
    fetch_limit = min(requested, 8)
    brief_limit = min(requested, 5)
    try:
        review_queue = fetch_competitor_review_queue(engine, limit=fetch_limit)
        return summarize_review_decision_envelopes(review_queue, limit=brief_limit)
    except Exception:
        # Deliberate best-effort boundary: a failing review queue must not
        # take the preview down, but the failure is logged rather than hidden.
        logging.getLogger(__name__).warning(
            "review decision brief unavailable; using empty brief",
            exc_info=True,
        )
        return summarize_review_decision_envelopes([], limit=brief_limit)
def _attach_review_brief(payload: dict, review_brief: dict) -> dict:
payload["reviewDecisionBrief"] = review_brief
metadata = payload.setdefault("metadata", {})
metadata["review_queue_count"] = len(review_brief.get("items") or [])
metadata["hitl_count"] = 0
metadata["primary_human_gate_count"] = 0
metadata["ai_exception_count"] = int(review_brief.get("auto_execute_blocked_count") or 0)
metadata["auto_execute_blocked_count"] = int(
review_brief.get("auto_execute_blocked_count") or 0
)
metadata["decision_envelope_source"] = "competitor_intel_repository"
return payload
def _market_snapshot_row(item: dict) -> dict:
    """Shape one direct price-alert risk as a ``marketSnapshot`` row."""
    pchome_price = _num(item.get("pchome_price"))
    return {
        "name": f"{item.get('sku') or '-'} {_short_text(item.get('name'))}",
        # The row's headline price is the PChome price.
        "price": pchome_price,
        "change_pct": _num(item.get("gap_pct")),
        "freshness_status": item.get("alert_tier") or "price_alert_exact",
        "momo_price": _num(item.get("momo_price")),
        "pchome_price": pchome_price,
        "match_score": _num(item.get("match_score")),
        "competitor_product_id": item.get("pchome_id") or "",
        "competitor_product_name": _short_text(item.get("pchome_name"), 48),
    }


def _ai_candidate(top: dict) -> dict:
    """Describe the first direct price-alert risk as the ``aiCandidate`` card."""
    gap_pct = _num(top.get("gap_pct"))
    score = _num(top.get("match_score"))
    # Clauses are separated explicitly so the rendered sentence does not run
    # the prices, the gap and the filter note together.
    thesis = (
        f"MOMO {_money(top.get('momo_price'))} vs PChome {_money(top.get('pchome_price'))},"
        f"價差 {gap_pct:+.1f}%(MOMO - PChome)。"
        "此候選已通過 exact / total_price / price_alert_exact 只讀過濾,"
        f"match_score={score:.2f};由 AI 決策信封檢查促銷、庫存與商品頁條件後再採取價格或曝光調整。"
    )
    return {
        "ticker": str(top.get("sku") or "-"),
        "name": _short_text(top.get("name"), 58),
        "thesis": thesis,
        "confidence_score": round(score, 2),
        "risk_level": _risk_level(gap_pct),
        "release_status": "review_required",
        "evidence_refs": [
            "competitor_prices",
            "price_records",
            "exact",
            "total_price",
            "price_alert_exact",
        ],
        "updated_at": top.get("crawled_at") or "latest_competitor_prices",
    }


def build_webcrumbs_marketplace_host_data(engine=None, limit: int = 5) -> dict:
    """Build read-only host data for the Webcrumbs diagnostic page.

    Args:
        engine: Optional SQLAlchemy engine. When omitted, the default
            momo-pro database manager is used.
        limit: Maximum number of price-alert rows exposed to the shared-ui
            preview; clamped to the range 1..8. Kept small because this
            powers a diagnostic surface.

    Returns:
        A dict with ``marketSnapshot``, ``aiCandidate``, ``metadata`` and
        ``reviewDecisionBrief``. When no risk passes the direct price-alert
        filter, the snapshot and candidate are placeholders.
    """
    if engine is None:
        # Imported only on this path; callers that supply an engine never
        # import the default database manager.
        from database.manager import DatabaseManager

        engine = DatabaseManager().engine

    limit = max(1, min(int(limit or 5), 8))
    # Fetch more than ``limit`` rows because the filter below discards every
    # risk that is not a direct price alert.
    raw_risks = fetch_top_competitor_risks(engine, limit=max(limit * 4, 20)) or []
    risks = [item for item in raw_risks if _is_direct_price_alert(item)][:limit]
    coverage = fetch_competitor_coverage(engine) or {}
    review_brief = _review_decision_brief(engine, limit=limit)

    if risks:
        rows = [_market_snapshot_row(item) for item in risks]
        payload = {
            "marketSnapshot": rows,
            "aiCandidate": _ai_candidate(risks[0]),
            "metadata": _coverage_metadata(coverage, row_count=len(rows)),
        }
    else:
        payload = _empty_payload("no_current_exact_risk")
        payload["metadata"] = _coverage_metadata(coverage, row_count=0)
    return _attach_review_brief(payload, review_brief)