Files
awoooi/apps/api/tests/test_iwooos_security_operating_system.py
Your Name d50a453a79
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m46s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
feat(security): add iwooos operation packet validator
2026-06-29 13:43:51 +08:00

182 lines
8.6 KiB
Python

from __future__ import annotations
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.iwooos import router
from src.services.iwooos_security_operating_system import (
load_latest_iwooos_security_operating_system,
validate_iwooos_security_operation_packet,
)
def _client() -> TestClient:
app = FastAPI()
app.include_router(router)
return TestClient(app)
def _valid_operation_packet() -> dict[str, object]:
return {
"operation_intent": "validate_security_operation_loop_only",
"event_title": "Wazuh registry parity drift review",
"severity": "SEV1",
"confidence": "high",
"asset_aliases": [
"managed_core_node_a",
"managed_core_node_b",
],
"asset_scope": "iwooos_wazuh_manager_registry_public_aliases",
"what_happened_plain_language": "manager registry parity requires review before any controlled apply",
"why_it_matters": "registry drift can hide unmanaged hosts and weaken intrusion detection",
"redacted_evidence_refs": [
"evidence/iwooos/wazuh-registry-parity-redacted-v1",
"evidence/iwooos/post-enable-readback-redacted-v1",
],
"ai_triage_lane": "wazuh_registry_truth",
"candidate_action": "prepare_controlled_check_mode_dry_run",
"owner_gate_and_verification": "commander_blanket_authorized_for_low_blast_radius_review_only",
"target_selector_aliases": [
"managed_core_node_a",
"managed_core_node_b",
],
"workstream_id": "P0-02",
"source_of_truth_diff_ref": "docs/security/iwooos-security-operating-system.snapshot.json#wazuh-registry-truth",
"check_mode_plan_ref": "playbooks/iwooos-security-operation-check-mode#redacted",
"dry_run_evidence_ref": "evidence/iwooos/security-operation-dry-run-redacted-v1",
"rollback_plan_ref": "playbooks/iwooos-security-operation-rollback#redacted",
"rollback_owner": "iwooos-security-reviewer",
"post_apply_verifier_ref": "verifiers/iwooos-security-operation-readback#public-safe",
"km_playbook_writeback_ref": "km/playbook-trust/iwooos-security-operation-loop-v1",
"audit_receipt_ref": "audit/iwooos-security-operation-packet-redacted-v1",
"runtime_boundary_ack": "runtime_gate_remains_closed_until_post_verifier_passes",
"host_write_boundary_ack": "no_host_write_performed_by_packet_validator",
"secret_boundary_ack": "no_secret_value_collected_or_submitted",
}
def test_iwooos_security_operating_system_readback_exposes_api_validator() -> None:
payload = load_latest_iwooos_security_operating_system()
assert payload["schema_version"] == "iwooos_security_operating_system_readback_v1"
assert payload["source_schema_version"] == "iwooos_security_operating_system_v1"
assert payload["status"] == "iwooos_security_operating_system_ready_no_runtime_action"
assert payload["summary"]["reference_framework_count"] == 20
assert payload["summary"]["operating_role_count"] == 10
assert payload["summary"]["severity_lane_count"] == 5
assert payload["summary"]["workstream_count"] == 24
assert payload["summary"]["p0_workstream_count"] == 12
assert payload["summary"]["automation_loop_stage_count"] == 8
assert payload["summary"]["verification_stage_count"] == 12
assert payload["summary"]["operation_packet_validator_available_count"] == 1
assert payload["summary"]["operation_packet_required_field_count"] == 24
assert payload["summary"]["wazuh_registry_accepted_count"] == 6
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["operation_packet_validation_endpoint"] == (
"/api/v1/iwooos/security-operating-system/validate-operation-packet"
)
assert len(payload["required_operation_packet_fields"]) == 24
assert payload["boundaries"]["payload_persisted"] is False
assert payload["boundaries"]["runtime_execution_authorized"] is False
assert payload["boundaries"]["host_write_authorized"] is False
assert payload["boundaries"]["secret_value_collection_allowed"] is False
assert payload["boundaries"]["not_authorization"] is True
assert any(
marker == "iwooos_security_operation_packet_validation_api_available=true"
for marker in payload["boundary_markers"]
)
def test_iwooos_security_operating_system_api_is_public_safe() -> None:
response = _client().get("/api/v1/iwooos/security-operating-system")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_security_operating_system_readback_v1"
assert data["summary"]["operation_packet_validator_available_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert len(data["workstreams"]) == 24
assert any(item["workstream_id"] == "P0-02" for item in data["workstreams"])
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
def test_iwooos_security_operation_packet_validator_accepts_redacted_loop() -> None:
payload = validate_iwooos_security_operation_packet(_valid_operation_packet())
assert payload["schema_version"] == "iwooos_security_operation_packet_validation_result_v1"
assert payload["status"] == "accepted_for_security_operation_review_only"
assert payload["accepted_for_security_operation_review_only"] is True
assert payload["summary"]["security_operation_packet_received_count"] == 1
assert payload["summary"]["security_operation_packet_accepted_count"] == 1
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["summary"]["host_write_authorized_count"] == 0
assert payload["summary"]["secret_value_collection_allowed_count"] == 0
assert payload["boundaries"]["payload_persisted"] is False
assert payload["boundaries"]["runtime_execution_authorized"] is False
assert payload["next_gate"] == "controlled_check_mode_dry_run_then_post_apply_verifier_readback"
def test_iwooos_security_operation_packet_api_does_not_persist_or_open_gate() -> None:
client = _client()
before = client.get("/api/v1/iwooos/security-operating-system").json()
response = client.post(
"/api/v1/iwooos/security-operating-system/validate-operation-packet",
json=_valid_operation_packet(),
)
after = client.get("/api/v1/iwooos/security-operating-system").json()
assert response.status_code == 200
result = response.json()
assert result["status"] == "accepted_for_security_operation_review_only"
assert result["summary"]["security_operation_packet_accepted_count"] == 1
assert result["summary"]["runtime_gate_count"] == 0
assert before["summary"] == after["summary"]
def test_iwooos_security_operation_packet_validator_requests_missing_fields() -> None:
packet = _valid_operation_packet()
packet.pop("post_apply_verifier_ref")
payload = validate_iwooos_security_operation_packet(packet)
assert payload["status"] == "request_security_operation_packet_supplement"
assert payload["summary"]["security_operation_packet_supplement_required_count"] == 1
assert payload["summary"]["runtime_gate_count"] == 0
assert any(
"post_apply_verifier_ref" in finding["field_paths"]
for finding in payload["validation_findings"]
)
def test_iwooos_security_operation_packet_validator_quarantines_sensitive_payload() -> None:
packet = _valid_operation_packet()
packet["redacted_evidence_refs"] = [
"bad ref includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop",
"evidence/iwooos/second-ref-redacted-v1",
]
payload = validate_iwooos_security_operation_packet(packet)
assert payload["status"] == "quarantine_sensitive_payload"
assert payload["quarantined"] is True
assert payload["summary"]["security_operation_packet_quarantined_count"] == 1
assert payload["summary"]["runtime_gate_count"] == 0
assert "10.1.2.3" not in str(payload)
assert "Bearer abcdefghijklmnop" not in str(payload)
def test_iwooos_security_operation_packet_validator_rejects_runtime_action() -> None:
packet = _valid_operation_packet()
packet["wazuh_active_response"] = True
payload = validate_iwooos_security_operation_packet(packet)
assert payload["status"] == "reject_runtime_action_request"
assert payload["runtime_action_rejected"] is True
assert payload["summary"]["security_operation_runtime_action_rejected_count"] == 1
assert payload["summary"]["wazuh_active_response_authorized_count"] == 0
assert payload["summary"]["runtime_gate_count"] == 0