Files
ewoooc/services/market_intel/mcp_fetch_run_readiness.py
OoO 1b94177828
All checks were successful
CD Pipeline / deploy (push) Successful in 1m24s
V10.414 add market intel MCP fetch run readiness gate
2026-05-24 14:16:55 +08:00

519 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 MCP manual fetch run readiness preview。
本模組只檢查上一段 run package 是否可交給操作員 shell 手動 dry-run
不執行 CLI、不發 HTTP request、不寫檔、不開 DB、不掛 scheduler。
"""
from services.market_intel.mcp_fetch_run_package import (
MAX_TOTAL_REQUESTS,
RUN_ARTIFACT_DIR_PREFIX,
build_mcp_fetch_run_package_preview,
)
# Lower-case fragments that mark a payload key as secret-bearing.
# _contains_forbidden_secret_key() lower-cases every dict key it visits and
# flags the payload when any of these fragments occurs anywhere inside the key
# (substring match, not equality).
FORBIDDEN_SECRET_KEYS = (
    "approval_token",
    "approval-token",
    "api_key",
    "authorization",
    "bearer",
    "client_secret",
    "cookie",
    "password",
    "session_cookie",
)
# Payload keys that must never carry a truthy value.
# _blocked_side_effects() reports the path of every dict key that exactly
# equals one of these names and whose value is truthy.
_BLOCKED_SIDE_EFFECT_KEYS = (
    "allow_api_execution",
    "allow_database_write",
    "allow_external_network_in_api",
    "allow_scheduler_attach",
    "api_executes_cli",
    "api_executes_docker",
    "api_executes_health_check",
    "api_executes_ssh",
    "api_opens_database_connection",
    "api_uses_external_network",
    "api_writes_database",
    "attach_scheduler",
    "cli_executed",
    "command_executed",
    "database_commit_executed",
    "database_session_created",
    "database_write_executed",
    "external_network_executed",
    "fetch_executed",
    "file_written",
    "manual_fetch_gate_opened_by_api",
    "network_request_allowed",
    "package_artifact_created",
    "ready_for_manual_fetch_operator_run",
    "receipt_file_written",
    "run_package_persisted",
    "run_readiness_artifact_created",
    "run_readiness_file_written",
    "run_readiness_persisted",
    "scheduler_attached",
    "write_database",
    "writes_executed",
    "would_write_database",
)
def _as_dict(value):
return value if isinstance(value, dict) else {}
def _as_list(value):
if value is None:
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _has_text(value):
return bool(isinstance(value, str) and value.strip())
def _safe_text(value, limit=500):
if value is None:
return None
text = str(value).strip()
return text[:limit] if text else None
def _safe_artifact_path(value, *, require_json=False):
    """Check that ``value`` is an acceptable relative artifact path.

    The path is stripped and backslashes are turned into forward slashes before
    checking. It is rejected when it is not a string, is empty, starts with
    ``/``, contains a ``.`` or ``..`` segment, or does not start with
    ``RUN_ARTIFACT_DIR_PREFIX``. With ``require_json`` set, the path must also
    end in ``.json``.
    """
    if not isinstance(value, str):
        return False
    candidate = value.strip().replace("\\", "/")
    if candidate == "" or candidate[0] == "/":
        return False
    # Empty segments (from doubled slashes) can never equal "." or "..",
    # so every segment can be inspected directly.
    for segment in candidate.split("/"):
        if segment in (".", ".."):
            return False
    if not candidate.startswith(RUN_ARTIFACT_DIR_PREFIX):
        return False
    if require_json:
        return candidate.endswith(".json")
    return True
def _contains_forbidden_secret_key(value):
    """Report whether any dict key inside ``value`` looks like a secret.

    Every dict key is converted to a lower-case string and compared against
    ``FORBIDDEN_SECRET_KEYS`` by substring match. The scan recurses through
    dict values and through list and tuple items. Tuples are included so a
    secret-bearing dict cannot escape the scan by being wrapped in a tuple,
    which the rest of this module (see ``_as_list``) already accepts as a
    list-like container. Sets are not scanned because their elements must be
    hashable and therefore can never be dicts.

    Returns:
        True when at least one matching key is found, otherwise False.
    """
    if isinstance(value, dict):
        for key, nested in value.items():
            lowered_key = str(key).lower()
            if any(fragment in lowered_key for fragment in FORBIDDEN_SECRET_KEYS):
                return True
            if _contains_forbidden_secret_key(nested):
                return True
        return False
    if isinstance(value, (list, tuple)):
        return any(_contains_forbidden_secret_key(item) for item in value)
    # Scalars and other objects carry no keys to inspect.
    return False
def _blocked_side_effects(payload):
    """List the paths of blocked side-effect keys that hold a truthy value.

    Dicts and lists are walked depth-first in insertion order. A dict key is
    reported when it is one of ``_BLOCKED_SIDE_EFFECT_KEYS`` and its value is
    truthy. Paths use ``parent.child`` for dict keys and ``parent[index]`` for
    list items.
    """

    def walk(node, prefix):
        if isinstance(node, dict):
            for name, child in node.items():
                child_path = f"{prefix}.{name}" if prefix else name
                if name in _BLOCKED_SIDE_EFFECT_KEYS and child:
                    yield child_path
                yield from walk(child, child_path)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from walk(child, f"{prefix}[{position}]")

    return list(walk(payload, ""))
def _run_package_from_inputs(run_package, run_package_result, phase):
if isinstance(run_package_result, dict) and run_package_result:
return run_package_result
run_package = _as_dict(run_package)
return build_mcp_fetch_run_package_preview(
target_review_package=run_package.get("target_review_package", {}),
target_review_result=run_package.get("target_review_result"),
operator_run_controls=run_package.get("operator_run_controls", {}),
phase=phase,
)
def _sample_run_readiness_package():
    """Build a sample readiness payload with every operator confirmation set.

    The sample run package is taken from a default
    ``build_mcp_fetch_run_package_preview()`` call and then passed back through
    the same builder to obtain a run package result. Both artifact paths are
    placed under that result's ``artifact_dir``, or under a fallback directory
    below ``RUN_ARTIFACT_DIR_PREFIX`` when the result has none.
    """
    sample_package = build_mcp_fetch_run_package_preview()["sample_run_package"]
    sample_result = build_mcp_fetch_run_package_preview(
        target_review_package=sample_package["target_review_package"],
        operator_run_controls=sample_package["operator_run_controls"],
    )

    artifact_dir = sample_result.get("artifact_dir")
    if not artifact_dir:
        artifact_dir = RUN_ARTIFACT_DIR_PREFIX + "sample_market_intel_manual_fetch"

    confirmation_flags = (
        "command_previews_reviewed",
        "receipt_paths_reviewed",
        "artifact_dir_reviewed",
        "rate_limit_confirmed",
        "timeout_confirmed",
        "stop_on_error_confirmed",
        "dry_run_first_confirmed",
        "shell_only_confirmed",
        "external_network_shell_only_confirmed",
        "receipt_review_next_confirmed",
        "operator_confirmed_no_secret_payload",
        "operator_confirmed_no_api_execution",
        "operator_confirmed_no_database_write",
        "operator_confirmed_no_scheduler_attach",
    )
    sample_readiness = {
        "run_readiness_artifact_path": f"{artifact_dir}/run_readiness_review.json",
        "dry_run_receipt_path": f"{artifact_dir}/dry_run_receipt.json",
    }
    sample_readiness.update(dict.fromkeys(confirmation_flags, True))

    return {
        "run_package": sample_package,
        "run_package_result": sample_result,
        "operator_readiness": sample_readiness,
    }
def _command_summary(run_package_result):
    """Summarise the ``command_previews`` entries of a run package result.

    Each entry is coerced to a dict (non-dict entries count as empty dicts).
    The summary holds counts, the aggregate receipt-path and dry-run-first
    checks (both False when there are no commands), a flag telling whether any
    entry claims the API already executed something, and a per-command list
    whose ``ready_for_operator_shell`` is always False at this stage.
    """
    previews = [
        _as_dict(raw) for raw in _as_list(run_package_result.get("command_previews"))
    ]
    has_previews = bool(previews)
    receipt_paths = [preview.get("receipt_path") for preview in previews]
    argv_previews = [preview.get("argv_preview") for preview in previews]

    receipt_paths_safe = has_previews and all(
        _safe_artifact_path(path, require_json=True) for path in receipt_paths
    )
    all_have_dry_run_first = has_previews and all(
        "--dry-run-first" in _as_list(argv) for argv in argv_previews
    )
    executed_by_api = any(
        preview.get("command_executed")
        or preview.get("external_network_executed_by_api")
        or preview.get("receipt_written_by_api")
        for preview in previews
    )

    command_rows = []
    for preview in previews:
        command_rows.append(
            {
                "platform_code": preview.get("platform_code"),
                "source_key": preview.get("source_key"),
                "receipt_path": preview.get("receipt_path"),
                "argv_preview": preview.get("argv_preview"),
                "ready_for_operator_shell": False,
                "command_executed": bool(preview.get("command_executed")),
                "external_network_executed_by_api": bool(
                    preview.get("external_network_executed_by_api")
                ),
                "receipt_written_by_api": bool(preview.get("receipt_written_by_api")),
            }
        )

    return {
        "command_count": len(previews),
        "receipt_path_count": sum(_has_text(path) for path in receipt_paths),
        "receipt_paths_safe": receipt_paths_safe,
        "argv_preview_count": sum(isinstance(argv, list) for argv in argv_previews),
        "all_have_dry_run_first": all_have_dry_run_first,
        "commands_executed_by_api": executed_by_api,
        "commands": command_rows,
    }
def _package_summary(run_package_result):
    """Flatten a run package result into the fields the readiness gates read.

    Package-level fields are copied (flags coerced to bool),
    ``side_effects_clear`` is True when ``_blocked_side_effects`` finds
    nothing, and the keys produced by ``_command_summary`` are merged in last.
    """
    field = run_package_result.get
    summary = {
        "mode": field("mode"),
        "accepted": bool(field("mcp_fetch_run_package_accepted")),
        "ready_for_manual_fetch_run_readiness_review": bool(
            field("ready_for_manual_fetch_run_readiness_review")
        ),
        "ready_for_manual_fetch_operator_run": bool(
            field("ready_for_manual_fetch_operator_run")
        ),
        "artifact_dir": field("artifact_dir"),
        "artifact_dir_safe": bool(field("artifact_dir_safe")),
        "max_total_requests": field("max_total_requests"),
        "stop_after_error_count": field("stop_after_error_count"),
        "blocked_reasons": field("blocked_reasons", []),
        "side_effects_clear": not _blocked_side_effects(run_package_result),
    }
    summary.update(_command_summary(run_package_result))
    return summary
def _operator_summary(operator_readiness):
    """Normalise the operator readiness payload into a flat summary dict.

    The summary lists the provided keys (sorted), the two artifact paths as
    trimmed text together with their safety verdicts, every confirmation flag
    coerced to bool, whether a secret-like key was submitted, and the paths of
    any blocked side-effect keys holding a truthy value. A non-dict payload is
    treated as empty.
    """
    readiness = _as_dict(operator_readiness)
    artifact_path = readiness.get("run_readiness_artifact_path")
    receipt_path = readiness.get("dry_run_receipt_path")
    confirmation_flags = (
        "command_previews_reviewed",
        "receipt_paths_reviewed",
        "artifact_dir_reviewed",
        "rate_limit_confirmed",
        "timeout_confirmed",
        "stop_on_error_confirmed",
        "dry_run_first_confirmed",
        "shell_only_confirmed",
        "external_network_shell_only_confirmed",
        "receipt_review_next_confirmed",
        "operator_confirmed_no_secret_payload",
        "operator_confirmed_no_api_execution",
        "operator_confirmed_no_database_write",
        "operator_confirmed_no_scheduler_attach",
    )

    summary = {
        "provided_keys": sorted(readiness),
        "run_readiness_artifact_path": _safe_text(artifact_path),
        "dry_run_receipt_path": _safe_text(receipt_path),
        "run_readiness_artifact_path_safe": _safe_artifact_path(
            artifact_path, require_json=True
        ),
        "dry_run_receipt_path_safe": _safe_artifact_path(
            receipt_path, require_json=True
        ),
    }
    for flag in confirmation_flags:
        summary[flag] = bool(readiness.get(flag))
    summary["secret_or_token_submitted_to_api"] = _contains_forbidden_secret_key(
        readiness
    )
    summary["blocked_side_effects"] = _blocked_side_effects(readiness)
    return summary
def _readiness_gates(*, run_package_received, package, operator):
    """Evaluate every readiness gate and return them in a fixed order.

    Args:
        run_package_received: Whether a run package payload or result was given.
        package: Summary dict as produced by ``_package_summary``.
        operator: Summary dict as produced by ``_operator_summary``.

    Returns:
        A list of ``{"key", "label", "passed"}`` dicts, one per gate.
    """
    review_flags = (
        "command_previews_reviewed",
        "receipt_paths_reviewed",
        "artifact_dir_reviewed",
    )
    runtime_limit_flags = (
        "rate_limit_confirmed",
        "timeout_confirmed",
        "stop_on_error_confirmed",
        "dry_run_first_confirmed",
    )
    boundary_flags = (
        "shell_only_confirmed",
        "external_network_shell_only_confirmed",
        "receipt_review_next_confirmed",
        "operator_confirmed_no_secret_payload",
        "operator_confirmed_no_api_execution",
        "operator_confirmed_no_database_write",
        "operator_confirmed_no_scheduler_attach",
    )
    review_confirmed = all(operator[flag] for flag in review_flags)
    runtime_limits_confirmed = all(operator[flag] for flag in runtime_limit_flags)
    boundaries_confirmed = all(operator[flag] for flag in boundary_flags)

    # (key, label, passed) rows; the order defines the order of the gates.
    gate_rows = (
        (
            "run_package_payload_or_result_received",
            "已提供 run package payload 或已審核結果",
            run_package_received,
        ),
        (
            "run_package_accepted",
            "run package gate 已通過",
            package["accepted"],
        ),
        (
            "run_package_ready_for_readiness_review",
            "run package 只放行到 run readiness review",
            package["ready_for_manual_fetch_run_readiness_review"],
        ),
        (
            "run_package_did_not_open_operator_run",
            "前一階段不得自行打開 operator run",
            not package["ready_for_manual_fetch_operator_run"],
        ),
        (
            "command_preview_count_within_limit",
            "command preview 數量必須介於 1 與全域安全上限",
            bool(0 < package["command_count"] <= MAX_TOTAL_REQUESTS),
        ),
        (
            "command_previews_not_executed_by_api",
            "command preview 尚未由 API 執行或寫 receipt",
            not package["commands_executed_by_api"],
        ),
        (
            "command_receipt_paths_safe",
            "所有 receipt path 都限定於 market intel manual fetch 目錄",
            package["receipt_paths_safe"],
        ),
        (
            "command_previews_keep_dry_run_first",
            "所有命令預覽都保留 dry-run-first",
            package["all_have_dry_run_first"],
        ),
        (
            "run_package_side_effect_free",
            "run package 結果未夾帶 API 執行、DB、寫檔或 scheduler 副作用",
            package["side_effects_clear"],
        ),
        (
            "operator_readiness_received",
            "已提供操作員 readiness 證據",
            bool(operator["provided_keys"]),
        ),
        (
            "run_readiness_artifact_path_safe",
            "run readiness artifact path 合法且為 JSON",
            operator["run_readiness_artifact_path_safe"],
        ),
        (
            "dry_run_receipt_path_safe",
            "dry-run receipt path 合法且為 JSON",
            operator["dry_run_receipt_path_safe"],
        ),
        (
            "operator_confirmed_command_and_receipt_review",
            "操作員已覆核 command preview、receipt path 與 artifact 目錄",
            review_confirmed,
        ),
        (
            "operator_confirmed_runtime_limits",
            "操作員已確認節流、timeout、錯誤停止與 dry-run-first",
            runtime_limits_confirmed,
        ),
        (
            "operator_confirmed_shell_only_boundaries",
            "操作員確認 shell-only、無 secret、無 API/DB/scheduler 副作用",
            boundaries_confirmed,
        ),
        (
            "secret_or_token_not_submitted_to_api",
            "readiness payload 不得包含 secret、cookie、password 或 token key",
            not operator["secret_or_token_submitted_to_api"],
        ),
        (
            "operator_readiness_side_effect_free",
            "operator readiness payload 不得要求 API 執行、連外、寫檔或寫 DB",
            not operator["blocked_side_effects"],
        ),
    )
    return [
        {"key": key, "label": label, "passed": passed}
        for key, label, passed in gate_rows
    ]
def build_mcp_fetch_run_readiness_preview(
    *,
    run_package=None,
    run_package_result=None,
    operator_readiness=None,
    phase=None,
):
    """Build the manual fetch run readiness review without executing any fetch.

    Args:
        run_package: Optional run package payload; used to rebuild the run
            package result when ``run_package_result`` is not supplied.
        run_package_result: Optional already-built run package result; a
            non-empty dict takes precedence over ``run_package``.
        operator_readiness: Optional dict of operator confirmations and paths.
        phase: Passed through to the run package builder and echoed back.

    Returns:
        A dict holding the gate results, the summaries, per-command readiness,
        a sample payload, and execution/persistence flags that this function
        always reports as False.
    """
    package_payload = _as_dict(run_package)
    result_supplied = bool(isinstance(run_package_result, dict) and run_package_result)
    readiness_payload = _as_dict(operator_readiness)
    resolved_result = _run_package_from_inputs(
        package_payload,
        run_package_result,
        phase,
    )

    package_supplied = bool(package_payload or result_supplied)
    anything_supplied = bool(
        package_payload or result_supplied or readiness_payload
    )

    package = _package_summary(resolved_result)
    operator = _operator_summary(readiness_payload)
    gates = _readiness_gates(
        run_package_received=package_supplied,
        package=package,
        operator=operator,
    )

    blocked_reasons = []
    passed_gate_count = 0
    for gate in gates:
        if gate["passed"]:
            passed_gate_count += 1
        else:
            blocked_reasons.append(gate["key"])
    accepted = bool(anything_supplied and not blocked_reasons)

    # Per-command rows carry the final verdict instead of the summary's
    # placeholder False.
    command_readiness = []
    for command in package["commands"]:
        row = dict(command)
        row["ready_for_operator_shell"] = accepted
        command_readiness.append(row)

    if anything_supplied:
        mode = "mcp_fetch_run_readiness_review"
    else:
        mode = "mcp_fetch_run_readiness_preview"

    response = {
        "mode": mode,
        "phase": phase,
        "run_readiness_payload_received": anything_supplied,
        "run_package_received": package_supplied,
        "run_package_accepted": package["accepted"],
        "operator_readiness_received": bool(operator["provided_keys"]),
        "mcp_fetch_run_readiness_accepted": accepted,
        "run_readiness_ready": accepted,
        "ready_for_manual_fetch_operator_run": accepted,
        "ready_for_manual_fetch_run_receipt_gate": accepted,
        "manual_fetch_gate_opened_by_api": False,
        "network_request_allowed": False,
        "operator_shell_external_network_required": accepted,
        "fetch_executed": False,
        "cli_executed": False,
        "database_write_executed": False,
        "scheduler_attached": False,
        "command_readiness_count": len(command_readiness),
        "gate_count": len(gates),
        "passed_gate_count": passed_gate_count,
        "blocked_reasons": blocked_reasons,
        "gates": gates,
        "run_package_summary": package,
        "operator_readiness_summary": operator,
        "command_readiness": command_readiness,
        "sample_run_readiness_package": _sample_run_readiness_package(),
        "next_operator_steps": [
            "readiness 通過後，操作員才可在 shell 依 argv preview 執行 dry-run fetch",
            "每個來源都必須保存 receipt JSON再回貼到下一個 receipt review gate",
            "API/UI 仍不得執行 CLI、不得抓外站、不得寫檔、不得開 DB、不得掛 scheduler",
        ],
    }
    # This builder never persists or executes anything, so every trailing
    # side-effect flag is reported as False.
    never_performed = (
        "payload_persisted",
        "run_readiness_persisted",
        "run_readiness_artifact_created",
        "run_readiness_file_written",
        "receipt_file_written",
        "run_receipt_file_written",
        "api_executes_health_check",
        "api_executes_docker",
        "api_executes_ssh",
        "api_executes_cli",
        "api_opens_database_connection",
        "api_writes_database",
        "api_uses_external_network",
        "database_session_created",
        "database_commit_executed",
        "external_network_executed",
        "file_written",
        "writes_executed",
        "would_write_database",
    )
    response.update(dict.fromkeys(never_performed, False))
    return response