feat(api): verify log feedback writeback dry run
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m48s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-06-29 21:15:36 +08:00
parent 1144955ca9
commit f300b459dc
3 changed files with 468 additions and 0 deletions

View File

@@ -106,6 +106,9 @@ from src.services.ai_agent_log_feedback_receipt_dry_run import (
from src.services.ai_agent_log_intelligence_integration_readback import (
load_latest_ai_agent_log_intelligence_integration_readback,
)
from src.services.ai_agent_log_post_write_verifier_dry_run import (
load_latest_ai_agent_log_post_write_verifier_dry_run,
)
from src.services.ai_agent_low_medium_risk_whitelist import (
load_latest_ai_agent_low_medium_risk_whitelist,
)
@@ -1869,6 +1872,37 @@ async def get_agent_log_feedback_receipt_dry_run() -> dict[str, Any]:
) from exc
@router.get(
"/agent-log-post-write-verifier-dry-run",
response_model=dict[str, Any],
summary="取得 AI Agent LOG post-write verifier dry-run",
description=(
"驗證 LOG feedback receipt 候選是否已具備 KM、RAG、PlayBook、MCP audit、"
"post-apply verifier 與 AI Agent 決策上下文回寫前的必要標籤、redaction 與 no-write 邊界。"
"此端點不寫 KM、不寫 RAG index、不更新 PlayBook trust、不呼叫 MCP tool、"
"不執行 runtime action、不保存 raw log payload、不讀 secret、不呼叫 GitHub。"
),
)
async def get_agent_log_post_write_verifier_dry_run() -> dict[str, Any]:
"""Return LOG feedback post-write verifier dry-run readback."""
try:
payload = await asyncio.to_thread(
load_latest_ai_agent_log_post_write_verifier_dry_run
)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_log_post_write_verifier_dry_run_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent LOG post-write verifier dry-run 無效",
) from exc
@router.get(
"/agent-telegram-receipt-approval-package",
response_model=dict[str, Any],

View File

@@ -0,0 +1,325 @@
"""AI Agent LOG post-write verifier dry-run.
Validates LOG feedback receipt candidates before a controlled KM / RAG /
PlayBook / MCP trust writeback lane is enabled. This module is deliberately
dry-run only: it performs structural verification against the committed
receipt plan and does not write KM, index RAG, update PlayBook trust, call MCP
tools, execute runtime actions, or persist raw log payloads.
"""
from __future__ import annotations
from typing import Any
from src.services.ai_agent_log_feedback_receipt_dry_run import (
load_latest_ai_agent_log_feedback_receipt_dry_run,
)
_SCHEMA_VERSION = "ai_agent_log_post_write_verifier_dry_run_v1"
_SOURCE_READY_STATUS = "trusted_feedback_receipt_dry_run_ready"
_COMMON_REQUIRED_FIELDS = (
"receipt_id",
"target",
"source_sample_id",
"project_id",
"product",
"service",
"package",
"tool",
"source_system",
"redaction_state",
"observed_event_count",
"observed_field_count",
"dry_run_status",
"write_enabled",
"raw_log_payload_persisted",
)
_TARGET_REQUIREMENTS: tuple[dict[str, Any], ...] = (
{
"target": "km",
"node_id": "verify_km_memory_receipt_shape",
"artifact": "km_memory_receipt_candidate",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
{
"target": "rag",
"node_id": "verify_rag_chunk_receipt_shape",
"artifact": "rag_chunk_receipt_candidate",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
{
"target": "playbook",
"node_id": "verify_playbook_trust_receipt_shape",
"artifact": "playbook_trust_receipt_candidate",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
{
"target": "mcp",
"node_id": "verify_mcp_audit_feedback_shape",
"artifact": "mcp_audit_feedback_candidate",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
{
"target": "verifier",
"node_id": "verify_post_apply_feedback_binding",
"artifact": "post_apply_verifier_feedback_receipt",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
{
"target": "ai_agent",
"node_id": "verify_agent_decision_context_binding",
"artifact": "agent_decision_context_dry_run",
"required_fields": _COMMON_REQUIRED_FIELDS,
},
)
_SOURCE_FALSE_BOUNDARIES = (
"km_write_performed",
"rag_index_write_performed",
"playbook_trust_write_performed",
"mcp_tool_call_performed",
"agent_runtime_action_performed",
"post_apply_verifier_executed",
"raw_log_payload_persisted",
"secret_value_collection_allowed",
"workflow_trigger_performed",
"github_api_used",
)
def load_latest_ai_agent_log_post_write_verifier_dry_run() -> dict[str, Any]:
"""Return the latest dry-run verifier for LOG feedback receipt candidates."""
source = load_latest_ai_agent_log_feedback_receipt_dry_run()
source_rollups = source.get("rollups") or {}
source_boundaries = source.get("operation_boundaries") or {}
receipts = _candidate_receipts(source)
source_ready = (
source.get("status") == _SOURCE_READY_STATUS
and source_rollups.get("dry_run_ready") is True
and source.get("active_blockers") == []
)
target_verifications = [
_verify_target(requirement, receipts) for requirement in _TARGET_REQUIREMENTS
]
active_blockers = _active_blockers(
source_ready=source_ready,
source_boundaries=source_boundaries,
receipts=receipts,
target_verifications=target_verifications,
)
verifier_nodes = _verifier_nodes(
source_ready=source_ready,
target_verifications=target_verifications,
)
verified_candidate_count = sum(
verification["verified_candidate_count"]
for verification in target_verifications
)
return {
"schema_version": _SCHEMA_VERSION,
"priority": "P1-LOG-KM-RAG-MCP-PLAYBOOK",
"scope": "ai_agent_log_post_write_verifier_dry_run",
"status": (
"post_write_verifier_dry_run_ready"
if not active_blockers
else "blocked_waiting_feedback_receipt_verifier"
),
"readback": {
"workplan_id": "P1-LOG-POST-WRITE-VERIFIER-DRY-RUN",
"workplan_title": "KM / RAG / PlayBook / MCP feedback receipt post-write verifier dry-run",
"source_schema_version": source.get("schema_version"),
"source_status": source.get("status"),
"safe_next_step": (
"enable_controlled_km_rag_playbook_trust_writeback_with_post_write_verifier"
),
},
"verifier_nodes": verifier_nodes,
"target_verifications": target_verifications,
"rollups": {
"verifier_node_count": len(verifier_nodes),
"passed_verifier_node_count": sum(
1 for node in verifier_nodes if node["status"] == "dry_run_passed"
),
"target_count": len(_TARGET_REQUIREMENTS),
"verified_target_count": sum(
1
for verification in target_verifications
if verification["status"] == "target_receipts_verified"
),
"candidate_receipt_count": len(receipts),
"verified_candidate_receipt_count": verified_candidate_count,
"source_pipeline_node_count": source_rollups.get("pipeline_node_count", 0),
"source_candidate_receipt_count": source_rollups.get(
"candidate_receipt_count",
0,
),
"runtime_sample_count": source_rollups.get("runtime_sample_count", 0),
"dry_run_ready": not active_blockers,
"controlled_writeback_next_step_ready": not active_blockers,
"writeback_authorized_by_this_endpoint": False,
},
"active_blockers": active_blockers,
"operation_boundaries": {
"dry_run_only": True,
"post_write_verifier_dry_run_performed": True,
"km_write_performed": False,
"rag_index_write_performed": False,
"playbook_trust_write_performed": False,
"mcp_tool_call_performed": False,
"agent_runtime_action_performed": False,
"post_write_verifier_runtime_write_performed": False,
"raw_log_payload_persisted": False,
"secret_value_collection_allowed": False,
"workflow_trigger_performed": False,
"github_api_used": False,
},
}
def _candidate_receipts(source: dict[str, Any]) -> list[dict[str, Any]]:
receipts = source.get("candidate_receipts")
if not isinstance(receipts, list):
return []
return [receipt for receipt in receipts if isinstance(receipt, dict)]
def _verify_target(
requirement: dict[str, Any],
receipts: list[dict[str, Any]],
) -> dict[str, Any]:
target = requirement["target"]
required_fields = tuple(requirement["required_fields"])
target_receipts = [
receipt for receipt in receipts if receipt.get("target") == target
]
invalid_receipts = [
_invalid_receipt(receipt, required_fields)
for receipt in target_receipts
]
invalid_receipts = [invalid for invalid in invalid_receipts if invalid["reasons"]]
return {
"target": target,
"artifact": requirement["artifact"],
"status": (
"target_receipts_verified"
if target_receipts and not invalid_receipts
else "target_receipts_invalid_or_missing"
),
"candidate_count": len(target_receipts),
"verified_candidate_count": len(target_receipts) - len(invalid_receipts),
"invalid_candidate_count": len(invalid_receipts),
"required_fields": list(required_fields),
"invalid_receipts": invalid_receipts,
"write_enabled": False,
}
def _invalid_receipt(
receipt: dict[str, Any],
required_fields: tuple[str, ...],
) -> dict[str, Any]:
reasons: list[str] = []
missing_fields = [
field for field in required_fields if _missing_value(receipt.get(field))
]
if missing_fields:
reasons.append(f"missing_fields:{','.join(missing_fields)}")
if receipt.get("dry_run_status") != "candidate_ready":
reasons.append("dry_run_status_not_candidate_ready")
if receipt.get("write_enabled") is not False:
reasons.append("write_enabled_must_be_false")
if receipt.get("raw_log_payload_persisted") is not False:
reasons.append("raw_log_payload_must_not_be_persisted")
if receipt.get("redaction_state") != "metadata_only_no_raw_payload":
reasons.append("redaction_state_must_be_metadata_only")
if not isinstance(receipt.get("observed_event_count"), int) or (
receipt.get("observed_event_count") or 0
) <= 0:
reasons.append("observed_event_count_must_be_positive")
if not isinstance(receipt.get("observed_field_count"), int) or (
receipt.get("observed_field_count") or 0
) <= 0:
reasons.append("observed_field_count_must_be_positive")
return {
"receipt_id": str(receipt.get("receipt_id") or ""),
"reasons": reasons,
}
def _verifier_nodes(
*,
source_ready: bool,
target_verifications: list[dict[str, Any]],
) -> list[dict[str, Any]]:
verifications_by_target = {
verification["target"]: verification for verification in target_verifications
}
nodes = []
for requirement in _TARGET_REQUIREMENTS:
verification = verifications_by_target.get(requirement["target"]) or {}
target_ready = verification.get("status") == "target_receipts_verified"
nodes.append(
{
"node_id": requirement["node_id"],
"stage": "post_write_verifier_dry_run",
"target": requirement["target"],
"artifact": requirement["artifact"],
"status": (
"dry_run_passed"
if source_ready and target_ready
else "blocked_waiting_valid_receipts"
),
"write_enabled": False,
"requires_controlled_writeback_verifier": True,
}
)
return nodes
def _active_blockers(
*,
source_ready: bool,
source_boundaries: dict[str, Any],
receipts: list[dict[str, Any]],
target_verifications: list[dict[str, Any]],
) -> list[str]:
blockers: list[str] = []
if not source_ready:
blockers.append("feedback_receipt_dry_run_not_ready")
if not receipts:
blockers.append("candidate_receipts_missing")
if source_boundaries.get("dry_run_only") is not True:
blockers.append("source_boundary_dry_run_only_not_true")
for boundary in _SOURCE_FALSE_BOUNDARIES:
if source_boundaries.get(boundary) is not False:
blockers.append(f"source_boundary_{boundary}_not_false")
for verification in target_verifications:
if verification["candidate_count"] == 0:
blockers.append(f"{verification['target']}_candidate_receipts_missing")
if verification["invalid_candidate_count"]:
blockers.append(f"{verification['target']}_candidate_receipts_invalid")
return _unique(blockers)
def _missing_value(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str) and not value.strip():
return True
return False
def _unique(values: list[str]) -> list[str]:
seen = set()
result = []
for value in values:
if value in seen:
continue
seen.add(value)
result.append(value)
return result

View File

@@ -0,0 +1,109 @@
from __future__ import annotations
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.agents import router
from src.services.ai_agent_log_post_write_verifier_dry_run import (
load_latest_ai_agent_log_post_write_verifier_dry_run,
)
def test_log_post_write_verifier_dry_run_loader_verifies_all_targets():
payload = load_latest_ai_agent_log_post_write_verifier_dry_run()
_assert_post_write_verifier_payload(payload)
def test_log_post_write_verifier_dry_run_endpoint_returns_readback():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
client = TestClient(app)
response = client.get("/api/v1/agents/agent-log-post-write-verifier-dry-run")
assert response.status_code == 200
_assert_post_write_verifier_payload(response.json())
def _assert_post_write_verifier_payload(payload: dict):
assert payload["schema_version"] == "ai_agent_log_post_write_verifier_dry_run_v1"
assert payload["priority"] == "P1-LOG-KM-RAG-MCP-PLAYBOOK"
assert payload["status"] == "post_write_verifier_dry_run_ready"
assert payload["active_blockers"] == []
assert payload["readback"]["source_status"] == "trusted_feedback_receipt_dry_run_ready"
assert payload["readback"]["safe_next_step"] == (
"enable_controlled_km_rag_playbook_trust_writeback_with_post_write_verifier"
)
assert payload["rollups"]["verifier_node_count"] == 6
assert payload["rollups"]["passed_verifier_node_count"] == 6
assert payload["rollups"]["target_count"] == 6
assert payload["rollups"]["verified_target_count"] == 6
assert payload["rollups"]["candidate_receipt_count"] == 12
assert payload["rollups"]["verified_candidate_receipt_count"] == 12
assert payload["rollups"]["source_pipeline_node_count"] == 9
assert payload["rollups"]["source_candidate_receipt_count"] == 12
assert payload["rollups"]["runtime_sample_count"] == 2
assert payload["rollups"]["dry_run_ready"] is True
assert payload["rollups"]["controlled_writeback_next_step_ready"] is True
assert payload["rollups"]["writeback_authorized_by_this_endpoint"] is False
nodes = {node["node_id"]: node for node in payload["verifier_nodes"]}
assert set(nodes) == {
"verify_km_memory_receipt_shape",
"verify_rag_chunk_receipt_shape",
"verify_playbook_trust_receipt_shape",
"verify_mcp_audit_feedback_shape",
"verify_post_apply_feedback_binding",
"verify_agent_decision_context_binding",
}
assert all(node["status"] == "dry_run_passed" for node in nodes.values())
assert all(node["write_enabled"] is False for node in nodes.values())
assert all(
node["requires_controlled_writeback_verifier"] is True
for node in nodes.values()
)
verifications = {
verification["target"]: verification
for verification in payload["target_verifications"]
}
assert set(verifications) == {"km", "rag", "playbook", "mcp", "verifier", "ai_agent"}
for verification in verifications.values():
assert verification["status"] == "target_receipts_verified"
assert verification["candidate_count"] == 2
assert verification["verified_candidate_count"] == 2
assert verification["invalid_candidate_count"] == 0
assert verification["invalid_receipts"] == []
assert verification["write_enabled"] is False
assert {
"receipt_id",
"source_sample_id",
"project_id",
"product",
"service",
"package",
"tool",
"source_system",
"redaction_state",
"observed_event_count",
"observed_field_count",
"dry_run_status",
"write_enabled",
"raw_log_payload_persisted",
}.issubset(set(verification["required_fields"]))
boundaries = payload["operation_boundaries"]
assert boundaries["dry_run_only"] is True
assert boundaries["post_write_verifier_dry_run_performed"] is True
assert boundaries["km_write_performed"] is False
assert boundaries["rag_index_write_performed"] is False
assert boundaries["playbook_trust_write_performed"] is False
assert boundaries["mcp_tool_call_performed"] is False
assert boundaries["agent_runtime_action_performed"] is False
assert boundaries["post_write_verifier_runtime_write_performed"] is False
assert boundaries["raw_log_payload_persisted"] is False
assert boundaries["secret_value_collection_allowed"] is False
assert boundaries["workflow_trigger_performed"] is False
assert boundaries["github_api_used"] is False