From bbe081fc57cd984555a7b810814851a5e1d8922f Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 May 2026 12:13:07 +0800 Subject: [PATCH] ci(awooop): refresh source correlation canary --- .gitea/workflows/e2e-health.yaml | 12 +- .../awooop_source_correlation_apply_smoke.py | 276 +++++++++++++----- 2 files changed, 218 insertions(+), 70 deletions(-) diff --git a/.gitea/workflows/e2e-health.yaml b/.gitea/workflows/e2e-health.yaml index a664ba17..46467c09 100644 --- a/.gitea/workflows/e2e-health.yaml +++ b/.gitea/workflows/e2e-health.yaml @@ -53,6 +53,9 @@ jobs: - name: Source Provider Freshness Smoke run: | + SOURCE_CANARY_RUN_REF="gitea-e2e-${GITHUB_RUN_ID:-manual}-${GITHUB_RUN_ATTEMPT:-1}" + echo "SOURCE_CANARY_RUN_REF=${SOURCE_CANARY_RUN_REF}" >> "$GITHUB_ENV" + echo "SOURCE_CANARY_WORK_ITEM_ID=source-evidence:sentry:upstream_canary:${SOURCE_CANARY_RUN_REF}" >> "$GITHUB_ENV" OPERATOR_KEY="$(cat <<'AWOOOI_SECRET_AWOOOP_OPERATOR_API_KEY' ${{ secrets.AWOOOP_OPERATOR_API_KEY }} AWOOOI_SECRET_AWOOOP_OPERATOR_API_KEY @@ -64,6 +67,7 @@ jobs: --metrics-api-url http://192.168.0.125:32334 \ --source-provider-heartbeat \ --source-provider-upstream-canary \ + --run-ref "${SOURCE_CANARY_RUN_REF}" \ --json - name: Source Correlation Applied-Link Smoke @@ -71,9 +75,11 @@ jobs: python3 scripts/awooop_source_correlation_apply_smoke.py \ --api-url https://awoooi.wooo.work \ --target-incident-id INC-20260505-25E744 \ - --work-item-id source-evidence:sentry:received:codex-sentry-20260513-t15b-v3 \ - --expected-source-event-provider-event-id sentry:source_correlation_linked:codex-sentry-20260513-t15b-v3 \ - --allow-existing-apply + --allow-existing-apply \ + --refresh-if-stale-days 6 \ + --refresh-work-item-id "${SOURCE_CANARY_WORK_ITEM_ID}" \ + --reviewer-id gitea_e2e_source_link_canary \ + --operator-note "T122 rolling source-correlation canary refresh; append-only status-chain proof" - name: Notify Telegram on Failure if: failure() diff --git a/scripts/awooop_source_correlation_apply_smoke.py b/scripts/awooop_source_correlation_apply_smoke.py index 22bf8c8f..4fc6b4fe 100644 --- a/scripts/awooop_source_correlation_apply_smoke.py +++ b/scripts/awooop_source_correlation_apply_smoke.py @@ -15,6 +15,7 @@ import time import urllib.error import urllib.parse import urllib.request +from datetime import datetime, timezone from typing import Any @@ -153,6 +154,47 @@ def _has_expected_applied_event( return False +def _parse_event_time(value: Any) -> datetime | None: + text = str(value or "").strip() + if not text: + return None + if text.endswith("Z"): + text = f"{text[:-1]}+00:00" + try: + parsed = datetime.fromisoformat(text) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + +def _applied_link_age_days(correlation: dict[str, Any]) -> float | None: + latest = _parse_event_time(correlation.get("latest_applied_link_at")) + if latest is None: + return None + age_seconds = (datetime.now(timezone.utc) - latest).total_seconds() + return max(age_seconds, 0.0) / 86400 + + +def _refresh_reason( + correlation: dict[str, Any], + *, + refresh_if_stale_days: float | None, +) -> str | None: + if refresh_if_stale_days is None: + return None + age_days = _applied_link_age_days(correlation) + if age_days is None: + return "latest_applied_link_at_missing" + if age_days > refresh_if_stale_days: + return ( + f"latest_applied_link_age_days={age_days:.3f} " + f"> refresh_if_stale_days={refresh_if_stale_days:.3f}" + ) + return None + + def _wait_for_status_chain( *, api_url: str, @@ -199,7 +241,12 @@ def _wait_for_status_chain( ) -def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> dict[str, Any]: +def _verify_existing_status_chain( + args: argparse.Namespace, + *, + status: str, + work_item_id: str | None = None, +) -> dict[str, Any]: chain = _wait_for_status_chain( api_url=args.api_url, project_id=args.project_id, @@ -210,9 +257,10 @@ def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> d interval_seconds=args.status_interval_seconds, ) correlation = chain["source_refs"]["correlation"] + age_days = _applied_link_age_days(correlation) return { "status": status, - "work_item_id": args.work_item_id, + "work_item_id": work_item_id or args.work_item_id, "target_incident_id": args.target_incident_id, "expected_source_event_provider_event_id": ( args.expected_source_event_provider_event_id @@ -220,76 +268,36 @@ def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> d "verification_status": correlation.get("verification_status"), "applied_link_total": correlation.get("applied_link_total"), "latest_applied_link_at": correlation.get("latest_applied_link_at"), + "applied_link_age_days": round(age_days, 6) if age_days is not None else None, } -def run(args: argparse.Namespace) -> dict[str, Any]: - recurrence_url = _url( - args.api_url, - "/api/v1/platform/events/dossier/recurrence", - { - "project_id": args.project_id, - "provider": args.provider, - "limit": args.limit, - }, +def _select_refresh_work_item( + recurrence: dict[str, Any], + *, + args: argparse.Namespace, +) -> dict[str, Any]: + if not args.refresh_work_item_id and not args.refresh_from_latest_canary: + raise SmokeError( + "applied source link is stale, but no refresh source was configured; " + "pass --refresh-work-item-id or --refresh-from-latest-canary" + ) + return _find_work_item( + recurrence, + work_item_id=args.refresh_work_item_id, + allow_non_canary=args.allow_non_canary, + allow_existing_apply=False, ) - recurrence = _http_json(recurrence_url) - try: - item = _find_work_item( - recurrence, - work_item_id=args.work_item_id, - allow_non_canary=args.allow_non_canary, - allow_existing_apply=args.allow_existing_apply, - ) - except SmokeError: - if args.allow_existing_apply and args.work_item_id: - return _verify_existing_status_chain( - args, - status="verified_existing_apply_without_recurrence", - ) - raise - work_item_id = _source_review_work_item_id(item) - existing_apply = item.get("source_correlation_apply") - if args.dry_run: - return { - "status": "dry_run", - "work_item_id": work_item_id, - "target_incident_id": args.target_incident_id, - "latest_provider_event_id": item.get("latest_provider_event_id"), - "alertname": item.get("alertname"), - "already_applied": bool(existing_apply), - } - - if existing_apply and args.allow_existing_apply: - chain = _wait_for_status_chain( - api_url=args.api_url, - project_id=args.project_id, - incident_id=args.target_incident_id, - min_applied=args.min_applied, - expected_source_event_provider_event_id=( - args.expected_source_event_provider_event_id - ), - attempts=args.status_attempts, - interval_seconds=args.status_interval_seconds, - ) - correlation = chain["source_refs"]["correlation"] - return { - "status": "already_applied", - "work_item_id": work_item_id, - "target_incident_id": args.target_incident_id, - "expected_source_event_provider_event_id": ( - args.expected_source_event_provider_event_id - ), - "apply_status": existing_apply.get("apply_status"), - "source_event_provider_event_id": existing_apply.get( - "source_event_provider_event_id" - ), - "verification_status": correlation.get("verification_status"), - "applied_link_total": correlation.get("applied_link_total"), - "latest_applied_link_at": correlation.get("latest_applied_link_at"), - } +def _apply_source_correlation_item( + *, + args: argparse.Namespace, + work_item_id: str, + status: str, + refresh_reason: str | None = None, + previous_latest_applied_link_at: str | None = None, +) -> dict[str, Any]: review_url = _url( args.api_url, "/api/v1/platform/events/dossier/recurrence/source-correlation/review", @@ -352,8 +360,9 @@ def run(args: argparse.Namespace) -> dict[str, Any]: interval_seconds=args.status_interval_seconds, ) correlation = chain["source_refs"]["correlation"] + age_days = _applied_link_age_days(correlation) return { - "status": "passed", + "status": status, "work_item_id": work_item_id, "target_incident_id": args.target_incident_id, "review_status": review.get("review_record_status"), @@ -370,9 +379,124 @@ def run(args: argparse.Namespace) -> dict[str, Any]: "verification_status": correlation.get("verification_status"), "applied_link_total": correlation.get("applied_link_total"), "latest_applied_link_at": correlation.get("latest_applied_link_at"), + "applied_link_age_days": round(age_days, 6) if age_days is not None else None, + "refresh_reason": refresh_reason, + "previous_latest_applied_link_at": previous_latest_applied_link_at, } +def run(args: argparse.Namespace) -> dict[str, Any]: + recurrence_url = _url( + args.api_url, + "/api/v1/platform/events/dossier/recurrence", + { + "project_id": args.project_id, + "provider": args.provider, + "limit": args.limit, + }, + ) + recurrence = _http_json(recurrence_url) + try: + item = _find_work_item( + recurrence, + work_item_id=args.work_item_id, + allow_non_canary=args.allow_non_canary, + allow_existing_apply=args.allow_existing_apply, + ) + except SmokeError: + if args.allow_existing_apply and args.work_item_id: + return _verify_existing_status_chain( + args, + status="verified_existing_apply_without_recurrence", + ) + raise + work_item_id = _source_review_work_item_id(item) + existing_apply = item.get("source_correlation_apply") + + if ( + args.dry_run + and not ( + existing_apply + and args.allow_existing_apply + and args.refresh_if_stale_days is not None + ) + ): + return { + "status": "dry_run", + "work_item_id": work_item_id, + "target_incident_id": args.target_incident_id, + "latest_provider_event_id": item.get("latest_provider_event_id"), + "alertname": item.get("alertname"), + "already_applied": bool(existing_apply), + } + + if existing_apply and args.allow_existing_apply: + try: + existing = _verify_existing_status_chain( + args, + status="already_applied", + work_item_id=work_item_id, + ) + except SmokeError as exc: + if args.refresh_if_stale_days is None: + raise + refresh_reason = f"status_chain_verify_failed:{exc}" + previous_latest_applied_link_at = None + else: + correlation = { + "latest_applied_link_at": existing.get("latest_applied_link_at"), + "applied_link_total": existing.get("applied_link_total"), + "verification_status": existing.get("verification_status"), + } + refresh_reason = _refresh_reason( + correlation, + refresh_if_stale_days=args.refresh_if_stale_days, + ) + if not refresh_reason: + return { + **existing, + "apply_status": existing_apply.get("apply_status"), + "source_event_provider_event_id": existing_apply.get( + "source_event_provider_event_id" + ), + "refresh_if_stale_days": args.refresh_if_stale_days, + "refresh_reason": None, + } + previous_latest_applied_link_at = str( + existing.get("latest_applied_link_at") or "" + ) or None + + refresh_item = _select_refresh_work_item(recurrence, args=args) + refresh_work_item_id = _source_review_work_item_id(refresh_item) + if args.dry_run: + return { + "status": "dry_run_refresh", + "work_item_id": work_item_id, + "refresh_work_item_id": refresh_work_item_id, + "target_incident_id": args.target_incident_id, + "refresh_if_stale_days": args.refresh_if_stale_days, + "refresh_reason": refresh_reason, + "previous_latest_applied_link_at": previous_latest_applied_link_at, + "refresh_latest_provider_event_id": refresh_item.get( + "latest_provider_event_id" + ), + "refresh_alertname": refresh_item.get("alertname"), + } + return _apply_source_correlation_item( + args=args, + work_item_id=refresh_work_item_id, + status="refreshed", + refresh_reason=refresh_reason, + previous_latest_applied_link_at=previous_latest_applied_link_at, + ) + + return _apply_source_correlation_item( + args=args, + work_item_id=work_item_id, + status="passed", + ) + + def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Run AwoooP source-correlation apply status-chain smoke." @@ -397,6 +521,24 @@ def parse_args(argv: list[str]) -> argparse.Namespace: parser.add_argument("--status-interval-seconds", type=float, default=2.0) parser.add_argument("--allow-non-canary", action="store_true") parser.add_argument("--allow-existing-apply", action="store_true") + parser.add_argument( + "--refresh-if-stale-days", + type=float, + help=( + "When an existing applied link is older than this many days, apply " + "a fresh canary source-correlation work item instead of passing on " + "near-expired evidence." + ), + ) + parser.add_argument( + "--refresh-work-item-id", + help="Specific canary source-correlation work item to apply when refresh is needed.", + ) + parser.add_argument( + "--refresh-from-latest-canary", + action="store_true", + help="When refresh is needed, use the latest unapplied canary/smoke/codex work item.", + ) parser.add_argument("--dry-run", action="store_true") return parser.parse_args(argv)