Files
awoooi/apps/api/src/services/gitea_owner_coverage_attestation_validation.py
Your Name b9293b76b5
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 1s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 19s
CD Pipeline / build-and-deploy (push) Successful in 4m29s
CD Pipeline / post-deploy-checks (push) Successful in 59s
feat(api): validate gitea owner attestation intake
2026-06-29 19:48:05 +08:00

389 lines
15 KiB
Python

"""P0-003 Gitea owner coverage attestation no-persist validation.
This service validates one owner-provided redacted coverage attestation packet.
It loads the committed response templates, accepts only redacted metadata, and
never writes repos, refs, secrets, workflow state, or runtime state.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlsplit
from src.services.gitea_private_inventory_p0_scorecard import (
load_latest_gitea_private_inventory_p0_scorecard,
)
from src.services.snapshot_paths import default_security_dir
# schema_version stamped on the validation report returned by
# validate_gitea_owner_coverage_attestation.
_SCHEMA_VERSION = "gitea_owner_coverage_attestation_validation_v1"
# schema_version required on both the owner packet and the committed template
# packet; also echoed in the report as "source_contract".
_OWNER_RESPONSE_SCHEMA_VERSION = "gitea_inventory_owner_attestation_response_v1"
# File name of the template packet, resolved inside the security directory.
_OWNER_RESPONSE_PACKET = "gitea-inventory-owner-attestation-response.snapshot.json"
# lane_id the owner packet must carry.
_LANE_ID = "s4_9_gitea_inventory_owner_attestation_response"
# Scorecard blockers removed from the projected blocker list once a packet is
# accepted.
_BLOCKERS_CLEARED_BY_ACCEPTED_ATTESTATION = {
    "gitea_owner_coverage_attestation_not_received",
}
# Scorecard blockers removed from the "after redacted inventory receipt
# writeback" projection once a packet is accepted.
_BLOCKERS_CLEARED_AFTER_INVENTORY_RECEIPT_WRITEBACK = {
    "gitea_repo_inventory_status_not_ok",
    "gitea_visibility_scope_public_only_or_unknown",
    "gitea_authenticated_inventory_payload_not_accepted",
    "gitea_owner_coverage_attestation_not_received",
}
# Keys that cause the packet to be rejected when they appear anywhere in the
# payload with the literal value True.
_FORBIDDEN_TRUE_FIELDS = {
    "action_buttons_allowed",
    "change_repo_visibility",
    "create_gitea_repo",
    "create_github_repo",
    "delete_or_archive_gitea_repo",
    "execution_authorized",
    "force_push",
    "gitea_repo_write_authorized",
    "github_primary_switch_authorized",
    "refs_sync_allowed",
    "refs_sync_authorized",
    "repo_write_allowed",
    "runtime_action_requested",
    "runtime_execution_authorized",
    "secret_value_collection_allowed",
    "sync_git_refs",
    "token_value_collection_allowed",
    "workflow_dispatch_requested",
    "write_to_gitea",
}
# Label -> compiled regex. Every string value in the payload is searched with
# each pattern; the label is what gets reported, never the matched text.
_SECRET_PATTERNS = {
    "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
    "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE),
    "cookie_header": re.compile(r"\bCookie\s*:", re.IGNORECASE),
    "password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE),
    "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
    "token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE),
}
# URL query parameter names (compared lower-cased) that mark a URL as carrying
# secret material.
_SECRET_QUERY_KEYS = {"access_token", "auth", "key", "password", "secret", "token"}
def validate_gitea_owner_coverage_attestation(
    owner_response: dict[str, Any],
    scorecard: dict[str, Any] | None = None,
    security_dir: Path | None = None,
) -> dict[str, Any]:
    """Validate one redacted P0-003 owner attestation packet without persisting it.

    Args:
        owner_response: The owner-provided packet. Its ``schema_version`` and
            ``lane_id`` are checked, its response items are matched against
            the committed templates, and the whole payload is scanned for
            secret-looking strings and forbidden ``True`` flags.
        scorecard: Current scorecard used for the blocker projections. When
            omitted (or falsy) the latest scorecard is loaded.
        security_dir: Directory holding the template packet; forwarded to
            ``_load_response_templates``.

    Returns:
        A report dict. ``status`` is, in order of precedence,
        ``rejected_execution_request`` (a forbidden flag is ``True``),
        ``quarantined_sensitive_payload`` (a secret pattern matched),
        ``needs_supplement`` (any blocker), or
        ``accepted_for_owner_coverage_attestation_review_only``. Every
        write/authorization flag in the report is a hard-coded ``False``/``0``.

    Raises:
        ValueError: Propagated from ``_load_response_templates`` when the
            template packet is malformed.
    """
    current = scorecard or load_latest_gitea_private_inventory_p0_scorecard()
    template_by_item = {
        str(template.get("attestation_item_id")): template
        for template in _load_response_templates(security_dir)
    }
    required_item_ids = set(template_by_item)
    sensitive_hits = _find_sensitive_strings(owner_response)
    forbidden_true_fields = _find_forbidden_true_fields(owner_response)

    # Envelope checks come first so their blockers lead the list.
    blockers: list[str] = []
    if owner_response.get("schema_version") != _OWNER_RESPONSE_SCHEMA_VERSION:
        blockers.append(f"schema_version_not_{_OWNER_RESPONSE_SCHEMA_VERSION}")
    if str(owner_response.get("lane_id") or "") != _LANE_ID:
        blockers.append("lane_id_not_s4_9_owner_attestation_response")

    response_by_item, index_blockers, duplicate_item_ids = _index_attestation_responses(
        _response_items(owner_response)
    )
    blockers.extend(index_blockers)

    provided_item_ids = set(response_by_item)
    unknown_item_ids = sorted(provided_item_ids - required_item_ids)
    missing_item_ids = sorted(required_item_ids - provided_item_ids)
    blockers.extend(f"unknown_attestation_item_id:{item_id}" for item_id in unknown_item_ids)
    blockers.extend(f"missing_attestation_item_id:{item_id}" for item_id in missing_item_ids)
    blockers.extend(f"duplicate_attestation_item_id:{item_id}" for item_id in duplicate_item_ids)
    for item_id in sorted(required_item_ids & provided_item_ids):
        blockers.extend(
            _validate_response_item(
                item_id=item_id,
                response=response_by_item[item_id],
                template=template_by_item[item_id],
            )
        )

    status = _attestation_status(forbidden_true_fields, sensitive_hits, blockers)
    accepted = status == "accepted_for_owner_coverage_attestation_review_only"

    current_rollups = _as_dict(current.get("rollups"))
    current_blockers = _strings(current.get("active_blockers"))
    projected_blockers = _project_blockers(
        current_blockers, _BLOCKERS_CLEARED_BY_ACCEPTED_ATTESTATION, accepted
    )
    projected_after_inventory_receipt = _project_blockers(
        current_blockers, _BLOCKERS_CLEARED_AFTER_INVENTORY_RECEIPT_WRITEBACK, accepted
    )
    current_accepted_attestation_count = _as_int(
        current_rollups.get("owner_coverage_attestation_accepted_count")
    )
    # An accepted packet guarantees at least one accepted attestation, but
    # never lowers a count the scorecard already reports.
    projected_accepted_attestation_count = (
        max(current_accepted_attestation_count, 1)
        if accepted
        else current_accepted_attestation_count
    )
    accepted_packet_count = 1 if accepted else 0
    accepted_response_count = len(required_item_ids) if accepted else 0

    return {
        "schema_version": _SCHEMA_VERSION,
        "status": status,
        "priority": "P0-003",
        "scope": "gitea_owner_coverage_attestation_validation",
        "source_contract": _OWNER_RESPONSE_SCHEMA_VERSION,
        "source_scorecard_status": current.get("status"),
        "result": {
            "accepted_attestation_packet_count": accepted_packet_count,
            "required_response_item_count": len(required_item_ids),
            "provided_response_item_count": len(response_by_item),
            "accepted_response_count": accepted_response_count,
            "blocker_count": len(blockers),
            "sensitive_payload_hit_count": len(sensitive_hits),
            "forbidden_true_field_count": len(forbidden_true_fields),
            "current_active_blocker_count": len(current_blockers),
            "projected_active_blocker_count": len(projected_blockers),
            "projected_active_blocker_count_after_redacted_inventory_receipt_writeback": (
                len(projected_after_inventory_receipt)
            ),
            "current_owner_coverage_attestation_accepted_count": (
                current_accepted_attestation_count
            ),
            "projected_owner_coverage_attestation_accepted_count": (
                projected_accepted_attestation_count
            ),
            "token_value_collection_allowed": False,
            "secret_value_collection_allowed": False,
            "repo_write_allowed": False,
            "refs_sync_allowed": False,
            "github_primary_switch_authorized": False,
            "runtime_gate_count": 0,
        },
        "blockers": blockers,
        "missing_attestation_item_ids": missing_item_ids,
        "unknown_attestation_item_ids": unknown_item_ids,
        "duplicate_attestation_item_ids": sorted(duplicate_item_ids),
        "sensitive_payload_hits": sensitive_hits,
        "forbidden_true_fields": forbidden_true_fields,
        "operation_boundaries": {
            "payload_persisted": False,
            "gitea_api_called": False,
            "gitea_write_performed": False,
            "repo_write_performed": False,
            "refs_sync_performed": False,
            "github_api_used": False,
            "github_cli_used": False,
            "secret_plaintext_read": False,
            "token_value_collection_allowed": False,
            "secret_value_collection_allowed": False,
            "runtime_action_performed": False,
            "raw_session_or_sqlite_read_performed": False,
        },
        "reviewer_readiness": {
            "schema_version": "gitea_owner_coverage_attestation_reviewer_readiness_v1",
            "status": (
                "ready_for_private_inventory_closeout_after_inventory_receipt_writeback"
                if accepted
                else "not_ready_for_private_inventory_closeout"
            ),
            "redacted_owner_attestation_receipt_writeback_ready_count": accepted_packet_count,
            "accepted_response_count": accepted_response_count,
            "projected_active_blocker_count": len(projected_blockers),
            "projected_remaining_blockers": projected_blockers,
            "projected_active_blocker_count_after_redacted_inventory_receipt_writeback": (
                len(projected_after_inventory_receipt)
            ),
            "projected_remaining_blockers_after_redacted_inventory_receipt_writeback": (
                projected_after_inventory_receipt
            ),
            "repo_write_authorized_count": 0,
            "refs_sync_authorized_count": 0,
            "github_primary_switch_authorized_count": 0,
            "runtime_gate_count": 0,
            "token_value_collection_allowed": False,
            "secret_value_collection_allowed": False,
            "payload_persisted": False,
            "safe_next_step": (
                "write_redacted_inventory_receipt_and_owner_attestation_receipt_then_close_p0_003"
                if accepted
                else "supplement_owner_coverage_attestation_redacted_metadata"
            ),
            "blocked_operations": [
                "store_token_value",
                "store_raw_secret",
                "gitea_api_write",
                "repo_write",
                "refs_sync",
                "github_api",
                "github_primary_switch",
                "workflow_dispatch",
                "runtime_action",
                "raw_session_or_sqlite_read",
            ],
        },
        "safe_next_step": (
            "review_redacted_owner_attestation_then_pair_with_inventory_receipt_writeback"
            if accepted
            else "supplement_owner_coverage_attestation_redacted_metadata"
        ),
    }


def _index_attestation_responses(
    responses: list[dict[str, Any]],
) -> tuple[dict[str, dict[str, Any]], list[str], list[str]]:
    """Index response items by ``attestation_item_id``.

    Returns ``(response_by_item, blockers, duplicate_item_ids)``. Items without
    an id produce a positional blocker and are skipped. When an id repeats, the
    last occurrence wins and the id is recorded once in ``duplicate_item_ids``
    no matter how many times it repeats, so the report never lists the same
    duplicate twice.
    """
    response_by_item: dict[str, dict[str, Any]] = {}
    blockers: list[str] = []
    duplicate_item_ids: list[str] = []
    for index, response in enumerate(responses):
        item_id = str(response.get("attestation_item_id") or "")
        if not item_id:
            blockers.append(f"responses[{index}].attestation_item_id_missing")
            continue
        if item_id in response_by_item and item_id not in duplicate_item_ids:
            duplicate_item_ids.append(item_id)
        response_by_item[item_id] = response
    return response_by_item, blockers, duplicate_item_ids


def _attestation_status(
    forbidden_true_fields: list[str],
    sensitive_hits: list[str],
    blockers: list[str],
) -> str:
    """Pick the report status; execution requests outrank secrets, which outrank blockers."""
    if forbidden_true_fields:
        return "rejected_execution_request"
    if sensitive_hits:
        return "quarantined_sensitive_payload"
    if blockers:
        return "needs_supplement"
    return "accepted_for_owner_coverage_attestation_review_only"


def _project_blockers(
    current_blockers: list[str],
    cleared: set[str],
    accepted: bool,
) -> list[str]:
    """Return the blockers left after removing ``cleared``; unchanged unless ``accepted``."""
    if not accepted:
        return current_blockers
    return [blocker for blocker in current_blockers if blocker not in cleared]
def _load_response_templates(security_dir: Path | None) -> list[dict[str, Any]]:
    """Load the committed owner response templates.

    Args:
        security_dir: Directory containing the template packet. When omitted
            (or falsy) the directory comes from ``default_security_dir``.

    Returns:
        The five template objects from the packet's ``response_templates``.

    Raises:
        ValueError: If the packet is not a JSON object with the expected
            ``schema_version``, or ``response_templates`` is not a list of
            exactly five JSON objects.
    """
    directory = security_dir or default_security_dir(Path(__file__))
    path = directory / _OWNER_RESPONSE_PACKET
    with path.open(encoding="utf-8") as handle:
        packet = json.load(handle)
    # A non-object root (list, string, ...) has no .get(); report it as the
    # same schema mismatch instead of leaking an AttributeError.
    if (
        not isinstance(packet, dict)
        or packet.get("schema_version") != _OWNER_RESPONSE_SCHEMA_VERSION
    ):
        raise ValueError(f"{path}: owner response packet schema mismatch")
    templates = packet.get("response_templates")
    # Every entry must be an object. Silently dropping a malformed entry would
    # shrink the set of required attestation items and let an incomplete owner
    # packet be accepted.
    if (
        not isinstance(templates, list)
        or len(templates) != 5
        or not all(isinstance(template, dict) for template in templates)
    ):
        raise ValueError(f"{path}: expected five response templates")
    return list(templates)
def _response_items(owner_response: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict entries of the first list-valued response key.

    The keys ``responses``, ``owner_responses`` and ``attestation_responses``
    are tried in that order. The first one holding a list is used, even if it
    is empty, and non-dict entries are dropped. With no list-valued key the
    result is an empty list.
    """
    candidates = (
        owner_response.get(name)
        for name in ("responses", "owner_responses", "attestation_responses")
    )
    chosen = next((entry for entry in candidates if isinstance(entry, list)), None)
    if chosen is None:
        return []
    return [entry for entry in chosen if isinstance(entry, dict)]
def _validate_response_item(
    *,
    item_id: str,
    response: dict[str, Any],
    template: dict[str, Any],
) -> list[str]:
    """Check one owner response item against its template.

    Args:
        item_id: Attestation item id, used as the prefix of every blocker.
        response: The owner's response item.
        template: The matching template; its ``acceptable_decisions`` and
            ``required_owner_fields`` lists drive the checks.

    Returns:
        Blocker strings in detection order, each listed once. Empty when the
        item passes.
    """
    blockers: list[str] = []
    decision = str(response.get("decision") or "")
    if decision not in set(_strings(template.get("acceptable_decisions"))):
        blockers.append(f"{item_id}.decision_not_allowed")
    for field in _strings(template.get("required_owner_fields")):
        value = response.get(field)
        if field == "evidence_refs":
            # Evidence refs have their own shape rule: a non-empty list of
            # non-placeholder strings without secret-bearing URLs.
            if not _has_redacted_evidence_refs(value):
                blockers.append(f"{item_id}.evidence_refs_missing_or_not_redacted_list")
            continue
        if _is_placeholder(value):
            blockers.append(f"{item_id}.{field}_missing")
    # decision_reason is always mandatory, whether or not the template lists it.
    if _is_placeholder(response.get("decision_reason")):
        blockers.append(f"{item_id}.decision_reason_missing")
    # A template that also lists decision_reason (or repeats a field) would
    # otherwise report the same blocker twice and inflate the blocker count.
    return list(dict.fromkeys(blockers))
def _has_redacted_evidence_refs(value: Any) -> bool:
    """Tell whether ``value`` is a usable list of evidence references.

    It must be a non-empty list in which every entry is a string, is not a
    placeholder, and is not a URL carrying secret material.
    """
    if not (isinstance(value, list) and value):
        return False
    return all(
        isinstance(ref, str) and not _is_placeholder(ref) and not _url_has_secret(ref)
        for ref in value
    )
def _find_sensitive_strings(value: Any) -> list[str]:
    """Locate secret-looking strings anywhere inside ``value``.

    Dicts and lists are traversed recursively; only string values are
    inspected. Each hit is reported as ``"<path>:<label>"``, where the label
    is a ``_SECRET_PATTERNS`` key or ``url_contains_secret_material``. The
    result is de-duplicated and sorted.
    """

    def scan(node: Any, path: str):
        if isinstance(node, dict):
            for key, child in node.items():
                yield from scan(child, f"{path}.{key}" if path else str(key))
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from scan(child, f"{path}[{position}]")
        elif isinstance(node, str):
            for label, pattern in _SECRET_PATTERNS.items():
                if pattern.search(node):
                    yield f"{path}:{label}"
            if _url_has_secret(node):
                yield f"{path}:url_contains_secret_material"

    return sorted(set(scan(value, "")))
def _find_forbidden_true_fields(value: Any) -> list[str]:
    """Return sorted paths of forbidden keys whose value is exactly ``True``.

    Dicts and lists are traversed recursively. A key counts only when it is in
    ``_FORBIDDEN_TRUE_FIELDS`` and its value is the ``True`` singleton; truthy
    values such as ``1`` or ``"true"`` are not flagged.
    """

    def collect(node: Any, prefix: str) -> list[str]:
        found: list[str] = []
        if isinstance(node, dict):
            for key, child in node.items():
                child_path = str(key) if not prefix else f"{prefix}.{key}"
                if key in _FORBIDDEN_TRUE_FIELDS and child is True:
                    found.append(child_path)
                found += collect(child, child_path)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                found += collect(child, f"{prefix}[{position}]")
        return found

    return sorted(collect(value, ""))
def _url_has_secret(value: str) -> bool:
    """Tell whether ``value`` looks like a URL carrying credentials.

    A string counts when it contains ``://`` and either embeds userinfo
    (``user@`` / ``user:password@``) or has a query parameter whose
    lower-cased name is in ``_SECRET_QUERY_KEYS``.

    ``urlsplit`` raises ``ValueError`` for malformed authorities, for example
    an unbalanced ``[`` or a bracketed host that is not an IP literal such as
    ``https://[REDACTED]/repo``. The input is untrusted owner text, so such
    strings are split by hand instead of aborting the whole validation.
    """
    if "://" not in value:
        return False
    try:
        parsed = urlsplit(value)
        has_userinfo = bool(parsed.username or parsed.password)
        query = parsed.query
    except ValueError:
        # Lenient fallback: drop the fragment, split off the query, then take
        # whatever precedes the last "@" in the authority as userinfo.
        remainder = value.split("://", 1)[1].partition("#")[0]
        location, _, query = remainder.partition("?")
        userinfo = location.partition("/")[0].rpartition("@")[0]
        has_userinfo = bool(userinfo.strip(":"))
    if has_userinfo:
        return True
    return any(key.lower() in _SECRET_QUERY_KEYS for key, _ in parse_qsl(query))
def _is_placeholder(value: Any) -> bool:
    """Tell whether ``value`` stands for "not filled in".

    True for ``None``, for an empty list, and for strings that are blank or
    equal, ignoring case and surrounding whitespace, to ``pending``, ``todo``,
    ``tbd``, ``n/a`` or ``na``. Every other value is treated as real content.
    """
    if isinstance(value, str):
        normalized = value.strip().lower()
        return normalized in {"", "pending", "todo", "tbd", "n/a", "na"}
    if isinstance(value, list):
        return len(value) == 0
    return value is None
def _strings(value: Any) -> list[str]:
    """Coerce a list to a list of strings, skipping ``None`` entries.

    Anything that is not a list yields an empty list.
    """
    if isinstance(value, list):
        return [str(entry) for entry in value if entry is not None]
    return []
def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
def _as_int(value: Any) -> int:
    """Coerce ``value`` to ``int``, falling back to ``0`` when it cannot be converted.

    ``OverflowError`` is handled alongside ``TypeError`` and ``ValueError``
    because ``int(float("inf"))`` raises it, and JSON decoded by the standard
    library may contain ``Infinity``.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0