526 lines
19 KiB
Python
526 lines
19 KiB
Python
"""CLI writer for the market-intel candidate review queue.

By default this module only builds the safety-gate state that precedes the
real queue writer. Rows are inserted into market_alert_review_queue, inside a
short transaction, only when the CLI explicitly supplies all of the following
and every gate passes:

- the execute flag and the apply-real-write flag;
- an approval token. It is intended to be single-use, but this module only
  checks it against the MARKET_INTEL_QUEUE_WRITE_APPROVAL secret;
- a backup confirmation;
- a passing migration live smoke;
- a passing read-only writer preflight.
"""
|
||
|
||
import hmac
|
||
import json
|
||
import os
|
||
|
||
from sqlalchemy import create_engine, text
|
||
|
||
from services.market_intel.candidate_queue_writer_preflight import (
|
||
PAYLOAD_COLUMN_MAP,
|
||
QUEUE_TABLE,
|
||
)
|
||
|
||
|
||
# Environment variable holding the operator approval secret. The CLI token must
# match it before any real write is allowed.
APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL"
# Secrets shorter than this are rejected outright by _approval_token_valid.
MIN_APPROVAL_TOKEN_LENGTH = 16
# Columns written for every queue row. Each one must be present and non-empty
# (None or "" counts as missing) for a row to be considered valid.
QUEUE_INSERT_COLUMNS = (
    "alert_candidate_id",
    "review_state",
    "priority_lane",
    "threshold_level",
    "total_score",
    "evidence_bundle_id",
    "dedupe_key",
    "source_batch_id",
    "metadata_json",
)

# Both dialect-specific statements are rendered from QUEUE_INSERT_COLUMNS.
# This keeps the column list, the bind parameters and the required-column
# check from drifting apart.
_QUEUE_INSERT_COLUMN_SQL = ",\n    ".join(QUEUE_INSERT_COLUMNS)
_QUEUE_INSERT_VALUES_SQL = ",\n    ".join(
    f":{column}" for column in QUEUE_INSERT_COLUMNS
)

# PostgreSQL: duplicates on dedupe_key are skipped. RETURNING produces a row
# only for rows that were actually inserted, and the writer uses that to tell
# inserted rows from skipped ones.
QUEUE_INSERT_SQL_POSTGRESQL = (
    "INSERT INTO market_alert_review_queue (\n"
    f"    {_QUEUE_INSERT_COLUMN_SQL}\n"
    ") VALUES (\n"
    f"    {_QUEUE_INSERT_VALUES_SQL}\n"
    ")\n"
    "ON CONFLICT (dedupe_key) DO NOTHING\n"
    "RETURNING dedupe_key"
)
# SQLite: the writer decides inserted vs. skipped from rowcount.
# NOTE: OR IGNORE also silently skips rows that violate NOT NULL, CHECK, or any
# other UNIQUE constraint, not just dedupe_key conflicts. Such rows are
# reported as "skipped" here, whereas PostgreSQL would raise an error.
QUEUE_INSERT_SQL_SQLITE = (
    "INSERT OR IGNORE INTO market_alert_review_queue (\n"
    f"    {_QUEUE_INSERT_COLUMN_SQL}\n"
    ") VALUES (\n"
    f"    {_QUEUE_INSERT_VALUES_SQL}\n"
    ")"
)
|
||
|
||
|
||
def _approval_token_valid(approval_token, approval_token_secret):
    """Return True when the CLI approval token matches the configured secret.

    Args:
        approval_token: Token supplied by the operator on the CLI.
        approval_token_secret: Expected token, from an argument or from the
            APPROVAL_ENV_VAR environment variable.

    Returns:
        False when either value is empty or the secret is shorter than
        MIN_APPROVAL_TOKEN_LENGTH. Otherwise, the result of a constant-time
        comparison of the two values.
    """
    if not approval_token or not approval_token_secret:
        return False
    secret = str(approval_token_secret)
    if len(secret) < MIN_APPROVAL_TOKEN_LENGTH:
        return False
    # hmac.compare_digest raises TypeError for str arguments containing
    # non-ASCII characters, so compare encoded bytes instead. "surrogatepass"
    # keeps values decoded with surrogateescape (os.environ, sys.argv)
    # encodable, and the encoding stays injective, so equality is preserved.
    return hmac.compare_digest(
        str(approval_token).encode("utf-8", "surrogatepass"),
        secret.encode("utf-8", "surrogatepass"),
    )
|
||
|
||
|
||
def _database_type_for_engine(engine, database_type):
|
||
if database_type:
|
||
return str(database_type).lower()
|
||
dialect = getattr(getattr(engine, "dialect", None), "name", "")
|
||
return str(dialect or "").lower()
|
||
|
||
|
||
def _queue_insert_sql(database_type):
    """Choose the INSERT statement for a lowercase database type name.

    "sqlite" selects the ``INSERT OR IGNORE`` form. Any other value falls back
    to the PostgreSQL ``ON CONFLICT ... RETURNING`` form.
    """
    is_sqlite = database_type == "sqlite"
    return QUEUE_INSERT_SQL_SQLITE if is_sqlite else QUEUE_INSERT_SQL_POSTGRESQL
|
||
|
||
|
||
def _serialize_metadata(value):
|
||
if value is None or isinstance(value, str):
|
||
return value
|
||
return json.dumps(value, ensure_ascii=False, sort_keys=True)
|
||
|
||
|
||
def _writer_row_from_payload(payload):
    """Map a statement's parameter payload onto queue column names.

    Only keys that appear in both PAYLOAD_COLUMN_MAP and ``payload`` are
    copied. The value for the metadata_json column goes through
    _serialize_metadata; every other value is copied unchanged.
    """
    return {
        column: (
            _serialize_metadata(payload[payload_field])
            if column == "metadata_json"
            else payload[payload_field]
        )
        for payload_field, column in PAYLOAD_COLUMN_MAP.items()
        if payload_field in payload
    }
|
||
|
||
|
||
def _writer_rows_from_transaction(transaction_preview):
|
||
rows = []
|
||
missing_payloads = []
|
||
invalid_rows = []
|
||
for statement in transaction_preview.get("statements", []):
|
||
payload = statement.get("parameter_preview")
|
||
if not isinstance(payload, dict):
|
||
missing_payloads.append(
|
||
statement.get("idempotency_key") or statement.get("index")
|
||
)
|
||
continue
|
||
row = _writer_row_from_payload(payload)
|
||
missing_columns = [
|
||
column for column in QUEUE_INSERT_COLUMNS
|
||
if row.get(column) in (None, "")
|
||
]
|
||
if missing_columns:
|
||
invalid_rows.append(
|
||
{
|
||
"idempotency_key": statement.get("idempotency_key"),
|
||
"missing_columns": missing_columns,
|
||
}
|
||
)
|
||
rows.append(row)
|
||
return rows, missing_payloads, invalid_rows
|
||
|
||
|
||
def _database_type_from_url(database_url):
    """Best-effort backend name (e.g. "postgresql", "sqlite") for a SQLAlchemy URL.

    Returns "" when the URL is missing or cannot be parsed, so that
    create_engine() reports the real error later.
    """
    if not database_url:
        return ""
    from sqlalchemy.engine import make_url
    from sqlalchemy.exc import ArgumentError

    try:
        return str(make_url(database_url).get_backend_name() or "").lower()
    except (ArgumentError, TypeError, ValueError):
        return ""


def _queue_writer_execution_result(mode, **fields):
    """Build an execution-result dict, starting from the all-False safety flags.

    Keys in ``fields`` override the defaults in place. Extra keys are appended
    after the shared keys, in the order given.
    """
    result = {
        "mode": mode,
        "database_connection_opened": False,
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "writes_executed": False,
        "would_write_database": True,
        "database_write_executed": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "statement_count": 0,
        "inserted_count": 0,
        "skipped_count": 0,
        "affected_dedupe_keys": [],
        "skipped_dedupe_keys": [],
    }
    result.update(fields)
    return result


def execute_candidate_queue_writer_transaction(
    *,
    transaction_preview,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Insert the reviewed candidate rows into market_alert_review_queue.

    Uses a SQLAlchemy Core connection inside a single ``engine.begin()``
    transaction. No ORM session is created.

    Args:
        transaction_preview: Preview dict whose statements carry the row payloads.
        database_url: SQLAlchemy URL. When neither this nor ``engine`` is given,
            ``config.DATABASE_PATH`` is used.
        database_type: Optional backend hint ("postgresql", "sqlite", ...). When
            empty, it comes from ``config.DATABASE_TYPE`` (if config is used),
            then from the URL, then from the engine dialect.
        engine: Optional pre-built engine. An engine created here is disposed
            before returning; a caller-supplied engine is not.

    Returns:
        A result dict. Its mode is "candidate_queue_writer_cli_executed" on
        success, with inserted/skipped dedupe keys. It is
        "candidate_queue_writer_cli_execute_error" when payloads are invalid
        (checked before touching the database) or when any exception occurs.
        Exceptions are reported in ``error_message`` rather than raised.
    """
    writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction(
        transaction_preview
    )
    statement_count = len(writer_rows)

    if missing_payloads or invalid_rows:
        # Refuse before opening any connection when a statement is unusable.
        return _queue_writer_execution_result(
            "candidate_queue_writer_cli_execute_error",
            statement_count=statement_count,
            error_message="candidate_queue_statement_payload_invalid",
            missing_payloads=missing_payloads,
            invalid_rows=invalid_rows,
        )

    effective_database_url = database_url
    effective_database_type = (database_type or "").lower()
    created_engine = False
    connection_opened = False
    transaction_opened = False
    write_attempted = False

    try:
        if engine is None:
            if not effective_database_url:
                from config import DATABASE_PATH, DATABASE_TYPE

                effective_database_url = DATABASE_PATH
                effective_database_type = (
                    database_type or DATABASE_TYPE or ""
                ).lower()
            if not effective_database_type:
                # Without a type hint, infer it from the URL. This ensures
                # PostgreSQL connections still get the timeouts below and
                # the CLI cannot hang indefinitely.
                effective_database_type = _database_type_from_url(
                    effective_database_url
                )
            connect_args = {}
            if effective_database_type == "postgresql":
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True

        effective_database_type = _database_type_for_engine(
            engine,
            effective_database_type,
        )
        insert_sql = text(_queue_insert_sql(effective_database_type))
        uses_rowcount = effective_database_type == "sqlite"
        inserted_dedupe_keys = []
        skipped_dedupe_keys = []

        with engine.begin() as conn:
            connection_opened = True
            transaction_opened = True
            for row in writer_rows:
                write_attempted = True
                result = conn.execute(insert_sql, row)
                if uses_rowcount:
                    inserted = int(result.rowcount or 0) > 0
                else:
                    # RETURNING yields a row only when ON CONFLICT did not
                    # skip the insert.
                    inserted = bool(result.fetchall())
                if inserted:
                    inserted_dedupe_keys.append(row["dedupe_key"])
                else:
                    skipped_dedupe_keys.append(row["dedupe_key"])

        return _queue_writer_execution_result(
            "candidate_queue_writer_cli_executed",
            database_connection_opened=connection_opened,
            explicit_transaction_opened=transaction_opened,
            writes_executed=True,
            database_write_executed=write_attempted,
            database_commit_executed=True,
            statement_count=statement_count,
            inserted_count=len(inserted_dedupe_keys),
            skipped_count=len(skipped_dedupe_keys),
            affected_dedupe_keys=inserted_dedupe_keys,
            skipped_dedupe_keys=skipped_dedupe_keys,
            idempotency={
                "conflict_policy": "dedupe_key_do_nothing",
                "safe_to_rerun": True,
            },
            rollback_note=(
                "若需回退,必須依 affected_dedupe_keys 人工審核清理;"
                "此 CLI 不自動刪除資料。"
            ),
        )
    except Exception as exc:
        # Report instead of raising. When the failure happened inside
        # engine.begin(), its context manager rolls the transaction back.
        return _queue_writer_execution_result(
            "candidate_queue_writer_cli_execute_error",
            database_connection_opened=connection_opened,
            explicit_transaction_opened=transaction_opened,
            database_write_executed=write_attempted,
            database_rollback_executed=bool(transaction_opened),
            statement_count=statement_count,
            error_message=str(exc),
        )
    finally:
        if created_engine:
            engine.dispose()
|
||
|
||
|
||
def build_candidate_queue_writer_cli_plan(
    *,
    transaction_preview,
    writer_preflight=None,
    execute_requested=False,
    approval_token=None,
    approval_token_secret=None,
    apply_real_write=False,
    backup_verified=False,
    migration_live_smoke_passed=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Evaluate the queue-writer CLI gates and, when all pass, run the insert.

    Args:
        transaction_preview: Preview dict with "statements", an optional
            "transaction_summary", and preview status flags.
        writer_preflight: Result of the read-only writer preflight. Its
            "ready_for_writer_review" flag gates the write.
        execute_requested: Whether ``--execute`` was given.
        approval_token: Token supplied by the operator.
        approval_token_secret: Expected token. When empty, it is read from
            the APPROVAL_ENV_VAR environment variable.
        apply_real_write: Whether ``--apply-real-write`` was given.
        backup_verified: Operator confirmation that a fresh backup exists.
        migration_live_smoke_passed: Whether the schema live smoke passed.
        database_url, database_type, engine: Forwarded to
            execute_candidate_queue_writer_transaction.

    Returns:
        A plan dict containing the gate list, blocked reasons, execution
        result (if any), and DB side-effect flags. ``exit_code`` is 2 when
        execution was requested but nothing was written, and 0 otherwise.
        The database is written only when every gate passes.
    """
    approval_token_present = bool(approval_token)
    # An explicitly passed secret takes precedence over the environment variable.
    approval_token_secret = approval_token_secret or os.getenv(APPROVAL_ENV_VAR)
    approval_token_secret_configured = bool(approval_token_secret)
    approval_token_valid = _approval_token_valid(approval_token, approval_token_secret)

    # ``or {}`` also tolerates an explicit ``"transaction_summary": None``.
    summary = transaction_preview.get("transaction_summary") or {}
    statement_count = int(summary.get("statement_count") or 0)
    transaction_preview_created = bool(
        transaction_preview.get("transaction_preview_created")
    )
    preflight_ready = bool(
        writer_preflight and writer_preflight.get("ready_for_writer_review")
    )
    writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction(
        transaction_preview
    )
    writer_rows_valid = bool(writer_rows and not missing_payloads and not invalid_rows)
    writer_enabled = True

    gates = [
        {
            "key": "script_created",
            "label": "scripts/market_intel_candidate_queue_writer.py exists",
            "passed": True,
        },
        {
            "key": "transaction_preview_created",
            "label": "候選 queue transaction preview 已建立",
            "passed": transaction_preview_created,
        },
        {
            "key": "transaction_has_statements",
            "label": "transaction preview 至少包含一筆 statement",
            "passed": statement_count > 0,
        },
        {
            "key": "execute_requested",
            "label": "--execute flag was explicitly provided",
            "passed": bool(execute_requested),
        },
        {
            "key": "approval_token_present",
            "label": f"{APPROVAL_ENV_VAR} approval token was provided",
            "passed": approval_token_present,
        },
        {
            "key": "approval_token_secret_configured",
            "label": f"{APPROVAL_ENV_VAR} environment token is configured",
            "passed": approval_token_secret_configured,
        },
        {
            "key": "approval_token_valid",
            "label": "approval token matches the configured environment token",
            "passed": approval_token_valid,
        },
        {
            "key": "apply_real_write_requested",
            "label": "--apply-real-write flag was explicitly provided",
            "passed": bool(apply_real_write),
        },
        {
            "key": "queue_writer_preflight_ready",
            "label": "候選 queue writer schema / payload preflight 已通過",
            "passed": preflight_ready,
        },
        {
            "key": "transaction_statement_payloads_present",
            "label": "transaction statements include reviewed parameter payloads",
            "passed": writer_rows_valid,
        },
        {
            "key": "backup_verified",
            "label": "正式寫入前必須確認最新備份已完成",
            "passed": bool(backup_verified),
        },
        {
            "key": "migration_live_smoke_passed",
            "label": "正式 schema live smoke 必須通過",
            "passed": bool(migration_live_smoke_passed),
        },
        {
            "key": "queue_writer_implementation_enabled",
            "label": "候選 queue writer 實際寫入實作已限定於 CLI 啟用",
            "passed": writer_enabled,
        },
        {
            "key": "manual_operator_approval",
            "label": "操作者需在 CLI 明確批准一次性寫入",
            "passed": bool(
                execute_requested and apply_real_write and approval_token_valid
            ),
        },
        {
            "key": "crawler_stays_disabled",
            "label": "queue writer 不掛 crawler 或 scheduler",
            "passed": True,
        },
    ]

    blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
    # execute_requested is also a gate. It is kept explicit here so that a
    # future gate change can never let a plain preview call write.
    ready_for_real_write = bool(execute_requested and not blocked_reasons)

    execution_result = None
    if ready_for_real_write:
        execution_result = execute_candidate_queue_writer_transaction(
            transaction_preview=transaction_preview,
            database_url=database_url,
            database_type=database_type,
            engine=engine,
        )
        if execution_result["mode"] == "candidate_queue_writer_cli_execute_error":
            blocked_reasons = ["candidate_queue_writer_execute_error"]

    # An empty dict stands in for "not executed". Every flag lookup below then
    # yields False, 0 or [].
    executed = execution_result or {}
    writes_executed = bool(executed.get("writes_executed"))

    if execution_result:
        mode = execution_result["mode"]
    elif ready_for_real_write:
        mode = "candidate_queue_writer_cli_ready"
    else:
        mode = "candidate_queue_writer_cli_blocked"

    return {
        "mode": mode,
        "target_table": "market_alert_review_queue",
        "execute_requested": bool(execute_requested),
        "apply_real_write_requested": bool(apply_real_write),
        "backup_verified": bool(backup_verified),
        "migration_live_smoke_passed": bool(migration_live_smoke_passed),
        "approval_token_present": approval_token_present,
        "approval_token_valid": approval_token_valid,
        "approval_env_var": APPROVAL_ENV_VAR,
        "approval_token_secret_configured": approval_token_secret_configured,
        "queue_writer_implementation_enabled": writer_enabled,
        "ready_for_real_write": ready_for_real_write,
        "writes_executed": writes_executed,
        "would_write_database": bool(ready_for_real_write),
        "database_connection_opened": bool(
            executed.get("database_connection_opened")
        ),
        "database_session_created": False,
        "explicit_transaction_opened": bool(
            executed.get("explicit_transaction_opened")
        ),
        "database_write_executed": bool(executed.get("database_write_executed")),
        "database_commit_executed": bool(executed.get("database_commit_executed")),
        "database_rollback_executed": bool(
            executed.get("database_rollback_executed")
        ),
        "external_network_executed": False,
        "scheduler_attached": False,
        # 2 signals "execution was requested but nothing was written".
        "exit_code": 2 if execute_requested and not writes_executed else 0,
        "blocked_reasons": blocked_reasons,
        "approval_gates": gates,
        "writer_payload_summary": {
            "writer_row_count": len(writer_rows),
            "missing_statement_payload_count": len(missing_payloads),
            "invalid_row_count": len(invalid_rows),
            "missing_payloads": missing_payloads,
            "invalid_rows": invalid_rows,
        },
        "transaction_preview_summary": {
            "mode": transaction_preview.get("mode"),
            "transaction_preview_created": transaction_preview_created,
            "transaction_ready": bool(transaction_preview.get("transaction_ready")),
            "transaction_opened": bool(transaction_preview.get("transaction_opened")),
            "transaction_committed": bool(
                transaction_preview.get("transaction_committed")
            ),
            "statement_count": statement_count,
            "idempotency_key_count": int(summary.get("idempotency_key_count") or 0),
            "conflict_policy": summary.get("conflict_policy"),
        },
        "transaction_preview": transaction_preview,
        "execution_result": execution_result,
        "inserted_count": int(executed.get("inserted_count", 0)),
        "skipped_count": int(executed.get("skipped_count", 0)),
        "affected_dedupe_keys": executed.get("affected_dedupe_keys", []),
        "skipped_dedupe_keys": executed.get("skipped_dedupe_keys", []),
        "writer_preflight": writer_preflight or {
            "mode": "candidate_queue_writer_preflight_not_requested",
            "ready_for_writer_review": False,
            "read_only_query_executed": False,
            "database_connection_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
        },
        "rollback_plan": [
            {
                "key": "no_write_no_db_rollback_required",
                "label": "若被 gate 阻擋且未寫 DB,不需要 DB rollback",
            },
            {
                "key": "dedupe_key_cleanup_review",
                "label": "正式寫入後若需回退,必須依 affected_dedupe_keys 人工審核清理",
            },
        ],
        "safety_contract": {
            "refuses_api_execution": True,
            "refuses_execute_without_apply_flag": True,
            "requires_independent_approval_token": True,
            "requires_backup_verified": True,
            "requires_migration_live_smoke": True,
            "uses_core_connection_not_orm_session": True,
            "keeps_crawler_disabled_for_queue_write": True,
            "target_table": "market_alert_review_queue",
        },
        "safe_boundaries": [
            "do_not_execute_candidate_queue_writer_from_api",
            "do_not_open_database_connection_from_api_queue_writer_status",
            "do_not_commit_api_queue_writer_status",
            "do_not_attach_scheduler_from_queue_writer",
            "no_remove_orphans",
            "no_momo_db_lifecycle_change",
        ],
    }
|