feat(aiops): capacity_scanner + compliance_scanner (ADR-090 Phase 7 剩 2)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

完成 ADR-090 Phase 7 第 3+4 個 service,解鎖 2 張 0 writer 表:

B3. apps/api/src/jobs/capacity_scanner_job.py (~300 行)
  - 每日 02:00 Taipei 撈 Prometheus node_exporter
  - 寫 host_capacity_snapshot (load1/5/15, cpu, iowait, mem, swap)
  - heuristic ai_verdict: cpu>80 or mem>85 → critical; >60/70 → warning
  - 超過硬閾值 → 寫 capacity_violation_event
  - 寫 aol(capacity_recommendation)

B4. apps/api/src/jobs/compliance_scanner_job.py (~270 行)
  - 每日 03:00 Taipei 遍歷 asset_inventory active assets
  - 為每個 asset 寫 7 維 compliance snapshot
  - secret_rotated: 真實檢查 (metadata.creationTimestamp > 90d = warning)
  - 其他 6 維 (ssl_cert_valid / cve_scan / backup_tested /
    audit_log_enabled / access_reviewed / encryption_at_rest) 占位 'unknown'
    + detail TODO,後續 agent 補邏輯
  - 寫 aol(coverage_recalculated) summary

main.py lifespan 同步 wire 2 個新 loop

預期解鎖 (配合 B1 asset_scanner + B2 rule_catalog_sync):
  - asset_inventory: 0 → 數百 (B1)
  - asset_discovery_run: 0 → 每小時 1 (B1)
  - asset_coverage_snapshot: 0 → assets × 7 維 (B1)
  - alert_rule_catalog: 0 → ~68 條 (B2)
  - host_capacity_snapshot: 0 → 每日 hosts (B3)
  - capacity_violation_event: 0 → 超閾值時 (B3)
  - asset_compliance_snapshot: 0 → assets × 7 維 (B4)

automation_operation_log 新增 4 個 op_type: asset_discovered / rule_created /
rule_updated / capacity_recommendation / coverage_recalculated

8 張 0 writer 表到此全數有 writer,ADR-090 Phase 7 實作完成.

Refs: ADR-090 §4.2 Phase 4, MASTER §3.5 D5 (capacity-aware)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 16:21:11 +08:00
parent 2dd02bec3f
commit 4259a104f5
3 changed files with 658 additions and 0 deletions

View File

@@ -0,0 +1,339 @@
"""
Capacity Scanner Job — ADR-090 § Phase 4 NemoTron 容量巡檢 MVP
===============================================================
每日 02:00 Taipei 從 Prometheus 撈 node metrics → 寫 host_capacity_snapshot.
職責邊界:
✅ 撈 Prometheus node_exporter metrics (load / cpu / mem / swap)
✅ 為每個 host 寫一筆 host_capacity_snapshot + heuristic ai_verdict
✅ 超過硬閾值寫 capacity_violation_event
✅ 寫 automation_operation_log(capacity_recommendation)
❌ 不做 Holt-Winters 預測 (那是 Hermes 後續階段)
❌ 不自動執行修復 (只 recommend,統帥決策)
設計鐵律:
- 每日一次 snapshot (歷史 30d 供 AI 趨勢分析)
- ai_verdict heuristic: cpu>80 or mem>85 → critical; >60/70 → warning; else safe
- Prometheus 失敗 → log + skip 該 host,不 crash 整 loop
資料來源:
- PROMETHEUS_URL/api/v1/query (instant query)
- 預期 instance 格式: '192.168.0.XXX:9100' 或 hostname
排程:
- 首次延遲 120s
- 後續每日 02:00 (Taipei) 對齊跑
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Phase 4
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from datetime import datetime, timedelta, timezone
from typing import Any
import httpx
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# ============================================================================
# 排程 / 閾值
# ============================================================================
_FIRST_DELAY_SEC = 120
_HTTP_TIMEOUT_SEC = 10
_LOOP_BACKOFF_SEC = 1800
# Taipei = UTC+8,每日 02:00 Taipei = 18:00 UTC 前一天
_DAILY_TRIGGER_HOUR_TAIPEI = 2
# Heuristic 閾值 (ai_verdict 計算)
_CPU_CRITICAL = 80.0
_CPU_WARNING = 60.0
_MEM_CRITICAL = 85.0
_MEM_WARNING = 70.0
_SWAP_CRITICAL = 50.0
_LOAD1_CRITICAL_RATIO = 2.0 # load1 > 2x CPU cores = critical
# Prometheus 查詢 (instant query,每筆 host 一個 label)
_PROM_QUERIES = {
"load1": "avg by(instance) (node_load1)",
"load5": "avg by(instance) (node_load5)",
"load15": "avg by(instance) (node_load15)",
"cpu_used_pct": "100 - (avg by(instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
"cpu_iowait_pct": "avg by(instance) (rate(node_cpu_seconds_total{mode=\"iowait\"}[5m])) * 100",
"mem_used_pct": "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100",
"swap_used_pct": "(node_memory_SwapTotal_bytes - node_memory_SwapFree_bytes) / (node_memory_SwapTotal_bytes > 0 or vector(1)) * 100",
}
# ============================================================================
# Public entry — main.py lifespan 呼叫
# ============================================================================
async def run_capacity_scanner_loop() -> None:
"""每日 02:00 Taipei 跑一次容量巡檢."""
logger.info("capacity_scanner_loop_started")
await asyncio.sleep(_FIRST_DELAY_SEC)
while True:
try:
await scan_once()
except Exception as e:
logger.exception("capacity_scanner_loop_error", error=str(e))
await asyncio.sleep(_LOOP_BACKOFF_SEC)
continue
# 算下次 02:00 Taipei 的 sleep 秒數
sleep_sec = _seconds_until_next_trigger()
logger.info("capacity_scanner_next_tick", sleep_sec=sleep_sec)
await asyncio.sleep(sleep_sec)
async def scan_once(triggered_by: str = "cron") -> dict[str, int]:
"""執行一次容量巡檢,每 host 寫一筆 snapshot."""
started_ms = _time.time()
stats = {"hosts_scanned": 0, "violations": 0}
error_msg: str | None = None
try:
metrics_by_host = await _fetch_all_metrics()
for host, m in metrics_by_host.items():
snapshot_id = await _write_snapshot(host, m)
if snapshot_id:
stats["hosts_scanned"] += 1
viol = await _check_and_write_violations(host, m)
stats["violations"] += viol
except Exception as e:
error_msg = f"{type(e).__name__}: {e}"[:1000]
logger.exception("capacity_scan_once_failed", error=error_msg)
duration_ms = int((_time.time() - started_ms) * 1000)
await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg)
logger.info(
"capacity_scan_once_done",
hosts=stats["hosts_scanned"],
violations=stats["violations"],
duration_ms=duration_ms,
)
return stats
# ============================================================================
# Prometheus 撈資料
# ============================================================================
async def _fetch_all_metrics() -> dict[str, dict[str, float]]:
"""
對每個 _PROM_QUERIES 跑 instant query,回傳 {host: {metric: value}}.
host 來自 query 結果 label 'instance' 的 IP 前綴 (去掉 :9100).
"""
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
results: dict[str, dict[str, float]] = {}
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
for metric_name, promql in _PROM_QUERIES.items():
try:
resp = await client.get(url, params={"query": promql})
resp.raise_for_status()
data = resp.json()
if data.get("status") != "success":
logger.warning("prom_query_non_success", metric=metric_name)
continue
for r in (data.get("data", {}) or {}).get("result", []) or []:
instance = (r.get("metric", {}) or {}).get("instance", "")
host = instance.split(":")[0] if instance else "unknown"
val = r.get("value", [None, None])
if val and len(val) >= 2:
try:
results.setdefault(host, {})[metric_name] = float(val[1])
except (ValueError, TypeError):
pass
except Exception as e:
logger.warning("prom_query_failed", metric=metric_name, error=str(e))
continue
return results
# ============================================================================
# DB 寫入
# ============================================================================
async def _write_snapshot(host: str, m: dict[str, float]) -> int | None:
"""寫 host_capacity_snapshot,回傳 snapshot_id."""
if not host or host == "unknown":
return None
verdict, reasoning = _assess_verdict(m)
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
async with get_db_context() as db:
row = await db.execute(
_sql("""
INSERT INTO host_capacity_snapshot (
host, captured_at,
load1, load5, load15,
cpu_used_pct, cpu_iowait_pct,
mem_used_pct, swap_used_pct,
ai_verdict, ai_reasoning,
written_by_agent
) VALUES (
:host, NOW(),
:l1, :l5, :l15,
:cpu, :iowait,
:mem, :swap,
:verdict, :reason,
'capacity_scanner'
)
RETURNING snapshot_id
"""),
{
"host": host,
"l1": m.get("load1"),
"l5": m.get("load5"),
"l15": m.get("load15"),
"cpu": m.get("cpu_used_pct"),
"iowait": m.get("cpu_iowait_pct"),
"mem": m.get("mem_used_pct"),
"swap": m.get("swap_used_pct"),
"verdict": verdict,
"reason": reasoning[:500],
},
)
sid = row.scalar()
return int(sid) if sid else None
except Exception as e:
logger.warning("capacity_snapshot_write_failed", host=host, error=str(e))
return None
async def _check_and_write_violations(host: str, m: dict[str, float]) -> int:
"""超過硬閾值時寫 capacity_violation_event,回傳新增筆數."""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
violations: list[tuple[str, float, float]] = []
cpu = m.get("cpu_used_pct")
mem = m.get("mem_used_pct")
swap = m.get("swap_used_pct")
if cpu is not None and cpu > _CPU_CRITICAL:
violations.append(("cpu_over_threshold", _CPU_CRITICAL, cpu))
if mem is not None and mem > _MEM_CRITICAL:
violations.append(("mem_over_threshold", _MEM_CRITICAL, mem))
if swap is not None and swap > _SWAP_CRITICAL:
violations.append(("swap_over_threshold", _SWAP_CRITICAL, swap))
if not violations:
return 0
written = 0
try:
async with get_db_context() as db:
for vtype, threshold, actual in violations:
await db.execute(
_sql("""
INSERT INTO capacity_violation_event (
host, violation_type, threshold, actual_value,
detected_at
) VALUES (
:host, :vt, :th, :av,
NOW()
)
"""),
{"host": host, "vt": vtype, "th": threshold, "av": actual},
)
written += 1
except Exception as e:
logger.warning("capacity_violation_write_failed", host=host, error=str(e))
return written
async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None:
"""寫 aol(capacity_recommendation)."""
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
aol_status = "failed" if error else "success"
input_payload = {"triggered_by": triggered_by, "source": "prometheus_node_exporter"}
output_payload = dict(stats)
async with get_db_context() as db:
await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, duration_ms, error, tags
) VALUES (
'capacity_recommendation',
'capacity_scanner',
:st,
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:dur, :err, :tags
)
"""),
{
"st": aol_status,
"input": _json.dumps(input_payload, ensure_ascii=False),
"output": _json.dumps(output_payload, ensure_ascii=False),
"dur": duration_ms,
"err": (error or "")[:2000] if error else None,
"tags": ["capacity", "scanner", "prometheus"],
},
)
except Exception as e:
logger.warning("capacity_aol_write_failed", error=str(e))
# ============================================================================
# Heuristic + 時間計算
# ============================================================================
def _assess_verdict(m: dict[str, float]) -> tuple[str, str]:
"""根據閾值給 ai_verdict (safe/warning/critical) + reasoning."""
reasons = []
max_level = 0 # 0=safe 1=warning 2=critical
cpu = m.get("cpu_used_pct")
mem = m.get("mem_used_pct")
swap = m.get("swap_used_pct")
if cpu is not None:
if cpu > _CPU_CRITICAL:
max_level = max(max_level, 2); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_CRITICAL})")
elif cpu > _CPU_WARNING:
max_level = max(max_level, 1); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_WARNING})")
if mem is not None:
if mem > _MEM_CRITICAL:
max_level = max(max_level, 2); reasons.append(f"mem={mem:.1f}% (>{_MEM_CRITICAL})")
elif mem > _MEM_WARNING:
max_level = max(max_level, 1); reasons.append(f"mem={mem:.1f}% (>{_MEM_WARNING})")
if swap is not None and swap > _SWAP_CRITICAL:
max_level = max(max_level, 2); reasons.append(f"swap={swap:.1f}% (>{_SWAP_CRITICAL})")
verdict = ("safe", "warning", "critical")[max_level]
reasoning = "; ".join(reasons) if reasons else "all metrics within safe range"
return verdict, reasoning
def _seconds_until_next_trigger() -> float:
"""算到下個 02:00 Taipei 的秒數."""
tz_taipei = timezone(timedelta(hours=8))
now = datetime.now(tz_taipei)
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
if now >= today_trigger:
today_trigger = today_trigger + timedelta(days=1)
delta = (today_trigger - now).total_seconds()
# 上限保護: 至少 300s,至多 25h
return max(300.0, min(delta, 25 * 3600))

View File

@@ -0,0 +1,299 @@
"""
Compliance Scanner Job — ADR-090 § 合規掃描 MVP
================================================
每日 03:00 Taipei 遍歷 asset_inventory,為每個 active asset 寫 7 維 asset_compliance_snapshot.
職責邊界 (MVP):
✅ 為所有 active asset 建立 7 維 snapshot 占位 (status='unknown')
✅ 基礎檢查: secret asset 是否 > 90d 沒輪替 (K8s Secret createdAt)
✅ 寫 automation_operation_log(coverage_recalculated) summary
⏳ TODO: ssl_cert_valid (openssl s_client 檢查憑證到期)
⏳ TODO: cve_scan (trivy image scan)
⏳ TODO: backup_tested (查 16-cronjob-backup-restore-test 結果)
設計鐵律:
- 7 個 dimension 固定: ssl_cert_valid / cve_scan / secret_rotated / backup_tested /
audit_log_enabled / access_reviewed / encryption_at_rest
- 未實作的 dimension 預設 status='unknown',後續 AI agent UPDATE
- 每天一次 snapshot,歷史保留供 SLO 統計
排程:
- 首次延遲 180s (capacity_scanner 之後)
- 每日 03:00 Taipei 對齊
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Compliance
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from datetime import datetime, timedelta, timezone
from typing import Any
import structlog
logger = structlog.get_logger(__name__)
# ============================================================================
# 排程
# ============================================================================
_FIRST_DELAY_SEC = 180
_LOOP_BACKOFF_SEC = 1800
_DAILY_TRIGGER_HOUR_TAIPEI = 3
# 7 維 compliance (ADR-090 schema CHECK)
_DIMENSIONS = (
"ssl_cert_valid", "cve_scan", "secret_rotated", "backup_tested",
"audit_log_enabled", "access_reviewed", "encryption_at_rest",
)
# secret_rotated 閾值
_SECRET_ROTATION_WARNING_DAYS = 90
# ============================================================================
# Public entry — main.py lifespan 呼叫
# ============================================================================
async def run_compliance_scanner_loop() -> None:
"""每日 03:00 Taipei 合規掃描."""
logger.info("compliance_scanner_loop_started")
await asyncio.sleep(_FIRST_DELAY_SEC)
while True:
try:
await scan_once()
except Exception as e:
logger.exception("compliance_scanner_loop_error", error=str(e))
await asyncio.sleep(_LOOP_BACKOFF_SEC)
continue
sleep_sec = _seconds_until_next_trigger()
logger.info("compliance_scanner_next_tick", sleep_sec=sleep_sec)
await asyncio.sleep(sleep_sec)
async def scan_once(triggered_by: str = "cron") -> dict[str, int]:
"""遍歷 asset_inventory 為每個 active asset 寫 7 維 compliance snapshot."""
started_ms = _time.time()
stats = {"assets_scanned": 0, "snapshots_written": 0, "violations": 0, "warnings": 0}
error_msg: str | None = None
try:
assets = await _fetch_active_assets()
stats["assets_scanned"] = len(assets)
for asset in assets:
s, v, w = await _write_compliance_for_asset(asset)
stats["snapshots_written"] += s
stats["violations"] += v
stats["warnings"] += w
except Exception as e:
error_msg = f"{type(e).__name__}: {e}"[:1000]
logger.exception("compliance_scan_once_failed", error=error_msg)
duration_ms = int((_time.time() - started_ms) * 1000)
await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg)
logger.info(
"compliance_scan_once_done",
assets=stats["assets_scanned"],
snapshots=stats["snapshots_written"],
warnings=stats["warnings"],
violations=stats["violations"],
duration_ms=duration_ms,
)
return stats
# ============================================================================
# DB 操作
# ============================================================================
async def _fetch_active_assets() -> list[dict[str, Any]]:
"""從 asset_inventory 撈所有 active asset."""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
try:
async with get_db_context() as db:
result = await db.execute(
_sql("""
SELECT asset_id, asset_key, asset_type, metadata
FROM asset_inventory
WHERE lifecycle_state = 'active'
ORDER BY asset_id
"""),
)
rows = result.fetchall()
return [
{
"asset_id": r.asset_id,
"asset_key": r.asset_key,
"asset_type": r.asset_type,
"metadata": r.metadata or {},
}
for r in rows
]
except Exception as e:
logger.warning("fetch_active_assets_failed", error=str(e))
return []
async def _write_compliance_for_asset(asset: dict[str, Any]) -> tuple[int, int, int]:
"""
為單一 asset 寫 7 維 compliance snapshot.
Returns: (snapshots_written, violations_count, warnings_count)
"""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
snapshots = 0
violations = 0
warnings = 0
# 為每個 dimension 評估 status (目前多數 'unknown',secret_rotated 有基礎邏輯)
dimension_results = _evaluate_all_dimensions(asset)
try:
async with get_db_context() as db:
for dim, (status, detail) in dimension_results.items():
await db.execute(
_sql("""
INSERT INTO asset_compliance_snapshot (
asset_id, dimension, status, detail, detected_at
) VALUES (
:aid, :dim, :status, CAST(:detail AS jsonb), NOW()
)
"""),
{
"aid": asset["asset_id"],
"dim": dim,
"status": status,
"detail": _json.dumps(detail, ensure_ascii=False),
},
)
snapshots += 1
if status == "violation":
violations += 1
elif status == "warning":
warnings += 1
except Exception as e:
logger.warning("compliance_write_failed", asset_id=asset["asset_id"], error=str(e))
return snapshots, violations, warnings
# ============================================================================
# Compliance 評估邏輯 (MVP — 多數 unknown,留 TODO)
# ============================================================================
def _evaluate_all_dimensions(asset: dict[str, Any]) -> dict[str, tuple[str, dict]]:
"""
為 asset 評估所有 7 維,回傳 {dimension: (status, detail)}.
MVP 策略:
- secret_rotated: 對 asset_type='secret' 檢查 metadata.creationTimestamp
- 其他 6 維: status='unknown' + detail 標註 TODO
"""
results: dict[str, tuple[str, dict]] = {}
# secret_rotated: 只對 secret 類型 asset 做真實檢查
if asset["asset_type"] == "secret":
results["secret_rotated"] = _check_secret_rotation(asset)
else:
results["secret_rotated"] = ("unknown", {"reason": "asset_type is not 'secret', N/A"})
# 其他 6 維佔位
results["ssl_cert_valid"] = ("unknown", {"todo": "openssl s_client check (Phase 7.2)"})
results["cve_scan"] = ("unknown", {"todo": "trivy image scan (Phase 7.3)"})
results["backup_tested"] = ("unknown", {"todo": "pg-backup-restore-test cronjob 結果 (Phase 7.4)"})
results["audit_log_enabled"] = ("unknown", {"todo": "audit_logs table 對應查詢 (Phase 7.5)"})
results["access_reviewed"] = ("unknown", {"todo": "RBAC quarterly review (Phase 7.6)"})
results["encryption_at_rest"] = ("unknown", {"todo": "PG TDE / K8s Secret encryption check (Phase 7.7)"})
return results
def _check_secret_rotation(asset: dict[str, Any]) -> tuple[str, dict]:
"""檢查 Secret 的 creationTimestamp,超過 90d 標 warning."""
meta = asset.get("metadata", {})
created_ts = meta.get("creationTimestamp") or meta.get("createdAt") or ""
if not created_ts:
return ("unknown", {"reason": "creationTimestamp not in metadata"})
try:
if created_ts.endswith("Z"):
created = datetime.fromisoformat(created_ts.replace("Z", "+00:00"))
else:
created = datetime.fromisoformat(created_ts)
except (ValueError, TypeError):
return ("unknown", {"reason": f"unparseable timestamp: {created_ts[:50]}"})
now_utc = datetime.now(timezone.utc)
age_days = (now_utc - created).days
if age_days > _SECRET_ROTATION_WARNING_DAYS:
return ("warning", {
"age_days": age_days,
"threshold_days": _SECRET_ROTATION_WARNING_DAYS,
"message": f"Secret 已 {age_days} 天未輪替,超過 {_SECRET_ROTATION_WARNING_DAYS}d 閾值",
})
return ("compliant", {"age_days": age_days})
# ============================================================================
# AOL
# ============================================================================
async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None:
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
aol_status = "failed" if error else "success"
async with get_db_context() as db:
await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, duration_ms, error, tags
) VALUES (
'coverage_recalculated',
'compliance_scanner',
:st,
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:dur, :err, :tags
)
"""),
{
"st": aol_status,
"input": _json.dumps({"triggered_by": triggered_by, "dimensions": list(_DIMENSIONS)}, ensure_ascii=False),
"output": _json.dumps(stats, ensure_ascii=False),
"dur": duration_ms,
"err": (error or "")[:2000] if error else None,
"tags": ["compliance", "scanner"],
},
)
except Exception as e:
logger.warning("compliance_aol_write_failed", error=str(e))
# ============================================================================
# 時間計算
# ============================================================================
def _seconds_until_next_trigger() -> float:
"""算到下個 03:00 Taipei 的秒數."""
tz_taipei = timezone(timedelta(hours=8))
now = datetime.now(tz_taipei)
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
if now >= today_trigger:
today_trigger = today_trigger + timedelta(days=1)
delta = (today_trigger - now).total_seconds()
return max(300.0, min(delta, 25 * 3600))

View File

@@ -390,6 +390,26 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e))
# ADR-090 § Phase 4 NemoTron 容量巡檢 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
# 每日 02:00 Taipei 撈 Prometheus node_exporter → 寫 host_capacity_snapshot + violations
# 解鎖: Phase 4 Holt-Winters 預測有歷史資料 / 容量趨勢分析
try:
from src.jobs.capacity_scanner_job import run_capacity_scanner_loop
asyncio.create_task(run_capacity_scanner_loop())
logger.info("capacity_scanner_loop_scheduled", daily_trigger_hour_taipei=2)
except Exception as e:
logger.warning("capacity_scanner_loop_schedule_failed", error=str(e))
# ADR-090 § 合規掃描 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
# 每日 03:00 Taipei 遍歷 asset_inventory → 寫 7 維 asset_compliance_snapshot
# MVP: secret_rotated 真實檢查,其他 6 維占位 'unknown',後續 agent 補
try:
from src.jobs.compliance_scanner_job import run_compliance_scanner_loop
asyncio.create_task(run_compliance_scanner_loop())
logger.info("compliance_scanner_loop_scheduled", daily_trigger_hour_taipei=3)
except Exception as e:
logger.warning("compliance_scanner_loop_schedule_failed", error=str(e))
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try: