This commit is contained in:
@@ -146,6 +146,7 @@ def run_cleanup_agent_context():
|
||||
event_type="agent_context_cleanup_failure",
|
||||
priority="P2",
|
||||
title="agent_context TTL 清理失敗",
|
||||
dedup_ttl_sec=3600,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logger.error(f"[Cleanup] event_router 失敗: {_router_e}")
|
||||
|
||||
10
scheduler.py
10
scheduler.py
@@ -1644,6 +1644,7 @@ def run_whitepage_check():
|
||||
event_type="whitepage_check_failure",
|
||||
priority="P1",
|
||||
title="白頁檢查任務異常",
|
||||
dedup_ttl_sec=1800,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Whitepage] [Check] event_router 失敗: {_router_e}")
|
||||
@@ -2247,6 +2248,7 @@ def run_db_backup_task():
|
||||
priority="P1",
|
||||
title="資料庫備份失敗",
|
||||
payload={"filename": result.get("filename")},
|
||||
dedup_ttl_sec=3600,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}")
|
||||
@@ -2277,6 +2279,7 @@ def run_db_backup_task():
|
||||
event_type="db_backup_failure",
|
||||
priority="P1",
|
||||
title="資料庫備份排程異常",
|
||||
dedup_ttl_sec=3600,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}")
|
||||
@@ -2345,6 +2348,7 @@ def run_backup_monitor_task():
|
||||
priority="P2",
|
||||
title="資料庫備份健康告警",
|
||||
payload={"latest_file": info.get("filename"), "created_at": str(info.get("created_at"))},
|
||||
dedup_ttl_sec=21600,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}")
|
||||
@@ -2382,6 +2386,7 @@ def run_backup_monitor_task():
|
||||
event_type="backup_monitor_failure",
|
||||
priority="P2",
|
||||
title="資料庫備份監控任務異常",
|
||||
dedup_ttl_sec=3600,
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}")
|
||||
@@ -2456,6 +2461,11 @@ def run_quality_rescore_task():
|
||||
try:
|
||||
from services.openclaw_learning_service import run_quality_rescore_batch
|
||||
result = run_quality_rescore_batch()
|
||||
try:
|
||||
from services.openclaw_learning_service import enqueue_missing_insight_embeddings
|
||||
result["embedding_backfill"] = enqueue_missing_insight_embeddings(limit=200)
|
||||
except Exception as _embed_e:
|
||||
logging.warning(f"[Scheduler] [Rescore] embedding backfill 略過: {_embed_e}")
|
||||
logging.info(
|
||||
f"[Scheduler] [Rescore] 品質重算完成 | 更新={result.get('updated', 0)} 筆"
|
||||
f" | relearn 自動歸檔={result.get('relearn_reset', 0)} 筆"
|
||||
|
||||
@@ -22,6 +22,9 @@ _QUEUE_PATH = os.getenv(
|
||||
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
|
||||
)
|
||||
_QUEUE_LOCK = threading.Lock()
|
||||
_DEDUP_LOCK = threading.Lock()
|
||||
_EVENT_DEDUP: Dict[str, float] = {}
|
||||
_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
|
||||
|
||||
|
||||
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
|
||||
@@ -112,6 +115,29 @@ def _is_event_silenced(event: Dict[str, Any]) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _dedup_ttl_sec(event: Dict[str, Any]) -> int:
|
||||
payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
|
||||
raw = event.get("dedup_ttl_sec", payload.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC))
|
||||
try:
|
||||
return max(0, int(raw or 0))
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
|
||||
def _is_duplicate_event(event: Dict[str, Any]) -> bool:
|
||||
ttl = _dedup_ttl_sec(event)
|
||||
if ttl <= 0:
|
||||
return False
|
||||
key = _event_key(event)
|
||||
now = time.time()
|
||||
with _DEDUP_LOCK:
|
||||
until = _EVENT_DEDUP.get(key)
|
||||
if until and until > now:
|
||||
return True
|
||||
_EVENT_DEDUP[key] = now + ttl
|
||||
return False
|
||||
|
||||
|
||||
def _queue_failed_delivery(
|
||||
event: Dict[str, Any],
|
||||
tier: str,
|
||||
@@ -197,6 +223,20 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
|
||||
"delivered": True,
|
||||
"silenced": True,
|
||||
"queued": False,
|
||||
"deduped": False,
|
||||
}
|
||||
|
||||
if _is_duplicate_event(event):
|
||||
return {
|
||||
"tier": tier,
|
||||
"sent": 0,
|
||||
"errors": [],
|
||||
"latency_ms": int((time.perf_counter() - started_at) * 1000),
|
||||
"payload": {"status": "deduped", "event_key": _event_key(event)},
|
||||
"delivered": True,
|
||||
"silenced": False,
|
||||
"queued": False,
|
||||
"deduped": True,
|
||||
}
|
||||
|
||||
result = await _run_tier_handler(tier, event, session_id)
|
||||
@@ -220,6 +260,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
|
||||
"delivered": send_result["ok"],
|
||||
"silenced": False,
|
||||
"queued": queued,
|
||||
"deduped": False,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.exception(f"[EventRouter] dispatch failed: {e}")
|
||||
@@ -233,6 +274,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
|
||||
"delivered": False,
|
||||
"silenced": False,
|
||||
"queued": queued,
|
||||
"deduped": False,
|
||||
}
|
||||
|
||||
|
||||
@@ -286,6 +328,7 @@ def notify_failure(
|
||||
title: Optional[str] = None,
|
||||
trace: Optional[str] = None,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
dedup_ttl_sec: Optional[int] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""排程/背景任務失敗的同步通知 helper。"""
|
||||
severity = "alert" if priority in {"P1", "P2"} else "warning"
|
||||
@@ -300,6 +343,8 @@ def notify_failure(
|
||||
"trace": trace or "".join(traceback.format_exception(type(error), error, error.__traceback__)),
|
||||
"payload": {"task_name": task_name, **(payload or {})},
|
||||
}
|
||||
if dedup_ttl_sec is not None:
|
||||
event["dedup_ttl_sec"] = dedup_ttl_sec
|
||||
return dispatch_sync(event)
|
||||
|
||||
|
||||
|
||||
@@ -62,6 +62,41 @@ def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str,
|
||||
return _enqueue_embedding("ai_insights", int(insight_id), embed_target_text)
|
||||
|
||||
|
||||
def enqueue_missing_insight_embeddings(limit: int = 200) -> dict:
|
||||
"""Backfill existing ai_insights that have not yet entered the embedding queue."""
|
||||
session = get_session()
|
||||
try:
|
||||
rows = session.execute(
|
||||
text("""
|
||||
SELECT id, insight_type, period, content
|
||||
FROM ai_insights i
|
||||
WHERE i.embedding IS NULL
|
||||
AND COALESCE(i.status, 'approved') NOT IN ('archived', 'rejected')
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM embedding_retry_queue q
|
||||
WHERE q.target_table = 'ai_insights'
|
||||
AND q.target_id = i.id
|
||||
AND q.status IN ('pending', 'processing', 'done')
|
||||
)
|
||||
ORDER BY i.created_at DESC
|
||||
LIMIT :lim
|
||||
"""),
|
||||
{"lim": limit},
|
||||
).fetchall()
|
||||
except Exception as exc:
|
||||
sys_log.warning(f"[OCLearn] embedding backfill 略過 (schema/pgvector 未就緒?): {exc}")
|
||||
return {"scanned": 0, "enqueued": 0, "status": "skipped", "error": str(exc)[:200]}
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
enqueued = 0
|
||||
for row in rows:
|
||||
if enqueue_insight_embedding(row.id, row.insight_type, row.content, row.period):
|
||||
enqueued += 1
|
||||
return {"scanned": len(rows), "enqueued": enqueued, "status": "ok"}
|
||||
|
||||
|
||||
def _process_one_embedding(row_id: int, target_table: str, target_id: int,
|
||||
text_content: str, model: str) -> bool:
|
||||
"""處理單筆 embedding,成功寫回目標表,失敗累加 attempts"""
|
||||
|
||||
@@ -23,3 +23,37 @@ def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch):
|
||||
|
||||
assert learning.enqueue_insight_embedding(42, "agent_action", "") is False
|
||||
assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False
|
||||
|
||||
|
||||
def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch):
|
||||
from types import SimpleNamespace
|
||||
import services.openclaw_learning_service as learning
|
||||
|
||||
rows = [
|
||||
SimpleNamespace(id=1, insight_type="mcp_cache", period=None, content="市場資料"),
|
||||
SimpleNamespace(id=2, insight_type="human_review", period="2026-04-29", content="人工審核"),
|
||||
]
|
||||
|
||||
class Result:
|
||||
def fetchall(self):
|
||||
return rows
|
||||
|
||||
class Session:
|
||||
def execute(self, *args, **kwargs):
|
||||
return Result()
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
calls = []
|
||||
monkeypatch.setattr(learning, "get_session", lambda: Session())
|
||||
monkeypatch.setattr(
|
||||
learning,
|
||||
"enqueue_insight_embedding",
|
||||
lambda insight_id, insight_type, content, period=None: calls.append((insight_id, insight_type, content, period)) or True,
|
||||
)
|
||||
|
||||
result = learning.enqueue_missing_insight_embeddings(limit=10)
|
||||
|
||||
assert result == {"scanned": 2, "enqueued": 2, "status": "ok"}
|
||||
assert calls[0] == (1, "mcp_cache", "市場資料", None)
|
||||
|
||||
@@ -85,3 +85,33 @@ def test_dispatch_executes_only_auto_safe_actions(monkeypatch):
|
||||
assert calls == [{"task_name": "run_momo_task"}]
|
||||
assert result["payload"]["executed_actions"][0]["status"] == "ok"
|
||||
assert result["payload"]["executed_actions"][1]["status"] == "rejected"
|
||||
|
||||
|
||||
def test_dispatch_dedupes_repeated_event(monkeypatch):
|
||||
import services.event_router as event_router
|
||||
|
||||
event_router._EVENT_DEDUP.clear()
|
||||
sent = []
|
||||
monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
|
||||
monkeypatch.setattr(
|
||||
event_router,
|
||||
"send_telegram_with_result",
|
||||
lambda *args, **kwargs: sent.append(True) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []},
|
||||
)
|
||||
|
||||
event = {
|
||||
"source": "Scheduler.BackupMonitor",
|
||||
"event_type": "backup_monitor_alert",
|
||||
"severity": "alert",
|
||||
"title": "備份健康告警",
|
||||
"summary": "stale backup",
|
||||
"dedup_ttl_sec": 60,
|
||||
}
|
||||
|
||||
first = asyncio.run(event_router.dispatch(event))
|
||||
second = asyncio.run(event_router.dispatch(event))
|
||||
|
||||
assert first["deduped"] is False
|
||||
assert second["deduped"] is True
|
||||
assert second["sent"] == 0
|
||||
assert len(sent) == 1
|
||||
|
||||
Reference in New Issue
Block a user