412 lines
14 KiB
Python
412 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
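IwoooS read-only Nginx configuration drift detector.

This tool only reads the Nginx source-of-truth files inside the repo, or live
conf export files supplied separately by the owner; it does not SSH, does not
reload, does not write to the host, and does not trigger a deployment.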
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# Fixed UTC+08:00 offset used to timestamp generated reports.
TAIPEI = timezone(offset=timedelta(hours=8))
|
||
|
||
|
||
@dataclass(frozen=True)
class NginxSource:
    """Immutable descriptor of one tracked Nginx configuration.

    Every field is a plain string. Instances are frozen, so attributes
    cannot be reassigned after construction.
    """

    # Identifier of this configuration entry.
    config_id: str
    # Address of the host the configuration belongs to.
    host: str
    # Short label describing what the configuration is used for.
    role: str
    # Location of the tracked source file, relative to the repo root.
    source_path: str
    # Location of the deployed file on the host, or a placeholder label
    # when that location has not been confirmed.
    live_path: str
    # Control tier label (for example "C0" or "C1").
    control_tier: str
    # Label of the owner gate associated with this configuration.
    owner_gate: str
|
||
|
||
|
||
# Inventory of the Nginx configurations tracked by this tool, in report order.
SOURCES = [
    NginxSource(**spec)
    for spec in (
        {
            "config_id": "host188_all_sites",
            "host": "192.168.0.188",
            "role": "public_gateway_all_sites",
            "source_path": "infra/ansible/roles/nginx/templates/188-all-sites.conf.j2",
            "live_path": "/etc/nginx/sites-enabled/all-sites.conf",
            "control_tier": "C0",
            "owner_gate": "public_gateway_owner_response_required",
        },
        {
            "config_id": "host188_internal_tools_https",
            "host": "192.168.0.188",
            "role": "public_internal_tools_https",
            "source_path": "infra/ansible/roles/nginx/templates/188-internal-tools-https.conf.j2",
            # The deployed location has not been confirmed for this entry.
            "live_path": "owner_confirmation_required",
            "control_tier": "C0",
            "owner_gate": "public_tools_owner_response_required",
        },
        {
            "config_id": "host110_ollama_proxy",
            "host": "192.168.0.110",
            "role": "ollama_proxy_gateway",
            "source_path": "infra/ansible/roles/nginx/templates/110-ollama-proxy.conf.j2",
            "live_path": "/etc/nginx/sites-enabled/110-ollama-proxy.conf",
            "control_tier": "C1",
            "owner_gate": "ai_provider_proxy_owner_response_required",
        },
    )
]
|
||
|
||
|
||
def strip_comments(text: str) -> str:
    """Remove ``#`` comments from Nginx configuration text, line by line.

    A ``#`` starts a comment only when it is outside a quoted string and is
    not escaped with a backslash, so a directive such as
    ``add_header X "a#b";`` keeps its full value and its terminating ``;``.
    Quote state is tracked per line; a quoted string that spans several
    lines is not recognised (a deliberate simplification).

    Args:
        text: Raw configuration text.

    Returns:
        The text with comments removed. Lines are re-joined with ``"\\n"``;
        whitespace that preceded a comment is kept, and a trailing newline
        in the input is not preserved.
    """
    lines: list[str] = []
    for line in text.splitlines():
        quote = ""  # the quote character currently open, if any
        escaped = False  # True when the previous character was a backslash
        cut = len(line)
        for pos, char in enumerate(line):
            if escaped:
                # The character after a backslash is always literal.
                escaped = False
            elif char == "\\":
                escaped = True
            elif quote:
                if char == quote:
                    quote = ""
            elif char in "\"'":
                quote = char
            elif char == "#":
                cut = pos
                break
        lines.append(line[:cut])
    return "\n".join(lines)
|
||
|
||
|
||
def normalized_text(text: str) -> str:
    """Return *text* reduced to its comment-free, whitespace-trimmed lines.

    Comments are removed with ``strip_comments``; each remaining line is
    stripped of leading and trailing whitespace, and lines that end up empty
    are dropped. The surviving lines are joined with ``"\\n"``.
    """
    kept: list[str] = []
    for raw_line in strip_comments(text).splitlines():
        trimmed = raw_line.strip()
        if trimmed:
            kept.append(trimmed)
    return "\n".join(kept)
|
||
|
||
|
||
def sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
||
|
||
|
||
def match_closing_brace(text: str, open_brace: int) -> int:
    """Find the ``}`` that balances the ``{`` located at *open_brace*.

    Scans forward from *open_brace*, counting nested braces.

    Args:
        text: Text to scan.
        open_brace: Index of the opening ``{``.

    Returns:
        Index of the balancing ``}``, or ``-1`` when the end of *text* is
        reached first.
    """
    nesting = 0
    position = open_brace
    end = len(text)
    while position < end:
        current = text[position]
        if current == "}":
            nesting -= 1
            if nesting == 0:
                return position
        elif current == "{":
            nesting += 1
        position += 1
    return -1
|
||
|
||
|
||
def named_blocks(text: str, name: str) -> list[tuple[str, str]]:
    """Extract every ``name ... { ... }`` block from Nginx configuration text.

    Comments are stripped first. Blocks nested inside another block of the
    same name are returned as separate entries as well, because the search
    resumes right after each opening brace.

    Args:
        text: Configuration text to search.
        name: Block keyword, for example ``"server"`` or ``"location"``.

    Returns:
        A list of ``(args, block_text)`` tuples in order of appearance.
        ``args`` is the stripped text between the keyword and ``{`` (always
        ``""`` for ``"server"``); ``block_text`` runs from the keyword to the
        matching ``}``. Blocks whose closing brace cannot be found are
        skipped.
    """
    clean = strip_comments(text)
    if name == "server":
        pattern = re.compile(r"\bserver\s*\{")
    else:
        # The arguments may not contain ';' or '}', so an earlier directive
        # that merely mentions the keyword cannot swallow everything up to
        # the next '{'. The keyword is escaped so it is matched literally.
        pattern = re.compile(rf"\b{re.escape(name)}\s+([^{{}};]+)\{{")

    blocks: list[tuple[str, str]] = []
    for match in pattern.finditer(clean):
        # Both patterns end on the opening brace.
        open_brace = match.end() - 1
        close_brace = match_closing_brace(clean, open_brace)
        if close_brace == -1:
            continue
        args = "" if name == "server" else match.group(1).strip()
        blocks.append((args, clean[match.start() : close_brace + 1]))
    return blocks
|
||
|
||
|
||
def directive_values(block: str, directive: str) -> list[str]:
    """Collect the values of every occurrence of *directive* in *block*.

    A directive is recognised at the start of a line and also directly after
    ``;``, ``{`` or ``}``, so one-line blocks such as
    ``location / { proxy_pass http://a; }`` are covered. A ``{`` that is the
    second half of ``{{`` is ignored: the tracked sources are ``.j2``
    templates, and a template variable named like a directive must not be
    read as one.

    Args:
        block: Configuration text, expected to be comment-free.
        directive: Exact directive name, e.g. ``"listen"``.

    Returns:
        One string per occurrence, in order, holding the text between the
        directive name and the next ``;`` with runs of whitespace collapsed
        to single spaces.
    """
    pattern = re.compile(
        rf"(?ms)(?:^|(?<=[;}}])|(?<=\{{)(?<!\{{\{{))"
        rf"\s*{re.escape(directive)}\s+(.*?);"
    )
    return [" ".join(match.group(1).split()) for match in pattern.finditer(block)]
|
||
|
||
|
||
def split_words(value: str) -> list[str]:
    """Split *value* on runs of whitespace and return the non-empty pieces."""
    tokens = re.split(r"\s+", value.strip())
    return list(filter(None, tokens))
|
||
|
||
|
||
def _has_websocket_upgrade(body: str) -> bool:
    """Report whether *body* contains WebSocket upgrade header settings.

    Any amount of whitespace may separate the header name from its value, so
    column-aligned ``proxy_set_header`` lines are detected. The ``Connection``
    value may be ``$connection_upgrade`` or the word ``upgrade`` in any
    letter case, with or without quotes.
    """
    return bool(
        re.search(r"Upgrade\s+\$http_upgrade", body)
        or re.search(
            r"Connection\s+(?:\$connection_upgrade|[\"']?(?i:upgrade))", body
        )
    )


def location_entries(block: str) -> list[dict[str, Any]]:
    """Summarise every ``location`` block found in *block*.

    Args:
        block: Configuration text, typically a single ``server`` block.

    Returns:
        One dict per location, in order of appearance, with the keys
        ``path`` (the location arguments), ``proxy_passes``, ``roots`` and
        ``auth_basic`` (lists of directive values found in the location
        text) and ``websocket_upgrade`` (bool).
    """
    return [
        {
            "path": args,
            "proxy_passes": directive_values(body, "proxy_pass"),
            "roots": directive_values(body, "root"),
            "auth_basic": directive_values(body, "auth_basic"),
            "websocket_upgrade": _has_websocket_upgrade(body),
        }
        for args, body in named_blocks(block, "location")
    ]
|
||
|
||
|
||
def parse_nginx(text: str) -> dict[str, Any]:
    """Build a structured summary of the ``server`` blocks in *text*.

    Args:
        text: Nginx configuration text.

    Returns:
        A dict with ``server_block_count``; the sorted, de-duplicated
        ``server_names``, ``listens``, ``proxy_passes``, ``ssl_certificates``
        and ``ssl_certificate_keys`` pooled over all server blocks; the lists
        ``admin_routes``, ``acme_routes`` and ``websocket_routes``; and
        ``servers``, one detail dict per server block in order of appearance.
    """
    # Values pooled across all server blocks; sorted when the result is built.
    pooled: dict[str, set[str]] = {
        key: set()
        for key in (
            "server_names",
            "listens",
            "proxy_passes",
            "ssl_certificates",
            "ssl_certificate_keys",
        )
    }
    # Route buckets keep the order in which locations are encountered.
    routes: dict[str, list[dict[str, Any]]] = {
        "admin_routes": [],
        "acme_routes": [],
        "websocket_routes": [],
    }
    servers: list[dict[str, Any]] = []

    position = 0
    for _args, server_block in named_blocks(text, "server"):
        position += 1  # 1-based index of the server block

        # "_" is skipped so it never shows up as a server name.
        names: list[str] = []
        for value in directive_values(server_block, "server_name"):
            names.extend(
                word for word in split_words(value) if word and word != "_"
            )

        listens = directive_values(server_block, "listen")
        ssl_certs = directive_values(server_block, "ssl_certificate")
        ssl_keys = directive_values(server_block, "ssl_certificate_key")
        locations = location_entries(server_block)

        proxy_passes: list[str] = []
        for location in locations:
            proxy_passes.extend(location.get("proxy_passes", []))

        pooled["server_names"].update(names)
        pooled["listens"].update(listens)
        pooled["proxy_passes"].update(proxy_passes)
        pooled["ssl_certificates"].update(ssl_certs)
        pooled["ssl_certificate_keys"].update(ssl_keys)

        for location in locations:
            path = str(location["path"])
            entry = {
                "server_names": names,
                "path": path,
                "proxy_passes": location.get("proxy_passes", []),
                "roots": location.get("roots", []),
                "auth_basic": location.get("auth_basic", []),
            }
            # A location can fall into more than one bucket.
            if "/admin" in path:
                routes["admin_routes"].append(entry)
            if ".well-known/acme-challenge" in path:
                routes["acme_routes"].append(entry)
            if location.get("websocket_upgrade"):
                routes["websocket_routes"].append(entry)

        # TLS is assumed when a certificate is configured or any listen
        # value contains the substring "443".
        has_tls = bool(ssl_certs) or any("443" in item for item in listens)
        servers.append(
            {
                "index": position,
                "server_names": names,
                "listens": listens,
                "ssl_certificates": ssl_certs,
                "ssl_certificate_keys": ssl_keys,
                "proxy_passes": proxy_passes,
                "locations": locations,
                "has_tls": has_tls,
            }
        )

    summary: dict[str, Any] = {"server_block_count": len(servers)}
    for key, values in pooled.items():
        summary[key] = sorted(values)
    summary.update(routes)
    summary["servers"] = servers
    return summary
|
||
|
||
|
||
def git_short_sha(root: Path) -> str:
    """Return the abbreviated hash of ``HEAD`` for the repository at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working directory.
    This is best effort: any failure yields the string ``"unknown"`` instead
    of raising.
    """
    command = ["git", "rev-parse", "--short", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
        return completed.stdout.strip()
    except Exception:
        return "unknown"
|
||
|
||
|
||
def read_source(path: Path) -> dict[str, Any]:
    """Read a configuration file and return its hashes and parsed summary.

    Args:
        path: File to read as UTF-8 text.

    Returns:
        A dict with ``raw_sha256`` (hash of the file text), ``normalized_sha256``
        (hash of the normalized text), ``line_count`` and ``parsed`` (the
        result of ``parse_nginx``).
    """
    content = path.read_text(encoding="utf-8")
    summary: dict[str, Any] = {}
    summary["raw_sha256"] = sha256_text(content)
    summary["normalized_sha256"] = sha256_text(normalized_text(content))
    summary["line_count"] = len(content.splitlines())
    summary["parsed"] = parse_nginx(content)
    return summary
|
||
|
||
|
||
def compare_sets(source_values: list[str], live_values: list[str]) -> dict[str, list[str]]:
    """Compare two value lists as sets.

    Returns:
        A dict with ``missing_in_live`` (values only in *source_values*) and
        ``extra_in_live`` (values only in *live_values*), each sorted.
    """
    in_source = frozenset(source_values)
    in_live = frozenset(live_values)
    only_in_source = in_source.difference(in_live)
    only_in_live = in_live.difference(in_source)
    return {
        "missing_in_live": sorted(only_in_source),
        "extra_in_live": sorted(only_in_live),
    }
|
||
|
||
|
||
def compare_config(source: dict[str, Any], live: dict[str, Any]) -> dict[str, Any]:
    """Compare the summaries of a repo source and a live export.

    Both arguments are dicts with at least the keys ``parsed`` and
    ``normalized_sha256``.

    Returns:
        A dict with ``normalized_hash_matches``, ``semantic_diff`` (set
        differences for ``server_names``, ``proxy_passes`` and
        ``ssl_certificates``) and ``drift_detected``, which is true when the
        normalized hashes differ or any set difference is non-empty.
    """
    repo_view = source["parsed"]
    live_view = live["parsed"]
    hashes_equal = source["normalized_sha256"] == live["normalized_sha256"]

    semantic_diff = {
        field: compare_sets(repo_view[field], live_view[field])
        for field in ("server_names", "proxy_passes", "ssl_certificates")
    }

    differs = False
    for delta in semantic_diff.values():
        if delta["missing_in_live"] or delta["extra_in_live"]:
            differs = True
            break

    return {
        "normalized_hash_matches": hashes_equal,
        "semantic_diff": semantic_diff,
        "drift_detected": differs or not hashes_equal,
    }
|
||
|
||
|
||
def parse_live_files(items: list[str]) -> dict[str, Path]:
    """Parse ``config_id=/path`` arguments into a mapping.

    Whitespace around the identifier and the path is ignored. Only the first
    ``=`` separates the two parts, so the path itself may contain ``=``. When
    the same identifier appears more than once, the last entry wins.

    Args:
        items: Raw ``--live-file`` values.

    Returns:
        Mapping from config identifier to the supplied path.

    Raises:
        ValueError: If an item has no ``=``, or if its identifier or path is
            empty. An empty path would otherwise become ``Path(".")``, which
            exists and is not a readable file.
    """
    live_files: dict[str, Path] = {}
    for item in items:
        config_id, separator, raw_path = item.partition("=")
        config_id = config_id.strip()
        raw_path = raw_path.strip()
        if not (separator and config_id and raw_path):
            raise ValueError(f"--live-file 必須使用 config_id=/path 格式:{item}")
        live_files[config_id] = Path(raw_path)
    return live_files
|
||
|
||
|
||
def build_report(
    root: Path,
    live_files: dict[str, Path],
    generated_at: str | None,
) -> dict[str, Any]:
    """Assemble the drift report for every entry in ``SOURCES``.

    Each repo source is read and summarised. When a live export was supplied
    for a config it is read and compared against the repo source; otherwise
    the config is reported as repo-only. Entries in *live_files* whose key
    matches no ``SOURCES`` entry are ignored.

    Args:
        root: Repository root that the source paths are resolved against.
        live_files: Mapping from config identifier to a live export path.
        generated_at: Timestamp to record as-is; when empty or ``None`` the
            current time in the ``TAIPEI`` timezone is used.

    Returns:
        The report as a JSON-serialisable dict.

    Raises:
        OSError: If a repo source file cannot be read.
    """
    report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    configs: list[dict[str, Any]] = []
    drift_count = 0
    # NOTE(review): counts every supplied live path, including ones that turn
    # out to be missing — confirm this is what the summary should mean.
    live_input_count = 0

    for source in SOURCES:
        source_report = read_source(root / source.source_path)
        live_path = live_files.get(source.config_id)
        live_report: dict[str, Any] | None = None
        comparison: dict[str, Any] = {
            "status": "repo_only_no_live_evidence",
            "drift_detected": None,
            "note": "尚未提供 live conf 匯出檔;本階段不 SSH、不讀 live、不 reload。",
        }

        if live_path is not None:
            live_input_count += 1
            # is_file() rather than exists(): a directory exists but cannot
            # be read as text, so it is reported as missing instead of
            # raising while reading.
            if not live_path.is_file():
                comparison = {
                    "status": "live_file_missing",
                    "drift_detected": None,
                    "note": f"找不到 owner 提供的 live conf 匯出檔:{live_path}",
                }
            else:
                live_report = read_source(live_path)
                comparison = compare_config(source_report, live_report)
                comparison["status"] = (
                    "drift_detected" if comparison["drift_detected"] else "matched"
                )
                if comparison["drift_detected"]:
                    drift_count += 1

        configs.append(
            {
                "config_id": source.config_id,
                "host": source.host,
                "role": source.role,
                "control_tier": source.control_tier,
                "owner_gate": source.owner_gate,
                "repo_source_path": source.source_path,
                "live_path": source.live_path,
                "repo_source": source_report,
                "live_input": {
                    "provided": live_path is not None,
                    "path": str(live_path) if live_path else None,
                    "summary": live_report,
                },
                "comparison": comparison,
            }
        )

    return {
        "schema_version": "nginx_config_drift_detector_v1",
        "generated_at": report_time,
        "mode": "repo_only" if live_input_count == 0 else "compare_owner_provided_live_files",
        "git_commit": git_short_sha(root),
        # This function only reads local files and asks git for a hash, so
        # every boundary flag is recorded as False.
        "execution_boundaries": {
            "ssh_executed": False,
            "nginx_test_executed": False,
            "nginx_reload_executed": False,
            "host_write_executed": False,
            "runtime_gate_opened": False,
            "secret_value_collected": False,
        },
        "summary": {
            "source_config_count": len(configs),
            "live_input_count": live_input_count,
            "drift_detected_count": drift_count,
            "repo_source_inventory_complete": True,
            "live_evidence_collected": live_input_count > 0,
        },
        "configs": configs,
        "next_steps": [
            "由 owner 提供脫敏 live conf 匯出檔後重跑比較模式。",
            "若偵測 drift,只建立 evidence 與 owner decision,不自動覆寫 live。",
            "任何 Nginx reload 仍需 maintenance window、rollback owner、nginx -t 與 route smoke。",
        ],
    }
|
||
|
||
|
||
def main() -> int:
    """Run the command-line interface.

    Parses the arguments, builds the report and writes it as JSON to the
    ``--output`` file (creating parent directories) or to standard output.

    Returns:
        ``1`` when ``--fail-on-drift`` is set and at least one config shows
        drift, otherwise ``0``. A malformed ``--live-file`` value ends the
        program through ``parser.error`` (usage message, exit status 2).
    """
    parser = argparse.ArgumentParser(description="IwoooS Nginx 只讀配置漂移偵測器")
    parser.add_argument("--root", default=".", help="repo root")
    parser.add_argument("--output", help="寫出 JSON 報告")
    parser.add_argument(
        "--live-file",
        action="append",
        default=[],
        help="owner 提供的 live conf 匯出檔,格式:config_id=/path/to/file",
    )
    parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    parser.add_argument("--fail-on-drift", action="store_true", help="偵測到 drift 時回傳 1")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    try:
        live_files = parse_live_files(args.live_file)
    except ValueError as exc:
        # Report bad user input as a usage error instead of a traceback.
        parser.error(str(exc))
    report = build_report(root, live_files, args.generated_at)
    # sort_keys keeps the serialised report stable between runs.
    payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)

    if args.output:
        output = Path(args.output)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(payload + "\n", encoding="utf-8")
    else:
        print(payload)

    if args.fail_on_drift and report["summary"]["drift_detected_count"] > 0:
        return 1
    return 0
|
||
|
||
|
||
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|