Files
ewoooc/services/market_intel/candidate_queue_review_handoff.py
OoO e7e045253d
All checks were successful
CD Pipeline / deploy (push) Successful in 1m4s
補 OpenClaw QA 備援與市場情報交接
2026-05-19 12:46:43 +08:00

248 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""候選審核 queue review handoff preview。
本模組只在 writer run closeout 通過後,整理人工審核交接契約;
不查 DB、不更新 review_state、不讀 approval token、不執行 CLI、不掛 scheduler。
"""
FORBIDDEN_TOKEN_KEYWORDS = (
"approval_token",
"approval-token",
"market_intel_queue_write_approval",
)
SAFE_TOKEN_METADATA_KEYS = {
"approval_token_present",
"approval_token_valid",
"approval_token_secret_configured",
}
SAFE_APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL"
def _as_dict(value):
return value if isinstance(value, dict) else {}
def _as_list(value):
if value is None:
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _contains_forbidden_token_key(value):
if isinstance(value, dict):
for key, nested in value.items():
normalized_key = str(key).lower()
if normalized_key in SAFE_TOKEN_METADATA_KEYS and isinstance(nested, bool):
continue
if normalized_key == "approval_env_var" and nested == SAFE_APPROVAL_ENV_VAR:
continue
if any(token_key in normalized_key for token_key in FORBIDDEN_TOKEN_KEYWORDS):
return True
if _contains_forbidden_token_key(nested):
return True
elif isinstance(value, list):
return any(_contains_forbidden_token_key(item) for item in value)
return False
def _dedupe_keys(transaction_preview, closeout):
keys = []
for statement in _as_list(_as_dict(transaction_preview).get("statements")):
lookup = _as_dict(statement.get("lookup"))
parameter_preview = _as_dict(statement.get("parameter_preview"))
dedupe_key = lookup.get("dedupe_key") or parameter_preview.get("dedupe_key")
if dedupe_key:
keys.append(str(dedupe_key))
if not keys:
keys = _as_list(_as_dict(_as_dict(closeout).get("receipt_summary")).get("expected_dedupe_keys"))
return sorted(set(str(key) for key in keys if key))
def _closeout_summary(closeout):
closeout = _as_dict(closeout)
promotion_gate = _as_dict(closeout.get("promotion_gate"))
return {
"provided": bool(closeout),
"mode": closeout.get("mode"),
"closeout_passed": bool(closeout.get("closeout_passed")),
"ready_for_next_manual_phase": bool(closeout.get("ready_for_next_manual_phase")),
"promotion_allowed": bool(promotion_gate.get("allowed")),
"next_manual_phase": promotion_gate.get("next_manual_phase"),
"ready_for_api_database_write": bool(closeout.get("ready_for_api_database_write")),
"ready_for_scheduler_attach": bool(closeout.get("ready_for_scheduler_attach")),
"api_executes_cli": bool(closeout.get("api_executes_cli")),
"api_reads_approval_token": bool(closeout.get("api_reads_approval_token")),
"api_writes_file": bool(closeout.get("api_writes_file")),
"api_writes_database": bool(closeout.get("api_writes_database")),
"database_connection_opened": bool(closeout.get("database_connection_opened")),
"database_write_executed": bool(closeout.get("database_write_executed")),
"database_commit_executed": bool(closeout.get("database_commit_executed")),
"scheduler_attached": bool(closeout.get("scheduler_attached")),
}
def _operator_summary(operator_evidence):
operator_evidence = _as_dict(operator_evidence)
return {
"provided_keys": sorted(operator_evidence.keys()),
"operator_confirmed_queue_review_next": bool(
operator_evidence.get("operator_confirmed_queue_review_next")
),
"operator_confirmed_no_scheduler_attach": bool(
operator_evidence.get("operator_confirmed_no_scheduler_attach")
),
"operator_confirmed_no_api_db_write": bool(
operator_evidence.get("operator_confirmed_no_api_db_write")
),
"approval_token_submitted_to_api": _contains_forbidden_token_key(
operator_evidence
),
}
def _handoff_gates(closeout_summary, operator_summary, expected_dedupe_keys):
return [
{
"key": "closeout_preview_provided",
"label": "必須提供上一階段 closeout preview",
"passed": bool(
closeout_summary["provided"]
and closeout_summary["mode"]
== "candidate_queue_writer_run_closeout_preview"
),
},
{
"key": "closeout_passed",
"label": "closeout 必須通過才可交接人工審核",
"passed": closeout_summary["closeout_passed"],
},
{
"key": "closeout_promotes_manual_queue_review",
"label": "promotion 只能指向人工 queue review / read-only inventory",
"passed": bool(
closeout_summary["promotion_allowed"]
and closeout_summary["next_manual_phase"]
== "manual_queue_review_and_live_inventory_read_only"
),
},
{
"key": "expected_dedupe_keys_present",
"label": "交接包必須有 expected dedupe key 供人工比對",
"passed": bool(expected_dedupe_keys),
},
{
"key": "closeout_no_api_write_or_scheduler",
"label": "closeout 不得允許 API/UI DB 寫入、CLI 或 scheduler",
"passed": bool(
not closeout_summary["ready_for_api_database_write"]
and not closeout_summary["ready_for_scheduler_attach"]
and not closeout_summary["api_executes_cli"]
and not closeout_summary["api_writes_database"]
and not closeout_summary["database_write_executed"]
and not closeout_summary["database_commit_executed"]
and not closeout_summary["scheduler_attached"]
),
},
{
"key": "operator_confirmed_review_is_manual",
"label": "操作員確認下一步只做人工審核,不由 API 更新 review_state",
"passed": bool(
operator_summary["operator_confirmed_queue_review_next"]
and operator_summary["operator_confirmed_no_api_db_write"]
and operator_summary["operator_confirmed_no_scheduler_attach"]
),
},
{
"key": "handoff_no_approval_token_submitted_to_api",
"label": "handoff payload 不得包含一次性 approval token key",
"passed": not operator_summary["approval_token_submitted_to_api"],
},
]
def build_candidate_queue_review_handoff(
*,
transaction_preview,
run_closeout,
operator_evidence=None,
):
"""建立人工 queue review handoff不執行任何副作用。"""
closeout_summary = _closeout_summary(run_closeout)
operator_summary = _operator_summary(operator_evidence)
expected_dedupe_keys = _dedupe_keys(transaction_preview, run_closeout)
gates = _handoff_gates(closeout_summary, operator_summary, expected_dedupe_keys)
blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
handoff_ready = bool(not blocked_reasons)
return {
"mode": "candidate_queue_review_handoff_preview",
"target_table": "market_alert_review_queue",
"handoff_ready": handoff_ready,
"ready_for_manual_queue_review": handoff_ready,
"ready_for_live_inventory_read_only": handoff_ready,
"ready_for_api_database_write": False,
"ready_for_scheduler_attach": False,
"api_executes_cli": False,
"api_reads_approval_token": False,
"api_writes_file": False,
"api_writes_database": False,
"api_updates_review_state": False,
"read_only_query_executed": False,
"database_connection_opened": False,
"database_session_created": False,
"explicit_transaction_opened": False,
"database_write_executed": False,
"database_commit_executed": False,
"database_rollback_executed": False,
"external_network_executed": False,
"scheduler_attached": False,
"writes_executed": False,
"would_write_database": False,
"expected_dedupe_keys": expected_dedupe_keys,
"blocked_reasons": blocked_reasons,
"gates": gates,
"closeout_summary": closeout_summary,
"operator_handoff_summary": operator_summary,
"review_contract": {
"expected_review_state": "needs_review",
"allowed_manual_actions": [
"confirm_queue_row_exists",
"inspect_evidence_json",
"compare_dedupe_key",
"record_human_decision_outside_api",
],
"forbidden_api_actions": [
"update_review_state",
"insert_missing_queue_row",
"dispatch_alert",
"attach_scheduler",
],
"required_columns": [
"dedupe_key",
"review_state",
"priority_lane",
"source_url",
"evidence_json",
"created_at",
],
},
"next_operator_steps": [
"以只讀 inventory 或人工 DB console 確認 queue row 存在",
"比對 expected dedupe key 與 review_state=needs_review",
"只在人工審核流程中記錄 confirmed / rejected / deferred 決策",
"若 queue row 不存在或 evidence 不一致,停回 writer receipt / closeout",
],
"safe_boundaries": [
"do_not_query_database_from_handoff_preview",
"do_not_update_review_state_from_api",
"do_not_insert_missing_queue_row_from_api",
"do_not_read_approval_token_from_api",
"do_not_execute_cli_from_handoff_preview",
"do_not_attach_scheduler_from_handoff_preview",
"no_remove_orphans",
"no_momo_db_lifecycle_change",
],
}