feat(delivery): add GitHub private backup evidence gate
This commit is contained in:
@@ -85,6 +85,9 @@ from src.services.ai_agent_deployment_layout import (
|
||||
from src.services.awoooi_status_cleanup_dashboard import (
|
||||
load_latest_awoooi_status_cleanup_dashboard,
|
||||
)
|
||||
from src.services.github_target_private_backup_evidence_gate import (
|
||||
load_latest_github_target_private_backup_evidence_gate,
|
||||
)
|
||||
from src.services.ai_agent_failure_receipt_no_send_replay import (
|
||||
load_latest_ai_agent_failure_receipt_no_send_replay,
|
||||
)
|
||||
@@ -929,6 +932,35 @@ async def get_delivery_closure_workbench() -> dict[str, Any]:
|
||||
) from exc
|
||||
|
||||
|
||||
@router.get(
|
||||
"/github-target-private-backup-evidence-gate",
|
||||
response_model=dict[str, Any],
|
||||
summary="取得 GitHub 私有備援證據閘門",
|
||||
description=(
|
||||
"彙整既有 GitHub target decision、owner response、approval package 與 probe snapshot,"
|
||||
"用只讀方式判定 GitHub 備援是否具備 private visibility、safe credential 與 owner evidence。"
|
||||
"此端點不呼叫 GitHub live API、不建立 repo、不改 visibility、不同步 refs、不觸發 workflow、"
|
||||
"不收 private clone URL credential 或任何 secret value。"
|
||||
),
|
||||
)
|
||||
async def get_github_target_private_backup_evidence_gate() -> dict[str, Any]:
|
||||
"""回傳 GitHub 私有備援 evidence gate 只讀彙總。"""
|
||||
try:
|
||||
payload = await asyncio.to_thread(load_latest_github_target_private_backup_evidence_gate)
|
||||
return redact_public_lan_topology(payload)
|
||||
except FileNotFoundError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=str(exc),
|
||||
) from exc
|
||||
except (json.JSONDecodeError, ValueError) as exc:
|
||||
logger.error("github_target_private_backup_evidence_gate_invalid", error=str(exc))
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="GitHub 私有備援證據閘門無效",
|
||||
) from exc
|
||||
|
||||
|
||||
@router.get(
|
||||
"/agent-12-agent-war-room",
|
||||
response_model=dict[str, Any],
|
||||
|
||||
@@ -0,0 +1,385 @@
|
||||
"""GitHub private backup evidence gate.
|
||||
|
||||
Builds a read-only gate from committed GitHub target snapshots. The gate is
|
||||
deliberately conservative: a public probe hit is not considered a valid backup
|
||||
because AWOOOI policy requires GitHub backup targets to be private.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from src.services.snapshot_paths import default_security_dir
|
||||
|
||||
_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__))
|
||||
_SCHEMA_VERSION = "github_target_private_backup_evidence_gate_v1"
|
||||
|
||||
_DECISION_FILE = "github-target-decision.snapshot.json"
|
||||
_OWNER_RESPONSE_FILE = "github-target-owner-decision-response.snapshot.json"
|
||||
_APPROVAL_PACKAGE_FILE = "github-target-repo-approval-package.snapshot.json"
|
||||
_PROBE_FILE = "github-target-probe.snapshot.json"
|
||||
|
||||
|
||||
def load_latest_github_target_private_backup_evidence_gate(
|
||||
security_dir: Path | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Load committed GitHub snapshots and return a private-backup gate."""
|
||||
directory = security_dir or _DEFAULT_SECURITY_DIR
|
||||
decision = _load_snapshot(directory / _DECISION_FILE)
|
||||
owner_response = _load_snapshot(directory / _OWNER_RESPONSE_FILE)
|
||||
approval_package = _load_snapshot(directory / _APPROVAL_PACKAGE_FILE)
|
||||
probe = _load_snapshot(directory / _PROBE_FILE)
|
||||
|
||||
_require_source_contracts(
|
||||
decision=decision,
|
||||
owner_response=owner_response,
|
||||
approval_package=approval_package,
|
||||
probe=probe,
|
||||
)
|
||||
return build_github_target_private_backup_evidence_gate(
|
||||
decision=decision,
|
||||
owner_response=owner_response,
|
||||
approval_package=approval_package,
|
||||
probe=probe,
|
||||
)
|
||||
|
||||
|
||||
def build_github_target_private_backup_evidence_gate(
|
||||
*,
|
||||
decision: dict[str, Any],
|
||||
owner_response: dict[str, Any],
|
||||
approval_package: dict[str, Any],
|
||||
probe: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
"""Build the read-only gate response from source-control snapshots."""
|
||||
decisions = [_dict(row) for row in _list(decision.get("decisions"))]
|
||||
probe_by_repo = {
|
||||
str(row.get("github_repo")): _dict(row)
|
||||
for row in _list(probe.get("candidates"))
|
||||
if row.get("github_repo")
|
||||
}
|
||||
package_by_repo = {
|
||||
str(row.get("github_repo")): _dict(row)
|
||||
for row in _list(approval_package.get("approval_items"))
|
||||
if row.get("github_repo")
|
||||
}
|
||||
owner_summary = _dict(owner_response.get("summary"))
|
||||
|
||||
targets = [
|
||||
_build_target(
|
||||
decision=row,
|
||||
probe=probe_by_repo.get(str(row.get("github_repo")), {}),
|
||||
approval_item=package_by_repo.get(str(row.get("github_repo")), {}),
|
||||
)
|
||||
for row in decisions
|
||||
]
|
||||
|
||||
approval_required_targets = [row for row in targets if row["approval_required"]]
|
||||
public_probe_visible_targets = [
|
||||
row
|
||||
for row in approval_required_targets
|
||||
if row["visibility_evidence_status"] == "blocked_public_probe_visible_private_evidence_required"
|
||||
]
|
||||
not_found_or_private_targets = [
|
||||
row
|
||||
for row in approval_required_targets
|
||||
if row["visibility_evidence_status"] == "blocked_private_or_absent_not_verified"
|
||||
]
|
||||
external_scope_targets = [
|
||||
row for row in targets if row["visibility_evidence_status"] == "external_scope_not_backup_target"
|
||||
]
|
||||
blocked_targets = [
|
||||
row
|
||||
for row in approval_required_targets
|
||||
if not row["private_backup_verified"] or not row["execution_ready"]
|
||||
]
|
||||
|
||||
private_backup_verified_count = sum(1 for row in approval_required_targets if row["private_backup_verified"])
|
||||
forbidden_actions = sorted({item for row in targets for item in row["forbidden_actions"]})
|
||||
|
||||
return {
|
||||
"schema_version": _SCHEMA_VERSION,
|
||||
"generated_at": _generated_at(owner_response),
|
||||
"status": "blocked_public_visibility_and_safe_credential_evidence_required"
|
||||
if public_probe_visible_targets
|
||||
else "blocked_private_visibility_and_safe_credential_evidence_required",
|
||||
"mode": "read_only_private_backup_evidence_gate",
|
||||
"source_reviews": {
|
||||
"target_decision": f"docs/security/{_DECISION_FILE}",
|
||||
"owner_decision_response": f"docs/security/{_OWNER_RESPONSE_FILE}",
|
||||
"approval_package": f"docs/security/{_APPROVAL_PACKAGE_FILE}",
|
||||
"github_target_probe": f"docs/security/{_PROBE_FILE}",
|
||||
},
|
||||
"summary": {
|
||||
"target_decision_count": len(targets),
|
||||
"approval_required_target_count": len(approval_required_targets),
|
||||
"approval_package_item_count": len(_list(approval_package.get("approval_items"))),
|
||||
"public_probe_visible_target_count": len(public_probe_visible_targets),
|
||||
"not_found_or_private_target_count": len(not_found_or_private_targets),
|
||||
"private_backup_verified_count": private_backup_verified_count,
|
||||
"private_visibility_evidence_missing_count": len(approval_required_targets) - private_backup_verified_count,
|
||||
"safe_credential_required_count": len(approval_required_targets),
|
||||
"safe_credential_accepted_evidence_count": 0,
|
||||
"owner_response_received_count": _int(owner_summary.get("received_response_count")),
|
||||
"owner_response_accepted_count": _int(owner_summary.get("accepted_response_count")),
|
||||
"execution_ready_count": sum(1 for row in approval_required_targets if row["execution_ready"]),
|
||||
"blocked_target_count": len(blocked_targets),
|
||||
"external_scope_target_count": len(external_scope_targets),
|
||||
"forbidden_action_count": len(forbidden_actions),
|
||||
"repo_creation_authorized": False,
|
||||
"visibility_change_authorized": False,
|
||||
"refs_sync_authorized": False,
|
||||
"github_primary_switch_authorized": False,
|
||||
"workflow_modification_authorized": False,
|
||||
"workflow_trigger_authorized": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"private_clone_url_collection_allowed": False,
|
||||
"not_found_or_private_as_absent_allowed": False,
|
||||
"public_repo_allowed": False,
|
||||
},
|
||||
"targets": targets,
|
||||
"acceptance_requirements": _acceptance_requirements(owner_response),
|
||||
"rejection_rules": _rejection_rules(owner_response),
|
||||
"operation_boundaries": {
|
||||
"read_only_api_allowed": True,
|
||||
"github_api_write_allowed": False,
|
||||
"gitea_api_write_allowed": False,
|
||||
"repo_creation_allowed": False,
|
||||
"visibility_change_allowed": False,
|
||||
"refs_sync_allowed": False,
|
||||
"workflow_modification_allowed": False,
|
||||
"workflow_trigger_allowed": False,
|
||||
"github_primary_switch_allowed": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"private_clone_url_collection_allowed": False,
|
||||
},
|
||||
"authorization_flags": {
|
||||
"repo_creation_authorized": False,
|
||||
"visibility_change_authorized": False,
|
||||
"refs_sync_authorized": False,
|
||||
"github_primary_switch_authorized": False,
|
||||
"workflow_modification_authorized": False,
|
||||
"workflow_trigger_authorized": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"private_clone_url_collection_allowed": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _build_target(
|
||||
*,
|
||||
decision: dict[str, Any],
|
||||
probe: dict[str, Any],
|
||||
approval_item: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
github_repo = str(decision.get("github_repo") or "")
|
||||
probe_status = str(probe.get("status") or decision.get("probe_status") or "unknown")
|
||||
target_state = str(decision.get("target_state") or "unknown")
|
||||
approval_required = decision.get("approval_required") is True
|
||||
external_scope = not approval_required or target_state == "external_scope"
|
||||
visibility_status = _visibility_evidence_status(
|
||||
external_scope=external_scope,
|
||||
probe_status=probe_status,
|
||||
)
|
||||
blockers = _target_blockers(visibility_status, approval_required)
|
||||
forbidden_actions = _strings(approval_item.get("still_forbidden")) or [
|
||||
"create_github_repo",
|
||||
"change_repo_visibility",
|
||||
"push_refs",
|
||||
"delete_refs",
|
||||
"force_push",
|
||||
"switch_github_primary",
|
||||
"store_secret_value",
|
||||
"store_token_value",
|
||||
]
|
||||
|
||||
return {
|
||||
"github_repo": github_repo,
|
||||
"source_key": str(decision.get("source_key") or ""),
|
||||
"approval_required": approval_required,
|
||||
"probe_status": probe_status,
|
||||
"target_state": target_state,
|
||||
"risk": str(decision.get("risk") or "UNKNOWN"),
|
||||
"visibility_evidence_status": visibility_status,
|
||||
"private_backup_verified": False,
|
||||
"private_visibility_owner_evidence_ref": None,
|
||||
"safe_credential_evidence_status": "not_collected",
|
||||
"safe_credential_evidence_ref": None,
|
||||
"owner_response_accepted": False,
|
||||
"refs_sync_ready": False,
|
||||
"execution_ready": False,
|
||||
"blockers": blockers,
|
||||
"evidence_refs": _strings(decision.get("evidence_refs")),
|
||||
"next_action": str(approval_item.get("approval_action") or decision.get("recommended_action") or ""),
|
||||
"forbidden_actions": forbidden_actions,
|
||||
"repo_creation_authorized": False,
|
||||
"visibility_change_authorized": False,
|
||||
"refs_sync_authorized": False,
|
||||
"github_primary_switch_authorized": False,
|
||||
"secret_values_collected": False,
|
||||
}
|
||||
|
||||
|
||||
def _visibility_evidence_status(*, external_scope: bool, probe_status: str) -> str:
|
||||
if external_scope:
|
||||
return "external_scope_not_backup_target"
|
||||
if probe_status == "exists":
|
||||
return "blocked_public_probe_visible_private_evidence_required"
|
||||
if probe_status == "not_found_or_private":
|
||||
return "blocked_private_or_absent_not_verified"
|
||||
return "blocked_probe_status_unknown"
|
||||
|
||||
|
||||
def _target_blockers(visibility_status: str, approval_required: bool) -> list[str]:
|
||||
if not approval_required:
|
||||
return ["external_scope_not_backup_target"]
|
||||
blockers = [
|
||||
"private_visibility_owner_evidence_missing",
|
||||
"safe_credential_evidence_missing",
|
||||
"owner_response_not_accepted",
|
||||
"refs_sync_not_authorized",
|
||||
]
|
||||
if visibility_status == "blocked_public_probe_visible_private_evidence_required":
|
||||
blockers.insert(0, "public_probe_visible_not_private_backup")
|
||||
if visibility_status == "blocked_private_or_absent_not_verified":
|
||||
blockers.insert(0, "not_found_or_private_requires_owner_evidence")
|
||||
return blockers
|
||||
|
||||
|
||||
def _load_snapshot(path: Path) -> dict[str, Any]:
|
||||
with path.open(encoding="utf-8") as handle:
|
||||
payload = json.load(handle)
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError(f"{path}: expected JSON object")
|
||||
return payload
|
||||
|
||||
|
||||
def _require_source_contracts(
|
||||
*,
|
||||
decision: dict[str, Any],
|
||||
owner_response: dict[str, Any],
|
||||
approval_package: dict[str, Any],
|
||||
probe: dict[str, Any],
|
||||
) -> None:
|
||||
_require_schema(decision, "github_target_decision_v1", _DECISION_FILE)
|
||||
_require_schema(owner_response, "github_target_owner_decision_response_v1", _OWNER_RESPONSE_FILE)
|
||||
_require_schema(approval_package, "github_target_repo_approval_package_v1", _APPROVAL_PACKAGE_FILE)
|
||||
_require_schema(probe, "github_target_probe_v1", _PROBE_FILE)
|
||||
_require_decision_consistency(decision, _DECISION_FILE)
|
||||
_require_probe_consistency(probe, _PROBE_FILE)
|
||||
_require_approval_package_consistency(approval_package, _APPROVAL_PACKAGE_FILE)
|
||||
_require_owner_response_boundaries(owner_response, _OWNER_RESPONSE_FILE)
|
||||
|
||||
|
||||
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
|
||||
actual = payload.get("schema_version")
|
||||
if actual != expected:
|
||||
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
|
||||
|
||||
|
||||
def _require_decision_consistency(payload: dict[str, Any], label: str) -> None:
|
||||
decisions = _list(payload.get("decisions"))
|
||||
if _int(payload.get("decision_count")) != len(decisions):
|
||||
raise ValueError(f"{label}: decision_count must equal decisions length")
|
||||
actual_approval_required = sum(1 for row in decisions if _dict(row).get("approval_required") is True)
|
||||
if _int(payload.get("approval_required_count")) != actual_approval_required:
|
||||
raise ValueError(f"{label}: approval_required_count must match decisions")
|
||||
|
||||
|
||||
def _require_probe_consistency(payload: dict[str, Any], label: str) -> None:
|
||||
candidates = _list(payload.get("candidates"))
|
||||
if _int(payload.get("candidate_count")) != len(candidates):
|
||||
raise ValueError(f"{label}: candidate_count must equal candidates length")
|
||||
exists_count = sum(1 for row in candidates if _dict(row).get("status") == "exists")
|
||||
not_found_count = sum(1 for row in candidates if _dict(row).get("status") == "not_found_or_private")
|
||||
if _int(payload.get("exists_count")) != exists_count:
|
||||
raise ValueError(f"{label}: exists_count must match candidates")
|
||||
if _int(payload.get("not_found_or_private_count")) != not_found_count:
|
||||
raise ValueError(f"{label}: not_found_or_private_count must match candidates")
|
||||
|
||||
|
||||
def _require_approval_package_consistency(payload: dict[str, Any], label: str) -> None:
|
||||
approval_items = _list(payload.get("approval_items"))
|
||||
if _int(payload.get("package_count")) != len(approval_items):
|
||||
raise ValueError(f"{label}: package_count must equal approval_items length")
|
||||
|
||||
|
||||
def _require_owner_response_boundaries(payload: dict[str, Any], label: str) -> None:
|
||||
if payload.get("runtime_execution_authorized") is not False:
|
||||
raise ValueError(f"{label}: runtime_execution_authorized must be false")
|
||||
summary = _dict(payload.get("summary"))
|
||||
false_flags = {
|
||||
"repo_creation_authorized",
|
||||
"visibility_change_authorized",
|
||||
"refs_sync_authorized",
|
||||
"github_primary_switch_authorized",
|
||||
"secret_value_collection_allowed",
|
||||
"action_buttons_allowed",
|
||||
"target_owner_request_dispatch_authorized",
|
||||
"not_found_or_private_as_absent_allowed",
|
||||
"repo_creation_allowed_without_owner_response",
|
||||
"visibility_change_allowed_without_owner_response",
|
||||
}
|
||||
enabled = sorted(flag for flag in false_flags if summary.get(flag) is not False)
|
||||
if enabled:
|
||||
raise ValueError(f"{label}: owner response boundary flags must remain false: {enabled}")
|
||||
|
||||
|
||||
def _generated_at(owner_response: dict[str, Any]) -> str:
|
||||
if owner_response.get("generated_at"):
|
||||
return str(owner_response["generated_at"])
|
||||
if owner_response.get("date"):
|
||||
return f"{owner_response['date']}T00:00:00+08:00"
|
||||
return ""
|
||||
|
||||
|
||||
def _acceptance_requirements(owner_response: dict[str, Any]) -> list[str]:
|
||||
packet = _dict(owner_response.get("target_owner_handoff_packet"))
|
||||
requirements = _strings(packet.get("required_response_fields"))
|
||||
return requirements or [
|
||||
"owner_role_or_team",
|
||||
"decision",
|
||||
"decision_reason",
|
||||
"affected_scope",
|
||||
"redacted_evidence_refs",
|
||||
"followup_owner",
|
||||
"rollback_owner",
|
||||
"maintenance_window",
|
||||
"validation_plan",
|
||||
]
|
||||
|
||||
|
||||
def _rejection_rules(owner_response: dict[str, Any]) -> list[str]:
|
||||
packet = _dict(owner_response.get("owner_response_request_packet"))
|
||||
rules = _strings(packet.get("forbidden_payloads"))
|
||||
return rules or [
|
||||
"token_value",
|
||||
"secret_value",
|
||||
"private_key",
|
||||
"private_clone_url_credential",
|
||||
"repo_creation_command",
|
||||
"visibility_change_command",
|
||||
"refs_sync_or_delete_request",
|
||||
]
|
||||
|
||||
|
||||
def _dict(value: Any) -> dict[str, Any]:
|
||||
return value if isinstance(value, dict) else {}
|
||||
|
||||
|
||||
def _list(value: Any) -> list[Any]:
|
||||
return value if isinstance(value, list) else []
|
||||
|
||||
|
||||
def _strings(value: Any) -> list[str]:
|
||||
return [str(item) for item in _list(value) if item is not None]
|
||||
|
||||
|
||||
def _int(value: Any) -> int:
|
||||
if isinstance(value, bool):
|
||||
return int(value)
|
||||
if isinstance(value, (int, float)):
|
||||
return int(value)
|
||||
return 0
|
||||
@@ -17,7 +17,7 @@ def test_delivery_closure_workbench_endpoint_returns_product_summary():
|
||||
data = response.json()
|
||||
assert data["schema_version"] == "delivery_closure_workbench_v1"
|
||||
assert data["summary"]["source_count"] == 5
|
||||
assert 4 <= data["summary"]["loaded_source_count"] <= 5
|
||||
assert data["summary"]["loaded_source_count"] == 5
|
||||
assert data["summary"]["runtime_execution_authorized"] is False
|
||||
assert data["summary"]["remote_write_authorized"] is False
|
||||
assert data["summary"]["repo_creation_authorized"] is False
|
||||
@@ -35,10 +35,13 @@ def test_delivery_closure_workbench_endpoint_returns_product_summary():
|
||||
assert lanes["gitea"]["metric"]["kind"] == "workflow_count"
|
||||
assert lanes["runtime"]["metric"]["kind"] == "surface_count"
|
||||
assert lanes["backup"]["metric"]["kind"] == "readiness_row_count"
|
||||
if sources["github_private_backup"]["loaded"] is False:
|
||||
assert lanes["github"]["blocker_count"] == 1
|
||||
assert lanes["github"]["status"] == "blocked_github_private_backup_source_missing"
|
||||
assert sources["github_private_backup"]["missing_reason"]
|
||||
assert sources["github_private_backup"]["loaded"] is True
|
||||
assert sources["github_private_backup"]["schema_version"] == "github_target_private_backup_evidence_gate_v1"
|
||||
assert sources["github_private_backup"]["missing_reason"] == ""
|
||||
assert lanes["github"]["blocker_count"] == 9
|
||||
assert lanes["github"]["status"] == "blocked_public_visibility_and_safe_credential_evidence_required"
|
||||
assert lanes["github"]["metric"]["verified"] == 0
|
||||
assert lanes["github"]["metric"]["total"] == 9
|
||||
assert all(0 <= lane["completion_percent"] <= 100 for lane in lanes.values())
|
||||
assert all(lane["tone"] in {"ok", "warn", "danger"} for lane in lanes.values())
|
||||
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services.github_target_private_backup_evidence_gate import (
|
||||
load_latest_github_target_private_backup_evidence_gate,
|
||||
)
|
||||
from src.services.snapshot_paths import default_security_dir
|
||||
|
||||
|
||||
def test_load_github_target_private_backup_evidence_gate_from_committed_snapshots():
|
||||
snapshot = load_latest_github_target_private_backup_evidence_gate()
|
||||
|
||||
assert snapshot["schema_version"] == "github_target_private_backup_evidence_gate_v1"
|
||||
assert snapshot["mode"] == "read_only_private_backup_evidence_gate"
|
||||
assert snapshot["status"] == "blocked_public_visibility_and_safe_credential_evidence_required"
|
||||
assert snapshot["summary"]["target_decision_count"] == 10
|
||||
assert snapshot["summary"]["approval_required_target_count"] == 9
|
||||
assert snapshot["summary"]["private_backup_verified_count"] == 0
|
||||
assert snapshot["summary"]["blocked_target_count"] == 9
|
||||
assert snapshot["summary"]["public_repo_allowed"] is False
|
||||
assert snapshot["summary"]["repo_creation_authorized"] is False
|
||||
assert snapshot["summary"]["visibility_change_authorized"] is False
|
||||
assert snapshot["summary"]["refs_sync_authorized"] is False
|
||||
assert snapshot["summary"]["secret_value_collection_allowed"] is False
|
||||
assert snapshot["operation_boundaries"]["read_only_api_allowed"] is True
|
||||
assert snapshot["operation_boundaries"]["repo_creation_allowed"] is False
|
||||
assert snapshot["operation_boundaries"]["visibility_change_allowed"] is False
|
||||
assert snapshot["operation_boundaries"]["refs_sync_allowed"] is False
|
||||
assert snapshot["operation_boundaries"]["secret_value_collection_allowed"] is False
|
||||
|
||||
targets = {target["github_repo"]: target for target in snapshot["targets"]}
|
||||
assert targets["owenhytsai/awoooi"]["visibility_evidence_status"] == (
|
||||
"blocked_public_probe_visible_private_evidence_required"
|
||||
)
|
||||
assert targets["owenhytsai/ewoooc"]["visibility_evidence_status"] == (
|
||||
"blocked_private_or_absent_not_verified"
|
||||
)
|
||||
assert targets["nexu-io/open-design"]["visibility_evidence_status"] == (
|
||||
"external_scope_not_backup_target"
|
||||
)
|
||||
|
||||
|
||||
def test_github_target_private_backup_gate_rejects_runtime_authorization(tmp_path):
|
||||
_copy_security_snapshots(tmp_path)
|
||||
owner_response_path = tmp_path / "github-target-owner-decision-response.snapshot.json"
|
||||
owner_response = json.loads(owner_response_path.read_text(encoding="utf-8"))
|
||||
owner_response["summary"]["repo_creation_authorized"] = True
|
||||
owner_response_path.write_text(json.dumps(owner_response), encoding="utf-8")
|
||||
|
||||
with pytest.raises(ValueError, match="boundary flags"):
|
||||
load_latest_github_target_private_backup_evidence_gate(tmp_path)
|
||||
|
||||
|
||||
def test_github_target_private_backup_gate_requires_decision_rollup_consistency(tmp_path):
|
||||
_copy_security_snapshots(tmp_path)
|
||||
decision_path = tmp_path / "github-target-decision.snapshot.json"
|
||||
decision = json.loads(decision_path.read_text(encoding="utf-8"))
|
||||
decision["decision_count"] = 999
|
||||
decision_path.write_text(json.dumps(decision), encoding="utf-8")
|
||||
|
||||
with pytest.raises(ValueError, match="decision_count"):
|
||||
load_latest_github_target_private_backup_evidence_gate(tmp_path)
|
||||
|
||||
|
||||
def _copy_security_snapshots(tmp_path: Path) -> None:
|
||||
source_dir = default_security_dir(Path(__file__))
|
||||
for filename in (
|
||||
"github-target-decision.snapshot.json",
|
||||
"github-target-owner-decision-response.snapshot.json",
|
||||
"github-target-repo-approval-package.snapshot.json",
|
||||
"github-target-probe.snapshot.json",
|
||||
):
|
||||
shutil.copy(source_dir / filename, tmp_path / filename)
|
||||
@@ -0,0 +1,35 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from src.api.v1.agents import router
|
||||
|
||||
|
||||
def test_github_target_private_backup_evidence_gate_endpoint_returns_read_only_gate():
|
||||
app = FastAPI()
|
||||
app.include_router(router, prefix="/api/v1")
|
||||
client = TestClient(app)
|
||||
|
||||
response = client.get("/api/v1/agents/github-target-private-backup-evidence-gate")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["schema_version"] == "github_target_private_backup_evidence_gate_v1"
|
||||
assert data["mode"] == "read_only_private_backup_evidence_gate"
|
||||
assert data["summary"]["approval_required_target_count"] == 9
|
||||
assert data["summary"]["private_backup_verified_count"] == 0
|
||||
assert data["summary"]["blocked_target_count"] == 9
|
||||
assert data["summary"]["public_repo_allowed"] is False
|
||||
assert data["summary"]["repo_creation_authorized"] is False
|
||||
assert data["summary"]["visibility_change_authorized"] is False
|
||||
assert data["summary"]["refs_sync_authorized"] is False
|
||||
assert data["summary"]["secret_value_collection_allowed"] is False
|
||||
assert data["operation_boundaries"]["read_only_api_allowed"] is True
|
||||
assert data["operation_boundaries"]["github_api_write_allowed"] is False
|
||||
assert data["operation_boundaries"]["repo_creation_allowed"] is False
|
||||
assert data["operation_boundaries"]["visibility_change_allowed"] is False
|
||||
assert data["operation_boundaries"]["refs_sync_allowed"] is False
|
||||
assert data["operation_boundaries"]["workflow_trigger_allowed"] is False
|
||||
assert data["operation_boundaries"]["private_clone_url_collection_allowed"] is False
|
||||
assert "192.168.0." not in response.text
|
||||
Reference in New Issue
Block a user