fix(agent): degrade log consumer db pressure
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
This commit is contained in:
@@ -9,11 +9,13 @@ triggering workflows, sending Telegram, or reading secrets.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.core.logging import get_logger
|
||||
from src.db.base import get_db_context
|
||||
from src.services.ai_agent_log_controlled_writeback_consumer_readback import (
|
||||
CONSUMER_EXECUTOR_ROUTE,
|
||||
@@ -26,6 +28,8 @@ from src.services.ai_agent_log_controlled_writeback_dispatch import (
|
||||
)
|
||||
|
||||
SCHEMA_VERSION = "ai_agent_log_controlled_writeback_consumer_apply_receipt_v1"
|
||||
_DB_RETRY_DELAYS_SECONDS = (0.0, 0.25, 1.0)
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def consume_latest_ai_agent_log_controlled_writeback(
|
||||
@@ -39,20 +43,44 @@ async def consume_latest_ai_agent_log_controlled_writeback(
|
||||
)
|
||||
receipt = build_ai_agent_log_controlled_writeback_consumer_apply_receipt(readback)
|
||||
if receipt["active_blockers"]:
|
||||
receipt["apply_result"] = {
|
||||
"operation_type": CONSUMER_OPERATION_TYPE,
|
||||
"ledger_operation_type": None,
|
||||
"semantic_operation_type": CONSUMER_OPERATION_TYPE,
|
||||
"db_write_status": "not_attempted",
|
||||
"consumer_apply_receipt_row_count": 0,
|
||||
"reason": "active_blockers_present",
|
||||
}
|
||||
return receipt
|
||||
|
||||
rows = []
|
||||
async with get_db_context(project_id) as db:
|
||||
ledger_operation_type = await _resolve_ledger_operation_type(db)
|
||||
for item in receipt["consumer_apply_receipts"]:
|
||||
rows.append(
|
||||
await _insert_consumer_receipt_row(
|
||||
db,
|
||||
project_id=project_id,
|
||||
item=item,
|
||||
ledger_operation_type=ledger_operation_type,
|
||||
)
|
||||
)
|
||||
try:
|
||||
ledger_operation_type, rows = await _write_consumer_receipts_with_retry(
|
||||
project_id=project_id,
|
||||
receipt=receipt,
|
||||
)
|
||||
except Exception as exc: # pragma: no cover - live DB pool pressure
|
||||
logger.warning(
|
||||
"log_controlled_writeback_consumer_apply_db_unavailable",
|
||||
project_id=project_id,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
receipt["status"] = "blocked_waiting_controlled_writeback_consumer_apply_db"
|
||||
receipt["active_blockers"] = _unique(
|
||||
[
|
||||
*receipt.get("active_blockers", []),
|
||||
"consumer_apply_db_write_unavailable",
|
||||
]
|
||||
)
|
||||
receipt["apply_result"] = {
|
||||
"operation_type": CONSUMER_OPERATION_TYPE,
|
||||
"ledger_operation_type": None,
|
||||
"semantic_operation_type": CONSUMER_OPERATION_TYPE,
|
||||
"db_write_status": "unavailable",
|
||||
"error_type": type(exc).__name__,
|
||||
"consumer_apply_receipt_row_count": 0,
|
||||
"rows": [],
|
||||
}
|
||||
return receipt
|
||||
|
||||
inserted_count = sum(1 for row in rows if row["created"] is True)
|
||||
existing_count = sum(1 for row in rows if row["created"] is False)
|
||||
@@ -74,6 +102,54 @@ async def consume_latest_ai_agent_log_controlled_writeback(
|
||||
return receipt
|
||||
|
||||
|
||||
async def _write_consumer_receipts_with_retry(
|
||||
*,
|
||||
project_id: str,
|
||||
receipt: dict[str, Any],
|
||||
) -> tuple[str, list[dict[str, Any]]]:
|
||||
last_error: Exception | None = None
|
||||
for attempt, delay_seconds in enumerate(_DB_RETRY_DELAYS_SECONDS, start=1):
|
||||
if delay_seconds:
|
||||
await asyncio.sleep(delay_seconds)
|
||||
try:
|
||||
return await _write_consumer_receipts_once(
|
||||
project_id=project_id,
|
||||
receipt=receipt,
|
||||
)
|
||||
except Exception as exc: # pragma: no cover - exercised by live pressure
|
||||
last_error = exc
|
||||
logger.warning(
|
||||
"log_controlled_writeback_consumer_apply_attempt_failed",
|
||||
project_id=project_id,
|
||||
attempt=attempt,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
if last_error is not None:
|
||||
raise last_error
|
||||
return FALLBACK_LEDGER_OPERATION_TYPE, []
|
||||
|
||||
|
||||
async def _write_consumer_receipts_once(
|
||||
*,
|
||||
project_id: str,
|
||||
receipt: dict[str, Any],
|
||||
) -> tuple[str, list[dict[str, Any]]]:
|
||||
rows = []
|
||||
async with get_db_context(project_id) as db:
|
||||
await db.execute(text("SET LOCAL statement_timeout = '5000ms'"), {})
|
||||
ledger_operation_type = await _resolve_ledger_operation_type(db)
|
||||
for item in receipt["consumer_apply_receipts"]:
|
||||
rows.append(
|
||||
await _insert_consumer_receipt_row(
|
||||
db,
|
||||
project_id=project_id,
|
||||
item=item,
|
||||
ledger_operation_type=ledger_operation_type,
|
||||
)
|
||||
)
|
||||
return ledger_operation_type, rows
|
||||
|
||||
|
||||
def build_ai_agent_log_controlled_writeback_consumer_apply_receipt(
|
||||
readback: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
|
||||
@@ -10,11 +10,13 @@ prove this route has rolled out.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.core.logging import get_logger
|
||||
from src.db.base import get_db_context
|
||||
from src.services.ai_agent_log_controlled_writeback_dispatch import (
|
||||
EXECUTOR_ROUTE,
|
||||
@@ -34,6 +36,8 @@ _CONSUMER_SURFACES = {
|
||||
"verifier": "post_apply_verifier_feedback_context",
|
||||
"ai_agent": "autonomous_runtime_decision_context",
|
||||
}
|
||||
_DB_RETRY_DELAYS_SECONDS = (0.0, 0.25, 1.0)
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def load_latest_ai_agent_log_controlled_writeback_consumer_readback(
|
||||
@@ -42,7 +46,51 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback(
|
||||
) -> dict[str, Any]:
|
||||
"""Return live consumer bindings for LOG controlled writeback receipts."""
|
||||
|
||||
try:
|
||||
rows, consumer_rows = await _load_consumer_rows_with_retry(project_id=project_id)
|
||||
except Exception as exc: # pragma: no cover - live DB pool pressure
|
||||
logger.warning(
|
||||
"log_controlled_writeback_consumer_readback_db_unavailable",
|
||||
project_id=project_id,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
return _fallback_consumer_readback(
|
||||
project_id=project_id,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
|
||||
return _build_consumer_readback(rows=rows, consumer_rows=consumer_rows)
|
||||
|
||||
|
||||
async def _load_consumer_rows_with_retry(
|
||||
*,
|
||||
project_id: str,
|
||||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
|
||||
last_error: Exception | None = None
|
||||
for attempt, delay_seconds in enumerate(_DB_RETRY_DELAYS_SECONDS, start=1):
|
||||
if delay_seconds:
|
||||
await asyncio.sleep(delay_seconds)
|
||||
try:
|
||||
return await _load_consumer_rows_once(project_id=project_id)
|
||||
except Exception as exc: # pragma: no cover - exercised by live pressure
|
||||
last_error = exc
|
||||
logger.warning(
|
||||
"log_controlled_writeback_consumer_readback_attempt_failed",
|
||||
project_id=project_id,
|
||||
attempt=attempt,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
if last_error is not None:
|
||||
raise last_error
|
||||
return [], []
|
||||
|
||||
|
||||
async def _load_consumer_rows_once(
|
||||
*,
|
||||
project_id: str,
|
||||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
|
||||
async with get_db_context(project_id) as db:
|
||||
await db.execute(text("SET LOCAL statement_timeout = '5000ms'"), {})
|
||||
result = await db.execute(
|
||||
text("""
|
||||
SELECT
|
||||
@@ -106,8 +154,14 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback(
|
||||
"""),
|
||||
{"operation_type": CONSUMER_OPERATION_TYPE},
|
||||
)
|
||||
rows = _result_rows(result)
|
||||
consumer_rows = _result_rows(consumer_result)
|
||||
return _result_rows(result), _result_rows(consumer_result)
|
||||
|
||||
|
||||
def _build_consumer_readback(
|
||||
*,
|
||||
rows: list[dict[str, Any]],
|
||||
consumer_rows: list[dict[str, Any]],
|
||||
) -> dict[str, Any]:
|
||||
consumer_receipts = _consumer_receipts_by_dispatch_id(consumer_rows)
|
||||
bindings = [_consumer_binding(row, consumer_receipts) for row in rows]
|
||||
active_blockers = _active_blockers(bindings)
|
||||
@@ -219,6 +273,92 @@ async def load_latest_ai_agent_log_controlled_writeback_consumer_readback(
|
||||
}
|
||||
|
||||
|
||||
def _fallback_consumer_readback(
|
||||
*,
|
||||
project_id: str,
|
||||
error_type: str,
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"priority": "P1-LOG-KM-RAG-MCP-PLAYBOOK",
|
||||
"scope": "ai_agent_log_controlled_writeback_consumer_readback",
|
||||
"status": "blocked_waiting_controlled_writeback_consumer_db_readback",
|
||||
"readback": {
|
||||
"workplan_id": "P1-LOG-CONTROLLED-WRITEBACK-CONSUMER-READBACK",
|
||||
"workplan_title": (
|
||||
"LOG metadata ledger receipts consumable by KM / RAG / PlayBook / "
|
||||
"MCP / verifier / AI Agent context"
|
||||
),
|
||||
"source_operation_type": OPERATION_TYPE,
|
||||
"source_executor_route": EXECUTOR_ROUTE,
|
||||
"db_read_status": "unavailable",
|
||||
"error_type": error_type,
|
||||
"safe_next_step": "retry_consumer_readback_after_db_pool_pressure_clears",
|
||||
},
|
||||
"controlled_consume": {
|
||||
"mode": "blocked_waiting_consumer_db_readback",
|
||||
"controlled_consume_allowed": False,
|
||||
"owner_review_required_for_low_medium_high": False,
|
||||
"critical_break_glass_required": True,
|
||||
"target_selector_required": True,
|
||||
"source_of_truth_diff_required": True,
|
||||
"check_mode_required": True,
|
||||
"rollback_required": True,
|
||||
"post_apply_verifier_required": True,
|
||||
"runtime_target_write_performed": False,
|
||||
"consumer_apply_route": (
|
||||
"/api/v1/agents/agent-log-controlled-writeback-consumer-apply"
|
||||
),
|
||||
},
|
||||
"consumer_bindings": [],
|
||||
"target_rollups": _target_rollups([]),
|
||||
"rollups": {
|
||||
"target_count": len(_TARGETS),
|
||||
"dispatch_ledger_row_count": 0,
|
||||
"consumer_apply_receipt_row_count": 0,
|
||||
"consumer_binding_count": 0,
|
||||
"ready_consumer_binding_count": 0,
|
||||
"ready_target_count": 0,
|
||||
"metadata_only_receipt_count": 0,
|
||||
"post_apply_verifier_ref_count": 0,
|
||||
"controlled_consumer_readback_ready": False,
|
||||
"km_consumer_binding_count": 0,
|
||||
"rag_consumer_binding_count": 0,
|
||||
"playbook_consumer_binding_count": 0,
|
||||
"mcp_consumer_binding_count": 0,
|
||||
"verifier_consumer_binding_count": 0,
|
||||
"ai_agent_consumer_binding_count": 0,
|
||||
"target_context_receipt_write_count": 0,
|
||||
"km_context_receipt_write_count": 0,
|
||||
"rag_context_receipt_write_count": 0,
|
||||
"playbook_context_receipt_write_count": 0,
|
||||
"mcp_context_receipt_write_count": 0,
|
||||
"verifier_context_receipt_write_count": 0,
|
||||
"ai_agent_context_receipt_write_count": 0,
|
||||
"runtime_target_write_performed": False,
|
||||
},
|
||||
"active_blockers": [
|
||||
"log_controlled_writeback_consumer_readback_db_unavailable"
|
||||
],
|
||||
"operation_boundaries": {
|
||||
"consumer_readback_only": True,
|
||||
"metadata_ledger_read_performed": False,
|
||||
"consumer_apply_receipt_read_performed": False,
|
||||
"runtime_target_write_performed": False,
|
||||
"km_write_performed": False,
|
||||
"rag_index_write_performed": False,
|
||||
"playbook_trust_write_performed": False,
|
||||
"mcp_tool_call_performed": False,
|
||||
"agent_runtime_action_performed": False,
|
||||
"telegram_send_performed": False,
|
||||
"workflow_trigger_performed": False,
|
||||
"raw_log_payload_persisted": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"github_api_used": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _consumer_binding(
|
||||
row: Mapping[str, Any],
|
||||
consumer_receipts: Mapping[str, Mapping[str, Any]],
|
||||
|
||||
@@ -28,6 +28,8 @@ class _FakeDb:
|
||||
self.params: list[dict] = []
|
||||
|
||||
async def execute(self, statement, params: dict):
|
||||
if not params:
|
||||
return _FakeResult(None)
|
||||
self.params.append(params)
|
||||
if "fallback_operation_type" in params:
|
||||
return _FakeResult("km_linked")
|
||||
@@ -50,6 +52,14 @@ class _FakeContext:
|
||||
return False
|
||||
|
||||
|
||||
class _FailingContext:
|
||||
async def __aenter__(self):
|
||||
raise TimeoutError("pool checkout unavailable")
|
||||
|
||||
async def __aexit__(self, _exc_type, _exc, _tb) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _consumer_readback() -> dict:
|
||||
targets = ("km", "rag", "playbook", "mcp", "verifier", "ai_agent")
|
||||
bindings = [
|
||||
@@ -185,6 +195,71 @@ async def test_log_controlled_writeback_consumer_apply_writes_idempotent_rows(mo
|
||||
assert '"km_write_performed": false' in params["output"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_log_controlled_writeback_consumer_apply_blocks_when_readback_degraded(
|
||||
monkeypatch,
|
||||
):
|
||||
degraded_readback = _consumer_readback()
|
||||
degraded_readback["status"] = (
|
||||
"blocked_waiting_controlled_writeback_consumer_db_readback"
|
||||
)
|
||||
degraded_readback["controlled_consume"]["controlled_consume_allowed"] = False
|
||||
degraded_readback["active_blockers"] = [
|
||||
"log_controlled_writeback_consumer_readback_db_unavailable"
|
||||
]
|
||||
|
||||
async def fake_readback(*, project_id: str):
|
||||
assert project_id == "awoooi"
|
||||
return degraded_readback
|
||||
|
||||
monkeypatch.setattr(
|
||||
apply_module,
|
||||
"load_latest_ai_agent_log_controlled_writeback_consumer_readback",
|
||||
fake_readback,
|
||||
)
|
||||
|
||||
payload = await consume_latest_ai_agent_log_controlled_writeback(project_id="awoooi")
|
||||
|
||||
assert payload["status"] == (
|
||||
"blocked_waiting_controlled_writeback_consumer_apply_inputs"
|
||||
)
|
||||
assert "controlled_writeback_consumer_readback_not_ready" in payload[
|
||||
"active_blockers"
|
||||
]
|
||||
assert payload["apply_result"]["db_write_status"] == "not_attempted"
|
||||
assert payload["operation_boundaries"]["runtime_target_write_performed"] is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_log_controlled_writeback_consumer_apply_degrades_on_db_write_pressure(
|
||||
monkeypatch,
|
||||
):
|
||||
monkeypatch.setattr(
|
||||
apply_module,
|
||||
"get_db_context",
|
||||
lambda project_id: _FailingContext(),
|
||||
)
|
||||
|
||||
async def fake_readback(*, project_id: str):
|
||||
assert project_id == "awoooi"
|
||||
return _consumer_readback()
|
||||
|
||||
monkeypatch.setattr(
|
||||
apply_module,
|
||||
"load_latest_ai_agent_log_controlled_writeback_consumer_readback",
|
||||
fake_readback,
|
||||
)
|
||||
|
||||
payload = await consume_latest_ai_agent_log_controlled_writeback(project_id="awoooi")
|
||||
|
||||
assert payload["status"] == "blocked_waiting_controlled_writeback_consumer_apply_db"
|
||||
assert "consumer_apply_db_write_unavailable" in payload["active_blockers"]
|
||||
assert payload["apply_result"]["db_write_status"] == "unavailable"
|
||||
assert payload["apply_result"]["error_type"] == "TimeoutError"
|
||||
assert payload["operation_boundaries"]["consumer_context_receipt_write_performed"] is False
|
||||
assert payload["operation_boundaries"]["runtime_target_write_performed"] is False
|
||||
|
||||
|
||||
def test_log_controlled_writeback_consumer_apply_endpoint_returns_receipt(monkeypatch):
|
||||
async def fake_apply():
|
||||
payload = build_ai_agent_log_controlled_writeback_consumer_apply_receipt(
|
||||
|
||||
@@ -32,6 +32,8 @@ class _FakeDb:
|
||||
self.params: list[dict] = []
|
||||
|
||||
async def execute(self, _statement, params: dict):
|
||||
if not params:
|
||||
return _FakeMappingResult([])
|
||||
self.params.append(params)
|
||||
if params.get("operation_type") == "log_controlled_writeback_consumed":
|
||||
return _FakeMappingResult(self.consumer_rows)
|
||||
@@ -49,6 +51,14 @@ class _FakeContext:
|
||||
return False
|
||||
|
||||
|
||||
class _FailingContext:
|
||||
async def __aenter__(self):
|
||||
raise TimeoutError("pool checkout unavailable")
|
||||
|
||||
async def __aexit__(self, _exc_type, _exc, _tb) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _ledger_rows() -> list[dict]:
|
||||
return [
|
||||
{
|
||||
@@ -147,6 +157,30 @@ async def test_log_controlled_writeback_consumer_loader_reads_apply_receipts(mon
|
||||
assert binding["target_write_receipt"]["runtime_target_write_performed"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_log_controlled_writeback_consumer_loader_degrades_on_db_pressure(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
consumer_module,
|
||||
"get_db_context",
|
||||
lambda project_id: _FailingContext(),
|
||||
)
|
||||
|
||||
payload = await load_latest_ai_agent_log_controlled_writeback_consumer_readback()
|
||||
|
||||
assert payload["status"] == (
|
||||
"blocked_waiting_controlled_writeback_consumer_db_readback"
|
||||
)
|
||||
assert payload["readback"]["db_read_status"] == "unavailable"
|
||||
assert payload["readback"]["error_type"] == "TimeoutError"
|
||||
assert payload["controlled_consume"]["controlled_consume_allowed"] is False
|
||||
assert payload["rollups"]["controlled_consumer_readback_ready"] is False
|
||||
assert payload["active_blockers"] == [
|
||||
"log_controlled_writeback_consumer_readback_db_unavailable"
|
||||
]
|
||||
assert payload["operation_boundaries"]["metadata_ledger_read_performed"] is False
|
||||
assert payload["operation_boundaries"]["runtime_target_write_performed"] is False
|
||||
|
||||
|
||||
def test_log_controlled_writeback_consumer_endpoint_returns_readback(monkeypatch):
|
||||
async def fake_loader():
|
||||
fake_db = _FakeDb(_ledger_rows())
|
||||
|
||||
Reference in New Issue
Block a user