fix(awooop): chunk run context lookups
This commit is contained in:
@@ -69,6 +69,7 @@ _MAX_PER_PAGE = 200
|
||||
_MAX_EVENTS = 100
|
||||
_MAX_TIMELINE_ITEMS = 100
|
||||
_MAX_LIST_CONTEXT_ROWS = 500
|
||||
_RUN_CONTEXT_QUERY_CHUNK_SIZE = 500
|
||||
_MAX_STEP_SUMMARY_CHARS = 128
|
||||
_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0
|
||||
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5
|
||||
@@ -2548,52 +2549,55 @@ async def _load_run_message_context(
|
||||
if not runs:
|
||||
return {}, {}
|
||||
|
||||
run_ids = [run.run_id for run in runs]
|
||||
run_ids_set = set(run_ids)
|
||||
trigger_refs = [str(run.trigger_ref) for run in runs if run.trigger_ref]
|
||||
trigger_ref_to_run = {
|
||||
str(run.trigger_ref): run.run_id
|
||||
for run in runs
|
||||
if run.trigger_ref
|
||||
}
|
||||
trigger_event_ids: list[UUID] = []
|
||||
for trigger_ref in trigger_refs:
|
||||
try:
|
||||
trigger_event_ids.append(uuid.UUID(trigger_ref))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
inbound_filters = [AwoooPConversationEvent.run_id.in_(run_ids)]
|
||||
if trigger_refs:
|
||||
inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(trigger_refs))
|
||||
if trigger_event_ids:
|
||||
inbound_filters.append(AwoooPConversationEvent.event_id.in_(trigger_event_ids))
|
||||
|
||||
inbound_result = await db.execute(
|
||||
select(AwoooPConversationEvent)
|
||||
.where(sa_or(*inbound_filters))
|
||||
.order_by(AwoooPConversationEvent.received_at.desc())
|
||||
.limit(limit)
|
||||
)
|
||||
inbound_by_run: dict[UUID, list[AwoooPConversationEvent]] = defaultdict(list)
|
||||
for event in inbound_result.scalars().all():
|
||||
target_run_id = event.run_id if event.run_id in run_ids_set else None
|
||||
if target_run_id is None:
|
||||
target_run_id = trigger_ref_to_run.get(str(event.provider_event_id))
|
||||
if target_run_id is None:
|
||||
target_run_id = trigger_ref_to_run.get(str(event.event_id))
|
||||
if target_run_id is not None:
|
||||
inbound_by_run[target_run_id].append(event)
|
||||
|
||||
outbound_result = await db.execute(
|
||||
select(AwoooPOutboundMessage)
|
||||
.where(AwoooPOutboundMessage.run_id.in_(run_ids))
|
||||
.order_by(AwoooPOutboundMessage.queued_at.desc())
|
||||
.limit(limit)
|
||||
)
|
||||
outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]] = defaultdict(list)
|
||||
for message in outbound_result.scalars().all():
|
||||
outbound_by_run[message.run_id].append(message)
|
||||
remaining_inbound = max(int(limit), 0)
|
||||
remaining_outbound = max(int(limit), 0)
|
||||
|
||||
for batch in _iter_run_context_batches(runs):
|
||||
if remaining_inbound > 0:
|
||||
inbound_filters = [AwoooPConversationEvent.run_id.in_(batch["run_ids"])]
|
||||
if batch["trigger_refs"]:
|
||||
inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(
|
||||
batch["trigger_refs"]
|
||||
))
|
||||
if batch["trigger_event_ids"]:
|
||||
inbound_filters.append(AwoooPConversationEvent.event_id.in_(
|
||||
batch["trigger_event_ids"]
|
||||
))
|
||||
|
||||
inbound_result = await db.execute(
|
||||
select(AwoooPConversationEvent)
|
||||
.where(sa_or(*inbound_filters))
|
||||
.order_by(AwoooPConversationEvent.received_at.desc())
|
||||
.limit(remaining_inbound)
|
||||
)
|
||||
inbound_events = list(inbound_result.scalars().all())
|
||||
remaining_inbound = max(remaining_inbound - len(inbound_events), 0)
|
||||
for event in inbound_events:
|
||||
target_run_id = (
|
||||
event.run_id if event.run_id in batch["run_ids_set"] else None
|
||||
)
|
||||
if target_run_id is None:
|
||||
target_run_id = batch["trigger_ref_to_run"].get(
|
||||
str(event.provider_event_id)
|
||||
)
|
||||
if target_run_id is None:
|
||||
target_run_id = batch["trigger_ref_to_run"].get(str(event.event_id))
|
||||
if target_run_id is not None:
|
||||
inbound_by_run[target_run_id].append(event)
|
||||
|
||||
if remaining_outbound > 0:
|
||||
outbound_result = await db.execute(
|
||||
select(AwoooPOutboundMessage)
|
||||
.where(AwoooPOutboundMessage.run_id.in_(batch["run_ids"]))
|
||||
.order_by(AwoooPOutboundMessage.queued_at.desc())
|
||||
.limit(remaining_outbound)
|
||||
)
|
||||
outbound_messages = list(outbound_result.scalars().all())
|
||||
remaining_outbound = max(remaining_outbound - len(outbound_messages), 0)
|
||||
for message in outbound_messages:
|
||||
outbound_by_run[message.run_id].append(message)
|
||||
|
||||
return dict(inbound_by_run), dict(outbound_by_run)
|
||||
|
||||
@@ -2602,6 +2606,36 @@ def _list_filter_context_limit(candidate_count: int) -> int:
|
||||
return min(max(candidate_count * 4, _MAX_LIST_CONTEXT_ROWS), 20_000)
|
||||
|
||||
|
||||
def _iter_run_context_batches(
|
||||
runs: list[AwoooPRunState],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Split run context lookups below asyncpg's bind-parameter ceiling."""
|
||||
batches: list[dict[str, Any]] = []
|
||||
for start in range(0, len(runs), _RUN_CONTEXT_QUERY_CHUNK_SIZE):
|
||||
batch_runs = runs[start : start + _RUN_CONTEXT_QUERY_CHUNK_SIZE]
|
||||
run_ids = [run.run_id for run in batch_runs]
|
||||
trigger_refs = [str(run.trigger_ref) for run in batch_runs if run.trigger_ref]
|
||||
trigger_ref_to_run = {
|
||||
str(run.trigger_ref): run.run_id
|
||||
for run in batch_runs
|
||||
if run.trigger_ref
|
||||
}
|
||||
trigger_event_ids: list[UUID] = []
|
||||
for trigger_ref in trigger_refs:
|
||||
try:
|
||||
trigger_event_ids.append(uuid.UUID(trigger_ref))
|
||||
except ValueError:
|
||||
continue
|
||||
batches.append({
|
||||
"run_ids": run_ids,
|
||||
"run_ids_set": set(run_ids),
|
||||
"trigger_refs": trigger_refs,
|
||||
"trigger_ref_to_run": trigger_ref_to_run,
|
||||
"trigger_event_ids": trigger_event_ids,
|
||||
})
|
||||
return batches
|
||||
|
||||
|
||||
def _route_label_from_remediation(item: dict[str, Any]) -> str:
|
||||
"""Render remediation MCP route consistently with Telegram / Work Items."""
|
||||
return "/".join(
|
||||
|
||||
@@ -19,6 +19,7 @@ from src.api.v1.platform.operator_runs import (
|
||||
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
|
||||
from src.services.ollama_health_monitor import HealthReport, HealthStatus
|
||||
from src.services.platform_operator_service import (
|
||||
_RUN_CONTEXT_QUERY_CHUNK_SIZE,
|
||||
_ai_route_health_map,
|
||||
_ai_route_lane_state,
|
||||
_ai_route_policy_order,
|
||||
@@ -31,6 +32,7 @@ from src.services.platform_operator_service import (
|
||||
_cicd_event_item_from_row,
|
||||
_collect_run_incident_ids,
|
||||
_is_source_correlation_applied_link,
|
||||
_iter_run_context_batches,
|
||||
_legacy_mcp_timeline_status,
|
||||
_legacy_mcp_timeline_summary,
|
||||
_list_filter_context_limit,
|
||||
@@ -1973,6 +1975,34 @@ def test_list_filter_context_limit_scales_with_candidate_rows() -> None:
|
||||
assert _list_filter_context_limit(10000) == 20000
|
||||
|
||||
|
||||
def test_run_context_batches_stay_under_asyncpg_parameter_ceiling() -> None:
|
||||
runs = [
|
||||
SimpleNamespace(
|
||||
run_id=UUID(int=index + 1),
|
||||
trigger_ref=str(UUID(int=index + 10_000)),
|
||||
)
|
||||
for index in range((_RUN_CONTEXT_QUERY_CHUNK_SIZE * 2) + 7)
|
||||
]
|
||||
|
||||
batches = _iter_run_context_batches(runs)
|
||||
|
||||
assert [len(batch["run_ids"]) for batch in batches] == [
|
||||
_RUN_CONTEXT_QUERY_CHUNK_SIZE,
|
||||
_RUN_CONTEXT_QUERY_CHUNK_SIZE,
|
||||
7,
|
||||
]
|
||||
assert batches[0]["trigger_ref_to_run"][str(UUID(int=10_000))] == UUID(int=1)
|
||||
for batch in batches:
|
||||
worst_case_inbound_params = (
|
||||
len(batch["run_ids"])
|
||||
+ len(batch["trigger_refs"])
|
||||
+ len(batch["trigger_event_ids"])
|
||||
)
|
||||
assert len(batch["run_ids"]) <= _RUN_CONTEXT_QUERY_CHUNK_SIZE
|
||||
assert worst_case_inbound_params <= _RUN_CONTEXT_QUERY_CHUNK_SIZE * 3
|
||||
assert worst_case_inbound_params < 32_767
|
||||
|
||||
|
||||
def test_timeline_sort_key_normalizes_datetime_and_iso_string() -> None:
|
||||
fallback = datetime(2026, 5, 14, 10, 0, 0)
|
||||
keys = [
|
||||
|
||||
Reference in New Issue
Block a user