762 lines
28 KiB
Python
762 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
"""Verify that source-correlation apply is visible in AwoooP Status Chain.
|
|
|
|
This smoke intentionally uses an existing source-correlation review work item and
|
|
the public record/apply APIs. The apply path is append-only: it must not mutate
|
|
Incident state, auto-repair results, or tickets.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
|
|
# Lower-case markers that identify a work item as a safe canary/smoke target.
# They are searched in the lower-cased id/alertname/event id/preview text of
# each recurrence item, so matching is effectively case-insensitive.
SAFE_WORK_ITEM_TERMS = ("canary", "smoke", "codex")

# HTTP status codes on which a failed GET read-back is retried.
TRANSIENT_GET_HTTP_CODES = {502, 503, 504}

# Total number of GET attempts (first try included) made by _http_json.
GET_READBACK_ATTEMPTS = 4

# Seconds to wait between GET retries.
GET_READBACK_RETRY_DELAY_SECONDS = 2.0
|
|
|
|
|
|
class SmokeError(RuntimeError):
    """Signal that the smoke could not prove the expected production contract.

    ``main`` reports it as a ``{"status": "failed", "error": ...}`` JSON line
    and exits with status 1.
    """
|
|
|
|
|
|
def _source_review_work_item_id(item: dict[str, Any]) -> str:
    """Return the canonical work-item id for a recurrence *item*.

    Ids carrying the ``source-evidence:`` prefix win, looked up in this order:
    the apply record, the review record, then the work item itself. When none
    of them has that prefix, the stripped ``work_item.work_item_id`` is
    returned (possibly an empty string).
    """

    def _section(key: str) -> dict[str, Any]:
        # Non-dict sections are treated as empty so .get() is always safe.
        value = item.get(key)
        return value if isinstance(value, dict) else {}

    work_item = _section("work_item")
    ordered_ids = (
        _section("source_correlation_apply").get("work_item_id"),
        _section("source_correlation_review").get("work_item_id"),
        work_item.get("work_item_id"),
    )
    preferred = next(
        (
            text
            for text in (str(raw or "").strip() for raw in ordered_ids)
            if text.startswith("source-evidence:")
        ),
        None,
    )
    if preferred is not None:
        return preferred
    return str(work_item.get("work_item_id") or "").strip()
|
|
|
|
|
|
def _url(base_url: str, path: str, params: dict[str, str | int | None] | None = None) -> str:
    """Join *base_url* and *path*, appending a query string built from *params*.

    Trailing slashes are stripped from *base_url* so it concatenates cleanly
    with *path* (callers in this file pass paths that begin with ``/``).
    Parameters whose value is ``None`` are omitted; when no parameters remain,
    no ``?`` is appended at all.
    """
    root = base_url.rstrip("/")
    # Filter before deciding whether a query string is needed; checking the
    # raw dict first left a dangling "?" when every value was None.
    query_params = {
        key: value for key, value in (params or {}).items() if value is not None
    }
    if not query_params:
        return f"{root}{path}"
    return f"{root}{path}?{urllib.parse.urlencode(query_params)}"
|
|
|
|
|
|
def _http_json(
    url: str,
    *,
    method: str = "GET",
    payload: dict[str, Any] | None = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """Send an HTTP request and return the decoded JSON object body.

    *payload*, when given, is sent as a JSON request body. GET requests are
    attempted up to ``GET_READBACK_ATTEMPTS`` times, sleeping
    ``GET_READBACK_RETRY_DELAY_SECONDS`` between attempts. They are retried on
    an HTTP status in ``TRANSIENT_GET_HTTP_CODES`` and on transport or
    body-decoding failures. Any other method (e.g. POST) is attempted exactly
    once so a write is never replayed.

    Raises:
        SmokeError: on a non-retryable HTTP error, when the last attempt
            fails, or when the response body is not a JSON object.
    """
    import http.client  # local: only used to classify low-level HTTP failures

    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url, data=data, headers=headers, method=method)

    is_get = method.upper() == "GET"
    attempts = max(GET_READBACK_ATTEMPTS if is_get else 1, 1)
    last_error: Exception | None = None
    for attempt in range(attempts):
        can_retry = is_get and attempt + 1 < attempts
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                decoded = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as exc:
            body = exc.read().decode("utf-8", errors="replace")[:500]
            last_error = SmokeError(f"HTTP {exc.code} from {url}: {body}")
            if can_retry and exc.code in TRANSIENT_GET_HTTP_CODES:
                time.sleep(GET_READBACK_RETRY_DELAY_SECONDS)
                continue
            raise last_error from exc
        except (
            # OSError covers URLError, TimeoutError / socket.timeout and
            # connection resets (incl. http.client.RemoteDisconnected).
            OSError,
            # e.g. http.client.IncompleteRead on a truncated body.
            http.client.HTTPException,
            json.JSONDecodeError,
            UnicodeDecodeError,
        ) as exc:
            last_error = exc
            if can_retry:
                time.sleep(GET_READBACK_RETRY_DELAY_SECONDS)
                continue
            raise SmokeError(f"request failed for {url}: {exc}") from exc
        if not isinstance(decoded, dict):
            # Callers immediately use .get(); fail with a clear contract error.
            raise SmokeError(
                f"expected a JSON object from {url}, got {type(decoded).__name__}"
            )
        return decoded
    # Defensive: every iteration returns, continues (not on the last attempt)
    # or raises, so this is not expected to be reached.
    raise SmokeError(f"request failed for {url}: {last_error}")
|
|
|
|
|
|
def _find_work_item(
    recurrence: dict[str, Any],
    *,
    work_item_id: str | None,
    allow_non_canary: bool,
    allow_existing_apply: bool,
) -> dict[str, Any]:
    """Return the first recurrence item that is safe to review and apply.

    An item qualifies when its ``work_item`` is a dict of kind
    ``source_correlation_review`` and:

    * it matches *work_item_id* (canonical or raw id), when one is given;
    * it carries no truthy ``source_correlation_apply`` record, unless
      *allow_existing_apply* is set;
    * one of SAFE_WORK_ITEM_TERMS occurs in its lower-cased id, alertname,
      latest provider event id or content preview, unless *allow_non_canary*
      is set.

    Raises:
        SmokeError: if ``items`` is not a list or no item qualifies.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        raise SmokeError("recurrence response missing items")

    for item in items:
        work_item = item.get("work_item") if isinstance(item, dict) else None
        if not isinstance(work_item, dict):
            continue
        if work_item.get("kind") != "source_correlation_review":
            continue

        canonical_id = _source_review_work_item_id(item)
        if work_item_id:
            raw_id = str(work_item.get("work_item_id") or "").strip()
            if work_item_id not in (canonical_id, raw_id):
                continue

        if not allow_existing_apply and item.get("source_correlation_apply"):
            continue

        if not allow_non_canary:
            haystack = " ".join(
                str(field or "")
                for field in (
                    canonical_id,
                    item.get("alertname"),
                    item.get("latest_provider_event_id"),
                    item.get("latest_content_preview"),
                )
            ).lower()
            if not any(term in haystack for term in SAFE_WORK_ITEM_TERMS):
                continue

        return item

    if work_item_id:
        raise SmokeError(
            "source_correlation_review work item not found, already applied, "
            "or not a canary/smoke item"
        )
    raise SmokeError("no unapplied canary/smoke source_correlation_review work item found")
|
|
|
|
|
|
def _has_expected_applied_event(
    correlation: dict[str, Any],
    expected_source_event_provider_event_id: str | None,
) -> bool:
    """Tell whether *correlation* lists the expected event as an applied link.

    With no expected id, this is vacuously true. Otherwise some dict entry in
    ``top_candidates`` must have ``link_state == "applied"`` and a matching
    ``provider_event_id``. A missing or non-list ``top_candidates`` yields
    False.
    """
    if not expected_source_event_provider_event_id:
        return True
    top = correlation.get("top_candidates")
    if not isinstance(top, list):
        return False
    return any(
        isinstance(entry, dict)
        and entry.get("link_state") == "applied"
        and entry.get("provider_event_id") == expected_source_event_provider_event_id
        for entry in top
    )
|
|
|
|
|
|
def _parse_event_time(value: Any) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware UTC ``datetime``.

    A trailing ``Z``/``z`` is read as UTC, and naive timestamps are assumed to
    be UTC. Fractional seconds of any length are accepted and truncated to
    microseconds. ``datetime.fromisoformat`` only does this natively from
    Python 3.11, so older interpreters go through a normalizing fallback.

    Returns:
        The timestamp converted to UTC, or None when *value* is empty or
        cannot be parsed.
    """
    import re  # local: only needed by the fractional-seconds fallback

    text = str(value or "").strip()
    if not text:
        return None
    if text[-1] in "Zz":
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        # Pre-3.11 fromisoformat only accepts 3 or 6 fractional digits;
        # pad/truncate the first fraction to 6 digits and try once more.
        # Without this, nanosecond timestamps read as "missing" and
        # trigger a needless refresh.
        normalized = re.sub(
            r"\.(\d+)",
            lambda match: "." + match.group(1)[:6].ljust(6, "0"),
            text,
            count=1,
        )
        try:
            parsed = datetime.fromisoformat(normalized)
        except ValueError:
            return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def _applied_link_age_days(correlation: dict[str, Any]) -> float | None:
    """Return how many days ago ``latest_applied_link_at`` was, as a float.

    Returns None when the timestamp is missing or unparseable. A timestamp in
    the future counts as age 0.0 rather than a negative age.
    """
    seconds_per_day = 86400
    recorded_at = _parse_event_time(correlation.get("latest_applied_link_at"))
    if recorded_at is None:
        return None
    elapsed = datetime.now(timezone.utc) - recorded_at
    return max(elapsed.total_seconds(), 0.0) / seconds_per_day
|
|
|
|
|
|
def _refresh_reason(
    correlation: dict[str, Any],
    *,
    refresh_if_stale_days: float | None,
) -> str | None:
    """Explain why the existing applied link should be refreshed, if it should.

    Returns None when refresh is disabled (*refresh_if_stale_days* is None)
    or when the link is not older than the threshold. Otherwise returns a
    short machine-readable reason: the link timestamp is missing, or its age
    exceeds the threshold.
    """
    if refresh_if_stale_days is None:
        return None
    age_days = _applied_link_age_days(correlation)
    if age_days is None:
        return "latest_applied_link_at_missing"
    stale = age_days > refresh_if_stale_days
    return (
        f"latest_applied_link_age_days={age_days:.3f} "
        f"> refresh_if_stale_days={refresh_if_stale_days:.3f}"
        if stale
        else None
    )
|
|
|
|
|
|
def _wait_for_status_chain(
    *,
    api_url: str,
    project_id: str,
    incident_id: str,
    min_applied: int,
    expected_source_event_provider_event_id: str | None,
    attempts: int,
    interval_seconds: float,
) -> dict[str, Any]:
    """Poll the status-chain endpoint until the applied source link is visible.

    A poll succeeds when ``source_refs.correlation`` reports:

    * ``applied_link_total >= min_applied``;
    * ``verification_status == "applied_link_verified"``;
    * the expected provider event as an applied candidate, when one is given.

    At least one poll is always made (``attempts < 1`` is treated as 1). The
    loop sleeps *interval_seconds* (clamped to >= 0) between polls but not
    after the last one, matching ``_wait_for_review_readback``.

    Returns:
        The full status-chain payload from the successful poll.

    Raises:
        SmokeError: if no poll satisfies the conditions.
    """
    status_url = _url(
        api_url,
        "/api/v1/platform/status-chain",
        {"project_id": project_id, "incident_id": incident_id},
    )
    total_attempts = max(int(attempts or 1), 1)
    pause_seconds = max(float(interval_seconds or 0), 0.0)
    last_correlation: dict[str, Any] = {}
    for attempt in range(total_attempts):
        chain = _http_json(status_url)
        source_refs = chain.get("source_refs")
        correlation = (
            source_refs.get("correlation") if isinstance(source_refs, dict) else {}
        )
        # Judge each poll on its own payload; a malformed correlation counts
        # as "nothing applied yet" rather than reusing a stale reading.
        last_correlation = correlation if isinstance(correlation, dict) else {}
        try:
            applied_total = int(last_correlation.get("applied_link_total") or 0)
        except (TypeError, ValueError):
            # Malformed count: treat as not yet applied and keep polling.
            applied_total = 0
        verification = str(last_correlation.get("verification_status") or "")
        if (
            applied_total >= min_applied
            and verification == "applied_link_verified"
            and _has_expected_applied_event(
                last_correlation,
                expected_source_event_provider_event_id,
            )
        ):
            return chain
        if attempt + 1 < total_attempts:
            time.sleep(pause_seconds)

    raise SmokeError(
        "status-chain did not expose applied source link: "
        f"verification_status={last_correlation.get('verification_status')} "
        f"applied_link_total={last_correlation.get('applied_link_total')} "
        f"expected_event={expected_source_event_provider_event_id or '-'}"
    )
|
|
|
|
|
|
def _verify_existing_status_chain(
    args: argparse.Namespace,
    *,
    status: str,
    work_item_id: str | None = None,
    require_expected_event: bool = True,
) -> dict[str, Any]:
    """Confirm, without writing anything, that the status chain already shows the link.

    The expected provider event is only enforced when *require_expected_event*
    is true. The result is a summary dict labelled with *status*; the work
    item id falls back to ``--work-item-id`` when *work_item_id* is not given.

    Raises:
        SmokeError: propagated from ``_wait_for_status_chain`` when the
            link is not visible.
    """
    expected_event = (
        args.expected_source_event_provider_event_id if require_expected_event else None
    )
    chain = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=expected_event,
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )
    correlation = chain["source_refs"]["correlation"]
    age_days = _applied_link_age_days(correlation)

    summary: dict[str, Any] = {
        "status": status,
        "work_item_id": work_item_id or args.work_item_id,
        "target_incident_id": args.target_incident_id,
        "expected_source_event_provider_event_id": args.expected_source_event_provider_event_id,
        "verification_status": correlation.get("verification_status"),
        "applied_link_total": correlation.get("applied_link_total"),
        "latest_applied_link_at": correlation.get("latest_applied_link_at"),
    }
    summary["applied_link_age_days"] = None if age_days is None else round(age_days, 6)
    return summary
|
|
|
|
|
|
def _select_refresh_work_item(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Choose the unapplied work item to apply when a stale link must be refreshed.

    Precedence of refresh sources:

    1. ``--refresh-work-item-id``, when given;
    2. the first eligible unapplied canary item in the recurrence list (the
       CLI help describes this as the latest), when
       ``--refresh-from-latest-canary`` is set;
    3. ``--work-item-id`` as a last resort.

    Raises:
        SmokeError: if no refresh source is configured, or (from
            ``_find_work_item``) if no eligible unapplied item is found.
    """
    if args.refresh_work_item_id:
        refresh_work_item_id = args.refresh_work_item_id
    elif args.refresh_from_latest_canary:
        # An explicit "latest canary" request must not be overridden by
        # --work-item-id. On the refresh path that id may name the item that is
        # already applied, which allow_existing_apply=False below would reject.
        refresh_work_item_id = None
    elif args.work_item_id:
        refresh_work_item_id = args.work_item_id
    else:
        raise SmokeError(
            "applied source link is stale, but no refresh source was configured; "
            "pass --refresh-work-item-id or --refresh-from-latest-canary"
        )
    return _find_work_item(
        recurrence,
        work_item_id=refresh_work_item_id,
        allow_non_canary=args.allow_non_canary,
        allow_existing_apply=False,
    )
|
|
|
|
|
|
def _refresh_candidate_result(item: dict[str, Any]) -> dict[str, Any]:
    """Summarize *item* as ``refresh_candidate_*`` result fields.

    The status field is always ``"ready"``: callers only build this summary
    for an item that was already selected successfully.
    """
    candidate_id = _source_review_work_item_id(item)
    summary: dict[str, Any] = {"refresh_candidate_status": "ready"}
    summary["refresh_candidate_work_item_id"] = candidate_id
    summary["refresh_candidate_latest_provider_event_id"] = item.get("latest_provider_event_id")
    summary["refresh_candidate_alertname"] = item.get("alertname")
    return summary
|
|
|
|
|
|
def _failed_check_summary(payload: dict[str, Any]) -> str:
    """Render the non-passing ``checks`` of an API response as ``name=detail`` pairs.

    A check counts as failed unless its ``passed`` field is exactly True.
    Blank names and details fall back to ``unknown`` and ``failed``. Returns
    ``"-"`` when ``checks`` is not a list or nothing failed.
    """
    checks = payload.get("checks")
    if not isinstance(checks, list):
        return "-"
    failures = [
        "{}={}".format(
            str(check.get("name") or "unknown").strip() or "unknown",
            str(check.get("detail") or "failed").strip() or "failed",
        )
        for check in checks
        if isinstance(check, dict) and check.get("passed") is not True
    ]
    return ", ".join(failures) or "-"
|
|
|
|
|
|
def _source_review_readback_state(
    recurrence: dict[str, Any],
    *,
    work_item_id: str,
) -> dict[str, Any]:
    """Look up the recorded review decision for *work_item_id* in *recurrence*.

    An item matches when *work_item_id* equals its canonical id, its raw
    ``work_item.work_item_id``, or its review record's ``work_item_id``.

    Returns:
        A dict with ``found``, ``decision`` and ``review_id``. ``decision`` is
        ``"missing_items"`` or ``"not_found"`` when no match exists, and
        ``"missing"`` when the matched item has no decision.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        return {"found": False, "decision": "missing_items", "review_id": None}

    for item in items:
        if not isinstance(item, dict) or not isinstance(item.get("work_item"), dict):
            continue
        work_item = item["work_item"]
        raw_review = item.get("source_correlation_review")
        review = raw_review if isinstance(raw_review, dict) else {}
        known_ids = {
            _source_review_work_item_id(item),
            str(work_item.get("work_item_id") or "").strip(),
            str(review.get("work_item_id") or "").strip(),
        }
        if work_item_id in known_ids:
            decision = str(review.get("decision") or "missing").strip()
            return {
                "found": True,
                "decision": decision or "missing",
                "review_id": review.get("review_id"),
            }

    return {"found": False, "decision": "not_found", "review_id": None}
|
|
|
|
|
|
def _wait_for_review_readback(
    *,
    args: argparse.Namespace,
    work_item_id: str,
) -> dict[str, Any]:
    """Poll the recurrence read model until *work_item_id* shows an accepted review.

    Makes at least one poll, up to ``--review-readback-attempts``. It sleeps
    ``--review-readback-interval-seconds`` (clamped to >= 0) between polls
    but not after the last one.

    Returns:
        The readback state from ``_source_review_readback_state``.

    Raises:
        SmokeError: if the accepted review never appears.
    """
    recurrence_url = _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence",
        {
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
        },
    )
    total = max(int(args.review_readback_attempts or 1), 1)
    state: dict[str, Any] = {}
    polls_done = 0
    while polls_done < total:
        state = _source_review_readback_state(
            _http_json(recurrence_url),
            work_item_id=work_item_id,
        )
        if state.get("found") and state.get("decision") == "accepted":
            return state
        polls_done += 1
        if polls_done < total:
            time.sleep(max(float(args.review_readback_interval_seconds or 0), 0.0))

    raise SmokeError(
        "accepted review did not appear in recurrence read model: "
        f"work_item_id={work_item_id} found={state.get('found')} "
        f"decision={state.get('decision')}"
    )
|
|
|
|
|
|
def _record_accepted_review(
    *,
    args: argparse.Namespace,
    work_item_id: str,
) -> dict[str, Any]:
    """POST an ``accepted`` review for *work_item_id*; raise unless recorded and allowed."""
    review = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence/source-correlation/review",
        ),
        method="POST",
        payload={
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
            "work_item_id": work_item_id,
            "decision": "accepted",
            "target_incident_id": args.target_incident_id,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    if review.get("review_record_status") != "recorded" or not review.get("allowed"):
        raise SmokeError(
            "accepted review was not recorded: "
            f"status={review.get('review_record_status')} allowed={review.get('allowed')}"
        )
    return review


def _post_source_correlation_apply(
    *,
    args: argparse.Namespace,
    work_item_id: str,
) -> dict[str, Any]:
    """POST the apply request and check that it is applied and append-only.

    Raises SmokeError unless ``apply_status == "applied"`` and each
    ``writes_*`` flag (Incident, auto-repair, ticket) is exactly False.
    """
    apply_result = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence/source-correlation/apply",
        ),
        method="POST",
        payload={
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
            "work_item_id": work_item_id,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    if apply_result.get("apply_status") != "applied":
        raise SmokeError(
            "source correlation apply did not complete: "
            f"status={apply_result.get('apply_status')} "
            f"allowed={apply_result.get('allowed')} "
            f"failed_checks={_failed_check_summary(apply_result)}"
        )
    # The apply must be append-only; any write flag other than an explicit
    # False fails the smoke.
    for flag, label in (
        ("writes_incident_state", "Incident"),
        ("writes_auto_repair_result", "auto-repair"),
        ("writes_ticket", "ticket"),
    ):
        if apply_result.get(flag) is not False:
            raise SmokeError(f"apply unexpectedly wrote {label} state")
    return apply_result


def _apply_source_correlation_item(
    *,
    args: argparse.Namespace,
    work_item_id: str,
    status: str,
    refresh_reason: str | None = None,
    previous_latest_applied_link_at: str | None = None,
) -> dict[str, Any]:
    """Review, apply and verify one source-correlation work item.

    Steps: record an accepted review, wait for it to show up in the
    recurrence read model, apply it (append-only), then wait for the status
    chain to expose the applied link.

    Returns:
        A result summary labelled with *status*, including the optional
        refresh metadata passed in.

    Raises:
        SmokeError: if any step fails.
    """
    review = _record_accepted_review(args=args, work_item_id=work_item_id)
    _wait_for_review_readback(args=args, work_item_id=work_item_id)
    apply_result = _post_source_correlation_apply(args=args, work_item_id=work_item_id)

    chain = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=args.expected_source_event_provider_event_id,
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )
    correlation = chain["source_refs"]["correlation"]
    age_days = _applied_link_age_days(correlation)

    return {
        "status": status,
        "work_item_id": work_item_id,
        "target_incident_id": args.target_incident_id,
        "review_status": review.get("review_record_status"),
        "apply_status": apply_result.get("apply_status"),
        "source_event_provider_event_id": apply_result.get("source_event_provider_event_id"),
        "expected_source_event_provider_event_id": args.expected_source_event_provider_event_id,
        "writes_incident_state": apply_result.get("writes_incident_state"),
        "writes_auto_repair_result": apply_result.get("writes_auto_repair_result"),
        "writes_ticket": apply_result.get("writes_ticket"),
        "verification_status": correlation.get("verification_status"),
        "applied_link_total": correlation.get("applied_link_total"),
        "latest_applied_link_at": correlation.get("latest_applied_link_at"),
        "applied_link_age_days": None if age_days is None else round(age_days, 6),
        "refresh_reason": refresh_reason,
        "previous_latest_applied_link_at": previous_latest_applied_link_at,
    }
|
|
|
|
|
|
def _optional_refresh_candidate(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Return ``refresh_candidate_*`` fields when --verify-refresh-candidate is set, else {}."""
    if not args.verify_refresh_candidate:
        return {}
    return _refresh_candidate_result(_select_refresh_work_item(recurrence, args=args))


def _refresh_existing_apply(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
    refresh_reason: str,
    previous_latest_applied_link_at: str | None,
) -> dict[str, Any]:
    """Apply a fresh work item to replace a stale link (only report it with --dry-run)."""
    refresh_item = _select_refresh_work_item(recurrence, args=args)
    refresh_work_item_id = _source_review_work_item_id(refresh_item)
    if args.dry_run:
        return {
            "status": "dry_run_refresh",
            "refresh_work_item_id": refresh_work_item_id,
            "target_incident_id": args.target_incident_id,
            "refresh_if_stale_days": args.refresh_if_stale_days,
            "refresh_reason": refresh_reason,
            "previous_latest_applied_link_at": previous_latest_applied_link_at,
            "refresh_latest_provider_event_id": refresh_item.get("latest_provider_event_id"),
            "refresh_alertname": refresh_item.get("alertname"),
        }
    return _apply_source_correlation_item(
        args=args,
        work_item_id=refresh_work_item_id,
        status="refreshed",
        refresh_reason=refresh_reason,
        previous_latest_applied_link_at=previous_latest_applied_link_at,
    )


def _verify_or_refresh_existing_apply(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Handle --allow-existing-apply combined with --refresh-if-stale-days.

    Verifies the existing applied link (without requiring the expected
    event). If it is fresh, reports it as ``already_applied``. If it is stale,
    has no timestamp, or cannot be verified, applies a refresh work item
    instead.
    """
    try:
        existing = _verify_existing_status_chain(
            args,
            status="already_applied",
            require_expected_event=False,
        )
    except SmokeError as exc:
        # An unverifiable link is treated like a stale one.
        refresh_reason = f"status_chain_verify_failed:{exc}"
        previous_latest_applied_link_at = None
    else:
        refresh_reason = _refresh_reason(
            {
                "latest_applied_link_at": existing.get("latest_applied_link_at"),
                "applied_link_total": existing.get("applied_link_total"),
                "verification_status": existing.get("verification_status"),
            },
            refresh_if_stale_days=args.refresh_if_stale_days,
        )
        if not refresh_reason:
            return {
                **existing,
                "apply_status": "applied",
                "refresh_if_stale_days": args.refresh_if_stale_days,
                "refresh_reason": None,
                **_optional_refresh_candidate(recurrence, args=args),
            }
        previous_latest_applied_link_at = (
            str(existing.get("latest_applied_link_at") or "") or None
        )
    return _refresh_existing_apply(
        recurrence,
        args=args,
        refresh_reason=refresh_reason,
        previous_latest_applied_link_at=previous_latest_applied_link_at,
    )


def run(args: argparse.Namespace) -> dict[str, Any]:
    """Run the smoke and return a JSON-serializable result summary.

    Flow:

    * With --allow-existing-apply and --refresh-if-stale-days, verify the
      existing applied link and refresh it when stale or unverifiable.
    * Otherwise select a work item. --dry-run only reports it. An already
      applied item (with --allow-existing-apply) is verified via the status
      chain. A fresh item is reviewed, applied and verified.

    Raises:
        SmokeError: when any step cannot prove the expected contract.
    """
    recurrence = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence",
            {
                "project_id": args.project_id,
                "provider": args.provider,
                "limit": args.limit,
            },
        )
    )

    if args.allow_existing_apply and args.refresh_if_stale_days is not None:
        return _verify_or_refresh_existing_apply(recurrence, args=args)

    # Past this point the stale-refresh flow cannot apply: either
    # --allow-existing-apply or --refresh-if-stale-days is absent.
    try:
        item = _find_work_item(
            recurrence,
            work_item_id=args.work_item_id,
            allow_non_canary=args.allow_non_canary,
            allow_existing_apply=args.allow_existing_apply,
        )
    except SmokeError:
        if args.allow_existing_apply and args.work_item_id:
            return _verify_existing_status_chain(
                args,
                status="verified_existing_apply_without_recurrence",
            )
        raise

    work_item_id = _source_review_work_item_id(item)
    existing_apply = item.get("source_correlation_apply")

    if args.dry_run:
        return {
            "status": "dry_run",
            "work_item_id": work_item_id,
            "target_incident_id": args.target_incident_id,
            "latest_provider_event_id": item.get("latest_provider_event_id"),
            "alertname": item.get("alertname"),
            "already_applied": bool(existing_apply),
        }

    if existing_apply and args.allow_existing_apply:
        existing = _verify_existing_status_chain(
            args,
            status="already_applied",
            work_item_id=work_item_id,
        )
        # Tolerate a truthy but non-dict apply record instead of crashing.
        apply_record = existing_apply if isinstance(existing_apply, dict) else {}
        return {
            **existing,
            "apply_status": apply_record.get("apply_status"),
            "source_event_provider_event_id": apply_record.get(
                "source_event_provider_event_id"
            ),
            "refresh_if_stale_days": args.refresh_if_stale_days,
            "refresh_reason": None,
            **_optional_refresh_candidate(recurrence, args=args),
        }

    return _apply_source_correlation_item(
        args=args,
        work_item_id=work_item_id,
        status="passed",
    )
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line options for the source-correlation apply smoke.

    Only ``--target-incident-id`` is required; everything else has a default
    or is an opt-in flag.
    """
    parser = argparse.ArgumentParser(
        description="Run AwoooP source-correlation apply status-chain smoke."
    )
    add = parser.add_argument

    # Target API and recurrence query.
    add("--api-url", default="https://awoooi.wooo.work")
    add("--project-id", default="awoooi")
    add("--provider", default="sentry")
    add("--limit", type=int, default=300)

    # Work item selection and expected outcome.
    add("--work-item-id")
    add("--target-incident-id", required=True)
    add("--expected-source-event-provider-event-id")

    # Review attribution.
    add("--reviewer-id", default="codex_t120_source_apply_smoke")
    add(
        "--operator-note",
        default=(
            "T120 controlled source-correlation apply smoke; append-only status-chain "
            "verification, not an Incident state mutation"
        ),
    )

    # Verification thresholds and polling.
    add("--min-applied", type=int, default=1)
    add("--status-attempts", type=int, default=8)
    add("--status-interval-seconds", type=float, default=2.0)
    add("--review-readback-attempts", type=int, default=8)
    add("--review-readback-interval-seconds", type=float, default=2.0)

    # Safety switches.
    add("--allow-non-canary", action="store_true")
    add("--allow-existing-apply", action="store_true")

    # Stale-link refresh.
    add(
        "--refresh-if-stale-days",
        type=float,
        help=(
            "When an existing applied link is older than this many days, apply "
            "a fresh canary source-correlation work item instead of passing on "
            "near-expired evidence."
        ),
    )
    add(
        "--refresh-work-item-id",
        help="Specific canary source-correlation work item to apply when refresh is needed.",
    )
    add(
        "--refresh-from-latest-canary",
        action="store_true",
        help="When refresh is needed, use the latest unapplied canary/smoke/codex work item.",
    )
    add(
        "--verify-refresh-candidate",
        action="store_true",
        help=(
            "Even when the existing applied link is still fresh, verify that "
            "the configured refresh work item exists and is safe to apply later."
        ),
    )

    add("--dry-run", action="store_true")
    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """CLI entry point: print the smoke result as JSON and return an exit code.

    Returns 0 after printing the indented, key-sorted result. A SmokeError is
    printed as a single-line ``{"status": "failed", ...}`` object and yields 1.
    """
    try:
        outcome = run(parse_args(argv))
    except SmokeError as exc:
        failure = {"status": "failed", "error": str(exc)}
        print(json.dumps(failure, ensure_ascii=False))
        return 1
    print(json.dumps(outcome, ensure_ascii=False, indent=2, sort_keys=True))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code as the exit status.
    sys.exit(main(sys.argv[1:]))
|