Files
awoooi/apps/api/src/services/drift_repeat_state.py
Your Name 5b34877429
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m17s
CD Pipeline / build-and-deploy (push) Successful in 3m20s
CD Pipeline / post-deploy-checks (push) Successful in 1m13s
feat(awooop): expose drift repeat fingerprint
2026-05-13 07:36:21 +08:00

181 lines
5.9 KiB
Python

"""Stable repeat identity for Config Drift reports.
The drift scanner emits a fresh ``report_id`` for every run. Operators need a
stable identity that answers whether two reports describe the same drift, not
just whether they have the same HIGH/MEDIUM/INFO counts.
"""
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any
# Version tag reported under "schema_version" in the repeat-state payload.
SCHEMA_VERSION = "drift_repeat_state_v1"
# Version tag included in the hashed fingerprint payload; changing it changes
# every fingerprint this module produces.
FINGERPRINT_VERSION = "drift_fingerprint_v1"
def _get(obj: Any, key: str, default: Any = None) -> Any:
    """Look up *key* on a dict (by item) or on any other object (by attribute).

    Returns *default* when the key or attribute is missing.
    """
    if not isinstance(obj, dict):
        return getattr(obj, key, default)
    return obj.get(key, default)
def _enum_value(value: Any) -> Any:
    """Return ``value.value`` when that attribute exists, else *value* itself."""
    try:
        return value.value
    except AttributeError:
        return value
def _jsonable(value: Any) -> Any:
    """Recursively convert *value* into a structure made of JSON-friendly parts.

    Objects exposing ``.value`` are unwrapped first.  Dicts get string keys,
    lists and tuples both become lists, and datetimes become ISO-8601 strings.
    Anything else is returned unchanged.
    """
    unwrapped = _enum_value(value)
    if isinstance(unwrapped, dict):
        return {str(key): _jsonable(inner) for key, inner in unwrapped.items()}
    if isinstance(unwrapped, (list, tuple)):
        return [_jsonable(element) for element in unwrapped]
    if isinstance(unwrapped, datetime):
        return unwrapped.isoformat()
    return unwrapped
def _canonical_json(value: Any) -> str:
    """Serialize *value* to compact JSON with sorted keys.

    The value is passed through ``_jsonable`` first; objects the encoder still
    cannot handle are rendered with ``str``.
    """
    dump_options: dict[str, Any] = {
        "ensure_ascii": False,
        "sort_keys": True,
        "separators": (",", ":"),
        "default": str,
    }
    prepared = _jsonable(value)
    return json.dumps(prepared, **dump_options)
def _parse_datetime(value: Any) -> datetime | None:
    """Coerce *value* to a naive datetime, converting aware values to UTC.

    Accepts ``datetime`` instances and ISO-8601 strings (a ``Z`` suffix is
    treated as ``+00:00``).  Naive inputs are returned as-is.  Returns ``None``
    for ``None``, unparseable strings, and any other type.
    """

    def to_naive_utc(moment: datetime) -> datetime:
        # Aware values are shifted to UTC and stripped of tzinfo; naive values
        # pass through untouched.
        if moment.tzinfo is None:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    if isinstance(value, datetime):
        return to_naive_utc(value)
    if not isinstance(value, str):
        return None
    try:
        return to_naive_utc(datetime.fromisoformat(value.replace("Z", "+00:00")))
    except ValueError:
        return None
def _iso(value: Any) -> str | None:
    """Return the ISO-8601 text of *value* after ``_parse_datetime``, or ``None``."""
    moment = _parse_datetime(value)
    if not moment:
        return None
    return moment.isoformat()
def drift_item_identity(item: Any) -> dict[str, Any]:
    """Extract the fields that identify one drift item.

    *item* may be a dict or an object with attributes.  The four textual
    fields and ``drift_level`` are stringified, the two values are made
    JSON-friendly, and ``is_allowlisted`` is coerced to ``bool``.
    """
    text_fields = ("resource_kind", "resource_name", "namespace", "field_path")
    identity: dict[str, Any] = {
        field: str(_get(item, field, "")) for field in text_fields
    }
    identity["drift_level"] = str(_enum_value(_get(item, "drift_level", "")))
    for value_field in ("git_value", "actual_value"):
        identity[value_field] = _jsonable(_get(item, value_field))
    identity["is_allowlisted"] = bool(_get(item, "is_allowlisted", False))
    return identity
def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
    """Return a deterministic ``dfp_``-prefixed fingerprint for a set of items.

    Item identities are ordered by their canonical JSON so the input order
    does not affect the result.  The fingerprint is the first 16 hex digits
    of the SHA-256 of the canonical payload (version, namespace, items).
    """
    ordered = sorted(
        (drift_item_identity(entry) for entry in items),
        key=_canonical_json,
    )
    canonical = _canonical_json(
        {
            "version": FINGERPRINT_VERSION,
            "namespace": namespace,
            "items": ordered,
        }
    )
    hex_digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "dfp_" + hex_digest[:16]
def _report_identity(report: Any) -> dict[str, Any]:
    """Collect the id, namespace, status, timestamps and fingerprint of a report.

    *report* may be a dict or an object with attributes.  A missing or falsy
    ``items`` value is treated as an empty list.
    """
    drift_items = _get(report, "items", []) or []
    ns = str(_get(report, "namespace", ""))
    identity: dict[str, Any] = {"report_id": _get(report, "report_id")}
    identity["namespace"] = ns
    identity["status"] = str(_enum_value(_get(report, "status", "")))
    for stamp in ("scanned_at", "created_at"):
        identity[stamp] = _get(report, stamp)
    identity["fingerprint"] = build_drift_fingerprint(ns, list(drift_items))
    return identity
def build_drift_repeat_state(
    report: Any,
    recent_reports: list[Any],
    *,
    window_hours: int = 12,
    max_reports: int = 20,
) -> dict[str, Any]:
    """Summarize repeat state for one drift report using stable fingerprints.

    A candidate (the report itself or any entry of ``recent_reports``) counts
    as an occurrence when it has a non-empty ``report_id``, carries the same
    fingerprint as ``report``, and its timestamp (``scanned_at``, falling back
    to ``created_at``) is not older than ``window_hours`` before the current
    report's timestamp.  Candidates without a parseable timestamp are kept.
    Candidates are de-duplicated by ``report_id``; a later duplicate replaces
    an earlier one.

    Args:
        report: The current drift report (dict or attribute object).
        recent_reports: Other reports to compare against; ``None`` is treated
            as an empty list.
        window_hours: Size of the look-back window in hours.
        max_reports: Maximum number of matching reports listed under
            ``"reports"`` (newest first).  Zero or a negative value yields an
            empty list.

    Returns:
        A dict with the schema version, fingerprint, occurrence count, first
        and last timestamps, operator stage and the newest matching reports.
        The count key is always named ``"occurrences_12h"``, whatever
        ``window_hours`` is; the actual window is reported under
        ``"window_hours"``.
    """

    def _timestamp(identity: dict[str, Any]) -> datetime | None:
        # Prefer the scan time; fall back to the creation time.
        return _parse_datetime(identity.get("scanned_at")) or _parse_datetime(
            identity.get("created_at")
        )

    current = _report_identity(report)
    # _parse_datetime normalises everything to naive UTC, so the "now"
    # fallback must be naive UTC as well; a naive local-time value would shift
    # the cutoff by the host's UTC offset.
    current_time = _timestamp(current) or datetime.now(timezone.utc).replace(
        tzinfo=None
    )
    cutoff = current_time - timedelta(hours=window_hours)

    by_id: dict[str, dict[str, Any]] = {}
    # Reuse the identity already computed for the current report instead of
    # fingerprinting it a second time.
    identities = [current]
    identities.extend(
        _report_identity(candidate) for candidate in (recent_reports or [])
    )
    for identity in identities:
        report_id = str(identity.get("report_id") or "")
        if not report_id:
            continue
        candidate_time = _timestamp(identity)
        if candidate_time is not None and candidate_time < cutoff:
            continue
        if identity["fingerprint"] != current["fingerprint"]:
            continue
        by_id[report_id] = identity

    # Oldest first; rows without a usable timestamp sort to the front.
    matches = sorted(
        by_id.values(),
        key=lambda row: _timestamp(row) or datetime.min,
    )
    first = matches[0] if matches else current
    last = matches[-1] if matches else current

    status = current.get("status") or "unknown"
    operator_stage = "pending_human" if status == "pending" else str(status)

    # ``matches[-0:]`` would return every match, so guard non-positive limits
    # explicitly.
    newest = matches[-max_reports:] if max_reports > 0 else []

    return {
        "schema_version": SCHEMA_VERSION,
        "fingerprint": current["fingerprint"],
        "matching_strategy": "namespace_and_stable_items_v1",
        "window_hours": window_hours,
        "occurrences_12h": len(matches),
        "first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")),
        "last_scanned_at": _iso(last.get("scanned_at") or last.get("created_at")),
        "operator_stage": operator_stage,
        "reports": [
            {
                "report_id": row.get("report_id"),
                "scanned_at": _iso(row.get("scanned_at")),
                "created_at": _iso(row.get("created_at")),
                "status": row.get("status"),
            }
            for row in reversed(newest)
        ],
    }