fix(awooop): anchor legacy channel events to shadow runs
This commit is contained in:
@@ -30,9 +30,9 @@ import asyncio
|
||||
import hashlib
|
||||
import html
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
from uuid import NAMESPACE_URL, UUID, uuid5
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import select, text
|
||||
@@ -48,6 +48,75 @@ logger = structlog.get_logger(__name__)
|
||||
_INTERIM_WAIT_SECONDS = 30
|
||||
|
||||
|
||||
def _input_sha256(input_payload: dict[str, Any] | None) -> str | None:
|
||||
"""計算 Run input 的穩定 hash,讓 mirror run 也能保留最小完整性證據。"""
|
||||
if not input_payload:
|
||||
return None
|
||||
canonical = json.dumps(
|
||||
input_payload,
|
||||
sort_keys=True,
|
||||
separators=(",", ":"),
|
||||
ensure_ascii=False,
|
||||
)
|
||||
return hashlib.sha256(canonical.encode()).hexdigest()
|
||||
|
||||
|
||||
async def ensure_completed_shadow_run(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
project_id: str,
|
||||
run_id: UUID,
|
||||
agent_id: str,
|
||||
trigger_type: str,
|
||||
trigger_ref: str | None,
|
||||
input_payload: dict[str, Any] | None = None,
|
||||
) -> bool:
|
||||
"""為 legacy mirror 資料補一筆 completed shadow run。
|
||||
|
||||
AwoooP 在 strangler 階段會先 mirror legacy Telegram / alert-grouping
|
||||
資料。這些事件不應重新觸發 runtime,但需要 run_state 當 Console 的
|
||||
聚合錨點;因此這裡建立的是已完成的 shadow run,不會被 worker pick up。
|
||||
"""
|
||||
result = await db.execute(
|
||||
text("""
|
||||
INSERT INTO awooop_run_state (
|
||||
run_id, project_id, agent_id, state,
|
||||
trigger_type, trigger_ref, is_shadow,
|
||||
input_sha256, created_at, completed_at, timeout_at
|
||||
) VALUES (
|
||||
:run_id, :project_id, :agent_id, 'completed',
|
||||
:trigger_type, :trigger_ref, TRUE,
|
||||
:input_sha256, NOW(), NOW(), NOW()
|
||||
)
|
||||
ON CONFLICT (run_id) DO NOTHING
|
||||
RETURNING run_id
|
||||
"""),
|
||||
{
|
||||
"run_id": run_id,
|
||||
"project_id": project_id,
|
||||
"agent_id": agent_id,
|
||||
"trigger_type": trigger_type,
|
||||
"trigger_ref": trigger_ref,
|
||||
"input_sha256": _input_sha256(input_payload),
|
||||
},
|
||||
)
|
||||
inserted = result.fetchone() is not None
|
||||
if inserted:
|
||||
logger.info(
|
||||
"completed_shadow_run_created",
|
||||
project_id=project_id,
|
||||
run_id=str(run_id),
|
||||
agent_id=agent_id,
|
||||
trigger_type=trigger_type,
|
||||
)
|
||||
return inserted
|
||||
|
||||
|
||||
def build_grouped_alert_run_id(project_id: str, provider_event_id: str) -> UUID:
|
||||
"""為 grouped child alert 建立穩定 run_id,讓 Run Monitor 可回查。"""
|
||||
return uuid5(NAMESPACE_URL, f"awooop:grouped-alert:{project_id}:{provider_event_id}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# 入站事件記錄
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -311,6 +380,22 @@ async def record_grouped_alert_event(
|
||||
)
|
||||
|
||||
async with get_db_context(project_id) as db:
|
||||
run_id = build_grouped_alert_run_id(project_id, provider_event_id)
|
||||
await ensure_completed_shadow_run(
|
||||
db,
|
||||
project_id=project_id,
|
||||
run_id=run_id,
|
||||
agent_id="legacy-alert-grouping",
|
||||
trigger_type="grouped_alert_event",
|
||||
trigger_ref=provider_event_id,
|
||||
input_payload={
|
||||
"alert_id": alert_id,
|
||||
"alertname": alertname,
|
||||
"severity": severity,
|
||||
"group_key": group_key,
|
||||
"fingerprint": fingerprint,
|
||||
},
|
||||
)
|
||||
event_id = await mirror_inbound_event(
|
||||
db,
|
||||
project_id=project_id,
|
||||
@@ -321,7 +406,8 @@ async def record_grouped_alert_event(
|
||||
channel_chat_id=f"alert-group:{group_key}",
|
||||
content_type="text",
|
||||
raw_content=content,
|
||||
provider_ts=datetime.now(timezone.utc),
|
||||
provider_ts=datetime.now(UTC),
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -388,6 +474,22 @@ async def record_outbound_message(
|
||||
|
||||
actual_status = "shadow" if is_shadow else send_status
|
||||
|
||||
await ensure_completed_shadow_run(
|
||||
db,
|
||||
project_id=project_id,
|
||||
run_id=run_id,
|
||||
agent_id="legacy-telegram-gateway",
|
||||
trigger_type="legacy_outbound",
|
||||
trigger_ref=provider_message_id,
|
||||
input_payload={
|
||||
"channel_type": channel_type,
|
||||
"channel_chat_id": channel_chat_id,
|
||||
"message_type": message_type,
|
||||
"send_status": actual_status,
|
||||
"triggered_by_state": triggered_by_state,
|
||||
},
|
||||
)
|
||||
|
||||
result = await db.execute(
|
||||
text("""
|
||||
INSERT INTO awooop_outbound_message (
|
||||
@@ -504,7 +606,7 @@ async def _interim_feedback_task(
|
||||
# run 已推進(complete/failed 等),不需要 interim
|
||||
return
|
||||
|
||||
waiting_since = datetime.now(timezone.utc)
|
||||
waiting_since = datetime.now(UTC)
|
||||
interim_content = "AI 正在分析中,請稍候... ⏳"
|
||||
|
||||
await record_outbound_message(
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
from src.services.channel_hub import (
|
||||
build_grouped_alert_provider_event_id,
|
||||
build_grouped_alert_run_id,
|
||||
format_grouped_alert_digest_text,
|
||||
format_grouped_alert_event_content,
|
||||
)
|
||||
@@ -17,6 +18,20 @@ def test_build_grouped_alert_provider_event_id_is_deterministic() -> None:
|
||||
assert len(event_id) < 256
|
||||
|
||||
|
||||
def test_build_grouped_alert_run_id_is_stable() -> None:
|
||||
provider_event_id = build_grouped_alert_provider_event_id(
|
||||
"INC-20260507-ABCD12",
|
||||
"1234567890abcdef" * 4,
|
||||
)
|
||||
|
||||
first = build_grouped_alert_run_id("awoooi", provider_event_id)
|
||||
second = build_grouped_alert_run_id("awoooi", provider_event_id)
|
||||
other_project = build_grouped_alert_run_id("ewoooc", provider_event_id)
|
||||
|
||||
assert first == second
|
||||
assert first != other_project
|
||||
|
||||
|
||||
def test_format_grouped_alert_event_content_keeps_operator_context() -> None:
|
||||
content = format_grouped_alert_event_content(
|
||||
alert_id="INC-20260507-ABCD12",
|
||||
|
||||
Reference in New Issue
Block a user