feat(asset_scanner): v3 擴充多資源類型 + asset_relationship builder
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Review 原本 MVP 只掃 pods (39 assets) 盲點,本次擴充:
新增資源類型掃描:
- nodes (asset_type='host') — 實體主機
- deployments/statefulsets/daemonsets (asset_type='k8s_workload')
- services (asset_type='k8s_resource')
- configmaps (asset_type='k8s_resource')
跳過 secrets (awoooi-executor RBAC 禁止 list,正確設計)
新增 asset_relationship 自動建立:
- Pod → Deployment/StatefulSet/DaemonSet (depends_on, via ownerReferences)
- Service → Pod (routes_to, via spec.selector 匹配 Pod.labels)
- Pod → ConfigMap (depends_on, via spec.volumes[].configMap.name)
用 ON CONFLICT (from/to/type) DO UPDATE last_verified_at 保持 idempotent
新增 _fetch_kubectl_json helper (nodes 不帶 --all-namespaces)
新增 _build_{pod,workload,service,node,configmap}_asset 各自 asset 建構器
預期效果 (下次 scan 1h 後或 Pod 重啟時):
- asset_inventory: 39 → 300+ (全集群多種資源)
- asset_relationship: 0 → 數百 (OpenClaw 爆炸半徑計算終於有拓樸)
解鎖下游:
- AI 計算 blast_radius 可查 asset_relationship (之前無資料)
- MASTER §3.3 D3 Declarative Remediation 的 blast_radius_calculator 有真實依賴圖
Refs: ADR-090 §3.3, MASTER §3.1 L6×D1 (8D 感官拓樸)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -112,16 +112,24 @@ async def scan_once(
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
# MVP: 掃 K8s pods (shallow), 後續加 deployments / services / ...
|
||||
k8s_assets = await _collect_k8s_assets()
|
||||
# 2026-04-19 v3 擴充: 多資源類型掃描 + relationship 提取
|
||||
# 資源類型: pods (container), deployments/statefulsets/daemonsets (k8s_workload),
|
||||
# services (k8s_resource), nodes (host), configmaps (k8s_resource)
|
||||
# 跳過: secrets (awoooi-executor RBAC 不允許 list)
|
||||
k8s_assets, relationships = await _collect_all_k8s_assets()
|
||||
total_count = len(k8s_assets)
|
||||
|
||||
# UPSERT inventory
|
||||
new_count, modified_count = await _upsert_assets(k8s_assets, run_id)
|
||||
|
||||
# 建立 asset_relationship (OwnerReference + Service selector + Pod volumes)
|
||||
rel_written = await _upsert_relationships(relationships)
|
||||
|
||||
# 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red)
|
||||
await _write_coverage_snapshots(run_id)
|
||||
|
||||
logger.info("asset_scan_relationships_written", run_id=run_id, relationships=rel_written)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg)
|
||||
@@ -167,19 +175,17 @@ async def scan_once(
|
||||
# K8s 資產收集
|
||||
# ============================================================================
|
||||
|
||||
async def _collect_k8s_assets() -> list[dict[str, Any]]:
|
||||
async def _fetch_kubectl_json(resource: str, all_namespaces: bool = True) -> dict[str, Any]:
|
||||
"""
|
||||
直接 subprocess 執行 kubectl get pods --all-namespaces.
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 v2: 不走 K8sProvider._kubectl_get,
|
||||
因為它把 namespace 參數當 -n 旗標,無法處理 '--all-namespaces'.
|
||||
直接 subprocess 最可靠 + Pod 內 /usr/local/bin/kubectl 已確認可用.
|
||||
|
||||
回傳每筆: {asset_key, asset_type, host, namespace, name, metadata, tags}
|
||||
subprocess 執行 kubectl get <resource> --all-namespaces -o json (或 nodes 不帶 ns).
|
||||
回傳 parse 後的 payload dict ({'items': [...]}).
|
||||
"""
|
||||
cmd = ["kubectl", "get", resource, "-o", "json"]
|
||||
if all_namespaces:
|
||||
cmd.insert(3, "--all-namespaces")
|
||||
proc = await asyncio.wait_for(
|
||||
asyncio.create_subprocess_exec(
|
||||
"kubectl", "get", "pods", "--all-namespaces", "-o", "json",
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
),
|
||||
@@ -187,52 +193,257 @@ async def _collect_k8s_assets() -> list[dict[str, Any]]:
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_KUBECTL_TIMEOUT_SEC)
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"kubectl failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:500]}")
|
||||
|
||||
raise RuntimeError(f"kubectl {resource} failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:300]}")
|
||||
try:
|
||||
payload = _json.loads(stdout.decode("utf-8", errors="replace"))
|
||||
return _json.loads(stdout.decode("utf-8", errors="replace"))
|
||||
except _json.JSONDecodeError as e:
|
||||
raise RuntimeError(f"kubectl JSON parse failed: {e}") from e
|
||||
raise RuntimeError(f"kubectl {resource} JSON parse failed: {e}") from e
|
||||
|
||||
items = payload.get("items", []) if isinstance(payload, dict) else []
|
||||
assets: list[dict[str, Any]] = []
|
||||
for item in items:
|
||||
meta = item.get("metadata", {}) or {}
|
||||
spec = item.get("spec", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
node = spec.get("nodeName") or ""
|
||||
|
||||
# asset_key 為 UNIQUE 自然鍵,跨 run 沿用同一筆
|
||||
asset_key = f"k8s/pod/{ns}/{name}"
|
||||
asset_type = "container"
|
||||
def _build_pod_asset(item: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Pod → asset_inventory row (asset_type='container')."""
|
||||
meta = item.get("metadata", {}) or {}
|
||||
spec = item.get("spec", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
node = spec.get("nodeName") or ""
|
||||
labels = meta.get("labels", {}) or {}
|
||||
|
||||
labels = meta.get("labels", {}) or {}
|
||||
tags = []
|
||||
if labels.get("app"):
|
||||
tags.append(f"app:{labels['app']}")
|
||||
if labels.get("environment"):
|
||||
tags.append(f"env:{labels['environment']}")
|
||||
if labels.get("system"):
|
||||
tags.append(f"system:{labels['system']}")
|
||||
tags = []
|
||||
for k in ("app", "environment", "system"):
|
||||
if labels.get(k):
|
||||
tags.append(f"{k if k != 'environment' else 'env'}:{labels[k]}")
|
||||
|
||||
metadata_jsonb = {
|
||||
return {
|
||||
"asset_key": f"k8s/pod/{ns}/{name}",
|
||||
"asset_type": "container",
|
||||
"host": node or None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": {
|
||||
"owner_references": meta.get("ownerReferences", []),
|
||||
"labels": labels,
|
||||
"phase": (item.get("status", {}) or {}).get("phase", ""),
|
||||
"node": node,
|
||||
}
|
||||
"volumes": [
|
||||
{
|
||||
"name": v.get("name"),
|
||||
"configMap": v.get("configMap", {}).get("name"),
|
||||
"secret": v.get("secret", {}).get("secretName"),
|
||||
}
|
||||
for v in (spec.get("volumes") or [])
|
||||
if v.get("configMap") or v.get("secret")
|
||||
],
|
||||
},
|
||||
"tags": tags,
|
||||
}
|
||||
|
||||
assets.append({
|
||||
"asset_key": asset_key,
|
||||
"asset_type": asset_type,
|
||||
"host": node or None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": metadata_jsonb,
|
||||
"tags": tags,
|
||||
})
|
||||
return assets
|
||||
|
||||
def _build_workload_asset(item: dict[str, Any], kind: str) -> dict[str, Any]:
|
||||
"""Deployment/StatefulSet/DaemonSet → asset_inventory row (asset_type='k8s_workload')."""
|
||||
meta = item.get("metadata", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
labels = meta.get("labels", {}) or {}
|
||||
spec = item.get("spec", {}) or {}
|
||||
status = item.get("status", {}) or {}
|
||||
|
||||
return {
|
||||
"asset_key": f"k8s/{kind.lower()}/{ns}/{name}",
|
||||
"asset_type": "k8s_workload",
|
||||
"host": None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": {
|
||||
"kind": kind,
|
||||
"labels": labels,
|
||||
"replicas": spec.get("replicas"),
|
||||
"ready_replicas": status.get("readyReplicas"),
|
||||
"selector": (spec.get("selector", {}) or {}).get("matchLabels", {}),
|
||||
},
|
||||
"tags": [f"kind:{kind}"] + [f"app:{labels['app']}"] if labels.get("app") else [f"kind:{kind}"],
|
||||
}
|
||||
|
||||
|
||||
def _build_service_asset(item: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Service → asset_inventory row (asset_type='k8s_resource')."""
|
||||
meta = item.get("metadata", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
spec = item.get("spec", {}) or {}
|
||||
|
||||
return {
|
||||
"asset_key": f"k8s/service/{ns}/{name}",
|
||||
"asset_type": "k8s_resource",
|
||||
"host": None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": {
|
||||
"kind": "Service",
|
||||
"type": spec.get("type"),
|
||||
"cluster_ip": spec.get("clusterIP"),
|
||||
"selector": spec.get("selector", {}),
|
||||
"ports": spec.get("ports", []),
|
||||
},
|
||||
"tags": [f"svc_type:{spec.get('type', '')}"],
|
||||
}
|
||||
|
||||
|
||||
def _build_node_asset(item: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Node → asset_inventory row (asset_type='host')."""
|
||||
meta = item.get("metadata", {}) or {}
|
||||
name = meta.get("name") or "unknown"
|
||||
labels = meta.get("labels", {}) or {}
|
||||
status = item.get("status", {}) or {}
|
||||
addresses = status.get("addresses", []) or []
|
||||
internal_ip = next((a["address"] for a in addresses if a.get("type") == "InternalIP"), "")
|
||||
|
||||
return {
|
||||
"asset_key": f"k8s/node/{name}",
|
||||
"asset_type": "host",
|
||||
"host": name,
|
||||
"namespace": None,
|
||||
"name": name,
|
||||
"metadata": {
|
||||
"kind": "Node",
|
||||
"internal_ip": internal_ip,
|
||||
"labels": labels,
|
||||
"capacity": status.get("capacity", {}),
|
||||
"conditions": [
|
||||
{"type": c.get("type"), "status": c.get("status")}
|
||||
for c in status.get("conditions", [])
|
||||
],
|
||||
},
|
||||
"tags": [f"role:{labels.get('kubernetes.io/role', 'worker')}"],
|
||||
}
|
||||
|
||||
|
||||
def _build_configmap_asset(item: dict[str, Any]) -> dict[str, Any]:
|
||||
"""ConfigMap → asset_inventory row (asset_type='k8s_resource')."""
|
||||
meta = item.get("metadata", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
name = meta.get("name") or "unknown"
|
||||
|
||||
return {
|
||||
"asset_key": f"k8s/configmap/{ns}/{name}",
|
||||
"asset_type": "k8s_resource",
|
||||
"host": None,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"metadata": {
|
||||
"kind": "ConfigMap",
|
||||
"labels": meta.get("labels", {}),
|
||||
"keys": list((item.get("data") or {}).keys()),
|
||||
"creationTimestamp": meta.get("creationTimestamp"),
|
||||
},
|
||||
"tags": ["kind:ConfigMap"],
|
||||
}
|
||||
|
||||
|
||||
async def _collect_all_k8s_assets() -> tuple[list[dict[str, Any]], list[dict[str, str]]]:
|
||||
"""
|
||||
掃多種 K8s 資源類型 + 提取 relationship.
|
||||
|
||||
Relationships:
|
||||
- Pod ─ depends_on ─> Deployment/StatefulSet/DaemonSet (via ownerReferences)
|
||||
- Service ─ routes_to ─> Pod (via spec.selector 匹配 Pod.labels)
|
||||
- Pod ─ depends_on ─> ConfigMap (via spec.volumes[].configMap.name)
|
||||
|
||||
回傳 (assets, relationships) tuple.
|
||||
relationships: [{'from_key': ..., 'to_key': ..., 'relationship_type': ...}, ...]
|
||||
"""
|
||||
assets: list[dict[str, Any]] = []
|
||||
relationships: list[dict[str, str]] = []
|
||||
|
||||
# 1. Nodes (不帶 ns)
|
||||
try:
|
||||
payload = await _fetch_kubectl_json("nodes", all_namespaces=False)
|
||||
for item in payload.get("items", []) or []:
|
||||
assets.append(_build_node_asset(item))
|
||||
except Exception as e:
|
||||
logger.warning("collect_nodes_failed", error=str(e))
|
||||
|
||||
# 2. Pods — 主體 + 從 ownerReferences 建 relationship
|
||||
pod_by_key: dict[str, dict[str, Any]] = {} # asset_key -> pod item (for Service selector match)
|
||||
try:
|
||||
payload = await _fetch_kubectl_json("pods")
|
||||
for item in payload.get("items", []) or []:
|
||||
a = _build_pod_asset(item)
|
||||
assets.append(a)
|
||||
pod_by_key[a["asset_key"]] = item
|
||||
|
||||
# OwnerReference → relationship (Pod ReplicaSet → Deployment 通常是兩層)
|
||||
meta = item.get("metadata", {}) or {}
|
||||
ns = meta.get("namespace") or "default"
|
||||
for ref in meta.get("ownerReferences", []) or []:
|
||||
owner_kind = ref.get("kind", "").lower()
|
||||
owner_name = ref.get("name", "")
|
||||
if not owner_name:
|
||||
continue
|
||||
# 跳過 ReplicaSet (中介層),直接找到真正的 Deployment 在後續步驟補
|
||||
if owner_kind in ("deployment", "statefulset", "daemonset"):
|
||||
relationships.append({
|
||||
"from_key": a["asset_key"],
|
||||
"to_key": f"k8s/{owner_kind}/{ns}/{owner_name}",
|
||||
"relationship_type": "depends_on",
|
||||
})
|
||||
|
||||
# Pod volumes → ConfigMap relationship
|
||||
for v in (item.get("spec", {}) or {}).get("volumes", []) or []:
|
||||
cm = (v.get("configMap") or {}).get("name")
|
||||
if cm:
|
||||
relationships.append({
|
||||
"from_key": a["asset_key"],
|
||||
"to_key": f"k8s/configmap/{ns}/{cm}",
|
||||
"relationship_type": "depends_on",
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning("collect_pods_failed", error=str(e))
|
||||
|
||||
# 3. Deployments / StatefulSets / DaemonSets
|
||||
for kind, resource in (("Deployment", "deployments"), ("StatefulSet", "statefulsets"), ("DaemonSet", "daemonsets")):
|
||||
try:
|
||||
payload = await _fetch_kubectl_json(resource)
|
||||
for item in payload.get("items", []) or []:
|
||||
assets.append(_build_workload_asset(item, kind))
|
||||
except Exception as e:
|
||||
logger.warning(f"collect_{resource}_failed", error=str(e))
|
||||
|
||||
# 4. Services + routes_to Pod (via selector match)
|
||||
try:
|
||||
payload = await _fetch_kubectl_json("services")
|
||||
for item in payload.get("items", []) or []:
|
||||
svc = _build_service_asset(item)
|
||||
assets.append(svc)
|
||||
|
||||
# 為該 Service 找出匹配的 Pod
|
||||
selector = (item.get("spec", {}) or {}).get("selector") or {}
|
||||
if not selector:
|
||||
continue
|
||||
svc_ns = (item.get("metadata", {}) or {}).get("namespace") or "default"
|
||||
for pod_key, pod_item in pod_by_key.items():
|
||||
if not pod_key.startswith(f"k8s/pod/{svc_ns}/"):
|
||||
continue
|
||||
pod_labels = (pod_item.get("metadata", {}) or {}).get("labels", {}) or {}
|
||||
# selector 所有 kv 必須都在 pod labels 內
|
||||
if all(pod_labels.get(k) == v for k, v in selector.items()):
|
||||
relationships.append({
|
||||
"from_key": svc["asset_key"],
|
||||
"to_key": pod_key,
|
||||
"relationship_type": "routes_to",
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning("collect_services_failed", error=str(e))
|
||||
|
||||
# 5. ConfigMaps
|
||||
try:
|
||||
payload = await _fetch_kubectl_json("configmaps")
|
||||
for item in payload.get("items", []) or []:
|
||||
assets.append(_build_configmap_asset(item))
|
||||
except Exception as e:
|
||||
logger.warning("collect_configmaps_failed", error=str(e))
|
||||
|
||||
return assets, relationships
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -385,6 +596,54 @@ async def _upsert_assets(
|
||||
return new_count, modified_count
|
||||
|
||||
|
||||
async def _upsert_relationships(relationships: list[dict[str, str]]) -> int:
|
||||
"""
|
||||
UPSERT asset_relationship (from_asset_id/to_asset_id/relationship_type 為 UNIQUE).
|
||||
|
||||
用 asset_key 查 asset_id 後 INSERT,忽略 asset 不存在的 relationship.
|
||||
回傳實際寫入 (新建/更新) 筆數.
|
||||
"""
|
||||
if not relationships:
|
||||
return 0
|
||||
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
written = 0
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
for rel in relationships:
|
||||
try:
|
||||
# 用 asset_key → asset_id 解析,同時 UPSERT
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO asset_relationship (
|
||||
from_asset_id, to_asset_id, relationship_type,
|
||||
first_detected_at, last_verified_at, is_active
|
||||
)
|
||||
SELECT a1.asset_id, a2.asset_id, :rt, NOW(), NOW(), true
|
||||
FROM asset_inventory a1, asset_inventory a2
|
||||
WHERE a1.asset_key = :from_key AND a2.asset_key = :to_key
|
||||
AND a1.asset_id <> a2.asset_id
|
||||
ON CONFLICT (from_asset_id, to_asset_id, relationship_type) DO UPDATE
|
||||
SET last_verified_at = NOW(),
|
||||
is_active = true
|
||||
"""),
|
||||
{
|
||||
"from_key": rel["from_key"],
|
||||
"to_key": rel["to_key"],
|
||||
"rt": rel["relationship_type"],
|
||||
},
|
||||
)
|
||||
written += 1
|
||||
except Exception as e:
|
||||
logger.debug("relationship_upsert_skipped",
|
||||
from_key=rel["from_key"], to_key=rel["to_key"], error=str(e))
|
||||
except Exception as e:
|
||||
logger.warning("relationship_upsert_failed", error=str(e))
|
||||
return written
|
||||
|
||||
|
||||
async def _write_coverage_snapshots(run_id: str) -> None:
|
||||
"""
|
||||
為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。
|
||||
|
||||
Reference in New Issue
Block a user