Files
awoooi/scripts/security/wazuh-agent-visibility-runtime-gate.py
ogt 6ca53fafc9
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m44s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
feat(iwooos): gate Wazuh dashboard API readiness
2026-06-25 18:04:58 +08:00

246 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
檢查 Wazuh agent visibility runtime gate 的 no-false-green 邊界。
本 guard 只驗證 repo 內的脫敏 snapshot不連線 Wazuh、不讀 secret、
不重新註冊 agent、不啟用 active response也不做任何 runtime 寫入。
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
SNAPSHOT_PATH = Path("docs/security/wazuh-agent-visibility-runtime-gate.snapshot.json")
SCHEMA_VERSION = "wazuh_agent_visibility_runtime_gate_v1"
FORBIDDEN_TEXT_PATTERNS = [
re.compile(r"Authorization\s*:", re.IGNORECASE),
re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
]
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def assert_equal(label: str, actual: Any, expected: Any) -> None:
if actual != expected:
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
def assert_false(label: str, actual: Any) -> None:
assert_equal(label, actual, False)
def assert_true(label: str, actual: Any) -> None:
assert_equal(label, actual, True)
def assert_zero(label: str, actual: Any) -> None:
assert_equal(label, actual, 0)
def assert_at_least(label: str, actual: Any, minimum: int) -> None:
if not isinstance(actual, int) or actual < minimum:
raise SystemExit(f"BLOCKED {label}: expected >= {minimum!r}, got {actual!r}")
def collect_string_values(value: Any) -> list[str]:
if isinstance(value, str):
return [value]
if isinstance(value, list):
values: list[str] = []
for item in value:
values.extend(collect_string_values(item))
return values
if isinstance(value, dict):
values = []
for item in value.values():
values.extend(collect_string_values(item))
return values
return []
def validate_no_secret_values(snapshot: dict[str, Any]) -> None:
for text in collect_string_values(snapshot):
for pattern in FORBIDDEN_TEXT_PATTERNS:
if pattern.search(text):
raise SystemExit("BLOCKED wazuh_agent_visibility_runtime_gate: snapshot contains forbidden secret-shaped text")
def validate_required_evidence(snapshot: dict[str, Any]) -> None:
required = snapshot.get("required_evidence_before_green", [])
required_ids = {item.get("evidence_id") for item in required if isinstance(item, dict)}
expected_ids = {
"manager_agent_registry_counts",
"iwooos_live_route_readback",
"dashboard_api_check_repaired_or_explained",
"readonly_account_scope",
"owner_response",
}
missing = sorted(expected_ids - required_ids)
if missing:
raise SystemExit(f"BLOCKED wazuh_agent_visibility_runtime_gate.required_evidence: missing {missing!r}")
for item in required:
assert_equal(
f"wazuh_agent_visibility_runtime_gate.required_evidence.{item.get('evidence_id')}.accepted",
item.get("accepted"),
False,
)
def validate(root: Path) -> None:
path = root / SNAPSHOT_PATH
snapshot = load_json(path)
assert_equal("wazuh_agent_visibility_runtime_gate.schema_version", snapshot.get("schema_version"), SCHEMA_VERSION)
assert_equal(
"wazuh_agent_visibility_runtime_gate.status",
snapshot.get("status"),
"blocked_waiting_manager_agent_registry_readback",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.mode",
snapshot.get("mode"),
"snapshot_only_no_runtime_no_secret_collection",
)
assert_zero("wazuh_agent_visibility_runtime_gate.runtime_gate_count", snapshot.get("runtime_gate_count"))
assert_false(
"wazuh_agent_visibility_runtime_gate.manager_agent_registry_readback_passed",
snapshot.get("manager_agent_registry_readback_passed"),
)
assert_false(
"wazuh_agent_visibility_runtime_gate.iwooos_live_route_readback_passed",
snapshot.get("iwooos_live_route_readback_passed"),
)
assert_false(
"wazuh_agent_visibility_runtime_gate.dashboard_agent_list_recovered",
snapshot.get("dashboard_agent_list_recovered"),
)
assert_false(
"wazuh_agent_visibility_runtime_gate.active_response_authorized",
snapshot.get("active_response_authorized"),
)
assert_false("wazuh_agent_visibility_runtime_gate.host_write_authorized", snapshot.get("host_write_authorized"))
assert_false(
"wazuh_agent_visibility_runtime_gate.secret_value_collection_allowed",
snapshot.get("secret_value_collection_allowed"),
)
assert_true(
"wazuh_agent_visibility_runtime_gate.manager_services_active_observed",
snapshot.get("manager_services_active_observed"),
)
assert_true(
"wazuh_agent_visibility_runtime_gate.agent_transport_connected_observed",
snapshot.get("agent_transport_connected_observed"),
)
assert_at_least(
"wazuh_agent_visibility_runtime_gate.manager_transport_established_connection_count",
snapshot.get("manager_transport_established_connection_count"),
1,
)
assert_true(
"wazuh_agent_visibility_runtime_gate.dashboard_api_degraded_observed",
snapshot.get("dashboard_api_degraded_observed"),
)
assert_true(
"wazuh_agent_visibility_runtime_gate.dashboard_startup_check_observed",
snapshot.get("dashboard_startup_check_observed"),
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_api_connection_check_status",
snapshot.get("dashboard_api_connection_check_status"),
"pending_or_spinning",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_api_version_check_status",
snapshot.get("dashboard_api_version_check_status"),
"not_verified",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_alerts_index_pattern_check_status",
snapshot.get("dashboard_alerts_index_pattern_check_status"),
"ok",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_monitoring_index_pattern_check_status",
snapshot.get("dashboard_monitoring_index_pattern_check_status"),
"ok",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_statistics_index_pattern_check_status",
snapshot.get("dashboard_statistics_index_pattern_check_status"),
"ok",
)
assert_equal(
"wazuh_agent_visibility_runtime_gate.dashboard_index_pattern_ok_count",
snapshot.get("dashboard_index_pattern_ok_count"),
3,
)
assert_zero(
"wazuh_agent_visibility_runtime_gate.dashboard_api_connection_ok_count",
snapshot.get("dashboard_api_connection_ok_count"),
)
assert_zero(
"wazuh_agent_visibility_runtime_gate.dashboard_api_version_ok_count",
snapshot.get("dashboard_api_version_ok_count"),
)
for key in [
"dashboard_stored_api_unreachable_observed",
"dashboard_api_login_500_observed",
"dashboard_api_rate_limited_observed",
"dashboard_api_run_as_permission_error_observed",
"dashboard_api_tls_client_cert_unknown_observed",
"manager_registry_cli_permission_blocked",
"manager_registry_cli_requires_privilege",
]:
assert_true(f"wazuh_agent_visibility_runtime_gate.{key}", snapshot.get(key))
assert_equal(
"wazuh_agent_visibility_runtime_gate.production_route_http_status",
snapshot.get("production_route_http_status"),
200,
)
expected_error_codes = {400, 429, 500}
actual_error_codes = set(snapshot.get("dashboard_error_codes_observed", []))
missing_error_codes = sorted(expected_error_codes - actual_error_codes)
if missing_error_codes:
raise SystemExit(f"BLOCKED wazuh_agent_visibility_runtime_gate.dashboard_error_codes_observed: missing {missing_error_codes!r}")
validate_required_evidence(snapshot)
validate_no_secret_values(snapshot)
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--root", type=Path, default=Path.cwd())
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
root = args.root.resolve()
validate(root)
snapshot = load_json(root / SNAPSHOT_PATH)
if args.json:
print(json.dumps(snapshot, ensure_ascii=False, indent=2))
return
print(
"WAZUH_AGENT_VISIBILITY_RUNTIME_GATE_OK "
f"registry=0 route={snapshot['production_route_http_status']} "
f"transport={snapshot['manager_transport_established_connection_count']} "
f"dashboard_degraded={int(snapshot['dashboard_api_degraded_observed'])} "
f"api_connection={snapshot['dashboard_api_connection_check_status']} "
f"index_ok={snapshot['dashboard_index_pattern_ok_count']} "
f"runtime_gate={snapshot['runtime_gate_count']}"
)
if __name__ == "__main__":
main()