diff --git a/apps/sensor/agent.py b/apps/sensor/agent.py index aecf4daf..a873d31d 100644 --- a/apps/sensor/agent.py +++ b/apps/sensor/agent.py @@ -1,98 +1,293 @@ #!/usr/bin/env python3 """ -AWOOOI Sensor Agent - Phase 6.5 神經末梢 +AWOOOI Sensor Agent - Phase 5.5 神經末梢 ========================================= -極度輕量的告警採集代理,部署於各主機。 -唯一職責:採集本地告警 → 無腦轉發至 188 基地 Event Bus +極度輕量的主機告警採集代理,部署於 110/188 兩台原生 Linux 主機。 +唯一職責:採集本地真實告警 → XADD → 188 Redis Event Bus + +三層採集器: + A. NodeMetricsCollector — node-exporter metrics (CPU/Mem/Disk/Load) + B. JournalCollector — systemd journal ERROR/CRITICAL logs + C. ServiceProbeCollector — TCP port 存活探測 設計鐵律: -- 嚴禁 Incident/GraphRAG 邏輯 (防腦分裂) -- 零依賴 AWOOOI 核心資料庫 -- 純 Python + Redis 即可運行 + - 嚴禁 Incident/決策邏輯(防腦分裂) + - 零依賴 AWOOOI 核心 DB + - 純 Python + redis-py 即可運行 + - fingerprint 去重:同一告警 10 分鐘只送一次 -使用方式: - # 設定環境變數 - export AWOOOI_REDIS_URL="redis://192.168.68.188:6379/0" +Stream: awoooi:signals (2026-03-27 ADR-038 更名,舊 stream:awoooi_signals 已廢棄) +Redis: 192.168.0.188:6380/0 - # 執行代理 (發送模擬告警) - python agent.py - - # 持續監控模式 (每 30 秒發送一次) - python agent.py --loop --interval 30 - -Version: 1.0.0 -Date: 2026-03-22 +版本: 2.0.0 +建立: 2026-04-09 (台北時區) +建立者: Claude Sonnet 4.6 (Phase 5.5 B1 實作) """ +from __future__ import annotations + import argparse +import hashlib import json import os -import random import socket +import subprocess import sys import time +import urllib.request from datetime import datetime, timedelta, timezone -# 台北時區 (UTC+8) - 系統統一時區 -TAIPEI_TZ = timezone(timedelta(hours=8)) -from typing import Any -from uuid import uuid4 - -# ============================================================================ -# 唯一外部依賴:redis-py (pip install redis) -# ============================================================================ try: import redis except ImportError: print("[FATAL] redis-py not installed. Run: pip install redis") sys.exit(1) +# ============================================================================ +# 常量 +# ============================================================================ + +TAIPEI_TZ = timezone(timedelta(hours=8)) +STREAM_KEY = "awoooi:signals" # ADR-038 更名後正確 key +DEDUP_TTL = 600 # 10 分鐘去重 TTL(秒) +# DB 10: signal_worker 的 REDIS_URL (redis://192.168.0.188:6380/10) +REDIS_URL = os.getenv("AWOOOI_REDIS_URL", "redis://192.168.0.188:6380/10") + +# 各主機服務探測配置 +# key = hostname prefix 或 IP 最後段 +HOST_CONFIGS: dict[str, dict] = { + # 192.168.0.188 — AI+Web 中心 + "188": { + "node_exporter_url": "http://localhost:9100/metrics", + "journal_enabled": True, + "services": [ + {"name": "PostgreSQL", "host": "127.0.0.1", "port": 5432}, + {"name": "Redis", "host": "127.0.0.1", "port": 6380}, + {"name": "Ollama", "host": "127.0.0.1", "port": 11434}, + {"name": "Nginx", "host": "127.0.0.1", "port": 80}, + {"name": "SigNoz", "host": "127.0.0.1", "port": 3301}, + # OpenClaw 跑在 K3s (awoooi-prod),非 188 本機服務,不在此探測 + ], + }, + # 192.168.0.110 — DevOps 金庫 + "110": { + "node_exporter_url": "http://localhost:9100/metrics", + "journal_enabled": True, + "services": [ + {"name": "Harbor", "host": "127.0.0.1", "port": 5000}, + {"name": "Gitea", "host": "127.0.0.1", "port": 3001}, + {"name": "GH-Runner", "host": "127.0.0.1", "port": 8080}, + ], + }, +} + +# 告警閾值 +THRESHOLDS = { + "cpu_pct_high": 85.0, # CPU > 85% → HIGH + "mem_pct_high": 90.0, # Mem > 90% → HIGH + "disk_pct_high": 85.0, # Disk > 85% → MEDIUM + "load_factor": 2.0, # Load > cpu_count * 2 → MEDIUM + "journal_err_min": 10, # 近5分鐘 ERROR 筆數 > 10 → MEDIUM +} + # ============================================================================ -# 常量定義 +# A. NodeMetricsCollector # ============================================================================ -STREAM_NAME = "stream:awoooi_signals" -DEFAULT_REDIS_URL = "redis://192.168.68.188:6379/0" -# 模擬告警模板 (實際部署時會讀取 Prometheus/Alertmanager) -MOCK_ALERTS = [ - { - "alert_name": "PodCrashLoopBackOff", - "severity": "critical", - "source": "prometheus", - "namespace": "production", - "target": "payment-service", - }, - { - "alert_name": "HighLatencyP99", - "severity": "warning", - "source": "prometheus", - "namespace": "production", - "target": "api-gateway", - }, - { - "alert_name": "HighErrorRate", - "severity": "critical", - "source": "prometheus", - "namespace": "staging", - "target": "order-service", - }, - { - "alert_name": "MemoryPressure", - "severity": "warning", - "source": "node-exporter", - "namespace": "infra", - "target": "k3s-worker-01", - }, - { - "alert_name": "FINAL_PHASE_6_TEST", - "severity": "critical", - "source": "sensor-agent", - "namespace": "production", - "target": "awoooi-brain", - }, -] +def _fetch_metrics(url: str) -> dict[str, float]: + """從 node-exporter 抓取指定 metrics,返回 {metric_key: value}""" + try: + with urllib.request.urlopen(url, timeout=5) as resp: + text = resp.read().decode() + except Exception: + return {} + + result: dict[str, float] = {} + for line in text.splitlines(): + if line.startswith("#"): + continue + parts = line.split(" ") + if len(parts) < 2: + continue + try: + result[parts[0]] = float(parts[1]) + except ValueError: + continue + return result + + +def collect_node_metrics(url: str, hostname: str) -> list[dict]: + """採集 CPU/Mem/Disk/Load 告警""" + alerts = [] + raw = _fetch_metrics(url) + if not raw: + return alerts + + # --- CPU usage --- + idle_keys = [k for k in raw if "node_cpu_seconds_total" in k and 'mode="idle"' in k] + total_keys = [k for k in raw if "node_cpu_seconds_total" in k] + if idle_keys and total_keys: + idle = sum(raw[k] for k in idle_keys) + total = sum(raw[k] for k in total_keys) + cpu_pct = (1.0 - idle / total) * 100 if total else 0.0 + if cpu_pct > THRESHOLDS["cpu_pct_high"]: + alerts.append({ + "alert_name": "HostHighCpuLoad", + "severity": "critical", + "source": "node-exporter", + "namespace": "infra", + "target": hostname, + "labels": {"cpu_pct": f"{cpu_pct:.1f}"}, + "annotations": {"summary": f"CPU usage {cpu_pct:.1f}% > {THRESHOLDS['cpu_pct_high']}%"}, + }) + + # --- Memory usage --- + mem_total = raw.get("node_memory_MemTotal_bytes", 0) + mem_avail = raw.get("node_memory_MemAvailable_bytes", 0) + if mem_total > 0: + mem_pct = (1.0 - mem_avail / mem_total) * 100 + if mem_pct > THRESHOLDS["mem_pct_high"]: + alerts.append({ + "alert_name": "HostOutOfMemory", + "severity": "critical", + "source": "node-exporter", + "namespace": "infra", + "target": hostname, + "labels": {"mem_pct": f"{mem_pct:.1f}"}, + "annotations": {"summary": f"Memory usage {mem_pct:.1f}% > {THRESHOLDS['mem_pct_high']}%"}, + }) + + # --- Disk usage (root filesystem) --- + disk_size = raw.get('node_filesystem_size_bytes{mountpoint="/",fstype="ext4"}', 0) or \ + raw.get('node_filesystem_size_bytes{mountpoint="/"}', 0) + disk_free = raw.get('node_filesystem_free_bytes{mountpoint="/",fstype="ext4"}', 0) or \ + raw.get('node_filesystem_free_bytes{mountpoint="/"}', 0) + + # fallback: 找第一個 mountpoint="/" + if disk_size == 0: + for k, v in raw.items(): + if "node_filesystem_size_bytes" in k and 'mountpoint="/"' in k: + disk_size = v + break + if disk_free == 0: + for k, v in raw.items(): + if "node_filesystem_free_bytes" in k and 'mountpoint="/"' in k: + disk_free = v + break + + if disk_size > 0: + disk_pct = (1.0 - disk_free / disk_size) * 100 + if disk_pct > THRESHOLDS["disk_pct_high"]: + alerts.append({ + "alert_name": "HostDiskAlmostFull", + "severity": "warning", + "source": "node-exporter", + "namespace": "infra", + "target": hostname, + "labels": {"disk_pct": f"{disk_pct:.1f}"}, + "annotations": {"summary": f"Disk usage {disk_pct:.1f}% > {THRESHOLDS['disk_pct_high']}%"}, + }) + + # --- Load average --- + load1 = raw.get("node_load1", 0) + cpu_count = len([k for k in raw if "node_cpu_seconds_total" in k and 'mode="idle"' in k]) + if cpu_count > 0 and load1 > cpu_count * THRESHOLDS["load_factor"]: + alerts.append({ + "alert_name": "HostHighLoadAverage", + "severity": "warning", + "source": "node-exporter", + "namespace": "infra", + "target": hostname, + "labels": {"load1": f"{load1:.2f}", "cpu_count": str(cpu_count)}, + "annotations": {"summary": f"Load {load1:.2f} > {cpu_count * THRESHOLDS['load_factor']:.0f} (cpu×{THRESHOLDS['load_factor']})"}, + }) + + return alerts + + +# ============================================================================ +# B. JournalCollector +# ============================================================================ + +def collect_journal_errors(hostname: str) -> list[dict]: + """採集 systemd journal 近 5 分鐘 ERROR/CRITICAL 告警""" + alerts = [] + try: + result = subprocess.run( + ["journalctl", "-p", "err", "--since", "5 minutes ago", "--no-pager", "-q"], + capture_output=True, text=True, timeout=10, + ) + lines = [l for l in result.stdout.splitlines() if l.strip()] + count = len(lines) + + # OOM killer 特殊處理 + oom_lines = [l for l in lines if "Out of memory" in l or "oom_kill" in l.lower()] + if oom_lines: + alerts.append({ + "alert_name": "HostOOMKiller", + "severity": "critical", + "source": "journal", + "namespace": "infra", + "target": hostname, + "labels": {"oom_count": str(len(oom_lines))}, + "annotations": {"summary": f"OOM killer triggered {len(oom_lines)} times in last 5min"}, + }) + + # kernel panic + panic_lines = [l for l in lines if "kernel panic" in l.lower() or "Kernel panic" in l] + if panic_lines: + alerts.append({ + "alert_name": "HostKernelPanic", + "severity": "critical", + "source": "journal", + "namespace": "infra", + "target": hostname, + "labels": {}, + "annotations": {"summary": "Kernel panic detected"}, + }) + + # 一般 ERROR 洪水 + if count > THRESHOLDS["journal_err_min"] and not oom_lines and not panic_lines: + alerts.append({ + "alert_name": "HostErrorLogFlood", + "severity": "warning", + "source": "journal", + "namespace": "infra", + "target": hostname, + "labels": {"error_count": str(count)}, + "annotations": {"summary": f"{count} ERROR log entries in last 5min"}, + }) + + except (subprocess.TimeoutExpired, FileNotFoundError): + pass # journalctl 不存在或超時,靜默跳過 + + return alerts + + +# ============================================================================ +# C. ServiceProbeCollector +# ============================================================================ + +def collect_service_probes(services: list[dict], hostname: str) -> list[dict]: + """TCP port 存活探測""" + alerts = [] + for svc in services: + try: + sock = socket.create_connection((svc["host"], svc["port"]), timeout=3) + sock.close() + except (OSError, ConnectionRefusedError, TimeoutError): + alerts.append({ + "alert_name": "ServiceDown", + "severity": "critical", + "source": "sensor-probe", + "namespace": "infra", + "target": f"{hostname}/{svc['name']}", + "labels": {"service": svc["name"], "port": str(svc["port"])}, + "annotations": {"summary": f"{svc['name']} port {svc['port']} unreachable on {hostname}"}, + }) + return alerts # ============================================================================ @@ -100,204 +295,172 @@ MOCK_ALERTS = [ # ============================================================================ class SensorAgent: - """ - 神經末梢 - 極簡告警採集代理 - - 職責: - 1. 採集本地告警 (或模擬生成) - 2. 格式化為標準 Signal - 3. 透過 Redis XADD 打入 188 基地 Event Bus - - 嚴禁邏輯: - - Incident 聚合 (由 188 大腦負責) - - GraphRAG 分析 (由 188 大腦負責) - - 任何決策邏輯 (由 188 大腦負責) - """ - - def __init__(self, redis_url: str | None = None) -> None: - self.redis_url = redis_url or os.getenv("AWOOOI_REDIS_URL", DEFAULT_REDIS_URL) + def __init__(self, redis_url: str = REDIS_URL) -> None: + self.redis_url = redis_url self.hostname = socket.gethostname() - self.sensor_id = f"sensor-{self.hostname}" + self.host_ip = self._detect_ip() + self.config = self._detect_config() self._redis: redis.Redis | None = None + def _detect_ip(self) -> str: + """取得主機 IP 最後段(用於對應 HOST_CONFIGS)""" + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("192.168.0.188", 6380)) + ip = s.getsockname()[0] + s.close() + return ip + except Exception: + return "unknown" + + def _detect_config(self) -> dict | None: + """根據 IP 自動對應 HOST_CONFIGS""" + last_octet = self.host_ip.split(".")[-1] if "." in self.host_ip else "" + return HOST_CONFIGS.get(last_octet) + def connect(self) -> bool: - """連線至 188 基地 Redis""" try: self._redis = redis.from_url( self.redis_url, decode_responses=True, socket_connect_timeout=5, ) - # 測試連線 self._redis.ping() - print(f"[OK] Connected to 188 Event Bus: {self._mask_url(self.redis_url)}") + print(f"[OK] Connected to Redis Event Bus ({self.redis_url.split('@')[-1]})") return True except redis.ConnectionError as e: - print(f"[FATAL] Cannot connect to 188 Event Bus: {e}") + print(f"[FATAL] Cannot connect to Redis: {e}") return False - def _mask_url(self, url: str) -> str: - """遮蔽密碼""" - if "@" in url: - parts = url.split("@") - return f"redis://***@{parts[-1]}" - return url + def _fingerprint(self, alert: dict) -> str: + """生成告警 fingerprint(alert_name + target → 10 min 去重)""" + key = f"{alert['alert_name']}:{alert['target']}" + return hashlib.md5(key.encode()).hexdigest()[:16] - def send_signal(self, alert: dict[str, Any]) -> str | None: - """ - 發送單一 Signal 至 Event Bus + def _is_dedup(self, fingerprint: str) -> bool: + """檢查是否在去重 TTL 內""" + dedup_key = f"sensor:dedup:{fingerprint}" + return bool(self._redis.exists(dedup_key)) - 無腦轉發邏輯: - 1. 補齊必要欄位 (fingerprint, timestamp, sensor_id) - 2. 直接 XADD 到 stream:awoooi_signals - 3. 返回 message_id 或 None + def _mark_sent(self, fingerprint: str) -> None: + """標記已發送(設 TTL)""" + dedup_key = f"sensor:dedup:{fingerprint}" + self._redis.set(dedup_key, "1", ex=DEDUP_TTL) - Args: - alert: 告警字典 (至少需 alert_name, severity, source) + def send_signal(self, alert: dict) -> str | None: + """發送 Signal 至 Event Bus(含去重)""" + fingerprint = self._fingerprint(alert) - Returns: - Redis Stream message ID or None - """ - if not self._redis: - print("[ERROR] Not connected to Redis") + if self._is_dedup(fingerprint): + print(f" [SKIP] {alert['alert_name']} ({alert['target']}) — dedup ({DEDUP_TTL//60}min TTL)") return None - # 建立標準 Signal 格式 now = datetime.now(TAIPEI_TZ) signal = { - "alert_name": alert.get("alert_name", "UnknownAlert"), - "severity": alert.get("severity", "warning"), + "alert_name": alert["alert_name"], + "severity": alert["severity"], "source": alert.get("source", "sensor-agent"), - "namespace": alert.get("namespace", "default"), - "target": alert.get("target", "unknown"), - "fingerprint": alert.get("fingerprint", f"fp_{uuid4().hex[:12]}"), - "labels": json.dumps(alert.get("labels", {"sensor_id": self.sensor_id})), + "namespace": alert.get("namespace", "infra"), + "target": alert["target"], + "fingerprint": fingerprint, + "labels": json.dumps(alert.get("labels", {})), "annotations": json.dumps(alert.get("annotations", {})), "received_at": now.isoformat(), - "sensor_id": self.sensor_id, + "sensor_id": f"sensor-{self.hostname}", "sensor_host": self.hostname, + "sensor_ip": self.host_ip, } try: - # 無腦 XADD - 直接打入 188 基地 - message_id = self._redis.xadd(STREAM_NAME, signal) + message_id = self._redis.xadd(STREAM_KEY, signal) + self._mark_sent(fingerprint) + print(f" [SENT] {alert['alert_name']} → {message_id}") return message_id except redis.RedisError as e: - print(f"[ERROR] XADD failed: {e}") + print(f" [FAIL] XADD failed: {e}") return None - def fire_mock_alert(self, alert_name: str | None = None) -> str | None: - """ - 發射模擬告警 (測試用) + def run_once(self) -> int: + """執行一次採集,返回發送的告警數""" + if not self.config: + print(f"[WARN] Host {self.hostname} ({self.host_ip}) not in HOST_CONFIGS, skipping") + return 0 - Args: - alert_name: 指定告警名稱,或隨機選擇 + print(f"\n[SCAN] {self.hostname} ({self.host_ip}) @ {datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')}") + all_alerts: list[dict] = [] - Returns: - message_id or None - """ - if alert_name: - # 尋找指定告警 - alert = next( - (a for a in MOCK_ALERTS if a["alert_name"] == alert_name), - MOCK_ALERTS[-1], # 預設使用 FINAL_PHASE_6_TEST - ) - else: - alert = random.choice(MOCK_ALERTS) + # A. Node Metrics + node_url = self.config.get("node_exporter_url") + if node_url: + metrics_alerts = collect_node_metrics(node_url, self.hostname) + print(f" [NodeMetrics] {len(metrics_alerts)} alert(s)") + all_alerts.extend(metrics_alerts) - print(f"\n[FIRE] Sending alert: {alert['alert_name']}") - print(f" Severity: {alert['severity']}") - print(f" Target: {alert['namespace']}/{alert['target']}") - print(f" Sensor: {self.sensor_id}") + # B. Journal + if self.config.get("journal_enabled"): + journal_alerts = collect_journal_errors(self.hostname) + print(f" [Journal] {len(journal_alerts)} alert(s)") + all_alerts.extend(journal_alerts) - message_id = self.send_signal(alert) + # C. Service Probes + services = self.config.get("services", []) + if services: + probe_alerts = collect_service_probes(services, self.hostname) + print(f" [Probes] {len(probe_alerts)} alert(s)") + all_alerts.extend(probe_alerts) - if message_id: - print(f"[OK] Signal delivered to 188 Event Bus") - print(f" Stream: {STREAM_NAME}") - print(f" Message ID: {message_id}") - else: - print(f"[FAIL] Signal delivery failed!") + sent = 0 + for alert in all_alerts: + if self.send_signal(alert): + sent += 1 - return message_id + print(f"[DONE] {sent}/{len(all_alerts)} signals sent\n") + return sent def close(self) -> None: - """關閉連線""" if self._redis: self._redis.close() - print("[OK] Disconnected from 188 Event Bus") # ============================================================================ -# CLI Entry Point +# CLI # ============================================================================ def main() -> int: - parser = argparse.ArgumentParser( - description="AWOOOI Sensor Agent - 神經末梢告警採集代理" - ) - parser.add_argument( - "--alert", - type=str, - default="FINAL_PHASE_6_TEST", - help="告警名稱 (預設: FINAL_PHASE_6_TEST)", - ) - parser.add_argument( - "--loop", - action="store_true", - help="持續監控模式", - ) - parser.add_argument( - "--interval", - type=int, - default=30, - help="監控間隔秒數 (預設: 30)", - ) - parser.add_argument( - "--redis-url", - type=str, - help="Redis URL (預設讀取 AWOOOI_REDIS_URL 環境變數)", - ) - + parser = argparse.ArgumentParser(description="AWOOOI Sensor Agent v2.0 — 神經末梢告警採集代理") + parser.add_argument("--once", action="store_true", help="單次採集後退出(cron 模式)") + parser.add_argument("--loop", action="store_true", help="持續循環模式") + parser.add_argument("--interval", type=int, default=60, help="循環間隔秒數(預設: 60)") + parser.add_argument("--redis-url", type=str, help="Redis URL(預設讀取 AWOOOI_REDIS_URL)") args = parser.parse_args() - print("=" * 70) - print("AWOOOI Sensor Agent - Phase 6.5 神經末梢") - print("=" * 70) - print(f"Time: {datetime.now(TAIPEI_TZ).isoformat()}") - print(f"Host: {socket.gethostname()}") - print() + agent = SensorAgent(redis_url=args.redis_url or REDIS_URL) - # 初始化 Agent - agent = SensorAgent(redis_url=args.redis_url) + print("=" * 60) + print("AWOOOI Sensor Agent v2.0 — Phase 5.5 神經末梢") + print("=" * 60) + print(f"Host: {agent.hostname} ({agent.host_ip})") + print(f"Stream: {STREAM_KEY}") + print(f"Redis: {REDIS_URL.split('@')[-1]}") + print() if not agent.connect(): return 1 try: if args.loop: - # 持續監控模式 - print(f"\n[LOOP] Continuous mode: sending random alert every {args.interval}s") - print("[LOOP] Press Ctrl+C to stop\n") + print(f"[LOOP] 每 {args.interval}s 採集一次,Ctrl+C 停止\n") while True: - agent.fire_mock_alert() + agent.run_once() time.sleep(args.interval) else: - # 單發模式 - message_id = agent.fire_mock_alert(alert_name=args.alert) - if not message_id: - return 1 - + # 預設 --once(cron 用) + agent.run_once() except KeyboardInterrupt: - print("\n[STOP] Interrupted by user") - + print("\n[STOP] Interrupted") finally: agent.close() - print("\n" + "=" * 70) - print("Sensor Agent terminated") - print("=" * 70) - return 0