Files
ewoooc/routes/admin_observability_routes.py
ogt f823439496
Some checks failed
CD Pipeline / deploy (push) Failing after 1m11s
fix: hide caller keys in observability UI
2026-06-26 19:10:12 +08:00

4153 lines
174 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
routes/admin_observability_routes.py
Operation Ollama-First v5.0 / Phase 27 — Admin Observability Dashboard
提供 admin 介面看戰役累積的觀測資料:
/observability/ai_calls — ai_calls 即時查詢(含篩選 / 圖表)
/observability/promotion_review — Phase 28 PromotionGate 待審核列表
/observability/quality_trend — Phase 25 caller 反饋趨勢
/observability/host_health — 三主機 Ollama + MCP 健康度
設計原則:
- 純讀(除了 promotion approve/reject 是 mutation)
- 失敗安全:DB 失敗回空清單 + 警告 banner
- 每頁 100 筆分頁,無限捲動
- 不暴露 secret / prompt 原文
"""
import logging
import re
import threading
import time
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, jsonify, send_file, url_for
from sqlalchemy import text as sa_text
from auth import login_required, get_current_user
from database.manager import get_session
# Module-level logger.
logger = logging.getLogger(__name__)
# Every route registered on this blueprint is mounted under /observability.
admin_observability_bp = Blueprint(
    'admin_observability',
    __name__,
    url_prefix='/observability',
)
# Registry of in-flight PPT AiderHeal jobs (values are dicts), guarded by the lock.
# _list_ppt_aider_heal_active_jobs() copies the values out while holding the lock.
# NOTE(review): the writers of this registry are outside the visible chunk.
_PPT_AIDER_HEAL_LOCK = threading.Lock()
_PPT_AIDER_HEAL_ACTIVE = {}
# Cache slot for a health-indicator payload plus its expiry timestamp, guarded by
# its own lock; the TTL constant is 30 seconds.
# NOTE(review): the code that reads/writes this cache is outside the visible chunk.
_HEALTH_INDICATOR_CACHE_LOCK = threading.Lock()
_HEALTH_INDICATOR_CACHE = {
    'expires_at': 0.0,
    'payload': None,
}
_HEALTH_INDICATOR_CACHE_TTL_SECONDS = 30
# Generic operator-facing message returned by _public_ppt_text() in place of any
# text that looks like it leaks infrastructure details.
_PPT_PUBLIC_RUNTIME_ERROR = '視覺審核暫時無法完成;請先用線上預覽人工確認版面,稍後重新執行審核。'
# Lower-case substrings checked against the lower-cased text; any hit marks the
# text as internal (host names, HTTP/connection errors, tracebacks, model errors).
_PPT_INTERNAL_ERROR_MARKERS = (
    'all 3 hosts failed',
    'httpconnectionpool',
    'multimodal data provided',
    'model does not support',
    '/api/generate',
    'connectionerror',
    'readtimeout',
    'traceback',
    'gcp-a',
    'gcp-b',
)
# (raw, label) pairs applied in this order with case-sensitive str.replace to
# swap internal component/table names for operator-friendly labels.
_PPT_PUBLIC_REPLACEMENTS = (
    ('AiderHeal', '修復流程'),
    ('RAG', '知識建議'),
    ('Ollama', 'AI 建議服務'),
    ('minicpm-v', '視覺模型'),
    ('LibreOffice', '轉檔服務'),
    ('runtime', '執行條件'),
    ('DB', '產出紀錄'),
    ('database', '產出紀錄'),
    ('filesystem', '檔案來源'),
    ('ppt_audit_results', '審核紀錄'),
    ('ppt_generation_runs', '產出紀錄'),
)
# Legacy caller name -> dedicated Gemini backup caller name, used only for display.
_GEMINI_BACKUP_CALLER_DISPLAY = {
    # Rows recorded before the dedicated backup callers landed still exist; show them
    # as backups so they are not mistaken for Gemini-first routing.
    'code_review_openclaw': 'code_review_openclaw_gemini',
    'openclaw_qa': 'openclaw_qa_gemini_fallback',
}
# Callers whose Gemini traffic is a deliberate cloud backup path; their rows get
# the '雲端備援' badge in _build_ai_call_recent_row().
_GEMINI_BACKUP_CALLERS = {
    'code_review_openclaw_gemini',
    'openclaw_daily_gemini_fallback',
    'openclaw_daily_insight_gemini_fallback',
    'openclaw_weekly_gemini_fallback',
    'openclaw_monthly_gemini_fallback',
    'openclaw_meta_gemini_fallback',
    'openclaw_qa_gemini_fallback',
    'openclaw_bot_gemini',
    'openclaw_bot_image_gemini',
}
def _list_ppt_aider_heal_active_jobs():
    """Return sanitized snapshots of the PPT AiderHeal jobs currently registered.

    Each job dict is shallow-copied while holding the registry lock, so callers
    never touch the shared registry. Each copy's 'diagnosis' is then rewritten
    through _public_ppt_text (max 100 chars) after the lock is released.
    """
    with _PPT_AIDER_HEAL_LOCK:
        snapshot = list(map(dict, _PPT_AIDER_HEAL_ACTIVE.values()))
    for entry in snapshot:
        entry['diagnosis'] = _public_ppt_text(entry.get('diagnosis'), max_chars=100)
    return snapshot
def _ppt_text_has_internal_detail(value) -> bool:
text = str(value or '')
if not text:
return False
lowered = text.lower()
if any(marker in lowered for marker in _PPT_INTERNAL_ERROR_MARKERS):
return True
return bool(re.search(r'\b(?:\d{1,3}\.){3}\d{1,3}(?::\d+)?\b', text))
def _public_ppt_text(value, *, empty='', max_chars=180):
"""把 PPT 觀測台的內部錯誤轉成操作員可讀的處置文字。"""
text = str(value or '').strip()
if not text:
return empty
if _ppt_text_has_internal_detail(text):
return _PPT_PUBLIC_RUNTIME_ERROR
for raw, label in _PPT_PUBLIC_REPLACEMENTS:
text = text.replace(raw, label)
text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}(?::\d+)?\b', '內部主機', text)
text = re.sub(r'\s+', ' ', text).strip()
if max_chars and len(text) > max_chars:
return text[:max_chars].rstrip() + ''
return text
def _public_ppt_text_list(values, *, max_chars=120):
public_values = []
for value in values or []:
text = _public_ppt_text(value, max_chars=max_chars)
if text and text not in public_values:
public_values.append(text)
return public_values
def _public_ppt_source_label(source):
return {
'both': '檔案 + 產出紀錄',
'database': '產出紀錄',
'filesystem': '檔案來源',
}.get(str(source or '').strip(), '檔案來源')
def _public_ppt_vision_status(status):
    """Return a sanitized copy of a PPT vision-readiness status for the UI.

    The copy is changed as follows:
    - Text fields pass through _public_ppt_text with per-field fallbacks.
    - The model/converter identities become generic labels.
    - Empty blocker/next-action lists get a single placeholder entry.
    - Readiness checks that are not dicts are dropped.

    The caller's mapping is not modified.
    """
    public = dict(status or {})
    public['summary'] = _public_ppt_text(
        public.get('summary'), empty='視覺檢查狀態待確認。', max_chars=140,
    )
    public['status_label'] = _public_ppt_text(
        public.get('status_label'), empty='待確認', max_chars=40,
    )
    public['model_label'] = '視覺模型' if public.get('model') else '未啟用'
    public['converter_label'] = '轉檔服務' if public.get('converter') else '轉檔條件待確認'
    blockers = _public_ppt_text_list(public.get('blockers'), max_chars=80)
    public['blockers'] = blockers if blockers else ['視覺檢查條件待確認']
    next_actions = _public_ppt_text_list(public.get('next_actions'), max_chars=90)
    public['next_actions'] = next_actions if next_actions else ['確認視覺檢查條件後重新整理此頁。']

    def _clean_check(check):
        # Copy each check so the caller's nested dicts stay untouched.
        item = dict(check)
        item['label'] = _public_ppt_text(item.get('label'), empty='檢查項目', max_chars=50)
        item['value'] = _public_ppt_text(item.get('value'), empty='待確認', max_chars=50)
        item['detail'] = _public_ppt_text(item.get('detail'), empty='等待檢查結果', max_chars=90)
        return item

    public['readiness_checks'] = [
        _clean_check(check)
        for check in public.get('readiness_checks') or []
        if isinstance(check, dict)
    ]
    return public
def _public_ppt_vision_audit_status(status):
    """Return a copy of a PPT vision-audit status with UI-safe text fields.

    Only 'status_label' and 'message' are rewritten through _public_ppt_text,
    each with its own fallback and length limit. All other keys are copied as-is.
    """
    public = dict(status or {})
    for key, fallback, limit in (
        ('status_label', '待確認', 60),
        ('message', '視覺檢查狀態待確認。', 140),
    ):
        public[key] = _public_ppt_text(public.get(key), empty=fallback, max_chars=limit)
    return public
def _build_ai_call_recent_row(row):
"""整理最近 ai_calls 列表,讓 Ollama-first 備援語意可被辨識。"""
caller = row[2] or ''
provider = row[3] or ''
caller_display = caller
route_badges = []
if provider == 'gemini':
if caller in _GEMINI_BACKUP_CALLER_DISPLAY:
caller_display = _GEMINI_BACKUP_CALLER_DISPLAY[caller]
route_badges.append('雲端備援')
route_badges.append('舊 caller')
elif caller in _GEMINI_BACKUP_CALLERS or caller.endswith('_gemini'):
route_badges.append('雲端備援')
else:
route_badges.append('ADR-028 鎖定/升級')
return {
'id': row[0],
'called_at': row[1].strftime('%H:%M:%S'),
'caller': caller,
'caller_display': caller_display,
'provider': provider,
'model': row[4],
'in_tokens': int(row[5] or 0),
'out_tokens': int(row[6] or 0),
'duration_ms': int(row[7] or 0),
'status': row[8],
'cost': float(row[9] or 0),
'cache_hit': bool(row[10]),
'rag_hit': bool(row[11]),
'route_badges': route_badges,
}
# ─────────────────────────────────────────────────────────────────────────────
# /observability/overview — Phase 45 總覽(單頁聚合 6 項 KPI)
# ─────────────────────────────────────────────────────────────────────────────
def _reset_after_card_failure(session, card):
    """Log a failed overview-card query and roll back *session*.

    Every card query in observability_overview uses PostgreSQL syntax.
    PostgreSQL marks the whole transaction as aborted after any statement
    error. Without a rollback, every later card sharing the session would fail
    too, which is exactly the cascade the per-card try/except is meant to stop.

    Args:
        session: The SQLAlchemy session the failed query ran on.
        card: Short card name used in the log line.
    """
    logger.warning('observability overview: %s card query failed', card, exc_info=True)
    try:
        session.rollback()
    except Exception:
        # Best effort: a broken connection must not take the whole page down.
        logger.debug('observability overview: rollback after %s card failed', card, exc_info=True)


@admin_observability_bp.route('/')
@admin_observability_bp.route('/overview')
@login_required
def observability_overview():
    """Phase 45 — observability overview: one page aggregating the six sub-pages' KPIs.

    This is the web version of the Phase 44 daily Telegram summary and the sidebar
    entry page. Every section is fail-safe. When a card's query fails, the
    failure is logged and the session is rolled back. Only that card falls back
    to its empty value, and the rest of the page still renders.
    """
    today = datetime.now()
    month_start = datetime(today.year, today.month, 1)
    summary = {}
    session = get_session()
    try:
        # Per-host uptime over the last 24h.
        try:
            host_rows = session.execute(
                sa_text("""
                    SELECT host_label, COUNT(*) AS total,
                           COUNT(*) FILTER (WHERE healthy) AS up,
                           COALESCE(AVG(response_ms) FILTER (WHERE healthy), 0) AS avg_ms
                    FROM host_health_probes
                    WHERE probed_at >= NOW() - INTERVAL '24 hours'
                    GROUP BY host_label ORDER BY host_label
                """),
            ).fetchall()
            summary['hosts'] = [
                {
                    'label': r[0],
                    'total': int(r[1] or 0),
                    'up': int(r[2] or 0),
                    'avg_ms': int(r[3] or 0),
                    'uptime_pct': (float(r[2] or 0) / float(r[1]) * 100) if r[1] else 0,
                }
                for r in host_rows
            ]
        except Exception:
            _reset_after_card_failure(session, 'hosts')
            summary['hosts'] = []
        # AI calls over the last 24h.
        try:
            ai = session.execute(
                sa_text("""
                    SELECT COUNT(*), COALESCE(SUM(input_tokens + output_tokens), 0),
                           COALESCE(SUM(cost_usd), 0),
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')),
                           COUNT(*) FILTER (WHERE rag_hit),
                           COUNT(*) FILTER (WHERE cache_hit)
                    FROM ai_calls
                    WHERE called_at >= NOW() - INTERVAL '24 hours'
                """),
            ).fetchone()
            total = int(ai[0] or 0)
            summary['ai_calls'] = {
                'total': total,
                'tokens': int(ai[1] or 0),
                'cost_24h': float(ai[2] or 0),
                'errors': int(ai[3] or 0),
                'rag_hits': int(ai[4] or 0),
                'cache_hits': int(ai[5] or 0),
                'error_rate': (float(ai[3] or 0) / total * 100) if total else 0,
                'rag_rate': (float(ai[4] or 0) / total * 100) if total else 0,
                'cache_rate': (float(ai[5] or 0) / total * 100) if total else 0,
            }
        except Exception:
            _reset_after_card_failure(session, 'ai_calls')
            summary['ai_calls'] = {}
        # Month-to-date cost.
        try:
            month_cost = session.execute(
                sa_text("SELECT COALESCE(SUM(cost_usd), 0) FROM ai_calls WHERE called_at >= :ms"),
                {'ms': month_start},
            ).fetchone()[0]
            summary['month_cost'] = float(month_cost or 0)
        except Exception:
            _reset_after_card_failure(session, 'month_cost')
            summary['month_cost'] = 0
        # Budgets at or above their alert threshold (alert_pct, default 80%),
        # measured against month-to-date spend.
        try:
            budgets = session.execute(
                sa_text("""
                    SELECT b.period, b.provider, b.budget_usd, b.alert_pct,
                           COALESCE((
                               SELECT SUM(cost_usd) FROM ai_calls
                               WHERE called_at >= :ms
                                 AND (b.provider IS NULL OR provider = b.provider)
                           ), 0) AS spent
                    FROM ai_call_budgets b
                """),
                {'ms': month_start},
            ).fetchall()
            over_threshold = []
            for r in budgets:
                budget = float(r[2] or 0)
                spent = float(r[4] or 0)
                ratio = spent / budget if budget > 0 else 0
                threshold = float(r[3] or 80) / 100
                if ratio >= threshold:
                    over_threshold.append({
                        'period': r[0], 'provider': r[1] or '(全部)',
                        'spent': spent, 'budget': budget, 'ratio': ratio,
                    })
            summary['budget_alerts'] = over_threshold
        except Exception:
            _reset_after_card_failure(session, 'budget_alerts')
            summary['budget_alerts'] = []
        # Pending reviews and the 30-day distillation pool.
        try:
            ep_pending = session.execute(
                sa_text("SELECT COUNT(*) FROM learning_episodes WHERE promotion_status = 'awaiting_review' AND reviewed_at IS NULL"),
            ).fetchone()[0]
            ep_total_30d = session.execute(
                sa_text("SELECT COUNT(*) FROM learning_episodes WHERE created_at >= NOW() - INTERVAL '30 days'"),
            ).fetchone()[0]
            ep_approved_30d = session.execute(
                sa_text("SELECT COUNT(*) FROM learning_episodes WHERE created_at >= NOW() - INTERVAL '30 days' AND promotion_status = 'approved'"),
            ).fetchone()[0]
            summary['episodes'] = {
                'pending': int(ep_pending or 0),
                'total_30d': int(ep_total_30d or 0),
                'approved_30d': int(ep_approved_30d or 0),
                'approval_rate': (float(ep_approved_30d or 0) / float(ep_total_30d) * 100) if ep_total_30d else 0,
            }
        except Exception:
            _reset_after_card_failure(session, 'episodes')
            summary['episodes'] = {}
        # PPT visual audits over the last 7 days.
        try:
            ppt = session.execute(
                sa_text("""
                    SELECT COUNT(*),
                           COUNT(*) FILTER (WHERE audit_status='passed'),
                           COUNT(*) FILTER (WHERE audit_status='failed')
                    FROM ppt_audit_results
                    WHERE audited_at >= NOW() - INTERVAL '7 days'
                """),
            ).fetchone()
            ppt_total = int(ppt[0] or 0)
            summary['ppt'] = {
                'total': ppt_total,
                'passed': int(ppt[1] or 0),
                'failed': int(ppt[2] or 0),
                'pass_rate': (float(ppt[1] or 0) / ppt_total * 100) if ppt_total else 0,
            }
        except Exception:
            _reset_after_card_failure(session, 'ppt')
            summary['ppt'] = {}
        # AIOps incidents and heals over the last 7 days.
        try:
            inc = session.execute(
                sa_text("""
                    SELECT COUNT(*),
                           COUNT(*) FILTER (WHERE status='open'),
                           COUNT(*) FILTER (WHERE severity IN ('P0','P1'))
                    FROM incidents
                    WHERE created_at >= NOW() - INTERVAL '7 days'
                """),
            ).fetchone()
            heal = session.execute(
                sa_text("""
                    SELECT COUNT(*),
                           COUNT(*) FILTER (WHERE result='success')
                    FROM heal_logs
                    WHERE created_at >= NOW() - INTERVAL '7 days'
                """),
            ).fetchone()
            summary['aiops'] = {
                'incidents_total': int(inc[0] or 0),
                'incidents_open': int(inc[1] or 0),
                'incidents_p0_p1': int(inc[2] or 0),
                'heals_total': int(heal[0] or 0),
                'heals_success': int(heal[1] or 0),
                'heal_rate': (float(heal[1] or 0) / float(heal[0]) * 100) if heal[0] else 0,
            }
        except Exception:
            _reset_after_card_failure(session, 'aiops')
            summary['aiops'] = {}
        # MCP calls over the last 24h.
        try:
            mcp = session.execute(
                sa_text("""
                    SELECT COUNT(*), COUNT(DISTINCT server),
                           COUNT(*) FILTER (WHERE cache_hit),
                           COALESCE(SUM(cost_usd), 0)
                    FROM mcp_calls
                    WHERE called_at >= NOW() - INTERVAL '24 hours'
                """),
            ).fetchone()
            mcp_total = int(mcp[0] or 0)
            summary['mcp'] = {
                'total': mcp_total,
                'servers': int(mcp[1] or 0),
                'cache_hits': int(mcp[2] or 0),
                'cost': float(mcp[3] or 0),
                'cache_rate': (float(mcp[2] or 0) / mcp_total * 100) if mcp_total else 0,
            }
        except Exception:
            _reset_after_card_failure(session, 'mcp')
            summary['mcp'] = {}
    finally:
        session.close()
    # Phase 51 O-3: hourly uptime sparkline per host over the last 24h
    # (one bucket per hour per host), on its own session.
    host_sparkline = {}
    try:
        s_sp = get_session()
        try:
            sp_rows = s_sp.execute(
                sa_text("""
                    SELECT host_label,
                           date_trunc('hour', probed_at) AS hr,
                           COUNT(*) AS total,
                           COUNT(*) FILTER (WHERE healthy) AS up
                    FROM host_health_probes
                    WHERE probed_at >= NOW() - INTERVAL '24 hours'
                    GROUP BY host_label, hr
                    ORDER BY host_label, hr ASC
                """),
            ).fetchall()
            for r in sp_rows:
                label, hr, total, up = r[0], r[1], int(r[2] or 0), int(r[3] or 0)
                if label not in host_sparkline:
                    host_sparkline[label] = {'hours': [], 'uptime_pct': []}
                host_sparkline[label]['hours'].append(
                    hr.strftime('%H:00') if hr else ''
                )
                host_sparkline[label]['uptime_pct'].append(
                    (up / total * 100) if total else 0
                )
        finally:
            s_sp.close()
    except Exception:
        # The sparkline is decorative: render without it, but leave a trace.
        logger.warning('observability overview: host sparkline query failed', exc_info=True)
    return render_template(
        'admin/observability_overview.html',
        active_page='obs_overview',
        summary=summary,
        host_sparkline=host_sparkline,
        today=today.strftime('%Y-%m-%d'),
    )
# ─────────────────────────────────────────────────────────────────────────────
# /observability/rag_queries — Phase 51 RAG 召回詳情
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/rag_queries')
@login_required
def rag_queries_dashboard():
    """Phase 51 — RAG recall details: hits, saved_call and feedback for each query.

    This deepens RAG observability. Earlier views only showed per-caller hit
    rates; this page also lists the actual recent queries.

    Query args:
        hours: Look-back window in hours. Defaults to 24; malformed values also fall back to 24.
        caller: Optional exact caller filter for the recent-query list.
        saved_only: '1' to list only queries with saved_call = TRUE.
    """
    try:
        hours = int(request.args.get('hours', '24'))
    except (TypeError, ValueError):
        # Parsed before the DB try block, so a bad value used to escape as a 500.
        hours = 24
    caller_filter = request.args.get('caller', '').strip()
    saved_only = request.args.get('saved_only', '').strip() == '1'
    session = get_session()
    try:
        rag_query_log_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.rag_query_log') IS NOT NULL")
        ).scalar())
        if not rag_query_log_exists:
            return render_template(
                'admin/rag_queries.html',
                active_page='obs_rag_queries',
                hours=hours,
                caller_filter=caller_filter,
                saved_only=saved_only,
                summary={},
                callers=[],
                by_caller=[],
                queries=[],
                error='rag_query_log 尚未建立RAG 召回資料待接入。',
            )
        # Overall statistics for the window.
        summary_row = session.execute(
            sa_text("""
                SELECT COUNT(*) AS total,
                       COUNT(*) FILTER (WHERE saved_call) AS saved,
                       COUNT(*) FILTER (WHERE hit_count > 0) AS with_hits,
                       COALESCE(AVG(hit_count), 0) AS avg_hits,
                       COALESCE(AVG(feedback_score) FILTER (WHERE feedback_score IS NOT NULL), 0) AS avg_score,
                       COUNT(*) FILTER (WHERE feedback_score IS NOT NULL) AS feedback_count,
                       COUNT(DISTINCT caller) AS distinct_callers
                FROM rag_query_log
                WHERE queried_at >= NOW() - (:h * INTERVAL '1 hour')
            """),
            {'h': hours},
        ).fetchone()
        total = int(summary_row[0] or 0)
        saved = int(summary_row[1] or 0)
        with_hits = int(summary_row[2] or 0)
        summary = {
            'total': total,
            'saved': saved,
            'with_hits': with_hits,
            'no_hits': total - with_hits,
            'avg_hits': round(float(summary_row[3] or 0), 2),
            'avg_score': round(float(summary_row[4] or 0), 2),
            'feedback_count': int(summary_row[5] or 0),
            'distinct_callers': int(summary_row[6] or 0),
            'saved_rate': (float(saved) / total * 100) if total else 0,
            'hit_rate': (float(with_hits) / total * 100) if total else 0,
        }
        # Caller list for the filter dropdown.
        callers = session.execute(
            sa_text("""
                SELECT DISTINCT caller FROM rag_query_log
                WHERE queried_at >= NOW() - (:h * INTERVAL '1 hour')
                ORDER BY caller
            """),
            {'h': hours},
        ).fetchall()
        caller_list = [r[0] for r in callers]
        # Per-caller statistics.
        by_caller = session.execute(
            sa_text("""
                SELECT caller,
                       COUNT(*) AS total,
                       COUNT(*) FILTER (WHERE saved_call) AS saved,
                       COUNT(*) FILTER (WHERE hit_count > 0) AS with_hits,
                       COALESCE(AVG(feedback_score) FILTER (WHERE feedback_score IS NOT NULL), 0) AS avg_score,
                       COUNT(*) FILTER (WHERE feedback_score IS NOT NULL) AS fb_count
                FROM rag_query_log
                WHERE queried_at >= NOW() - (:h * INTERVAL '1 hour')
                GROUP BY caller
                ORDER BY total DESC
            """),
            {'h': hours},
        ).fetchall()
        # Latest 50 queries with the caller / saved_only filters applied. This is
        # fixed SQL with every filter bound as a parameter, following the
        # no-f-string-SQL rule applied to the ai_calls dashboard.
        recent_queries = session.execute(
            sa_text("""
                SELECT id, queried_at, caller, LEFT(query_text, 200) AS qtext,
                       top_k, threshold, hit_count, used_results, saved_call,
                       feedback_score, request_id
                FROM rag_query_log
                WHERE queried_at >= NOW() - (:h * INTERVAL '1 hour')
                  AND (:caller_f = '' OR caller = :caller_f)
                  AND (:saved_only = FALSE OR saved_call = TRUE)
                ORDER BY queried_at DESC
                LIMIT 50
            """),
            {'h': hours, 'caller_f': caller_filter, 'saved_only': saved_only},
        ).fetchall()
        queries = []
        for r in recent_queries:
            used_ids = list(r[7]) if r[7] else []
            queries.append({
                'id': int(r[0]),
                'queried_at': r[1].strftime('%Y-%m-%d %H:%M:%S') if r[1] else '',
                'caller': r[2],
                'query_text': r[3] or '',
                'take_count': int(r[4] or 0),
                'threshold': round(float(r[5] or 0), 3),
                'hit_count': int(r[6] or 0),
                'used_results': used_ids,
                'saved_call': bool(r[8]),
                'feedback_score': int(r[9]) if r[9] is not None else None,
                'request_id': r[10],
            })
        return render_template(
            'admin/rag_queries.html',
            active_page='obs_rag_queries',
            hours=hours,
            caller_filter=caller_filter,
            saved_only=saved_only,
            summary=summary,
            callers=caller_list,
            by_caller=[
                {
                    'caller': r[0], 'total': int(r[1] or 0),
                    'saved': int(r[2] or 0), 'with_hits': int(r[3] or 0),
                    'avg_score': round(float(r[4] or 0), 2),
                    'fb_count': int(r[5] or 0),
                    'saved_rate': (float(r[2] or 0) / float(r[1]) * 100) if r[1] else 0,
                    'hit_rate': (float(r[3] or 0) / float(r[1]) * 100) if r[1] else 0,
                }
                for r in by_caller
            ],
            queries=queries,
            error=None,
        )
    except Exception:
        # The UI gets a safe empty state; the real cause goes to the log only.
        logger.exception('rag_queries dashboard query failed')
        return render_template(
            'admin/rag_queries.html',
            active_page='obs_rag_queries', hours=hours,
            caller_filter=caller_filter, saved_only=saved_only,
            summary={}, callers=[], by_caller=[], queries=[],
            error='RAG 召回資料暫時不可用,已切換安全空狀態。',
        )
    finally:
        session.close()
@admin_observability_bp.route('/rag_queries/<int:query_id>/hits', methods=['GET'])
@login_required
def rag_query_hits(query_id: int):
    """Phase 51 — JSON API returning one query's hit details (used by the modal).

    Returns:
        - 200 with {'ok': True, 'query_id', 'query_text', 'hit_count', 'threshold',
          'hits': [...]}. The hits are the ai_insights rows referenced by
          used_results, newest first, with content previews cut to 300 chars.
        - 404 with {'ok': False, 'error': 'not found'} for an unknown id.
        - 500 with a generic error. Exception details are only logged, never
          sent to the client, in line with the module's no-internals rule.
    """
    try:
        session = get_session()
        try:
            row = session.execute(
                sa_text("""
                    SELECT id, query_text, used_results, hit_count, threshold
                    FROM rag_query_log WHERE id = :id
                """),
                {'id': query_id},
            ).fetchone()
            if not row:
                return jsonify({'ok': False, 'error': 'not found'}), 404
            used_ids = list(row[2]) if row[2] else []
            hits = []
            if used_ids:
                rows = session.execute(
                    sa_text("""
                        SELECT id, insight_type, period, product_sku,
                               LEFT(content, 300) AS preview, created_at
                        FROM ai_insights
                        WHERE id = ANY(:ids)
                        ORDER BY created_at DESC
                    """),
                    {'ids': used_ids},
                ).fetchall()
                hits = [
                    {
                        'id': int(h[0]),
                        'insight_type': h[1],
                        'period': h[2],
                        'product_sku': h[3],
                        'content': h[4] or '',
                        'created_at': h[5].strftime('%Y-%m-%d') if h[5] else '',
                    }
                    for h in rows
                ]
            return jsonify({
                'ok': True,
                'query_id': query_id,
                'query_text': row[1],
                'hit_count': int(row[3] or 0),
                'threshold': round(float(row[4] or 0), 3),
                'hits': hits,
            })
        finally:
            session.close()
    except Exception:
        # Previously echoed "<ExceptionType>: <message>" to the browser, which can
        # leak SQL, table and host details.
        logger.exception('rag_query_hits failed for query_id=%s', query_id)
        return jsonify({'ok': False, 'error': 'RAG 召回資料暫時不可用'}), 500
# ─────────────────────────────────────────────────────────────────────────────
# /observability/business_intel — Phase 48 商業面 × AI 編排
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/business_intel')
@login_required
def business_intel_dashboard():
    """Phase 48 — business × AI orchestration: extend AI observability to the business layer.

    Shows "what business the AI is doing":
      - ai_price_recommendations × competitor prices: pricing opportunities the AI sees
      - action_plans × action_outcomes: the plan → verdict closed loop
      - competitor_match_attempts: tracking of failed competitor matches

    Query args:
        days: Look-back window in days for the windowed sections. Defaults to 7;
            malformed values also fall back to 7. The latest-20 recommendations
            and the 24h competitor feed are not windowed by it.
    """
    try:
        days = int(request.args.get('days', '7'))
    except (TypeError, ValueError):
        # Parsed before the DB try block, so a bad value used to escape as a 500.
        days = 7
    # Every windowed query binds the window instead of formatting it into SQL.
    window = {'d': days}
    session = get_session()
    try:
        # 1. Recommendation mix by strategy within the window.
        rec_summary = session.execute(
            sa_text("""
                SELECT strategy, COUNT(*) AS cnt,
                       COALESCE(AVG(confidence), 0) AS avg_conf,
                       COALESCE(AVG(gap_pct), 0) AS avg_gap_pct,
                       COALESCE(AVG(sales_7d_delta), 0) AS avg_sales_delta
                FROM ai_price_recommendations
                WHERE created_at >= NOW() - (:d * INTERVAL '1 day')
                GROUP BY strategy ORDER BY cnt DESC
            """),
            window,
        ).fetchall()
        rec_by_strategy = [
            {
                'strategy': r[0], 'count': int(r[1] or 0),
                'avg_confidence': round(float(r[2] or 0), 3),
                'avg_gap_pct': round(float(r[3] or 0), 2),
                'avg_sales_delta': round(float(r[4] or 0), 2),
            }
            for r in rec_summary
        ]
        # 2. The 20 most recent recommendations (not windowed).
        latest_recs = session.execute(
            sa_text("""
                SELECT id, sku, LEFT(name, 50), strategy, confidence,
                       momo_price, pchome_price, gap_pct, sales_7d_delta,
                       LEFT(reason, 120), created_at
                FROM ai_price_recommendations
                ORDER BY created_at DESC LIMIT 20
            """),
        ).fetchall()
        latest_recommendations = [
            {
                'id': r[0], 'sku': r[1], 'name': r[2], 'strategy': r[3],
                'confidence': round(float(r[4] or 0), 3),
                'momo_price': float(r[5] or 0),
                'pchome_price': float(r[6] or 0) if r[6] else None,
                'gap_pct': round(float(r[7] or 0), 2),
                'sales_delta': round(float(r[8] or 0), 2) if r[8] is not None else None,
                'reason': r[9] or '',
                'created_at': r[10].strftime('%m-%d %H:%M') if r[10] else '',
            }
            for r in latest_recs
        ]
        # 3. action_plans × action_outcomes closed loop within the window (latest 25).
        closed_loops = session.execute(
            sa_text("""
                SELECT p.id, p.sku, p.plan_type, p.status,
                       p.created_by, p.created_at, p.executed_at,
                       o.verdict, o.metric_type, o.before_val, o.after_val
                FROM action_plans p
                LEFT JOIN action_outcomes o ON o.plan_id = p.id
                WHERE p.created_at >= NOW() - (:d * INTERVAL '1 day')
                ORDER BY p.created_at DESC LIMIT 25
            """),
            window,
        ).fetchall()
        loop_records = []
        for r in closed_loops:
            before = float(r[9]) if r[9] is not None else None
            after = float(r[10]) if r[10] is not None else None
            change_pct = None
            # A truthy `before` is already non-zero, so the division is safe.
            if before and after is not None:
                change_pct = (after - before) / abs(before) * 100
            loop_records.append({
                'plan_id': r[0], 'sku': r[1], 'plan_type': r[2],
                'status': r[3], 'created_by': r[4],
                'created_at': r[5].strftime('%m-%d %H:%M') if r[5] else '',
                'executed_at': r[6].strftime('%m-%d %H:%M') if r[6] else None,
                'verdict': r[7], 'metric_type': r[8],
                'before': before, 'after': after, 'change_pct': change_pct,
            })
        # 4. Verdict statistics for outcomes that have both measurements.
        verdict_summary = session.execute(
            sa_text("""
                SELECT verdict, COUNT(*) AS cnt,
                       AVG(after_val - before_val) AS avg_delta
                FROM action_outcomes
                WHERE created_at >= NOW() - (:d * INTERVAL '1 day')
                  AND before_val IS NOT NULL AND after_val IS NOT NULL
                GROUP BY verdict ORDER BY cnt DESC
            """),
            window,
        ).fetchall()
        verdict_stats = [
            {
                'verdict': r[0] or 'unknown', 'count': int(r[1] or 0),
                'avg_delta': round(float(r[2] or 0), 2),
            }
            for r in verdict_summary
        ]
        # 5. Competitor match attempts by status within the window.
        match_attempts = session.execute(
            sa_text("""
                SELECT attempt_status, COUNT(*) AS cnt,
                       COALESCE(AVG(candidate_count), 0) AS avg_candidates,
                       COALESCE(AVG(best_match_score), 0) AS avg_score
                FROM competitor_match_attempts
                WHERE attempted_at >= NOW() - (:d * INTERVAL '1 day')
                GROUP BY attempt_status ORDER BY cnt DESC
            """),
            window,
        ).fetchall()
        match_stats = [
            {
                'status': r[0], 'count': int(r[1] or 0),
                'avg_candidates': round(float(r[2] or 0), 1),
                'avg_score': round(float(r[3] or 0), 3),
            }
            for r in match_attempts
        ]
        # 6. Latest 12 high-confidence (identity_v2, score >= 0.7) competitor prices from the last 24h.
        recent_competitor = session.execute(
            sa_text("""
                SELECT cph.sku, cph.competitor_product_name, cph.price,
                       cph.momo_price, cph.discount_pct, cph.match_score,
                       cph.crawled_at, cph.source
                FROM competitor_price_history cph
                WHERE cph.crawled_at >= NOW() - INTERVAL '24 hours'
                  AND cph.match_score >= 0.7
                  AND COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'
                ORDER BY cph.crawled_at DESC LIMIT 12
            """),
        ).fetchall()
        recent_competitor_prices = [
            {
                'sku': r[0],
                'product_name': (r[1] or '')[:50],
                'pchome_price': float(r[2] or 0),
                'momo_price': float(r[3] or 0) if r[3] else None,
                'discount_pct': int(r[4]) if r[4] else None,
                'match_score': round(float(r[5] or 0), 3),
                'gap': (float(r[3]) - float(r[2])) if (r[2] and r[3]) else None,
                'crawled_at': r[6].strftime('%m-%d %H:%M') if r[6] else '',
                'source': r[7] or '外部電商',
            }
            for r in recent_competitor
        ]
        # Promotion watch: rows with external price pressure (gap < 0) or a
        # discount of at least 5%, capped at 8 rows.
        promo_watch_rows = []
        for item in recent_competitor_prices:
            discount_pct = float(item.get('discount_pct') or 0)
            gap = item.get('gap')
            gap_value = float(gap) if gap is not None else 0.0
            is_external_pressure = gap_value < 0
            is_discount_signal = discount_pct >= 5
            if not (is_external_pressure or is_discount_signal):
                continue
            if is_external_pressure:
                pressure_label = '外部低價壓力'
                recommended_action = '檢查 PChome 售價、折扣券、組合包與商品頁主賣點'
            else:
                pressure_label = '外部促銷訊號'
                recommended_action = '比對活動條件後,安排 PChome 主推曝光或會員回饋'
            promo_watch_rows.append({
                **item,
                'pressure_label': pressure_label,
                'recommended_action': recommended_action,
                'gap_abs': abs(gap_value),
            })
            if len(promo_watch_rows) >= 8:
                break
        # 7. High-confidence recommendations with no action_plan for the same SKU
        #    within 7 days of the recommendation.
        unfollowed = session.execute(
            sa_text("""
                SELECT COUNT(*)
                FROM ai_price_recommendations r
                WHERE r.created_at >= NOW() - (:d * INTERVAL '1 day')
                  AND r.confidence >= 0.7
                  AND NOT EXISTS (
                      SELECT 1 FROM action_plans p
                      WHERE p.sku = r.sku
                        AND p.created_at >= r.created_at
                        AND p.created_at < r.created_at + INTERVAL '7 days'
                  )
            """),
            window,
        ).fetchone()
        unfollowed_count = int(unfollowed[0] or 0) if unfollowed else 0
        return render_template(
            'admin/business_intel.html',
            active_page='obs_business_intel',
            days=days,
            rec_by_strategy=rec_by_strategy,
            latest_recommendations=latest_recommendations,
            loop_records=loop_records,
            verdict_stats=verdict_stats,
            match_stats=match_stats,
            recent_competitor_prices=recent_competitor_prices,
            promo_watch_rows=promo_watch_rows,
            unfollowed_count=unfollowed_count,
            error=None,
        )
    except Exception:
        # The UI gets a safe empty state; the real cause goes to the log only.
        logger.exception('business_intel dashboard query failed')
        return render_template(
            'admin/business_intel.html',
            active_page='obs_business_intel', days=days,
            rec_by_strategy=[], latest_recommendations=[], loop_records=[],
            verdict_stats=[], match_stats=[], recent_competitor_prices=[],
            promo_watch_rows=[],
            unfollowed_count=0,
            error='商業 AI 資料暫時不可用,已切換安全空狀態。',
        )
    finally:
        session.close()
# ─────────────────────────────────────────────────────────────────────────────
# /observability/agent_orchestration — Phase 46 編排矩陣
# ─────────────────────────────────────────────────────────────────────────────
# caller → agent grouping rules. Per the original note, the values mirror the real
# `caller` strings each agent under services/* records.
# agent_orchestration_dashboard aggregates ai_calls per group with
# `caller = ANY(:callers)`, so a caller missing from every list is counted in
# the overall totals but attributed to no agent.
_AGENT_CALLER_GROUPS = {
    'openclaw': [
        'openclaw_qa', 'openclaw_qa_gemini_fallback', 'openclaw_qa_nim',
        'openclaw_daily', 'openclaw_daily_gemini_fallback', 'openclaw_daily_nim',
        'openclaw_daily_insight', 'openclaw_daily_insight_gemini_fallback',
        'openclaw_daily_insight_nim',
        'openclaw_meta', 'openclaw_meta_gemini_fallback', 'openclaw_meta_nim',
        'openclaw_monthly', 'openclaw_monthly_gemini_fallback', 'openclaw_monthly_nim',
        'openclaw_weekly', 'openclaw_weekly_gemini_fallback', 'openclaw_weekly_nim',
        'openclaw_bot_main', 'openclaw_bot_gemini', 'openclaw_bot_nim',
        'sales_copy', 'code_review_openclaw', 'code_review_openclaw_gemini',
    ],
    'hermes': [
        'hermes_analyst', 'hermes_intent', 'code_review_hermes',
    ],
    'nemotron': [
        'nemotron_dispatch',
    ],
    'elephant_alpha': [
        'ea_engine', 'code_review_elephant',
    ],
}
# agent key -> (display label, one-line role description) shown in the matrix.
# Every key of _AGENT_CALLER_GROUPS must appear here (looked up with []).
_AGENT_LABELS = {
    'openclaw': ('🤖 OpenClaw', '主編排者 / Bot 對話 / 報告生成'),
    'hermes': ('🔍 Hermes', '價格/程式碼分析師'),
    'nemotron': ('🧬 NemoTron', '任務 dispatcher'),
    'elephant_alpha': ('🐘 ElephantAlpha', '自主決策引擎'),
}
# Provider → cost tier ('ollama_local' vs 'paid_external').
# NOTE(review): not referenced in the visible code; the SQL in
# agent_orchestration_dashboard hard-codes the same provider lists, so keep them in sync.
_PROVIDER_TIER = {
    'gcp_ollama': 'ollama_local',
    'ollama_secondary': 'ollama_local',
    'ollama_111': 'ollama_local',
    'ollama_other': 'ollama_local',
    'gemini': 'paid_external',
    'claude': 'paid_external',
    'nim': 'paid_external',
    'nim_via_elephant': 'paid_external',
    'openrouter': 'paid_external',
}
@admin_observability_bp.route('/agent_orchestration')
@login_required
def agent_orchestration_dashboard():
    """Phase 46 — 4 agents × models × MCP × RAG orchestration matrix.

    Shows how the agents combine resources within the window:
    - which Ollama hosts and paid LLMs each agent called,
    - how often its calls were paired with MCP tool calls (joined on request_id),
    - how often those calls hit the RAG knowledge base.

    Data comes from ai_calls and mcp_calls, grouped by caller through
    _AGENT_CALLER_GROUPS.

    Query args:
        hours: Look-back window in hours. Defaults to 24; malformed values also fall back to 24.
    """
    try:
        hours = int(request.args.get('hours', '24'))
    except (TypeError, ValueError):
        # Parsed before the DB try block, so a bad value used to escape as a 500.
        hours = 24
    session = get_session()
    try:
        # 1. Overall statistics.
        overall = session.execute(
            sa_text("""
                SELECT COUNT(*),
                       COALESCE(SUM(cost_usd), 0),
                       COUNT(*) FILTER (WHERE provider IN ('gemini','claude','nim','openrouter','nim_via_elephant')),
                       COUNT(*) FILTER (WHERE provider IN ('gcp_ollama','ollama_secondary','ollama_111','ollama_other')),
                       COUNT(*) FILTER (WHERE rag_hit),
                       COALESCE(SUM(input_tokens + output_tokens), 0)
                FROM ai_calls
                WHERE called_at >= NOW() - (:h * INTERVAL '1 hour')
            """),
            {'h': hours},
        ).fetchone()
        total_calls = int(overall[0] or 0)
        total_cost = float(overall[1] or 0)
        paid_calls = int(overall[2] or 0)
        local_calls = int(overall[3] or 0)
        rag_hits = int(overall[4] or 0)
        total_tokens = int(overall[5] or 0)
        mcp_calls_table_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.mcp_calls') IS NOT NULL")
        ).scalar())
        # 2. Per-agent-group details.
        agent_matrix = []
        for agent_key, callers in _AGENT_CALLER_GROUPS.items():
            ag_row = session.execute(
                sa_text("""
                    SELECT COUNT(*) AS calls,
                           COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens,
                           COALESCE(SUM(cost_usd), 0) AS cost,
                           COUNT(*) FILTER (WHERE rag_hit) AS rag_hits,
                           COUNT(*) FILTER (WHERE provider IN ('gcp_ollama','ollama_secondary','ollama_111','ollama_other')) AS ollama,
                           COUNT(*) FILTER (WHERE provider = 'gcp_ollama') AS ollama_gcp_a,
                           COUNT(*) FILTER (WHERE provider = 'ollama_secondary') AS ollama_gcp_b,
                           COUNT(*) FILTER (WHERE provider = 'ollama_111') AS ollama_111,
                           COUNT(*) FILTER (WHERE provider = 'gemini') AS gemini,
                           COUNT(*) FILTER (WHERE provider IN ('claude','nim','openrouter','nim_via_elephant')) AS other_paid,
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors,
                           COALESCE(AVG(duration_ms), 0) AS avg_ms
                    FROM ai_calls
                    WHERE called_at >= NOW() - (:h * INTERVAL '1 hour')
                      AND caller = ANY(:callers)
                """),
                {'h': hours, 'callers': callers},
            ).fetchone()
            calls = int(ag_row[0] or 0)
            if calls == 0:
                # Idle agents still get a placeholder row. It carries the same keys
                # as an active row (including 'ollama') so the template sees one shape.
                agent_matrix.append({
                    'key': agent_key, 'label': _AGENT_LABELS[agent_key][0],
                    'desc': _AGENT_LABELS[agent_key][1],
                    'calls': 0, 'tokens': 0, 'cost': 0,
                    'rag_hits': 0, 'rag_rate': 0,
                    'ollama': 0,
                    'ollama_pct': 0, 'gemini_pct': 0, 'paid_pct': 0,
                    'ollama_gcp_a': 0, 'ollama_gcp_b': 0, 'ollama_111': 0,
                    'gemini': 0, 'other_paid': 0,
                    'errors': 0, 'error_rate': 0,
                    'avg_ms': 0, 'mcp_calls': 0, 'mcp_rate': 0,
                    'callers_in_group': callers,
                })
                continue
            # MCP orchestration rate, joined through request_id. It degrades to 0
            # when the mcp_calls migration has not run yet.
            if mcp_calls_table_exists:
                mcp_count = session.execute(
                    sa_text("""
                        SELECT COUNT(DISTINCT a.request_id)
                        FROM ai_calls a
                        INNER JOIN mcp_calls m ON m.request_id = a.request_id
                        WHERE a.called_at >= NOW() - (:h * INTERVAL '1 hour')
                          AND a.caller = ANY(:callers)
                          AND a.request_id IS NOT NULL
                    """),
                    {'h': hours, 'callers': callers},
                ).fetchone()[0] or 0
            else:
                mcp_count = 0
            errors = int(ag_row[10] or 0)
            ollama = int(ag_row[4] or 0)
            gemini = int(ag_row[8] or 0)
            other_paid = int(ag_row[9] or 0)
            agent_matrix.append({
                'key': agent_key,
                'label': _AGENT_LABELS[agent_key][0],
                'desc': _AGENT_LABELS[agent_key][1],
                'calls': calls,
                'tokens': int(ag_row[1] or 0),
                'cost': float(ag_row[2] or 0),
                'rag_hits': int(ag_row[3] or 0),
                'rag_rate': float(ag_row[3] or 0) / calls * 100,
                'ollama': ollama, 'ollama_pct': ollama / calls * 100,
                'ollama_gcp_a': int(ag_row[5] or 0),
                'ollama_gcp_b': int(ag_row[6] or 0),
                'ollama_111': int(ag_row[7] or 0),
                'gemini': gemini, 'gemini_pct': gemini / calls * 100,
                'other_paid': other_paid,
                'paid_pct': (gemini + other_paid) / calls * 100,
                'errors': errors, 'error_rate': errors / calls * 100,
                'avg_ms': int(ag_row[11] or 0),
                'mcp_calls': int(mcp_count),
                'mcp_rate': float(mcp_count) / calls * 100,
                'callers_in_group': callers,
            })
        # 3. MCP server × caller workload within the window (top 30).
        if mcp_calls_table_exists:
            mcp_servers = session.execute(
                sa_text("""
                    SELECT server, caller, COUNT(*) AS calls,
                           COUNT(*) FILTER (WHERE cache_hit) AS cache_hits,
                           COALESCE(SUM(cost_usd), 0) AS cost
                    FROM mcp_calls
                    WHERE called_at >= NOW() - (:h * INTERVAL '1 hour')
                    GROUP BY server, caller
                    ORDER BY calls DESC
                    LIMIT 30
                """),
                {'h': hours},
            ).fetchall()
            mcp_matrix = [
                {
                    'server': r[0], 'caller': r[1],
                    'calls': int(r[2] or 0),
                    'cache_hits': int(r[3] or 0),
                    'cost': float(r[4] or 0),
                    'cache_rate': (float(r[3] or 0) / float(r[2]) * 100) if r[2] else 0,
                }
                for r in mcp_servers
            ]
        else:
            mcp_matrix = []
        # 4. Rule-based orchestration suggestions.
        recommendations = []
        if not mcp_calls_table_exists:
            recommendations.append({
                'severity': 'med', 'agent': 'MCP 觀測',
                'finding': 'mcp_calls 尚未建立MCP 編排率目前以 0 顯示',
                'suggestion': '執行 Phase 10.7 migration 後,本頁會自動接回 MCP server/caller 矩陣',
            })
        for ag in agent_matrix:
            # Idle agents have nothing to judge; every rule below can divide by calls.
            if ag['calls'] == 0:
                continue
            # Rule 1: paid share > 50% and Ollama share < 20% → suggest Hermes-first.
            if ag['paid_pct'] > 50 and ag['ollama_pct'] < 20:
                recommendations.append({
                    'severity': 'high', 'agent': ag['label'],
                    'finding': f"付費 LLM 比例 {ag['paid_pct']:.0f}%cost ${ag['cost']:.2f}",
                    'suggestion': '改用 Hermes-first 短路機制:先試 Ollama 三主機 5s timeout0 hits 才 escalate Gemini',
                })
            # Rule 2: error rate > 10% → suggest a code review run.
            if ag['error_rate'] > 10:
                recommendations.append({
                    'severity': 'high', 'agent': ag['label'],
                    'finding': f"錯誤率 {ag['error_rate']:.1f}%{ag['errors']}/{ag['calls']}",
                    'suggestion': '觸發程式碼審查流程找回歸問題(可由 AI 流量控制台執行)',
                })
            # Rule 3: MCP rate < 5% with more than 50 calls → suggest more MCP usage.
            if mcp_calls_table_exists and ag['mcp_rate'] < 5 and ag['calls'] > 50:
                recommendations.append({
                    'severity': 'med', 'agent': ag['label'],
                    'finding': f"MCP 編排率僅 {ag['mcp_rate']:.1f}%,未善用外部工具",
                    'suggestion': '考慮加 MCP omnisearch / firecrawl 補強事實查證鏈',
                })
            # Rule 4: RAG hit rate ≥ 40% with at least 20 hits → the knowledge base is
            # pulling its weight; nudge for feedback collection.
            if ag['rag_rate'] >= 40 and ag['rag_hits'] >= 20:
                recommendations.append({
                    'severity': 'low', 'agent': ag['label'],
                    'finding': f"RAG 命中率 {ag['rag_rate']:.1f}%{ag['rag_hits']} hits— 知識庫貢獻度高",
                    'suggestion': '推 Telegram inline button 收集 feedback_score 強化 promotion gate',
                })
            # Rule 5: more than 20% of calls fell back to the 111 host → warn.
            fb_ratio = ag['ollama_111'] / ag['calls']
            if fb_ratio > 0.20:
                fb_pct = fb_ratio * 100
                recommendations.append({
                    'severity': 'med', 'agent': ag['label'],
                    'finding': f"111 fallback 比例 {fb_pct:.0f}%GCP 兩台不可達?)",
                    'suggestion': '檢查 mo.wooo.work/observability/host_health AIOps incidents',
                })
        return render_template(
            'admin/agent_orchestration.html',
            active_page='obs_agent_orchestration',
            hours=hours,
            agent_matrix=agent_matrix,
            mcp_matrix=mcp_matrix,
            recommendations=recommendations,
            overall={
                'total_calls': total_calls,
                'total_cost': total_cost,
                'total_tokens': total_tokens,
                'paid_calls': paid_calls,
                'local_calls': local_calls,
                'rag_hits': rag_hits,
                'paid_pct': (paid_calls / total_calls * 100) if total_calls else 0,
                'local_pct': (local_calls / total_calls * 100) if total_calls else 0,
                'rag_rate': (rag_hits / total_calls * 100) if total_calls else 0,
            },
            error=None,
        )
    except Exception:
        # The UI gets a safe empty state; the real cause goes to the log only.
        logger.exception('agent_orchestration dashboard query failed')
        return render_template(
            'admin/agent_orchestration.html',
            active_page='obs_agent_orchestration', hours=hours,
            agent_matrix=[], mcp_matrix=[], recommendations=[], overall={},
            error='資料查詢暫時不可用,已切換安全空狀態。',
        )
    finally:
        session.close()
# ─────────────────────────────────────────────────────────────────────────────
# /observability/ai_calls — Phase 27 主入口
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/ai_calls')
@login_required
def ai_calls_dashboard():
    """Render the ai_calls observability dashboard (default window: last 24h).

    Query parameters:
        hours:    look-back window in hours. Non-numeric input falls back to 24
                  and the value is clamped to [1, 8760], so a negative window
                  cannot produce a future ``since`` and a huge one cannot
                  overflow ``timedelta``.
        caller:   optional exact-match caller filter (recent-calls table only).
        provider: optional exact-match provider filter (recent-calls table only).

    All queries are read-only. On any DB error the template is rendered with
    empty data and a generic banner; the exception is logged, never shown.
    """
    try:
        hours = int(request.args.get('hours', '24'))
    except (TypeError, ValueError):
        hours = 24
    hours = max(1, min(hours, 24 * 365))
    caller_filter = request.args.get('caller', '').strip()
    provider_filter = request.args.get('provider', '').strip()
    since = datetime.now() - timedelta(hours=hours)
    session = get_session()
    try:
        # 1. Window-wide summary.
        summary = session.execute(
            sa_text("""
                SELECT
                    COUNT(*) AS total_calls,
                    COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
                    COALESCE(SUM(cost_usd), 0) AS total_cost,
                    COALESCE(AVG(duration_ms), 0) AS avg_duration,
                    COUNT(*) FILTER (WHERE status = 'ok') AS ok_calls,
                    COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS error_calls,
                    COUNT(*) FILTER (WHERE rag_hit) AS rag_hits,
                    COUNT(*) FILTER (WHERE cache_hit) AS cache_hits
                FROM ai_calls
                WHERE called_at >= :since
            """),
            {'since': since},
        ).fetchone()

        # 2. Breakdown by provider.
        by_provider = session.execute(
            sa_text("""
                SELECT provider, COUNT(*) AS calls,
                       COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens,
                       COALESCE(SUM(cost_usd), 0) AS cost
                FROM ai_calls
                WHERE called_at >= :since
                GROUP BY provider
                ORDER BY tokens DESC
            """),
            {'since': since},
        ).fetchall()

        # 3. Latest 100 calls. Fixed SQL with every filter bound as a parameter
        #    (Phase 33): no f-string WHERE building, so no injection surface.
        recent = session.execute(
            sa_text("""
                SELECT id, called_at, caller, provider, model,
                       input_tokens, output_tokens, duration_ms, status,
                       cost_usd, cache_hit, rag_hit
                FROM ai_calls
                WHERE called_at >= :since
                  AND (:caller_f = '' OR caller = :caller_f)
                  AND (:provider_f = '' OR provider = :provider_f)
                ORDER BY called_at DESC
                LIMIT 100
            """),
            {
                'since': since,
                'caller_f': caller_filter,
                'provider_f': provider_filter,
            },
        ).fetchall()

        # 4. Distinct callers for the filter dropdown.
        callers = session.execute(
            sa_text("""
                SELECT DISTINCT caller FROM ai_calls
                WHERE called_at >= :since ORDER BY caller
            """),
            {'since': since},
        ).fetchall()

        # 5b. Breakdown by concrete model (finer than provider), top 15.
        by_model = session.execute(
            sa_text("""
                SELECT model, provider, COUNT(*) AS calls,
                       COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens,
                       COALESCE(SUM(cost_usd), 0) AS cost,
                       COALESCE(AVG(duration_ms), 0) AS avg_ms,
                       COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors
                FROM ai_calls
                WHERE called_at >= :since
                  AND model IS NOT NULL AND model != ''
                GROUP BY model, provider
                ORDER BY calls DESC
                LIMIT 15
            """),
            {'since': since},
        ).fetchall()

        # 5c. Hourly call volume; always the last 24 hours, independent of `hours`.
        hourly_trend = session.execute(
            sa_text("""
                SELECT date_trunc('hour', called_at) AS hr,
                       COUNT(*) AS calls,
                       COALESCE(SUM(cost_usd), 0) AS cost,
                       COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors
                FROM ai_calls
                WHERE called_at >= NOW() - INTERVAL '24 hours'
                GROUP BY hr ORDER BY hr ASC
            """),
        ).fetchall()

        # 5d. 10 most recent agent_context rows (value truncated to 120 chars).
        recent_contexts = session.execute(
            sa_text("""
                SELECT created_at, agent_name, context_key, ttl_minutes,
                       LEFT(context_val, 120) AS preview
                FROM agent_context
                ORDER BY created_at DESC LIMIT 10
            """),
        ).fetchall()

        # 5. Per caller: RAG hit rate x MCP orchestration rate x RAG feedback.
        #    mcp_calls / rag_query_log may not be migrated yet; degrade to an
        #    ai_calls-only query instead of surfacing a DB exception.
        mcp_calls_table_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.mcp_calls') IS NOT NULL")
        ).scalar())
        rag_query_log_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.rag_query_log') IS NOT NULL")
        ).scalar())
        richness_joined = mcp_calls_table_exists and rag_query_log_exists
        if richness_joined:
            # Each side table is aggregated per caller in its own CTE before
            # joining. Joining the raw tables (ai_calls x mcp_calls x
            # rag_query_log) fans rows out and multiplies total_calls,
            # rag_hits and feedback_count by the size of the other joins.
            caller_richness = session.execute(
                sa_text("""
                    WITH base AS (
                        SELECT caller,
                               COUNT(*) AS total_calls,
                               COUNT(*) FILTER (WHERE rag_hit) AS rag_hits
                        FROM ai_calls
                        WHERE called_at >= :since
                        GROUP BY caller
                        HAVING COUNT(*) >= 5
                    ),
                    mcp AS (
                        SELECT a.caller,
                               COUNT(DISTINCT m.request_id) AS mcp_orchestrated
                        FROM ai_calls a
                        JOIN mcp_calls m
                          ON m.request_id = a.request_id
                         AND m.called_at >= :since
                        WHERE a.called_at >= :since
                        GROUP BY a.caller
                    ),
                    fb AS (
                        SELECT caller,
                               AVG(feedback_score) AS avg_rag_feedback,
                               COUNT(feedback_score) AS feedback_count
                        FROM rag_query_log
                        WHERE queried_at >= :since
                          AND feedback_score IS NOT NULL
                        GROUP BY caller
                    )
                    SELECT b.caller,
                           b.total_calls,
                           b.rag_hits,
                           COALESCE(m.mcp_orchestrated, 0) AS mcp_orchestrated,
                           COALESCE(f.avg_rag_feedback, 0) AS avg_rag_feedback,
                           COALESCE(f.feedback_count, 0) AS feedback_count
                    FROM base b
                    LEFT JOIN mcp m ON m.caller = b.caller
                    LEFT JOIN fb f ON f.caller = b.caller
                    ORDER BY b.total_calls DESC
                    LIMIT 12
                """),
                {'since': since},
            ).fetchall()
        else:
            caller_richness = session.execute(
                sa_text("""
                    SELECT caller,
                           COUNT(*) AS total_calls,
                           COUNT(*) FILTER (WHERE rag_hit) AS rag_hits
                    FROM ai_calls
                    WHERE called_at >= :since
                    GROUP BY caller
                    HAVING COUNT(*) >= 5
                    ORDER BY total_calls DESC
                    LIMIT 12
                """),
                {'since': since},
            ).fetchall()

        return render_template(
            'admin/ai_calls_dashboard.html',
            active_page='obs_ai_calls',
            hours=hours,
            caller_filter=caller_filter,
            provider_filter=provider_filter,
            summary={
                'total_calls': int(summary[0] or 0),
                'total_tokens': int(summary[1] or 0),
                'total_cost': float(summary[2] or 0),
                'avg_duration': int(summary[3] or 0),
                'ok_calls': int(summary[4] or 0),
                'error_calls': int(summary[5] or 0),
                'rag_hits': int(summary[6] or 0),
                'cache_hits': int(summary[7] or 0),
            },
            by_provider=[
                {'provider': r[0], 'calls': int(r[1] or 0),
                 'tokens': int(r[2] or 0), 'cost': float(r[3] or 0)}
                for r in by_provider
            ],
            recent=[_build_ai_call_recent_row(r) for r in recent],
            callers=[r[0] for r in callers],
            by_model=[
                {
                    'model': r[0], 'provider': r[1],
                    'calls': int(r[2] or 0), 'tokens': int(r[3] or 0),
                    'cost': float(r[4] or 0), 'avg_ms': int(r[5] or 0),
                    'errors': int(r[6] or 0),
                }
                for r in by_model
            ],
            hourly_trend=[
                {
                    'hour': r[0].strftime('%H:%M') if r[0] else '',
                    'calls': int(r[1] or 0),
                    'cost': float(r[2] or 0),
                    'errors': int(r[3] or 0),
                }
                for r in hourly_trend
            ],
            recent_contexts=[
                {
                    'created_at': r[0].strftime('%Y-%m-%d %H:%M') if r[0] else '',
                    'agent_name': r[1], 'context_key': r[2],
                    'ttl_minutes': int(r[3] or 0),
                    'preview': r[4] or '',
                }
                for r in recent_contexts
            ],
            caller_richness=[
                {
                    'caller': r[0],
                    'total_calls': int(r[1] or 0),
                    'rag_hits': int(r[2] or 0),
                    'mcp_orchestrated': int(r[3] or 0) if richness_joined else 0,
                    'avg_rag_feedback': round(float(r[4] or 0), 2) if richness_joined else 0,
                    'feedback_count': int(r[5] or 0) if richness_joined else 0,
                    'rag_hit_rate': (float(r[2] or 0) / float(r[1]) * 100) if r[1] else 0,
                    'mcp_rate': (float(r[3] or 0) / float(r[1]) * 100) if richness_joined and r[1] else 0,
                }
                for r in caller_richness
            ],
            error=None,
        )
    except Exception:
        # Fail safe: log the real cause, show only a generic banner.
        logger.exception("ai_calls dashboard query failed (hours=%s)", hours)
        return render_template(
            'admin/ai_calls_dashboard.html',
            active_page='obs_ai_calls',
            hours=hours, caller_filter=caller_filter,
            provider_filter=provider_filter,
            summary={}, by_provider=[], recent=[], callers=[], caller_richness=[],
            by_model=[], hourly_trend=[], recent_contexts=[],
            error='AI 呼叫資料暫時不可用,已切換安全空狀態。',
        )
    finally:
        session.close()
# ─────────────────────────────────────────────────────────────────────────────
# /observability/promotion_review — Phase 28 PromotionGate 待審核列表
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/promotion_review')
@login_required
def promotion_review_list():
    """List learning episodes awaiting human review.

    Selects up to 50 episodes with promotion_status='awaiting_review' and
    reviewed_at IS NULL, highest weight first.

    Phase 39 D-1: each episode gets a RAG lookup for the top-3 most similar
    ai_insights, to help the reviewer judge promotion value. The lookup is
    fail-safe: on any RAG error ``similar_insights`` stays ``[]`` and the page
    still renders. Episodes with empty text skip the lookup.

    Also renders:
    - the total ai_insights count;
    - the 30-day episode status distribution;
    - the 10 most recent ai_insights;
    - the 12 most-exercised agent_strategy_weights rows.

    Any DB error renders an empty page with a generic banner; the exception is
    logged.
    """
    session = get_session()
    try:
        rows = session.execute(
            sa_text("""
                SELECT id, created_at, episode_type, source_table, source_id,
                       distilled_text, quality_score, weight, promotion_status
                FROM learning_episodes
                WHERE promotion_status = 'awaiting_review'
                  AND reviewed_at IS NULL
                ORDER BY weight DESC, created_at ASC
                LIMIT 50
            """),
        ).fetchall()

        # Total ai_insights size (baseline for "KB growth after promotion").
        kb_size = 0
        try:
            kb_row = session.execute(
                sa_text("SELECT COUNT(*) FROM ai_insights"),
            ).fetchone()
            kb_size = int(kb_row[0] or 0)
        except Exception:
            logger.debug("Promotion review ai_insights count unavailable", exc_info=True)

        episodes = [
            {'id': r[0],
             'created_at': r[1].strftime('%Y-%m-%d %H:%M') if r[1] else '',
             'episode_type': r[2], 'source_table': r[3], 'source_id': r[4],
             'distilled_text': (r[5] or '')[:600],
             'quality_score': float(r[6] or 0),
             'weight': float(r[7] or 0),
             'status': r[8],
             'similar_insights': []}
            for r in rows
        ]

        # Phase 39 D-1: top-3 similar ai_insights per episode via RAG.
        try:
            from services.rag_service import rag_service
            for ep in episodes:
                if not ep['distilled_text'].strip():
                    # Nothing meaningful to search for.
                    continue
                try:
                    rag_result = rag_service.query(
                        text=ep['distilled_text'][:500],
                        caller='admin_promotion_review',
                        top_k=3,
                        threshold=0.7,
                    )
                    ep['similar_insights'] = [
                        {
                            'id': h.get('id'),
                            'insight_type': h.get('insight_type'),
                            'content': (h.get('content') or '')[:180],
                            'similarity': round(float(h.get('similarity', 0)), 3),
                            'created_at': h.get('created_at').strftime('%Y-%m-%d')
                                          if h.get('created_at') else '',
                        }
                        for h in rag_result.hits[:3]
                    ]
                except Exception:
                    logger.debug(
                        "Promotion review similar-insight lookup failed for episode_id=%s",
                        ep.get('id'),
                        exc_info=True,
                    )
        except Exception:
            logger.debug("Promotion review RAG service unavailable; skipping similar insights", exc_info=True)

        # Phase 47 K-4: episode status distribution over the last 30 days.
        ep_distribution = session.execute(
            sa_text("""
                SELECT promotion_status, COUNT(*) AS cnt
                FROM learning_episodes
                WHERE created_at >= NOW() - INTERVAL '30 days'
                GROUP BY promotion_status ORDER BY cnt DESC
            """),
        ).fetchall()
        episode_distribution_30d = {r[0]: int(r[1] or 0) for r in ep_distribution}

        # Phase 47 K-4: 10 most recent ai_insights.
        latest_insights = session.execute(
            sa_text("""
                SELECT id, insight_type, period, product_sku, created_at,
                       LEFT(content, 160) AS preview
                FROM ai_insights
                ORDER BY created_at DESC LIMIT 10
            """),
        ).fetchall()

        # Phase 47 K-4: 12 most-exercised agent_strategy_weights rows.
        strategy_weights = session.execute(
            sa_text("""
                SELECT strategy_key, weight, success_cnt, fail_cnt, updated_at
                FROM agent_strategy_weights
                ORDER BY (success_cnt + fail_cnt) DESC
                LIMIT 12
            """),
        ).fetchall()

        return render_template(
            'admin/promotion_review.html',
            active_page='obs_promotion_review',
            episodes=episodes,
            kb_size=kb_size,
            episode_distribution_30d=episode_distribution_30d,
            latest_insights=[
                {
                    'id': r[0], 'insight_type': r[1], 'period': r[2],
                    'product_sku': r[3],
                    'created_at': r[4].strftime('%Y-%m-%d %H:%M') if r[4] else '',
                    'preview': r[5] or '',
                }
                for r in latest_insights
            ],
            strategy_weights=[
                {
                    'strategy_key': r[0], 'weight': float(r[1] or 0),
                    'success': int(r[2] or 0), 'fail': int(r[3] or 0),
                    'updated_at': r[4].strftime('%Y-%m-%d') if r[4] else '',
                    'success_rate': (
                        float(r[2] or 0) / float((r[2] or 0) + (r[3] or 0)) * 100
                    ) if ((r[2] or 0) + (r[3] or 0)) > 0 else 0,
                }
                for r in strategy_weights
            ],
            error=None,
        )
    except Exception:
        logger.exception("Promotion review page query failed")
        return render_template(
            'admin/promotion_review.html',
            active_page='obs_promotion_review',
            episodes=[],
            kb_size=0,
            episode_distribution_30d={},
            latest_insights=[],
            strategy_weights=[],
            error='RAG 晉升資料暫時不可用,已切換安全空狀態。',
        )
    finally:
        session.close()
@admin_observability_bp.route('/promotion_review/approve/<int:episode_id>', methods=['POST'])
@login_required
def promotion_review_approve(episode_id: int):
    """Approve an awaiting-review episode from the web UI.

    This has the same effect as the Telegram pg_ok callback.

    The approver identity comes from the server-side login session
    (``get_current_user``), never from client headers. It is passed to the
    promotion gate only as ``hash_human_approver(username)``.

    Returns:
        200 ``{'ok': True, 'insight_id', 'approver'}`` when promotion succeeds;
        500 ``{'ok': False, 'error'}`` when it fails or raises.
    """
    try:
        from services.learning_pipeline import promotion_gate, hash_human_approver
        user = get_current_user() or {}
        # `or` rather than a .get() default: a present-but-empty/None username
        # must also fall back instead of being hashed as-is.
        username = user.get('username') or 'web_admin'
        approver_hash = hash_human_approver(username)
        insight_id = promotion_gate.promote(
            episode_id,
            human_approver=approver_hash,
        )
        if insight_id:
            return jsonify({'ok': True, 'insight_id': insight_id, 'approver': approver_hash})
        return jsonify({'ok': False, 'error': 'promote failed'}), 500
    except Exception as e:
        logger.exception("Promotion approve failed for episode_id=%s", episode_id)
        return jsonify({'ok': False, 'error': str(e)[:200]}), 500
@admin_observability_bp.route('/promotion_review/reject/<int:episode_id>', methods=['POST'])
@login_required
def promotion_review_reject(episode_id: int):
    """Reject an awaiting-review episode from the web UI.

    The episode is rejected with reason ``'rejected_human'``. The approver is
    taken from the server-side login session and passed only as its hash.

    Returns:
        200 ``{'ok': <bool>}`` with the gate's result coerced to a boolean;
        500 ``{'ok': False, 'error'}`` when the gate raises.
    """
    try:
        from services.learning_pipeline import promotion_gate, hash_human_approver
        user = get_current_user() or {}
        # `or` so an empty/None username still falls back.
        username = user.get('username') or 'web_admin'
        approver_hash = hash_human_approver(username)
        ok = promotion_gate.reject(
            episode_id,
            'rejected_human',
            detail='web admin reject',
            human_approver=approver_hash,
        )
        return jsonify({'ok': bool(ok)})
    except Exception as e:
        logger.exception("Promotion reject failed for episode_id=%s", episode_id)
        return jsonify({'ok': False, 'error': str(e)[:200]}), 500
# ─────────────────────────────────────────────────────────────────────────────
# /observability/quality_trend — Phase 25 caller 反饋趨勢視覺化
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/quality_trend')
@login_required
def quality_trend_dashboard():
    """Render caller x feedback quality trends over a day window (default 30).

    Query parameters:
        days: window length in days. Non-numeric input falls back to 30 and
              the value is clamped to [1, 3650].

    Sections:
    - per-caller trends, worst average score first;
    - learning_episodes status distribution;
    - RAG root-cause hints for up to the 3 worst callers (avg < 3.0 with at
      least 3 feedback entries);
    - action_outcomes verdict counts;
    - action_plans status/type counts;
    - overall rag_query_log feedback distribution.

    Every auxiliary section is best-effort: a failure leaves that section empty
    and is logged at debug level. All SQL is fixed text with the window bound
    as a parameter (no f-string SQL).
    """
    try:
        days = int(request.args.get('days', '30'))
    except (TypeError, ValueError):
        days = 30
    days = max(1, min(days, 3650))
    window = {'days': days}
    try:
        from services.feedback_quality_tracker import (
            compute_caller_quality_trend, get_caller_recommendations,
        )
        trends = compute_caller_quality_trend(days=days)
        recommendations = get_caller_recommendations(days=days)
        # Ascending avg_score so the worst callers come first.
        sorted_trends = sorted(
            trends.items(),
            key=lambda kv: kv[1].get('avg_score', 5),
        )

        # Phase 40 D-6: learning_episodes status distribution (distillation pool saturation).
        episode_distribution = {}
        try:
            session = get_session()
            try:
                rows = session.execute(
                    sa_text("""
                        SELECT promotion_status, COUNT(*) AS cnt
                        FROM learning_episodes
                        WHERE created_at >= NOW() - (:days * INTERVAL '1 day')
                        GROUP BY promotion_status
                    """),
                    window,
                ).fetchall()
                episode_distribution = {r[0]: int(r[1] or 0) for r in rows}
            finally:
                session.close()
        except Exception:
            logger.debug("Quality trend episode distribution unavailable", exc_info=True)

        # Phase 40 D-6: RAG root-cause hints for the (up to) 3 worst callers.
        rag_root_causes = []
        try:
            from services.rag_service import rag_service
            for caller, info in sorted_trends[:3]:
                if info.get('avg_score', 5) < 3.0 and info.get('total_feedback', 0) >= 3:
                    try:
                        q = (
                            f"caller {caller} 反饋分數低 平均 "
                            f"{info.get('avg_score', 0):.1f}/5 應採取什麼根因排查"
                        )
                        rag_result = rag_service.query(
                            text=q,
                            caller='admin_quality_trend',
                            top_k=2,
                            threshold=0.6,
                        )
                        if rag_result.hits:
                            rag_root_causes.append({
                                'caller': caller,
                                'avg_score': info.get('avg_score', 0),
                                'feedback_n': info.get('total_feedback', 0),
                                'hits': [
                                    {
                                        'id': h.get('id'),
                                        'insight_type': h.get('insight_type'),
                                        'content': (h.get('content') or '')[:200],
                                        'similarity': round(float(h.get('similarity', 0)), 3),
                                    }
                                    for h in rag_result.hits[:2]
                                ],
                            })
                    except Exception:
                        logger.debug(
                            "Quality trend RAG root-cause lookup failed for caller=%s",
                            caller, exc_info=True,
                        )
        except Exception:
            logger.debug("Quality trend RAG service unavailable", exc_info=True)

        # Phase 47 K-5: closed-loop learning outcomes (ADR-012).
        action_outcomes_stats = []
        action_plans_status = []
        rag_overall_dist = []
        try:
            session = get_session()
            try:
                # action_outcomes verdict distribution.
                ao_rows = session.execute(
                    sa_text("""
                        SELECT verdict, COUNT(*) AS cnt
                        FROM action_outcomes
                        WHERE created_at >= NOW() - (:days * INTERVAL '1 day')
                        GROUP BY verdict ORDER BY cnt DESC
                    """),
                    window,
                ).fetchall()
                action_outcomes_stats = [{'verdict': r[0] or 'unknown', 'count': int(r[1] or 0)} for r in ao_rows]

                # action_plans status x plan_type distribution.
                ap_rows = session.execute(
                    sa_text("""
                        SELECT status, plan_type, COUNT(*) AS cnt
                        FROM action_plans
                        WHERE created_at >= NOW() - (:days * INTERVAL '1 day')
                        GROUP BY status, plan_type ORDER BY cnt DESC
                    """),
                    window,
                ).fetchall()
                action_plans_status = [
                    {'status': r[0], 'plan_type': r[1] or 'misc', 'count': int(r[2] or 0)}
                    for r in ap_rows
                ]

                # Overall rag_query_log feedback distribution (not per caller).
                rag_dist_rows = session.execute(
                    sa_text("""
                        SELECT feedback_score, COUNT(*) AS cnt
                        FROM rag_query_log
                        WHERE queried_at >= NOW() - (:days * INTERVAL '1 day')
                          AND feedback_score IS NOT NULL
                        GROUP BY feedback_score ORDER BY feedback_score
                    """),
                    window,
                ).fetchall()
                rag_overall_dist = [{'score': int(r[0] or 0), 'count': int(r[1] or 0)} for r in rag_dist_rows]
            finally:
                session.close()
        except Exception:
            logger.debug("Quality trend action/RAG statistics unavailable", exc_info=True)

        return render_template(
            'admin/quality_trend.html',
            active_page='obs_quality_trend',
            days=days,
            trends=[(c, info) for c, info in sorted_trends],
            recommendations=recommendations,
            episode_distribution=episode_distribution,
            rag_root_causes=rag_root_causes,
            action_outcomes_stats=action_outcomes_stats,
            action_plans_status=action_plans_status,
            rag_overall_dist=rag_overall_dist,
            error=None,
        )
    except Exception:
        logger.exception("Quality trend dashboard failed (days=%s)", days)
        return render_template(
            'admin/quality_trend.html',
            active_page='obs_quality_trend',
            days=days, trends=[], recommendations=[],
            episode_distribution={}, rag_root_causes=[],
            action_outcomes_stats=[], action_plans_status=[], rag_overall_dist=[],
            error='AI 品質資料暫時不可用,已切換安全空狀態。',
        )
# ─────────────────────────────────────────────────────────────────────────────
# /observability/budget — Phase 29 預算管理 + 手動 throttle
# ─────────────────────────────────────────────────────────────────────────────
@admin_observability_bp.route('/budget')
@login_required
def budget_dashboard():
    """Render the budget dashboard: ai_call_budgets rows vs. month-to-date spend.

    Per budget row it shows:
    - the current month's spend (all providers summed for a provider-less row);
    - the spend/budget ratio;
    - the provider's throttle flag.

    Additional sections:
    - a 30-day daily cost trend by provider;
    - this month's cost split by provider;
    - the top-5 cost-burning callers this month;
    - 7-day ai_price_recommendations statistics;
    - (Phase 39 D-4) RAG-suggested throttling strategies for the row with the
      highest ratio among rows at or above 80%.

    Optional tables (ai_call_budgets, ai_price_recommendations) are probed with
    to_regclass, so a missing migration empties only that section instead of
    blanking the whole page. Any other DB error renders an empty page with a
    generic banner; the exception is logged.
    """
    today = datetime.now()
    month_start = datetime(today.year, today.month, 1)
    session = get_session()
    try:
        ai_call_budgets_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.ai_call_budgets') IS NOT NULL")
        ).scalar())
        if ai_call_budgets_exists:
            budgets = session.execute(
                sa_text("""
                    SELECT id, period, provider, budget_usd, alert_pct, updated_at
                    FROM ai_call_budgets
                    ORDER BY period, provider NULLS FIRST
                """),
            ).fetchall()
        else:
            budgets = []

        spent_rows = session.execute(
            sa_text("""
                SELECT provider, COALESCE(SUM(cost_usd), 0) AS spent
                FROM ai_calls
                WHERE called_at >= :ms
                GROUP BY provider
            """),
            {'ms': month_start},
        ).fetchall()
        spent_map = {r[0]: float(r[1] or 0) for r in spent_rows}

        # Throttle state is best-effort; `or {}` guards a None return.
        throttle_state = {}
        try:
            from services.cost_throttle_service import get_throttle_state
            throttle_state = get_throttle_state() or {}
        except Exception:
            logger.debug("Budget dashboard throttle state unavailable", exc_info=True)

        rows = []
        for b in budgets:
            provider = b[2]  # None means the all-provider total budget.
            spent = spent_map.get(provider, 0.0) if provider else sum(spent_map.values())
            budget_usd = float(b[3] or 0)
            ratio = (spent / budget_usd) if budget_usd > 0 else 0
            rows.append({
                'id': b[0], 'period': b[1], 'provider': provider or '(all)',
                'budget_usd': budget_usd, 'alert_pct': int(b[4] or 80),
                'spent': spent, 'ratio': ratio,
                'throttled': throttle_state.get(provider, {}).get('throttled', False) if provider else False,
                'updated_at': b[5].strftime('%Y-%m-%d %H:%M') if b[5] else '-',
            })

        # Phase 47 K-3: 30-day daily cost trend by provider.
        cost_30d = session.execute(
            sa_text("""
                SELECT date_trunc('day', called_at)::date AS d,
                       provider, COALESCE(SUM(cost_usd), 0) AS cost
                FROM ai_calls
                WHERE called_at >= NOW() - INTERVAL '30 days'
                GROUP BY d, provider
                ORDER BY d DESC, cost DESC
            """),
        ).fetchall()
        cost_trend_30d = [
            {
                'date': r[0].strftime('%m-%d') if r[0] else '',
                'provider': r[1],
                'cost': float(r[2] or 0),
            }
            for r in cost_30d
        ]

        # Phase 55 S-3: this month's cost per provider (pie chart).
        provider_cost_month = session.execute(
            sa_text("""
                SELECT provider, COALESCE(SUM(cost_usd), 0) AS cost
                FROM ai_calls
                WHERE called_at >= :ms AND cost_usd > 0
                GROUP BY provider ORDER BY cost DESC
            """),
            {'ms': month_start},
        ).fetchall()

        # Phase 47 K-3: top-5 cost-burning callers this month.
        top_cost_callers = session.execute(
            sa_text("""
                SELECT caller, COUNT(*) AS calls,
                       COALESCE(SUM(cost_usd), 0) AS cost,
                       COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens
                FROM ai_calls
                WHERE called_at >= :ms
                  AND cost_usd > 0
                GROUP BY caller
                ORDER BY cost DESC LIMIT 5
            """),
            {'ms': month_start},
        ).fetchall()

        # Phase 47 K-3: 7-day ai_price_recommendations stats (table may be absent).
        price_rec_table_exists = bool(session.execute(
            sa_text("SELECT to_regclass('public.ai_price_recommendations') IS NOT NULL")
        ).scalar())
        if price_rec_table_exists:
            price_rec_7d = session.execute(
                sa_text("""
                    SELECT strategy, COUNT(*) AS cnt,
                           COALESCE(AVG(confidence), 0) AS avg_conf
                    FROM ai_price_recommendations
                    WHERE created_at >= NOW() - INTERVAL '7 days'
                    GROUP BY strategy ORDER BY cnt DESC
                """),
            ).fetchall()
        else:
            price_rec_7d = []

        # Phase 39 D-4: RAG throttling suggestions for the worst row at/over 80%.
        budget_strategies = []
        over_threshold_rows = [r for r in rows if r.get('ratio', 0) >= 0.8]
        if over_threshold_rows:
            try:
                from services.rag_service import rag_service
                top_breach = max(over_threshold_rows, key=lambda r: r.get('ratio', 0))
                query_text = (
                    f"預算超出 alert_pct provider={top_breach['provider']} "
                    f"ratio={int(top_breach['ratio']*100)}% 應採取什麼節流策略"
                )
                rag_result = rag_service.query(
                    text=query_text,
                    caller='admin_budget_dashboard',
                    top_k=3,
                    threshold=0.65,
                )
                budget_strategies = [
                    {
                        'id': h.get('id'),
                        'insight_type': h.get('insight_type'),
                        'content': (h.get('content') or '')[:240],
                        'similarity': round(float(h.get('similarity', 0)), 3),
                    }
                    for h in rag_result.hits[:3]
                ]
            except Exception:
                logger.debug("Budget dashboard RAG strategy lookup failed", exc_info=True)

        return render_template(
            'admin/budget.html',
            active_page='obs_budget',
            rows=rows,
            budget_strategies=budget_strategies,
            cost_trend_30d=cost_trend_30d,
            top_cost_callers=[
                {
                    'caller': r[0], 'calls': int(r[1] or 0),
                    'cost': float(r[2] or 0), 'tokens': int(r[3] or 0),
                }
                for r in top_cost_callers
            ],
            provider_cost_month=[
                {'provider': r[0], 'cost': float(r[1] or 0)}
                for r in provider_cost_month
            ],
            price_rec_7d=[
                {
                    'strategy': r[0], 'count': int(r[1] or 0),
                    'avg_confidence': round(float(r[2] or 0), 3),
                }
                for r in price_rec_7d
            ],
            error=None,
        )
    except Exception:
        logger.exception("Budget dashboard query failed")
        return render_template('admin/budget.html', active_page='obs_budget', rows=[],
                               budget_strategies=[], cost_trend_30d=[],
                               top_cost_callers=[], price_rec_7d=[],
                               provider_cost_month=[],
                               error='預算資料暫時不可用,已切換安全空狀態。')
    finally:
        session.close()
@admin_observability_bp.route('/ai_calls/trigger_code_review', methods=['POST'])
@login_required
def ai_calls_trigger_code_review():
    """Phase 40 D-7 (L2 automation): start a code-review pipeline for the latest commit.

    Intended use: when an admin sees a caller's error rate spike on the
    dashboard, one click starts the 5-step pipeline
    (read -> hermes_scan -> openclaw_summary -> ea_decision -> nemoton_act).
    The pipeline runs in a daemon thread against the files changed by HEAD,
    looking for regressions.

    Returns:
        200 JSON with pipeline_id, short SHA and changed-file count on success;
        503 when the latest commit changes no files;
        500 with the exception type and a truncated message on any error,
        including git not answering within the timeout.
    """
    # Bound every git call so a hung git process cannot pin a request worker.
    git_timeout_seconds = 15
    try:
        import subprocess
        from services.code_review_pipeline_service import CodeReviewPipeline

        # Latest commit and the files it changed.
        commit_sha = subprocess.check_output(
            ['git', 'rev-parse', 'HEAD'],
            stderr=subprocess.DEVNULL,
            timeout=git_timeout_seconds,
        ).decode('utf-8', errors='replace').strip()
        # --root: a root commit has no parent; without it diff-tree lists nothing.
        # -m: merge commits are diffed against each parent; duplicates removed below.
        changed = subprocess.check_output(
            ['git', 'diff-tree', '--root', '--no-commit-id', '--name-only', '-r', '-m', commit_sha],
            stderr=subprocess.DEVNULL,
            timeout=git_timeout_seconds,
        ).decode('utf-8', errors='replace').splitlines()
        # Drop blanks and de-duplicate while preserving order.
        changed = list(dict.fromkeys(f for f in changed if f))
        if not changed:
            return jsonify({'ok': False, 'error': '最新 commit 無變更檔案'}), 503

        pipeline = CodeReviewPipeline(
            commit_sha=commit_sha,
            changed_files=changed,
            branch='main',
            deploy_type='manual_observability',
        )
        threading.Thread(target=pipeline.run, daemon=True).start()
        return jsonify({
            'ok': True,
            'pipeline_id': pipeline.pipeline_id,
            'commit_sha': commit_sha[:8],
            'changed_files_count': len(changed),
            'message': f'已觸發程式碼審查流程(流程編號:{pipeline.pipeline_id})在背景執行,'
                       f'5 個步驟完成後會推 Telegram 通知。',
        })
    except Exception as e:
        logger.exception("Manual code-review trigger failed")
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/ppt_audit/trigger_aider_heal', methods=['POST'])
@login_required
def ppt_audit_trigger_aider_heal():
    """Phase 40 D-8 (L2 automation): queue an AiderHeal code fix for a failing PPT audit.

    Intended use: when the PPT vision audit keeps failing, the admin triggers
    AiderHeal against services/ppt_generator.py. Per the original design, the
    fix is pushed to main and deployed by CD.

    JSON body:
        error_msg / issue_summary: diagnosis text; at least one is required.
        pptx_filename:             optional affected file.

    Only one job per key runs at a time. The key is the filename, or else the
    first 160 characters of the diagnosis. A duplicate request answers 202
    with status 'already_running'.

    Returns 202 'queued' on success, 400 without a diagnosis, and 500 on error.
    """
    try:
        from services import aider_heal_executor
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            # A JSON array/scalar body has no fields; treat it as empty.
            data = {}
        error_msg = str(data.get('error_msg') or '').strip()
        issue_summary = str(data.get('issue_summary') or '').strip()
        pptx_filename = str(data.get('pptx_filename') or '').strip()
        diagnosis = error_msg or issue_summary
        if not diagnosis:
            return jsonify({'ok': False, 'error': '需提供 error_msg 或 issue_summary'}), 400
        error_message = diagnosis[:500]
        if pptx_filename:
            error_message = f"PPT 視覺審核失敗:{pptx_filename}\n診斷:{error_message}"

        # Context handed to AiderHeal.
        context = {
            'error_type': 'ppt_vision_audit_failure',
            'error_message': error_message[:500],
            'target_file': 'services/ppt_generator.py',
            'pptx_filename': pptx_filename,
            'triggered_by': 'admin_observability',
            'issue_summary': issue_summary[:500],
        }
        heal_key = pptx_filename or diagnosis[:160] or 'manual'
        queued_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        active_job = {
            'key': heal_key,
            'pptx_filename': pptx_filename,
            'target_file': 'services/ppt_generator.py',
            'queued_at': queued_at,
            'diagnosis': _public_ppt_text(diagnosis, max_chars=120),
        }
        with _PPT_AIDER_HEAL_LOCK:
            if heal_key in _PPT_AIDER_HEAL_ACTIVE:
                existing_job = dict(_PPT_AIDER_HEAL_ACTIVE.get(heal_key) or active_job)
                existing_job['diagnosis'] = _public_ppt_text(existing_job.get('diagnosis'), max_chars=120)
                return jsonify({
                    'ok': True,
                    'status': 'already_running',
                    'action': 'CODE_FIX',
                    'message': '這份簡報的修復流程已在背景執行中,請等通知結果回報。',
                    'target_file': 'services/ppt_generator.py',
                    'active_count': len(_PPT_AIDER_HEAL_ACTIVE),
                    'job': existing_job,
                }), 202
            _PPT_AIDER_HEAL_ACTIVE[heal_key] = active_job

        def _heal_worker():
            """Run AiderHeal, log the outcome, and always release the job slot."""
            try:
                # `or {}` so a None result does not crash the logging below.
                result = aider_heal_executor.execute_code_fix(
                    error_type='ppt_vision_audit_failure',
                    error_message=error_message,
                    target_file='services/ppt_generator.py',
                    context=context,
                ) or {}
                logger.info(
                    "[PPTAudit] AiderHeal 背景任務完成 | file=%s | ok=%s | message=%s",
                    pptx_filename or '-',
                    bool(result.get('success')),
                    (result.get('message') or '')[:160],
                )
            except Exception:
                logger.exception(
                    "[PPTAudit] AiderHeal 背景任務失敗 | file=%s",
                    pptx_filename or '-',
                )
            finally:
                with _PPT_AIDER_HEAL_LOCK:
                    _PPT_AIDER_HEAL_ACTIVE.pop(heal_key, None)

        # Thread names only carry alphanumerics from the (user-supplied) filename.
        thread_key = ''.join(ch for ch in pptx_filename if ch.isalnum())[:24] or 'manual'
        try:
            threading.Thread(
                target=_heal_worker,
                daemon=True,
                name=f"ppt-aider-heal-{thread_key}",
            ).start()
        except Exception:
            # The worker never started, so its finally-cleanup will never run.
            # Release the slot here, otherwise every later request for this
            # key would answer 'already_running' forever.
            with _PPT_AIDER_HEAL_LOCK:
                _PPT_AIDER_HEAL_ACTIVE.pop(heal_key, None)
            raise
        return jsonify({
            'ok': True,
            'status': 'queued',
            'action': 'CODE_FIX',
            'message': '修復流程已排入背景執行;完成後會由通知結果回報。',
            'target_file': 'services/ppt_generator.py',
            'active_count': len(_list_ppt_aider_heal_active_jobs()),
            'job': active_job,
        }), 202
    except Exception as e:
        logger.exception("AiderHeal trigger failed")
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/ppt_audit/aider_heal_status')
@login_required
def ppt_audit_aider_heal_status():
    """Report the AiderHeal background jobs currently in flight.

    Returns JSON ``{'ok': True, 'active_count': <int>, 'jobs': [...]}``, where
    ``jobs`` is whatever ``_list_ppt_aider_heal_active_jobs()`` returns.
    """
    in_flight = _list_ppt_aider_heal_active_jobs()
    payload = {
        'ok': True,
        'active_count': len(in_flight),
        'jobs': in_flight,
    }
    return jsonify(payload)
@admin_observability_bp.route('/ppt_audit/generate_missing', methods=['POST'])
@login_required
def ppt_audit_generate_missing():
    """Queue generation of the PPT reports defined by the audit page.

    Non-blocking entry point: the web request only queues background work.
    Generation itself reuses the Telegram/OpenClaw generators and cache keys.

    JSON body (all optional):
        report_types: a list of report types, or a single type as a string.
        force:        boolean. The strings "1", "true", "yes" and "on" (any
                      case) count as true; any other string counts as false.

    Returns 202 when the service reports status 'queued', otherwise 200, and
    500 with a truncated error message on failure.
    """
    try:
        from services.ppt_auto_generation_service import start_defined_ppt_generation_background
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            # A JSON array/scalar body carries no options; treat it as empty.
            data = {}
        report_types = data.get('report_types') or None
        if isinstance(report_types, str):
            # Accept a single report type as a string, the same way
            # run_vision accepts a single filename.
            report_types = [report_types.strip()] if report_types.strip() else None
        force_raw = data.get('force')
        if isinstance(force_raw, str):
            # bool("false") is True, so parse textual booleans explicitly.
            force = force_raw.strip().lower() in ('1', 'true', 'yes', 'on')
        else:
            force = bool(force_raw)
        result = start_defined_ppt_generation_background(
            report_types=report_types,
            schedule_kind='manual',
            force=force,
        )
        return jsonify(result), 202 if result.get('status') == 'queued' else 200
    except Exception as e:
        logger.exception("PPT generate-missing trigger failed")
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/ppt_audit/run_vision', methods=['POST'])
@login_required
def ppt_audit_run_vision():
    """Queue a non-blocking visual QA run for selected generated PPT files.

    JSON body (all optional):
        filenames: a list of file names, or a single name as a string. Only
                   names ending in ``.pptx`` are kept. Any other type is
                   rejected with 400.
        max_files: integer clamped to [1, 20]. It defaults to the number of
                   filenames, or 10 when none are given; invalid values fall
                   back to 10.

    Returns 202 when the service reports status 'queued', otherwise 200, and
    500 with a truncated error message on failure.
    """
    try:
        from services.ppt_vision_service import start_ppt_vision_audit_background
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            data = {}
        filenames = data.get('filenames') or []
        if isinstance(filenames, str):
            filenames = [filenames]
        if not isinstance(filenames, (list, tuple)):
            return jsonify({'ok': False, 'error': 'filenames 必須是字串或字串陣列'}), 400
        filenames = [str(name) for name in filenames if str(name).lower().endswith('.pptx')]
        max_files = data.get('max_files') or (len(filenames) if filenames else 10)
        try:
            max_files = max(1, min(int(max_files), 20))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: JSON numbers such as 1e400 parse to float('inf').
            max_files = 10
        result = start_ppt_vision_audit_background(
            reports_dir=None,
            filenames=filenames,
            max_files=max_files,
            hours=24,
        )
        return jsonify(result), 202 if result.get('status') == 'queued' else 200
    except Exception as e:
        logger.exception("PPT vision audit trigger failed")
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/ppt_audit/vision_status')
@login_required
def ppt_audit_vision_status():
    """Return the current (or most recent) background PPT vision-audit status as JSON.

    Any failure, including the service import or JSON serialisation, becomes
    a 500 response carrying the exception type and a message truncated to 200
    characters.
    """
    try:
        from services.ppt_vision_service import get_ppt_vision_audit_status
        current_status = get_ppt_vision_audit_status()
        return jsonify(current_status)
    except Exception as exc:
        error_text = f'{type(exc).__name__}: {str(exc)[:200]}'
        return jsonify({'ok': False, 'error': error_text}), 500
def _resolve_ppt_report_path(filename: str):
    """Resolve *filename* inside REPORTS_DIR and refuse path escapes.

    REPORTS_DIR is read from the environment and defaults to
    ``/app/data/reports``.

    Returns:
        ``(path, None)`` for an existing regular ``.pptx`` file. Otherwise
        ``(None, (message, http_status))``: 404 when the path is missing or is
        not a regular file, 400 when the extension is not ``.pptx``.

    Raises:
        Whatever ``utils.security.safe_join`` raises for a rejected path
        (presumably ValueError; the callers in this module map ValueError to 400).
    """
    import os
    from utils.security import safe_join

    base_dir = os.environ.get('REPORTS_DIR', '/app/data/reports')
    candidate = safe_join(base_dir, filename)
    is_regular_file = candidate.exists() and candidate.is_file()
    if not is_regular_file:
        return None, ('檔案不存在', 404)
    if candidate.suffix.lower() == '.pptx':
        return candidate, None
    return None, ('不支援的檔案格式', 400)
def _validate_pptx_for_preview(safe_path):
import zipfile
try:
with zipfile.ZipFile(safe_path, 'r') as zf:
bad = zf.testzip()
if bad is not None:
return f'PPT 檔案損毀,無法預覽(損毀區段:{bad}'
except zipfile.BadZipFile:
return 'PPT 檔案損毀,無法預覽(非有效 zip'
except Exception as e:
return f'預覽檢查失敗:{type(e).__name__}'
return None
@admin_observability_bp.route('/ppt_audit_file/<path:filename>/prewarm', methods=['POST'])
@login_required
def ppt_audit_file_prewarm(filename: str):
    """Build (or reuse) the cached PDF preview for one report PPT and report the result as JSON.

    Status codes:
    - the resolver's status (404 or 400) for a missing file or wrong extension;
    - 400 for an illegal path (ValueError);
    - 409 when ZIP validation fails or no preview can be produced;
    - 500 on any other error.
    """
    try:
        resolved, failure = _resolve_ppt_report_path(filename)
        if failure:
            reason, http_status = failure
            return jsonify({'ok': False, 'error': reason}), http_status

        problem = _validate_pptx_for_preview(resolved)
        if problem:
            return jsonify({'ok': False, 'error': problem}), 409

        from services.ppt_preview_service import build_ppt_preview
        preview = build_ppt_preview(resolved)
        preview_ready = preview.ok and preview.pdf_path
        if not preview_ready:
            return jsonify({'ok': False, 'error': preview.error or '無法產生預覽'}), 409

        was_cached = bool(preview.cache_hit)
        return jsonify({
            'ok': True,
            'filename': resolved.name,
            'cache_hit': was_cached,
            'converter': preview.converter,
            'message': 'PDF 預覽快取已存在' if preview.cache_hit else 'PDF 預覽快取已建立',
        })
    except ValueError:
        return jsonify({'ok': False, 'error': '非法路徑'}), 400
    except Exception as exc:
        return jsonify({'ok': False, 'error': f'{type(exc).__name__}: {str(exc)[:200]}'}), 500
@admin_observability_bp.route('/ppt_audit_file/<path:filename>')
@login_required
def ppt_audit_file(filename: str):
    """Serve a report PPT for preview or download in the observability console.

    ``action`` query parameter:
      - ``view`` (default): render the HTML preview page;
      - ``pdf``: build and return the online-preview PDF inline;
      - ``download``: send the .pptx as an attachment.
    Any other value streams the .pptx inline.

    The ``view`` and ``pdf`` actions first validate the ZIP container and
    answer 409 on failure. An illegal path answers 400.
    """
    action = (request.args.get('action', 'view') or 'view').strip().lower()
    try:
        resolved, failure = _resolve_ppt_report_path(filename)
        if failure:
            reason, http_status = failure
            return reason, http_status

        # Raw .pptx delivery (download or any non-preview action): no validation.
        if action not in ('view', 'pdf'):
            return send_file(
                str(resolved),
                mimetype='application/vnd.openxmlformats-officedocument.presentationml.presentation',
                as_attachment=(action == 'download'),
                download_name=resolved.name,
            )

        problem = _validate_pptx_for_preview(resolved)
        if problem:
            return problem, 409

        from services.ppt_preview_service import build_ppt_preview
        preview = build_ppt_preview(resolved)

        if action == 'pdf':
            if not preview.ok or not preview.pdf_path:
                return preview.error or '無法產生預覽', 409
            return send_file(
                preview.pdf_path,
                mimetype='application/pdf',
                as_attachment=False,
                download_name=f'{resolved.stem}.pdf',
            )

        file_stat = resolved.stat()
        return render_template(
            'admin/ppt_audit_preview.html',
            active_page='obs_ppt_audit',
            filename=resolved.name,
            file_size_kb=round(file_stat.st_size / 1024, 1),
            file_mtime=datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M'),
            preview=preview,
            pdf_url=url_for('admin_observability.ppt_audit_file', filename=resolved.name, action='pdf'),
            download_url=url_for('admin_observability.ppt_audit_file', filename=resolved.name, action='download'),
            back_url=url_for('admin_observability.ppt_audit_history'),
        )
    except ValueError:
        return '非法路徑', 400
    except Exception as exc:
        return f'{type(exc).__name__}: {str(exc)[:200]}', 500
def _health_indicator_query(session, label, query, default):
    """Run one best-effort health-indicator query and isolate its failure.

    Args:
        session: DB session shared by all indicator queries of one request.
        label: Short name used only in the debug log.
        query: Callable taking ``session`` and returning the indicator value.
        default: Value returned when ``query`` raises.

    Returns:
        ``query(session)``, or ``default`` on any exception.

    On failure the session is rolled back: in PostgreSQL a failed statement
    aborts the whole transaction, so without the rollback every later query on
    this session would also fail and silently report its default.
    """
    try:
        return query(session)
    except Exception:
        logger.debug("health_indicator: %s query failed", label, exc_info=True)
        try:
            session.rollback()
        except Exception:
            logger.debug("health_indicator: rollback after %s failed", label, exc_info=True)
        return default


def _health_indicator_unhealthy_hosts(session) -> int:
    """Count hosts whose newest probe within the last hour reported unhealthy."""
    rows = session.execute(
        sa_text("""
            WITH latest AS (
                SELECT host_label,
                       FIRST_VALUE(healthy) OVER (
                           PARTITION BY host_label ORDER BY probed_at DESC
                       ) AS healthy
                FROM host_health_probes
                WHERE probed_at >= NOW() - INTERVAL '1 hour'
            )
            SELECT host_label, BOOL_AND(NOT healthy) AS down
            FROM latest
            GROUP BY host_label
        """),
    ).fetchall()
    return sum(1 for r in rows if r[1])


def _health_indicator_pending_episodes(session) -> int:
    """Count learning episodes in ``awaiting_review`` that have not been reviewed."""
    row = session.execute(
        sa_text("SELECT COUNT(*) FROM learning_episodes WHERE promotion_status = 'awaiting_review' AND reviewed_at IS NULL"),
    ).fetchone()
    return int(row[0] or 0)


def _health_indicator_error_rate(session) -> float:
    """Return the last-hour ai_calls error percentage (0 when there are ≤ 20 calls)."""
    row = session.execute(
        sa_text("""
            SELECT COUNT(*),
                   COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only'))
            FROM ai_calls WHERE called_at >= NOW() - INTERVAL '1 hour'
        """),
    ).fetchone()
    total = int(row[0] or 0)
    errs = int(row[1] or 0)
    # Small samples make the percentage noisy; report 0 below the floor.
    return (errs / total * 100) if total > 20 else 0


def _health_indicator_budget_alert(session) -> bool:
    """Return True when any budget's month-to-date spend is ≥ 90% of its limit."""
    today = datetime.now()
    month_start = datetime(today.year, today.month, 1)
    rows = session.execute(
        sa_text("""
            SELECT b.budget_usd,
                   COALESCE((SELECT SUM(cost_usd) FROM ai_calls
                             WHERE called_at >= :ms
                               AND (b.provider IS NULL OR provider = b.provider)), 0) AS spent
            FROM ai_call_budgets b
        """),
        {'ms': month_start},
    ).fetchall()
    return any(
        budget and float(budget) > 0 and float(spent) / float(budget) >= 0.9
        for budget, spent in rows
    )


@admin_observability_bp.route('/api/health_indicator')
@login_required
def health_indicator_api():
    """Phase 52 P-1: lightweight JSON API for the topbar observability indicator.

    Reports whether anything currently "needs attention":
    - unhealthy Ollama hosts (newest probe in the last hour is down)
    - learning episodes awaiting review (> 0)
    - last-hour error rate ≥ 30%
    - any budget at ≥ 90%

    The payload is cached in-process for ``_HEALTH_INDICATOR_CACHE_TTL_SECONDS``.
    Each indicator is best-effort: a failing query falls back to its neutral
    value without affecting the others. Returns 500 only if the session itself
    cannot be obtained or the payload cannot be built.
    """
    try:
        now_ts = time.time()
        with _HEALTH_INDICATOR_CACHE_LOCK:
            cached_payload = _HEALTH_INDICATOR_CACHE.get('payload')
            if cached_payload and now_ts < float(_HEALTH_INDICATOR_CACHE.get('expires_at') or 0):
                return jsonify(dict(cached_payload))

        session = get_session()
        try:
            host_unhealthy = _health_indicator_query(
                session, 'host_health', _health_indicator_unhealthy_hosts, 0)
            ep_pending = _health_indicator_query(
                session, 'pending_episodes', _health_indicator_pending_episodes, 0)
            error_rate = _health_indicator_query(
                session, 'error_rate', _health_indicator_error_rate, 0)
            budget_alert = _health_indicator_query(
                session, 'budget', _health_indicator_budget_alert, False)
        finally:
            session.close()

        alert_count = (
            host_unhealthy
            + (1 if ep_pending > 0 else 0)
            + (1 if error_rate >= 30 else 0)
            + (1 if budget_alert else 0)
        )
        payload = {
            'ok': True,
            'alert_count': alert_count,
            'host_unhealthy': host_unhealthy,
            'ep_pending': ep_pending,
            'error_rate_high': error_rate >= 30,
            'budget_alert': budget_alert,
            'tooltip': _build_indicator_tooltip(host_unhealthy, ep_pending, error_rate, budget_alert),
        }
        with _HEALTH_INDICATOR_CACHE_LOCK:
            _HEALTH_INDICATOR_CACHE['payload'] = dict(payload)
            _HEALTH_INDICATOR_CACHE['expires_at'] = time.time() + _HEALTH_INDICATOR_CACHE_TTL_SECONDS
        return jsonify(payload)
    except Exception as e:
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
def _build_indicator_tooltip(host_unhealthy, ep_pending, error_rate, budget_alert) -> str:
parts = []
if host_unhealthy:
parts.append(f"{host_unhealthy} 主機異常")
if ep_pending > 0:
parts.append(f"{ep_pending} 待審")
if error_rate >= 30:
parts.append(f"錯誤率 {error_rate:.0f}%")
if budget_alert:
parts.append("預算 ≥ 90%")
if not parts:
return "AI 觀測台(一切正常)"
return "AI 觀測台 — " + " / ".join(parts)
def _latest_host_probe_unhealthy(host_label: str, window_minutes: int = 30) -> bool:
"""查 DB 最新 host_health_probe作為 AutoHeal 按鈕的真實狀態來源。
`_is_unhealthy()` 只代表 Ollama client 在 30 秒 TTL 內的記憶體標記;
scheduler / 頁面 probe 寫入的是 `host_health_probes`。L2 AutoHeal 入口
必須接受 DB 最新探針異常,避免 Telegram 或 Web 顯示主機已掛、按鈕卻拒絕執行。
"""
if not host_label:
return False
session = get_session()
try:
row = session.execute(
sa_text("""
SELECT healthy
FROM host_health_probes
WHERE host_label = :label
AND probed_at >= NOW() - (:minutes || ' minutes')::interval
ORDER BY probed_at DESC
LIMIT 1
"""),
{'label': host_label, 'minutes': int(window_minutes)},
).fetchone()
return bool(row is not None and row[0] is False)
except Exception:
return False
finally:
session.close()
@admin_observability_bp.route('/playbooks/toggle/<int:playbook_id>', methods=['POST'])
@login_required
def playbook_toggle(playbook_id: int):
    """Phase 50 N-3: one-click enable/disable of a playbook (flips ``is_active``).

    Lets admins manage AutoHeal playbooks directly from the host_health board
    without SSH-ing into 188 to edit the DB.

    Returns JSON ``{ok, playbook_id, name, is_active, message}``; 404 when the
    playbook does not exist, 500 on any other failure.
    """
    try:
        session = get_session()
        try:
            # Flip in a single statement: a SELECT-then-UPDATE lets two
            # concurrent clicks read the same state and both write the same
            # value (a lost toggle). NULL is treated as inactive, as before.
            row = session.execute(
                sa_text("""
                    UPDATE playbooks
                    SET is_active = NOT COALESCE(is_active, FALSE), updated_at = NOW()
                    WHERE id = :id
                    RETURNING id, name, is_active
                """),
                {'id': playbook_id},
            ).fetchone()
            if not row:
                session.rollback()
                return jsonify({'ok': False, 'error': f'playbook #{playbook_id} 不存在'}), 404
            session.commit()
            new_active = bool(row[2])
            return jsonify({
                'ok': True,
                'playbook_id': playbook_id,
                'name': row[1],
                'is_active': new_active,
                'message': f'Playbook 「{row[1]}」已{"啟用" if new_active else "停用"}',
            })
        finally:
            session.close()
    except Exception as e:
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/host_health/trigger_autoheal', methods=['POST'])
@login_required
def host_health_trigger_autoheal():
    """Phase 40 D-9 (L2 automation): trigger an AutoHeal playbook for a down host.

    When an admin sees an Ollama host marked unhealthy, one click dispatches
    AutoHeal (ADR-013) to run the matching playbook (DOCKER_RESTART / SSH_CMD /
    ALERT_ONLY).

    Safety: only whitelisted host labels are accepted (no arbitrary URL, which
    prevents SSRF), and only hosts currently marked unhealthy, either in memory
    or by the latest DB probe, may be healed.

    Expects a JSON object ``{"host_label": "<label>"}``. A non-JSON or non-object
    body is treated as an empty payload and rejected with 400.
    """
    try:
        # get_json(silent=True) returns None instead of raising on a non-JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        host_label = str(data.get('host_label') or '').strip()
        from services.auto_heal_service import auto_heal_service
        from services.ollama_service import _is_unhealthy, OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK
        # Whitelist mapping: label -> configured host URL.
        host_map = {
            'Primary (GCP)': OLLAMA_HOST_PRIMARY,
            'Secondary (GCP)': OLLAMA_HOST_SECONDARY,
            'Fallback (111)': OLLAMA_HOST_FALLBACK,
        }
        host_url = host_map.get(host_label)
        if not host_url:
            return jsonify({'ok': False, 'error': f'未知 host_label: {host_label}'}), 400
        if not (_is_unhealthy(host_url) or _latest_host_probe_unhealthy(host_label)):
            return jsonify({
                'ok': False,
                'error': f'{host_label} 目前未標記異常,無需 AutoHeal',
            }), 400
        result = auto_heal_service.handle_exception(
            error_type='ollama_unhealthy',
            context={
                'host_label': host_label,
                'host_url': host_url,
                'error_message': f'Ollama host {host_label} ({host_url}) marked unhealthy',
                'triggered_by': 'admin_observability',
            },
        )
        return jsonify({
            'ok': bool(getattr(result, 'success', False)),
            'action': getattr(result, 'action', None),
            'message': getattr(result, 'message', '') or 'AutoHeal 已派遣',
        })
    except Exception as e:
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/budget/force_throttle', methods=['POST'])
@login_required
def budget_force_throttle():
    """Phase 39 D-4 (L2 automation): run cost_throttle evaluation now, without waiting for the hourly cron.

    When an admin sees a budget ratio spike past 110% on the board, this
    re-evaluates the throttle state immediately, so the claude→gemini
    fallback takes effect without waiting.

    Returns 400 when COST_THROTTLE_ENABLED is off; otherwise JSON with the
    throttled providers and the full state. Any failure yields 500.
    """
    try:
        from services.cost_throttle_service import (
            evaluate_throttle_status,
            is_cost_throttle_enabled,
        )

        if is_cost_throttle_enabled():
            state = evaluate_throttle_status()
            throttled_names = [
                provider
                for provider, provider_state in state.items()
                if provider_state.get('throttled')
            ]
            return jsonify({
                'ok': True,
                'throttled_providers': throttled_names,
                'state': state,
                'message': f'已立即重算 throttle 狀態,被節流的 provider{throttled_names or "(無)"}',
            })

        return jsonify({
            'ok': False,
            'error': 'COST_THROTTLE_ENABLED=false先設環境變數',
        }), 400
    except Exception as e:
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
@admin_observability_bp.route('/budget/update/<int:budget_id>', methods=['POST'])
@login_required
def budget_update(budget_id: int):
    """Update ``budget_usd`` / ``alert_pct`` of one ``ai_call_budgets`` row.

    Expects a JSON object ``{"budget_usd": <finite number > 0>,
    "alert_pct": <integer 1..100, default 80>}``.

    Returns:
        ``{'ok': True}`` on success; 400 for a non-JSON body or missing /
        non-numeric fields ("invalid payload") and for out-of-range values
        ("invalid range"); 404 if no row has ``budget_id``; 500 on DB errors.
    """
    import math

    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'ok': False, 'error': 'invalid payload'}), 400
        try:
            new_budget = float(data.get('budget_usd'))
            new_alert = int(data.get('alert_pct', 80))
        except (TypeError, ValueError, OverflowError):
            return jsonify({'ok': False, 'error': 'invalid payload'}), 400
        # NaN compares False against everything, so `nan <= 0` would let it
        # through; Python's JSON parser accepts NaN/Infinity literals.
        if not math.isfinite(new_budget) or new_budget <= 0 or not (1 <= new_alert <= 100):
            return jsonify({'ok': False, 'error': 'invalid range'}), 400
        session = get_session()
        try:
            result = session.execute(
                sa_text("""
                    UPDATE ai_call_budgets
                    SET budget_usd = :b, alert_pct = :a, updated_at = NOW()
                    WHERE id = :id
                """),
                {'b': new_budget, 'a': new_alert, 'id': budget_id},
            )
            updated = result.rowcount
            session.commit()
            if updated == 0:
                return jsonify({'ok': False, 'error': f'budget #{budget_id} 不存在'}), 404
            return jsonify({'ok': True})
        finally:
            session.close()
    except Exception as e:
        return jsonify({'ok': False, 'error': f'{type(e).__name__}: {str(e)[:200]}'}), 500
# ─────────────────────────────────────────────────────────────────────────────
# /observability/ppt_audit_history — Phase 29 PPT 視覺審核歷史
# ─────────────────────────────────────────────────────────────────────────────
def _guess_ppt_report_type_from_filename(filename: str) -> str:
"""從產出的 PPT 檔名推回報表類型,供 QA 失敗重跑與 triage 使用。"""
name = str(filename or "")
if not name:
return ""
try:
from services.ppt_auto_generation_service import REPORT_PREFIXES
for report_type, prefix in REPORT_PREFIXES.items():
if prefix and name.startswith(prefix):
return report_type
except Exception:
pass
fallback_prefixes = {
"ocbot_daily_": "daily",
"ocbot_weekly_": "weekly",
"ocbot_monthly_": "monthly",
"ocbot_quarterly_": "quarterly",
"ocbot_half_yearly_": "half_yearly",
"ocbot_annual_": "annual",
"ocbot_ttm_": "ttm",
"ocbot_strategy_": "strategy",
"ocbot_competitor_v4_": "competitor_v4",
"ocbot_competitor_": "competitor",
"ocbot_promo_compare_": "promo_compare",
"ocbot_promo_": "promo",
"ocbot_forecast_pre_event_": "forecast_pre_event",
"ocbot_vendor_": "vendor",
"ocbot_category_": "category",
"ocbot_customer_": "customer",
"ocbot_new_product_": "new_product",
"ocbot_market_intel_": "market_intel",
"ocbot_price_elasticity_": "price_elasticity",
}
for prefix, report_type in fallback_prefixes.items():
if name.startswith(prefix):
return report_type
return ""
def _ppt_filename_matches_month(filename: str, *, year: int, month: int) -> bool:
"""判斷檔名是否明確帶有指定月份,用於補足歷史檔案 mtime 漂移。"""
name = str(filename or "")
for match in re.finditer(r"(?<!\d)(20\d{2})(0[1-9]|1[0-2])([0-3]\d)(?!\d)", name):
try:
parsed = datetime(int(match.group(1)), int(match.group(2)), int(match.group(3)))
except ValueError:
continue
if parsed.year == year and parsed.month == month:
return True
for match in re.finditer(r"(?<!\d)(20\d{2})(0[1-9]|1[0-2])(?!\d)", name):
if int(match.group(1)) == year and int(match.group(2)) == month:
return True
return False
def _build_ppt_pipeline_view(files, auto_generation, audit_stats, generation_runs, vision_status, audit_records=None):
"""Compose page-level PPT pipeline health so the template stays declarative."""
files = files or []
auto_generation = auto_generation or {}
audit_stats = audit_stats or {}
generation_runs = generation_runs or []
vision_status = vision_status or {}
audit_records = audit_records or []
def _as_int(value):
try:
return int(value or 0)
except Exception:
return 0
def _as_float(value):
try:
return float(value or 0)
except Exception:
return 0.0
ready_count = _as_int(auto_generation.get('ready_count'))
total_count = _as_int(auto_generation.get('total'))
missing_count = _as_int(auto_generation.get('missing_count'))
coverage_pct = round((ready_count / total_count * 100), 1) if total_count else 0
valid_preview_count = sum(1 for item in files if item.get('file_exists') and item.get('is_valid_ppt'))
cached_preview_count = sum(
1 for item in files
if item.get('file_exists') and item.get('is_valid_ppt') and item.get('preview_cache_ready')
)
uncached_preview_count = max(valid_preview_count - cached_preview_count, 0)
broken_file_count = sum(1 for item in files if item.get('file_exists') and not item.get('is_valid_ppt'))
db_backed_count = sum(1 for item in files if item.get('source') in ('database', 'both'))
run_error_count = sum(1 for item in generation_runs if item.get('status') == 'error')
run_ready_count = sum(1 for item in generation_runs if item.get('status') == 'ready')
audit_total = _as_int(audit_stats.get('total'))
audit_issues = _as_int(audit_stats.get('total_issues'))
pass_rate = round(_as_float(audit_stats.get('pass_rate')), 1)
latest_run = generation_runs[0] if generation_runs else {}
latest_file = files[0] if files else {}
if not vision_status.get('ready'):
health_status = 'partial'
health_title = '視覺審核環境待確認'
health_message = 'PPT 可產出與預覽,但視覺檢查與轉檔條件仍需維持就緒。'
elif run_error_count or broken_file_count:
health_status = 'error'
health_title = '產線有異常待處理'
health_message = f'目前有 {run_error_count} 筆產出失敗、{broken_file_count} 份檔案不可預覽,應先處理最近錯誤。'
elif missing_count > 0:
health_status = 'partial'
health_title = '定義簡報尚未全數補齊'
health_message = f'本月已完成 {ready_count}/{total_count} 類,仍缺 {missing_count} 類,可等排程或手動補齊。'
elif audit_total and pass_rate < 80:
health_status = 'partial'
health_title = '審核通過率偏低'
health_message = f'本月視覺 QA 通過率 {pass_rate:.1f}%,需優先檢查失敗熱點與修復建議。'
elif total_count:
health_status = 'ready'
health_title = '產線覆蓋完整'
health_message = '定義簡報、產出紀錄、線上預覽與視覺 QA 都已具備可追蹤入口。'
else:
health_status = 'planned'
health_title = '產線等待資料'
health_message = '目前尚未讀到定義簡報覆蓋資料,頁面會保留安全空狀態。'
if audit_total:
qa_value = f'{pass_rate:.0f}%'
qa_meta = f'{audit_total} 筆審核 / {audit_issues} 個問題'
qa_status = 'ready' if pass_rate >= 80 and audit_issues == 0 else 'partial'
else:
qa_value = '待審核'
qa_meta = '可立即補跑,或等待 22:00 排程'
qa_status = 'planned'
stages = [
{
'key': 'schedule',
'icon': 'calendar-check',
'label': '排程節奏',
'value': '6 條',
'meta': '每日 / 每週 / 每月 / 每季 / 半年 / 年度',
'detail': auto_generation.get('cadence_summary') or '等待排程設定',
'status': 'ready' if auto_generation.get('enabled') else 'partial',
},
{
'key': 'coverage',
'icon': 'diagram-project',
'label': '定義覆蓋',
'value': f'{ready_count}/{total_count}' if total_count else '',
'meta': f'{coverage_pct:.1f}% 完成',
'detail': f'缺漏 {missing_count}' if missing_count else '當期目標完整',
'status': 'ready' if total_count and missing_count == 0 else 'partial',
},
{
'key': 'records',
'icon': 'database',
'label': '產出紀錄',
'value': f'{len(generation_runs)}',
'meta': f'{run_ready_count} 成功 / {run_error_count} 失敗',
'detail': latest_run.get('started_at') or '尚無本月產出紀錄',
'status': 'error' if run_error_count else ('ready' if generation_runs else 'planned'),
},
{
'key': 'preview',
'icon': 'desktop',
'label': '線上預覽',
'value': f'{valid_preview_count}',
'meta': f'{cached_preview_count} 份 PDF 快取',
'detail': latest_file.get('name') or '尚無可預覽檔案',
'status': 'error' if broken_file_count else ('ready' if valid_preview_count else 'planned'),
},
{
'key': 'qa',
'icon': 'eye',
'label': '視覺 QA',
'value': qa_value,
'meta': qa_meta,
'detail': '視覺檢查 + 修復建議 + 派工',
'status': qa_status,
},
]
missing_items = [
item for item in auto_generation.get('items', [])
if not item.get('ready')
]
preview_items = [
item for item in files
if item.get('file_exists') and item.get('is_valid_ppt')
]
audit_attention = [
item for item in audit_records
if item.get('audit_status') in ('failed', 'error')
]
run_failures = [
item for item in generation_runs
if item.get('status') == 'error'
]
broken_files = [
item for item in files
if item.get('file_exists') and not item.get('is_valid_ppt')
]
triage_entries = []
for item in run_failures[:3]:
triage_entries.append({
'title': item.get('report_label') or item.get('report_type') or '未知簡報',
'meta': item.get('started_at') or '時間未知',
'detail': _public_ppt_text(
item.get('error_msg'),
empty=f"{item.get('schedule_label') or '手動'} · {item.get('target_label') or '最新資料'}",
),
'status_label': '產出失敗',
'filename': item.get('file_name') or '',
'report_type': item.get('report_type') or '',
'can_regenerate': bool(item.get('report_type')),
})
for item in broken_files[:3]:
triage_entries.append({
'title': item.get('name') or '未命名檔案',
'meta': item.get('mtime') or '時間未知',
'detail': _public_ppt_text(
item.get('file_error'),
empty='PPTX 檔案不可預覽,建議重新產生。',
),
'status_label': '檔案異常',
'filename': item.get('name') or '',
'report_type': item.get('report_type') or '',
'can_regenerate': bool(item.get('report_type')),
})
for item in audit_attention[:3]:
filename = item.get('pptx_filename') or ''
inferred_report_type = item.get('report_type') or _guess_ppt_report_type_from_filename(filename)
triage_entries.append({
'title': filename or '未命名檔案',
'meta': item.get('audited_at') or '時間未知',
'detail': _public_ppt_text(
item.get('issue_summary') or item.get('error_msg'),
empty=f"問題 {item.get('issues_count', 0)}",
),
'status_label': '視覺 QA',
'filename': filename,
'report_type': inferred_report_type,
'can_regenerate': bool(inferred_report_type),
})
action_lanes = [
{
'key': 'triage',
'label': '異常優先',
'status': 'error' if triage_entries else 'ready',
'count': len(triage_entries),
'empty_text': '目前沒有產出失敗、檔案異常或視覺 QA 失敗。',
'entries': triage_entries[:4],
},
{
'key': 'missing',
'label': '待補齊',
'status': 'partial' if missing_items else 'ready',
'count': len(missing_items),
'empty_text': '目前定義簡報都已對齊本期目標。',
'entries': [
{
'title': item.get('label') or item.get('key') or '未命名簡報',
'meta': item.get('target_label') or '最新資料',
'detail': item.get('status_hint') or item.get('status_label') or '等待排程補齊',
'status_label': item.get('status_label') or '待補齊',
'report_type': item.get('key') or '',
'can_regenerate': bool(item.get('key')),
}
for item in missing_items[:4]
],
},
{
'key': 'preview',
'label': '可預覽',
'status': 'ready' if preview_items else 'planned',
'count': len(preview_items),
'empty_text': '目前沒有可線上預覽的 PPTX 檔案。',
'entries': [
{
'title': item.get('name') or '未命名檔案',
'meta': item.get('mtime') or '時間未知',
'detail': (
f"{item.get('size_kb') if item.get('size_kb') is not None else ''} KB · "
f"{item.get('source_label') or _public_ppt_source_label(item.get('source'))} · "
f"{'PDF 已快取' if item.get('preview_cache_ready') else '開啟時轉檔'}"
),
'status_label': 'PDF 快取' if item.get('preview_cache_ready') else '線上預覽',
'filename': item.get('name'),
}
for item in preview_items[:4]
],
},
{
'key': 'audit',
'label': '視覺 QA',
'status': 'error' if audit_attention else ('ready' if audit_total else 'planned'),
'count': len(audit_attention) if audit_attention else audit_total,
'empty_text': '目前沒有需要處理的視覺 QA 失敗紀錄。',
'entries': [
{
'title': item.get('pptx_filename') or '未命名檔案',
'meta': item.get('audited_at') or '時間未知',
'detail': _public_ppt_text(
item.get('issue_summary') or item.get('error_msg'),
empty=f"問題 {item.get('issues_count', 0)} 個,信心 {item.get('confidence', 0):.2f}",
),
'status_label': '需修復' if item.get('audit_status') == 'failed' else '需排查',
'filename': item.get('pptx_filename'),
'report_type': item.get('report_type') or _guess_ppt_report_type_from_filename(item.get('pptx_filename') or ''),
'can_regenerate': bool(item.get('report_type') or _guess_ppt_report_type_from_filename(item.get('pptx_filename') or '')),
}
for item in audit_attention[:4]
],
},
{
'key': 'records',
'label': '產出紀錄',
'status': 'error' if run_error_count else ('ready' if generation_runs else 'planned'),
'count': len(generation_runs),
'empty_text': '本月尚未看到簡報產出紀錄。',
'entries': [
{
'title': item.get('report_label') or item.get('report_type') or '未知簡報',
'meta': item.get('started_at') or '時間未知',
'detail': f"{item.get('schedule_label') or '手動'} · {item.get('target_label') or '最新資料'}",
'status_label': item.get('status_label') or item.get('status') or '未知',
'filename': item.get('file_name') or '',
}
for item in generation_runs[:4]
],
},
]
return {
'status': health_status,
'title': health_title,
'message': health_message,
'ready_count': ready_count,
'total_count': total_count,
'missing_count': missing_count,
'coverage_pct': coverage_pct,
'valid_preview_count': valid_preview_count,
'cached_preview_count': cached_preview_count,
'uncached_preview_count': uncached_preview_count,
'broken_file_count': broken_file_count,
'db_backed_count': db_backed_count,
'run_error_count': run_error_count,
'pass_rate': pass_rate,
'audit_total': audit_total,
'latest_run': latest_run,
'latest_file': latest_file,
'stages': stages,
'action_lanes': action_lanes,
}
def _build_ppt_operator_summary(files, auto_generation, pipeline_view, vision_status, audit_stats, generation_runs):
"""Build first-screen operator copy that prioritizes deck work over raw pipeline states."""
files = files or []
auto_generation = auto_generation or {}
pipeline_view = pipeline_view or {}
vision_status = vision_status or {}
audit_stats = audit_stats or {}
generation_runs = generation_runs or []
latest_preview = next(
(
item for item in files
if item.get('file_exists') and item.get('is_valid_ppt') and item.get('name')
),
None,
)
issue_count = int(audit_stats.get('total_issues') or 0) if audit_stats else 0
missing_count = int(auto_generation.get('missing_count') or 0)
valid_preview_count = int(pipeline_view.get('valid_preview_count') or 0)
cached_preview_count = int(pipeline_view.get('cached_preview_count') or 0)
audit_total = int(pipeline_view.get('audit_total') or 0)
run_error_count = int(pipeline_view.get('run_error_count') or 0)
broken_file_count = int(pipeline_view.get('broken_file_count') or 0)
blockers = vision_status.get('blockers') or []
if run_error_count or broken_file_count:
status = 'error'
headline = '先處理異常,再放行簡報'
message = f'目前有 {run_error_count} 筆產出失敗、{broken_file_count} 份檔案不可預覽,建議先看 Action Queue。'
primary_action = '查看待處理'
primary_anchor = '#ppt-action-queue'
elif missing_count:
status = 'partial'
headline = '定期簡報尚未全數補齊'
message = f'本期還有 {missing_count} 類定義簡報缺漏,可手動補齊或等待排程寫入產出紀錄。'
primary_action = '補齊缺漏'
primary_anchor = '#ppt-production-center'
elif not vision_status.get('ready'):
status = 'partial'
headline = '簡報可管理,視覺 QA 待啟用'
message = 'PPT 產出與預覽入口仍可用;視覺檢查與轉檔條件補齊後才會自動審核。'
primary_action = '查看就緒檢查'
primary_anchor = '#ppt-runtime-diagnostic'
elif issue_count:
status = 'partial'
headline = '有視覺問題待回放'
message = f'本期視覺 QA 發現 {issue_count} 個問題,請從問題追蹤或審核歷史回放檢查。'
primary_action = '查看問題'
primary_anchor = '#ppt-issue-board'
else:
status = 'ready' if valid_preview_count else 'planned'
headline = '簡報工作台待命'
message = '最新簡報、PDF 預覽、產出紀錄與視覺 QA 都集中在同一頁追蹤。'
primary_action = '查看簡報'
primary_anchor = '#ppt-deck-workbench'
latest_run = generation_runs[0] if generation_runs else {}
latest_deck_label = latest_preview.get('name') if latest_preview else '尚無可預覽 PPTX'
latest_deck_meta = (
f"{latest_preview.get('mtime') or '時間未知'} · "
f"{latest_preview.get('size_kb') if latest_preview.get('size_kb') is not None else ''} KB · "
f"{'PDF 已快取' if latest_preview and latest_preview.get('preview_cache_ready') else '首次開啟轉檔'}"
if latest_preview else
'請先補齊本期簡報或切換月份 / 報表類型'
)
return {
'status': status,
'headline': headline,
'message': message,
'primary_action': primary_action,
'primary_anchor': primary_anchor,
'latest_deck': latest_preview or {},
'latest_deck_label': latest_deck_label,
'latest_deck_meta': latest_deck_meta,
'latest_run_label': latest_run.get('report_label') or latest_run.get('report_type') or '尚無產出紀錄',
'latest_run_meta': latest_run.get('started_at') or '等待下一次排程寫入',
'blocker_text': ''.join(_public_ppt_text_list(blockers[:2])) if blockers else '',
'signals': [
{
'label': '可預覽簡報',
'value': valid_preview_count,
'meta': f'{cached_preview_count} 份 PDF 快取',
'status': 'ready' if valid_preview_count else 'planned',
},
{
'label': '待補齊定義',
'value': missing_count,
'meta': f"{auto_generation.get('ready_count', 0)}/{auto_generation.get('total', 0)} 已覆蓋",
'status': 'ready' if missing_count == 0 and auto_generation.get('total') else 'partial',
},
{
'label': '視覺 QA',
'value': audit_total if audit_total else '待跑',
'meta': '已就緒' if vision_status.get('ready') else '執行條件待確認',
'status': 'ready' if vision_status.get('ready') and not issue_count else 'partial',
},
{
'label': '視覺問題',
'value': issue_count,
'meta': '需回放' if issue_count else '目前無待處理',
'status': 'partial' if issue_count else 'ready',
},
],
}
def _enrich_ppt_coverage_items(auto_generation_items, files, generation_runs, audit_records):
    """Join coverage rows with file, DB run, preview and QA state for the UI matrix.

    Args:
        auto_generation_items: Coverage rows (dicts keyed by report type under
            ``key``); each is copied, never mutated.
        files: PPTX file rows (``name``, ``report_type``, ``source``,
            ``file_exists``, ``is_valid_ppt``, ``preview_cache_ready``, ...).
        generation_runs: Generation-run rows (``report_type``, ``file_name``, ``status``).
        audit_records: Visual QA rows (``pptx_filename``, ``audit_status``, ...).

    Returns:
        A new list of dicts, one per coverage row, extended with file/DB/
        preview/QA/delivery status keys and the ``can_*`` action flags.
    """
    files = files or []
    generation_runs = generation_runs or []
    audit_records = audit_records or []
    file_by_name = {item.get('name'): item for item in files if item.get('name')}
    # The index loops below keep the FIRST row seen per key, so "latest" relies on
    # inputs being ordered newest first. NOTE(review): assumed ordering — confirm with callers.
    latest_file_by_type = {}
    for item in files:
        report_type = item.get('report_type')
        if report_type and report_type not in latest_file_by_type:
            latest_file_by_type[report_type] = item
    latest_run_by_type = {}
    for item in generation_runs:
        report_type = item.get('report_type')
        if report_type and report_type not in latest_run_by_type:
            latest_run_by_type[report_type] = item
    latest_audit_by_file = {}
    for item in audit_records:
        filename = item.get('pptx_filename')
        if filename and filename not in latest_audit_by_file:
            latest_audit_by_file[filename] = item
    enriched = []
    for raw_item in auto_generation_items or []:
        item = dict(raw_item)
        report_type = item.get('key') or ''
        latest_run = latest_run_by_type.get(report_type, {})
        candidate_file = latest_file_by_type.get(report_type, {})
        # File name precedence: coverage row, then latest run, then latest file of this type.
        file_name = (
            item.get('latest_file_name')
            or latest_run.get('file_name')
            or candidate_file.get('name')
            or ''
        )
        # NOTE(review): when file_name is not in file_by_name this falls back to
        # candidate_file, which may be a DIFFERENT file than file_name, so the
        # mtime/size/validity shown can belong to another deck. Confirm intent.
        file_item = file_by_name.get(file_name) or candidate_file or {}
        audit = latest_audit_by_file.get(file_name, {})
        sources = set(item.get('sources') or [])
        if file_item.get('source'):
            sources.add(file_item.get('source'))
        file_exists = bool(file_item.get('file_exists') or ('filesystem' in sources and file_name))
        # Validity is assumed unless the file row explicitly says is_valid_ppt is False.
        valid_ppt = bool(file_exists and (file_item.get('is_valid_ppt') is not False))
        db_backed = bool(
            latest_run
            or 'database' in sources
            or file_item.get('source') in ('database', 'both')
        )
        preview_cached = bool(valid_ppt and file_item.get('preview_cache_ready'))
        audit_status = audit.get('audit_status') or ''
        run_status = latest_run.get('status') or ''
        # DB column: a failed latest run outranks "recorded".
        if latest_run and run_status == 'error':
            db_status, db_label = 'error', '紀錄失敗'
        elif db_backed:
            db_status, db_label = 'ready', '已記錄'
        else:
            db_status, db_label = 'planned', '待紀錄'
        # Preview column.
        if valid_ppt and preview_cached:
            preview_status, preview_label = 'ready', 'PDF 快取'
        elif valid_ppt:
            preview_status, preview_label = 'partial', '可預覽'
        elif file_name:
            preview_status, preview_label = 'error', '不可預覽'
        else:
            preview_status, preview_label = 'planned', '待產檔'
        # QA column, from the latest audit of this exact file name.
        if audit_status == 'passed':
            qa_status, qa_label = 'ready', 'QA 通過'
        elif audit_status == 'failed':
            qa_status, qa_label = 'error', 'QA 有問題'
        elif audit_status == 'error':
            qa_status, qa_label = 'error', 'QA 錯誤'
        elif audit_status == 'skipped':
            qa_status, qa_label = 'partial', 'QA 跳過'
        elif valid_ppt:
            qa_status, qa_label = 'planned', '待 QA'
        else:
            qa_status, qa_label = 'planned', '待產檔'
        # Delivery verdict: order matters; the first matching condition wins.
        if not item.get('ready'):
            delivery_status, delivery_label = 'missing', '待產出'
        elif latest_run and run_status == 'error':
            delivery_status, delivery_label = 'error', '產出失敗'
        elif file_name and not valid_ppt:
            delivery_status, delivery_label = 'error', '檔案異常'
        elif qa_status == 'error':
            delivery_status, delivery_label = 'error', '需修復'
        elif valid_ppt and db_backed and audit_status == 'passed':
            delivery_status, delivery_label = 'ready', '可交付'
        elif valid_ppt:
            delivery_status, delivery_label = 'partial', '待驗收'
        else:
            delivery_status, delivery_label = item.get('status') or 'planned', item.get('status_label') or '待確認'
        item.update({
            'latest_file_name': file_name,
            'latest_file_mtime': file_item.get('mtime') or item.get('latest_generated_at') or '',
            'latest_file_size_kb': file_item.get('size_kb'),
            'file_exists': file_exists,
            'is_valid_ppt': valid_ppt,
            'preview_cache_ready': preview_cached,
            'db_status': db_status,
            'db_label': db_label,
            'preview_status': preview_status,
            'preview_label': preview_label,
            'qa_status': qa_status,
            'qa_label': qa_label,
            'delivery_status': delivery_status,
            'delivery_label': delivery_label,
            'audit_summary': _public_ppt_text(
                audit.get('issue_summary') or audit.get('error_msg'),
                max_chars=160,
            ),
            'can_preview': valid_ppt and bool(file_name),
            'can_prewarm': valid_ppt and bool(file_name) and not preview_cached,
            'can_regenerate': bool(report_type),
        })
        enriched.append(item)
    return enriched
@admin_observability_bp.route('/ppt_audit_history')
@login_required
def ppt_audit_history():
"""掃 reports/ 目錄列指定月份 daily 報表 + 從 ppt_audit_results 讀審核歷史Phase 38"""
import os
import zipfile
reports_dir = os.environ.get('REPORTS_DIR', '/app/data/reports')
files = []
audit_records = []
error = None
month_arg = request.args.get('month', '').strip()
report_type = request.args.get('report_type', 'daily').strip().lower() or 'daily'
try:
from services.ppt_auto_generation_service import get_report_type_options
report_type_options = get_report_type_options()
except Exception:
report_type_options = [
{'key': 'daily', 'label': '每日日報', 'prefix': 'ocbot_daily_'},
{'key': 'weekly', 'label': '週報', 'prefix': 'ocbot_weekly_'},
{'key': 'monthly', 'label': '月報', 'prefix': 'ocbot_monthly_'},
{'key': 'strategy', 'label': '策略', 'prefix': 'ocbot_strategy_'},
{'key': 'competitor', 'label': '競品', 'prefix': 'ocbot_competitor_'},
{'key': 'promo', 'label': '促銷', 'prefix': 'ocbot_promo_'},
{'key': 'all', 'label': '全部', 'prefix': 'all'},
]
report_type_map = {opt['key']: opt for opt in report_type_options}
if report_type not in report_type_map:
report_type = 'daily'
selected_report_type = report_type_map[report_type]
report_prefix = selected_report_type['prefix']
now = datetime.now()
target_year = now.year
target_month = now.month
if month_arg:
sep = '-' if '-' in month_arg else '/' if '/' in month_arg else None
parts = month_arg.split(sep) if sep else [month_arg]
try:
if len(parts) == 2:
target_year = int(parts[0])
target_month = int(parts[1])
elif len(parts) == 1 and 1 <= len(parts[0]) <= 2:
target_month = int(parts[0])
else:
raise ValueError
if not (1 <= target_month <= 12):
raise ValueError
except Exception:
target_year = now.year
target_month = now.month
month_start = datetime(target_year, target_month, 1)
month_end = datetime(target_year + 1, 1, 1) if target_month == 12 else datetime(target_year, target_month + 1, 1)
month_start_ts = int(month_start.timestamp())
month_end_ts = int(month_end.timestamp())
month_label = month_start.strftime('%Y-%m')
prev_month = target_month - 1
prev_year = target_year
if prev_month == 0:
prev_month = 12
prev_year -= 1
next_month = target_month + 1
next_year = target_year
if next_month == 13:
next_month = 1
next_year += 1
prev_month_label = f"{prev_year:04d}-{prev_month:02d}"
next_month_label = f"{next_year:04d}-{next_month:02d}"
show_next_month = (next_year < now.year) or (next_year == now.year and next_month <= now.month)
def _inspect_ppt_file(file_path: str):
try:
with zipfile.ZipFile(file_path, 'r') as zf:
bad = zf.testzip()
return (bad is None, None if bad is None else f'壓縮檔異常:{bad}')
except zipfile.BadZipFile:
return False, 'PPTX 檔案損毀(非有效 zip'
except Exception as e:
return False, f'檢查失敗:{str(e)[:60]}'
def _guess_report_type(filename: str) -> str:
for opt in report_type_options:
prefix = opt.get('prefix')
if prefix and prefix != 'all' and filename.startswith(prefix):
return opt.get('key') or ''
return report_type if report_type != 'all' else ''
def _preview_cache_payload(file_path: str):
payload = {
'preview_cache_ready': False,
'preview_cache_size_kb': None,
'preview_cache_mtime': '',
}
try:
from services.ppt_preview_service import get_ppt_preview_cache_info
info = get_ppt_preview_cache_info(file_path)
payload['preview_cache_ready'] = bool(info.cache_exists)
payload['preview_cache_size_kb'] = info.cache_size_kb
if info.cache_mtime_ts:
payload['preview_cache_mtime'] = datetime.fromtimestamp(info.cache_mtime_ts).strftime('%Y-%m-%d %H:%M')
except Exception:
logger.debug("PPT preview cache state unavailable", exc_info=True)
return payload
try:
if not os.path.isdir(reports_dir):
error = f'{reports_dir} 目錄不存在'
else:
files_by_name = {}
for f in os.listdir(reports_dir):
if not f.lower().endswith('.pptx'):
continue
if report_prefix != 'all' and not f.startswith(report_prefix):
continue
full = os.path.join(reports_dir, f)
# symlink 防護reports/ 內不接受 symlink避免目錄逃逸Critic MEDIUM #2
if os.path.islink(full):
continue
try:
mtime = os.path.getmtime(full)
matches_selected_month = (
month_start_ts <= mtime < month_end_ts
or _ppt_filename_matches_month(f, year=target_year, month=target_month)
)
if matches_selected_month:
is_valid, check_msg = _inspect_ppt_file(full)
files_by_name[f] = {
'source': 'filesystem',
'name': f,
'size_kb': round(os.path.getsize(full) / 1024, 1),
'mtime': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M'),
'mtime_ts': mtime,
'file_exists': True,
'file_path': full,
'report_type': _guess_report_type(f),
'is_valid_ppt': is_valid,
'file_error': check_msg,
**_preview_cache_payload(full),
}
except OSError:
continue
# 補充:若 188 主機僅保留 DB 快取紀錄或掃描過程漏掉,仍可回補當月報表清單
try:
session = get_session()
try:
sql = """
SELECT report_type, file_path, file_size, generated_at
FROM ppt_reports
WHERE generated_at >= :month_start
AND generated_at < :month_end
"""
params = {'month_start': month_start, 'month_end': month_end}
if report_type != 'all':
sql += " AND report_type = :report_type"
params['report_type'] = report_type
rows = session.execute(sa_text(sql), params).fetchall()
for rpt_type, file_path, file_size, generated_at in rows:
if not file_path:
continue
name = os.path.basename(file_path)
if not name.lower().endswith('.pptx'):
continue
if report_prefix != 'all' and not name.startswith(report_prefix):
continue
if name in files_by_name:
files_by_name[name]['source'] = 'both'
continue
if not generated_at:
continue
mtime = generated_at.timestamp()
if not (month_start_ts <= mtime < month_end_ts):
continue
candidate_path = os.path.join(reports_dir, name)
exists = os.path.isfile(candidate_path)
is_valid = False
check_msg = '檔案未落盤'
if exists:
is_valid, check_msg = _inspect_ppt_file(candidate_path)
files_by_name[name] = {
'source': 'database',
'name': name,
'size_kb': round((file_size or 0) / 1024, 1),
'mtime': generated_at.strftime('%Y-%m-%d %H:%M'),
'mtime_ts': mtime,
'file_exists': exists,
'file_path': candidate_path if exists else file_path,
'report_type': rpt_type,
'is_valid_ppt': is_valid,
'file_error': None if exists else check_msg,
**(_preview_cache_payload(candidate_path) if exists else {}),
}
finally:
session.close()
except Exception:
pass
files = list(files_by_name.values())
files.sort(key=lambda x: x['mtime_ts'], reverse=True)
for item in files:
item['source_label'] = _public_ppt_source_label(item.get('source'))
item['file_error'] = _public_ppt_text(item.get('file_error'), max_chars=120)
except Exception as e:
error = _public_ppt_text(f'{type(e).__name__}: {str(e)[:200]}', empty='簡報清單讀取異常')
audit_filter_sql = ""
audit_params = {'month_start': month_start, 'month_end': month_end}
if report_prefix != 'all':
audit_filter_sql = " AND pptx_filename LIKE :audit_prefix"
audit_params['audit_prefix'] = f"{report_prefix}%"
def _summarize_ppt_issues(raw_issues) -> str:
"""把 ppt_audit_results.issues_found 壓成表格可讀的診斷摘要。"""
if not raw_issues:
return ''
try:
import json as _json
issues_payload = _json.loads(raw_issues) if isinstance(raw_issues, str) else raw_issues
except Exception:
return ''
if not isinstance(issues_payload, list):
return ''
snippets = []
for item in issues_payload:
if not isinstance(item, dict):
continue
slide = item.get('slide')
for issue in item.get('issues') or []:
text = _public_ppt_text(issue, max_chars=120)
if not text:
continue
prefix = f"S{slide}: " if slide else ""
snippets.append(f"{prefix}{text}")
if len(snippets) >= 3:
return ''.join(snippets)
return ''.join(snippets)
def _load_ppt_issues(raw_issues):
if not raw_issues:
return []
try:
import json as _json
issues_payload = _json.loads(raw_issues) if isinstance(raw_issues, str) else raw_issues
except Exception:
return []
return issues_payload if isinstance(issues_payload, list) else []
def _classify_ppt_issue(issue_text: str):
text = issue_text or ''
if _ppt_text_has_internal_detail(text):
return '審核執行', 'warn'
if any(k in text for k in ['圖表', '切掉', '截斷', '超出', '溢出']):
return '版面越界', 'error'
if any(k in text for k in ['空白', '未填', '缺少', '無資料']):
return '內容缺漏', 'warn'
if any(k in text for k in ['低對比', '顏色', '字體', '字型', '閱讀']):
return '可讀性', 'warn'
return '視覺問題', 'warn'
def _extract_ppt_issue_items(raw_issues, *, pptx_filename: str, audited_at: str):
issue_items = []
report_type_for_file = _guess_report_type(pptx_filename)
for slide_item in _load_ppt_issues(raw_issues):
if not isinstance(slide_item, dict):
continue
slide = slide_item.get('slide')
slide_label = f"S{slide}" if slide else 'S?'
for raw_issue in slide_item.get('issues') or []:
category, status = _classify_ppt_issue(str(raw_issue or ''))
issue_text = _public_ppt_text(raw_issue, max_chars=140)
if not issue_text:
continue
issue_items.append({
'pptx_filename': pptx_filename,
'report_type': report_type_for_file,
'audited_at': audited_at,
'slide_label': slide_label,
'category': category,
'status': status,
'text': issue_text,
})
return issue_items
# Phase 38+:讀指定月份 / 指定簡報類型 audit 歷史
try:
session = get_session()
try:
audit_rows = session.execute(
sa_text(f"""
SELECT audited_at, pptx_filename, audit_status,
issues_count, confidence, duration_ms, error_msg,
issues_found
FROM ppt_audit_results
WHERE audited_at >= :month_start
AND audited_at < :month_end
{audit_filter_sql}
ORDER BY audited_at DESC
LIMIT 1000
"""),
audit_params,
).fetchall()
audit_records = []
for r in audit_rows:
audited_at = r[0].strftime('%Y-%m-%d %H:%M')
pptx_filename = r[1]
raw_issues = r[7]
audit_records.append({
'audited_at': audited_at,
'pptx_filename': pptx_filename,
'report_type': _guess_report_type(pptx_filename),
'audit_status': r[2],
'issues_count': int(r[3] or 0),
'confidence': float(r[4] or 0),
'duration_ms': int(r[5] or 0),
'error_msg': _public_ppt_text(r[6], max_chars=160),
'issue_summary': _summarize_ppt_issues(raw_issues),
'issue_items': _extract_ppt_issue_items(
raw_issues,
pptx_filename=pptx_filename,
audited_at=audited_at,
),
})
finally:
session.close()
except Exception:
logger.debug("PPT audit history table unavailable; rendering empty audit history", exc_info=True)
issue_items = [
issue
for record in audit_records
for issue in record.get('issue_items', [])
]
issue_files = {issue.get('pptx_filename') for issue in issue_items if issue.get('pptx_filename')}
issue_digest = {
'total': len(issue_items),
'files': len(issue_files),
'error_count': sum(1 for issue in issue_items if issue.get('status') == 'error'),
'warn_count': sum(1 for issue in issue_items if issue.get('status') == 'warn'),
'latest_audit': issue_items[0].get('audited_at') if issue_items else '',
}
# PPT vision 啟用狀態
vision_status = {
'enabled': False,
'ready': False,
'blockers': ['視覺狀態讀取失敗'],
'ready_count': 0,
'check_count': 0,
'summary': '視覺 QA runtime 狀態讀取失敗。',
'readiness_checks': [],
'next_actions': ['確認 ppt_vision_service import 與 runtime 設定後重新整理此頁。'],
}
try:
from services.ppt_vision_service import get_ppt_vision_audit_status, get_ppt_vision_runtime_status
vision_status = get_ppt_vision_runtime_status()
vision_enabled = bool(vision_status.get('enabled'))
vision_audit_status = get_ppt_vision_audit_status()
except Exception:
vision_enabled = False
vision_audit_status = {
'ok': False,
'running': False,
'status': 'unknown',
'status_label': '讀取失敗',
'message': '最近視覺 QA 狀態讀取失敗。',
'last_run': None,
}
vision_status = _public_ppt_vision_status(vision_status)
vision_audit_status = _public_ppt_vision_audit_status(vision_audit_status)
# Phase 47 K-6: 月報表統計 + top failure files
audit_30d_stats = {}
top_failure_files = []
try:
s_ppt = get_session()
try:
stat_row = s_ppt.execute(
sa_text(f"""
SELECT COUNT(*),
COUNT(*) FILTER (WHERE audit_status = 'passed'),
COUNT(*) FILTER (WHERE audit_status = 'failed'),
COUNT(*) FILTER (WHERE audit_status = 'skipped'),
COUNT(*) FILTER (WHERE audit_status = 'error'),
COALESCE(AVG(confidence) FILTER (WHERE audit_status = 'passed'), 0),
COALESCE(SUM(issues_count), 0)
FROM ppt_audit_results
WHERE audited_at >= :month_start
AND audited_at < :month_end
{audit_filter_sql}
"""),
audit_params,
).fetchone()
total_30d = int(stat_row[0] or 0)
audit_30d_stats = {
'total': total_30d,
'passed': int(stat_row[1] or 0),
'failed': int(stat_row[2] or 0),
'skipped': int(stat_row[3] or 0),
'error': int(stat_row[4] or 0),
'avg_confidence': round(float(stat_row[5] or 0), 3),
'total_issues': int(stat_row[6] or 0),
'pass_rate': (float(stat_row[1] or 0) / total_30d * 100) if total_30d else 0,
}
top_fail_rows = s_ppt.execute(
sa_text(f"""
SELECT pptx_filename, COUNT(*) AS attempts,
SUM(issues_count) AS total_issues,
MAX(audited_at) AS last_audit
FROM ppt_audit_results
WHERE audit_status IN ('failed', 'error')
AND audited_at >= :month_start
AND audited_at < :month_end
{audit_filter_sql}
GROUP BY pptx_filename
ORDER BY attempts DESC, total_issues DESC LIMIT 10
"""),
audit_params,
).fetchall()
top_failure_files = [
{
'filename': r[0], 'attempts': int(r[1] or 0),
'total_issues': int(r[2] or 0),
'last_audit': r[3].strftime('%Y-%m-%d %H:%M') if r[3] else '',
}
for r in top_fail_rows
]
finally:
s_ppt.close()
except Exception:
pass
# Phase 41 E-2: 對最近 3 筆 failed audit 跑 RAG 找相似修法
rag_fixes = []
failed_records = [r for r in audit_records if r.get('audit_status') in ('failed', 'error')][:3]
if failed_records:
try:
from services.rag_service import rag_service
for fr in failed_records:
try:
err_text = fr.get('error_msg') or 'PPT vision audit failed'
rag_result = rag_service.query(
text=f"PPT 視覺審核失敗 {err_text[:200]} 怎麼修",
caller='admin_ppt_audit',
top_k=2,
threshold=0.6,
)
if rag_result.hits:
rag_fixes.append({
'pptx_filename': fr.get('pptx_filename'),
'audited_at': fr.get('audited_at'),
'error_msg': _public_ppt_text(err_text, max_chars=160),
'hits': [
{
'id': h.get('id'),
'insight_type': h.get('insight_type'),
'content': _public_ppt_text(h.get('content'), max_chars=180),
'similarity': round(float(h.get('similarity', 0)), 3),
}
for h in rag_result.hits[:2]
],
})
except Exception:
pass
except Exception:
pass
auto_generation = {
'enabled': False,
'items': [],
'missing_report_types': [],
'missing_count': 0,
'ready_count': 0,
'total': 0,
'last_run': None,
'can_auto_start': False,
'cadences': [],
'cadence_summary': '',
}
generation_runs = []
try:
from services.ppt_auto_generation_service import (
get_defined_report_coverage,
get_generation_run_history,
get_schedule_cadence_status,
)
auto_generation = get_defined_report_coverage(
month_start=month_start,
month_end=month_end,
reports_dir=reports_dir,
)
auto_generation.setdefault('cadences', get_schedule_cadence_status(auto_generation.get('items', [])))
auto_generation.setdefault(
'cadence_summary',
''.join(c.get('schedule_text', '') for c in auto_generation.get('cadences', []) if c.get('schedule_text')),
)
auto_generation['can_auto_start'] = (
bool(auto_generation.get('enabled'))
and int(auto_generation.get('missing_count') or 0) > 0
and month_label == datetime.now().strftime('%Y-%m')
)
generation_runs = get_generation_run_history(
month_start=month_start,
month_end=month_end,
limit=24,
)
generation_runs = [
{
**dict(item),
'error_msg': _public_ppt_text(item.get('error_msg'), max_chars=160),
'status_label': _public_ppt_text(item.get('status_label'), max_chars=60) or item.get('status_label'),
}
for item in generation_runs
]
except Exception:
logger.debug("PPT auto-generation coverage unavailable", exc_info=True)
pipeline_view = _build_ppt_pipeline_view(
files=files,
auto_generation=auto_generation,
audit_stats=audit_30d_stats,
generation_runs=generation_runs,
vision_status=vision_status,
audit_records=audit_records,
)
operator_summary = _build_ppt_operator_summary(
files=files,
auto_generation=auto_generation,
pipeline_view=pipeline_view,
vision_status=vision_status,
audit_stats=audit_30d_stats,
generation_runs=generation_runs,
)
vision_audit_filenames = [
item.get('name')
for item in files
if item.get('file_exists') and item.get('is_valid_ppt') and item.get('name')
][:10]
aider_heal_active_jobs = _list_ppt_aider_heal_active_jobs()
auto_generation_items = _enrich_ppt_coverage_items(
auto_generation.get('items', []),
files,
generation_runs,
audit_records,
)
return render_template(
'admin/ppt_audit_history.html',
active_page='obs_ppt_audit',
report_month=month_label,
report_type=report_type,
report_type_options=report_type_options,
selected_report_type=selected_report_type,
prev_month_label=prev_month_label,
next_month_label=next_month_label,
show_next_month=show_next_month,
files=files,
audit_records=audit_records,
rag_fixes=rag_fixes,
audit_30d_stats=audit_30d_stats,
top_failure_files=top_failure_files,
vision_enabled=vision_enabled,
vision_status=vision_status,
vision_audit_status=vision_audit_status,
auto_generation=auto_generation,
auto_generation_items=auto_generation_items,
auto_generation_missing_report_types=auto_generation.get('missing_report_types', []),
generation_runs=generation_runs,
pipeline_view=pipeline_view,
operator_summary=operator_summary,
vision_audit_filenames=vision_audit_filenames,
issue_items=issue_items,
issue_digest=issue_digest,
aider_heal_active_jobs=aider_heal_active_jobs,
aider_heal_active_count=len(aider_heal_active_jobs),
error=error,
)
# ─────────────────────────────────────────────────────────────────────────────
# /observability/host_health — 三主機 + MCP 健康度
# ─────────────────────────────────────────────────────────────────────────────
def _host_health_session_uses_sqlite(session) -> bool:
    """Return True when *session* is bound to a SQLite engine (best effort)."""
    try:
        bind = session.get_bind()
        return getattr(getattr(bind, 'dialect', None), 'name', None) == 'sqlite'
    except Exception:
        return False


def _host_health_pct(part, whole):
    """Return ``part / whole`` as a percentage, or ``0`` when *whole* is falsy."""
    return (float(part or 0) / float(whole) * 100) if whole else 0


def _host_health_fmt_ts(value, fmt='%Y-%m-%d %H:%M', default=''):
    """Format a date/datetime with *fmt*; return *default* for NULL values."""
    return value.strftime(fmt) if value else default


def _host_health_safe_query(session, label, loader, default):
    """Run ``loader(session)``; on failure log, roll back and return *default*.

    The rollback matters on PostgreSQL: after a failed statement the
    transaction is aborted and every later statement on the same session
    would fail as well, blanking unrelated dashboard sections.
    """
    try:
        return loader(session)
    except Exception:
        logger.debug("host_health: %s unavailable", label, exc_info=True)
        try:
            session.rollback()
        except Exception:
            logger.debug("host_health: rollback after %s failed", label, exc_info=True)
        return default


def _host_health_probe_ollama_hosts():
    """Probe the primary / secondary / fallback Ollama hosts.

    Returns ``(ollama_hosts, probe_records)``: display entries for the
    template and rows destined for ``host_health_probes``. Both lists are
    empty when the Ollama service modules cannot be imported.

    A host is reported healthy only if ``/api/tags`` answers HTTP 200 with a
    parseable body and, when enabled for that label, the embedding runtime
    probe also passes.
    """
    try:
        import requests
        from services.ollama_service import (
            OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK,
            _is_unhealthy,
        )
        from services.ollama_health_probe import (
            host_health_model_probe_enabled,
            probe_ollama_embedding_runtime,
        )
    except Exception:
        logger.warning("host_health: Ollama probe dependencies unavailable", exc_info=True)
        return [], []

    ollama_hosts = []
    probe_records = []
    for label, host in (
        ('Primary (GCP)', OLLAMA_HOST_PRIMARY),
        ('Secondary (GCP)', OLLAMA_HOST_SECONDARY),
        ('Fallback (111)', OLLAMA_HOST_FALLBACK),
    ):
        # Guarded per host so one bad lookup cannot abort the other probes.
        try:
            unhealthy_mark = _is_unhealthy(host)
        except Exception:
            logger.debug("host_health: unhealthy-mark lookup failed for %s", label, exc_info=True)
            unhealthy_mark = False
        entry = {'label': label, 'host': host, 'healthy': False,
                 'unhealthy_mark': unhealthy_mark, 'models': [], 'error': None}
        started = time.monotonic()
        err = None
        try:
            resp = requests.get(f"{host.rstrip('/')}/api/tags", timeout=3)
            if resp.status_code == 200:
                entry['models'] = [
                    m.get('name', '') for m in resp.json().get('models', [])
                ][:15]
                # Mark healthy only after the body parsed successfully.
                entry['healthy'] = True
                if host_health_model_probe_enabled(label):
                    model_ok, model_err = probe_ollama_embedding_runtime(requests, host)
                    if not model_ok:
                        entry['healthy'] = False
                        err = model_err
            else:
                err = f"HTTP {resp.status_code}"
        except Exception as e:
            # Any failure mid-probe (bad JSON, embedding probe crash) means the
            # host is not healthy, even if /api/tags itself answered 200.
            entry['healthy'] = False
            err = f"{type(e).__name__}: {str(e)[:200]}"
        entry['error'] = err
        response_ms = int((time.monotonic() - started) * 1000)
        probe_records.append({
            'host_label': label, 'host_url': host, 'healthy': entry['healthy'],
            'unhealthy_mark': entry['unhealthy_mark'],
            'models_count': len(entry['models']), 'response_ms': response_ms,
            'error_msg': err,
        })
        ollama_hosts.append(entry)
    return ollama_hosts, probe_records


def _host_health_persist_probes(probe_records):
    """Insert probe rows into ``host_health_probes`` (skipped on SQLite).

    Failure-safe: errors are logged and never block page rendering.
    """
    if not probe_records:
        return
    try:
        session = get_session()
        try:
            if _host_health_session_uses_sqlite(session):
                logger.debug("Skipping host health probe persistence on SQLite")
                return
            for rec in probe_records:
                session.execute(
                    sa_text("""
                        INSERT INTO host_health_probes
                            (host_label, host_url, healthy, unhealthy_mark,
                             models_count, response_ms, error_msg)
                        VALUES
                            (:host_label, :host_url, :healthy, :unhealthy_mark,
                             :models_count, :response_ms, :error_msg)
                    """),
                    rec,
                )
            session.commit()
        finally:
            session.close()
    except Exception:
        logger.warning("Failed to persist host health probe records", exc_info=True)


def _host_health_load_probe_history(session):
    """Per-host up/down counts and mean healthy latency over the last 24h."""
    rows = session.execute(
        sa_text("""
            SELECT host_label,
                   COUNT(*) FILTER (WHERE healthy) AS up_count,
                   COUNT(*) FILTER (WHERE NOT healthy) AS down_count,
                   COALESCE(AVG(response_ms) FILTER (WHERE healthy), 0) AS avg_ms,
                   COUNT(*) AS total
            FROM host_health_probes
            WHERE probed_at >= NOW() - INTERVAL '24 hours'
            GROUP BY host_label
            ORDER BY host_label
        """),
    ).fetchall()
    return [
        {
            'host_label': r[0],
            'up_count': int(r[1] or 0),
            'down_count': int(r[2] or 0),
            'avg_ms': int(r[3] or 0),
            'total': int(r[4] or 0),
            'uptime_pct': _host_health_pct(r[1], r[4]),
        }
        for r in rows
    ]


def _host_health_load_aiops_summary(session):
    """Seven-day incident / heal totals plus a daily heal success sparkline."""
    inc = session.execute(
        sa_text("""
            SELECT
                COUNT(*) AS total_incidents,
                COUNT(*) FILTER (WHERE status = 'open') AS open_count,
                COUNT(*) FILTER (WHERE status = 'resolved') AS resolved_count,
                COUNT(*) FILTER (WHERE severity = 'P0') AS p0_count,
                COUNT(*) FILTER (WHERE severity = 'P1') AS p1_count
            FROM incidents
            WHERE created_at >= NOW() - INTERVAL '7 days'
        """),
    ).fetchone()
    heal = session.execute(
        sa_text("""
            SELECT
                COUNT(*) AS total_heals,
                COUNT(*) FILTER (WHERE result = 'success') AS heal_success,
                COUNT(*) FILTER (WHERE result = 'failed') AS heal_failed,
                COALESCE(AVG(duration_ms) FILTER (WHERE result = 'success'), 0) AS avg_ms
            FROM heal_logs
            WHERE created_at >= NOW() - INTERVAL '7 days'
        """),
    ).fetchone()
    heal_daily = session.execute(
        sa_text("""
            SELECT date_trunc('day', created_at)::date AS d,
                   COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE result = 'success') AS ok
            FROM heal_logs
            WHERE created_at >= NOW() - INTERVAL '7 days'
            GROUP BY d ORDER BY d ASC
        """),
    ).fetchall()
    return {
        'incidents_total': int(inc[0] or 0),
        'incidents_open': int(inc[1] or 0),
        'incidents_resolved': int(inc[2] or 0),
        'incidents_p0': int(inc[3] or 0),
        'incidents_p1': int(inc[4] or 0),
        'heals_total': int(heal[0] or 0),
        'heals_success': int(heal[1] or 0),
        'heals_failed': int(heal[2] or 0),
        'heals_avg_ms': int(heal[3] or 0),
        'heal_success_rate': _host_health_pct(heal[1], heal[0]),
        'heal_sparkline': [
            {
                'date': _host_health_fmt_ts(r[0], '%m-%d'),
                'total': int(r[1] or 0),
                'ok': int(r[2] or 0),
                'rate': _host_health_pct(r[2], r[1]),
            }
            for r in heal_daily
        ],
    }


def _host_health_load_mcp_24h(session):
    """Per-MCP-server call volume, success / cache rates and cost over 24h."""
    rows = session.execute(
        sa_text("""
            SELECT server,
                   COUNT(*) AS total_calls,
                   COUNT(*) FILTER (WHERE status = 'ok') AS ok_calls,
                   COUNT(*) FILTER (WHERE cache_hit) AS cache_hits,
                   COALESCE(SUM(cost_usd), 0) AS total_cost,
                   COALESCE(AVG(duration_ms), 0) AS avg_ms,
                   COUNT(DISTINCT tool) AS tools_used
            FROM mcp_calls
            WHERE called_at >= NOW() - INTERVAL '24 hours'
            GROUP BY server
            ORDER BY total_calls DESC
        """),
    ).fetchall()
    return [
        {
            'server': r[0],
            'total_calls': int(r[1] or 0),
            'ok_calls': int(r[2] or 0),
            'cache_hits': int(r[3] or 0),
            'total_cost': float(r[4] or 0),
            'avg_ms': int(r[5] or 0),
            'tools_used': int(r[6] or 0),
            'success_rate': _host_health_pct(r[2], r[1]),
            'cache_rate': _host_health_pct(r[3], r[1]),
        }
        for r in rows
    ]


def _host_health_load_recent_incidents(session):
    """The ten most recent incidents, newest first."""
    rows = session.execute(
        sa_text("""
            SELECT id, created_at, task_name, error_type, severity,
                   status, error_message, retry_count, resolved_at
            FROM incidents
            ORDER BY created_at DESC LIMIT 10
        """),
    ).fetchall()
    return [
        {
            'id': r[0], 'created_at': _host_health_fmt_ts(r[1]),
            'task_name': r[2], 'error_type': r[3], 'severity': r[4],
            'status': r[5], 'error_message': (r[6] or '')[:200],
            'retry_count': int(r[7] or 0),
            'resolved_at': _host_health_fmt_ts(r[8], default=None),
        }
        for r in rows
    ]


def _host_health_load_recent_heals(session):
    """The ten most recent heal_logs rows joined with their incident type."""
    rows = session.execute(
        sa_text("""
            SELECT h.id, h.created_at, h.action_type, h.result,
                   h.duration_ms, h.action_detail, h.incident_id,
                   i.error_type
            FROM heal_logs h
            LEFT JOIN incidents i ON i.id = h.incident_id
            ORDER BY h.created_at DESC LIMIT 10
        """),
    ).fetchall()
    return [
        {
            'id': r[0], 'created_at': _host_health_fmt_ts(r[1]),
            'action_type': r[2], 'result': r[3],
            'duration_ms': int(r[4] or 0),
            'action_detail': (r[5] or '')[:160],
            'incident_id': r[6], 'error_type': r[7],
        }
        for r in rows
    ]


def _host_health_load_playbook_ranking(session):
    """Top 12 playbooks by total executions (success + fail)."""
    rows = session.execute(
        sa_text("""
            SELECT id, name, error_type, action_type, severity_min,
                   success_count, fail_count, is_active, cooldown_min
            FROM playbooks
            ORDER BY (success_count + fail_count) DESC, success_count DESC
            LIMIT 12
        """),
    ).fetchall()
    ranking = []
    for r in rows:
        success = int(r[5] or 0)
        fail = int(r[6] or 0)
        ranking.append({
            'id': int(r[0]),
            'name': r[1], 'error_type': r[2], 'action_type': r[3],
            'severity': r[4], 'success': success,
            'fail': fail, 'is_active': bool(r[7]),
            'cooldown_min': int(r[8] or 0),
            'success_rate': _host_health_pct(success, success + fail),
        })
    return ranking


def _host_health_load_backup_history(session):
    """Up to ten backup_log rows from the last 7 days, newest first."""
    rows = session.execute(
        sa_text("""
            SELECT created_at, backup_type, status, file_size_bytes,
                   duration_seconds, error_message
            FROM backup_log
            WHERE created_at >= NOW() - INTERVAL '7 days'
            ORDER BY created_at DESC LIMIT 10
        """),
    ).fetchall()
    return [
        {
            'created_at': _host_health_fmt_ts(r[0]),
            'backup_type': r[1], 'status': r[2],
            'size_mb': round(float(r[3] or 0) / (1024 * 1024), 1),
            'duration_s': round(float(r[4] or 0), 1),
            'error': (r[5] or '')[:120],
        }
        for r in rows
    ]


def _host_health_load_embed_queue(session):
    """Return ``(pending, failed)`` counts from embedding_retry_queue."""
    row = session.execute(
        sa_text("""
            SELECT
                COUNT(*) FILTER (WHERE status = 'pending'),
                COUNT(*) FILTER (WHERE status = 'failed')
            FROM embedding_retry_queue
        """),
    ).fetchone()
    return int(row[0] or 0), int(row[1] or 0)


def _host_health_load_history():
    """Load every history section of the dashboard from one DB session.

    Returns a dict keyed by template variable name. Each section is loaded
    through ``_host_health_safe_query`` so a missing table or failing query
    blanks only its own card, never the others.
    """
    history = {
        'health_history': [],
        'aiops_summary': {},
        'mcp_24h': [],
        'recent_incidents': [],
        'recent_heals': [],
        'playbook_ranking': [],
        'backup_history': [],
        'embed_queue_pending': 0,
        'embed_queue_failed': 0,
    }
    loaders = (
        ('health_history', _host_health_load_probe_history),
        ('aiops_summary', _host_health_load_aiops_summary),
        ('mcp_24h', _host_health_load_mcp_24h),
        ('recent_incidents', _host_health_load_recent_incidents),
        ('recent_heals', _host_health_load_recent_heals),
        ('playbook_ranking', _host_health_load_playbook_ranking),
        ('backup_history', _host_health_load_backup_history),
    )
    try:
        session = get_session()
        try:
            for key, loader in loaders:
                history[key] = _host_health_safe_query(session, key, loader, history[key])
            pending, failed = _host_health_safe_query(
                session, 'embedding_retry_queue', _host_health_load_embed_queue, (0, 0),
            )
            history['embed_queue_pending'] = pending
            history['embed_queue_failed'] = failed
        finally:
            session.close()
    except Exception:
        logger.debug("host_health: history session unavailable; rendering empty history", exc_info=True)
    return history


@admin_observability_bp.route('/host_health')
@login_required
def host_health_dashboard():
    """Render live health for the three Ollama hosts and the MCP servers.

    Each request:
      1. probes the three configured Ollama hosts (``/api/tags`` plus an
         optional embedding-runtime probe) and records the results in
         ``host_health_probes`` (skipped on SQLite);
      2. reads the MCP router health check and the cost-throttle state;
      3. loads 24h / 7d history: probe uptime, MCP workload, incidents,
         heal logs, playbook ranking, backups and the embedding retry queue.

    Every section fails safe: a failing import, probe or query degrades only
    that section to its empty default and is logged, so the page always
    renders.
    """
    ollama_hosts, probe_records = _host_health_probe_ollama_hosts()
    _host_health_persist_probes(probe_records)

    mcp_status = {}
    try:
        from services.mcp_router import mcp_router
        mcp_status = mcp_router.health_check()
    except Exception:
        logger.debug("host_health: MCP router health check unavailable", exc_info=True)

    throttle_state = {}
    try:
        from services.cost_throttle_service import get_throttle_state
        throttle_state = get_throttle_state()
    except Exception:
        logger.debug("host_health: cost throttle state unavailable", exc_info=True)

    history = _host_health_load_history()

    return render_template(
        'admin/host_health.html',
        active_page='obs_host_health',
        ollama_hosts=ollama_hosts,
        mcp_status=mcp_status,
        throttle_state=throttle_state,
        health_history=history['health_history'],
        mcp_24h=history['mcp_24h'],
        aiops_summary=history['aiops_summary'],
        recent_incidents=history['recent_incidents'],
        recent_heals=history['recent_heals'],
        playbook_ranking=history['playbook_ranking'],
        backup_history=history['backup_history'],
        embed_queue_pending=history['embed_queue_pending'],
        embed_queue_failed=history['embed_queue_failed'],
    )