diff --git a/apps/api/src/api/v1/agents.py b/apps/api/src/api/v1/agents.py index 1e513d71..04aeac78 100644 --- a/apps/api/src/api/v1/agents.py +++ b/apps/api/src/api/v1/agents.py @@ -85,6 +85,9 @@ from src.services.ai_agent_deployment_layout import ( from src.services.awoooi_status_cleanup_dashboard import ( load_latest_awoooi_status_cleanup_dashboard, ) +from src.services.github_target_private_backup_evidence_gate import ( + load_latest_github_target_private_backup_evidence_gate, +) from src.services.ai_agent_failure_receipt_no_send_replay import ( load_latest_ai_agent_failure_receipt_no_send_replay, ) @@ -929,6 +932,35 @@ async def get_delivery_closure_workbench() -> dict[str, Any]: ) from exc +@router.get( + "/github-target-private-backup-evidence-gate", + response_model=dict[str, Any], + summary="取得 GitHub 私有備援證據閘門", + description=( + "彙整既有 GitHub target decision、owner response、approval package 與 probe snapshot," + "用只讀方式判定 GitHub 備援是否具備 private visibility、safe credential 與 owner evidence。" + "此端點不呼叫 GitHub live API、不建立 repo、不改 visibility、不同步 refs、不觸發 workflow、" + "不收 private clone URL credential 或任何 secret value。" + ), +) +async def get_github_target_private_backup_evidence_gate() -> dict[str, Any]: + """回傳 GitHub 私有備援 evidence gate 只讀彙總。""" + try: + payload = await asyncio.to_thread(load_latest_github_target_private_backup_evidence_gate) + return redact_public_lan_topology(payload) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except (json.JSONDecodeError, ValueError) as exc: + logger.error("github_target_private_backup_evidence_gate_invalid", error=str(exc)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="GitHub 私有備援證據閘門無效", + ) from exc + + @router.get( "/agent-12-agent-war-room", response_model=dict[str, Any], diff --git a/apps/api/src/services/github_target_private_backup_evidence_gate.py b/apps/api/src/services/github_target_private_backup_evidence_gate.py new file mode 100644 index 00000000..1a89ff35 --- /dev/null +++ b/apps/api/src/services/github_target_private_backup_evidence_gate.py @@ -0,0 +1,385 @@ +"""GitHub private backup evidence gate. + +Builds a read-only gate from committed GitHub target snapshots. The gate is +deliberately conservative: a public probe hit is not considered a valid backup +because AWOOOI policy requires GitHub backup targets to be private. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from src.services.snapshot_paths import default_security_dir + +_DEFAULT_SECURITY_DIR = default_security_dir(Path(__file__)) +_SCHEMA_VERSION = "github_target_private_backup_evidence_gate_v1" + +_DECISION_FILE = "github-target-decision.snapshot.json" +_OWNER_RESPONSE_FILE = "github-target-owner-decision-response.snapshot.json" +_APPROVAL_PACKAGE_FILE = "github-target-repo-approval-package.snapshot.json" +_PROBE_FILE = "github-target-probe.snapshot.json" + + +def load_latest_github_target_private_backup_evidence_gate( + security_dir: Path | None = None, +) -> dict[str, Any]: + """Load committed GitHub snapshots and return a private-backup gate.""" + directory = security_dir or _DEFAULT_SECURITY_DIR + decision = _load_snapshot(directory / _DECISION_FILE) + owner_response = _load_snapshot(directory / _OWNER_RESPONSE_FILE) + approval_package = _load_snapshot(directory / _APPROVAL_PACKAGE_FILE) + probe = _load_snapshot(directory / _PROBE_FILE) + + _require_source_contracts( + decision=decision, + owner_response=owner_response, + approval_package=approval_package, + probe=probe, + ) + return build_github_target_private_backup_evidence_gate( + decision=decision, + owner_response=owner_response, + approval_package=approval_package, + probe=probe, + ) + + +def build_github_target_private_backup_evidence_gate( + *, + decision: dict[str, Any], + owner_response: dict[str, Any], + approval_package: dict[str, Any], + probe: dict[str, Any], +) -> dict[str, Any]: + """Build the read-only gate response from source-control snapshots.""" + decisions = [_dict(row) for row in _list(decision.get("decisions"))] + probe_by_repo = { + str(row.get("github_repo")): _dict(row) + for row in _list(probe.get("candidates")) + if row.get("github_repo") + } + package_by_repo = { + str(row.get("github_repo")): _dict(row) + for row in _list(approval_package.get("approval_items")) + if row.get("github_repo") + } + owner_summary = _dict(owner_response.get("summary")) + + targets = [ + _build_target( + decision=row, + probe=probe_by_repo.get(str(row.get("github_repo")), {}), + approval_item=package_by_repo.get(str(row.get("github_repo")), {}), + ) + for row in decisions + ] + + approval_required_targets = [row for row in targets if row["approval_required"]] + public_probe_visible_targets = [ + row + for row in approval_required_targets + if row["visibility_evidence_status"] == "blocked_public_probe_visible_private_evidence_required" + ] + not_found_or_private_targets = [ + row + for row in approval_required_targets + if row["visibility_evidence_status"] == "blocked_private_or_absent_not_verified" + ] + external_scope_targets = [ + row for row in targets if row["visibility_evidence_status"] == "external_scope_not_backup_target" + ] + blocked_targets = [ + row + for row in approval_required_targets + if not row["private_backup_verified"] or not row["execution_ready"] + ] + + private_backup_verified_count = sum(1 for row in approval_required_targets if row["private_backup_verified"]) + forbidden_actions = sorted({item for row in targets for item in row["forbidden_actions"]}) + + return { + "schema_version": _SCHEMA_VERSION, + "generated_at": _generated_at(owner_response), + "status": "blocked_public_visibility_and_safe_credential_evidence_required" + if public_probe_visible_targets + else "blocked_private_visibility_and_safe_credential_evidence_required", + "mode": "read_only_private_backup_evidence_gate", + "source_reviews": { + "target_decision": f"docs/security/{_DECISION_FILE}", + "owner_decision_response": f"docs/security/{_OWNER_RESPONSE_FILE}", + "approval_package": f"docs/security/{_APPROVAL_PACKAGE_FILE}", + "github_target_probe": f"docs/security/{_PROBE_FILE}", + }, + "summary": { + "target_decision_count": len(targets), + "approval_required_target_count": len(approval_required_targets), + "approval_package_item_count": len(_list(approval_package.get("approval_items"))), + "public_probe_visible_target_count": len(public_probe_visible_targets), + "not_found_or_private_target_count": len(not_found_or_private_targets), + "private_backup_verified_count": private_backup_verified_count, + "private_visibility_evidence_missing_count": len(approval_required_targets) - private_backup_verified_count, + "safe_credential_required_count": len(approval_required_targets), + "safe_credential_accepted_evidence_count": 0, + "owner_response_received_count": _int(owner_summary.get("received_response_count")), + "owner_response_accepted_count": _int(owner_summary.get("accepted_response_count")), + "execution_ready_count": sum(1 for row in approval_required_targets if row["execution_ready"]), + "blocked_target_count": len(blocked_targets), + "external_scope_target_count": len(external_scope_targets), + "forbidden_action_count": len(forbidden_actions), + "repo_creation_authorized": False, + "visibility_change_authorized": False, + "refs_sync_authorized": False, + "github_primary_switch_authorized": False, + "workflow_modification_authorized": False, + "workflow_trigger_authorized": False, + "secret_value_collection_allowed": False, + "private_clone_url_collection_allowed": False, + "not_found_or_private_as_absent_allowed": False, + "public_repo_allowed": False, + }, + "targets": targets, + "acceptance_requirements": _acceptance_requirements(owner_response), + "rejection_rules": _rejection_rules(owner_response), + "operation_boundaries": { + "read_only_api_allowed": True, + "github_api_write_allowed": False, + "gitea_api_write_allowed": False, + "repo_creation_allowed": False, + "visibility_change_allowed": False, + "refs_sync_allowed": False, + "workflow_modification_allowed": False, + "workflow_trigger_allowed": False, + "github_primary_switch_allowed": False, + "secret_value_collection_allowed": False, + "private_clone_url_collection_allowed": False, + }, + "authorization_flags": { + "repo_creation_authorized": False, + "visibility_change_authorized": False, + "refs_sync_authorized": False, + "github_primary_switch_authorized": False, + "workflow_modification_authorized": False, + "workflow_trigger_authorized": False, + "secret_value_collection_allowed": False, + "private_clone_url_collection_allowed": False, + }, + } + + +def _build_target( + *, + decision: dict[str, Any], + probe: dict[str, Any], + approval_item: dict[str, Any], +) -> dict[str, Any]: + github_repo = str(decision.get("github_repo") or "") + probe_status = str(probe.get("status") or decision.get("probe_status") or "unknown") + target_state = str(decision.get("target_state") or "unknown") + approval_required = decision.get("approval_required") is True + external_scope = not approval_required or target_state == "external_scope" + visibility_status = _visibility_evidence_status( + external_scope=external_scope, + probe_status=probe_status, + ) + blockers = _target_blockers(visibility_status, approval_required) + forbidden_actions = _strings(approval_item.get("still_forbidden")) or [ + "create_github_repo", + "change_repo_visibility", + "push_refs", + "delete_refs", + "force_push", + "switch_github_primary", + "store_secret_value", + "store_token_value", + ] + + return { + "github_repo": github_repo, + "source_key": str(decision.get("source_key") or ""), + "approval_required": approval_required, + "probe_status": probe_status, + "target_state": target_state, + "risk": str(decision.get("risk") or "UNKNOWN"), + "visibility_evidence_status": visibility_status, + "private_backup_verified": False, + "private_visibility_owner_evidence_ref": None, + "safe_credential_evidence_status": "not_collected", + "safe_credential_evidence_ref": None, + "owner_response_accepted": False, + "refs_sync_ready": False, + "execution_ready": False, + "blockers": blockers, + "evidence_refs": _strings(decision.get("evidence_refs")), + "next_action": str(approval_item.get("approval_action") or decision.get("recommended_action") or ""), + "forbidden_actions": forbidden_actions, + "repo_creation_authorized": False, + "visibility_change_authorized": False, + "refs_sync_authorized": False, + "github_primary_switch_authorized": False, + "secret_values_collected": False, + } + + +def _visibility_evidence_status(*, external_scope: bool, probe_status: str) -> str: + if external_scope: + return "external_scope_not_backup_target" + if probe_status == "exists": + return "blocked_public_probe_visible_private_evidence_required" + if probe_status == "not_found_or_private": + return "blocked_private_or_absent_not_verified" + return "blocked_probe_status_unknown" + + +def _target_blockers(visibility_status: str, approval_required: bool) -> list[str]: + if not approval_required: + return ["external_scope_not_backup_target"] + blockers = [ + "private_visibility_owner_evidence_missing", + "safe_credential_evidence_missing", + "owner_response_not_accepted", + "refs_sync_not_authorized", + ] + if visibility_status == "blocked_public_probe_visible_private_evidence_required": + blockers.insert(0, "public_probe_visible_not_private_backup") + if visibility_status == "blocked_private_or_absent_not_verified": + blockers.insert(0, "not_found_or_private_requires_owner_evidence") + return blockers + + +def _load_snapshot(path: Path) -> dict[str, Any]: + with path.open(encoding="utf-8") as handle: + payload = json.load(handle) + if not isinstance(payload, dict): + raise ValueError(f"{path}: expected JSON object") + return payload + + +def _require_source_contracts( + *, + decision: dict[str, Any], + owner_response: dict[str, Any], + approval_package: dict[str, Any], + probe: dict[str, Any], +) -> None: + _require_schema(decision, "github_target_decision_v1", _DECISION_FILE) + _require_schema(owner_response, "github_target_owner_decision_response_v1", _OWNER_RESPONSE_FILE) + _require_schema(approval_package, "github_target_repo_approval_package_v1", _APPROVAL_PACKAGE_FILE) + _require_schema(probe, "github_target_probe_v1", _PROBE_FILE) + _require_decision_consistency(decision, _DECISION_FILE) + _require_probe_consistency(probe, _PROBE_FILE) + _require_approval_package_consistency(approval_package, _APPROVAL_PACKAGE_FILE) + _require_owner_response_boundaries(owner_response, _OWNER_RESPONSE_FILE) + + +def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None: + actual = payload.get("schema_version") + if actual != expected: + raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}") + + +def _require_decision_consistency(payload: dict[str, Any], label: str) -> None: + decisions = _list(payload.get("decisions")) + if _int(payload.get("decision_count")) != len(decisions): + raise ValueError(f"{label}: decision_count must equal decisions length") + actual_approval_required = sum(1 for row in decisions if _dict(row).get("approval_required") is True) + if _int(payload.get("approval_required_count")) != actual_approval_required: + raise ValueError(f"{label}: approval_required_count must match decisions") + + +def _require_probe_consistency(payload: dict[str, Any], label: str) -> None: + candidates = _list(payload.get("candidates")) + if _int(payload.get("candidate_count")) != len(candidates): + raise ValueError(f"{label}: candidate_count must equal candidates length") + exists_count = sum(1 for row in candidates if _dict(row).get("status") == "exists") + not_found_count = sum(1 for row in candidates if _dict(row).get("status") == "not_found_or_private") + if _int(payload.get("exists_count")) != exists_count: + raise ValueError(f"{label}: exists_count must match candidates") + if _int(payload.get("not_found_or_private_count")) != not_found_count: + raise ValueError(f"{label}: not_found_or_private_count must match candidates") + + +def _require_approval_package_consistency(payload: dict[str, Any], label: str) -> None: + approval_items = _list(payload.get("approval_items")) + if _int(payload.get("package_count")) != len(approval_items): + raise ValueError(f"{label}: package_count must equal approval_items length") + + +def _require_owner_response_boundaries(payload: dict[str, Any], label: str) -> None: + if payload.get("runtime_execution_authorized") is not False: + raise ValueError(f"{label}: runtime_execution_authorized must be false") + summary = _dict(payload.get("summary")) + false_flags = { + "repo_creation_authorized", + "visibility_change_authorized", + "refs_sync_authorized", + "github_primary_switch_authorized", + "secret_value_collection_allowed", + "action_buttons_allowed", + "target_owner_request_dispatch_authorized", + "not_found_or_private_as_absent_allowed", + "repo_creation_allowed_without_owner_response", + "visibility_change_allowed_without_owner_response", + } + enabled = sorted(flag for flag in false_flags if summary.get(flag) is not False) + if enabled: + raise ValueError(f"{label}: owner response boundary flags must remain false: {enabled}") + + +def _generated_at(owner_response: dict[str, Any]) -> str: + if owner_response.get("generated_at"): + return str(owner_response["generated_at"]) + if owner_response.get("date"): + return f"{owner_response['date']}T00:00:00+08:00" + return "" + + +def _acceptance_requirements(owner_response: dict[str, Any]) -> list[str]: + packet = _dict(owner_response.get("target_owner_handoff_packet")) + requirements = _strings(packet.get("required_response_fields")) + return requirements or [ + "owner_role_or_team", + "decision", + "decision_reason", + "affected_scope", + "redacted_evidence_refs", + "followup_owner", + "rollback_owner", + "maintenance_window", + "validation_plan", + ] + + +def _rejection_rules(owner_response: dict[str, Any]) -> list[str]: + packet = _dict(owner_response.get("owner_response_request_packet")) + rules = _strings(packet.get("forbidden_payloads")) + return rules or [ + "token_value", + "secret_value", + "private_key", + "private_clone_url_credential", + "repo_creation_command", + "visibility_change_command", + "refs_sync_or_delete_request", + ] + + +def _dict(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def _list(value: Any) -> list[Any]: + return value if isinstance(value, list) else [] + + +def _strings(value: Any) -> list[str]: + return [str(item) for item in _list(value) if item is not None] + + +def _int(value: Any) -> int: + if isinstance(value, bool): + return int(value) + if isinstance(value, (int, float)): + return int(value) + return 0 diff --git a/apps/api/tests/test_delivery_closure_workbench_api.py b/apps/api/tests/test_delivery_closure_workbench_api.py index 5501b4fb..a80eae3d 100644 --- a/apps/api/tests/test_delivery_closure_workbench_api.py +++ b/apps/api/tests/test_delivery_closure_workbench_api.py @@ -17,7 +17,7 @@ def test_delivery_closure_workbench_endpoint_returns_product_summary(): data = response.json() assert data["schema_version"] == "delivery_closure_workbench_v1" assert data["summary"]["source_count"] == 5 - assert 4 <= data["summary"]["loaded_source_count"] <= 5 + assert data["summary"]["loaded_source_count"] == 5 assert data["summary"]["runtime_execution_authorized"] is False assert data["summary"]["remote_write_authorized"] is False assert data["summary"]["repo_creation_authorized"] is False @@ -35,10 +35,13 @@ def test_delivery_closure_workbench_endpoint_returns_product_summary(): assert lanes["gitea"]["metric"]["kind"] == "workflow_count" assert lanes["runtime"]["metric"]["kind"] == "surface_count" assert lanes["backup"]["metric"]["kind"] == "readiness_row_count" - if sources["github_private_backup"]["loaded"] is False: - assert lanes["github"]["blocker_count"] == 1 - assert lanes["github"]["status"] == "blocked_github_private_backup_source_missing" - assert sources["github_private_backup"]["missing_reason"] + assert sources["github_private_backup"]["loaded"] is True + assert sources["github_private_backup"]["schema_version"] == "github_target_private_backup_evidence_gate_v1" + assert sources["github_private_backup"]["missing_reason"] == "" + assert lanes["github"]["blocker_count"] == 9 + assert lanes["github"]["status"] == "blocked_public_visibility_and_safe_credential_evidence_required" + assert lanes["github"]["metric"]["verified"] == 0 + assert lanes["github"]["metric"]["total"] == 9 assert all(0 <= lane["completion_percent"] <= 100 for lane in lanes.values()) assert all(lane["tone"] in {"ok", "warn", "danger"} for lane in lanes.values()) diff --git a/apps/api/tests/test_github_target_private_backup_evidence_gate.py b/apps/api/tests/test_github_target_private_backup_evidence_gate.py new file mode 100644 index 00000000..4be3e2db --- /dev/null +++ b/apps/api/tests/test_github_target_private_backup_evidence_gate.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import json +import shutil +from pathlib import Path + +import pytest + +from src.services.github_target_private_backup_evidence_gate import ( + load_latest_github_target_private_backup_evidence_gate, +) +from src.services.snapshot_paths import default_security_dir + + +def test_load_github_target_private_backup_evidence_gate_from_committed_snapshots(): + snapshot = load_latest_github_target_private_backup_evidence_gate() + + assert snapshot["schema_version"] == "github_target_private_backup_evidence_gate_v1" + assert snapshot["mode"] == "read_only_private_backup_evidence_gate" + assert snapshot["status"] == "blocked_public_visibility_and_safe_credential_evidence_required" + assert snapshot["summary"]["target_decision_count"] == 10 + assert snapshot["summary"]["approval_required_target_count"] == 9 + assert snapshot["summary"]["private_backup_verified_count"] == 0 + assert snapshot["summary"]["blocked_target_count"] == 9 + assert snapshot["summary"]["public_repo_allowed"] is False + assert snapshot["summary"]["repo_creation_authorized"] is False + assert snapshot["summary"]["visibility_change_authorized"] is False + assert snapshot["summary"]["refs_sync_authorized"] is False + assert snapshot["summary"]["secret_value_collection_allowed"] is False + assert snapshot["operation_boundaries"]["read_only_api_allowed"] is True + assert snapshot["operation_boundaries"]["repo_creation_allowed"] is False + assert snapshot["operation_boundaries"]["visibility_change_allowed"] is False + assert snapshot["operation_boundaries"]["refs_sync_allowed"] is False + assert snapshot["operation_boundaries"]["secret_value_collection_allowed"] is False + + targets = {target["github_repo"]: target for target in snapshot["targets"]} + assert targets["owenhytsai/awoooi"]["visibility_evidence_status"] == ( + "blocked_public_probe_visible_private_evidence_required" + ) + assert targets["owenhytsai/ewoooc"]["visibility_evidence_status"] == ( + "blocked_private_or_absent_not_verified" + ) + assert targets["nexu-io/open-design"]["visibility_evidence_status"] == ( + "external_scope_not_backup_target" + ) + + +def test_github_target_private_backup_gate_rejects_runtime_authorization(tmp_path): + _copy_security_snapshots(tmp_path) + owner_response_path = tmp_path / "github-target-owner-decision-response.snapshot.json" + owner_response = json.loads(owner_response_path.read_text(encoding="utf-8")) + owner_response["summary"]["repo_creation_authorized"] = True + owner_response_path.write_text(json.dumps(owner_response), encoding="utf-8") + + with pytest.raises(ValueError, match="boundary flags"): + load_latest_github_target_private_backup_evidence_gate(tmp_path) + + +def test_github_target_private_backup_gate_requires_decision_rollup_consistency(tmp_path): + _copy_security_snapshots(tmp_path) + decision_path = tmp_path / "github-target-decision.snapshot.json" + decision = json.loads(decision_path.read_text(encoding="utf-8")) + decision["decision_count"] = 999 + decision_path.write_text(json.dumps(decision), encoding="utf-8") + + with pytest.raises(ValueError, match="decision_count"): + load_latest_github_target_private_backup_evidence_gate(tmp_path) + + +def _copy_security_snapshots(tmp_path: Path) -> None: + source_dir = default_security_dir(Path(__file__)) + for filename in ( + "github-target-decision.snapshot.json", + "github-target-owner-decision-response.snapshot.json", + "github-target-repo-approval-package.snapshot.json", + "github-target-probe.snapshot.json", + ): + shutil.copy(source_dir / filename, tmp_path / filename) diff --git a/apps/api/tests/test_github_target_private_backup_evidence_gate_api.py b/apps/api/tests/test_github_target_private_backup_evidence_gate_api.py new file mode 100644 index 00000000..0019f832 --- /dev/null +++ b/apps/api/tests/test_github_target_private_backup_evidence_gate_api.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from src.api.v1.agents import router + + +def test_github_target_private_backup_evidence_gate_endpoint_returns_read_only_gate(): + app = FastAPI() + app.include_router(router, prefix="/api/v1") + client = TestClient(app) + + response = client.get("/api/v1/agents/github-target-private-backup-evidence-gate") + + assert response.status_code == 200 + data = response.json() + assert data["schema_version"] == "github_target_private_backup_evidence_gate_v1" + assert data["mode"] == "read_only_private_backup_evidence_gate" + assert data["summary"]["approval_required_target_count"] == 9 + assert data["summary"]["private_backup_verified_count"] == 0 + assert data["summary"]["blocked_target_count"] == 9 + assert data["summary"]["public_repo_allowed"] is False + assert data["summary"]["repo_creation_authorized"] is False + assert data["summary"]["visibility_change_authorized"] is False + assert data["summary"]["refs_sync_authorized"] is False + assert data["summary"]["secret_value_collection_allowed"] is False + assert data["operation_boundaries"]["read_only_api_allowed"] is True + assert data["operation_boundaries"]["github_api_write_allowed"] is False + assert data["operation_boundaries"]["repo_creation_allowed"] is False + assert data["operation_boundaries"]["visibility_change_allowed"] is False + assert data["operation_boundaries"]["refs_sync_allowed"] is False + assert data["operation_boundaries"]["workflow_trigger_allowed"] is False + assert data["operation_boundaries"]["private_clone_url_collection_allowed"] is False + assert "192.168.0." not in response.text