import json
import math
import threading
import time
from datetime import datetime, timedelta, timezone

from sqlalchemy import text

from services.logger_manager import SystemLogger
from database.manager import get_session
from database.ai_models import AIInsight
from services.ollama_service import (
    embedding_gcp_circuit_remaining_seconds,
    is_embedding_gcp_circuit_open,
    ollama_service,
)

sys_log = SystemLogger("OCLearn").get_logger()

# =====================================================================
# ADR-007 Step 4 (2026-04-19): DB-persisted embedding retry queue
#
# Why: the former in-memory queue lost unprocessed items on restart, which
# violated ADR-007's "dual write must land" rule.
# Design: enqueueing INSERTs a row into embedding_retry_queue; the background
# Hermes worker polls the table every EMBED_POLL_INTERVAL_SEC seconds and
# processes up to EMBED_BATCH_SIZE pending rows per poll.
# =====================================================================
EMBED_POLL_INTERVAL_SEC = 60  # worker polling interval (seconds)
EMBED_BATCH_SIZE = 10  # max rows claimed per poll
EMBED_MAX_ATTEMPTS = 5  # a row reaching this many attempts is marked 'failed'
EMBED_STALE_PROCESSING_MINUTES = 15  # 'processing' rows untouched this long go back to 'pending'
DECAY_RATE = 0.01  # ADR-005 decay constant λ; half-life = ln(2)/λ ≈ 69 days
_EMBEDDING_TARGET_TABLES = {"ai_insights", "learning_episodes"}
_DEFAULT_EMBED_MODEL = "bge-m3:latest"
TAIPEI_TZ = timezone(timedelta(hours=8))


def _now_taipei_naive() -> datetime:
    """Return the current Taipei (UTC+8) wall-clock time as a naive datetime.

    DB DateTime columns written by this module hold naive Taipei time; routing every
    timestamp through this helper keeps stale-row comparisons from mixing UTC and
    local time.
    """
    return datetime.now(TAIPEI_TZ).replace(tzinfo=None)


def _insight_embed_text(insight_type: str, period: str, content: str) -> str:
    """Build the text that gets embedded for an ai_insights row."""
    return f"{insight_type} ({period or ''}): {content}"


def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
                       model: str = _DEFAULT_EMBED_MODEL) -> bool:
    """Persist a pending embedding job in embedding_retry_queue.

    Args:
        target_table: Table whose ``embedding`` column will be filled; must be in
            _EMBEDDING_TARGET_TABLES.
        target_id: Primary key of the target row.
        text_content: Text to embed.
        model: Embedding model name passed to the embedding service.

    Returns:
        True once the job row is committed; False if the table name is not
        whitelisted or the INSERT fails (e.g. the queue table does not exist yet).
    """
    if target_table not in _EMBEDDING_TARGET_TABLES:
        sys_log.warning(f"[OCLearn] enqueue embedding 拒絕未知 target_table={target_table}")
        return False
    session = get_session()
    try:
        session.execute(
            text("""
                INSERT INTO embedding_retry_queue
                    (target_table, target_id, text_content, model, status, created_at)
                VALUES (:t, :i, :txt, :m, 'pending', :now)
            """),
            {
                "t": target_table,
                "i": target_id,
                "txt": text_content,
                "m": model,
                "now": _now_taipei_naive(),
            },
        )
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗 (可能表尚未建立): {e}")
        return False
    finally:
        session.close()


def _defer_embedding_row(row_id: int, reason: str) -> bool:
    """Return an already-claimed job to 'pending' without consuming an attempt.

    Args:
        row_id: embedding_retry_queue primary key.
        reason: Stored in last_error (truncated to 500 chars).

    Returns:
        True if the status update committed, False otherwise (logged).
    """
    session = get_session()
    try:
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status='pending', last_error=:err, updated_at=:now
                WHERE id=:id
            """),
            {
                "err": reason[:500],
                "now": _now_taipei_naive(),
                "id": row_id,
            },
        )
        session.commit()
        return True
    except Exception as exc:
        session.rollback()
        sys_log.warning(f"[OCLearn] embedding 延後處理狀態更新失敗: {exc}")
        return False
    finally:
        session.close()


def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str, period: str = None) -> bool:
    """Public bridge so legacy raw ai_insights inserts can join the ADR-007 embedding queue.

    Returns False without enqueuing when ``insight_id`` or ``content`` is falsy.
    """
    if not insight_id or not content:
        return False
    return _enqueue_embedding("ai_insights", int(insight_id), _insight_embed_text(insight_type, period, content))


def enqueue_missing_insight_embeddings(limit: int = 200) -> dict:
    """Backfill ai_insights rows that never entered the embedding queue.

    Selects up to ``limit`` rows (newest first) with no embedding, a status other than
    archived/rejected (NULL counts as approved), and no pending/processing/done queue
    job, then enqueues each one.

    Returns:
        {"scanned", "enqueued", "status": "ok"}; or status "skipped" plus a truncated
        "error" when the lookup query fails (e.g. schema/pgvector not ready).
    """
    session = get_session()
    try:
        rows = session.execute(
            text("""
                SELECT id, insight_type, period, content
                FROM ai_insights i
                WHERE i.embedding IS NULL
                  AND COALESCE(i.status, 'approved') NOT IN ('archived', 'rejected')
                  AND NOT EXISTS (
                      SELECT 1
                      FROM embedding_retry_queue q
                      WHERE q.target_table = 'ai_insights'
                        AND q.target_id = i.id
                        AND q.status IN ('pending', 'processing', 'done')
                  )
                ORDER BY i.created_at DESC
                LIMIT :lim
            """),
            {"lim": limit},
        ).fetchall()
    except Exception as exc:
        sys_log.warning(f"[OCLearn] embedding backfill 略過 (schema/pgvector 未就緒?): {exc}")
        return {"scanned": 0, "enqueued": 0, "status": "skipped", "error": str(exc)[:200]}
    finally:
        session.close()

    enqueued = sum(
        1 for row in rows
        if enqueue_insight_embedding(row.id, row.insight_type, row.content, row.period)
    )
    return {"scanned": len(rows), "enqueued": enqueued, "status": "ok"}


def _process_one_embedding(row_id: int, target_table: str, target_id: int, text_content: str, model: str) -> bool:
    """Embed one claimed queue job and write the vector back to its target row.

    Success: the target row's embedding/embedding_signature are set and the job is
    marked 'done'. If the embedding service returns nothing while the GCP circuit
    breaker is open, the job is deferred back to 'pending' without using an attempt.
    Any other failure increments ``attempts`` and sets the job to 'failed' once
    EMBED_MAX_ATTEMPTS is reached, otherwise back to 'pending'.

    Returns:
        True if the embedding was written; False otherwise. Never raises.
    """
    session = get_session()
    try:
        session.execute(
            text("UPDATE embedding_retry_queue SET status='processing', updated_at=:now WHERE id=:id"),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        session.commit()

        # Validate before the external call; this whitelist also guards the
        # table-name interpolation in the UPDATE below.
        if target_table not in _EMBEDDING_TARGET_TABLES:
            raise RuntimeError(f"不允許的 embedding target_table: {target_table}")

        vec = ollama_service.generate_embedding(
            text_content,
            model=model,
            allow_111_fallback=False,
        )
        if not vec:
            if is_embedding_gcp_circuit_open():
                remaining = embedding_gcp_circuit_remaining_seconds()
                reason = f"GCP embedding 熔斷中,延後處理,不扣 attempts(約 {remaining:.0f}s 後重試)"
                _defer_embedding_row(row_id, reason)
                sys_log.info(f"[OCLearn] embedding 延後處理 row_id={row_id}: {reason}")
                return False
            raise RuntimeError("embedding 回傳空值")

        from services.rag_service import get_embedding_signature

        embedding_signature = get_embedding_signature(model=model, dim=len(vec))
        session.execute(
            text(f"""
                UPDATE {target_table}
                SET embedding = :vec, embedding_signature = :sig
                WHERE id = :id
            """),
            {"vec": str(vec), "sig": embedding_signature, "id": target_id},
        )
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status='done', processed_at=:now, updated_at=:now
                WHERE id=:id
            """),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        session.commit()
        sys_log.debug(f"[OCLearn] embedding 寫入成功 {target_table}#{target_id}")
        return True
    except Exception as e:
        session.rollback()
        try:
            session.execute(
                text("""
                    UPDATE embedding_retry_queue
                    SET attempts = attempts + 1,
                        last_error = :err,
                        status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,
                        updated_at = :now
                    WHERE id = :id
                """),
                {
                    "err": str(e)[:500],
                    "max": EMBED_MAX_ATTEMPTS,
                    "now": _now_taipei_naive(),
                    "id": row_id,
                },
            )
            session.commit()
        except Exception as e2:
            session.rollback()
            sys_log.error(f"[OCLearn] 更新 retry attempts 失敗: {e2}")
        sys_log.warning(f"[OCLearn] embedding 失敗 (attempt 累加): {e}")
        return False
    finally:
        session.close()


def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE, max_attempts: int = EMBED_MAX_ATTEMPTS):
    """Atomically claim pending embedding jobs, safe across multiple workers.

    First resets 'processing' jobs not updated for EMBED_STALE_PROCESSING_MINUTES
    (e.g. left behind by a crashed worker) back to 'pending'. Then marks up to
    ``limit`` pending jobs with attempts < ``max_attempts`` as 'processing', oldest
    first, using FOR UPDATE SKIP LOCKED so concurrent workers never claim the same
    row.

    Returns:
        Rows of (id, target_table, target_id, text_content, model); an empty list
        while the GCP embedding circuit breaker is open. DB errors are re-raised
        after rollback.
    """
    if is_embedding_gcp_circuit_open():
        remaining = embedding_gcp_circuit_remaining_seconds()
        sys_log.info(f"[OCLearn] embedding GCP 熔斷中,暫停 claim pending(約 {remaining:.0f}s)")
        return []

    session = get_session()
    try:
        now = _now_taipei_naive()
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status = 'pending',
                    updated_at = :now,
                    last_error = COALESCE(last_error, '') || ' | reset stale processing'
                WHERE status = 'processing'
                  AND updated_at < :cutoff
            """),
            {"now": now, "cutoff": now - timedelta(minutes=EMBED_STALE_PROCESSING_MINUTES)},
        )
        session.commit()

        rows = session.execute(
            text("""
                WITH picked AS (
                    SELECT id
                    FROM embedding_retry_queue
                    WHERE status = 'pending' AND attempts < :max
                    ORDER BY created_at
                    FOR UPDATE SKIP LOCKED
                    LIMIT :lim
                )
                UPDATE embedding_retry_queue q
                SET status = 'processing', updated_at = :now
                FROM picked
                WHERE q.id = picked.id
                RETURNING q.id, q.target_table, q.target_id, q.text_content, q.model
            """),
            {"now": _now_taipei_naive(), "max": max_attempts, "lim": limit},
        ).fetchall()
        session.commit()
        return rows
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def _embedding_worker_loop():
    """Background loop: claim and process embedding jobs every EMBED_POLL_INTERVAL_SEC.

    If the GCP circuit breaker opens mid-batch, the rest of the claimed batch is
    deferred back to 'pending' (no attempt consumed) and the batch stops. Poll
    errors (e.g. queue table missing) are logged and the loop keeps running.
    """
    sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
    while True:
        try:
            rows = _claim_pending_embeddings()
            for idx, row in enumerate(rows):
                _process_one_embedding(row[0], row[1], row[2], row[3], row[4] or _DEFAULT_EMBED_MODEL)
                if not is_embedding_gcp_circuit_open():
                    continue
                remaining_rows = rows[idx + 1:]
                remaining = embedding_gcp_circuit_remaining_seconds()
                reason = f"GCP embedding 熔斷中,同批任務退回 pending,不扣 attempts(約 {remaining:.0f}s 後重試)"
                for pending_row in remaining_rows:
                    _defer_embedding_row(pending_row[0], reason)
                if remaining_rows:
                    sys_log.info(
                        f"[OCLearn] embedding 同批任務已延後 {len(remaining_rows)} 筆: {reason}"
                    )
                break
        except Exception as e:
            sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {e}")
        time.sleep(EMBED_POLL_INTERVAL_SEC)


# Start the background worker at import time. It is a daemon thread, so it does
# not block interpreter shutdown.
threading.Thread(
    target=_embedding_worker_loop,
    daemon=True,
    name="oclearn-embedding-worker",
).start()


# =====================================================================
# ADR-007 core: store_insight dual write (DB row + scheduled embedding)
# =====================================================================
def _load_metadata_dict(raw) -> dict:
    """Parse a stored metadata_json value into a dict; anything unusable becomes {}."""
    if isinstance(raw, dict):
        return dict(raw)
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _mark_embedding_queue_error(session, insight_id: int, metadata: dict = None) -> None:
    """Best-effort: set ``embedding_queue_error: True`` in an insight's metadata_json.

    If the caller passed a metadata dict, the flag is added to a copy of it.
    Otherwise the currently stored metadata_json is extended, so an existing row's
    metadata is not wiped. Failures are rolled back and logged.
    """
    try:
        if isinstance(metadata, dict):
            payload = dict(metadata)
        else:
            stored = session.execute(
                text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
                {"id": insight_id},
            ).scalar()
            payload = _load_metadata_dict(stored)
        payload["embedding_queue_error"] = True
        session.execute(
            text("""
                UPDATE ai_insights
                SET metadata_json = :meta, updated_at = :now
                WHERE id = :id
            """),
            {
                "meta": json.dumps(payload, ensure_ascii=False),
                "now": _now_taipei_naive(),
                "id": insight_id,
            },
        )
        session.commit()
    except Exception as meta_err:
        session.rollback()
        sys_log.warning(f"[OCLearn] embedding queue error metadata 寫入失敗: {meta_err}")


def store_insight(insight_type: str, content: str, period: str = None,
                  product_sku: str = None, metadata: dict = None,
                  ai_model: str = None, confidence: float = None,
                  created_by: str = None, status: str = None) -> int:
    """Store an AI output in ai_insights and schedule its embedding (ADR-007 dual write).

    Cache-aside: when both ``insight_type`` and ``period`` are given and a row with
    the same (insight_type, period, product_sku) exists, that row's content (and
    metadata, if given) is overwritten instead of inserting a new row.
    ``product_sku=None`` only matches rows whose product_sku IS NULL.

    The embedding is not computed here. A job is written to embedding_retry_queue for
    the background worker; if that enqueue fails, ``embedding_queue_error`` is
    recorded in the row's metadata_json.

    Args:
        insight_type: Insight category (e.g. "chat_history").
        content: Insight text.
        period: Optional period label; needed for the cache-aside lookup.
        product_sku: Optional SKU the insight is scoped to.
        metadata: Optional dict stored as JSON in metadata_json.
        ai_model, confidence, created_by, status: Optional columns added by
            migrations 010/015; written best-effort. New rows default to status
            "approved".

    Returns:
        The insight id, or None if storing failed (the error is logged, not raised).
    """
    session = get_session()
    try:
        meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None

        existing = None
        if period and insight_type:
            # filter_by(product_sku=None) renders "product_sku IS NULL", so a SKU-less
            # insight no longer overwrites a SKU-specific one. Newest row wins if
            # legacy duplicates exist.
            existing = (
                session.query(AIInsight)
                .filter_by(insight_type=insight_type, period=period, product_sku=product_sku)
                .order_by(AIInsight.id.desc())
                .first()
            )

        if existing:
            existing.content = content
            if meta_str:
                existing.metadata_json = meta_str
            existing.updated_at = _now_taipei_naive()
            session.commit()
            insight_id = existing.id
            sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
        else:
            now = _now_taipei_naive()
            insight_kwargs = {
                "insight_type": insight_type,
                "period": period,
                "product_sku": product_sku,
                "content": content,
                "metadata_json": meta_str,
                "status": status or "approved",
                "created_at": now,
                "updated_at": now,
            }
            if confidence is not None:
                insight_kwargs["confidence"] = confidence
            if created_by:
                insight_kwargs["created_by"] = created_by
            new_insight = AIInsight(**insight_kwargs)
            session.add(new_insight)
            session.commit()
            insight_id = new_insight.id
            sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")

        # Columns added by migrations 010/015: try to write them, tolerate absence.
        update_fields = {}
        if ai_model:
            update_fields["ai_model"] = ai_model
        if confidence is not None:
            update_fields["confidence"] = confidence
        if created_by:
            update_fields["created_by"] = created_by
        if status:
            update_fields["status"] = status
        if update_fields:
            # Column names come from the fixed key set above, so interpolation is safe.
            assignments = ", ".join(f"{key} = :{key}" for key in update_fields)
            try:
                session.execute(
                    text(f"UPDATE ai_insights SET {assignments}, updated_at = :now WHERE id = :i"),
                    {**update_fields, "i": insight_id, "now": _now_taipei_naive()},
                )
                session.commit()
            except Exception as extra_err:
                session.rollback()
                sys_log.debug(f"[OCLearn] store_insight optional columns skipped: {extra_err}")

        # Durable enqueue into the DB retry queue.
        embedding_queued = _enqueue_embedding(
            "ai_insights", insight_id, _insight_embed_text(insight_type, period, content)
        )
        if not embedding_queued:
            _mark_embedding_queue_error(session, insight_id, metadata)
        return insight_id
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 錯誤: {e}")
        return None
    finally:
        session.close()


# =====================================================================
# ADR-005: time-decayed quality score
# =====================================================================
def compute_effective_score(base_score: float, created_at: datetime, decay_rate: float = DECAY_RATE) -> float:
    """Return ``base_score × exp(-decay_rate × whole_days_since_created)``.

    Naive ``created_at`` values are interpreted as Taipei local time, because that
    is how this module writes DB timestamps (see _now_taipei_naive). Aware values
    are used as-is. A missing ``created_at`` returns ``base_score`` unchanged, and
    future timestamps count as 0 days.
    """
    if not created_at:
        return base_score
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=TAIPEI_TZ)
    days = (datetime.now(timezone.utc) - created_at).days
    return base_score * math.exp(-decay_rate * max(days, 0))


# =====================================================================
# RAG retrieval (with time-decay re-ranking)
# =====================================================================
# Statuses hidden from the non-semantic RAG paths. This matches the backfill's
# COALESCE(status, 'approved') NOT IN ('archived', 'rejected') exclusion, so rows
# retired by the dedup / rescore batches stop resurfacing.
_INACTIVE_INSIGHT_STATUSES = ("archived", "rejected")


def _active_insights_query(session):
    """Return an AIInsight query excluding archived/rejected rows (NULL status kept)."""
    from sqlalchemy import or_

    return session.query(AIInsight).filter(
        or_(
            AIInsight.status.is_(None),
            ~AIInsight.status.in_(_INACTIVE_INSIGHT_STATUSES),
        )
    )


def build_rag_context(query: str, insight_type: str = None, period: str = None, top_k: int = 5) -> str:
    """Build an LLM context string from stored insights.

    Semantic (pgvector) retrieval is tried first. If it yields nothing, the function
    falls back to exact insight_type/period filtering (newest 20 rows without a
    period, up to 200 with one), excluding archived/rejected rows. That candidate set
    is re-ranked by ADR-005 time decay (2026-04-19).

    Returns:
        Up to ``top_k`` formatted insights separated by ``---`` dividers, or "" when
        nothing matches or an error occurs (errors are logged).
    """
    session = get_session()
    try:
        semantic_context = _build_semantic_rag_context(session, query, insight_type, period, top_k)
        if semantic_context:
            return semantic_context

        q = _active_insights_query(session)
        if insight_type:
            q = q.filter_by(insight_type=insight_type)
        if period:
            q = q.filter_by(period=period).limit(200)
        else:
            q = q.order_by(AIInsight.created_at.desc()).limit(20)
        results = q.all()
        if not results:
            return ""

        # Re-rank with time decay. Rows without a quality score default to base 0.5;
        # a genuine 0.0 score is kept rather than promoted.
        ranked = []
        for r in results:
            base = getattr(r, "avg_quality", None)
            if base is None:
                base = 0.5
            exempt = getattr(r, "decay_exempt", False)
            effective = base if exempt else compute_effective_score(base, r.created_at)
            ranked.append((effective, r))
        ranked.sort(key=lambda pair: pair[0], reverse=True)
        top = ranked[:top_k]

        context_parts = []
        for score, r in top:
            p_tag = f"[{r.period}]" if r.period else "[歷史紀錄]"
            context_parts.append(
                f"{p_tag} {r.insight_type} (分數={score:.2f}):\n{r.content}"
            )
        sys_log.info(f"[OCLearn] RAG context: {len(top)}/{len(results)} 筆(時間衰減重排)")
        return "\n\n---\n\n".join(context_parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context 錯誤: {e}")
        return ""
    finally:
        session.close()


def _build_semantic_rag_context(session, query: str, insight_type: str = None,
                                period: str = None, top_k: int = 5) -> str:
    """Semantic retrieval: nearest ai_insights rows to ``query`` by pgvector distance.

    Only rows with an embedding and status approved/active/executed are considered,
    optionally filtered by insight_type/period. Any failure returns "". The session
    is rolled back first so the caller's fallback query can still use it; on
    PostgreSQL a failed statement otherwise leaves the transaction aborted.
    """
    if not query:
        return ""
    try:
        vec = ollama_service.generate_embedding(query, allow_111_fallback=False)
        if not vec:
            return ""

        filters = ["embedding IS NOT NULL", "status IN ('approved', 'active', 'executed')"]
        params = {"qvec": str(vec), "lim": top_k}
        if insight_type:
            filters.append("insight_type = :insight_type")
            params["insight_type"] = insight_type
        if period:
            filters.append("period = :period")
            params["period"] = period

        # The filter strings are fixed literals above; values are bound parameters.
        rows = session.execute(
            text(f"""
                SELECT id, insight_type, period, content, avg_quality, decay_exempt, created_at,
                       embedding <=> CAST(:qvec AS vector) AS distance
                FROM ai_insights
                WHERE {' AND '.join(filters)}
                ORDER BY distance ASC, created_at DESC
                LIMIT :lim
            """),
            params,
        ).fetchall()
        if not rows:
            return ""

        parts = []
        for row in rows:
            base = row.avg_quality if row.avg_quality is not None else 0.5
            effective = base if row.decay_exempt else compute_effective_score(base, row.created_at)
            p_tag = f"[{row.period}]" if row.period else "[語意記憶]"
            parts.append(
                f"{p_tag} {row.insight_type} (語意距離={row.distance:.3f}, 分數={effective:.2f}):\n{row.content}"
            )
        sys_log.info(f"[OCLearn] semantic RAG context: {len(parts)} 筆")
        return "\n\n---\n\n".join(parts)
    except Exception as exc:
        session.rollback()
        sys_log.debug(f"[OCLearn] semantic RAG fallback to decay ranking: {exc}")
        return ""


def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """Weekly-report RAG: insights created from ``start_date`` through ``end_date``.

    Args:
        start_date: First day, "YYYY-MM-DD".
        end_date: Last day (inclusive), "YYYY-MM-DD".

    Returns:
        Up to 200 archived/rejected-free insights in chronological order, separated
        by ``---`` dividers; "" when none match or on error (logged, including
        malformed dates).
    """
    session = get_session()
    try:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        # Half-open upper bound covers the whole end day, including timestamps with
        # fractional seconds after 23:59:59.
        end_exclusive = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
        results = (
            _active_insights_query(session)
            .filter(AIInsight.created_at >= start_dt)
            .filter(AIInsight.created_at < end_exclusive)
            .order_by(AIInsight.created_at.asc())
            .limit(200)
            .all()
        )
        if not results:
            return ""

        parts = []
        for r in results:
            dt_str = r.created_at.strftime("%Y-%m-%d %H:%M") if r.created_at else ""
            p_tag = f"[{r.period if r.period else dt_str}]"
            parts.append(f"{p_tag} 洞察類型({r.insight_type}):\n{r.content}")

        sys_log.info(
            f"[OCLearn] 週報 RAG: {len(results)} 筆({start_date}~{end_date})"
        )
        return "\n\n---\n\n".join(parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 錯誤: {e}")
        return ""
    finally:
        session.close()


# =====================================================================
# Other APIs
# =====================================================================
def store_conversation(
    user_id: int,
    chat_id: int,
    user_message: str,
    bot_response: str,
    source: str = "",
    used_sources: list = None,
) -> bool:
    """Persist a user/bot exchange as a "chat_history" insight via store_insight (ADR-007 dual write).

    Args:
        user_id: User id (the OpenClaw bot currently passes 0 until real ids exist).
        chat_id: Telegram chat_id (likewise currently 0).
        user_message: The user's message.
        bot_response: The bot's reply.
        source: Reply origin tag, e.g. 'ollama' / 'direct' / 'tool1,tool2'.
        used_sources: Tools/sources triggered, e.g. ['query_sales', 'get_market_intel'].

    user_id, chat_id, source and used_sources are stored in metadata for later
    analysis.

    Returns:
        True only if store_insight returned an id. It returns None instead of raising
        on failure, so failures now report False.
    """
    used_sources = used_sources or []
    try:
        insight_id = store_insight(
            insight_type="chat_history",
            content=f"User: {user_message}\nBot: {bot_response}",
            metadata={
                "user_id": user_id,
                "chat_id": chat_id,
                "source": source,
                "used_sources": used_sources,
            },
        )
        return insight_id is not None
    except Exception as e:
        sys_log.error(f"[OCLearn] store_conversation 錯誤: {e}")
        return False


def update_feedback(insight_id: int, rating: int, comment: str = None) -> bool:
    """Record user feedback and recompute avg_quality (requires Migration 010 columns).

    A rating >= 4 increments feedback_up, anything lower increments feedback_down.
    avg_quality is then recomputed as up / (up + down), or 0.5 when both are 0.
    ``comment`` is currently not stored.

    Returns:
        True on commit; False on any error (logged, rolled back).
    """
    session = get_session()
    try:
        col = "feedback_up" if rating >= 4 else "feedback_down"
        # ``col`` is one of two literals above, so interpolating it is safe.
        session.execute(
            text(f"UPDATE ai_insights SET {col} = COALESCE({col},0) + 1 WHERE id = :id"),
            {"id": insight_id},
        )
        session.execute(
            text("""
                UPDATE ai_insights
                SET avg_quality = CASE
                    WHEN COALESCE(feedback_up,0) + COALESCE(feedback_down,0) = 0 THEN 0.5
                    ELSE COALESCE(feedback_up,0)::float
                         / (COALESCE(feedback_up,0) + COALESCE(feedback_down,0))
                END
                WHERE id = :id
            """),
            {"id": insight_id},
        )
        session.commit()
        sys_log.info(f"[OCLearn] Feedback {insight_id}: rating={rating}")
        return True
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] update_feedback 略過 (Migration 010 未執行?): {e}")
        return False
    finally:
        session.close()


def get_learning_stats() -> dict:
    """Summary metrics for the learning system.

    Returns:
        {"total_insights", "embedding_queue_pending", "status": "active"}, where
        embedding_queue_pending is None if the queue count fails. On error:
        {"total_insights": 0, "status": "error", "error": str}.
    """
    session = get_session()
    try:
        count = session.query(AIInsight).count()
        try:
            pending = session.execute(
                text("SELECT COUNT(*) FROM embedding_retry_queue WHERE status='pending'")
            ).scalar()
        except Exception:
            pending = None
        return {
            "total_insights": count,
            "embedding_queue_pending": pending,
            "status": "active",
        }
    except Exception as e:
        return {"total_insights": 0, "status": "error", "error": str(e)}
    finally:
        session.close()


# =====================================================================
# ADR-005 Step-2: nightly maintenance batches (03:00 dedup / 04:00 quality rescore)
# =====================================================================
def run_dedup_batch() -> dict:
    """03:00 batch: archive duplicate insights.

    Duplicates share (insight_type, product_sku, period) among rows that have a
    product_sku and a status other than archived/relearn. In each group, the row
    with the highest avg_quality is kept (NULLs last; ties broken by highest id) and
    the rest are set to status 'archived'.

    Returns:
        {"archived": int, "scanned": int}. On failure both counts are 0 and an
        "error" key is added.
    """
    session = get_session()
    archived = 0
    scanned = 0
    try:
        dupes = session.execute(text("""
            SELECT insight_type, product_sku, period, COUNT(*) AS cnt
            FROM ai_insights
            WHERE status NOT IN ('archived', 'relearn')
              AND product_sku IS NOT NULL
            GROUP BY insight_type, product_sku, period
            HAVING COUNT(*) > 1
        """)).fetchall()

        for row in dupes:
            scanned += row[3]
            group = {"t": row[0], "sku": row[1], "p": row[2]}
            keep = session.execute(text("""
                SELECT id FROM ai_insights
                WHERE insight_type = :t
                  AND product_sku = :sku
                  AND period IS NOT DISTINCT FROM :p
                  AND status NOT IN ('archived', 'relearn')
                ORDER BY avg_quality DESC NULLS LAST, id DESC
                LIMIT 1
            """), group).scalar()
            if keep is None:
                continue
            result = session.execute(text("""
                UPDATE ai_insights
                SET status = 'archived', updated_at = CURRENT_TIMESTAMP
                WHERE insight_type = :t
                  AND product_sku = :sku
                  AND period IS NOT DISTINCT FROM :p
                  AND status NOT IN ('archived', 'relearn')
                  AND id != :keep_id
            """), {**group, "keep_id": keep})
            archived += result.rowcount

        session.commit()
        sys_log.info(f"[OCLearn] 去重批次完成:掃描 {scanned} 筆,歸檔 {archived} 筆")
        return {"archived": archived, "scanned": scanned}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 去重批次失敗: {e}")
        return {"archived": 0, "scanned": 0, "error": str(e)}
    finally:
        session.close()


def run_quality_rescore_batch() -> dict:
    """04:00 batch: apply ADR-005 time decay to approved/relearn ai_insights.

    Decay is incremental. Each eligible row's avg_quality is multiplied by
    exp(-DECAY_RATE × whole days since the row was last updated), falling back to
    created_at. Repeated runs therefore accumulate to base × exp(-λ × age) instead of
    re-applying the full age to an already-decayed score every night. This assumes
    nothing else bumps updated_at; note that update_feedback resets avg_quality
    without touching it.

    Rows already updated today are skipped, so a same-day rerun is a no-op.
    decay_exempt rows (permanent knowledge) are skipped. relearn rows take an extra
    ×0.8 per processed run. Afterwards, approved/relearn rows with
    avg_quality < 0.05 are archived.

    Returns:
        {"updated": int, "relearn_reset": int, "archived": int}, where relearn_reset
        is how many of the archived rows were in 'relearn' status. On failure the
        counts are 0 and an "error" key is added.
    """
    session = get_session()
    try:
        # Single set-based UPDATE (no N+1). The ::numeric cast is needed because
        # PostgreSQL only provides ROUND(numeric, int).
        result = session.execute(text("""
            UPDATE ai_insights
            SET avg_quality = ROUND(
                    (
                        avg_quality
                        * EXP(-:rate * (CURRENT_DATE - COALESCE(updated_at, created_at)::date))
                        * CASE WHEN status = 'relearn' THEN 0.8 ELSE 1 END
                    )::numeric,
                    4
                ),
                updated_at = CURRENT_TIMESTAMP
            WHERE status IN ('approved', 'relearn')
              AND decay_exempt = FALSE
              AND created_at < CURRENT_DATE
              AND COALESCE(updated_at, created_at)::date < CURRENT_DATE
        """), {"rate": DECAY_RATE})
        updated = result.rowcount

        # Count relearn rows about to be archived before their status is overwritten.
        relearn_reset = session.execute(text("""
            SELECT COUNT(*) FROM ai_insights
            WHERE status = 'relearn'
              AND decay_exempt = FALSE
              AND avg_quality < 0.05
        """)).scalar() or 0

        # Auto-archive low scores (< 0.05).
        archive_result = session.execute(text("""
            UPDATE ai_insights
            SET status = 'archived', updated_at = CURRENT_TIMESTAMP
            WHERE status IN ('approved', 'relearn')
              AND decay_exempt = FALSE
              AND avg_quality < 0.05
        """))
        archived = archive_result.rowcount

        session.commit()
        sys_log.info(
            f"[OCLearn] 品質分數重算完成:更新 {updated} 筆,"
            f"低分自動歸檔 {archived} 筆(含 relearn {relearn_reset} 筆)"
        )
        return {"updated": updated, "relearn_reset": relearn_reset, "archived": archived}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}")
        return {"updated": 0, "relearn_reset": 0, "archived": 0, "error": str(e)}
    finally:
        session.close()