diff --git a/apps/api/src/repositories/drift_repository.py b/apps/api/src/repositories/drift_repository.py index a40c16b3..15124c5d 100644 --- a/apps/api/src/repositories/drift_repository.py +++ b/apps/api/src/repositories/drift_repository.py @@ -167,6 +167,31 @@ class DriftReportRepository: {"report_id": report_id, "narrative": narrative}, ) + async def get_repeat_state(self, report: DriftReport) -> dict: + """Return stable fingerprint repeat state for a drift report.""" + from src.services.drift_repeat_state import build_drift_repeat_state + + async with get_db_context() as db: + result = await db.execute( + text(""" + SELECT + report_id, + namespace, + status, + scanned_at, + created_at, + items + FROM drift_reports + WHERE namespace = :namespace + AND created_at > now() - interval '24 hours' + ORDER BY scanned_at DESC + LIMIT 200 + """), + {"namespace": report.namespace}, + ) + rows = [dict(row) for row in result.mappings().all()] + return build_drift_repeat_state(report, rows) + _drift_repo: DriftReportRepository | None = None diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index ed182eb2..91e840d6 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -18,6 +18,7 @@ from sqlalchemy import text from src.db.base import get_db_context from src.services.awooop_ansible_audit_service import build_ansible_truth +from src.services.drift_repeat_state import build_drift_repeat_state logger = structlog.get_logger(__name__) @@ -266,6 +267,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ created_at, resolved_at, interpretation, + items, narrative_text FROM drift_reports WHERE report_id = :source_id @@ -473,55 +475,27 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "reports": [], } if drift is not None: - repeat_summary = await _fetch_one( + recent_drift_reports = await _fetch_all( db, """ SELECT - count(*) AS occurrences_12h, - min(scanned_at) AS first_scanned_at, - max(scanned_at) AS last_scanned_at + report_id, + namespace, + status, + scanned_at, + created_at, + items, + interpretation, + narrative_text FROM drift_reports - WHERE created_at > now() - interval '12 hours' + WHERE created_at > now() - interval '24 hours' AND namespace = :namespace - AND status = :status - AND high_count = :high_count - AND medium_count = :medium_count - AND info_count = :info_count - """, - { - "namespace": drift["namespace"], - "status": drift["status"], - "high_count": drift["high_count"], - "medium_count": drift["medium_count"], - "info_count": drift["info_count"], - }, - ) - repeat_reports = await _fetch_all( - db, - """ - SELECT report_id, scanned_at, created_at, status, interpretation, narrative_text - FROM drift_reports - WHERE created_at > now() - interval '12 hours' - AND namespace = :namespace - AND status = :status - AND high_count = :high_count - AND medium_count = :medium_count - AND info_count = :info_count ORDER BY scanned_at DESC - LIMIT 20 + LIMIT 200 """, - { - "namespace": drift["namespace"], - "status": drift["status"], - "high_count": drift["high_count"], - "medium_count": drift["medium_count"], - "info_count": drift["info_count"], - }, + {"namespace": drift["namespace"]}, ) - drift_repeats = { - **(repeat_summary or {}), - "reports": repeat_reports, - } + drift_repeats = build_drift_repeat_state(drift, recent_drift_reports) gateway_mcp_rows = await _fetch_all( db, diff --git a/apps/api/src/services/drift_narrator_service.py b/apps/api/src/services/drift_narrator_service.py index e09448e6..8e29651f 100644 --- a/apps/api/src/services/drift_narrator_service.py +++ b/apps/api/src/services/drift_narrator_service.py @@ -148,7 +148,13 @@ class DriftNarratorService: # 2026-04-18 B 方案: LLM 同時產 narrative + 結構化 items(取代 str()[:30]) # 2026-04-20 P0.2: 追加 recommendation(action/confidence/reason) narrative, items, recommendation = await self._generate_narrative_and_items(report, interpretation) - await self._send_telegram(report, narrative, items, recommendation) + repeat_state = None + try: + from src.repositories.drift_repository import get_drift_repository + repeat_state = await get_drift_repository().get_repeat_state(report) + except Exception as e: + logger.warning("drift_repeat_state_lookup_failed", report_id=report.report_id, error=str(e)) + await self._send_telegram(report, narrative, items, recommendation, repeat_state) # 寫入 DB narrative_text (Phase 30 ADR-067) try: @@ -643,6 +649,7 @@ class DriftNarratorService: narrative: str, items: list[dict], recommendation: dict | None = None, + repeat_state: dict | None = None, ) -> None: """ 推送 TYPE-4D Config Drift 卡片(ADR-075)+ B 方案智能摘要 @@ -654,7 +661,7 @@ class DriftNarratorService: """ from src.services.telegram_gateway import get_telegram_gateway - diff_summary = self._render_telegram_body(report, narrative, items, recommendation) + diff_summary = self._render_telegram_body(report, narrative, items, recommendation, repeat_state) try: tg = get_telegram_gateway() @@ -711,6 +718,7 @@ class DriftNarratorService: narrative: str, items: list[dict], recommendation: dict | None = None, + repeat_state: dict | None = None, ) -> str: """ 組裝 Telegram 卡片 body(B 方案格式 + P0.2 AI 推薦) @@ -741,6 +749,10 @@ class DriftNarratorService: }.get(_act, _act) lines.append(f"🎯 AI 建議:{_emoji_action} ({int(_conf * 100)}%) — {_reason}\n") + repeat_line = self._render_repeat_state(repeat_state) + if repeat_line: + lines.append(f"{repeat_line}\n") + lines.append(f"🤖 AI 研判\n{narrative}\n") # 用非 trivial + 非白名單 的實際可操作數顯示 @@ -761,6 +773,23 @@ class DriftNarratorService: return "\n".join(lines) + def _render_repeat_state(self, repeat_state: dict | None) -> str: + """Render operator-visible repeat/stage metadata for Telegram.""" + if not repeat_state: + return "" + fingerprint = str(repeat_state.get("fingerprint") or "unknown") + occurrences = int(repeat_state.get("occurrences_12h") or 0) + window_hours = int(repeat_state.get("window_hours") or 12) + stage = str(repeat_state.get("operator_stage") or "unknown") + if occurrences <= 1: + repeat_text = f"{window_hours}h 內首次出現" + else: + repeat_text = f"{window_hours}h 內第 {occurrences} 次同指紋" + return ( + "流程: drift_scanned → ai_analyzed → " + f"{stage}\n重複: {repeat_text}\n指紋: {fingerprint}" + ) + # ============================================================ # Singleton diff --git a/apps/api/src/services/drift_repeat_state.py b/apps/api/src/services/drift_repeat_state.py new file mode 100644 index 00000000..36b9ec6b --- /dev/null +++ b/apps/api/src/services/drift_repeat_state.py @@ -0,0 +1,180 @@ +"""Stable repeat identity for Config Drift reports. + +The drift scanner emits a fresh ``report_id`` for every run. Operators need a +stable identity that answers whether two reports describe the same drift, not +just whether they have the same HIGH/MEDIUM/INFO counts. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timedelta, timezone +from typing import Any + + +SCHEMA_VERSION = "drift_repeat_state_v1" +FINGERPRINT_VERSION = "drift_fingerprint_v1" + + +def _get(obj: Any, key: str, default: Any = None) -> Any: + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + +def _enum_value(value: Any) -> Any: + return getattr(value, "value", value) + + +def _jsonable(value: Any) -> Any: + value = _enum_value(value) + if isinstance(value, dict): + return {str(k): _jsonable(v) for k, v in value.items()} + if isinstance(value, list): + return [_jsonable(v) for v in value] + if isinstance(value, tuple): + return [_jsonable(v) for v in value] + if isinstance(value, datetime): + return value.isoformat() + return value + + +def _canonical_json(value: Any) -> str: + return json.dumps( + _jsonable(value), + ensure_ascii=False, + sort_keys=True, + separators=(",", ":"), + default=str, + ) + + +def _parse_datetime(value: Any) -> datetime | None: + if value is None: + return None + if isinstance(value, datetime): + parsed = value + if parsed.tzinfo is not None: + return parsed.astimezone(timezone.utc).replace(tzinfo=None) + return parsed + if isinstance(value, str): + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + if parsed.tzinfo is not None: + return parsed.astimezone(timezone.utc).replace(tzinfo=None) + return parsed + except ValueError: + return None + return None + + +def _iso(value: Any) -> str | None: + parsed = _parse_datetime(value) + return parsed.isoformat() if parsed else None + + +def drift_item_identity(item: Any) -> dict[str, Any]: + """Return the stable fields that define one drift item.""" + return { + "resource_kind": str(_get(item, "resource_kind", "")), + "resource_name": str(_get(item, "resource_name", "")), + "namespace": str(_get(item, "namespace", "")), + "field_path": str(_get(item, "field_path", "")), + "drift_level": str(_enum_value(_get(item, "drift_level", ""))), + "git_value": _jsonable(_get(item, "git_value")), + "actual_value": _jsonable(_get(item, "actual_value")), + "is_allowlisted": bool(_get(item, "is_allowlisted", False)), + } + + +def build_drift_fingerprint(namespace: str, items: list[Any]) -> str: + """Build a deterministic fingerprint from namespace + sorted drift items.""" + identities = [drift_item_identity(item) for item in items] + identities.sort(key=_canonical_json) + payload = { + "version": FINGERPRINT_VERSION, + "namespace": namespace, + "items": identities, + } + digest = hashlib.sha256(_canonical_json(payload).encode("utf-8")).hexdigest() + return f"dfp_{digest[:16]}" + + +def _report_identity(report: Any) -> dict[str, Any]: + items = _get(report, "items", []) or [] + namespace = str(_get(report, "namespace", "")) + return { + "report_id": _get(report, "report_id"), + "namespace": namespace, + "status": str(_enum_value(_get(report, "status", ""))), + "scanned_at": _get(report, "scanned_at"), + "created_at": _get(report, "created_at"), + "fingerprint": build_drift_fingerprint(namespace, list(items)), + } + + +def build_drift_repeat_state( + report: Any, + recent_reports: list[Any], + *, + window_hours: int = 12, + max_reports: int = 20, +) -> dict[str, Any]: + """Summarize repeat state for one drift report using stable fingerprints.""" + current = _report_identity(report) + current_time = ( + _parse_datetime(current.get("scanned_at")) + or _parse_datetime(current.get("created_at")) + or datetime.now() + ) + cutoff = current_time - timedelta(hours=window_hours) + + by_id: dict[str, dict[str, Any]] = {} + for candidate in [report, *recent_reports]: + identity = _report_identity(candidate) + report_id = str(identity.get("report_id") or "") + if not report_id: + continue + candidate_time = ( + _parse_datetime(identity.get("scanned_at")) + or _parse_datetime(identity.get("created_at")) + ) + if candidate_time is not None and candidate_time < cutoff: + continue + if identity["fingerprint"] != current["fingerprint"]: + continue + by_id[report_id] = identity + + matches = sorted( + by_id.values(), + key=lambda row: ( + _parse_datetime(row.get("scanned_at")) + or _parse_datetime(row.get("created_at")) + or datetime.min + ), + ) + first = matches[0] if matches else current + last = matches[-1] if matches else current + status = current.get("status") or "unknown" + operator_stage = "pending_human" if status == "pending" else str(status) + + return { + "schema_version": SCHEMA_VERSION, + "fingerprint": current["fingerprint"], + "matching_strategy": "namespace_and_stable_items_v1", + "window_hours": window_hours, + "occurrences_12h": len(matches), + "first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")), + "last_scanned_at": _iso(last.get("scanned_at") or last.get("created_at")), + "operator_stage": operator_stage, + "reports": [ + { + "report_id": row.get("report_id"), + "scanned_at": _iso(row.get("scanned_at")), + "created_at": _iso(row.get("created_at")), + "status": row.get("status"), + } + for row in reversed(matches[-max_reports:]) + ], + } diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py index a2d09172..a3097c41 100644 --- a/apps/api/tests/test_awooop_truth_chain_service.py +++ b/apps/api/tests/test_awooop_truth_chain_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, timedelta, timezone from types import SimpleNamespace from src.services.awooop_ansible_audit_service import ( @@ -7,6 +8,10 @@ from src.services.awooop_ansible_audit_service import ( build_ansible_truth, ) from src.services.awooop_truth_chain_service import _clean_row, _truth_status +from src.services.drift_repeat_state import ( + build_drift_fingerprint, + build_drift_repeat_state, +) def test_clean_row_parses_json_text_fields_for_gateway_visibility() -> None: @@ -69,6 +74,89 @@ def test_truth_status_marks_repeated_pending_drift_as_human_needed() -> None: assert "drift_ai_confidence_zero" in status["blockers"] +def _drift_item( + *, + resource_name: str = "awoooi-api", + field_path: str = "spec.template.spec.containers[0].image", + actual_value: str = "api:hotfix", +) -> dict: + return { + "resource_kind": "Deployment", + "resource_name": resource_name, + "namespace": "awoooi-prod", + "field_path": field_path, + "git_value": "api:main", + "actual_value": actual_value, + "drift_level": "high", + "is_allowlisted": False, + } + + +def test_drift_fingerprint_is_stable_across_item_order() -> None: + item_a = _drift_item(resource_name="awoooi-api") + item_b = _drift_item( + resource_name="awoooi-worker", + field_path="spec.template.spec.serviceAccountName", + actual_value="awoooi-executor", + ) + + first = build_drift_fingerprint("awoooi-prod", [item_a, item_b]) + second = build_drift_fingerprint("awoooi-prod", [item_b, item_a]) + changed = build_drift_fingerprint( + "awoooi-prod", + [item_a, {**item_b, "actual_value": "different-service-account"}], + ) + + assert first == second + assert first.startswith("dfp_") + assert first != changed + + +def test_drift_repeat_state_counts_matching_fingerprint_only() -> None: + now = datetime(2026, 5, 13, 1, 0, tzinfo=timezone.utc) + report = { + "report_id": "drift-now", + "namespace": "awoooi-prod", + "status": "pending", + "scanned_at": now, + "created_at": now, + "items": [_drift_item()], + } + recent = [ + { + **report, + "report_id": "drift-prev", + "scanned_at": now - timedelta(hours=1), + "created_at": now - timedelta(hours=1), + }, + { + **report, + "report_id": "drift-different", + "scanned_at": now - timedelta(hours=2), + "created_at": now - timedelta(hours=2), + "items": [_drift_item(actual_value="api:other")], + }, + { + **report, + "report_id": "drift-old", + "scanned_at": now - timedelta(hours=13), + "created_at": now - timedelta(hours=13), + }, + ] + + repeat_state = build_drift_repeat_state(report, recent) + + assert repeat_state["schema_version"] == "drift_repeat_state_v1" + assert repeat_state["fingerprint"].startswith("dfp_") + assert repeat_state["matching_strategy"] == "namespace_and_stable_items_v1" + assert repeat_state["occurrences_12h"] == 2 + assert repeat_state["operator_stage"] == "pending_human" + assert [row["report_id"] for row in repeat_state["reports"]] == [ + "drift-now", + "drift-prev", + ] + + def test_ansible_truth_surfaces_audited_check_mode_record() -> None: truth = build_ansible_truth( [