feat(iwooos): validate wazuh owner registry exports
Some checks failed
CD Pipeline / tests (push) Successful in 1m43s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
Code Review / ai-code-review (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-27 19:31:47 +08:00
parent ce0c7cbaf8
commit 82a73250f4
8 changed files with 673 additions and 4 deletions

View File

@@ -37,6 +37,7 @@ from src.services.iwooos_wazuh_managed_host_coverage import (
)
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
load_latest_iwooos_wazuh_manager_registry_reviewer_validation,
validate_iwooos_wazuh_manager_registry_owner_export as validate_wazuh_manager_registry_owner_export_payload,
)
from src.services.iwooos_wazuh_owner_evidence_preflight import (
load_latest_iwooos_wazuh_owner_evidence_preflight,
@@ -178,6 +179,38 @@ async def get_iwooos_wazuh_manager_registry_reviewer_validation() -> dict[str, A
) from exc
@router.post(
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export",
response_model=dict[str, Any],
summary="驗證 Wazuh manager registry 脫敏 owner export",
description=(
"針對單次 owner-provided redacted Wazuh manager registry export 進行 no-persist reviewer "
"validation回傳 accepted / needs supplement / quarantined / rejected runtime action 分流。"
"此端點不保存 payload、不查 Wazuh API、不讀主機、不重新註冊 agent、不重啟服務、不讀或回傳"
"機密明文、不啟用主動回應、不改 Nginx / Docker / K8s / firewall也不更新 manager registry "
"accepted 總帳。"
),
)
async def validate_iwooos_wazuh_manager_registry_owner_export(owner_export: dict[str, Any]) -> dict[str, Any]:
"""回傳單次 Wazuh manager registry 脫敏匯出的公開安全驗證結果。"""
try:
payload = await asyncio.to_thread(
validate_wazuh_manager_registry_owner_export_payload,
owner_export,
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"IwoooS Wazuh manager registry owner export 驗證器無效:{exc}",
) from exc
@router.get(
"/api/v1/iwooos/runtime-security-readback",
response_model=dict[str, Any],

View File

@@ -1,15 +1,16 @@
"""
IwoooS Wazuh manager registry reviewer validation readback.
This service exposes a committed reviewer-validation contract for future
owner-provided redacted Wazuh manager registry exports. It never receives raw
payloads, queries Wazuh, reads host data, reads secrets, or authorizes runtime
actions.
This service exposes a committed reviewer-validation contract and a no-persist
validator for owner-provided redacted Wazuh manager registry exports. It never
queries Wazuh, reads host data, reads secrets, persists raw payloads, or
authorizes runtime actions.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
@@ -34,6 +35,82 @@ _REQUIRED_FALSE_BOUNDARIES = {
"wazuh_manager_restart_authorized",
}
_SENSITIVE_TEXT_PATTERNS = {
"internal_ip": re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"),
"authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
"bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
"basic_auth": re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
"password_assignment": re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
"token_assignment": re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
"cookie_assignment": re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
"client_keys": re.compile(r"client\.keys", re.IGNORECASE),
"private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
"work_window_text": re.compile(r"(工作視窗|批准!繼續|source_thread_id|owenhytsai/)", re.IGNORECASE),
}
_FORBIDDEN_KEY_FRAGMENTS = {
"authorization_header",
"basic_auth",
"bearer_token",
"client_keys",
"cookie",
"dashboard_api_secret",
"firewall_change",
"full_cli_output",
"full_journal",
"host_write",
"hostname",
"internal_ip",
"nginx_reload",
"password",
"private_key",
"raw_dashboard_request",
"raw_log",
"raw_wazuh_payload",
"stored_api_password",
"unredacted_screenshot",
}
_RUNTIME_ACTION_KEYS = {
"active_response_enable",
"agent_reenroll",
"agent_restart",
"argocd_sync",
"firewall_change",
"host_write",
"k8s_or_argocd_change",
"kali_active_scan",
"nginx_reload",
"runtime_execution_authorized",
"secret_rotation",
"wazuh_active_response",
"wazuh_agent_reenroll",
"wazuh_agent_restart",
"wazuh_api_live_query",
"wazuh_dashboard_secret_patch",
"wazuh_manager_restart",
}
_DASHBOARD_REQUIRED_FIELDS = {
"dashboard_api_connection_check_status",
"dashboard_api_version_check_status",
"dashboard_index_pattern_statuses",
"dashboard_api_degradation_root_cause",
"dashboard_api_repair_postcheck_ref",
}
_READONLY_CREDENTIAL_REQUIRED_FIELDS = {
"collection_method",
"manager_health_ref",
"redacted_evidence_refs",
}
_ACCOUNTABILITY_REQUIRED_FIELDS = {
"followup_owner",
"rollback_owner",
"postcheck_plan",
}
def load_latest_iwooos_wazuh_manager_registry_reviewer_validation(
security_dir: Path | None = None,
@@ -76,6 +153,10 @@ def load_latest_iwooos_wazuh_manager_registry_reviewer_validation(
f"docs/security/{_SNAPSHOT_FILE}",
"scripts/security/wazuh-manager-registry-reviewer-validation.py",
],
"owner_export_validation_endpoint": (
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export"
),
"owner_export_validation_mode": "no_persist_validation_no_runtime_action",
"summary": merged_summary,
"expected_scope_aliases": _strings(snapshot.get("expected_scope_aliases")),
"reviewer_validation_checks": _checks(snapshot.get("reviewer_validation_checks")),
@@ -173,6 +254,7 @@ def _evidence_slots(value: Any) -> list[dict[str, Any]]:
def _boundary_markers(summary: dict[str, int]) -> list[str]:
return [
"wazuh_manager_registry_reviewer_validation_visible=true",
"wazuh_manager_registry_owner_export_validation_api_available=true",
f"wazuh_manager_registry_reviewer_validation_expected_scope_alias_count={summary['expected_scope_alias_count']}",
f"wazuh_manager_registry_reviewer_validation_required_owner_field_count={summary['required_owner_field_count']}",
f"wazuh_manager_registry_reviewer_validation_per_host_required_field_count={summary['per_host_required_field_count']}",
@@ -225,3 +307,394 @@ def _require_boundaries(payload: dict[str, Any]) -> None:
raise ValueError(f"Wazuh manager registry reviewer validation execution_boundaries.{key} 必須維持 false")
if boundaries.get("not_authorization") is not True:
raise ValueError("Wazuh manager registry reviewer validation not_authorization 必須維持 true")
def validate_iwooos_wazuh_manager_registry_owner_export(
owner_export: dict[str, Any],
security_dir: Path | None = None,
) -> dict[str, Any]:
"""Validate one redacted owner export without persisting it or changing runtime truth."""
contract = load_latest_iwooos_wazuh_manager_registry_reviewer_validation(security_dir)
snapshot = _load_snapshot(security_dir or _DEFAULT_SECURITY_DIR)
expected_aliases = set(_strings(snapshot.get("expected_scope_aliases")))
required_owner_fields = _strings(snapshot.get("required_owner_fields"))
per_host_required_fields = _strings(snapshot.get("per_host_required_fields"))
findings: list[dict[str, Any]] = []
evidence_status = _initial_evidence_status(snapshot)
if not isinstance(owner_export, dict):
findings.append(_finding("RV-01", "blocker", "request_missing_fields", "owner export 必須是 JSON object", []))
return _validation_result(contract, "request_missing_fields", findings, evidence_status)
sensitive_hits = _collect_sensitive_hits(owner_export)
if sensitive_hits:
findings.append(
_finding(
"RV-07",
"critical",
"quarantine_sensitive_payload",
"owner export 含禁止內容或疑似未脫敏內容,已進隔離分流;回應不回傳原始值。",
[hit["path"] for hit in sensitive_hits[:12]],
{"categories": sorted({hit["category"] for hit in sensitive_hits})},
)
)
return _validation_result(contract, "quarantine_sensitive_payload", findings, evidence_status)
runtime_hits = _collect_runtime_action_hits(owner_export)
if runtime_hits:
findings.append(
_finding(
"RV-09",
"critical",
"reject_runtime_action_request",
"owner export 夾帶 runtime action request收件驗證只允許脫敏證據不授權 active response、restart、host write 或掃描。",
runtime_hits[:12],
)
)
return _validation_result(contract, "reject_runtime_action_request", findings, evidence_status)
missing_owner_fields = [field for field in required_owner_fields if not _present(owner_export.get(field))]
if missing_owner_fields:
findings.append(
_finding(
"RV-01",
"blocker",
"request_missing_fields",
"owner export envelope 欄位不足,需要補齊後再驗證。",
missing_owner_fields,
)
)
count_issue = _validate_counts(owner_export)
if count_issue:
findings.append(
_finding(
"RV-02",
"blocker",
"request_counts_arithmetic_fix",
count_issue,
["agent_total", "agent_active", "agent_disconnected", "agent_never_connected"],
)
)
alias_issue = _validate_aliases(owner_export.get("registry_export_scope_aliases"), expected_aliases)
if alias_issue:
findings.append(
_finding(
"RV-03",
"blocker",
"request_alias_scope_parity_fix",
alias_issue,
["registry_export_scope_aliases"],
)
)
matrix_issue_paths = _validate_per_host_matrix(
owner_export.get("per_host_registry_matrix"),
expected_aliases,
per_host_required_fields,
)
if matrix_issue_paths:
findings.append(
_finding(
"RV-04",
"blocker",
"request_per_host_matrix_supplement",
"逐主機矩陣未完整覆蓋 6 個公開別名或缺少必要欄位。",
matrix_issue_paths[:30],
)
)
dashboard_missing = [field for field in sorted(_DASHBOARD_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
if dashboard_missing:
findings.append(
_finding(
"RV-05",
"blocker",
"request_dashboard_api_repair_postcheck",
"Dashboard API connection / version / index pattern / root cause / repair postcheck 必須分欄。",
dashboard_missing,
)
)
readonly_missing = [field for field in sorted(_READONLY_CREDENTIAL_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
if readonly_missing:
findings.append(
_finding(
"RV-06",
"blocker",
"request_readonly_credential_metadata",
"唯讀 credential metadata 與 manager health ref 必須可追溯,且不得含 secret value。",
readonly_missing,
)
)
accountability_missing = [field for field in sorted(_ACCOUNTABILITY_REQUIRED_FIELDS) if not _present(owner_export.get(field))]
if accountability_missing:
findings.append(
_finding(
"RV-08",
"blocker",
"request_owner_accountability_supplement",
"followup owner、rollback owner 與 postcheck plan 必須可讀。",
accountability_missing,
)
)
_mark_evidence_status(evidence_status, owner_export)
outcome = _first_blocking_lane(findings) or "accepted_for_readonly_posture_only"
if outcome == "accepted_for_readonly_posture_only":
evidence_status = [
{**slot, "received": True, "accepted": True, "quarantined": False}
for slot in evidence_status
]
findings.append(
_finding(
"RV-10",
"info",
"waiting_post_enable_iwooos_readback",
"owner export 已通過 no-persist reviewer validation下一步仍只能進 post-enable IwoooS readback不開 runtime gate。",
["post_enable_iwooos_readback"],
)
)
return _validation_result(contract, outcome, findings, evidence_status)
def _validation_result(
contract: dict[str, Any],
outcome_lane: str,
findings: list[dict[str, Any]],
evidence_status: list[dict[str, Any]],
) -> dict[str, Any]:
accepted = outcome_lane == "accepted_for_readonly_posture_only"
quarantined = outcome_lane == "quarantine_sensitive_payload"
rejected_runtime = outcome_lane == "reject_runtime_action_request"
return {
"schema_version": "iwooos_wazuh_manager_registry_owner_export_validation_result_v1",
"contract_schema_version": contract["schema_version"],
"status": outcome_lane,
"mode": "no_persist_validation_no_runtime_no_secret_collection",
"outcome_lane": outcome_lane,
"accepted_for_readonly_posture_only": accepted,
"reviewer_validation_passed": accepted,
"quarantined": quarantined,
"runtime_action_rejected": rejected_runtime,
"summary": {
"owner_registry_export_received_count": 1,
"owner_registry_export_accepted_count": 1 if accepted else 0,
"reviewer_validation_passed_count": 1 if accepted else 0,
"reviewer_validation_quarantined_count": 1 if quarantined else 0,
"manager_registry_accepted_count": 0,
"post_enable_readback_passed_count": 0,
"runtime_gate_count": 0,
"host_write_authorized_count": 0,
"active_response_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"finding_count": len(findings),
},
"validation_findings": findings,
"evidence_slots": evidence_status,
"boundary_markers": [
"wazuh_manager_registry_owner_export_validation_received_count=1",
f"wazuh_manager_registry_owner_export_validation_accepted_count={1 if accepted else 0}",
f"wazuh_manager_registry_owner_export_validation_quarantined_count={1 if quarantined else 0}",
"wazuh_manager_registry_owner_export_validation_manager_registry_accepted_count=0",
"wazuh_manager_registry_owner_export_validation_runtime_gate_count=0",
"wazuh_manager_registry_owner_export_validation_no_persist=true",
"wazuh_api_live_query_authorized=false",
"wazuh_active_response_authorized=false",
"host_write_authorized=false",
"secret_value_collection_allowed=false",
"not_authorization=true",
],
"boundaries": {
"payload_persisted": False,
"wazuh_api_live_query_authorized": False,
"wazuh_agent_reenroll_authorized": False,
"wazuh_agent_restart_authorized": False,
"wazuh_manager_restart_authorized": False,
"wazuh_active_response_authorized": False,
"host_write_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"kali_active_scan_authorized": False,
"runtime_execution_authorized": False,
"manager_registry_accepted_updated": False,
"not_authorization": True,
},
"next_gate": "post_enable_iwooos_readback" if accepted else "owner_export_fix_and_resubmit",
}
def _finding(
check_id: str,
severity: str,
lane: str,
message: str,
field_paths: list[str],
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"check_id": check_id,
"severity": severity,
"lane": lane,
"message": message,
"field_paths": field_paths,
}
if extra:
payload.update(extra)
return payload
def _initial_evidence_status(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
slots = _evidence_slots(snapshot.get("evidence_slots"))
return [{**slot, "received": False, "accepted": False, "quarantined": False} for slot in slots]
def _mark_evidence_status(evidence_status: list[dict[str, Any]], owner_export: dict[str, Any]) -> None:
for slot in evidence_status:
required_fields = slot.get("required_fields", [])
slot["received"] = bool(required_fields) and all(_present(owner_export.get(field)) for field in required_fields)
slot["accepted"] = False
slot["quarantined"] = False
def _present(value: Any) -> bool:
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
if isinstance(value, (list, dict, tuple, set)):
return bool(value)
return True
def _int_or_none(value: Any) -> int | None:
if isinstance(value, bool):
return None
if isinstance(value, int):
return value
if isinstance(value, str) and value.strip().isdigit():
return int(value.strip())
return None
def _validate_counts(owner_export: dict[str, Any]) -> str | None:
fields = ["agent_total", "agent_active", "agent_disconnected", "agent_never_connected"]
counts = {field: _int_or_none(owner_export.get(field)) for field in fields}
if any(value is None for value in counts.values()):
return "agent count 欄位必須是非負整數。"
if any(value is not None and value < 0 for value in counts.values()):
return "agent count 欄位不得為負數。"
total = counts["agent_total"]
active = counts["agent_active"]
disconnected = counts["agent_disconnected"]
never_connected = counts["agent_never_connected"]
if total is None or active is None or disconnected is None or never_connected is None:
return "agent count 欄位必須是非負整數。"
if total < active + disconnected + never_connected:
return "agent_total 不得小於 active + disconnected + never_connected。"
return None
def _validate_aliases(value: Any, expected_aliases: set[str]) -> str | None:
aliases = value if isinstance(value, list) else []
if not aliases or not all(isinstance(item, str) for item in aliases):
return "registry_export_scope_aliases 必須是公開別名字串陣列。"
alias_set = set(aliases)
if len(aliases) != len(alias_set):
return "registry_export_scope_aliases 不得重複。"
if alias_set != expected_aliases:
missing = sorted(expected_aliases - alias_set)
extra = sorted(alias_set - expected_aliases)
return f"registry_export_scope_aliases 必須剛好等於 6 個公開別名missing={missing} extra={extra}"
return None
def _validate_per_host_matrix(value: Any, expected_aliases: set[str], required_fields: list[str]) -> list[str]:
if not isinstance(value, list):
return ["per_host_registry_matrix"]
paths: list[str] = []
seen_aliases: list[str] = []
for index, item in enumerate(value):
if not isinstance(item, dict):
paths.append(f"per_host_registry_matrix[{index}]")
continue
alias = item.get("node_alias")
if not isinstance(alias, str) or alias not in expected_aliases:
paths.append(f"per_host_registry_matrix[{index}].node_alias")
else:
seen_aliases.append(alias)
for field in required_fields:
if not _present(item.get(field)):
paths.append(f"per_host_registry_matrix[{index}].{field}")
seen_set = set(seen_aliases)
if len(seen_aliases) != len(seen_set):
paths.append("per_host_registry_matrix.duplicate_node_alias")
for missing_alias in sorted(expected_aliases - seen_set):
paths.append(f"per_host_registry_matrix.missing.{missing_alias}")
for extra_alias in sorted(seen_set - expected_aliases):
paths.append(f"per_host_registry_matrix.extra.{extra_alias}")
return paths
def _collect_sensitive_hits(value: Any, path: str = "$") -> list[dict[str, str]]:
hits: list[dict[str, str]] = []
if isinstance(value, dict):
for key, item in value.items():
key_text = str(key)
key_lower = key_text.lower()
for fragment in _FORBIDDEN_KEY_FRAGMENTS:
if fragment in key_lower:
hits.append({"path": f"{path}.{key_text}", "category": f"forbidden_key:{fragment}"})
hits.extend(_collect_sensitive_hits(item, f"{path}.{key_text}"))
return hits
if isinstance(value, list):
for index, item in enumerate(value):
hits.extend(_collect_sensitive_hits(item, f"{path}[{index}]"))
return hits
if isinstance(value, str):
for category, pattern in _SENSITIVE_TEXT_PATTERNS.items():
if pattern.search(value):
hits.append({"path": path, "category": category})
return hits
def _collect_runtime_action_hits(value: Any, path: str = "$") -> list[str]:
hits: list[str] = []
if isinstance(value, dict):
for key, item in value.items():
key_text = str(key)
normalized_key = key_text.lower().replace("-", "_").replace(" ", "_")
if normalized_key in _RUNTIME_ACTION_KEYS and item not in (False, None, "", [], {}):
hits.append(f"{path}.{key_text}")
hits.extend(_collect_runtime_action_hits(item, f"{path}.{key_text}"))
return hits
if isinstance(value, list):
for index, item in enumerate(value):
hits.extend(_collect_runtime_action_hits(item, f"{path}[{index}]"))
return hits
if isinstance(value, str):
normalized = value.lower().replace("-", "_").replace(" ", "_")
if normalized in _RUNTIME_ACTION_KEYS:
hits.append(path)
return hits
def _first_blocking_lane(findings: list[dict[str, Any]]) -> str | None:
for lane in (
"quarantine_sensitive_payload",
"reject_runtime_action_request",
"request_missing_fields",
"request_counts_arithmetic_fix",
"request_alias_scope_parity_fix",
"request_per_host_matrix_supplement",
"request_dashboard_api_repair_postcheck",
"request_readonly_credential_metadata",
"request_owner_accountability_supplement",
):
if any(item.get("lane") == lane for item in findings):
return lane
return None

View File

@@ -6,15 +6,78 @@ from fastapi.testclient import TestClient
from src.api.v1.iwooos import router
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
load_latest_iwooos_wazuh_manager_registry_reviewer_validation,
validate_iwooos_wazuh_manager_registry_owner_export,
)
EXPECTED_ALIASES = [
"managed_core_node_a",
"managed_core_node_b",
"managed_dev_node_a",
"managed_dev_node_b",
"managed_control_node_a",
"managed_control_node_b",
]
def _client() -> TestClient:
app = FastAPI()
app.include_router(router)
return TestClient(app)
def _valid_owner_export() -> dict:
return {
"owner_role": "資安負責人",
"team": "IwoooS",
"decision": "accept_readonly_registry_export_for_reviewer_validation",
"decision_reason": "Wazuh manager registry 已脫敏,供 reviewer 驗證主機覆蓋與 Dashboard API 修復狀態。",
"affected_scope": "iwooos_wazuh_expected_scope_aliases",
"collection_method": "owner_export_redacted_summary",
"agent_total": 6,
"agent_active": 2,
"agent_disconnected": 3,
"agent_never_connected": 1,
"last_seen_window_start": "2026-06-27T15:00:00+08:00",
"last_seen_window_end": "2026-06-27T16:00:00+08:00",
"registry_collected_at": "2026-06-27T16:05:00+08:00",
"registry_export_scope_aliases": EXPECTED_ALIASES,
"per_host_registry_matrix": [
{
"node_alias": alias,
"scope_role": "managed_scope",
"registry_presence": "present",
"agent_status_bucket": "active" if index < 2 else "disconnected",
"last_seen_state": "within_owner_window" if index < 2 else "outside_owner_window",
"manager_group_ref": f"group-ref-{index}",
"agent_id_redacted_ref": f"agent-redacted-ref-{index}",
"gap_reason": "none" if index < 2 else "owner_followup_required",
"redacted_evidence_ref": f"evidence-ref-{index}",
}
for index, alias in enumerate(EXPECTED_ALIASES)
],
"registry_gap_reason_by_alias": {
alias: "none" if index < 2 else "owner_followup_required"
for index, alias in enumerate(EXPECTED_ALIASES)
},
"registry_export_summary_ref": "evidence-ref-registry-summary",
"manager_health_ref": "evidence-ref-manager-health",
"dashboard_api_status_ref": "evidence-ref-dashboard-api",
"dashboard_api_connection_check_status": "ok",
"dashboard_api_version_check_status": "ok",
"dashboard_index_pattern_statuses": ["alerts_ok", "monitoring_ok", "statistics_ok"],
"dashboard_api_degradation_root_cause": "stored_api_connection_repaired_by_owner",
"dashboard_api_repair_postcheck_ref": "evidence-ref-dashboard-postcheck",
"redacted_evidence_refs": [
"evidence-ref-registry-summary",
"evidence-ref-dashboard-postcheck",
],
"followup_owner": "IwoooS reviewer",
"rollback_owner": "IwoooS runtime owner",
"postcheck_plan": "post_enable_iwooos_readback_no_raw_payload",
}
def test_iwooos_wazuh_manager_registry_reviewer_validation_contract_is_waiting_only() -> None:
payload = load_latest_iwooos_wazuh_manager_registry_reviewer_validation()
@@ -94,3 +157,82 @@ def test_iwooos_wazuh_manager_registry_reviewer_validation_api_is_public_safe()
assert "source_thread_id" not in response.text
assert "owenhytsai/" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
def test_iwooos_wazuh_manager_registry_owner_export_validation_accepts_redacted_payload() -> None:
payload = validate_iwooos_wazuh_manager_registry_owner_export(_valid_owner_export())
assert payload["schema_version"] == "iwooos_wazuh_manager_registry_owner_export_validation_result_v1"
assert payload["status"] == "accepted_for_readonly_posture_only"
assert payload["accepted_for_readonly_posture_only"] is True
assert payload["reviewer_validation_passed"] is True
assert payload["summary"]["owner_registry_export_received_count"] == 1
assert payload["summary"]["owner_registry_export_accepted_count"] == 1
assert payload["summary"]["manager_registry_accepted_count"] == 0
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["boundaries"]["payload_persisted"] is False
assert payload["boundaries"]["runtime_execution_authorized"] is False
assert payload["boundaries"]["manager_registry_accepted_updated"] is False
assert all(slot["received"] is True for slot in payload["evidence_slots"])
assert all(slot["accepted"] is True for slot in payload["evidence_slots"])
def test_iwooos_wazuh_manager_registry_owner_export_validation_api_does_not_update_global_counters() -> None:
client = _client()
response = client.post(
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export",
json=_valid_owner_export(),
)
assert response.status_code == 200
result = response.json()
assert result["status"] == "accepted_for_readonly_posture_only"
assert result["summary"]["owner_registry_export_accepted_count"] == 1
assert result["summary"]["manager_registry_accepted_count"] == 0
assert result["summary"]["runtime_gate_count"] == 0
readback = client.get("/api/v1/iwooos/wazuh-manager-registry-reviewer-validation").json()
assert readback["summary"]["owner_registry_export_received_count"] == 0
assert readback["summary"]["owner_registry_export_accepted_count"] == 0
assert readback["summary"]["manager_registry_accepted_count"] == 0
assert readback["summary"]["runtime_gate_count"] == 0
def test_iwooos_wazuh_manager_registry_owner_export_validation_requests_missing_fields() -> None:
candidate = _valid_owner_export()
candidate.pop("decision_reason")
payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
assert payload["status"] == "request_missing_fields"
assert payload["accepted_for_readonly_posture_only"] is False
assert payload["summary"]["owner_registry_export_received_count"] == 1
assert payload["summary"]["owner_registry_export_accepted_count"] == 0
assert any("decision_reason" in finding["field_paths"] for finding in payload["validation_findings"])
def test_iwooos_wazuh_manager_registry_owner_export_validation_quarantines_sensitive_payload() -> None:
candidate = _valid_owner_export()
candidate["manager_health_ref"] = "health evidence mentioned 10.250.250.250 by mistake"
payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
assert payload["status"] == "quarantine_sensitive_payload"
assert payload["quarantined"] is True
assert payload["summary"]["reviewer_validation_quarantined_count"] == 1
assert payload["summary"]["owner_registry_export_accepted_count"] == 0
assert "10.250.250.250" not in str(payload)
assert any(finding["check_id"] == "RV-07" for finding in payload["validation_findings"])
def test_iwooos_wazuh_manager_registry_owner_export_validation_rejects_runtime_action_request() -> None:
candidate = _valid_owner_export()
candidate["requested_actions"] = ["wazuh_active_response"]
payload = validate_iwooos_wazuh_manager_registry_owner_export(candidate)
assert payload["status"] == "reject_runtime_action_request"
assert payload["runtime_action_rejected"] is True
assert payload["summary"]["owner_registry_export_accepted_count"] == 0
assert payload["summary"]["runtime_gate_count"] == 0
assert any(finding["check_id"] == "RV-09" for finding in payload["validation_findings"])

View File

@@ -20838,6 +20838,8 @@
"title": "Owner export 進來後,先由 reviewer 驗收脫敏清單",
"subtitle": "這張卡固定 Wazuh manager registry owner export 的驗收規則欄位、計數、公開別名矩陣、Dashboard API 修復讀回、唯讀 credential metadata、拒收內容與下一個 Gate 都先可視化;目前尚未收到、尚未接受,也不開 runtime。",
"loadingBoundary": "正在讀取 Wazuh manager registry reviewer validation API",
"validationEndpointLabel": "脫敏 owner export 驗證端點",
"validationModeLabel": "驗證模式",
"slotReceivedLabel": "已收件",
"slotAcceptedLabel": "已接受",
"slotNextGateLabel": "下一關",

View File

@@ -20838,6 +20838,8 @@
"title": "Owner export 進來後,先由 reviewer 驗收脫敏清單",
"subtitle": "這張卡固定 Wazuh manager registry owner export 的驗收規則欄位、計數、公開別名矩陣、Dashboard API 修復讀回、唯讀 credential metadata、拒收內容與下一個 Gate 都先可視化;目前尚未收到、尚未接受,也不開 runtime。",
"loadingBoundary": "正在讀取 Wazuh manager registry reviewer validation API",
"validationEndpointLabel": "脫敏 owner export 驗證端點",
"validationModeLabel": "驗證模式",
"slotReceivedLabel": "已收件",
"slotAcceptedLabel": "已接受",
"slotNextGateLabel": "下一關",

View File

@@ -2474,6 +2474,7 @@ const wazuhManagedHostCoverageBoundaries = [
const wazuhManagerRegistryReviewerValidationBoundaries = [
'wazuh_manager_registry_reviewer_validation_visible=true',
'wazuh_manager_registry_owner_export_validation_api_available=true',
'wazuh_manager_registry_reviewer_validation_expected_scope_alias_count=6',
'wazuh_manager_registry_reviewer_validation_required_owner_field_count=28',
'wazuh_manager_registry_reviewer_validation_per_host_required_field_count=9',
@@ -9845,6 +9846,9 @@ function IwoooSWazuhManagerRegistryReviewerValidationBoard() {
: loading
? [t('loadingBoundary')]
: wazuhManagerRegistryReviewerValidationBoundaries
const validationEndpoint = data?.owner_export_validation_endpoint
?? '/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export'
const validationMode = data?.owner_export_validation_mode ?? 'no_persist_validation_no_runtime_action'
const evidenceSlots = data?.evidence_slots ?? []
const visibleChecks = data?.reviewer_validation_checks?.slice(0, 4) ?? []
const statusText = loading ? t('status.loading') : failed ? t('status.failed') : t('status.ready')
@@ -9870,6 +9874,10 @@ function IwoooSWazuhManagerRegistryReviewerValidationBoard() {
<ToneDot tone={statusTone} />
{statusText}
</div>
<div style={{ marginTop: 10, display: 'grid', gap: 6, fontSize: 11, color: '#45686a', ...textWrap }}>
<span>{t('validationEndpointLabel')}<code style={{ color: '#2f6265', overflowWrap: 'anywhere' }}>{validationEndpoint}</code></span>
<span>{t('validationModeLabel')}<code style={{ color: '#2f6265', overflowWrap: 'anywhere' }}>{validationMode}</code></span>
</div>
</div>
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(126px, 1fr))', gap: 8 }}>

View File

@@ -341,6 +341,8 @@ export interface IwoooSWazuhManagerRegistryReviewerValidationResponse {
status: string
mode: string
source_refs: string[]
owner_export_validation_endpoint: string
owner_export_validation_mode: string
summary: {
expected_scope_alias_count: number
required_owner_field_count: number

View File

@@ -29573,6 +29573,7 @@ def validate(root: Path) -> None:
"getIwoooSWazuhManagerRegistryReviewerValidation",
"apiClient.getIwoooSWazuhManagerRegistryReviewerValidation",
"Wazuh manager registry reviewer validation 已讀回",
"wazuh_manager_registry_owner_export_validation_api_available=true",
"wazuh_manager_registry_reviewer_validation_owner_registry_export_received_count=0",
"wazuh_manager_registry_reviewer_validation_owner_registry_export_accepted_count=0",
"wazuh_manager_registry_reviewer_validation_manager_registry_accepted_count=0",
@@ -29594,7 +29595,13 @@ def validate(root: Path) -> None:
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation",
"wazuh_manager_registry_reviewer_validation_v1",
"iwooos_wazuh_manager_registry_reviewer_validation_readback_v1",
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation/validate-owner-export",
"iwooos_wazuh_manager_registry_owner_export_validation_result_v1",
"validate_iwooos_wazuh_manager_registry_owner_export",
"test_iwooos_wazuh_manager_registry_reviewer_validation_api_is_public_safe",
"test_iwooos_wazuh_manager_registry_owner_export_validation_accepts_redacted_payload",
"test_iwooos_wazuh_manager_registry_owner_export_validation_quarantines_sensitive_payload",
"test_iwooos_wazuh_manager_registry_owner_export_validation_rejects_runtime_action_request",
"wazuh_manager_registry_reviewer_validation_owner_registry_export_received_count=0",
"wazuh_manager_registry_reviewer_validation_owner_registry_export_accepted_count=0",
"wazuh_manager_registry_reviewer_validation_manager_registry_accepted_count=0",