docs(iwooos): guard Wazuh agent visibility incident
This commit is contained in:
@@ -111,6 +111,10 @@ def validate(root: Path) -> None:
|
||||
str(root / "scripts" / "security" / "wazuh-readonly-live-metadata-env-gate.py")
|
||||
)
|
||||
wazuh_readonly_live_metadata_env_gate["validate"](root)
|
||||
wazuh_agent_visibility_runtime_gate = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "wazuh-agent-visibility-runtime-gate.py")
|
||||
)
|
||||
wazuh_agent_visibility_runtime_gate["validate"](root)
|
||||
telegram_alert_readability_guard = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "telegram-alert-readability-guard.py")
|
||||
)
|
||||
|
||||
175
scripts/security/wazuh-agent-visibility-runtime-gate.py
Normal file
175
scripts/security/wazuh-agent-visibility-runtime-gate.py
Normal file
@@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
檢查 Wazuh agent visibility runtime gate 的 no-false-green 邊界。
|
||||
|
||||
本 guard 只驗證 repo 內的脫敏 snapshot,不連線 Wazuh、不讀 secret、
|
||||
不重新註冊 agent、不啟用 active response,也不做任何 runtime 寫入。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
SNAPSHOT_PATH = Path("docs/security/wazuh-agent-visibility-runtime-gate.snapshot.json")
|
||||
SCHEMA_VERSION = "wazuh_agent_visibility_runtime_gate_v1"
|
||||
|
||||
FORBIDDEN_TEXT_PATTERNS = [
|
||||
re.compile(r"Authorization\s*:", re.IGNORECASE),
|
||||
re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
|
||||
re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
|
||||
re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
|
||||
]
|
||||
|
||||
|
||||
def load_json(path: Path) -> dict[str, Any]:
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def assert_equal(label: str, actual: Any, expected: Any) -> None:
|
||||
if actual != expected:
|
||||
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
|
||||
|
||||
|
||||
def assert_false(label: str, actual: Any) -> None:
|
||||
assert_equal(label, actual, False)
|
||||
|
||||
|
||||
def assert_true(label: str, actual: Any) -> None:
|
||||
assert_equal(label, actual, True)
|
||||
|
||||
|
||||
def assert_zero(label: str, actual: Any) -> None:
|
||||
assert_equal(label, actual, 0)
|
||||
|
||||
|
||||
def collect_string_values(value: Any) -> list[str]:
|
||||
if isinstance(value, str):
|
||||
return [value]
|
||||
if isinstance(value, list):
|
||||
values: list[str] = []
|
||||
for item in value:
|
||||
values.extend(collect_string_values(item))
|
||||
return values
|
||||
if isinstance(value, dict):
|
||||
values = []
|
||||
for item in value.values():
|
||||
values.extend(collect_string_values(item))
|
||||
return values
|
||||
return []
|
||||
|
||||
|
||||
def validate_no_secret_values(snapshot: dict[str, Any]) -> None:
|
||||
for text in collect_string_values(snapshot):
|
||||
for pattern in FORBIDDEN_TEXT_PATTERNS:
|
||||
if pattern.search(text):
|
||||
raise SystemExit("BLOCKED wazuh_agent_visibility_runtime_gate: snapshot contains forbidden secret-shaped text")
|
||||
|
||||
|
||||
def validate_required_evidence(snapshot: dict[str, Any]) -> None:
|
||||
required = snapshot.get("required_evidence_before_green", [])
|
||||
required_ids = {item.get("evidence_id") for item in required if isinstance(item, dict)}
|
||||
expected_ids = {
|
||||
"manager_agent_registry_counts",
|
||||
"iwooos_live_route_readback",
|
||||
"dashboard_api_check_repaired_or_explained",
|
||||
"readonly_account_scope",
|
||||
"owner_response",
|
||||
}
|
||||
missing = sorted(expected_ids - required_ids)
|
||||
if missing:
|
||||
raise SystemExit(f"BLOCKED wazuh_agent_visibility_runtime_gate.required_evidence: missing {missing!r}")
|
||||
|
||||
for item in required:
|
||||
assert_equal(
|
||||
f"wazuh_agent_visibility_runtime_gate.required_evidence.{item.get('evidence_id')}.accepted",
|
||||
item.get("accepted"),
|
||||
False,
|
||||
)
|
||||
|
||||
|
||||
def validate(root: Path) -> None:
|
||||
path = root / SNAPSHOT_PATH
|
||||
snapshot = load_json(path)
|
||||
assert_equal("wazuh_agent_visibility_runtime_gate.schema_version", snapshot.get("schema_version"), SCHEMA_VERSION)
|
||||
assert_equal(
|
||||
"wazuh_agent_visibility_runtime_gate.status",
|
||||
snapshot.get("status"),
|
||||
"blocked_waiting_manager_agent_registry_readback",
|
||||
)
|
||||
assert_equal(
|
||||
"wazuh_agent_visibility_runtime_gate.mode",
|
||||
snapshot.get("mode"),
|
||||
"snapshot_only_no_runtime_no_secret_collection",
|
||||
)
|
||||
assert_zero("wazuh_agent_visibility_runtime_gate.runtime_gate_count", snapshot.get("runtime_gate_count"))
|
||||
assert_false(
|
||||
"wazuh_agent_visibility_runtime_gate.manager_agent_registry_readback_passed",
|
||||
snapshot.get("manager_agent_registry_readback_passed"),
|
||||
)
|
||||
assert_false(
|
||||
"wazuh_agent_visibility_runtime_gate.iwooos_live_route_readback_passed",
|
||||
snapshot.get("iwooos_live_route_readback_passed"),
|
||||
)
|
||||
assert_false(
|
||||
"wazuh_agent_visibility_runtime_gate.dashboard_agent_list_recovered",
|
||||
snapshot.get("dashboard_agent_list_recovered"),
|
||||
)
|
||||
assert_false(
|
||||
"wazuh_agent_visibility_runtime_gate.active_response_authorized",
|
||||
snapshot.get("active_response_authorized"),
|
||||
)
|
||||
assert_false("wazuh_agent_visibility_runtime_gate.host_write_authorized", snapshot.get("host_write_authorized"))
|
||||
assert_false(
|
||||
"wazuh_agent_visibility_runtime_gate.secret_value_collection_allowed",
|
||||
snapshot.get("secret_value_collection_allowed"),
|
||||
)
|
||||
assert_true(
|
||||
"wazuh_agent_visibility_runtime_gate.manager_services_active_observed",
|
||||
snapshot.get("manager_services_active_observed"),
|
||||
)
|
||||
assert_true(
|
||||
"wazuh_agent_visibility_runtime_gate.agent_transport_connected_observed",
|
||||
snapshot.get("agent_transport_connected_observed"),
|
||||
)
|
||||
assert_true(
|
||||
"wazuh_agent_visibility_runtime_gate.dashboard_api_degraded_observed",
|
||||
snapshot.get("dashboard_api_degraded_observed"),
|
||||
)
|
||||
assert_equal(
|
||||
"wazuh_agent_visibility_runtime_gate.production_route_http_status",
|
||||
snapshot.get("production_route_http_status"),
|
||||
404,
|
||||
)
|
||||
validate_required_evidence(snapshot)
|
||||
validate_no_secret_values(snapshot)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--root", type=Path, default=Path.cwd())
|
||||
parser.add_argument("--json", action="store_true")
|
||||
args = parser.parse_args()
|
||||
root = args.root.resolve()
|
||||
validate(root)
|
||||
snapshot = load_json(root / SNAPSHOT_PATH)
|
||||
if args.json:
|
||||
print(json.dumps(snapshot, ensure_ascii=False, indent=2))
|
||||
return
|
||||
print(
|
||||
"WAZUH_AGENT_VISIBILITY_RUNTIME_GATE_OK "
|
||||
f"registry=0 route={snapshot['production_route_http_status']} "
|
||||
f"dashboard_degraded={int(snapshot['dashboard_api_degraded_observed'])} "
|
||||
f"runtime_gate={snapshot['runtime_gate_count']}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user