diff --git a/TODO_NEXT_STEPS.txt b/TODO_NEXT_STEPS.txt index 2179832..c659f35 100644 --- a/TODO_NEXT_STEPS.txt +++ b/TODO_NEXT_STEPS.txt @@ -5,6 +5,7 @@ 【已完成】 - V10.266 強化核心 MOMO/PChome 比價鏈路:新增 `marketplace_product_matcher.py` 身份比對、只讓 `identity_v2` 且分數 ≥ 0.76 的高信心配對進 Dashboard/AI/Excel/Daily/Growth/PPT,並建立 `competitor_intel_repository.py` 統一圖表與簡報資料出口;同品牌但不同型號、不同組數、套組/單品或多品項不一致會進待審,不進正式比價。 + - V10.267 專業化 ElephantAlpha `resource_optimization` 告警:不再讓 LLM 生成「48 小時預期效益 / 已執行」敘事,改由程式量測 action queue、P1/P2、pending_review、逾時項目與 CPU load;單純 backlog 不發 Telegram,只有可行動資源壓力才寫 `ai_insights(resource_pressure)` 並發送量測型告警。 - V10.254 續補 `/growth_analysis` 快取命中效能:PostgreSQL source fingerprint 加 60 秒短 TTL,匯入 realtime_sales_monthly 後同步清除 growth shared cache 與短快取,避免快取命中仍頻繁掃大表 COUNT。 - V10.253 修正 Elephant Alpha L3 HITL 空告警:價格類與資源調配低信心事件若沒有 Hermes/實證資料,只記 suppressed telemetry 與 cooldown,不寫 pending human_review、不發 Telegram;`resource_optimization` 會保留 queue/load 原始指標供追查。 - V10.251 修正 OpenClaw Q&A 備援遙測:Ollama 主路徑仍為 GCP-A → GCP-B → 111,Gemini 只記為 `openclaw_qa_gemini_fallback`,NIM 只記為 `openclaw_qa_nim`;AI Calls 會把 legacy `openclaw_qa + gemini` 標成 Gemini 備援,避免再次誤判 Gemini-first。 @@ -149,6 +150,7 @@ - Phase 73 candidate queue review decision writer run package:新增 `services/market_intel/candidate_queue_review_decision_writer_run_package.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_package` 與 UI package 按鈕,將 review_state transaction、preflight、operator drill、writer gate、post-write smoke、必要 artifact 與 rollback plan 組成正式 CLI 更新前的可稽核 run package;API/UI 不寫檔、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler;版本同步至 V10.262。 - Phase 74 candidate queue review decision writer run readiness:新增 `services/market_intel/candidate_queue_review_decision_writer_run_readiness.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_readiness` 與 UI readiness 按鈕,檢查 review_state CLI 更新前的 transaction JSON、備份、preflight、shell-only 
token 與 post-write smoke 計畫是否齊備;API/UI 不寫檔、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler;版本同步至 V10.264。 - Phase 75 candidate queue review decision writer run receipt:新增 `services/market_intel/candidate_queue_review_decision_writer_run_receipt.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_receipt` 與 UI receipt 按鈕,審核 review_state CLI 更新後的 writer output、post-write smoke、dedupe key 一致性、artifact 路徑與 token 外洩風險;API/UI 不回吐 receipt 原文、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler;版本同步至 V10.266。 + - Phase 76 candidate queue review decision writer run closeout:新增 `services/market_intel/candidate_queue_review_decision_writer_run_closeout.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_closeout` 與 UI closeout 按鈕,在 review_state receipt 通過後整理 closeout gate、操作員 closeout artifact、post-closeout read-only inventory 確認與 promotion 摘要;API/UI 不回吐 receipt 原文、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler;版本同步至 V10.267。 - V10.248 補市場情報 390px preview panel QA:sample review 工具列改為 textarea + 可換行 action rail,移除舊的硬編 8 欄 grid;`check_responsive_overflow` 新增 `--screenshot-all`,本機 390x844 `/market_intel` 真頁面 QA 通過且 overflow=0。 - V10.250 補 Code Review Gemini 備援遙測護欄:Ollama 主路徑失敗時 `fallback_to` 明確指向 `code_review_openclaw_gemini`,測試鎖住「Gemini 不得記成 `code_review_openclaw` 主 caller」;AI Calls 觀測台會把 legacy `code_review_openclaw + gemini` 顯示成 Gemini 備援,避免誤判 Gemini-first。 - Schema smoke:`tests/test_market_intel_skeleton.py` 檢查 `Base.metadata` 內含 ADR-035 八張 `market_*` tables。 diff --git a/config.py b/config.py index 7441755..7874031 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.266" +SYSTEM_VERSION = "V10.267" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 
用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 1ded158..6d15739 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -2,7 +2,7 @@ > **最後更新**: 2026-05-19 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯,Gemini 僅備援 / 鎖定場景 -> **適用版本**: V10.253 +> **適用版本**: V10.267 --- @@ -105,7 +105,8 @@ SQL漏斗(~300筆) - Gunicorn runtime 預設 `worker_class = gthread`、`GUNICORN_THREADS=4`、`preload_app = False`;此組合讓 HUP 熱重載可用,也避免 Dashboard 長查詢完全阻塞 `/health`。 - CD rebuild 模式必須先 build image 成功,再短暫 stop/rm/recreate 三應用容器,避免 no-cache build 造成長時間 502。 - ElephantAlpha 使用 NVIDIA NIM hosted API;production 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5`,`ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援;403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。 -- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅、或 `resource_optimization` 低信心且沒有具體行動時,只記錄 suppressed escalation telemetry 與 cooldown,不寫 pending `human_review`,不發 Telegram 空告警。 +- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown,不寫 pending `human_review`,不發 Telegram 空告警。 +- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load;只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry,不派發 Hermes/NemoTron、不宣稱 48 小時效益。 - OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`;timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。 - PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs`,`/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT 
vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice;`/observability/ppt_audit_file/` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。 diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index c8edb25..e9ebb56 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -11,7 +11,7 @@ ADR-012 Compliance: §③ 單一 audit trail — 所有基行完畢後必發 triaged_alert Telegram §⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram ADR-013 Compliance: - resource_optimization trigger → auto_heal_service.handle_exception + resource_optimization trigger → deterministic resource-pressure telemetry + controlled throttling """ import asyncio @@ -25,6 +25,7 @@ import threading from dataclasses import dataclass, asdict from datetime import datetime, timedelta from enum import Enum +from html import escape from typing import Dict, List, Any, Optional from sqlalchemy import text @@ -46,6 +47,11 @@ CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:") ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30")) CONFIDENCE_THRESHOLD = float(os.getenv("ELEPHANT_ALPHA_CONFIDENCE_THRESHOLD", "0.7")) MAX_AUTONOMOUS_DECISIONS_PER_HOUR = int(os.getenv("ELEPHANT_ALPHA_MAX_AUTONOMOUS_DECISIONS_PER_HOUR", "10")) +RESOURCE_QUEUE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_QUEUE_THRESHOLD", "10")) +RESOURCE_LOAD_THRESHOLD_PCT = float(os.getenv("ELEPHANT_ALPHA_RESOURCE_LOAD_THRESHOLD_PCT", "80")) +RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_PRIORITY_THRESHOLD", "5")) +RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5")) +RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24")) # ---- Constants ---- _ALLOWED_ACTION_TYPES = frozenset({ @@ -274,7 +280,13 @@ class ElephantAlphaAutonomousEngine: ), AutonomousTrigger( trigger_type="resource_optimization", - 
conditions={"system_load": "high", "queue_size": ">10"}, + conditions={ + "queue_threshold": RESOURCE_QUEUE_THRESHOLD, + "load_threshold_pct": RESOURCE_LOAD_THRESHOLD_PCT, + "high_priority_threshold": RESOURCE_HIGH_PRIORITY_THRESHOLD, + "stale_threshold": RESOURCE_STALE_THRESHOLD, + "stale_hours": RESOURCE_STALE_HOURS, + }, threshold=0.6, enabled=True, ), @@ -424,16 +436,12 @@ class ElephantAlphaAutonomousEngine: session.close() async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool: - queue_size = self._get_action_queue_size() - system_load_pct = self._get_system_load_percentage() + metrics = self._collect_resource_pressure_metrics() trigger.conditions = dict(trigger.conditions or {}) - trigger.conditions["_resource_metrics"] = { - "action_queue_size": queue_size, - "system_load_pct": system_load_pct, - "queue_threshold": 10, - "load_threshold_pct": 80, - } - return queue_size > 10 or system_load_pct > 80 + trigger.conditions["_resource_metrics"] = metrics + trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level", "normal") + trigger.conditions["resource_evidence"] = metrics.get("evidence", []) + return bool(metrics.get("should_alert")) async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool: containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"]) @@ -551,6 +559,10 @@ class ElephantAlphaAutonomousEngine: return default async def _execute_autonomous_decision(self, trigger: AutonomousTrigger) -> None: + if trigger.trigger_type == "resource_optimization": + await self._handle_resource_optimization_trigger(trigger) + return + # ─── Operation Ollama-First v5.0 修補(2026-05-03)─── # 統帥反饋:每個 EA trigger 都先跑 Gemini orchestrate(燒錢)才 prefetch Hermes, # 結果 Hermes 0 threats 時送出空泛幻覺訊息 + Gemini 帳單照付。 @@ -704,6 +716,380 @@ class ElephantAlphaAutonomousEngine: raise ValueError(f"Unrecognized step: agent={agent_type} action={action}") + # ---- Deterministic resource 
pressure handling ---- + def _collect_resource_pressure_metrics(self) -> Dict[str, Any]: + system_load_pct = float(self._safe_metric(self._get_system_load_percentage, default=0.0) or 0.0) + rows: List[Any] = [] + query_error = None + + try: + session = get_session() + try: + rows = session.execute(text(""" + SELECT status, priority, created_at, action_type, created_by, plan_type + FROM action_plans + WHERE status IN ('pending', 'auto_pending', 'pending_review') + """)).fetchall() + finally: + session.close() + except Exception as exc: + query_error = f"{type(exc).__name__}: {str(exc)[:160]}" + rows = [] + + now = datetime.now() + status_breakdown: Dict[str, int] = {} + type_breakdown: Dict[str, int] = {} + stale_count = 0 + high_priority_count = 0 + human_review_count = 0 + oldest_pending_age_hours = 0.0 + + for row in rows: + data = row._mapping if hasattr(row, "_mapping") else row + status = self._row_get(data, "status") + priority = self._row_get(data, "priority") + action_type = ( + self._row_get(data, "action_type") + or self._row_get(data, "plan_type") + or self._row_get(data, "created_by") + or "unknown" + ) + created_at = self._coerce_datetime(self._row_get(data, "created_at")) + + status_breakdown[str(status or "unknown")] = status_breakdown.get(str(status or "unknown"), 0) + 1 + type_breakdown[str(action_type)] = type_breakdown.get(str(action_type), 0) + 1 + + if str(status) == "pending_review": + human_review_count += 1 + try: + if priority is not None and int(priority) <= 2: + high_priority_count += 1 + except (TypeError, ValueError): + pass + if created_at: + age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0) + oldest_pending_age_hours = max(oldest_pending_age_hours, age_hours) + if age_hours >= RESOURCE_STALE_HOURS: + stale_count += 1 + + queue_size = len(rows) + if query_error: + queue_size = int(self._safe_metric(self._get_action_queue_size, default=0) or 0) + + metrics = { + "action_queue_size": queue_size, + "high_priority_count": 
high_priority_count, + "human_review_count": human_review_count, + "stale_count": stale_count, + "oldest_pending_age_hours": round(oldest_pending_age_hours, 1), + "system_load_pct": round(system_load_pct, 1), + "queue_threshold": RESOURCE_QUEUE_THRESHOLD, + "load_threshold_pct": RESOURCE_LOAD_THRESHOLD_PCT, + "high_priority_threshold": RESOURCE_HIGH_PRIORITY_THRESHOLD, + "stale_threshold": RESOURCE_STALE_THRESHOLD, + "stale_hours": RESOURCE_STALE_HOURS, + "status_breakdown": status_breakdown, + "type_breakdown": dict(sorted(type_breakdown.items(), key=lambda item: item[1], reverse=True)[:6]), + } + if query_error: + metrics["query_error"] = query_error + return self._classify_resource_pressure(metrics) + + @staticmethod + def _row_get(row: Any, key: str) -> Any: + if isinstance(row, dict): + return row.get(key) + try: + return row[key] + except Exception: + return getattr(row, key, None) + + @staticmethod + def _coerce_datetime(value: Any) -> Optional[datetime]: + if isinstance(value, datetime): + return value.replace(tzinfo=None) + if isinstance(value, str) and value.strip(): + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None) + except ValueError: + return None + return None + + @staticmethod + def _classify_resource_pressure(metrics: Dict[str, Any]) -> Dict[str, Any]: + classified = dict(metrics or {}) + queue_size = int(classified.get("action_queue_size") or 0) + high_priority_count = int(classified.get("high_priority_count") or 0) + human_review_count = int(classified.get("human_review_count") or 0) + stale_count = int(classified.get("stale_count") or 0) + system_load_pct = float(classified.get("system_load_pct") or 0.0) + queue_threshold = int(classified.get("queue_threshold") or RESOURCE_QUEUE_THRESHOLD) + load_threshold_pct = float(classified.get("load_threshold_pct") or RESOURCE_LOAD_THRESHOLD_PCT) + high_priority_threshold = int(classified.get("high_priority_threshold") or RESOURCE_HIGH_PRIORITY_THRESHOLD) + 
stale_threshold = int(classified.get("stale_threshold") or RESOURCE_STALE_THRESHOLD) + + queue_pressure = queue_size > queue_threshold + load_pressure = system_load_pct >= load_threshold_pct + high_priority_pressure = high_priority_count >= high_priority_threshold + stale_pressure = stale_count >= stale_threshold + + if load_pressure and (queue_pressure or high_priority_count > 0 or stale_count > 0): + pressure_level = "critical" + elif load_pressure or high_priority_pressure or stale_pressure: + pressure_level = "warning" + elif queue_pressure: + pressure_level = "backlog_only" + else: + pressure_level = "normal" + + evidence = [ + f"action_queue={queue_size}/{queue_threshold}", + f"system_load={system_load_pct:.1f}%/{load_threshold_pct:.0f}%", + f"high_priority={high_priority_count}/{high_priority_threshold}", + f"stale={stale_count}/{stale_threshold}", + ] + if human_review_count: + evidence.append(f"pending_review={human_review_count}") + + classified.update({ + "pressure_level": pressure_level, + "should_alert": pressure_level in {"critical", "warning"}, + "queue_pressure": queue_pressure, + "load_pressure": load_pressure, + "high_priority_pressure": high_priority_pressure, + "stale_pressure": stale_pressure, + "evidence": evidence, + "confidence": 0.95 if pressure_level == "critical" else 0.86 if pressure_level == "warning" else 0.68, + }) + return classified + + async def _handle_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> None: + metrics = (trigger.conditions or {}).get("_resource_metrics") + if not isinstance(metrics, dict): + metrics = self._collect_resource_pressure_metrics() + else: + metrics = self._classify_resource_pressure(metrics) + + trigger.conditions = dict(trigger.conditions or {}) + trigger.conditions["_resource_metrics"] = metrics + trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level") + trigger.conditions["resource_evidence"] = metrics.get("evidence", []) + + if not metrics.get("should_alert"): 
+ self._store_escalation(trigger.trigger_type) + self._record_resource_optimization_suppressed(trigger, metrics, "below_actionable_threshold") + return + + previous_limit = self.max_autonomous_decisions_per_hour + new_limit = previous_limit + if metrics.get("load_pressure") or metrics.get("pressure_level") == "critical": + new_limit = min(previous_limit, 5) + elif metrics.get("high_priority_pressure") or metrics.get("stale_pressure"): + new_limit = min(previous_limit, 8) + self.max_autonomous_decisions_per_hour = new_limit + + insight_id = None + try: + insight_id = self._record_resource_pressure_insight(metrics, previous_limit, new_limit) + except Exception as exc: + self._log.error("Resource pressure insight write failed (non-blocking): %s", exc) + await self._send_resource_pressure_telegram(metrics, insight_id, previous_limit, new_limit) + self._store_escalation(trigger.trigger_type) + self._circuit_reset() + + def _record_resource_pressure_insight( + self, + metrics: Dict[str, Any], + previous_limit: int, + new_limit: int, + ) -> Optional[int]: + content = self._format_resource_pressure_content(metrics, previous_limit, new_limit) + session = get_session() + try: + row = session.execute( + text(""" + INSERT INTO ai_insights + (insight_type, content, confidence, created_by, status, metadata_json) + VALUES (:type, :content, :confidence, :created_by, :status, :metadata) + RETURNING id + """), + { + "type": "resource_pressure", + "content": content, + "confidence": float(metrics.get("confidence") or 0.86), + "created_by": "elephant_alpha", + "status": "pending", + "metadata": json.dumps({ + "source": "elephant_alpha_resource_pressure", + "metrics": metrics, + "previous_limit": previous_limit, + "new_limit": new_limit, + }, ensure_ascii=False), + }, + ).fetchone() + session.commit() + insight_id = row[0] if row else None + if insight_id: + try: + from services.openclaw_learning_service import enqueue_insight_embedding + enqueue_insight_embedding(insight_id, 
"resource_pressure", content) + except Exception as embed_err: + self._log.warning("Embedding enqueue failed for resource_pressure: %s", embed_err) + return insight_id + except Exception: + session.rollback() + raise + finally: + session.close() + + def _record_resource_optimization_suppressed( + self, + trigger: AutonomousTrigger, + metrics: Dict[str, Any], + reason: str, + ) -> None: + self._log.info( + "EA resource optimization suppressed: reason=%s level=%s metrics=%s", + reason, + metrics.get("pressure_level"), + metrics, + ) + try: + from services.ai_call_logger import log_ai_call + + with log_ai_call( + caller="ea_engine", + provider="gcp_ollama", + model="hermes3:latest", + meta={ + "suppressed_resource_optimization": True, + "trigger": trigger.trigger_type, + "reason": reason, + "metrics": metrics, + }, + ) as ctx: + ctx.set_tokens(input=0, output=0) + ctx.status = "cache_only" + except Exception: + self._log.warning( + "EA resource suppression telemetry failed; trigger=%s", + trigger.trigger_type, + exc_info=True, + ) + + @staticmethod + def _format_resource_pressure_content( + metrics: Dict[str, Any], + previous_limit: int, + new_limit: int, + ) -> str: + level = metrics.get("pressure_level", "unknown") + load_text = ( + "主機負載達告警門檻" + if metrics.get("load_pressure") + else "主機負載未達告警門檻,壓力主要來自工作隊列" + ) + return ( + f"[Elephant Alpha 資源壓力] level={level};" + f"action_queue={metrics.get('action_queue_size', 0)}," + f"P1/P2={metrics.get('high_priority_count', 0)}," + f"pending_review={metrics.get('human_review_count', 0)}," + f"stale={metrics.get('stale_count', 0)}," + f"system_load={float(metrics.get('system_load_pct') or 0):.1f}%。" + f"{load_text};autonomous_limit={previous_limit}->{new_limit}。" + ) + + @staticmethod + def _build_resource_pressure_actions(metrics: Dict[str, Any]) -> List[str]: + actions = [] + if metrics.get("high_priority_pressure"): + actions.append("先處理 priority <= 2 的 action_plans,避免高風險項目被一般建議淹沒。") + if metrics.get("stale_pressure"): + 
actions.append( + f"清理或關閉超過 {metrics.get('stale_hours', RESOURCE_STALE_HOURS)} 小時仍未處理的 pending / pending_review 項目。" + ) + if metrics.get("load_pressure"): + actions.append("暫停非必要背景任務,優先保留匯入、Dashboard、Telegram 與 AutoHeal 路徑。") + if not actions: + actions.append("僅保留觀測,不派發 Hermes/NemoTron/價格分析,避免把一般 backlog 誤報成資源事件。") + actions.append("確認 action_plans 來源是否持續產生重複建議;若是報表型建議,應改為摘要消化而非逐筆告警。") + return actions + + @staticmethod + def _build_resource_pressure_telegram_message( + metrics: Dict[str, Any], + insight_id: Optional[int], + previous_limit: int, + new_limit: int, + ) -> str: + level = str(metrics.get("pressure_level", "unknown")) + level_label = { + "critical": "P1 critical", + "warning": "P2 warning", + "backlog_only": "P3 backlog", + "normal": "P4 normal", + }.get(level, level) + load_pct = float(metrics.get("system_load_pct") or 0.0) + load_judgement = ( + "主機 CPU 已達高負載門檻。" + if metrics.get("load_pressure") + else "主機 CPU 未達高負載門檻,這不是主機資源耗盡,而是工作隊列/人工審核積壓。" + ) + executed = [ + f"已寫入 ai_insights(resource_pressure) #{insight_id}。" + if insight_id + else "ai_insights(resource_pressure) 寫入未取得 id;請查看 scheduler log。" + ] + if new_limit != previous_limit: + executed.append(f"已將 ElephantAlpha 自主決策上限由 {previous_limit} 調整為 {new_limit} 次/小時。") + executed.append("未執行外部修復、未啟動 Hermes/NemoTron 價格分析、未宣稱效益預測。") + + lines = [ + "Elephant Alpha · 資源壓力告警", + f"事件:resource_optimization", + f"等級:{escape(level_label)}", + f"時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + "", + "量測指標", + f"• Action queue:{int(metrics.get('action_queue_size') or 0)} / {int(metrics.get('queue_threshold') or RESOURCE_QUEUE_THRESHOLD)}", + f"• P1/P2 待處理:{int(metrics.get('high_priority_count') or 0)} / {int(metrics.get('high_priority_threshold') or RESOURCE_HIGH_PRIORITY_THRESHOLD)}", + f"• Pending review:{int(metrics.get('human_review_count') or 0)}", + f"• 逾時未處理:{int(metrics.get('stale_count') or 0)} / {int(metrics.get('stale_threshold') or RESOURCE_STALE_THRESHOLD)}", + f"• CPU 
load:{load_pct:.1f}% / {float(metrics.get('load_threshold_pct') or RESOURCE_LOAD_THRESHOLD_PCT):.0f}%", + "", + "判讀", + f"• {escape(load_judgement)}", + "• 這則告警只採用 action_plans 與 CPU 實測值,不採用 LLM 生成的 48 小時效益預測。", + "", + "已執行", + *[f"• {escape(item)}" for item in executed], + "", + "建議下一步", + *[f"• {escape(item)}" for item in ElephantAlphaAutonomousEngine._build_resource_pressure_actions(metrics)], + ] + return "\n".join(lines) + + async def _send_resource_pressure_telegram( + self, + metrics: Dict[str, Any], + insight_id: Optional[int], + previous_limit: int, + new_limit: int, + ) -> None: + try: + from services.telegram_templates import _send_telegram_raw + + msg = self._build_resource_pressure_telegram_message( + metrics, + insight_id, + previous_limit, + new_limit, + ) + await self._run_with_timeout(_send_telegram_raw, msg, timeout=10) + self._log.info("Resource pressure Telegram sent: level=%s", metrics.get("pressure_level")) + except Exception as e: + self._log.error("Resource pressure Telegram failed (non-blocking): %s", e) + # ---- Sub-services ---- async def _hermes_analyze(self) -> Any: from services.hermes_analyst_service import HermesAnalystService diff --git a/tests/test_elephant_alpha_engine.py b/tests/test_elephant_alpha_engine.py index cba70bc..32cc126 100644 --- a/tests/test_elephant_alpha_engine.py +++ b/tests/test_elephant_alpha_engine.py @@ -159,3 +159,124 @@ def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypat assert cooldown == ["resource_optimization"] assert suppressed == [("resource_optimization", "no_concrete_evidence")] + + +def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({ + "action_queue_size": 34, + "high_priority_count": 0, + "human_review_count": 0, + "stale_count": 0, + "system_load_pct": 19.2, + "queue_threshold": 10, + 
"load_threshold_pct": 80, + "high_priority_threshold": 5, + "stale_threshold": 5, + }) + + assert metrics["pressure_level"] == "backlog_only" + assert metrics["should_alert"] is False + assert metrics["load_pressure"] is False + assert "system_load=19.2%/80%" in metrics["evidence"] + + +def test_resource_pressure_classifier_alerts_on_actionable_review_backlog(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({ + "action_queue_size": 34, + "high_priority_count": 8, + "human_review_count": 6, + "stale_count": 0, + "system_load_pct": 22.0, + "queue_threshold": 10, + "load_threshold_pct": 80, + "high_priority_threshold": 5, + "stale_threshold": 5, + }) + + assert metrics["pressure_level"] == "warning" + assert metrics["should_alert"] is True + assert metrics["high_priority_pressure"] is True + assert metrics["load_pressure"] is False + + +def test_resource_pressure_message_is_measurement_based_not_llm_theatre(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({ + "action_queue_size": 34, + "high_priority_count": 8, + "human_review_count": 6, + "stale_count": 5, + "system_load_pct": 22.0, + "queue_threshold": 10, + "load_threshold_pct": 80, + "high_priority_threshold": 5, + "stale_threshold": 5, + "stale_hours": 24, + }) + + msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message( + metrics, + insight_id=123, + previous_limit=10, + new_limit=8, + ) + + assert "量測指標" in msg + assert "主機 CPU 未達高負載門檻" in msg + assert "未啟動 Hermes/NemoTron 價格分析" in msg + assert "預期效益" not in msg + assert "48小時" not in msg + assert "48 小時效益預測" in msg + + +def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch): + import services.elephant_alpha_autonomous_engine as engine_module + from services.elephant_alpha_autonomous_engine import ( + 
AutonomousTrigger, + ElephantAlphaAutonomousEngine, + ) + + engine = ElephantAlphaAutonomousEngine() + sent = [] + stored = [] + + async def _boom(*args, **kwargs): + raise AssertionError("resource_optimization must not call LLM orchestrator") + + async def _capture_send(*args, **kwargs): + sent.append(args) + + monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom) + monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77) + monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send) + monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type)) + + trigger = AutonomousTrigger( + trigger_type="resource_optimization", + conditions={ + "_resource_metrics": { + "action_queue_size": 34, + "high_priority_count": 9, + "human_review_count": 6, + "stale_count": 0, + "system_load_pct": 20.0, + "queue_threshold": 10, + "load_threshold_pct": 80, + "high_priority_threshold": 5, + "stale_threshold": 5, + } + }, + threshold=0.6, + enabled=True, + ) + + asyncio.run(engine._execute_autonomous_decision(trigger)) + + assert stored == ["resource_optimization"] + assert len(sent) == 1 + assert engine.max_autonomous_decisions_per_hour == 8
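
Review note: the decision table in `_classify_resource_pressure` can be easier to check when pulled out of the engine class. The block below is a minimal standalone sketch of that rule, not the code in this diff. The names `Thresholds` and `classify_pressure` are hypothetical, and the default values are assumed from the `ELEPHANT_ALPHA_RESOURCE_*` environment defaults added above (queue 10, load 80%, high priority 5, stale 5).

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Thresholds:
    # Assumed defaults, copied from the env-var fallbacks in the diff.
    queue: int = 10
    load_pct: float = 80.0
    high_priority: int = 5
    stale: int = 5


def classify_pressure(
    queue_size: int,
    high_priority: int,
    stale: int,
    load_pct: float,
    t: Thresholds = Thresholds(),
) -> Tuple[str, bool]:
    """Return (pressure_level, should_alert) from measured values only."""
    queue_pressure = queue_size > t.queue          # strict: at-threshold is not pressure
    load_pressure = load_pct >= t.load_pct         # inclusive
    high_priority_pressure = high_priority >= t.high_priority
    stale_pressure = stale >= t.stale

    if load_pressure and (queue_pressure or high_priority > 0 or stale > 0):
        level = "critical"      # high CPU plus any pending work signal
    elif load_pressure or high_priority_pressure or stale_pressure:
        level = "warning"       # one actionable signal on its own
    elif queue_pressure:
        level = "backlog_only"  # large queue, normal CPU: telemetry only
    else:
        level = "normal"
    return level, level in {"critical", "warning"}


if __name__ == "__main__":
    # Same inputs as the first two classifier tests in this diff.
    print(classify_pressure(34, 0, 0, 19.2))
    print(classify_pressure(34, 8, 0, 22.0))
```

Two details of the rule are worth keeping in mind when tuning thresholds: the queue comparison is strict (`>`) while the load, high-priority, and stale comparisons are inclusive (`>=`), and a large queue alone never sets `should_alert`.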