diff --git a/apps/api/src/api/v1/iwooos.py b/apps/api/src/api/v1/iwooos.py
index 3251afb3..998f53aa 100644
--- a/apps/api/src/api/v1/iwooos.py
+++ b/apps/api/src/api/v1/iwooos.py
@@ -37,6 +37,7 @@ from src.services.iwooos_wazuh_managed_host_coverage import (
)
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
load_latest_iwooos_wazuh_manager_registry_reviewer_validation,
+ validate_iwooos_wazuh_manager_registry_owner_export as validate_wazuh_manager_registry_owner_export_payload,
)
from src.services.iwooos_wazuh_owner_evidence_preflight import (
load_latest_iwooos_wazuh_owner_evidence_preflight,
@@ -178,6 +179,38 @@ async def get_iwooos_wazuh_manager_registry_reviewer_validation() -> dict[str, A
) from exc
+@router.post(
+ "/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export",
+ response_model=dict[str, Any],
+ summary="驗證 Wazuh manager registry 脫敏 owner export",
+ description=(
+ "針對單次 owner-provided redacted Wazuh manager registry export 進行 no-persist reviewer "
+ "validation,回傳 accepted / needs supplement / quarantined / rejected runtime action 分流。"
+ "此端點不保存 payload、不查 Wazuh API、不讀主機、不重新註冊 agent、不重啟服務、不讀或回傳"
+ "機密明文、不啟用主動回應、不改 Nginx / Docker / K8s / firewall,也不更新 manager registry "
+ "accepted 總帳。"
+ ),
+)
+async def validate_iwooos_wazuh_manager_registry_owner_export(owner_export: dict[str, Any]) -> dict[str, Any]:
+ """回傳單次 Wazuh manager registry 脫敏匯出的公開安全驗證結果。"""
+ try:
+ payload = await asyncio.to_thread(
+ validate_wazuh_manager_registry_owner_export_payload,
+ owner_export,
+ )
+ return redact_public_lan_topology(payload)
+ except FileNotFoundError as exc:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=str(exc),
+ ) from exc
+ except (json.JSONDecodeError, ValueError) as exc:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"IwoooS Wazuh manager registry owner export 驗證器無效:{exc}",
+ ) from exc
+
+
@router.get(
"/api/v1/iwooos/runtime-security-readback",
response_model=dict[str, Any],
diff --git a/apps/api/src/services/iwooos_wazuh_manager_registry_reviewer_validation.py b/apps/api/src/services/iwooos_wazuh_manager_registry_reviewer_validation.py
index aba0eecc..934413fb 100644
--- a/apps/api/src/services/iwooos_wazuh_manager_registry_reviewer_validation.py
+++ b/apps/api/src/services/iwooos_wazuh_manager_registry_reviewer_validation.py
@@ -1,15 +1,16 @@
"""
IwoooS Wazuh manager registry reviewer validation readback.
-This service exposes a committed reviewer-validation contract for future
-owner-provided redacted Wazuh manager registry exports. It never receives raw
-payloads, queries Wazuh, reads host data, reads secrets, or authorizes runtime
-actions.
+This service exposes a committed reviewer-validation contract and a no-persist
+validator for owner-provided redacted Wazuh manager registry exports. It never
+queries Wazuh, reads host data, reads secrets, persists raw payloads, or
+authorizes runtime actions.
"""
from __future__ import annotations
import json
+import re
from pathlib import Path
from typing import Any
@@ -34,6 +35,82 @@ _REQUIRED_FALSE_BOUNDARIES = {
"wazuh_manager_restart_authorized",
}
+_SENSITIVE_TEXT_PATTERNS = {
+ "internal_ip": re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"),
+ "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
+ "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
+ "basic_auth": re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
+ "password_assignment": re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
+ "token_assignment": re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
+ "cookie_assignment": re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
+ "client_keys": re.compile(r"client\.keys", re.IGNORECASE),
+ "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
+ "work_window_text": re.compile(r"(工作視窗|批准!繼續|source_thread_id|owenhytsai/)", re.IGNORECASE),
+}
+
+_FORBIDDEN_KEY_FRAGMENTS = {
+ "authorization_header",
+ "basic_auth",
+ "bearer_token",
+ "client_keys",
+ "cookie",
+ "dashboard_api_secret",
+ "firewall_change",
+ "full_cli_output",
+ "full_journal",
+ "host_write",
+ "hostname",
+ "internal_ip",
+ "nginx_reload",
+ "password",
+ "private_key",
+ "raw_dashboard_request",
+ "raw_log",
+ "raw_wazuh_payload",
+ "stored_api_password",
+ "unredacted_screenshot",
+}
+
+_RUNTIME_ACTION_KEYS = {
+ "active_response_enable",
+ "agent_reenroll",
+ "agent_restart",
+ "argocd_sync",
+ "firewall_change",
+ "host_write",
+ "k8s_or_argocd_change",
+ "kali_active_scan",
+ "nginx_reload",
+ "runtime_execution_authorized",
+ "secret_rotation",
+ "wazuh_active_response",
+ "wazuh_agent_reenroll",
+ "wazuh_agent_restart",
+ "wazuh_api_live_query",
+ "wazuh_dashboard_secret_patch",
+ "wazuh_manager_restart",
+}
+
+_DASHBOARD_REQUIRED_FIELDS = {
+ "dashboard_api_connection_check_status",
+ "dashboard_api_version_check_status",
+ "dashboard_index_pattern_statuses",
+ "dashboard_api_degradation_root_cause",
+ "dashboard_api_repair_postcheck_ref",
+}
+
+_READONLY_CREDENTIAL_REQUIRED_FIELDS = {
+ "collection_method",
+ "manager_health_ref",
+ "redacted_evidence_refs",
+}
+
+_ACCOUNTABILITY_REQUIRED_FIELDS = {
+ "followup_owner",
+ "rollback_owner",
+ "postcheck_plan",
+}
+
def load_latest_iwooos_wazuh_manager_registry_reviewer_validation(
security_dir: Path | None = None,
@@ -76,6 +153,10 @@ def load_latest_iwooos_wazuh_manager_registry_reviewer_validation(
f"docs/security/{_SNAPSHOT_FILE}",
"scripts/security/wazuh-manager-registry-reviewer-validation.py",
],
+ "owner_export_validation_endpoint": (
+ "/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export"
+ ),
+ "owner_export_validation_mode": "no_persist_validation_no_runtime_action",
"summary": merged_summary,
"expected_scope_aliases": _strings(snapshot.get("expected_scope_aliases")),
"reviewer_validation_checks": _checks(snapshot.get("reviewer_validation_checks")),
@@ -173,6 +254,7 @@ def _evidence_slots(value: Any) -> list[dict[str, Any]]:
def _boundary_markers(summary: dict[str, int]) -> list[str]:
return [
"wazuh_manager_registry_reviewer_validation_visible=true",
+ "wazuh_manager_registry_owner_export_validation_api_available=true",
f"wazuh_manager_registry_reviewer_validation_expected_scope_alias_count={summary['expected_scope_alias_count']}",
f"wazuh_manager_registry_reviewer_validation_required_owner_field_count={summary['required_owner_field_count']}",
f"wazuh_manager_registry_reviewer_validation_per_host_required_field_count={summary['per_host_required_field_count']}",
@@ -225,3 +307,394 @@ def _require_boundaries(payload: dict[str, Any]) -> None:
raise ValueError(f"Wazuh manager registry reviewer validation execution_boundaries.{key} 必須維持 false")
if boundaries.get("not_authorization") is not True:
raise ValueError("Wazuh manager registry reviewer validation not_authorization 必須維持 true")
+
+
+def validate_iwooos_wazuh_manager_registry_owner_export(
+ owner_export: dict[str, Any],
+ security_dir: Path | None = None,
+) -> dict[str, Any]:
+ """Validate one redacted owner export without persisting it or changing runtime truth."""
+ contract = load_latest_iwooos_wazuh_manager_registry_reviewer_validation(security_dir)
+ snapshot = _load_snapshot(security_dir or _DEFAULT_SECURITY_DIR)
+ expected_aliases = set(_strings(snapshot.get("expected_scope_aliases")))
+ required_owner_fields = _strings(snapshot.get("required_owner_fields"))
+ per_host_required_fields = _strings(snapshot.get("per_host_required_fields"))
+
+ findings: list[dict[str, Any]] = []
+ evidence_status = _initial_evidence_status(snapshot)
+
+ if not isinstance(owner_export, dict):
+ findings.append(_finding("RV-01", "blocker", "request_missing_fields", "owner export 必須是 JSON object", []))
+ return _validation_result(contract, "request_missing_fields", findings, evidence_status)
+
+ sensitive_hits = _collect_sensitive_hits(owner_export)
+ if sensitive_hits:
+ findings.append(
+ _finding(
+ "RV-07",
+ "critical",
+ "quarantine_sensitive_payload",
+ "owner export 含禁止內容或疑似未脫敏內容,已進隔離分流;回應不回傳原始值。",
+ [hit["path"] for hit in sensitive_hits[:12]],
+ {"categories": sorted({hit["category"] for hit in sensitive_hits})},
+ )
+ )
+ return _validation_result(contract, "quarantine_sensitive_payload", findings, evidence_status)
+
+ runtime_hits = _collect_runtime_action_hits(owner_export)
+ if runtime_hits:
+ findings.append(
+ _finding(
+ "RV-09",
+ "critical",
+ "reject_runtime_action_request",
+ "owner export 夾帶 runtime action request;收件驗證只允許脫敏證據,不授權 active response、restart、host write 或掃描。",
+ runtime_hits[:12],
+ )
+ )
+ return _validation_result(contract, "reject_runtime_action_request", findings, evidence_status)
+
+ missing_owner_fields = [field for field in required_owner_fields if not _present(owner_export.get(field))]
+ if missing_owner_fields:
+ findings.append(
+ _finding(
+ "RV-01",
+ "blocker",
+ "request_missing_fields",
+ "owner export envelope 欄位不足,需要補齊後再驗證。",
+ missing_owner_fields,
+ )
+ )
+
+ count_issue = _validate_counts(owner_export)
+ if count_issue:
+ findings.append(
+ _finding(
+ "RV-02",
+ "blocker",
+ "request_counts_arithmetic_fix",
+ count_issue,
+ ["agent_total", "agent_active", "agent_disconnected", "agent_never_connected"],
+ )
+ )
+
+ alias_issue = _validate_aliases(owner_export.get("registry_export_scope_aliases"), expected_aliases)
+ if alias_issue:
+ findings.append(
+ _finding(
+ "RV-03",
+ "blocker",
+ "request_alias_scope_parity_fix",
+ alias_issue,
+ ["registry_export_scope_aliases"],
+ )
+ )
+
+ matrix_issue_paths = _validate_per_host_matrix(
+ owner_export.get("per_host_registry_matrix"),
+ expected_aliases,
+ per_host_required_fields,
+ )
+ if matrix_issue_paths:
+ findings.append(
+ _finding(
+ "RV-04",
+ "blocker",
+ "request_per_host_matrix_supplement",
+ "逐主機矩陣未完整覆蓋 6 個公開別名或缺少必要欄位。",
+ matrix_issue_paths[:30],
+ )
+ )
+
+ dashboard_missing = [field for field in sorted(_DASHBOARD_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
+ if dashboard_missing:
+ findings.append(
+ _finding(
+ "RV-05",
+ "blocker",
+ "request_dashboard_api_repair_postcheck",
+ "Dashboard API connection / version / index pattern / root cause / repair postcheck 必須分欄。",
+ dashboard_missing,
+ )
+ )
+
+ readonly_missing = [field for field in sorted(_READONLY_CREDENTIAL_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
+ if readonly_missing:
+ findings.append(
+ _finding(
+ "RV-06",
+ "blocker",
+ "request_readonly_credential_metadata",
+ "唯讀 credential metadata 與 manager health ref 必須可追溯,且不得含 secret value。",
+ readonly_missing,
+ )
+ )
+
+ accountability_missing = [field for field in sorted(_ACCOUNTABILITY_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
+ if accountability_missing:
+ findings.append(
+ _finding(
+ "RV-08",
+ "blocker",
+ "request_owner_accountability_supplement",
+ "followup owner、rollback owner 與 postcheck plan 必須可讀。",
+ accountability_missing,
+ )
+ )
+
+ _mark_evidence_status(evidence_status, owner_export)
+ outcome = _first_blocking_lane(findings) or "accepted_for_readonly_posture_only"
+ if outcome == "accepted_for_readonly_posture_only":
+ evidence_status = [
+ {**slot, "received": True, "accepted": True, "quarantined": False}
+ for slot in evidence_status
+ ]
+ findings.append(
+ _finding(
+ "RV-10",
+ "info",
+ "waiting_post_enable_iwooos_readback",
+ "owner export 已通過 no-persist reviewer validation;下一步仍只能進 post-enable IwoooS readback,不開 runtime gate。",
+ ["post_enable_iwooos_readback"],
+ )
+ )
+
+ return _validation_result(contract, outcome, findings, evidence_status)
+
+
+def _validation_result(
+ contract: dict[str, Any],
+ outcome_lane: str,
+ findings: list[dict[str, Any]],
+ evidence_status: list[dict[str, Any]],
+) -> dict[str, Any]:
+ accepted = outcome_lane == "accepted_for_readonly_posture_only"
+ quarantined = outcome_lane == "quarantine_sensitive_payload"
+ rejected_runtime = outcome_lane == "reject_runtime_action_request"
+ return {
+ "schema_version": "iwooos_wazuh_manager_registry_owner_export_validation_result_v1",
+ "contract_schema_version": contract["schema_version"],
+ "status": outcome_lane,
+ "mode": "no_persist_validation_no_runtime_no_secret_collection",
+ "outcome_lane": outcome_lane,
+ "accepted_for_readonly_posture_only": accepted,
+ "reviewer_validation_passed": accepted,
+ "quarantined": quarantined,
+ "runtime_action_rejected": rejected_runtime,
+ "summary": {
+ "owner_registry_export_received_count": 1,
+ "owner_registry_export_accepted_count": 1 if accepted else 0,
+ "reviewer_validation_passed_count": 1 if accepted else 0,
+ "reviewer_validation_quarantined_count": 1 if quarantined else 0,
+ "manager_registry_accepted_count": 0,
+ "post_enable_readback_passed_count": 0,
+ "runtime_gate_count": 0,
+ "host_write_authorized_count": 0,
+ "active_response_authorized_count": 0,
+ "secret_value_collection_allowed_count": 0,
+ "finding_count": len(findings),
+ },
+ "validation_findings": findings,
+ "evidence_slots": evidence_status,
+ "boundary_markers": [
+ "wazuh_manager_registry_owner_export_validation_received_count=1",
+ f"wazuh_manager_registry_owner_export_validation_accepted_count={1 if accepted else 0}",
+ f"wazuh_manager_registry_owner_export_validation_quarantined_count={1 if quarantined else 0}",
+ "wazuh_manager_registry_owner_export_validation_manager_registry_accepted_count=0",
+ "wazuh_manager_registry_owner_export_validation_runtime_gate_count=0",
+ "wazuh_manager_registry_owner_export_validation_no_persist=true",
+ "wazuh_api_live_query_authorized=false",
+ "wazuh_active_response_authorized=false",
+ "host_write_authorized=false",
+ "secret_value_collection_allowed=false",
+ "not_authorization=true",
+ ],
+ "boundaries": {
+ "payload_persisted": False,
+ "wazuh_api_live_query_authorized": False,
+ "wazuh_agent_reenroll_authorized": False,
+ "wazuh_agent_restart_authorized": False,
+ "wazuh_manager_restart_authorized": False,
+ "wazuh_active_response_authorized": False,
+ "host_write_authorized": False,
+ "secret_value_collection_allowed": False,
+ "raw_wazuh_payload_storage_allowed": False,
+ "kali_active_scan_authorized": False,
+ "runtime_execution_authorized": False,
+ "manager_registry_accepted_updated": False,
+ "not_authorization": True,
+ },
+ "next_gate": "post_enable_iwooos_readback" if accepted else "owner_export_fix_and_resubmit",
+ }
+
+
+def _finding(
+ check_id: str,
+ severity: str,
+ lane: str,
+ message: str,
+ field_paths: list[str],
+ extra: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+ payload: dict[str, Any] = {
+ "check_id": check_id,
+ "severity": severity,
+ "lane": lane,
+ "message": message,
+ "field_paths": field_paths,
+ }
+ if extra:
+ payload.update(extra)
+ return payload
+
+
+def _initial_evidence_status(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
+ slots = _evidence_slots(snapshot.get("evidence_slots"))
+ return [{**slot, "received": False, "accepted": False, "quarantined": False} for slot in slots]
+
+
+def _mark_evidence_status(evidence_status: list[dict[str, Any]], owner_export: dict[str, Any]) -> None:
+ for slot in evidence_status:
+ required_fields = slot.get("required_fields", [])
+ slot["received"] = bool(required_fields) and all(_present(owner_export.get(field)) for field in required_fields)
+ slot["accepted"] = False
+ slot["quarantined"] = False
+
+
+def _present(value: Any) -> bool:
+ if value is None:
+ return False
+ if isinstance(value, str):
+ return bool(value.strip())
+ if isinstance(value, (list, dict, tuple, set)):
+ return bool(value)
+ return True
+
+
+def _int_or_none(value: Any) -> int | None:
+ if isinstance(value, bool):
+ return None
+ if isinstance(value, int):
+ return value
+ if isinstance(value, str) and value.strip().isdigit():
+ return int(value.strip())
+ return None
+
+
+def _validate_counts(owner_export: dict[str, Any]) -> str | None:
+ fields = ["agent_total", "agent_active", "agent_disconnected", "agent_never_connected"]
+ counts = {field: _int_or_none(owner_export.get(field)) for field in fields}
+ if any(value is None for value in counts.values()):
+ return "agent count 欄位必須是非負整數。"
+ if any(value is not None and value < 0 for value in counts.values()):
+ return "agent count 欄位不得為負數。"
+ total = counts["agent_total"]
+ active = counts["agent_active"]
+ disconnected = counts["agent_disconnected"]
+ never_connected = counts["agent_never_connected"]
+ if total is None or active is None or disconnected is None or never_connected is None:
+ return "agent count 欄位必須是非負整數。"
+ if total < active + disconnected + never_connected:
+ return "agent_total 不得小於 active + disconnected + never_connected。"
+ return None
+
+
+def _validate_aliases(value: Any, expected_aliases: set[str]) -> str | None:
+ aliases = value if isinstance(value, list) else []
+ if not aliases or not all(isinstance(item, str) for item in aliases):
+ return "registry_export_scope_aliases 必須是公開別名字串陣列。"
+ alias_set = set(aliases)
+ if len(aliases) != len(alias_set):
+ return "registry_export_scope_aliases 不得重複。"
+ if alias_set != expected_aliases:
+ missing = sorted(expected_aliases - alias_set)
+ extra = sorted(alias_set - expected_aliases)
+ return f"registry_export_scope_aliases 必須剛好等於 6 個公開別名;missing={missing} extra={extra}"
+ return None
+
+
+def _validate_per_host_matrix(value: Any, expected_aliases: set[str], required_fields: list[str]) -> list[str]:
+ if not isinstance(value, list):
+ return ["per_host_registry_matrix"]
+ paths: list[str] = []
+ seen_aliases: list[str] = []
+ for index, item in enumerate(value):
+ if not isinstance(item, dict):
+ paths.append(f"per_host_registry_matrix[{index}]")
+ continue
+ alias = item.get("node_alias")
+ if not isinstance(alias, str) or alias not in expected_aliases:
+ paths.append(f"per_host_registry_matrix[{index}].node_alias")
+ else:
+ seen_aliases.append(alias)
+ for field in required_fields:
+ if not _present(item.get(field)):
+ paths.append(f"per_host_registry_matrix[{index}].{field}")
+ seen_set = set(seen_aliases)
+ if len(seen_aliases) != len(seen_set):
+ paths.append("per_host_registry_matrix.duplicate_node_alias")
+ for missing_alias in sorted(expected_aliases - seen_set):
+ paths.append(f"per_host_registry_matrix.missing.{missing_alias}")
+ for extra_alias in sorted(seen_set - expected_aliases):
+ paths.append(f"per_host_registry_matrix.extra.{extra_alias}")
+ return paths
+
+
+def _collect_sensitive_hits(value: Any, path: str = "$") -> list[dict[str, str]]:
+ hits: list[dict[str, str]] = []
+ if isinstance(value, dict):
+ for key, item in value.items():
+ key_text = str(key)
+ key_lower = key_text.lower()
+ for fragment in _FORBIDDEN_KEY_FRAGMENTS:
+ if fragment in key_lower:
+ hits.append({"path": f"{path}.{key_text}", "category": f"forbidden_key:{fragment}"})
+ hits.extend(_collect_sensitive_hits(item, f"{path}.{key_text}"))
+ return hits
+ if isinstance(value, list):
+ for index, item in enumerate(value):
+ hits.extend(_collect_sensitive_hits(item, f"{path}[{index}]"))
+ return hits
+ if isinstance(value, str):
+ for category, pattern in _SENSITIVE_TEXT_PATTERNS.items():
+ if pattern.search(value):
+ hits.append({"path": path, "category": category})
+ return hits
+
+
+def _collect_runtime_action_hits(value: Any, path: str = "$") -> list[str]:
+ hits: list[str] = []
+ if isinstance(value, dict):
+ for key, item in value.items():
+ key_text = str(key)
+ normalized_key = key_text.lower().replace("-", "_").replace(" ", "_")
+ if normalized_key in _RUNTIME_ACTION_KEYS and item not in (False, None, "", [], {}):
+ hits.append(f"{path}.{key_text}")
+ hits.extend(_collect_runtime_action_hits(item, f"{path}.{key_text}"))
+ return hits
+ if isinstance(value, list):
+ for index, item in enumerate(value):
+ hits.extend(_collect_runtime_action_hits(item, f"{path}[{index}]"))
+ return hits
+ if isinstance(value, str):
+ normalized = value.lower().replace("-", "_").replace(" ", "_")
+ if normalized in _RUNTIME_ACTION_KEYS:
+ hits.append(path)
+ return hits
+
+
+def _first_blocking_lane(findings: list[dict[str, Any]]) -> str | None:
+ for lane in (
+ "quarantine_sensitive_payload",
+ "reject_runtime_action_request",
+ "request_missing_fields",
+ "request_counts_arithmetic_fix",
+ "request_alias_scope_parity_fix",
+ "request_per_host_matrix_supplement",
+ "request_dashboard_api_repair_postcheck",
+ "request_readonly_credential_metadata",
+ "request_owner_accountability_supplement",
+ ):
+ if any(item.get("lane") == lane for item in findings):
+ return lane
+ return None
diff --git a/apps/api/tests/test_iwooos_wazuh_manager_registry_reviewer_validation.py b/apps/api/tests/test_iwooos_wazuh_manager_registry_reviewer_validation.py
index d078678c..7b943d95 100644
--- a/apps/api/tests/test_iwooos_wazuh_manager_registry_reviewer_validation.py
+++ b/apps/api/tests/test_iwooos_wazuh_manager_registry_reviewer_validation.py
@@ -6,15 +6,78 @@ from fastapi.testclient import TestClient
from src.api.v1.iwooos import router
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
load_latest_iwooos_wazuh_manager_registry_reviewer_validation,
+ validate_iwooos_wazuh_manager_registry_owner_export,
)
+EXPECTED_ALIASES = [
+ "managed_core_node_a",
+ "managed_core_node_b",
+ "managed_dev_node_a",
+ "managed_dev_node_b",
+ "managed_control_node_a",
+ "managed_control_node_b",
+]
+
+
def _client() -> TestClient:
app = FastAPI()
app.include_router(router)
return TestClient(app)
+def _valid_owner_export() -> dict:
+ return {
+ "owner_role": "資安負責人",
+ "team": "IwoooS",
+ "decision": "accept_readonly_registry_export_for_reviewer_validation",
+ "decision_reason": "Wazuh manager registry 已脫敏,供 reviewer 驗證主機覆蓋與 Dashboard API 修復狀態。",
+ "affected_scope": "iwooos_wazuh_expected_scope_aliases",
+ "collection_method": "owner_export_redacted_summary",
+ "agent_total": 6,
+ "agent_active": 2,
+ "agent_disconnected": 3,
+ "agent_never_connected": 1,
+ "last_seen_window_start": "2026-06-27T15:00:00+08:00",
+ "last_seen_window_end": "2026-06-27T16:00:00+08:00",
+ "registry_collected_at": "2026-06-27T16:05:00+08:00",
+ "registry_export_scope_aliases": EXPECTED_ALIASES,
+ "per_host_registry_matrix": [
+ {
+ "node_alias": alias,
+ "scope_role": "managed_scope",
+ "registry_presence": "present",
+ "agent_status_bucket": "active" if index < 2 else "disconnected",
+ "last_seen_state": "within_owner_window" if index < 2 else "outside_owner_window",
+ "manager_group_ref": f"group-ref-{index}",
+ "agent_id_redacted_ref": f"agent-redacted-ref-{index}",
+ "gap_reason": "none" if index < 2 else "owner_followup_required",
+ "redacted_evidence_ref": f"evidence-ref-{index}",
+ }
+ for index, alias in enumerate(EXPECTED_ALIASES)
+ ],
+ "registry_gap_reason_by_alias": {
+ alias: "none" if index < 2 else "owner_followup_required"
+ for index, alias in enumerate(EXPECTED_ALIASES)
+ },
+ "registry_export_summary_ref": "evidence-ref-registry-summary",
+ "manager_health_ref": "evidence-ref-manager-health",
+ "dashboard_api_status_ref": "evidence-ref-dashboard-api",
+ "dashboard_api_connection_check_status": "ok",
+ "dashboard_api_version_check_status": "ok",
+ "dashboard_index_pattern_statuses": ["alerts_ok", "monitoring_ok", "statistics_ok"],
+ "dashboard_api_degradation_root_cause": "stored_api_connection_repaired_by_owner",
+ "dashboard_api_repair_postcheck_ref": "evidence-ref-dashboard-postcheck",
+ "redacted_evidence_refs": [
+ "evidence-ref-registry-summary",
+ "evidence-ref-dashboard-postcheck",
+ ],
+ "followup_owner": "IwoooS reviewer",
+ "rollback_owner": "IwoooS runtime owner",
+ "postcheck_plan": "post_enable_iwooos_readback_no_raw_payload",
+ }
+
+
def test_iwooos_wazuh_manager_registry_reviewer_validation_contract_is_waiting_only() -> None:
payload = load_latest_iwooos_wazuh_manager_registry_reviewer_validation()
@@ -94,3 +157,82 @@ def test_iwooos_wazuh_manager_registry_reviewer_validation_api_is_public_safe()
assert "source_thread_id" not in response.text
assert "owenhytsai/" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
+
+
+def test_iwooos_wazuh_manager_registry_owner_export_validation_accepts_redacted_payload() -> None:
+ payload = validate_iwooos_wazuh_manager_registry_owner_export(_valid_owner_export())
+
+ assert payload["schema_version"] == "iwooos_wazuh_manager_registry_owner_export_validation_result_v1"
+ assert payload["status"] == "accepted_for_readonly_posture_only"
+ assert payload["accepted_for_readonly_posture_only"] is True
+ assert payload["reviewer_validation_passed"] is True
+ assert payload["summary"]["owner_registry_export_received_count"] == 1
+ assert payload["summary"]["owner_registry_export_accepted_count"] == 1
+ assert payload["summary"]["manager_registry_accepted_count"] == 0
+ assert payload["summary"]["runtime_gate_count"] == 0
+ assert payload["boundaries"]["payload_persisted"] is False
+ assert payload["boundaries"]["runtime_execution_authorized"] is False
+ assert payload["boundaries"]["manager_registry_accepted_updated"] is False
+ assert all(slot["received"] is True for slot in payload["evidence_slots"])
+ assert all(slot["accepted"] is True for slot in payload["evidence_slots"])
+
+
+def test_iwooos_wazuh_manager_registry_owner_export_validation_api_does_not_update_global_counters() -> None:
+ client = _client()
+ response = client.post(
+ "/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export",
+ json=_valid_owner_export(),
+ )
+
+ assert response.status_code == 200
+ result = response.json()
+ assert result["status"] == "accepted_for_readonly_posture_only"
+ assert result["summary"]["owner_registry_export_accepted_count"] == 1
+ assert result["summary"]["manager_registry_accepted_count"] == 0
+ assert result["summary"]["runtime_gate_count"] == 0
+
+ readback = client.get("/api/v1/iwooos/wazuh-manager-registry-reviewer-validation").json()
+ assert readback["summary"]["owner_registry_export_received_count"] == 0
+ assert readback["summary"]["owner_registry_export_accepted_count"] == 0
+ assert readback["summary"]["manager_registry_accepted_count"] == 0
+ assert readback["summary"]["runtime_gate_count"] == 0
+
+
+def test_iwooos_wazuh_manager_registry_owner_export_validation_requests_missing_fields() -> None:
+ candidate = _valid_owner_export()
+ candidate.pop("decision_reason")
+
+ payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
+
+ assert payload["status"] == "request_missing_fields"
+ assert payload["accepted_for_readonly_posture_only"] is False
+ assert payload["summary"]["owner_registry_export_received_count"] == 1
+ assert payload["summary"]["owner_registry_export_accepted_count"] == 0
+ assert any("decision_reason" in finding["field_paths"] for finding in payload["validation_findings"])
+
+
+def test_iwooos_wazuh_manager_registry_owner_export_validation_quarantines_sensitive_payload() -> None:
+ candidate = _valid_owner_export()
+ candidate["manager_health_ref"] = "health evidence mentioned 10.250.250.250 by mistake"
+
+ payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
+
+ assert payload["status"] == "quarantine_sensitive_payload"
+ assert payload["quarantined"] is True
+ assert payload["summary"]["reviewer_validation_quarantined_count"] == 1
+ assert payload["summary"]["owner_registry_export_accepted_count"] == 0
+ assert "10.250.250.250" not in str(payload)
+ assert any(finding["check_id"] == "RV-07" for finding in payload["validation_findings"])
+
+
+def test_iwooos_wazuh_manager_registry_owner_export_validation_rejects_runtime_action_request() -> None:
+ candidate = _valid_owner_export()
+ candidate["requested_actions"] = ["wazuh_active_response"]
+
+ payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
+
+ assert payload["status"] == "reject_runtime_action_request"
+ assert payload["runtime_action_rejected"] is True
+ assert payload["summary"]["owner_registry_export_accepted_count"] == 0
+ assert payload["summary"]["runtime_gate_count"] == 0
+ assert any(finding["check_id"] == "RV-09" for finding in payload["validation_findings"])
diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json
index cf73e717..502ff97d 100644
--- a/apps/web/messages/en.json
+++ b/apps/web/messages/en.json
@@ -20838,6 +20838,8 @@
"title": "Owner export 進來後,先由 reviewer 驗收脫敏清單",
"subtitle": "這張卡固定 Wazuh manager registry owner export 的驗收規則:欄位、計數、公開別名矩陣、Dashboard API 修復讀回、唯讀 credential metadata、拒收內容與下一個 Gate 都先可視化;目前尚未收到、尚未接受,也不開 runtime。",
"loadingBoundary": "正在讀取 Wazuh manager registry reviewer validation API",
+ "validationEndpointLabel": "脫敏 owner export 驗證端點",
+ "validationModeLabel": "驗證模式",
"slotReceivedLabel": "已收件",
"slotAcceptedLabel": "已接受",
"slotNextGateLabel": "下一關",
diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json
index cf73e717..502ff97d 100644
--- a/apps/web/messages/zh-TW.json
+++ b/apps/web/messages/zh-TW.json
@@ -20838,6 +20838,8 @@
"title": "Owner export 進來後,先由 reviewer 驗收脫敏清單",
"subtitle": "這張卡固定 Wazuh manager registry owner export 的驗收規則:欄位、計數、公開別名矩陣、Dashboard API 修復讀回、唯讀 credential metadata、拒收內容與下一個 Gate 都先可視化;目前尚未收到、尚未接受,也不開 runtime。",
"loadingBoundary": "正在讀取 Wazuh manager registry reviewer validation API",
+ "validationEndpointLabel": "脫敏 owner export 驗證端點",
+ "validationModeLabel": "驗證模式",
"slotReceivedLabel": "已收件",
"slotAcceptedLabel": "已接受",
"slotNextGateLabel": "下一關",
diff --git a/apps/web/src/app/[locale]/iwooos/page.tsx b/apps/web/src/app/[locale]/iwooos/page.tsx
index c917fd7c..e37ac42a 100644
--- a/apps/web/src/app/[locale]/iwooos/page.tsx
+++ b/apps/web/src/app/[locale]/iwooos/page.tsx
@@ -2474,6 +2474,7 @@ const wazuhManagedHostCoverageBoundaries = [
const wazuhManagerRegistryReviewerValidationBoundaries = [
'wazuh_manager_registry_reviewer_validation_visible=true',
+ 'wazuh_manager_registry_owner_export_validation_api_available=true',
'wazuh_manager_registry_reviewer_validation_expected_scope_alias_count=6',
'wazuh_manager_registry_reviewer_validation_required_owner_field_count=28',
'wazuh_manager_registry_reviewer_validation_per_host_required_field_count=9',
@@ -9845,6 +9846,9 @@ function IwoooSWazuhManagerRegistryReviewerValidationBoard() {
: loading
? [t('loadingBoundary')]
: wazuhManagerRegistryReviewerValidationBoundaries
+ const validationEndpoint = data?.owner_export_validation_endpoint
+ ?? '/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export'
+ const validationMode = data?.owner_export_validation_mode ?? 'no_persist_validation_no_runtime_action'
const evidenceSlots = data?.evidence_slots ?? []
const visibleChecks = data?.reviewer_validation_checks?.slice(0, 4) ?? []
const statusText = loading ? t('status.loading') : failed ? t('status.failed') : t('status.ready')
@@ -9870,6 +9874,10 @@ function IwoooSWazuhManagerRegistryReviewerValidationBoard() {
{validationEndpoint}
+ {t('validationModeLabel')}:{validationMode}
+