This commit is contained in:
52
scheduler.py
52
scheduler.py
@@ -2059,6 +2059,19 @@ def run_icaim_analysis_task():
|
||||
|
||||
if not result.success:
|
||||
logging.error(f"[Scheduler] [ICAIM] Hermes 失敗: {result.error}")
|
||||
try:
|
||||
from services.event_router import notify_failure
|
||||
notify_failure(
|
||||
task_name="run_icaim_analysis_task",
|
||||
error=RuntimeError(result.error or "Hermes analysis failed"),
|
||||
source="Scheduler.ICAIM",
|
||||
event_type="icaim_hermes_failure",
|
||||
priority="P1",
|
||||
title="ICAIM Hermes 分析失敗",
|
||||
payload={"duration_sec": hermes_duration, "candidates": result.total_candidates},
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [ICAIM] Hermes failure event_router 失敗: {_router_e}")
|
||||
return
|
||||
|
||||
if not result.threats:
|
||||
@@ -2323,6 +2336,19 @@ def run_openclaw_meta_analysis_task():
|
||||
import traceback as _tb
|
||||
logging.error(f"[Scheduler] [MetaAnalysis] 🚨 Meta-Analysis 任務異常: {e}")
|
||||
_save_stats('meta_analysis', {"status": "Error", "error": str(e)})
|
||||
try:
|
||||
from services.event_router import notify_failure
|
||||
notify_failure(
|
||||
task_name="run_openclaw_meta_analysis_task",
|
||||
error=e,
|
||||
source="Scheduler.MetaAnalysis",
|
||||
event_type="openclaw_report_failure",
|
||||
priority="P2",
|
||||
title="OpenClaw Meta-Analysis 任務異常",
|
||||
trace=_tb.format_exc(),
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [MetaAnalysis] event_router 失敗: {_router_e}")
|
||||
try:
|
||||
from services.auto_heal_service import auto_heal_service
|
||||
auto_heal_service.handle_exception(
|
||||
@@ -2406,6 +2432,19 @@ def run_daily_report_task():
|
||||
import traceback as _tb
|
||||
logging.error(f"[Scheduler] [DailyReport] 🚨 日報任務異常: {e}")
|
||||
_save_stats('daily_report', {"status": "Error", "error": str(e)})
|
||||
try:
|
||||
from services.event_router import notify_failure
|
||||
notify_failure(
|
||||
task_name="run_daily_report_task",
|
||||
error=e,
|
||||
source="Scheduler.DailyReport",
|
||||
event_type="openclaw_report_failure",
|
||||
priority="P2",
|
||||
title="OpenClaw 日報任務異常",
|
||||
trace=_tb.format_exc(),
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [DailyReport] event_router 失敗: {_router_e}")
|
||||
try:
|
||||
from services.auto_heal_service import auto_heal_service
|
||||
auto_heal_service.handle_exception(
|
||||
@@ -2431,6 +2470,19 @@ def run_monthly_report_task():
|
||||
import traceback as _tb
|
||||
logging.error(f"[Scheduler] [MonthlyReport] 🚨 月報任務異常: {e}")
|
||||
_save_stats('monthly_report', {"status": "Error", "error": str(e)})
|
||||
try:
|
||||
from services.event_router import notify_failure
|
||||
notify_failure(
|
||||
task_name="run_monthly_report_task",
|
||||
error=e,
|
||||
source="Scheduler.MonthlyReport",
|
||||
event_type="openclaw_report_failure",
|
||||
priority="P2",
|
||||
title="OpenClaw 月報任務異常",
|
||||
trace=_tb.format_exc(),
|
||||
)
|
||||
except Exception as _router_e:
|
||||
logging.error(f"[Scheduler] [MonthlyReport] event_router 失敗: {_router_e}")
|
||||
try:
|
||||
from services.auto_heal_service import auto_heal_service
|
||||
auto_heal_service.handle_exception(
|
||||
|
||||
@@ -43,6 +43,11 @@ ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN"
|
||||
# ---- Constants ----
|
||||
_ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"})
|
||||
_PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"})
|
||||
_ALLOWED_SSH_HOSTS = frozenset(
|
||||
h.strip() for h in os.getenv("ELEPHANT_ALPHA_ALLOWED_SSH_HOSTS", "192.168.0.188").split(",") if h.strip()
|
||||
)
|
||||
_ALLOWED_SSH_DOCKER_SUBCOMMANDS = frozenset({"ps", "logs", "inspect"})
|
||||
_ALLOWED_SSH_READONLY_COMMANDS = frozenset({"df", "free", "uptime"})
|
||||
_OFFLINE_PLAYBOOKS = {
|
||||
"DNS_FAIL": {
|
||||
"id": 0,
|
||||
@@ -230,9 +235,6 @@ class AutoHealService:
|
||||
self._log.warning("[AutoHeal] %s", msg)
|
||||
return AutoHealResult(success=False, action=playbook["action_type"], message=msg)
|
||||
|
||||
if playbook["action_type"] == "CODE_FIX":
|
||||
return self._handle_code_fix(playbook, context)
|
||||
|
||||
# cooldown guard
|
||||
cooldown_key = self._cooldown_key(playbook)
|
||||
last = _load_escalation(cooldown_key)
|
||||
@@ -241,6 +243,13 @@ class AutoHealService:
|
||||
self._log.info("[AutoHeal] %s", msg)
|
||||
return AutoHealResult(success=False, action=playbook["action_type"], message=msg)
|
||||
|
||||
if playbook["action_type"] == "CODE_FIX":
|
||||
started_at = datetime.now()
|
||||
result = self._handle_code_fix(playbook, context)
|
||||
duration_ms = (datetime.now() - started_at).total_seconds() * 1000
|
||||
self._alert_and_store(playbook, context, result, duration_ms)
|
||||
return result
|
||||
|
||||
# generic action execution
|
||||
return self._execute_playbook_action(playbook, context)
|
||||
|
||||
@@ -391,14 +400,14 @@ class AutoHealService:
|
||||
argv = params.get("argv")
|
||||
if not isinstance(argv, list) or not argv:
|
||||
return AutoHealResult(success=False, action="SSH_CMD", message="argv must be a non-empty list")
|
||||
if not self._is_allowed_ssh_argv(argv):
|
||||
return AutoHealResult(success=False, action="SSH_CMD", message=f"argv is not in read-only allowlist: {argv}")
|
||||
host = params.get("host", "")
|
||||
user = params.get("user", "ollama")
|
||||
try:
|
||||
from services.elephant_alpha_orchestrator import elephant_orchestrator
|
||||
# validate host/user via orchestrator context (lightweight)
|
||||
_ = elephant_orchestrator._validate_host_port(host)
|
||||
except Exception:
|
||||
pass
|
||||
if host not in _ALLOWED_SSH_HOSTS:
|
||||
return AutoHealResult(success=False, action="SSH_CMD", message=f"host is not allowed: {host}")
|
||||
if user != "ollama":
|
||||
return AutoHealResult(success=False, action="SSH_CMD", message=f"user is not allowed: {user}")
|
||||
result = _ssh_exec(
|
||||
jump_host=SSH_JUMP_HOST,
|
||||
jump_user=SSH_JUMP_USER,
|
||||
@@ -410,6 +419,18 @@ class AutoHealService:
|
||||
out = result["stdout"] or result["stderr"]
|
||||
return AutoHealResult(success=result["success"], action="SSH_CMD", message=out)
|
||||
|
||||
def _is_allowed_ssh_argv(self, argv: List[Any]) -> bool:
|
||||
normalized = [str(part) for part in argv]
|
||||
if not normalized:
|
||||
return False
|
||||
if normalized[0] == "docker":
|
||||
if len(normalized) < 2 or normalized[1] not in _ALLOWED_SSH_DOCKER_SUBCOMMANDS:
|
||||
return False
|
||||
return not any(part in _PROTECTED_CONTAINERS for part in normalized[2:])
|
||||
if normalized[0] in _ALLOWED_SSH_READONLY_COMMANDS:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _alert_and_store(
|
||||
self,
|
||||
playbook: Dict[str, Any],
|
||||
|
||||
@@ -121,6 +121,21 @@ def _embedding_worker_loop():
|
||||
try:
|
||||
session = get_session()
|
||||
try:
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE embedding_retry_queue
|
||||
SET status = 'pending',
|
||||
updated_at = :now,
|
||||
last_error = COALESCE(last_error, '') || ' | reset stale processing'
|
||||
WHERE status = 'processing'
|
||||
AND updated_at < :cutoff
|
||||
"""),
|
||||
{
|
||||
"now": datetime.now(),
|
||||
"cutoff": datetime.fromtimestamp(time.time() - 15 * 60),
|
||||
},
|
||||
)
|
||||
session.commit()
|
||||
rows = session.execute(
|
||||
text("""
|
||||
SELECT id, target_table, target_id, text_content, model
|
||||
@@ -155,7 +170,8 @@ threading.Thread(target=_embedding_worker_loop, daemon=True).start()
|
||||
|
||||
def store_insight(insight_type: str, content: str, period: str = None,
|
||||
product_sku: str = None, metadata: dict = None,
|
||||
ai_model: str = None) -> int:
|
||||
ai_model: str = None, confidence: float = None,
|
||||
created_by: str = None, status: str = None) -> int:
|
||||
"""
|
||||
將 AI 產出存入 ai_insights 表並排程向量化。
|
||||
- Cache-aside:同 insight_type + period + product_sku 已存在則覆蓋
|
||||
@@ -181,26 +197,44 @@ def store_insight(insight_type: str, content: str, period: str = None,
|
||||
insight_id = existing.id
|
||||
sys_log.info(f"[OCLearn] 更新 insight_type={insight_type} period={period}")
|
||||
else:
|
||||
new_insight = AIInsight(
|
||||
insight_type=insight_type,
|
||||
period=period,
|
||||
product_sku=product_sku,
|
||||
content=content,
|
||||
metadata_json=meta_str,
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
)
|
||||
insight_kwargs = {
|
||||
"insight_type": insight_type,
|
||||
"period": period,
|
||||
"product_sku": product_sku,
|
||||
"content": content,
|
||||
"metadata_json": meta_str,
|
||||
"status": status or "approved",
|
||||
"created_at": datetime.now(),
|
||||
"updated_at": datetime.now(),
|
||||
}
|
||||
if confidence is not None:
|
||||
insight_kwargs["confidence"] = confidence
|
||||
if created_by:
|
||||
insight_kwargs["created_by"] = created_by
|
||||
new_insight = AIInsight(**insight_kwargs)
|
||||
session.add(new_insight)
|
||||
session.commit()
|
||||
insight_id = new_insight.id
|
||||
sys_log.info(f"[OCLearn] 新增 insight_type={insight_type} period={period}")
|
||||
|
||||
# 若已有 ai_model 欄位則寫入(Migration 010 後可用,先試再容錯)
|
||||
# 若已有進階欄位則寫入(Migration 010/015 後可用,先試再容錯)
|
||||
update_fields = {}
|
||||
if ai_model:
|
||||
update_fields["ai_model"] = ai_model
|
||||
if confidence is not None:
|
||||
update_fields["confidence"] = confidence
|
||||
if created_by:
|
||||
update_fields["created_by"] = created_by
|
||||
if status:
|
||||
update_fields["status"] = status
|
||||
if update_fields:
|
||||
try:
|
||||
assignments = ", ".join(f"{key} = :{key}" for key in update_fields)
|
||||
params = dict(update_fields)
|
||||
params["i"] = insight_id
|
||||
session.execute(
|
||||
text("UPDATE ai_insights SET ai_model = :m WHERE id = :i"),
|
||||
{"m": ai_model, "i": insight_id},
|
||||
text(f"UPDATE ai_insights SET {assignments}, updated_at = :now WHERE id = :i"),
|
||||
{**params, "now": datetime.now()},
|
||||
)
|
||||
session.commit()
|
||||
except Exception:
|
||||
@@ -208,7 +242,28 @@ def store_insight(insight_type: str, content: str, period: str = None,
|
||||
|
||||
# 推入 DB retry queue(持久化)
|
||||
embed_target_text = f"{insight_type} ({period or ''}): {content}"
|
||||
_enqueue_embedding("ai_insights", insight_id, embed_target_text)
|
||||
embedding_queued = _enqueue_embedding("ai_insights", insight_id, embed_target_text)
|
||||
if not embedding_queued:
|
||||
try:
|
||||
metadata_payload = metadata.copy() if isinstance(metadata, dict) else {}
|
||||
metadata_payload["embedding_queue_error"] = True
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE ai_insights
|
||||
SET metadata_json = :meta,
|
||||
updated_at = :now
|
||||
WHERE id = :id
|
||||
"""),
|
||||
{
|
||||
"meta": json.dumps(metadata_payload, ensure_ascii=False),
|
||||
"now": datetime.now(),
|
||||
"id": insight_id,
|
||||
},
|
||||
)
|
||||
session.commit()
|
||||
except Exception as meta_err:
|
||||
session.rollback()
|
||||
sys_log.warning(f"[OCLearn] embedding queue error metadata 寫入失敗: {meta_err}")
|
||||
|
||||
return insight_id
|
||||
|
||||
@@ -247,6 +302,10 @@ def build_rag_context(query: str, insight_type: str = None, period: str = None,
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
semantic_context = _build_semantic_rag_context(session, query, insight_type, period, top_k)
|
||||
if semantic_context:
|
||||
return semantic_context
|
||||
|
||||
q = session.query(AIInsight)
|
||||
if insight_type:
|
||||
q = q.filter_by(insight_type=insight_type)
|
||||
@@ -288,6 +347,50 @@ def build_rag_context(query: str, insight_type: str = None, period: str = None,
|
||||
session.close()
|
||||
|
||||
|
||||
def _build_semantic_rag_context(session, query: str, insight_type: str = None,
|
||||
period: str = None, top_k: int = 5) -> str:
|
||||
if not query:
|
||||
return ""
|
||||
try:
|
||||
vec = ollama_service.generate_embedding(query)
|
||||
if not vec:
|
||||
return ""
|
||||
filters = ["embedding IS NOT NULL", "status IN ('approved', 'active', 'executed')"]
|
||||
params = {"qvec": str(vec), "lim": top_k}
|
||||
if insight_type:
|
||||
filters.append("insight_type = :insight_type")
|
||||
params["insight_type"] = insight_type
|
||||
if period:
|
||||
filters.append("period = :period")
|
||||
params["period"] = period
|
||||
rows = session.execute(
|
||||
text(f"""
|
||||
SELECT id, insight_type, period, content, avg_quality, decay_exempt, created_at,
|
||||
embedding <=> CAST(:qvec AS vector) AS distance
|
||||
FROM ai_insights
|
||||
WHERE {' AND '.join(filters)}
|
||||
ORDER BY distance ASC, created_at DESC
|
||||
LIMIT :lim
|
||||
"""),
|
||||
params,
|
||||
).fetchall()
|
||||
if not rows:
|
||||
return ""
|
||||
parts = []
|
||||
for row in rows:
|
||||
base = row.avg_quality if row.avg_quality is not None else 0.5
|
||||
effective = base if row.decay_exempt else compute_effective_score(base, row.created_at)
|
||||
p_tag = f"[{row.period}]" if row.period else "[語意記憶]"
|
||||
parts.append(
|
||||
f"{p_tag} {row.insight_type} (語意距離={row.distance:.3f}, 分數={effective:.2f}):\n{row.content}"
|
||||
)
|
||||
sys_log.info(f"[OCLearn] semantic RAG context: {len(parts)} 筆")
|
||||
return "\n\n---\n\n".join(parts)
|
||||
except Exception as exc:
|
||||
sys_log.debug(f"[OCLearn] semantic RAG fallback to decay ranking: {exc}")
|
||||
return ""
|
||||
|
||||
|
||||
def build_rag_context_by_date(start_date: str, end_date: str) -> str:
|
||||
"""週報 RAG:依日期區間過濾 ai_insights"""
|
||||
session = get_session()
|
||||
|
||||
42
tests/test_auto_heal_safety.py
Normal file
42
tests/test_auto_heal_safety.py
Normal file
@@ -0,0 +1,42 @@
|
||||
def test_auto_heal_blocks_unsafe_ssh_commands():
    """The SSH allowlist accepts read-only commands and rejects the rest."""
    from services.auto_heal_service import AutoHealService

    svc = AutoHealService()

    # (argv, expected verdict) pairs, checked in the original order.
    expectations = (
        (["docker", "ps"], True),
        (["df", "-h"], True),
        (["docker", "restart", "momo-pro-system"], False),
        (["docker", "logs", "momo-db"], False),
        (["bash", "-lc", "whoami"], False),
    )
    for argv, expected in expectations:
        assert svc._is_allowed_ssh_argv(argv) is expected
|
||||
|
||||
|
||||
def test_auto_heal_code_fix_writes_audit(monkeypatch):
    """A CODE_FIX playbook must pass through ``_alert_and_store``."""
    from services.auto_heal_service import AutoHealResult, AutoHealService

    svc = AutoHealService()
    calls = []

    def fake_code_fix(playbook, context):
        # Stand-in for the real code-fix handler: always reports success.
        return AutoHealResult(True, "CODE_FIX", "fixed")

    def record_alert(playbook, context, result, duration_ms=0.0):
        # Capture the audit call instead of alerting or storing anything.
        calls.append((playbook, context, result, duration_ms))

    def fake_find_playbook(error_type):
        # Fresh dict on every lookup, as the original lambda produced.
        return {
            "id": 99,
            "name": "Code fix",
            "action_type": "CODE_FIX",
            "action_params": {},
            "max_retries": 1,
            "cooldown_min": 0,
        }

    def fake_ensure_incident(error_type, context):
        return 123

    monkeypatch.setattr(svc, "_handle_code_fix", fake_code_fix)
    monkeypatch.setattr(svc, "_alert_and_store", record_alert)
    monkeypatch.setattr("services.auto_heal_service._find_playbook", fake_find_playbook)
    monkeypatch.setattr(svc, "_ensure_incident", fake_ensure_incident)

    result = svc.handle_exception("code_exception", {"target_file": "services/example.py"})

    assert result.success is True
    assert calls
    recorded_result = calls[0][2]
    assert recorded_result.action == "CODE_FIX"
|
||||
Reference in New Issue
Block a user