Files
awoooi/apps/api/src/main.py
Your Name 13cf02b740
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m0s
CD Pipeline / build-and-deploy (push) Successful in 3m21s
CD Pipeline / post-deploy-checks (push) Successful in 1m16s
feat(governance): emit adr100 slo metrics
2026-05-14 18:57:03 +08:00

1061 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AWOOOI API - BFF Gateway
========================
ADR-005: BFF Architecture
ADR-006: AI Fallback Strategy
Four Iron Laws:
1. Async-First - All handlers are async def
2. CORS Whitelist - Strict origin control (NO wildcards)
3. Pydantic Config - Type-safe settings with validation
4. structlog - Structured JSON logging
Observability Stack:
- OpenTelemetry → SignOz (Traces + Logs + Metrics)
- Sentry SDK → Sentry Self-Hosted (Error Tracking + Stack Traces)
Version: 1.0.0
Date: 2026-03-20
"""
import asyncio
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import sentry_sdk
import structlog
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
from src.api.v1 import aider_events as aider_events_v1 # aider-watch v2 ADR-091
from src.api.v1 import ai_governance as ai_governance_v1 # 2026-05-02: /governance 頁面 3 endpoints
from src.api.v1 import ai_slo as ai_slo_v1 # Phase 6 ADR-087: AI SLO 自我治理
from src.api.v1 import aiops_kpi as aiops_kpi_v1 # ADR-090 § Phase 7 KPI Dashboard
from src.api.v1 import aiops_timeline as aiops_timeline_v1 # 2026-04-27 Wave8-X3 B4 timeline endpoint
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import alert_operation_logs as alert_operation_logs_v1
from src.api.v1 import audit_logs as audit_logs_v1
from src.api.v1 import auto_repair as auto_repair_v1 # #8: 自動升級決策
from src.api.v1 import csrf as csrf_v1 # Phase 20: CSRF Protection
from src.api.v1 import dashboard as dashboard_v1
from src.api.v1 import errors as errors_v1 # #40: Sentry 錯誤 BFF API
from src.api.v1 import (
gitea_webhook as gitea_webhook_v1, # ADR-059: Gitea → OpenClaw (GitHub → Gitea 遷移)
)
# Import API routers
from src.api.v1 import health as health_v1
from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
from src.api.v1 import knowledge as knowledge_v1 # KB Phase 1: Knowledge Base
from src.api.v1 import learning as learning_v1 # Phase D-G P0: Learning API
from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈)
from src.api.v1 import playbooks as playbooks_v1 # #7: Playbook 萃取
from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API
from src.api.v1 import (
sentry_webhook as sentry_webhook_v1, # Phase 10.2.1: Sentry → Telegram
)
from src.api.v1 import (
signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037)
)
from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
from src.api.v1 import platform as platform_v1 # AwoooP Phase 4: Platform ShellShadow Mode
from src.api.v1 import rag as rag_v1 # Phase 33 ADR-067: RAG 知識庫
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
from src.api.v1 import notifications as notifications_v1 # 2026-04-10: 通知頻道狀態
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
from src.api.v1 import telegram_webhook as telegram_webhook_v1 # ADR-094: Webhook入口
from src.api.v1 import terminal as terminal_v1 # Phase 19.1: Omni-Terminal SSE
from src.api.v1 import timeline as timeline_v1
from src.api.v1 import webhooks as webhooks_v1
from src.core.config import settings
from src.core.http_client import close_all_http_clients, init_all_http_clients
from src.core.logging import get_logger, setup_logging
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.core.sse import get_publisher
from src.core.telemetry import setup_telemetry, shutdown_telemetry
from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service
from src.services.flywheel_stats_service import get_flywheel_stats_service
# CTO-201: Database & Executor
from src.db.base import close_db, init_db
# Phase 6.4g: lewooogo-brain 積木路由
from src.routers import proposals as proposals_router
# Legacy route imports (to be migrated)
from src.routes import agent, notifications, pipelines, plugins
from src.services.executor import close_executor
# Phase 5: OpenClaw AI Engine
from src.services.openclaw import close_openclaw
from src.services.telegram_gateway import get_telegram_gateway
# Phase 6.1: Event Bus (Signal Worker)
from src.workers import close_signal_worker, init_signal_worker
# =============================================================================
# Initialize Logging (MUST be first)
# =============================================================================
setup_logging()
logger = get_logger("awoooi.api")

# =============================================================================
# Sentry SDK Initialization (error tracking, complementing SignOz)
# Self-hosted @ 192.168.0.110
# Division of labor: Sentry focuses on error tracking; SignOz focuses on
# traces / logs / metrics.
# Phase 15.3: Deep Linking - inject the OTEL trace_id so SignOz can correlate.
# =============================================================================
# Sentry stays disabled when this environment variable is unset or empty.
SENTRY_DSN = os.environ.get("SENTRY_DSN")
def _sentry_before_send(event, hint):  # noqa: ARG001 - hint is Sentry callback signature
    """Attach the current OTEL trace id to an outgoing Sentry event.

    Phase 15.3 (Sentry -> SignOz deep linking): when a trace id is available,
    it is written to the event's ``tags`` (``otel_trace_id`` and
    ``signoz_trace_url``) and to a ``signoz`` entry under ``contexts``.

    Any failure while enriching the event is swallowed on purpose, so that a
    deep-linking problem can never prevent the error itself from being
    reported.

    Args:
        event: The Sentry event dictionary; it is mutated in place.
        hint: Unused; present only because Sentry passes it to the callback.

    Returns:
        The same ``event`` object, enriched when possible.
    """
    try:
        from src.core.deep_linking import DeepLinking
        from src.core.telemetry import get_current_trace_id

        current_trace = get_current_trace_id()
        if not current_trace:
            return event

        # Tags are searchable in the Sentry UI.
        tag_map = event.setdefault("tags", {})
        tag_map["otel_trace_id"] = current_trace
        tag_map["signoz_trace_url"] = DeepLinking.signoz_trace_url(current_trace)

        # Contexts are rendered on the event detail page.
        context_map = event.setdefault("contexts", {})
        context_map["signoz"] = {
            "trace_id": current_trace,
            "trace_url": DeepLinking.signoz_trace_url(current_trace),
            "service": "awoooi-api",
        }
    except Exception:
        # Deliberate best-effort: enrichment must never block error reporting.
        pass
    return event
if not SENTRY_DSN:
    logger.info("sentry_disabled", reason="SENTRY_DSN not configured")
else:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        environment=settings.ENVIRONMENT,
        release=f"awoooi-api@{settings.VERSION}",
        # Performance-trace sampling: 10% in production, 100% everywhere else.
        traces_sample_rate=0.1 if settings.ENVIRONMENT == "production" else 1.0,
        # Deep FastAPI / Starlette integration; transactions are named after
        # the endpoint.
        integrations=[
            FastApiIntegration(transaction_style="endpoint"),
            StarletteIntegration(transaction_style="endpoint"),
        ],
        # Common exception types that are not treated as errors.
        ignore_errors=[
            ConnectionRefusedError,
            TimeoutError,
        ],
        # Default PII collection is switched off.
        send_default_pii=False,
        # Phase 15.3: deep-linking hook that injects the OTEL trace id.
        before_send=_sentry_before_send,
    )

    # 2026-04-05: unified tags, aligned with the Prometheus / auto_repair
    # "layer" convention.
    for _tag_name, _tag_value in (
        ("layer", "k8s"),
        ("component", "api"),
        ("host", "k8s-awoooi-prod"),
        ("team", "backend"),
    ):
        sentry_sdk.set_tag(_tag_name, _tag_value)

    # Log only the part of the DSN after the last "@" so the key is not logged.
    logger.info("sentry_initialized", dsn=SENTRY_DSN.split("@")[-1])
# =============================================================================
# Application Lifespan
# =============================================================================
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """Run application startup before ``yield`` and shutdown after it.

    Startup initializes the database, HTTP clients, Redis pool, SSE publisher,
    Telegram gateway and MCP registries, warms in-memory state from
    PostgreSQL, and schedules the background jobs. Optional subsystems are
    wrapped in ``try``/``except`` so a failure is logged as a warning instead
    of aborting startup.

    Background jobs are started through the local ``_spawn`` helper, which
    keeps a strong reference to every task (the event loop itself only holds
    weak references) and logs any task that ends with an exception.

    Shutdown stops background work first (Ollama recovery, auto-repair drain,
    platform worker, spawned background tasks) and then closes the shared
    resources: signal worker, SSE publisher, executor, OpenClaw, Telegram,
    RAG client, HTTP clients, Redis, database and telemetry.

    Args:
        _app: The FastAPI application (unused; required by the lifespan
            protocol).

    Yields:
        None, while the application is serving requests.
    """
    # AwoooP Phase 2.4 (2026-05-04 ogt): set the project_id context for the
    # startup handler. asyncio.create_task() copies the current context, so
    # every background loop spawned below is tagged as "awoooi".
    from src.core.context import PROJECT_ID

    PROJECT_ID.set("awoooi")

    # Strong references to fire-and-forget tasks. Without them a task can be
    # garbage-collected while it is still running.
    background_tasks: set[asyncio.Task] = set()
    # Upper bound on how long shutdown waits for cancelled tasks to finish.
    background_task_shutdown_timeout_sec = 10.0

    def _on_background_task_done(task: asyncio.Task) -> None:
        # Release the reference and surface crashes that would otherwise only
        # appear as "Task exception was never retrieved".
        background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(
                "background_task_failed",
                task=task.get_name(),
                error=str(exc),
                error_type=type(exc).__name__,
            )

    def _spawn(coro, *, name: str) -> asyncio.Task:
        # Schedule ``coro`` as a named task that is retained until it finishes.
        task = asyncio.create_task(coro, name=name)
        background_tasks.add(task)
        task.add_done_callback(_on_background_task_done)
        return task

    # Startup
    logger.info(
        "api_startup",
        version=settings.VERSION,
        environment=settings.ENVIRONMENT,
        mock_mode=settings.MOCK_MODE,
        cors_origins=settings.CORS_ORIGINS,
        ai_fallback_order=settings.AI_FALLBACK_ORDER,
        four_hosts=settings.four_hosts,
        kubeconfig=settings.KUBECONFIG_PATH,
    )

    # CTO-201: initialize the PostgreSQL database (iron law: SQLite is forbidden).
    await init_db()
    db_url = settings.DATABASE_URL
    logger.info(
        "database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url
    )

    # Phase 5: initialize HTTP clients (ClickHouse, Ollama).
    # Iron law: connection pools are created at startup and released at shutdown.
    await init_all_http_clients()
    logger.info("http_clients_initialized")

    # Phase 6.1.1: initialize the Redis pool (multi-sig state persistence).
    # Iron law: the Redis pool is created during lifespan startup.
    await init_redis_pool()
    logger.info("redis_pool_initialized", url=settings.REDIS_URL.split("@")[-1])

    # Start the SSE publisher.
    publisher = await get_publisher()
    logger.info("sse_publisher_initialized")

    # Phase 5: Telegram gateway initialization.
    # 2026-03-23 architecture fix: the AWOOOI API does not long-poll, because a
    # bot token can only have one long-polling instance. OpenClaw
    # (192.168.0.188) is that instance; the AWOOOI API only sends messages.
    telegram_gw = get_telegram_gateway()
    if settings.TELEGRAM_ENABLE_POLLING:
        await telegram_gw.start_long_polling()
        logger.info("telegram_long_polling_started")
    else:
        logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例")

    # ADR-015: MCP provider registration (DI pattern).
    from src.plugins.mcp.providers import register_all_providers

    register_all_providers()
    logger.info("mcp_providers_registered")

    # ADR-081 Phase 1: initialize MCPToolRegistry (PreDecisionInvestigator
    # sensor tools). 2026-04-16: fixes the "sensors=0/0" root cause - init was
    # never called during startup.
    try:
        from src.services.mcp_tool_registry import init_mcp_tool_registry

        await init_mcp_tool_registry()
        logger.info("mcp_tool_registry_initialized")
    except Exception as e:
        logger.warning("mcp_tool_registry_init_failed", error=str(e))

    # Phase 6.5: Telegram heartbeat monitor (every 30 minutes, posted to the
    # SRE war-room group). 2026-04-16: re-enabled - the user confirmed it must
    # keep posting there; the earlier reason for disabling it
    # (forwarded_to_separate_group) was wrong, the group is SRE_GROUP_CHAT_ID.
    if settings.OPENCLAW_TG_BOT_TOKEN:
        await telegram_gw.start_heartbeat_monitor(
            heartbeat_interval_minutes=30,
            silence_threshold_hours=2,
        )
        logger.info("telegram_heartbeat_monitor_started", interval_minutes=30)
    else:
        logger.warning("telegram_heartbeat_monitor_skipped", reason="OPENCLAW_TG_BOT_TOKEN not set")

    # Reboot recovery: warm up Redis working memory from PostgreSQL.
    # 2026-04-05 ogt: Redis is empty after a reboot, so unresolved incidents
    # are restored from the database.
    try:
        from src.services.incident_service import get_incident_service
        from src.db.base import get_db_context
        from src.db.models import IncidentRecord
        from sqlalchemy import select
        # Imported once here instead of once per record; an import failure is
        # now reported by the outer handler rather than silently per record.
        from src.models.incident import Incident

        incident_service = get_incident_service()
        restored = 0
        skipped = 0
        async with get_db_context() as db:
            result = await db.execute(
                select(IncidentRecord).where(
                    IncidentRecord.status.in_(["investigating", "mitigating"])
                )
            )
            records = result.scalars().all()
            for record in records:
                try:
                    incident = Incident(
                        incident_id=record.incident_id,
                        status=record.status,
                        severity=record.severity,
                        signals=record.signals or [],
                        affected_services=record.affected_services or [],
                        decision_chain=record.decision_chain,
                        proposal_ids=record.proposal_ids or [],
                        outcome=record.outcome,
                        created_at=record.created_at,
                        updated_at=record.updated_at,
                        resolved_at=record.resolved_at,
                        closed_at=record.closed_at,
                        ttl_days=record.ttl_days,
                        vectorized=record.vectorized,
                        # ADR-073: classification fields must be restored,
                        # otherwise KM writes would all be "unknown".
                        notification_type=record.notification_type,
                        alert_category=record.alert_category,
                    )
                    if await incident_service.save_to_working_memory(incident):
                        restored += 1
                except Exception as record_err:
                    # Legacy rows with invalid values (e.g. an illegal source
                    # such as node-exporter) are skipped, but no longer
                    # silently.
                    skipped += 1
                    logger.debug(
                        "working_memory_warmup_record_skipped",
                        incident_id=getattr(record, "incident_id", None),
                        error=str(record_err),
                    )
        logger.info(
            "working_memory_warmed_up",
            restored=restored,
            total=len(records),
            skipped=skipped,
        )
    except Exception as e:
        logger.warning("working_memory_warmup_failed", error=str(e))

    # ADR-088: trust-score cold start - restore trust scores from PostgreSQL.
    # Solves: after a pod restart TrustScoreManager memory is reset and the AI
    # can never accumulate enough trust for L4 auto-approval.
    try:
        from src.repositories.trust_repository import get_trust_repository
        from src.services.trust_engine import get_trust_manager

        trust_records = await get_trust_repository().load_all()
        loaded = get_trust_manager().bulk_load(trust_records)
        logger.info("trust_scores_warmed_up", loaded=loaded)
    except Exception as e:
        logger.warning("trust_scores_warmup_failed", error=str(e))

    # Phase 4 flywheel fix: playbook embedding cold-start indexing.
    # Background tasks are used so that API startup is not slowed down.
    # ADR-068 (2026-04-10): seed playbooks from alert_rules.yaml (idempotent).
    # NOTE(review): the original comment says seeding must happen before
    # embedding indexing, but both run as independent tasks, so that order is
    # not enforced here - confirm whether the indexing job tolerates this.
    try:
        from src.services.playbook_seed_service import seed_playbooks_from_rules

        _spawn(seed_playbooks_from_rules(), name="playbook_seed")
        logger.info("playbook_seed_scheduled")
    except Exception as e:
        logger.warning("playbook_seed_schedule_failed", error=str(e))

    # Phase 3.5 ADR-085: playbook Redis -> PG backfill (one-off migration plus
    # idempotent repair at startup), so playbooks present in Redis but missing
    # in PG are not lost when their TTL expires.
    try:
        from src.repositories.playbook_repository import get_playbook_repository

        _spawn(get_playbook_repository().backfill_redis_to_pg(), name="playbook_pg_backfill")
        logger.info("playbook_pg_backfill_scheduled")
    except Exception as e:
        logger.warning("playbook_pg_backfill_schedule_failed", error=str(e))

    try:
        from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed

        _spawn(ensure_playbook_embeddings_indexed(), name="playbook_embedding_indexing")
        logger.info("playbook_embedding_indexing_scheduled")
    except Exception as e:
        logger.warning("playbook_embedding_schedule_failed", error=str(e))

    # Phase 6.1: start the Signal Worker (Redis Streams consumer).
    # Iron law: the event bus decouples alert intake from processing.
    await init_signal_worker()
    logger.info("signal_worker_initialized")

    # BUG-005 fix (2026-04-11): at startup, scan Redis for tokens with
    # state=ready that were never sent to Telegram. Once the 10-minute dedup
    # TTL expires there is no other resend path, so they would stay in "ready"
    # with nobody reviewing them.
    try:
        from src.services.decision_manager import get_decision_manager

        _spawn(get_decision_manager().resend_stale_ready_tokens(), name="stale_ready_tokens_resend")
        logger.info("stale_ready_tokens_resend_scheduled")
    except Exception as e:
        logger.warning("stale_ready_tokens_resend_schedule_failed", error=str(e))

    # 2026-04-16: automatic AI-analysis sweeper (every 90 seconds).
    # Closes the gap where the Signal Worker creates an incident but nothing
    # triggers AI analysis unless someone calls GET /api/v1/incidents. The
    # sweeper scans INVESTIGATING incidents without a decision token and
    # triggers the analysis in the background.
    try:
        from src.jobs.incident_analysis_sweeper import run_incident_analysis_sweeper

        _spawn(run_incident_analysis_sweeper(), name="incident_analysis_sweeper")
        logger.info("incident_analysis_sweeper_scheduled", interval_sec=90)
    except Exception as e:
        logger.warning("incident_analysis_sweeper_schedule_failed", error=str(e))

    # ADR-090 asset inventory cron (2026-04-19): hourly scan of K8s pods,
    # writing asset_inventory, asset_discovery_run and 7-dimension coverage.
    try:
        from src.jobs.asset_scanner_job import run_asset_scanner_loop

        _spawn(run_asset_scanner_loop(), name="asset_scanner_loop")
        logger.info("asset_scanner_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("asset_scanner_loop_schedule_failed", error=str(e))

    # ADR-090 rule catalog sync (2026-04-19): hourly pull of active rules from
    # Prometheus /api/v1/rules, upserted into alert_rule_catalog. Unlocks E3
    # Hermes rule proposals, which need alert_rule_catalog as a baseline.
    try:
        from src.jobs.rule_catalog_sync_job import run_rule_catalog_sync_loop

        _spawn(run_rule_catalog_sync_loop(), name="rule_catalog_sync_loop")
        logger.info("rule_catalog_sync_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e))

    # ADR-090 Phase 4 NemoTron capacity inspection MVP (2026-04-19): daily at
    # 02:00 Taipei, reads Prometheus node_exporter and writes
    # host_capacity_snapshot plus violations.
    try:
        from src.jobs.capacity_scanner_job import run_capacity_scanner_loop

        _spawn(run_capacity_scanner_loop(), name="capacity_scanner_loop")
        logger.info("capacity_scanner_loop_scheduled", daily_trigger_hour_taipei=2)
    except Exception as e:
        logger.warning("capacity_scanner_loop_schedule_failed", error=str(e))

    # ADR-090 compliance scan MVP (2026-04-19): daily at 03:00 Taipei, walks
    # asset_inventory and writes the 7-dimension asset_compliance_snapshot.
    # MVP: secret_rotated is a real check; the other 6 dimensions are
    # "unknown" placeholders to be filled in later.
    try:
        from src.jobs.compliance_scanner_job import run_compliance_scanner_loop

        _spawn(run_compliance_scanner_loop(), name="compliance_scanner_loop")
        logger.info("compliance_scanner_loop_scheduled", daily_trigger_hour_taipei=3)
    except Exception as e:
        logger.warning("compliance_scanner_loop_schedule_failed", error=str(e))

    # aider-watch v2 processor (2026-04-20, ADR-091): consumes the
    # signals:aider:events stream, creates incidents and writes aider_events.
    try:
        from src.workers.aider_event_processor import run_aider_event_processor_loop

        logger.info("aider_event_processor: starting Redis stream consumer")
        _spawn(run_aider_event_processor_loop(), name="aider_event_processor_loop")
    except Exception as e:
        logger.warning("aider_event_processor_schedule_failed", error=str(e))

    # ADR-090 coverage evaluator (2026-04-19): hourly upgrade of
    # asset_coverage_snapshot from "unknown" to green/yellow/red, based on
    # Prometheus targets, alert_rule_catalog labels and knowledge_entries.
    try:
        from src.jobs.coverage_evaluator_job import run_coverage_evaluator_loop

        _spawn(run_coverage_evaluator_loop(), name="coverage_evaluator_loop")
        logger.info("coverage_evaluator_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("coverage_evaluator_loop_schedule_failed", error=str(e))

    # ADR-090 rule stats updater (2026-04-19): hourly rule statistics computed
    # from incidents and approval_records. Unlocks E3 Hermes: rules with
    # noise_rate > 0.5 can be proposed for deprecation by the AI.
    try:
        from src.jobs.rule_stats_updater_job import run_rule_stats_updater_loop

        _spawn(run_rule_stats_updater_loop(), name="rule_stats_updater_loop")
        logger.info("rule_stats_updater_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("rule_stats_updater_loop_schedule_failed", error=str(e))

    # ADR-090 asset change tracker (2026-04-19): hourly diff of the two most
    # recent asset_discovery_run rows, written to asset_change_event
    # (added / removed / lifecycle_changed).
    try:
        from src.jobs.asset_change_tracker_job import run_asset_change_tracker_loop

        _spawn(run_asset_change_tracker_loop(), name="asset_change_tracker_loop")
        logger.info("asset_change_tracker_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("asset_change_tracker_loop_schedule_failed", error=str(e))

    # ADR-090 Hermes rule quality advisor (2026-04-19): daily at 04:00 Taipei,
    # analyses alert_rule_catalog.noise_rate and pushes Telegram suggestions
    # for noisy rules. Iron law: the AI only suggests; it never changes
    # review_status itself - deprecation is a human decision.
    try:
        from src.jobs.hermes_rule_quality_job import run_hermes_rule_quality_loop

        _spawn(run_hermes_rule_quality_loop(), name="hermes_rule_quality_loop")
        logger.info("hermes_rule_quality_loop_scheduled", daily_trigger_hour_taipei=4)
    except Exception as e:
        logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e))

    # ADR-090 Phase 4 capacity forecaster (2026-04-19): daily at 05:00 Taipei,
    # uses Prometheus predict_linear to forecast disk/mem/cpu saturation over
    # the next 7 days; high-risk hosts get an aol(capacity_recommendation)
    # entry and a Telegram suggestion.
    try:
        from src.jobs.capacity_forecaster_job import run_capacity_forecaster_loop

        _spawn(run_capacity_forecaster_loop(), name="capacity_forecaster_loop")
        logger.info("capacity_forecaster_loop_scheduled", daily_trigger_hour_taipei=5)
    except Exception as e:
        logger.warning("capacity_forecaster_loop_schedule_failed", error=str(e))

    # ADR-076 Task 4: automatic daily inspection report at 08:00 Taipei time.
    try:
        from src.services.report_generation_service import run_daily_report_loop

        _spawn(run_daily_report_loop(), name="daily_report_loop")
        logger.info("daily_report_loop_scheduled", trigger_hour_taipei=8)
    except Exception as e:
        logger.warning("daily_report_loop_schedule_failed", error=str(e))

    # ADR-073 P2 fix (2026-04-15): hourly auto-close of overdue approvals, so a
    # PENDING approval older than 48h triggers resolve_incident and closes the
    # KM learning loop.
    try:
        from src.jobs.approval_timeout_resolver import run_approval_timeout_resolver

        _spawn(run_approval_timeout_resolver(), name="approval_timeout_resolver")
        logger.info("approval_timeout_resolver_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("approval_timeout_resolver_schedule_failed", error=str(e))

    # ADR-083 Phase 3: Evolver agent (daily) - automatic playbook merging and
    # archiving of low-trust playbooks.
    try:
        from src.services.playbook_evolver import run_evolver_loop

        _spawn(run_evolver_loop(), name="evolver_loop")
        logger.info("evolver_loop_scheduled", interval_sec=86400)
    except Exception as e:
        logger.warning("evolver_loop_schedule_failed", error=str(e))

    # ADR-104 T2: LLM playbook DRAFT governance (hourly per the original note;
    # the logged interval comes from settings).
    try:
        from src.jobs.playbook_generation_governance_job import run_playbook_generation_governance_loop

        _spawn(
            run_playbook_generation_governance_loop(),
            name="playbook_generation_governance_loop",
        )
        logger.info(
            "playbook_generation_governance_loop_scheduled",
            interval_sec=settings.PLAYBOOK_DRAFT_GOVERNANCE_INTERVAL_SECONDS,
        )
    except Exception as e:
        logger.warning("playbook_generation_governance_loop_schedule_failed", error=str(e))

    # ADR-083 Phase 3: knowledge decay job (daily) - KB entries not referenced
    # for 30 days are marked archived.
    try:
        from src.jobs.knowledge_decay_job import run_knowledge_decay_loop

        _spawn(run_knowledge_decay_loop(), name="knowledge_decay_loop")
        logger.info("knowledge_decay_loop_scheduled", interval_sec=86400)
    except Exception as e:
        logger.warning("knowledge_decay_loop_schedule_failed", error=str(e))

    # C1 P1-1 (2026-04-28): KM backfill reconciler (every 5 minutes). Repairs
    # Path A related_approval_id backfills that _backfill_path_a_approval
    # failed to write and pushed to km:backfill:dlq.
    # Feature flag: ENABLE_KM_BACKFILL_RECONCILER=false disables it (rollback).
    try:
        from src.jobs.km_backfill_reconciler_job import run_km_backfill_reconciler_loop

        _spawn(run_km_backfill_reconciler_loop(), name="km_backfill_reconciler_loop")
        logger.info("km_backfill_reconciler_loop_scheduled", interval_sec=300)
    except Exception as e:
        logger.warning("km_backfill_reconciler_loop_schedule_failed", error=str(e))

    # W2 PR-R2 (2026-04-28): AOL -> alert_rule_catalog EWMA writeback (hourly).
    # Flywheel break C2 fix: automation_operation_log results are fed back
    # into alert_rule_catalog.confidence.
    # Feature flag: ENABLE_AOL_WRITEBACK_JOB=false by default (enable after
    # manual verification). ADR-091 Task T2.
    try:
        from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_loop

        _spawn(run_aol_writeback_loop(), name="aol_to_catalog_writeback_loop")
        logger.info("aol_to_catalog_writeback_loop_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("aol_to_catalog_writeback_loop_schedule_failed", error=str(e))

    # ADR-087 Phase 6: monthly KB rot cleanup - day 1 of each month at 03:00
    # Taipei time. Scans knowledge_entries for rotten entries (deprecated K8s
    # API / Prometheus patterns / not referenced for 180 days).
    try:
        from src.utils.timezone import now_taipei

        async def _run_kb_rot_cleaner_loop() -> None:
            from src.jobs.kb_rot_cleaner import get_kb_rot_cleaner

            while True:
                try:
                    now = now_taipei()
                    # Next run: 03:00 on day 1. Use today when it is day 1 and
                    # still before 03:00; otherwise the first day of next month.
                    if now.day == 1 and now.hour < 3:
                        next_run = now.replace(hour=3, minute=0, second=0, microsecond=0)
                    elif now.month == 12:
                        next_run = now.replace(
                            year=now.year + 1, month=1, day=1,
                            hour=3, minute=0, second=0, microsecond=0,
                        )
                    else:
                        next_run = now.replace(
                            month=now.month + 1, day=1,
                            hour=3, minute=0, second=0, microsecond=0,
                        )
                    sleep_sec = (next_run - now).total_seconds()
                    logger.info("kb_rot_cleaner_next_run", next_run=next_run.isoformat(), sleep_sec=int(sleep_sec))
                    await asyncio.sleep(sleep_sec)
                    try:
                        result = await get_kb_rot_cleaner().run()
                        logger.info("kb_rot_cleaner_completed", stale_count=result.stale_count, total=result.total_scanned)
                    except Exception as run_err:
                        logger.exception("kb_rot_cleaner_failed", error=str(run_err))
                except asyncio.CancelledError:
                    break
                except Exception as loop_err:
                    logger.exception("kb_rot_cleaner_loop_error", error=str(loop_err))
                    await asyncio.sleep(3600)  # retry in 1 hour

        _spawn(_run_kb_rot_cleaner_loop(), name="kb_rot_cleaner_loop")
        logger.info("kb_rot_cleaner_loop_scheduled", trigger="monthly_day1_03h_taipei")
    except Exception as e:
        logger.warning("kb_rot_cleaner_loop_schedule_failed", error=str(e))

    # ADR-083 Phase 3: weekly fine-tune JSONL export
    # (EvidenceSnapshot x AgentSession -> JSONL).
    try:
        from src.services.finetune_exporter import run_finetune_export_loop

        _spawn(run_finetune_export_loop(), name="finetune_export_loop")
        logger.info("finetune_export_loop_scheduled", interval_sec=604800)
    except Exception as e:
        logger.warning("finetune_export_loop_schedule_failed", error=str(e))

    # Phase 4 ADR-084: proactive inspection every 5 minutes, coordinating
    # DynamicBaselineService, LogAnomalyDetector and TrendPredictor.
    # Shadow mode: with AIOPS_P4_SHADOW_MODE=True it only records and does not
    # raise alerts.
    try:
        from src.services.proactive_inspector import run_proactive_inspector_loop

        _spawn(run_proactive_inspector_loop(), name="proactive_inspector_loop")
        logger.info("proactive_inspector_loop_scheduled", interval_sec=300)
    except Exception as e:
        logger.warning("proactive_inspector_schedule_failed", error=str(e))

    # ADR-087 Phase 6: offline replay (every 7 days) - decision consistency
    # baseline.
    try:
        from src.jobs.offline_replay_service import run_offline_replay_loop

        _spawn(run_offline_replay_loop(), name="offline_replay_loop")
        logger.info("offline_replay_loop_scheduled", interval_sec=604800)
    except Exception as e:
        logger.warning("offline_replay_loop_schedule_failed", error=str(e))

    # ADR-092 (2026-04-20): AI SLO watchdog - self health check every 15
    # minutes. MASTER 1.1: the system must be able to sense its own failures
    # (W-1 SLO, W-2 Telegram silence, W-3 flywheel success rate).
    try:
        from src.jobs.ai_slo_watchdog_job import run_ai_slo_watchdog_loop

        _spawn(run_ai_slo_watchdog_loop(), name="ai_slo_watchdog_loop")
        logger.info("ai_slo_watchdog_scheduled", interval_sec=900)
    except Exception as e:
        logger.warning("ai_slo_watchdog_schedule_failed", error=str(e))

    # 2026-04-26 P2.2: GovernanceAgent, 4 self-checks (hourly): trust_drift,
    # knowledge_degradation, llm_hallucination, execution_blast_radius.
    try:
        from src.services.governance_agent import run_governance_loop

        _spawn(run_governance_loop(), name="governance_loop")
        logger.info("governance_agent_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("governance_agent_schedule_failed", error=str(e))

    # 2026-05-03: GovernanceDispatcher Wave 2E (polls every 30 s).
    try:
        from src.services.governance_dispatcher import run_governance_dispatcher_loop

        _spawn(run_governance_dispatcher_loop(), name="governance_dispatcher_loop")
        logger.info("governance_dispatcher_scheduled", interval_sec=30)
    except Exception as e:
        logger.warning("governance_dispatcher_schedule_failed", error=str(e))

    # 2026-04-25 P1.2: Ollama failover wired into ai_router + lifespan.
    # OllamaFailoverManager + OllamaAutoRecoveryService:
    #   failover switch -> recovery_callback -> set_current_primary -> Redis
    #   recovery service checks every 30 s -> 3 consecutive HEALTHY results for
    #   111 -> automatic switch back -> clear_cache
    # Order: get singletons -> wire callback -> start the recovery service, so
    # that it can receive the callback.
    try:
        from src.services.ollama_failover_manager import get_ollama_failover_manager
        from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service

        _failover_mgr = get_ollama_failover_manager()
        _recovery_svc = get_ollama_auto_recovery_service()
        # Wire the callback: a failover switch tells the recovery service to
        # update current_primary.
        _failover_mgr.set_recovery_callback(_recovery_svc.set_current_primary)

        # 2026-04-26 critic-H3 hotfix: the alerter must be configured BEFORE
        # the recovery service starts. Previously it was configured after
        # start(), so an alert raised by the bootstrap immediate-check ran
        # without Redis dedup (fail-open, risk of duplicate alerts). The Redis
        # pool is already available at this point of the lifespan.
        try:
            from src.services.failover_alerter import configure_alerter
            from src.core.redis_client import get_redis

            configure_alerter(get_redis())
            logger.info("failover_alerter_configured")
        except Exception as _alerter_err:
            logger.warning("failover_alerter_configure_failed", error=str(_alerter_err))

        # Start the recovery service (bootstraps current_primary from Redis and
        # starts background monitoring).
        await _recovery_svc.start()
        logger.info("ollama_failover_system_started")
    except Exception as e:
        logger.warning("ollama_failover_system_start_failed", error=str(e))

    # 2026-04-27 P3.2.2: AI provider version tracking (hourly). Probes the
    # versions of 5 providers (ollama / ollama_local / gemini / claude /
    # openclaw_nemo) and writes ai_provider_version_history.
    try:
        async def _run_model_version_tracker_loop() -> None:
            from src.services.model_version_tracker import get_model_version_tracker

            tracker = get_model_version_tracker()
            while True:
                try:
                    await asyncio.sleep(3600)  # every hour
                    result = await tracker.run_probe_cycle()
                    logger.info(
                        "model_version_probe_cycle_done",
                        probed=result["probed"],
                        changed=result["changed"],
                    )
                except asyncio.CancelledError:
                    break
                except Exception as _loop_err:
                    logger.exception("model_version_tracker_loop_error", error=str(_loop_err))
                    await asyncio.sleep(60)  # retry 1 minute after an error

        _spawn(_run_model_version_tracker_loop(), name="model_version_tracker_loop")
        logger.info("model_version_tracker_scheduled", interval_sec=3600)
    except Exception as e:
        logger.warning("model_version_tracker_schedule_failed", error=str(e))

    # AwoooP Phase 4 (2026-05-04): Platform Worker (shadow-mode shell).
    # ADR-106 Strangler Fig Phase 4: SKIP LOCKED run worker + stale run reaper.
    # Shadow mode: is_shadow=True, no user-visible response, no destructive
    # tool call.
    try:
        from src.workers.platform_worker import start_platform_worker

        await start_platform_worker()
        logger.info("platform_worker_started", mode="shadow")
    except Exception as e:
        logger.warning("platform_worker_start_failed", error=str(e))

    yield

    # Shutdown
    # 2026-04-25 P1.2: gracefully stop the Ollama failover background monitor.
    # It must stop before the Redis pool closes because the recovery service
    # may still be writing to Redis.
    try:
        from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service

        await get_ollama_auto_recovery_service().stop()
        logger.info("ollama_failover_system_stopped")
    except Exception as e:
        logger.warning("ollama_failover_system_stop_failed", error=str(e))

    # 2026-04-27 Wave8-X3: B25/B26 drain fix. During a K8s rolling restart,
    # wait for auto_repair fire-and-forget tasks to finish before closing, so
    # _verify_and_learn / runbook_generator writes are not cancelled by SIGTERM.
    try:
        from src.services.auto_repair_service import get_auto_repair_service

        _repair_svc = get_auto_repair_service()
        if hasattr(_repair_svc, "drain_pending_tasks"):
            _drain_result = await _repair_svc.drain_pending_tasks(timeout=60.0)
            logger.info("auto_repair_drain_complete", **_drain_result)
    except Exception as e:
        logger.warning("auto_repair_drain_failed", error=str(e))

    # AwoooP Phase 4: graceful Platform Worker shutdown (2026-05-04 ogt).
    try:
        from src.workers.platform_worker import stop_platform_worker

        await stop_platform_worker()
        logger.info("platform_worker_stopped")
    except Exception as e:
        logger.warning("platform_worker_stop_failed", error=str(e))

    # Cancel the background loops spawned at startup while Redis / DB / HTTP
    # pools are still open, instead of letting them run against closed
    # resources. The wait is bounded so a misbehaving task cannot hang shutdown.
    pending_tasks = [task for task in background_tasks if not task.done()]
    for task in pending_tasks:
        task.cancel()
    if pending_tasks:
        _, still_running = await asyncio.wait(
            pending_tasks, timeout=background_task_shutdown_timeout_sec
        )
        logger.info(
            "background_tasks_cancelled",
            cancelled=len(pending_tasks),
            still_running=len(still_running),
        )

    # Phase 6.1: close the Signal Worker (consumer goes first).
    await close_signal_worker()
    await publisher.stop()
    await close_executor()
    await close_openclaw()

    # Phase 5.4: close the Telegram gateway.
    telegram_gw = get_telegram_gateway()
    await telegram_gw.close()

    # Phase 33: close the RAG service httpx client (ADR-067).
    from src.services.knowledge_rag_service import get_knowledge_rag_service

    await get_knowledge_rag_service().close()

    # Phase 5: close HTTP clients (iron law: release connection pools).
    await close_all_http_clients()

    # Phase 6.1.1: close the Redis pool (iron law: release the Redis pool).
    await close_redis_pool()
    await close_db()
    shutdown_telemetry()
    logger.info("api_shutdown", version=settings.VERSION)
# =============================================================================
# FastAPI Application
# =============================================================================
# The ASGI application object. Swagger UI, ReDoc and the OpenAPI schema are all
# served under /api/v1, the reported version comes from settings.VERSION, and
# startup/shutdown work is delegated to the `lifespan` callable defined above.
app = FastAPI(
title="AWOOOI API",
description="AWOOOI 智能運維平台 API - 由 leWOOOgo Engine 驅動",
version=settings.VERSION,
docs_url="/api/v1/docs",
redoc_url="/api/v1/redoc",
openapi_url="/api/v1/openapi.json",
lifespan=lifespan,
)
# =============================================================================
# OpenTelemetry Instrumentation (observability iron law)
# Must be initialised before the middleware is registered so traces are complete.
# Graceful degradation: a failed initialisation does not stop the API from starting.
# =============================================================================
otel_enabled = setup_telemetry(app)
if not otel_enabled:
    # Telemetry is optional: record the fact and carry on without it.
    logger.warning("otel_disabled", reason="initialization failed or disabled")
else:
    logger.info(
        "otel_initialized",
        service=settings.OTEL_SERVICE_NAME,
        endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT,
    )
# =============================================================================
# Middleware
# =============================================================================
# 2026-04-03 ogt: Nginx reverse-proxy fix — make FastAPI trust X-Forwarded-Proto
# Problem: /api/v1/knowledge (no trailing slash) 307 redirect produced an http:// Location
# Cause: FastAPI did not know it was behind HTTPS, so it redirected back to http://
# Effect: with this middleware in place the 307 Location is https://
# NOTE(review): trusted_hosts="*" accepts forwarded headers from any peer; this
# presumably relies on the API being reachable only through the proxy — confirm.
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# CORS - Strict Whitelist (Iron Law #2)
# NO wildcards, NO UAT
# Allowed origins come from settings.CORS_ORIGINS. Credentials are allowed, only
# the listed methods and request headers are accepted, and X-Request-ID is
# exposed so browser clients can read it from responses.
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
expose_headers=["X-Request-ID"],
)
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    """
    Structured request logging middleware.

    Logs every request with:
    - Request ID (taken from the X-Request-ID header, or generated when the
      header is missing or empty)
    - HTTP method and path
    - Response status code
    - Request duration

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the response.

    Returns:
        The downstream response with its ``X-Request-ID`` header set to the
        request ID used in the logs.

    Raises:
        Exception: Anything raised by ``call_next`` is re-raised unchanged
            after a ``request_failed`` line with the elapsed time is logged.
    """
    import time
    import uuid

    # Reuse the caller's ID so logs correlate across services; otherwise mint
    # one, so a request is never logged under a shared placeholder value.
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start_time = time.perf_counter()

    # Bind request context for all logs in this request. Clearing first keeps
    # values bound by an earlier request from leaking into this one.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )
    log = get_logger("awoooi.http")
    log.debug("request_start")

    try:
        response = await call_next(request)
    except Exception:
        # Without this, a request that fails downstream would leave no
        # duration record at all. The exception itself is not handled here.
        log.warning(
            "request_failed",
            duration_ms=round((time.perf_counter() - start_time) * 1000, 2),
        )
        raise

    duration_ms = (time.perf_counter() - start_time) * 1000
    log.info(
        "request_complete",
        status_code=response.status_code,
        duration_ms=round(duration_ms, 2),
    )
    # Add request ID to response headers
    response.headers["X-Request-ID"] = request_id
    return response
# =============================================================================
# Exception Handlers
# =============================================================================
@app.exception_handler(Exception)
async def global_exception_handler(_request: Request, exc: Exception) -> JSONResponse:
    """
    Last-resort handler for exceptions that nothing else handled.

    The exception is reported to Sentry and written to the structured error
    log with its type and message. The client only ever receives a generic
    HTTP 500 payload, so no internal detail is exposed.

    Args:
        _request: The request being processed (unused).
        exc: The unhandled exception.

    Returns:
        A JSON response with status 500 and a fixed error body.
    """
    # Explicit Sentry report; per the original note the SDK forwards it to the
    # self-hosted server when it has been initialised.
    sentry_sdk.capture_exception(exc)

    error_log = get_logger("awoooi.error")
    error_log.exception(
        "unhandled_exception",
        exc_type=type(exc).__name__,
        exc_message=str(exc),
    )

    safe_body = {
        "code": "INTERNAL_ERROR",
        "message": "An internal error occurred",
    }
    return JSONResponse(status_code=500, content=safe_body)
# =============================================================================
# API Routers - Path-based routing (/api/v1/*)
# =============================================================================
# Routers are registered from a table of (router, prefix, tags) rows, in the
# same order as before, so route matching precedence is unchanged.
for _mounted_router, _mount_prefix, _openapi_tags in (
    # New v1 API routes
    (health_v1.router, "/api/v1", ["Health"]),
    (csrf_v1.router, "/api/v1", ["Security"]),  # Phase 20
    (dashboard_v1.router, "/api/v1", ["Dashboard"]),
    (approvals_v1.router, "/api/v1", ["HITL Approvals"]),
    (ai_v1.router, "/api/v1", ["AI Decision"]),
    (ai_governance_v1.router, "/api/v1", ["AI Governance"]),  # 2026-05-02: /governance page
    (ai_slo_v1.router, "/api/v1", ["AI SLO"]),  # Phase 6 ADR-087
    (aiops_kpi_v1.router, "/api/v1", ["AIOps KPI"]),  # ADR-090 § Phase 7 Dashboard
    (aiops_timeline_v1.router, "/api/v1", ["AIOps Timeline"]),  # 2026-04-27 Wave8-X3 B4
    (webhooks_v1.router, "/api/v1", ["Webhooks"]),
    (timeline_v1.router, "/api/v1", ["Timeline"]),
    (audit_logs_v1.router, "/api/v1", ["Audit Logs"]),
    # 2026-04-09 Claude Sonnet 4.6: alert_operation_log query API (Sprint 5.2)
    (alert_operation_logs_v1.router, "/api/v1", ["Alert Operation Logs"]),
    (aider_events_v1.router, "/api/v1", ["Aider Watch"]),  # aider-watch v2 ADR-091
    (telegram_v1.router, "/api/v1", ["Telegram Gateway"]),  # Phase 5.4
    # ADR-094: Webhook entry point (reserved for WS4 Hermes NL)
    (telegram_webhook_v1.router, "/api/v1", ["Telegram Webhook"]),
    (metrics_v1.router, "/api/v1", ["Gold Metrics"]),  # Phase 7: real ("true bloodline") metrics
    (incidents_v1.router, "/api/v1", ["Incidents"]),  # Phase 6.4: Decision Proposal
    (proposals_v1.router, "/api/v1", ["Proposals"]),  # Phase 6.4h: Proposals CRUD
    (agents_v1.router, "/api/v1", ["Agent Teams"]),  # Phase 9.5: Agent Teams
    (stats_v1.router, "/api/v1", ["Statistics"]),  # Phase 6.5: Statistics Analytics
    (monitoring_v1.router, "/api/v1", ["Monitoring"]),  # 2026-04-03: monitoring tool status
    (gitea_webhook_v1.router, "/api/v1", ["Gitea Webhook"]),  # ADR-059: Gitea → OpenClaw
    (playbooks_v1.router, "/api/v1", ["Playbooks"]),  # #7: Playbook extraction
    (auto_repair_v1.router, "/api/v1", ["Auto Repair"]),  # #8: automatic escalation decisions
    (drift_v1.router, "/api/v1", ["Drift Detection"]),  # Phase 25 P2: Config Drift Detection
    (rag_v1.router, "/api/v1", ["RAG Knowledge Base"]),  # Phase 33 ADR-067: RAG knowledge base
    (errors_v1.router, "/api/v1", ["Errors"]),  # #40: Sentry errors BFF API
    (sentry_webhook_v1.router, "/api/v1", ["Sentry Webhook"]),  # Phase 10.2.1: Sentry → Telegram
    (signoz_webhook_v1.router, "/api/v1", ["SignOz Webhook"]),  # Phase 21: SignOz → Telegram (ADR-037)
    (notifications_v1.router, "/api/v1", ["Notifications"]),  # 2026-04-10: notification channel status
    (terminal_v1.router, "/api/v1", ["Omni-Terminal"]),  # Phase 19.1: Omni-Terminal SSE
    (learning_v1.router, "/api/v1", ["Learning"]),  # Phase D-G P0: learning system API
    (knowledge_v1.router, "/api/v1", ["Knowledge Base"]),  # KB Phase 1: Knowledge Base API
    # Phase 6.4g: lewooogo-brain (legacy); mounted without an extra prefix
    (proposals_router.router, "", ["Proposals (Legacy)"]),
    # Legacy routes (to be migrated to api/v1/)
    (plugins.router, "/api/v1/plugins", ["Plugins"]),
    (pipelines.router, "/api/v1/pipelines", ["Pipelines"]),
    (agent.router, "/api/v1/agent", ["Agent"]),
    (notifications.router, "/api/v1/notifications", ["Notifications"]),
    # AwoooP Phase 4 (2026-05-04 ogt): Platform Shell — Shadow Mode Run API
    (platform_v1.router, "/api/v1/platform", ["AwoooP Platform"]),
):
    app.include_router(_mounted_router, prefix=_mount_prefix, tags=_openapi_tags)
# Loop variables are scaffolding only; do not leave them in the module namespace.
del _mounted_router, _mount_prefix, _openapi_tags
# =============================================================================
# Prometheus Metrics Endpoint
# =============================================================================
# 2026-03-31 ogt: expose Prometheus metrics for the alerting system
@app.get("/metrics", include_in_schema=False)
async def prometheus_metrics() -> Response:
    """Prometheus metrics endpoint for alerting.

    Starts from the ``prometheus_client`` output of ``generate_latest()`` and
    appends two extra sections: the flywheel metrics and the ADR-100 SLO
    metrics. Each extra section is best-effort: if it fails, a warning with
    the error text is logged and the section is left out, so the rest of the
    scrape still succeeds.

    Returns:
        A response carrying the combined metrics text with the Prometheus
        content type.
    """

    def _newline_terminated(section: str) -> str:
        # Sections are concatenated, so a section that does not end in a
        # newline would fuse its last sample with the next section's first line.
        if not section or section.endswith("\n"):
            return section
        return section + "\n"

    content = _newline_terminated(generate_latest().decode("utf-8"))

    # 2026-05-07 ogt + Claude Sonnet 4.6 — fix for INC-20260507-99ADF2
    # The flywheel metrics (awoooi_flywheel_*) used to be exposed only at
    # /api/v1/stats/flywheel/metrics, so the existing awoooi-api scrape of
    # /metrics never saw them and FlywheelExecutionRateMissing fired permanently.
    # Appending them here lets the existing scrape job pick them up without a new job.
    try:
        flywheel_metrics = await get_flywheel_stats_service().compute()
        content += _newline_terminated(flywheel_metrics.to_prometheus_lines())
    except Exception as exc:
        # Include the error text, as the ADR-100 branch does, so a missing
        # section can be diagnosed from the log.
        logger.warning("prometheus_metrics_flywheel_error", error=str(exc))

    # 2026-05-14 Codex — T18 ADR-100 SLO emitter
    # GovernanceAgent reads Prometheus recording rules; if /metrics does not emit
    # the underlying DB totals, the sli:* rules are all empty and
    # governance_slo_data_gap is re-sent every hour.
    try:
        slo_lines = await get_adr100_slo_metrics_service().to_prometheus_lines()
        content += _newline_terminated(slo_lines)
    except Exception as exc:
        logger.warning("prometheus_metrics_adr100_slo_error", error=str(exc))

    return Response(content=content, media_type=CONTENT_TYPE_LATEST)
# =============================================================================
# Root Endpoint
# =============================================================================
@app.get("/", include_in_schema=False)
async def root() -> dict:
    """Return basic API info: name, version, environment and key entry paths."""
    # Static entry-point paths, kept apart from the values read from settings.
    entry_points = {
        "docs": "/api/v1/docs",
        "health": "/api/v1/health",
        "dashboard": "/api/v1/dashboard",
        "stream": "/api/v1/dashboard/stream",
    }
    return {
        "name": "AWOOOI API",
        "version": settings.VERSION,
        "environment": settings.ENVIRONMENT,
        **entry_points,
    }
# =============================================================================
# Entry Point
# =============================================================================
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    _server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        # Auto-reload follows the DEBUG setting.
        "reload": settings.DEBUG,
        "log_level": settings.LOG_LEVEL.lower(),
    }
    uvicorn.run("src.main:app", **_server_options)