feat(governance): expose credential escrow intake readiness

This commit is contained in:
Your Name
2026-06-29 13:47:57 +08:00
parent 9362588ced
commit cecfa54575
6 changed files with 425 additions and 0 deletions

View File

@@ -224,6 +224,8 @@ jobs:
;;
apps/api/src/services/heartbeat_report_service.py)
;;
apps/api/src/services/credential_escrow_evidence_intake_readiness.py)
;;
apps/api/Dockerfile)
;;
apps/api/src/services/awoooi_gitea_onboarding_warning_step_dashboard.py)
@@ -280,6 +282,8 @@ jobs:
;;
apps/api/tests/test_delivery_closure_workbench_api.py)
;;
apps/api/tests/test_credential_escrow_evidence_intake_readiness_api.py)
;;
apps/api/tests/e2e_network_test.py)
;;
apps/api/tests/test_p0_cicd_baseline_source_readiness_api.py)
@@ -397,6 +401,7 @@ jobs:
src/services/decision_fusion.py \
src/services/delivery_closure_workbench.py \
src/services/heartbeat_report_service.py \
src/services/credential_escrow_evidence_intake_readiness.py \
src/services/awoooi_gitea_onboarding_warning_step_dashboard.py \
src/services/awoooi_gitea_onboarding_warning_step_owner_package.py \
src/services/awoooi_gitea_onboarding_warning_step_owner_response_preflight.py \
@@ -418,6 +423,7 @@ jobs:
tests/test_approval_pending_visibility.py \
tests/test_awooop_operator_timeline_labels.py::test_outbound_timeline_title_labels_runbook_review \
tests/test_delivery_closure_workbench_api.py \
tests/test_credential_escrow_evidence_intake_readiness_api.py \
tests/e2e_network_test.py::TestHMACVerification::test_valid_hmac_signature \
tests/test_p0_cicd_baseline_source_readiness_api.py \
tests/test_product_awoooi_manifest_standard_api.py \

View File

@@ -316,6 +316,9 @@ from src.services.backup_notification_policy import (
from src.services.backup_restore_drill_approval_package_template import (
load_latest_backup_restore_drill_approval_package_template,
)
from src.services.credential_escrow_evidence_intake_readiness import (
load_latest_credential_escrow_evidence_intake_readiness,
)
from src.services.delivery_closure_workbench import (
load_delivery_closure_workbench,
)
@@ -3678,6 +3681,36 @@ async def get_backup_dr_readiness_matrix() -> dict[str, Any]:
) from exc
@router.get(
"/credential-escrow-evidence-intake-readiness",
response_model=dict[str, Any],
summary="取得 P0-005 credential escrow evidence intake readiness",
description=(
"讀取已提交的 P0-005 credential escrow evidence intake 只讀投影;"
"此端點只回傳 redacted evidence-ref metadata 與目前缺項,"
"不讀 credential、不寫 escrow marker、不執行 backup/restore/offsite sync、"
"不觸發 workflow、不碰 host/K8s、不讀 raw session/SQLite。"
),
)
async def get_credential_escrow_evidence_intake_readiness() -> dict[str, Any]:
"""Return P0-005 credential escrow evidence intake readiness."""
try:
return await asyncio.to_thread(
load_latest_credential_escrow_evidence_intake_readiness
)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("credential_escrow_evidence_intake_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="P0-005 credential escrow evidence intake readiness 快照無效",
) from exc
@router.get(
"/backup-notification-policy",
response_model=dict[str, Any],

View File

@@ -0,0 +1,232 @@
"""P0-005 credential escrow evidence intake readiness.
This read-only projection turns the Backup / DR escrow blocker into a direct
API contract. It only reads committed, redacted metadata snapshots; it never
reads credential values, writes escrow markers, triggers backups/restores, or
touches hosts.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.backup_dr_readiness_matrix import load_latest_backup_dr_readiness_matrix
from src.services.snapshot_paths import default_security_dir
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
_OWNER_REQUEST_FILE = "credential-escrow-evidence-owner-request.snapshot.json"
_SCHEMA_VERSION = "credential_escrow_evidence_intake_readiness_v1"
_OWNER_REQUEST_SCOPE = "credential_escrow_evidence_owner_request"
def load_latest_credential_escrow_evidence_intake_readiness(
security_dir: Path | None = None,
backup_dr_readiness_matrix: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Load P0-005 credential escrow evidence intake readiness."""
directory = security_dir or _DEFAULT_SECURITY_DIR
path = directory / _OWNER_REQUEST_FILE
with path.open(encoding="utf-8") as handle:
owner_request = json.load(handle)
if not isinstance(owner_request, dict):
raise ValueError(f"{path}: expected JSON object")
matrix = backup_dr_readiness_matrix or load_latest_backup_dr_readiness_matrix()
if not isinstance(matrix, dict):
raise ValueError("backup_dr_readiness_matrix: expected JSON object")
_require_owner_request(owner_request, str(path))
_require_backup_rollups(matrix, "backup_dr_readiness_matrix")
payload = _build_payload(owner_request, matrix, path)
_require_operation_boundaries(payload, str(path))
_require_rollup_consistency(payload, str(path))
return payload
def _build_payload(
owner_request: dict[str, Any],
matrix: dict[str, Any],
owner_request_path: Path,
) -> dict[str, Any]:
rollups = _dict(matrix.get("rollups"))
missing_items = [_normalize_item(item) for item in _list(owner_request.get("missing_items"))]
blocked_items = [
item["item_id"]
for item in missing_items
if item.get("status") != "verified"
]
effective_missing = _int(rollups.get("credential_escrow_effective_missing_count"))
forbidden_values = _strings(owner_request.get("forbidden_values"))
status = str(
rollups.get("credential_escrow_intake_status")
or "blocked_waiting_non_secret_credential_escrow_evidence"
)
if effective_missing == 0 and not blocked_items:
status = "ready_for_escrow_marker_review"
payload = {
"schema_version": _SCHEMA_VERSION,
"generated_at": owner_request.get("generated_at"),
"priority": "P0-005",
"scope": "credential_escrow_evidence_intake",
"status": status,
"readback": {
"workplan_id": "P0-005",
"workplan_title": "產品資料與備份 contract",
"source_owner_request_ref": (
f"docs/security/{owner_request_path.name}"
),
"source_backup_dr_readiness_matrix_ref": (
"docs/evaluations/backup_dr_readiness_matrix_2026-06-04.json"
),
"safe_next_step": (
"collect_redacted_non_secret_evidence_refs_then_rerun_preflight"
),
},
"required_evidence_items": missing_items,
"forbidden_values": forbidden_values,
"next_actions": [
"collect_redacted_non_secret_evidence_refs_for_all_missing_items",
"rerun_owner_response_preflight_without_secret_values",
"keep_credential_marker_write_closed_until_preflight_accepts_all_items",
],
"blocked_operations": [
"credential_marker_write",
"credential_read",
"secret_plaintext_export",
"restore_execution",
"offsite_sync_execution",
"backup_execution",
"workflow_dispatch",
"host_or_k8s_write",
],
"rollups": {
"required_item_count": len(missing_items),
"missing_item_count": len(blocked_items),
"blocked_item_ids": blocked_items,
"effective_escrow_missing_count": effective_missing,
"owner_response_received_count": _int(
rollups.get("credential_escrow_owner_response_received_count")
),
"owner_response_accepted_count": _int(
rollups.get("credential_escrow_owner_response_accepted_count")
),
"runtime_gate_count": _int(rollups.get("credential_escrow_runtime_gate_count")),
"credential_marker_write_authorized_count": _int(
rollups.get("credential_marker_write_authorized_count")
),
"secret_value_collection_allowed": bool(
rollups.get("credential_escrow_secret_value_collection_allowed")
),
"forbidden_true_field_count": _int(
rollups.get("credential_escrow_forbidden_true_field_count")
),
"forbidden_value_count": len(forbidden_values),
},
"operation_boundaries": {
"read_only_api_allowed": True,
"backup_execution_allowed": False,
"restore_execution_allowed": False,
"offsite_sync_execution_allowed": False,
"credential_marker_write_allowed": False,
"credential_read_allowed": False,
"secret_plaintext_allowed": False,
"secret_value_collection_allowed": False,
"workflow_trigger_allowed": False,
"host_or_k8s_write_allowed": False,
"raw_session_or_sqlite_read_allowed": False,
},
}
return payload
def _normalize_item(item: Any) -> dict[str, Any]:
data = _dict(item)
return {
"item_id": str(data.get("item") or data.get("item_id") or ""),
"status": str(data.get("status") or "missing"),
"allowed_evidence_id_types": _strings(data.get("allowed_evidence_id_types")),
"contains_secret_value_allowed": False,
"required_non_secret_evidence_ref": True,
}
def _require_owner_request(payload: dict[str, Any], label: str) -> None:
if payload.get("scope") != _OWNER_REQUEST_SCOPE:
raise ValueError(f"{label}: scope must be {_OWNER_REQUEST_SCOPE}")
if payload.get("gates", {}).get("secret_value_collection_authorized") is not False:
raise ValueError(f"{label}: secret value collection must remain false")
if payload.get("gates", {}).get("marker_write_completed") is not False:
raise ValueError(f"{label}: marker_write_completed must remain false")
if not _list(payload.get("missing_items")):
raise ValueError(f"{label}: missing_items must be present")
def _require_backup_rollups(payload: dict[str, Any], label: str) -> None:
rollups = _dict(payload.get("rollups"))
if rollups.get("credential_escrow_secret_value_collection_allowed") is not False:
raise ValueError(f"{label}: secret value collection must remain false")
if _int(rollups.get("credential_marker_write_authorized_count")) != 0:
raise ValueError(f"{label}: credential marker write must remain closed")
def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None:
boundaries = _dict(payload.get("operation_boundaries"))
if boundaries.get("read_only_api_allowed") is not True:
raise ValueError(f"{label}: read_only_api_allowed must be true")
blocked_flags = {
"backup_execution_allowed",
"restore_execution_allowed",
"offsite_sync_execution_allowed",
"credential_marker_write_allowed",
"credential_read_allowed",
"secret_plaintext_allowed",
"secret_value_collection_allowed",
"workflow_trigger_allowed",
"host_or_k8s_write_allowed",
"raw_session_or_sqlite_read_allowed",
}
allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if allowed:
raise ValueError(f"{label}: operation boundaries must remain false: {allowed}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
items = _list(payload.get("required_evidence_items"))
blocked = [
str(_dict(item).get("item_id") or "")
for item in items
if _dict(item).get("status") != "verified"
]
rollups = _dict(payload.get("rollups"))
if rollups.get("required_item_count") != len(items):
raise ValueError(f"{label}: required_item_count mismatch")
if rollups.get("missing_item_count") != len(blocked):
raise ValueError(f"{label}: missing_item_count mismatch")
if rollups.get("blocked_item_ids") != blocked:
raise ValueError(f"{label}: blocked_item_ids mismatch")
if rollups.get("secret_value_collection_allowed") is not False:
raise ValueError(f"{label}: secret_value_collection_allowed must be false")
if rollups.get("credential_marker_write_authorized_count") != 0:
raise ValueError(f"{label}: marker write must remain closed")
def _dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _list(value: Any) -> list[Any]:
return value if isinstance(value, list) else []
def _strings(value: Any) -> list[str]:
return [str(item) for item in _list(value) if str(item)]
def _int(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0

View File

@@ -0,0 +1,133 @@
from __future__ import annotations
import json
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.agents import router
from src.services.credential_escrow_evidence_intake_readiness import (
load_latest_credential_escrow_evidence_intake_readiness,
)
def test_credential_escrow_evidence_intake_reports_p0_blocker():
payload = load_latest_credential_escrow_evidence_intake_readiness()
assert payload["schema_version"] == "credential_escrow_evidence_intake_readiness_v1"
assert payload["priority"] == "P0-005"
assert payload["status"] == "blocked_waiting_non_secret_credential_escrow_evidence"
assert payload["rollups"]["required_item_count"] == 5
assert payload["rollups"]["missing_item_count"] == 5
assert payload["rollups"]["effective_escrow_missing_count"] == 5
assert payload["rollups"]["owner_response_received_count"] == 0
assert payload["rollups"]["owner_response_accepted_count"] == 0
assert payload["rollups"]["runtime_gate_count"] == 0
assert payload["rollups"]["credential_marker_write_authorized_count"] == 0
assert payload["rollups"]["secret_value_collection_allowed"] is False
assert payload["operation_boundaries"]["credential_marker_write_allowed"] is False
assert payload["operation_boundaries"]["credential_read_allowed"] is False
assert payload["operation_boundaries"]["secret_plaintext_allowed"] is False
assert payload["operation_boundaries"]["raw_session_or_sqlite_read_allowed"] is False
assert "restic_repository_password" in payload["rollups"]["blocked_item_ids"]
assert all(
item["contains_secret_value_allowed"] is False
for item in payload["required_evidence_items"]
)
def test_credential_escrow_evidence_intake_endpoint_returns_readiness():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
client = TestClient(app)
response = client.get("/api/v1/agents/credential-escrow-evidence-intake-readiness")
assert response.status_code == 200
data = response.json()
assert data["priority"] == "P0-005"
assert data["rollups"]["missing_item_count"] == 5
assert data["operation_boundaries"]["restore_execution_allowed"] is False
assert data["operation_boundaries"]["workflow_trigger_allowed"] is False
def test_credential_escrow_evidence_intake_rejects_secret_collection(tmp_path):
security_dir = tmp_path / "security"
security_dir.mkdir()
owner_request = {
"scope": "credential_escrow_evidence_owner_request",
"generated_at": "2026-06-29T00:00:00+08:00",
"missing_items": [
{
"item": "restic_repository_password",
"status": "missing",
"allowed_evidence_id_types": ["vault_item_id"],
}
],
"forbidden_values": ["password"],
"gates": {
"secret_value_collection_authorized": True,
"marker_write_completed": False,
},
}
(security_dir / "credential-escrow-evidence-owner-request.snapshot.json").write_text(
json.dumps(owner_request),
encoding="utf-8",
)
with pytest.raises(ValueError, match="secret value collection"):
load_latest_credential_escrow_evidence_intake_readiness(
security_dir,
backup_dr_readiness_matrix=_backup_matrix(),
)
def test_credential_escrow_evidence_intake_rejects_marker_write_authorization(tmp_path):
security_dir = tmp_path / "security"
security_dir.mkdir()
owner_request = {
"scope": "credential_escrow_evidence_owner_request",
"generated_at": "2026-06-29T00:00:00+08:00",
"missing_items": [
{
"item": "restic_repository_password",
"status": "missing",
"allowed_evidence_id_types": ["vault_item_id"],
}
],
"forbidden_values": ["password"],
"gates": {
"secret_value_collection_authorized": False,
"marker_write_completed": False,
},
}
(security_dir / "credential-escrow-evidence-owner-request.snapshot.json").write_text(
json.dumps(owner_request),
encoding="utf-8",
)
matrix = _backup_matrix()
matrix["rollups"]["credential_marker_write_authorized_count"] = 1
with pytest.raises(ValueError, match="credential marker write"):
load_latest_credential_escrow_evidence_intake_readiness(
security_dir,
backup_dr_readiness_matrix=matrix,
)
def _backup_matrix() -> dict:
return {
"rollups": {
"credential_escrow_intake_status": (
"blocked_waiting_non_secret_credential_escrow_evidence"
),
"credential_escrow_effective_missing_count": 1,
"credential_escrow_owner_response_received_count": 0,
"credential_escrow_owner_response_accepted_count": 0,
"credential_escrow_runtime_gate_count": 0,
"credential_marker_write_authorized_count": 0,
"credential_escrow_secret_value_collection_allowed": False,
"credential_escrow_forbidden_true_field_count": 0,
}
}

View File

@@ -1,3 +1,17 @@
## 2026-06-29 — 13:40 P0-005 credential escrow evidence intake 只讀投影
**完成內容**
- 接續 P0-002 / P0-004 production readback green 後,按順位推進 P0-005。
- 新增 `credential_escrow_evidence_intake_readiness` loader 與 `/api/v1/agents/credential-escrow-evidence-intake-readiness`,把五個 missing escrow evidence items、禁止 secret collection、禁止 marker write 與 safe next step 集中成 P0-005 專屬只讀 API。
- 此投影只讀 `docs/security/credential-escrow-evidence-owner-request.snapshot.json` 與 Backup / DR readiness matrix不讀 `/backup` runtime secret、不寫 marker、不執行 backup / restore / offsite sync。
- Gitea CD controlled-runtime profile 納入此 service/test避免 docs/readback 小修回退 heavy B5。
**驗證目標**
- Focused`test_credential_escrow_evidence_intake_readiness_api.py``test_backup_dr_readiness_matrix_api.py``test_delivery_closure_workbench_api.py``ops/runner/test_cd_controlled_runtime_profile.py`
- Source / workflow guards`py_compile``guard-gitea-runner-pressure.py``check-gitea-step-env-secrets.js``git diff --check`
**邊界**:未使用 GitHub / `gh` / GitHub API未 workflow_dispatch未讀 token / cookie / session / secret / auth / `.env`;未讀 raw sessions / SQLite未操作 host / Docker / K8s / DB未寫 credential marker未 force push。
## 2026-06-29 — 13:35 P0-002 product manifest API image packaging 修補
**完成內容**

View File

@@ -27,6 +27,13 @@ def test_product_manifest_changes_stay_on_controlled_runtime_profile() -> None:
assert "tests/test_product_awoooi_manifest_standard_api.py" in text
def test_credential_escrow_intake_stays_on_controlled_runtime_profile() -> None:
text = _workflow_text()
assert "apps/api/src/services/credential_escrow_evidence_intake_readiness.py)" in text
assert "src/services/credential_escrow_evidence_intake_readiness.py" in text
assert "tests/test_credential_escrow_evidence_intake_readiness_api.py" in text
def test_p0_onboarding_readiness_sources_stay_on_controlled_runtime_profile() -> None:
text = _workflow_text()
expected_sources = [