From 97c446303cef3451e20b533ec8ea336e2d03afc9 Mon Sep 17 00:00:00 2001
From: OoO
Date: Mon, 4 May 2026 09:31:31 +0800
Subject: [PATCH] =?UTF-8?q?feat(p11.0):=20BGE-M3=20=E8=B7=A8=E4=B8=BB?=
 =?UTF-8?q?=E6=A9=9F=E4=B8=80=E8=87=B4=E6=80=A7=E9=A9=97=E8=AD=89=20+=20?=
 =?UTF-8?q?=E6=AF=8F=E9=80=B1=E6=97=A5=2004:30=20cron?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Operation Ollama-First v5.0 / Phase 11.0 收尾(ADR-033 護欄 #3 完整落地)

services/rag_service.py 新增:
- verify_embedding_consistency() — 跨三主機 BGE-M3 embedding 一致性驗證
  測試文字「momo電商競品分析測試向量一致性檢查」分別呼叫 GCP
  Primary / Secondary / 111 三主機,計算兩兩 cosine 距離。
  max_diff > 1e-4 視為不一致(模型版本漂移)→ logger.error。
- _cosine_distance() — 純 Python,不依賴 numpy
- fail-safe:< 2 主機可達也回 ok=True(戰時部分主機暫斷不算錯)

run_scheduler.py 新增:
- run_embed_consistency_check task wrapper
- schedule.every().sunday.at("04:30").do(...) — 每週一次足夠
  (不需每次啟動驗證,過頻會打三主機 Ollama 浪費)

落地 ADR-033 護欄 #3 完整版:
  簽名鎖定(migration 026 embedding_signature 欄位)✅ 既有
  程式端簽名計算(rag_service.get_embedding_signature)✅ 既有
  RAG 查詢時簽名比對過濾(rag_service._select_hits)✅ 既有
  跨主機一致性驗證 cron ✅ 新增 ⭐
  既有 14k+ 筆回填 ⏳ 待手動跑 enqueue_missing_insight_embeddings()

regression: 47 unit tests 全綠

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 run_scheduler.py        |  28 ++++++++++
 services/rag_service.py | 119 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+)

diff --git a/run_scheduler.py b/run_scheduler.py
index 9e96ee2..7f5c273 100644
--- a/run_scheduler.py
+++ b/run_scheduler.py
@@ -116,6 +116,11 @@ def _register_schedules():
     schedule.every(4).hours.do(run_expire_stale_reviews)
     logger.info("📅 每 4 小時:expire_stale_reviews(24h 無回應降權 0.5)")
 
+    # Phase 11.0 護欄 #3:BGE-M3 跨主機一致性驗證(ADR-033)
+    # 每週一次足夠(驗證模型版本未漂移;不需每次啟動)
+    schedule.every().sunday.at("04:30").do(run_embed_consistency_check)
+    logger.info("📅 每週日 04:30:bge-m3 跨主機一致性驗證")
+
     schedule.every().day.at("03:00").do(run_db_backup_task)
     logger.info("📅 每日 03:00:db_backup")
 
@@ -225,6 +230,29 @@ def run_expire_stale_reviews():
         logger.error(f"[ExpireStale] task failed: {e}", exc_info=True)
 
 
+def run_embed_consistency_check():
+    """每週日 04:30 — BGE-M3 跨主機一致性驗證(ADR-033 護欄 #3)。
+
+    跑 verify_embedding_consistency,不一致時 logger.error;ok 時 logger.info。
+    每週一次足夠(驗證模型版本未漂移;過頻會打三主機 Ollama 浪費)。
+    """
+    try:
+        from services.rag_service import verify_embedding_consistency
+        result = verify_embedding_consistency()
+        logger.info(
+            "[EmbedConsistency] ok=%s reachable=%s max_diff=%.2e signature=%s",
+            result['ok'], result['reachable'],
+            result['max_diff'], result['signature'],
+        )
+        if not result['ok']:
+            logger.error(
+                "[EmbedConsistency] ⚠️ INCONSISTENT — RAG 召回率將下降;"
+                "檢查三主機 bge-m3 模型版本是否同步(ollama list)"
+            )
+    except Exception as e:
+        logger.error(f"[EmbedConsistency] task failed: {e}", exc_info=True)
+
+
 def run_cleanup_agent_context():
     """每日 03:30 — 清理 agent_context 表中已過期的 TTL 記錄(migration 018 定義)"""
     from database.manager import get_session
diff --git a/services/rag_service.py b/services/rag_service.py
index 282f34b..f8558c5 100644
--- a/services/rag_service.py
+++ b/services/rag_service.py
@@ -123,6 +123,124 @@ def get_embedding_signature(
     return hashlib.sha1(raw.encode('utf-8')).hexdigest()[:12]
 
 
+# ─────────────────────────────────────────────────────────────────────────────
+# Phase 11.0 護欄 #3:BGE-M3 跨主機一致性啟動驗證(ADR-033)
+# ─────────────────────────────────────────────────────────────────────────────
+EMBED_CONSISTENCY_TEST_TEXT = "momo電商競品分析測試向量一致性檢查"
+EMBED_CONSISTENCY_MAX_DIFF = 1e-4  # cosine 距離上限(浮點誤差容忍)
+EMBED_CONSISTENCY_TIMEOUT_SEC = 10.0  # 各主機 embedding 探測 timeout
+
+
+def _cosine_distance(vec_a: List[float], vec_b: List[float]) -> float:
+    """純 Python cosine distance(不依賴 numpy 避免額外 import)"""
+    if not vec_a or not vec_b or len(vec_a) != len(vec_b):
+        return 1.0
+    dot = sum(a * b for a, b in zip(vec_a, vec_b))
+    norm_a = sum(a * a for a in vec_a) ** 0.5
+    norm_b = sum(b * b for b in vec_b) ** 0.5
+    if norm_a == 0 or norm_b == 0:
+        return 1.0
+    return max(0.0, 1.0 - dot / (norm_a * norm_b))
+
+
+def verify_embedding_consistency(
+    test_text: str = EMBED_CONSISTENCY_TEST_TEXT,
+    max_diff: float = EMBED_CONSISTENCY_MAX_DIFF,
+) -> Dict[str, Any]:
+    """跨三主機(GCP Primary / Secondary / 111)BGE-M3 embedding 一致性驗證。
+
+    Owen v5.0 護欄 #3(ADR-033)— RAG 啟動時驗證;不一致則 log warning。
+    fail-safe:任何主機失敗(連線、超時)都跳過,只比對能拿到的 embeddings。
+    最少 2 個主機可達才能比對;只有 1 個 → 回 ok=True + warning「無法比對」。
+
+    回傳:
+        {
+            'ok': bool,
+            'signature': str,
+            'reachable': [...],  # ['gcp_ollama', 'ollama_secondary', 'ollama_111']
+            'max_diff': float,  # 跨主機最大 cosine 距離
+            'errors': [...],
+        }
+    """
+    import time
+    from services.ollama_service import (
+        OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK,
+        ollama_service,
+    )
+
+    hosts = {
+        'gcp_ollama': OLLAMA_HOST_PRIMARY,
+        'ollama_secondary': OLLAMA_HOST_SECONDARY,
+        'ollama_111': OLLAMA_HOST_FALLBACK,
+    }
+
+    embeddings: Dict[str, List[float]] = {}
+    errors: List[str] = []
+
+    for label, host in hosts.items():
+        try:
+            t0 = time.monotonic()
+            vec = ollama_service.generate_embedding(
+                text=test_text,
+                model=RAG_EMBED_MODEL,
+                host=host,  # 顯式指定(避免 retry 鏈干擾驗證)
+                timeout=int(EMBED_CONSISTENCY_TIMEOUT_SEC),
+            )
+            elapsed = time.monotonic() - t0
+            if vec and len(vec) == RAG_EMBED_DIM:
+                embeddings[label] = vec
+                logger.info(f"[EmbedVerify] {label} ({host}) ok in {elapsed:.2f}s, dim={len(vec)}")
+            else:
+                errors.append(f"{label}: empty or wrong dim ({len(vec) if vec else 0})")
+                logger.warning(f"[EmbedVerify] {label} returned empty/wrong-dim vector")
+        except Exception as exc:
+            errors.append(f"{label}: {type(exc).__name__}: {str(exc)[:200]}")
+            logger.warning(f"[EmbedVerify] {label} failed: {exc}")
+
+    signature = get_embedding_signature()
+    reachable = list(embeddings.keys())
+
+    if len(embeddings) < 2:
+        msg = f"only {len(embeddings)} host reachable, cannot cross-verify"
+        logger.warning(f"[EmbedVerify] {msg}")
+        return {
+            'ok': True,  # fail-safe:1 主機可達不算錯(戰時可能 2 主機暫斷)
+            'signature': signature,
+            'reachable': reachable,
+            'max_diff': 0.0,
+            'errors': errors + [msg],
+        }
+
+    # 兩兩比對 cosine 距離
+    import itertools
+    max_diff_observed = 0.0
+    for label_a, label_b in itertools.combinations(embeddings, 2):
+        d = _cosine_distance(embeddings[label_a], embeddings[label_b])
+        max_diff_observed = max(max_diff_observed, d)
+        logger.debug(f"[EmbedVerify] {label_a} vs {label_b}: cosine_distance={d:.6f}")
+
+    consistent = max_diff_observed <= max_diff
+    if not consistent:
+        logger.error(
+            f"[EmbedVerify] ⚠️ INCONSISTENT! max cosine distance {max_diff_observed:.6f} > {max_diff} "
+            f"(signature={signature}, reachable={reachable}). "
+            f"模型版本可能漂移;RAG 召回率將下降。"
+        )
+    else:
+        logger.info(
+            f"[EmbedVerify] ✅ consistent across {len(reachable)} hosts "
+            f"(max_diff={max_diff_observed:.2e}, signature={signature})"
+        )
+
+    return {
+        'ok': consistent,
+        'signature': signature,
+        'reachable': reachable,
+        'max_diff': max_diff_observed,
+        'errors': errors,
+    }
+
+
 # ─────────────────────────────────────────────────────────────────────────────
 # 結果容器
 # ─────────────────────────────────────────────────────────────────────────────
@@ -528,5 +646,6 @@ __all__ = [
     'RAGResult',
     'rag_service',
     'get_embedding_signature',
+    'verify_embedding_consistency',
     'is_rag_enabled',
 ]