Files
ewoooc/services/market_intel/mcp_fetch_run_package.py
OoO 74e19400bd
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
V10.412 add market intel MCP fetch run package gate
2026-05-24 14:00:59 +08:00

477 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 MCP manual fetch run package preview。
本模組只把已通過的 target review 轉成可審核的人工 run package
不執行 CLI、不發 HTTP request、不寫檔、不開 DB、不掛 scheduler。
"""
from services.market_intel.adapters import get_adapter_registry
from services.market_intel.mcp_fetch_target_review import (
build_mcp_fetch_target_review_preview,
)
# Upper bound accepted for ``max_total_requests`` in operator run controls.
MAX_TOTAL_REQUESTS = 20

# Upper bound accepted for ``stop_after_error_count`` in operator run controls.
MAX_STOP_AFTER_ERROR_COUNT = 3

# Relative directory prefix every ``artifact_dir`` must start with.
RUN_ARTIFACT_DIR_PREFIX = "artifacts/market_intel/manual_fetch/"

# Acknowledgement key -> label shown to the operator. Insertion order is
# significant: it drives the order of the ``ack_*`` gates in the review output.
RUN_ACKNOWLEDGEMENT_LABELS = {
    "target_review_accepted": "target review 已通過且仍需另開 run package gate",
    "command_bundle_reviewed": "操作員已覆核 command argv preview",
    "artifact_path_reviewed": "artifact path 限定於 market intel manual fetch 目錄",
    "dry_run_first": "第一次人工執行必須 dry-run first",
    "receipt_required": "每個公開入口必須貼回 receipt 才能進下一段",
    "stop_on_error": "達到錯誤門檻立即停止,不做重試風暴",
    "no_api_execution": "本 API 不執行 CLI、不發外部 request",
    "no_database_write": "本階段不得寫 DB",
    "no_scheduler_attach": "不得掛 scheduler",
}

# Payload keys that signal a side effect when their value is truthy.
# ``_blocked_side_effects`` reports the path of every such key it finds.
_BLOCKED_SIDE_EFFECT_KEYS = (
    "allow_api_execution",
    "allow_database_write",
    "allow_external_network_in_api",
    "allow_scheduler_attach",
    "api_executes_cli",
    "api_executes_docker",
    "api_executes_health_check",
    "api_executes_ssh",
    "api_opens_database_connection",
    "api_uses_external_network",
    "api_writes_database",
    "attach_scheduler",
    "cli_executed",
    "command_executed",
    "database_commit_executed",
    "database_session_created",
    "database_write_executed",
    "external_network_executed",
    "fetch_executed",
    "file_written",
    "manual_fetch_gate_opened_by_api",
    "network_request_allowed",
    "package_artifact_created",
    "run_package_persisted",
    "scheduler_attached",
    "write_database",
    "writes_executed",
    "would_write_database",
)
def _sample_operator_run_controls(target_review):
    """Return example operator run controls sized to *target_review*.

    The request budget is the review's ``source_target_count`` (default 1),
    clamped to the range ``1..MAX_TOTAL_REQUESTS``. Every acknowledgement in
    ``RUN_ACKNOWLEDGEMENT_LABELS`` is pre-set to ``True`` and every ``allow_*``
    switch is ``False``.
    """
    source_count = target_review.get("source_target_count", 1)
    request_budget = max(1, min(source_count, MAX_TOTAL_REQUESTS))
    acknowledgements = dict.fromkeys(RUN_ACKNOWLEDGEMENT_LABELS, True)
    return {
        "run_label": "market-intel-manual-fetch-sample",
        "artifact_dir": f"{RUN_ARTIFACT_DIR_PREFIX}sample_market_intel_manual_fetch",
        "max_total_requests": request_budget,
        "stop_after_error_count": 2,
        "requires_shell_operator": True,
        "dry_run_first": True,
        "write_receipt_required": True,
        "allow_api_execution": False,
        "allow_external_network_in_api": False,
        "allow_database_write": False,
        "allow_scheduler_attach": False,
        "operator_acknowledgements": acknowledgements,
    }
def _sample_run_package():
    """Assemble a sample run package from the target review's own sample.

    The sample target review package is taken from a no-argument call to
    ``build_mcp_fetch_target_review_preview``. It is then passed back through
    the same function, and that result sizes the sample operator run controls.
    """
    sample_package = build_mcp_fetch_target_review_preview()[
        "sample_target_review_package"
    ]
    reviewed_sample = build_mcp_fetch_target_review_preview(
        handoff_package=sample_package["handoff_package"],
        target_review=sample_package["target_review"],
    )
    sample_controls = _sample_operator_run_controls(reviewed_sample)
    return {
        "target_review_package": sample_package,
        "operator_run_controls": sample_controls,
    }
def _target_review_from_inputs(target_review_package, target_review_result, phase):
    """Pick the target review to base the run package on.

    A non-empty dict passed as *target_review_result* is returned unchanged;
    no validation is performed on it here. Otherwise the review is rebuilt
    from *target_review_package* (treated as empty when it is not a dict) via
    ``build_mcp_fetch_target_review_preview``, forwarding *phase*.
    """
    result_supplied = isinstance(target_review_result, dict) and bool(
        target_review_result
    )
    if result_supplied:
        return target_review_result

    package = target_review_package if isinstance(target_review_package, dict) else {}
    rebuild_kwargs = {
        "handoff_package": package.get("handoff_package", {}),
        "handoff_review": package.get("handoff_review"),
        "target_review": package.get("target_review", {}),
        "phase": phase,
    }
    return build_mcp_fetch_target_review_preview(**rebuild_kwargs)
def _blocked_side_effects(payload):
    """List the paths of truthy side-effect keys found anywhere in *payload*.

    Dicts and lists are walked depth-first. A dict key is reported when it is
    in ``_BLOCKED_SIDE_EFFECT_KEYS`` and its value is truthy. Paths use
    ``parent.child`` for dict keys and ``parent[index]`` for list items.
    Other container types are not descended into.
    """

    def walk(node, trail):
        # Yield in pre-order: a flagged key first, then whatever is beneath it.
        if isinstance(node, dict):
            for name, child in node.items():
                child_trail = f"{trail}.{name}" if trail else name
                if name in _BLOCKED_SIDE_EFFECT_KEYS and child:
                    yield child_trail
                yield from walk(child, child_trail)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from walk(child, f"{trail}[{position}]")

    return list(walk(payload, ""))
def _acknowledgement_status(operator_run_controls):
    """Report, per required acknowledgement, its label and whether it is set.

    Reads ``operator_acknowledgements`` from *operator_run_controls*; a
    missing or non-dict value counts as no acknowledgements. The result has
    one entry per key of ``RUN_ACKNOWLEDGEMENT_LABELS``, in that order.
    """
    supplied = operator_run_controls.get("operator_acknowledgements", {})
    if not isinstance(supplied, dict):
        supplied = {}

    status = {}
    for ack_key, ack_label in RUN_ACKNOWLEDGEMENT_LABELS.items():
        status[ack_key] = {
            "label": ack_label,
            "acknowledged": bool(supplied.get(ack_key)),
        }
    return status
def _to_positive_int(value):
    """Coerce *value* to a strictly positive ``int``, or return ``None``.

    Returns ``None`` when ``int(value)`` fails or the result is zero or
    negative. Anything ``int()`` accepts is allowed, so numeric strings work,
    floats are truncated and ``True`` becomes ``1``.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float infinities (``int(float("inf"))``), which
        # would otherwise escape and abort the whole review.
        return None
    return parsed if parsed > 0 else None
def _safe_artifact_dir(value):
    """Return ``True`` when *value* is an acceptable relative artifact directory.

    The value must be a string. After stripping whitespace and converting
    backslashes to forward slashes it must be non-empty, must not start with
    ``/``, must contain no ``.`` or ``..`` segment, and must start with
    ``RUN_ARTIFACT_DIR_PREFIX``.
    """
    if not isinstance(value, str):
        return False

    candidate = value.strip().replace("\\", "/")
    if candidate == "" or candidate[0] == "/":
        return False

    # Empty segments from doubled slashes can never equal "." or "..",
    # so the raw split can be checked directly.
    segments = candidate.split("/")
    if "." in segments or ".." in segments:
        return False

    return candidate.startswith(RUN_ARTIFACT_DIR_PREFIX)
def _source_index():
    """Index every registered campaign source by ``(platform_code, source_key)``.

    Each value is the source's ``to_dict()`` output with the owning adapter's
    ``platform_code`` and ``platform_name`` set on top. If two sources share a
    key, the later one wins.
    """
    catalogue = {}
    for adapter in get_adapter_registry().values():
        for source in adapter.campaign_sources():
            record = dict(source.to_dict())
            record.update(
                platform_code=adapter.platform_code,
                platform_name=adapter.platform_name,
            )
            catalogue[(adapter.platform_code, source.source_key)] = record
    return catalogue
def _receipt_path(artifact_dir, platform_code, source_key):
    """Build the receipt file path for one platform/source pair.

    The file name is ``<platform>_<source>_receipt.json``. Both ``/`` and
    ``\\`` in the platform code and source key are replaced with ``_``, so
    neither can add a directory level on any OS.

    An empty *artifact_dir* yields the bare file name rather than a path
    anchored at the filesystem root. A trailing ``/`` on *artifact_dir* does
    not produce a doubled separator.
    """

    def _as_segment(value):
        # Flatten both separator styles into a single file-name segment.
        return str(value).replace("\\", "_").replace("/", "_")

    file_name = (
        f"{_as_segment(platform_code)}_{_as_segment(source_key)}_receipt.json"
    )
    directory = str(artifact_dir) if artifact_dir else ""
    if not directory:
        return file_name
    if directory.endswith("/"):
        return f"{directory}{file_name}"
    return f"{directory}/{file_name}"
def _command_previews(target_review, operator_run_controls):
    """Build one command preview per source of every ready target.

    Nothing is executed; each entry only describes the command an operator
    would run, with its ``argv_preview`` and ``receipt_path``.

    Malformed input is skipped instead of raising, because *target_review*
    may be a caller-supplied dict:

    * ``target_summaries`` that is not a list/tuple gives an empty result;
    * targets that are not dicts, or are not ``ready``, are ignored;
    * ``source_keys`` that is not a list/tuple, and source keys that are not
      strings, are ignored.

    Args:
        target_review: Target review dict; only ``target_summaries`` is read.
        operator_run_controls: Operator controls dict; only ``artifact_dir``
            is read.

    Returns:
        A list of command preview dicts, in target and source-key order.
    """
    target_summaries = target_review.get("target_summaries")
    if not isinstance(target_summaries, (list, tuple)):
        return []

    source_index = _source_index()
    artifact_dir = operator_run_controls.get("artifact_dir") or ""
    command_previews = []
    for target in target_summaries:
        if not isinstance(target, dict) or not target.get("ready"):
            continue
        source_keys = target.get("source_keys")
        if not isinstance(source_keys, (list, tuple)):
            continue

        # Per-target values are identical for every source of the target.
        platform_code = target.get("platform_code")
        platform_name = target.get("platform_name")
        delay_seconds = target.get("delay_seconds")
        timeout_seconds = target.get("timeout_seconds")
        sample_limit = target.get("sample_limit")

        for source_key in source_keys:
            if not isinstance(source_key, str):
                continue
            # Only look up string platform codes, so an unhashable value
            # cannot raise while building the index key.
            source = (
                source_index.get((platform_code, source_key), {})
                if isinstance(platform_code, str)
                else {}
            )
            source_url = source.get("url")
            receipt_path = _receipt_path(artifact_dir, platform_code, source_key)
            command_previews.append(
                {
                    "platform_code": platform_code,
                    "platform_name": platform_name,
                    "source_key": source_key,
                    "source_url": source_url,
                    "campaign_type": source.get("campaign_type"),
                    "delay_seconds": delay_seconds,
                    "timeout_seconds": timeout_seconds,
                    "sample_limit": sample_limit,
                    "receipt_path": receipt_path,
                    "argv_preview": [
                        "python",
                        "scripts/market_intel_manual_fetch_runner.py",
                        "--platform",
                        platform_code,
                        "--source",
                        source_key,
                        "--source-url",
                        # NOTE(review): this is None when the source is not
                        # in the adapter registry, so argv can hold a
                        # non-string. Confirm whether such sources should be
                        # blocked by a gate instead.
                        source_url,
                        "--delay",
                        str(delay_seconds),
                        "--timeout",
                        str(timeout_seconds),
                        "--sample-limit",
                        str(sample_limit),
                        "--receipt-path",
                        receipt_path,
                        "--dry-run-first",
                    ],
                    "command_executed": False,
                    "external_network_executed_by_api": False,
                    "receipt_written_by_api": False,
                }
            )
    return command_previews
def _run_controls_payload(operator_run_controls):
    """Return *operator_run_controls* when it is a dict, otherwise ``{}``."""
    if isinstance(operator_run_controls, dict):
        return operator_run_controls
    return {}
def build_mcp_fetch_run_package_preview(
    *,
    target_review_package=None,
    target_review_result=None,
    operator_run_controls=None,
    phase=None,
):
    """Build the manual fetch run package review; it only emits command previews.

    The target review is taken from *target_review_result* when that is a
    non-empty dict, otherwise rebuilt from *target_review_package*. The
    operator run controls are then checked against a fixed list of gates,
    followed by one ``ack_*`` gate per required acknowledgement. The package
    is accepted only when some payload was supplied and every gate passed.

    Args:
        target_review_package: Dict with the target review inputs; any
            non-dict value is treated as empty.
        target_review_result: An already-built target review dict, used
            as-is when non-empty.
        operator_run_controls: Dict of operator controls; any non-dict value
            is treated as empty for the gate checks.
        phase: Forwarded to the target review builder and echoed in the
            result.

    Returns:
        A dict holding the gate results, blocked reasons, command previews,
        the target review and its summary, a sample run package, and a set
        of execution/persistence flags that are all ``False``.
    """
    if not isinstance(target_review_package, dict):
        target_review_package = {}
    controls = _run_controls_payload(operator_run_controls)
    target_review = _target_review_from_inputs(
        target_review_package,
        target_review_result,
        phase,
    )

    # "Received" flags use the raw arguments, so a non-dict controls value
    # still switches the output to review mode.
    target_review_received = bool(target_review_package or target_review_result)
    run_payload_received = bool(target_review_received or operator_run_controls)
    target_review_accepted = bool(
        target_review.get("mcp_fetch_target_review_accepted")
    )
    controls_received = bool(controls)

    acknowledgements = _acknowledgement_status(controls)
    acknowledgements_complete = all(
        entry["acknowledged"] for entry in acknowledgements.values()
    )

    artifact_dir = controls.get("artifact_dir")
    artifact_dir_safe = _safe_artifact_dir(artifact_dir)
    max_total_requests = _to_positive_int(controls.get("max_total_requests"))
    stop_after_error_count = _to_positive_int(
        controls.get("stop_after_error_count")
    )

    command_previews = _command_previews(target_review, controls)
    preview_count = len(command_previews)
    side_effects = _blocked_side_effects(controls) + _blocked_side_effects(
        target_review
    )

    def _none_set(*flag_names):
        # True when none of the named top-level control flags is truthy.
        return not any(controls.get(flag) for flag in flag_names)

    no_api_execution = _none_set(
        "allow_api_execution",
        "allow_external_network_in_api",
        "api_executes_cli",
        "api_uses_external_network",
        "fetch_executed",
        "network_request_allowed",
    )
    no_database_write = _none_set(
        "allow_database_write",
        "write_database",
        "api_writes_database",
        "database_write_executed",
    )
    no_scheduler_attach = _none_set(
        "allow_scheduler_attach",
        "attach_scheduler",
        "scheduler_attached",
    )

    # The budget must respect both the global cap and the preview count.
    command_budget_ok = bool(
        max_total_requests is not None
        and max_total_requests <= MAX_TOTAL_REQUESTS
        and max_total_requests <= max(1, preview_count)
    )
    error_stop_ok = bool(
        stop_after_error_count is not None
        and stop_after_error_count <= MAX_STOP_AFTER_ERROR_COUNT
    )

    # (key, passed, label) rows; order is preserved in the output.
    gate_rows = (
        (
            "target_review_payload_or_result_received",
            target_review_received,
            "已提供 target review package 或已審核結果",
        ),
        (
            "target_review_accepted",
            target_review_accepted,
            "target review 已通過,平台與來源白名單可供 run package 使用",
        ),
        (
            "operator_run_controls_received",
            controls_received,
            "已提供操作員 run controls",
        ),
        (
            "operator_acknowledgements_complete",
            acknowledgements_complete,
            "操作員已確認 command、artifact、dry-run、receipt 與無副作用邊界",
        ),
        (
            "artifact_dir_safe",
            artifact_dir_safe,
            "artifact_dir 必須在 market intel manual fetch 相對目錄內",
        ),
        (
            "command_preview_built_from_targets",
            bool(command_previews),
            "command preview 只由已通過的 target summaries 生成",
        ),
        (
            "request_budget_within_target_limit",
            command_budget_ok,
            "max_total_requests 不得超過來源數與全域安全上限",
        ),
        (
            "stop_after_error_count_safe",
            error_stop_ok,
            "錯誤停止門檻需維持小於等於 3",
        ),
        (
            "dry_run_first_confirmed",
            controls.get("dry_run_first") is True,
            "人工執行必須先 dry-run first",
        ),
        (
            "receipt_required",
            controls.get("write_receipt_required") is True,
            "每個來源必須產生 receipt回貼後才可進下一段",
        ),
        (
            "shell_operator_required",
            controls.get("requires_shell_operator") is True,
            "正式抓取只能由操作員 shell 命令執行,不由 API 執行",
        ),
        (
            "no_api_fetch_execution",
            no_api_execution,
            "本 API 不執行 CLI、不發外部 request",
        ),
        (
            "no_database_write",
            no_database_write,
            "本階段不寫 DB、不 commit",
        ),
        (
            "no_scheduler_attach",
            no_scheduler_attach,
            "本階段不掛 scheduler",
        ),
        (
            "run_package_side_effect_free",
            not side_effects,
            "run package review 不執行 CLI、不寫檔、不連外、不開 DB",
        ),
    )
    gates = [
        {"key": gate_key, "passed": gate_passed, "label": gate_label}
        for gate_key, gate_passed, gate_label in gate_rows
    ]
    # One extra gate per acknowledgement, appended after the fixed gates.
    gates += [
        {
            "key": f"ack_{ack_key}",
            "passed": entry["acknowledged"],
            "label": entry["label"],
        }
        for ack_key, entry in acknowledgements.items()
    ]

    blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
    passed_gate_count = len(gates) - len(blocked_reasons)
    accepted = bool(run_payload_received and not blocked_reasons)

    if run_payload_received:
        mode = "mcp_fetch_run_package_review"
    else:
        mode = "mcp_fetch_run_package_preview"

    target_review_summary = {
        "mode": target_review.get("mode"),
        "accepted": target_review_accepted,
        "platform_target_count": target_review.get("platform_target_count", 0),
        "source_target_count": target_review.get("source_target_count", 0),
        "blocked_reasons": target_review.get("blocked_reasons", []),
        "ready_for_manual_fetch_run_package_review": bool(
            target_review.get("ready_for_manual_fetch_run_package_review")
        ),
    }

    return {
        "mode": mode,
        "phase": phase,
        "run_payload_received": run_payload_received,
        "target_review_received": target_review_received,
        "target_review_accepted": target_review_accepted,
        "operator_run_controls_received": controls_received,
        "operator_acknowledgements_complete": acknowledgements_complete,
        "mcp_fetch_run_package_accepted": accepted,
        "ready_for_manual_fetch_run_readiness_review": accepted,
        "ready_for_manual_fetch_operator_run": False,
        "manual_fetch_gate_opened_by_api": False,
        "network_request_allowed": False,
        "fetch_executed": False,
        "cli_executed": False,
        "database_write_executed": False,
        "scheduler_attached": False,
        "command_preview_count": preview_count,
        "max_total_requests": max_total_requests,
        "stop_after_error_count": stop_after_error_count,
        "artifact_dir": artifact_dir,
        "artifact_dir_safe": artifact_dir_safe,
        "blocked_side_effects": side_effects,
        "gate_count": len(gates),
        "passed_gate_count": passed_gate_count,
        "blocked_reasons": blocked_reasons,
        "gates": gates,
        "operator_acknowledgement_status": acknowledgements,
        "command_previews": command_previews,
        "target_review_summary": target_review_summary,
        "mcp_fetch_target_review": target_review,
        "sample_run_package": _sample_run_package(),
        "next_operator_steps": [
            "若 run package review 通過,只代表可進下一段 run readiness不代表 API 可執行抓取",
            "操作員需在 shell 依 argv preview 人工執行,並貼回 receipt 供下一段審核",
            "正式 DB write、scheduler attach、Telegram/AI 摘要仍需各自獨立 gate",
        ],
        "payload_persisted": False,
        "run_package_persisted": False,
        "command_preview_persisted": False,
        "package_artifact_created": False,
        "api_executes_health_check": False,
        "api_executes_docker": False,
        "api_executes_ssh": False,
        "api_executes_cli": False,
        "api_opens_database_connection": False,
        "api_writes_database": False,
        "api_uses_external_network": False,
        "database_session_created": False,
        "database_commit_executed": False,
        "external_network_executed": False,
        "file_written": False,
        "writes_executed": False,
        "would_write_database": False,
    }