#!/usr/bin/env python3 """Classify sustained host load and emit a controlled automation packet. The controller is intentionally read-only by default. It turns HostLoadAverageSustainedHigh from a generic "SSH and look around" alert into a deterministic AI Agent control packet: * orphan browser/smoke load -> gated SIGTERM helper dry-run, then controlled apply with evidence and post-apply verifier * active Gitea Actions/BuildKit load -> runner pressure stays fail-closed; drain/cancel decisions must use runner/CD verifiers, not process kills * unknown or critical pressure -> source-specific playbook or break-glass It never reads secrets, raw runner registrations, sessions, or environment files, and it never mutates host state. """ from __future__ import annotations import argparse import json import re from pathlib import Path from typing import Any DEFAULT_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom") DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom") SCHEMA_VERSION = "host_sustained_load_controlled_automation_v1" LABEL_RE = re.compile(r"(?P[A-Za-z_][A-Za-z0-9_]*)=\"(?P(?:[^\"\\\\]|\\\\.)*)\"") METRIC_RE = re.compile( r"^(?P[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P[^}]*)\})?\s+" r"(?P[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$" ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Build a controlled AI Agent packet for sustained host load." ) parser.add_argument("--host", default="110") parser.add_argument("--metrics-file", type=Path, default=DEFAULT_METRICS_FILE) parser.add_argument("--docker-stats-file", type=Path, default=DEFAULT_DOCKER_STATS_FILE) parser.add_argument("--load5-per-core-threshold", type=float, default=1.5) parser.add_argument("--ci-stale-age-seconds", type=int, default=1800) parser.add_argument("--json", action="store_true", help="Print JSON only.") return parser.parse_args() def _unescape_label(value: str) -> str: return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n") def parse_prometheus_text(text: str) -> list[dict[str, Any]]: samples: list[dict[str, Any]] = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line or line.startswith("#"): continue match = METRIC_RE.match(line) if not match: continue labels = { item.group("key"): _unescape_label(item.group("value")) for item in LABEL_RE.finditer(match.group("labels") or "") } samples.append( { "name": match.group("name"), "labels": labels, "value": float(match.group("value")), } ) return samples def _sample_value( samples: list[dict[str, Any]], name: str, *, host: str, labels: dict[str, str] | None = None, default: float = 0.0, ) -> float: expected = {"host": host, **(labels or {})} for sample in samples: if sample["name"] != name: continue sample_labels = sample["labels"] if all(sample_labels.get(key) == value for key, value in expected.items()): return float(sample["value"]) return default def _rule_values(samples: list[dict[str, Any]], name: str, *, host: str) -> list[dict[str, Any]]: values = [] for sample in samples: if sample["name"] != name: continue labels = sample["labels"] if labels.get("host") != host: continue rule = labels.get("rule") if not rule: continue values.append({"rule": rule, "value": float(sample["value"])}) return values def _top_orphan_rule(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None: counts = _rule_values( samples, "awoooi_host_runaway_browser_orphan_group_count", host=host, ) cpu_by_rule = { item["rule"]: item["value"] for item in _rule_values( samples, "awoooi_host_runaway_browser_orphan_cpu_percent", host=host, ) } candidates = [ { "rule": item["rule"], "group_count": int(item["value"]), "cpu_percent": round(cpu_by_rule.get(item["rule"], 0.0), 3), } for item in counts if item["value"] > 0 ] if not candidates: return None return sorted(candidates, key=lambda item: (-item["cpu_percent"], item["rule"]))[0] def _top_container_cpu(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None: candidates = [] for sample in samples: if sample["name"] != "docker_container_cpu_cores": continue labels = sample["labels"] if labels.get("host", host) != host: continue candidates.append( { "container_name": labels.get("container_name") or labels.get("name") or "unknown", "cpu_cores": round(float(sample["value"]), 6), } ) if not candidates: return None return sorted(candidates, key=lambda item: (-item["cpu_cores"], item["container_name"]))[0] def build_packet( *, host: str, samples: list[dict[str, Any]], docker_samples: list[dict[str, Any]], load5_per_core_threshold: float, ci_stale_age_seconds: int, ) -> dict[str, Any]: monitor_up = int( _sample_value( samples, "awoooi_host_runaway_process_monitor_up", host=host, labels={"mode": "read_only"}, default=0, ) ) load5_per_core = _sample_value(samples, "awoooi_host_load5_per_core", host=host) swap_used_ratio = _sample_value(samples, "awoooi_host_swap_used_ratio", host=host) remediation_authorized = int( _sample_value( samples, "awoooi_host_runaway_process_remediation_authorized", host=host, ) ) active_ci_containers = int( _sample_value( samples, "awoooi_host_gitea_actions_active_container_count", host=host, default=0, ) ) active_ci_groups = int( _sample_value( samples, "awoooi_host_gitea_actions_active_process_group_count", host=host, default=0, ) ) active_ci_cpu = _sample_value( samples, "awoooi_host_gitea_actions_active_process_cpu_percent", host=host, ) active_ci_oldest_age = int( _sample_value( samples, "awoooi_host_gitea_actions_active_process_oldest_age_seconds", host=host, ) ) top_orphan = _top_orphan_rule(samples, host=host) top_container = _top_container_cpu(docker_samples, host=host) top_container_name = str((top_container or {}).get("container_name") or "").lower() top_container_cpu = float((top_container or {}).get("cpu_cores") or 0.0) classification = "observing_load_within_threshold" severity = "info" controlled_apply_allowed = False next_action = "keep_read_only_monitoring" dry_run_command = "" controlled_apply_command = "" verifier_command = ( "scripts/ops/host-sustained-load-controller.py " f"--host {host} --metrics-file {DEFAULT_METRICS_FILE}" ) if monitor_up != 1: classification = "blocked_monitor_unavailable" severity = "warning" next_action = "restore_host_runaway_process_exporter_textfile_before_apply" elif remediation_authorized > 0: classification = "blocked_monitor_authority_violation" severity = "critical" next_action = "rollback_monitor_to_read_only_exporter" elif load5_per_core > load5_per_core_threshold and top_orphan: classification = "controlled_orphan_browser_remediation_ready" severity = "critical" controlled_apply_allowed = True rule = top_orphan["rule"] dry_run_command = f"scripts/ops/host-runaway-process-remediation.py --rule {rule}" controlled_apply_command = ( "scripts/ops/host-runaway-process-remediation.py " f"--rule {rule} --apply --confirm-apply " "--controlled-apply-id ${CONTROLLED_APPLY_ID} " "--evidence-ref ${EVIDENCE_REF} " "--post-apply-verifier " "'scripts/ops/host-sustained-load-controller.py --host " f"{host} --metrics-file {DEFAULT_METRICS_FILE}' " "--wait-seconds 10" ) next_action = "run_orphan_browser_remediation_dry_run_then_controlled_sigterm" elif ( load5_per_core > load5_per_core_threshold and (active_ci_containers > 0 or active_ci_groups > 0) ): classification = "controlled_ci_runner_saturation_guarded" severity = "critical" if active_ci_oldest_age >= ci_stale_age_seconds else "warning" controlled_apply_allowed = active_ci_oldest_age >= ci_stale_age_seconds dry_run_command = ( "ops/runner/read-public-gitea-actions-queue.py --json " "&& ops/runner/check-awoooi-non110-runner-readiness.sh" ) controlled_apply_command = ( "keep_110_runner_pressure_gate_fail_closed; " "only cancel/drain stale Gitea Actions through runner verifier packet" ) next_action = ( "prepare_runner_drain_or_cancel_packet_without_process_kill" if controlled_apply_allowed else "keep_pressure_gate_fail_closed_until_ci_load_clears" ) elif ( load5_per_core > load5_per_core_threshold and top_container_name == "gitea" and top_container_cpu >= 2.0 ): classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook" severity = "critical" dry_run_command = ( "scripts/ops/host-sustained-load-evidence.py " f"--host {host} --metrics-file {DEFAULT_METRICS_FILE} " f"--docker-stats-file {DEFAULT_DOCKER_STATS_FILE} --json" ) next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode" elif load5_per_core > load5_per_core_threshold and swap_used_ratio >= 0.85: classification = "blocked_memory_or_swap_pressure_requires_service_playbook" severity = "critical" next_action = "route_to_service_specific_memory_pressure_playbook" elif load5_per_core > load5_per_core_threshold: classification = "blocked_unknown_sustained_load_requires_source_specific_playbook" severity = "critical" dry_run_command = ( "scripts/ops/host-sustained-load-evidence.py " f"--host {host} --metrics-file {DEFAULT_METRICS_FILE} " "--docker-stats-file /home/wooo/node_exporter_textfiles/docker_stats.prom " "--json" ) next_action = "collect_sanitized_top_process_and_container_stats_then_select_playbook" return { "schema_version": SCHEMA_VERSION, "host": host, "mode": "read_only_control_packet", "classification": classification, "severity": severity, "controlled_apply_allowed": controlled_apply_allowed, "next_action": next_action, "readback": { "monitor_up": monitor_up, "load5_per_core": round(load5_per_core, 6), "load5_per_core_threshold": load5_per_core_threshold, "swap_used_ratio": round(swap_used_ratio, 6), "remediation_authorized": remediation_authorized, "active_ci_container_count": active_ci_containers, "active_ci_process_group_count": active_ci_groups, "active_ci_process_cpu_percent": round(active_ci_cpu, 3), "active_ci_oldest_age_seconds": active_ci_oldest_age, "top_orphan_rule": top_orphan, "top_container_cpu": top_container, }, "commands": { "dry_run": dry_run_command, "controlled_apply": controlled_apply_command, "post_apply_verifier": verifier_command, "rollback": "send SIGTERM only; no persistent host mutation. Re-run workload if needed.", }, "operation_boundaries": { "secret_value_read": False, "raw_session_read": False, "raw_runner_registration_read": False, "host_write_performed": False, "process_signal_performed": False, "docker_restart_allowed": False, "systemd_restart_allowed": False, "firewall_change_allowed": False, "critical_break_glass_required": True, }, "forbidden_actions": [ "SIGKILL", "docker_restart", "systemctl_restart", "nginx_reload", "firewall_change", "kubectl_action", "secret_read", "legacy_or_generic_runner_restore", ], } def main() -> int: args = parse_args() try: text = args.metrics_file.read_text(encoding="utf-8") samples = parse_prometheus_text(text) except FileNotFoundError: samples = [] try: docker_text = args.docker_stats_file.read_text(encoding="utf-8") docker_samples = parse_prometheus_text(docker_text) except FileNotFoundError: docker_samples = [] packet = build_packet( host=args.host, samples=samples, docker_samples=docker_samples, load5_per_core_threshold=args.load5_per_core_threshold, ci_stale_age_seconds=args.ci_stale_age_seconds, ) if args.json: print(json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True)) else: print(f"status={packet['classification']}") print(f"controlled_apply_allowed={str(packet['controlled_apply_allowed']).lower()}") print(f"next_action={packet['next_action']}") if packet["commands"]["dry_run"]: print(f"dry_run_command={packet['commands']['dry_run']}") if packet["commands"]["controlled_apply"]: print(f"controlled_apply_command={packet['commands']['controlled_apply']}") print(f"post_apply_verifier={packet['commands']['post_apply_verifier']}") return 0 if not packet["classification"].startswith("blocked_") else 75 if __name__ == "__main__": raise SystemExit(main())