Files
awoooi/scripts/ops/host-runaway-process-exporter.py
Your Name ff18872a23
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Failing after 26s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
feat(ops): 新增 host runaway process aiops guard
2026-06-18 14:17:03 +08:00

391 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Host runaway process textfile exporter for AWOOOI AIOps.
This exporter is read-only. It classifies orphaned headless browser/smoke
process groups separately from legitimate Gitea Actions load so host CPU alerts
can point to a concrete PlayBook instead of a generic "high CPU" symptom.
"""
from __future__ import annotations
import argparse
import os
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
# Default directory the metrics file is written to; override with the
# NODE_EXPORTER_TEXTFILE_DIR environment variable or --textfile-dir.
TEXTFILE_DIR = Path(os.environ.get("NODE_EXPORTER_TEXTFILE_DIR", "/var/lib/node_exporter/textfile_collector"))
# Default file name of the metrics file inside the textfile directory.
OUTPUT_NAME = "host_runaway_process.prom"
# Default value of the `host` label: AIOPS_HOST_LABEL if set, else this
# machine's node name as reported by os.uname().
HOST_LABEL = os.environ.get("AIOPS_HOST_LABEL", os.uname().nodename)
# Matches the characters that need escaping inside a quoted label value:
# double quote, backslash and newline.
LABEL_RE = re.compile(r'["\\\n]')
@dataclass(frozen=True)
class ProcessRow:
    """One parsed line of `ps` output describing a single process."""

    pid: int  # process ID
    ppid: int  # parent process ID
    pgid: int  # process group ID
    sid: int  # session ID (`sid` / `sess` column)
    etimes: int  # elapsed time since start, in seconds
    pcpu: float  # `pcpu` column as reported by ps
    stat: str  # `stat` column, kept verbatim
    comm: str  # `comm` column (command name)
    args: str  # remainder of the line: the full command line
@dataclass(frozen=True)
class RunawayRule:
    """A named classification rule; a process matches when both patterns hit."""

    rule_id: str  # identifier emitted as the `rule` label
    command_pattern: re.Pattern[str]  # must match somewhere in "<comm> <args>"
    context_pattern: re.Pattern[str]  # must also match somewhere in "<comm> <args>"
@dataclass(frozen=True)
class ProcessGroup:
    """An orphaned process group that matched a rule and exceeded the thresholds."""

    rule_id: str  # id of the rule the member processes matched
    pgid: int  # process group ID shared by all member rows
    rows: tuple[ProcessRow, ...]  # member processes, sorted by pid
    cpu_percent: float  # sum of the members' pcpu values
    oldest_age_seconds: int  # largest elapsed time among the members
    orphan_reason: str  # "ppid_1" or "missing_group_leader"
    sample_comm: str  # alphabetically first member comm, truncated to 48 chars
# Built-in classification rules. A process is assigned to the first rule whose
# two patterns both match, so the project-specific rule is listed before the
# generic headless-browser rule.
DEFAULT_RULES = (
    RunawayRule(
        rule_id="stockplatform_headless_smoke",
        command_pattern=re.compile(r"(chrome|chromium|playwright)", re.IGNORECASE),
        context_pattern=re.compile(r"stockplatform-review-bulk-ux|/tmp/stockplatform", re.IGNORECASE),
    ),
    RunawayRule(
        rule_id="headless_browser_smoke",
        command_pattern=re.compile(r"(chrome|chromium|playwright)", re.IGNORECASE),
        context_pattern=re.compile(r"--headless|--user-data-dir=/tmp|/tmp/.*(smoke|ux|playwright)", re.IGNORECASE),
    ),
)
def escape_label(value: str) -> str:
    r"""Escape *value* for use inside a double-quoted metric label value.

    A backslash becomes ``\\``, a double quote becomes ``\"`` and a newline
    becomes the two characters ``\n``; every other character is unchanged.
    """
    # str.translate rewrites each input character exactly once, so the
    # backslashes introduced by the escaping are never escaped again.
    replacements = {
        ord("\\"): "\\\\",
        ord('"'): '\\"',
        ord("\n"): "\\n",
    }
    return value.translate(replacements)
def run_text(command: list[str], timeout: int = 20) -> str:
    """Run *command* and return its captured standard output as text.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero status.
        subprocess.TimeoutExpired: the command ran longer than *timeout* seconds.
    """
    completed = subprocess.run(
        command,
        check=True,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return completed.stdout
def read_ps_text(ps_file: Path | None = None) -> str:
    """Return the raw process table, one process per line.

    When *ps_file* is given its contents are returned unchanged (fixture
    mode). Otherwise `ps` is executed; if the first invocation fails or times
    out, a second invocation with an alternative column set is tried.
    """
    if ps_file:
        return ps_file.read_text(encoding="utf-8")

    # Trailing "=" on every column suppresses the header line.
    primary_columns = "pid=,ppid=,pgid=,sid=,etimes=,pcpu=,stat=,comm=,args="
    # NOTE(review): presumably for ps variants without sid/etimes/args
    # (e.g. BSD/macOS) - confirm against the target hosts.
    fallback_columns = "pid=,ppid=,pgid=,sess=,etime=,pcpu=,stat=,comm=,command="

    try:
        return run_text(["ps", "-eo", primary_columns])
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return run_text(["ps", "-axo", fallback_columns])
def elapsed_to_seconds(value: str) -> int:
    """Convert a ps elapsed-time field to whole seconds.

    Two formats are accepted: a plain number of seconds (the ``etimes``
    column) and ``[[DD-]HH:]MM:SS`` (the ``etime`` column).

    Args:
        value: The raw field text.

    Returns:
        The elapsed time in seconds.

    Raises:
        ValueError: *value* is in neither format, is not finite, or has more
            than three ``:``-separated clock fields.
    """
    try:
        return int(float(value))
    except ValueError:
        pass  # not a plain number; fall through to the clock format
    except OverflowError as exc:
        # int(float("inf")) raises OverflowError; callers only expect
        # ValueError for unparseable fields.
        raise ValueError(f"non-finite elapsed time: {value!r}") from exc

    days = 0
    clock = value
    if "-" in value:
        raw_days, clock = value.split("-", 1)
        days = int(raw_days)

    parts = [int(part) for part in clock.split(":")]
    if len(parts) > 3:
        # Previously the first field was silently taken as seconds.
        raise ValueError(f"too many clock fields in elapsed time: {value!r}")

    # Left-pad with zeros so the fields always read hours, minutes, seconds.
    hours, minutes, seconds = [0] * (3 - len(parts)) + parts
    return days * 86400 + hours * 3600 + minutes * 60 + seconds
def parse_ps_rows(text: str) -> list[ProcessRow]:
    """Parse `ps` output into ProcessRow records.

    Each line must hold nine whitespace-separated fields; the ninth (the
    command line) keeps its internal spaces. Blank lines, lines with fewer
    than nine fields and lines whose numeric fields do not parse are skipped.
    """
    parsed: list[ProcessRow] = []
    for line in text.splitlines():
        # maxsplit=8 keeps the whole command line together in the last field.
        fields = line.strip().split(None, 8)
        if len(fields) < 9:
            continue
        pid, ppid, pgid, sid, elapsed, pcpu, stat, comm, args = fields
        try:
            row = ProcessRow(
                pid=int(pid),
                ppid=int(ppid),
                pgid=int(pgid),
                sid=int(sid),
                etimes=elapsed_to_seconds(elapsed),
                pcpu=float(pcpu),
                stat=stat,
                comm=comm,
                args=args,
            )
        except ValueError:
            continue
        parsed.append(row)
    return parsed
def matching_rule(row: ProcessRow, rules: Iterable[RunawayRule] = DEFAULT_RULES) -> str | None:
    """Return the id of the first rule in *rules* that matches *row*, else None.

    A rule matches when both its command pattern and its context pattern are
    found in the text "<comm> <args>".
    """
    haystack = f"{row.comm} {row.args}"
    matches = (
        rule.rule_id
        for rule in rules
        if rule.command_pattern.search(haystack) and rule.context_pattern.search(haystack)
    )
    return next(matches, None)
def orphan_reason(rows: list[ProcessRow], all_pids: set[int]) -> str | None:
    """Explain why a process group counts as orphaned, or return None.

    Args:
        rows: Member processes of one process group (all sharing one pgid).
        all_pids: Every pid present in the process table snapshot.

    Returns:
        ``"ppid_1"`` when any member's parent is pid 1,
        ``"missing_group_leader"`` when no process with pid == pgid exists in
        the snapshot, otherwise ``None``. An empty *rows* also yields ``None``.
    """
    if not rows:
        # Nothing to inspect; avoids an IndexError on rows[0] below.
        return None
    if any(row.ppid == 1 for row in rows):
        return "ppid_1"
    # The group leader is the process whose pid equals the group id.
    if rows[0].pgid not in all_pids:
        return "missing_group_leader"
    return None
def classify_groups(
    rows: list[ProcessRow],
    *,
    min_age_seconds: int,
    min_cpu_percent: float,
) -> list[ProcessGroup]:
    """Find orphaned, rule-matching process groups that exceed both thresholds.

    Rows matching a rule are bucketed by (rule id, pgid). A bucket is reported
    only when it is orphaned, its oldest member is at least *min_age_seconds*
    old and its summed CPU is at least *min_cpu_percent*.

    Returns:
        The reported groups ordered by CPU (highest first), then rule id,
        then pgid.
    """
    known_pids = {row.pid for row in rows}

    buckets: dict[tuple[str, int], list[ProcessRow]] = {}
    for row in rows:
        matched = matching_rule(row)
        if matched is not None:
            buckets.setdefault((matched, row.pgid), []).append(row)

    flagged: list[ProcessGroup] = []
    for (rule_id, pgid), members in buckets.items():
        reason = orphan_reason(members, known_pids)
        if reason is None:
            continue
        oldest = max(row.etimes for row in members)
        total_cpu = sum(row.pcpu for row in members)
        if oldest < min_age_seconds or total_cpu < min_cpu_percent:
            continue
        flagged.append(
            ProcessGroup(
                rule_id=rule_id,
                pgid=pgid,
                rows=tuple(sorted(members, key=lambda row: row.pid)),
                cpu_percent=total_cpu,
                oldest_age_seconds=oldest,
                orphan_reason=reason,
                # Alphabetically first command name, capped to keep the label short.
                sample_comm=min(row.comm for row in members)[:48],
            )
        )

    flagged.sort(key=lambda group: (-group.cpu_percent, group.rule_id, group.pgid))
    return flagged
def active_gitea_action_containers(docker_file: Path | None = None) -> int:
    """Count containers whose name contains ``GITEA-ACTIONS-TASK-``.

    Names come from *docker_file* (one per line) when given, otherwise from
    ``docker ps``. Returns -1 when the names cannot be obtained for any reason.
    """
    try:
        if docker_file:
            listing = docker_file.read_text(encoding="utf-8")
        else:
            listing = run_text(["docker", "ps", "--format", "{{.Names}}"], timeout=10)
    except Exception:
        # Deliberately best-effort: any failure is reported as -1.
        return -1
    task_names = [name for name in listing.splitlines() if "GITEA-ACTIONS-TASK-" in name]
    return len(task_names)
def load5_per_core() -> float:
    """Return the 5-minute load average divided by the CPU count.

    Reads the second field of /proc/loadavg, falls back to os.getloadavg()
    when that fails, and returns 0.0 when neither source is available. A
    missing CPU count is treated as one core.
    """
    try:
        fields = Path("/proc/loadavg").read_text(encoding="utf-8").split()
        five_minute_load = float(fields[1])
    except Exception:
        try:
            five_minute_load = os.getloadavg()[1]
        except OSError:
            return 0.0
    return five_minute_load / (os.cpu_count() or 1)
def swap_used_ratio(meminfo_file: Path | None = None) -> float:
    """Return the fraction of swap in use, clamped to the range 0.0-1.0.

    Reads ``SwapTotal`` and ``SwapFree`` from *meminfo_file* (default
    /proc/meminfo). Returns 0.0 when there is no swap or when the file cannot
    be read or parsed.
    """
    source = meminfo_file or Path("/proc/meminfo")
    try:
        swap: dict[str, float] = {}
        for line in source.read_text(encoding="utf-8").splitlines():
            name, _, rest = line.partition(":")
            if name in ("SwapTotal", "SwapFree"):
                # First token after the colon is the amount; scaled by 1024.
                swap[name] = float(rest.strip().split()[0]) * 1024
        total = swap.get("SwapTotal", 0.0)
        if total <= 0:
            return 0.0
        used_fraction = (total - swap.get("SwapFree", 0.0)) / total
        return max(0.0, min(1.0, used_fraction))
    except Exception:
        # Best-effort metric: any read or parse failure reports 0.0.
        return 0.0
def render_metrics(
    *,
    host: str,
    groups: list[ProcessGroup],
    active_action_containers: int,
    min_age_seconds: int,
    min_cpu_percent: float,
    now: int,
    load_ratio: float,
    swap_ratio: float,
) -> str:
    """Render all exporter metrics as one text payload ending in a newline.

    The payload contains, in order: HELP/TYPE lines for every metric, the
    host-level samples, four aggregate samples per rule in DEFAULT_RULES
    (emitted even when the rule has no groups), and per-group samples for at
    most the first 20 entries of *groups*.
    """
    labels_host = f'host="{escape_label(host)}"'

    # Bucket groups by rule id; every default rule gets a bucket so that its
    # aggregate series are always present. Groups with an unknown rule id
    # are left out of the aggregates.
    by_rule: dict[str, list[ProcessGroup]] = {rule.rule_id: [] for rule in DEFAULT_RULES}
    for group in groups:
        if group.rule_id in by_rule:
            by_rule[group.rule_id].append(group)

    lines = [
        "# HELP awoooi_host_runaway_process_monitor_up Whether the host runaway process exporter completed.",
        "# TYPE awoooi_host_runaway_process_monitor_up gauge",
        "# HELP awoooi_host_runaway_process_last_run_timestamp Unix timestamp of the last exporter run.",
        "# TYPE awoooi_host_runaway_process_last_run_timestamp gauge",
        "# HELP awoooi_host_runaway_browser_orphan_group_count Count of orphaned browser/smoke process groups above thresholds.",
        "# TYPE awoooi_host_runaway_browser_orphan_group_count gauge",
        "# HELP awoooi_host_runaway_browser_orphan_process_count Count of orphaned browser/smoke processes above thresholds.",
        "# TYPE awoooi_host_runaway_browser_orphan_process_count gauge",
        "# HELP awoooi_host_runaway_browser_orphan_cpu_percent Sum CPU percent for orphaned browser/smoke process groups above thresholds.",
        "# TYPE awoooi_host_runaway_browser_orphan_cpu_percent gauge",
        "# HELP awoooi_host_runaway_browser_orphan_oldest_age_seconds Oldest age of matching orphaned process groups.",
        "# TYPE awoooi_host_runaway_browser_orphan_oldest_age_seconds gauge",
        "# HELP awoooi_host_runaway_browser_orphan_group_cpu_percent CPU percent for an individual orphaned browser/smoke process group.",
        "# TYPE awoooi_host_runaway_browser_orphan_group_cpu_percent gauge",
        "# HELP awoooi_host_runaway_browser_orphan_group_info Metadata for an individual orphaned browser/smoke process group.",
        "# TYPE awoooi_host_runaway_browser_orphan_group_info gauge",
        "# HELP awoooi_host_gitea_actions_active_container_count Active Gitea Actions task containers visible on the host, -1 when Docker is unavailable.",
        "# TYPE awoooi_host_gitea_actions_active_container_count gauge",
        "# HELP awoooi_host_load5_per_core Host load5 divided by CPU core count.",
        "# TYPE awoooi_host_load5_per_core gauge",
        "# HELP awoooi_host_swap_used_ratio Host swap used ratio from /proc/meminfo.",
        "# TYPE awoooi_host_swap_used_ratio gauge",
        "# HELP awoooi_host_runaway_process_remediation_authorized Static guardrail: remediation is not authorized by this exporter.",
        "# TYPE awoooi_host_runaway_process_remediation_authorized gauge",
    ]

    # Host-level samples.
    lines.extend(
        [
            f"awoooi_host_runaway_process_monitor_up{{{labels_host},mode=\"read_only\"}} 1",
            f"awoooi_host_runaway_process_last_run_timestamp{{{labels_host}}} {now}",
            f"awoooi_host_gitea_actions_active_container_count{{{labels_host}}} {active_action_containers}",
            f"awoooi_host_load5_per_core{{{labels_host}}} {load_ratio:.6f}",
            f"awoooi_host_swap_used_ratio{{{labels_host}}} {swap_ratio:.6f}",
            f"awoooi_host_runaway_process_remediation_authorized{{{labels_host}}} 0",
        ]
    )

    # Per-rule aggregates, in sorted rule-id order.
    for rule_id in sorted(by_rule):
        rule_groups = by_rule[rule_id]
        rule_labels = (
            f'{labels_host},rule="{escape_label(rule_id)}",'
            f'min_age_seconds="{min_age_seconds}",min_cpu_percent="{min_cpu_percent:g}"'
        )
        process_total = sum(len(group.rows) for group in rule_groups)
        cpu_total = sum(group.cpu_percent for group in rule_groups)
        oldest = max((group.oldest_age_seconds for group in rule_groups), default=0)
        lines.extend(
            [
                f"awoooi_host_runaway_browser_orphan_group_count{{{rule_labels}}} {len(rule_groups)}",
                f"awoooi_host_runaway_browser_orphan_process_count{{{rule_labels}}} {process_total}",
                f"awoooi_host_runaway_browser_orphan_cpu_percent{{{rule_labels}}} {cpu_total:.6f}",
                f"awoooi_host_runaway_browser_orphan_oldest_age_seconds{{{rule_labels}}} {oldest}",
            ]
        )

    # Per-group detail series, capped at 20 groups.
    for group in groups[:20]:
        group_labels = (
            f'{labels_host},rule="{escape_label(group.rule_id)}",pgid="{group.pgid}",'
            f'orphan_reason="{escape_label(group.orphan_reason)}",comm="{escape_label(group.sample_comm)}"'
        )
        lines.extend(
            [
                f"awoooi_host_runaway_browser_orphan_group_cpu_percent{{{group_labels}}} {group.cpu_percent:.6f}",
                f"awoooi_host_runaway_browser_orphan_group_info{{{group_labels}}} 1",
            ]
        )

    return "\n".join(lines) + "\n"
def collect(args: argparse.Namespace) -> str:
    """Take one snapshot of the host and return the rendered metrics text.

    Reads the process table, classifies runaway groups using the thresholds
    in *args*, samples container count, load and swap, and renders the lot.
    """
    process_rows = parse_ps_rows(read_ps_text(args.ps_file))
    flagged_groups = classify_groups(
        process_rows,
        min_age_seconds=args.min_age_seconds,
        min_cpu_percent=args.min_cpu_percent,
    )

    container_count = active_gitea_action_containers(args.docker_ps_file)
    timestamp = int(time.time())
    load_ratio = load5_per_core()
    swap_ratio = swap_used_ratio(args.meminfo_file)

    return render_metrics(
        host=args.host,
        groups=flagged_groups,
        active_action_containers=container_count,
        min_age_seconds=args.min_age_seconds,
        min_cpu_percent=args.min_cpu_percent,
        now=timestamp,
        load_ratio=load_ratio,
        swap_ratio=swap_ratio,
    )
def write_textfile(payload: str, textfile_dir: Path, output_name: str) -> Path:
    """Atomically publish *payload* as ``textfile_dir/output_name``.

    The payload is written to a temporary file in the same directory, made
    world-readable, and then renamed over the target so readers never see a
    partial or unreadable file.

    Args:
        payload: Text to write (encoded as UTF-8).
        textfile_dir: Destination directory; created if missing.
        output_name: File name inside *textfile_dir*.

    Returns:
        The path of the published file.

    Raises:
        OSError: The directory or file could not be created or renamed.
        UnicodeEncodeError: *payload* cannot be encoded as UTF-8.
    """
    textfile_dir.mkdir(parents=True, exist_ok=True)
    output_path = textfile_dir / output_name
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile("w", dir=textfile_dir, delete=False, encoding="utf-8") as tmp:
            tmp_path = Path(tmp.name)
            tmp.write(payload)
        # Temp files are created 0600. Fix the mode BEFORE the rename so the
        # published file is readable by other users from the first instant.
        tmp_path.chmod(0o644)
        tmp_path.replace(output_path)
    except BaseException:
        # delete=False means nothing else removes the temp file on failure.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass
        raise
    return output_path
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    The two threshold options take their defaults from the environment
    variables AIOPS_RUNAWAY_PROCESS_MIN_AGE_SECONDS (1800 when unset) and
    AIOPS_RUNAWAY_PROCESS_MIN_CPU_PERCENT (50 when unset).
    """
    parser = argparse.ArgumentParser(description="Export AWOOOI host runaway process metrics.")
    add = parser.add_argument

    # Output selection.
    add("--host", default=HOST_LABEL)
    add("--textfile-dir", type=Path, default=TEXTFILE_DIR)
    add("--output-name", default=OUTPUT_NAME)
    add("--stdout", action="store_true", help="Print metrics instead of writing the textfile.")

    # Fixture inputs that replace the live data sources.
    add("--ps-file", type=Path, help="Use a fixture file instead of running ps.")
    add("--docker-ps-file", type=Path, help="Use a fixture file instead of docker ps.")
    add("--meminfo-file", type=Path, help="Use a fixture file instead of /proc/meminfo.")

    # Thresholds.
    default_min_age = int(os.environ.get("AIOPS_RUNAWAY_PROCESS_MIN_AGE_SECONDS", "1800"))
    add("--min-age-seconds", type=int, default=default_min_age)
    default_min_cpu = float(os.environ.get("AIOPS_RUNAWAY_PROCESS_MIN_CPU_PERCENT", "50"))
    add("--min-cpu-percent", type=float, default=default_min_cpu)

    return parser.parse_args()
def main() -> None:
    """Collect the metrics once and print them or publish them to the textfile."""
    args = parse_args()
    payload = collect(args)
    if not args.stdout:
        output_path = write_textfile(payload, args.textfile_dir, args.output_name)
        print(f"HOST_RUNAWAY_PROCESS_EXPORTER_OK output={output_path}")
    else:
        # The payload already ends with a newline.
        print(payload, end="")
# Run the exporter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()