Files
ewoooc/services/market_intel/candidate_preview.py
OoO 921e9eeb15
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
feat(market-intel): gate manual fetch behind mcp readiness
2026-05-18 15:40:56 +08:00

86 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報候選連結 preview 聚合。
只整理本次 diagnostics 結果供人工審核,不建立 campaign/product,不寫 DB。
"""
# Ordinal weight of each confidence band; used to compare a candidate's band
# against a minimum band and to order candidates (larger value sorts first).
BAND_RANK = {"high": 3, "medium": 2, "low": 1}
def _band_allowed(candidate, min_band):
    """Return True when *candidate* meets the ``min_band`` confidence floor.

    A falsy ``min_band`` or the literal ``"all"`` disables filtering. Any band
    name that is missing from ``BAND_RANK`` (on either side of the comparison)
    is ranked as 0.
    """
    if min_band and min_band != "all":
        rank = BAND_RANK.get(candidate.get("confidence_band"), 0)
        floor = BAND_RANK.get(min_band, 0)
        return rank >= floor
    # No effective minimum was requested, so every candidate passes.
    return True
def build_candidate_preview_from_discovery(discovery_result, *, min_band="all", limit=50):
    """Build a manual-review preview from manual discovery diagnostics.

    Walks ``discovery_result["runs"][*]["results"][*]["diagnostics"]`` and
    flattens every entry of ``campaign_link_candidates`` that passes
    ``_band_allowed`` into one list, ordered best-first by confidence band,
    then ``score``, then ``platform_score``.

    Args:
        discovery_result: Mapping holding the discovery output. The keys read
            here are ``runs``, ``platform_code``, ``fetch_requested``,
            ``manual_fetch_allowed`` and ``mcp_fetch_gate``. A missing or
            ``None`` value for ``runs``, a run's ``results``, a result's
            ``diagnostics`` or its ``campaign_link_candidates`` is treated
            as empty.
        min_band: Minimum confidence band to keep; a falsy value or ``"all"``
            keeps every candidate.
        limit: Slice bound applied to the sorted candidates
            (``candidates[:limit]``).

    Returns:
        A dict with the echoed request flags, one ``run_statuses`` entry per
        run, ``candidate_count`` (number of candidates that passed the band
        filter, counted before ``limit`` is applied) and the truncated
        ``candidates`` list. ``database_write_allowed`` and
        ``scheduler_attached`` are always ``False``.
    """
    candidates = []
    run_statuses = []

    # ``or []`` (rather than a ``.get`` default) also covers keys that are
    # present but explicitly null, matching how ``diagnostics`` is handled.
    for run in discovery_result.get("runs") or []:
        platform_code = run.get("platform_code")
        run_statuses.append({
            "platform_code": platform_code,
            "status": run.get("status"),
            "sources_planned": run.get("sources_planned", 0),
            "sources_fetched": run.get("sources_fetched", 0),
            "errors": run.get("errors", 0),
        })

        for source_result in run.get("results") or []:
            diagnostics = source_result.get("diagnostics") or {}
            # Fields shared by every candidate found on this source page.
            source_fields = {
                "platform_code": platform_code,
                "source_key": source_result.get("source_key"),
                "source_name": source_result.get("name"),
                "source_url": source_result.get("url"),
                "source_status": source_result.get("status"),
                "page_title": diagnostics.get("title"),
                "page_hash": diagnostics.get("page_hash"),
            }
            for candidate in diagnostics.get("campaign_link_candidates") or []:
                if not _band_allowed(candidate, min_band):
                    continue
                candidates.append({
                    **source_fields,
                    "href": candidate.get("href"),
                    "text": candidate.get("text"),
                    "is_same_host": candidate.get("is_same_host"),
                    "score": candidate.get("score", 0),
                    "generic_score": candidate.get("generic_score", 0),
                    "platform_score": candidate.get("platform_score", 0),
                    "confidence_band": candidate.get("confidence_band"),
                    "confidence_reason": candidate.get("confidence_reason"),
                })

    # Best-first ordering. Scores that are explicitly None are compared as 0
    # so one incomplete candidate cannot abort the sort with a TypeError; the
    # values emitted in the preview are left untouched.
    candidates.sort(
        key=lambda item: (
            BAND_RANK.get(item["confidence_band"], 0),
            item["score"] or 0,
            item["platform_score"] or 0,
        ),
        reverse=True,
    )

    mcp_fetch_gate = discovery_result.get("mcp_fetch_gate")
    return {
        "platform_code": discovery_result.get("platform_code", "all"),
        "fetch_requested": bool(discovery_result.get("fetch_requested")),
        "manual_fetch_allowed": bool(discovery_result.get("manual_fetch_allowed")),
        "mcp_fetch_gate": mcp_fetch_gate,
        "mcp_fetch_gate_open": bool(
            (mcp_fetch_gate or {}).get("manual_fetch_gate_open")
        ),
        "min_band": min_band or "all",
        "limit": limit,
        "candidate_count": len(candidates),
        "candidates": candidates[:limit],
        "run_statuses": run_statuses,
        # This preview is read-only: both flags are hard-coded to False.
        "database_write_allowed": False,
        "scheduler_attached": False,
    }