Files
awoooi/apps/api/src/services/weekly_report_service.py
2026-05-05 22:14:54 +08:00

297 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Weekly Report Service - Phase 21.3 定期報告
============================================
整合 StatsService + K3sMonitorService,生成週報並推送 Telegram。
數據來源:
- StatsService (告警/AI 效能統計)
- K3sMonitorService (K3s 健康)
- Git CLI (開發活動,透過本地 `git log` 統計)
符合 leWOOOgo 積木化規範:
- Service 層封裝所有邏輯
- 透過 DI 注入依賴
@author Claude Code (首席架構師)
@version 1.0.0
@date 2026-03-31 (台北時間)
@see ADR-041-periodic-reporting-architecture.md
"""
import asyncio
import subprocess
from datetime import datetime, timedelta
from typing import Protocol, runtime_checkable
import structlog
from zoneinfo import ZoneInfo
from src.services.stats_service import StatsService, get_stats_service
from src.services.k3s_monitor_service import K3sMonitorService, get_k3s_monitor_service
from src.services.telegram_gateway import WeeklyReportMessage, get_telegram_gateway
# Module-level structured logger
logger = structlog.get_logger(__name__)
# Taipei time zone; week boundaries and report timestamps are computed in it
TZ_TAIPEI = ZoneInfo("Asia/Taipei")
# =============================================================================
# Protocol (Interface)
# =============================================================================
@runtime_checkable
class IWeeklyReportService(Protocol):
    """
    Interface of the weekly report service.

    Phase 21.3: structural Protocol used as the dependency-injection contract.
    Any object exposing both coroutines below satisfies it.
    """

    async def generate_report(self) -> WeeklyReportMessage:
        """Build and return the weekly report message."""

    async def send_weekly_report(self) -> bool:
        """Send the weekly report; the returned bool reports the outcome."""
# =============================================================================
# Implementation
# =============================================================================
class WeeklyReportService:
    """
    Weekly report service implementation.

    Collects figures from several sources (incident / AI statistics, K3s
    cluster status, local git history, disposition counters), assembles them
    into a ``WeeklyReportMessage`` and pushes the formatted text to Telegram.
    Every source is best-effort: a failure is logged as a warning and replaced
    by zero/empty values so that a report can still be produced.
    """

    # Severity labels (compared upper-cased) that count as critical alerts.
    _CRITICAL_SEVERITIES = ("P0", "CRITICAL")
    # Working directory for git commands (app directory inside the K8s container).
    _GIT_WORKDIR = "/app"
    # Per-command timeout for git, in seconds.
    _GIT_TIMEOUT_SECONDS = 10

    def __init__(
        self,
        stats_service: StatsService | None = None,
        k3s_monitor: K3sMonitorService | None = None,
    ):
        """
        Args:
            stats_service: Statistics provider; falls back to
                ``get_stats_service()`` when omitted.
            k3s_monitor: Cluster monitor; falls back to
                ``get_k3s_monitor_service()`` when omitted.
        """
        self._stats_service = stats_service or get_stats_service()
        self._k3s_monitor = k3s_monitor or get_k3s_monitor_service()

    @staticmethod
    def _iso_week_label(moment: datetime) -> str:
        """Return the ISO-8601 week label (``YYYY-Www``) of ``moment``."""
        # Use the ISO year, not the calendar year: around New Year they differ
        # (2025-12-29 belongs to 2026-W01, 2027-01-01 to 2026-W53).
        iso_year, iso_week, _ = moment.isocalendar()
        return f"{iso_year}-W{iso_week:02d}"

    @staticmethod
    def _count_critical(severity_distribution: list[dict] | None) -> int:
        """Sum the ``count`` of the entries whose ``severity`` is critical."""
        # Missing/None fields are tolerated so that one malformed entry cannot
        # abort the whole report.
        return sum(
            entry.get("count") or 0
            for entry in severity_distribution or []
            if str(entry.get("severity") or "").upper()
            in WeeklyReportService._CRITICAL_SEVERITIES
        )

    @staticmethod
    def _resolved_count(total: int, resolved_rate_pct: float) -> int:
        """Convert a resolved percentage back into a number of incidents."""
        if total <= 0:
            return 0
        # round(), not int(): a percentage that was rounded upstream must not
        # lose a whole incident to truncation (3 * 33.33 / 100 -> 1, not 0).
        return round(total * resolved_rate_pct / 100)

    @staticmethod
    def _count_git_commits(since_arg: str, *extra_args: str) -> int:
        """
        Count the commits printed by ``git log <since_arg> --oneline <extra_args>``.

        Returns 0 (after logging a warning) when git exits with a non-zero status.
        """
        result = subprocess.run(
            ["git", "log", since_arg, "--oneline", *extra_args],
            capture_output=True,
            text=True,
            timeout=WeeklyReportService._GIT_TIMEOUT_SECONDS,
            cwd=WeeklyReportService._GIT_WORKDIR,
        )
        if result.returncode != 0:
            logger.warning("git_stats_failed", error=result.stderr.strip())
            return 0
        # --oneline prints exactly one line per commit.
        return len(result.stdout.strip().splitlines())

    def _get_week_range(self) -> tuple[str, datetime, datetime]:
        """
        Return the current week in Taipei time.

        Returns:
            ``(label, monday, sunday)`` where ``label`` is the ISO week label,
            ``monday`` is Monday 00:00:00 and ``sunday`` is Sunday 23:59:59.
        """
        now = datetime.now(TZ_TAIPEI)
        # weekday() is 0 on Monday, so this steps back to this week's Monday.
        monday = (now - timedelta(days=now.weekday())).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        sunday = monday + timedelta(days=6, hours=23, minutes=59, seconds=59)
        return self._iso_week_label(now), monday, sunday

    def _get_git_stats(self, since: datetime) -> tuple[int, int]:
        """
        Count commits and deploy-related commits made since ``since``.

        This call blocks while ``git`` runs; async callers should dispatch it
        to a worker thread.

        Returns:
            ``(commits, deploys)``; ``(0, 0)`` when git cannot be run at all.
        """
        try:
            # Pass a full timestamp: with a date-only value git fills in the
            # *current* time of day, which drops commits made earlier on the
            # first day of the range.
            since_arg = f"--since={since.isoformat(timespec='seconds')}"
            commits = self._count_git_commits(since_arg)
            # Deploys are approximated by commits whose message matches
            # "deploy" or "CD" (multiple --grep patterns are OR-ed).
            # NOTE(review): with -i, "CD" also matches any "cd" substring,
            # which may over-count — confirm this is acceptable.
            deploys = self._count_git_commits(
                since_arg, "--grep=deploy", "--grep=CD", "-i"
            )
            return commits, deploys
        except Exception as e:
            # Best-effort: a missing git binary / repo or a timeout must not
            # prevent the report from being generated.
            logger.warning("git_stats_failed", error=str(e))
            return 0, 0

    async def generate_report(self) -> WeeklyReportMessage:
        """
        Generate the weekly report by combining all data sources.

        A failing source is logged and its figures fall back to zero/empty
        values.

        Returns:
            The populated ``WeeklyReportMessage``.
        """
        now = datetime.now(TZ_TAIPEI)
        week_range, monday, _sunday = self._get_week_range()
        report_date = now.strftime("%Y-%m-%d %H:%M")

        # Statistics over the last 7 days; if any call fails all three are
        # reset together.
        try:
            incident_summary = await self._stats_service.get_incident_summary(days=7)
            resolution_stats = await self._stats_service.get_resolution_stats(days=7)
            ai_performance = await self._stats_service.get_ai_performance(days=7)
        except Exception as e:
            logger.warning("stats_fetch_failed", error=str(e))
            incident_summary = {}
            resolution_stats = {}
            ai_performance = {}

        # K3s cluster status.
        try:
            k3s_status = await self._k3s_monitor.collect_cluster_status()
        except Exception as e:
            logger.warning("k3s_fetch_failed", error=str(e))
            k3s_status = None

        # Git statistics: the subprocess calls block, so run them in a worker
        # thread to keep the event loop responsive.
        commits, deploys = await asyncio.to_thread(self._get_git_stats, monday)

        # Alert metrics.
        total_incidents = incident_summary.get("total_incidents", 0)
        resolved_rate = incident_summary.get("resolved_rate", 0.0)
        critical_count = self._count_critical(
            incident_summary.get("severity_distribution")
        )

        # AI performance.
        ai_proposals = ai_performance.get("total_proposals", 0)
        ai_executed = ai_performance.get("executed_count", 0)
        ai_success_rate = ai_performance.get("success_rate", 0.0)

        # Average response time; a missing or None value becomes 0.0.
        avg_response = resolution_stats.get("avg_minutes") or 0.0

        # K3s metrics.
        k3s_uptime = 99.9  # placeholder value; should come from Prometheus
        # NOTE(review): the attribute name suggests a 48h window, not a full
        # week — confirm against K3sMonitorService.
        pod_restarts = k3s_status.pod_restart_48h if k3s_status else 0
        hpa_events = 0  # HPA events still need to be fetched from Prometheus

        # 2026-04-07 Claude Code: Sprint 4 F1 — disposition distribution.
        disp_auto = disp_human = disp_manual = disp_cold = disp_total = 0
        try:
            # Imported here rather than at module level, as in the original.
            from src.services.anomaly_counter import get_anomaly_counter

            counter = get_anomaly_counter()
            disp_summary, _ = await counter.get_all_disposition_stats()
            disp_auto = disp_summary.get("auto_repair", 0)
            disp_human = disp_summary.get("human_approved", 0)
            disp_manual = disp_summary.get("manual_resolved", 0)
            disp_cold = disp_summary.get("cold_start_trust", 0)
            disp_total = disp_summary.get("total", 0)
        except Exception as _disp_e:
            logger.warning("weekly_report_disposition_failed", error=str(_disp_e))

        # Assemble the report.
        report = WeeklyReportMessage(
            week_range=week_range,
            report_date=report_date,
            alert_total=total_incidents,
            alert_critical=critical_count,
            # resolved_rate is treated as a percentage (0-100).
            alert_resolved=self._resolved_count(total_incidents, resolved_rate),
            resolved_rate=resolved_rate,
            ai_proposal_count=ai_proposals,
            ai_executed_count=ai_executed,
            ai_success_rate=ai_success_rate,
            avg_response_minutes=avg_response,
            k3s_uptime_pct=k3s_uptime,
            pod_restart_total=pod_restarts,
            hpa_scale_events=hpa_events,
            commits_count=commits,
            deploy_count=deploys,
            ai_cost_week=0.0,  # to be filled from AI cost tracking
            ai_tokens_week=0,  # to be filled from AI cost tracking
            disposition_auto=disp_auto,
            disposition_human=disp_human,
            disposition_manual=disp_manual,
            disposition_cold_start=disp_cold,
            disposition_total=disp_total,
        )
        logger.info(
            "weekly_report_generated",
            week=week_range,
            alerts=total_incidents,
            commits=commits,
        )
        return report

    async def send_weekly_report(self) -> bool:
        """
        Generate the weekly report and push it to Telegram.

        Returns:
            ``True`` when the gateway reports a successful send; ``False`` when
            it reports a failure or any exception is raised (the exception is
            logged, never propagated).
        """
        try:
            report = await self.generate_report()

            # Initialise the Telegram gateway on first use.
            gateway = get_telegram_gateway()
            if not gateway._initialized:
                await gateway.initialize()

            formatted = report.format()
            result = await gateway.send_text(formatted)
            if result:
                logger.info("weekly_report_sent", week=report.week_range)
                return True
            logger.error("weekly_report_failed", week=report.week_range)
            return False
        except Exception as e:
            logger.error("weekly_report_error", error=str(e))
            return False
# =============================================================================
# Dependency Injection
# =============================================================================
_weekly_report_service: WeeklyReportService | None = None


def get_weekly_report_service() -> WeeklyReportService:
    """Return the shared WeeklyReportService, creating it on first call (singleton)."""
    global _weekly_report_service
    if _weekly_report_service is not None:
        return _weekly_report_service
    _weekly_report_service = WeeklyReportService()
    return _weekly_report_service
# =============================================================================
# CLI Entry Point (for CronJob)
# =============================================================================
async def main() -> int:
    """
    CLI entry point.

    Intended to be invoked by a K8s CronJob as:
        python -m src.services.weekly_report_service

    Returns:
        0 when the weekly report was sent, 1 otherwise.
    """
    sent = await get_weekly_report_service().send_weekly_report()
    if sent:
        return 0
    return 1
if __name__ == "__main__":
    import sys

    # Run the async entry point and use its result as the process exit status.
    sys.exit(asyncio.run(main()))