#!/usr/bin/env python3 """ AWOOOI Sensor Agent - Phase 6.5 神經末梢 ========================================= 極度輕量的告警採集代理,部署於各主機。 唯一職責:採集本地告警 → 無腦轉發至 188 基地 Event Bus 設計鐵律: - 嚴禁 Incident/GraphRAG 邏輯 (防腦分裂) - 零依賴 AWOOOI 核心資料庫 - 純 Python + Redis 即可運行 使用方式: # 設定環境變數 export AWOOOI_REDIS_URL="redis://192.168.68.188:6379/0" # 執行代理 (發送模擬告警) python agent.py # 持續監控模式 (每 30 秒發送一次) python agent.py --loop --interval 30 Version: 1.0.0 Date: 2026-03-22 """ import argparse import json import os import random import socket import sys import time from datetime import datetime, timedelta, timezone # 台北時區 (UTC+8) - 系統統一時區 TAIPEI_TZ = timezone(timedelta(hours=8)) from typing import Any from uuid import uuid4 # ============================================================================ # 唯一外部依賴:redis-py (pip install redis) # ============================================================================ try: import redis except ImportError: print("[FATAL] redis-py not installed. Run: pip install redis") sys.exit(1) # ============================================================================ # 常量定義 # ============================================================================ STREAM_NAME = "stream:awoooi_signals" DEFAULT_REDIS_URL = "redis://192.168.68.188:6379/0" # 模擬告警模板 (實際部署時會讀取 Prometheus/Alertmanager) MOCK_ALERTS = [ { "alert_name": "PodCrashLoopBackOff", "severity": "critical", "source": "prometheus", "namespace": "production", "target": "payment-service", }, { "alert_name": "HighLatencyP99", "severity": "warning", "source": "prometheus", "namespace": "production", "target": "api-gateway", }, { "alert_name": "HighErrorRate", "severity": "critical", "source": "prometheus", "namespace": "staging", "target": "order-service", }, { "alert_name": "MemoryPressure", "severity": "warning", "source": "node-exporter", "namespace": "infra", "target": "k3s-worker-01", }, { "alert_name": "FINAL_PHASE_6_TEST", "severity": "critical", "source": "sensor-agent", "namespace": "production", "target": "awoooi-brain", }, ] # ============================================================================ # Sensor Agent Core # ============================================================================ class SensorAgent: """ 神經末梢 - 極簡告警採集代理 職責: 1. 採集本地告警 (或模擬生成) 2. 格式化為標準 Signal 3. 透過 Redis XADD 打入 188 基地 Event Bus 嚴禁邏輯: - Incident 聚合 (由 188 大腦負責) - GraphRAG 分析 (由 188 大腦負責) - 任何決策邏輯 (由 188 大腦負責) """ def __init__(self, redis_url: str | None = None) -> None: self.redis_url = redis_url or os.getenv("AWOOOI_REDIS_URL", DEFAULT_REDIS_URL) self.hostname = socket.gethostname() self.sensor_id = f"sensor-{self.hostname}" self._redis: redis.Redis | None = None def connect(self) -> bool: """連線至 188 基地 Redis""" try: self._redis = redis.from_url( self.redis_url, decode_responses=True, socket_connect_timeout=5, ) # 測試連線 self._redis.ping() print(f"[OK] Connected to 188 Event Bus: {self._mask_url(self.redis_url)}") return True except redis.ConnectionError as e: print(f"[FATAL] Cannot connect to 188 Event Bus: {e}") return False def _mask_url(self, url: str) -> str: """遮蔽密碼""" if "@" in url: parts = url.split("@") return f"redis://***@{parts[-1]}" return url def send_signal(self, alert: dict[str, Any]) -> str | None: """ 發送單一 Signal 至 Event Bus 無腦轉發邏輯: 1. 補齊必要欄位 (fingerprint, timestamp, sensor_id) 2. 直接 XADD 到 stream:awoooi_signals 3. 返回 message_id 或 None Args: alert: 告警字典 (至少需 alert_name, severity, source) Returns: Redis Stream message ID or None """ if not self._redis: print("[ERROR] Not connected to Redis") return None # 建立標準 Signal 格式 now = datetime.now(TAIPEI_TZ) signal = { "alert_name": alert.get("alert_name", "UnknownAlert"), "severity": alert.get("severity", "warning"), "source": alert.get("source", "sensor-agent"), "namespace": alert.get("namespace", "default"), "target": alert.get("target", "unknown"), "fingerprint": alert.get("fingerprint", f"fp_{uuid4().hex[:12]}"), "labels": json.dumps(alert.get("labels", {"sensor_id": self.sensor_id})), "annotations": json.dumps(alert.get("annotations", {})), "received_at": now.isoformat(), "sensor_id": self.sensor_id, "sensor_host": self.hostname, } try: # 無腦 XADD - 直接打入 188 基地 message_id = self._redis.xadd(STREAM_NAME, signal) return message_id except redis.RedisError as e: print(f"[ERROR] XADD failed: {e}") return None def fire_mock_alert(self, alert_name: str | None = None) -> str | None: """ 發射模擬告警 (測試用) Args: alert_name: 指定告警名稱,或隨機選擇 Returns: message_id or None """ if alert_name: # 尋找指定告警 alert = next( (a for a in MOCK_ALERTS if a["alert_name"] == alert_name), MOCK_ALERTS[-1], # 預設使用 FINAL_PHASE_6_TEST ) else: alert = random.choice(MOCK_ALERTS) print(f"\n[FIRE] Sending alert: {alert['alert_name']}") print(f" Severity: {alert['severity']}") print(f" Target: {alert['namespace']}/{alert['target']}") print(f" Sensor: {self.sensor_id}") message_id = self.send_signal(alert) if message_id: print(f"[OK] Signal delivered to 188 Event Bus") print(f" Stream: {STREAM_NAME}") print(f" Message ID: {message_id}") else: print(f"[FAIL] Signal delivery failed!") return message_id def close(self) -> None: """關閉連線""" if self._redis: self._redis.close() print("[OK] Disconnected from 188 Event Bus") # ============================================================================ # CLI Entry Point # ============================================================================ def main() -> int: parser = argparse.ArgumentParser( description="AWOOOI Sensor Agent - 神經末梢告警採集代理" ) parser.add_argument( "--alert", type=str, default="FINAL_PHASE_6_TEST", help="告警名稱 (預設: FINAL_PHASE_6_TEST)", ) parser.add_argument( "--loop", action="store_true", help="持續監控模式", ) parser.add_argument( "--interval", type=int, default=30, help="監控間隔秒數 (預設: 30)", ) parser.add_argument( "--redis-url", type=str, help="Redis URL (預設讀取 AWOOOI_REDIS_URL 環境變數)", ) args = parser.parse_args() print("=" * 70) print("AWOOOI Sensor Agent - Phase 6.5 神經末梢") print("=" * 70) print(f"Time: {datetime.now(TAIPEI_TZ).isoformat()}") print(f"Host: {socket.gethostname()}") print() # 初始化 Agent agent = SensorAgent(redis_url=args.redis_url) if not agent.connect(): return 1 try: if args.loop: # 持續監控模式 print(f"\n[LOOP] Continuous mode: sending random alert every {args.interval}s") print("[LOOP] Press Ctrl+C to stop\n") while True: agent.fire_mock_alert() time.sleep(args.interval) else: # 單發模式 message_id = agent.fire_mock_alert(alert_name=args.alert) if not message_id: return 1 except KeyboardInterrupt: print("\n[STOP] Interrupted by user") finally: agent.close() print("\n" + "=" * 70) print("Sensor Agent terminated") print("=" * 70) return 0 if __name__ == "__main__": sys.exit(main())