"""Market-intel candidate review queue writer CLI.

By default this module only builds the safety-gate state that precedes a real
queue write. Rows are inserted into ``market_alert_review_queue`` in a single
short transaction only when the caller explicitly supplies ``execute``,
``apply_real_write``, an approval token matching the
``MARKET_INTEL_QUEUE_WRITE_APPROVAL`` secret, backup confirmation, a passing
migration live smoke and a passing read-only writer preflight.
"""

import hmac
import json
import os

from sqlalchemy import create_engine, text

from services.market_intel.candidate_queue_writer_preflight import (
    PAYLOAD_COLUMN_MAP,
    QUEUE_TABLE,
)

# Environment variable holding the secret the operator's approval token must match.
APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL"
# A configured secret shorter than this never validates, so a short/weak value
# cannot unlock a real write.
MIN_APPROVAL_TOKEN_LENGTH = 16

# Columns every writer row must carry with a non-empty value before insert.
QUEUE_INSERT_COLUMNS = (
    "alert_candidate_id",
    "review_state",
    "priority_lane",
    "threshold_level",
    "total_score",
    "evidence_bundle_id",
    "dedupe_key",
    "source_batch_id",
    "metadata_json",
)

# PostgreSQL: duplicates on dedupe_key are skipped; RETURNING yields a row only
# for rows actually inserted, which is how inserted vs. skipped is detected.
# (PostgreSQL requires a unique index/constraint on dedupe_key for this
# ON CONFLICT target.)
QUEUE_INSERT_SQL_POSTGRESQL = """
INSERT INTO market_alert_review_queue (
    alert_candidate_id,
    review_state,
    priority_lane,
    threshold_level,
    total_score,
    evidence_bundle_id,
    dedupe_key,
    source_batch_id,
    metadata_json
) VALUES (
    :alert_candidate_id,
    :review_state,
    :priority_lane,
    :threshold_level,
    :total_score,
    :evidence_bundle_id,
    :dedupe_key,
    :source_batch_id,
    :metadata_json
)
ON CONFLICT (dedupe_key) DO NOTHING
RETURNING dedupe_key
""".strip()

# SQLite: INSERT OR IGNORE reports rowcount 0 for a row skipped by a
# constraint conflict, which is how inserted vs. skipped is detected.
QUEUE_INSERT_SQL_SQLITE = """
INSERT OR IGNORE INTO market_alert_review_queue (
    alert_candidate_id,
    review_state,
    priority_lane,
    threshold_level,
    total_score,
    evidence_bundle_id,
    dedupe_key,
    source_batch_id,
    metadata_json
) VALUES (
    :alert_candidate_id,
    :review_state,
    :priority_lane,
    :threshold_level,
    :total_score,
    :evidence_bundle_id,
    :dedupe_key,
    :source_batch_id,
    :metadata_json
)
""".strip()


def _token_bytes(value):
    """Encode a token as UTF-8 bytes for constant-time comparison.

    ``surrogatepass`` lets any ``str`` encode, including lone surrogates that
    ``os.getenv`` can produce for undecodable environment bytes. The mapping is
    injective, so equal bytes still means equal strings.
    """
    return str(value).encode("utf-8", "surrogatepass")


def _approval_token_valid(approval_token, approval_token_secret):
    """Return True when the operator token matches the configured secret.

    Returns False when either value is empty or when the secret is shorter
    than ``MIN_APPROVAL_TOKEN_LENGTH``. The comparison is constant-time over
    UTF-8 bytes. Passing raw ``str`` values to ``hmac.compare_digest`` raises
    ``TypeError`` for non-ASCII input, which would crash gate evaluation
    instead of failing the approval check.
    """
    if not approval_token or not approval_token_secret:
        return False
    if len(str(approval_token_secret)) < MIN_APPROVAL_TOKEN_LENGTH:
        return False
    return hmac.compare_digest(
        _token_bytes(approval_token),
        _token_bytes(approval_token_secret),
    )


def _database_type_from_url(database_url):
    """Return the lower-cased backend name from a SQLAlchemy URL scheme.

    ``"postgresql+psycopg2://..."`` yields ``"postgresql"`` and
    ``"sqlite:///x.db"`` yields ``"sqlite"``. Returns ``""`` when the value has
    no ``scheme://`` prefix.
    """
    scheme, separator, _ = str(database_url or "").partition("://")
    if not separator:
        return ""
    return scheme.split("+", 1)[0].lower()


def _database_type_for_engine(engine, database_type):
    """Return the explicit ``database_type`` if given, else the engine dialect name."""
    if database_type:
        return str(database_type).lower()
    dialect = getattr(getattr(engine, "dialect", None), "name", "")
    return str(dialect or "").lower()


def _queue_insert_sql(database_type):
    """Pick the insert statement: SQLite syntax for ``"sqlite"``, PostgreSQL otherwise."""
    if database_type == "sqlite":
        return QUEUE_INSERT_SQL_SQLITE
    return QUEUE_INSERT_SQL_POSTGRESQL


def _serialize_metadata(value):
    """Serialize metadata for the ``metadata_json`` column.

    ``None`` and ``str`` pass through unchanged (a ``str`` is presumably
    already-serialized JSON; confirm with the preview producer). Other values
    are dumped with sorted keys and non-ASCII characters preserved.
    """
    if value is None or isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False, sort_keys=True)


def _writer_row_from_payload(payload):
    """Map a statement payload onto queue columns via ``PAYLOAD_COLUMN_MAP``.

    Payload keys that are absent are omitted from the row, not set to None.
    """
    row = {}
    for payload_key, column in PAYLOAD_COLUMN_MAP.items():
        if payload_key not in payload:
            continue
        value = payload[payload_key]
        if column == "metadata_json":
            value = _serialize_metadata(value)
        row[column] = value
    return row


def _writer_rows_from_transaction(transaction_preview):
    """Build insert rows from the preview's statements.

    Returns:
        ``(rows, missing_payloads, invalid_rows)``.

        - ``missing_payloads`` lists the ``idempotency_key`` (or ``index``) of
          statements without a dict ``parameter_preview``.
        - ``invalid_rows`` lists rows with ``None``/``""`` in any
          ``QUEUE_INSERT_COLUMNS`` entry.
        - ``rows`` still contains those invalid rows, so callers must check
          both error lists before writing.
    """
    rows = []
    missing_payloads = []
    invalid_rows = []
    # `or []` also covers an explicit `"statements": None`.
    for statement in transaction_preview.get("statements") or []:
        payload = statement.get("parameter_preview")
        if not isinstance(payload, dict):
            missing_payloads.append(
                statement.get("idempotency_key") or statement.get("index")
            )
            continue
        row = _writer_row_from_payload(payload)
        missing_columns = [
            column
            for column in QUEUE_INSERT_COLUMNS
            if row.get(column) in (None, "")
        ]
        if missing_columns:
            invalid_rows.append(
                {
                    "idempotency_key": statement.get("idempotency_key"),
                    "missing_columns": missing_columns,
                }
            )
        rows.append(row)
    return rows, missing_payloads, invalid_rows


def execute_candidate_queue_writer_transaction(
    *,
    transaction_preview,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Insert the previewed candidate rows into the review queue.

    Uses a SQLAlchemy Core connection inside ``engine.begin()`` (no ORM
    session), so the inserts are committed together on success and rolled
    back together on error.

    Args:
        transaction_preview: Preview dict whose ``statements`` carry a
            ``parameter_preview`` payload per row.
        database_url: SQLAlchemy URL used when ``engine`` is not supplied.
            Falls back to ``config.DATABASE_PATH`` when omitted.
        database_type: Optional backend override (e.g. ``"sqlite"`` or
            ``"postgresql"``). When omitted, it is inferred from the URL
            scheme, or from the engine dialect for a supplied engine.
        engine: Optional pre-built engine. Only engines created here are
            disposed here.

    Returns:
        A result dict.

        - ``mode`` is ``"candidate_queue_writer_cli_executed"`` on success.
        - ``mode`` is ``"candidate_queue_writer_cli_execute_error"`` when
          payloads are invalid or the database step raises. Database errors
          are captured in ``error_message`` rather than raised.
    """
    writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction(
        transaction_preview
    )
    effective_database_url = database_url
    effective_database_type = (database_type or "").lower()
    created_engine = False
    connection_opened = False
    transaction_opened = False
    write_attempted = False

    if missing_payloads or invalid_rows:
        # Refuse before touching the database at all.
        return {
            "mode": "candidate_queue_writer_cli_execute_error",
            "database_connection_opened": False,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "writes_executed": False,
            "would_write_database": True,
            "database_write_executed": False,
            "database_commit_executed": False,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "statement_count": len(writer_rows),
            "inserted_count": 0,
            "skipped_count": 0,
            "affected_dedupe_keys": [],
            "skipped_dedupe_keys": [],
            "error_message": "candidate_queue_statement_payload_invalid",
            "missing_payloads": missing_payloads,
            "invalid_rows": invalid_rows,
        }

    try:
        if engine is None:
            if not effective_database_url:
                # Imported lazily: callers that pass a URL or an engine never
                # load the application config module.
                from config import DATABASE_PATH, DATABASE_TYPE

                effective_database_url = DATABASE_PATH
                effective_database_type = (
                    database_type or DATABASE_TYPE or ""
                ).lower()
            if not effective_database_type:
                # Infer the backend from the URL scheme so the PostgreSQL
                # timeouts below still apply when only a URL was supplied.
                effective_database_type = _database_type_from_url(
                    effective_database_url
                )
            connect_args = {}
            if effective_database_type == "postgresql":
                # 8 s connect timeout and 15 s (15000 ms) per-statement
                # timeout, so a stuck database cannot hold the CLI open.
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True

        effective_database_type = _database_type_for_engine(
            engine,
            effective_database_type,
        )
        insert_sql = text(_queue_insert_sql(effective_database_type))
        inserted_dedupe_keys = []
        skipped_dedupe_keys = []

        # engine.begin() commits on normal exit and rolls back if the body raises.
        with engine.begin() as conn:
            connection_opened = True
            transaction_opened = True
            for row in writer_rows:
                write_attempted = True
                result = conn.execute(insert_sql, row)
                if effective_database_type == "sqlite":
                    inserted = int(result.rowcount or 0) > 0
                else:
                    # RETURNING produces a row only when the insert happened.
                    inserted = bool(result.fetchall())
                if inserted:
                    inserted_dedupe_keys.append(row["dedupe_key"])
                else:
                    skipped_dedupe_keys.append(row["dedupe_key"])

        return {
            "mode": "candidate_queue_writer_cli_executed",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": True,
            "would_write_database": True,
            "database_write_executed": write_attempted,
            "database_commit_executed": True,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "statement_count": len(writer_rows),
            "inserted_count": len(inserted_dedupe_keys),
            "skipped_count": len(skipped_dedupe_keys),
            "affected_dedupe_keys": inserted_dedupe_keys,
            "skipped_dedupe_keys": skipped_dedupe_keys,
            "idempotency": {
                "conflict_policy": "dedupe_key_do_nothing",
                "safe_to_rerun": True,
            },
            "rollback_note": (
                "若需回退,必須依 affected_dedupe_keys 人工審核清理;"
                "此 CLI 不自動刪除資料。"
            ),
        }
    except Exception as exc:
        # CLI boundary: report the failure in the result instead of raising.
        # If the transaction was opened, engine.begin() has rolled it back.
        return {
            "mode": "candidate_queue_writer_cli_execute_error",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": False,
            "would_write_database": True,
            "database_write_executed": write_attempted,
            "database_commit_executed": False,
            "database_rollback_executed": bool(transaction_opened),
            "external_network_executed": False,
            "scheduler_attached": False,
            "statement_count": len(writer_rows),
            "inserted_count": 0,
            "skipped_count": 0,
            "affected_dedupe_keys": [],
            "skipped_dedupe_keys": [],
            "error_message": str(exc),
        }
    finally:
        if created_engine:
            engine.dispose()


def build_candidate_queue_writer_cli_plan(
    *,
    transaction_preview,
    writer_preflight=None,
    execute_requested=False,
    approval_token=None,
    approval_token_secret=None,
    apply_real_write=False,
    backup_verified=False,
    migration_live_smoke_passed=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Evaluate the queue-writer CLI gates and, if all pass, run the insert.

    Args:
        transaction_preview: Transaction preview dict (statements + summary).
        writer_preflight: Result of the read-only writer preflight; the gate
            requires ``ready_for_writer_review`` to be truthy.
        execute_requested: ``--execute`` flag.
        approval_token: Token supplied by the operator.
        approval_token_secret: Expected token. Defaults to the
            ``MARKET_INTEL_QUEUE_WRITE_APPROVAL`` environment variable.
        apply_real_write: ``--apply-real-write`` flag.
        backup_verified: Operator confirmation that a fresh backup exists.
        migration_live_smoke_passed: Whether the schema live smoke passed.
        database_url, database_type, engine: Passed through to
            ``execute_candidate_queue_writer_transaction``.

    Returns:
        A plan dict with every gate, ``blocked_reasons``, the execution result
        (``None`` when nothing ran) and ``exit_code``. ``exit_code`` is 2 when
        execution was requested but no write happened, else 0.
    """
    approval_token_present = bool(approval_token)
    approval_token_secret = approval_token_secret or os.getenv(APPROVAL_ENV_VAR)
    approval_token_secret_configured = bool(approval_token_secret)
    approval_token_valid = _approval_token_valid(approval_token, approval_token_secret)
    # `or {}` also covers an explicit `"transaction_summary": None`.
    summary = transaction_preview.get("transaction_summary") or {}
    statement_count = int(summary.get("statement_count") or 0)
    transaction_preview_created = bool(
        transaction_preview.get("transaction_preview_created")
    )
    preflight_ready = bool(
        writer_preflight and writer_preflight.get("ready_for_writer_review")
    )
    writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction(
        transaction_preview
    )
    writer_rows_valid = bool(writer_rows and not missing_payloads and not invalid_rows)
    # The real-write implementation is enabled on this CLI path; kept as an
    # explicit gate so the gate list records it.
    writer_enabled = True

    gates = [
        {
            "key": "script_created",
            "label": "scripts/market_intel_candidate_queue_writer.py exists",
            "passed": True,
        },
        {
            "key": "transaction_preview_created",
            "label": "候選 queue transaction preview 已建立",
            "passed": transaction_preview_created,
        },
        {
            "key": "transaction_has_statements",
            "label": "transaction preview 至少包含一筆 statement",
            "passed": statement_count > 0,
        },
        {
            "key": "execute_requested",
            "label": "--execute flag was explicitly provided",
            "passed": bool(execute_requested),
        },
        {
            "key": "approval_token_present",
            "label": f"{APPROVAL_ENV_VAR} approval token was provided",
            "passed": approval_token_present,
        },
        {
            "key": "approval_token_secret_configured",
            "label": f"{APPROVAL_ENV_VAR} environment token is configured",
            "passed": approval_token_secret_configured,
        },
        {
            "key": "approval_token_valid",
            "label": "approval token matches the configured environment token",
            "passed": approval_token_valid,
        },
        {
            "key": "apply_real_write_requested",
            "label": "--apply-real-write flag was explicitly provided",
            "passed": bool(apply_real_write),
        },
        {
            "key": "queue_writer_preflight_ready",
            "label": "候選 queue writer schema / payload preflight 已通過",
            "passed": preflight_ready,
        },
        {
            "key": "transaction_statement_payloads_present",
            "label": "transaction statements include reviewed parameter payloads",
            "passed": writer_rows_valid,
        },
        {
            "key": "backup_verified",
            "label": "正式寫入前必須確認最新備份已完成",
            "passed": bool(backup_verified),
        },
        {
            "key": "migration_live_smoke_passed",
            "label": "正式 schema live smoke 必須通過",
            "passed": bool(migration_live_smoke_passed),
        },
        {
            "key": "queue_writer_implementation_enabled",
            "label": "候選 queue writer 實際寫入實作已限定於 CLI 啟用",
            "passed": writer_enabled,
        },
        {
            "key": "manual_operator_approval",
            "label": "操作者需在 CLI 明確批准一次性寫入",
            "passed": bool(
                execute_requested and apply_real_write and approval_token_valid
            ),
        },
        {
            "key": "crawler_stays_disabled",
            "label": "queue writer 不掛 crawler 或 scheduler",
            "passed": True,
        },
    ]
    blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
    ready_for_real_write = bool(execute_requested and not blocked_reasons)

    execution_result = None
    if ready_for_real_write:
        execution_result = execute_candidate_queue_writer_transaction(
            transaction_preview=transaction_preview,
            database_url=database_url,
            database_type=database_type,
            engine=engine,
        )
        if execution_result["mode"] == "candidate_queue_writer_cli_execute_error":
            blocked_reasons = ["candidate_queue_writer_execute_error"]
    writes_executed = bool(
        execution_result and execution_result.get("writes_executed")
    )

    return {
        "mode": (
            execution_result["mode"]
            if execution_result
            else "candidate_queue_writer_cli_ready"
            if ready_for_real_write
            else "candidate_queue_writer_cli_blocked"
        ),
        "target_table": "market_alert_review_queue",
        "execute_requested": bool(execute_requested),
        "apply_real_write_requested": bool(apply_real_write),
        "backup_verified": bool(backup_verified),
        "migration_live_smoke_passed": bool(migration_live_smoke_passed),
        "approval_token_present": approval_token_present,
        "approval_token_valid": approval_token_valid,
        "approval_env_var": APPROVAL_ENV_VAR,
        "approval_token_secret_configured": approval_token_secret_configured,
        "queue_writer_implementation_enabled": writer_enabled,
        "ready_for_real_write": ready_for_real_write,
        "writes_executed": writes_executed,
        "would_write_database": bool(ready_for_real_write),
        "database_connection_opened": bool(
            execution_result and execution_result.get("database_connection_opened")
        ),
        "database_session_created": False,
        "explicit_transaction_opened": bool(
            execution_result and execution_result.get("explicit_transaction_opened")
        ),
        "database_write_executed": bool(
            execution_result and execution_result.get("database_write_executed")
        ),
        "database_commit_executed": bool(
            execution_result and execution_result.get("database_commit_executed")
        ),
        "database_rollback_executed": bool(
            execution_result and execution_result.get("database_rollback_executed")
        ),
        "external_network_executed": False,
        "scheduler_attached": False,
        # 0 after a successful write or when execution was not requested;
        # 2 when execution was requested but blocked or failed.
        "exit_code": 0 if writes_executed else 2 if execute_requested else 0,
        "blocked_reasons": blocked_reasons,
        "approval_gates": gates,
        "writer_payload_summary": {
            "writer_row_count": len(writer_rows),
            "missing_statement_payload_count": len(missing_payloads),
            "invalid_row_count": len(invalid_rows),
            "missing_payloads": missing_payloads,
            "invalid_rows": invalid_rows,
        },
        "transaction_preview_summary": {
            "mode": transaction_preview.get("mode"),
            "transaction_preview_created": transaction_preview_created,
            "transaction_ready": bool(transaction_preview.get("transaction_ready")),
            "transaction_opened": bool(transaction_preview.get("transaction_opened")),
            "transaction_committed": bool(
                transaction_preview.get("transaction_committed")
            ),
            "statement_count": statement_count,
            "idempotency_key_count": int(summary.get("idempotency_key_count") or 0),
            "conflict_policy": summary.get("conflict_policy"),
        },
        "transaction_preview": transaction_preview,
        "execution_result": execution_result,
        "inserted_count": int(
            execution_result.get("inserted_count") if execution_result else 0
        ),
        "skipped_count": int(
            execution_result.get("skipped_count") if execution_result else 0
        ),
        "affected_dedupe_keys": (
            execution_result.get("affected_dedupe_keys") if execution_result else []
        ),
        "skipped_dedupe_keys": (
            execution_result.get("skipped_dedupe_keys") if execution_result else []
        ),
        "writer_preflight": writer_preflight
        or {
            "mode": "candidate_queue_writer_preflight_not_requested",
            "ready_for_writer_review": False,
            "read_only_query_executed": False,
            "database_connection_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
        },
        "rollback_plan": [
            {
                "key": "no_write_no_db_rollback_required",
                "label": "若被 gate 阻擋且未寫 DB,不需要 DB rollback",
            },
            {
                "key": "dedupe_key_cleanup_review",
                "label": "正式寫入後若需回退,必須依 affected_dedupe_keys 人工審核清理",
            },
        ],
        "safety_contract": {
            "refuses_api_execution": True,
            "refuses_execute_without_apply_flag": True,
            "requires_independent_approval_token": True,
            "requires_backup_verified": True,
            "requires_migration_live_smoke": True,
            "uses_core_connection_not_orm_session": True,
            "keeps_crawler_disabled_for_queue_write": True,
            "target_table": "market_alert_review_queue",
        },
        "safe_boundaries": [
            "do_not_execute_candidate_queue_writer_from_api",
            "do_not_open_database_connection_from_api_queue_writer_status",
            "do_not_commit_api_queue_writer_status",
            "do_not_attach_scheduler_from_queue_writer",
            "no_remove_orphans",
            "no_momo_db_lifecycle_change",
        ],
    }