補齊 AI 觀測表 ORM 與 embedding 簽名
All checks were successful
CD Pipeline / deploy (push) Successful in 56s

This commit is contained in:
OoO
2026-05-12 23:13:20 +08:00
parent fb145d6425
commit 14c5349b69
13 changed files with 410 additions and 18 deletions

12
app.py
View File

@@ -9,14 +9,9 @@ import os
import sys
import time
import threading
import math
import json
import hashlib
import shutil
import zipfile
import re
import io # V-New: 用於 Excel 匯出
import traceback # V-Fix: 用於錯誤追蹤
from datetime import datetime, timedelta, timezone
# ================= 🔧 1. 環境與路徑鎖定 =================
@@ -70,7 +65,7 @@ except ImportError as e:
# ================= 🔧 3. 系統核心配置 =================
# 從 config.py 匯入必要的設定
from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config
from config import SYSTEM_VERSION, validate_critical_config
sys_log = SystemLogger("Web_Server").get_logger()
@@ -100,7 +95,10 @@ except Exception as e:
# 🚩 2026-05-12 V10.92: Frontend V3 responsive QA + daily_sales/edm fixes
# 🚩 2026-05-12 V10.93: Growth analysis mobile KPI readability polish
# 🚩 2026-05-12 V10.94: Market intelligence seed writer CLI skeleton
SYSTEM_VERSION = "V10.94"
# 🚩 2026-05-12 V10.96: Market intelligence seed transaction preview
# 🚩 2026-05-12 V10.97: Market intelligence read-only DB schema probe
# 🚩 2026-05-12 V10.98: Market intelligence platform seed DB diff probe
# SYSTEM_VERSION 單一來源於 config.py。
# ==========================================
# 🔒 SQL Injection 防護函數

View File

@@ -326,7 +326,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.94"
SYSTEM_VERSION = "V10.98"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -1,8 +1,36 @@
# AI history and template models
from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, Float
from sqlalchemy import BigInteger, Column, DateTime, Integer, Numeric, String, Text, Boolean, Float
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.types import JSON, UserDefinedType
from database.models import Base
from datetime import datetime
class Vector(UserDefinedType):
"""pgvector column with a SQLite-safe fallback for local metadata tests."""
cache_ok = True
def __init__(self, dimensions):
self.dimensions = dimensions
def get_col_spec(self, **kw):
return f"VECTOR({self.dimensions})"
@compiles(Vector, "sqlite")
def _compile_vector_sqlite(type_, compiler, **kw):
return "TEXT"
def _jsonb_type():
return JSON().with_variant(postgresql.JSONB, "postgresql")
def _bigint_array_type():
return Text().with_variant(postgresql.ARRAY(BigInteger), "postgresql")
class AIGenerationHistory(Base):
"""
AI generation history tracking
@@ -151,6 +179,150 @@ class AIInsight(Base):
}
class AICall(Base):
"""ai_calls unified LLM call telemetry table (migration 024)."""
__tablename__ = 'ai_calls'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
called_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
caller = Column(String(64), nullable=False)
provider = Column(String(32), nullable=False)
model = Column(String(128), nullable=False)
input_tokens = Column(Integer, default=0, nullable=False)
output_tokens = Column(Integer, default=0, nullable=False)
duration_ms = Column(Integer)
status = Column(String(16), nullable=False)
fallback_to = Column(String(64))
cost_usd = Column(Numeric(10, 6), default=0, nullable=False)
cache_hit = Column(Boolean, default=False, nullable=False)
rag_hit = Column(Boolean, default=False, nullable=False)
request_id = Column(String(64))
error = Column(Text)
meta = Column(_jsonb_type())
class MCPCall(Base):
"""mcp_calls MCP server call telemetry table (migration 025)."""
__tablename__ = 'mcp_calls'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
called_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
caller = Column(String(64), nullable=False)
server = Column(String(64), nullable=False)
tool = Column(String(128), nullable=False)
input_args = Column(_jsonb_type())
output_size = Column(Integer)
duration_ms = Column(Integer)
status = Column(String(16), nullable=False)
error = Column(Text)
cost_usd = Column(Numeric(10, 6), default=0, nullable=False)
cache_hit = Column(Boolean, default=False, nullable=False)
request_id = Column(String(64))
insight_id = Column(BigInteger)
class AICallBudget(Base):
"""ai_call_budgets budget guardrail table (migration 025)."""
__tablename__ = 'ai_call_budgets'
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True, autoincrement=True)
period = Column(String(16), nullable=False)
provider = Column(String(32))
budget_usd = Column(Numeric(10, 2), nullable=False)
alert_pct = Column(Integer, default=80, nullable=False)
updated_at = Column(DateTime(timezone=True), default=datetime.now)
class RAGQueryLog(Base):
"""rag_query_log RAG recall telemetry table (migration 027)."""
__tablename__ = 'rag_query_log'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
queried_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
caller = Column(String(64), nullable=False)
query_text = Column(Text, nullable=False)
query_embedding = Column(Vector(1024))
embedding_signature = Column(String(64))
top_k = Column(Integer, default=5, nullable=False)
threshold = Column(Numeric(4, 3), default=0.85, nullable=False)
hit_count = Column(Integer, default=0, nullable=False)
used_results = Column(_bigint_array_type())
saved_call = Column(Boolean, default=False, nullable=False)
feedback_score = Column(Integer)
request_id = Column(String(64))
class LearningEpisode(Base):
"""learning_episodes PromotionGate staging table (migration 028)."""
__tablename__ = 'learning_episodes'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
created_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
episode_type = Column(String(32), nullable=False)
source_table = Column(String(32))
source_id = Column(BigInteger)
distilled_text = Column(Text, nullable=False)
embedding = Column(Vector(1024))
embedding_signature = Column(String(64))
quality_score = Column(Numeric(4, 3), default=0.0, nullable=False)
weight = Column(Numeric(4, 3), default=0.5, nullable=False)
promotion_status = Column(String(32), default='pending', nullable=False)
insight_id = Column(BigInteger)
rejected_reason = Column(Text)
human_approver = Column(String(64))
reviewed_at = Column(DateTime(timezone=True))
class HostHealthProbe(Base):
"""host_health_probes Ollama failover health history table (migration 029)."""
__tablename__ = 'host_health_probes'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
probed_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
host_label = Column(String(64), nullable=False)
host_url = Column(String(256), nullable=False)
healthy = Column(Boolean, nullable=False)
unhealthy_mark = Column(Boolean, default=False, nullable=False)
models_count = Column(Integer, default=0)
response_ms = Column(Integer)
error_msg = Column(Text)
class PPTAuditResult(Base):
"""ppt_audit_results PPT vision audit history table (migration 030)."""
__tablename__ = 'ppt_audit_results'
__table_args__ = {'extend_existing': True}
id = Column(BigInteger, primary_key=True, autoincrement=True)
audited_at = Column(DateTime(timezone=True), default=datetime.now, nullable=False)
pptx_filename = Column(String(256), nullable=False)
pptx_size_kb = Column(Integer)
pptx_mtime = Column(DateTime(timezone=True))
vision_enabled = Column(Boolean, nullable=False)
audit_status = Column(String(32), nullable=False)
issues_count = Column(Integer, default=0)
issues_found = Column(_jsonb_type())
confidence = Column(Numeric(4, 3))
duration_ms = Column(Integer)
error_msg = Column(Text)
reviewer_notes = Column(Text)
__all__ = [
"AIGenerationHistory", "AIPromptTemplate", "AIUsageTracking", "AIInsight",
"AICall", "MCPCall", "AICallBudget", "RAGQueryLog", "LearningEpisode",
"HostHealthProbe", "PPTAuditResult",
]

View File

@@ -10,7 +10,19 @@ from .user_models import User, LoginHistory # noqa: F401 - 必須在 trend_mode
from .edm_models import PromoProduct # V-Fix: 確保 EDM 模型被註冊,以便自動建表
from .trend_models import TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache, TelegramUser # noqa: F401 - 趨勢資料表
from .permission_models import Permission, UserPermission # noqa: F401 - 確保權限表被 Base.metadata 管理
from .ai_models import AIGenerationHistory, AIPromptTemplate, AIUsageTracking, AIInsight # noqa: F401 - AI history/template 表
from .ai_models import ( # noqa: F401 - AI history/template + v5 telemetry
AIGenerationHistory,
AIPromptTemplate,
AIUsageTracking,
AIInsight,
AICall,
MCPCall,
AICallBudget,
RAGQueryLog,
LearningEpisode,
HostHealthProbe,
PPTAuditResult,
)
from .autoheal_models import ( # noqa: F401 - ADR-013 AIOps 自動修復表
AgentContext,
ActionPlan,

View File

@@ -0,0 +1,40 @@
-- =============================================================================
-- Migration 034: rag_query_log / learning_episodes embedding_signature
-- Operation Ollama-First v5.0 — BGE-M3 cross-table guardrail
-- 日期: 2026-05-12 台北
-- =============================================================================
-- 背景:
-- migration 026 已替 ai_insights.embedding 加 embedding_signature
-- rag_query_log.query_embedding 與 learning_episodes.embedding 仍缺同一護欄。
-- 一旦 bge-m3:latest / normalize / dim 漂移,跨表 cosine 會退化且無法定位。
--
-- 設計:
-- 1. 新增 nullable VARCHAR(64)metadata-only不回填、不重寫大表。
-- 2. 新寫入由 services/rag_service.py 與 services/openclaw_learning_service.py 補簽名。
-- 3. 既有 NULL 代表歷史資料,查詢端可保守放行或在後續 backfill 後收緊。
-- =============================================================================
ALTER TABLE IF EXISTS rag_query_log
ADD COLUMN IF NOT EXISTS embedding_signature VARCHAR(64);
COMMENT ON COLUMN rag_query_log.embedding_signature IS
'BGE-M3 一致性簽名;對應 query_embeddingNULL = 歷史資料或 embedding 失敗';
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_rag_query_log_embedding_signature
ON rag_query_log (embedding_signature)
WHERE query_embedding IS NOT NULL AND embedding_signature IS NOT NULL;
ALTER TABLE IF EXISTS learning_episodes
ADD COLUMN IF NOT EXISTS embedding_signature VARCHAR(64);
COMMENT ON COLUMN learning_episodes.embedding_signature IS
'BGE-M3 一致性簽名;對應 embeddingNULL = 尚未 embedding 或歷史資料';
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_le_embedding_signature
ON learning_episodes (embedding_signature)
WHERE embedding IS NOT NULL AND embedding_signature IS NOT NULL;
DO $$
BEGIN
RAISE NOTICE 'Migration 034 done: embedding_signature added to rag_query_log and learning_episodes';
END $$;

View File

@@ -7204,8 +7204,10 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
png = gen_trend_chart(data_points=data, title=chart_title)
if png:
send_photo(chat_id, png, caption=f'📈 {period_label} 趨勢走勢圖')
try: os.unlink(png)
except Exception: pass
try:
os.unlink(png)
except Exception:
sys_log.exception('[OpenClawBot] trend chart temp cleanup failed: %s', png)
except Exception as _te:
sys_log.warning(f'[OpenClawBot] trend chart error: {_te}')
else:
@@ -7217,8 +7219,10 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
if png:
unit = '' if granularity == 'monthly' else ''
send_photo(chat_id, png, caption=f'📊 {period_label} 業績走勢(按{unit}')
try: os.unlink(png)
except Exception: pass
try:
os.unlink(png)
except Exception:
sys_log.exception('[OpenClawBot] trend agg chart temp cleanup failed: %s', png)
except Exception as _te:
sys_log.warning(f'[OpenClawBot] trend agg chart error: {_te}')

View File

@@ -6,7 +6,6 @@ import logging
import json
import threading
import requests
import schedule
from datetime import datetime, timedelta, timezone
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

View File

@@ -161,7 +161,8 @@ class NotificationManager:
if os.path.exists(url_config_path):
with open(url_config_path, 'r') as f:
self.public_url = json.load(f).get('public_url')
except: pass
except Exception:
self.logger.exception("[Notification] 讀取 public_url 設定失敗")
def _send_line_messages(self, messages, image_url=None):
# 檢查 LINE 是否啟用

View File

@@ -22,11 +22,16 @@ EMBED_POLL_INTERVAL_SEC = 60 # worker 輪詢間隔
EMBED_BATCH_SIZE = 10 # 單次最多處理筆數
EMBED_MAX_ATTEMPTS = 5 # 超過則標記 failed
DECAY_RATE = 0.01 # ADR-005半衰期約 70 天
_EMBEDDING_TARGET_TABLES = {"ai_insights", "learning_episodes"}
def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
model: str = "bge-m3:latest") -> bool:
"""將待 embed 項目寫入 DB retry queue持久化"""
if target_table not in _EMBEDDING_TARGET_TABLES:
sys_log.warning(f"[OCLearn] enqueue embedding 拒絕未知 target_table={target_table}")
return False
session = get_session()
try:
session.execute(
@@ -112,10 +117,21 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int,
if not vec:
raise RuntimeError("embedding 回傳空值")
if target_table not in _EMBEDDING_TARGET_TABLES:
raise RuntimeError(f"不允許的 embedding target_table: {target_table}")
from services.rag_service import get_embedding_signature
vec_str = str(vec)
embedding_signature = get_embedding_signature(model=model, dim=len(vec))
session.execute(
text(f"UPDATE {target_table} SET embedding = :vec WHERE id = :id"),
{"vec": vec_str, "id": target_id},
text(f"""
UPDATE {target_table}
SET embedding = :vec,
embedding_signature = :sig
WHERE id = :id
"""),
{"vec": vec_str, "sig": embedding_signature, "id": target_id},
)
session.execute(
text("""

View File

@@ -592,6 +592,10 @@ class RAGService:
used_results = [int(h['id']) for h in hits if h.get('id')]
embedding_str = str(query_vec) if query_vec else None
embedding_signature = (
get_embedding_signature(dim=len(query_vec))
if query_vec else None
)
session = get_session()
try:
@@ -599,12 +603,14 @@ class RAGService:
sa_text("""
INSERT INTO rag_query_log (
caller, query_text, query_embedding,
embedding_signature,
top_k, threshold,
hit_count, used_results,
saved_call, request_id
) VALUES (
:caller, :query_text,
CAST(:embedding AS vector),
:embedding_signature,
:top_k, :threshold,
:hit_count, CAST(:used_results AS BIGINT[]),
:saved_call, :request_id
@@ -614,6 +620,7 @@ class RAGService:
'caller': (caller or 'unknown')[:64],
'query_text': safe_text,
'embedding': embedding_str,
'embedding_signature': embedding_signature,
'top_k': int(top_k),
'threshold': round(float(threshold), 3),
'hit_count': len(hits),

View File

@@ -57,3 +57,44 @@ def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch):
assert result == {"scanned": 2, "enqueued": 2, "status": "ok"}
assert calls[0] == (1, "mcp_cache", "市場資料", None)
def test_process_one_embedding_writes_signature(monkeypatch):
import services.openclaw_learning_service as learning
from services.rag_service import get_embedding_signature
executed = []
class Session:
def execute(self, stmt, params=None):
executed.append((str(stmt), params or {}))
def commit(self):
pass
def rollback(self):
pass
def close(self):
pass
monkeypatch.setattr(learning, "get_session", lambda: Session())
monkeypatch.setattr(
learning.ollama_service,
"generate_embedding",
lambda text, model="bge-m3:latest": [0.1] * 1024,
)
ok = learning._process_one_embedding(
row_id=7,
target_table="learning_episodes",
target_id=42,
text_content="測試內容",
model="bge-m3:latest",
)
assert ok is True
target_updates = [item for item in executed if "UPDATE learning_episodes" in item[0]]
assert target_updates
assert "embedding_signature" in target_updates[0][0]
assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024)

View File

@@ -0,0 +1,73 @@
from sqlalchemy import create_engine
from database.manager import Base
AI_OBSERVABILITY_TABLES = {
"ai_calls": {
"caller",
"provider",
"model",
"status",
"request_id",
"meta",
},
"mcp_calls": {
"caller",
"server",
"tool",
"status",
"request_id",
"input_args",
},
"ai_call_budgets": {
"period",
"provider",
"budget_usd",
"alert_pct",
},
"rag_query_log": {
"caller",
"query_text",
"query_embedding",
"embedding_signature",
"used_results",
"saved_call",
},
"learning_episodes": {
"episode_type",
"source_table",
"distilled_text",
"embedding",
"embedding_signature",
"promotion_status",
},
"host_health_probes": {
"host_label",
"host_url",
"healthy",
"unhealthy_mark",
},
"ppt_audit_results": {
"pptx_filename",
"vision_enabled",
"audit_status",
"issues_found",
},
}
def test_ai_observability_tables_registered_in_metadata():
metadata_tables = set(Base.metadata.tables)
assert AI_OBSERVABILITY_TABLES.keys() <= metadata_tables
for table_name, expected_columns in AI_OBSERVABILITY_TABLES.items():
actual_columns = set(Base.metadata.tables[table_name].columns.keys())
assert expected_columns <= actual_columns
def test_ai_observability_tables_compile_for_sqlite_metadata_init():
engine = create_engine("sqlite:///:memory:")
for table_name in AI_OBSERVABILITY_TABLES:
Base.metadata.tables[table_name].create(engine, checkfirst=True)

View File

@@ -269,6 +269,35 @@ class TestFireAndForgetLog:
# 等 daemon thread 跑完
time.sleep(0.2)
def test_log_write_includes_embedding_signature(self, monkeypatch):
"""rag_query_log 寫入 query_embedding 時同步保存 BGE-M3 signature。"""
from services import rag_service as rs
captured = {}
fake_session = MagicMock()
def _exec(stmt, params):
captured["sql"] = str(stmt)
captured["params"] = params
return MagicMock()
fake_session.execute.side_effect = _exec
monkeypatch.setattr('database.manager.get_session', lambda: fake_session)
rs.rag_service._write_log(
caller='openclaw_qa',
text='query',
query_vec=_fake_embedding(),
top_k=5,
threshold=0.85,
hits=[{'id': 101}],
request_id='req-1',
)
assert "embedding_signature" in captured["sql"]
assert captured["params"]["embedding_signature"] == rs.get_embedding_signature()
fake_session.commit.assert_called_once()
def test_embedding_failure_falls_back_to_empty(self, rag_enabled, monkeypatch):
"""embedding 回 [] → 不查 DB → 回空 hits 給 caller fallback LLM。"""
from services import rag_service as rs