Files
awoooi/scripts/security/nginx-config-drift-detector.py

412 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
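"""
IwoooS read-only Nginx configuration drift detector.

This tool only reads the Nginx source-of-truth files inside the repo, or live
conf export files supplied separately by the owner; it does not SSH, does not
reload, does not write to any host, and does not trigger deployments.
"""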
from __future__ import annotations
import argparse
import hashlib
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+08:00 offset; used to stamp reports when no explicit time is given.
TAIPEI = timezone(offset=timedelta(hours=8))
@dataclass(frozen=True)
class NginxSource:
    """Immutable descriptor of one Nginx config tracked by the drift detector."""

    # Stable identifier; build_report uses it to look up the matching
    # owner-provided live file (the left-hand side of ``--live-file id=path``).
    config_id: str
    # Host the config belongs to; copied into the report unchanged.
    host: str
    # Short role label for the config; copied into the report unchanged.
    role: str
    # Path of the source-of-truth file, relative to the repo root.
    source_path: str
    # Declared location on the live host; only reported, never read by this tool.
    live_path: str
    # Control tier label (values such as "C0"/"C1" appear in SOURCES).
    control_tier: str
    # Owner gate label attached to the config in the report.
    owner_gate: str
# Inventory of the Nginx source-of-truth files covered by the detector.
# build_report iterates this list in order and emits one report entry per item.
SOURCES = [
    NginxSource(
        config_id="host188_all_sites",
        host="192.168.0.188",
        role="public_gateway_all_sites",
        source_path="infra/ansible/roles/nginx/templates/188-all-sites.conf.j2",
        live_path="/etc/nginx/sites-enabled/all-sites.conf",
        control_tier="C0",
        owner_gate="public_gateway_owner_response_required",
    ),
    NginxSource(
        config_id="host188_internal_tools_https",
        host="192.168.0.188",
        role="public_internal_tools_https",
        source_path="infra/ansible/roles/nginx/templates/188-internal-tools-https.conf.j2",
        # NOTE(review): this is a placeholder string, not a filesystem path;
        # presumably the live location still needs owner confirmation.
        live_path="owner_confirmation_required",
        control_tier="C0",
        owner_gate="public_tools_owner_response_required",
    ),
    NginxSource(
        config_id="host110_ollama_proxy",
        host="192.168.0.110",
        role="ollama_proxy_gateway",
        source_path="infra/ansible/roles/nginx/templates/110-ollama-proxy.conf.j2",
        live_path="/etc/nginx/sites-enabled/110-ollama-proxy.conf",
        control_tier="C1",
        owner_gate="ai_provider_proxy_owner_response_required",
    ),
]
def strip_comments(text: str) -> str:
    """Remove Nginx ``#`` comments from *text* while keeping its line layout.

    A ``#`` is treated as the start of a comment only where Nginx itself would
    start a new token: at the beginning of a line or right after whitespace,
    ``;``, ``{`` or ``}``. A ``#`` inside a single- or double-quoted string, or
    in the middle of a token (for example a URL fragment such as
    ``http://host/#frag``), is kept verbatim. Quoted strings may span several
    lines, so the quote state is carried from one line to the next.

    Args:
        text: Raw configuration text.

    Returns:
        The text with comments removed. Lines are split with
        ``str.splitlines`` and re-joined with a single newline, so every input
        line still yields one output line but a trailing newline is dropped.
    """
    kept_lines: list[str] = []
    quote = ""  # Active quote character, or "" when outside a quoted string.
    for line in text.splitlines():
        cut = len(line)
        at_token_start = True
        escaped = False  # A backslash escape never carries over a line break.
        for position, char in enumerate(line):
            if quote:
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == quote:
                    quote = ""
                    # Text glued to a closing quote is not a fresh token.
                    at_token_start = False
                continue
            if at_token_start and char == "#":
                cut = position
                break
            if at_token_start and char in "\"'":
                quote = char
                at_token_start = False
                continue
            at_token_start = char.isspace() or char in ";{}"
        kept_lines.append(line[:cut])
    return "\n".join(kept_lines)
def normalized_text(text: str) -> str:
    """Return *text* without comments, indentation, or blank lines.

    Comments are removed via ``strip_comments``; every remaining line is
    stripped of surrounding whitespace and empty lines are dropped, so purely
    cosmetic edits do not change the result.
    """
    compact: list[str] = []
    for raw_line in strip_comments(text).splitlines():
        trimmed = raw_line.strip()
        if trimmed:
            compact.append(trimmed)
    return "\n".join(compact)
def sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def match_closing_brace(text: str, open_brace: int) -> int:
    """Find the ``}`` that balances the ``{`` located at *open_brace*.

    Scanning starts at *open_brace* and tracks nesting depth: every ``{``
    increases it and every ``}`` decreases it.

    Returns:
        The index of the ``}`` that brings the depth back to zero, or ``-1``
        when the end of *text* is reached first.
    """
    nesting = 0
    position = open_brace
    limit = len(text)
    while position < limit:
        current = text[position]
        if current == "}":
            nesting -= 1
            if nesting == 0:
                return position
        elif current == "{":
            nesting += 1
        position += 1
    return -1
def named_blocks(text: str, name: str) -> list[tuple[str, str]]:
    """Extract every ``name ... { ... }`` block from *text*.

    Comments are stripped first. ``server`` blocks are matched without
    arguments; any other *name* must be followed by whitespace and at least
    one argument character before the opening brace. Note that *name* is
    inserted into the regular expression as-is.

    Returns:
        A list of ``(args, block_text)`` tuples in order of appearance.
        ``args`` is the stripped text between the name and ``{`` (always ``""``
        for ``server``); ``block_text`` runs from the name through the matching
        ``}``. Blocks whose closing brace cannot be found are skipped.
    """
    source = strip_comments(text)
    is_server = name == "server"
    header = re.compile(
        r"\bserver\s*\{" if is_server else rf"\b{name}\s+([^{{]+)\{{"
    )
    found: list[tuple[str, str]] = []
    for hit in header.finditer(source):
        start = hit.start()
        end = match_closing_brace(source, source.find("{", start))
        if end == -1:
            # Unbalanced braces: there is no well-defined block to return.
            continue
        label = "" if is_server else (hit.group(1) or "").strip()
        found.append((label, source[start : end + 1]))
    return found
def directive_values(block: str, directive: str) -> list[str]:
    """Return the argument text of every *directive* statement in *block*.

    A directive is recognised at the start of a line or directly after ``;``,
    ``{`` or ``}``, so single-line blocks such as
    ``location / { proxy_pass http://a; }`` are covered as well as the usual
    one-directive-per-line layout. The directive name must be followed by
    whitespace, which keeps ``ssl_certificate`` from matching
    ``ssl_certificate_key``.

    Args:
        block: Configuration text to search (nested blocks included).
        directive: Exact directive name; it is regex-escaped before use.

    Returns:
        One string per occurrence, in order, containing everything between the
        directive name and the next ``;`` with whitespace runs collapsed to a
        single space.
    """
    pattern = re.compile(
        # (?:^|(?<=[;{}])) = statement boundary: line start or just after a
        # terminator/brace. Braces are doubled because this is an f-string.
        rf"(?ms)(?:^|(?<=[;{{}}]))\s*{re.escape(directive)}\s+(.*?);"
    )
    return [" ".join(match.group(1).split()) for match in pattern.finditer(block)]
def split_words(value: str) -> list[str]:
    """Split *value* on whitespace and return the non-empty words in order."""
    return re.findall(r"\S+", value)
def location_entries(block: str) -> list[dict[str, Any]]:
    """Summarise every ``location`` block found inside *block*.

    Returns:
        One dict per location, in order of appearance, with the keys ``path``
        (the location arguments), ``proxy_passes``, ``roots``, ``auth_basic``
        (directive values found in the location body) and
        ``websocket_upgrade`` (True when the body contains one of the known
        upgrade header fragments).
    """
    upgrade_markers = (
        "Upgrade $http_upgrade",
        "Connection \"upgrade\"",
        "Connection $connection_upgrade",
    )
    return [
        {
            "path": location_args,
            "proxy_passes": directive_values(location_body, "proxy_pass"),
            "roots": directive_values(location_body, "root"),
            "auth_basic": directive_values(location_body, "auth_basic"),
            "websocket_upgrade": any(
                marker in location_body for marker in upgrade_markers
            ),
        }
        for location_args, location_body in named_blocks(block, "location")
    ]
def parse_nginx(text: str) -> dict[str, Any]:
    """Build a structured summary of the ``server`` blocks in *text*.

    For each server block the server names (``_`` excluded), ``listen``
    values, certificate paths, and location summaries are collected. Locations
    are additionally classified as admin routes (path contains ``/admin``),
    ACME routes (path contains ``.well-known/acme-challenge``) and websocket
    routes (location flagged with ``websocket_upgrade``).

    Returns:
        A dict holding the server block count, sorted de-duplicated aggregates
        across all servers, the three route lists, and the per-server
        summaries under ``servers``.
    """
    server_summaries: list[dict[str, Any]] = []
    seen_names: set[str] = set()
    seen_listens: set[str] = set()
    seen_upstreams: set[str] = set()
    seen_certs: set[str] = set()
    seen_keys: set[str] = set()
    routes_admin: list[dict[str, Any]] = []
    routes_acme: list[dict[str, Any]] = []
    routes_websocket: list[dict[str, Any]] = []

    server_blocks = named_blocks(text, "server")
    for position, (_, server_block) in enumerate(server_blocks, start=1):
        names: list[str] = []
        for value in directive_values(server_block, "server_name"):
            # "_" is the conventional catch-all name and is not reported.
            names.extend(
                word for word in split_words(value) if word and word != "_"
            )
        listens = directive_values(server_block, "listen")
        certs = directive_values(server_block, "ssl_certificate")
        keys = directive_values(server_block, "ssl_certificate_key")
        locations = location_entries(server_block)

        upstreams: list[str] = []
        for location in locations:
            upstreams.extend(location.get("proxy_passes", []))

        seen_names.update(names)
        seen_listens.update(listens)
        seen_upstreams.update(upstreams)
        seen_certs.update(certs)
        seen_keys.update(keys)

        for location in locations:
            route_path = str(location["path"])
            route = {
                "server_names": names,
                "path": route_path,
                "proxy_passes": location.get("proxy_passes", []),
                "roots": location.get("roots", []),
                "auth_basic": location.get("auth_basic", []),
            }
            # A single location may belong to several categories at once.
            if "/admin" in route_path:
                routes_admin.append(route)
            if ".well-known/acme-challenge" in route_path:
                routes_acme.append(route)
            if location.get("websocket_upgrade"):
                routes_websocket.append(route)

        # TLS is inferred from a certificate directive or a listen value
        # that contains "443".
        listens_on_443 = any("443" in item for item in listens)
        server_summaries.append(
            {
                "index": position,
                "server_names": names,
                "listens": listens,
                "ssl_certificates": certs,
                "ssl_certificate_keys": keys,
                "proxy_passes": upstreams,
                "locations": locations,
                "has_tls": bool(certs or listens_on_443),
            }
        )

    return {
        "server_block_count": len(server_summaries),
        "server_names": sorted(seen_names),
        "listens": sorted(seen_listens),
        "proxy_passes": sorted(seen_upstreams),
        "ssl_certificates": sorted(seen_certs),
        "ssl_certificate_keys": sorted(seen_keys),
        "admin_routes": routes_admin,
        "acme_routes": routes_acme,
        "websocket_routes": routes_websocket,
        "servers": server_summaries,
    }
def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit hash of the repository at *root*.

    This is best-effort metadata for the report: if ``git`` cannot be started
    or exits with a non-zero status, the string ``"unknown"`` is returned
    instead of raising.
    """
    command = ["git", "rev-parse", "--short", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=root,
            check=False,
            capture_output=True,
            text=True,
        )
    except Exception:
        # Deliberately broad: a missing git binary or bad cwd must not abort
        # report generation.
        return "unknown"
    if completed.returncode != 0:
        return "unknown"
    return completed.stdout.strip()
def read_source(path: Path) -> dict[str, Any]:
    """Read the config file at *path* (UTF-8) and summarise it.

    Returns:
        A dict with the SHA-256 of the raw text (``raw_sha256``), the SHA-256
        of the comment- and whitespace-normalised text (``normalized_sha256``),
        the number of lines (``line_count``) and the structured parse result
        (``parsed``).
    """
    content = path.read_text(encoding="utf-8")
    summary: dict[str, Any] = {"raw_sha256": sha256_text(content)}
    summary["normalized_sha256"] = sha256_text(normalized_text(content))
    summary["line_count"] = len(content.splitlines())
    summary["parsed"] = parse_nginx(content)
    return summary
def compare_sets(source_values: list[str], live_values: list[str]) -> dict[str, list[str]]:
    """Compare two value lists as sets.

    Returns:
        A dict with ``missing_in_live`` (values only in *source_values*) and
        ``extra_in_live`` (values only in *live_values*), each sorted.
    """
    in_source = frozenset(source_values)
    in_live = frozenset(live_values)
    only_in_source = sorted(in_source.difference(in_live))
    only_in_live = sorted(in_live.difference(in_source))
    return {"missing_in_live": only_in_source, "extra_in_live": only_in_live}
def compare_config(source: dict[str, Any], live: dict[str, Any]) -> dict[str, Any]:
    """Compare a repo-side summary with a live-side summary.

    Both arguments are dicts shaped like the result of ``read_source``. Drift
    is reported when the normalised hashes differ or when the server names,
    proxy_pass targets, or certificate paths differ as sets.

    Returns:
        A dict with ``normalized_hash_matches``, the per-field
        ``semantic_diff`` and the overall ``drift_detected`` flag.
    """
    compared_fields = ("server_names", "proxy_passes", "ssl_certificates")
    repo_view = source["parsed"]
    live_view = live["parsed"]
    hashes_equal = source["normalized_sha256"] == live["normalized_sha256"]

    field_diffs = {
        field: compare_sets(repo_view[field], live_view[field])
        for field in compared_fields
    }

    fields_differ = False
    for diff in field_diffs.values():
        if diff["missing_in_live"] or diff["extra_in_live"]:
            fields_differ = True
            break

    return {
        "normalized_hash_matches": hashes_equal,
        "semantic_diff": field_diffs,
        "drift_detected": fields_differ or not hashes_equal,
    }
def parse_live_files(items: list[str]) -> dict[str, Path]:
    """Parse repeated ``--live-file`` values of the form ``config_id=/path``.

    Each item is split at the first ``=`` and both sides are stripped of
    surrounding whitespace. When the same config_id appears more than once,
    the last occurrence wins.

    Args:
        items: Raw ``config_id=/path`` strings.

    Returns:
        A mapping from config_id to the given path.

    Raises:
        ValueError: If an item has no ``=``, or if its config_id or path is
            empty after stripping. An empty path would otherwise become
            ``Path(".")`` and be mistaken for an existing live file.
    """
    live_files: dict[str, Path] = {}
    for item in items:
        config_id, separator, raw_path = item.partition("=")
        config_id = config_id.strip()
        raw_path = raw_path.strip()
        if not (separator and config_id and raw_path):
            raise ValueError(f"--live-file 必須使用 config_id=/path 格式:{item}")
        live_files[config_id] = Path(raw_path)
    return live_files
def build_report(
    root: Path,
    live_files: dict[str, Path],
    generated_at: str | None,
) -> dict[str, Any]:
    """Assemble the drift report for every config listed in ``SOURCES``.

    For each config the repo source file under *root* is read and summarised.
    When *live_files* holds an entry for the config, that file is summarised
    too and compared against the repo source; otherwise the config is reported
    as repo-only. Only local files are read here; the values under
    ``execution_boundaries`` are fixed ``False`` literals.

    Args:
        root: Repository root used to resolve each ``source_path``.
        live_files: Mapping of config_id to an owner-provided live conf export.
        generated_at: Timestamp to embed verbatim; when falsy, the current
            time in the ``TAIPEI`` offset is used.

    Returns:
        The JSON-serialisable report dict.

    Raises:
        ValueError: If *live_files* contains a config_id that is not in
            ``SOURCES``. Ignoring it would silently produce a report without
            the comparison the caller asked for.
        OSError: If a repo source file cannot be read.
    """
    known_ids = {source.config_id for source in SOURCES}
    unknown_ids = sorted(set(live_files) - known_ids)
    if unknown_ids:
        raise ValueError(
            f"--live-file 指定了未知的 config_id:{', '.join(unknown_ids)}"
        )

    report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    configs: list[dict[str, Any]] = []
    drift_count = 0
    live_input_count = 0
    for source in SOURCES:
        source_path = root / source.source_path
        source_report = read_source(source_path)
        live_path = live_files.get(source.config_id)
        live_report: dict[str, Any] | None = None
        # Default outcome when no live export was supplied for this config.
        comparison: dict[str, Any] = {
            "status": "repo_only_no_live_evidence",
            "drift_detected": None,
            "note": "尚未提供 live conf 匯出檔;本階段不 SSH、不讀 live、不 reload。",
        }
        if live_path is not None:
            live_input_count += 1
            # is_file() rather than exists(): a directory cannot be read as
            # a config export and is reported as missing instead of crashing.
            if not live_path.is_file():
                comparison = {
                    "status": "live_file_missing",
                    "drift_detected": None,
                    "note": f"找不到 owner 提供的 live conf 匯出檔:{live_path}",
                }
            else:
                live_report = read_source(live_path)
                comparison = compare_config(source_report, live_report)
                comparison["status"] = (
                    "drift_detected" if comparison["drift_detected"] else "matched"
                )
                if comparison["drift_detected"]:
                    drift_count += 1
        configs.append(
            {
                "config_id": source.config_id,
                "host": source.host,
                "role": source.role,
                "control_tier": source.control_tier,
                "owner_gate": source.owner_gate,
                "repo_source_path": source.source_path,
                "live_path": source.live_path,
                "repo_source": source_report,
                "live_input": {
                    "provided": live_path is not None,
                    "path": str(live_path) if live_path else None,
                    "summary": live_report,
                },
                "comparison": comparison,
            }
        )
    return {
        "schema_version": "nginx_config_drift_detector_v1",
        "generated_at": report_time,
        "mode": "repo_only" if live_input_count == 0 else "compare_owner_provided_live_files",
        "git_commit": git_short_sha(root),
        "execution_boundaries": {
            "ssh_executed": False,
            "nginx_test_executed": False,
            "nginx_reload_executed": False,
            "host_write_executed": False,
            "runtime_gate_opened": False,
            "secret_value_collected": False,
        },
        "summary": {
            "source_config_count": len(configs),
            "live_input_count": live_input_count,
            "drift_detected_count": drift_count,
            "repo_source_inventory_complete": True,
            "live_evidence_collected": live_input_count > 0,
        },
        "configs": configs,
        "next_steps": [
            "由 owner 提供脫敏 live conf 匯出檔後重跑比較模式。",
            "若偵測 drift只建立 evidence 與 owner decision不自動覆寫 live。",
            "任何 Nginx reload 仍需 maintenance window、rollback owner、nginx -t 與 route smoke。",
        ],
    }
def main() -> int:
    """Run the command-line interface.

    Parses the arguments, builds the report and either writes it to
    ``--output`` (parent directories are created) or prints it to stdout.

    Returns:
        ``1`` when ``--fail-on-drift`` is set and drift was detected,
        otherwise ``0``. Invalid ``--live-file`` values and unreadable input
        files are reported through ``parser.error``, which exits with status
        2, so they cannot be mistaken for the "drift detected" status.
    """
    parser = argparse.ArgumentParser(description="IwoooS Nginx 只讀配置漂移偵測器")
    parser.add_argument("--root", default=".", help="repo root")
    parser.add_argument("--output", help="寫出 JSON 報告")
    parser.add_argument(
        "--live-file",
        action="append",
        default=[],
        help="owner 提供的 live conf 匯出檔格式config_id=/path/to/file",
    )
    parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    parser.add_argument("--fail-on-drift", action="store_true", help="偵測到 drift 時回傳 1")
    args = parser.parse_args()
    root = Path(args.root).resolve()
    try:
        live_files = parse_live_files(args.live_file)
        report = build_report(root, live_files, args.generated_at)
    except (OSError, ValueError) as exc:
        # An uncaught exception would exit with status 1, the same status
        # --fail-on-drift uses for real drift; parser.error exits with 2.
        parser.error(str(exc))
    payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    if args.output:
        output = Path(args.output)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(payload + "\n", encoding="utf-8")
    else:
        print(payload)
    if args.fail_on_drift and report["summary"]["drift_detected_count"] > 0:
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())