"""Draft previews for the market-intel manual candidate review queue.

This module only turns a sample-result handoff into review queue drafts
(draft -> approval gate -> transaction preview). It never creates a real
queue, never writes to the database, never attaches a scheduler and never
makes external network calls; every payload it returns is preview-only.
"""

import hashlib
import json
import math

from services.market_intel.manual_sample_review import (
    build_manual_sample_candidate_handoff_preview,
)

# Shape of the INSERT that a future, separately approved write step would
# run. It is only echoed back in previews; nothing in this module executes it.
QUEUE_INSERT_SQL_SHAPE = (
    "INSERT INTO market_alert_review_queue (...) VALUES (...) "
    "ON CONFLICT(dedupe_key) DO NOTHING"
)


def _score_for_candidate(candidate):
    """Return the candidate's ``score`` as a finite float.

    Missing, falsy, non-numeric or non-finite (NaN / inf) scores fall back
    to ``0.0`` so that a single malformed candidate cannot crash a preview.
    Lane, threshold and ``total_score`` all share this one coercion so they
    can never disagree about the same candidate.
    """
    try:
        score = float(candidate.get("score") or 0)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return score if math.isfinite(score) else 0.0


def _confidence_band_for_candidate(candidate):
    """Return the candidate's ``confidence_band`` lower-cased ('' if absent)."""
    return str(candidate.get("confidence_band") or "").lower()


def _priority_for_candidate(candidate):
    """Map the confidence band to a numeric review priority (80 / 50 / 20)."""
    band = _confidence_band_for_candidate(candidate)
    if band == "high":
        return 80
    if band == "medium":
        return 50
    return 20


def _priority_lane_for_candidate(candidate):
    """Return the priority lane: ``urgent``, ``watch`` or ``backlog``.

    ``urgent`` requires both a high confidence band and a score of at least
    90; any other high/medium band is ``watch``; everything else is
    ``backlog``.
    """
    score = _score_for_candidate(candidate)
    band = _confidence_band_for_candidate(candidate)
    if band == "high" and score >= 90:
        return "urgent"
    if band in ("high", "medium"):
        return "watch"
    return "backlog"


def _threshold_level_for_candidate(candidate):
    """Return ``high`` (score >= 90), ``medium`` (>= 70) or ``low``."""
    score = _score_for_candidate(candidate)
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    return "low"


def _build_queue_item(candidate, batch_id):
    """Build one preview-only review queue item from a handoff candidate.

    Args:
        candidate: Mapping describing one candidate; read via ``.get`` so
            missing keys fall back to empty / zero defaults.
        batch_id: Batch identifier copied into ``source_batch_id`` and
            ``created_from_batch_id``.

    Returns:
        A dict whose ``write_status`` is always ``blocked_preview_only``.
    """
    candidate_key = str(candidate.get("candidate_key") or "")
    # The evidence bundle id doubles as the dedupe key for the queue row.
    evidence_bundle_id = f"sample-candidate:{candidate_key}"
    return {
        "queue_item_key": f"review:{candidate_key}",
        "candidate_key": candidate_key,
        "alert_candidate_id": candidate_key,
        "platform_code": str(candidate.get("platform_code") or ""),
        "candidate_url": str(candidate.get("candidate_url") or ""),
        "candidate_text": str(candidate.get("candidate_text") or ""),
        "confidence_band": str(candidate.get("confidence_band") or "unknown"),
        # Raw score is passed through untouched; total_score is the
        # normalized float used for lane / threshold decisions.
        "score": candidate.get("score") or 0,
        "rank_position": candidate.get("rank_position") or 0,
        "review_state": "needs_review",
        "priority_lane": _priority_lane_for_candidate(candidate),
        "threshold_level": _threshold_level_for_candidate(candidate),
        "total_score": _score_for_candidate(candidate),
        "evidence_bundle_id": evidence_bundle_id,
        "dedupe_key": evidence_bundle_id,
        "source_batch_id": batch_id,
        "review_priority": _priority_for_candidate(candidate),
        "created_from_batch_id": batch_id,
        "write_status": "blocked_preview_only",
        "approval_required": True,
    }


def build_manual_sample_candidate_queue_draft_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build a manual-review queue draft for candidates; preview only.

    All arguments are forwarded unchanged to
    ``build_manual_sample_candidate_handoff_preview``; the resulting handoff
    candidates are converted into queue items. Nothing is persisted.

    Args:
        runtime_status: Forwarded to the handoff builder.
        acceptance_contract: Forwarded to the handoff builder.
        sample_result: Optional sample result; when it is a dict its
            ``batch_id`` is stamped onto every queue item.
        payload_error: Forwarded to the handoff builder.
        limit: Forwarded to the handoff builder (presumably the maximum
            number of candidates - confirm against that function).

    Returns:
        A dict describing the draft. All write / persistence flags are
        ``False`` and ``blocked_reasons`` always ends with
        ``review_queue_persist_still_blocked``.
    """
    handoff = build_manual_sample_candidate_handoff_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )
    batch_id = ""
    if isinstance(sample_result, dict):
        batch_id = str(sample_result.get("batch_id") or "")
    # ``or []`` also covers a handoff that carries an explicit None.
    queue_items = [
        _build_queue_item(candidate, batch_id)
        for candidate in handoff.get("candidates") or []
    ]
    queue_draft_ready = bool(handoff.get("handoff_ready") and queue_items)
    blocked_reasons = list(handoff.get("blocked_reasons") or [])
    if not queue_draft_ready:
        blocked_reasons.append("review_queue_draft_not_ready")
    # Persisting is never allowed from this preview, ready or not.
    blocked_reasons.append("review_queue_persist_still_blocked")
    return {
        "mode": "manual_sample_candidate_queue_draft_preview",
        "handoff": {
            "mode": handoff["mode"],
            "handoff_ready": handoff["handoff_ready"],
            "candidate_handoff_created": handoff["candidate_handoff_created"],
            "candidate_handoff_persisted": handoff["candidate_handoff_persisted"],
            "handoff_summary": handoff["handoff_summary"],
        },
        "payload_received": handoff["payload_received"],
        "payload_valid_json_object": handoff["payload_valid_json_object"],
        "payload_error": handoff["payload_error"],
        "payload_persisted": False,
        "sample_result_persisted": False,
        "handoff_ready": handoff["handoff_ready"],
        "queue_draft_ready": queue_draft_ready,
        "review_queue_draft_created": bool(queue_items),
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "scheduler_attached": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "queue_summary": {
            "queue_item_count": len(queue_items),
            "review_queue_created": False,
            "queue_persisted": False,
            "import_allowed": False,
        },
        "queue_contract": {
            "contract_type": "draft_contract_only",
            "table_name": "market_alert_review_queue",
            "required_fields": [
                "alert_candidate_id",
                "review_state",
                "priority_lane",
                "threshold_level",
                "total_score",
                "evidence_bundle_id",
                "dedupe_key",
                "source_batch_id",
            ],
            "forbidden_actions": [
                "insert_review_queue_rows",
                "auto_approve_candidates",
                "insert_market_campaigns",
                "attach_scheduler_jobs",
            ],
        },
        "queue_items": queue_items,
        "operator_next_actions": [
            {
                "key": "review_queue_draft_manually",
                "label": "人工確認候選 URL、分數與優先序後,才可提出正式 queue 建立申請",
                "write_status": "blocked",
            },
            {
                "key": "approve_queue_persistence_later",
                "label": "正式寫入候選審核 queue 需要另一次明確批准與 DB 備援檢查",
                "write_status": "blocked",
            },
        ],
        "safe_boundaries": [
            *handoff["safe_boundaries"],
            "do_not_create_candidate_review_queue_from_preview",
            "do_not_persist_candidate_review_queue_draft",
            "do_not_write_market_tables_from_queue_draft",
            "do_not_auto_approve_candidates",
            "do_not_attach_scheduler_from_queue_draft",
            "do_not_touch_momo_db_lifecycle",
        ],
    }


def _queue_rows_from_items(queue_items):
    """Project queue items onto the row shape previewed for the queue table.

    Descriptive fields (platform, URL, text, band, rank) are grouped under
    ``metadata_json_preview``; every row is marked
    ``blocked_approval_preview_only``.
    """
    return [
        {
            "alert_candidate_id": item["alert_candidate_id"],
            "review_state": item["review_state"],
            "priority_lane": item["priority_lane"],
            "threshold_level": item["threshold_level"],
            "total_score": item["total_score"],
            "evidence_bundle_id": item["evidence_bundle_id"],
            "dedupe_key": item["dedupe_key"],
            "source_batch_id": item["source_batch_id"],
            "metadata_json_preview": {
                "platform_code": item["platform_code"],
                "candidate_url": item["candidate_url"],
                "candidate_text": item["candidate_text"],
                "confidence_band": item["confidence_band"],
                "rank_position": item["rank_position"],
            },
            "write_status": "blocked_approval_preview_only",
        }
        for item in queue_items
    ]


def build_manual_sample_candidate_queue_approval_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the pre-write approval gate for the review queue; preview only.

    Builds the queue draft, converts its items to row previews and evaluates
    a fixed list of approval gates. The backup, operator-approval and
    rollback-review gates are hard-coded to ``False`` here, so this preview
    can never report every gate as passed. No database access happens.

    Args:
        runtime_status: Forwarded to the draft builder; its
            ``database_write_allowed`` attribute (default ``False``) drives
            the ``runtime_write_flags_enabled`` gate.
        acceptance_contract: Forwarded to the draft builder.
        sample_result: Forwarded to the draft builder.
        payload_error: Forwarded to the draft builder.
        limit: Forwarded to the draft builder.

    Returns:
        A dict with the gates, the row previews and ``blocked_reasons``
        (draft reasons, then every failed gate key, then
        ``review_queue_write_still_blocked``).
    """
    queue_draft = build_manual_sample_candidate_queue_draft_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )
    queue_items = queue_draft.get("queue_items", [])
    required_fields = queue_draft["queue_contract"]["required_fields"]
    queue_rows = _queue_rows_from_items(queue_items)
    # NOTE(review): with zero rows this is vacuously True (all() of an empty
    # iterable); the queue_draft_ready gate is what fails in that case.
    rows_have_required_fields = all(
        all(row.get(field) not in (None, "") for field in required_fields)
        for row in queue_rows
    )
    gates = [
        {
            "key": "queue_draft_ready",
            "label": "候選 queue 草案已可交給人工審核",
            "passed": bool(queue_draft["queue_draft_ready"]),
        },
        {
            "key": "queue_contract_targets_existing_table",
            "label": "queue 草案目標表對齊既有 market_alert_review_queue",
            "passed": queue_draft["queue_contract"]["table_name"]
            == "market_alert_review_queue",
        },
        {
            "key": "queue_rows_have_required_fields",
            "label": "row preview 具備 market_alert_review_queue 必填欄位",
            "passed": rows_have_required_fields,
        },
        {
            "key": "runtime_write_flags_enabled",
            "label": "正式寫入窗口才可啟用 MARKET_INTEL 寫入 flags",
            "passed": bool(getattr(runtime_status, "database_write_allowed", False)),
        },
        {
            "key": "backup_verified",
            "label": "寫入前必須確認正式環境備份完成",
            "passed": False,
        },
        {
            "key": "manual_operator_approval",
            "label": "操作者需另行明確批准候選審核 queue 寫入",
            "passed": False,
        },
        {
            "key": "rollback_plan_reviewed",
            "label": "已確認可關閉 flags 並回退 app-only 程式碼",
            "passed": False,
        },
    ]
    blocked_reasons = list(queue_draft["blocked_reasons"])
    blocked_reasons.extend(gate["key"] for gate in gates if not gate["passed"])
    blocked_reasons.append("review_queue_write_still_blocked")
    return {
        "mode": "manual_sample_candidate_queue_approval_preview",
        "queue_draft": {
            "mode": queue_draft["mode"],
            "queue_draft_ready": queue_draft["queue_draft_ready"],
            "review_queue_draft_created": queue_draft["review_queue_draft_created"],
            "queue_summary": queue_draft["queue_summary"],
        },
        "payload_received": queue_draft["payload_received"],
        "payload_valid_json_object": queue_draft["payload_valid_json_object"],
        "payload_error": queue_draft["payload_error"],
        "payload_persisted": False,
        "approval_preview_created": bool(queue_rows),
        "approval_request_created": False,
        "approval_record_written": False,
        "review_queue_write_allowed": False,
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "approval_summary": {
            "target_table": "market_alert_review_queue",
            "row_preview_count": len(queue_rows),
            "gates_passed": sum(1 for gate in gates if gate["passed"]),
            "gate_count": len(gates),
            "manual_approval_required": True,
        },
        "approval_gates": gates,
        "queue_write_preview": {
            "target_table": "market_alert_review_queue",
            "row_count": len(queue_rows),
            # Shared constant keeps this in sync with the transaction preview.
            "sql_shape": QUEUE_INSERT_SQL_SHAPE,
            "rows": queue_rows,
            "write_status": "blocked_approval_preview_only",
        },
        "operator_next_actions": [
            {
                "key": "verify_queue_rows",
                "label": "人工確認 row preview 的候選 URL、分數、優先 lane 與 dedupe key",
                "write_status": "blocked",
            },
            {
                "key": "open_explicit_write_window_later",
                "label": "後續另開正式寫入窗口,先備份、開 flags、再用一次性批准執行",
                "write_status": "blocked",
            },
        ],
        "rollback_plan": [
            {
                "key": "disable_market_intel_flags",
                "label": "關閉 MARKET_INTEL_ENABLED / CRAWLER / WRITE flags",
            },
            {
                "key": "app_only_rollback",
                "label": "只回退 momo-app 程式碼與模板,不碰 momo-db 容器生命週期",
            },
        ],
        "safe_boundaries": [
            *queue_draft["safe_boundaries"],
            "do_not_create_queue_approval_record_from_preview",
            "do_not_insert_market_alert_review_queue_from_preview",
            "do_not_open_database_transaction_from_approval_preview",
            "do_not_auto_approve_queue_write",
            "do_not_attach_scheduler_from_queue_approval",
            "do_not_touch_momo_db_lifecycle",
        ],
    }


def _payload_hash(payload):
    """Return the first 16 hex chars of the SHA-256 of ``payload`` as JSON.

    Keys are sorted before hashing so the digest does not depend on dict
    insertion order.
    """
    encoded = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()[:16]


def _transaction_statements_from_rows(queue_rows):
    """Describe, per row preview, the INSERT a real write would issue.

    Statements are numbered from 1. ``write_status`` is excluded from the
    parameter preview (and therefore from the payload hash) because it is a
    preview marker rather than row data.
    """
    statements = []
    for index, row in enumerate(queue_rows, start=1):
        parameter_preview = {
            key: value for key, value in row.items() if key != "write_status"
        }
        statements.append(
            {
                "index": index,
                "operation": "insert",
                "table": "market_alert_review_queue",
                "lookup": {"dedupe_key": row["dedupe_key"]},
                "sql_shape": QUEUE_INSERT_SQL_SHAPE,
                "parameter_keys": sorted(parameter_preview),
                "parameter_preview": parameter_preview,
                "parameter_payload_hash": _payload_hash(parameter_preview),
                "idempotency_key": (
                    f"market_alert_review_queue:{row['dedupe_key']}"
                ),
                "write_status": "blocked_transaction_preview_only",
            }
        )
    return statements


def build_manual_sample_candidate_queue_transaction_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the review queue transaction preview; opens no transaction.

    Builds the approval preview, then describes the statements a real write
    would run. No connection, session or transaction is opened and nothing
    is written.

    Args:
        runtime_status: Forwarded to the approval preview builder.
        acceptance_contract: Forwarded to the approval preview builder.
        sample_result: Forwarded to the approval preview builder.
        payload_error: Forwarded to the approval preview builder.
        limit: Forwarded to the approval preview builder.

    Returns:
        A dict with the statement previews, a transaction summary/contract
        and ``blocked_reasons``, which always ends with
        ``queue_transaction_execution_still_blocked``.
    """
    approval = build_manual_sample_candidate_queue_approval_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )
    queue_rows = approval.get("queue_write_preview", {}).get("rows", [])
    statements = _transaction_statements_from_rows(queue_rows)
    approval_gates_passed = all(
        gate.get("passed") for gate in approval.get("approval_gates", [])
    )
    transaction_preview_created = bool(statements)
    blocked_reasons = list(approval["blocked_reasons"])
    if not transaction_preview_created:
        blocked_reasons.append("queue_transaction_preview_not_ready")
    if not approval_gates_passed:
        blocked_reasons.append("queue_approval_gates_not_all_passed")
    # Execution is never allowed from a preview, whatever the gates say.
    blocked_reasons.append("queue_transaction_execution_still_blocked")
    return {
        "mode": "manual_sample_candidate_queue_transaction_preview",
        "approval": {
            "mode": approval["mode"],
            "approval_preview_created": approval["approval_preview_created"],
            "approval_summary": approval["approval_summary"],
        },
        "payload_received": approval["payload_received"],
        "payload_valid_json_object": approval["payload_valid_json_object"],
        "payload_error": approval["payload_error"],
        "payload_persisted": False,
        "transaction_preview_created": transaction_preview_created,
        "transaction_ready": False,
        "transaction_opened": False,
        "transaction_committed": False,
        "transaction_rolled_back": False,
        "approval_request_created": False,
        "approval_record_written": False,
        "review_queue_write_allowed": False,
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "transaction_summary": {
            "target_table": "market_alert_review_queue",
            "statement_count": len(statements),
            # Distinct keys only: fewer keys than statements means at least
            # two rows share a dedupe_key.
            "idempotency_key_count": len(
                {item["idempotency_key"] for item in statements}
            ),
            "conflict_policy": "dedupe_key_do_nothing",
            "write_allowed": False,
        },
        "transaction_contract": {
            "target_table": "market_alert_review_queue",
            "sql_shape": QUEUE_INSERT_SQL_SHAPE,
            "required_runtime_order": [
                "backup_verified",
                "migration_live_smoke_passed",
                "runtime_write_flags_enabled",
                "manual_operator_approval",
                "one_time_queue_write_token_verified",
                "post_write_smoke",
            ],
        },
        "statements": statements,
        "rollback_plan": [
            {
                "key": "transaction_not_opened_by_preview",
                "label": "本 preview 不開 transaction;若正式寫入失敗,必須由 CLI transaction rollback",
            },
            {
                "key": "dedupe_key_cleanup_review",
                "label": "若正式寫入後需回退,依 dedupe_key 人工審核清理",
            },
        ],
        "safe_boundaries": [
            *approval["safe_boundaries"],
            "do_not_execute_queue_transaction_from_preview",
            "do_not_open_database_connection_from_transaction_preview",
            "do_not_commit_queue_transaction_from_preview",
            "do_not_create_approval_record_from_transaction_preview",
            "do_not_touch_momo_db_lifecycle",
        ],
    }