Files
awoooi/scripts/security/wazuh-readonly-route-boundary-guard.py
Your Name 10a925bab6
All checks were successful
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m45s
CD Pipeline / build-and-deploy (push) Successful in 5m8s
CD Pipeline / post-deploy-checks (push) Successful in 1m35s
feat(iwooos): expose Wazuh live metadata gate readback
2026-06-27 00:11:54 +08:00

422 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
檢查 IwoooS Wazuh 只讀 API 接線邊界。
本 guard 只掃描 repo 內 source不連線 Wazuh、不讀主機、不收 secret、
不啟用 active response也不做任何 production runtime 動作。
"""
from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
NEXT_ROUTE_PATH = Path("apps/web/src/app/api/iwooos/wazuh/route.ts")
BACKEND_ROUTE_PATH = Path("apps/api/src/api/v1/iwooos.py")
BACKEND_SERVICE_PATH = Path("apps/api/src/services/iwooos_wazuh_readonly_status.py")
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH = Path("apps/api/src/services/iwooos_wazuh_live_metadata_gate.py")
PUBLIC_PAGE_PATH = Path("apps/web/src/app/[locale]/iwooos/page.tsx")
PUBLIC_COMPONENT_ROOT = Path("apps/web/src/components/iwooos")
@dataclass(frozen=True)
class ForbiddenPattern:
pattern_id: str
pattern: re.Pattern[str]
scope: str
ROUTE_REQUIRED_TOKENS = [
"IWOOOS_WAZUH_READONLY_ENABLED",
"IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT",
"WAZUH_API_BASE_URL",
"WAZUH_API_USERNAME",
"WAZUH_API_PASSWORD",
"process.env",
"requireHttpsBaseUrl",
"metadata_only_no_active_response_no_raw_payload",
"active_response_authorized: false",
"host_write_authorized: false",
"runtime_gate_count: 0",
"secret_value_collection_allowed: false",
"raw_wazuh_payload_storage_allowed: false",
"agent_identity_public_display_allowed: false",
"internal_ip_public_display_allowed: false",
"not_authorization: true",
"redactedAgent",
"alias: `agent-",
"wazuh_agent_registry_empty",
"wazuh_agent_registry_below_expected",
"agent_registry_empty_count",
"agent_below_expected_minimum_count",
"agent_visibility_no_false_green_count",
]
BACKEND_REQUIRED_TOKENS = [
"/api/iwooos/wazuh",
"/api/v1/iwooos/wazuh",
"load_iwooos_wazuh_readonly_status",
]
BACKEND_SERVICE_REQUIRED_TOKENS = [
"IWOOOS_WAZUH_READONLY_ENABLED",
"IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT",
"WAZUH_API_BASE_URL",
"WAZUH_API_USERNAME",
"WAZUH_API_PASSWORD",
"metadata_only_no_active_response_no_raw_payload",
"active_response_authorized_count",
"host_write_authorized_count",
"runtime_gate_count",
"raw_wazuh_payload_storage_allowed",
"internal_ip_public_display_allowed",
"redacted_agent",
"wazuh_agent_registry_empty",
"wazuh_agent_registry_below_expected",
"agent_registry_empty_count",
"agent_below_expected_minimum_count",
"agent_visibility_no_false_green_count",
]
BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS = [
"iwooos_wazuh_live_metadata_gate_readback_v1",
"committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
"boundary_markers",
"secret_value_collection_allowed",
"raw_wazuh_payload_storage_allowed",
"wazuh_api_live_query_authorized",
"wazuh_active_response_authorized",
"host_write_authorized",
"runtime_gate_count",
"not_authorization",
"正式路由讀回",
"機密明文收集=false",
"原始 Wazuh 載荷保存=false",
]
FORBIDDEN_PATTERNS = [
ForbiddenPattern(
"hardcoded_wazuh_private_url",
re.compile(r"https?://(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.[^\s'\"`]+", re.IGNORECASE),
"route_and_public_ui",
),
ForbiddenPattern(
"wazuh_default_api_port_literal",
re.compile(r"(?<![A-Za-z0-9_])55000(?![A-Za-z0-9_])"),
"route_and_public_ui",
),
ForbiddenPattern(
"node_tls_reject_unauthorized_disabled",
re.compile(r"NODE_TLS_REJECT_UNAUTHORIZED|rejectUnauthorized\s*:\s*false", re.IGNORECASE),
"route_and_public_ui",
),
ForbiddenPattern(
"hardcoded_wazuh_password_assignment",
re.compile(r"WAZUH_API_PASSWORD\s*=\s*['\"]|password\s*:\s*['\"]|passwd\s*:\s*['\"]", re.IGNORECASE),
"route",
),
ForbiddenPattern(
"hardcoded_wazuh_username_assignment",
re.compile(r"WAZUH_API_USERNAME\s*=\s*['\"]|username\s*:\s*['\"]|user\s*:\s*['\"]", re.IGNORECASE),
"route",
),
ForbiddenPattern(
"known_secret_shape",
re.compile(r"Wooo-[0-9]{6,}"),
"route_and_public_ui",
),
ForbiddenPattern(
"fake_soc_dashboard_copy",
re.compile(
r"IWOOOS SOC Dashboard|Threat Blocked|Wazuh Agents Status|Protected Nodes|Recent Automated Responses",
re.IGNORECASE,
),
"public_ui",
),
ForbiddenPattern(
"fake_cve_or_alert_fixture",
re.compile(r"CVE-2025-55182|recentAlerts\s*:\s*\[|vulnerabilities\s*:\s*\[", re.IGNORECASE),
"public_ui",
),
ForbiddenPattern(
"legacy_wazuh_dashboard_component",
re.compile(r"FifaWazuhMonitor"),
"public_ui",
),
ForbiddenPattern(
"raw_wazuh_payload_public_copy",
re.compile(r"rawWazuhPayload|raw_wazuh_payload\s*[:=]\s*[\[{]|raw Wazuh payload\s*[:=]", re.IGNORECASE),
"public_ui",
),
]
def now_iso() -> str:
return datetime.now(TAIPEI).replace(microsecond=0).isoformat()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def collect_public_ui_files(root: Path) -> list[Path]:
files: list[Path] = []
page = root / PUBLIC_PAGE_PATH
if page.exists():
files.append(PUBLIC_PAGE_PATH)
component_root = root / PUBLIC_COMPONENT_ROOT
if component_root.exists():
files.extend(
path.relative_to(root)
for path in sorted(component_root.rglob("*"))
if path.is_file() and path.suffix in {".ts", ".tsx", ".js", ".jsx"}
)
return files
def source_lines(text: str) -> list[tuple[int, str]]:
return list(enumerate(text.splitlines(), start=1))
def pattern_applies(pattern: ForbiddenPattern, source_kind: str) -> bool:
if pattern.scope == "route_and_public_ui":
return True
if pattern.scope == "route":
return source_kind == "route"
if pattern.scope == "public_ui":
return source_kind == "public_ui"
return False
def collect_forbidden_matches(root: Path) -> list[dict[str, Any]]:
targets: list[tuple[str, Path]] = [
("route", NEXT_ROUTE_PATH),
("route", BACKEND_ROUTE_PATH),
("route", BACKEND_SERVICE_PATH),
("route", BACKEND_LIVE_METADATA_GATE_SERVICE_PATH),
]
targets.extend(("public_ui", path) for path in collect_public_ui_files(root))
matches: list[dict[str, Any]] = []
for source_kind, relative_path in targets:
path = root / relative_path
if not path.exists():
continue
for line_number, line in source_lines(read_text(path)):
for forbidden in FORBIDDEN_PATTERNS:
if pattern_applies(forbidden, source_kind) and forbidden.pattern.search(line):
matches.append(
{
"path": relative_path.as_posix(),
"line": line_number,
"pattern_id": forbidden.pattern_id,
"source_kind": source_kind,
}
)
return matches
def collect_missing_required_tokens(route_text: str, required_tokens: list[str]) -> list[str]:
return [token for token in required_tokens if token not in route_text]
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
next_route = root / NEXT_ROUTE_PATH
backend_route = root / BACKEND_ROUTE_PATH
backend_service = root / BACKEND_SERVICE_PATH
backend_live_metadata_gate_service = root / BACKEND_LIVE_METADATA_GATE_SERVICE_PATH
next_route_present = next_route.exists()
backend_route_present = backend_route.exists()
backend_service_present = backend_service.exists()
backend_live_metadata_gate_service_present = backend_live_metadata_gate_service.exists()
next_route_text = read_text(next_route) if next_route_present else ""
backend_route_text = read_text(backend_route) if backend_route_present else ""
backend_service_text = read_text(backend_service) if backend_service_present else ""
backend_live_metadata_gate_service_text = (
read_text(backend_live_metadata_gate_service) if backend_live_metadata_gate_service_present else ""
)
public_ui_files = collect_public_ui_files(root)
missing_required_tokens = {
NEXT_ROUTE_PATH.as_posix(): collect_missing_required_tokens(next_route_text, ROUTE_REQUIRED_TOKENS),
BACKEND_ROUTE_PATH.as_posix(): collect_missing_required_tokens(backend_route_text, BACKEND_REQUIRED_TOKENS),
BACKEND_SERVICE_PATH.as_posix(): collect_missing_required_tokens(
backend_service_text, BACKEND_SERVICE_REQUIRED_TOKENS
),
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(): collect_missing_required_tokens(
backend_live_metadata_gate_service_text,
BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS,
),
}
missing_required_token_count = sum(len(tokens) for tokens in missing_required_tokens.values())
forbidden_matches = collect_forbidden_matches(root)
return {
"schema_version": "wazuh_readonly_route_boundary_guard_v1",
"generated_at": generated_at or now_iso(),
"status": (
"pass"
if (
next_route_present
and backend_route_present
and backend_service_present
and backend_live_metadata_gate_service_present
and not missing_required_token_count
and not forbidden_matches
)
else "blocked"
),
"mode": "repo_source_scan_no_runtime_no_secret_collection",
"guarded_route_paths": [
NEXT_ROUTE_PATH.as_posix(),
BACKEND_ROUTE_PATH.as_posix(),
BACKEND_SERVICE_PATH.as_posix(),
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(),
],
"guarded_public_ui_paths": [path.as_posix() for path in public_ui_files],
"required_route_tokens": {
NEXT_ROUTE_PATH.as_posix(): ROUTE_REQUIRED_TOKENS,
BACKEND_ROUTE_PATH.as_posix(): BACKEND_REQUIRED_TOKENS,
BACKEND_SERVICE_PATH.as_posix(): BACKEND_SERVICE_REQUIRED_TOKENS,
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(): BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS,
},
"forbidden_pattern_ids": [pattern.pattern_id for pattern in FORBIDDEN_PATTERNS],
"summary": {
"route_present_count": (
int(next_route_present)
+ int(backend_route_present)
+ int(backend_service_present)
+ int(backend_live_metadata_gate_service_present)
),
"live_metadata_gate_service_present_count": 1 if backend_live_metadata_gate_service_present else 0,
"next_route_present_count": 1 if next_route_present else 0,
"backend_route_present_count": 1 if backend_route_present else 0,
"backend_service_present_count": 1 if backend_service_present else 0,
"public_ui_file_count": len(public_ui_files),
"required_token_count": len(ROUTE_REQUIRED_TOKENS)
+ len(BACKEND_REQUIRED_TOKENS)
+ len(BACKEND_SERVICE_REQUIRED_TOKENS)
+ len(BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS),
"missing_required_token_count": missing_required_token_count,
"forbidden_pattern_count": len(FORBIDDEN_PATTERNS),
"forbidden_match_count": len(forbidden_matches),
"readonly_api_default_closed_count": sum(
"IWOOOS_WAZUH_READONLY_ENABLED" in text
for text in [
next_route_text,
backend_route_text,
backend_service_text,
backend_live_metadata_gate_service_text,
]
),
"server_side_env_required_count": sum(
token in text
for text in [
next_route_text,
backend_route_text,
backend_service_text,
backend_live_metadata_gate_service_text,
]
for token in ["WAZUH_API_BASE_URL", "WAZUH_API_USERNAME", "WAZUH_API_PASSWORD"]
),
"tls_disable_match_count": sum(
1
for item in forbidden_matches
if item["pattern_id"] == "node_tls_reject_unauthorized_disabled"
),
"hardcoded_private_url_match_count": sum(
1 for item in forbidden_matches if item["pattern_id"] == "hardcoded_wazuh_private_url"
),
"fake_soc_fixture_match_count": sum(
1
for item in forbidden_matches
if item["pattern_id"] in {"fake_soc_dashboard_copy", "fake_cve_or_alert_fixture"}
),
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"runtime_execution_authorized": False,
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"host_read_authorized": False,
"host_write_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"agent_identity_public_display_allowed": False,
"internal_ip_public_display_allowed": False,
"frontend_public_raw_alert_display_allowed": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"missing_required_tokens": missing_required_tokens,
"forbidden_matches": forbidden_matches,
"operator_interpretation": [
"Wazuh API code path 必須預設關閉,只有 server-side env 與 owner gate 允許只讀 metadata 查詢。",
"不得硬編 Wazuh 內網 URL、使用者、密碼或關閉 TLS 驗證。",
"前台不得顯示假 SOC dashboard、假 CVE、假 automated response、raw payload、agent 原名或內網 IP。",
"此 guard 通過只代表 source 邊界合格,不代表 Wazuh live query、active response、host containment 或 runtime remediation 已授權。",
],
}
def validate(root: Path) -> None:
report = build_report(root)
errors: list[str] = []
if report["summary"]["next_route_present_count"] != 1:
errors.append(f"{NEXT_ROUTE_PATH.as_posix()}: Wazuh Next.js 只讀 route 不存在")
if report["summary"]["backend_route_present_count"] != 1:
errors.append(f"{BACKEND_ROUTE_PATH.as_posix()}: Wazuh FastAPI 相容 route 不存在")
if report["summary"]["backend_service_present_count"] != 1:
errors.append(f"{BACKEND_SERVICE_PATH.as_posix()}: Wazuh FastAPI 只讀 service 不存在")
if report["summary"]["live_metadata_gate_service_present_count"] != 1:
errors.append(
f"{BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix()}: Wazuh 即時中繼資料閘門 service 不存在"
)
for path, tokens in report["missing_required_tokens"].items():
for token in tokens:
errors.append(f"{path}: 缺少必要只讀邊界 token {token!r}")
for item in report["forbidden_matches"]:
errors.append(
f"{item['path']}:{item['line']}: 命中 Wazuh 邊界 forbidden pattern {item['pattern_id']}"
)
if errors:
raise SystemExit("BLOCKED Wazuh readonly route boundary guard:\n" + "\n".join(f"- {error}" for error in errors))
def main() -> None:
parser = argparse.ArgumentParser(description="檢查 IwoooS Wazuh 只讀 API 接線邊界")
parser.add_argument("--root", default=".", help="repository root")
parser.add_argument("--output", help="寫出 JSON 報告")
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
if args.output:
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
validate(root)
summary = report["summary"]
print(
"WAZUH_READONLY_ROUTE_BOUNDARY_GUARD_OK "
f"route={summary['route_present_count']} "
f"public_ui_files={summary['public_ui_file_count']} "
f"forbidden={summary['forbidden_match_count']} "
f"runtime_gate={summary['runtime_gate_count']}"
)
if __name__ == "__main__":
main()