feat(api): expose credential escrow preflight intake
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 19s
CD Pipeline / build-and-deploy (push) Successful in 8m47s
CD Pipeline / post-deploy-checks (push) Successful in 57s

This commit is contained in:
Your Name
2026-06-29 18:49:34 +08:00
parent c70bcf4819
commit 5981586d0f
2 changed files with 255 additions and 2 deletions

View File

@@ -22,16 +22,19 @@ _DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
_OWNER_REQUEST_FILE = "credential-escrow-evidence-owner-request.snapshot.json"
_SCHEMA_VERSION = "credential_escrow_evidence_intake_readiness_v1"
_VALIDATION_SCHEMA_VERSION = "credential_escrow_evidence_owner_response_validation_v1"
_SINGLE_PREFLIGHT_INTAKE_SCHEMA_VERSION = "credential_escrow_single_preflight_intake_v1"
_OWNER_PACKET_SCHEMA = "awoooi_post_reboot_next_gate_owner_packets_v1"
_OWNER_REQUEST_SCOPE = "credential_escrow_evidence_owner_request"
_OWNER_RESPONSE_SCHEMA = "awoooi_post_reboot_next_gate_owner_response_v1"
_GATE_ID = "credential_escrow_evidence"
_REQUIRED_ITEM_IDS = {
_REQUIRED_ITEM_ORDER = (
"restic_repository_password",
"offsite_provider_credentials",
"break_glass_admin_credentials",
"dns_registrar_recovery",
"oauth_ai_provider_recovery",
}
)
_REQUIRED_ITEM_IDS = set(_REQUIRED_ITEM_ORDER)
_FORBIDDEN_BOOLEAN_FIELDS = {
"runtime_action_requested",
"runtime_action_authorized",
@@ -88,6 +91,7 @@ def load_latest_credential_escrow_evidence_intake_readiness(
payload = _build_payload(owner_request, matrix, path)
_require_operation_boundaries(payload, str(path))
_require_rollup_consistency(payload, str(path))
_require_single_preflight_intake(payload, str(path))
return payload
@@ -325,6 +329,7 @@ def _build_payload(
rollups.get("credential_escrow_forbidden_true_field_count")
),
"forbidden_value_count": len(forbidden_values),
"single_preflight_intake_ready_count": 1,
}
payload = {
@@ -344,9 +349,18 @@ def _build_payload(
"source_backup_dr_readiness_matrix_ref": (
"docs/evaluations/backup_dr_readiness_matrix_2026-06-04.json"
),
"single_preflight_intake_schema_version": (
_SINGLE_PREFLIGHT_INTAKE_SCHEMA_VERSION
),
"safe_next_step": safe_next_step,
},
"required_evidence_items": missing_items,
"single_preflight_intake_ready": True,
"owner_response_skeleton_required_item_count": len(_REQUIRED_ITEM_ORDER),
"owner_response_skeleton_secret_value_collection_allowed": False,
"single_preflight_intake": _build_single_preflight_intake(
owner_request.get("generated_at"),
),
"forbidden_values": forbidden_values,
"next_actions": [
"collect_redacted_non_secret_evidence_refs_for_all_missing_items",
@@ -381,6 +395,151 @@ def _build_payload(
return payload
def _build_single_preflight_intake(generated_at: Any) -> dict[str, Any]:
return {
"schema_version": _SINGLE_PREFLIGHT_INTAKE_SCHEMA_VERSION,
"generated_at": generated_at,
"workplan_id": "P0-005",
"status": "waiting_for_five_redacted_non_secret_evidence_refs",
"active_gate": _GATE_ID,
"required_item_count": len(_REQUIRED_ITEM_ORDER),
"required_item_ids": list(_REQUIRED_ITEM_ORDER),
"required_items": [
_build_single_preflight_evidence_item(item_id)
for item_id in _REQUIRED_ITEM_ORDER
],
"owner_packet": _build_single_preflight_owner_packet(),
"owner_response_skeleton": _build_single_preflight_owner_response_skeleton(
generated_at,
),
"single_preflight_command": (
"python3 scripts/reboot-recovery/post-reboot-owner-response-preflight.py "
"--owner-packet-file <owner-packet.json> "
"--response-file <filled-owner-response.json> --json --no-color"
),
"scorecard_command": (
"python3 scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py "
"--summary-file <summary.txt> --owner-packet-file <owner-packet.json> "
"--response-file <filled-owner-response.json> "
"--offsite-report-file <offsite-report.txt> "
"--escrow-status-file <escrow-status.txt> --json --no-color"
),
"exit_criteria": [
"preflight_status=ready_for_independent_reviewer_acceptance",
"owner_response_received_count=1",
"owner_response_accepted_count=1",
"forbidden_true_field_count=0",
"projected_effective_escrow_missing_count=0",
],
"operation_boundaries": {
"payload_persisted": False,
"backup_execution_performed": False,
"restore_execution_performed": False,
"offsite_sync_execution_performed": False,
"credential_marker_write_performed": False,
"credential_read_performed": False,
"secret_plaintext_read": False,
"secret_value_collection_allowed": False,
"workflow_trigger_performed": False,
"host_or_k8s_write_performed": False,
"raw_session_or_sqlite_read_performed": False,
"runtime_action_performed": False,
},
"execution_rules": [
"Use this as the only P0-005 intake packet.",
"Do not create per-item owner packets.",
"Do not include secret values in evidence refs.",
"Do not reopen cold-start or CI/CD while this checklist is pending.",
],
}
def _build_single_preflight_evidence_item(item_id: str) -> dict[str, Any]:
return {
"item_id": item_id,
"required_fields": [
"non_secret_evidence_ref",
"recovery_owner",
"reviewer",
"last_reviewed_at",
"contains_secret_value=false",
],
"accepted_ref_classes": [
"password_manager_item_id",
"sealed_envelope_id",
"recovery_checklist_id",
"review_ticket_id",
],
"rejected_values": [
"passwords",
"tokens",
"private keys",
"recovery codes",
"secret URLs",
"session cookies",
],
"marker_dry_run_command": (
"/backup/scripts/mark-credential-escrow-verified.sh "
f"--item {item_id} --evidence-id <NON_SECRET_REF_FOR_{item_id}> --dry-run"
),
}
def _build_single_preflight_owner_packet() -> dict[str, Any]:
return {
"schema_version": _OWNER_PACKET_SCHEMA,
"source": {"next_required_gates": [_GATE_ID]},
"owner_packets": [
{
"packet_id": _GATE_ID,
"title": "P0-005 DR credential escrow evidence",
"priority": "P0",
"required_items": list(_REQUIRED_ITEM_ORDER),
}
],
}
def _build_single_preflight_owner_response_skeleton(
generated_at: Any,
) -> dict[str, Any]:
return {
"schema_version": _OWNER_RESPONSE_SCHEMA,
"generated_at": generated_at,
"responses": [
{
"gate_id": _GATE_ID,
"owner_role": "owner_role_here",
"owner_team": "owner_team_here",
"decision": "pending",
"decision_reason": "decision_reason_here",
"affected_scope": "P0-005 DR credential escrow evidence",
"redacted_evidence_refs": ["redacted_evidence_ref_here"],
"followup_owner": "followup_owner_here",
"runtime_action_requested": False,
"runtime_action_authorized": False,
"host_write_requested": False,
"host_write_authorized": False,
"secret_value_included": False,
"secret_value_collection_allowed": False,
"credential_marker_write_requested": False,
"credential_marker_write_authorized": False,
"escrow_items": [
{
"item_id": item_id,
"non_secret_evidence_ref": "non_secret_evidence_ref_here",
"recovery_owner": "owner_role_here",
"reviewer": "reviewer_here",
"last_reviewed_at": "pending",
"contains_secret_value": False,
}
for item_id in _REQUIRED_ITEM_ORDER
],
}
],
}
def _normalize_item(item: Any) -> dict[str, Any]:
data = _dict(item)
return {
@@ -452,6 +611,53 @@ def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
raise ValueError(f"{label}: marker write must remain closed")
def _require_single_preflight_intake(payload: dict[str, Any], label: str) -> None:
intake = _dict(payload.get("single_preflight_intake"))
if intake.get("schema_version") != _SINGLE_PREFLIGHT_INTAKE_SCHEMA_VERSION:
raise ValueError(f"{label}: single preflight intake schema mismatch")
if intake.get("required_item_count") != len(_REQUIRED_ITEM_ORDER):
raise ValueError(f"{label}: single preflight intake item count mismatch")
if intake.get("required_item_ids") != list(_REQUIRED_ITEM_ORDER):
raise ValueError(f"{label}: single preflight intake item order mismatch")
skeleton = _dict(intake.get("owner_response_skeleton"))
responses = _list(skeleton.get("responses"))
response = _dict(responses[0]) if responses else {}
escrow_items = [_dict(item) for item in _list(response.get("escrow_items"))]
if len(escrow_items) != len(_REQUIRED_ITEM_ORDER):
raise ValueError(f"{label}: owner response skeleton item count mismatch")
if [item.get("item_id") for item in escrow_items] != list(_REQUIRED_ITEM_ORDER):
raise ValueError(f"{label}: owner response skeleton item order mismatch")
if response.get("secret_value_collection_allowed") is not False:
raise ValueError(f"{label}: skeleton secret collection must remain false")
if response.get("credential_marker_write_authorized") is not False:
raise ValueError(f"{label}: skeleton marker write must remain false")
for item in escrow_items:
if item.get("contains_secret_value") is not False:
raise ValueError(f"{label}: skeleton escrow item secret flag must be false")
boundaries = _dict(intake.get("operation_boundaries"))
blocked_flags = {
"payload_persisted",
"backup_execution_performed",
"restore_execution_performed",
"offsite_sync_execution_performed",
"credential_marker_write_performed",
"credential_read_performed",
"secret_plaintext_read",
"secret_value_collection_allowed",
"workflow_trigger_performed",
"host_or_k8s_write_performed",
"raw_session_or_sqlite_read_performed",
"runtime_action_performed",
}
open_flags = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if open_flags:
raise ValueError(
f"{label}: single preflight intake boundaries opened: {open_flags}"
)
def _dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}

View File

@@ -54,6 +54,49 @@ def test_credential_escrow_evidence_intake_reports_p0_blocker():
)
def test_credential_escrow_evidence_intake_exposes_single_preflight_packet():
payload = load_latest_credential_escrow_evidence_intake_readiness()
assert payload["single_preflight_intake_ready"] is True
assert payload["single_preflight_intake_ready_count"] == 1
assert payload["rollups"]["single_preflight_intake_ready_count"] == 1
assert payload["owner_response_skeleton_required_item_count"] == 5
assert payload["owner_response_skeleton_secret_value_collection_allowed"] is False
assert payload["readback"]["single_preflight_intake_schema_version"] == (
"credential_escrow_single_preflight_intake_v1"
)
intake = payload["single_preflight_intake"]
assert intake["schema_version"] == "credential_escrow_single_preflight_intake_v1"
assert intake["workplan_id"] == "P0-005"
assert intake["active_gate"] == "credential_escrow_evidence"
assert intake["required_item_count"] == 5
assert intake["required_item_ids"] == ESCROW_ITEMS
assert [item["item_id"] for item in intake["required_items"]] == ESCROW_ITEMS
assert all("passwords" in item["rejected_values"] for item in intake["required_items"])
assert "post-reboot-owner-response-preflight.py" in intake["single_preflight_command"]
assert (
"post-reboot-credential-escrow-intake-scorecard.py"
in intake["scorecard_command"]
)
assert "projected_effective_escrow_missing_count=0" in intake["exit_criteria"]
owner_packet = intake["owner_packet"]["owner_packets"][0]
assert owner_packet["packet_id"] == "credential_escrow_evidence"
assert owner_packet["required_items"] == ESCROW_ITEMS
response = intake["owner_response_skeleton"]["responses"][0]
assert response["secret_value_collection_allowed"] is False
assert response["credential_marker_write_authorized"] is False
assert response["runtime_action_authorized"] is False
assert [item["item_id"] for item in response["escrow_items"]] == ESCROW_ITEMS
assert all(item["contains_secret_value"] is False for item in response["escrow_items"])
assert intake["operation_boundaries"]["payload_persisted"] is False
assert intake["operation_boundaries"]["credential_read_performed"] is False
assert intake["operation_boundaries"]["credential_marker_write_performed"] is False
assert intake["operation_boundaries"]["runtime_action_performed"] is False
def test_credential_escrow_evidence_intake_endpoint_returns_readiness():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
@@ -72,6 +115,10 @@ def test_credential_escrow_evidence_intake_endpoint_returns_readiness():
assert data["runtime_gate_count"] == 0
assert data["secret_value_collection_allowed"] is False
assert data["rollups"]["missing_item_count"] == 5
assert data["single_preflight_intake"]["required_item_ids"] == ESCROW_ITEMS
assert data["single_preflight_intake"]["operation_boundaries"][
"secret_value_collection_allowed"
] is False
assert data["operation_boundaries"]["restore_execution_allowed"] is False
assert data["operation_boundaries"]["workflow_trigger_allowed"] is False