"""Read-only preflight for the market-intelligence candidate queue writer.

This module checks whether a transaction preview payload lines up with the
columns of the real ``market_alert_review_queue`` table and with a unique
index on ``dedupe_key``. By default it does not connect to the database; even
with ``execute_requested=True`` it only reads catalog metadata -- it never
writes, never commits, and never attaches a scheduler.
"""

from sqlalchemy import create_engine, text

QUEUE_TABLE = "market_alert_review_queue"

# Transaction-preview parameter key -> queue table column it would be written to.
PAYLOAD_COLUMN_MAP = {
    "alert_candidate_id": "alert_candidate_id",
    "review_state": "review_state",
    "priority_lane": "priority_lane",
    "threshold_level": "threshold_level",
    "total_score": "total_score",
    "evidence_bundle_id": "evidence_bundle_id",
    "dedupe_key": "dedupe_key",
    "source_batch_id": "source_batch_id",
    "metadata_json_preview": "metadata_json",
}


def _statements(transaction_preview):
    """Return the preview's ``statements`` list, treating missing/None as empty."""
    return transaction_preview.get("statements") or []


def _statement_payload_keys(transaction_preview):
    """Return the sorted union of ``parameter_keys`` across all preview statements."""
    keys = set()
    for statement in _statements(transaction_preview):
        keys.update(statement.get("parameter_keys") or [])
    return sorted(keys)


def _mapped_columns(payload_keys):
    """Split payload keys into ``({key: column}, [unmapped keys])`` via PAYLOAD_COLUMN_MAP."""
    mapped = {}
    unmapped = []
    for key in payload_keys:
        column = PAYLOAD_COLUMN_MAP.get(key)
        if column:
            mapped[key] = column
        else:
            unmapped.append(key)
    return mapped, unmapped


def _planned_result(transaction_preview, payload_keys, mapped, unmapped):
    """Build the execute=False report: nothing is probed, so nothing is ready."""
    mapped_columns = sorted(set(mapped.values()))
    return {
        "mode": "candidate_queue_writer_preflight_planned",
        "target_table": QUEUE_TABLE,
        "execute_requested": False,
        "read_only_query_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "table_exists": False,
        "schema_ready": False,
        "ready_for_writer_review": False,
        "ready_for_real_write": False,
        "transaction_preview_created": bool(
            transaction_preview.get("transaction_preview_created")
        ),
        "statement_count": len(_statements(transaction_preview)),
        "payload_keys": payload_keys,
        "payload_column_map": mapped,
        "mapped_insert_columns": mapped_columns,
        "unmapped_payload_keys": unmapped,
        "existing_columns": [],
        "missing_insert_columns": mapped_columns,
        "dedupe_unique_index_present": False,
        "index_names": [],
        "blocked_reasons": [
            "execute_false_planned_only",
            "queue_writer_preflight_not_loaded",
            "candidate_queue_writer_real_write_requires_cli",
        ],
        "safety_contract": _safety_contract(),
    }


def _probe_postgresql(conn):
    """Read QUEUE_TABLE column and index metadata from the PostgreSQL catalog.

    Only schemas on the current search path (``current_schemas(false)``) are
    inspected. Returns ``(column_rows, index_rows)``: column rows are
    ``{"name", "type", "nullable"}`` dicts, index rows are
    ``{"name", "definition", "unique"}`` dicts where ``definition`` is the
    ``indexdef`` DDL text.
    """
    columns = conn.execute(
        text(
            """
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = ANY (current_schemas(false))
              AND table_name = :table_name
            ORDER BY ordinal_position
            """
        ),
        {"table_name": QUEUE_TABLE},
    ).fetchall()
    indexes = conn.execute(
        text(
            """
            SELECT indexname, indexdef
            FROM pg_indexes
            WHERE schemaname = ANY (current_schemas(false))
              AND tablename = :table_name
            ORDER BY indexname
            """
        ),
        {"table_name": QUEUE_TABLE},
    ).fetchall()
    column_rows = [
        {
            "name": row._mapping["column_name"],
            "type": row._mapping["data_type"],
            "nullable": row._mapping["is_nullable"] == "YES",
        }
        for row in columns
    ]
    index_rows = []
    for row in indexes:
        definition = row._mapping["indexdef"]
        index_rows.append(
            {
                "name": row._mapping["indexname"],
                "definition": definition,
                # pg_get_indexdef() renders unique indexes as
                # "CREATE UNIQUE INDEX ..."; checking only that prefix avoids
                # false positives from index/column names containing "unique".
                "unique": str(definition)
                .upper()
                .startswith("CREATE UNIQUE INDEX"),
            }
        )
    return column_rows, index_rows


def _quote_sqlite_identifier(name):
    """Return *name* as a double-quoted SQLite identifier (inner quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _probe_sqlite(conn):
    """Read QUEUE_TABLE column and index metadata via SQLite PRAGMAs.

    Returns ``(column_rows, index_rows)`` in the same shape as
    ``_probe_postgresql``, except that an index ``definition`` is the
    comma-joined list of its plain key column names.
    """
    quoted_table = _quote_sqlite_identifier(QUEUE_TABLE)
    columns = conn.execute(text(f"PRAGMA table_info({quoted_table})")).fetchall()
    indexes = conn.execute(text(f"PRAGMA index_list({quoted_table})")).fetchall()
    column_rows = [
        {
            "name": row._mapping["name"],
            "type": row._mapping["type"],
            "nullable": not bool(row._mapping["notnull"]),
        }
        for row in columns
    ]
    index_rows = []
    for row in indexes:
        index_name = row._mapping["name"]
        # Quote the index name so names that need quoting (reserved words,
        # spaces, hyphens) do not break the PRAGMA syntax.
        indexed_columns = conn.execute(
            text(f"PRAGMA index_info({_quote_sqlite_identifier(index_name)})")
        ).fetchall()
        # index_info reports name=NULL for expression and rowid key columns;
        # skip them instead of letting str.join raise TypeError.
        column_names = [
            item._mapping["name"]
            for item in indexed_columns
            if item._mapping["name"] is not None
        ]
        index_rows.append(
            {
                "name": index_name,
                "definition": ",".join(column_names),
                "unique": bool(row._mapping["unique"]),
            }
        )
    return column_rows, index_rows


def _dedupe_unique_index_present(index_rows):
    """Return True if any unique index mentions ``dedupe_key`` in its definition.

    This is a substring match on ``definition``, so composite unique indexes
    that include ``dedupe_key`` also count.
    NOTE(review): on PostgreSQL ``definition`` is the full DDL, so an index
    *name* containing "dedupe_key" also matches -- confirm that is acceptable.
    """
    return any(
        row.get("unique") and "dedupe_key" in str(row.get("definition") or "")
        for row in index_rows
    )


def _safety_contract():
    """Return the static safety guarantees attached to every preflight report."""
    return {
        "read_only_catalog_query_only": True,
        "does_not_open_transaction": True,
        "does_not_commit": True,
        "does_not_insert_queue_rows": True,
        "does_not_attach_scheduler": True,
        "target_table": QUEUE_TABLE,
    }


def _create_read_only_engine(database_url, database_type):
    """Create an AUTOCOMMIT engine for the catalog probe.

    AUTOCOMMIT keeps the catalog reads out of an explicit transaction. For
    PostgreSQL, libpq connect (8 s) and statement (15000 ms) timeouts bound
    how long the probe can hang.
    """
    connect_args = {}
    if database_type == "postgresql":
        connect_args = {
            "connect_timeout": 8,
            "options": "-c statement_timeout=15000",
        }
    return create_engine(
        database_url,
        isolation_level="AUTOCOMMIT",
        pool_pre_ping=True,
        connect_args=connect_args,
    )


def _blocked_reasons(
    *, payload_keys, dedupe_unique, unmapped, missing_columns, table_exists
):
    """Return the failing readiness checks, most fundamental payload issue first."""
    checks = (
        (not payload_keys, "transaction_statement_payload_missing"),
        (not dedupe_unique, "dedupe_unique_index_missing"),
        (bool(unmapped), "queue_payload_keys_unmapped"),
        (bool(missing_columns), "queue_insert_columns_missing"),
        (not table_exists, "queue_table_missing"),
    )
    return [reason for failed, reason in checks if failed]


def build_candidate_queue_writer_preflight(
    *,
    transaction_preview,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Build the candidate queue writer preflight report.

    With ``execute_requested`` false this returns a planned-only report
    without importing ``config`` or touching a database. Otherwise it opens a
    single connection and only reads catalog metadata (columns and indexes of
    ``QUEUE_TABLE``); it never writes or commits.

    Args:
        transaction_preview: Mapping with an optional ``statements`` list
            (each item may carry ``parameter_keys``) and an optional
            ``transaction_preview_created`` flag.
        execute_requested: When true, run the read-only catalog probe.
        database_url: Connection URL; defaults to ``config.DATABASE_PATH``.
        database_type: ``"postgresql"`` selects the PostgreSQL catalog probe;
            any other value uses SQLite PRAGMAs. Defaults to
            ``config.DATABASE_TYPE``.
        engine: Optional pre-built SQLAlchemy engine. It is used as-is and is
            not disposed here; an engine created internally is disposed.

    Returns:
        A report dict. Exceptions raised while creating the engine or probing
        are caught and reported with mode
        ``"candidate_queue_writer_preflight_error"`` plus ``error_message``.
        Errors importing ``config`` are not caught.
    """
    payload_keys = _statement_payload_keys(transaction_preview)
    mapped, unmapped = _mapped_columns(payload_keys)
    if not execute_requested:
        return _planned_result(transaction_preview, payload_keys, mapped, unmapped)

    from config import DATABASE_PATH, DATABASE_TYPE

    effective_database_type = (database_type or DATABASE_TYPE or "").lower()
    effective_database_url = database_url or DATABASE_PATH
    mapped_columns = sorted(set(mapped.values()))
    transaction_preview_created = bool(
        transaction_preview.get("transaction_preview_created")
    )
    statement_count = len(_statements(transaction_preview))
    created_engine = False
    connection_opened = False
    try:
        if engine is None:
            engine = _create_read_only_engine(
                effective_database_url, effective_database_type
            )
            created_engine = True
        with engine.connect() as conn:
            connection_opened = True
            if effective_database_type == "postgresql":
                column_rows, index_rows = _probe_postgresql(conn)
            else:
                column_rows, index_rows = _probe_sqlite(conn)

        existing_columns = sorted(row["name"] for row in column_rows)
        existing_set = set(existing_columns)  # built once, not per column
        missing_columns = [
            column for column in mapped_columns if column not in existing_set
        ]
        table_exists = bool(column_rows)
        dedupe_unique = _dedupe_unique_index_present(index_rows)
        schema_ready = bool(table_exists and not missing_columns and dedupe_unique)
        payload_mappable = bool(payload_keys and not unmapped)
        ready_for_writer_review = bool(schema_ready and payload_mappable)
        blocked_reasons = _blocked_reasons(
            payload_keys=payload_keys,
            dedupe_unique=dedupe_unique,
            unmapped=unmapped,
            missing_columns=missing_columns,
            table_exists=table_exists,
        )
        return {
            "mode": "candidate_queue_writer_preflight_read_only",
            "target_table": QUEUE_TABLE,
            "execute_requested": True,
            "read_only_query_executed": True,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "table_exists": table_exists,
            "schema_ready": schema_ready,
            "ready_for_writer_review": ready_for_writer_review,
            "ready_for_real_write": False,
            "transaction_preview_created": transaction_preview_created,
            "statement_count": statement_count,
            "payload_keys": payload_keys,
            "payload_column_map": mapped,
            "mapped_insert_columns": mapped_columns,
            "unmapped_payload_keys": unmapped,
            "existing_columns": existing_columns,
            "missing_insert_columns": missing_columns,
            "dedupe_unique_index_present": dedupe_unique,
            "index_names": [row["name"] for row in index_rows],
            "blocked_reasons": blocked_reasons,
            "safety_contract": _safety_contract(),
        }
    except Exception as exc:
        # Report boundary: any engine/probe failure becomes an error report
        # instead of propagating to the caller.
        return {
            "mode": "candidate_queue_writer_preflight_error",
            "target_table": QUEUE_TABLE,
            "execute_requested": True,
            "read_only_query_executed": False,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "table_exists": False,
            "schema_ready": False,
            "ready_for_writer_review": False,
            "ready_for_real_write": False,
            "transaction_preview_created": transaction_preview_created,
            "statement_count": statement_count,
            "payload_keys": payload_keys,
            "payload_column_map": mapped,
            "mapped_insert_columns": mapped_columns,
            "unmapped_payload_keys": unmapped,
            "existing_columns": [],
            "missing_insert_columns": list(mapped_columns),
            "dedupe_unique_index_present": False,
            "index_names": [],
            "blocked_reasons": [
                "queue_writer_preflight_error",
            ],
            "error_message": str(exc),
            "safety_contract": _safety_contract(),
        }
    finally:
        # Only dispose engines this function created; caller-owned engines stay alive.
        if created_engine:
            engine.dispose()