From 1fb4bfc09d1e2a192527fb53fdf4694ab2380b6d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 29 Jun 2026 20:10:04 +0800 Subject: [PATCH] feat(api): validate gitea inventory closeout --- .gitea/workflows/cd.yaml | 3 + apps/api/src/api/v1/agents.py | 38 ++++ ...a_private_inventory_closeout_validation.py | 199 ++++++++++++++++++ ...itea_private_inventory_p0_scorecard_api.py | 77 +++++++ .../test_cd_controlled_runtime_profile.py | 2 + 5 files changed, 319 insertions(+) create mode 100644 apps/api/src/services/gitea_private_inventory_closeout_validation.py diff --git a/.gitea/workflows/cd.yaml b/.gitea/workflows/cd.yaml index f36feacc..03df70fd 100644 --- a/.gitea/workflows/cd.yaml +++ b/.gitea/workflows/cd.yaml @@ -262,6 +262,8 @@ jobs: ;; apps/api/src/services/gitea_owner_coverage_attestation_validation.py) ;; + apps/api/src/services/gitea_private_inventory_closeout_validation.py) + ;; apps/api/src/services/gitea_private_inventory_p0_scorecard.py) ;; apps/api/src/services/reboot_auto_recovery_slo_scorecard.py) @@ -498,6 +500,7 @@ jobs: src/services/credential_escrow_evidence_intake_readiness.py \ src/services/gitea_authenticated_inventory_payload_validation.py \ src/services/gitea_owner_coverage_attestation_validation.py \ + src/services/gitea_private_inventory_closeout_validation.py \ src/services/gitea_private_inventory_p0_scorecard.py \ src/services/reboot_auto_recovery_slo_scorecard.py \ src/services/iwooos_security_operating_system.py \ diff --git a/apps/api/src/api/v1/agents.py b/apps/api/src/api/v1/agents.py index 01f69742..cd392c52 100644 --- a/apps/api/src/api/v1/agents.py +++ b/apps/api/src/api/v1/agents.py @@ -344,6 +344,9 @@ from src.services.gitea_authenticated_inventory_payload_validation import ( from src.services.gitea_owner_coverage_attestation_validation import ( validate_gitea_owner_coverage_attestation, ) +from src.services.gitea_private_inventory_closeout_validation import ( + validate_gitea_private_inventory_controlled_closeout, +) from src.services.gitea_private_inventory_p0_scorecard import ( load_latest_gitea_private_inventory_p0_scorecard, ) @@ -1186,6 +1189,41 @@ async def validate_gitea_owner_coverage_attestation_packet( ) from exc +@router.post( + "/gitea-private-inventory-p0-scorecard/validate-controlled-closeout", + response_model=dict[str, Any], + summary="驗證 P0-003 Gitea private inventory controlled closeout 脫敏收斂包", + description=( + "同時驗證 owner-provided redacted authenticated/admin inventory payload " + "與 owner coverage attestation response,回傳 P0-003 controlled closeout " + "readiness;此端點不保存 payload、不寫 source scorecard、不呼叫 Gitea/GitHub、" + "不收 token value、不建立 repo、不改 visibility、不同步 refs、不觸發 workflow、" + "不讀 secret、不讀 raw session/SQLite。" + ), +) +async def validate_gitea_private_inventory_controlled_closeout_packet( + closeout_packet: dict[str, Any], +) -> dict[str, Any]: + """Return no-persist closeout validation for paired P0-003 redacted inputs.""" + try: + payload = await asyncio.to_thread( + validate_gitea_private_inventory_controlled_closeout, + closeout_packet, + ) + return redact_public_lan_topology(payload) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except (json.JSONDecodeError, ValueError) as exc: + logger.error("gitea_private_inventory_controlled_closeout_invalid", error=str(exc)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="P0-003 Gitea private inventory controlled closeout 驗證器無效", + ) from exc + + @router.get( "/github-target-private-backup-evidence-gate", response_model=dict[str, Any], diff --git a/apps/api/src/services/gitea_private_inventory_closeout_validation.py b/apps/api/src/services/gitea_private_inventory_closeout_validation.py new file mode 100644 index 00000000..40ccc993 --- /dev/null +++ b/apps/api/src/services/gitea_private_inventory_closeout_validation.py @@ -0,0 +1,199 @@ +"""P0-003 Gitea private inventory controlled closeout validation. + +This combines the redacted authenticated inventory payload and owner coverage +attestation validators into one no-persist closeout receipt. It never calls +Gitea or GitHub, never stores submitted payloads, and never writes runtime state. +""" + +from __future__ import annotations + +from typing import Any + +from src.services.gitea_authenticated_inventory_payload_validation import ( + validate_gitea_authenticated_inventory_payload, +) +from src.services.gitea_owner_coverage_attestation_validation import ( + validate_gitea_owner_coverage_attestation, +) +from src.services.gitea_private_inventory_p0_scorecard import ( + load_latest_gitea_private_inventory_p0_scorecard, +) + +_SCHEMA_VERSION = "gitea_private_inventory_controlled_closeout_validation_v1" +_ACCEPTED_INVENTORY_STATUS = "accepted_for_private_inventory_review_only" +_ACCEPTED_ATTESTATION_STATUS = "accepted_for_owner_coverage_attestation_review_only" +_REJECTED_STATUS = "rejected_execution_request" +_QUARANTINED_STATUS = "quarantined_sensitive_payload" + + +def validate_gitea_private_inventory_controlled_closeout( + closeout_packet: dict[str, Any], + scorecard: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Validate the paired P0-003 receipts without persisting or applying them.""" + current = scorecard or load_latest_gitea_private_inventory_p0_scorecard() + inventory_payload = _as_dict(closeout_packet.get("authenticated_inventory_payload")) + owner_attestation = _as_dict(closeout_packet.get("owner_coverage_attestation")) + + inventory = validate_gitea_authenticated_inventory_payload( + inventory_payload, + scorecard=current, + ) + attestation = validate_gitea_owner_coverage_attestation( + owner_attestation, + scorecard=current, + ) + + inventory_status = str(inventory.get("status") or "") + attestation_status = str(attestation.get("status") or "") + inventory_ready = inventory_status == _ACCEPTED_INVENTORY_STATUS + attestation_ready = attestation_status == _ACCEPTED_ATTESTATION_STATUS + rejected = _REJECTED_STATUS in {inventory_status, attestation_status} + quarantined = _QUARANTINED_STATUS in {inventory_status, attestation_status} + + closeout_blockers: list[str] = [] + if not inventory_ready: + closeout_blockers.append("authenticated_inventory_payload_not_accepted") + if not attestation_ready: + closeout_blockers.append("owner_coverage_attestation_not_accepted") + + if rejected: + status = _REJECTED_STATUS + elif quarantined: + status = _QUARANTINED_STATUS + elif closeout_blockers: + status = "needs_supplement" + else: + status = "ready_for_p0_003_controlled_closeout" + + closeout_ready = status == "ready_for_p0_003_controlled_closeout" + current_blockers = _strings(current.get("active_blockers")) + projected_remaining_blockers = [] if closeout_ready else current_blockers + + return { + "schema_version": _SCHEMA_VERSION, + "status": status, + "priority": "P0-003", + "scope": "gitea_private_inventory_controlled_closeout", + "source_scorecard_status": current.get("status"), + "result": { + "authenticated_inventory_payload_accepted_count": 1 if inventory_ready else 0, + "owner_coverage_attestation_accepted_count": 1 if attestation_ready else 0, + "paired_receipt_count": 2 if closeout_ready else 0, + "redacted_receipt_writeback_ready_count": 2 if closeout_ready else 0, + "closeout_blocker_count": len(closeout_blockers), + "current_active_blocker_count": len(current_blockers), + "projected_active_blocker_count": len(projected_remaining_blockers), + "projected_gitea_repo_inventory_status": ( + inventory.get("result", {}).get("projected_gitea_repo_inventory_status") + if inventory_ready + else current.get("rollups", {}).get("gitea_repo_inventory_status") + ), + "projected_gitea_visibility_scope": ( + inventory.get("result", {}).get("projected_gitea_visibility_scope") + if inventory_ready + else current.get("rollups", {}).get("gitea_visibility_scope") + ), + "projected_owner_coverage_attestation_accepted_count": ( + attestation.get("result", {}).get( + "projected_owner_coverage_attestation_accepted_count" + ) + if attestation_ready + else current.get("rollups", {}).get( + "owner_coverage_attestation_accepted_count" + ) + ), + "repo_write_authorized_count": 0, + "refs_sync_authorized_count": 0, + "github_primary_switch_authorized_count": 0, + "runtime_gate_count": 0, + "token_value_collection_allowed": False, + "secret_value_collection_allowed": False, + }, + "closeout_blockers": closeout_blockers, + "validator_statuses": { + "authenticated_inventory_payload": inventory_status, + "owner_coverage_attestation": attestation_status, + }, + "validator_summaries": { + "authenticated_inventory_payload": { + "accepted_payload_count": inventory.get("result", {}).get( + "accepted_payload_count" + ), + "projected_active_blocker_count": inventory.get("result", {}).get( + "projected_active_blocker_count" + ), + "payload_persisted": inventory.get("operation_boundaries", {}).get( + "payload_persisted" + ), + }, + "owner_coverage_attestation": { + "accepted_attestation_packet_count": attestation.get("result", {}).get( + "accepted_attestation_packet_count" + ), + "accepted_response_count": attestation.get("result", {}).get( + "accepted_response_count" + ), + "projected_active_blocker_count_after_redacted_inventory_receipt_writeback": ( + attestation.get("result", {}).get( + "projected_active_blocker_count_after_redacted_inventory_receipt_writeback" + ) + ), + "payload_persisted": attestation.get("operation_boundaries", {}).get( + "payload_persisted" + ), + }, + }, + "operation_boundaries": { + "payload_persisted": False, + "source_scorecard_written": False, + "gitea_api_called": False, + "gitea_write_performed": False, + "repo_write_performed": False, + "refs_sync_performed": False, + "github_api_used": False, + "github_cli_used": False, + "secret_plaintext_read": False, + "token_value_collection_allowed": False, + "secret_value_collection_allowed": False, + "runtime_action_performed": False, + "raw_session_or_sqlite_read_performed": False, + }, + "reviewer_readiness": { + "schema_version": "gitea_private_inventory_controlled_closeout_readiness_v1", + "status": ( + "ready_for_redacted_receipt_source_writeback" + if closeout_ready + else "not_ready_for_redacted_receipt_source_writeback" + ), + "redacted_receipt_writeback_ready_count": 2 if closeout_ready else 0, + "projected_active_blocker_count": len(projected_remaining_blockers), + "projected_remaining_blockers": projected_remaining_blockers, + "repo_write_authorized_count": 0, + "refs_sync_authorized_count": 0, + "github_primary_switch_authorized_count": 0, + "runtime_gate_count": 0, + "payload_persisted": False, + "source_scorecard_written": False, + "safe_next_step": ( + "write_redacted_receipt_source_of_truth_then_deploy_p0_003_closeout" + if closeout_ready + else "supplement_rejected_or_missing_p0_003_closeout_inputs" + ), + }, + "safe_next_step": ( + "write_redacted_receipt_source_of_truth_then_deploy_p0_003_closeout" + if closeout_ready + else "supplement_rejected_or_missing_p0_003_closeout_inputs" + ), + } + + +def _as_dict(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def _strings(value: Any) -> list[str]: + if not isinstance(value, list): + return [] + return [str(item) for item in value if item is not None] diff --git a/apps/api/tests/test_gitea_private_inventory_p0_scorecard_api.py b/apps/api/tests/test_gitea_private_inventory_p0_scorecard_api.py index 06272e3b..0e550b7f 100644 --- a/apps/api/tests/test_gitea_private_inventory_p0_scorecard_api.py +++ b/apps/api/tests/test_gitea_private_inventory_p0_scorecard_api.py @@ -13,6 +13,9 @@ from src.services.gitea_authenticated_inventory_payload_validation import ( from src.services.gitea_owner_coverage_attestation_validation import ( validate_gitea_owner_coverage_attestation, ) +from src.services.gitea_private_inventory_closeout_validation import ( + validate_gitea_private_inventory_controlled_closeout, +) from src.services.gitea_private_inventory_p0_scorecard import ( load_latest_gitea_private_inventory_p0_scorecard, ) @@ -303,6 +306,73 @@ def test_gitea_owner_coverage_attestation_validator_rejects_execution_request(): assert validation["operation_boundaries"]["repo_write_performed"] is False +def test_gitea_private_inventory_controlled_closeout_accepts_paired_redacted_inputs(): + payload = validate_gitea_private_inventory_controlled_closeout( + _valid_controlled_closeout_payload(), + scorecard=_scorecard_readback(), + ) + + assert ( + payload["schema_version"] + == "gitea_private_inventory_controlled_closeout_validation_v1" + ) + assert payload["status"] == "ready_for_p0_003_controlled_closeout" + assert payload["result"]["authenticated_inventory_payload_accepted_count"] == 1 + assert payload["result"]["owner_coverage_attestation_accepted_count"] == 1 + assert payload["result"]["paired_receipt_count"] == 2 + assert payload["result"]["redacted_receipt_writeback_ready_count"] == 2 + assert payload["result"]["current_active_blocker_count"] == 4 + assert payload["result"]["projected_active_blocker_count"] == 0 + assert payload["result"]["runtime_gate_count"] == 0 + assert payload["operation_boundaries"]["payload_persisted"] is False + assert payload["operation_boundaries"]["source_scorecard_written"] is False + assert payload["operation_boundaries"]["gitea_api_called"] is False + assert payload["operation_boundaries"]["repo_write_performed"] is False + assert payload["operation_boundaries"]["refs_sync_performed"] is False + assert payload["operation_boundaries"]["github_api_used"] is False + assert payload["operation_boundaries"]["runtime_action_performed"] is False + assert ( + payload["reviewer_readiness"]["status"] + == "ready_for_redacted_receipt_source_writeback" + ) + + +def test_gitea_private_inventory_controlled_closeout_endpoint_no_persist(): + app = FastAPI() + app.include_router(router, prefix="/api/v1") + client = TestClient(app) + + response = client.post( + "/api/v1/agents/gitea-private-inventory-p0-scorecard/validate-controlled-closeout", + json=_valid_controlled_closeout_payload(), + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ready_for_p0_003_controlled_closeout" + assert data["result"]["projected_active_blocker_count"] == 0 + assert data["result"]["redacted_receipt_writeback_ready_count"] == 2 + assert data["operation_boundaries"]["payload_persisted"] is False + assert data["operation_boundaries"]["source_scorecard_written"] is False + assert data["operation_boundaries"]["raw_session_or_sqlite_read_performed"] is False + + +def test_gitea_private_inventory_controlled_closeout_blocks_unaccepted_pair(): + packet = _valid_controlled_closeout_payload() + packet["owner_coverage_attestation"]["responses"][0]["decision"] = "not_allowed" + + validation = validate_gitea_private_inventory_controlled_closeout( + packet, + scorecard=_scorecard_readback(), + ) + + assert validation["status"] == "needs_supplement" + assert validation["result"]["paired_receipt_count"] == 0 + assert validation["result"]["projected_active_blocker_count"] == 4 + assert "owner_coverage_attestation_not_accepted" in validation["closeout_blockers"] + assert validation["operation_boundaries"]["source_scorecard_written"] is False + + def test_gitea_authenticated_inventory_payload_validator_quarantines_secret_material(): payload = _valid_authenticated_inventory_payload() payload["repos"][0]["clone_url_redacted"] = "https://user:password@example.test/repo.git" @@ -594,6 +664,13 @@ def _valid_owner_coverage_attestation_payload() -> dict: } +def _valid_controlled_closeout_payload() -> dict: + return { + "authenticated_inventory_payload": _valid_authenticated_inventory_payload(), + "owner_coverage_attestation": _valid_owner_coverage_attestation_payload(), + } + + def _repo(full_name: str) -> dict: _, name = full_name.split("/", 1) return { diff --git a/ops/runner/test_cd_controlled_runtime_profile.py b/ops/runner/test_cd_controlled_runtime_profile.py index a81a1bc7..f3170abf 100644 --- a/ops/runner/test_cd_controlled_runtime_profile.py +++ b/ops/runner/test_cd_controlled_runtime_profile.py @@ -122,6 +122,7 @@ def test_gitea_private_inventory_scorecard_stays_on_controlled_runtime_profile() "docs/operations/awoooi-gitea-private-inventory-p0-scorecard.snapshot.json)", "apps/api/src/services/gitea_authenticated_inventory_payload_validation.py)", "apps/api/src/services/gitea_owner_coverage_attestation_validation.py)", + "apps/api/src/services/gitea_private_inventory_closeout_validation.py)", "apps/api/src/services/gitea_private_inventory_p0_scorecard.py)", "apps/api/tests/test_gitea_private_inventory_p0_scorecard_api.py)", "docs/operations/awoooi-gitea-authenticated-inventory-payload-validation.snapshot.json)", @@ -133,6 +134,7 @@ def test_gitea_private_inventory_scorecard_stays_on_controlled_runtime_profile() "scripts/security/tests/test_gitea_private_inventory_p0_scorecard.py)", "src/services/gitea_authenticated_inventory_payload_validation.py", "src/services/gitea_owner_coverage_attestation_validation.py", + "src/services/gitea_private_inventory_closeout_validation.py", "src/services/gitea_private_inventory_p0_scorecard.py", "tests/test_gitea_private_inventory_p0_scorecard_api.py", "scripts/security/tests/test_gitea_authenticated_inventory_payload_validator.py)",