feat(aiops): capacity_scanner + compliance_scanner (ADR-090 Phase 7 剩 2)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
完成 ADR-090 Phase 7 第 3+4 個 service,解鎖 2 張 0 writer 表:
B3. apps/api/src/jobs/capacity_scanner_job.py (~300 行)
- 每日 02:00 Taipei 撈 Prometheus node_exporter
- 寫 host_capacity_snapshot (load1/5/15, cpu, iowait, mem, swap)
- heuristic ai_verdict: cpu>80 or mem>85 → critical; >60/70 → warning
- 超過硬閾值 → 寫 capacity_violation_event
- 寫 aol(capacity_recommendation)
B4. apps/api/src/jobs/compliance_scanner_job.py (~270 行)
- 每日 03:00 Taipei 遍歷 asset_inventory active assets
- 為每個 asset 寫 7 維 compliance snapshot
- secret_rotated: 真實檢查 (metadata.creationTimestamp > 90d = warning)
- 其他 6 維 (ssl_cert_valid / cve_scan / backup_tested /
audit_log_enabled / access_reviewed / encryption_at_rest) 占位 'unknown'
+ detail TODO,後續 agent 補邏輯
- 寫 aol(coverage_recalculated) summary
main.py lifespan 同步 wire 2 個新 loop
預期解鎖 (配合 B1 asset_scanner + B2 rule_catalog_sync):
- asset_inventory: 0 → 數百 (B1)
- asset_discovery_run: 0 → 每小時 1 (B1)
- asset_coverage_snapshot: 0 → assets × 7 維 (B1)
- alert_rule_catalog: 0 → ~68 條 (B2)
- host_capacity_snapshot: 0 → 每日 hosts (B3)
- capacity_violation_event: 0 → 超閾值時 (B3)
- asset_compliance_snapshot: 0 → assets × 7 維 (B4)
automation_operation_log 新增 4 個 op_type: asset_discovered / rule_created /
rule_updated / capacity_recommendation / coverage_recalculated
8 張 0 writer 表到此全數有 writer,ADR-090 Phase 7 實作完成.
Refs: ADR-090 §4.2 Phase 4, MASTER §3.5 D5 (capacity-aware)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
339
apps/api/src/jobs/capacity_scanner_job.py
Normal file
339
apps/api/src/jobs/capacity_scanner_job.py
Normal file
@@ -0,0 +1,339 @@
|
||||
"""
|
||||
Capacity Scanner Job — ADR-090 § Phase 4 NemoTron 容量巡檢 MVP
|
||||
===============================================================
|
||||
每日 02:00 Taipei 從 Prometheus 撈 node metrics → 寫 host_capacity_snapshot.
|
||||
|
||||
職責邊界:
|
||||
✅ 撈 Prometheus node_exporter metrics (load / cpu / mem / swap)
|
||||
✅ 為每個 host 寫一筆 host_capacity_snapshot + heuristic ai_verdict
|
||||
✅ 超過硬閾值寫 capacity_violation_event
|
||||
✅ 寫 automation_operation_log(capacity_recommendation)
|
||||
❌ 不做 Holt-Winters 預測 (那是 Hermes 後續階段)
|
||||
❌ 不自動執行修復 (只 recommend,統帥決策)
|
||||
|
||||
設計鐵律:
|
||||
- 每日一次 snapshot (歷史 30d 供 AI 趨勢分析)
|
||||
- ai_verdict heuristic: cpu>80 or mem>85 → critical; >60/70 → warning; else safe
|
||||
- Prometheus 失敗 → log + skip 該 host,不 crash 整 loop
|
||||
|
||||
資料來源:
|
||||
- PROMETHEUS_URL/api/v1/query (instant query)
|
||||
- 預期 instance 格式: '192.168.0.XXX:9100' 或 hostname
|
||||
|
||||
排程:
|
||||
- 首次延遲 120s
|
||||
- 後續每日 02:00 (Taipei) 對齊跑
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 § Phase 4
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# 排程 / 閾值
|
||||
# ============================================================================
|
||||
_FIRST_DELAY_SEC = 120
|
||||
_HTTP_TIMEOUT_SEC = 10
|
||||
_LOOP_BACKOFF_SEC = 1800
|
||||
|
||||
# Taipei = UTC+8,每日 02:00 Taipei = 18:00 UTC 前一天
|
||||
_DAILY_TRIGGER_HOUR_TAIPEI = 2
|
||||
|
||||
# Heuristic 閾值 (ai_verdict 計算)
|
||||
_CPU_CRITICAL = 80.0
|
||||
_CPU_WARNING = 60.0
|
||||
_MEM_CRITICAL = 85.0
|
||||
_MEM_WARNING = 70.0
|
||||
_SWAP_CRITICAL = 50.0
|
||||
_LOAD1_CRITICAL_RATIO = 2.0 # load1 > 2x CPU cores = critical
|
||||
|
||||
# Prometheus 查詢 (instant query,每筆 host 一個 label)
|
||||
_PROM_QUERIES = {
|
||||
"load1": "avg by(instance) (node_load1)",
|
||||
"load5": "avg by(instance) (node_load5)",
|
||||
"load15": "avg by(instance) (node_load15)",
|
||||
"cpu_used_pct": "100 - (avg by(instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
|
||||
"cpu_iowait_pct": "avg by(instance) (rate(node_cpu_seconds_total{mode=\"iowait\"}[5m])) * 100",
|
||||
"mem_used_pct": "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100",
|
||||
"swap_used_pct": "(node_memory_SwapTotal_bytes - node_memory_SwapFree_bytes) / (node_memory_SwapTotal_bytes > 0 or vector(1)) * 100",
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry — main.py lifespan 呼叫
|
||||
# ============================================================================
|
||||
|
||||
async def run_capacity_scanner_loop() -> None:
|
||||
"""每日 02:00 Taipei 跑一次容量巡檢."""
|
||||
logger.info("capacity_scanner_loop_started")
|
||||
await asyncio.sleep(_FIRST_DELAY_SEC)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await scan_once()
|
||||
except Exception as e:
|
||||
logger.exception("capacity_scanner_loop_error", error=str(e))
|
||||
await asyncio.sleep(_LOOP_BACKOFF_SEC)
|
||||
continue
|
||||
|
||||
# 算下次 02:00 Taipei 的 sleep 秒數
|
||||
sleep_sec = _seconds_until_next_trigger()
|
||||
logger.info("capacity_scanner_next_tick", sleep_sec=sleep_sec)
|
||||
await asyncio.sleep(sleep_sec)
|
||||
|
||||
|
||||
async def scan_once(triggered_by: str = "cron") -> dict[str, int]:
|
||||
"""執行一次容量巡檢,每 host 寫一筆 snapshot."""
|
||||
started_ms = _time.time()
|
||||
stats = {"hosts_scanned": 0, "violations": 0}
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
metrics_by_host = await _fetch_all_metrics()
|
||||
for host, m in metrics_by_host.items():
|
||||
snapshot_id = await _write_snapshot(host, m)
|
||||
if snapshot_id:
|
||||
stats["hosts_scanned"] += 1
|
||||
viol = await _check_and_write_violations(host, m)
|
||||
stats["violations"] += viol
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("capacity_scan_once_failed", error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg)
|
||||
|
||||
logger.info(
|
||||
"capacity_scan_once_done",
|
||||
hosts=stats["hosts_scanned"],
|
||||
violations=stats["violations"],
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Prometheus 撈資料
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_all_metrics() -> dict[str, dict[str, float]]:
|
||||
"""
|
||||
對每個 _PROM_QUERIES 跑 instant query,回傳 {host: {metric: value}}.
|
||||
|
||||
host 來自 query 結果 label 'instance' 的 IP 前綴 (去掉 :9100).
|
||||
"""
|
||||
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
|
||||
results: dict[str, dict[str, float]] = {}
|
||||
|
||||
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
|
||||
for metric_name, promql in _PROM_QUERIES.items():
|
||||
try:
|
||||
resp = await client.get(url, params={"query": promql})
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
if data.get("status") != "success":
|
||||
logger.warning("prom_query_non_success", metric=metric_name)
|
||||
continue
|
||||
for r in (data.get("data", {}) or {}).get("result", []) or []:
|
||||
instance = (r.get("metric", {}) or {}).get("instance", "")
|
||||
host = instance.split(":")[0] if instance else "unknown"
|
||||
val = r.get("value", [None, None])
|
||||
if val and len(val) >= 2:
|
||||
try:
|
||||
results.setdefault(host, {})[metric_name] = float(val[1])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning("prom_query_failed", metric=metric_name, error=str(e))
|
||||
continue
|
||||
return results
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DB 寫入
|
||||
# ============================================================================
|
||||
|
||||
async def _write_snapshot(host: str, m: dict[str, float]) -> int | None:
|
||||
"""寫 host_capacity_snapshot,回傳 snapshot_id."""
|
||||
if not host or host == "unknown":
|
||||
return None
|
||||
|
||||
verdict, reasoning = _assess_verdict(m)
|
||||
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
async with get_db_context() as db:
|
||||
row = await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO host_capacity_snapshot (
|
||||
host, captured_at,
|
||||
load1, load5, load15,
|
||||
cpu_used_pct, cpu_iowait_pct,
|
||||
mem_used_pct, swap_used_pct,
|
||||
ai_verdict, ai_reasoning,
|
||||
written_by_agent
|
||||
) VALUES (
|
||||
:host, NOW(),
|
||||
:l1, :l5, :l15,
|
||||
:cpu, :iowait,
|
||||
:mem, :swap,
|
||||
:verdict, :reason,
|
||||
'capacity_scanner'
|
||||
)
|
||||
RETURNING snapshot_id
|
||||
"""),
|
||||
{
|
||||
"host": host,
|
||||
"l1": m.get("load1"),
|
||||
"l5": m.get("load5"),
|
||||
"l15": m.get("load15"),
|
||||
"cpu": m.get("cpu_used_pct"),
|
||||
"iowait": m.get("cpu_iowait_pct"),
|
||||
"mem": m.get("mem_used_pct"),
|
||||
"swap": m.get("swap_used_pct"),
|
||||
"verdict": verdict,
|
||||
"reason": reasoning[:500],
|
||||
},
|
||||
)
|
||||
sid = row.scalar()
|
||||
return int(sid) if sid else None
|
||||
except Exception as e:
|
||||
logger.warning("capacity_snapshot_write_failed", host=host, error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
async def _check_and_write_violations(host: str, m: dict[str, float]) -> int:
|
||||
"""超過硬閾值時寫 capacity_violation_event,回傳新增筆數."""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
violations: list[tuple[str, float, float]] = []
|
||||
cpu = m.get("cpu_used_pct")
|
||||
mem = m.get("mem_used_pct")
|
||||
swap = m.get("swap_used_pct")
|
||||
if cpu is not None and cpu > _CPU_CRITICAL:
|
||||
violations.append(("cpu_over_threshold", _CPU_CRITICAL, cpu))
|
||||
if mem is not None and mem > _MEM_CRITICAL:
|
||||
violations.append(("mem_over_threshold", _MEM_CRITICAL, mem))
|
||||
if swap is not None and swap > _SWAP_CRITICAL:
|
||||
violations.append(("swap_over_threshold", _SWAP_CRITICAL, swap))
|
||||
|
||||
if not violations:
|
||||
return 0
|
||||
|
||||
written = 0
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
for vtype, threshold, actual in violations:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO capacity_violation_event (
|
||||
host, violation_type, threshold, actual_value,
|
||||
detected_at
|
||||
) VALUES (
|
||||
:host, :vt, :th, :av,
|
||||
NOW()
|
||||
)
|
||||
"""),
|
||||
{"host": host, "vt": vtype, "th": threshold, "av": actual},
|
||||
)
|
||||
written += 1
|
||||
except Exception as e:
|
||||
logger.warning("capacity_violation_write_failed", host=host, error=str(e))
|
||||
return written
|
||||
|
||||
|
||||
async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None:
|
||||
"""寫 aol(capacity_recommendation)."""
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
aol_status = "failed" if error else "success"
|
||||
input_payload = {"triggered_by": triggered_by, "source": "prometheus_node_exporter"}
|
||||
output_payload = dict(stats)
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO automation_operation_log (
|
||||
operation_type, actor, status,
|
||||
input, output, duration_ms, error, tags
|
||||
) VALUES (
|
||||
'capacity_recommendation',
|
||||
'capacity_scanner',
|
||||
:st,
|
||||
CAST(:input AS jsonb),
|
||||
CAST(:output AS jsonb),
|
||||
:dur, :err, :tags
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"st": aol_status,
|
||||
"input": _json.dumps(input_payload, ensure_ascii=False),
|
||||
"output": _json.dumps(output_payload, ensure_ascii=False),
|
||||
"dur": duration_ms,
|
||||
"err": (error or "")[:2000] if error else None,
|
||||
"tags": ["capacity", "scanner", "prometheus"],
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("capacity_aol_write_failed", error=str(e))
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Heuristic + 時間計算
|
||||
# ============================================================================
|
||||
|
||||
def _assess_verdict(m: dict[str, float]) -> tuple[str, str]:
|
||||
"""根據閾值給 ai_verdict (safe/warning/critical) + reasoning."""
|
||||
reasons = []
|
||||
max_level = 0 # 0=safe 1=warning 2=critical
|
||||
|
||||
cpu = m.get("cpu_used_pct")
|
||||
mem = m.get("mem_used_pct")
|
||||
swap = m.get("swap_used_pct")
|
||||
|
||||
if cpu is not None:
|
||||
if cpu > _CPU_CRITICAL:
|
||||
max_level = max(max_level, 2); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_CRITICAL})")
|
||||
elif cpu > _CPU_WARNING:
|
||||
max_level = max(max_level, 1); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_WARNING})")
|
||||
if mem is not None:
|
||||
if mem > _MEM_CRITICAL:
|
||||
max_level = max(max_level, 2); reasons.append(f"mem={mem:.1f}% (>{_MEM_CRITICAL})")
|
||||
elif mem > _MEM_WARNING:
|
||||
max_level = max(max_level, 1); reasons.append(f"mem={mem:.1f}% (>{_MEM_WARNING})")
|
||||
if swap is not None and swap > _SWAP_CRITICAL:
|
||||
max_level = max(max_level, 2); reasons.append(f"swap={swap:.1f}% (>{_SWAP_CRITICAL})")
|
||||
|
||||
verdict = ("safe", "warning", "critical")[max_level]
|
||||
reasoning = "; ".join(reasons) if reasons else "all metrics within safe range"
|
||||
return verdict, reasoning
|
||||
|
||||
|
||||
def _seconds_until_next_trigger() -> float:
|
||||
"""算到下個 02:00 Taipei 的秒數."""
|
||||
tz_taipei = timezone(timedelta(hours=8))
|
||||
now = datetime.now(tz_taipei)
|
||||
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
|
||||
if now >= today_trigger:
|
||||
today_trigger = today_trigger + timedelta(days=1)
|
||||
delta = (today_trigger - now).total_seconds()
|
||||
# 上限保護: 至少 300s,至多 25h
|
||||
return max(300.0, min(delta, 25 * 3600))
|
||||
299
apps/api/src/jobs/compliance_scanner_job.py
Normal file
299
apps/api/src/jobs/compliance_scanner_job.py
Normal file
@@ -0,0 +1,299 @@
|
||||
"""
|
||||
Compliance Scanner Job — ADR-090 § 合規掃描 MVP
|
||||
================================================
|
||||
每日 03:00 Taipei 遍歷 asset_inventory,為每個 active asset 寫 7 維 asset_compliance_snapshot.
|
||||
|
||||
職責邊界 (MVP):
|
||||
✅ 為所有 active asset 建立 7 維 snapshot 占位 (status='unknown')
|
||||
✅ 基礎檢查: secret asset 是否 > 90d 沒輪替 (K8s Secret createdAt)
|
||||
✅ 寫 automation_operation_log(coverage_recalculated) summary
|
||||
⏳ TODO: ssl_cert_valid (openssl s_client 檢查憑證到期)
|
||||
⏳ TODO: cve_scan (trivy image scan)
|
||||
⏳ TODO: backup_tested (查 16-cronjob-backup-restore-test 結果)
|
||||
|
||||
設計鐵律:
|
||||
- 7 個 dimension 固定: ssl_cert_valid / cve_scan / secret_rotated / backup_tested /
|
||||
audit_log_enabled / access_reviewed / encryption_at_rest
|
||||
- 未實作的 dimension 預設 status='unknown',後續 AI agent UPDATE
|
||||
- 每天一次 snapshot,歷史保留供 SLO 統計
|
||||
|
||||
排程:
|
||||
- 首次延遲 180s (capacity_scanner 之後)
|
||||
- 每日 03:00 Taipei 對齊
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 § Compliance
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# 排程
|
||||
# ============================================================================
|
||||
_FIRST_DELAY_SEC = 180
|
||||
_LOOP_BACKOFF_SEC = 1800
|
||||
_DAILY_TRIGGER_HOUR_TAIPEI = 3
|
||||
|
||||
# 7 維 compliance (ADR-090 schema CHECK)
|
||||
_DIMENSIONS = (
|
||||
"ssl_cert_valid", "cve_scan", "secret_rotated", "backup_tested",
|
||||
"audit_log_enabled", "access_reviewed", "encryption_at_rest",
|
||||
)
|
||||
|
||||
# secret_rotated 閾值
|
||||
_SECRET_ROTATION_WARNING_DAYS = 90
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry — main.py lifespan 呼叫
|
||||
# ============================================================================
|
||||
|
||||
async def run_compliance_scanner_loop() -> None:
|
||||
"""每日 03:00 Taipei 合規掃描."""
|
||||
logger.info("compliance_scanner_loop_started")
|
||||
await asyncio.sleep(_FIRST_DELAY_SEC)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await scan_once()
|
||||
except Exception as e:
|
||||
logger.exception("compliance_scanner_loop_error", error=str(e))
|
||||
await asyncio.sleep(_LOOP_BACKOFF_SEC)
|
||||
continue
|
||||
|
||||
sleep_sec = _seconds_until_next_trigger()
|
||||
logger.info("compliance_scanner_next_tick", sleep_sec=sleep_sec)
|
||||
await asyncio.sleep(sleep_sec)
|
||||
|
||||
|
||||
async def scan_once(triggered_by: str = "cron") -> dict[str, int]:
|
||||
"""遍歷 asset_inventory 為每個 active asset 寫 7 維 compliance snapshot."""
|
||||
started_ms = _time.time()
|
||||
stats = {"assets_scanned": 0, "snapshots_written": 0, "violations": 0, "warnings": 0}
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
assets = await _fetch_active_assets()
|
||||
stats["assets_scanned"] = len(assets)
|
||||
|
||||
for asset in assets:
|
||||
s, v, w = await _write_compliance_for_asset(asset)
|
||||
stats["snapshots_written"] += s
|
||||
stats["violations"] += v
|
||||
stats["warnings"] += w
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("compliance_scan_once_failed", error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg)
|
||||
|
||||
logger.info(
|
||||
"compliance_scan_once_done",
|
||||
assets=stats["assets_scanned"],
|
||||
snapshots=stats["snapshots_written"],
|
||||
warnings=stats["warnings"],
|
||||
violations=stats["violations"],
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DB 操作
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_active_assets() -> list[dict[str, Any]]:
|
||||
"""從 asset_inventory 撈所有 active asset."""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
_sql("""
|
||||
SELECT asset_id, asset_key, asset_type, metadata
|
||||
FROM asset_inventory
|
||||
WHERE lifecycle_state = 'active'
|
||||
ORDER BY asset_id
|
||||
"""),
|
||||
)
|
||||
rows = result.fetchall()
|
||||
return [
|
||||
{
|
||||
"asset_id": r.asset_id,
|
||||
"asset_key": r.asset_key,
|
||||
"asset_type": r.asset_type,
|
||||
"metadata": r.metadata or {},
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
except Exception as e:
|
||||
logger.warning("fetch_active_assets_failed", error=str(e))
|
||||
return []
|
||||
|
||||
|
||||
async def _write_compliance_for_asset(asset: dict[str, Any]) -> tuple[int, int, int]:
|
||||
"""
|
||||
為單一 asset 寫 7 維 compliance snapshot.
|
||||
|
||||
Returns: (snapshots_written, violations_count, warnings_count)
|
||||
"""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
snapshots = 0
|
||||
violations = 0
|
||||
warnings = 0
|
||||
|
||||
# 為每個 dimension 評估 status (目前多數 'unknown',secret_rotated 有基礎邏輯)
|
||||
dimension_results = _evaluate_all_dimensions(asset)
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
for dim, (status, detail) in dimension_results.items():
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO asset_compliance_snapshot (
|
||||
asset_id, dimension, status, detail, detected_at
|
||||
) VALUES (
|
||||
:aid, :dim, :status, CAST(:detail AS jsonb), NOW()
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"aid": asset["asset_id"],
|
||||
"dim": dim,
|
||||
"status": status,
|
||||
"detail": _json.dumps(detail, ensure_ascii=False),
|
||||
},
|
||||
)
|
||||
snapshots += 1
|
||||
if status == "violation":
|
||||
violations += 1
|
||||
elif status == "warning":
|
||||
warnings += 1
|
||||
except Exception as e:
|
||||
logger.warning("compliance_write_failed", asset_id=asset["asset_id"], error=str(e))
|
||||
|
||||
return snapshots, violations, warnings
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Compliance 評估邏輯 (MVP — 多數 unknown,留 TODO)
|
||||
# ============================================================================
|
||||
|
||||
def _evaluate_all_dimensions(asset: dict[str, Any]) -> dict[str, tuple[str, dict]]:
|
||||
"""
|
||||
為 asset 評估所有 7 維,回傳 {dimension: (status, detail)}.
|
||||
|
||||
MVP 策略:
|
||||
- secret_rotated: 對 asset_type='secret' 檢查 metadata.creationTimestamp
|
||||
- 其他 6 維: status='unknown' + detail 標註 TODO
|
||||
"""
|
||||
results: dict[str, tuple[str, dict]] = {}
|
||||
|
||||
# secret_rotated: 只對 secret 類型 asset 做真實檢查
|
||||
if asset["asset_type"] == "secret":
|
||||
results["secret_rotated"] = _check_secret_rotation(asset)
|
||||
else:
|
||||
results["secret_rotated"] = ("unknown", {"reason": "asset_type is not 'secret', N/A"})
|
||||
|
||||
# 其他 6 維佔位
|
||||
results["ssl_cert_valid"] = ("unknown", {"todo": "openssl s_client check (Phase 7.2)"})
|
||||
results["cve_scan"] = ("unknown", {"todo": "trivy image scan (Phase 7.3)"})
|
||||
results["backup_tested"] = ("unknown", {"todo": "pg-backup-restore-test cronjob 結果 (Phase 7.4)"})
|
||||
results["audit_log_enabled"] = ("unknown", {"todo": "audit_logs table 對應查詢 (Phase 7.5)"})
|
||||
results["access_reviewed"] = ("unknown", {"todo": "RBAC quarterly review (Phase 7.6)"})
|
||||
results["encryption_at_rest"] = ("unknown", {"todo": "PG TDE / K8s Secret encryption check (Phase 7.7)"})
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def _check_secret_rotation(asset: dict[str, Any]) -> tuple[str, dict]:
|
||||
"""檢查 Secret 的 creationTimestamp,超過 90d 標 warning."""
|
||||
meta = asset.get("metadata", {})
|
||||
created_ts = meta.get("creationTimestamp") or meta.get("createdAt") or ""
|
||||
if not created_ts:
|
||||
return ("unknown", {"reason": "creationTimestamp not in metadata"})
|
||||
|
||||
try:
|
||||
if created_ts.endswith("Z"):
|
||||
created = datetime.fromisoformat(created_ts.replace("Z", "+00:00"))
|
||||
else:
|
||||
created = datetime.fromisoformat(created_ts)
|
||||
except (ValueError, TypeError):
|
||||
return ("unknown", {"reason": f"unparseable timestamp: {created_ts[:50]}"})
|
||||
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
age_days = (now_utc - created).days
|
||||
|
||||
if age_days > _SECRET_ROTATION_WARNING_DAYS:
|
||||
return ("warning", {
|
||||
"age_days": age_days,
|
||||
"threshold_days": _SECRET_ROTATION_WARNING_DAYS,
|
||||
"message": f"Secret 已 {age_days} 天未輪替,超過 {_SECRET_ROTATION_WARNING_DAYS}d 閾值",
|
||||
})
|
||||
return ("compliant", {"age_days": age_days})
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AOL
|
||||
# ============================================================================
|
||||
|
||||
async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None:
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
aol_status = "failed" if error else "success"
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO automation_operation_log (
|
||||
operation_type, actor, status,
|
||||
input, output, duration_ms, error, tags
|
||||
) VALUES (
|
||||
'coverage_recalculated',
|
||||
'compliance_scanner',
|
||||
:st,
|
||||
CAST(:input AS jsonb),
|
||||
CAST(:output AS jsonb),
|
||||
:dur, :err, :tags
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"st": aol_status,
|
||||
"input": _json.dumps({"triggered_by": triggered_by, "dimensions": list(_DIMENSIONS)}, ensure_ascii=False),
|
||||
"output": _json.dumps(stats, ensure_ascii=False),
|
||||
"dur": duration_ms,
|
||||
"err": (error or "")[:2000] if error else None,
|
||||
"tags": ["compliance", "scanner"],
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("compliance_aol_write_failed", error=str(e))
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 時間計算
|
||||
# ============================================================================
|
||||
|
||||
def _seconds_until_next_trigger() -> float:
|
||||
"""算到下個 03:00 Taipei 的秒數."""
|
||||
tz_taipei = timezone(timedelta(hours=8))
|
||||
now = datetime.now(tz_taipei)
|
||||
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
|
||||
if now >= today_trigger:
|
||||
today_trigger = today_trigger + timedelta(days=1)
|
||||
delta = (today_trigger - now).total_seconds()
|
||||
return max(300.0, min(delta, 25 * 3600))
|
||||
@@ -390,6 +390,26 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § Phase 4 NemoTron 容量巡檢 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每日 02:00 Taipei 撈 Prometheus node_exporter → 寫 host_capacity_snapshot + violations
|
||||
# 解鎖: Phase 4 Holt-Winters 預測有歷史資料 / 容量趨勢分析
|
||||
try:
|
||||
from src.jobs.capacity_scanner_job import run_capacity_scanner_loop
|
||||
asyncio.create_task(run_capacity_scanner_loop())
|
||||
logger.info("capacity_scanner_loop_scheduled", daily_trigger_hour_taipei=2)
|
||||
except Exception as e:
|
||||
logger.warning("capacity_scanner_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § 合規掃描 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每日 03:00 Taipei 遍歷 asset_inventory → 寫 7 維 asset_compliance_snapshot
|
||||
# MVP: secret_rotated 真實檢查,其他 6 維占位 'unknown',後續 agent 補
|
||||
try:
|
||||
from src.jobs.compliance_scanner_job import run_compliance_scanner_loop
|
||||
asyncio.create_task(run_compliance_scanner_loop())
|
||||
logger.info("compliance_scanner_loop_scheduled", daily_trigger_hour_taipei=3)
|
||||
except Exception as e:
|
||||
logger.warning("compliance_scanner_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
|
||||
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user