feat(iwooos): surface Wazuh live route in runtime readback
Some checks failed
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / tests (push) Successful in 1m38s
CD Pipeline / build-and-deploy (push) Successful in 4m22s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-26 23:32:39 +08:00
parent 335d5f4a7b
commit 9778cc22fc
11 changed files with 443 additions and 194 deletions

View File

@@ -9,12 +9,8 @@ from __future__ import annotations
import asyncio
import json
import os
from base64 import b64encode
from typing import Any
from urllib.parse import urljoin, urlparse
import httpx
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
@@ -24,180 +20,18 @@ from src.services.iwooos_runtime_security_readback import (
from src.services.iwooos_security_control_coverage import (
load_latest_iwooos_security_control_coverage,
)
from src.services.iwooos_wazuh_readonly_status import (
load_iwooos_wazuh_readonly_status,
)
from src.services.public_redaction import redact_public_lan_topology
router = APIRouter(tags=["IwoooS Security"])
REQUEST_TIMEOUT_SECONDS = 5.0
def _wazuh_env() -> dict[str, str]:
return {
"enabled": os.getenv("IWOOOS_WAZUH_READONLY_ENABLED", "").strip().lower(),
"base_url": os.getenv("WAZUH_API_BASE_URL", "").strip(),
"username": os.getenv("WAZUH_API_USERNAME", "").strip(),
"password": os.getenv("WAZUH_API_PASSWORD", "").strip(),
"expected_min_agent_count": os.getenv("IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT", "").strip(),
}
def _expected_min_agent_count(value: str) -> int:
try:
return max(0, int(value))
except ValueError:
return 0
def _https_url(value: str) -> str | None:
parsed = urlparse(value)
if parsed.scheme != "https" or not parsed.netloc:
return None
return value.rstrip("/") + "/"
def _boundary_response(status_text: str, http_status: int = 200) -> JSONResponse:
return JSONResponse(
status_code=http_status,
content={
"schema_version": "iwooos_wazuh_readonly_status_v1",
"status": status_text,
"mode": "metadata_only_no_active_response_no_raw_payload",
"configured": False,
"summary": {
"wazuh_platform_reported_count": 1,
"readonly_api_enabled_count": 0,
"wazuh_manager_query_accepted_count": 0,
"wazuh_event_accepted_count": 0,
"host_forensics_accepted_count": 0,
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"runtime_gate_count": 0,
"expected_min_agent_count": _expected_min_agent_count(_wazuh_env()["expected_min_agent_count"]),
"agent_registry_empty_count": 0,
"agent_below_expected_minimum_count": 0,
"agent_visibility_no_false_green_count": 1,
},
"boundaries": _boundaries(),
},
)
def _boundaries() -> dict[str, bool]:
return {
"active_response_authorized": False,
"host_write_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"agent_identity_public_display_allowed": False,
"internal_ip_public_display_allowed": False,
"not_authorization": True,
}
def _redacted_agent(agent: dict[str, Any], index: int) -> dict[str, Any]:
os_info = agent.get("os") if isinstance(agent.get("os"), dict) else {}
return {
"alias": f"agent-{index + 1:02d}",
"status": agent.get("status", "unknown"),
"os": os_info.get("platform") or os_info.get("name") or "unknown",
"last_seen_present": bool(agent.get("lastKeepAlive")),
}
def _int_or_default(value: Any, default: int) -> int:
return value if isinstance(value, int) else default
def _agent_visibility_status(agent_total: int, expected_min_agent_count: int) -> str:
if agent_total <= 0:
return "wazuh_agent_registry_empty"
if expected_min_agent_count > 0 and agent_total < expected_min_agent_count:
return "wazuh_agent_registry_below_expected"
return "readonly_metadata_available"
async def _fetch_json(client: httpx.AsyncClient, url: str, headers: dict[str, str]) -> dict[str, Any]:
response = await client.get(url, headers=headers)
response.raise_for_status()
payload = response.json()
return payload if isinstance(payload, dict) else {}
async def _wazuh_readonly_status() -> JSONResponse:
env = _wazuh_env()
if env["enabled"] != "true":
return _boundary_response("disabled_waiting_iwooos_wazuh_owner_gate")
base_url = _https_url(env["base_url"])
if not base_url or not env["username"] or not env["password"]:
return _boundary_response("misconfigured_missing_server_side_wazuh_env", 503)
try:
auth_header = b64encode(f"{env['username']}:{env['password']}".encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT_SECONDS) as client:
auth = await _fetch_json(
client,
urljoin(base_url, "security/user/authenticate"),
{"Authorization": f"Basic {auth_header}"},
)
token = (auth.get("data") or {}).get("token")
if not token:
return _boundary_response("wazuh_auth_token_missing", 502)
bearer_headers = {"Authorization": f"Bearer {token}"}
status_payload = await _fetch_json(
client,
urljoin(base_url, "agents/summary/status"),
bearer_headers,
)
agents_payload = await _fetch_json(
client,
urljoin(base_url, "agents?limit=100&select=id,status,os.name,os.platform,lastKeepAlive"),
bearer_headers,
)
except (httpx.HTTPError, ValueError):
return _boundary_response("wazuh_readonly_metadata_unavailable", 502)
connection = ((status_payload.get("data") or {}).get("connection") or {})
affected_items = ((agents_payload.get("data") or {}).get("affected_items") or [])
if not isinstance(affected_items, list):
affected_items = []
expected_min_agent_count = _expected_min_agent_count(env["expected_min_agent_count"])
agent_total = _int_or_default(connection.get("total"), len(affected_items))
agent_active = _int_or_default(connection.get("active"), 0)
agent_disconnected = _int_or_default(connection.get("disconnected"), 0)
agent_pending = _int_or_default(connection.get("pending"), 0)
agent_registry_empty = agent_total <= 0
agent_below_expected = expected_min_agent_count > 0 and agent_total < expected_min_agent_count
return JSONResponse(
content={
"schema_version": "iwooos_wazuh_readonly_status_v1",
"status": _agent_visibility_status(agent_total, expected_min_agent_count),
"mode": "metadata_only_no_active_response_no_raw_payload",
"configured": True,
"summary": {
"wazuh_platform_reported_count": 1,
"readonly_api_enabled_count": 1,
"agent_total": agent_total,
"agent_active": agent_active,
"agent_disconnected": agent_disconnected,
"agent_pending": agent_pending,
"expected_min_agent_count": expected_min_agent_count,
"agent_registry_empty_count": 1 if agent_registry_empty else 0,
"agent_below_expected_minimum_count": 1 if agent_below_expected else 0,
"agent_visibility_no_false_green_count": 1,
"wazuh_manager_query_accepted_count": 0,
"wazuh_event_accepted_count": 0,
"host_forensics_accepted_count": 0,
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"runtime_gate_count": 0,
},
"agents": [_redacted_agent(agent, index) for index, agent in enumerate(affected_items[:20])],
"boundaries": _boundaries(),
},
)
result = await load_iwooos_wazuh_readonly_status()
return JSONResponse(status_code=result.http_status, content=result.payload)
@router.get("/api/iwooos/wazuh")
@@ -216,14 +50,20 @@ async def get_iwooos_wazuh_readonly_status_v1() -> JSONResponse:
summary="取得 IwoooS runtime security readback",
description=(
"讀取最新已提交的 IwoooS 資安只讀快照,彙總 Wazuh、Kali、SOC/SIEM、"
"告警可讀性、owner dispatch 與外部入侵防護 Gate。此端點不呼叫 Wazuh / Kali / "
"主機 / Docker / Nginx / firewall / Telegram不收集 secret不授權 runtime 寫入。"
"告警可讀性、owner dispatch 與外部入侵防護 Gate,並附上 Wazuh 只讀路由的"
"公開安全 aggregate 讀回。此端點不呼叫 Kali / 主機 / Docker / Nginx / firewall / "
"Telegram不保存 raw Wazuh payload不收集 secret不授權 runtime 寫入。"
),
)
async def get_iwooos_runtime_security_readback() -> dict[str, Any]:
"""回傳 IwoooS 資安 runtime readback 只讀總板。"""
try:
payload = await asyncio.to_thread(load_latest_iwooos_runtime_security_readback)
wazuh_result = await load_iwooos_wazuh_readonly_status()
payload = await asyncio.to_thread(
load_latest_iwooos_runtime_security_readback,
wazuh_live_status=wazuh_result.payload,
wazuh_live_http_status=wazuh_result.http_status,
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(

View File

@@ -60,6 +60,8 @@ _FALSE_BOUNDARY_KEYS = {
def load_latest_iwooos_runtime_security_readback(
security_dir: Path | None = None,
wazuh_live_status: dict[str, Any] | None = None,
wazuh_live_http_status: int = 0,
) -> dict[str, Any]:
"""Load and normalize the current IwoooS runtime security readback."""
directory = security_dir or _DEFAULT_SECURITY_DIR
@@ -72,6 +74,7 @@ def load_latest_iwooos_runtime_security_readback(
alert_summary = _summary(snapshots["alert_readability"])
dispatch_summary = _summary(snapshots["owner_dispatch"])
intrusion_summary = _summary(snapshots["intrusion_prevention"])
live_wazuh = _wazuh_live_summary(wazuh_live_status, wazuh_live_http_status)
source_refs = [f"docs/security/{filename}" for filename in _SNAPSHOT_FILES.values()]
runtime_gate_count = _max_summary_count(
@@ -85,11 +88,11 @@ def load_latest_iwooos_runtime_security_readback(
return {
"schema_version": "iwooos_runtime_security_readback_v1",
"status": "blocked_waiting_owner_evidence_and_runtime_gates",
"mode": "committed_snapshot_readback_only_no_runtime_query",
"mode": "committed_snapshot_readback_with_public_safe_wazuh_route_metadata",
"source_refs": source_refs,
"summary": {
"source_snapshot_count": len(source_refs),
"p0_lane_count": 6,
"p0_lane_count": 7,
"control_plane_visibility_percent": _average_percent(
soc_summary.get("coverage_percent_after_soc_integration_control"),
intrusion_summary.get("coverage_percent_after_prevention_control"),
@@ -106,6 +109,15 @@ def load_latest_iwooos_runtime_security_readback(
"wazuh_dashboard_api_degraded_observed_count": _int(
wazuh_summary.get("dashboard_api_degraded_observed_count")
),
"wazuh_live_route_http_status": live_wazuh["http_status"],
"wazuh_live_route_degraded_count": live_wazuh["degraded_count"],
"wazuh_live_readonly_api_enabled_count": live_wazuh["readonly_api_enabled_count"],
"wazuh_live_agent_total": live_wazuh["agent_total"],
"wazuh_live_agent_active": live_wazuh["agent_active"],
"wazuh_live_registry_empty_count": live_wazuh["agent_registry_empty_count"],
"wazuh_live_below_expected_count": live_wazuh["agent_below_expected_minimum_count"],
"wazuh_live_metadata_available_count": live_wazuh["metadata_available_count"],
"wazuh_live_status": live_wazuh["status"],
"kali_active_scan_authorized_count": _int(soc_summary.get("kali_active_scan_authorized_count")),
"kali_execute_authorized_count": _int(soc_summary.get("kali_execute_authorized_count")),
"kali_finding_envelope_accepted_count": _int(soc_summary.get("kali_finding_envelope_accepted_count")),
@@ -128,6 +140,21 @@ def load_latest_iwooos_runtime_security_readback(
},
["docs/security/wazuh-managed-host-coverage-gate.snapshot.json"],
),
_lane(
"wazuh_live_route",
live_wazuh["status"],
0 if live_wazuh["degraded_count"] else 30,
"steady" if live_wazuh["metadata_available_count"] else "warn",
"enable_readonly_metadata_owner_gate_and_manager_registry_cross_check",
{
"http_status": live_wazuh["http_status"],
"readonly_enabled": live_wazuh["readonly_api_enabled_count"],
"agent_total": live_wazuh["agent_total"],
"metadata_available": live_wazuh["metadata_available_count"],
"route_degraded": live_wazuh["degraded_count"],
},
["GET /api/iwooos/wazuh"],
),
_lane(
"wazuh_dashboard_api",
"degraded_api_connection_not_green",
@@ -223,6 +250,7 @@ def load_latest_iwooos_runtime_security_readback(
"owner_request_draft_is_not_owner_acceptance",
"kali_health_is_not_active_scan_authorization",
"alert_format_contract_is_not_telegram_send_receipt",
"wazuh_live_route_disabled_or_degraded_is_p0_not_green",
],
}
@@ -250,6 +278,36 @@ def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _wazuh_live_summary(payload: dict[str, Any] | None, http_status: int) -> dict[str, Any]:
if not isinstance(payload, dict):
return {
"status": "not_checked_by_snapshot_loader",
"http_status": 0,
"degraded_count": 1,
"readonly_api_enabled_count": 0,
"agent_total": 0,
"agent_active": 0,
"agent_registry_empty_count": 0,
"agent_below_expected_minimum_count": 0,
"metadata_available_count": 0,
}
summary = payload.get("summary")
summary = summary if isinstance(summary, dict) else {}
status_text = str(payload.get("status") or "unknown")
metadata_available = status_text == "readonly_metadata_available"
return {
"status": status_text,
"http_status": http_status if isinstance(http_status, int) else 0,
"degraded_count": 0 if metadata_available else 1,
"readonly_api_enabled_count": _int(summary.get("readonly_api_enabled_count")),
"agent_total": _int(summary.get("agent_total")),
"agent_active": _int(summary.get("agent_active")),
"agent_registry_empty_count": _int(summary.get("agent_registry_empty_count")),
"agent_below_expected_minimum_count": _int(summary.get("agent_below_expected_minimum_count")),
"metadata_available_count": 1 if metadata_available else 0,
}
def _average_percent(*values: Any) -> int:
percents = [_int(value) for value in values if isinstance(value, int)]
return int(round(sum(percents) / len(percents))) if percents else 0

View File

@@ -0,0 +1,194 @@
"""
IwoooS Wazuh read-only metadata status.
This module returns public-safe aggregate metadata only. It never stores raw
Wazuh payloads, never exposes agent identity or LAN topology, and never
authorizes active response or host writes.
"""
from __future__ import annotations
import os
from base64 import b64encode
from dataclasses import dataclass
from typing import Any
from urllib.parse import urljoin, urlparse
import httpx
REQUEST_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
class WazuhReadonlyStatus:
payload: dict[str, Any]
http_status: int = 200
def wazuh_env() -> dict[str, str]:
return {
"enabled": os.getenv("IWOOOS_WAZUH_READONLY_ENABLED", "").strip().lower(),
"base_url": os.getenv("WAZUH_API_BASE_URL", "").strip(),
"username": os.getenv("WAZUH_API_USERNAME", "").strip(),
"password": os.getenv("WAZUH_API_PASSWORD", "").strip(),
"expected_min_agent_count": os.getenv("IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT", "").strip(),
}
def expected_min_agent_count(value: str) -> int:
try:
return max(0, int(value))
except ValueError:
return 0
def https_url(value: str) -> str | None:
parsed = urlparse(value)
if parsed.scheme != "https" or not parsed.netloc:
return None
return value.rstrip("/") + "/"
def boundaries() -> dict[str, bool]:
return {
"active_response_authorized": False,
"host_write_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"agent_identity_public_display_allowed": False,
"internal_ip_public_display_allowed": False,
"not_authorization": True,
}
def boundary_status(status_text: str, http_status: int = 200) -> WazuhReadonlyStatus:
return WazuhReadonlyStatus(
http_status=http_status,
payload={
"schema_version": "iwooos_wazuh_readonly_status_v1",
"status": status_text,
"mode": "metadata_only_no_active_response_no_raw_payload",
"configured": False,
"summary": {
"wazuh_platform_reported_count": 1,
"readonly_api_enabled_count": 0,
"wazuh_manager_query_accepted_count": 0,
"wazuh_event_accepted_count": 0,
"host_forensics_accepted_count": 0,
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"runtime_gate_count": 0,
"expected_min_agent_count": expected_min_agent_count(wazuh_env()["expected_min_agent_count"]),
"agent_registry_empty_count": 0,
"agent_below_expected_minimum_count": 0,
"agent_visibility_no_false_green_count": 1,
},
"boundaries": boundaries(),
},
)
def redacted_agent(agent: dict[str, Any], index: int) -> dict[str, Any]:
os_info = agent.get("os") if isinstance(agent.get("os"), dict) else {}
return {
"alias": f"agent-{index + 1:02d}",
"status": agent.get("status", "unknown"),
"os": os_info.get("platform") or os_info.get("name") or "unknown",
"last_seen_present": bool(agent.get("lastKeepAlive")),
}
def int_or_default(value: Any, default: int) -> int:
return value if isinstance(value, int) else default
def agent_visibility_status(agent_total: int, expected_minimum: int) -> str:
if agent_total <= 0:
return "wazuh_agent_registry_empty"
if expected_minimum > 0 and agent_total < expected_minimum:
return "wazuh_agent_registry_below_expected"
return "readonly_metadata_available"
async def fetch_json(client: httpx.AsyncClient, url: str, headers: dict[str, str]) -> dict[str, Any]:
response = await client.get(url, headers=headers)
response.raise_for_status()
payload = response.json()
return payload if isinstance(payload, dict) else {}
async def load_iwooos_wazuh_readonly_status() -> WazuhReadonlyStatus:
env = wazuh_env()
if env["enabled"] != "true":
return boundary_status("disabled_waiting_iwooos_wazuh_owner_gate")
base_url = https_url(env["base_url"])
if not base_url or not env["username"] or not env["password"]:
return boundary_status("misconfigured_missing_server_side_wazuh_env", 503)
try:
auth_header = b64encode(f"{env['username']}:{env['password']}".encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT_SECONDS) as client:
auth = await fetch_json(
client,
urljoin(base_url, "security/user/authenticate"),
{"Authorization": f"Basic {auth_header}"},
)
token = (auth.get("data") or {}).get("token")
if not token:
return boundary_status("wazuh_auth_token_missing", 502)
bearer_headers = {"Authorization": f"Bearer {token}"}
status_payload = await fetch_json(
client,
urljoin(base_url, "agents/summary/status"),
bearer_headers,
)
agents_payload = await fetch_json(
client,
urljoin(base_url, "agents?limit=100&select=id,status,os.name,os.platform,lastKeepAlive"),
bearer_headers,
)
except (httpx.HTTPError, ValueError):
return boundary_status("wazuh_readonly_metadata_unavailable", 502)
connection = ((status_payload.get("data") or {}).get("connection") or {})
affected_items = ((agents_payload.get("data") or {}).get("affected_items") or [])
if not isinstance(affected_items, list):
affected_items = []
expected_minimum = expected_min_agent_count(env["expected_min_agent_count"])
agent_total = int_or_default(connection.get("total"), len(affected_items))
agent_active = int_or_default(connection.get("active"), 0)
agent_disconnected = int_or_default(connection.get("disconnected"), 0)
agent_pending = int_or_default(connection.get("pending"), 0)
agent_registry_empty = agent_total <= 0
agent_below_expected = expected_minimum > 0 and agent_total < expected_minimum
return WazuhReadonlyStatus(
payload={
"schema_version": "iwooos_wazuh_readonly_status_v1",
"status": agent_visibility_status(agent_total, expected_minimum),
"mode": "metadata_only_no_active_response_no_raw_payload",
"configured": True,
"summary": {
"wazuh_platform_reported_count": 1,
"readonly_api_enabled_count": 1,
"agent_total": agent_total,
"agent_active": agent_active,
"agent_disconnected": agent_disconnected,
"agent_pending": agent_pending,
"expected_min_agent_count": expected_minimum,
"agent_registry_empty_count": 1 if agent_registry_empty else 0,
"agent_below_expected_minimum_count": 1 if agent_below_expected else 0,
"agent_visibility_no_false_green_count": 1,
"wazuh_manager_query_accepted_count": 0,
"wazuh_event_accepted_count": 0,
"host_forensics_accepted_count": 0,
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"runtime_gate_count": 0,
},
"agents": [redacted_agent(agent, index) for index, agent in enumerate(affected_items[:20])],
"boundaries": boundaries(),
},
)

View File

@@ -21,11 +21,15 @@ def test_iwooos_runtime_security_readback_preserves_zero_runtime_gates() -> None
assert payload["schema_version"] == "iwooos_runtime_security_readback_v1"
assert payload["status"] == "blocked_waiting_owner_evidence_and_runtime_gates"
assert payload["summary"]["source_snapshot_count"] == 8
assert payload["summary"]["p0_lane_count"] == 6
assert payload["summary"]["p0_lane_count"] == 7
assert payload["summary"]["runtime_gate_count"] == 0
assert payload["summary"]["owner_response_received_count"] == 0
assert payload["summary"]["owner_response_accepted_count"] == 0
assert payload["summary"]["wazuh_manager_registry_accepted_count"] == 0
assert payload["summary"]["wazuh_live_status"] == "not_checked_by_snapshot_loader"
assert payload["summary"]["wazuh_live_route_degraded_count"] == 1
assert payload["summary"]["wazuh_live_readonly_api_enabled_count"] == 0
assert payload["summary"]["wazuh_live_metadata_available_count"] == 0
assert payload["summary"]["kali_active_scan_authorized_count"] == 0
assert payload["summary"]["kali_execute_authorized_count"] == 0
assert payload["summary"]["alert_receipt_runtime_send_count"] == 0
@@ -41,6 +45,7 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
lane_ids = {lane["lane_id"] for lane in payload["lanes"]}
assert lane_ids == {
"wazuh_registry",
"wazuh_live_route",
"wazuh_dashboard_api",
"kali_intake",
"alert_readability",
@@ -52,16 +57,69 @@ def test_iwooos_runtime_security_readback_lanes_are_candidate_only() -> None:
assert all(lane["source_refs"] for lane in payload["lanes"])
assert any(lane["completion_percent"] > 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_registry" or lane["completion_percent"] == 0 for lane in payload["lanes"])
assert all(lane["lane_id"] != "wazuh_live_route" or lane["metrics"]["route_degraded"] == 1 for lane in payload["lanes"])
def test_iwooos_runtime_security_readback_api_is_public_safe() -> None:
def test_iwooos_runtime_security_readback_api_is_public_safe(monkeypatch) -> None:
monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False)
monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False)
monkeypatch.delenv("WAZUH_API_USERNAME", raising=False)
monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False)
response = _client().get("/api/v1/iwooos/runtime-security-readback")
assert response.status_code == 200
data = response.json()
assert data["schema_version"] == "iwooos_runtime_security_readback_v1"
assert data["summary"]["runtime_gate_count"] == 0
assert data["summary"]["wazuh_live_status"] == "disabled_waiting_iwooos_wazuh_owner_gate"
assert data["summary"]["wazuh_live_route_http_status"] == 200
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["wazuh_live_metadata_available_count"] == 0
assert data["boundaries"]["secret_value_collection_allowed"] is False
assert "192.168.0." not in response.text
assert "工作視窗" not in response.text
assert "批准!繼續" not in response.text
def test_iwooos_runtime_security_readback_api_includes_live_wazuh_empty_registry(monkeypatch) -> None:
import httpx
monkeypatch.setenv("IWOOOS_WAZUH_READONLY_ENABLED", "true")
monkeypatch.setenv("WAZUH_API_BASE_URL", "https://wazuh.example.test:55000")
monkeypatch.setenv("WAZUH_API_USERNAME", "readonly")
monkeypatch.setenv("WAZUH_API_PASSWORD", "placeholder")
monkeypatch.setenv("IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT", "6")
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/security/user/authenticate":
return httpx.Response(200, json={"data": {"token": "token-value"}})
if request.url.path == "/agents/summary/status":
return httpx.Response(
200,
json={"data": {"connection": {"total": 0, "active": 0, "disconnected": 0, "pending": 0}}},
)
if request.url.path == "/agents":
return httpx.Response(200, json={"data": {"affected_items": []}})
return httpx.Response(404)
transport = httpx.MockTransport(handler)
original_async_client = httpx.AsyncClient
def client_factory(*args, **kwargs):
kwargs["transport"] = transport
return original_async_client(*args, **kwargs)
monkeypatch.setattr(httpx, "AsyncClient", client_factory)
response = _client().get("/api/v1/iwooos/runtime-security-readback")
assert response.status_code == 200
data = response.json()
assert data["summary"]["wazuh_live_status"] == "wazuh_agent_registry_empty"
assert data["summary"]["wazuh_live_readonly_api_enabled_count"] == 1
assert data["summary"]["wazuh_live_agent_total"] == 0
assert data["summary"]["wazuh_live_registry_empty_count"] == 1
assert data["summary"]["wazuh_live_route_degraded_count"] == 1
assert data["summary"]["runtime_gate_count"] == 0
assert "token-value" not in response.text

View File

@@ -20206,8 +20206,8 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,集中顯示 Wazuh 清單驗收、儀表板 API、資安觀測節點 intake、告警可讀性、負責人送件與外部入侵防護它不查 live Wazuh、不啟動掃描、不送訊息、不改主機。",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全 aggregate 讀回;它不保存 raw Wazuh payload、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
"laneStatusLabel": "目前狀態",
@@ -20233,6 +20233,10 @@
"label": "Wazuh 清單",
"detail": "管理器清單接受數仍為 0。"
},
"wazuhLive": {
"label": "Wazuh live",
"detail": "正式只讀路由若 disabled、空清單或低於預期首屏直接顯示警示。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20251,6 +20255,10 @@
"title": "Wazuh manager registry",
"body": "必須拿到總數、在線、離線、從未連線與時間窗transport 或儀表板可見不能代替驗收。"
},
"wazuh_live_route": {
"title": "Wazuh 正式只讀路由",
"body": "直接讀正式路由的公開安全 aggregatedisabled、misconfigured、empty、below expected 都是 P0 紅燈,不會被 route 200 蓋過。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"

View File

@@ -20206,8 +20206,8 @@
},
"runtimeSecurityReadback": {
"eyebrow": "IwoooS Runtime 資安讀回",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,集中顯示 Wazuh 清單驗收、儀表板 API、資安觀測節點 intake、告警可讀性、負責人送件與外部入侵防護它不查 live Wazuh、不啟動掃描、不送訊息、不改主機。",
"title": "條 P0 資安線先接到同一張讀回板",
"subtitle": "這張板讀取後端彙整的只讀快照,並附上 Wazuh 正式只讀路由的公開安全 aggregate 讀回;它不保存 raw Wazuh payload、不啟動掃描、不送訊息、不改主機。",
"statusLabel": "讀回狀態",
"statusDetail": "讀回成功只代表 IwoooS 能看見目前的證據邊界runtime 寫入、主動回應、掃描、重啟、Nginx reload、workflow 修改與機密操作仍全部關閉。",
"laneStatusLabel": "目前狀態",
@@ -20233,6 +20233,10 @@
"label": "Wazuh 清單",
"detail": "管理器清單接受數仍為 0。"
},
"wazuhLive": {
"label": "Wazuh live",
"detail": "正式只讀路由若 disabled、空清單或低於預期首屏直接顯示警示。"
},
"ownerAccepted": {
"label": "負責人驗收",
"detail": "收到 / 接受都必須由正式 owner response 證明。"
@@ -20251,6 +20255,10 @@
"title": "Wazuh manager registry",
"body": "必須拿到總數、在線、離線、從未連線與時間窗transport 或儀表板可見不能代替驗收。"
},
"wazuh_live_route": {
"title": "Wazuh 正式只讀路由",
"body": "直接讀正式路由的公開安全 aggregatedisabled、misconfigured、empty、below expected 都是 P0 紅燈,不會被 route 200 蓋過。"
},
"wazuh_dashboard_api": {
"title": "Wazuh Dashboard API",
"body": "API connection / API version 還要補 readbackindex pattern 通過不能宣稱 Wazuh 全綠。"

View File

@@ -326,7 +326,7 @@ type WazuhReadonlyStatusResponse = {
}
type RuntimeSecurityReadbackSummaryItem = {
key: 'controlPlane' | 'runtimeAcceptance' | 'wazuhRegistry' | 'ownerAccepted' | 'kaliRuntime' | 'runtimeGate'
key: 'controlPlane' | 'runtimeAcceptance' | 'wazuhRegistry' | 'wazuhLive' | 'ownerAccepted' | 'kaliRuntime' | 'runtimeGate'
value: string
icon: typeof ShieldCheck
tone: 'steady' | 'warn' | 'locked'
@@ -8124,6 +8124,12 @@ function IwoooSRuntimeSecurityReadbackBoard() {
icon: Radar,
tone: 'locked',
},
{
key: 'wazuhLive',
value: summary ? `${summary.wazuh_live_agent_total}/${summary.wazuh_live_status}` : '...',
icon: Route,
tone: summary && summary.wazuh_live_metadata_available_count === 1 && summary.wazuh_live_route_degraded_count === 0 ? 'steady' : 'warn',
},
{
key: 'ownerAccepted',
value: summary ? `${summary.owner_response_accepted_count}/${summary.owner_response_received_count}` : '...',

View File

@@ -100,6 +100,7 @@ export type IwoooSRuntimeSecurityReadbackTone = 'steady' | 'warn' | 'locked'
export interface IwoooSRuntimeSecurityReadbackLane {
lane_id:
| 'wazuh_registry'
| 'wazuh_live_route'
| 'wazuh_dashboard_api'
| 'kali_intake'
| 'alert_readability'
@@ -131,6 +132,15 @@ export interface IwoooSRuntimeSecurityReadbackResponse {
wazuh_manager_registry_accepted_count: number
wazuh_transport_observed_count: number
wazuh_dashboard_api_degraded_observed_count: number
wazuh_live_route_http_status: number
wazuh_live_route_degraded_count: number
wazuh_live_readonly_api_enabled_count: number
wazuh_live_agent_total: number
wazuh_live_agent_active: number
wazuh_live_registry_empty_count: number
wazuh_live_below_expected_count: number
wazuh_live_metadata_available_count: number
wazuh_live_status: string
kali_active_scan_authorized_count: number
kali_execute_authorized_count: number
kali_finding_envelope_accepted_count: number

View File

@@ -151,6 +151,42 @@
**邊界**:本段只改前端全域 App Shell、Sidebar/Header 與 i18n沒有改 AI runtime gate、Telegram send、告警路由、主機、Nginx、K8s、secret、DB、workflow 或自動修復授權。
## 2026-06-26P0 Wazuh live route 進 Runtime 資安讀回disabled / empty 不再藏在下方卡片
**背景**:正式站 live readback 顯示 `GET /api/iwooos/wazuh``GET /api/v1/iwooos/wazuh` 目前皆為 `200 disabled_waiting_iwooos_wazuh_owner_gate`,代表 Wazuh 只讀 live metadata 尚未啟用;這不是 manager registry 恢復、不是 agent 全部納管,也不是 API connection 已修復。為避免 Wazuh 退化只藏在 IwoooS 頁面下方卡片,本段把正式 Wazuh 只讀路由的公開安全 aggregate 結果接進 Runtime 資安讀回總板。
**完成**
- 新增 `apps/api/src/services/iwooos_wazuh_readonly_status.py`,把 Wazuh 只讀 metadata 邏輯從 API router 抽成可重用 service仍只回 aggregate / redacted agent alias不保存 raw Wazuh payload、不顯示 agent 原名、內網位址、token、password 或 secret。
- `GET /api/iwooos/wazuh``GET /api/v1/iwooos/wazuh` 保持相容,改由 service 回傳相同 disabled / misconfigured / unavailable / empty / below expected / available 狀態。
- `GET /api/v1/iwooos/runtime-security-readback` 新增 `wazuh_live_route` P0 lane 與 `wazuh_live_*` summary正式路由 HTTP、唯讀查詢啟用數、agent total / active、registry empty、below expected、metadata available 與 degraded count 都會進 Runtime board。
- `/zh-TW/iwooos` Runtime 資安讀回摘要新增 `Wazuh live`,將 `agent_total / status` 顯示在首屏板disabled、misconfigured、empty、below expected 或 unavailable 都以警示色呈現,不能被 route 200 蓋過。
- `scripts/security/wazuh-readonly-route-boundary-guard.py` 已從掃 2 個 route 擴充為掃 3 個 sourceNext route、FastAPI route、新 Wazuh service避免 service 內硬編 Wazuh URL、帳密、關 TLS、raw payload 或假 SOC 文案。
- `IWOOOS-WAZUH-READONLY-API-RELEASE-HANDOFF.md` 已同步新 service、Runtime lane 與 `route=3` guard 結果。
**本地驗證**
- `pytest apps/api/tests/test_iwooos_runtime_security_readback.py apps/api/tests/test_iwooos_wazuh_api.py -q``10 passed, 1 warning`
- IwoooS / Telegram / operator 關鍵子集:`255 passed, 2 warnings`
- `python3 scripts/security/wazuh-readonly-route-boundary-guard.py --root .``WAZUH_READONLY_ROUTE_BOUNDARY_GUARD_OK route=3 public_ui_files=1 forbidden=0 runtime_gate=0`
- `python3 scripts/security/security-mirror-progress-guard.py --root .``SECURITY_MIRROR_PROGRESS_GUARD_OK`
- `python3 scripts/security/source-control-owner-response-guard.py --root .``SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK`
- `python3 scripts/ops/doc-secrets-sanity-check.py docs/security docs/templates docs/LOGBOOK.md apps/web/messages apps/api/src/api/v1/iwooos.py apps/api/src/services/iwooos_runtime_security_readback.py apps/api/src/services/iwooos_wazuh_readonly_status.py apps/web/src/app/[locale]/iwooos/page.tsx apps/web/src/lib/api-client.ts scripts/security/wazuh-readonly-route-boundary-guard.py``DOC_SECRET_SANITY_OK scanned_files=274`
- `python3 -m json.tool apps/web/messages/zh-TW.json` / `apps/web/messages/en.json`:通過。
- `python3 -m py_compile apps/api/src/api/v1/iwooos.py apps/api/src/services/iwooos_runtime_security_readback.py apps/api/src/services/iwooos_wazuh_readonly_status.py scripts/security/wazuh-readonly-route-boundary-guard.py`:通過。
- `git diff --check`:通過。
- 本地 FastAPI TestClient smoke`/api/v1/iwooos/runtime-security-readback``200``p0_lane_count=7``wazuh_live_status=disabled_waiting_iwooos_wazuh_owner_gate``wazuh_live_route_degraded_count=1``wazuh_live_route` lane 存在,且未含 `192.168.0.``工作視窗``批准!繼續``My request for Codex`
- `pnpm --dir apps/web typecheck`:本臨時 worktree 缺 `apps/web/node_modules/typescript`,未能本地執行;需以 Gitea CD / production browser readback 補正式驗證。
**完成度同步**
- 本階段 source-side 實作:`100%`
- Runtime 資安讀回納入 Wazuh live route`0% -> 100%`
- Wazuh live metadata enable`0%`
- Wazuh manager registry accepted`0`
- IwoooS Runtime 資安讀回層:`94% -> 95%`
- IwoooS 整體資安推進保守維持:`65%`;不因 route 可見或 lane 接上而提高 runtime acceptance。
- Runtime acceptance、owner accepted、active response、host write、Kali active scan、Telegram send、secret collection仍全部 `0 / false`
**邊界**:本段沒有啟用 Wazuh live metadata env、沒有收集 Wazuh secret、沒有修 dashboard stored API、沒有重新註冊 agent、沒有重啟 Wazuh manager / dashboard、沒有 SSH 主機、沒有改 Nginx / Docker / firewall / K8s、沒有 active response、沒有 Kali scan、沒有 Telegram send、沒有 force push。下一個 P0 仍是 Wazuh live metadata owner gate、server-side secret metadata、readonly account scope、manager health ref、post-enable readback 與 manager registry 全量交叉驗收。
## 2026-06-26IwoooS controlled apply guard 收斂:資安讀回、防退化與正式站驗證完成
**背景**P2-414B 已把 AwoooP allowlisted low / medium / high 修復路徑改成 `controlled_apply`,但部分資安 guard 與文件仍停在舊的 `runtime_write_gate=0` / `candidate_only` / 高風險人工 Gate 語意,造成 CD guard 與最新產品方向衝突。本段只收斂 source / guard / readback / production verification不碰主機、Wazuh live、Kali、Nginx、Docker、firewall、secret 或 Telegram send。

View File

@@ -35,8 +35,11 @@
變更範圍:
- `apps/api/src/api/v1/iwooos.py`
- `apps/api/src/services/iwooos_wazuh_readonly_status.py`
- `apps/api/src/services/iwooos_runtime_security_readback.py`
- `apps/api/src/main.py`
- `apps/api/tests/test_iwooos_wazuh_api.py`
- `apps/api/tests/test_iwooos_runtime_security_readback.py`
- `scripts/security/wazuh-readonly-route-boundary-guard.py`
- `scripts/security/wazuh-readonly-production-readback.py`
- `scripts/security/wazuh-readonly-release-gate.py`
@@ -59,6 +62,8 @@
- 新增 FastAPI `GET /api/iwooos/wazuh`
- 新增 FastAPI `GET /api/v1/iwooos/wazuh`
- 將 Wazuh 只讀 metadata 邏輯抽成 `iwooos_wazuh_readonly_status.py`,讓正式 route、Runtime 資安讀回與後續告警鏈共用同一份脫敏 aggregate 結果。
- `GET /api/v1/iwooos/runtime-security-readback` 新增 `wazuh_live_route` P0 lane 與 `wazuh_live_*` summary正式路由 disabled、misconfigured、registry empty、below expected 或 unavailable 都會在 Runtime board 顯示退化,不再只藏在下方卡片。
- 預設回 `disabled_waiting_iwooos_wazuh_owner_gate`,避免 production 繼續用 404 表示未啟用。
- live Wazuh 查詢仍需 `IWOOOS_WAZUH_READONLY_ENABLED=true` 與 server-side env`WAZUH_API_BASE_URL``WAZUH_API_USERNAME``WAZUH_API_PASSWORD`
- 強制 Wazuh base URL 使用 HTTPS。
@@ -87,8 +92,8 @@ python3 scripts/security/wazuh-readonly-release-owner-request.py --root .
python3 scripts/security/wazuh-readonly-release-owner-response-acceptance.py --root .
python3 scripts/security/wazuh-readonly-live-metadata-env-gate.py --root .
python3 scripts/security/security-mirror-progress-guard.py --root .
python3 scripts/ops/doc-secrets-sanity-check.py docs apps/api/src/api/v1/iwooos.py apps/web/src/app/api/iwooos/wazuh/route.ts scripts/security/wazuh-readonly-route-boundary-guard.py scripts/security/wazuh-readonly-production-readback.py scripts/security/wazuh-readonly-release-gate.py scripts/security/wazuh-readonly-release-lane-preflight.py scripts/security/wazuh-readonly-release-owner-request.py scripts/security/wazuh-readonly-release-owner-response-acceptance.py scripts/security/wazuh-readonly-live-metadata-env-gate.py
python3 -m py_compile apps/api/src/api/v1/iwooos.py scripts/security/wazuh-readonly-route-boundary-guard.py scripts/security/wazuh-readonly-production-readback.py scripts/security/wazuh-readonly-release-gate.py scripts/security/wazuh-readonly-release-lane-preflight.py scripts/security/wazuh-readonly-release-owner-request.py scripts/security/wazuh-readonly-release-owner-response-acceptance.py scripts/security/wazuh-readonly-live-metadata-env-gate.py scripts/security/security-mirror-progress-guard.py
python3 scripts/ops/doc-secrets-sanity-check.py docs apps/api/src/api/v1/iwooos.py apps/api/src/services/iwooos_wazuh_readonly_status.py apps/api/src/services/iwooos_runtime_security_readback.py apps/web/src/app/api/iwooos/wazuh/route.ts scripts/security/wazuh-readonly-route-boundary-guard.py scripts/security/wazuh-readonly-production-readback.py scripts/security/wazuh-readonly-release-gate.py scripts/security/wazuh-readonly-release-lane-preflight.py scripts/security/wazuh-readonly-release-owner-request.py scripts/security/wazuh-readonly-release-owner-response-acceptance.py scripts/security/wazuh-readonly-live-metadata-env-gate.py
python3 -m py_compile apps/api/src/api/v1/iwooos.py apps/api/src/services/iwooos_wazuh_readonly_status.py apps/api/src/services/iwooos_runtime_security_readback.py scripts/security/wazuh-readonly-route-boundary-guard.py scripts/security/wazuh-readonly-production-readback.py scripts/security/wazuh-readonly-release-gate.py scripts/security/wazuh-readonly-release-lane-preflight.py scripts/security/wazuh-readonly-release-owner-request.py scripts/security/wazuh-readonly-release-owner-response-acceptance.py scripts/security/wazuh-readonly-live-metadata-env-gate.py scripts/security/security-mirror-progress-guard.py
git diff --check
node -e "JSON.parse(require('fs').readFileSync('apps/web/messages/zh-TW.json','utf8')); JSON.parse(require('fs').readFileSync('apps/web/messages/en.json','utf8')); console.log('i18n json ok')"
cmp -s apps/web/messages/zh-TW.json apps/web/messages/en.json
@@ -99,7 +104,7 @@ NEXT_PUBLIC_API_URL=https://awoooi.wooo.work NEXT_PRIVATE_BUILD_WORKER_COUNT=1 S
驗證結果:
- `pytest apps/api/tests/test_iwooos_wazuh_api.py``6 passed`
- `wazuh-readonly-route-boundary-guard``route=2 public_ui_files=1 forbidden=0 runtime_gate=0`
- `wazuh-readonly-route-boundary-guard``route=3 public_ui_files=1 forbidden=0 runtime_gate=0`
- `wazuh-readonly-release-gate``source=1 push=1 main=1 deploy=1 readback=1 runtime_gate=0`
- `wazuh-readonly-release-lane-preflight``ready=0 acks=0/6 evidence=0/6 runtime_gate=0`
- `wazuh-readonly-release-owner-request``drafts=1 sent=0 accepted=0 runtime_gate=0`
@@ -130,7 +135,7 @@ git am /private/tmp/awoooi-iwooos-wazuh-boundary-release-patch-<timestamp>/*.pat
乾淨套用 worktree 驗證結果:
- `pytest apps/api/tests/test_iwooos_wazuh_api.py``6 passed`
- `python3 scripts/security/wazuh-readonly-route-boundary-guard.py --root .``WAZUH_READONLY_ROUTE_BOUNDARY_GUARD_OK route=2 public_ui_files=1 forbidden=0 runtime_gate=0`
- `python3 scripts/security/wazuh-readonly-route-boundary-guard.py --root .``WAZUH_READONLY_ROUTE_BOUNDARY_GUARD_OK route=3 public_ui_files=1 forbidden=0 runtime_gate=0`
- `python3 scripts/security/wazuh-readonly-release-gate.py --root .``WAZUH_READONLY_RELEASE_GATE_OK source=1 push=1 main=1 deploy=1 readback=1 runtime_gate=0`
- `python3 scripts/security/wazuh-readonly-release-lane-preflight.py --root .``WAZUH_READONLY_RELEASE_LANE_PREFLIGHT_OK ready=0 acks=0/6 evidence=0/6 runtime_gate=0`
- `python3 scripts/security/security-mirror-progress-guard.py --root .``SECURITY_MIRROR_PROGRESS_GUARD_OK`

View File

@@ -20,6 +20,7 @@ from typing import Any
TAIPEI = timezone(timedelta(hours=8))
NEXT_ROUTE_PATH = Path("apps/web/src/app/api/iwooos/wazuh/route.ts")
BACKEND_ROUTE_PATH = Path("apps/api/src/api/v1/iwooos.py")
BACKEND_SERVICE_PATH = Path("apps/api/src/services/iwooos_wazuh_readonly_status.py")
PUBLIC_PAGE_PATH = Path("apps/web/src/app/[locale]/iwooos/page.tsx")
PUBLIC_COMPONENT_ROOT = Path("apps/web/src/components/iwooos")
@@ -60,6 +61,10 @@ ROUTE_REQUIRED_TOKENS = [
BACKEND_REQUIRED_TOKENS = [
"/api/iwooos/wazuh",
"/api/v1/iwooos/wazuh",
"load_iwooos_wazuh_readonly_status",
]
BACKEND_SERVICE_REQUIRED_TOKENS = [
"IWOOOS_WAZUH_READONLY_ENABLED",
"IWOOOS_WAZUH_EXPECTED_MIN_AGENT_COUNT",
"WAZUH_API_BASE_URL",
@@ -71,7 +76,7 @@ BACKEND_REQUIRED_TOKENS = [
"runtime_gate_count",
"raw_wazuh_payload_storage_allowed",
"internal_ip_public_display_allowed",
"_redacted_agent",
"redacted_agent",
"wazuh_agent_registry_empty",
"wazuh_agent_registry_below_expected",
"agent_registry_empty_count",
@@ -175,7 +180,11 @@ def pattern_applies(pattern: ForbiddenPattern, source_kind: str) -> bool:
def collect_forbidden_matches(root: Path) -> list[dict[str, Any]]:
targets: list[tuple[str, Path]] = [("route", NEXT_ROUTE_PATH), ("route", BACKEND_ROUTE_PATH)]
targets: list[tuple[str, Path]] = [
("route", NEXT_ROUTE_PATH),
("route", BACKEND_ROUTE_PATH),
("route", BACKEND_SERVICE_PATH),
]
targets.extend(("public_ui", path) for path in collect_public_ui_files(root))
matches: list[dict[str, Any]] = []
@@ -204,14 +213,20 @@ def collect_missing_required_tokens(route_text: str, required_tokens: list[str])
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
next_route = root / NEXT_ROUTE_PATH
backend_route = root / BACKEND_ROUTE_PATH
backend_service = root / BACKEND_SERVICE_PATH
next_route_present = next_route.exists()
backend_route_present = backend_route.exists()
backend_service_present = backend_service.exists()
next_route_text = read_text(next_route) if next_route_present else ""
backend_route_text = read_text(backend_route) if backend_route_present else ""
backend_service_text = read_text(backend_service) if backend_service_present else ""
public_ui_files = collect_public_ui_files(root)
missing_required_tokens = {
NEXT_ROUTE_PATH.as_posix(): collect_missing_required_tokens(next_route_text, ROUTE_REQUIRED_TOKENS),
BACKEND_ROUTE_PATH.as_posix(): collect_missing_required_tokens(backend_route_text, BACKEND_REQUIRED_TOKENS),
BACKEND_SERVICE_PATH.as_posix(): collect_missing_required_tokens(
backend_service_text, BACKEND_SERVICE_REQUIRED_TOKENS
),
}
missing_required_token_count = sum(len(tokens) for tokens in missing_required_tokens.values())
forbidden_matches = collect_forbidden_matches(root)
@@ -225,28 +240,37 @@ def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
else "blocked"
),
"mode": "repo_source_scan_no_runtime_no_secret_collection",
"guarded_route_paths": [NEXT_ROUTE_PATH.as_posix(), BACKEND_ROUTE_PATH.as_posix()],
"guarded_route_paths": [
NEXT_ROUTE_PATH.as_posix(),
BACKEND_ROUTE_PATH.as_posix(),
BACKEND_SERVICE_PATH.as_posix(),
],
"guarded_public_ui_paths": [path.as_posix() for path in public_ui_files],
"required_route_tokens": {
NEXT_ROUTE_PATH.as_posix(): ROUTE_REQUIRED_TOKENS,
BACKEND_ROUTE_PATH.as_posix(): BACKEND_REQUIRED_TOKENS,
BACKEND_SERVICE_PATH.as_posix(): BACKEND_SERVICE_REQUIRED_TOKENS,
},
"forbidden_pattern_ids": [pattern.pattern_id for pattern in FORBIDDEN_PATTERNS],
"summary": {
"route_present_count": int(next_route_present) + int(backend_route_present),
"route_present_count": int(next_route_present) + int(backend_route_present) + int(backend_service_present),
"next_route_present_count": 1 if next_route_present else 0,
"backend_route_present_count": 1 if backend_route_present else 0,
"backend_service_present_count": 1 if backend_service_present else 0,
"public_ui_file_count": len(public_ui_files),
"required_token_count": len(ROUTE_REQUIRED_TOKENS) + len(BACKEND_REQUIRED_TOKENS),
"required_token_count": len(ROUTE_REQUIRED_TOKENS)
+ len(BACKEND_REQUIRED_TOKENS)
+ len(BACKEND_SERVICE_REQUIRED_TOKENS),
"missing_required_token_count": missing_required_token_count,
"forbidden_pattern_count": len(FORBIDDEN_PATTERNS),
"forbidden_match_count": len(forbidden_matches),
"readonly_api_default_closed_count": sum(
"IWOOOS_WAZUH_READONLY_ENABLED" in text for text in [next_route_text, backend_route_text]
"IWOOOS_WAZUH_READONLY_ENABLED" in text
for text in [next_route_text, backend_route_text, backend_service_text]
),
"server_side_env_required_count": sum(
token in text
for text in [next_route_text, backend_route_text]
for text in [next_route_text, backend_route_text, backend_service_text]
for token in ["WAZUH_API_BASE_URL", "WAZUH_API_USERNAME", "WAZUH_API_PASSWORD"]
),
"tls_disable_match_count": sum(
@@ -299,6 +323,8 @@ def validate(root: Path) -> None:
errors.append(f"{NEXT_ROUTE_PATH.as_posix()}: Wazuh Next.js 只讀 route 不存在")
if report["summary"]["backend_route_present_count"] != 1:
errors.append(f"{BACKEND_ROUTE_PATH.as_posix()}: Wazuh FastAPI 相容 route 不存在")
if report["summary"]["backend_service_present_count"] != 1:
errors.append(f"{BACKEND_SERVICE_PATH.as_posix()}: Wazuh FastAPI 只讀 service 不存在")
for path, tokens in report["missing_required_tokens"].items():
for token in tokens:
errors.append(f"{path}: 缺少必要只讀邊界 token {token!r}")