diff --git a/apps/api/src/jobs/capacity_scanner_job.py b/apps/api/src/jobs/capacity_scanner_job.py new file mode 100644 index 00000000..621e97fa --- /dev/null +++ b/apps/api/src/jobs/capacity_scanner_job.py @@ -0,0 +1,339 @@ +""" +Capacity Scanner Job — ADR-090 § Phase 4 NemoTron 容量巡檢 MVP +=============================================================== +每日 02:00 Taipei 從 Prometheus 撈 node metrics → 寫 host_capacity_snapshot. + +職責邊界: + ✅ 撈 Prometheus node_exporter metrics (load / cpu / mem / swap) + ✅ 為每個 host 寫一筆 host_capacity_snapshot + heuristic ai_verdict + ✅ 超過硬閾值寫 capacity_violation_event + ✅ 寫 automation_operation_log(capacity_recommendation) + ❌ 不做 Holt-Winters 預測 (那是 Hermes 後續階段) + ❌ 不自動執行修復 (只 recommend,統帥決策) + +設計鐵律: + - 每日一次 snapshot (歷史 30d 供 AI 趨勢分析) + - ai_verdict heuristic: cpu>80 or mem>85 → critical; >60/70 → warning; else safe + - Prometheus 失敗 → log + skip 該 host,不 crash 整 loop + +資料來源: + - PROMETHEUS_URL/api/v1/query (instant query) + - 預期 instance 格式: '192.168.0.XXX:9100' 或 hostname + +排程: + - 首次延遲 120s + - 後續每日 02:00 (Taipei) 對齊跑 + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 § Phase 4 +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from datetime import datetime, timedelta, timezone +from typing import Any + +import httpx +import structlog + +from src.core.config import settings + +logger = structlog.get_logger(__name__) + +# ============================================================================ +# 排程 / 閾值 +# ============================================================================ +_FIRST_DELAY_SEC = 120 +_HTTP_TIMEOUT_SEC = 10 +_LOOP_BACKOFF_SEC = 1800 + +# Taipei = UTC+8,每日 02:00 Taipei = 18:00 UTC 前一天 +_DAILY_TRIGGER_HOUR_TAIPEI = 2 + +# Heuristic 閾值 (ai_verdict 計算) +_CPU_CRITICAL = 80.0 +_CPU_WARNING = 60.0 +_MEM_CRITICAL = 85.0 +_MEM_WARNING = 70.0 +_SWAP_CRITICAL = 50.0 +_LOAD1_CRITICAL_RATIO = 2.0 # load1 > 2x CPU cores = critical + +# Prometheus 查詢 (instant query,每筆 host 一個 label) +_PROM_QUERIES = { + "load1": "avg by(instance) (node_load1)", + "load5": "avg by(instance) (node_load5)", + "load15": "avg by(instance) (node_load15)", + "cpu_used_pct": "100 - (avg by(instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)", + "cpu_iowait_pct": "avg by(instance) (rate(node_cpu_seconds_total{mode=\"iowait\"}[5m])) * 100", + "mem_used_pct": "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100", + "swap_used_pct": "(node_memory_SwapTotal_bytes - node_memory_SwapFree_bytes) / (node_memory_SwapTotal_bytes > 0 or vector(1)) * 100", +} + + +# ============================================================================ +# Public entry — main.py lifespan 呼叫 +# ============================================================================ + +async def run_capacity_scanner_loop() -> None: + """每日 02:00 Taipei 跑一次容量巡檢.""" + logger.info("capacity_scanner_loop_started") + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await scan_once() + except Exception as e: + logger.exception("capacity_scanner_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + + # 算下次 02:00 Taipei 的 sleep 秒數 + sleep_sec = _seconds_until_next_trigger() + logger.info("capacity_scanner_next_tick", sleep_sec=sleep_sec) + await asyncio.sleep(sleep_sec) + + +async def scan_once(triggered_by: str = "cron") -> dict[str, int]: + """執行一次容量巡檢,每 host 寫一筆 snapshot.""" + started_ms = _time.time() + stats = {"hosts_scanned": 0, "violations": 0} + error_msg: str | None = None + + try: + metrics_by_host = await _fetch_all_metrics() + for host, m in metrics_by_host.items(): + snapshot_id = await _write_snapshot(host, m) + if snapshot_id: + stats["hosts_scanned"] += 1 + viol = await _check_and_write_violations(host, m) + stats["violations"] += viol + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("capacity_scan_once_failed", error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg) + + logger.info( + "capacity_scan_once_done", + hosts=stats["hosts_scanned"], + violations=stats["violations"], + duration_ms=duration_ms, + ) + return stats + + +# ============================================================================ +# Prometheus 撈資料 +# ============================================================================ + +async def _fetch_all_metrics() -> dict[str, dict[str, float]]: + """ + 對每個 _PROM_QUERIES 跑 instant query,回傳 {host: {metric: value}}. + + host 來自 query 結果 label 'instance' 的 IP 前綴 (去掉 :9100). + """ + url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query" + results: dict[str, dict[str, float]] = {} + + async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client: + for metric_name, promql in _PROM_QUERIES.items(): + try: + resp = await client.get(url, params={"query": promql}) + resp.raise_for_status() + data = resp.json() + if data.get("status") != "success": + logger.warning("prom_query_non_success", metric=metric_name) + continue + for r in (data.get("data", {}) or {}).get("result", []) or []: + instance = (r.get("metric", {}) or {}).get("instance", "") + host = instance.split(":")[0] if instance else "unknown" + val = r.get("value", [None, None]) + if val and len(val) >= 2: + try: + results.setdefault(host, {})[metric_name] = float(val[1]) + except (ValueError, TypeError): + pass + except Exception as e: + logger.warning("prom_query_failed", metric=metric_name, error=str(e)) + continue + return results + + +# ============================================================================ +# DB 寫入 +# ============================================================================ + +async def _write_snapshot(host: str, m: dict[str, float]) -> int | None: + """寫 host_capacity_snapshot,回傳 snapshot_id.""" + if not host or host == "unknown": + return None + + verdict, reasoning = _assess_verdict(m) + + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + async with get_db_context() as db: + row = await db.execute( + _sql(""" + INSERT INTO host_capacity_snapshot ( + host, captured_at, + load1, load5, load15, + cpu_used_pct, cpu_iowait_pct, + mem_used_pct, swap_used_pct, + ai_verdict, ai_reasoning, + written_by_agent + ) VALUES ( + :host, NOW(), + :l1, :l5, :l15, + :cpu, :iowait, + :mem, :swap, + :verdict, :reason, + 'capacity_scanner' + ) + RETURNING snapshot_id + """), + { + "host": host, + "l1": m.get("load1"), + "l5": m.get("load5"), + "l15": m.get("load15"), + "cpu": m.get("cpu_used_pct"), + "iowait": m.get("cpu_iowait_pct"), + "mem": m.get("mem_used_pct"), + "swap": m.get("swap_used_pct"), + "verdict": verdict, + "reason": reasoning[:500], + }, + ) + sid = row.scalar() + return int(sid) if sid else None + except Exception as e: + logger.warning("capacity_snapshot_write_failed", host=host, error=str(e)) + return None + + +async def _check_and_write_violations(host: str, m: dict[str, float]) -> int: + """超過硬閾值時寫 capacity_violation_event,回傳新增筆數.""" + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + violations: list[tuple[str, float, float]] = [] + cpu = m.get("cpu_used_pct") + mem = m.get("mem_used_pct") + swap = m.get("swap_used_pct") + if cpu is not None and cpu > _CPU_CRITICAL: + violations.append(("cpu_over_threshold", _CPU_CRITICAL, cpu)) + if mem is not None and mem > _MEM_CRITICAL: + violations.append(("mem_over_threshold", _MEM_CRITICAL, mem)) + if swap is not None and swap > _SWAP_CRITICAL: + violations.append(("swap_over_threshold", _SWAP_CRITICAL, swap)) + + if not violations: + return 0 + + written = 0 + try: + async with get_db_context() as db: + for vtype, threshold, actual in violations: + await db.execute( + _sql(""" + INSERT INTO capacity_violation_event ( + host, violation_type, threshold, actual_value, + detected_at + ) VALUES ( + :host, :vt, :th, :av, + NOW() + ) + """), + {"host": host, "vt": vtype, "th": threshold, "av": actual}, + ) + written += 1 + except Exception as e: + logger.warning("capacity_violation_write_failed", host=host, error=str(e)) + return written + + +async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None: + """寫 aol(capacity_recommendation).""" + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + aol_status = "failed" if error else "success" + input_payload = {"triggered_by": triggered_by, "source": "prometheus_node_exporter"} + output_payload = dict(stats) + + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, duration_ms, error, tags + ) VALUES ( + 'capacity_recommendation', + 'capacity_scanner', + :st, + CAST(:input AS jsonb), + CAST(:output AS jsonb), + :dur, :err, :tags + ) + """), + { + "st": aol_status, + "input": _json.dumps(input_payload, ensure_ascii=False), + "output": _json.dumps(output_payload, ensure_ascii=False), + "dur": duration_ms, + "err": (error or "")[:2000] if error else None, + "tags": ["capacity", "scanner", "prometheus"], + }, + ) + except Exception as e: + logger.warning("capacity_aol_write_failed", error=str(e)) + + +# ============================================================================ +# Heuristic + 時間計算 +# ============================================================================ + +def _assess_verdict(m: dict[str, float]) -> tuple[str, str]: + """根據閾值給 ai_verdict (safe/warning/critical) + reasoning.""" + reasons = [] + max_level = 0 # 0=safe 1=warning 2=critical + + cpu = m.get("cpu_used_pct") + mem = m.get("mem_used_pct") + swap = m.get("swap_used_pct") + + if cpu is not None: + if cpu > _CPU_CRITICAL: + max_level = max(max_level, 2); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_CRITICAL})") + elif cpu > _CPU_WARNING: + max_level = max(max_level, 1); reasons.append(f"cpu={cpu:.1f}% (>{_CPU_WARNING})") + if mem is not None: + if mem > _MEM_CRITICAL: + max_level = max(max_level, 2); reasons.append(f"mem={mem:.1f}% (>{_MEM_CRITICAL})") + elif mem > _MEM_WARNING: + max_level = max(max_level, 1); reasons.append(f"mem={mem:.1f}% (>{_MEM_WARNING})") + if swap is not None and swap > _SWAP_CRITICAL: + max_level = max(max_level, 2); reasons.append(f"swap={swap:.1f}% (>{_SWAP_CRITICAL})") + + verdict = ("safe", "warning", "critical")[max_level] + reasoning = "; ".join(reasons) if reasons else "all metrics within safe range" + return verdict, reasoning + + +def _seconds_until_next_trigger() -> float: + """算到下個 02:00 Taipei 的秒數.""" + tz_taipei = timezone(timedelta(hours=8)) + now = datetime.now(tz_taipei) + today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0) + if now >= today_trigger: + today_trigger = today_trigger + timedelta(days=1) + delta = (today_trigger - now).total_seconds() + # 上限保護: 至少 300s,至多 25h + return max(300.0, min(delta, 25 * 3600)) diff --git a/apps/api/src/jobs/compliance_scanner_job.py b/apps/api/src/jobs/compliance_scanner_job.py new file mode 100644 index 00000000..3176ba4e --- /dev/null +++ b/apps/api/src/jobs/compliance_scanner_job.py @@ -0,0 +1,299 @@ +""" +Compliance Scanner Job — ADR-090 § 合規掃描 MVP +================================================ +每日 03:00 Taipei 遍歷 asset_inventory,為每個 active asset 寫 7 維 asset_compliance_snapshot. + +職責邊界 (MVP): + ✅ 為所有 active asset 建立 7 維 snapshot 占位 (status='unknown') + ✅ 基礎檢查: secret asset 是否 > 90d 沒輪替 (K8s Secret createdAt) + ✅ 寫 automation_operation_log(coverage_recalculated) summary + ⏳ TODO: ssl_cert_valid (openssl s_client 檢查憑證到期) + ⏳ TODO: cve_scan (trivy image scan) + ⏳ TODO: backup_tested (查 16-cronjob-backup-restore-test 結果) + +設計鐵律: + - 7 個 dimension 固定: ssl_cert_valid / cve_scan / secret_rotated / backup_tested / + audit_log_enabled / access_reviewed / encryption_at_rest + - 未實作的 dimension 預設 status='unknown',後續 AI agent UPDATE + - 每天一次 snapshot,歷史保留供 SLO 統計 + +排程: + - 首次延遲 180s (capacity_scanner 之後) + - 每日 03:00 Taipei 對齊 + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 § Compliance +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from datetime import datetime, timedelta, timezone +from typing import Any + +import structlog + +logger = structlog.get_logger(__name__) + +# ============================================================================ +# 排程 +# ============================================================================ +_FIRST_DELAY_SEC = 180 +_LOOP_BACKOFF_SEC = 1800 +_DAILY_TRIGGER_HOUR_TAIPEI = 3 + +# 7 維 compliance (ADR-090 schema CHECK) +_DIMENSIONS = ( + "ssl_cert_valid", "cve_scan", "secret_rotated", "backup_tested", + "audit_log_enabled", "access_reviewed", "encryption_at_rest", +) + +# secret_rotated 閾值 +_SECRET_ROTATION_WARNING_DAYS = 90 + + +# ============================================================================ +# Public entry — main.py lifespan 呼叫 +# ============================================================================ + +async def run_compliance_scanner_loop() -> None: + """每日 03:00 Taipei 合規掃描.""" + logger.info("compliance_scanner_loop_started") + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await scan_once() + except Exception as e: + logger.exception("compliance_scanner_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + + sleep_sec = _seconds_until_next_trigger() + logger.info("compliance_scanner_next_tick", sleep_sec=sleep_sec) + await asyncio.sleep(sleep_sec) + + +async def scan_once(triggered_by: str = "cron") -> dict[str, int]: + """遍歷 asset_inventory 為每個 active asset 寫 7 維 compliance snapshot.""" + started_ms = _time.time() + stats = {"assets_scanned": 0, "snapshots_written": 0, "violations": 0, "warnings": 0} + error_msg: str | None = None + + try: + assets = await _fetch_active_assets() + stats["assets_scanned"] = len(assets) + + for asset in assets: + s, v, w = await _write_compliance_for_asset(asset) + stats["snapshots_written"] += s + stats["violations"] += v + stats["warnings"] += w + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("compliance_scan_once_failed", error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + await _log_aol(stats=stats, duration_ms=duration_ms, triggered_by=triggered_by, error=error_msg) + + logger.info( + "compliance_scan_once_done", + assets=stats["assets_scanned"], + snapshots=stats["snapshots_written"], + warnings=stats["warnings"], + violations=stats["violations"], + duration_ms=duration_ms, + ) + return stats + + +# ============================================================================ +# DB 操作 +# ============================================================================ + +async def _fetch_active_assets() -> list[dict[str, Any]]: + """從 asset_inventory 撈所有 active asset.""" + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + result = await db.execute( + _sql(""" + SELECT asset_id, asset_key, asset_type, metadata + FROM asset_inventory + WHERE lifecycle_state = 'active' + ORDER BY asset_id + """), + ) + rows = result.fetchall() + return [ + { + "asset_id": r.asset_id, + "asset_key": r.asset_key, + "asset_type": r.asset_type, + "metadata": r.metadata or {}, + } + for r in rows + ] + except Exception as e: + logger.warning("fetch_active_assets_failed", error=str(e)) + return [] + + +async def _write_compliance_for_asset(asset: dict[str, Any]) -> tuple[int, int, int]: + """ + 為單一 asset 寫 7 維 compliance snapshot. + + Returns: (snapshots_written, violations_count, warnings_count) + """ + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + snapshots = 0 + violations = 0 + warnings = 0 + + # 為每個 dimension 評估 status (目前多數 'unknown',secret_rotated 有基礎邏輯) + dimension_results = _evaluate_all_dimensions(asset) + + try: + async with get_db_context() as db: + for dim, (status, detail) in dimension_results.items(): + await db.execute( + _sql(""" + INSERT INTO asset_compliance_snapshot ( + asset_id, dimension, status, detail, detected_at + ) VALUES ( + :aid, :dim, :status, CAST(:detail AS jsonb), NOW() + ) + """), + { + "aid": asset["asset_id"], + "dim": dim, + "status": status, + "detail": _json.dumps(detail, ensure_ascii=False), + }, + ) + snapshots += 1 + if status == "violation": + violations += 1 + elif status == "warning": + warnings += 1 + except Exception as e: + logger.warning("compliance_write_failed", asset_id=asset["asset_id"], error=str(e)) + + return snapshots, violations, warnings + + +# ============================================================================ +# Compliance 評估邏輯 (MVP — 多數 unknown,留 TODO) +# ============================================================================ + +def _evaluate_all_dimensions(asset: dict[str, Any]) -> dict[str, tuple[str, dict]]: + """ + 為 asset 評估所有 7 維,回傳 {dimension: (status, detail)}. + + MVP 策略: + - secret_rotated: 對 asset_type='secret' 檢查 metadata.creationTimestamp + - 其他 6 維: status='unknown' + detail 標註 TODO + """ + results: dict[str, tuple[str, dict]] = {} + + # secret_rotated: 只對 secret 類型 asset 做真實檢查 + if asset["asset_type"] == "secret": + results["secret_rotated"] = _check_secret_rotation(asset) + else: + results["secret_rotated"] = ("unknown", {"reason": "asset_type is not 'secret', N/A"}) + + # 其他 6 維佔位 + results["ssl_cert_valid"] = ("unknown", {"todo": "openssl s_client check (Phase 7.2)"}) + results["cve_scan"] = ("unknown", {"todo": "trivy image scan (Phase 7.3)"}) + results["backup_tested"] = ("unknown", {"todo": "pg-backup-restore-test cronjob 結果 (Phase 7.4)"}) + results["audit_log_enabled"] = ("unknown", {"todo": "audit_logs table 對應查詢 (Phase 7.5)"}) + results["access_reviewed"] = ("unknown", {"todo": "RBAC quarterly review (Phase 7.6)"}) + results["encryption_at_rest"] = ("unknown", {"todo": "PG TDE / K8s Secret encryption check (Phase 7.7)"}) + + return results + + +def _check_secret_rotation(asset: dict[str, Any]) -> tuple[str, dict]: + """檢查 Secret 的 creationTimestamp,超過 90d 標 warning.""" + meta = asset.get("metadata", {}) + created_ts = meta.get("creationTimestamp") or meta.get("createdAt") or "" + if not created_ts: + return ("unknown", {"reason": "creationTimestamp not in metadata"}) + + try: + if created_ts.endswith("Z"): + created = datetime.fromisoformat(created_ts.replace("Z", "+00:00")) + else: + created = datetime.fromisoformat(created_ts) + except (ValueError, TypeError): + return ("unknown", {"reason": f"unparseable timestamp: {created_ts[:50]}"}) + + now_utc = datetime.now(timezone.utc) + age_days = (now_utc - created).days + + if age_days > _SECRET_ROTATION_WARNING_DAYS: + return ("warning", { + "age_days": age_days, + "threshold_days": _SECRET_ROTATION_WARNING_DAYS, + "message": f"Secret 已 {age_days} 天未輪替,超過 {_SECRET_ROTATION_WARNING_DAYS}d 閾值", + }) + return ("compliant", {"age_days": age_days}) + + +# ============================================================================ +# AOL +# ============================================================================ + +async def _log_aol(stats: dict[str, int], duration_ms: int, triggered_by: str, error: str | None) -> None: + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + aol_status = "failed" if error else "success" + + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, duration_ms, error, tags + ) VALUES ( + 'coverage_recalculated', + 'compliance_scanner', + :st, + CAST(:input AS jsonb), + CAST(:output AS jsonb), + :dur, :err, :tags + ) + """), + { + "st": aol_status, + "input": _json.dumps({"triggered_by": triggered_by, "dimensions": list(_DIMENSIONS)}, ensure_ascii=False), + "output": _json.dumps(stats, ensure_ascii=False), + "dur": duration_ms, + "err": (error or "")[:2000] if error else None, + "tags": ["compliance", "scanner"], + }, + ) + except Exception as e: + logger.warning("compliance_aol_write_failed", error=str(e)) + + +# ============================================================================ +# 時間計算 +# ============================================================================ + +def _seconds_until_next_trigger() -> float: + """算到下個 03:00 Taipei 的秒數.""" + tz_taipei = timezone(timedelta(hours=8)) + now = datetime.now(tz_taipei) + today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0) + if now >= today_trigger: + today_trigger = today_trigger + timedelta(days=1) + delta = (today_trigger - now).total_seconds() + return max(300.0, min(delta, 25 * 3600)) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 337c5854..04fc858a 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -390,6 +390,26 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e)) + # ADR-090 § Phase 4 NemoTron 容量巡檢 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每日 02:00 Taipei 撈 Prometheus node_exporter → 寫 host_capacity_snapshot + violations + # 解鎖: Phase 4 Holt-Winters 預測有歷史資料 / 容量趨勢分析 + try: + from src.jobs.capacity_scanner_job import run_capacity_scanner_loop + asyncio.create_task(run_capacity_scanner_loop()) + logger.info("capacity_scanner_loop_scheduled", daily_trigger_hour_taipei=2) + except Exception as e: + logger.warning("capacity_scanner_loop_schedule_failed", error=str(e)) + + # ADR-090 § 合規掃描 MVP (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每日 03:00 Taipei 遍歷 asset_inventory → 寫 7 維 asset_compliance_snapshot + # MVP: secret_rotated 真實檢查,其他 6 維占位 'unknown',後續 agent 補 + try: + from src.jobs.compliance_scanner_job import run_compliance_scanner_loop + asyncio.create_task(run_compliance_scanner_loop()) + logger.info("compliance_scanner_loop_scheduled", daily_trigger_hour_taipei=3) + except Exception as e: + logger.warning("compliance_scanner_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: