410 lines
15 KiB
Python
410 lines
15 KiB
Python
"""Market intelligence seed writer CLI.

By default this only reports the CLI execution plan. Only when the CLI
explicitly passes execute, apply-real-write, and a confirmation token does it
upsert the market_platforms seed rows in a short transaction.
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import os
|
||
|
||
from sqlalchemy import bindparam, create_engine, text
|
||
|
||
|
||
# Name of the environment variable that build_seed_writer_cli_plan reads (via
# os.getenv) when no approval token secret is passed in explicitly.
APPROVAL_ENV_VAR = "MARKET_INTEL_SEED_WRITE_APPROVAL"
# _approval_token_valid rejects any configured secret shorter than this many
# characters.
MIN_APPROVAL_TOKEN_LENGTH = 16
# Parameterized upsert for market_platforms, keyed on `code`: inserts a new row
# or, on conflict, overwrites name/base_url/enabled/crawl_policy_json and bumps
# updated_at. The same text is exposed as "sql_template" in the transaction
# preview, so its exact contents are part of the tool's output.
PLATFORM_UPSERT_SQL = """
INSERT INTO market_platforms (
code,
name,
base_url,
enabled,
crawl_policy_json,
created_at,
updated_at
) VALUES (
:code,
:name,
:base_url,
:enabled,
:crawl_policy_json,
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP
)
ON CONFLICT (code) DO UPDATE SET
name = EXCLUDED.name,
base_url = EXCLUDED.base_url,
enabled = EXCLUDED.enabled,
crawl_policy_json = EXCLUDED.crawl_policy_json,
updated_at = CURRENT_TIMESTAMP
""".strip()
|
||
|
||
|
||
def _payload_hash(payload):
    """Return a short, deterministic fingerprint of a JSON-serializable payload.

    The payload is dumped as JSON with sorted keys (non-ASCII characters kept
    verbatim), encoded as UTF-8, hashed with SHA-256, and truncated to the
    first 16 hexadecimal characters.
    """
    canonical_json = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256()
    digest.update(canonical_json.encode("utf-8"))
    return digest.hexdigest()[:16]
|
||
|
||
|
||
def build_seed_transaction_preview(writer_plan, migration_blueprint):
    """Build a preview of the seed writer transaction without a DB session.

    Args:
        writer_plan: Mapping whose optional "operations" list holds one mapping
            per planned upsert ("operation", "table", "lookup", "values").
        migration_blueprint: Mapping with the "file_created",
            "file_matches_blueprint" and "migration_executed" flags.

    Returns:
        A dict describing each planned statement (SQL template, parameter keys,
        parameter hash, idempotency key) plus fixed flags stating that nothing
        was loaded, opened, written, committed, or rolled back.
    """

    def _describe(position, op):
        # One preview entry per operation; the platform code comes from the
        # explicit lookup first and falls back to the values payload.
        params = op.get("values", {})
        code = op.get("lookup", {}).get("code") or params.get("code")
        return {
            "index": position,
            "operation": op.get("operation", "upsert"),
            "table": op.get("table", "market_platforms"),
            "lookup": {"code": code},
            "sql_template": PLATFORM_UPSERT_SQL,
            "parameter_keys": sorted(params),
            "parameter_payload_hash": _payload_hash(params),
            "idempotency_key": f"market_platforms:{code}",
            "diff_status": "not_loaded_no_db_session",
            "write_status": "blocked_transaction_preview_only",
        }

    planned = [
        _describe(position, op)
        for position, op in enumerate(writer_plan.get("operations", []), start=1)
    ]

    # The draft counts as ready only when the file exists, matches the
    # blueprint, and has not been executed.
    draft_ready = False
    if migration_blueprint.get("file_created"):
        if migration_blueprint.get("file_matches_blueprint"):
            draft_ready = not migration_blueprint.get("migration_executed")

    runtime_order = [
        "backup_verified",
        "migration_applied_by_operator",
        "schema_smoke_passed",
        "feature_flags_reviewed",
        "one_time_approval_token_verified",
        "apply_real_write_flag_verified",
    ]
    contract = {
        "idempotent_upsert_preview_only": True,
        "does_not_load_existing_rows": True,
        "does_not_open_transaction": True,
        "does_not_commit": True,
    }
    return {
        "mode": "seed_transaction_preview_no_session",
        "target_table": "market_platforms",
        "statement_count": len(planned),
        "statements": planned,
        "migration_draft_ready": draft_ready,
        "database_snapshot_loaded": False,
        "existing_rows_seen": 0,
        "database_session_created": False,
        "transaction_opened": False,
        "writes_executed": False,
        "would_write_database": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "required_runtime_order": runtime_order,
        "safety_contract": contract,
    }
|
||
|
||
|
||
def _query_existing_platform_rows(conn, platform_codes):
    """Fetch current market_platforms rows for the given codes, keyed by code.

    Args:
        conn: Connection object exposing ``execute``; used with a SQLAlchemy
            ``text`` statement.
        platform_codes: Collection of platform codes to look up.

    Returns:
        A dict mapping each found ``code`` to a plain dict of its selected
        columns. Returns ``{}`` without querying when ``platform_codes`` is
        empty or falsy.
    """
    if not platform_codes:
        return {}

    # An expanding bind parameter renders the IN list from the tuple at
    # execution time.
    lookup_stmt = text(
        """
SELECT code, name, base_url, enabled, crawl_policy_json
FROM market_platforms
WHERE code IN :platform_codes
ORDER BY code
"""
    ).bindparams(bindparam("platform_codes", expanding=True))
    result = conn.execute(lookup_stmt, {"platform_codes": tuple(platform_codes)})

    found = {}
    for row in result.fetchall():
        found[row._mapping["code"]] = dict(row._mapping)
    return found
|
||
|
||
|
||
def _operation_values(operation):
    """Return a copy of the operation's bind parameters with a boolean ``enabled``.

    The "values" mapping is shallow-copied so the caller's plan is not mutated;
    a missing "enabled" key is treated as ``False``.
    """
    params = dict(operation.get("values", {}))
    enabled_flag = params.get("enabled", False)
    params["enabled"] = True if enabled_flag else False
    return params
|
||
|
||
|
||
def _approval_token_valid(approval_token, approval_token_secret):
    """Return True only when the supplied token equals the configured secret.

    Args:
        approval_token: Token supplied by the operator (any value; compared by
            its ``str()`` form).
        approval_token_secret: Configured secret to compare against.

    Returns:
        ``False`` when either value is missing/empty or the secret is shorter
        than ``MIN_APPROVAL_TOKEN_LENGTH`` characters; otherwise the result of
        a constant-time comparison of the two values.
    """
    if not approval_token or not approval_token_secret:
        return False

    secret_text = str(approval_token_secret)
    if len(secret_text) < MIN_APPROVAL_TOKEN_LENGTH:
        return False

    # hmac.compare_digest only accepts ASCII-only str arguments and raises
    # TypeError otherwise, so compare UTF-8 bytes instead. "surrogatepass"
    # keeps lone surrogates encodable rather than raising.
    supplied_bytes = str(approval_token).encode("utf-8", "surrogatepass")
    secret_bytes = secret_text.encode("utf-8", "surrogatepass")
    return hmac.compare_digest(supplied_bytes, secret_bytes)
|
||
|
||
|
||
def execute_seed_writer_transaction(
    *,
    writer_plan,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Run the market_platforms seed upsert on a Core connection (no ORM session).

    Args:
        writer_plan: Mapping whose optional "operations" list holds the upserts.
        database_url: URL used to create an engine when ``engine`` is None;
            falls back to ``config.DATABASE_PATH`` when empty.
        database_type: Optional database type; "postgresql" adds connect and
            statement timeouts to the created engine.
        engine: Optional existing engine. An engine passed in is not disposed.

    Returns:
        A report dict. On success its mode is "seed_writer_cli_executed" and it
        lists inserted/updated/affected codes. Any exception raised inside the
        guarded section is converted into a "seed_writer_cli_execute_error"
        report carrying ``error_message`` instead of propagating.
    """
    operations = list(writer_plan.get("operations", []))
    platform_codes = []
    for op in operations:
        # Prefer the explicit lookup code, else the code in the values payload.
        platform_codes.append(
            op.get("lookup", {}).get("code") or op.get("values", {}).get("code")
        )

    resolved_type = (database_type or "").lower()
    resolved_url = database_url
    owns_engine = False
    connection_opened = False
    transaction_opened = False

    try:
        if engine is None:
            if not resolved_url:
                # Imported lazily so config is only needed when no URL is given.
                from config import DATABASE_PATH, DATABASE_TYPE

                resolved_url = DATABASE_PATH
                resolved_type = (database_type or DATABASE_TYPE or "").lower()

            if resolved_type == "postgresql":
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            else:
                connect_args = {}

            engine = create_engine(
                resolved_url,
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            owns_engine = True

        # engine.begin() is used as the transaction scope; the rows are
        # snapshotted before and after the upserts inside that same scope.
        with engine.begin() as conn:
            connection_opened = transaction_opened = True
            before_by_code = _query_existing_platform_rows(conn, platform_codes)
            for op in operations:
                conn.execute(text(PLATFORM_UPSERT_SQL), _operation_values(op))
            after_by_code = _query_existing_platform_rows(conn, platform_codes)

        # A code present afterwards is "updated" if it already existed,
        # otherwise "inserted"; empty codes are ignored.
        inserted_codes = []
        updated_codes = []
        for code in platform_codes:
            if not code:
                continue
            if code in before_by_code:
                if code in after_by_code:
                    updated_codes.append(code)
            elif code in after_by_code:
                inserted_codes.append(code)

        affected_codes = [code for code in platform_codes if code]
        rollback_note = (
            "若需回退且尚未建立 campaign 關聯,可人工刪除本次 affected_codes;"
            "此 CLI 不自動刪資料。"
        )
        return {
            "mode": "seed_writer_cli_executed",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": True,
            "would_write_database": True,
            "database_write_executed": True,
            "database_commit_executed": True,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "operation_count": len(operations),
            "existing_rows_seen": len(before_by_code),
            "rows_after": len(after_by_code),
            "inserted_codes": inserted_codes,
            "updated_codes": updated_codes,
            "affected_codes": affected_codes,
            "rollback_note": rollback_note,
        }
    except Exception as exc:
        # Failures become a report; a rollback is reported whenever the
        # transaction had been opened.
        return {
            "mode": "seed_writer_cli_execute_error",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": False,
            "would_write_database": True,
            "database_write_executed": False,
            "database_commit_executed": False,
            "database_rollback_executed": bool(transaction_opened),
            "external_network_executed": False,
            "scheduler_attached": False,
            "operation_count": len(operations),
            "error_message": str(exc),
        }
    finally:
        # Only dispose engines created here; caller-owned engines stay open.
        if owns_engine:
            engine.dispose()
|
||
|
||
|
||
def build_seed_writer_cli_plan(
    *,
    platform_code,
    execute_requested,
    approval_token,
    approval_token_secret=None,
    apply_real_write=False,
    seed_plan,
    write_guard,
    writer_plan,
    migration_blueprint,
    engine=None,
    database_url=None,
    database_type=None,
):
    """Build the seed writer CLI plan and, if every gate passes, run the upsert.

    Args:
        platform_code: Platform filter; only echoed back ("all" when falsy).
        execute_requested: Whether the execute flag was given.
        approval_token: Token supplied by the operator.
        approval_token_secret: Expected token; defaults to the value of the
            ``APPROVAL_ENV_VAR`` environment variable.
        apply_real_write: Whether the apply-real-write flag was given.
        seed_plan: Mapping providing "seed_count" and "status.crawler_enabled".
        write_guard: Mapping that is only summarized in the output; it is not
            one of the gates.
        writer_plan: Mapping with "operations", "schema_smoke" and
            "operation_count".
        migration_blueprint: Mapping with the migration file flags.
        engine, database_url, database_type: Forwarded to
            ``execute_seed_writer_transaction``.

    Returns:
        A dict with the gate results, blocked reasons, transaction preview,
        execution result (or None), summaries, and an ``exit_code`` that is 2
        when execution was requested but no writes happened, else 0.
    """
    token_present = bool(approval_token)
    approval_token_secret = approval_token_secret or os.getenv(APPROVAL_ENV_VAR)
    secret_configured = bool(approval_token_secret)
    token_valid = _approval_token_valid(approval_token, approval_token_secret)

    # The migration draft must exist, match the blueprint, and not be executed.
    migration_ready = False
    if migration_blueprint.get("file_created"):
        if migration_blueprint.get("file_matches_blueprint"):
            migration_ready = not migration_blueprint.get("migration_executed")

    smoke_passed = bool(writer_plan.get("schema_smoke", {}).get("passed"))
    has_seed_rows = bool(seed_plan.get("seed_count"))

    gate_specs = (
        ("script_created", "scripts/market_intel_seed_writer.py exists", True),
        (
            "migration_file_matches_blueprint",
            "migration draft exists and matches the reviewed blueprint",
            migration_ready,
        ),
        (
            "execute_requested",
            "--execute flag was explicitly provided",
            bool(execute_requested),
        ),
        (
            "approval_token_present",
            f"{APPROVAL_ENV_VAR} or --approval-token was provided",
            token_present,
        ),
        (
            "approval_token_secret_configured",
            f"{APPROVAL_ENV_VAR} environment token is configured",
            secret_configured,
        ),
        (
            "approval_token_valid",
            "approval token matches the configured one-time environment token",
            token_valid,
        ),
        (
            "apply_real_write_requested",
            "--apply-real-write flag was explicitly provided",
            bool(apply_real_write),
        ),
        (
            "schema_smoke_passed",
            "market_intel ORM schema smoke passed",
            smoke_passed,
        ),
        (
            "seed_rows_present",
            "platform seed rows are present",
            has_seed_rows,
        ),
        (
            "manual_operator_approval",
            "operator approval confirmed through CLI token and apply flag",
            bool(execute_requested and apply_real_write and token_valid),
        ),
        (
            "crawler_stays_disabled",
            "crawler remains detached from this seed writer",
            not bool(seed_plan.get("status", {}).get("crawler_enabled")),
        ),
    )
    gates = [
        {"key": key, "label": label, "passed": passed}
        for key, label, passed in gate_specs
    ]

    blocked_reasons = []
    for gate in gates:
        if not gate["passed"]:
            blocked_reasons.append(gate["key"])
    ready_for_real_write = bool(execute_requested and not blocked_reasons)

    transaction_preview = build_seed_transaction_preview(
        writer_plan=writer_plan,
        migration_blueprint=migration_blueprint,
    )

    execution_result = None
    if ready_for_real_write:
        execution_result = execute_seed_writer_transaction(
            writer_plan=writer_plan,
            database_url=database_url,
            database_type=database_type,
            engine=engine,
        )
        if execution_result["mode"] == "seed_writer_cli_execute_error":
            # An execution failure replaces the (empty) gate-based reasons.
            blocked_reasons = ["seed_writer_execute_error"]

    def _executed(flag_name):
        # True only when an execution happened and reported this flag as truthy.
        return bool(execution_result and execution_result.get(flag_name))

    writes_executed = _executed("writes_executed")

    if execution_result:
        mode = execution_result["mode"]
    elif ready_for_real_write:
        # Fallback for a ready plan whose execution produced no result.
        mode = "seed_writer_cli_ready"
    else:
        mode = "seed_writer_cli_blocked"

    # Requesting execution without any write counts as a failure (exit code 2).
    exit_code = 2 if (execute_requested and not writes_executed) else 0

    write_guard_summary = {
        "ready_to_write": bool(write_guard.get("ready_to_write")),
        "would_write_database": bool(write_guard.get("would_write_database")),
        "database_write_allowed": bool(write_guard.get("database_write_allowed")),
        "blocked_reasons": write_guard.get("blocked_reasons", []),
    }
    migration_file_summary = {
        "suggested_filename": migration_blueprint.get("suggested_filename"),
        "file_created": bool(migration_blueprint.get("file_created")),
        "file_matches_blueprint": bool(migration_blueprint.get("file_matches_blueprint")),
        "migration_executed": bool(migration_blueprint.get("migration_executed")),
    }
    safety_contract = {
        "refuses_execute_without_apply_flag": True,
        "requires_independent_approval_token": True,
        "keeps_crawler_disabled_for_seed_write": True,
        "uses_core_connection_not_orm_session": True,
        "target_table": "market_platforms",
    }

    return {
        "mode": mode,
        "platform_code": platform_code or "all",
        "execute_requested": bool(execute_requested),
        "apply_real_write_requested": bool(apply_real_write),
        "approval_token_present": token_present,
        "approval_token_valid": token_valid,
        "approval_env_var": APPROVAL_ENV_VAR,
        "approval_token_secret_configured": secret_configured,
        "api_reads_approval_token": False,
        "api_executes_cli": False,
        "api_writes_database": False,
        "ready_for_real_write": ready_for_real_write,
        "writes_executed": writes_executed,
        "would_write_database": bool(ready_for_real_write),
        "database_session_created": False,
        "database_connection_opened": _executed("database_connection_opened"),
        "explicit_transaction_opened": _executed("explicit_transaction_opened"),
        "database_write_executed": _executed("database_write_executed"),
        "database_commit_executed": _executed("database_commit_executed"),
        "database_rollback_executed": _executed("database_rollback_executed"),
        "external_network_executed": False,
        "scheduler_attached": False,
        "exit_code": exit_code,
        "blocked_reasons": blocked_reasons,
        "approval_gates": gates,
        "seed_count": int(seed_plan.get("seed_count") or 0),
        "writer_operation_count": int(writer_plan.get("operation_count") or 0),
        "transaction_preview": transaction_preview,
        "execution_result": execution_result,
        "write_guard_summary": write_guard_summary,
        "migration_file_summary": migration_file_summary,
        "safety_contract": safety_contract,
    }
|