fix(drift): dedupe semantic fingerprint repeats
This commit is contained in:
@@ -18,7 +18,14 @@ import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.models.drift import DriftInterpretation, DriftIntent, DriftItem, DriftLevel, DriftReport, DriftStatus
|
||||
from src.models.drift import (
|
||||
DriftIntent,
|
||||
DriftInterpretation,
|
||||
DriftItem,
|
||||
DriftLevel,
|
||||
DriftReport,
|
||||
DriftStatus,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -167,7 +174,12 @@ class DriftReportRepository:
|
||||
{"report_id": report_id, "narrative": narrative},
|
||||
)
|
||||
|
||||
async def get_repeat_state(self, report: DriftReport) -> dict:
|
||||
async def get_repeat_state(
|
||||
self,
|
||||
report: DriftReport,
|
||||
*,
|
||||
include_values: bool = True,
|
||||
) -> dict:
|
||||
"""Return stable fingerprint repeat state for a drift report."""
|
||||
from src.services.drift_repeat_state import build_drift_repeat_state
|
||||
|
||||
@@ -190,7 +202,11 @@ class DriftReportRepository:
|
||||
{"namespace": report.namespace},
|
||||
)
|
||||
rows = [dict(row) for row in result.mappings().all()]
|
||||
return build_drift_repeat_state(report, rows)
|
||||
return build_drift_repeat_state(
|
||||
report,
|
||||
rows,
|
||||
include_values=include_values,
|
||||
)
|
||||
|
||||
|
||||
_drift_repo: DriftReportRepository | None = None
|
||||
|
||||
@@ -169,9 +169,10 @@ def build_drift_fingerprint_state(
|
||||
"next_step": next_step,
|
||||
"open_pr": open_pr,
|
||||
"latest_handoff": latest_handoff,
|
||||
"strict_fingerprint": repeat_state.get("strict_fingerprint"),
|
||||
"p0_escalation": {
|
||||
"suppresses_repeated_p0": True,
|
||||
"dedup_key_strategy": "stable_drift_fingerprint",
|
||||
"dedup_key_strategy": "semantic_drift_fingerprint",
|
||||
"dedup_window_hours": 24,
|
||||
},
|
||||
"read_model_route": {
|
||||
@@ -240,10 +241,14 @@ class DriftFingerprintStateService:
|
||||
) -> dict[str, Any]:
|
||||
report = await self._load_report(report_id=report_id, namespace=namespace)
|
||||
repo = get_drift_repository()
|
||||
repeat_state = await repo.get_repeat_state(report)
|
||||
repeat_state = await repo.get_repeat_state(report, include_values=False)
|
||||
fingerprint = repeat_state["fingerprint"]
|
||||
open_pr = await self._lookup_open_pr(report)
|
||||
latest_handoff = await self._fetch_latest_handoff(fingerprint)
|
||||
latest_handoff = await self._fetch_latest_handoff(
|
||||
fingerprint,
|
||||
alternate_fingerprints=_repeat_state_fingerprint_aliases(repeat_state),
|
||||
report_ids=_repeat_state_report_ids(repeat_state),
|
||||
)
|
||||
return build_drift_fingerprint_state(
|
||||
report,
|
||||
repeat_state,
|
||||
@@ -371,7 +376,16 @@ class DriftFingerprintStateService:
|
||||
return report
|
||||
raise DriftFingerprintStateNotFoundError(desired_namespace)
|
||||
|
||||
async def _fetch_latest_handoff(self, fingerprint: str) -> dict[str, Any] | None:
|
||||
async def _fetch_latest_handoff(
|
||||
self,
|
||||
fingerprint: str,
|
||||
*,
|
||||
alternate_fingerprints: set[str] | None = None,
|
||||
report_ids: set[str] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
fingerprints = {fingerprint, *(alternate_fingerprints or set())}
|
||||
fingerprints = {value for value in fingerprints if value}
|
||||
report_ids = {value for value in (report_ids or set()) if value}
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
@@ -381,14 +395,13 @@ class DriftFingerprintStateService:
|
||||
FROM alert_operation_log
|
||||
WHERE actor = :actor
|
||||
AND action_detail LIKE 'drift_fingerprint_handoff:%'
|
||||
AND context ->> 'fingerprint' = :fingerprint
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
LIMIT 100
|
||||
"""
|
||||
),
|
||||
{"actor": SERVICE_ACTOR, "fingerprint": fingerprint},
|
||||
{"actor": SERVICE_ACTOR},
|
||||
)
|
||||
row = result.mappings().first()
|
||||
rows = result.mappings().all()
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"drift_fingerprint_handoff_lookup_failed",
|
||||
@@ -400,7 +413,19 @@ class DriftFingerprintStateService:
|
||||
"handoff_status": "lookup_failed",
|
||||
}
|
||||
|
||||
if not row:
|
||||
row = None
|
||||
for candidate in rows:
|
||||
context = candidate.get("context") or {}
|
||||
if not isinstance(context, dict):
|
||||
continue
|
||||
if context.get("fingerprint") in fingerprints:
|
||||
row = candidate
|
||||
break
|
||||
if context.get("latest_report_id") in report_ids:
|
||||
row = candidate
|
||||
break
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
context = row.get("context") or {}
|
||||
return {
|
||||
@@ -471,6 +496,22 @@ def _default_pr_url(open_pr: dict[str, Any] | None) -> str | None:
|
||||
return open_pr.get("html_url") or open_pr.get("url")
|
||||
|
||||
|
||||
def _repeat_state_fingerprint_aliases(repeat_state: dict[str, Any]) -> set[str]:
|
||||
aliases = {str(repeat_state.get("strict_fingerprint") or "")}
|
||||
for report in repeat_state.get("reports") or []:
|
||||
if isinstance(report, dict):
|
||||
aliases.add(str(report.get("strict_fingerprint") or ""))
|
||||
return {value for value in aliases if value}
|
||||
|
||||
|
||||
def _repeat_state_report_ids(repeat_state: dict[str, Any]) -> set[str]:
|
||||
report_ids = set()
|
||||
for report in repeat_state.get("reports") or []:
|
||||
if isinstance(report, dict):
|
||||
report_ids.add(str(report.get("report_id") or ""))
|
||||
return {value for value in report_ids if value}
|
||||
|
||||
|
||||
def _matches_drift_pr(report: DriftReport, pr: dict[str, Any]) -> bool:
|
||||
text_blob = "\n".join(
|
||||
str(value or "")
|
||||
|
||||
@@ -12,9 +12,11 @@ import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
|
||||
SCHEMA_VERSION = "drift_repeat_state_v1"
|
||||
FINGERPRINT_VERSION = "drift_fingerprint_v1"
|
||||
SEMANTIC_FINGERPRINT_VERSION = "drift_fingerprint_v2"
|
||||
VALUE_AWARE_MATCHING_STRATEGY = "namespace_and_stable_items_v1"
|
||||
SEMANTIC_MATCHING_STRATEGY = "namespace_resource_field_level_v2"
|
||||
|
||||
|
||||
def _get(obj: Any, key: str, default: Any = None) -> Any:
|
||||
@@ -74,26 +76,38 @@ def _iso(value: Any) -> str | None:
|
||||
return parsed.isoformat() if parsed else None
|
||||
|
||||
|
||||
def drift_item_identity(item: Any) -> dict[str, Any]:
|
||||
def drift_item_identity(item: Any, *, include_values: bool = True) -> dict[str, Any]:
|
||||
"""Return the stable fields that define one drift item."""
|
||||
return {
|
||||
identity = {
|
||||
"resource_kind": str(_get(item, "resource_kind", "")),
|
||||
"resource_name": str(_get(item, "resource_name", "")),
|
||||
"namespace": str(_get(item, "namespace", "")),
|
||||
"field_path": str(_get(item, "field_path", "")),
|
||||
"drift_level": str(_enum_value(_get(item, "drift_level", ""))),
|
||||
"git_value": _jsonable(_get(item, "git_value")),
|
||||
"actual_value": _jsonable(_get(item, "actual_value")),
|
||||
"is_allowlisted": bool(_get(item, "is_allowlisted", False)),
|
||||
}
|
||||
if include_values:
|
||||
identity["git_value"] = _jsonable(_get(item, "git_value"))
|
||||
identity["actual_value"] = _jsonable(_get(item, "actual_value"))
|
||||
return identity
|
||||
|
||||
|
||||
def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
|
||||
def build_drift_fingerprint(
|
||||
namespace: str,
|
||||
items: list[Any],
|
||||
*,
|
||||
include_values: bool = True,
|
||||
) -> str:
|
||||
"""Build a deterministic fingerprint from namespace + sorted drift items."""
|
||||
identities = [drift_item_identity(item) for item in items]
|
||||
identities = [
|
||||
drift_item_identity(item, include_values=include_values)
|
||||
for item in items
|
||||
]
|
||||
identities.sort(key=_canonical_json)
|
||||
payload = {
|
||||
"version": FINGERPRINT_VERSION,
|
||||
"version": FINGERPRINT_VERSION
|
||||
if include_values
|
||||
else SEMANTIC_FINGERPRINT_VERSION,
|
||||
"namespace": namespace,
|
||||
"items": identities,
|
||||
}
|
||||
@@ -101,16 +115,26 @@ def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
|
||||
return f"dfp_{digest[:16]}"
|
||||
|
||||
|
||||
def _report_identity(report: Any) -> dict[str, Any]:
|
||||
def _report_identity(report: Any, *, include_values: bool = True) -> dict[str, Any]:
|
||||
items = _get(report, "items", []) or []
|
||||
namespace = str(_get(report, "namespace", ""))
|
||||
strict_fingerprint = build_drift_fingerprint(
|
||||
namespace,
|
||||
list(items),
|
||||
include_values=True,
|
||||
)
|
||||
return {
|
||||
"report_id": _get(report, "report_id"),
|
||||
"namespace": namespace,
|
||||
"status": str(_enum_value(_get(report, "status", ""))),
|
||||
"scanned_at": _get(report, "scanned_at"),
|
||||
"created_at": _get(report, "created_at"),
|
||||
"fingerprint": build_drift_fingerprint(namespace, list(items)),
|
||||
"fingerprint": build_drift_fingerprint(
|
||||
namespace,
|
||||
list(items),
|
||||
include_values=include_values,
|
||||
),
|
||||
"strict_fingerprint": strict_fingerprint,
|
||||
}
|
||||
|
||||
|
||||
@@ -118,11 +142,12 @@ def build_drift_repeat_state(
|
||||
report: Any,
|
||||
recent_reports: list[Any],
|
||||
*,
|
||||
include_values: bool = True,
|
||||
window_hours: int = 12,
|
||||
max_reports: int = 20,
|
||||
) -> dict[str, Any]:
|
||||
"""Summarize repeat state for one drift report using stable fingerprints."""
|
||||
current = _report_identity(report)
|
||||
current = _report_identity(report, include_values=include_values)
|
||||
current_time = (
|
||||
_parse_datetime(current.get("scanned_at"))
|
||||
or _parse_datetime(current.get("created_at"))
|
||||
@@ -132,7 +157,7 @@ def build_drift_repeat_state(
|
||||
|
||||
by_id: dict[str, dict[str, Any]] = {}
|
||||
for candidate in [report, *recent_reports]:
|
||||
identity = _report_identity(candidate)
|
||||
identity = _report_identity(candidate, include_values=include_values)
|
||||
report_id = str(identity.get("report_id") or "")
|
||||
if not report_id:
|
||||
continue
|
||||
@@ -162,7 +187,12 @@ def build_drift_repeat_state(
|
||||
return {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"fingerprint": current["fingerprint"],
|
||||
"matching_strategy": "namespace_and_stable_items_v1",
|
||||
"strict_fingerprint": current["strict_fingerprint"],
|
||||
"matching_strategy": (
|
||||
VALUE_AWARE_MATCHING_STRATEGY
|
||||
if include_values
|
||||
else SEMANTIC_MATCHING_STRATEGY
|
||||
),
|
||||
"window_hours": window_hours,
|
||||
"occurrences_12h": len(matches),
|
||||
"first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")),
|
||||
@@ -174,6 +204,7 @@ def build_drift_repeat_state(
|
||||
"scanned_at": _iso(row.get("scanned_at")),
|
||||
"created_at": _iso(row.get("created_at")),
|
||||
"status": row.get("status"),
|
||||
"strict_fingerprint": row.get("strict_fingerprint"),
|
||||
}
|
||||
for row in reversed(matches[-max_reports:])
|
||||
],
|
||||
|
||||
@@ -24,6 +24,7 @@ def _drift_emergency_fingerprint(report: Any) -> str:
|
||||
return build_drift_fingerprint(
|
||||
str(getattr(report, "namespace", "") or ""),
|
||||
list(getattr(report, "items", []) or []),
|
||||
include_values=False,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
|
||||
@@ -10,12 +10,12 @@ from src.services.awooop_ansible_audit_service import (
|
||||
)
|
||||
from src.services.awooop_truth_chain_service import (
|
||||
_automation_quality_score_bucket,
|
||||
build_automation_quality,
|
||||
build_incident_reconciliation,
|
||||
_clean_row,
|
||||
_incident_fingerprints,
|
||||
_summarize_gateway_mcp,
|
||||
_truth_status,
|
||||
build_automation_quality,
|
||||
build_incident_reconciliation,
|
||||
fetch_truth_chain,
|
||||
summarize_automation_quality_records,
|
||||
)
|
||||
@@ -302,6 +302,39 @@ def test_drift_repeat_state_counts_matching_fingerprint_only() -> None:
|
||||
]
|
||||
|
||||
|
||||
def test_drift_repeat_state_can_group_semantic_shape_without_values() -> None:
|
||||
now = datetime(2026, 5, 19, 1, 0, tzinfo=timezone.utc)
|
||||
report = {
|
||||
"report_id": "drift-now",
|
||||
"namespace": "awoooi-prod",
|
||||
"status": "pending",
|
||||
"scanned_at": now,
|
||||
"created_at": now,
|
||||
"items": [_drift_item(actual_value=["vol-a", "vol-b"])],
|
||||
}
|
||||
recent = [
|
||||
{
|
||||
**report,
|
||||
"report_id": "drift-prev",
|
||||
"scanned_at": now - timedelta(hours=1),
|
||||
"created_at": now - timedelta(hours=1),
|
||||
"items": [_drift_item(actual_value=["vol-a", "vol-b", "vol-c"])],
|
||||
},
|
||||
]
|
||||
|
||||
strict_state = build_drift_repeat_state(report, recent)
|
||||
semantic_state = build_drift_repeat_state(
|
||||
report,
|
||||
recent,
|
||||
include_values=False,
|
||||
)
|
||||
|
||||
assert strict_state["occurrences_12h"] == 1
|
||||
assert semantic_state["matching_strategy"] == "namespace_resource_field_level_v2"
|
||||
assert semantic_state["occurrences_12h"] == 2
|
||||
assert semantic_state["fingerprint"] != semantic_state["strict_fingerprint"]
|
||||
|
||||
|
||||
def test_reconciliation_blocks_open_incident_after_no_action_approval() -> None:
|
||||
reconciliation = build_incident_reconciliation(
|
||||
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},
|
||||
|
||||
Reference in New Issue
Block a user