From e6109c2ef8a8477481032a22088430dbbb25ce34 Mon Sep 17 00:00:00 2001 From: ogt Date: Sun, 19 Apr 2026 11:38:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(adr-005):=20=E6=AF=8F=E6=97=A5=E5=8E=BB?= =?UTF-8?q?=E9=87=8D=2003:00=20+=20=E5=93=81=E8=B3=AA=E5=88=86=E6=95=B8?= =?UTF-8?q?=E9=87=8D=E7=AE=97=2004:00=20=E6=89=B9=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit openclaw_learning_service.py: - run_dedup_batch(): 同 SKU/type/period 保留最高 avg_quality,其餘 archived - run_quality_rescore_batch(): 套時間衰減公式全量重算 avg_quality; relearn 狀態額外 -20%;分數 < 0.05 自動歸檔 scheduler.py + run_scheduler.py: - run_dedup_batch_task() → 每日 03:00 - run_quality_rescore_task() → 每日 04:00 Co-Authored-By: Claude Sonnet 4.6 --- run_scheduler.py | 8 ++ scheduler.py | 30 +++++++ services/openclaw_learning_service.py | 117 ++++++++++++++++++++++++++ 3 files changed, 155 insertions(+) diff --git a/run_scheduler.py b/run_scheduler.py index 50a3964..57680ad 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -48,6 +48,8 @@ def main(): run_weekly_strategy_task, run_db_backup_task, run_backup_monitor_task, + run_dedup_batch_task, + run_quality_rescore_task, ) logger.info("✅ 排程任務模組載入成功") except ImportError as e: @@ -91,6 +93,12 @@ def main(): schedule.every(6).hours.do(run_backup_monitor_task) logger.info("📅 已設定:每 6 小時執行備份健康監控(AI Agent 跟進)") + schedule.every().day.at("03:00").do(run_dedup_batch_task) + logger.info("📅 已設定:每日 03:00 執行 ai_insights 去重批次") + + schedule.every().day.at("04:00").do(run_quality_rescore_task) + logger.info("📅 已設定:每日 04:00 執行 ai_insights 品質分數時間衰減重算") + logger.info("=" * 60) logger.info("✅ 排程器已啟動,等待任務執行...") logger.info("=" * 60) diff --git a/scheduler.py b/scheduler.py index c4826ce..a66e026 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1805,6 +1805,36 @@ def run_backup_monitor_task(): _save_stats('backup_monitor', {"status": "Error", "error": str(e)}) +def run_dedup_batch_task(): + """每日 03:00 — ai_insights 去重批次(同 SKU 同 type 同 period 保留最高品質)""" + try: + from services.openclaw_learning_service import run_dedup_batch + result = run_dedup_batch() + logging.info( + f"[Scheduler] [Dedup] 去重完成 | 歸檔={result.get('archived', 0)} 筆" + f" / 掃描={result.get('scanned', 0)} 筆" + ) + _save_stats('dedup_batch', result) + except Exception as e: + logging.error(f"[Scheduler] [Dedup] 去重批次異常: {e}") + _save_stats('dedup_batch', {"status": "Error", "error": str(e)}) + + +def run_quality_rescore_task(): + """每日 04:00 — ai_insights 品質分數時間衰減重算批次""" + try: + from services.openclaw_learning_service import run_quality_rescore_batch + result = run_quality_rescore_batch() + logging.info( + f"[Scheduler] [Rescore] 品質重算完成 | 更新={result.get('updated', 0)} 筆" + f" | relearn 自動歸檔={result.get('relearn_reset', 0)} 筆" + ) + _save_stats('quality_rescore', result) + except Exception as e: + logging.error(f"[Scheduler] [Rescore] 品質分數重算異常: {e}") + _save_stats('quality_rescore', {"status": "Error", "error": str(e)}) + + if __name__ == "__main__": # 此檔案現在由 app.py 導入並由其主執行緒管理排程。 # 若需獨立測試,可在此處臨時加入調用程式碼。 diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index f5d52f0..6490c89 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -391,3 +391,120 @@ def get_learning_stats() -> dict: return {"total_insights": 0, "status": "error", "error": str(e)} finally: session.close() + + +# ===================================================================== +# ADR-005 Step-2: 每日批次維護(03:00 去重 / 04:00 品質分數重算) +# ===================================================================== + +def run_dedup_batch() -> dict: + """ + 03:00 批次:去除同日同 SKU 同 insight_type 的重複洞察。 + 策略:保留 avg_quality 最高者(tie-break: 最新 id),其餘設 status='archived'。 + 回傳 {"archived": int, "scanned": int} + """ + session = get_session() + archived = 0 + scanned = 0 + try: + dupes = session.execute(text(""" + SELECT insight_type, product_sku, period, COUNT(*) AS cnt + FROM ai_insights + WHERE status NOT IN ('archived', 'relearn') + AND product_sku IS NOT NULL + GROUP BY insight_type, product_sku, period + HAVING COUNT(*) > 1 + """)).fetchall() + + for row in dupes: + scanned += row[3] + keep = session.execute(text(""" + SELECT id FROM ai_insights + WHERE insight_type = :t AND product_sku = :sku + AND period IS NOT DISTINCT FROM :p + AND status NOT IN ('archived', 'relearn') + ORDER BY avg_quality DESC NULLS LAST, id DESC + LIMIT 1 + """), {"t": row[0], "sku": row[1], "p": row[2]}).scalar() + + if keep is None: + continue + + result = session.execute(text(""" + UPDATE ai_insights + SET status = 'archived', updated_at = CURRENT_TIMESTAMP + WHERE insight_type = :t AND product_sku = :sku + AND period IS NOT DISTINCT FROM :p + AND status NOT IN ('archived', 'relearn') + AND id != :keep_id + """), {"t": row[0], "sku": row[1], "p": row[2], "keep_id": keep}) + archived += result.rowcount + + session.commit() + sys_log.info(f"[OCLearn] 去重批次完成:掃描 {scanned} 筆,歸檔 {archived} 筆") + return {"archived": archived, "scanned": scanned} + except Exception as e: + session.rollback() + sys_log.error(f"[OCLearn] 去重批次失敗: {e}") + return {"archived": 0, "scanned": 0, "error": str(e)} + finally: + session.close() + + +def run_quality_rescore_batch() -> dict: + """ + 04:00 批次:對所有 approved/relearn 狀態的 ai_insights 套用時間衰減公式。 + decay_exempt=True 的記錄跳過衰減(永久知識)。 + relearn 狀態額外懲罰 20%。分數 < 0.05 自動歸檔。 + 回傳 {"updated": int, "relearn_reset": int} + """ + session = get_session() + updated = 0 + relearn_reset = 0 + try: + rows = session.execute(text(""" + SELECT id, avg_quality, created_at, decay_exempt, status + FROM ai_insights + WHERE status IN ('approved', 'relearn') + """)).fetchall() + + for row in rows: + row_id, base_q, created_at, exempt, status = row + base_q = base_q or 0.5 + + if exempt: + continue + + if created_at and created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + + new_score = compute_effective_score(base_q, created_at) + if status == "relearn": + new_score *= 0.8 + + session.execute(text(""" + UPDATE ai_insights + SET avg_quality = :q, updated_at = CURRENT_TIMESTAMP + WHERE id = :id + """), {"q": round(new_score, 4), "id": row_id}) + updated += 1 + + if new_score < 0.05: + session.execute(text( + "UPDATE ai_insights SET status = 'archived' WHERE id = :id" + ), {"id": row_id}) + if status == "relearn": + relearn_reset += 1 + + session.commit() + sys_log.info( + f"[OCLearn] 品質分數重算完成:更新 {updated} 筆," + f"低分自動歸檔含 relearn {relearn_reset} 筆" + ) + return {"updated": updated, "relearn_reset": relearn_reset} + except Exception as e: + session.rollback() + sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}") + return {"updated": 0, "relearn_reset": 0, "error": str(e)} + finally: + session.close()