Files
ewoooc/services/market_intel/candidate_queue_review_decision.py
OoO 4a0a8bf75b
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
新增市場情報 queue review decision
2026-05-19 13:16:59 +08:00

220 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""候選審核 queue 人工決策草案。
本模組只在 review inventory 通過後整理人工決策契約;
不更新 review_state、不寫審核紀錄、不讀 approval token、不掛 scheduler。
"""
# Substrings that flag a payload key as carrying the one-time approval token.
# They are matched against lower-cased keys by the token scanner below.
FORBIDDEN_TOKEN_KEYWORDS = (
    "approval_token",
    "approval-token",
    "market_intel_queue_write_approval",
)

# Keys that contain a forbidden substring but are tolerated by the scanner
# as long as their value is a plain bool.
SAFE_TOKEN_METADATA_KEYS = {
    "approval_token_secret_configured",
    "approval_token_valid",
    "approval_token_present",
}

# The only value for which an "approval_env_var" key is skipped by the scanner.
SAFE_APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL"

# Table name reported in the decision preview.
TARGET_TABLE = "market_alert_review_queue"

# Review states a human reviewer may propose.
ALLOWED_DECISIONS = (
    "confirmed",
    "rejected",
    "deferred",
)
def _as_dict(value):
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
def _as_list(value):
    """Coerce ``value`` into a new list.

    Lists, tuples and sets are copied element-wise; ``None`` becomes an empty
    list; anything else (including strings and dicts) is wrapped as a
    single-element list.
    """
    if isinstance(value, (list, tuple, set)):
        return [*value]
    return [] if value is None else [value]
def _contains_forbidden_token_key(value):
    """Report whether ``value`` holds a dict key that looks like an approval token.

    Dicts are walked recursively. Each key is converted with ``str()`` and
    lower-cased, then flagged when it contains any substring listed in
    ``FORBIDDEN_TOKEN_KEYWORDS``. Two kinds of entries are skipped entirely
    (neither flagged nor descended into):

    * keys in ``SAFE_TOKEN_METADATA_KEYS`` whose value is a ``bool``;
    * the key ``approval_env_var`` when its value equals
      ``SAFE_APPROVAL_ENV_VAR``.

    Lists, tuples, sets and frozensets are searched element by element.
    Only keys are inspected; scalar values never trigger a match.

    Args:
        value: Arbitrary (possibly nested) payload.

    Returns:
        True if a forbidden key is found anywhere in the structure,
        otherwise False.
    """
    if isinstance(value, dict):
        for key, nested in value.items():
            normalized_key = str(key).lower()
            if normalized_key in SAFE_TOKEN_METADATA_KEYS and isinstance(nested, bool):
                continue
            if normalized_key == "approval_env_var" and nested == SAFE_APPROVAL_ENV_VAR:
                continue
            if any(keyword in normalized_key for keyword in FORBIDDEN_TOKEN_KEYWORDS):
                return True
            if _contains_forbidden_token_key(nested):
                return True
        return False
    # Scan every sequence/collection type, not only list: a forbidden key
    # nested inside a tuple must not slip past the gate. This mirrors the
    # container types that _as_list treats as list-like.
    if isinstance(value, (list, tuple, set, frozenset)):
        return any(_contains_forbidden_token_key(item) for item in value)
    return False
def _operator_summary(operator_evidence):
    """Condense operator-supplied evidence into a normalized summary dict.

    Non-dict input is treated as an empty dict. Text fields are stringified
    and stripped, with falsy values collapsing to ``""``. The three
    ``operator_confirmed_*`` flags are reduced with plain truthiness.
    NOTE(review): that means any non-empty string, including ``"false"``,
    counts as confirmed - confirm callers always send real booleans.

    Returns:
        A dict with the sorted provided keys, reviewer id, proposed decision,
        whether notes were given, the three confirmation flags, and whether
        the payload contains a forbidden approval-token key.
    """
    evidence = _as_dict(operator_evidence)

    def _text(field):
        # Missing, None and other falsy values all become the empty string.
        return str(evidence.get(field) or "").strip()

    def _flag(field):
        return bool(evidence.get(field))

    decision = _text("proposed_review_decision")
    return {
        "provided_keys": sorted(evidence),
        "reviewer_id": _text("reviewer_id"),
        "proposed_review_decision": decision,
        "decision_notes_present": bool(_text("decision_notes")),
        "operator_confirmed_manual_decision_only": _flag(
            "operator_confirmed_manual_decision_only"
        ),
        "operator_confirmed_no_scheduler_attach": _flag(
            "operator_confirmed_no_scheduler_attach"
        ),
        "operator_confirmed_no_api_db_write": _flag(
            "operator_confirmed_no_api_db_write"
        ),
        "approval_token_submitted_to_api": _contains_forbidden_token_key(evidence),
    }
def _decision_rows(review_inventory, proposed_decision):
    """Build one preview row per entry in ``review_inventory["row_summaries"]``.

    Each row carries the entry's dedupe key, current review state, priority
    lane and total score, plus the proposed state (``None`` when
    ``proposed_decision`` is falsy). Non-dict entries yield a row whose
    inventory-derived fields are all ``None``. Every row is marked
    ``manual_decision_preview_only``.
    """
    summaries = _as_list(_as_dict(review_inventory).get("row_summaries"))
    proposed_state = proposed_decision or None
    return [
        {
            "dedupe_key": summary.get("dedupe_key"),
            "current_review_state": summary.get("review_state"),
            "proposed_review_state": proposed_state,
            "priority_lane": summary.get("priority_lane"),
            "total_score": summary.get("total_score"),
            "write_status": "manual_decision_preview_only",
        }
        for summary in map(_as_dict, summaries)
    ]
def _review_gates(review_inventory, operator_summary, decision_rows):
    """Evaluate the checks that must all pass before a decision is ready.

    Args:
        review_inventory: Dict produced by the previous (inventory) stage;
            only its ``review_inventory_ready`` entry is read here.
        operator_summary: Dict in the shape returned by ``_operator_summary``.
        decision_rows: List of row dicts in the shape returned by
            ``_decision_rows``.

    Returns:
        A list of ``{"key", "label", "passed"}`` dicts, one per gate, in a
        fixed order.
    """
    proposed_decision = operator_summary["proposed_review_decision"]
    has_rows = bool(decision_rows)

    # (key, label, passed) triples, in the order they are reported.
    checks = (
        (
            "review_inventory_ready",
            "上一階段 inventory 必須通過",
            bool(review_inventory.get("review_inventory_ready")),
        ),
        (
            "review_rows_present",
            "必須有 needs_review row 可供人工決策",
            has_rows,
        ),
        (
            "all_rows_still_needs_review",
            "所有 row 的目前狀態必須仍是 needs_review",
            # An empty row list must fail this gate, hence the has_rows guard.
            has_rows
            and all(
                entry.get("current_review_state") == "needs_review"
                for entry in decision_rows
            ),
        ),
        (
            "reviewer_identity_present",
            "人工審核需提供 reviewer_id",
            bool(operator_summary["reviewer_id"]),
        ),
        (
            "proposed_decision_allowed",
            "人工決策只能是 confirmed / rejected / deferred",
            proposed_decision in ALLOWED_DECISIONS,
        ),
        (
            "decision_notes_present",
            "人工決策需留下 notes方便後續稽核",
            bool(operator_summary["decision_notes_present"]),
        ),
        (
            "operator_confirmed_decision_is_manual",
            "操作員確認 API 只產生草案,不更新 review_state",
            bool(
                operator_summary["operator_confirmed_manual_decision_only"]
                and operator_summary["operator_confirmed_no_api_db_write"]
                and operator_summary["operator_confirmed_no_scheduler_attach"]
            ),
        ),
        (
            "decision_no_approval_token_submitted_to_api",
            "payload 不得包含一次性 approval token key",
            not operator_summary["approval_token_submitted_to_api"],
        ),
    )
    return [
        {"key": gate_key, "label": gate_label, "passed": gate_passed}
        for gate_key, gate_label, gate_passed in checks
    ]
def build_candidate_queue_review_decision(
    *,
    review_inventory,
    operator_evidence=None,
):
    """Build a manual queue-review decision draft; performs no DB update.

    Args:
        review_inventory: Result dict of the inventory stage. Non-dict input
            is treated as an empty dict.
        operator_evidence: Optional dict of operator-supplied decision data.

    Returns:
        A preview dict containing the decision rows, the operator summary,
        the evaluated gates, the keys of failed gates (``blocked_reasons``),
        and a fixed set of flags stating that nothing was written, executed
        or scheduled. ``decision_ready`` is True only when every gate passed.
    """
    inventory = _as_dict(review_inventory)
    summary = _operator_summary(operator_evidence)
    rows = _decision_rows(inventory, summary["proposed_review_decision"])
    gates = _review_gates(inventory, summary, rows)
    failed_gate_keys = [gate["key"] for gate in gates if not gate["passed"]]
    is_ready = not failed_gate_keys

    # Flags that this preview always reports as False, split in two groups so
    # the original key order of the result is preserved.
    api_never_flags = (
        "ready_for_api_review_state_update",
        "ready_for_api_database_write",
        "ready_for_scheduler_attach",
        "api_executes_cli",
        "api_reads_approval_token",
        "api_writes_file",
        "api_writes_database",
        "api_updates_review_state",
        "decision_record_written",
        "review_state_update_executed",
    )
    execution_never_flags = (
        "database_session_created",
        "explicit_transaction_opened",
        "database_write_executed",
        "database_commit_executed",
        "database_rollback_executed",
        "external_network_executed",
        "scheduler_attached",
        "writes_executed",
        "would_write_database",
    )

    return {
        "mode": "candidate_queue_review_decision_preview",
        "target_table": TARGET_TABLE,
        "decision_ready": is_ready,
        "ready_for_human_decision_record": is_ready,
        **dict.fromkeys(api_never_flags, False),
        # These two are passed through from the inventory stage.
        "read_only_query_executed": bool(inventory.get("read_only_query_executed")),
        "database_connection_opened": bool(inventory.get("database_connection_opened")),
        **dict.fromkeys(execution_never_flags, False),
        "expected_dedupe_keys": _as_list(inventory.get("expected_dedupe_keys")),
        "decision_rows": rows,
        "operator_decision_summary": summary,
        "blocked_reasons": failed_gate_keys,
        "gates": gates,
        "decision_contract": {
            "expected_current_state": "needs_review",
            "allowed_next_states": list(ALLOWED_DECISIONS),
            "manual_record_required": True,
            "forbidden_api_actions": [
                "update_review_state",
                "write_decision_record",
                "dispatch_alert",
                "attach_scheduler",
            ],
        },
        "next_operator_steps": [
            "人工確認每個 dedupe key 對應 evidence_json",
            "選擇 confirmed / rejected / deferred 並留下 decision_notes",
            "在 API 外部人工審核流程記錄 reviewer_id 與決策",
            "若 row 狀態不是 needs_review停回 inventory / post-write smoke",
        ],
        "safe_boundaries": [
            "do_not_update_review_state_from_api",
            "do_not_write_decision_record_from_api",
            "do_not_insert_missing_queue_row_from_api",
            "do_not_read_approval_token_from_api",
            "do_not_execute_cli_from_review_decision",
            "do_not_attach_scheduler_from_review_decision",
            "no_remove_orphans",
            "no_momo_db_lifecycle_change",
        ],
    }