# Listing metadata: 274 lines, 9.5 KiB, Python
"""Action plan queue hygiene.
|
|
|
|
Closes stale, non-executable automation suggestions without deleting audit data.
|
|
The service is intentionally conservative: it only handles sources that are
|
|
known to be advisory/noisy when old, and it records the closure in metadata_json.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
from sqlalchemy import text
|
|
|
|
from database.manager import get_session
|
|
from services.logger_manager import SystemLogger
|
|
|
|
logger = SystemLogger("ActionPlanHygiene").get_logger()
|
|
|
|
# Tunables, overridable through the environment; the fallbacks are 72 hours and 200 rows.
DEFAULT_STALE_HOURS = int(os.environ.get("ACTION_PLAN_HYGIENE_STALE_HOURS", "72"))
DEFAULT_MAX_UPDATES = int(os.environ.get("ACTION_PLAN_HYGIENE_MAX_UPDATES", "200"))

# Only plans still sitting in one of these statuses are eligible for closure.
CLOSABLE_STATUSES = frozenset(("pending", "auto_pending", "pending_review"))

# Maps each handled source to the status a stale plan from that source is moved to.
SOURCE_TARGET_STATUS = {
    **dict.fromkeys(
        ("code_review_fix", "code_review_pipeline", "nemotron_direct_response"),
        "auto_disabled",
    ),
    **dict.fromkeys(
        ("openclaw_recommendation", "openclaw"),
        "rejected",
    ),
}
|
|
|
|
|
|
def _row_get(row: Any, key: str) -> Any:
|
|
if isinstance(row, dict):
|
|
return row.get(key)
|
|
if hasattr(row, "_mapping"):
|
|
return row._mapping.get(key)
|
|
try:
|
|
return row[key]
|
|
except Exception:
|
|
return getattr(row, key, None)
|
|
|
|
|
|
def _coerce_datetime(value: Any) -> Optional[datetime]:
|
|
if isinstance(value, datetime):
|
|
return value.replace(tzinfo=None)
|
|
if isinstance(value, str) and value.strip():
|
|
try:
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00")).replace(tzinfo=None)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _parse_metadata(raw: Any) -> Dict[str, Any]:
|
|
if isinstance(raw, dict):
|
|
return dict(raw)
|
|
if not raw:
|
|
return {}
|
|
try:
|
|
parsed = json.loads(str(raw))
|
|
return parsed if isinstance(parsed, dict) else {"legacy_metadata": parsed}
|
|
except Exception:
|
|
return {"legacy_metadata_raw": str(raw)[:1000]}
|
|
|
|
|
|
def _parse_payload(raw: Any) -> Dict[str, Any]:
|
|
if isinstance(raw, dict):
|
|
return dict(raw)
|
|
if not raw:
|
|
return {}
|
|
try:
|
|
parsed = json.loads(str(raw))
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _source_for_row(row: Any) -> str:
    """Classify an action-plan row by the source that produced it.

    Rows created by ``nemotron`` are reported as ``"nemotron_direct_response"``
    when their payload either has ``dispatch_to == "direct_response"`` or holds
    a non-empty ``action_plan`` list made up solely of ``reply_simple`` actions.
    Every other row resolves to the first truthy value among ``action_type``,
    ``plan_type`` and ``created_by``, or ``"unknown"`` when all are empty.
    """
    creator = str(_row_get(row, "created_by") or "")

    if creator == "nemotron":
        payload = _parse_payload(_row_get(row, "payload"))
        plan = payload.get("action_plan")
        steps = plan if isinstance(plan, list) else []

        if payload.get("dispatch_to") == "direct_response":
            return "nemotron_direct_response"

        reply_only = bool(steps) and all(
            isinstance(step, dict) and step.get("action") == "reply_simple"
            for step in steps
        )
        if reply_only:
            return "nemotron_direct_response"

    for column in ("action_type", "plan_type"):
        value = _row_get(row, column)
        if value:
            return str(value)

    return str(creator or "unknown")
|
|
|
|
|
|
def build_action_plan_hygiene_preview(
    rows: Iterable[Any],
    *,
    now: Optional[datetime] = None,
    stale_hours: int = DEFAULT_STALE_HOURS,
    max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
    """Return stale action plan candidates without mutating the database.

    A row becomes a candidate when its status is in ``CLOSABLE_STATUSES``, its
    resolved source is a key of ``SOURCE_TARGET_STATUS``, and its ``created_at``
    is parseable and not newer than ``now - stale_hours``.

    Args:
        rows: Row-like objects (dicts, mapping-backed rows, or attribute objects).
        now: Reference time; defaults to ``datetime.now()``. Any tzinfo is dropped.
        stale_hours: Minimum age, in hours, for a plan to count as stale.
        max_updates: Upper bound on returned candidates, taken in input order.
            Zero or a negative value yields no candidates.

    Returns:
        A dict with ``scanned_count``, ``candidate_count``, ``stale_hours``,
        ``max_updates``, the ``by_source`` / ``by_from_status`` tallies and the
        ``candidates`` list itself.
    """
    now = (now or datetime.now()).replace(tzinfo=None)
    cutoff = now - timedelta(hours=stale_hours)
    # A negative limit must mean "nothing"; slicing with it would instead keep
    # everything except the last |n| candidates.
    limit = max(0, max_updates)

    candidates: List[Dict[str, Any]] = []
    by_source: Dict[str, int] = {}
    by_from_status: Dict[str, int] = {}
    scanned = 0

    for row in rows:
        scanned += 1
        if len(candidates) >= limit:
            # Keep counting scanned rows, but skip work whose result would be discarded.
            continue

        status = str(_row_get(row, "status") or "")
        if status not in CLOSABLE_STATUSES:
            continue

        source = _source_for_row(row)
        if source not in SOURCE_TARGET_STATUS:
            continue

        created_at = _coerce_datetime(_row_get(row, "created_at"))
        if created_at is None or created_at > cutoff:
            continue

        age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0)
        candidates.append({
            "id": int(_row_get(row, "id")),
            "source": source,
            "from_status": status,
            "to_status": SOURCE_TARGET_STATUS[source],
            "priority": _row_get(row, "priority"),
            "created_at": created_at.isoformat(sep=" "),
            "age_hours": round(age_hours, 1),
            "description": str(_row_get(row, "description") or "")[:160],
            "reason": f"stale_{source}_older_than_{stale_hours}h",
        })
        by_source[source] = by_source.get(source, 0) + 1
        by_from_status[status] = by_from_status.get(status, 0) + 1

    return {
        "scanned_count": scanned,
        "candidate_count": len(candidates),
        "stale_hours": stale_hours,
        "max_updates": max_updates,
        "by_source": by_source,
        "by_from_status": by_from_status,
        "candidates": candidates,
    }
|
|
|
|
|
|
class ActionPlanHygieneService:
    """Close stale advisory action_plans while preserving audit metadata.

    ``preview()`` only reads; ``run()`` moves each stale candidate to its target
    status and appends a closure record to the row's ``metadata_json``.
    """

    # Single definition of the candidate query, shared by preview() and run()
    # so the two can never drift apart.
    _SELECT_CANDIDATES_SQL = """
        SELECT id, status, priority, created_at, action_type, plan_type,
               created_by, description, metadata_json, payload
        FROM action_plans
        WHERE status IN ('pending', 'auto_pending', 'pending_review')
          AND (
                action_type IN ('code_review_fix', 'openclaw_recommendation')
                OR created_by IN ('code_review_pipeline', 'openclaw', 'nemotron')
          )
        ORDER BY created_at ASC
    """

    # The status guard makes the close a no-op when the plan changed status
    # (e.g. was approved) after it was selected.
    _CLOSE_PLAN_SQL = """
        UPDATE action_plans
        SET status = :status,
            metadata_json = :metadata
        WHERE id = :id
          AND status = :from_status
    """

    def __init__(self, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES):
        """Store the staleness threshold (hours) and the per-run update cap."""
        self.stale_hours = stale_hours
        self.max_updates = max_updates

    def _fetch_rows(self, session: Any) -> List[Any]:
        """Load every open plan from the handled sources, oldest first."""
        return session.execute(text(self._SELECT_CANDIDATES_SQL)).fetchall()

    @staticmethod
    def _closure_metadata(row: Any, item: Dict[str, Any], closed_at: str) -> Dict[str, Any]:
        """Return the row's metadata extended with a record of this closure.

        Only the 10 most recent ``hygiene_history`` entries are retained.
        """
        metadata = _parse_metadata(_row_get(row, "metadata_json"))
        history = metadata.get("hygiene_history")
        if not isinstance(history, list):
            history = []
        history.append({
            "closed_at": closed_at,
            "from_status": item["from_status"],
            "to_status": item["to_status"],
            "reason": item["reason"],
            "age_hours": item["age_hours"],
        })
        metadata["hygiene_history"] = history[-10:]
        metadata["hygiene_last_reason"] = item["reason"]
        metadata["hygiene_last_closed_at"] = closed_at
        return metadata

    def preview(self) -> Dict[str, Any]:
        """Return what ``run()`` would close, without writing anything."""
        session = get_session()
        try:
            rows = self._fetch_rows(session)
            return build_action_plan_hygiene_preview(
                rows,
                stale_hours=self.stale_hours,
                max_updates=self.max_updates,
            )
        finally:
            session.close()

    def run(self) -> Dict[str, Any]:
        """Close stale candidates and return the preview plus update results.

        Returns:
            The preview dict extended with ``updated_count``, ``updated_ids``,
            ``skipped_ids`` (candidates whose status changed before the update
            was applied) and ``ran_at``.

        Raises:
            Any exception from the session is re-raised after a rollback.
        """
        session = get_session()
        now = datetime.now()
        try:
            rows = self._fetch_rows(session)
            preview = build_action_plan_hygiene_preview(
                rows,
                now=now,
                stale_hours=self.stale_hours,
                max_updates=self.max_updates,
            )

            closed_at = now.isoformat(timespec="seconds")
            row_by_id = {int(_row_get(row, "id")): row for row in rows}
            updated: List[Dict[str, Any]] = []
            skipped_ids: List[int] = []

            for item in preview["candidates"]:
                row = row_by_id.get(item["id"])
                if row is None:
                    continue

                metadata = self._closure_metadata(row, item, closed_at)
                outcome = session.execute(
                    text(self._CLOSE_PLAN_SQL),
                    {
                        "id": item["id"],
                        "status": item["to_status"],
                        "from_status": item["from_status"],
                        "metadata": json.dumps(metadata, ensure_ascii=False),
                    },
                )
                if outcome.rowcount == 0:
                    # Someone else moved the plan on in the meantime; leave it alone.
                    skipped_ids.append(item["id"])
                    continue
                updated.append(item)

            session.commit()

            result = dict(preview)
            result["updated_count"] = len(updated)
            result["updated_ids"] = [item["id"] for item in updated]
            result["skipped_ids"] = skipped_ids
            result["ran_at"] = closed_at
            logger.info(
                "Action plan hygiene updated=%d by_source=%s",
                result["updated_count"],
                result.get("by_source"),
            )
            if skipped_ids:
                logger.info(
                    "Action plan hygiene skipped=%d (status changed concurrently)",
                    len(skipped_ids),
                )
            return result
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
|
|
|
|
|
|
def run_action_plan_hygiene(
    *,
    stale_hours: int = DEFAULT_STALE_HOURS,
    max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
    """Build an ``ActionPlanHygieneService`` with the given limits and run it once.

    Returns:
        Whatever ``ActionPlanHygieneService.run()`` returns.
    """
    service = ActionPlanHygieneService(
        stale_hours=stale_hours,
        max_updates=max_updates,
    )
    return service.run()
|
|
|
|
|
|
# Public API of this module; underscore-prefixed helpers are internal.
__all__ = [
    "ActionPlanHygieneService",
    "build_action_plan_hygiene_preview",
    "run_action_plan_hygiene",
]
|