Files
ewoooc/services/roi_report_service.py
OoO c300e496c5
All checks were successful
CD Pipeline / deploy (push) Successful in 55s
記錄 ROI 月報反饋區塊失敗
2026-05-13 10:00:04 +08:00

254 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/roi_report_service.py
Operation Ollama-First v5.0 / Phase 24 — ROI 月報生成器
設計原則:
- 每月 1 日 09:00 跑(與 daily_report 同時段)
- SQL 統計上月 ai_calls × mcp_calls × rag_query_log
- 對比戰前 baselinehardcode 戰前估算數字)
- 推 Telegram「上月節省 X tokens / $Y / 攔截 Z 次 LLM 呼叫」
- 寫進 ai_insightstype='roi_monthly_report')作長期記錄
戰前 baseline戰役 v5.0 啟動前估算):
- Gemini 月 ~50M tokens / $20
- NIM 月 ~5.8M tokens
- Ollama 月 ~30M tokens自架免費
- Total ~$25/月(含 OpenRouter
"""
from __future__ import annotations
import os
import logging
from datetime import datetime, timedelta
from calendar import monthrange
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
# 戰前 baselinev5.0 啟動前估算,後續可從 ai_call_budgets 表動態算)
BASELINE = {
'gemini_monthly_tokens': 50_000_000,
'gemini_monthly_cost_usd': 20.0,
'nim_monthly_tokens': 5_800_000,
'ollama_monthly_tokens': 30_000_000,
'total_monthly_cost_usd': 25.0,
}
def _last_month_range(today: Optional[datetime] = None) -> tuple:
"""取上月 1 號 00:00 到 本月 1 號 00:00"""
today = today or datetime.now()
this_month_start = datetime(today.year, today.month, 1)
if today.month == 1:
last_month_start = datetime(today.year - 1, 12, 1)
else:
last_month_start = datetime(today.year, today.month - 1, 1)
return last_month_start, this_month_start
def query_last_month_stats() -> Dict[str, Any]:
"""SQL 查上月 ai_calls / mcp_calls / rag_query_log 統計"""
try:
from sqlalchemy import text as sa_text
from database.manager import get_session
except Exception as exc:
logger.warning('[ROI] DB import failed: %s', exc)
return {}
last_start, this_start = _last_month_range()
period_label = last_start.strftime('%Y年%m月')
session = get_session()
try:
# 1. ai_calls 統計
ai = session.execute(
sa_text("""
SELECT
COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
COALESCE(SUM(cost_usd), 0) AS total_cost,
COUNT(*) AS total_calls,
COUNT(*) FILTER (WHERE provider IN ('gcp_ollama','ollama_secondary','ollama_111')) AS ollama_calls,
COUNT(*) FILTER (WHERE provider = 'gemini') AS gemini_calls,
COUNT(*) FILTER (WHERE provider = 'claude') AS claude_calls,
COUNT(*) FILTER (WHERE provider IN ('nim','nim_via_elephant')) AS nim_calls,
COUNT(*) FILTER (WHERE rag_hit) AS rag_hit_calls,
COUNT(*) FILTER (WHERE cache_hit) AS cache_hit_calls,
COALESCE(SUM(input_tokens + output_tokens) FILTER (WHERE provider = 'gemini'), 0) AS gemini_tokens,
COALESCE(SUM(cost_usd) FILTER (WHERE provider = 'gemini'), 0) AS gemini_cost,
COALESCE(SUM(cost_usd) FILTER (WHERE provider = 'claude'), 0) AS claude_cost
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
"""),
{'start': last_start, 'end': this_start},
).fetchone()
# 2. mcp_calls 統計
try:
mcp = session.execute(
sa_text("""
SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE cache_hit) AS cache_hits
FROM mcp_calls
WHERE called_at >= :start AND called_at < :end
"""),
{'start': last_start, 'end': this_start},
).fetchone()
except Exception:
mcp = None
# 3. rag_query_log 統計
try:
rag = session.execute(
sa_text("""
SELECT
COUNT(*) AS total_queries,
COUNT(*) FILTER (WHERE saved_call) AS saved_calls,
AVG(feedback_score) FILTER (WHERE feedback_score IS NOT NULL) AS avg_feedback
FROM rag_query_log
WHERE queried_at >= :start AND queried_at < :end
"""),
{'start': last_start, 'end': this_start},
).fetchone()
except Exception:
rag = None
return {
'period_label': period_label,
'period_start': last_start,
'period_end': this_start,
'ai_total_tokens': int(ai[0] or 0) if ai else 0,
'ai_total_cost': float(ai[1] or 0) if ai else 0.0,
'ai_total_calls': int(ai[2] or 0) if ai else 0,
'ollama_calls': int(ai[3] or 0) if ai else 0,
'gemini_calls': int(ai[4] or 0) if ai else 0,
'claude_calls': int(ai[5] or 0) if ai else 0,
'nim_calls': int(ai[6] or 0) if ai else 0,
'rag_hit_calls': int(ai[7] or 0) if ai else 0,
'cache_hit_calls': int(ai[8] or 0) if ai else 0,
'gemini_tokens': int(ai[9] or 0) if ai else 0,
'gemini_cost': float(ai[10] or 0) if ai else 0.0,
'claude_cost': float(ai[11] or 0) if ai else 0.0,
'mcp_total': int(mcp[0] or 0) if mcp else 0,
'mcp_cache_hits': int(mcp[1] or 0) if mcp else 0,
'rag_total': int(rag[0] or 0) if rag else 0,
'rag_saved': int(rag[1] or 0) if rag else 0,
'rag_avg_feedback': float(rag[2] or 0) if rag and rag[2] else 0.0,
}
except Exception as exc:
logger.error('[ROI] query failed: %s', exc)
return {}
finally:
session.close()
def render_roi_report(stats: Dict[str, Any]) -> str:
"""組 Telegram 訊息HTML format"""
if not stats:
return "⚠️ ROI 月報資料查詢失敗,請查 logs"
period = stats['period_label']
gemini_saved_tokens = max(0, BASELINE['gemini_monthly_tokens'] - stats['gemini_tokens'])
gemini_saved_cost = max(0.0, BASELINE['gemini_monthly_cost_usd'] - stats['gemini_cost'])
saved_pct = (
gemini_saved_tokens / BASELINE['gemini_monthly_tokens'] * 100
if BASELINE['gemini_monthly_tokens'] else 0
)
# Phase 25 整合caller-level feedback 趨勢
feedback_summary = ''
recommendations_block = ''
try:
from services.feedback_quality_tracker import (
compute_caller_quality_trend, get_caller_recommendations,
render_quality_summary,
)
trends = compute_caller_quality_trend(days=30) # 月報用 30 日窗格
if trends:
feedback_summary = '\n💬 <b>Caller 反饋趨勢30 日)</b>\n' + render_quality_summary(trends)
recs = get_caller_recommendations(days=30)
if recs:
recommendations_block = '\n🔮 <b>智能建議</b>\n'
for r in recs[:3]: # 最多 3 條
action_emoji = '⚠️' if r['action'] == 'review' else ''
recommendations_block += f" {action_emoji} {r['caller']}: {r['reason']}\n"
except Exception:
logger.warning('[ROI] 反饋趨勢查詢失敗,略過月報附加區塊', exc_info=True)
return (
f"📊 <b>ROI 月報 {period}</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"💰 <b>成本攔截</b>\n"
f" Gemini: {stats['gemini_tokens']:,} tokens / ${stats['gemini_cost']:.2f}\n"
f" vs 戰前: {BASELINE['gemini_monthly_tokens']:,} / ${BASELINE['gemini_monthly_cost_usd']:.2f}\n"
f" ✅ 攔截: <b>{gemini_saved_tokens:,} tokens / ${gemini_saved_cost:.2f} ({saved_pct:.1f}%)</b>\n"
f"\n"
f"🤖 <b>Provider 分布</b>\n"
f" Ollama (自架免費): {stats['ollama_calls']:,} calls\n"
f" Gemini: {stats['gemini_calls']:,} calls\n"
f" Claude: {stats['claude_calls']:,} calls / ${stats['claude_cost']:.2f}\n"
f" NIM: {stats['nim_calls']:,} calls\n"
f"\n"
f"🧠 <b>RAG 自主學習</b>\n"
f" 查詢: {stats['rag_total']:,}\n"
f" 攔截 LLM: {stats['rag_saved']:,}saved_call=true\n"
f" 反饋分: {stats['rag_avg_feedback']:.2f} / 5\n"
f"\n"
f"🔧 <b>MCP + Cache</b>\n"
f" MCP 呼叫: {stats['mcp_total']:,}\n"
f" Cache 命中: {stats['cache_hit_calls']:,} ai_calls + {stats['mcp_cache_hits']:,} mcp_calls\n"
f"{feedback_summary}"
f"{recommendations_block}"
f"\n"
f"📈 <b>戰役 v5.0 KPI</b>\n"
f" Gemini -23.5% 目標:{'✅ 達標' if saved_pct >= 23 else f'⚠️ {saved_pct:.1f}%'}\n"
f" RAG 命中 ≥25% 目標:{'' if stats['rag_total'] > 0 and stats['rag_saved']/max(stats['rag_total'],1) >= 0.25 else ''}"
)
def generate_and_send_roi_report() -> Dict[str, Any]:
"""每月 1 日 09:00 cron 呼叫主入口"""
stats = query_last_month_stats()
if not stats:
logger.warning('[ROI] no stats; skip')
return {'sent': False, 'reason': 'no_stats'}
msg = render_roi_report(stats)
# 推 Telegram
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(msg)
logger.info('[ROI] %s 月報已推 Telegram', stats['period_label'])
except Exception as exc:
logger.warning('[ROI] telegram push failed: %s', exc)
# 寫 ai_insights 長期記錄
try:
from sqlalchemy import text as sa_text
from database.manager import get_session
session = get_session()
try:
session.execute(
sa_text("""
INSERT INTO ai_insights (insight_type, content, status, created_by, confidence)
VALUES ('roi_monthly_report', :content, 'approved', 'roi_report_service', 0.95)
"""),
{'content': msg},
)
session.commit()
finally:
session.close()
except Exception as exc:
logger.warning('[ROI] persist failed: %s', exc)
return {'sent': True, 'period': stats['period_label'], 'msg_chars': len(msg)}
__all__ = [
'query_last_month_stats',
'render_roi_report',
'generate_and_send_roi_report',
'BASELINE',
]