#!/usr/bin/env python3
"""
AWOOOI Sensor Agent - Phase 5.5 "nerve endings"
================================================
An extremely lightweight host-alert collection agent, deployed on the two
native Linux hosts 110 and 188.

Single responsibility: collect real local alerts -> XADD -> Redis Event Bus on 188.

Three collectors:
  A. NodeMetricsCollector  - node-exporter metrics (CPU/Mem/Disk/Load)
  B. JournalCollector      - systemd journal ERROR/CRITICAL logs
  C. ServiceProbeCollector - TCP port liveness probes

Design rules:
  - No incident/decision logic here (prevents split-brain)
  - Zero dependency on the AWOOOI core DB
  - Runs with plain Python + redis-py
  - Fingerprint dedup: the same alert is sent at most once per 10 minutes

Stream: awoooi:signals (renamed 2026-03-27 by ADR-038; the old
        stream:awoooi_signals is deprecated)
Redis:  192.168.0.188:6380, DB 10 by default (override with AWOOOI_REDIS_URL)

Version: 2.0.0
Created: 2026-04-09 (Taipei time)
Author:  Claude Sonnet 4.6 (Phase 5.5 B1 implementation)
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import socket
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timedelta, timezone

try:
    import redis
except ImportError:
    # Fatal only when executed as the agent itself. Importing this module
    # (e.g. to reuse or test the collectors) must not kill the importer.
    if __name__ == "__main__":
        print("[FATAL] redis-py not installed. Run: pip install redis")
        sys.exit(1)
    redis = None  # type: ignore[assignment]

# ============================================================================
# Constants
# ============================================================================
TAIPEI_TZ = timezone(timedelta(hours=8))
STREAM_KEY = "awoooi:signals"  # correct key after the ADR-038 rename
DEDUP_TTL = 600  # dedup window in seconds (10 minutes)
# DB 10 matches signal_worker's REDIS_URL (redis://192.168.0.188:6380/10)
REDIS_URL = os.getenv("AWOOOI_REDIS_URL", "redis://192.168.0.188:6380/10")

# Per-host collection config, keyed by the last octet of the host's IP
# address (hostname prefixes are not matched).
HOST_CONFIGS: dict[str, dict] = {
    # 192.168.0.188 — AI + Web hub
    "188": {
        "node_exporter_url": "http://localhost:9100/metrics",
        "journal_enabled": True,
        "services": [
            {"name": "PostgreSQL", "host": "127.0.0.1", "port": 5432},
            {"name": "Redis", "host": "127.0.0.1", "port": 6380},
            {"name": "Ollama", "host": "127.0.0.1", "port": 11434},
            {"name": "Nginx", "host": "127.0.0.1", "port": 80},
            {"name": "SigNoz", "host": "127.0.0.1", "port": 3301},
            # OpenClaw runs in K3s (awoooi-prod), not as a native service on
            # 188, so it is not probed here.
        ],
    },
    # 192.168.0.110 — DevOps vault
    "110": {
        "node_exporter_url": "http://localhost:9100/metrics",
        "journal_enabled": True,
        "services": [
            {"name": "Harbor", "host": "127.0.0.1", "port": 5000},
            {"name": "Gitea", "host": "127.0.0.1", "port": 3001},
            {"name": "GH-Runner", "host": "127.0.0.1", "port": 8080},
        ],
    },
}

# Alert thresholds (the severity each one emits is noted alongside)
THRESHOLDS = {
    "cpu_pct_high": 85.0,  # CPU busy > 85% -> critical
    "mem_pct_high": 90.0,  # memory used > 90% -> critical
    "disk_pct_high": 85.0,  # root filesystem used > 85% -> warning
    "load_factor": 2.0,  # load1 > cpu_count * 2 -> warning
    "journal_err_min": 10,  # > 10 journal ERROR entries in last 5 min -> warning
}


def _make_alert(
    alert_name: str,
    severity: str,
    source: str,
    target: str,
    labels: dict[str, str],
    summary: str,
) -> dict:
    """Build one alert dict (namespace is always "infra")."""
    return {
        "alert_name": alert_name,
        "severity": severity,
        "source": source,
        "namespace": "infra",
        "target": target,
        "labels": labels,
        "annotations": {"summary": summary},
    }


# ============================================================================
# A. NodeMetricsCollector
# ============================================================================
def _fetch_metrics(url: str) -> dict[str, float]:
    """Scrape a Prometheus text-format endpoint into ``{series: value}``.

    The key is the full series identifier exactly as exposed, including the
    label set (e.g. ``node_cpu_seconds_total{cpu="0",mode="idle"}``).
    Comment/blank lines, malformed samples and non-numeric values are
    skipped. An optional trailing timestamp is ignored.

    Returns:
        The parsed samples, or an empty dict when the endpoint cannot be
        fetched. This is best effort: the agent keeps running when
        node-exporter is down, but the failure is printed.
    """
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            text = resp.read().decode("utf-8", errors="replace")
    except Exception as exc:  # best effort, but make the outage visible
        print(f" [WARN] cannot fetch metrics from {url}: {exc}")
        return {}

    result: dict[str, float] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Label values may contain spaces, so split after the closing brace
        # of the label set rather than at the first space.
        brace = line.rfind("}")
        if brace != -1:
            series, fields = line[: brace + 1], line[brace + 1:].split()
        else:
            series, *fields = line.split()
        if not fields:
            continue
        try:
            result[series] = float(fields[0])
        except ValueError:
            continue
    return result


def _cpu_totals(raw: dict[str, float]) -> tuple[float, float]:
    """Sum node_cpu_seconds_total over all CPUs: (idle seconds, all-mode seconds)."""
    idle = total = 0.0
    for series, value in raw.items():
        if not series.startswith("node_cpu_seconds_total{"):
            continue
        total += value
        if 'mode="idle"' in series:
            idle += value
    return idle, total


def _cpu_usage_pct(
    previous: dict[str, float], current: dict[str, float]
) -> float | None:
    """Busy-CPU percentage between two scrapes, or None if no CPU time elapsed.

    "Busy" is every non-idle mode, so iowait counts as busy.
    """
    prev_idle, prev_total = _cpu_totals(previous)
    curr_idle, curr_total = _cpu_totals(current)
    delta_total = curr_total - prev_total
    if delta_total <= 0:  # identical scrape, failed 2nd scrape or counter reset
        return None
    delta_idle = curr_idle - prev_idle
    return min(max((1.0 - delta_idle / delta_total) * 100, 0.0), 100.0)


def _root_disk_usage_pct(raw: dict[str, float]) -> float | None:
    """Used percentage of the filesystem mounted at "/", or None if not exposed.

    Uses node_filesystem_avail_bytes (space usable by non-root services,
    excluding root-reserved blocks) and falls back to
    node_filesystem_free_bytes. The free-space series is looked up with
    exactly the same label set as the size series, so both numbers always
    describe the same device.
    """
    size_name = "node_filesystem_size_bytes"
    for series, size in raw.items():
        if not series.startswith(size_name + "{") or 'mountpoint="/"' not in series:
            continue
        labels = series[len(size_name):]
        available = raw.get("node_filesystem_avail_bytes" + labels)
        if available is None:
            available = raw.get("node_filesystem_free_bytes" + labels)
        if available is None or size <= 0:
            continue
        return (1.0 - available / size) * 100
    return None


def collect_node_metrics(
    url: str, hostname: str, *, cpu_sample_interval: float = 1.0
) -> list[dict]:
    """Collect CPU / memory / root-disk / load alerts from node-exporter.

    node_cpu_seconds_total is a cumulative counter. CPU usage is therefore
    computed from the difference between two scrapes taken
    ``cpu_sample_interval`` seconds apart. A single scrape would only yield
    the average since boot.

    Args:
        url: node-exporter /metrics URL.
        hostname: Value used as each alert's ``target``.
        cpu_sample_interval: Seconds to wait between the two scrapes.

    Returns:
        Alert dicts in the order CPU, memory, disk, load. The list is empty
        when nothing crosses a threshold or node-exporter is unreachable.
    """
    first = _fetch_metrics(url)
    if not first:
        return []
    time.sleep(max(cpu_sample_interval, 0.0))
    second = _fetch_metrics(url)
    latest = second or first  # gauges are read from the freshest scrape

    alerts: list[dict] = []

    # --- CPU usage (counter delta between the two scrapes) ---
    cpu_pct = _cpu_usage_pct(first, second)
    if cpu_pct is not None and cpu_pct > THRESHOLDS["cpu_pct_high"]:
        alerts.append(_make_alert(
            "HostHighCpuLoad", "critical", "node-exporter", hostname,
            {"cpu_pct": f"{cpu_pct:.1f}"},
            f"CPU usage {cpu_pct:.1f}% > {THRESHOLDS['cpu_pct_high']}%",
        ))

    # --- Memory usage (skipped when MemAvailable is not exposed) ---
    mem_total = latest.get("node_memory_MemTotal_bytes")
    mem_avail = latest.get("node_memory_MemAvailable_bytes")
    if mem_total and mem_avail is not None:
        mem_pct = (1.0 - mem_avail / mem_total) * 100
        if mem_pct > THRESHOLDS["mem_pct_high"]:
            alerts.append(_make_alert(
                "HostOutOfMemory", "critical", "node-exporter", hostname,
                {"mem_pct": f"{mem_pct:.1f}"},
                f"Memory usage {mem_pct:.1f}% > {THRESHOLDS['mem_pct_high']}%",
            ))

    # --- Disk usage (root filesystem) ---
    disk_pct = _root_disk_usage_pct(latest)
    if disk_pct is not None and disk_pct > THRESHOLDS["disk_pct_high"]:
        alerts.append(_make_alert(
            "HostDiskAlmostFull", "warning", "node-exporter", hostname,
            {"disk_pct": f"{disk_pct:.1f}"},
            f"Disk usage {disk_pct:.1f}% > {THRESHOLDS['disk_pct_high']}%",
        ))

    # --- Load average vs. logical CPU count (one idle series per CPU) ---
    load1 = latest.get("node_load1", 0)
    cpu_count = sum(
        1 for series in latest
        if series.startswith("node_cpu_seconds_total{") and 'mode="idle"' in series
    )
    if cpu_count > 0 and load1 > cpu_count * THRESHOLDS["load_factor"]:
        alerts.append(_make_alert(
            "HostHighLoadAverage", "warning", "node-exporter", hostname,
            {"load1": f"{load1:.2f}", "cpu_count": str(cpu_count)},
            f"Load {load1:.2f} > {cpu_count * THRESHOLDS['load_factor']:.0f} "
            f"(cpu×{THRESHOLDS['load_factor']})",
        ))

    return alerts


# ============================================================================
# B. JournalCollector
# ============================================================================
def collect_journal_errors(hostname: str) -> list[dict]:
    """Collect alerts from systemd journal entries of priority err or higher (last 5 min).

    Emits HostOOMKiller and/or HostKernelPanic when matching lines are found.
    Otherwise it emits HostErrorLogFlood when the line count exceeds
    THRESHOLDS["journal_err_min"].

    Returns:
        Alert dicts. The list is empty when journalctl is missing, not
        executable or times out (silently skipped, best effort).
    """
    alerts: list[dict] = []
    try:
        result = subprocess.run(
            ["journalctl", "-p", "err", "--since", "5 minutes ago", "--no-pager", "-q"],
            capture_output=True,
            text=True,
            errors="replace",  # journal text is not guaranteed to be valid UTF-8
            timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # journalctl missing (FileNotFoundError), not executable or too slow
        return alerts

    lines = [ln for ln in result.stdout.splitlines() if ln.strip()]
    count = len(lines)

    # OOM killer gets its own alert
    oom_lines = [ln for ln in lines if "Out of memory" in ln or "oom_kill" in ln.lower()]
    if oom_lines:
        alerts.append(_make_alert(
            "HostOOMKiller", "critical", "journal", hostname,
            {"oom_count": str(len(oom_lines))},
            f"OOM killer triggered {len(oom_lines)} times in last 5min",
        ))

    # Kernel panic
    panic_lines = [ln for ln in lines if "kernel panic" in ln.lower()]
    if panic_lines:
        alerts.append(_make_alert(
            "HostKernelPanic", "critical", "journal", hostname,
            {},
            "Kernel panic detected",
        ))

    # Generic ERROR flood, only when no more specific alert fired
    if count > THRESHOLDS["journal_err_min"] and not oom_lines and not panic_lines:
        alerts.append(_make_alert(
            "HostErrorLogFlood", "warning", "journal", hostname,
            {"error_count": str(count)},
            f"{count} ERROR log entries in last 5min",
        ))

    return alerts
# ============================================================================
# C. ServiceProbeCollector
# ============================================================================
def collect_service_probes(services: list[dict], hostname: str) -> list[dict]:
    """Probe each service with a TCP connect and report the unreachable ones.

    Args:
        services: Entries with "name", "host" and "port" keys (as in HOST_CONFIGS).
        hostname: Reporting host, used in the alert target and summary.

    Returns:
        One critical "ServiceDown" alert per service whose port could not be
        connected to within 3 seconds. The list is empty when every port answers.
    """
    alerts = []
    for svc in services:
        try:
            # Connect-and-close is enough: we only check that something listens.
            with socket.create_connection((svc["host"], svc["port"]), timeout=3):
                pass
        except OSError:  # covers refused, timed-out and name-resolution failures
            alerts.append({
                "alert_name": "ServiceDown",
                "severity": "critical",
                "source": "sensor-probe",
                "namespace": "infra",
                "target": f"{hostname}/{svc['name']}",
                "labels": {"service": svc["name"], "port": str(svc["port"])},
                "annotations": {"summary": f"{svc['name']} port {svc['port']} unreachable on {hostname}"},
            })
    return alerts


# ============================================================================
# Sensor Agent Core
# ============================================================================
class SensorAgent:
    """Collect this host's alerts and publish them to the Redis stream STREAM_KEY.

    The host config is chosen from HOST_CONFIGS by the last octet of the
    detected IP. Hosts without a config are skipped by run_once().
    """

    def __init__(self, redis_url: str = REDIS_URL) -> None:
        self.redis_url = redis_url
        self.hostname = socket.gethostname()
        self.host_ip = self._detect_ip()
        self.config = self._detect_config()
        self._redis: redis.Redis | None = None

    def _detect_ip(self) -> str:
        """Return this host's outbound IPv4 address toward 188, or "unknown".

        connect() on a UDP socket only selects the route and source address;
        the chosen local address is then read back with getsockname().
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("192.168.0.188", 6380))
                return s.getsockname()[0]
        except OSError:
            return "unknown"

    def _detect_config(self) -> dict | None:
        """Look up HOST_CONFIGS by the last octet of the detected IP (None if absent)."""
        last_octet = self.host_ip.split(".")[-1] if "." in self.host_ip else ""
        return HOST_CONFIGS.get(last_octet)

    def connect(self) -> bool:
        """Create the Redis client and verify it with PING.

        Returns:
            True on success. False (after printing a FATAL line) on any
            redis-py error, including connect timeouts, or on an invalid URL.
        """
        if redis is None:
            print("[FATAL] redis-py not installed. Run: pip install redis")
            return False
        try:
            self._redis = redis.from_url(
                self.redis_url,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=10,  # never hang forever on a stalled connection
            )
            self._redis.ping()
        except (redis.RedisError, ValueError) as e:
            print(f"[FATAL] Cannot connect to Redis: {e}")
            return False
        print(f"[OK] Connected to Redis Event Bus ({self.redis_url.split('@')[-1]})")
        return True

    def _fingerprint(self, alert: dict) -> str:
        """16-hex-char id of alert_name + target; the unit of the DEDUP_TTL window."""
        key = f"{alert['alert_name']}:{alert['target']}"
        return hashlib.md5(key.encode()).hexdigest()[:16]

    def _is_dedup(self, fingerprint: str) -> bool:
        """True if this fingerprint was sent within the last DEDUP_TTL seconds."""
        dedup_key = f"sensor:dedup:{fingerprint}"
        return bool(self._redis.exists(dedup_key))

    def _mark_sent(self, fingerprint: str) -> None:
        """Record the fingerprint as sent; the key expires after DEDUP_TTL seconds."""
        dedup_key = f"sensor:dedup:{fingerprint}"
        self._redis.set(dedup_key, "1", ex=DEDUP_TTL)

    def send_signal(self, alert: dict) -> str | None:
        """XADD one alert to STREAM_KEY unless it is inside its dedup window.

        Redis errors are printed and never raised, so a Redis outage does not
        kill a --loop run.

        Returns:
            The stream message id, or None when skipped as a duplicate or on failure.
        """
        fingerprint = self._fingerprint(alert)
        try:
            if self._is_dedup(fingerprint):
                print(f" [SKIP] {alert['alert_name']} ({alert['target']}) — dedup ({DEDUP_TTL//60}min TTL)")
                return None
        except redis.RedisError as e:
            print(f" [FAIL] dedup check failed: {e}")
            return None

        now = datetime.now(TAIPEI_TZ)
        payload = {
            "alert_name": alert["alert_name"],
            "severity": alert["severity"],
            "source": alert.get("source", "sensor-agent"),
            "namespace": alert.get("namespace", "infra"),
            "target": alert["target"],
            "fingerprint": fingerprint,
            "labels": json.dumps(alert.get("labels", {})),
            "annotations": json.dumps(alert.get("annotations", {})),
            "received_at": now.isoformat(),
            "sensor_id": f"sensor-{self.hostname}",
            "sensor_host": self.hostname,
            "sensor_ip": self.host_ip,
        }
        try:
            message_id = self._redis.xadd(STREAM_KEY, payload)
        except redis.RedisError as e:
            print(f" [FAIL] XADD failed: {e}")
            return None
        try:
            self._mark_sent(fingerprint)
        except redis.RedisError as e:
            # The signal is already on the stream; only the dedup marker is lost.
            print(f" [WARN] dedup mark failed for {fingerprint}: {e}")
        print(f" [SENT] {alert['alert_name']} → {message_id}")
        return message_id

    @staticmethod
    def _safe_collect(label: str, collect, *args) -> list[dict]:
        """Run one collector and print its alert count; a crash is printed, not raised."""
        try:
            found = collect(*args)
        except Exception as e:  # one broken collector must not abort the scan/loop
            print(f" [{label}] collector failed: {e}")
            return []
        print(f" [{label}] {len(found)} alert(s)")
        return found

    def run_once(self) -> int:
        """Run every configured collector once and send the results.

        Returns:
            Number of signals actually written to the stream (0 if this host
            has no HOST_CONFIGS entry).
        """
        if not self.config:
            print(f"[WARN] Host {self.hostname} ({self.host_ip}) not in HOST_CONFIGS, skipping")
            return 0

        print(f"\n[SCAN] {self.hostname} ({self.host_ip}) @ {datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')}")
        all_alerts: list[dict] = []

        # A. Node metrics
        node_url = self.config.get("node_exporter_url")
        if node_url:
            all_alerts.extend(self._safe_collect("NodeMetrics", collect_node_metrics, node_url, self.hostname))

        # B. Journal
        if self.config.get("journal_enabled"):
            all_alerts.extend(self._safe_collect("Journal", collect_journal_errors, self.hostname))

        # C. Service probes
        services = self.config.get("services", [])
        if services:
            all_alerts.extend(self._safe_collect("Probes", collect_service_probes, services, self.hostname))

        sent = sum(1 for alert in all_alerts if self.send_signal(alert))
        print(f"[DONE] {sent}/{len(all_alerts)} signals sent\n")
        return sent

    def close(self) -> None:
        """Close the Redis client if connect() created one."""
        if self._redis is not None:
            self._redis.close()


# ============================================================================
# CLI
# ============================================================================
def main() -> int:
    """CLI entry point: connect to Redis, then scan once (default) or loop.

    Returns:
        Process exit code: 1 if the Redis connection fails, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="AWOOOI Sensor Agent v2.0 — 神經末梢告警採集代理")
    parser.add_argument("--once", action="store_true", help="單次採集後退出(cron 模式)")
    parser.add_argument("--loop", action="store_true", help="持續循環模式")
    parser.add_argument("--interval", type=int, default=60, help="循環間隔秒數(預設: 60)")
    parser.add_argument("--redis-url", type=str, help="Redis URL(預設讀取 AWOOOI_REDIS_URL)")
    args = parser.parse_args()
    if args.loop and args.interval <= 0:
        # sleep() rejects negatives and 0 would hot-loop against node-exporter/Redis
        parser.error("--interval must be a positive number of seconds")

    agent = SensorAgent(redis_url=args.redis_url or REDIS_URL)

    print("=" * 60)
    print("AWOOOI Sensor Agent v2.0 — Phase 5.5 神經末梢")
    print("=" * 60)
    print(f"Host: {agent.hostname} ({agent.host_ip})")
    print(f"Stream: {STREAM_KEY}")
    # Show the URL actually in use (respects --redis-url); strip credentials.
    print(f"Redis: {agent.redis_url.split('@')[-1]}")
    print()

    if not agent.connect():
        return 1

    try:
        if args.loop:
            print(f"[LOOP] 每 {args.interval}s 採集一次,Ctrl+C 停止\n")
            while True:
                agent.run_once()
                time.sleep(args.interval)
        else:
            # Default: a single scan (cron mode, same as --once)
            agent.run_once()
    except KeyboardInterrupt:
        print("\n[STOP] Interrupted")
    finally:
        agent.close()

    return 0


if __name__ == "__main__":
    sys.exit(main())