From f0e044aa48d6568978db204987ab9bdf2e3789a6 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 23:02:06 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A3=9C=E9=BD=8A=E8=87=AA=E7=99=92=E7=A8=BD?= =?UTF-8?q?=E6=A0=B8=E8=88=87=20OpenClaw=20=E8=A8=98=E6=86=B6=E9=96=89?= =?UTF-8?q?=E7=92=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scheduler.py | 52 ++++++++++ services/auto_heal_service.py | 39 ++++++-- services/openclaw_learning_service.py | 131 +++++++++++++++++++++++--- tests/test_auto_heal_safety.py | 42 +++++++++ 4 files changed, 241 insertions(+), 23 deletions(-) create mode 100644 tests/test_auto_heal_safety.py diff --git a/scheduler.py b/scheduler.py index 3c93839..d85bb0e 100644 --- a/scheduler.py +++ b/scheduler.py @@ -2059,6 +2059,19 @@ def run_icaim_analysis_task(): if not result.success: logging.error(f"[Scheduler] [ICAIM] Hermes 失敗: {result.error}") + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_icaim_analysis_task", + error=RuntimeError(result.error or "Hermes analysis failed"), + source="Scheduler.ICAIM", + event_type="icaim_hermes_failure", + priority="P1", + title="ICAIM Hermes 分析失敗", + payload={"duration_sec": hermes_duration, "candidates": result.total_candidates}, + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [ICAIM] Hermes failure event_router 失敗: {_router_e}") return if not result.threats: @@ -2323,6 +2336,19 @@ def run_openclaw_meta_analysis_task(): import traceback as _tb logging.error(f"[Scheduler] [MetaAnalysis] 🚨 Meta-Analysis 任務異常: {e}") _save_stats('meta_analysis', {"status": "Error", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_openclaw_meta_analysis_task", + error=e, + source="Scheduler.MetaAnalysis", + event_type="openclaw_report_failure", + priority="P2", + title="OpenClaw Meta-Analysis 任務異常", + trace=_tb.format_exc(), + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [MetaAnalysis] event_router 失敗: {_router_e}") try: from services.auto_heal_service import auto_heal_service auto_heal_service.handle_exception( @@ -2406,6 +2432,19 @@ def run_daily_report_task(): import traceback as _tb logging.error(f"[Scheduler] [DailyReport] 🚨 日報任務異常: {e}") _save_stats('daily_report', {"status": "Error", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_daily_report_task", + error=e, + source="Scheduler.DailyReport", + event_type="openclaw_report_failure", + priority="P2", + title="OpenClaw 日報任務異常", + trace=_tb.format_exc(), + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [DailyReport] event_router 失敗: {_router_e}") try: from services.auto_heal_service import auto_heal_service auto_heal_service.handle_exception( @@ -2431,6 +2470,19 @@ def run_monthly_report_task(): import traceback as _tb logging.error(f"[Scheduler] [MonthlyReport] 🚨 月報任務異常: {e}") _save_stats('monthly_report', {"status": "Error", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_monthly_report_task", + error=e, + source="Scheduler.MonthlyReport", + event_type="openclaw_report_failure", + priority="P2", + title="OpenClaw 月報任務異常", + trace=_tb.format_exc(), + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [MonthlyReport] event_router 失敗: {_router_e}") try: from services.auto_heal_service import auto_heal_service auto_heal_service.handle_exception( diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py index 0d6dc08..989a6b7 100644 --- a/services/auto_heal_service.py +++ b/services/auto_heal_service.py @@ -43,6 +43,11 @@ ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN" # ---- Constants ---- _ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"}) _PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"}) +_ALLOWED_SSH_HOSTS = frozenset( + h.strip() for h in os.getenv("ELEPHANT_ALPHA_ALLOWED_SSH_HOSTS", "192.168.0.188").split(",") if h.strip() +) +_ALLOWED_SSH_DOCKER_SUBCOMMANDS = frozenset({"ps", "logs", "inspect"}) +_ALLOWED_SSH_READONLY_COMMANDS = frozenset({"df", "free", "uptime"}) _OFFLINE_PLAYBOOKS = { "DNS_FAIL": { "id": 0, @@ -230,9 +235,6 @@ class AutoHealService: self._log.warning("[AutoHeal] %s", msg) return AutoHealResult(success=False, action=playbook["action_type"], message=msg) - if playbook["action_type"] == "CODE_FIX": - return self._handle_code_fix(playbook, context) - # cooldown guard cooldown_key = self._cooldown_key(playbook) last = _load_escalation(cooldown_key) @@ -241,6 +243,13 @@ class AutoHealService: self._log.info("[AutoHeal] %s", msg) return AutoHealResult(success=False, action=playbook["action_type"], message=msg) + if playbook["action_type"] == "CODE_FIX": + started_at = datetime.now() + result = self._handle_code_fix(playbook, context) + duration_ms = (datetime.now() - started_at).total_seconds() * 1000 + self._alert_and_store(playbook, context, result, duration_ms) + return result + # generic action execution return self._execute_playbook_action(playbook, context) @@ -391,14 +400,14 @@ class AutoHealService: argv = params.get("argv") if not isinstance(argv, list) or not argv: return AutoHealResult(success=False, action="SSH_CMD", message="argv must be a non-empty list") + if not self._is_allowed_ssh_argv(argv): + return AutoHealResult(success=False, action="SSH_CMD", message=f"argv is not in read-only allowlist: {argv}") host = params.get("host", "") user = params.get("user", "ollama") - try: - from services.elephant_alpha_orchestrator import elephant_orchestrator - # validate host/user via orchestrator context (lightweight) - _ = elephant_orchestrator._validate_host_port(host) - except Exception: - pass + if host not in _ALLOWED_SSH_HOSTS: + return AutoHealResult(success=False, action="SSH_CMD", message=f"host is not allowed: {host}") + if user != "ollama": + return AutoHealResult(success=False, action="SSH_CMD", message=f"user is not allowed: {user}") result = _ssh_exec( jump_host=SSH_JUMP_HOST, jump_user=SSH_JUMP_USER, @@ -410,6 +419,18 @@ class AutoHealService: out = result["stdout"] or result["stderr"] return AutoHealResult(success=result["success"], action="SSH_CMD", message=out) + def _is_allowed_ssh_argv(self, argv: List[Any]) -> bool: + normalized = [str(part) for part in argv] + if not normalized: + return False + if normalized[0] == "docker": + if len(normalized) < 2 or normalized[1] not in _ALLOWED_SSH_DOCKER_SUBCOMMANDS: + return False + return not any(part in _PROTECTED_CONTAINERS for part in normalized[2:]) + if normalized[0] in _ALLOWED_SSH_READONLY_COMMANDS: + return True + return False + def _alert_and_store( self, playbook: Dict[str, Any], diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index 6a604be..1eeaeb4 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -121,6 +121,21 @@ def _embedding_worker_loop(): try: session = get_session() try: + session.execute( + text(""" + UPDATE embedding_retry_queue + SET status = 'pending', + updated_at = :now, + last_error = COALESCE(last_error, '') || ' | reset stale processing' + WHERE status = 'processing' + AND updated_at < :cutoff + """), + { + "now": datetime.now(), + "cutoff": datetime.fromtimestamp(time.time() - 15 * 60), + }, + ) + session.commit() rows = session.execute( text(""" SELECT id, target_table, target_id, text_content, model @@ -155,7 +170,8 @@ threading.Thread(target=_embedding_worker_loop, daemon=True).start() def store_insight(insight_type: str, content: str, period: str = None, product_sku: str = None, metadata: dict = None, - ai_model: str = None) -> int: + ai_model: str = None, confidence: float = None, + created_by: str = None, status: str = None) -> int: """ 將 AI 產出存入 ai_insights 表並排程向量化。 - Cache-aside:同 insight_type + period + product_sku 已存在則覆蓋 @@ -181,26 +197,44 @@ def store_insight(insight_type: str, content: str, period: str = None, insight_id = existing.id sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}") else: - new_insight = AIInsight( - insight_type=insight_type, - period=period, - product_sku=product_sku, - content=content, - metadata_json=meta_str, - created_at=datetime.now(), - updated_at=datetime.now(), - ) + insight_kwargs = { + "insight_type": insight_type, + "period": period, + "product_sku": product_sku, + "content": content, + "metadata_json": meta_str, + "status": status or "approved", + "created_at": datetime.now(), + "updated_at": datetime.now(), + } + if confidence is not None: + insight_kwargs["confidence"] = confidence + if created_by: + insight_kwargs["created_by"] = created_by + new_insight = AIInsight(**insight_kwargs) session.add(new_insight) session.commit() insight_id = new_insight.id sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}") - # 若已有 ai_model 欄位則寫入(Migration 010 後可用,先試再容錯) + # 若已有進階欄位則寫入(Migration 010/015 後可用,先試再容錯) + update_fields = {} if ai_model: + update_fields["ai_model"] = ai_model + if confidence is not None: + update_fields["confidence"] = confidence + if created_by: + update_fields["created_by"] = created_by + if status: + update_fields["status"] = status + if update_fields: try: + assignments = ", ".join(f"{key} = :{key}" for key in update_fields) + params = dict(update_fields) + params["i"] = insight_id session.execute( - text("UPDATE ai_insights SET ai_model = :m WHERE id = :i"), - {"m": ai_model, "i": insight_id}, + text(f"UPDATE ai_insights SET {assignments}, updated_at = :now WHERE id = :i"), + {**params, "now": datetime.now()}, ) session.commit() except Exception: @@ -208,7 +242,28 @@ def store_insight(insight_type: str, content: str, period: str = None, # 推入 DB retry queue(持久化) embed_target_text = f"{insight_type} ({period or ''}): {content}" - _enqueue_embedding("ai_insights", insight_id, embed_target_text) + embedding_queued = _enqueue_embedding("ai_insights", insight_id, embed_target_text) + if not embedding_queued: + try: + metadata_payload = metadata.copy() if isinstance(metadata, dict) else {} + metadata_payload["embedding_queue_error"] = True + session.execute( + text(""" + UPDATE ai_insights + SET metadata_json = :meta, + updated_at = :now + WHERE id = :id + """), + { + "meta": json.dumps(metadata_payload, ensure_ascii=False), + "now": datetime.now(), + "id": insight_id, + }, + ) + session.commit() + except Exception as meta_err: + session.rollback() + sys_log.warning(f"[OCLearn] embedding queue error metadata 寫入失敗: {meta_err}") return insight_id @@ -247,6 +302,10 @@ def build_rag_context(query: str, insight_type: str = None, period: str = None, """ session = get_session() try: + semantic_context = _build_semantic_rag_context(session, query, insight_type, period, top_k) + if semantic_context: + return semantic_context + q = session.query(AIInsight) if insight_type: q = q.filter_by(insight_type=insight_type) @@ -288,6 +347,50 @@ def build_rag_context(query: str, insight_type: str = None, period: str = None, session.close() +def _build_semantic_rag_context(session, query: str, insight_type: str = None, + period: str = None, top_k: int = 5) -> str: + if not query: + return "" + try: + vec = ollama_service.generate_embedding(query) + if not vec: + return "" + filters = ["embedding IS NOT NULL", "status IN ('approved', 'active', 'executed')"] + params = {"qvec": str(vec), "lim": top_k} + if insight_type: + filters.append("insight_type = :insight_type") + params["insight_type"] = insight_type + if period: + filters.append("period = :period") + params["period"] = period + rows = session.execute( + text(f""" + SELECT id, insight_type, period, content, avg_quality, decay_exempt, created_at, + embedding <=> CAST(:qvec AS vector) AS distance + FROM ai_insights + WHERE {' AND '.join(filters)} + ORDER BY distance ASC, created_at DESC + LIMIT :lim + """), + params, + ).fetchall() + if not rows: + return "" + parts = [] + for row in rows: + base = row.avg_quality if row.avg_quality is not None else 0.5 + effective = base if row.decay_exempt else compute_effective_score(base, row.created_at) + p_tag = f"[{row.period}]" if row.period else "[語意記憶]" + parts.append( + f"{p_tag} {row.insight_type} (語意距離={row.distance:.3f}, 分數={effective:.2f}):\n{row.content}" + ) + sys_log.info(f"[OCLearn] semantic RAG context: {len(parts)} 筆") + return "\n\n---\n\n".join(parts) + except Exception as exc: + sys_log.debug(f"[OCLearn] semantic RAG fallback to decay ranking: {exc}") + return "" + + def build_rag_context_by_date(start_date: str, end_date: str) -> str: """週報 RAG:依日期區間過濾 ai_insights""" session = get_session() diff --git a/tests/test_auto_heal_safety.py b/tests/test_auto_heal_safety.py new file mode 100644 index 0000000..3e9f4f0 --- /dev/null +++ b/tests/test_auto_heal_safety.py @@ -0,0 +1,42 @@ +def test_auto_heal_blocks_unsafe_ssh_commands(): + from services.auto_heal_service import AutoHealService + + svc = AutoHealService() + + assert svc._is_allowed_ssh_argv(["docker", "ps"]) is True + assert svc._is_allowed_ssh_argv(["df", "-h"]) is True + assert svc._is_allowed_ssh_argv(["docker", "restart", "momo-pro-system"]) is False + assert svc._is_allowed_ssh_argv(["docker", "logs", "momo-db"]) is False + assert svc._is_allowed_ssh_argv(["bash", "-lc", "whoami"]) is False + + +def test_auto_heal_code_fix_writes_audit(monkeypatch): + from services.auto_heal_service import AutoHealResult, AutoHealService + + svc = AutoHealService() + calls = [] + monkeypatch.setattr( + svc, + "_handle_code_fix", + lambda playbook, context: AutoHealResult(True, "CODE_FIX", "fixed"), + ) + monkeypatch.setattr( + svc, + "_alert_and_store", + lambda playbook, context, result, duration_ms=0.0: calls.append((playbook, context, result, duration_ms)), + ) + monkeypatch.setattr("services.auto_heal_service._find_playbook", lambda error_type: { + "id": 99, + "name": "Code fix", + "action_type": "CODE_FIX", + "action_params": {}, + "max_retries": 1, + "cooldown_min": 0, + }) + monkeypatch.setattr(svc, "_ensure_incident", lambda error_type, context: 123) + + result = svc.handle_exception("code_exception", {"target_file": "services/example.py"}) + + assert result.success is True + assert calls + assert calls[0][2].action == "CODE_FIX"