feat(governance): 新增 Agent Telegram digest policy
All checks were successful
CD Pipeline / tests (push) Successful in 1m28s
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / build-and-deploy (push) Successful in 4m8s
CD Pipeline / post-deploy-checks (push) Successful in 1m55s

This commit is contained in:
Your Name
2026-06-11 14:34:49 +08:00
parent 0d536f1406
commit 785494cb77
15 changed files with 1683 additions and 31 deletions

View File

@@ -58,6 +58,9 @@ from src.services.ai_agent_deployment_layout import (
from src.services.ai_agent_proactive_operations_contract import (
load_latest_ai_agent_proactive_operations_contract,
)
from src.services.ai_agent_telegram_action_required_digest_policy import (
load_latest_ai_agent_telegram_action_required_digest_policy,
)
from src.services.ai_agent_tool_adoption_approval_package import (
load_latest_ai_agent_tool_adoption_approval_package,
)
@@ -645,6 +648,35 @@ async def get_agent_tool_adoption_approval_package() -> dict[str, Any]:
) from exc
@router.get(
"/agent-telegram-action-required-digest-policy",
response_model=dict[str, Any],
summary="取得 AI Agent Telegram action-required digest policy",
description=(
"讀取最新已提交的 AI Agent Telegram action-required digest policy"
"此端點只回傳 critical / action-required / failure-only digest 規則與 redaction 邊界,"
"不送 Telegram、不寫 Telegram Gateway queue、不改 Alertmanager route / receiver、"
"不寫 AwoooP event、不觸發 workflow、不查外部掃描、不執行 runtime、不讀取 secret、"
"不回傳工作視窗對話內容。"
),
)
async def get_agent_telegram_action_required_digest_policy() -> dict[str, Any]:
"""Return the latest read-only AI Agent Telegram action-required digest policy."""
try:
return await asyncio.to_thread(load_latest_ai_agent_telegram_action_required_digest_policy)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_telegram_action_required_digest_policy_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent Telegram action-required digest policy 無效",
) from exc
@router.get(
"/runtime-surface-inventory",
response_model=dict[str, Any],

View File

@@ -0,0 +1,289 @@
"""
AI Agent Telegram action-required digest policy snapshot.
Loads the latest committed, read-only policy for deciding which AI Agent,
version, tool-adoption, scanner, and upgrade signals may become
action-required Telegram digest candidates. This module never sends Telegram
messages, writes Telegram Gateway queues, changes Alertmanager receivers,
triggers workflows, reads secrets, or exposes work-window transcripts.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_telegram_action_required_digest_policy_*.json"
_SCHEMA_VERSION = "ai_agent_telegram_action_required_digest_policy_v1"
_RUNTIME_AUTHORITY = "policy_only_no_telegram_send_or_route_change"
_TRANSCRIPT_MARKERS = {
"# In app browser",
"My request for Codex",
"Current URL:",
"AGENTS.md instructions",
"<environment_context>",
"批准!繼續",
}
def load_latest_ai_agent_telegram_action_required_digest_policy(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed AI Agent Telegram digest policy."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(
f"no AI Agent Telegram action-required digest policy snapshots found in {directory}"
)
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
_require_schema(payload, _SCHEMA_VERSION, str(latest))
_require_read_only_boundaries(payload, str(latest))
_require_rollup_consistency(payload, str(latest))
_require_success_noise_suppression(payload, str(latest))
_require_digest_gate_safety(payload, str(latest))
_require_message_redaction(payload, str(latest))
_require_no_plaintext_secret_payload_keys(payload, str(latest))
_require_no_conversation_transcript_content(payload, str(latest))
return payload
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
actual = payload.get("schema_version")
if actual != expected:
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None:
program_status = payload.get("program_status") or {}
if program_status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if program_status.get("runtime_authority") != _RUNTIME_AUTHORITY:
raise ValueError(f"{label}: runtime_authority must stay {_RUNTIME_AUTHORITY}")
operation_boundaries = payload.get("operation_boundaries") or {}
if operation_boundaries.get("read_only_policy_allowed") is not True:
raise ValueError(f"{label}: read_only_policy_allowed must be true")
blocked_operation_flags = {
"telegram_direct_send_allowed",
"telegram_test_message_allowed",
"telegram_gateway_queue_write_allowed",
"alertmanager_route_change_allowed",
"telegram_receiver_change_allowed",
"awooop_event_write_allowed",
"workflow_trigger_allowed",
"external_scan_allowed",
"runtime_execution_allowed",
"secret_plaintext_allowed",
"conversation_transcript_allowed",
}
allowed_operation_flags = sorted(
flag
for flag in blocked_operation_flags
if operation_boundaries.get(flag) is not False
)
if allowed_operation_flags:
raise ValueError(
f"{label}: operation boundaries must remain false: {allowed_operation_flags}"
)
approval_boundaries = payload.get("approval_boundaries") or {}
allowed_approval_flags = sorted(
flag for flag, value in approval_boundaries.items() if value is not False
)
if allowed_approval_flags:
raise ValueError(
f"{label}: approval boundaries must remain false: {allowed_approval_flags}"
)
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rules = payload.get("digest_rules") or []
channels = payload.get("digest_channels") or []
categories = payload.get("trigger_categories") or []
rollups = payload.get("rollups") or {}
expected_counts = {
"rule_count": len(rules),
"channel_count": len(channels),
"trigger_category_count": len(categories),
}
mismatched = {
key: {"expected": expected, "actual": rollups.get(key)}
for key, expected in expected_counts.items()
if rollups.get(key) != expected
}
if mismatched:
raise ValueError(f"{label}: rollup counts must match payload sections: {mismatched}")
by_decision: dict[str, int] = {}
for rule in rules:
decision = str(rule.get("decision"))
by_decision[decision] = by_decision.get(decision, 0) + 1
if rollups.get("by_decision") != by_decision:
raise ValueError(f"{label}: rollups.by_decision must match digest rule decisions")
expected_lists = {
"action_required_rule_ids": {
rule.get("rule_id")
for rule in rules
if rule.get("decision") == "draft_action_required_digest"
},
"failure_escalation_rule_ids": {
rule.get("rule_id")
for rule in rules
if rule.get("decision") == "draft_failure_escalation_digest"
},
"suppressed_success_rule_ids": {
rule.get("rule_id")
for rule in rules
if rule.get("decision") == "suppress_success_noise"
},
}
for key, expected in expected_lists.items():
if set(rollups.get(key) or []) != expected:
raise ValueError(f"{label}: rollups.{key} mismatch")
zero_rollups = {
"telegram_direct_send_allowed_count",
"telegram_gateway_queue_write_allowed_count",
"route_change_allowed_count",
"conversation_transcript_allowed_count",
}
nonzero = sorted(key for key in zero_rollups if rollups.get(key) != 0)
if nonzero:
raise ValueError(f"{label}: digest safety counters must remain 0: {nonzero}")
def _require_success_noise_suppression(payload: dict[str, Any], label: str) -> None:
noisy_channels = [
channel.get("channel_id")
for channel in payload.get("digest_channels") or []
if channel.get("success_immediate_allowed") is not False
or channel.get("direct_send_allowed") is not False
]
if noisy_channels:
raise ValueError(f"{label}: digest channels must suppress success/direct send: {noisy_channels}")
noisy_rules = [
rule.get("rule_id")
for rule in payload.get("digest_rules") or []
if rule.get("signal_state") in {"success", "verified"}
and rule.get("decision") != "suppress_success_noise"
]
if noisy_rules:
raise ValueError(f"{label}: success/verified signals must suppress digest noise: {noisy_rules}")
summary_policy = str(payload.get("rate_limit_policy", {}).get("success_policy") or "")
if "成功" not in summary_policy or "不得" not in summary_policy or "Telegram" not in summary_policy:
raise ValueError(f"{label}: rate_limit_policy.success_policy must block success spam")
def _require_digest_gate_safety(payload: dict[str, Any], label: str) -> None:
unsafe_rules = [
rule.get("rule_id")
for rule in payload.get("digest_rules") or []
if rule.get("decision") in {
"draft_action_required_digest",
"draft_failure_escalation_digest",
}
and (
not rule.get("required_evidence")
or rule.get("telegram_direct_send_allowed") is not False
or rule.get("requires_openclaw_review") is not True
)
]
if unsafe_rules:
raise ValueError(f"{label}: digest candidate rules need evidence and OpenClaw gate: {unsafe_rules}")
immediate_non_failure = [
rule.get("rule_id")
for rule in payload.get("digest_rules") or []
if rule.get("decision") == "draft_failure_escalation_digest"
and rule.get("signal_state") not in {"failed", "blocked", "critical"}
]
if immediate_non_failure:
raise ValueError(f"{label}: failure escalation digest must stay failure-only: {immediate_non_failure}")
def _require_message_redaction(payload: dict[str, Any], label: str) -> None:
template = payload.get("message_template_contract") or {}
required_fields = set(template.get("required_fields") or [])
required = {
"stage",
"severity",
"target_id",
"evidence_ref",
"blocked_reason",
"next_action",
"owner_agent",
"approval_gate",
}
if not required.issubset(required_fields):
raise ValueError(f"{label}: message_template_contract.required_fields missing digest context")
forbidden_fields = set(template.get("forbidden_fields") or [])
required_forbidden = {
"secret_value",
"token",
"authorization_header",
"work_window_transcript",
"codex_user_message",
"prompt_text",
"chain_of_thought",
"session_id",
"browser_context",
}
if not required_forbidden.issubset(forbidden_fields):
raise ValueError(f"{label}: message_template_contract.forbidden_fields missing redaction boundary")
display = payload.get("display_redaction_contract") or {}
if display.get("conversation_transcript_display_allowed") is not False:
raise ValueError(f"{label}: conversation transcript display must remain false")
if display.get("redaction_required") is not True:
raise ValueError(f"{label}: display redaction must be required")
def _require_no_plaintext_secret_payload_keys(value: Any, label: str, path: str = "$") -> None:
if isinstance(value, dict):
forbidden_key_fragments = {
"secret_value",
"token_plaintext",
"authorization_header",
"private_key",
"credential_value",
}
for key, nested in value.items():
normalized_key = str(key).lower()
if any(fragment in normalized_key for fragment in forbidden_key_fragments):
raise ValueError(f"{label}: forbidden plaintext secret key at {path}.{key}")
_require_no_plaintext_secret_payload_keys(nested, label, f"{path}.{key}")
elif isinstance(value, list):
for index, nested in enumerate(value):
_require_no_plaintext_secret_payload_keys(nested, label, f"{path}[{index}]")
def _require_no_conversation_transcript_content(value: Any, label: str, path: str = "$") -> None:
if isinstance(value, str):
for marker in _TRANSCRIPT_MARKERS:
if marker in value:
raise ValueError(
f"{label}: forbidden work-window conversation content at {path}: {marker}"
)
elif isinstance(value, dict):
for key, nested in value.items():
_require_no_conversation_transcript_content(nested, label, f"{path}.{key}")
elif isinstance(value, list):
for index, nested in enumerate(value):
_require_no_conversation_transcript_content(nested, label, f"{path}[{index}]")