feat(iwooos): add Wazuh owner evidence preflight

This commit is contained in:
ogt
2026-06-25 10:18:52 +08:00
parent 548c8fcae8
commit 86e9092218
6 changed files with 455 additions and 1 deletions

View File

@@ -115,6 +115,10 @@ def validate(root: Path) -> None:
str(root / "scripts" / "security" / "wazuh-agent-visibility-runtime-gate.py")
)
wazuh_agent_visibility_runtime_gate["validate"](root)
wazuh_agent_visibility_owner_evidence_preflight = runpy.run_path(
str(root / "scripts" / "security" / "wazuh-agent-visibility-owner-evidence-preflight.py")
)
wazuh_agent_visibility_owner_evidence_preflight["validate"](root)
telegram_alert_readability_guard = runpy.run_path(
str(root / "scripts" / "security" / "telegram-alert-readability-guard.py")
)

View File

@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""
Wazuh agent visibility owner evidence 收件預檢。
本工具只驗證 repo 內 committed snapshot不查 Wazuh、不 SSH、不讀
secret、不保存 raw log、不重新註冊 agent也不啟用 active response。
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
SNAPSHOT_PATH = Path("docs/security/wazuh-agent-visibility-owner-evidence-preflight.snapshot.json")
SCHEMA_VERSION = "wazuh_agent_visibility_owner_evidence_preflight_v1"
REQUIRED_FIELDS = [
"owner_role",
"team",
"decision",
"decision_reason",
"affected_scope",
"agent_total",
"agent_active",
"agent_disconnected",
"agent_never_connected",
"last_seen_window_start",
"last_seen_window_end",
"registry_collected_at",
"manager_health_ref",
"dashboard_api_status_ref",
"redacted_evidence_refs",
"followup_owner",
"rollback_owner",
"postcheck_plan",
]
REVIEWER_CHECKS = [
"欄位齊全且皆為脫敏 metadata。",
"agent_total 不可小於 active + disconnected + never_connected。",
"last_seen 時間窗需能覆蓋事故觀察區間。",
"manager health ref 與 dashboard API status ref 不可互相替代。",
"redacted evidence refs 不得包含 raw payload、截圖原文或主機完整輸出。",
"owner decision 不可直接授權 active response、host write 或 secret rotation。",
"rollback owner 與 postcheck plan 必須存在。",
]
OUTCOME_LANES = [
"waiting_owner_evidence",
"request_missing_fields",
"quarantine_sensitive_payload",
"reject_runtime_action_request",
"ready_for_reviewer_validation",
]
FORBIDDEN_PAYLOADS = [
"raw_wazuh_payload",
"raw_log",
"full_journal",
"unredacted_screenshot",
"agent_name",
"internal_ip",
"authorization_header",
"bearer_token",
"basic_auth",
"password",
"cookie",
"private_key",
"client_keys",
"active_response_enable",
"host_write",
"firewall_change",
"nginx_reload",
]
FORBIDDEN_TEXT_PATTERNS = [
re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"),
re.compile(r"Authorization\s*:", re.IGNORECASE),
re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
re.compile(r"client\.keys", re.IGNORECASE),
re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
]
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def assert_equal(label: str, actual: Any, expected: Any) -> None:
if actual != expected:
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
def assert_false(label: str, actual: Any) -> None:
assert_equal(label, actual, False)
def assert_zero(label: str, actual: Any) -> None:
assert_equal(label, actual, 0)
def collect_string_values(value: Any) -> list[str]:
if isinstance(value, str):
return [value]
if isinstance(value, list):
values: list[str] = []
for item in value:
values.extend(collect_string_values(item))
return values
if isinstance(value, dict):
values = []
for item in value.values():
values.extend(collect_string_values(item))
return values
return []
def validate_no_forbidden_text(snapshot: dict[str, Any]) -> None:
for text in collect_string_values(snapshot):
for pattern in FORBIDDEN_TEXT_PATTERNS:
if pattern.search(text):
raise SystemExit(
"BLOCKED wazuh_agent_visibility_owner_evidence_preflight: "
"snapshot contains forbidden sensitive text"
)
def build_snapshot() -> dict[str, Any]:
return {
"schema_version": SCHEMA_VERSION,
"status": "owner_evidence_preflight_ready_no_runtime_action",
"mode": "redacted_metadata_only_no_secret_no_wazuh_query",
"scope": "wazuh_agent_visibility_registry_truth",
"required_fields": REQUIRED_FIELDS,
"reviewer_checks": REVIEWER_CHECKS,
"outcome_lanes": OUTCOME_LANES,
"forbidden_payloads": FORBIDDEN_PAYLOADS,
"summary": {
"required_field_count": len(REQUIRED_FIELDS),
"reviewer_check_count": len(REVIEWER_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
"owner_evidence_received_count": 0,
"owner_evidence_accepted_count": 0,
"owner_evidence_rejected_count": 0,
"owner_evidence_quarantined_count": 0,
"runtime_gate_count": 0,
"active_response_authorized_count": 0,
"host_write_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
},
"execution_boundaries": {
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"host_write_authorized": False,
"secret_value_collection_allowed": False,
"raw_wazuh_payload_storage_allowed": False,
"agent_identity_public_display_allowed": False,
"internal_ip_public_display_allowed": False,
"runtime_execution_authorized": False,
"not_authorization": True,
},
"operator_interpretation": [
"這是 Wazuh agent registry 脫敏證據收件前的預檢,不代表已收到或已接受 owner evidence。",
"agent service active、TCP 連線存在、Dashboard 可見或口頭宣稱都不可替代 manager registry counts。",
"若 evidence 夾帶 raw log、未脫敏截圖、內網位址、agent 原名或 secret必須隔離不得渲染到前台。",
"任何 active response、host write、firewall、Nginx、Docker、K8s 或 secret 變更都要切獨立人工批准。",
],
}
def validate(root: Path) -> None:
snapshot = load_json(root / SNAPSHOT_PATH)
assert_equal("schema_version", snapshot.get("schema_version"), SCHEMA_VERSION)
assert_equal(
"status",
snapshot.get("status"),
"owner_evidence_preflight_ready_no_runtime_action",
)
assert_equal(
"mode",
snapshot.get("mode"),
"redacted_metadata_only_no_secret_no_wazuh_query",
)
assert_equal("required_fields", snapshot.get("required_fields"), REQUIRED_FIELDS)
assert_equal("reviewer_checks", snapshot.get("reviewer_checks"), REVIEWER_CHECKS)
assert_equal("outcome_lanes", snapshot.get("outcome_lanes"), OUTCOME_LANES)
assert_equal("forbidden_payloads", snapshot.get("forbidden_payloads"), FORBIDDEN_PAYLOADS)
summary = snapshot.get("summary", {})
assert_equal("summary.required_field_count", summary.get("required_field_count"), len(REQUIRED_FIELDS))
assert_equal("summary.reviewer_check_count", summary.get("reviewer_check_count"), len(REVIEWER_CHECKS))
assert_equal("summary.outcome_lane_count", summary.get("outcome_lane_count"), len(OUTCOME_LANES))
assert_equal("summary.forbidden_payload_count", summary.get("forbidden_payload_count"), len(FORBIDDEN_PAYLOADS))
for key in [
"owner_evidence_received_count",
"owner_evidence_accepted_count",
"owner_evidence_rejected_count",
"owner_evidence_quarantined_count",
"runtime_gate_count",
"active_response_authorized_count",
"host_write_authorized_count",
"secret_value_collection_allowed_count",
]:
assert_zero(f"summary.{key}", summary.get(key))
boundaries = snapshot.get("execution_boundaries", {})
for key, value in boundaries.items():
if key == "not_authorization":
assert_equal(f"execution_boundaries.{key}", value, True)
else:
assert_false(f"execution_boundaries.{key}", value)
validate_no_forbidden_text(snapshot)
def main() -> None:
parser = argparse.ArgumentParser(description="Wazuh agent visibility owner evidence 收件預檢")
parser.add_argument("--root", type=Path, default=Path.cwd())
parser.add_argument("--output", type=Path)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
root = args.root.resolve()
if args.output:
snapshot = build_snapshot()
output = args.output if args.output.is_absolute() else root / args.output
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
validate(root)
snapshot = load_json(root / SNAPSHOT_PATH)
if args.json:
print(json.dumps(snapshot, ensure_ascii=False, sort_keys=True))
return
summary = snapshot["summary"]
print(
"WAZUH_AGENT_VISIBILITY_OWNER_EVIDENCE_PREFLIGHT_OK "
f"fields={summary['required_field_count']} "
f"checks={summary['reviewer_check_count']} "
f"received={summary['owner_evidence_received_count']} "
f"accepted={summary['owner_evidence_accepted_count']} "
f"runtime_gate={summary['runtime_gate_count']}"
)
if __name__ == "__main__":
main()