From 62071dd1f76cd1466c2f93f80d9252493deffefe Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 1 Jul 2026 21:07:27 +0800 Subject: [PATCH] fix(agent): degrade log consumer db pressure --- ...log_controlled_writeback_consumer_apply.py | 100 ++++++++++-- ..._controlled_writeback_consumer_readback.py | 144 +++++++++++++++++- ...controlled_writeback_consumer_apply_api.py | 75 +++++++++ ...trolled_writeback_consumer_readback_api.py | 34 +++++ 4 files changed, 339 insertions(+), 14 deletions(-) diff --git a/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_apply.py b/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_apply.py index 402929ae..a9feb5ca 100644 --- a/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_apply.py +++ b/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_apply.py @@ -9,11 +9,13 @@ triggering workflows, sending Telegram, or reading secrets. from __future__ import annotations +import asyncio import json from typing import Any from sqlalchemy import text +from src.core.logging import get_logger from src.db.base import get_db_context from src.services.ai_agent_log_controlled_writeback_consumer_readback import ( CONSUMER_EXECUTOR_ROUTE, @@ -26,6 +28,8 @@ from src.services.ai_agent_log_controlled_writeback_dispatch import ( ) SCHEMA_VERSION = "ai_agent_log_controlled_writeback_consumer_apply_receipt_v1" +_DB_RETRY_DELAYS_SECONDS = (0.0, 0.25, 1.0) +logger = get_logger(__name__) async def consume_latest_ai_agent_log_controlled_writeback( @@ -39,20 +43,44 @@ async def consume_latest_ai_agent_log_controlled_writeback( ) receipt = build_ai_agent_log_controlled_writeback_consumer_apply_receipt(readback) if receipt["active_blockers"]: + receipt["apply_result"] = { + "operation_type": CONSUMER_OPERATION_TYPE, + "ledger_operation_type": None, + "semantic_operation_type": CONSUMER_OPERATION_TYPE, + "db_write_status": "not_attempted", + "consumer_apply_receipt_row_count": 0, + "reason": "active_blockers_present", + } return receipt - rows = [] - async with get_db_context(project_id) as db: - ledger_operation_type = await _resolve_ledger_operation_type(db) - for item in receipt["consumer_apply_receipts"]: - rows.append( - await _insert_consumer_receipt_row( - db, - project_id=project_id, - item=item, - ledger_operation_type=ledger_operation_type, - ) - ) + try: + ledger_operation_type, rows = await _write_consumer_receipts_with_retry( + project_id=project_id, + receipt=receipt, + ) + except Exception as exc: # pragma: no cover - live DB pool pressure + logger.warning( + "log_controlled_writeback_consumer_apply_db_unavailable", + project_id=project_id, + error_type=type(exc).__name__, + ) + receipt["status"] = "blocked_waiting_controlled_writeback_consumer_apply_db" + receipt["active_blockers"] = _unique( + [ + *receipt.get("active_blockers", []), + "consumer_apply_db_write_unavailable", + ] + ) + receipt["apply_result"] = { + "operation_type": CONSUMER_OPERATION_TYPE, + "ledger_operation_type": None, + "semantic_operation_type": CONSUMER_OPERATION_TYPE, + "db_write_status": "unavailable", + "error_type": type(exc).__name__, + "consumer_apply_receipt_row_count": 0, + "rows": [], + } + return receipt inserted_count = sum(1 for row in rows if row["created"] is True) existing_count = sum(1 for row in rows if row["created"] is False) @@ -74,6 +102,54 @@ async def consume_latest_ai_agent_log_controlled_writeback( return receipt +async def _write_consumer_receipts_with_retry( + *, + project_id: str, + receipt: dict[str, Any], +) -> tuple[str, list[dict[str, Any]]]: + last_error: Exception | None = None + for attempt, delay_seconds in enumerate(_DB_RETRY_DELAYS_SECONDS, start=1): + if delay_seconds: + await asyncio.sleep(delay_seconds) + try: + return await _write_consumer_receipts_once( + project_id=project_id, + receipt=receipt, + ) + except Exception as exc: # pragma: no cover - exercised by live pressure + last_error = exc + logger.warning( + "log_controlled_writeback_consumer_apply_attempt_failed", + project_id=project_id, + attempt=attempt, + error_type=type(exc).__name__, + ) + if last_error is not None: + raise last_error + return FALLBACK_LEDGER_OPERATION_TYPE, [] + + +async def _write_consumer_receipts_once( + *, + project_id: str, + receipt: dict[str, Any], +) -> tuple[str, list[dict[str, Any]]]: + rows = [] + async with get_db_context(project_id) as db: + await db.execute(text("SET LOCAL statement_timeout = '5000ms'"), {}) + ledger_operation_type = await _resolve_ledger_operation_type(db) + for item in receipt["consumer_apply_receipts"]: + rows.append( + await _insert_consumer_receipt_row( + db, + project_id=project_id, + item=item, + ledger_operation_type=ledger_operation_type, + ) + ) + return ledger_operation_type, rows + + def build_ai_agent_log_controlled_writeback_consumer_apply_receipt( readback: dict[str, Any], ) -> dict[str, Any]: diff --git a/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_readback.py b/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_readback.py index 5400409f..d9358a68 100644 --- a/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_readback.py +++ b/apps/api/src/services/ai_agent_log_controlled_writeback_consumer_readback.py @@ -10,11 +10,13 @@ prove this route has rolled out. from __future__ import annotations +import asyncio from collections.abc import Mapping from typing import Any from sqlalchemy import text +from src.core.logging import get_logger from src.db.base import get_db_context from src.services.ai_agent_log_controlled_writeback_dispatch import ( EXECUTOR_ROUTE, @@ -34,6 +36,8 @@ _CONSUMER_SURFACES = { "verifier": "post_apply_verifier_feedback_context", "ai_agent": "autonomous_runtime_decision_context", } +_DB_RETRY_DELAYS_SECONDS = (0.0, 0.25, 1.0) +logger = get_logger(__name__) async def load_latest_ai_agent_log_controlled_writeback_consumer_readback( @@ -42,7 +46,51 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback( ) -> dict[str, Any]: """Return live consumer bindings for LOG controlled writeback receipts.""" + try: + rows, consumer_rows = await _load_consumer_rows_with_retry(project_id=project_id) + except Exception as exc: # pragma: no cover - live DB pool pressure + logger.warning( + "log_controlled_writeback_consumer_readback_db_unavailable", + project_id=project_id, + error_type=type(exc).__name__, + ) + return _fallback_consumer_readback( + project_id=project_id, + error_type=type(exc).__name__, + ) + + return _build_consumer_readback(rows=rows, consumer_rows=consumer_rows) + + +async def _load_consumer_rows_with_retry( + *, + project_id: str, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + last_error: Exception | None = None + for attempt, delay_seconds in enumerate(_DB_RETRY_DELAYS_SECONDS, start=1): + if delay_seconds: + await asyncio.sleep(delay_seconds) + try: + return await _load_consumer_rows_once(project_id=project_id) + except Exception as exc: # pragma: no cover - exercised by live pressure + last_error = exc + logger.warning( + "log_controlled_writeback_consumer_readback_attempt_failed", + project_id=project_id, + attempt=attempt, + error_type=type(exc).__name__, + ) + if last_error is not None: + raise last_error + return [], [] + + +async def _load_consumer_rows_once( + *, + project_id: str, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: async with get_db_context(project_id) as db: + await db.execute(text("SET LOCAL statement_timeout = '5000ms'"), {}) result = await db.execute( text(""" SELECT @@ -106,8 +154,14 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback( """), {"operation_type": CONSUMER_OPERATION_TYPE}, ) - rows = _result_rows(result) - consumer_rows = _result_rows(consumer_result) + return _result_rows(result), _result_rows(consumer_result) + + +def _build_consumer_readback( + *, + rows: list[dict[str, Any]], + consumer_rows: list[dict[str, Any]], +) -> dict[str, Any]: consumer_receipts = _consumer_receipts_by_dispatch_id(consumer_rows) bindings = [_consumer_binding(row, consumer_receipts) for row in rows] active_blockers = _active_blockers(bindings) @@ -219,6 +273,92 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback( } +def _fallback_consumer_readback( + *, + project_id: str, + error_type: str, +) -> dict[str, Any]: + return { + "schema_version": SCHEMA_VERSION, + "priority": "P1-LOG-KM-RAG-MCP-PLAYBOOK", + "scope": "ai_agent_log_controlled_writeback_consumer_readback", + "status": "blocked_waiting_controlled_writeback_consumer_db_readback", + "readback": { + "workplan_id": "P1-LOG-CONTROLLED-WRITEBACK-CONSUMER-READBACK", + "workplan_title": ( + "LOG metadata ledger receipts consumable by KM / RAG / PlayBook / " + "MCP / verifier / AI Agent context" + ), + "source_operation_type": OPERATION_TYPE, + "source_executor_route": EXECUTOR_ROUTE, + "db_read_status": "unavailable", + "error_type": error_type, + "safe_next_step": "retry_consumer_readback_after_db_pool_pressure_clears", + }, + "controlled_consume": { + "mode": "blocked_waiting_consumer_db_readback", + "controlled_consume_allowed": False, + "owner_review_required_for_low_medium_high": False, + "critical_break_glass_required": True, + "target_selector_required": True, + "source_of_truth_diff_required": True, + "check_mode_required": True, + "rollback_required": True, + "post_apply_verifier_required": True, + "runtime_target_write_performed": False, + "consumer_apply_route": ( + "/api/v1/agents/agent-log-controlled-writeback-consumer-apply" + ), + }, + "consumer_bindings": [], + "target_rollups": _target_rollups([]), + "rollups": { + "target_count": len(_TARGETS), + "dispatch_ledger_row_count": 0, + "consumer_apply_receipt_row_count": 0, + "consumer_binding_count": 0, + "ready_consumer_binding_count": 0, + "ready_target_count": 0, + "metadata_only_receipt_count": 0, + "post_apply_verifier_ref_count": 0, + "controlled_consumer_readback_ready": False, + "km_consumer_binding_count": 0, + "rag_consumer_binding_count": 0, + "playbook_consumer_binding_count": 0, + "mcp_consumer_binding_count": 0, + "verifier_consumer_binding_count": 0, + "ai_agent_consumer_binding_count": 0, + "target_context_receipt_write_count": 0, + "km_context_receipt_write_count": 0, + "rag_context_receipt_write_count": 0, + "playbook_context_receipt_write_count": 0, + "mcp_context_receipt_write_count": 0, + "verifier_context_receipt_write_count": 0, + "ai_agent_context_receipt_write_count": 0, + "runtime_target_write_performed": False, + }, + "active_blockers": [ + "log_controlled_writeback_consumer_readback_db_unavailable" + ], + "operation_boundaries": { + "consumer_readback_only": True, + "metadata_ledger_read_performed": False, + "consumer_apply_receipt_read_performed": False, + "runtime_target_write_performed": False, + "km_write_performed": False, + "rag_index_write_performed": False, + "playbook_trust_write_performed": False, + "mcp_tool_call_performed": False, + "agent_runtime_action_performed": False, + "telegram_send_performed": False, + "workflow_trigger_performed": False, + "raw_log_payload_persisted": False, + "secret_value_collection_allowed": False, + "github_api_used": False, + }, + } + + def _consumer_binding( row: Mapping[str, Any], consumer_receipts: Mapping[str, Mapping[str, Any]], diff --git a/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_apply_api.py b/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_apply_api.py index 36730e77..631f8b27 100644 --- a/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_apply_api.py +++ b/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_apply_api.py @@ -28,6 +28,8 @@ class _FakeDb: self.params: list[dict] = [] async def execute(self, statement, params: dict): + if not params: + return _FakeResult(None) self.params.append(params) if "fallback_operation_type" in params: return _FakeResult("km_linked") @@ -50,6 +52,14 @@ class _FakeContext: return False +class _FailingContext: + async def __aenter__(self): + raise TimeoutError("pool checkout unavailable") + + async def __aexit__(self, _exc_type, _exc, _tb) -> bool: + return False + + def _consumer_readback() -> dict: targets = ("km", "rag", "playbook", "mcp", "verifier", "ai_agent") bindings = [ @@ -185,6 +195,71 @@ async def test_log_controlled_writeback_consumer_apply_writes_idempotent_rows(mo assert '"km_write_performed": false' in params["output"] +@pytest.mark.asyncio +async def test_log_controlled_writeback_consumer_apply_blocks_when_readback_degraded( + monkeypatch, +): + degraded_readback = _consumer_readback() + degraded_readback["status"] = ( + "blocked_waiting_controlled_writeback_consumer_db_readback" + ) + degraded_readback["controlled_consume"]["controlled_consume_allowed"] = False + degraded_readback["active_blockers"] = [ + "log_controlled_writeback_consumer_readback_db_unavailable" + ] + + async def fake_readback(*, project_id: str): + assert project_id == "awoooi" + return degraded_readback + + monkeypatch.setattr( + apply_module, + "load_latest_ai_agent_log_controlled_writeback_consumer_readback", + fake_readback, + ) + + payload = await consume_latest_ai_agent_log_controlled_writeback(project_id="awoooi") + + assert payload["status"] == ( + "blocked_waiting_controlled_writeback_consumer_apply_inputs" + ) + assert "controlled_writeback_consumer_readback_not_ready" in payload[ + "active_blockers" + ] + assert payload["apply_result"]["db_write_status"] == "not_attempted" + assert payload["operation_boundaries"]["runtime_target_write_performed"] is False + + +@pytest.mark.asyncio +async def test_log_controlled_writeback_consumer_apply_degrades_on_db_write_pressure( + monkeypatch, +): + monkeypatch.setattr( + apply_module, + "get_db_context", + lambda project_id: _FailingContext(), + ) + + async def fake_readback(*, project_id: str): + assert project_id == "awoooi" + return _consumer_readback() + + monkeypatch.setattr( + apply_module, + "load_latest_ai_agent_log_controlled_writeback_consumer_readback", + fake_readback, + ) + + payload = await consume_latest_ai_agent_log_controlled_writeback(project_id="awoooi") + + assert payload["status"] == "blocked_waiting_controlled_writeback_consumer_apply_db" + assert "consumer_apply_db_write_unavailable" in payload["active_blockers"] + assert payload["apply_result"]["db_write_status"] == "unavailable" + assert payload["apply_result"]["error_type"] == "TimeoutError" + assert payload["operation_boundaries"]["consumer_context_receipt_write_performed"] is False + assert payload["operation_boundaries"]["runtime_target_write_performed"] is False + + def test_log_controlled_writeback_consumer_apply_endpoint_returns_receipt(monkeypatch): async def fake_apply(): payload = build_ai_agent_log_controlled_writeback_consumer_apply_receipt( diff --git a/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_readback_api.py b/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_readback_api.py index 7b96589c..62bd7e05 100644 --- a/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_readback_api.py +++ b/apps/api/tests/test_ai_agent_log_controlled_writeback_consumer_readback_api.py @@ -32,6 +32,8 @@ class _FakeDb: self.params: list[dict] = [] async def execute(self, _statement, params: dict): + if not params: + return _FakeMappingResult([]) self.params.append(params) if params.get("operation_type") == "log_controlled_writeback_consumed": return _FakeMappingResult(self.consumer_rows) @@ -49,6 +51,14 @@ class _FakeContext: return False +class _FailingContext: + async def __aenter__(self): + raise TimeoutError("pool checkout unavailable") + + async def __aexit__(self, _exc_type, _exc, _tb) -> bool: + return False + + def _ledger_rows() -> list[dict]: return [ { @@ -147,6 +157,30 @@ async def test_log_controlled_writeback_consumer_loader_reads_apply_receipts(mon assert binding["target_write_receipt"]["runtime_target_write_performed"] is True +@pytest.mark.asyncio +async def test_log_controlled_writeback_consumer_loader_degrades_on_db_pressure(monkeypatch): + monkeypatch.setattr( + consumer_module, + "get_db_context", + lambda project_id: _FailingContext(), + ) + + payload = await load_latest_ai_agent_log_controlled_writeback_consumer_readback() + + assert payload["status"] == ( + "blocked_waiting_controlled_writeback_consumer_db_readback" + ) + assert payload["readback"]["db_read_status"] == "unavailable" + assert payload["readback"]["error_type"] == "TimeoutError" + assert payload["controlled_consume"]["controlled_consume_allowed"] is False + assert payload["rollups"]["controlled_consumer_readback_ready"] is False + assert payload["active_blockers"] == [ + "log_controlled_writeback_consumer_readback_db_unavailable" + ] + assert payload["operation_boundaries"]["metadata_ledger_read_performed"] is False + assert payload["operation_boundaries"]["runtime_target_write_performed"] is False + + def test_log_controlled_writeback_consumer_endpoint_returns_readback(monkeypatch): async def fake_loader(): fake_db = _FakeDb(_ledger_rows())