feat(governance): 新增 Agent host stateful 版本盤點
All checks were successful
CD Pipeline / tests (push) Successful in 1m27s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Successful in 6m0s
CD Pipeline / post-deploy-checks (push) Successful in 1m54s

This commit is contained in:
Your Name
2026-06-11 15:42:06 +08:00
parent cc6140230d
commit 2d00fa1f1e
17 changed files with 1422 additions and 30 deletions

View File

@@ -58,6 +58,9 @@ from src.services.ai_agent_deployment_layout import (
from src.services.ai_agent_gitea_pr_draft_lane import (
load_latest_ai_agent_gitea_pr_draft_lane,
)
from src.services.ai_agent_host_stateful_version_inventory import (
load_latest_ai_agent_host_stateful_version_inventory,
)
from src.services.ai_agent_proactive_operations_contract import (
load_latest_ai_agent_proactive_operations_contract,
)
@@ -709,6 +712,35 @@ async def get_agent_gitea_pr_draft_lane() -> dict[str, Any]:
) from exc
@router.get(
    "/agent-host-stateful-version-inventory",
    response_model=dict[str, Any],
    summary="取得 AI Agent host / K3s / stateful 版本只讀盤點",
    description=(
        "讀取最新已提交的 AI Agent host OS / K3s / stateful services 版本只讀盤點與 "
        "maintenance window 批准包;此端點不 SSH、不執行 host command、不執行 kubectl、"
        "不 apt upgrade、不升級 kernel/K3s、不 drain node、不 reboot、不 restart stateful service、"
        "不做 DB migration、不刪備份、不 restore、不 pull image、不安裝套件、不查外部版本來源、"
        "不 active scan、不發 Telegram、不讀取 secret、不回傳工作視窗對話內容。"
    ),
)
async def get_agent_host_stateful_version_inventory() -> dict[str, Any]:
    """Serve the latest committed host / K3s / stateful version inventory.

    The snapshot loader is synchronous, so it is run through
    ``asyncio.to_thread`` instead of being called directly in the coroutine.

    Raises:
        HTTPException: 404 when the loader raises ``FileNotFoundError``
            (the loader's message becomes the response detail); 500 when the
            loader raises ``json.JSONDecodeError`` or ``ValueError``, after
            the error has been logged.
    """
    try:
        inventory = await asyncio.to_thread(
            load_latest_ai_agent_host_stateful_version_inventory
        )
    except FileNotFoundError as missing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(missing),
        ) from missing
    except (json.JSONDecodeError, ValueError) as invalid:
        # Keep the underlying reason in the log only; the client receives a
        # fixed, generic detail string.
        logger.error("ai_agent_host_stateful_version_inventory_invalid", error=str(invalid))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="AI Agent host / K3s / stateful 版本只讀盤點無效",
        ) from invalid
    return inventory
@router.get(
"/runtime-surface-inventory",
response_model=dict[str, Any],

View File

@@ -0,0 +1,286 @@
"""
AI Agent host and stateful version inventory snapshot.
Loads the latest committed, read-only host OS, K3s, and stateful services
inventory contract. This module never runs SSH, kubectl, package upgrades,
node drains, reboots, stateful restarts, live scans, Telegram sends, or exposes
work-window transcripts.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
# Directory searched for snapshots when the loader is not given one explicitly.
# NOTE(review): derived from this module's path by default_evaluations_dir();
# the resolution rules live in src.services.snapshot_paths — confirm there.
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
# Glob pattern the loader uses to discover snapshot files in that directory.
_SNAPSHOT_PATTERN = "ai_agent_host_stateful_version_inventory_*.json"
# The loader passes this as the expected value of a snapshot's "schema_version".
_SCHEMA_VERSION = "ai_agent_host_stateful_version_inventory_v1"
# Value that program_status.runtime_authority must equal in a snapshot.
_RUNTIME_AUTHORITY = "host_stateful_readonly_inventory_no_upgrade_or_restart"
_TRANSCRIPT_MARKERS = {
"# In app browser",
"My request for Codex",
"Current URL:",
"AGENTS.md instructions",
"<environment_context>",
"批准!繼續",
}
def load_latest_ai_agent_host_stateful_version_inventory(
    evaluations_dir: Path | None = None,
) -> dict[str, Any]:
    """Load and validate the newest committed host / K3s / stateful inventory.

    Files matching the module's snapshot glob pattern are looked up in
    ``evaluations_dir`` (or the module default directory when none is given).
    The path that sorts last is treated as the newest snapshot.

    Args:
        evaluations_dir: Directory to search; falls back to the module default.

    Returns:
        The parsed snapshot payload, unchanged, once every contract check passes.

    Raises:
        FileNotFoundError: No file in the directory matches the pattern.
        ValueError: The payload is not a JSON object or fails a contract
            check (``json.JSONDecodeError`` from parsing also propagates).
    """
    search_dir = evaluations_dir if evaluations_dir else _DEFAULT_EVALUATIONS_DIR
    newest = max(search_dir.glob(_SNAPSHOT_PATTERN), default=None)
    if newest is None:
        raise FileNotFoundError(
            f"no AI Agent host stateful version inventory snapshots found in {search_dir}"
        )

    snapshot = json.loads(newest.read_text(encoding="utf-8"))
    if not isinstance(snapshot, dict):
        raise ValueError(f"{newest}: expected JSON object")

    source = str(newest)
    _require_schema(snapshot, _SCHEMA_VERSION, source)
    # The remaining checks share one signature and run in this fixed order;
    # the first one that fails raises and stops the sequence.
    for contract_check in (
        _require_read_only_boundaries,
        _require_rollup_consistency,
        _require_inventory_safety,
        _require_maintenance_approval_contract,
        _require_display_redaction,
        _require_no_plaintext_secret_payload_keys,
        _require_no_conversation_transcript_content,
    ):
        contract_check(snapshot, source)
    return snapshot
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
actual = payload.get("schema_version")
if actual != expected:
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None:
program_status = payload.get("program_status") or {}
if program_status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if program_status.get("runtime_authority") != _RUNTIME_AUTHORITY:
raise ValueError(f"{label}: runtime_authority must stay {_RUNTIME_AUTHORITY}")
operation_boundaries = payload.get("operation_boundaries") or {}
if operation_boundaries.get("read_only_inventory_allowed") is not True:
raise ValueError(f"{label}: read_only_inventory_allowed must be true")
blocked_operation_flags = {
"ssh_login_allowed",
"host_command_execution_allowed",
"kubectl_command_execution_allowed",
"apt_upgrade_allowed",
"os_release_upgrade_allowed",
"kernel_upgrade_allowed",
"k3s_upgrade_allowed",
"kubelet_restart_allowed",
"node_drain_allowed",
"reboot_allowed",
"stateful_service_restart_allowed",
"database_migration_allowed",
"backup_delete_allowed",
"restore_execution_allowed",
"image_pull_allowed",
"package_install_allowed",
"external_version_lookup_allowed",
"active_network_scan_allowed",
"telegram_direct_send_allowed",
"telegram_gateway_queue_write_allowed",
"secret_plaintext_allowed",
"conversation_transcript_allowed",
}
allowed_operation_flags = sorted(
flag
for flag in blocked_operation_flags
if operation_boundaries.get(flag) is not False
)
if allowed_operation_flags:
raise ValueError(
f"{label}: operation boundaries must remain false: {allowed_operation_flags}"
)
approval_boundaries = payload.get("approval_boundaries") or {}
allowed_approval_flags = sorted(
flag for flag, value in approval_boundaries.items() if value is not False
)
if allowed_approval_flags:
raise ValueError(
f"{label}: approval boundaries must remain false: {allowed_approval_flags}"
)
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
host_inventory = payload.get("host_inventory") or []
k3s_inventory = payload.get("k3s_inventory") or {}
stateful_services = payload.get("stateful_services") or []
readonly_probe_plan = payload.get("readonly_probe_plan") or []
maintenance_requirements = payload.get("maintenance_window_approval_package") or {}
rollups = payload.get("rollups") or {}
expected_counts = {
"host_count": len(host_inventory),
"k3s_node_count": len(k3s_inventory.get("nodes") or []),
"stateful_service_count": len(stateful_services),
"readonly_probe_step_count": len(readonly_probe_plan),
"maintenance_required_field_count": len(maintenance_requirements.get("required_fields") or []),
}
mismatched = {
key: {"expected": expected, "actual": rollups.get(key)}
for key, expected in expected_counts.items()
if rollups.get(key) != expected
}
if mismatched:
raise ValueError(f"{label}: rollup counts must match payload sections: {mismatched}")
expected_host_ids = sorted(host.get("host_id") for host in host_inventory)
if sorted(rollups.get("host_ids") or []) != expected_host_ids:
raise ValueError(f"{label}: rollups.host_ids mismatch")
expected_service_ids = sorted(service.get("service_id") for service in stateful_services)
if sorted(rollups.get("stateful_service_ids") or []) != expected_service_ids:
raise ValueError(f"{label}: rollups.stateful_service_ids mismatch")
zero_rollups = {
"ssh_login_allowed_count",
"kubectl_command_execution_allowed_count",
"apt_upgrade_allowed_count",
"k3s_upgrade_allowed_count",
"node_drain_allowed_count",
"reboot_allowed_count",
"stateful_service_restart_allowed_count",
"telegram_direct_send_allowed_count",
"conversation_transcript_allowed_count",
}
nonzero = sorted(key for key in zero_rollups if rollups.get(key) != 0)
if nonzero:
raise ValueError(f"{label}: safety counters must remain 0: {nonzero}")
def _require_inventory_safety(payload: dict[str, Any], label: str) -> None:
unsafe_hosts = [
host.get("host_id")
for host in payload.get("host_inventory") or []
if host.get("readonly_only") is not True
or host.get("host_update_authorized") is not False
or host.get("reboot_authorized") is not False
or host.get("maintenance_window_required") is not True
or not host.get("version_observation_status")
]
if unsafe_hosts:
raise ValueError(f"{label}: host inventory must remain read-only and gated: {unsafe_hosts}")
k3s = payload.get("k3s_inventory") or {}
if k3s.get("skew_policy_required") is not True:
raise ValueError(f"{label}: K3s skew policy must be required")
if k3s.get("upgrade_authorized") is not False:
raise ValueError(f"{label}: K3s upgrade must remain unauthorized")
unsafe_nodes = [
node.get("node_id")
for node in k3s.get("nodes") or []
if node.get("drain_authorized") is not False
or node.get("kubelet_restart_authorized") is not False
or node.get("readonly_only") is not True
]
if unsafe_nodes:
raise ValueError(f"{label}: K3s nodes must remain read-only: {unsafe_nodes}")
unsafe_services = [
service.get("service_id")
for service in payload.get("stateful_services") or []
if service.get("readonly_only") is not True
or service.get("restart_authorized") is not False
or service.get("upgrade_authorized") is not False
or service.get("backup_required_before_change") is not True
or not service.get("version_observation_status")
]
if unsafe_services:
raise ValueError(
f"{label}: stateful services must remain read-only and backup-gated: {unsafe_services}"
)
unsafe_probe_steps = [
step.get("step_id")
for step in payload.get("readonly_probe_plan") or []
if step.get("run_now_allowed") is not False
or step.get("mutation_allowed") is not False
or not step.get("planned_output")
]
if unsafe_probe_steps:
raise ValueError(f"{label}: readonly probe steps must stay planned-only: {unsafe_probe_steps}")
def _require_maintenance_approval_contract(payload: dict[str, Any], label: str) -> None:
required_fields = {
"owner",
"decision",
"maintenance_window",
"affected_hosts",
"affected_services",
"backup_snapshot_ref",
"rollback_owner",
"rollback_plan",
"smoke_plan",
"communication_plan",
"risk_acceptance",
}
package = payload.get("maintenance_window_approval_package") or {}
actual_fields = set(package.get("required_fields") or [])
if not required_fields.issubset(actual_fields):
raise ValueError(f"{label}: maintenance window approval package missing required fields")
if package.get("approval_required_before_probe") is not True:
raise ValueError(f"{label}: approval must be required before live probe")
if package.get("approval_required_before_change") is not True:
raise ValueError(f"{label}: approval must be required before changes")
if package.get("break_glass_record_required") is not True:
raise ValueError(f"{label}: break-glass record must be required")
def _require_display_redaction(payload: dict[str, Any], label: str) -> None:
display = payload.get("display_redaction_contract") or {}
if display.get("conversation_transcript_display_allowed") is not False:
raise ValueError(f"{label}: conversation transcript display must remain false")
if display.get("redaction_required") is not True:
raise ValueError(f"{label}: display redaction must be required")
def _require_no_plaintext_secret_payload_keys(value: Any, label: str, path: str = "$") -> None:
if isinstance(value, dict):
forbidden_key_fragments = {
"secret_value",
"token_plaintext",
"authorization_header",
"private_key",
"credential_value",
}
for key, nested in value.items():
normalized_key = str(key).lower()
if any(fragment in normalized_key for fragment in forbidden_key_fragments):
raise ValueError(f"{label}: forbidden plaintext secret key at {path}.{key}")
_require_no_plaintext_secret_payload_keys(nested, label, f"{path}.{key}")
elif isinstance(value, list):
for index, nested in enumerate(value):
_require_no_plaintext_secret_payload_keys(nested, label, f"{path}[{index}]")
def _require_no_conversation_transcript_content(value: Any, label: str, path: str = "$") -> None:
if isinstance(value, str):
for marker in _TRANSCRIPT_MARKERS:
if marker in value:
raise ValueError(
f"{label}: forbidden work-window conversation content at {path}: {marker}"
)
elif isinstance(value, dict):
for key, nested in value.items():
_require_no_conversation_transcript_content(nested, label, f"{path}.{key}")
elif isinstance(value, list):
for index, nested in enumerate(value):
_require_no_conversation_transcript_content(nested, label, f"{path}[{index}]")