feat(asset_scanner): v3 擴充多資源類型 + asset_relationship builder
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

Review 原本 MVP 只掃 pods (39 assets) 盲點,本次擴充:

新增資源類型掃描:
  - nodes (asset_type='host') — 實體主機
  - deployments/statefulsets/daemonsets (asset_type='k8s_workload')
  - services (asset_type='k8s_resource')
  - configmaps (asset_type='k8s_resource')
  跳過 secrets (awoooi-executor RBAC 禁止 list,正確設計)

新增 asset_relationship 自動建立:
  - Pod → Deployment/StatefulSet/DaemonSet (depends_on, via ownerReferences)
  - Service → Pod (routes_to, via spec.selector 匹配 Pod.labels)
  - Pod → ConfigMap (depends_on, via spec.volumes[].configMap.name)
  用 ON CONFLICT (from/to/type) DO UPDATE last_verified_at 保持 idempotent

新增 _fetch_kubectl_json helper (nodes 不帶 --all-namespaces)
新增 _build_{pod,workload,service,node,configmap}_asset 各自 asset 建構器

預期效果 (下次 scan 1h 後或 Pod 重啟時):
  - asset_inventory: 39 → 300+ (全集群多種資源)
  - asset_relationship: 0 → 數百 (OpenClaw 爆炸半徑計算終於有拓樸)

解鎖下游:
  - AI 計算 blast_radius 可查 asset_relationship (之前無資料)
  - MASTER §3.3 D3 Declarative Remediation 的 blast_radius_calculator 有真實依賴圖

Refs: ADR-090 §3.3, MASTER §3.1 L6×D1 (8D 感官拓樸)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 16:54:11 +08:00
parent c77ce63a32
commit fdf8b739f1

View File

@@ -112,16 +112,24 @@ async def scan_once(
error_msg: str | None = None error_msg: str | None = None
try: try:
# MVP: 掃 K8s pods (shallow), 後續加 deployments / services / ... # 2026-04-19 v3 擴充: 多資源類型掃描 + relationship 提取
k8s_assets = await _collect_k8s_assets() # 資源類型: pods (container), deployments/statefulsets/daemonsets (k8s_workload),
# services (k8s_resource), nodes (host), configmaps (k8s_resource)
# 跳過: secrets (awoooi-executor RBAC 不允許 list)
k8s_assets, relationships = await _collect_all_k8s_assets()
total_count = len(k8s_assets) total_count = len(k8s_assets)
# UPSERT inventory # UPSERT inventory
new_count, modified_count = await _upsert_assets(k8s_assets, run_id) new_count, modified_count = await _upsert_assets(k8s_assets, run_id)
# 建立 asset_relationship (OwnerReference + Service selector + Pod volumes)
rel_written = await _upsert_relationships(relationships)
# 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red) # 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red)
await _write_coverage_snapshots(run_id) await _write_coverage_snapshots(run_id)
logger.info("asset_scan_relationships_written", run_id=run_id, relationships=rel_written)
except Exception as e: except Exception as e:
error_msg = f"{type(e).__name__}: {e}"[:1000] error_msg = f"{type(e).__name__}: {e}"[:1000]
logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg) logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg)
@@ -167,19 +175,17 @@ async def scan_once(
# K8s 資產收集 # K8s 資產收集
# ============================================================================ # ============================================================================
async def _collect_k8s_assets() -> list[dict[str, Any]]: async def _fetch_kubectl_json(resource: str, all_namespaces: bool = True) -> dict[str, Any]:
""" """
直接 subprocess 執行 kubectl get pods --all-namespaces. subprocess 執行 kubectl get <resource> --all-namespaces -o json (或 nodes 不帶 ns).
回傳 parse 後的 payload dict ({'items': [...]}).
2026-04-19 ogt + Claude Opus 4.7 v2: 不走 K8sProvider._kubectl_get,
因為它把 namespace 參數當 -n 旗標,無法處理 '--all-namespaces'.
直接 subprocess 最可靠 + Pod 內 /usr/local/bin/kubectl 已確認可用.
回傳每筆: {asset_key, asset_type, host, namespace, name, metadata, tags}
""" """
cmd = ["kubectl", "get", resource, "-o", "json"]
if all_namespaces:
cmd.insert(3, "--all-namespaces")
proc = await asyncio.wait_for( proc = await asyncio.wait_for(
asyncio.create_subprocess_exec( asyncio.create_subprocess_exec(
"kubectl", "get", "pods", "--all-namespaces", "-o", "json", *cmd,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
), ),
@@ -187,52 +193,257 @@ async def _collect_k8s_assets() -> list[dict[str, Any]]:
) )
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_KUBECTL_TIMEOUT_SEC) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_KUBECTL_TIMEOUT_SEC)
if proc.returncode != 0: if proc.returncode != 0:
raise RuntimeError(f"kubectl failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:500]}") raise RuntimeError(f"kubectl {resource} failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:300]}")
try: try:
payload = _json.loads(stdout.decode("utf-8", errors="replace")) return _json.loads(stdout.decode("utf-8", errors="replace"))
except _json.JSONDecodeError as e: except _json.JSONDecodeError as e:
raise RuntimeError(f"kubectl JSON parse failed: {e}") from e raise RuntimeError(f"kubectl {resource} JSON parse failed: {e}") from e
items = payload.get("items", []) if isinstance(payload, dict) else []
assets: list[dict[str, Any]] = []
for item in items:
meta = item.get("metadata", {}) or {}
spec = item.get("spec", {}) or {}
ns = meta.get("namespace") or "default"
name = meta.get("name") or "unknown"
node = spec.get("nodeName") or ""
# asset_key 為 UNIQUE 自然鍵,跨 run 沿用同一筆 def _build_pod_asset(item: dict[str, Any]) -> dict[str, Any]:
asset_key = f"k8s/pod/{ns}/{name}" """Pod → asset_inventory row (asset_type='container')."""
asset_type = "container" meta = item.get("metadata", {}) or {}
spec = item.get("spec", {}) or {}
ns = meta.get("namespace") or "default"
name = meta.get("name") or "unknown"
node = spec.get("nodeName") or ""
labels = meta.get("labels", {}) or {}
labels = meta.get("labels", {}) or {} tags = []
tags = [] for k in ("app", "environment", "system"):
if labels.get("app"): if labels.get(k):
tags.append(f"app:{labels['app']}") tags.append(f"{k if k != 'environment' else 'env'}:{labels[k]}")
if labels.get("environment"):
tags.append(f"env:{labels['environment']}")
if labels.get("system"):
tags.append(f"system:{labels['system']}")
metadata_jsonb = { return {
"asset_key": f"k8s/pod/{ns}/{name}",
"asset_type": "container",
"host": node or None,
"namespace": ns,
"name": name,
"metadata": {
"owner_references": meta.get("ownerReferences", []), "owner_references": meta.get("ownerReferences", []),
"labels": labels, "labels": labels,
"phase": (item.get("status", {}) or {}).get("phase", ""), "phase": (item.get("status", {}) or {}).get("phase", ""),
"node": node, "node": node,
} "volumes": [
{
"name": v.get("name"),
"configMap": v.get("configMap", {}).get("name"),
"secret": v.get("secret", {}).get("secretName"),
}
for v in (spec.get("volumes") or [])
if v.get("configMap") or v.get("secret")
],
},
"tags": tags,
}
assets.append({
"asset_key": asset_key, def _build_workload_asset(item: dict[str, Any], kind: str) -> dict[str, Any]:
"asset_type": asset_type, """Deployment/StatefulSet/DaemonSet → asset_inventory row (asset_type='k8s_workload')."""
"host": node or None, meta = item.get("metadata", {}) or {}
"namespace": ns, ns = meta.get("namespace") or "default"
"name": name, name = meta.get("name") or "unknown"
"metadata": metadata_jsonb, labels = meta.get("labels", {}) or {}
"tags": tags, spec = item.get("spec", {}) or {}
}) status = item.get("status", {}) or {}
return assets
return {
"asset_key": f"k8s/{kind.lower()}/{ns}/{name}",
"asset_type": "k8s_workload",
"host": None,
"namespace": ns,
"name": name,
"metadata": {
"kind": kind,
"labels": labels,
"replicas": spec.get("replicas"),
"ready_replicas": status.get("readyReplicas"),
"selector": (spec.get("selector", {}) or {}).get("matchLabels", {}),
},
"tags": [f"kind:{kind}"] + [f"app:{labels['app']}"] if labels.get("app") else [f"kind:{kind}"],
}
def _build_service_asset(item: dict[str, Any]) -> dict[str, Any]:
"""Service → asset_inventory row (asset_type='k8s_resource')."""
meta = item.get("metadata", {}) or {}
ns = meta.get("namespace") or "default"
name = meta.get("name") or "unknown"
spec = item.get("spec", {}) or {}
return {
"asset_key": f"k8s/service/{ns}/{name}",
"asset_type": "k8s_resource",
"host": None,
"namespace": ns,
"name": name,
"metadata": {
"kind": "Service",
"type": spec.get("type"),
"cluster_ip": spec.get("clusterIP"),
"selector": spec.get("selector", {}),
"ports": spec.get("ports", []),
},
"tags": [f"svc_type:{spec.get('type', '')}"],
}
def _build_node_asset(item: dict[str, Any]) -> dict[str, Any]:
"""Node → asset_inventory row (asset_type='host')."""
meta = item.get("metadata", {}) or {}
name = meta.get("name") or "unknown"
labels = meta.get("labels", {}) or {}
status = item.get("status", {}) or {}
addresses = status.get("addresses", []) or []
internal_ip = next((a["address"] for a in addresses if a.get("type") == "InternalIP"), "")
return {
"asset_key": f"k8s/node/{name}",
"asset_type": "host",
"host": name,
"namespace": None,
"name": name,
"metadata": {
"kind": "Node",
"internal_ip": internal_ip,
"labels": labels,
"capacity": status.get("capacity", {}),
"conditions": [
{"type": c.get("type"), "status": c.get("status")}
for c in status.get("conditions", [])
],
},
"tags": [f"role:{labels.get('kubernetes.io/role', 'worker')}"],
}
def _build_configmap_asset(item: dict[str, Any]) -> dict[str, Any]:
"""ConfigMap → asset_inventory row (asset_type='k8s_resource')."""
meta = item.get("metadata", {}) or {}
ns = meta.get("namespace") or "default"
name = meta.get("name") or "unknown"
return {
"asset_key": f"k8s/configmap/{ns}/{name}",
"asset_type": "k8s_resource",
"host": None,
"namespace": ns,
"name": name,
"metadata": {
"kind": "ConfigMap",
"labels": meta.get("labels", {}),
"keys": list((item.get("data") or {}).keys()),
"creationTimestamp": meta.get("creationTimestamp"),
},
"tags": ["kind:ConfigMap"],
}
async def _collect_all_k8s_assets() -> tuple[list[dict[str, Any]], list[dict[str, str]]]:
"""
掃多種 K8s 資源類型 + 提取 relationship.
Relationships:
- Pod ─ depends_on ─> Deployment/StatefulSet/DaemonSet (via ownerReferences)
- Service ─ routes_to ─> Pod (via spec.selector 匹配 Pod.labels)
- Pod ─ depends_on ─> ConfigMap (via spec.volumes[].configMap.name)
回傳 (assets, relationships) tuple.
relationships: [{'from_key': ..., 'to_key': ..., 'relationship_type': ...}, ...]
"""
assets: list[dict[str, Any]] = []
relationships: list[dict[str, str]] = []
# 1. Nodes (不帶 ns)
try:
payload = await _fetch_kubectl_json("nodes", all_namespaces=False)
for item in payload.get("items", []) or []:
assets.append(_build_node_asset(item))
except Exception as e:
logger.warning("collect_nodes_failed", error=str(e))
# 2. Pods — 主體 + 從 ownerReferences 建 relationship
pod_by_key: dict[str, dict[str, Any]] = {} # asset_key -> pod item (for Service selector match)
try:
payload = await _fetch_kubectl_json("pods")
for item in payload.get("items", []) or []:
a = _build_pod_asset(item)
assets.append(a)
pod_by_key[a["asset_key"]] = item
# OwnerReference → relationship (Pod ReplicaSet → Deployment 通常是兩層)
meta = item.get("metadata", {}) or {}
ns = meta.get("namespace") or "default"
for ref in meta.get("ownerReferences", []) or []:
owner_kind = ref.get("kind", "").lower()
owner_name = ref.get("name", "")
if not owner_name:
continue
# 跳過 ReplicaSet (中介層),直接找到真正的 Deployment 在後續步驟補
if owner_kind in ("deployment", "statefulset", "daemonset"):
relationships.append({
"from_key": a["asset_key"],
"to_key": f"k8s/{owner_kind}/{ns}/{owner_name}",
"relationship_type": "depends_on",
})
# Pod volumes → ConfigMap relationship
for v in (item.get("spec", {}) or {}).get("volumes", []) or []:
cm = (v.get("configMap") or {}).get("name")
if cm:
relationships.append({
"from_key": a["asset_key"],
"to_key": f"k8s/configmap/{ns}/{cm}",
"relationship_type": "depends_on",
})
except Exception as e:
logger.warning("collect_pods_failed", error=str(e))
# 3. Deployments / StatefulSets / DaemonSets
for kind, resource in (("Deployment", "deployments"), ("StatefulSet", "statefulsets"), ("DaemonSet", "daemonsets")):
try:
payload = await _fetch_kubectl_json(resource)
for item in payload.get("items", []) or []:
assets.append(_build_workload_asset(item, kind))
except Exception as e:
logger.warning(f"collect_{resource}_failed", error=str(e))
# 4. Services + routes_to Pod (via selector match)
try:
payload = await _fetch_kubectl_json("services")
for item in payload.get("items", []) or []:
svc = _build_service_asset(item)
assets.append(svc)
# 為該 Service 找出匹配的 Pod
selector = (item.get("spec", {}) or {}).get("selector") or {}
if not selector:
continue
svc_ns = (item.get("metadata", {}) or {}).get("namespace") or "default"
for pod_key, pod_item in pod_by_key.items():
if not pod_key.startswith(f"k8s/pod/{svc_ns}/"):
continue
pod_labels = (pod_item.get("metadata", {}) or {}).get("labels", {}) or {}
# selector 所有 kv 必須都在 pod labels 內
if all(pod_labels.get(k) == v for k, v in selector.items()):
relationships.append({
"from_key": svc["asset_key"],
"to_key": pod_key,
"relationship_type": "routes_to",
})
except Exception as e:
logger.warning("collect_services_failed", error=str(e))
# 5. ConfigMaps
try:
payload = await _fetch_kubectl_json("configmaps")
for item in payload.get("items", []) or []:
assets.append(_build_configmap_asset(item))
except Exception as e:
logger.warning("collect_configmaps_failed", error=str(e))
return assets, relationships
# ============================================================================ # ============================================================================
@@ -385,6 +596,54 @@ async def _upsert_assets(
return new_count, modified_count return new_count, modified_count
async def _upsert_relationships(relationships: list[dict[str, str]]) -> int:
"""
UPSERT asset_relationship (from_asset_id/to_asset_id/relationship_type 為 UNIQUE).
用 asset_key 查 asset_id 後 INSERT,忽略 asset 不存在的 relationship.
回傳實際寫入 (新建/更新) 筆數.
"""
if not relationships:
return 0
from sqlalchemy import text as _sql
from src.db.base import get_db_context
written = 0
try:
async with get_db_context() as db:
for rel in relationships:
try:
# 用 asset_key → asset_id 解析,同時 UPSERT
await db.execute(
_sql("""
INSERT INTO asset_relationship (
from_asset_id, to_asset_id, relationship_type,
first_detected_at, last_verified_at, is_active
)
SELECT a1.asset_id, a2.asset_id, :rt, NOW(), NOW(), true
FROM asset_inventory a1, asset_inventory a2
WHERE a1.asset_key = :from_key AND a2.asset_key = :to_key
AND a1.asset_id <> a2.asset_id
ON CONFLICT (from_asset_id, to_asset_id, relationship_type) DO UPDATE
SET last_verified_at = NOW(),
is_active = true
"""),
{
"from_key": rel["from_key"],
"to_key": rel["to_key"],
"rt": rel["relationship_type"],
},
)
written += 1
except Exception as e:
logger.debug("relationship_upsert_skipped",
from_key=rel["from_key"], to_key=rel["to_key"], error=str(e))
except Exception as e:
logger.warning("relationship_upsert_failed", error=str(e))
return written
async def _write_coverage_snapshots(run_id: str) -> None: async def _write_coverage_snapshots(run_id: str) -> None:
""" """
為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。 為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。