diff --git a/apps/api/src/jobs/asset_scanner_job.py b/apps/api/src/jobs/asset_scanner_job.py index 320018d7..2aa88ffc 100644 --- a/apps/api/src/jobs/asset_scanner_job.py +++ b/apps/api/src/jobs/asset_scanner_job.py @@ -112,16 +112,24 @@ async def scan_once( error_msg: str | None = None try: - # MVP: 掃 K8s pods (shallow), 後續加 deployments / services / ... - k8s_assets = await _collect_k8s_assets() + # 2026-04-19 v3 擴充: 多資源類型掃描 + relationship 提取 + # 資源類型: pods (container), deployments/statefulsets/daemonsets (k8s_workload), + # services (k8s_resource), nodes (host), configmaps (k8s_resource) + # 跳過: secrets (awoooi-executor RBAC 不允許 list) + k8s_assets, relationships = await _collect_all_k8s_assets() total_count = len(k8s_assets) # UPSERT inventory new_count, modified_count = await _upsert_assets(k8s_assets, run_id) + # 建立 asset_relationship (OwnerReference + Service selector + Pod volumes) + rel_written = await _upsert_relationships(relationships) + # 為每個 active asset 寫 7 維 coverage (預設 unknown,後續其他 service 升級為 green/yellow/red) await _write_coverage_snapshots(run_id) + logger.info("asset_scan_relationships_written", run_id=run_id, relationships=rel_written) + except Exception as e: error_msg = f"{type(e).__name__}: {e}"[:1000] logger.exception("asset_scan_once_failed", run_id=run_id, error=error_msg) @@ -167,19 +175,17 @@ async def scan_once( # K8s 資產收集 # ============================================================================ -async def _collect_k8s_assets() -> list[dict[str, Any]]: +async def _fetch_kubectl_json(resource: str, all_namespaces: bool = True) -> dict[str, Any]: """ - 直接 subprocess 執行 kubectl get pods --all-namespaces. - - 2026-04-19 ogt + Claude Opus 4.7 v2: 不走 K8sProvider._kubectl_get, - 因為它把 namespace 參數當 -n 旗標,無法處理 '--all-namespaces'. - 直接 subprocess 最可靠 + Pod 內 /usr/local/bin/kubectl 已確認可用. - - 回傳每筆: {asset_key, asset_type, host, namespace, name, metadata, tags} + subprocess 執行 kubectl get --all-namespaces -o json (或 nodes 不帶 ns). + 回傳 parse 後的 payload dict ({'items': [...]}). """ + cmd = ["kubectl", "get", resource, "-o", "json"] + if all_namespaces: + cmd.insert(3, "--all-namespaces") proc = await asyncio.wait_for( asyncio.create_subprocess_exec( - "kubectl", "get", "pods", "--all-namespaces", "-o", "json", + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ), @@ -187,52 +193,257 @@ async def _collect_k8s_assets() -> list[dict[str, Any]]: ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_KUBECTL_TIMEOUT_SEC) if proc.returncode != 0: - raise RuntimeError(f"kubectl failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:500]}") - + raise RuntimeError(f"kubectl {resource} failed rc={proc.returncode}: {stderr.decode('utf-8', errors='replace')[:300]}") try: - payload = _json.loads(stdout.decode("utf-8", errors="replace")) + return _json.loads(stdout.decode("utf-8", errors="replace")) except _json.JSONDecodeError as e: - raise RuntimeError(f"kubectl JSON parse failed: {e}") from e + raise RuntimeError(f"kubectl {resource} JSON parse failed: {e}") from e - items = payload.get("items", []) if isinstance(payload, dict) else [] - assets: list[dict[str, Any]] = [] - for item in items: - meta = item.get("metadata", {}) or {} - spec = item.get("spec", {}) or {} - ns = meta.get("namespace") or "default" - name = meta.get("name") or "unknown" - node = spec.get("nodeName") or "" - # asset_key 為 UNIQUE 自然鍵,跨 run 沿用同一筆 - asset_key = f"k8s/pod/{ns}/{name}" - asset_type = "container" +def _build_pod_asset(item: dict[str, Any]) -> dict[str, Any]: + """Pod → asset_inventory row (asset_type='container').""" + meta = item.get("metadata", {}) or {} + spec = item.get("spec", {}) or {} + ns = meta.get("namespace") or "default" + name = meta.get("name") or "unknown" + node = spec.get("nodeName") or "" + labels = meta.get("labels", {}) or {} - labels = meta.get("labels", {}) or {} - tags = [] - if labels.get("app"): - tags.append(f"app:{labels['app']}") - if labels.get("environment"): - tags.append(f"env:{labels['environment']}") - if labels.get("system"): - tags.append(f"system:{labels['system']}") + tags = [] + for k in ("app", "environment", "system"): + if labels.get(k): + tags.append(f"{k if k != 'environment' else 'env'}:{labels[k]}") - metadata_jsonb = { + return { + "asset_key": f"k8s/pod/{ns}/{name}", + "asset_type": "container", + "host": node or None, + "namespace": ns, + "name": name, + "metadata": { "owner_references": meta.get("ownerReferences", []), "labels": labels, "phase": (item.get("status", {}) or {}).get("phase", ""), "node": node, - } + "volumes": [ + { + "name": v.get("name"), + "configMap": v.get("configMap", {}).get("name"), + "secret": v.get("secret", {}).get("secretName"), + } + for v in (spec.get("volumes") or []) + if v.get("configMap") or v.get("secret") + ], + }, + "tags": tags, + } - assets.append({ - "asset_key": asset_key, - "asset_type": asset_type, - "host": node or None, - "namespace": ns, - "name": name, - "metadata": metadata_jsonb, - "tags": tags, - }) - return assets + +def _build_workload_asset(item: dict[str, Any], kind: str) -> dict[str, Any]: + """Deployment/StatefulSet/DaemonSet → asset_inventory row (asset_type='k8s_workload').""" + meta = item.get("metadata", {}) or {} + ns = meta.get("namespace") or "default" + name = meta.get("name") or "unknown" + labels = meta.get("labels", {}) or {} + spec = item.get("spec", {}) or {} + status = item.get("status", {}) or {} + + return { + "asset_key": f"k8s/{kind.lower()}/{ns}/{name}", + "asset_type": "k8s_workload", + "host": None, + "namespace": ns, + "name": name, + "metadata": { + "kind": kind, + "labels": labels, + "replicas": spec.get("replicas"), + "ready_replicas": status.get("readyReplicas"), + "selector": (spec.get("selector", {}) or {}).get("matchLabels", {}), + }, + "tags": [f"kind:{kind}"] + [f"app:{labels['app']}"] if labels.get("app") else [f"kind:{kind}"], + } + + +def _build_service_asset(item: dict[str, Any]) -> dict[str, Any]: + """Service → asset_inventory row (asset_type='k8s_resource').""" + meta = item.get("metadata", {}) or {} + ns = meta.get("namespace") or "default" + name = meta.get("name") or "unknown" + spec = item.get("spec", {}) or {} + + return { + "asset_key": f"k8s/service/{ns}/{name}", + "asset_type": "k8s_resource", + "host": None, + "namespace": ns, + "name": name, + "metadata": { + "kind": "Service", + "type": spec.get("type"), + "cluster_ip": spec.get("clusterIP"), + "selector": spec.get("selector", {}), + "ports": spec.get("ports", []), + }, + "tags": [f"svc_type:{spec.get('type', '')}"], + } + + +def _build_node_asset(item: dict[str, Any]) -> dict[str, Any]: + """Node → asset_inventory row (asset_type='host').""" + meta = item.get("metadata", {}) or {} + name = meta.get("name") or "unknown" + labels = meta.get("labels", {}) or {} + status = item.get("status", {}) or {} + addresses = status.get("addresses", []) or [] + internal_ip = next((a["address"] for a in addresses if a.get("type") == "InternalIP"), "") + + return { + "asset_key": f"k8s/node/{name}", + "asset_type": "host", + "host": name, + "namespace": None, + "name": name, + "metadata": { + "kind": "Node", + "internal_ip": internal_ip, + "labels": labels, + "capacity": status.get("capacity", {}), + "conditions": [ + {"type": c.get("type"), "status": c.get("status")} + for c in status.get("conditions", []) + ], + }, + "tags": [f"role:{labels.get('kubernetes.io/role', 'worker')}"], + } + + +def _build_configmap_asset(item: dict[str, Any]) -> dict[str, Any]: + """ConfigMap → asset_inventory row (asset_type='k8s_resource').""" + meta = item.get("metadata", {}) or {} + ns = meta.get("namespace") or "default" + name = meta.get("name") or "unknown" + + return { + "asset_key": f"k8s/configmap/{ns}/{name}", + "asset_type": "k8s_resource", + "host": None, + "namespace": ns, + "name": name, + "metadata": { + "kind": "ConfigMap", + "labels": meta.get("labels", {}), + "keys": list((item.get("data") or {}).keys()), + "creationTimestamp": meta.get("creationTimestamp"), + }, + "tags": ["kind:ConfigMap"], + } + + +async def _collect_all_k8s_assets() -> tuple[list[dict[str, Any]], list[dict[str, str]]]: + """ + 掃多種 K8s 資源類型 + 提取 relationship. + + Relationships: + - Pod ─ depends_on ─> Deployment/StatefulSet/DaemonSet (via ownerReferences) + - Service ─ routes_to ─> Pod (via spec.selector 匹配 Pod.labels) + - Pod ─ depends_on ─> ConfigMap (via spec.volumes[].configMap.name) + + 回傳 (assets, relationships) tuple. + relationships: [{'from_key': ..., 'to_key': ..., 'relationship_type': ...}, ...] + """ + assets: list[dict[str, Any]] = [] + relationships: list[dict[str, str]] = [] + + # 1. Nodes (不帶 ns) + try: + payload = await _fetch_kubectl_json("nodes", all_namespaces=False) + for item in payload.get("items", []) or []: + assets.append(_build_node_asset(item)) + except Exception as e: + logger.warning("collect_nodes_failed", error=str(e)) + + # 2. Pods — 主體 + 從 ownerReferences 建 relationship + pod_by_key: dict[str, dict[str, Any]] = {} # asset_key -> pod item (for Service selector match) + try: + payload = await _fetch_kubectl_json("pods") + for item in payload.get("items", []) or []: + a = _build_pod_asset(item) + assets.append(a) + pod_by_key[a["asset_key"]] = item + + # OwnerReference → relationship (Pod ReplicaSet → Deployment 通常是兩層) + meta = item.get("metadata", {}) or {} + ns = meta.get("namespace") or "default" + for ref in meta.get("ownerReferences", []) or []: + owner_kind = ref.get("kind", "").lower() + owner_name = ref.get("name", "") + if not owner_name: + continue + # 跳過 ReplicaSet (中介層),直接找到真正的 Deployment 在後續步驟補 + if owner_kind in ("deployment", "statefulset", "daemonset"): + relationships.append({ + "from_key": a["asset_key"], + "to_key": f"k8s/{owner_kind}/{ns}/{owner_name}", + "relationship_type": "depends_on", + }) + + # Pod volumes → ConfigMap relationship + for v in (item.get("spec", {}) or {}).get("volumes", []) or []: + cm = (v.get("configMap") or {}).get("name") + if cm: + relationships.append({ + "from_key": a["asset_key"], + "to_key": f"k8s/configmap/{ns}/{cm}", + "relationship_type": "depends_on", + }) + except Exception as e: + logger.warning("collect_pods_failed", error=str(e)) + + # 3. Deployments / StatefulSets / DaemonSets + for kind, resource in (("Deployment", "deployments"), ("StatefulSet", "statefulsets"), ("DaemonSet", "daemonsets")): + try: + payload = await _fetch_kubectl_json(resource) + for item in payload.get("items", []) or []: + assets.append(_build_workload_asset(item, kind)) + except Exception as e: + logger.warning(f"collect_{resource}_failed", error=str(e)) + + # 4. Services + routes_to Pod (via selector match) + try: + payload = await _fetch_kubectl_json("services") + for item in payload.get("items", []) or []: + svc = _build_service_asset(item) + assets.append(svc) + + # 為該 Service 找出匹配的 Pod + selector = (item.get("spec", {}) or {}).get("selector") or {} + if not selector: + continue + svc_ns = (item.get("metadata", {}) or {}).get("namespace") or "default" + for pod_key, pod_item in pod_by_key.items(): + if not pod_key.startswith(f"k8s/pod/{svc_ns}/"): + continue + pod_labels = (pod_item.get("metadata", {}) or {}).get("labels", {}) or {} + # selector 所有 kv 必須都在 pod labels 內 + if all(pod_labels.get(k) == v for k, v in selector.items()): + relationships.append({ + "from_key": svc["asset_key"], + "to_key": pod_key, + "relationship_type": "routes_to", + }) + except Exception as e: + logger.warning("collect_services_failed", error=str(e)) + + # 5. ConfigMaps + try: + payload = await _fetch_kubectl_json("configmaps") + for item in payload.get("items", []) or []: + assets.append(_build_configmap_asset(item)) + except Exception as e: + logger.warning("collect_configmaps_failed", error=str(e)) + + return assets, relationships # ============================================================================ @@ -385,6 +596,54 @@ async def _upsert_assets( return new_count, modified_count +async def _upsert_relationships(relationships: list[dict[str, str]]) -> int: + """ + UPSERT asset_relationship (from_asset_id/to_asset_id/relationship_type 為 UNIQUE). + + 用 asset_key 查 asset_id 後 INSERT,忽略 asset 不存在的 relationship. + 回傳實際寫入 (新建/更新) 筆數. + """ + if not relationships: + return 0 + + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + written = 0 + try: + async with get_db_context() as db: + for rel in relationships: + try: + # 用 asset_key → asset_id 解析,同時 UPSERT + await db.execute( + _sql(""" + INSERT INTO asset_relationship ( + from_asset_id, to_asset_id, relationship_type, + first_detected_at, last_verified_at, is_active + ) + SELECT a1.asset_id, a2.asset_id, :rt, NOW(), NOW(), true + FROM asset_inventory a1, asset_inventory a2 + WHERE a1.asset_key = :from_key AND a2.asset_key = :to_key + AND a1.asset_id <> a2.asset_id + ON CONFLICT (from_asset_id, to_asset_id, relationship_type) DO UPDATE + SET last_verified_at = NOW(), + is_active = true + """), + { + "from_key": rel["from_key"], + "to_key": rel["to_key"], + "rt": rel["relationship_type"], + }, + ) + written += 1 + except Exception as e: + logger.debug("relationship_upsert_skipped", + from_key=rel["from_key"], to_key=rel["to_key"], error=str(e)) + except Exception as e: + logger.warning("relationship_upsert_failed", error=str(e)) + return written + + async def _write_coverage_snapshots(run_id: str) -> None: """ 為本次 run 中的所有 active asset 寫 7 維 coverage_snapshot (預設 unknown)。