From 0c2e9bbcedea01bf40587c484fdbb1961a2262f9 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 23:05:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B2=E6=8E=A5=20AI=20=E6=B4=9E=E5=AF=9F?= =?UTF-8?q?=E5=90=91=E9=87=8F=E5=8C=96=E8=88=87=E6=BC=8F=E9=80=9A=E7=9F=A5?= =?UTF-8?q?=E5=85=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run_scheduler.py | 12 ++++ scheduler.py | 62 ++++++++++++++++++++ services/auto_heal_service.py | 5 ++ services/code_review_pipeline_service.py | 15 ++++- services/elephant_alpha_autonomous_engine.py | 15 ++++- services/elephant_alpha_orchestrator.py | 14 ++++- services/mcp_collector_service.py | 11 +++- services/openclaw_learning_service.py | 9 +++ services/openclaw_strategist_service.py | 6 ++ tests/test_ai_insight_embedding_bridge.py | 25 ++++++++ 10 files changed, 166 insertions(+), 8 deletions(-) create mode 100644 tests/test_ai_insight_embedding_bridge.py diff --git a/run_scheduler.py b/run_scheduler.py index d6e1550..c93b1bd 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -137,6 +137,18 @@ def run_cleanup_agent_context(): logger.info("[Cleanup] agent_context TTL 清理完成") except Exception as e: logger.error(f"[Cleanup] agent_context 清理失敗: {e}") + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_cleanup_agent_context", + error=e, + source="Scheduler.Cleanup", + event_type="agent_context_cleanup_failure", + priority="P2", + title="agent_context TTL 清理失敗", + ) + except Exception as _router_e: + logger.error(f"[Cleanup] event_router 失敗: {_router_e}") finally: session.close() diff --git a/scheduler.py b/scheduler.py index d85bb0e..8f2735e 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1635,6 +1635,18 @@ def run_whitepage_check(): except Exception as e: logging.error(f"[Whitepage] [Check] 🚨 白頁檢查任務異常 | Error: {e}") _save_stats('whitepage_check', {"status": "Failed", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_whitepage_check", + error=e, + source="Scheduler.Whitepage", + event_type="whitepage_check_failure", + priority="P1", + title="白頁檢查任務異常", + ) + except Exception as _router_e: + logging.error(f"[Whitepage] [Check] event_router 失敗: {_router_e}") def start_scheduled_job(): """使用執行緒啟動任務,避免阻塞主程式""" @@ -2225,6 +2237,19 @@ def run_db_backup_task(): ) logging.error(f"[Scheduler] [Backup] ❌ 備份失敗: {result.get('error')}") _save_stats('db_backup', {"status": "Failed", "error": result.get("error")}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_db_backup_task", + error=RuntimeError(result.get("error") or "database backup failed"), + source="Scheduler.DBBackup", + event_type="db_backup_failure", + priority="P1", + title="資料庫備份失敗", + payload={"filename": result.get("filename")}, + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}") try: from services.openclaw_learning_service import store_insight @@ -2243,6 +2268,18 @@ def run_db_backup_task(): except Exception as e: logging.error(f"[Scheduler] [Backup] 🚨 任務異常 | Error: {e}") _save_stats('db_backup', {"status": "Error", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_db_backup_task", + error=e, + source="Scheduler.DBBackup", + event_type="db_backup_failure", + priority="P1", + title="資料庫備份排程異常", + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [Backup] event_router 失敗: {_router_e}") try: from services.notification_manager import NotificationManager NotificationManager()._send_telegram_messages([ @@ -2298,6 +2335,19 @@ def run_backup_monitor_task(): ) logging.warning(f"[Scheduler] [BackupMonitor] ⚠️ 備份告警: {alert_reason}") NotificationManager()._send_telegram_messages([msg]) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_backup_monitor_task", + error=RuntimeError(alert_reason), + source="Scheduler.BackupMonitor", + event_type="backup_monitor_alert", + priority="P2", + title="資料庫備份健康告警", + payload={"latest_file": info.get("filename"), "created_at": str(info.get("created_at"))}, + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}") try: from services.openclaw_learning_service import store_insight @@ -2323,6 +2373,18 @@ def run_backup_monitor_task(): except Exception as e: logging.error(f"[Scheduler] [BackupMonitor] 🚨 監控任務異常 | Error: {e}") _save_stats('backup_monitor', {"status": "Error", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_backup_monitor_task", + error=e, + source="Scheduler.BackupMonitor", + event_type="backup_monitor_failure", + priority="P2", + title="資料庫備份監控任務異常", + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [BackupMonitor] event_router 失敗: {_router_e}") def run_openclaw_meta_analysis_task(): diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py index 989a6b7..2872ca6 100644 --- a/services/auto_heal_service.py +++ b/services/auto_heal_service.py @@ -522,6 +522,11 @@ class AutoHealService: ) session.add(insight) session.commit() + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(insight.id, "auto_heal_playbook", insight.content) + except Exception as embed_err: + self._log.warning("[AutoHeal] embedding queue enqueue failed: %s", embed_err) except Exception as exc: self._log.error("[AutoHeal] ai_insight write failed: %s", exc) if session is not None: diff --git a/services/code_review_pipeline_service.py b/services/code_review_pipeline_service.py index e1cf718..f813919 100644 --- a/services/code_review_pipeline_service.py +++ b/services/code_review_pipeline_service.py @@ -477,13 +477,14 @@ class CodeReviewPipeline: def _save_to_db(self, findings: List[Dict], openclaw_report: str, ea: Dict): session = get_session() try: - session.execute(text(""" + row = session.execute(text(""" INSERT INTO ai_insights (insight_type, content, confidence, created_by, status, metadata_json, period, created_at) VALUES ('code_review_result', :content, :conf, 'code_review_pipeline', 'active', :meta, :period, NOW()) + RETURNING id """), { "content": json.dumps({ "findings": findings, @@ -500,8 +501,18 @@ class CodeReviewPipeline: "auto_fix_triggered": self.state["auto_fix_triggered"], }, ensure_ascii=False), "period": datetime.now().strftime("%Y-%m-%d"), - }) + }).fetchone() session.commit() + if row: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(row[0], "code_review_result", json.dumps({ + "findings": findings, + "openclaw_report": openclaw_report, + "ea_decision": ea, + }, ensure_ascii=False), datetime.now().strftime("%Y-%m-%d")) + except Exception as embed_err: + logger.warning("[CodeReview] embedding queue enqueue failed: %s", embed_err) logger.info("[CodeReview] ai_insights 寫入成功 pipeline=%s", self.pipeline_id) except Exception as e: logger.error("[CodeReview] DB 寫入失敗: %s", e) diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index 46999d6..83bb0b5 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -648,11 +648,12 @@ class ElephantAlphaAutonomousEngine: self._log.warning("Escalating to human: %s", trigger.trigger_type) session = get_session() try: - session.execute( + row = session.execute( text(""" INSERT INTO ai_insights (insight_type, content, confidence, created_by, status, metadata_json) VALUES (:type, :content, :conf, :by, :status, :meta) + RETURNING id """), { "type": "human_review", @@ -669,8 +670,18 @@ class ElephantAlphaAutonomousEngine: "reason": "low_confidence" }), }, - ) + ).fetchone() session.commit() + if row: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding( + row[0], + "human_review", + f"[Elephant Alpha 升級審核] {trigger.trigger_type} 信心度僅 {decision.confidence:.2f}", + ) + except Exception as embed_err: + self._log.warning("Embedding enqueue failed for human_review: %s", embed_err) except Exception as e: self._log.error("DB escalation write failed: %s", e) session.rollback() diff --git a/services/elephant_alpha_orchestrator.py b/services/elephant_alpha_orchestrator.py index ad73abc..2b33a71 100644 --- a/services/elephant_alpha_orchestrator.py +++ b/services/elephant_alpha_orchestrator.py @@ -374,11 +374,12 @@ Provide your strategic decision in the specified JSON format. session = get_session() try: # B4b FIX: metadata → metadata_json; confidence/created_by added by migration 015 - session.execute(text(""" + row = session.execute(text(""" INSERT INTO ai_insights (insight_type, content, confidence, created_by, status, metadata_json) VALUES (:type, :content, :confidence, :created_by, :status, :metadata) + RETURNING id """), { "type": "elephant_alpha_decision", "content": json.dumps({ @@ -390,8 +391,17 @@ Provide your strategic decision in the specified JSON format. "created_by": "elephant_alpha", "status": "executed", "metadata": json.dumps({"agents_required": decision.agents_required}) - }) + }).fetchone() session.commit() + if row: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(row[0], "elephant_alpha_decision", json.dumps({ + "decision": decision.__dict__, + "context": context, + }, ensure_ascii=False)) + except Exception as embed_err: + logger.warning("[ElephantAlpha] embedding queue enqueue failed: %s", embed_err) finally: session.close() diff --git a/services/mcp_collector_service.py b/services/mcp_collector_service.py index 3c88652..7e50f74 100644 --- a/services/mcp_collector_service.py +++ b/services/mcp_collector_service.py @@ -110,15 +110,22 @@ class MCPCollectorService: def _write_cache(self, topic: str, content: str) -> None: session = get_session() try: - session.execute(text(""" + row = session.execute(text(""" INSERT INTO ai_insights (insight_type, content, confidence, created_by, status, metadata_json) VALUES ('mcp_cache', :content, 0.9, 'mcp_collector', 'active', :meta) + RETURNING id """), { "content": content[:4000], "meta": json.dumps({"topic": topic, "model": MCP_MODEL, "cached_at": datetime.now().isoformat()}) - }) + }).fetchone() session.commit() + if row: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(row[0], "mcp_cache", content[:4000]) + except Exception as embed_err: + logger.warning("[MCP] embedding queue enqueue failed: %s", embed_err) except Exception as e: logger.warning("[MCP] 快取寫入失敗: %s", e) session.rollback() diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index 1eeaeb4..6c3445c 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -53,6 +53,15 @@ def _enqueue_embedding(target_table: str, target_id: int, text_content: str, session.close() +def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str, + period: str = None) -> bool: + """Public bridge for legacy raw ai_insights inserts to join ADR-007 embedding queue.""" + if not insight_id or not content: + return False + embed_target_text = f"{insight_type} ({period or ''}): {content}" + return _enqueue_embedding("ai_insights", int(insight_id), embed_target_text) + + def _process_one_embedding(row_id: int, target_table: str, target_id: int, text_content: str, model: str) -> bool: """處理單筆 embedding,成功寫回目標表,失敗累加 attempts""" diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index c228a07..16e272d 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -286,6 +286,12 @@ def _save_to_ai_insights( }).fetchone() session.commit() insight_id = row[0] if row else None + if insight_id: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(insight_id, insight_type, content[:8000], period or datetime.now().strftime("%Y-%m-%d")) + except Exception as embed_err: + logger.warning("[OpenClaw] embedding queue enqueue failed id=%s: %s", insight_id, embed_err) logger.info("[OpenClaw] ai_insights 寫入成功 id=%s type=%s", insight_id, insight_type) return insight_id except Exception as e: diff --git a/tests/test_ai_insight_embedding_bridge.py b/tests/test_ai_insight_embedding_bridge.py new file mode 100644 index 0000000..c824c58 --- /dev/null +++ b/tests/test_ai_insight_embedding_bridge.py @@ -0,0 +1,25 @@ +def test_enqueue_insight_embedding_builds_queue_payload(monkeypatch): + import services.openclaw_learning_service as learning + + calls = [] + monkeypatch.setattr( + learning, + "_enqueue_embedding", + lambda table, target_id, text: calls.append((table, target_id, text)) or True, + ) + + assert learning.enqueue_insight_embedding(42, "agent_action", "hello", "2026-04-29") is True + assert calls == [("ai_insights", 42, "agent_action (2026-04-29): hello")] + + +def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch): + import services.openclaw_learning_service as learning + + monkeypatch.setattr( + learning, + "_enqueue_embedding", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not enqueue")), + ) + + assert learning.enqueue_insight_embedding(42, "agent_action", "") is False + assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False