From f300b459dc2bf2d60077996c6022a42e2165dfec Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 29 Jun 2026 21:15:36 +0800 Subject: [PATCH] feat(api): verify log feedback writeback dry run --- apps/api/src/api/v1/agents.py | 34 ++ ...i_agent_log_post_write_verifier_dry_run.py | 325 ++++++++++++++++++ ...ent_log_post_write_verifier_dry_run_api.py | 109 ++++++ 3 files changed, 468 insertions(+) create mode 100644 apps/api/src/services/ai_agent_log_post_write_verifier_dry_run.py create mode 100644 apps/api/tests/test_ai_agent_log_post_write_verifier_dry_run_api.py diff --git a/apps/api/src/api/v1/agents.py b/apps/api/src/api/v1/agents.py index 91f38e21..cfe08dc3 100644 --- a/apps/api/src/api/v1/agents.py +++ b/apps/api/src/api/v1/agents.py @@ -106,6 +106,9 @@ from src.services.ai_agent_log_feedback_receipt_dry_run import ( from src.services.ai_agent_log_intelligence_integration_readback import ( load_latest_ai_agent_log_intelligence_integration_readback, ) +from src.services.ai_agent_log_post_write_verifier_dry_run import ( + load_latest_ai_agent_log_post_write_verifier_dry_run, +) from src.services.ai_agent_low_medium_risk_whitelist import ( load_latest_ai_agent_low_medium_risk_whitelist, ) @@ -1869,6 +1872,37 @@ async def get_agent_log_feedback_receipt_dry_run() -> dict[str, Any]: ) from exc +@router.get( + "/agent-log-post-write-verifier-dry-run", + response_model=dict[str, Any], + summary="取得 AI Agent LOG post-write verifier dry-run", + description=( + "驗證 LOG feedback receipt 候選是否已具備 KM、RAG、PlayBook、MCP audit、" + "post-apply verifier 與 AI Agent 決策上下文回寫前的必要標籤、redaction 與 no-write 邊界。" + "此端點不寫 KM、不寫 RAG index、不更新 PlayBook trust、不呼叫 MCP tool、" + "不執行 runtime action、不保存 raw log payload、不讀 secret、不呼叫 GitHub。" + ), +) +async def get_agent_log_post_write_verifier_dry_run() -> dict[str, Any]: + """Return LOG feedback post-write verifier dry-run readback.""" + try: + payload = await asyncio.to_thread( + load_latest_ai_agent_log_post_write_verifier_dry_run + ) + return redact_public_lan_topology(payload) + except FileNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except (json.JSONDecodeError, ValueError) as exc: + logger.error("ai_agent_log_post_write_verifier_dry_run_invalid", error=str(exc)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="AI Agent LOG post-write verifier dry-run 無效", + ) from exc + + @router.get( "/agent-telegram-receipt-approval-package", response_model=dict[str, Any], diff --git a/apps/api/src/services/ai_agent_log_post_write_verifier_dry_run.py b/apps/api/src/services/ai_agent_log_post_write_verifier_dry_run.py new file mode 100644 index 00000000..4d67dc75 --- /dev/null +++ b/apps/api/src/services/ai_agent_log_post_write_verifier_dry_run.py @@ -0,0 +1,325 @@ +"""AI Agent LOG post-write verifier dry-run. + +Validates LOG feedback receipt candidates before a controlled KM / RAG / +PlayBook / MCP trust writeback lane is enabled. This module is deliberately +dry-run only: it performs structural verification against the committed +receipt plan and does not write KM, index RAG, update PlayBook trust, call MCP +tools, execute runtime actions, or persist raw log payloads. +""" + +from __future__ import annotations + +from typing import Any + +from src.services.ai_agent_log_feedback_receipt_dry_run import ( + load_latest_ai_agent_log_feedback_receipt_dry_run, +) + +_SCHEMA_VERSION = "ai_agent_log_post_write_verifier_dry_run_v1" +_SOURCE_READY_STATUS = "trusted_feedback_receipt_dry_run_ready" + +_COMMON_REQUIRED_FIELDS = ( + "receipt_id", + "target", + "source_sample_id", + "project_id", + "product", + "service", + "package", + "tool", + "source_system", + "redaction_state", + "observed_event_count", + "observed_field_count", + "dry_run_status", + "write_enabled", + "raw_log_payload_persisted", +) + +_TARGET_REQUIREMENTS: tuple[dict[str, Any], ...] = ( + { + "target": "km", + "node_id": "verify_km_memory_receipt_shape", + "artifact": "km_memory_receipt_candidate", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, + { + "target": "rag", + "node_id": "verify_rag_chunk_receipt_shape", + "artifact": "rag_chunk_receipt_candidate", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, + { + "target": "playbook", + "node_id": "verify_playbook_trust_receipt_shape", + "artifact": "playbook_trust_receipt_candidate", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, + { + "target": "mcp", + "node_id": "verify_mcp_audit_feedback_shape", + "artifact": "mcp_audit_feedback_candidate", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, + { + "target": "verifier", + "node_id": "verify_post_apply_feedback_binding", + "artifact": "post_apply_verifier_feedback_receipt", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, + { + "target": "ai_agent", + "node_id": "verify_agent_decision_context_binding", + "artifact": "agent_decision_context_dry_run", + "required_fields": _COMMON_REQUIRED_FIELDS, + }, +) + +_SOURCE_FALSE_BOUNDARIES = ( + "km_write_performed", + "rag_index_write_performed", + "playbook_trust_write_performed", + "mcp_tool_call_performed", + "agent_runtime_action_performed", + "post_apply_verifier_executed", + "raw_log_payload_persisted", + "secret_value_collection_allowed", + "workflow_trigger_performed", + "github_api_used", +) + + +def load_latest_ai_agent_log_post_write_verifier_dry_run() -> dict[str, Any]: + """Return the latest dry-run verifier for LOG feedback receipt candidates.""" + source = load_latest_ai_agent_log_feedback_receipt_dry_run() + source_rollups = source.get("rollups") or {} + source_boundaries = source.get("operation_boundaries") or {} + receipts = _candidate_receipts(source) + source_ready = ( + source.get("status") == _SOURCE_READY_STATUS + and source_rollups.get("dry_run_ready") is True + and source.get("active_blockers") == [] + ) + target_verifications = [ + _verify_target(requirement, receipts) for requirement in _TARGET_REQUIREMENTS + ] + active_blockers = _active_blockers( + source_ready=source_ready, + source_boundaries=source_boundaries, + receipts=receipts, + target_verifications=target_verifications, + ) + verifier_nodes = _verifier_nodes( + source_ready=source_ready, + target_verifications=target_verifications, + ) + verified_candidate_count = sum( + verification["verified_candidate_count"] + for verification in target_verifications + ) + + return { + "schema_version": _SCHEMA_VERSION, + "priority": "P1-LOG-KM-RAG-MCP-PLAYBOOK", + "scope": "ai_agent_log_post_write_verifier_dry_run", + "status": ( + "post_write_verifier_dry_run_ready" + if not active_blockers + else "blocked_waiting_feedback_receipt_verifier" + ), + "readback": { + "workplan_id": "P1-LOG-POST-WRITE-VERIFIER-DRY-RUN", + "workplan_title": "KM / RAG / PlayBook / MCP feedback receipt post-write verifier dry-run", + "source_schema_version": source.get("schema_version"), + "source_status": source.get("status"), + "safe_next_step": ( + "enable_controlled_km_rag_playbook_trust_writeback_with_post_write_verifier" + ), + }, + "verifier_nodes": verifier_nodes, + "target_verifications": target_verifications, + "rollups": { + "verifier_node_count": len(verifier_nodes), + "passed_verifier_node_count": sum( + 1 for node in verifier_nodes if node["status"] == "dry_run_passed" + ), + "target_count": len(_TARGET_REQUIREMENTS), + "verified_target_count": sum( + 1 + for verification in target_verifications + if verification["status"] == "target_receipts_verified" + ), + "candidate_receipt_count": len(receipts), + "verified_candidate_receipt_count": verified_candidate_count, + "source_pipeline_node_count": source_rollups.get("pipeline_node_count", 0), + "source_candidate_receipt_count": source_rollups.get( + "candidate_receipt_count", + 0, + ), + "runtime_sample_count": source_rollups.get("runtime_sample_count", 0), + "dry_run_ready": not active_blockers, + "controlled_writeback_next_step_ready": not active_blockers, + "writeback_authorized_by_this_endpoint": False, + }, + "active_blockers": active_blockers, + "operation_boundaries": { + "dry_run_only": True, + "post_write_verifier_dry_run_performed": True, + "km_write_performed": False, + "rag_index_write_performed": False, + "playbook_trust_write_performed": False, + "mcp_tool_call_performed": False, + "agent_runtime_action_performed": False, + "post_write_verifier_runtime_write_performed": False, + "raw_log_payload_persisted": False, + "secret_value_collection_allowed": False, + "workflow_trigger_performed": False, + "github_api_used": False, + }, + } + + +def _candidate_receipts(source: dict[str, Any]) -> list[dict[str, Any]]: + receipts = source.get("candidate_receipts") + if not isinstance(receipts, list): + return [] + return [receipt for receipt in receipts if isinstance(receipt, dict)] + + +def _verify_target( + requirement: dict[str, Any], + receipts: list[dict[str, Any]], +) -> dict[str, Any]: + target = requirement["target"] + required_fields = tuple(requirement["required_fields"]) + target_receipts = [ + receipt for receipt in receipts if receipt.get("target") == target + ] + invalid_receipts = [ + _invalid_receipt(receipt, required_fields) + for receipt in target_receipts + ] + invalid_receipts = [invalid for invalid in invalid_receipts if invalid["reasons"]] + + return { + "target": target, + "artifact": requirement["artifact"], + "status": ( + "target_receipts_verified" + if target_receipts and not invalid_receipts + else "target_receipts_invalid_or_missing" + ), + "candidate_count": len(target_receipts), + "verified_candidate_count": len(target_receipts) - len(invalid_receipts), + "invalid_candidate_count": len(invalid_receipts), + "required_fields": list(required_fields), + "invalid_receipts": invalid_receipts, + "write_enabled": False, + } + + +def _invalid_receipt( + receipt: dict[str, Any], + required_fields: tuple[str, ...], +) -> dict[str, Any]: + reasons: list[str] = [] + missing_fields = [ + field for field in required_fields if _missing_value(receipt.get(field)) + ] + if missing_fields: + reasons.append(f"missing_fields:{','.join(missing_fields)}") + if receipt.get("dry_run_status") != "candidate_ready": + reasons.append("dry_run_status_not_candidate_ready") + if receipt.get("write_enabled") is not False: + reasons.append("write_enabled_must_be_false") + if receipt.get("raw_log_payload_persisted") is not False: + reasons.append("raw_log_payload_must_not_be_persisted") + if receipt.get("redaction_state") != "metadata_only_no_raw_payload": + reasons.append("redaction_state_must_be_metadata_only") + if not isinstance(receipt.get("observed_event_count"), int) or ( + receipt.get("observed_event_count") or 0 + ) <= 0: + reasons.append("observed_event_count_must_be_positive") + if not isinstance(receipt.get("observed_field_count"), int) or ( + receipt.get("observed_field_count") or 0 + ) <= 0: + reasons.append("observed_field_count_must_be_positive") + + return { + "receipt_id": str(receipt.get("receipt_id") or ""), + "reasons": reasons, + } + + +def _verifier_nodes( + *, + source_ready: bool, + target_verifications: list[dict[str, Any]], +) -> list[dict[str, Any]]: + verifications_by_target = { + verification["target"]: verification for verification in target_verifications + } + nodes = [] + for requirement in _TARGET_REQUIREMENTS: + verification = verifications_by_target.get(requirement["target"]) or {} + target_ready = verification.get("status") == "target_receipts_verified" + nodes.append( + { + "node_id": requirement["node_id"], + "stage": "post_write_verifier_dry_run", + "target": requirement["target"], + "artifact": requirement["artifact"], + "status": ( + "dry_run_passed" + if source_ready and target_ready + else "blocked_waiting_valid_receipts" + ), + "write_enabled": False, + "requires_controlled_writeback_verifier": True, + } + ) + return nodes + + +def _active_blockers( + *, + source_ready: bool, + source_boundaries: dict[str, Any], + receipts: list[dict[str, Any]], + target_verifications: list[dict[str, Any]], +) -> list[str]: + blockers: list[str] = [] + if not source_ready: + blockers.append("feedback_receipt_dry_run_not_ready") + if not receipts: + blockers.append("candidate_receipts_missing") + if source_boundaries.get("dry_run_only") is not True: + blockers.append("source_boundary_dry_run_only_not_true") + for boundary in _SOURCE_FALSE_BOUNDARIES: + if source_boundaries.get(boundary) is not False: + blockers.append(f"source_boundary_{boundary}_not_false") + for verification in target_verifications: + if verification["candidate_count"] == 0: + blockers.append(f"{verification['target']}_candidate_receipts_missing") + if verification["invalid_candidate_count"]: + blockers.append(f"{verification['target']}_candidate_receipts_invalid") + return _unique(blockers) + + +def _missing_value(value: Any) -> bool: + if value is None: + return True + if isinstance(value, str) and not value.strip(): + return True + return False + + +def _unique(values: list[str]) -> list[str]: + seen = set() + result = [] + for value in values: + if value in seen: + continue + seen.add(value) + result.append(value) + return result diff --git a/apps/api/tests/test_ai_agent_log_post_write_verifier_dry_run_api.py b/apps/api/tests/test_ai_agent_log_post_write_verifier_dry_run_api.py new file mode 100644 index 00000000..9014b9c2 --- /dev/null +++ b/apps/api/tests/test_ai_agent_log_post_write_verifier_dry_run_api.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from src.api.v1.agents import router +from src.services.ai_agent_log_post_write_verifier_dry_run import ( + load_latest_ai_agent_log_post_write_verifier_dry_run, +) + + +def test_log_post_write_verifier_dry_run_loader_verifies_all_targets(): + payload = load_latest_ai_agent_log_post_write_verifier_dry_run() + + _assert_post_write_verifier_payload(payload) + + +def test_log_post_write_verifier_dry_run_endpoint_returns_readback(): + app = FastAPI() + app.include_router(router, prefix="/api/v1") + client = TestClient(app) + + response = client.get("/api/v1/agents/agent-log-post-write-verifier-dry-run") + + assert response.status_code == 200 + _assert_post_write_verifier_payload(response.json()) + + +def _assert_post_write_verifier_payload(payload: dict): + assert payload["schema_version"] == "ai_agent_log_post_write_verifier_dry_run_v1" + assert payload["priority"] == "P1-LOG-KM-RAG-MCP-PLAYBOOK" + assert payload["status"] == "post_write_verifier_dry_run_ready" + assert payload["active_blockers"] == [] + assert payload["readback"]["source_status"] == "trusted_feedback_receipt_dry_run_ready" + assert payload["readback"]["safe_next_step"] == ( + "enable_controlled_km_rag_playbook_trust_writeback_with_post_write_verifier" + ) + + assert payload["rollups"]["verifier_node_count"] == 6 + assert payload["rollups"]["passed_verifier_node_count"] == 6 + assert payload["rollups"]["target_count"] == 6 + assert payload["rollups"]["verified_target_count"] == 6 + assert payload["rollups"]["candidate_receipt_count"] == 12 + assert payload["rollups"]["verified_candidate_receipt_count"] == 12 + assert payload["rollups"]["source_pipeline_node_count"] == 9 + assert payload["rollups"]["source_candidate_receipt_count"] == 12 + assert payload["rollups"]["runtime_sample_count"] == 2 + assert payload["rollups"]["dry_run_ready"] is True + assert payload["rollups"]["controlled_writeback_next_step_ready"] is True + assert payload["rollups"]["writeback_authorized_by_this_endpoint"] is False + + nodes = {node["node_id"]: node for node in payload["verifier_nodes"]} + assert set(nodes) == { + "verify_km_memory_receipt_shape", + "verify_rag_chunk_receipt_shape", + "verify_playbook_trust_receipt_shape", + "verify_mcp_audit_feedback_shape", + "verify_post_apply_feedback_binding", + "verify_agent_decision_context_binding", + } + assert all(node["status"] == "dry_run_passed" for node in nodes.values()) + assert all(node["write_enabled"] is False for node in nodes.values()) + assert all( + node["requires_controlled_writeback_verifier"] is True + for node in nodes.values() + ) + + verifications = { + verification["target"]: verification + for verification in payload["target_verifications"] + } + assert set(verifications) == {"km", "rag", "playbook", "mcp", "verifier", "ai_agent"} + for verification in verifications.values(): + assert verification["status"] == "target_receipts_verified" + assert verification["candidate_count"] == 2 + assert verification["verified_candidate_count"] == 2 + assert verification["invalid_candidate_count"] == 0 + assert verification["invalid_receipts"] == [] + assert verification["write_enabled"] is False + assert { + "receipt_id", + "source_sample_id", + "project_id", + "product", + "service", + "package", + "tool", + "source_system", + "redaction_state", + "observed_event_count", + "observed_field_count", + "dry_run_status", + "write_enabled", + "raw_log_payload_persisted", + }.issubset(set(verification["required_fields"])) + + boundaries = payload["operation_boundaries"] + assert boundaries["dry_run_only"] is True + assert boundaries["post_write_verifier_dry_run_performed"] is True + assert boundaries["km_write_performed"] is False + assert boundaries["rag_index_write_performed"] is False + assert boundaries["playbook_trust_write_performed"] is False + assert boundaries["mcp_tool_call_performed"] is False + assert boundaries["agent_runtime_action_performed"] is False + assert boundaries["post_write_verifier_runtime_write_performed"] is False + assert boundaries["raw_log_payload_persisted"] is False + assert boundaries["secret_value_collection_allowed"] is False + assert boundaries["workflow_trigger_performed"] is False + assert boundaries["github_api_used"] is False