diff --git a/TODO_NEXT_STEPS.txt b/TODO_NEXT_STEPS.txt index ed5d0a5..305d8a8 100644 --- a/TODO_NEXT_STEPS.txt +++ b/TODO_NEXT_STEPS.txt @@ -118,6 +118,7 @@ - V10.229 修正 PPT 視覺 QA 多 worker 狀態漂移:將 queued/running/completed/error 寫入 `/app/data/ppt_vision_audit_status.json` runtime state,所有 Gunicorn worker 共用同一份狀態並阻擋重複排入。 - Phase 55 candidate queue writer CLI gate:新增 `/api/market_intel/manual_sample_review/candidate_queue_writer_status` POST、`scripts/market_intel_candidate_queue_writer.py` 與 UI writer gate 按鈕,定義 `MARKET_INTEL_QUEUE_WRITE_APPROVAL` 一次性 token、execute/apply flags、備份、migration smoke 與 rollback gate;本階段仍不開 DB connection、不寫 `market_alert_review_queue`、不 commit、不掛 scheduler;版本同步至 V10.230。 - Phase 56 candidate queue writer preflight:新增 `/api/market_intel/manual_sample_review/candidate_queue_writer_preflight` POST 與 `services/market_intel/candidate_queue_writer_preflight.py`,檢查 transaction payload key 到 `market_alert_review_queue` 欄位映射、缺欄與 dedupe unique index;頁面預設 execute=false 不連 DB,CLI 可明確 `--read-only-preflight` 只讀 catalog;版本同步至 V10.232。 + - Phase 57 candidate queue writer CLI transaction:`scripts/market_intel_candidate_queue_writer.py` 在 CLI-only 情境支援受控 transaction,必須同時通過 transaction payload、read-only preflight、`--execute`、`--apply-real-write`、一次性 token、備份確認與 migration live smoke 才會以 SQLAlchemy Core idempotent insert `market_alert_review_queue`;API/UI 仍不傳 token、不連 DB、不寫 queue、不掛 scheduler;版本同步至 V10.234。 - Schema smoke:`tests/test_market_intel_skeleton.py` 檢查 `Base.metadata` 內含 ADR-035 八張 `market_*` tables。 - Desktop UI QA:本機只註冊 `market_intel_bp` 的 Flask harness 載入 `/market_intel`,確認 Phase 15、候選預覽、writer preview、安全 flags、點陣暖紙視覺正常,console error 0。 - API QA:`/api/market_intel/schema_smoke` 通過 7 張表與 `market_platforms` 必要欄位檢查;`/api/market_intel/platform_seed_writer_plan` 回傳 4 筆 dry-run upsert preview,`writes_executed=false`,四平台皆 `blocked_dry_run_only`。 @@ -128,7 +129,7 @@ - 補 UI preview panel 真 390px 截圖 QA;本輪 in-app browser 不支援直接設定 viewport,且 data URL iframe QA 被瀏覽器安全策略阻擋,不做繞過。 - 正式端 seed writer token-hardening drift:優先白名單同步 main 的一次性環境 token 版本,避免 API 暴露固定 approval token hint。 - 正式推版前需實際執行 worktree scope review、`python backup_system.py`、commit/push 目標變更、讀 deployment SOP 與 ADR-011,且只跑 `/health` 與市場情報頁 smoke。 - - 下一步才可在明確批准後設計 seed writer 的真寫入 implementation gate;預設環境不得寫 DB,也不得執行 migration。 + - 下一步才可在明確批准後做正式 DB 的 queue writer operator drill;預設 API/UI 不得寫 DB,也不得執行 migration。 - 市場情報 UI 後續頁面必須沿用 V2 暖紙、暖墨、等寬數字與點陣風格,禁止複製巨型分析頁 template 模式。 ================================================================================ diff --git a/config.py b/config.py index 2e14a4e..5e102aa 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.233" +SYSTEM_VERSION = "V10.234" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/adr/ADR-035-cross-platform-market-campaign-intelligence.md b/docs/adr/ADR-035-cross-platform-market-campaign-intelligence.md index 84ecf85..208f84d 100644 --- a/docs/adr/ADR-035-cross-platform-market-campaign-intelligence.md +++ b/docs/adr/ADR-035-cross-platform-market-campaign-intelligence.md @@ -182,6 +182,7 @@ EwoooC 目前已有 MOMO EDM / 節慶活動資料、`promo_products`、PChome - 2026-05-19 追加 manual sample candidate queue transaction:`/api/market_intel/manual_sample_review/candidate_queue_transaction` 將 queue row preview 轉成 idempotent insert statement preview、payload hash、runtime order 與 rollback plan。此入口保持 CSRF 保護,不開 DB connection、不開 transaction、不 commit、不建立 approval record、不寫 `market_alert_review_queue`。 - 2026-05-19 追加 candidate queue writer CLI gate:`services.market_intel.candidate_queue_writer_cli`、`scripts/market_intel_candidate_queue_writer.py` 與 `/api/market_intel/manual_sample_review/candidate_queue_writer_status` 定義 `MARKET_INTEL_QUEUE_WRITE_APPROVAL` 一次性 token、execute/apply flags、備份、migration smoke 與 rollback gate。此階段只回 writer status,不開 DB connection、不啟用實際 writer、不寫 `market_alert_review_queue`、不掛 scheduler。 - 2026-05-19 追加 candidate queue writer preflight:`services.market_intel.candidate_queue_writer_preflight` 與 `/api/market_intel/manual_sample_review/candidate_queue_writer_preflight` 檢查 transaction preview payload key 到 `market_alert_review_queue` 欄位映射、缺欄與 dedupe unique index。UI 預設 `execute=false` 不連 DB;CLI 明確 `--read-only-preflight` 時也只查 catalog,不寫 DB、不 commit、不掛 scheduler。 +- 2026-05-19 追加 candidate queue writer CLI transaction:`scripts/market_intel_candidate_queue_writer.py` 在 CLI-only 情境支援受控 idempotent insert transaction,必須同時通過 transaction payload、read-only preflight、`--execute`、`--apply-real-write`、一次性 approval token、備份確認與 migration live smoke。API/UI 仍不得讀取 approval token、不得開 DB connection、不得寫 `market_alert_review_queue`、不得掛 scheduler;部署 smoke 不執行正式 DB 寫入。 ### Phase 4:Coupang / Shopee Adapter diff --git a/routes/README.md b/routes/README.md index 5dc48d2..637efcf 100644 --- a/routes/README.md +++ b/routes/README.md @@ -19,7 +19,7 @@ | `edm_routes.py` | EDM 與節慶儀表板 | `/edm`, `/festival` | | `monthly_routes.py` | 月結分析 | `/monthly_summary_analysis`, `/api/monthly_summary_data` | | `daily_sales_routes.py` | 當日業績 | `/daily_sales`, `/daily_sales/export*` | -| `market_intel_routes.py` | 市場情報 Phase 56 candidate queue writer preflight | `/market_intel`, `/market_intel/*`, `/api/market_intel/status`, `/api/market_intel/schema`, `/api/market_intel/schema_smoke`, `/api/market_intel/schema_db_probe`, `/api/market_intel/platform_seed_db_diff`, `/api/market_intel/legacy_source_bridge`, `/api/market_intel/mcp_readiness`, `/api/market_intel/mcp_tool_contract`, `/api/market_intel/mcp_deploy_preflight`, `/api/market_intel/mcp_activation_runbook`, `/api/market_intel/mcp_fetch_gate`, `/api/market_intel/scheduler_plan`, `/api/market_intel/manual_sample_plan`, `/api/market_intel/manual_sample_acceptance`, `/api/market_intel/manual_sample_review`, `/api/market_intel/manual_sample_review/evaluate`, `/api/market_intel/manual_sample_review/candidate_handoff`, `/api/market_intel/manual_sample_review/candidate_queue_draft`, `/api/market_intel/manual_sample_review/candidate_queue_approval`, `/api/market_intel/manual_sample_review/candidate_queue_transaction`, `/api/market_intel/manual_sample_review/candidate_queue_writer_status`, `/api/market_intel/manual_sample_review/candidate_queue_writer_preflight`, `/api/market_intel/match_review_plan`, `/api/market_intel/opportunity_plan`, `/api/market_intel/opportunity_scoring_plan`, `/api/market_intel/opportunity_evidence_plan`, `/api/market_intel/opportunity_alert_plan`, `/api/market_intel/adapters`, `/api/market_intel/dry_run_plan`, `/api/market_intel/discovery_plan`, `/api/market_intel/manual_discovery`, `/api/market_intel/candidate_preview`, `/api/market_intel/platform_seed_plan`, `/api/market_intel/platform_seed_write_guard`, `/api/market_intel/platform_seed_writer_plan`, `/api/market_intel/migration_blueprint`, `/api/market_intel/migration_apply_drill`, `/api/market_intel/migration_catalog_review`, `/api/market_intel/migration_live_smoke`, `/api/market_intel/live_db_inventory`, `/api/market_intel/seed_writer_cli_status`, `/api/market_intel/write_approval_runbook`, `/api/market_intel/deployment_readiness` | +| `market_intel_routes.py` | 市場情報 Phase 57 candidate queue writer CLI transaction | `/market_intel`, `/market_intel/*`, `/api/market_intel/status`, `/api/market_intel/schema`, `/api/market_intel/schema_smoke`, `/api/market_intel/schema_db_probe`, `/api/market_intel/platform_seed_db_diff`, `/api/market_intel/legacy_source_bridge`, `/api/market_intel/mcp_readiness`, `/api/market_intel/mcp_tool_contract`, `/api/market_intel/mcp_deploy_preflight`, `/api/market_intel/mcp_activation_runbook`, `/api/market_intel/mcp_fetch_gate`, `/api/market_intel/scheduler_plan`, `/api/market_intel/manual_sample_plan`, `/api/market_intel/manual_sample_acceptance`, `/api/market_intel/manual_sample_review`, `/api/market_intel/manual_sample_review/evaluate`, `/api/market_intel/manual_sample_review/candidate_handoff`, `/api/market_intel/manual_sample_review/candidate_queue_draft`, `/api/market_intel/manual_sample_review/candidate_queue_approval`, `/api/market_intel/manual_sample_review/candidate_queue_transaction`, `/api/market_intel/manual_sample_review/candidate_queue_writer_status`, `/api/market_intel/manual_sample_review/candidate_queue_writer_preflight`, `/api/market_intel/match_review_plan`, `/api/market_intel/opportunity_plan`, `/api/market_intel/opportunity_scoring_plan`, `/api/market_intel/opportunity_evidence_plan`, `/api/market_intel/opportunity_alert_plan`, `/api/market_intel/adapters`, `/api/market_intel/dry_run_plan`, `/api/market_intel/discovery_plan`, `/api/market_intel/manual_discovery`, `/api/market_intel/candidate_preview`, `/api/market_intel/platform_seed_plan`, `/api/market_intel/platform_seed_write_guard`, `/api/market_intel/platform_seed_writer_plan`, `/api/market_intel/migration_blueprint`, `/api/market_intel/migration_apply_drill`, `/api/market_intel/migration_catalog_review`, `/api/market_intel/migration_live_smoke`, `/api/market_intel/live_db_inventory`, `/api/market_intel/seed_writer_cli_status`, `/api/market_intel/write_approval_runbook`, `/api/market_intel/deployment_readiness` | | `api_routes.py` | 通用任務與查詢 API | `/api/run_task`, `/api/history/*` | | `export_routes.py` | 匯出功能 | `/api/export/*` | | `import_routes.py` | 匯入功能 | `/api/import_excel`, `/api/import/monthly_summary` | diff --git a/scripts/market_intel_candidate_queue_writer.py b/scripts/market_intel_candidate_queue_writer.py index da8b1c3..4f059e1 100755 --- a/scripts/market_intel_candidate_queue_writer.py +++ b/scripts/market_intel_candidate_queue_writer.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -"""Market intelligence candidate queue writer CLI gate. +"""Market intelligence candidate queue writer CLI. -This script prints a JSON gate status. Real queue writes are intentionally -disabled in this phase even when --execute and --apply-real-write are provided. +This script prints a JSON gate status by default. Real queue writes require +--execute, --apply-real-write, --read-only-preflight, backup/smoke confirmations, +and a one-time approval token. """ import argparse @@ -41,12 +42,12 @@ def parse_args(argv=None): parser.add_argument( "--execute", action="store_true", - help="Request real execution. Still blocked in this phase.", + help="Request real execution.", ) parser.add_argument( "--apply-real-write", action="store_true", - help="Request the guarded queue write transaction. Still blocked in this phase.", + help="Request the guarded queue write transaction.", ) parser.add_argument( "--approval-token", @@ -58,6 +59,16 @@ def parse_args(argv=None): action="store_true", help="Run read-only schema / payload preflight before returning the gate.", ) + parser.add_argument( + "--backup-verified", + action="store_true", + help="Confirm a fresh operator backup before allowing a real write.", + ) + parser.add_argument( + "--migration-live-smoke-passed", + action="store_true", + help="Confirm live schema smoke passed before allowing a real write.", + ) return parser.parse_args(argv) @@ -94,6 +105,8 @@ def main(argv=None): apply_real_write=args.apply_real_write, approval_token=args.approval_token, approval_token_secret=os.getenv(APPROVAL_ENV_VAR), + backup_verified=args.backup_verified, + migration_live_smoke_passed=args.migration_live_smoke_passed, ) plan["phase"] = service.phase print(json.dumps(plan, ensure_ascii=False, indent=2, sort_keys=True)) diff --git a/services/market_intel/candidate_queue_writer_cli.py b/services/market_intel/candidate_queue_writer_cli.py index c0e37de..e5344fb 100644 --- a/services/market_intel/candidate_queue_writer_cli.py +++ b/services/market_intel/candidate_queue_writer_cli.py @@ -1,15 +1,84 @@ -"""市場情報候選審核 queue writer CLI gate。 +"""市場情報候選審核 queue writer CLI。 -本模組只建立正式 queue writer 前的安全 gate 狀態;不開 DB connection、 -不開 transaction、不寫 market_alert_review_queue、不掛 scheduler。 +預設只建立正式 queue writer 前的安全 gate 狀態。只有 CLI 明確帶入 +execute、apply-real-write、一次性 token、備份確認、migration live smoke 與 +read-only preflight 全部通過時,才會以短 transaction 寫入 +market_alert_review_queue。 """ import hmac +import json import os +from sqlalchemy import create_engine, text + +from services.market_intel.candidate_queue_writer_preflight import ( + PAYLOAD_COLUMN_MAP, + QUEUE_TABLE, +) + APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL" MIN_APPROVAL_TOKEN_LENGTH = 16 +QUEUE_INSERT_COLUMNS = ( + "alert_candidate_id", + "review_state", + "priority_lane", + "threshold_level", + "total_score", + "evidence_bundle_id", + "dedupe_key", + "source_batch_id", + "metadata_json", +) +QUEUE_INSERT_SQL_POSTGRESQL = """ +INSERT INTO market_alert_review_queue ( + alert_candidate_id, + review_state, + priority_lane, + threshold_level, + total_score, + evidence_bundle_id, + dedupe_key, + source_batch_id, + metadata_json +) VALUES ( + :alert_candidate_id, + :review_state, + :priority_lane, + :threshold_level, + :total_score, + :evidence_bundle_id, + :dedupe_key, + :source_batch_id, + :metadata_json +) +ON CONFLICT (dedupe_key) DO NOTHING +RETURNING dedupe_key +""".strip() +QUEUE_INSERT_SQL_SQLITE = """ +INSERT OR IGNORE INTO market_alert_review_queue ( + alert_candidate_id, + review_state, + priority_lane, + threshold_level, + total_score, + evidence_bundle_id, + dedupe_key, + source_batch_id, + metadata_json +) VALUES ( + :alert_candidate_id, + :review_state, + :priority_lane, + :threshold_level, + :total_score, + :evidence_bundle_id, + :dedupe_key, + :source_batch_id, + :metadata_json +) +""".strip() def _approval_token_valid(approval_token, approval_token_secret): @@ -20,6 +89,200 @@ def _approval_token_valid(approval_token, approval_token_secret): return hmac.compare_digest(str(approval_token), str(approval_token_secret)) +def _database_type_for_engine(engine, database_type): + if database_type: + return str(database_type).lower() + dialect = getattr(getattr(engine, "dialect", None), "name", "") + return str(dialect or "").lower() + + +def _queue_insert_sql(database_type): + if database_type == "sqlite": + return QUEUE_INSERT_SQL_SQLITE + return QUEUE_INSERT_SQL_POSTGRESQL + + +def _serialize_metadata(value): + if value is None or isinstance(value, str): + return value + return json.dumps(value, ensure_ascii=False, sort_keys=True) + + +def _writer_row_from_payload(payload): + row = {} + for payload_key, column in PAYLOAD_COLUMN_MAP.items(): + if payload_key not in payload: + continue + value = payload[payload_key] + if column == "metadata_json": + value = _serialize_metadata(value) + row[column] = value + return row + + +def _writer_rows_from_transaction(transaction_preview): + rows = [] + missing_payloads = [] + invalid_rows = [] + for statement in transaction_preview.get("statements", []): + payload = statement.get("parameter_preview") + if not isinstance(payload, dict): + missing_payloads.append( + statement.get("idempotency_key") or statement.get("index") + ) + continue + row = _writer_row_from_payload(payload) + missing_columns = [ + column for column in QUEUE_INSERT_COLUMNS + if row.get(column) in (None, "") + ] + if missing_columns: + invalid_rows.append( + { + "idempotency_key": statement.get("idempotency_key"), + "missing_columns": missing_columns, + } + ) + rows.append(row) + return rows, missing_payloads, invalid_rows + + +def execute_candidate_queue_writer_transaction( + *, + transaction_preview, + database_url=None, + database_type=None, + engine=None, +): + """執行候選審核 queue insert;不建立 ORM session。""" + writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction( + transaction_preview + ) + effective_database_url = database_url + effective_database_type = (database_type or "").lower() + created_engine = False + connection_opened = False + transaction_opened = False + write_attempted = False + + if missing_payloads or invalid_rows: + return { + "mode": "candidate_queue_writer_cli_execute_error", + "database_connection_opened": False, + "database_session_created": False, + "explicit_transaction_opened": False, + "writes_executed": False, + "would_write_database": True, + "database_write_executed": False, + "database_commit_executed": False, + "database_rollback_executed": False, + "external_network_executed": False, + "scheduler_attached": False, + "statement_count": len(writer_rows), + "inserted_count": 0, + "skipped_count": 0, + "affected_dedupe_keys": [], + "skipped_dedupe_keys": [], + "error_message": "candidate_queue_statement_payload_invalid", + "missing_payloads": missing_payloads, + "invalid_rows": invalid_rows, + } + + try: + if engine is None: + if not effective_database_url: + from config import DATABASE_PATH, DATABASE_TYPE + + effective_database_url = DATABASE_PATH + effective_database_type = ( + database_type or DATABASE_TYPE or "" + ).lower() + connect_args = {} + if effective_database_type == "postgresql": + connect_args = { + "connect_timeout": 8, + "options": "-c statement_timeout=15000", + } + engine = create_engine( + effective_database_url, + pool_pre_ping=True, + connect_args=connect_args, + ) + created_engine = True + + effective_database_type = _database_type_for_engine( + engine, + effective_database_type, + ) + insert_sql = text(_queue_insert_sql(effective_database_type)) + inserted_dedupe_keys = [] + skipped_dedupe_keys = [] + with engine.begin() as conn: + connection_opened = True + transaction_opened = True + for row in writer_rows: + write_attempted = True + result = conn.execute(insert_sql, row) + if effective_database_type == "sqlite": + inserted = int(result.rowcount or 0) > 0 + else: + inserted = bool(result.fetchall()) + if inserted: + inserted_dedupe_keys.append(row["dedupe_key"]) + else: + skipped_dedupe_keys.append(row["dedupe_key"]) + + return { + "mode": "candidate_queue_writer_cli_executed", + "database_connection_opened": connection_opened, + "database_session_created": False, + "explicit_transaction_opened": transaction_opened, + "writes_executed": True, + "would_write_database": True, + "database_write_executed": write_attempted, + "database_commit_executed": True, + "database_rollback_executed": False, + "external_network_executed": False, + "scheduler_attached": False, + "statement_count": len(writer_rows), + "inserted_count": len(inserted_dedupe_keys), + "skipped_count": len(skipped_dedupe_keys), + "affected_dedupe_keys": inserted_dedupe_keys, + "skipped_dedupe_keys": skipped_dedupe_keys, + "idempotency": { + "conflict_policy": "dedupe_key_do_nothing", + "safe_to_rerun": True, + }, + "rollback_note": ( + "若需回退,必須依 affected_dedupe_keys 人工審核清理;" + "此 CLI 不自動刪除資料。" + ), + } + except Exception as exc: + return { + "mode": "candidate_queue_writer_cli_execute_error", + "database_connection_opened": connection_opened, + "database_session_created": False, + "explicit_transaction_opened": transaction_opened, + "writes_executed": False, + "would_write_database": True, + "database_write_executed": write_attempted, + "database_commit_executed": False, + "database_rollback_executed": bool(transaction_opened), + "external_network_executed": False, + "scheduler_attached": False, + "statement_count": len(writer_rows), + "inserted_count": 0, + "skipped_count": 0, + "affected_dedupe_keys": [], + "skipped_dedupe_keys": [], + "error_message": str(exc), + } + finally: + if created_engine: + engine.dispose() + + def build_candidate_queue_writer_cli_plan( *, transaction_preview, @@ -28,8 +291,13 @@ def build_candidate_queue_writer_cli_plan( approval_token=None, approval_token_secret=None, apply_real_write=False, + backup_verified=False, + migration_live_smoke_passed=False, + database_url=None, + database_type=None, + engine=None, ): - """建立候選審核 queue writer CLI gate;本階段拒絕實際寫入。""" + """建立候選審核 queue writer CLI gate,必要時執行受控 insert。""" approval_token_present = bool(approval_token) approval_token_secret = approval_token_secret or os.getenv(APPROVAL_ENV_VAR) approval_token_secret_configured = bool(approval_token_secret) @@ -42,7 +310,11 @@ def build_candidate_queue_writer_cli_plan( preflight_ready = bool( writer_preflight and writer_preflight.get("ready_for_writer_review") ) - writer_enabled = False + writer_rows, missing_payloads, invalid_rows = _writer_rows_from_transaction( + transaction_preview + ) + writer_rows_valid = bool(writer_rows and not missing_payloads and not invalid_rows) + writer_enabled = True gates = [ { "key": "script_created", @@ -89,25 +361,32 @@ def build_candidate_queue_writer_cli_plan( "label": "候選 queue writer schema / payload preflight 已通過", "passed": preflight_ready, }, + { + "key": "transaction_statement_payloads_present", + "label": "transaction statements include reviewed parameter payloads", + "passed": writer_rows_valid, + }, { "key": "backup_verified", "label": "正式寫入前必須確認最新備份已完成", - "passed": False, + "passed": bool(backup_verified), }, { "key": "migration_live_smoke_passed", "label": "正式 schema live smoke 必須通過", - "passed": False, + "passed": bool(migration_live_smoke_passed), }, { "key": "queue_writer_implementation_enabled", - "label": "候選 queue writer 實際寫入實作仍未啟用", + "label": "候選 queue writer 實際寫入實作已限定於 CLI 啟用", "passed": writer_enabled, }, { "key": "manual_operator_approval", "label": "操作者需在 CLI 明確批准一次性寫入", - "passed": bool(execute_requested and apply_real_write and approval_token_valid), + "passed": bool( + execute_requested and apply_real_write and approval_token_valid + ), }, { "key": "crawler_stays_disabled", @@ -116,32 +395,71 @@ def build_candidate_queue_writer_cli_plan( }, ] blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]] - blocked_reasons.append("candidate_queue_writer_execution_not_enabled") + ready_for_real_write = bool(execute_requested and not blocked_reasons) + execution_result = None + if ready_for_real_write: + execution_result = execute_candidate_queue_writer_transaction( + transaction_preview=transaction_preview, + database_url=database_url, + database_type=database_type, + engine=engine, + ) + if execution_result["mode"] == "candidate_queue_writer_cli_execute_error": + blocked_reasons = ["candidate_queue_writer_execute_error"] + + writes_executed = bool( + execution_result and execution_result.get("writes_executed") + ) return { - "mode": "candidate_queue_writer_cli_blocked", + "mode": ( + execution_result["mode"] + if execution_result + else "candidate_queue_writer_cli_ready" + if ready_for_real_write + else "candidate_queue_writer_cli_blocked" + ), "target_table": "market_alert_review_queue", "execute_requested": bool(execute_requested), "apply_real_write_requested": bool(apply_real_write), + "backup_verified": bool(backup_verified), + "migration_live_smoke_passed": bool(migration_live_smoke_passed), "approval_token_present": approval_token_present, "approval_token_valid": approval_token_valid, "approval_env_var": APPROVAL_ENV_VAR, "approval_token_secret_configured": approval_token_secret_configured, "queue_writer_implementation_enabled": writer_enabled, - "ready_for_real_write": False, - "writes_executed": False, - "would_write_database": False, - "database_connection_opened": False, + "ready_for_real_write": ready_for_real_write, + "writes_executed": writes_executed, + "would_write_database": bool(ready_for_real_write), + "database_connection_opened": bool( + execution_result and execution_result.get("database_connection_opened") + ), "database_session_created": False, - "explicit_transaction_opened": False, - "database_write_executed": False, - "database_commit_executed": False, - "database_rollback_executed": False, + "explicit_transaction_opened": bool( + execution_result and execution_result.get("explicit_transaction_opened") + ), + "database_write_executed": bool( + execution_result and execution_result.get("database_write_executed") + ), + "database_commit_executed": bool( + execution_result and execution_result.get("database_commit_executed") + ), + "database_rollback_executed": bool( + execution_result and execution_result.get("database_rollback_executed") + ), "external_network_executed": False, "scheduler_attached": False, - "exit_code": 2 if execute_requested else 0, + "exit_code": 0 if writes_executed else 2 if execute_requested else 0, "blocked_reasons": blocked_reasons, "approval_gates": gates, + "writer_payload_summary": { + "writer_row_count": len(writer_rows), + "missing_statement_payload_count": len(missing_payloads), + "invalid_row_count": len(invalid_rows), + "missing_payloads": missing_payloads, + "invalid_rows": invalid_rows, + }, "transaction_preview_summary": { "mode": transaction_preview.get("mode"), "transaction_preview_created": transaction_preview_created, @@ -155,6 +473,19 @@ def build_candidate_queue_writer_cli_plan( "conflict_policy": summary.get("conflict_policy"), }, "transaction_preview": transaction_preview, + "execution_result": execution_result, + "inserted_count": int( + execution_result.get("inserted_count") if execution_result else 0 + ), + "skipped_count": int( + execution_result.get("skipped_count") if execution_result else 0 + ), + "affected_dedupe_keys": ( + execution_result.get("affected_dedupe_keys") if execution_result else [] + ), + "skipped_dedupe_keys": ( + execution_result.get("skipped_dedupe_keys") if execution_result else [] + ), "writer_preflight": writer_preflight or { "mode": "candidate_queue_writer_preflight_not_requested", "ready_for_writer_review": False, @@ -166,26 +497,27 @@ def build_candidate_queue_writer_cli_plan( "rollback_plan": [ { "key": "no_write_no_db_rollback_required", - "label": "本階段不寫 DB;若被阻擋,不需要 DB rollback", + "label": "若被 gate 阻擋且未寫 DB,不需要 DB rollback", }, { - "key": "future_dedupe_key_cleanup_review", - "label": "未來正式寫入若需回退,必須依 dedupe_key 人工審核清理", + "key": "dedupe_key_cleanup_review", + "label": "正式寫入後若需回退,必須依 affected_dedupe_keys 人工審核清理", }, ], "safety_contract": { "refuses_api_execution": True, "refuses_execute_without_apply_flag": True, "requires_independent_approval_token": True, - "does_not_open_transaction_from_status": True, - "does_not_commit_from_status": True, + "requires_backup_verified": True, + "requires_migration_live_smoke": True, + "uses_core_connection_not_orm_session": True, "keeps_crawler_disabled_for_queue_write": True, "target_table": "market_alert_review_queue", }, "safe_boundaries": [ "do_not_execute_candidate_queue_writer_from_api", - "do_not_open_database_connection_from_queue_writer_status", - "do_not_commit_queue_writer_status", + "do_not_open_database_connection_from_api_queue_writer_status", + "do_not_commit_api_queue_writer_status", "do_not_attach_scheduler_from_queue_writer", "no_remove_orphans", "no_momo_db_lifecycle_change", diff --git a/services/market_intel/candidate_queue_writer_preflight.py b/services/market_intel/candidate_queue_writer_preflight.py index 423058c..00f64e5 100644 --- a/services/market_intel/candidate_queue_writer_preflight.py +++ b/services/market_intel/candidate_queue_writer_preflight.py @@ -75,7 +75,7 @@ def _planned_result(transaction_preview, payload_keys, mapped, unmapped): "blocked_reasons": [ "execute_false_planned_only", "queue_writer_preflight_not_loaded", - "candidate_queue_writer_execution_not_enabled", + "candidate_queue_writer_real_write_requires_cli", ], "safety_contract": _safety_contract(), } @@ -222,7 +222,7 @@ def build_candidate_queue_writer_preflight( schema_ready = bool(table_exists and not missing_columns and dedupe_unique) payload_mappable = bool(payload_keys and not unmapped) ready_for_writer_review = bool(schema_ready and payload_mappable) - blocked_reasons = ["candidate_queue_writer_execution_not_enabled"] + blocked_reasons = [] if not table_exists: blocked_reasons.insert(0, "queue_table_missing") if missing_columns: @@ -298,7 +298,6 @@ def build_candidate_queue_writer_preflight( "index_names": [], "blocked_reasons": [ "queue_writer_preflight_error", - "candidate_queue_writer_execution_not_enabled", ], "error_message": str(exc), "safety_contract": _safety_contract(), diff --git a/services/market_intel/deployment_readiness.py b/services/market_intel/deployment_readiness.py index 6baf54c..aeaf8e1 100644 --- a/services/market_intel/deployment_readiness.py +++ b/services/market_intel/deployment_readiness.py @@ -368,8 +368,8 @@ def build_deployment_readiness_preview( }, { "key": "database_write_blocked", - "label": "writer 仍是 preview_only_no_session_no_commit;異常時不需要 DB rollback", - "trigger": "seed writer 或 schema smoke 異常", + "label": "API/UI writer 仍不寫 DB;真寫入只允許 CLI 在完整 gate 通過後執行", + "trigger": "queue writer、seed writer 或 schema smoke 異常", }, ] safe_deploy_boundaries = [ diff --git a/services/market_intel/manual_sample_candidate_queue.py b/services/market_intel/manual_sample_candidate_queue.py index b365d19..dbcd871 100644 --- a/services/market_intel/manual_sample_candidate_queue.py +++ b/services/market_intel/manual_sample_candidate_queue.py @@ -361,6 +361,10 @@ def _payload_hash(payload): def _transaction_statements_from_rows(queue_rows): statements = [] for index, row in enumerate(queue_rows, start=1): + parameter_preview = { + key: value for key, value in row.items() + if key != "write_status" + } statements.append( { "index": index, @@ -368,10 +372,9 @@ def _transaction_statements_from_rows(queue_rows): "table": "market_alert_review_queue", "lookup": {"dedupe_key": row["dedupe_key"]}, "sql_shape": QUEUE_INSERT_SQL_SHAPE, - "parameter_keys": sorted( - key for key in row if key != "write_status" - ), - "parameter_payload_hash": _payload_hash(row), + "parameter_keys": sorted(parameter_preview), + "parameter_preview": parameter_preview, + "parameter_payload_hash": _payload_hash(parameter_preview), "idempotency_key": ( f"market_alert_review_queue:{row['dedupe_key']}" ), diff --git a/services/market_intel/service.py b/services/market_intel/service.py index 22c7345..d6fa5cf 100644 --- a/services/market_intel/service.py +++ b/services/market_intel/service.py @@ -108,7 +108,7 @@ class MarketIntelRuntimeStatus: class MarketIntelService: """市場情報入口服務,先集中 feature gate 與安全狀態。""" - phase = "phase_56_candidate_queue_writer_preflight" + phase = "phase_57_candidate_queue_writer_cli_transaction" def get_runtime_status(self) -> MarketIntelRuntimeStatus: return MarketIntelRuntimeStatus( diff --git a/tests/test_market_intel_skeleton.py b/tests/test_market_intel_skeleton.py index 883a4d6..b87de25 100644 --- a/tests/test_market_intel_skeleton.py +++ b/tests/test_market_intel_skeleton.py @@ -585,7 +585,7 @@ def test_legacy_source_bridge_default_is_planned_only(): bridge = MarketIntelService().build_legacy_source_bridge() assert bridge["mode"] == "legacy_source_bridge_planned" - assert bridge["phase"] == "phase_56_candidate_queue_writer_preflight" + assert bridge["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert bridge["execute_requested"] is False assert bridge["read_only_query_executed"] is False assert bridge["database_connection_opened"] is False @@ -743,7 +743,7 @@ def test_mcp_tool_contract_preview_is_read_only_and_whitelisted(): contract = MarketIntelService().build_mcp_tool_contract() assert contract["mode"] == "mcp_tool_contract_preview" - assert contract["phase"] == "phase_56_candidate_queue_writer_preflight" + assert contract["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert contract["caller"] == "market_intel" assert contract["contract_ready"] is True assert contract["blocked_reasons"] == [] @@ -876,7 +876,7 @@ def test_mcp_activation_runbook_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "mcp_activation_runbook_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["deployment_actions_executed"] is False assert data["docker_command_executed"] is False assert data["ssh_command_executed"] is False @@ -889,7 +889,7 @@ def test_mcp_fetch_gate_default_blocks_external_fetch(): gate = MarketIntelService().build_mcp_fetch_gate(fetch_requested=True) assert gate["mode"] == "mcp_fetch_gate_planned" - assert gate["phase"] == "phase_56_candidate_queue_writer_preflight" + assert gate["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert gate["fetch_requested"] is True assert gate["manual_fetch_gate_open"] is False assert gate["network_request_allowed"] is False @@ -959,7 +959,7 @@ def test_mcp_fetch_gate_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "mcp_fetch_gate_planned" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["fetch_requested"] is False assert data["network_request_allowed"] is False assert data["external_network_executed"] is False @@ -971,7 +971,7 @@ def test_manual_sample_plan_preview_blocks_fetch_and_write(): plan = MarketIntelService().build_manual_sample_plan() assert plan["mode"] == "manual_sample_fetch_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_manual_sample_fetch"] is False assert plan["sample_fetch_executed"] is False assert plan["external_network_executed"] is False @@ -1019,7 +1019,7 @@ def test_manual_sample_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "manual_sample_fetch_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["sample_fetch_executed"] is False assert data["external_network_executed"] is False assert data["database_write_executed"] is False @@ -1030,7 +1030,7 @@ def test_manual_sample_acceptance_preview_blocks_candidate_import(): acceptance = MarketIntelService().build_manual_sample_acceptance() assert acceptance["mode"] == "manual_sample_acceptance_preview" - assert acceptance["phase"] == "phase_56_candidate_queue_writer_preflight" + assert acceptance["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert acceptance["contract_ready"] is True assert acceptance["sample_result_loaded"] is False assert acceptance["sample_result_accepted"] is False @@ -1072,7 +1072,7 @@ def test_manual_sample_acceptance_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "manual_sample_acceptance_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["sample_result_loaded"] is False assert data["candidate_import_allowed"] is False assert data["external_network_executed"] is False @@ -1084,7 +1084,7 @@ def test_manual_sample_review_preview_is_planned_until_result_loaded(): review = MarketIntelService().build_manual_sample_review() assert review["mode"] == "manual_sample_review_preview" - assert review["phase"] == "phase_56_candidate_queue_writer_preflight" + assert review["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert review["contract_ready"] is True assert review["sample_result_loaded"] is False assert review["sample_result_reviewed"] is False @@ -1195,7 +1195,7 @@ def test_manual_sample_review_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "manual_sample_review_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["sample_result_loaded"] is False assert data["sample_result_reviewed"] is False assert data["candidate_import_allowed"] is False @@ -1234,7 +1234,7 @@ def test_manual_sample_review_evaluation_preview_accepts_payload_without_persist ) assert review["mode"] == "manual_sample_review_evaluation_preview" - assert review["phase"] == "phase_56_candidate_queue_writer_preflight" + assert review["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert review["review_request_type"] == "operator_posted_json" assert review["payload_received"] is True assert review["payload_valid_json_object"] is True @@ -1296,7 +1296,7 @@ def test_manual_sample_review_evaluate_route_is_post_only_and_no_write(): assert response.status_code == 200 assert data["mode"] == "manual_sample_review_evaluation_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["payload_received"] is True assert data["payload_valid_json_object"] is True assert data["payload_persisted"] is False @@ -1376,7 +1376,7 @@ def test_manual_sample_candidate_handoff_preview_creates_candidates_without_pers ) assert handoff["mode"] == "manual_sample_candidate_handoff_preview" - assert handoff["phase"] == "phase_56_candidate_queue_writer_preflight" + assert handoff["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert handoff["payload_received"] is True assert handoff["payload_valid_json_object"] is True assert handoff["payload_persisted"] is False @@ -1440,7 +1440,7 @@ def test_manual_sample_candidate_handoff_route_is_post_only_and_no_write(): assert response.status_code == 200 assert data["mode"] == "manual_sample_candidate_handoff_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["payload_received"] is True assert data["handoff_ready"] is True assert data["candidate_handoff_created"] is True @@ -1499,7 +1499,7 @@ def test_manual_sample_candidate_queue_draft_preview_builds_review_items_without ) assert queue_draft["mode"] == "manual_sample_candidate_queue_draft_preview" - assert queue_draft["phase"] == "phase_56_candidate_queue_writer_preflight" + assert queue_draft["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert queue_draft["payload_received"] is True assert queue_draft["payload_valid_json_object"] is True assert queue_draft["payload_persisted"] is False @@ -1573,7 +1573,7 @@ def test_manual_sample_candidate_queue_draft_route_is_post_only_and_no_write(): assert response.status_code == 200 assert data["mode"] == "manual_sample_candidate_queue_draft_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["payload_received"] is True assert data["handoff_ready"] is True assert data["queue_draft_ready"] is True @@ -1636,7 +1636,7 @@ def test_manual_sample_candidate_queue_approval_preview_blocks_write_and_maps_ro ) assert approval["mode"] == "manual_sample_candidate_queue_approval_preview" - assert approval["phase"] == "phase_56_candidate_queue_writer_preflight" + assert approval["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert approval["payload_received"] is True assert approval["payload_valid_json_object"] is True assert approval["payload_persisted"] is False @@ -1714,7 +1714,7 @@ def test_manual_sample_candidate_queue_approval_route_is_post_only_and_no_write( assert response.status_code == 200 assert data["mode"] == "manual_sample_candidate_queue_approval_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["payload_received"] is True assert data["approval_preview_created"] is True assert data["approval_request_created"] is False @@ -1777,7 +1777,7 @@ def test_manual_sample_candidate_queue_transaction_preview_blocks_execution(): ) assert transaction["mode"] == "manual_sample_candidate_queue_transaction_preview" - assert transaction["phase"] == "phase_56_candidate_queue_writer_preflight" + assert transaction["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert transaction["payload_received"] is True assert transaction["payload_valid_json_object"] is True assert transaction["payload_persisted"] is False @@ -1805,6 +1805,8 @@ def test_manual_sample_candidate_queue_transaction_preview_blocks_execution(): assert statement["table"] == "market_alert_review_queue" assert statement["write_status"] == "blocked_transaction_preview_only" assert statement["idempotency_key"].startswith("market_alert_review_queue:") + assert statement["parameter_preview"]["dedupe_key"].startswith("sample-candidate:") + assert statement["parameter_preview"]["metadata_json_preview"]["platform_code"] == "momo" assert "queue_approval_gates_not_all_passed" in transaction["blocked_reasons"] assert "queue_transaction_execution_still_blocked" in transaction["blocked_reasons"] assert ( @@ -1855,7 +1857,7 @@ def test_manual_sample_candidate_queue_transaction_route_is_post_only_and_no_wri assert response.status_code == 200 assert data["mode"] == "manual_sample_candidate_queue_transaction_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["payload_received"] is True assert data["transaction_preview_created"] is True assert data["transaction_ready"] is False @@ -1888,7 +1890,7 @@ def test_manual_sample_candidate_queue_transaction_blocks_invalid_payload(): assert "queue_transaction_preview_not_ready" in transaction["blocked_reasons"] -def test_candidate_queue_writer_cli_gate_blocks_real_write_even_with_token(): +def test_candidate_queue_writer_cli_gate_blocks_until_preflight_backup_and_smoke_pass(): from services.market_intel.candidate_queue_writer_cli import ( build_candidate_queue_writer_cli_plan, ) @@ -1932,7 +1934,7 @@ def test_candidate_queue_writer_cli_gate_blocks_real_write_even_with_token(): assert status["apply_real_write_requested"] is True assert status["approval_token_present"] is True assert status["approval_token_valid"] is True - assert status["queue_writer_implementation_enabled"] is False + assert status["queue_writer_implementation_enabled"] is True assert status["ready_for_real_write"] is False assert status["writes_executed"] is False assert status["would_write_database"] is False @@ -1946,8 +1948,9 @@ def test_candidate_queue_writer_cli_gate_blocks_real_write_even_with_token(): assert status["transaction_preview_summary"]["statement_count"] == 1 assert "approval_token_valid" not in status["blocked_reasons"] assert "queue_writer_preflight_ready" in status["blocked_reasons"] - assert "queue_writer_implementation_enabled" in status["blocked_reasons"] - assert "candidate_queue_writer_execution_not_enabled" in status["blocked_reasons"] + assert "backup_verified" in status["blocked_reasons"] + assert "migration_live_smoke_passed" in status["blocked_reasons"] + assert "queue_writer_implementation_enabled" not in status["blocked_reasons"] assert status["safety_contract"]["refuses_api_execution"] is True assert ( "do_not_execute_candidate_queue_writer_from_api" @@ -2090,9 +2093,135 @@ def test_candidate_queue_writer_preflight_sqlite_read_only_validates_columns(): assert preflight["missing_insert_columns"] == [] assert preflight["dedupe_unique_index_present"] is True assert preflight["unmapped_payload_keys"] == [] - assert preflight["blocked_reasons"] == [ - "candidate_queue_writer_execution_not_enabled" - ] + assert preflight["blocked_reasons"] == [] + + +def test_candidate_queue_writer_cli_sqlite_transaction_is_idempotent(): + from services.market_intel.candidate_queue_writer_cli import ( + build_candidate_queue_writer_cli_plan, + ) + from services.market_intel.candidate_queue_writer_preflight import ( + build_candidate_queue_writer_preflight, + ) + + sample_result = { + "batch_id": "sample-batch-19", + "platform_code": "momo", + "source_key": "homepage", + "source_url": "https://www.momoshop.com.tw/", + "status": "fetched", + "status_code": 200, + "content_length": 1700, + "page_hash": "1" * 64, + "title": "MOMO 活動", + "diagnostics": { + "link_count": 1, + "same_host_link_count": 1, + "campaign_link_candidates": [ + { + "confidence_band": "high", + "score": 94, + "url": "https://www.momoshop.com.tw/activity/sample", + "text": "品牌活動", + }, + ], + }, + } + transaction = MarketIntelService().build_manual_sample_candidate_queue_transaction( + sample_result=sample_result + ) + engine = create_engine("sqlite:///:memory:") + with engine.begin() as conn: + conn.execute( + text( + """ + CREATE TABLE market_alert_review_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + alert_candidate_id TEXT NOT NULL UNIQUE, + review_state TEXT NOT NULL DEFAULT 'draft', + priority_lane TEXT NOT NULL DEFAULT 'watch', + threshold_level TEXT NOT NULL, + total_score FLOAT NOT NULL DEFAULT 0, + evidence_bundle_id TEXT NOT NULL, + dedupe_key TEXT NOT NULL, + source_batch_id TEXT NOT NULL, + metadata_json TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + ) + conn.execute( + text( + "CREATE UNIQUE INDEX ux_market_alert_review_queue_dedupe " + "ON market_alert_review_queue (dedupe_key)" + ) + ) + + preflight = build_candidate_queue_writer_preflight( + transaction_preview=transaction, + execute_requested=True, + engine=engine, + database_type="sqlite", + ) + first_run = build_candidate_queue_writer_cli_plan( + transaction_preview=transaction, + writer_preflight=preflight, + execute_requested=True, + apply_real_write=True, + approval_token=TEST_APPROVAL_TOKEN, + approval_token_secret=TEST_APPROVAL_TOKEN, + backup_verified=True, + migration_live_smoke_passed=True, + engine=engine, + database_type="sqlite", + ) + second_run = build_candidate_queue_writer_cli_plan( + transaction_preview=transaction, + writer_preflight=preflight, + execute_requested=True, + apply_real_write=True, + approval_token=TEST_APPROVAL_TOKEN, + approval_token_secret=TEST_APPROVAL_TOKEN, + backup_verified=True, + migration_live_smoke_passed=True, + engine=engine, + database_type="sqlite", + ) + with engine.connect() as conn: + rows = conn.execute( + text( + """ + SELECT alert_candidate_id, review_state, dedupe_key, metadata_json + FROM market_alert_review_queue + """ + ) + ).fetchall() + + assert first_run["mode"] == "candidate_queue_writer_cli_executed" + assert first_run["ready_for_real_write"] is True + assert first_run["writes_executed"] is True + assert first_run["would_write_database"] is True + assert first_run["database_connection_opened"] is True + assert first_run["database_session_created"] is False + assert first_run["explicit_transaction_opened"] is True + assert first_run["database_write_executed"] is True + assert first_run["database_commit_executed"] is True + assert first_run["database_rollback_executed"] is False + assert first_run["scheduler_attached"] is False + assert first_run["inserted_count"] == 1 + assert first_run["skipped_count"] == 0 + assert first_run["blocked_reasons"] == [] + assert first_run["exit_code"] == 0 + assert len(rows) == 1 + assert rows[0]._mapping["review_state"] == "needs_review" + assert "platform_code" in rows[0]._mapping["metadata_json"] + + assert second_run["mode"] == "candidate_queue_writer_cli_executed" + assert second_run["inserted_count"] == 0 + assert second_run["skipped_count"] == 1 + assert second_run["database_commit_executed"] is True def test_candidate_queue_writer_preflight_route_is_post_only_and_no_write(): @@ -2137,7 +2266,7 @@ def test_candidate_queue_writer_preflight_route_is_post_only_and_no_write(): assert response.status_code == 200 assert data["mode"] == "candidate_queue_writer_preflight_planned" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["read_only_query_executed"] is False assert data["database_connection_opened"] is False @@ -2194,7 +2323,7 @@ def test_candidate_queue_writer_status_route_never_leaks_approval_token(monkeypa assert response.status_code == 200 assert data["mode"] == "candidate_queue_writer_cli_blocked" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is True assert data["apply_real_write_requested"] is True assert data["approval_token_present"] is False @@ -2244,7 +2373,7 @@ def test_scheduler_plan_preview_blocks_job_attachment(): plan = MarketIntelService().build_scheduler_plan() assert plan["mode"] == "scheduler_attach_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_to_attach_scheduler"] is False assert plan["scheduler_attached"] is False assert plan["scheduler_registration_executed"] is False @@ -2282,7 +2411,7 @@ def test_scheduler_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "scheduler_attach_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["scheduler_registration_executed"] is False assert data["crawler_job_started"] is False assert data["external_network_executed"] is False @@ -2293,7 +2422,7 @@ def test_match_review_plan_preview_blocks_auto_confirm(): plan = MarketIntelService().build_match_review_plan() assert plan["mode"] == "match_review_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_review_queue"] is False assert plan["review_queue_created"] is False assert plan["auto_match_executed"] is False @@ -2329,7 +2458,7 @@ def test_match_review_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "match_review_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["review_queue_created"] is False assert data["auto_confirm_executed"] is False assert data["external_network_executed"] is False @@ -2340,7 +2469,7 @@ def test_opportunity_plan_preview_blocks_alerts_and_ai_summary(): plan = MarketIntelService().build_opportunity_plan() assert plan["mode"] == "opportunity_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_opportunity_queue"] is False assert plan["opportunity_queue_created"] is False assert plan["threat_alert_dispatched"] is False @@ -2381,7 +2510,7 @@ def test_opportunity_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "opportunity_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["opportunity_queue_created"] is False assert data["threat_alert_dispatched"] is False assert data["ai_summary_generated"] is False @@ -2392,7 +2521,7 @@ def test_opportunity_scoring_plan_preview_blocks_scoring_and_alerts(): plan = MarketIntelService().build_opportunity_scoring_plan() assert plan["mode"] == "opportunity_scoring_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_scoring_job"] is False assert plan["scoring_job_created"] is False assert plan["score_calculation_executed"] is False @@ -2440,7 +2569,7 @@ def test_opportunity_scoring_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "opportunity_scoring_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["scoring_job_created"] is False assert data["score_calculation_executed"] is False assert data["sample_scores_generated"] is False @@ -2452,7 +2581,7 @@ def test_opportunity_evidence_plan_preview_blocks_queries_and_alerts(): plan = MarketIntelService().build_opportunity_evidence_plan() assert plan["mode"] == "opportunity_evidence_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_evidence_bundle"] is False assert plan["evidence_bundle_created"] is False assert plan["evidence_query_executed"] is False @@ -2498,7 +2627,7 @@ def test_opportunity_evidence_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "opportunity_evidence_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["evidence_bundle_created"] is False assert data["evidence_query_executed"] is False assert data["sample_evidence_generated"] is False @@ -2511,7 +2640,7 @@ def test_opportunity_alert_plan_preview_blocks_dispatch_and_llm_calls(): plan = MarketIntelService().build_opportunity_alert_plan() assert plan["mode"] == "opportunity_alert_plan_preview" - assert plan["phase"] == "phase_56_candidate_queue_writer_preflight" + assert plan["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert plan["ready_for_alert_candidates"] is False assert plan["alert_candidate_created"] is False assert plan["alert_queue_created"] is False @@ -2596,7 +2725,7 @@ def test_opportunity_alert_plan_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "opportunity_alert_plan_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["alert_candidate_created"] is False assert data["alert_queue_created"] is False assert data["review_queue_created"] is False @@ -2674,7 +2803,7 @@ def test_mcp_deploy_preflight_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "mcp_external_deploy_preflight_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["deployment_actions_executed"] is False assert data["docker_command_executed"] is False assert data["ssh_command_executed"] is False @@ -2689,7 +2818,7 @@ def test_mcp_readiness_default_is_planned_only(monkeypatch): readiness = MarketIntelService().build_mcp_readiness() assert readiness["mode"] == "mcp_readiness_planned" - assert readiness["phase"] == "phase_56_candidate_queue_writer_preflight" + assert readiness["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert readiness["execute_requested"] is False assert readiness["router_enabled"] is False assert readiness["external_mcp_complete"] is False @@ -3548,7 +3677,7 @@ def test_migration_apply_drill_planned_is_safe_and_manual_only(): drill = MarketIntelService().build_migration_apply_drill() assert drill["mode"] == "migration_apply_drill_preview" - assert drill["phase"] == "phase_56_candidate_queue_writer_preflight" + assert drill["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert drill["execute_requested"] is False assert drill["schema_state"] == "planned_no_db_probe" assert drill["drill_ready_for_operator_review"] is True @@ -3663,7 +3792,7 @@ def test_migration_apply_drill_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "migration_apply_drill_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["migration_executed"] is False assert data["rollback_executed"] is False @@ -3675,7 +3804,7 @@ def test_migration_catalog_review_planned_is_safe_and_diagnostic(): review = MarketIntelService().build_migration_catalog_review() assert review["mode"] == "migration_catalog_review_preview" - assert review["phase"] == "phase_56_candidate_queue_writer_preflight" + assert review["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert review["execute_requested"] is False assert review["catalog_state"] == "planned_no_probe" assert review["seed_state"] == "planned_no_probe" @@ -3790,7 +3919,7 @@ def test_migration_catalog_review_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "migration_catalog_review_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["catalog_state"] == "planned_no_probe" assert data["migration_executed"] is False @@ -3803,7 +3932,7 @@ def test_migration_live_smoke_planned_is_preview_only(): smoke = MarketIntelService().build_migration_live_smoke() assert smoke["mode"] == "migration_live_smoke_preview" - assert smoke["phase"] == "phase_56_candidate_queue_writer_preflight" + assert smoke["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert smoke["execute_requested"] is False assert smoke["smoke_result"] == "planned_no_execution" assert smoke["live_smoke_passed"] is False @@ -3865,7 +3994,7 @@ def test_migration_live_smoke_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "migration_live_smoke_preview" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["smoke_result"] == "planned_no_execution" assert data["migration_executed"] is False @@ -3878,7 +4007,7 @@ def test_live_db_inventory_planned_is_preview_only(): inventory = MarketIntelService().build_live_db_inventory() assert inventory["mode"] == "live_db_inventory_planned" - assert inventory["phase"] == "phase_56_candidate_queue_writer_preflight" + assert inventory["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert inventory["execute_requested"] is False assert inventory["read_only_query_executed"] is False assert inventory["database_connection_opened"] is False @@ -4022,7 +4151,7 @@ def test_live_db_inventory_route_is_preview_only(): assert response.status_code == 200 assert data["mode"] == "live_db_inventory_planned" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["read_only_query_executed"] is False assert data["database_write_executed"] is False @@ -4249,7 +4378,7 @@ def test_candidate_queue_writer_cli_script_outputs_blocked_gate(tmp_path): assert result.returncode == 0 assert data["mode"] == "candidate_queue_writer_cli_blocked" - assert data["phase"] == "phase_56_candidate_queue_writer_preflight" + assert data["phase"] == "phase_57_candidate_queue_writer_cli_transaction" assert data["execute_requested"] is False assert data["apply_real_write_requested"] is False assert data["writes_executed"] is False