"""Action plan queue hygiene. Closes stale, non-executable automation suggestions without deleting audit data. The service is intentionally conservative: it only handles sources that are known to be advisory/noisy when old, and it records the closure in metadata_json. """ import json import os from datetime import datetime, timedelta from typing import Any, Dict, Iterable, List, Optional from sqlalchemy import text from database.manager import get_session from services.logger_manager import SystemLogger logger = SystemLogger("ActionPlanHygiene").get_logger() DEFAULT_STALE_HOURS = int(os.getenv("ACTION_PLAN_HYGIENE_STALE_HOURS", "72")) DEFAULT_MAX_UPDATES = int(os.getenv("ACTION_PLAN_HYGIENE_MAX_UPDATES", "200")) CLOSABLE_STATUSES = frozenset({"pending", "auto_pending", "pending_review"}) SOURCE_TARGET_STATUS = { "code_review_fix": "auto_disabled", "code_review_pipeline": "auto_disabled", "nemotron_direct_response": "auto_disabled", "openclaw_recommendation": "rejected", "openclaw": "rejected", } def _row_get(row: Any, key: str) -> Any: if isinstance(row, dict): return row.get(key) if hasattr(row, "_mapping"): return row._mapping.get(key) try: return row[key] except Exception: return getattr(row, key, None) def _coerce_datetime(value: Any) -> Optional[datetime]: if isinstance(value, datetime): return value.replace(tzinfo=None) if isinstance(value, str) and value.strip(): try: return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None) except ValueError: return None return None def _parse_metadata(raw: Any) -> Dict[str, Any]: if isinstance(raw, dict): return dict(raw) if not raw: return {} try: parsed = json.loads(str(raw)) return parsed if isinstance(parsed, dict) else {"legacy_metadata": parsed} except Exception: return {"legacy_metadata_raw": str(raw)[:1000]} def _parse_payload(raw: Any) -> Dict[str, Any]: if isinstance(raw, dict): return dict(raw) if not raw: return {} try: parsed = json.loads(str(raw)) return parsed if isinstance(parsed, dict) else {} except Exception: return {} def _source_for_row(row: Any) -> str: created_by = str(_row_get(row, "created_by") or "") if created_by == "nemotron": payload = _parse_payload(_row_get(row, "payload")) actions = payload.get("action_plan") if isinstance(payload.get("action_plan"), list) else [] is_direct_response = payload.get("dispatch_to") == "direct_response" is_reply_only = bool(actions) and all( isinstance(action, dict) and action.get("action") == "reply_simple" for action in actions ) if is_direct_response or is_reply_only: return "nemotron_direct_response" return str( _row_get(row, "action_type") or _row_get(row, "plan_type") or created_by or "unknown" ) def build_action_plan_hygiene_preview( rows: Iterable[Any], *, now: Optional[datetime] = None, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES, ) -> Dict[str, Any]: """Return stale action plan candidates without mutating the database.""" now = (now or datetime.now()).replace(tzinfo=None) cutoff = now - timedelta(hours=stale_hours) candidates: List[Dict[str, Any]] = [] scanned = 0 for row in rows: scanned += 1 status = str(_row_get(row, "status") or "") source = _source_for_row(row) created_at = _coerce_datetime(_row_get(row, "created_at")) if status not in CLOSABLE_STATUSES: continue if source not in SOURCE_TARGET_STATUS: continue if not created_at or created_at > cutoff: continue age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0) candidates.append({ "id": int(_row_get(row, "id")), "source": source, "from_status": status, "to_status": SOURCE_TARGET_STATUS[source], "priority": _row_get(row, "priority"), "created_at": created_at.isoformat(sep=" "), "age_hours": round(age_hours, 1), "description": str(_row_get(row, "description") or "")[:160], "reason": f"stale_{source}_older_than_{stale_hours}h", }) candidates = candidates[:max_updates] by_source: Dict[str, int] = {} by_from_status: Dict[str, int] = {} for item in candidates: by_source[item["source"]] = by_source.get(item["source"], 0) + 1 by_from_status[item["from_status"]] = by_from_status.get(item["from_status"], 0) + 1 return { "scanned_count": scanned, "candidate_count": len(candidates), "stale_hours": stale_hours, "max_updates": max_updates, "by_source": by_source, "by_from_status": by_from_status, "candidates": candidates, } class ActionPlanHygieneService: """Close stale advisory action_plans while preserving audit metadata.""" def __init__(self, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES): self.stale_hours = stale_hours self.max_updates = max_updates def preview(self) -> Dict[str, Any]: session = get_session() try: rows = session.execute(text(""" SELECT id, status, priority, created_at, action_type, plan_type, created_by, description, metadata_json, payload FROM action_plans WHERE status IN ('pending', 'auto_pending', 'pending_review') AND ( action_type IN ('code_review_fix', 'openclaw_recommendation') OR created_by IN ('code_review_pipeline', 'openclaw', 'nemotron') ) ORDER BY created_at ASC """)).fetchall() return build_action_plan_hygiene_preview( rows, stale_hours=self.stale_hours, max_updates=self.max_updates, ) finally: session.close() def run(self) -> Dict[str, Any]: session = get_session() now = datetime.now() try: rows = session.execute(text(""" SELECT id, status, priority, created_at, action_type, plan_type, created_by, description, metadata_json, payload FROM action_plans WHERE status IN ('pending', 'auto_pending', 'pending_review') AND ( action_type IN ('code_review_fix', 'openclaw_recommendation') OR created_by IN ('code_review_pipeline', 'openclaw', 'nemotron') ) ORDER BY created_at ASC """)).fetchall() preview = build_action_plan_hygiene_preview( rows, now=now, stale_hours=self.stale_hours, max_updates=self.max_updates, ) row_by_id = {int(_row_get(row, "id")): row for row in rows} updated: List[Dict[str, Any]] = [] for item in preview["candidates"]: row = row_by_id.get(item["id"]) if not row: continue metadata = _parse_metadata(_row_get(row, "metadata_json")) history = metadata.get("hygiene_history") if not isinstance(history, list): history = [] history.append({ "closed_at": now.isoformat(timespec="seconds"), "from_status": item["from_status"], "to_status": item["to_status"], "reason": item["reason"], "age_hours": item["age_hours"], }) metadata["hygiene_history"] = history[-10:] metadata["hygiene_last_reason"] = item["reason"] metadata["hygiene_last_closed_at"] = now.isoformat(timespec="seconds") session.execute( text(""" UPDATE action_plans SET status = :status, metadata_json = :metadata WHERE id = :id """), { "id": item["id"], "status": item["to_status"], "metadata": json.dumps(metadata, ensure_ascii=False), }, ) updated.append(item) session.commit() result = dict(preview) result["updated_count"] = len(updated) result["updated_ids"] = [item["id"] for item in updated] result["ran_at"] = now.isoformat(timespec="seconds") logger.info( "Action plan hygiene updated=%d by_source=%s", result["updated_count"], result.get("by_source"), ) return result except Exception: session.rollback() raise finally: session.close() def run_action_plan_hygiene( *, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES, ) -> Dict[str, Any]: return ActionPlanHygieneService(stale_hours=stale_hours, max_updates=max_updates).run() __all__ = [ "ActionPlanHygieneService", "build_action_plan_hygiene_preview", "run_action_plan_hygiene", ]