Files
ewoooc/services/market_intel/candidate_queue_writer_preflight.py
OoO 20d22b69ea
All checks were successful
CD Pipeline / deploy (push) Successful in 1m18s
新增市場情報候選佇列 writer transaction
2026-05-19 09:57:59 +08:00

308 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Read-only preflight for the market-intelligence candidate queue writer.

This module checks whether a transaction preview payload can be aligned with
the columns and the dedupe unique index of the production
`market_alert_review_queue` table. By default it does not connect to the
database; even with execute=true it only queries the catalog: no writes, no
commit, no scheduler attachment.
"""
import re

from sqlalchemy import create_engine, text
# Table that candidate alert rows would be written to.
QUEUE_TABLE = "market_alert_review_queue"

# Transaction-preview parameter key -> queue table column. Every key maps to
# the column with the same name, except the metadata preview, which is stored
# in ``metadata_json``.
PAYLOAD_COLUMN_MAP = {
    **{
        same_name: same_name
        for same_name in (
            "alert_candidate_id",
            "review_state",
            "priority_lane",
            "threshold_level",
            "total_score",
            "evidence_bundle_id",
            "dedupe_key",
            "source_batch_id",
        )
    },
    "metadata_json_preview": "metadata_json",
}
def _statement_payload_keys(transaction_preview):
keys = set()
for statement in transaction_preview.get("statements", []):
keys.update(statement.get("parameter_keys", []))
return sorted(keys)
def _mapped_columns(payload_keys):
    """Partition payload keys by whether ``PAYLOAD_COLUMN_MAP`` knows them.

    Returns:
        ``(mapped, unmapped)``, where ``mapped`` is a dict from payload key to
        queue column and ``unmapped`` lists keys with no (or an empty)
        column, in input order.
    """
    mapped = {}
    unmapped = []
    for payload_key in payload_keys:
        target_column = PAYLOAD_COLUMN_MAP.get(payload_key)
        if not target_column:
            unmapped.append(payload_key)
            continue
        mapped[payload_key] = target_column
    return mapped, unmapped
def _planned_result(transaction_preview, payload_keys, mapped, unmapped):
    """Build the report for ``execute_requested=False`` (no DB access).

    Args:
        transaction_preview: The preview dict; ``statements`` and
            ``transaction_preview_created`` are read from it.
        payload_keys: Sorted parameter keys collected from the statements.
        mapped: Payload key -> queue column for the keys that have a column.
        unmapped: Payload keys with no queue column.

    Returns:
        A report dict whose every readiness flag is ``False`` and whose
        ``blocked_reasons`` explain that only planning was done.
    """
    mapped_columns = sorted(set(mapped.values()))
    return {
        "mode": "candidate_queue_writer_preflight_planned",
        "target_table": QUEUE_TABLE,
        "execute_requested": False,
        "read_only_query_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "table_exists": False,
        "schema_ready": False,
        "ready_for_writer_review": False,
        "ready_for_real_write": False,
        "transaction_preview_created": bool(
            transaction_preview.get("transaction_preview_created")
        ),
        "statement_count": len(transaction_preview.get("statements", [])),
        "payload_keys": payload_keys,
        "payload_column_map": mapped,
        "mapped_insert_columns": mapped_columns,
        "unmapped_payload_keys": unmapped,
        "existing_columns": [],
        # Nothing was checked, so every mapped column counts as missing. Use a
        # separate list so the two report keys never alias one mutable object.
        "missing_insert_columns": list(mapped_columns),
        "dedupe_unique_index_present": False,
        "index_names": [],
        "blocked_reasons": [
            "execute_false_planned_only",
            "queue_writer_preflight_not_loaded",
            "candidate_queue_writer_real_write_requires_cli",
        ],
        "safety_contract": _safety_contract(),
    }
def _probe_postgresql(conn):
    """Read the queue table's columns and indexes from the PostgreSQL catalog.

    Args:
        conn: Open SQLAlchemy connection to a PostgreSQL database.

    Returns:
        ``(column_rows, index_rows)``. Column rows carry ``name``, ``type``
        and ``nullable``; index rows carry ``name``, ``definition`` (the
        ``indexdef`` DDL text) and ``unique``. Both lists are empty when no
        table named ``QUEUE_TABLE`` is visible.
    """
    # NOTE(review): both queries accept any schema on the search path, so
    # same-named tables in several of those schemas would be merged together.
    columns = conn.execute(
        text(
            """
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = ANY (current_schemas(false))
            AND table_name = :table_name
            ORDER BY ordinal_position
            """
        ),
        {"table_name": QUEUE_TABLE},
    ).fetchall()
    indexes = conn.execute(
        text(
            """
            SELECT indexname, indexdef
            FROM pg_indexes
            WHERE schemaname = ANY (current_schemas(false))
            AND tablename = :table_name
            ORDER BY indexname
            """
        ),
        {"table_name": QUEUE_TABLE},
    ).fetchall()

    column_rows = [
        {
            "name": row._mapping["column_name"],
            "type": row._mapping["data_type"],
            "nullable": row._mapping["is_nullable"] == "YES",
        }
        for row in columns
    ]
    index_rows = []
    for row in indexes:
        index_definition = row._mapping["indexdef"]
        normalized = str(index_definition).lstrip().upper()
        index_rows.append(
            {
                "name": row._mapping["indexname"],
                "definition": index_definition,
                # indexdef is pg_get_indexdef() output, which begins with
                # "CREATE UNIQUE INDEX" for unique indexes. A bare substring
                # test would also fire on index/column names containing
                # "unique".
                "unique": normalized.startswith("CREATE UNIQUE "),
            }
        )
    return column_rows, index_rows
def _quote_sqlite_identifier(identifier):
    """Return *identifier* as a double-quoted SQLite identifier."""
    return '"' + str(identifier).replace('"', '""') + '"'


def _probe_sqlite(conn):
    """Read the queue table's columns and indexes via SQLite PRAGMAs.

    Args:
        conn: Open SQLAlchemy connection to a SQLite database.

    Returns:
        ``(column_rows, index_rows)``. Column rows carry ``name``, ``type``
        and ``nullable``; index rows carry ``name``, ``definition`` (the
        indexed column names joined with ``,``) and ``unique``. Both lists
        are empty when the table does not exist.
    """
    table = _quote_sqlite_identifier(QUEUE_TABLE)
    columns = conn.execute(text(f"PRAGMA table_info({table})")).fetchall()
    indexes = conn.execute(text(f"PRAGMA index_list({table})")).fetchall()

    column_rows = [
        {
            "name": row._mapping["name"],
            "type": row._mapping["type"],
            "nullable": not bool(row._mapping["notnull"]),
        }
        for row in columns
    ]
    index_rows = []
    for row in indexes:
        index_name = row._mapping["name"]
        # Quote the index name so names needing quoting still form valid SQL.
        indexed_columns = conn.execute(
            text(f"PRAGMA index_info({_quote_sqlite_identifier(index_name)})")
        ).fetchall()
        # index_info reports a NULL name for expression and rowid entries.
        # Skip those so the join below does not fail on None.
        column_names = [
            item._mapping["name"]
            for item in indexed_columns
            if item._mapping["name"] is not None
        ]
        index_rows.append(
            {
                "name": index_name,
                "definition": ",".join(column_names),
                "unique": bool(row._mapping["unique"]),
            }
        )
    return column_rows, index_rows
def _dedupe_unique_index_present(index_rows):
for row in index_rows:
definition = str(row.get("definition") or "")
if row.get("unique") and "dedupe_key" in definition:
return True
return False
def _safety_contract():
    """Return the side-effect guarantees advertised in every preflight report.

    A fresh dict is built on each call, so callers may mutate it freely.
    """
    guarantees = dict.fromkeys(
        (
            "read_only_catalog_query_only",
            "does_not_open_transaction",
            "does_not_commit",
            "does_not_insert_queue_rows",
            "does_not_attach_scheduler",
        ),
        True,
    )
    guarantees["target_table"] = QUEUE_TABLE
    return guarantees
def _resolve_database_settings(database_url, database_type, engine):
    """Return ``(database_type, database_url)`` with missing values filled in.

    An explicit ``database_type`` wins. Otherwise a supplied engine's dialect
    name is used, and only then ``config.DATABASE_TYPE``. ``config`` is
    imported only when a value is actually missing, so callers that pass
    everything explicitly do not depend on the project config module.
    """
    effective_type = database_type
    if not effective_type and engine is not None:
        effective_type = getattr(getattr(engine, "dialect", None), "name", None)
    effective_url = database_url
    needs_url = engine is None and not effective_url
    if not effective_type or needs_url:
        from config import DATABASE_PATH, DATABASE_TYPE

        effective_type = effective_type or DATABASE_TYPE
        if needs_url:
            effective_url = DATABASE_PATH
    return (effective_type or "").lower(), effective_url


def _create_read_only_engine(database_url, database_type):
    """Create a short-lived engine for catalog reads; the caller disposes it."""
    connect_args = {}
    if database_type == "postgresql":
        # Fail fast rather than hang: 8 s connect timeout (libpq seconds) and
        # a 15 s server-side statement timeout (milliseconds).
        connect_args = {
            "connect_timeout": 8,
            "options": "-c statement_timeout=15000",
        }
    # NOTE(review): for a file-backed SQLite URL, connecting creates the file
    # if it does not exist; confirm that is acceptable for a read-only probe.
    return create_engine(
        database_url,
        isolation_level="AUTOCOMMIT",
        pool_pre_ping=True,
        connect_args=connect_args,
    )


def _blocked_reasons(*, payload_keys, unmapped, table_exists, missing_columns, dedupe_unique):
    """List the failed readiness checks.

    Order: missing statement payload, missing dedupe index, unmapped payload
    keys, missing insert columns, missing table.
    """
    checks = (
        (not payload_keys, "transaction_statement_payload_missing"),
        (not dedupe_unique, "dedupe_unique_index_missing"),
        (bool(unmapped), "queue_payload_keys_unmapped"),
        (bool(missing_columns), "queue_insert_columns_missing"),
        (not table_exists, "queue_table_missing"),
    )
    return [reason for failed, reason in checks if failed]


def _error_result(transaction_preview, payload_keys, mapped, unmapped, connection_opened, exc):
    """Build the report returned when the read-only preflight fails."""
    return {
        "mode": "candidate_queue_writer_preflight_error",
        "target_table": QUEUE_TABLE,
        "execute_requested": True,
        "read_only_query_executed": False,
        "database_connection_opened": connection_opened,
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "table_exists": False,
        "schema_ready": False,
        "ready_for_writer_review": False,
        "ready_for_real_write": False,
        "transaction_preview_created": bool(
            transaction_preview.get("transaction_preview_created")
        ),
        "statement_count": len(transaction_preview.get("statements", [])),
        "payload_keys": payload_keys,
        "payload_column_map": mapped,
        "mapped_insert_columns": sorted(set(mapped.values())),
        "unmapped_payload_keys": unmapped,
        "existing_columns": [],
        "missing_insert_columns": sorted(set(mapped.values())),
        "dedupe_unique_index_present": False,
        "index_names": [],
        "blocked_reasons": [
            "queue_writer_preflight_error",
        ],
        "error_message": str(exc),
        "safety_contract": _safety_contract(),
    }


def build_candidate_queue_writer_preflight(
    *,
    transaction_preview,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Build the candidate queue writer preflight report.

    With ``execute_requested=False`` (the default) no database is touched and
    a planned-only report is returned. With ``execute_requested=True`` the
    queue table's columns and indexes are read from the catalog. Nothing is
    inserted, and ``ready_for_real_write`` is always ``False``.

    Args:
        transaction_preview: Dict with an optional ``statements`` list (items
            may carry ``parameter_keys``) and an optional
            ``transaction_preview_created`` flag.
        execute_requested: Whether to run the read-only catalog probe.
        database_url: SQLAlchemy URL used when ``engine`` is not given.
            Defaults to ``config.DATABASE_PATH``.
        database_type: ``"postgresql"`` selects the PostgreSQL probe; any
            other value selects the SQLite probe. Defaults to the engine's
            dialect name when ``engine`` is given, else ``config.DATABASE_TYPE``.
        engine: Optional existing SQLAlchemy engine. It is not disposed here.

    Returns:
        A report dict whose ``mode`` ends in ``_planned``, ``_read_only`` or
        ``_error``. Failures, including a failing ``config`` import, are
        reported in the ``_error`` form with ``error_message`` rather than
        raised.
    """
    payload_keys = _statement_payload_keys(transaction_preview)
    mapped, unmapped = _mapped_columns(payload_keys)
    if not execute_requested:
        return _planned_result(transaction_preview, payload_keys, mapped, unmapped)

    created_engine = False
    connection_opened = False
    try:
        effective_database_type, effective_database_url = _resolve_database_settings(
            database_url, database_type, engine
        )
        if engine is None:
            engine = _create_read_only_engine(
                effective_database_url, effective_database_type
            )
            created_engine = True
        with engine.connect() as conn:
            connection_opened = True
            if effective_database_type == "postgresql":
                column_rows, index_rows = _probe_postgresql(conn)
            else:
                column_rows, index_rows = _probe_sqlite(conn)

        existing_columns = sorted(row["name"] for row in column_rows)
        existing_column_set = set(existing_columns)  # built once, not per column
        mapped_columns = sorted(set(mapped.values()))
        missing_columns = [
            column for column in mapped_columns if column not in existing_column_set
        ]
        table_exists = bool(column_rows)
        dedupe_unique = _dedupe_unique_index_present(index_rows)
        schema_ready = bool(table_exists and not missing_columns and dedupe_unique)
        payload_mappable = bool(payload_keys and not unmapped)
        ready_for_writer_review = bool(schema_ready and payload_mappable)
        blocked_reasons = _blocked_reasons(
            payload_keys=payload_keys,
            unmapped=unmapped,
            table_exists=table_exists,
            missing_columns=missing_columns,
            dedupe_unique=dedupe_unique,
        )
        return {
            "mode": "candidate_queue_writer_preflight_read_only",
            "target_table": QUEUE_TABLE,
            "execute_requested": True,
            "read_only_query_executed": True,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "table_exists": table_exists,
            "schema_ready": schema_ready,
            "ready_for_writer_review": ready_for_writer_review,
            "ready_for_real_write": False,
            "transaction_preview_created": bool(
                transaction_preview.get("transaction_preview_created")
            ),
            "statement_count": len(transaction_preview.get("statements", [])),
            "payload_keys": payload_keys,
            "payload_column_map": mapped,
            "mapped_insert_columns": mapped_columns,
            "unmapped_payload_keys": unmapped,
            "existing_columns": existing_columns,
            "missing_insert_columns": missing_columns,
            "dedupe_unique_index_present": dedupe_unique,
            "index_names": [row["name"] for row in index_rows],
            "blocked_reasons": blocked_reasons,
            "safety_contract": _safety_contract(),
        }
    except Exception as exc:
        # Reporting boundary: a preflight reports failure instead of raising.
        return _error_result(
            transaction_preview, payload_keys, mapped, unmapped, connection_opened, exc
        )
    finally:
        # Only dispose engines created here; a caller-supplied engine is theirs.
        if created_engine:
            engine.dispose()