Compare commits

...

3 Commits

Author SHA1 Message Date
ogt
96ef71736d Merge remote-tracking branch 'gitea/main' into codex/delivery-workbench-release-20260626-ffsync 2026-06-27 00:13:56 +08:00
Your Name
10a925bab6 feat(iwooos): expose Wazuh live metadata gate readback
All checks were successful
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m45s
CD Pipeline / build-and-deploy (push) Successful in 5m8s
CD Pipeline / post-deploy-checks (push) Successful in 1m35s
2026-06-27 00:11:54 +08:00
AWOOOI CD
bfecd87c04 chore(cd): deploy b7045a4 [skip ci] 2026-06-26 16:10:52 +00:00
10 changed files with 717 additions and 37 deletions

View File

@@ -23,6 +23,9 @@ from src.services.iwooos_security_control_coverage import (
from src.services.iwooos_wazuh_readonly_status import (
load_iwooos_wazuh_readonly_status,
)
from src.services.iwooos_wazuh_live_metadata_gate import (
load_latest_iwooos_wazuh_live_metadata_gate,
)
from src.services.public_redaction import redact_public_lan_topology
@@ -44,6 +47,38 @@ async def get_iwooos_wazuh_readonly_status_v1() -> JSONResponse:
return await _wazuh_readonly_status()
@router.get(
"/api/v1/iwooos/wazuh-live-metadata-gate",
response_model=dict[str, Any],
summary="取得 Wazuh 即時中繼資料負責人閘門讀回",
description=(
"讀取已提交的 Wazuh 即時中繼資料負責人閘門,並附上 Wazuh 正式只讀路由的"
"公開安全彙總。此端點不讀機密明文、不查主機、不保存原始 Wazuh 載荷、"
"不啟用主動回應、不改 K8s / ArgoCD / Docker / Nginx / firewall。"
),
)
async def get_iwooos_wazuh_live_metadata_gate() -> dict[str, Any]:
"""回傳 Wazuh 即時中繼資料啟用前負責人閘門只讀狀態。"""
try:
wazuh_result = await load_iwooos_wazuh_readonly_status()
payload = await asyncio.to_thread(
load_latest_iwooos_wazuh_live_metadata_gate,
wazuh_live_status=wazuh_result.payload,
wazuh_live_http_status=wazuh_result.http_status,
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"IwoooS Wazuh 即時中繼資料閘門無效:{exc}",
) from exc
@router.get(
"/api/v1/iwooos/runtime-security-readback",
response_model=dict[str, Any],

View File

@@ -20,6 +20,7 @@ _SNAPSHOT_FILES = {
"owner_gap": "s4-9-owner-response-gap-audit.snapshot.json",
"wazuh_coverage": "wazuh-managed-host-coverage-gate.snapshot.json",
"wazuh_runtime": "wazuh-agent-visibility-runtime-gate.snapshot.json",
"wazuh_live_metadata_gate": "wazuh-readonly-live-metadata-env-gate.snapshot.json",
"kali_status": "kali-integration-status.snapshot.json",
"soc_control": "soc-siem-kali-wazuh-integration-control.snapshot.json",
"alert_readability": "telegram-alert-readability-guard.snapshot.json",
@@ -31,6 +32,7 @@ _EXPECTED_SCHEMAS = {
"owner_gap": "s4_9_owner_response_gap_audit_v1",
"wazuh_coverage": "wazuh_managed_host_coverage_gate_v1",
"wazuh_runtime": "wazuh_agent_visibility_runtime_gate_v1",
"wazuh_live_metadata_gate": "iwooos_wazuh_readonly_live_metadata_env_gate_v1",
"kali_status": "kali_integration_status_v1",
"soc_control": "soc_siem_kali_wazuh_integration_control_v1",
"alert_readability": "telegram_alert_readability_guard_v1",
@@ -70,6 +72,7 @@ def load_latest_iwooos_runtime_security_readback(
owner_gap_summary = _summary(snapshots["owner_gap"])
wazuh_summary = _summary(snapshots["wazuh_coverage"])
live_metadata_gate_summary = _summary(snapshots["wazuh_live_metadata_gate"])
soc_summary = _summary(snapshots["soc_control"])
alert_summary = _summary(snapshots["alert_readability"])
dispatch_summary = _summary(snapshots["owner_dispatch"])
@@ -92,7 +95,7 @@ def load_latest_iwooos_runtime_security_readback(
"source_refs": source_refs,
"summary": {
"source_snapshot_count": len(source_refs),
"p0_lane_count": 7,
"p0_lane_count": 8,
"control_plane_visibility_percent": _average_percent(
soc_summary.get("coverage_percent_after_soc_integration_control"),
intrusion_summary.get("coverage_percent_after_prevention_control"),
@@ -118,6 +121,24 @@ def load_latest_iwooos_runtime_security_readback(
"wazuh_live_below_expected_count": live_wazuh["agent_below_expected_minimum_count"],
"wazuh_live_metadata_available_count": live_wazuh["metadata_available_count"],
"wazuh_live_status": live_wazuh["status"],
"wazuh_live_metadata_gate_owner_accepted_count": _int(
live_metadata_gate_summary.get("live_metadata_owner_response_accepted_count")
),
"wazuh_live_metadata_gate_secret_source_accepted_count": _int(
live_metadata_gate_summary.get("secret_source_metadata_accepted_count")
),
"wazuh_live_metadata_gate_manager_health_accepted_count": _int(
live_metadata_gate_summary.get("wazuh_manager_health_ref_accepted_count")
),
"wazuh_live_metadata_gate_readonly_scope_accepted_count": _int(
live_metadata_gate_summary.get("readonly_account_scope_accepted_count")
),
"wazuh_live_metadata_gate_post_enable_readback_count": _int(
live_metadata_gate_summary.get("post_enable_readback_passed_count")
),
"wazuh_live_metadata_gate_live_query_authorized_count": _int(
live_metadata_gate_summary.get("wazuh_api_live_query_authorized_count")
),
"kali_active_scan_authorized_count": _int(soc_summary.get("kali_active_scan_authorized_count")),
"kali_execute_authorized_count": _int(soc_summary.get("kali_execute_authorized_count")),
"kali_finding_envelope_accepted_count": _int(soc_summary.get("kali_finding_envelope_accepted_count")),
@@ -132,7 +153,7 @@ def load_latest_iwooos_runtime_security_readback(
"blocked_waiting_manager_registry",
0,
"locked",
"manager_registry_cross_check",
"管理器清單交叉驗收",
{
"expected_hosts": wazuh_summary.get("expected_host_scope_count", 0),
"transport_observed": wazuh_summary.get("manager_transport_established_connection_count", 0),
@@ -145,7 +166,7 @@ def load_latest_iwooos_runtime_security_readback(
live_wazuh["status"],
0 if live_wazuh["degraded_count"] else 30,
"steady" if live_wazuh["metadata_available_count"] else "warn",
"enable_readonly_metadata_owner_gate_and_manager_registry_cross_check",
"先通過唯讀中繼資料負責人閘門與管理器清單交叉驗收",
{
"http_status": live_wazuh["http_status"],
"readonly_enabled": live_wazuh["readonly_api_enabled_count"],
@@ -155,12 +176,42 @@ def load_latest_iwooos_runtime_security_readback(
},
["GET /api/iwooos/wazuh"],
),
_lane(
"wazuh_live_metadata_gate",
snapshots["wazuh_live_metadata_gate"].get("status", "blocked_waiting_live_metadata_owner_response"),
0,
"locked",
"補齊負責人回覆、機密中繼資料、管理節點健康、唯讀範圍與啟用後讀回",
{
"route_readback": live_metadata_gate_summary.get("production_route_readback_passed_count", 0),
"owner_accepted": live_metadata_gate_summary.get("live_metadata_owner_response_accepted_count", 0),
"secret_metadata_accepted": live_metadata_gate_summary.get(
"secret_source_metadata_accepted_count",
0,
),
"manager_health_accepted": live_metadata_gate_summary.get(
"wazuh_manager_health_ref_accepted_count",
0,
),
"readonly_scope_accepted": live_metadata_gate_summary.get(
"readonly_account_scope_accepted_count",
0,
),
"post_enable_readback": live_metadata_gate_summary.get("post_enable_readback_passed_count", 0),
"live_query_authorized": live_metadata_gate_summary.get(
"wazuh_api_live_query_authorized_count",
0,
),
"runtime_gate": live_metadata_gate_summary.get("runtime_gate_count", 0),
},
["docs/security/wazuh-readonly-live-metadata-env-gate.snapshot.json"],
),
_lane(
"wazuh_dashboard_api",
"degraded_api_connection_not_green",
0,
"warn",
"dashboard_api_rbac_tls_repair_readback",
"儀表板 API、RBAC 與 TLS 修復後重新讀回",
{
"dashboard_api_degraded": wazuh_summary.get("dashboard_api_degraded_observed_count", 0),
"runtime_gate": wazuh_summary.get("runtime_gate_count", 0),
@@ -176,7 +227,7 @@ def load_latest_iwooos_runtime_security_readback(
snapshots["kali_status"].get("status", "blocked_waiting_kali_scope"),
0,
"locked",
"kali_scope_and_finding_envelope_accepted",
"資安觀測範圍與 finding envelope 先被接受",
{
"active_scan_authorized": soc_summary.get("kali_active_scan_authorized_count", 0),
"execute_authorized": soc_summary.get("kali_execute_authorized_count", 0),
@@ -192,7 +243,7 @@ def load_latest_iwooos_runtime_security_readback(
"contract_ready_no_send_receipt",
_alert_contract_percent(alert_summary),
"warn",
"alert_route_receipt_available",
"補齊告警路由 receipt 與實發驗證",
{
"formatter_markers": alert_summary.get("source_formatter_marker_count", 0),
"required_markers": alert_summary.get("required_output_marker_count", 0),
@@ -205,7 +256,7 @@ def load_latest_iwooos_runtime_security_readback(
snapshots["owner_dispatch"].get("status", "owner_request_draft_ready_not_dispatched"),
0,
"locked",
"owner_response_packet_delivery",
"正式負責人回覆封包送達與接受",
{
"request_drafts": dispatch_summary.get("request_draft_count", 0),
"request_sent": dispatch_summary.get("request_sent_count", 0),
@@ -218,7 +269,7 @@ def load_latest_iwooos_runtime_security_readback(
"candidate_only_no_runtime_containment",
_int(intrusion_summary.get("coverage_percent_after_prevention_control")),
"warn",
"redacted_evidence_refs_and_maintenance_window",
"補脫敏證據參照與維護窗口",
{
"urgent_candidates": intrusion_summary.get("urgent_prevention_candidate_count", 0),
"evidence_received": intrusion_summary.get("evidence_ref_received_count", 0),
@@ -244,13 +295,14 @@ def load_latest_iwooos_runtime_security_readback(
"not_authorization": True,
},
"no_false_green_rules": [
"dashboard_route_200_is_not_wazuh_registry_recovery",
"transport_count_is_not_full_host_management",
"ui_visible_is_not_runtime_authorization",
"owner_request_draft_is_not_owner_acceptance",
"kali_health_is_not_active_scan_authorization",
"alert_format_contract_is_not_telegram_send_receipt",
"wazuh_live_route_disabled_or_degraded_is_p0_not_green",
"儀表板路由 200 不代表 Wazuh 清單已恢復",
"傳輸連線數不代表所有主機都已納管",
"前台可見不代表執行期已授權",
"負責人送件草案不代表負責人已接受",
"資安觀測節點健康不代表主動掃描已授權",
"告警格式合約不代表通知已實發或已取得 receipt",
"Wazuh 正式只讀路由 disabled 或退化時仍是 P0 紅燈",
"Wazuh 即時中繼資料必須先通過負責人、機密中繼資料、唯讀範圍與啟用後讀回",
],
}
@@ -367,6 +419,9 @@ def _require_runtime_boundaries(snapshots: dict[str, dict[str, Any]]) -> None:
"telegram_send_authorized_count",
"host_write_authorized_count",
"secret_value_collection_allowed_count",
"wazuh_api_live_query_authorized_count",
"wazuh_active_response_authorized_count",
"post_enable_readback_passed_count",
):
if key in summary and _int(summary.get(key)) != 0:
raise ValueError(f"{name}: {key} must remain 0")

View File

@@ -0,0 +1,280 @@
"""
IwoooS Wazuh live metadata owner gate readback.
This module only exposes committed gate metadata plus public-safe Wazuh route
aggregate status. It never reads secret values, never queries hosts, and never
authorizes Wazuh active response, scans, restarts, reloads, or host writes.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_security_dir
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
_SNAPSHOT_FILE = "wazuh-readonly-live-metadata-env-gate.snapshot.json"
_EXPECTED_SCHEMA = "iwooos_wazuh_readonly_live_metadata_env_gate_v1"
_REQUIRED_FALSE_BOUNDARIES = {
"argocd_sync_authorized",
"docker_restart_authorized",
"firewall_change_authorized",
"host_write_authorized",
"k8s_secret_patch_authorized",
"kali_active_scan_authorized",
"nginx_gateway_workaround_authorized",
"production_deploy_authorized",
"raw_wazuh_payload_storage_allowed",
"repo_write_authorized",
"runtime_execution_authorized",
"secret_value_collection_allowed",
"wazuh_active_response_authorized",
"wazuh_api_live_query_authorized",
}
def load_latest_iwooos_wazuh_live_metadata_gate(
security_dir: Path | None = None,
wazuh_live_status: dict[str, Any] | None = None,
wazuh_live_http_status: int = 0,
) -> dict[str, Any]:
"""Load the committed Wazuh live metadata owner gate as a public-safe payload."""
directory = security_dir or _DEFAULT_SECURITY_DIR
snapshot = _load_snapshot(directory)
_require_boundaries(snapshot)
summary = _summary(snapshot)
live_route = _live_route_summary(wazuh_live_status, wazuh_live_http_status)
merged_summary = {
"server_side_env_key_count": _int(summary.get("server_side_env_key_count")),
"required_owner_field_count": _int(summary.get("required_owner_field_count")),
"reviewer_check_count": _int(summary.get("reviewer_check_count")),
"outcome_lane_count": _int(summary.get("outcome_lane_count")),
"blocked_action_count": _int(summary.get("blocked_action_count")),
"production_route_readback_passed_count": _int(summary.get("production_route_readback_passed_count")),
"live_metadata_owner_response_received_count": _int(
summary.get("live_metadata_owner_response_received_count")
),
"live_metadata_owner_response_accepted_count": _int(
summary.get("live_metadata_owner_response_accepted_count")
),
"secret_source_metadata_accepted_count": _int(summary.get("secret_source_metadata_accepted_count")),
"wazuh_manager_health_ref_accepted_count": _int(
summary.get("wazuh_manager_health_ref_accepted_count")
),
"readonly_account_scope_accepted_count": _int(summary.get("readonly_account_scope_accepted_count")),
"post_enable_readback_passed_count": _int(summary.get("post_enable_readback_passed_count")),
"wazuh_api_live_query_authorized_count": _int(summary.get("wazuh_api_live_query_authorized_count")),
"wazuh_active_response_authorized_count": _int(summary.get("wazuh_active_response_authorized_count")),
"host_write_authorized_count": _int(summary.get("host_write_authorized_count")),
"runtime_gate_count": _int(summary.get("runtime_gate_count")),
"wazuh_live_route_http_status": live_route["http_status"],
"wazuh_live_route_degraded_count": live_route["degraded_count"],
"wazuh_live_readonly_api_enabled_count": live_route["readonly_api_enabled_count"],
"wazuh_live_agent_total": live_route["agent_total"],
"wazuh_live_metadata_available_count": live_route["metadata_available_count"],
"wazuh_live_status": live_route["status"],
}
return {
"schema_version": "iwooos_wazuh_live_metadata_gate_readback_v1",
"status": snapshot.get("status", "blocked_waiting_live_metadata_owner_response"),
"mode": "committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
"source_refs": [f"docs/security/{_SNAPSHOT_FILE}", "GET /api/iwooos/wazuh"],
"summary": merged_summary,
"items": _items(merged_summary),
"boundary_markers": _boundary_markers(merged_summary),
"boundaries": {
"runtime_execution_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"host_write_authorized": False,
"kali_active_scan_authorized": False,
"k8s_secret_patch_authorized": False,
"argocd_sync_authorized": False,
"docker_restart_authorized": False,
"nginx_gateway_workaround_authorized": False,
"firewall_change_authorized": False,
"not_authorization": True,
},
"no_false_green_rules": [
"正式路由 200 不是即時查詢授權",
"Wazuh 儀表板可見不是 manager registry 已驗收",
"機密來源只能交付中繼資料,不得交付明文、片段或雜湊",
"live metadata query、active response、host write 與 Kali active scan 是不同閘門",
"owner response accepted、post-enable readback 與 runtime gate 仍全部維持 0",
],
}
def _load_snapshot(directory: Path) -> dict[str, Any]:
path = directory / _SNAPSHOT_FILE
if not path.is_file():
raise FileNotFoundError(f"{path}: Wazuh 即時中繼資料閘門快照不存在")
with path.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{path}: expected JSON object")
if payload.get("schema_version") != _EXPECTED_SCHEMA:
raise ValueError(f"{path}: expected schema_version={_EXPECTED_SCHEMA}")
return payload
def _summary(payload: dict[str, Any]) -> dict[str, Any]:
summary = payload.get("summary")
return summary if isinstance(summary, dict) else {}
def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _live_route_summary(payload: dict[str, Any] | None, http_status: int) -> dict[str, Any]:
if not isinstance(payload, dict):
return {
"status": "not_checked_by_snapshot_loader",
"http_status": 0,
"degraded_count": 1,
"readonly_api_enabled_count": 0,
"agent_total": 0,
"metadata_available_count": 0,
}
summary = _summary(payload)
status_text = str(payload.get("status") or "unknown")
metadata_available = status_text == "readonly_metadata_available"
return {
"status": status_text,
"http_status": http_status if isinstance(http_status, int) else 0,
"degraded_count": 0 if metadata_available else 1,
"readonly_api_enabled_count": _int(summary.get("readonly_api_enabled_count")),
"agent_total": _int(summary.get("agent_total")),
"metadata_available_count": 1 if metadata_available else 0,
}
def _items(summary: dict[str, Any]) -> list[dict[str, Any]]:
return [
_item(
"release_readback",
"ENV-1",
"route_readback_passed",
"steady" if summary["production_route_readback_passed_count"] == 1 else "warn",
{"route_readback": summary["production_route_readback_passed_count"]},
),
_item(
"server_env_owner",
"ENV-2",
"waiting_owner_response",
"locked",
{
"owner_received": summary["live_metadata_owner_response_received_count"],
"owner_accepted": summary["live_metadata_owner_response_accepted_count"],
},
),
_item(
"secret_metadata",
"ENV-3",
"metadata_only_waiting_acceptance",
"locked",
{"secret_metadata_accepted": summary["secret_source_metadata_accepted_count"]},
),
_item(
"manager_health",
"ENV-4",
"waiting_manager_health_ref",
"warn",
{
"manager_health_accepted": summary["wazuh_manager_health_ref_accepted_count"],
"live_route_degraded": summary["wazuh_live_route_degraded_count"],
},
),
_item(
"readonly_scope",
"ENV-5",
"waiting_readonly_scope_ref",
"warn",
{"readonly_scope_accepted": summary["readonly_account_scope_accepted_count"]},
),
_item(
"post_enable_readback",
"ENV-6",
"waiting_post_enable_readback",
"locked",
{
"post_enable_readback": summary["post_enable_readback_passed_count"],
"live_query_authorized": summary["wazuh_api_live_query_authorized_count"],
"runtime_gate": summary["runtime_gate_count"],
},
),
]
def _item(
item_id: str,
check: str,
state_key: str,
tone: str,
metrics: dict[str, int],
) -> dict[str, Any]:
return {
"item_id": item_id,
"check": check,
"state_key": state_key,
"tone": tone,
"metrics": metrics,
}
def _boundary_markers(summary: dict[str, Any]) -> list[str]:
return [
f"正式路由讀回={summary['production_route_readback_passed_count']}",
f"負責人回覆接受={summary['live_metadata_owner_response_accepted_count']}",
f"機密來源中繼資料接受={summary['secret_source_metadata_accepted_count']}",
f"管理節點健康參照接受={summary['wazuh_manager_health_ref_accepted_count']}",
f"唯讀帳號範圍接受={summary['readonly_account_scope_accepted_count']}",
f"啟用後讀回={summary['post_enable_readback_passed_count']}",
f"Wazuh 即時查詢授權={summary['wazuh_api_live_query_authorized_count']}",
f"Wazuh 主動回應授權={summary['wazuh_active_response_authorized_count']}",
f"主機寫入授權={summary['host_write_authorized_count']}",
f"執行期閘門={summary['runtime_gate_count']}",
"機密明文收集=false",
"原始 Wazuh 載荷保存=false",
"K8s secret 手動修改=false",
"ArgoCD 手動同步=false",
"Docker 重啟=false",
"Nginx 或 gateway 繞路=false",
"防火牆變更=false",
"資安觀測節點主動掃描=false",
"不是執行授權=true",
]
def _require_boundaries(payload: dict[str, Any]) -> None:
summary = _summary(payload)
for key in (
"live_metadata_owner_response_accepted_count",
"secret_source_metadata_accepted_count",
"wazuh_manager_health_ref_accepted_count",
"readonly_account_scope_accepted_count",
"post_enable_readback_passed_count",
"wazuh_api_live_query_authorized_count",
"wazuh_active_response_authorized_count",
"host_write_authorized_count",
"runtime_gate_count",
):
if _int(summary.get(key)) != 0:
raise ValueError(f"Wazuh 即時中繼資料閘門 summary.{key} 必須維持 0")
boundaries = payload.get("execution_boundaries")
if not isinstance(boundaries, dict):
raise ValueError("Wazuh 即時中繼資料閘門 execution_boundaries 缺失")
for key in _REQUIRED_FALSE_BOUNDARIES:
if boundaries.get(key) is not False:
raise ValueError(f"Wazuh 即時中繼資料閘門 execution_boundaries.{key} 必須維持 false")
if boundaries.get("not_authorization") is not True:
raise ValueError("Wazuh 即時中繼資料閘門 not_authorization 必須維持 true")

View File

@@ -20,8 +20,8 @@ def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None
assert payload["schema_version"] == "iwooos_runtime_security_readback_v1"
assert payload["status"] == "blocked_waiting_owner_evidence_and_runtime_gates"
assert payload["summary"]["source_snapshot_count"] == 8
assert payload["summary"]["p0_lane_count"] == 7
assert payload["summary"]["source_snapshot_count"] == 9
assert payload["summary"]["p0_lane_count"] == 8
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["summary"]["owner_response_received_count"] == 0
assert payload["summary"]["owner_response_accepted_count"] == 0
@@ -30,6 +30,12 @@ def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None
assert payload["summary"]["wazuh_live_route_degraded_count"] == 1
assert payload["summary"]["wazuh_live_readonly_api_enabled_count"] == 0
assert payload["summary"]["wazuh_live_metadata_available_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_owner_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_secret_source_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_manager_health_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_readonly_scope_accepted_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_post_enable_readback_count"] == 0
assert payload["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert payload["summary"]["kali_active_scan_authorized_count"] == 0
assert payload["summary"]["kali_execute_authorized_count"] == 0
assert payload["summary"]["alert_receipt_runtime_send_count"] == 0
@@ -46,6 +52,7 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
assert lane_ids == {
"wazuh_registry",
"wazuh_live_route",
"wazuh_live_metadata_gate",
"wazuh_dashboard_api",
"kali_intake",
"alert_readability",
@@ -58,6 +65,7 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
assert any(lane["completion_percent"] > 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_metadata_gate" or lane["completion_percent"] == 0 for lane in payload["lanes"])
def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> None:
@@ -76,6 +84,7 @@ def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> Non
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_metadata_available_count"] == 0
assert data["summary"]["wazuh_live_metadata_gate_live_query_authorized_count"] == 0
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
@@ -123,3 +132,40 @@ def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert "token-value" not in response.text
def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
response = _client().get("/api/v1/iwooos/wazuh-live-metadata-gate")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_wazuh_live_metadata_gate_readback_v1"
assert data["status"] == "blocked_waiting_live_metadata_owner_response"
assert data["summary"]["production_route_readback_passed_count"] == 1
assert data["summary"]["live_metadata_owner_response_accepted_count"] == 0
assert data["summary"]["secret_source_metadata_accepted_count"] == 0
assert data["summary"]["readonly_account_scope_accepted_count"] == 0
assert data["summary"]["post_enable_readback_passed_count"] == 0
assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0
assert data["summary"]["wazuh_active_response_authorized_count"] == 0
assert data["summary"]["host_write_authorized_count"] == 0
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert data["boundaries"]["wazuh_api_live_query_authorized"] is False
assert data["boundaries"]["wazuh_active_response_authorized"] is False
assert data["boundaries"]["host_write_authorized"] is False
assert data["boundaries"]["not_authorization"] is True
assert len(data["items"]) == 6
assert any(marker == "正式路由讀回=1" for marker in data["boundary_markers"])
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
assert "WAZUH_API_PASSWORD" not in response.text

View File

@@ -20284,7 +20284,7 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全 aggregate 讀回;它不保存 raw Wazuh payload、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
@@ -20315,6 +20315,10 @@
"label": "Wazuh live",
"detail": "正式只讀路由若 disabled、空清單或低於預期首屏直接顯示警示。"
},
"metadataGate": {
"label": "中繼資料閘門",
"detail": "即時中繼資料查詢仍需負責人、機密中繼資料、唯讀範圍與啟用後讀回。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20337,6 +20341,10 @@
"title": "Wazuh 正式只讀路由",
"body": "直接讀正式路由的公開安全 aggregatedisabled、misconfigured、empty、below expected 都是 P0 紅燈,不會被 route 200 蓋過。"
},
"wazuh_live_metadata_gate": {
"title": "Wazuh 即時中繼資料閘門",
"body": "正式路由讀回後仍必須補負責人回覆、機密來源中繼資料、管理節點健康、唯讀範圍與啟用後讀回;即時查詢授權維持 0。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"
@@ -20647,6 +20655,7 @@
"subtitle": "這張卡把 Wazuh 即時中繼資料啟用前的條件拆開:正式路由讀回、伺服端環境變數負責人、機密來源中繼資料、管理節點健康、唯讀帳號範圍與啟用後讀回都要先補齊;目前即時查詢、主動回應、主機寫入與執行期閘門都是 0。",
"checkLabel": "檢核",
"stateLabel": "狀態",
"loadingBoundary": "正在讀取只讀閘門",
"boundaryTitle": "即時中繼資料環境邊界",
"boundaryIntro": "以下鍵值固定:正式路由 200、Wazuh 已建置或 UI 可見都不能直接代表可以查 Wazuh 即時中繼資料;不得收機密明文值、不得改 K8s 機密、不得用 Nginx 或主機操作繞過釋出閘門。",
"summary": {
@@ -20667,6 +20676,19 @@
"detail": "Wazuh API 即時查詢授權目前維持 0。"
}
},
"status": {
"loading": "正在讀取 Wazuh 即時中繼資料閘門",
"failed": "Wazuh 即時中繼資料閘門尚未部署或讀取失敗",
"ready": "Wazuh 即時中繼資料閘門已讀回,但執行期仍關閉"
},
"states": {
"route_readback_passed": "路由已讀回",
"waiting_owner_response": "待負責人",
"metadata_only_waiting_acceptance": "只收中繼資料",
"waiting_manager_health_ref": "待健康參照",
"waiting_readonly_scope_ref": "待範圍參照",
"waiting_post_enable_readback": "待讀回驗證"
},
"items": {
"releaseReadback": {
"title": "正式路由已不再 404",

View File

@@ -20284,7 +20284,7 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全 aggregate 讀回;它不保存 raw Wazuh payload、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
@@ -20315,6 +20315,10 @@
"label": "Wazuh live",
"detail": "正式只讀路由若 disabled、空清單或低於預期首屏直接顯示警示。"
},
"metadataGate": {
"label": "中繼資料閘門",
"detail": "即時中繼資料查詢仍需負責人、機密中繼資料、唯讀範圍與啟用後讀回。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20337,6 +20341,10 @@
"title": "Wazuh 正式只讀路由",
"body": "直接讀正式路由的公開安全 aggregatedisabled、misconfigured、empty、below expected 都是 P0 紅燈,不會被 route 200 蓋過。"
},
"wazuh_live_metadata_gate": {
"title": "Wazuh 即時中繼資料閘門",
"body": "正式路由讀回後仍必須補負責人回覆、機密來源中繼資料、管理節點健康、唯讀範圍與啟用後讀回;即時查詢授權維持 0。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"
@@ -20647,6 +20655,7 @@
"subtitle": "這張卡把 Wazuh 即時中繼資料啟用前的條件拆開:正式路由讀回、伺服端環境變數負責人、機密來源中繼資料、管理節點健康、唯讀帳號範圍與啟用後讀回都要先補齊;目前即時查詢、主動回應、主機寫入與執行期閘門都是 0。",
"checkLabel": "檢核",
"stateLabel": "狀態",
"loadingBoundary": "正在讀取只讀閘門",
"boundaryTitle": "即時中繼資料環境邊界",
"boundaryIntro": "以下鍵值固定:正式路由 200、Wazuh 已建置或 UI 可見都不能直接代表可以查 Wazuh 即時中繼資料;不得收機密明文值、不得改 K8s 機密、不得用 Nginx 或主機操作繞過釋出閘門。",
"summary": {
@@ -20667,6 +20676,19 @@
"detail": "Wazuh API 即時查詢授權目前維持 0。"
}
},
"status": {
"loading": "正在讀取 Wazuh 即時中繼資料閘門",
"failed": "Wazuh 即時中繼資料閘門尚未部署或讀取失敗",
"ready": "Wazuh 即時中繼資料閘門已讀回,但執行期仍關閉"
},
"states": {
"route_readback_passed": "路由已讀回",
"waiting_owner_response": "待負責人",
"metadata_only_waiting_acceptance": "只收中繼資料",
"waiting_manager_health_ref": "待健康參照",
"waiting_readonly_scope_ref": "待範圍參照",
"waiting_post_enable_readback": "待讀回驗證"
},
"items": {
"releaseReadback": {
"title": "正式路由已不再 404",

View File

@@ -38,6 +38,8 @@ import {
type IwoooSRuntimeSecurityReadbackResponse,
type IwoooSSecurityControlCoverageDomain,
type IwoooSSecurityControlCoverageResponse,
type IwoooSWazuhLiveMetadataGateItem,
type IwoooSWazuhLiveMetadataGateResponse,
} from '@/lib/api-client'
type PostureMetric = {
@@ -326,7 +328,15 @@ type WazuhReadonlyStatusResponse = {
}
type RuntimeSecurityReadbackSummaryItem = {
key: 'controlPlane' | 'runtimeAcceptance' | 'wazuhRegistry' | 'wazuhLive' | 'ownerAccepted' | 'kaliRuntime' | 'runtimeGate'
key:
| 'controlPlane'
| 'runtimeAcceptance'
| 'wazuhRegistry'
| 'wazuhLive'
| 'metadataGate'
| 'ownerAccepted'
| 'kaliRuntime'
| 'runtimeGate'
value: string
icon: typeof ShieldCheck
tone: 'steady' | 'warn' | 'locked'
@@ -2275,13 +2285,6 @@ const wazuhIntrusionReadbackBoundaries = [
'not_authorization=true',
] as const
const wazuhLiveMetadataEnvGateSummary = [
{ key: 'routeReadback', value: '1', icon: Route, tone: 'steady' },
{ key: 'owner', value: '0', icon: ClipboardCheck, tone: 'locked' },
{ key: 'secretMeta', value: '0', icon: Lock, tone: 'locked' },
{ key: 'liveQuery', value: '0', icon: Radar, tone: 'locked' },
] as const
const wazuhReleaseGateSummary = [
{ key: 'source', value: '1', icon: CheckCircle2, tone: 'steady' },
{ key: 'branch', value: '1', icon: GitBranch, tone: 'steady' },
@@ -2325,6 +2328,15 @@ const wazuhLiveMetadataEnvGateItems: WazuhLiveMetadataEnvGateItem[] = [
{ key: 'postEnable', check: 'ENV-6', state: '待讀回驗證', icon: SearchCheck, tone: 'locked' },
] as const
const wazuhLiveMetadataEnvGateItemKeyById: Record<IwoooSWazuhLiveMetadataGateItem['item_id'], WazuhLiveMetadataEnvGateItem['key']> = {
release_readback: 'releaseReadback',
server_env_owner: 'serverEnv',
secret_metadata: 'secretMetadata',
manager_health: 'managerHealth',
readonly_scope: 'readonlyScope',
post_enable_readback: 'postEnable',
}
const wazuhLiveMetadataEnvGateBoundaries = [
'wazuh_live_metadata_env_gate_visible=true',
'wazuh_live_metadata_env_gate_server_side_env_key_count=4',
@@ -8130,6 +8142,12 @@ function IwoooSRuntimeSecurityReadbackBoard() {
icon: Route,
tone: summary && summary.wazuh_live_metadata_available_count === 1 && summary.wazuh_live_route_degraded_count === 0 ? 'steady' : 'warn',
},
{
key: 'metadataGate',
value: summary ? String(summary.wazuh_live_metadata_gate_live_query_authorized_count) : '...',
icon: ClipboardCheck,
tone: 'locked',
},
{
key: 'ownerAccepted',
value: summary ? `${summary.owner_response_accepted_count}/${summary.owner_response_received_count}` : '...',
@@ -8882,6 +8900,86 @@ function IwoooSWazuhReleaseGateBoard() {
function IwoooSWazuhLiveMetadataEnvGateBoard() {
const t = useTranslations('iwooos.wazuhLiveMetadataEnvGate')
const textWrap = { overflowWrap: 'anywhere' as const, wordBreak: 'break-word' as const }
const [data, setData] = useState<IwoooSWazuhLiveMetadataGateResponse | null>(null)
const [loading, setLoading] = useState(true)
const [failed, setFailed] = useState(false)
useEffect(() => {
let mounted = true
async function loadGate() {
setLoading(true)
setFailed(false)
try {
const payload = await apiClient.getIwoooSWazuhLiveMetadataGate()
if (mounted) {
setData(payload)
}
} catch {
if (mounted) {
setData(null)
setFailed(true)
}
} finally {
if (mounted) {
setLoading(false)
}
}
}
loadGate()
return () => {
mounted = false
}
}, [])
const summary = data?.summary
const summaryItems = [
{
key: 'routeReadback',
value: summary ? String(summary.production_route_readback_passed_count) : loading ? '...' : '0',
icon: Route,
tone: summary?.production_route_readback_passed_count === 1 ? 'steady' : 'warn',
},
{
key: 'owner',
value: summary ? String(summary.live_metadata_owner_response_accepted_count) : loading ? '...' : '0',
icon: ClipboardCheck,
tone: 'locked',
},
{
key: 'secretMeta',
value: summary ? String(summary.secret_source_metadata_accepted_count) : loading ? '...' : '0',
icon: Lock,
tone: 'locked',
},
{
key: 'liveQuery',
value: summary ? String(summary.wazuh_api_live_query_authorized_count) : loading ? '...' : '0',
icon: Radar,
tone: 'locked',
},
] as const
const gateItems = data?.items?.length
? data.items.map(item => {
const key = wazuhLiveMetadataEnvGateItemKeyById[item.item_id]
const fallback = wazuhLiveMetadataEnvGateItems.find(candidate => candidate.key === key)
return {
key,
check: item.check,
state: t(`states.${item.state_key}` as never),
icon: fallback?.icon ?? FileWarning,
tone: item.tone,
}
})
: wazuhLiveMetadataEnvGateItems
const boundaryMarkers = data?.boundary_markers?.length
? data.boundary_markers
: loading
? [t('loadingBoundary')]
: wazuhLiveMetadataEnvGateBoundaries
const statusText = loading ? t('status.loading') : failed ? t('status.failed') : t('status.ready')
const statusTone: 'steady' | 'warn' | 'locked' = loading || failed ? 'warn' : 'locked'
return (
<section
@@ -8899,10 +8997,14 @@ function IwoooSWazuhLiveMetadataEnvGateBoard() {
<p style={{ fontSize: 12, color: '#47683f', margin: '6px 0 0', lineHeight: 1.55, ...textWrap }}>
{t('subtitle')}
</p>
<div style={{ marginTop: 8, display: 'inline-flex', alignItems: 'center', gap: 8, color: toneColors[statusTone], fontSize: 12, fontWeight: 700, ...textWrap }}>
<ToneDot tone={statusTone} />
{statusText}
</div>
</div>
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(126px, 1fr))', gap: 8 }}>
{wazuhLiveMetadataEnvGateSummary.map(item => {
{summaryItems.map(item => {
const Icon = item.icon
return (
<div key={item.key} style={{ border: '0.5px solid #d2e4cf', borderRadius: 8, padding: 12, background: '#fff' }}>
@@ -8930,7 +9032,7 @@ function IwoooSWazuhLiveMetadataEnvGateBoard() {
gap: 10,
}}
>
{wazuhLiveMetadataEnvGateItems.map(item => {
{gateItems.map(item => {
const Icon = item.icon
return (
<div
@@ -8986,7 +9088,7 @@ function IwoooSWazuhLiveMetadataEnvGateBoard() {
{t('boundaryIntro')}
</p>
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(230px, 1fr))', gap: 6 }}>
{wazuhLiveMetadataEnvGateBoundaries.map(item => (
{boundaryMarkers.map(item => (
<code
key={item}
style={{

View File

@@ -101,6 +101,7 @@ export interface IwoooSRuntimeSecurityReadbackLane {
lane_id:
| 'wazuh_registry'
| 'wazuh_live_route'
| 'wazuh_live_metadata_gate'
| 'wazuh_dashboard_api'
| 'kali_intake'
| 'alert_readability'
@@ -141,6 +142,12 @@ export interface IwoooSRuntimeSecurityReadbackResponse {
wazuh_live_below_expected_count: number
wazuh_live_metadata_available_count: number
wazuh_live_status: string
wazuh_live_metadata_gate_owner_accepted_count: number
wazuh_live_metadata_gate_secret_source_accepted_count: number
wazuh_live_metadata_gate_manager_health_accepted_count: number
wazuh_live_metadata_gate_readonly_scope_accepted_count: number
wazuh_live_metadata_gate_post_enable_readback_count: number
wazuh_live_metadata_gate_live_query_authorized_count: number
kali_active_scan_authorized_count: number
kali_execute_authorized_count: number
kali_finding_envelope_accepted_count: number
@@ -154,6 +161,55 @@ export interface IwoooSRuntimeSecurityReadbackResponse {
no_false_green_rules: string[]
}
export interface IwoooSWazuhLiveMetadataGateItem {
item_id:
| 'release_readback'
| 'server_env_owner'
| 'secret_metadata'
| 'manager_health'
| 'readonly_scope'
| 'post_enable_readback'
check: string
state_key: string
tone: IwoooSRuntimeSecurityReadbackTone
metrics: Record<string, number>
}
export interface IwoooSWazuhLiveMetadataGateResponse {
schema_version: 'iwooos_wazuh_live_metadata_gate_readback_v1'
status: string
mode: string
source_refs: string[]
summary: {
server_side_env_key_count: number
required_owner_field_count: number
reviewer_check_count: number
outcome_lane_count: number
blocked_action_count: number
production_route_readback_passed_count: number
live_metadata_owner_response_received_count: number
live_metadata_owner_response_accepted_count: number
secret_source_metadata_accepted_count: number
wazuh_manager_health_ref_accepted_count: number
readonly_account_scope_accepted_count: number
post_enable_readback_passed_count: number
wazuh_api_live_query_authorized_count: number
wazuh_active_response_authorized_count: number
host_write_authorized_count: number
runtime_gate_count: number
wazuh_live_route_http_status: number
wazuh_live_route_degraded_count: number
wazuh_live_readonly_api_enabled_count: number
wazuh_live_agent_total: number
wazuh_live_metadata_available_count: number
wazuh_live_status: string
}
items: IwoooSWazuhLiveMetadataGateItem[]
boundary_markers: string[]
boundaries: Record<string, boolean>
no_false_green_rules: string[]
}
export interface IwoooSSecurityControlCoverageDomain {
domain_id:
| 'high_value_asset_control'
@@ -259,6 +315,11 @@ export const apiClient = {
return handleResponse<IwoooSRuntimeSecurityReadbackResponse>(res)
},
async getIwoooSWazuhLiveMetadataGate() {
const res = await fetch(`${API_BASE_URL}/iwooos/wazuh-live-metadata-gate`, { cache: 'no-store' })
return handleResponse<IwoooSWazuhLiveMetadataGateResponse>(res)
},
async getIwoooSSecurityControlCoverage() {
const res = await fetch(`${API_BASE_URL}/iwooos/security-control-coverage`, { cache: 'no-store' })
return handleResponse<IwoooSSecurityControlCoverageResponse>(res)

View File

@@ -41,7 +41,7 @@ resources:
images:
- name: 192.168.0.110:5000/library/api:IMAGE_TAG_PLACEHOLDER
newName: 192.168.0.110:5000/awoooi/api
newTag: fe74d8616e409f7c4485e7aea8194f198037034b
newTag: b7045a412c8be3d67490fb64790b12c0380fa23c
- name: 192.168.0.110:5000/library/web:IMAGE_TAG_PLACEHOLDER
newName: 192.168.0.110:5000/awoooi/web
newTag: fe74d8616e409f7c4485e7aea8194f198037034b
newTag: b7045a412c8be3d67490fb64790b12c0380fa23c

View File

@@ -21,6 +21,7 @@ TAIPEI = timezone(timedelta(hours=8))
NEXT_ROUTE_PATH = Path("apps/web/src/app/api/iwooos/wazuh/route.ts")
BACKEND_ROUTE_PATH = Path("apps/api/src/api/v1/iwooos.py")
BACKEND_SERVICE_PATH = Path("apps/api/src/services/iwooos_wazuh_readonly_status.py")
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH = Path("apps/api/src/services/iwooos_wazuh_live_metadata_gate.py")
PUBLIC_PAGE_PATH = Path("apps/web/src/app/[locale]/iwooos/page.tsx")
PUBLIC_COMPONENT_ROOT = Path("apps/web/src/components/iwooos")
@@ -84,6 +85,22 @@ BACKEND_SERVICE_REQUIRED_TOKENS = [
"agent_visibility_no_false_green_count",
]
BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS = [
"iwooos_wazuh_live_metadata_gate_readback_v1",
"committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
"boundary_markers",
"secret_value_collection_allowed",
"raw_wazuh_payload_storage_allowed",
"wazuh_api_live_query_authorized",
"wazuh_active_response_authorized",
"host_write_authorized",
"runtime_gate_count",
"not_authorization",
"正式路由讀回",
"機密明文收集=false",
"原始 Wazuh 載荷保存=false",
]
FORBIDDEN_PATTERNS = [
ForbiddenPattern(
@@ -184,6 +201,7 @@ def collect_forbidden_matches(root: Path) -> list[dict[str, Any]]:
("route", NEXT_ROUTE_PATH),
("route", BACKEND_ROUTE_PATH),
("route", BACKEND_SERVICE_PATH),
("route", BACKEND_LIVE_METADATA_GATE_SERVICE_PATH),
]
targets.extend(("public_ui", path) for path in collect_public_ui_files(root))
@@ -214,12 +232,17 @@ def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
next_route = root / NEXT_ROUTE_PATH
backend_route = root / BACKEND_ROUTE_PATH
backend_service = root / BACKEND_SERVICE_PATH
backend_live_metadata_gate_service = root / BACKEND_LIVE_METADATA_GATE_SERVICE_PATH
next_route_present = next_route.exists()
backend_route_present = backend_route.exists()
backend_service_present = backend_service.exists()
backend_live_metadata_gate_service_present = backend_live_metadata_gate_service.exists()
next_route_text = read_text(next_route) if next_route_present else ""
backend_route_text = read_text(backend_route) if backend_route_present else ""
backend_service_text = read_text(backend_service) if backend_service_present else ""
backend_live_metadata_gate_service_text = (
read_text(backend_live_metadata_gate_service) if backend_live_metadata_gate_service_present else ""
)
public_ui_files = collect_public_ui_files(root)
missing_required_tokens = {
NEXT_ROUTE_PATH.as_posix(): collect_missing_required_tokens(next_route_text, ROUTE_REQUIRED_TOKENS),
@@ -227,6 +250,10 @@ def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
BACKEND_SERVICE_PATH.as_posix(): collect_missing_required_tokens(
backend_service_text, BACKEND_SERVICE_REQUIRED_TOKENS
),
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(): collect_missing_required_tokens(
backend_live_metadata_gate_service_text,
BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS,
),
}
missing_required_token_count = sum(len(tokens) for tokens in missing_required_tokens.values())
forbidden_matches = collect_forbidden_matches(root)
@@ -236,7 +263,14 @@ def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
"generated_at": generated_at or now_iso(),
"status": (
"pass"
if next_route_present and backend_route_present and not missing_required_token_count and not forbidden_matches
if (
next_route_present
and backend_route_present
and backend_service_present
and backend_live_metadata_gate_service_present
and not missing_required_token_count
and not forbidden_matches
)
else "blocked"
),
"mode": "repo_source_scan_no_runtime_no_secret_collection",
@@ -244,33 +278,52 @@ def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
NEXT_ROUTE_PATH.as_posix(),
BACKEND_ROUTE_PATH.as_posix(),
BACKEND_SERVICE_PATH.as_posix(),
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(),
],
"guarded_public_ui_paths": [path.as_posix() for path in public_ui_files],
"required_route_tokens": {
NEXT_ROUTE_PATH.as_posix(): ROUTE_REQUIRED_TOKENS,
BACKEND_ROUTE_PATH.as_posix(): BACKEND_REQUIRED_TOKENS,
BACKEND_SERVICE_PATH.as_posix(): BACKEND_SERVICE_REQUIRED_TOKENS,
BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix(): BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS,
},
"forbidden_pattern_ids": [pattern.pattern_id for pattern in FORBIDDEN_PATTERNS],
"summary": {
"route_present_count": int(next_route_present) + int(backend_route_present) + int(backend_service_present),
"route_present_count": (
int(next_route_present)
+ int(backend_route_present)
+ int(backend_service_present)
+ int(backend_live_metadata_gate_service_present)
),
"live_metadata_gate_service_present_count": 1 if backend_live_metadata_gate_service_present else 0,
"next_route_present_count": 1 if next_route_present else 0,
"backend_route_present_count": 1 if backend_route_present else 0,
"backend_service_present_count": 1 if backend_service_present else 0,
"public_ui_file_count": len(public_ui_files),
"required_token_count": len(ROUTE_REQUIRED_TOKENS)
+ len(BACKEND_REQUIRED_TOKENS)
+ len(BACKEND_SERVICE_REQUIRED_TOKENS),
+ len(BACKEND_SERVICE_REQUIRED_TOKENS)
+ len(BACKEND_LIVE_METADATA_GATE_REQUIRED_TOKENS),
"missing_required_token_count": missing_required_token_count,
"forbidden_pattern_count": len(FORBIDDEN_PATTERNS),
"forbidden_match_count": len(forbidden_matches),
"readonly_api_default_closed_count": sum(
"IWOOOS_WAZUH_READONLY_ENABLED" in text
for text in [next_route_text, backend_route_text, backend_service_text]
for text in [
next_route_text,
backend_route_text,
backend_service_text,
backend_live_metadata_gate_service_text,
]
),
"server_side_env_required_count": sum(
token in text
for text in [next_route_text, backend_route_text, backend_service_text]
for text in [
next_route_text,
backend_route_text,
backend_service_text,
backend_live_metadata_gate_service_text,
]
for token in ["WAZUH_API_BASE_URL", "WAZUH_API_USERNAME", "WAZUH_API_PASSWORD"]
),
"tls_disable_match_count": sum(
@@ -325,6 +378,10 @@ def validate(root: Path) -> None:
errors.append(f"{BACKEND_ROUTE_PATH.as_posix()}: Wazuh FastAPI 相容 route 不存在")
if report["summary"]["backend_service_present_count"] != 1:
errors.append(f"{BACKEND_SERVICE_PATH.as_posix()}: Wazuh FastAPI 只讀 service 不存在")
if report["summary"]["live_metadata_gate_service_present_count"] != 1:
errors.append(
f"{BACKEND_LIVE_METADATA_GATE_SERVICE_PATH.as_posix()}: Wazuh 即時中繼資料閘門 service 不存在"
)
for path, tokens in report["missing_required_tokens"].items():
for token in tokens:
errors.append(f"{path}: 缺少必要只讀邊界 token {token!r}")