"""API and loader tests for the IWOOOS runtime-security readback and Wazuh gate routes.

Every test asserts that the readback/validation payloads keep runtime gates
closed (``runtime_gate_count == 0``) and that responses do not contain the
private markers checked below (internal IP prefix, operator phrases, secrets).
"""

from __future__ import annotations

from fastapi import FastAPI
from fastapi.testclient import TestClient

from src.api.v1.iwooos import router
from src.services.iwooos_runtime_security_readback import (
    load_latest_iwooos_runtime_security_readback,
)
from src.services.iwooos_wazuh_allowlisted_check_mode_dry_run import (
    load_latest_iwooos_wazuh_allowlisted_check_mode_dry_run,
)
from src.services.iwooos_wazuh_runtime_controlled_apply_preflight import (
    load_latest_iwooos_wazuh_runtime_controlled_apply_preflight,
)
from src.services.iwooos_wazuh_runtime_gate_owner_review_readback import (
    load_latest_iwooos_wazuh_runtime_gate_owner_review_readback,
)

# Environment variables the tests remove so the Wazuh read-only path is not
# configured from the developer's or CI's ambient environment.
_WAZUH_ENV_VARS = (
    "IWOOOS_WAZUH_READONLY_ENABLED",
    "WAZUH_API_BASE_URL",
    "WAZUH_API_USERNAME",
    "WAZUH_API_PASSWORD",
)


def _clear_wazuh_env(monkeypatch) -> None:
    """Remove every Wazuh-related env var for the duration of one test."""
    for name in _WAZUH_ENV_VARS:
        monkeypatch.delenv(name, raising=False)


def _managed_node_aliases() -> list[str]:
    """Return a fresh list of the six public target-selector aliases."""
    return [
        "managed_core_node_a",
        "managed_core_node_b",
        "managed_core_node_c",
        "managed_edge_node_a",
        "managed_edge_node_b",
        "managed_lab_node_a",
    ]


def _client() -> TestClient:
    """Build a TestClient around a bare FastAPI app that mounts only the IWOOOS router."""
    app = FastAPI()
    app.include_router(router)
    return TestClient(app)


def _valid_runtime_controlled_apply_packet() -> dict[str, object]:
    """Return a redacted controlled-apply packet that the validator tests expect to be accepted."""
    return {
        "controlled_apply_intent": "prepare_controlled_apply_preflight_only",
        "target_selector_aliases": _managed_node_aliases(),
        "source_of_truth_diff_ref": "docs/security/wazuh-runtime-controlled-apply-preflight.snapshot.json#source-diff",
        "check_mode_plan_ref": "playbooks/wazuh-controlled-apply-check-mode#redacted-plan",
        "dry_run_evidence_ref": "evidence/iwooos/wazuh-runtime-dry-run-redacted-v1",
        "blast_radius_statement": "public aliases only; no live Wazuh query and no host write in this preflight",
        "rollback_plan_ref": "playbooks/wazuh-controlled-apply-rollback#redacted-plan",
        "post_apply_verifier_ref": "verifiers/iwooos-wazuh-post-apply-readback#public-safe",
        "km_playbook_writeback_ref": "km/playbook-trust/wazuh-controlled-apply-preflight-v1",
        "maintenance_window": "low-traffic-window-required-before-any-future-apply",
        "followup_owner": "iwooos-security-reviewer",
        "rollback_owner": "iwooos-security-reviewer",
        "audit_receipt_ref": "audit/iwooos-wazuh-controlled-apply-preflight-redacted-v1",
        "runtime_boundary_ack": "runtime_gate_remains_closed",
    }


def _valid_live_metadata_owner_packet() -> dict[str, object]:
    """Return a redacted live-metadata owner packet that the validator tests expect to be accepted."""
    return {
        "wazuh_live_metadata_owner": "iwooos-security-owner",
        "release_readback_ref": "production-readback-http-200-disabled-owner-gate",
        "secret_injection_owner": "platform-secret-owner",
        "secret_source_metadata_ref": "secret-source-metadata-ref-redacted-v1",
        "wazuh_manager_health_ref": "wazuh-manager-health-ref-redacted-v1",
        "wazuh_api_tls_validation_ref": "wazuh-api-tls-validation-ref-redacted-v1",
        "readonly_account_scope_ref": "readonly-account-scope-ref-redacted-v1",
        "agent_alias_mapping_policy": "public aliases only; no raw agent identity or internal IP display",
        "post_enable_readback_command": "python3 scripts/security/wazuh-readonly-production-readback.py --json",
        "rollback_owner": "iwooos-security-owner",
        "maintenance_window": "low-traffic-window-required-before-any-future-enable",
        "validation_plan": "GET public-safe aggregate readback only; no raw Wazuh payload storage",
        "no_secret_value_attestation": "no_secret_value_collected",
        "no_raw_payload_attestation": "no_raw_wazuh_payload_stored",
        "active_response_separate_gate_ack": "active_response_requires_separate_gate",
    }


def _valid_runtime_gate_owner_review_packet() -> dict[str, object]:
    """Return a redacted owner-review packet that the validator tests expect to be accepted."""
    return {
        "owner_review_intent": "commit_runtime_gate_owner_review_readback_only",
        "owner_reviewer_role": "iwooos-security-owner",
        "owner_review_decision": "accept_controlled_apply_review_readiness_only",
        "owner_review_decision_reason": "redacted owner-review packet accepts review readiness only; runtime gate remains closed",
        "target_selector_aliases": _managed_node_aliases(),
        "source_of_truth_diff_ref": "docs/security/wazuh-runtime-gate-owner-review-readback.snapshot.json#source-diff",
        "check_mode_plan_ref": "playbooks/wazuh-controlled-apply-check-mode#redacted-plan",
        "dry_run_evidence_ref": "evidence/iwooos/wazuh-runtime-owner-review-dry-run-redacted-v1",
        "blast_radius_statement": "public aliases only; no live Wazuh query and no host write in this owner-review readback",
        "maintenance_window_ref": "maintenance/iwooos-wazuh-low-traffic-window-redacted-v1",
        "rollback_plan_ref": "playbooks/wazuh-controlled-apply-rollback#redacted-plan",
        "rollback_owner": "iwooos-security-owner",
        "post_apply_verifier_ref": "verifiers/iwooos-wazuh-post-apply-readback#public-safe",
        "km_playbook_writeback_ref": "km/playbook-trust/wazuh-runtime-gate-owner-review-v1",
        "followup_owner": "iwooos-security-reviewer",
        "audit_receipt_ref": "audit/iwooos-wazuh-runtime-gate-owner-review-redacted-v1",
        "runtime_boundary_ack": "runtime_gate_remains_closed",
        "secret_boundary_ack": "no_secret_value_collected",
        "live_wazuh_query_boundary_ack": "no_live_wazuh_query_performed",
    }


def _valid_allowlisted_check_mode_dry_run_packet() -> dict[str, object]:
    """Return a redacted allowlisted check-mode dry-run packet built from public-safe refs only."""
    return {
        "dry_run_intent": "stage_allowlisted_check_mode_dry_run_readback_only",
        "target_selector_aliases": _managed_node_aliases(),
        "check_mode_plan_ref": "playbooks/wazuh-allowlisted-check-mode#redacted-plan",
        "dry_run_evidence_ref": "evidence/iwooos/wazuh-allowlisted-dry-run-redacted-v1",
        "dry_run_result_ref": "evidence/iwooos/wazuh-allowlisted-dry-run-result-redacted-v1",
        "dry_run_result_state": "staged_passed_no_runtime_action",
        "dry_run_redaction_attestation": "redacted_refs_only_no_raw_output",
        "post_dry_run_verifier_ref": "verifiers/iwooos-wazuh-post-dry-run-readback#public-safe",
        "rollback_revalidation_ref": "playbooks/wazuh-controlled-apply-rollback#revalidated-redacted",
        "km_playbook_writeback_ref": "km/playbook-trust/wazuh-allowlisted-check-mode-dry-run-v1",
        "followup_owner": "iwooos-security-reviewer",
        "audit_receipt_ref": "audit/iwooos-wazuh-allowlisted-check-mode-dry-run-redacted-v1",
        "runtime_boundary_ack": "runtime_gate_remains_closed",
        "live_wazuh_query_boundary_ack": "no_live_wazuh_query_performed",
        "host_write_boundary_ack": "no_host_write_performed",
        "secret_boundary_ack": "no_secret_value_collected",
    }


def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None:
    """The snapshot loader reports the expected counts and keeps every runtime gate at zero."""
    payload = load_latest_iwooos_runtime_security_readback()
    summary = payload["summary"]
    boundaries = payload["boundaries"]

    assert payload["schema_version"] == "iwooos_runtime_security_readback_v1"
    assert payload["status"] == "blocked_waiting_owner_evidence_and_runtime_gates"

    # Top-level snapshot and owner-response counters.
    assert summary["source_snapshot_count"] == 13
    assert summary["p0_lane_count"] == 12
    assert summary["runtime_gate_count"] == 0
    assert summary["owner_response_received_count"] == 0
    assert summary["owner_response_accepted_count"] == 0
    assert summary["wazuh_manager_registry_accepted_count"] == 6

    # Live route / live metadata gate.
    assert summary["wazuh_live_status"] == "not_checked_by_snapshot_loader"
    assert summary["wazuh_live_route_degraded_count"] == 1
    assert summary["wazuh_live_readonly_api_enabled_count"] == 0
    assert summary["wazuh_live_metadata_available_count"] == 0
    assert summary["wazuh_live_metadata_gate_owner_accepted_count"] == 1
    assert summary["wazuh_live_metadata_gate_secret_source_accepted_count"] == 1
    assert summary["wazuh_live_metadata_gate_manager_health_accepted_count"] == 1
    assert summary["wazuh_live_metadata_gate_readonly_scope_accepted_count"] == 1
    assert summary["wazuh_live_metadata_gate_post_enable_readback_count"] == 0
    assert summary["wazuh_live_metadata_gate_live_query_authorized_count"] == 0

    # Owner evidence preflight.
    assert summary["wazuh_owner_evidence_required_field_count"] == 28
    assert summary["wazuh_owner_evidence_reviewer_check_count"] == 15
    assert summary["wazuh_owner_evidence_outcome_lane_count"] == 8
    assert summary["wazuh_owner_evidence_forbidden_payload_count"] == 22
    assert summary["wazuh_owner_evidence_expected_alias_count"] == 6
    assert summary["wazuh_owner_evidence_registry_export_received_count"] == 0
    assert summary["wazuh_owner_evidence_registry_export_accepted_count"] == 0
    assert summary["wazuh_owner_evidence_received_count"] == 0
    assert summary["wazuh_owner_evidence_accepted_count"] == 0
    assert summary["wazuh_owner_evidence_runtime_gate_count"] == 0

    # Runtime controlled-apply preflight.
    assert summary["wazuh_runtime_apply_preflight_ready_count"] == 1
    assert summary["wazuh_runtime_apply_target_selector_count"] == 6
    assert summary["wazuh_runtime_apply_source_diff_count"] == 1
    assert summary["wazuh_runtime_apply_check_mode_plan_count"] == 1
    assert summary["wazuh_runtime_apply_dry_run_required_count"] == 1
    assert summary["wazuh_runtime_apply_rollback_plan_count"] == 1
    assert summary["wazuh_runtime_apply_post_apply_verifier_count"] == 1
    assert summary["wazuh_runtime_apply_km_writeback_count"] == 1
    assert summary["wazuh_runtime_apply_owner_review_ready_count"] == 1
    assert summary["wazuh_runtime_apply_runtime_gate_count"] == 0

    # Runtime gate owner review.
    assert summary["wazuh_runtime_owner_review_target_selector_count"] == 6
    assert summary["wazuh_runtime_owner_review_source_diff_count"] == 1
    assert summary["wazuh_runtime_owner_review_check_mode_plan_count"] == 1
    assert summary["wazuh_runtime_owner_review_dry_run_evidence_count"] == 1
    assert summary["wazuh_runtime_owner_review_rollback_plan_count"] == 1
    assert summary["wazuh_runtime_owner_review_post_apply_verifier_count"] == 1
    assert summary["wazuh_runtime_owner_review_km_writeback_count"] == 1
    assert summary["wazuh_runtime_owner_review_packet_received_count"] == 1
    assert summary["wazuh_runtime_owner_review_packet_review_ready_count"] == 1
    assert summary["wazuh_runtime_owner_review_packet_accepted_count"] == 1
    assert summary["wazuh_runtime_owner_review_runtime_gate_count"] == 0

    # Allowlisted check-mode dry run.
    assert summary["wazuh_allowlisted_check_mode_dry_run_target_selector_count"] == 6
    assert summary["wazuh_allowlisted_check_mode_dry_run_check_mode_plan_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_evidence_ref_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_result_ref_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_packet_received_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_packet_accepted_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_post_verifier_count"] == 1
    assert summary["wazuh_allowlisted_check_mode_dry_run_runtime_gate_count"] == 0

    # Kali / alerting counters and global boundaries.
    assert summary["kali_active_scan_authorized_count"] == 0
    assert summary["kali_execute_authorized_count"] == 0
    assert summary["alert_receipt_runtime_send_count"] == 0
    assert boundaries["runtime_execution_authorized"] is False
    assert boundaries["active_scan_authorized"] is False
    assert boundaries["wazuh_active_response_authorized"] is False
    assert boundaries["telegram_send_authorized"] is False


def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
    """The loader exposes exactly the twelve expected lanes with the pinned per-lane metrics."""
    payload = load_latest_iwooos_runtime_security_readback()
    lanes = payload["lanes"]

    lane_ids = {lane["lane_id"] for lane in lanes}
    assert lane_ids == {
        "wazuh_registry",
        "wazuh_live_route",
        "wazuh_live_metadata_gate",
        "wazuh_owner_evidence_preflight",
        "wazuh_runtime_controlled_apply_preflight",
        "wazuh_runtime_gate_owner_review",
        "wazuh_allowlisted_check_mode_dry_run",
        "wazuh_dashboard_api",
        "kali_intake",
        "alert_readability",
        "owner_dispatch",
        "intrusion_prevention",
    }

    # Every lane carries non-empty metrics, a next gate and source refs.
    assert all(lane["metrics"] for lane in lanes)
    assert all(lane["next_gate"] for lane in lanes)
    assert all(lane["source_refs"] for lane in lanes)
    assert any(lane["completion_percent"] > 0 for lane in lanes)

    # Per-lane expectations: "not this lane, or the lane satisfies the condition".
    assert all(
        lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 35
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_live_metadata_gate"
        or (
            lane["completion_percent"] == 40
            and lane["metrics"]["owner_accepted"] == 1
            and lane["metrics"]["secret_metadata_accepted"] == 1
            and lane["metrics"]["post_enable_readback"] == 0
        )
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_allowlisted_check_mode_dry_run"
        or (
            lane["completion_percent"] == 65
            and lane["metrics"]["packet_accepted"] == 1
            and lane["metrics"]["runtime_gate"] == 0
        )
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_owner_evidence_preflight"
        or lane["metrics"]["owner_accepted"] == 0
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_runtime_controlled_apply_preflight"
        or (lane["completion_percent"] == 45 and lane["metrics"]["runtime_gate"] == 0)
        for lane in lanes
    )
    assert all(
        lane["lane_id"] != "wazuh_runtime_gate_owner_review"
        or (
            lane["completion_percent"] == 55
            and lane["metrics"]["owner_review_accepted"] == 1
            and lane["metrics"]["runtime_gate"] == 0
        )
        for lane in lanes
    )


def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> None:
    """With no Wazuh env configured, the readback route reports the disabled state and leaks nothing."""
    _clear_wazuh_env(monkeypatch)

    response = _client().get("/api/v1/iwooos/runtime-security-readback")

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    assert data["schema_version"] == "iwooos_runtime_security_readback_v1"
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
    assert summary["wazuh_live_route_http_status"] == 200
    assert summary["wazuh_live_route_degraded_count"] == 1
    assert summary["wazuh_live_metadata_available_count"] == 0
    assert summary["wazuh_live_metadata_gate_owner_accepted_count"] == 1
    assert summary["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
    assert summary["wazuh_owner_evidence_accepted_count"] == 0
    assert summary["wazuh_owner_evidence_runtime_gate_count"] == 0
    assert summary["wazuh_runtime_apply_preflight_ready_count"] == 1
    assert summary["wazuh_runtime_apply_runtime_gate_count"] == 0
    assert summary["wazuh_runtime_owner_review_packet_accepted_count"] == 1
    assert summary["wazuh_runtime_owner_review_runtime_gate_count"] == 0
    assert data["boundaries"]["secret_value_collection_allowed"] is False
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text


def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry(
    monkeypatch,
) -> None:
    """With the read-only API enabled and a mocked empty Wazuh, the route reports an empty registry."""
    import httpx

    monkeypatch.setenv("IWOOOS_WAZUH_READONLY_ENABLED", "true")
    monkeypatch.setenv("WAZUH_API_BASE_URL", "https://wazuh.example.test:55000")
    monkeypatch.setenv("WAZUH_API_USERNAME", "readonly")
    monkeypatch.setenv("WAZUH_API_PASSWORD", "placeholder")
    monkeypatch.setenv("IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT", "6")

    def handler(request: httpx.Request) -> httpx.Response:
        """Serve canned responses for the three Wazuh paths handled here; 404 otherwise."""
        path = request.url.path
        if path == "/security/user/authenticate":
            return httpx.Response(200, json={"data": {"token": "token-value"}})
        if path == "/agents/summary/status":
            return httpx.Response(
                200,
                json={
                    "data": {
                        "connection": {
                            "total": 0,
                            "active": 0,
                            "disconnected": 0,
                            "pending": 0,
                        }
                    }
                },
            )
        if path == "/agents":
            return httpx.Response(200, json={"data": {"affected_items": []}})
        return httpx.Response(404)

    transport = httpx.MockTransport(handler)
    original_async_client = httpx.AsyncClient

    def client_factory(*args, **kwargs):
        """Construct the real AsyncClient but force it onto the mock transport."""
        kwargs["transport"] = transport
        return original_async_client(*args, **kwargs)

    monkeypatch.setattr(httpx, "AsyncClient", client_factory)

    response = _client().get("/api/v1/iwooos/runtime-security-readback")

    assert response.status_code == 200
    summary = response.json()["summary"]
    assert summary["wazuh_live_status"] == "wazuh_agent_registry_empty"
    assert summary["wazuh_live_readonly_api_enabled_count"] == 1
    assert summary["wazuh_live_agent_total"] == 0
    assert summary["wazuh_live_registry_empty_count"] == 1
    assert summary["wazuh_live_route_degraded_count"] == 1
    assert summary["runtime_gate_count"] == 0
    # The mocked auth token must never be echoed back to the caller.
    assert "token-value" not in response.text


def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None:
    """The live-metadata gate readback shows accepted owner metadata but no live authorization."""
    _clear_wazuh_env(monkeypatch)

    response = _client().get("/api/v1/iwooos/wazuh-live-metadata-gate")

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert data["schema_version"] == "iwooos_wazuh_live_metadata_gate_readback_v1"
    assert data["status"] == "ready_for_server_side_env_enable_review_no_secret_collection"
    assert summary["production_route_readback_passed_count"] == 1
    assert summary["live_metadata_owner_response_received_count"] == 1
    assert summary["live_metadata_owner_response_accepted_count"] == 1
    assert summary["secret_source_metadata_accepted_count"] == 1
    assert summary["wazuh_manager_health_ref_accepted_count"] == 1
    assert summary["readonly_account_scope_accepted_count"] == 1
    assert summary["post_enable_readback_passed_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_live_route_http_status"] == 200
    assert summary["wazuh_live_route_degraded_count"] == 1
    assert summary["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
    assert boundaries["secret_value_collection_allowed"] is False
    assert boundaries["wazuh_api_live_query_authorized"] is False
    assert boundaries["wazuh_active_response_authorized"] is False
    assert boundaries["host_write_authorized"] is False
    assert boundaries["not_authorization"] is True
    assert (
        data["live_metadata_owner_packet_validation_endpoint"]
        == "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet"
    )
    assert len(data["items"]) == 6
    assert any(marker == "正式路由讀回=1" for marker in data["boundary_markers"])
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text
    assert "WAZUH_API_PASSWORD" not in response.text


def test_iwooos_wazuh_live_metadata_gate_validator_accepts_redacted_packet(
    monkeypatch,
) -> None:
    """A redacted owner packet is accepted for review only and does not change the readback summary."""
    _clear_wazuh_env(monkeypatch)
    readback_url = "/api/v1/iwooos/wazuh-live-metadata-gate"
    client = _client()

    # Read the gate before and after validation to prove validation is side-effect free.
    before = client.get(readback_url).json()
    response = client.post(
        "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet",
        json=_valid_live_metadata_owner_packet(),
    )
    after = client.get(readback_url).json()

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert (
        data["schema_version"]
        == "iwooos_wazuh_live_metadata_owner_packet_validation_result_v1"
    )
    assert data["status"] == "accepted_for_live_metadata_owner_review_only"
    assert data["accepted_for_live_metadata_owner_review_only"] is True
    assert summary["live_metadata_owner_response_received_count"] == 1
    assert summary["live_metadata_owner_response_accepted_count"] == 1
    assert summary["secret_source_metadata_accepted_count"] == 1
    assert summary["wazuh_manager_health_ref_accepted_count"] == 1
    assert summary["readonly_account_scope_accepted_count"] == 1
    assert summary["post_enable_readback_passed_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["runtime_gate_count"] == 0
    assert boundaries["payload_persisted"] is False
    assert boundaries["wazuh_api_live_query_authorized"] is False
    assert boundaries["runtime_execution_authorized"] is False
    assert boundaries["runtime_gate_open"] is False
    assert before["summary"] == after["summary"]
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text
    assert "WAZUH_API_PASSWORD" not in response.text


def test_iwooos_wazuh_live_metadata_gate_validator_quarantines_sensitive_payload(
    monkeypatch,
) -> None:
    """A packet carrying an IP address and bearer token is quarantined and not echoed back."""
    _clear_wazuh_env(monkeypatch)
    packet = _valid_live_metadata_owner_packet()
    packet["release_readback_ref"] = (
        "bad ref includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop"
    )

    response = _client().post(
        "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet",
        json=packet,
    )

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    assert data["status"] == "quarantine_sensitive_payload"
    assert data["quarantined"] is True
    assert summary["live_metadata_owner_packet_quarantined_count"] == 1
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["runtime_gate_count"] == 0
    assert "10.1.2.3" not in response.text
    assert "Bearer abcdefghijklmnop" not in response.text


def test_iwooos_wazuh_live_metadata_gate_validator_rejects_runtime_action(
    monkeypatch,
) -> None:
    """A packet that tries to authorize live Wazuh queries is rejected as a runtime action request."""
    _clear_wazuh_env(monkeypatch)
    packet = _valid_live_metadata_owner_packet()
    packet["wazuh_api_live_query_authorized"] = True

    response = _client().post(
        "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet",
        json=packet,
    )

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    assert data["status"] == "reject_runtime_action_request"
    assert data["runtime_action_rejected"] is True
    assert summary["live_metadata_owner_runtime_action_rejected_count"] == 1
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["runtime_gate_count"] == 0


def test_iwooos_wazuh_owner_evidence_preflight_api_is_public_safe(monkeypatch) -> None:
    """The owner-evidence preflight readback is ready but has received/accepted no evidence."""
    _clear_wazuh_env(monkeypatch)

    response = _client().get("/api/v1/iwooos/wazuh-owner-evidence-preflight")

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert data["schema_version"] == "iwooos_wazuh_owner_evidence_preflight_readback_v1"
    assert data["status"] == "owner_evidence_preflight_ready_no_runtime_action"
    assert summary["required_field_count"] == 28
    assert summary["reviewer_check_count"] == 15
    assert summary["outcome_lane_count"] == 8
    assert summary["forbidden_payload_count"] == 22
    assert summary["expected_scope_alias_count"] == 6
    assert summary["per_host_required_field_count"] == 9
    assert summary["registry_export_received_count"] == 0
    assert summary["registry_export_accepted_count"] == 0
    assert summary["owner_evidence_received_count"] == 0
    assert summary["owner_evidence_accepted_count"] == 0
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["secret_value_collection_allowed_count"] == 0
    assert boundaries["wazuh_api_live_query_authorized"] is False
    assert boundaries["wazuh_active_response_authorized"] is False
    assert boundaries["host_write_authorized"] is False
    assert boundaries["runtime_execution_authorized"] is False
    assert boundaries["not_authorization"] is True
    assert len(data["items"]) == 8
    assert any(marker == "必要欄位=28" for marker in data["boundary_markers"])
    assert any(
        rule.startswith("負責人證據預檢 ready") for rule in data["no_false_green_rules"]
    )
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text
    assert "WAZUH_API_PASSWORD" not in response.text


def test_iwooos_wazuh_runtime_controlled_apply_preflight_api_is_public_safe(
    monkeypatch,
) -> None:
    """Both the loader and the route report a ready preflight with the runtime gate closed."""
    _clear_wazuh_env(monkeypatch)

    # Loader view.
    payload = load_latest_iwooos_wazuh_runtime_controlled_apply_preflight()
    snapshot_summary = payload["summary"]
    assert (
        payload["schema_version"]
        == "iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1"
    )
    assert payload["status"] == "controlled_apply_preflight_ready_no_runtime_action"
    assert snapshot_summary["controlled_apply_preflight_ready_count"] == 1
    assert snapshot_summary["target_selector_count"] == 6
    assert snapshot_summary["runtime_gate_count"] == 0
    assert payload["boundaries"]["runtime_execution_authorized"] is False

    # HTTP view.
    response = _client().get("/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight")

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert (
        data["schema_version"]
        == "iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1"
    )
    assert summary["controlled_apply_preflight_ready_count"] == 1
    assert summary["target_selector_count"] == 6
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["secret_value_collection_allowed_count"] == 0
    assert boundaries["payload_persisted"] is False
    assert boundaries["wazuh_api_live_query_authorized"] is False
    assert boundaries["wazuh_active_response_authorized"] is False
    assert boundaries["host_write_authorized"] is False
    assert boundaries["runtime_gate_open"] is False
    assert boundaries["not_authorization"] is True
    assert len(data["target_selectors"]) == 6
    assert len(data["preflight_items"]) == 6
    assert any(
        marker == "wazuh_runtime_controlled_apply_preflight_visible=true"
        for marker in data["boundary_markers"]
    )
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text
    assert "WAZUH_API_PASSWORD" not in response.text


def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_accepts_redacted_packet(
    monkeypatch,
) -> None:
    """A redacted controlled-apply packet is accepted for review only, with no readback change."""
    _clear_wazuh_env(monkeypatch)
    readback_url = "/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight"
    client = _client()

    # Read the preflight before and after validation to prove validation is side-effect free.
    before = client.get(readback_url).json()
    response = client.post(
        "/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
        json=_valid_runtime_controlled_apply_packet(),
    )
    after = client.get(readback_url).json()

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert (
        data["schema_version"]
        == "iwooos_wazuh_runtime_controlled_apply_packet_validation_result_v1"
    )
    assert data["status"] == "accepted_for_controlled_apply_preflight_review_only"
    assert data["accepted_for_controlled_apply_preflight_review_only"] is True
    assert summary["controlled_apply_packet_received_count"] == 1
    assert summary["controlled_apply_preflight_ready_count"] == 1
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["secret_value_collection_allowed_count"] == 0
    assert boundaries["payload_persisted"] is False
    assert boundaries["runtime_execution_authorized"] is False
    assert boundaries["runtime_gate_open"] is False
    assert before["summary"] == after["summary"]
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text


def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_quarantines_sensitive_payload(
    monkeypatch,
) -> None:
    """An extra field carrying an IP address and bearer token causes quarantine without echo."""
    _clear_wazuh_env(monkeypatch)
    packet = _valid_runtime_controlled_apply_packet()
    packet["redacted_evidence_ref"] = (
        "raw output includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop"
    )

    response = _client().post(
        "/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
        json=packet,
    )

    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "quarantine_sensitive_payload"
    assert data["quarantined"] is True
    assert data["summary"]["controlled_apply_packet_quarantined_count"] == 1
    assert data["summary"]["runtime_gate_count"] == 0
    assert "10.1.2.3" not in response.text
    assert "Bearer abcdefghijklmnop" not in response.text


def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_rejects_runtime_action(
    monkeypatch,
) -> None:
    """A packet that requests Wazuh active response is rejected as a runtime action request."""
    _clear_wazuh_env(monkeypatch)
    packet = _valid_runtime_controlled_apply_packet()
    packet["wazuh_active_response"] = True

    response = _client().post(
        "/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
        json=packet,
    )

    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "reject_runtime_action_request"
    assert data["runtime_action_rejected"] is True
    assert data["summary"]["controlled_apply_runtime_action_rejected_count"] == 1
    assert data["summary"]["runtime_gate_count"] == 0


def test_iwooos_wazuh_runtime_gate_owner_review_readback_api_is_public_safe(
    monkeypatch,
) -> None:
    """Both the loader and the route show an accepted owner-review packet with the gate closed."""
    _clear_wazuh_env(monkeypatch)

    # Loader view.
    payload = load_latest_iwooos_wazuh_runtime_gate_owner_review_readback()
    snapshot_summary = payload["summary"]
    assert (
        payload["schema_version"] == "iwooos_wazuh_runtime_gate_owner_review_readback_v1"
    )
    assert (
        payload["status"]
        == "runtime_gate_owner_review_packet_committed_no_runtime_action"
    )
    assert snapshot_summary["owner_review_packet_accepted_count"] == 1
    assert snapshot_summary["target_selector_count"] == 6
    assert snapshot_summary["runtime_gate_count"] == 0
    assert payload["boundaries"]["runtime_execution_authorized"] is False

    # HTTP view.
    response = _client().get("/api/v1/iwooos/wazuh-runtime-gate-owner-review-readback")

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert (
        data["schema_version"] == "iwooos_wazuh_runtime_gate_owner_review_readback_v1"
    )
    assert summary["owner_review_packet_received_count"] == 1
    assert summary["owner_review_packet_review_ready_count"] == 1
    assert summary["owner_review_packet_accepted_count"] == 1
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["secret_value_collection_allowed_count"] == 0
    assert boundaries["payload_persisted"] is False
    assert boundaries["wazuh_api_live_query_authorized"] is False
    assert boundaries["wazuh_active_response_authorized"] is False
    assert boundaries["host_write_authorized"] is False
    assert boundaries["runtime_gate_open"] is False
    assert boundaries["not_authorization"] is True
    assert len(data["target_selectors"]) == 6
    assert len(data["review_items"]) == 7
    assert any(
        marker == "wazuh_runtime_gate_owner_review_validation_api_available=true"
        for marker in data["boundary_markers"]
    )
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text
    assert "WAZUH_API_PASSWORD" not in response.text


def test_iwooos_wazuh_runtime_gate_owner_review_validator_accepts_redacted_packet(
    monkeypatch,
) -> None:
    """A redacted owner-review packet is accepted for readback only, with no readback change."""
    _clear_wazuh_env(monkeypatch)
    readback_url = "/api/v1/iwooos/wazuh-runtime-gate-owner-review-readback"
    client = _client()

    # Read the readback before and after validation to prove validation is side-effect free.
    before = client.get(readback_url).json()
    response = client.post(
        "/api/v1/iwooos/wazuh-runtime-gate-owner-review-readback/validate-owner-review-packet",
        json=_valid_runtime_gate_owner_review_packet(),
    )
    after = client.get(readback_url).json()

    assert response.status_code == 200
    data = response.json()
    summary = data["summary"]
    boundaries = data["boundaries"]
    assert (
        data["schema_version"]
        == "iwooos_wazuh_runtime_gate_owner_review_packet_validation_result_v1"
    )
    assert data["status"] == "accepted_for_runtime_gate_owner_review_readback_only"
    assert data["accepted_for_runtime_gate_owner_review_readback_only"] is True
    assert summary["owner_review_packet_received_count"] == 1
    assert summary["owner_review_packet_review_ready_count"] == 1
    assert summary["owner_review_packet_accepted_count"] == 1
    assert summary["runtime_gate_count"] == 0
    assert summary["wazuh_api_live_query_authorized_count"] == 0
    assert summary["wazuh_active_response_authorized_count"] == 0
    assert summary["host_write_authorized_count"] == 0
    assert summary["secret_value_collection_allowed_count"] == 0
    assert boundaries["payload_persisted"] is False
    assert boundaries["runtime_execution_authorized"] is False
    assert boundaries["runtime_gate_open"] is False
    assert before["summary"] == after["summary"]
    assert "192.168.0." not in response.text
    assert "工作視窗" not in response.text
    assert "批准!繼續" not in response.text


def test_iwooos_wazuh_runtime_gate_owner_review_validator_quarantines_sensitive_payload(
    monkeypatch,
) -> None:
    """An extra field carrying an IP address and bearer token causes quarantine without echo."""
    _clear_wazuh_env(monkeypatch)
    packet = _valid_runtime_gate_owner_review_packet()
    packet["redacted_evidence_ref"] = (
        "redacted note includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop"
    )

    response = _client().post(
        "/api/v1/iwooos/wazuh-runtime-gate-owner-review-readback/validate-owner-review-packet",
        json=packet,
    )

    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "quarantine_sensitive_payload"
    assert data["quarantined"] is True
    assert data["summary"]["owner_review_packet_quarantined_count"] == 1
    assert data["summary"]["runtime_gate_count"] == 0
    assert "10.1.2.3" not in response.text
    assert "Bearer abcdefghijklmnop" not in response.text


def test_iwooos_wazuh_runtime_gate_owner_review_validator_rejects_runtime_action(
    monkeypatch,
) -> None:
    monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
    monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
    monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) packet = _valid_runtime_gate_owner_review_packet() packet["wazuh_active_response"] = True response = _client().post( "/api/v1/iwooos/wazuh-runtime-gate-owner-review-readback/validate-owner-review-packet", json=packet, ) assert response.status_code == 200 data = response.json() assert data["status"] == "reject_runtime_action_request" assert data["runtime_action_rejected"] is True assert data["summary"]["owner_review_runtime_action_rejected_count"] == 1 assert data["summary"]["runtime_gate_count"] == 0 def test_iwooos_wazuh_allowlisted_check_mode_dry_run_api_is_public_safe( monkeypatch, ) -> None: monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) payload = load_latest_iwooos_wazuh_allowlisted_check_mode_dry_run() assert ( payload["schema_version"] == "iwooos_wazuh_allowlisted_check_mode_dry_run_readback_v1" ) assert payload["status"] == "allowlisted_check_mode_dry_run_staged_no_runtime_action" assert payload["summary"]["dry_run_packet_accepted_count"] == 1 assert payload["summary"]["allowlisted_target_selector_count"] == 6 assert payload["summary"]["runtime_gate_count"] == 0 assert payload["boundaries"]["runtime_execution_authorized"] is False response = _client().get("/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run") assert response.status_code == 200 data = response.json() assert ( data["schema_version"] == "iwooos_wazuh_allowlisted_check_mode_dry_run_readback_v1" ) assert data["summary"]["dry_run_packet_received_count"] == 1 assert data["summary"]["dry_run_packet_review_ready_count"] == 1 assert data["summary"]["dry_run_packet_accepted_count"] == 1 assert data["summary"]["runtime_gate_count"] == 0 assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0 assert 
data["summary"]["wazuh_active_response_authorized_count"] == 0 assert data["summary"]["host_write_authorized_count"] == 0 assert data["summary"]["secret_value_collection_allowed_count"] == 0 assert data["boundaries"]["payload_persisted"] is False assert data["boundaries"]["wazuh_api_live_query_authorized"] is False assert data["boundaries"]["wazuh_active_response_authorized"] is False assert data["boundaries"]["host_write_authorized"] is False assert data["boundaries"]["runtime_gate_open"] is False assert data["boundaries"]["not_authorization"] is True assert len(data["target_selectors"]) == 6 assert len(data["dry_run_items"]) == 6 assert any( marker == "wazuh_allowlisted_check_mode_dry_run_validation_api_available=true" for marker in data["boundary_markers"] ) assert "192.168.0." not in response.text assert "工作視窗" not in response.text assert "批准!繼續" not in response.text assert "WAZUH_API_PASSWORD" not in response.text def test_iwooos_wazuh_allowlisted_check_mode_dry_run_validator_accepts_redacted_packet( monkeypatch, ) -> None: monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) client = _client() before = client.get("/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run").json() response = client.post( "/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run/validate-dry-run-packet", json=_valid_allowlisted_check_mode_dry_run_packet(), ) after = client.get("/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run").json() assert response.status_code == 200 data = response.json() assert ( data["schema_version"] == "iwooos_wazuh_allowlisted_check_mode_dry_run_validation_result_v1" ) assert data["status"] == "accepted_for_allowlisted_check_mode_dry_run_readback_only" assert data["accepted_for_allowlisted_check_mode_dry_run_readback_only"] is True assert 
data["summary"]["dry_run_packet_received_count"] == 1 assert data["summary"]["dry_run_packet_review_ready_count"] == 1 assert data["summary"]["dry_run_packet_accepted_count"] == 1 assert data["summary"]["runtime_gate_count"] == 0 assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0 assert data["summary"]["wazuh_active_response_authorized_count"] == 0 assert data["summary"]["host_write_authorized_count"] == 0 assert data["summary"]["secret_value_collection_allowed_count"] == 0 assert data["boundaries"]["payload_persisted"] is False assert data["boundaries"]["runtime_execution_authorized"] is False assert data["boundaries"]["runtime_gate_open"] is False assert before["summary"] == after["summary"] assert "192.168.0." not in response.text assert "工作視窗" not in response.text assert "批准!繼續" not in response.text def test_iwooos_wazuh_allowlisted_check_mode_dry_run_validator_quarantines_sensitive_payload( monkeypatch, ) -> None: monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) packet = _valid_allowlisted_check_mode_dry_run_packet() packet[ "redacted_evidence_ref" ] = "dry-run raw output includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop" response = _client().post( "/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run/validate-dry-run-packet", json=packet, ) assert response.status_code == 200 data = response.json() assert data["status"] == "quarantine_sensitive_payload" assert data["quarantined"] is True assert data["summary"]["dry_run_packet_quarantined_count"] == 1 assert data["summary"]["runtime_gate_count"] == 0 assert "10.1.2.3" not in response.text assert "Bearer abcdefghijklmnop" not in response.text def test_iwooos_wazuh_allowlisted_check_mode_dry_run_validator_rejects_runtime_action( monkeypatch, ) -> None: monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", 
raising=False) monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) packet = _valid_allowlisted_check_mode_dry_run_packet() packet["wazuh_active_response"] = True response = _client().post( "/api/v1/iwooos/wazuh-allowlisted-check-mode-dry-run/validate-dry-run-packet", json=packet, ) assert response.status_code == 200 data = response.json() assert data["status"] == "reject_runtime_action_request" assert data["runtime_action_rejected"] is True assert data["summary"]["dry_run_runtime_action_rejected_count"] == 1 assert data["summary"]["runtime_gate_count"] == 0