Files
ewoooc/services/action_plan_hygiene.py
OoO 8c13c941c0
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
收斂 NemoTron 舊回覆行動
2026-05-19 21:09:37 +08:00

274 lines
9.5 KiB
Python

"""Action plan queue hygiene.
Closes stale, non-executable automation suggestions without deleting audit data.
The service is intentionally conservative: it only handles sources that are
known to be advisory/noisy when old, and it records the closure in metadata_json.
"""
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional
from sqlalchemy import text
from database.manager import get_session
from services.logger_manager import SystemLogger
# Module-wide logger, obtained from the project's SystemLogger wrapper
# (services.logger_manager); run() reports each sweep through it.
logger = SystemLogger("ActionPlanHygiene").get_logger()
def _env_int(name: str, default: int, *, minimum: int = 0) -> int:
    """Read an integer setting from the environment, falling back to *default*.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, blank, not an integer,
            or smaller than *minimum*.
        minimum: Smallest accepted value (inclusive).

    Returns:
        The parsed integer, or *default*.

    Why: a plain ``int(os.getenv(...))`` raises ``ValueError`` at import time on
    a typo, which breaks every module importing this one. It would also accept
    negative numbers; a negative stale window moves the cutoff into the future
    and closes every closable row at once. Invalid values are logged and
    ignored instead.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        value: Optional[int] = int(raw)
    except ValueError:
        value = None
    if value is None or value < minimum:
        import logging  # stdlib; kept local so this helper is self-contained

        logging.getLogger(__name__).warning(
            "Ignoring invalid %s=%r; using default %d", name, raw, default
        )
        return default
    return value


# Rows at least this many hours old are eligible for closure.
DEFAULT_STALE_HOURS = _env_int("ACTION_PLAN_HYGIENE_STALE_HOURS", 72)
# Upper bound on the number of rows closed by one run.
DEFAULT_MAX_UPDATES = _env_int("ACTION_PLAN_HYGIENE_MAX_UPDATES", 200)
# A row must currently have one of these statuses to be closed.
CLOSABLE_STATUSES = frozenset({"pending", "auto_pending", "pending_review"})
# Source label (as computed by _source_for_row) -> status written when a
# stale row from that source is closed. Sources not listed are never touched.
SOURCE_TARGET_STATUS = {
    "code_review_fix": "auto_disabled",
    "code_review_pipeline": "auto_disabled",
    "nemotron_direct_response": "auto_disabled",
    "openclaw_recommendation": "rejected",
    "openclaw": "rejected",
}
def _row_get(row: Any, key: str) -> Any:
    """Fetch column *key* from a result row whose concrete shape is unknown.

    Resolution order:
      1. ``dict`` rows: ``row.get(key)``.
      2. Objects exposing ``_mapping`` (e.g. SQLAlchemy rows): ``row._mapping.get(key)``.
      3. Anything else: ``row[key]``; if that raises, fall back to
         ``getattr(row, key, None)``.
    """
    if isinstance(row, dict):
        return row.get(key)
    if not hasattr(row, "_mapping"):
        try:
            return row[key]
        except Exception:
            # Tuples, plain objects, etc. reject string subscripts; try attributes.
            return getattr(row, key, None)
    return row._mapping.get(key)
def _coerce_datetime(value: Any) -> Optional[datetime]:
    """Normalise a ``created_at`` value to a naive datetime in local wall-clock time.

    Args:
        value: A ``datetime`` (naive or aware) or an ISO-8601 string. A trailing
            ``Z`` is accepted as UTC, and surrounding whitespace is ignored.

    Returns:
        A naive ``datetime``, or ``None`` for empty, unparseable or non-date input.

    Why: callers compare the result with a naive local ``datetime.now()``.
    Simply discarding the offset of an aware value would read a UTC time such
    as ``...T00:00:00Z`` as *local* midnight. On a +08:00 host that makes the
    row look 8 hours older than it is, so it would be closed early. Aware
    values are therefore converted to local time first. Naive values are
    assumed to already be local and pass through unchanged.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str) and value.strip():
        try:
            parsed = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone()  # same instant, expressed in local time
    return parsed.replace(tzinfo=None)
def _parse_metadata(raw: Any) -> Dict[str, Any]:
    """Decode an action plan's ``metadata_json`` value into a new, mutable dict.

    Handling by input type:

    - ``dict``: returned as a shallow copy.
    - Falsy values: return ``{}``.
    - ``bytes`` / ``bytearray``: decoded as UTF-8 before parsing. ``str()`` on
      bytes would yield ``"b'...'"``, which never parses as JSON.
    - Valid JSON that is not an object: wrapped as ``{"legacy_metadata": value}``.
    - Anything unparseable: kept verbatim as ``{"legacy_metadata_raw": text}``.

    Why the unparseable text is not truncated: the hygiene run serialises this
    dict back over the row's ``metadata_json`` column. Truncating would
    permanently delete the tail of the original audit data.
    """
    if isinstance(raw, dict):
        return dict(raw)
    if not raw:
        return {}
    if isinstance(raw, (bytes, bytearray)):
        raw_text = bytes(raw).decode("utf-8", errors="replace")
    else:
        raw_text = str(raw)
    try:
        parsed = json.loads(raw_text)
    except Exception:
        # Best effort: a malformed column must not abort the sweep; keep it intact.
        return {"legacy_metadata_raw": raw_text}
    return parsed if isinstance(parsed, dict) else {"legacy_metadata": parsed}
def _parse_payload(raw: Any) -> Dict[str, Any]:
    """Decode an action plan's ``payload`` value into a dict.

    - ``dict``: returned as a shallow copy.
    - ``bytes`` / ``bytearray``: decoded as UTF-8 first. ``str()`` on bytes
      would yield ``"b'...'"``, which never parses.
    - Anything that is empty, unparseable, or JSON that is not an object
      returns ``{}``, so callers can use ``.get`` unconditionally.
    """
    if isinstance(raw, dict):
        return dict(raw)
    if not raw:
        return {}
    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode("utf-8", errors="replace")
    try:
        parsed = json.loads(str(raw))
    except Exception:
        # Best effort: an unreadable payload simply classifies as "no payload".
        return {}
    return parsed if isinstance(parsed, dict) else {}
def _source_for_row(row: Any) -> str:
    """Label which automation source produced an action plan row.

    A row created by ``"nemotron"`` counts as ``"nemotron_direct_response"``
    when either of these holds:

    - its payload's ``dispatch_to`` is ``"direct_response"``; or
    - its payload's ``action_plan`` is a non-empty list whose entries are all
      dicts with ``action == "reply_simple"``.

    Every other row is labelled with the first truthy value among
    ``action_type``, ``plan_type`` and ``created_by``, else ``"unknown"``.
    """
    creator = str(_row_get(row, "created_by") or "")
    if creator == "nemotron":
        body = _parse_payload(_row_get(row, "payload"))
        steps = body.get("action_plan")
        if not isinstance(steps, list):
            steps = []
        reply_only = len(steps) > 0 and all(
            isinstance(step, dict) and step.get("action") == "reply_simple"
            for step in steps
        )
        if body.get("dispatch_to") == "direct_response" or reply_only:
            return "nemotron_direct_response"
    for column in ("action_type", "plan_type"):
        label = _row_get(row, column)
        if label:
            return str(label)
    return creator or "unknown"
def build_action_plan_hygiene_preview(
    rows: Iterable[Any],
    *,
    now: Optional[datetime] = None,
    stale_hours: int = DEFAULT_STALE_HOURS,
    max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
    """Return stale action plan candidates without mutating the database.

    A row becomes a candidate when all of the following hold:

    - its ``status`` is in ``CLOSABLE_STATUSES``;
    - its source label (``_source_for_row``) has an entry in ``SOURCE_TARGET_STATUS``;
    - its ``created_at`` parses and is at or before ``now - stale_hours``.

    Rows whose ``created_at`` cannot be parsed are never selected.

    Args:
        rows: Result rows (dicts, SQLAlchemy-style rows, or attribute objects);
            consumed in a single pass.
        now: Reference time. Defaults to ``datetime.now()``. An aware value is
            converted to local wall-clock time before its offset is dropped, so
            it matches the naive local default.
        stale_hours: Minimum age, in hours, for a row to be eligible.
        max_updates: Maximum number of candidates returned. The oldest eligible
            rows are kept. A negative value is treated as 0.

    Returns:
        A dict with these keys:

        - ``scanned_count``: rows seen.
        - ``eligible_count``: stale candidates found before the cap.
        - ``candidate_count``: candidates returned after the cap.
        - ``stale_hours``, ``max_updates``: the arguments as given.
        - ``by_source``, ``by_from_status``: counts over the returned candidates.
        - ``candidates``: one dict per candidate with keys ``id``, ``source``,
          ``from_status``, ``to_status``, ``priority``, ``created_at``,
          ``age_hours``, ``description`` and ``reason``.
    """
    if now is None:
        now = datetime.now()
    elif now.tzinfo is not None:
        now = now.astimezone()
    now = now.replace(tzinfo=None)
    cutoff = now - timedelta(hours=stale_hours)
    # A negative slice bound would drop rows from the *end* and select almost
    # everything; clamp so a negative cap selects nothing instead.
    limit = max(0, max_updates)

    stale: List[Any] = []  # (created_at, candidate) pairs
    scanned = 0
    for row in rows:
        scanned += 1
        status = str(_row_get(row, "status") or "")
        if status not in CLOSABLE_STATUSES:
            continue  # cheapest filter first; avoids parsing payloads needlessly
        source = _source_for_row(row)
        target_status = SOURCE_TARGET_STATUS.get(source)
        if target_status is None:
            continue
        created_at = _coerce_datetime(_row_get(row, "created_at"))
        if created_at is None or created_at > cutoff:
            continue
        age_hours = max(0.0, (now - created_at).total_seconds() / 3600.0)
        stale.append((created_at, {
            "id": int(_row_get(row, "id")),
            "source": source,
            "from_status": status,
            "to_status": target_status,
            "priority": _row_get(row, "priority"),
            "created_at": created_at.isoformat(sep=" "),
            "age_hours": round(age_hours, 1),
            "description": str(_row_get(row, "description") or "")[:160],
            "reason": f"stale_{source}_older_than_{stale_hours}h",
        }))

    # Oldest first, regardless of input order, so the cap always closes the
    # longest-waiting plans. The sort is stable, so ties keep their input order.
    stale.sort(key=lambda pair: pair[0])
    candidates = [item for _, item in stale[:limit]]

    by_source: Dict[str, int] = {}
    by_from_status: Dict[str, int] = {}
    for item in candidates:
        by_source[item["source"]] = by_source.get(item["source"], 0) + 1
        by_from_status[item["from_status"]] = by_from_status.get(item["from_status"], 0) + 1
    return {
        "scanned_count": scanned,
        "eligible_count": len(stale),
        "candidate_count": len(candidates),
        "stale_hours": stale_hours,
        "max_updates": max_updates,
        "by_source": by_source,
        "by_from_status": by_from_status,
        "candidates": candidates,
    }
class ActionPlanHygieneService:
    """Close stale advisory action_plans while preserving audit metadata.

    ``preview()`` is read-only and reports which rows would be closed.
    ``run()`` closes them in one transaction and appends a closure record to
    each row's ``metadata_json``.

    Each close is a compare-and-set UPDATE (``WHERE id = :id AND status =
    :from_status``). If another process changed a row's status between the
    SELECT and the UPDATE (for example, approved or executed it), the row is
    left untouched and reported under ``skipped_ids`` instead of being
    overwritten.
    """

    # Coarse pre-filter; build_action_plan_hygiene_preview applies the exact
    # status/source/age rules. Ordered oldest first so the per-run cap
    # targets the oldest plans.
    _CANDIDATE_SQL = """
        SELECT id, status, priority, created_at, action_type, plan_type,
               created_by, description, metadata_json, payload
        FROM action_plans
        WHERE status IN ('pending', 'auto_pending', 'pending_review')
          AND (
            action_type IN ('code_review_fix', 'openclaw_recommendation')
            OR created_by IN ('code_review_pipeline', 'openclaw', 'nemotron')
          )
        ORDER BY created_at ASC
    """

    # The status guard turns the close into a no-op when the row was changed
    # concurrently after it was read.
    _CLOSE_SQL = """
        UPDATE action_plans
        SET status = :status,
            metadata_json = :metadata
        WHERE id = :id
          AND status = :from_status
    """

    # Number of closure records retained in metadata_json["hygiene_history"].
    _HISTORY_LIMIT = 10

    def __init__(self, stale_hours: int = DEFAULT_STALE_HOURS, max_updates: int = DEFAULT_MAX_UPDATES):
        """Store the age threshold (hours) and the per-run cap on closures."""
        self.stale_hours = stale_hours
        self.max_updates = max_updates

    def _fetch_rows(self, session: Any) -> List[Any]:
        """Return every row that might be closable, oldest first."""
        return session.execute(text(self._CANDIDATE_SQL)).fetchall()

    def preview(self) -> Dict[str, Any]:
        """Return the candidates ``run()`` would close, without writing anything."""
        session = get_session()
        try:
            rows = self._fetch_rows(session)
            return build_action_plan_hygiene_preview(
                rows,
                stale_hours=self.stale_hours,
                max_updates=self.max_updates,
            )
        finally:
            session.close()

    @classmethod
    def _closure_metadata(cls, row: Any, item: Dict[str, Any], closed_at: str) -> str:
        """Return the row's metadata, as JSON text, with this closure recorded.

        Existing metadata is carried over. ``hygiene_history`` gets a new entry
        and is trimmed to the last ``_HISTORY_LIMIT`` records.
        """
        metadata = _parse_metadata(_row_get(row, "metadata_json"))
        previous = metadata.get("hygiene_history")
        # Copy, so an in-memory row (e.g. a dict in tests) is not mutated.
        history = list(previous) if isinstance(previous, list) else []
        history.append({
            "closed_at": closed_at,
            "from_status": item["from_status"],
            "to_status": item["to_status"],
            "reason": item["reason"],
            "age_hours": item["age_hours"],
        })
        metadata["hygiene_history"] = history[-cls._HISTORY_LIMIT:]
        metadata["hygiene_last_reason"] = item["reason"]
        metadata["hygiene_last_closed_at"] = closed_at
        return json.dumps(metadata, ensure_ascii=False)

    def run(self) -> Dict[str, Any]:
        """Close stale candidates and return the preview summary plus the outcome.

        Adds these keys to the preview dict:

        - ``updated_count``, ``updated_ids``: rows actually closed.
        - ``skipped_ids``: rows whose status changed concurrently, left as they were.
        - ``ran_at``: when this run started.

        All closes happen in one transaction. Any exception rolls the
        transaction back and is re-raised.
        """
        session = get_session()
        now = datetime.now()
        closed_at = now.isoformat(timespec="seconds")
        try:
            rows = self._fetch_rows(session)
            preview = build_action_plan_hygiene_preview(
                rows,
                now=now,
                stale_hours=self.stale_hours,
                max_updates=self.max_updates,
            )
            row_by_id = {int(_row_get(row, "id")): row for row in rows}
            updated: List[Dict[str, Any]] = []
            skipped: List[Dict[str, Any]] = []
            for item in preview["candidates"]:
                row = row_by_id.get(item["id"])
                if row is None:
                    continue
                outcome = session.execute(
                    text(self._CLOSE_SQL),
                    {
                        "id": item["id"],
                        "status": item["to_status"],
                        "from_status": item["from_status"],
                        "metadata": self._closure_metadata(row, item, closed_at),
                    },
                )
                # rowcount == 0 means the status guard did not match: the row
                # changed after the SELECT. Drivers that cannot report a
                # rowcount return -1; count that as closed rather than hide a
                # real update.
                if outcome.rowcount == 0:
                    skipped.append(item)
                else:
                    updated.append(item)
            session.commit()
            result = dict(preview)
            result["updated_count"] = len(updated)
            result["updated_ids"] = [item["id"] for item in updated]
            result["skipped_ids"] = [item["id"] for item in skipped]
            result["ran_at"] = closed_at
            logger.info(
                "Action plan hygiene updated=%d skipped=%d by_source=%s",
                result["updated_count"],
                len(skipped),
                result.get("by_source"),
            )
            return result
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
def run_action_plan_hygiene(
    *,
    stale_hours: int = DEFAULT_STALE_HOURS,
    max_updates: int = DEFAULT_MAX_UPDATES,
) -> Dict[str, Any]:
    """Perform one hygiene sweep and return its summary dict.

    Convenience entry point: builds an ``ActionPlanHygieneService`` with the
    given age threshold (hours) and per-run cap, then calls its ``run()``.
    """
    service = ActionPlanHygieneService(
        stale_hours=stale_hours,
        max_updates=max_updates,
    )
    return service.run()
# Names exported by `from services.action_plan_hygiene import *`; the
# underscore-prefixed helpers and module constants are not part of it.
__all__ = [
    "ActionPlanHygieneService",
    "build_action_plan_hygiene_preview",
    "run_action_plan_hygiene",
]