feat(governance): 接入三 Agent 佈建布局
All checks were successful
CD Pipeline / tests (push) Successful in 1m24s
Code Review / ai-code-review (push) Successful in 15s
CD Pipeline / build-and-deploy (push) Successful in 6m5s
CD Pipeline / post-deploy-checks (push) Successful in 1m37s

This commit is contained in:
Your Name
2026-06-11 11:27:50 +08:00
parent 3418e014bc
commit e427af3cb2
13 changed files with 2308 additions and 41 deletions

View File

@@ -35,70 +35,73 @@ from pydantic import BaseModel, Field
from src.core.logging import get_logger
from src.core.sse import get_publisher
from src.services.agent_market_governance_snapshot import (
load_latest_agent_market_governance_snapshot,
)
from src.services.agent_service import (
AgentService,
TaskState,
get_agent_service,
)
from src.services.ai_agent_automation_backlog_snapshot import (
load_latest_ai_agent_automation_backlog_snapshot,
)
from src.services.ai_agent_automation_inventory_snapshot import (
load_latest_ai_agent_automation_inventory_snapshot,
)
from src.services.agent_market_governance_snapshot import (
load_latest_agent_market_governance_snapshot,
from src.services.ai_agent_deployment_layout import (
load_latest_ai_agent_deployment_layout,
)
from src.services.backup_dr_target_inventory import (
load_latest_backup_dr_target_inventory,
from src.services.ai_provider_route_matrix import (
load_latest_ai_provider_route_matrix,
)
from src.services.backup_dr_readiness_matrix import (
load_latest_backup_dr_readiness_matrix,
)
from src.services.backup_dr_target_inventory import (
load_latest_backup_dr_target_inventory,
)
from src.services.backup_notification_policy import (
load_latest_backup_notification_policy,
)
from src.services.backup_restore_drill_approval_package_template import (
load_latest_backup_restore_drill_approval_package_template,
)
from src.services.offsite_escrow_readiness_status import (
load_latest_offsite_escrow_readiness_status,
)
from src.services.runtime_surface_inventory import (
load_latest_runtime_surface_inventory,
)
from src.services.gitea_workflow_runner_health import (
load_latest_gitea_workflow_runner_health,
)
from src.services.observability_contract_matrix import (
load_latest_observability_contract_matrix,
)
from src.services.ai_provider_route_matrix import (
load_latest_ai_provider_route_matrix,
)
from src.services.service_health_gap_matrix import (
load_latest_service_health_gap_matrix,
)
from src.services.service_health_failure_notification_policy import (
load_latest_service_health_failure_notification_policy,
)
from src.services.package_supply_chain_inventory import (
load_latest_package_supply_chain_inventory,
)
from src.services.javascript_package_inventory import (
load_latest_javascript_package_inventory,
)
from src.services.docker_build_surface_inventory import (
load_latest_docker_build_surface_inventory,
from src.services.dependency_drift_check_plan import (
load_latest_dependency_drift_check_plan,
)
from src.services.dependency_risk_policy import (
load_latest_dependency_risk_policy,
)
from src.services.dependency_drift_check_plan import (
load_latest_dependency_drift_check_plan,
)
from src.services.dependency_upgrade_approval_package_template import (
load_latest_dependency_upgrade_approval_package_template,
)
from src.services.agent_service import (
AgentService,
TaskState,
get_agent_service,
from src.services.docker_build_surface_inventory import (
load_latest_docker_build_surface_inventory,
)
from src.services.gitea_workflow_runner_health import (
load_latest_gitea_workflow_runner_health,
)
from src.services.javascript_package_inventory import (
load_latest_javascript_package_inventory,
)
from src.services.observability_contract_matrix import (
load_latest_observability_contract_matrix,
)
from src.services.offsite_escrow_readiness_status import (
load_latest_offsite_escrow_readiness_status,
)
from src.services.package_supply_chain_inventory import (
load_latest_package_supply_chain_inventory,
)
from src.services.runtime_surface_inventory import (
load_latest_runtime_surface_inventory,
)
from src.services.service_health_failure_notification_policy import (
load_latest_service_health_failure_notification_policy,
)
from src.services.service_health_gap_matrix import (
load_latest_service_health_gap_matrix,
)
router = APIRouter(prefix="/agents", tags=["Agent Teams"])
@@ -494,6 +497,33 @@ async def get_automation_backlog_snapshot() -> dict[str, Any]:
) from exc
@router.get(
"/agent-deployment-layout",
response_model=dict[str, Any],
summary="取得 AI Agent 佈建布局快照",
description=(
"讀取最新已提交的 OpenClaw / Hermes / NemoTron 佈建布局快照;"
"此端點不部署 Agent、不呼叫外部模型、不送 Telegram、不碰 DB/Redis、不讀 Secret payload、"
"不批准 SDK/API/shadow/canary/生產路由或主機變更。"
),
)
async def get_agent_deployment_layout() -> dict[str, Any]:
"""Return the latest read-only AI Agent deployment layout snapshot."""
try:
return await asyncio.to_thread(load_latest_ai_agent_deployment_layout)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_deployment_layout_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent 佈建布局快照無效",
) from exc
@router.get(
"/runtime-surface-inventory",
response_model=dict[str, Any],

View File

@@ -0,0 +1,135 @@
"""
AI Agent deployment layout snapshot.
Loads the latest committed, read-only layout for OpenClaw, Hermes, and
NemoTron across hosts, packages, tools, services, projects, web surfaces,
learning loops, and Telegram notification boundaries. This module never
deploys agents, sends Telegram messages, calls providers, or approves writes.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_deployment_layout_*.json"
_SCHEMA_VERSION = "ai_agent_deployment_layout_v1"
def load_latest_ai_agent_deployment_layout(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed AI Agent deployment layout snapshot."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(f"no AI Agent deployment layout snapshots found in {directory}")
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
_require_schema(payload, _SCHEMA_VERSION, str(latest))
_require_read_only_layout(payload, str(latest))
_require_rollup_consistency(payload, str(latest))
_require_frontend_redaction(payload, str(latest))
_require_target_boundaries(payload, str(latest))
return payload
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
actual = payload.get("schema_version")
if actual != expected:
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
def _require_read_only_layout(payload: dict[str, Any], label: str) -> None:
program_status = payload.get("program_status") or {}
if program_status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if program_status.get("deployment_authority") != "layout_only_no_runtime_deploy":
raise ValueError(f"{label}: deployment_authority must stay layout_only_no_runtime_deploy")
boundaries = payload.get("approval_boundaries") or {}
blocked_flags = {
"sdk_installation_allowed",
"paid_api_call_allowed",
"shadow_or_canary_allowed",
"production_routing_allowed",
"destructive_operation_allowed",
"secret_plaintext_allowed",
"autonomous_host_mutation_allowed",
"telegram_direct_send_allowed",
}
allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if allowed:
raise ValueError(f"{label}: approval boundaries must remain false: {allowed}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
targets = payload.get("deployment_targets") or []
rollups = payload.get("rollups") or {}
if rollups.get("total_targets") != len(targets):
raise ValueError(f"{label}: rollups.total_targets must match deployment_targets")
if rollups.get("by_domain") != _count_by(targets, "domain_id"):
raise ValueError(f"{label}: rollups.by_domain must match deployment_targets")
if rollups.get("by_primary_agent") != _count_by(targets, "primary_agent"):
raise ValueError(f"{label}: rollups.by_primary_agent must match deployment_targets")
if rollups.get("by_deployment_state") != _count_by(targets, "deployment_state"):
raise ValueError(f"{label}: rollups.by_deployment_state must match deployment_targets")
if rollups.get("by_telegram_policy") != _count_by(targets, "telegram_policy"):
raise ValueError(f"{label}: rollups.by_telegram_policy must match deployment_targets")
blocked_target_ids = sorted(
target.get("target_id")
for target in targets
if target.get("deployment_state") == "blocked_by_gate"
or target.get("automation_level") == "blocked"
)
if sorted(rollups.get("blocked_target_ids") or []) != blocked_target_ids:
raise ValueError(f"{label}: rollups.blocked_target_ids must match blocked targets")
def _require_frontend_redaction(payload: dict[str, Any], label: str) -> None:
redaction = ((payload.get("collaboration_contract") or {}).get("frontend_redaction") or {})
if redaction.get("operator_conversation_display_allowed") is not False:
raise ValueError(f"{label}: operator conversation display must stay false")
if redaction.get("agent_private_reasoning_display_allowed") is not False:
raise ValueError(f"{label}: agent private reasoning display must stay false")
def _require_target_boundaries(payload: dict[str, Any], label: str) -> None:
targets = payload.get("deployment_targets") or []
missing = [
target.get("target_id")
for target in targets
if not target.get("approval_gate")
or not target.get("telegram_policy")
or not target.get("communication_channels")
]
if missing:
raise ValueError(f"{label}: deployment targets missing boundary fields: {sorted(missing)}")
invalid_nemotron_runtime = [
target.get("target_id")
for target in targets
if target.get("primary_agent") == "nemotron"
and target.get("automation_level") not in {"observe_only", "blocked"}
]
if invalid_nemotron_runtime:
raise ValueError(f"{label}: Nemotron targets must stay observe_only or blocked")
def _count_by(items: list[dict[str, Any]], key: str) -> dict[str, int]:
counts: dict[str, int] = {}
for item in items:
value = item.get(key)
counts[value] = counts.get(value, 0) + 1
return counts