Files
ewoooc/services/openclaw_learning_service.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

394 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import math
import threading
import time
from datetime import datetime, timezone
from sqlalchemy import text
from services.logger_manager import SystemLogger
from database.manager import get_session
from database.ai_models import AIInsight
from services.ollama_service import ollama_service
# Logger used by the functions in this module.
sys_log = SystemLogger("OCLearn").get_logger()
# =====================================================================
# ADR-007 Step 4 (2026-04-19): DB-persisted embedding retry queue
#
# Why: the previous in-memory queue lost unprocessed items on restart,
# which violated ADR-007's "dual write must be delivered" requirement.
# New design: enqueue into the embedding_retry_queue table, then the Hermes
# worker polls once a minute and processes pending rows in batches.
# =====================================================================
EMBED_POLL_INTERVAL_SEC = 60 # worker polling interval, in seconds
EMBED_BATCH_SIZE = 10 # maximum number of rows handled per poll
EMBED_MAX_ATTEMPTS = 5 # a row reaching this many attempts is marked failed
DECAY_RATE = 0.01 # ADR-005: half-life of roughly 70 days
def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
                       model: str = "bge-m3:latest") -> bool:
    """Persist one pending embedding job in ``embedding_retry_queue``.

    Args:
        target_table: Table whose row should later receive the embedding.
        target_id: Id of that row.
        text_content: Text that will be embedded.
        model: Embedding model name stored with the job.

    Returns:
        True once the insert is committed; False if it failed, in which case
        the transaction is rolled back and a warning is logged.
    """
    session = get_session()
    try:
        insert_stmt = text(
            "\n"
            "INSERT INTO embedding_retry_queue\n"
            "(target_table, target_id, text_content, model, status, created_at)\n"
            "VALUES (:t, :i, :txt, :m, 'pending', :now)\n"
        )
        job_params = {
            "t": target_table,
            "i": target_id,
            "txt": text_content,
            "m": model,
            "now": datetime.now(),
        }
        session.execute(insert_stmt, job_params)
        session.commit()
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗 (可能表尚未建立): {e}")
        return False
    else:
        return True
    finally:
        session.close()
def _process_one_embedding(row_id: int, target_table: str, target_id: int,
                           text_content: str, model: str) -> bool:
    """Process a single queue row: embed its text and write the vector back.

    Steps: mark the queue row ``processing``, request an embedding from
    ``ollama_service``, store ``str(vec)`` in ``<target_table>.embedding`` for
    ``target_id``, then mark the queue row ``done``.

    On any failure the transaction is rolled back, ``attempts`` is incremented,
    the error text (first 500 characters) is saved in ``last_error``, and the
    row goes back to ``pending`` -- or to ``failed`` once the incremented
    attempt count reaches ``EMBED_MAX_ATTEMPTS``.

    Args:
        row_id: Id of the row in ``embedding_retry_queue``.
        target_table: Table that receives the embedding. Must be a plain or
            schema-qualified SQL identifier.
        target_id: Id of the row in ``target_table``.
        text_content: Text to embed.
        model: Embedding model name passed to ``ollama_service``.

    Returns:
        True if the embedding was written and the queue row marked done,
        False otherwise. Exceptions are logged, never propagated.
    """
    import re  # function-scope import: the module header does not import re

    session = get_session()
    try:
        # A table name cannot be sent as a bound parameter, so target_table is
        # interpolated into the UPDATE below. Refuse anything that is not an
        # identifier so a bad queue row cannot inject SQL; the failure is
        # recorded through the normal retry path in the except branch.
        if not (
            isinstance(target_table, str)
            and re.fullmatch(r"[A-Za-z_]\w*(\.[A-Za-z_]\w*)?", target_table, re.ASCII)
        ):
            raise ValueError(f"invalid target_table {target_table!r}")

        # NOTE(review): if the process dies after this commit and before the
        # final one, the row stays 'processing'; the worker query in this file
        # only selects 'pending' rows, so such a row would not be retried.
        session.execute(
            text("UPDATE embedding_retry_queue SET status='processing', updated_at=:now WHERE id=:id"),
            {"now": datetime.now(), "id": row_id},
        )
        session.commit()

        vec = ollama_service.generate_embedding(text_content, model=model)
        if not vec:
            raise RuntimeError("embedding 回傳空值")
        vec_str = str(vec)

        session.execute(
            text(f"UPDATE {target_table} SET embedding = :vec WHERE id = :id"),
            {"vec": vec_str, "id": target_id},
        )
        session.execute(
            text(
                "\n"
                "UPDATE embedding_retry_queue\n"
                "SET status='done', processed_at=:now, updated_at=:now\n"
                "WHERE id=:id\n"
            ),
            {"now": datetime.now(), "id": row_id},
        )
        session.commit()
        sys_log.debug(f"[OCLearn] embedding 寫入成功 {target_table}#{target_id}")
        return True
    except Exception as e:
        session.rollback()
        try:
            # Record the failure; the CASE decides between retry and give-up.
            session.execute(
                text(
                    "\n"
                    "UPDATE embedding_retry_queue\n"
                    "SET attempts = attempts + 1,\n"
                    "last_error = :err,\n"
                    "status = CASE WHEN attempts + 1 >= :max THEN 'failed' ELSE 'pending' END,\n"
                    "updated_at = :now\n"
                    "WHERE id = :id\n"
                ),
                {
                    "err": str(e)[:500],
                    "max": EMBED_MAX_ATTEMPTS,
                    "now": datetime.now(),
                    "id": row_id,
                },
            )
            session.commit()
        except Exception as e2:
            session.rollback()
            sys_log.error(f"[OCLearn] 更新 retry attempts 失敗: {e2}")
        sys_log.warning(f"[OCLearn] embedding 失敗 (attempt 累加): {e}")
        return False
    finally:
        session.close()
def _embedding_worker_loop():
    """Background loop that drains ``embedding_retry_queue``.

    Each cycle reads up to ``EMBED_BATCH_SIZE`` pending rows (oldest first,
    with fewer than ``EMBED_MAX_ATTEMPTS`` attempts), hands each one to
    ``_process_one_embedding``, then sleeps ``EMBED_POLL_INTERVAL_SEC``
    seconds. Errors raised while polling are logged and the loop continues;
    the function never returns.
    """
    sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
    while True:
        try:
            session = get_session()
            try:
                result = session.execute(
                    text(
                        "\n"
                        "SELECT id, target_table, target_id, text_content, model\n"
                        "FROM embedding_retry_queue\n"
                        "WHERE status = 'pending'\n"
                        "AND attempts < :max\n"
                        "ORDER BY created_at\n"
                        "LIMIT :lim\n"
                    ),
                    {"max": EMBED_MAX_ATTEMPTS, "lim": EMBED_BATCH_SIZE},
                )
                pending_rows = result.fetchall()
            finally:
                # Release the polling session before the per-row work starts;
                # each row is processed with its own session.
                session.close()

            for queue_id, table_name, row_pk, content, model_name in pending_rows:
                # A NULL model column falls back to the default model.
                _process_one_embedding(
                    queue_id, table_name, row_pk, content, model_name or "bge-m3:latest"
                )
        except Exception as e:
            sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {e}")
        time.sleep(EMBED_POLL_INTERVAL_SEC)
# Start the background worker at import time. It runs as a daemon thread, so
# it ends together with the Python process and needs no explicit shutdown.
threading.Thread(target=_embedding_worker_loop, daemon=True).start()
# =====================================================================
# ADR-007 core: store_insight dual write (DB row + scheduled embedding)
# =====================================================================
def store_insight(insight_type: str, content: str, period: str = None,
                  product_sku: str = None, metadata: dict = None,
                  ai_model: str = None) -> int:
    """Store an AI-generated insight in ``ai_insights`` and queue its embedding.

    When both ``insight_type`` and ``period`` are given, an existing row with
    the same ``insight_type`` + ``period`` + ``product_sku`` is overwritten;
    otherwise a new row is inserted. The row is then enqueued in
    ``embedding_retry_queue`` so the background worker can vectorise it.

    Args:
        insight_type: Category of the insight.
        content: Insight text.
        period: Optional period label; required for the overwrite lookup.
        product_sku: Optional SKU. ``None`` means "no SKU" and only matches
            rows whose ``product_sku`` is NULL.
        metadata: Optional dict, stored as JSON in ``metadata_json``.
        ai_model: Optional model name, written to ``ai_insights.ai_model`` on a
            best-effort basis (the column may not exist before Migration 010).

    Returns:
        The id of the inserted or updated row, or ``None`` if storing failed.
        Errors are logged and never raised.
    """
    session = get_session()
    try:
        meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None

        existing = None
        if period and insight_type:
            # product_sku is always part of the key. filter_by(product_sku=None)
            # renders "product_sku IS NULL", so an insight without a SKU can no
            # longer overwrite a SKU-specific row of the same type and period.
            existing = (
                session.query(AIInsight)
                .filter_by(
                    insight_type=insight_type,
                    period=period,
                    product_sku=product_sku,
                )
                .first()
            )

        if existing:
            existing.content = content
            if meta_str:
                existing.metadata_json = meta_str
            existing.updated_at = datetime.now()
            session.commit()
            insight_id = existing.id
            sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
        else:
            now = datetime.now()
            new_insight = AIInsight(
                insight_type=insight_type,
                period=period,
                product_sku=product_sku,
                content=content,
                metadata_json=meta_str,
                created_at=now,
                updated_at=now,
            )
            session.add(new_insight)
            session.commit()
            insight_id = new_insight.id
            sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")

        # Best effort: the ai_model column only exists after Migration 010, so
        # a failure here must not fail the whole store -- but it is logged.
        if ai_model:
            try:
                session.execute(
                    text("UPDATE ai_insights SET ai_model = :m WHERE id = :i"),
                    {"m": ai_model, "i": insight_id},
                )
                session.commit()
            except Exception as e:
                session.rollback()
                sys_log.debug(f"[OCLearn] ai_model update skipped: {e}")

        # Hand the text to the persistent retry queue for embedding.
        embed_target_text = f"{insight_type} ({period or ''}): {content}"
        _enqueue_embedding("ai_insights", insight_id, embed_target_text)
        return insight_id
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 錯誤: {e}")
        return None
    finally:
        session.close()
# =====================================================================
# ADR-005: 時間衰減品質分數
# =====================================================================
def compute_effective_score(base_score: float, created_at: datetime,
                            decay_rate: float = DECAY_RATE) -> float:
    """Apply exponential time decay: ``base_score * exp(-decay_rate * age_days)``.

    Args:
        base_score: Undecayed score.
        created_at: Creation time. A falsy value disables decay; a naive
            datetime is interpreted as UTC.
        decay_rate: Decay constant per whole day.

    Returns:
        The decayed score. Age is counted in whole days and never below zero,
        so timestamps in the future leave ``base_score`` unchanged.
    """
    if not created_at:
        return base_score

    stamp = created_at
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)

    elapsed = datetime.now(timezone.utc) - stamp
    age_days = elapsed.days if elapsed.days > 0 else 0
    return base_score * math.exp(-decay_rate * age_days)
# =====================================================================
# RAG 檢索(套時間衰減)
# =====================================================================
def build_rag_context(query: str, insight_type: str = None, period: str = None,
                      top_k: int = 5) -> str:
    """Build a RAG context string from stored insights, ranked with time decay.

    Rows are filtered by ``insight_type`` and ``period`` when given. Without a
    ``period`` only the 20 most recently created rows are considered. Each row
    is scored with ``compute_effective_score`` (rows flagged ``decay_exempt``
    keep their base score) and the ``top_k`` best rows are rendered.

    Args:
        query: Currently unused by this function; kept in the signature.
        insight_type: Optional exact-match filter.
        period: Optional exact-match filter.
        top_k: Number of rows to include in the context.

    Returns:
        The selected insights joined by ``---`` separators, or ``""`` when
        nothing matches or an error occurs (errors are logged).
    """
    session = get_session()
    try:
        q = session.query(AIInsight)
        if insight_type:
            q = q.filter_by(insight_type=insight_type)
        if period:
            q = q.filter_by(period=period)
        else:
            q = q.order_by(AIInsight.created_at.desc()).limit(20)
        results = q.all()
        if not results:
            return ""

        ranked = []
        for r in results:
            quality = getattr(r, "avg_quality", None)
            # Only a missing score falls back to the neutral 0.5. A genuine 0.0
            # (all feedback negative) must stay 0.0; "quality or 0.5" would
            # promote it to 0.5 and rank it above partly positive insights.
            base = 0.5 if quality is None else quality
            exempt = getattr(r, "decay_exempt", False)
            effective = base if exempt else compute_effective_score(base, r.created_at)
            ranked.append((effective, r))
        ranked.sort(key=lambda x: x[0], reverse=True)
        top = ranked[:top_k]

        context_parts = []
        for score, r in top:
            p_tag = f"[{r.period}]" if r.period else "[歷史紀錄]"
            context_parts.append(
                f"{p_tag} {r.insight_type} (分數={score:.2f}):\n{r.content}"
            )
        sys_log.info(f"[OCLearn] RAG context: {len(top)}/{len(results)} 筆(時間衰減重排)")
        return "\n\n---\n\n".join(context_parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context 錯誤: {e}")
        return ""
    finally:
        session.close()
def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """Build a RAG context from insights created within a date range.

    Used for the weekly report. Both bounds are inclusive calendar days.

    Args:
        start_date: First day, formatted ``YYYY-MM-DD``.
        end_date: Last day, formatted ``YYYY-MM-DD``.

    Returns:
        Matching insights in ascending ``created_at`` order, joined by ``---``
        separators, or ``""`` when nothing matches, a date cannot be parsed,
        or any other error occurs (errors are logged).
    """
    from datetime import timedelta  # function-scope: not imported at file level

    session = get_session()
    try:
        start_dt = datetime.strptime(f"{start_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
        # Half-open upper bound (midnight after end_date). An inclusive
        # "<= 23:59:59" would drop rows stamped 23:59:59.000001 or later.
        end_exclusive = (
            datetime.strptime(f"{end_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
            + timedelta(days=1)
        )
        results = (
            session.query(AIInsight)
            .filter(AIInsight.created_at >= start_dt)
            .filter(AIInsight.created_at < end_exclusive)
            .order_by(AIInsight.created_at.asc())
            .all()
        )
        if not results:
            return ""

        parts = []
        for r in results:
            # Rows without a period are tagged with their creation time.
            dt_str = r.created_at.strftime("%Y-%m-%d %H:%M") if r.created_at else ""
            p_tag = f"[{r.period if r.period else dt_str}]"
            parts.append(f"{p_tag} 洞察類型({r.insight_type}):\n{r.content}")
        sys_log.info(
            f"[OCLearn] 週報 RAG: {len(results)} 筆({start_date}~{end_date})"
        )
        return "\n\n---\n\n".join(parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 錯誤: {e}")
        return ""
    finally:
        session.close()
# =====================================================================
# 其他 API
# =====================================================================
def store_conversation(user_id: int, user_message: str, bot_response: str) -> bool:
    """Persist one user/bot exchange as a ``chat_history`` insight.

    The exchange is stored through ``store_insight``, so it is also queued for
    embedding.

    Args:
        user_id: Id of the user, recorded in the insight metadata.
        user_message: What the user wrote.
        bot_response: What the bot answered.

    Returns:
        True if the insight was stored, False otherwise.
    """
    try:
        insight_id = store_insight(
            insight_type="chat_history",
            content=f"User: {user_message}\nBot: {bot_response}",
            metadata={"user_id": user_id},
        )
    except Exception as e:
        sys_log.error(f"[OCLearn] store_conversation 錯誤: {e}")
        return False
    # store_insight signals failure by returning None instead of raising, so
    # the returned id has to be checked to report the real outcome.
    return insight_id is not None
def update_feedback(insight_id: int, rating: int, comment: str = None) -> bool:
    """Record user feedback on an insight and recompute its ``avg_quality``.

    A rating of 4 or more increments ``feedback_up``; anything lower
    increments ``feedback_down``. ``avg_quality`` is then set to
    ``up / (up + down)``, or 0.5 when both counters are zero. These columns
    are expected to exist only after Migration 010.

    Args:
        insight_id: Id of the row in ``ai_insights``.
        rating: Numeric rating from the user.
        comment: Accepted but not used by this function.

    Returns:
        True when both updates were committed; False when anything failed
        (the transaction is rolled back and a warning is logged).
    """
    session = get_session()
    try:
        if rating >= 4:
            col = "feedback_up"
        else:
            col = "feedback_down"

        # col is one of the two fixed names above, never caller-supplied text.
        bump_stmt = text(
            f"UPDATE ai_insights SET {col} = COALESCE({col},0) + 1 WHERE id = :id"
        )
        session.execute(bump_stmt, {"id": insight_id})

        # Recompute avg_quality = up / (up + down).
        recompute_stmt = text(
            "\n"
            "UPDATE ai_insights\n"
            "SET avg_quality = CASE\n"
            "WHEN COALESCE(feedback_up,0) + COALESCE(feedback_down,0) = 0 THEN 0.5\n"
            "ELSE COALESCE(feedback_up,0)::float\n"
            "/ (COALESCE(feedback_up,0) + COALESCE(feedback_down,0))\n"
            "END\n"
            "WHERE id = :id\n"
        )
        session.execute(recompute_stmt, {"id": insight_id})

        session.commit()
        sys_log.info(f"[OCLearn] Feedback {insight_id}: rating={rating}")
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] update_feedback 略過 (Migration 010 未執行?): {e}")
        return False
    else:
        return True
    finally:
        session.close()
def get_learning_stats() -> dict:
    """Return headline metrics for the learning system.

    Returns:
        On success, a dict with ``total_insights`` (row count of
        ``ai_insights``), ``embedding_queue_pending`` (count of pending queue
        rows, or ``None`` if that query failed) and ``status`` set to
        ``"active"``. On failure, a dict with ``total_insights`` 0, ``status``
        ``"error"`` and the error text under ``error``.
    """
    session = get_session()
    try:
        total = session.query(AIInsight).count()

        # The queue table may be unavailable; report None instead of failing.
        queue_depth = None
        try:
            pending_query = text(
                "SELECT COUNT(*) FROM embedding_retry_queue WHERE status='pending'"
            )
            queue_depth = session.execute(pending_query).scalar()
        except Exception:
            queue_depth = None

        stats = {
            "total_insights": total,
            "embedding_queue_pending": queue_depth,
            "status": "active",
        }
        return stats
    except Exception as e:
        return {"total_insights": 0, "status": "error", "error": str(e)}
    finally:
        session.close()