Files
awoooi/scripts/security/wazuh-managed-host-coverage-gate.py

289 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
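Wazuh managed-host coverage gate.
This tool only validates the sanitized snapshot stored in the repo; it does not connect to Wazuh, does not read secrets,
does not re-enroll agents, does not restart services, does not modify hosts, and does not enable active response.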
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
# Repo-relative location of the snapshot file; validate() and main() join it
# onto the chosen root directory before reading.
SNAPSHOT_PATH = Path(
    "docs/security/wazuh-managed-host-coverage-gate.snapshot.json"
)

# Schema identifier written into generated snapshots and required on validation.
SCHEMA_VERSION = "wazuh_managed_host_coverage_gate_v1"
# Per-host coverage expectations. Each row is
# (node_id, role, readback_status, next_gate); every host is recorded with
# manager_registry_accepted=False.
HOST_SCOPE_MATRIX: list[dict[str, Any]] = [
    {
        "node_id": node_id,
        "role": role,
        "readback_status": readback_status,
        "manager_registry_accepted": False,
        "next_gate": next_gate,
    }
    for node_id, role, readback_status, next_gate in (
        (
            "managed_core_node_a",
            "核心服務節點",
            "agent_active_transport_observed",
            "manager_registry_cross_check",
        ),
        (
            "managed_core_node_b",
            "資料服務節點",
            "agent_active_transport_observed",
            "manager_registry_cross_check",
        ),
        (
            "managed_dev_node_a",
            "開發工作節點",
            "no_agent_transport_observed",
            "agent_install_or_service_owner_decision",
        ),
        (
            "managed_dev_node_b",
            "開發工作節點",
            "ssh_readback_blocked",
            "read_only_access_or_owner_export",
        ),
        (
            "managed_control_node_a",
            "控制平面節點",
            "ssh_readback_blocked",
            "read_only_access_or_owner_export",
        ),
        (
            "managed_control_node_b",
            "控制平面節點",
            "ssh_readback_blocked",
            "read_only_access_or_owner_export",
        ),
    )
]
# Evidence identifiers, in the order the snapshot must list them. Generated
# snapshots record each one with accepted=False.
REQUIRED_EVIDENCE_BEFORE_GREEN: list[str] = [
    "manager_registry_agent_counts",
    "per_host_agent_scope_matrix",
    "dashboard_api_rbac_tls_repair_readback",
    "readonly_credential_metadata_without_secret",
    "owner_response_and_rollback_owner",
    "post_enable_iwooos_readback",
]
# Completion statements embedded verbatim in the snapshot under
# "forbidden_completion_claims"; the snapshot must list exactly these.
FORBIDDEN_COMPLETION_CLAIMS: list[str] = [
    "所有 Wazuh 用戶端已恢復",
    "所有主機已納入 Wazuh",
    "Wazuh agent registry 已驗收",
    "Dashboard 可見等於 registry 已恢復",
    "transport 連線等於全數納管",
]
# Action identifiers embedded verbatim in the snapshot under
# "forbidden_actions"; the snapshot must list exactly these, in this order.
FORBIDDEN_ACTIONS: list[str] = [
    "wazuh_agent_reenroll",
    "wazuh_agent_restart",
    "wazuh_manager_restart",
    "wazuh_dashboard_secret_patch",
    "active_response_enable",
    "host_write",
    "firewall_change",
    "nginx_reload",
    "kali_active_scan",
]
# Regexes run against every string value of the snapshot; a single hit blocks
# the gate.
FORBIDDEN_TEXT_PATTERNS = [
    # IPv4-looking text starting with 10., 127., 172.16-31. or 192.168.
    re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"),
    # HTTP Authorization header name.
    re.compile(r"Authorization\s*:", re.IGNORECASE),
    # Bearer / Basic credentials. Matching is case-insensitive, so ordinary
    # prose such as "basic authentication" also trips the Basic pattern.
    re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
    re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
    # Quoted password / token / cookie assignments.
    re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
    re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
    re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
    # Any mention of "client.keys" (any character accepted in place of the dot).
    re.compile(r"client\.keys", re.IGNORECASE),
    # PEM private-key header.
    re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
]
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Args:
        path: File to read.

    Returns:
        The decoded JSON document, guaranteed to be a ``dict``.

    Raises:
        SystemExit: With a ``BLOCKED`` message when the file cannot be read,
            is not valid UTF-8 JSON, or its top-level value is not an object.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except OSError as exc:
        raise SystemExit(f"BLOCKED load_json: cannot read {path}: {exc}") from exc
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        raise SystemExit(f"BLOCKED load_json: {path} is not valid UTF-8 JSON: {exc}") from exc
    # Callers use dict methods such as .get() on the result, so reject any
    # other JSON root here instead of failing later with AttributeError.
    if not isinstance(payload, dict):
        raise SystemExit(
            f"BLOCKED load_json: {path} must contain a JSON object, got {type(payload).__name__}"
        )
    return payload
def assert_equal(label: str, actual: Any, expected: Any) -> None:
    """Abort the gate unless *actual* compares equal to *expected*.

    Args:
        label: Name of the checked field, used in the failure message.
        actual: Value found in the snapshot.
        expected: Value the gate requires.

    Raises:
        SystemExit: With a ``BLOCKED`` message when ``actual != expected``.
    """
    differs = actual != expected
    if not differs:
        return
    raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
def assert_false(label: str, actual: Any) -> None:
    """Abort the gate unless *actual* is exactly the boolean ``False``.

    Args:
        label: Name of the checked field, used in the failure message.
        actual: Value found in the snapshot.

    Raises:
        SystemExit: With a ``BLOCKED`` message when *actual* is anything other
            than ``False`` (including ``0``, ``0.0``, ``None`` and ``""``).
    """
    # Identity, not equality: in Python ``0 == False`` and ``0.0 == False``,
    # so an equality check would accept numbers where a boolean is required.
    if actual is not False:
        raise SystemExit(f"BLOCKED {label}: expected {False!r}, got {actual!r}")
def assert_zero(label: str, actual: Any) -> None:
    """Abort the gate unless *actual* is the integer ``0``.

    Args:
        label: Name of the checked field, used in the failure message.
        actual: Value found in the snapshot.

    Raises:
        SystemExit: With a ``BLOCKED`` message when *actual* is not an integer
            zero (booleans and floats are rejected, as is ``None``).
    """
    # ``False == 0`` and ``0.0 == 0`` in Python, so an equality check alone
    # would accept a boolean or a float where an integer count is required.
    is_plain_int = isinstance(actual, int) and not isinstance(actual, bool)
    if not (is_plain_int and actual == 0):
        raise SystemExit(f"BLOCKED {label}: expected {0!r}, got {actual!r}")
def collect_string_values(value: Any) -> list[str]:
    """Return every string found in *value*, walking lists and dict values.

    Strings are returned in depth-first order. Dict keys are not collected,
    and any other type (numbers, booleans, ``None``, tuples, ...) contributes
    nothing.

    Args:
        value: A string, list, dict, or any other object.

    Returns:
        The strings encountered, in traversal order.
    """

    def _strings(node: Any):
        if isinstance(node, str):
            yield node
        elif isinstance(node, list):
            for element in node:
                yield from _strings(element)
        elif isinstance(node, dict):
            for element in node.values():
                yield from _strings(element)

    return list(_strings(value))
def validate_no_forbidden_text(snapshot: dict[str, Any]) -> None:
    """Abort the gate if any string value in *snapshot* looks sensitive.

    Every string returned by ``collect_string_values`` is searched with each
    regex in ``FORBIDDEN_TEXT_PATTERNS``. Dict keys are not scanned, because
    ``collect_string_values`` returns values only.

    Args:
        snapshot: Decoded snapshot document.

    Raises:
        SystemExit: With a fixed ``BLOCKED`` message on the first match. The
            message does not include the matching text.
    """
    for candidate in collect_string_values(snapshot):
        if any(pattern.search(candidate) for pattern in FORBIDDEN_TEXT_PATTERNS):
            raise SystemExit("BLOCKED wazuh_managed_host_coverage_gate: snapshot contains forbidden sensitive text")
def build_snapshot(generated_at: str) -> dict[str, Any]:
    """Build the snapshot document from the module constants.

    All counts, statuses and boundary flags are fixed literals; nothing is
    queried at runtime. The only caller-supplied value is *generated_at*.

    Args:
        generated_at: Timestamp string stored under ``"generated_at"``.

    Returns:
        A new dict ready for ``json.dumps``. Lists and dicts taken from module
        constants are copied, so mutating the result never alters the
        constants that ``validate`` compares against.
    """
    summary = {
        "expected_host_scope_count": len(HOST_SCOPE_MATRIX),
        "manager_service_active_observed_count": 1,
        "manager_api_unauthenticated_response_count": 1,
        "manager_transport_established_connection_count": 6,
        "direct_agent_active_observed_count": 2,
        "direct_agent_transport_observed_count": 2,
        "direct_agent_missing_or_no_transport_count": 1,
        "ssh_readback_blocked_count": 3,
        "manager_registry_accepted_count": 0,
        "dashboard_api_degraded_observed_count": 1,
        "live_metadata_env_enabled_count": 0,
        "active_response_authorized_count": 0,
        "host_write_authorized_count": 0,
        "agent_reenroll_authorized_count": 0,
        "agent_restart_authorized_count": 0,
        "runtime_gate_count": 0,
    }
    execution_boundaries = {
        "wazuh_api_live_query_authorized": False,
        "wazuh_agent_reenroll_authorized": False,
        "wazuh_agent_restart_authorized": False,
        "wazuh_manager_restart_authorized": False,
        "wazuh_active_response_authorized": False,
        "host_write_authorized": False,
        "secret_value_collection_allowed": False,
        "raw_wazuh_payload_storage_allowed": False,
        "kali_active_scan_authorized": False,
        "runtime_execution_authorized": False,
        "not_authorization": True,
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "generated_at": generated_at,
        "status": "blocked_waiting_full_host_registry_readback",
        "mode": "snapshot_only_no_runtime_no_secret_collection",
        "scope": "wazuh_managed_host_coverage",
        "summary": summary,
        # Copy each host entry (flat dicts, so a shallow copy is enough)
        # rather than handing out the module-level objects.
        "host_scope_matrix": [dict(entry) for entry in HOST_SCOPE_MATRIX],
        "required_evidence_before_green": [
            {"evidence_id": evidence_id, "accepted": False}
            for evidence_id in REQUIRED_EVIDENCE_BEFORE_GREEN
        ],
        "forbidden_completion_claims": list(FORBIDDEN_COMPLETION_CLAIMS),
        "forbidden_actions": list(FORBIDDEN_ACTIONS),
        "operator_interpretation": [
            "目前只能確認部分節點有 agent service 與 transportmanager registry 仍沒有可驗收讀回。",
            "Dashboard API、RBAC、rate-limit 或 TLS 退化會讓 UI 代理清單看起來消失,但不能用 UI 畫面單獨判定 agent 全部恢復。",
            "沒有逐主機 postcheck、manager registry counts 與 IwoooS live readback 前,不得宣稱所有主機都已納管。",
            "重新註冊 agent、重啟 Wazuh、修改主機或改機密都必須走獨立維護窗口與 rollback owner。",
        ],
        "execution_boundaries": execution_boundaries,
    }
# Boundary flags that build_snapshot() emits. validate() requires each of them
# to be present so that a snapshot cannot pass simply by omitting a flag.
_REQUIRED_EXECUTION_BOUNDARY_KEYS = (
    "wazuh_api_live_query_authorized",
    "wazuh_agent_reenroll_authorized",
    "wazuh_agent_restart_authorized",
    "wazuh_manager_restart_authorized",
    "wazuh_active_response_authorized",
    "host_write_authorized",
    "secret_value_collection_allowed",
    "raw_wazuh_payload_storage_allowed",
    "kali_active_scan_authorized",
    "runtime_execution_authorized",
    "not_authorization",
)


def validate(root: Path) -> None:
    """Validate the snapshot at ``root / SNAPSHOT_PATH`` against the gate.

    Checks, in order: header fields, summary counts, the host scope matrix,
    the required-evidence list, the forbidden claims/actions lists, the
    execution boundaries, and finally that no string value matches a
    forbidden text pattern.

    Args:
        root: Directory that ``SNAPSHOT_PATH`` is resolved against.

    Raises:
        SystemExit: With a ``BLOCKED`` message on the first failed check.
    """
    snapshot = load_json(root / SNAPSHOT_PATH)

    assert_equal("schema_version", snapshot.get("schema_version"), SCHEMA_VERSION)
    assert_equal("status", snapshot.get("status"), "blocked_waiting_full_host_registry_readback")
    assert_equal("mode", snapshot.get("mode"), "snapshot_only_no_runtime_no_secret_collection")
    assert_equal("scope", snapshot.get("scope"), "wazuh_managed_host_coverage")

    summary = snapshot.get("summary", {})
    if not isinstance(summary, dict):
        raise SystemExit(f"BLOCKED summary: expected an object, got {type(summary).__name__}")
    expected_counts = (
        ("expected_host_scope_count", len(HOST_SCOPE_MATRIX)),
        ("manager_service_active_observed_count", 1),
        ("manager_api_unauthenticated_response_count", 1),
        ("manager_transport_established_connection_count", 6),
        ("direct_agent_active_observed_count", 2),
        ("direct_agent_transport_observed_count", 2),
        ("direct_agent_missing_or_no_transport_count", 1),
        ("ssh_readback_blocked_count", 3),
    )
    for key, expected in expected_counts:
        assert_equal(f"summary.{key}", summary.get(key), expected)
    assert_zero("summary.manager_registry_accepted_count", summary.get("manager_registry_accepted_count"))
    assert_equal("summary.dashboard_api_degraded_observed_count", summary.get("dashboard_api_degraded_observed_count"), 1)
    for key in [
        "live_metadata_env_enabled_count",
        "active_response_authorized_count",
        "host_write_authorized_count",
        "agent_reenroll_authorized_count",
        "agent_restart_authorized_count",
        "runtime_gate_count",
    ]:
        assert_zero(f"summary.{key}", summary.get(key))

    assert_equal("host_scope_matrix", snapshot.get("host_scope_matrix"), HOST_SCOPE_MATRIX)
    # Checked per host even though the matrix already matched the constant:
    # this still blocks if the constant itself is ever edited to accept a host.
    for item in snapshot.get("host_scope_matrix", []):
        assert_false(f"host_scope_matrix.{item.get('node_id')}.manager_registry_accepted", item.get("manager_registry_accepted"))

    required = snapshot.get("required_evidence_before_green", [])
    if not isinstance(required, list) or not all(isinstance(item, dict) for item in required):
        raise SystemExit("BLOCKED required_evidence_before_green: expected a list of objects")
    assert_equal("required_evidence_before_green.count", len(required), len(REQUIRED_EVIDENCE_BEFORE_GREEN))
    assert_equal(
        "required_evidence_before_green.ids",
        [item.get("evidence_id") for item in required],
        REQUIRED_EVIDENCE_BEFORE_GREEN,
    )
    for item in required:
        assert_false(f"required_evidence_before_green.{item.get('evidence_id')}.accepted", item.get("accepted"))

    assert_equal("forbidden_completion_claims", snapshot.get("forbidden_completion_claims"), FORBIDDEN_COMPLETION_CLAIMS)
    assert_equal("forbidden_actions", snapshot.get("forbidden_actions"), FORBIDDEN_ACTIONS)

    boundaries = snapshot.get("execution_boundaries", {})
    if not isinstance(boundaries, dict):
        raise SystemExit(f"BLOCKED execution_boundaries: expected an object, got {type(boundaries).__name__}")
    # Iterating only the keys that happen to be present would let a missing or
    # empty mapping pass vacuously, so require the full expected key set first.
    missing = [key for key in _REQUIRED_EXECUTION_BOUNDARY_KEYS if key not in boundaries]
    if missing:
        raise SystemExit(f"BLOCKED execution_boundaries: missing required keys {missing!r}")
    # Every key present, including any beyond the required set, must be False;
    # "not_authorization" is the single flag that must be True.
    for key, value in boundaries.items():
        if key == "not_authorization":
            assert_equal(f"execution_boundaries.{key}", value, True)
        else:
            assert_false(f"execution_boundaries.{key}", value)

    validate_no_forbidden_text(snapshot)
def main() -> None:
    """Command-line entry point for the coverage gate.

    Options:
        --root: Directory the snapshot path is resolved against (default: cwd).
        --output: If given, write a freshly built snapshot to this path
            (relative paths are joined onto --root) before validating.
        --generated-at: Timestamp stored in a freshly built snapshot.
        --json: Print the validated snapshot as JSON instead of the summary.

    Validation and the printed result always use ``root / SNAPSHOT_PATH``,
    whatever path --output pointed at.
    """
    parser = argparse.ArgumentParser(description="Wazuh 主機納管覆蓋 Gate")
    parser.add_argument("--root", type=Path, default=Path.cwd())
    parser.add_argument("--output", type=Path)
    parser.add_argument("--generated-at", default="2026-06-25T11:45:31+08:00")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    root = args.root.resolve()

    if args.output:
        rendered = json.dumps(
            build_snapshot(args.generated_at),
            ensure_ascii=False,
            indent=2,
            sort_keys=True,
        )
        destination = args.output if args.output.is_absolute() else root / args.output
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    validate(root)
    snapshot = load_json(root / SNAPSHOT_PATH)

    if args.json:
        print(json.dumps(snapshot, ensure_ascii=False, sort_keys=True))
    else:
        summary = snapshot["summary"]
        print(
            "WAZUH_MANAGED_HOST_COVERAGE_GATE_OK "
            f"scope={summary['expected_host_scope_count']} "
            f"direct_active={summary['direct_agent_active_observed_count']} "
            f"no_transport={summary['direct_agent_missing_or_no_transport_count']} "
            f"ssh_blocked={summary['ssh_readback_blocked_count']} "
            f"registry={summary['manager_registry_accepted_count']} "
            f"runtime_gate={summary['runtime_gate_count']}"
        )


# Run the gate only when this file is executed as a script.
if __name__ == "__main__":
    main()