Files
ewoooc/services/market_intel/migration_blueprint.py
OoO bc900321f8
All checks were successful
CD Pipeline / deploy (push) Successful in 1m1s
feat(market-intel): add alert review queue migration blueprint
2026-05-18 19:51:36 +08:00

433 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 migration 與正式 seed writer 命令草案。
本模組只產生可審核的 SQL / command preview不連線 DB、不執行命令。
"""
from pathlib import Path
MIGRATION_NUMBER = "032"
MIGRATION_FILENAME = "migrations/032_market_intel_core_schema.sql"
SEED_WRITER_SCRIPT = "scripts/market_intel_seed_writer.py"
FORWARD_SQL = """
-- =============================================================================
-- Migration 032: market_intel core schema
-- MOMO PRO - Cross-platform market campaign intelligence
-- 2026-05-07 Taipei
-- =============================================================================
-- Notes:
-- Creates the ADR-035 market_* schema. This migration is additive only:
-- it creates tables, indexes, and grants. It does not drop or alter existing
-- sales tables, and it does not touch the momo-db container lifecycle.
-- =============================================================================
CREATE TABLE IF NOT EXISTS market_platforms (
id BIGSERIAL PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(120) NOT NULL,
base_url VARCHAR(500),
enabled BOOLEAN NOT NULL DEFAULT FALSE,
crawl_policy_json TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_market_platforms_code
ON market_platforms (code);
CREATE TABLE IF NOT EXISTS market_campaigns (
id BIGSERIAL PRIMARY KEY,
platform_code VARCHAR(50) NOT NULL REFERENCES market_platforms(code),
campaign_key VARCHAR(200) NOT NULL,
campaign_name VARCHAR(500) NOT NULL,
campaign_type VARCHAR(80),
campaign_url TEXT,
start_at TIMESTAMP,
end_at TIMESTAMP,
status VARCHAR(30) NOT NULL DEFAULT 'unknown',
discovered_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMP NOT NULL DEFAULT NOW(),
metadata_json TEXT,
CONSTRAINT uq_market_campaign_platform_key
UNIQUE (platform_code, campaign_key)
);
CREATE INDEX IF NOT EXISTS idx_market_campaigns_platform_code
ON market_campaigns (platform_code);
CREATE INDEX IF NOT EXISTS idx_market_campaigns_campaign_type
ON market_campaigns (campaign_type);
CREATE INDEX IF NOT EXISTS idx_market_campaigns_status
ON market_campaigns (status);
CREATE INDEX IF NOT EXISTS idx_market_campaign_status_time
ON market_campaigns (status, start_at, end_at);
CREATE TABLE IF NOT EXISTS market_campaign_snapshots (
id BIGSERIAL PRIMARY KEY,
campaign_id BIGINT NOT NULL REFERENCES market_campaigns(id),
batch_id VARCHAR(80) NOT NULL,
crawled_at TIMESTAMP NOT NULL DEFAULT NOW(),
title VARCHAR(500),
hero_text TEXT,
coupon_text TEXT,
raw_discount_text TEXT,
page_hash VARCHAR(128),
raw_snapshot_path TEXT,
status VARCHAR(30) NOT NULL DEFAULT 'success',
error_message TEXT,
metadata_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshots_campaign_id
ON market_campaign_snapshots (campaign_id);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshots_batch_id
ON market_campaign_snapshots (batch_id);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshots_crawled_at
ON market_campaign_snapshots (crawled_at);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshots_page_hash
ON market_campaign_snapshots (page_hash);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshots_status
ON market_campaign_snapshots (status);
CREATE INDEX IF NOT EXISTS idx_market_campaign_snapshot_campaign_time
ON market_campaign_snapshots (campaign_id, crawled_at);
CREATE TABLE IF NOT EXISTS market_campaign_products (
id BIGSERIAL PRIMARY KEY,
campaign_id BIGINT NOT NULL REFERENCES market_campaigns(id),
platform_code VARCHAR(50) NOT NULL,
platform_product_id VARCHAR(200) NOT NULL,
product_url TEXT,
name VARCHAR(500) NOT NULL,
brand VARCHAR(200),
image_url TEXT,
category_text VARCHAR(300),
price DOUBLE PRECISION,
original_price DOUBLE PRECISION,
discount_text VARCHAR(200),
discount_rate DOUBLE PRECISION,
coupon_text TEXT,
stock_text VARCHAR(200),
sold_count INTEGER,
rating DOUBLE PRECISION,
review_count INTEGER,
rank_position INTEGER,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
first_seen_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMP NOT NULL DEFAULT NOW(),
metadata_json TEXT,
CONSTRAINT uq_market_campaign_product
UNIQUE (campaign_id, platform_code, platform_product_id)
);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_campaign_id
ON market_campaign_products (campaign_id);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_platform_code
ON market_campaign_products (platform_code);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_platform_product_id
ON market_campaign_products (platform_product_id);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_brand
ON market_campaign_products (brand);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_category_text
ON market_campaign_products (category_text);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_is_active
ON market_campaign_products (is_active);
CREATE INDEX IF NOT EXISTS idx_market_campaign_products_last_seen_at
ON market_campaign_products (last_seen_at);
CREATE INDEX IF NOT EXISTS idx_market_product_platform_seen
ON market_campaign_products (platform_code, last_seen_at);
CREATE INDEX IF NOT EXISTS idx_market_product_discount
ON market_campaign_products (discount_rate, price);
CREATE TABLE IF NOT EXISTS market_product_price_history (
id BIGSERIAL PRIMARY KEY,
market_product_id BIGINT NOT NULL REFERENCES market_campaign_products(id),
campaign_id BIGINT NOT NULL REFERENCES market_campaigns(id),
platform_code VARCHAR(50) NOT NULL,
platform_product_id VARCHAR(200) NOT NULL,
price DOUBLE PRECISION,
original_price DOUBLE PRECISION,
discount_rate DOUBLE PRECISION,
stock_text VARCHAR(200),
sold_count INTEGER,
rank_position INTEGER,
crawled_at TIMESTAMP NOT NULL DEFAULT NOW(),
batch_id VARCHAR(80) NOT NULL,
metadata_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_market_product_id
ON market_product_price_history (market_product_id);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_campaign_id
ON market_product_price_history (campaign_id);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_platform_code
ON market_product_price_history (platform_code);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_platform_product_id
ON market_product_price_history (platform_product_id);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_crawled_at
ON market_product_price_history (crawled_at);
CREATE INDEX IF NOT EXISTS idx_market_product_price_history_batch_id
ON market_product_price_history (batch_id);
CREATE INDEX IF NOT EXISTS idx_market_price_platform_time
ON market_product_price_history (platform_code, platform_product_id, crawled_at);
CREATE INDEX IF NOT EXISTS idx_market_price_campaign_time
ON market_product_price_history (campaign_id, crawled_at);
CREATE TABLE IF NOT EXISTS market_product_matches (
id BIGSERIAL PRIMARY KEY,
market_product_id BIGINT NOT NULL REFERENCES market_campaign_products(id),
momo_product_id INTEGER REFERENCES products(id),
momo_i_code VARCHAR(50),
match_score DOUBLE PRECISION NOT NULL DEFAULT 0.0,
match_status VARCHAR(30) NOT NULL DEFAULT 'needs_review',
match_reason_json TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
reviewed_at TIMESTAMP,
reviewed_by VARCHAR(120),
CONSTRAINT uq_market_product_momo_match
UNIQUE (market_product_id, momo_i_code)
);
CREATE INDEX IF NOT EXISTS idx_market_product_matches_market_product_id
ON market_product_matches (market_product_id);
CREATE INDEX IF NOT EXISTS idx_market_product_matches_momo_product_id
ON market_product_matches (momo_product_id);
CREATE INDEX IF NOT EXISTS idx_market_product_matches_momo_i_code
ON market_product_matches (momo_i_code);
CREATE INDEX IF NOT EXISTS idx_market_product_matches_match_status
ON market_product_matches (match_status);
CREATE INDEX IF NOT EXISTS idx_market_match_status_score
ON market_product_matches (match_status, match_score);
CREATE TABLE IF NOT EXISTS market_crawler_runs (
id BIGSERIAL PRIMARY KEY,
platform_code VARCHAR(50),
crawler_name VARCHAR(120) NOT NULL,
campaign_id BIGINT REFERENCES market_campaigns(id),
batch_id VARCHAR(80) NOT NULL,
started_at TIMESTAMP NOT NULL DEFAULT NOW(),
finished_at TIMESTAMP,
status VARCHAR(30) NOT NULL DEFAULT 'started',
dry_run BOOLEAN NOT NULL DEFAULT TRUE,
pages_found INTEGER NOT NULL DEFAULT 0,
products_found INTEGER NOT NULL DEFAULT 0,
products_changed INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
metadata_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_platform_code
ON market_crawler_runs (platform_code);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_crawler_name
ON market_crawler_runs (crawler_name);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_campaign_id
ON market_crawler_runs (campaign_id);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_batch_id
ON market_crawler_runs (batch_id);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_started_at
ON market_crawler_runs (started_at);
CREATE INDEX IF NOT EXISTS idx_market_crawler_runs_status
ON market_crawler_runs (status);
CREATE INDEX IF NOT EXISTS idx_market_crawler_run_platform_time
ON market_crawler_runs (platform_code, started_at);
CREATE INDEX IF NOT EXISTS idx_market_crawler_run_status_time
ON market_crawler_runs (status, started_at);
CREATE TABLE IF NOT EXISTS market_alert_review_queue (
id BIGSERIAL PRIMARY KEY,
alert_candidate_id VARCHAR(120) NOT NULL UNIQUE,
review_state VARCHAR(40) NOT NULL DEFAULT 'draft',
priority_lane VARCHAR(40) NOT NULL DEFAULT 'watch',
threshold_level VARCHAR(40) NOT NULL,
total_score DOUBLE PRECISION NOT NULL DEFAULT 0.0,
evidence_bundle_id VARCHAR(120) NOT NULL,
dedupe_key VARCHAR(240) NOT NULL,
source_batch_id VARCHAR(80) NOT NULL,
campaign_id BIGINT REFERENCES market_campaigns(id),
market_product_id BIGINT REFERENCES market_campaign_products(id),
momo_i_code VARCHAR(50),
reviewer_identity VARCHAR(120),
review_action VARCHAR(60),
review_reason TEXT,
reviewed_at TIMESTAMP,
previous_state VARCHAR(40),
next_state VARCHAR(40),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
metadata_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_alert_candidate_id
ON market_alert_review_queue (alert_candidate_id);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_state
ON market_alert_review_queue (review_state);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_priority_lane
ON market_alert_review_queue (priority_lane);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_threshold_level
ON market_alert_review_queue (threshold_level);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_evidence_bundle_id
ON market_alert_review_queue (evidence_bundle_id);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_source_batch_id
ON market_alert_review_queue (source_batch_id);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_campaign_id
ON market_alert_review_queue (campaign_id);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_market_product_id
ON market_alert_review_queue (market_product_id);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_momo_i_code
ON market_alert_review_queue (momo_i_code);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_state_priority
ON market_alert_review_queue (review_state, priority_lane, created_at);
CREATE UNIQUE INDEX IF NOT EXISTS ux_market_alert_review_queue_dedupe
ON market_alert_review_queue (dedupe_key);
CREATE INDEX IF NOT EXISTS idx_market_alert_review_queue_bundle
ON market_alert_review_queue (evidence_bundle_id, source_batch_id);
GRANT ALL PRIVILEGES ON market_platforms TO momo;
GRANT ALL PRIVILEGES ON market_campaigns TO momo;
GRANT ALL PRIVILEGES ON market_campaign_snapshots TO momo;
GRANT ALL PRIVILEGES ON market_campaign_products TO momo;
GRANT ALL PRIVILEGES ON market_product_price_history TO momo;
GRANT ALL PRIVILEGES ON market_product_matches TO momo;
GRANT ALL PRIVILEGES ON market_crawler_runs TO momo;
GRANT ALL PRIVILEGES ON market_alert_review_queue TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_platforms_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_campaigns_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_campaign_snapshots_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_campaign_products_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_product_price_history_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_product_matches_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_crawler_runs_id_seq TO momo;
GRANT USAGE, SELECT ON SEQUENCE market_alert_review_queue_id_seq TO momo;
""".strip()
ROLLBACK_SQL = """
-- Manual rollback draft only. Do not run without operator approval and backup verification.
DROP TABLE IF EXISTS market_alert_review_queue;
DROP TABLE IF EXISTS market_crawler_runs;
DROP TABLE IF EXISTS market_product_matches;
DROP TABLE IF EXISTS market_product_price_history;
DROP TABLE IF EXISTS market_campaign_products;
DROP TABLE IF EXISTS market_campaign_snapshots;
DROP TABLE IF EXISTS market_campaigns;
DROP TABLE IF EXISTS market_platforms;
""".strip()
def _statement_count(sql_text):
return len([part for part in sql_text.split(";") if part.strip()])
def _contains_destructive_forward_sql(sql_text):
lowered = sql_text.lower()
destructive_markers = (
"drop table",
"truncate ",
"delete from",
"alter table products",
"alter table daily_sales",
"alter table monthly_sales",
)
return any(marker in lowered for marker in destructive_markers)
def build_migration_blueprint(expected_tables):
"""建立 market_intel migration 與真寫入命令草案;不執行任何 SQL。"""
expected_tables = list(expected_tables)
migration_path = Path(MIGRATION_FILENAME)
seed_writer_path = Path(SEED_WRITER_SCRIPT)
migration_file_exists = migration_path.exists()
seed_writer_script_exists = seed_writer_path.exists()
migration_file_text = (
migration_path.read_text(encoding="utf-8").strip()
if migration_file_exists
else ""
)
migration_file_matches_blueprint = (
migration_file_exists and migration_file_text == FORWARD_SQL
)
forward_has_destructive_sql = _contains_destructive_forward_sql(FORWARD_SQL)
blocked_reasons = [
"migration_not_executed",
"backup_not_verified",
"operator_approval_missing",
"production_maintenance_window_required",
"seed_writer_real_write_requires_cli_apply_flag",
]
if not migration_file_exists:
blocked_reasons.insert(0, "migration_file_not_created")
elif not migration_file_matches_blueprint:
blocked_reasons.insert(0, "migration_file_differs_from_blueprint")
return {
"mode": "migration_file_draft_read_only",
"migration_number": MIGRATION_NUMBER,
"suggested_filename": MIGRATION_FILENAME,
"file_created": migration_file_exists,
"file_matches_blueprint": migration_file_matches_blueprint,
"file_status": (
"local_draft_matches_blueprint"
if migration_file_matches_blueprint
else "local_draft_differs_from_blueprint"
if migration_file_exists
else "not_created"
),
"migration_executed": False,
"database_session_created": False,
"database_commit_executed": False,
"external_network_executed": False,
"scheduler_attached": False,
"expected_tables": expected_tables,
"table_count": len(expected_tables),
"forward_sql": FORWARD_SQL,
"forward_statement_count": _statement_count(FORWARD_SQL),
"forward_has_destructive_sql": forward_has_destructive_sql,
"rollback_sql": ROLLBACK_SQL,
"rollback_requires_manual_approval": True,
"rollback_statement_count": _statement_count(ROLLBACK_SQL),
"blocked_reasons": blocked_reasons,
"table_operations": [
{
"table": table_name,
"operation": "CREATE TABLE IF NOT EXISTS",
"write_status": "preview_only_not_executed",
}
for table_name in expected_tables
],
"command_plan": {
"migration_apply_command": {
"command": (
"psql \"$DATABASE_URL\" -v ON_ERROR_STOP=1 "
f"-f {MIGRATION_FILENAME}"
),
"executed": False,
"requires_backup": True,
"requires_adr011_deploy_boundary_review": True,
},
"seed_writer_command": {
"command": (
"MARKET_INTEL_CRAWLER_ENABLED=false "
"MARKET_INTEL_SEED_WRITE_APPROVAL=<one_time_token> "
f"python {SEED_WRITER_SCRIPT} --execute --apply-real-write --platform all"
),
"executed": False,
"script_created": seed_writer_script_exists,
"script_path": SEED_WRITER_SCRIPT,
"requires_new_approval_token": True,
"notes": (
"Seed writer 真寫入只限 CLI、確認 token 與 apply flag不要為了 "
"seed upsert 而打開 crawler/manual fetch 權限。"
),
},
},
"safety_checks": {
"forward_sql_additive_only": not forward_has_destructive_sql,
"does_not_touch_momo_db_container": True,
"does_not_attach_scheduler": True,
"does_not_enable_external_crawling": True,
"writes_seed_rows_only_with_cli_apply_flag": True,
},
}