Files
ewoooc/services/webcrumbs_host_data_service.py
OoO b2604a576d
All checks were successful
CD Pipeline / deploy (push) Successful in 1m23s
V10.575 拆分型錄可比覆核 lane
2026-06-04 11:13:35 +08:00

205 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Host data adapter for Webcrumbs shared-ui plugins.
This module is intentionally read-only. It converts the canonical
MOMO/PChome competitor intelligence payload into the small
``window.StockPlatformSharedUI`` shape consumed by the shared plugin
runtime.
"""
from __future__ import annotations
from typing import Any
from services.competitor_intel_repository import (
fetch_competitor_coverage,
fetch_competitor_review_queue,
fetch_top_competitor_risks,
summarize_review_decision_envelopes,
)
def _num(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _money(value: Any) -> str:
    """Render ``value`` as an ``NT$`` amount with thousands separators and no decimals."""
    amount = _num(value)
    return "NT$" + format(amount, ",.0f")
def _short_text(value: Any, limit: int = 42) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[:limit - 1]}"
def _risk_level(gap_pct: float) -> str:
if gap_pct >= 15:
return "high"
if gap_pct >= 8:
return "medium"
return "watch"
def _is_direct_price_alert(item: dict) -> bool:
return (
str(item.get("match_type") or "") == "exact"
and str(item.get("price_basis") or "") == "total_price"
and str(item.get("alert_tier") or "") == "price_alert_exact"
)
def _empty_payload(reason: str = "no_price_alert_exact") -> dict:
return {
"marketSnapshot": [
{
"name": "MOMO/PChome exact price alert",
"price": "not_available",
"change_pct": "not_available",
"freshness_status": reason,
}
],
"aiCandidate": {
"ticker": "-",
"name": "目前沒有可直接告警的 exact 同款價差",
"thesis": "資料源已接入,但目前沒有符合 exact / total_price / price_alert_exact 的高風險候選;非同款、單位價或變體候選仍須留在人工覆核隊列。",
"confidence_score": "not_available",
"risk_level": "none",
"release_status": "blocked",
"evidence_refs": ["competitor_prices", "price_records", reason],
"updated_at": "read_only",
},
}
def _coverage_metadata(coverage: dict, row_count: int) -> dict:
    """Flatten the repository coverage summary into the payload ``metadata`` dict.

    Args:
        coverage: Coverage mapping read from the competitor intel repository.
            Count fields may appear under alternative key names; the first
            truthy one wins.
        row_count: Number of snapshot rows exposed in the payload.

    Returns:
        A metadata dict of integer counts, float rates, the review plan and
        fixed read-only flags (``writes_database``, ``calls_llm`` and
        ``fetches_external`` are always ``False``).
    """

    def count(*keys: str) -> int:
        # First truthy value among ``keys`` as an int; 0 when none is set.
        for key in keys:
            found = coverage.get(key)
            if found:
                return int(found)
        return 0

    def rate(key: str) -> float:
        # Rates go through ``_num`` so unusable values collapse to 0.0.
        return _num(coverage.get(key))

    metadata = {"source": "competitor_intel_repository"}
    metadata["matched_count"] = count("valid_matches", "matched_count")
    metadata["coverage_rate"] = rate("match_rate")
    # The identity coverage rate is read from the same ``match_rate`` field.
    metadata["identity_coverage_rate"] = rate("match_rate")
    metadata["decision_ready_count"] = count(
        "decision_ready_matches", "fresh_matches", "fresh_match_count"
    )
    metadata["decision_ready_rate"] = rate("decision_ready_rate")
    metadata["decision_support_count"] = count("decision_support_count")
    metadata["decision_support_rate"] = rate("decision_support_rate")
    metadata["decision_support_non_exact_count"] = count("decision_support_non_exact_count")
    metadata["catalog_comparable_count"] = count("catalog_comparable_count")
    metadata["catalog_comparable_rate"] = rate("catalog_comparable_rate")
    metadata["catalog_variant_review_count"] = count("catalog_variant_review_count")
    metadata["catalog_unit_review_count"] = count("catalog_unit_review_count")
    metadata["catalog_identity_review_count"] = count("catalog_identity_review_count")
    metadata["catalog_review_plan"] = coverage.get("catalog_review_plan") or {}
    metadata["unit_comparable_count"] = count("unit_comparable_count")
    metadata["fresh_match_count"] = count("fresh_matches", "fresh_match_count")
    metadata["fresh_match_rate"] = rate("fresh_match_rate")
    metadata["stale_match_count"] = count("stale_matches", "stale_match_count")
    metadata["pending_match_count"] = count("pending", "pending_match_count")
    metadata["row_count"] = row_count
    metadata["writes_database"] = False
    metadata["calls_llm"] = False
    metadata["fetches_external"] = False
    return metadata
def _review_decision_brief(engine, limit: int) -> dict:
    """Read the shared review decision brief without blocking host-data preview.

    Args:
        engine: Database engine passed through to
            ``fetch_competitor_review_queue``.
        limit: Requested number of items; clamped to 1..8 for the queue
            fetch and to 1..5 for the summary.

    Returns:
        The result of ``summarize_review_decision_envelopes`` for the review
        queue. If reading or summarizing the queue raises, the failure is
        logged and the summary of an empty queue is returned instead.
    """
    import logging

    floor = max(limit, 1)
    queue_limit = min(floor, 8)
    brief_limit = min(floor, 5)
    try:
        review_queue = fetch_competitor_review_queue(engine, limit=queue_limit)
        return summarize_review_decision_envelopes(review_queue, limit=brief_limit)
    except Exception:
        # Best-effort by design: the preview must still render when the
        # review queue is unavailable. Log the failure so it is not lost.
        logging.getLogger(__name__).warning(
            "review decision brief unavailable; falling back to an empty queue",
            exc_info=True,
        )
        return summarize_review_decision_envelopes([], limit=brief_limit)
def _attach_review_brief(payload: dict, review_brief: dict) -> dict:
payload["reviewDecisionBrief"] = review_brief
metadata = payload.setdefault("metadata", {})
metadata["review_queue_count"] = len(review_brief.get("items") or [])
metadata["hitl_count"] = int(review_brief.get("hitl_count") or 0)
metadata["auto_execute_blocked_count"] = int(
review_brief.get("auto_execute_blocked_count") or 0
)
metadata["decision_envelope_source"] = "competitor_intel_repository"
return payload
def build_webcrumbs_marketplace_host_data(engine=None, limit: int = 5) -> dict:
    """Build read-only host data for the Webcrumbs diagnostic page.

    Args:
        engine: Optional SQLAlchemy engine. When omitted, the default
            momo-pro database manager is used.
        limit: Maximum number of price-alert rows exposed to the shared-ui
            preview. Kept small because this powers a diagnostic surface;
            clamped to the range 1..8.

    Returns:
        A dict with ``marketSnapshot`` (one row per exact price alert),
        ``aiCandidate`` (headline candidate built from the first alert),
        ``metadata`` (coverage counters) and ``reviewDecisionBrief``. When
        no exact price alert exists, the placeholder payload is returned
        with the same metadata and review brief attached.
    """
    if engine is None:
        # Imported lazily so callers that pass an engine never load the
        # default database manager.
        from database.manager import DatabaseManager

        engine = DatabaseManager().engine

    limit = max(1, min(int(limit or 5), 8))

    # Over-fetch (4x the limit, at least 20 rows) because the exact-alert
    # filter below discards every row that is not a direct price alert.
    raw_risks = fetch_top_competitor_risks(engine, limit=max(limit * 4, 20)) or []
    risks = [item for item in raw_risks if _is_direct_price_alert(item)][:limit]
    coverage = fetch_competitor_coverage(engine) or {}
    review_brief = _review_decision_brief(engine, limit=limit)

    if not risks:
        payload = _empty_payload("no_current_exact_risk")
        payload["metadata"] = _coverage_metadata(coverage, row_count=0)
        return _attach_review_brief(payload, review_brief)

    # ``risks`` is already capped at ``limit``; no second slice is needed.
    rows = [_market_snapshot_row(item) for item in risks]

    # The first exact alert becomes the headline candidate.
    # NOTE(review): assumes the repository returns risks ordered by
    # severity - confirm against fetch_top_competitor_risks.
    top = risks[0]
    top_gap = _num(top.get("gap_pct"))
    top_score = _num(top.get("match_score"))
    payload = {
        "marketSnapshot": rows,
        "aiCandidate": {
            "ticker": str(top.get("sku") or "-"),
            "name": _short_text(top.get("name"), 58),
            # Each fragment ends with its own separator so the concatenated
            # sentence does not run the prices, gap and explanation together.
            "thesis": (
                f"MOMO {_money(top.get('momo_price'))} vs PChome {_money(top.get('pchome_price'))},"
                f"價差 {top_gap:+.1f}%(MOMO - PChome)。"
                "此候選已通過 exact / total_price / price_alert_exact 只讀過濾,"
                f"match_score={top_score:.2f};仍需人工確認促銷、庫存與商品頁條件後再採取價格或曝光調整。"
            ),
            "confidence_score": round(top_score, 2),
            "risk_level": _risk_level(top_gap),
            "release_status": "review_required",
            "evidence_refs": [
                "competitor_prices",
                "price_records",
                "exact",
                "total_price",
                "price_alert_exact",
            ],
            "updated_at": top.get("crawled_at") or "latest_competitor_prices",
        },
        "metadata": _coverage_metadata(coverage, row_count=len(rows)),
    }
    return _attach_review_brief(payload, review_brief)


def _market_snapshot_row(item: dict) -> dict:
    """Convert one exact price-alert risk item into a ``marketSnapshot`` row."""
    pchome_price = _num(item.get("pchome_price"))
    return {
        "name": f"{item.get('sku') or '-'} {_short_text(item.get('name'))}",
        # The row's headline price is the PChome (competitor) price.
        "price": pchome_price,
        "change_pct": _num(item.get("gap_pct")),
        "freshness_status": item.get("alert_tier") or "price_alert_exact",
        "momo_price": _num(item.get("momo_price")),
        "pchome_price": pchome_price,
        "match_score": _num(item.get("match_score")),
        "competitor_product_id": item.get("pchome_id") or "",
        "competitor_product_name": _short_text(item.get("pchome_name"), 48),
    }