Files
awoooi/scripts/ops/systemd-units-textfile-exporter.py
Your Name 40b2742456
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 1s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 1m7s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
fix(recovery): drain host cpu pressure gates
2026-07-01 08:25:00 +08:00

206 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""
Systemd unit textfile exporter for host-level AIOps baselines.
2026-05-05 ogt + Codex: 110 runner overload follow-up.
Why: GitHub/Gitea runner services are outside Docker metrics. A bad
WatchdogSec drop-in caused repeated runner restarts on 110, so the AI monitor
needs direct visibility into systemd restarts, watchdog settings, and limits.
"""
from __future__ import annotations
import os
import fcntl
import re
import subprocess
import tempfile
from pathlib import Path
# Directory the .prom file is written to; the default is the usual
# node_exporter textfile collector path.
TEXTFILE_DIR = Path(
    os.environ.get(
        "NODE_EXPORTER_TEXTFILE_DIR",
        "/var/lib/node_exporter/textfile_collector",
    )
)
# File name of the published metrics file inside TEXTFILE_DIR.
OUTPUT_NAME = "systemd_units.prom"
# Value of the `host` label on every sample; defaults to the kernel node name.
HOST_LABEL = os.environ.get("AIOPS_HOST_LABEL", os.uname().nodename)
# Lock file used to keep overlapping exporter runs from colliding.
LOCK_PATH = Path(
    os.environ.get("AIOPS_SYSTEMD_EXPORTER_LOCK", f"/tmp/{OUTPUT_NAME}.lock")
)
# Comma-separated unit list from the environment, with blanks dropped.
UNIT_NAMES = [
    name
    for name in map(str.strip, os.environ.get("AIOPS_SYSTEMD_UNITS", "").split(","))
    if name
]
# Characters that must be escaped inside a label value: quote, backslash, newline.
LABEL_RE = re.compile(r'["\\\n]')
# Fallbacks used when the corresponding environment overrides are unset/invalid.
DEFAULT_SYSTEMCTL_TIMEOUT_SECONDS = 2.0
DEFAULT_MAX_TIMEOUTS = 1
def _escape_label(value: str) -> str:
    """Escape *value* for use inside a double-quoted metric label.

    Every backslash, double quote and newline matched by ``LABEL_RE`` is
    replaced by its backslash-escaped form; all other characters pass
    through untouched.
    """
    escapes = {"\\": r"\\", '"': r"\"", "\n": r"\n"}

    def _substitute(found):
        # LABEL_RE only ever matches one of the three keys above.
        return escapes[found.group(0)]

    return LABEL_RE.sub(_substitute, value)
def _parse_usec(value: str) -> float:
    """Convert a systemd time-span string into seconds.

    Accepted forms:

    * ``""``, ``"0"`` and ``"infinity"`` -> ``0.0`` (treated as "not set").
    * A bare number (no unit) is interpreted as microseconds.
    * One or more ``<number><unit>`` components, optionally separated by
      whitespace, e.g. ``30s``, ``500ms``, ``1min 30s``, ``2h 30min``.
      Component values are summed.

    Anything that cannot be parsed (unknown unit, malformed number, stray
    text) yields ``0.0`` instead of raising, so one odd property value
    cannot abort a whole export run.

    Args:
        value: Raw property value as printed by ``systemctl show``.

    Returns:
        The span in seconds, or ``0.0`` when unset or unparseable.
    """
    text = value.strip()
    if text in {"", "0", "infinity"}:
        return 0.0
    # Unit suffixes used by systemd when formatting time spans.
    seconds_per_unit = {
        "us": 0.000001,
        "ms": 0.001,
        "s": 1.0,
        "min": 60.0,
        "h": 3600.0,
        "d": 86400.0,
        "w": 604800.0,
        "month": 2629800.0,
        "y": 31557600.0,
    }
    number = r"[0-9.]+"
    component = rf"({number})\s*([A-Za-z]+)"
    try:
        if re.fullmatch(number, text):
            # No unit at all: the raw value is a microsecond count.
            return float(text) / 1_000_000
        # The whole string must consist of components; reject stray text
        # rather than summing whatever happens to match.
        if not re.fullmatch(rf"(?:{component}\s*)+", text):
            return 0.0
        total = 0.0
        for amount, unit in re.findall(component, text):
            if unit not in seconds_per_unit:
                return 0.0
            total += float(amount) * seconds_per_unit[unit]
        return total
    except ValueError:
        # The permissive number pattern admits strings like "1.2.3".
        return 0.0
def _parse_bytes(value: str) -> float:
    """Convert a byte-count property value into a float.

    An empty string and the literal ``infinity`` both map to ``0.0``, as
    does any text that ``float()`` rejects.
    """
    if value == "" or value == "infinity":
        return 0.0
    try:
        parsed = float(value)
    except ValueError:
        parsed = 0.0
    return parsed
def _systemctl_timeout_seconds() -> float:
    """Return the timeout, in seconds, applied to each ``systemctl show`` call.

    Reads ``AIOPS_SYSTEMD_SHOW_TIMEOUT_SECONDS``. A parseable value is
    clamped to at least 0.25 seconds; an unset, empty or non-numeric value
    falls back to ``DEFAULT_SYSTEMCTL_TIMEOUT_SECONDS``.
    """
    configured = os.environ.get("AIOPS_SYSTEMD_SHOW_TIMEOUT_SECONDS", "")
    try:
        return max(0.25, float(configured)) if configured else DEFAULT_SYSTEMCTL_TIMEOUT_SECONDS
    except ValueError:
        return DEFAULT_SYSTEMCTL_TIMEOUT_SECONDS
def _max_timeouts() -> int:
    """Return how many ``systemctl`` timeouts are tolerated per run.

    Reads ``AIOPS_SYSTEMD_MAX_TIMEOUTS``. A parseable integer is clamped to
    at least 1; an unset, empty or non-integer value falls back to
    ``DEFAULT_MAX_TIMEOUTS``.
    """
    setting = os.environ.get("AIOPS_SYSTEMD_MAX_TIMEOUTS", "")
    if setting == "":
        return DEFAULT_MAX_TIMEOUTS
    try:
        budget = int(setting)
    except ValueError:
        return DEFAULT_MAX_TIMEOUTS
    return budget if budget > 1 else 1
def _compact_error(exc: Exception) -> str:
    """Reduce an exception to a short token suitable for a label value.

    Returns ``"exit_<code>"`` for a non-zero subprocess exit, ``"timeout"``
    for a subprocess timeout, and the exception class name otherwise.
    """
    if isinstance(exc, subprocess.CalledProcessError):
        return f"exit_{exc.returncode}"
    if isinstance(exc, subprocess.TimeoutExpired):
        return "timeout"
    return type(exc).__name__
def _show_unit(unit: str) -> dict[str, str]:
    """Query selected properties of *unit* via ``systemctl show``.

    Returns:
        A mapping of property name to its raw string value, one entry per
        ``KEY=VALUE`` line of output.

    Raises:
        subprocess.CalledProcessError: ``systemctl`` exited non-zero.
        subprocess.TimeoutExpired: the call exceeded the configured timeout.
    """
    wanted = (
        "ActiveState",
        "SubState",
        "NRestarts",
        "WatchdogUSec",
        "CPUQuotaPerSecUSec",
        "MemoryMax",
    )
    command = ["systemctl", "show", unit]
    for prop in wanted:
        command.extend(("-p", prop))
    completed = subprocess.run(
        command,
        check=True,
        capture_output=True,
        text=True,
        timeout=_systemctl_timeout_seconds(),
    )
    # Split each line on the first "=" only; values may contain "=" too.
    pairs = (entry.partition("=") for entry in completed.stdout.splitlines())
    return {name: payload for name, _sep, payload in pairs}
def collect() -> str:
    """Probe every configured unit and render the metrics payload.

    For each name in ``UNIT_NAMES`` the unit is queried with
    ``_show_unit``. A successful probe emits an info sample (value 1) plus
    restart, watchdog, CPU-quota and memory-limit samples. A failed probe
    emits an info sample with ``active_state="scrape_error"`` and value 0.
    Once the number of timeouts reaches ``_max_timeouts()``, every
    remaining unit is reported as ``scrape_skipped`` and probing stops.

    Returns:
        The full text payload, newline-terminated, ending with the
        ``systemd_unit_exporter_timeout_budget_exhausted`` sample.
    """
    escaped_host = _escape_label(HOST_LABEL)

    def _unit_labels(name: str) -> str:
        return f'host="{escaped_host}",unit="{_escape_label(name)}"'

    # (metric name, metric type, help text) for every family emitted below.
    families = (
        ("systemd_unit_info", "gauge", "Systemd unit inventory exposed by the textfile exporter."),
        (
            "systemd_unit_restarts_total",
            "counter",
            "Systemd unit restart counter from systemctl show NRestarts.",
        ),
        (
            "systemd_unit_watchdog_seconds",
            "gauge",
            "Systemd unit WatchdogSec setting in seconds, 0 when disabled.",
        ),
        (
            "systemd_unit_cpu_quota_cores",
            "gauge",
            "Systemd unit CPUQuota converted to CPU cores, 0 when unlimited.",
        ),
        (
            "systemd_unit_memory_max_bytes",
            "gauge",
            "Systemd unit MemoryMax in bytes, 0 when unlimited.",
        ),
        (
            "systemd_unit_exporter_timeout_budget_exhausted",
            "gauge",
            "Whether this scrape stopped probing after repeated systemctl timeouts.",
        ),
    )
    output = []
    for metric, kind, help_text in families:
        output.append(f"# HELP {metric} {help_text}")
        output.append(f"# TYPE {metric} {kind}")

    timeouts_seen = 0
    budget_exhausted = False
    for position, unit in enumerate(UNIT_NAMES):
        labels = _unit_labels(unit)
        try:
            props = _show_unit(unit)
        except Exception as exc:
            # Best-effort boundary: one failing unit must not hide the rest.
            reason = _compact_error(exc)
            escaped_reason = _escape_label(reason)
            output.append(
                f'systemd_unit_info{{{labels},active_state="scrape_error",sub_state="{escaped_reason}"}} 0'
            )
            if reason != "timeout":
                continue
            timeouts_seen += 1
            if timeouts_seen < _max_timeouts():
                continue
            # Budget spent: mark everything not yet probed as skipped and stop.
            budget_exhausted = True
            for skipped in UNIT_NAMES[position + 1 :]:
                skipped_labels = _unit_labels(skipped)
                output.append(
                    f'systemd_unit_info{{{skipped_labels},active_state="scrape_skipped",sub_state="systemctl_timeout_budget_exhausted"}} 0'
                )
            break

        # Parse everything first so a bad value appends nothing for this unit.
        active = _escape_label(props.get("ActiveState", ""))
        sub = _escape_label(props.get("SubState", ""))
        restart_count = int(props.get("NRestarts") or 0)
        watchdog = _parse_usec(props.get("WatchdogUSec", "0"))
        quota_cores = _parse_usec(props.get("CPUQuotaPerSecUSec", "0"))
        memory_limit = _parse_bytes(props.get("MemoryMax", "0"))
        output.extend(
            [
                f'systemd_unit_info{{{labels},active_state="{active}",sub_state="{sub}"}} 1',
                f"systemd_unit_restarts_total{{{labels}}} {restart_count}",
                f"systemd_unit_watchdog_seconds{{{labels}}} {watchdog:.6f}",
                f"systemd_unit_cpu_quota_cores{{{labels}}} {quota_cores:.6f}",
                f"systemd_unit_memory_max_bytes{{{labels}}} {memory_limit:.0f}",
            ]
        )

    host_only = f'host="{escaped_host}"'
    exhausted_flag = 1 if budget_exhausted else 0
    output.append(f"systemd_unit_exporter_timeout_budget_exhausted{{{host_only}}} {exhausted_flag}")
    return "\n".join(output) + "\n"
def main() -> None:
    """Collect unit metrics and atomically publish them as a .prom file.

    Steps:
      1. Refuse to run without any configured units.
      2. Take a non-blocking exclusive lock on ``LOCK_PATH``; if another
         run already holds it, return silently.
      3. Render the payload, write it to a temporary file in
         ``TEXTFILE_DIR``, make it world-readable, then rename it over
         ``OUTPUT_NAME``.

    The temporary file is created with owner-only permissions, so it is
    chmod-ed *before* the rename: a reader running as another user never
    sees an unreadable output file. If anything fails before the rename,
    the temporary file is removed instead of being left behind.

    Raises:
        SystemExit: ``AIOPS_SYSTEMD_UNITS`` is empty.
        OSError: the lock, directory or output file could not be written.
    """
    if not UNIT_NAMES:
        raise SystemExit("AIOPS_SYSTEMD_UNITS is required")
    LOCK_PATH.parent.mkdir(parents=True, exist_ok=True)
    with LOCK_PATH.open("w") as lock_handle:
        try:
            fcntl.flock(lock_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            # A previous run is still in progress; skip this cycle.
            return
        TEXTFILE_DIR.mkdir(parents=True, exist_ok=True)
        payload = collect()
        output_path = TEXTFILE_DIR / OUTPUT_NAME
        tmp_path = None
        published = False
        try:
            with tempfile.NamedTemporaryFile(
                "w", dir=TEXTFILE_DIR, delete=False, encoding="utf-8"
            ) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(payload)
            # Fix permissions first so the rename publishes a readable file.
            tmp_path.chmod(0o644)
            tmp_path.replace(output_path)
            published = True
        finally:
            if not published and tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    # Best-effort cleanup; the original error still propagates.
                    pass
if __name__ == "__main__":
main()