feat(sensor): Phase 5.5 B1 Sensor Agent v2.0 — 三層真實採集
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

- NodeMetricsCollector: node-exporter CPU/Mem/Disk/Load 閾值告警
- JournalCollector: systemd journal ERROR/OOM/KernelPanic 偵測
- ServiceProbeCollector: TCP port 存活探測 (188: PG/Redis/Ollama/Nginx/SigNoz, 110: Harbor/Gitea)
- 10分鐘 fingerprint dedup (Redis sensor:dedup:{fp})
- 正確 Stream key: awoooi:signals DB10 (ADR-038)
- HOST_CONFIGS 自動 IP 偵測 (110/188)
- 已部署 cron @188/@110,E2E 驗證:sensor→stream→INC-20260409-2F1DD6

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-09 23:31:35 +08:00
parent eb46079b4a
commit 31d45f0c99

View File

@@ -1,98 +1,293 @@
#!/usr/bin/env python3
"""
AWOOOI Sensor Agent - Phase 6.5 神經末梢
AWOOOI Sensor Agent - Phase 5.5 神經末梢
=========================================
極度輕量的告警採集代理,部署於主機。
唯一職責:採集本地告警 → 無腦轉發至 188 基地 Event Bus
極度輕量的主機告警採集代理,部署於 110/188 兩台原生 Linux 主機。
唯一職責:採集本地真實告警 → XADD → 188 Redis Event Bus
三層採集器:
A. NodeMetricsCollector — node-exporter metrics (CPU/Mem/Disk/Load)
B. JournalCollector — systemd journal ERROR/CRITICAL logs
C. ServiceProbeCollector — TCP port 存活探測
設計鐵律:
- 嚴禁 Incident/GraphRAG 邏輯 (防腦分裂)
- 零依賴 AWOOOI 核心資料庫
- 純 Python + Redis 即可運行
- 嚴禁 Incident/決策邏輯(防腦分裂)
- 零依賴 AWOOOI 核心 DB
- 純 Python + redis-py 即可運行
- fingerprint 去重:同一告警 10 分鐘只送一次
使用方式:
# 設定環境變數
export AWOOOI_REDIS_URL="redis://192.168.68.188:6379/0"
Stream: awoooi:signals (2026-03-27 ADR-038 更名,舊 stream:awoooi_signals 已廢棄)
Redis: 192.168.0.188:6380/0
# 執行代理 (發送模擬告警)
python agent.py
# 持續監控模式 (每 30 秒發送一次)
python agent.py --loop --interval 30
Version: 1.0.0
Date: 2026-03-22
版本: 2.0.0
建立: 2026-04-09 (台北時區)
建立者: Claude Sonnet 4.6 (Phase 5.5 B1 實作)
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import random
import socket
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timedelta, timezone
# 台北時區 (UTC+8) - 系統統一時區
TAIPEI_TZ = timezone(timedelta(hours=8))
from typing import Any
from uuid import uuid4
# ============================================================================
# 唯一外部依賴:redis-py (pip install redis)
# ============================================================================
try:
import redis
except ImportError:
print("[FATAL] redis-py not installed. Run: pip install redis")
sys.exit(1)
# ============================================================================
# 常量
# ============================================================================
# Fixed UTC+8 offset (Taipei time).
TAIPEI_TZ = timezone(timedelta(hours=8))
STREAM_KEY = "awoooi:signals" # correct stream key after the ADR-038 rename
DEDUP_TTL = 600 # dedup TTL in seconds (10 minutes)
# DB 10: the REDIS_URL used by signal_worker (redis://192.168.0.188:6380/10).
# The AWOOOI_REDIS_URL environment variable overrides this default.
REDIS_URL = os.getenv("AWOOOI_REDIS_URL", "redis://192.168.0.188:6380/10")
# Per-host service probe configuration.
# key = hostname prefix or last octet of the host IP
HOST_CONFIGS: dict[str, dict] = {
# 192.168.0.188 — AI + Web hub
"188": {
"node_exporter_url": "http://localhost:9100/metrics",
"journal_enabled": True,
"services": [
{"name": "PostgreSQL", "host": "127.0.0.1", "port": 5432},
{"name": "Redis", "host": "127.0.0.1", "port": 6380},
{"name": "Ollama", "host": "127.0.0.1", "port": 11434},
{"name": "Nginx", "host": "127.0.0.1", "port": 80},
{"name": "SigNoz", "host": "127.0.0.1", "port": 3301},
# OpenClaw runs on K3s (awoooi-prod), not as a local service on 188,
# so it is not probed here.
],
},
# 192.168.0.110 — DevOps vault
"110": {
"node_exporter_url": "http://localhost:9100/metrics",
"journal_enabled": True,
"services": [
{"name": "Harbor", "host": "127.0.0.1", "port": 5000},
{"name": "Gitea", "host": "127.0.0.1", "port": 3001},
{"name": "GH-Runner", "host": "127.0.0.1", "port": 8080},
],
},
}
# Alert thresholds.
# NOTE(review): the HIGH/MEDIUM labels below do not match the severity strings
# the collectors in this file emit ("critical"/"warning") — confirm the mapping.
THRESHOLDS = {
"cpu_pct_high": 85.0, # CPU > 85% → HIGH
"mem_pct_high": 90.0, # Mem > 90% → HIGH
"disk_pct_high": 85.0, # Disk > 85% → MEDIUM
"load_factor": 2.0, # Load > cpu_count * 2 → MEDIUM
"journal_err_min": 10, # more than 10 ERROR entries in the last 5 minutes → MEDIUM
}
# ============================================================================
# 常量定義
# A. NodeMetricsCollector
# ============================================================================
# NOTE(review): these look like the pre-commit (legacy) definitions. The module
# docstring describes stream:awoooi_signals as deprecated in favour of
# awoooi:signals — confirm whether anything still needs these names.
STREAM_NAME = "stream:awoooi_signals"
DEFAULT_REDIS_URL = "redis://192.168.68.188:6379/0"
# Mock alert templates (a real deployment would read Prometheus/Alertmanager).
MOCK_ALERTS = [
{
"alert_name": "PodCrashLoopBackOff",
"severity": "critical",
"source": "prometheus",
"namespace": "production",
"target": "payment-service",
},
{
"alert_name": "HighLatencyP99",
"severity": "warning",
"source": "prometheus",
"namespace": "production",
"target": "api-gateway",
},
{
"alert_name": "HighErrorRate",
"severity": "critical",
"source": "prometheus",
"namespace": "staging",
"target": "order-service",
},
{
"alert_name": "MemoryPressure",
"severity": "warning",
"source": "node-exporter",
"namespace": "infra",
"target": "k3s-worker-01",
},
{
"alert_name": "FINAL_PHASE_6_TEST",
"severity": "critical",
"source": "sensor-agent",
"namespace": "production",
"target": "awoooi-brain",
},
]
def _fetch_metrics(url: str) -> dict[str, float]:
"""從 node-exporter 抓取指定 metrics返回 {metric_key: value}"""
try:
with urllib.request.urlopen(url, timeout=5) as resp:
text = resp.read().decode()
except Exception:
return {}
result: dict[str, float] = {}
for line in text.splitlines():
if line.startswith("#"):
continue
parts = line.split(" ")
if len(parts) < 2:
continue
try:
result[parts[0]] = float(parts[1])
except ValueError:
continue
return result
def _node_alert(name: str, severity: str, hostname: str,
                labels: dict[str, str], summary: str) -> dict:
    """Build one node-exporter alert dict in the shape the collectors share."""
    return {
        "alert_name": name,
        "severity": severity,
        "source": "node-exporter",
        "namespace": "infra",
        "target": hostname,
        "labels": labels,
        "annotations": {"summary": summary},
    }


def _root_disk_usage(raw: dict[str, float]) -> tuple[float, float] | None:
    """Return ``(size, free)`` bytes of the root filesystem from ONE series.

    Size and free are taken from the same label set so two different devices
    mounted at "/" cannot be mixed. ext4 series are preferred when several
    match. Returns None when no usable size/free pair exists.
    """
    size_name = "node_filesystem_size_bytes"
    free_name = "node_filesystem_free_bytes"
    candidates = [
        key for key in raw
        if key.startswith(size_name) and 'mountpoint="/"' in key
    ]
    # Stable sort: ext4 series first, otherwise keep exposition order.
    candidates.sort(key=lambda key: 'fstype="ext4"' not in key)
    for size_key in candidates:
        free_key = free_name + size_key[len(size_name):]
        if free_key in raw and raw[size_key] > 0:
            return raw[size_key], raw[free_key]
    return None


def collect_node_metrics(url: str, hostname: str) -> list[dict]:
    """Collect CPU / memory / disk / load alerts from a node-exporter endpoint.

    Args:
        url: node-exporter metrics URL, fetched via ``_fetch_metrics``.
        hostname: Value placed in each alert's ``target`` field.

    Returns:
        A list of alert dicts, one per threshold in ``THRESHOLDS`` that is
        exceeded. Empty when the endpoint returned no data or nothing is over
        its threshold. A check whose metrics are missing is skipped rather
        than treated as 100% usage.
    """
    alerts: list[dict] = []
    raw = _fetch_metrics(url)
    if not raw:
        return alerts

    # --- CPU usage ---
    # NOTE(review): node_cpu_seconds_total is a cumulative counter, so this
    # ratio is the average utilisation since boot, not the current load —
    # confirm whether a two-sample delta is wanted here.
    cpu_keys = [key for key in raw if "node_cpu_seconds_total" in key]
    idle_keys = [key for key in cpu_keys if 'mode="idle"' in key]
    if idle_keys:
        idle = sum(raw[key] for key in idle_keys)
        total = sum(raw[key] for key in cpu_keys)
        cpu_pct = (1.0 - idle / total) * 100 if total else 0.0
        cpu_limit = THRESHOLDS["cpu_pct_high"]
        if cpu_pct > cpu_limit:
            alerts.append(_node_alert(
                "HostHighCpuLoad", "critical", hostname,
                {"cpu_pct": f"{cpu_pct:.1f}"},
                f"CPU usage {cpu_pct:.1f}% > {cpu_limit}%",
            ))

    # --- Memory usage ---
    mem_total = raw.get("node_memory_MemTotal_bytes", 0)
    mem_avail = raw.get("node_memory_MemAvailable_bytes")
    # A missing MemAvailable must not be read as "0 bytes available".
    if mem_total > 0 and mem_avail is not None:
        mem_pct = (1.0 - mem_avail / mem_total) * 100
        mem_limit = THRESHOLDS["mem_pct_high"]
        if mem_pct > mem_limit:
            alerts.append(_node_alert(
                "HostOutOfMemory", "critical", hostname,
                {"mem_pct": f"{mem_pct:.1f}"},
                f"Memory usage {mem_pct:.1f}% > {mem_limit}%",
            ))

    # --- Disk usage (root filesystem) ---
    disk = _root_disk_usage(raw)
    if disk is not None:
        disk_size, disk_free = disk
        disk_pct = (1.0 - disk_free / disk_size) * 100
        disk_limit = THRESHOLDS["disk_pct_high"]
        if disk_pct > disk_limit:
            alerts.append(_node_alert(
                "HostDiskAlmostFull", "warning", hostname,
                {"disk_pct": f"{disk_pct:.1f}"},
                f"Disk usage {disk_pct:.1f}% > {disk_limit}%",
            ))

    # --- Load average ---
    load1 = raw.get("node_load1", 0)
    # One idle series exists per logical CPU, so their count is the CPU count.
    cpu_count = len(idle_keys)
    load_factor = THRESHOLDS["load_factor"]
    if cpu_count > 0 and load1 > cpu_count * load_factor:
        alerts.append(_node_alert(
            "HostHighLoadAverage", "warning", hostname,
            {"load1": f"{load1:.2f}", "cpu_count": str(cpu_count)},
            f"Load {load1:.2f} > {cpu_count * load_factor:.0f} (cpu×{load_factor})",
        ))

    return alerts
# ============================================================================
# B. JournalCollector
# ============================================================================
def collect_journal_errors(hostname: str) -> list[dict]:
    """Collect alerts from systemd journal entries of the last 5 minutes.

    Runs ``journalctl -p err --since "5 minutes ago"`` and derives up to three
    alerts from its output:

    * ``HostOOMKiller`` when any line mentions the OOM killer,
    * ``HostKernelPanic`` when any line mentions a kernel panic,
    * ``HostErrorLogFlood`` when the line count exceeds
      ``THRESHOLDS["journal_err_min"]`` and neither of the above fired.

    Args:
        hostname: Value placed in each alert's ``target`` field.

    Returns:
        A list of alert dicts. Empty when journalctl is missing, cannot be
        executed, or times out.
    """
    try:
        result = subprocess.run(
            ["journalctl", "-p", "err", "--since", "5 minutes ago", "--no-pager", "-q"],
            capture_output=True, text=True, timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers a missing binary (FileNotFoundError) as well as
        # permission or exec failures; journal collection is best effort.
        return []

    entries = [entry for entry in result.stdout.splitlines() if entry.strip()]
    count = len(entries)
    alerts: list[dict] = []

    # OOM killer gets its own alert.
    oom_entries = [
        entry for entry in entries
        if "Out of memory" in entry or "oom_kill" in entry.lower()
    ]
    if oom_entries:
        alerts.append({
            "alert_name": "HostOOMKiller",
            "severity": "critical",
            "source": "journal",
            "namespace": "infra",
            "target": hostname,
            "labels": {"oom_count": str(len(oom_entries))},
            "annotations": {"summary": f"OOM killer triggered {len(oom_entries)} times in last 5min"},
        })

    # Kernel panic (case-insensitive match).
    panic_entries = [entry for entry in entries if "kernel panic" in entry.lower()]
    if panic_entries:
        alerts.append({
            "alert_name": "HostKernelPanic",
            "severity": "critical",
            "source": "journal",
            "namespace": "infra",
            "target": hostname,
            "labels": {},
            "annotations": {"summary": "Kernel panic detected"},
        })

    # Generic error flood, reported only when no more specific alert fired.
    if count > THRESHOLDS["journal_err_min"] and not oom_entries and not panic_entries:
        alerts.append({
            "alert_name": "HostErrorLogFlood",
            "severity": "warning",
            "source": "journal",
            "namespace": "infra",
            "target": hostname,
            "labels": {"error_count": str(count)},
            "annotations": {"summary": f"{count} ERROR log entries in last 5min"},
        })

    return alerts
# ============================================================================
# C. ServiceProbeCollector
# ============================================================================
def collect_service_probes(services: list[dict], hostname: str,
                           timeout: float = 3.0) -> list[dict]:
    """Probe each service's TCP port and report the ones that do not answer.

    Args:
        services: Service descriptors, each with ``name``, ``host`` and
            ``port`` keys.
        hostname: Name of the probing host; used in the alert ``target`` and
            summary.
        timeout: Per-connection timeout in seconds (default 3).

    Returns:
        One ``ServiceDown`` alert dict per service whose TCP connection could
        not be established; an empty list when every service is reachable.
    """
    alerts: list[dict] = []
    for svc in services:
        try:
            # The context manager closes the socket as soon as the handshake
            # succeeds; nothing is sent or received.
            with socket.create_connection((svc["host"], svc["port"]), timeout=timeout):
                continue
        except OSError:
            # Refused, timed-out and unreachable are all OSError subclasses.
            pass
        alerts.append({
            "alert_name": "ServiceDown",
            "severity": "critical",
            "source": "sensor-probe",
            "namespace": "infra",
            "target": f"{hostname}/{svc['name']}",
            "labels": {"service": svc["name"], "port": str(svc["port"])},
            "annotations": {"summary": f"{svc['name']} port {svc['port']} unreachable on {hostname}"},
        })
    return alerts
# ============================================================================
@@ -100,204 +295,172 @@ MOCK_ALERTS = [
# ============================================================================
class SensorAgent:
    """Host-side alert collector that forwards signals to the Redis event bus.

    Responsibilities:
        1. Run the collectors configured for this host in ``HOST_CONFIGS``.
        2. Normalise each alert into a flat signal mapping.
        3. ``XADD`` the signal to ``STREAM_KEY``, suppressing repeats of the
           same fingerprint for ``DEDUP_TTL`` seconds.

    Deliberately out of scope: incident aggregation, GraphRAG analysis and any
    decision logic.
    """

    def __init__(self, redis_url: str | None = REDIS_URL) -> None:
        """Record host identity and pick the matching host configuration.

        Args:
            redis_url: Redis URL of the event bus. ``None`` or an empty string
                falls back to the module-level ``REDIS_URL``.
        """
        self.redis_url = redis_url or REDIS_URL
        self.hostname = socket.gethostname()
        self.sensor_id = f"sensor-{self.hostname}"
        self.host_ip = self._detect_ip()
        self.config = self._detect_config()
        self._redis: redis.Redis | None = None

    def _detect_ip(self) -> str:
        """Return the local IPv4 address used to reach 192.168.0.188.

        Returns:
            The full dotted address, or ``"unknown"`` when it cannot be
            determined.
        """
        try:
            # The context manager closes the socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("192.168.0.188", 6380))
                return probe.getsockname()[0]
        except OSError:
            return "unknown"

    def _detect_config(self) -> dict | None:
        """Look up ``HOST_CONFIGS`` by the last octet of ``self.host_ip``.

        Returns:
            The matching configuration dict, or None when the address has no
            dot or its last octet is not a configured key.
        """
        last_octet = self.host_ip.rpartition(".")[2] if "." in self.host_ip else ""
        return HOST_CONFIGS.get(last_octet)

    def connect(self) -> bool:
        """Open the Redis connection and verify it with PING.

        Returns:
            True on success. False, after printing a ``[FATAL]`` line, when
            the URL is invalid or the server cannot be reached.
        """
        try:
            client = redis.from_url(
                self.redis_url,
                decode_responses=True,
                socket_connect_timeout=5,
            )
            # Verify the connection before accepting it.
            client.ping()
        except (redis.RedisError, ValueError) as e:
            # RedisError also covers connect timeouts, which are not
            # ConnectionError; ValueError covers a malformed URL.
            print(f"[FATAL] Cannot connect to Redis: {e}")
            return False
        self._redis = client
        print(f"[OK] Connected to Redis Event Bus ({self.redis_url.split('@')[-1]})")
        return True

    def _mask_url(self, url: str) -> str:
        """Return ``url`` with any credentials before ``@`` masked."""
        if "@" in url:
            return f"redis://***@{url.split('@')[-1]}"
        return url

    def _fingerprint(self, alert: dict) -> str:
        """Derive the dedup fingerprint from ``alert_name`` and ``target``.

        Returns:
            The first 16 hex characters of the MD5 of ``"<name>:<target>"``.
            The hash is an identifier only, not a security measure.
        """
        name = alert.get("alert_name", "UnknownAlert")
        target = alert.get("target", "unknown")
        return hashlib.md5(f"{name}:{target}".encode()).hexdigest()[:16]

    def _is_dedup(self, fingerprint: str) -> bool:
        """Return True when ``fingerprint`` is still inside its dedup window."""
        return bool(self._redis.exists(f"sensor:dedup:{fingerprint}"))

    def _mark_sent(self, fingerprint: str) -> None:
        """Record ``fingerprint`` as sent; the key expires after DEDUP_TTL."""
        self._redis.set(f"sensor:dedup:{fingerprint}", "1", ex=DEDUP_TTL)

    def send_signal(self, alert: dict[str, Any]) -> str | None:
        """Send one alert to the event bus unless it is a recent duplicate.

        Args:
            alert: Alert dict. ``alert_name``, ``severity`` and ``target`` are
                expected; missing keys fall back to placeholder values.

        Returns:
            The Redis stream message id, or None when not connected, when the
            alert was deduplicated, or when a Redis command failed.
        """
        if self._redis is None:
            print("[ERROR] Not connected to Redis")
            return None

        name = alert.get("alert_name", "UnknownAlert")
        target = alert.get("target", "unknown")
        fingerprint = self._fingerprint(alert)

        signal = {
            "alert_name": name,
            "severity": alert.get("severity", "warning"),
            "source": alert.get("source", "sensor-agent"),
            "namespace": alert.get("namespace", "infra"),
            "target": target,
            "fingerprint": fingerprint,
            # Stream fields must be flat strings, so nested dicts are
            # JSON-encoded.
            "labels": json.dumps(alert.get("labels", {})),
            "annotations": json.dumps(alert.get("annotations", {})),
            "received_at": datetime.now(TAIPEI_TZ).isoformat(),
            "sensor_id": f"sensor-{self.hostname}",
            "sensor_host": self.hostname,
            "sensor_ip": self.host_ip,
        }

        try:
            if self._is_dedup(fingerprint):
                print(f" [SKIP] {name} ({target}) — dedup ({DEDUP_TTL//60}min TTL)")
                return None
            message_id = self._redis.xadd(STREAM_KEY, signal)
            # Mark only after a successful XADD so a failed send is retried on
            # the next scan.
            self._mark_sent(fingerprint)
        except redis.RedisError as e:
            print(f" [FAIL] XADD failed: {e}")
            return None

        print(f" [SENT] {name} → {message_id}")
        return message_id

    def fire_mock_alert(self, alert_name: str | None = None) -> str | None:
        """Send one entry of ``MOCK_ALERTS`` (legacy test helper).

        Args:
            alert_name: Name of the template to send. An unknown name falls
                back to the last template; None picks one at random.

        Returns:
            The message id from ``send_signal``, or None.
        """
        if alert_name:
            alert = next(
                (a for a in MOCK_ALERTS if a["alert_name"] == alert_name),
                MOCK_ALERTS[-1],
            )
        else:
            alert = random.choice(MOCK_ALERTS)

        print(f"\n[FIRE] Sending alert: {alert['alert_name']}")
        print(f" Severity: {alert['severity']}")
        print(f" Target: {alert['namespace']}/{alert['target']}")
        print(f" Sensor: {self.sensor_id}")

        message_id = self.send_signal(alert)
        if message_id:
            print("[OK] Signal delivered to 188 Event Bus")
            print(f" Stream: {STREAM_KEY}")
            print(f" Message ID: {message_id}")
        else:
            # None also covers a dedup skip, not only a transport failure.
            print("[FAIL] Signal not delivered (send failed or deduplicated)")
        return message_id

    def run_once(self) -> int:
        """Run every configured collector once and forward the alerts.

        Returns:
            Number of signals actually written to the stream; 0 when this host
            has no entry in ``HOST_CONFIGS``.
        """
        if not self.config:
            print(f"[WARN] Host {self.hostname} ({self.host_ip}) not in HOST_CONFIGS, skipping")
            return 0

        print(f"\n[SCAN] {self.hostname} ({self.host_ip}) @ {datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')}")
        all_alerts: list[dict] = []

        # A. Node metrics
        node_url = self.config.get("node_exporter_url")
        if node_url:
            metrics_alerts = collect_node_metrics(node_url, self.hostname)
            print(f" [NodeMetrics] {len(metrics_alerts)} alert(s)")
            all_alerts.extend(metrics_alerts)

        # B. Journal
        if self.config.get("journal_enabled"):
            journal_alerts = collect_journal_errors(self.hostname)
            print(f" [Journal] {len(journal_alerts)} alert(s)")
            all_alerts.extend(journal_alerts)

        # C. Service probes
        services = self.config.get("services", [])
        if services:
            probe_alerts = collect_service_probes(services, self.hostname)
            print(f" [Probes] {len(probe_alerts)} alert(s)")
            all_alerts.extend(probe_alerts)

        sent = sum(1 for alert in all_alerts if self.send_signal(alert))
        print(f"[DONE] {sent}/{len(all_alerts)} signals sent\n")
        return sent

    def close(self) -> None:
        """Close the Redis connection if one is open."""
        if self._redis:
            self._redis.close()
            self._redis = None
            print("[OK] Disconnected from 188 Event Bus")
# ============================================================================
# CLI Entry Point
# CLI
# ============================================================================
def main() -> int:
    """CLI entry point: run the sensor once (default) or in a loop.

    Flags:
        --once      Single scan, then exit. This is also the default when
                    neither flag is given.
        --loop      Scan repeatedly until interrupted; wins over --once.
        --interval  Seconds between scans in loop mode (default 60).
        --redis-url Redis URL; defaults to the module-level ``REDIS_URL``.

    Returns:
        Process exit code: 1 when the Redis connection cannot be established,
        otherwise 0.
    """
    parser = argparse.ArgumentParser(description="AWOOOI Sensor Agent v2.0 — 神經末梢告警採集代理")
    parser.add_argument("--once", action="store_true", help="單次採集後退出cron 模式)")
    parser.add_argument("--loop", action="store_true", help="持續循環模式")
    parser.add_argument("--interval", type=int, default=60, help="循環間隔秒數(預設: 60")
    parser.add_argument("--redis-url", type=str, help="Redis URL預設讀取 AWOOOI_REDIS_URL")
    args = parser.parse_args()

    # time.sleep() rejects negative values and 0 would busy-loop.
    if args.loop and args.interval <= 0:
        parser.error("--interval must be a positive number of seconds")

    agent = SensorAgent(redis_url=args.redis_url or REDIS_URL)

    print("=" * 60)
    print("AWOOOI Sensor Agent v2.0 — Phase 5.5 神經末梢")
    print("=" * 60)
    print(f"Host: {agent.hostname} ({agent.host_ip})")
    print(f"Stream: {STREAM_KEY}")
    # Show the URL actually in use (honours --redis-url), credentials stripped.
    print(f"Redis: {agent.redis_url.split('@')[-1]}")
    print()

    if not agent.connect():
        return 1

    try:
        if args.loop:
            print(f"[LOOP] 每 {args.interval}s 採集一次Ctrl+C 停止\n")
            while True:
                agent.run_once()
                time.sleep(args.interval)
        else:
            # Default: single scan (cron usage).
            agent.run_once()
    except KeyboardInterrupt:
        print("\n[STOP] Interrupted")
    finally:
        agent.close()

    print("\n" + "=" * 70)
    print("Sensor Agent terminated")
    print("=" * 70)
    return 0