Files
awoooi/scripts/awooop_source_correlation_apply_smoke.py
Your Name 802c4e5ab2
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / tests (push) Successful in 1m31s
CD Pipeline / build-and-deploy (push) Successful in 3m57s
CD Pipeline / post-deploy-checks (push) Failing after 15s
fix(awooop): 等待 source correlation review 回寫
2026-06-15 12:42:37 +08:00

670 lines
24 KiB
Python

#!/usr/bin/env python3
"""Verify that source-correlation apply is visible in AwoooP Status Chain.
This smoke intentionally uses an existing source-correlation review work item and
the public record/apply APIs. The apply path is append-only: it must not mutate
Incident state, auto-repair results, or tickets.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
# Lower-cased substrings that mark a work item as a safe canary/smoke target.
# Unless --allow-non-canary is given, only items whose id, alertname, latest
# provider event id, or latest content preview contains one of these terms are
# selected automatically.
SAFE_WORK_ITEM_TERMS = (
    "canary",
    "smoke",
    "codex",
)
class SmokeError(RuntimeError):
    """Signal that the smoke could not prove the expected production contract.

    ``main()`` catches this and reports it as a ``{"status": "failed"}`` JSON
    object with exit code 1.
    """
def _source_review_work_item_id(item: dict[str, Any]) -> str:
work_item = item.get("work_item") if isinstance(item.get("work_item"), dict) else {}
review = (
item.get("source_correlation_review")
if isinstance(item.get("source_correlation_review"), dict)
else {}
)
apply = (
item.get("source_correlation_apply")
if isinstance(item.get("source_correlation_apply"), dict)
else {}
)
for candidate in [
apply.get("work_item_id"),
review.get("work_item_id"),
work_item.get("work_item_id"),
]:
safe_id = str(candidate or "").strip()
if safe_id.startswith("source-evidence:"):
return safe_id
return str(work_item.get("work_item_id") or "").strip()
def _url(base_url: str, path: str, params: dict[str, str | int | None] | None = None) -> str:
root = base_url.rstrip("/")
if not params:
return f"{root}{path}"
query = urllib.parse.urlencode({
key: value for key, value in params.items() if value is not None
})
return f"{root}{path}?{query}"
def _http_json(
url: str,
*,
method: str = "GET",
payload: dict[str, Any] | None = None,
timeout: int = 30,
) -> dict[str, Any]:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")[:500]
raise SmokeError(f"HTTP {exc.code} from {url}: {body}") from exc
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
raise SmokeError(f"request failed for {url}: {exc}") from exc
def _find_work_item(
    recurrence: dict[str, Any],
    *,
    work_item_id: str | None,
    allow_non_canary: bool,
    allow_existing_apply: bool,
) -> dict[str, Any]:
    """Pick the source_correlation_review work item the smoke should operate on.

    Items are scanned in the order the recurrence payload lists them. The first
    one that passes every filter is returned:

    * ``work_item.kind`` must be ``source_correlation_review``;
    * when ``work_item_id`` is given, it must equal either the resolved
      source-evidence id or the raw ``work_item.work_item_id``;
    * items that already carry ``source_correlation_apply`` are skipped unless
      ``allow_existing_apply`` is set;
    * unless ``allow_non_canary`` is set, the id, alertname, latest provider
      event id, or latest content preview must contain one of
      ``SAFE_WORK_ITEM_TERMS`` (case-insensitive).

    Raises:
        SmokeError: when ``items`` is not a list or nothing matches.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        raise SmokeError("recurrence response missing items")

    for entry in items:
        if not isinstance(entry, dict):
            continue
        raw_work_item = entry.get("work_item")
        if not isinstance(raw_work_item, dict):
            continue
        if raw_work_item.get("kind") != "source_correlation_review":
            continue
        resolved_id = _source_review_work_item_id(entry)
        raw_id = str(raw_work_item.get("work_item_id") or "").strip()
        if work_item_id and work_item_id not in (resolved_id, raw_id):
            continue
        if entry.get("source_correlation_apply") and not allow_existing_apply:
            continue
        if not allow_non_canary:
            haystack = " ".join(
                str(part or "")
                for part in (
                    resolved_id,
                    entry.get("alertname"),
                    entry.get("latest_provider_event_id"),
                    entry.get("latest_content_preview"),
                )
            ).lower()
            if not any(term in haystack for term in SAFE_WORK_ITEM_TERMS):
                continue
        return entry

    if work_item_id:
        raise SmokeError(
            "source_correlation_review work item not found, already applied, "
            "or not a canary/smoke item"
        )
    raise SmokeError("no unapplied canary/smoke source_correlation_review work item found")
def _has_expected_applied_event(
correlation: dict[str, Any],
expected_source_event_provider_event_id: str | None,
) -> bool:
if not expected_source_event_provider_event_id:
return True
candidates = correlation.get("top_candidates")
if not isinstance(candidates, list):
return False
for candidate in candidates:
if not isinstance(candidate, dict):
continue
if candidate.get("link_state") != "applied":
continue
if candidate.get("provider_event_id") == expected_source_event_provider_event_id:
return True
return False
def _parse_event_time(value: Any) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
parsed = datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _applied_link_age_days(correlation: dict[str, Any]) -> float | None:
    """Return how many days ago ``latest_applied_link_at`` was, or ``None``.

    ``None`` means the timestamp is missing or unparseable. Timestamps in the
    future are clamped to an age of ``0.0``.
    """
    seconds_per_day = 24 * 60 * 60
    stamp = _parse_event_time(correlation.get("latest_applied_link_at"))
    if stamp is None:
        return None
    elapsed = datetime.now(timezone.utc) - stamp
    return max(elapsed.total_seconds(), 0.0) / seconds_per_day
def _refresh_reason(
correlation: dict[str, Any],
*,
refresh_if_stale_days: float | None,
) -> str | None:
if refresh_if_stale_days is None:
return None
age_days = _applied_link_age_days(correlation)
if age_days is None:
return "latest_applied_link_at_missing"
if age_days > refresh_if_stale_days:
return (
f"latest_applied_link_age_days={age_days:.3f} "
f"> refresh_if_stale_days={refresh_if_stale_days:.3f}"
)
return None
def _wait_for_status_chain(
    *,
    api_url: str,
    project_id: str,
    incident_id: str,
    min_applied: int,
    expected_source_event_provider_event_id: str | None,
    attempts: int,
    interval_seconds: float,
) -> dict[str, Any]:
    """Poll the incident's status chain until it exposes a verified applied link.

    A poll succeeds when all of the following hold for ``source_refs.correlation``:

    * it is a dict;
    * its ``applied_link_total`` is at least ``min_applied``;
    * its ``verification_status`` is ``applied_link_verified``;
    * when ``expected_source_event_provider_event_id`` is set, it lists that
      event as an applied top candidate.

    Like ``_wait_for_review_readback``, ``attempts`` is clamped to at least one
    poll and ``interval_seconds`` to a non-negative value. No sleep follows the
    final poll.

    Returns:
        The full status-chain payload from the first successful poll. Its
        ``source_refs.correlation`` is guaranteed to be a dict.

    Raises:
        SmokeError: when no poll succeeds, or when a request fails.
    """
    status_url = _url(
        api_url,
        "/api/v1/platform/status-chain",
        {"project_id": project_id, "incident_id": incident_id},
    )
    total_attempts = max(int(attempts or 1), 1)
    pause = max(float(interval_seconds or 0), 0.0)
    last_correlation: dict[str, Any] = {}
    for attempt in range(total_attempts):
        chain = _http_json(status_url)
        source_refs = chain.get("source_refs")
        correlation = (
            source_refs.get("correlation") if isinstance(source_refs, dict) else None
        )
        # Judge each poll on its own payload. A missing or non-dict correlation
        # counts as empty instead of re-checking the previous poll's data.
        last_correlation = correlation if isinstance(correlation, dict) else {}
        try:
            applied_total = int(last_correlation.get("applied_link_total") or 0)
        except (TypeError, ValueError):
            applied_total = 0
        verification = str(last_correlation.get("verification_status") or "")
        if (
            applied_total >= min_applied
            and verification == "applied_link_verified"
            and _has_expected_applied_event(
                last_correlation,
                expected_source_event_provider_event_id,
            )
        ):
            return chain
        if attempt + 1 < total_attempts:
            time.sleep(pause)
    raise SmokeError(
        "status-chain did not expose applied source link: "
        f"verification_status={last_correlation.get('verification_status')} "
        f"applied_link_total={last_correlation.get('applied_link_total')} "
        f"expected_event={expected_source_event_provider_event_id or '-'}"
    )
def _verify_existing_status_chain(
    args: argparse.Namespace,
    *,
    status: str,
    work_item_id: str | None = None,
) -> dict[str, Any]:
    """Check, without writing anything, that the status chain already shows the applied link.

    Polls using the CLI thresholds in *args* and returns a summary dict tagged
    with *status*. ``work_item_id`` falls back to ``args.work_item_id`` when it
    is not given or is empty.

    Raises:
        SmokeError: propagated from ``_wait_for_status_chain``.
    """
    correlation = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=args.expected_source_event_provider_event_id,
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )["source_refs"]["correlation"]
    age_days = _applied_link_age_days(correlation)
    rounded_age = None if age_days is None else round(age_days, 6)
    return dict(
        status=status,
        work_item_id=work_item_id or args.work_item_id,
        target_incident_id=args.target_incident_id,
        expected_source_event_provider_event_id=args.expected_source_event_provider_event_id,
        verification_status=correlation.get("verification_status"),
        applied_link_total=correlation.get("applied_link_total"),
        latest_applied_link_at=correlation.get("latest_applied_link_at"),
        applied_link_age_days=rounded_age,
    )
def _select_refresh_work_item(
    recurrence: dict[str, Any],
    *,
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Choose the not-yet-applied work item used to refresh a stale applied link.

    Uses ``--refresh-work-item-id`` when given. Otherwise, with
    ``--refresh-from-latest-canary``, it takes the first unapplied item that
    passes ``_find_work_item``'s filters (the canary filter is relaxed by
    ``--allow-non-canary``). Items that already have an apply are never chosen.

    Raises:
        SmokeError: when neither refresh option is set, or no item qualifies.
    """
    refresh_configured = bool(args.refresh_work_item_id) or bool(
        args.refresh_from_latest_canary
    )
    if not refresh_configured:
        raise SmokeError(
            "applied source link is stale, but no refresh source was configured; "
            "pass --refresh-work-item-id or --refresh-from-latest-canary"
        )
    return _find_work_item(
        recurrence,
        work_item_id=args.refresh_work_item_id,
        allow_non_canary=args.allow_non_canary,
        allow_existing_apply=False,
    )
def _refresh_candidate_result(item: dict[str, Any]) -> dict[str, Any]:
    """Summarise a refresh work item that was found ready to apply later.

    The returned keys all use the ``refresh_candidate_`` prefix so they can be
    merged into another result dict without clobbering its fields.
    """
    return dict(
        refresh_candidate_status="ready",
        refresh_candidate_work_item_id=_source_review_work_item_id(item),
        refresh_candidate_latest_provider_event_id=item.get("latest_provider_event_id"),
        refresh_candidate_alertname=item.get("alertname"),
    )
def _failed_check_summary(payload: dict[str, Any]) -> str:
checks = payload.get("checks")
if not isinstance(checks, list):
return "-"
failed: list[str] = []
for check in checks:
if not isinstance(check, dict) or check.get("passed") is True:
continue
name = str(check.get("name") or "unknown").strip() or "unknown"
detail = str(check.get("detail") or "failed").strip() or "failed"
failed.append(f"{name}={detail}")
return ", ".join(failed) if failed else "-"
def _source_review_readback_state(
    recurrence: dict[str, Any],
    *,
    work_item_id: str,
) -> dict[str, Any]:
    """Look up the recorded review decision for *work_item_id* in the read model.

    Returns a dict with ``found``, ``decision`` and ``review_id``. ``decision``
    takes one of these values:

    * ``"missing_items"`` when the payload has no ``items`` list;
    * ``"not_found"`` when no item matches;
    * otherwise the stripped review decision, or ``"missing"`` when it is blank
      or absent.

    An item matches when *work_item_id* equals its resolved source-evidence id,
    its raw work item id, or its review record's work item id.
    """
    items = recurrence.get("items")
    if not isinstance(items, list):
        return {"found": False, "decision": "missing_items", "review_id": None}
    for entry in items:
        if not isinstance(entry, dict):
            continue
        own = entry.get("work_item")
        if not isinstance(own, dict):
            continue
        review_section = entry.get("source_correlation_review")
        if not isinstance(review_section, dict):
            review_section = {}
        known_ids = (
            _source_review_work_item_id(entry),
            str(own.get("work_item_id") or "").strip(),
            str(review_section.get("work_item_id") or "").strip(),
        )
        if work_item_id in known_ids:
            decision = str(review_section.get("decision") or "missing").strip()
            return {
                "found": True,
                "decision": decision or "missing",
                "review_id": review_section.get("review_id"),
            }
    return {"found": False, "decision": "not_found", "review_id": None}
def _wait_for_review_readback(
    *,
    args: argparse.Namespace,
    work_item_id: str,
) -> dict[str, Any]:
    """Poll the recurrence read model until the accepted review for *work_item_id* is visible.

    ``_apply_source_correlation_item`` calls this between recording the review
    and calling apply. It polls ``max(args.review_readback_attempts, 1)`` times
    (a falsy value counts as 1) and sleeps
    ``args.review_readback_interval_seconds`` (clamped at 0) between polls,
    with no sleep after the last one.

    Returns:
        The read-back state dict once ``found`` is true and ``decision`` is
        ``"accepted"``.

    Raises:
        SmokeError: when the accepted review never appears, or a request fails.
    """
    recurrence_url = _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence",
        {"project_id": args.project_id, "provider": args.provider, "limit": args.limit},
    )
    max_polls = max(int(args.review_readback_attempts or 1), 1)
    state: dict[str, Any] = {}
    polls_done = 0
    while polls_done < max_polls:
        state = _source_review_readback_state(
            _http_json(recurrence_url),
            work_item_id=work_item_id,
        )
        if state.get("found") and state.get("decision") == "accepted":
            return state
        polls_done += 1
        if polls_done < max_polls:
            time.sleep(max(float(args.review_readback_interval_seconds or 0), 0.0))
    raise SmokeError(
        "accepted review did not appear in recurrence read model: "
        f"work_item_id={work_item_id} found={state.get('found')} "
        f"decision={state.get('decision')}"
    )
def _apply_source_correlation_item(
    *,
    args: argparse.Namespace,
    work_item_id: str,
    status: str,
    refresh_reason: str | None = None,
    previous_latest_applied_link_at: str | None = None,
) -> dict[str, Any]:
    """Record an accepted review, apply it, and confirm the status chain shows it.

    Steps:
      1. POST an ``accepted`` review targeting ``args.target_incident_id``. It
         must come back with ``review_record_status == "recorded"`` and a
         truthy ``allowed``.
      2. Wait until that review is readable in the recurrence read model.
      3. POST the apply. It must report ``apply_status == "applied"`` and
         exactly ``False`` for ``writes_incident_state``,
         ``writes_auto_repair_result`` and ``writes_ticket``.
      4. Poll the incident status chain for the applied link.

    Returns:
        A summary dict tagged with *status*. ``refresh_reason`` and
        ``previous_latest_applied_link_at`` are echoed through unchanged.

    Raises:
        SmokeError: when any step does not meet its contract.
    """

    def _post(path: str, body: dict[str, Any]) -> dict[str, Any]:
        return _http_json(_url(args.api_url, path), method="POST", payload=body)

    # Fields shared by the review and apply requests.
    scope = {
        "project_id": args.project_id,
        "provider": args.provider,
        "limit": args.limit,
        "work_item_id": work_item_id,
    }
    review = _post(
        "/api/v1/platform/events/dossier/recurrence/source-correlation/review",
        {
            **scope,
            "decision": "accepted",
            "target_incident_id": args.target_incident_id,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    recorded = review.get("review_record_status") == "recorded"
    if not recorded or not review.get("allowed"):
        raise SmokeError(
            "accepted review was not recorded: "
            f"status={review.get('review_record_status')} allowed={review.get('allowed')}"
        )
    _wait_for_review_readback(args=args, work_item_id=work_item_id)

    apply_result = _post(
        "/api/v1/platform/events/dossier/recurrence/source-correlation/apply",
        {
            **scope,
            "reviewer_id": args.reviewer_id,
            "operator_note": args.operator_note,
        },
    )
    if apply_result.get("apply_status") != "applied":
        raise SmokeError(
            "source correlation apply did not complete: "
            f"status={apply_result.get('apply_status')} "
            f"allowed={apply_result.get('allowed')} "
            f"failed_checks={_failed_check_summary(apply_result)}"
        )
    # The apply must be append-only: each writes_* flag has to be exactly False.
    for flag, message in (
        ("writes_incident_state", "apply unexpectedly wrote Incident state"),
        ("writes_auto_repair_result", "apply unexpectedly wrote auto-repair state"),
        ("writes_ticket", "apply unexpectedly wrote ticket state"),
    ):
        if apply_result.get(flag) is not False:
            raise SmokeError(message)

    correlation = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=args.expected_source_event_provider_event_id,
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )["source_refs"]["correlation"]
    age_days = _applied_link_age_days(correlation)
    return {
        "status": status,
        "work_item_id": work_item_id,
        "target_incident_id": args.target_incident_id,
        "review_status": review.get("review_record_status"),
        "apply_status": apply_result.get("apply_status"),
        "source_event_provider_event_id": apply_result.get(
            "source_event_provider_event_id"
        ),
        "expected_source_event_provider_event_id": (
            args.expected_source_event_provider_event_id
        ),
        "writes_incident_state": apply_result.get("writes_incident_state"),
        "writes_auto_repair_result": apply_result.get("writes_auto_repair_result"),
        "writes_ticket": apply_result.get("writes_ticket"),
        "verification_status": correlation.get("verification_status"),
        "applied_link_total": correlation.get("applied_link_total"),
        "latest_applied_link_at": correlation.get("latest_applied_link_at"),
        "applied_link_age_days": None if age_days is None else round(age_days, 6),
        "refresh_reason": refresh_reason,
        "previous_latest_applied_link_at": previous_latest_applied_link_at,
    }
def run(args: argparse.Namespace) -> dict[str, Any]:
    """Run the smoke end to end and return a JSON-serialisable summary dict.

    Flow:
      1. Read the recurrence read model and select a source_correlation_review
         work item with ``_find_work_item``. If selection fails but both
         ``--allow-existing-apply`` and ``--work-item-id`` were given, only the
         status chain is verified (status
         ``verified_existing_apply_without_recurrence``).
      2. ``--dry-run`` returns the selected item (status ``dry_run``) without
         writing. The exception is an existing apply that may need a staleness
         check (existing apply + ``--allow-existing-apply`` +
         ``--refresh-if-stale-days``). That path continues below but still
         performs only reads before returning.
      3. For an already-applied item (with ``--allow-existing-apply``), the
         status chain is verified.
         - If no refresh is needed, the existing result is returned
           (status ``already_applied``). This includes the case where
           ``--refresh-if-stale-days`` is unset.
         - If ``--verify-refresh-candidate`` is set, the fresh-link result also
           includes a refresh-candidate check.
         - If the link is stale, or verification failed while
           ``--refresh-if-stale-days`` is set, a different unapplied work item
           is reviewed and applied (status ``refreshed``). Under ``--dry-run``
           it is only described (status ``dry_run_refresh``).
      4. Otherwise the selected item is reviewed and applied (status ``passed``).

    Raises:
        SmokeError: from any failed request, selection, or verification.
    """
    recurrence_url = _url(
        args.api_url,
        "/api/v1/platform/events/dossier/recurrence",
        {
            "project_id": args.project_id,
            "provider": args.provider,
            "limit": args.limit,
        },
    )
    recurrence = _http_json(recurrence_url)
    try:
        item = _find_work_item(
            recurrence,
            work_item_id=args.work_item_id,
            allow_non_canary=args.allow_non_canary,
            allow_existing_apply=args.allow_existing_apply,
        )
    except SmokeError:
        # The named item is no longer in the recurrence window (or was filtered
        # out); fall back to verifying the status chain directly.
        if args.allow_existing_apply and args.work_item_id:
            return _verify_existing_status_chain(
                args,
                status="verified_existing_apply_without_recurrence",
            )
        raise
    work_item_id = _source_review_work_item_id(item)
    existing_apply = item.get("source_correlation_apply")
    # --dry-run stops here unless an existing apply may need a staleness check.
    # The staleness path below only reads before its own dry-run return.
    if (
        args.dry_run
        and not (
            existing_apply
            and args.allow_existing_apply
            and args.refresh_if_stale_days is not None
        )
    ):
        return {
            "status": "dry_run",
            "work_item_id": work_item_id,
            "target_incident_id": args.target_incident_id,
            "latest_provider_event_id": item.get("latest_provider_event_id"),
            "alertname": item.get("alertname"),
            "already_applied": bool(existing_apply),
        }
    if existing_apply and args.allow_existing_apply:
        try:
            existing = _verify_existing_status_chain(
                args,
                status="already_applied",
                work_item_id=work_item_id,
            )
        except SmokeError as exc:
            # A failed verification triggers a refresh only when refresh is
            # configured; otherwise it is a hard failure.
            if args.refresh_if_stale_days is None:
                raise
            refresh_reason = f"status_chain_verify_failed:{exc}"
            previous_latest_applied_link_at = None
        else:
            correlation = {
                "latest_applied_link_at": existing.get("latest_applied_link_at"),
                "applied_link_total": existing.get("applied_link_total"),
                "verification_status": existing.get("verification_status"),
            }
            refresh_reason = _refresh_reason(
                correlation,
                refresh_if_stale_days=args.refresh_if_stale_days,
            )
            if not refresh_reason:
                # Still fresh: report the existing apply, optionally proving a
                # refresh candidate is available for a later run.
                refresh_candidate: dict[str, Any] = {}
                if args.verify_refresh_candidate:
                    refresh_candidate = _refresh_candidate_result(
                        _select_refresh_work_item(recurrence, args=args)
                    )
                return {
                    **existing,
                    "apply_status": existing_apply.get("apply_status"),
                    "source_event_provider_event_id": existing_apply.get(
                        "source_event_provider_event_id"
                    ),
                    "refresh_if_stale_days": args.refresh_if_stale_days,
                    "refresh_reason": None,
                    **refresh_candidate,
                }
            # Normalise a missing or empty timestamp to None for the report.
            previous_latest_applied_link_at = str(
                existing.get("latest_applied_link_at") or ""
            ) or None
        # Reached when the link is stale or failed verification: pick a
        # separate, not-yet-applied work item to refresh with.
        refresh_item = _select_refresh_work_item(recurrence, args=args)
        refresh_work_item_id = _source_review_work_item_id(refresh_item)
        if args.dry_run:
            return {
                "status": "dry_run_refresh",
                "work_item_id": work_item_id,
                "refresh_work_item_id": refresh_work_item_id,
                "target_incident_id": args.target_incident_id,
                "refresh_if_stale_days": args.refresh_if_stale_days,
                "refresh_reason": refresh_reason,
                "previous_latest_applied_link_at": previous_latest_applied_link_at,
                "refresh_latest_provider_event_id": refresh_item.get(
                    "latest_provider_event_id"
                ),
                "refresh_alertname": refresh_item.get("alertname"),
            }
        return _apply_source_correlation_item(
            args=args,
            work_item_id=refresh_work_item_id,
            status="refreshed",
            refresh_reason=refresh_reason,
            previous_latest_applied_link_at=previous_latest_applied_link_at,
        )
    # Normal path: review and apply the selected, not-yet-applied item.
    return _apply_source_correlation_item(
        args=args,
        work_item_id=work_item_id,
        status="passed",
    )
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse CLI arguments for the source-correlation apply smoke.

    Only ``--target-incident-id`` is required. Every other option has a default
    or is an opt-in flag. Help text was added to the options that previously
    had none, so that ``--help`` documents the whole interface.
    """
    parser = argparse.ArgumentParser(
        description="Run AwoooP source-correlation apply status-chain smoke."
    )
    parser.add_argument(
        "--api-url",
        default="https://awoooi.wooo.work",
        help="Base URL of the AwoooP API.",
    )
    parser.add_argument(
        "--project-id",
        default="awoooi",
        help="project_id sent with every API call.",
    )
    parser.add_argument(
        "--provider",
        default="sentry",
        help="Event provider used for the recurrence read model and review/apply calls.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=300,
        help="Item limit for the recurrence read model and review/apply calls.",
    )
    parser.add_argument(
        "--work-item-id",
        help=(
            "source_correlation_review work item to use; by default the first "
            "unapplied canary/smoke/codex item is selected."
        ),
    )
    parser.add_argument(
        "--target-incident-id",
        required=True,
        help="Incident whose status chain must expose the applied source link.",
    )
    parser.add_argument(
        "--expected-source-event-provider-event-id",
        help=(
            "Additionally require the status chain to list this provider_event_id "
            "as an applied top candidate."
        ),
    )
    parser.add_argument(
        "--reviewer-id",
        default="codex_t120_source_apply_smoke",
        help="reviewer_id sent with the review and apply requests.",
    )
    parser.add_argument(
        "--operator-note",
        default=(
            "T120 controlled source-correlation apply smoke; append-only status-chain "
            "verification, not an Incident state mutation"
        ),
        help="operator_note sent with the review and apply requests.",
    )
    parser.add_argument(
        "--min-applied",
        type=int,
        default=1,
        help="Minimum applied_link_total the status chain must report.",
    )
    parser.add_argument(
        "--status-attempts",
        type=int,
        default=8,
        help="Number of status-chain polls before giving up.",
    )
    parser.add_argument(
        "--status-interval-seconds",
        type=float,
        default=2.0,
        help="Seconds to wait between status-chain polls.",
    )
    parser.add_argument(
        "--review-readback-attempts",
        type=int,
        default=8,
        help="Number of recurrence polls waiting for the accepted review to be readable.",
    )
    parser.add_argument(
        "--review-readback-interval-seconds",
        type=float,
        default=2.0,
        help="Seconds to wait between review read-back polls.",
    )
    parser.add_argument(
        "--allow-non-canary",
        action="store_true",
        help="Allow work items that do not mention canary, smoke, or codex.",
    )
    parser.add_argument(
        "--allow-existing-apply",
        action="store_true",
        help=(
            "Accept a work item that already has an apply and verify its status "
            "chain instead of skipping it."
        ),
    )
    parser.add_argument(
        "--refresh-if-stale-days",
        type=float,
        help=(
            "When an existing applied link is older than this many days, apply "
            "a fresh canary source-correlation work item instead of passing on "
            "near-expired evidence."
        ),
    )
    parser.add_argument(
        "--refresh-work-item-id",
        help="Specific canary source-correlation work item to apply when refresh is needed.",
    )
    parser.add_argument(
        "--refresh-from-latest-canary",
        action="store_true",
        help="When refresh is needed, use the latest unapplied canary/smoke/codex work item.",
    )
    parser.add_argument(
        "--verify-refresh-candidate",
        action="store_true",
        help=(
            "Even when the existing applied link is still fresh, verify that "
            "the configured refresh work item exists and is safe to apply later."
        ),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Report the selected work item without recording a review or applying it.",
    )
    return parser.parse_args(argv)
def main(argv: list[str]) -> int:
    """CLI entry point: run the smoke and print the outcome as JSON.

    On success, prints the pretty-printed, key-sorted result and returns 0. A
    ``SmokeError`` is printed as a single-line ``{"status": "failed", ...}``
    JSON object and yields exit code 1.
    """
    try:
        outcome = run(parse_args(argv))
    except SmokeError as exc:
        failure = {"status": "failed", "error": str(exc)}
        print(json.dumps(failure, ensure_ascii=False))
        return 1
    print(json.dumps(outcome, ensure_ascii=False, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))