Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m46s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
182 lines
8.6 KiB
Python
182 lines
8.6 KiB
Python
from __future__ import annotations
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from src.api.v1.iwooos import router
|
|
from src.services.iwooos_security_operating_system import (
|
|
load_latest_iwooos_security_operating_system,
|
|
validate_iwooos_security_operation_packet,
|
|
)
|
|
|
|
|
|
def _client() -> TestClient:
|
|
app = FastAPI()
|
|
app.include_router(router)
|
|
return TestClient(app)
|
|
|
|
|
|
def _valid_operation_packet() -> dict[str, object]:
|
|
return {
|
|
"operation_intent": "validate_security_operation_loop_only",
|
|
"event_title": "Wazuh registry parity drift review",
|
|
"severity": "SEV1",
|
|
"confidence": "high",
|
|
"asset_aliases": [
|
|
"managed_core_node_a",
|
|
"managed_core_node_b",
|
|
],
|
|
"asset_scope": "iwooos_wazuh_manager_registry_public_aliases",
|
|
"what_happened_plain_language": "manager registry parity requires review before any controlled apply",
|
|
"why_it_matters": "registry drift can hide unmanaged hosts and weaken intrusion detection",
|
|
"redacted_evidence_refs": [
|
|
"evidence/iwooos/wazuh-registry-parity-redacted-v1",
|
|
"evidence/iwooos/post-enable-readback-redacted-v1",
|
|
],
|
|
"ai_triage_lane": "wazuh_registry_truth",
|
|
"candidate_action": "prepare_controlled_check_mode_dry_run",
|
|
"owner_gate_and_verification": "commander_blanket_authorized_for_low_blast_radius_review_only",
|
|
"target_selector_aliases": [
|
|
"managed_core_node_a",
|
|
"managed_core_node_b",
|
|
],
|
|
"workstream_id": "P0-02",
|
|
"source_of_truth_diff_ref": "docs/security/iwooos-security-operating-system.snapshot.json#wazuh-registry-truth",
|
|
"check_mode_plan_ref": "playbooks/iwooos-security-operation-check-mode#redacted",
|
|
"dry_run_evidence_ref": "evidence/iwooos/security-operation-dry-run-redacted-v1",
|
|
"rollback_plan_ref": "playbooks/iwooos-security-operation-rollback#redacted",
|
|
"rollback_owner": "iwooos-security-reviewer",
|
|
"post_apply_verifier_ref": "verifiers/iwooos-security-operation-readback#public-safe",
|
|
"km_playbook_writeback_ref": "km/playbook-trust/iwooos-security-operation-loop-v1",
|
|
"audit_receipt_ref": "audit/iwooos-security-operation-packet-redacted-v1",
|
|
"runtime_boundary_ack": "runtime_gate_remains_closed_until_post_verifier_passes",
|
|
"host_write_boundary_ack": "no_host_write_performed_by_packet_validator",
|
|
"secret_boundary_ack": "no_secret_value_collected_or_submitted",
|
|
}
|
|
|
|
|
|
def test_iwooos_security_operating_system_readback_exposes_api_validator() -> None:
|
|
payload = load_latest_iwooos_security_operating_system()
|
|
|
|
assert payload["schema_version"] == "iwooos_security_operating_system_readback_v1"
|
|
assert payload["source_schema_version"] == "iwooos_security_operating_system_v1"
|
|
assert payload["status"] == "iwooos_security_operating_system_ready_no_runtime_action"
|
|
assert payload["summary"]["reference_framework_count"] == 20
|
|
assert payload["summary"]["operating_role_count"] == 10
|
|
assert payload["summary"]["severity_lane_count"] == 5
|
|
assert payload["summary"]["workstream_count"] == 24
|
|
assert payload["summary"]["p0_workstream_count"] == 12
|
|
assert payload["summary"]["automation_loop_stage_count"] == 8
|
|
assert payload["summary"]["verification_stage_count"] == 12
|
|
assert payload["summary"]["operation_packet_validator_available_count"] == 1
|
|
assert payload["summary"]["operation_packet_required_field_count"] == 24
|
|
assert payload["summary"]["wazuh_registry_accepted_count"] == 6
|
|
assert payload["summary"]["runtime_gate_count"] == 0
|
|
assert payload["operation_packet_validation_endpoint"] == (
|
|
"/api/v1/iwooos/security-operating-system/validate-operation-packet"
|
|
)
|
|
assert len(payload["required_operation_packet_fields"]) == 24
|
|
assert payload["boundaries"]["payload_persisted"] is False
|
|
assert payload["boundaries"]["runtime_execution_authorized"] is False
|
|
assert payload["boundaries"]["host_write_authorized"] is False
|
|
assert payload["boundaries"]["secret_value_collection_allowed"] is False
|
|
assert payload["boundaries"]["not_authorization"] is True
|
|
assert any(
|
|
marker == "iwooos_security_operation_packet_validation_api_available=true"
|
|
for marker in payload["boundary_markers"]
|
|
)
|
|
|
|
|
|
def test_iwooos_security_operating_system_api_is_public_safe() -> None:
|
|
response = _client().get("/api/v1/iwooos/security-operating-system")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["schema_version"] == "iwooos_security_operating_system_readback_v1"
|
|
assert data["summary"]["operation_packet_validator_available_count"] == 1
|
|
assert data["summary"]["runtime_gate_count"] == 0
|
|
assert len(data["workstreams"]) == 24
|
|
assert any(item["workstream_id"] == "P0-02" for item in data["workstreams"])
|
|
assert "192.168.0." not in response.text
|
|
assert "工作視窗" not in response.text
|
|
assert "批准!繼續" not in response.text
|
|
assert "WAZUH_API_PASSWORD" not in response.text
|
|
|
|
|
|
def test_iwooos_security_operation_packet_validator_accepts_redacted_loop() -> None:
|
|
payload = validate_iwooos_security_operation_packet(_valid_operation_packet())
|
|
|
|
assert payload["schema_version"] == "iwooos_security_operation_packet_validation_result_v1"
|
|
assert payload["status"] == "accepted_for_security_operation_review_only"
|
|
assert payload["accepted_for_security_operation_review_only"] is True
|
|
assert payload["summary"]["security_operation_packet_received_count"] == 1
|
|
assert payload["summary"]["security_operation_packet_accepted_count"] == 1
|
|
assert payload["summary"]["runtime_gate_count"] == 0
|
|
assert payload["summary"]["host_write_authorized_count"] == 0
|
|
assert payload["summary"]["secret_value_collection_allowed_count"] == 0
|
|
assert payload["boundaries"]["payload_persisted"] is False
|
|
assert payload["boundaries"]["runtime_execution_authorized"] is False
|
|
assert payload["next_gate"] == "controlled_check_mode_dry_run_then_post_apply_verifier_readback"
|
|
|
|
|
|
def test_iwooos_security_operation_packet_api_does_not_persist_or_open_gate() -> None:
|
|
client = _client()
|
|
before = client.get("/api/v1/iwooos/security-operating-system").json()
|
|
response = client.post(
|
|
"/api/v1/iwooos/security-operating-system/validate-operation-packet",
|
|
json=_valid_operation_packet(),
|
|
)
|
|
after = client.get("/api/v1/iwooos/security-operating-system").json()
|
|
|
|
assert response.status_code == 200
|
|
result = response.json()
|
|
assert result["status"] == "accepted_for_security_operation_review_only"
|
|
assert result["summary"]["security_operation_packet_accepted_count"] == 1
|
|
assert result["summary"]["runtime_gate_count"] == 0
|
|
assert before["summary"] == after["summary"]
|
|
|
|
|
|
def test_iwooos_security_operation_packet_validator_requests_missing_fields() -> None:
|
|
packet = _valid_operation_packet()
|
|
packet.pop("post_apply_verifier_ref")
|
|
|
|
payload = validate_iwooos_security_operation_packet(packet)
|
|
|
|
assert payload["status"] == "request_security_operation_packet_supplement"
|
|
assert payload["summary"]["security_operation_packet_supplement_required_count"] == 1
|
|
assert payload["summary"]["runtime_gate_count"] == 0
|
|
assert any(
|
|
"post_apply_verifier_ref" in finding["field_paths"]
|
|
for finding in payload["validation_findings"]
|
|
)
|
|
|
|
|
|
def test_iwooos_security_operation_packet_validator_quarantines_sensitive_payload() -> None:
|
|
packet = _valid_operation_packet()
|
|
packet["redacted_evidence_refs"] = [
|
|
"bad ref includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop",
|
|
"evidence/iwooos/second-ref-redacted-v1",
|
|
]
|
|
|
|
payload = validate_iwooos_security_operation_packet(packet)
|
|
|
|
assert payload["status"] == "quarantine_sensitive_payload"
|
|
assert payload["quarantined"] is True
|
|
assert payload["summary"]["security_operation_packet_quarantined_count"] == 1
|
|
assert payload["summary"]["runtime_gate_count"] == 0
|
|
assert "10.1.2.3" not in str(payload)
|
|
assert "Bearer abcdefghijklmnop" not in str(payload)
|
|
|
|
|
|
def test_iwooos_security_operation_packet_validator_rejects_runtime_action() -> None:
|
|
packet = _valid_operation_packet()
|
|
packet["wazuh_active_response"] = True
|
|
|
|
payload = validate_iwooos_security_operation_packet(packet)
|
|
|
|
assert payload["status"] == "reject_runtime_action_request"
|
|
assert payload["runtime_action_rejected"] is True
|
|
assert payload["summary"]["security_operation_runtime_action_rejected_count"] == 1
|
|
assert payload["summary"]["wazuh_active_response_authorized_count"] == 0
|
|
assert payload["summary"]["runtime_gate_count"] == 0
|