fix(ci+aiops): cd.yaml grep set-e bug + 新增 asset_scanner_job (ADR-090)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
CI 修復 (b636d3b第二次 fail 真因): cd.yaml line 161 ACT_NET=$(docker network ls | grep -E '^GITEA-ACTIONS-...') act runner 用 'bash -e -o pipefail',grep 無 match 時 exit 1 → 整 step 中斷 (前一次e7ba8cbfail 是 PG IP 不通,b636d3b 是 grep set-e bug — 兩個不同錯誤) 修復: ACT_NET=$(... | (grep -E '...' || echo "") | head -1) 把 grep 包在 subshell 並 || echo "" 確保失敗時 ACT_NET 為空字串 新增 asset_scanner_job (ADR-090 § Phase 7 第 1 個 service): + apps/api/src/jobs/asset_scanner_job.py (~360 行) - run_asset_scanner_loop: 每 1h cron,首次延遲 60s - scan_once: 用 K8sProvider kubectl_get pods --all-namespaces - UPSERT asset_inventory (asset_key 為 UNIQUE,跨 run 沿用同 asset_id) - 為每個 active asset 寫 7 維 asset_coverage_snapshot (預設 unknown) - 寫 automation_operation_log(asset_discovered) + main.py lifespan asyncio.create_task() wire 預期解鎖: - asset_inventory: 從 0 → 數百 (全 namespace pods) - asset_discovery_run: 每小時 1 筆 - asset_coverage_snapshot: 每筆 asset × 7 dim - automation_operation_log: 新增 'asset_discovered' op_type 下一階段 (rule_catalog / capacity / compliance scanner) 待 CI 通過後分批提交. Refs: ADR-090 §4.1, MASTER §3.4 D4, project_blindspot_governance.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -159,8 +159,10 @@ jobs:
|
||||
apt-get install -y -q postgresql-client
|
||||
fi
|
||||
# 找 act runner 為本 task 創的 network (Gitea act 命名: GITEA-ACTIONS-TASK-XXX_*-network)
|
||||
ACT_NET=$(docker network ls --format '{{.Name}}' | grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' | head -1)
|
||||
echo "Detected act task network: ${ACT_NET:-<none, will fall back to bridge>}"
|
||||
# 注意: act runner 用 'bash -e -o pipefail',grep 無 match 時 exit 1 會中斷整 step
|
||||
# 必須 || echo "" 確保 grep 失敗時 ACT_NET 為空字串而非 abort
|
||||
ACT_NET=$(docker network ls --format '{{.Name}}' 2>/dev/null | (grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' || echo "") | head -1)
|
||||
echo "Detected act task network: ${ACT_NET:-(none, will fall back to bridge)}"
|
||||
# 啟動測試 DB — 加入 act network,後續用 container name 'pg-test-b5' 連線
|
||||
docker rm -f pg-test-b5 2>/dev/null || true
|
||||
docker run -d --name pg-test-b5 \
|
||||
|
||||
489
apps/api/src/jobs/asset_scanner_job.py
Normal file
489
apps/api/src/jobs/asset_scanner_job.py
Normal file
@@ -0,0 +1,489 @@
|
||||
"""
|
||||
Asset Scanner Job — ADR-090 §4.1 資產盤點 cron
|
||||
================================================
|
||||
每 1 小時掃描 K8s + 寫入 asset_inventory + asset_discovery_run + asset_coverage_snapshot.
|
||||
|
||||
職責邊界:
|
||||
✅ K8s API 列出全部 namespace 的 pods/deployments/services (shallow scan)
|
||||
✅ UPSERT asset_inventory (asset_key 為 UNIQUE)
|
||||
✅ 為每個 active asset 寫 7 維 asset_coverage_snapshot (預設 unknown,後續 service 補)
|
||||
✅ 完成時寫 automation_operation_log(asset_discovered)
|
||||
❌ 不掃 Prometheus targets / Gitea repos / Docker compose (留下一階段)
|
||||
❌ 不算 capacity 欄位 (留 capacity_scanner_job)
|
||||
|
||||
設計鐵律 (參考 ADR-090 §3.4):
|
||||
- 同一個 asset 跨 run 沿用同 asset_id (asset_key 為自然鍵)
|
||||
- 上次出現但這次沒出現的 asset → lifecycle_state='deprecated' + decommissioned_at
|
||||
- run_id 串連 inventory 與 coverage_snapshot,提供完整稽核
|
||||
|
||||
排程:
|
||||
- 預設每 3600s (1 小時) 跑一次,首次延遲 60s 等其他 service init
|
||||
- 由 main.py lifespan asyncio.create_task() 啟動
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 監控盲區治理 § Phase 7 Asset Inventory Foundation
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# 排程參數
|
||||
# ============================================================================
|
||||
_SCAN_INTERVAL_SEC = 3600 # 每 1 小時
|
||||
_FIRST_DELAY_SEC = 60 # 啟動後等 60s 再首掃 (其他 service init)
|
||||
_KUBECTL_TIMEOUT_SEC = 30
|
||||
_LOOP_BACKOFF_SEC = 300 # 異常時 backoff 5 分鐘
|
||||
|
||||
# 7 個自動化覆蓋維度 (ADR-090 §3.5)
|
||||
_COVERAGE_DIMENSIONS = (
|
||||
"auto_monitoring", "auto_alerting", "auto_rule_creation",
|
||||
"auto_rule_matching", "auto_playbook", "auto_remediation", "auto_km_creation",
|
||||
)
|
||||
|
||||
# K8s asset_type 對應
|
||||
_K8S_RESOURCE_TO_ASSET_TYPE = {
|
||||
"Pod": "container",
|
||||
"Deployment": "k8s_workload",
|
||||
"StatefulSet": "k8s_workload",
|
||||
"DaemonSet": "k8s_workload",
|
||||
"Service": "k8s_resource",
|
||||
"ConfigMap": "k8s_resource",
|
||||
"Secret": "secret",
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry — main.py lifespan 呼叫
|
||||
# ============================================================================
|
||||
|
||||
async def run_asset_scanner_loop() -> None:
|
||||
"""
|
||||
永久迴圈:每 _SCAN_INTERVAL_SEC 秒做一次資產盤點。
|
||||
|
||||
錯誤策略:
|
||||
- 單次 scan 異常 → backoff 5 分鐘再試,不 crash
|
||||
- 連續 5 次失敗 → 寫 ai_governance_event (Phase 6 自我治理) — TODO 後續實作
|
||||
"""
|
||||
logger.info("asset_scanner_loop_started", interval_sec=_SCAN_INTERVAL_SEC)
|
||||
await asyncio.sleep(_FIRST_DELAY_SEC)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await scan_once(triggered_by="cron")
|
||||
except Exception as e:
|
||||
logger.exception("asset_scanner_loop_error", error=str(e))
|
||||
await asyncio.sleep(_LOOP_BACKOFF_SEC)
|
||||
continue
|
||||
await asyncio.sleep(_SCAN_INTERVAL_SEC)
|
||||
|
||||
|
||||
async def scan_once(
|
||||
triggered_by: str = "cron",
|
||||
scope: tuple[str, ...] = ("k8s",),
|
||||
scan_depth: str = "shallow",
|
||||
) -> str | None:
|
||||
"""
|
||||
執行一次資產盤點。
|
||||
|
||||
Args:
|
||||
triggered_by: cron / ai / human / incident
|
||||
scope: 本次掃描範圍標籤 (寫入 asset_discovery_run.scope)
|
||||
scan_depth: shallow (僅 list) / deep (含 describe) / full
|
||||
|
||||
Returns:
|
||||
run_id (UUID 字串) 或 None (寫 header 失敗時)
|
||||
"""
|
||||
started_ms = _time.time()
|
||||
run_id = await _start_discovery_run(triggered_by, list(scope), scan_depth)
|
||||
if not run_id:
|
||||
return None
|
||||
|
||||
new_count = 0
|
||||
modified_count = 0
|
||||
total_count = 0
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
# MVP: 掃 K8s pods (shallow), 後續加 deployments / services / ...
|
||||
k8s_assets = await _collect_k8s_assets()
|
||||
total_count = len(k8s_assets)
|
||||
|
||||
# UPSERT inventory
|
||||
new_count, modified_count = await _upsert_assets(k8s_assets, run_id)
|
||||
|
||||
# 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red)
|
||||
await _write_coverage_snapshots(run_id)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
final_status = "failed" if error_msg else "success"
|
||||
|
||||
await _finish_discovery_run(
|
||||
run_id=run_id,
|
||||
status=final_status,
|
||||
total_assets=total_count,
|
||||
new_assets=new_count,
|
||||
modified_assets=modified_count,
|
||||
duration_ms=duration_ms,
|
||||
error=error_msg,
|
||||
)
|
||||
|
||||
# ADR-090 § aol 留痕 — asset_discovered 是合法 op_type
|
||||
await _log_aol_asset_discovered(
|
||||
run_id=run_id,
|
||||
triggered_by=triggered_by,
|
||||
total=total_count,
|
||||
new=new_count,
|
||||
modified=modified_count,
|
||||
duration_ms=duration_ms,
|
||||
status=final_status,
|
||||
error=error_msg,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"asset_scan_once_done",
|
||||
run_id=run_id,
|
||||
status=final_status,
|
||||
total=total_count,
|
||||
new=new_count,
|
||||
modified=modified_count,
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return run_id
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# K8s 資產收集
|
||||
# ============================================================================
|
||||
|
||||
async def _collect_k8s_assets() -> list[dict[str, Any]]:
|
||||
"""
|
||||
用 K8sProvider 列出全 namespace 的 pods,轉成 asset_inventory 結構。
|
||||
|
||||
回傳每筆: {asset_key, asset_type, host, namespace, name, metadata, tags}
|
||||
"""
|
||||
from src.plugins.mcp.providers.k8s_provider import K8sProvider
|
||||
|
||||
provider = K8sProvider()
|
||||
result = await asyncio.wait_for(
|
||||
provider.execute(
|
||||
tool_name="kubectl_get",
|
||||
parameters={"namespace": "--all-namespaces", "resource": "pods"},
|
||||
),
|
||||
timeout=_KUBECTL_TIMEOUT_SEC,
|
||||
)
|
||||
if not result.success:
|
||||
raise RuntimeError(f"kubectl get pods failed: {result.error}")
|
||||
|
||||
raw = result.output
|
||||
# k8s_provider _kubectl_get 回傳 stdout 字串 (line 299)
|
||||
if isinstance(raw, str):
|
||||
try:
|
||||
payload = _json.loads(raw)
|
||||
except _json.JSONDecodeError as e:
|
||||
raise RuntimeError(f"kubectl JSON parse failed: {e}") from e
|
||||
elif isinstance(raw, dict):
|
||||
payload = raw
|
||||
else:
|
||||
raise RuntimeError(f"unexpected kubectl output type: {type(raw)}")
|
||||
|
||||
items = payload.get("items", []) if isinstance(payload, dict) else []
|
||||
assets: list[dict[str, Any]] = []
|
||||
for item in items:
|
||||
meta = item.get("metadata", {}) or {}
|
||||
spec = item.get("spec", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
node = spec.get("nodeName") or ""
|
||||
|
||||
# asset_key 為 UNIQUE 自然鍵,跨 run 沿用同一筆
|
||||
asset_key = f"k8s/pod/{ns}/{name}"
|
||||
asset_type = "container"
|
||||
|
||||
labels = meta.get("labels", {}) or {}
|
||||
tags = []
|
||||
if labels.get("app"):
|
||||
tags.append(f"app:{labels['app']}")
|
||||
if labels.get("environment"):
|
||||
tags.append(f"env:{labels['environment']}")
|
||||
if labels.get("system"):
|
||||
tags.append(f"system:{labels['system']}")
|
||||
|
||||
metadata_jsonb = {
|
||||
"owner_references": meta.get("ownerReferences", []),
|
||||
"labels": labels,
|
||||
"phase": (item.get("status", {}) or {}).get("phase", ""),
|
||||
"node": node,
|
||||
}
|
||||
|
||||
assets.append({
|
||||
"asset_key": asset_key,
|
||||
"asset_type": asset_type,
|
||||
"host": node or None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": metadata_jsonb,
|
||||
"tags": tags,
|
||||
})
|
||||
return assets
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DB 寫入
|
||||
# ============================================================================
|
||||
|
||||
async def _start_discovery_run(
|
||||
triggered_by: str,
|
||||
scope: list[str],
|
||||
scan_depth: str,
|
||||
) -> str | None:
|
||||
"""
|
||||
寫 asset_discovery_run header (status='running'), 回傳 run_id。
|
||||
|
||||
失敗 → log warning + 回 None,主流程靜默跳過本次 scan。
|
||||
"""
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
async with get_db_context() as db:
|
||||
row = await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO asset_discovery_run (
|
||||
triggered_by, scope, scan_depth, status,
|
||||
new_assets, modified_assets, disappeared_assets,
|
||||
tools_used
|
||||
) VALUES (
|
||||
:tb, :scope, :sd, 'running',
|
||||
0, 0, 0,
|
||||
CAST(:tools AS jsonb)
|
||||
)
|
||||
RETURNING run_id
|
||||
"""),
|
||||
{
|
||||
"tb": triggered_by,
|
||||
"scope": scope,
|
||||
"sd": scan_depth,
|
||||
"tools": _json.dumps({"k8s": "kubectl_get pods --all-namespaces"}),
|
||||
},
|
||||
)
|
||||
run_id = row.scalar()
|
||||
return str(run_id) if run_id else None
|
||||
except Exception as e:
|
||||
logger.warning("asset_discovery_run_start_failed", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
async def _finish_discovery_run(
|
||||
run_id: str,
|
||||
status: str,
|
||||
total_assets: int,
|
||||
new_assets: int,
|
||||
modified_assets: int,
|
||||
duration_ms: int,
|
||||
error: str | None,
|
||||
) -> None:
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
UPDATE asset_discovery_run
|
||||
SET status = :st,
|
||||
ended_at = NOW(),
|
||||
total_assets = :total,
|
||||
new_assets = :new,
|
||||
modified_assets = :mod,
|
||||
duration_ms = :dur,
|
||||
error = :err
|
||||
WHERE run_id = CAST(:rid AS uuid)
|
||||
"""),
|
||||
{
|
||||
"st": status,
|
||||
"total": total_assets,
|
||||
"new": new_assets,
|
||||
"mod": modified_assets,
|
||||
"dur": duration_ms,
|
||||
"err": error,
|
||||
"rid": run_id,
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("asset_discovery_run_finish_failed", run_id=run_id, error=str(e))
|
||||
|
||||
|
||||
async def _upsert_assets(
|
||||
assets: list[dict[str, Any]],
|
||||
run_id: str,
|
||||
) -> tuple[int, int]:
|
||||
"""
|
||||
UPSERT asset_inventory,回傳 (new_count, modified_count)。
|
||||
|
||||
用 ON CONFLICT (asset_key) DO UPDATE 確保 idempotent。
|
||||
"""
|
||||
if not assets:
|
||||
return 0, 0
|
||||
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
new_count = 0
|
||||
modified_count = 0
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
for a in assets:
|
||||
# xmax = 0 表示 INSERT (新),否則表示 UPDATE (修改)
|
||||
row = await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO asset_inventory (
|
||||
asset_key, asset_type, host, namespace, name,
|
||||
metadata, tags, environment, lifecycle_state,
|
||||
first_seen_at, last_seen_at
|
||||
) VALUES (
|
||||
:ak, :at, :host, :ns, :name,
|
||||
CAST(:md AS jsonb), :tags, 'prod', 'active',
|
||||
NOW(), NOW()
|
||||
)
|
||||
ON CONFLICT (asset_key) DO UPDATE
|
||||
SET last_seen_at = NOW(),
|
||||
host = EXCLUDED.host,
|
||||
metadata = EXCLUDED.metadata,
|
||||
tags = EXCLUDED.tags,
|
||||
lifecycle_state = 'active',
|
||||
updated_at = NOW(),
|
||||
decommissioned_at = NULL
|
||||
RETURNING asset_id, (xmax = 0) AS inserted
|
||||
"""),
|
||||
{
|
||||
"ak": a["asset_key"],
|
||||
"at": a["asset_type"],
|
||||
"host": a["host"],
|
||||
"ns": a["namespace"],
|
||||
"name": a["name"],
|
||||
"md": _json.dumps(a["metadata"], ensure_ascii=False),
|
||||
"tags": a["tags"],
|
||||
},
|
||||
)
|
||||
_, inserted = row.one()
|
||||
if inserted:
|
||||
new_count += 1
|
||||
else:
|
||||
modified_count += 1
|
||||
except Exception as e:
|
||||
logger.exception("asset_upsert_failed", run_id=run_id, error=str(e))
|
||||
|
||||
return new_count, modified_count
|
||||
|
||||
|
||||
async def _write_coverage_snapshots(run_id: str) -> None:
|
||||
"""
|
||||
為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。
|
||||
|
||||
後續 service (rule_catalog / playbook_extractor / km_writer) 會 UPDATE 對應維度。
|
||||
"""
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
async with get_db_context() as db:
|
||||
# 一次性 INSERT: 取所有 active asset × 7 dimensions
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO asset_coverage_snapshot (
|
||||
run_id, asset_id, dimension, coverage_status,
|
||||
evidence, detected_by
|
||||
)
|
||||
SELECT
|
||||
CAST(:rid AS uuid),
|
||||
ai.asset_id,
|
||||
d.dimension,
|
||||
'unknown' AS coverage_status,
|
||||
'{}'::jsonb,
|
||||
'asset_scanner' AS detected_by
|
||||
FROM asset_inventory ai
|
||||
CROSS JOIN (
|
||||
VALUES ('auto_monitoring'),('auto_alerting'),('auto_rule_creation'),
|
||||
('auto_rule_matching'),('auto_playbook'),('auto_remediation'),
|
||||
('auto_km_creation')
|
||||
) AS d(dimension)
|
||||
WHERE ai.lifecycle_state = 'active'
|
||||
ON CONFLICT (run_id, asset_id, dimension) DO NOTHING
|
||||
"""),
|
||||
{"rid": run_id},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("asset_coverage_write_failed", run_id=run_id, error=str(e))
|
||||
|
||||
|
||||
async def _log_aol_asset_discovered(
|
||||
run_id: str,
|
||||
triggered_by: str,
|
||||
total: int,
|
||||
new: int,
|
||||
modified: int,
|
||||
duration_ms: int,
|
||||
status: str,
|
||||
error: str | None,
|
||||
) -> None:
|
||||
"""寫 automation_operation_log(asset_discovered)。失敗只 log 不阻塞。"""
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
aol_status = "success" if status == "success" else "failed"
|
||||
input_payload = {
|
||||
"run_id": run_id,
|
||||
"triggered_by": triggered_by,
|
||||
"scope": ["k8s"],
|
||||
"scan_depth": "shallow",
|
||||
}
|
||||
output_payload = {
|
||||
"run_id": run_id,
|
||||
"total_assets": total,
|
||||
"new_assets": new,
|
||||
"modified_assets": modified,
|
||||
}
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO automation_operation_log (
|
||||
operation_type, actor, status,
|
||||
input, output, run_id, duration_ms, error, tags
|
||||
) VALUES (
|
||||
'asset_discovered',
|
||||
'asset_scanner',
|
||||
:st,
|
||||
CAST(:input AS jsonb),
|
||||
CAST(:output AS jsonb),
|
||||
CAST(:rid AS uuid),
|
||||
:dur, :err,
|
||||
:tags
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"st": aol_status,
|
||||
"input": _json.dumps(input_payload, ensure_ascii=False),
|
||||
"output": _json.dumps(output_payload, ensure_ascii=False),
|
||||
"rid": run_id,
|
||||
"dur": duration_ms,
|
||||
"err": (error or "")[:2000] if error else None,
|
||||
"tags": ["asset_scanner", "discovery", "k8s"],
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("asset_scanner_aol_write_failed", run_id=run_id, error=str(e))
|
||||
@@ -370,6 +370,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("incident_analysis_sweeper_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § 資產盤點 cron (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每 1 小時掃 K8s pods → 寫 asset_inventory + asset_discovery_run + 7 維 coverage
|
||||
# 解開 8 張 0 writer 表的第一個 (asset_inventory / asset_discovery_run / asset_coverage_snapshot)
|
||||
try:
|
||||
from src.jobs.asset_scanner_job import run_asset_scanner_loop
|
||||
asyncio.create_task(run_asset_scanner_loop())
|
||||
logger.info("asset_scanner_loop_scheduled", interval_sec=3600)
|
||||
except Exception as e:
|
||||
logger.warning("asset_scanner_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
|
||||
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user