Files
ewoooc/services/market_intel/manual_sample_review.py
OoO 7a6afa3055
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
新增市場情報候選活動交接預覽
2026-05-19 00:54:41 +08:00

500 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報人工 sample result 審核預覽。
本模組只用純函式評估操作員提供的 sample result payload,
不抓外部網站、不查 DB、不寫 DB、不建立候選活動、不掛排程。
"""
from services.market_intel.manual_sample_acceptance import (
REQUIRED_DIAGNOSTIC_FIELDS,
REQUIRED_RESULT_FIELDS,
)
# Baseline acceptance thresholds. `_thresholds` overlays any
# "acceptance_thresholds" found in the acceptance contract on top of these.
DEFAULT_ACCEPTANCE_THRESHOLDS = {
    # Inclusive lower / upper bound for an acceptable HTTP status code.
    "http_status_min": 200,
    "http_status_max": 299,
    # content_length must be greater than or equal to this value.
    "minimum_content_length": 500,
    # page_hash must be exactly this many characters long
    # (presumably a SHA-256 hex digest -- TODO confirm).
    "page_hash_length": 64,
    # Minimum length of the title after stripping surrounding whitespace.
    "minimum_title_length": 2,
    # diagnostics["link_count"] must be greater than or equal to this value.
    "minimum_link_count": 1,
    # Minimum number of candidates whose confidence band is accepted.
    "minimum_campaign_candidates": 1,
    # confidence_band values that count as accepted candidates.
    "accepted_candidate_bands": ["high", "medium"],
}
def _as_int(value, default=0):
try:
return int(value)
except (TypeError, ValueError):
return default
def _thresholds(acceptance_contract):
    """Return the effective acceptance thresholds for a contract.

    Args:
        acceptance_contract: Contract object. Only a ``dict`` is inspected;
            its optional "acceptance_thresholds" entry overrides the defaults
            key by key.

    Returns:
        A new dict made of ``DEFAULT_ACCEPTANCE_THRESHOLDS`` overlaid with the
        configured thresholds. When the configured value is not a mapping the
        defaults alone are returned.
    """
    configured = {}
    if isinstance(acceptance_contract, dict):
        configured = acceptance_contract.get("acceptance_thresholds") or {}
    try:
        return {
            **DEFAULT_ACCEPTANCE_THRESHOLDS,
            **configured,
        }
    except TypeError:
        # A truthy non-mapping value (list, str, number, ...) cannot be
        # unpacked with **; ignore it and keep the defaults instead of
        # aborting the whole review.
        return dict(DEFAULT_ACCEPTANCE_THRESHOLDS)
def _normalize_candidate(candidate):
if not isinstance(candidate, dict):
return {
"confidence_band": "unknown",
"score": 0,
"url": "",
"text": "",
}
return {
"confidence_band": str(candidate.get("confidence_band") or "unknown"),
"score": _as_int(candidate.get("score"), 0),
"url": str(candidate.get("url") or candidate.get("href") or ""),
"text": str(candidate.get("text") or candidate.get("title") or ""),
}
def _build_check(key, label, passed, observed, expected):
return {
"key": key,
"label": label,
"status": "pass" if passed else "block",
"passed": bool(passed),
"observed": observed,
"expected": expected,
}
def evaluate_manual_sample_result(sample_result, acceptance_contract):
    """Evaluate one sample result against the acceptance contract; no I/O.

    Args:
        sample_result: Operator-supplied sample result. Anything other than a
            non-empty dict is reported as "not loaded".
        acceptance_contract: Contract whose optional "acceptance_thresholds"
            override the defaults (resolved through ``_thresholds``).

    Returns:
        A dict with the load / review / accept flags, ``review_result``,
        ``review_checks`` (every check that ran), ``review_findings`` (one
        entry per failed check) and ``candidate_summary``, whose
        ``top_candidates`` holds at most five accepted candidates.
        ``candidate_import_allowed`` is always False.
    """
    thresholds = _thresholds(acceptance_contract)
    # Only a non-empty dict counts as a loaded sample. A truthy non-dict
    # (list, str, ...) has no .get() and used to raise AttributeError below;
    # it is reported as "not loaded", matching the isinstance(dict) guards
    # used by the other helpers in this module.
    if not isinstance(sample_result, dict) or not sample_result:
        return {
            "sample_result_loaded": False,
            "sample_result_reviewed": False,
            "sample_result_accepted": False,
            "ready_for_candidate_preview": False,
            "candidate_import_allowed": False,
            "review_result": "planned_no_sample_result",
            "review_checks": [],
            "review_findings": [
                {
                    "key": "sample_result_not_loaded",
                    "severity": "block",
                    "label": "尚未載入人工樣本結果,維持預覽狀態",
                },
            ],
            "candidate_summary": {
                "candidate_count": 0,
                "accepted_candidate_count": 0,
                "accepted_candidate_bands": thresholds["accepted_candidate_bands"],
                "top_candidates": [],
            },
        }

    # Malformed diagnostics / candidate containers degrade to empty ones so
    # the checks below report them as missing instead of raising.
    diagnostics = sample_result.get("diagnostics")
    diagnostics = diagnostics if isinstance(diagnostics, dict) else {}
    candidates = diagnostics.get("campaign_link_candidates")
    candidates = candidates if isinstance(candidates, list) else []
    normalized_candidates = [_normalize_candidate(item) for item in candidates]
    accepted_bands = set(thresholds["accepted_candidate_bands"])
    accepted_candidates = [
        item for item in normalized_candidates
        if item["confidence_band"] in accepted_bands
    ]

    # A field counts as missing when it is absent, None or an empty string.
    missing_result_fields = [
        field for field in REQUIRED_RESULT_FIELDS
        if sample_result.get(field) in (None, "")
    ]
    missing_diagnostic_fields = [
        field for field in REQUIRED_DIAGNOSTIC_FIELDS
        if diagnostics.get(field) in (None, "")
    ]

    status_code = _as_int(sample_result.get("status_code"), 0)
    content_length = _as_int(sample_result.get("content_length"), 0)
    page_hash = str(sample_result.get("page_hash") or "")
    title = str(sample_result.get("title") or "")
    link_count = _as_int(diagnostics.get("link_count"), 0)

    checks = [
        _build_check(
            "required_result_fields_present",
            "sample result 必須包含 Phase 48 定義的必要欄位",
            not missing_result_fields,
            missing_result_fields,
            list(REQUIRED_RESULT_FIELDS),
        ),
        _build_check(
            "required_diagnostics_present",
            "diagnostics 必須包含連結與候選診斷欄位",
            not missing_diagnostic_fields,
            missing_diagnostic_fields,
            list(REQUIRED_DIAGNOSTIC_FIELDS),
        ),
        _build_check(
            "http_status_ok",
            "HTTP status 必須落在允收區間",
            thresholds["http_status_min"]
            <= status_code
            <= thresholds["http_status_max"],
            status_code,
            f"{thresholds['http_status_min']}-{thresholds['http_status_max']}",
        ),
        _build_check(
            "content_has_body",
            "content_length 必須超過最低門檻",
            content_length >= thresholds["minimum_content_length"],
            content_length,
            thresholds["minimum_content_length"],
        ),
        _build_check(
            "page_fingerprint_present",
            "page_hash 必須符合固定長度,後續才能比對頁面變化",
            len(page_hash) == thresholds["page_hash_length"],
            len(page_hash),
            thresholds["page_hash_length"],
        ),
        _build_check(
            "title_present",
            "title 必須能判斷頁面內容,不接受空白或挑戰頁",
            len(title.strip()) >= thresholds["minimum_title_length"],
            len(title.strip()),
            thresholds["minimum_title_length"],
        ),
        _build_check(
            "link_count_present",
            "link_count 必須高於最低門檻,避免空頁或驗證頁",
            link_count >= thresholds["minimum_link_count"],
            link_count,
            thresholds["minimum_link_count"],
        ),
        _build_check(
            "candidate_quality_reviewed",
            "至少需要一筆 high/medium 活動候選進入人工候選預覽",
            len(accepted_candidates) >= thresholds["minimum_campaign_candidates"],
            len(accepted_candidates),
            thresholds["minimum_campaign_candidates"],
        ),
    ]

    # Every failed check becomes a blocking finding.
    findings = [
        {
            "key": check["key"],
            "severity": "block",
            "label": check["label"],
            "observed": check["observed"],
            "expected": check["expected"],
        }
        for check in checks
        if not check["passed"]
    ]
    accepted = all(check["passed"] for check in checks)
    return {
        "sample_result_loaded": True,
        "sample_result_reviewed": True,
        "sample_result_accepted": accepted,
        "ready_for_candidate_preview": accepted,
        "candidate_import_allowed": False,
        "review_result": (
            "accepted_for_candidate_preview"
            if accepted
            else "rejected_sample_result"
        ),
        "review_checks": checks,
        "review_findings": findings,
        "candidate_summary": {
            "candidate_count": len(normalized_candidates),
            "accepted_candidate_count": len(accepted_candidates),
            "accepted_candidate_bands": thresholds["accepted_candidate_bands"],
            "top_candidates": accepted_candidates[:5],
        },
    }
def build_manual_sample_review_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
):
    """Build the manual sample-result review preview.

    Args:
        runtime_status: Object whose ``database_write_allowed`` and
            ``scheduler_attached`` attributes are read with ``getattr``
            (missing attributes count as False).
            NOTE(review): a dict-shaped runtime_status would always fall back
            to those False defaults -- confirm callers pass an attribute-style
            object.
        acceptance_contract: Acceptance contract; it is considered ready only
            when it is a dict with a truthy "contract_ready" entry.
        sample_result: Optional sample result; by default none is loaded.

    Returns:
        A dict combining the evaluation from ``evaluate_manual_sample_result``
        with gate checks, blocked reasons, operator next actions and safe
        boundaries. All write / network / scheduler flags are reported False.
    """
    evaluation = evaluate_manual_sample_result(
        sample_result,
        acceptance_contract,
    )
    # Guard with isinstance so a truthy non-dict contract is reported as
    # "not ready" instead of raising AttributeError on .get(); _thresholds
    # already tolerates such a contract.
    contract_ready = (
        isinstance(acceptance_contract, dict)
        and bool(acceptance_contract.get("contract_ready"))
    )
    gate_checks = {
        "acceptance_contract_ready": contract_ready,
        "sample_review_is_pure_function": True,
        "external_network_blocked_in_review": True,
        "database_write_still_blocked": not bool(
            getattr(runtime_status, "database_write_allowed", False)
        ),
        "scheduler_detached": not bool(
            getattr(runtime_status, "scheduler_attached", False)
        ),
    }
    # Each failed gate is a blocked reason, followed by sample-specific ones.
    blocked_reasons = [
        key for key, passed in gate_checks.items()
        if not passed
    ]
    if not evaluation["sample_result_loaded"]:
        blocked_reasons.append("sample_result_not_loaded")
    if not evaluation["ready_for_candidate_preview"]:
        blocked_reasons.append("candidate_preview_not_ready")
    # Import is never allowed from this preview, so this reason is always present.
    blocked_reasons.append("candidate_import_still_blocked_until_operator_approval")
    return {
        "mode": "manual_sample_review_preview",
        "contract_ready": bool(gate_checks["acceptance_contract_ready"]),
        "sample_result_loaded": evaluation["sample_result_loaded"],
        "sample_result_reviewed": evaluation["sample_result_reviewed"],
        "sample_result_accepted": evaluation["sample_result_accepted"],
        "ready_for_candidate_preview": evaluation["ready_for_candidate_preview"],
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "gate_checks": gate_checks,
        "blocked_reasons": blocked_reasons,
        "review_result": evaluation["review_result"],
        "review_checks": evaluation["review_checks"],
        "review_findings": evaluation["review_findings"],
        "candidate_summary": evaluation["candidate_summary"],
        "operator_next_actions": [
            {
                "key": "load_single_sample_result_manually",
                "label": "由操作員提供單一平台 sample result JSON再用純函式審核",
                "write_status": "blocked",
            },
            {
                "key": "open_candidate_preview_after_pass",
                "label": "審核通過後只開候選活動預覽,仍不得寫入 market_campaigns",
                "write_status": "blocked",
            },
            {
                "key": "revise_adapter_source_after_reject",
                "label": "審核未通過時調整 adapter source 或暫停該平台",
                "write_status": "blocked",
            },
        ],
        "safe_boundaries": [
            "do_not_fetch_external_pages_from_review_api",
            "do_not_store_sample_result_from_review_preview",
            "do_not_import_candidates_from_review_preview",
            "do_not_write_market_tables_from_review_preview",
            "do_not_attach_scheduler_from_review_preview",
            "do_not_touch_momo_db_lifecycle",
        ],
    }
def build_manual_sample_review_evaluation_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
):
    """Build the live review preview for an operator-posted sample result.

    The payload is valid only when it is a dict and no *payload_error* was
    reported; an invalid payload is reviewed as if no sample had been loaded
    and gains a "sample_result_payload_invalid" blocked reason and finding.
    The returned dict extends the base review preview and always reports the
    payload as not persisted.
    """
    if isinstance(sample_result, dict) and not payload_error:
        payload_ok = True
        reviewed_sample = sample_result
    else:
        payload_ok = False
        reviewed_sample = None

    base = build_manual_sample_review_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=reviewed_sample,
    )

    # Copy before appending so the base preview's lists are left untouched.
    reasons = [*base["blocked_reasons"]]
    findings = [*base["review_findings"]]
    if not payload_ok:
        reasons.append("sample_result_payload_invalid")
        invalid_finding = {
            "key": "sample_result_payload_invalid",
            "severity": "block",
            "label": "POST body 必須是單一 sample result JSON object 或 sample_result object",
        }
        findings.append(invalid_finding)

    boundaries = [*base["safe_boundaries"]]
    boundaries.append("do_not_echo_full_sample_payload")
    boundaries.append("do_not_persist_posted_review_payload")

    preview = dict(base)
    preview.update(
        {
            "mode": "manual_sample_review_evaluation_preview",
            "review_request_type": "operator_posted_json",
            "payload_received": sample_result is not None,
            "payload_valid_json_object": payload_ok,
            "payload_error": payload_error,
            "payload_persisted": False,
            "sample_result_persisted": False,
            "candidate_preview_payload_created": bool(
                base["ready_for_candidate_preview"]
            ),
            "candidate_preview_persisted": False,
            "blocked_reasons": reasons,
            "review_findings": findings,
            "safe_boundaries": boundaries,
        }
    )
    return preview
def _accepted_candidates_from_sample(sample_result, acceptance_contract, limit):
    """Return up to *limit* normalized candidates whose band is accepted.

    Candidates are read from ``sample_result["diagnostics"]
    ["campaign_link_candidates"]``; any level that has the wrong type is
    treated as empty. The accepted bands come from ``_thresholds``.
    """
    diagnostics = {}
    if isinstance(sample_result, dict):
        diagnostics = sample_result.get("diagnostics")
    if not isinstance(diagnostics, dict):
        diagnostics = {}

    raw_items = diagnostics.get("campaign_link_candidates")
    if not isinstance(raw_items, list):
        raw_items = []

    allowed = set(_thresholds(acceptance_contract)["accepted_candidate_bands"])
    kept = []
    for entry in raw_items:
        normalized = _normalize_candidate(entry)
        if normalized["confidence_band"] in allowed:
            kept.append(normalized)
    return kept[:limit]
def build_manual_sample_candidate_handoff_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the candidate-campaign handoff preview for a manual sample.

    Only a preview payload is produced; nothing is persisted. *limit* is
    coerced to an int (20 when not convertible) and clamped to 1..50.
    Candidates are listed only when the payload is valid and the review
    accepted the sample; every candidate is marked as needing operator review
    with import disallowed.
    """
    capped_limit = _as_int(limit, 20)
    capped_limit = min(capped_limit, 50)
    capped_limit = max(1, capped_limit)

    review = build_manual_sample_review_evaluation_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
    )
    required_flags = (
        "payload_valid_json_object",
        "sample_result_accepted",
        "ready_for_candidate_preview",
    )
    is_ready = all(review[flag] for flag in required_flags)

    preview_rows = []
    if is_ready:
        platform = str(sample_result.get("platform_code") or "")
        source = str(sample_result.get("source_key") or "")
        origin_url = str(sample_result.get("source_url") or "")
        accepted = _accepted_candidates_from_sample(
            sample_result,
            acceptance_contract,
            capped_limit,
        )
        # rank is 1-based and is also embedded in the candidate key.
        preview_rows = [
            {
                "candidate_key": (
                    f"{platform}:{source}:{rank}:"
                    f"{item['confidence_band']}:{item['score']}"
                ),
                "platform_code": platform,
                "source_key": source,
                "source_url": origin_url,
                "candidate_url": item["url"],
                "candidate_text": item["text"],
                "confidence_band": item["confidence_band"],
                "score": item["score"],
                "rank_position": rank,
                "review_status": "needs_operator_review",
                "write_status": "blocked_preview_only",
                "import_allowed": False,
            }
            for rank, item in enumerate(accepted, start=1)
        ]

    reasons = [*review["blocked_reasons"]]
    if not is_ready:
        reasons.append("candidate_handoff_not_ready")
    # Persisting the handoff is never allowed from this preview.
    reasons.append("candidate_handoff_persist_still_blocked")

    review_digest = {
        "mode": review["mode"],
        "review_result": review["review_result"],
        "sample_result_accepted": review["sample_result_accepted"],
        "ready_for_candidate_preview": review["ready_for_candidate_preview"],
        "review_findings": review["review_findings"],
    }
    summary = {
        "candidate_count": len(preview_rows),
        "limit": capped_limit,
        "review_status": "needs_operator_review" if preview_rows else "blocked",
        "import_allowed": False,
    }
    preview_contract = {
        "required_fields": [
            "candidate_key",
            "platform_code",
            "source_key",
            "source_url",
            "candidate_url",
            "candidate_text",
            "confidence_band",
            "score",
            "rank_position",
            "review_status",
        ],
        "forbidden_actions": [
            "insert_market_campaigns",
            "insert_market_campaign_products",
            "create_crawler_run",
            "auto_import_candidates",
        ],
    }
    next_actions = [
        {
            "key": "review_candidate_urls",
            "label": "人工檢查候選活動 URL、文字與信心分級",
            "write_status": "blocked",
        },
        {
            "key": "promote_to_candidate_review_queue_later",
            "label": "後續需另行批准才可建立候選審核 queue",
            "write_status": "blocked",
        },
    ]
    boundaries = [
        "do_not_fetch_external_pages_from_handoff_api",
        "do_not_persist_candidate_handoff_payload",
        "do_not_import_candidates_from_handoff_preview",
        "do_not_write_market_tables_from_handoff_preview",
        "do_not_attach_scheduler_from_handoff_preview",
        "do_not_touch_momo_db_lifecycle",
    ]

    return {
        "mode": "manual_sample_candidate_handoff_preview",
        "review": review_digest,
        "payload_received": review["payload_received"],
        "payload_valid_json_object": review["payload_valid_json_object"],
        "payload_error": review["payload_error"],
        "payload_persisted": False,
        "sample_result_persisted": False,
        "handoff_ready": is_ready,
        "candidate_handoff_created": bool(preview_rows),
        "candidate_handoff_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": reasons,
        "handoff_summary": summary,
        "candidate_preview_contract": preview_contract,
        "candidates": preview_rows,
        "operator_next_actions": next_actions,
        "safe_boundaries": boundaries,
    }