feat(adr-005): 每日去重 03:00 + 品質分數重算 04:00 批次
All checks were successful
CD Pipeline / deploy (push) Successful in 1m8s

openclaw_learning_service.py:
- run_dedup_batch(): 同 SKU/type/period 保留最高 avg_quality,其餘 archived
- run_quality_rescore_batch(): 套時間衰減公式全量重算 avg_quality;
  relearn 狀態額外 -20%;分數 < 0.05 自動歸檔

scheduler.py + run_scheduler.py:
- run_dedup_batch_task()  → 每日 03:00
- run_quality_rescore_task() → 每日 04:00

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-19 11:38:01 +08:00
parent 8c6fe961cb
commit e6109c2ef8
3 changed files with 155 additions and 0 deletions

View File

@@ -48,6 +48,8 @@ def main():
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_dedup_batch_task,
run_quality_rescore_task,
)
logger.info("✅ 排程任務模組載入成功")
except ImportError as e:
@@ -91,6 +93,12 @@ def main():
schedule.every(6).hours.do(run_backup_monitor_task)
logger.info("📅 已設定:每 6 小時執行備份健康監控AI Agent 跟進)")
schedule.every().day.at("03:00").do(run_dedup_batch_task)
logger.info("📅 已設定:每日 03:00 執行 ai_insights 去重批次")
schedule.every().day.at("04:00").do(run_quality_rescore_task)
logger.info("📅 已設定:每日 04:00 執行 ai_insights 品質分數時間衰減重算")
logger.info("=" * 60)
logger.info("✅ 排程器已啟動,等待任務執行...")
logger.info("=" * 60)

View File

@@ -1805,6 +1805,36 @@ def run_backup_monitor_task():
_save_stats('backup_monitor', {"status": "Error", "error": str(e)})
def run_dedup_batch_task():
    """Scheduled job (daily 03:00): run the ai_insights de-duplication batch.

    Calls ``run_dedup_batch`` from ``services.openclaw_learning_service``,
    logs the archived / scanned counts and stores the outcome through
    ``_save_stats`` under the ``'dedup_batch'`` key.

    ``run_dedup_batch`` reports its own failures by returning a dict that
    carries an ``"error"`` key instead of raising, so that case is checked
    explicitly and recorded as an error rather than logged as a successful
    run.

    Any ``Exception`` raised along the way is logged and saved as
    ``{"status": "Error", "error": ...}``; it is not re-raised.
    """
    try:
        # Imported inside the try so that an import failure is handled the
        # same way as any other batch failure.
        from services.openclaw_learning_service import run_dedup_batch

        result = run_dedup_batch()

        error = result.get("error")
        if error:
            # The service rolled back and returned zero counts plus "error";
            # surface it as a failure and keep the original keys in the stats.
            logging.error(f"[Scheduler] [Dedup] 去重批次異常: {error}")
            _save_stats('dedup_batch', {"status": "Error", **result})
            return

        logging.info(
            f"[Scheduler] [Dedup] 去重完成 | 歸檔={result.get('archived', 0)}"
            f" / 掃描={result.get('scanned', 0)}"
        )
        _save_stats('dedup_batch', result)
    except Exception as e:
        logging.error(f"[Scheduler] [Dedup] 去重批次異常: {e}")
        _save_stats('dedup_batch', {"status": "Error", "error": str(e)})
def run_quality_rescore_task():
    """Scheduled job (daily 04:00): re-score ai_insights with time decay.

    Calls ``run_quality_rescore_batch`` from
    ``services.openclaw_learning_service``, logs the number of updated rows
    and the ``relearn_reset`` count, and stores the outcome through
    ``_save_stats`` under the ``'quality_rescore'`` key.

    ``run_quality_rescore_batch`` reports its own failures by returning a
    dict that carries an ``"error"`` key instead of raising, so that case is
    checked explicitly and recorded as an error rather than logged as a
    successful run.

    Any ``Exception`` raised along the way is logged and saved as
    ``{"status": "Error", "error": ...}``; it is not re-raised.
    """
    try:
        # Imported inside the try so that an import failure is handled the
        # same way as any other batch failure.
        from services.openclaw_learning_service import run_quality_rescore_batch

        result = run_quality_rescore_batch()

        error = result.get("error")
        if error:
            # The service rolled back and returned zero counts plus "error";
            # surface it as a failure and keep the original keys in the stats.
            logging.error(f"[Scheduler] [Rescore] 品質分數重算異常: {error}")
            _save_stats('quality_rescore', {"status": "Error", **result})
            return

        logging.info(
            f"[Scheduler] [Rescore] 品質重算完成 | 更新={result.get('updated', 0)}"
            f" | relearn 自動歸檔={result.get('relearn_reset', 0)}"
        )
        _save_stats('quality_rescore', result)
    except Exception as e:
        logging.error(f"[Scheduler] [Rescore] 品質分數重算異常: {e}")
        _save_stats('quality_rescore', {"status": "Error", "error": str(e)})
if __name__ == "__main__":
# 此檔案現在由 app.py 導入並由其主執行緒管理排程。
# 若需獨立測試,可在此處臨時加入調用程式碼。

View File

@@ -391,3 +391,120 @@ def get_learning_stats() -> dict:
return {"total_insights": 0, "status": "error", "error": str(e)}
finally:
session.close()
# =====================================================================
# ADR-005 Step-2: daily batch maintenance (03:00 de-duplication / 04:00 quality-score rescoring)
# =====================================================================
def run_dedup_batch() -> dict:
    """Archive duplicate ai_insights rows (03:00 batch).

    Rows are duplicates when they share insight_type, product_sku and period.
    Rows whose status is 'archived' or 'relearn', and rows with a NULL
    product_sku, are left out of the scan entirely.

    Within each duplicated group the row with the highest avg_quality is kept
    (NULL scores sort last, ties go to the larger id); every other row of the
    group gets status='archived' and a refreshed updated_at.

    Returns:
        {"archived": int, "scanned": int} on success, where "scanned" is the
        total number of rows belonging to duplicated groups.
        On any exception the transaction is rolled back and
        {"archived": 0, "scanned": 0, "error": str} is returned.
    """
    session = get_session()
    archived_total = 0
    scanned_total = 0
    try:
        find_groups = text("""
            SELECT insight_type, product_sku, period, COUNT(*) AS cnt
            FROM ai_insights
            WHERE status NOT IN ('archived', 'relearn')
            AND product_sku IS NOT NULL
            GROUP BY insight_type, product_sku, period
            HAVING COUNT(*) > 1
        """)
        # IS NOT DISTINCT FROM lets a NULL period match the NULL group key.
        pick_keeper = text("""
            SELECT id FROM ai_insights
            WHERE insight_type = :t AND product_sku = :sku
            AND period IS NOT DISTINCT FROM :p
            AND status NOT IN ('archived', 'relearn')
            ORDER BY avg_quality DESC NULLS LAST, id DESC
            LIMIT 1
        """)
        archive_others = text("""
            UPDATE ai_insights
            SET status = 'archived', updated_at = CURRENT_TIMESTAMP
            WHERE insight_type = :t AND product_sku = :sku
            AND period IS NOT DISTINCT FROM :p
            AND status NOT IN ('archived', 'relearn')
            AND id != :keep_id
        """)

        groups = session.execute(find_groups).fetchall()
        for insight_type, sku, period, group_size in groups:
            scanned_total += group_size
            group_key = {"t": insight_type, "sku": sku, "p": period}

            keeper_id = session.execute(pick_keeper, group_key).scalar()
            if keeper_id is None:
                # Nothing eligible is left in this group; skip it.
                continue

            outcome = session.execute(
                archive_others, {**group_key, "keep_id": keeper_id}
            )
            archived_total += outcome.rowcount

        # One commit for the whole batch: either every group is applied or none.
        session.commit()
        sys_log.info(f"[OCLearn] 去重批次完成:掃描 {scanned_total} 筆,歸檔 {archived_total}")
        return {"archived": archived_total, "scanned": scanned_total}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 去重批次失敗: {e}")
        return {"archived": 0, "scanned": 0, "error": str(e)}
    finally:
        session.close()
def run_quality_rescore_batch() -> dict:
    """Re-score ai_insights rows with the time-decay formula (04:00 batch).

    Every row whose status is 'approved' or 'relearn' is processed:

    * rows with a truthy decay_exempt flag are skipped untouched;
    * a missing (NULL) avg_quality is treated as 0.5 — a stored 0.0 is a
      real score and is kept as-is;
    * a naive created_at is interpreted as UTC;
    * the new score comes from ``compute_effective_score(base, created_at)``;
      'relearn' rows are additionally multiplied by 0.8;
    * the score is written back to avg_quality rounded to 4 decimals;
    * rows whose (unrounded) new score is below 0.05 are set to
      status='archived'.

    Returns:
        {"updated": int, "relearn_reset": int} on success, where
        "relearn_reset" counts the 'relearn' rows archived by this run.
        On any exception the transaction is rolled back and
        {"updated": 0, "relearn_reset": 0, "error": str} is returned.
    """
    session = get_session()
    updated = 0
    relearn_reset = 0
    try:
        rows = session.execute(text("""
            SELECT id, avg_quality, created_at, decay_exempt, status
            FROM ai_insights
            WHERE status IN ('approved', 'relearn')
        """)).fetchall()

        for row_id, base_q, created_at, exempt, status in rows:
            if exempt:
                continue

            # Default only when the score is missing. `base_q or 0.5` would
            # also lift a genuine 0.0 to 0.5 and keep a zero-quality insight
            # from ever reaching the archive threshold below.
            if base_q is None:
                base_q = 0.5

            if created_at and created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)

            # NOTE(review): avg_quality is both the input and the output of
            # this batch, so each daily run feeds the previous run's result
            # back into compute_effective_score. If that helper applies decay
            # for the full age since created_at, the decay (and the relearn
            # penalty) would compound day over day — confirm this is intended.
            new_score = compute_effective_score(base_q, created_at)
            if status == "relearn":
                new_score *= 0.8

            session.execute(text("""
                UPDATE ai_insights
                SET avg_quality = :q, updated_at = CURRENT_TIMESTAMP
                WHERE id = :id
            """), {"q": round(new_score, 4), "id": row_id})
            updated += 1

            if new_score < 0.05:
                session.execute(text(
                    "UPDATE ai_insights SET status = 'archived' WHERE id = :id"
                ), {"id": row_id})
                if status == "relearn":
                    relearn_reset += 1

        # One commit for the whole batch: either every row is applied or none.
        session.commit()
        sys_log.info(
            f"[OCLearn] 品質分數重算完成:更新 {updated} 筆,"
            f"低分自動歸檔含 relearn {relearn_reset}"
        )
        return {"updated": updated, "relearn_reset": relearn_reset}
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] 品質分數重算批次失敗: {e}")
        return {"updated": 0, "relearn_reset": 0, "error": str(e)}
    finally:
        session.close()