Files
ewoooc/services/market_intel/service.py
OoO 07b76870c9
All checks were successful
CD Pipeline / deploy (push) Successful in 1m1s
feat(market-intel): add internal mcp contract
2026-05-18 14:42:25 +08:00

580 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 Phase 1 骨架服務。
本階段只回報 feature flag 與 rollout 狀態,不啟動爬蟲、不寫資料庫。
"""
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from uuid import uuid4
from config import (
MARKET_INTEL_CRAWLER_ENABLED,
MARKET_INTEL_ENABLED,
MARKET_INTEL_WRITE_ENABLED,
)
from services.market_intel.adapters import (
get_adapter,
get_adapter_registry,
get_adapter_summaries,
)
from services.market_intel.candidate_preview import build_candidate_preview_from_discovery
from services.market_intel.discovery_runner import ManualDiscoveryRunner
from services.market_intel.legacy_source_bridge import build_legacy_source_bridge_plan
from services.market_intel.mcp_contract import build_mcp_tool_contract_preview
from services.market_intel.mcp_readiness import build_mcp_readiness_plan
from services.market_intel.migration_blueprint import build_migration_blueprint
from services.market_intel.platform_seed import build_platform_seed_rows
from services.market_intel.platform_seed_db_diff import build_platform_seed_db_diff_plan
from services.market_intel.platform_seed_writer import (
build_platform_seed_writer_plan,
build_schema_smoke,
)
from services.market_intel.seed_writer_cli import build_seed_writer_cli_plan
from services.market_intel.schema_db_probe import build_schema_db_probe_plan
from services.market_intel.write_approval_runbook import build_write_approval_runbook
TAIPEI_TZ = timezone(timedelta(hours=8))
MARKET_INTEL_TABLES = (
"market_platforms",
"market_campaigns",
"market_campaign_snapshots",
"market_campaign_products",
"market_product_price_history",
"market_product_matches",
"market_crawler_runs",
)
@dataclass(frozen=True)
class MarketIntelRuntimeStatus:
"""市場情報模組目前的啟用狀態。"""
phase: str
enabled: bool
crawler_enabled: bool
write_enabled: bool
dry_run_only: bool
scheduler_attached: bool
database_write_allowed: bool
def to_dict(self):
return asdict(self)
class MarketIntelService:
"""市場情報入口服務,先集中 feature gate 與安全狀態。"""
phase = "phase_29_internal_mcp_contract_preview"
def get_runtime_status(self) -> MarketIntelRuntimeStatus:
return MarketIntelRuntimeStatus(
phase=self.phase,
enabled=MARKET_INTEL_ENABLED,
crawler_enabled=MARKET_INTEL_CRAWLER_ENABLED,
write_enabled=MARKET_INTEL_WRITE_ENABLED,
dry_run_only=not MARKET_INTEL_WRITE_ENABLED,
scheduler_attached=False,
database_write_allowed=(
MARKET_INTEL_ENABLED
and MARKET_INTEL_CRAWLER_ENABLED
and MARKET_INTEL_WRITE_ENABLED
),
)
def manual_fetch_allowed(self):
status = self.get_runtime_status()
return bool(status.enabled and status.crawler_enabled)
def get_schema_tables(self):
"""回傳 ADR-035 定義的 market_* schema 名稱。"""
return list(MARKET_INTEL_TABLES)
def get_adapter_summaries(self):
"""回傳目前已註冊 adapter不觸發網路。"""
return get_adapter_summaries()
def build_dry_run_plan(self, platform_code="all"):
"""建立 dry-run 計畫,不執行爬蟲、不寫 DB。"""
status = self.get_runtime_status()
adapter_registry = get_adapter_registry()
return {
"batch_id": f"market-dry-run-{uuid4().hex[:12]}",
"platform_code": platform_code,
"created_at": datetime.now(TAIPEI_TZ).replace(tzinfo=None).isoformat(),
"phase": self.phase,
"would_discover_campaigns": bool(status.enabled and status.crawler_enabled),
"would_write_database": bool(status.database_write_allowed),
"scheduler_attached": status.scheduler_attached,
"manual_fetch_allowed": self.manual_fetch_allowed(),
"schema_tables": self.get_schema_tables(),
"adapter_count": len(adapter_registry),
"adapters": [adapter.summary() for adapter in adapter_registry.values()],
"status": status.to_dict(),
}
def build_discovery_plan(self, platform_code="all"):
"""建立平台 discovery dry-run plan不發 request、不寫 DB。"""
if platform_code and platform_code != "all":
adapter = get_adapter(platform_code)
if not adapter:
return {
"platform_code": platform_code,
"found": False,
"plans": [],
"error": "未知平台 adapter",
}
return {
"platform_code": platform_code,
"found": True,
"plans": [adapter.build_discovery_plan()],
}
return {
"platform_code": "all",
"found": True,
"plans": [
adapter.build_discovery_plan()
for adapter in get_adapter_registry().values()
],
}
def run_manual_discovery(self, platform_code="all", *, fetch=False, http_get=None):
"""手動執行 discovery dry-run預設不發 request永遠不寫 DB。"""
registry = get_adapter_registry()
adapters = []
if platform_code and platform_code != "all":
adapter = get_adapter(platform_code)
if not adapter:
return {
"platform_code": platform_code,
"found": False,
"runs": [],
"error": "未知平台 adapter",
}
adapters = [adapter]
else:
adapters = list(registry.values())
runner = ManualDiscoveryRunner(
runtime_status=self.get_runtime_status(),
http_get=http_get,
)
return {
"platform_code": platform_code or "all",
"found": True,
"fetch_requested": bool(fetch),
"manual_fetch_allowed": self.manual_fetch_allowed(),
"runs": [
runner.run(adapter, fetch=fetch).to_dict()
for adapter in adapters
],
}
def build_candidate_preview(
self,
platform_code="all",
*,
fetch=False,
min_band="all",
limit=50,
http_get=None,
):
"""聚合候選連結 preview只供人工審核不寫 DB。"""
discovery_result = self.run_manual_discovery(
platform_code=platform_code,
fetch=fetch,
http_get=http_get,
)
preview = build_candidate_preview_from_discovery(
discovery_result,
min_band=min_band,
limit=limit,
)
preview["discovery_found"] = bool(discovery_result.get("found"))
preview["error"] = discovery_result.get("error")
return preview
def build_platform_seed_plan(self, platform_code="all"):
"""建立 market_platforms 初始資料計畫,不寫入 DB。"""
status = self.get_runtime_status()
seed_rows = build_platform_seed_rows(platform_code=platform_code)
found = bool(seed_rows) or platform_code in (None, "", "all")
return {
"platform_code": platform_code or "all",
"found": found,
"phase": self.phase,
"seed_count": len(seed_rows),
"seeds": seed_rows,
"would_write_database": False,
"database_write_allowed": bool(status.database_write_allowed),
"required_gates": {
"market_intel_enabled": bool(status.enabled),
"market_intel_write_enabled": bool(status.write_enabled),
"schema_smoke_required": True,
"migration_required": True,
"manual_operator_approval_required": True,
},
"status": status.to_dict(),
"error": None if found else "未知平台 adapter",
}
def build_platform_seed_write_guard(self, platform_code="all"):
"""回報 platform seed 寫入前置 gate本方法不執行寫入。"""
status = self.get_runtime_status()
seed_plan = self.build_platform_seed_plan(platform_code=platform_code)
schema_smoke = build_schema_smoke(MARKET_INTEL_TABLES)
guard_checks = {
"seed_plan_found": bool(seed_plan["found"]),
"has_seed_rows": bool(seed_plan["seed_count"]),
"market_intel_enabled": bool(status.enabled),
"market_intel_write_enabled": bool(status.write_enabled),
"database_write_allowed": bool(status.database_write_allowed),
"migration_confirmed": False,
"schema_smoke_confirmed": bool(schema_smoke["passed"]),
"manual_operator_approval": False,
}
blocked_reasons = [
name for name, passed in guard_checks.items()
if not passed
]
return {
"platform_code": platform_code or "all",
"phase": self.phase,
"seed_count": seed_plan["seed_count"],
"ready_to_write": False,
"would_write_database": False,
"database_write_allowed": bool(status.database_write_allowed),
"guard_checks": guard_checks,
"blocked_reasons": blocked_reasons,
"write_action": "blocked_dry_run_only",
"schema_smoke": schema_smoke,
"seed_plan": seed_plan,
}
def build_schema_smoke(self):
"""回報 market_intel ORM metadata smoke 結果,不查詢 DB。"""
return {
"phase": self.phase,
"schema_smoke": build_schema_smoke(MARKET_INTEL_TABLES),
"expected_tables": self.get_schema_tables(),
}
def build_schema_db_probe(self, *, execute_requested=False):
"""回報正式 DB schema 只讀探針;預設不連 DB。"""
probe = build_schema_db_probe_plan(
MARKET_INTEL_TABLES,
execute_requested=execute_requested,
)
probe["phase"] = self.phase
return probe
def build_platform_seed_db_diff(self, platform_code="all", *, execute_requested=False):
"""回報 platform seed 與 DB 的只讀差異;預設不連 DB。"""
seed_plan = self.build_platform_seed_plan(platform_code=platform_code)
diff = build_platform_seed_db_diff_plan(
seed_plan,
execute_requested=execute_requested,
)
diff["phase"] = self.phase
diff["platform_code"] = platform_code or "all"
diff["seed_plan_found"] = bool(seed_plan["found"])
return diff
def build_legacy_source_bridge(
self,
*,
execute_requested=False,
sample_limit=5,
engine=None,
database_url=None,
database_type=None,
):
"""回報既有 EDM/PChome 資料源橋接 preview預設不連 DB。"""
bridge = build_legacy_source_bridge_plan(
execute_requested=execute_requested,
sample_limit=sample_limit,
engine=engine,
database_url=database_url,
database_type=database_type,
)
bridge["phase"] = self.phase
return bridge
def build_mcp_readiness(
self,
*,
execute_requested=False,
timeout_sec=3,
engine=None,
database_url=None,
database_type=None,
):
"""回報市場情報 MCP 整合就緒度;預設不連 MCP server、不查 DB。"""
readiness = build_mcp_readiness_plan(
execute_requested=execute_requested,
timeout_sec=timeout_sec,
engine=engine,
database_url=database_url,
database_type=database_type,
)
readiness["phase"] = self.phase
return readiness
def build_mcp_tool_contract(self):
"""回報市場情報內部 MCP tool contract不呼叫 MCP、不查 DB。"""
contract = build_mcp_tool_contract_preview()
contract["phase"] = self.phase
return contract
def build_platform_seed_writer_plan(self, platform_code="all"):
"""建立 platform seed writer dry-run plan不建立 DB session。"""
seed_plan = self.build_platform_seed_plan(platform_code=platform_code)
write_guard = self.build_platform_seed_write_guard(platform_code=platform_code)
schema_smoke = write_guard["schema_smoke"]
writer_plan = build_platform_seed_writer_plan(
seed_plan=seed_plan,
write_guard=write_guard,
schema_smoke=schema_smoke,
)
writer_plan["phase"] = self.phase
writer_plan["platform_code"] = platform_code or "all"
writer_plan["seed_plan_found"] = bool(seed_plan["found"])
writer_plan["seed_count"] = seed_plan["seed_count"]
return writer_plan
def build_write_approval_runbook(self, platform_code="all"):
"""建立正式 seed writer 前的人工批准 runbook本方法不執行寫入。"""
status = self.get_runtime_status()
seed_plan = self.build_platform_seed_plan(platform_code=platform_code)
write_guard = self.build_platform_seed_write_guard(platform_code=platform_code)
writer_plan = self.build_platform_seed_writer_plan(platform_code=platform_code)
return build_write_approval_runbook(
phase=self.phase,
status=status,
schema_smoke=write_guard["schema_smoke"],
seed_plan=seed_plan,
write_guard=write_guard,
writer_plan=writer_plan,
)
def build_migration_blueprint(self):
"""建立 market_intel migration 與 seed writer 命令草案;不執行 SQL。"""
blueprint = build_migration_blueprint(self.get_schema_tables())
blueprint["phase"] = self.phase
return blueprint
def build_seed_writer_cli_status(
self,
platform_code="all",
*,
execute_requested=False,
approval_token=None,
approval_token_secret=None,
apply_real_write=False,
engine=None,
database_url=None,
database_type=None,
):
"""建立 seed writer CLI status只有 CLI 明確批准時才寫入。"""
seed_plan = self.build_platform_seed_plan(platform_code=platform_code)
write_guard = self.build_platform_seed_write_guard(platform_code=platform_code)
writer_plan = self.build_platform_seed_writer_plan(platform_code=platform_code)
migration_blueprint = self.build_migration_blueprint()
status = build_seed_writer_cli_plan(
platform_code=platform_code or "all",
execute_requested=execute_requested,
approval_token=approval_token,
approval_token_secret=approval_token_secret,
apply_real_write=apply_real_write,
seed_plan=seed_plan,
write_guard=write_guard,
writer_plan=writer_plan,
migration_blueprint=migration_blueprint,
engine=engine,
database_url=database_url,
database_type=database_type,
)
status["phase"] = self.phase
return status
def build_deployment_readiness(self):
"""建立市場情報推版準備狀態;不執行 git、部署或遠端操作。"""
status = self.get_runtime_status()
schema_smoke = build_schema_smoke(MARKET_INTEL_TABLES)
writer_plan = self.build_platform_seed_writer_plan()
checks = {
"schema_smoke_passed": bool(schema_smoke["passed"]),
"feature_flags_default_safe": bool(
not status.enabled
and not status.crawler_enabled
and not status.write_enabled
),
"database_write_blocked": bool(not status.database_write_allowed),
"scheduler_detached": bool(not status.scheduler_attached),
"manual_fetch_disabled": bool(not self.manual_fetch_allowed()),
"writer_plan_dry_run_only": bool(
writer_plan["mode"] == "dry_run"
and not writer_plan["writes_executed"]
and not writer_plan["would_write_database"]
),
"registered_adapters_present": bool(len(self.get_adapter_summaries()) >= 4),
"schema_db_probe_planned_safe": bool(
not self.build_schema_db_probe()["read_only_query_executed"]
),
"platform_seed_db_diff_planned_safe": bool(
not self.build_platform_seed_db_diff()["read_only_query_executed"]
),
"legacy_source_bridge_planned_safe": bool(
not self.build_legacy_source_bridge()["read_only_query_executed"]
),
"mcp_readiness_planned_safe": bool(
self.build_mcp_readiness()["mode"] == "mcp_readiness_planned"
),
"mcp_tool_contract_ready": bool(
self.build_mcp_tool_contract()["contract_ready"]
),
}
ready_for_production_deploy = all(checks.values())
blocked_reasons = [
reason for reason, blocked in (
("readiness_checks_not_all_passed", not ready_for_production_deploy),
("production_deploy_not_executed_by_api", True),
("git_commit_not_created_by_api", True),
("git_push_not_executed_by_api", True),
("backup_must_be_verified_by_operator", True),
("production_smoke_must_be_verified_by_operator", True),
)
if blocked
]
required_manual_steps = [
{
"key": "review_worktree_scope",
"label": "審核 worktree只納入市場情報相關變更排除 unrelated dirty files",
"status": "required",
},
{
"key": "run_backup_system",
"label": "重大更新前執行 python backup_system.py",
"status": "required",
},
{
"key": "commit_market_intel_changes_only",
"label": "只 commit 市場情報模組、ADR/TODO 與必要測試",
"status": "operator_optional",
},
{
"key": "push_reviewed_branch_or_main",
"label": "推送已審核分支或 main再進入部署 SOP",
"status": "operator_optional",
},
{
"key": "run_deployment_sop",
"label": "依 deployment SOP app-only 部署,不碰 momo-db",
"status": "required",
},
{
"key": "verify_health_endpoint",
"label": "部署後先驗證 /health不使用首頁作為探測",
"status": "required",
},
{
"key": "verify_market_intel_page_after_deploy",
"label": "驗證 /market_intel 與市場情報 API 仍維持 blocked dry-run",
"status": "required",
},
]
fallback_plan = [
{
"key": "feature_flag_kill_switch",
"label": "MARKET_INTEL_ENABLED、MARKET_INTEL_CRAWLER_ENABLED、MARKET_INTEL_WRITE_ENABLED 保持全關,可立即停用新功能面",
"trigger": "任何 UI/API 異常或非預期連外行為",
},
{
"key": "app_only_rollback",
"label": "回退到上一個已知正常版本後,只 recreate momo-app避免影響 momo-db 資料生命週期",
"trigger": "部署後 /health 或 /market_intel smoke 失敗",
},
{
"key": "scheduler_detached",
"label": "市場情報 scheduler 尚未掛載;異常時不需停爬蟲排程,因為本階段沒有排程入口",
"trigger": "排程或外部流量疑慮",
},
{
"key": "database_write_blocked",
"label": "writer 仍是 preview_only_no_session_no_commit異常時不需要 DB rollback",
"trigger": "seed writer 或 schema smoke 異常",
},
]
safe_deploy_boundaries = [
{
"key": "no_remove_orphans",
"label": "禁止使用 docker compose --remove-orphans",
},
{
"key": "no_momo_db_lifecycle_change",
"label": "禁止 stop/rm/recreate momo-db 或變更資料生命週期",
},
{
"key": "health_probe_only",
"label": "HTTP health / blackbox / CD 探測只打 /health",
},
{
"key": "flags_default_off",
"label": "市場情報三個 feature flags 預設維持 OFF",
},
]
return {
"phase": self.phase,
"mode": "app_only_release_gate",
"production_deployed": False,
"git_committed": False,
"git_pushed": False,
"ready_for_production_deploy": ready_for_production_deploy,
"deployment_actions_executed": False,
"execution_boundary": {
"api_executes_git": False,
"api_executes_backup": False,
"api_executes_scp": False,
"api_executes_ssh": False,
"api_recreates_container": False,
"api_runs_migration": False,
"api_writes_database": False,
},
"checks": checks,
"blocked_reasons": blocked_reasons,
"requires_backup_before_major_update": True,
"backup_command": "python backup_system.py",
"required_manual_steps": required_manual_steps,
"fallback_plan": fallback_plan,
"safe_deploy_boundaries": safe_deploy_boundaries,
"production_smoke_targets": [
"/health",
"/market_intel",
"/api/market_intel/status",
"/api/market_intel/deployment_readiness",
"/api/market_intel/schema_smoke",
"/api/market_intel/schema_db_probe",
"/api/market_intel/platform_seed_db_diff",
"/api/market_intel/legacy_source_bridge",
"/api/market_intel/mcp_readiness",
"/api/market_intel/mcp_tool_contract",
],
"status": status.to_dict(),
"schema_smoke": schema_smoke,
"writer_plan_summary": {
"operation_count": writer_plan["operation_count"],
"writes_executed": writer_plan["writes_executed"],
"would_write_database": writer_plan["would_write_database"],
},
"write_approval_runbook": self.build_write_approval_runbook(),
"migration_blueprint": self.build_migration_blueprint(),
"seed_writer_cli_status": self.build_seed_writer_cli_status(),
"schema_db_probe": self.build_schema_db_probe(),
"platform_seed_db_diff": self.build_platform_seed_db_diff(),
"legacy_source_bridge": self.build_legacy_source_bridge(),
"mcp_readiness": self.build_mcp_readiness(),
"mcp_tool_contract": self.build_mcp_tool_contract(),
}