Files
ewoooc/services/market_intel/mcp_fetch_run_receipt.py
OoO 81701e85f4
All checks were successful
CD Pipeline / deploy (push) Successful in 1m12s
V10.488 add market intel MCP fetch receipt gate
2026-05-29 11:17:37 +08:00

745 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 MCP manual fetch run receipt review preview。
本模組只審核操作員 shell 手動 dry-run fetch 後貼回的 receipt
不執行 CLI、不發 HTTP request、不寫檔、不開 DB、不掛 scheduler。
"""
import ipaddress
from urllib.parse import urlparse
from services.market_intel.mcp_fetch_run_package import (
MAX_TOTAL_REQUESTS,
RUN_ARTIFACT_DIR_PREFIX,
)
from services.market_intel.mcp_fetch_run_readiness import (
build_mcp_fetch_run_readiness_preview,
)
# Substrings that mark a dict key in a submitted receipt as secret-bearing.
# Keys are lowercased and matched by substring in
# _contains_forbidden_secret_key, so "token" also catches e.g. "access_token".
# A hit fails the receipt_no_secret_or_token_key gate.
FORBIDDEN_SECRET_KEYS = (
    "approval_token",
    "approval-token",
    "api_key",
    "authorization",
    "bearer",
    "client_secret",
    "cookie",
    "password",
    "secret",
    "session_cookie",
    "token",
)
# Lowercased key names that contain a forbidden substring but are exempt when
# their value is a bool: they are operator confirmations, not secrets
# (e.g. "operator_confirmed_no_secret_payload" contains "secret").
SAFE_SECRET_METADATA_KEYS = {
    "operator_confirmed_no_secret_payload",
}
# Side-effect flag names that must not be truthy anywhere in a submitted
# receipt; _blocked_side_effects reports the path of each truthy occurrence
# and the receipt_side_effect_free gate fails when any are reported.
_BLOCKED_SIDE_EFFECT_KEYS = (
    "allow_api_execution",
    "allow_database_write",
    "allow_external_network_in_api",
    "allow_scheduler_attach",
    "api_executed_cli",
    "api_executes_cli",
    "api_executes_docker",
    "api_executes_health_check",
    "api_executes_ssh",
    "api_opens_database_connection",
    "api_uses_external_network",
    "api_writes_database",
    "attach_scheduler",
    "cli_executed",
    "command_executed",
    "database_commit_executed",
    "database_session_created",
    "database_write_executed",
    "external_network_executed",
    "external_network_executed_by_api",
    "fetch_executed",
    "fetch_executed_by_api",
    "file_written",
    "manual_fetch_gate_opened_by_api",
    "network_request_allowed",
    "receipt_file_written_by_api",
    "receipt_persisted_by_api",
    "run_receipt_file_written",
    "run_receipt_persisted",
    "scheduler_attached",
    "write_database",
    "writes_executed",
    "would_write_database",
)
def _as_dict(value):
return value if isinstance(value, dict) else {}
def _as_list(value):
if value is None:
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _safe_int(value):
try:
return int(value or 0)
except (TypeError, ValueError):
return 0
def _has_text(value):
return bool(isinstance(value, str) and value.strip())
def _safe_text(value, limit=500):
if value is None:
return None
text = str(value).strip()
return text[:limit] if text else None
def _safe_artifact_path(value, *, require_json=False):
if not isinstance(value, str):
return False
normalized = value.strip().replace("\\", "/")
if not normalized or normalized.startswith("/"):
return False
parts = [part for part in normalized.split("/") if part]
if any(part in (".", "..") for part in parts):
return False
if not normalized.startswith(RUN_ARTIFACT_DIR_PREFIX):
return False
return not require_json or normalized.endswith(".json")
def _is_forbidden_secret_key_name(key_name):
    """Return True when a lowercased key name looks like it carries a secret.

    ``-`` and spaces are folded to ``_`` before substring matching against
    ``FORBIDDEN_SECRET_KEYS``. Multi-word entries such as ``api_key`` are also
    matched with all separators removed, so ``x-api-key`` and ``apiKey`` are
    caught as well.
    """
    snake_key = key_name.replace("-", "_").replace(" ", "_")
    compact_key = snake_key.replace("_", "")
    for secret_key in FORBIDDEN_SECRET_KEYS:
        snake_secret = secret_key.replace("-", "_")
        if snake_secret in snake_key:
            return True
        # Only compact multi-word secrets; compacting single words would match
        # across unrelated word boundaries and add false positives.
        if "_" in snake_secret and snake_secret.replace("_", "") in compact_key:
            return True
    return False


def _contains_forbidden_secret_key(value):
    """Return True if any dict key nested anywhere in *value* names a secret.

    Walks dicts and sequences (lists and tuples; ``_as_list`` accepts tuples
    for receipt ``sources``, so they must be scanned too). Keys listed in
    ``SAFE_SECRET_METADATA_KEYS`` are exempt only when their value is a bool.
    """
    if isinstance(value, dict):
        for key, nested in value.items():
            normalized_key = str(key).lower()
            if normalized_key in SAFE_SECRET_METADATA_KEYS and isinstance(nested, bool):
                continue
            if _is_forbidden_secret_key_name(normalized_key):
                return True
            if _contains_forbidden_secret_key(nested):
                return True
    elif isinstance(value, (list, tuple)):
        return any(_contains_forbidden_secret_key(item) for item in value)
    return False
def _blocked_side_effects(payload):
    """Return the paths of every truthy side-effect flag nested in *payload*.

    Walks dicts and sequences (lists and tuples; ``_as_list`` accepts tuples
    for receipt ``sources``, and no other gate checks per-source side-effect
    flags, so tuples must be scanned too). Key names are compared after
    trimming, lowercasing and folding ``-`` to ``_``, so spelling variants like
    ``"API-Executed-CLI"`` are still reported. Paths use ``a.b[0].c`` notation
    and are always strings.
    """
    found = []

    def visit(value, path):
        if isinstance(value, dict):
            for key, item in value.items():
                key_text = str(key)
                key_path = f"{path}.{key_text}" if path else key_text
                normalized = key_text.strip().lower().replace("-", "_")
                if normalized in _BLOCKED_SIDE_EFFECT_KEYS and bool(item):
                    found.append(key_path)
                visit(item, key_path)
        elif isinstance(value, (list, tuple)):
            for index, item in enumerate(value):
                visit(item, f"{path}[{index}]")

    visit(payload, "")
    return found
def _is_public_http_url(value):
if not isinstance(value, str):
return False
parsed = urlparse(value.strip())
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
return False
if parsed.username or parsed.password:
return False
host = (parsed.hostname or "").lower()
if host in {"localhost", "127.0.0.1", "::1"}:
return False
try:
ip = ipaddress.ip_address(host)
except ValueError:
return bool(host and "." in host)
return not (
ip.is_private
or ip.is_loopback
or ip.is_link_local
or ip.is_reserved
or ip.is_multicast
)
def _run_readiness_from_inputs(run_readiness_package, run_readiness_result, phase):
if isinstance(run_readiness_result, dict) and run_readiness_result:
return run_readiness_result
run_readiness_package = _as_dict(run_readiness_package)
return build_mcp_fetch_run_readiness_preview(
run_package=(
run_readiness_package.get("run_package")
or run_readiness_package.get("fetch_run_package")
or run_readiness_package.get("run_package_payload")
),
run_package_result=(
run_readiness_package.get("run_package_result")
or run_readiness_package.get("mcp_fetch_run_package")
),
operator_readiness=(
run_readiness_package.get("operator_readiness")
or run_readiness_package.get("operator_run_readiness")
),
phase=phase,
)
def _command_index(run_readiness, run_readiness_package):
    """Index fetch commands by ``(platform_code, source_key)``.

    Entries from the run package result's ``command_previews`` seed the index.
    Entries from the readiness result's ``command_readiness`` then replace
    them with merged values:
    - source_url, sample_limit, delay_seconds, timeout_seconds: preview value
      first, readiness value as fallback;
    - receipt_path: readiness value first, preview value as fallback;
    - argv_preview_count: readiness argv length when non-zero, else the
      preview count (0 if none).
    Entries missing either part of the key are skipped.
    """
    readiness = _as_dict(run_readiness)
    package = _as_dict(run_readiness_package)
    package_result = _as_dict(
        package.get("run_package_result") or package.get("mcp_fetch_run_package")
    )
    index = {}

    for raw_preview in _as_list(package_result.get("command_previews")):
        preview = _as_dict(raw_preview)
        platform_code = preview.get("platform_code")
        source_key = preview.get("source_key")
        if not (platform_code and source_key):
            continue
        index[(platform_code, source_key)] = {
            "platform_code": platform_code,
            "source_key": source_key,
            "source_url": preview.get("source_url"),
            "receipt_path": preview.get("receipt_path"),
            "sample_limit": preview.get("sample_limit"),
            "delay_seconds": preview.get("delay_seconds"),
            "timeout_seconds": preview.get("timeout_seconds"),
            "argv_preview_count": len(_as_list(preview.get("argv_preview"))),
        }

    for raw_entry in _as_list(readiness.get("command_readiness")):
        entry = _as_dict(raw_entry)
        platform_code = entry.get("platform_code")
        source_key = entry.get("source_key")
        if not (platform_code and source_key):
            continue
        previous = index.get((platform_code, source_key), {})
        index[(platform_code, source_key)] = {
            "platform_code": platform_code,
            "source_key": source_key,
            "source_url": previous.get("source_url") or entry.get("source_url"),
            "receipt_path": entry.get("receipt_path") or previous.get("receipt_path"),
            "sample_limit": previous.get("sample_limit") or entry.get("sample_limit"),
            "delay_seconds": previous.get("delay_seconds") or entry.get("delay_seconds"),
            "timeout_seconds": (
                previous.get("timeout_seconds") or entry.get("timeout_seconds")
            ),
            "argv_preview_count": (
                len(_as_list(entry.get("argv_preview")))
                or previous.get("argv_preview_count", 0)
            ),
        }

    return index
def _sample_run_receipt_package():
    """Build an example input bundle for this receipt review.

    Takes the readiness module's own sample package, re-reviews it, and
    fabricates one ``dry_run_completed`` receipt source per indexed command,
    with summary totals derived from the number of sources.
    """
    sample_readiness_package = build_mcp_fetch_run_readiness_preview()[
        "sample_run_readiness_package"
    ]
    reviewed_readiness = build_mcp_fetch_run_readiness_preview(
        run_package=sample_readiness_package["run_package"],
        run_package_result=sample_readiness_package["run_package_result"],
        operator_readiness=sample_readiness_package["operator_readiness"],
    )
    indexed_commands = _command_index(reviewed_readiness, sample_readiness_package)
    sample_sources = [
        {
            "platform_code": command["platform_code"],
            "source_key": command["source_key"],
            "source_url": command["source_url"],
            "receipt_path": command["receipt_path"],
            "status": "dry_run_completed",
            "executed_by": "operator_shell",
            "dry_run_first": True,
            "request_count": 1,
            "success_count": 1,
            "error_count": 0,
            "http_status_codes": [200],
            "campaign_candidate_count": 1,
            "product_candidate_count": 0,
            "api_executed_cli": False,
            "external_network_executed_by_api": False,
            "database_write_executed": False,
            "scheduler_attached": False,
        }
        for command in indexed_commands.values()
    ]
    receipt_artifact_path = sample_readiness_package["operator_readiness"][
        "dry_run_receipt_path"
    ]
    source_total = len(sample_sources)
    sample_receipt = {
        "receipt_id": "market-intel-manual-fetch-sample-receipt",
        "run_label": "market-intel-manual-fetch-sample",
        "receipt_artifact_path": receipt_artifact_path,
        "executed_by": "operator_shell",
        "dry_run_first": True,
        "operator_shell_external_network_executed": True,
        "operator_confirmed_public_pages_only": True,
        "operator_confirmed_no_login_no_antibot": True,
        "operator_confirmed_no_secret_payload": True,
        "operator_confirmed_no_api_execution": True,
        "operator_confirmed_no_database_write": True,
        "operator_confirmed_no_scheduler_attach": True,
        "summary": {
            "source_count": source_total,
            "request_count": source_total,
            "success_count": source_total,
            "error_count": 0,
        },
        "sources": sample_sources,
    }
    return {
        "run_readiness_package": sample_readiness_package,
        "run_readiness_result": reviewed_readiness,
        "manual_fetch_receipt": sample_receipt,
    }
def _readiness_summary(run_readiness):
    """Condense a run-readiness result into the fields the receipt gates read.

    ``side_effects_clear`` is True only when none of the listed API
    side-effect flags is truthy on the readiness result. Counts are coerced
    with ``_safe_int``. ``blocked_reasons`` is passed through unchanged, or
    ``[]`` when absent.
    """
    readiness = _as_dict(run_readiness)
    package_summary = _as_dict(readiness.get("run_package_summary"))
    side_effect_flags = (
        "manual_fetch_gate_opened_by_api",
        "network_request_allowed",
        "api_executes_cli",
        "api_opens_database_connection",
        "api_writes_database",
        "api_uses_external_network",
        "database_write_executed",
        "fetch_executed",
        "cli_executed",
        "file_written",
        "scheduler_attached",
    )
    side_effects_clear = not any(readiness.get(flag) for flag in side_effect_flags)
    return {
        "mode": readiness.get("mode"),
        "accepted": bool(readiness.get("mcp_fetch_run_readiness_accepted")),
        "ready_for_manual_fetch_operator_run": bool(
            readiness.get("ready_for_manual_fetch_operator_run")
        ),
        "ready_for_manual_fetch_run_receipt_gate": bool(
            readiness.get("ready_for_manual_fetch_run_receipt_gate")
        ),
        "operator_shell_external_network_required": bool(
            readiness.get("operator_shell_external_network_required")
        ),
        "command_count": _safe_int(readiness.get("command_readiness_count")),
        "max_total_requests": _safe_int(package_summary.get("max_total_requests")),
        "stop_after_error_count": _safe_int(
            package_summary.get("stop_after_error_count")
        ),
        "side_effects_clear": side_effects_clear,
        "blocked_reasons": readiness.get("blocked_reasons", []),
    }
def _source_receipt_summary(source):
    """Normalize one ``sources[]`` entry of a manual fetch receipt.

    Text is trimmed and length-capped, counts are coerced with ``_safe_int``,
    and path/URL safety is evaluated here so the gates can read booleans.
    """
    receipt_path = source.get("receipt_path")
    source_url = source.get("source_url")
    return {
        "platform_code": _safe_text(source.get("platform_code"), 80),
        "source_key": _safe_text(source.get("source_key"), 160),
        "source_url": _safe_text(source_url, 500),
        "receipt_path": _safe_text(receipt_path, 500),
        "receipt_path_safe": _safe_artifact_path(receipt_path, require_json=True),
        "status": _safe_text(source.get("status"), 80),
        "executed_by": _safe_text(source.get("executed_by"), 80),
        # Identity check, matching the receipt-level ``is True`` test, so that
        # truthy strings such as "false" do not count as confirmation.
        "dry_run_first": source.get("dry_run_first") is True,
        "request_count": _safe_int(source.get("request_count")),
        "success_count": _safe_int(source.get("success_count")),
        "error_count": _safe_int(source.get("error_count")),
        "source_url_public": _is_public_http_url(source_url),
        "api_executed_cli": bool(source.get("api_executed_cli")),
        "external_network_executed_by_api": bool(
            source.get("external_network_executed_by_api")
        ),
        "database_write_executed": bool(source.get("database_write_executed")),
        "scheduler_attached": bool(source.get("scheduler_attached")),
    }


def _receipt_count_total(summary, source_summaries, field):
    """Reconcile the declared ``summary[field]`` with the per-source sum.

    A missing or zero declared total defers to the per-source sum. Otherwise
    the larger value wins, so a small declared total cannot hide larger
    per-source counts and slip past the request-budget or error gates. If
    either side is negative (impossible for a real run), the smaller value is
    returned so the gates' non-negative checks see it.
    """
    declared = _safe_int(summary.get(field))
    summed = sum(item[field] for item in source_summaries)
    if not declared:
        return summed
    if declared < 0 or summed < 0:
        return min(declared, summed)
    return max(declared, summed)


def _receipt_summary(manual_fetch_receipt):
    """Normalize a pasted manual fetch receipt into the fields the gates read.

    Args:
        manual_fetch_receipt: The operator-supplied receipt; non-dicts are
            treated as empty.

    Returns:
        A dict with the receipt identity, operator confirmations, reconciled
        request/success/error totals, per-source summaries, sorted source keys
        and receipt paths, aggregate "all sources ..." flags, whether any
        secret-like key was submitted, and the paths of truthy side-effect
        flags found anywhere in the payload.
    """
    receipt = _as_dict(manual_fetch_receipt)
    sources = [_as_dict(item) for item in _as_list(receipt.get("sources"))]
    summary = _as_dict(receipt.get("summary"))
    source_summaries = [_source_receipt_summary(source) for source in sources]
    artifact_path = receipt.get("receipt_artifact_path")
    return {
        # str() keeps sorting safe if a Python caller passes non-string keys.
        "provided_keys": sorted(str(key) for key in receipt),
        "receipt_id": _safe_text(receipt.get("receipt_id"), 160),
        "run_label": _safe_text(receipt.get("run_label"), 160),
        "receipt_artifact_path": _safe_text(artifact_path, 500),
        "receipt_artifact_path_safe": _safe_artifact_path(
            artifact_path,
            require_json=True,
        ),
        "executed_by": _safe_text(receipt.get("executed_by"), 80),
        "dry_run_first": bool(receipt.get("dry_run_first")),
        "operator_shell_external_network_executed": bool(
            receipt.get("operator_shell_external_network_executed")
        ),
        "operator_confirmed_public_pages_only": bool(
            receipt.get("operator_confirmed_public_pages_only")
        ),
        "operator_confirmed_no_login_no_antibot": bool(
            receipt.get("operator_confirmed_no_login_no_antibot")
        ),
        "operator_confirmed_no_secret_payload": bool(
            receipt.get("operator_confirmed_no_secret_payload")
        ),
        "operator_confirmed_no_api_execution": bool(
            receipt.get("operator_confirmed_no_api_execution")
        ),
        "operator_confirmed_no_database_write": bool(
            receipt.get("operator_confirmed_no_database_write")
        ),
        "operator_confirmed_no_scheduler_attach": bool(
            receipt.get("operator_confirmed_no_scheduler_attach")
        ),
        "source_count": len(source_summaries),
        "summary_source_count": _safe_int(summary.get("source_count")),
        "request_count": _receipt_count_total(summary, source_summaries, "request_count"),
        "success_count": _receipt_count_total(summary, source_summaries, "success_count"),
        "error_count": _receipt_count_total(summary, source_summaries, "error_count"),
        "sources": source_summaries,
        "source_keys": sorted(
            (item["platform_code"], item["source_key"])
            for item in source_summaries
            if item["platform_code"] and item["source_key"]
        ),
        "receipt_paths": sorted(
            item["receipt_path"]
            for item in source_summaries
            if item["receipt_path"]
        ),
        "all_receipt_paths_safe": bool(
            source_summaries
            and all(item["receipt_path_safe"] for item in source_summaries)
        ),
        "all_urls_public": bool(
            source_summaries
            and all(item["source_url_public"] for item in source_summaries)
        ),
        "all_status_dry_run_completed": bool(
            source_summaries
            and all(item["status"] == "dry_run_completed" for item in source_summaries)
        ),
        "all_executed_by_operator_shell": bool(
            receipt.get("executed_by") == "operator_shell"
            and source_summaries
            and all(item["executed_by"] == "operator_shell" for item in source_summaries)
        ),
        "all_dry_run_first": bool(
            receipt.get("dry_run_first") is True
            and source_summaries
            and all(item["dry_run_first"] for item in source_summaries)
        ),
        "secret_or_token_submitted_to_api": _contains_forbidden_secret_key(receipt),
        "blocked_side_effects": _blocked_side_effects(receipt),
    }
def _expected_summary(command_index):
expected_source_keys = sorted(command_index)
expected_receipt_paths = sorted(
command.get("receipt_path")
for command in command_index.values()
if command.get("receipt_path")
)
return {
"command_count": len(command_index),
"expected_source_keys": expected_source_keys,
"expected_receipt_paths": expected_receipt_paths,
"all_expected_receipt_paths_safe": bool(
expected_receipt_paths
and all(
_safe_artifact_path(path, require_json=True)
for path in expected_receipt_paths
)
),
"commands": list(command_index.values()),
}
def _receipt_gates(
*,
run_readiness_received,
receipt_received,
receipt_valid_object,
readiness,
expected,
receipt,
):
expected_source_keys = set(expected["expected_source_keys"])
receipt_source_keys = set(tuple(item) for item in receipt["source_keys"])
expected_receipt_paths = set(expected["expected_receipt_paths"])
receipt_paths = set(receipt["receipt_paths"])
max_requests = readiness["max_total_requests"] or MAX_TOTAL_REQUESTS
stop_after_error_count = readiness["stop_after_error_count"] or 0
operator_boundaries_confirmed = bool(
receipt["operator_confirmed_public_pages_only"]
and receipt["operator_confirmed_no_login_no_antibot"]
and receipt["operator_confirmed_no_secret_payload"]
and receipt["operator_confirmed_no_api_execution"]
and receipt["operator_confirmed_no_database_write"]
and receipt["operator_confirmed_no_scheduler_attach"]
)
return [
{
"key": "run_readiness_payload_or_result_received",
"label": "已提供 run readiness package 或已審核結果",
"passed": run_readiness_received,
},
{
"key": "run_readiness_accepted",
"label": "run readiness gate 必須已通過",
"passed": readiness["accepted"],
},
{
"key": "run_readiness_ready_for_receipt_gate",
"label": "run readiness 只放行到 receipt review gate",
"passed": readiness["ready_for_manual_fetch_run_receipt_gate"],
},
{
"key": "run_readiness_side_effect_free",
"label": "readiness 結果未顯示 API 執行、連外、寫檔、寫 DB 或掛 scheduler",
"passed": readiness["side_effects_clear"],
},
{
"key": "manual_fetch_receipt_received",
"label": "已提供操作員 dry-run fetch receipt",
"passed": receipt_received,
},
{
"key": "manual_fetch_receipt_valid_object",
"label": "receipt payload 必須是 JSON object",
"passed": receipt_valid_object,
},
{
"key": "receipt_identity_recorded",
"label": "receipt 必須記錄 receipt_id、run_label 與 artifact path",
"passed": bool(
receipt["receipt_id"]
and receipt["run_label"]
and receipt["receipt_artifact_path_safe"]
),
},
{
"key": "expected_command_manifest_present",
"label": "readiness 必須有 command manifest 可供對帳",
"passed": bool(
expected["command_count"] > 0
and expected["all_expected_receipt_paths_safe"]
),
},
{
"key": "receipt_source_count_matches_command_count",
"label": "receipt source 數必須等於 readiness command 數",
"passed": bool(
receipt["source_count"] > 0
and receipt["source_count"] == expected["command_count"]
),
},
{
"key": "receipt_sources_match_expected",
"label": "receipt 的 platform/source 必須完全對齊 command manifest",
"passed": bool(
expected_source_keys
and receipt_source_keys == expected_source_keys
),
},
{
"key": "receipt_paths_match_expected",
"label": "receipt path 必須完全對齊 command preview 且位於安全目錄",
"passed": bool(
expected_receipt_paths
and receipt_paths == expected_receipt_paths
and receipt["all_receipt_paths_safe"]
),
},
{
"key": "receipt_status_dry_run_completed",
"label": "每個來源都必須回報 dry_run_completed",
"passed": receipt["all_status_dry_run_completed"],
},
{
"key": "receipt_executed_by_operator_shell",
"label": "receipt 必須標明由操作員 shell 執行,不是 API",
"passed": receipt["all_executed_by_operator_shell"],
},
{
"key": "receipt_dry_run_first_confirmed",
"label": "receipt 必須確認 dry-run-first",
"passed": receipt["all_dry_run_first"],
},
{
"key": "receipt_operator_external_network_only",
"label": "外部網路只能發生在操作員 shell不在 API",
"passed": bool(
receipt["operator_shell_external_network_executed"]
and readiness["operator_shell_external_network_required"]
),
},
{
"key": "receipt_request_budget_within_limit",
"label": "request count 不得超過 run package 上限",
"passed": bool(0 <= receipt["request_count"] <= max_requests),
},
{
"key": "receipt_error_count_within_stop_limit",
"label": "error count 不得超過 stop-after-error 門檻",
"passed": bool(receipt["error_count"] <= stop_after_error_count),
},
{
"key": "receipt_urls_public_http",
"label": "receipt source_url 必須是公開 http(s) URL且不含帳密",
"passed": receipt["all_urls_public"],
},
{
"key": "receipt_operator_boundaries_confirmed",
"label": "操作員確認公開頁、無登入/反爬、無 secret、無 API/DB/scheduler 副作用",
"passed": operator_boundaries_confirmed,
},
{
"key": "receipt_no_secret_or_token_key",
"label": "receipt payload 不得包含 secret、cookie、password 或 token key",
"passed": not receipt["secret_or_token_submitted_to_api"],
},
{
"key": "receipt_side_effect_free",
"label": "receipt payload 不得要求 API 執行、連外、寫檔、寫 DB 或掛 scheduler",
"passed": not receipt["blocked_side_effects"],
},
]
def build_mcp_fetch_run_receipt_preview(
    *,
    run_readiness_package=None,
    run_readiness_result=None,
    manual_fetch_receipt=None,
    phase=None,
):
    """Review an operator's manual dry-run fetch receipt against run readiness.

    This is a pure review: it fetches, executes, writes and persists nothing.

    Keyword Args:
        run_readiness_package: Raw readiness inputs, used to rebuild readiness
            when no reviewed result is supplied.
        run_readiness_result: An already-reviewed readiness result (preferred
            when it is a non-empty dict).
        manual_fetch_receipt: The receipt pasted back by the operator.
        phase: Passed through to the output and to the readiness rebuild.

    Returns:
        A dict with the review mode, acceptance flags, per-gate results,
        normalized summaries, a sample input package, next operator steps and
        a fixed set of side-effect flags that are always ``False``.
    """
    readiness_package = _as_dict(run_readiness_package)
    reviewed_result_supplied = bool(
        isinstance(run_readiness_result, dict) and run_readiness_result
    )
    receipt_supplied = manual_fetch_receipt is not None
    # An absent receipt is not malformed; only a supplied non-dict is.
    receipt_is_object = (
        isinstance(manual_fetch_receipt, dict) if receipt_supplied else True
    )
    receipt_payload = _as_dict(manual_fetch_receipt)

    run_readiness = _run_readiness_from_inputs(
        readiness_package,
        run_readiness_result,
        phase,
    )
    readiness_received = bool(readiness_package or reviewed_result_supplied)
    payload_received = bool(
        readiness_received or receipt_payload or receipt_supplied
    )
    receipt_received = bool(receipt_payload)

    indexed_commands = _command_index(run_readiness, readiness_package)
    readiness = _readiness_summary(run_readiness)
    expected = _expected_summary(indexed_commands)
    receipt = _receipt_summary(receipt_payload)
    gates = _receipt_gates(
        run_readiness_received=readiness_received,
        receipt_received=receipt_received,
        receipt_valid_object=receipt_is_object,
        readiness=readiness,
        expected=expected,
        receipt=receipt,
    )
    blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
    accepted = bool(payload_received and not blocked_reasons)

    if payload_received:
        mode = "mcp_fetch_run_receipt_review"
    else:
        mode = "mcp_fetch_run_receipt_preview"

    preview = {
        "mode": mode,
        "phase": phase,
        "run_receipt_payload_received": payload_received,
        "run_readiness_received": readiness_received,
        "manual_fetch_receipt_received": receipt_received,
        "manual_fetch_receipt_valid_object": receipt_is_object,
        "run_readiness_accepted": readiness["accepted"],
        "mcp_fetch_run_receipt_accepted": accepted,
        "run_receipt_ready": accepted,
        "operator_shell_fetch_receipt_received": accepted,
        "ready_for_manual_fetch_result_parser_review": accepted,
        "ready_for_api_database_write": False,
        "ready_for_scheduler_attach": False,
        "manual_fetch_gate_opened_by_api": False,
        "network_request_allowed": False,
        "operator_shell_external_network_observed": bool(
            receipt["operator_shell_external_network_executed"]
        ),
        "fetch_executed_by_api": False,
        "fetch_executed": False,
        "cli_executed": False,
        "database_write_executed": False,
        "scheduler_attached": False,
        "expected_command_count": expected["command_count"],
        "receipt_source_count": receipt["source_count"],
        "request_count": receipt["request_count"],
        "error_count": receipt["error_count"],
        "gate_count": len(gates),
        # Every gate is either passed or listed in blocked_reasons.
        "passed_gate_count": len(gates) - len(blocked_reasons),
        "blocked_reasons": blocked_reasons,
        "gates": gates,
        "run_readiness_summary": readiness,
        "expected_command_manifest": expected,
        "manual_fetch_receipt_summary": receipt,
        "sample_run_receipt_package": _sample_run_receipt_package(),
        "next_operator_steps": [
            "receipt 通過後,只代表可進結果 parser review不代表可寫 market_*",
            "parser、DB import、scheduler attach、AI/Telegram 摘要都必須另開獨立 gate",
            "API/UI 仍不得執行 CLI、不得抓外站、不得寫檔、不得開 DB、不得掛 scheduler",
        ],
    }
    # This review never performs these side effects; report each as an
    # explicit False flag, appended after the review fields.
    preview.update(
        dict.fromkeys(
            (
                "payload_persisted",
                "run_receipt_persisted",
                "receipt_persisted",
                "run_receipt_file_written",
                "receipt_file_written",
                "api_executes_health_check",
                "api_executes_docker",
                "api_executes_ssh",
                "api_executes_cli",
                "api_opens_database_connection",
                "api_writes_database",
                "api_uses_external_network",
                "database_session_created",
                "database_commit_executed",
                "external_network_executed",
                "file_written",
                "writes_executed",
                "would_write_database",
            ),
            False,
        )
    )
    return preview