diff --git a/routes/admin_observability_routes.py b/routes/admin_observability_routes.py index e0b92b8..25834af 100644 --- a/routes/admin_observability_routes.py +++ b/routes/admin_observability_routes.py @@ -79,27 +79,25 @@ def ai_calls_dashboard(): {'since': since}, ).fetchall() - # 3. TOP 20 calls(最近)— 動態 WHERE - where_parts = ["called_at >= :since"] - params = {'since': since} - if caller_filter: - where_parts.append("caller = :caller") - params['caller'] = caller_filter - if provider_filter: - where_parts.append("provider = :provider") - params['provider'] = provider_filter - + # 3. TOP 100 calls — Phase 33 Critic HIGH #2 修補: + # 改用固定 SQL + 全綁參數,移除 f-string 動態 WHERE 拼接(防後人不慎注入) recent = session.execute( - sa_text(f""" + sa_text(""" SELECT id, called_at, caller, provider, model, input_tokens, output_tokens, duration_ms, status, cost_usd, cache_hit, rag_hit FROM ai_calls - WHERE {' AND '.join(where_parts)} + WHERE called_at >= :since + AND (:caller_f = '' OR caller = :caller_f) + AND (:provider_f = '' OR provider = :provider_f) ORDER BY called_at DESC LIMIT 100 """), - params, + { + 'since': since, + 'caller_f': caller_filter, + 'provider_f': provider_filter, + }, ).fetchall() # 4. caller 列表(給篩選 dropdown) @@ -369,6 +367,7 @@ def budget_update(budget_id: int): def ppt_audit_history(): """掃 reports/ 目錄列近 7 日 .pptx 檔 + 即時跑 audit(如已啟用)""" import os + import time reports_dir = 'reports' files = [] error = None @@ -377,18 +376,21 @@ def ppt_audit_history(): if not os.path.isdir(reports_dir): error = f'{reports_dir} 目錄不存在' else: - cutoff = __import__('time').time() - 7 * 86400 + cutoff = time.time() - 7 * 86400 for f in os.listdir(reports_dir): if not f.lower().endswith('.pptx'): continue full = os.path.join(reports_dir, f) + # symlink 防護:reports/ 內不接受 symlink,避免目錄逃逸(Critic MEDIUM #2) + if os.path.islink(full): + continue try: mtime = os.path.getmtime(full) if mtime >= cutoff: files.append({ 'name': f, 'size_kb': round(os.path.getsize(full) / 1024, 1), - 'mtime': __import__('datetime').datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M'), + 'mtime': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M'), 'mtime_ts': mtime, }) except OSError: