737 lines
28 KiB
Python
737 lines
28 KiB
Python
import json
|
||
import math
|
||
import threading
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
from sqlalchemy import text
|
||
from services.logger_manager import SystemLogger
|
||
from database.manager import get_session
|
||
from database.ai_models import AIInsight
|
||
from services.ollama_service import ollama_service
|
||
|
||
sys_log = SystemLogger("OCLearn").get_logger()

# =====================================================================
# ADR-007 Step 4 (2026-04-19): DB-persisted embedding retry queue
#
# Why: the former in-memory queue lost unprocessed items on restart,
# which violated ADR-007's "dual write must land" rule.
# Now: enqueueing inserts into the embedding_retry_queue table, and the
# Hermes worker polls that table every minute and processes it in batches.
# =====================================================================

EMBED_POLL_INTERVAL_SEC = 60      # seconds the worker sleeps between polls
EMBED_BATCH_SIZE = 10             # max queue rows claimed per poll
EMBED_MAX_ATTEMPTS = 5            # rows reaching this many attempts become 'failed'

# ADR-005 decay rate per day: ln(2) / 0.01 ≈ 69.3, i.e. a half-life of about 70 days.
DECAY_RATE = 0.01

# Tables the embedding worker may write vectors into. This whitelist also
# guards the table name that the worker interpolates into its UPDATE.
_EMBEDDING_TARGET_TABLES = set(("ai_insights", "learning_episodes"))

# Fixed UTC+8 offset used to produce the Taipei-local naive timestamps stored in the DB.
TAIPEI_TZ = timezone(offset=timedelta(seconds=8 * 60 * 60))
|
||
|
||
|
||
def _now_taipei_naive() -> datetime:
    """Return the current Taipei wall-clock time as a naive datetime.

    DB DateTime columns written by this module hold Taipei-local naive
    values; sticking to one convention avoids UTC/local mix-ups when stale
    timestamps are compared.
    """
    taipei_now = datetime.now(tz=TAIPEI_TZ)
    return taipei_now.replace(tzinfo=None)
|
||
|
||
|
||
def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
                       model: str = "bge-m3:latest") -> bool:
    """Insert a 'pending' embedding job into embedding_retry_queue (persistent).

    Args:
        target_table: table whose row will receive the vector; must be listed
            in _EMBEDDING_TARGET_TABLES.
        target_id: primary key of that row.
        text_content: text to embed.
        model: embedding model name recorded on the job.

    Returns:
        True once the job row is committed; False when the table is not
        whitelisted or the insert fails (for example because the queue table
        has not been created yet). Failures are logged, never raised.
    """
    if target_table not in _EMBEDDING_TARGET_TABLES:
        sys_log.warning(f"[OCLearn] enqueue embedding 拒絕未知 target_table={target_table}")
        return False

    insert_job = text("""
        INSERT INTO embedding_retry_queue
            (target_table, target_id, text_content, model, status, created_at)
        VALUES (:t, :i, :txt, :m, 'pending', :now)
    """)

    session = get_session()
    try:
        job_params = {
            "t": target_table,
            "i": target_id,
            "txt": text_content,
            "m": model,
            "now": _now_taipei_naive(),
        }
        session.execute(insert_job, job_params)
        session.commit()
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗 (可能表尚未建立): {e}")
        return False
    else:
        return True
    finally:
        session.close()
|
||
|
||
|
||
def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str,
                              period: str = None) -> bool:
    """Queue an embedding job for an ai_insights row inserted outside store_insight.

    Lets legacy raw-SQL inserts join the ADR-007 embedding queue. The text
    sent for embedding is "<insight_type> (<period>): <content>", with an
    empty period when none is given.

    Returns:
        False immediately when insight_id or content is falsy; otherwise the
        result of _enqueue_embedding.
    """
    has_payload = bool(insight_id) and bool(content)
    if has_payload:
        label = period or ''
        return _enqueue_embedding(
            "ai_insights",
            int(insight_id),
            f"{insight_type} ({label}): {content}",
        )
    return False
|
||
|
||
|
||
def enqueue_missing_insight_embeddings(limit: int = 200) -> dict:
    """Backfill: enqueue ai_insights rows that still have no embedding.

    Selects up to `limit` rows, newest first, that meet all of these:
      - embedding IS NULL;
      - status (NULL treated as 'approved') is neither 'archived' nor 'rejected';
      - no queue row for them is in 'pending', 'processing' or 'done'.
    A row whose only queue entries are 'failed' is therefore selected again.

    Returns:
        {"scanned", "enqueued", "status": "ok"} on success, or
        {"scanned": 0, "enqueued": 0, "status": "skipped", "error": ...} when
        the SELECT fails (e.g. schema or pgvector not ready).
    """
    session = get_session()
    try:
        candidates = session.execute(
            text("""
                SELECT id, insight_type, period, content
                FROM ai_insights i
                WHERE i.embedding IS NULL
                  AND COALESCE(i.status, 'approved') NOT IN ('archived', 'rejected')
                  AND NOT EXISTS (
                      SELECT 1
                      FROM embedding_retry_queue q
                      WHERE q.target_table = 'ai_insights'
                        AND q.target_id = i.id
                        AND q.status IN ('pending', 'processing', 'done')
                  )
                ORDER BY i.created_at DESC
                LIMIT :lim
            """),
            {"lim": limit},
        ).fetchall()
    except Exception as exc:
        sys_log.warning(f"[OCLearn] embedding backfill 略過 (schema/pgvector 未就緒?): {exc}")
        return {"scanned": 0, "enqueued": 0, "status": "skipped", "error": str(exc)[:200]}
    finally:
        session.close()

    # Each enqueue opens its own session, so this runs after ours is closed.
    enqueued = sum(
        1
        for row in candidates
        if enqueue_insight_embedding(row.id, row.insight_type, row.content, row.period)
    )
    return {"scanned": len(candidates), "enqueued": enqueued, "status": "ok"}
|
||
|
||
|
||
def _process_one_embedding(row_id: int, target_table: str, target_id: int,
                           text_content: str, model: str) -> bool:
    """Embed one claimed queue row and write the vector back to its target table.

    Flow:
      1. Validate target_table against _EMBEDDING_TARGET_TABLES (it is
         interpolated into SQL).
      2. Mark the queue row 'processing' and refresh updated_at.
      3. Call ollama_service.generate_embedding; an empty result is an error.
      4. In one transaction, write embedding + embedding_signature to the
         target row and mark the queue row 'done'.

    On any exception the transaction is rolled back and the failure is
    recorded on the queue row:
      - attempts is incremented;
      - last_error stores the message, truncated to 500 chars;
      - the row goes back to 'pending', or to 'failed' once attempts reaches
        EMBED_MAX_ATTEMPTS.

    Returns:
        True on success, False on failure (errors are logged, never raised).
    """
    session = get_session()
    try:
        # Validate before any DB write or Ollama call. An unknown table can
        # never succeed, so an embedding request for it would be wasted. The
        # RuntimeError still goes through the attempts accounting below.
        if target_table not in _EMBEDDING_TARGET_TABLES:
            raise RuntimeError(f"不允許的 embedding target_table: {target_table}")

        session.execute(
            text("UPDATE embedding_retry_queue SET status='processing', updated_at=:now WHERE id=:id"),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        session.commit()

        vec = ollama_service.generate_embedding(text_content, model=model)
        if not vec:
            raise RuntimeError("embedding 回傳空值")

        from services.rag_service import get_embedding_signature

        embedding_signature = get_embedding_signature(model=model, dim=len(vec))
        # target_table was whitelisted above, so the f-string interpolation is safe.
        target_update = session.execute(
            text(f"""
                UPDATE {target_table}
                SET embedding = :vec,
                    embedding_signature = :sig
                WHERE id = :id
            """),
            # Assumes generate_embedding returns a Python list; str(list) gives
            # the "[x, y, ...]" text form used for the vector column.
            {"vec": str(vec), "sig": embedding_signature, "id": target_id},
        )
        if target_update.rowcount == 0:
            # Retrying cannot help when the target row is gone; make it visible
            # in the log but still close the job below.
            sys_log.warning(
                f"[OCLearn] embedding target not found {target_table}#{target_id}; marking job done"
            )

        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status='done', processed_at=:now, updated_at=:now
                WHERE id=:id
            """),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        session.commit()
        sys_log.debug(f"[OCLearn] embedding 寫入成功 {target_table}#{target_id}")
        return True

    except Exception as e:
        session.rollback()
        try:
            # In SET, `attempts` refers to the pre-update value, so
            # `attempts + 1` is the new count in both assignments.
            session.execute(
                text("""
                    UPDATE embedding_retry_queue
                    SET attempts = attempts + 1,
                        last_error = :err,
                        status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,
                        updated_at = :now
                    WHERE id = :id
                """),
                {
                    "err": str(e)[:500],
                    "max": EMBED_MAX_ATTEMPTS,
                    "now": _now_taipei_naive(),
                    "id": row_id,
                },
            )
            session.commit()
        except Exception as e2:
            session.rollback()
            sys_log.error(f"[OCLearn] 更新 retry attempts 失敗: {e2}")
        sys_log.warning(f"[OCLearn] embedding 失敗 (attempt 累加): {e}")
        return False
    finally:
        session.close()
|
||
|
||
|
||
def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE,
                              max_attempts: int = EMBED_MAX_ATTEMPTS,
                              stale_after_minutes: int = 15):
    """Atomically claim pending embedding jobs across multiple workers.

    1. Stale recovery (its own transaction). A row left in 'processing' whose
       updated_at is older than `stale_after_minutes` counts as an abandoned
       attempt: attempts is incremented, and the row returns to 'pending', or
       becomes 'failed' once attempts reaches `max_attempts`. The previous
       reset did not count an attempt, so a job that kept killing or hanging
       its worker was retried forever and its last_error grew on every reset.
    2. Claim. Up to `limit` 'pending' rows with attempts < max_attempts are
       switched to 'processing', oldest first. They are locked with FOR UPDATE
       SKIP LOCKED so concurrent workers never claim the same row.

    Args:
        limit: maximum number of rows to claim.
        max_attempts: attempt ceiling, shared with _process_one_embedding.
        stale_after_minutes: age after which a 'processing' row is reclaimed.

    Returns:
        Rows of (id, target_table, target_id, text_content, model).

    Raises:
        Any database error, after rolling back; the worker loop logs it.
    """
    now = _now_taipei_naive()
    session = get_session()
    try:
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET attempts = attempts + 1,
                    status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,
                    updated_at = :now,
                    last_error = COALESCE(last_error, '') || ' | reset stale processing'
                WHERE status = 'processing'
                  AND updated_at < :cutoff
            """),
            {
                "now": now,
                "max": max_attempts,
                "cutoff": now - timedelta(minutes=stale_after_minutes),
            },
        )
        session.commit()

        rows = session.execute(
            text("""
                WITH picked AS (
                    SELECT id
                    FROM embedding_retry_queue
                    WHERE status = 'pending'
                      AND attempts < :max
                    ORDER BY created_at
                    LIMIT :lim
                    FOR UPDATE SKIP LOCKED
                )
                UPDATE embedding_retry_queue q
                SET status = 'processing',
                    updated_at = :now
                FROM picked
                WHERE q.id = picked.id
                RETURNING q.id, q.target_table, q.target_id, q.text_content, q.model
            """),
            {"now": now, "max": max_attempts, "lim": limit},
        ).fetchall()
        session.commit()
        return rows
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
|
||
|
||
|
||
def _embedding_worker_loop():
    """Background thread body: poll embedding_retry_queue forever.

    Each cycle does the following:
      1. Claim a batch via _claim_pending_embeddings.
      2. Process the rows one at a time, using model "bge-m3:latest" when a
         row has none.
      3. Sleep EMBED_POLL_INTERVAL_SEC seconds.
    Any exception in a cycle is logged as a warning and the loop carries on.
    """
    sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
    while True:
        try:
            for job_id, table_name, target_id, body, model_name in _claim_pending_embeddings():
                _process_one_embedding(
                    job_id, table_name, target_id, body, model_name or "bge-m3:latest"
                )
        except Exception as e:
            sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {e}")

        time.sleep(EMBED_POLL_INTERVAL_SEC)
|
||
|
||
|
||
# Launch the background embedding worker once, when this module is imported.
# daemon=True lets the interpreter exit without waiting for the infinite loop.
threading.Thread(daemon=True, target=_embedding_worker_loop).start()
|
||
|
||
|
||
# =====================================================================
|
||
# ADR-007 核心:store_insight 雙寫(DB + 排程 embedding)
|
||
# =====================================================================
|
||
|
||
def store_insight(insight_type: str, content: str, period: str = None,
                  product_sku: str = None, metadata: dict = None,
                  ai_model: str = None, confidence: float = None,
                  created_by: str = None, status: str = None) -> int:
    """Store an AI output in ai_insights and queue it for embedding (ADR-007 dual write).

    - Cache-aside: when both insight_type and period are given, an existing row
      with the same insight_type + period + product_sku is overwritten. A NULL
      SKU matches only a NULL SKU.
    - Optional columns (ai_model / confidence / created_by / status) are
      applied best-effort via raw SQL, because they only exist after
      Migrations 010/015.
    - Embedding: a job is written to embedding_retry_queue. If that fails, the
      row's metadata_json gets "embedding_queue_error": true, and the keys
      already stored are kept.

    Returns:
        The ai_insights id, or None if the main insert/update failed (logged).
    """
    session = get_session()
    try:
        insight_id = _upsert_insight_row(
            session, insight_type, content, period, product_sku, metadata,
            confidence, created_by, status,
        )
        _apply_optional_insight_fields(
            session, insight_id, ai_model=ai_model, confidence=confidence,
            created_by=created_by, status=status,
        )

        # Persist the embedding job in the DB retry queue.
        embed_target_text = f"{insight_type} ({period or ''}): {content}"
        if not _enqueue_embedding("ai_insights", insight_id, embed_target_text):
            _flag_embedding_queue_error(session, insight_id, metadata)

        return insight_id

    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 錯誤: {e}")
        return None
    finally:
        session.close()


def _upsert_insight_row(session, insight_type, content, period, product_sku,
                        metadata, confidence, created_by, status) -> int:
    """Insert or cache-aside-update one ai_insights row, commit, and return its id."""
    meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None

    existing = None
    if period and insight_type:
        # product_sku is always part of the key: filter_by(product_sku=None)
        # renders "product_sku IS NULL". Previously a SKU-less call matched,
        # and overwrote, whichever SKU's row shared the type/period.
        existing = (
            session.query(AIInsight)
            .filter_by(insight_type=insight_type, period=period, product_sku=product_sku)
            .first()
        )

    if existing:
        existing.content = content
        if meta_str:
            existing.metadata_json = meta_str
        existing.updated_at = _now_taipei_naive()
        session.commit()
        sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
        return existing.id

    now = _now_taipei_naive()
    insight_kwargs = {
        "insight_type": insight_type,
        "period": period,
        "product_sku": product_sku,
        "content": content,
        "metadata_json": meta_str,
        "status": status or "approved",
        "created_at": now,
        "updated_at": now,
    }
    if confidence is not None:
        insight_kwargs["confidence"] = confidence
    if created_by:
        insight_kwargs["created_by"] = created_by

    new_insight = AIInsight(**insight_kwargs)
    session.add(new_insight)
    session.commit()
    sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")
    return new_insight.id


def _apply_optional_insight_fields(session, insight_id, ai_model=None, confidence=None,
                                   created_by=None, status=None) -> None:
    """Best-effort UPDATE of the Migration 010/015 columns; a no-op when none are set."""
    update_fields = {}
    if ai_model:
        update_fields["ai_model"] = ai_model
    if confidence is not None:
        update_fields["confidence"] = confidence
    if created_by:
        update_fields["created_by"] = created_by
    if status:
        update_fields["status"] = status
    if not update_fields:
        return

    # Column names come from the fixed keys above, never from caller input,
    # so interpolating them into the SQL is safe.
    assignments = ", ".join(f"{key} = :{key}" for key in update_fields)
    try:
        session.execute(
            text(f"UPDATE ai_insights SET {assignments}, updated_at = :now WHERE id = :i"),
            {**update_fields, "i": insight_id, "now": _now_taipei_naive()},
        )
        session.commit()
    except Exception as exc:
        # Deliberately best-effort (the columns may not exist yet), but logged
        # rather than silently swallowed.
        session.rollback()
        sys_log.debug(f"[OCLearn] store_insight optional columns skipped: {exc}")


def _flag_embedding_queue_error(session, insight_id, metadata=None) -> None:
    """Set "embedding_queue_error": true in metadata_json, keeping the keys already stored."""
    try:
        stored = session.execute(
            text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
            {"id": insight_id},
        ).scalar()

        # Merge into the stored metadata. The old code rebuilt it from the
        # `metadata` argument alone, which wiped existing metadata whenever
        # an update was called without metadata.
        payload = {}
        if isinstance(stored, dict):
            payload.update(stored)
        elif isinstance(stored, (str, bytes)) and stored:
            try:
                parsed = json.loads(stored)
            except ValueError:
                parsed = None
            if isinstance(parsed, dict):
                payload.update(parsed)
        if isinstance(metadata, dict):
            payload.update(metadata)
        payload["embedding_queue_error"] = True

        session.execute(
            text("""
                UPDATE ai_insights
                SET metadata_json = :meta,
                    updated_at = :now
                WHERE id = :id
            """),
            {
                "meta": json.dumps(payload, ensure_ascii=False),
                "now": _now_taipei_naive(),
                "id": insight_id,
            },
        )
        session.commit()
    except Exception as meta_err:
        session.rollback()
        sys_log.warning(f"[OCLearn] embedding queue error metadata 寫入失敗: {meta_err}")
|
||
|
||
|
||
# =====================================================================
|
||
# ADR-005: 時間衰減品質分數
|
||
# =====================================================================
|
||
|
||
def compute_effective_score(base_score: float, created_at: datetime,
                            decay_rate: float = DECAY_RATE,
                            now: datetime = None) -> float:
    """Return the ADR-005 time-decayed score: base × exp(-λ × days_since_created).

    Args:
        base_score: the undecayed quality score.
        created_at: creation time. A naive value is read as Taipei local time,
            the convention this module uses when writing DB timestamps (see
            _now_taipei_naive). It used to be read as UTC, which made every
            row look 8 hours younger and could shift `days` by one.
        decay_rate: λ per day (DECAY_RATE gives a ~70-day half-life).
        now: reference "current" time, mainly for tests. Defaults to the real
            current time; a naive value is also read as Taipei time.

    Returns:
        base_score unchanged when created_at is falsy; otherwise the decayed
        score. Age is in whole days (floored) and never negative, so future
        timestamps cannot inflate the score.
    """
    if not created_at:
        return base_score
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=TAIPEI_TZ)

    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=TAIPEI_TZ)

    days = (now - created_at).days
    return base_score * math.exp(-decay_rate * max(days, 0))
|
||
|
||
|
||
# =====================================================================
|
||
# RAG 檢索(套時間衰減)
|
||
# =====================================================================
|
||
|
||
def build_rag_context(query: str, insight_type: str = None, period: str = None,
                      top_k: int = 5) -> str:
    """Build RAG context text from ai_insights.

    1. Semantic retrieval via _build_semantic_rag_context (pgvector). Its
       result is returned as-is when it is non-empty.
    2. Otherwise a filtered fallback. Rows matching insight_type / period are
       fetched newest first (20 rows without a period, 200 with one). They are
       re-ranked by time-decayed quality (ADR-005, added 2026-04-19) and cut
       to top_k.

    The fallback skips 'archived' and 'rejected' rows (NULL status is kept),
    which matches the backfill filter in enqueue_missing_insight_embeddings.

    Returns:
        Context blocks separated by "---" lines, or "" when nothing matches or
        an error occurs (errors are logged).
    """
    from sqlalchemy import or_

    session = get_session()
    try:
        semantic_context = _build_semantic_rag_context(session, query, insight_type, period, top_k)
        if semantic_context:
            return semantic_context

        q = session.query(AIInsight).filter(
            or_(AIInsight.status.is_(None), ~AIInsight.status.in_(("archived", "rejected")))
        )
        if insight_type:
            q = q.filter_by(insight_type=insight_type)
        if period:
            q = q.filter_by(period=period)
        # Order before limiting so the period case is deterministic instead of
        # returning an arbitrary 200 rows.
        q = q.order_by(AIInsight.created_at.desc()).limit(200 if period else 20)

        results = q.all()
        if not results:
            return ""

        # Time-decay re-ranking; a missing avg_quality defaults to 0.5.
        ranked = []
        for r in results:
            quality = getattr(r, "avg_quality", None)
            # Only a missing score falls back to 0.5. A real 0.0 stays 0.0 (the
            # old `or 0.5` promoted it), matching the semantic path.
            base = quality if quality is not None else 0.5
            exempt = getattr(r, "decay_exempt", False)
            effective = base if exempt else compute_effective_score(base, r.created_at)
            ranked.append((effective, r))
        ranked.sort(key=lambda x: x[0], reverse=True)

        top = ranked[:top_k]
        context_parts = []
        for score, r in top:
            p_tag = f"[{r.period}]" if r.period else "[歷史紀錄]"
            context_parts.append(
                f"{p_tag} {r.insight_type} (分數={score:.2f}):\n{r.content}"
            )

        sys_log.info(f"[OCLearn] RAG context: {len(top)}/{len(results)} 筆(時間衰減重排)")
        return "\n\n---\n\n".join(context_parts)

    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context 錯誤: {e}")
        return ""
    finally:
        session.close()
|
||
|
||
|
||
def _build_semantic_rag_context(session, query: str, insight_type: str = None,
                                period: str = None, top_k: int = 5) -> str:
    """Semantic (pgvector) retrieval over ai_insights, used by build_rag_context.

    Embeds `query` with ollama_service.generate_embedding, then returns the
    top_k rows ordered by pgvector's `<=>` distance (ties broken by newest).
    Rows must have an embedding and a status of approved / active / executed,
    plus the optional insight_type / period filters. Each block shows the
    distance and the time-decayed quality (avg_quality defaults to 0.5).

    Returns:
        Context text, or "" when query is empty, the embedding is empty,
        nothing matches, or an error occurs. Errors are logged at debug level
        so the caller can fall back.
    """
    if not query:
        return ""
    try:
        vec = ollama_service.generate_embedding(query)
        if not vec:
            return ""

        filters = ["embedding IS NOT NULL", "status IN ('approved', 'active', 'executed')"]
        params = {"qvec": str(vec), "lim": top_k}
        if insight_type:
            filters.append("insight_type = :insight_type")
            params["insight_type"] = insight_type
        if period:
            filters.append("period = :period")
            params["period"] = period

        # Only fixed filter fragments are interpolated; values go through binds.
        rows = session.execute(
            text(f"""
                SELECT id, insight_type, period, content, avg_quality, decay_exempt, created_at,
                       embedding <=> CAST(:qvec AS vector) AS distance
                FROM ai_insights
                WHERE {' AND '.join(filters)}
                ORDER BY distance ASC, created_at DESC
                LIMIT :lim
            """),
            params,
        ).fetchall()
        if not rows:
            return ""

        parts = []
        for row in rows:
            base = row.avg_quality if row.avg_quality is not None else 0.5
            effective = base if row.decay_exempt else compute_effective_score(base, row.created_at)
            p_tag = f"[{row.period}]" if row.period else "[語意記憶]"
            parts.append(
                f"{p_tag} {row.insight_type} (語意距離={row.distance:.3f}, 分數={effective:.2f}):\n{row.content}"
            )
        sys_log.info(f"[OCLearn] semantic RAG context: {len(parts)} 筆")
        return "\n\n---\n\n".join(parts)

    except Exception as exc:
        # On PostgreSQL a failed statement (e.g. pgvector missing) aborts the
        # whole transaction. The caller reuses this session for its fallback
        # query, which would otherwise fail with "current transaction is
        # aborted", so roll back before returning.
        try:
            session.rollback()
        except Exception as rollback_exc:
            sys_log.debug(f"[OCLearn] semantic RAG rollback failed: {rollback_exc}")
        sys_log.debug(f"[OCLearn] semantic RAG fallback to decay ranking: {exc}")
        return ""
|
||
|
||
|
||
def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """Weekly-report RAG: ai_insights created between two dates, inclusive.

    Args:
        start_date: first day, "YYYY-MM-DD".
        end_date: last day, "YYYY-MM-DD"; the whole day is included.

    Returns:
        Up to 200 rows, oldest first, as context blocks separated by "---"
        lines. Each block is tagged with its period, or with its creation time
        when there is no period. Returns "" when nothing matches or an error
        occurs (including malformed dates); errors are logged.
    """
    session = get_session()
    try:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        # Exclusive upper bound at the next midnight. The old "<= 23:59:59"
        # dropped rows stamped within the final second, because created_at
        # carries microseconds (see _now_taipei_naive).
        end_exclusive = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)

        results = (
            session.query(AIInsight)
            .filter(AIInsight.created_at >= start_dt)
            .filter(AIInsight.created_at < end_exclusive)
            .order_by(AIInsight.created_at.asc())
            .limit(200)
            .all()
        )
        if not results:
            return ""

        parts = []
        for r in results:
            dt_str = r.created_at.strftime("%Y-%m-%d %H:%M") if r.created_at else ""
            p_tag = f"[{r.period if r.period else dt_str}]"
            parts.append(f"{p_tag} 洞察類型({r.insight_type}):\n{r.content}")

        sys_log.info(
            f"[OCLearn] 週報 RAG: {len(results)} 筆({start_date}~{end_date})"
        )
        return "\n\n---\n\n".join(parts)

    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 錯誤: {e}")
        return ""
    finally:
        session.close()
|
||
|
||
|
||
# =====================================================================
|
||
# 其他 API
|
||
# =====================================================================
|
||
|
||
def store_conversation(
    user_id: int,
    chat_id: int,
    user_message: str,
    bot_response: str,
    source: str = "",
    used_sources: list = None,
) -> bool:
    """Persist a user/bot exchange as a "chat_history" insight via store_insight.

    ADR-007 persistence rule: the DB + KM dual write must land.

    Args:
        user_id: user ID (the OpenClaw bot currently passes 0 until real user
            IDs are wired in).
        chat_id: Telegram chat_id (currently also 0).
        user_message: the user's message.
        bot_response: the bot's reply.
        source: reply-origin tag, e.g. 'ollama' / 'direct' / 'tool1,tool2'.
        used_sources: tools/sources that were triggered, e.g.
            ['query_sales', 'get_market_intel'].

    user_id, chat_id, source and used_sources are all stored in metadata for
    later analysis (which source/tool was used, which chat triggered it).

    Returns:
        True only when store_insight actually stored the row. store_insight
        catches its own errors and returns None; that None used to be
        reported here as success.
    """
    used_sources = used_sources or []
    try:
        insight_id = store_insight(
            insight_type="chat_history",
            content=f"User: {user_message}\nBot: {bot_response}",
            metadata={
                "user_id": user_id,
                "chat_id": chat_id,
                "source": source,
                "used_sources": used_sources,
            },
        )
    except Exception as e:
        sys_log.error(f"[OCLearn] store_conversation 錯誤: {e}")
        return False

    if insight_id is None:
        sys_log.error("[OCLearn] store_conversation 錯誤: store_insight returned None")
        return False
    return True
|
||
|
||
|
||
def update_feedback(insight_id: int, rating: int, comment: str = None) -> bool:
    """Record user feedback on an insight and recompute its avg_quality.

    rating >= 4 increments feedback_up; anything lower increments
    feedback_down. Then avg_quality = up / (up + down), or 0.5 when both are
    zero. Needs the columns added by Migration 010. `comment` is accepted but
    not stored.

    Returns:
        True when the feedback was committed. False when no ai_insights row has
        this id, or when the update failed (e.g. migration not applied).
    """
    session = get_session()
    try:
        # col is one of two fixed names, so the f-string is not injectable.
        col = "feedback_up" if rating >= 4 else "feedback_down"
        bumped = session.execute(
            text(f"UPDATE ai_insights SET {col} = COALESCE({col},0) + 1 WHERE id = :id"),
            {"id": insight_id},
        )
        if bumped.rowcount == 0:
            # Previously this path still returned True for a nonexistent id.
            session.rollback()
            sys_log.warning(f"[OCLearn] update_feedback: insight_id={insight_id} not found")
            return False

        # Recompute avg_quality = up / (up + down) inside the same transaction,
        # so it sees the increment above.
        session.execute(
            text("""
                UPDATE ai_insights
                SET avg_quality = CASE
                    WHEN COALESCE(feedback_up,0) + COALESCE(feedback_down,0) = 0 THEN 0.5
                    ELSE COALESCE(feedback_up,0)::float
                         / (COALESCE(feedback_up,0) + COALESCE(feedback_down,0))
                END
                WHERE id = :id
            """),
            {"id": insight_id},
        )
        session.commit()
        sys_log.info(f"[OCLearn] Feedback {insight_id}: rating={rating}")
        return True
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] update_feedback 略過 (Migration 010 未執行?): {e}")
        return False
    finally:
        session.close()
|
||
|
||
|
||
def get_learning_stats() -> dict:
    """Summarise the learning system.

    Returns:
        {"total_insights", "embedding_queue_pending", "status": "active"}.
        embedding_queue_pending is None when the queue table cannot be
        queried. If counting ai_insights fails, returns
        {"total_insights": 0, "status": "error", "error": <message>}.
    """
    session = get_session()
    try:
        insight_total = session.query(AIInsight).count()

        try:
            pending_jobs = session.execute(
                text("SELECT COUNT(*) FROM embedding_retry_queue WHERE status='pending'")
            ).scalar()
        except Exception:
            # The queue table may not exist yet: report "unknown" instead of failing.
            pending_jobs = None

        return dict(
            total_insights=insight_total,
            embedding_queue_pending=pending_jobs,
            status="active",
        )
    except Exception as e:
        return dict(total_insights=0, status="error", error=str(e))
    finally:
        session.close()
|
||
|
||
|
||
# =====================================================================
|
||
# ADR-005 Step-2: 每日批次維護(03:00 去重 / 04:00 品質分數重算)
|
||
# =====================================================================
|
||
|
||
def run_dedup_batch() -> dict:
    """03:00 batch: archive duplicate insights.

    Rows are duplicates when all of these hold:
      - they share insight_type + product_sku + period (NULL periods compare
        equal);
      - product_sku is non-NULL;
      - status is not 'archived' / 'relearn'. Because of NOT IN, NULL-status
        rows are not considered either.
    Each group keeps the row with the highest avg_quality (NULLs last, ties
    broken by the highest id). The other rows get status='archived'.

    Returns:
        {"archived": int, "scanned": int}. On error both are 0, "error" holds
        the message, and the transaction is rolled back.
    """
    session = get_session()
    archived = 0
    scanned = 0
    try:
        # One Taipei-naive timestamp for the whole batch, following the
        # module's convention for DB DateTime columns (see _now_taipei_naive)
        # rather than the DB server's CURRENT_TIMESTAMP.
        now = _now_taipei_naive()

        dupes = session.execute(text("""
            SELECT insight_type, product_sku, period, COUNT(*) AS cnt
            FROM ai_insights
            WHERE status NOT IN ('archived', 'relearn')
              AND product_sku IS NOT NULL
            GROUP BY insight_type, product_sku, period
            HAVING COUNT(*) > 1
        """)).fetchall()

        for group_type, group_sku, group_period, cnt in dupes:
            scanned += cnt
            group = {"t": group_type, "sku": group_sku, "p": group_period}

            keep = session.execute(text("""
                SELECT id FROM ai_insights
                WHERE insight_type = :t AND product_sku = :sku
                  AND period IS NOT DISTINCT FROM :p
                  AND status NOT IN ('archived', 'relearn')
                ORDER BY avg_quality DESC NULLS LAST, id DESC
                LIMIT 1
            """), group).scalar()
            if keep is None:
                continue

            result = session.execute(text("""
                UPDATE ai_insights
                SET status = 'archived', updated_at = :now
                WHERE insight_type = :t AND product_sku = :sku
                  AND period IS NOT DISTINCT FROM :p
                  AND status NOT IN ('archived', 'relearn')
                  AND id != :keep_id
            """), {**group, "keep_id": keep, "now": now})
            archived += result.rowcount

        session.commit()
        sys_log.info(f"[OCLearn] 去重批次完成:掃描 {scanned} 筆,歸檔 {archived} 筆")
        return {"archived": archived, "scanned": scanned}

    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 去重批次失敗: {e}")
        return {"archived": 0, "scanned": 0, "error": str(e)}
    finally:
        session.close()
|
||
|
||
|
||
def run_quality_rescore_batch() -> dict:
    """04:00 batch: apply ADR-005 time decay to approved/relearn ai_insights.

    Target: Effective_Score = base × exp(-λ × days). avg_quality is
    overwritten in place on every run, so each run must apply only the decay
    for the days since the row was last stamped, not since created_at. The
    previous formula multiplied an already-decayed score by
    exp(-λ × full_age) every day. That compounds to exp(-λ × n(n+1)/2) and
    archives a 0.5-score row after about three weeks instead of honoring the
    ~70-day half-life.

    - Elapsed days = Taipei today − COALESCE(updated_at, created_at)::date.
      This run stamps updated_at, so a second run on the same day is a no-op.
      Other writers that touch updated_at (e.g. a cache-aside overwrite in
      store_insight) restart the interval from that moment.
    - decay_exempt = TRUE rows are skipped (permanent knowledge).
    - 'relearn' rows are also multiplied by 0.8 on every run.
      NOTE(review): whether the penalty should be per-run or one-off is not
      specified; confirm.
    - Rows whose score falls below 0.05 are archived.

    Returns:
        {"updated": int, "relearn_reset": int, "archived": int}, where
        relearn_reset counts archived rows that were in 'relearn'. On error
        all three are 0, "error" holds the message, and the transaction is
        rolled back.
    """
    session = get_session()
    updated = 0
    relearn_reset = 0
    archived = 0
    try:
        now = _now_taipei_naive()
        today = now.date()

        # One set-based UPDATE (no N+1). The ::numeric cast makes ROUND(x, 4)
        # valid even when the expression is double precision, since
        # PostgreSQL only provides round(numeric, integer).
        result = session.execute(text("""
            UPDATE ai_insights
            SET avg_quality = ROUND(
                    (
                        avg_quality
                        * EXP(-:rate * (CAST(:today AS date) - COALESCE(updated_at, created_at)::date))
                        * CASE WHEN status = 'relearn' THEN 0.8 ELSE 1.0 END
                    )::numeric,
                    4
                ),
                updated_at = :now
            WHERE status IN ('approved', 'relearn')
              AND decay_exempt = FALSE
              AND COALESCE(updated_at, created_at)::date < CAST(:today AS date)
        """), {"rate": DECAY_RATE, "today": today, "now": now})
        updated = result.rowcount

        # Count relearn rows about to be archived *before* the archive UPDATE
        # erases their status. The old "archived within the last 5 seconds"
        # query also counted approved rows and unrelated writers.
        relearn_reset = session.execute(text("""
            SELECT COUNT(*) FROM ai_insights
            WHERE status = 'relearn'
              AND decay_exempt = FALSE
              AND avg_quality < 0.05
        """)).scalar() or 0

        # Auto-archive low scores (< 0.05).
        archive_result = session.execute(text("""
            UPDATE ai_insights
            SET status = 'archived', updated_at = :now
            WHERE status IN ('approved', 'relearn')
              AND decay_exempt = FALSE
              AND avg_quality < 0.05
        """), {"now": now})
        archived = archive_result.rowcount

        session.commit()
        sys_log.info(
            f"[OCLearn] 品質分數重算完成:更新 {updated} 筆,"
            f"低分自動歸檔 {archived} 筆(含 relearn {relearn_reset} 筆)"
        )
        return {"updated": updated, "relearn_reset": relearn_reset, "archived": archived}

    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}")
        return {"updated": 0, "relearn_reset": 0, "archived": 0, "error": str(e)}
    finally:
        session.close()
|