diff --git a/run_scheduler.py b/run_scheduler.py index 3278c57..9e96ee2 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -105,6 +105,17 @@ def _register_schedules(): schedule.every(12).hours.do(run_dedup_batch_task) logger.info("๐Ÿ“… ๆฏ 12 ๅฐๆ™‚๏ผšdedup_batch") + # Operation Ollama-First v5.0 Phase 11+ โ€” RAG ๅญธ็ฟ’่ฟดๅœˆ worker๏ผˆPhase 12 ๆ”ถๅฐพ๏ผ‰ + # ้ ่จญ RAG_ENABLED=false ๆ™‚๏ผŒlearning_episodes ไธๆœƒๆœ‰่ณ‡ๆ–™๏ผŒworker ่ท‘็ฉบ loop๏ผˆ็„กๅฎณ๏ผ‰ + schedule.every(5).minutes.do(run_promotion_gate_worker) + logger.info("๐Ÿ“… ๆฏ 5 ๅˆ†้˜๏ผšpromotion_gate_worker๏ผˆpending โ†’ promote/reject/await๏ผ‰") + + schedule.every(30).minutes.do(run_awaiting_review_push) + logger.info("๐Ÿ“… ๆฏ 30 ๅˆ†้˜๏ผšawaiting_review_push๏ผˆๆŽจ Telegram ็ญ‰ ๐Ÿ‘/๐Ÿ‘Ž๏ผ‰") + + schedule.every(4).hours.do(run_expire_stale_reviews) + logger.info("๐Ÿ“… ๆฏ 4 ๅฐๆ™‚๏ผšexpire_stale_reviews๏ผˆ24h ็„กๅ›žๆ‡‰้™ๆฌŠ 0.5๏ผ‰") + schedule.every().day.at("03:00").do(run_db_backup_task) logger.info("๐Ÿ“… ๆฏๆ—ฅ 03:00๏ผšdb_backup") @@ -161,8 +172,57 @@ def run_daily_token_report_task(): ) except Exception as e: logger.error(f"[TokenReport] task failed: {e}", exc_info=True) - # ไธๅ†ๅ˜—่ฉฆ event_router๏ผˆ้ฟๅ…ๅพช็’ฐไพ่ณด๏ผ‰๏ผŒ็ด” log ๅณๅฏ - # ็ตฑๅธฅๅฏๅพž scheduler logs ่ง€ๅฏŸๅคฑๆ•— + + +# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ +# Operation Ollama-First v5.0 Phase 11+ โ€” RAG ๅญธ็ฟ’่ฟดๅœˆ worker๏ผˆPhase 12 ๆ”ถๅฐพ๏ผ‰ +# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +def run_promotion_gate_worker(): + """ๆฏ 5 ๅˆ†้˜ โ€” ๆ‰นๆฌก่™•็† learning_episodes pending โ†’ can_promote โ†’ promote/reject/awaitใ€‚ + + ไพ ADR-032 PromotionGate 4 ้šŽๆฎต๏ผŒไธไธปๅ‹•่ท‘ LLM๏ผˆDistiller ็ด”่ฆๅ‰‡ๅผ•ๆ“Ž๏ผ‰ใ€‚ + RAG_ENABLED=false ๆ™‚ learning_episodes ็‚บ็ฉบ๏ผŒworker ่ท‘็ฉบ loop๏ผˆ็„กๅฎณ๏ผ‰ใ€‚ + """ + try: + from services.learning_pipeline import process_pending_episodes + stats = process_pending_episodes() + if stats.get('pending_seen', 0) > 0: + logger.info( + "[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d", + stats['pending_seen'], stats['promoted'], stats['rejected'], + stats['awaiting'], stats['errors'], + ) + except Exception as e: + logger.error(f"[PromotionWorker] task failed: {e}", exc_info=True) + + +def run_awaiting_review_push(): + """ๆฏ 30 ๅˆ†้˜ โ€” ๆŽจ awaiting_review episode ๅˆฐ Telegram ็ญ‰ ๐Ÿ‘/๐Ÿ‘Žใ€‚ + + ้™ๅˆถ๏ผšTELEGRAM_ADMIN_CHAT_ID ๆœช่จญๅ‰‡่ทณ้Ž๏ผˆfail-safe๏ผ‰ใ€‚ + """ + try: + from services.learning_pipeline import push_awaiting_reviews_to_telegram + pushed = push_awaiting_reviews_to_telegram() + if pushed > 0: + logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed) + except Exception as e: + logger.error(f"[AwaitingReviewPush] task failed: {e}", exc_info=True) + + +def run_expire_stale_reviews(): + """ๆฏ 4 ๅฐๆ™‚ โ€” 24h ็„กๅ›žๆ‡‰ awaiting_review โ†’ expired๏ผˆweight=0.5๏ผ‰ใ€‚ + + ไพ ADR-033 ่ญทๆฌ„ #1 Stage 4 ่ฆๅ‰‡ใ€‚ + """ + try: + from services.learning_pipeline import expire_stale_reviews + n = expire_stale_reviews() + if n > 0: + logger.info("[ExpireStale] expired %d awaiting_review episodes (24h timeout)", n) + except Exception as e: + logger.error(f"[ExpireStale] task failed: {e}", exc_info=True) def run_cleanup_agent_context(): diff --git a/services/learning_pipeline.py b/services/learning_pipeline.py index c74cf18..2b60955 100644 --- a/services/learning_pipeline.py +++ b/services/learning_pipeline.py @@ -26,6 +26,7 @@ from __future__ import annotations import hashlib import json import logging +import os import re from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple @@ -724,6 +725,160 @@ def hash_human_approver(username: str) -> str: return hashlib.sha1(username.encode('utf-8')).hexdigest()[:8] +# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ +# Worker ๅ‡ฝๆ•ธ๏ผˆ็ตฆ run_scheduler.py ๆŽ’็จ‹็”จ๏ผ‰โ€” Phase 11+ ๆ”ถๅฐพ +# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +# ้ ่จญๆ‰นๆฌกๅคงๅฐ๏ผšๆฏๆฌก่™•็† N ็ญ† pending๏ผŒ้ฟๅ… worker ไธ€ๆฌก่ท‘ๅคชไน…้˜ปๅกžๆŽ’็จ‹ +PENDING_BATCH_SIZE = int(os.environ.get('PROMOTION_PENDING_BATCH_SIZE', '50')) +AWAITING_REVIEW_PUSH_BATCH = int(os.environ.get('AWAITING_REVIEW_PUSH_BATCH', '5')) + + +def process_pending_episodes(batch_size: int = PENDING_BATCH_SIZE) -> Dict[str, int]: + """ๆ‰นๆฌก่™•็† learning_episodes pending โ†’ can_promote โ†’ promote/reject/await_reviewใ€‚ + + ็ตฆ run_scheduler.py ๆฏ 5 ๅˆ†้˜่ท‘ไธ€ๆฌกใ€‚ + ไพ ADR-032 PromotionGate 4 ้šŽๆฎต๏ผŒๆฏ็ญ†่ตฐๅฎŒๆ•ดๆชขๆŸฅใ€‚ + + Returns: + {'pending_seen': N, 'promoted': X, 'rejected': Y, 'awaiting': Z, 'errors': E} + """ + stats = {'pending_seen': 0, 'promoted': 0, 'rejected': 0, 'awaiting': 0, 'errors': 0} + + try: + from sqlalchemy import text as sa_text + from database.manager import get_session + except Exception as exc: + logger.warning('[PromotionWorker] DB import failed: %s', exc) + return stats + + session = get_session() + try: + rows = session.execute( + sa_text(""" + SELECT id FROM learning_episodes + WHERE promotion_status = 'pending' + ORDER BY created_at ASC + LIMIT :n + """), + {'n': batch_size}, + ).fetchall() + episode_ids = [int(r[0]) for r in rows] + except Exception as exc: + logger.error('[PromotionWorker] SELECT pending failed: %s', exc) + session.close() + return stats + finally: + session.close() + + stats['pending_seen'] = len(episode_ids) + if not episode_ids: + return stats + + for ep_id in episode_ids: + try: + decision = promotion_gate.can_promote(ep_id) + if decision.reason == 'approved': + if promotion_gate.promote(ep_id): + stats['promoted'] += 1 + else: + stats['errors'] += 1 + elif decision.reason == 'awaiting_review': + if promotion_gate.mark_awaiting_review(ep_id): + stats['awaiting'] += 1 + else: + stats['errors'] += 1 + elif decision.reason.startswith('rejected_'): + if promotion_gate.reject(ep_id, decision.reason, decision.detail): + stats['rejected'] += 1 + else: + stats['errors'] += 1 + except Exception as exc: + logger.warning('[PromotionWorker] episode_id=%s failed: %s', ep_id, exc) + stats['errors'] += 1 + + logger.info( + '[PromotionWorker] batch done: pending=%d promoted=%d rejected=%d awaiting=%d errors=%d', + stats['pending_seen'], stats['promoted'], stats['rejected'], + stats['awaiting'], stats['errors'] + ) + return stats + + +def push_awaiting_reviews_to_telegram(batch: int = AWAITING_REVIEW_PUSH_BATCH, + chat_id: Optional[str] = None) -> int: + """ๆ‰พ awaiting_review ไฝ†ๅฐšๆœชๆŽจ้€็š„ episode โ†’ ๆŽจ Telegram ๅธถ ๐Ÿ‘/๐Ÿ‘Ž keyboardใ€‚ + + ็ตฆ run_scheduler.py ๆฏ 30 ๅˆ†้˜่ท‘๏ผˆ่ˆ‡ expire_stale_reviews ้…ๅˆ 24h timeout๏ผ‰ใ€‚ + + ๅˆคๆ–ทใ€ŒๆœชๆŽจ้€ใ€๏ผšreviewed_at IS NULL๏ผˆmark_awaiting_review ๆ™‚ไธ่จญ reviewed_at๏ผ› + 24h expired / human approve/reject ๆ™‚ๆ‰ๅฏซ reviewed_at๏ผ‰ใ€‚ + """ + pushed = 0 + try: + from sqlalchemy import text as sa_text + from database.manager import get_session + except Exception as exc: + logger.warning('[AwaitingReviewPush] DB import failed: %s', exc) + return 0 + + # ๅ– chat_id๏ผˆ้ ่จญ admin๏ผ‰ + if chat_id is None: + chat_id = os.environ.get('TELEGRAM_ADMIN_CHAT_ID', '').strip() or None + if not chat_id: + logger.info('[AwaitingReviewPush] TELEGRAM_ADMIN_CHAT_ID ๆœช่จญ๏ผŒ่ทณ้ŽๆŽจ้€') + return 0 + + session = get_session() + try: + rows = session.execute( + sa_text(""" + SELECT id, distilled_text, weight, quality_score + FROM learning_episodes + WHERE promotion_status = 'awaiting_review' + AND reviewed_at IS NULL + ORDER BY created_at ASC + LIMIT :n + """), + {'n': batch}, + ).fetchall() + except Exception as exc: + logger.error('[AwaitingReviewPush] SELECT failed: %s', exc) + session.close() + return 0 + finally: + session.close() + + if not rows: + return 0 + + # ๆŽจ้€ + try: + from services.telegram_templates import promotion_review_keyboard, _send_telegram_raw + except Exception as exc: + logger.warning('[AwaitingReviewPush] template import failed: %s', exc) + return 0 + + for r in rows: + ep_id, text_, weight, quality = r[0], r[1], float(r[2] or 0), float(r[3] or 0) + msg = ( + f"๐Ÿง  RAG ๅญธ็ฟ’ๆ™‰ๅ‡ๅฏฉๆ ธ\n" + f"โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”\n" + f"๐Ÿ“‹ episode #{ep_id} (weight={weight:.2f} quality={quality:.2f})\n\n" + f"{(text_ or '')[:600]}\n\n" + f"ๅฏฉๆ ธ๏ผš้€š้Ž โ†’ ๅฏซๅ…ฅ ai_insights ไพ› RAG ๆชข็ดข๏ผ›ๆ‹’็ต• โ†’ ๆฐธไธๆ™‰ๅ‡" + ) + try: + _send_telegram_raw(msg, chat_id=chat_id, reply_markup=promotion_review_keyboard(ep_id)) + pushed += 1 + except Exception as exc: + logger.warning('[AwaitingReviewPush] episode_id=%s push failed: %s', ep_id, exc) + + logger.info('[AwaitingReviewPush] pushed %d awaiting_review episodes to chat=%s', + pushed, chat_id) + return pushed + + # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # ๅ…จๅŸŸๅ–ฎไพ‹ # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @@ -742,6 +897,8 @@ __all__ = [ 'learning_pipeline', 'promotion_gate', 'expire_stale_reviews', + 'process_pending_episodes', + 'push_awaiting_reviews_to_telegram', 'hash_human_approver', 'STAGE_1_AUTO_QUALITY', 'STAGE_3_DEDUP_THRESHOLD',