Files
ewoooc/services/market_intel/seed_writer_cli.py
OoO a4590db8ba
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
[V10.323] 明確標示 seed writer API token 邊界
2026-05-20 12:37:31 +08:00

410 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 seed writer CLI。
預設只回報 CLI 執行計畫。只有 CLI 明確帶入 execute、apply-real-write 與確認 token
時,才會以短 transaction upsert market_platforms seed rows。
"""
import hashlib
import hmac
import json
import os
from sqlalchemy import bindparam, create_engine, text
# Environment variable holding the operator-configured approval secret;
# build_seed_writer_cli_plan reads it when no secret is passed explicitly.
APPROVAL_ENV_VAR = "MARKET_INTEL_SEED_WRITE_APPROVAL"
# Minimum length (in characters) the configured approval secret must have;
# a shorter secret makes every approval-token check fail.
MIN_APPROVAL_TOKEN_LENGTH = 16
# Idempotent upsert keyed on market_platforms.code. ON CONFLICT (code) needs a
# unique constraint/index on that column (presumably created by the reviewed
# migration - confirm). On conflict, created_at is left untouched while name,
# base_url, enabled, crawl_policy_json and updated_at are refreshed.
# Bind parameters: code, name, base_url, enabled, crawl_policy_json.
PLATFORM_UPSERT_SQL = """
INSERT INTO market_platforms (
code,
name,
base_url,
enabled,
crawl_policy_json,
created_at,
updated_at
) VALUES (
:code,
:name,
:base_url,
:enabled,
:crawl_policy_json,
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP
)
ON CONFLICT (code) DO UPDATE SET
name = EXCLUDED.name,
base_url = EXCLUDED.base_url,
enabled = EXCLUDED.enabled,
crawl_policy_json = EXCLUDED.crawl_policy_json,
updated_at = CURRENT_TIMESTAMP
""".strip()
def _payload_hash(payload):
    """Return a short, order-independent fingerprint of a JSON-serialisable payload.

    The payload is dumped with sorted keys (non-ASCII kept as-is) and encoded
    as UTF-8; only the first 16 hex characters of its SHA-256 digest are kept.
    """
    canonical_text = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(canonical_text.encode("utf-8"))
    full_hex = digest.hexdigest()
    return full_hex[:16]
def build_seed_transaction_preview(writer_plan, migration_blueprint):
    """Describe the seed upsert transaction without creating a DB session.

    Each writer-plan operation becomes one preview statement: the upsert SQL
    template, its sorted parameter keys, a short hash of the bound values and
    an idempotency key built from the platform code. Nothing is read from or
    written to the database.

    Args:
        writer_plan: Mapping with an optional ``operations`` list; each item may
            carry ``values``, ``lookup.code``, ``operation`` and ``table``.
        migration_blueprint: Mapping with ``file_created``,
            ``file_matches_blueprint`` and ``migration_executed`` flags.

    Returns:
        A preview dict whose execution flags are all False.
    """

    def _preview_statement(position, operation):
        # The lookup code wins; fall back to the code inside the bound values.
        bound_values = operation.get("values", {})
        platform_code = operation.get("lookup", {}).get("code") or bound_values.get("code")
        return {
            "index": position,
            "operation": operation.get("operation", "upsert"),
            "table": operation.get("table", "market_platforms"),
            "lookup": {"code": platform_code},
            "sql_template": PLATFORM_UPSERT_SQL,
            "parameter_keys": sorted(bound_values),
            "parameter_payload_hash": _payload_hash(bound_values),
            "idempotency_key": f"market_platforms:{platform_code}",
            "diff_status": "not_loaded_no_db_session",
            "write_status": "blocked_transaction_preview_only",
        }

    planned_operations = writer_plan.get("operations", [])
    preview_statements = [
        _preview_statement(position, operation)
        for position, operation in enumerate(planned_operations, start=1)
    ]
    # Draft must exist, match the reviewed blueprint, and not be flagged executed.
    draft_ready = all(
        (
            migration_blueprint.get("file_created"),
            migration_blueprint.get("file_matches_blueprint"),
            not migration_blueprint.get("migration_executed"),
        )
    )
    runtime_order = [
        "backup_verified",
        "migration_applied_by_operator",
        "schema_smoke_passed",
        "feature_flags_reviewed",
        "one_time_approval_token_verified",
        "apply_real_write_flag_verified",
    ]
    contract = {
        "idempotent_upsert_preview_only": True,
        "does_not_load_existing_rows": True,
        "does_not_open_transaction": True,
        "does_not_commit": True,
    }
    return {
        "mode": "seed_transaction_preview_no_session",
        "target_table": "market_platforms",
        "statement_count": len(preview_statements),
        "statements": preview_statements,
        "migration_draft_ready": draft_ready,
        "database_snapshot_loaded": False,
        "existing_rows_seen": 0,
        "database_session_created": False,
        "transaction_opened": False,
        "writes_executed": False,
        "would_write_database": False,
        "database_commit_executed": False,
        "database_rollback_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "required_runtime_order": runtime_order,
        "safety_contract": contract,
    }
def _query_existing_platform_rows(conn, platform_codes):
    """Fetch current market_platforms rows for ``platform_codes``, keyed by code.

    Args:
        conn: An open SQLAlchemy Core connection.
        platform_codes: Sequence of platform codes; when empty the query is
            skipped entirely.

    Returns:
        ``{code: {column: value}}`` with the columns code, name, base_url,
        enabled and crawl_policy_json.
    """
    if not platform_codes:
        return {}
    select_sql = (
        "\n"
        "SELECT code, name, base_url, enabled, crawl_policy_json\n"
        "FROM market_platforms\n"
        "WHERE code IN :platform_codes\n"
        "ORDER BY code\n"
    )
    # An expanding bind parameter renders one placeholder per code in IN (...).
    statement = text(select_sql).bindparams(
        bindparam("platform_codes", expanding=True)
    )
    fetched = conn.execute(
        statement,
        {"platform_codes": tuple(platform_codes)},
    ).fetchall()
    rows_by_code = {}
    for row in fetched:
        mapping = row._mapping
        rows_by_code[mapping["code"]] = dict(mapping)
    return rows_by_code
def _operation_values(operation):
    """Return a copy of the operation's bind values with ``enabled`` forced to bool.

    A missing ``values`` mapping yields ``{"enabled": False}``; the input
    operation is never mutated.
    """
    copied = dict(operation.get("values", {}))
    # Re-assigning an existing key keeps its position; a new key is appended.
    return {**copied, "enabled": bool(copied.get("enabled", False))}
def _approval_token_valid(approval_token, approval_token_secret):
    """Return True only when ``approval_token`` equals the configured secret.

    Both values are stringified. A missing token or secret, or a secret shorter
    than ``MIN_APPROVAL_TOKEN_LENGTH`` characters, is rejected outright. The
    comparison uses ``hmac.compare_digest`` so its timing does not depend on
    where the values first differ.

    Args:
        approval_token: Token supplied by the operator (e.g. ``--approval-token``).
        approval_token_secret: Expected token, e.g. read from the
            ``MARKET_INTEL_SEED_WRITE_APPROVAL`` environment variable.

    Returns:
        bool: whether the token is accepted. Never raises for non-ASCII input.
    """
    if not approval_token or not approval_token_secret:
        return False
    secret_text = str(approval_token_secret)
    if len(secret_text) < MIN_APPROVAL_TOKEN_LENGTH:
        return False
    # hmac.compare_digest only accepts ASCII-only ``str`` and raises TypeError
    # otherwise, which would crash the CLI plan instead of failing the gate.
    # Comparing UTF-8 bytes keeps the check constant-time for any text;
    # "surrogatepass" also covers surrogate-escaped values from os.getenv.
    return hmac.compare_digest(
        str(approval_token).encode("utf-8", "surrogatepass"),
        secret_text.encode("utf-8", "surrogatepass"),
    )
def execute_seed_writer_transaction(
    *,
    writer_plan,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Run the market_platforms seed upsert on a Core connection (no ORM session).

    All upserts, plus a before/after snapshot of the affected codes, run inside
    one ``engine.begin()`` block. That block commits when it exits normally and
    rolls back if a statement raises.

    Args:
        writer_plan: Mapping whose ``operations`` list holds dicts with
            ``values`` (upsert bind parameters) and optionally ``lookup.code``.
        database_url: SQLAlchemy URL used when ``engine`` is not supplied. If
            both are missing, ``DATABASE_PATH`` / ``DATABASE_TYPE`` are imported
            from the project ``config`` module.
        database_type: Backend name; ``"postgresql"`` adds connect and
            statement timeouts to an engine created here.
        engine: Existing SQLAlchemy engine to reuse. Only an engine created by
            this function is disposed afterwards.

    Returns:
        A status dict. On success ``mode`` is ``"seed_writer_cli_executed"``
        and inserted/updated/affected code lists are included. Any
        ``Exception`` raised in the try block is caught, and a dict with
        ``mode == "seed_writer_cli_execute_error"`` and ``error_message`` is
        returned instead.
    """
    operations = list(writer_plan.get("operations", []))
    # One entry per operation (None when it carries no code); order and
    # duplicates follow the writer plan.
    platform_codes = [
        operation.get("lookup", {}).get("code") or operation.get("values", {}).get("code")
        for operation in operations
    ]
    effective_database_type = (database_type or "").lower()
    effective_database_url = database_url
    # Progress flags reported in both the success and the error payloads.
    created_engine = False
    connection_opened = False
    transaction_opened = False
    try:
        if engine is None:
            if not effective_database_url:
                # Fall back to the application's configured database.
                from config import DATABASE_PATH, DATABASE_TYPE
                effective_database_url = DATABASE_PATH
                effective_database_type = (database_type or DATABASE_TYPE or "").lower()
            connect_args = {}
            # NOTE: if database_url is given without database_type, the type
            # stays "" and these PostgreSQL timeouts are not applied.
            if effective_database_type == "postgresql":
                connect_args = {
                    "connect_timeout": 8,
                    # PostgreSQL reads a unit-less statement_timeout as ms (15 s).
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True
        # engine.begin() yields a connection inside a transaction: commit on a
        # normal exit, rollback when the block raises.
        with engine.begin() as conn:
            connection_opened = True
            transaction_opened = True
            before_by_code = _query_existing_platform_rows(conn, platform_codes)
            for operation in operations:
                conn.execute(text(PLATFORM_UPSERT_SQL), _operation_values(operation))
            after_by_code = _query_existing_platform_rows(conn, platform_codes)
        # Classify by presence before/after the upsert. Every code that already
        # existed is reported as "updated", even if its values did not change.
        inserted_codes = [
            code for code in platform_codes
            if code and code not in before_by_code and code in after_by_code
        ]
        updated_codes = [
            code for code in platform_codes
            if code and code in before_by_code and code in after_by_code
        ]
        return {
            "mode": "seed_writer_cli_executed",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": True,
            "would_write_database": True,
            "database_write_executed": True,
            "database_commit_executed": True,
            "database_rollback_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "operation_count": len(operations),
            "existing_rows_seen": len(before_by_code),
            "rows_after": len(after_by_code),
            "inserted_codes": inserted_codes,
            "updated_codes": updated_codes,
            "affected_codes": [code for code in platform_codes if code],
            # Deletion is manual only; this function never deletes rows.
            "rollback_note": (
                "若需回退且尚未建立 campaign 關聯,可人工刪除本次 affected_codes"
                "此 CLI 不自動刪資料。"
            ),
        }
    except Exception as exc:
        return {
            "mode": "seed_writer_cli_execute_error",
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": transaction_opened,
            "writes_executed": False,
            "would_write_database": True,
            "database_write_executed": False,
            "database_commit_executed": False,
            # NOTE(review): assumes the failure happened inside engine.begin(),
            # which rolls back; a failure during the commit itself is reported
            # the same way - confirm this is acceptable.
            "database_rollback_executed": bool(transaction_opened),
            "external_network_executed": False,
            "scheduler_attached": False,
            "operation_count": len(operations),
            "error_message": str(exc),
        }
    finally:
        # Dispose only engines created here; a caller-supplied engine is left alone.
        if created_engine:
            engine.dispose()
def build_seed_writer_cli_plan(
    *,
    platform_code,
    execute_requested,
    approval_token,
    approval_token_secret=None,
    apply_real_write=False,
    seed_plan,
    write_guard,
    writer_plan,
    migration_blueprint,
    engine=None,
    database_url=None,
    database_type=None,
):
    """Build the seed writer CLI plan and, if every gate passes, run the upsert.

    The seed upsert runs only when ``execute_requested`` is truthy and every
    approval gate passes: a matching migration draft, a present and valid
    approval token, ``apply_real_write``, schema smoke, seed rows, and the
    crawler staying disabled. Otherwise the returned dict is a report only.

    Args:
        platform_code: Platform filter echoed in the result (``"all"`` if falsy).
        execute_requested: Whether ``--execute`` was given.
        approval_token: Token supplied on the CLI.
        approval_token_secret: Expected token; when falsy, the
            ``APPROVAL_ENV_VAR`` environment variable is read instead.
        apply_real_write: Whether ``--apply-real-write`` was given.
        seed_plan, write_guard, writer_plan, migration_blueprint: Upstream plan
            dicts; only the keys read below are used.
        engine, database_url, database_type: Forwarded to
            ``execute_seed_writer_transaction`` when a write runs.

    Returns:
        A report dict. ``exit_code`` is 2 when execution was requested but no
        write happened, otherwise 0.
    """
    execute_flag = bool(execute_requested)
    apply_flag = bool(apply_real_write)
    token_present = bool(approval_token)
    # An explicit secret wins; otherwise fall back to the environment variable.
    expected_token = approval_token_secret or os.getenv(APPROVAL_ENV_VAR)
    secret_configured = bool(expected_token)
    token_valid = _approval_token_valid(approval_token, expected_token)
    # Draft must exist, match the reviewed blueprint, and not be flagged executed.
    migration_ready = all(
        (
            migration_blueprint.get("file_created"),
            migration_blueprint.get("file_matches_blueprint"),
            not migration_blueprint.get("migration_executed"),
        )
    )
    schema_ok = bool(writer_plan.get("schema_smoke", {}).get("passed"))
    has_seed_rows = bool(seed_plan.get("seed_count"))
    operator_approved = bool(execute_flag and apply_flag and token_valid)
    crawler_detached = not bool(seed_plan.get("status", {}).get("crawler_enabled"))

    # (key, label, passed) - order defines the order of approval_gates.
    gate_table = (
        ("script_created", "scripts/market_intel_seed_writer.py exists", True),
        (
            "migration_file_matches_blueprint",
            "migration draft exists and matches the reviewed blueprint",
            migration_ready,
        ),
        ("execute_requested", "--execute flag was explicitly provided", execute_flag),
        (
            "approval_token_present",
            f"{APPROVAL_ENV_VAR} or --approval-token was provided",
            token_present,
        ),
        (
            "approval_token_secret_configured",
            f"{APPROVAL_ENV_VAR} environment token is configured",
            secret_configured,
        ),
        (
            "approval_token_valid",
            "approval token matches the configured one-time environment token",
            token_valid,
        ),
        (
            "apply_real_write_requested",
            "--apply-real-write flag was explicitly provided",
            apply_flag,
        ),
        ("schema_smoke_passed", "market_intel ORM schema smoke passed", schema_ok),
        ("seed_rows_present", "platform seed rows are present", has_seed_rows),
        (
            "manual_operator_approval",
            "operator approval confirmed through CLI token and apply flag",
            operator_approved,
        ),
        (
            "crawler_stays_disabled",
            "crawler remains detached from this seed writer",
            crawler_detached,
        ),
    )
    gates = [
        {"key": key, "label": label, "passed": passed}
        for key, label, passed in gate_table
    ]
    blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
    ready_for_real_write = execute_flag and not blocked_reasons

    transaction_preview = build_seed_transaction_preview(
        writer_plan=writer_plan,
        migration_blueprint=migration_blueprint,
    )

    execution_result = None
    if ready_for_real_write:
        execution_result = execute_seed_writer_transaction(
            writer_plan=writer_plan,
            database_url=database_url,
            database_type=database_type,
            engine=engine,
        )
        if execution_result["mode"] == "seed_writer_cli_execute_error":
            # A failed execution becomes the single blocking reason.
            blocked_reasons = ["seed_writer_execute_error"]

    def execution_flag(name):
        # False when nothing ran; otherwise the truthiness of that result key.
        return bool(execution_result and execution_result.get(name))

    writes_executed = execution_flag("writes_executed")

    if execution_result:
        mode = execution_result["mode"]
    elif ready_for_real_write:
        mode = "seed_writer_cli_ready"
    else:
        mode = "seed_writer_cli_blocked"

    return {
        "mode": mode,
        "platform_code": platform_code or "all",
        "execute_requested": execute_flag,
        "apply_real_write_requested": apply_flag,
        "approval_token_present": token_present,
        "approval_token_valid": token_valid,
        "approval_env_var": APPROVAL_ENV_VAR,
        "approval_token_secret_configured": secret_configured,
        "api_reads_approval_token": False,
        "api_executes_cli": False,
        "api_writes_database": False,
        "ready_for_real_write": ready_for_real_write,
        "writes_executed": writes_executed,
        "would_write_database": ready_for_real_write,
        "database_session_created": False,
        "database_connection_opened": execution_flag("database_connection_opened"),
        "explicit_transaction_opened": execution_flag("explicit_transaction_opened"),
        "database_write_executed": execution_flag("database_write_executed"),
        "database_commit_executed": execution_flag("database_commit_executed"),
        "database_rollback_executed": execution_flag("database_rollback_executed"),
        "external_network_executed": False,
        "scheduler_attached": False,
        # 2 means "--execute was requested but nothing was written".
        "exit_code": 2 if execute_flag and not writes_executed else 0,
        "blocked_reasons": blocked_reasons,
        "approval_gates": gates,
        "seed_count": int(seed_plan.get("seed_count") or 0),
        "writer_operation_count": int(writer_plan.get("operation_count") or 0),
        "transaction_preview": transaction_preview,
        "execution_result": execution_result,
        "write_guard_summary": {
            "ready_to_write": bool(write_guard.get("ready_to_write")),
            "would_write_database": bool(write_guard.get("would_write_database")),
            "database_write_allowed": bool(write_guard.get("database_write_allowed")),
            "blocked_reasons": write_guard.get("blocked_reasons", []),
        },
        "migration_file_summary": {
            "suggested_filename": migration_blueprint.get("suggested_filename"),
            "file_created": bool(migration_blueprint.get("file_created")),
            "file_matches_blueprint": bool(migration_blueprint.get("file_matches_blueprint")),
            "migration_executed": bool(migration_blueprint.get("migration_executed")),
        },
        "safety_contract": {
            "refuses_execute_without_apply_flag": True,
            "requires_independent_approval_token": True,
            "keeps_crawler_disabled_for_seed_write": True,
            "uses_core_connection_not_orm_session": True,
            "target_table": "market_platforms",
        },
    }