From ced36f2521d2f3fb9d55f6b0794873dc17ccd395 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 20 May 2026 19:31:54 +0800 Subject: [PATCH] feat(awooop): add source provider freshness heartbeat --- .gitea/workflows/e2e-health.yaml | 10 ++ apps/api/src/api/v1/platform/events.py | 121 ++++++++++++- apps/api/src/services/channel_hub.py | 9 +- .../tests/test_alert_chain_smoke_metric.py | 56 +++++- .../test_channel_hub_grouped_alert_events.py | 26 +++ ...test_platform_events_provider_heartbeat.py | 51 ++++++ scripts/alert_chain_smoke_test.py | 167 +++++++++++++++++- 7 files changed, 430 insertions(+), 10 deletions(-) create mode 100644 apps/api/tests/test_platform_events_provider_heartbeat.py diff --git a/.gitea/workflows/e2e-health.yaml b/.gitea/workflows/e2e-health.yaml index c92859a3..aa3a69bf 100644 --- a/.gitea/workflows/e2e-health.yaml +++ b/.gitea/workflows/e2e-health.yaml @@ -51,6 +51,16 @@ jobs: echo "status=failed" >> $GITHUB_OUTPUT exit 1 + - name: Source Provider Freshness Smoke + run: | + python3 scripts/alert_chain_smoke_test.py \ + --api-url https://awoooi.wooo.work \ + --source-provider-heartbeat \ + --json + env: + AWOOOP_OPERATOR_API_KEY: ${{ secrets.AWOOOP_OPERATOR_API_KEY }} + AWOOOP_OPERATOR_ID: gitea-e2e-health + - name: Notify Telegram on Failure if: failure() run: | diff --git a/apps/api/src/api/v1/platform/events.py b/apps/api/src/api/v1/platform/events.py index 5533dfdd..75b1d23d 100644 --- a/apps/api/src/api/v1/platform/events.py +++ b/apps/api/src/api/v1/platform/events.py @@ -6,13 +6,17 @@ AwoooP Operator Console — Channel Events API from __future__ import annotations -from datetime import datetime -from typing import Any +from datetime import UTC, datetime +from typing import Annotated, Any, Literal from uuid import UUID -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field +from src.core.awooop_operator_auth import ( + AwoooPOperatorPrincipal, + verify_awooop_operator, +) from src.services.channel_event_dossier_service import ( RecurrenceWorkItemHandoffKind, RecurrenceWorkItemMode, @@ -24,6 +28,7 @@ from src.services.channel_event_dossier_service import ( fetch_recurrence_work_item_handoff, fetch_recurrence_work_item_preview, ) +from src.services.channel_hub import record_external_alert_event from src.services.platform_operator_service import list_recent_channel_events router = APIRouter() @@ -122,6 +127,38 @@ class ChannelEventDossierCoverageResponse(BaseModel): providers: list[ChannelEventProviderCoverage] +SourceProviderName = Literal["sentry", "signoz"] + + +class SourceProviderHeartbeatRequest(BaseModel): + """Low-noise freshness heartbeat for external source-provider mirrors.""" + + project_id: str = Field(default="awoooi", min_length=1, max_length=64) + providers: list[SourceProviderName] = Field( + default_factory=lambda: ["sentry", "signoz"], + min_length=1, + max_length=2, + ) + reason: str = Field( + default="scheduled_provider_freshness_smoke", + min_length=1, + max_length=120, + ) + run_ref: str | None = Field(default=None, max_length=120) + + +class SourceProviderHeartbeatItem(BaseModel): + provider: SourceProviderName + event_id: str + conversation_event_id: UUID + + +class SourceProviderHeartbeatResponse(BaseModel): + status: str + project_id: str + items: list[SourceProviderHeartbeatItem] + + class ChannelEventRecurrenceSummary(BaseModel): source_event_total: int recurrence_group_total: int @@ -245,6 +282,84 @@ async def get_event_dossier_coverage( ) +@router.post( + "/events/dossier/provider-heartbeat", + response_model=SourceProviderHeartbeatResponse, + summary="寫入 Sentry / SignOz 來源卷宗 freshness heartbeat", + description=( + "受 AwoooP operator key 保護的低噪音 smoke。只寫入來源卷宗與" + "completed shadow run,不建立 Incident、不送 Telegram、不宣稱真實上游告警。" + ), +) +async def create_source_provider_heartbeat( + payload: SourceProviderHeartbeatRequest, + operator: Annotated[ + AwoooPOperatorPrincipal, + Depends(verify_awooop_operator), + ], +) -> dict[str, Any]: + timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") + items: list[dict[str, Any]] = [] + + for provider in payload.providers: + event_id = f"heartbeat-{timestamp}" + event_uuid = await record_external_alert_event( + project_id=payload.project_id, + provider=provider, + event_id=event_id, + stage="heartbeat", + title="SourceProviderHeartbeat", + severity="info", + namespace="awoooi-prod", + target_resource="source-provider-ingestion", + fingerprint=f"source-provider-heartbeat:{provider}", + labels={ + "provider": provider, + "synthetic": "true", + "alert_category": "alertchain_provider_freshness", + "telegram": "not_sent", + "incident": "not_created", + }, + annotations={ + "summary": ( + "Low-noise provider freshness smoke; verifies AwoooP " + "source dossier ingestion without creating an incident." + ), + "reason": payload.reason, + }, + payload={ + "reason": payload.reason, + "run_ref": payload.run_ref, + "operator_id": operator.operator_id, + "auth_method": operator.auth_method, + "synthetic": True, + "side_effects": { + "incident_created": False, + "telegram_sent": False, + "approval_created": False, + }, + }, + ) + if event_uuid is None: + raise HTTPException( + status_code=500, + detail=f"{provider} provider heartbeat was not recorded", + ) + items.append( + { + "provider": provider, + "event_id": event_id, + "conversation_event_id": event_uuid, + } + ) + + return { + "status": "recorded", + "project_id": payload.project_id, + "items": items, + } + + @router.get( "/events/dossier/recurrence", response_model=ChannelEventRecurrenceResponse, diff --git a/apps/api/src/services/channel_hub.py b/apps/api/src/services/channel_hub.py index 8c9d656f..a88753c8 100644 --- a/apps/api/src/services/channel_hub.py +++ b/apps/api/src/services/channel_hub.py @@ -85,6 +85,7 @@ def build_inbound_source_envelope( content_sha256 = hashlib.sha256(raw_content.encode()).hexdigest() if raw_content else None text_refs = _INCIDENT_ID_RE.findall(raw_content or "") provider_name = str(provider or "unknown").strip().lower() or "unknown" + is_provider_heartbeat = str(stage or "").strip().lower() == "heartbeat" source_refs = { "event_ids": _compact_unique([raw_event_id]), "incident_ids": _compact_unique([incident_id, *text_refs]), @@ -92,10 +93,14 @@ def build_inbound_source_envelope( "alert_ids": _compact_unique([provider_event_id, raw_event_id]), "fingerprints": _compact_unique([fingerprint]), "sentry_issue_ids": _compact_unique( - [raw_event_id, provider_event_id] if provider_name == "sentry" else [] + [raw_event_id, provider_event_id] + if provider_name == "sentry" and not is_provider_heartbeat + else [] ), "signoz_alerts": _compact_unique( - [raw_event_id, alertname] if provider_name == "signoz" else [] + [raw_event_id, alertname] + if provider_name == "signoz" and not is_provider_heartbeat + else [] ), } envelope: dict[str, Any] = { diff --git a/apps/api/tests/test_alert_chain_smoke_metric.py b/apps/api/tests/test_alert_chain_smoke_metric.py index 6161ae31..f8bdc2ca 100644 --- a/apps/api/tests/test_alert_chain_smoke_metric.py +++ b/apps/api/tests/test_alert_chain_smoke_metric.py @@ -6,7 +6,6 @@ import time import unittest from pathlib import Path - SCRIPT_PATH = Path(__file__).resolve().parents[3] / "scripts" / "alert_chain_smoke_test.py" SPEC = importlib.util.spec_from_file_location("alert_chain_smoke_test", SCRIPT_PATH) alert_chain_smoke_test = importlib.util.module_from_spec(SPEC) @@ -79,6 +78,61 @@ class AlertChainSmokeMetricTest(unittest.TestCase): self.assertFalse(result.passed) self.assertTrue(result.critical) + def test_source_provider_heartbeat_requires_operator_key(self): + result = alert_chain_smoke_test.send_source_provider_heartbeat( + "https://awoooi.example", + providers=["sentry", "signoz"], + operator_key=None, + operator_id="gitea-e2e-health", + ) + + self.assertFalse(result.passed) + self.assertTrue(result.critical) + self.assertIn("AWOOOP_OPERATOR_API_KEY", result.message) + + def test_source_provider_heartbeat_posts_expected_payload(self): + calls = [] + + def fake_post(url, payload, *, headers=None, timeout=None): + calls.append( + { + "url": url, + "payload": payload, + "headers": headers, + "timeout": timeout, + } + ) + return alert_chain_smoke_test.HttpGetResult( + 200, + ( + '{"status":"recorded","items":[' + '{"provider":"sentry"},{"provider":"signoz"}]}' + ), + ) + + original_post = alert_chain_smoke_test.http_post_json + try: + alert_chain_smoke_test.http_post_json = fake_post + result = alert_chain_smoke_test.send_source_provider_heartbeat( + "https://awoooi.example", + providers=["sentry", "signoz"], + operator_key="secret", + operator_id="gitea-e2e-health", + run_ref="run-123", + ) + finally: + alert_chain_smoke_test.http_post_json = original_post + + self.assertTrue(result.passed) + self.assertEqual( + calls[0]["url"], + "https://awoooi.example/api/v1/platform/events/dossier/provider-heartbeat", + ) + self.assertEqual(calls[0]["payload"]["providers"], ["sentry", "signoz"]) + self.assertEqual(calls[0]["payload"]["run_ref"], "run-123") + self.assertEqual(calls[0]["headers"]["X-AwoooP-Operator-Id"], "gitea-e2e-health") + self.assertEqual(calls[0]["headers"]["X-AwoooP-Operator-Key"], "secret") + if __name__ == "__main__": unittest.main() diff --git a/apps/api/tests/test_channel_hub_grouped_alert_events.py b/apps/api/tests/test_channel_hub_grouped_alert_events.py index 4afb58e2..bb50d6bc 100644 --- a/apps/api/tests/test_channel_hub_grouped_alert_events.py +++ b/apps/api/tests/test_channel_hub_grouped_alert_events.py @@ -274,6 +274,32 @@ def test_sentry_and_signoz_source_refs_keep_raw_event_ids() -> None: assert signoz_envelope["source_refs"]["signoz_alerts"] == ["HighLatency", "fp-456"] +def test_source_provider_heartbeat_refs_do_not_claim_real_provider_alerts() -> None: + sentry_envelope = build_inbound_source_envelope( + provider="sentry", + stage="heartbeat", + provider_event_id="sentry:heartbeat:heartbeat-1", + raw_event_id="heartbeat-1", + raw_content="Sentry heartbeat", + alertname="SourceProviderHeartbeat", + fingerprint="source-provider-heartbeat:sentry", + ) + signoz_envelope = build_inbound_source_envelope( + provider="signoz", + stage="heartbeat", + provider_event_id="signoz:heartbeat:heartbeat-1", + raw_event_id="heartbeat-1", + raw_content="SignOz heartbeat", + alertname="SourceProviderHeartbeat", + fingerprint="source-provider-heartbeat:signoz", + ) + + assert sentry_envelope["source_refs"]["event_ids"] == ["heartbeat-1"] + assert sentry_envelope["source_refs"]["sentry_issue_ids"] == [] + assert signoz_envelope["source_refs"]["event_ids"] == ["heartbeat-1"] + assert signoz_envelope["source_refs"]["signoz_alerts"] == [] + + async def test_record_outbound_message_sets_sent_at_for_sent_messages() -> None: session = _FakeSession() run_id = build_grouped_alert_run_id("awoooi", "telegram-message-13152") diff --git a/apps/api/tests/test_platform_events_provider_heartbeat.py b/apps/api/tests/test_platform_events_provider_heartbeat.py new file mode 100644 index 00000000..33d2e113 --- /dev/null +++ b/apps/api/tests/test_platform_events_provider_heartbeat.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from uuid import uuid4 + +import pytest + +from src.api.v1.platform import events +from src.core.awooop_operator_auth import AwoooPOperatorPrincipal + + +@pytest.mark.asyncio +async def test_create_source_provider_heartbeat_records_low_noise_events(monkeypatch): + captured: list[dict] = [] + + async def fake_record_external_alert_event(**kwargs): + captured.append(kwargs) + return uuid4() + + monkeypatch.setattr(events, "record_external_alert_event", fake_record_external_alert_event) + + response = await events.create_source_provider_heartbeat( + events.SourceProviderHeartbeatRequest( + providers=["sentry", "signoz"], + reason="test_provider_freshness", + run_ref="run-123", + ), + AwoooPOperatorPrincipal( + operator_id="gitea-e2e-health", + auth_method="operator_api_key", + ), + ) + + assert response["status"] == "recorded" + assert [item["provider"] for item in response["items"]] == ["sentry", "signoz"] + assert [item["event_id"] for item in response["items"]] == [ + captured[0]["event_id"], + captured[1]["event_id"], + ] + + for call in captured: + assert call["stage"] == "heartbeat" + assert call["severity"] == "info" + assert call["title"] == "SourceProviderHeartbeat" + assert call["target_resource"] == "source-provider-ingestion" + assert call["labels"]["telegram"] == "not_sent" + assert call["labels"]["incident"] == "not_created" + assert call["payload"]["side_effects"] == { + "incident_created": False, + "telegram_sent": False, + "approval_created": False, + } diff --git a/scripts/alert_chain_smoke_test.py b/scripts/alert_chain_smoke_test.py index 31304b2c..782e4c6c 100644 --- a/scripts/alert_chain_smoke_test.py +++ b/scripts/alert_chain_smoke_test.py @@ -88,6 +88,29 @@ def http_get( return HttpGetResult(exc.code, body) +def http_post_json( + url: str, + payload: dict[str, Any], + *, + headers: dict[str, str] | None = None, + timeout: int = TIMEOUT, +) -> HttpGetResult: + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + request_headers = { + "Accept": "application/json,text/plain,*/*", + "Content-Type": "application/json", + **(headers or {}), + } + request = Request(url, data=body, headers=request_headers, method="POST") + try: + with urlopen(request, timeout=timeout) as response: + response_body = response.read().decode("utf-8", errors="replace") + return HttpGetResult(response.status, response_body) + except HTTPError as exc: + response_body = exc.read().decode("utf-8", errors="replace") + return HttpGetResult(exc.code, response_body) + + def _http_error_message(error: Exception) -> str: if isinstance(error, URLError): return str(error.reason) @@ -407,7 +430,7 @@ def check_webhook_health(api_url: str) -> list[CheckResult]: try: resp = http_get(url, timeout=TIMEOUT) if resp.status_code == 200: - results.append(CheckResult(name, True, f"HTTP 200 OK")) + results.append(CheckResult(name, True, "HTTP 200 OK")) else: results.append( CheckResult(name, False, f"HTTP {resp.status_code}") @@ -418,6 +441,82 @@ def check_webhook_health(api_url: str) -> list[CheckResult]: return results +def send_source_provider_heartbeat( + api_url: str, + *, + providers: list[str], + operator_key: str | None, + operator_id: str, + run_ref: str | None = None, +) -> CheckResult: + """Record low-noise provider freshness evidence without creating incidents.""" + cleaned_providers = [ + provider.strip().lower() + for provider in providers + if provider.strip().lower() in {"sentry", "signoz"} + ] + if not cleaned_providers: + return CheckResult( + "Source Provider Heartbeat", + False, + "沒有有效 provider(允許 sentry/signoz)", + ) + if not operator_key: + return CheckResult( + "Source Provider Heartbeat", + False, + "AWOOOP_OPERATOR_API_KEY 未設定;無法寫入受保護 freshness heartbeat", + ) + + payload = { + "project_id": "awoooi", + "providers": cleaned_providers, + "reason": "scheduled_provider_freshness_smoke", + "run_ref": run_ref, + } + try: + resp = http_post_json( + f"{api_url}/api/v1/platform/events/dossier/provider-heartbeat", + payload, + headers={ + "X-AwoooP-Operator-Id": operator_id, + "X-AwoooP-Operator-Key": operator_key, + }, + timeout=TIMEOUT, + ) + data = resp.json() + if resp.status_code >= 400: + return CheckResult( + "Source Provider Heartbeat", + False, + f"HTTP {resp.status_code}: {data.get('detail', resp.text) if isinstance(data, dict) else resp.text}", + ) + items = data.get("items", []) if isinstance(data, dict) else [] + recorded = sorted( + str(item.get("provider", "")).strip().lower() + for item in items + if isinstance(item, dict) + ) + expected = sorted(set(cleaned_providers)) + if recorded != expected: + return CheckResult( + "Source Provider Heartbeat", + False, + f"recorded providers mismatch: expected={expected}, actual={recorded}", + ) + return CheckResult( + "Source Provider Heartbeat", + True, + f"recorded {', '.join(recorded)} freshness heartbeat(s)", + ) + except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e: + return CheckResult( + "Source Provider Heartbeat", + False, + f"無法寫入 provider heartbeat: {_http_error_message(e)}", + ) + + def check_signoz_reachable(signoz_url: str) -> CheckResult: """Check 4: SigNoz UI 可達""" try: @@ -507,10 +606,19 @@ def check_event_exporter() -> CheckResult: # ============================================================================= # 主程式 # ============================================================================= -def run_smoke_test(api_url: str, fail_fast: bool = False) -> SmokeTestReport: +def run_smoke_test( + api_url: str, + fail_fast: bool = False, + *, + source_provider_heartbeat: bool = False, + source_providers: list[str] | None = None, + operator_key: str | None = None, + operator_id: str = "gitea-e2e-health", + run_ref: str | None = None, +) -> SmokeTestReport: report = SmokeTestReport() - print(f"\n🔍 AWOOOI Alert Chain Smoke Test") + print("\n🔍 AWOOOI Alert Chain Smoke Test") print(f" API: {api_url}") print(f" 時間: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}") print("-" * 50) @@ -529,6 +637,23 @@ def run_smoke_test(api_url: str, fail_fast: bool = False) -> SmokeTestReport: if fail_fast and not result.passed and result.critical: return report + if source_provider_heartbeat: + provider_list = source_providers or ["sentry", "signoz"] + heartbeat_result = send_source_provider_heartbeat( + api_url, + providers=provider_list, + operator_key=operator_key, + operator_id=operator_id, + run_ref=run_ref, + ) + report.add(heartbeat_result) + if fail_fast and not heartbeat_result.passed and heartbeat_result.critical: + return report + + if heartbeat_result.passed: + for source in provider_list: + report.add(check_alert_chain_metric(PROMETHEUS_URL, api_url, source=source)) + # Check 4: SigNoz report.add(check_signoz_reachable(SIGNOZ_URL)) @@ -552,9 +677,43 @@ def main() -> int: parser.add_argument( "--json", action="store_true", help="輸出 JSON 格式結果" ) + parser.add_argument( + "--source-provider-heartbeat", + action="store_true", + help="寫入 Sentry/SignOz 低噪音 freshness heartbeat 並驗證 provider 指標", + ) + parser.add_argument( + "--source-provider", + action="append", + choices=["sentry", "signoz"], + help="指定要寫入 heartbeat 的 provider;可重複指定,預設 sentry+signoz", + ) + parser.add_argument( + "--operator-id", + default=os.environ.get("AWOOOP_OPERATOR_ID", "gitea-e2e-health"), + help="AwoooP operator identity header", + ) + parser.add_argument( + "--operator-key-env", + default="AWOOOP_OPERATOR_API_KEY", + help="讀取 AwoooP operator key 的環境變數名稱", + ) + parser.add_argument( + "--run-ref", + default=os.environ.get("GITHUB_RUN_ID") or os.environ.get("GITEA_RUN_ID"), + help="CI run reference stored in heartbeat payload", + ) args = parser.parse_args() - report = run_smoke_test(args.api_url, args.fail_fast) + report = run_smoke_test( + args.api_url, + args.fail_fast, + source_provider_heartbeat=args.source_provider_heartbeat, + source_providers=args.source_provider, + operator_key=os.environ.get(args.operator_key_env), + operator_id=args.operator_id, + run_ref=args.run_ref, + ) print("-" * 50) if report.passed: