From 78eebfbcfce65b0f8762295382569d1a085efa3a Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 23:10:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E5=91=8A=E8=AD=A6=E5=8E=BB?= =?UTF-8?q?=E9=87=8D=E8=88=87=E6=B4=9E=E5=AF=9F=E5=90=91=E9=87=8F=E5=9B=9E?= =?UTF-8?q?=E8=A3=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run_scheduler.py | 1 + scheduler.py | 10 +++++ services/event_router.py | 45 +++++++++++++++++++++++ services/openclaw_learning_service.py | 35 ++++++++++++++++++ tests/test_ai_insight_embedding_bridge.py | 34 +++++++++++++++++ tests/test_event_router.py | 30 +++++++++++++++ 6 files changed, 155 insertions(+) diff --git a/run_scheduler.py b/run_scheduler.py index c93b1bd..e50e4b7 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -146,6 +146,7 @@ def run_cleanup_agent_context(): event_type="agent_context_cleanup_failure", priority="P2", title="agent_context TTL 清理失敗", + dedup_ttl_sec=3600, ) except Exception as _router_e: logger.error(f"[Cleanup] event_router 失敗: {_router_e}") diff --git a/scheduler.py b/scheduler.py index 8f2735e..14c4dcd 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1644,6 +1644,7 @@ def run_whitepage_check(): event_type="whitepage_check_failure", priority="P1", title="白頁檢查任務異常", + dedup_ttl_sec=1800, ) except Exception as _router_e: logging.error(f"[Whitepage] [Check] event_router 失敗: {_router_e}") @@ -2247,6 +2248,7 @@ def run_db_backup_task(): priority="P1", title="資料庫備份失敗", payload={"filename": result.get("filename")}, + dedup_ttl_sec=3600, ) except Exception as _router_e: logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}") @@ -2277,6 +2279,7 @@ def run_db_backup_task(): event_type="db_backup_failure", priority="P1", title="資料庫備份排程異常", + dedup_ttl_sec=3600, ) except Exception as _router_e: logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}") @@ -2345,6 +2348,7 @@ def run_backup_monitor_task(): priority="P2", title="資料庫備份健康告警", payload={"latest_file": info.get("filename"), "created_at": str(info.get("created_at"))}, + dedup_ttl_sec=21600, ) except Exception as _router_e: logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}") @@ -2382,6 +2386,7 @@ def run_backup_monitor_task(): event_type="backup_monitor_failure", priority="P2", title="資料庫備份監控任務異常", + dedup_ttl_sec=3600, ) except Exception as _router_e: logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}") @@ -2456,6 +2461,11 @@ def run_quality_rescore_task(): try: from services.openclaw_learning_service import run_quality_rescore_batch result = run_quality_rescore_batch() + try: + from services.openclaw_learning_service import enqueue_missing_insight_embeddings + result["embedding_backfill"] = enqueue_missing_insight_embeddings(limit=200) + except Exception as _embed_e: + logging.warning(f"[Scheduler] [Rescore] embedding backfill 略過: {_embed_e}") logging.info( f"[Scheduler] [Rescore] 品質重算完成 | 更新={result.get('updated', 0)} 筆" f" | relearn 自動歸檔={result.get('relearn_reset', 0)} 筆" diff --git a/services/event_router.py b/services/event_router.py index 2342bf6..71eb4b8 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -22,6 +22,9 @@ _QUEUE_PATH = os.getenv( os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"), ) _QUEUE_LOCK = threading.Lock() +_DEDUP_LOCK = threading.Lock() +_EVENT_DEDUP: Dict[str, float] = {} +_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0")) async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: @@ -112,6 +115,29 @@ def _is_event_silenced(event: Dict[str, Any]) -> bool: return False +def _dedup_ttl_sec(event: Dict[str, Any]) -> int: + payload = event.get("payload") if isinstance(event.get("payload"), dict) else {} + raw = event.get("dedup_ttl_sec", payload.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC)) + try: + return max(0, int(raw or 0)) + except (TypeError, ValueError): + return 0 + + +def _is_duplicate_event(event: Dict[str, Any]) -> bool: + ttl = _dedup_ttl_sec(event) + if ttl <= 0: + return False + key = _event_key(event) + now = time.time() + with _DEDUP_LOCK: + until = _EVENT_DEDUP.get(key) + if until and until > now: + return True + _EVENT_DEDUP[key] = now + ttl + return False + + def _queue_failed_delivery( event: Dict[str, Any], tier: str, @@ -197,6 +223,20 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "delivered": True, "silenced": True, "queued": False, + "deduped": False, + } + + if _is_duplicate_event(event): + return { + "tier": tier, + "sent": 0, + "errors": [], + "latency_ms": int((time.perf_counter() - started_at) * 1000), + "payload": {"status": "deduped", "event_key": _event_key(event)}, + "delivered": True, + "silenced": False, + "queued": False, + "deduped": True, } result = await _run_tier_handler(tier, event, session_id) @@ -220,6 +260,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "delivered": send_result["ok"], "silenced": False, "queued": queued, + "deduped": False, } except Exception as e: logger.exception(f"[EventRouter] dispatch failed: {e}") @@ -233,6 +274,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "delivered": False, "silenced": False, "queued": queued, + "deduped": False, } @@ -286,6 +328,7 @@ def notify_failure( title: Optional[str] = None, trace: Optional[str] = None, payload: Optional[Dict[str, Any]] = None, + dedup_ttl_sec: Optional[int] = None, ) -> Dict[str, Any]: """排程/背景任務失敗的同步通知 helper。""" severity = "alert" if priority in {"P1", "P2"} else "warning" @@ -300,6 +343,8 @@ def notify_failure( "trace": trace or "".join(traceback.format_exception(type(error), error, error.__traceback__)), "payload": {"task_name": task_name, **(payload or {})}, } + if dedup_ttl_sec is not None: + event["dedup_ttl_sec"] = dedup_ttl_sec return dispatch_sync(event) diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index 6c3445c..2001491 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -62,6 +62,41 @@ def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str, return _enqueue_embedding("ai_insights", int(insight_id), embed_target_text) +def enqueue_missing_insight_embeddings(limit: int = 200) -> dict: + """Backfill existing ai_insights that have not yet entered the embedding queue.""" + session = get_session() + try: + rows = session.execute( + text(""" + SELECT id, insight_type, period, content + FROM ai_insights i + WHERE i.embedding IS NULL + AND COALESCE(i.status, 'approved') NOT IN ('archived', 'rejected') + AND NOT EXISTS ( + SELECT 1 + FROM embedding_retry_queue q + WHERE q.target_table = 'ai_insights' + AND q.target_id = i.id + AND q.status IN ('pending', 'processing', 'done') + ) + ORDER BY i.created_at DESC + LIMIT :lim + """), + {"lim": limit}, + ).fetchall() + except Exception as exc: + sys_log.warning(f"[OCLearn] embedding backfill 略過 (schema/pgvector 未就緒?): {exc}") + return {"scanned": 0, "enqueued": 0, "status": "skipped", "error": str(exc)[:200]} + finally: + session.close() + + enqueued = 0 + for row in rows: + if enqueue_insight_embedding(row.id, row.insight_type, row.content, row.period): + enqueued += 1 + return {"scanned": len(rows), "enqueued": enqueued, "status": "ok"} + + def _process_one_embedding(row_id: int, target_table: str, target_id: int, text_content: str, model: str) -> bool: """處理單筆 embedding,成功寫回目標表,失敗累加 attempts""" diff --git a/tests/test_ai_insight_embedding_bridge.py b/tests/test_ai_insight_embedding_bridge.py index c824c58..7efc8f4 100644 --- a/tests/test_ai_insight_embedding_bridge.py +++ b/tests/test_ai_insight_embedding_bridge.py @@ -23,3 +23,37 @@ def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch): assert learning.enqueue_insight_embedding(42, "agent_action", "") is False assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False + + +def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch): + from types import SimpleNamespace + import services.openclaw_learning_service as learning + + rows = [ + SimpleNamespace(id=1, insight_type="mcp_cache", period=None, content="市場資料"), + SimpleNamespace(id=2, insight_type="human_review", period="2026-04-29", content="人工審核"), + ] + + class Result: + def fetchall(self): + return rows + + class Session: + def execute(self, *args, **kwargs): + return Result() + + def close(self): + pass + + calls = [] + monkeypatch.setattr(learning, "get_session", lambda: Session()) + monkeypatch.setattr( + learning, + "enqueue_insight_embedding", + lambda insight_id, insight_type, content, period=None: calls.append((insight_id, insight_type, content, period)) or True, + ) + + result = learning.enqueue_missing_insight_embeddings(limit=10) + + assert result == {"scanned": 2, "enqueued": 2, "status": "ok"} + assert calls[0] == (1, "mcp_cache", "市場資料", None) diff --git a/tests/test_event_router.py b/tests/test_event_router.py index 7b3da8b..07a2dee 100644 --- a/tests/test_event_router.py +++ b/tests/test_event_router.py @@ -85,3 +85,33 @@ def test_dispatch_executes_only_auto_safe_actions(monkeypatch): assert calls == [{"task_name": "run_momo_task"}] assert result["payload"]["executed_actions"][0]["status"] == "ok" assert result["payload"]["executed_actions"][1]["status"] == "rejected" + + +def test_dispatch_dedupes_repeated_event(monkeypatch): + import services.event_router as event_router + + event_router._EVENT_DEDUP.clear() + sent = [] + monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) + monkeypatch.setattr( + event_router, + "send_telegram_with_result", + lambda *args, **kwargs: sent.append(True) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}, + ) + + event = { + "source": "Scheduler.BackupMonitor", + "event_type": "backup_monitor_alert", + "severity": "alert", + "title": "備份健康告警", + "summary": "stale backup", + "dedup_ttl_sec": 60, + } + + first = asyncio.run(event_router.dispatch(event)) + second = asyncio.run(event_router.dispatch(event)) + + assert first["deduped"] is False + assert second["deduped"] is True + assert second["sent"] == 0 + assert len(sent) == 1