feat(api): validate gitea inventory closeout
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 29s
CD Pipeline / build-and-deploy (push) Successful in 5m58s
CD Pipeline / post-deploy-checks (push) Successful in 58s

This commit is contained in:
Your Name
2026-06-29 20:10:04 +08:00
parent 3113f34008
commit 1fb4bfc09d
5 changed files with 319 additions and 0 deletions

View File

@@ -344,6 +344,9 @@ from src.services.gitea_authenticated_inventory_payload_validation import (
from src.services.gitea_owner_coverage_attestation_validation import (
validate_gitea_owner_coverage_attestation,
)
from src.services.gitea_private_inventory_closeout_validation import (
validate_gitea_private_inventory_controlled_closeout,
)
from src.services.gitea_private_inventory_p0_scorecard import (
load_latest_gitea_private_inventory_p0_scorecard,
)
@@ -1186,6 +1189,41 @@ async def validate_gitea_owner_coverage_attestation_packet(
) from exc
@router.post(
"/gitea-private-inventory-p0-scorecard/validate-controlled-closeout",
response_model=dict[str, Any],
summary="驗證 P0-003 Gitea private inventory controlled closeout 脫敏收斂包",
description=(
"同時驗證 owner-provided redacted authenticated/admin inventory payload "
"與 owner coverage attestation response回傳 P0-003 controlled closeout "
"readiness此端點不保存 payload、不寫 source scorecard、不呼叫 Gitea/GitHub、"
"不收 token value、不建立 repo、不改 visibility、不同步 refs、不觸發 workflow、"
"不讀 secret、不讀 raw session/SQLite。"
),
)
async def validate_gitea_private_inventory_controlled_closeout_packet(
closeout_packet: dict[str, Any],
) -> dict[str, Any]:
"""Return no-persist closeout validation for paired P0-003 redacted inputs."""
try:
payload = await asyncio.to_thread(
validate_gitea_private_inventory_controlled_closeout,
closeout_packet,
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("gitea_private_inventory_controlled_closeout_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="P0-003 Gitea private inventory controlled closeout 驗證器無效",
) from exc
@router.get(
"/github-target-private-backup-evidence-gate",
response_model=dict[str, Any],

View File

@@ -0,0 +1,199 @@
"""P0-003 Gitea private inventory controlled closeout validation.
This combines the redacted authenticated inventory payload and owner coverage
attestation validators into one no-persist closeout receipt. It never calls
Gitea or GitHub, never stores submitted payloads, and never writes runtime state.
"""
from __future__ import annotations
from typing import Any
from src.services.gitea_authenticated_inventory_payload_validation import (
validate_gitea_authenticated_inventory_payload,
)
from src.services.gitea_owner_coverage_attestation_validation import (
validate_gitea_owner_coverage_attestation,
)
from src.services.gitea_private_inventory_p0_scorecard import (
load_latest_gitea_private_inventory_p0_scorecard,
)
_SCHEMA_VERSION = "gitea_private_inventory_controlled_closeout_validation_v1"
_ACCEPTED_INVENTORY_STATUS = "accepted_for_private_inventory_review_only"
_ACCEPTED_ATTESTATION_STATUS = "accepted_for_owner_coverage_attestation_review_only"
_REJECTED_STATUS = "rejected_execution_request"
_QUARANTINED_STATUS = "quarantined_sensitive_payload"
def validate_gitea_private_inventory_controlled_closeout(
closeout_packet: dict[str, Any],
scorecard: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Validate the paired P0-003 receipts without persisting or applying them."""
current = scorecard or load_latest_gitea_private_inventory_p0_scorecard()
inventory_payload = _as_dict(closeout_packet.get("authenticated_inventory_payload"))
owner_attestation = _as_dict(closeout_packet.get("owner_coverage_attestation"))
inventory = validate_gitea_authenticated_inventory_payload(
inventory_payload,
scorecard=current,
)
attestation = validate_gitea_owner_coverage_attestation(
owner_attestation,
scorecard=current,
)
inventory_status = str(inventory.get("status") or "")
attestation_status = str(attestation.get("status") or "")
inventory_ready = inventory_status == _ACCEPTED_INVENTORY_STATUS
attestation_ready = attestation_status == _ACCEPTED_ATTESTATION_STATUS
rejected = _REJECTED_STATUS in {inventory_status, attestation_status}
quarantined = _QUARANTINED_STATUS in {inventory_status, attestation_status}
closeout_blockers: list[str] = []
if not inventory_ready:
closeout_blockers.append("authenticated_inventory_payload_not_accepted")
if not attestation_ready:
closeout_blockers.append("owner_coverage_attestation_not_accepted")
if rejected:
status = _REJECTED_STATUS
elif quarantined:
status = _QUARANTINED_STATUS
elif closeout_blockers:
status = "needs_supplement"
else:
status = "ready_for_p0_003_controlled_closeout"
closeout_ready = status == "ready_for_p0_003_controlled_closeout"
current_blockers = _strings(current.get("active_blockers"))
projected_remaining_blockers = [] if closeout_ready else current_blockers
return {
"schema_version": _SCHEMA_VERSION,
"status": status,
"priority": "P0-003",
"scope": "gitea_private_inventory_controlled_closeout",
"source_scorecard_status": current.get("status"),
"result": {
"authenticated_inventory_payload_accepted_count": 1 if inventory_ready else 0,
"owner_coverage_attestation_accepted_count": 1 if attestation_ready else 0,
"paired_receipt_count": 2 if closeout_ready else 0,
"redacted_receipt_writeback_ready_count": 2 if closeout_ready else 0,
"closeout_blocker_count": len(closeout_blockers),
"current_active_blocker_count": len(current_blockers),
"projected_active_blocker_count": len(projected_remaining_blockers),
"projected_gitea_repo_inventory_status": (
inventory.get("result", {}).get("projected_gitea_repo_inventory_status")
if inventory_ready
else current.get("rollups", {}).get("gitea_repo_inventory_status")
),
"projected_gitea_visibility_scope": (
inventory.get("result", {}).get("projected_gitea_visibility_scope")
if inventory_ready
else current.get("rollups", {}).get("gitea_visibility_scope")
),
"projected_owner_coverage_attestation_accepted_count": (
attestation.get("result", {}).get(
"projected_owner_coverage_attestation_accepted_count"
)
if attestation_ready
else current.get("rollups", {}).get(
"owner_coverage_attestation_accepted_count"
)
),
"repo_write_authorized_count": 0,
"refs_sync_authorized_count": 0,
"github_primary_switch_authorized_count": 0,
"runtime_gate_count": 0,
"token_value_collection_allowed": False,
"secret_value_collection_allowed": False,
},
"closeout_blockers": closeout_blockers,
"validator_statuses": {
"authenticated_inventory_payload": inventory_status,
"owner_coverage_attestation": attestation_status,
},
"validator_summaries": {
"authenticated_inventory_payload": {
"accepted_payload_count": inventory.get("result", {}).get(
"accepted_payload_count"
),
"projected_active_blocker_count": inventory.get("result", {}).get(
"projected_active_blocker_count"
),
"payload_persisted": inventory.get("operation_boundaries", {}).get(
"payload_persisted"
),
},
"owner_coverage_attestation": {
"accepted_attestation_packet_count": attestation.get("result", {}).get(
"accepted_attestation_packet_count"
),
"accepted_response_count": attestation.get("result", {}).get(
"accepted_response_count"
),
"projected_active_blocker_count_after_redacted_inventory_receipt_writeback": (
attestation.get("result", {}).get(
"projected_active_blocker_count_after_redacted_inventory_receipt_writeback"
)
),
"payload_persisted": attestation.get("operation_boundaries", {}).get(
"payload_persisted"
),
},
},
"operation_boundaries": {
"payload_persisted": False,
"source_scorecard_written": False,
"gitea_api_called": False,
"gitea_write_performed": False,
"repo_write_performed": False,
"refs_sync_performed": False,
"github_api_used": False,
"github_cli_used": False,
"secret_plaintext_read": False,
"token_value_collection_allowed": False,
"secret_value_collection_allowed": False,
"runtime_action_performed": False,
"raw_session_or_sqlite_read_performed": False,
},
"reviewer_readiness": {
"schema_version": "gitea_private_inventory_controlled_closeout_readiness_v1",
"status": (
"ready_for_redacted_receipt_source_writeback"
if closeout_ready
else "not_ready_for_redacted_receipt_source_writeback"
),
"redacted_receipt_writeback_ready_count": 2 if closeout_ready else 0,
"projected_active_blocker_count": len(projected_remaining_blockers),
"projected_remaining_blockers": projected_remaining_blockers,
"repo_write_authorized_count": 0,
"refs_sync_authorized_count": 0,
"github_primary_switch_authorized_count": 0,
"runtime_gate_count": 0,
"payload_persisted": False,
"source_scorecard_written": False,
"safe_next_step": (
"write_redacted_receipt_source_of_truth_then_deploy_p0_003_closeout"
if closeout_ready
else "supplement_rejected_or_missing_p0_003_closeout_inputs"
),
},
"safe_next_step": (
"write_redacted_receipt_source_of_truth_then_deploy_p0_003_closeout"
if closeout_ready
else "supplement_rejected_or_missing_p0_003_closeout_inputs"
),
}
def _as_dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _strings(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [str(item) for item in value if item is not None]