"""
自動報告生成服務 (Report Generation Service)
=============================================
ADR-076: 展現價值 — 日度巡檢報告 + 事後檢討 (Postmortem)
建立: 2026-04-14 (台北時區) Claude Haiku 4.5
功能:
1. 日度巡檢報告 — 每日 08:00 台北時間,收集前 24h 關鍵 KPI
2. 事後檢討 (Postmortem) — Incident resolved 且 duration > 10 分鐘自動觸發
設計原則:
- 遵循 leWOOOgo 積木化鐵律
- 不直接存取 Redis(透過 Service 層)
- 所有數據從 DB 聚合,不使用假數據
- Graceful Degradation:各資料來源失敗獨立處理
- 統帥鐵律:台北時區(+8),禁止 UTC
報告流程:
日度巡檢: lifespan 啟動 → run_daily_report_loop() 無限迴圈
→ 計算距下一個 08:00 台北時間的秒數
→ sleep → 收集數據 → 組裝 → Telegram 推送
Postmortem: Incident resolve 時,由呼叫方 await trigger_postmortem(incident_id=..., title=..., created_at=..., resolved_at=..., ...)
"""
from __future__ import annotations
import asyncio
import html
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
import structlog
from src.utils.timezone import now_taipei
# Module-level structured logger.
logger = structlog.get_logger(__name__)
# Fixed-offset Taipei timezone (UTC+8).
_TZ_TAIPEI = timezone(timedelta(hours=8))
# Hour of day (Taipei time) at which the daily report fires, i.e. 08:00.
DAILY_REPORT_HOUR_TAIPEI = 8
# Minimum incident duration, in minutes, for a postmortem to be generated.
POSTMORTEM_MIN_DURATION_MINUTES = 10
# =============================================================================
# Data Types
# =============================================================================
@dataclass
class DailyKpi:
    """KPI summary for one 24-hour reporting window."""

    # Boundaries of the reporting window.
    period_start: datetime
    period_end: datetime
    # Alert counters.
    total_alerts: int = 0
    auto_resolved: int = 0
    human_approved: int = 0
    converged_alerts: int = 0
    grouped_alerts: int = 0
    # Auto-repair outcome counters.
    auto_repair_success: int = 0
    auto_repair_failed: int = 0
    # Knowledge flywheel counters.
    km_new_entries: int = 0
    playbook_count: int = 0
    # Alert category name -> number of alerts in that category.
    alert_category_breakdown: dict[str, int] = field(default_factory=dict)

    @property
    def auto_repair_rate(self) -> float:
        """Return the share of auto-repair attempts that succeeded (0.0 if none ran)."""
        attempts = self.auto_repair_success + self.auto_repair_failed
        if attempts > 0:
            return self.auto_repair_success / attempts
        return 0.0

    @property
    def auto_resolve_rate(self) -> float:
        """Return the share of alerts counted as auto-resolved (0.0 if no alerts)."""
        if self.total_alerts > 0:
            return self.auto_resolved / self.total_alerts
        return 0.0
@dataclass
class PostmortemData:
    """Input data for rendering one postmortem report."""
    # Identifier and title of the incident being reviewed.
    incident_id: str
    title: str
    # Incident duration in minutes.
    duration_minutes: float
    # Optional free-text fields; each may be None when unavailable.
    root_cause: str | None
    resolution_action: str | None
    ai_provider: str | None
    # True when the incident was repaired automatically, False for human intervention.
    auto_repaired: bool
    # Number of retries.
    retry_count: int
    # Incident creation and resolution timestamps.
    created_at: datetime
    resolved_at: datetime
# =============================================================================
# ReportGenerationService
# =============================================================================
class ReportGenerationService:
"""
自動報告生成服務
統帥指令 (2026-04-14):
- 日度巡檢報告:每日 08:00 台北時間
- 事後檢討:Incident resolved 且 duration > 10 分鐘
- 所有報告推送至 Telegram SRE 群組
"""
async def collect_daily_kpi(self) -> DailyKpi:
    """Collect the KPIs for the 24 hours ending now (Taipei time).

    The four collectors run concurrently. A collector that raises (or returns
    an unexpected type) does not abort the others: its KPI fields keep their
    defaults and a warning is logged.

    Returns:
        The populated DailyKpi summary.
    """
    window_end = now_taipei()
    window_start = window_end - timedelta(hours=24)
    kpi = DailyKpi(period_start=window_start, period_end=window_end)

    # return_exceptions=True turns each failure into a result value so that
    # every source can be inspected independently below.
    alert_stats, repair_stats, km_stats, playbook_count = await asyncio.gather(
        self._collect_alert_stats(window_start),
        self._collect_repair_stats(window_start),
        self._collect_km_stats(window_start),
        self._collect_playbook_count(),
        return_exceptions=True,
    )

    if not isinstance(alert_stats, dict):
        logger.warning("daily_kpi_alert_stats_failed", error=str(alert_stats))
    else:
        kpi.total_alerts = alert_stats.get("total", 0)
        kpi.auto_resolved = alert_stats.get("auto_resolved", 0)
        kpi.human_approved = alert_stats.get("human_approved", 0)
        kpi.converged_alerts = alert_stats.get("converged", 0)
        kpi.alert_category_breakdown = alert_stats.get("categories", {})

    if not isinstance(repair_stats, dict):
        logger.warning("daily_kpi_repair_stats_failed", error=str(repair_stats))
    else:
        kpi.auto_repair_success = repair_stats.get("success", 0)
        kpi.auto_repair_failed = repair_stats.get("failed", 0)

    if not isinstance(km_stats, int):
        logger.warning("daily_kpi_km_stats_failed", error=str(km_stats))
    else:
        kpi.km_new_entries = km_stats

    if not isinstance(playbook_count, int):
        logger.warning("daily_kpi_playbook_count_failed", error=str(playbook_count))
    else:
        kpi.playbook_count = playbook_count

    return kpi
async def _collect_alert_stats(self, since: datetime) -> dict:
    """Collect alert statistics from the incident table.

    Args:
        since: Lower bound (inclusive) on the incident creation time.

    Returns:
        A dict with keys ``total``, ``auto_resolved``, ``human_approved``,
        ``converged`` and ``categories``. ``human_approved`` and ``converged``
        are currently always 0.
    """
    from sqlalchemy import func, select
    from sqlalchemy import text as sa_text
    from src.db.base import get_db_context
    from src.db.models import IncidentRecord

    incident_count = select(func.count()).select_from(IncidentRecord)

    async with get_db_context() as db:
        # All incidents created inside the window.
        total = await db.scalar(
            incident_count.where(IncidentRecord.created_at >= since)
        ) or 0

        # Incidents in the window whose status is "resolved". The query does
        # not filter on human approval, so this counts every resolved incident.
        auto_resolved = await db.scalar(
            incident_count.where(
                IncidentRecord.created_at >= since,
                IncidentRecord.status == "resolved",
            )
        ) or 0

        # Top-10 alert categories; failures here are logged and leave the
        # breakdown empty (or partial) without failing the other counts.
        categories: dict[str, int] = {}
        try:
            category_rows = await db.execute(
                sa_text(
                    "SELECT alert_category, COUNT(*) as cnt "
                    "FROM incidents "
                    "WHERE created_at >= :since AND alert_category IS NOT NULL "
                    "GROUP BY alert_category "
                    "ORDER BY cnt DESC "
                    "LIMIT 10"
                ).bindparams(since=since)
            )
            for category_name, category_total in category_rows:
                categories[category_name] = category_total
        except Exception as breakdown_error:
            logger.debug("alert_category_breakdown_failed", error=str(breakdown_error))

    stats = {
        "total": total,
        "auto_resolved": auto_resolved,
        "human_approved": 0,  # TODO: count from the signatures table
        "converged": 0,  # already tracked via DB hit_count; omitted for now
        "categories": categories,
    }
    return stats
async def _collect_repair_stats(self, since: datetime) -> dict:
    """Count successful and failed auto-repair executions since ``since``.

    Reads ``approval_records`` and counts rows whose status is
    EXECUTION_SUCCESS / EXECUTION_FAILED, excluding rows the query classifies
    as observe-only (no action, OBSERVE / INVESTIGATE / NO_ACTION actions).

    Change note carried over from the original (2026-04-22): the earlier
    implementation read ``incidents.outcome`` and always returned 0, so
    ``approval_records.status`` is used as the source of truth instead.

    Args:
        since: Lower bound (inclusive) on ``approval_records.created_at``.

    Returns:
        ``{"success": int, "failed": int}``.
    """
    from sqlalchemy import text
    from src.db.base import get_db_context

    repair_query = text("""
WITH scoped AS (
SELECT
*,
(
COALESCE(extra_metadata->>'execution_kind', '') = 'no_action'
OR COALESCE(extra_metadata->>'repair_executed', '') = 'false'
OR btrim(coalesce(action, '')) = ''
OR UPPER(action) LIKE 'OBSERVE%'
OR UPPER(action) LIKE 'INVESTIGATE%'
OR UPPER(action) LIKE 'NO_ACTION%'
OR UPPER(action) LIKE '% NO_ACTION%'
OR UPPER(action) LIKE '%| NO_ACTION%'
) AS is_observe_only
FROM approval_records
WHERE created_at >= :since
)
SELECT
COUNT(*) FILTER (
WHERE UPPER(status::text) = 'EXECUTION_SUCCESS'
AND NOT is_observe_only
) AS success,
COUNT(*) FILTER (
WHERE UPPER(status::text) = 'EXECUTION_FAILED'
AND NOT is_observe_only
) AS failed
FROM scoped
""")

    async with get_db_context() as db:
        result = await db.execute(repair_query, {"since": since})
        counts = result.one()

    succeeded = int(counts.success or 0)
    failed = int(counts.failed or 0)
    return {"success": succeeded, "failed": failed}
async def _collect_km_stats(self, since: datetime) -> int:
    """Return how many knowledge (KM) entries were created at or after ``since``."""
    from sqlalchemy import func, select
    from src.db.base import get_db_context
    from src.db.models import KnowledgeEntryRecord

    new_entries_stmt = (
        select(func.count())
        .select_from(KnowledgeEntryRecord)
        .where(KnowledgeEntryRecord.created_at >= since)
    )
    async with get_db_context() as db:
        new_entries = await db.scalar(new_entries_stmt)
    # A missing/NULL scalar is reported as zero.
    return int(new_entries or 0)
async def _collect_playbook_count(self) -> int:
    """Return the number of playbooks reported by the playbook service.

    Change note carried over from the original (2026-04-14): playbooks are
    read through ``playbook_service.list_playbooks()`` rather than PostgreSQL.

    Returns:
        The service-reported total, falling back to the length of the returned
        page when the total is falsy; 0 if the service call fails.
    """
    from src.services.playbook_service import get_playbook_service

    try:
        service = get_playbook_service()
        page, reported_total = await service.list_playbooks(limit=1000)
        if reported_total:
            return int(reported_total)
        return int(len(page))
    except Exception as exc:
        logger.warning("daily_kpi_playbook_count_failed", error=str(exc))
        return 0
def _format_report_source_health_block(
    self,
    source_health: dict[str, Any] | None,
) -> list[str]:
    """Format the read-only "report source health" section of a report.

    Args:
        source_health: Source-health payload. Keys read here are ``rollups``,
            ``source_health``, ``automation_assets`` and
            ``all_zero_assessment``; each may be missing or None.

    Returns:
        The section as a list of lines (leading blank line included), or an
        empty list when ``source_health`` is None or empty. Every
        payload-derived string is HTML-escaped.
    """
    if not source_health:
        return []

    rollups = source_health.get("rollups") or {}
    ok_count = int(rollups.get("source_ok_count") or 0)
    total_count = int(rollups.get("source_count") or 0)
    confidence = int(rollups.get("confidence_percent") or 0)

    # `or []` (instead of a .get() default) also covers a key that is present
    # but explicitly None, matching how the other keys are read.
    sources = source_health.get("source_health") or []
    gap_ids = [
        str(source.get("work_item_id"))
        for source in sources
        if source.get("work_item_id")
    ][:5]
    gap_text = ", ".join(gap_ids) if gap_ids else "無"

    lines = [
        "",
        "🧾 報表資料源 / 沉澱",
        f" 來源: {ok_count}/{total_count} | 信心: {confidence}%",
        f" 缺口: {html.escape(gap_text)}",
    ]

    # At most five automation assets are listed.
    for asset in (source_health.get("automation_assets") or [])[:5]:
        label = html.escape(str(asset.get("label") or "資產"))
        state = html.escape(str(asset.get("state") or "unknown"))
        done = int(asset.get("done_count") or 0)
        blocked = int(asset.get("blocked_count") or 0)
        lines.append(f" {label}: {state} {done}/{done + blocked}")

    assessment = source_health.get("all_zero_assessment") or {}
    if assessment.get("all_zero_observed"):
        verdict = html.escape(str(assessment.get("verdict") or "source_gap_requires_review"))
        lines.append(f" 全 0 判讀: {verdict}")
        lines.append(" 只讀判讀:不自動改排程、不直接發修復、不取代人工批准。")
    return lines
def format_monthly_report_preview(
    self,
    source_health: dict[str, Any] | None,
    *,
    generated_at: datetime | None = None,
) -> str:
    """Render the monthly report as a no-send preview.

    Args:
        source_health: Unified source-health payload; the first entry of
            ``no_send_previews`` whose ``cadence_id`` is ``"monthly"`` supplies
            the delivery state, owner and gap sources. None is treated as empty.
        generated_at: Timestamp shown in the header; defaults to the current
            Taipei time.

    Returns:
        The preview text, lines joined with newlines.
    """
    stamp = generated_at or now_taipei()
    health = source_health or {}

    # Pick the first preview declared for the monthly cadence, if any.
    monthly: dict[str, Any] = {}
    for candidate in health.get("no_send_previews") or []:
        if candidate.get("cadence_id") == "monthly":
            monthly = candidate
            break

    gap_ids = monthly.get("gap_source_ids") or []
    if gap_ids:
        # Only the first five gap sources are shown.
        gap_text = ", ".join(str(gap_id) for gap_id in gap_ids[:5])
    else:
        gap_text = "無"

    delivery_state = html.escape(str(monthly.get('delivery_state') or 'no_send_preview'))
    owner = html.escape(str(monthly.get('owner_agent') or '未指定'))

    header = [
        "📊 AWOOOI 月報 no-send preview",
        f"{stamp.strftime('%Y-%m')} | {stamp.strftime('%Y-%m-%d %H:%M')} 台北時間",
        "",
        "🧭 月報交付狀態",
        f" 狀態: {delivery_state}",
        f" Owner: {owner}",
        f" 缺口來源: {html.escape(gap_text)}",
        " 實發: 0 | Gateway queue write: 0",
    ]
    footer = [
        "",
        "🤖 AWOOOI 月報草案 | no-send preview,不代表已授權發送或自動修復",
    ]
    body = self._format_report_source_health_block(health)
    return "\n".join([*header, *body, *footer])
def format_daily_report(
    self,
    kpi: DailyKpi,
    source_health: dict[str, Any] | None = None,
) -> str:
    """Render the daily inspection report (Telegram HTML format).

    Args:
        kpi: The 24-hour KPI summary.
        source_health: Optional read-only source-health payload appended as
            its own section.

    Returns:
        The report text, lines joined with newlines. Alert category names are
        HTML-escaped because the text is sent with Telegram's HTML parse mode.
    """
    date_str = kpi.period_end.strftime("%Y-%m-%d")
    period_str = f"{kpi.period_start.strftime('%H:%M')} ~ {kpi.period_end.strftime('%H:%M')}"
    auto_repair_rate_pct = f"{kpi.auto_repair_rate * 100:.1f}%"
    auto_resolve_rate_pct = f"{kpi.auto_resolve_rate * 100:.1f}%"

    # Overall health is derived from the auto-repair success rate only.
    # NOTE(review): a window with no repair attempts has rate 0.0 and is
    # therefore shown as red — confirm this is intended.
    if kpi.auto_repair_rate >= 0.8:
        health_icon, health_label = "💚", "優秀"
    elif kpi.auto_repair_rate >= 0.5:
        health_icon, health_label = "🟡", "良好"
    else:
        health_icon, health_label = "🔴", "需關注"

    lines = [
        "📊 AWOOOI 日度巡檢報告",
        f"{date_str} | {period_str} 台北時間",
        "",
        f"{health_icon} 整體健康度: {health_label}",
        "",
        "🚨 告警統計",
        f" 總計: {kpi.total_alerts} 個",
        f" 自動解決: {kpi.auto_resolved} 個 ({auto_resolve_rate_pct})",
        f" 人工批准: {kpi.human_approved} 個",
        f" 告警收斂: {kpi.converged_alerts} 個",
    ]

    # Category breakdown: at most six entries. Names come from stored alert
    # data, so they are escaped to keep the HTML message well-formed.
    if kpi.alert_category_breakdown:
        lines += ["", "📂 分類分佈"]
        lines += [
            f" • {html.escape(str(cat))}: {cnt}"
            for cat, cnt in list(kpi.alert_category_breakdown.items())[:6]
        ]

    lines += [
        "",
        "🔧 自動修復",
        f" 成功: {kpi.auto_repair_success} 次",
        f" 失敗: {kpi.auto_repair_failed} 次",
        f" 成功率: {auto_repair_rate_pct}",
        "",
        "🧠 知識積累",
        f" 新增 KM 條目: {kpi.km_new_entries} 筆",
        f" 活躍 Playbook: {kpi.playbook_count} 個",
    ]
    lines.extend(self._format_report_source_health_block(source_health))
    lines += [
        "",
        f"🤖 AWOOOI AIOps 自動生成 | {kpi.period_end.strftime('%Y-%m-%d %H:%M')} 台北時間",
    ]
    return "\n".join(lines)
async def collect_report_source_health(self, days: int) -> dict[str, Any] | None:
    """Fetch the report source-health payload; never raises.

    Args:
        days: Look-back window passed to the source-health builder.

    Returns:
        The payload from ``build_ai_agent_report_source_health``, or None when
        the import or the call fails (a warning is logged).
    """
    try:
        from src.services.ai_agent_report_source_health import (
            build_ai_agent_report_source_health,
        )

        payload = await build_ai_agent_report_source_health(days=days)
    except Exception as exc:
        logger.warning("daily_report_source_health_failed", error=str(exc))
        return None
    return payload
def format_postmortem(self, data: PostmortemData) -> str:
    """Render a postmortem report (Telegram HTML format).

    Free-text fields (incident id, root cause, resolution action, AI provider)
    are HTML-escaped: the report is sent with Telegram's HTML parse mode, where
    a raw ``<`` or ``&`` in e.g. an AI-written root cause makes the message
    unparseable.

    Args:
        data: The postmortem data to render.

    Returns:
        The report text, lines joined with newlines. Root cause is truncated
        to 300 characters and resolution action to 200, before escaping so
        that an entity is never cut in half.
    """
    duration_str = f"{data.duration_minutes:.1f} 分鐘"
    auto_str = "✅ 自動修復" if data.auto_repaired else "👤 人工介入"
    retry_str = f"(重試 {data.retry_count} 次)" if data.retry_count > 0 else ""
    created_str = data.created_at.strftime("%H:%M:%S")
    resolved_str = data.resolved_at.strftime("%H:%M:%S")

    lines = [
        "📋 事後檢討 (Postmortem)",
        f"Incident: {html.escape(str(data.incident_id))}",
        "",
        f"⏱ 影響時長: {duration_str}",
        f"🕐 發生: {created_str} → 解決: {resolved_str}",
        f"🔧 處置方式: {auto_str}{retry_str}",
    ]
    # Optional sections are emitted only when the field is non-empty.
    if data.root_cause:
        lines += [f"\n🔍 根本原因\n{html.escape(data.root_cause[:300])}"]
    if data.resolution_action:
        lines += [f"\n⚡ 執行動作\n{html.escape(data.resolution_action[:200])}"]
    if data.ai_provider:
        lines += [f"\nAI 決策: {html.escape(str(data.ai_provider))}"]
    lines += [
        "",
        f"🤖 AWOOOI Postmortem 自動生成 | {now_taipei().strftime('%Y-%m-%d %H:%M')} 台北時間",
    ]
    return "\n".join(lines)
async def send_daily_report(self) -> None:
    """Collect the KPIs, render the daily report and push it to the Telegram group.

    Any failure along the way is logged as ``daily_report_failed`` and
    swallowed; this method does not raise.
    """
    try:
        daily_kpi = await self.collect_daily_kpi()
        health = await self.collect_report_source_health(days=1)
        message = self.format_daily_report(daily_kpi, health)

        from src.services.telegram_gateway import get_telegram_gateway

        await get_telegram_gateway().send_to_group(message, parse_mode="HTML")
        logger.info(
            "daily_report_sent",
            total_alerts=daily_kpi.total_alerts,
            auto_repair_rate=f"{daily_kpi.auto_repair_rate:.1%}",
        )
    except Exception as exc:
        logger.error("daily_report_failed", error=str(exc))
async def trigger_postmortem(
    self,
    incident_id: str,
    title: str,
    created_at: datetime,
    resolved_at: datetime,
    root_cause: str | None = None,
    resolution_action: str | None = None,
    ai_provider: str | None = None,
    auto_repaired: bool = False,
    retry_count: int = 0,
) -> None:
    """Generate, persist and send a postmortem for a resolved incident.

    Nothing is generated when the incident lasted less than
    POSTMORTEM_MIN_DURATION_MINUTES. Otherwise the report is rendered, stored
    as a KM entry, and sent to the Telegram group with up to three attempts.
    If every attempt fails, an error is logged and a short fallback notice is
    sent so the failure is not silent.

    Args:
        incident_id: Incident ID.
        title: Incident title.
        created_at: When the incident was created.
        resolved_at: When the incident was resolved. Must be comparable with
            ``created_at`` (both naive or both timezone-aware).
        root_cause: Root cause text, if available.
        resolution_action: Action that was executed, if available.
        ai_provider: AI provider that made the decision, if available.
        auto_repaired: Whether the incident was repaired automatically.
        retry_count: Number of retries.
    """
    duration_minutes = (resolved_at - created_at).total_seconds() / 60
    if duration_minutes < POSTMORTEM_MIN_DURATION_MINUTES:
        logger.debug(
            "postmortem_skipped_short_duration",
            incident_id=incident_id,
            duration_minutes=duration_minutes,
            min_required=POSTMORTEM_MIN_DURATION_MINUTES,
        )
        return

    data = PostmortemData(
        incident_id=incident_id,
        title=title,
        duration_minutes=duration_minutes,
        root_cause=root_cause,
        resolution_action=resolution_action,
        ai_provider=ai_provider,
        auto_repaired=auto_repaired,
        retry_count=retry_count,
        created_at=created_at,
        resolved_at=resolved_at,
    )
    report_text = self.format_postmortem(data)
    # Persist first so the postmortem is kept even if Telegram delivery fails.
    await self._persist_postmortem_km(data, report_text)

    from src.services.telegram_gateway import get_telegram_gateway

    gateway = get_telegram_gateway()
    max_attempts = 3
    backoff_seconds = 2.0
    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            await gateway.send_to_group(report_text, parse_mode="HTML")
        except Exception as e:
            last_error = e
            logger.warning(
                "postmortem_send_retry",
                incident_id=incident_id,
                attempt=attempt,
                max_attempts=max_attempts,
                error=str(e),
            )
            if attempt < max_attempts:
                # Wait 2s, then 4s, between attempts.
                await asyncio.sleep(backoff_seconds * attempt)
        else:
            # Logged outside the try so a logging error cannot cause a resend.
            logger.info(
                "postmortem_sent",
                incident_id=incident_id,
                duration_minutes=duration_minutes,
                attempt=attempt,
            )
            return

    # Every attempt failed: record the error and try a simplified notice.
    logger.error(
        "postmortem_failed",
        incident_id=incident_id,
        error=str(last_error),
        attempts=max_attempts,
    )
    try:
        # The notice is sent in HTML mode, and exception text routinely
        # contains "<" / "&", so dynamic parts are escaped; otherwise the
        # fallback could be rejected for the same reason as the report.
        fallback_text = (
            f"⚠️ Postmortem 發送失敗 ({max_attempts} 次重試)\n"
            f"Incident: {html.escape(str(incident_id))}\n"
            f"Duration: {duration_minutes:.1f} 分鐘\n"
            f"Error: {html.escape(str(last_error)[:200])}"
        )
        await gateway.send_to_group(fallback_text, parse_mode="HTML")
    except Exception as _fe:
        logger.error(
            "postmortem_fallback_failed",
            incident_id=incident_id,
            error=str(_fe),
        )
async def _persist_postmortem_km(
    self,
    data: PostmortemData,
    report_text: str,
) -> None:
    """Store the rendered postmortem as a knowledge (KM) entry and log the conversion.

    Best-effort: any exception is logged as ``postmortem_km_persist_failed``
    and swallowed, so a persistence failure never blocks the caller.

    NOTE(review): the original summary called the entry "idempotent", but
    ``repo.create`` is called unconditionally here — any de-duplication would
    have to happen inside the repository; confirm.

    Args:
        data: Postmortem data; supplies the incident id, title, duration and
            the auto-repaired flag used for tagging.
        report_text: The rendered report, stored as the entry content.
    """
    try:
        from src.db.base import get_db_context
        from src.models.knowledge import (
            EntrySource,
            EntryStatus,
            EntryType,
            KnowledgeEntryCreate,
        )
        from src.repositories.alert_operation_log_repository import (
            get_alert_operation_log_repository,
        )
        from src.repositories.knowledge_repository import KnowledgeDBRepository
        async with get_db_context() as db:
            repo = KnowledgeDBRepository(db)
            # Entry is created in REVIEW status; the title is capped at 255 characters.
            entry = await repo.create(
                KnowledgeEntryCreate(
                    title=f"Postmortem {data.incident_id}: {data.title}"[:255],
                    content=report_text,
                    entry_type=EntryType.POSTMORTEM,
                    category="postmortem",
                    tags=[
                        "postmortem",
                        "incident",
                        "telegram",
                        "auto_repaired" if data.auto_repaired else "human_intervention",
                    ],
                    source=EntrySource.AI_EXTRACTED,
                    status=EntryStatus.REVIEW,
                    related_incident_id=data.incident_id,
                    path_type="postmortem",
                    created_by="report_generation_service",
                )
            )
        # Record the conversion in the alert operation log.
        # NOTE(review): whether this append is meant to run inside or after
        # the DB context could not be determined from the reviewed copy — confirm.
        await get_alert_operation_log_repository().append(
            "KM_CONVERTED",
            incident_id=data.incident_id,
            actor="report_generation_service",
            action_detail="postmortem_persisted",
            success=True,
            context={
                "knowledge_entry_id": entry.id,
                "entry_type": EntryType.POSTMORTEM.value,
                "path_type": "postmortem",
                "duration_minutes": round(data.duration_minutes, 2),
            },
        )
        logger.info(
            "postmortem_km_persisted",
            incident_id=data.incident_id,
            knowledge_entry_id=entry.id,
        )
    except Exception as e:
        # Deliberately broad: persistence is best-effort.
        logger.warning(
            "postmortem_km_persist_failed",
            incident_id=data.incident_id,
            error=str(e),
        )
# =============================================================================
# 日度報告排程迴圈
# =============================================================================
def _seconds_until_next_report() -> float:
    """Return the number of seconds until the next daily report time (Taipei).

    The target is today's DAILY_REPORT_HOUR_TAIPEI:00:00; if that moment has
    already been reached, tomorrow's is used instead.
    """
    current = now_taipei()
    next_run = current.replace(
        hour=DAILY_REPORT_HOUR_TAIPEI,
        minute=0,
        second=0,
        microsecond=0,
    )
    if next_run <= current:
        # Today's slot has passed (or is exactly now): schedule for tomorrow.
        next_run = next_run + timedelta(days=1)
    remaining = next_run - current
    return remaining.total_seconds()
async def run_daily_report_loop() -> None:
    """Run the daily inspection report scheduler forever.

    Each iteration sleeps until the next report time, tries to take the
    cross-pod daily lock, and sends the report if the lock was acquired.
    Intended to be started with ``asyncio.create_task()``.

    An exception raised while acquiring the lock or sending the report is
    logged and that day's run is skipped; the loop itself keeps running.
    Task cancellation is not intercepted.
    """
    service = ReportGenerationService()
    logger.info(
        "daily_report_loop_started",
        trigger_hour_taipei=DAILY_REPORT_HOUR_TAIPEI,
    )
    while True:
        sleep_seconds = _seconds_until_next_report()
        logger.info(
            "daily_report_next_in",
            sleep_seconds=int(sleep_seconds),
            next_at=f"{DAILY_REPORT_HOUR_TAIPEI:02d}:00 台北時間",
        )
        await asyncio.sleep(sleep_seconds)
        try:
            # Multi-pod guard: only the pod that wins the daily lock sends.
            from src.services.ai_advisory_helpers import try_acquire_daily_lock

            if not await try_acquire_daily_lock("daily_report"):
                logger.info("daily_report_skipped_other_pod")
                continue
            logger.info("daily_report_triggered")
            await service.send_daily_report()
        except Exception as exc:
            # Without this guard a lock-backend error would end the task and
            # silently stop all future reports. The run is skipped rather than
            # sent unlocked, to avoid duplicate reports from several pods.
            logger.error("daily_report_loop_iteration_failed", error=str(exc))
# =============================================================================
# Factory Function
# =============================================================================
# Lazily created module-level singleton.
_instance: ReportGenerationService | None = None


def get_report_generation_service() -> ReportGenerationService:
    """Return the shared ReportGenerationService, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = ReportGenerationService()
    return _instance