#!/usr/bin/env python3 """ 檢查 Wazuh agent visibility runtime gate 的 no-false-green 邊界。 本 guard 只驗證 repo 內的脫敏 snapshot,不連線 Wazuh、不讀 secret、 不重新註冊 agent、不啟用 active response,也不做任何 runtime 寫入。 """ from __future__ import annotations import argparse import json import re from pathlib import Path from typing import Any SNAPSHOT_PATH = Path("docs/security/wazuh-agent-visibility-runtime-gate.snapshot.json") SCHEMA_VERSION = "wazuh_agent_visibility_runtime_gate_v1" FORBIDDEN_TEXT_PATTERNS = [ re.compile(r"Authorization\s*:", re.IGNORECASE), re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE), re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE), re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE), re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE), re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE), re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"), ] def load_json(path: Path) -> dict[str, Any]: return json.loads(path.read_text(encoding="utf-8")) def assert_equal(label: str, actual: Any, expected: Any) -> None: if actual != expected: raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}") def assert_false(label: str, actual: Any) -> None: assert_equal(label, actual, False) def assert_true(label: str, actual: Any) -> None: assert_equal(label, actual, True) def assert_zero(label: str, actual: Any) -> None: assert_equal(label, actual, 0) def assert_at_least(label: str, actual: Any, minimum: int) -> None: if not isinstance(actual, int) or actual < minimum: raise SystemExit(f"BLOCKED {label}: expected >= {minimum!r}, got {actual!r}") def collect_string_values(value: Any) -> list[str]: if isinstance(value, str): return [value] if isinstance(value, list): values: list[str] = [] for item in value: values.extend(collect_string_values(item)) return values if isinstance(value, dict): values = [] for item in value.values(): values.extend(collect_string_values(item)) return values return [] def validate_no_secret_values(snapshot: dict[str, Any]) -> None: for text in collect_string_values(snapshot): for pattern in FORBIDDEN_TEXT_PATTERNS: if pattern.search(text): raise SystemExit("BLOCKED wazuh_agent_visibility_runtime_gate: snapshot contains forbidden secret-shaped text") def validate_required_evidence(snapshot: dict[str, Any]) -> None: required = snapshot.get("required_evidence_before_green", []) required_ids = {item.get("evidence_id") for item in required if isinstance(item, dict)} expected_ids = { "manager_agent_registry_counts", "iwooos_live_route_readback", "dashboard_api_check_repaired_or_explained", "readonly_account_scope", "owner_response", } missing = sorted(expected_ids - required_ids) if missing: raise SystemExit(f"BLOCKED wazuh_agent_visibility_runtime_gate.required_evidence: missing {missing!r}") for item in required: assert_equal( f"wazuh_agent_visibility_runtime_gate.required_evidence.{item.get('evidence_id')}.accepted", item.get("accepted"), False, ) def validate(root: Path) -> None: path = root / SNAPSHOT_PATH snapshot = load_json(path) assert_equal("wazuh_agent_visibility_runtime_gate.schema_version", snapshot.get("schema_version"), SCHEMA_VERSION) assert_equal( "wazuh_agent_visibility_runtime_gate.status", snapshot.get("status"), "blocked_waiting_manager_agent_registry_readback", ) assert_equal( "wazuh_agent_visibility_runtime_gate.mode", snapshot.get("mode"), "snapshot_only_no_runtime_no_secret_collection", ) assert_zero("wazuh_agent_visibility_runtime_gate.runtime_gate_count", snapshot.get("runtime_gate_count")) assert_false( "wazuh_agent_visibility_runtime_gate.manager_agent_registry_readback_passed", snapshot.get("manager_agent_registry_readback_passed"), ) assert_false( "wazuh_agent_visibility_runtime_gate.iwooos_live_route_readback_passed", snapshot.get("iwooos_live_route_readback_passed"), ) assert_false( "wazuh_agent_visibility_runtime_gate.dashboard_agent_list_recovered", snapshot.get("dashboard_agent_list_recovered"), ) assert_false( "wazuh_agent_visibility_runtime_gate.active_response_authorized", snapshot.get("active_response_authorized"), ) assert_false("wazuh_agent_visibility_runtime_gate.host_write_authorized", snapshot.get("host_write_authorized")) assert_false( "wazuh_agent_visibility_runtime_gate.secret_value_collection_allowed", snapshot.get("secret_value_collection_allowed"), ) assert_true( "wazuh_agent_visibility_runtime_gate.manager_services_active_observed", snapshot.get("manager_services_active_observed"), ) assert_true( "wazuh_agent_visibility_runtime_gate.agent_transport_connected_observed", snapshot.get("agent_transport_connected_observed"), ) assert_at_least( "wazuh_agent_visibility_runtime_gate.manager_transport_established_connection_count", snapshot.get("manager_transport_established_connection_count"), 1, ) assert_true( "wazuh_agent_visibility_runtime_gate.dashboard_api_degraded_observed", snapshot.get("dashboard_api_degraded_observed"), ) assert_true( "wazuh_agent_visibility_runtime_gate.dashboard_startup_check_observed", snapshot.get("dashboard_startup_check_observed"), ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_api_connection_check_status", snapshot.get("dashboard_api_connection_check_status"), "pending_or_spinning", ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_api_version_check_status", snapshot.get("dashboard_api_version_check_status"), "not_verified", ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_alerts_index_pattern_check_status", snapshot.get("dashboard_alerts_index_pattern_check_status"), "ok", ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_monitoring_index_pattern_check_status", snapshot.get("dashboard_monitoring_index_pattern_check_status"), "ok", ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_statistics_index_pattern_check_status", snapshot.get("dashboard_statistics_index_pattern_check_status"), "ok", ) assert_equal( "wazuh_agent_visibility_runtime_gate.dashboard_index_pattern_ok_count", snapshot.get("dashboard_index_pattern_ok_count"), 3, ) assert_zero( "wazuh_agent_visibility_runtime_gate.dashboard_api_connection_ok_count", snapshot.get("dashboard_api_connection_ok_count"), ) assert_zero( "wazuh_agent_visibility_runtime_gate.dashboard_api_version_ok_count", snapshot.get("dashboard_api_version_ok_count"), ) for key in [ "dashboard_stored_api_unreachable_observed", "dashboard_api_login_500_observed", "dashboard_api_rate_limited_observed", "dashboard_api_run_as_permission_error_observed", "dashboard_api_tls_client_cert_unknown_observed", "manager_registry_cli_permission_blocked", "manager_registry_cli_requires_privilege", ]: assert_true(f"wazuh_agent_visibility_runtime_gate.{key}", snapshot.get(key)) assert_equal( "wazuh_agent_visibility_runtime_gate.production_route_http_status", snapshot.get("production_route_http_status"), 200, ) expected_error_codes = {400, 429, 500} actual_error_codes = set(snapshot.get("dashboard_error_codes_observed", [])) missing_error_codes = sorted(expected_error_codes - actual_error_codes) if missing_error_codes: raise SystemExit(f"BLOCKED wazuh_agent_visibility_runtime_gate.dashboard_error_codes_observed: missing {missing_error_codes!r}") validate_required_evidence(snapshot) validate_no_secret_values(snapshot) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--root", type=Path, default=Path.cwd()) parser.add_argument("--json", action="store_true") args = parser.parse_args() root = args.root.resolve() validate(root) snapshot = load_json(root / SNAPSHOT_PATH) if args.json: print(json.dumps(snapshot, ensure_ascii=False, indent=2)) return print( "WAZUH_AGENT_VISIBILITY_RUNTIME_GATE_OK " f"registry=0 route={snapshot['production_route_http_status']} " f"transport={snapshot['manager_transport_established_connection_count']} " f"dashboard_degraded={int(snapshot['dashboard_api_degraded_observed'])} " f"api_connection={snapshot['dashboard_api_connection_check_status']} " f"index_ok={snapshot['dashboard_index_pattern_ok_count']} " f"runtime_gate={snapshot['runtime_gate_count']}" ) if __name__ == "__main__": main()