feat(awooop): expose drift repeat fingerprint
This commit is contained in:
@@ -167,6 +167,31 @@ class DriftReportRepository:
|
||||
{"report_id": report_id, "narrative": narrative},
|
||||
)
|
||||
|
||||
async def get_repeat_state(self, report: DriftReport) -> dict:
    """Compute the fingerprint-based repeat state of ``report``.

    Reads at most 200 drift reports of the report's namespace that were
    created during the last 24 hours (newest scan first) and passes them,
    together with ``report``, to ``build_drift_repeat_state``.
    """
    # Imported at call time, exactly as the original does.
    from src.services.drift_repeat_state import build_drift_repeat_state

    recent_reports_query = text("""
        SELECT
            report_id,
            namespace,
            status,
            scanned_at,
            created_at,
            items
        FROM drift_reports
        WHERE namespace = :namespace
          AND created_at > now() - interval '24 hours'
        ORDER BY scanned_at DESC
        LIMIT 200
    """)
    bind_params = {"namespace": report.namespace}

    async with get_db_context() as db:
        outcome = await db.execute(recent_reports_query, bind_params)
        # Plain dicts so the fingerprint helpers can read the rows by key.
        recent_rows = list(map(dict, outcome.mappings().all()))
        return build_drift_repeat_state(report, recent_rows)
|
||||
|
||||
|
||||
_drift_repo: DriftReportRepository | None = None
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from sqlalchemy import text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.services.awooop_ansible_audit_service import build_ansible_truth
|
||||
from src.services.drift_repeat_state import build_drift_repeat_state
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -266,6 +267,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
|
||||
created_at,
|
||||
resolved_at,
|
||||
interpretation,
|
||||
items,
|
||||
narrative_text
|
||||
FROM drift_reports
|
||||
WHERE report_id = :source_id
|
||||
@@ -473,55 +475,27 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
|
||||
"reports": [],
|
||||
}
|
||||
if drift is not None:
|
||||
repeat_summary = await _fetch_one(
|
||||
recent_drift_reports = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
count(*) AS occurrences_12h,
|
||||
min(scanned_at) AS first_scanned_at,
|
||||
max(scanned_at) AS last_scanned_at
|
||||
report_id,
|
||||
namespace,
|
||||
status,
|
||||
scanned_at,
|
||||
created_at,
|
||||
items,
|
||||
interpretation,
|
||||
narrative_text
|
||||
FROM drift_reports
|
||||
WHERE created_at > now() - interval '12 hours'
|
||||
WHERE created_at > now() - interval '24 hours'
|
||||
AND namespace = :namespace
|
||||
AND status = :status
|
||||
AND high_count = :high_count
|
||||
AND medium_count = :medium_count
|
||||
AND info_count = :info_count
|
||||
""",
|
||||
{
|
||||
"namespace": drift["namespace"],
|
||||
"status": drift["status"],
|
||||
"high_count": drift["high_count"],
|
||||
"medium_count": drift["medium_count"],
|
||||
"info_count": drift["info_count"],
|
||||
},
|
||||
)
|
||||
repeat_reports = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT report_id, scanned_at, created_at, status, interpretation, narrative_text
|
||||
FROM drift_reports
|
||||
WHERE created_at > now() - interval '12 hours'
|
||||
AND namespace = :namespace
|
||||
AND status = :status
|
||||
AND high_count = :high_count
|
||||
AND medium_count = :medium_count
|
||||
AND info_count = :info_count
|
||||
ORDER BY scanned_at DESC
|
||||
LIMIT 20
|
||||
LIMIT 200
|
||||
""",
|
||||
{
|
||||
"namespace": drift["namespace"],
|
||||
"status": drift["status"],
|
||||
"high_count": drift["high_count"],
|
||||
"medium_count": drift["medium_count"],
|
||||
"info_count": drift["info_count"],
|
||||
},
|
||||
{"namespace": drift["namespace"]},
|
||||
)
|
||||
drift_repeats = {
|
||||
**(repeat_summary or {}),
|
||||
"reports": repeat_reports,
|
||||
}
|
||||
drift_repeats = build_drift_repeat_state(drift, recent_drift_reports)
|
||||
|
||||
gateway_mcp_rows = await _fetch_all(
|
||||
db,
|
||||
|
||||
@@ -148,7 +148,13 @@ class DriftNarratorService:
|
||||
# 2026-04-18 B 方案: LLM 同時產 narrative + 結構化 items(取代 str()[:30])
|
||||
# 2026-04-20 P0.2: 追加 recommendation(action/confidence/reason)
|
||||
narrative, items, recommendation = await self._generate_narrative_and_items(report, interpretation)
|
||||
await self._send_telegram(report, narrative, items, recommendation)
|
||||
repeat_state = None
|
||||
try:
|
||||
from src.repositories.drift_repository import get_drift_repository
|
||||
repeat_state = await get_drift_repository().get_repeat_state(report)
|
||||
except Exception as e:
|
||||
logger.warning("drift_repeat_state_lookup_failed", report_id=report.report_id, error=str(e))
|
||||
await self._send_telegram(report, narrative, items, recommendation, repeat_state)
|
||||
|
||||
# 寫入 DB narrative_text (Phase 30 ADR-067)
|
||||
try:
|
||||
@@ -643,6 +649,7 @@ class DriftNarratorService:
|
||||
narrative: str,
|
||||
items: list[dict],
|
||||
recommendation: dict | None = None,
|
||||
repeat_state: dict | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
推送 TYPE-4D Config Drift 卡片(ADR-075)+ B 方案智能摘要
|
||||
@@ -654,7 +661,7 @@ class DriftNarratorService:
|
||||
"""
|
||||
from src.services.telegram_gateway import get_telegram_gateway
|
||||
|
||||
diff_summary = self._render_telegram_body(report, narrative, items, recommendation)
|
||||
diff_summary = self._render_telegram_body(report, narrative, items, recommendation, repeat_state)
|
||||
|
||||
try:
|
||||
tg = get_telegram_gateway()
|
||||
@@ -711,6 +718,7 @@ class DriftNarratorService:
|
||||
narrative: str,
|
||||
items: list[dict],
|
||||
recommendation: dict | None = None,
|
||||
repeat_state: dict | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
組裝 Telegram 卡片 body(B 方案格式 + P0.2 AI 推薦)
|
||||
@@ -741,6 +749,10 @@ class DriftNarratorService:
|
||||
}.get(_act, _act)
|
||||
lines.append(f"🎯 AI 建議:{_emoji_action} ({int(_conf * 100)}%) — {_reason}\n")
|
||||
|
||||
repeat_line = self._render_repeat_state(repeat_state)
|
||||
if repeat_line:
|
||||
lines.append(f"{repeat_line}\n")
|
||||
|
||||
lines.append(f"🤖 AI 研判\n{narrative}\n")
|
||||
|
||||
# 用非 trivial + 非白名單 的實際可操作數顯示
|
||||
@@ -761,6 +773,23 @@ class DriftNarratorService:
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _render_repeat_state(self, repeat_state: dict | None) -> str:
|
||||
"""Render operator-visible repeat/stage metadata for Telegram."""
|
||||
if not repeat_state:
|
||||
return ""
|
||||
fingerprint = str(repeat_state.get("fingerprint") or "unknown")
|
||||
occurrences = int(repeat_state.get("occurrences_12h") or 0)
|
||||
window_hours = int(repeat_state.get("window_hours") or 12)
|
||||
stage = str(repeat_state.get("operator_stage") or "unknown")
|
||||
if occurrences <= 1:
|
||||
repeat_text = f"{window_hours}h 內首次出現"
|
||||
else:
|
||||
repeat_text = f"{window_hours}h 內第 {occurrences} 次同指紋"
|
||||
return (
|
||||
"流程: drift_scanned → ai_analyzed → "
|
||||
f"{stage}\n重複: {repeat_text}\n指紋: {fingerprint}"
|
||||
)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Singleton
|
||||
|
||||
180
apps/api/src/services/drift_repeat_state.py
Normal file
180
apps/api/src/services/drift_repeat_state.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""Stable repeat identity for Config Drift reports.
|
||||
|
||||
The drift scanner emits a fresh ``report_id`` for every run. Operators need a
|
||||
stable identity that answers whether two reports describe the same drift, not
|
||||
just whether they have the same HIGH/MEDIUM/INFO counts.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
import json
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from typing import Any
|
||||
|
||||
|
||||
# Tag placed in the "schema_version" field of the repeat-state payload.
SCHEMA_VERSION = "drift_repeat_state_v1"
# Tag hashed into every fingerprint as the payload's "version" entry.
FINGERPRINT_VERSION = "drift_fingerprint_v1"
|
||||
|
||||
|
||||
def _get(obj: Any, key: str, default: Any = None) -> Any:
    """Read ``key`` from a mapping or, failing that, as an attribute.

    Args:
        obj: A mapping (``dict``, read-only proxy, row mapping, ...) or any
            object that exposes the value as an attribute.
        key: Item key / attribute name to look up.
        default: Value returned when ``key`` is absent.

    Returns:
        The stored value, or ``default`` when it is missing.
    """
    # Every Mapping is read by key, not only ``dict``. Attribute access on a
    # non-dict mapping would answer ``_get(row, "items")`` with the bound
    # ``.items`` method instead of the stored value.
    if isinstance(obj, Mapping):
        return obj.get(key, default)
    return getattr(obj, key, default)
|
||||
|
||||
|
||||
def _enum_value(value: Any) -> Any:
    """Unwrap enum-like objects.

    Returns ``value.value`` when that attribute exists, otherwise ``value``
    itself.
    """
    try:
        return value.value
    except AttributeError:
        return value
|
||||
|
||||
|
||||
def _jsonable(value: Any) -> Any:
    """Recursively convert ``value`` into JSON-friendly primitives.

    Enum-like objects are unwrapped first; dict keys become strings; lists and
    tuples both become lists; datetimes become ISO-8601 strings. Anything else
    is returned unchanged.
    """
    plain = _enum_value(value)
    if isinstance(plain, datetime):
        return plain.isoformat()
    if isinstance(plain, dict):
        return {str(name): _jsonable(member) for name, member in plain.items()}
    if isinstance(plain, (list, tuple)):
        return [_jsonable(member) for member in plain]
    return plain
|
||||
|
||||
|
||||
def _canonical_json(value: Any) -> str:
    """Serialize ``value`` as compact JSON with sorted keys.

    The value is normalized with ``_jsonable`` first; objects the encoder
    still cannot handle are stringified with ``str``.
    """
    normalized = _jsonable(value)
    dump_options = {
        "ensure_ascii": False,
        "sort_keys": True,
        "separators": (",", ":"),
        "default": str,
    }
    return json.dumps(normalized, **dump_options)
|
||||
|
||||
|
||||
def _parse_datetime(value: Any) -> datetime | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
parsed = value
|
||||
if parsed.tzinfo is not None:
|
||||
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
|
||||
return parsed
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
||||
if parsed.tzinfo is not None:
|
||||
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
|
||||
return parsed
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _iso(value: Any) -> str | None:
    """Return the ISO-8601 text of ``value`` after ``_parse_datetime``, else ``None``."""
    moment = _parse_datetime(value)
    if moment is None:
        return None
    return moment.isoformat()
|
||||
|
||||
|
||||
def drift_item_identity(item: Any) -> dict[str, Any]:
    """Extract the fields that identify one drift item.

    ``item`` may be a dict or an object with attributes. Text fields are
    stringified, the drift level is unwrapped from an enum if needed, both
    values are made JSON-friendly and the allowlist flag is coerced to bool.
    """
    text_fields = ("resource_kind", "resource_name", "namespace", "field_path")
    identity: dict[str, Any] = {
        name: str(_get(item, name, "")) for name in text_fields
    }
    identity["drift_level"] = str(_enum_value(_get(item, "drift_level", "")))
    for value_field in ("git_value", "actual_value"):
        identity[value_field] = _jsonable(_get(item, value_field))
    identity["is_allowlisted"] = bool(_get(item, "is_allowlisted", False))
    return identity
|
||||
|
||||
|
||||
def build_drift_fingerprint(namespace: str, items: list[Any]) -> str:
    """Derive an order-independent fingerprint for a namespace's drift items.

    Item identities are sorted by their canonical JSON, embedded in a
    versioned payload and hashed with SHA-256. The result is ``dfp_``
    followed by the first 16 hex digits of the digest.
    """
    ordered_identities = sorted(
        (drift_item_identity(entry) for entry in items),
        key=_canonical_json,
    )
    canonical = _canonical_json(
        {
            "version": FINGERPRINT_VERSION,
            "namespace": namespace,
            "items": ordered_identities,
        }
    )
    hex_digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "dfp_" + hex_digest[:16]
|
||||
|
||||
|
||||
def _report_identity(report: Any) -> dict[str, Any]:
    """Reduce a drift report to the fields used for repeat matching.

    Args:
        report: A dict or attribute-style report exposing ``report_id``,
            ``namespace``, ``status``, ``scanned_at``, ``created_at`` and
            ``items``.

    Returns:
        A dict with those scalar fields (status unwrapped and stringified)
        plus ``fingerprint``, computed from the namespace and the items.
    """
    items = _get(report, "items", []) or []
    if isinstance(items, (str, bytes, bytearray)):
        # ``items`` may arrive as undecoded JSON text (e.g. a JSON column read
        # through raw SQL). Iterating the text would fingerprint its
        # characters, so decode it first; undecodable or non-list payloads
        # count as "no items".
        try:
            decoded = json.loads(items)
        except ValueError:
            decoded = None
        items = decoded if isinstance(decoded, list) else []
    namespace = str(_get(report, "namespace", ""))
    return {
        "report_id": _get(report, "report_id"),
        "namespace": namespace,
        "status": str(_enum_value(_get(report, "status", ""))),
        "scanned_at": _get(report, "scanned_at"),
        "created_at": _get(report, "created_at"),
        "fingerprint": build_drift_fingerprint(namespace, list(items)),
    }
|
||||
|
||||
|
||||
def build_drift_repeat_state(
    report: Any,
    recent_reports: list[Any],
    *,
    window_hours: int = 12,
    max_reports: int = 20,
) -> dict[str, Any]:
    """Summarize repeat state for one drift report using stable fingerprints.

    Args:
        report: The report being described (dict or attribute-style object).
        recent_reports: Candidate reports to compare against; may include
            ``report`` itself, since candidates are de-duplicated by
            ``report_id``.
        window_hours: Size of the look-back window, measured backwards from
            the report's own scan/creation time.
        max_reports: Upper bound on the entries listed under ``"reports"``;
            zero or a negative value lists none.

    Returns:
        A dict with the schema version, the fingerprint, the matching
        strategy, the window size, the number of matching reports, the first
        and last match timestamps (ISO text), the operator stage and the most
        recent matching reports, newest first. The count is stored under
        ``"occurrences_12h"`` whatever ``window_hours`` is.
    """

    def _when(identity: dict[str, Any]) -> datetime | None:
        # Scan time wins; creation time is the fallback.
        return _parse_datetime(identity.get("scanned_at")) or _parse_datetime(
            identity.get("created_at")
        )

    current = _report_identity(report)
    # _parse_datetime yields naive UTC values, so the "no timestamp" fallback
    # must be naive UTC as well. A naive local now() would shift the cutoff by
    # the host's UTC offset.
    current_time = _when(current) or datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = current_time - timedelta(hours=window_hours)

    by_id: dict[str, dict[str, Any]] = {}
    # The current report's identity is reused rather than computed a second
    # time; later candidates with the same id overwrite earlier ones.
    candidates = [current, *(_report_identity(row) for row in recent_reports)]
    for identity in candidates:
        report_id = str(identity.get("report_id") or "")
        if not report_id:
            continue
        candidate_time = _when(identity)
        # Candidates without any usable timestamp are kept inside the window.
        if candidate_time is not None and candidate_time < cutoff:
            continue
        if identity["fingerprint"] != current["fingerprint"]:
            continue
        by_id[report_id] = identity

    # Oldest first; entries without a timestamp sort to the front.
    matches = sorted(by_id.values(), key=lambda row: _when(row) or datetime.min)
    first = matches[0] if matches else current
    last = matches[-1] if matches else current
    status = current.get("status") or "unknown"
    operator_stage = "pending_human" if status == "pending" else str(status)

    # ``matches[-0:]`` would be the whole list, so a non-positive limit is
    # handled explicitly.
    listed = matches[-max_reports:] if max_reports > 0 else []

    return {
        "schema_version": SCHEMA_VERSION,
        "fingerprint": current["fingerprint"],
        "matching_strategy": "namespace_and_stable_items_v1",
        "window_hours": window_hours,
        "occurrences_12h": len(matches),
        "first_scanned_at": _iso(first.get("scanned_at") or first.get("created_at")),
        "last_scanned_at": _iso(last.get("scanned_at") or last.get("created_at")),
        "operator_stage": operator_stage,
        "reports": [
            {
                "report_id": row.get("report_id"),
                "scanned_at": _iso(row.get("scanned_at")),
                "created_at": _iso(row.get("created_at")),
                "status": row.get("status"),
            }
            for row in reversed(listed)
        ],
    }
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
from src.services.awooop_ansible_audit_service import (
|
||||
@@ -7,6 +8,10 @@ from src.services.awooop_ansible_audit_service import (
|
||||
build_ansible_truth,
|
||||
)
|
||||
from src.services.awooop_truth_chain_service import _clean_row, _truth_status
|
||||
from src.services.drift_repeat_state import (
|
||||
build_drift_fingerprint,
|
||||
build_drift_repeat_state,
|
||||
)
|
||||
|
||||
|
||||
def test_clean_row_parses_json_text_fields_for_gateway_visibility() -> None:
|
||||
@@ -69,6 +74,89 @@ def test_truth_status_marks_repeated_pending_drift_as_human_needed() -> None:
|
||||
assert "drift_ai_confidence_zero" in status["blockers"]
|
||||
|
||||
|
||||
def _drift_item(
    *,
    resource_name: str = "awoooi-api",
    field_path: str = "spec.template.spec.containers[0].image",
    actual_value: str = "api:hotfix",
) -> dict:
    """Build a drift-item fixture: a high-level, non-allowlisted Deployment drift in ``awoooi-prod``."""
    fixture = dict(
        resource_kind="Deployment",
        resource_name=resource_name,
        namespace="awoooi-prod",
        field_path=field_path,
        git_value="api:main",
        actual_value=actual_value,
        drift_level="high",
        is_allowlisted=False,
    )
    return fixture
|
||||
|
||||
|
||||
def test_drift_fingerprint_is_stable_across_item_order() -> None:
    """The fingerprint ignores item order but reacts to a changed actual value."""
    api_item = _drift_item(resource_name="awoooi-api")
    worker_item = _drift_item(
        resource_name="awoooi-worker",
        field_path="spec.template.spec.serviceAccountName",
        actual_value="awoooi-executor",
    )
    altered_worker = {**worker_item, "actual_value": "different-service-account"}

    first = build_drift_fingerprint("awoooi-prod", [api_item, worker_item])
    second = build_drift_fingerprint("awoooi-prod", [worker_item, api_item])
    changed = build_drift_fingerprint("awoooi-prod", [api_item, altered_worker])

    assert first == second
    assert first.startswith("dfp_")
    assert first != changed
|
||||
|
||||
|
||||
def test_drift_repeat_state_counts_matching_fingerprint_only() -> None:
    """Only same-fingerprint reports inside the 12h window count as repeats."""
    now = datetime(2026, 5, 13, 1, 0, tzinfo=timezone.utc)
    report = {
        "report_id": "drift-now",
        "namespace": "awoooi-prod",
        "status": "pending",
        "scanned_at": now,
        "created_at": now,
        "items": [_drift_item()],
    }

    def _earlier(report_id: str, hours_ago: int, **overrides) -> dict:
        # Copy of the current report, shifted into the past.
        stamp = now - timedelta(hours=hours_ago)
        return {
            **report,
            "report_id": report_id,
            "scanned_at": stamp,
            "created_at": stamp,
            **overrides,
        }

    recent = [
        _earlier("drift-prev", 1),
        _earlier("drift-different", 2, items=[_drift_item(actual_value="api:other")]),
        _earlier("drift-old", 13),
    ]

    repeat_state = build_drift_repeat_state(report, recent)

    assert repeat_state["schema_version"] == "drift_repeat_state_v1"
    assert repeat_state["fingerprint"].startswith("dfp_")
    assert repeat_state["matching_strategy"] == "namespace_and_stable_items_v1"
    assert repeat_state["occurrences_12h"] == 2
    assert repeat_state["operator_stage"] == "pending_human"
    listed_ids = [row["report_id"] for row in repeat_state["reports"]]
    assert listed_ids == ["drift-now", "drift-prev"]
|
||||
|
||||
|
||||
def test_ansible_truth_surfaces_audited_check_mode_record() -> None:
|
||||
truth = build_ansible_truth(
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user