Files
ewoooc/services/market_intel/discovery_runner.py
OoO 921e9eeb15
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
feat(market-intel): gate manual fetch behind mcp readiness
2026-05-18 15:40:56 +08:00

234 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報手動 discovery dry-run runner。
預設不發 HTTP request；即使手動 fetch，也只做公開頁面探測與摘要，不寫 DB。
"""
import hashlib
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional
from uuid import uuid4
import requests
from services.market_intel.html_diagnostics import parse_html_diagnostics
# Fixed UTC+08:00 offset used for every timestamp this module produces.
TAIPEI_TZ = timezone(timedelta(hours=8))

# Headers attached to every manual probe request. The User-Agent names the
# dry-run client explicitly so the traffic is identifiable on the remote side.
DEFAULT_HEADERS = dict(
    [
        ("User-Agent", "EwoooC-MarketIntel-DryRun/1.0 (+https://mo.wooo.work)"),
        ("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"),
        ("Accept-Language", "zh-TW,zh;q=0.9,en;q=0.8"),
    ]
)
@dataclass(frozen=True)
class DiscoverySourceResult:
    """Immutable dry-run outcome for one campaign entry-point URL."""

    # Identity of the probed source.
    source_key: str
    name: str
    url: str
    campaign_type: str
    # Per-source outcome label assigned by the runner.
    status: str
    # Whether network access was asked for, and whether a request was issued.
    network_requested: bool
    network_executed: bool
    # Fetch details; left at their defaults when nothing was downloaded.
    status_code: Optional[int] = None
    content_length: int = 0
    page_hash: Optional[str] = None
    title: Optional[str] = None
    diagnostics: Optional[dict] = None
    error_message: Optional[str] = None

    def to_dict(self):
        """Return every field as a plain dict (via ``dataclasses.asdict``)."""
        as_plain_dict = asdict(self)
        return as_plain_dict
@dataclass(frozen=True)
class ManualDiscoveryRunResult:
    """Immutable summary of one manual discovery dry-run."""

    # Run identity and timing (timestamps are ISO-8601 strings).
    batch_id: str
    platform_code: str
    started_at: str
    finished_at: str
    # Overall outcome label assigned by the runner.
    status: str
    # Safety flags describing what the run was asked and allowed to do.
    fetch_requested: bool
    network_allowed: bool
    database_write_allowed: bool
    scheduler_attached: bool
    # Counters and per-source result dicts.
    sources_planned: int
    sources_fetched: int
    errors: int
    results: list
    # Optional explanation and the gate snapshot supplied to the runner.
    error_message: Optional[str] = None
    network_gate: Optional[dict] = None

    def to_dict(self):
        """Return every field as a plain dict (via ``dataclasses.asdict``)."""
        as_plain_dict = asdict(self)
        return as_plain_dict
def _now_iso():
    """Return the current UTC+08:00 wall-clock time as a naive ISO-8601 string.

    The offset is applied first and then stripped, so the string carries no
    ``+08:00`` suffix.
    """
    local_now = datetime.now(TAIPEI_TZ)
    naive_local = local_now.replace(tzinfo=None)
    return naive_local.isoformat()
class ManualDiscoveryRunner:
    """Manual discovery runner; a feature gate controls whether network probing is allowed.

    Every result it returns reports ``database_write_allowed=False`` and
    ``scheduler_attached=False``.
    """

    def __init__(
        self,
        *,
        runtime_status,
        http_get: Optional[Callable] = None,
        network_allowed_override: Optional[bool] = None,
        network_gate: Optional[dict] = None,
    ):
        """Create a runner.

        Args:
            runtime_status: Object exposing ``enabled`` and ``crawler_enabled``;
                both must be truthy for network access unless overridden.
            http_get: Callable invoked as ``http_get(url, headers=..., timeout=...)``
                and returning an object with ``text`` (and optionally
                ``status_code``). Defaults to ``requests.get``.
            network_allowed_override: When not ``None``, replaces the
                ``runtime_status`` check entirely.
            network_gate: Optional gate-status dict echoed into every run
                result; its ``operator_message`` is used as the blocked reason.
        """
        self.runtime_status = runtime_status
        self.http_get = http_get or requests.get
        self.network_allowed_override = network_allowed_override
        self.network_gate = network_gate

    def _network_allowed(self):
        """Return True when a manual fetch may touch the network."""
        if self.network_allowed_override is not None:
            return bool(self.network_allowed_override)
        return bool(self.runtime_status.enabled and self.runtime_status.crawler_enabled)

    @staticmethod
    def _new_batch_id():
        """Return a fresh id of the form ``market-manual-<12 hex chars>``."""
        return f"market-manual-{uuid4().hex[:12]}"

    def _blocked_message(self):
        """Return the operator-facing reason a requested fetch was blocked.

        Prefers the gate's ``operator_message``; falls back to the default
        env-flag explanation when there is no gate or the gate has no message.
        """
        gate_message = (self.network_gate or {}).get("operator_message")
        if gate_message:
            return gate_message
        return "MARKET_INTEL_ENABLED 與 MARKET_INTEL_CRAWLER_ENABLED 必須同時開啟才允許手動 fetch"

    def run(self, adapter, *, fetch=False):
        """Plan, or (when ``fetch`` is true) probe, the adapter's campaign sources.

        Args:
            adapter: Platform adapter providing ``platform_code``,
                ``campaign_sources()``, ``safety_policy`` (with
                ``max_pages_per_run``, ``request_interval_sec``,
                ``timeout_sec``) and ``score_campaign_link``.
            fetch: False only lists the (capped) sources; True issues HTTP GETs
                if the network gate allows it.

        Returns:
            ManualDiscoveryRunResult whose ``status`` is ``"blocked"`` (fetch
            requested but network not allowed; every source listed, nothing
            requested), ``"planned"`` (``fetch`` false), ``"success"`` or
            ``"partial_failed"``.
        """
        started_at = _now_iso()
        sources = list(adapter.campaign_sources())
        network_allowed = self._network_allowed()

        if fetch and not network_allowed:
            return ManualDiscoveryRunResult(
                batch_id=self._new_batch_id(),
                platform_code=adapter.platform_code,
                started_at=started_at,
                finished_at=_now_iso(),
                status="blocked",
                fetch_requested=True,
                network_allowed=False,
                database_write_allowed=False,
                scheduler_attached=False,
                sources_planned=len(sources),
                sources_fetched=0,
                errors=0,
                results=[
                    self._source_result(source, "blocked", True, False).to_dict()
                    for source in sources
                ],
                error_message=self._blocked_message(),
                network_gate=self.network_gate,
            )

        capped_sources = sources[:adapter.safety_policy.max_pages_per_run]
        if fetch:
            results, fetched, errors = self._fetch_sources(adapter, capped_sources)
            status = "success" if errors == 0 else "partial_failed"
        else:
            results = [
                self._source_result(source, "planned", False, False).to_dict()
                for source in capped_sources
            ]
            fetched = 0
            errors = 0
            status = "planned"

        return ManualDiscoveryRunResult(
            batch_id=self._new_batch_id(),
            platform_code=adapter.platform_code,
            started_at=started_at,
            finished_at=_now_iso(),
            status=status,
            fetch_requested=bool(fetch),
            network_allowed=network_allowed,
            database_write_allowed=False,
            scheduler_attached=False,
            sources_planned=len(sources),
            sources_fetched=fetched,
            errors=errors,
            results=results,
            network_gate=self.network_gate,
        )

    def _fetch_sources(self, adapter, sources):
        """Fetch ``sources`` one by one, honouring the adapter's safety policy.

        Any exception from the request, body decoding or diagnostics parsing
        is recorded as a ``"failed"`` result instead of being raised.

        Returns:
            ``(results, fetched, errors)`` where each source is counted in
            exactly one of ``fetched`` / ``errors``.
        """
        policy = adapter.safety_policy
        results = []
        fetched = 0
        errors = 0
        # Monotonic clock: the pause between requests cannot be skipped or
        # stretched by a wall-clock adjustment. None means "no request yet".
        last_request_at = None
        for source in sources:
            if last_request_at is not None:
                remaining = policy.request_interval_sec - (time.monotonic() - last_request_at)
                if remaining > 0:
                    time.sleep(remaining)
            try:
                response = self.http_get(
                    source.url,
                    headers=DEFAULT_HEADERS,
                    timeout=policy.timeout_sec,
                )
                text = response.text or ""
            except Exception as exc:
                last_request_at = time.monotonic()
                errors += 1
                results.append(
                    self._source_result(
                        source,
                        "failed",
                        True,
                        True,
                        error_message=str(exc),
                    ).to_dict()
                )
                continue
            last_request_at = time.monotonic()
            status_code = getattr(response, "status_code", None)
            try:
                result = self._source_result(
                    source,
                    "fetched",
                    True,
                    True,
                    status_code=status_code,
                    content=text,
                    score_link=adapter.score_campaign_link,
                )
            except Exception as exc:
                # The page arrived but could not be summarised. Count it only
                # as an error so fetched + errors never exceeds sources probed.
                errors += 1
                results.append(
                    self._source_result(
                        source,
                        "failed",
                        True,
                        True,
                        status_code=status_code,
                        error_message=str(exc),
                    ).to_dict()
                )
                continue
            fetched += 1
            results.append(result.to_dict())
        return results, fetched, errors

    def _source_result(
        self,
        source,
        status,
        network_requested,
        network_executed,
        *,
        status_code=None,
        content=None,
        score_link=None,
        error_message=None,
    ):
        """Build a DiscoverySourceResult for ``source``.

        When ``content`` is non-empty it is parsed with
        ``parse_html_diagnostics`` (``source.url`` as base URL, ``score_link``
        as link scorer) and hashed with SHA-256, and the title is read from the
        diagnostics. Otherwise diagnostics, hash and title stay ``None``.
        ``content_length`` is ``len(content)``, a character count of the text.
        """
        content = content or ""
        diagnostics = (
            parse_html_diagnostics(
                content,
                base_url=source.url,
                score_link=score_link,
            ).to_dict()
            if content
            else None
        )
        return DiscoverySourceResult(
            source_key=source.source_key,
            name=source.name,
            url=source.url,
            campaign_type=source.campaign_type,
            status=status,
            network_requested=network_requested,
            network_executed=network_executed,
            status_code=status_code,
            content_length=len(content),
            page_hash=hashlib.sha256(content.encode("utf-8", errors="ignore")).hexdigest() if content else None,
            title=diagnostics.get("title") if diagnostics else None,
            diagnostics=diagnostics,
            error_message=error_message,
        )