Files
awoooi/scripts/security/wazuh-readonly-production-readback.py

190 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
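Production readback acceptance check for the IwoooS Wazuh read-only API.

This tool only performs an HTTPS GET: it does not collect secrets, does not
connect to the Wazuh manager, does not trigger active response, and does not
touch any host. The default mode is meant for post-deployment acceptance, in
which the production API must not answer 404. If the release has not been
deployed yet, pass --allow-predeploy-404 to record that the route is still
not live.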
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Endpoint queried when --url is not given on the command line.
DEFAULT_URL: str = "https://awoooi.wooo.work/api/iwooos/wazuh"

# Value the response's "schema_version" field must equal.
EXPECTED_SCHEMA: str = "iwooos_wazuh_readonly_status_v1"

# Values of the response's "status" field that the readback accepts;
# any other value blocks the readback.
ALLOWED_STATUSES: set = {
    "disabled_waiting_iwooos_wazuh_owner_gate",
    "misconfigured_missing_server_side_wazuh_env",
    "wazuh_auth_token_missing",
    "wazuh_readonly_metadata_unavailable",
    "wazuh_agent_registry_empty",
    "wazuh_agent_registry_below_expected",
    "readonly_metadata_available",
}
# (pattern_id, compiled regex) pairs searched against the raw response body.
# A match on any of them blocks the readback; pattern_id is what gets reported.
FORBIDDEN_RESPONSE_PATTERNS = [
    (
        "private_ipv4",
        # Loopback / RFC 1918 style addresses. Every alternative spells out a
        # full four-octet address: "10" and "127" are one-octet prefixes and
        # need three more octets, whereas "172.16-31" and "192.168" are
        # two-octet prefixes and need two more. Requiring all four octets keeps
        # three-part strings (e.g. the tail of a public address such as
        # 8.10.1.2) from being reported as a private address.
        re.compile(
            r"\b(?:(?:10|127)\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])|192\.168)"
            r"\.\d{1,3}\.\d{1,3}\b"
        ),
    ),
    ("known_secret_shape", re.compile(r"Wooo-[0-9]{6,}")),
    (
        "token_like_field",
        # A JSON key named like a credential, regardless of its value.
        re.compile(r'"(?:token|password|secret|private_key|runner_token)"\s*:', re.IGNORECASE),
    ),
    (
        "raw_payload_marker",
        re.compile(r"raw[_ -]?(?:log|event|alert|body|request|response)", re.IGNORECASE),
    ),
    (
        "legacy_fake_soc_copy",
        re.compile(r"IWOOOS SOC Dashboard|Threat Blocked|Recent Automated Responses", re.IGNORECASE),
    ),
]
@dataclass(frozen=True)
class HttpResult:
    """Immutable record of a single HTTP exchange.

    Attributes:
        status_code: HTTP status code of the response.
        body: Response body as text.
        content_type: Value of the response's content-type header.
    """

    status_code: int
    body: str
    content_type: str
def fetch_url(url: str, timeout_seconds: float) -> HttpResult:
    """Issue one GET request and return the response, whatever its status.

    HTTP error statuses (4xx/5xx) are not treated as failures here: their
    status code, body and content type are returned so the caller can judge
    them. The body is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        url: Absolute URL to fetch.
        timeout_seconds: Timeout handed to ``urlopen``.

    Returns:
        An ``HttpResult`` describing the response.

    Raises:
        SystemExit: With a ``BLOCKED production readback ...`` message when
            the URL is malformed or the exchange fails at the transport level.
    """
    from http.client import HTTPException

    try:
        request = Request(
            url,
            headers={"Accept": "application/json", "User-Agent": "iwooos-wazuh-readback/1.0"},
        )
    except ValueError as error:
        # Request() rejects URLs without a scheme ("unknown url type").
        raise SystemExit(f"BLOCKED production readback invalid_url={error}") from error

    try:
        try:
            with urlopen(request, timeout=timeout_seconds) as response:
                body = response.read().decode("utf-8", errors="replace")
                return HttpResult(
                    status_code=response.status,
                    body=body,
                    content_type=response.headers.get("content-type", ""),
                )
        except HTTPError as error:
            # An HTTP error status still carries a readable response.
            body = error.read().decode("utf-8", errors="replace")
            return HttpResult(
                status_code=error.code,
                body=body,
                content_type=error.headers.get("content-type", ""),
            )
    except (OSError, HTTPException) as error:
        # URLError is an OSError. Catching the base class also covers failures
        # that happen while reading the body (timeouts, connection resets),
        # which urllib does not wrap in URLError; HTTPException covers
        # protocol-level failures such as an incomplete read.
        raise SystemExit(f"BLOCKED production readback network_error={error}") from error
def load_json_body(body: str) -> dict[str, Any]:
    """Parse a response body that must be a JSON object.

    Args:
        body: Response text to decode.

    Returns:
        The decoded JSON object as a dict.

    Raises:
        SystemExit: If the text is not valid JSON, or if the top-level JSON
            value is anything other than an object.
    """
    try:
        decoded = json.loads(body)
    except json.JSONDecodeError as error:
        raise SystemExit(f"BLOCKED production readback non_json_response: {error}") from error

    if isinstance(decoded, dict):
        return decoded
    raise SystemExit("BLOCKED production readback response_not_object")
def require_false(boundaries: dict[str, Any], key: str) -> None:
    """Insist that ``boundaries[key]`` is exactly the boolean ``False``.

    A missing key, ``None``, ``0`` or any other falsy value does not count.

    Raises:
        SystemExit: If the value is anything other than ``False``.
    """
    flag = boundaries.get(key)
    if flag is False:
        return
    raise SystemExit(f"BLOCKED production readback boundaries.{key}: expected false")
def require_zero(summary: dict[str, Any], key: str) -> None:
    """Insist that ``summary[key]`` is a numeric zero.

    Booleans are rejected explicitly: in Python ``False == 0`` is true, so a
    plain equality test would let a JSON ``false`` pass for a counter field.

    Raises:
        SystemExit: If the key is missing, the value is a boolean, or the
            value does not equal 0.
    """
    value = summary.get(key)
    if isinstance(value, bool) or value != 0:
        raise SystemExit(f"BLOCKED production readback summary.{key}: expected 0")
def validate_payload(result: HttpResult, *, allow_predeploy_404: bool) -> dict[str, Any]:
    """Check an HTTP result against the read-only contract and summarize it.

    Checks run in a fixed order and the first failure aborts:
    HTTP status, forbidden patterns in the raw body, JSON shape,
    ``schema_version`` / ``status`` / ``mode``, presence of the ``summary``
    and ``boundaries`` objects, zeroed counters, and boundary flags.

    Args:
        result: The HTTP response to validate.
        allow_predeploy_404: When true, a 404 whose JSON body has
            ``detail == "Not Found"`` is reported as
            ``predeploy_404_observed`` instead of blocking.

    Returns:
        A report dict with ``schema_version``, ``status``, ``http_status`` and
        ``runtime_gate_count``; a passed readback additionally carries
        ``api_status`` and ``configured``.

    Raises:
        SystemExit: With a ``BLOCKED ...`` message on the first failed check.
    """
    http_status = result.status_code

    if http_status == 404:
        if not allow_predeploy_404:
            raise SystemExit(
                "BLOCKED production readback returned 404; Wazuh FastAPI compatibility route is not deployed"
            )
        not_found = load_json_body(result.body)
        if not_found.get("detail") != "Not Found":
            raise SystemExit("BLOCKED predeploy readback 404 body is not FastAPI Not Found")
        return {
            "schema_version": "iwooos_wazuh_production_readback_v1",
            "status": "predeploy_404_observed",
            "http_status": http_status,
            "runtime_gate_count": 0,
        }

    if http_status not in {200, 502, 503}:
        raise SystemExit(f"BLOCKED production readback unexpected_http_status={http_status}")

    # Scan the raw text before parsing; the first pattern in list order wins.
    leaked_id = next(
        (
            pattern_id
            for pattern_id, pattern in FORBIDDEN_RESPONSE_PATTERNS
            if pattern.search(result.body)
        ),
        None,
    )
    if leaked_id is not None:
        raise SystemExit(f"BLOCKED production readback response leaked forbidden pattern {leaked_id}")

    payload = load_json_body(result.body)

    schema_version = payload.get("schema_version")
    if schema_version != EXPECTED_SCHEMA:
        raise SystemExit(f"BLOCKED production readback schema_version={schema_version!r}")

    api_status = payload.get("status")
    if api_status not in ALLOWED_STATUSES:
        raise SystemExit(f"BLOCKED production readback status={api_status!r}")

    mode = payload.get("mode")
    if mode != "metadata_only_no_active_response_no_raw_payload":
        raise SystemExit(f"BLOCKED production readback mode={mode!r}")

    summary = payload.get("summary")
    if not isinstance(summary, dict):
        raise SystemExit("BLOCKED production readback summary missing")

    boundaries = payload.get("boundaries")
    if not isinstance(boundaries, dict):
        raise SystemExit("BLOCKED production readback boundaries missing")

    # Counters that must all be zero.
    zero_counters = (
        "wazuh_manager_query_accepted_count",
        "wazuh_event_accepted_count",
        "host_forensics_accepted_count",
        "active_response_authorized_count",
        "host_write_authorized_count",
        "runtime_gate_count",
    )
    for counter in zero_counters:
        require_zero(summary, counter)

    # Flags that must all be exactly False.
    false_flags = (
        "active_response_authorized",
        "host_write_authorized",
        "secret_value_collection_allowed",
        "raw_wazuh_payload_storage_allowed",
        "agent_identity_public_display_allowed",
        "internal_ip_public_display_allowed",
    )
    for flag in false_flags:
        require_false(boundaries, flag)

    if boundaries.get("not_authorization") is not True:
        raise SystemExit("BLOCKED production readback boundaries.not_authorization: expected true")

    return {
        "schema_version": "iwooos_wazuh_production_readback_v1",
        "status": "production_readback_passed",
        "http_status": http_status,
        "api_status": payload["status"],
        "configured": bool(payload.get("configured")),
        "runtime_gate_count": summary["runtime_gate_count"],
    }
def main() -> int:
    """Parse the command line, run the readback, and print its outcome.

    Prints either a JSON report (``--json``) or a single summary line.

    Returns:
        0 once the readback has been validated; validation failures leave
        via ``SystemExit`` raised by the helpers instead of returning.
    """
    cli = argparse.ArgumentParser(description="IwoooS Wazuh 只讀 API production readback 驗收")
    cli.add_argument("--url", default=DEFAULT_URL, help="production Wazuh read-only API URL")
    cli.add_argument("--timeout-seconds", type=float, default=10.0)
    cli.add_argument(
        "--allow-predeploy-404",
        action="store_true",
        help="僅供尚未部署時記錄 404 現況;正式部署驗收不得使用",
    )
    cli.add_argument("--json", action="store_true", help="輸出 JSON 摘要")
    options = cli.parse_args()

    http_result = fetch_url(options.url, options.timeout_seconds)
    report = validate_payload(http_result, allow_predeploy_404=options.allow_predeploy_404)

    if options.json:
        output = json.dumps(report, ensure_ascii=False, sort_keys=True)
    else:
        output = (
            "WAZUH_READONLY_PRODUCTION_READBACK_OK "
            f"status={report['status']} "
            f"http={report['http_status']} "
            f"runtime_gate={report['runtime_gate_count']}"
        )
    print(output)
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_code = main()
    sys.exit(exit_code)