Merge remote-tracking branch 'gitea-ssh/main' into codex/p0-product-manifest-standard-20260629

This commit is contained in:
Your Name
2026-06-29 15:24:40 +08:00
3 changed files with 410 additions and 0 deletions

View File

@@ -319,6 +319,9 @@ from src.services.backup_restore_drill_approval_package_template import (
from src.services.credential_escrow_evidence_intake_readiness import (
load_latest_credential_escrow_evidence_intake_readiness,
)
from src.services.credential_escrow_evidence_intake_readiness import (
validate_credential_escrow_evidence_owner_response,
)
from src.services.delivery_closure_workbench import (
load_delivery_closure_workbench,
)
@@ -3750,6 +3753,40 @@ async def get_credential_escrow_evidence_intake_readiness() -> dict[str, Any]:
) from exc
@router.post(
"/credential-escrow-evidence-intake-readiness/validate-owner-response",
response_model=dict[str, Any],
summary="驗證 P0-005 credential escrow 脫敏 owner response",
description=(
"針對單次 owner-provided redacted non-secret credential escrow evidence "
"response 進行 no-persist validation此端點只回傳 independent reviewer "
"readiness / needs supplement / quarantined / rejected runtime action 分流,"
"不保存 payload、不讀 credential、不寫 escrow marker、不執行 backup / restore / "
"offsite sync、不觸發 workflow、不碰 host/K8s、不讀 raw session/SQLite。"
),
)
async def validate_credential_escrow_evidence_owner_response_packet(
owner_response: dict[str, Any],
) -> dict[str, Any]:
"""Return no-persist validation for one P0-005 redacted owner response."""
try:
return await asyncio.to_thread(
validate_credential_escrow_evidence_owner_response,
owner_response,
)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("credential_escrow_evidence_owner_response_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="P0-005 credential escrow owner response 驗證器無效",
) from exc
@router.get(
"/backup-notification-policy",
response_model=dict[str, Any],

View File

@@ -9,6 +9,7 @@ touches hosts.
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
@@ -18,7 +19,50 @@ from src.services.snapshot_paths import default_security_dir
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
_OWNER_REQUEST_FILE = "credential-escrow-evidence-owner-request.snapshot.json"
_SCHEMA_VERSION = "credential_escrow_evidence_intake_readiness_v1"
_VALIDATION_SCHEMA_VERSION = "credential_escrow_evidence_owner_response_validation_v1"
_OWNER_REQUEST_SCOPE = "credential_escrow_evidence_owner_request"
_OWNER_RESPONSE_SCHEMA = "awoooi_post_reboot_next_gate_owner_response_v1"
_GATE_ID = "credential_escrow_evidence"
_REQUIRED_ITEM_IDS = {
"restic_repository_password",
"offsite_provider_credentials",
"break_glass_admin_credentials",
"dns_registrar_recovery",
"oauth_ai_provider_recovery",
}
_FORBIDDEN_BOOLEAN_FIELDS = {
"runtime_action_requested",
"runtime_action_authorized",
"host_write_requested",
"host_write_authorized",
"secret_value_included",
"secret_value_collection_allowed",
"credential_marker_write_requested",
"credential_marker_write_authorized",
}
_PLACEHOLDER_VALUES = {
"",
"pending",
"todo",
"tbd",
"n/a",
"na",
"owner_role_here",
"owner_team_here",
"decision_reason_here",
"redacted_evidence_ref_here",
"non_secret_evidence_ref_here",
"followup_owner_here",
"reviewer_here",
}
_SECRET_VALUE_PATTERNS = {
"authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
"bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE),
"client_keys": re.compile(r"\bclient\.keys\b", re.IGNORECASE),
"password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE),
"private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
"token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE),
}
def load_latest_credential_escrow_evidence_intake_readiness(
@@ -45,6 +89,149 @@ def load_latest_credential_escrow_evidence_intake_readiness(
return payload
def validate_credential_escrow_evidence_owner_response(
owner_response: dict[str, Any],
readiness: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Validate one redacted P0-005 owner response without persisting it."""
current = readiness or load_latest_credential_escrow_evidence_intake_readiness()
blockers: list[str] = []
sensitive_hits = _find_sensitive_strings(owner_response)
forbidden_boolean_hits = _find_forbidden_booleans(owner_response)
if owner_response.get("schema_version") != _OWNER_RESPONSE_SCHEMA:
blockers.append(f"schema_version={owner_response.get('schema_version')!r}")
responses = [
item
for item in _list(owner_response.get("responses"))
if isinstance(item, dict) and str(item.get("gate_id") or "") == _GATE_ID
]
response = responses[0] if responses else {}
if not response:
blockers.append(f"{_GATE_ID}.response_missing")
for key in (
"owner_role",
"owner_team",
"decision",
"decision_reason",
"affected_scope",
"followup_owner",
):
if _is_placeholder(response.get(key)):
blockers.append(f"{_GATE_ID}.{key}_missing")
decision = str(response.get("decision") or "").strip().lower()
if decision not in {"accepted", "needs_supplement", "rejected"}:
blockers.append(f"{_GATE_ID}.decision_invalid={decision!r}")
elif decision != "accepted":
blockers.append(f"{_GATE_ID}.decision_not_accepted={decision!r}")
evidence_refs = [
ref
for ref in _list(response.get("redacted_evidence_refs"))
if not _is_placeholder(ref)
]
if not evidence_refs:
blockers.append(f"{_GATE_ID}.redacted_evidence_refs_missing")
escrow_items = [
item for item in _list(response.get("escrow_items")) if isinstance(item, dict)
]
seen_item_ids = {
str(item.get("item_id") or "").strip()
for item in escrow_items
if str(item.get("item_id") or "").strip()
}
missing_item_ids = sorted(_REQUIRED_ITEM_IDS - seen_item_ids)
unknown_item_ids = sorted(seen_item_ids - _REQUIRED_ITEM_IDS)
if missing_item_ids:
blockers.append(f"{_GATE_ID}.missing_items={missing_item_ids}")
for item_id in unknown_item_ids:
blockers.append(f"{_GATE_ID}.unknown_item={item_id!r}")
accepted_item_ids: list[str] = []
for item in escrow_items:
item_id = str(item.get("item_id") or "").strip()
if item_id not in _REQUIRED_ITEM_IDS:
continue
item_blocked = False
for key in (
"non_secret_evidence_ref",
"recovery_owner",
"reviewer",
"last_reviewed_at",
):
if _is_placeholder(item.get(key)):
blockers.append(f"{_GATE_ID}.{item_id}.{key}_missing")
item_blocked = True
if item.get("contains_secret_value") is not False:
blockers.append(f"{_GATE_ID}.{item_id}.contains_secret_value_not_false")
item_blocked = True
if not item_blocked:
accepted_item_ids.append(item_id)
if forbidden_boolean_hits:
status = "rejected_runtime_or_marker_action"
elif sensitive_hits:
status = "quarantined_sensitive_payload"
elif blockers:
status = "needs_supplement"
else:
status = "accepted_for_independent_reviewer_readiness_only"
owner_response_received_count = 1 if status in {
"accepted_for_independent_reviewer_readiness_only",
"needs_supplement",
} and response else 0
owner_response_accepted_count = (
1 if status == "accepted_for_independent_reviewer_readiness_only" else 0
)
return {
"schema_version": _VALIDATION_SCHEMA_VERSION,
"status": status,
"priority": "P0-005",
"scope": "credential_escrow_evidence_intake",
"source_readiness_status": current.get("status"),
"result": {
"owner_response_received_count": owner_response_received_count,
"owner_response_accepted_count": owner_response_accepted_count,
"required_item_count": len(_REQUIRED_ITEM_IDS),
"provided_item_count": len(seen_item_ids & _REQUIRED_ITEM_IDS),
"accepted_item_count": len(set(accepted_item_ids)),
"missing_item_ids": missing_item_ids,
"unknown_item_ids": unknown_item_ids,
"redacted_evidence_ref_count": len(evidence_refs),
"blocker_count": len(blockers),
"sensitive_payload_hit_count": len(sensitive_hits),
"forbidden_true_field_count": len(forbidden_boolean_hits),
"runtime_gate_count": 0,
"credential_marker_write_authorized_count": 0,
"secret_value_collection_allowed": False,
},
"blockers": blockers,
"sensitive_payload_hits": sensitive_hits,
"forbidden_boolean_hits": forbidden_boolean_hits,
"operation_boundaries": {
"payload_persisted": False,
"backup_execution_performed": False,
"restore_execution_performed": False,
"offsite_sync_execution_performed": False,
"credential_marker_write_performed": False,
"credential_read_performed": False,
"secret_plaintext_read": False,
"secret_value_collection_allowed": False,
"workflow_trigger_performed": False,
"host_or_k8s_write_performed": False,
"raw_session_or_sqlite_read_performed": False,
"runtime_action_performed": False,
},
"safe_next_step": (
"independent_reviewer_acceptance_then_marker_dry_run"
if status == "accepted_for_independent_reviewer_readiness_only"
else "supplement_redacted_non_secret_evidence_refs_without_secret_values"
),
}
def _build_payload(
owner_request: dict[str, Any],
matrix: dict[str, Any],
@@ -230,3 +417,67 @@ def _int(value: Any) -> int:
return int(value)
except (TypeError, ValueError):
return 0
def _normalized(value: Any) -> str:
return "" if value is None else str(value).strip()
def _is_placeholder(value: Any) -> bool:
text = _normalized(value)
lower = text.lower()
if lower in _PLACEHOLDER_VALUES:
return True
if re.fullmatch(r"<[^<>]+>", text):
return True
if lower in {"yyyy-mm-dd", "yyyy/mm/dd"}:
return True
return lower.startswith(
(
"vault-item-id-for-",
"sealed-envelope-id-for-",
"recovery-checklist-id-for-",
"ticket-id-for-",
)
)
def _collect_strings(value: Any, path: str = "$") -> list[tuple[str, str]]:
strings: list[tuple[str, str]] = []
if isinstance(value, str):
strings.append((path, value))
elif isinstance(value, dict):
for key, child in value.items():
strings.extend(_collect_strings(child, f"{path}.{key}"))
elif isinstance(value, list):
for index, child in enumerate(value):
strings.extend(_collect_strings(child, f"{path}[{index}]"))
return strings
def _find_sensitive_strings(payload: dict[str, Any]) -> list[str]:
hits: list[str] = []
for path, value in _collect_strings(payload):
if path.endswith(".item_id") and value in _REQUIRED_ITEM_IDS:
continue
if path.endswith(".gate_id") and value == _GATE_ID:
continue
for label, pattern in _SECRET_VALUE_PATTERNS.items():
if pattern.search(value):
hits.append(f"{label}_at={path}")
break
return hits
def _find_forbidden_booleans(value: Any, path: str = "$") -> list[str]:
hits: list[str] = []
if isinstance(value, dict):
for key, child in value.items():
child_path = f"{path}.{key}"
if key in _FORBIDDEN_BOOLEAN_FIELDS and child is not False:
hits.append(f"{child_path}={child!r}")
hits.extend(_find_forbidden_booleans(child, child_path))
elif isinstance(value, list):
for index, child in enumerate(value):
hits.extend(_find_forbidden_booleans(child, f"{path}[{index}]"))
return hits

View File

@@ -10,6 +10,18 @@ from src.api.v1.agents import router
from src.services.credential_escrow_evidence_intake_readiness import (
load_latest_credential_escrow_evidence_intake_readiness,
)
from src.services.credential_escrow_evidence_intake_readiness import (
validate_credential_escrow_evidence_owner_response,
)
ESCROW_ITEMS = [
"restic_repository_password",
"offsite_provider_credentials",
"break_glass_admin_credentials",
"dns_registrar_recovery",
"oauth_ai_provider_recovery",
]
def test_credential_escrow_evidence_intake_reports_p0_blocker():
@@ -52,6 +64,79 @@ def test_credential_escrow_evidence_intake_endpoint_returns_readiness():
assert data["operation_boundaries"]["workflow_trigger_allowed"] is False
def test_credential_escrow_evidence_owner_response_validator_accepts_redacted_refs_only():
payload = validate_credential_escrow_evidence_owner_response(
_valid_redacted_owner_response(),
readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"},
)
assert payload["schema_version"] == "credential_escrow_evidence_owner_response_validation_v1"
assert payload["status"] == "accepted_for_independent_reviewer_readiness_only"
assert payload["result"]["owner_response_received_count"] == 1
assert payload["result"]["owner_response_accepted_count"] == 1
assert payload["result"]["required_item_count"] == 5
assert payload["result"]["accepted_item_count"] == 5
assert payload["result"]["runtime_gate_count"] == 0
assert payload["result"]["credential_marker_write_authorized_count"] == 0
assert payload["result"]["secret_value_collection_allowed"] is False
assert payload["operation_boundaries"]["payload_persisted"] is False
assert payload["operation_boundaries"]["credential_read_performed"] is False
assert payload["operation_boundaries"]["credential_marker_write_performed"] is False
assert payload["operation_boundaries"]["runtime_action_performed"] is False
def test_credential_escrow_evidence_owner_response_validator_quarantines_secret_payload():
response = _valid_redacted_owner_response()
response["responses"][0]["redacted_evidence_refs"] = [
"password=not-allowed-even-in-tests"
]
payload = validate_credential_escrow_evidence_owner_response(
response,
readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"},
)
assert payload["status"] == "quarantined_sensitive_payload"
assert payload["result"]["owner_response_accepted_count"] == 0
assert payload["result"]["sensitive_payload_hit_count"] == 1
assert payload["operation_boundaries"]["payload_persisted"] is False
def test_credential_escrow_evidence_owner_response_validator_rejects_runtime_or_marker_action():
response = _valid_redacted_owner_response()
response["responses"][0]["runtime_action_authorized"] = True
response["responses"][0]["credential_marker_write_authorized"] = True
payload = validate_credential_escrow_evidence_owner_response(
response,
readiness={"status": "blocked_waiting_non_secret_credential_escrow_evidence"},
)
assert payload["status"] == "rejected_runtime_or_marker_action"
assert payload["result"]["owner_response_accepted_count"] == 0
assert payload["result"]["forbidden_true_field_count"] == 2
assert payload["operation_boundaries"]["credential_marker_write_performed"] is False
def test_credential_escrow_evidence_owner_response_endpoint_validates_without_persisting():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
client = TestClient(app)
response = client.post(
"/api/v1/agents/credential-escrow-evidence-intake-readiness/validate-owner-response",
json=_valid_redacted_owner_response(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "accepted_for_independent_reviewer_readiness_only"
assert data["result"]["owner_response_accepted_count"] == 1
assert data["operation_boundaries"]["payload_persisted"] is False
assert data["operation_boundaries"]["secret_plaintext_read"] is False
assert data["operation_boundaries"]["host_or_k8s_write_performed"] is False
def test_credential_escrow_evidence_intake_rejects_secret_collection(tmp_path):
security_dir = tmp_path / "security"
security_dir.mkdir()
@@ -131,3 +216,40 @@ def _backup_matrix() -> dict:
"credential_escrow_forbidden_true_field_count": 0,
}
}
def _valid_redacted_owner_response() -> dict:
return {
"schema_version": "awoooi_post_reboot_next_gate_owner_response_v1",
"responses": [
{
"gate_id": "credential_escrow_evidence",
"owner_role": "backup_dr_owner",
"owner_team": "platform_security",
"decision": "accepted",
"decision_reason": "reviewed redacted non-secret evidence refs only",
"affected_scope": "P0-005 DR credential escrow evidence",
"redacted_evidence_refs": ["review-ticket-20260629-p0-005"],
"followup_owner": "backup_dr_owner",
"runtime_action_requested": False,
"runtime_action_authorized": False,
"host_write_requested": False,
"host_write_authorized": False,
"secret_value_included": False,
"secret_value_collection_allowed": False,
"credential_marker_write_requested": False,
"credential_marker_write_authorized": False,
"escrow_items": [
{
"item_id": item_id,
"non_secret_evidence_ref": f"review-ticket-20260629-p0-005-{index}",
"recovery_owner": "backup_dr_owner",
"reviewer": "security_reviewer",
"last_reviewed_at": "2026-06-29",
"contains_secret_value": False,
}
for index, item_id in enumerate(ESCROW_ITEMS)
],
}
],
}