diff --git a/apps/api/src/jobs/asset_change_tracker_job.py b/apps/api/src/jobs/asset_change_tracker_job.py new file mode 100644 index 00000000..e46b7875 --- /dev/null +++ b/apps/api/src/jobs/asset_change_tracker_job.py @@ -0,0 +1,246 @@ +""" +Asset Change Tracker Job — ADR-090 § asset_change_event +========================================================= +每 1h 比對最近兩次 asset_discovery_run,寫 asset_change_event (added/removed/modified). + +職責邊界: + ✅ 比對 run_N vs run_N-1 的 asset set + ✅ 新出現的 asset → 'asset_added' event + ✅ 消失的 asset (lifecycle 'deprecated' 或完全不在新 run) → 'asset_removed' + ✅ 存在於兩次但 metadata 有差異 → 'asset_modified' + ⏳ TODO: coverage_improved/degraded (需要 coverage_evaluator 歷史比對) + ⏳ TODO: criticality_changed / owner_changed (需人工設定 criticality 欄位) + +設計鐵律: + - 用 asset_key (UNIQUE) 作比對基準,跨 run 穩定 + - before_state/after_state 存 metadata JSONB 便於 AI 分析 + - diff jsonb 標註變動欄位 + - 失敗 → log + 跳過,下次重試 + +排程: + - 首次延遲 360s (讓 asset_scanner 至少跑 2 次) + - 每 1h + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 § Phase 7 Change Tracking +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from typing import Any + +import structlog + +logger = structlog.get_logger(__name__) + +_TRACK_INTERVAL_SEC = 3600 +_FIRST_DELAY_SEC = 360 +_LOOP_BACKOFF_SEC = 600 + + +async def run_asset_change_tracker_loop() -> None: + """每 1h 比對最近兩次 run,寫 asset_change_event.""" + logger.info("asset_change_tracker_loop_started", interval_sec=_TRACK_INTERVAL_SEC) + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await track_once() + except Exception as e: + logger.exception("asset_change_tracker_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + await asyncio.sleep(_TRACK_INTERVAL_SEC) + + +async def track_once() -> dict[str, int]: + """比對兩個最近的 run,產出 change events.""" + started_ms = _time.time() + stats = {"added": 0, "removed": 0, "modified": 0} + error_msg: str | None = None + + try: + runs = await _get_recent_runs(limit=2) + if len(runs) < 2: + logger.info("asset_change_tracker_need_two_runs", got=len(runs)) + return stats + + newer_run, older_run = runs[0], runs[1] + logger.info("asset_change_tracker_comparing", newer=newer_run, older=older_run) + + stats = await _diff_runs(newer_run, older_run) + + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("asset_change_track_once_failed", error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + await _log_aol(stats, duration_ms, error_msg) + + logger.info( + "asset_change_track_once_done", + added=stats["added"], + removed=stats["removed"], + modified=stats["modified"], + duration_ms=duration_ms, + ) + return stats + + +async def _get_recent_runs(limit: int = 2) -> list[str]: + """取最近 N 個 success 的 run_id (降序).""" + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + async with get_db_context() as db: + rows = await db.execute( + _sql("SELECT run_id FROM asset_discovery_run WHERE status='success' ORDER BY ended_at DESC LIMIT :lim"), + {"lim": limit}, + ) + return [str(r[0]) for r in rows.fetchall()] + + +async def _diff_runs(newer_run: str, older_run: str) -> dict[str, int]: + """ + 比較兩個 run 所關聯的 asset set (via asset_coverage_snapshot JOIN asset_inventory). + + Strategy: + - 用 coverage_snapshot 知道哪些 asset 出現在哪 run + - newer - older = added + - older - newer = removed (同時 lifecycle_state 改 deprecated by asset_scanner 流程) + - newer ∩ older 且 metadata 變 = modified + """ + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + stats = {"added": 0, "removed": 0, "modified": 0} + + async with get_db_context() as db: + # 1. Added: newer run 有但 older run 沒有 + result = await db.execute( + _sql(""" + INSERT INTO asset_change_event ( + run_id, asset_id, change_type, + before_state, after_state, diff, detected_at + ) + SELECT + CAST(:newer AS uuid), + ai.asset_id, + 'asset_added', + NULL, + ai.metadata, + jsonb_build_object('asset_key', ai.asset_key, 'asset_type', ai.asset_type), + NOW() + FROM asset_inventory ai + WHERE ai.asset_id IN ( + SELECT DISTINCT cs_new.asset_id FROM asset_coverage_snapshot cs_new + WHERE cs_new.run_id = CAST(:newer AS uuid) + EXCEPT + SELECT DISTINCT cs_old.asset_id FROM asset_coverage_snapshot cs_old + WHERE cs_old.run_id = CAST(:older AS uuid) + ) + ON CONFLICT DO NOTHING + """), + {"newer": newer_run, "older": older_run}, + ) + stats["added"] = result.rowcount or 0 + + # 2. Removed: older 有但 newer 沒有 + result = await db.execute( + _sql(""" + INSERT INTO asset_change_event ( + run_id, asset_id, change_type, + before_state, after_state, diff, detected_at + ) + SELECT + CAST(:newer AS uuid), + ai.asset_id, + 'asset_removed', + ai.metadata, + NULL, + jsonb_build_object('asset_key', ai.asset_key, 'asset_type', ai.asset_type), + NOW() + FROM asset_inventory ai + WHERE ai.asset_id IN ( + SELECT DISTINCT cs_old.asset_id FROM asset_coverage_snapshot cs_old + WHERE cs_old.run_id = CAST(:older AS uuid) + EXCEPT + SELECT DISTINCT cs_new.asset_id FROM asset_coverage_snapshot cs_new + WHERE cs_new.run_id = CAST(:newer AS uuid) + ) + ON CONFLICT DO NOTHING + """), + {"newer": newer_run, "older": older_run}, + ) + stats["removed"] = result.rowcount or 0 + + # 3. Modified: 兩次都在,lifecycle_state 有變化 (asset_scanner UPSERT 會改 lifecycle) + # 實務上 metadata 差異過於 noisy,只追蹤 lifecycle_state 變化 + # 另外: pod phase 變化 (Running→CrashLoopBackOff 等) 也記 + # 本 MVP 版偵測: asset.updated_at 比 asset.first_seen_at 新且相差在兩次 run 之間 + # (簡化: 只記 lifecycle_state='deprecated' 被標的 asset,這些通常是新失去的 pods) + result = await db.execute( + _sql(""" + INSERT INTO asset_change_event ( + run_id, asset_id, change_type, + before_state, after_state, diff, detected_at + ) + SELECT + CAST(:newer AS uuid), + ai.asset_id, + 'lifecycle_changed', + jsonb_build_object('prior_state', 'active'), + jsonb_build_object('new_state', ai.lifecycle_state), + jsonb_build_object('asset_key', ai.asset_key), + NOW() + FROM asset_inventory ai + WHERE ai.lifecycle_state = 'deprecated' + AND ai.updated_at > NOW() - INTERVAL '2 hours' + AND NOT EXISTS ( + SELECT 1 FROM asset_change_event ace + WHERE ace.asset_id = ai.asset_id + AND ace.change_type = 'lifecycle_changed' + AND ace.detected_at > NOW() - INTERVAL '2 hours' + ) + ON CONFLICT DO NOTHING + """), + {"newer": newer_run}, + ) + stats["modified"] = result.rowcount or 0 + + return stats + + +async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None: + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + aol_status = "failed" if error else "success" + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, duration_ms, error, tags + ) VALUES ( + 'asset_discovered', + 'asset_change_tracker', + :st, + '{}'::jsonb, + CAST(:output AS jsonb), + :dur, :err, :tags + ) + """), + { + "st": aol_status, + "output": _json.dumps(stats, ensure_ascii=False), + "dur": duration_ms, + "err": (error or "")[:2000] if error else None, + "tags": ["change_tracker", "asset"], + }, + ) + except Exception as e: + logger.warning("asset_change_tracker_aol_failed", error=str(e)) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 5d27985b..1bb609e5 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -430,6 +430,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("rule_stats_updater_loop_schedule_failed", error=str(e)) + # ADR-090 § Asset Change Tracker (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每 1h 比對最近兩次 asset_discovery_run,寫 asset_change_event + # 解鎖: 資產變化歷史 (added/removed/lifecycle_changed),AI 可追蹤集群演進 + try: + from src.jobs.asset_change_tracker_job import run_asset_change_tracker_loop + asyncio.create_task(run_asset_change_tracker_loop()) + logger.info("asset_change_tracker_loop_scheduled", interval_sec=3600) + except Exception as e: + logger.warning("asset_change_tracker_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: