From 5e2186a808d9f7868c316790337b504fbc03f031 Mon Sep 17 00:00:00 2001 From: OoO Date: Tue, 19 May 2026 21:19:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=B2=E6=AD=A2=20action=5Fplans=20=E9=87=8D?= =?UTF-8?q?=E8=A4=87=E5=9B=9E=E9=95=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 2 +- docs/AI_INTELLIGENCE_MODULE_SOT.md | 3 +- services/action_plan_dedupe.py | 121 +++++++++++++++++++++++ services/ai_orchestrator.py | 14 ++- services/code_review_pipeline_service.py | 4 + services/openclaw_strategist_service.py | 12 ++- tests/test_action_plan_dedupe.py | 89 +++++++++++++++++ 7 files changed, 240 insertions(+), 5 deletions(-) create mode 100644 services/action_plan_dedupe.py create mode 100644 tests/test_action_plan_dedupe.py diff --git a/config.py b/config.py index 9383960..80dc199 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.270" +SYSTEM_VERSION = "V10.271" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 2daca6f..c15da64 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -2,7 +2,7 @@ > **最後更新**: 2026-05-19 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯,Gemini 僅備援 / 鎖定場景 -> **適用版本**: V10.270 +> **適用版本**: V10.271 --- @@ -108,6 +108,7 @@ SQL漏斗(~300筆) - ElephantAlpha L3 HITL 只允許發送有實證、可審核、可行動的升級告警;價格類 trigger 無 Hermes 具體威脅時,只記錄 suppressed escalation telemetry 與 cooldown,不寫 pending `human_review`,不發 Telegram 空告警。 - `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load;只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry,不派發 Hermes/NemoTron、不宣稱 48 小時效益。 - `resource_optimization` 會先執行 `ActionPlanHygieneService` 清理過期噪音:只關閉超過 72 小時的 `code_review_fix` / `openclaw_recommendation` 類 advisory action_plans,以及 NemoTron `direct_response/reply_simple` 舊聊天回覆計畫;將狀態改為 `auto_disabled` 或 `rejected` 並寫入 `metadata_json.hygiene_history`。不刪資料,也不碰 NemoTron human_review / pricing / tool action 類業務行動。 +- `action_plans` 產生端必須防重:Code Review 同一檔案已有 active `code_review_fix` 時不重建;OpenClaw recommendation 會寫入文字 fingerprint 並跳過同一建議;AIOrchestrator 不再把 NemoTron `direct_response/reply_simple` 聊天回覆存成 action plan,真正需工具、審核或執行的 NemoTron action 才能進 queue。 - OpenClaw/Hermes embedding 優先呼叫 Ollama `/api/embed`,只在舊節點不支援時 fallback `/api/embeddings`;timeout 由 `EMBEDDING_TIMEOUT` / `OLLAMA_EMBED_TIMEOUT` 控制。 - PPT 自動產線由 `momo-scheduler` 依節奏執行 `run_ppt_auto_generation_task(schedule_kind)`:每日 20:30 產日報、週一 20:40 產週報/市場情報、每月 1 日 20:50 產月報與管理型簡報、季初 21:00 產季報、半年初 21:10 產半年報、年初 21:20 產年報,再交給 22:00 `ppt_vision_audit` 做視覺審核;每次嘗試會寫入 `ppt_generation_runs`,`/observability/ppt_audit_history` 以精準參數檢查目標版本是否已產生,並可用 `/observability/ppt_audit/generate_missing` 手動補齊缺漏,總開關為 `PPT_AUTO_GENERATION_ENABLED`。PPT vision 需 `PPT_VISION_ENABLED=true` 與容器內 LibreOffice;`/observability/ppt_audit_file/` 會把 PPTX 轉成 PDF 快取供站內線上預覽,原始 PPTX 仍保留下載。 diff --git a/services/action_plan_dedupe.py b/services/action_plan_dedupe.py new file mode 100644 index 0000000..f559197 --- /dev/null +++ b/services/action_plan_dedupe.py @@ -0,0 +1,121 @@ +"""Dedupe helpers for action_plans producers.""" + +import hashlib +import json +import re +from typing import Any, Dict + +from sqlalchemy import text + +ACTIVE_ACTION_PLAN_STATUSES = ("pending", "auto_pending", "pending_review") + + +def normalize_action_plan_text(value: Any) -> str: + text_value = str(value or "") + text_value = re.sub(r"[*_`#>\[\]()]", " ", text_value) + text_value = re.sub(r"\s*([::,,。;;、||/→])\s*", r"\1", text_value) + text_value = re.sub(r"\s+", " ", text_value).strip().lower() + return text_value[:500] + + +def action_plan_fingerprint(value: Any) -> str: + normalized = normalize_action_plan_text(value) + return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16] + + +def is_nemotron_direct_response_plan(plan: Dict[str, Any]) -> bool: + if not isinstance(plan, dict): + return False + actions = plan.get("action_plan") if isinstance(plan.get("action_plan"), list) else [] + is_direct_response = plan.get("dispatch_to") == "direct_response" + is_reply_only = bool(actions) and all( + isinstance(action, dict) and action.get("action") == "reply_simple" + for action in actions + ) + return bool(is_direct_response or is_reply_only) + + +def active_code_review_action_exists(session: Any, file_path: str) -> bool: + if not file_path: + return False + desc_prefix = f"Code Review 修復:{file_path}|%" + metadata_marker = f'"file": "{file_path}"' + row = session.execute( + text(""" + SELECT id + FROM action_plans + WHERE action_type = 'code_review_fix' + AND status IN ('pending', 'auto_pending', 'pending_review') + AND ( + description LIKE :desc_prefix + OR metadata_json LIKE :metadata_marker + ) + LIMIT 1 + """), + { + "desc_prefix": desc_prefix, + "metadata_marker": f"%{metadata_marker}%", + }, + ).fetchone() + return row is not None + + +def active_openclaw_recommendation_exists(session: Any, description: str) -> bool: + desc = str(description or "")[:500] + if not desc: + return False + fingerprint = action_plan_fingerprint(desc) + row = session.execute( + text(""" + SELECT id + FROM action_plans + WHERE action_type = 'openclaw_recommendation' + AND status IN ('pending', 'auto_pending', 'pending_review') + AND ( + description = :description + OR metadata_json LIKE :fingerprint_marker + ) + LIMIT 1 + """), + { + "description": desc, + "fingerprint_marker": f'%"dedupe_fingerprint": "{fingerprint}"%', + }, + ).fetchone() + return row is not None + + +def active_nemotron_action_plan_exists(session: Any, plan: Dict[str, Any], payload_json: str) -> bool: + if not isinstance(plan, dict): + return False + row = session.execute( + text(""" + SELECT id + FROM action_plans + WHERE created_by = 'nemotron' + AND status IN ('pending', 'auto_pending', 'pending_review') + AND COALESCE(session_id, '') = COALESCE(:session_id, '') + AND COALESCE(plan_type, '') = COALESCE(:plan_type, '') + AND COALESCE(sku, '') = COALESCE(:sku, '') + AND payload = :payload + LIMIT 1 + """), + { + "session_id": plan.get("session_id"), + "plan_type": plan.get("plan_type"), + "sku": plan.get("sku"), + "payload": payload_json, + }, + ).fetchone() + return row is not None + + +def openclaw_action_metadata(source_insight_id: Any, description: str) -> str: + return json.dumps( + { + "source_insight_id": source_insight_id, + "created_by": "openclaw", + "dedupe_fingerprint": action_plan_fingerprint(description), + }, + ensure_ascii=False, + ) diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py index 283a53b..5346a5d 100644 --- a/services/ai_orchestrator.py +++ b/services/ai_orchestrator.py @@ -8,6 +8,10 @@ from sqlalchemy import text from services.hermes_analyst_service import HermesAnalystService from services.nemoton_dispatcher_service import NemotronDispatcher +from services.action_plan_dedupe import ( + active_nemotron_action_plan_exists, + is_nemotron_direct_response_plan, +) from database.manager import get_session from database.autoheal_models import AgentContext, ActionPlan @@ -86,8 +90,16 @@ class AIOrchestrator: session.close() async def _save_action_plan(self, plan: Dict[str, Any]) -> None: + if is_nemotron_direct_response_plan(plan): + logger.info("[AIOrchestrator] skip direct_response action_plan persistence") + return + session = get_session() try: + payload_json = json.dumps(plan, ensure_ascii=False) + if active_nemotron_action_plan_exists(session, plan, payload_json): + logger.info("[AIOrchestrator] skip duplicate nemotron action_plan") + return session.execute( text(""" INSERT INTO action_plans @@ -99,7 +111,7 @@ class AIOrchestrator: "sid": plan.get("session_id"), "pt": plan.get("plan_type"), "sku": plan.get("sku"), - "pl": json.dumps(plan, ensure_ascii=False), + "pl": payload_json, "status": "auto_pending" if plan.get("auto_execute") else "pending", }, ) diff --git a/services/code_review_pipeline_service.py b/services/code_review_pipeline_service.py index 1813796..7cf3311 100644 --- a/services/code_review_pipeline_service.py +++ b/services/code_review_pipeline_service.py @@ -32,6 +32,7 @@ from sqlalchemy import text # ADR-027:Code Review 走 OllamaService 取得三主機級聯 retry。 from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1 +from services.action_plan_dedupe import active_code_review_action_exists logger = logging.getLogger(__name__) @@ -574,6 +575,9 @@ class CodeReviewPipeline: try: # 每個需修復的檔案建立一筆 action_plan for fpath in fix_files[:3]: + if active_code_review_action_exists(session, fpath): + logger.info("[CodeReview] skip duplicate active action_plan file=%s", fpath) + continue related = [f for f in findings if f.get("file") == fpath][:3] desc = f"Code Review 修復:{fpath}|{', '.join(f.get('description','')[:40] for f in related)}" session.execute(text(""" diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index 2f34419..d0ced47 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -33,6 +33,10 @@ from database.manager import get_session from sqlalchemy import bindparam, text from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1 +from services.action_plan_dedupe import ( + active_openclaw_recommendation_exists, + openclaw_action_metadata, +) from services.rag_service import rag_service, is_rag_enabled # Phase 11 RAG-first(Q&A 限定) logger = logging.getLogger(__name__) @@ -940,14 +944,18 @@ def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> session = get_session() try: for i, action in enumerate(actions[:10]): + desc = action[:500] + if active_openclaw_recommendation_exists(session, desc): + logger.info("[OpenClaw] action_plans skip duplicate recommendation") + continue session.execute(text(""" INSERT INTO action_plans (action_type, description, status, priority, metadata_json, created_at) VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW()) """), { - "desc": action[:500], + "desc": desc, "priority": i + 1, - "meta": json.dumps({"source_insight_id": source_insight_id, "created_by": "openclaw"}), + "meta": openclaw_action_metadata(source_insight_id, desc), }) session.commit() logger.info("[OpenClaw] action_plans 寫入 %d 筆", len(actions[:10])) diff --git a/tests/test_action_plan_dedupe.py b/tests/test_action_plan_dedupe.py new file mode 100644 index 0000000..9dff4b8 --- /dev/null +++ b/tests/test_action_plan_dedupe.py @@ -0,0 +1,89 @@ +import asyncio + + +class _Result: + def __init__(self, row=None): + self._row = row + + def fetchone(self): + return self._row + + +class _Session: + def __init__(self, exists=False): + self.exists = exists + self.calls = [] + self.commits = 0 + + def execute(self, statement, params=None): + self.calls.append((str(statement), params or {})) + return _Result((1,) if self.exists else None) + + def commit(self): + self.commits += 1 + + def rollback(self): + pass + + def close(self): + pass + + +def test_nemotron_direct_response_is_not_persisted(monkeypatch): + import services.ai_orchestrator as module + + session = _Session() + monkeypatch.setattr(module, "get_session", lambda: session) + + plan = { + "session_id": "tg-1", + "dispatch_to": "direct_response", + "action_plan": [{"action": "reply_simple", "params": {"message": "hello"}}], + } + + asyncio.run(module.AIOrchestrator()._save_action_plan(plan)) + + assert session.calls == [] + assert session.commits == 0 + + +def test_nemotron_duplicate_executable_plan_is_skipped(monkeypatch): + import services.ai_orchestrator as module + + session = _Session(exists=True) + monkeypatch.setattr(module, "get_session", lambda: session) + + plan = { + "session_id": "tg-1", + "plan_type": "price_adjust", + "sku": "SKU-1", + "action_plan": [{"action": "flag_for_human_review"}], + } + + asyncio.run(module.AIOrchestrator()._save_action_plan(plan)) + + assert len(session.calls) == 1 + assert "SELECT id" in session.calls[0][0] + assert session.commits == 0 + + +def test_code_review_active_file_dedupe_uses_file_path_marker(): + from services.action_plan_dedupe import active_code_review_action_exists + + session = _Session(exists=True) + + assert active_code_review_action_exists(session, "services/config.py") is True + params = session.calls[0][1] + assert params["desc_prefix"] == "Code Review 修復:services/config.py|%" + assert '"file": "services/config.py"' in params["metadata_marker"] + + +def test_openclaw_metadata_contains_stable_fingerprint(): + import json + from services.action_plan_dedupe import openclaw_action_metadata + + first = json.loads(openclaw_action_metadata(10, "**立即調查**:昨日業績為零")) + second = json.loads(openclaw_action_metadata(10, "立即調查 : 昨日業績為零")) + + assert first["created_by"] == "openclaw" + assert first["dedupe_fingerprint"] == second["dedupe_fingerprint"]