Files
awoooi/apps/api/src/api/v1/iwooos.py
Your Name 9778cc22fc
Some checks failed
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / tests (push) Successful in 1m38s
CD Pipeline / build-and-deploy (push) Successful in 4m22s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
feat(iwooos): surface Wazuh live route in runtime readback
2026-06-26 23:32:39 +08:00

105 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
IwoooS 安全治理 API。
Wazuh 接線採用只讀 metadata 模式:預設關閉、不保存 raw payload、
不公開 agent 原名 / 內網 IP、不啟用 active response。
"""
from __future__ import annotations
import asyncio
import json
from typing import Any
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from src.services.iwooos_runtime_security_readback import (
load_latest_iwooos_runtime_security_readback,
)
from src.services.iwooos_security_control_coverage import (
load_latest_iwooos_security_control_coverage,
)
from src.services.iwooos_wazuh_readonly_status import (
load_iwooos_wazuh_readonly_status,
)
from src.services.public_redaction import redact_public_lan_topology
router = APIRouter(tags=["IwoooS Security"])
async def _wazuh_readonly_status() -> JSONResponse:
    """Load the Wazuh read-only status and wrap it in a JSON response.

    Shared by the compat and v1 Wazuh routes. Both the HTTP status code
    and the response body are taken from the loader result
    (``http_status`` and ``payload``).

    Returns:
        A ``JSONResponse`` carrying the loader's payload and status code.
    """
    wazuh_status = await load_iwooos_wazuh_readonly_status()
    http_status = wazuh_status.http_status
    body = wazuh_status.payload
    return JSONResponse(content=body, status_code=http_status)
@router.get("/api/iwooos/wazuh")
async def get_iwooos_wazuh_readonly_status_compat() -> JSONResponse:
    # Unversioned route that serves the same response as the /api/v1 Wazuh
    # route; presumably kept for backward compatibility (per the `_compat`
    # suffix) -- confirm with the API owners.
    # Documented with comments on purpose: this route passes no explicit
    # `description=`, so a docstring would be published in the OpenAPI schema.
    wazuh_response = await _wazuh_readonly_status()
    return wazuh_response
@router.get("/api/v1/iwooos/wazuh")
async def get_iwooos_wazuh_readonly_status_v1() -> JSONResponse:
    # Versioned route for the Wazuh read-only status; delegates entirely to
    # the shared helper so it answers identically to the unversioned route.
    # Documented with comments on purpose: this route passes no explicit
    # `description=`, so a docstring would be published in the OpenAPI schema.
    wazuh_response = await _wazuh_readonly_status()
    return wazuh_response
@router.get(
    "/api/v1/iwooos/runtime-security-readback",
    summary="取得 IwoooS runtime security readback",
    description=(
        "讀取最新已提交的 IwoooS 資安只讀快照,彙總 Wazuh、Kali、SOC/SIEM、"
        "告警可讀性、owner dispatch 與外部入侵防護 Gate並附上 Wazuh 只讀路由的"
        "公開安全 aggregate 讀回。此端點不呼叫 Kali / 主機 / Docker / Nginx / firewall / "
        "Telegram不保存 raw Wazuh payload不收集 secret不授權 runtime 寫入。"
    ),
    response_model=dict[str, Any],
)
async def get_iwooos_runtime_security_readback() -> dict[str, Any]:
    """Return the read-only IwoooS runtime security readback board.

    Loads the live Wazuh read-only status, passes its payload and HTTP
    status to the readback loader (run in a worker thread via
    ``asyncio.to_thread``), and redacts the result with
    ``redact_public_lan_topology`` before returning it.

    Returns:
        The redacted readback payload.

    Raises:
        HTTPException: 404 when a ``FileNotFoundError`` is raised while
            building the payload; 500 when a ``json.JSONDecodeError`` or
            ``ValueError`` is raised.
    """
    try:
        wazuh_live = await load_iwooos_wazuh_readonly_status()
        loader_kwargs = {
            "wazuh_live_status": wazuh_live.payload,
            "wazuh_live_http_status": wazuh_live.http_status,
        }
        readback = await asyncio.to_thread(
            load_latest_iwooos_runtime_security_readback,
            **loader_kwargs,
        )
        # Redaction stays inside the try so its errors map to the same
        # HTTP responses as loader errors.
        redacted = redact_public_lan_topology(readback)
    except (FileNotFoundError, json.JSONDecodeError, ValueError) as exc:
        # FileNotFoundError is an OSError, never a ValueError, so the two
        # branches below cannot overlap.
        if isinstance(exc, FileNotFoundError):
            error_status = status.HTTP_404_NOT_FOUND
            error_detail = str(exc)
        else:
            error_status = status.HTTP_500_INTERNAL_SERVER_ERROR
            error_detail = f"IwoooS runtime security readback 無效:{exc}"
        raise HTTPException(
            status_code=error_status,
            detail=error_detail,
        ) from exc
    return redacted
@router.get(
    "/api/v1/iwooos/security-control-coverage",
    summary="取得 IwoooS 資安納管覆蓋總表",
    description=(
        "彙整已提交的主機、產品、服務、配置、監控、Wazuh、AI Agent 與 agent-bounty "
        "資安納管 snapshot形成只讀覆蓋總表。此端點不查 live host、不讀 secret、不啟動掃描、"
        "不送告警、不開 runtime gate。"
    ),
    response_model=dict[str, Any],
)
async def get_iwooos_security_control_coverage() -> dict[str, Any]:
    """Return the read-only IwoooS security control coverage table.

    Runs the coverage loader in a worker thread via ``asyncio.to_thread``
    and redacts the result with ``redact_public_lan_topology`` before
    returning it.

    Returns:
        The redacted coverage payload.

    Raises:
        HTTPException: 404 when a ``FileNotFoundError`` is raised while
            building the payload; 500 when a ``json.JSONDecodeError`` or
            ``ValueError`` is raised.
    """
    try:
        coverage = await asyncio.to_thread(
            load_latest_iwooos_security_control_coverage,
        )
        # Redaction stays inside the try so its errors map to the same
        # HTTP responses as loader errors.
        redacted = redact_public_lan_topology(coverage)
    except (FileNotFoundError, json.JSONDecodeError, ValueError) as exc:
        # FileNotFoundError is an OSError, never a ValueError, so the two
        # branches below cannot overlap.
        if isinstance(exc, FileNotFoundError):
            error_status = status.HTTP_404_NOT_FOUND
            error_detail = str(exc)
        else:
            error_status = status.HTTP_500_INTERNAL_SERVER_ERROR
            error_detail = f"IwoooS security control coverage 無效:{exc}"
        raise HTTPException(
            status_code=error_status,
            detail=error_detail,
        ) from exc
    return redacted