From aaebd6b86d63eb46a2d7508c778005c43373ebf0 Mon Sep 17 00:00:00 2001 From: OoO Date: Tue, 19 May 2026 21:01:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=85=E7=90=86=20EA=20=E9=81=8E=E6=9C=9F?= =?UTF-8?q?=E8=A1=8C=E5=8B=95=E9=9A=8A=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 2 +- docs/AI_INTELLIGENCE_MODULE_SOT.md | 3 +- services/action_plan_hygiene.py | 248 +++++++++++++++++++ services/elephant_alpha_autonomous_engine.py | 47 +++- tests/test_action_plan_hygiene.py | 49 ++++ tests/test_elephant_alpha_engine.py | 38 +++ 6 files changed, 383 insertions(+), 4 deletions(-) create mode 100644 services/action_plan_hygiene.py create mode 100644 tests/test_action_plan_hygiene.py diff --git a/config.py b/config.py index af5955d..cef3436 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.268" +SYSTEM_VERSION = "V10.269" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 6d15739..101ddf1 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -2,7 +2,7 @@ > **最後更新**: 2026-05-19 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯,Gemini 僅備援 / 鎖定場景 -> **適用版本**: V10.267 +> **適用版本**: V10.269 --- @@ -107,6 +107,7 @@ SQL漏斗(~300筆) - ElephantAlpha 使用 NVIDIA NIM hosted API;production 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5`,`ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援;403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。 - ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown,不寫 pending `human_review`,不發 Telegram 空告警。 - `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load;只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry,不派發 Hermes/NemoTron、不宣稱 48 小時效益。 +- `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans,將狀態改為 `auto_disabled` 或 `rejected` 並寫入 `metadata_json.hygiene_history`;不刪資料、不碰 NemoTron 業務行動。 - OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`;timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。 - PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs`,`/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice;`/observability/ppt_audit_file/` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。 diff --git a/services/action_plan_hygiene.py b/services/action_plan_hygiene.py new file mode 100644 index 0000000..3f09af1 --- /dev/null +++ b/services/action_plan_hygiene.py @@ -0,0 +1,248 @@ +"""Action plan queue hygiene. + +Closes stale, non-executable automation suggestions without deleting audit data. +The service is intentionally conservative: it only handles sources that are +known to be advisory/noisy when old, and it records the closure in metadata_json. +""" + +import json +import os +from datetime import datetime, timedelta +from typing import Any, Dict, Iterable, List, Optional + +from sqlalchemy import text + +from database.manager import get_session +from services.logger_manager import SystemLogger + +logger = SystemLogger("ActionPlanHygiene").get_logger() + +DEFAULT_STALE_HOURS = int(os.getenv("ACTION_PLAN_HYGIENE_STALE_HOURS", "72")) +DEFAULT_MAX_UPDATES = int(os.getenv("ACTION_PLAN_HYGIENE_MAX_UPDATES", "200")) + +CLOSABLE_STATUSES = frozenset({"pending", "auto_pending", "pending_review"}) +SOURCE_TARGET_STATUS = { + "code_review_fix": "auto_disabled", + "code_review_pipeline": "auto_disabled", + "openclaw_recommendation": "rejected", + "openclaw": "rejected", +} + + +def _row_get(row: Any, key: str) -> Any: + if isinstance(row, dict): + return row.get(key) + if hasattr(row, "_mapping"): + return row._mapping.get(key) + try: + return row[key] + except Exception: + return getattr(row, key, None) + + +def _coerce_datetime(value: Any) -> Optional[datetime]: + if isinstance(value, datetime): + return value.replace(tzinfo=None) + if isinstance(value, str) and value.strip(): + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None) + except ValueError: + return None + return None + + +def _source_for_row(row: Any) -> str: + return str( + _row_get(row, "action_type") + or _row_get(row, "plan_type") + or _row_get(row, "created_by") + or "unknown" + ) + + +def _parse_metadata(raw: Any) -> Dict[str, Any]: + if isinstance(raw, dict): + return dict(raw) + if not raw: + return {} + try: + parsed = json.loads(str(raw)) + return parsed if isinstance(parsed, dict) else {"legacy_metadata": parsed} + except Exception: + return {"legacy_metadata_raw": str(raw)[:1000]} + + +def build_action_plan_hygiene_preview( + rows: Iterable[Any], + *, + now: Optional[datetime] = None, + stale_hours: int = DEFAULT_STALE_HOURS, + max_updates: int = DEFAULT_MAX_UPDATES, +) -> Dict[str, Any]: + """Return stale action plan candidates without mutating the database.""" + now = (now or datetime.now()).replace(tzinfo=None) + cutoff = now - timedelta(hours=stale_hours) + candidates: List[Dict[str, Any]] = [] + scanned = 0 + + for row in rows: + scanned += 1 + status = str(_row_get(row, "status") or "") + source = _source_for_row(row) + created_at = _coerce_datetime(_row_get(row, "created_at")) + if status not in CLOSABLE_STATUSES: + continue + if source not in SOURCE_TARGET_STATUS: + continue + if not created_at or created_at > cutoff: + continue + + age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0) + candidates.append({ + "id": int(_row_get(row, "id")), + "source": source, + "from_status": status, + "to_status": SOURCE_TARGET_STATUS[source], + "priority": _row_get(row, "priority"), + "created_at": created_at.isoformat(sep=" "), + "age_hours": round(age_hours, 1), + "description": str(_row_get(row, "description") or "")[:160], + "reason": f"stale_{source}_older_than_{stale_hours}h", + }) + + candidates = candidates[:max_updates] + by_source: Dict[str, int] = {} + by_from_status: Dict[str, int] = {} + for item in candidates: + by_source[item["source"]] = by_source.get(item["source"], 0) + 1 + by_from_status[item["from_status"]] = by_from_status.get(item["from_status"], 0) + 1 + + return { + "scanned_count": scanned, + "candidate_count": len(candidates), + "stale_hours": stale_hours, + "max_updates": max_updates, + "by_source": by_source, + "by_from_status": by_from_status, + "candidates": candidates, + } + + +class ActionPlanHygieneService: + """Close stale advisory action_plans while preserving audit metadata.""" + + def __init__(self, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES): + self.stale_hours = stale_hours + self.max_updates = max_updates + + def preview(self) -> Dict[str, Any]: + session = get_session() + try: + rows = session.execute(text(""" + SELECT id, status, priority, created_at, action_type, plan_type, + created_by, description, metadata_json + FROM action_plans + WHERE status IN ('pending', 'auto_pending', 'pending_review') + AND ( + action_type IN ('code_review_fix', 'openclaw_recommendation') + OR created_by IN ('code_review_pipeline', 'openclaw') + ) + ORDER BY created_at ASC + """)).fetchall() + return build_action_plan_hygiene_preview( + rows, + stale_hours=self.stale_hours, + max_updates=self.max_updates, + ) + finally: + session.close() + + def run(self) -> Dict[str, Any]: + session = get_session() + now = datetime.now() + try: + rows = session.execute(text(""" + SELECT id, status, priority, created_at, action_type, plan_type, + created_by, description, metadata_json + FROM action_plans + WHERE status IN ('pending', 'auto_pending', 'pending_review') + AND ( + action_type IN ('code_review_fix', 'openclaw_recommendation') + OR created_by IN ('code_review_pipeline', 'openclaw') + ) + ORDER BY created_at ASC + """)).fetchall() + preview = build_action_plan_hygiene_preview( + rows, + now=now, + stale_hours=self.stale_hours, + max_updates=self.max_updates, + ) + + row_by_id = {int(_row_get(row, "id")): row for row in rows} + updated: List[Dict[str, Any]] = [] + for item in preview["candidates"]: + row = row_by_id.get(item["id"]) + if not row: + continue + metadata = _parse_metadata(_row_get(row, "metadata_json")) + history = metadata.get("hygiene_history") + if not isinstance(history, list): + history = [] + history.append({ + "closed_at": now.isoformat(timespec="seconds"), + "from_status": item["from_status"], + "to_status": item["to_status"], + "reason": item["reason"], + "age_hours": item["age_hours"], + }) + metadata["hygiene_history"] = history[-10:] + metadata["hygiene_last_reason"] = item["reason"] + metadata["hygiene_last_closed_at"] = now.isoformat(timespec="seconds") + + session.execute( + text(""" + UPDATE action_plans + SET status = :status, + metadata_json = :metadata + WHERE id = :id + """), + { + "id": item["id"], + "status": item["to_status"], + "metadata": json.dumps(metadata, ensure_ascii=False), + }, + ) + updated.append(item) + + session.commit() + result = dict(preview) + result["updated_count"] = len(updated) + result["updated_ids"] = [item["id"] for item in updated] + result["ran_at"] = now.isoformat(timespec="seconds") + logger.info( + "Action plan hygiene updated=%d by_source=%s", + result["updated_count"], + result.get("by_source"), + ) + return result + except Exception: + session.rollback() + raise + finally: + session.close() + + +def run_action_plan_hygiene( + *, + stale_hours: int = DEFAULT_STALE_HOURS, + max_updates: int = DEFAULT_MAX_UPDATES, +) -> Dict[str, Any]: + return ActionPlanHygieneService(stale_hours=stale_hours, max_updates=max_updates).run() + + +__all__ = [ + "ActionPlanHygieneService", + "build_action_plan_hygiene_preview", + "run_action_plan_hygiene", +] diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index e9ebb56..c99b64d 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -52,6 +52,7 @@ RESOURCE_LOAD_THRESHOLD_PCT = float(os.getenv("ELEPHANT_ALPHA_RESOURCE_LOAD_THRE RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_PRIORITY_THRESHOLD", "5")) RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5")) RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24")) +RESOURCE_HYGIENE_ENABLED = os.getenv("ELEPHANT_ALPHA_RESOURCE_HYGIENE_ENABLED", "true").lower() in {"1", "true", "yes", "on"} # ---- Constants ---- _ALLOWED_ACTION_TYPES = frozenset({ @@ -870,12 +871,21 @@ class ElephantAlphaAutonomousEngine: else: metrics = self._classify_resource_pressure(metrics) + pre_hygiene_metrics = dict(metrics) + hygiene_result = None + if metrics.get("should_alert") and RESOURCE_HYGIENE_ENABLED: + hygiene_result = self._run_action_plan_hygiene() + if hygiene_result and int(hygiene_result.get("updated_count") or 0) > 0: + metrics = self._collect_resource_pressure_metrics() + metrics["pre_hygiene"] = pre_hygiene_metrics + metrics["hygiene_result"] = hygiene_result + trigger.conditions = dict(trigger.conditions or {}) trigger.conditions["_resource_metrics"] = metrics trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level") trigger.conditions["resource_evidence"] = metrics.get("evidence", []) - if not metrics.get("should_alert"): + if not metrics.get("should_alert") and not hygiene_result: self._store_escalation(trigger.trigger_type) self._record_resource_optimization_suppressed(trigger, metrics, "below_actionable_threshold") return @@ -897,6 +907,15 @@ class ElephantAlphaAutonomousEngine: self._store_escalation(trigger.trigger_type) self._circuit_reset() + def _run_action_plan_hygiene(self) -> Optional[Dict[str, Any]]: + try: + from services.action_plan_hygiene import run_action_plan_hygiene + + return run_action_plan_hygiene() + except Exception as exc: + self._log.error("Action plan hygiene failed (non-blocking): %s", exc) + return {"updated_count": 0, "error": f"{type(exc).__name__}: {str(exc)[:200]}"} + def _record_resource_pressure_insight( self, metrics: Dict[str, Any], @@ -996,7 +1015,9 @@ class ElephantAlphaAutonomousEngine: f"pending_review={metrics.get('human_review_count', 0)}," f"stale={metrics.get('stale_count', 0)}," f"system_load={float(metrics.get('system_load_pct') or 0):.1f}%。" - f"{load_text};autonomous_limit={previous_limit}->{new_limit}。" + f"{load_text};" + f"auto_closed={int((metrics.get('hygiene_result') or {}).get('updated_count') or 0)};" + f"autonomous_limit={previous_limit}->{new_limit}。" ) @staticmethod @@ -1030,6 +1051,11 @@ class ElephantAlphaAutonomousEngine: "normal": "P4 normal", }.get(level, level) load_pct = float(metrics.get("system_load_pct") or 0.0) + pre_hygiene = metrics.get("pre_hygiene") if isinstance(metrics.get("pre_hygiene"), dict) else None + hygiene = metrics.get("hygiene_result") if isinstance(metrics.get("hygiene_result"), dict) else None + hygiene_count = int((hygiene or {}).get("updated_count") or 0) + if hygiene_count > 0 and not metrics.get("should_alert"): + level_label = "P4 resolved" load_judgement = ( "主機 CPU 已達高負載門檻。" if metrics.get("load_pressure") @@ -1042,6 +1068,13 @@ class ElephantAlphaAutonomousEngine: ] if new_limit != previous_limit: executed.append(f"已將 ElephantAlpha 自主決策上限由 {previous_limit} 調整為 {new_limit} 次/小時。") + if hygiene_count > 0: + by_source = hygiene.get("by_source") or {} + source_text = "、".join(f"{key} {value}" for key, value in by_source.items()) or f"{hygiene_count} 筆" + executed.insert( + 1, + f"已自動關閉過期 action_plans {hygiene_count} 筆({source_text});只改 status/metadata,不刪除資料。", + ) executed.append("未執行外部修復、未啟動 Hermes/NemoTron 價格分析、未宣稱效益預測。") lines = [ @@ -1051,6 +1084,16 @@ class ElephantAlphaAutonomousEngine: f"時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "", "量測指標", + ] + if pre_hygiene: + lines.append( + "• 清理前 Action queue:" + f"{int(pre_hygiene.get('action_queue_size') or 0)}," + f"P1/P2:{int(pre_hygiene.get('high_priority_count') or 0)}," + f"逾時:{int(pre_hygiene.get('stale_count') or 0)}" + ) + lines.append("清理後") + lines += [ f"• Action queue:{int(metrics.get('action_queue_size') or 0)} / {int(metrics.get('queue_threshold') or RESOURCE_QUEUE_THRESHOLD)}", f"• P1/P2 待處理:{int(metrics.get('high_priority_count') or 0)} / {int(metrics.get('high_priority_threshold') or RESOURCE_HIGH_PRIORITY_THRESHOLD)}", f"• Pending review:{int(metrics.get('human_review_count') or 0)}", diff --git a/tests/test_action_plan_hygiene.py b/tests/test_action_plan_hygiene.py new file mode 100644 index 0000000..59e4330 --- /dev/null +++ b/tests/test_action_plan_hygiene.py @@ -0,0 +1,49 @@ +from datetime import datetime, timedelta + + +def test_action_plan_hygiene_preview_closes_only_stale_advisory_sources(): + from services.action_plan_hygiene import build_action_plan_hygiene_preview + + now = datetime(2026, 5, 19, 12, 0, 0) + rows = [ + { + "id": 1, + "status": "pending", + "priority": 1, + "created_at": now - timedelta(hours=100), + "action_type": "openclaw_recommendation", + "description": "old advisory", + }, + { + "id": 2, + "status": "auto_pending", + "priority": 1, + "created_at": now - timedelta(hours=90), + "action_type": "code_review_fix", + "description": "old code review", + }, + { + "id": 3, + "status": "pending", + "priority": 1, + "created_at": now - timedelta(hours=90), + "created_by": "nemotron", + "description": "must stay", + }, + { + "id": 4, + "status": "pending", + "priority": 1, + "created_at": now - timedelta(hours=2), + "action_type": "openclaw_recommendation", + "description": "fresh advisory", + }, + ] + + preview = build_action_plan_hygiene_preview(rows, now=now, stale_hours=72) + + assert preview["candidate_count"] == 2 + assert preview["by_source"] == {"openclaw_recommendation": 1, "code_review_fix": 1} + assert {item["id"] for item in preview["candidates"]} == {1, 2} + target_by_id = {item["id"]: item["to_status"] for item in preview["candidates"]} + assert target_by_id == {1: "rejected", 2: "auto_disabled"} diff --git a/tests/test_elephant_alpha_engine.py b/tests/test_elephant_alpha_engine.py index 32cc126..542ff6e 100644 --- a/tests/test_elephant_alpha_engine.py +++ b/tests/test_elephant_alpha_engine.py @@ -252,6 +252,7 @@ def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch): sent.append(args) monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom) + monkeypatch.setattr(engine, "_run_action_plan_hygiene", lambda: {"updated_count": 0}) monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77) monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send) monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type)) @@ -280,3 +281,40 @@ def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch): assert stored == ["resource_optimization"] assert len(sent) == 1 assert engine.max_autonomous_decisions_per_hour == 8 + + +def test_resource_pressure_message_reports_hygiene_result(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({ + "action_queue_size": 9, + "high_priority_count": 0, + "human_review_count": 0, + "stale_count": 0, + "system_load_pct": 20.0, + "queue_threshold": 10, + "load_threshold_pct": 80, + "high_priority_threshold": 5, + "stale_threshold": 5, + }) + metrics["pre_hygiene"] = { + "action_queue_size": 100, + "high_priority_count": 47, + "stale_count": 99, + } + metrics["hygiene_result"] = { + "updated_count": 91, + "by_source": {"code_review_fix": 66, "openclaw_recommendation": 25}, + } + + msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message( + metrics, + insight_id=124, + previous_limit=10, + new_limit=10, + ) + + assert "P4 resolved" in msg + assert "清理前 Action queue:100" in msg + assert "已自動關閉過期 action_plans 91 筆" in msg + assert "只改 status/metadata,不刪除資料" in msg