清理 EA 過期行動隊列
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s

This commit is contained in:
OoO
2026-05-19 21:01:44 +08:00
parent d32b1f0e80
commit aaebd6b86d
6 changed files with 383 additions and 4 deletions

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.268"
SYSTEM_VERSION = "V10.269"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-05-19 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯Gemini 僅備援 / 鎖定場景
> **適用版本**: V10.267
> **適用版本**: V10.269
---
@@ -107,6 +107,7 @@ SQL漏斗(~300筆)
- ElephantAlpha 使用 NVIDIA NIM hosted APIproduction 預設模型為 `nvidia/llama-3.3-nemotron-super-49b-v1.5``ELEPHANT_ALPHA_FALLBACK_MODELS` 需保留至少一個可呼叫備援403/404、408/409/425/429、5xx、timeout 與 connection error 必須嘗試下一個模型。
- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown不寫 pending `human_review`,不發 Telegram 空告警。
- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry不派發 Hermes/NemoTron、不宣稱 48 小時效益。
- `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans將狀態改為 `auto_disabled``rejected` 並寫入 `metadata_json.hygiene_history`;不刪資料、不碰 NemoTron 業務行動。
- OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。
- PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs``/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice`/observability/ppt_audit_file/<filename>` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。

View File

@@ -0,0 +1,248 @@
"""Action plan queue hygiene.
Closes stale, non-executable automation suggestions without deleting audit data.
The service is intentionally conservative: it only handles sources that are
known to be advisory/noisy when old, and it records the closure in metadata_json.
"""
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional
from sqlalchemy import text
from database.manager import get_session
from services.logger_manager import SystemLogger
logger = SystemLogger("ActionPlanHygiene").get_logger()
DEFAULT_STALE_HOURS = int(os.getenv("ACTION_PLAN_HYGIENE_STALE_HOURS", "72"))
DEFAULT_MAX_UPDATES = int(os.getenv("ACTION_PLAN_HYGIENE_MAX_UPDATES", "200"))
CLOSABLE_STATUSES = frozenset({"pending", "auto_pending", "pending_review"})
SOURCE_TARGET_STATUS = {
"code_review_fix": "auto_disabled",
"code_review_pipeline": "auto_disabled",
"openclaw_recommendation": "rejected",
"openclaw": "rejected",
}
def _row_get(row: Any, key: str) -> Any:
if isinstance(row, dict):
return row.get(key)
if hasattr(row, "_mapping"):
return row._mapping.get(key)
try:
return row[key]
except Exception:
return getattr(row, key, None)
def _coerce_datetime(value: Any) -> Optional[datetime]:
if isinstance(value, datetime):
return value.replace(tzinfo=None)
if isinstance(value, str) and value.strip():
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None)
except ValueError:
return None
return None
def _source_for_row(row: Any) -> str:
return str(
_row_get(row, "action_type")
or _row_get(row, "plan_type")
or _row_get(row, "created_by")
or "unknown"
)
def _parse_metadata(raw: Any) -> Dict[str, Any]:
if isinstance(raw, dict):
return dict(raw)
if not raw:
return {}
try:
parsed = json.loads(str(raw))
return parsed if isinstance(parsed, dict) else {"legacy_metadata": parsed}
except Exception:
return {"legacy_metadata_raw": str(raw)[:1000]}
def build_action_plan_hygiene_preview(
rows: Iterable[Any],
*,
now: Optional[datetime] = None,
stale_hours: int = DEFAULT_STALE_HOURS,
max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
"""Return stale action plan candidates without mutating the database."""
now = (now or datetime.now()).replace(tzinfo=None)
cutoff = now - timedelta(hours=stale_hours)
candidates: List[Dict[str, Any]] = []
scanned = 0
for row in rows:
scanned += 1
status = str(_row_get(row, "status") or "")
source = _source_for_row(row)
created_at = _coerce_datetime(_row_get(row, "created_at"))
if status not in CLOSABLE_STATUSES:
continue
if source not in SOURCE_TARGET_STATUS:
continue
if not created_at or created_at > cutoff:
continue
age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0)
candidates.append({
"id": int(_row_get(row, "id")),
"source": source,
"from_status": status,
"to_status": SOURCE_TARGET_STATUS[source],
"priority": _row_get(row, "priority"),
"created_at": created_at.isoformat(sep=" "),
"age_hours": round(age_hours, 1),
"description": str(_row_get(row, "description") or "")[:160],
"reason": f"stale_{source}_older_than_{stale_hours}h",
})
candidates = candidates[:max_updates]
by_source: Dict[str, int] = {}
by_from_status: Dict[str, int] = {}
for item in candidates:
by_source[item["source"]] = by_source.get(item["source"], 0) + 1
by_from_status[item["from_status"]] = by_from_status.get(item["from_status"], 0) + 1
return {
"scanned_count": scanned,
"candidate_count": len(candidates),
"stale_hours": stale_hours,
"max_updates": max_updates,
"by_source": by_source,
"by_from_status": by_from_status,
"candidates": candidates,
}
class ActionPlanHygieneService:
"""Close stale advisory action_plans while preserving audit metadata."""
def __init__(self, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES):
self.stale_hours = stale_hours
self.max_updates = max_updates
def preview(self) -> Dict[str, Any]:
session = get_session()
try:
rows = session.execute(text("""
SELECT id, status, priority, created_at, action_type, plan_type,
created_by, description, metadata_json
FROM action_plans
WHERE status IN ('pending', 'auto_pending', 'pending_review')
AND (
action_type IN ('code_review_fix', 'openclaw_recommendation')
OR created_by IN ('code_review_pipeline', 'openclaw')
)
ORDER BY created_at ASC
""")).fetchall()
return build_action_plan_hygiene_preview(
rows,
stale_hours=self.stale_hours,
max_updates=self.max_updates,
)
finally:
session.close()
def run(self) -> Dict[str, Any]:
session = get_session()
now = datetime.now()
try:
rows = session.execute(text("""
SELECT id, status, priority, created_at, action_type, plan_type,
created_by, description, metadata_json
FROM action_plans
WHERE status IN ('pending', 'auto_pending', 'pending_review')
AND (
action_type IN ('code_review_fix', 'openclaw_recommendation')
OR created_by IN ('code_review_pipeline', 'openclaw')
)
ORDER BY created_at ASC
""")).fetchall()
preview = build_action_plan_hygiene_preview(
rows,
now=now,
stale_hours=self.stale_hours,
max_updates=self.max_updates,
)
row_by_id = {int(_row_get(row, "id")): row for row in rows}
updated: List[Dict[str, Any]] = []
for item in preview["candidates"]:
row = row_by_id.get(item["id"])
if not row:
continue
metadata = _parse_metadata(_row_get(row, "metadata_json"))
history = metadata.get("hygiene_history")
if not isinstance(history, list):
history = []
history.append({
"closed_at": now.isoformat(timespec="seconds"),
"from_status": item["from_status"],
"to_status": item["to_status"],
"reason": item["reason"],
"age_hours": item["age_hours"],
})
metadata["hygiene_history"] = history[-10:]
metadata["hygiene_last_reason"] = item["reason"]
metadata["hygiene_last_closed_at"] = now.isoformat(timespec="seconds")
session.execute(
text("""
UPDATE action_plans
SET status = :status,
metadata_json = :metadata
WHERE id = :id
"""),
{
"id": item["id"],
"status": item["to_status"],
"metadata": json.dumps(metadata, ensure_ascii=False),
},
)
updated.append(item)
session.commit()
result = dict(preview)
result["updated_count"] = len(updated)
result["updated_ids"] = [item["id"] for item in updated]
result["ran_at"] = now.isoformat(timespec="seconds")
logger.info(
"Action plan hygiene updated=%d by_source=%s",
result["updated_count"],
result.get("by_source"),
)
return result
except Exception:
session.rollback()
raise
finally:
session.close()
def run_action_plan_hygiene(
*,
stale_hours: int = DEFAULT_STALE_HOURS,
max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
return ActionPlanHygieneService(stale_hours=stale_hours, max_updates=max_updates).run()
__all__ = [
"ActionPlanHygieneService",
"build_action_plan_hygiene_preview",
"run_action_plan_hygiene",
]

View File

@@ -52,6 +52,7 @@ RESOURCE_LOAD_THRESHOLD_PCT = float(os.getenv("ELEPHANT_ALPHA_RESOURCE_LOAD_THRE
RESOURCE_HIGH_PRIORITY_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_HIGH_PRIORITY_THRESHOLD", "5"))
RESOURCE_STALE_THRESHOLD = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_THRESHOLD", "5"))
RESOURCE_STALE_HOURS = int(os.getenv("ELEPHANT_ALPHA_RESOURCE_STALE_HOURS", "24"))
RESOURCE_HYGIENE_ENABLED = os.getenv("ELEPHANT_ALPHA_RESOURCE_HYGIENE_ENABLED", "true").lower() in {"1", "true", "yes", "on"}
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({
@@ -870,12 +871,21 @@ class ElephantAlphaAutonomousEngine:
else:
metrics = self._classify_resource_pressure(metrics)
pre_hygiene_metrics = dict(metrics)
hygiene_result = None
if metrics.get("should_alert") and RESOURCE_HYGIENE_ENABLED:
hygiene_result = self._run_action_plan_hygiene()
if hygiene_result and int(hygiene_result.get("updated_count") or 0) > 0:
metrics = self._collect_resource_pressure_metrics()
metrics["pre_hygiene"] = pre_hygiene_metrics
metrics["hygiene_result"] = hygiene_result
trigger.conditions = dict(trigger.conditions or {})
trigger.conditions["_resource_metrics"] = metrics
trigger.conditions["resource_pressure_level"] = metrics.get("pressure_level")
trigger.conditions["resource_evidence"] = metrics.get("evidence", [])
if not metrics.get("should_alert"):
if not metrics.get("should_alert") and not hygiene_result:
self._store_escalation(trigger.trigger_type)
self._record_resource_optimization_suppressed(trigger, metrics, "below_actionable_threshold")
return
@@ -897,6 +907,15 @@ class ElephantAlphaAutonomousEngine:
self._store_escalation(trigger.trigger_type)
self._circuit_reset()
def _run_action_plan_hygiene(self) -> Optional[Dict[str, Any]]:
try:
from services.action_plan_hygiene import run_action_plan_hygiene
return run_action_plan_hygiene()
except Exception as exc:
self._log.error("Action plan hygiene failed (non-blocking): %s", exc)
return {"updated_count": 0, "error": f"{type(exc).__name__}: {str(exc)[:200]}"}
def _record_resource_pressure_insight(
self,
metrics: Dict[str, Any],
@@ -996,7 +1015,9 @@ class ElephantAlphaAutonomousEngine:
f"pending_review={metrics.get('human_review_count', 0)}"
f"stale={metrics.get('stale_count', 0)}"
f"system_load={float(metrics.get('system_load_pct') or 0):.1f}%。"
f"{load_text}autonomous_limit={previous_limit}->{new_limit}"
f"{load_text}"
f"auto_closed={int((metrics.get('hygiene_result') or {}).get('updated_count') or 0)}"
f"autonomous_limit={previous_limit}->{new_limit}"
)
@staticmethod
@@ -1030,6 +1051,11 @@ class ElephantAlphaAutonomousEngine:
"normal": "P4 normal",
}.get(level, level)
load_pct = float(metrics.get("system_load_pct") or 0.0)
pre_hygiene = metrics.get("pre_hygiene") if isinstance(metrics.get("pre_hygiene"), dict) else None
hygiene = metrics.get("hygiene_result") if isinstance(metrics.get("hygiene_result"), dict) else None
hygiene_count = int((hygiene or {}).get("updated_count") or 0)
if hygiene_count > 0 and not metrics.get("should_alert"):
level_label = "P4 resolved"
load_judgement = (
"主機 CPU 已達高負載門檻。"
if metrics.get("load_pressure")
@@ -1042,6 +1068,13 @@ class ElephantAlphaAutonomousEngine:
]
if new_limit != previous_limit:
executed.append(f"已將 ElephantAlpha 自主決策上限由 {previous_limit} 調整為 {new_limit} 次/小時。")
if hygiene_count > 0:
by_source = hygiene.get("by_source") or {}
source_text = "".join(f"{key} {value}" for key, value in by_source.items()) or f"{hygiene_count}"
executed.insert(
1,
f"已自動關閉過期 action_plans {hygiene_count} 筆({source_text});只改 status/metadata不刪除資料。",
)
executed.append("未執行外部修復、未啟動 Hermes/NemoTron 價格分析、未宣稱效益預測。")
lines = [
@@ -1051,6 +1084,16 @@ class ElephantAlphaAutonomousEngine:
f"時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"<b>量測指標</b>",
]
if pre_hygiene:
lines.append(
"• 清理前 Action queue"
f"{int(pre_hygiene.get('action_queue_size') or 0)}"
f"P1/P2{int(pre_hygiene.get('high_priority_count') or 0)}"
f"逾時:{int(pre_hygiene.get('stale_count') or 0)}"
)
lines.append("<b>清理後</b>")
lines += [
f"• Action queue{int(metrics.get('action_queue_size') or 0)} / {int(metrics.get('queue_threshold') or RESOURCE_QUEUE_THRESHOLD)}",
f"• P1/P2 待處理:{int(metrics.get('high_priority_count') or 0)} / {int(metrics.get('high_priority_threshold') or RESOURCE_HIGH_PRIORITY_THRESHOLD)}",
f"• Pending review{int(metrics.get('human_review_count') or 0)}",

View File

@@ -0,0 +1,49 @@
from datetime import datetime, timedelta
def test_action_plan_hygiene_preview_closes_only_stale_advisory_sources():
from services.action_plan_hygiene import build_action_plan_hygiene_preview
now = datetime(2026, 5, 19, 12, 0, 0)
rows = [
{
"id": 1,
"status": "pending",
"priority": 1,
"created_at": now - timedelta(hours=100),
"action_type": "openclaw_recommendation",
"description": "old advisory",
},
{
"id": 2,
"status": "auto_pending",
"priority": 1,
"created_at": now - timedelta(hours=90),
"action_type": "code_review_fix",
"description": "old code review",
},
{
"id": 3,
"status": "pending",
"priority": 1,
"created_at": now - timedelta(hours=90),
"created_by": "nemotron",
"description": "must stay",
},
{
"id": 4,
"status": "pending",
"priority": 1,
"created_at": now - timedelta(hours=2),
"action_type": "openclaw_recommendation",
"description": "fresh advisory",
},
]
preview = build_action_plan_hygiene_preview(rows, now=now, stale_hours=72)
assert preview["candidate_count"] == 2
assert preview["by_source"] == {"openclaw_recommendation": 1, "code_review_fix": 1}
assert {item["id"] for item in preview["candidates"]} == {1, 2}
target_by_id = {item["id"]: item["to_status"] for item in preview["candidates"]}
assert target_by_id == {1: "rejected", 2: "auto_disabled"}

View File

@@ -252,6 +252,7 @@ def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
sent.append(args)
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom)
monkeypatch.setattr(engine, "_run_action_plan_hygiene", lambda: {"updated_count": 0})
monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77)
monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
@@ -280,3 +281,40 @@ def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
assert stored == ["resource_optimization"]
assert len(sent) == 1
assert engine.max_autonomous_decisions_per_hour == 8
def test_resource_pressure_message_reports_hygiene_result():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 9,
"high_priority_count": 0,
"human_review_count": 0,
"stale_count": 0,
"system_load_pct": 20.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
metrics["pre_hygiene"] = {
"action_queue_size": 100,
"high_priority_count": 47,
"stale_count": 99,
}
metrics["hygiene_result"] = {
"updated_count": 91,
"by_source": {"code_review_fix": 66, "openclaw_recommendation": 25},
}
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
metrics,
insight_id=124,
previous_limit=10,
new_limit=10,
)
assert "P4 resolved" in msg
assert "清理前 Action queue100" in msg
assert "已自動關閉過期 action_plans 91 筆" in msg
assert "只改 status/metadata不刪除資料" in msg