Files
awoooi/apps/api/src/services/iwooos_runtime_security_readback.py
2026-06-28 19:15:48 +08:00

890 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
IwoooS runtime security readback.
Loads committed security snapshots and exposes a public-safe, read-only runtime
security board. This module never queries Wazuh, Kali, hosts, Docker, Nginx,
firewalls, databases, Telegram, or secrets.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_security_dir
# Directory the snapshots are read from when the caller passes no
# ``security_dir``; resolved by ``default_security_dir`` from this module's path.
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
# Snapshot key -> JSON file name inside the security directory. Every entry is
# loaded by the readback loader, and the same file names are reported back as
# ``docs/security/<file name>`` source references.
_SNAPSHOT_FILES = {
    "owner_gap": "s4-9-owner-response-gap-audit.snapshot.json",
    "wazuh_coverage": "wazuh-managed-host-coverage-gate.snapshot.json",
    "wazuh_runtime": "wazuh-agent-visibility-runtime-gate.snapshot.json",
    "wazuh_live_metadata_gate": "wazuh-readonly-live-metadata-env-gate.snapshot.json",
    "wazuh_owner_evidence_preflight": "wazuh-agent-visibility-owner-evidence-preflight.snapshot.json",
    "wazuh_runtime_apply_preflight": "wazuh-runtime-controlled-apply-preflight.snapshot.json",
    "wazuh_runtime_owner_review": "wazuh-runtime-gate-owner-review-readback.snapshot.json",
    "wazuh_allowlisted_check_mode_dry_run": "wazuh-allowlisted-check-mode-dry-run.snapshot.json",
    "kali_status": "kali-integration-status.snapshot.json",
    "soc_control": "soc-siem-kali-wazuh-integration-control.snapshot.json",
    "alert_readability": "telegram-alert-readability-guard.snapshot.json",
    "owner_dispatch": "monitoring-owner-request-draft.snapshot.json",
    "intrusion_prevention": "external-host-intrusion-prevention-control.snapshot.json",
}
# Snapshot key -> the ``schema_version`` string that snapshot must declare.
# ``_load_snapshot`` raises ``ValueError`` when the loaded file disagrees.
_EXPECTED_SCHEMAS = {
    "owner_gap": "s4_9_owner_response_gap_audit_v1",
    "wazuh_coverage": "wazuh_managed_host_coverage_gate_v1",
    "wazuh_runtime": "wazuh_agent_visibility_runtime_gate_v1",
    "wazuh_live_metadata_gate": "iwooos_wazuh_readonly_live_metadata_env_gate_v1",
    "wazuh_owner_evidence_preflight": "wazuh_agent_visibility_owner_evidence_preflight_v1",
    "wazuh_runtime_apply_preflight": "wazuh_runtime_controlled_apply_preflight_v1",
    "wazuh_runtime_owner_review": "wazuh_runtime_gate_owner_review_readback_v1",
    "wazuh_allowlisted_check_mode_dry_run": "wazuh_allowlisted_check_mode_dry_run_v1",
    "kali_status": "kali_integration_status_v1",
    "soc_control": "soc_siem_kali_wazuh_integration_control_v1",
    "alert_readability": "telegram_alert_readability_guard_v1",
    "owner_dispatch": "monitoring_owner_request_draft_v1",
    "intrusion_prevention": "external_host_intrusion_prevention_control_v1",
}
# Keys of a snapshot's ``execution_boundaries`` object that, when present, must
# be exactly ``False``; ``_require_runtime_boundaries`` raises ``ValueError``
# for any other value.
_FALSE_BOUNDARY_KEYS = {
    "active_scan_authorized",
    "alertmanager_reload_authorized",
    "auto_block_authorized",
    "credentialed_scan_authorized",
    "firewall_change_authorized",
    "host_write_authorized",
    "kali_execute_authorized",
    "kali_scan_authorized",
    "nginx_reload_authorized",
    "production_write_authorized",
    "runtime_execution_authorized",
    "runtime_gate_open",
    "secret_value_collection_allowed",
    "telegram_send_authorized",
    "wazuh_active_response_authorized",
    "wazuh_api_live_query_authorized",
}
def load_latest_iwooos_runtime_security_readback(
    security_dir: Path | None = None,
    wazuh_live_status: dict[str, Any] | None = None,
    wazuh_live_http_status: int = 0,
) -> dict[str, Any]:
    """Load and normalize the current IwoooS runtime security readback.

    Every file listed in ``_SNAPSHOT_FILES`` is loaded and schema-checked, the
    runtime boundaries are enforced, and the snapshot summaries are projected
    into a flat counter summary plus one entry per lane.

    Args:
        security_dir: Directory holding the snapshot files. Falls back to
            ``_DEFAULT_SECURITY_DIR`` when omitted.
        wazuh_live_status: Optional Wazuh route payload supplied by the
            caller; it is only normalized here, never fetched.
        wazuh_live_http_status: HTTP status that accompanied that payload.

    Returns:
        The readback document with ``schema_version``, ``status``, ``mode``,
        ``source_refs``, ``summary``, ``lanes``, ``boundaries`` and
        ``no_false_green_rules``.

    Raises:
        FileNotFoundError: A snapshot file is missing.
        ValueError: A snapshot is not a JSON object, declares an unexpected
            ``schema_version``, or violates a runtime boundary.
    """
    directory = security_dir or _DEFAULT_SECURITY_DIR
    snapshots = {
        key: _load_snapshot(directory, key, filename)
        for key, filename in _SNAPSHOT_FILES.items()
    }
    _require_runtime_boundaries(snapshots)
    summaries = {key: _summary(payload) for key, payload in snapshots.items()}
    live_wazuh = _wazuh_live_summary(wazuh_live_status, wazuh_live_http_status)
    source_refs = [f"docs/security/{filename}" for filename in _SNAPSHOT_FILES.values()]
    runtime_gate_count = _max_summary_count(
        snapshots,
        "runtime_gate_count",
        "active_response_authorized_count",
        "kali_active_scan_authorized_count",
        "telegram_send_authorized_count",
    )
    return {
        "schema_version": "iwooos_runtime_security_readback_v1",
        "status": "blocked_waiting_owner_evidence_and_runtime_gates",
        "mode": "committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
        "source_refs": source_refs,
        "summary": _readback_summary(
            summaries, live_wazuh, len(source_refs), runtime_gate_count
        ),
        "lanes": _readback_lanes(snapshots, summaries, live_wazuh),
        "boundaries": {
            "active_response_authorized": False,
            "active_scan_authorized": False,
            "action_buttons_allowed": False,
            "host_write_authorized": False,
            "kali_execute_authorized": False,
            "nginx_reload_authorized": False,
            "raw_payload_storage_allowed": False,
            "runtime_execution_authorized": False,
            "secret_value_collection_allowed": False,
            "telegram_send_authorized": False,
            "wazuh_active_response_authorized": False,
            "wazuh_api_live_query_authorized": False,
            "workflow_modification_authorized": False,
            "not_authorization": True,
        },
        "no_false_green_rules": [
            "儀表板路由 200 不代表 Wazuh 清單已恢復",
            "傳輸連線數不代表所有主機都已納管",
            "前台可見不代表執行期已授權",
            "負責人送件草案不代表負責人已接受",
            "資安觀測節點健康不代表主動掃描已授權",
            "告警格式合約不代表通知已實發或已取得 receipt",
            "Wazuh 正式只讀路由 disabled 或退化時仍是 P0 紅燈",
            "Wazuh 即時中繼資料必須先通過負責人、機密中繼資料、唯讀範圍與啟用後讀回",
            "Wazuh 負責人證據預檢 ready 不代表已收件、已接受或可啟用 active response",
            "Wazuh controlled apply preflight ready 不代表 runtime gate 已開或已執行修復",
            "Wazuh runtime gate owner-review accepted 只代表 review readiness不代表已查 live Wazuh 或可寫主機",
            "Wazuh allowlisted check-mode dry-run staged 只代表脫敏 dry-run refs 已讀回,不代表 runtime gate、live query 或 host write 已授權",
        ],
    }


def _project_counts(summary: dict[str, Any], mapping: dict[str, str]) -> dict[str, int]:
    """Return ``{output_key: _int(summary.get(source_key))}`` for each pair in *mapping*."""
    return {
        output_key: _int(summary.get(source_key))
        for output_key, source_key in mapping.items()
    }


def _readback_summary(
    summaries: dict[str, dict[str, Any]],
    live_wazuh: dict[str, Any],
    source_snapshot_count: int,
    runtime_gate_count: int,
) -> dict[str, Any]:
    """Build the flat ``summary`` section from the per-snapshot summaries."""
    soc = summaries["soc_control"]
    alert = summaries["alert_readability"]
    intrusion = summaries["intrusion_prevention"]
    return {
        "source_snapshot_count": source_snapshot_count,
        # Literal kept as-is; it matches the twelve lanes built by _readback_lanes.
        "p0_lane_count": 12,
        "control_plane_visibility_percent": _average_percent(
            soc.get("coverage_percent_after_soc_integration_control"),
            intrusion.get("coverage_percent_after_prevention_control"),
            _alert_contract_percent(alert),
        ),
        "actual_runtime_acceptance_percent": 0,
        **_project_counts(
            summaries["owner_gap"],
            {
                "owner_response_received_count": "owner_response_received_count",
                "owner_response_accepted_count": "owner_response_accepted_count",
            },
        ),
        "redacted_evidence_refs_received_count": 0,
        **_project_counts(
            summaries["owner_dispatch"],
            {"request_sent_count": "request_sent_count"},
        ),
        **_project_counts(
            summaries["wazuh_coverage"],
            {
                "wazuh_expected_host_scope_count": "expected_host_scope_count",
                "wazuh_manager_registry_accepted_count": "manager_registry_accepted_count",
                "wazuh_transport_observed_count": "manager_transport_established_connection_count",
                "wazuh_dashboard_api_degraded_observed_count": "dashboard_api_degraded_observed_count",
            },
        ),
        "wazuh_live_route_http_status": live_wazuh["http_status"],
        "wazuh_live_route_degraded_count": live_wazuh["degraded_count"],
        "wazuh_live_readonly_api_enabled_count": live_wazuh["readonly_api_enabled_count"],
        "wazuh_live_agent_total": live_wazuh["agent_total"],
        "wazuh_live_agent_active": live_wazuh["agent_active"],
        "wazuh_live_registry_empty_count": live_wazuh["agent_registry_empty_count"],
        "wazuh_live_below_expected_count": live_wazuh["agent_below_expected_minimum_count"],
        "wazuh_live_metadata_available_count": live_wazuh["metadata_available_count"],
        "wazuh_live_status": live_wazuh["status"],
        **_project_counts(
            summaries["wazuh_live_metadata_gate"],
            {
                "wazuh_live_metadata_gate_owner_accepted_count": "live_metadata_owner_response_accepted_count",
                "wazuh_live_metadata_gate_secret_source_accepted_count": "secret_source_metadata_accepted_count",
                "wazuh_live_metadata_gate_manager_health_accepted_count": "wazuh_manager_health_ref_accepted_count",
                "wazuh_live_metadata_gate_readonly_scope_accepted_count": "readonly_account_scope_accepted_count",
                "wazuh_live_metadata_gate_post_enable_readback_count": "post_enable_readback_passed_count",
                "wazuh_live_metadata_gate_live_query_authorized_count": "wazuh_api_live_query_authorized_count",
            },
        ),
        **_project_counts(
            summaries["wazuh_owner_evidence_preflight"],
            {
                "wazuh_owner_evidence_required_field_count": "required_field_count",
                "wazuh_owner_evidence_reviewer_check_count": "reviewer_check_count",
                "wazuh_owner_evidence_outcome_lane_count": "outcome_lane_count",
                "wazuh_owner_evidence_forbidden_payload_count": "forbidden_payload_count",
                "wazuh_owner_evidence_expected_alias_count": "expected_scope_alias_count",
                "wazuh_owner_evidence_registry_export_received_count": "registry_export_received_count",
                "wazuh_owner_evidence_registry_export_accepted_count": "registry_export_accepted_count",
                "wazuh_owner_evidence_received_count": "owner_evidence_received_count",
                "wazuh_owner_evidence_accepted_count": "owner_evidence_accepted_count",
                "wazuh_owner_evidence_runtime_gate_count": "runtime_gate_count",
            },
        ),
        **_project_counts(
            summaries["wazuh_runtime_apply_preflight"],
            {
                "wazuh_runtime_apply_preflight_ready_count": "controlled_apply_preflight_ready_count",
                "wazuh_runtime_apply_target_selector_count": "target_selector_count",
                "wazuh_runtime_apply_source_diff_count": "source_of_truth_diff_count",
                "wazuh_runtime_apply_check_mode_plan_count": "check_mode_plan_count",
                "wazuh_runtime_apply_dry_run_required_count": "dry_run_required_count",
                "wazuh_runtime_apply_rollback_plan_count": "rollback_plan_count",
                "wazuh_runtime_apply_post_apply_verifier_count": "post_apply_verifier_count",
                "wazuh_runtime_apply_km_writeback_count": "km_playbook_writeback_count",
                "wazuh_runtime_apply_owner_review_ready_count": "owner_review_ready_count",
                "wazuh_runtime_apply_runtime_gate_count": "runtime_gate_count",
            },
        ),
        **_project_counts(
            summaries["wazuh_runtime_owner_review"],
            {
                "wazuh_runtime_owner_review_target_selector_count": "target_selector_count",
                "wazuh_runtime_owner_review_source_diff_count": "source_of_truth_diff_count",
                "wazuh_runtime_owner_review_check_mode_plan_count": "check_mode_plan_count",
                "wazuh_runtime_owner_review_dry_run_evidence_count": "dry_run_evidence_count",
                "wazuh_runtime_owner_review_rollback_plan_count": "rollback_plan_count",
                "wazuh_runtime_owner_review_post_apply_verifier_count": "post_apply_verifier_count",
                "wazuh_runtime_owner_review_km_writeback_count": "km_playbook_writeback_count",
                "wazuh_runtime_owner_review_packet_received_count": "owner_review_packet_received_count",
                "wazuh_runtime_owner_review_packet_review_ready_count": "owner_review_packet_review_ready_count",
                "wazuh_runtime_owner_review_packet_accepted_count": "owner_review_packet_accepted_count",
                "wazuh_runtime_owner_review_runtime_gate_count": "runtime_gate_count",
            },
        ),
        **_project_counts(
            summaries["wazuh_allowlisted_check_mode_dry_run"],
            {
                "wazuh_allowlisted_check_mode_dry_run_target_selector_count": "allowlisted_target_selector_count",
                "wazuh_allowlisted_check_mode_dry_run_check_mode_plan_count": "check_mode_plan_count",
                "wazuh_allowlisted_check_mode_dry_run_evidence_ref_count": "dry_run_evidence_ref_count",
                "wazuh_allowlisted_check_mode_dry_run_result_ref_count": "dry_run_result_ref_count",
                "wazuh_allowlisted_check_mode_dry_run_packet_received_count": "dry_run_packet_received_count",
                "wazuh_allowlisted_check_mode_dry_run_packet_accepted_count": "dry_run_packet_accepted_count",
                "wazuh_allowlisted_check_mode_dry_run_post_verifier_count": "post_dry_run_verifier_count",
                "wazuh_allowlisted_check_mode_dry_run_runtime_gate_count": "runtime_gate_count",
            },
        ),
        **_project_counts(
            soc,
            {
                "kali_active_scan_authorized_count": "kali_active_scan_authorized_count",
                "kali_execute_authorized_count": "kali_execute_authorized_count",
                "kali_finding_envelope_accepted_count": "kali_finding_envelope_accepted_count",
            },
        ),
        **_project_counts(
            alert,
            {
                "alert_formatter_contract_marker_count": "source_formatter_marker_count",
                "alert_receipt_runtime_send_count": "telegram_send_authorized_count",
            },
        ),
        **_project_counts(
            intrusion,
            {"intrusion_prevention_candidate_count": "urgent_prevention_candidate_count"},
        ),
        "runtime_gate_count": runtime_gate_count,
    }


def _readback_lanes(
    snapshots: dict[str, dict[str, Any]],
    summaries: dict[str, dict[str, Any]],
    live_wazuh: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build the ordered list of lane entries for the readback document."""
    wazuh = summaries["wazuh_coverage"]
    live_gate = summaries["wazuh_live_metadata_gate"]
    owner_evidence = summaries["wazuh_owner_evidence_preflight"]
    apply_preflight = summaries["wazuh_runtime_apply_preflight"]
    owner_review = summaries["wazuh_runtime_owner_review"]
    dry_run = summaries["wazuh_allowlisted_check_mode_dry_run"]
    soc = summaries["soc_control"]
    alert = summaries["alert_readability"]
    dispatch = summaries["owner_dispatch"]
    intrusion = summaries["intrusion_prevention"]

    # A missing or zero expected scope must not read as "accepted": without the
    # positive-scope requirement, 0 >= 0 would turn an empty summary green.
    expected_hosts = _int(wazuh.get("expected_host_scope_count"))
    registry_accepted = (
        expected_hosts > 0
        and _int(wazuh.get("manager_registry_accepted_count")) >= expected_hosts
    )
    live_metadata_gate_ready = all(
        _int(live_gate.get(key)) == 1
        for key in (
            "live_metadata_owner_response_accepted_count",
            "secret_source_metadata_accepted_count",
            "wazuh_manager_health_ref_accepted_count",
            "readonly_account_scope_accepted_count",
        )
    )
    apply_ready = bool(_int(apply_preflight.get("controlled_apply_preflight_ready_count")))
    review_accepted = bool(_int(owner_review.get("owner_review_packet_accepted_count")))
    dry_run_accepted = bool(_int(dry_run.get("dry_run_packet_accepted_count")))

    return [
        _lane(
            "wazuh_registry",
            (
                "manager_registry_readback_accepted_runtime_gate_closed"
                if registry_accepted
                else "blocked_waiting_manager_registry"
            ),
            35 if registry_accepted else 0,
            "steady" if registry_accepted else "locked",
            "管理器清單交叉驗收已讀回runtime gate 仍關閉",
            _project_counts(
                wazuh,
                {
                    "expected_hosts": "expected_host_scope_count",
                    "transport_observed": "manager_transport_established_connection_count",
                    "registry_accepted": "manager_registry_accepted_count",
                },
            ),
            ["docs/security/wazuh-managed-host-coverage-gate.snapshot.json"],
        ),
        _lane(
            "wazuh_live_route",
            live_wazuh["status"],
            0 if live_wazuh["degraded_count"] else 30,
            "steady" if live_wazuh["metadata_available_count"] else "warn",
            "先通過唯讀中繼資料負責人閘門與管理器清單交叉驗收",
            {
                "http_status": live_wazuh["http_status"],
                "readonly_enabled": live_wazuh["readonly_api_enabled_count"],
                "agent_total": live_wazuh["agent_total"],
                "metadata_available": live_wazuh["metadata_available_count"],
                "route_degraded": live_wazuh["degraded_count"],
            },
            ["GET /api/iwooos/wazuh"],
        ),
        _lane(
            "wazuh_live_metadata_gate",
            snapshots["wazuh_live_metadata_gate"].get(
                "status", "blocked_waiting_live_metadata_owner_response"
            ),
            40 if live_metadata_gate_ready else 0,
            "steady" if live_metadata_gate_ready else "locked",
            (
                "進入 server-side env enable controlled review 與 post-enable readbacklive query 仍未授權"
                if live_metadata_gate_ready
                else "補齊負責人回覆、機密中繼資料、管理節點健康、唯讀範圍與啟用後讀回"
            ),
            _project_counts(
                live_gate,
                {
                    "route_readback": "production_route_readback_passed_count",
                    "owner_accepted": "live_metadata_owner_response_accepted_count",
                    "secret_metadata_accepted": "secret_source_metadata_accepted_count",
                    "manager_health_accepted": "wazuh_manager_health_ref_accepted_count",
                    "readonly_scope_accepted": "readonly_account_scope_accepted_count",
                    "post_enable_readback": "post_enable_readback_passed_count",
                    "live_query_authorized": "wazuh_api_live_query_authorized_count",
                    "runtime_gate": "runtime_gate_count",
                },
            ),
            ["docs/security/wazuh-readonly-live-metadata-env-gate.snapshot.json"],
        ),
        _lane(
            "wazuh_owner_evidence_preflight",
            snapshots["wazuh_owner_evidence_preflight"].get(
                "status",
                "owner_evidence_preflight_ready_no_runtime_action",
            ),
            0,
            "locked",
            "補齊 manager registry 脫敏封包、逐主機矩陣、Dashboard API 狀態與負責人決策",
            _project_counts(
                owner_evidence,
                {
                    "required_fields": "required_field_count",
                    "reviewer_checks": "reviewer_check_count",
                    "outcome_lanes": "outcome_lane_count",
                    "forbidden_payloads": "forbidden_payload_count",
                    "owner_received": "owner_evidence_received_count",
                    "owner_accepted": "owner_evidence_accepted_count",
                    "registry_export_accepted": "registry_export_accepted_count",
                    "runtime_gate": "runtime_gate_count",
                },
            ),
            ["docs/security/wazuh-agent-visibility-owner-evidence-preflight.snapshot.json"],
        ),
        _lane(
            "wazuh_runtime_controlled_apply_preflight",
            snapshots["wazuh_runtime_apply_preflight"].get(
                "status",
                "controlled_apply_preflight_ready_no_runtime_action",
            ),
            45 if apply_ready else 0,
            "steady" if apply_ready else "locked",
            "執行前仍需 allowlisted check-mode、dry-run evidence 與 post-apply verifier readback",
            _project_counts(
                apply_preflight,
                {
                    "target_selectors": "target_selector_count",
                    "source_diff": "source_of_truth_diff_count",
                    "check_mode": "check_mode_plan_count",
                    "dry_run": "dry_run_required_count",
                    "rollback": "rollback_plan_count",
                    "post_apply_verifier": "post_apply_verifier_count",
                    "km_writeback": "km_playbook_writeback_count",
                    "runtime_gate": "runtime_gate_count",
                },
            ),
            ["docs/security/wazuh-runtime-controlled-apply-preflight.snapshot.json"],
        ),
        _lane(
            "wazuh_runtime_gate_owner_review",
            snapshots["wazuh_runtime_owner_review"].get(
                "status",
                "runtime_gate_owner_review_packet_committed_no_runtime_action",
            ),
            55 if review_accepted else 0,
            "steady" if review_accepted else "locked",
            "進入 allowlisted check-mode 與 dry-run readbackruntime gate 仍關閉",
            _project_counts(
                owner_review,
                {
                    "target_selectors": "target_selector_count",
                    "source_diff": "source_of_truth_diff_count",
                    "check_mode": "check_mode_plan_count",
                    "dry_run": "dry_run_evidence_count",
                    "rollback": "rollback_plan_count",
                    "post_apply_verifier": "post_apply_verifier_count",
                    "owner_review_accepted": "owner_review_packet_accepted_count",
                    "runtime_gate": "runtime_gate_count",
                },
            ),
            ["docs/security/wazuh-runtime-gate-owner-review-readback.snapshot.json"],
        ),
        _lane(
            "wazuh_allowlisted_check_mode_dry_run",
            snapshots["wazuh_allowlisted_check_mode_dry_run"].get(
                "status",
                "allowlisted_check_mode_dry_run_staged_no_runtime_action",
            ),
            65 if dry_run_accepted else 0,
            "steady" if dry_run_accepted else "locked",
            "進入 post-dry-run verifier readbackruntime gate 仍關閉",
            _project_counts(
                dry_run,
                {
                    "target_selectors": "allowlisted_target_selector_count",
                    "check_mode": "check_mode_plan_count",
                    "dry_run_evidence": "dry_run_evidence_ref_count",
                    "dry_run_result": "dry_run_result_ref_count",
                    "packet_accepted": "dry_run_packet_accepted_count",
                    "post_dry_run_verifier": "post_dry_run_verifier_count",
                    "runtime_gate": "runtime_gate_count",
                },
            ),
            ["docs/security/wazuh-allowlisted-check-mode-dry-run.snapshot.json"],
        ),
        _lane(
            "wazuh_dashboard_api",
            "degraded_api_connection_not_green",
            0,
            "warn",
            "儀表板 API、RBAC 與 TLS 修復後重新讀回",
            {
                **_project_counts(
                    wazuh,
                    {
                        "dashboard_api_degraded": "dashboard_api_degraded_observed_count",
                        "runtime_gate": "runtime_gate_count",
                    },
                ),
                "accepted_evidence": _accepted_evidence_count(snapshots["wazuh_runtime"]),
            },
            [
                "docs/security/wazuh-managed-host-coverage-gate.snapshot.json",
                "docs/security/wazuh-agent-visibility-runtime-gate.snapshot.json",
            ],
        ),
        _lane(
            "kali_intake",
            snapshots["kali_status"].get("status", "blocked_waiting_kali_scope"),
            0,
            "locked",
            "資安觀測範圍與 finding envelope 先被接受",
            _project_counts(
                soc,
                {
                    "active_scan_authorized": "kali_active_scan_authorized_count",
                    "execute_authorized": "kali_execute_authorized_count",
                    "finding_envelope_accepted": "kali_finding_envelope_accepted_count",
                },
            ),
            [
                "docs/security/kali-integration-status.snapshot.json",
                "docs/security/soc-siem-kali-wazuh-integration-control.snapshot.json",
            ],
        ),
        _lane(
            "alert_readability",
            "contract_ready_no_send_receipt",
            _alert_contract_percent(alert),
            "warn",
            "補齊告警路由 receipt 與實發驗證",
            _project_counts(
                alert,
                {
                    "formatter_markers": "source_formatter_marker_count",
                    "required_markers": "required_output_marker_count",
                    "telegram_send": "telegram_send_authorized_count",
                },
            ),
            ["docs/security/telegram-alert-readability-guard.snapshot.json"],
        ),
        _lane(
            "owner_dispatch",
            snapshots["owner_dispatch"].get(
                "status", "owner_request_draft_ready_not_dispatched"
            ),
            0,
            "locked",
            "正式負責人回覆封包送達與接受",
            _project_counts(
                dispatch,
                {
                    "request_drafts": "request_draft_count",
                    "request_sent": "request_sent_count",
                    "owner_accepted": "owner_response_accepted_count",
                },
            ),
            ["docs/security/monitoring-owner-request-draft.snapshot.json"],
        ),
        _lane(
            "intrusion_prevention",
            "candidate_only_no_runtime_containment",
            _int(intrusion.get("coverage_percent_after_prevention_control")),
            "warn",
            "補脫敏證據參照與維護窗口",
            _project_counts(
                intrusion,
                {
                    "urgent_candidates": "urgent_prevention_candidate_count",
                    "evidence_received": "evidence_ref_received_count",
                    "containment_accepted": "containment_decision_accepted_count",
                },
            ),
            ["docs/security/external-host-intrusion-prevention-control.snapshot.json"],
        ),
    ]
def _load_snapshot(directory: Path, key: str, filename: str) -> dict[str, Any]:
    """Read one snapshot file and verify its shape and schema version.

    Raises:
        FileNotFoundError: ``directory / filename`` is not an existing file.
        ValueError: The JSON document is not an object, or its
            ``schema_version`` differs from ``_EXPECTED_SCHEMAS[key]``.
    """
    snapshot_path = directory / filename
    if not snapshot_path.is_file():
        raise FileNotFoundError(f"{snapshot_path}: security snapshot not found")

    document = json.loads(snapshot_path.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError(f"{snapshot_path}: expected JSON object")

    wanted = _EXPECTED_SCHEMAS[key]
    if document.get("schema_version") == wanted:
        return document
    raise ValueError(f"{snapshot_path}: expected schema_version={wanted}")
def _summary(payload: dict[str, Any]) -> dict[str, Any]:
summary = payload.get("summary")
return summary if isinstance(summary, dict) else {}
def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _wazuh_live_summary(
    payload: dict[str, Any] | None, http_status: int
) -> dict[str, Any]:
    """Normalize the caller-supplied Wazuh route payload into flat fields.

    A non-dict payload yields the ``not_checked_by_snapshot_loader`` record
    (degraded, HTTP status 0, all counts 0). Otherwise the route counts as
    available only when its ``status`` is ``readonly_metadata_available``.
    """
    count_fields = (
        "readonly_api_enabled_count",
        "agent_total",
        "agent_active",
        "agent_registry_empty_count",
        "agent_below_expected_minimum_count",
    )
    if isinstance(payload, dict):
        raw_counts = payload.get("summary")
        counts_source = raw_counts if isinstance(raw_counts, dict) else {}
        label = str(payload.get("status") or "unknown")
        available = label == "readonly_metadata_available"
        status_code = http_status if isinstance(http_status, int) else 0
    else:
        counts_source = {}
        label = "not_checked_by_snapshot_loader"
        available = False
        status_code = 0

    record: dict[str, Any] = {
        "status": label,
        "http_status": status_code,
        "degraded_count": 0 if available else 1,
    }
    for field in count_fields:
        record[field] = _int(counts_source.get(field))
    record["metadata_available_count"] = 1 if available else 0
    return record
def _average_percent(*values: Any) -> int:
    """Return the rounded mean of the integer arguments; non-integers are skipped.

    Returns 0 when no argument is an integer.
    """
    usable = [_int(candidate) for candidate in values if isinstance(candidate, int)]
    if not usable:
        return 0
    return int(round(sum(usable) / len(usable)))
def _alert_contract_percent(summary: dict[str, Any]) -> int:
    """Return formatter markers as a percentage of required markers, capped at 100.

    The divisor is never allowed below 1, so a missing or zero
    ``required_output_marker_count`` cannot divide by zero.
    """
    present = _int(summary.get("source_formatter_marker_count"))
    required = _int(summary.get("required_output_marker_count"))
    if required < 1:
        required = 1
    percent = int(round((present / required) * 100))
    return percent if percent < 100 else 100
def _accepted_evidence_count(payload: dict[str, Any]) -> int:
evidence = payload.get("required_evidence_before_green")
if not isinstance(evidence, list):
return 0
return sum(
1
for item in evidence
if isinstance(item, dict) and item.get("accepted") is True
)
def _lane(
    lane_id: str,
    status_text: Any,
    completion_percent: int,
    tone: str,
    next_gate: str,
    metrics: dict[str, Any],
    source_refs: list[str],
) -> dict[str, Any]:
    """Assemble one lane entry; the status is stringified and metrics pass through ``_int``."""
    normalized_metrics: dict[str, int] = {}
    for metric_name, raw_value in metrics.items():
        normalized_metrics[metric_name] = _int(raw_value)
    return dict(
        lane_id=lane_id,
        status=str(status_text),
        completion_percent=completion_percent,
        tone=tone,
        next_gate=next_gate,
        metrics=normalized_metrics,
        source_refs=source_refs,
    )
def _max_summary_count(
    snapshots: dict[str, dict[str, Any]],
    *keys: str,
) -> int:
    """Return the largest value any snapshot summary holds under any of *keys*.

    Values go through ``_int`` first; the result is 0 when there is nothing
    to compare (no snapshots or no keys).
    """
    observed: list[int] = []
    for payload in snapshots.values():
        section = _summary(payload)
        observed.extend(_int(section.get(name)) for name in keys)
    return max(observed) if observed else 0
def _require_runtime_boundaries(snapshots: dict[str, dict[str, Any]]) -> None:
    """Refuse to build a readback from snapshots that open any runtime gate.

    For every snapshot, each listed summary counter that is present must be
    the integer 0, and each ``_FALSE_BOUNDARY_KEYS`` entry present in
    ``execution_boundaries`` must be exactly ``False``.

    The counter check is fail-closed: a present value that is not an integer
    (for example ``"1"`` or ``1.0``) is rejected rather than coerced to 0,
    so a malformed snapshot cannot slip past the guard.

    Raises:
        ValueError: A counter is non-zero or not an integer, or an execution
            boundary is not ``False``.
    """
    zero_only_keys = (
        "runtime_gate_count",
        "owner_response_accepted_count",
        "wazuh_active_response_enabled_count",
        "active_response_enabled_count",
        "active_scan_authorized_count",
        "kali_active_scan_authorized_count",
        "telegram_send_authorized_count",
        "host_write_authorized_count",
        "secret_value_collection_allowed_count",
        "wazuh_api_live_query_authorized_count",
        "wazuh_active_response_authorized_count",
        "active_response_authorized_count",
        "registry_export_accepted_count",
        "owner_evidence_accepted_count",
        "post_enable_readback_passed_count",
    )
    for name, payload in snapshots.items():
        summary = _summary(payload)
        for key in zero_only_keys:
            if key not in summary:
                continue
            value = summary[key]
            if not isinstance(value, int):
                # The offending value is deliberately not echoed in the message.
                raise ValueError(f"{name}: {key} must be an integer count")
            if value != 0:
                raise ValueError(f"{name}: {key} must remain 0")
        boundaries = payload.get("execution_boundaries")
        if isinstance(boundaries, dict):
            invalid = sorted(
                key
                for key in _FALSE_BOUNDARY_KEYS
                if key in boundaries and boundaries.get(key) is not False
            )
            if invalid:
                raise ValueError(
                    f"{name}: execution boundaries must remain false: {invalid}"
                )