diff --git a/apps/api/src/services/channel_hub.py b/apps/api/src/services/channel_hub.py index 93ca04db..e1048233 100644 --- a/apps/api/src/services/channel_hub.py +++ b/apps/api/src/services/channel_hub.py @@ -30,9 +30,9 @@ import asyncio import hashlib import html import json -from datetime import datetime, timezone +from datetime import UTC, datetime from typing import Any -from uuid import UUID +from uuid import NAMESPACE_URL, UUID, uuid5 import structlog from sqlalchemy import select, text @@ -48,6 +48,75 @@ logger = structlog.get_logger(__name__) _INTERIM_WAIT_SECONDS = 30 +def _input_sha256(input_payload: dict[str, Any] | None) -> str | None: + """計算 Run input 的穩定 hash,讓 mirror run 也能保留最小完整性證據。""" + if not input_payload: + return None + canonical = json.dumps( + input_payload, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(canonical.encode()).hexdigest() + + +async def ensure_completed_shadow_run( + db: AsyncSession, + *, + project_id: str, + run_id: UUID, + agent_id: str, + trigger_type: str, + trigger_ref: str | None, + input_payload: dict[str, Any] | None = None, +) -> bool: + """為 legacy mirror 資料補一筆 completed shadow run。 + + AwoooP 在 strangler 階段會先 mirror legacy Telegram / alert-grouping + 資料。這些事件不應重新觸發 runtime,但需要 run_state 當 Console 的 + 聚合錨點;因此這裡建立的是已完成的 shadow run,不會被 worker pick up。 + """ + result = await db.execute( + text(""" + INSERT INTO awooop_run_state ( + run_id, project_id, agent_id, state, + trigger_type, trigger_ref, is_shadow, + input_sha256, created_at, completed_at, timeout_at + ) VALUES ( + :run_id, :project_id, :agent_id, 'completed', + :trigger_type, :trigger_ref, TRUE, + :input_sha256, NOW(), NOW(), NOW() + ) + ON CONFLICT (run_id) DO NOTHING + RETURNING run_id + """), + { + "run_id": run_id, + "project_id": project_id, + "agent_id": agent_id, + "trigger_type": trigger_type, + "trigger_ref": trigger_ref, + "input_sha256": _input_sha256(input_payload), + }, + ) + inserted = result.fetchone() is not None + if inserted: + logger.info( + "completed_shadow_run_created", + project_id=project_id, + run_id=str(run_id), + agent_id=agent_id, + trigger_type=trigger_type, + ) + return inserted + + +def build_grouped_alert_run_id(project_id: str, provider_event_id: str) -> UUID: + """為 grouped child alert 建立穩定 run_id,讓 Run Monitor 可回查。""" + return uuid5(NAMESPACE_URL, f"awooop:grouped-alert:{project_id}:{provider_event_id}") + + # ───────────────────────────────────────────────────────────────────────────── # 入站事件記錄 # ───────────────────────────────────────────────────────────────────────────── @@ -311,6 +380,22 @@ async def record_grouped_alert_event( ) async with get_db_context(project_id) as db: + run_id = build_grouped_alert_run_id(project_id, provider_event_id) + await ensure_completed_shadow_run( + db, + project_id=project_id, + run_id=run_id, + agent_id="legacy-alert-grouping", + trigger_type="grouped_alert_event", + trigger_ref=provider_event_id, + input_payload={ + "alert_id": alert_id, + "alertname": alertname, + "severity": severity, + "group_key": group_key, + "fingerprint": fingerprint, + }, + ) event_id = await mirror_inbound_event( db, project_id=project_id, @@ -321,7 +406,8 @@ async def record_grouped_alert_event( channel_chat_id=f"alert-group:{group_key}", content_type="text", raw_content=content, - provider_ts=datetime.now(timezone.utc), + provider_ts=datetime.now(UTC), + run_id=run_id, ) logger.info( @@ -388,6 +474,22 @@ async def record_outbound_message( actual_status = "shadow" if is_shadow else send_status + await ensure_completed_shadow_run( + db, + project_id=project_id, + run_id=run_id, + agent_id="legacy-telegram-gateway", + trigger_type="legacy_outbound", + trigger_ref=provider_message_id, + input_payload={ + "channel_type": channel_type, + "channel_chat_id": channel_chat_id, + "message_type": message_type, + "send_status": actual_status, + "triggered_by_state": triggered_by_state, + }, + ) + result = await db.execute( text(""" INSERT INTO awooop_outbound_message ( @@ -504,7 +606,7 @@ async def _interim_feedback_task( # run 已推進(complete/failed 等),不需要 interim return - waiting_since = datetime.now(timezone.utc) + waiting_since = datetime.now(UTC) interim_content = "AI 正在分析中,請稍候... ⏳" await record_outbound_message( diff --git a/apps/api/tests/test_channel_hub_grouped_alert_events.py b/apps/api/tests/test_channel_hub_grouped_alert_events.py index 1cae72ea..685223de 100644 --- a/apps/api/tests/test_channel_hub_grouped_alert_events.py +++ b/apps/api/tests/test_channel_hub_grouped_alert_events.py @@ -2,6 +2,7 @@ from __future__ import annotations from src.services.channel_hub import ( build_grouped_alert_provider_event_id, + build_grouped_alert_run_id, format_grouped_alert_digest_text, format_grouped_alert_event_content, ) @@ -17,6 +18,20 @@ def test_build_grouped_alert_provider_event_id_is_deterministic() -> None: assert len(event_id) < 256 +def test_build_grouped_alert_run_id_is_stable() -> None: + provider_event_id = build_grouped_alert_provider_event_id( + "INC-20260507-ABCD12", + "1234567890abcdef" * 4, + ) + + first = build_grouped_alert_run_id("awoooi", provider_event_id) + second = build_grouped_alert_run_id("awoooi", provider_event_id) + other_project = build_grouped_alert_run_id("ewoooc", provider_event_id) + + assert first == second + assert first != other_project + + def test_format_grouped_alert_event_content_keeps_operator_context() -> None: content = format_grouped_alert_event_content( alert_id="INC-20260507-ABCD12",