Files
ewoooc/services/market_intel/manual_sample_candidate_queue.py
OoO 20d22b69ea
All checks were successful
CD Pipeline / deploy (push) Successful in 1m18s
新增市場情報候選佇列 writer transaction
2026-05-19 09:57:59 +08:00

488 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報人工候選審核 queue 草案預覽。
本模組只把 sample result handoff 轉成 review queue draft
不建立正式 queue、不寫 DB、不掛 scheduler、不連外。
"""
import hashlib
import json
from services.market_intel.manual_sample_review import (
build_manual_sample_candidate_handoff_preview,
)
def _priority_for_candidate(candidate):
    """Return the numeric review priority for a candidate.

    The priority depends only on the candidate's ``confidence_band``, compared
    case-insensitively: ``high`` maps to 80, ``medium`` to 50, and anything
    else (including a missing band) to 20.
    """
    normalized_band = str(candidate.get("confidence_band") or "").lower()
    priority_by_band = {"high": 80, "medium": 50}
    return priority_by_band.get(normalized_band, 20)
def _priority_lane_for_candidate(candidate):
    """Pick the review lane for a candidate from its score and confidence band.

    Args:
        candidate: Mapping that may carry ``score`` and ``confidence_band``.
            A missing or falsy score counts as 0 and a missing band as "".

    Returns:
        ``"urgent"`` for a high-confidence candidate scoring at least 90,
        ``"watch"`` for any other high- or medium-confidence candidate, and
        ``"backlog"`` otherwise.

    Raises:
        ValueError: If the score is text that is not numeric at all.
        TypeError: If the score has a type that cannot be converted to a number.
    """
    raw_score = candidate.get("score") or 0
    try:
        score = int(raw_score)
    except (ValueError, OverflowError):
        # int() rejects decimal text such as "95.5" and the floats NaN/inf,
        # all of which float() accepts. Fall back so such scores are ranked
        # instead of aborting the whole preview.
        score = float(raw_score)
    band = str(candidate.get("confidence_band") or "").lower()
    if band == "high" and score >= 90:
        return "urgent"
    if band in ("high", "medium"):
        return "watch"
    return "backlog"
def _threshold_level_for_candidate(candidate):
    """Classify a candidate's score into a threshold level.

    Args:
        candidate: Mapping that may carry ``score``. A missing or falsy score
            counts as 0.

    Returns:
        ``"high"`` for a score of at least 90, ``"medium"`` for at least 70,
        and ``"low"`` otherwise.

    Raises:
        ValueError: If the score is text that is not numeric at all.
        TypeError: If the score has a type that cannot be converted to a number.
    """
    raw_score = candidate.get("score") or 0
    try:
        score = int(raw_score)
    except (ValueError, OverflowError):
        # int() rejects decimal text such as "95.5" and the floats NaN/inf,
        # all of which float() accepts. Fall back so such scores are
        # classified instead of aborting the whole preview.
        score = float(raw_score)
    if score >= 90:
        return "high"
    if score >= 70:
        return "medium"
    return "low"
def _build_queue_item(candidate, batch_id):
    """Build one review-queue draft item from a handoff candidate.

    Text fields are coerced to ``str`` with empty-string fallbacks
    (``confidence_band`` falls back to ``"unknown"``). The candidate key is
    reused as the alert candidate id, and the evidence bundle id doubles as
    the dedupe key. The item is always marked ``blocked_preview_only`` and
    ``approval_required``.
    """
    raw_score = candidate.get("score") or 0
    key = str(candidate.get("candidate_key") or "")
    bundle_id = f"sample-candidate:{key}"

    def as_text(field, fallback=""):
        # Falsy values (None, "", 0) collapse to the fallback before str().
        return str(candidate.get(field) or fallback)

    return {
        "queue_item_key": f"review:{key}",
        "candidate_key": key,
        "alert_candidate_id": key,
        "platform_code": as_text("platform_code"),
        "candidate_url": as_text("candidate_url"),
        "candidate_text": as_text("candidate_text"),
        "confidence_band": as_text("confidence_band", "unknown"),
        "score": raw_score,
        "rank_position": candidate.get("rank_position") or 0,
        "review_state": "needs_review",
        "priority_lane": _priority_lane_for_candidate(candidate),
        "threshold_level": _threshold_level_for_candidate(candidate),
        "total_score": float(raw_score),
        "evidence_bundle_id": bundle_id,
        "dedupe_key": bundle_id,
        "source_batch_id": batch_id,
        "review_priority": _priority_for_candidate(candidate),
        "created_from_batch_id": batch_id,
        "write_status": "blocked_preview_only",
        "approval_required": True,
    }
def build_manual_sample_candidate_queue_draft_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the manual-review queue draft for candidates; preview only.

    All arguments are forwarded unchanged to
    ``build_manual_sample_candidate_handoff_preview``. Each candidate in the
    resulting handoff becomes one draft queue item. ``sample_result`` is also
    read for its ``batch_id`` when it is a dict.

    Returns:
        A dict describing the draft: the handoff summary, the draft items,
        the queue contract, blocked reasons, and a set of flags that are all
        reported as ``False`` for persistence, network, and database activity.
        The draft is ready only when the handoff is ready and at least one
        item was produced.
    """
    handoff = build_manual_sample_candidate_handoff_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )

    # Only a dict payload can carry a batch id; anything else leaves it blank.
    batch_id = (
        str(sample_result.get("batch_id") or "")
        if isinstance(sample_result, dict)
        else ""
    )
    candidates = handoff.get("candidates", [])
    queue_items = [_build_queue_item(entry, batch_id) for entry in candidates]
    queue_draft_ready = bool(handoff.get("handoff_ready") and queue_items)

    blocked_reasons = [*handoff.get("blocked_reasons", [])]
    if not queue_draft_ready:
        blocked_reasons.append("review_queue_draft_not_ready")
    # Persisting is blocked unconditionally at this preview stage.
    blocked_reasons.append("review_queue_persist_still_blocked")

    handoff_view = {
        "mode": handoff["mode"],
        "handoff_ready": handoff["handoff_ready"],
        "candidate_handoff_created": handoff["candidate_handoff_created"],
        "candidate_handoff_persisted": handoff["candidate_handoff_persisted"],
        "handoff_summary": handoff["handoff_summary"],
    }
    queue_contract = {
        "contract_type": "draft_contract_only",
        "table_name": "market_alert_review_queue",
        "required_fields": [
            "alert_candidate_id",
            "review_state",
            "priority_lane",
            "threshold_level",
            "total_score",
            "evidence_bundle_id",
            "dedupe_key",
            "source_batch_id",
        ],
        "forbidden_actions": [
            "insert_review_queue_rows",
            "auto_approve_candidates",
            "insert_market_campaigns",
            "attach_scheduler_jobs",
        ],
    }
    operator_next_actions = [
        {
            "key": "review_queue_draft_manually",
            "label": "人工確認候選 URL、分數與優先序後才可提出正式 queue 建立申請",
            "write_status": "blocked",
        },
        {
            "key": "approve_queue_persistence_later",
            "label": "正式寫入候選審核 queue 需要另一次明確批准與 DB 備援檢查",
            "write_status": "blocked",
        },
    ]

    return {
        "mode": "manual_sample_candidate_queue_draft_preview",
        "handoff": handoff_view,
        "payload_received": handoff["payload_received"],
        "payload_valid_json_object": handoff["payload_valid_json_object"],
        "payload_error": handoff["payload_error"],
        "payload_persisted": False,
        "sample_result_persisted": False,
        "handoff_ready": handoff["handoff_ready"],
        "queue_draft_ready": queue_draft_ready,
        "review_queue_draft_created": bool(queue_items),
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "scheduler_attached": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "queue_summary": {
            "queue_item_count": len(queue_items),
            "review_queue_created": False,
            "queue_persisted": False,
            "import_allowed": False,
        },
        "queue_contract": queue_contract,
        "queue_items": queue_items,
        "operator_next_actions": operator_next_actions,
        "safe_boundaries": [
            *handoff["safe_boundaries"],
            "do_not_create_candidate_review_queue_from_preview",
            "do_not_persist_candidate_review_queue_draft",
            "do_not_write_market_tables_from_queue_draft",
            "do_not_auto_approve_candidates",
            "do_not_attach_scheduler_from_queue_draft",
            "do_not_touch_momo_db_lifecycle",
        ],
    }
def _queue_rows_from_items(queue_items):
    """Project draft queue items onto row previews for the review-queue table.

    Each row copies the eight queue columns from the item, nests the
    descriptive candidate fields under ``metadata_json_preview``, and is
    marked ``blocked_approval_preview_only``. Every item must contain all
    copied keys; a missing key raises ``KeyError``.
    """
    column_names = (
        "alert_candidate_id",
        "review_state",
        "priority_lane",
        "threshold_level",
        "total_score",
        "evidence_bundle_id",
        "dedupe_key",
        "source_batch_id",
    )
    metadata_names = (
        "platform_code",
        "candidate_url",
        "candidate_text",
        "confidence_band",
        "rank_position",
    )
    return [
        {
            **{name: item[name] for name in column_names},
            "metadata_json_preview": {
                name: item[name] for name in metadata_names
            },
            "write_status": "blocked_approval_preview_only",
        }
        for item in queue_items
    ]
def build_manual_sample_candidate_queue_approval_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the pre-write approval gate for the candidate review queue.

    Only a row preview is returned; nothing is written to the database.

    All arguments are forwarded to
    ``build_manual_sample_candidate_queue_draft_preview``. ``runtime_status``
    is additionally read for a ``database_write_allowed`` attribute, which is
    treated as ``False`` when absent.

    Returns:
        A dict holding the row preview, the evaluated approval gates, the
        accumulated blocked reasons, a rollback plan, and flags that are all
        reported as ``False`` for writes. The backup, manual-approval, and
        rollback-review gates are hard-coded as not passed, so the preview
        never reports the write as allowed.
    """
    queue_draft = build_manual_sample_candidate_queue_draft_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )
    queue_items = queue_draft.get("queue_items", [])
    required_fields = queue_draft["queue_contract"]["required_fields"]
    queue_rows = _queue_rows_from_items(queue_items)
    # all() over zero rows is vacuously True, which would report this gate as
    # passed (and inflate gates_passed) when there is nothing to check.
    # Require at least one row before the gate can pass.
    rows_have_required_fields = bool(queue_rows) and all(
        all(row.get(field) not in (None, "") for field in required_fields)
        for row in queue_rows
    )
    gates = [
        {
            "key": "queue_draft_ready",
            "label": "候選 queue 草案已可交給人工審核",
            "passed": bool(queue_draft["queue_draft_ready"]),
        },
        {
            "key": "queue_contract_targets_existing_table",
            "label": "queue 草案目標表對齊既有 market_alert_review_queue",
            "passed": queue_draft["queue_contract"]["table_name"]
            == "market_alert_review_queue",
        },
        {
            "key": "queue_rows_have_required_fields",
            "label": "row preview 具備 market_alert_review_queue 必填欄位",
            "passed": rows_have_required_fields,
        },
        {
            "key": "runtime_write_flags_enabled",
            "label": "正式寫入窗口才可啟用 MARKET_INTEL 寫入 flags",
            "passed": bool(getattr(runtime_status, "database_write_allowed", False)),
        },
        # The remaining gates can never pass from a preview: they are fixed
        # to False in this function.
        {
            "key": "backup_verified",
            "label": "寫入前必須確認正式環境備份完成",
            "passed": False,
        },
        {
            "key": "manual_operator_approval",
            "label": "操作者需另行明確批准候選審核 queue 寫入",
            "passed": False,
        },
        {
            "key": "rollback_plan_reviewed",
            "label": "已確認可關閉 flags 並回退 app-only 程式碼",
            "passed": False,
        },
    ]
    # Every failed gate contributes its key as a blocked reason, after the
    # reasons inherited from the draft.
    blocked_reasons = list(queue_draft["blocked_reasons"])
    blocked_reasons.extend(gate["key"] for gate in gates if not gate["passed"])
    blocked_reasons.append("review_queue_write_still_blocked")
    return {
        "mode": "manual_sample_candidate_queue_approval_preview",
        "queue_draft": {
            "mode": queue_draft["mode"],
            "queue_draft_ready": queue_draft["queue_draft_ready"],
            "review_queue_draft_created": queue_draft["review_queue_draft_created"],
            "queue_summary": queue_draft["queue_summary"],
        },
        "payload_received": queue_draft["payload_received"],
        "payload_valid_json_object": queue_draft["payload_valid_json_object"],
        "payload_error": queue_draft["payload_error"],
        "payload_persisted": False,
        "approval_preview_created": bool(queue_rows),
        "approval_request_created": False,
        "approval_record_written": False,
        "review_queue_write_allowed": False,
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "approval_summary": {
            "target_table": "market_alert_review_queue",
            "row_preview_count": len(queue_rows),
            "gates_passed": sum(1 for gate in gates if gate["passed"]),
            "gate_count": len(gates),
            "manual_approval_required": True,
        },
        "approval_gates": gates,
        "queue_write_preview": {
            "target_table": "market_alert_review_queue",
            "row_count": len(queue_rows),
            # Reuse the module-level shape so the approval and transaction
            # previews cannot drift apart.
            "sql_shape": QUEUE_INSERT_SQL_SHAPE,
            "rows": queue_rows,
            "write_status": "blocked_approval_preview_only",
        },
        "operator_next_actions": [
            {
                "key": "verify_queue_rows",
                "label": "人工確認 row preview 的候選 URL、分數、優先 lane 與 dedupe key",
                "write_status": "blocked",
            },
            {
                "key": "open_explicit_write_window_later",
                "label": "後續另開正式寫入窗口,先備份、開 flags、再用一次性批准執行",
                "write_status": "blocked",
            },
        ],
        "rollback_plan": [
            {
                "key": "disable_market_intel_flags",
                "label": "關閉 MARKET_INTEL_ENABLED / CRAWLER / WRITE flags",
            },
            {
                "key": "app_only_rollback",
                "label": "只回退 momo-app 程式碼與模板,不碰 momo-db 容器生命週期",
            },
        ],
        "safe_boundaries": [
            *queue_draft["safe_boundaries"],
            "do_not_create_queue_approval_record_from_preview",
            "do_not_insert_market_alert_review_queue_from_preview",
            "do_not_open_database_transaction_from_approval_preview",
            "do_not_auto_approve_queue_write",
            "do_not_attach_scheduler_from_queue_approval",
            "do_not_touch_momo_db_lifecycle",
        ],
    }
# Illustrative shape of the queue insert reported in previews as "sql_shape".
# Columns and values are elided with "(...)"; conflicts on dedupe_key are
# ignored.
QUEUE_INSERT_SQL_SHAPE = " ".join(
    (
        "INSERT INTO market_alert_review_queue (...) VALUES (...)",
        "ON CONFLICT(dedupe_key) DO NOTHING",
    )
)
def _payload_hash(payload):
    """Return a short, stable fingerprint of a JSON-serializable payload.

    The payload is serialized with sorted keys and non-ASCII characters kept
    as-is, encoded as UTF-8, and hashed with SHA-256. Only the first 16 hex
    characters of the digest are returned.
    """
    canonical_json = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256()
    digest.update(canonical_json.encode("utf-8"))
    return digest.hexdigest()[:16]
def _transaction_statements_from_rows(queue_rows):
    """Describe one blocked insert statement per row preview.

    Statements are numbered from 1 in row order. Each carries the row's
    parameters (everything except ``write_status``), their sorted key names,
    a short hash of the parameters, and an idempotency key derived from the
    row's ``dedupe_key``. Nothing is executed; every statement is marked
    ``blocked_transaction_preview_only``.
    """

    def describe(position, row):
        # write_status is preview bookkeeping, not a statement parameter.
        parameters = dict(row)
        parameters.pop("write_status", None)
        dedupe_key = row["dedupe_key"]
        return {
            "index": position,
            "operation": "insert",
            "table": "market_alert_review_queue",
            "lookup": {"dedupe_key": dedupe_key},
            "sql_shape": QUEUE_INSERT_SQL_SHAPE,
            "parameter_keys": sorted(parameters),
            "parameter_preview": parameters,
            "parameter_payload_hash": _payload_hash(parameters),
            "idempotency_key": f"market_alert_review_queue:{dedupe_key}",
            "write_status": "blocked_transaction_preview_only",
        }

    return [
        describe(position, row)
        for position, row in enumerate(queue_rows, start=1)
    ]
def build_manual_sample_candidate_queue_transaction_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the transaction preview for the candidate review queue.

    No transaction is opened and nothing is written to the database.

    All arguments are forwarded to
    ``build_manual_sample_candidate_queue_approval_preview``. The row previews
    of that result are turned into statement descriptions.

    Returns:
        A dict with the statement previews, a transaction summary and
        contract, the accumulated blocked reasons, a rollback plan, and flags
        that are all reported as ``False`` for transaction and database
        activity.
    """
    approval = build_manual_sample_candidate_queue_approval_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
        limit=limit,
    )

    write_preview = approval.get("queue_write_preview", {})
    statements = _transaction_statements_from_rows(write_preview.get("rows", []))
    has_failed_gate = any(
        not gate.get("passed") for gate in approval.get("approval_gates", [])
    )
    transaction_preview_created = bool(statements)

    blocked_reasons = [*approval["blocked_reasons"]]
    if not transaction_preview_created:
        blocked_reasons.append("queue_transaction_preview_not_ready")
    if has_failed_gate:
        blocked_reasons.append("queue_approval_gates_not_all_passed")
    # Execution is blocked unconditionally at this preview stage.
    blocked_reasons.append("queue_transaction_execution_still_blocked")

    approval_view = {
        "mode": approval["mode"],
        "approval_preview_created": approval["approval_preview_created"],
        "approval_summary": approval["approval_summary"],
    }
    distinct_idempotency_keys = {
        statement["idempotency_key"] for statement in statements
    }
    transaction_summary = {
        "target_table": "market_alert_review_queue",
        "statement_count": len(statements),
        "idempotency_key_count": len(distinct_idempotency_keys),
        "conflict_policy": "dedupe_key_do_nothing",
        "write_allowed": False,
    }
    transaction_contract = {
        "target_table": "market_alert_review_queue",
        "sql_shape": QUEUE_INSERT_SQL_SHAPE,
        "required_runtime_order": [
            "backup_verified",
            "migration_live_smoke_passed",
            "runtime_write_flags_enabled",
            "manual_operator_approval",
            "one_time_queue_write_token_verified",
            "post_write_smoke",
        ],
    }
    rollback_plan = [
        {
            "key": "transaction_not_opened_by_preview",
            "label": "本 preview 不開 transaction若正式寫入失敗必須由 CLI transaction rollback",
        },
        {
            "key": "dedupe_key_cleanup_review",
            "label": "若正式寫入後需回退,依 dedupe_key 人工審核清理",
        },
    ]

    return {
        "mode": "manual_sample_candidate_queue_transaction_preview",
        "approval": approval_view,
        "payload_received": approval["payload_received"],
        "payload_valid_json_object": approval["payload_valid_json_object"],
        "payload_error": approval["payload_error"],
        "payload_persisted": False,
        "transaction_preview_created": transaction_preview_created,
        "transaction_ready": False,
        "transaction_opened": False,
        "transaction_committed": False,
        "transaction_rolled_back": False,
        "approval_request_created": False,
        "approval_record_written": False,
        "review_queue_write_allowed": False,
        "review_queue_created": False,
        "review_queue_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "transaction_summary": transaction_summary,
        "transaction_contract": transaction_contract,
        "statements": statements,
        "rollback_plan": rollback_plan,
        "safe_boundaries": [
            *approval["safe_boundaries"],
            "do_not_execute_queue_transaction_from_preview",
            "do_not_open_database_connection_from_transaction_preview",
            "do_not_commit_queue_transaction_from_preview",
            "do_not_create_approval_record_from_transaction_preview",
            "do_not_touch_momo_db_lifecycle",
        ],
    }