feat(sensor): Phase 5.5 B1 Sensor Agent v2.0 — 三層真實採集
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
- NodeMetricsCollector: node-exporter CPU/Mem/Disk/Load 閾值告警
- JournalCollector: systemd journal ERROR/OOM/KernelPanic 偵測
- ServiceProbeCollector: TCP port 存活探測 (188: PG/Redis/Ollama/Nginx/SigNoz, 110: Harbor/Gitea)
- 10分鐘 fingerprint dedup (Redis sensor:dedup:{fp})
- 正確 Stream key: awoooi:signals DB10 (ADR-038)
- HOST_CONFIGS 自動 IP 偵測 (110/188)
- 已部署 cron @188/@110,E2E 驗證:sensor→stream→INC-20260409-2F1DD6
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,98 +1,293 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
AWOOOI Sensor Agent - Phase 6.5 神經末梢
|
||||
AWOOOI Sensor Agent - Phase 5.5 神經末梢
|
||||
=========================================
|
||||
|
||||
極度輕量的告警採集代理,部署於各主機。
|
||||
唯一職責:採集本地告警 → 無腦轉發至 188 基地 Event Bus
|
||||
極度輕量的主機告警採集代理,部署於 110/188 兩台原生 Linux 主機。
|
||||
唯一職責:採集本地真實告警 → XADD → 188 Redis Event Bus
|
||||
|
||||
三層採集器:
|
||||
A. NodeMetricsCollector — node-exporter metrics (CPU/Mem/Disk/Load)
|
||||
B. JournalCollector — systemd journal ERROR/CRITICAL logs
|
||||
C. ServiceProbeCollector — TCP port 存活探測
|
||||
|
||||
設計鐵律:
|
||||
- 嚴禁 Incident/GraphRAG 邏輯 (防腦分裂)
|
||||
- 零依賴 AWOOOI 核心資料庫
|
||||
- 純 Python + Redis 即可運行
|
||||
- 嚴禁 Incident/決策邏輯(防腦分裂)
|
||||
- 零依賴 AWOOOI 核心 DB
|
||||
- 純 Python + redis-py 即可運行
|
||||
- fingerprint 去重:同一告警 10 分鐘只送一次
|
||||
|
||||
使用方式:
|
||||
# 設定環境變數
|
||||
export AWOOOI_REDIS_URL="redis://192.168.68.188:6379/0"
|
||||
Stream: awoooi:signals (2026-03-27 ADR-038 更名,舊 stream:awoooi_signals 已廢棄)
|
||||
Redis: 192.168.0.188:6380/0
|
||||
|
||||
# 執行代理 (發送模擬告警)
|
||||
python agent.py
|
||||
|
||||
# 持續監控模式 (每 30 秒發送一次)
|
||||
python agent.py --loop --interval 30
|
||||
|
||||
Version: 1.0.0
|
||||
Date: 2026-03-22
|
||||
版本: 2.0.0
|
||||
建立: 2026-04-09 (台北時區)
|
||||
建立者: Claude Sonnet 4.6 (Phase 5.5 B1 實作)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
# 台北時區 (UTC+8) - 系統統一時區
|
||||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
# ============================================================================
|
||||
# 唯一外部依賴:redis-py (pip install redis)
|
||||
# ============================================================================
|
||||
try:
|
||||
import redis
|
||||
except ImportError:
|
||||
print("[FATAL] redis-py not installed. Run: pip install redis")
|
||||
sys.exit(1)
|
||||
|
||||
# ============================================================================
|
||||
# 常量
|
||||
# ============================================================================
|
||||
|
||||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||||
STREAM_KEY = "awoooi:signals" # ADR-038 更名後正確 key
|
||||
DEDUP_TTL = 600 # 10 分鐘去重 TTL(秒)
|
||||
# DB 10: signal_worker 的 REDIS_URL (redis://192.168.0.188:6380/10)
|
||||
REDIS_URL = os.getenv("AWOOOI_REDIS_URL", "redis://192.168.0.188:6380/10")
|
||||
|
||||
# 各主機服務探測配置
|
||||
# key = hostname prefix 或 IP 最後段
|
||||
HOST_CONFIGS: dict[str, dict] = {
|
||||
# 192.168.0.188 — AI+Web 中心
|
||||
"188": {
|
||||
"node_exporter_url": "http://localhost:9100/metrics",
|
||||
"journal_enabled": True,
|
||||
"services": [
|
||||
{"name": "PostgreSQL", "host": "127.0.0.1", "port": 5432},
|
||||
{"name": "Redis", "host": "127.0.0.1", "port": 6380},
|
||||
{"name": "Ollama", "host": "127.0.0.1", "port": 11434},
|
||||
{"name": "Nginx", "host": "127.0.0.1", "port": 80},
|
||||
{"name": "SigNoz", "host": "127.0.0.1", "port": 3301},
|
||||
# OpenClaw 跑在 K3s (awoooi-prod),非 188 本機服務,不在此探測
|
||||
],
|
||||
},
|
||||
# 192.168.0.110 — DevOps 金庫
|
||||
"110": {
|
||||
"node_exporter_url": "http://localhost:9100/metrics",
|
||||
"journal_enabled": True,
|
||||
"services": [
|
||||
{"name": "Harbor", "host": "127.0.0.1", "port": 5000},
|
||||
{"name": "Gitea", "host": "127.0.0.1", "port": 3001},
|
||||
{"name": "GH-Runner", "host": "127.0.0.1", "port": 8080},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
# 告警閾值
|
||||
THRESHOLDS = {
|
||||
"cpu_pct_high": 85.0, # CPU > 85% → HIGH
|
||||
"mem_pct_high": 90.0, # Mem > 90% → HIGH
|
||||
"disk_pct_high": 85.0, # Disk > 85% → MEDIUM
|
||||
"load_factor": 2.0, # Load > cpu_count * 2 → MEDIUM
|
||||
"journal_err_min": 10, # 近5分鐘 ERROR 筆數 > 10 → MEDIUM
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 常量定義
|
||||
# A. NodeMetricsCollector
|
||||
# ============================================================================
|
||||
STREAM_NAME = "stream:awoooi_signals"
|
||||
DEFAULT_REDIS_URL = "redis://192.168.68.188:6379/0"
|
||||
|
||||
# 模擬告警模板 (實際部署時會讀取 Prometheus/Alertmanager)
|
||||
MOCK_ALERTS = [
|
||||
{
|
||||
"alert_name": "PodCrashLoopBackOff",
|
||||
"severity": "critical",
|
||||
"source": "prometheus",
|
||||
"namespace": "production",
|
||||
"target": "payment-service",
|
||||
},
|
||||
{
|
||||
"alert_name": "HighLatencyP99",
|
||||
"severity": "warning",
|
||||
"source": "prometheus",
|
||||
"namespace": "production",
|
||||
"target": "api-gateway",
|
||||
},
|
||||
{
|
||||
"alert_name": "HighErrorRate",
|
||||
"severity": "critical",
|
||||
"source": "prometheus",
|
||||
"namespace": "staging",
|
||||
"target": "order-service",
|
||||
},
|
||||
{
|
||||
"alert_name": "MemoryPressure",
|
||||
"severity": "warning",
|
||||
"source": "node-exporter",
|
||||
"namespace": "infra",
|
||||
"target": "k3s-worker-01",
|
||||
},
|
||||
{
|
||||
"alert_name": "FINAL_PHASE_6_TEST",
|
||||
"severity": "critical",
|
||||
"source": "sensor-agent",
|
||||
"namespace": "production",
|
||||
"target": "awoooi-brain",
|
||||
},
|
||||
]
|
||||
def _fetch_metrics(url: str) -> dict[str, float]:
|
||||
"""從 node-exporter 抓取指定 metrics,返回 {metric_key: value}"""
|
||||
try:
|
||||
with urllib.request.urlopen(url, timeout=5) as resp:
|
||||
text = resp.read().decode()
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
result: dict[str, float] = {}
|
||||
for line in text.splitlines():
|
||||
if line.startswith("#"):
|
||||
continue
|
||||
parts = line.split(" ")
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
try:
|
||||
result[parts[0]] = float(parts[1])
|
||||
except ValueError:
|
||||
continue
|
||||
return result
|
||||
|
||||
|
||||
def collect_node_metrics(url: str, hostname: str) -> list[dict]:
|
||||
"""採集 CPU/Mem/Disk/Load 告警"""
|
||||
alerts = []
|
||||
raw = _fetch_metrics(url)
|
||||
if not raw:
|
||||
return alerts
|
||||
|
||||
# --- CPU usage ---
|
||||
idle_keys = [k for k in raw if "node_cpu_seconds_total" in k and 'mode="idle"' in k]
|
||||
total_keys = [k for k in raw if "node_cpu_seconds_total" in k]
|
||||
if idle_keys and total_keys:
|
||||
idle = sum(raw[k] for k in idle_keys)
|
||||
total = sum(raw[k] for k in total_keys)
|
||||
cpu_pct = (1.0 - idle / total) * 100 if total else 0.0
|
||||
if cpu_pct > THRESHOLDS["cpu_pct_high"]:
|
||||
alerts.append({
|
||||
"alert_name": "HostHighCpuLoad",
|
||||
"severity": "critical",
|
||||
"source": "node-exporter",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"cpu_pct": f"{cpu_pct:.1f}"},
|
||||
"annotations": {"summary": f"CPU usage {cpu_pct:.1f}% > {THRESHOLDS['cpu_pct_high']}%"},
|
||||
})
|
||||
|
||||
# --- Memory usage ---
|
||||
mem_total = raw.get("node_memory_MemTotal_bytes", 0)
|
||||
mem_avail = raw.get("node_memory_MemAvailable_bytes", 0)
|
||||
if mem_total > 0:
|
||||
mem_pct = (1.0 - mem_avail / mem_total) * 100
|
||||
if mem_pct > THRESHOLDS["mem_pct_high"]:
|
||||
alerts.append({
|
||||
"alert_name": "HostOutOfMemory",
|
||||
"severity": "critical",
|
||||
"source": "node-exporter",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"mem_pct": f"{mem_pct:.1f}"},
|
||||
"annotations": {"summary": f"Memory usage {mem_pct:.1f}% > {THRESHOLDS['mem_pct_high']}%"},
|
||||
})
|
||||
|
||||
# --- Disk usage (root filesystem) ---
|
||||
disk_size = raw.get('node_filesystem_size_bytes{mountpoint="/",fstype="ext4"}', 0) or \
|
||||
raw.get('node_filesystem_size_bytes{mountpoint="/"}', 0)
|
||||
disk_free = raw.get('node_filesystem_free_bytes{mountpoint="/",fstype="ext4"}', 0) or \
|
||||
raw.get('node_filesystem_free_bytes{mountpoint="/"}', 0)
|
||||
|
||||
# fallback: 找第一個 mountpoint="/"
|
||||
if disk_size == 0:
|
||||
for k, v in raw.items():
|
||||
if "node_filesystem_size_bytes" in k and 'mountpoint="/"' in k:
|
||||
disk_size = v
|
||||
break
|
||||
if disk_free == 0:
|
||||
for k, v in raw.items():
|
||||
if "node_filesystem_free_bytes" in k and 'mountpoint="/"' in k:
|
||||
disk_free = v
|
||||
break
|
||||
|
||||
if disk_size > 0:
|
||||
disk_pct = (1.0 - disk_free / disk_size) * 100
|
||||
if disk_pct > THRESHOLDS["disk_pct_high"]:
|
||||
alerts.append({
|
||||
"alert_name": "HostDiskAlmostFull",
|
||||
"severity": "warning",
|
||||
"source": "node-exporter",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"disk_pct": f"{disk_pct:.1f}"},
|
||||
"annotations": {"summary": f"Disk usage {disk_pct:.1f}% > {THRESHOLDS['disk_pct_high']}%"},
|
||||
})
|
||||
|
||||
# --- Load average ---
|
||||
load1 = raw.get("node_load1", 0)
|
||||
cpu_count = len([k for k in raw if "node_cpu_seconds_total" in k and 'mode="idle"' in k])
|
||||
if cpu_count > 0 and load1 > cpu_count * THRESHOLDS["load_factor"]:
|
||||
alerts.append({
|
||||
"alert_name": "HostHighLoadAverage",
|
||||
"severity": "warning",
|
||||
"source": "node-exporter",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"load1": f"{load1:.2f}", "cpu_count": str(cpu_count)},
|
||||
"annotations": {"summary": f"Load {load1:.2f} > {cpu_count * THRESHOLDS['load_factor']:.0f} (cpu×{THRESHOLDS['load_factor']})"},
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# B. JournalCollector
|
||||
# ============================================================================
|
||||
|
||||
def collect_journal_errors(hostname: str) -> list[dict]:
|
||||
"""採集 systemd journal 近 5 分鐘 ERROR/CRITICAL 告警"""
|
||||
alerts = []
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["journalctl", "-p", "err", "--since", "5 minutes ago", "--no-pager", "-q"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
lines = [l for l in result.stdout.splitlines() if l.strip()]
|
||||
count = len(lines)
|
||||
|
||||
# OOM killer 特殊處理
|
||||
oom_lines = [l for l in lines if "Out of memory" in l or "oom_kill" in l.lower()]
|
||||
if oom_lines:
|
||||
alerts.append({
|
||||
"alert_name": "HostOOMKiller",
|
||||
"severity": "critical",
|
||||
"source": "journal",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"oom_count": str(len(oom_lines))},
|
||||
"annotations": {"summary": f"OOM killer triggered {len(oom_lines)} times in last 5min"},
|
||||
})
|
||||
|
||||
# kernel panic
|
||||
panic_lines = [l for l in lines if "kernel panic" in l.lower() or "Kernel panic" in l]
|
||||
if panic_lines:
|
||||
alerts.append({
|
||||
"alert_name": "HostKernelPanic",
|
||||
"severity": "critical",
|
||||
"source": "journal",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {},
|
||||
"annotations": {"summary": "Kernel panic detected"},
|
||||
})
|
||||
|
||||
# 一般 ERROR 洪水
|
||||
if count > THRESHOLDS["journal_err_min"] and not oom_lines and not panic_lines:
|
||||
alerts.append({
|
||||
"alert_name": "HostErrorLogFlood",
|
||||
"severity": "warning",
|
||||
"source": "journal",
|
||||
"namespace": "infra",
|
||||
"target": hostname,
|
||||
"labels": {"error_count": str(count)},
|
||||
"annotations": {"summary": f"{count} ERROR log entries in last 5min"},
|
||||
})
|
||||
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass # journalctl 不存在或超時,靜默跳過
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# C. ServiceProbeCollector
|
||||
# ============================================================================
|
||||
|
||||
def collect_service_probes(services: list[dict], hostname: str) -> list[dict]:
|
||||
"""TCP port 存活探測"""
|
||||
alerts = []
|
||||
for svc in services:
|
||||
try:
|
||||
sock = socket.create_connection((svc["host"], svc["port"]), timeout=3)
|
||||
sock.close()
|
||||
except (OSError, ConnectionRefusedError, TimeoutError):
|
||||
alerts.append({
|
||||
"alert_name": "ServiceDown",
|
||||
"severity": "critical",
|
||||
"source": "sensor-probe",
|
||||
"namespace": "infra",
|
||||
"target": f"{hostname}/{svc['name']}",
|
||||
"labels": {"service": svc["name"], "port": str(svc["port"])},
|
||||
"annotations": {"summary": f"{svc['name']} port {svc['port']} unreachable on {hostname}"},
|
||||
})
|
||||
return alerts
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -100,204 +295,172 @@ MOCK_ALERTS = [
|
||||
# ============================================================================
|
||||
|
||||
class SensorAgent:
|
||||
"""
|
||||
神經末梢 - 極簡告警採集代理
|
||||
|
||||
職責:
|
||||
1. 採集本地告警 (或模擬生成)
|
||||
2. 格式化為標準 Signal
|
||||
3. 透過 Redis XADD 打入 188 基地 Event Bus
|
||||
|
||||
嚴禁邏輯:
|
||||
- Incident 聚合 (由 188 大腦負責)
|
||||
- GraphRAG 分析 (由 188 大腦負責)
|
||||
- 任何決策邏輯 (由 188 大腦負責)
|
||||
"""
|
||||
|
||||
def __init__(self, redis_url: str | None = None) -> None:
|
||||
self.redis_url = redis_url or os.getenv("AWOOOI_REDIS_URL", DEFAULT_REDIS_URL)
|
||||
def __init__(self, redis_url: str = REDIS_URL) -> None:
|
||||
self.redis_url = redis_url
|
||||
self.hostname = socket.gethostname()
|
||||
self.sensor_id = f"sensor-{self.hostname}"
|
||||
self.host_ip = self._detect_ip()
|
||||
self.config = self._detect_config()
|
||||
self._redis: redis.Redis | None = None
|
||||
|
||||
def _detect_ip(self) -> str:
|
||||
"""取得主機 IP 最後段(用於對應 HOST_CONFIGS)"""
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("192.168.0.188", 6380))
|
||||
ip = s.getsockname()[0]
|
||||
s.close()
|
||||
return ip
|
||||
except Exception:
|
||||
return "unknown"
|
||||
|
||||
def _detect_config(self) -> dict | None:
|
||||
"""根據 IP 自動對應 HOST_CONFIGS"""
|
||||
last_octet = self.host_ip.split(".")[-1] if "." in self.host_ip else ""
|
||||
return HOST_CONFIGS.get(last_octet)
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""連線至 188 基地 Redis"""
|
||||
try:
|
||||
self._redis = redis.from_url(
|
||||
self.redis_url,
|
||||
decode_responses=True,
|
||||
socket_connect_timeout=5,
|
||||
)
|
||||
# 測試連線
|
||||
self._redis.ping()
|
||||
print(f"[OK] Connected to 188 Event Bus: {self._mask_url(self.redis_url)}")
|
||||
print(f"[OK] Connected to Redis Event Bus ({self.redis_url.split('@')[-1]})")
|
||||
return True
|
||||
except redis.ConnectionError as e:
|
||||
print(f"[FATAL] Cannot connect to 188 Event Bus: {e}")
|
||||
print(f"[FATAL] Cannot connect to Redis: {e}")
|
||||
return False
|
||||
|
||||
def _mask_url(self, url: str) -> str:
|
||||
"""遮蔽密碼"""
|
||||
if "@" in url:
|
||||
parts = url.split("@")
|
||||
return f"redis://***@{parts[-1]}"
|
||||
return url
|
||||
def _fingerprint(self, alert: dict) -> str:
|
||||
"""生成告警 fingerprint(alert_name + target → 10 min 去重)"""
|
||||
key = f"{alert['alert_name']}:{alert['target']}"
|
||||
return hashlib.md5(key.encode()).hexdigest()[:16]
|
||||
|
||||
def send_signal(self, alert: dict[str, Any]) -> str | None:
|
||||
"""
|
||||
發送單一 Signal 至 Event Bus
|
||||
def _is_dedup(self, fingerprint: str) -> bool:
|
||||
"""檢查是否在去重 TTL 內"""
|
||||
dedup_key = f"sensor:dedup:{fingerprint}"
|
||||
return bool(self._redis.exists(dedup_key))
|
||||
|
||||
無腦轉發邏輯:
|
||||
1. 補齊必要欄位 (fingerprint, timestamp, sensor_id)
|
||||
2. 直接 XADD 到 stream:awoooi_signals
|
||||
3. 返回 message_id 或 None
|
||||
def _mark_sent(self, fingerprint: str) -> None:
|
||||
"""標記已發送(設 TTL)"""
|
||||
dedup_key = f"sensor:dedup:{fingerprint}"
|
||||
self._redis.set(dedup_key, "1", ex=DEDUP_TTL)
|
||||
|
||||
Args:
|
||||
alert: 告警字典 (至少需 alert_name, severity, source)
|
||||
def send_signal(self, alert: dict) -> str | None:
|
||||
"""發送 Signal 至 Event Bus(含去重)"""
|
||||
fingerprint = self._fingerprint(alert)
|
||||
|
||||
Returns:
|
||||
Redis Stream message ID or None
|
||||
"""
|
||||
if not self._redis:
|
||||
print("[ERROR] Not connected to Redis")
|
||||
if self._is_dedup(fingerprint):
|
||||
print(f" [SKIP] {alert['alert_name']} ({alert['target']}) — dedup ({DEDUP_TTL//60}min TTL)")
|
||||
return None
|
||||
|
||||
# 建立標準 Signal 格式
|
||||
now = datetime.now(TAIPEI_TZ)
|
||||
signal = {
|
||||
"alert_name": alert.get("alert_name", "UnknownAlert"),
|
||||
"severity": alert.get("severity", "warning"),
|
||||
"alert_name": alert["alert_name"],
|
||||
"severity": alert["severity"],
|
||||
"source": alert.get("source", "sensor-agent"),
|
||||
"namespace": alert.get("namespace", "default"),
|
||||
"target": alert.get("target", "unknown"),
|
||||
"fingerprint": alert.get("fingerprint", f"fp_{uuid4().hex[:12]}"),
|
||||
"labels": json.dumps(alert.get("labels", {"sensor_id": self.sensor_id})),
|
||||
"namespace": alert.get("namespace", "infra"),
|
||||
"target": alert["target"],
|
||||
"fingerprint": fingerprint,
|
||||
"labels": json.dumps(alert.get("labels", {})),
|
||||
"annotations": json.dumps(alert.get("annotations", {})),
|
||||
"received_at": now.isoformat(),
|
||||
"sensor_id": self.sensor_id,
|
||||
"sensor_id": f"sensor-{self.hostname}",
|
||||
"sensor_host": self.hostname,
|
||||
"sensor_ip": self.host_ip,
|
||||
}
|
||||
|
||||
try:
|
||||
# 無腦 XADD - 直接打入 188 基地
|
||||
message_id = self._redis.xadd(STREAM_NAME, signal)
|
||||
message_id = self._redis.xadd(STREAM_KEY, signal)
|
||||
self._mark_sent(fingerprint)
|
||||
print(f" [SENT] {alert['alert_name']} → {message_id}")
|
||||
return message_id
|
||||
except redis.RedisError as e:
|
||||
print(f"[ERROR] XADD failed: {e}")
|
||||
print(f" [FAIL] XADD failed: {e}")
|
||||
return None
|
||||
|
||||
def fire_mock_alert(self, alert_name: str | None = None) -> str | None:
|
||||
"""
|
||||
發射模擬告警 (測試用)
|
||||
def run_once(self) -> int:
|
||||
"""執行一次採集,返回發送的告警數"""
|
||||
if not self.config:
|
||||
print(f"[WARN] Host {self.hostname} ({self.host_ip}) not in HOST_CONFIGS, skipping")
|
||||
return 0
|
||||
|
||||
Args:
|
||||
alert_name: 指定告警名稱,或隨機選擇
|
||||
print(f"\n[SCAN] {self.hostname} ({self.host_ip}) @ {datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')}")
|
||||
all_alerts: list[dict] = []
|
||||
|
||||
Returns:
|
||||
message_id or None
|
||||
"""
|
||||
if alert_name:
|
||||
# 尋找指定告警
|
||||
alert = next(
|
||||
(a for a in MOCK_ALERTS if a["alert_name"] == alert_name),
|
||||
MOCK_ALERTS[-1], # 預設使用 FINAL_PHASE_6_TEST
|
||||
)
|
||||
else:
|
||||
alert = random.choice(MOCK_ALERTS)
|
||||
# A. Node Metrics
|
||||
node_url = self.config.get("node_exporter_url")
|
||||
if node_url:
|
||||
metrics_alerts = collect_node_metrics(node_url, self.hostname)
|
||||
print(f" [NodeMetrics] {len(metrics_alerts)} alert(s)")
|
||||
all_alerts.extend(metrics_alerts)
|
||||
|
||||
print(f"\n[FIRE] Sending alert: {alert['alert_name']}")
|
||||
print(f" Severity: {alert['severity']}")
|
||||
print(f" Target: {alert['namespace']}/{alert['target']}")
|
||||
print(f" Sensor: {self.sensor_id}")
|
||||
# B. Journal
|
||||
if self.config.get("journal_enabled"):
|
||||
journal_alerts = collect_journal_errors(self.hostname)
|
||||
print(f" [Journal] {len(journal_alerts)} alert(s)")
|
||||
all_alerts.extend(journal_alerts)
|
||||
|
||||
message_id = self.send_signal(alert)
|
||||
# C. Service Probes
|
||||
services = self.config.get("services", [])
|
||||
if services:
|
||||
probe_alerts = collect_service_probes(services, self.hostname)
|
||||
print(f" [Probes] {len(probe_alerts)} alert(s)")
|
||||
all_alerts.extend(probe_alerts)
|
||||
|
||||
if message_id:
|
||||
print(f"[OK] Signal delivered to 188 Event Bus")
|
||||
print(f" Stream: {STREAM_NAME}")
|
||||
print(f" Message ID: {message_id}")
|
||||
else:
|
||||
print(f"[FAIL] Signal delivery failed!")
|
||||
sent = 0
|
||||
for alert in all_alerts:
|
||||
if self.send_signal(alert):
|
||||
sent += 1
|
||||
|
||||
return message_id
|
||||
print(f"[DONE] {sent}/{len(all_alerts)} signals sent\n")
|
||||
return sent
|
||||
|
||||
def close(self) -> None:
|
||||
"""關閉連線"""
|
||||
if self._redis:
|
||||
self._redis.close()
|
||||
print("[OK] Disconnected from 188 Event Bus")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Entry Point
|
||||
# CLI
|
||||
# ============================================================================
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="AWOOOI Sensor Agent - 神經末梢告警採集代理"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--alert",
|
||||
type=str,
|
||||
default="FINAL_PHASE_6_TEST",
|
||||
help="告警名稱 (預設: FINAL_PHASE_6_TEST)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--loop",
|
||||
action="store_true",
|
||||
help="持續監控模式",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--interval",
|
||||
type=int,
|
||||
default=30,
|
||||
help="監控間隔秒數 (預設: 30)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--redis-url",
|
||||
type=str,
|
||||
help="Redis URL (預設讀取 AWOOOI_REDIS_URL 環境變數)",
|
||||
)
|
||||
|
||||
parser = argparse.ArgumentParser(description="AWOOOI Sensor Agent v2.0 — 神經末梢告警採集代理")
|
||||
parser.add_argument("--once", action="store_true", help="單次採集後退出(cron 模式)")
|
||||
parser.add_argument("--loop", action="store_true", help="持續循環模式")
|
||||
parser.add_argument("--interval", type=int, default=60, help="循環間隔秒數(預設: 60)")
|
||||
parser.add_argument("--redis-url", type=str, help="Redis URL(預設讀取 AWOOOI_REDIS_URL)")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 70)
|
||||
print("AWOOOI Sensor Agent - Phase 6.5 神經末梢")
|
||||
print("=" * 70)
|
||||
print(f"Time: {datetime.now(TAIPEI_TZ).isoformat()}")
|
||||
print(f"Host: {socket.gethostname()}")
|
||||
print()
|
||||
agent = SensorAgent(redis_url=args.redis_url or REDIS_URL)
|
||||
|
||||
# 初始化 Agent
|
||||
agent = SensorAgent(redis_url=args.redis_url)
|
||||
print("=" * 60)
|
||||
print("AWOOOI Sensor Agent v2.0 — Phase 5.5 神經末梢")
|
||||
print("=" * 60)
|
||||
print(f"Host: {agent.hostname} ({agent.host_ip})")
|
||||
print(f"Stream: {STREAM_KEY}")
|
||||
print(f"Redis: {REDIS_URL.split('@')[-1]}")
|
||||
print()
|
||||
|
||||
if not agent.connect():
|
||||
return 1
|
||||
|
||||
try:
|
||||
if args.loop:
|
||||
# 持續監控模式
|
||||
print(f"\n[LOOP] Continuous mode: sending random alert every {args.interval}s")
|
||||
print("[LOOP] Press Ctrl+C to stop\n")
|
||||
print(f"[LOOP] 每 {args.interval}s 採集一次,Ctrl+C 停止\n")
|
||||
while True:
|
||||
agent.fire_mock_alert()
|
||||
agent.run_once()
|
||||
time.sleep(args.interval)
|
||||
else:
|
||||
# 單發模式
|
||||
message_id = agent.fire_mock_alert(alert_name=args.alert)
|
||||
if not message_id:
|
||||
return 1
|
||||
|
||||
# 預設 --once(cron 用)
|
||||
agent.run_once()
|
||||
except KeyboardInterrupt:
|
||||
print("\n[STOP] Interrupted by user")
|
||||
|
||||
print("\n[STOP] Interrupted")
|
||||
finally:
|
||||
agent.close()
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("Sensor Agent terminated")
|
||||
print("=" * 70)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user