fix(ci+aiops): cd.yaml grep set-e bug + add asset_scanner_job (ADR-090)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

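CI fix (root cause of the second failure, on b636d3b):
  cd.yaml line 161 ACT_NET=$(docker network ls | grep -E '^GITEA-ACTIONS-...')
  The act runner runs steps with 'bash -e -o pipefail'; when grep matches nothing it exits 1, which aborts the whole step
  (the earlier failure on e7ba8cb was an unreachable PG IP; b636d3b is the grep set-e bug. These are two different errors)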

Fix:
  ACT_NET=$(... | (grep -E '...' || echo "") | head -1)
  Wrap grep in a subshell with || echo "" so that ACT_NET is an empty string when grep finds no match
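
The failure mode and the fix can be reproduced outside CI. The following is a minimal sketch in Python that shells out to bash with the same `-e -o pipefail` flags; the `printf` input and the `^X` pattern are stand-ins for `docker network ls` and the real network-name regex, and are assumptions rather than content from cd.yaml.

```python
import subprocess

# Same shell flags the commit message attributes to the act runner.
BASH = ["bash", "-e", "-o", "pipefail", "-c"]

# Stand-in for the original line: grep finds no match and exits 1, pipefail
# makes the whole pipeline fail, and -e aborts the script before the echo.
BROKEN = r'''V=$(printf 'a\n' | grep -E '^X' | head -1); echo "reached:[$V]"'''

# Stand-in for the fixed line: the subshell turns a grep miss into an empty
# line with exit status 0, so the assignment succeeds with V empty.
FIXED = r'''V=$(printf 'a\n' | (grep -E '^X' || echo "") | head -1); echo "reached:[$V]"'''


def run(script: str) -> subprocess.CompletedProcess:
    """Run one script under bash -e -o pipefail and capture its output."""
    return subprocess.run(BASH + [script], capture_output=True, text=True)


broken = run(BROKEN)
fixed = run(FIXED)
print("broken:", broken.returncode, repr(broken.stdout))
print("fixed: ", fixed.returncode, repr(fixed.stdout))
```

The `|| echo ""` sits inside the parentheses on purpose: it has to rescue the exit status of grep itself, before pipefail inspects the pipeline.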

Add asset_scanner_job (ADR-090 § Phase 7, first service):
  + apps/api/src/jobs/asset_scanner_job.py (~360 lines)
    - run_asset_scanner_loop: runs every 1h, first run delayed by 60s
    - scan_once: uses K8sProvider kubectl_get pods --all-namespaces
    - UPSERT asset_inventory (asset_key is UNIQUE; the same asset_id is reused across runs)
    - writes a 7-dimension asset_coverage_snapshot for every active asset (default: unknown)
    - writes automation_operation_log(asset_discovered)
  + main.py lifespan: wired in via asyncio.create_task()
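
The "same asset_id across runs" property follows from upserting on the natural key. Here is a minimal sketch of that behaviour, assuming SQLite (3.24 or newer) as a stand-in for PostgreSQL and a heavily simplified table; the `seen_count` column and the `upsert_asset` helper are hypothetical and exist only to make the update visible.

```python
import sqlite3


def upsert_asset(conn: sqlite3.Connection, asset_key: str, host: str) -> int:
    """Insert the asset, or refresh it if asset_key already exists; return asset_id."""
    conn.execute(
        """
        INSERT INTO asset_inventory (asset_key, host, seen_count)
        VALUES (?, ?, 1)
        ON CONFLICT (asset_key) DO UPDATE
        SET host = excluded.host,
            seen_count = seen_count + 1
        """,
        (asset_key, host),
    )
    row = conn.execute(
        "SELECT asset_id FROM asset_inventory WHERE asset_key = ?", (asset_key,)
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE asset_inventory (
        asset_id   INTEGER PRIMARY KEY AUTOINCREMENT,
        asset_key  TEXT NOT NULL UNIQUE,
        host       TEXT,
        seen_count INTEGER NOT NULL
    )
    """
)

# Two "runs" see the same pod, the second time on a different node.
first_id = upsert_asset(conn, "k8s/pod/default/api-0", "node-a")
second_id = upsert_asset(conn, "k8s/pod/default/api-0", "node-b")
stored = conn.execute("SELECT host, seen_count FROM asset_inventory").fetchall()
print(first_id, second_id, stored)
```

The conflict branch updates the existing row in place, so the surrogate key never changes and downstream tables can keep referencing it.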

Expected to unlock:
  - asset_inventory: from 0 to several hundred rows (pods across all namespaces)
  - asset_discovery_run: 1 row per hour
  - asset_coverage_snapshot: one row per asset × 7 dimensions
  - automation_operation_log: new 'asset_discovered' op_type
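
The snapshot volume is a plain cross product of active assets and the seven coverage dimensions. A small sketch of that arithmetic follows; the dimension names come from the job itself, while `build_coverage_rows`, the run id, and the asset ids are illustrative assumptions.

```python
from itertools import product

COVERAGE_DIMENSIONS = (
    "auto_monitoring", "auto_alerting", "auto_rule_creation",
    "auto_rule_matching", "auto_playbook", "auto_remediation", "auto_km_creation",
)


def build_coverage_rows(run_id: str, active_asset_ids: list[int]) -> list[dict]:
    """Return one 'unknown' snapshot row per (asset, dimension) pair."""
    return [
        {
            "run_id": run_id,
            "asset_id": asset_id,
            "dimension": dimension,
            "coverage_status": "unknown",
        }
        for asset_id, dimension in product(active_asset_ids, COVERAGE_DIMENSIONS)
    ]


rows = build_coverage_rows("run-1", [101, 102, 103])
print(len(rows))  # 3 assets x 7 dimensions
```

At a few hundred pods this comes to a few thousand snapshot rows per hourly run, which may be worth keeping in mind when choosing a retention policy for the table.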

The next phase (rule_catalog / capacity / compliance scanners) will be submitted in batches once CI passes.

Refs: ADR-090 §4.1, MASTER §3.4 D4, project_blindspot_governance.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
OG T
2026-04-19 14:15:45 +08:00
parent b636d3b30b
commit ddb902f1ff
3 changed files with 503 additions and 2 deletions


@@ -159,8 +159,10 @@ jobs:
apt-get install -y -q postgresql-client
fi
# Find the network the act runner created for this task (Gitea act naming: GITEA-ACTIONS-TASK-XXX_*-network)
-ACT_NET=$(docker network ls --format '{{.Name}}' | grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' | head -1)
-echo "Detected act task network: ${ACT_NET:-<none, will fall back to bridge>}"
+# Note: the act runner uses 'bash -e -o pipefail'; when grep matches nothing, its exit 1 aborts the whole step
+# The || echo "" is required so that a grep miss leaves ACT_NET empty instead of aborting
+ACT_NET=$(docker network ls --format '{{.Name}}' 2>/dev/null | (grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' || echo "") | head -1)
+echo "Detected act task network: ${ACT_NET:-(none, will fall back to bridge)}"
# Start the test DB: join the act network, then connect via the container name 'pg-test-b5'
docker rm -f pg-test-b5 2>/dev/null || true
docker run -d --name pg-test-b5 \


@@ -0,0 +1,489 @@
"""
Asset Scanner Job: ADR-090 §4.1 asset inventory cron
================================================
Scans K8s every hour and writes asset_inventory + asset_discovery_run + asset_coverage_snapshot.

Responsibilities:
    ✅ List pods/deployments/services in all namespaces via the K8s API (shallow scan; the MVP in scan_once covers pods only)
    ✅ UPSERT asset_inventory (asset_key is UNIQUE)
    ✅ Write a 7-dimension asset_coverage_snapshot for every active asset (default: unknown; later services fill it in)
    ✅ Write automation_operation_log(asset_discovered) on completion
    ❌ Does not scan Prometheus targets / Gitea repos / Docker compose (left for the next phase)
    ❌ Does not compute capacity fields (left for capacity_scanner_job)

Design invariants (see ADR-090 §3.4):
    - The same asset keeps the same asset_id across runs (asset_key is the natural key)
    - An asset seen in the previous run but missing from this one → lifecycle_state='deprecated' + decommissioned_at
    - run_id links inventory and coverage_snapshot, giving a complete audit trail

Schedule:
    - Runs every 3600s (1 hour) by default; the first run is delayed 60s so other services can initialize
    - Started from the main.py lifespan via asyncio.create_task()

2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 monitoring blind-spot governance § Phase 7 Asset Inventory Foundation
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from typing import Any
import structlog
logger = structlog.get_logger(__name__)
# ============================================================================
# Scheduling parameters
# ============================================================================
_SCAN_INTERVAL_SEC = 3600  # every hour
_FIRST_DELAY_SEC = 60  # wait 60s after startup before the first scan (other services init)
_KUBECTL_TIMEOUT_SEC = 30
_LOOP_BACKOFF_SEC = 300  # back off 5 minutes after an error
# The 7 automation coverage dimensions (ADR-090 §3.5)
_COVERAGE_DIMENSIONS = (
    "auto_monitoring", "auto_alerting", "auto_rule_creation",
    "auto_rule_matching", "auto_playbook", "auto_remediation", "auto_km_creation",
)
# K8s resource kind → asset_type mapping
_K8S_RESOURCE_TO_ASSET_TYPE = {
    "Pod": "container",
    "Deployment": "k8s_workload",
    "StatefulSet": "k8s_workload",
    "DaemonSet": "k8s_workload",
    "Service": "k8s_resource",
    "ConfigMap": "k8s_resource",
    "Secret": "secret",
}
# ============================================================================
# Public entry: called from the main.py lifespan
# ============================================================================


async def run_asset_scanner_loop() -> None:
    """
    Run forever: perform one asset inventory scan every _SCAN_INTERVAL_SEC seconds.

    Error policy:
        - A single scan raising → back off 5 minutes and retry; never crash
        - 5 consecutive failures → write ai_governance_event (Phase 6 self-governance); TODO, not implemented yet
    """
    logger.info("asset_scanner_loop_started", interval_sec=_SCAN_INTERVAL_SEC)
    await asyncio.sleep(_FIRST_DELAY_SEC)
    while True:
        try:
            await scan_once(triggered_by="cron")
        except Exception as e:
            logger.exception("asset_scanner_loop_error", error=str(e))
            await asyncio.sleep(_LOOP_BACKOFF_SEC)
            continue
        await asyncio.sleep(_SCAN_INTERVAL_SEC)


async def scan_once(
    triggered_by: str = "cron",
    scope: tuple[str, ...] = ("k8s",),
    scan_depth: str = "shallow",
) -> str | None:
    """
    Run a single asset inventory scan.

    Args:
        triggered_by: cron / ai / human / incident
        scope: scope labels for this scan (written to asset_discovery_run.scope)
        scan_depth: shallow (list only) / deep (includes describe) / full

    Returns:
        run_id (UUID string), or None if writing the run header failed
    """
    # Monotonic clock in seconds; converted to milliseconds below
    started_at = _time.monotonic()
    run_id = await _start_discovery_run(triggered_by, list(scope), scan_depth)
    if not run_id:
        return None
    new_count = 0
    modified_count = 0
    total_count = 0
    error_msg: str | None = None
    try:
        # MVP: scan K8s pods (shallow); deployments / services / ... come later
        k8s_assets = await _collect_k8s_assets()
        total_count = len(k8s_assets)
        # UPSERT inventory
        new_count, modified_count = await _upsert_assets(k8s_assets, run_id)
        # Write 7-dimension coverage for every active asset
        # (default unknown; other services later upgrade it to green/yellow/red)
        await _write_coverage_snapshots(run_id)
    except Exception as e:
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg)
    duration_ms = int((_time.monotonic() - started_at) * 1000)
    final_status = "failed" if error_msg else "success"
    await _finish_discovery_run(
        run_id=run_id,
        status=final_status,
        total_assets=total_count,
        new_assets=new_count,
        modified_assets=modified_count,
        duration_ms=duration_ms,
        error=error_msg,
    )
    # ADR-090 § AOL audit trail: asset_discovered is a valid op_type
    await _log_aol_asset_discovered(
        run_id=run_id,
        triggered_by=triggered_by,
        total=total_count,
        new=new_count,
        modified=modified_count,
        duration_ms=duration_ms,
        status=final_status,
        error=error_msg,
    )
    logger.info(
        "asset_scan_once_done",
        run_id=run_id,
        status=final_status,
        total=total_count,
        new=new_count,
        modified=modified_count,
        duration_ms=duration_ms,
    )
    return run_id
# ============================================================================
# K8s asset collection
# ============================================================================


async def _collect_k8s_assets() -> list[dict[str, Any]]:
    """
    List pods in all namespaces via K8sProvider and convert them to the asset_inventory shape.

    Each returned item: {asset_key, asset_type, host, namespace, name, metadata, tags}
    """
    from src.plugins.mcp.providers.k8s_provider import K8sProvider

    provider = K8sProvider()
    result = await asyncio.wait_for(
        provider.execute(
            tool_name="kubectl_get",
            parameters={"namespace": "--all-namespaces", "resource": "pods"},
        ),
        timeout=_KUBECTL_TIMEOUT_SEC,
    )
    if not result.success:
        raise RuntimeError(f"kubectl get pods failed: {result.error}")
    raw = result.output
    # k8s_provider's _kubectl_get returns the stdout string (line 299)
    if isinstance(raw, str):
        try:
            payload = _json.loads(raw)
        except _json.JSONDecodeError as e:
            raise RuntimeError(f"kubectl JSON parse failed: {e}") from e
    elif isinstance(raw, dict):
        payload = raw
    else:
        raise RuntimeError(f"unexpected kubectl output type: {type(raw)}")
    items = payload.get("items", []) if isinstance(payload, dict) else []
    assets: list[dict[str, Any]] = []
    for item in items:
        meta = item.get("metadata", {}) or {}
        spec = item.get("spec", {}) or {}
        ns = meta.get("namespace") or "default"
        name = meta.get("name") or "unknown"
        node = spec.get("nodeName") or ""
        # asset_key is the UNIQUE natural key; the same row is reused across runs
        asset_key = f"k8s/pod/{ns}/{name}"
        asset_type = _K8S_RESOURCE_TO_ASSET_TYPE["Pod"]
        labels = meta.get("labels", {}) or {}
        tags: list[str] = []
        if labels.get("app"):
            tags.append(f"app:{labels['app']}")
        if labels.get("environment"):
            tags.append(f"env:{labels['environment']}")
        if labels.get("system"):
            tags.append(f"system:{labels['system']}")
        metadata_jsonb = {
            "owner_references": meta.get("ownerReferences", []),
            "labels": labels,
            "phase": (item.get("status", {}) or {}).get("phase", ""),
            "node": node,
        }
        assets.append({
            "asset_key": asset_key,
            "asset_type": asset_type,
            "host": node or None,
            "namespace": ns,
            "name": name,
            "metadata": metadata_jsonb,
            "tags": tags,
        })
    return assets
# ============================================================================
# DB writes
# ============================================================================


async def _start_discovery_run(
    triggered_by: str,
    scope: list[str],
    scan_depth: str,
) -> str | None:
    """
    Write the asset_discovery_run header (status='running') and return run_id.

    On failure → log a warning and return None; the caller silently skips this scan.
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        async with get_db_context() as db:
            row = await db.execute(
                _sql("""
                    INSERT INTO asset_discovery_run (
                        triggered_by, scope, scan_depth, status,
                        new_assets, modified_assets, disappeared_assets,
                        tools_used
                    ) VALUES (
                        :tb, :scope, :sd, 'running',
                        0, 0, 0,
                        CAST(:tools AS jsonb)
                    )
                    RETURNING run_id
                """),
                {
                    "tb": triggered_by,
                    "scope": scope,
                    "sd": scan_depth,
                    "tools": _json.dumps({"k8s": "kubectl_get pods --all-namespaces"}),
                },
            )
            run_id = row.scalar()
            return str(run_id) if run_id else None
    except Exception as e:
        logger.warning("asset_discovery_run_start_failed", error=str(e))
        return None
async def _finish_discovery_run(
    run_id: str,
    status: str,
    total_assets: int,
    new_assets: int,
    modified_assets: int,
    duration_ms: int,
    error: str | None,
) -> None:
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        async with get_db_context() as db:
            await db.execute(
                _sql("""
                    UPDATE asset_discovery_run
                    SET status = :st,
                        ended_at = NOW(),
                        total_assets = :total,
                        new_assets = :new,
                        modified_assets = :mod,
                        duration_ms = :dur,
                        error = :err
                    WHERE run_id = CAST(:rid AS uuid)
                """),
                {
                    "st": status,
                    "total": total_assets,
                    "new": new_assets,
                    "mod": modified_assets,
                    "dur": duration_ms,
                    "err": error,
                    "rid": run_id,
                },
            )
    except Exception as e:
        logger.warning("asset_discovery_run_finish_failed", run_id=run_id, error=str(e))
async def _upsert_assets(
    assets: list[dict[str, Any]],
    run_id: str,
) -> tuple[int, int]:
    """
    UPSERT into asset_inventory and return (new_count, modified_count).

    ON CONFLICT (asset_key) DO UPDATE keeps the operation idempotent.
    """
    if not assets:
        return 0, 0
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    new_count = 0
    modified_count = 0
    try:
        async with get_db_context() as db:
            for a in assets:
                # xmax = 0 means the row was INSERTed (new); otherwise it was UPDATEd (modified)
                row = await db.execute(
                    _sql("""
                        INSERT INTO asset_inventory (
                            asset_key, asset_type, host, namespace, name,
                            metadata, tags, environment, lifecycle_state,
                            first_seen_at, last_seen_at
                        ) VALUES (
                            :ak, :at, :host, :ns, :name,
                            CAST(:md AS jsonb), :tags, 'prod', 'active',
                            NOW(), NOW()
                        )
                        ON CONFLICT (asset_key) DO UPDATE
                        SET last_seen_at = NOW(),
                            host = EXCLUDED.host,
                            metadata = EXCLUDED.metadata,
                            tags = EXCLUDED.tags,
                            lifecycle_state = 'active',
                            updated_at = NOW(),
                            decommissioned_at = NULL
                        RETURNING asset_id, (xmax = 0) AS inserted
                    """),
                    {
                        "ak": a["asset_key"],
                        "at": a["asset_type"],
                        "host": a["host"],
                        "ns": a["namespace"],
                        "name": a["name"],
                        "md": _json.dumps(a["metadata"], ensure_ascii=False),
                        "tags": a["tags"],
                    },
                )
                _, inserted = row.one()
                if inserted:
                    new_count += 1
                else:
                    modified_count += 1
    except Exception as e:
        logger.exception("asset_upsert_failed", run_id=run_id, error=str(e))
        # Re-raise so scan_once records the run as 'failed' rather than 'success' with partial counts
        raise
    return new_count, modified_count
async def _write_coverage_snapshots(run_id: str) -> None:
    """
    Write a 7-dimension coverage_snapshot (default unknown) for every active asset in this run.

    Later services (rule_catalog / playbook_extractor / km_writer) UPDATE their own dimensions.
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        async with get_db_context() as db:
            # Single INSERT: every active asset × 7 dimensions
            await db.execute(
                _sql("""
                    INSERT INTO asset_coverage_snapshot (
                        run_id, asset_id, dimension, coverage_status,
                        evidence, detected_by
                    )
                    SELECT
                        CAST(:rid AS uuid),
                        ai.asset_id,
                        d.dimension,
                        'unknown' AS coverage_status,
                        '{}'::jsonb,
                        'asset_scanner' AS detected_by
                    FROM asset_inventory ai
                    CROSS JOIN (
                        VALUES ('auto_monitoring'),('auto_alerting'),('auto_rule_creation'),
                               ('auto_rule_matching'),('auto_playbook'),('auto_remediation'),
                               ('auto_km_creation')
                    ) AS d(dimension)
                    WHERE ai.lifecycle_state = 'active'
                    ON CONFLICT (run_id, asset_id, dimension) DO NOTHING
                """),
                {"rid": run_id},
            )
    except Exception as e:
        logger.warning("asset_coverage_write_failed", run_id=run_id, error=str(e))
async def _log_aol_asset_discovered(
    run_id: str,
    triggered_by: str,
    total: int,
    new: int,
    modified: int,
    duration_ms: int,
    status: str,
    error: str | None,
) -> None:
    """Write automation_operation_log(asset_discovered). Failures are logged only and never block."""
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        aol_status = "success" if status == "success" else "failed"
        input_payload = {
            "run_id": run_id,
            "triggered_by": triggered_by,
            "scope": ["k8s"],
            "scan_depth": "shallow",
        }
        output_payload = {
            "run_id": run_id,
            "total_assets": total,
            "new_assets": new,
            "modified_assets": modified,
        }
        async with get_db_context() as db:
            await db.execute(
                _sql("""
                    INSERT INTO automation_operation_log (
                        operation_type, actor, status,
                        input, output, run_id, duration_ms, error, tags
                    ) VALUES (
                        'asset_discovered',
                        'asset_scanner',
                        :st,
                        CAST(:input AS jsonb),
                        CAST(:output AS jsonb),
                        CAST(:rid AS uuid),
                        :dur, :err,
                        :tags
                    )
                """),
                {
                    "st": aol_status,
                    "input": _json.dumps(input_payload, ensure_ascii=False),
                    "output": _json.dumps(output_payload, ensure_ascii=False),
                    "rid": run_id,
                    "dur": duration_ms,
                    "err": error[:2000] if error else None,
                    "tags": ["asset_scanner", "discovery", "k8s"],
                },
            )
    except Exception as e:
        logger.warning("asset_scanner_aol_write_failed", run_id=run_id, error=str(e))
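
The module docstring lists a design rule that an asset seen in the previous run but absent from the current one becomes lifecycle_state='deprecated', while the code shown here counts only new and modified assets and leaves disappeared_assets at 0. As a rough sketch of how the disappeared set could be derived, the comparison is plain set arithmetic over asset_key values. The function name `diff_asset_keys` and the sample keys are hypothetical and not part of this commit.

```python
from __future__ import annotations


def diff_asset_keys(
    previous_active: set[str],
    current_seen: set[str],
) -> dict[str, set[str]]:
    """Classify asset keys by comparing the last known active set with this scan."""
    return {
        "new": current_seen - previous_active,
        "still_present": current_seen & previous_active,
        # Candidates for lifecycle_state='deprecated' + decommissioned_at
        "disappeared": previous_active - current_seen,
    }


previous = {"k8s/pod/default/api-0", "k8s/pod/default/worker-0"}
current = {"k8s/pod/default/api-0", "k8s/pod/default/api-1"}
delta = diff_asset_keys(previous, current)
print(delta)
```

Because pod names change on every rollout, a pod-level inventory would likely mark many rows as disappeared each run; that is a trade-off to weigh when this rule is implemented.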


@@ -370,6 +370,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    except Exception as e:
        logger.warning("incident_analysis_sweeper_schedule_failed", error=str(e))
    # ADR-090 § asset inventory cron (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
    # Every hour: scan K8s pods → write asset_inventory + asset_discovery_run + 7-dimension coverage
    # Unblocks the first of the 8 tables that have no writer (asset_inventory / asset_discovery_run / asset_coverage_snapshot)
    try:
        from src.jobs.asset_scanner_job import run_asset_scanner_loop
        _asset_scanner_task = asyncio.create_task(run_asset_scanner_loop())  # keep a reference so the task is not garbage-collected
        logger.info("asset_scanner_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("asset_scanner_loop_schedule_failed", error=str(e))
    # ADR-076 Task 4: automatic daily inspection report at 08:00 Taipei time
    # 2026-04-14 Claude Haiku 4.5 Asia/Taipei
    try:
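
The event loop keeps only weak references to tasks, so the Python documentation advises holding on to the result of asyncio.create_task() for fire-and-forget jobs like the scanner loop. One common pattern is sketched below under stated assumptions: `background_tasks`, `spawn`, and `ticker` are hypothetical names, and `ticker` merely stands in for run_asset_scanner_loop.

```python
from __future__ import annotations

import asyncio

# Strong references to every running background task.
background_tasks: set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    """Start a background task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def ticker(counter: list[int]) -> None:
    """Stand-in for a periodic job: increments a counter forever."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def main() -> int:
    counter = [0]
    task = spawn(ticker(counter))
    await asyncio.sleep(0.05)   # application "runs" for a moment
    task.cancel()               # shutdown: stop the background job
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0)      # give the done-callback a chance to run
    return counter[0]


ticks = asyncio.run(main())
print(ticks, len(background_tasks))
```

Cancelling the task on shutdown also gives the lifespan handler a clean place to wait for an in-flight scan, if that ever becomes necessary.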