From 9843c59450e925a58d1cc702301ac93cf85c71f4 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 19 May 2026 01:12:50 +0800 Subject: [PATCH] fix(drift): dedupe semantic fingerprint repeats --- apps/api/src/repositories/drift_repository.py | 22 ++++++- .../drift_fingerprint_state_service.py | 59 ++++++++++++++++--- apps/api/src/services/drift_repeat_state.py | 57 ++++++++++++++---- .../services/emergency_escalation_service.py | 1 + .../tests/test_awooop_truth_chain_service.py | 37 +++++++++++- 5 files changed, 149 insertions(+), 27 deletions(-) diff --git a/apps/api/src/repositories/drift_repository.py b/apps/api/src/repositories/drift_repository.py index 15124c5d..4f68e7ad 100644 --- a/apps/api/src/repositories/drift_repository.py +++ b/apps/api/src/repositories/drift_repository.py @@ -18,7 +18,14 @@ import structlog from sqlalchemy import text from src.db.base import get_db_context -from src.models.drift import DriftInterpretation, DriftIntent, DriftItem, DriftLevel, DriftReport, DriftStatus +from src.models.drift import ( + DriftIntent, + DriftInterpretation, + DriftItem, + DriftLevel, + DriftReport, + DriftStatus, +) logger = structlog.get_logger(__name__) @@ -167,7 +174,12 @@ class DriftReportRepository: {"report_id": report_id, "narrative": narrative}, ) - async def get_repeat_state(self, report: DriftReport) -> dict: + async def get_repeat_state( + self, + report: DriftReport, + *, + include_values: bool = True, + ) -> dict: """Return stable fingerprint repeat state for a drift report.""" from src.services.drift_repeat_state import build_drift_repeat_state @@ -190,7 +202,11 @@ class DriftReportRepository: {"namespace": report.namespace}, ) rows = [dict(row) for row in result.mappings().all()] - return build_drift_repeat_state(report, rows) + return build_drift_repeat_state( + report, + rows, + include_values=include_values, + ) _drift_repo: DriftReportRepository | None = None diff --git a/apps/api/src/services/drift_fingerprint_state_service.py b/apps/api/src/services/drift_fingerprint_state_service.py index ca4ecf26..de99bcd1 100644 --- a/apps/api/src/services/drift_fingerprint_state_service.py +++ b/apps/api/src/services/drift_fingerprint_state_service.py @@ -169,9 +169,10 @@ def build_drift_fingerprint_state( "next_step": next_step, "open_pr": open_pr, "latest_handoff": latest_handoff, + "strict_fingerprint": repeat_state.get("strict_fingerprint"), "p0_escalation": { "suppresses_repeated_p0": True, - "dedup_key_strategy": "stable_drift_fingerprint", + "dedup_key_strategy": "semantic_drift_fingerprint", "dedup_window_hours": 24, }, "read_model_route": { @@ -240,10 +241,14 @@ class DriftFingerprintStateService: ) -> dict[str, Any]: report = await self._load_report(report_id=report_id, namespace=namespace) repo = get_drift_repository() - repeat_state = await repo.get_repeat_state(report) + repeat_state = await repo.get_repeat_state(report, include_values=False) fingerprint = repeat_state["fingerprint"] open_pr = await self._lookup_open_pr(report) - latest_handoff = await self._fetch_latest_handoff(fingerprint) + latest_handoff = await self._fetch_latest_handoff( + fingerprint, + alternate_fingerprints=_repeat_state_fingerprint_aliases(repeat_state), + report_ids=_repeat_state_report_ids(repeat_state), + ) return build_drift_fingerprint_state( report, repeat_state, @@ -371,7 +376,16 @@ class DriftFingerprintStateService: return report raise DriftFingerprintStateNotFoundError(desired_namespace) - async def _fetch_latest_handoff(self, fingerprint: str) -> dict[str, Any] | None: + async def _fetch_latest_handoff( + self, + fingerprint: str, + *, + alternate_fingerprints: set[str] | None = None, + report_ids: set[str] | None = None, + ) -> dict[str, Any] | None: + fingerprints = {fingerprint, *(alternate_fingerprints or set())} + fingerprints = {value for value in fingerprints if value} + report_ids = {value for value in (report_ids or set()) if value} try: async with get_db_context() as db: result = await db.execute( @@ -381,14 +395,13 @@ class DriftFingerprintStateService: FROM alert_operation_log WHERE actor = :actor AND action_detail LIKE 'drift_fingerprint_handoff:%' - AND context ->> 'fingerprint' = :fingerprint ORDER BY created_at DESC - LIMIT 1 + LIMIT 100 """ ), - {"actor": SERVICE_ACTOR, "fingerprint": fingerprint}, + {"actor": SERVICE_ACTOR}, ) - row = result.mappings().first() + rows = result.mappings().all() except Exception as exc: logger.warning( "drift_fingerprint_handoff_lookup_failed", @@ -400,7 +413,19 @@ class DriftFingerprintStateService: "handoff_status": "lookup_failed", } - if not row: + row = None + for candidate in rows: + context = candidate.get("context") or {} + if not isinstance(context, dict): + continue + if context.get("fingerprint") in fingerprints: + row = candidate + break + if context.get("latest_report_id") in report_ids: + row = candidate + break + + if row is None: return None context = row.get("context") or {} return { @@ -471,6 +496,22 @@ def _default_pr_url(open_pr: dict[str, Any] | None) -> str | None: return open_pr.get("html_url") or open_pr.get("url") +def _repeat_state_fingerprint_aliases(repeat_state: dict[str, Any]) -> set[str]: + aliases = {str(repeat_state.get("strict_fingerprint") or "")} + for report in repeat_state.get("reports") or []: + if isinstance(report, dict): + aliases.add(str(report.get("strict_fingerprint") or "")) + return {value for value in aliases if value} + + +def _repeat_state_report_ids(repeat_state: dict[str, Any]) -> set[str]: + report_ids = set() + for report in repeat_state.get("reports") or []: + if isinstance(report, dict): + report_ids.add(str(report.get("report_id") or "")) + return {value for value in report_ids if value} + + def _matches_drift_pr(report: DriftReport, pr: dict[str, Any]) -> bool: text_blob = "\n".join( str(value or "") diff --git a/apps/api/src/services/drift_repeat_state.py b/apps/api/src/services/drift_repeat_state.py index 36b9ec6b..25d8d51f 100644 --- a/apps/api/src/services/drift_repeat_state.py +++ b/apps/api/src/services/drift_repeat_state.py @@ -12,9 +12,11 @@ import json from datetime import datetime, timedelta, timezone from typing import Any - SCHEMA_VERSION = "drift_repeat_state_v1" FINGERPRINT_VERSION = "drift_fingerprint_v1" +SEMANTIC_FINGERPRINT_VERSION = "drift_fingerprint_v2" +VALUE_AWARE_MATCHING_STRATEGY = "namespace_and_stable_items_v1" +SEMANTIC_MATCHING_STRATEGY = "namespace_resource_field_level_v2" def _get(obj: Any, key: str, default: Any = None) -> Any: @@ -74,26 +76,38 @@ def _iso(value: Any) -> str | None: return parsed.isoformat() if parsed else None -def drift_item_identity(item: Any) -> dict[str, Any]: +def drift_item_identity(item: Any, *, include_values: bool = True) -> dict[str, Any]: """Return the stable fields that define one drift item.""" - return { + identity = { "resource_kind": str(_get(item, "resource_kind", "")), "resource_name": str(_get(item, "resource_name", "")), "namespace": str(_get(item, "namespace", "")), "field_path": str(_get(item, "field_path", "")), "drift_level": str(_enum_value(_get(item, "drift_level", ""))), - "git_value": _jsonable(_get(item, "git_value")), - "actual_value": _jsonable(_get(item, "actual_value")), "is_allowlisted": bool(_get(item, "is_allowlisted", False)), } + if include_values: + identity["git_value"] = _jsonable(_get(item, "git_value")) + identity["actual_value"] = _jsonable(_get(item, "actual_value")) + return identity -def build_drift_fingerprint(namespace: str, items: list[Any]) -> str: +def build_drift_fingerprint( + namespace: str, + items: list[Any], + *, + include_values: bool = True, +) -> str: """Build a deterministic fingerprint from namespace + sorted drift items.""" - identities = [drift_item_identity(item) for item in items] + identities = [ + drift_item_identity(item, include_values=include_values) + for item in items + ] identities.sort(key=_canonical_json) payload = { - "version": FINGERPRINT_VERSION, + "version": FINGERPRINT_VERSION + if include_values + else SEMANTIC_FINGERPRINT_VERSION, "namespace": namespace, "items": identities, } @@ -101,16 +115,26 @@ def build_drift_fingerprint(namespace: str, items: list[Any]) -> str: return f"dfp_{digest[:16]}" -def _report_identity(report: Any) -> dict[str, Any]: +def _report_identity(report: Any, *, include_values: bool = True) -> dict[str, Any]: items = _get(report, "items", []) or [] namespace = str(_get(report, "namespace", "")) + strict_fingerprint = build_drift_fingerprint( + namespace, + list(items), + include_values=True, + ) return { "report_id": _get(report, "report_id"), "namespace": namespace, "status": str(_enum_value(_get(report, "status", ""))), "scanned_at": _get(report, "scanned_at"), "created_at": _get(report, "created_at"), - "fingerprint": build_drift_fingerprint(namespace, list(items)), + "fingerprint": build_drift_fingerprint( + namespace, + list(items), + include_values=include_values, + ), + "strict_fingerprint": strict_fingerprint, } @@ -118,11 +142,12 @@ def build_drift_repeat_state( report: Any, recent_reports: list[Any], *, + include_values: bool = True, window_hours: int = 12, max_reports: int = 20, ) -> dict[str, Any]: """Summarize repeat state for one drift report using stable fingerprints.""" - current = _report_identity(report) + current = _report_identity(report, include_values=include_values) current_time = ( _parse_datetime(current.get("scanned_at")) or _parse_datetime(current.get("created_at")) @@ -132,7 +157,7 @@ def build_drift_repeat_state( by_id: dict[str, dict[str, Any]] = {} for candidate in [report, *recent_reports]: - identity = _report_identity(candidate) + identity = _report_identity(candidate, include_values=include_values) report_id = str(identity.get("report_id") or "") if not report_id: continue @@ -162,7 +187,12 @@ def build_drift_repeat_state( return { "schema_version": SCHEMA_VERSION, "fingerprint": current["fingerprint"], - "matching_strategy": "namespace_and_stable_items_v1", + "strict_fingerprint": current["strict_fingerprint"], + "matching_strategy": ( + VALUE_AWARE_MATCHING_STRATEGY + if include_values + else SEMANTIC_MATCHING_STRATEGY + ), "window_hours": window_hours, "occurrences_12h": len(matches), "first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")), @@ -174,6 +204,7 @@ def build_drift_repeat_state( "scanned_at": _iso(row.get("scanned_at")), "created_at": _iso(row.get("created_at")), "status": row.get("status"), + "strict_fingerprint": row.get("strict_fingerprint"), } for row in reversed(matches[-max_reports:]) ], diff --git a/apps/api/src/services/emergency_escalation_service.py b/apps/api/src/services/emergency_escalation_service.py index 7de5aac9..4a8a15b6 100644 --- a/apps/api/src/services/emergency_escalation_service.py +++ b/apps/api/src/services/emergency_escalation_service.py @@ -24,6 +24,7 @@ def _drift_emergency_fingerprint(report: Any) -> str: return build_drift_fingerprint( str(getattr(report, "namespace", "") or ""), list(getattr(report, "items", []) or []), + include_values=False, ) except Exception as exc: logger.warning( diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py index 59923385..d89b9337 100644 --- a/apps/api/tests/test_awooop_truth_chain_service.py +++ b/apps/api/tests/test_awooop_truth_chain_service.py @@ -10,12 +10,12 @@ from src.services.awooop_ansible_audit_service import ( ) from src.services.awooop_truth_chain_service import ( _automation_quality_score_bucket, - build_automation_quality, - build_incident_reconciliation, _clean_row, _incident_fingerprints, _summarize_gateway_mcp, _truth_status, + build_automation_quality, + build_incident_reconciliation, fetch_truth_chain, summarize_automation_quality_records, ) @@ -302,6 +302,39 @@ def test_drift_repeat_state_counts_matching_fingerprint_only() -> None: ] +def test_drift_repeat_state_can_group_semantic_shape_without_values() -> None: + now = datetime(2026, 5, 19, 1, 0, tzinfo=timezone.utc) + report = { + "report_id": "drift-now", + "namespace": "awoooi-prod", + "status": "pending", + "scanned_at": now, + "created_at": now, + "items": [_drift_item(actual_value=["vol-a", "vol-b"])], + } + recent = [ + { + **report, + "report_id": "drift-prev", + "scanned_at": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + "items": [_drift_item(actual_value=["vol-a", "vol-b", "vol-c"])], + }, + ] + + strict_state = build_drift_repeat_state(report, recent) + semantic_state = build_drift_repeat_state( + report, + recent, + include_values=False, + ) + + assert strict_state["occurrences_12h"] == 1 + assert semantic_state["matching_strategy"] == "namespace_resource_field_level_v2" + assert semantic_state["occurrences_12h"] == 2 + assert semantic_state["fingerprint"] != semantic_state["strict_fingerprint"] + + def test_reconciliation_blocks_open_incident_after_no_action_approval() -> None: reconciliation = build_incident_reconciliation( incident={"incident_id": "INC-1", "status": "INVESTIGATING"},