From 14c5349b696856d492c3f1293735f41191b3e285 Mon Sep 17 00:00:00 2001 From: OoO Date: Tue, 12 May 2026 23:13:20 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A3=9C=E9=BD=8A=20AI=20=E8=A7=80=E6=B8=AC?= =?UTF-8?q?=E8=A1=A8=20ORM=20=E8=88=87=20embedding=20=E7=B0=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 12 +- config.py | 2 +- database/ai_models.py | 174 +++++++++++++++++- database/manager.py | 14 +- ..._add_embedding_signature_to_rag_tables.sql | 40 ++++ routes/openclaw_bot_routes.py | 12 +- scheduler.py | 1 - services/notification_manager.py | 3 +- services/openclaw_learning_service.py | 20 +- services/rag_service.py | 7 + tests/test_ai_insight_embedding_bridge.py | 41 +++++ tests/test_ai_observability_models.py | 73 ++++++++ tests/test_rag_service.py | 29 +++ 13 files changed, 410 insertions(+), 18 deletions(-) create mode 100644 migrations/034_add_embedding_signature_to_rag_tables.sql create mode 100644 tests/test_ai_observability_models.py diff --git a/app.py b/app.py index 4e8d405..3988cb8 100644 --- a/app.py +++ b/app.py @@ -9,14 +9,9 @@ import os import sys import time import threading -import math import json -import hashlib import shutil -import zipfile import re -import io # V-New: 用於 Excel 匯出 -import traceback # V-Fix: 用於錯誤追蹤 from datetime import datetime, timedelta, timezone # ================= 🔧 1. 環境與路徑鎖定 ================= @@ -70,7 +65,7 @@ except ImportError as e: # ================= 🔧 3. 系統核心配置 ================= # 從 config.py 匯入必要的設定 -from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config +from config import SYSTEM_VERSION, validate_critical_config sys_log = SystemLogger("Web_Server").get_logger() @@ -100,7 +95,10 @@ except Exception as e: # 🚩 2026-05-12 V10.92: Frontend V3 responsive QA + daily_sales/edm fixes # 🚩 2026-05-12 V10.93: Growth analysis mobile KPI readability polish # 🚩 2026-05-12 V10.94: Market intelligence seed writer CLI skeleton -SYSTEM_VERSION = "V10.94" +# 🚩 2026-05-12 V10.96: Market intelligence seed transaction preview +# 🚩 2026-05-12 V10.97: Market intelligence read-only DB schema probe +# 🚩 2026-05-12 V10.98: Market intelligence platform seed DB diff probe +# SYSTEM_VERSION 單一來源於 config.py。 # ========================================== # 🔒 SQL Injection 防護函數 diff --git a/config.py b/config.py index d2257ca..0ab1b33 100644 --- a/config.py +++ b/config.py @@ -326,7 +326,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.94" +SYSTEM_VERSION = "V10.98" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/database/ai_models.py b/database/ai_models.py index 112dc49..ea9d02e 100644 --- a/database/ai_models.py +++ b/database/ai_models.py @@ -1,8 +1,36 @@ # AI history and template models -from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, Float +from sqlalchemy import BigInteger, Column, DateTime, Integer, Numeric, String, Text, Boolean, Float +from sqlalchemy.dialects import postgresql +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.types import JSON, UserDefinedType from database.models import Base from datetime import datetime + +class Vector(UserDefinedType): + """pgvector column with a SQLite-safe fallback for local metadata tests.""" + + cache_ok = True + + def __init__(self, dimensions): + self.dimensions = dimensions + + def get_col_spec(self, **kw): + return f"VECTOR({self.dimensions})" + + +@compiles(Vector, "sqlite") +def _compile_vector_sqlite(type_, compiler, **kw): + return "TEXT" + + +def _jsonb_type(): + return JSON().with_variant(postgresql.JSONB, "postgresql") + + +def _bigint_array_type(): + return Text().with_variant(postgresql.ARRAY(BigInteger), "postgresql") + class AIGenerationHistory(Base): """ AI generation history tracking @@ -151,6 +179,150 @@ class AIInsight(Base): } +class AICall(Base): + """ai_calls unified LLM call telemetry table (migration 024).""" + + __tablename__ = 'ai_calls' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + called_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + caller = Column(String(64), nullable=False) + provider = Column(String(32), nullable=False) + model = Column(String(128), nullable=False) + input_tokens = Column(Integer, default=0, nullable=False) + output_tokens = Column(Integer, default=0, nullable=False) + duration_ms = Column(Integer) + status = Column(String(16), nullable=False) + fallback_to = Column(String(64)) + cost_usd = Column(Numeric(10, 6), default=0, nullable=False) + cache_hit = Column(Boolean, default=False, nullable=False) + rag_hit = Column(Boolean, default=False, nullable=False) + request_id = Column(String(64)) + error = Column(Text) + meta = Column(_jsonb_type()) + + +class MCPCall(Base): + """mcp_calls MCP server call telemetry table (migration 025).""" + + __tablename__ = 'mcp_calls' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + called_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + caller = Column(String(64), nullable=False) + server = Column(String(64), nullable=False) + tool = Column(String(128), nullable=False) + input_args = Column(_jsonb_type()) + output_size = Column(Integer) + duration_ms = Column(Integer) + status = Column(String(16), nullable=False) + error = Column(Text) + cost_usd = Column(Numeric(10, 6), default=0, nullable=False) + cache_hit = Column(Boolean, default=False, nullable=False) + request_id = Column(String(64)) + insight_id = Column(BigInteger) + + +class AICallBudget(Base): + """ai_call_budgets budget guardrail table (migration 025).""" + + __tablename__ = 'ai_call_budgets' + __table_args__ = {'extend_existing': True} + + id = Column(Integer, primary_key=True, autoincrement=True) + period = Column(String(16), nullable=False) + provider = Column(String(32)) + budget_usd = Column(Numeric(10, 2), nullable=False) + alert_pct = Column(Integer, default=80, nullable=False) + updated_at = Column(DateTime(timezone=True), default=datetime.now) + + +class RAGQueryLog(Base): + """rag_query_log RAG recall telemetry table (migration 027).""" + + __tablename__ = 'rag_query_log' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + queried_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + caller = Column(String(64), nullable=False) + query_text = Column(Text, nullable=False) + query_embedding = Column(Vector(1024)) + embedding_signature = Column(String(64)) + top_k = Column(Integer, default=5, nullable=False) + threshold = Column(Numeric(4, 3), default=0.85, nullable=False) + hit_count = Column(Integer, default=0, nullable=False) + used_results = Column(_bigint_array_type()) + saved_call = Column(Boolean, default=False, nullable=False) + feedback_score = Column(Integer) + request_id = Column(String(64)) + + +class LearningEpisode(Base): + """learning_episodes PromotionGate staging table (migration 028).""" + + __tablename__ = 'learning_episodes' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + created_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + episode_type = Column(String(32), nullable=False) + source_table = Column(String(32)) + source_id = Column(BigInteger) + distilled_text = Column(Text, nullable=False) + embedding = Column(Vector(1024)) + embedding_signature = Column(String(64)) + quality_score = Column(Numeric(4, 3), default=0.0, nullable=False) + weight = Column(Numeric(4, 3), default=0.5, nullable=False) + promotion_status = Column(String(32), default='pending', nullable=False) + insight_id = Column(BigInteger) + rejected_reason = Column(Text) + human_approver = Column(String(64)) + reviewed_at = Column(DateTime(timezone=True)) + + +class HostHealthProbe(Base): + """host_health_probes Ollama failover health history table (migration 029).""" + + __tablename__ = 'host_health_probes' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + probed_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + host_label = Column(String(64), nullable=False) + host_url = Column(String(256), nullable=False) + healthy = Column(Boolean, nullable=False) + unhealthy_mark = Column(Boolean, default=False, nullable=False) + models_count = Column(Integer, default=0) + response_ms = Column(Integer) + error_msg = Column(Text) + + +class PPTAuditResult(Base): + """ppt_audit_results PPT vision audit history table (migration 030).""" + + __tablename__ = 'ppt_audit_results' + __table_args__ = {'extend_existing': True} + + id = Column(BigInteger, primary_key=True, autoincrement=True) + audited_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False) + pptx_filename = Column(String(256), nullable=False) + pptx_size_kb = Column(Integer) + pptx_mtime = Column(DateTime(timezone=True)) + vision_enabled = Column(Boolean, nullable=False) + audit_status = Column(String(32), nullable=False) + issues_count = Column(Integer, default=0) + issues_found = Column(_jsonb_type()) + confidence = Column(Numeric(4, 3)) + duration_ms = Column(Integer) + error_msg = Column(Text) + reviewer_notes = Column(Text) + + __all__ = [ "AIGenerationHistory", "AIPromptTemplate", "AIUsageTracking", "AIInsight", + "AICall", "MCPCall", "AICallBudget", "RAGQueryLog", "LearningEpisode", + "HostHealthProbe", "PPTAuditResult", ] diff --git a/database/manager.py b/database/manager.py index 182180f..976f6f5 100644 --- a/database/manager.py +++ b/database/manager.py @@ -10,7 +10,19 @@ from .user_models import User, LoginHistory # noqa: F401 - 必須在 trend_mode from .edm_models import PromoProduct # V-Fix: 確保 EDM 模型被註冊,以便自動建表 from .trend_models import TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache, TelegramUser # noqa: F401 - 趨勢資料表 from .permission_models import Permission, UserPermission # noqa: F401 - 確保權限表被 Base.metadata 管理 -from .ai_models import AIGenerationHistory, AIPromptTemplate, AIUsageTracking, AIInsight # noqa: F401 - AI history/template 表 +from .ai_models import ( # noqa: F401 - AI history/template + v5 telemetry 表 + AIGenerationHistory, + AIPromptTemplate, + AIUsageTracking, + AIInsight, + AICall, + MCPCall, + AICallBudget, + RAGQueryLog, + LearningEpisode, + HostHealthProbe, + PPTAuditResult, +) from .autoheal_models import ( # noqa: F401 - ADR-013 AIOps 自動修復表 AgentContext, ActionPlan, diff --git a/migrations/034_add_embedding_signature_to_rag_tables.sql b/migrations/034_add_embedding_signature_to_rag_tables.sql new file mode 100644 index 0000000..bc3d194 --- /dev/null +++ b/migrations/034_add_embedding_signature_to_rag_tables.sql @@ -0,0 +1,40 @@ +-- ============================================================================= +-- Migration 034: rag_query_log / learning_episodes embedding_signature +-- Operation Ollama-First v5.0 — BGE-M3 cross-table guardrail +-- 日期: 2026-05-12 台北 +-- ============================================================================= +-- 背景: +-- migration 026 已替 ai_insights.embedding 加 embedding_signature,但 +-- rag_query_log.query_embedding 與 learning_episodes.embedding 仍缺同一護欄。 +-- 一旦 bge-m3:latest / normalize / dim 漂移,跨表 cosine 會退化且無法定位。 +-- +-- 設計: +-- 1. 新增 nullable VARCHAR(64),metadata-only,不回填、不重寫大表。 +-- 2. 新寫入由 services/rag_service.py 與 services/openclaw_learning_service.py 補簽名。 +-- 3. 既有 NULL 代表歷史資料,查詢端可保守放行或在後續 backfill 後收緊。 +-- ============================================================================= + +ALTER TABLE IF EXISTS rag_query_log + ADD COLUMN IF NOT EXISTS embedding_signature VARCHAR(64); + +COMMENT ON COLUMN rag_query_log.embedding_signature IS + 'BGE-M3 一致性簽名;對應 query_embedding,NULL = 歷史資料或 embedding 失敗'; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_rag_query_log_embedding_signature + ON rag_query_log (embedding_signature) + WHERE query_embedding IS NOT NULL AND embedding_signature IS NOT NULL; + +ALTER TABLE IF EXISTS learning_episodes + ADD COLUMN IF NOT EXISTS embedding_signature VARCHAR(64); + +COMMENT ON COLUMN learning_episodes.embedding_signature IS + 'BGE-M3 一致性簽名;對應 embedding,NULL = 尚未 embedding 或歷史資料'; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_le_embedding_signature + ON learning_episodes (embedding_signature) + WHERE embedding IS NOT NULL AND embedding_signature IS NOT NULL; + +DO $$ +BEGIN + RAISE NOTICE 'Migration 034 done: embedding_signature added to rag_query_log and learning_episodes'; +END $$; diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index 92a6944..e068a6c 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -7204,8 +7204,10 @@ def handle_cmd(cmd, arg, chat_id, reply_to): png = gen_trend_chart(data_points=data, title=chart_title) if png: send_photo(chat_id, png, caption=f'📈 {period_label} 趨勢走勢圖') - try: os.unlink(png) - except Exception: pass + try: + os.unlink(png) + except Exception: + sys_log.exception('[OpenClawBot] trend chart temp cleanup failed: %s', png) except Exception as _te: sys_log.warning(f'[OpenClawBot] trend chart error: {_te}') else: @@ -7217,8 +7219,10 @@ def handle_cmd(cmd, arg, chat_id, reply_to): if png: unit = '月' if granularity == 'monthly' else '週' send_photo(chat_id, png, caption=f'📊 {period_label} 業績走勢(按{unit})') - try: os.unlink(png) - except Exception: pass + try: + os.unlink(png) + except Exception: + sys_log.exception('[OpenClawBot] trend agg chart temp cleanup failed: %s', png) except Exception as _te: sys_log.warning(f'[OpenClawBot] trend agg chart error: {_te}') diff --git a/scheduler.py b/scheduler.py index e6802e3..032bd01 100644 --- a/scheduler.py +++ b/scheduler.py @@ -6,7 +6,6 @@ import logging import json import threading import requests -import schedule from datetime import datetime, timedelta, timezone from selenium import webdriver from selenium.webdriver.chrome.options import Options diff --git a/services/notification_manager.py b/services/notification_manager.py index d473057..84651e1 100644 --- a/services/notification_manager.py +++ b/services/notification_manager.py @@ -161,7 +161,8 @@ class NotificationManager: if os.path.exists(url_config_path): with open(url_config_path, 'r') as f: self.public_url = json.load(f).get('public_url') - except: pass + except Exception: + self.logger.exception("[Notification] 讀取 public_url 設定失敗") def _send_line_messages(self, messages, image_url=None): # 檢查 LINE 是否啟用 diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index 2001491..142ac45 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -22,11 +22,16 @@ EMBED_POLL_INTERVAL_SEC = 60 # worker 輪詢間隔 EMBED_BATCH_SIZE = 10 # 單次最多處理筆數 EMBED_MAX_ATTEMPTS = 5 # 超過則標記 failed DECAY_RATE = 0.01 # ADR-005:半衰期約 70 天 +_EMBEDDING_TARGET_TABLES = {"ai_insights", "learning_episodes"} def _enqueue_embedding(target_table: str, target_id: int, text_content: str, model: str = "bge-m3:latest") -> bool: """將待 embed 項目寫入 DB retry queue(持久化)""" + if target_table not in _EMBEDDING_TARGET_TABLES: + sys_log.warning(f"[OCLearn] enqueue embedding 拒絕未知 target_table={target_table}") + return False + session = get_session() try: session.execute( @@ -112,10 +117,21 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int, if not vec: raise RuntimeError("embedding 回傳空值") + if target_table not in _EMBEDDING_TARGET_TABLES: + raise RuntimeError(f"不允許的 embedding target_table: {target_table}") + + from services.rag_service import get_embedding_signature + vec_str = str(vec) + embedding_signature = get_embedding_signature(model=model, dim=len(vec)) session.execute( - text(f"UPDATE {target_table} SET embedding = :vec WHERE id = :id"), - {"vec": vec_str, "id": target_id}, + text(f""" + UPDATE {target_table} + SET embedding = :vec, + embedding_signature = :sig + WHERE id = :id + """), + {"vec": vec_str, "sig": embedding_signature, "id": target_id}, ) session.execute( text(""" diff --git a/services/rag_service.py b/services/rag_service.py index f8558c5..cb702a1 100644 --- a/services/rag_service.py +++ b/services/rag_service.py @@ -592,6 +592,10 @@ class RAGService: used_results = [int(h['id']) for h in hits if h.get('id')] embedding_str = str(query_vec) if query_vec else None + embedding_signature = ( + get_embedding_signature(dim=len(query_vec)) + if query_vec else None + ) session = get_session() try: @@ -599,12 +603,14 @@ class RAGService: sa_text(""" INSERT INTO rag_query_log ( caller, query_text, query_embedding, + embedding_signature, top_k, threshold, hit_count, used_results, saved_call, request_id ) VALUES ( :caller, :query_text, CAST(:embedding AS vector), + :embedding_signature, :top_k, :threshold, :hit_count, CAST(:used_results AS BIGINT[]), :saved_call, :request_id @@ -614,6 +620,7 @@ class RAGService: 'caller': (caller or 'unknown')[:64], 'query_text': safe_text, 'embedding': embedding_str, + 'embedding_signature': embedding_signature, 'top_k': int(top_k), 'threshold': round(float(threshold), 3), 'hit_count': len(hits), diff --git a/tests/test_ai_insight_embedding_bridge.py b/tests/test_ai_insight_embedding_bridge.py index 7efc8f4..e82be0a 100644 --- a/tests/test_ai_insight_embedding_bridge.py +++ b/tests/test_ai_insight_embedding_bridge.py @@ -57,3 +57,44 @@ def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch): assert result == {"scanned": 2, "enqueued": 2, "status": "ok"} assert calls[0] == (1, "mcp_cache", "市場資料", None) + + +def test_process_one_embedding_writes_signature(monkeypatch): + import services.openclaw_learning_service as learning + from services.rag_service import get_embedding_signature + + executed = [] + + class Session: + def execute(self, stmt, params=None): + executed.append((str(stmt), params or {})) + + def commit(self): + pass + + def rollback(self): + pass + + def close(self): + pass + + monkeypatch.setattr(learning, "get_session", lambda: Session()) + monkeypatch.setattr( + learning.ollama_service, + "generate_embedding", + lambda text, model="bge-m3:latest": [0.1] * 1024, + ) + + ok = learning._process_one_embedding( + row_id=7, + target_table="learning_episodes", + target_id=42, + text_content="測試內容", + model="bge-m3:latest", + ) + + assert ok is True + target_updates = [item for item in executed if "UPDATE learning_episodes" in item[0]] + assert target_updates + assert "embedding_signature" in target_updates[0][0] + assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024) diff --git a/tests/test_ai_observability_models.py b/tests/test_ai_observability_models.py new file mode 100644 index 0000000..61be18e --- /dev/null +++ b/tests/test_ai_observability_models.py @@ -0,0 +1,73 @@ +from sqlalchemy import create_engine + +from database.manager import Base + + +AI_OBSERVABILITY_TABLES = { + "ai_calls": { + "caller", + "provider", + "model", + "status", + "request_id", + "meta", + }, + "mcp_calls": { + "caller", + "server", + "tool", + "status", + "request_id", + "input_args", + }, + "ai_call_budgets": { + "period", + "provider", + "budget_usd", + "alert_pct", + }, + "rag_query_log": { + "caller", + "query_text", + "query_embedding", + "embedding_signature", + "used_results", + "saved_call", + }, + "learning_episodes": { + "episode_type", + "source_table", + "distilled_text", + "embedding", + "embedding_signature", + "promotion_status", + }, + "host_health_probes": { + "host_label", + "host_url", + "healthy", + "unhealthy_mark", + }, + "ppt_audit_results": { + "pptx_filename", + "vision_enabled", + "audit_status", + "issues_found", + }, +} + + +def test_ai_observability_tables_registered_in_metadata(): + metadata_tables = set(Base.metadata.tables) + + assert AI_OBSERVABILITY_TABLES.keys() <= metadata_tables + for table_name, expected_columns in AI_OBSERVABILITY_TABLES.items(): + actual_columns = set(Base.metadata.tables[table_name].columns.keys()) + assert expected_columns <= actual_columns + + +def test_ai_observability_tables_compile_for_sqlite_metadata_init(): + engine = create_engine("sqlite:///:memory:") + + for table_name in AI_OBSERVABILITY_TABLES: + Base.metadata.tables[table_name].create(engine, checkfirst=True) diff --git a/tests/test_rag_service.py b/tests/test_rag_service.py index 416c2f4..dc70381 100644 --- a/tests/test_rag_service.py +++ b/tests/test_rag_service.py @@ -269,6 +269,35 @@ class TestFireAndForgetLog: # 等 daemon thread 跑完 time.sleep(0.2) + def test_log_write_includes_embedding_signature(self, monkeypatch): + """rag_query_log 寫入 query_embedding 時同步保存 BGE-M3 signature。""" + from services import rag_service as rs + + captured = {} + fake_session = MagicMock() + + def _exec(stmt, params): + captured["sql"] = str(stmt) + captured["params"] = params + return MagicMock() + + fake_session.execute.side_effect = _exec + monkeypatch.setattr('database.manager.get_session', lambda: fake_session) + + rs.rag_service._write_log( + caller='openclaw_qa', + text='query', + query_vec=_fake_embedding(), + top_k=5, + threshold=0.85, + hits=[{'id': 101}], + request_id='req-1', + ) + + assert "embedding_signature" in captured["sql"] + assert captured["params"]["embedding_signature"] == rs.get_embedding_signature() + fake_session.commit.assert_called_once() + def test_embedding_failure_falls_back_to_empty(self, rag_enabled, monkeypatch): """embedding 回 [] → 不查 DB → 回空 hits 給 caller fallback LLM。""" from services import rag_service as rs