fix(awooop): anchor legacy channel events to shadow runs
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m13s
CD Pipeline / build-and-deploy (push) Successful in 4m9s
CD Pipeline / post-deploy-checks (push) Successful in 1m20s

This commit is contained in:
Your Name
2026-05-07 10:12:52 +08:00
parent 200b760512
commit 5d38115d2f
2 changed files with 121 additions and 4 deletions

View File

@@ -30,9 +30,9 @@ import asyncio
import hashlib
import html
import json
from datetime import datetime, timezone
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from uuid import NAMESPACE_URL, UUID, uuid5
import structlog
from sqlalchemy import select, text
@@ -48,6 +48,75 @@ logger = structlog.get_logger(__name__)
_INTERIM_WAIT_SECONDS = 30
def _input_sha256(input_payload: dict[str, Any] | None) -> str | None:
"""計算 Run input 的穩定 hash讓 mirror run 也能保留最小完整性證據。"""
if not input_payload:
return None
canonical = json.dumps(
input_payload,
sort_keys=True,
separators=(",", ":"),
ensure_ascii=False,
)
return hashlib.sha256(canonical.encode()).hexdigest()
async def ensure_completed_shadow_run(
db: AsyncSession,
*,
project_id: str,
run_id: UUID,
agent_id: str,
trigger_type: str,
trigger_ref: str | None,
input_payload: dict[str, Any] | None = None,
) -> bool:
"""為 legacy mirror 資料補一筆 completed shadow run。
AwoooP 在 strangler 階段會先 mirror legacy Telegram / alert-grouping
資料。這些事件不應重新觸發 runtime但需要 run_state 當 Console 的
聚合錨點;因此這裡建立的是已完成的 shadow run不會被 worker pick up。
"""
result = await db.execute(
text("""
INSERT INTO awooop_run_state (
run_id, project_id, agent_id, state,
trigger_type, trigger_ref, is_shadow,
input_sha256, created_at, completed_at, timeout_at
) VALUES (
:run_id, :project_id, :agent_id, 'completed',
:trigger_type, :trigger_ref, TRUE,
:input_sha256, NOW(), NOW(), NOW()
)
ON CONFLICT (run_id) DO NOTHING
RETURNING run_id
"""),
{
"run_id": run_id,
"project_id": project_id,
"agent_id": agent_id,
"trigger_type": trigger_type,
"trigger_ref": trigger_ref,
"input_sha256": _input_sha256(input_payload),
},
)
inserted = result.fetchone() is not None
if inserted:
logger.info(
"completed_shadow_run_created",
project_id=project_id,
run_id=str(run_id),
agent_id=agent_id,
trigger_type=trigger_type,
)
return inserted
def build_grouped_alert_run_id(project_id: str, provider_event_id: str) -> UUID:
"""為 grouped child alert 建立穩定 run_id讓 Run Monitor 可回查。"""
return uuid5(NAMESPACE_URL, f"awooop:grouped-alert:{project_id}:{provider_event_id}")
# ─────────────────────────────────────────────────────────────────────────────
# 入站事件記錄
# ─────────────────────────────────────────────────────────────────────────────
@@ -311,6 +380,22 @@ async def record_grouped_alert_event(
)
async with get_db_context(project_id) as db:
run_id = build_grouped_alert_run_id(project_id, provider_event_id)
await ensure_completed_shadow_run(
db,
project_id=project_id,
run_id=run_id,
agent_id="legacy-alert-grouping",
trigger_type="grouped_alert_event",
trigger_ref=provider_event_id,
input_payload={
"alert_id": alert_id,
"alertname": alertname,
"severity": severity,
"group_key": group_key,
"fingerprint": fingerprint,
},
)
event_id = await mirror_inbound_event(
db,
project_id=project_id,
@@ -321,7 +406,8 @@ async def record_grouped_alert_event(
channel_chat_id=f"alert-group:{group_key}",
content_type="text",
raw_content=content,
provider_ts=datetime.now(timezone.utc),
provider_ts=datetime.now(UTC),
run_id=run_id,
)
logger.info(
@@ -388,6 +474,22 @@ async def record_outbound_message(
actual_status = "shadow" if is_shadow else send_status
await ensure_completed_shadow_run(
db,
project_id=project_id,
run_id=run_id,
agent_id="legacy-telegram-gateway",
trigger_type="legacy_outbound",
trigger_ref=provider_message_id,
input_payload={
"channel_type": channel_type,
"channel_chat_id": channel_chat_id,
"message_type": message_type,
"send_status": actual_status,
"triggered_by_state": triggered_by_state,
},
)
result = await db.execute(
text("""
INSERT INTO awooop_outbound_message (
@@ -504,7 +606,7 @@ async def _interim_feedback_task(
# run 已推進complete/failed 等),不需要 interim
return
waiting_since = datetime.now(timezone.utc)
waiting_since = datetime.now(UTC)
interim_content = "AI 正在分析中,請稍候... ⏳"
await record_outbound_message(

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from src.services.channel_hub import (
build_grouped_alert_provider_event_id,
build_grouped_alert_run_id,
format_grouped_alert_digest_text,
format_grouped_alert_event_content,
)
@@ -17,6 +18,20 @@ def test_build_grouped_alert_provider_event_id_is_deterministic() -> None:
assert len(event_id) < 256
def test_build_grouped_alert_run_id_is_stable() -> None:
provider_event_id = build_grouped_alert_provider_event_id(
"INC-20260507-ABCD12",
"1234567890abcdef" * 4,
)
first = build_grouped_alert_run_id("awoooi", provider_event_id)
second = build_grouped_alert_run_id("awoooi", provider_event_id)
other_project = build_grouped_alert_run_id("ewoooc", provider_event_id)
assert first == second
assert first != other_project
def test_format_grouped_alert_event_content_keeps_operator_context() -> None:
content = format_grouped_alert_event_content(
alert_id="INC-20260507-ABCD12",