ci(awooop): refresh source correlation canary
All checks were successful
Code Review / ai-code-review (push) Successful in 11s

This commit is contained in:
Your Name
2026-05-21 12:13:07 +08:00
parent 8adae4788c
commit bbe081fc57
2 changed files with 218 additions and 70 deletions

View File

@@ -53,6 +53,9 @@ jobs:
- name: Source Provider Freshness Smoke
run: |
SOURCE_CANARY_RUN_REF="gitea-e2e-${GITHUB_RUN_ID:-manual}-${GITHUB_RUN_ATTEMPT:-1}"
echo "SOURCE_CANARY_RUN_REF=${SOURCE_CANARY_RUN_REF}" >> "$GITHUB_ENV"
echo "SOURCE_CANARY_WORK_ITEM_ID=source-evidence:sentry:upstream_canary:${SOURCE_CANARY_RUN_REF}" >> "$GITHUB_ENV"
OPERATOR_KEY="$(cat <<'AWOOOI_SECRET_AWOOOP_OPERATOR_API_KEY'
${{ secrets.AWOOOP_OPERATOR_API_KEY }}
AWOOOI_SECRET_AWOOOP_OPERATOR_API_KEY
@@ -64,6 +67,7 @@ jobs:
--metrics-api-url http://192.168.0.125:32334 \
--source-provider-heartbeat \
--source-provider-upstream-canary \
--run-ref "${SOURCE_CANARY_RUN_REF}" \
--json
- name: Source Correlation Applied-Link Smoke
@@ -71,9 +75,11 @@ jobs:
python3 scripts/awooop_source_correlation_apply_smoke.py \
--api-url https://awoooi.wooo.work \
--target-incident-id INC-20260505-25E744 \
--work-item-id source-evidence:sentry:received:codex-sentry-20260513-t15b-v3 \
--expected-source-event-provider-event-id sentry:source_correlation_linked:codex-sentry-20260513-t15b-v3 \
--allow-existing-apply
--allow-existing-apply \
--refresh-if-stale-days 6 \
--refresh-work-item-id "${SOURCE_CANARY_WORK_ITEM_ID}" \
--reviewer-id gitea_e2e_source_link_canary \
--operator-note "T122 rolling source-correlation canary refresh; append-only status-chain proof"
- name: Notify Telegram on Failure
if: failure()

View File

@@ -15,6 +15,7 @@ import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
@@ -153,6 +154,47 @@ def _has_expected_applied_event(
return False
def _parse_event_time(value: Any) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
parsed = datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _applied_link_age_days(correlation: dict[str, Any]) -> float | None:
latest = _parse_event_time(correlation.get("latest_applied_link_at"))
if latest is None:
return None
age_seconds = (datetime.now(timezone.utc) - latest).total_seconds()
return max(age_seconds, 0.0) / 86400
def _refresh_reason(
correlation: dict[str, Any],
*,
refresh_if_stale_days: float | None,
) -> str | None:
if refresh_if_stale_days is None:
return None
age_days = _applied_link_age_days(correlation)
if age_days is None:
return "latest_applied_link_at_missing"
if age_days > refresh_if_stale_days:
return (
f"latest_applied_link_age_days={age_days:.3f} "
f"> refresh_if_stale_days={refresh_if_stale_days:.3f}"
)
return None
def _wait_for_status_chain(
*,
api_url: str,
@@ -199,7 +241,12 @@ def _wait_for_status_chain(
)
def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> dict[str, Any]:
def _verify_existing_status_chain(
args: argparse.Namespace,
*,
status: str,
work_item_id: str | None = None,
) -> dict[str, Any]:
chain = _wait_for_status_chain(
api_url=args.api_url,
project_id=args.project_id,
@@ -210,9 +257,10 @@ def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> d
interval_seconds=args.status_interval_seconds,
)
correlation = chain["source_refs"]["correlation"]
age_days = _applied_link_age_days(correlation)
return {
"status": status,
"work_item_id": args.work_item_id,
"work_item_id": work_item_id or args.work_item_id,
"target_incident_id": args.target_incident_id,
"expected_source_event_provider_event_id": (
args.expected_source_event_provider_event_id
@@ -220,76 +268,36 @@ def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> d
"verification_status": correlation.get("verification_status"),
"applied_link_total": correlation.get("applied_link_total"),
"latest_applied_link_at": correlation.get("latest_applied_link_at"),
"applied_link_age_days": round(age_days, 6) if age_days is not None else None,
}
def run(args: argparse.Namespace) -> dict[str, Any]:
recurrence_url = _url(
args.api_url,
"/api/v1/platform/events/dossier/recurrence",
{
"project_id": args.project_id,
"provider": args.provider,
"limit": args.limit,
},
def _select_refresh_work_item(
recurrence: dict[str, Any],
*,
args: argparse.Namespace,
) -> dict[str, Any]:
if not args.refresh_work_item_id and not args.refresh_from_latest_canary:
raise SmokeError(
"applied source link is stale, but no refresh source was configured; "
"pass --refresh-work-item-id or --refresh-from-latest-canary"
)
return _find_work_item(
recurrence,
work_item_id=args.refresh_work_item_id,
allow_non_canary=args.allow_non_canary,
allow_existing_apply=False,
)
recurrence = _http_json(recurrence_url)
try:
item = _find_work_item(
recurrence,
work_item_id=args.work_item_id,
allow_non_canary=args.allow_non_canary,
allow_existing_apply=args.allow_existing_apply,
)
except SmokeError:
if args.allow_existing_apply and args.work_item_id:
return _verify_existing_status_chain(
args,
status="verified_existing_apply_without_recurrence",
)
raise
work_item_id = _source_review_work_item_id(item)
existing_apply = item.get("source_correlation_apply")
if args.dry_run:
return {
"status": "dry_run",
"work_item_id": work_item_id,
"target_incident_id": args.target_incident_id,
"latest_provider_event_id": item.get("latest_provider_event_id"),
"alertname": item.get("alertname"),
"already_applied": bool(existing_apply),
}
if existing_apply and args.allow_existing_apply:
chain = _wait_for_status_chain(
api_url=args.api_url,
project_id=args.project_id,
incident_id=args.target_incident_id,
min_applied=args.min_applied,
expected_source_event_provider_event_id=(
args.expected_source_event_provider_event_id
),
attempts=args.status_attempts,
interval_seconds=args.status_interval_seconds,
)
correlation = chain["source_refs"]["correlation"]
return {
"status": "already_applied",
"work_item_id": work_item_id,
"target_incident_id": args.target_incident_id,
"expected_source_event_provider_event_id": (
args.expected_source_event_provider_event_id
),
"apply_status": existing_apply.get("apply_status"),
"source_event_provider_event_id": existing_apply.get(
"source_event_provider_event_id"
),
"verification_status": correlation.get("verification_status"),
"applied_link_total": correlation.get("applied_link_total"),
"latest_applied_link_at": correlation.get("latest_applied_link_at"),
}
def _apply_source_correlation_item(
*,
args: argparse.Namespace,
work_item_id: str,
status: str,
refresh_reason: str | None = None,
previous_latest_applied_link_at: str | None = None,
) -> dict[str, Any]:
review_url = _url(
args.api_url,
"/api/v1/platform/events/dossier/recurrence/source-correlation/review",
@@ -352,8 +360,9 @@ def run(args: argparse.Namespace) -> dict[str, Any]:
interval_seconds=args.status_interval_seconds,
)
correlation = chain["source_refs"]["correlation"]
age_days = _applied_link_age_days(correlation)
return {
"status": "passed",
"status": status,
"work_item_id": work_item_id,
"target_incident_id": args.target_incident_id,
"review_status": review.get("review_record_status"),
@@ -370,9 +379,124 @@ def run(args: argparse.Namespace) -> dict[str, Any]:
"verification_status": correlation.get("verification_status"),
"applied_link_total": correlation.get("applied_link_total"),
"latest_applied_link_at": correlation.get("latest_applied_link_at"),
"applied_link_age_days": round(age_days, 6) if age_days is not None else None,
"refresh_reason": refresh_reason,
"previous_latest_applied_link_at": previous_latest_applied_link_at,
}
def run(args: argparse.Namespace) -> dict[str, Any]:
recurrence_url = _url(
args.api_url,
"/api/v1/platform/events/dossier/recurrence",
{
"project_id": args.project_id,
"provider": args.provider,
"limit": args.limit,
},
)
recurrence = _http_json(recurrence_url)
try:
item = _find_work_item(
recurrence,
work_item_id=args.work_item_id,
allow_non_canary=args.allow_non_canary,
allow_existing_apply=args.allow_existing_apply,
)
except SmokeError:
if args.allow_existing_apply and args.work_item_id:
return _verify_existing_status_chain(
args,
status="verified_existing_apply_without_recurrence",
)
raise
work_item_id = _source_review_work_item_id(item)
existing_apply = item.get("source_correlation_apply")
if (
args.dry_run
and not (
existing_apply
and args.allow_existing_apply
and args.refresh_if_stale_days is not None
)
):
return {
"status": "dry_run",
"work_item_id": work_item_id,
"target_incident_id": args.target_incident_id,
"latest_provider_event_id": item.get("latest_provider_event_id"),
"alertname": item.get("alertname"),
"already_applied": bool(existing_apply),
}
if existing_apply and args.allow_existing_apply:
try:
existing = _verify_existing_status_chain(
args,
status="already_applied",
work_item_id=work_item_id,
)
except SmokeError as exc:
if args.refresh_if_stale_days is None:
raise
refresh_reason = f"status_chain_verify_failed:{exc}"
previous_latest_applied_link_at = None
else:
correlation = {
"latest_applied_link_at": existing.get("latest_applied_link_at"),
"applied_link_total": existing.get("applied_link_total"),
"verification_status": existing.get("verification_status"),
}
refresh_reason = _refresh_reason(
correlation,
refresh_if_stale_days=args.refresh_if_stale_days,
)
if not refresh_reason:
return {
**existing,
"apply_status": existing_apply.get("apply_status"),
"source_event_provider_event_id": existing_apply.get(
"source_event_provider_event_id"
),
"refresh_if_stale_days": args.refresh_if_stale_days,
"refresh_reason": None,
}
previous_latest_applied_link_at = str(
existing.get("latest_applied_link_at") or ""
) or None
refresh_item = _select_refresh_work_item(recurrence, args=args)
refresh_work_item_id = _source_review_work_item_id(refresh_item)
if args.dry_run:
return {
"status": "dry_run_refresh",
"work_item_id": work_item_id,
"refresh_work_item_id": refresh_work_item_id,
"target_incident_id": args.target_incident_id,
"refresh_if_stale_days": args.refresh_if_stale_days,
"refresh_reason": refresh_reason,
"previous_latest_applied_link_at": previous_latest_applied_link_at,
"refresh_latest_provider_event_id": refresh_item.get(
"latest_provider_event_id"
),
"refresh_alertname": refresh_item.get("alertname"),
}
return _apply_source_correlation_item(
args=args,
work_item_id=refresh_work_item_id,
status="refreshed",
refresh_reason=refresh_reason,
previous_latest_applied_link_at=previous_latest_applied_link_at,
)
return _apply_source_correlation_item(
args=args,
work_item_id=work_item_id,
status="passed",
)
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run AwoooP source-correlation apply status-chain smoke."
@@ -397,6 +521,24 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
parser.add_argument("--status-interval-seconds", type=float, default=2.0)
parser.add_argument("--allow-non-canary", action="store_true")
parser.add_argument("--allow-existing-apply", action="store_true")
parser.add_argument(
"--refresh-if-stale-days",
type=float,
help=(
"When an existing applied link is older than this many days, apply "
"a fresh canary source-correlation work item instead of passing on "
"near-expired evidence."
),
)
parser.add_argument(
"--refresh-work-item-id",
help="Specific canary source-correlation work item to apply when refresh is needed.",
)
parser.add_argument(
"--refresh-from-latest-canary",
action="store_true",
help="When refresh is needed, use the latest unapplied canary/smoke/codex work item.",
)
parser.add_argument("--dry-run", action="store_true")
return parser.parse_args(argv)