Files
awoooi/scripts/ops/host-runaway-process-remediation.py
Your Name a6dc806d38
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 28s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
feat(agent): automate sustained host load response
2026-07-01 08:43:40 +08:00

198 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Gated remediation helper for AWOOOI host runaway process groups.
Default mode is dry-run. Applying SIGTERM requires an explicit controlled apply
receipt, evidence reference, post-apply verifier, and --confirm-apply. Owner and
maintenance-window identifiers are accepted as optional evidence, but they are
not the default gate for allowlisted low-blast-radius orphan browser cleanup.
This script is a PlayBook primitive, not a background auto-kill daemon.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import signal
import sys
import time
from pathlib import Path
from types import ModuleType
from typing import Any
# The exporter script sits next to this file. Its hyphenated filename is not a
# valid module name for a plain ``import`` statement, so it is loaded by path.
EXPORTER_PATH = Path(__file__).with_name("host-runaway-process-exporter.py")


def load_exporter() -> ModuleType:
    """Load the sibling exporter script by path and return it as a module.

    The module is registered in ``sys.modules`` under the name
    ``host_runaway_process_exporter`` before its source is executed, so code
    that looks its own module up in ``sys.modules`` while it is being
    executed can find it.

    Returns:
        The executed exporter module.

    Raises:
        RuntimeError: if no import spec or loader can be built for
            ``EXPORTER_PATH``.
        Exception: anything raised while executing the exporter source is
            propagated unchanged (for example ``FileNotFoundError`` when the
            file is missing). In that case the partially initialised module
            is removed from ``sys.modules`` again.
    """
    spec = importlib.util.spec_from_file_location("host_runaway_process_exporter", EXPORTER_PATH)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"cannot load exporter module: {EXPORTER_PATH}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-executed module behind for later lookups; only
        # drop the entry if it is still the object registered above.
        if sys.modules.get(spec.name) is module:
            del sys.modules[spec.name]
        raise
    return module
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Dry-run or gated SIGTERM for AWOOOI runaway process groups.")
parser.add_argument("--host", default=os.environ.get("AIOPS_HOST_LABEL", os.uname().nodename))
parser.add_argument("--rule", help="Limit candidates to one rule id. Required with --apply.")
parser.add_argument("--ps-file", type=Path, help="Use a fixture ps file for tests or offline review.")
parser.add_argument("--min-age-seconds", type=int, default=1800)
parser.add_argument("--min-cpu-percent", type=float, default=50)
parser.add_argument("--apply", action="store_true", help="Send SIGTERM to matching process groups.")
parser.add_argument("--confirm-apply", action="store_true", help="Required together with --apply.")
parser.add_argument("--controlled-apply-id", default="")
parser.add_argument("--owner-approval-id", default="")
parser.add_argument("--maintenance-window-id", default="")
parser.add_argument("--evidence-ref", default="")
parser.add_argument("--post-apply-verifier", default="")
parser.add_argument("--wait-seconds", type=int, default=0, help="Optional wait after SIGTERM before re-reading ps.")
return parser.parse_args()
def validate_apply_args(args: argparse.Namespace) -> None:
    """Refuse an ``--apply`` run unless every required gate option is set.

    Invocations without ``--apply`` (dry-run) are accepted without further
    checks. With ``--apply``, each gate option below must be truthy.

    Args:
        args: Parsed command-line namespace.

    Raises:
        SystemExit: with a message listing, in a fixed order, every required
            flag that is missing or empty.
    """
    if args.apply:
        # (namespace attribute, flag as typed on the command line), in the
        # order they are reported.
        required_gates = (
            ("confirm_apply", "--confirm-apply"),
            ("rule", "--rule"),
            ("controlled_apply_id", "--controlled-apply-id"),
            ("evidence_ref", "--evidence-ref"),
            ("post_apply_verifier", "--post-apply-verifier"),
        )
        absent = [flag for attr, flag in required_gates if not getattr(args, attr)]
        if absent:
            prefix = "Refusing apply; missing required gates: "
            suffix = ". Use dry-run output for the controlled PlayBook packet first."
            raise SystemExit(prefix + ", ".join(absent) + suffix)
def current_process_group() -> int:
    """Return the process-group id of this process, or ``-1`` on failure.

    Any exception from ``os.getpgrp()`` is deliberately swallowed: the lookup
    is best-effort and ``-1`` serves as the "unknown" sentinel.
    """
    try:
        own_group = os.getpgrp()
    except Exception:
        own_group = -1
    return own_group
def _read_matching_groups(exporter: ModuleType, args: argparse.Namespace) -> list[Any]:
    """Read the ps source, classify it, and keep the groups selected by --rule."""
    ps_rows = exporter.parse_ps_rows(exporter.read_ps_text(args.ps_file))
    classified = exporter.classify_groups(
        ps_rows,
        min_age_seconds=args.min_age_seconds,
        min_cpu_percent=args.min_cpu_percent,
    )
    if not args.rule:
        return list(classified)
    return [entry for entry in classified if entry.rule_id == args.rule]


def _candidate_record(group: Any, own_pgrp: int, apply: bool) -> dict[str, Any]:
    """Build the report entry for one group, including any safety block."""
    if group.pgid <= 1:
        # pgid 0/1 (and anything below) must never be signalled.
        blocked = "unsafe_pgid"
    elif group.pgid == own_pgrp:
        # Never signal the process group this script runs in.
        blocked = "own_process_group"
    else:
        blocked = None

    if blocked:
        action = "skip"
    elif apply:
        action = "sigterm"
    else:
        action = "dry_run"

    return {
        "rule": group.rule_id,
        "pgid": group.pgid,
        "process_count": len(group.rows),
        "cpu_percent": round(group.cpu_percent, 3),
        "oldest_age_seconds": group.oldest_age_seconds,
        "orphan_reason": group.orphan_reason,
        "sample_comm": group.sample_comm,
        "blocked_reason": blocked,
        "action": action,
    }


def _signal_candidates(
    candidates: list[dict[str, Any]],
) -> tuple[list[int], list[int], list[dict[str, Any]]]:
    """Send SIGTERM to every unblocked candidate group.

    Candidate records are updated in place when the group has already gone
    or the signal is not permitted. Returns ``(signaled, missing, errors)``.
    """
    delivered: list[int] = []
    vanished: list[int] = []
    failures: list[dict[str, Any]] = []
    for record in candidates:
        if record["blocked_reason"]:
            continue
        pgid = int(record["pgid"])
        try:
            os.killpg(pgid, signal.SIGTERM)
        except ProcessLookupError:
            # The group exited between the ps read and the signal.
            record["action"] = "already_exited"
            record["blocked_reason"] = "process_group_missing_at_apply"
            vanished.append(pgid)
        except PermissionError as exc:
            record["action"] = "signal_failed"
            record["blocked_reason"] = "permission_denied"
            failures.append({"pgid": pgid, "error": exc.__class__.__name__})
        else:
            delivered.append(pgid)
    return delivered, vanished, failures


def main() -> None:
    """Run the helper: list candidates, optionally SIGTERM them, print JSON.

    Steps, in order: parse and gate-check the arguments, load the exporter
    module, read and classify process groups, build one report record per
    group, send SIGTERM when ``--apply`` is set, optionally wait and re-read
    the groups, then print the report as sorted, indented JSON on stdout.

    Raises:
        SystemExit: with status 75 after printing the report when at least
            one SIGTERM was refused with ``PermissionError``; also raised by
            argument parsing or gate validation.
    """
    args = parse_args()
    validate_apply_args(args)
    exporter = load_exporter()

    groups = _read_matching_groups(exporter, args)
    own_pgrp = current_process_group()
    candidates = [_candidate_record(group, own_pgrp, args.apply) for group in groups]

    signaled: list[int] = []
    missing_process_groups: list[int] = []
    signal_errors: list[dict[str, Any]] = []
    if args.apply:
        signaled, missing_process_groups, signal_errors = _signal_candidates(candidates)

    remaining_after_wait = None
    if args.apply and args.wait_seconds > 0:
        # Give the signalled groups time to exit, then look again.
        time.sleep(args.wait_seconds)
        remaining_after_wait = [group.pgid for group in _read_matching_groups(exporter, args)]

    # Gate identifiers are only echoed for apply runs; dry-run reports null.
    gate_receipts = {
        name: (getattr(args, name) if args.apply else None)
        for name in (
            "controlled_apply_id",
            "owner_approval_id",
            "maintenance_window_id",
            "evidence_ref",
            "post_apply_verifier",
        )
    }
    payload = {
        "schema_version": "host_runaway_process_remediation_v1",
        "host": args.host,
        "mode": "apply_sigterm" if args.apply else "dry_run",
        "runtime_gate": 1 if args.apply else 0,
        **gate_receipts,
        "min_age_seconds": args.min_age_seconds,
        "min_cpu_percent": args.min_cpu_percent,
        "candidate_count": len(candidates),
        "signaled_process_group_count": len(signaled),
        "signaled_process_groups": signaled,
        "missing_process_group_count": len(missing_process_groups),
        "missing_process_groups": missing_process_groups,
        "signal_error_count": len(signal_errors),
        "signal_errors": signal_errors,
        "remaining_after_wait": remaining_after_wait,
        "candidates": candidates,
        "forbidden_without_gates": [
            "sigkill",
            "docker_restart",
            "systemctl_restart",
            "nginx_reload",
            "firewall_change",
            "secret_collection",
        ],
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))

    if signal_errors:
        raise SystemExit(75)


if __name__ == "__main__":
    main()