feat(ai): 新增 P2-407 報表 no-write 分析
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-18 15:18:06 +08:00
parent d4fc227ed9
commit 8548892f59
13 changed files with 1676 additions and 7 deletions

View File

@@ -91,6 +91,9 @@ from src.services.ai_agent_professional_task_expansion import (
from src.services.ai_agent_receipt_readback_owner_review import (
load_latest_ai_agent_receipt_readback_owner_review,
)
from src.services.ai_agent_report_no_write_analysis_runtime import (
load_latest_ai_agent_report_no_write_analysis_runtime,
)
from src.services.ai_agent_matched_playbook_learning_gap import (
load_latest_ai_agent_matched_playbook_learning_gap,
)
@@ -829,6 +832,37 @@ async def get_agent_receipt_readback_owner_review() -> dict[str, Any]:
) from exc
@router.get(
"/agent-report-no-write-analysis-runtime",
response_model=dict[str, Any],
summary="取得 P2-407 AI Agent 報表 no-write 分析 runtime",
description=(
"讀取最新已提交的 P2-407 AI Agent 報表 no-write 分析快照;此端點只呈現 "
"OpenClaw、Hermes、NemoTron 讀取日報 / 週報 / 月報、P2-406B receipt owner review、"
"P2-004 供應鏈漂移與 P2-403J 報表真相後產生的分析草稿與風險分級。"
"它不啟動 live AI worker、不排程實發、不寫 Gateway queue、不送 Telegram、"
"不呼叫 Bot API、不寫 receipt production target、不寫 production、不讀 secret、"
"不呼叫付費 API、不改主機、不執行 kubectl 或不可逆操作。"
),
)
async def get_agent_report_no_write_analysis_runtime() -> dict[str, Any]:
"""回傳最新 P2-407 report no-write analysis runtime 只讀快照。"""
try:
payload = await asyncio.to_thread(load_latest_ai_agent_report_no_write_analysis_runtime)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_report_no_write_analysis_runtime_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="P2-407 AI Agent 報表 no-write 分析 runtime 快照無效",
) from exc
@router.get(
"/agent-communication-learning-contract",
response_model=dict[str, Any],

View File

@@ -0,0 +1,374 @@
"""
P2-407 AI Agent report no-write analysis runtime snapshot.
Loads the latest committed analysis draft that lets OpenClaw, Hermes, and
NemoTron read report evidence and propose risk-ranked recommendations. This
module intentionally does not run a live AI worker, send Telegram, write a
Gateway queue, write delivery receipts, read secrets, call paid APIs, mutate
hosts, run kubectl, or write production state.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_report_no_write_analysis_runtime_*.json"
_SCHEMA_VERSION = "ai_agent_report_no_write_analysis_runtime_v1"
_RUNTIME_AUTHORITY = "report_analysis_no_write_runtime_only_committed_snapshot"
_EXPECTED_CURRENT_TASK = "P2-407"
_EXPECTED_NEXT_TASK = "P2-408"
_EXPECTED_CANONICAL_ROOM = "AwoooI SRE 戰情室"
_EXPECTED_CANONICAL_ROOM_ENV = "SRE_GROUP_CHAT_ID"
_EXPECTED_SOURCE_SCHEMAS = {
"ai_agent_report_status_board_v1",
"ai_agent_report_automation_review_v1",
"ai_agent_receipt_readback_owner_review_v1",
"dependency_supply_chain_drift_monitor_v1",
"ai_agent_report_truth_actionability_review_v1",
}
_TRUE_TRUTH_FLAGS = {
"daily_weekly_monthly_reports_loaded",
"agent_workload_loaded",
"charts_loaded",
"receipt_owner_review_loaded",
"dependency_drift_loaded",
"report_truth_loaded",
"analysis_draft_snapshot_ready",
}
_FALSE_TRUTH_FLAGS = {
"ai_analysis_runtime_enabled",
"report_delivery_enabled",
"telegram_send_enabled",
"gateway_queue_write_enabled",
"bot_api_call_enabled",
"receipt_production_write_enabled",
"production_write_enabled",
"secret_read_enabled",
"paid_api_call_enabled",
"host_write_enabled",
"kubectl_action_enabled",
}
_ZERO_TRUTH_COUNTS = {
"live_ai_analysis_run_count_24h",
"live_report_delivery_count_24h",
"telegram_send_count_24h",
"gateway_queue_write_count_24h",
"bot_api_call_count_24h",
"receipt_production_write_count_24h",
"production_write_count_24h",
}
_TRUE_BOUNDARY_FLAGS = {
"read_only_analysis_allowed",
"draft_snapshot_write_allowed",
}
_FALSE_BOUNDARY_FLAGS = {
"ai_analysis_runtime_enabled",
"report_delivery_enabled",
"telegram_send_enabled",
"gateway_queue_write_enabled",
"bot_api_call_enabled",
"receipt_production_write_enabled",
"production_write_enabled",
"secret_read_enabled",
"paid_api_call_enabled",
"host_write_enabled",
"kubectl_action_enabled",
"destructive_operation_enabled",
"openclaw_replacement_allowed",
}
_ZERO_ROLLUP_FIELDS = {
"live_report_delivery_count",
"live_ai_analysis_count",
"telegram_send_count",
"gateway_queue_write_count",
"bot_api_call_count",
"receipt_production_write_count",
"production_write_count",
"secret_read_count",
"paid_api_call_count",
"host_write_count",
"kubectl_action_count",
}
_FORBIDDEN_PUBLIC_TERMS = {
"批准!繼續",
"In app browser",
"My request for Codex",
"chain_of_thought",
"chain-of-thought",
"private reasoning text",
"authorization_header",
"authorization header value",
"telegram token value",
"raw prompt",
"raw_payload",
}
def load_latest_ai_agent_report_no_write_analysis_runtime(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed P2-407 no-write report analysis snapshot."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(
f"no AI Agent report no-write analysis runtime snapshots found in {directory}"
)
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
label = str(latest)
_require_schema(payload, label)
_require_sources(payload, label)
_require_analysis_truth(payload, label)
_require_report_inputs(payload, label)
_require_agent_passes(payload, label)
_require_recommendations_and_artifacts(payload, label)
_require_owner_gates(payload, label)
_require_boundaries(payload, label)
_require_rollups(payload, label)
_require_no_forbidden_public_terms(payload, label)
return payload
def _require_schema(payload: dict[str, Any], label: str) -> None:
if payload.get("schema_version") != _SCHEMA_VERSION:
raise ValueError(f"{label}: expected schema_version={_SCHEMA_VERSION}")
status = payload.get("program_status") or {}
expected = {
"overall_completion_percent": 100,
"current_priority": "P2",
"current_task_id": _EXPECTED_CURRENT_TASK,
"next_task_id": _EXPECTED_NEXT_TASK,
"read_only_mode": True,
"runtime_authority": _RUNTIME_AUTHORITY,
}
mismatches = _mismatches(status, expected)
if mismatches:
raise ValueError(f"{label}: program_status mismatch: {mismatches}")
if not status.get("status_note"):
raise ValueError(f"{label}: program_status.status_note is required")
def _require_sources(payload: dict[str, Any], label: str) -> None:
if not payload.get("source_refs"):
raise ValueError(f"{label}: source_refs must not be empty")
sources = payload.get("source_readbacks") or []
schemas = {item.get("source_schema_version") for item in sources}
missing = sorted(_EXPECTED_SOURCE_SCHEMAS - schemas)
if missing:
raise ValueError(f"{label}: missing source schemas: {missing}")
for item in sources:
readback_id = item.get("readback_id") or "<missing>"
for field in ("source_ref", "endpoint", "owner_agent", "status", "key_readback", "next_action"):
if not item.get(field):
raise ValueError(f"{label}: source readback {readback_id} missing {field}")
def _require_analysis_truth(payload: dict[str, Any], label: str) -> None:
truth = payload.get("analysis_truth") or {}
missing_true = sorted(flag for flag in _TRUE_TRUTH_FLAGS if truth.get(flag) is not True)
if missing_true:
raise ValueError(f"{label}: analysis truth flags must remain true: {missing_true}")
unsafe_false = sorted(flag for flag in _FALSE_TRUTH_FLAGS if truth.get(flag) is not False)
if unsafe_false:
raise ValueError(f"{label}: analysis truth flags must remain false: {unsafe_false}")
non_zero = sorted(field for field in _ZERO_TRUTH_COUNTS if truth.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: live analysis truth counts must remain zero: {non_zero}")
if not truth.get("truth_note"):
raise ValueError(f"{label}: analysis_truth.truth_note is required")
def _require_report_inputs(payload: dict[str, Any], label: str) -> None:
inputs = payload.get("report_inputs") or []
input_ids = {item.get("report_id") for item in inputs}
if input_ids != {"daily", "weekly", "monthly"}:
raise ValueError(f"{label}: report_inputs must include daily, weekly, monthly")
for item in inputs:
report_id = item.get("report_id") or "<missing>"
if item.get("completion_percent") != 100:
raise ValueError(f"{label}: report input {report_id} completion_percent must remain 100")
if not isinstance(item.get("actionability_score"), int) or item.get("actionability_score") <= 0:
raise ValueError(f"{label}: report input {report_id} actionability_score must be positive")
if not item.get("analysis_focus") or not item.get("blocked_runtime_action"):
raise ValueError(f"{label}: report input {report_id} missing focus or blocked action")
def _require_agent_passes(payload: dict[str, Any], label: str) -> None:
passes = payload.get("agent_analysis_passes") or []
agent_ids = {item.get("agent_id") for item in passes}
if agent_ids != {"openclaw", "hermes", "nemotron"}:
raise ValueError(f"{label}: agent_analysis_passes must include OpenClaw, Hermes, NemoTron")
for item in passes:
agent_id = item.get("agent_id") or "<missing>"
if item.get("live_runtime_write_allowed") is not False:
raise ValueError(f"{label}: agent pass {agent_id} live_runtime_write_allowed must remain false")
if not item.get("summary") or not item.get("handoff_to"):
raise ValueError(f"{label}: agent pass {agent_id} missing summary or handoff")
def _require_recommendations_and_artifacts(payload: dict[str, Any], label: str) -> None:
recommendations = payload.get("draft_recommendations") or []
if len(recommendations) < 1:
raise ValueError(f"{label}: draft_recommendations must not be empty")
for item in recommendations:
recommendation_id = item.get("recommendation_id") or "<missing>"
if not isinstance(item.get("actionability_score"), int) or item.get("actionability_score") <= 0:
raise ValueError(f"{label}: recommendation {recommendation_id} actionability_score must be positive")
if not item.get("blocked_runtime_action"):
raise ValueError(f"{label}: recommendation {recommendation_id} missing blocked_runtime_action")
if item.get("risk_tier") in {"high", "critical"} and item.get("approval_required") is not True:
raise ValueError(f"{label}: high/critical recommendation {recommendation_id} must require approval")
artifacts = payload.get("draft_artifacts") or []
if len(artifacts) < 1:
raise ValueError(f"{label}: draft_artifacts must not be empty")
for item in artifacts:
artifact_id = item.get("artifact_id") or "<missing>"
for flag in ("writes_production", "sends_telegram", "contains_secret"):
if item.get(flag) is not False:
raise ValueError(f"{label}: draft artifact {artifact_id}.{flag} must remain false")
if not item.get("evidence_ref"):
raise ValueError(f"{label}: draft artifact {artifact_id} evidence_ref is required")
def _require_owner_gates(payload: dict[str, Any], label: str) -> None:
gates = payload.get("owner_review_gates") or []
if len(gates) < 1:
raise ValueError(f"{label}: owner_review_gates must not be empty")
for gate in gates:
gate_id = gate.get("gate_id") or "<missing>"
if gate.get("risk_tier") in {"high", "critical"} and gate.get("status") not in {
"owner_review_required",
"blocked_by_runtime_gate",
}:
raise ValueError(f"{label}: high/critical owner gate {gate_id} must remain blocked or owner-review")
for field in ("required_fields", "acceptance_checks", "blocked_runtime_actions"):
if not gate.get(field):
raise ValueError(f"{label}: owner gate {gate_id} missing {field}")
def _require_boundaries(payload: dict[str, Any], label: str) -> None:
boundaries = payload.get("activation_boundaries") or {}
missing_true = sorted(flag for flag in _TRUE_BOUNDARY_FLAGS if boundaries.get(flag) is not True)
if missing_true:
raise ValueError(f"{label}: activation boundaries must remain true: {missing_true}")
unsafe_false = sorted(flag for flag in _FALSE_BOUNDARY_FLAGS if boundaries.get(flag) is not False)
if unsafe_false:
raise ValueError(f"{label}: activation boundaries must remain false: {unsafe_false}")
telegram = payload.get("telegram_policy") or {}
expected_telegram = {
"canonical_room": _EXPECTED_CANONICAL_ROOM,
"canonical_room_env": _EXPECTED_CANONICAL_ROOM_ENV,
"gateway_queue_write_allowed": False,
"direct_bot_api_allowed": False,
"telegram_send_allowed": False,
"receipt_write_allowed": False,
}
mismatches = _mismatches(telegram, expected_telegram)
if mismatches:
raise ValueError(f"{label}: telegram_policy mismatch: {mismatches}")
redaction = payload.get("display_redaction_contract") or {}
for flag in (
"redaction_required",
):
if redaction.get(flag) is not True:
raise ValueError(f"{label}: display redaction flag {flag} must remain true")
for flag in (
"raw_report_payload_display_allowed",
"private_reasoning_display_allowed",
"secret_value_display_allowed",
"work_window_transcript_display_allowed",
):
if redaction.get(flag) is not False:
raise ValueError(f"{label}: display redaction flag {flag} must remain false")
def _require_rollups(payload: dict[str, Any], label: str) -> None:
rollups = payload.get("rollups") or {}
sources = payload.get("source_readbacks") or []
inputs = payload.get("report_inputs") or []
passes = payload.get("agent_analysis_passes") or []
recommendations = payload.get("draft_recommendations") or []
artifacts = payload.get("draft_artifacts") or []
gates = payload.get("owner_review_gates") or []
blocked_actions = {
*(item.get("blocked_runtime_action") for item in recommendations),
*(
action
for gate in gates
for action in (gate.get("blocked_runtime_actions") or [])
),
}
blocked_actions.discard(None)
expected = {
"source_readback_count": len(sources),
"report_input_count": len(inputs),
"agent_analysis_pass_count": len(passes),
"draft_recommendation_count": len(recommendations),
"draft_artifact_count": len(artifacts),
"owner_review_gate_count": len(gates),
"approval_required_recommendation_count": sum(
1 for item in recommendations if item.get("approval_required") is True
),
"low_risk_recommendation_count": sum(1 for item in recommendations if item.get("risk_tier") == "low"),
"medium_risk_recommendation_count": sum(1 for item in recommendations if item.get("risk_tier") == "medium"),
"high_risk_recommendation_count": sum(1 for item in recommendations if item.get("risk_tier") == "high"),
"critical_risk_recommendation_count": sum(1 for item in recommendations if item.get("risk_tier") == "critical"),
"actionability_score_ready_count": sum(
1 for item in recommendations if isinstance(item.get("actionability_score"), int) and item["actionability_score"] > 0
),
"blocked_runtime_action_count": len(blocked_actions),
}
mismatches = {
key: {"expected": value, "actual": rollups.get(key)}
for key, value in expected.items()
if rollups.get(key) != value
}
if mismatches:
raise ValueError(f"{label}: rollup counts must match payload sections: {mismatches}")
approval_required_ids = sorted(
item.get("recommendation_id")
for item in recommendations
if item.get("approval_required") is True
)
if sorted(rollups.get("approval_required_recommendation_ids") or []) != approval_required_ids:
raise ValueError(f"{label}: approval_required_recommendation_ids mismatch")
non_zero = sorted(field for field in _ZERO_ROLLUP_FIELDS if rollups.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: live rollup counts must remain zero: {non_zero}")
def _require_no_forbidden_public_terms(payload: dict[str, Any], label: str) -> None:
public_text = json.dumps(payload, ensure_ascii=False)
lower_public_text = public_text.lower()
leaked_terms = sorted(
term
for term in _FORBIDDEN_PUBLIC_TERMS
if (term.lower() if term.isascii() else term) in lower_public_text
)
if leaked_terms:
raise ValueError(f"{label}: forbidden public terms present: {leaked_terms}")
def _mismatches(actual: dict[str, Any], expected: dict[str, Any]) -> dict[str, dict[str, Any]]:
return {
key: {"expected": expected_value, "actual": actual.get(key)}
for key, expected_value in expected.items()
if actual.get(key) != expected_value
}