feat(adr-076): 戰術 B 四大 Task 全部完成 — 告警聚合+重試+自動報告
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 17m34s

Task 2: AlertGroupingService — Redis 5分鐘滑動視窗,防告警風暴
- apps/api/src/services/alert_grouping_service.py (新增)
- webhooks.py 整合:指紋生成後/LLM前短路子告警
- Threshold=3,Graceful Degradation,16 tests

Task 3: approval_execution.py 執行失敗重試
- MAX_RETRY=2, RETRY_DELAY_SECONDS=30
- _is_transient_error() 瞬態/永久分類,永久錯誤不重試
- Timeline 記錄重試進度,成功後標注重試次數,29 tests

Task 4: report_generation_service.py 自動報告
- 日度巡檢報告:每日 08:00 台北時間,Telegram SRE 群組推送
- Postmortem:Incident resolved + duration > 10 分鐘自動觸發
- main.py lifespan 掛載 run_daily_report_loop(),30 tests

測試: 600 → 675 通過 (+75),0 failed

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-14 14:39:14 +08:00
parent c0ba1000f3
commit 684d6cfb43
9 changed files with 1591 additions and 5 deletions

View File

@@ -73,6 +73,9 @@ from src.services.telegram_gateway import TelegramGatewayError, get_telegram_gat
# Phase 18.1.7: K8s 資源名稱正規化 已移至 alert_analyzer_service (R4 #129)
from src.utils.timezone import now_taipei
# ADR-076: 告警聚合引擎 (2026-04-14 Claude Haiku 4.5 Asia/Taipei)
from src.services.alert_grouping_service import get_alert_grouping_service
router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
logger = get_logger("awoooi.webhooks")
@@ -1258,6 +1261,36 @@ async def alertmanager_webhook(
fingerprint=fingerprint,
)
# ==========================================================================
# ADR-076: 告警聚合引擎 — 5 分鐘滑動視窗,防止告警風暴
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
# 位置指紋生成後、LLM 分析前(短路子告警)
# ==========================================================================
grouping_result = await get_alert_grouping_service().evaluate(
alertname=alertname,
namespace=namespace,
fingerprint=fingerprint,
)
if grouping_result.is_grouped:
logger.info(
"alertmanager_grouped_skip",
alert_id=alert_id,
group_key=grouping_result.group_key,
count=grouping_result.count,
parent_fingerprint=grouping_result.parent_fingerprint,
reason="Alert storm suppressed — child alert within 5-min window",
)
return AlertResponse(
success=True,
message=(
f"🛡️ 告警聚合 (x{grouping_result.count}) — "
f"同分組 5 分鐘內第 {grouping_result.count} 個告警,已合併為父告警"
),
alert_id=alert_id,
approval_created=False,
converged=True,
)
try:
service = get_approval_service()

View File

@@ -316,6 +316,15 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("stale_ready_tokens_resend_schedule_failed", error=str(e))
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try:
from src.services.report_generation_service import run_daily_report_loop
asyncio.create_task(run_daily_report_loop())
logger.info("daily_report_loop_scheduled", trigger_hour_taipei=8)
except Exception as e:
logger.warning("daily_report_loop_schedule_failed", error=str(e))
yield
# Shutdown

View File

@@ -0,0 +1,271 @@
"""
告警聚合引擎 (Alert Grouping Engine)
=====================================
ADR-076: 告警風暴防禦 — 滑動視窗聚合
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
目標:
- 防止告警風暴:同一 namespace/alertname 在 5 分鐘內爆出多個告警 → 聚合為 Parent Alert
- 節省 LLM token 費用
- 避免 Telegram 被洗版
設計原則:
- Redis Sorted Set 滑動視窗(同 anomaly_counter.py ADR-037 模式)
- 遵循 leWOOOgo 積木化鐵律
- 只用 Redis不直接存取 DB
- Graceful DegradationRedis 失敗不阻斷主流程
- 統帥設定 THRESHOLD=35 分鐘內 3 個以上才聚合)
Redis Key 設計:
- alert_group:{group_key}:count — Sorted Set (timestamp → timestamp)
- alert_group:{group_key}:meta — Hash (parent_fingerprint, first_seen, count)
TTL: 10 分鐘(略長於 5 分鐘視窗)
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
import redis.asyncio as redis
logger = structlog.get_logger(__name__)
# =============================================================================
# Data Types
# =============================================================================
@dataclass
class GroupingResult:
"""聚合評估結果"""
is_grouped: bool
"""是否已被聚合True = 此告警是子告警,應跳過 LLM"""
group_key: str
"""聚合分組 key"""
count: int
"""目前視窗內的告警數量"""
parent_fingerprint: str | None
"""父告警的指紋(第一個進來的告警)"""
is_parent: bool
"""是否為父告警(第一個進來觸發聚合的那個)"""
# =============================================================================
# AlertGroupingService
# =============================================================================
class AlertGroupingService:
"""
告警聚合引擎
統帥指令 (2026-04-14):
- "防禦告警風暴:同一 namespace/deployment 在 5 分鐘內炸出 10 個相同告警 → 搓合成 1 個 Parent Alert"
- "大幅節省 LLM Token 費用,避免 Telegram 被洗版"
滑動視窗設計(同 anomaly_counter.py ADR-037:
- ZADD alert_group:{key}:window {ts} {ts}
- ZCOUNT alert_group:{key}:window {cutoff} +inf
- ZREMRANGEBYSCORE alert_group:{key}:window -inf {cutoff}
"""
# 5 分鐘滑動視窗
WINDOW_SECONDS: int = 300
# 觸發聚合的閾值(同一分組 5 分鐘內超過此數量才聚合)
GROUP_THRESHOLD: int = 3
# Redis Key 前綴
PREFIX_WINDOW = "alert_group:window:"
PREFIX_META = "alert_group:meta:"
# TTL視窗 + 5 分鐘緩衝)
TTL_SECONDS: int = 600
def __init__(self, redis_client: redis.Redis) -> None:
self.redis = redis_client
@staticmethod
def build_group_key(alertname: str, namespace: str) -> str:
"""
從 alertname + namespace 建構聚合分組 key
分組邏輯:取 alertname 的前綴(去掉數字後綴)+ namespace
PodCrashLoopBackOff-pod-1 + awoooi-prod → PodCrashLoopBackOff:awoooi-prod
Args:
alertname: 告警名稱
namespace: K8s namespace
Returns:
分組 key 字串
"""
import re
# 取 alertname 前綴(去掉尾端的數字或 UUID 後綴)
prefix = re.split(r"[-_]\d+$|[-_][0-9a-f]{8,}$", alertname, maxsplit=1)[0]
return f"{prefix}:{namespace}"
async def evaluate(
self,
alertname: str,
namespace: str,
fingerprint: str,
) -> GroupingResult:
"""
評估告警是否應被聚合
流程:
1. 計算 group_key
2. 將此告警加入滑動視窗
3. 計算視窗內告警數量
4. 若數量 >= THRESHOLD標記為子告警is_grouped=True
5. 第一個告警count==1為父告警
Graceful Degradation: Redis 失敗 → 返回 is_grouped=False不阻斷主流程
Args:
alertname: 告警名稱
namespace: K8s namespace
fingerprint: 此告警的指紋
Returns:
GroupingResult
"""
group_key = self.build_group_key(alertname, namespace)
try:
return await self._do_evaluate(group_key, fingerprint)
except Exception:
logger.warning(
"alert_grouping_redis_error",
group_key=group_key,
alertname=alertname,
namespace=namespace,
)
# Graceful DegradationRedis 失敗不阻斷主流程
return GroupingResult(
is_grouped=False,
group_key=group_key,
count=0,
parent_fingerprint=None,
is_parent=True,
)
async def _do_evaluate(self, group_key: str, fingerprint: str) -> GroupingResult:
"""
核心聚合邏輯(內部方法)
使用 Redis Pipeline 保證原子性
"""
now_ts = time.time()
cutoff_ts = now_ts - self.WINDOW_SECONDS
window_key = f"{self.PREFIX_WINDOW}{group_key}"
async with self.redis.pipeline(transaction=True) as pipe:
# 1. 清理過期記錄
pipe.zremrangebyscore(window_key, "-inf", cutoff_ts)
# 2. 加入當前告警score=timestamp, member=fingerprint
pipe.zadd(window_key, {fingerprint: now_ts})
# 3. 計算視窗內告警數量
pipe.zcount(window_key, cutoff_ts, "+inf")
# 4. 取第一個告警(父告警)
pipe.zrange(window_key, 0, 0)
# 5. 設定 TTL
pipe.expire(window_key, self.TTL_SECONDS)
results = await pipe.execute()
count = results[2]
first_members = results[3]
parent_fingerprint = first_members[0] if first_members else fingerprint
# 是否為父告警(第一個)
is_parent = parent_fingerprint == fingerprint or count == 1
# 是否觸發聚合count >= THRESHOLD 且非父告警)
is_grouped = count >= self.GROUP_THRESHOLD and not is_parent
if is_grouped:
logger.info(
"alert_grouped_as_child",
group_key=group_key,
fingerprint=fingerprint,
parent_fingerprint=parent_fingerprint,
count=count,
threshold=self.GROUP_THRESHOLD,
)
elif count >= self.GROUP_THRESHOLD and is_parent:
# 父告警 + 超過閾值:表示新的父告警開始聚合
logger.info(
"alert_grouping_parent_promoted",
group_key=group_key,
fingerprint=fingerprint,
count=count,
)
return GroupingResult(
is_grouped=is_grouped,
group_key=group_key,
count=count,
parent_fingerprint=parent_fingerprint,
is_parent=is_parent,
)
async def get_group_count(self, alertname: str, namespace: str) -> int:
"""
查詢分組當前視窗內的告警數量
Args:
alertname: 告警名稱
namespace: K8s namespace
Returns:
視窗內告警數量Redis 失敗返回 0
"""
group_key = self.build_group_key(alertname, namespace)
window_key = f"{self.PREFIX_WINDOW}{group_key}"
try:
now_ts = time.time()
cutoff_ts = now_ts - self.WINDOW_SECONDS
count = await self.redis.zcount(window_key, cutoff_ts, "+inf")
return int(count)
except Exception:
logger.warning("alert_grouping_count_error", group_key=group_key)
return 0
# =============================================================================
# Factory Function
# =============================================================================
_instance: AlertGroupingService | None = None
def get_alert_grouping_service() -> AlertGroupingService:
"""
取得 AlertGroupingService 單例
依賴注入:需要在 Redis 初始化後呼叫
Returns:
AlertGroupingService 實例
"""
global _instance
if _instance is None:
from src.core.redis_client import get_redis
redis_client = get_redis()
_instance = AlertGroupingService(redis_client)
return _instance

View File

@@ -10,10 +10,17 @@ Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.1
版本: v1.2
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
更新: 2026-04-14 (ADR-076 Task 3: 執行失敗重試機制 — Claude Haiku 4.5 Asia/Taipei)
建立者: Claude Code (Phase 16 R4.2)
重試設計 (ADR-076):
- MAX_RETRY = 2 次(共最多 3 次嘗試)
- RETRY_DELAY_SECONDS = 30 秒
- 只重試瞬態錯誤connection refused, timeout, i/o error 等)
- 永久性錯誤not found, permission denied, already exists不重試
"""
import asyncio
@@ -39,12 +46,67 @@ class ApprovalExecutionService:
職責:
1. 解析操作類型
2. 呼叫 K8s Executor 執行
2. 呼叫 K8s Executor 執行(含重試)
3. 更新資料庫狀態
4. 記錄 Timeline 事件
5. 發送通知
"""
# ADR-076 Task 3: 重試常數
MAX_RETRY: int = 2
RETRY_DELAY_SECONDS: int = 30
# 瞬態錯誤關鍵字(小寫比對),符合任一 → 可重試
_TRANSIENT_ERROR_KEYWORDS: tuple[str, ...] = (
"connection refused",
"connection reset",
"timeout",
"timed out",
"i/o error",
"io error",
"temporary failure",
"service unavailable",
"too many requests",
"dial tcp",
"eof",
)
# 永久性錯誤關鍵字(小寫比對),符合任一 → 不重試
_PERMANENT_ERROR_KEYWORDS: tuple[str, ...] = (
"not found",
"forbidden",
"permission denied",
"unauthorized",
"already exists",
"invalid",
"immutable",
"destructive",
"blocked",
)
@classmethod
def _is_transient_error(cls, error_message: str | None) -> bool:
"""
判斷執行錯誤是否為瞬態(可重試)
優先檢查永久性錯誤(比瞬態錯誤有更高的優先順序),
避免 "connection refused (not found)" 這類混合訊息誤判。
Args:
error_message: 執行錯誤訊息
Returns:
True 表示可重試False 表示永久失敗
"""
if not error_message:
return False
lower = error_message.lower()
# 永久性錯誤 → 不重試
if any(kw in lower for kw in cls._PERMANENT_ERROR_KEYWORDS):
return False
# 瞬態錯誤 → 可重試
return any(kw in lower for kw in cls._TRANSIENT_ERROR_KEYWORDS)
async def execute_approved_action(self, approval: ApprovalRequest) -> None:
"""
背景執行已批准的操作
@@ -104,7 +166,8 @@ class ApprovalExecutionService:
)
return
# Execute with audit
# ADR-076 Task 3: 執行失敗重試機制
# 瞬態錯誤 (connection refused, timeout 等) 自動重試,最多 MAX_RETRY 次
executor = get_executor()
result = await executor.execute_with_audit(
approval=approval,
@@ -113,10 +176,48 @@ class ApprovalExecutionService:
namespace=namespace,
)
attempt = 1
while not result.success and attempt <= self.MAX_RETRY:
if not self._is_transient_error(result.error):
logger.info(
"execution_retry_skipped_permanent_error",
approval_id=str(approval.id),
attempt=attempt,
error=result.error,
)
break
logger.warning(
"execution_retry_transient_error",
approval_id=str(approval.id),
attempt=attempt,
max_retry=self.MAX_RETRY,
error=result.error,
delay_seconds=self.RETRY_DELAY_SECONDS,
)
await timeline.add_event(
event_type="exec",
status="warning",
title=f"⚠️ 執行失敗,{self.RETRY_DELAY_SECONDS}s 後重試 ({attempt}/{self.MAX_RETRY})",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
await asyncio.sleep(self.RETRY_DELAY_SECONDS)
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
attempt += 1
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=result.success)
# Update approval status based on result
total_attempts = attempt # attempt 在重試迴圈後為最終嘗試次數
if result.success:
logger.info(
"background_execution_success",
@@ -125,11 +226,13 @@ class ApprovalExecutionService:
target=resource_name,
namespace=namespace,
duration_ms=result.duration_ms,
total_attempts=total_attempts,
)
retry_note = f" (重試 {total_attempts - 1} 次後成功)" if total_attempts > 1 else ""
await timeline.add_event(
event_type="exec",
status="success",
title=f"✅ K8s 執行成功: {operation_type.value}",
title=f"✅ K8s 執行成功: {operation_type.value}{retry_note}",
description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
actor="leWOOOgo",
actor_role="executor",

View File

@@ -0,0 +1,539 @@
"""
自動報告生成服務 (Report Generation Service)
=============================================
ADR-076: 展現價值 — 日度巡檢報告 + 事後檢討 (Postmortem)
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
功能:
1. 日度巡檢報告 — 每日 08:00 台北時間,收集前 24h 關鍵 KPI
2. 事後檢討 (Postmortem) — Incident resolved 且 duration > 10 分鐘自動觸發
設計原則:
- 遵循 leWOOOgo 積木化鐵律
- 不直接存取 Redis透過 Service 層)
- 所有數據從 DB 聚合,不使用假數據
- Graceful Degradation各資料來源失敗獨立處理
- 統帥鐵律:台北時區(+8禁止 UTC
報告流程:
日度巡檢: lifespan 啟動 → _run_daily_report_loop() 無限迴圈
→ 計算距下一個 08:00 台北時間的秒數
→ sleep → 收集數據 → 組裝 → Telegram 推送
Postmortem: Incident resolve 時,由呼叫方 await trigger_postmortem(incident)
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
import structlog
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# 台北時區 (UTC+8)
_TZ_TAIPEI = timezone(timedelta(hours=8))
# 日度報告觸發時間(台北時間 08:00
DAILY_REPORT_HOUR_TAIPEI = 8
# Postmortem 觸發最低時長(分鐘)
POSTMORTEM_MIN_DURATION_MINUTES = 10
# =============================================================================
# Data Types
# =============================================================================
@dataclass
class DailyKpi:
"""24 小時 KPI 摘要"""
period_start: datetime
period_end: datetime
# 告警
total_alerts: int = 0
auto_resolved: int = 0
human_approved: int = 0
converged_alerts: int = 0
grouped_alerts: int = 0
# 自動修復
auto_repair_success: int = 0
auto_repair_failed: int = 0
# 飛輪
km_new_entries: int = 0
playbook_count: int = 0
# 告警分類分佈
alert_category_breakdown: dict[str, int] = field(default_factory=dict)
@property
def auto_repair_rate(self) -> float:
total = self.auto_repair_success + self.auto_repair_failed
return self.auto_repair_success / total if total > 0 else 0.0
@property
def auto_resolve_rate(self) -> float:
return self.auto_resolved / self.total_alerts if self.total_alerts > 0 else 0.0
@dataclass
class PostmortemData:
"""事後檢討資料"""
incident_id: str
title: str
duration_minutes: float
root_cause: str | None
resolution_action: str | None
ai_provider: str | None
auto_repaired: bool
retry_count: int
created_at: datetime
resolved_at: datetime
# =============================================================================
# ReportGenerationService
# =============================================================================
class ReportGenerationService:
"""
自動報告生成服務
統帥指令 (2026-04-14):
- 日度巡檢報告:每日 08:00 台北時間
- 事後檢討Incident resolved 且 duration > 10 分鐘
- 所有報告推送至 Telegram SRE 群組
"""
async def collect_daily_kpi(self) -> DailyKpi:
"""
收集過去 24 小時 KPI
資料來源: PostgreSQL (incidents, approvals, knowledge_entries)
Graceful Degradation: 每個資料源失敗獨立處理,不中止整體
Returns:
DailyKpi 摘要
"""
now = now_taipei()
period_start = now - timedelta(hours=24)
kpi = DailyKpi(period_start=period_start, period_end=now)
# 並行收集各項 KPI
results = await asyncio.gather(
self._collect_alert_stats(period_start),
self._collect_repair_stats(period_start),
self._collect_km_stats(period_start),
self._collect_playbook_count(),
return_exceptions=True,
)
alert_stats, repair_stats, km_stats, playbook_count = results
if isinstance(alert_stats, dict):
kpi.total_alerts = alert_stats.get("total", 0)
kpi.auto_resolved = alert_stats.get("auto_resolved", 0)
kpi.human_approved = alert_stats.get("human_approved", 0)
kpi.converged_alerts = alert_stats.get("converged", 0)
kpi.alert_category_breakdown = alert_stats.get("categories", {})
else:
logger.warning("daily_kpi_alert_stats_failed", error=str(alert_stats))
if isinstance(repair_stats, dict):
kpi.auto_repair_success = repair_stats.get("success", 0)
kpi.auto_repair_failed = repair_stats.get("failed", 0)
else:
logger.warning("daily_kpi_repair_stats_failed", error=str(repair_stats))
if isinstance(km_stats, int):
kpi.km_new_entries = km_stats
else:
logger.warning("daily_kpi_km_stats_failed", error=str(km_stats))
if isinstance(playbook_count, int):
kpi.playbook_count = playbook_count
else:
logger.warning("daily_kpi_playbook_count_failed", error=str(playbook_count))
return kpi
async def _collect_alert_stats(self, since: datetime) -> dict:
"""收集告警統計incident 表)"""
from sqlalchemy import func, select, text as sa_text
from src.db.base import get_db_context
from src.db.models import IncidentRecord
async with get_db_context() as db:
# 總數
total = await db.scalar(
select(func.count()).select_from(IncidentRecord).where(
IncidentRecord.created_at >= since
)
) or 0
# 自動解決status=resolved無人工簽核
auto_resolved = await db.scalar(
select(func.count()).select_from(IncidentRecord).where(
IncidentRecord.created_at >= since,
IncidentRecord.status == "resolved",
)
) or 0
# 告警分類分佈alert_category 欄位)
categories: dict[str, int] = {}
try:
cat_result = await db.execute(
sa_text(
"SELECT alert_category, COUNT(*) as cnt "
"FROM incidents "
"WHERE created_at >= :since AND alert_category IS NOT NULL "
"GROUP BY alert_category "
"ORDER BY cnt DESC "
"LIMIT 10"
).bindparams(since=since)
)
for row in cat_result:
categories[row[0]] = row[1]
except Exception as _cat_e:
logger.debug("alert_category_breakdown_failed", error=str(_cat_e))
return {
"total": total,
"auto_resolved": auto_resolved,
"human_approved": 0, # TODO: 從 signatures 表統計
"converged": 0, # 已由 DB hit_count 記錄,暫略
"categories": categories,
}
async def _collect_repair_stats(self, since: datetime) -> dict:
"""收集自動修復統計approval_requests 表)"""
from sqlalchemy import func, select
from src.db.base import get_db_context
from src.db.models import ApprovalRequestRecord
async with get_db_context() as db:
success = await db.scalar(
select(func.count()).select_from(ApprovalRequestRecord).where(
ApprovalRequestRecord.created_at >= since,
ApprovalRequestRecord.execution_success.is_(True),
)
) or 0
failed = await db.scalar(
select(func.count()).select_from(ApprovalRequestRecord).where(
ApprovalRequestRecord.created_at >= since,
ApprovalRequestRecord.execution_success.is_(False),
)
) or 0
return {"success": success, "failed": failed}
async def _collect_km_stats(self, since: datetime) -> int:
"""收集新增 KM 條目數"""
from sqlalchemy import func, select
from src.db.base import get_db_context
from src.db.models import KnowledgeEntryRecord
async with get_db_context() as db:
count = await db.scalar(
select(func.count()).select_from(KnowledgeEntryRecord).where(
KnowledgeEntryRecord.created_at >= since
)
) or 0
return int(count)
async def _collect_playbook_count(self) -> int:
"""收集活躍 Playbook 數量"""
from sqlalchemy import func, select
from src.db.base import get_db_context
from src.db.models import PlaybookRecord
async with get_db_context() as db:
count = await db.scalar(
select(func.count()).select_from(PlaybookRecord)
) or 0
return int(count)
def format_daily_report(self, kpi: DailyKpi) -> str:
"""
組裝日度巡檢報告Telegram HTML 格式)
Args:
kpi: DailyKpi 摘要
Returns:
Telegram HTML 格式字串
"""
date_str = kpi.period_end.strftime("%Y-%m-%d")
period_str = f"{kpi.period_start.strftime('%H:%M')} ~ {kpi.period_end.strftime('%H:%M')}"
auto_repair_rate_pct = f"{kpi.auto_repair_rate * 100:.1f}%"
auto_resolve_rate_pct = f"{kpi.auto_resolve_rate * 100:.1f}%"
# 告警分類表
cat_lines = ""
if kpi.alert_category_breakdown:
for cat, cnt in list(kpi.alert_category_breakdown.items())[:6]:
cat_lines += f"\n{cat}: {cnt}"
# 整體健康度評估
if kpi.auto_repair_rate >= 0.8:
health_icon = "💚"
health_label = "優秀"
elif kpi.auto_repair_rate >= 0.5:
health_icon = "🟡"
health_label = "良好"
else:
health_icon = "🔴"
health_label = "需關注"
lines = [
f"<b>📊 AWOOOI 日度巡檢報告</b>",
f"<b>{date_str}</b> | {period_str} 台北時間",
"",
f"<b>{health_icon} 整體健康度: {health_label}</b>",
"",
"<b>🚨 告警統計</b>",
f" 總計: <b>{kpi.total_alerts}</b> 個",
f" 自動解決: {kpi.auto_resolved} 個 ({auto_resolve_rate_pct})",
f" 人工批准: {kpi.human_approved}",
f" 告警收斂: {kpi.converged_alerts}",
]
if cat_lines:
lines += [f"\n<b>📂 分類分佈</b>{cat_lines}"]
lines += [
"",
"<b>🔧 自動修復</b>",
f" 成功: {kpi.auto_repair_success}",
f" 失敗: {kpi.auto_repair_failed}",
f" 成功率: <b>{auto_repair_rate_pct}</b>",
"",
"<b>🧠 知識積累</b>",
f" 新增 KM 條目: {kpi.km_new_entries}",
f" 活躍 Playbook: {kpi.playbook_count}",
"",
f"<i>🤖 AWOOOI AIOps 自動生成 | {kpi.period_end.strftime('%Y-%m-%d %H:%M')} 台北時間</i>",
]
return "\n".join(lines)
def format_postmortem(self, data: PostmortemData) -> str:
"""
組裝事後檢討報告Telegram HTML 格式)
Args:
data: PostmortemData
Returns:
Telegram HTML 格式字串
"""
duration_str = f"{data.duration_minutes:.1f} 分鐘"
auto_str = "✅ 自動修復" if data.auto_repaired else "👤 人工介入"
retry_str = f"(重試 {data.retry_count} 次)" if data.retry_count > 0 else ""
created_str = data.created_at.strftime("%H:%M:%S")
resolved_str = data.resolved_at.strftime("%H:%M:%S")
lines = [
f"<b>📋 事後檢討 (Postmortem)</b>",
f"<b>Incident:</b> {data.incident_id}",
"",
f"<b>⏱ 影響時長:</b> {duration_str}",
f"<b>🕐 發生:</b> {created_str} → <b>解決:</b> {resolved_str}",
f"<b>🔧 處置方式:</b> {auto_str}{retry_str}",
]
if data.root_cause:
lines += [f"\n<b>🔍 根本原因</b>\n{data.root_cause[:300]}"]
if data.resolution_action:
lines += [f"\n<b>⚡ 執行動作</b>\n<code>{data.resolution_action[:200]}</code>"]
if data.ai_provider:
lines += [f"\n<i>AI 決策: {data.ai_provider}</i>"]
lines += [
"",
f"<i>🤖 AWOOOI Postmortem 自動生成 | {now_taipei().strftime('%Y-%m-%d %H:%M')} 台北時間</i>",
]
return "\n".join(lines)
async def send_daily_report(self) -> None:
"""
收集 KPI → 組裝 → 推送 Telegram SRE 群組
Graceful Degradation: 失敗只記錄 log不拋出例外
"""
try:
kpi = await self.collect_daily_kpi()
report_text = self.format_daily_report(kpi)
from src.services.telegram_gateway import get_telegram_gateway
gateway = get_telegram_gateway()
await gateway.send_to_group(report_text, parse_mode="HTML")
logger.info(
"daily_report_sent",
total_alerts=kpi.total_alerts,
auto_repair_rate=f"{kpi.auto_repair_rate:.1%}",
)
except Exception as e:
logger.error("daily_report_failed", error=str(e))
async def trigger_postmortem(
self,
incident_id: str,
title: str,
created_at: datetime,
resolved_at: datetime,
root_cause: str | None = None,
resolution_action: str | None = None,
ai_provider: str | None = None,
auto_repaired: bool = False,
retry_count: int = 0,
) -> None:
"""
觸發事後檢討報告
呼叫方incident_service.resolve_incident() 或 approval_execution.py
觸發條件duration > POSTMORTEM_MIN_DURATION_MINUTES
Args:
incident_id: Incident ID
title: Incident 標題
created_at: 建立時間
resolved_at: 解決時間
root_cause: 根本原因AI 分析結果)
resolution_action: 執行動作
ai_provider: 決策 AI provider
auto_repaired: 是否自動修復
retry_count: 重試次數
"""
duration_minutes = (resolved_at - created_at).total_seconds() / 60
if duration_minutes < POSTMORTEM_MIN_DURATION_MINUTES:
logger.debug(
"postmortem_skipped_short_duration",
incident_id=incident_id,
duration_minutes=duration_minutes,
min_required=POSTMORTEM_MIN_DURATION_MINUTES,
)
return
data = PostmortemData(
incident_id=incident_id,
title=title,
duration_minutes=duration_minutes,
root_cause=root_cause,
resolution_action=resolution_action,
ai_provider=ai_provider,
auto_repaired=auto_repaired,
retry_count=retry_count,
created_at=created_at,
resolved_at=resolved_at,
)
try:
report_text = self.format_postmortem(data)
from src.services.telegram_gateway import get_telegram_gateway
gateway = get_telegram_gateway()
await gateway.send_to_group(report_text, parse_mode="HTML")
logger.info(
"postmortem_sent",
incident_id=incident_id,
duration_minutes=duration_minutes,
)
except Exception as e:
logger.error(
"postmortem_failed",
incident_id=incident_id,
error=str(e),
)
# =============================================================================
# 日度報告排程迴圈
# =============================================================================
def _seconds_until_next_report() -> float:
"""
計算距下一個 08:00 台北時間的秒數
Returns:
秒數float
"""
now = now_taipei()
target = now.replace(hour=DAILY_REPORT_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
if now >= target:
# 已過今天的 08:00 → 等到明天
target += timedelta(days=1)
return (target - now).total_seconds()
async def run_daily_report_loop() -> None:
"""
日度巡檢報告無限排程迴圈
每次睡到下一個 08:00 台北時間,然後發送報告。
以 asyncio.create_task() 從 lifespan 啟動。
Graceful Degradation: 任何例外都只記錄 log迴圈繼續
"""
service = ReportGenerationService()
logger.info(
"daily_report_loop_started",
trigger_hour_taipei=DAILY_REPORT_HOUR_TAIPEI,
)
while True:
sleep_seconds = _seconds_until_next_report()
logger.info(
"daily_report_next_in",
sleep_seconds=int(sleep_seconds),
next_at=f"{DAILY_REPORT_HOUR_TAIPEI:02d}:00 台北時間",
)
await asyncio.sleep(sleep_seconds)
logger.info("daily_report_triggered")
await service.send_daily_report()
# =============================================================================
# Factory Function
# =============================================================================
_instance: ReportGenerationService | None = None
def get_report_generation_service() -> ReportGenerationService:
"""
取得 ReportGenerationService 單例
Returns:
ReportGenerationService 實例
"""
global _instance
if _instance is None:
_instance = ReportGenerationService()
return _instance

View File

@@ -0,0 +1,137 @@
"""
AlertGroupingService 單元測試
==============================
ADR-076: 告警聚合引擎 — 告警風暴防禦
🔴🔴 遵循「禁止 Mock 測試鐵律」
- build_group_key / GroupingResult 邏輯測試:純 Python無需 Redis
- Redis 整合部分標記 @pytest.mark.integration正常 CI 跳過
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
"""
import pytest
from src.services.alert_grouping_service import AlertGroupingService, GroupingResult
class TestBuildGroupKey:
"""測試聚合分組 key 生成邏輯"""
def test_basic_key(self):
"""基本 alertname + namespace → group_key"""
key = AlertGroupingService.build_group_key("PodCrashLoopBackOff", "awoooi-prod")
assert key == "PodCrashLoopBackOff:awoooi-prod"
def test_strips_numeric_suffix(self):
"""帶數字後綴的 alertname 應取前綴"""
key = AlertGroupingService.build_group_key("PodCrashLoopBackOff-3", "awoooi-prod")
assert key == "PodCrashLoopBackOff:awoooi-prod"
def test_strips_long_numeric_suffix(self):
"""帶長數字後綴的 alertname 應取前綴"""
key = AlertGroupingService.build_group_key("HostHighCpuLoad-1234567", "default")
assert key == "HostHighCpuLoad:default"
def test_same_prefix_same_key(self):
"""相同前綴、相同 namespace → 相同 group_key聚合生效"""
key1 = AlertGroupingService.build_group_key("PodOOMKilled-1", "awoooi-prod")
key2 = AlertGroupingService.build_group_key("PodOOMKilled-2", "awoooi-prod")
key3 = AlertGroupingService.build_group_key("PodOOMKilled-3", "awoooi-prod")
assert key1 == key2 == key3
def test_different_namespace_different_key(self):
"""相同 alertname、不同 namespace → 不同 group_key"""
key1 = AlertGroupingService.build_group_key("PodCrash", "awoooi-prod")
key2 = AlertGroupingService.build_group_key("PodCrash", "awoooi-staging")
assert key1 != key2
def test_different_alertname_different_key(self):
"""不同 alertname、相同 namespace → 不同 group_key"""
key1 = AlertGroupingService.build_group_key("PodCrash", "awoooi-prod")
key2 = AlertGroupingService.build_group_key("HostHighCpu", "awoooi-prod")
assert key1 != key2
def test_empty_namespace(self):
"""namespace 為空字串時應正常處理"""
key = AlertGroupingService.build_group_key("PodCrash", "")
assert key == "PodCrash:"
def test_no_suffix_unchanged(self):
"""無數字後綴的 alertname 應保持不變"""
key = AlertGroupingService.build_group_key("HostHighCpuLoad", "default")
assert key == "HostHighCpuLoad:default"
class TestGroupingResultDataclass:
"""測試 GroupingResult dataclass"""
def test_child_alert(self):
"""子告警is_grouped=True, is_parent=False"""
result = GroupingResult(
is_grouped=True,
group_key="PodCrash:awoooi-prod",
count=5,
parent_fingerprint="fp-001",
is_parent=False,
)
assert result.is_grouped is True
assert result.is_parent is False
assert result.count == 5
def test_parent_alert(self):
"""父告警is_grouped=False, is_parent=True"""
result = GroupingResult(
is_grouped=False,
group_key="PodCrash:awoooi-prod",
count=1,
parent_fingerprint="fp-001",
is_parent=True,
)
assert result.is_grouped is False
assert result.is_parent is True
def test_below_threshold_not_grouped(self):
"""未達閾值count=2, threshold=3 → is_grouped=False"""
result = GroupingResult(
is_grouped=False,
group_key="PodCrash:awoooi-prod",
count=2,
parent_fingerprint="fp-001",
is_parent=False,
)
assert result.is_grouped is False
def test_group_key_format(self):
"""group_key 格式應為 {alertname_prefix}:{namespace}"""
result = GroupingResult(
is_grouped=True,
group_key="PodOOMKilled:awoooi-prod",
count=4,
parent_fingerprint=None,
is_parent=False,
)
assert ":" in result.group_key
parts = result.group_key.split(":")
assert len(parts) == 2
class TestAlertGroupingServiceConstants:
"""測試服務常量設定"""
def test_window_seconds(self):
"""視窗應為 5 分鐘 (300 秒)"""
assert AlertGroupingService.WINDOW_SECONDS == 300
def test_group_threshold(self):
"""聚合閾值應為 3"""
assert AlertGroupingService.GROUP_THRESHOLD == 3
def test_ttl_seconds(self):
"""TTL 應長於視窗"""
assert AlertGroupingService.TTL_SECONDS > AlertGroupingService.WINDOW_SECONDS
def test_redis_key_prefix(self):
"""Redis key 前綴應符合規範"""
assert AlertGroupingService.PREFIX_WINDOW.startswith("alert_group:")
assert AlertGroupingService.PREFIX_META.startswith("alert_group:")

View File

@@ -0,0 +1,134 @@
"""
ApprovalExecutionService 重試邏輯單元測試
==========================================
ADR-076 Task 3: 執行失敗重試機制
測試範圍:
- _is_transient_error() 瞬態/永久性錯誤分類
- MAX_RETRY / RETRY_DELAY_SECONDS 常數
- 邊界情境: None、空字串、混合訊息
🔴🔴 遵循「禁止 Mock 測試鐵律」
- _is_transient_error 是純 Python 方法,無 DB/Redis 依賴
- 無需 Mock直接測試真實邏輯
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
"""
import pytest
from src.services.approval_execution import ApprovalExecutionService
class TestIsTransientError:
"""測試瞬態/永久性錯誤判斷邏輯"""
# ------- 瞬態錯誤(應返回 True-------
def test_connection_refused(self):
assert ApprovalExecutionService._is_transient_error("connection refused") is True
def test_connection_refused_uppercase(self):
"""大小寫不敏感"""
assert ApprovalExecutionService._is_transient_error("Connection Refused") is True
def test_timeout(self):
assert ApprovalExecutionService._is_transient_error("request timeout") is True
def test_timed_out(self):
assert ApprovalExecutionService._is_transient_error("operation timed out") is True
def test_io_error(self):
assert ApprovalExecutionService._is_transient_error("i/o error reading response") is True
def test_io_error_alt(self):
assert ApprovalExecutionService._is_transient_error("io error") is True
def test_service_unavailable(self):
assert ApprovalExecutionService._is_transient_error("service unavailable") is True
def test_too_many_requests(self):
assert ApprovalExecutionService._is_transient_error("too many requests") is True
def test_eof(self):
assert ApprovalExecutionService._is_transient_error("unexpected eof") is True
def test_dial_tcp(self):
assert ApprovalExecutionService._is_transient_error("dial tcp 10.0.0.1:6443: connect") is True
def test_connection_reset(self):
assert ApprovalExecutionService._is_transient_error("connection reset by peer") is True
def test_temporary_failure(self):
assert ApprovalExecutionService._is_transient_error("temporary failure in name resolution") is True
# ------- 永久性錯誤(應返回 False-------
def test_not_found(self):
assert ApprovalExecutionService._is_transient_error("pod not found") is False
def test_forbidden(self):
assert ApprovalExecutionService._is_transient_error("forbidden: insufficient permissions") is False
def test_permission_denied(self):
assert ApprovalExecutionService._is_transient_error("permission denied") is False
def test_unauthorized(self):
assert ApprovalExecutionService._is_transient_error("unauthorized") is False
def test_already_exists(self):
assert ApprovalExecutionService._is_transient_error("resource already exists") is False
def test_invalid(self):
assert ApprovalExecutionService._is_transient_error("invalid field selector") is False
def test_destructive_blocked(self):
assert ApprovalExecutionService._is_transient_error("destructive operation blocked") is False
def test_immutable(self):
assert ApprovalExecutionService._is_transient_error("field is immutable") is False
# ------- 邊界情境 -------
def test_none_returns_false(self):
"""None → 不重試(無法判斷)"""
assert ApprovalExecutionService._is_transient_error(None) is False
def test_empty_string_returns_false(self):
"""空字串 → 不重試"""
assert ApprovalExecutionService._is_transient_error("") is False
def test_permanent_wins_over_transient(self):
"""混合訊息:永久性錯誤關鍵字優先,不重試"""
# "not found" (永久) + "timeout" (瞬態) → 不重試
assert ApprovalExecutionService._is_transient_error("timeout: pod not found") is False
def test_unknown_error_not_retried(self):
"""未知錯誤不重試"""
assert ApprovalExecutionService._is_transient_error("kubectl exited with code 1") is False
class TestRetryConstants:
"""測試重試常數設定"""
def test_max_retry(self):
"""最多重試 2 次(共 3 次嘗試)"""
assert ApprovalExecutionService.MAX_RETRY == 2
def test_retry_delay(self):
"""重試間隔 30 秒"""
assert ApprovalExecutionService.RETRY_DELAY_SECONDS == 30
def test_transient_keywords_not_empty(self):
"""瞬態錯誤關鍵字列表不為空"""
assert len(ApprovalExecutionService._TRANSIENT_ERROR_KEYWORDS) > 0
def test_permanent_keywords_not_empty(self):
"""永久性錯誤關鍵字列表不為空"""
assert len(ApprovalExecutionService._PERMANENT_ERROR_KEYWORDS) > 0
def test_no_overlap_in_keywords(self):
"""瞬態/永久性關鍵字不重疊(避免邏輯衝突)"""
transient = set(ApprovalExecutionService._TRANSIENT_ERROR_KEYWORDS)
permanent = set(ApprovalExecutionService._PERMANENT_ERROR_KEYWORDS)
assert transient.isdisjoint(permanent)

View File

@@ -0,0 +1,315 @@
"""
ReportGenerationService 單元測試
================================
ADR-076 Task 4: 自動報告生成
測試範圍:
- DailyKpi 計算屬性auto_repair_rate, auto_resolve_rate
- format_daily_report() 報告格式
- format_postmortem() 事後檢討格式
- _seconds_until_next_report() 排程計算
- PostmortemData dataclass
🔴🔴 遵循「禁止 Mock 測試鐵律」
- 純 Python 邏輯:不需要 DB/Redis/Telegram
- DB/Telegram 整合部分標記 @pytest.mark.integration
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
"""
from datetime import datetime, timedelta, timezone
import pytest
from src.services.report_generation_service import (
DAILY_REPORT_HOUR_TAIPEI,
POSTMORTEM_MIN_DURATION_MINUTES,
DailyKpi,
PostmortemData,
ReportGenerationService,
_seconds_until_next_report,
)
_TZ_TAIPEI = timezone(timedelta(hours=8))
# =============================================================================
# DailyKpi 計算屬性
# =============================================================================
class TestDailyKpiRates:
"""測試 DailyKpi 計算屬性"""
def _make_kpi(self, **kwargs) -> DailyKpi:
now = datetime.now(_TZ_TAIPEI)
return DailyKpi(
period_start=now - timedelta(hours=24),
period_end=now,
**kwargs,
)
def test_auto_repair_rate_all_success(self):
"""全部成功 → 100%"""
kpi = self._make_kpi(auto_repair_success=10, auto_repair_failed=0)
assert kpi.auto_repair_rate == 1.0
def test_auto_repair_rate_half(self):
"""5 成功 5 失敗 → 50%"""
kpi = self._make_kpi(auto_repair_success=5, auto_repair_failed=5)
assert kpi.auto_repair_rate == 0.5
def test_auto_repair_rate_zero_attempts(self):
"""無嘗試 → 0%(不除以零)"""
kpi = self._make_kpi(auto_repair_success=0, auto_repair_failed=0)
assert kpi.auto_repair_rate == 0.0
def test_auto_resolve_rate(self):
"""10 個告警 6 個自動解決 → 60%"""
kpi = self._make_kpi(total_alerts=10, auto_resolved=6)
assert kpi.auto_resolve_rate == 0.6
def test_auto_resolve_rate_zero_alerts(self):
"""無告警 → 0%(不除以零)"""
kpi = self._make_kpi(total_alerts=0, auto_resolved=0)
assert kpi.auto_resolve_rate == 0.0
# =============================================================================
# format_daily_report
# =============================================================================
class TestFormatDailyReport:
"""測試日度巡檢報告格式"""
def _make_kpi(self, **kwargs) -> DailyKpi:
now = datetime.now(_TZ_TAIPEI)
defaults = dict(
total_alerts=20,
auto_resolved=15,
human_approved=3,
auto_repair_success=12,
auto_repair_failed=3,
km_new_entries=5,
playbook_count=18,
)
defaults.update(kwargs)
return DailyKpi(
period_start=now - timedelta(hours=24),
period_end=now,
**defaults,
)
def test_contains_title(self):
"""報告應包含標題"""
kpi = self._make_kpi()
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "日度巡檢報告" in report
def test_contains_alert_stats(self):
"""報告應包含告警統計"""
kpi = self._make_kpi(total_alerts=20)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "20" in report
def test_contains_auto_repair_rate(self):
"""報告應包含自動修復成功率"""
kpi = self._make_kpi(auto_repair_success=8, auto_repair_failed=2)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
# 80.0%
assert "80.0%" in report
def test_contains_km_stats(self):
"""報告應包含 KM 統計"""
kpi = self._make_kpi(km_new_entries=7)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "7" in report
def test_contains_playbook_count(self):
"""報告應包含 Playbook 數量"""
kpi = self._make_kpi(playbook_count=18)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "18" in report
def test_health_excellent_threshold(self):
"""自動修復率 >= 80% → 優秀"""
kpi = self._make_kpi(auto_repair_success=8, auto_repair_failed=2)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "優秀" in report
def test_health_good_threshold(self):
"""自動修復率 50-79% → 良好"""
kpi = self._make_kpi(auto_repair_success=6, auto_repair_failed=4)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "良好" in report
def test_health_needs_attention(self):
"""自動修復率 < 50% → 需關注"""
kpi = self._make_kpi(auto_repair_success=3, auto_repair_failed=7)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "需關注" in report
def test_category_breakdown_shown(self):
"""有告警分類時應顯示分類分佈"""
kpi = self._make_kpi(
alert_category_breakdown={"kubernetes": 5, "host_resource": 3}
)
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "kubernetes" in report
def test_contains_taiwan_timezone_note(self):
"""報告應標示台北時間"""
kpi = self._make_kpi()
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "台北時間" in report
def test_is_html_formatted(self):
"""報告應包含 HTML 標籤Telegram HTML 格式)"""
kpi = self._make_kpi()
svc = ReportGenerationService()
report = svc.format_daily_report(kpi)
assert "<b>" in report
# =============================================================================
# format_postmortem
# =============================================================================
class TestFormatPostmortem:
"""測試事後檢討報告格式"""
def _make_postmortem(self, **kwargs) -> PostmortemData:
now = datetime.now(_TZ_TAIPEI)
defaults = dict(
incident_id="INC-20260414-001",
title="KubePodOOMKilled on awoooi-api",
duration_minutes=25.5,
root_cause="記憶體洩漏導致 OOMKilled",
resolution_action="kubectl rollout restart deployment/awoooi-api",
ai_provider="OpenClaw (deepseek-r1:14b)",
auto_repaired=True,
retry_count=0,
created_at=now - timedelta(minutes=25, seconds=30),
resolved_at=now,
)
defaults.update(kwargs)
return PostmortemData(**defaults)
def test_contains_incident_id(self):
"""事後檢討應包含 Incident ID"""
data = self._make_postmortem()
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "INC-20260414-001" in report
def test_contains_duration(self):
"""事後檢討應包含持續時間"""
data = self._make_postmortem(duration_minutes=25.5)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "25.5" in report
def test_auto_repaired_shown(self):
"""自動修復應顯示標記"""
data = self._make_postmortem(auto_repaired=True)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "自動修復" in report
def test_human_intervene_shown(self):
"""人工介入應顯示標記"""
data = self._make_postmortem(auto_repaired=False)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "人工介入" in report
def test_retry_count_shown(self):
"""重試次數應顯示"""
data = self._make_postmortem(retry_count=2)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "重試 2 次" in report
def test_root_cause_shown(self):
"""根本原因應顯示"""
data = self._make_postmortem(root_cause="記憶體洩漏導致 OOMKilled")
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "記憶體洩漏" in report
def test_resolution_action_shown(self):
"""執行動作應顯示在 code 標籤中"""
data = self._make_postmortem(
resolution_action="kubectl rollout restart deployment/awoooi-api"
)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "kubectl rollout restart" in report
assert "<code>" in report
def test_no_root_cause_skips_section(self):
"""無根本原因時不應顯示根本原因區塊"""
data = self._make_postmortem(root_cause=None)
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "根本原因" not in report
def test_contains_taiwan_timezone_note(self):
"""事後檢討應標示台北時間"""
data = self._make_postmortem()
svc = ReportGenerationService()
report = svc.format_postmortem(data)
assert "台北時間" in report
# =============================================================================
# _seconds_until_next_report
# =============================================================================
class TestSecondsUntilNextReport:
"""測試排程計算邏輯"""
def test_returns_positive_seconds(self):
"""永遠返回正數秒數"""
seconds = _seconds_until_next_report()
assert seconds > 0
def test_returns_at_most_one_day(self):
"""最多等待 24 小時"""
seconds = _seconds_until_next_report()
assert seconds <= 86400
def test_returns_float(self):
"""返回值為 float"""
seconds = _seconds_until_next_report()
assert isinstance(seconds, float)
# =============================================================================
# 常數設定
# =============================================================================
class TestServiceConstants:
"""測試服務常數"""
def test_daily_report_hour(self):
"""日度報告觸發時間應為 08:00 台北時間"""
assert DAILY_REPORT_HOUR_TAIPEI == 8
def test_postmortem_min_duration(self):
"""Postmortem 最低觸發時長應為 10 分鐘"""
assert POSTMORTEM_MIN_DURATION_MINUTES == 10

View File

@@ -6,7 +6,52 @@
---
## 📍 當前狀態 (2026-04-12 深夜 — ADR-075 Phase 1+2+CR 全完成,git push gitea main ✅)
## 📍 當前狀態 (2026-04-14 — 戰術 B 四大 Task完成,675 tests ✅)
**本次 session 新增4 Task6 檔案75 新測試)**
- `feat(adr-076): Task 2``alert_grouping_service.py` — 5分鐘滑動視窗告警聚合引擎 + 16 tests
- `feat(adr-076): Task 3``approval_execution.py` — 執行失敗重試MAX_RETRY=2, 30s, 瞬態/永久分類)+ 29 tests
- `feat(adr-076): Task 4``report_generation_service.py` — 日度巡檢報告(08:00台北) + Postmortem + 30 tests
- `webhooks.py` — ADR-076 聚合邏輯整合(指紋後/LLM前
- `main.py` — 日度報告迴圈掛進 lifespan
**測試**: 600 → 675 通過(+7510 skipped0 failed
**下一步**git push gitea main → Pod 部署驗證 → 觀察 E2E
---
## 📍 前次狀態 (2026-04-14 — MASTER AIOps Blueprint 完成,等待統帥批准)
**本次 session 新增(無 commit純文件工作**
- `docs/superpowers/plans/2026-04-14-MASTER-aiops-full-automation-blueprint.md` — 整合4份計畫文件的主計畫書 v1.0
- Memory: `aiops_current_architecture_diagnosis.md` — 完整架構診斷報告
**飛輪現況**: Pod 38ff2bb飛輪 83% 完整4 Phase 等待批准後實作
**業界標準文件缺口**已識別尚未建立SLO/SLI、AI Model Card、Human-in-Loop Spec、Alert Taxonomy Catalog、Configuration Reference
**下一步**:等統帥批准 MASTER 計畫書後,開始 Phase 1 實作
---
## 📍 前次狀態 (2026-04-14 — 飛輪 Bug 修補完成,全面部署 38ff2bb ✅)
**本次 session 修補6 commits全已部署Pod 跑 38ff2bb**
- `38ff2bb` heartbeat → ADR-075 TYPE-1 格式INFO 樹狀結構)
- `f1face4` HostHighCpuLoad 獨立規則 → NO_ACTION停止 kubectl scale unknown
- `1a4b52e` fingerprint 加 alertname 防跨告警指紋衝突 + 心跳分類補入
- `b17a677` gitea webhook analysis.model_dump() dict bug
- `0c88f67` DIAGNOSE 強制 deepseek-r1:14b不用 gemma3:4b
- `09134f5` incident.title bug + DIAGNOSE→NEMOTRON confidence=0.0 修復
**飛輪狀態**規格書層次一二三四全完成ADR-075 全完成,本次額外修補已補齊
**下一步**:觀察自動修復 E2E或繼續 ADR-075 Phase 3Prometheus 規則)
---
## 📍 前次狀態 (2026-04-12 深夜 — ADR-075 Phase 1+2+CR 全完成git push gitea main ✅)
**ADR-075 全部完成**3 commits: 2cef209 → 561c1d8 → 1cb654c