From c2124dce00aacb5cee34fc879d9102c6cb94d8ff Mon Sep 17 00:00:00 2001 From: OoO Date: Mon, 4 May 2026 09:11:27 +0800 Subject: [PATCH] =?UTF-8?q?feat(p11+):=20RAG=20worker=20cron=20=E2=80=94?= =?UTF-8?q?=20promotion=5Fgate=20/=20awaiting=5Freview=20/=20expire?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operation Ollama-First v5.0 / Phase 11+ 收尾(ADR-032/033 落地) services/learning_pipeline.py 新增 2 個 worker 函數: - process_pending_episodes(batch=50) — 批次處理 pending → can_promote → promote/reject/await 純規則引擎,不跑 LLM(Distiller 純 Hermes 規則) - push_awaiting_reviews_to_telegram(batch=5) — 推 Stage 4 awaiting_review 到 Telegram TELEGRAM_ADMIN_CHAT_ID 未設則跳過(fail-safe) 訊息含 episode_id + weight + quality + 600 字截斷文,附 promotion_review_keyboard 👍/👎 run_scheduler.py 加 3 個 cron + 對應 task wrapper: - 每 5 分鐘 → run_promotion_gate_worker - 每 30 分鐘 → run_awaiting_review_push - 每 4 小時 → run_expire_stale_reviews(24h 無回應 → weight=0.5) 設計安全保證: - RAG_ENABLED=false 時 learning_episodes 為空,3 個 worker 跑空 loop(無害) - 所有 worker 例外完全吞掉,僅 log error,不影響其他排程 - promote 成功才回 stats['promoted']++,DB 失敗計 errors 完整 RAG 自主學習迴圈閉環: LLM 結果 → Distiller → learning_episodes (pending) ↓ 每 5 分鐘 worker PromotionGate 4 階段 ↓ approved → 寫 ai_insights → RAG 可檢索 ↓ awaiting_review → 每 30 分鐘推 Telegram ↓ 24h 無回應 → 每 4h expire → weight=0.5 ↓ 👍 callback → promote → ai_insights ↓ 👎 callback → rejected_human → 永不晉升 仍待 Phase 12+ 完成: - learning_episodes.embedding 寫入路徑(Stage 3 dedup 解鎖) - RAG_ENABLED=true 灰度啟用條件(需 100+ episodes + ANTHROPIC_API_KEY) regression: 70 unit tests 全綠 Co-Authored-By: Claude Opus 4.7 (1M context) --- run_scheduler.py | 64 +++++++++++++- services/learning_pipeline.py | 157 ++++++++++++++++++++++++++++++++++ 2 files changed, 219 insertions(+), 2 deletions(-) diff --git a/run_scheduler.py b/run_scheduler.py index 3278c57..9e96ee2 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -105,6 +105,17 @@ def _register_schedules(): schedule.every(12).hours.do(run_dedup_batch_task) logger.info("📅 每 12 小時:dedup_batch") + # Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 worker(Phase 12 收尾) + # 預設 RAG_ENABLED=false 時,learning_episodes 不會有資料,worker 跑空 loop(無害) + schedule.every(5).minutes.do(run_promotion_gate_worker) + logger.info("📅 每 5 分鐘:promotion_gate_worker(pending → promote/reject/await)") + + schedule.every(30).minutes.do(run_awaiting_review_push) + logger.info("📅 每 30 分鐘:awaiting_review_push(推 Telegram 等 👍/👎)") + + schedule.every(4).hours.do(run_expire_stale_reviews) + logger.info("📅 每 4 小時:expire_stale_reviews(24h 無回應降權 0.5)") + schedule.every().day.at("03:00").do(run_db_backup_task) logger.info("📅 每日 03:00:db_backup") @@ -161,8 +172,57 @@ def run_daily_token_report_task(): ) except Exception as e: logger.error(f"[TokenReport] task failed: {e}", exc_info=True) - # 不再嘗試 event_router(避免循環依賴),純 log 即可 - # 統帥可從 scheduler logs 觀察失敗 + + +# ───────────────────────────────────────────────────────────────────────────── +# Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 worker(Phase 12 收尾) +# ───────────────────────────────────────────────────────────────────────────── + +def run_promotion_gate_worker(): + """每 5 分鐘 — 批次處理 learning_episodes pending → can_promote → promote/reject/await。 + + 依 ADR-032 PromotionGate 4 階段,不主動跑 LLM(Distiller 純規則引擎)。 + RAG_ENABLED=false 時 learning_episodes 為空,worker 跑空 loop(無害)。 + """ + try: + from services.learning_pipeline import process_pending_episodes + stats = process_pending_episodes() + if stats.get('pending_seen', 0) > 0: + logger.info( + "[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d", + stats['pending_seen'], stats['promoted'], stats['rejected'], + stats['awaiting'], stats['errors'], + ) + except Exception as e: + logger.error(f"[PromotionWorker] task failed: {e}", exc_info=True) + + +def run_awaiting_review_push(): + """每 30 分鐘 — 推 awaiting_review episode 到 Telegram 等 👍/👎。 + + 限制:TELEGRAM_ADMIN_CHAT_ID 未設則跳過(fail-safe)。 + """ + try: + from services.learning_pipeline import push_awaiting_reviews_to_telegram + pushed = push_awaiting_reviews_to_telegram() + if pushed > 0: + logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed) + except Exception as e: + logger.error(f"[AwaitingReviewPush] task failed: {e}", exc_info=True) + + +def run_expire_stale_reviews(): + """每 4 小時 — 24h 無回應 awaiting_review → expired(weight=0.5)。 + + 依 ADR-033 護欄 #1 Stage 4 規則。 + """ + try: + from services.learning_pipeline import expire_stale_reviews + n = expire_stale_reviews() + if n > 0: + logger.info("[ExpireStale] expired %d awaiting_review episodes (24h timeout)", n) + except Exception as e: + logger.error(f"[ExpireStale] task failed: {e}", exc_info=True) def run_cleanup_agent_context(): diff --git a/services/learning_pipeline.py b/services/learning_pipeline.py index c74cf18..2b60955 100644 --- a/services/learning_pipeline.py +++ b/services/learning_pipeline.py @@ -26,6 +26,7 @@ from __future__ import annotations import hashlib import json import logging +import os import re from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple @@ -724,6 +725,160 @@ def hash_human_approver(username: str) -> str: return hashlib.sha1(username.encode('utf-8')).hexdigest()[:8] +# ───────────────────────────────────────────────────────────────────────────── +# Worker 函數(給 run_scheduler.py 排程用)— Phase 11+ 收尾 +# ───────────────────────────────────────────────────────────────────────────── + +# 預設批次大小:每次處理 N 筆 pending,避免 worker 一次跑太久阻塞排程 +PENDING_BATCH_SIZE = int(os.environ.get('PROMOTION_PENDING_BATCH_SIZE', '50')) +AWAITING_REVIEW_PUSH_BATCH = int(os.environ.get('AWAITING_REVIEW_PUSH_BATCH', '5')) + + +def process_pending_episodes(batch_size: int = PENDING_BATCH_SIZE) -> Dict[str, int]: + """批次處理 learning_episodes pending → can_promote → promote/reject/await_review。 + + 給 run_scheduler.py 每 5 分鐘跑一次。 + 依 ADR-032 PromotionGate 4 階段,每筆走完整檢查。 + + Returns: + {'pending_seen': N, 'promoted': X, 'rejected': Y, 'awaiting': Z, 'errors': E} + """ + stats = {'pending_seen': 0, 'promoted': 0, 'rejected': 0, 'awaiting': 0, 'errors': 0} + + try: + from sqlalchemy import text as sa_text + from database.manager import get_session + except Exception as exc: + logger.warning('[PromotionWorker] DB import failed: %s', exc) + return stats + + session = get_session() + try: + rows = session.execute( + sa_text(""" + SELECT id FROM learning_episodes + WHERE promotion_status = 'pending' + ORDER BY created_at ASC + LIMIT :n + """), + {'n': batch_size}, + ).fetchall() + episode_ids = [int(r[0]) for r in rows] + except Exception as exc: + logger.error('[PromotionWorker] SELECT pending failed: %s', exc) + session.close() + return stats + finally: + session.close() + + stats['pending_seen'] = len(episode_ids) + if not episode_ids: + return stats + + for ep_id in episode_ids: + try: + decision = promotion_gate.can_promote(ep_id) + if decision.reason == 'approved': + if promotion_gate.promote(ep_id): + stats['promoted'] += 1 + else: + stats['errors'] += 1 + elif decision.reason == 'awaiting_review': + if promotion_gate.mark_awaiting_review(ep_id): + stats['awaiting'] += 1 + else: + stats['errors'] += 1 + elif decision.reason.startswith('rejected_'): + if promotion_gate.reject(ep_id, decision.reason, decision.detail): + stats['rejected'] += 1 + else: + stats['errors'] += 1 + except Exception as exc: + logger.warning('[PromotionWorker] episode_id=%s failed: %s', ep_id, exc) + stats['errors'] += 1 + + logger.info( + '[PromotionWorker] batch done: pending=%d promoted=%d rejected=%d awaiting=%d errors=%d', + stats['pending_seen'], stats['promoted'], stats['rejected'], + stats['awaiting'], stats['errors'] + ) + return stats + + +def push_awaiting_reviews_to_telegram(batch: int = AWAITING_REVIEW_PUSH_BATCH, + chat_id: Optional[str] = None) -> int: + """找 awaiting_review 但尚未推送的 episode → 推 Telegram 帶 👍/👎 keyboard。 + + 給 run_scheduler.py 每 30 分鐘跑(與 expire_stale_reviews 配合 24h timeout)。 + + 判斷「未推送」:reviewed_at IS NULL(mark_awaiting_review 時不設 reviewed_at; + 24h expired / human approve/reject 時才寫 reviewed_at)。 + """ + pushed = 0 + try: + from sqlalchemy import text as sa_text + from database.manager import get_session + except Exception as exc: + logger.warning('[AwaitingReviewPush] DB import failed: %s', exc) + return 0 + + # 取 chat_id(預設 admin) + if chat_id is None: + chat_id = os.environ.get('TELEGRAM_ADMIN_CHAT_ID', '').strip() or None + if not chat_id: + logger.info('[AwaitingReviewPush] TELEGRAM_ADMIN_CHAT_ID 未設,跳過推送') + return 0 + + session = get_session() + try: + rows = session.execute( + sa_text(""" + SELECT id, distilled_text, weight, quality_score + FROM learning_episodes + WHERE promotion_status = 'awaiting_review' + AND reviewed_at IS NULL + ORDER BY created_at ASC + LIMIT :n + """), + {'n': batch}, + ).fetchall() + except Exception as exc: + logger.error('[AwaitingReviewPush] SELECT failed: %s', exc) + session.close() + return 0 + finally: + session.close() + + if not rows: + return 0 + + # 推送 + try: + from services.telegram_templates import promotion_review_keyboard, _send_telegram_raw + except Exception as exc: + logger.warning('[AwaitingReviewPush] template import failed: %s', exc) + return 0 + + for r in rows: + ep_id, text_, weight, quality = r[0], r[1], float(r[2] or 0), float(r[3] or 0) + msg = ( + f"🧠 RAG 學習晉升審核\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"📋 episode #{ep_id} (weight={weight:.2f} quality={quality:.2f})\n\n" + f"{(text_ or '')[:600]}\n\n" + f"審核:通過 → 寫入 ai_insights 供 RAG 檢索;拒絕 → 永不晉升" + ) + try: + _send_telegram_raw(msg, chat_id=chat_id, reply_markup=promotion_review_keyboard(ep_id)) + pushed += 1 + except Exception as exc: + logger.warning('[AwaitingReviewPush] episode_id=%s push failed: %s', ep_id, exc) + + logger.info('[AwaitingReviewPush] pushed %d awaiting_review episodes to chat=%s', + pushed, chat_id) + return pushed + + # ───────────────────────────────────────────────────────────────────────────── # 全域單例 # ───────────────────────────────────────────────────────────────────────────── @@ -742,6 +897,8 @@ __all__ = [ 'learning_pipeline', 'promotion_gate', 'expire_stale_reviews', + 'process_pending_episodes', + 'push_awaiting_reviews_to_telegram', 'hash_human_approver', 'STAGE_1_AUTO_QUALITY', 'STAGE_3_DEDUP_THRESHOLD',