fix(drift): dedupe semantic fingerprint repeats
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m15s
CD Pipeline / build-and-deploy (push) Successful in 3m26s
CD Pipeline / post-deploy-checks (push) Successful in 1m34s

This commit is contained in:
Your Name
2026-05-19 01:12:50 +08:00
parent 1ca4912270
commit 9843c59450
5 changed files with 149 additions and 27 deletions

View File

@@ -18,7 +18,14 @@ import structlog
from sqlalchemy import text
from src.db.base import get_db_context
from src.models.drift import DriftInterpretation, DriftIntent, DriftItem, DriftLevel, DriftReport, DriftStatus
from src.models.drift import (
DriftIntent,
DriftInterpretation,
DriftItem,
DriftLevel,
DriftReport,
DriftStatus,
)
logger = structlog.get_logger(__name__)
@@ -167,7 +174,12 @@ class DriftReportRepository:
{"report_id": report_id, "narrative": narrative},
)
async def get_repeat_state(self, report: DriftReport) -> dict:
async def get_repeat_state(
self,
report: DriftReport,
*,
include_values: bool = True,
) -> dict:
"""Return stable fingerprint repeat state for a drift report."""
from src.services.drift_repeat_state import build_drift_repeat_state
@@ -190,7 +202,11 @@ class DriftReportRepository:
{"namespace": report.namespace},
)
rows = [dict(row) for row in result.mappings().all()]
return build_drift_repeat_state(report, rows)
return build_drift_repeat_state(
report,
rows,
include_values=include_values,
)
_drift_repo: DriftReportRepository | None = None

View File

@@ -169,9 +169,10 @@ def build_drift_fingerprint_state(
"next_step": next_step,
"open_pr": open_pr,
"latest_handoff": latest_handoff,
"strict_fingerprint": repeat_state.get("strict_fingerprint"),
"p0_escalation": {
"suppresses_repeated_p0": True,
"dedup_key_strategy": "stable_drift_fingerprint",
"dedup_key_strategy": "semantic_drift_fingerprint",
"dedup_window_hours": 24,
},
"read_model_route": {
@@ -240,10 +241,14 @@ class DriftFingerprintStateService:
) -> dict[str, Any]:
report = await self._load_report(report_id=report_id, namespace=namespace)
repo = get_drift_repository()
repeat_state = await repo.get_repeat_state(report)
repeat_state = await repo.get_repeat_state(report, include_values=False)
fingerprint = repeat_state["fingerprint"]
open_pr = await self._lookup_open_pr(report)
latest_handoff = await self._fetch_latest_handoff(fingerprint)
latest_handoff = await self._fetch_latest_handoff(
fingerprint,
alternate_fingerprints=_repeat_state_fingerprint_aliases(repeat_state),
report_ids=_repeat_state_report_ids(repeat_state),
)
return build_drift_fingerprint_state(
report,
repeat_state,
@@ -371,7 +376,16 @@ class DriftFingerprintStateService:
return report
raise DriftFingerprintStateNotFoundError(desired_namespace)
async def _fetch_latest_handoff(self, fingerprint: str) -> dict[str, Any] | None:
async def _fetch_latest_handoff(
self,
fingerprint: str,
*,
alternate_fingerprints: set[str] | None = None,
report_ids: set[str] | None = None,
) -> dict[str, Any] | None:
fingerprints = {fingerprint, *(alternate_fingerprints or set())}
fingerprints = {value for value in fingerprints if value}
report_ids = {value for value in (report_ids or set()) if value}
try:
async with get_db_context() as db:
result = await db.execute(
@@ -381,14 +395,13 @@ class DriftFingerprintStateService:
FROM alert_operation_log
WHERE actor = :actor
AND action_detail LIKE 'drift_fingerprint_handoff:%'
AND context ->> 'fingerprint' = :fingerprint
ORDER BY created_at DESC
LIMIT 1
LIMIT 100
"""
),
{"actor": SERVICE_ACTOR, "fingerprint": fingerprint},
{"actor": SERVICE_ACTOR},
)
row = result.mappings().first()
rows = result.mappings().all()
except Exception as exc:
logger.warning(
"drift_fingerprint_handoff_lookup_failed",
@@ -400,7 +413,19 @@ class DriftFingerprintStateService:
"handoff_status": "lookup_failed",
}
if not row:
row = None
for candidate in rows:
context = candidate.get("context") or {}
if not isinstance(context, dict):
continue
if context.get("fingerprint") in fingerprints:
row = candidate
break
if context.get("latest_report_id") in report_ids:
row = candidate
break
if row is None:
return None
context = row.get("context") or {}
return {
@@ -471,6 +496,22 @@ def _default_pr_url(open_pr: dict[str, Any] | None) -> str | None:
return open_pr.get("html_url") or open_pr.get("url")
def _repeat_state_fingerprint_aliases(repeat_state: dict[str, Any]) -> set[str]:
aliases = {str(repeat_state.get("strict_fingerprint") or "")}
for report in repeat_state.get("reports") or []:
if isinstance(report, dict):
aliases.add(str(report.get("strict_fingerprint") or ""))
return {value for value in aliases if value}
def _repeat_state_report_ids(repeat_state: dict[str, Any]) -> set[str]:
report_ids = set()
for report in repeat_state.get("reports") or []:
if isinstance(report, dict):
report_ids.add(str(report.get("report_id") or ""))
return {value for value in report_ids if value}
def _matches_drift_pr(report: DriftReport, pr: dict[str, Any]) -> bool:
text_blob = "\n".join(
str(value or "")

View File

@@ -12,9 +12,11 @@ import json
from datetime import datetime, timedelta, timezone
from typing import Any
SCHEMA_VERSION = "drift_repeat_state_v1"
FINGERPRINT_VERSION = "drift_fingerprint_v1"
SEMANTIC_FINGERPRINT_VERSION = "drift_fingerprint_v2"
VALUE_AWARE_MATCHING_STRATEGY = "namespace_and_stable_items_v1"
SEMANTIC_MATCHING_STRATEGY = "namespace_resource_field_level_v2"
def _get(obj: Any, key: str, default: Any = None) -> Any:
@@ -74,26 +76,38 @@ def _iso(value: Any) -> str | None:
return parsed.isoformat() if parsed else None
def drift_item_identity(item: Any) -> dict[str, Any]:
def drift_item_identity(item: Any, *, include_values: bool = True) -> dict[str, Any]:
"""Return the stable fields that define one drift item."""
return {
identity = {
"resource_kind": str(_get(item, "resource_kind", "")),
"resource_name": str(_get(item, "resource_name", "")),
"namespace": str(_get(item, "namespace", "")),
"field_path": str(_get(item, "field_path", "")),
"drift_level": str(_enum_value(_get(item, "drift_level", ""))),
"git_value": _jsonable(_get(item, "git_value")),
"actual_value": _jsonable(_get(item, "actual_value")),
"is_allowlisted": bool(_get(item, "is_allowlisted", False)),
}
if include_values:
identity["git_value"] = _jsonable(_get(item, "git_value"))
identity["actual_value"] = _jsonable(_get(item, "actual_value"))
return identity
def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
def build_drift_fingerprint(
namespace: str,
items: list[Any],
*,
include_values: bool = True,
) -> str:
"""Build a deterministic fingerprint from namespace + sorted drift items."""
identities = [drift_item_identity(item) for item in items]
identities = [
drift_item_identity(item, include_values=include_values)
for item in items
]
identities.sort(key=_canonical_json)
payload = {
"version": FINGERPRINT_VERSION,
"version": FINGERPRINT_VERSION
if include_values
else SEMANTIC_FINGERPRINT_VERSION,
"namespace": namespace,
"items": identities,
}
@@ -101,16 +115,26 @@ def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
return f"dfp_{digest[:16]}"
def _report_identity(report: Any) -> dict[str, Any]:
def _report_identity(report: Any, *, include_values: bool = True) -> dict[str, Any]:
items = _get(report, "items", []) or []
namespace = str(_get(report, "namespace", ""))
strict_fingerprint = build_drift_fingerprint(
namespace,
list(items),
include_values=True,
)
return {
"report_id": _get(report, "report_id"),
"namespace": namespace,
"status": str(_enum_value(_get(report, "status", ""))),
"scanned_at": _get(report, "scanned_at"),
"created_at": _get(report, "created_at"),
"fingerprint": build_drift_fingerprint(namespace, list(items)),
"fingerprint": build_drift_fingerprint(
namespace,
list(items),
include_values=include_values,
),
"strict_fingerprint": strict_fingerprint,
}
@@ -118,11 +142,12 @@ def build_drift_repeat_state(
report: Any,
recent_reports: list[Any],
*,
include_values: bool = True,
window_hours: int = 12,
max_reports: int = 20,
) -> dict[str, Any]:
"""Summarize repeat state for one drift report using stable fingerprints."""
current = _report_identity(report)
current = _report_identity(report, include_values=include_values)
current_time = (
_parse_datetime(current.get("scanned_at"))
or _parse_datetime(current.get("created_at"))
@@ -132,7 +157,7 @@ def build_drift_repeat_state(
by_id: dict[str, dict[str, Any]] = {}
for candidate in [report, *recent_reports]:
identity = _report_identity(candidate)
identity = _report_identity(candidate, include_values=include_values)
report_id = str(identity.get("report_id") or "")
if not report_id:
continue
@@ -162,7 +187,12 @@ def build_drift_repeat_state(
return {
"schema_version": SCHEMA_VERSION,
"fingerprint": current["fingerprint"],
"matching_strategy": "namespace_and_stable_items_v1",
"strict_fingerprint": current["strict_fingerprint"],
"matching_strategy": (
VALUE_AWARE_MATCHING_STRATEGY
if include_values
else SEMANTIC_MATCHING_STRATEGY
),
"window_hours": window_hours,
"occurrences_12h": len(matches),
"first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")),
@@ -174,6 +204,7 @@ def build_drift_repeat_state(
"scanned_at": _iso(row.get("scanned_at")),
"created_at": _iso(row.get("created_at")),
"status": row.get("status"),
"strict_fingerprint": row.get("strict_fingerprint"),
}
for row in reversed(matches[-max_reports:])
],

View File

@@ -24,6 +24,7 @@ def _drift_emergency_fingerprint(report: Any) -> str:
return build_drift_fingerprint(
str(getattr(report, "namespace", "") or ""),
list(getattr(report, "items", []) or []),
include_values=False,
)
except Exception as exc:
logger.warning(

View File

@@ -10,12 +10,12 @@ from src.services.awooop_ansible_audit_service import (
)
from src.services.awooop_truth_chain_service import (
_automation_quality_score_bucket,
build_automation_quality,
build_incident_reconciliation,
_clean_row,
_incident_fingerprints,
_summarize_gateway_mcp,
_truth_status,
build_automation_quality,
build_incident_reconciliation,
fetch_truth_chain,
summarize_automation_quality_records,
)
@@ -302,6 +302,39 @@ def test_drift_repeat_state_counts_matching_fingerprint_only() -> None:
]
def test_drift_repeat_state_can_group_semantic_shape_without_values() -> None:
now = datetime(2026, 5, 19, 1, 0, tzinfo=timezone.utc)
report = {
"report_id": "drift-now",
"namespace": "awoooi-prod",
"status": "pending",
"scanned_at": now,
"created_at": now,
"items": [_drift_item(actual_value=["vol-a", "vol-b"])],
}
recent = [
{
**report,
"report_id": "drift-prev",
"scanned_at": now - timedelta(hours=1),
"created_at": now - timedelta(hours=1),
"items": [_drift_item(actual_value=["vol-a", "vol-b", "vol-c"])],
},
]
strict_state = build_drift_repeat_state(report, recent)
semantic_state = build_drift_repeat_state(
report,
recent,
include_values=False,
)
assert strict_state["occurrences_12h"] == 1
assert semantic_state["matching_strategy"] == "namespace_resource_field_level_v2"
assert semantic_state["occurrences_12h"] == 2
assert semantic_state["fingerprint"] != semantic_state["strict_fingerprint"]
def test_reconciliation_blocks_open_incident_after_no_action_approval() -> None:
reconciliation = build_incident_reconciliation(
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},