Files
awoooi/scripts/ops/host-sustained-load-evidence.py
Your Name 1ac8808607
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 33s
AWOOOI Harbor 110 Local Repair / workflow-shape (push) Successful in 0s
CD Pipeline / build-and-deploy (push) Has been skipped
AWOOOI Harbor 110 Local Repair / harbor-110-local-repair (push) Failing after 1m41s
CD Pipeline / post-deploy-checks (push) Has been skipped
fix(recovery): ignore stale docker cpu attribution
2026-07-01 14:06:51 +08:00

352 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Build sanitized evidence for unknown sustained host load.
This collector is read-only. It intentionally emits process families and
container names instead of raw command lines so CPU-pressure alerts can proceed
to a source-specific PlayBook without leaking workspace paths, URLs, JSON
payloads, or secrets.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import time
from pathlib import Path
from typing import Any
DEFAULT_HOST_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom")
DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom")
# Docker stats older than this many seconds are not trusted for container attribution.
DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS = 300
SCHEMA_VERSION = "host_sustained_load_sanitized_evidence_v1"
# One ``key="value"`` pair from a Prometheus label set. Inside the value a
# single backslash escapes exactly one following character (\\, \", \n), so
# the escape branch is "one backslash + any character". (The previous pattern
# doubled the backslashes inside a raw string and therefore required two
# literal backslashes, dropping any label whose value contained \" or \n.)
LABEL_RE = re.compile(r'(?P<key>[A-Za-z_][A-Za-z0-9_]*)="(?P<value>(?:[^"\\]|\\.)*)"')
# A sample line: metric name, optional {label set}, whitespace, then a plain
# decimal / scientific-notation value anchored at end of line. Lines carrying a
# trailing timestamp or NaN/Inf values do not match.
METRIC_RE = re.compile(
    r"^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+"
    r"(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$"
)
def _non_negative_int(raw: str) -> int:
    """Parse an argparse option value as an integer that is >= 0.

    Raises:
        argparse.ArgumentTypeError: if *raw* is not an integer or is negative;
            argparse turns this into a normal usage error.
    """
    try:
        value = int(raw)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"invalid integer value: {raw!r}") from exc
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def parse_args() -> argparse.Namespace:
    """Parse command-line options for the sustained-load evidence collector.

    ``--top-n`` and ``--docker-stats-max-age-seconds`` must be non-negative.
    A negative ``--top-n`` would otherwise be used as a slice bound and
    silently drop rows from the *end* of each ranked list instead of limiting it.
    """
    parser = argparse.ArgumentParser(description="Collect sanitized sustained-load evidence.")
    parser.add_argument(
        "--host",
        default=os.environ.get("AIOPS_HOST_LABEL", "110"),
        help="host label used to filter docker stats samples (env: AIOPS_HOST_LABEL)",
    )
    parser.add_argument(
        "--metrics-file",
        type=Path,
        default=DEFAULT_HOST_METRICS_FILE,
        help="Prometheus textfile with host metrics",
    )
    parser.add_argument(
        "--docker-stats-file",
        type=Path,
        default=DEFAULT_DOCKER_STATS_FILE,
        help="Prometheus textfile with per-container CPU samples",
    )
    parser.add_argument(
        "--docker-stats-max-age-seconds",
        type=_non_negative_int,
        default=DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS,
        help="docker stats older than this are reported but not trusted",
    )
    parser.add_argument(
        "--ps-file",
        type=Path,
        help="read ps output from this file instead of running ps",
    )
    parser.add_argument(
        "--top-n",
        type=_non_negative_int,
        default=8,
        help="maximum number of processes, families and containers to report",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="print the full payload as JSON instead of key=value lines",
    )
    return parser.parse_args()
# Escape sequences defined for Prometheus label values, mapped to their meaning.
_LABEL_ESCAPES = {"\\": "\\", '"': '"', "n": "\n"}
# One backslash followed by one of the escapable characters.
_LABEL_ESCAPE_RE = re.compile(r'\\([\\"n])')


def _unescape_label(value: str) -> str:
    """Decode the ``\\\\``, ``\\"`` and ``\\n`` escapes in a label value.

    The escapes are decoded in a single left-to-right pass. Chained
    ``str.replace`` calls are order-dependent: an escaped backslash followed by
    ``n`` (text ``\\\\n``) would first collapse to ``\\n`` and then wrongly
    become a newline. Any other backslash sequence is left untouched.
    """
    return _LABEL_ESCAPE_RE.sub(lambda match: _LABEL_ESCAPES[match.group(1)], value)
def parse_prometheus_text(text: str) -> list[dict[str, Any]]:
    """Parse Prometheus text-format content into a list of sample dicts.

    Each returned dict has ``name`` (metric name), ``labels`` (label name ->
    unescaped value) and ``value`` (float). Blank lines and ``#`` comment or
    metadata lines are skipped, as is any line that ``METRIC_RE`` rejects.
    """
    parsed: list[dict[str, Any]] = []
    for stripped in (raw.strip() for raw in text.splitlines()):
        if stripped.startswith("#") or not stripped:
            continue
        hit = METRIC_RE.match(stripped)
        if hit is None:
            continue
        label_blob = hit.group("labels") or ""
        label_map: dict[str, str] = {}
        for pair in LABEL_RE.finditer(label_blob):
            label_map[pair.group("key")] = _unescape_label(pair.group("value"))
        parsed.append(
            {
                "name": hit.group("name"),
                "labels": label_map,
                "value": float(hit.group("value")),
            }
        )
    return parsed
def _sample_value_any(samples: list[dict[str, Any]], name: str) -> float | None:
for sample in samples:
if sample["name"] == name:
return float(sample["value"])
return None
def _textfile_mtime_seconds(samples: list[dict[str, Any]], suffix: str) -> float | None:
for sample in samples:
if sample["name"] != "node_textfile_mtime_seconds":
continue
file_label = str(sample["labels"].get("file") or "")
if file_label.endswith(suffix):
return float(sample["value"])
return None
def docker_stats_freshness(
    *,
    samples: list[dict[str, Any]],
    docker_stats_file: Path,
    max_age_seconds: int,
) -> dict[str, Any]:
    """Report whether the docker stats textfile is recent enough to trust.

    Preferred source: a ``node_textfile_mtime_seconds`` sample in *samples*
    whose ``file`` label names *docker_stats_file*. That mtime is compared with
    ``node_time_seconds`` from the same samples, or with the local clock if that
    sample is absent. Fallback: the file's own stat mtime compared with the
    local clock.

    Args:
        samples: parsed host metric samples.
        docker_stats_file: the docker stats textfile that is actually being read.
        max_age_seconds: largest age, in seconds, still considered fresh.

    Returns:
        Dict with ``fresh`` (bool), ``age_seconds`` (int, or ``None`` when the
        file is missing), ``max_age_seconds`` and ``source``
        (``"node_textfile_mtime_seconds"``, ``"file_stat_mtime"`` or ``"missing"``).
    """
    # Look up the configured file's basename rather than a hard-coded
    # "docker_stats.prom", so a --docker-stats-file override is checked
    # against its own mtime instead of the default file's.
    mtime = _textfile_mtime_seconds(samples, docker_stats_file.name)
    # NOTE(review): when both values come from the metrics snapshot, the age
    # is measured at snapshot time, not now; a stale snapshot can therefore
    # report old docker stats as fresh. Confirm this is intended.
    now = _sample_value_any(samples, "node_time_seconds")
    source = "node_textfile_mtime_seconds"
    if mtime is None:
        try:
            mtime = docker_stats_file.stat().st_mtime
        except FileNotFoundError:
            return {
                "fresh": False,
                "age_seconds": None,
                "max_age_seconds": max_age_seconds,
                "source": "missing",
            }
        # A stat mtime is local wall-clock time, so compare it with the local clock.
        now = time.time()
        source = "file_stat_mtime"
    if now is None:
        now = time.time()
    # Clamp negative ages (mtime later than "now") to 0.
    age_seconds = max(0, int(now - mtime))
    return {
        "fresh": age_seconds <= max_age_seconds,
        "age_seconds": age_seconds,
        "max_age_seconds": max_age_seconds,
        "source": source,
    }
def read_text(path: Path | None) -> str:
if path is None:
return ""
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
return ""
def collect_ps_text(ps_file: Path | None) -> str:
    """Return process-table text, either read from *ps_file* or from a live ``ps`` run.

    The live command prints headerless columns
    ``pid ppid pgid etimes pcpu pmem comm args`` for every process.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits non-zero.
        subprocess.TimeoutExpired: if ``ps`` runs longer than 10 seconds.
    """
    if ps_file is None:
        ps_command = ["ps", "-eo", "pid=,ppid=,pgid=,etimes=,pcpu=,pmem=,comm=,args="]
        completed = subprocess.run(
            ps_command,
            check=True,
            capture_output=True,
            text=True,
            timeout=10,
        )
        return completed.stdout
    return read_text(ps_file)
def parse_ps_text(text: str) -> list[dict[str, Any]]:
    """Parse ``ps -eo pid=,ppid=,pgid=,etimes=,pcpu=,pmem=,comm=,args=`` output.

    Each line is split into at most 8 whitespace-separated fields; ``args`` is
    the remainder and falls back to ``comm`` when absent. Lines with fewer than
    7 fields, or whose numeric columns fail to parse, are skipped. Only the
    basename of ``comm`` (max 48 chars) and a derived family label are kept.
    The raw command line is never stored.
    """
    parsed_rows: list[dict[str, Any]] = []
    for line in map(str.strip, text.splitlines()):
        if not line:
            continue
        fields = line.split(None, 7)
        if len(fields) < 7:
            continue
        pid, ppid, pgid, etimes, pcpu, pmem, comm = fields[:7]
        command_line = fields[7] if len(fields) == 8 else comm
        try:
            record = {
                "pid": int(pid),
                "ppid": int(ppid),
                "pgid": int(pgid),
                "etimes": int(float(etimes)),
                "cpu_percent": float(pcpu),
                "mem_percent": float(pmem),
                # NOTE(review): Path(...).name keeps only the text after the last
                # "/", so a comm containing "/" (e.g. a kernel thread name such as
                # "ksoftirqd/0") is reported as "0". Confirm this is intended.
                "comm": Path(comm).name[:48],
                "family": classify_process_family(comm, command_line),
            }
        except ValueError:
            continue
        parsed_rows.append(record)
    return parsed_rows
def classify_process_family(comm: str, args: str) -> str:
    """Map a process to a coarse family label without exposing its command line.

    Rules are case-insensitive substring checks over ``comm`` and ``args``,
    applied in priority order; the first match wins. Only a fixed label string
    is returned.

    Args:
        comm: process name from the ``ps`` ``comm`` column.
        args: command line from the ``ps`` ``args`` column (may equal ``comm``).

    Returns:
        A family label such as ``"docker_build"``, ``"postgres"`` or ``"unknown"``.
    """
    # Identify node_exporter by its executable name before the substring rules.
    # Its command line can contain "node" and flags like "--collector.systemd",
    # which the generic rules below would misread as a Node.js service or as
    # systemd control-plane load (steering to the wrong playbook).
    if comm.rsplit("/", 1)[-1].lower() == "node_exporter":
        return "node_exporter"
    text = f"{comm} {args}".lower()
    if "act_runner" in text or "gitea-actions-task" in text or "/.cache/act/" in text:
        return "gitea_actions_runner"
    if "docker build" in text or "buildx" in text or "buildkit" in text:
        return "docker_build"
    if "next build" in text or "turbo build" in text or ("pnpm" in text and " build" in text):
        return "web_build"
    if "chrome" in text or "chromium" in text or "playwright" in text:
        return "headless_browser"
    if "gitea" in text:
        return "gitea_service"
    if "postgres" in text or "postmaster" in text:
        return "postgres"
    if "clickhouse" in text:
        return "clickhouse"
    if "kafka" in text:
        return "kafka"
    if "sentry" in text:
        return "sentry"
    if "systemctl" in text or "systemd" in text or "dbus" in text:
        return "systemd_control_plane"
    if "sshd" in text:
        return "ssh_control_plane"
    if "python" in text:
        return "python_job"
    if "node" in text:
        return "node_service"
    return "unknown"
def summarize_processes(rows: list[dict[str, Any]], *, top_n: int) -> dict[str, Any]:
    """Condense parsed ``ps`` rows into sanitized top-process and per-family views.

    Returns a dict with:
      * ``top_processes``: the *top_n* rows by CPU (ties by comm, then pid),
        reduced to ids, rounded cpu/mem percentages, age, comm and family.
      * ``families``: the *top_n* families by summed CPU (ties by name), each
        with process count, summed CPU (rounded to 3 places), the oldest
        member's age, and the comm of its busiest member.
    """
    hottest = sorted(rows, key=lambda r: (-r["cpu_percent"], r["comm"], r["pid"]))[:top_n]

    by_family: dict[str, dict[str, Any]] = {}
    best_sample_cpu: dict[str, float] = {}
    for row in rows:
        name = row["family"]
        if name not in by_family:
            by_family[name] = {
                "family": name,
                "process_count": 0,
                "cpu_percent": 0.0,
                "max_age_seconds": 0,
                "sample_comm": "",
            }
        bucket = by_family[name]
        bucket["process_count"] += 1
        bucket["cpu_percent"] += row["cpu_percent"]
        bucket["max_age_seconds"] = max(bucket["max_age_seconds"], row["etimes"])
        # Remember the comm of the busiest member seen so far (earlier rows win ties).
        if not bucket["sample_comm"] or row["cpu_percent"] > best_sample_cpu.get(name, -1):
            bucket["sample_comm"] = row["comm"]
            best_sample_cpu[name] = row["cpu_percent"]

    for bucket in by_family.values():
        bucket["cpu_percent"] = round(float(bucket["cpu_percent"]), 3)
    family_summary = sorted(by_family.values(), key=lambda b: (-b["cpu_percent"], b["family"]))[:top_n]

    def _sanitized(row: dict[str, Any]) -> dict[str, Any]:
        # Only whitelisted, non-identifying fields leave this function.
        return {
            "pid": row["pid"],
            "ppid": row["ppid"],
            "pgid": row["pgid"],
            "cpu_percent": round(row["cpu_percent"], 3),
            "mem_percent": round(row["mem_percent"], 3),
            "age_seconds": row["etimes"],
            "comm": row["comm"],
            "family": row["family"],
        }

    return {
        "top_processes": [_sanitized(row) for row in hottest],
        "families": family_summary,
    }
def top_docker_containers(samples: list[dict[str, Any]], *, host: str, top_n: int) -> list[dict[str, Any]]:
    """Rank per-container CPU usage for *host* from docker stats samples.

    Only ``docker_container_cpu_cores`` samples are used. A sample without a
    ``host`` label is treated as belonging to *host*. The container is named by
    its ``container_name`` label, else ``name``, else ``"unknown"``.

    Returns:
        At most *top_n* ``{"container_name", "cpu_cores"}`` dicts, highest CPU
        first with ties broken by name; ``cpu_cores`` is rounded to 6 places.
    """
    ranked = [
        {
            "container_name": sample["labels"].get("container_name") or sample["labels"].get("name") or "unknown",
            "cpu_cores": round(float(sample["value"]), 6),
        }
        for sample in samples
        if sample["name"] == "docker_container_cpu_cores"
        and sample["labels"].get("host", host) == host
    ]
    ranked.sort(key=lambda entry: (-entry["cpu_cores"], entry["container_name"]))
    return ranked[:top_n]
def recommend_playbook(process_families: list[dict[str, Any]], containers: list[dict[str, Any]]) -> str:
    """Pick a playbook name from the top container and the top process family.

    Only the first entry of each list is considered. Rules, in order:
      1. top container name contains "gitea" and uses >= 2.0 cores -> gitea backlog playbook;
      2. "postgres" in the top container name or the top family -> postgres playbook;
      3. the top family maps to a build, control-plane or browser playbook;
      otherwise ``"source_specific_playbook_required"``.
    """
    lead_container = containers[0] if containers else {}
    container_name = str(lead_container.get("container_name") or "").lower()
    container_cpu = float(lead_container.get("cpu_cores") or 0.0)
    lead_family = process_families[0] if process_families else {}
    family_name = str(lead_family.get("family") or "")

    if container_cpu >= 2.0 and "gitea" in container_name:
        return "gitea_queue_or_hook_backlog_playbook"
    # NOTE(review): unlike the gitea rule, this has no CPU threshold, so an
    # idle postgres container that merely ranks first wins over a hot build
    # family. Confirm this is intended.
    if "postgres" in container_name or "postgres" in family_name:
        return "postgres_hot_query_or_backup_export_playbook"
    family_playbooks = {
        "docker_build": "build_or_runner_pressure_playbook",
        "web_build": "build_or_runner_pressure_playbook",
        "gitea_actions_runner": "build_or_runner_pressure_playbook",
        "systemd_control_plane": "control_plane_saturation_playbook",
        "ssh_control_plane": "control_plane_saturation_playbook",
        "headless_browser": "orphan_browser_classification_refresh_playbook",
    }
    return family_playbooks.get(family_name, "source_specific_playbook_required")
def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Assemble the sanitized, read-only evidence payload for one run.

    Reads the host metrics and docker stats textfiles and the process table
    (live ``ps`` or ``--ps-file``). It then summarizes processes and containers
    and recommends a playbook. Container CPU attribution feeds
    ``top_containers`` and the recommendation only when the docker stats file
    is judged fresh; the unfiltered ranking is always emitted separately as
    ``top_containers_untrusted``.
    """
    host_samples = parse_prometheus_text(read_text(args.metrics_file))
    docker_samples = parse_prometheus_text(read_text(args.docker_stats_file))
    freshness = docker_stats_freshness(
        samples=host_samples,
        docker_stats_file=args.docker_stats_file,
        max_age_seconds=args.docker_stats_max_age_seconds,
    )
    ps_rows = parse_ps_text(collect_ps_text(args.ps_file))
    process_summary = summarize_processes(ps_rows, top_n=args.top_n)
    families = process_summary["families"]
    ranked_containers = top_docker_containers(docker_samples, host=args.host, top_n=args.top_n)
    # Stale docker stats must not drive container attribution.
    trusted_containers = ranked_containers if freshness.get("fresh") is True else []
    recommendation = recommend_playbook(families, trusted_containers)

    readback = {
        "host_metric_sample_count": len(host_samples),
        "docker_metric_sample_count": len(docker_samples),
        "docker_stats": freshness,
        "top_container_count": len(trusted_containers),
        "top_process_family_count": len(families),
    }
    redaction = {
        "raw_command_lines_emitted": False,
        "workspace_paths_emitted": False,
        "urls_emitted": False,
        "secret_values_read": False,
    }
    boundaries = {
        "host_write_performed": False,
        "process_signal_performed": False,
        "docker_restart_performed": False,
        "systemd_restart_performed": False,
        "raw_session_read": False,
        "raw_runner_registration_read": False,
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "host": args.host,
        "mode": "read_only_sanitized_evidence",
        "recommendation": recommendation,
        "controlled_apply_allowed": False,
        "next_action": "select_or_generate_source_specific_playbook_then_run_check_mode",
        "readback": readback,
        "top_containers": trusted_containers,
        "top_containers_untrusted": ranked_containers,
        "top_process_families": families,
        "top_processes_sanitized": process_summary["top_processes"],
        "redaction": redaction,
        "operation_boundaries": boundaries,
    }
def main() -> int:
    """Command-line entry point; always returns exit status 0.

    With ``--json`` the whole payload is printed as indented, key-sorted JSON.
    Otherwise only the recommendation, the apply flag (lower-cased) and the
    next action are printed as ``key=value`` lines.
    """
    options = parse_args()
    payload = build_payload(options)
    if options.json:
        print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
        return 0
    summary_lines = (
        f"recommendation={payload['recommendation']}",
        f"controlled_apply_allowed={str(payload['controlled_apply_allowed']).lower()}",
        f"next_action={payload['next_action']}",
    )
    for text_line in summary_lines:
        print(text_line)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())