Files
ewoooc/services/openclaw_learning_service.py
ogt e6109c2ef8
All checks were successful
CD Pipeline / deploy (push) Successful in 1m8s
feat(adr-005): 每日去重 03:00 + 品質分數重算 04:00 批次
openclaw_learning_service.py:
- run_dedup_batch(): 同 SKU/type/period 保留最高 avg_quality,其餘 archived
- run_quality_rescore_batch(): 套時間衰減公式全量重算 avg_quality;
  relearn 狀態額外 -20%;分數 < 0.05 自動歸檔

scheduler.py + run_scheduler.py:
- run_dedup_batch_task()  → 每日 03:00
- run_quality_rescore_task() → 每日 04:00

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 11:38:01 +08:00

511 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import math
import threading
import time
from datetime import datetime, timezone
from sqlalchemy import text
from services.logger_manager import SystemLogger
from database.manager import get_session
from database.ai_models import AIInsight
from services.ollama_service import ollama_service
sys_log = SystemLogger("OCLearn").get_logger()
# =====================================================================
# ADR-007 Step 4 (2026-04-19): DB 持久化 Embedding Retry Queue
#
# 改造原因:原記憶體 Queue 重啟會遺失未處理項目,違反 ADR-007「雙寫必達」。
# 新架構:入列寫 embedding_retry_queue 表 → Hermes worker 每分鐘輪詢批次處理。
# =====================================================================
EMBED_POLL_INTERVAL_SEC = 60 # seconds the worker sleeps between queue polls
EMBED_BATCH_SIZE = 10 # maximum number of queue rows fetched per poll
EMBED_MAX_ATTEMPTS = 5 # a row reaching this many attempts is marked 'failed'
DECAY_RATE = 0.01 # ADR-005 per-day decay constant; half-life = ln(2) / 0.01, about 70 days
def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
                       model: str = "bge-m3:latest") -> bool:
    """Persist one pending embedding job in ``embedding_retry_queue``.

    Args:
        target_table: Table whose row should later receive the embedding.
        target_id: Primary key of that row.
        text_content: Text to be embedded.
        model: Embedding model name stored alongside the job.

    Returns:
        True once the row is inserted and committed. False if the insert
        fails; the session is rolled back and the error logged as a warning.
    """
    db = get_session()
    try:
        job = {
            "t": target_table,
            "i": target_id,
            "txt": text_content,
            "m": model,
            "now": datetime.now(),
        }
        insert_stmt = text("""
            INSERT INTO embedding_retry_queue
            (target_table, target_id, text_content, model, status, created_at)
            VALUES (:t, :i, :txt, :m, 'pending', :now)
        """)
        db.execute(insert_stmt, job)
        db.commit()
    except Exception as exc:
        # Best effort: a failed enqueue must not break the caller.
        db.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗 (可能表尚未建立): {exc}")
        return False
    else:
        return True
    finally:
        db.close()
def _process_one_embedding(row_id: int, target_table: str, target_id: int,
                           text_content: str, model: str) -> bool:
    """Process a single row of ``embedding_retry_queue``.

    On success the generated vector is written to ``<target_table>.embedding``
    and the queue row is marked ``'done'``. On any failure the row's
    ``attempts`` counter is incremented, the first 500 characters of the error
    are stored in ``last_error``, and the status becomes ``'failed'`` once
    attempts reach ``EMBED_MAX_ATTEMPTS`` (otherwise it goes back to
    ``'pending'``).

    Args:
        row_id: Primary key of the queue row.
        target_table: Table that receives the embedding. Must be a plain
            ``identifier`` or ``schema.identifier``; anything else is treated
            as a failed attempt.
        target_id: Primary key of the row in ``target_table``.
        text_content: Text to embed.
        model: Embedding model name passed to the Ollama service.

    Returns:
        True if the embedding was stored, False otherwise.
    """
    import re

    session = get_session()
    try:
        # The table name is interpolated into SQL below (identifiers cannot be
        # bound parameters), so reject anything that is not a bare identifier.
        if not isinstance(target_table, str) or not re.fullmatch(
            r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?", target_table
        ):
            raise ValueError(f"unsafe target_table: {target_table!r}")

        # Claim the row in its own transaction before the slow embedding call.
        # NOTE(review): the worker loop only selects status='pending', so a row
        # left in 'processing' by a crash here is never retried - confirm.
        session.execute(
            text("UPDATE embedding_retry_queue SET status='processing', updated_at=:now WHERE id=:id"),
            {"now": datetime.now(), "id": row_id},
        )
        session.commit()

        vec = ollama_service.generate_embedding(text_content, model=model)
        if not vec:
            raise RuntimeError("embedding 回傳空值")
        # Presumably the column accepts the Python list repr as a vector
        # literal - TODO confirm against the schema.
        vec_str = str(vec)

        session.execute(
            text(f"UPDATE {target_table} SET embedding = :vec WHERE id = :id"),
            {"vec": vec_str, "id": target_id},
        )
        session.execute(
            text("""
                UPDATE embedding_retry_queue
                SET status='done', processed_at=:now, updated_at=:now
                WHERE id=:id
            """),
            {"now": datetime.now(), "id": row_id},
        )
        session.commit()
        sys_log.debug(f"[OCLearn] embedding 寫入成功 {target_table}#{target_id}")
        return True
    except Exception as e:
        session.rollback()
        try:
            # Record the failure; the CASE decides between retry and give-up.
            session.execute(
                text("""
                    UPDATE embedding_retry_queue
                    SET attempts = attempts + 1,
                    last_error = :err,
                    status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,
                    updated_at = :now
                    WHERE id = :id
                """),
                {
                    "err": str(e)[:500],
                    "max": EMBED_MAX_ATTEMPTS,
                    "now": datetime.now(),
                    "id": row_id,
                },
            )
            session.commit()
        except Exception as e2:
            session.rollback()
            sys_log.error(f"[OCLearn] 更新 retry attempts 失敗: {e2}")
        sys_log.warning(f"[OCLearn] embedding 失敗 (attempt 累加): {e}")
        return False
    finally:
        session.close()
def _embedding_worker_loop():
    """Poll ``embedding_retry_queue`` forever and process pending rows.

    Each cycle fetches up to ``EMBED_BATCH_SIZE`` rows with status
    ``'pending'`` and fewer than ``EMBED_MAX_ATTEMPTS`` attempts, oldest
    first, hands each one to ``_process_one_embedding``, then sleeps for
    ``EMBED_POLL_INTERVAL_SEC`` seconds. Any error in a cycle is logged as a
    warning and the loop carries on. This function never returns.
    """
    sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
    while True:
        try:
            poll_session = get_session()
            try:
                result = poll_session.execute(
                    text("""
                        SELECT id, target_table, target_id, text_content, model
                        FROM embedding_retry_queue
                        WHERE status = 'pending'
                        AND attempts < :max
                        ORDER BY created_at
                        LIMIT :lim
                    """),
                    {"max": EMBED_MAX_ATTEMPTS, "lim": EMBED_BATCH_SIZE},
                )
                pending_rows = result.fetchall()
            finally:
                # Release the polling session before the slow per-row work.
                poll_session.close()

            for queue_id, table_name, row_pk, body, model_name in pending_rows:
                _process_one_embedding(
                    queue_id, table_name, row_pk, body, model_name or "bge-m3:latest"
                )
        except Exception as exc:
            sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {exc}")
        time.sleep(EMBED_POLL_INTERVAL_SEC)
# Start the background embedding worker when this module is loaded. It runs as
# a daemon thread, so it does not keep the Python process alive at exit.
threading.Thread(target=_embedding_worker_loop, daemon=True).start()
# =====================================================================
# ADR-007 核心:store_insight 雙寫(DB + 排程 embedding)
# =====================================================================
def store_insight(insight_type: str, content: str, period: str = None,
                  product_sku: str = None, metadata: dict = None,
                  ai_model: str = None) -> int:
    """Store an AI-generated insight in ``ai_insights`` and queue its embedding.

    When both ``insight_type`` and ``period`` are given, an existing row with
    the same type, period and SKU is overwritten instead of inserting a new
    one. An insight without a SKU only matches rows that also have no SKU, so
    it can never overwrite a SKU-specific insight.

    Args:
        insight_type: Category of the insight.
        content: Insight text.
        period: Optional period key used for the overwrite lookup.
        product_sku: Optional SKU the insight belongs to.
        metadata: Optional dict, stored as JSON in ``metadata_json``.
        ai_model: Optional model name, written to ``ai_insights.ai_model``
            on a best-effort basis (the column may not exist yet).

    Returns:
        The id of the inserted or updated row, or ``None`` if storing failed
        (the error is logged and the session rolled back).
    """
    session = get_session()
    try:
        meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None
        existing = None
        if period and insight_type:
            q = session.query(AIInsight).filter_by(insight_type=insight_type, period=period)
            if product_sku:
                q = q.filter_by(product_sku=product_sku)
            else:
                # Without this filter a SKU-less insight would match, and
                # overwrite, any SKU-specific row of the same type and period.
                q = q.filter(
                    AIInsight.product_sku.is_(None) | (AIInsight.product_sku == "")
                )
            existing = q.first()

        if existing:
            existing.content = content
            if meta_str:
                existing.metadata_json = meta_str
            existing.updated_at = datetime.now()
            session.commit()
            insight_id = existing.id
            sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
        else:
            new_insight = AIInsight(
                insight_type=insight_type,
                period=period,
                product_sku=product_sku,
                content=content,
                metadata_json=meta_str,
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )
            session.add(new_insight)
            session.commit()
            insight_id = new_insight.id
            sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")

        # The ai_model column only exists after Migration 010, so this write is
        # attempted separately and a failure must not undo the insight itself.
        if ai_model:
            try:
                session.execute(
                    text("UPDATE ai_insights SET ai_model = :m WHERE id = :i"),
                    {"m": ai_model, "i": insight_id},
                )
                session.commit()
            except Exception as model_err:
                session.rollback()
                sys_log.debug(f"[OCLearn] ai_model 寫入略過: {model_err}")

        # Queue the embedding job in the persistent DB retry queue.
        embed_target_text = f"{insight_type} ({period or ''}): {content}"
        _enqueue_embedding("ai_insights", insight_id, embed_target_text)
        return insight_id
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 錯誤: {e}")
        return None
    finally:
        session.close()
# =====================================================================
# ADR-005: 時間衰減品質分數
# =====================================================================
def compute_effective_score(base_score: float, created_at: datetime,
                            decay_rate: float = DECAY_RATE) -> float:
    """Apply exponential time decay to a quality score.

    Computes ``base_score * exp(-decay_rate * age_days)``, where ``age_days``
    is the whole number of days since ``created_at``, never below zero.

    Args:
        base_score: Score before decay.
        created_at: Creation time. A falsy value disables decay. A naive
            datetime is interpreted as UTC.
        decay_rate: Per-day decay constant.

    Returns:
        The decayed score, or ``base_score`` unchanged when ``created_at`` is
        missing.
    """
    if not created_at:
        return base_score

    # NOTE(review): naive timestamps are treated as UTC here, while writers in
    # this module use datetime.now() - confirm the stored timezone.
    if created_at.tzinfo is None:
        stamp = created_at.replace(tzinfo=timezone.utc)
    else:
        stamp = created_at

    age_days = (datetime.now(timezone.utc) - stamp).days
    if age_days < 0:
        # A creation time in the future must not inflate the score.
        age_days = 0
    return base_score * math.exp(-decay_rate * age_days)
# =====================================================================
# RAG 檢索(套時間衰減)
# =====================================================================
def build_rag_context(query: str, insight_type: str = None, period: str = None,
                      top_k: int = 5) -> str:
    """Build a RAG context string from stored insights, ranked by decayed score.

    Insights are filtered by ``insight_type`` and ``period`` when given. When
    no period is given, only the 20 most recently created rows are considered.
    Candidates are re-ranked by their time-decayed quality score (ADR-005) and
    the best ``top_k`` are joined into one string.

    Args:
        query: Not used by the current implementation.
        insight_type: Optional insight type filter.
        period: Optional exact period filter.
        top_k: Maximum number of insights to include; negative values are
            treated as 0.

    Returns:
        The selected insights separated by ``---`` lines, or ``""`` when
        nothing matches or an error occurs (errors are logged).
    """
    session = get_session()
    try:
        q = session.query(AIInsight)
        if insight_type:
            q = q.filter_by(insight_type=insight_type)
        if period:
            q = q.filter_by(period=period)
        else:
            q = q.order_by(AIInsight.created_at.desc()).limit(20)
        results = q.all()
        if not results:
            return ""

        ranked = []
        for r in results:
            base = getattr(r, "avg_quality", None)
            if base is None:
                # Only a missing score falls back to neutral. A stored 0.0
                # (all feedback negative) is a real score and must stay 0.0.
                base = 0.5
            exempt = getattr(r, "decay_exempt", False)
            effective = base if exempt else compute_effective_score(base, r.created_at)
            ranked.append((effective, r))
        ranked.sort(key=lambda x: x[0], reverse=True)
        # Guard against a negative top_k, which would otherwise slice from the end.
        top = ranked[:max(top_k, 0)]

        context_parts = []
        for score, r in top:
            p_tag = f"[{r.period}]" if r.period else "[歷史紀錄]"
            context_parts.append(
                f"{p_tag} {r.insight_type} (分數={score:.2f}):\n{r.content}"
            )
        sys_log.info(f"[OCLearn] RAG context: {len(top)}/{len(results)} 筆(時間衰減重排)")
        return "\n\n---\n\n".join(context_parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context 錯誤: {e}")
        return ""
    finally:
        session.close()
def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """Build a RAG context string from insights created within a date range.

    Args:
        start_date: First day of the range, formatted ``YYYY-MM-DD``.
        end_date: Last day of the range (inclusive), formatted ``YYYY-MM-DD``.

    Returns:
        The matching insights in creation order, separated by ``---`` lines.
        Returns ``""`` when nothing matches, a date cannot be parsed, or any
        other error occurs (errors are logged).
    """
    session = get_session()
    try:
        start_dt = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
        # Include the whole last second of end_date: a 23:59:59 bound would
        # drop rows stamped 23:59:59.xxxxxx.
        end_dt = datetime.strptime(
            f"{end_date} 23:59:59.999999", "%Y-%m-%d %H:%M:%S.%f"
        )
        results = (
            session.query(AIInsight)
            .filter(AIInsight.created_at >= start_dt)
            .filter(AIInsight.created_at <= end_dt)
            .order_by(AIInsight.created_at.asc())
            .all()
        )
        if not results:
            return ""

        parts = []
        for r in results:
            # Rows without a period are tagged with their creation time instead.
            dt_str = r.created_at.strftime("%Y-%m-%d %H:%M") if r.created_at else ""
            p_tag = f"[{r.period if r.period else dt_str}]"
            parts.append(f"{p_tag} 洞察類型({r.insight_type}):\n{r.content}")
        sys_log.info(
            f"[OCLearn] 週報 RAG: {len(results)} 筆({start_date}~{end_date}"
        )
        return "\n\n---\n\n".join(parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 錯誤: {e}")
        return ""
    finally:
        session.close()
# =====================================================================
# 其他 API
# =====================================================================
def store_conversation(user_id: int, user_message: str, bot_response: str) -> bool:
    """Store one user/bot exchange as a ``chat_history`` insight.

    Args:
        user_id: Id of the user, stored in the insight metadata.
        user_message: What the user said.
        bot_response: What the bot answered.

    Returns:
        True if the exchange was stored, False otherwise.
    """
    try:
        insight_id = store_insight(
            insight_type="chat_history",
            content=f"User: {user_message}\nBot: {bot_response}",
            metadata={"user_id": user_id},
        )
    except Exception as e:
        sys_log.error(f"[OCLearn] store_conversation 錯誤: {e}")
        return False
    # store_insight reports failure by returning None rather than raising, so
    # the returned id is the only reliable success signal.
    return insight_id is not None
def update_feedback(insight_id: int, rating: int, comment: str = None) -> bool:
    """Record user feedback on an insight and recompute its ``avg_quality``.

    A rating of 4 or higher increments ``feedback_up``; anything lower
    increments ``feedback_down``. ``avg_quality`` is then set to
    ``up / (up + down)``.

    Args:
        insight_id: Id of the insight being rated.
        rating: Numeric rating.
        comment: Not used by the current implementation.

    Returns:
        True if the feedback was stored. False if the insight does not exist
        or the update failed (for example, the feedback columns are missing).
    """
    session = get_session()
    try:
        # col is one of two fixed names, so interpolating it is safe.
        col = "feedback_up" if rating >= 4 else "feedback_down"
        bumped = session.execute(
            text(f"UPDATE ai_insights SET {col} = COALESCE({col},0) + 1 WHERE id = :id"),
            {"id": insight_id},
        )
        if bumped.rowcount == 0:
            # No row matched: do not report success for an unknown insight.
            session.rollback()
            sys_log.warning(f"[OCLearn] update_feedback 略過: insight {insight_id} 不存在")
            return False

        # Recompute avg_quality = up / (up + down).
        session.execute(
            text("""
                UPDATE ai_insights
                SET avg_quality = CASE
                WHEN COALESCE(feedback_up,0) + COALESCE(feedback_down,0) = 0 THEN 0.5
                ELSE COALESCE(feedback_up,0)::float
                / (COALESCE(feedback_up,0) + COALESCE(feedback_down,0))
                END
                WHERE id = :id
            """),
            {"id": insight_id},
        )
        session.commit()
        sys_log.info(f"[OCLearn] Feedback {insight_id}: rating={rating}")
        return True
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] update_feedback 略過 (Migration 010 未執行?): {e}")
        return False
    finally:
        session.close()
def get_learning_stats() -> dict:
    """Return summary metrics for the learning system.

    Returns:
        On success, a dict with ``total_insights`` (row count of the insight
        table), ``embedding_queue_pending`` (number of pending queue rows, or
        ``None`` if that count could not be read) and ``status`` set to
        ``"active"``. On failure, a dict with ``total_insights`` 0, ``status``
        ``"error"`` and the error text under ``error``.
    """
    db = get_session()
    try:
        total = db.query(AIInsight).count()

        # The queue table may be missing, so this count is best effort.
        pending_sql = text(
            "SELECT COUNT(*) FROM embedding_retry_queue WHERE status='pending'"
        )
        try:
            queue_pending = db.execute(pending_sql).scalar()
        except Exception:
            queue_pending = None

        stats = {
            "total_insights": total,
            "embedding_queue_pending": queue_pending,
            "status": "active",
        }
        return stats
    except Exception as exc:
        return {"total_insights": 0, "status": "error", "error": str(exc)}
    finally:
        db.close()
# =====================================================================
# ADR-005 Step-2: 每日批次維護(03:00 去重 / 04:00 品質分數重算)
# =====================================================================
def run_dedup_batch() -> dict:
    """Archive duplicate insights (03:00 daily batch).

    Insights sharing the same ``insight_type``, ``product_sku`` and ``period``
    count as duplicates. Within each group, the row with the highest
    ``avg_quality`` is kept (ties go to the highest id) and the others get
    ``status = 'archived'``. Rows already ``'archived'`` or ``'relearn'`` and
    rows without a SKU are not considered.

    Returns:
        ``{"archived": int, "scanned": int}`` on success. On failure both
        counts are 0 and an ``"error"`` key carries the message; everything is
        rolled back.
    """
    db = get_session()
    total_scanned = 0
    total_archived = 0
    try:
        # Step 1: find every key that has more than one live row.
        # NOTE(review): rows whose status is NULL fail the NOT IN test and are
        # skipped - confirm that status always has a value.
        duplicate_groups = db.execute(text("""
            SELECT insight_type, product_sku, period, COUNT(*) AS cnt
            FROM ai_insights
            WHERE status NOT IN ('archived', 'relearn')
            AND product_sku IS NOT NULL
            GROUP BY insight_type, product_sku, period
            HAVING COUNT(*) > 1
        """)).fetchall()

        for group in duplicate_groups:
            group_type, group_sku, group_period, group_size = (
                group[0], group[1], group[2], group[3]
            )
            total_scanned += group_size
            group_key = {"t": group_type, "sku": group_sku, "p": group_period}

            # Step 2: choose the survivor of this group.
            survivor_id = db.execute(text("""
                SELECT id FROM ai_insights
                WHERE insight_type = :t AND product_sku = :sku
                AND period IS NOT DISTINCT FROM :p
                AND status NOT IN ('archived', 'relearn')
                ORDER BY avg_quality DESC NULLS LAST, id DESC
                LIMIT 1
            """), group_key).scalar()
            if survivor_id is None:
                continue

            # Step 3: archive everything else in the group.
            archive_params = dict(group_key)
            archive_params["keep_id"] = survivor_id
            outcome = db.execute(text("""
                UPDATE ai_insights
                SET status = 'archived', updated_at = CURRENT_TIMESTAMP
                WHERE insight_type = :t AND product_sku = :sku
                AND period IS NOT DISTINCT FROM :p
                AND status NOT IN ('archived', 'relearn')
                AND id != :keep_id
            """), archive_params)
            total_archived += outcome.rowcount

        db.commit()
        sys_log.info(f"[OCLearn] 去重批次完成:掃描 {total_scanned} 筆,歸檔 {total_archived}")
        return {"archived": total_archived, "scanned": total_scanned}
    except Exception as exc:
        db.rollback()
        sys_log.error(f"[OCLearn] 去重批次失敗: {exc}")
        return {"archived": 0, "scanned": 0, "error": str(exc)}
    finally:
        db.close()
def run_quality_rescore_batch() -> dict:
    """Re-score insight quality with time decay (04:00 daily batch).

    For every insight with status ``'approved'`` or ``'relearn'`` that is not
    ``decay_exempt``, the stored ``avg_quality`` is replaced by its
    time-decayed value. ``'relearn'`` rows are additionally multiplied by 0.8.
    Rows whose new score is below 0.05 are archived.

    NOTE(review): ``avg_quality`` is both the input and the output of this
    batch, so the decay and the relearn penalty are re-applied to an already
    decayed value on every run. Confirm whether that compounding is intended
    or whether an undecayed base score should be stored separately.

    Returns:
        ``{"updated": int, "relearn_reset": int}`` where ``relearn_reset`` is
        the number of ``'relearn'`` rows archived for a low score. On failure
        both counts are 0 and an ``"error"`` key carries the message;
        everything is rolled back.
    """
    session = get_session()
    updated = 0
    relearn_reset = 0
    try:
        rows = session.execute(text("""
            SELECT id, avg_quality, created_at, decay_exempt, status
            FROM ai_insights
            WHERE status IN ('approved', 'relearn')
        """)).fetchall()

        for row_id, base_q, created_at, exempt, status in rows:
            if exempt:
                # Permanent knowledge: never decayed.
                continue

            # Only a missing score falls back to neutral. A stored 0.0 is a
            # real score and must not be reset to 0.5.
            base_q = 0.5 if base_q is None else float(base_q)

            # compute_effective_score treats naive timestamps as UTC itself.
            new_score = compute_effective_score(base_q, created_at)
            if status == "relearn":
                new_score *= 0.8

            session.execute(text("""
                UPDATE ai_insights
                SET avg_quality = :q, updated_at = CURRENT_TIMESTAMP
                WHERE id = :id
            """), {"q": round(new_score, 4), "id": row_id})
            updated += 1

            if new_score < 0.05:
                session.execute(text(
                    "UPDATE ai_insights SET status = 'archived' WHERE id = :id"
                ), {"id": row_id})
                if status == "relearn":
                    relearn_reset += 1

        session.commit()
        sys_log.info(
            f"[OCLearn] 品質分數重算完成:更新 {updated} 筆,"
            f"低分自動歸檔含 relearn {relearn_reset}"
        )
        return {"updated": updated, "relearn_reset": relearn_reset}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}")
        return {"updated": 0, "relearn_reset": 0, "error": str(e)}
    finally:
        session.close()