專業化 EA 資源壓力告警
All checks were successful
CD Pipeline / deploy (push) Successful in 1m2s

This commit is contained in:
OoO
2026-05-19 20:47:46 +08:00
parent 1911c1c53d
commit 32013ae3b2
5 changed files with 524 additions and 14 deletions

View File

@@ -5,6 +5,7 @@
【已完成】
- V10.266 強化核心 MOMO/PChome 比價鏈路:新增 `marketplace_product_matcher.py` 身份比對、只讓 `identity_v2` 且分數 ≥ 0.76 的高信心配對進 Dashboard/AI/Excel/Daily/Growth/PPT並建立 `competitor_intel_repository.py` 統一圖表與簡報資料出口;同品牌但不同型號、不同組數、套組/單品或多品項不一致會進待審,不進正式比價。
- V10.267 專業化 ElephantAlpha `resource_optimization` 告警:不再讓 LLM 生成「48 小時預期效益 / 已執行」敘事,改由程式量測 action queue、P1/P2、pending_review、逾時項目與 CPU load單純 backlog 不發 Telegram只有可行動資源壓力才寫 `ai_insights(resource_pressure)` 並發送量測型告警。
- V10.254 續補 `/growth_analysis` 快取命中效能PostgreSQL source fingerprint 加 60 秒短 TTL匯入 realtime_sales_monthly 後同步清除 growth shared cache 與短快取,避免快取命中仍頻繁掃大表 COUNT。
- V10.253 修正 Elephant Alpha L3 HITL 空告警:價格類與資源調配低信心事件若沒有 Hermes/實證資料,只記 suppressed telemetry 與 cooldown不寫 pending human_review、不發 Telegram`resource_optimization` 會保留 queue/load 原始指標供追查。
- V10.251 修正 OpenClaw Q&A 備援遙測Ollama 主路徑仍為 GCP-A → GCP-B → 111Gemini 只記為 `openclaw_qa_gemini_fallback`NIM 只記為 `openclaw_qa_nim`AI Calls 會把 legacy `openclaw_qa + gemini` 標成 Gemini 備援,避免再次誤判 Gemini-first。
@@ -149,6 +150,7 @@
- Phase 73 candidate queue review decision writer run package新增 `services/market_intel/candidate_queue_review_decision_writer_run_package.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_package` 與 UI package 按鈕,將 review_state transaction、preflight、operator drill、writer gate、post-write smoke、必要 artifact 與 rollback plan 組成正式 CLI 更新前的可稽核 run packageAPI/UI 不寫檔、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler版本同步至 V10.262。
- Phase 74 candidate queue review decision writer run readiness新增 `services/market_intel/candidate_queue_review_decision_writer_run_readiness.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_readiness` 與 UI readiness 按鈕,檢查 review_state CLI 更新前的 transaction JSON、備份、preflight、shell-only token 與 post-write smoke 計畫是否齊備API/UI 不寫檔、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler版本同步至 V10.264。
- Phase 75 candidate queue review decision writer run receipt新增 `services/market_intel/candidate_queue_review_decision_writer_run_receipt.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_receipt` 與 UI receipt 按鈕,審核 review_state CLI 更新後的 writer output、post-write smoke、dedupe key 一致性、artifact 路徑與 token 外洩風險API/UI 不回吐 receipt 原文、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler版本同步至 V10.266。
- Phase 76 candidate queue review decision writer run closeout新增 `services/market_intel/candidate_queue_review_decision_writer_run_closeout.py`、POST `/api/market_intel/manual_sample_review/candidate_queue_review_decision_writer_run_closeout` 與 UI closeout 按鈕,在 review_state receipt 通過後整理 closeout gate、操作員 closeout artifact、post-closeout read-only inventory 確認與 promotion 摘要API/UI 不回吐 receipt 原文、不讀 token、不執行 CLI、不連 DB、不更新 review_state、不 commit、不掛 scheduler版本同步至 V10.267。
- V10.248 補市場情報 390px preview panel QAsample review 工具列改為 textarea + 可換行 action rail移除舊的硬編 8 欄 grid`check_responsive_overflow` 新增 `--screenshot-all`,本機 390x844 `/market_intel` 真頁面 QA 通過且 overflow=0。
- V10.250 補 Code Review Gemini 備援遙測護欄Ollama 主路徑失敗時 `fallback_to` 明確指向 `code_review_openclaw_gemini`測試鎖住「Gemini 不得記成 `code_review_openclaw` 主 caller」AI Calls 觀測台會把 legacy `code_review_openclaw + gemini` 顯示成 Gemini 備援,避免誤判 Gemini-first。
- Schema smoke`tests/test_market_intel_skeleton.py` 檢查 `Base.metadata` 內含 ADR-035 八張 `market_*` tables。

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.266"
SYSTEM_VERSION = "V10.267"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-05-19 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯Gemini 僅備援 / 鎖定場景
> **適用版本**: V10.253
> **適用版本**: V10.267
---
@@ -105,7 +105,8 @@ SQL漏斗(~300筆)
- Gunicorn runtime 預設 `worker_class = gthread``GUNICORN_THREADS=4``preload_app = False`;此組合讓 HUP 熱重載可用,也避免 Dashboard 長查詢完全阻塞 `/health`
- CD rebuild 模式必須先 build image 成功,再短暫 stop/rm/recreate 三應用容器,避免 no-cache build 造成長時間 502。
- ElephantAlpha 使用 NVIDIA NIM hosted APIproduction 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5``ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。
- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅、或 `resource_optimization` 低信心且沒有具體行動時,只記錄 suppressed escalation telemetry 與 cooldown不寫 pending `human_review`,不發 Telegram 空告警。
- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown不寫 pending `human_review`,不發 Telegram 空告警。
- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry不派發 Hermes/NemoTron、不宣稱 48 小時效益。
- OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。
- PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs``/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice`/observability/ppt_audit_file/<filename>` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。

View File

@@ -11,7 +11,7 @@ ADR-012 Compliance:
§③ 單一 audit trail — 所有基行完畢後必發 triaged_alert Telegram
§⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram
ADR-013 Compliance:
resource_optimization trigger → auto_heal_service.handle_exception
resource_optimization trigger → deterministic resource-pressure telemetry + controlled throttling
"""
import asyncio
@@ -25,6 +25,7 @@ import threading
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from html import escape
from typing import Dict, List, Any, Optional
from sqlalchemy import text
@@ -46,6 +47,11 @@ CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
CONFIDENCE_THRESHOLD = float(os.getenv("ELEPHANT_ALPHA_CONFIDENCE_THRESHOLD", "0.7"))
MAX_AUTONOMOUS_DECISIONS_PER_HOUR = int(os.getenv("ELEPHANT_ALPHA_MAX_AUTONOMOUS_DECISIONS_PER_HOUR", "10"))
RESOURCE_QUEUE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_QUEUE_THRESHOLD", "10"))
RESOURCE_LOAD_THRESHOLD_PCT = float(os.getenv("ELEPHANT_ALPHA_RESOURCE_LOAD_THRESHOLD_PCT", "80"))
RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_PRIORITY_THRESHOLD", "5"))
RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5"))
RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24"))
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({
@@ -274,7 +280,13 @@ class ElephantAlphaAutonomousEngine:
),
AutonomousTrigger(
trigger_type="resource_optimization",
conditions={"system_load": "high", "queue_size": ">10"},
conditions={
"queue_threshold": RESOURCE_QUEUE_THRESHOLD,
"load_threshold_pct": RESOURCE_LOAD_THRESHOLD_PCT,
"high_priority_threshold": RESOURCE_HIGH_PRIORITY_THRESHOLD,
"stale_threshold": RESOURCE_STALE_THRESHOLD,
"stale_hours": RESOURCE_STALE_HOURS,
},
threshold=0.6,
enabled=True,
),
@@ -424,16 +436,12 @@ class ElephantAlphaAutonomousEngine:
session.close()
async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool:
queue_size = self._get_action_queue_size()
system_load_pct = self._get_system_load_percentage()
metrics = self._collect_resource_pressure_metrics()
trigger.conditions = dict(trigger.conditions or {})
trigger.conditions["_resource_metrics"] = {
"action_queue_size": queue_size,
"system_load_pct": system_load_pct,
"queue_threshold": 10,
"load_threshold_pct": 80,
}
return queue_size > 10 or system_load_pct > 80
trigger.conditions["_resource_metrics"] = metrics
trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level", "normal")
trigger.conditions["resource_evidence"] = metrics.get("evidence", [])
return bool(metrics.get("should_alert"))
async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool:
containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"])
@@ -551,6 +559,10 @@ class ElephantAlphaAutonomousEngine:
return default
async def _execute_autonomous_decision(self, trigger: AutonomousTrigger) -> None:
if trigger.trigger_type == "resource_optimization":
await self._handle_resource_optimization_trigger(trigger)
return
# ─── Operation Ollama-First v5.0 修補2026-05-03───
# 統帥反饋:每個 EA trigger 都先跑 Gemini orchestrate燒錢才 prefetch Hermes
# 結果 Hermes 0 threats 時送出空泛幻覺訊息 + Gemini 帳單照付。
@@ -704,6 +716,380 @@ class ElephantAlphaAutonomousEngine:
raise ValueError(f"Unrecognized step: agent={agent_type} action={action}")
# ---- Deterministic resource pressure handling ----
def _collect_resource_pressure_metrics(self) -> Dict[str, Any]:
system_load_pct = float(self._safe_metric(self._get_system_load_percentage, default=0.0) or 0.0)
rows: List[Any] = []
query_error = None
try:
session = get_session()
try:
rows = session.execute(text("""
SELECT status, priority, created_at, action_type, created_by, plan_type
FROM action_plans
WHERE status IN ('pending', 'auto_pending', 'pending_review')
""")).fetchall()
finally:
session.close()
except Exception as exc:
query_error = f"{type(exc).__name__}: {str(exc)[:160]}"
rows = []
now = datetime.now()
status_breakdown: Dict[str, int] = {}
type_breakdown: Dict[str, int] = {}
stale_count = 0
high_priority_count = 0
human_review_count = 0
oldest_pending_age_hours = 0.0
for row in rows:
data = row._mapping if hasattr(row, "_mapping") else row
status = self._row_get(data, "status")
priority = self._row_get(data, "priority")
action_type = (
self._row_get(data, "action_type")
or self._row_get(data, "plan_type")
or self._row_get(data, "created_by")
or "unknown"
)
created_at = self._coerce_datetime(self._row_get(data, "created_at"))
status_breakdown[str(status or "unknown")] = status_breakdown.get(str(status or "unknown"), 0) + 1
type_breakdown[str(action_type)] = type_breakdown.get(str(action_type), 0) + 1
if str(status) == "pending_review":
human_review_count += 1
try:
if priority is not None and int(priority) <= 2:
high_priority_count += 1
except (TypeError, ValueError):
pass
if created_at:
age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0)
oldest_pending_age_hours = max(oldest_pending_age_hours, age_hours)
if age_hours >= RESOURCE_STALE_HOURS:
stale_count += 1
queue_size = len(rows)
if query_error:
queue_size = int(self._safe_metric(self._get_action_queue_size, default=0) or 0)
metrics = {
"action_queue_size": queue_size,
"high_priority_count": high_priority_count,
"human_review_count": human_review_count,
"stale_count": stale_count,
"oldest_pending_age_hours": round(oldest_pending_age_hours, 1),
"system_load_pct": round(system_load_pct, 1),
"queue_threshold": RESOURCE_QUEUE_THRESHOLD,
"load_threshold_pct": RESOURCE_LOAD_THRESHOLD_PCT,
"high_priority_threshold": RESOURCE_HIGH_PRIORITY_THRESHOLD,
"stale_threshold": RESOURCE_STALE_THRESHOLD,
"stale_hours": RESOURCE_STALE_HOURS,
"status_breakdown": status_breakdown,
"type_breakdown": dict(sorted(type_breakdown.items(), key=lambda item: item[1], reverse=True)[:6]),
}
if query_error:
metrics["query_error"] = query_error
return self._classify_resource_pressure(metrics)
@staticmethod
def _row_get(row: Any, key: str) -> Any:
if isinstance(row, dict):
return row.get(key)
try:
return row[key]
except Exception:
return getattr(row, key, None)
@staticmethod
def _coerce_datetime(value: Any) -> Optional[datetime]:
if isinstance(value, datetime):
return value.replace(tzinfo=None)
if isinstance(value, str) and value.strip():
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None)
except ValueError:
return None
return None
@staticmethod
def _classify_resource_pressure(metrics: Dict[str, Any]) -> Dict[str, Any]:
classified = dict(metrics or {})
queue_size = int(classified.get("action_queue_size") or 0)
high_priority_count = int(classified.get("high_priority_count") or 0)
human_review_count = int(classified.get("human_review_count") or 0)
stale_count = int(classified.get("stale_count") or 0)
system_load_pct = float(classified.get("system_load_pct") or 0.0)
queue_threshold = int(classified.get("queue_threshold") or RESOURCE_QUEUE_THRESHOLD)
load_threshold_pct = float(classified.get("load_threshold_pct") or RESOURCE_LOAD_THRESHOLD_PCT)
high_priority_threshold = int(classified.get("high_priority_threshold") or RESOURCE_HIGH_PRIORITY_THRESHOLD)
stale_threshold = int(classified.get("stale_threshold") or RESOURCE_STALE_THRESHOLD)
queue_pressure = queue_size > queue_threshold
load_pressure = system_load_pct >= load_threshold_pct
high_priority_pressure = high_priority_count >= high_priority_threshold
stale_pressure = stale_count >= stale_threshold
if load_pressure and (queue_pressure or high_priority_count > 0 or stale_count > 0):
pressure_level = "critical"
elif load_pressure or high_priority_pressure or stale_pressure:
pressure_level = "warning"
elif queue_pressure:
pressure_level = "backlog_only"
else:
pressure_level = "normal"
evidence = [
f"action_queue={queue_size}/{queue_threshold}",
f"system_load={system_load_pct:.1f}%/{load_threshold_pct:.0f}%",
f"high_priority={high_priority_count}/{high_priority_threshold}",
f"stale={stale_count}/{stale_threshold}",
]
if human_review_count:
evidence.append(f"pending_review={human_review_count}")
classified.update({
"pressure_level": pressure_level,
"should_alert": pressure_level in {"critical", "warning"},
"queue_pressure": queue_pressure,
"load_pressure": load_pressure,
"high_priority_pressure": high_priority_pressure,
"stale_pressure": stale_pressure,
"evidence": evidence,
"confidence": 0.95 if pressure_level == "critical" else 0.86 if pressure_level == "warning" else 0.68,
})
return classified
async def _handle_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> None:
metrics = (trigger.conditions or {}).get("_resource_metrics")
if not isinstance(metrics, dict):
metrics = self._collect_resource_pressure_metrics()
else:
metrics = self._classify_resource_pressure(metrics)
trigger.conditions = dict(trigger.conditions or {})
trigger.conditions["_resource_metrics"] = metrics
trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level")
trigger.conditions["resource_evidence"] = metrics.get("evidence", [])
if not metrics.get("should_alert"):
self._store_escalation(trigger.trigger_type)
self._record_resource_optimization_suppressed(trigger, metrics, "below_actionable_threshold")
return
previous_limit = self.max_autonomous_decisions_per_hour
new_limit = previous_limit
if metrics.get("load_pressure") or metrics.get("pressure_level") == "critical":
new_limit = min(previous_limit, 5)
elif metrics.get("high_priority_pressure") or metrics.get("stale_pressure"):
new_limit = min(previous_limit, 8)
self.max_autonomous_decisions_per_hour = new_limit
insight_id = None
try:
insight_id = self._record_resource_pressure_insight(metrics, previous_limit, new_limit)
except Exception as exc:
self._log.error("Resource pressure insight write failed (non-blocking): %s", exc)
await self._send_resource_pressure_telegram(metrics, insight_id, previous_limit, new_limit)
self._store_escalation(trigger.trigger_type)
self._circuit_reset()
def _record_resource_pressure_insight(
self,
metrics: Dict[str, Any],
previous_limit: int,
new_limit: int,
) -> Optional[int]:
content = self._format_resource_pressure_content(metrics, previous_limit, new_limit)
session = get_session()
try:
row = session.execute(
text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES (:type, :content, :confidence, :created_by, :status, :metadata)
RETURNING id
"""),
{
"type": "resource_pressure",
"content": content,
"confidence": float(metrics.get("confidence") or 0.86),
"created_by": "elephant_alpha",
"status": "pending",
"metadata": json.dumps({
"source": "elephant_alpha_resource_pressure",
"metrics": metrics,
"previous_limit": previous_limit,
"new_limit": new_limit,
}, ensure_ascii=False),
},
).fetchone()
session.commit()
insight_id = row[0] if row else None
if insight_id:
try:
from services.openclaw_learning_service import enqueue_insight_embedding
enqueue_insight_embedding(insight_id, "resource_pressure", content)
except Exception as embed_err:
self._log.warning("Embedding enqueue failed for resource_pressure: %s", embed_err)
return insight_id
except Exception:
session.rollback()
raise
finally:
session.close()
def _record_resource_optimization_suppressed(
self,
trigger: AutonomousTrigger,
metrics: Dict[str, Any],
reason: str,
) -> None:
self._log.info(
"EA resource optimization suppressed: reason=%s level=%s metrics=%s",
reason,
metrics.get("pressure_level"),
metrics,
)
try:
from services.ai_call_logger import log_ai_call
with log_ai_call(
caller="ea_engine",
provider="gcp_ollama",
model="hermes3:latest",
meta={
"suppressed_resource_optimization": True,
"trigger": trigger.trigger_type,
"reason": reason,
"metrics": metrics,
},
) as ctx:
ctx.set_tokens(input=0, output=0)
ctx.status = "cache_only"
except Exception:
self._log.warning(
"EA resource suppression telemetry failed; trigger=%s",
trigger.trigger_type,
exc_info=True,
)
@staticmethod
def _format_resource_pressure_content(
metrics: Dict[str, Any],
previous_limit: int,
new_limit: int,
) -> str:
level = metrics.get("pressure_level", "unknown")
load_text = (
"主機負載達告警門檻"
if metrics.get("load_pressure")
else "主機負載未達告警門檻,壓力主要來自工作隊列"
)
return (
f"[Elephant Alpha 資源壓力] level={level}"
f"action_queue={metrics.get('action_queue_size', 0)}"
f"P1/P2={metrics.get('high_priority_count', 0)}"
f"pending_review={metrics.get('human_review_count', 0)}"
f"stale={metrics.get('stale_count', 0)}"
f"system_load={float(metrics.get('system_load_pct') or 0):.1f}%。"
f"{load_text}autonomous_limit={previous_limit}->{new_limit}"
)
@staticmethod
def _build_resource_pressure_actions(metrics: Dict[str, Any]) -> List[str]:
actions = []
if metrics.get("high_priority_pressure"):
actions.append("先處理 priority <= 2 的 action_plans避免高風險項目被一般建議淹沒。")
if metrics.get("stale_pressure"):
actions.append(
f"清理或關閉超過 {metrics.get('stale_hours', RESOURCE_STALE_HOURS)} 小時仍未處理的 pending / pending_review 項目。"
)
if metrics.get("load_pressure"):
actions.append("暫停非必要背景任務優先保留匯入、Dashboard、Telegram 與 AutoHeal 路徑。")
if not actions:
actions.append("僅保留觀測,不派發 Hermes/NemoTron/價格分析,避免把一般 backlog 誤報成資源事件。")
actions.append("確認 action_plans 來源是否持續產生重複建議;若是報表型建議,應改為摘要消化而非逐筆告警。")
return actions
@staticmethod
def _build_resource_pressure_telegram_message(
metrics: Dict[str, Any],
insight_id: Optional[int],
previous_limit: int,
new_limit: int,
) -> str:
level = str(metrics.get("pressure_level", "unknown"))
level_label = {
"critical": "P1 critical",
"warning": "P2 warning",
"backlog_only": "P3 backlog",
"normal": "P4 normal",
}.get(level, level)
load_pct = float(metrics.get("system_load_pct") or 0.0)
load_judgement = (
"主機 CPU 已達高負載門檻。"
if metrics.get("load_pressure")
else "主機 CPU 未達高負載門檻,這不是主機資源耗盡,而是工作隊列/人工審核積壓。"
)
executed = [
f"已寫入 ai_insights(resource_pressure) #{insight_id}"
if insight_id
else "ai_insights(resource_pressure) 寫入未取得 id請查看 scheduler log。"
]
if new_limit != previous_limit:
executed.append(f"已將 ElephantAlpha 自主決策上限由 {previous_limit} 調整為 {new_limit} 次/小時。")
executed.append("未執行外部修復、未啟動 Hermes/NemoTron 價格分析、未宣稱效益預測。")
lines = [
"<b>Elephant Alpha · 資源壓力告警</b>",
f"事件:<code>resource_optimization</code>",
f"等級:<b>{escape(level_label)}</b>",
f"時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"<b>量測指標</b>",
f"• Action queue{int(metrics.get('action_queue_size') or 0)} / {int(metrics.get('queue_threshold') or RESOURCE_QUEUE_THRESHOLD)}",
f"• P1/P2 待處理:{int(metrics.get('high_priority_count') or 0)} / {int(metrics.get('high_priority_threshold') or RESOURCE_HIGH_PRIORITY_THRESHOLD)}",
f"• Pending review{int(metrics.get('human_review_count') or 0)}",
f"• 逾時未處理:{int(metrics.get('stale_count') or 0)} / {int(metrics.get('stale_threshold') or RESOURCE_STALE_THRESHOLD)}",
f"• CPU load{load_pct:.1f}% / {float(metrics.get('load_threshold_pct') or RESOURCE_LOAD_THRESHOLD_PCT):.0f}%",
"",
"<b>判讀</b>",
f"{escape(load_judgement)}",
"• 這則告警只採用 action_plans 與 CPU 實測值,不採用 LLM 生成的 48 小時效益預測。",
"",
"<b>已執行</b>",
*[f"{escape(item)}" for item in executed],
"",
"<b>建議下一步</b>",
*[f"{escape(item)}" for item in ElephantAlphaAutonomousEngine._build_resource_pressure_actions(metrics)],
]
return "\n".join(lines)
async def _send_resource_pressure_telegram(
self,
metrics: Dict[str, Any],
insight_id: Optional[int],
previous_limit: int,
new_limit: int,
) -> None:
try:
from services.telegram_templates import _send_telegram_raw
msg = self._build_resource_pressure_telegram_message(
metrics,
insight_id,
previous_limit,
new_limit,
)
await self._run_with_timeout(_send_telegram_raw, msg, timeout=10)
self._log.info("Resource pressure Telegram sent: level=%s", metrics.get("pressure_level"))
except Exception as e:
self._log.error("Resource pressure Telegram failed (non-blocking): %s", e)
# ---- Sub-services ----
async def _hermes_analyze(self) -> Any:
from services.hermes_analyst_service import HermesAnalystService

View File

@@ -159,3 +159,124 @@ def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypat
assert cooldown == ["resource_optimization"]
assert suppressed == [("resource_optimization", "no_concrete_evidence")]
def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 0,
"human_review_count": 0,
"stale_count": 0,
"system_load_pct": 19.2,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
assert metrics["pressure_level"] == "backlog_only"
assert metrics["should_alert"] is False
assert metrics["load_pressure"] is False
assert "system_load=19.2%/80%" in metrics["evidence"]
def test_resource_pressure_classifier_alerts_on_actionable_review_backlog():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 8,
"human_review_count": 6,
"stale_count": 0,
"system_load_pct": 22.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
assert metrics["pressure_level"] == "warning"
assert metrics["should_alert"] is True
assert metrics["high_priority_pressure"] is True
assert metrics["load_pressure"] is False
def test_resource_pressure_message_is_measurement_based_not_llm_theatre():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 8,
"human_review_count": 6,
"stale_count": 5,
"system_load_pct": 22.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
"stale_hours": 24,
})
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
metrics,
insight_id=123,
previous_limit=10,
new_limit=8,
)
assert "量測指標" in msg
assert "主機 CPU 未達高負載門檻" in msg
assert "未啟動 Hermes/NemoTron 價格分析" in msg
assert "預期效益" not in msg
assert "48小時" not in msg
assert "48 小時效益預測" in msg
def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
engine = ElephantAlphaAutonomousEngine()
sent = []
stored = []
async def _boom(*args, **kwargs):
raise AssertionError("resource_optimization must not call LLM orchestrator")
async def _capture_send(*args, **kwargs):
sent.append(args)
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom)
monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77)
monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
trigger = AutonomousTrigger(
trigger_type="resource_optimization",
conditions={
"_resource_metrics": {
"action_queue_size": 34,
"high_priority_count": 9,
"human_review_count": 6,
"stale_count": 0,
"system_load_pct": 20.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
}
},
threshold=0.6,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert stored == ["resource_optimization"]
assert len(sent) == 1
assert engine.max_autonomous_decisions_per_hour == 8