Files
awoooi/apps/api/src/services/iwooos_security_operating_system.py
Your Name d50a453a79
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m46s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
feat(security): add iwooos operation packet validator
2026-06-29 13:43:51 +08:00

840 lines
32 KiB
Python

"""
IwoooS security operating system readback and operation-packet validator.
The validator is the public-safe intake point for a redacted security operation
packet. It checks the security loop evidence contract without persisting the
payload, reading secrets, querying live hosts, or authorizing runtime actions.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_security_dir
# Directory searched for the snapshot when callers pass no ``security_dir``;
# derived by ``default_security_dir`` from this module's own path.
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
# File name of the committed contract snapshot read by ``_load_snapshot``.
_SNAPSHOT_FILE = "iwooos-security-operating-system.snapshot.json"
# ``schema_version`` the snapshot must declare; any other value is rejected.
_EXPECTED_SCHEMA = "iwooos_security_operating_system_v1"
# Keys of the snapshot's ``execution_boundaries`` object that
# ``_require_boundaries`` requires to be literally ``False`` (a missing key or
# any other value raises ``ValueError``).
_REQUIRED_FALSE_BOUNDARIES = {
"auto_block_authorized",
"firewall_change_authorized",
"host_write_authorized",
"kali_active_scan_authorized",
"kali_execute_authorized",
"nginx_reload_authorized",
"production_write_authorized",
"runtime_execution_authorized",
"secret_value_collection_allowed",
"soar_action_authorized",
"telegram_live_send_authorized",
"wazuh_active_response_authorized",
}
# Top-level packet fields that must be present and non-empty (as judged by
# ``_present``). The list is also echoed in the readback contract, and its
# length is reported as ``operation_packet_required_field_count``.
_REQUIRED_OPERATION_PACKET_FIELDS = [
"operation_intent",
"event_title",
"severity",
"confidence",
"asset_aliases",
"asset_scope",
"what_happened_plain_language",
"why_it_matters",
"redacted_evidence_refs",
"ai_triage_lane",
"candidate_action",
"owner_gate_and_verification",
"target_selector_aliases",
"source_of_truth_diff_ref",
"check_mode_plan_ref",
"dry_run_evidence_ref",
"rollback_plan_ref",
"rollback_owner",
"post_apply_verifier_ref",
"km_playbook_writeback_ref",
"audit_receipt_ref",
"runtime_boundary_ack",
"host_write_boundary_ack",
"secret_boundary_ack",
]
# Exact acknowledgement string each boundary-ack field must carry; the
# validator compares with ``!=``, so any deviation yields an SOS-08 finding.
_EXPECTED_ACKS = {
"runtime_boundary_ack": "runtime_gate_remains_closed_until_post_verifier_passes",
"host_write_boundary_ack": "no_host_write_performed_by_packet_validator",
"secret_boundary_ack": "no_secret_value_collected_or_submitted",
}
# Category name -> compiled regex. ``_collect_sensitive_hits`` runs every
# pattern with ``search`` against each string value found in a packet; a match
# records the category and the value's path (never the value itself).
_SENSITIVE_TEXT_PATTERNS = {
# NOTE(review): the ``10`` and ``127`` alternatives are followed by only two
# more dotted groups, so three-part text such as "10.2.3" also matches, while
# the ``172.16-31`` and ``192.168`` alternatives require four parts. This errs
# on the side of quarantine; confirm whether that asymmetry is intended.
"internal_ip": re.compile(
r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"
),
"authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
"bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
"basic_auth": re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
# The three ``*_assignment`` patterns only match quoted right-hand sides.
"password_assignment": re.compile(
r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE
),
"token_assignment": re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
"cookie_assignment": re.compile(
r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE
),
"client_keys": re.compile(r"client\.keys", re.IGNORECASE),
# Case-sensitive: matches PEM-style private key header lines.
"private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
"raw_session_text": re.compile(
r"(工作視窗|批准!繼續|source_thread_id|raw session)", re.IGNORECASE
),
}
# Substrings that must not appear in any (lower-cased) key name at any depth
# of a packet. Matching is by containment, so short fragments such as
# "token" or "session" also match longer keys that merely include them, and a
# single key can match several fragments.
_FORBIDDEN_KEY_FRAGMENTS = {
"authorization_header",
"basic_auth",
"bearer_token",
"client_keys",
"cookie",
"env_file",
"full_cli_output",
"full_journal",
"hostname",
"internal_ip",
"password",
"private_key",
"raw_agent_identity",
"raw_alert_payload",
"raw_dashboard_request",
"raw_env",
"raw_hostname",
"raw_log",
"raw_runtime_volume",
"raw_session",
"raw_wazuh_payload",
"secret_value",
"session",
"stored_api_password",
"token",
"unredacted_screenshot",
"wazuh_api_password",
}
# Key names that signal a request for live execution. ``_collect_runtime_action_hits``
# compares each lower-cased key for exact membership (not containment) and
# flags the key regardless of the value stored under it.
_RUNTIME_ACTION_KEYS = {
"active_response_enable",
"agent_reenroll",
"agent_restart",
"ansible_apply",
"ansible_playbook_run",
"apply_now",
"argocd_sync",
"auto_block",
"credentialed_scan",
"database_migration",
"docker_restart",
"execute_now",
"exploit_attempt",
"firewall_change",
"force_push",
"host_write",
"k8s_apply",
"kali_active_scan",
"nginx_reload",
"production_write",
"repo_ref_delete",
"runtime_execution_authorized",
"secret_rotation",
"soar_action",
"systemd_restart",
"telegram_live_send",
"wazuh_active_response",
"wazuh_agent_reenroll",
"wazuh_agent_restart",
"wazuh_api_live_query",
"wazuh_manager_restart",
"workflow_trigger",
}
def load_latest_iwooos_security_operating_system(
    security_dir: Path | None = None,
) -> dict[str, Any]:
    """Load the committed IwoooS security operating system contract.

    Reads the snapshot, enforces its closed-boundary invariants, and returns a
    normalized readback document that also advertises the operation-packet
    validator.

    Args:
        security_dir: Directory containing the snapshot file. Falls back to
            the module default when omitted (or falsy).

    Returns:
        The readback dictionary: integer summary counters, normalized lists
        copied from the snapshot, the validator's required fields and expected
        acks, boundary markers, and an all-closed ``boundaries`` object.

    Raises:
        FileNotFoundError: The snapshot file does not exist.
        ValueError: The snapshot is malformed or violates a boundary check.
    """
    snapshot = _load_snapshot(security_dir or _DEFAULT_SECURITY_DIR)
    _require_boundaries(snapshot)
    raw_summary = _summary(snapshot)

    # Counters copied from the snapshot summary; anything that is not an int
    # is reported as 0 by ``_int``. Order here is the order of the output.
    snapshot_counter_keys = (
        "reference_framework_count",
        "operating_role_count",
        "severity_lane_count",
        "workstream_count",
        "p0_workstream_count",
        "p1_workstream_count",
        "p2_workstream_count",
        "alert_contract_field_count",
        "automation_loop_stage_count",
        "verification_stage_count",
        "no_false_green_rule_count",
        "cross_session_sync_checkpoint_count",
        "blocked_action_count",
        "source_control_artifact_percent",
        "soc_siem_framework_percent",
        "evidence_weighted_security_operating_system_percent",
        "wazuh_manager_registry_acceptance_percent",
        "wazuh_registry_accepted_count",
        "alert_receipt_accepted_count",
        "incident_case_accepted_count",
        "host_forensics_accepted_count",
        "kali_scope_accepted_count",
        "owner_response_received_count",
        "owner_response_accepted_count",
        "runtime_response_percent",
        "runtime_gate_count",
        "action_button_count",
    )
    merged_summary = {
        counter: _int(raw_summary.get(counter)) for counter in snapshot_counter_keys
    }
    # Validator facts are computed here rather than read from the snapshot.
    merged_summary["operation_packet_validator_available_count"] = 1
    merged_summary["operation_packet_required_field_count"] = len(
        _REQUIRED_OPERATION_PACKET_FIELDS
    )

    # Every authorization flag in the readback is hard-coded closed.
    closed_flags = (
        "payload_persisted",
        "secret_value_collection_allowed",
        "host_write_authorized",
        "wazuh_active_response_authorized",
        "kali_active_scan_authorized",
        "kali_execute_authorized",
        "soar_action_authorized",
        "auto_block_authorized",
        "telegram_live_send_authorized",
        "production_write_authorized",
        "runtime_execution_authorized",
        "runtime_gate_open",
    )
    boundaries: dict[str, Any] = {flag: False for flag in closed_flags}
    boundaries["non_incident_guard_mode"] = (
        "ai_controlled_apply_open_for_review_and_dry_run"
    )
    boundaries["break_glass_required_for_destructive_or_secret_or_live_response"] = True
    boundaries["not_authorization"] = True

    default_status = "iwooos_security_operating_system_ready_no_runtime_action"
    return {
        "schema_version": "iwooos_security_operating_system_readback_v1",
        "source_schema_version": snapshot["schema_version"],
        "status": snapshot.get("status", default_status),
        "mode": "committed_security_operating_system_readback_with_no_persist_operation_packet_validator",
        "source_refs": [
            f"docs/security/{_SNAPSHOT_FILE}",
            "scripts/security/iwooos-security-operating-system.py",
        ],
        "operation_packet_validation_endpoint": "/api/v1/iwooos/security-operating-system/validate-operation-packet",
        "operation_packet_validation_mode": "no_persist_security_operation_review_no_runtime_no_secret_collection",
        "summary": merged_summary,
        "reference_frameworks": _reference_frameworks(
            snapshot.get("reference_frameworks")
        ),
        "operating_roles": _operating_roles(snapshot.get("operating_roles")),
        "severity_lanes": _severity_lanes(snapshot.get("severity_lanes")),
        "automation_loop_stages": _loop_stages(snapshot.get("automation_loop_stages")),
        "alert_message_contract": _contract_fields(
            snapshot.get("alert_message_contract")
        ),
        "verification_stages": _verification_stages(
            snapshot.get("verification_stages")
        ),
        "workstreams": _workstreams(snapshot.get("workstreams")),
        "required_operation_packet_fields": list(_REQUIRED_OPERATION_PACKET_FIELDS),
        "expected_boundary_acks": dict(_EXPECTED_ACKS),
        "blocked_actions": _strings(snapshot.get("blocked_actions")),
        "no_false_green_rules": _rules(snapshot.get("no_false_green_rules")),
        "cross_session_sync_checkpoints": _checkpoints(
            snapshot.get("cross_session_sync_checkpoints")
        ),
        "boundary_markers": _boundary_markers(merged_summary),
        "boundaries": boundaries,
    }
def validate_iwooos_security_operation_packet(
    operation_packet: dict[str, Any],
    security_dir: Path | None = None,
) -> dict[str, Any]:
    """Validate one redacted security operation packet without applying it.

    Checks run in a fixed order; the first three end validation immediately:

    1. A packet that is not a JSON object yields SOS-01.
    2. Forbidden key names or sensitive-looking text yield SOS-04 and the
       ``quarantine_sensitive_payload`` lane. Only paths and categories are
       echoed back, never the offending values.
    3. Any runtime-action key yields SOS-05 and the
       ``reject_runtime_action_request`` lane.
    4. Otherwise all remaining checks (SOS-01/02/03/06/07/08/09) are
       evaluated, and the lane of the first blocking finding is the outcome.
       With no blocking finding the packet is accepted for review only and an
       informational SOS-10 finding is attached.

    Args:
        operation_packet: The decoded packet. Annotated as a dict, but any
            decoded JSON value is tolerated and reported as a finding.
        security_dir: Directory containing the committed snapshot; the module
            default is used when omitted.

    Returns:
        The validation result document built by ``_validation_result``.

    Raises:
        FileNotFoundError: The contract snapshot file does not exist.
        ValueError: The contract snapshot is malformed or violates a boundary.
    """
    contract = load_latest_iwooos_security_operating_system(security_dir)
    # The contract already carries the normalized severity lanes and
    # workstreams, so the snapshot does not need to be read a second time.
    severity_values = {
        lane["severity"] for lane in contract["severity_lanes"] if lane.get("severity")
    }
    workstream_ids = {
        stream["workstream_id"]
        for stream in contract["workstreams"]
        if stream.get("workstream_id")
    }
    findings: list[dict[str, Any]] = []

    if not isinstance(operation_packet, dict):
        findings.append(
            _finding(
                "SOS-01",
                "blocker",
                "request_security_operation_packet_supplement",
                "security operation packet must be a JSON object.",
                [],
            )
        )
        return _validation_result(
            contract, "request_security_operation_packet_supplement", findings
        )

    sensitive_hits = _collect_sensitive_hits(operation_packet)
    if sensitive_hits:
        # One key can match several fragments; de-duplicate paths (keeping
        # first-seen order) so repeats do not use up the 12-path cap.
        unique_paths = list(dict.fromkeys(hit["path"] for hit in sensitive_hits))
        findings.append(
            _finding(
                "SOS-04",
                "critical",
                "quarantine_sensitive_payload",
                "security operation packet contains forbidden or likely unredacted content; response omits raw values.",
                unique_paths[:12],
                {"categories": sorted({hit["category"] for hit in sensitive_hits})},
            )
        )
        return _validation_result(contract, "quarantine_sensitive_payload", findings)

    runtime_hits = _collect_runtime_action_hits(operation_packet)
    if runtime_hits:
        findings.append(
            _finding(
                "SOS-05",
                "critical",
                "reject_runtime_action_request",
                "security operation packet requested live runtime execution; this validator only reviews evidence, dry-run, verifier, and writeback readiness.",
                runtime_hits[:12],
            )
        )
        return _validation_result(contract, "reject_runtime_action_request", findings)

    missing_fields = [
        field
        for field in _REQUIRED_OPERATION_PACKET_FIELDS
        if not _present(operation_packet.get(field))
    ]
    if missing_fields:
        findings.append(
            _finding(
                "SOS-01",
                "blocker",
                "request_security_operation_packet_supplement",
                "security operation packet is missing required security loop fields.",
                missing_fields,
            )
        )

    if operation_packet.get("operation_intent") != "validate_security_operation_loop_only":
        findings.append(
            _finding(
                "SOS-02",
                "blocker",
                "request_operation_intent_fix",
                "operation_intent must be validate_security_operation_loop_only.",
                ["operation_intent"],
            )
        )

    severity = operation_packet.get("severity")
    # Type-check before the set lookup: an unhashable value (list/dict) would
    # otherwise raise TypeError instead of producing a finding.
    if not isinstance(severity, str) or severity not in severity_values:
        findings.append(
            _finding(
                "SOS-03",
                "blocker",
                "request_severity_lane_fix",
                "severity must match one committed IwoooS severity lane.",
                ["severity"],
                {"allowed_severities": sorted(severity_values)},
            )
        )

    workstream_id = operation_packet.get("workstream_id")
    # workstream_id is optional; when supplied it must be a known string id.
    if workstream_id is not None and (
        not isinstance(workstream_id, str) or workstream_id not in workstream_ids
    ):
        findings.append(
            _finding(
                "SOS-06",
                "blocker",
                "request_workstream_fix",
                "workstream_id must match one committed IwoooS security workstream when supplied.",
                ["workstream_id"],
            )
        )

    alias_issue = _validate_public_aliases(operation_packet.get("target_selector_aliases"))
    if alias_issue:
        findings.append(
            _finding(
                "SOS-07",
                "blocker",
                "request_target_selector_fix",
                alias_issue,
                ["target_selector_aliases"],
            )
        )

    bad_ack_fields = [
        field
        for field, expected in _EXPECTED_ACKS.items()
        if operation_packet.get(field) != expected
    ]
    if bad_ack_fields:
        findings.append(
            _finding(
                "SOS-08",
                "blocker",
                "request_boundary_ack_fix",
                "boundary acks must keep runtime execution, host writes, and secret collection closed until verifier-backed apply is separately authorized.",
                bad_ack_fields,
            )
        )

    evidence_refs = operation_packet.get("redacted_evidence_refs")
    if isinstance(evidence_refs, list):
        evidence_insufficient = len(evidence_refs) < 2
    else:
        # A supplied non-list value (e.g. one bare string) cannot hold two
        # references. An absent value is already reported by SOS-01 above.
        evidence_insufficient = _present(evidence_refs)
    if evidence_insufficient:
        findings.append(
            _finding(
                "SOS-09",
                "blocker",
                "request_evidence_ref_supplement",
                "redacted_evidence_refs must include at least two independent public-safe evidence references.",
                ["redacted_evidence_refs"],
            )
        )

    outcome = _first_blocking_lane(findings) or "accepted_for_security_operation_review_only"
    if outcome == "accepted_for_security_operation_review_only":
        findings.append(
            _finding(
                "SOS-10",
                "info",
                "security_operation_loop_ready",
                "security operation packet passed no-persist review; next action is controlled check-mode / dry-run and verifier readback, not live runtime execution.",
                [
                    "source_of_truth_diff_ref",
                    "check_mode_plan_ref",
                    "dry_run_evidence_ref",
                    "post_apply_verifier_ref",
                    "km_playbook_writeback_ref",
                ],
            )
        )
    return _validation_result(contract, outcome, findings)
def _load_snapshot(directory: Path) -> dict[str, Any]:
    """Read and sanity-check the contract snapshot stored in *directory*.

    Raises:
        FileNotFoundError: The snapshot file is absent.
        ValueError: The file is not a JSON object or declares a different
            ``schema_version`` than expected.
    """
    snapshot_path = directory / _SNAPSHOT_FILE
    if not snapshot_path.is_file():
        raise FileNotFoundError(
            f"{snapshot_path}: IwoooS security operating system snapshot not found"
        )
    document = json.loads(snapshot_path.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError(f"{snapshot_path}: expected JSON object")
    declared_schema = document.get("schema_version")
    if declared_schema != _EXPECTED_SCHEMA:
        raise ValueError(f"{snapshot_path}: expected schema_version={_EXPECTED_SCHEMA}")
    return document
def _summary(payload: dict[str, Any]) -> dict[str, Any]:
summary = payload.get("summary")
return summary if isinstance(summary, dict) else {}
def _int(value: Any) -> int:
return value if isinstance(value, int) else 0
def _strings(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, str)]
def _reference_frameworks(value: Any) -> list[dict[str, str]]:
if not isinstance(value, list):
return []
items: list[dict[str, str]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"framework_id": str(item.get("framework_id", "")),
"label": str(item.get("label", "")),
"source_url": str(item.get("source_url", "")),
}
)
return items
def _operating_roles(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"role_id": str(item.get("role_id", "")),
"label": str(item.get("label", "")),
"responsibility": str(item.get("responsibility", "")),
"runtime_gate_open": item.get("runtime_gate_open") is True,
}
)
return items
def _severity_lanes(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"severity": str(item.get("severity", "")),
"label": str(item.get("label", "")),
"triage_target": str(item.get("triage_target", "")),
"runtime_gate_open": item.get("runtime_gate_open") is True,
}
)
return items
def _loop_stages(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"stage_id": str(item.get("stage_id", "")),
"runtime_gate_open": item.get("runtime_gate_open") is True,
}
)
return items
def _contract_fields(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"field_id": str(item.get("field_id", "")),
"required": item.get("required") is True,
"raw_payload_allowed": item.get("raw_payload_allowed") is True,
}
)
return items
def _verification_stages(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"stage_id": str(item.get("stage_id", "")),
"accepted": item.get("accepted") is True,
"runtime_gate_open": item.get("runtime_gate_open") is True,
}
)
return items
def _workstreams(value: Any) -> list[dict[str, str]]:
if not isinstance(value, list):
return []
items: list[dict[str, str]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"workstream_id": str(item.get("workstream_id", "")),
"lane_id": str(item.get("lane_id", "")),
"priority": str(item.get("priority", "")),
"title": str(item.get("title", "")),
"scope": str(item.get("scope", item.get("coverage", ""))),
}
)
return items
def _rules(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"rule_id": str(item.get("rule_id", "")),
"enforced": item.get("enforced") is True,
}
)
return items
def _checkpoints(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
items.append(
{
"checkpoint_id": str(item.get("checkpoint_id", "")),
"required": item.get("required") is True,
}
)
return items
def _boundary_markers(summary: dict[str, int]) -> list[str]:
return [
"iwooos_security_operating_system_api_visible=true",
"iwooos_security_operation_packet_validation_api_available=true",
f"iwooos_security_operating_system_reference_framework_count={summary['reference_framework_count']}",
f"iwooos_security_operating_system_operating_role_count={summary['operating_role_count']}",
f"iwooos_security_operating_system_severity_lane_count={summary['severity_lane_count']}",
f"iwooos_security_operating_system_workstream_count={summary['workstream_count']}",
f"iwooos_security_operating_system_p0_workstream_count={summary['p0_workstream_count']}",
f"iwooos_security_operating_system_alert_contract_field_count={summary['alert_contract_field_count']}",
f"iwooos_security_operating_system_automation_loop_stage_count={summary['automation_loop_stage_count']}",
f"iwooos_security_operating_system_verification_stage_count={summary['verification_stage_count']}",
f"iwooos_security_operating_system_operation_packet_required_field_count={summary['operation_packet_required_field_count']}",
f"iwooos_security_operating_system_evidence_weighted_percent={summary['evidence_weighted_security_operating_system_percent']}",
f"iwooos_security_operating_system_wazuh_registry_accepted_count={summary['wazuh_registry_accepted_count']}",
"iwooos_security_operating_system_operation_packet_validation_no_persist=true",
"iwooos_security_operating_system_runtime_gate_count=0",
"runtime_execution_authorized=false",
"host_write_authorized=false",
"secret_value_collection_allowed=false",
"not_authorization=true",
]
def _require_boundaries(payload: dict[str, Any]) -> None:
    """Refuse a snapshot that does not keep every runtime boundary closed.

    Three groups of checks run in order:

    1. Selected summary counters must evaluate to 0.
    2. Each listed collection must be a list whose length equals its summary
       count.
    3. ``execution_boundaries`` must be a dict in which every required flag is
       literally ``False`` and ``not_authorization`` is literally ``True``.

    Args:
        payload: The decoded snapshot object.

    Raises:
        ValueError: On the first violated check.
    """
    summary = _summary(payload)
    # NOTE(review): ``_int`` maps missing and non-int values to 0, so a counter
    # stored as e.g. a string or float passes this guard — confirm intended.
    for key in (
        "runtime_gate_count",
        "action_button_count",
        "owner_response_received_count",
        "owner_response_accepted_count",
        "alert_receipt_accepted_count",
        "incident_case_accepted_count",
        "host_forensics_accepted_count",
        "kali_scope_accepted_count",
    ):
        if _int(summary.get(key)) != 0:
            raise ValueError(f"IwoooS security operating system summary.{key} must remain 0")

    expected_lengths = {
        "reference_frameworks": _int(summary.get("reference_framework_count")),
        "operating_roles": _int(summary.get("operating_role_count")),
        "severity_lanes": _int(summary.get("severity_lane_count")),
        "workstreams": _int(summary.get("workstream_count")),
        "alert_message_contract": _int(summary.get("alert_contract_field_count")),
        "automation_loop_stages": _int(summary.get("automation_loop_stage_count")),
        "verification_stages": _int(summary.get("verification_stage_count")),
        "no_false_green_rules": _int(summary.get("no_false_green_rule_count")),
        "cross_session_sync_checkpoints": _int(
            summary.get("cross_session_sync_checkpoint_count")
        ),
        "blocked_actions": _int(summary.get("blocked_action_count")),
    }
    for key, expected in expected_lengths.items():
        value = payload.get(key)
        if not isinstance(value, list) or len(value) != expected:
            raise ValueError(
                f"IwoooS security operating system {key} must match summary count"
            )

    boundaries = payload.get("execution_boundaries")
    if not isinstance(boundaries, dict):
        raise ValueError("IwoooS security operating system execution_boundaries missing")
    # Sorted so that, when several flags are wrong, the same one is reported
    # on every run; raw set iteration order varies with string hash seeding.
    for key in sorted(_REQUIRED_FALSE_BOUNDARIES):
        if boundaries.get(key) is not False:
            raise ValueError(
                f"IwoooS security operating system execution_boundaries.{key} must remain false"
            )
    if boundaries.get("not_authorization") is not True:
        raise ValueError("IwoooS security operating system not_authorization must remain true")
def _validation_result(
contract: dict[str, Any],
outcome_lane: str,
findings: list[dict[str, Any]],
) -> dict[str, Any]:
accepted = outcome_lane == "accepted_for_security_operation_review_only"
quarantined = outcome_lane == "quarantine_sensitive_payload"
rejected_runtime = outcome_lane == "reject_runtime_action_request"
supplement_required = not accepted and not quarantined and not rejected_runtime
return {
"schema_version": "iwooos_security_operation_packet_validation_result_v1",
"contract_schema_version": contract["schema_version"],
"status": outcome_lane,
"mode": "no_persist_security_operation_review_no_runtime_no_secret_collection",
"outcome_lane": outcome_lane,
"accepted_for_security_operation_review_only": accepted,
"quarantined": quarantined,
"runtime_action_rejected": rejected_runtime,
"summary": {
"security_operation_packet_received_count": 1,
"security_operation_packet_accepted_count": 1 if accepted else 0,
"security_operation_packet_supplement_required_count": 1
if supplement_required
else 0,
"security_operation_packet_quarantined_count": 1 if quarantined else 0,
"security_operation_runtime_action_rejected_count": 1
if rejected_runtime
else 0,
"runtime_gate_count": 0,
"host_write_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"wazuh_active_response_authorized_count": 0,
"kali_active_scan_authorized_count": 0,
"soar_action_authorized_count": 0,
"finding_count": len(findings),
},
"validation_findings": findings,
"boundary_markers": [
"iwooos_security_operation_packet_validation_received_count=1",
f"iwooos_security_operation_packet_validation_accepted_count={1 if accepted else 0}",
f"iwooos_security_operation_packet_validation_quarantined_count={1 if quarantined else 0}",
f"iwooos_security_operation_packet_validation_runtime_action_rejected_count={1 if rejected_runtime else 0}",
"iwooos_security_operation_packet_validation_no_persist=true",
"iwooos_security_operation_packet_validation_runtime_gate_count=0",
"runtime_execution_authorized=false",
"host_write_authorized=false",
"secret_value_collection_allowed=false",
"not_authorization=true",
],
"boundaries": {
"payload_persisted": False,
"secret_value_collection_allowed": False,
"host_write_authorized": False,
"wazuh_active_response_authorized": False,
"kali_active_scan_authorized": False,
"kali_execute_authorized": False,
"soar_action_authorized": False,
"auto_block_authorized": False,
"telegram_live_send_authorized": False,
"production_write_authorized": False,
"runtime_execution_authorized": False,
"runtime_gate_open": False,
"not_authorization": True,
},
"next_gate": (
"controlled_check_mode_dry_run_then_post_apply_verifier_readback"
if accepted
else "security_operation_packet_fix_and_resubmit"
),
}
def _finding(
check_id: str,
severity: str,
lane: str,
message: str,
field_paths: list[str],
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"check_id": check_id,
"severity": severity,
"lane": lane,
"message": message,
"field_paths": field_paths,
}
if extra:
payload.update(extra)
return payload
def _present(value: Any) -> bool:
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
if isinstance(value, list | dict | tuple | set):
return bool(value)
return True
def _validate_public_aliases(value: Any) -> str | None:
aliases = value if isinstance(value, list) else []
if not aliases or not all(isinstance(item, str) for item in aliases):
return "target_selector_aliases must be an array of public alias strings."
alias_set = set(aliases)
if len(aliases) != len(alias_set):
return "target_selector_aliases must not contain duplicates."
if any(not re.fullmatch(r"[a-z0-9][a-z0-9_-]{2,80}", item) for item in aliases):
return "target_selector_aliases must use public-safe alias ids only."
return None
def _collect_sensitive_hits(value: Any, path: str = "$") -> list[dict[str, str]]:
    """Recursively find forbidden key names and sensitive-looking strings.

    Dicts contribute a ``key:<fragment>`` hit for each forbidden fragment
    contained in a lower-cased key name, then recurse into the value. Lists
    recurse into each element. Strings are searched with every sensitive-text
    pattern. Other value types produce no hits.

    Args:
        value: Any decoded JSON value.
        path: JSONPath-like location of *value*, used to label hits.

    Returns:
        A list of ``{"category", "path"}`` records in traversal order. Only
        locations and category names are recorded, never the matched text.
    """
    hits: list[dict[str, str]] = []
    if isinstance(value, dict):
        for key, item in value.items():
            key_text = str(key)
            key_lower = key_text.lower()
            child_path = f"{path}.{key_text}"
            # Sorted so hit order is reproducible; callers keep only the first
            # few hits, and raw set iteration order varies between processes.
            hits.extend(
                {"category": f"key:{fragment}", "path": child_path}
                for fragment in sorted(_FORBIDDEN_KEY_FRAGMENTS)
                if fragment in key_lower
            )
            hits.extend(_collect_sensitive_hits(item, child_path))
    elif isinstance(value, list):
        for index, item in enumerate(value):
            hits.extend(_collect_sensitive_hits(item, f"{path}[{index}]"))
    elif isinstance(value, str):
        hits.extend(
            {"category": category, "path": path}
            for category, pattern in _SENSITIVE_TEXT_PATTERNS.items()
            if pattern.search(value)
        )
    return hits
def _collect_runtime_action_hits(value: Any, path: str = "$") -> list[str]:
    """Recursively list the paths of keys that name a runtime action.

    A key counts when its lower-cased text is a member of the runtime-action
    key set, whatever value it maps to. Dicts and lists are traversed; other
    values contribute nothing.
    """
    found: list[str] = []
    if isinstance(value, dict):
        for raw_key, child in value.items():
            name = str(raw_key)
            child_path = f"{path}.{name}"
            if name.lower() in _RUNTIME_ACTION_KEYS:
                found.append(child_path)
            found += _collect_runtime_action_hits(child, child_path)
        return found
    if isinstance(value, list):
        for position, child in enumerate(value):
            found += _collect_runtime_action_hits(child, f"{path}[{position}]")
    return found
def _first_blocking_lane(findings: list[dict[str, Any]]) -> str | None:
for finding in findings:
if finding.get("severity") in {"critical", "blocker"}:
return str(finding.get("lane", "request_security_operation_packet_supplement"))
return None