Files
awoooi/apps/api/src/services/iwooos_runtime_security_readback.py
Your Name 10a925bab6
All checks were successful
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m45s
CD Pipeline / build-and-deploy (push) Successful in 5m8s
CD Pipeline / post-deploy-checks (push) Successful in 1m35s
feat(iwooos): expose Wazuh live metadata gate readback
2026-06-27 00:11:54 +08:00

436 lines
20 KiB
Python

"""
IwoooS runtime security readback.
Loads committed security snapshots and exposes a public-safe, read-only runtime
security board. This module never queries Wazuh, Kali, hosts, Docker, Nginx,
firewalls, databases, Telegram, or secrets.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_security_dir
# Directory the snapshots are read from when the caller passes no
# ``security_dir``; computed once at import time from this file's location.
# NOTE(review): the actual resolution rules live in ``snapshot_paths`` and are
# not visible here — confirm there before relying on a specific layout.
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))

# Snapshot key -> file name inside the security directory. Every entry is
# loaded by the readback loader, and the same file names are echoed back in the
# response as ``docs/security/<file name>`` source references.
# Each key must also have an entry in ``_EXPECTED_SCHEMAS``.
_SNAPSHOT_FILES = {
    "owner_gap": "s4-9-owner-response-gap-audit.snapshot.json",
    "wazuh_coverage": "wazuh-managed-host-coverage-gate.snapshot.json",
    "wazuh_runtime": "wazuh-agent-visibility-runtime-gate.snapshot.json",
    "wazuh_live_metadata_gate": "wazuh-readonly-live-metadata-env-gate.snapshot.json",
    "kali_status": "kali-integration-status.snapshot.json",
    "soc_control": "soc-siem-kali-wazuh-integration-control.snapshot.json",
    "alert_readability": "telegram-alert-readability-guard.snapshot.json",
    "owner_dispatch": "monitoring-owner-request-draft.snapshot.json",
    "intrusion_prevention": "external-host-intrusion-prevention-control.snapshot.json",
}
# Snapshot key -> the exact ``schema_version`` string that snapshot must carry.
# ``_load_snapshot`` looks the key up here and raises ``ValueError`` when the
# loaded file reports any other value, so the keys must mirror
# ``_SNAPSHOT_FILES``.
_EXPECTED_SCHEMAS = {
    "owner_gap": "s4_9_owner_response_gap_audit_v1",
    "wazuh_coverage": "wazuh_managed_host_coverage_gate_v1",
    "wazuh_runtime": "wazuh_agent_visibility_runtime_gate_v1",
    "wazuh_live_metadata_gate": "iwooos_wazuh_readonly_live_metadata_env_gate_v1",
    "kali_status": "kali_integration_status_v1",
    "soc_control": "soc_siem_kali_wazuh_integration_control_v1",
    "alert_readability": "telegram_alert_readability_guard_v1",
    "owner_dispatch": "monitoring_owner_request_draft_v1",
    "intrusion_prevention": "external_host_intrusion_prevention_control_v1",
}
# Flags inside a snapshot's ``execution_boundaries`` object that must be the
# literal ``False`` whenever they are present. ``_require_runtime_boundaries``
# raises ``ValueError`` for any listed key whose value is anything else;
# keys that are absent from a snapshot are not checked.
_FALSE_BOUNDARY_KEYS = {
    "active_scan_authorized",
    "alertmanager_reload_authorized",
    "auto_block_authorized",
    "credentialed_scan_authorized",
    "firewall_change_authorized",
    "host_write_authorized",
    "kali_execute_authorized",
    "kali_scan_authorized",
    "nginx_reload_authorized",
    "production_write_authorized",
    "runtime_execution_authorized",
    "runtime_gate_open",
    "secret_value_collection_allowed",
    "telegram_send_authorized",
    "wazuh_active_response_authorized",
    "wazuh_api_live_query_authorized",
}
def load_latest_iwooos_runtime_security_readback(
    security_dir: Path | None = None,
    wazuh_live_status: dict[str, Any] | None = None,
    wazuh_live_http_status: int = 0,
) -> dict[str, Any]:
    """Assemble the IwoooS runtime security readback from committed snapshots.

    Every file listed in ``_SNAPSHOT_FILES`` is loaded from ``security_dir``
    (or the module default when it is falsy), the loaded set is checked by
    ``_require_runtime_boundaries``, and the per-snapshot ``summary`` sections
    are folded into one response dict.

    Args:
        security_dir: Directory containing the snapshot files.
        wazuh_live_status: Optional payload describing the live Wazuh route,
            supplied by the caller; it is only normalized here, never fetched.
        wazuh_live_http_status: HTTP status that accompanied
            ``wazuh_live_status``.

    Returns:
        A dict with the keys ``schema_version``, ``status``, ``mode``,
        ``source_refs``, ``summary``, ``lanes``, ``boundaries`` and
        ``no_false_green_rules``. ``status``, ``mode``, ``boundaries`` and the
        rule list are fixed literals.

    Raises:
        FileNotFoundError: A snapshot file is missing.
        ValueError: A snapshot is malformed, has the wrong schema version, or
            violates the runtime boundaries.
    """
    base_dir = security_dir or _DEFAULT_SECURITY_DIR
    snapshots: dict[str, dict[str, Any]] = {}
    for snapshot_key, snapshot_name in _SNAPSHOT_FILES.items():
        snapshots[snapshot_key] = _load_snapshot(base_dir, snapshot_key, snapshot_name)
    _require_runtime_boundaries(snapshots)

    gap = _summary(snapshots["owner_gap"])
    coverage = _summary(snapshots["wazuh_coverage"])
    gate = _summary(snapshots["wazuh_live_metadata_gate"])
    soc = _summary(snapshots["soc_control"])
    alerts = _summary(snapshots["alert_readability"])
    dispatch = _summary(snapshots["owner_dispatch"])
    intrusion = _summary(snapshots["intrusion_prevention"])
    live = _wazuh_live_summary(wazuh_live_status, wazuh_live_http_status)

    source_refs = ["docs/security/" + name for name in _SNAPSHOT_FILES.values()]
    runtime_gate_count = _max_summary_count(
        snapshots,
        "runtime_gate_count",
        "active_response_authorized_count",
        "kali_active_scan_authorized_count",
        "telegram_send_authorized_count",
    )
    alert_percent = _alert_contract_percent(alerts)

    def count(section: dict[str, Any], field: str) -> int:
        # Integer value of one summary field; missing or non-integer -> 0.
        return _int(section.get(field))

    summary_section = {
        "source_snapshot_count": len(source_refs),
        "p0_lane_count": 8,
        "control_plane_visibility_percent": _average_percent(
            soc.get("coverage_percent_after_soc_integration_control"),
            intrusion.get("coverage_percent_after_prevention_control"),
            alert_percent,
        ),
        "actual_runtime_acceptance_percent": 0,
        "owner_response_received_count": count(gap, "owner_response_received_count"),
        "owner_response_accepted_count": count(gap, "owner_response_accepted_count"),
        "redacted_evidence_refs_received_count": 0,
        "request_sent_count": count(dispatch, "request_sent_count"),
        "wazuh_expected_host_scope_count": count(coverage, "expected_host_scope_count"),
        "wazuh_manager_registry_accepted_count": count(coverage, "manager_registry_accepted_count"),
        "wazuh_transport_observed_count": count(
            coverage, "manager_transport_established_connection_count"
        ),
        "wazuh_dashboard_api_degraded_observed_count": count(
            coverage, "dashboard_api_degraded_observed_count"
        ),
        "wazuh_live_route_http_status": live["http_status"],
        "wazuh_live_route_degraded_count": live["degraded_count"],
        "wazuh_live_readonly_api_enabled_count": live["readonly_api_enabled_count"],
        "wazuh_live_agent_total": live["agent_total"],
        "wazuh_live_agent_active": live["agent_active"],
        "wazuh_live_registry_empty_count": live["agent_registry_empty_count"],
        "wazuh_live_below_expected_count": live["agent_below_expected_minimum_count"],
        "wazuh_live_metadata_available_count": live["metadata_available_count"],
        "wazuh_live_status": live["status"],
        "wazuh_live_metadata_gate_owner_accepted_count": count(
            gate, "live_metadata_owner_response_accepted_count"
        ),
        "wazuh_live_metadata_gate_secret_source_accepted_count": count(
            gate, "secret_source_metadata_accepted_count"
        ),
        "wazuh_live_metadata_gate_manager_health_accepted_count": count(
            gate, "wazuh_manager_health_ref_accepted_count"
        ),
        "wazuh_live_metadata_gate_readonly_scope_accepted_count": count(
            gate, "readonly_account_scope_accepted_count"
        ),
        "wazuh_live_metadata_gate_post_enable_readback_count": count(
            gate, "post_enable_readback_passed_count"
        ),
        "wazuh_live_metadata_gate_live_query_authorized_count": count(
            gate, "wazuh_api_live_query_authorized_count"
        ),
        "kali_active_scan_authorized_count": count(soc, "kali_active_scan_authorized_count"),
        "kali_execute_authorized_count": count(soc, "kali_execute_authorized_count"),
        "kali_finding_envelope_accepted_count": count(soc, "kali_finding_envelope_accepted_count"),
        "alert_formatter_contract_marker_count": count(alerts, "source_formatter_marker_count"),
        "alert_receipt_runtime_send_count": count(alerts, "telegram_send_authorized_count"),
        "intrusion_prevention_candidate_count": count(intrusion, "urgent_prevention_candidate_count"),
        "runtime_gate_count": runtime_gate_count,
    }

    lanes = [
        _lane(
            lane_id="wazuh_registry",
            status_text="blocked_waiting_manager_registry",
            completion_percent=0,
            tone="locked",
            next_gate="管理器清單交叉驗收",
            metrics={
                "expected_hosts": coverage.get("expected_host_scope_count", 0),
                "transport_observed": coverage.get("manager_transport_established_connection_count", 0),
                "registry_accepted": coverage.get("manager_registry_accepted_count", 0),
            },
            source_refs=["docs/security/wazuh-managed-host-coverage-gate.snapshot.json"],
        ),
        _lane(
            lane_id="wazuh_live_route",
            status_text=live["status"],
            # A degraded (or unchecked) live route earns no completion credit.
            completion_percent=0 if live["degraded_count"] else 30,
            tone="steady" if live["metadata_available_count"] else "warn",
            next_gate="先通過唯讀中繼資料負責人閘門與管理器清單交叉驗收",
            metrics={
                "http_status": live["http_status"],
                "readonly_enabled": live["readonly_api_enabled_count"],
                "agent_total": live["agent_total"],
                "metadata_available": live["metadata_available_count"],
                "route_degraded": live["degraded_count"],
            },
            source_refs=["GET /api/iwooos/wazuh"],
        ),
        _lane(
            lane_id="wazuh_live_metadata_gate",
            status_text=snapshots["wazuh_live_metadata_gate"].get(
                "status", "blocked_waiting_live_metadata_owner_response"
            ),
            completion_percent=0,
            tone="locked",
            next_gate="補齊負責人回覆、機密中繼資料、管理節點健康、唯讀範圍與啟用後讀回",
            metrics={
                "route_readback": gate.get("production_route_readback_passed_count", 0),
                "owner_accepted": gate.get("live_metadata_owner_response_accepted_count", 0),
                "secret_metadata_accepted": gate.get("secret_source_metadata_accepted_count", 0),
                "manager_health_accepted": gate.get("wazuh_manager_health_ref_accepted_count", 0),
                "readonly_scope_accepted": gate.get("readonly_account_scope_accepted_count", 0),
                "post_enable_readback": gate.get("post_enable_readback_passed_count", 0),
                "live_query_authorized": gate.get("wazuh_api_live_query_authorized_count", 0),
                "runtime_gate": gate.get("runtime_gate_count", 0),
            },
            source_refs=["docs/security/wazuh-readonly-live-metadata-env-gate.snapshot.json"],
        ),
        _lane(
            lane_id="wazuh_dashboard_api",
            status_text="degraded_api_connection_not_green",
            completion_percent=0,
            tone="warn",
            next_gate="儀表板 API、RBAC 與 TLS 修復後重新讀回",
            metrics={
                "dashboard_api_degraded": coverage.get("dashboard_api_degraded_observed_count", 0),
                "runtime_gate": coverage.get("runtime_gate_count", 0),
                "accepted_evidence": _accepted_evidence_count(snapshots["wazuh_runtime"]),
            },
            source_refs=[
                "docs/security/wazuh-managed-host-coverage-gate.snapshot.json",
                "docs/security/wazuh-agent-visibility-runtime-gate.snapshot.json",
            ],
        ),
        _lane(
            lane_id="kali_intake",
            status_text=snapshots["kali_status"].get("status", "blocked_waiting_kali_scope"),
            completion_percent=0,
            tone="locked",
            next_gate="資安觀測範圍與 finding envelope 先被接受",
            metrics={
                "active_scan_authorized": soc.get("kali_active_scan_authorized_count", 0),
                "execute_authorized": soc.get("kali_execute_authorized_count", 0),
                "finding_envelope_accepted": soc.get("kali_finding_envelope_accepted_count", 0),
            },
            source_refs=[
                "docs/security/kali-integration-status.snapshot.json",
                "docs/security/soc-siem-kali-wazuh-integration-control.snapshot.json",
            ],
        ),
        _lane(
            lane_id="alert_readability",
            status_text="contract_ready_no_send_receipt",
            completion_percent=alert_percent,
            tone="warn",
            next_gate="補齊告警路由 receipt 與實發驗證",
            metrics={
                "formatter_markers": alerts.get("source_formatter_marker_count", 0),
                "required_markers": alerts.get("required_output_marker_count", 0),
                "telegram_send": alerts.get("telegram_send_authorized_count", 0),
            },
            source_refs=["docs/security/telegram-alert-readability-guard.snapshot.json"],
        ),
        _lane(
            lane_id="owner_dispatch",
            status_text=snapshots["owner_dispatch"].get(
                "status", "owner_request_draft_ready_not_dispatched"
            ),
            completion_percent=0,
            tone="locked",
            next_gate="正式負責人回覆封包送達與接受",
            metrics={
                "request_drafts": dispatch.get("request_draft_count", 0),
                "request_sent": dispatch.get("request_sent_count", 0),
                "owner_accepted": dispatch.get("owner_response_accepted_count", 0),
            },
            source_refs=["docs/security/monitoring-owner-request-draft.snapshot.json"],
        ),
        _lane(
            lane_id="intrusion_prevention",
            status_text="candidate_only_no_runtime_containment",
            completion_percent=count(intrusion, "coverage_percent_after_prevention_control"),
            tone="warn",
            next_gate="補脫敏證據參照與維護窗口",
            metrics={
                "urgent_candidates": intrusion.get("urgent_prevention_candidate_count", 0),
                "evidence_received": intrusion.get("evidence_ref_received_count", 0),
                "containment_accepted": intrusion.get("containment_decision_accepted_count", 0),
            },
            source_refs=["docs/security/external-host-intrusion-prevention-control.snapshot.json"],
        ),
    ]

    # Fixed literals: this readback never grants any runtime capability.
    boundaries = {
        "active_response_authorized": False,
        "active_scan_authorized": False,
        "action_buttons_allowed": False,
        "host_write_authorized": False,
        "kali_execute_authorized": False,
        "nginx_reload_authorized": False,
        "raw_payload_storage_allowed": False,
        "runtime_execution_authorized": False,
        "secret_value_collection_allowed": False,
        "telegram_send_authorized": False,
        "wazuh_active_response_authorized": False,
        "wazuh_api_live_query_authorized": False,
        "workflow_modification_authorized": False,
        "not_authorization": True,
    }
    no_false_green_rules = [
        "儀表板路由 200 不代表 Wazuh 清單已恢復",
        "傳輸連線數不代表所有主機都已納管",
        "前台可見不代表執行期已授權",
        "負責人送件草案不代表負責人已接受",
        "資安觀測節點健康不代表主動掃描已授權",
        "告警格式合約不代表通知已實發或已取得 receipt",
        "Wazuh 正式只讀路由 disabled 或退化時仍是 P0 紅燈",
        "Wazuh 即時中繼資料必須先通過負責人、機密中繼資料、唯讀範圍與啟用後讀回",
    ]

    return {
        "schema_version": "iwooos_runtime_security_readback_v1",
        "status": "blocked_waiting_owner_evidence_and_runtime_gates",
        "mode": "committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
        "source_refs": source_refs,
        "summary": summary_section,
        "lanes": lanes,
        "boundaries": boundaries,
        "no_false_green_rules": no_false_green_rules,
    }
def _load_snapshot(directory: Path, key: str, filename: str) -> dict[str, Any]:
    """Read one snapshot file and validate its top-level shape.

    Args:
        directory: Directory that contains the snapshot file.
        key: Snapshot key used to look up the expected schema version in
            ``_EXPECTED_SCHEMAS``.
        filename: File name of the snapshot inside ``directory``.

    Returns:
        The parsed JSON object.

    Raises:
        FileNotFoundError: ``directory / filename`` is not a regular file.
        json.JSONDecodeError: The file is not valid JSON. The message is
            prefixed with the file path so the failing snapshot can be
            identified.
        ValueError: The top-level JSON value is not an object, or its
            ``schema_version`` differs from the expected one.
        KeyError: ``key`` has no entry in ``_EXPECTED_SCHEMAS``.
    """
    path = directory / filename
    if not path.is_file():
        raise FileNotFoundError(f"{path}: security snapshot not found")
    with path.open(encoding="utf-8") as handle:
        try:
            payload = json.load(handle)
        except json.JSONDecodeError as err:
            # Re-raise as the same exception type so existing handlers keep
            # working, but name the file like every other error raised here.
            raise json.JSONDecodeError(f"{path}: {err.msg}", err.doc, err.pos) from err
    if not isinstance(payload, dict):
        raise ValueError(f"{path}: expected JSON object")
    expected_schema = _EXPECTED_SCHEMAS[key]
    if payload.get("schema_version") != expected_schema:
        raise ValueError(f"{path}: expected schema_version={expected_schema}")
    return payload
def _summary(payload: dict[str, Any]) -> dict[str, Any]:
    """Return the payload's ``summary`` mapping, or ``{}`` if it is absent or not a dict."""
    candidate = payload.get("summary")
    if not isinstance(candidate, dict):
        return {}
    return candidate
def _int(value: Any) -> int:
    """Pass integers through unchanged and map every other value to 0.

    ``bool`` is a subclass of ``int``, so ``True``/``False`` are returned as
    they are; strings, floats, ``None`` and containers all become 0.
    """
    if isinstance(value, int):
        return value
    return 0
def _wazuh_live_summary(payload: dict[str, Any] | None, http_status: int) -> dict[str, Any]:
    """Normalize the caller-supplied live Wazuh route payload into flat counters.

    When ``payload`` is not a dict the route is reported as
    ``not_checked_by_snapshot_loader``: degraded, HTTP status 0 and all counts
    0, regardless of ``http_status``. Otherwise the route counts as available
    only when the payload's ``status`` equals ``readonly_metadata_available``;
    every other status (or a missing one, reported as ``unknown``) is degraded.
    """
    count_fields = (
        "readonly_api_enabled_count",
        "agent_total",
        "agent_active",
        "agent_registry_empty_count",
        "agent_below_expected_minimum_count",
    )

    if not isinstance(payload, dict):
        unchecked: dict[str, Any] = {
            "status": "not_checked_by_snapshot_loader",
            "http_status": 0,
            "degraded_count": 1,
        }
        unchecked.update(dict.fromkeys(count_fields, 0))
        unchecked["metadata_available_count"] = 0
        return unchecked

    raw_counts = payload.get("summary")
    counts = raw_counts if isinstance(raw_counts, dict) else {}
    state = str(payload.get("status") or "unknown")
    available = 1 if state == "readonly_metadata_available" else 0

    normalized: dict[str, Any] = {
        "status": state,
        "http_status": http_status if isinstance(http_status, int) else 0,
        # Degraded is the exact complement of "metadata available".
        "degraded_count": 1 - available,
    }
    for field in count_fields:
        normalized[field] = _int(counts.get(field))
    normalized["metadata_available_count"] = available
    return normalized
def _average_percent(*values: Any) -> int:
    """Return the rounded mean of the integer arguments, ignoring all others.

    Non-integer arguments (including ``None``) are skipped rather than counted
    as 0; when no integer is supplied the result is 0.
    """
    # Anything that passes the isinstance filter is already an int, so it is
    # used as-is.
    usable = [candidate for candidate in values if isinstance(candidate, int)]
    if not usable:
        return 0
    return int(round(sum(usable) / len(usable)))
def _alert_contract_percent(summary: dict[str, Any]) -> int:
    """Return formatter markers as a rounded percentage of required markers.

    Reads ``source_formatter_marker_count`` and
    ``required_output_marker_count`` from ``summary``. The divisor is never
    allowed below 1 and the result is capped at 100.
    """
    present = _int(summary.get("source_formatter_marker_count"))
    required = _int(summary.get("required_output_marker_count"))
    if required < 1:
        # Guard against division by zero when no markers are required.
        required = 1
    percent = int(round(present / required * 100))
    return percent if percent < 100 else 100
def _accepted_evidence_count(payload: dict[str, Any]) -> int:
    """Count ``required_evidence_before_green`` entries flagged as accepted.

    Only dict entries whose ``accepted`` value is the literal ``True`` are
    counted; truthy stand-ins such as ``1`` are not. A missing or non-list
    field yields 0.
    """
    entries = payload.get("required_evidence_before_green")
    if isinstance(entries, list):
        accepted = [
            entry
            for entry in entries
            if isinstance(entry, dict) and entry.get("accepted") is True
        ]
        return len(accepted)
    return 0
def _lane(
    lane_id: str,
    status_text: Any,
    completion_percent: int,
    tone: str,
    next_gate: str,
    metrics: dict[str, Any],
    source_refs: list[str],
) -> dict[str, Any]:
    """Build one lane entry for the readback board.

    ``status_text`` is stringified and every metric value is passed through
    ``_int``, so non-integer metrics are reported as 0. All other arguments
    are stored as given.
    """
    status = str(status_text)
    normalized_metrics: dict[str, int] = {}
    for metric_name, metric_value in metrics.items():
        normalized_metrics[metric_name] = _int(metric_value)
    return {
        "lane_id": lane_id,
        "status": status,
        "completion_percent": completion_percent,
        "tone": tone,
        "next_gate": next_gate,
        "metrics": normalized_metrics,
        "source_refs": source_refs,
    }
def _max_summary_count(
    snapshots: dict[str, dict[str, Any]],
    *keys: str,
) -> int:
    """Return the largest value found under any of ``keys`` in any snapshot summary.

    Each value is read through ``_int``, so missing or non-integer fields
    count as 0. With no snapshots or no keys the result is 0.
    """
    highest = None
    for payload in snapshots.values():
        section = _summary(payload)
        for key in keys:
            observed = _int(section.get(key))
            if highest is None or observed > highest:
                highest = observed
    return 0 if highest is None else highest
def _require_runtime_boundaries(snapshots: dict[str, dict[str, Any]]) -> None:
    """Fail closed unless every snapshot still reports all runtime gates as shut.

    For each snapshot two things are checked:

    * every guarded counter in its ``summary`` is either absent/``None`` or
      numerically zero;
    * every key of ``_FALSE_BOUNDARY_KEYS`` present in its
      ``execution_boundaries`` object is the literal ``False``.

    Args:
        snapshots: Loaded snapshot payloads keyed by snapshot name.

    Raises:
        ValueError: A guarded counter is non-zero or not a number, or a guarded
            execution boundary is anything other than ``False``.
    """
    guarded_count_keys = (
        "runtime_gate_count",
        "owner_response_accepted_count",
        "wazuh_active_response_enabled_count",
        "active_response_enabled_count",
        "active_scan_authorized_count",
        "kali_active_scan_authorized_count",
        "telegram_send_authorized_count",
        "host_write_authorized_count",
        "secret_value_collection_allowed_count",
        "wazuh_api_live_query_authorized_count",
        "wazuh_active_response_authorized_count",
        "post_enable_readback_passed_count",
        # The readback treats this counter as a runtime-gate signal and
        # hard-codes ``active_response_authorized`` to False, so it has to be
        # guarded like the others.
        "active_response_authorized_count",
        # Surfaced in the readback summary while ``kali_execute_authorized``
        # is reported as False; a non-zero count would contradict that.
        "kali_execute_authorized_count",
    )
    for name, payload in snapshots.items():
        summary = _summary(payload)
        for key in guarded_count_keys:
            value = summary.get(key)
            if value is None:
                # Absent (or null) counters are treated as "not asserted".
                continue
            # Judge the raw value, not a coerced one: a float such as 1.0 or a
            # string such as "1" must not be silently read as 0.
            if not isinstance(value, (int, float)) or value != 0:
                raise ValueError(f"{name}: {key} must remain 0")
        boundaries = payload.get("execution_boundaries")
        if isinstance(boundaries, dict):
            invalid = sorted(
                key for key in _FALSE_BOUNDARY_KEYS if key in boundaries and boundaries.get(key) is not False
            )
            if invalid:
                raise ValueError(f"{name}: execution boundaries must remain false: {invalid}")