#!/usr/bin/env python3
"""Verify that source-correlation apply is visible in AwoooP Status Chain.

This smoke intentionally uses an existing source-correlation review work item and
the public record/apply APIs. The apply path is append-only: it must not mutate
Incident state, auto-repair results, or tickets.
"""

from __future__ import annotations

import argparse
import http.client
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any

# Substrings that mark a work item as safe for this smoke to accept and apply.
SAFE_WORK_ITEM_TERMS = ("canary", "smoke", "codex")
# HTTP statuses that trigger a retry for GET requests.
TRANSIENT_GET_HTTP_CODES = {502, 503, 504}
GET_READBACK_ATTEMPTS = 4
GET_READBACK_RETRY_DELAY_SECONDS = 2.0


class SmokeError(RuntimeError):
    """Raised when the smoke cannot prove the expected production contract."""


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _coerce_int(value: Any) -> int:
    """Convert *value* to ``int``, treating missing or non-numeric data as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _source_review_work_item_id(item: dict[str, Any]) -> str:
    """Return the work item id used to address *item* in review/apply calls.

    The first ``source-evidence:``-prefixed id found on the apply record, the
    review record, or the work item (in that order) wins. When none carries
    that prefix, the work item's own id is returned (possibly empty).
    """
    work_item = _as_dict(item.get("work_item"))
    review = _as_dict(item.get("source_correlation_review"))
    apply = _as_dict(item.get("source_correlation_apply"))
    for candidate in (
        apply.get("work_item_id"),
        review.get("work_item_id"),
        work_item.get("work_item_id"),
    ):
        safe_id = str(candidate or "").strip()
        if safe_id.startswith("source-evidence:"):
            return safe_id
    return str(work_item.get("work_item_id") or "").strip()


def _url(
    base_url: str,
    path: str,
    params: dict[str, str | int | None] | None = None,
) -> str:
    """Join *base_url* and *path*, appending *params* that are not ``None``."""
    root = base_url.rstrip("/")
    if not params:
        return f"{root}{path}"
    query = urllib.parse.urlencode(
        {key: value for key, value in params.items() if value is not None}
    )
    return f"{root}{path}?{query}"


def _recurrence_url(args: argparse.Namespace) -> str:
    """Build the dossier recurrence read-model URL from the CLI arguments."""
    return _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence",
        {
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
        },
    )


def _http_json(
    url: str,
    *,
    method: str = "GET",
    payload: dict[str, Any] | None = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """Perform an HTTP request and return the decoded JSON object.

    GET requests are attempted up to ``GET_READBACK_ATTEMPTS`` times: HTTP
    errors are retried only for ``TRANSIENT_GET_HTTP_CODES``, while transport
    and decoding failures are always retried. Any other method is sent exactly
    once so a write is never repeated.

    Raises:
        SmokeError: on a non-retryable or exhausted HTTP/transport/decoding
            failure, or when the response body is not a JSON object.
    """
    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url, data=data, headers=headers, method=method)

    is_get = method.upper() == "GET"
    attempts = max(GET_READBACK_ATTEMPTS, 1) if is_get else 1
    for attempt in range(attempts):
        can_retry = is_get and attempt + 1 < attempts
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                parsed = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as exc:
            body = exc.read().decode("utf-8", errors="replace")[:500]
            if can_retry and exc.code in TRANSIENT_GET_HTTP_CODES:
                time.sleep(GET_READBACK_RETRY_DELAY_SECONDS)
                continue
            raise SmokeError(f"HTTP {exc.code} from {url}: {body}") from exc
        except (OSError, http.client.HTTPException, ValueError) as exc:
            # OSError covers URLError, timeouts and connection resets;
            # HTTPException covers truncated/disconnected responses; ValueError
            # covers invalid JSON and undecodable bytes. All of them must
            # surface as SmokeError so main() can report a structured failure.
            if can_retry:
                time.sleep(GET_READBACK_RETRY_DELAY_SECONDS)
                continue
            raise SmokeError(f"request failed for {url}: {exc}") from exc
        if not isinstance(parsed, dict):
            # Callers use .get() on the result; fail clearly instead of with
            # an AttributeError further down the line.
            raise SmokeError(
                f"expected a JSON object from {url}, got {type(parsed).__name__}"
            )
        return parsed
    # Unreachable because attempts >= 1; kept so every path returns or raises.
    raise SmokeError(f"request failed for {url}: no attempts were made")


def _find_work_item(
    recurrence: dict[str, Any],
    *,
    work_item_id: str | None,
    allow_non_canary: bool,
    allow_existing_apply: bool,
) -> dict[str, Any]:
    """Return the first eligible ``source_correlation_review`` recurrence item.

    An item is eligible when it matches *work_item_id* (if given), has no apply
    record unless *allow_existing_apply* is set, and mentions one of
    ``SAFE_WORK_ITEM_TERMS`` unless *allow_non_canary* is set.

    Raises:
        SmokeError: when the response has no ``items`` list or nothing matches.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        raise SmokeError("recurrence response missing items")

    candidates: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        work_item = item.get("work_item")
        if not isinstance(work_item, dict):
            continue
        if work_item.get("kind") != "source_correlation_review":
            continue
        candidate_id = _source_review_work_item_id(item)
        match_ids = {
            candidate_id,
            str(work_item.get("work_item_id") or "").strip(),
        }
        if work_item_id and work_item_id not in match_ids:
            continue
        if item.get("source_correlation_apply") and not allow_existing_apply:
            continue
        searchable = " ".join(
            str(value or "")
            for value in (
                candidate_id,
                item.get("alertname"),
                item.get("latest_provider_event_id"),
                item.get("latest_content_preview"),
            )
        ).lower()
        is_safe_canary = any(term in searchable for term in SAFE_WORK_ITEM_TERMS)
        if not allow_non_canary and not is_safe_canary:
            continue
        candidates.append(item)

    if not candidates:
        if work_item_id:
            raise SmokeError(
                "source_correlation_review work item not found, already applied, "
                "or not a canary/smoke item"
            )
        raise SmokeError(
            "no unapplied canary/smoke source_correlation_review work item found"
        )
    return candidates[0]


def _has_expected_applied_event(
    correlation: dict[str, Any],
    expected_source_event_provider_event_id: str | None,
) -> bool:
    """Tell whether ``top_candidates`` holds the expected event in state ``applied``.

    Returns ``True`` without inspecting anything when no event id is expected.
    """
    if not expected_source_event_provider_event_id:
        return True
    candidates = correlation.get("top_candidates")
    if not isinstance(candidates, list):
        return False
    return any(
        isinstance(candidate, dict)
        and candidate.get("link_state") == "applied"
        and candidate.get("provider_event_id")
        == expected_source_event_provider_event_id
        for candidate in candidates
    )


def _parse_event_time(value: Any) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware UTC ``datetime``.

    A trailing ``Z`` is accepted and naive values are taken to be UTC. Returns
    ``None`` for empty or unparseable input.
    """
    text = str(value or "").strip()
    if not text:
        return None
    if text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _applied_link_age_days(correlation: dict[str, Any]) -> float | None:
    """Return the age in days of ``latest_applied_link_at``, or ``None`` if absent.

    Timestamps in the future are clamped to an age of zero.
    """
    latest = _parse_event_time(correlation.get("latest_applied_link_at"))
    if latest is None:
        return None
    age_seconds = (datetime.now(timezone.utc) - latest).total_seconds()
    return max(age_seconds, 0.0) / 86400


def _refresh_reason(
    correlation: dict[str, Any],
    *,
    refresh_if_stale_days: float | None,
) -> str | None:
    """Explain why the applied link needs a refresh, or return ``None``.

    No refresh is requested when *refresh_if_stale_days* is ``None``. A missing
    or unparseable timestamp counts as stale.
    """
    if refresh_if_stale_days is None:
        return None
    age_days = _applied_link_age_days(correlation)
    if age_days is None:
        return "latest_applied_link_at_missing"
    if age_days > refresh_if_stale_days:
        return (
            f"latest_applied_link_age_days={age_days:.3f} "
            f"> refresh_if_stale_days={refresh_if_stale_days:.3f}"
        )
    return None


def _wait_for_status_chain(
    *,
    api_url: str,
    project_id: str,
    incident_id: str,
    min_applied: int,
    expected_source_event_provider_event_id: str | None,
    attempts: int,
    interval_seconds: float,
) -> dict[str, Any]:
    """Poll the status chain until it exposes a verified applied source link.

    The chain is accepted once ``applied_link_total`` reaches *min_applied*,
    ``verification_status`` equals ``applied_link_verified`` and the expected
    event (if any) is listed as applied. At least one poll is always made and
    there is no sleep after the final one.

    Raises:
        SmokeError: when no poll satisfies the conditions.
    """
    status_url = _url(
        api_url,
        "/api/v1/platform/status-chain",
        {"project_id": project_id, "incident_id": incident_id},
    )
    total_attempts = max(int(attempts or 1), 1)
    # time.sleep() rejects negative values, so clamp the interval.
    pause_seconds = max(float(interval_seconds or 0), 0.0)

    last_correlation: dict[str, Any] = {}
    for attempt in range(total_attempts):
        chain = _http_json(status_url)
        correlation = _as_dict(chain.get("source_refs")).get("correlation", {})
        if isinstance(correlation, dict):
            last_correlation = correlation
        applied_total = _coerce_int(last_correlation.get("applied_link_total"))
        verification = str(last_correlation.get("verification_status") or "")
        if (
            applied_total >= min_applied
            and verification == "applied_link_verified"
            and _has_expected_applied_event(
                last_correlation,
                expected_source_event_provider_event_id,
            )
        ):
            return chain
        if attempt + 1 < total_attempts:
            time.sleep(pause_seconds)

    raise SmokeError(
        "status-chain did not expose applied source link: "
        f"verification_status={last_correlation.get('verification_status')} "
        f"applied_link_total={last_correlation.get('applied_link_total')} "
        f"expected_event={expected_source_event_provider_event_id or '-'}"
    )


def _correlation_result_fields(correlation: dict[str, Any]) -> dict[str, Any]:
    """Extract the status-chain correlation fields reported in smoke results."""
    age_days = _applied_link_age_days(correlation)
    return {
        "verification_status": correlation.get("verification_status"),
        "applied_link_total": correlation.get("applied_link_total"),
        "latest_applied_link_at": correlation.get("latest_applied_link_at"),
        "applied_link_age_days": round(age_days, 6) if age_days is not None else None,
    }


def _verify_existing_status_chain(
    args: argparse.Namespace,
    *,
    status: str,
    work_item_id: str | None = None,
    require_expected_event: bool = True,
) -> dict[str, Any]:
    """Verify an already-applied link on the status chain without writing.

    Returns a result dict labelled with *status*. The expected provider event
    id is only enforced when *require_expected_event* is true.

    Raises:
        SmokeError: when the status chain never shows a verified applied link.
    """
    chain = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=(
            args.expected_source_event_provider_event_id
            if require_expected_event
            else None
        ),
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )
    # Safe to index: _wait_for_status_chain only returns a verified chain.
    correlation = chain["source_refs"]["correlation"]
    return {
        "status": status,
        "work_item_id": work_item_id or args.work_item_id,
        "target_incident_id": args.target_incident_id,
        "expected_source_event_provider_event_id": (
            args.expected_source_event_provider_event_id
        ),
        **_correlation_result_fields(correlation),
    }


def _select_refresh_work_item(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Pick the unapplied work item to apply when a refresh is needed.

    Raises:
        SmokeError: when neither a refresh work item id nor
            ``--refresh-from-latest-canary`` is configured, or when no
            eligible item exists.
    """
    refresh_work_item_id = args.refresh_work_item_id or args.work_item_id
    if not refresh_work_item_id and not args.refresh_from_latest_canary:
        raise SmokeError(
            "applied source link is stale, but no refresh source was configured; "
            "pass --refresh-work-item-id or --refresh-from-latest-canary"
        )
    return _find_work_item(
        recurrence,
        work_item_id=refresh_work_item_id,
        allow_non_canary=args.allow_non_canary,
        allow_existing_apply=False,
    )


def _refresh_candidate_result(item: dict[str, Any]) -> dict[str, Any]:
    """Describe a refresh candidate that is ready to be applied later."""
    return {
        "refresh_candidate_status": "ready",
        "refresh_candidate_work_item_id": _source_review_work_item_id(item),
        "refresh_candidate_latest_provider_event_id": item.get(
            "latest_provider_event_id"
        ),
        "refresh_candidate_alertname": item.get("alertname"),
    }


def _optional_refresh_candidate(
    recurrence: dict[str, Any],
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Return refresh-candidate fields when ``--verify-refresh-candidate`` is set."""
    if not args.verify_refresh_candidate:
        return {}
    return _refresh_candidate_result(_select_refresh_work_item(recurrence, args=args))


def _failed_check_summary(payload: dict[str, Any]) -> str:
    """Summarise the failed entries of ``payload["checks"]`` as ``name=detail``.

    Returns ``"-"`` when there is no check list or nothing failed.
    """
    checks = payload.get("checks")
    if not isinstance(checks, list):
        return "-"
    failed: list[str] = []
    for check in checks:
        if not isinstance(check, dict) or check.get("passed") is True:
            continue
        name = str(check.get("name") or "unknown").strip() or "unknown"
        detail = str(check.get("detail") or "failed").strip() or "failed"
        failed.append(f"{name}={detail}")
    return ", ".join(failed) if failed else "-"


def _source_review_readback_state(
    recurrence: dict[str, Any],
    *,
    work_item_id: str,
) -> dict[str, Any]:
    """Report the review decision recorded for *work_item_id* in *recurrence*.

    The result always has the keys ``found``, ``decision`` and ``review_id``.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        return {"found": False, "decision": "missing_items", "review_id": None}
    for item in items:
        if not isinstance(item, dict):
            continue
        work_item = item.get("work_item")
        if not isinstance(work_item, dict):
            continue
        source_review = _as_dict(item.get("source_correlation_review"))
        candidate_ids = {
            _source_review_work_item_id(item),
            str(work_item.get("work_item_id") or "").strip(),
            str(source_review.get("work_item_id") or "").strip(),
        }
        if work_item_id not in candidate_ids:
            continue
        return {
            "found": True,
            "decision": str(source_review.get("decision") or "missing").strip()
            or "missing",
            "review_id": source_review.get("review_id"),
        }
    return {"found": False, "decision": "not_found", "review_id": None}


def _wait_for_review_readback(
    *,
    args: argparse.Namespace,
    work_item_id: str,
) -> dict[str, Any]:
    """Poll the recurrence read model until the accepted review is visible.

    Raises:
        SmokeError: when the accepted decision never appears.
    """
    recurrence_url = _recurrence_url(args)
    last_state: dict[str, Any] = {}
    attempts = max(int(args.review_readback_attempts or 1), 1)
    for attempt in range(attempts):
        recurrence = _http_json(recurrence_url)
        last_state = _source_review_readback_state(
            recurrence,
            work_item_id=work_item_id,
        )
        if last_state.get("found") and last_state.get("decision") == "accepted":
            return last_state
        if attempt + 1 < attempts:
            time.sleep(max(float(args.review_readback_interval_seconds or 0), 0.0))
    raise SmokeError(
        "accepted review did not appear in recurrence read model: "
        f"work_item_id={work_item_id} found={last_state.get('found')} "
        f"decision={last_state.get('decision')}"
    )


def _apply_source_correlation_item(
    *,
    args: argparse.Namespace,
    work_item_id: str,
    status: str,
    refresh_reason: str | None = None,
    previous_latest_applied_link_at: str | None = None,
) -> dict[str, Any]:
    """Record an accepted review, apply it, and verify the status chain.

    Steps: POST the accepted review, wait for it in the recurrence read model,
    POST the apply, check that the apply response reports no Incident,
    auto-repair or ticket writes, then wait for the status chain.

    Raises:
        SmokeError: when any step does not report the expected outcome.
    """
    review_url = _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence/source-correlation/review",
    )
    review = _http_json(
        review_url,
        method="POST",
        payload={
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
            "work_item_id": work_item_id,
            "decision": "accepted",
            "target_incident_id": args.target_incident_id,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    if review.get("review_record_status") != "recorded" or not review.get("allowed"):
        raise SmokeError(
            "accepted review was not recorded: "
            f"status={review.get('review_record_status')} "
            f"allowed={review.get('allowed')}"
        )
    _wait_for_review_readback(args=args, work_item_id=work_item_id)

    apply_url = _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence/source-correlation/apply",
    )
    apply_result = _http_json(
        apply_url,
        method="POST",
        payload={
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
            "work_item_id": work_item_id,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    if apply_result.get("apply_status") != "applied":
        raise SmokeError(
            "source correlation apply did not complete: "
            f"status={apply_result.get('apply_status')} "
            f"allowed={apply_result.get('allowed')} "
            f"failed_checks={_failed_check_summary(apply_result)}"
        )
    # Each flag must be explicitly False; a missing flag also fails the check.
    for flag, label in (
        ("writes_incident_state", "Incident"),
        ("writes_auto_repair_result", "auto-repair"),
        ("writes_ticket", "ticket"),
    ):
        if apply_result.get(flag) is not False:
            raise SmokeError(f"apply unexpectedly wrote {label} state")

    chain = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=(
            args.expected_source_event_provider_event_id
        ),
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )
    # Safe to index: _wait_for_status_chain only returns a verified chain.
    correlation = chain["source_refs"]["correlation"]
    return {
        "status": status,
        "work_item_id": work_item_id,
        "target_incident_id": args.target_incident_id,
        "review_status": review.get("review_record_status"),
        "apply_status": apply_result.get("apply_status"),
        "source_event_provider_event_id": apply_result.get(
            "source_event_provider_event_id"
        ),
        "expected_source_event_provider_event_id": (
            args.expected_source_event_provider_event_id
        ),
        "writes_incident_state": apply_result.get("writes_incident_state"),
        "writes_auto_repair_result": apply_result.get("writes_auto_repair_result"),
        "writes_ticket": apply_result.get("writes_ticket"),
        **_correlation_result_fields(correlation),
        "refresh_reason": refresh_reason,
        "previous_latest_applied_link_at": previous_latest_applied_link_at,
    }


def _refresh_applied_link(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
    refresh_reason: str | None,
    previous_latest_applied_link_at: str | None,
    work_item_id: str | None = None,
) -> dict[str, Any]:
    """Apply (or, with ``--dry-run``, describe) a fresh refresh work item.

    *work_item_id* is the already-applied item that triggered the refresh; it
    is only echoed in the dry-run result, and only when it is not ``None``.
    """
    refresh_item = _select_refresh_work_item(recurrence, args=args)
    refresh_work_item_id = _source_review_work_item_id(refresh_item)
    if not args.dry_run:
        return _apply_source_correlation_item(
            args=args,
            work_item_id=refresh_work_item_id,
            status="refreshed",
            refresh_reason=refresh_reason,
            previous_latest_applied_link_at=previous_latest_applied_link_at,
        )
    result: dict[str, Any] = {"status": "dry_run_refresh"}
    if work_item_id is not None:
        result["work_item_id"] = work_item_id
    result.update(
        {
            "refresh_work_item_id": refresh_work_item_id,
            "target_incident_id": args.target_incident_id,
            "refresh_if_stale_days": args.refresh_if_stale_days,
            "refresh_reason": refresh_reason,
            "previous_latest_applied_link_at": previous_latest_applied_link_at,
            "refresh_latest_provider_event_id": refresh_item.get(
                "latest_provider_event_id"
            ),
            "refresh_alertname": refresh_item.get("alertname"),
        }
    )
    return result


def run(args: argparse.Namespace) -> dict[str, Any]:
    """Execute the smoke and return its JSON-serialisable result.

    With ``--allow-existing-apply`` and ``--refresh-if-stale-days`` the status
    chain is checked first and a refresh item is applied only when the existing
    link is stale or unverifiable. Otherwise a work item is selected from the
    recurrence read model and either reported (dry run), verified (already
    applied) or applied.

    Raises:
        SmokeError: when the expected contract cannot be proven.
    """
    recurrence = _http_json(_recurrence_url(args))

    if args.allow_existing_apply and args.refresh_if_stale_days is not None:
        try:
            existing = _verify_existing_status_chain(
                args,
                status="already_applied",
                require_expected_event=False,
            )
        except SmokeError as exc:
            refresh_reason = f"status_chain_verify_failed:{exc}"
            previous_latest_applied_link_at = None
        else:
            refresh_reason = _refresh_reason(
                {"latest_applied_link_at": existing.get("latest_applied_link_at")},
                refresh_if_stale_days=args.refresh_if_stale_days,
            )
            if not refresh_reason:
                return {
                    **existing,
                    "apply_status": "applied",
                    "refresh_if_stale_days": args.refresh_if_stale_days,
                    "refresh_reason": None,
                    **_optional_refresh_candidate(recurrence, args),
                }
            previous_latest_applied_link_at = (
                str(existing.get("latest_applied_link_at") or "") or None
            )
        return _refresh_applied_link(
            recurrence,
            args=args,
            refresh_reason=refresh_reason,
            previous_latest_applied_link_at=previous_latest_applied_link_at,
        )

    try:
        item = _find_work_item(
            recurrence,
            work_item_id=args.work_item_id,
            allow_non_canary=args.allow_non_canary,
            allow_existing_apply=args.allow_existing_apply,
        )
    except SmokeError:
        if args.allow_existing_apply and args.work_item_id:
            # The item may have aged out of the recurrence window; the status
            # chain alone can still prove the earlier apply.
            return _verify_existing_status_chain(
                args,
                status="verified_existing_apply_without_recurrence",
            )
        raise

    work_item_id = _source_review_work_item_id(item)
    existing_apply = item.get("source_correlation_apply")
    refresh_enabled = args.refresh_if_stale_days is not None

    if args.dry_run and not (
        existing_apply and args.allow_existing_apply and refresh_enabled
    ):
        return {
            "status": "dry_run",
            "work_item_id": work_item_id,
            "target_incident_id": args.target_incident_id,
            "latest_provider_event_id": item.get("latest_provider_event_id"),
            "alertname": item.get("alertname"),
            "already_applied": bool(existing_apply),
        }

    if existing_apply and args.allow_existing_apply:
        # NOTE: the status-chain-first branch above always returns when refresh
        # is enabled, so the refresh handling below is defensive.
        try:
            existing = _verify_existing_status_chain(
                args,
                status="already_applied",
                work_item_id=work_item_id,
            )
        except SmokeError as exc:
            if not refresh_enabled:
                raise
            refresh_reason = f"status_chain_verify_failed:{exc}"
            previous_latest_applied_link_at = None
        else:
            refresh_reason = _refresh_reason(
                {"latest_applied_link_at": existing.get("latest_applied_link_at")},
                refresh_if_stale_days=args.refresh_if_stale_days,
            )
            if not refresh_reason:
                # The apply record is only known to be truthy; guard .get().
                apply_record = _as_dict(existing_apply)
                return {
                    **existing,
                    "apply_status": apply_record.get("apply_status"),
                    "source_event_provider_event_id": apply_record.get(
                        "source_event_provider_event_id"
                    ),
                    "refresh_if_stale_days": args.refresh_if_stale_days,
                    "refresh_reason": None,
                    **_optional_refresh_candidate(recurrence, args),
                }
            previous_latest_applied_link_at = (
                str(existing.get("latest_applied_link_at") or "") or None
            )
        return _refresh_applied_link(
            recurrence,
            args=args,
            refresh_reason=refresh_reason,
            previous_latest_applied_link_at=previous_latest_applied_link_at,
            work_item_id=work_item_id,
        )

    return _apply_source_correlation_item(
        args=args,
        work_item_id=work_item_id,
        status="passed",
    )


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the smoke's command-line arguments from *argv*."""
    parser = argparse.ArgumentParser(
        description="Run AwoooP source-correlation apply status-chain smoke."
    )
    parser.add_argument("--api-url", default="https://awoooi.wooo.work")
    parser.add_argument("--project-id", default="awoooi")
    parser.add_argument("--provider", default="sentry")
    parser.add_argument("--limit", type=int, default=300)
    parser.add_argument("--work-item-id")
    parser.add_argument("--target-incident-id", required=True)
    parser.add_argument("--expected-source-event-provider-event-id")
    parser.add_argument("--reviewer-id", default="codex_t120_source_apply_smoke")
    parser.add_argument(
        "--operator-note",
        default=(
            "T120 controlled source-correlation apply smoke; append-only status-chain "
            "verification, not an Incident state mutation"
        ),
    )
    parser.add_argument("--min-applied", type=int, default=1)
    parser.add_argument("--status-attempts", type=int, default=8)
    parser.add_argument("--status-interval-seconds", type=float, default=2.0)
    parser.add_argument("--review-readback-attempts", type=int, default=8)
    parser.add_argument(
        "--review-readback-interval-seconds", type=float, default=2.0
    )
    parser.add_argument("--allow-non-canary", action="store_true")
    parser.add_argument("--allow-existing-apply", action="store_true")
    parser.add_argument(
        "--refresh-if-stale-days",
        type=float,
        help=(
            "When an existing applied link is older than this many days, apply "
            "a fresh canary source-correlation work item instead of passing on "
            "near-expired evidence."
        ),
    )
    parser.add_argument(
        "--refresh-work-item-id",
        help=(
            "Specific canary source-correlation work item to apply when refresh "
            "is needed."
        ),
    )
    parser.add_argument(
        "--refresh-from-latest-canary",
        action="store_true",
        help=(
            "When refresh is needed, use the latest unapplied canary/smoke/codex "
            "work item."
        ),
    )
    parser.add_argument(
        "--verify-refresh-candidate",
        action="store_true",
        help=(
            "Even when the existing applied link is still fresh, verify that "
            "the configured refresh work item exists and is safe to apply later."
        ),
    )
    parser.add_argument("--dry-run", action="store_true")
    return parser.parse_args(argv)


def main(argv: list[str]) -> int:
    """Run the smoke, print a JSON result, and return the process exit code."""
    try:
        result = run(parse_args(argv))
    except SmokeError as exc:
        print(json.dumps({"status": "failed", "error": str(exc)}, ensure_ascii=False))
        return 1
    print(json.dumps(result, ensure_ascii=False, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))