Files
awoooi/apps/api/tests/test_iwooos_runtime_security_readback.py
Your Name fccd8874fc
Some checks failed
Code Review / ai-code-review (push) Successful in 19s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
feat(iwooos): expose Wazuh owner evidence preflight
2026-06-27 11:41:09 +08:00

227 lines
12 KiB
Python

from __future__ import annotations
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.iwooos import router
from src.services.iwooos_runtime_security_readback import (
load_latest_iwooos_runtime_security_readback,
)
def _client() -> TestClient:
app = FastAPI()
app.include_router(router)
return TestClient(app)
def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None:
payload = load_latest_iwooos_runtime_security_readback()
assert payload["schema_version"] == "iwooos_runtime_security_readback_v1"
assert payload["status"] == "blocked_waiting_owner_evidence_and_runtime_gates"
assert payload["summary"]["source_snapshot_count"] == 10
assert payload["summary"]["p0_lane_count"] == 9
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["summary"]["owner_response_received_count"] == 0
assert payload["summary"]["owner_response_accepted_count"] == 0
assert payload["summary"]["wazuh_manager_registry_accepted_count"] == 0
assert payload["summary"]["wazuh_live_status"] == "not_checked_by_snapshot_loader"
assert payload["summary"]["wazuh_live_route_degraded_count"] == 1
assert payload["summary"]["wazuh_live_readonly_api_enabled_count"] == 0
assert payload["summary"]["wazuh_live_metadata_available_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_owner_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_secret_source_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_manager_health_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_readonly_scope_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_post_enable_readback_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_required_field_count"] == 28
assert payload["summary"]["wazuh_owner_evidence_reviewer_check_count"] == 15
assert payload["summary"]["wazuh_owner_evidence_outcome_lane_count"] == 8
assert payload["summary"]["wazuh_owner_evidence_forbidden_payload_count"] == 22
assert payload["summary"]["wazuh_owner_evidence_expected_alias_count"] == 6
assert payload["summary"]["wazuh_owner_evidence_registry_export_received_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_registry_export_accepted_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_received_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_accepted_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_runtime_gate_count"] == 0
assert payload["summary"]["kali_active_scan_authorized_count"] == 0
assert payload["summary"]["kali_execute_authorized_count"] == 0
assert payload["summary"]["alert_receipt_runtime_send_count"] == 0
assert payload["boundaries"]["runtime_execution_authorized"] is False
assert payload["boundaries"]["active_scan_authorized"] is False
assert payload["boundaries"]["wazuh_active_response_authorized"] is False
assert payload["boundaries"]["telegram_send_authorized"] is False
def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
payload = load_latest_iwooos_runtime_security_readback()
lane_ids = {lane["lane_id"] for lane in payload["lanes"]}
assert lane_ids == {
"wazuh_registry",
"wazuh_live_route",
"wazuh_live_metadata_gate",
"wazuh_owner_evidence_preflight",
"wazuh_dashboard_api",
"kali_intake",
"alert_readability",
"owner_dispatch",
"intrusion_prevention",
}
assert all(lane["metrics"] for lane in payload["lanes"])
assert all(lane["next_gate"] for lane in payload["lanes"])
assert all(lane["source_refs"] for lane in payload["lanes"])
assert any(lane["completion_percent"] > 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_metadata_gate" or lane["completion_percent"] == 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_owner_evidence_preflight" or lane["metrics"]["owner_accepted"] == 0 for lane in payload["lanes"])
def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
response = _client().get("/api/v1/iwooos/runtime-security-readback")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_runtime_security_readback_v1"
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_metadata_available_count"] == 0
assert data["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_owner_evidence_accepted_count"] == 0
assert data["summary"]["wazuh_owner_evidence_runtime_gate_count"] == 0
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry(monkeypatch) -> None:
import httpx
monkeypatch.setenv("IWOOOS_WAZUH_READONLY_ENABLED", "true")
monkeypatch.setenv("WAZUH_API_BASE_URL", "https://wazuh.example.test:55000")
monkeypatch.setenv("WAZUH_API_USERNAME", "readonly")
monkeypatch.setenv("WAZUH_API_PASSWORD", "placeholder")
monkeypatch.setenv("IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT", "6")
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/security/user/authenticate":
return httpx.Response(200, json={"data": {"token": "token-value"}})
if request.url.path == "/agents/summary/status":
return httpx.Response(
200,
json={"data": {"connection": {"total": 0, "active": 0, "disconnected": 0, "pending": 0}}},
)
if request.url.path == "/agents":
return httpx.Response(200, json={"data": {"affected_items": []}})
return httpx.Response(404)
transport = httpx.MockTransport(handler)
original_async_client = httpx.AsyncClient
def client_factory(*args, **kwargs):
kwargs["transport"] = transport
return original_async_client(*args, **kwargs)
monkeypatch.setattr(httpx, "AsyncClient", client_factory)
response = _client().get("/api/v1/iwooos/runtime-security-readback")
assert response.status_code == 200
data = response.json()
assert data["summary"]["wazuh_live_status"] == "wazuh_agent_registry_empty"
assert data["summary"]["wazuh_live_readonly_api_enabled_count"] == 1
assert data["summary"]["wazuh_live_agent_total"] == 0
assert data["summary"]["wazuh_live_registry_empty_count"] == 1
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert "token-value" not in response.text
def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
response = _client().get("/api/v1/iwooos/wazuh-live-metadata-gate")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_wazuh_live_metadata_gate_readback_v1"
assert data["status"] == "blocked_waiting_live_metadata_owner_response"
assert data["summary"]["production_route_readback_passed_count"] == 1
assert data["summary"]["live_metadata_owner_response_accepted_count"] == 0
assert data["summary"]["secret_source_metadata_accepted_count"] == 0
assert data["summary"]["readonly_account_scope_accepted_count"] == 0
assert data["summary"]["post_enable_readback_passed_count"] == 0
assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_active_response_authorized_count"] == 0
assert data["summary"]["host_write_authorized_count"] == 0
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert data["boundaries"]["wazuh_api_live_query_authorized"] is False
assert data["boundaries"]["wazuh_active_response_authorized"] is False
assert data["boundaries"]["host_write_authorized"] is False
assert data["boundaries"]["not_authorization"] is True
assert len(data["items"]) == 6
assert any(marker == "正式路由讀回=1" for marker in data["boundary_markers"])
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
def test_iwooos_wazuh_owner_evidence_preflight_api_is_public_safe(monkeypatch) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
response = _client().get("/api/v1/iwooos/wazuh-owner-evidence-preflight")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_wazuh_owner_evidence_preflight_readback_v1"
assert data["status"] == "owner_evidence_preflight_ready_no_runtime_action"
assert data["summary"]["required_field_count"] == 28
assert data["summary"]["reviewer_check_count"] == 15
assert data["summary"]["outcome_lane_count"] == 8
assert data["summary"]["forbidden_payload_count"] == 22
assert data["summary"]["expected_scope_alias_count"] == 6
assert data["summary"]["per_host_required_field_count"] == 9
assert data["summary"]["registry_export_received_count"] == 0
assert data["summary"]["registry_export_accepted_count"] == 0
assert data["summary"]["owner_evidence_received_count"] == 0
assert data["summary"]["owner_evidence_accepted_count"] == 0
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0
assert data["summary"]["active_response_authorized_count"] == 0
assert data["summary"]["host_write_authorized_count"] == 0
assert data["summary"]["secret_value_collection_allowed_count"] == 0
assert data["boundaries"]["wazuh_api_live_query_authorized"] is False
assert data["boundaries"]["wazuh_active_response_authorized"] is False
assert data["boundaries"]["host_write_authorized"] is False
assert data["boundaries"]["runtime_execution_authorized"] is False
assert data["boundaries"]["not_authorization"] is True
assert len(data["items"]) == 8
assert any(marker == "必要欄位=28" for marker in data["boundary_markers"])
assert any(rule.startswith("負責人證據預檢 ready") for rule in data["no_false_green_rules"])
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text