From 7637bd2cb046691b0246b870bc0cde6a22d20496 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 28 Jun 2026 15:21:54 +0800 Subject: [PATCH] feat(iwooos): add wazuh live metadata owner packet validator --- apps/api/src/api/v1/iwooos.py | 40 ++ .../iwooos_wazuh_live_metadata_gate.py | 422 ++++++++++++++++++ .../test_iwooos_runtime_security_readback.py | 121 +++++ docs/LOGBOOK.md | 14 + 4 files changed, 597 insertions(+) diff --git a/apps/api/src/api/v1/iwooos.py b/apps/api/src/api/v1/iwooos.py index bf823ed1..77e0788e 100644 --- a/apps/api/src/api/v1/iwooos.py +++ b/apps/api/src/api/v1/iwooos.py @@ -29,6 +29,9 @@ from src.services.iwooos_security_control_coverage import ( from src.services.iwooos_wazuh_live_metadata_gate import ( load_latest_iwooos_wazuh_live_metadata_gate, ) +from src.services.iwooos_wazuh_live_metadata_gate import ( + validate_iwooos_wazuh_live_metadata_owner_packet as validate_wazuh_live_metadata_owner_packet_payload, +) from src.services.iwooos_wazuh_managed_host_coverage import ( load_latest_iwooos_wazuh_managed_host_coverage, ) @@ -111,6 +114,43 @@ async def get_iwooos_wazuh_live_metadata_gate() -> dict[str, Any]: ) from exc +@router.post( + "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet", + response_model=dict[str, Any], + summary="驗證 Wazuh 即時中繼資料脫敏 owner packet", + description=( + "針對單次 owner / reviewer 提供的 redacted Wazuh live metadata owner packet " + "進行 no-persist metadata review validation,回傳 accepted-for-review / needs supplement / " + "quarantined / rejected runtime action 分流。此端點不保存 payload、不查 live Wazuh API、" + "不讀主機、不讀或回傳機密明文、不保存原始 Wazuh 載荷、不啟用主動回應、不改 K8s / " + "ArgoCD / Docker / Nginx / firewall,也不更新 live metadata gate 總帳。" + ), +) +async def validate_iwooos_wazuh_live_metadata_owner_packet( + live_metadata_owner_packet: dict[str, Any], +) -> dict[str, Any]: + """回傳單次 Wazuh live metadata owner packet 的公開安全驗證結果。""" + try: + wazuh_result = await load_iwooos_wazuh_readonly_status() + payload = await asyncio.to_thread( + validate_wazuh_live_metadata_owner_packet_payload, + live_metadata_owner_packet, + wazuh_live_status=wazuh_result.payload, + wazuh_live_http_status=wazuh_result.http_status, + ) + return redact_public_lan_topology(payload) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except (json.JSONDecodeError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"IwoooS Wazuh 即時中繼資料 owner packet 驗證器無效:{exc}", + ) from exc + + @router.get( "/api/v1/iwooos/wazuh-owner-evidence-preflight", response_model=dict[str, Any], diff --git a/apps/api/src/services/iwooos_wazuh_live_metadata_gate.py b/apps/api/src/services/iwooos_wazuh_live_metadata_gate.py index c06a7f7c..63d0d23c 100644 --- a/apps/api/src/services/iwooos_wazuh_live_metadata_gate.py +++ b/apps/api/src/services/iwooos_wazuh_live_metadata_gate.py @@ -9,6 +9,7 @@ authorizes Wazuh active response, scans, restarts, reloads, or host writes. from __future__ import annotations import json +import re from pathlib import Path from typing import Any @@ -35,6 +36,89 @@ _REQUIRED_FALSE_BOUNDARIES = { "wazuh_api_live_query_authorized", } +_PLACEHOLDER_VALUES = { + "", + "pending", + "todo", + "tbd", + "n/a", + "na", + "owner_here", + "reviewer_here", + "redacted_ref_here", + "secret_source_metadata_ref_here", + "wazuh_manager_health_ref_here", +} + +_SENSITIVE_TEXT_PATTERNS = { + "internal_ip": re.compile( + r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b" + ), + "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE), + "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE), + "basic_auth": re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE), + "password_assignment": re.compile( + r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE + ), + "token_assignment": re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE), + "cookie_assignment": re.compile( + r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE + ), + "client_keys": re.compile(r"client\.keys", re.IGNORECASE), + "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"), + "raw_session_text": re.compile( + r"(工作視窗|批准!繼續|source_thread_id|raw session)", re.IGNORECASE + ), +} + +_FORBIDDEN_KEY_FRAGMENTS = { + "authorization_header", + "basic_auth", + "bearer_token", + "client_keys", + "cookie", + "env_file", + "full_cli_output", + "full_journal", + "password", + "private_key", + "raw_dashboard_request", + "raw_env", + "raw_log", + "raw_runtime_volume", + "raw_wazuh_payload", + "session", + "stored_api_password", + "token", + "unredacted_screenshot", + "wazuh_api_password", +} + +_RUNTIME_ACTION_KEYS = { + "active_response_enable", + "agent_reenroll", + "agent_restart", + "argocd_sync", + "docker_restart", + "enable_live_metadata", + "enable_wazuh_live_metadata_without_owner_gate", + "execute_now", + "firewall_change", + "host_write", + "k8s_secret_patch", + "kali_active_scan", + "nginx_gateway_workaround", + "production_deploy_authorized", + "repo_write_authorized", + "runtime_execution_authorized", + "runtime_gate_open", + "wazuh_active_response", + "wazuh_active_response_authorized", + "wazuh_api_live_query", + "wazuh_api_live_query_authorized", + "wazuh_manager_restart", +} + def load_latest_iwooos_wazuh_live_metadata_gate( security_dir: Path | None = None, @@ -84,6 +168,12 @@ def load_latest_iwooos_wazuh_live_metadata_gate( "status": snapshot.get("status", "blocked_waiting_live_metadata_owner_response"), "mode": "committed_snapshot_readback_with_public_safe_wazuh_route_metadata", "source_refs": [f"docs/security/{_SNAPSHOT_FILE}", "GET /api/iwooos/wazuh"], + "live_metadata_owner_packet_validation_endpoint": ( + "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet" + ), + "live_metadata_owner_packet_validation_mode": ( + "no_persist_owner_metadata_review_no_wazuh_query_no_secret_collection" + ), "summary": merged_summary, "items": _items(merged_summary), "boundary_markers": _boundary_markers(merged_summary), @@ -112,6 +202,149 @@ def load_latest_iwooos_wazuh_live_metadata_gate( } +def validate_iwooos_wazuh_live_metadata_owner_packet( + owner_packet: dict[str, Any], + security_dir: Path | None = None, + wazuh_live_status: dict[str, Any] | None = None, + wazuh_live_http_status: int = 0, +) -> dict[str, Any]: + """Validate one redacted live metadata owner packet without applying it.""" + contract = load_latest_iwooos_wazuh_live_metadata_gate( + security_dir, + wazuh_live_status=wazuh_live_status, + wazuh_live_http_status=wazuh_live_http_status, + ) + snapshot = _load_snapshot(security_dir or _DEFAULT_SECURITY_DIR) + required_fields = _strings(snapshot.get("required_owner_fields")) + + findings: list[dict[str, Any]] = [] + if not isinstance(owner_packet, dict): + findings.append( + _finding( + "LME-01", + "blocker", + "request_live_metadata_owner_packet_supplement", + "live metadata owner packet must be a JSON object.", + [], + ) + ) + return _validation_result( + contract, "request_live_metadata_owner_packet_supplement", findings + ) + + sensitive_hits = _collect_sensitive_hits(owner_packet) + if sensitive_hits: + findings.append( + _finding( + "LME-04", + "critical", + "quarantine_sensitive_payload", + "live metadata owner packet contains forbidden or likely unredacted content; response omits raw values.", + [hit["path"] for hit in sensitive_hits[:12]], + {"categories": sorted({hit["category"] for hit in sensitive_hits})}, + ) + ) + return _validation_result(contract, "quarantine_sensitive_payload", findings) + + runtime_hits = _collect_runtime_action_hits(owner_packet) + if runtime_hits: + findings.append( + _finding( + "LME-05", + "critical", + "reject_runtime_action_request", + "live metadata owner packet requested runtime execution; this validator only reviews redacted metadata evidence.", + runtime_hits[:12], + ) + ) + return _validation_result(contract, "reject_runtime_action_request", findings) + + missing_fields = [ + field + for field in required_fields + if not _present(owner_packet.get(field)) + or _placeholder(owner_packet.get(field)) + ] + if missing_fields: + findings.append( + _finding( + "LME-01", + "blocker", + "request_live_metadata_owner_packet_supplement", + "live metadata owner packet is missing required metadata-only fields.", + missing_fields, + ) + ) + + if owner_packet.get("no_secret_value_attestation") != "no_secret_value_collected": + findings.append( + _finding( + "LME-06", + "blocker", + "request_secret_boundary_ack_fix", + "no_secret_value_attestation must state no_secret_value_collected.", + ["no_secret_value_attestation"], + ) + ) + + if owner_packet.get("no_raw_payload_attestation") != "no_raw_wazuh_payload_stored": + findings.append( + _finding( + "LME-07", + "blocker", + "request_raw_payload_boundary_ack_fix", + "no_raw_payload_attestation must state no_raw_wazuh_payload_stored.", + ["no_raw_payload_attestation"], + ) + ) + + if ( + owner_packet.get("active_response_separate_gate_ack") + != "active_response_requires_separate_gate" + ): + findings.append( + _finding( + "LME-08", + "blocker", + "request_active_response_boundary_ack_fix", + "active_response_separate_gate_ack must state active_response_requires_separate_gate.", + ["active_response_separate_gate_ack"], + ) + ) + + post_enable_command = str(owner_packet.get("post_enable_readback_command") or "") + if "wazuh-readonly-production-readback.py" not in post_enable_command: + findings.append( + _finding( + "LME-09", + "blocker", + "request_post_enable_readback_command_fix", + "post_enable_readback_command must reference the committed Wazuh readonly production readback script.", + ["post_enable_readback_command"], + ) + ) + + outcome = ( + _first_blocking_lane(findings) + or "accepted_for_live_metadata_owner_review_only" + ) + if outcome == "accepted_for_live_metadata_owner_review_only": + findings.append( + _finding( + "LME-10", + "info", + "live_metadata_owner_review_ready", + "live metadata owner packet passed no-persist metadata review; live Wazuh query and runtime gate remain closed.", + [ + "secret_source_metadata_ref", + "wazuh_manager_health_ref", + "readonly_account_scope_ref", + ], + ) + ) + return _validation_result(contract, outcome, findings) + + def _load_snapshot(directory: Path) -> dict[str, Any]: path = directory / _SNAPSHOT_FILE if not path.is_file(): @@ -134,6 +367,12 @@ def _int(value: Any) -> int: return value if isinstance(value, int) else 0 +def _strings(value: Any) -> list[str]: + if not isinstance(value, list): + return [] + return [item for item in value if isinstance(item, str)] + + def _live_route_summary(payload: dict[str, Any] | None, http_status: int) -> dict[str, Any]: if not isinstance(payload, dict): return { @@ -254,6 +493,189 @@ def _boundary_markers(summary: dict[str, Any]) -> list[str]: ] +def _validation_result( + contract: dict[str, Any], + outcome_lane: str, + findings: list[dict[str, Any]], +) -> dict[str, Any]: + accepted = outcome_lane == "accepted_for_live_metadata_owner_review_only" + quarantined = outcome_lane == "quarantine_sensitive_payload" + rejected_runtime = outcome_lane == "reject_runtime_action_request" + supplement_required = not accepted and not quarantined and not rejected_runtime + return { + "schema_version": "iwooos_wazuh_live_metadata_owner_packet_validation_result_v1", + "contract_schema_version": contract["schema_version"], + "status": outcome_lane, + "mode": "no_persist_live_metadata_owner_packet_no_wazuh_query_no_secret_collection", + "outcome_lane": outcome_lane, + "accepted_for_live_metadata_owner_review_only": accepted, + "quarantined": quarantined, + "runtime_action_rejected": rejected_runtime, + "summary": { + "live_metadata_owner_response_received_count": 1, + "live_metadata_owner_response_accepted_count": 1 if accepted else 0, + "secret_source_metadata_accepted_count": 1 if accepted else 0, + "wazuh_manager_health_ref_accepted_count": 1 if accepted else 0, + "readonly_account_scope_accepted_count": 1 if accepted else 0, + "live_metadata_owner_response_supplement_required_count": 1 + if supplement_required + else 0, + "live_metadata_owner_packet_quarantined_count": 1 if quarantined else 0, + "live_metadata_owner_runtime_action_rejected_count": 1 + if rejected_runtime + else 0, + "post_enable_readback_passed_count": 0, + "wazuh_api_live_query_authorized_count": 0, + "wazuh_active_response_authorized_count": 0, + "host_write_authorized_count": 0, + "runtime_gate_count": 0, + "finding_count": len(findings), + }, + "validation_findings": findings, + "boundary_markers": [ + "wazuh_live_metadata_owner_packet_validation_received_count=1", + f"wazuh_live_metadata_owner_packet_validation_accepted_count={1 if accepted else 0}", + f"wazuh_live_metadata_owner_packet_validation_quarantined_count={1 if quarantined else 0}", + f"wazuh_live_metadata_owner_packet_validation_runtime_action_rejected_count={1 if rejected_runtime else 0}", + "wazuh_live_metadata_owner_packet_validation_no_persist=true", + "wazuh_live_metadata_owner_packet_validation_live_query_authorized_count=0", + "wazuh_live_metadata_owner_packet_validation_runtime_gate_count=0", + "secret_value_collection_allowed=false", + "raw_wazuh_payload_storage_allowed=false", + "wazuh_active_response_authorized=false", + "host_write_authorized=false", + "not_authorization=true", + ], + "boundaries": { + "payload_persisted": False, + "runtime_execution_authorized": False, + "secret_value_collection_allowed": False, + "raw_wazuh_payload_storage_allowed": False, + "wazuh_api_live_query_authorized": False, + "wazuh_active_response_authorized": False, + "host_write_authorized": False, + "kali_active_scan_authorized": False, + "k8s_secret_patch_authorized": False, + "argocd_sync_authorized": False, + "docker_restart_authorized": False, + "nginx_gateway_workaround_authorized": False, + "firewall_change_authorized": False, + "runtime_gate_open": False, + "not_authorization": True, + }, + "next_gate": "reviewer_validation_before_server_side_env_enable" + if accepted + else "live_metadata_owner_packet_fix_and_resubmit", + } + + +def _finding( + check_id: str, + severity: str, + lane: str, + message: str, + field_paths: list[str], + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "check_id": check_id, + "severity": severity, + "lane": lane, + "message": message, + "field_paths": field_paths, + } + if extra: + payload.update(extra) + return payload + + +def _present(value: Any) -> bool: + if value is None: + return False + if isinstance(value, str): + return bool(value.strip()) + if isinstance(value, list | dict | tuple | set): + return bool(value) + return True + + +def _placeholder(value: Any) -> bool: + if value is None: + return True + return str(value).strip().lower() in _PLACEHOLDER_VALUES + + +def _collect_sensitive_hits(value: Any, path: str = "$") -> list[dict[str, str]]: + hits: list[dict[str, str]] = [] + if isinstance(value, dict): + for key, item in value.items(): + key_text = str(key) + key_lower = key_text.lower() + for fragment in _FORBIDDEN_KEY_FRAGMENTS: + if fragment in key_lower: + hits.append( + { + "path": f"{path}.{key_text}", + "category": f"forbidden_key:{fragment}", + } + ) + hits.extend(_collect_sensitive_hits(item, f"{path}.{key_text}")) + return hits + if isinstance(value, list): + for index, item in enumerate(value): + hits.extend(_collect_sensitive_hits(item, f"{path}[{index}]")) + return hits + if isinstance(value, str): + for category, pattern in _SENSITIVE_TEXT_PATTERNS.items(): + if pattern.search(value): + hits.append({"path": path, "category": category}) + return hits + + +def _collect_runtime_action_hits(value: Any, path: str = "$") -> list[str]: + hits: list[str] = [] + if isinstance(value, dict): + for key, item in value.items(): + key_text = str(key) + normalized_key = key_text.lower().replace("-", "_").replace(" ", "_") + if normalized_key in _RUNTIME_ACTION_KEYS and item not in ( + False, + None, + "", + [], + {}, + ): + hits.append(f"{path}.{key_text}") + hits.extend(_collect_runtime_action_hits(item, f"{path}.{key_text}")) + return hits + if isinstance(value, list): + for index, item in enumerate(value): + hits.extend(_collect_runtime_action_hits(item, f"{path}[{index}]")) + return hits + if isinstance(value, str): + normalized = value.lower().replace("-", "_").replace(" ", "_") + if normalized in _RUNTIME_ACTION_KEYS: + hits.append(path) + return hits + + +def _first_blocking_lane(findings: list[dict[str, Any]]) -> str | None: + severity_order = {"critical": 0, "blocker": 1, "warn": 2, "info": 3} + blocking = [ + finding + for finding in findings + if finding.get("severity") in {"critical", "blocker"} + ] + if not blocking: + return None + blocking.sort( + key=lambda finding: severity_order.get(str(finding.get("severity")), 99) + ) + return str( + blocking[0].get("lane") or "request_live_metadata_owner_packet_supplement" + ) + + def _require_boundaries(payload: dict[str, Any]) -> None: summary = _summary(payload) for key in ( diff --git a/apps/api/tests/test_iwooos_runtime_security_readback.py b/apps/api/tests/test_iwooos_runtime_security_readback.py index 698c9898..6549e6bc 100644 --- a/apps/api/tests/test_iwooos_runtime_security_readback.py +++ b/apps/api/tests/test_iwooos_runtime_security_readback.py @@ -47,6 +47,26 @@ def _valid_runtime_controlled_apply_packet() -> dict[str, object]: } +def _valid_live_metadata_owner_packet() -> dict[str, object]: + return { + "wazuh_live_metadata_owner": "iwooos-security-owner", + "release_readback_ref": "production-readback-http-200-disabled-owner-gate", + "secret_injection_owner": "platform-secret-owner", + "secret_source_metadata_ref": "secret-source-metadata-ref-redacted-v1", + "wazuh_manager_health_ref": "wazuh-manager-health-ref-redacted-v1", + "wazuh_api_tls_validation_ref": "wazuh-api-tls-validation-ref-redacted-v1", + "readonly_account_scope_ref": "readonly-account-scope-ref-redacted-v1", + "agent_alias_mapping_policy": "public aliases only; no raw agent identity or internal IP display", + "post_enable_readback_command": "python3 scripts/security/wazuh-readonly-production-readback.py --json", + "rollback_owner": "iwooos-security-owner", + "maintenance_window": "low-traffic-window-required-before-any-future-enable", + "validation_plan": "GET public-safe aggregate readback only; no raw Wazuh payload storage", + "no_secret_value_attestation": "no_secret_value_collected", + "no_raw_payload_attestation": "no_raw_wazuh_payload_stored", + "active_response_separate_gate_ack": "active_response_requires_separate_gate", + } + + def _valid_runtime_gate_owner_review_packet() -> dict[str, object]: return { "owner_review_intent": "commit_runtime_gate_owner_review_readback_only", @@ -331,6 +351,10 @@ def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None assert data["boundaries"]["wazuh_active_response_authorized"] is False assert data["boundaries"]["host_write_authorized"] is False assert data["boundaries"]["not_authorization"] is True + assert ( + data["live_metadata_owner_packet_validation_endpoint"] + == "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet" + ) assert len(data["items"]) == 6 assert any(marker == "正式路由讀回=1" for marker in data["boundary_markers"]) assert "192.168.0." not in response.text @@ -339,6 +363,103 @@ def test_iwooos_wazuh_live_metadata_gate_api_is_public_safe(monkeypatch) -> None assert "WAZUH_API_PASSWORD" not in response.text +def test_iwooos_wazuh_live_metadata_gate_validator_accepts_redacted_packet( + monkeypatch, +) -> None: + monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) + monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) + monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) + monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) + + client = _client() + before = client.get("/api/v1/iwooos/wazuh-live-metadata-gate").json() + response = client.post( + "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet", + json=_valid_live_metadata_owner_packet(), + ) + after = client.get("/api/v1/iwooos/wazuh-live-metadata-gate").json() + + assert response.status_code == 200 + data = response.json() + assert ( + data["schema_version"] + == "iwooos_wazuh_live_metadata_owner_packet_validation_result_v1" + ) + assert data["status"] == "accepted_for_live_metadata_owner_review_only" + assert data["accepted_for_live_metadata_owner_review_only"] is True + assert data["summary"]["live_metadata_owner_response_received_count"] == 1 + assert data["summary"]["live_metadata_owner_response_accepted_count"] == 1 + assert data["summary"]["secret_source_metadata_accepted_count"] == 1 + assert data["summary"]["wazuh_manager_health_ref_accepted_count"] == 1 + assert data["summary"]["readonly_account_scope_accepted_count"] == 1 + assert data["summary"]["post_enable_readback_passed_count"] == 0 + assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0 + assert data["summary"]["wazuh_active_response_authorized_count"] == 0 + assert data["summary"]["host_write_authorized_count"] == 0 + assert data["summary"]["runtime_gate_count"] == 0 + assert data["boundaries"]["payload_persisted"] is False + assert data["boundaries"]["wazuh_api_live_query_authorized"] is False + assert data["boundaries"]["runtime_execution_authorized"] is False + assert data["boundaries"]["runtime_gate_open"] is False + assert before["summary"] == after["summary"] + assert "192.168.0." not in response.text + assert "工作視窗" not in response.text + assert "批准!繼續" not in response.text + assert "WAZUH_API_PASSWORD" not in response.text + + +def test_iwooos_wazuh_live_metadata_gate_validator_quarantines_sensitive_payload( + monkeypatch, +) -> None: + monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) + monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) + monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) + monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) + + packet = _valid_live_metadata_owner_packet() + packet[ + "release_readback_ref" + ] = "bad ref includes 10.1.2.3 and Authorization: Bearer abcdefghijklmnop" + response = _client().post( + "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet", + json=packet, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "quarantine_sensitive_payload" + assert data["quarantined"] is True + assert data["summary"]["live_metadata_owner_packet_quarantined_count"] == 1 + assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0 + assert data["summary"]["runtime_gate_count"] == 0 + assert "10.1.2.3" not in response.text + assert "Bearer abcdefghijklmnop" not in response.text + + +def test_iwooos_wazuh_live_metadata_gate_validator_rejects_runtime_action( + monkeypatch, +) -> None: + monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) + monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) + monkeypatch.delenv("WAZUH_API_USERNAME", raising=False) + monkeypatch.delenv("WAZUH_API_PASSWORD", raising=False) + + packet = _valid_live_metadata_owner_packet() + packet["wazuh_api_live_query_authorized"] = True + response = _client().post( + "/api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet", + json=packet, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "reject_runtime_action_request" + assert data["runtime_action_rejected"] is True + assert data["summary"]["live_metadata_owner_runtime_action_rejected_count"] == 1 + assert data["summary"]["wazuh_api_live_query_authorized_count"] == 0 + assert data["summary"]["runtime_gate_count"] == 0 + + def test_iwooos_wazuh_owner_evidence_preflight_api_is_public_safe(monkeypatch) -> None: monkeypatch.delenv("IWOOOS_WAZUH_READONLY_ENABLED", raising=False) monkeypatch.delenv("WAZUH_API_BASE_URL", raising=False) diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index c785db61..09f4f7af 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -1,3 +1,17 @@ +## 2026-06-28 — 15:20 IwoooS Wazuh live metadata owner packet no-persist validator + +**完成內容**: +- API 新增 `POST /api/v1/iwooos/wazuh-live-metadata-gate/validate-live-metadata-owner-packet`,收 redacted live metadata owner packet 並分流 accepted / supplement / quarantine / runtime-action rejected。 +- Validator 只接受 metadata refs / attestation / scope / readback command;拒收 internal IP、Authorization / Bearer、password / token、raw env / raw Wazuh payload / raw session 與 runtime action request。 +- GET `/api/v1/iwooos/wazuh-live-metadata-gate` 新增 validation endpoint/mode;POST 前後 GET summary 不變,payload 不保存,live Wazuh query、active response、host write、runtime gate 仍全 0 / false。 + +**驗證結果**: +- `DATABASE_URL=sqlite:///test.db PYTHONPATH=apps/api python3.11 -m pytest apps/api/tests/test_iwooos_runtime_security_readback.py -q`:`17 passed`。 +- `DATABASE_URL=sqlite:///test.db PYTHONPATH=apps/api python3.11 -m pytest apps/api/tests/test_iwooos_wazuh_managed_host_coverage.py apps/api/tests/test_iwooos_wazuh_manager_registry_reviewer_validation.py apps/api/tests/test_iwooos_runtime_security_readback.py apps/api/tests/test_iwooos_security_control_coverage.py -q`:`36 passed`。 +- `python3 scripts/security/wazuh-readonly-route-boundary-guard.py --root .`、`python3 scripts/security/security-mirror-progress-guard.py --root .`、`py_compile`、`git diff --check`:通過。 + +**邊界**:沒有讀 secret / raw env / raw Wazuh payload / raw session;沒有查 live Wazuh;沒有 host / Docker / systemd / Nginx / firewall / K8s runtime action;沒有打開 runtime gate。 + ## 2026-06-28 — 14:57 110 runner/CD fail-close enforcer 與 startup 收斂 **完成內容**: