防止 action_plans 重複回長
All checks were successful
CD Pipeline / deploy (push) Successful in 1m0s

This commit is contained in:
OoO
2026-05-19 21:19:41 +08:00
parent 8cd70d8326
commit 5e2186a808
7 changed files with 240 additions and 5 deletions

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.270"
SYSTEM_VERSION = "V10.271"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-05-19 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯Gemini 僅備援 / 鎖定場景
> **適用版本**: V10.270
> **適用版本**: V10.271
---
@@ -108,6 +108,7 @@ SQL漏斗(~300筆)
- ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown不寫 pending `human_review`,不發 Telegram 空告警。
- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry不派發 Hermes/NemoTron、不宣稱 48 小時效益。
- `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans以及 NemoTron `direct_response/reply_simple` 舊聊天回覆計畫;將狀態改為 `auto_disabled``rejected` 並寫入 `metadata_json.hygiene_history`。不刪資料,也不碰 NemoTron human_review / pricing / tool action 類業務行動。
- `action_plans` 產生端必須防重Code Review 同一檔案已有 active `code_review_fix` 時不重建OpenClaw recommendation 會寫入文字 fingerprint 並跳過同一建議AIOrchestrator 不再把 NemoTron `direct_response/reply_simple` 聊天回覆存成 action plan真正需工具、審核或執行的 NemoTron action 才能進 queue。
- OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。
- PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs``/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice`/observability/ppt_audit_file/<filename>` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。

View File

@@ -0,0 +1,121 @@
"""Dedupe helpers for action_plans producers."""
import hashlib
import json
import re
from typing import Any, Dict
from sqlalchemy import text
ACTIVE_ACTION_PLAN_STATUSES = ("pending", "auto_pending", "pending_review")
def normalize_action_plan_text(value: Any) -> str:
text_value = str(value or "")
text_value = re.sub(r"[*_`#>\[\]()]", " ", text_value)
text_value = re.sub(r"\s*([:,。;;、||/→])\s*", r"\1", text_value)
text_value = re.sub(r"\s+", " ", text_value).strip().lower()
return text_value[:500]
def action_plan_fingerprint(value: Any) -> str:
normalized = normalize_action_plan_text(value)
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]
def is_nemotron_direct_response_plan(plan: Dict[str, Any]) -> bool:
if not isinstance(plan, dict):
return False
actions = plan.get("action_plan") if isinstance(plan.get("action_plan"), list) else []
is_direct_response = plan.get("dispatch_to") == "direct_response"
is_reply_only = bool(actions) and all(
isinstance(action, dict) and action.get("action") == "reply_simple"
for action in actions
)
return bool(is_direct_response or is_reply_only)
def active_code_review_action_exists(session: Any, file_path: str) -> bool:
if not file_path:
return False
desc_prefix = f"Code Review 修復:{file_path}%"
metadata_marker = f'"file": "{file_path}"'
row = session.execute(
text("""
SELECT id
FROM action_plans
WHERE action_type = 'code_review_fix'
AND status IN ('pending', 'auto_pending', 'pending_review')
AND (
description LIKE :desc_prefix
OR metadata_json LIKE :metadata_marker
)
LIMIT 1
"""),
{
"desc_prefix": desc_prefix,
"metadata_marker": f"%{metadata_marker}%",
},
).fetchone()
return row is not None
def active_openclaw_recommendation_exists(session: Any, description: str) -> bool:
desc = str(description or "")[:500]
if not desc:
return False
fingerprint = action_plan_fingerprint(desc)
row = session.execute(
text("""
SELECT id
FROM action_plans
WHERE action_type = 'openclaw_recommendation'
AND status IN ('pending', 'auto_pending', 'pending_review')
AND (
description = :description
OR metadata_json LIKE :fingerprint_marker
)
LIMIT 1
"""),
{
"description": desc,
"fingerprint_marker": f'%"dedupe_fingerprint": "{fingerprint}"%',
},
).fetchone()
return row is not None
def active_nemotron_action_plan_exists(session: Any, plan: Dict[str, Any], payload_json: str) -> bool:
if not isinstance(plan, dict):
return False
row = session.execute(
text("""
SELECT id
FROM action_plans
WHERE created_by = 'nemotron'
AND status IN ('pending', 'auto_pending', 'pending_review')
AND COALESCE(session_id, '') = COALESCE(:session_id, '')
AND COALESCE(plan_type, '') = COALESCE(:plan_type, '')
AND COALESCE(sku, '') = COALESCE(:sku, '')
AND payload = :payload
LIMIT 1
"""),
{
"session_id": plan.get("session_id"),
"plan_type": plan.get("plan_type"),
"sku": plan.get("sku"),
"payload": payload_json,
},
).fetchone()
return row is not None
def openclaw_action_metadata(source_insight_id: Any, description: str) -> str:
return json.dumps(
{
"source_insight_id": source_insight_id,
"created_by": "openclaw",
"dedupe_fingerprint": action_plan_fingerprint(description),
},
ensure_ascii=False,
)

View File

@@ -8,6 +8,10 @@ from sqlalchemy import text
from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
from services.action_plan_dedupe import (
active_nemotron_action_plan_exists,
is_nemotron_direct_response_plan,
)
from database.manager import get_session
from database.autoheal_models import AgentContext, ActionPlan
@@ -86,8 +90,16 @@ class AIOrchestrator:
session.close()
async def _save_action_plan(self, plan: Dict[str, Any]) -> None:
if is_nemotron_direct_response_plan(plan):
logger.info("[AIOrchestrator] skip direct_response action_plan persistence")
return
session = get_session()
try:
payload_json = json.dumps(plan, ensure_ascii=False)
if active_nemotron_action_plan_exists(session, plan, payload_json):
logger.info("[AIOrchestrator] skip duplicate nemotron action_plan")
return
session.execute(
text("""
INSERT INTO action_plans
@@ -99,7 +111,7 @@ class AIOrchestrator:
"sid": plan.get("session_id"),
"pt": plan.get("plan_type"),
"sku": plan.get("sku"),
"pl": json.dumps(plan, ensure_ascii=False),
"pl": payload_json,
"status": "auto_pending" if plan.get("auto_execute") else "pending",
},
)

View File

@@ -32,6 +32,7 @@ from sqlalchemy import text
# ADR-027Code Review 走 OllamaService 取得三主機級聯 retry。
from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.action_plan_dedupe import active_code_review_action_exists
logger = logging.getLogger(__name__)
@@ -574,6 +575,9 @@ class CodeReviewPipeline:
try:
# 每個需修復的檔案建立一筆 action_plan
for fpath in fix_files[:3]:
if active_code_review_action_exists(session, fpath):
logger.info("[CodeReview] skip duplicate active action_plan file=%s", fpath)
continue
related = [f for f in findings if f.get("file") == fpath][:3]
desc = f"Code Review 修復:{fpath}{', '.join(f.get('description','')[:40] for f in related)}"
session.execute(text("""

View File

@@ -33,6 +33,10 @@ from database.manager import get_session
from sqlalchemy import bindparam, text
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.action_plan_dedupe import (
active_openclaw_recommendation_exists,
openclaw_action_metadata,
)
from services.rag_service import rag_service, is_rag_enabled # Phase 11 RAG-firstQ&A 限定)
logger = logging.getLogger(__name__)
@@ -940,14 +944,18 @@ def _save_action_items(actions: List[str], source_insight_id: Optional[int]) ->
session = get_session()
try:
for i, action in enumerate(actions[:10]):
desc = action[:500]
if active_openclaw_recommendation_exists(session, desc):
logger.info("[OpenClaw] action_plans skip duplicate recommendation")
continue
session.execute(text("""
INSERT INTO action_plans
(action_type, description, status, priority, metadata_json, created_at)
VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW())
"""), {
"desc": action[:500],
"desc": desc,
"priority": i + 1,
"meta": json.dumps({"source_insight_id": source_insight_id, "created_by": "openclaw"}),
"meta": openclaw_action_metadata(source_insight_id, desc),
})
session.commit()
logger.info("[OpenClaw] action_plans 寫入 %d", len(actions[:10]))

View File

@@ -0,0 +1,89 @@
import asyncio
class _Result:
def __init__(self, row=None):
self._row = row
def fetchone(self):
return self._row
class _Session:
def __init__(self, exists=False):
self.exists = exists
self.calls = []
self.commits = 0
def execute(self, statement, params=None):
self.calls.append((str(statement), params or {}))
return _Result((1,) if self.exists else None)
def commit(self):
self.commits += 1
def rollback(self):
pass
def close(self):
pass
def test_nemotron_direct_response_is_not_persisted(monkeypatch):
import services.ai_orchestrator as module
session = _Session()
monkeypatch.setattr(module, "get_session", lambda: session)
plan = {
"session_id": "tg-1",
"dispatch_to": "direct_response",
"action_plan": [{"action": "reply_simple", "params": {"message": "hello"}}],
}
asyncio.run(module.AIOrchestrator()._save_action_plan(plan))
assert session.calls == []
assert session.commits == 0
def test_nemotron_duplicate_executable_plan_is_skipped(monkeypatch):
import services.ai_orchestrator as module
session = _Session(exists=True)
monkeypatch.setattr(module, "get_session", lambda: session)
plan = {
"session_id": "tg-1",
"plan_type": "price_adjust",
"sku": "SKU-1",
"action_plan": [{"action": "flag_for_human_review"}],
}
asyncio.run(module.AIOrchestrator()._save_action_plan(plan))
assert len(session.calls) == 1
assert "SELECT id" in session.calls[0][0]
assert session.commits == 0
def test_code_review_active_file_dedupe_uses_file_path_marker():
from services.action_plan_dedupe import active_code_review_action_exists
session = _Session(exists=True)
assert active_code_review_action_exists(session, "services/config.py") is True
params = session.calls[0][1]
assert params["desc_prefix"] == "Code Review 修復services/config.py%"
assert '"file": "services/config.py"' in params["metadata_marker"]
def test_openclaw_metadata_contains_stable_fingerprint():
import json
from services.action_plan_dedupe import openclaw_action_metadata
first = json.loads(openclaw_action_metadata(10, "**立即調查**:昨日業績為零"))
second = json.loads(openclaw_action_metadata(10, "立即調查 昨日業績為零"))
assert first["created_by"] == "openclaw"
assert first["dedupe_fingerprint"] == second["dedupe_fingerprint"]