Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 33s
AWOOOI Harbor 110 Local Repair / workflow-shape (push) Successful in 0s
CD Pipeline / build-and-deploy (push) Has been skipped
AWOOOI Harbor 110 Local Repair / harbor-110-local-repair (push) Failing after 1m41s
CD Pipeline / post-deploy-checks (push) Has been skipped
451 lines
16 KiB
Python
Executable File
451 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Classify sustained host load and emit a controlled automation packet.
|
|
|
|
The controller is intentionally read-only by default. It turns
|
|
HostLoadAverageSustainedHigh from a generic "SSH and look around" alert into a
|
|
deterministic AI Agent control packet:
|
|
|
|
* orphan browser/smoke load -> gated SIGTERM helper dry-run, then controlled
|
|
apply with evidence and post-apply verifier
|
|
* active Gitea Actions/BuildKit load -> runner pressure stays fail-closed;
|
|
drain/cancel decisions must use runner/CD verifiers, not process kills
|
|
* unknown or critical pressure -> source-specific playbook or break-glass
|
|
|
|
It never reads secrets, raw runner registrations, sessions, or environment
|
|
files, and it never mutates host state.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Default node_exporter textfile inputs and the freshness budget for docker stats.
DEFAULT_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom")
DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom")
DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS = 300
SCHEMA_VERSION = "host_sustained_load_controlled_automation_v1"

# One key="value" pair inside a label set. The value is a run of characters
# that are neither a quote nor a backslash, or a backslash followed by any
# single character (so \" \\ and \n stay inside the value). The previous
# pattern doubled the backslashes inside a raw string and therefore demanded
# TWO literal backslashes before the escaped character, which dropped labels
# containing \" or \n and over-matched values ending in an escaped backslash.
LABEL_RE = re.compile(r'(?P<key>[A-Za-z_][A-Za-z0-9_]*)="(?P<value>(?:[^"\\]|\\.)*)"')

# One sample line: metric name, optional {label set}, whitespace, then a
# decimal/scientific float. Lines with timestamps, NaN or +/-Inf do not match.
METRIC_RE = re.compile(
    r"^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+"
    r"(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$"
)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the process command line into a namespace.

    Options are registered from a table, in declaration order: target host,
    the two textfile paths, the docker-stats staleness budget, the load and
    CI-staleness thresholds, and the JSON-only output switch.
    """
    cli = argparse.ArgumentParser(
        description="Build a controlled AI Agent packet for sustained host load."
    )
    option_table = (
        ("--host", {"default": "110"}),
        ("--metrics-file", {"type": Path, "default": DEFAULT_METRICS_FILE}),
        ("--docker-stats-file", {"type": Path, "default": DEFAULT_DOCKER_STATS_FILE}),
        (
            "--docker-stats-max-age-seconds",
            {"type": int, "default": DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS},
        ),
        ("--load5-per-core-threshold", {"type": float, "default": 1.5}),
        ("--ci-stale-age-seconds", {"type": int, "default": 1800}),
        ("--json", {"action": "store_true", "help": "Print JSON only."}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def _unescape_label(value: str) -> str:
|
|
return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n")
|
|
|
|
|
|
def parse_prometheus_text(text: str) -> list[dict[str, Any]]:
    """Turn Prometheus text exposition into a list of sample dicts.

    Each returned dict has ``name`` (str), ``labels`` (dict of unescaped label
    values) and ``value`` (float). Blank lines, lines starting with ``#`` and
    lines that do not match ``METRIC_RE`` are skipped silently.
    """
    parsed: list[dict[str, Any]] = []
    for entry in (raw.strip() for raw in text.splitlines()):
        if entry == "" or entry.startswith("#"):
            continue
        hit = METRIC_RE.match(entry)
        if hit is None:
            continue
        label_map: dict[str, str] = {}
        # A sample without a {...} section has no "labels" group; scan "".
        for pair in LABEL_RE.finditer(hit.group("labels") or ""):
            label_map[pair.group("key")] = _unescape_label(pair.group("value"))
        parsed.append(
            {
                "name": hit.group("name"),
                "labels": label_map,
                "value": float(hit.group("value")),
            }
        )
    return parsed
|
|
|
|
|
|
def _sample_value(
|
|
samples: list[dict[str, Any]],
|
|
name: str,
|
|
*,
|
|
host: str,
|
|
labels: dict[str, str] | None = None,
|
|
default: float = 0.0,
|
|
) -> float:
|
|
expected = {"host": host, **(labels or {})}
|
|
for sample in samples:
|
|
if sample["name"] != name:
|
|
continue
|
|
sample_labels = sample["labels"]
|
|
if all(sample_labels.get(key) == value for key, value in expected.items()):
|
|
return float(sample["value"])
|
|
return default
|
|
|
|
|
|
def _sample_value_any(samples: list[dict[str, Any]], name: str) -> float | None:
|
|
for sample in samples:
|
|
if sample["name"] == name:
|
|
return float(sample["value"])
|
|
return None
|
|
|
|
|
|
def _textfile_mtime_seconds(samples: list[dict[str, Any]], suffix: str) -> float | None:
|
|
for sample in samples:
|
|
if sample["name"] != "node_textfile_mtime_seconds":
|
|
continue
|
|
file_label = str(sample["labels"].get("file") or "")
|
|
if file_label.endswith(suffix):
|
|
return float(sample["value"])
|
|
return None
|
|
|
|
|
|
def docker_stats_freshness(
    *,
    samples: list[dict[str, Any]],
    docker_stats_file: Path,
    max_age_seconds: int,
) -> dict[str, Any]:
    """Report how old the docker stats textfile is and whether it is fresh.

    The modification time is taken from the ``node_textfile_mtime_seconds``
    sample for ``docker_stats.prom`` when present, paired with
    ``node_time_seconds`` (or the wall clock if that sample is absent).
    Otherwise the file is stat'ed and compared against the wall clock.

    Returns a dict with ``fresh``, ``age_seconds``, ``max_age_seconds`` and
    ``source``. A missing file yields ``fresh=False``, ``age_seconds=None``
    and ``source="missing"``.
    """
    observed_mtime = _textfile_mtime_seconds(samples, "docker_stats.prom")
    reference_now = _sample_value_any(samples, "node_time_seconds")
    origin = "node_textfile_mtime_seconds"

    if observed_mtime is None:
        # No exporter-reported mtime: fall back to the filesystem.
        try:
            observed_mtime = docker_stats_file.stat().st_mtime
        except FileNotFoundError:
            return {
                "fresh": False,
                "age_seconds": None,
                "max_age_seconds": max_age_seconds,
                "source": "missing",
            }
        # A filesystem mtime is always compared against the local clock.
        reference_now = time.time()
        origin = "file_stat_mtime"

    if reference_now is None:
        reference_now = time.time()

    # Clamp so a timestamp ahead of the reference clock reads as age zero.
    elapsed = int(reference_now - observed_mtime)
    if elapsed < 0:
        elapsed = 0

    return {
        "fresh": elapsed <= max_age_seconds,
        "age_seconds": elapsed,
        "max_age_seconds": max_age_seconds,
        "source": origin,
    }
|
|
|
|
|
|
def _rule_values(samples: list[dict[str, Any]], name: str, *, host: str) -> list[dict[str, Any]]:
|
|
values = []
|
|
for sample in samples:
|
|
if sample["name"] != name:
|
|
continue
|
|
labels = sample["labels"]
|
|
if labels.get("host") != host:
|
|
continue
|
|
rule = labels.get("rule")
|
|
if not rule:
|
|
continue
|
|
values.append({"rule": rule, "value": float(sample["value"])})
|
|
return values
|
|
|
|
|
|
def _top_orphan_rule(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None:
    """Pick the orphan-browser rule with the highest CPU on *host*.

    Only rules whose group count is above zero are considered. Each candidate
    carries ``rule``, ``group_count`` (int) and ``cpu_percent`` (rounded to
    three decimals, 0.0 when no CPU sample exists for the rule). Ties on CPU
    are broken by rule name. Returns ``None`` when no rule has active groups.
    """
    group_rows = _rule_values(
        samples,
        "awoooi_host_runaway_browser_orphan_group_count",
        host=host,
    )
    cpu_lookup: dict[str, float] = {}
    for row in _rule_values(
        samples,
        "awoooi_host_runaway_browser_orphan_cpu_percent",
        host=host,
    ):
        cpu_lookup[row["rule"]] = row["value"]

    ranked: list[dict[str, Any]] = []
    for row in group_rows:
        if row["value"] > 0:
            ranked.append(
                {
                    "rule": row["rule"],
                    "group_count": int(row["value"]),
                    "cpu_percent": round(cpu_lookup.get(row["rule"], 0.0), 3),
                }
            )
    if not ranked:
        return None
    # Highest CPU first; rule name keeps the ordering deterministic.
    ranked.sort(key=lambda row: (-row["cpu_percent"], row["rule"]))
    return ranked[0]
|
|
|
|
|
|
def _top_container_cpu(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None:
|
|
candidates = []
|
|
for sample in samples:
|
|
if sample["name"] != "docker_container_cpu_cores":
|
|
continue
|
|
labels = sample["labels"]
|
|
if labels.get("host", host) != host:
|
|
continue
|
|
candidates.append(
|
|
{
|
|
"container_name": labels.get("container_name") or labels.get("name") or "unknown",
|
|
"cpu_cores": round(float(sample["value"]), 6),
|
|
}
|
|
)
|
|
if not candidates:
|
|
return None
|
|
return sorted(candidates, key=lambda item: (-item["cpu_cores"], item["container_name"]))[0]
|
|
|
|
|
|
def build_packet(
    *,
    host: str,
    samples: list[dict[str, Any]],
    docker_samples: list[dict[str, Any]],
    docker_stats_status: dict[str, Any],
    load5_per_core_threshold: float,
    ci_stale_age_seconds: int,
) -> dict[str, Any]:
    """Classify host load and assemble the read-only control packet.

    Branches are evaluated in this order and the first match wins:

    1. monitor sample not equal to 1 -> ``blocked_monitor_unavailable``
    2. remediation-authorized sample above 0 ->
       ``blocked_monitor_authority_violation``
    3. load above threshold with an active orphan rule ->
       ``controlled_orphan_browser_remediation_ready`` (apply allowed)
    4. load above threshold with active CI containers or process groups ->
       ``controlled_ci_runner_saturation_guarded`` (apply allowed only once
       the oldest CI process age reaches *ci_stale_age_seconds*)
    5. load above threshold, top container is ``gitea`` at >= 2.0 cores ->
       ``blocked_gitea_queue_or_hook_backlog_requires_playbook``
    6. load above threshold with swap ratio >= 0.85 ->
       ``blocked_memory_or_swap_pressure_requires_service_playbook``
    7. load above threshold otherwise ->
       ``blocked_unknown_sustained_load_requires_source_specific_playbook``
    8. anything else -> ``observing_load_within_threshold``

    Args:
        host: Value the ``host`` label must carry; also embedded in commands.
        samples: Parsed samples from the host metrics textfile.
        docker_samples: Parsed samples from the docker stats textfile.
        docker_stats_status: Freshness dict; container data is trusted only
            when its ``fresh`` entry is exactly ``True``.
        load5_per_core_threshold: Load-per-core value that must be exceeded.
        ci_stale_age_seconds: Age at which active CI work counts as stale.

    Returns:
        The packet dict: classification, severity, next action, a readback of
        every input value used, suggested command strings, and the fixed
        operation-boundary and forbidden-action declarations.

    The function only builds strings and dicts; it executes nothing. Because
    the command strings are meant to be run later, *host* and the orphan rule
    label (which originates from a metrics file) are shell-quoted. Quoting is
    a no-op for plain identifiers, so ordinary output is unchanged.
    """
    import shlex  # local import: only this function needs shell quoting

    monitor_up = int(
        _sample_value(
            samples,
            "awoooi_host_runaway_process_monitor_up",
            host=host,
            labels={"mode": "read_only"},
            default=0,
        )
    )
    load5_per_core = _sample_value(samples, "awoooi_host_load5_per_core", host=host)
    swap_used_ratio = _sample_value(samples, "awoooi_host_swap_used_ratio", host=host)
    remediation_authorized = int(
        _sample_value(
            samples,
            "awoooi_host_runaway_process_remediation_authorized",
            host=host,
        )
    )
    active_ci_containers = int(
        _sample_value(
            samples,
            "awoooi_host_gitea_actions_active_container_count",
            host=host,
            default=0,
        )
    )
    active_ci_groups = int(
        _sample_value(
            samples,
            "awoooi_host_gitea_actions_active_process_group_count",
            host=host,
            default=0,
        )
    )
    active_ci_cpu = _sample_value(
        samples,
        "awoooi_host_gitea_actions_active_process_cpu_percent",
        host=host,
    )
    active_ci_oldest_age = int(
        _sample_value(
            samples,
            "awoooi_host_gitea_actions_active_process_oldest_age_seconds",
            host=host,
        )
    )
    top_orphan = _top_orphan_rule(samples, host=host)
    raw_top_container = _top_container_cpu(docker_samples, host=host)
    # Stale docker stats must not drive classification; the raw value is still
    # reported separately in the readback as "untrusted".
    top_container = raw_top_container if docker_stats_status.get("fresh") is True else None
    top_container_name = str((top_container or {}).get("container_name") or "").lower()
    top_container_cpu = float((top_container or {}).get("cpu_cores") or 0.0)

    # Shell-safe rendering of the host for every emitted command string.
    host_arg = shlex.quote(host)
    load_exceeded = load5_per_core > load5_per_core_threshold

    classification = "observing_load_within_threshold"
    severity = "info"
    controlled_apply_allowed = False
    next_action = "keep_read_only_monitoring"
    dry_run_command = ""
    controlled_apply_command = ""
    verifier_command = (
        "scripts/ops/host-sustained-load-controller.py "
        f"--host {host_arg} --metrics-file {DEFAULT_METRICS_FILE}"
    )

    if monitor_up != 1:
        classification = "blocked_monitor_unavailable"
        severity = "warning"
        next_action = "restore_host_runaway_process_exporter_textfile_before_apply"
    elif remediation_authorized > 0:
        classification = "blocked_monitor_authority_violation"
        severity = "critical"
        next_action = "rollback_monitor_to_read_only_exporter"
    elif load_exceeded and top_orphan:
        classification = "controlled_orphan_browser_remediation_ready"
        severity = "critical"
        controlled_apply_allowed = True
        # The rule name is a label value read from the metrics file.
        rule_arg = shlex.quote(str(top_orphan["rule"]))
        dry_run_command = f"scripts/ops/host-runaway-process-remediation.py --rule {rule_arg}"
        controlled_apply_command = (
            "scripts/ops/host-runaway-process-remediation.py "
            f"--rule {rule_arg} --apply --confirm-apply "
            "--controlled-apply-id ${CONTROLLED_APPLY_ID} "
            "--evidence-ref ${EVIDENCE_REF} "
            "--post-apply-verifier "
            # Quote the whole nested command as one argument instead of
            # hand-writing single quotes around interpolated text.
            f"{shlex.quote(verifier_command)} "
            "--wait-seconds 10"
        )
        next_action = "run_orphan_browser_remediation_dry_run_then_controlled_sigterm"
    elif load_exceeded and (active_ci_containers > 0 or active_ci_groups > 0):
        classification = "controlled_ci_runner_saturation_guarded"
        ci_is_stale = active_ci_oldest_age >= ci_stale_age_seconds
        severity = "critical" if ci_is_stale else "warning"
        controlled_apply_allowed = ci_is_stale
        dry_run_command = (
            "ops/runner/read-public-gitea-actions-queue.py --json "
            "&& ops/runner/check-awoooi-non110-runner-readiness.sh"
        )
        controlled_apply_command = (
            "keep_110_runner_pressure_gate_fail_closed; "
            "only cancel/drain stale Gitea Actions through runner verifier packet"
        )
        next_action = (
            "prepare_runner_drain_or_cancel_packet_without_process_kill"
            if controlled_apply_allowed
            else "keep_pressure_gate_fail_closed_until_ci_load_clears"
        )
    elif load_exceeded and top_container_name == "gitea" and top_container_cpu >= 2.0:
        classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook"
        severity = "critical"
        dry_run_command = (
            "scripts/ops/host-sustained-load-evidence.py "
            f"--host {host_arg} --metrics-file {DEFAULT_METRICS_FILE} "
            f"--docker-stats-file {DEFAULT_DOCKER_STATS_FILE} --json"
        )
        next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode"
    elif load_exceeded and swap_used_ratio >= 0.85:
        classification = "blocked_memory_or_swap_pressure_requires_service_playbook"
        severity = "critical"
        next_action = "route_to_service_specific_memory_pressure_playbook"
    elif load_exceeded:
        classification = "blocked_unknown_sustained_load_requires_source_specific_playbook"
        severity = "critical"
        dry_run_command = (
            "scripts/ops/host-sustained-load-evidence.py "
            f"--host {host_arg} --metrics-file {DEFAULT_METRICS_FILE} "
            f"--docker-stats-file {DEFAULT_DOCKER_STATS_FILE} "
            "--json"
        )
        next_action = "collect_sanitized_top_process_and_container_stats_then_select_playbook"

    return {
        "schema_version": SCHEMA_VERSION,
        "host": host,
        "mode": "read_only_control_packet",
        "classification": classification,
        "severity": severity,
        "controlled_apply_allowed": controlled_apply_allowed,
        "next_action": next_action,
        "readback": {
            "monitor_up": monitor_up,
            "load5_per_core": round(load5_per_core, 6),
            "load5_per_core_threshold": load5_per_core_threshold,
            "swap_used_ratio": round(swap_used_ratio, 6),
            "remediation_authorized": remediation_authorized,
            "active_ci_container_count": active_ci_containers,
            "active_ci_process_group_count": active_ci_groups,
            "active_ci_process_cpu_percent": round(active_ci_cpu, 3),
            "active_ci_oldest_age_seconds": active_ci_oldest_age,
            "top_orphan_rule": top_orphan,
            "top_container_cpu": top_container,
            "top_container_cpu_untrusted": raw_top_container,
            "docker_stats": docker_stats_status,
        },
        "commands": {
            "dry_run": dry_run_command,
            "controlled_apply": controlled_apply_command,
            "post_apply_verifier": verifier_command,
            "rollback": "send SIGTERM only; no persistent host mutation. Re-run workload if needed.",
        },
        "operation_boundaries": {
            "secret_value_read": False,
            "raw_session_read": False,
            "raw_runner_registration_read": False,
            "host_write_performed": False,
            "process_signal_performed": False,
            "docker_restart_allowed": False,
            "systemd_restart_allowed": False,
            "firewall_change_allowed": False,
            "critical_break_glass_required": True,
        },
        "forbidden_actions": [
            "SIGKILL",
            "docker_restart",
            "systemctl_restart",
            "nginx_reload",
            "firewall_change",
            "kubectl_action",
            "secret_read",
            "legacy_or_generic_runner_restore",
        ],
    }
|
|
|
|
|
|
def _load_samples_or_empty(path: Path) -> list[dict[str, Any]]:
    """Read and parse a Prometheus textfile; a missing file yields no samples."""
    try:
        return parse_prometheus_text(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return []


def main() -> int:
    """Run the controller once and print the resulting packet.

    Reads the metrics and docker stats textfiles (each treated as empty when
    absent), builds the packet, and prints it either as sorted, indented JSON
    or as ``key=value`` lines.

    Returns:
        75 when the classification starts with ``blocked_``, otherwise 0.
    """
    args = parse_args()
    samples = _load_samples_or_empty(args.metrics_file)
    docker_samples = _load_samples_or_empty(args.docker_stats_file)
    freshness = docker_stats_freshness(
        samples=samples,
        docker_stats_file=args.docker_stats_file,
        max_age_seconds=args.docker_stats_max_age_seconds,
    )
    packet = build_packet(
        host=args.host,
        samples=samples,
        docker_samples=docker_samples,
        docker_stats_status=freshness,
        load5_per_core_threshold=args.load5_per_core_threshold,
        ci_stale_age_seconds=args.ci_stale_age_seconds,
    )

    if args.json:
        print(json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True))
    else:
        commands = packet["commands"]
        report = [
            f"status={packet['classification']}",
            f"controlled_apply_allowed={str(packet['controlled_apply_allowed']).lower()}",
            f"next_action={packet['next_action']}",
        ]
        # Empty command strings are omitted from the text report.
        if commands["dry_run"]:
            report.append(f"dry_run_command={commands['dry_run']}")
        if commands["controlled_apply"]:
            report.append(f"controlled_apply_command={commands['controlled_apply']}")
        report.append(f"post_apply_verifier={commands['post_apply_verifier']}")
        for line in report:
            print(line)

    return 75 if packet["classification"].startswith("blocked_") else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s integer result to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|