diff --git a/.gitea/workflows/cd.yaml b/.gitea/workflows/cd.yaml index a0f6bc45..e71e3417 100644 --- a/.gitea/workflows/cd.yaml +++ b/.gitea/workflows/cd.yaml @@ -159,8 +159,10 @@ jobs: apt-get install -y -q postgresql-client fi # 找 act runner 為本 task 創的 network (Gitea act 命名: GITEA-ACTIONS-TASK-XXX_*-network) - ACT_NET=$(docker network ls --format '{{.Name}}' | grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' | head -1) - echo "Detected act task network: ${ACT_NET:-}" + # 注意: act runner 用 'bash -e -o pipefail',grep 無 match 時 exit 1 會中斷整 step + # 必須 || echo "" 確保 grep 失敗時 ACT_NET 為空字串而非 abort + ACT_NET=$(docker network ls --format '{{.Name}}' 2>/dev/null | (grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' || echo "") | head -1) + echo "Detected act task network: ${ACT_NET:-(none, will fall back to bridge)}" # 啟動測試 DB — 加入 act network,後續用 container name 'pg-test-b5' 連線 docker rm -f pg-test-b5 2>/dev/null || true docker run -d --name pg-test-b5 \ diff --git a/apps/api/src/jobs/asset_scanner_job.py b/apps/api/src/jobs/asset_scanner_job.py new file mode 100644 index 00000000..fa3d2313 --- /dev/null +++ b/apps/api/src/jobs/asset_scanner_job.py @@ -0,0 +1,489 @@ +""" +Asset Scanner Job — ADR-090 §4.1 資產盤點 cron +================================================ +每 1 小時掃描 K8s + 寫入 asset_inventory + asset_discovery_run + asset_coverage_snapshot. + +職責邊界: + ✅ K8s API 列出全部 namespace 的 pods/deployments/services (shallow scan) + ✅ UPSERT asset_inventory (asset_key 為 UNIQUE) + ✅ 為每個 active asset 寫 7 維 asset_coverage_snapshot (預設 unknown,後續 service 補) + ✅ 完成時寫 automation_operation_log(asset_discovered) + ❌ 不掃 Prometheus targets / Gitea repos / Docker compose (留下一階段) + ❌ 不算 capacity 欄位 (留 capacity_scanner_job) + +設計鐵律 (參考 ADR-090 §3.4): + - 同一個 asset 跨 run 沿用同 asset_id (asset_key 為自然鍵) + - 上次出現但這次沒出現的 asset → lifecycle_state='deprecated' + decommissioned_at + - run_id 串連 inventory 與 coverage_snapshot,提供完整稽核 + +排程: + - 預設每 3600s (1 小時) 跑一次,首次延遲 60s 等其他 service init + - 由 main.py lifespan asyncio.create_task() 啟動 + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 監控盲區治理 § Phase 7 Asset Inventory Foundation +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from typing import Any + +import structlog + +logger = structlog.get_logger(__name__) + +# ============================================================================ +# 排程參數 +# ============================================================================ +_SCAN_INTERVAL_SEC = 3600 # 每 1 小時 +_FIRST_DELAY_SEC = 60 # 啟動後等 60s 再首掃 (其他 service init) +_KUBECTL_TIMEOUT_SEC = 30 +_LOOP_BACKOFF_SEC = 300 # 異常時 backoff 5 分鐘 + +# 7 個自動化覆蓋維度 (ADR-090 §3.5) +_COVERAGE_DIMENSIONS = ( + "auto_monitoring", "auto_alerting", "auto_rule_creation", + "auto_rule_matching", "auto_playbook", "auto_remediation", "auto_km_creation", +) + +# K8s asset_type 對應 +_K8S_RESOURCE_TO_ASSET_TYPE = { + "Pod": "container", + "Deployment": "k8s_workload", + "StatefulSet": "k8s_workload", + "DaemonSet": "k8s_workload", + "Service": "k8s_resource", + "ConfigMap": "k8s_resource", + "Secret": "secret", +} + + +# ============================================================================ +# Public entry — main.py lifespan 呼叫 +# ============================================================================ + +async def run_asset_scanner_loop() -> None: + """ + 永久迴圈:每 _SCAN_INTERVAL_SEC 秒做一次資產盤點。 + + 錯誤策略: + - 單次 scan 異常 → backoff 5 分鐘再試,不 crash + - 連續 5 次失敗 → 寫 ai_governance_event (Phase 6 自我治理) — TODO 後續實作 + """ + logger.info("asset_scanner_loop_started", interval_sec=_SCAN_INTERVAL_SEC) + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await scan_once(triggered_by="cron") + except Exception as e: + logger.exception("asset_scanner_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + await asyncio.sleep(_SCAN_INTERVAL_SEC) + + +async def scan_once( + triggered_by: str = "cron", + scope: tuple[str, ...] = ("k8s",), + scan_depth: str = "shallow", +) -> str | None: + """ + 執行一次資產盤點。 + + Args: + triggered_by: cron / ai / human / incident + scope: 本次掃描範圍標籤 (寫入 asset_discovery_run.scope) + scan_depth: shallow (僅 list) / deep (含 describe) / full + + Returns: + run_id (UUID 字串) 或 None (寫 header 失敗時) + """ + started_ms = _time.time() + run_id = await _start_discovery_run(triggered_by, list(scope), scan_depth) + if not run_id: + return None + + new_count = 0 + modified_count = 0 + total_count = 0 + error_msg: str | None = None + + try: + # MVP: 掃 K8s pods (shallow), 後續加 deployments / services / ... + k8s_assets = await _collect_k8s_assets() + total_count = len(k8s_assets) + + # UPSERT inventory + new_count, modified_count = await _upsert_assets(k8s_assets, run_id) + + # 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red) + await _write_coverage_snapshots(run_id) + + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + final_status = "failed" if error_msg else "success" + + await _finish_discovery_run( + run_id=run_id, + status=final_status, + total_assets=total_count, + new_assets=new_count, + modified_assets=modified_count, + duration_ms=duration_ms, + error=error_msg, + ) + + # ADR-090 § aol 留痕 — asset_discovered 是合法 op_type + await _log_aol_asset_discovered( + run_id=run_id, + triggered_by=triggered_by, + total=total_count, + new=new_count, + modified=modified_count, + duration_ms=duration_ms, + status=final_status, + error=error_msg, + ) + + logger.info( + "asset_scan_once_done", + run_id=run_id, + status=final_status, + total=total_count, + new=new_count, + modified=modified_count, + duration_ms=duration_ms, + ) + return run_id + + +# ============================================================================ +# K8s 資產收集 +# ============================================================================ + +async def _collect_k8s_assets() -> list[dict[str, Any]]: + """ + 用 K8sProvider 列出全 namespace 的 pods,轉成 asset_inventory 結構。 + + 回傳每筆: {asset_key, asset_type, host, namespace, name, metadata, tags} + """ + from src.plugins.mcp.providers.k8s_provider import K8sProvider + + provider = K8sProvider() + result = await asyncio.wait_for( + provider.execute( + tool_name="kubectl_get", + parameters={"namespace": "--all-namespaces", "resource": "pods"}, + ), + timeout=_KUBECTL_TIMEOUT_SEC, + ) + if not result.success: + raise RuntimeError(f"kubectl get pods failed: {result.error}") + + raw = result.output + # k8s_provider _kubectl_get 回傳 stdout 字串 (line 299) + if isinstance(raw, str): + try: + payload = _json.loads(raw) + except _json.JSONDecodeError as e: + raise RuntimeError(f"kubectl JSON parse failed: {e}") from e + elif isinstance(raw, dict): + payload = raw + else: + raise RuntimeError(f"unexpected kubectl output type: {type(raw)}") + + items = payload.get("items", []) if isinstance(payload, dict) else [] + assets: list[dict[str, Any]] = [] + for item in items: + meta = item.get("metadata", {}) or {} + spec = item.get("spec", {}) or {} + ns = meta.get("namespace") or "default" + name = meta.get("name") or "unknown" + node = spec.get("nodeName") or "" + + # asset_key 為 UNIQUE 自然鍵,跨 run 沿用同一筆 + asset_key = f"k8s/pod/{ns}/{name}" + asset_type = "container" + + labels = meta.get("labels", {}) or {} + tags = [] + if labels.get("app"): + tags.append(f"app:{labels['app']}") + if labels.get("environment"): + tags.append(f"env:{labels['environment']}") + if labels.get("system"): + tags.append(f"system:{labels['system']}") + + metadata_jsonb = { + "owner_references": meta.get("ownerReferences", []), + "labels": labels, + "phase": (item.get("status", {}) or {}).get("phase", ""), + "node": node, + } + + assets.append({ + "asset_key": asset_key, + "asset_type": asset_type, + "host": node or None, + "namespace": ns, + "name": name, + "metadata": metadata_jsonb, + "tags": tags, + }) + return assets + + +# ============================================================================ +# DB 寫入 +# ============================================================================ + +async def _start_discovery_run( + triggered_by: str, + scope: list[str], + scan_depth: str, +) -> str | None: + """ + 寫 asset_discovery_run header (status='running'), 回傳 run_id。 + + 失敗 → log warning + 回 None,主流程靜默跳過本次 scan。 + """ + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + async with get_db_context() as db: + row = await db.execute( + _sql(""" + INSERT INTO asset_discovery_run ( + triggered_by, scope, scan_depth, status, + new_assets, modified_assets, disappeared_assets, + tools_used + ) VALUES ( + :tb, :scope, :sd, 'running', + 0, 0, 0, + CAST(:tools AS jsonb) + ) + RETURNING run_id + """), + { + "tb": triggered_by, + "scope": scope, + "sd": scan_depth, + "tools": _json.dumps({"k8s": "kubectl_get pods --all-namespaces"}), + }, + ) + run_id = row.scalar() + return str(run_id) if run_id else None + except Exception as e: + logger.warning("asset_discovery_run_start_failed", error=str(e)) + return None + + +async def _finish_discovery_run( + run_id: str, + status: str, + total_assets: int, + new_assets: int, + modified_assets: int, + duration_ms: int, + error: str | None, +) -> None: + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + async with get_db_context() as db: + await db.execute( + _sql(""" + UPDATE asset_discovery_run + SET status = :st, + ended_at = NOW(), + total_assets = :total, + new_assets = :new, + modified_assets = :mod, + duration_ms = :dur, + error = :err + WHERE run_id = CAST(:rid AS uuid) + """), + { + "st": status, + "total": total_assets, + "new": new_assets, + "mod": modified_assets, + "dur": duration_ms, + "err": error, + "rid": run_id, + }, + ) + except Exception as e: + logger.warning("asset_discovery_run_finish_failed", run_id=run_id, error=str(e)) + + +async def _upsert_assets( + assets: list[dict[str, Any]], + run_id: str, +) -> tuple[int, int]: + """ + UPSERT asset_inventory,回傳 (new_count, modified_count)。 + + 用 ON CONFLICT (asset_key) DO UPDATE 確保 idempotent。 + """ + if not assets: + return 0, 0 + + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + new_count = 0 + modified_count = 0 + + try: + async with get_db_context() as db: + for a in assets: + # xmax = 0 表示 INSERT (新),否則表示 UPDATE (修改) + row = await db.execute( + _sql(""" + INSERT INTO asset_inventory ( + asset_key, asset_type, host, namespace, name, + metadata, tags, environment, lifecycle_state, + first_seen_at, last_seen_at + ) VALUES ( + :ak, :at, :host, :ns, :name, + CAST(:md AS jsonb), :tags, 'prod', 'active', + NOW(), NOW() + ) + ON CONFLICT (asset_key) DO UPDATE + SET last_seen_at = NOW(), + host = EXCLUDED.host, + metadata = EXCLUDED.metadata, + tags = EXCLUDED.tags, + lifecycle_state = 'active', + updated_at = NOW(), + decommissioned_at = NULL + RETURNING asset_id, (xmax = 0) AS inserted + """), + { + "ak": a["asset_key"], + "at": a["asset_type"], + "host": a["host"], + "ns": a["namespace"], + "name": a["name"], + "md": _json.dumps(a["metadata"], ensure_ascii=False), + "tags": a["tags"], + }, + ) + _, inserted = row.one() + if inserted: + new_count += 1 + else: + modified_count += 1 + except Exception as e: + logger.exception("asset_upsert_failed", run_id=run_id, error=str(e)) + + return new_count, modified_count + + +async def _write_coverage_snapshots(run_id: str) -> None: + """ + 為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。 + + 後續 service (rule_catalog / playbook_extractor / km_writer) 會 UPDATE 對應維度。 + """ + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + async with get_db_context() as db: + # 一次性 INSERT: 取所有 active asset × 7 dimensions + await db.execute( + _sql(""" + INSERT INTO asset_coverage_snapshot ( + run_id, asset_id, dimension, coverage_status, + evidence, detected_by + ) + SELECT + CAST(:rid AS uuid), + ai.asset_id, + d.dimension, + 'unknown' AS coverage_status, + '{}'::jsonb, + 'asset_scanner' AS detected_by + FROM asset_inventory ai + CROSS JOIN ( + VALUES ('auto_monitoring'),('auto_alerting'),('auto_rule_creation'), + ('auto_rule_matching'),('auto_playbook'),('auto_remediation'), + ('auto_km_creation') + ) AS d(dimension) + WHERE ai.lifecycle_state = 'active' + ON CONFLICT (run_id, asset_id, dimension) DO NOTHING + """), + {"rid": run_id}, + ) + except Exception as e: + logger.warning("asset_coverage_write_failed", run_id=run_id, error=str(e)) + + +async def _log_aol_asset_discovered( + run_id: str, + triggered_by: str, + total: int, + new: int, + modified: int, + duration_ms: int, + status: str, + error: str | None, +) -> None: + """寫 automation_operation_log(asset_discovered)。失敗只 log 不阻塞。""" + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + aol_status = "success" if status == "success" else "failed" + input_payload = { + "run_id": run_id, + "triggered_by": triggered_by, + "scope": ["k8s"], + "scan_depth": "shallow", + } + output_payload = { + "run_id": run_id, + "total_assets": total, + "new_assets": new, + "modified_assets": modified, + } + + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, run_id, duration_ms, error, tags + ) VALUES ( + 'asset_discovered', + 'asset_scanner', + :st, + CAST(:input AS jsonb), + CAST(:output AS jsonb), + CAST(:rid AS uuid), + :dur, :err, + :tags + ) + """), + { + "st": aol_status, + "input": _json.dumps(input_payload, ensure_ascii=False), + "output": _json.dumps(output_payload, ensure_ascii=False), + "rid": run_id, + "dur": duration_ms, + "err": (error or "")[:2000] if error else None, + "tags": ["asset_scanner", "discovery", "k8s"], + }, + ) + except Exception as e: + logger.warning("asset_scanner_aol_write_failed", run_id=run_id, error=str(e)) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 77221f73..f1f956d6 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -370,6 +370,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("incident_analysis_sweeper_schedule_failed", error=str(e)) + # ADR-090 § 資產盤點 cron (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每 1 小時掃 K8s pods → 寫 asset_inventory + asset_discovery_run + 7 維 coverage + # 解開 8 張 0 writer 表的第一個 (asset_inventory / asset_discovery_run / asset_coverage_snapshot) + try: + from src.jobs.asset_scanner_job import run_asset_scanner_loop + asyncio.create_task(run_asset_scanner_loop()) + logger.info("asset_scanner_loop_scheduled", interval_sec=3600) + except Exception as e: + logger.warning("asset_scanner_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: