diff --git a/apps/api/src/api/v1/agents.py b/apps/api/src/api/v1/agents.py index 896aaf7a..190d7a1a 100644 --- a/apps/api/src/api/v1/agents.py +++ b/apps/api/src/api/v1/agents.py @@ -319,6 +319,9 @@ from src.services.backup_restore_drill_approval_package_template import ( from src.services.credential_escrow_evidence_intake_readiness import ( load_latest_credential_escrow_evidence_intake_readiness, ) +from src.services.credential_escrow_evidence_intake_readiness import ( + validate_credential_escrow_evidence_owner_response, +) from src.services.delivery_closure_workbench import ( load_delivery_closure_workbench, ) @@ -3750,6 +3753,40 @@ async def get_credential_escrow_evidence_intake_readiness() -> dict[str, Any]: ) from exc +@router.post( + "/credential-escrow-evidence-intake-readiness/validate-owner-response", + response_model=dict[str, Any], + summary="驗證 P0-005 credential escrow 脫敏 owner response", + description=( + "針對單次 owner-provided redacted non-secret credential escrow evidence " + "response 進行 no-persist validation;此端點只回傳 independent reviewer " + "readiness / needs supplement / quarantined / rejected runtime action 分流," + "不保存 payload、不讀 credential、不寫 escrow marker、不執行 backup / restore / " + "offsite sync、不觸發 workflow、不碰 host/K8s、不讀 raw session/SQLite。" + ), +) +async def validate_credential_escrow_evidence_owner_response_packet( + owner_response: dict[str, Any], +) -> dict[str, Any]: + """Return no-persist validation for one P0-005 redacted owner response.""" + try: + return await asyncio.to_thread( + validate_credential_escrow_evidence_owner_response, + owner_response, + ) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except (json.JSONDecodeError, ValueError) as exc: + logger.error("credential_escrow_evidence_owner_response_invalid", error=str(exc)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="P0-005 credential escrow owner response 驗證器無效", + ) from exc + + @router.get( "/backup-notification-policy", response_model=dict[str, Any], diff --git a/apps/api/src/services/credential_escrow_evidence_intake_readiness.py b/apps/api/src/services/credential_escrow_evidence_intake_readiness.py index 8dc0390b..c0c8aeca 100644 --- a/apps/api/src/services/credential_escrow_evidence_intake_readiness.py +++ b/apps/api/src/services/credential_escrow_evidence_intake_readiness.py @@ -9,6 +9,7 @@ touches hosts. from __future__ import annotations import json +import re from pathlib import Path from typing import Any @@ -18,7 +19,50 @@ from src.services.snapshot_paths import default_security_dir _DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__)) _OWNER_REQUEST_FILE = "credential-escrow-evidence-owner-request.snapshot.json" _SCHEMA_VERSION = "credential_escrow_evidence_intake_readiness_v1" +_VALIDATION_SCHEMA_VERSION = "credential_escrow_evidence_owner_response_validation_v1" _OWNER_REQUEST_SCOPE = "credential_escrow_evidence_owner_request" +_OWNER_RESPONSE_SCHEMA = "awoooi_post_reboot_next_gate_owner_response_v1" +_GATE_ID = "credential_escrow_evidence" +_REQUIRED_ITEM_IDS = { + "restic_repository_password", + "offsite_provider_credentials", + "break_glass_admin_credentials", + "dns_registrar_recovery", + "oauth_ai_provider_recovery", +} +_FORBIDDEN_BOOLEAN_FIELDS = { + "runtime_action_requested", + "runtime_action_authorized", + "host_write_requested", + "host_write_authorized", + "secret_value_included", + "secret_value_collection_allowed", + "credential_marker_write_requested", + "credential_marker_write_authorized", +} +_PLACEHOLDER_VALUES = { + "", + "pending", + "todo", + "tbd", + "n/a", + "na", + "owner_role_here", + "owner_team_here", + "decision_reason_here", + "redacted_evidence_ref_here", + "non_secret_evidence_ref_here", + "followup_owner_here", + "reviewer_here", +} +_SECRET_VALUE_PATTERNS = { + "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE), + "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE), + "client_keys": re.compile(r"\bclient\.keys\b", re.IGNORECASE), + "password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE), + "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"), + "token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE), +} def load_latest_credential_escrow_evidence_intake_readiness( @@ -45,6 +89,149 @@ def load_latest_credential_escrow_evidence_intake_readiness( return payload +def validate_credential_escrow_evidence_owner_response( + owner_response: dict[str, Any], + readiness: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Validate one redacted P0-005 owner response without persisting it.""" + current = readiness or load_latest_credential_escrow_evidence_intake_readiness() + blockers: list[str] = [] + sensitive_hits = _find_sensitive_strings(owner_response) + forbidden_boolean_hits = _find_forbidden_booleans(owner_response) + if owner_response.get("schema_version") != _OWNER_RESPONSE_SCHEMA: + blockers.append(f"schema_version={owner_response.get('schema_version')!r}") + + responses = [ + item + for item in _list(owner_response.get("responses")) + if isinstance(item, dict) and str(item.get("gate_id") or "") == _GATE_ID + ] + response = responses[0] if responses else {} + if not response: + blockers.append(f"{_GATE_ID}.response_missing") + + for key in ( + "owner_role", + "owner_team", + "decision", + "decision_reason", + "affected_scope", + "followup_owner", + ): + if _is_placeholder(response.get(key)): + blockers.append(f"{_GATE_ID}.{key}_missing") + decision = str(response.get("decision") or "").strip().lower() + if decision not in {"accepted", "needs_supplement", "rejected"}: + blockers.append(f"{_GATE_ID}.decision_invalid={decision!r}") + elif decision != "accepted": + blockers.append(f"{_GATE_ID}.decision_not_accepted={decision!r}") + + evidence_refs = [ + ref + for ref in _list(response.get("redacted_evidence_refs")) + if not _is_placeholder(ref) + ] + if not evidence_refs: + blockers.append(f"{_GATE_ID}.redacted_evidence_refs_missing") + + escrow_items = [ + item for item in _list(response.get("escrow_items")) if isinstance(item, dict) + ] + seen_item_ids = { + str(item.get("item_id") or "").strip() + for item in escrow_items + if str(item.get("item_id") or "").strip() + } + missing_item_ids = sorted(_REQUIRED_ITEM_IDS - seen_item_ids) + unknown_item_ids = sorted(seen_item_ids - _REQUIRED_ITEM_IDS) + if missing_item_ids: + blockers.append(f"{_GATE_ID}.missing_items={missing_item_ids}") + for item_id in unknown_item_ids: + blockers.append(f"{_GATE_ID}.unknown_item={item_id!r}") + accepted_item_ids: list[str] = [] + for item in escrow_items: + item_id = str(item.get("item_id") or "").strip() + if item_id not in _REQUIRED_ITEM_IDS: + continue + item_blocked = False + for key in ( + "non_secret_evidence_ref", + "recovery_owner", + "reviewer", + "last_reviewed_at", + ): + if _is_placeholder(item.get(key)): + blockers.append(f"{_GATE_ID}.{item_id}.{key}_missing") + item_blocked = True + if item.get("contains_secret_value") is not False: + blockers.append(f"{_GATE_ID}.{item_id}.contains_secret_value_not_false") + item_blocked = True + if not item_blocked: + accepted_item_ids.append(item_id) + + if forbidden_boolean_hits: + status = "rejected_runtime_or_marker_action" + elif sensitive_hits: + status = "quarantined_sensitive_payload" + elif blockers: + status = "needs_supplement" + else: + status = "accepted_for_independent_reviewer_readiness_only" + + owner_response_received_count = 1 if status in { + "accepted_for_independent_reviewer_readiness_only", + "needs_supplement", + } and response else 0 + owner_response_accepted_count = ( + 1 if status == "accepted_for_independent_reviewer_readiness_only" else 0 + ) + return { + "schema_version": _VALIDATION_SCHEMA_VERSION, + "status": status, + "priority": "P0-005", + "scope": "credential_escrow_evidence_intake", + "source_readiness_status": current.get("status"), + "result": { + "owner_response_received_count": owner_response_received_count, + "owner_response_accepted_count": owner_response_accepted_count, + "required_item_count": len(_REQUIRED_ITEM_IDS), + "provided_item_count": len(seen_item_ids & _REQUIRED_ITEM_IDS), + "accepted_item_count": len(set(accepted_item_ids)), + "missing_item_ids": missing_item_ids, + "unknown_item_ids": unknown_item_ids, + "redacted_evidence_ref_count": len(evidence_refs), + "blocker_count": len(blockers), + "sensitive_payload_hit_count": len(sensitive_hits), + "forbidden_true_field_count": len(forbidden_boolean_hits), + "runtime_gate_count": 0, + "credential_marker_write_authorized_count": 0, + "secret_value_collection_allowed": False, + }, + "blockers": blockers, + "sensitive_payload_hits": sensitive_hits, + "forbidden_boolean_hits": forbidden_boolean_hits, + "operation_boundaries": { + "payload_persisted": False, + "backup_execution_performed": False, + "restore_execution_performed": False, + "offsite_sync_execution_performed": False, + "credential_marker_write_performed": False, + "credential_read_performed": False, + "secret_plaintext_read": False, + "secret_value_collection_allowed": False, + "workflow_trigger_performed": False, + "host_or_k8s_write_performed": False, + "raw_session_or_sqlite_read_performed": False, + "runtime_action_performed": False, + }, + "safe_next_step": ( + "independent_reviewer_acceptance_then_marker_dry_run" + if status == "accepted_for_independent_reviewer_readiness_only" + else "supplement_redacted_non_secret_evidence_refs_without_secret_values" + ), + } + + def _build_payload( owner_request: dict[str, Any], matrix: dict[str, Any], @@ -230,3 +417,67 @@ def _int(value: Any) -> int: return int(value) except (TypeError, ValueError): return 0 + + +def _normalized(value: Any) -> str: + return "" if value is None else str(value).strip() + + +def _is_placeholder(value: Any) -> bool: + text = _normalized(value) + lower = text.lower() + if lower in _PLACEHOLDER_VALUES: + return True + if re.fullmatch(r"<[^<>]+>", text): + return True + if lower in {"yyyy-mm-dd", "yyyy/mm/dd"}: + return True + return lower.startswith( + ( + "vault-item-id-for-", + "sealed-envelope-id-for-", + "recovery-checklist-id-for-", + "ticket-id-for-", + ) + ) + + +def _collect_strings(value: Any, path: str = "$") -> list[tuple[str, str]]: + strings: list[tuple[str, str]] = [] + if isinstance(value, str): + strings.append((path, value)) + elif isinstance(value, dict): + for key, child in value.items(): + strings.extend(_collect_strings(child, f"{path}.{key}")) + elif isinstance(value, list): + for index, child in enumerate(value): + strings.extend(_collect_strings(child, f"{path}[{index}]")) + return strings + + +def _find_sensitive_strings(payload: dict[str, Any]) -> list[str]: + hits: list[str] = [] + for path, value in _collect_strings(payload): + if path.endswith(".item_id") and value in _REQUIRED_ITEM_IDS: + continue + if path.endswith(".gate_id") and value == _GATE_ID: + continue + for label, pattern in _SECRET_VALUE_PATTERNS.items(): + if pattern.search(value): + hits.append(f"{label}_at={path}") + break + return hits + + +def _find_forbidden_booleans(value: Any, path: str = "$") -> list[str]: + hits: list[str] = [] + if isinstance(value, dict): + for key, child in value.items(): + child_path = f"{path}.{key}" + if key in _FORBIDDEN_BOOLEAN_FIELDS and child is not False: + hits.append(f"{child_path}={child!r}") + hits.extend(_find_forbidden_booleans(child, child_path)) + elif isinstance(value, list): + for index, child in enumerate(value): + hits.extend(_find_forbidden_booleans(child, f"{path}[{index}]")) + return hits diff --git a/apps/api/tests/test_credential_escrow_evidence_intake_readiness_api.py b/apps/api/tests/test_credential_escrow_evidence_intake_readiness_api.py index 6b48f9f4..4f8aa5ce 100644 --- a/apps/api/tests/test_credential_escrow_evidence_intake_readiness_api.py +++ b/apps/api/tests/test_credential_escrow_evidence_intake_readiness_api.py @@ -10,6 +10,18 @@ from src.api.v1.agents import router from src.services.credential_escrow_evidence_intake_readiness import ( load_latest_credential_escrow_evidence_intake_readiness, ) +from src.services.credential_escrow_evidence_intake_readiness import ( + validate_credential_escrow_evidence_owner_response, +) + + +ESCROW_ITEMS = [ + "restic_repository_password", + "offsite_provider_credentials", + "break_glass_admin_credentials", + "dns_registrar_recovery", + "oauth_ai_provider_recovery", +] def test_credential_escrow_evidence_intake_reports_p0_blocker(): @@ -52,6 +64,79 @@ def test_credential_escrow_evidence_intake_endpoint_returns_readiness(): assert data["operation_boundaries"]["workflow_trigger_allowed"] is False +def test_credential_escrow_evidence_owner_response_validator_accepts_redacted_refs_only(): + payload = validate_credential_escrow_evidence_owner_response( + _valid_redacted_owner_response(), + readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"}, + ) + + assert payload["schema_version"] == "credential_escrow_evidence_owner_response_validation_v1" + assert payload["status"] == "accepted_for_independent_reviewer_readiness_only" + assert payload["result"]["owner_response_received_count"] == 1 + assert payload["result"]["owner_response_accepted_count"] == 1 + assert payload["result"]["required_item_count"] == 5 + assert payload["result"]["accepted_item_count"] == 5 + assert payload["result"]["runtime_gate_count"] == 0 + assert payload["result"]["credential_marker_write_authorized_count"] == 0 + assert payload["result"]["secret_value_collection_allowed"] is False + assert payload["operation_boundaries"]["payload_persisted"] is False + assert payload["operation_boundaries"]["credential_read_performed"] is False + assert payload["operation_boundaries"]["credential_marker_write_performed"] is False + assert payload["operation_boundaries"]["runtime_action_performed"] is False + + +def test_credential_escrow_evidence_owner_response_validator_quarantines_secret_payload(): + response = _valid_redacted_owner_response() + response["responses"][0]["redacted_evidence_refs"] = [ + "password=not-allowed-even-in-tests" + ] + + payload = validate_credential_escrow_evidence_owner_response( + response, + readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"}, + ) + + assert payload["status"] == "quarantined_sensitive_payload" + assert payload["result"]["owner_response_accepted_count"] == 0 + assert payload["result"]["sensitive_payload_hit_count"] == 1 + assert payload["operation_boundaries"]["payload_persisted"] is False + + +def test_credential_escrow_evidence_owner_response_validator_rejects_runtime_or_marker_action(): + response = _valid_redacted_owner_response() + response["responses"][0]["runtime_action_authorized"] = True + response["responses"][0]["credential_marker_write_authorized"] = True + + payload = validate_credential_escrow_evidence_owner_response( + response, + readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"}, + ) + + assert payload["status"] == "rejected_runtime_or_marker_action" + assert payload["result"]["owner_response_accepted_count"] == 0 + assert payload["result"]["forbidden_true_field_count"] == 2 + assert payload["operation_boundaries"]["credential_marker_write_performed"] is False + + +def test_credential_escrow_evidence_owner_response_endpoint_validates_without_persisting(): + app = FastAPI() + app.include_router(router, prefix="/api/v1") + client = TestClient(app) + + response = client.post( + "/api/v1/agents/credential-escrow-evidence-intake-readiness/validate-owner-response", + json=_valid_redacted_owner_response(), + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "accepted_for_independent_reviewer_readiness_only" + assert data["result"]["owner_response_accepted_count"] == 1 + assert data["operation_boundaries"]["payload_persisted"] is False + assert data["operation_boundaries"]["secret_plaintext_read"] is False + assert data["operation_boundaries"]["host_or_k8s_write_performed"] is False + + def test_credential_escrow_evidence_intake_rejects_secret_collection(tmp_path): security_dir = tmp_path / "security" security_dir.mkdir() @@ -131,3 +216,40 @@ def _backup_matrix() -> dict: "credential_escrow_forbidden_true_field_count": 0, } } + + +def _valid_redacted_owner_response() -> dict: + return { + "schema_version": "awoooi_post_reboot_next_gate_owner_response_v1", + "responses": [ + { + "gate_id": "credential_escrow_evidence", + "owner_role": "backup_dr_owner", + "owner_team": "platform_security", + "decision": "accepted", + "decision_reason": "reviewed redacted non-secret evidence refs only", + "affected_scope": "P0-005 DR credential escrow evidence", + "redacted_evidence_refs": ["review-ticket-20260629-p0-005"], + "followup_owner": "backup_dr_owner", + "runtime_action_requested": False, + "runtime_action_authorized": False, + "host_write_requested": False, + "host_write_authorized": False, + "secret_value_included": False, + "secret_value_collection_allowed": False, + "credential_marker_write_requested": False, + "credential_marker_write_authorized": False, + "escrow_items": [ + { + "item_id": item_id, + "non_secret_evidence_ref": f"review-ticket-20260629-p0-005-{index}", + "recovery_owner": "backup_dr_owner", + "reviewer": "security_reviewer", + "last_reviewed_at": "2026-06-29", + "contains_secret_value": False, + } + for index, item_id in enumerate(ESCROW_ITEMS) + ], + } + ], + }