feat(agent): automate sustained host load response
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 28s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-07-01 08:43:40 +08:00
parent 5e629efa44
commit a6dc806d38
10 changed files with 1285 additions and 40 deletions

View File

@@ -2,9 +2,11 @@
"""
Gated remediation helper for AWOOOI host runaway process groups.
Default mode is dry-run. Applying SIGTERM requires explicit owner approval,
maintenance window, evidence reference, and --confirm-apply. This script is a
PlayBook primitive, not a background auto-kill daemon.
Default mode is dry-run. Applying SIGTERM requires an explicit controlled apply
receipt, evidence reference, post-apply verifier, and --confirm-apply. Owner and
maintenance-window identifiers are accepted as optional evidence, but they are
not the default gate for allowlisted low-blast-radius orphan browser cleanup.
This script is a PlayBook primitive, not a background auto-kill daemon.
"""
from __future__ import annotations
@@ -18,6 +20,7 @@ import sys
import time
from pathlib import Path
from types import ModuleType
from typing import Any
EXPORTER_PATH = Path(__file__).with_name("host-runaway-process-exporter.py")
@@ -42,9 +45,11 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--min-cpu-percent", type=float, default=50)
parser.add_argument("--apply", action="store_true", help="Send SIGTERM to matching process groups.")
parser.add_argument("--confirm-apply", action="store_true", help="Required together with --apply.")
parser.add_argument("--controlled-apply-id", default="")
parser.add_argument("--owner-approval-id", default="")
parser.add_argument("--maintenance-window-id", default="")
parser.add_argument("--evidence-ref", default="")
parser.add_argument("--post-apply-verifier", default="")
parser.add_argument("--wait-seconds", type=int, default=0, help="Optional wait after SIGTERM before re-reading ps.")
return parser.parse_args()
@@ -57,17 +62,17 @@ def validate_apply_args(args: argparse.Namespace) -> None:
missing.append("--confirm-apply")
if not args.rule:
missing.append("--rule")
if not args.owner_approval_id:
missing.append("--owner-approval-id")
if not args.maintenance_window_id:
missing.append("--maintenance-window-id")
if not args.controlled_apply_id:
missing.append("--controlled-apply-id")
if not args.evidence_ref:
missing.append("--evidence-ref")
if not args.post_apply_verifier:
missing.append("--post-apply-verifier")
if missing:
raise SystemExit(
"Refusing apply; missing required gates: "
+ ", ".join(missing)
+ ". Use dry-run output for the PlayBook packet first."
+ ". Use dry-run output for the controlled PlayBook packet first."
)
@@ -114,12 +119,31 @@ def main() -> None:
)
signaled: list[int] = []
missing_process_groups: list[int] = []
signal_errors: list[dict[str, Any]] = []
if args.apply:
for candidate in candidates:
if candidate["blocked_reason"]:
continue
os.killpg(int(candidate["pgid"]), signal.SIGTERM)
signaled.append(int(candidate["pgid"]))
pgid = int(candidate["pgid"])
try:
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
candidate["action"] = "already_exited"
candidate["blocked_reason"] = "process_group_missing_at_apply"
missing_process_groups.append(pgid)
continue
except PermissionError as exc:
candidate["action"] = "signal_failed"
candidate["blocked_reason"] = "permission_denied"
signal_errors.append(
{
"pgid": pgid,
"error": exc.__class__.__name__,
}
)
continue
signaled.append(pgid)
remaining_after_wait = None
if args.apply and args.wait_seconds > 0:
@@ -139,14 +163,20 @@ def main() -> None:
"host": args.host,
"mode": "apply_sigterm" if args.apply else "dry_run",
"runtime_gate": 1 if args.apply else 0,
"controlled_apply_id": args.controlled_apply_id if args.apply else None,
"owner_approval_id": args.owner_approval_id if args.apply else None,
"maintenance_window_id": args.maintenance_window_id if args.apply else None,
"evidence_ref": args.evidence_ref if args.apply else None,
"post_apply_verifier": args.post_apply_verifier if args.apply else None,
"min_age_seconds": args.min_age_seconds,
"min_cpu_percent": args.min_cpu_percent,
"candidate_count": len(candidates),
"signaled_process_group_count": len(signaled),
"signaled_process_groups": signaled,
"missing_process_group_count": len(missing_process_groups),
"missing_process_groups": missing_process_groups,
"signal_error_count": len(signal_errors),
"signal_errors": signal_errors,
"remaining_after_wait": remaining_after_wait,
"candidates": candidates,
"forbidden_without_gates": [
@@ -159,6 +189,8 @@ def main() -> None:
],
}
print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
if signal_errors:
raise SystemExit(75)
if __name__ == "__main__":

View File

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""Classify sustained host load and emit a controlled automation packet.
The controller is intentionally read-only by default. It turns
HostLoadAverageSustainedHigh from a generic "SSH and look around" alert into a
deterministic AI Agent control packet:
* orphan browser/smoke load -> gated SIGTERM helper dry-run, then controlled
apply with evidence and post-apply verifier
* active Gitea Actions/BuildKit load -> runner pressure stays fail-closed;
drain/cancel decisions must use runner/CD verifiers, not process kills
* unknown or critical pressure -> source-specific playbook or break-glass
It never reads secrets, raw runner registrations, sessions, or environment
files, and it never mutates host state.
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
DEFAULT_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom")
SCHEMA_VERSION = "host_sustained_load_controlled_automation_v1"
LABEL_RE = re.compile(r"(?P<key>[A-Za-z_][A-Za-z0-9_]*)=\"(?P<value>(?:[^\"\\\\]|\\\\.)*)\"")
METRIC_RE = re.compile(
r"^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+"
r"(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Build a controlled AI Agent packet for sustained host load."
)
parser.add_argument("--host", default="110")
parser.add_argument("--metrics-file", type=Path, default=DEFAULT_METRICS_FILE)
parser.add_argument("--load5-per-core-threshold", type=float, default=1.5)
parser.add_argument("--ci-stale-age-seconds", type=int, default=1800)
parser.add_argument("--json", action="store_true", help="Print JSON only.")
return parser.parse_args()
def _unescape_label(value: str) -> str:
return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n")
def parse_prometheus_text(text: str) -> list[dict[str, Any]]:
samples: list[dict[str, Any]] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
match = METRIC_RE.match(line)
if not match:
continue
labels = {
item.group("key"): _unescape_label(item.group("value"))
for item in LABEL_RE.finditer(match.group("labels") or "")
}
samples.append(
{
"name": match.group("name"),
"labels": labels,
"value": float(match.group("value")),
}
)
return samples
def _sample_value(
samples: list[dict[str, Any]],
name: str,
*,
host: str,
labels: dict[str, str] | None = None,
default: float = 0.0,
) -> float:
expected = {"host": host, **(labels or {})}
for sample in samples:
if sample["name"] != name:
continue
sample_labels = sample["labels"]
if all(sample_labels.get(key) == value for key, value in expected.items()):
return float(sample["value"])
return default
def _rule_values(samples: list[dict[str, Any]], name: str, *, host: str) -> list[dict[str, Any]]:
values = []
for sample in samples:
if sample["name"] != name:
continue
labels = sample["labels"]
if labels.get("host") != host:
continue
rule = labels.get("rule")
if not rule:
continue
values.append({"rule": rule, "value": float(sample["value"])})
return values
def _top_orphan_rule(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None:
counts = _rule_values(
samples,
"awoooi_host_runaway_browser_orphan_group_count",
host=host,
)
cpu_by_rule = {
item["rule"]: item["value"]
for item in _rule_values(
samples,
"awoooi_host_runaway_browser_orphan_cpu_percent",
host=host,
)
}
candidates = [
{
"rule": item["rule"],
"group_count": int(item["value"]),
"cpu_percent": round(cpu_by_rule.get(item["rule"], 0.0), 3),
}
for item in counts
if item["value"] > 0
]
if not candidates:
return None
return sorted(candidates, key=lambda item: (-item["cpu_percent"], item["rule"]))[0]
def build_packet(
*,
host: str,
samples: list[dict[str, Any]],
load5_per_core_threshold: float,
ci_stale_age_seconds: int,
) -> dict[str, Any]:
monitor_up = int(
_sample_value(
samples,
"awoooi_host_runaway_process_monitor_up",
host=host,
labels={"mode": "read_only"},
default=0,
)
)
load5_per_core = _sample_value(samples, "awoooi_host_load5_per_core", host=host)
swap_used_ratio = _sample_value(samples, "awoooi_host_swap_used_ratio", host=host)
remediation_authorized = int(
_sample_value(
samples,
"awoooi_host_runaway_process_remediation_authorized",
host=host,
)
)
active_ci_containers = int(
_sample_value(
samples,
"awoooi_host_gitea_actions_active_container_count",
host=host,
default=0,
)
)
active_ci_groups = int(
_sample_value(
samples,
"awoooi_host_gitea_actions_active_process_group_count",
host=host,
default=0,
)
)
active_ci_cpu = _sample_value(
samples,
"awoooi_host_gitea_actions_active_process_cpu_percent",
host=host,
)
active_ci_oldest_age = int(
_sample_value(
samples,
"awoooi_host_gitea_actions_active_process_oldest_age_seconds",
host=host,
)
)
top_orphan = _top_orphan_rule(samples, host=host)
classification = "observing_load_within_threshold"
severity = "info"
controlled_apply_allowed = False
next_action = "keep_read_only_monitoring"
dry_run_command = ""
controlled_apply_command = ""
verifier_command = (
"scripts/ops/host-sustained-load-controller.py "
f"--host {host} --metrics-file {DEFAULT_METRICS_FILE}"
)
if monitor_up != 1:
classification = "blocked_monitor_unavailable"
severity = "warning"
next_action = "restore_host_runaway_process_exporter_textfile_before_apply"
elif remediation_authorized > 0:
classification = "blocked_monitor_authority_violation"
severity = "critical"
next_action = "rollback_monitor_to_read_only_exporter"
elif load5_per_core > load5_per_core_threshold and top_orphan:
classification = "controlled_orphan_browser_remediation_ready"
severity = "critical"
controlled_apply_allowed = True
rule = top_orphan["rule"]
dry_run_command = f"scripts/ops/host-runaway-process-remediation.py --rule {rule}"
controlled_apply_command = (
"scripts/ops/host-runaway-process-remediation.py "
f"--rule {rule} --apply --confirm-apply "
"--controlled-apply-id ${CONTROLLED_APPLY_ID} "
"--evidence-ref ${EVIDENCE_REF} "
"--post-apply-verifier "
"'scripts/ops/host-sustained-load-controller.py --host "
f"{host} --metrics-file {DEFAULT_METRICS_FILE}' "
"--wait-seconds 10"
)
next_action = "run_orphan_browser_remediation_dry_run_then_controlled_sigterm"
elif (
load5_per_core > load5_per_core_threshold
and (active_ci_containers > 0 or active_ci_groups > 0)
):
classification = "controlled_ci_runner_saturation_guarded"
severity = "critical" if active_ci_oldest_age >= ci_stale_age_seconds else "warning"
controlled_apply_allowed = active_ci_oldest_age >= ci_stale_age_seconds
dry_run_command = (
"ops/runner/read-public-gitea-actions-queue.py --json "
"&& ops/runner/check-awoooi-non110-runner-readiness.sh"
)
controlled_apply_command = (
"keep_110_runner_pressure_gate_fail_closed; "
"only cancel/drain stale Gitea Actions through runner verifier packet"
)
next_action = (
"prepare_runner_drain_or_cancel_packet_without_process_kill"
if controlled_apply_allowed
else "keep_pressure_gate_fail_closed_until_ci_load_clears"
)
elif load5_per_core > load5_per_core_threshold and swap_used_ratio >= 0.85:
classification = "blocked_memory_or_swap_pressure_requires_service_playbook"
severity = "critical"
next_action = "route_to_service_specific_memory_pressure_playbook"
elif load5_per_core > load5_per_core_threshold:
classification = "blocked_unknown_sustained_load_requires_source_specific_playbook"
severity = "critical"
dry_run_command = (
"scripts/ops/host-sustained-load-evidence.py "
f"--host {host} --metrics-file {DEFAULT_METRICS_FILE} "
"--docker-stats-file /home/wooo/node_exporter_textfiles/docker_stats.prom "
"--json"
)
next_action = "collect_sanitized_top_process_and_container_stats_then_select_playbook"
return {
"schema_version": SCHEMA_VERSION,
"host": host,
"mode": "read_only_control_packet",
"classification": classification,
"severity": severity,
"controlled_apply_allowed": controlled_apply_allowed,
"next_action": next_action,
"readback": {
"monitor_up": monitor_up,
"load5_per_core": round(load5_per_core, 6),
"load5_per_core_threshold": load5_per_core_threshold,
"swap_used_ratio": round(swap_used_ratio, 6),
"remediation_authorized": remediation_authorized,
"active_ci_container_count": active_ci_containers,
"active_ci_process_group_count": active_ci_groups,
"active_ci_process_cpu_percent": round(active_ci_cpu, 3),
"active_ci_oldest_age_seconds": active_ci_oldest_age,
"top_orphan_rule": top_orphan,
},
"commands": {
"dry_run": dry_run_command,
"controlled_apply": controlled_apply_command,
"post_apply_verifier": verifier_command,
"rollback": "send SIGTERM only; no persistent host mutation. Re-run workload if needed.",
},
"operation_boundaries": {
"secret_value_read": False,
"raw_session_read": False,
"raw_runner_registration_read": False,
"host_write_performed": False,
"process_signal_performed": False,
"docker_restart_allowed": False,
"systemd_restart_allowed": False,
"firewall_change_allowed": False,
"critical_break_glass_required": True,
},
"forbidden_actions": [
"SIGKILL",
"docker_restart",
"systemctl_restart",
"nginx_reload",
"firewall_change",
"kubectl_action",
"secret_read",
"legacy_or_generic_runner_restore",
],
}
def main() -> int:
args = parse_args()
try:
text = args.metrics_file.read_text(encoding="utf-8")
samples = parse_prometheus_text(text)
except FileNotFoundError:
samples = []
packet = build_packet(
host=args.host,
samples=samples,
load5_per_core_threshold=args.load5_per_core_threshold,
ci_stale_age_seconds=args.ci_stale_age_seconds,
)
if args.json:
print(json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True))
else:
print(f"status={packet['classification']}")
print(f"controlled_apply_allowed={str(packet['controlled_apply_allowed']).lower()}")
print(f"next_action={packet['next_action']}")
if packet["commands"]["dry_run"]:
print(f"dry_run_command={packet['commands']['dry_run']}")
if packet["commands"]["controlled_apply"]:
print(f"controlled_apply_command={packet['commands']['controlled_apply']}")
print(f"post_apply_verifier={packet['commands']['post_apply_verifier']}")
return 0 if not packet["classification"].startswith("blocked_") else 75
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,287 @@
#!/usr/bin/env python3
"""Build sanitized evidence for unknown sustained host load.
This collector is read-only. It intentionally emits process families and
container names instead of raw command lines so CPU-pressure alerts can proceed
to a source-specific PlayBook without leaking workspace paths, URLs, JSON
payloads, or secrets.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Any
DEFAULT_HOST_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom")
DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom")
SCHEMA_VERSION = "host_sustained_load_sanitized_evidence_v1"
LABEL_RE = re.compile(r"(?P<key>[A-Za-z_][A-Za-z0-9_]*)=\"(?P<value>(?:[^\"\\\\]|\\\\.)*)\"")
METRIC_RE = re.compile(
r"^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+"
r"(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Collect sanitized sustained-load evidence.")
parser.add_argument("--host", default=os.environ.get("AIOPS_HOST_LABEL", "110"))
parser.add_argument("--metrics-file", type=Path, default=DEFAULT_HOST_METRICS_FILE)
parser.add_argument("--docker-stats-file", type=Path, default=DEFAULT_DOCKER_STATS_FILE)
parser.add_argument("--ps-file", type=Path)
parser.add_argument("--top-n", type=int, default=8)
parser.add_argument("--json", action="store_true")
return parser.parse_args()
def _unescape_label(value: str) -> str:
return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n")
def parse_prometheus_text(text: str) -> list[dict[str, Any]]:
samples: list[dict[str, Any]] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
match = METRIC_RE.match(line)
if not match:
continue
labels = {
item.group("key"): _unescape_label(item.group("value"))
for item in LABEL_RE.finditer(match.group("labels") or "")
}
samples.append(
{
"name": match.group("name"),
"labels": labels,
"value": float(match.group("value")),
}
)
return samples
def read_text(path: Path | None) -> str:
if path is None:
return ""
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
return ""
def collect_ps_text(ps_file: Path | None) -> str:
if ps_file is not None:
return read_text(ps_file)
result = subprocess.run(
["ps", "-eo", "pid=,ppid=,pgid=,etimes=,pcpu=,pmem=,comm=,args="],
check=True,
capture_output=True,
text=True,
timeout=10,
)
return result.stdout
def parse_ps_text(text: str) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line:
continue
parts = line.split(None, 7)
if len(parts) < 7:
continue
pid, ppid, pgid, etimes, pcpu, pmem, comm = parts[:7]
args = parts[7] if len(parts) > 7 else comm
try:
rows.append(
{
"pid": int(pid),
"ppid": int(ppid),
"pgid": int(pgid),
"etimes": int(float(etimes)),
"cpu_percent": float(pcpu),
"mem_percent": float(pmem),
"comm": Path(comm).name[:48],
"family": classify_process_family(comm, args),
}
)
except ValueError:
continue
return rows
def classify_process_family(comm: str, args: str) -> str:
text = f"{comm} {args}".lower()
if "act_runner" in text or "gitea-actions-task" in text or "/.cache/act/" in text:
return "gitea_actions_runner"
if "docker build" in text or "buildx" in text or "buildkit" in text:
return "docker_build"
if "next build" in text or "turbo build" in text or "pnpm" in text and " build" in text:
return "web_build"
if "chrome" in text or "chromium" in text or "playwright" in text:
return "headless_browser"
if "gitea" in text:
return "gitea_service"
if "postgres" in text or "postmaster" in text:
return "postgres"
if "clickhouse" in text:
return "clickhouse"
if "kafka" in text:
return "kafka"
if "sentry" in text:
return "sentry"
if "systemctl" in text or "systemd" in text or "dbus" in text:
return "systemd_control_plane"
if "sshd" in text:
return "ssh_control_plane"
if "python" in text:
return "python_job"
if "node" in text:
return "node_service"
return "unknown"
def summarize_processes(rows: list[dict[str, Any]], *, top_n: int) -> dict[str, Any]:
top_rows = sorted(rows, key=lambda item: (-item["cpu_percent"], item["comm"], item["pid"]))[:top_n]
families: dict[str, dict[str, Any]] = {}
for row in rows:
family = row["family"]
current = families.setdefault(
family,
{
"family": family,
"process_count": 0,
"cpu_percent": 0.0,
"max_age_seconds": 0,
"sample_comm": "",
},
)
current["process_count"] += 1
current["cpu_percent"] += row["cpu_percent"]
current["max_age_seconds"] = max(current["max_age_seconds"], row["etimes"])
if not current["sample_comm"] or row["cpu_percent"] > current.get("_sample_cpu", -1):
current["sample_comm"] = row["comm"]
current["_sample_cpu"] = row["cpu_percent"]
family_rows = []
for item in families.values():
item.pop("_sample_cpu", None)
item["cpu_percent"] = round(float(item["cpu_percent"]), 3)
family_rows.append(item)
return {
"top_processes": [
{
"pid": row["pid"],
"ppid": row["ppid"],
"pgid": row["pgid"],
"cpu_percent": round(row["cpu_percent"], 3),
"mem_percent": round(row["mem_percent"], 3),
"age_seconds": row["etimes"],
"comm": row["comm"],
"family": row["family"],
}
for row in top_rows
],
"families": sorted(family_rows, key=lambda item: (-item["cpu_percent"], item["family"]))[:top_n],
}
def top_docker_containers(samples: list[dict[str, Any]], *, host: str, top_n: int) -> list[dict[str, Any]]:
rows = []
for sample in samples:
if sample["name"] != "docker_container_cpu_cores":
continue
labels = sample["labels"]
if labels.get("host", host) != host:
continue
rows.append(
{
"container_name": labels.get("container_name") or labels.get("name") or "unknown",
"cpu_cores": round(float(sample["value"]), 6),
}
)
return sorted(rows, key=lambda item: (-item["cpu_cores"], item["container_name"]))[:top_n]
def recommend_playbook(process_families: list[dict[str, Any]], containers: list[dict[str, Any]]) -> str:
top_container = containers[0] if containers else {}
top_container_name = str(top_container.get("container_name") or "").lower()
top_container_cpu = float(top_container.get("cpu_cores") or 0.0)
top_family = process_families[0] if process_families else {}
family = str(top_family.get("family") or "")
if "gitea" in top_container_name and top_container_cpu >= 2.0:
return "gitea_queue_or_hook_backlog_playbook"
if "postgres" in top_container_name or "postgres" in family:
return "postgres_hot_query_or_backup_export_playbook"
if family in {"docker_build", "web_build", "gitea_actions_runner"}:
return "build_or_runner_pressure_playbook"
if family in {"systemd_control_plane", "ssh_control_plane"}:
return "control_plane_saturation_playbook"
if family == "headless_browser":
return "orphan_browser_classification_refresh_playbook"
return "source_specific_playbook_required"
def build_payload(args: argparse.Namespace) -> dict[str, Any]:
host_samples = parse_prometheus_text(read_text(args.metrics_file))
docker_samples = parse_prometheus_text(read_text(args.docker_stats_file))
process_summary = summarize_processes(parse_ps_text(collect_ps_text(args.ps_file)), top_n=args.top_n)
containers = top_docker_containers(docker_samples, host=args.host, top_n=args.top_n)
recommendation = recommend_playbook(process_summary["families"], containers)
return {
"schema_version": SCHEMA_VERSION,
"host": args.host,
"mode": "read_only_sanitized_evidence",
"recommendation": recommendation,
"controlled_apply_allowed": False,
"next_action": "select_or_generate_source_specific_playbook_then_run_check_mode",
"readback": {
"host_metric_sample_count": len(host_samples),
"docker_metric_sample_count": len(docker_samples),
"top_container_count": len(containers),
"top_process_family_count": len(process_summary["families"]),
},
"top_containers": containers,
"top_process_families": process_summary["families"],
"top_processes_sanitized": process_summary["top_processes"],
"redaction": {
"raw_command_lines_emitted": False,
"workspace_paths_emitted": False,
"urls_emitted": False,
"secret_values_read": False,
},
"operation_boundaries": {
"host_write_performed": False,
"process_signal_performed": False,
"docker_restart_performed": False,
"systemd_restart_performed": False,
"raw_session_read": False,
"raw_runner_registration_read": False,
},
}
def main() -> int:
args = parse_args()
payload = build_payload(args)
if args.json:
print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
else:
print(f"recommendation={payload['recommendation']}")
print(f"controlled_apply_allowed={str(payload['controlled_apply_allowed']).lower()}")
print(f"next_action={payload['next_action']}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import importlib.util
import json
import subprocess
import sys
from pathlib import Path
@@ -9,6 +10,8 @@ from pathlib import Path
SCRIPT_ROOT = Path(__file__).resolve().parents[1]
EXPORTER_PATH = SCRIPT_ROOT / "host-runaway-process-exporter.py"
REMEDIATION_PATH = SCRIPT_ROOT / "host-runaway-process-remediation.py"
CONTROLLER_PATH = SCRIPT_ROOT / "host-sustained-load-controller.py"
EVIDENCE_PATH = SCRIPT_ROOT / "host-sustained-load-evidence.py"
def load_exporter():
@@ -167,7 +170,7 @@ def test_ignores_the_host_pressure_gate_process_group() -> None:
def test_remediation_defaults_to_dry_run(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
"999999 1 999999 999999 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
encoding="utf-8",
)
@@ -193,7 +196,7 @@ def test_remediation_defaults_to_dry_run(tmp_path: Path) -> None:
def test_remediation_refuses_apply_without_gates(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
"999999 1 999999 999999 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
encoding="utf-8",
)
@@ -213,3 +216,329 @@ def test_remediation_refuses_apply_without_gates(tmp_path: Path) -> None:
assert result.returncode != 0
assert "Refusing apply" in result.stderr
assert "--controlled-apply-id" in result.stderr
assert "--confirm-apply" in result.stderr
assert "--post-apply-verifier" in result.stderr
def test_remediation_accepts_controlled_apply_gate_without_owner_gate(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"100 1 1 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(REMEDIATION_PATH),
"--ps-file",
str(ps_file),
"--apply",
"--confirm-apply",
"--rule",
"stockplatform_headless_smoke",
"--controlled-apply-id",
"CAP-20260701-HOSTLOAD",
"--evidence-ref",
"HostLoadAverageSustainedHigh:110",
"--post-apply-verifier",
"scripts/ops/host-sustained-load-controller.py --host 110 --json",
],
check=True,
capture_output=True,
text=True,
)
assert '"mode": "apply_sigterm"' in result.stdout
assert '"runtime_gate": 1' in result.stdout
assert '"controlled_apply_id": "CAP-20260701-HOSTLOAD"' in result.stdout
assert '"owner_approval_id": ""' in result.stdout
assert '"blocked_reason": "unsafe_pgid"' in result.stdout
assert '"missing_process_group_count": 0' in result.stdout
assert '"signal_error_count": 0' in result.stdout
assert '"signaled_process_group_count": 0' in result.stdout
def test_sustained_load_controller_routes_orphan_browser_to_controlled_remediation(tmp_path: Path) -> None:
metrics_file = tmp_path / "host.prom"
metrics_file.write_text(
"\n".join(
[
'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1',
'awoooi_host_load5_per_core{host="110"} 2.2',
'awoooi_host_swap_used_ratio{host="110"} 0.1',
'awoooi_host_runaway_process_remediation_authorized{host="110"} 0',
'awoooi_host_gitea_actions_active_container_count{host="110"} 0',
'awoooi_host_gitea_actions_active_process_group_count{host="110"} 0',
'awoooi_host_runaway_browser_orphan_group_count{host="110",rule="stockplatform_headless_smoke",min_age_seconds="1800",min_cpu_percent="50"} 1',
'awoooi_host_runaway_browser_orphan_cpu_percent{host="110",rule="stockplatform_headless_smoke",min_age_seconds="1800",min_cpu_percent="50"} 155.5',
]
),
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(CONTROLLER_PATH),
"--host",
"110",
"--metrics-file",
str(metrics_file),
"--json",
],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["classification"] == "controlled_orphan_browser_remediation_ready"
assert payload["controlled_apply_allowed"] is True
assert "host-runaway-process-remediation.py --rule stockplatform_headless_smoke" in payload["commands"]["dry_run"]
assert "--controlled-apply-id" in payload["commands"]["controlled_apply"]
assert payload["operation_boundaries"]["process_signal_performed"] is False
def test_sustained_load_controller_keeps_ci_saturation_on_runner_path(tmp_path: Path) -> None:
metrics_file = tmp_path / "host.prom"
metrics_file.write_text(
"\n".join(
[
'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1',
'awoooi_host_load5_per_core{host="110"} 2.0',
'awoooi_host_swap_used_ratio{host="110"} 0.1',
'awoooi_host_runaway_process_remediation_authorized{host="110"} 0',
'awoooi_host_gitea_actions_active_container_count{host="110"} 2',
'awoooi_host_gitea_actions_active_process_group_count{host="110"} 1',
'awoooi_host_gitea_actions_active_process_cpu_percent{host="110"} 180.0',
'awoooi_host_gitea_actions_active_process_oldest_age_seconds{host="110"} 1900',
]
),
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(CONTROLLER_PATH),
"--host",
"110",
"--metrics-file",
str(metrics_file),
"--json",
],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["classification"] == "controlled_ci_runner_saturation_guarded"
assert payload["controlled_apply_allowed"] is True
assert "fail_closed" in payload["commands"]["controlled_apply"]
assert "process_kill" not in payload["commands"]["controlled_apply"]
def test_sustained_load_controller_blocks_monitor_authority_violation(tmp_path: Path) -> None:
metrics_file = tmp_path / "host.prom"
metrics_file.write_text(
"\n".join(
[
'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1',
'awoooi_host_load5_per_core{host="110"} 2.0',
'awoooi_host_runaway_process_remediation_authorized{host="110"} 1',
]
),
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(CONTROLLER_PATH),
"--host",
"110",
"--metrics-file",
str(metrics_file),
"--json",
],
capture_output=True,
text=True,
)
assert result.returncode == 75
payload = json.loads(result.stdout)
assert payload["classification"] == "blocked_monitor_authority_violation"
assert payload["controlled_apply_allowed"] is False
def test_sustained_load_controller_routes_unknown_load_to_sanitized_evidence(tmp_path: Path) -> None:
metrics_file = tmp_path / "host.prom"
metrics_file.write_text(
"\n".join(
[
'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1',
'awoooi_host_load5_per_core{host="110"} 2.0',
'awoooi_host_swap_used_ratio{host="110"} 0.1',
'awoooi_host_runaway_process_remediation_authorized{host="110"} 0',
'awoooi_host_gitea_actions_active_container_count{host="110"} 0',
'awoooi_host_gitea_actions_active_process_group_count{host="110"} 0',
'awoooi_host_runaway_browser_orphan_group_count{host="110",rule="stockplatform_headless_smoke",min_age_seconds="1800",min_cpu_percent="50"} 0',
]
),
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(CONTROLLER_PATH),
"--host",
"110",
"--metrics-file",
str(metrics_file),
"--json",
],
capture_output=True,
text=True,
)
assert result.returncode == 75
payload = json.loads(result.stdout)
assert payload["classification"] == "blocked_unknown_sustained_load_requires_source_specific_playbook"
assert payload["controlled_apply_allowed"] is False
assert "host-sustained-load-evidence.py" in payload["commands"]["dry_run"]
assert payload["operation_boundaries"]["process_signal_performed"] is False
def test_sustained_load_evidence_emits_sanitized_gitea_recommendation(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"\n".join(
[
"100 1 100 7200 280.0 1.0 gitea /usr/local/bin/gitea web --config /home/wooo/gitea/app.ini",
"200 1 200 180 15.0 0.5 systemd systemctl show gitea-act-runner-host.service",
]
),
encoding="utf-8",
)
docker_file = tmp_path / "docker.prom"
docker_file.write_text(
'docker_container_cpu_cores{host="110",container_name="gitea"} 3.4\n',
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(EVIDENCE_PATH),
"--host",
"110",
"--ps-file",
str(ps_file),
"--docker-stats-file",
str(docker_file),
"--json",
],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["schema_version"] == "host_sustained_load_sanitized_evidence_v1"
assert payload["recommendation"] == "gitea_queue_or_hook_backlog_playbook"
assert payload["redaction"]["raw_command_lines_emitted"] is False
assert payload["operation_boundaries"]["host_write_performed"] is False
assert "/home/wooo" not in result.stdout
def test_sustained_load_controller_routes_unknown_load_to_sanitized_evidence(tmp_path: Path) -> None:
metrics_file = tmp_path / "host.prom"
metrics_file.write_text(
"\n".join(
[
'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1',
'awoooi_host_load5_per_core{host="110"} 2.4',
'awoooi_host_swap_used_ratio{host="110"} 0.1',
'awoooi_host_runaway_process_remediation_authorized{host="110"} 0',
]
),
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(CONTROLLER_PATH),
"--host",
"110",
"--metrics-file",
str(metrics_file),
"--json",
],
capture_output=True,
text=True,
)
assert result.returncode == 75
payload = json.loads(result.stdout)
assert (
payload["classification"]
== "blocked_unknown_sustained_load_requires_source_specific_playbook"
)
assert payload["controlled_apply_allowed"] is False
assert "host-sustained-load-evidence.py" in payload["commands"]["dry_run"]
assert payload["operation_boundaries"]["host_write_performed"] is False
def test_sustained_load_evidence_sanitizes_process_details(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"\n".join(
[
"101 1 101 7200 65.0 2.5 chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa --url=https://example.invalid/token",
"102 1 102 3600 20.0 1.0 node node /srv/private/app/server.js --api-key=SECRET",
]
),
encoding="utf-8",
)
docker_stats_file = tmp_path / "docker.prom"
docker_stats_file.write_text(
'docker_container_cpu_cores{host="110",container_name="gitea"} 3.2\n',
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(EVIDENCE_PATH),
"--host",
"110",
"--ps-file",
str(ps_file),
"--docker-stats-file",
str(docker_stats_file),
"--json",
],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["schema_version"] == "host_sustained_load_sanitized_evidence_v1"
assert payload["recommendation"] == "gitea_queue_or_hook_backlog_playbook"
assert payload["redaction"]["raw_command_lines_emitted"] is False
assert payload["redaction"]["workspace_paths_emitted"] is False
assert payload["redaction"]["urls_emitted"] is False
assert payload["operation_boundaries"]["host_write_performed"] is False
assert "https://example.invalid/token" not in result.stdout
assert "/tmp/stockplatform-review-bulk-ux-aa" not in result.stdout
assert "SECRET" not in result.stdout
assert {item["family"] for item in payload["top_process_families"]} >= {
"headless_browser",
"node_service",
}