Files
awoooi/apps/sensor/agent.py
OG T 2a2dac865a feat(api): 統一使用台北時區 UTC+8 (禁止 UTC)
- 新增 src/utils/timezone.py 時區工具函式
- 修改 11 個後端檔案,全部改用 now_taipei()
- 更新 HARD_RULES.md 加入時區鐵律章節
- 更新 Skills 02/04 加入時區禁令

🔴 HARD RULE: 禁止 datetime.utcnow() / datetime.now(UTC)
 正確做法: from src.utils.timezone import now_taipei

Memory: feedback_timezone_taipei.md

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-25 09:08:34 +08:00

306 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
AWOOOI Sensor Agent - Phase 6.5 神經末梢
=========================================
極度輕量的告警採集代理,部署於各主機。
唯一職責:採集本地告警 → 無腦轉發至 188 基地 Event Bus
設計鐵律:
- 嚴禁 Incident/GraphRAG 邏輯 (防腦分裂)
- 零依賴 AWOOOI 核心資料庫
- 純 Python + Redis 即可運行
使用方式:
# 設定環境變數
export AWOOOI_REDIS_URL="redis://192.168.68.188:6379/0"
# 執行代理 (發送模擬告警)
python agent.py
# 持續監控模式 (每 30 秒發送一次)
python agent.py --loop --interval 30
Version: 1.0.0
Date: 2026-03-22
"""
import argparse
import json
import os
import random
import socket
import sys
import time
from datetime import datetime, timedelta, timezone
# Taipei timezone (UTC+8) - the single timezone used across the system.
TAIPEI_TZ = timezone(timedelta(hours=8))
from typing import Any
from uuid import uuid4
# ============================================================================
# Sole external dependency: redis-py (pip install redis)
# ============================================================================
try:
    import redis
except ImportError:
    # Fail fast with an install hint: nothing below works without redis-py.
    print("[FATAL] redis-py not installed. Run: pip install redis")
    sys.exit(1)
# ============================================================================
# Constants
# ============================================================================
# Redis Stream the agent writes to, and the URL used when none is configured.
STREAM_NAME = "stream:awoooi_signals"
DEFAULT_REDIS_URL = "redis://192.168.68.188:6379/0"

# Mock alert templates (a real deployment would read Prometheus/Alertmanager).
# Each row lists its values in _ALERT_FIELDS order. The LAST entry is the
# fallback used when a requested alert name is not found, so keep it last.
_ALERT_FIELDS = ("alert_name", "severity", "source", "namespace", "target")
MOCK_ALERTS = [
    dict(zip(_ALERT_FIELDS, row))
    for row in (
        ("PodCrashLoopBackOff", "critical", "prometheus", "production", "payment-service"),
        ("HighLatencyP99", "warning", "prometheus", "production", "api-gateway"),
        ("HighErrorRate", "critical", "prometheus", "staging", "order-service"),
        ("MemoryPressure", "warning", "node-exporter", "infra", "k3s-worker-01"),
        ("FINAL_PHASE_6_TEST", "critical", "sensor-agent", "production", "awoooi-brain"),
    )
]
# ============================================================================
# Sensor Agent Core
# ============================================================================
class SensorAgent:
"""
神經末梢 - 極簡告警採集代理
職責:
1. 採集本地告警 (或模擬生成)
2. 格式化為標準 Signal
3. 透過 Redis XADD 打入 188 基地 Event Bus
嚴禁邏輯:
- Incident 聚合 (由 188 大腦負責)
- GraphRAG 分析 (由 188 大腦負責)
- 任何決策邏輯 (由 188 大腦負責)
"""
def __init__(self, redis_url: str | None = None) -> None:
self.redis_url = redis_url or os.getenv("AWOOOI_REDIS_URL", DEFAULT_REDIS_URL)
self.hostname = socket.gethostname()
self.sensor_id = f"sensor-{self.hostname}"
self._redis: redis.Redis | None = None
def connect(self) -> bool:
"""連線至 188 基地 Redis"""
try:
self._redis = redis.from_url(
self.redis_url,
decode_responses=True,
socket_connect_timeout=5,
)
# 測試連線
self._redis.ping()
print(f"[OK] Connected to 188 Event Bus: {self._mask_url(self.redis_url)}")
return True
except redis.ConnectionError as e:
print(f"[FATAL] Cannot connect to 188 Event Bus: {e}")
return False
def _mask_url(self, url: str) -> str:
"""遮蔽密碼"""
if "@" in url:
parts = url.split("@")
return f"redis://***@{parts[-1]}"
return url
def send_signal(self, alert: dict[str, Any]) -> str | None:
"""
發送單一 Signal 至 Event Bus
無腦轉發邏輯:
1. 補齊必要欄位 (fingerprint, timestamp, sensor_id)
2. 直接 XADD 到 stream:awoooi_signals
3. 返回 message_id 或 None
Args:
alert: 告警字典 (至少需 alert_name, severity, source)
Returns:
Redis Stream message ID or None
"""
if not self._redis:
print("[ERROR] Not connected to Redis")
return None
# 建立標準 Signal 格式
now = datetime.now(TAIPEI_TZ)
signal = {
"alert_name": alert.get("alert_name", "UnknownAlert"),
"severity": alert.get("severity", "warning"),
"source": alert.get("source", "sensor-agent"),
"namespace": alert.get("namespace", "default"),
"target": alert.get("target", "unknown"),
"fingerprint": alert.get("fingerprint", f"fp_{uuid4().hex[:12]}"),
"labels": json.dumps(alert.get("labels", {"sensor_id": self.sensor_id})),
"annotations": json.dumps(alert.get("annotations", {})),
"received_at": now.isoformat(),
"sensor_id": self.sensor_id,
"sensor_host": self.hostname,
}
try:
# 無腦 XADD - 直接打入 188 基地
message_id = self._redis.xadd(STREAM_NAME, signal)
return message_id
except redis.RedisError as e:
print(f"[ERROR] XADD failed: {e}")
return None
def fire_mock_alert(self, alert_name: str | None = None) -> str | None:
"""
發射模擬告警 (測試用)
Args:
alert_name: 指定告警名稱,或隨機選擇
Returns:
message_id or None
"""
if alert_name:
# 尋找指定告警
alert = next(
(a for a in MOCK_ALERTS if a["alert_name"] == alert_name),
MOCK_ALERTS[-1], # 預設使用 FINAL_PHASE_6_TEST
)
else:
alert = random.choice(MOCK_ALERTS)
print(f"\n[FIRE] Sending alert: {alert['alert_name']}")
print(f" Severity: {alert['severity']}")
print(f" Target: {alert['namespace']}/{alert['target']}")
print(f" Sensor: {self.sensor_id}")
message_id = self.send_signal(alert)
if message_id:
print(f"[OK] Signal delivered to 188 Event Bus")
print(f" Stream: {STREAM_NAME}")
print(f" Message ID: {message_id}")
else:
print(f"[FAIL] Signal delivery failed!")
return message_id
def close(self) -> None:
"""關閉連線"""
if self._redis:
self._redis.close()
print("[OK] Disconnected from 188 Event Bus")
# ============================================================================
# CLI Entry Point
# ============================================================================
def main() -> int:
    """CLI entry point: parse arguments, connect, and fire mock alerts.

    In single-shot mode the alert named by ``--alert`` is sent once. With
    ``--loop`` a random mock alert is sent every ``--interval`` seconds until
    interrupted with Ctrl+C.

    Returns:
        Process exit code: 1 when the connection or the single-shot delivery
        fails, 0 otherwise (including a Ctrl+C stop in loop mode).
    """
    parser = argparse.ArgumentParser(
        description="AWOOOI Sensor Agent - 神經末梢告警採集代理"
    )
    parser.add_argument(
        "--alert",
        type=str,
        default="FINAL_PHASE_6_TEST",
        help="告警名稱 (預設: FINAL_PHASE_6_TEST)",
    )
    parser.add_argument(
        "--loop",
        action="store_true",
        help="持續監控模式",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=30,
        help="監控間隔秒數 (預設: 30)",
    )
    parser.add_argument(
        "--redis-url",
        type=str,
        help="Redis URL (預設讀取 AWOOOI_REDIS_URL 環境變數)",
    )
    args = parser.parse_args()

    # time.sleep() raises ValueError on a negative value, which would crash
    # the loop after its first alert; reject it before doing any work.
    if args.loop and args.interval < 0:
        parser.error("--interval must be >= 0")

    print("=" * 70)
    print("AWOOOI Sensor Agent - Phase 6.5 神經末梢")
    print("=" * 70)
    print(f"Time: {datetime.now(TAIPEI_TZ).isoformat()}")
    print(f"Host: {socket.gethostname()}")
    print()

    # Initialise the agent; without a connection there is nothing to do.
    agent = SensorAgent(redis_url=args.redis_url)
    if not agent.connect():
        return 1

    try:
        if args.loop:
            # Continuous mode: --alert is ignored, a random template is sent.
            print(f"\n[LOOP] Continuous mode: sending random alert every {args.interval}s")
            print("[LOOP] Press Ctrl+C to stop\n")
            while True:
                agent.fire_mock_alert()
                time.sleep(args.interval)
        else:
            # Single-shot mode: a failed delivery is reported via exit code.
            message_id = agent.fire_mock_alert(alert_name=args.alert)
            if not message_id:
                return 1
    except KeyboardInterrupt:
        print("\n[STOP] Interrupted by user")
    finally:
        # Always release the Redis connection, even on early return.
        agent.close()

    print("\n" + "=" * 70)
    print("Sensor Agent terminated")
    print("=" * 70)
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())