Files
ewoooc/services/openclaw_learning_service.py
OoO 9bafa73ffc
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s
fix: defer embedding work during gcp backoff
2026-06-18 12:06:24 +08:00

795 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import math
import threading
import time
from datetime import datetime, timedelta, timezone
from sqlalchemy import text
from services.logger_manager import SystemLogger
from database.manager import get_session
from database.ai_models import AIInsight
from services.ollama_service import (
embedding_gcp_circuit_remaining_seconds,
is_embedding_gcp_circuit_open,
ollama_service,
)
# Module-wide logger, tagged "OCLearn".
sys_log = SystemLogger("OCLearn").get_logger()
# =====================================================================
# ADR-007 Step 4 (2026-04-19): DB-persisted embedding retry queue
#
# Why: the previous in-memory queue lost unprocessed items on restart,
# which violates ADR-007's "dual write must land" rule.
# New design: enqueue into the embedding_retry_queue table -> the Hermes
# worker polls it every minute and processes the rows in batches.
# =====================================================================
EMBED_POLL_INTERVAL_SEC = 60  # worker polling interval, in seconds
EMBED_BATCH_SIZE = 10  # maximum number of rows handled per poll
EMBED_MAX_ATTEMPTS = 5  # rows reaching this many attempts are marked failed
DECAY_RATE = 0.01  # ADR-005: half-life of roughly 70 days
# Tables that are accepted as embedding write-back targets.
_EMBEDDING_TARGET_TABLES = {"ai_insights", "learning_episodes"}
# Fixed UTC+8 offset used for all naive timestamps written by this module.
TAIPEI_TZ = timezone(timedelta(hours=8))
def _now_taipei_naive() -> datetime:
    """Return the current Taipei wall-clock time as a naive datetime.

    DB DateTime columns are stored uniformly as naive Taipei time so that
    staleness checks are not thrown off by a UTC offset.
    """
    aware_now = datetime.now(tz=TAIPEI_TZ)
    return aware_now.replace(tzinfo=None)
def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
                       model: str = "bge-m3:latest") -> bool:
    """Persist one pending embedding job in the DB retry queue.

    Args:
        target_table: Table whose row will receive the embedding; must be
            one of ``_EMBEDDING_TARGET_TABLES``.
        target_id: Primary key of the row in ``target_table``.
        text_content: Text to embed.
        model: Embedding model name stored with the job.

    Returns:
        True when the queue row was committed; False when the table name is
        rejected or the insert fails (the failure is logged, not raised).
    """
    if target_table not in _EMBEDDING_TARGET_TABLES:
        sys_log.warning(f"[OCLearn] enqueue embedding 拒絕未知 target_table={target_table}")
        return False

    insert_sql = text("""
        INSERT INTO embedding_retry_queue
        (target_table, target_id, text_content, model, status, created_at)
        VALUES (:t, :i, :txt, :m, 'pending', :now)
    """)
    session = get_session()
    try:
        bind_values = {
            "t": target_table,
            "i": target_id,
            "txt": text_content,
            "m": model,
            "now": _now_taipei_naive(),
        }
        session.execute(insert_sql, bind_values)
        session.commit()
    except Exception as exc:
        session.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗 (可能表尚未建立): {exc}")
        return False
    else:
        return True
    finally:
        session.close()
def _defer_embedding_row(row_id: int, reason: str) -> bool:
    """Put an already-claimed embedding job back to 'pending'.

    The ``attempts`` counter is left untouched; only ``status``,
    ``last_error`` (``reason`` truncated to 500 characters) and
    ``updated_at`` are written.

    Returns:
        True when the update was committed, False when it failed (logged).
    """
    defer_sql = text("""
        UPDATE embedding_retry_queue
        SET status='pending',
        last_error=:err,
        updated_at=:now
        WHERE id=:id
    """)
    session = get_session()
    try:
        bind_values = {
            "err": reason[:500],
            "now": _now_taipei_naive(),
            "id": row_id,
        }
        session.execute(defer_sql, bind_values)
        session.commit()
    except Exception as exc:
        session.rollback()
        sys_log.warning(f"[OCLearn] embedding 延後處理狀態更新失敗: {exc}")
        return False
    else:
        return True
    finally:
        session.close()
def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str,
                              period: str = None) -> bool:
    """Public bridge that lets legacy raw ai_insights inserts join the ADR-007 embedding queue.

    Returns False without enqueueing when ``insight_id`` or ``content`` is
    falsy; otherwise returns the result of ``_enqueue_embedding``.
    """
    if not (insight_id and content):
        return False
    period_label = period or ''
    return _enqueue_embedding(
        "ai_insights",
        int(insight_id),
        f"{insight_type} ({period_label}): {content}",
    )
def enqueue_missing_insight_embeddings(limit: int = 200) -> dict:
    """Backfill ai_insights rows that have not entered the embedding queue yet.

    Selects up to ``limit`` rows (newest first) that have no embedding, are
    not archived/rejected, and have no pending/processing/done queue entry,
    then enqueues each of them.

    Returns:
        ``{"scanned", "enqueued", "status": "ok"}`` on success, or a
        ``"skipped"`` payload carrying the truncated error when the
        selection query fails.
    """
    session = get_session()
    try:
        candidates = session.execute(
            text("""
                SELECT id, insight_type, period, content
                FROM ai_insights i
                WHERE i.embedding IS NULL
                AND COALESCE(i.status, 'approved') NOT IN ('archived', 'rejected')
                AND NOT EXISTS (
                SELECT 1
                FROM embedding_retry_queue q
                WHERE q.target_table = 'ai_insights'
                AND q.target_id = i.id
                AND q.status IN ('pending', 'processing', 'done')
                )
                ORDER BY i.created_at DESC
                LIMIT :lim
            """),
            {"lim": limit},
        ).fetchall()
    except Exception as exc:
        sys_log.warning(f"[OCLearn] embedding backfill 略過 (schema/pgvector 未就緒?): {exc}")
        return {"scanned": 0, "enqueued": 0, "status": "skipped", "error": str(exc)[:200]}
    finally:
        session.close()

    # Each enqueue opens its own session, so the query session is already closed.
    enqueued_count = sum(
        1
        for item in candidates
        if enqueue_insight_embedding(item.id, item.insight_type, item.content, item.period)
    )
    return {"scanned": len(candidates), "enqueued": enqueued_count, "status": "ok"}
def _process_one_embedding(row_id: int, target_table: str, target_id: int,
                           text_content: str, model: str) -> bool:
    """Process a single embedding job from the retry queue.

    On success the vector and its signature are written to the target row
    and the queue row is marked 'done'. On failure ``attempts`` is
    incremented and the row goes back to 'pending', or to 'failed' once
    ``EMBED_MAX_ATTEMPTS`` is reached. When the embedding call returns
    nothing while the GCP circuit is open, the row is deferred back to
    'pending' without consuming an attempt.

    Args:
        row_id: Primary key of the embedding_retry_queue row.
        target_table: Table receiving the embedding; must be in
            ``_EMBEDDING_TARGET_TABLES``.
        target_id: Primary key of the row in ``target_table``.
        text_content: Text to embed.
        model: Embedding model name.

    Returns:
        True when the embedding was stored, False otherwise.
    """
    session = get_session()
    try:
        # Validate the write-back target before doing any work: the table name
        # is interpolated into SQL below, and a row with an invalid target
        # should fail fast instead of costing an embedding request first.
        if target_table not in _EMBEDDING_TARGET_TABLES:
            raise RuntimeError(f"不允許的 embedding target_table: {target_table}")

        # Mark the row as in-flight (also refreshes updated_at).
        session.execute(
            text("UPDATE embedding_retry_queue SET status='processing', updated_at=:now WHERE id=:id"),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        session.commit()

        vec = ollama_service.generate_embedding(
            text_content,
            model=model,
            allow_111_fallback=False,
        )
        if not vec:
            if is_embedding_gcp_circuit_open():
                # Circuit-breaker back-off is not the job's fault: hand the
                # row back without counting an attempt.
                remaining = embedding_gcp_circuit_remaining_seconds()
                reason = f"GCP embedding 熔斷中,延後處理,不扣 attempts{remaining:.0f}s 後重試)"
                _defer_embedding_row(row_id, reason)
                sys_log.info(f"[OCLearn] embedding 延後處理 row_id={row_id}: {reason}")
                return False
            raise RuntimeError("embedding 回傳空值")

        # Imported at call time — presumably to avoid a circular import with
        # services.rag_service; confirm before moving it to module level.
        from services.rag_service import get_embedding_signature

        vec_str = str(vec)
        embedding_signature = get_embedding_signature(model=model, dim=len(vec))
        # target_table was checked against the allow-list above, so the
        # f-string interpolation cannot inject arbitrary SQL.
        session.execute(
            text(f"""
                UPDATE {target_table}
                SET embedding = :vec,
                embedding_signature = :sig
                WHERE id = :id
            """),
            {"vec": vec_str, "sig": embedding_signature, "id": target_id},
        )
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status='done', processed_at=:now, updated_at=:now
                WHERE id=:id
            """),
            {"now": _now_taipei_naive(), "id": row_id},
        )
        # Target write and queue bookkeeping commit together.
        session.commit()
        sys_log.debug(f"[OCLearn] embedding 寫入成功 {target_table}#{target_id}")
        return True
    except Exception as exc:
        session.rollback()
        try:
            session.execute(
                text("""
                    UPDATE embedding_retry_queue
                    SET attempts = attempts + 1,
                    last_error = :err,
                    status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,
                    updated_at = :now
                    WHERE id = :id
                """),
                {
                    "err": str(exc)[:500],
                    "max": EMBED_MAX_ATTEMPTS,
                    "now": _now_taipei_naive(),
                    "id": row_id,
                },
            )
            session.commit()
        except Exception as bookkeeping_exc:
            session.rollback()
            sys_log.error(f"[OCLearn] 更新 retry attempts 失敗: {bookkeeping_exc}")
        sys_log.warning(f"[OCLearn] embedding 失敗 (attempt 累加): {exc}")
        return False
    finally:
        session.close()
def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE,
                              max_attempts: int = EMBED_MAX_ATTEMPTS):
    """Atomically claim pending embedding jobs across multiple workers.

    Returns an empty list without touching the DB while the GCP embedding
    circuit is open. Otherwise it first resets 'processing' rows whose
    ``updated_at`` is older than 15 minutes back to 'pending', then flips up
    to ``limit`` pending rows with ``attempts < max_attempts`` (oldest
    first, ``FOR UPDATE SKIP LOCKED``) to 'processing' and returns them as
    ``(id, target_table, target_id, text_content, model)`` rows.

    Raises:
        Exception: any DB error is re-raised after the session is rolled back.
    """
    if is_embedding_gcp_circuit_open():
        wait_seconds = embedding_gcp_circuit_remaining_seconds()
        sys_log.info(f"[OCLearn] embedding GCP 熔斷中,暫停 claim pending{wait_seconds:.0f}s")
        return []

    reset_stale_sql = text("""
        UPDATE embedding_retry_queue
        SET status = 'pending',
        updated_at = :now,
        last_error = COALESCE(last_error, '') || ' | reset stale processing'
        WHERE status = 'processing'
        AND updated_at < :cutoff
    """)
    claim_sql = text("""
        WITH picked AS (
        SELECT id
        FROM embedding_retry_queue
        WHERE status = 'pending'
        AND attempts < :max
        ORDER BY created_at
        FOR UPDATE SKIP LOCKED
        LIMIT :lim
        )
        UPDATE embedding_retry_queue q
        SET status = 'processing',
        updated_at = :now
        FROM picked
        WHERE q.id = picked.id
        RETURNING q.id, q.target_table, q.target_id, q.text_content, q.model
    """)

    session = get_session()
    try:
        # Step 1: recover jobs left in 'processing' for more than 15 minutes.
        stale_params = {
            "now": _now_taipei_naive(),
            "cutoff": _now_taipei_naive() - timedelta(minutes=15),
        }
        session.execute(reset_stale_sql, stale_params)
        session.commit()

        # Step 2: claim the next batch in a single statement.
        claim_params = {
            "now": _now_taipei_naive(),
            "max": max_attempts,
            "lim": limit,
        }
        claimed = session.execute(claim_sql, claim_params).fetchall()
        session.commit()
        return claimed
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
def _embedding_worker_loop():
    """Background thread body: poll embedding_retry_queue and process pending jobs in batches.

    Runs forever. Each cycle claims a batch, processes the rows one by one,
    and sleeps ``EMBED_POLL_INTERVAL_SEC`` seconds. If the GCP embedding
    circuit is open after a row has been processed, the rest of the batch is
    handed back to 'pending' and the batch is abandoned.
    """
    sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
    while True:
        try:
            claimed = _claim_pending_embeddings()
            for position, job in enumerate(claimed):
                job_id = job[0]
                job_table = job[1]
                job_target_id = job[2]
                job_text = job[3]
                job_model = job[4] or "bge-m3:latest"
                _process_one_embedding(job_id, job_table, job_target_id, job_text, job_model)

                if not is_embedding_gcp_circuit_open():
                    continue

                # Circuit opened: return the untouched remainder of the batch.
                leftovers = claimed[position + 1:]
                wait_seconds = embedding_gcp_circuit_remaining_seconds()
                reason = f"GCP embedding 熔斷中,同批任務退回 pending不扣 attempts{wait_seconds:.0f}s 後重試)"
                for leftover in leftovers:
                    _defer_embedding_row(leftover[0], reason)
                if leftovers:
                    sys_log.info(
                        f"[OCLearn] embedding 同批任務已延後 {len(leftovers)} 筆: {reason}"
                    )
                break
        except Exception as exc:
            sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {exc}")
        time.sleep(EMBED_POLL_INTERVAL_SEC)
# Start the background worker as soon as this module is imported. It is a
# daemon thread, so it does not keep the Python process alive at shutdown.
threading.Thread(target=_embedding_worker_loop, daemon=True).start()
# =====================================================================
# ADR-007 core: store_insight dual write (DB row + scheduled embedding)
# =====================================================================
def store_insight(insight_type: str, content: str, period: str = None,
                  product_sku: str = None, metadata: dict = None,
                  ai_model: str = None, confidence: float = None,
                  created_by: str = None, status: str = None) -> int:
    """Store an AI-generated insight in ai_insights and schedule its embedding.

    - Cache-aside: when a row with the same insight_type + period
      (+ product_sku, if given) already exists, it is overwritten.
    - Vectorisation: a job is written to embedding_retry_queue for the
      background worker to process (persisted).

    Args:
        insight_type: Insight category.
        content: Insight text.
        period: Optional period key; the overwrite lookup only runs when
            both ``period`` and ``insight_type`` are truthy.
        product_sku: Optional SKU used to narrow the overwrite lookup.
        metadata: Optional dict serialised into ``metadata_json``.
        ai_model, confidence, created_by, status: Optional extra columns,
            written on a best-effort basis.

    Returns:
        The insight id, or None when the write fails (the error is logged).
    """
    session = get_session()
    try:
        meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None

        existing = None
        if period and insight_type:
            # NOTE(review): when product_sku is None the lookup is not
            # restricted to rows with a NULL product_sku, so it may match a
            # row that belongs to some SKU — confirm this is intended.
            q = session.query(AIInsight).filter_by(insight_type=insight_type, period=period)
            if product_sku:
                q = q.filter_by(product_sku=product_sku)
            existing = q.first()

        if existing:
            existing.content = content
            if meta_str:
                existing.metadata_json = meta_str
            existing.updated_at = _now_taipei_naive()
            # Remember what is stored so an error flag can be merged into it
            # later instead of replacing it.
            stored_meta = existing.metadata_json
            session.commit()
            insight_id = existing.id
            sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
        else:
            insight_kwargs = {
                "insight_type": insight_type,
                "period": period,
                "product_sku": product_sku,
                "content": content,
                "metadata_json": meta_str,
                "status": status or "approved",
                "created_at": _now_taipei_naive(),
                "updated_at": _now_taipei_naive(),
            }
            if confidence is not None:
                insight_kwargs["confidence"] = confidence
            if created_by:
                insight_kwargs["created_by"] = created_by
            new_insight = AIInsight(**insight_kwargs)
            session.add(new_insight)
            session.commit()
            insight_id = new_insight.id
            stored_meta = meta_str
            sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")

        # Write the advanced columns when present (available after
        # Migration 010/015; try first, tolerate failure).
        update_fields = {}
        if ai_model:
            update_fields["ai_model"] = ai_model
        if confidence is not None:
            update_fields["confidence"] = confidence
        if created_by:
            update_fields["created_by"] = created_by
        if status:
            update_fields["status"] = status
        if update_fields:
            try:
                # Column names come from the fixed keys above, never from callers.
                assignments = ", ".join(f"{key} = :{key}" for key in update_fields)
                params = dict(update_fields)
                params["i"] = insight_id
                session.execute(
                    text(f"UPDATE ai_insights SET {assignments}, updated_at = :now WHERE id = :i"),
                    {**params, "now": _now_taipei_naive()},
                )
                session.commit()
            except Exception as adv_err:
                # Still best effort, but no longer silent.
                session.rollback()
                sys_log.debug(f"[OCLearn] store_insight advanced-field update skipped: {adv_err}")

        # Push into the DB retry queue (persisted).
        embed_target_text = f"{insight_type} ({period or ''}): {content}"
        embedding_queued = _enqueue_embedding("ai_insights", insight_id, embed_target_text)
        if not embedding_queued:
            try:
                # Merge the error flag into the metadata that is actually
                # stored, so an update without new metadata does not wipe the
                # row's existing metadata_json.
                metadata_payload = {}
                if isinstance(stored_meta, dict):
                    metadata_payload = dict(stored_meta)
                elif stored_meta:
                    try:
                        parsed_meta = json.loads(stored_meta)
                    except (TypeError, ValueError):
                        parsed_meta = None
                    if isinstance(parsed_meta, dict):
                        metadata_payload = parsed_meta
                metadata_payload["embedding_queue_error"] = True
                session.execute(
                    text("""
                        UPDATE ai_insights
                        SET metadata_json = :meta,
                        updated_at = :now
                        WHERE id = :id
                    """),
                    {
                        "meta": json.dumps(metadata_payload, ensure_ascii=False),
                        "now": _now_taipei_naive(),
                        "id": insight_id,
                    },
                )
                session.commit()
            except Exception as meta_err:
                session.rollback()
                sys_log.warning(f"[OCLearn] embedding queue error metadata 寫入失敗: {meta_err}")
        return insight_id
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 錯誤: {e}")
        return None
    finally:
        session.close()
# =====================================================================
# ADR-005: time-decayed quality score
# =====================================================================
def compute_effective_score(base_score: float, created_at: datetime,
                            decay_rate: float = DECAY_RATE) -> float:
    """Return ``base_score * exp(-decay_rate * days_since_created)``.

    Args:
        base_score: Undecayed score.
        created_at: Creation timestamp. A naive value is interpreted as
            Taipei wall-clock time, which is how this module writes
            timestamps (see ``_now_taipei_naive``); an aware value is used
            as is. A falsy value disables decay.
        decay_rate: Exponential decay constant per whole day.

    Returns:
        The decayed score. Ages are counted in whole days and never go
        below zero, so timestamps in the future are not boosted.
    """
    if not created_at:
        return base_score
    if created_at.tzinfo is None:
        # Naive DB timestamps are Taipei time, not UTC; tagging them as UTC
        # would make every record look 8 hours younger than it is.
        created_at = created_at.replace(tzinfo=TAIPEI_TZ)
    age_days = (datetime.now(timezone.utc) - created_at).days
    return base_score * math.exp(-decay_rate * max(age_days, 0))
# =====================================================================
# RAG retrieval (with time decay applied)
# =====================================================================
def build_rag_context(query: str, insight_type: str = None, period: str = None,
                      top_k: int = 5) -> str:
    """Build the RAG context string for a query.

    Semantic retrieval is tried first; when it yields nothing, the function
    falls back to a filtered scan of ai_insights re-ranked by time-decayed
    quality (ADR-005, added 2026-04-19).

    Args:
        query: User query passed to the semantic retrieval step.
        insight_type: Optional exact-match filter.
        period: Optional exact-match filter. With a period at most 200 rows
            are scanned; without one only the 20 newest rows are.
        top_k: Maximum number of entries in the returned context.

    Returns:
        Context entries joined by ``---`` separators, or "" when nothing is
        found or an error occurs (the error is logged).
    """
    session = get_session()
    try:
        semantic_context = _build_semantic_rag_context(session, query, insight_type, period, top_k)
        if semantic_context:
            return semantic_context

        q = session.query(AIInsight)
        if insight_type:
            q = q.filter_by(insight_type=insight_type)
        if period:
            q = q.filter_by(period=period).limit(200)
        else:
            q = q.order_by(AIInsight.created_at.desc()).limit(20)
        results = q.all()
        if not results:
            return ""

        # Re-rank by time-decayed score; base defaults to 0.5 only when the
        # row has no avg_quality at all. A genuine score of 0.0 must stay
        # 0.0 rather than being promoted to the default.
        ranked = []
        for r in results:
            stored_quality = getattr(r, "avg_quality", None)
            base = stored_quality if stored_quality is not None else 0.5
            exempt = getattr(r, "decay_exempt", False)
            effective = base if exempt else compute_effective_score(base, r.created_at)
            ranked.append((effective, r))
        ranked.sort(key=lambda x: x[0], reverse=True)
        top = ranked[:top_k]

        context_parts = []
        for score, r in top:
            p_tag = f"[{r.period}]" if r.period else "[歷史紀錄]"
            context_parts.append(
                f"{p_tag} {r.insight_type} (分數={score:.2f}):\n{r.content}"
            )
        sys_log.info(f"[OCLearn] RAG context: {len(top)}/{len(results)} 筆(時間衰減重排)")
        return "\n\n---\n\n".join(context_parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context 錯誤: {e}")
        return ""
    finally:
        session.close()
def _build_semantic_rag_context(session, query: str, insight_type: str = None,
period: str = None, top_k: int = 5) -> str:
if not query:
return ""
try:
vec = ollama_service.generate_embedding(query, allow_111_fallback=False)
if not vec:
return ""
filters = ["embedding IS NOT NULL", "status IN ('approved', 'active', 'executed')"]
params = {"qvec": str(vec), "lim": top_k}
if insight_type:
filters.append("insight_type = :insight_type")
params["insight_type"] = insight_type
if period:
filters.append("period = :period")
params["period"] = period
rows = session.execute(
text(f"""
SELECT id, insight_type, period, content, avg_quality, decay_exempt, created_at,
embedding <=> CAST(:qvec AS vector) AS distance
FROM ai_insights
WHERE {' AND '.join(filters)}
ORDER BY distance ASC, created_at DESC
LIMIT :lim
"""),
params,
).fetchall()
if not rows:
return ""
parts = []
for row in rows:
base = row.avg_quality if row.avg_quality is not None else 0.5
effective = base if row.decay_exempt else compute_effective_score(base, row.created_at)
p_tag = f"[{row.period}]" if row.period else "[語意記憶]"
parts.append(
f"{p_tag} {row.insight_type} (語意距離={row.distance:.3f}, 分數={effective:.2f}):\n{row.content}"
)
sys_log.info(f"[OCLearn] semantic RAG context: {len(parts)}")
return "\n\n---\n\n".join(parts)
except Exception as exc:
sys_log.debug(f"[OCLearn] semantic RAG fallback to decay ranking: {exc}")
return ""
def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """Weekly-report RAG: collect ai_insights created inside a date range.

    Args:
        start_date: First day, ``YYYY-MM-DD`` (from 00:00:00).
        end_date: Last day, ``YYYY-MM-DD`` (through 23:59:59).

    Returns:
        Up to 200 insights, oldest first, joined by ``---`` separators; ""
        when none match or an error (including a malformed date) occurs.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    session = get_session()
    try:
        range_start = datetime.strptime(f"{start_date} 00:00:00", timestamp_format)
        range_end = datetime.strptime(f"{end_date} 23:59:59", timestamp_format)

        in_range = session.query(AIInsight).filter(AIInsight.created_at >= range_start)
        in_range = in_range.filter(AIInsight.created_at <= range_end)
        results = in_range.order_by(AIInsight.created_at.asc()).limit(200).all()
        if not results:
            return ""

        sections = []
        for insight in results:
            if insight.created_at:
                created_label = insight.created_at.strftime("%Y-%m-%d %H:%M")
            else:
                created_label = ""
            # Prefer the period as the tag; fall back to the creation time.
            tag = f"[{insight.period if insight.period else created_label}]"
            sections.append(f"{tag} 洞察類型({insight.insight_type}):\n{insight.content}")

        sys_log.info(
            f"[OCLearn] 週報 RAG: {len(results)} 筆({start_date}~{end_date}"
        )
        return "\n\n---\n\n".join(sections)
    except Exception as exc:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 錯誤: {exc}")
        return ""
    finally:
        session.close()
# =====================================================================
# Other APIs
# =====================================================================
def store_conversation(
    user_id: int,
    chat_id: int,
    user_message: str,
    bot_response: str,
    source: str = "",
    used_sources: list = None,
) -> bool:
    """Persist one user/bot exchange as a 'chat_history' insight (via store_insight).

    ADR-007 persistence rule: the DB + KM dual write must land.

    Args:
        user_id: User id (the OpenClaw bot currently passes 0 until real
            user ids are wired in).
        chat_id: Telegram chat_id (likewise currently 0).
        user_message: The user's message.
        bot_response: The bot's reply.
        source: Marker of where the reply came from, e.g. 'ollama' /
            'direct' / 'tool1,tool2'.
        used_sources: Tools/sources that were triggered, e.g.
            ['query_sales', 'get_market_intel'].

    Metadata policy: source / used_sources / chat_id are all written to the
    insight metadata for later analysis (which source/tool was used, which
    chat triggered it).

    Returns:
        True only when store_insight actually stored the row; False when it
        reported a failure or raised.
    """
    used_sources = used_sources or []
    try:
        insight_id = store_insight(
            insight_type="chat_history",
            content=f"User: {user_message}\nBot: {bot_response}",
            metadata={
                "user_id": user_id,
                "chat_id": chat_id,
                "source": source,
                "used_sources": used_sources,
            },
        )
        # store_insight swallows its own errors and signals failure with
        # None, so success must be derived from its return value.
        return insight_id is not None
    except Exception as e:
        sys_log.error(f"[OCLearn] store_conversation 錯誤: {e}")
        return False
def update_feedback(insight_id: int, rating: int, comment: str = None) -> bool:
    """Record user feedback: bump feedback_up/down and recompute avg_quality.

    A rating of 4 or more counts as an up-vote, anything else as a
    down-vote. ``comment`` is accepted but not stored. Takes effect once
    Migration 010 has been applied.

    Returns:
        True when both updates were committed, False on any error (logged).
    """
    session = get_session()
    try:
        if rating >= 4:
            counter_column = "feedback_up"
        else:
            counter_column = "feedback_down"
        lookup = {"id": insight_id}

        # The column name is one of the two literals above, never user input.
        bump_sql = text(
            f"UPDATE ai_insights SET {counter_column} = COALESCE({counter_column},0) + 1 WHERE id = :id"
        )
        session.execute(bump_sql, lookup)

        # Recompute avg_quality = up / (up + down).
        recompute_sql = text("""
            UPDATE ai_insights
            SET avg_quality = CASE
            WHEN COALESCE(feedback_up,0) + COALESCE(feedback_down,0) = 0 THEN 0.5
            ELSE COALESCE(feedback_up,0)::float
            / (COALESCE(feedback_up,0) + COALESCE(feedback_down,0))
            END
            WHERE id = :id
        """)
        session.execute(recompute_sql, {"id": insight_id})

        session.commit()
        sys_log.info(f"[OCLearn] Feedback {insight_id}: rating={rating}")
        return True
    except Exception as exc:
        session.rollback()
        sys_log.warning(f"[OCLearn] update_feedback 略過 (Migration 010 未執行?): {exc}")
        return False
    finally:
        session.close()
def get_learning_stats() -> dict:
    """Return headline metrics of the learning system.

    Returns:
        ``{"total_insights", "embedding_queue_pending", "status": "active"}``;
        ``embedding_queue_pending`` is None when the queue cannot be
        queried. If counting insights fails, an
        ``{"total_insights": 0, "status": "error", "error": ...}`` payload
        is returned instead.
    """
    session = get_session()
    try:
        total = session.query(AIInsight).count()

        pending_count = None
        try:
            pending_sql = text("SELECT COUNT(*) FROM embedding_retry_queue WHERE status='pending'")
            pending_count = session.execute(pending_sql).scalar()
        except Exception:
            # The queue table may be unavailable; report "unknown".
            pending_count = None

        stats = {
            "total_insights": total,
            "embedding_queue_pending": pending_count,
            "status": "active",
        }
        return stats
    except Exception as exc:
        return {"total_insights": 0, "status": "error", "error": str(exc)}
    finally:
        session.close()
# =====================================================================
# ADR-005 Step-2: daily batch maintenance (03:00 dedup / 04:00 quality rescore)
# =====================================================================
def run_dedup_batch() -> dict:
    """03:00 batch: archive duplicate insights.

    Rows with a non-NULL product_sku that share insight_type, product_sku
    and period (and are neither 'archived' nor 'relearn') form a duplicate
    group. The row with the highest avg_quality is kept (tie-break: largest
    id); the others get status='archived'.

    Returns:
        ``{"archived": int, "scanned": int}``, plus an ``"error"`` key (with
        both counters at 0) when the batch fails and is rolled back.
    """
    find_groups_sql = text("""
        SELECT insight_type, product_sku, period, COUNT(*) AS cnt
        FROM ai_insights
        WHERE status NOT IN ('archived', 'relearn')
        AND product_sku IS NOT NULL
        GROUP BY insight_type, product_sku, period
        HAVING COUNT(*) > 1
    """)
    pick_keeper_sql = text("""
        SELECT id FROM ai_insights
        WHERE insight_type = :t AND product_sku = :sku
        AND period IS NOT DISTINCT FROM :p
        AND status NOT IN ('archived', 'relearn')
        ORDER BY avg_quality DESC NULLS LAST, id DESC
        LIMIT 1
    """)
    archive_rest_sql = text("""
        UPDATE ai_insights
        SET status = 'archived', updated_at = CURRENT_TIMESTAMP
        WHERE insight_type = :t AND product_sku = :sku
        AND period IS NOT DISTINCT FROM :p
        AND status NOT IN ('archived', 'relearn')
        AND id != :keep_id
    """)

    session = get_session()
    archived_total = 0
    scanned_total = 0
    try:
        duplicate_groups = session.execute(find_groups_sql).fetchall()
        for group in duplicate_groups:
            scanned_total += group[3]
            group_key = {"t": group[0], "sku": group[1], "p": group[2]}

            keeper_id = session.execute(pick_keeper_sql, group_key).scalar()
            if keeper_id is None:
                continue

            outcome = session.execute(archive_rest_sql, {**group_key, "keep_id": keeper_id})
            archived_total += outcome.rowcount

        # All groups are committed together once the scan is finished.
        session.commit()
        sys_log.info(f"[OCLearn] 去重批次完成:掃描 {scanned_total} 筆,歸檔 {archived_total}")
        return {"archived": archived_total, "scanned": scanned_total}
    except Exception as exc:
        session.rollback()
        sys_log.error(f"[OCLearn] 去重批次失敗: {exc}")
        return {"archived": 0, "scanned": 0, "error": str(exc)}
    finally:
        session.close()
def run_quality_rescore_batch() -> dict:
    """04:00 batch: apply the time-decay formula to approved/relearn insights.

    - Rows with decay_exempt = TRUE are skipped (permanent knowledge).
    - Rows in 'relearn' status take an extra 20% penalty.
    - Rows whose score ends up below 0.05 are archived automatically.

    Returns:
        ``{"updated": int, "relearn_reset": int}`` where ``updated`` is the
        number of rescored rows and ``relearn_reset`` is the number of
        low-score rows archived by this run (approved and relearn alike).
        On failure both are 0 and an ``"error"`` key is added.

    NOTE(review): the key name suggests a relearn-only count, but the
    previous query also counted every archived low-score row — confirm
    which meaning consumers expect.
    """
    session = get_session()
    try:
        # Single UPDATE with the exponential decay computed in SQL, which
        # avoids issuing one UPDATE per row.
        # NOTE(review): the decay factor uses the full age since created_at
        # and is multiplied into the stored avg_quality on every run, so the
        # decay appears to compound from one daily run to the next — confirm
        # this is intended.
        rescore_result = session.execute(text("""
            UPDATE ai_insights
            SET avg_quality = ROUND(
            CASE WHEN status = 'relearn'
            THEN avg_quality * EXP(-:rate * (CURRENT_DATE - created_at::date)) * 0.8
            ELSE avg_quality * EXP(-:rate * (CURRENT_DATE - created_at::date))
            END,
            4
            ),
            updated_at = CURRENT_TIMESTAMP
            WHERE status IN ('approved', 'relearn')
            AND decay_exempt = FALSE
            AND created_at < CURRENT_DATE
        """), {"rate": DECAY_RATE})
        updated = rescore_result.rowcount

        # Auto-archive low scores (score < 0.05).
        archive_result = session.execute(text("""
            UPDATE ai_insights
            SET status = 'archived', updated_at = CURRENT_TIMESTAMP
            WHERE status IN ('approved', 'relearn')
            AND decay_exempt = FALSE
            AND avg_quality < 0.05
        """))
        # Use the UPDATE's own row count instead of re-counting rows through
        # a 5-second updated_at window, which could also pick up rows
        # archived by something else just before this run.
        relearn_reset = archive_result.rowcount or 0

        session.commit()
        sys_log.info(
            f"[OCLearn] 品質分數重算完成:更新 {updated} 筆,"
            f"低分自動歸檔含 relearn {relearn_reset}"
        )
        return {"updated": updated, "relearn_reset": relearn_reset}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}")
        return {"updated": 0, "relearn_reset": 0, "error": str(e)}
    finally:
        session.close()