Files
awoooi/scripts/awooop_source_correlation_apply_smoke.py
Your Name 3f5fb9d8b2
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
ci(awooop): gate source correlation applied link
2026-05-21 11:45:39 +08:00

416 lines
15 KiB
Python

#!/usr/bin/env python3
"""Verify that source-correlation apply is visible in AwoooP Status Chain.
This smoke intentionally uses an existing source-correlation review work item and
the public record/apply APIs. The apply path is append-only: it must not mutate
Incident state, auto-repair results, or tickets.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# Lowercase substrings that mark a work item as safe for this smoke to act on.
# `_find_work_item` searches for them in the lowercased work item id, alertname,
# latest provider event id, and latest content preview, and skips items that
# match none of them unless non-canary items are explicitly allowed.
SAFE_WORK_ITEM_TERMS = ("canary", "smoke", "codex")
class SmokeError(RuntimeError):
    """Signal that the smoke could not confirm the expected production contract.

    Raised for failed or malformed API responses, for a missing eligible work
    item, and for review/apply/status-chain results that do not match what
    the smoke expects.
    """
def _source_review_work_item_id(item: dict[str, Any]) -> str:
work_item = item.get("work_item") if isinstance(item.get("work_item"), dict) else {}
review = (
item.get("source_correlation_review")
if isinstance(item.get("source_correlation_review"), dict)
else {}
)
apply = (
item.get("source_correlation_apply")
if isinstance(item.get("source_correlation_apply"), dict)
else {}
)
for candidate in [
apply.get("work_item_id"),
review.get("work_item_id"),
work_item.get("work_item_id"),
]:
safe_id = str(candidate or "").strip()
if safe_id.startswith("source-evidence:"):
return safe_id
return str(work_item.get("work_item_id") or "").strip()
def _url(base_url: str, path: str, params: dict[str, str | int | None] | None = None) -> str:
root = base_url.rstrip("/")
if not params:
return f"{root}{path}"
query = urllib.parse.urlencode({
key: value for key, value in params.items() if value is not None
})
return f"{root}{path}?{query}"
def _http_json(
url: str,
*,
method: str = "GET",
payload: dict[str, Any] | None = None,
timeout: int = 30,
) -> dict[str, Any]:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")[:500]
raise SmokeError(f"HTTP {exc.code} from {url}: {body}") from exc
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
raise SmokeError(f"request failed for {url}: {exc}") from exc
def _find_work_item(
    recurrence: dict[str, Any],
    *,
    work_item_id: str | None,
    allow_non_canary: bool,
    allow_existing_apply: bool,
) -> dict[str, Any]:
    """Pick the first eligible source-correlation review item from a recurrence response.

    An item is eligible when it is a dict whose ``work_item`` is a dict of kind
    ``source_correlation_review`` and it passes all requested filters.

    Args:
        recurrence: Decoded recurrence response; must carry a list under ``items``.
        work_item_id: When set, only items whose resolved or raw work item id
            equals this value are considered.
        allow_non_canary: When false, the item must contain one of
            ``SAFE_WORK_ITEM_TERMS`` in its id, alertname, latest provider
            event id, or latest content preview (case-insensitive).
        allow_existing_apply: When false, items that already carry a truthy
            ``source_correlation_apply`` are skipped.

    Returns:
        The first eligible item, in response order.

    Raises:
        SmokeError: If ``items`` is not a list or no item is eligible.
    """
    entries = recurrence.get("items")
    if not isinstance(entries, list):
        raise SmokeError("recurrence response missing items")

    def _is_eligible(entry: Any) -> bool:
        if not isinstance(entry, dict):
            return False
        details = entry.get("work_item")
        if not isinstance(details, dict):
            return False
        if details.get("kind") != "source_correlation_review":
            return False
        resolved_id = _source_review_work_item_id(entry)
        if work_item_id:
            raw_id = str(details.get("work_item_id") or "").strip()
            if work_item_id not in (resolved_id, raw_id):
                return False
        if entry.get("source_correlation_apply") and not allow_existing_apply:
            return False
        if allow_non_canary:
            return True
        # Only canary/smoke-looking items may be touched by default.
        haystack = " ".join(
            str(part or "")
            for part in (
                resolved_id,
                entry.get("alertname"),
                entry.get("latest_provider_event_id"),
                entry.get("latest_content_preview"),
            )
        ).lower()
        return any(term in haystack for term in SAFE_WORK_ITEM_TERMS)

    selected = next((entry for entry in entries if _is_eligible(entry)), None)
    if selected is not None:
        return selected
    if work_item_id:
        raise SmokeError(
            "source_correlation_review work item not found, already applied, "
            "or not a canary/smoke item"
        )
    raise SmokeError("no unapplied canary/smoke source_correlation_review work item found")
def _has_expected_applied_event(
correlation: dict[str, Any],
expected_source_event_provider_event_id: str | None,
) -> bool:
if not expected_source_event_provider_event_id:
return True
candidates = correlation.get("top_candidates")
if not isinstance(candidates, list):
return False
for candidate in candidates:
if not isinstance(candidate, dict):
continue
if candidate.get("link_state") != "applied":
continue
if candidate.get("provider_event_id") == expected_source_event_provider_event_id:
return True
return False
def _wait_for_status_chain(
    *,
    api_url: str,
    project_id: str,
    incident_id: str,
    min_applied: int,
    expected_source_event_provider_event_id: str | None,
    attempts: int,
    interval_seconds: float,
) -> dict[str, Any]:
    """Poll the status-chain endpoint until the applied source link is verified.

    A poll succeeds when the response's ``source_refs.correlation`` is a dict
    whose ``applied_link_total`` is at least ``min_applied``, whose
    ``verification_status`` is ``applied_link_verified``, and which lists the
    expected provider event as applied (when one is expected).

    Args:
        api_url: API root URL.
        project_id: Project whose status chain is queried.
        incident_id: Incident whose status chain is queried.
        min_applied: Minimum ``applied_link_total`` required.
        expected_source_event_provider_event_id: Optional provider event id
            that must appear as an applied candidate.
        attempts: Maximum number of polls.
        interval_seconds: Pause between consecutive polls.

    Returns:
        The status-chain response of the successful poll. Its
        ``source_refs.correlation`` is always a dict, so callers may index it.

    Raises:
        SmokeError: If no poll succeeds within ``attempts`` tries, or if a
            request fails (propagated from ``_http_json``).
    """
    status_url = _url(
        api_url,
        "/api/v1/platform/status-chain",
        {"project_id": project_id, "incident_id": incident_id},
    )
    # Kept only for the failure message: the most recent correlation dict seen.
    last_correlation: dict[str, Any] = {}
    for attempt in range(attempts):
        if attempt:
            # Sleep between polls only, never after the final failed attempt.
            time.sleep(interval_seconds)
        chain = _http_json(status_url)
        source_refs = chain.get("source_refs")
        correlation = source_refs.get("correlation") if isinstance(source_refs, dict) else None
        if not isinstance(correlation, dict):
            # Judge only the current response; a stale correlation from an
            # earlier poll must not make a correlation-less chain look verified.
            continue
        last_correlation = correlation
        try:
            applied_total = int(correlation.get("applied_link_total") or 0)
        except (TypeError, ValueError):
            # A non-numeric total cannot satisfy the threshold; keep polling.
            applied_total = 0
        verification = str(correlation.get("verification_status") or "")
        if (
            applied_total >= min_applied
            and verification == "applied_link_verified"
            and _has_expected_applied_event(
                correlation,
                expected_source_event_provider_event_id,
            )
        ):
            return chain
    raise SmokeError(
        "status-chain did not expose applied source link: "
        f"verification_status={last_correlation.get('verification_status')} "
        f"applied_link_total={last_correlation.get('applied_link_total')} "
        f"expected_event={expected_source_event_provider_event_id or '-'}"
    )
def _verify_existing_status_chain(args: argparse.Namespace, *, status: str) -> dict[str, Any]:
    """Verify an already-applied link through the status chain alone.

    Waits for the status chain of ``args.target_incident_id`` to show a
    verified applied link, without consulting the recurrence listing.

    Args:
        args: Parsed CLI arguments.
        status: Value reported under ``status`` in the returned summary.

    Returns:
        A summary with the requested work item id, the target incident, the
        expected provider event id, and the verification fields read from the
        status chain's correlation section.

    Raises:
        SmokeError: Propagated from ``_wait_for_status_chain``.
    """
    expected_event_id = args.expected_source_event_provider_event_id
    verified_chain = _wait_for_status_chain(
        api_url=args.api_url,
        project_id=args.project_id,
        incident_id=args.target_incident_id,
        min_applied=args.min_applied,
        expected_source_event_provider_event_id=expected_event_id,
        attempts=args.status_attempts,
        interval_seconds=args.status_interval_seconds,
    )
    correlation = verified_chain["source_refs"]["correlation"]
    summary: dict[str, Any] = {
        "status": status,
        "work_item_id": args.work_item_id,
        "target_incident_id": args.target_incident_id,
        "expected_source_event_provider_event_id": expected_event_id,
    }
    for field in ("verification_status", "applied_link_total", "latest_applied_link_at"):
        summary[field] = correlation.get(field)
    return summary
def run(args: argparse.Namespace) -> dict[str, Any]:
    """Execute the smoke and return a JSON-serializable summary.

    Steps, in order:

    1. Fetch the recurrence listing and pick an eligible review work item. If
       none is found but ``--allow-existing-apply`` and ``--work-item-id`` are
       both set, fall back to verifying the status chain only.
    2. With ``--dry-run``, report the selected item and stop.
    3. If the item already carries an apply record and existing applies are
       allowed, only wait for the status chain to verify it.
    4. Otherwise record an accepted review, call the apply endpoint, check
       that the apply reports no Incident, auto-repair, or ticket writes, and
       wait for the status chain to expose the applied link.

    Args:
        args: Parsed CLI arguments (see ``parse_args``).

    Returns:
        A summary dict whose ``status`` is one of ``dry_run``,
        ``already_applied``, ``passed``, or
        ``verified_existing_apply_without_recurrence``.

    Raises:
        SmokeError: If any request fails or any expectation is not met.
    """

    def _verified_correlation_fields() -> dict[str, Any]:
        # Both non-dry-run success paths end with the same status-chain wait
        # and report the same three correlation fields.
        verified_chain = _wait_for_status_chain(
            api_url=args.api_url,
            project_id=args.project_id,
            incident_id=args.target_incident_id,
            min_applied=args.min_applied,
            expected_source_event_provider_event_id=(
                args.expected_source_event_provider_event_id
            ),
            attempts=args.status_attempts,
            interval_seconds=args.status_interval_seconds,
        )
        correlation = verified_chain["source_refs"]["correlation"]
        return {
            field: correlation.get(field)
            for field in (
                "verification_status",
                "applied_link_total",
                "latest_applied_link_at",
            )
        }

    recurrence = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence",
            {
                "project_id": args.project_id,
                "provider": args.provider,
                "limit": args.limit,
            },
        )
    )
    try:
        item = _find_work_item(
            recurrence,
            work_item_id=args.work_item_id,
            allow_non_canary=args.allow_non_canary,
            allow_existing_apply=args.allow_existing_apply,
        )
    except SmokeError:
        if not (args.allow_existing_apply and args.work_item_id):
            raise
        # The item is not in the recurrence listing; the status chain can
        # still prove a previously applied link.
        return _verify_existing_status_chain(
            args,
            status="verified_existing_apply_without_recurrence",
        )

    resolved_id = _source_review_work_item_id(item)
    prior_apply = item.get("source_correlation_apply")

    if args.dry_run:
        return {
            "status": "dry_run",
            "work_item_id": resolved_id,
            "target_incident_id": args.target_incident_id,
            "latest_provider_event_id": item.get("latest_provider_event_id"),
            "alertname": item.get("alertname"),
            "already_applied": bool(prior_apply),
        }

    if prior_apply and args.allow_existing_apply:
        chain_fields = _verified_correlation_fields()
        return {
            "status": "already_applied",
            "work_item_id": resolved_id,
            "target_incident_id": args.target_incident_id,
            "expected_source_event_provider_event_id": (
                args.expected_source_event_provider_event_id
            ),
            "apply_status": prior_apply.get("apply_status"),
            "source_event_provider_event_id": prior_apply.get(
                "source_event_provider_event_id"
            ),
            **chain_fields,
        }

    # Fields shared by the review and apply request bodies; key order is kept
    # so the serialized payloads stay the same.
    item_fields = {
        "project_id": args.project_id,
        "provider": args.provider,
        "limit": args.limit,
        "work_item_id": resolved_id,
    }
    operator_fields = {
        "reviewer_id": args.reviewer_id,
        "operator_note": args.operator_note,
    }

    review = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence/source-correlation/review",
        ),
        method="POST",
        payload={
            **item_fields,
            "decision": "accepted",
            "target_incident_id": args.target_incident_id,
            **operator_fields,
        },
    )
    review_recorded = review.get("review_record_status") == "recorded"
    if not (review_recorded and review.get("allowed")):
        raise SmokeError(
            "accepted review was not recorded: "
            f"status={review.get('review_record_status')} allowed={review.get('allowed')}"
        )

    apply_result = _http_json(
        _url(
            args.api_url,
            "/api/v1/platform/events/dossier/recurrence/source-correlation/apply",
        ),
        method="POST",
        payload={**item_fields, **operator_fields},
    )
    if apply_result.get("apply_status") != "applied":
        raise SmokeError(
            "source correlation apply did not complete: "
            f"status={apply_result.get('apply_status')} allowed={apply_result.get('allowed')}"
        )

    # Each flag must be exactly False; a missing or truthy flag fails the smoke.
    forbidden_writes = (
        ("writes_incident_state", "apply unexpectedly wrote Incident state"),
        ("writes_auto_repair_result", "apply unexpectedly wrote auto-repair state"),
        ("writes_ticket", "apply unexpectedly wrote ticket state"),
    )
    for flag, message in forbidden_writes:
        if apply_result.get(flag) is not False:
            raise SmokeError(message)

    chain_fields = _verified_correlation_fields()
    return {
        "status": "passed",
        "work_item_id": resolved_id,
        "target_incident_id": args.target_incident_id,
        "review_status": review.get("review_record_status"),
        "apply_status": apply_result.get("apply_status"),
        "source_event_provider_event_id": apply_result.get(
            "source_event_provider_event_id"
        ),
        "expected_source_event_provider_event_id": (
            args.expected_source_event_provider_event_id
        ),
        "writes_incident_state": apply_result.get("writes_incident_state"),
        "writes_auto_repair_result": apply_result.get("writes_auto_repair_result"),
        "writes_ticket": apply_result.get("writes_ticket"),
        **chain_fields,
    }
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the smoke's command-line arguments.

    Args:
        argv: Argument list without the program name.

    Returns:
        The parsed namespace. ``--target-incident-id`` is the only required
        option; all others have defaults or default to ``None``/``False``.
    """
    parser = argparse.ArgumentParser(
        description="Run AwoooP source-correlation apply status-chain smoke."
    )
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--api-url", {"default": "https://awoooi.wooo.work"}),
        ("--project-id", {"default": "awoooi"}),
        ("--provider", {"default": "sentry"}),
        ("--limit", {"type": int, "default": 300}),
        ("--work-item-id", {}),
        ("--target-incident-id", {"required": True}),
        ("--expected-source-event-provider-event-id", {}),
        ("--reviewer-id", {"default": "codex_t120_source_apply_smoke"}),
        (
            "--operator-note",
            {
                "default": (
                    "T120 controlled source-correlation apply smoke; append-only status-chain "
                    "verification, not an Incident state mutation"
                ),
            },
        ),
        ("--min-applied", {"type": int, "default": 1}),
        ("--status-attempts", {"type": int, "default": 8}),
        ("--status-interval-seconds", {"type": float, "default": 2.0}),
        ("--allow-non-canary", {"action": "store_true"}),
        ("--allow-existing-apply", {"action": "store_true"}),
        ("--dry-run", {"action": "store_true"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
def main(argv: list[str]) -> int:
    """Run the smoke and print its outcome as JSON on stdout.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        ``0`` after printing the indented, key-sorted summary of a successful
        run; ``1`` after printing a compact ``{"status": "failed", ...}``
        object when a ``SmokeError`` was raised.
    """
    try:
        outcome = run(parse_args(argv))
    except SmokeError as failure:
        failure_report = {"status": "failed", "error": str(failure)}
        print(json.dumps(failure_report, ensure_ascii=False))
        return 1
    else:
        print(json.dumps(outcome, ensure_ascii=False, indent=2, sort_keys=True))
        return 0
# Script entry point: pass the CLI arguments (without the program name) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))