feat(iwooos): add wazuh controlled apply preflight
Some checks failed
Ansible / Reboot Recovery Contract / validate (push) Successful in 1m16s
CD Pipeline / tests (push) Failing after 35s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Successful in 17s

This commit is contained in:
Your Name
2026-06-28 10:10:39 +08:00
parent 5d5832e558
commit b010afdbf6
9 changed files with 1626 additions and 90 deletions

View File

@@ -14,21 +14,18 @@ from typing import Any
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from src.services.iwooos_runtime_security_readback import (
load_latest_iwooos_runtime_security_readback,
)
from src.services.iwooos_high_value_config_control_coverage import (
load_latest_iwooos_high_value_config_control_coverage,
)
from src.services.iwooos_owner_evidence_intake_preflight import (
load_latest_iwooos_owner_evidence_intake_preflight,
)
from src.services.iwooos_runtime_security_readback import (
load_latest_iwooos_runtime_security_readback,
)
from src.services.iwooos_security_control_coverage import (
load_latest_iwooos_security_control_coverage,
)
from src.services.iwooos_wazuh_readonly_status import (
load_iwooos_wazuh_readonly_status,
)
from src.services.iwooos_wazuh_live_metadata_gate import (
load_latest_iwooos_wazuh_live_metadata_gate,
)
@@ -37,15 +34,27 @@ from src.services.iwooos_wazuh_managed_host_coverage import (
)
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
load_latest_iwooos_wazuh_manager_registry_reviewer_validation,
)
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
validate_iwooos_wazuh_manager_registry_acceptance_evidence as validate_wazuh_manager_registry_acceptance_evidence_payload,
)
from src.services.iwooos_wazuh_manager_registry_reviewer_validation import (
validate_iwooos_wazuh_manager_registry_owner_export as validate_wazuh_manager_registry_owner_export_payload,
)
from src.services.iwooos_wazuh_owner_evidence_preflight import (
load_latest_iwooos_wazuh_owner_evidence_preflight,
)
from src.services.iwooos_wazuh_readonly_status import (
load_iwooos_wazuh_readonly_status,
)
from src.services.iwooos_wazuh_runtime_controlled_apply_preflight import (
load_latest_iwooos_wazuh_runtime_controlled_apply_preflight,
)
from src.services.iwooos_wazuh_runtime_controlled_apply_preflight import (
validate_iwooos_wazuh_runtime_controlled_apply_packet as validate_wazuh_runtime_controlled_apply_packet_payload,
)
from src.services.public_redaction import redact_public_lan_topology
router = APIRouter(tags=["IwoooS Security"])
@@ -110,7 +119,9 @@ async def get_iwooos_wazuh_live_metadata_gate() -> dict[str, Any]:
async def get_iwooos_wazuh_owner_evidence_preflight() -> dict[str, Any]:
"""回傳 Wazuh manager registry 負責人證據收件預檢只讀狀態。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_wazuh_owner_evidence_preflight)
payload = await asyncio.to_thread(
load_latest_iwooos_wazuh_owner_evidence_preflight
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
@@ -138,7 +149,9 @@ async def get_iwooos_wazuh_owner_evidence_preflight() -> dict[str, Any]:
async def get_iwooos_wazuh_managed_host_coverage() -> dict[str, Any]:
"""回傳 Wazuh 受管主機覆蓋公開安全只讀狀態。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_wazuh_managed_host_coverage)
payload = await asyncio.to_thread(
load_latest_iwooos_wazuh_managed_host_coverage
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
@@ -166,7 +179,9 @@ async def get_iwooos_wazuh_managed_host_coverage() -> dict[str, Any]:
async def get_iwooos_wazuh_manager_registry_reviewer_validation() -> dict[str, Any]:
"""回傳 Wazuh manager registry reviewer validation 公開安全只讀狀態。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_wazuh_manager_registry_reviewer_validation)
payload = await asyncio.to_thread(
load_latest_iwooos_wazuh_manager_registry_reviewer_validation
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
@@ -192,7 +207,9 @@ async def get_iwooos_wazuh_manager_registry_reviewer_validation() -> dict[str, A
"accepted 總帳。"
),
)
async def validate_iwooos_wazuh_manager_registry_owner_export(owner_export: dict[str, Any]) -> dict[str, Any]:
async def validate_iwooos_wazuh_manager_registry_owner_export(
owner_export: dict[str, Any],
) -> dict[str, Any]:
"""回傳單次 Wazuh manager registry 脫敏匯出的公開安全驗證結果。"""
try:
payload = await asyncio.to_thread(
@@ -246,6 +263,70 @@ async def validate_iwooos_wazuh_manager_registry_acceptance_evidence(
) from exc
@router.get(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight",
response_model=dict[str, Any],
summary="取得 Wazuh runtime controlled apply preflight 只讀讀回",
description=(
"讀取已提交的 Wazuh runtime controlled apply preflight contract回傳 target selector、"
"source-of-truth diff、check-mode / dry-run、rollback、post-apply verifier、KM / PlayBook "
"writeback 與 0 / false 邊界。此端點不查 Wazuh API、不讀主機、不重新註冊 agent、"
"不重啟服務、不保存機密、不啟用主動回應、不改 Nginx / Docker / K8s / firewall。"
),
)
async def get_iwooos_wazuh_runtime_controlled_apply_preflight() -> dict[str, Any]:
"""回傳 Wazuh runtime controlled apply preflight 公開安全只讀狀態。"""
try:
payload = await asyncio.to_thread(
load_latest_iwooos_wazuh_runtime_controlled_apply_preflight
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"IwoooS Wazuh runtime controlled apply preflight 無效:{exc}",
) from exc
@router.post(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
response_model=dict[str, Any],
summary="驗證 Wazuh runtime controlled apply 脫敏 preflight packet",
description=(
"針對單次 owner / reviewer 提供的 redacted Wazuh runtime controlled apply preflight packet "
"進行 no-persist review readiness validation回傳 accepted-for-review / needs supplement / "
"quarantined / rejected runtime action 分流。此端點不保存 payload、不查 Wazuh API、不讀主機、"
"不重新註冊 agent、不重啟服務、不讀或回傳機密明文、不啟用主動回應、不改 Nginx / Docker / "
"K8s / firewall也不更新 runtime gate 總帳。"
),
)
async def validate_iwooos_wazuh_runtime_controlled_apply_packet(
controlled_apply_packet: dict[str, Any],
) -> dict[str, Any]:
"""回傳單次 Wazuh runtime controlled apply preflight packet 的公開安全驗證結果。"""
try:
payload = await asyncio.to_thread(
validate_wazuh_runtime_controlled_apply_packet_payload,
controlled_apply_packet,
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"IwoooS Wazuh runtime controlled apply preflight packet 驗證器無效:{exc}",
) from exc
@router.get(
"/api/v1/iwooos/runtime-security-readback",
response_model=dict[str, Any],
@@ -320,7 +401,9 @@ async def get_iwooos_security_control_coverage() -> dict[str, Any]:
async def get_iwooos_high_value_config_control_coverage() -> dict[str, Any]:
"""回傳高價值配置控管矩陣公開安全只讀狀態。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_high_value_config_control_coverage)
payload = await asyncio.to_thread(
load_latest_iwooos_high_value_config_control_coverage
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
@@ -348,7 +431,9 @@ async def get_iwooos_high_value_config_control_coverage() -> dict[str, Any]:
async def get_iwooos_owner_evidence_intake_preflight() -> dict[str, Any]:
"""回傳 IwoooS 負責人脫敏證據收件預檢公開安全只讀狀態。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_owner_evidence_intake_preflight)
payload = await asyncio.to_thread(
load_latest_iwooos_owner_evidence_intake_preflight
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(

View File

@@ -22,6 +22,7 @@ _SNAPSHOT_FILES = {
"wazuh_runtime": "wazuh-agent-visibility-runtime-gate.snapshot.json",
"wazuh_live_metadata_gate": "wazuh-readonly-live-metadata-env-gate.snapshot.json",
"wazuh_owner_evidence_preflight": "wazuh-agent-visibility-owner-evidence-preflight.snapshot.json",
"wazuh_runtime_apply_preflight": "wazuh-runtime-controlled-apply-preflight.snapshot.json",
"kali_status": "kali-integration-status.snapshot.json",
"soc_control": "soc-siem-kali-wazuh-integration-control.snapshot.json",
"alert_readability": "telegram-alert-readability-guard.snapshot.json",
@@ -35,6 +36,7 @@ _EXPECTED_SCHEMAS = {
"wazuh_runtime": "wazuh_agent_visibility_runtime_gate_v1",
"wazuh_live_metadata_gate": "iwooos_wazuh_readonly_live_metadata_env_gate_v1",
"wazuh_owner_evidence_preflight": "wazuh_agent_visibility_owner_evidence_preflight_v1",
"wazuh_runtime_apply_preflight": "wazuh_runtime_controlled_apply_preflight_v1",
"kali_status": "kali_integration_status_v1",
"soc_control": "soc_siem_kali_wazuh_integration_control_v1",
"alert_readability": "telegram_alert_readability_guard_v1",
@@ -69,13 +71,21 @@ def load_latest_iwooos_runtime_security_readback(
) -> dict[str, Any]:
"""Load and normalize the current IwoooS runtime security readback."""
directory = security_dir or _DEFAULT_SECURITY_DIR
snapshots = {key: _load_snapshot(directory, key, filename) for key, filename in _SNAPSHOT_FILES.items()}
snapshots = {
key: _load_snapshot(directory, key, filename)
for key, filename in _SNAPSHOT_FILES.items()
}
_require_runtime_boundaries(snapshots)
owner_gap_summary = _summary(snapshots["owner_gap"])
wazuh_summary = _summary(snapshots["wazuh_coverage"])
live_metadata_gate_summary = _summary(snapshots["wazuh_live_metadata_gate"])
owner_evidence_preflight_summary = _summary(snapshots["wazuh_owner_evidence_preflight"])
owner_evidence_preflight_summary = _summary(
snapshots["wazuh_owner_evidence_preflight"]
)
runtime_apply_preflight_summary = _summary(
snapshots["wazuh_runtime_apply_preflight"]
)
soc_summary = _summary(snapshots["soc_control"])
alert_summary = _summary(snapshots["alert_readability"])
dispatch_summary = _summary(snapshots["owner_dispatch"])
@@ -98,40 +108,60 @@ def load_latest_iwooos_runtime_security_readback(
"source_refs": source_refs,
"summary": {
"source_snapshot_count": len(source_refs),
"p0_lane_count": 9,
"p0_lane_count": 10,
"control_plane_visibility_percent": _average_percent(
soc_summary.get("coverage_percent_after_soc_integration_control"),
intrusion_summary.get("coverage_percent_after_prevention_control"),
_alert_contract_percent(alert_summary),
),
"actual_runtime_acceptance_percent": 0,
"owner_response_received_count": _int(owner_gap_summary.get("owner_response_received_count")),
"owner_response_accepted_count": _int(owner_gap_summary.get("owner_response_accepted_count")),
"owner_response_received_count": _int(
owner_gap_summary.get("owner_response_received_count")
),
"owner_response_accepted_count": _int(
owner_gap_summary.get("owner_response_accepted_count")
),
"redacted_evidence_refs_received_count": 0,
"request_sent_count": _int(dispatch_summary.get("request_sent_count")),
"wazuh_expected_host_scope_count": _int(wazuh_summary.get("expected_host_scope_count")),
"wazuh_manager_registry_accepted_count": _int(wazuh_summary.get("manager_registry_accepted_count")),
"wazuh_transport_observed_count": _int(wazuh_summary.get("manager_transport_established_connection_count")),
"wazuh_expected_host_scope_count": _int(
wazuh_summary.get("expected_host_scope_count")
),
"wazuh_manager_registry_accepted_count": _int(
wazuh_summary.get("manager_registry_accepted_count")
),
"wazuh_transport_observed_count": _int(
wazuh_summary.get("manager_transport_established_connection_count")
),
"wazuh_dashboard_api_degraded_observed_count": _int(
wazuh_summary.get("dashboard_api_degraded_observed_count")
),
"wazuh_live_route_http_status": live_wazuh["http_status"],
"wazuh_live_route_degraded_count": live_wazuh["degraded_count"],
"wazuh_live_readonly_api_enabled_count": live_wazuh["readonly_api_enabled_count"],
"wazuh_live_readonly_api_enabled_count": live_wazuh[
"readonly_api_enabled_count"
],
"wazuh_live_agent_total": live_wazuh["agent_total"],
"wazuh_live_agent_active": live_wazuh["agent_active"],
"wazuh_live_registry_empty_count": live_wazuh["agent_registry_empty_count"],
"wazuh_live_below_expected_count": live_wazuh["agent_below_expected_minimum_count"],
"wazuh_live_metadata_available_count": live_wazuh["metadata_available_count"],
"wazuh_live_below_expected_count": live_wazuh[
"agent_below_expected_minimum_count"
],
"wazuh_live_metadata_available_count": live_wazuh[
"metadata_available_count"
],
"wazuh_live_status": live_wazuh["status"],
"wazuh_live_metadata_gate_owner_accepted_count": _int(
live_metadata_gate_summary.get("live_metadata_owner_response_accepted_count")
live_metadata_gate_summary.get(
"live_metadata_owner_response_accepted_count"
)
),
"wazuh_live_metadata_gate_secret_source_accepted_count": _int(
live_metadata_gate_summary.get("secret_source_metadata_accepted_count")
),
"wazuh_live_metadata_gate_manager_health_accepted_count": _int(
live_metadata_gate_summary.get("wazuh_manager_health_ref_accepted_count")
live_metadata_gate_summary.get(
"wazuh_manager_health_ref_accepted_count"
)
),
"wazuh_live_metadata_gate_readonly_scope_accepted_count": _int(
live_metadata_gate_summary.get("readonly_account_scope_accepted_count")
@@ -172,12 +202,56 @@ def load_latest_iwooos_runtime_security_readback(
"wazuh_owner_evidence_runtime_gate_count": _int(
owner_evidence_preflight_summary.get("runtime_gate_count")
),
"kali_active_scan_authorized_count": _int(soc_summary.get("kali_active_scan_authorized_count")),
"kali_execute_authorized_count": _int(soc_summary.get("kali_execute_authorized_count")),
"kali_finding_envelope_accepted_count": _int(soc_summary.get("kali_finding_envelope_accepted_count")),
"alert_formatter_contract_marker_count": _int(alert_summary.get("source_formatter_marker_count")),
"alert_receipt_runtime_send_count": _int(alert_summary.get("telegram_send_authorized_count")),
"intrusion_prevention_candidate_count": _int(intrusion_summary.get("urgent_prevention_candidate_count")),
"wazuh_runtime_apply_preflight_ready_count": _int(
runtime_apply_preflight_summary.get(
"controlled_apply_preflight_ready_count"
)
),
"wazuh_runtime_apply_target_selector_count": _int(
runtime_apply_preflight_summary.get("target_selector_count")
),
"wazuh_runtime_apply_source_diff_count": _int(
runtime_apply_preflight_summary.get("source_of_truth_diff_count")
),
"wazuh_runtime_apply_check_mode_plan_count": _int(
runtime_apply_preflight_summary.get("check_mode_plan_count")
),
"wazuh_runtime_apply_dry_run_required_count": _int(
runtime_apply_preflight_summary.get("dry_run_required_count")
),
"wazuh_runtime_apply_rollback_plan_count": _int(
runtime_apply_preflight_summary.get("rollback_plan_count")
),
"wazuh_runtime_apply_post_apply_verifier_count": _int(
runtime_apply_preflight_summary.get("post_apply_verifier_count")
),
"wazuh_runtime_apply_km_writeback_count": _int(
runtime_apply_preflight_summary.get("km_playbook_writeback_count")
),
"wazuh_runtime_apply_owner_review_ready_count": _int(
runtime_apply_preflight_summary.get("owner_review_ready_count")
),
"wazuh_runtime_apply_runtime_gate_count": _int(
runtime_apply_preflight_summary.get("runtime_gate_count")
),
"kali_active_scan_authorized_count": _int(
soc_summary.get("kali_active_scan_authorized_count")
),
"kali_execute_authorized_count": _int(
soc_summary.get("kali_execute_authorized_count")
),
"kali_finding_envelope_accepted_count": _int(
soc_summary.get("kali_finding_envelope_accepted_count")
),
"alert_formatter_contract_marker_count": _int(
alert_summary.get("source_formatter_marker_count")
),
"alert_receipt_runtime_send_count": _int(
alert_summary.get("telegram_send_authorized_count")
),
"intrusion_prevention_candidate_count": _int(
intrusion_summary.get("urgent_prevention_candidate_count")
),
"runtime_gate_count": runtime_gate_count,
},
"lanes": [
@@ -204,8 +278,12 @@ def load_latest_iwooos_runtime_security_readback(
"管理器清單交叉驗收已讀回runtime gate 仍關閉",
{
"expected_hosts": wazuh_summary.get("expected_host_scope_count", 0),
"transport_observed": wazuh_summary.get("manager_transport_established_connection_count", 0),
"registry_accepted": wazuh_summary.get("manager_registry_accepted_count", 0),
"transport_observed": wazuh_summary.get(
"manager_transport_established_connection_count", 0
),
"registry_accepted": wazuh_summary.get(
"manager_registry_accepted_count", 0
),
},
["docs/security/wazuh-managed-host-coverage-gate.snapshot.json"],
),
@@ -226,13 +304,19 @@ def load_latest_iwooos_runtime_security_readback(
),
_lane(
"wazuh_live_metadata_gate",
snapshots["wazuh_live_metadata_gate"].get("status", "blocked_waiting_live_metadata_owner_response"),
snapshots["wazuh_live_metadata_gate"].get(
"status", "blocked_waiting_live_metadata_owner_response"
),
0,
"locked",
"補齊負責人回覆、機密中繼資料、管理節點健康、唯讀範圍與啟用後讀回",
{
"route_readback": live_metadata_gate_summary.get("production_route_readback_passed_count", 0),
"owner_accepted": live_metadata_gate_summary.get("live_metadata_owner_response_accepted_count", 0),
"route_readback": live_metadata_gate_summary.get(
"production_route_readback_passed_count", 0
),
"owner_accepted": live_metadata_gate_summary.get(
"live_metadata_owner_response_accepted_count", 0
),
"secret_metadata_accepted": live_metadata_gate_summary.get(
"secret_source_metadata_accepted_count",
0,
@@ -245,12 +329,16 @@ def load_latest_iwooos_runtime_security_readback(
"readonly_account_scope_accepted_count",
0,
),
"post_enable_readback": live_metadata_gate_summary.get("post_enable_readback_passed_count", 0),
"post_enable_readback": live_metadata_gate_summary.get(
"post_enable_readback_passed_count", 0
),
"live_query_authorized": live_metadata_gate_summary.get(
"wazuh_api_live_query_authorized_count",
0,
),
"runtime_gate": live_metadata_gate_summary.get("runtime_gate_count", 0),
"runtime_gate": live_metadata_gate_summary.get(
"runtime_gate_count", 0
),
},
["docs/security/wazuh-readonly-live-metadata-env-gate.snapshot.json"],
),
@@ -264,19 +352,86 @@ def load_latest_iwooos_runtime_security_readback(
"locked",
"補齊 manager registry 脫敏封包、逐主機矩陣、Dashboard API 狀態與負責人決策",
{
"required_fields": owner_evidence_preflight_summary.get("required_field_count", 0),
"reviewer_checks": owner_evidence_preflight_summary.get("reviewer_check_count", 0),
"outcome_lanes": owner_evidence_preflight_summary.get("outcome_lane_count", 0),
"forbidden_payloads": owner_evidence_preflight_summary.get("forbidden_payload_count", 0),
"owner_received": owner_evidence_preflight_summary.get("owner_evidence_received_count", 0),
"owner_accepted": owner_evidence_preflight_summary.get("owner_evidence_accepted_count", 0),
"required_fields": owner_evidence_preflight_summary.get(
"required_field_count", 0
),
"reviewer_checks": owner_evidence_preflight_summary.get(
"reviewer_check_count", 0
),
"outcome_lanes": owner_evidence_preflight_summary.get(
"outcome_lane_count", 0
),
"forbidden_payloads": owner_evidence_preflight_summary.get(
"forbidden_payload_count", 0
),
"owner_received": owner_evidence_preflight_summary.get(
"owner_evidence_received_count", 0
),
"owner_accepted": owner_evidence_preflight_summary.get(
"owner_evidence_accepted_count", 0
),
"registry_export_accepted": owner_evidence_preflight_summary.get(
"registry_export_accepted_count",
0,
),
"runtime_gate": owner_evidence_preflight_summary.get("runtime_gate_count", 0),
"runtime_gate": owner_evidence_preflight_summary.get(
"runtime_gate_count", 0
),
},
["docs/security/wazuh-agent-visibility-owner-evidence-preflight.snapshot.json"],
[
"docs/security/wazuh-agent-visibility-owner-evidence-preflight.snapshot.json"
],
),
_lane(
"wazuh_runtime_controlled_apply_preflight",
snapshots["wazuh_runtime_apply_preflight"].get(
"status",
"controlled_apply_preflight_ready_no_runtime_action",
),
45
if _int(
runtime_apply_preflight_summary.get(
"controlled_apply_preflight_ready_count"
)
)
else 0,
"steady"
if _int(
runtime_apply_preflight_summary.get(
"controlled_apply_preflight_ready_count"
)
)
else "locked",
"執行前仍需 allowlisted check-mode、dry-run evidence 與 post-apply verifier readback",
{
"target_selectors": runtime_apply_preflight_summary.get(
"target_selector_count", 0
),
"source_diff": runtime_apply_preflight_summary.get(
"source_of_truth_diff_count", 0
),
"check_mode": runtime_apply_preflight_summary.get(
"check_mode_plan_count", 0
),
"dry_run": runtime_apply_preflight_summary.get(
"dry_run_required_count", 0
),
"rollback": runtime_apply_preflight_summary.get(
"rollback_plan_count", 0
),
"post_apply_verifier": runtime_apply_preflight_summary.get(
"post_apply_verifier_count", 0
),
"km_writeback": runtime_apply_preflight_summary.get(
"km_playbook_writeback_count", 0
),
"runtime_gate": runtime_apply_preflight_summary.get(
"runtime_gate_count", 0
),
},
[
"docs/security/wazuh-runtime-controlled-apply-preflight.snapshot.json"
],
),
_lane(
"wazuh_dashboard_api",
@@ -285,9 +440,13 @@ def load_latest_iwooos_runtime_security_readback(
"warn",
"儀表板 API、RBAC 與 TLS 修復後重新讀回",
{
"dashboard_api_degraded": wazuh_summary.get("dashboard_api_degraded_observed_count", 0),
"dashboard_api_degraded": wazuh_summary.get(
"dashboard_api_degraded_observed_count", 0
),
"runtime_gate": wazuh_summary.get("runtime_gate_count", 0),
"accepted_evidence": _accepted_evidence_count(snapshots["wazuh_runtime"]),
"accepted_evidence": _accepted_evidence_count(
snapshots["wazuh_runtime"]
),
},
[
"docs/security/wazuh-managed-host-coverage-gate.snapshot.json",
@@ -301,9 +460,15 @@ def load_latest_iwooos_runtime_security_readback(
"locked",
"資安觀測範圍與 finding envelope 先被接受",
{
"active_scan_authorized": soc_summary.get("kali_active_scan_authorized_count", 0),
"execute_authorized": soc_summary.get("kali_execute_authorized_count", 0),
"finding_envelope_accepted": soc_summary.get("kali_finding_envelope_accepted_count", 0),
"active_scan_authorized": soc_summary.get(
"kali_active_scan_authorized_count", 0
),
"execute_authorized": soc_summary.get(
"kali_execute_authorized_count", 0
),
"finding_envelope_accepted": soc_summary.get(
"kali_finding_envelope_accepted_count", 0
),
},
[
"docs/security/kali-integration-status.snapshot.json",
@@ -317,37 +482,57 @@ def load_latest_iwooos_runtime_security_readback(
"warn",
"補齊告警路由 receipt 與實發驗證",
{
"formatter_markers": alert_summary.get("source_formatter_marker_count", 0),
"required_markers": alert_summary.get("required_output_marker_count", 0),
"telegram_send": alert_summary.get("telegram_send_authorized_count", 0),
"formatter_markers": alert_summary.get(
"source_formatter_marker_count", 0
),
"required_markers": alert_summary.get(
"required_output_marker_count", 0
),
"telegram_send": alert_summary.get(
"telegram_send_authorized_count", 0
),
},
["docs/security/telegram-alert-readability-guard.snapshot.json"],
),
_lane(
"owner_dispatch",
snapshots["owner_dispatch"].get("status", "owner_request_draft_ready_not_dispatched"),
snapshots["owner_dispatch"].get(
"status", "owner_request_draft_ready_not_dispatched"
),
0,
"locked",
"正式負責人回覆封包送達與接受",
{
"request_drafts": dispatch_summary.get("request_draft_count", 0),
"request_sent": dispatch_summary.get("request_sent_count", 0),
"owner_accepted": dispatch_summary.get("owner_response_accepted_count", 0),
"owner_accepted": dispatch_summary.get(
"owner_response_accepted_count", 0
),
},
["docs/security/monitoring-owner-request-draft.snapshot.json"],
),
_lane(
"intrusion_prevention",
"candidate_only_no_runtime_containment",
_int(intrusion_summary.get("coverage_percent_after_prevention_control")),
_int(
intrusion_summary.get("coverage_percent_after_prevention_control")
),
"warn",
"補脫敏證據參照與維護窗口",
{
"urgent_candidates": intrusion_summary.get("urgent_prevention_candidate_count", 0),
"evidence_received": intrusion_summary.get("evidence_ref_received_count", 0),
"containment_accepted": intrusion_summary.get("containment_decision_accepted_count", 0),
"urgent_candidates": intrusion_summary.get(
"urgent_prevention_candidate_count", 0
),
"evidence_received": intrusion_summary.get(
"evidence_ref_received_count", 0
),
"containment_accepted": intrusion_summary.get(
"containment_decision_accepted_count", 0
),
},
["docs/security/external-host-intrusion-prevention-control.snapshot.json"],
[
"docs/security/external-host-intrusion-prevention-control.snapshot.json"
],
),
],
"boundaries": {
@@ -376,6 +561,7 @@ def load_latest_iwooos_runtime_security_readback(
"Wazuh 正式只讀路由 disabled 或退化時仍是 P0 紅燈",
"Wazuh 即時中繼資料必須先通過負責人、機密中繼資料、唯讀範圍與啟用後讀回",
"Wazuh 負責人證據預檢 ready 不代表已收件、已接受或可啟用 active response",
"Wazuh controlled apply preflight ready 不代表 runtime gate 已開或已執行修復",
],
}
@@ -403,7 +589,9 @@ def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _wazuh_live_summary(payload: dict[str, Any] | None, http_status: int) -> dict[str, Any]:
def _wazuh_live_summary(
payload: dict[str, Any] | None, http_status: int
) -> dict[str, Any]:
if not isinstance(payload, dict):
return {
"status": "not_checked_by_snapshot_loader",
@@ -428,7 +616,9 @@ def _wazuh_live_summary(payload: dict[str, Any] | None, http_status: int) -> dic
"agent_total": _int(summary.get("agent_total")),
"agent_active": _int(summary.get("agent_active")),
"agent_registry_empty_count": _int(summary.get("agent_registry_empty_count")),
"agent_below_expected_minimum_count": _int(summary.get("agent_below_expected_minimum_count")),
"agent_below_expected_minimum_count": _int(
summary.get("agent_below_expected_minimum_count")
),
"metadata_available_count": 1 if metadata_available else 0,
}
@@ -448,7 +638,11 @@ def _accepted_evidence_count(payload: dict[str, Any]) -> int:
evidence = payload.get("required_evidence_before_green")
if not isinstance(evidence, list):
return 0
return sum(1 for item in evidence if isinstance(item, dict) and item.get("accepted") is True)
return sum(
1
for item in evidence
if isinstance(item, dict) and item.get("accepted") is True
)
def _lane(
@@ -475,7 +669,14 @@ def _max_summary_count(
snapshots: dict[str, dict[str, Any]],
*keys: str,
) -> int:
return max((_int(_summary(payload).get(key)) for payload in snapshots.values() for key in keys), default=0)
return max(
(
_int(_summary(payload).get(key))
for payload in snapshots.values()
for key in keys
),
default=0,
)
def _require_runtime_boundaries(snapshots: dict[str, dict[str, Any]]) -> None:
@@ -505,7 +706,11 @@ def _require_runtime_boundaries(snapshots: dict[str, dict[str, Any]]) -> None:
boundaries = payload.get("execution_boundaries")
if isinstance(boundaries, dict):
invalid = sorted(
key for key in _FALSE_BOUNDARY_KEYS if key in boundaries and boundaries.get(key) is not False
key
for key in _FALSE_BOUNDARY_KEYS
if key in boundaries and boundaries.get(key) is not False
)
if invalid:
raise ValueError(f"{name}: execution boundaries must remain false: {invalid}")
raise ValueError(
f"{name}: execution boundaries must remain false: {invalid}"
)

View File

@@ -0,0 +1,679 @@
"""
IwoooS Wazuh runtime controlled apply preflight readback.
This service exposes a committed preflight contract and a no-persist validator
for redacted controlled-apply packets. It never queries live Wazuh, reads host
data, reads secrets, persists raw payloads, or authorizes runtime actions.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_security_dir
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
_SNAPSHOT_FILE = "wazuh-runtime-controlled-apply-preflight.snapshot.json"
_EXPECTED_SCHEMA = "wazuh_runtime_controlled_apply_preflight_v1"
_REQUIRED_FALSE_BOUNDARIES = {
"active_scan_authorized",
"alertmanager_reload_authorized",
"auto_block_authorized",
"credentialed_scan_authorized",
"firewall_change_authorized",
"host_write_authorized",
"kali_execute_authorized",
"kali_scan_authorized",
"nginx_reload_authorized",
"production_write_authorized",
"runtime_execution_authorized",
"runtime_gate_open",
"secret_value_collection_allowed",
"telegram_send_authorized",
"wazuh_active_response_authorized",
"wazuh_api_live_query_authorized",
}
_SENSITIVE_TEXT_PATTERNS = {
"internal_ip": re.compile(
r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"
),
"authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
"bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
"basic_auth": re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
"password_assignment": re.compile(
r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE
),
"token_assignment": re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
"cookie_assignment": re.compile(
r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE
),
"client_keys": re.compile(r"client\.keys", re.IGNORECASE),
"private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
"raw_session_text": re.compile(
r"(工作視窗|批准!繼續|source_thread_id|raw session)", re.IGNORECASE
),
}
_FORBIDDEN_KEY_FRAGMENTS = {
"authorization_header",
"basic_auth",
"bearer_token",
"client_keys",
"cookie",
"env_file",
"full_cli_output",
"full_journal",
"hostname",
"internal_ip",
"password",
"private_key",
"raw_agent_identity",
"raw_dashboard_request",
"raw_env",
"raw_hostname",
"raw_log",
"raw_runtime_volume",
"raw_wazuh_payload",
"session",
"stored_api_password",
"token",
"unredacted_screenshot",
"wazuh_api_password",
}
_RUNTIME_ACTION_KEYS = {
"active_response_enable",
"agent_reenroll",
"agent_restart",
"ansible_apply",
"ansible_playbook_run",
"apply_now",
"argocd_sync",
"credentialed_scan",
"database_migration",
"docker_restart",
"execute_now",
"exploit_attempt",
"firewall_change",
"force_push",
"host_write",
"k8s_apply",
"kali_active_scan",
"nginx_reload",
"repo_ref_delete",
"runtime_execution_authorized",
"secret_rotation",
"systemd_restart",
"wazuh_active_response",
"wazuh_agent_reenroll",
"wazuh_agent_restart",
"wazuh_api_live_query",
"wazuh_manager_restart",
"workflow_trigger",
}
def load_latest_iwooos_wazuh_runtime_controlled_apply_preflight(
security_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the public-safe Wazuh runtime controlled-apply preflight contract."""
directory = security_dir or _DEFAULT_SECURITY_DIR
snapshot = _load_snapshot(directory)
_require_boundaries(snapshot)
summary = _summary(snapshot)
merged_summary = {
"expected_scope_alias_count": _int(summary.get("expected_scope_alias_count")),
"target_selector_count": _int(summary.get("target_selector_count")),
"source_of_truth_diff_count": _int(summary.get("source_of_truth_diff_count")),
"check_mode_plan_count": _int(summary.get("check_mode_plan_count")),
"dry_run_required_count": _int(summary.get("dry_run_required_count")),
"rollback_plan_count": _int(summary.get("rollback_plan_count")),
"post_apply_verifier_count": _int(summary.get("post_apply_verifier_count")),
"km_playbook_writeback_count": _int(summary.get("km_playbook_writeback_count")),
"maintenance_window_required_count": _int(
summary.get("maintenance_window_required_count")
),
"owner_review_ready_count": _int(summary.get("owner_review_ready_count")),
"controlled_apply_preflight_ready_count": _int(
summary.get("controlled_apply_preflight_ready_count")
),
"controlled_apply_packet_received_count": _int(
summary.get("controlled_apply_packet_received_count")
),
"controlled_apply_packet_accepted_count": _int(
summary.get("controlled_apply_packet_accepted_count")
),
"controlled_apply_packet_quarantined_count": _int(
summary.get("controlled_apply_packet_quarantined_count")
),
"controlled_apply_runtime_action_rejected_count": _int(
summary.get("controlled_apply_runtime_action_rejected_count")
),
"forbidden_payload_count": _int(summary.get("forbidden_payload_count")),
"forbidden_action_count": _int(summary.get("forbidden_action_count")),
"runtime_gate_count": _int(summary.get("runtime_gate_count")),
"wazuh_api_live_query_authorized_count": _int(
summary.get("wazuh_api_live_query_authorized_count")
),
"wazuh_active_response_authorized_count": _int(
summary.get("wazuh_active_response_authorized_count")
),
"host_write_authorized_count": _int(summary.get("host_write_authorized_count")),
"secret_value_collection_allowed_count": _int(
summary.get("secret_value_collection_allowed_count")
),
}
return {
"schema_version": "iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1",
"source_schema_version": snapshot["schema_version"],
"status": snapshot.get(
"status", "controlled_apply_preflight_ready_no_runtime_action"
),
"mode": snapshot.get(
"mode", "committed_preflight_readback_no_live_wazuh_no_secret_collection"
),
"source_refs": [
f"docs/security/{_SNAPSHOT_FILE}",
],
"controlled_apply_packet_validation_endpoint": (
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet"
),
"controlled_apply_packet_validation_mode": "no_persist_preflight_review_no_runtime_action",
"summary": merged_summary,
"target_selectors": _target_selectors(snapshot.get("target_selectors")),
"required_packet_fields": _strings(snapshot.get("required_packet_fields")),
"preflight_items": _preflight_items(snapshot.get("preflight_items")),
"outcome_lanes": _strings(snapshot.get("outcome_lanes")),
"forbidden_payloads": _strings(snapshot.get("forbidden_payloads")),
"forbidden_actions": _strings(snapshot.get("forbidden_actions")),
"boundary_markers": _boundary_markers(merged_summary),
"boundaries": {
"payload_persisted": False,
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"wazuh_agent_reenroll_authorized": False,
"wazuh_agent_restart_authorized": False,
"wazuh_manager_restart_authorized": False,
"host_write_authorized": False,
"active_scan_authorized": False,
"kali_execute_authorized": False,
"nginx_reload_authorized": False,
"secret_value_collection_allowed": False,
"runtime_execution_authorized": False,
"runtime_gate_open": False,
"not_authorization": True,
},
"no_false_green_rules": _strings(snapshot.get("no_false_green_rules")),
}
def validate_iwooos_wazuh_runtime_controlled_apply_packet(
packet: dict[str, Any],
security_dir: Path | None = None,
) -> dict[str, Any]:
"""Validate one redacted controlled-apply preflight packet without applying it."""
contract = load_latest_iwooos_wazuh_runtime_controlled_apply_preflight(security_dir)
snapshot = _load_snapshot(security_dir or _DEFAULT_SECURITY_DIR)
required_fields = _strings(snapshot.get("required_packet_fields"))
expected_aliases = {
item["node_alias"]
for item in contract["target_selectors"]
if item.get("node_alias")
}
findings: list[dict[str, Any]] = []
if not isinstance(packet, dict):
findings.append(
_finding(
"CAP-01",
"blocker",
"request_controlled_apply_packet_supplement",
"controlled apply packet must be a JSON object.",
[],
)
)
return _validation_result(
contract, "request_controlled_apply_packet_supplement", findings
)
sensitive_hits = _collect_sensitive_hits(packet)
if sensitive_hits:
findings.append(
_finding(
"CAP-04",
"critical",
"quarantine_sensitive_payload",
"controlled apply packet contains forbidden or likely unredacted content; response omits raw values.",
[hit["path"] for hit in sensitive_hits[:12]],
{"categories": sorted({hit["category"] for hit in sensitive_hits})},
)
)
return _validation_result(contract, "quarantine_sensitive_payload", findings)
runtime_hits = _collect_runtime_action_hits(packet)
if runtime_hits:
findings.append(
_finding(
"CAP-05",
"critical",
"reject_runtime_action_request",
"controlled apply packet requested runtime execution; this validator only reviews preflight evidence.",
runtime_hits[:12],
)
)
return _validation_result(contract, "reject_runtime_action_request", findings)
missing_fields = [
field for field in required_fields if not _present(packet.get(field))
]
if missing_fields:
findings.append(
_finding(
"CAP-01",
"blocker",
"request_controlled_apply_packet_supplement",
"controlled apply packet is missing required preflight fields.",
missing_fields,
)
)
alias_issue = _validate_aliases(
packet.get("target_selector_aliases"), expected_aliases
)
if alias_issue:
findings.append(
_finding(
"CAP-02",
"blocker",
"request_target_selector_fix",
alias_issue,
["target_selector_aliases"],
)
)
intent = packet.get("controlled_apply_intent")
if intent != "prepare_controlled_apply_preflight_only":
findings.append(
_finding(
"CAP-03",
"blocker",
"request_controlled_apply_intent_fix",
"controlled_apply_intent must be prepare_controlled_apply_preflight_only.",
["controlled_apply_intent"],
)
)
if packet.get("runtime_boundary_ack") != "runtime_gate_remains_closed":
findings.append(
_finding(
"CAP-06",
"blocker",
"request_runtime_boundary_ack_fix",
"runtime_boundary_ack must state runtime_gate_remains_closed.",
["runtime_boundary_ack"],
)
)
outcome = (
_first_blocking_lane(findings)
or "accepted_for_controlled_apply_preflight_review_only"
)
if outcome == "accepted_for_controlled_apply_preflight_review_only":
findings.append(
_finding(
"CAP-07",
"info",
"controlled_apply_preflight_review_ready",
"controlled apply preflight packet passed no-persist review; runtime gate remains closed.",
[
"source_of_truth_diff_ref",
"check_mode_plan_ref",
"post_apply_verifier_ref",
],
)
)
return _validation_result(contract, outcome, findings)
def _load_snapshot(directory: Path) -> dict[str, Any]:
path = directory / _SNAPSHOT_FILE
if not path.is_file():
raise FileNotFoundError(
f"{path}: Wazuh runtime controlled apply preflight snapshot not found"
)
with path.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{path}: expected JSON object")
if payload.get("schema_version") != _EXPECTED_SCHEMA:
raise ValueError(f"{path}: expected schema_version={_EXPECTED_SCHEMA}")
return payload
def _summary(payload: dict[str, Any]) -> dict[str, Any]:
summary = payload.get("summary")
return summary if isinstance(summary, dict) else {}
def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _strings(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, str)]
def _target_selectors(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
selectors: list[dict[str, Any]] = []
for item in value:
if not isinstance(item, dict):
continue
selectors.append(
{
"node_alias": str(item.get("node_alias", "")),
"scope": str(item.get("scope", "")),
"selector_kind": str(item.get("selector_kind", "")),
"runtime_write_allowed": item.get("runtime_write_allowed") is True,
}
)
return selectors
def _preflight_items(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if not isinstance(item, dict):
continue
items.append(
{
"item_id": str(item.get("item_id", "")),
"title": str(item.get("title", "")),
"state_key": str(item.get("state_key", "")),
"ready": item.get("ready") is True,
"required_fields": _strings(item.get("required_fields")),
"next_gate": str(item.get("next_gate", "")),
}
)
return items
def _boundary_markers(summary: dict[str, int]) -> list[str]:
return [
"wazuh_runtime_controlled_apply_preflight_visible=true",
"wazuh_runtime_controlled_apply_packet_validation_api_available=true",
f"wazuh_runtime_controlled_apply_target_selector_count={summary['target_selector_count']}",
f"wazuh_runtime_controlled_apply_source_of_truth_diff_count={summary['source_of_truth_diff_count']}",
f"wazuh_runtime_controlled_apply_check_mode_plan_count={summary['check_mode_plan_count']}",
f"wazuh_runtime_controlled_apply_rollback_plan_count={summary['rollback_plan_count']}",
f"wazuh_runtime_controlled_apply_post_apply_verifier_count={summary['post_apply_verifier_count']}",
f"wazuh_runtime_controlled_apply_km_playbook_writeback_count={summary['km_playbook_writeback_count']}",
f"wazuh_runtime_controlled_apply_preflight_ready_count={summary['controlled_apply_preflight_ready_count']}",
"wazuh_runtime_controlled_apply_runtime_gate_count=0",
"wazuh_api_live_query_authorized=false",
"wazuh_active_response_authorized=false",
"host_write_authorized=false",
"secret_value_collection_allowed=false",
"not_authorization=true",
]
def _require_boundaries(payload: dict[str, Any]) -> None:
summary = _summary(payload)
for key in (
"runtime_gate_count",
"wazuh_api_live_query_authorized_count",
"wazuh_active_response_authorized_count",
"host_write_authorized_count",
"secret_value_collection_allowed_count",
"controlled_apply_packet_received_count",
"controlled_apply_packet_accepted_count",
"controlled_apply_packet_quarantined_count",
"controlled_apply_runtime_action_rejected_count",
):
if _int(summary.get(key)) != 0:
raise ValueError(
f"Wazuh runtime controlled apply preflight summary.{key} must remain 0"
)
expected_alias_count = _int(summary.get("expected_scope_alias_count"))
target_selector_count = _int(summary.get("target_selector_count"))
target_selectors = _target_selectors(payload.get("target_selectors"))
if (
target_selector_count != expected_alias_count
or len(target_selectors) != expected_alias_count
):
raise ValueError(
"Wazuh runtime controlled apply preflight target selectors must match expected alias count"
)
if any(item.get("runtime_write_allowed") is True for item in target_selectors):
raise ValueError(
"Wazuh runtime controlled apply preflight target selectors must not allow runtime writes"
)
readiness_keys = (
"source_of_truth_diff_count",
"check_mode_plan_count",
"dry_run_required_count",
"rollback_plan_count",
"post_apply_verifier_count",
"km_playbook_writeback_count",
"maintenance_window_required_count",
"owner_review_ready_count",
"controlled_apply_preflight_ready_count",
)
if any(_int(summary.get(key)) <= 0 for key in readiness_keys):
raise ValueError(
"Wazuh runtime controlled apply preflight readiness counters must be positive"
)
boundaries = payload.get("execution_boundaries")
if not isinstance(boundaries, dict):
raise ValueError(
"Wazuh runtime controlled apply preflight execution_boundaries missing"
)
for key in _REQUIRED_FALSE_BOUNDARIES:
if boundaries.get(key) is not False:
raise ValueError(
f"Wazuh runtime controlled apply preflight execution_boundaries.{key} must remain false"
)
if boundaries.get("not_authorization") is not True:
raise ValueError(
"Wazuh runtime controlled apply preflight not_authorization must remain true"
)
def _validation_result(
contract: dict[str, Any],
outcome_lane: str,
findings: list[dict[str, Any]],
) -> dict[str, Any]:
accepted = outcome_lane == "accepted_for_controlled_apply_preflight_review_only"
quarantined = outcome_lane == "quarantine_sensitive_payload"
rejected_runtime = outcome_lane == "reject_runtime_action_request"
supplement_required = not accepted and not quarantined and not rejected_runtime
return {
"schema_version": "iwooos_wazuh_runtime_controlled_apply_packet_validation_result_v1",
"contract_schema_version": contract["schema_version"],
"status": outcome_lane,
"mode": "no_persist_controlled_apply_preflight_review_no_runtime_no_secret_collection",
"outcome_lane": outcome_lane,
"accepted_for_controlled_apply_preflight_review_only": accepted,
"quarantined": quarantined,
"runtime_action_rejected": rejected_runtime,
"summary": {
"controlled_apply_packet_received_count": 1,
"controlled_apply_preflight_ready_count": 1 if accepted else 0,
"controlled_apply_packet_supplement_required_count": 1
if supplement_required
else 0,
"controlled_apply_packet_quarantined_count": 1 if quarantined else 0,
"controlled_apply_runtime_action_rejected_count": 1
if rejected_runtime
else 0,
"runtime_gate_count": 0,
"wazuh_api_live_query_authorized_count": 0,
"wazuh_active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"finding_count": len(findings),
},
"validation_findings": findings,
"boundary_markers": [
"wazuh_runtime_controlled_apply_packet_validation_received_count=1",
f"wazuh_runtime_controlled_apply_packet_validation_ready_count={1 if accepted else 0}",
f"wazuh_runtime_controlled_apply_packet_validation_quarantined_count={1 if quarantined else 0}",
f"wazuh_runtime_controlled_apply_packet_validation_runtime_action_rejected_count={1 if rejected_runtime else 0}",
"wazuh_runtime_controlled_apply_packet_validation_runtime_gate_count=0",
"wazuh_runtime_controlled_apply_packet_validation_no_persist=true",
"wazuh_api_live_query_authorized=false",
"wazuh_active_response_authorized=false",
"host_write_authorized=false",
"secret_value_collection_allowed=false",
"not_authorization=true",
],
"boundaries": {
"payload_persisted": False,
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"wazuh_agent_reenroll_authorized": False,
"wazuh_agent_restart_authorized": False,
"wazuh_manager_restart_authorized": False,
"host_write_authorized": False,
"active_scan_authorized": False,
"kali_execute_authorized": False,
"nginx_reload_authorized": False,
"secret_value_collection_allowed": False,
"runtime_execution_authorized": False,
"runtime_gate_open": False,
"not_authorization": True,
},
"next_gate": "run_allowlisted_check_mode_and_post_apply_verifier_preflight"
if accepted
else "controlled_apply_packet_fix_and_resubmit",
}
def _finding(
check_id: str,
severity: str,
lane: str,
message: str,
field_paths: list[str],
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"check_id": check_id,
"severity": severity,
"lane": lane,
"message": message,
"field_paths": field_paths,
}
if extra:
payload.update(extra)
return payload
def _present(value: Any) -> bool:
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
if isinstance(value, list | dict | tuple | set):
return bool(value)
return True
def _validate_aliases(value: Any, expected_aliases: set[str]) -> str | None:
aliases = value if isinstance(value, list) else []
if not aliases or not all(isinstance(item, str) for item in aliases):
return "target_selector_aliases must be an array of public alias strings."
alias_set = set(aliases)
if len(aliases) != len(alias_set):
return "target_selector_aliases must not contain duplicates."
if alias_set != expected_aliases:
missing = sorted(expected_aliases - alias_set)
extra = sorted(alias_set - expected_aliases)
return f"target_selector_aliases must match expected public aliases; missing={missing} extra={extra}"
return None
def _collect_sensitive_hits(value: Any, path: str = "$") -> list[dict[str, str]]:
hits: list[dict[str, str]] = []
if isinstance(value, dict):
for key, item in value.items():
key_text = str(key)
key_lower = key_text.lower()
for fragment in _FORBIDDEN_KEY_FRAGMENTS:
if fragment in key_lower:
hits.append(
{
"path": f"{path}.{key_text}",
"category": f"forbidden_key:{fragment}",
}
)
hits.extend(_collect_sensitive_hits(item, f"{path}.{key_text}"))
return hits
if isinstance(value, list):
for index, item in enumerate(value):
hits.extend(_collect_sensitive_hits(item, f"{path}[{index}]"))
return hits
if isinstance(value, str):
for category, pattern in _SENSITIVE_TEXT_PATTERNS.items():
if pattern.search(value):
hits.append({"path": path, "category": category})
return hits
def _collect_runtime_action_hits(value: Any, path: str = "$") -> list[str]:
hits: list[str] = []
if isinstance(value, dict):
for key, item in value.items():
key_text = str(key)
normalized_key = key_text.lower().replace("-", "_").replace(" ", "_")
if normalized_key in _RUNTIME_ACTION_KEYS and item not in (
False,
None,
"",
[],
{},
):
hits.append(f"{path}.{key_text}")
hits.extend(_collect_runtime_action_hits(item, f"{path}.{key_text}"))
return hits
if isinstance(value, list):
for index, item in enumerate(value):
hits.extend(_collect_runtime_action_hits(item, f"{path}[{index}]"))
return hits
if isinstance(value, str):
normalized = value.lower().replace("-", "_").replace(" ", "_")
if normalized in _RUNTIME_ACTION_KEYS:
hits.append(path)
return hits
def _first_blocking_lane(findings: list[dict[str, Any]]) -> str | None:
severity_order = {"critical": 0, "blocker": 1, "warn": 2, "info": 3}
blocking = [
finding
for finding in findings
if finding.get("severity") in {"critical", "blocker"}
]
if not blocking:
return None
blocking.sort(
key=lambda finding: severity_order.get(str(finding.get("severity")), 99)
)
return str(blocking[0].get("lane") or "request_controlled_apply_packet_supplement")

View File

@@ -7,6 +7,9 @@ from src.api.v1.iwooos import router
from src.services.iwooos_runtime_security_readback import (
load_latest_iwooos_runtime_security_readback,
)
from src.services.iwooos_wazuh_runtime_controlled_apply_preflight import (
load_latest_iwooos_wazuh_runtime_controlled_apply_preflight,
)
def _client() -> TestClient:
@@ -15,13 +18,39 @@ def _client() -> TestClient:
return TestClient(app)
def _valid_runtime_controlled_apply_packet() -> dict[str, object]:
return {
"controlled_apply_intent": "prepare_controlled_apply_preflight_only",
"target_selector_aliases": [
"managed_core_node_a",
"managed_core_node_b",
"managed_core_node_c",
"managed_edge_node_a",
"managed_edge_node_b",
"managed_lab_node_a",
],
"source_of_truth_diff_ref": "docs/security/wazuh-runtime-controlled-apply-preflight.snapshot.json#source-diff",
"check_mode_plan_ref": "playbooks/wazuh-controlled-apply-check-mode#redacted-plan",
"dry_run_evidence_ref": "evidence/iwooos/wazuh-runtime-dry-run-redacted-v1",
"blast_radius_statement": "public aliases only; no live Wazuh query and no host write in this preflight",
"rollback_plan_ref": "playbooks/wazuh-controlled-apply-rollback#redacted-plan",
"post_apply_verifier_ref": "verifiers/iwooos-wazuh-post-apply-readback#public-safe",
"km_playbook_writeback_ref": "km/playbook-trust/wazuh-controlled-apply-preflight-v1",
"maintenance_window": "low-traffic-window-required-before-any-future-apply",
"followup_owner": "iwooos-security-reviewer",
"rollback_owner": "iwooos-security-reviewer",
"audit_receipt_ref": "audit/iwooos-wazuh-controlled-apply-preflight-redacted-v1",
"runtime_boundary_ack": "runtime_gate_remains_closed",
}
def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None:
payload = load_latest_iwooos_runtime_security_readback()
assert payload["schema_version"] == "iwooos_runtime_security_readback_v1"
assert payload["status"] == "blocked_waiting_owner_evidence_and_runtime_gates"
assert payload["summary"]["source_snapshot_count"] == 10
assert payload["summary"]["p0_lane_count"] == 9
assert payload["summary"]["source_snapshot_count"] == 11
assert payload["summary"]["p0_lane_count"] == 10
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["summary"]["owner_response_received_count"] == 0
assert payload["summary"]["owner_response_accepted_count"] == 0
@@ -31,21 +60,47 @@ def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None
assert payload["summary"]["wazuh_live_readonly_api_enabled_count"] == 0
assert payload["summary"]["wazuh_live_metadata_available_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_owner_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_secret_source_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_manager_health_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_readonly_scope_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_post_enable_readback_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert (
payload["summary"]["wazuh_live_metadata_gate_secret_source_accepted_count"] == 0
)
assert (
payload["summary"]["wazuh_live_metadata_gate_manager_health_accepted_count"]
== 0
)
assert (
payload["summary"]["wazuh_live_metadata_gate_readonly_scope_accepted_count"]
== 0
)
assert (
payload["summary"]["wazuh_live_metadata_gate_post_enable_readback_count"] == 0
)
assert (
payload["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
)
assert payload["summary"]["wazuh_owner_evidence_required_field_count"] == 28
assert payload["summary"]["wazuh_owner_evidence_reviewer_check_count"] == 15
assert payload["summary"]["wazuh_owner_evidence_outcome_lane_count"] == 8
assert payload["summary"]["wazuh_owner_evidence_forbidden_payload_count"] == 22
assert payload["summary"]["wazuh_owner_evidence_expected_alias_count"] == 6
assert payload["summary"]["wazuh_owner_evidence_registry_export_received_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_registry_export_accepted_count"] == 0
assert (
payload["summary"]["wazuh_owner_evidence_registry_export_received_count"] == 0
)
assert (
payload["summary"]["wazuh_owner_evidence_registry_export_accepted_count"] == 0
)
assert payload["summary"]["wazuh_owner_evidence_received_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_accepted_count"] == 0
assert payload["summary"]["wazuh_owner_evidence_runtime_gate_count"] == 0
assert payload["summary"]["wazuh_runtime_apply_preflight_ready_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_target_selector_count"] == 6
assert payload["summary"]["wazuh_runtime_apply_source_diff_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_check_mode_plan_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_dry_run_required_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_rollback_plan_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_post_apply_verifier_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_km_writeback_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_owner_review_ready_count"] == 1
assert payload["summary"]["wazuh_runtime_apply_runtime_gate_count"] == 0
assert payload["summary"]["kali_active_scan_authorized_count"] == 0
assert payload["summary"]["kali_execute_authorized_count"] == 0
assert payload["summary"]["alert_receipt_runtime_send_count"] == 0
@@ -64,6 +119,7 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
"wazuh_live_route",
"wazuh_live_metadata_gate",
"wazuh_owner_evidence_preflight",
"wazuh_runtime_controlled_apply_preflight",
"wazuh_dashboard_api",
"kali_intake",
"alert_readability",
@@ -74,10 +130,28 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
assert all(lane["next_gate"] for lane in payload["lanes"])
assert all(lane["source_refs"] for lane in payload["lanes"])
assert any(lane["completion_percent"] > 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 35 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_metadata_gate" or lane["completion_percent"] == 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_owner_evidence_preflight" or lane["metrics"]["owner_accepted"] == 0 for lane in payload["lanes"])
assert all(
lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 35
for lane in payload["lanes"]
)
assert all(
lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1
for lane in payload["lanes"]
)
assert all(
lane["lane_id"] != "wazuh_live_metadata_gate" or lane["completion_percent"] == 0
for lane in payload["lanes"]
)
assert all(
lane["lane_id"] != "wazuh_owner_evidence_preflight"
or lane["metrics"]["owner_accepted"] == 0
for lane in payload["lanes"]
)
assert all(
lane["lane_id"] != "wazuh_runtime_controlled_apply_preflight"
or (lane["completion_percent"] == 45 and lane["metrics"]["runtime_gate"] == 0)
for lane in payload["lanes"]
)
def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> None:
@@ -92,20 +166,27 @@ def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> Non
data = response.json()
assert data["schema_version"] == "iwooos_runtime_security_readback_v1"
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert (
data["summary"]["wazuh_live_status"]
== "disabled_waiting_iwooos_wazuh_owner_gate"
)
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_metadata_available_count"] == 0
assert data["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_owner_evidence_accepted_count"] == 0
assert data["summary"]["wazuh_owner_evidence_runtime_gate_count"] == 0
assert data["summary"]["wazuh_runtime_apply_preflight_ready_count"] == 1
assert data["summary"]["wazuh_runtime_apply_runtime_gate_count"] == 0
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry(monkeypatch) -> None:
def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry(
monkeypatch,
) -> None:
import httpx
monkeypatch.setenv("IWOOOS_WAZUH_READONLY_ENABLED", "true")
@@ -120,7 +201,16 @@ def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry
if request.url.path == "/agents/summary/status":
return httpx.Response(
200,
json={"data": {"connection": {"total": 0, "active": 0, "disconnected": 0, "pending": 0}}},
json={
"data": {
"connection": {
"total": 0,
"active": 0,
"disconnected": 0,
"pending": 0,
}
}
},
)
if request.url.path == "/agents":
return httpx.Response(200, json={"data": {"affected_items": []}})
@@ -171,7 +261,10 @@ def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert (
data["summary"]["wazuh_live_status"]
== "disabled_waiting_iwooos_wazuh_owner_gate"
)
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert data["boundaries"]["wazuh_api_live_query_authorized"] is False
assert data["boundaries"]["wazuh_active_response_authorized"] is False
@@ -219,8 +312,154 @@ def test_iwooos_wazuh_owner_evidence_preflight_api_is_public_safe(monkeypatch) -
assert data["boundaries"]["not_authorization"] is True
assert len(data["items"]) == 8
assert any(marker == "必要欄位=28" for marker in data["boundary_markers"])
assert any(rule.startswith("負責人證據預檢 ready") for rule in data["no_false_green_rules"])
assert any(
rule.startswith("負責人證據預檢 ready") for rule in data["no_false_green_rules"]
)
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
def test_iwooos_wazuh_runtime_controlled_apply_preflight_api_is_public_safe(
monkeypatch,
) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
payload = load_latest_iwooos_wazuh_runtime_controlled_apply_preflight()
assert (
payload["schema_version"]
== "iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1"
)
assert payload["status"] == "controlled_apply_preflight_ready_no_runtime_action"
assert payload["summary"]["controlled_apply_preflight_ready_count"] == 1
assert payload["summary"]["target_selector_count"] == 6
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["boundaries"]["runtime_execution_authorized"] is False
response = _client().get("/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight")
assert response.status_code == 200
data = response.json()
assert (
data["schema_version"]
== "iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1"
)
assert data["summary"]["controlled_apply_preflight_ready_count"] == 1
assert data["summary"]["target_selector_count"] == 6
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_active_response_authorized_count"] == 0
assert data["summary"]["host_write_authorized_count"] == 0
assert data["summary"]["secret_value_collection_allowed_count"] == 0
assert data["boundaries"]["payload_persisted"] is False
assert data["boundaries"]["wazuh_api_live_query_authorized"] is False
assert data["boundaries"]["wazuh_active_response_authorized"] is False
assert data["boundaries"]["host_write_authorized"] is False
assert data["boundaries"]["runtime_gate_open"] is False
assert data["boundaries"]["not_authorization"] is True
assert len(data["target_selectors"]) == 6
assert len(data["preflight_items"]) == 6
assert any(
marker == "wazuh_runtime_controlled_apply_preflight_visible=true"
for marker in data["boundary_markers"]
)
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text
def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_accepts_redacted_packet(
monkeypatch,
) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
client = _client()
before = client.get(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight"
).json()
response = client.post(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
json=_valid_runtime_controlled_apply_packet(),
)
after = client.get("/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight").json()
assert response.status_code == 200
data = response.json()
assert (
data["schema_version"]
== "iwooos_wazuh_runtime_controlled_apply_packet_validation_result_v1"
)
assert data["status"] == "accepted_for_controlled_apply_preflight_review_only"
assert data["accepted_for_controlled_apply_preflight_review_only"] is True
assert data["summary"]["controlled_apply_packet_received_count"] == 1
assert data["summary"]["controlled_apply_preflight_ready_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_active_response_authorized_count"] == 0
assert data["summary"]["host_write_authorized_count"] == 0
assert data["summary"]["secret_value_collection_allowed_count"] == 0
assert data["boundaries"]["payload_persisted"] is False
assert data["boundaries"]["runtime_execution_authorized"] is False
assert data["boundaries"]["runtime_gate_open"] is False
assert before["summary"] == after["summary"]
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_quarantines_sensitive_payload(
monkeypatch,
) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
packet = _valid_runtime_controlled_apply_packet()
packet[
"redacted_evidence_ref"
] = "raw output includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop"
response = _client().post(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
json=packet,
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "quarantine_sensitive_payload"
assert data["quarantined"] is True
assert data["summary"]["controlled_apply_packet_quarantined_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert "10.1.2.3" not in response.text
assert "Bearer abcdefghijklmnop" not in response.text
def test_iwooos_wazuh_runtime_controlled_apply_preflight_validator_rejects_runtime_action(
monkeypatch,
) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
packet = _valid_runtime_controlled_apply_packet()
packet["wazuh_active_response"] = True
response = _client().post(
"/api/v1/iwooos/wazuh-runtime-controlled-apply-preflight/validate-controlled-apply-packet",
json=packet,
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "reject_runtime_action_request"
assert data["runtime_action_rejected"] is True
assert data["summary"]["controlled_apply_runtime_action_rejected_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0

View File

@@ -20429,8 +20429,8 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全彙總讀回;它不保存原始 Wazuh 載荷、不啟動掃描、不送訊息、不改主機。",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由與 controlled apply preflight 的公開安全彙總讀回;它不保存原始 Wazuh 載荷、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
"laneStatusLabel": "目前狀態",
@@ -20448,6 +20448,7 @@
"wazuh_live_route": "正式只讀路由已關閉,等待 IwoooS Wazuh 負責人閘門",
"wazuh_live_metadata_gate": "等待即時中繼資料負責人回覆",
"wazuh_owner_evidence_preflight": "負責人證據預檢已就緒,尚未授權執行期動作",
"wazuh_runtime_controlled_apply_preflight": "受控執行預檢已就緒,執行期閘門仍關閉",
"wazuh_dashboard_api": "API 連線退化,禁止顯示綠燈",
"kali_intake": "部分執行期健康已整合,仍待完整驗收",
"alert_readability": "格式合約已就緒,尚未有實發收件證據",
@@ -20476,6 +20477,10 @@
"label": "中繼資料閘門",
"detail": "即時中繼資料查詢仍需負責人、機密中繼資料、唯讀範圍與啟用後讀回。"
},
"controlledApplyPreflight": {
"label": "受控預檢",
"detail": "target selector、diff、check-mode、rollback、verifier 與 writeback 已可審查runtime 仍為 0。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20506,6 +20511,10 @@
"title": "Wazuh 負責人證據預檢",
"body": "把 manager registry 脫敏封包、逐主機矩陣、Dashboard API 狀態、拒收敏感類型與負責人決策先變成可驗收格式;收件、接受與執行期仍為 0。"
},
"wazuh_runtime_controlled_apply_preflight": {
"title": "Wazuh 受控執行預檢",
"body": "target selector、source-of-truth diff、check-mode / dry-run、rollback、post-apply verifier 與 KM / PlayBook writeback 已成為可審查 packet它仍不查 live Wazuh、不開 active response、不寫主機。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"

View File

@@ -20429,8 +20429,8 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全彙總讀回;它不保存原始 Wazuh 載荷、不啟動掃描、不送訊息、不改主機。",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由與 controlled apply preflight 的公開安全彙總讀回;它不保存原始 Wazuh 載荷、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
"laneStatusLabel": "目前狀態",
@@ -20448,6 +20448,7 @@
"wazuh_live_route": "正式只讀路由已關閉,等待 IwoooS Wazuh 負責人閘門",
"wazuh_live_metadata_gate": "等待即時中繼資料負責人回覆",
"wazuh_owner_evidence_preflight": "負責人證據預檢已就緒,尚未授權執行期動作",
"wazuh_runtime_controlled_apply_preflight": "受控執行預檢已就緒,執行期閘門仍關閉",
"wazuh_dashboard_api": "API 連線退化,禁止顯示綠燈",
"kali_intake": "部分執行期健康已整合,仍待完整驗收",
"alert_readability": "格式合約已就緒,尚未有實發收件證據",
@@ -20476,6 +20477,10 @@
"label": "中繼資料閘門",
"detail": "即時中繼資料查詢仍需負責人、機密中繼資料、唯讀範圍與啟用後讀回。"
},
"controlledApplyPreflight": {
"label": "受控預檢",
"detail": "target selector、diff、check-mode、rollback、verifier 與 writeback 已可審查runtime 仍為 0。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20506,6 +20511,10 @@
"title": "Wazuh 負責人證據預檢",
"body": "把 manager registry 脫敏封包、逐主機矩陣、Dashboard API 狀態、拒收敏感類型與負責人決策先變成可驗收格式;收件、接受與執行期仍為 0。"
},
"wazuh_runtime_controlled_apply_preflight": {
"title": "Wazuh 受控執行預檢",
"body": "target selector、source-of-truth diff、check-mode / dry-run、rollback、post-apply verifier 與 KM / PlayBook writeback 已成為可審查 packet它仍不查 live Wazuh、不開 active response、不寫主機。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"

View File

@@ -346,6 +346,7 @@ type RuntimeSecurityReadbackSummaryItem = {
| 'wazuhRegistry'
| 'wazuhLive'
| 'metadataGate'
| 'controlledApplyPreflight'
| 'ownerAccepted'
| 'kaliRuntime'
| 'runtimeGate'
@@ -8242,6 +8243,7 @@ const runtimeSecurityLaneStatusKeys = new Set<IwoooSRuntimeSecurityReadbackRespo
'wazuh_live_route',
'wazuh_live_metadata_gate',
'wazuh_owner_evidence_preflight',
'wazuh_runtime_controlled_apply_preflight',
'wazuh_dashboard_api',
'kali_intake',
'alert_readability',
@@ -8319,6 +8321,12 @@ function IwoooSRuntimeSecurityReadbackBoard() {
icon: ClipboardCheck,
tone: 'locked',
},
{
key: 'controlledApplyPreflight',
value: summary ? String(summary.wazuh_runtime_apply_preflight_ready_count) : '...',
icon: ListChecks,
tone: summary && summary.wazuh_runtime_apply_preflight_ready_count > 0 ? 'steady' : 'locked',
},
{
key: 'ownerAccepted',
value: summary ? `${summary.owner_response_accepted_count}/${summary.owner_response_received_count}` : '...',

View File

@@ -103,6 +103,7 @@ export interface IwoooSRuntimeSecurityReadbackLane {
| 'wazuh_live_route'
| 'wazuh_live_metadata_gate'
| 'wazuh_owner_evidence_preflight'
| 'wazuh_runtime_controlled_apply_preflight'
| 'wazuh_dashboard_api'
| 'kali_intake'
| 'alert_readability'
@@ -159,6 +160,16 @@ export interface IwoooSRuntimeSecurityReadbackResponse {
wazuh_owner_evidence_received_count: number
wazuh_owner_evidence_accepted_count: number
wazuh_owner_evidence_runtime_gate_count: number
wazuh_runtime_apply_preflight_ready_count: number
wazuh_runtime_apply_target_selector_count: number
wazuh_runtime_apply_source_diff_count: number
wazuh_runtime_apply_check_mode_plan_count: number
wazuh_runtime_apply_dry_run_required_count: number
wazuh_runtime_apply_rollback_plan_count: number
wazuh_runtime_apply_post_apply_verifier_count: number
wazuh_runtime_apply_km_writeback_count: number
wazuh_runtime_apply_owner_review_ready_count: number
wazuh_runtime_apply_runtime_gate_count: number
kali_active_scan_authorized_count: number
kali_execute_authorized_count: number
kali_finding_envelope_accepted_count: number
@@ -268,6 +279,69 @@ export interface IwoooSWazuhOwnerEvidencePreflightResponse {
no_false_green_rules: string[]
}
export interface IwoooSWazuhRuntimeControlledApplyPreflightItem {
item_id:
| 'target_selector'
| 'source_of_truth_diff'
| 'check_mode_dry_run'
| 'rollback'
| 'post_apply_verifier'
| 'learning_writeback'
title: string
state_key: string
ready: boolean
required_fields: string[]
next_gate: string
}
export interface IwoooSWazuhRuntimeControlledApplyPreflightResponse {
schema_version: 'iwooos_wazuh_runtime_controlled_apply_preflight_readback_v1'
source_schema_version: 'wazuh_runtime_controlled_apply_preflight_v1'
status: string
mode: string
source_refs: string[]
controlled_apply_packet_validation_endpoint: string
controlled_apply_packet_validation_mode: string
summary: {
expected_scope_alias_count: number
target_selector_count: number
source_of_truth_diff_count: number
check_mode_plan_count: number
dry_run_required_count: number
rollback_plan_count: number
post_apply_verifier_count: number
km_playbook_writeback_count: number
maintenance_window_required_count: number
owner_review_ready_count: number
controlled_apply_preflight_ready_count: number
controlled_apply_packet_received_count: number
controlled_apply_packet_accepted_count: number
controlled_apply_packet_quarantined_count: number
controlled_apply_runtime_action_rejected_count: number
forbidden_payload_count: number
forbidden_action_count: number
runtime_gate_count: number
wazuh_api_live_query_authorized_count: number
wazuh_active_response_authorized_count: number
host_write_authorized_count: number
secret_value_collection_allowed_count: number
}
target_selectors: Array<{
node_alias: string
scope: string
selector_kind: string
runtime_write_allowed: boolean
}>
required_packet_fields: string[]
preflight_items: IwoooSWazuhRuntimeControlledApplyPreflightItem[]
outcome_lanes: string[]
forbidden_payloads: string[]
forbidden_actions: string[]
boundary_markers: string[]
boundaries: Record<string, boolean>
no_false_green_rules: string[]
}
export interface IwoooSWazuhManagedHostCoverageHost {
node_id: string
role: string
@@ -621,6 +695,11 @@ export const apiClient = {
return handleResponse<IwoooSWazuhOwnerEvidencePreflightResponse>(res)
},
async getIwoooSWazuhRuntimeControlledApplyPreflight() {
const res = await fetch(`${API_BASE_URL}/iwooos/wazuh-runtime-controlled-apply-preflight`, { cache: 'no-store' })
return handleResponse<IwoooSWazuhRuntimeControlledApplyPreflightResponse>(res)
},
async getIwoooSWazuhManagedHostCoverage() {
const res = await fetch(`${API_BASE_URL}/iwooos/wazuh-managed-host-coverage`, { cache: 'no-store' })
return handleResponse<IwoooSWazuhManagedHostCoverageResponse>(res)

View File

@@ -0,0 +1,223 @@
{
"schema_version": "wazuh_runtime_controlled_apply_preflight_v1",
"generated_at": "2026-06-28T10:30:00+08:00",
"status": "controlled_apply_preflight_ready_no_runtime_action",
"mode": "committed_preflight_readback_no_live_wazuh_no_secret_collection",
"summary": {
"expected_scope_alias_count": 6,
"target_selector_count": 6,
"source_of_truth_diff_count": 1,
"check_mode_plan_count": 1,
"dry_run_required_count": 1,
"rollback_plan_count": 1,
"post_apply_verifier_count": 1,
"km_playbook_writeback_count": 1,
"maintenance_window_required_count": 1,
"owner_review_ready_count": 1,
"controlled_apply_preflight_ready_count": 1,
"controlled_apply_packet_received_count": 0,
"controlled_apply_packet_accepted_count": 0,
"controlled_apply_packet_quarantined_count": 0,
"controlled_apply_runtime_action_rejected_count": 0,
"forbidden_payload_count": 18,
"forbidden_action_count": 20,
"runtime_gate_count": 0,
"wazuh_api_live_query_authorized_count": 0,
"wazuh_active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"secret_value_collection_allowed_count": 0
},
"target_selectors": [
{
"node_alias": "managed_core_node_a",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
},
{
"node_alias": "managed_core_node_b",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
},
{
"node_alias": "managed_core_node_c",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
},
{
"node_alias": "managed_edge_node_a",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
},
{
"node_alias": "managed_edge_node_b",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
},
{
"node_alias": "managed_lab_node_a",
"scope": "wazuh_manager_registry_accepted_alias",
"selector_kind": "public_alias_only",
"runtime_write_allowed": false
}
],
"required_packet_fields": [
"controlled_apply_intent",
"target_selector_aliases",
"source_of_truth_diff_ref",
"check_mode_plan_ref",
"dry_run_evidence_ref",
"blast_radius_statement",
"rollback_plan_ref",
"post_apply_verifier_ref",
"km_playbook_writeback_ref",
"maintenance_window",
"followup_owner",
"rollback_owner",
"audit_receipt_ref",
"runtime_boundary_ack"
],
"preflight_items": [
{
"item_id": "target_selector",
"title": "Public alias target selector",
"state_key": "target_selector_ready",
"ready": true,
"required_fields": [
"target_selector_aliases"
],
"next_gate": "run check-mode against allowlisted route before any runtime action"
},
{
"item_id": "source_of_truth_diff",
"title": "Source-of-truth diff reference",
"state_key": "source_of_truth_diff_ready",
"ready": true,
"required_fields": [
"source_of_truth_diff_ref"
],
"next_gate": "review repo or playbook diff before controlled apply"
},
{
"item_id": "check_mode_dry_run",
"title": "Check-mode and dry-run evidence",
"state_key": "check_mode_dry_run_ready",
"ready": true,
"required_fields": [
"check_mode_plan_ref",
"dry_run_evidence_ref"
],
"next_gate": "store dry-run evidence reference without raw host output"
},
{
"item_id": "rollback",
"title": "Rollback plan",
"state_key": "rollback_ready",
"ready": true,
"required_fields": [
"rollback_plan_ref",
"rollback_owner"
],
"next_gate": "rollback must remain available before apply"
},
{
"item_id": "post_apply_verifier",
"title": "Post-apply verifier",
"state_key": "post_apply_verifier_ready",
"ready": true,
"required_fields": [
"post_apply_verifier_ref"
],
"next_gate": "verifier readback must run after any future controlled apply"
},
{
"item_id": "learning_writeback",
"title": "KM and PlayBook trust writeback",
"state_key": "learning_writeback_ready",
"ready": true,
"required_fields": [
"km_playbook_writeback_ref",
"audit_receipt_ref"
],
"next_gate": "writeback receipt required after verifier"
}
],
"outcome_lanes": [
"accepted_for_controlled_apply_preflight_review_only",
"request_controlled_apply_packet_supplement",
"quarantine_sensitive_payload",
"reject_runtime_action_request"
],
"forbidden_payloads": [
"secret_value",
"token_value",
"private_key",
"cookie",
"session",
"authorization_header",
"client.keys",
"raw_wazuh_payload",
"raw_agent_identity",
"raw_hostname",
"internal_ip",
"full_cli_output",
"full_journal",
"raw_dashboard_request",
"unredacted_screenshot",
"private_namespace",
"raw_env_file",
"raw_runtime_volume"
],
"forbidden_actions": [
"wazuh_api_live_query",
"wazuh_active_response",
"wazuh_agent_restart",
"wazuh_agent_reenroll",
"wazuh_manager_restart",
"host_write",
"systemd_restart",
"docker_restart",
"nginx_reload",
"firewall_change",
"kali_active_scan",
"credentialed_scan",
"exploit_attempt",
"secret_rotation",
"k8s_apply",
"argocd_sync",
"database_migration",
"force_push",
"repo_ref_delete",
"workflow_trigger"
],
"execution_boundaries": {
"active_scan_authorized": false,
"alertmanager_reload_authorized": false,
"auto_block_authorized": false,
"credentialed_scan_authorized": false,
"firewall_change_authorized": false,
"host_write_authorized": false,
"kali_execute_authorized": false,
"kali_scan_authorized": false,
"nginx_reload_authorized": false,
"production_write_authorized": false,
"runtime_execution_authorized": false,
"runtime_gate_open": false,
"secret_value_collection_allowed": false,
"telegram_send_authorized": false,
"wazuh_active_response_authorized": false,
"wazuh_api_live_query_authorized": false,
"not_authorization": true
},
"no_false_green_rules": [
"Controlled apply preflight ready does not open runtime gate.",
"Target selectors are public aliases only and do not authorize host writes.",
"Check-mode and dry-run references do not authorize live Wazuh queries.",
"Rollback and verifier readiness does not authorize active response.",
"KM and PlayBook writeback readiness does not permit secret collection."
]
}