diff --git a/apps/api/src/services/platform_operator_service.py b/apps/api/src/services/platform_operator_service.py index 3bb86a35..b4e75464 100644 --- a/apps/api/src/services/platform_operator_service.py +++ b/apps/api/src/services/platform_operator_service.py @@ -69,6 +69,7 @@ _MAX_PER_PAGE = 200 _MAX_EVENTS = 100 _MAX_TIMELINE_ITEMS = 100 _MAX_LIST_CONTEXT_ROWS = 500 +_RUN_CONTEXT_QUERY_CHUNK_SIZE = 500 _MAX_STEP_SUMMARY_CHARS = 128 _AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0 _AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5 @@ -2548,52 +2549,55 @@ async def _load_run_message_context( if not runs: return {}, {} - run_ids = [run.run_id for run in runs] - run_ids_set = set(run_ids) - trigger_refs = [str(run.trigger_ref) for run in runs if run.trigger_ref] - trigger_ref_to_run = { - str(run.trigger_ref): run.run_id - for run in runs - if run.trigger_ref - } - trigger_event_ids: list[UUID] = [] - for trigger_ref in trigger_refs: - try: - trigger_event_ids.append(uuid.UUID(trigger_ref)) - except ValueError: - continue - - inbound_filters = [AwoooPConversationEvent.run_id.in_(run_ids)] - if trigger_refs: - inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(trigger_refs)) - if trigger_event_ids: - inbound_filters.append(AwoooPConversationEvent.event_id.in_(trigger_event_ids)) - - inbound_result = await db.execute( - select(AwoooPConversationEvent) - .where(sa_or(*inbound_filters)) - .order_by(AwoooPConversationEvent.received_at.desc()) - .limit(limit) - ) inbound_by_run: dict[UUID, list[AwoooPConversationEvent]] = defaultdict(list) - for event in inbound_result.scalars().all(): - target_run_id = event.run_id if event.run_id in run_ids_set else None - if target_run_id is None: - target_run_id = trigger_ref_to_run.get(str(event.provider_event_id)) - if target_run_id is None: - target_run_id = trigger_ref_to_run.get(str(event.event_id)) - if target_run_id is not None: - inbound_by_run[target_run_id].append(event) - - outbound_result = await 
db.execute( - select(AwoooPOutboundMessage) - .where(AwoooPOutboundMessage.run_id.in_(run_ids)) - .order_by(AwoooPOutboundMessage.queued_at.desc()) - .limit(limit) - ) outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]] = defaultdict(list) - for message in outbound_result.scalars().all(): - outbound_by_run[message.run_id].append(message) + remaining_inbound = max(int(limit), 0) + remaining_outbound = max(int(limit), 0) + + for batch in _iter_run_context_batches(runs): + if remaining_inbound > 0: + inbound_filters = [AwoooPConversationEvent.run_id.in_(batch["run_ids"])] + if batch["trigger_refs"]: + inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_( + batch["trigger_refs"] + )) + if batch["trigger_event_ids"]: + inbound_filters.append(AwoooPConversationEvent.event_id.in_( + batch["trigger_event_ids"] + )) + + inbound_result = await db.execute( + select(AwoooPConversationEvent) + .where(sa_or(*inbound_filters)) + .order_by(AwoooPConversationEvent.received_at.desc()) + .limit(remaining_inbound) + ) + inbound_events = list(inbound_result.scalars().all()) + remaining_inbound = max(remaining_inbound - len(inbound_events), 0) + for event in inbound_events: + target_run_id = ( + event.run_id if event.run_id in batch["run_ids_set"] else None + ) + if target_run_id is None: + target_run_id = batch["trigger_ref_to_run"].get( + str(event.provider_event_id) + ) + if target_run_id is None: + target_run_id = batch["trigger_ref_to_run"].get(str(event.event_id)) + if target_run_id is not None: + inbound_by_run[target_run_id].append(event) + + if remaining_outbound > 0: + outbound_result = await db.execute( + select(AwoooPOutboundMessage) + .where(AwoooPOutboundMessage.run_id.in_(batch["run_ids"])) + .order_by(AwoooPOutboundMessage.queued_at.desc()) + .limit(remaining_outbound) + ) + outbound_messages = list(outbound_result.scalars().all()) + remaining_outbound = max(remaining_outbound - len(outbound_messages), 0) + for message in outbound_messages: + 
def _iter_run_context_batches(
    runs: list[AwoooPRunState],
    *,
    chunk_size: int | None = None,
) -> list[dict[str, Any]]:
    """Split run context lookups below asyncpg's bind-parameter ceiling.

    Args:
        runs: Run rows to partition; each must expose ``run_id`` and
            ``trigger_ref`` attributes.
        chunk_size: Maximum number of runs per batch. ``None`` (the default)
            uses ``_RUN_CONTEXT_QUERY_CHUNK_SIZE``.

    Returns:
        One dict per batch, in input order, with these keys:

        * ``run_ids``: run ids of the batch, in input order.
        * ``run_ids_set``: the same ids as a set, for membership tests.
        * ``trigger_refs``: distinct truthy trigger refs as strings, in
          first-seen order.
        * ``trigger_ref_to_run``: trigger-ref string -> run id; when several
          runs share a ref, the last run in the batch wins.
        * ``trigger_event_ids``: the trigger refs that parse as UUIDs.

    Raises:
        ValueError: If the effective chunk size is not a positive integer.
    """
    size = _RUN_CONTEXT_QUERY_CHUNK_SIZE if chunk_size is None else chunk_size
    if size < 1:
        # A zero step makes range() fail with an unrelated message and a
        # negative step would silently yield no batches at all.
        raise ValueError("chunk_size must be a positive integer")

    batches: list[dict[str, Any]] = []
    for start in range(0, len(runs), size):
        batch_runs = runs[start : start + size]
        run_ids = [run.run_id for run in batch_runs]
        trigger_ref_to_run = {
            str(run.trigger_ref): run.run_id
            for run in batch_runs
            if run.trigger_ref
        }
        # Dict keys are already unique and keep first-seen order, so reusing
        # them avoids spending bind parameters on repeated trigger refs.
        trigger_refs = list(trigger_ref_to_run)
        trigger_event_ids: list[UUID] = []
        for trigger_ref in trigger_refs:
            try:
                trigger_event_ids.append(uuid.UUID(trigger_ref))
            except ValueError:
                # Not every trigger ref is a UUID; those are matched by
                # their string form only.
                continue
        batches.append({
            "run_ids": run_ids,
            "run_ids_set": set(run_ids),
            "trigger_refs": trigger_refs,
            "trigger_ref_to_run": trigger_ref_to_run,
            "trigger_event_ids": trigger_event_ids,
        })
    return batches
def test_run_context_batches_stay_under_asyncpg_parameter_ceiling() -> None:
    """Two full chunks plus seven runs split into three bounded batches.

    Each batch's combined id/ref/event-id count is checked against three
    times the chunk size and against the 32_767 parameter ceiling.
    """
    chunk = _RUN_CONTEXT_QUERY_CHUNK_SIZE
    total_runs = (chunk * 2) + 7
    runs = []
    for offset in range(total_runs):
        runs.append(
            SimpleNamespace(
                run_id=UUID(int=offset + 1),
                trigger_ref=str(UUID(int=offset + 10_000)),
            )
        )

    batches = _iter_run_context_batches(runs)

    batch_sizes = [len(batch["run_ids"]) for batch in batches]
    assert batch_sizes == [chunk, chunk, 7]

    # The first run's trigger ref must map back to the first run id.
    first_trigger_ref = str(UUID(int=10_000))
    assert batches[0]["trigger_ref_to_run"][first_trigger_ref] == UUID(int=1)

    for batch in batches:
        run_count = len(batch["run_ids"])
        # Worst case: every run id, trigger ref and parsed event id becomes
        # its own bind parameter in the inbound query.
        bind_params = (
            run_count
            + len(batch["trigger_refs"])
            + len(batch["trigger_event_ids"])
        )
        assert run_count <= chunk
        assert bind_params <= chunk * 3
        assert bind_params < 32_767