fix(awooop): chunk run context lookups
All checks were successful
CD Pipeline / tests (push) Successful in 1m20s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Successful in 3m55s
CD Pipeline / post-deploy-checks (push) Successful in 1m47s

This commit is contained in:
Your Name
2026-05-31 16:47:01 +08:00
parent b92025a829
commit aee92bc7a3
2 changed files with 108 additions and 44 deletions

View File

@@ -69,6 +69,7 @@ _MAX_PER_PAGE = 200
_MAX_EVENTS = 100
_MAX_TIMELINE_ITEMS = 100
# Lower bound on the row budget computed by _list_filter_context_limit().
_MAX_LIST_CONTEXT_ROWS = 500
# Maximum number of runs grouped into one batch by _iter_run_context_batches().
_RUN_CONTEXT_QUERY_CHUNK_SIZE = 500
_MAX_STEP_SUMMARY_CHARS = 128
_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5
@@ -2548,52 +2549,55 @@ async def _load_run_message_context(
if not runs:
return {}, {}
run_ids = [run.run_id for run in runs]
run_ids_set = set(run_ids)
trigger_refs = [str(run.trigger_ref) for run in runs if run.trigger_ref]
trigger_ref_to_run = {
str(run.trigger_ref): run.run_id
for run in runs
if run.trigger_ref
}
trigger_event_ids: list[UUID] = []
for trigger_ref in trigger_refs:
try:
trigger_event_ids.append(uuid.UUID(trigger_ref))
except ValueError:
continue
inbound_filters = [AwoooPConversationEvent.run_id.in_(run_ids)]
if trigger_refs:
inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(trigger_refs))
if trigger_event_ids:
inbound_filters.append(AwoooPConversationEvent.event_id.in_(trigger_event_ids))
inbound_result = await db.execute(
select(AwoooPConversationEvent)
.where(sa_or(*inbound_filters))
.order_by(AwoooPConversationEvent.received_at.desc())
.limit(limit)
)
inbound_by_run: dict[UUID, list[AwoooPConversationEvent]] = defaultdict(list)
for event in inbound_result.scalars().all():
target_run_id = event.run_id if event.run_id in run_ids_set else None
if target_run_id is None:
target_run_id = trigger_ref_to_run.get(str(event.provider_event_id))
if target_run_id is None:
target_run_id = trigger_ref_to_run.get(str(event.event_id))
if target_run_id is not None:
inbound_by_run[target_run_id].append(event)
outbound_result = await db.execute(
select(AwoooPOutboundMessage)
.where(AwoooPOutboundMessage.run_id.in_(run_ids))
.order_by(AwoooPOutboundMessage.queued_at.desc())
.limit(limit)
)
outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]] = defaultdict(list)
for message in outbound_result.scalars().all():
outbound_by_run[message.run_id].append(message)
remaining_inbound = max(int(limit), 0)
remaining_outbound = max(int(limit), 0)
for batch in _iter_run_context_batches(runs):
if remaining_inbound > 0:
inbound_filters = [AwoooPConversationEvent.run_id.in_(batch["run_ids"])]
if batch["trigger_refs"]:
inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(
batch["trigger_refs"]
))
if batch["trigger_event_ids"]:
inbound_filters.append(AwoooPConversationEvent.event_id.in_(
batch["trigger_event_ids"]
))
inbound_result = await db.execute(
select(AwoooPConversationEvent)
.where(sa_or(*inbound_filters))
.order_by(AwoooPConversationEvent.received_at.desc())
.limit(remaining_inbound)
)
inbound_events = list(inbound_result.scalars().all())
remaining_inbound = max(remaining_inbound - len(inbound_events), 0)
for event in inbound_events:
target_run_id = (
event.run_id if event.run_id in batch["run_ids_set"] else None
)
if target_run_id is None:
target_run_id = batch["trigger_ref_to_run"].get(
str(event.provider_event_id)
)
if target_run_id is None:
target_run_id = batch["trigger_ref_to_run"].get(str(event.event_id))
if target_run_id is not None:
inbound_by_run[target_run_id].append(event)
if remaining_outbound > 0:
outbound_result = await db.execute(
select(AwoooPOutboundMessage)
.where(AwoooPOutboundMessage.run_id.in_(batch["run_ids"]))
.order_by(AwoooPOutboundMessage.queued_at.desc())
.limit(remaining_outbound)
)
outbound_messages = list(outbound_result.scalars().all())
remaining_outbound = max(remaining_outbound - len(outbound_messages), 0)
for message in outbound_messages:
outbound_by_run[message.run_id].append(message)
return dict(inbound_by_run), dict(outbound_by_run)
@@ -2602,6 +2606,36 @@ def _list_filter_context_limit(candidate_count: int) -> int:
return min(max(candidate_count * 4, _MAX_LIST_CONTEXT_ROWS), 20_000)
def _iter_run_context_batches(
    runs: list[AwoooPRunState],
) -> list[dict[str, Any]]:
    """Group runs into bounded batches of lookup keys.

    The runs are sliced, in order, into windows of at most
    ``_RUN_CONTEXT_QUERY_CHUNK_SIZE`` entries so that the ``IN (...)`` lists
    built from a single batch stay below asyncpg's bind-parameter ceiling.

    Args:
        runs: Run rows exposing ``run_id`` and ``trigger_ref`` attributes.

    Returns:
        One dict per window, in input order, with these keys:

        * ``run_ids`` - every ``run_id`` in the window.
        * ``run_ids_set`` - the same ids as a set, for membership tests.
        * ``trigger_refs`` - ``str(trigger_ref)`` for each run with a truthy
          ``trigger_ref``.
        * ``trigger_ref_to_run`` - maps each of those strings to its
          ``run_id``; when a string repeats, the later run wins.
        * ``trigger_event_ids`` - the trigger refs that parse as UUIDs.

        An empty ``runs`` list yields an empty list.
    """
    window_size = _RUN_CONTEXT_QUERY_CHUNK_SIZE
    grouped: list[dict[str, Any]] = []
    for offset in range(0, len(runs), window_size):
        ids: list[Any] = []
        refs: list[str] = []
        run_by_ref: dict[str, Any] = {}
        parsed_refs: list[UUID] = []
        for run in runs[offset : offset + window_size]:
            ids.append(run.run_id)
            if not run.trigger_ref:
                continue
            ref = str(run.trigger_ref)
            refs.append(ref)
            run_by_ref[ref] = run.run_id
            try:
                parsed_refs.append(uuid.UUID(ref))
            except ValueError:
                # Not every trigger ref is a UUID; those are matched by
                # their string form only.
                pass
        grouped.append(
            {
                "run_ids": ids,
                "run_ids_set": set(ids),
                "trigger_refs": refs,
                "trigger_ref_to_run": run_by_ref,
                "trigger_event_ids": parsed_refs,
            }
        )
    return grouped
def _route_label_from_remediation(item: dict[str, Any]) -> str:
"""Render remediation MCP route consistently with Telegram / Work Items."""
return "/".join(

View File

@@ -19,6 +19,7 @@ from src.api.v1.platform.operator_runs import (
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.platform_operator_service import (
_RUN_CONTEXT_QUERY_CHUNK_SIZE,
_ai_route_health_map,
_ai_route_lane_state,
_ai_route_policy_order,
@@ -31,6 +32,7 @@ from src.services.platform_operator_service import (
_cicd_event_item_from_row,
_collect_run_incident_ids,
_is_source_correlation_applied_link,
_iter_run_context_batches,
_legacy_mcp_timeline_status,
_legacy_mcp_timeline_summary,
_list_filter_context_limit,
@@ -1973,6 +1975,34 @@ def test_list_filter_context_limit_scales_with_candidate_rows() -> None:
assert _list_filter_context_limit(10000) == 20000
def test_run_context_batches_stay_under_asyncpg_parameter_ceiling() -> None:
    """Batches are capped at the chunk size and keep bind parameters bounded."""
    chunk = _RUN_CONTEXT_QUERY_CHUNK_SIZE
    # Two full chunks plus a short remainder exercises the tail slice.
    total_runs = (chunk * 2) + 7
    runs = []
    for index in range(total_runs):
        runs.append(
            SimpleNamespace(
                run_id=UUID(int=index + 1),
                trigger_ref=str(UUID(int=index + 10_000)),
            )
        )

    batches = _iter_run_context_batches(runs)

    batch_sizes = [len(batch["run_ids"]) for batch in batches]
    assert batch_sizes == [chunk, chunk, 7]

    first_trigger_ref = str(UUID(int=10_000))
    assert batches[0]["trigger_ref_to_run"][first_trigger_ref] == UUID(int=1)

    for batch in batches:
        run_id_count = len(batch["run_ids"])
        # The inbound query binds run ids, trigger refs and trigger event ids
        # together, so their sum is the parameter count to bound.
        worst_case_inbound_params = (
            run_id_count
            + len(batch["trigger_refs"])
            + len(batch["trigger_event_ids"])
        )
        assert run_id_count <= chunk
        assert worst_case_inbound_params <= chunk * 3
        assert worst_case_inbound_params < 32_767
def test_timeline_sort_key_normalizes_datetime_and_iso_string() -> None:
fallback = datetime(2026, 5, 14, 10, 0, 0)
keys = [