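#!/usr/bin/env python3
"""
IwoooS Wazuh read-only API production readback acceptance check.

This tool only performs an HTTPS GET: it does not collect secrets, does not
connect to the Wazuh manager, does not trigger active response, and does not
touch any host. The default mode is meant for post-deployment acceptance: the
production API must not answer 404.
If the release has not been deployed yet, pass --allow-predeploy-404 to record
that the endpoint is still not live.
"""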
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from typing import Any
|
||
from urllib.error import HTTPError, URLError
|
||
from urllib.request import Request, urlopen
|
||
|
||
|
||
# Endpoint used when --url is not given on the command line.
DEFAULT_URL = "https://awoooi.wooo.work/api/iwooos/wazuh"

# Value the API payload's "schema_version" field is compared against.
EXPECTED_SCHEMA = "iwooos_wazuh_readonly_status_v1"

# Closed set of values accepted for the API payload's "status" field.
ALLOWED_STATUSES = {
    "disabled_waiting_iwooos_wazuh_owner_gate",
    "misconfigured_missing_server_side_wazuh_env",
    "wazuh_auth_token_missing",
    "wazuh_readonly_metadata_unavailable",
    "wazuh_agent_registry_empty",
    "wazuh_agent_registry_below_expected",
    "readonly_metadata_available",
}

# (pattern id, regex source, regex flags) for text that must never appear in
# the raw response body.
# NOTE(review): in "private_ipv4" the 10 and 127 alternatives are followed by
# only two more octets, so three-octet strings such as "10.1.2" also match,
# while the 172.16-31 and 192.168 alternatives need four octets. Confirm this
# extra strictness is intended before changing it.
_FORBIDDEN_PATTERN_SOURCES = (
    ("private_ipv4", r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b", 0),
    ("known_secret_shape", r"Wooo-[0-9]{6,}", 0),
    ("token_like_field", r'"(?:token|password|secret|private_key|runner_token)"\s*:', re.IGNORECASE),
    ("raw_payload_marker", r"raw[_ -]?(?:log|event|alert|body|request|response)", re.IGNORECASE),
    (
        "legacy_fake_soc_copy",
        r"IWOOOS SOC Dashboard|Threat Blocked|Recent Automated Responses",
        re.IGNORECASE,
    ),
)

# Compiled once at import time; consumers iterate (pattern_id, compiled_regex).
FORBIDDEN_RESPONSE_PATTERNS = [
    (pattern_id, re.compile(source, flags))
    for pattern_id, source, flags in _FORBIDDEN_PATTERN_SOURCES
]
|
||
|
||
|
||
@dataclass(frozen=True)
class HttpResult:
    """Immutable record of one HTTP response: status, body text and content type."""

    # Numeric HTTP status code of the response.
    status_code: int
    # Response body as text.
    body: str
    # Value of the response's content-type header.
    content_type: str
|
||
|
||
|
||
def fetch_url(url: str, timeout_seconds: float) -> HttpResult:
    """Perform one GET against *url* and capture the response.

    HTTP error statuses (4xx/5xx) are not treated as failures here: their
    status, body and content type are returned so the caller can judge them.

    Args:
        url: Absolute ``http``/``https`` URL to fetch.
        timeout_seconds: Timeout in seconds passed to ``urlopen``.

    Returns:
        An ``HttpResult`` with the status code, the body decoded as UTF-8
        (undecodable bytes replaced) and the ``content-type`` header value
        (``""`` when the header is absent).

    Raises:
        SystemExit: With a ``BLOCKED`` message when the URL is malformed, uses
            a scheme other than http/https, or the request fails at the
            network level (connection, TLS, timeout, reset while reading).
    """
    headers = {"Accept": "application/json", "User-Agent": "iwooos-wazuh-readback/1.0"}
    try:
        request = Request(url, headers=headers)
    except ValueError as error:
        # Request() raises ValueError ("unknown url type") for URLs without a scheme.
        raise SystemExit(f"BLOCKED production readback invalid_url={url!r}") from error

    # urlopen would also open file:// or ftp:// URLs; this tool is HTTP(S)-only.
    if request.type not in {"http", "https"}:
        raise SystemExit(f"BLOCKED production readback unsupported_url_scheme={request.type!r}")

    try:
        try:
            response = urlopen(request, timeout=timeout_seconds)
            status_code = response.status
        except HTTPError as error:
            # A 4xx/5xx answer is still a readable, file-like response.
            response = error
            status_code = error.code
        # Close the response in both the success and the HTTP-error case.
        with response:
            body = response.read().decode("utf-8", errors="replace")
            content_type = response.headers.get("content-type", "")
    except OSError as error:
        # URLError is an OSError; so are timeouts and resets raised while
        # reading the body, which URLError alone would not cover.
        raise SystemExit(f"BLOCKED production readback network_error={error}") from error

    return HttpResult(status_code=status_code, body=body, content_type=content_type)
|
||
|
||
|
||
def load_json_body(body: str) -> dict[str, Any]:
    """Decode *body* as JSON and insist that the top-level value is an object.

    Args:
        body: Response body text.

    Returns:
        The decoded JSON object as a ``dict``.

    Raises:
        SystemExit: With a ``BLOCKED`` message when *body* is not valid JSON
            or decodes to anything other than a JSON object.
    """
    try:
        decoded = json.loads(body)
    except json.JSONDecodeError as error:
        raise SystemExit(f"BLOCKED production readback non_json_response: {error}") from error

    if isinstance(decoded, dict):
        return decoded
    raise SystemExit("BLOCKED production readback response_not_object")
|
||
|
||
|
||
def require_false(boundaries: dict[str, Any], key: str) -> None:
    """Ensure ``boundaries[key]`` is exactly the boolean ``False``.

    Missing keys and falsy non-boolean values (``0``, ``None``, ``""``) are
    rejected as well, because the comparison is by identity.

    Raises:
        SystemExit: With a ``BLOCKED`` message naming the offending key.
    """
    flag = boundaries.get(key)
    if flag is False:
        return
    raise SystemExit(f"BLOCKED production readback boundaries.{key}: expected false")
|
||
|
||
|
||
def require_zero(summary: dict[str, Any], key: str) -> None:
    """Ensure the counter ``summary[key]`` is present and equal to zero.

    Args:
        summary: The ``summary`` object of the API payload.
        key: Name of the counter that must be zero.

    Raises:
        SystemExit: With a ``BLOCKED`` message naming the offending key when
            the value is missing, non-zero, or a boolean.
    """
    value = summary.get(key)
    # bool is an int subclass and ``False == 0``, so a JSON ``false`` would
    # otherwise slip through a check that is documented as "expected 0".
    if isinstance(value, bool) or value != 0:
        raise SystemExit(f"BLOCKED production readback summary.{key}: expected 0")
|
||
|
||
|
||
def validate_payload(result: HttpResult, *, allow_predeploy_404: bool) -> dict[str, Any]:
    """Check a fetched response against the read-only contract and build the report.

    Checks run in a fixed order and the first violation aborts: HTTP status,
    forbidden text in the raw body, JSON shape, schema/status/mode fields,
    zeroed summary counters, ``False`` boundary flags, and finally
    ``boundaries.not_authorization`` being ``True``.

    Args:
        result: The HTTP response to validate.
        allow_predeploy_404: When true, a 404 whose JSON body has
            ``detail == "Not Found"`` is reported as "predeploy_404_observed"
            instead of being rejected.

    Returns:
        A report dict with ``schema_version``, ``status``, ``http_status`` and
        ``runtime_gate_count``; a passed production readback additionally
        carries ``api_status`` and ``configured``.

    Raises:
        SystemExit: With a ``BLOCKED`` message describing the first violation.
    """
    http_status = result.status_code

    if http_status == 404:
        if not allow_predeploy_404:
            raise SystemExit("BLOCKED production readback returned 404; Wazuh FastAPI compatibility route is not deployed")
        # Pre-deploy mode: only a JSON body with detail == "Not Found" is accepted.
        not_found = load_json_body(result.body)
        if not_found.get("detail") != "Not Found":
            raise SystemExit("BLOCKED predeploy readback 404 body is not FastAPI Not Found")
        return {
            "schema_version": "iwooos_wazuh_production_readback_v1",
            "status": "predeploy_404_observed",
            "http_status": http_status,
            "runtime_gate_count": 0,
        }

    if http_status not in {200, 502, 503}:
        raise SystemExit(f"BLOCKED production readback unexpected_http_status={http_status}")

    # Scan the raw text before parsing; report the first pattern (in list order) that hits.
    leaked_id = next(
        (pattern_id for pattern_id, pattern in FORBIDDEN_RESPONSE_PATTERNS if pattern.search(result.body)),
        None,
    )
    if leaked_id is not None:
        raise SystemExit(f"BLOCKED production readback response leaked forbidden pattern {leaked_id}")

    payload = load_json_body(result.body)

    schema_version = payload.get("schema_version")
    if schema_version != EXPECTED_SCHEMA:
        raise SystemExit(f"BLOCKED production readback schema_version={schema_version!r}")

    api_status = payload.get("status")
    if api_status not in ALLOWED_STATUSES:
        raise SystemExit(f"BLOCKED production readback status={api_status!r}")

    mode = payload.get("mode")
    if mode != "metadata_only_no_active_response_no_raw_payload":
        raise SystemExit(f"BLOCKED production readback mode={mode!r}")

    summary = payload.get("summary")
    if not isinstance(summary, dict):
        raise SystemExit("BLOCKED production readback summary missing")

    boundaries = payload.get("boundaries")
    if not isinstance(boundaries, dict):
        raise SystemExit("BLOCKED production readback boundaries missing")

    # Every one of these summary counters must be zero.
    zero_counters = (
        "wazuh_manager_query_accepted_count",
        "wazuh_event_accepted_count",
        "host_forensics_accepted_count",
        "active_response_authorized_count",
        "host_write_authorized_count",
        "runtime_gate_count",
    )
    for counter in zero_counters:
        require_zero(summary, counter)

    # Every one of these boundary flags must be exactly False.
    false_flags = (
        "active_response_authorized",
        "host_write_authorized",
        "secret_value_collection_allowed",
        "raw_wazuh_payload_storage_allowed",
        "agent_identity_public_display_allowed",
        "internal_ip_public_display_allowed",
    )
    for flag in false_flags:
        require_false(boundaries, flag)

    if boundaries.get("not_authorization") is not True:
        raise SystemExit("BLOCKED production readback boundaries.not_authorization: expected true")

    return {
        "schema_version": "iwooos_wazuh_production_readback_v1",
        "status": "production_readback_passed",
        "http_status": http_status,
        "api_status": api_status,
        "configured": bool(payload.get("configured")),
        "runtime_gate_count": summary["runtime_gate_count"],
    }
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Run the readback check from the command line.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) keeps the normal command-line behaviour.

    Returns:
        ``0`` when the readback passed; every failure path exits through
        ``SystemExit`` instead of returning.
    """
    parser = argparse.ArgumentParser(description="IwoooS Wazuh 只讀 API production readback 驗收")
    parser.add_argument("--url", default=DEFAULT_URL, help="production Wazuh read-only API URL")
    parser.add_argument("--timeout-seconds", type=float, default=10.0)
    parser.add_argument(
        "--allow-predeploy-404",
        action="store_true",
        help="僅供尚未部署時記錄 404 現況;正式部署驗收不得使用",
    )
    parser.add_argument("--json", action="store_true", help="輸出 JSON 摘要")
    args = parser.parse_args(argv)

    # Zero, negative or NaN timeouts end in a traceback or a misleading
    # network error further down; reject them as a usage error instead.
    if not args.timeout_seconds > 0:
        parser.error("--timeout-seconds must be a positive number")

    result = fetch_url(args.url, args.timeout_seconds)
    report = validate_payload(result, allow_predeploy_404=args.allow_predeploy_404)

    if args.json:
        print(json.dumps(report, ensure_ascii=False, sort_keys=True))
    else:
        print(
            "WAZUH_READONLY_PRODUCTION_READBACK_OK "
            f"status={report['status']} "
            f"http={report['http_status']} "
            f"runtime_gate={report['runtime_gate_count']}"
        )
    return 0
|
||
|
||
|
||
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|