"""Dedupe helpers for action_plans producers.""" import hashlib import json import re from typing import Any, Dict from sqlalchemy import text ACTIVE_ACTION_PLAN_STATUSES = ("pending", "auto_pending", "pending_review") def normalize_action_plan_text(value: Any) -> str: text_value = str(value or "") text_value = re.sub(r"[*_`#>\[\]()]", " ", text_value) text_value = re.sub(r"\s*([::,,。;;、||/→])\s*", r"\1", text_value) text_value = re.sub(r"\s+", " ", text_value).strip().lower() return text_value[:500] def action_plan_fingerprint(value: Any) -> str: normalized = normalize_action_plan_text(value) return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16] def is_nemotron_direct_response_plan(plan: Dict[str, Any]) -> bool: if not isinstance(plan, dict): return False actions = plan.get("action_plan") if isinstance(plan.get("action_plan"), list) else [] is_direct_response = plan.get("dispatch_to") == "direct_response" is_reply_only = bool(actions) and all( isinstance(action, dict) and action.get("action") == "reply_simple" for action in actions ) return bool(is_direct_response or is_reply_only) def active_code_review_action_exists(session: Any, file_path: str) -> bool: if not file_path: return False desc_prefix = f"Code Review 修復:{file_path}|%" metadata_marker = f'"file": "{file_path}"' row = session.execute( text(""" SELECT id FROM action_plans WHERE action_type = 'code_review_fix' AND status IN ('pending', 'auto_pending', 'pending_review') AND ( description LIKE :desc_prefix OR metadata_json LIKE :metadata_marker ) LIMIT 1 """), { "desc_prefix": desc_prefix, "metadata_marker": f"%{metadata_marker}%", }, ).fetchone() return row is not None def active_openclaw_recommendation_exists(session: Any, description: str) -> bool: desc = str(description or "")[:500] if not desc: return False fingerprint = action_plan_fingerprint(desc) row = session.execute( text(""" SELECT id FROM action_plans WHERE action_type = 'openclaw_recommendation' AND status IN ('pending', 'auto_pending', 'pending_review') AND ( description = :description OR metadata_json LIKE :fingerprint_marker ) LIMIT 1 """), { "description": desc, "fingerprint_marker": f'%"dedupe_fingerprint": "{fingerprint}"%', }, ).fetchone() return row is not None def active_nemotron_action_plan_exists(session: Any, plan: Dict[str, Any], payload_json: str) -> bool: if not isinstance(plan, dict): return False row = session.execute( text(""" SELECT id FROM action_plans WHERE created_by = 'nemotron' AND status IN ('pending', 'auto_pending', 'pending_review') AND COALESCE(session_id, '') = COALESCE(:session_id, '') AND COALESCE(plan_type, '') = COALESCE(:plan_type, '') AND COALESCE(sku, '') = COALESCE(:sku, '') AND payload = :payload LIMIT 1 """), { "session_id": plan.get("session_id"), "plan_type": plan.get("plan_type"), "sku": plan.get("sku"), "payload": payload_json, }, ).fetchone() return row is not None def openclaw_action_metadata(source_insight_id: Any, description: str) -> str: return json.dumps( { "source_insight_id": source_insight_id, "created_by": "openclaw", "dedupe_fingerprint": action_plan_fingerprint(description), }, ensure_ascii=False, )