feat(governance): archive duplicate km review drafts
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
Type Sync Check / check-type-sync (push) Successful in 33s
CD Pipeline / tests (push) Successful in 3m31s
CD Pipeline / build-and-deploy (push) Successful in 4m41s
CD Pipeline / post-deploy-checks (push) Successful in 1m53s

This commit is contained in:
Your Name
2026-05-20 00:30:17 +08:00
parent 101cd42974
commit c8a995aff2
9 changed files with 779 additions and 46 deletions

View File

@@ -22,19 +22,25 @@ from datetime import datetime
from typing import Annotated
import structlog
from fastapi import APIRouter, Query
from fastapi import APIRouter, HTTPException, Query
from src.models.governance import (
GovernanceEventsResponse,
GovernanceQueueResponse,
GovernanceSummaryResponse,
KnowledgeReviewDraftArchiveRequest,
KnowledgeReviewDraftArchiveResponse,
KnowledgeReviewDraftDedupeResponse,
)
from src.services.governance_km_review_service import (
KmReviewDraftArchiveError,
archive_km_review_draft_duplicates,
)
from src.services.governance_query_service import (
query_km_review_draft_dedupe,
query_governance_events,
query_governance_queue,
query_governance_summary,
query_km_review_draft_dedupe,
)
logger = structlog.get_logger(__name__)
@@ -147,6 +153,42 @@ async def get_km_review_draft_dedupe(
return await query_km_review_draft_dedupe(limit=limit)
# =============================================================================
# POST /api/v1/ai/governance/km-review-drafts/dedupe/{event_id}/archive-duplicates
# =============================================================================
@router.post(
"/ai/governance/km-review-drafts/dedupe/{governance_event_id}/archive-duplicates",
response_model=KnowledgeReviewDraftArchiveResponse,
)
async def post_km_review_draft_archive_duplicates(
governance_event_id: str,
request: KnowledgeReviewDraftArchiveRequest,
) -> KnowledgeReviewDraftArchiveResponse:
"""
Owner 審核後封存 Hermes KM healthcheck duplicate review drafts。
這不是 read endpoint必須明確傳 owner_approved=true且後端會重新比對
最新 dedupe plan。封存為 KnowledgeEntry.status=archived不刪除資料。
"""
logger.info(
"km_review_draft_archive_request",
governance_event_id=governance_event_id,
canonical_entry_id=request.canonical_entry_id,
duplicate_count=len(request.duplicate_entry_ids),
owner=request.owner,
dry_run=request.dry_run,
owner_approved=request.owner_approved,
)
try:
return await archive_km_review_draft_duplicates(
governance_event_id=governance_event_id,
request=request,
)
except KmReviewDraftArchiveError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
# =============================================================================
# GET /api/v1/ai/governance/summary
# =============================================================================

View File

@@ -146,6 +146,33 @@ class KnowledgeReviewDraftDedupeResponse(BaseModel):
generated_at: datetime
class KnowledgeReviewDraftArchiveRequest(BaseModel):
canonical_entry_id: str = Field(min_length=1, max_length=120)
duplicate_entry_ids: list[str] = Field(min_length=1, max_length=100)
owner: str = Field(default="operator_console", min_length=1, max_length=100)
owner_approved: bool = False
dry_run: bool = False
class KnowledgeReviewDraftArchiveResponse(BaseModel):
schema_version: str = "km_review_draft_archive_v1"
governance_event_id: str
canonical_entry_id: str
requested_duplicate_entry_ids: list[str]
archived_entry_ids: list[str] = Field(default_factory=list)
skipped_entry_ids: list[str] = Field(default_factory=list)
would_archive_entry_ids: list[str] = Field(default_factory=list)
status: Literal["dry_run", "archived", "noop_already_archived"]
owner: str
owner_approved: bool
dry_run: bool
writes_km: bool
writes_governance_audit: bool
audit_dispatch_id: str | None = None
next_action: str = "stale_ratio_recheck"
generated_at: datetime
# =============================================================================
# Endpoint 3: summary
# =============================================================================

View File

@@ -0,0 +1,367 @@
"""
Governance KM Review Service
============================
Owner-approved operations for Hermes KM healthcheck review drafts.
設計原則:
- read model 仍在 governance_query_service本檔只處理 owner 審核後的寫入。
- 封存採 KnowledgeEntry.status=archived不刪除資料。
- 寫入前重新比對最新 dedupe plan避免前端 stale click 封存錯資料。
- 每次成功封存都寫 governance_remediation_dispatch terminal row 作 audit trail。
"""
from __future__ import annotations
from typing import Any, Literal
import structlog
from sqlalchemy import select
from src.db.base import get_db_context
from src.db.models import (
GovernanceRemediationDispatch,
KnowledgeEntryRecord,
generate_uuid,
taipei_now,
)
from src.models.governance import (
KnowledgeReviewDraftArchiveRequest,
KnowledgeReviewDraftArchiveResponse,
KnowledgeReviewDraftDedupeGroup,
)
from src.models.knowledge import EntryStatus, EntryType
from src.services.governance_query_service import query_km_review_draft_dedupe
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
class KmReviewDraftArchiveError(Exception):
"""KM review draft archive request failed validation."""
def __init__(self, status_code: int, detail: str) -> None:
super().__init__(detail)
self.status_code = status_code
self.detail = detail
async def archive_km_review_draft_duplicates(
*,
governance_event_id: str,
request: KnowledgeReviewDraftArchiveRequest,
) -> KnowledgeReviewDraftArchiveResponse:
"""Archive duplicate Hermes KM review drafts after explicit owner approval."""
duplicate_ids = _unique_ids(request.duplicate_entry_ids)
if not duplicate_ids:
raise KmReviewDraftArchiveError(422, "duplicate_entry_ids is required")
if request.canonical_entry_id in duplicate_ids:
raise KmReviewDraftArchiveError(409, "canonical_entry_id cannot be archived")
if not request.dry_run and not request.owner_approved:
raise KmReviewDraftArchiveError(
403,
"owner_approved=true is required before archiving duplicate KM drafts",
)
plan = await query_km_review_draft_dedupe(limit=200)
group = next(
(
item
for item in plan.groups
if item.governance_event_id == governance_event_id
),
None,
)
if group is None:
already_archived = await _load_already_archived_duplicate_ids(
duplicate_ids,
canonical_entry_id=request.canonical_entry_id,
governance_event_id=governance_event_id,
)
if set(already_archived) == set(duplicate_ids):
return _build_archive_response(
governance_event_id=governance_event_id,
request=request,
duplicate_ids=duplicate_ids,
status="noop_already_archived",
skipped_entry_ids=duplicate_ids,
writes_km=False,
writes_governance_audit=False,
)
raise KmReviewDraftArchiveError(
409,
"latest dedupe plan no longer contains this governance_event_id",
)
_validate_archive_request_against_plan(group, request, duplicate_ids)
if request.dry_run:
return _build_archive_response(
governance_event_id=governance_event_id,
request=request,
duplicate_ids=duplicate_ids,
status="dry_run",
would_archive_entry_ids=duplicate_ids,
writes_km=False,
writes_governance_audit=False,
)
archived_ids, audit_dispatch_id = await _archive_duplicates_and_write_audit(
governance_event_id=governance_event_id,
request=request,
duplicate_ids=duplicate_ids,
)
return _build_archive_response(
governance_event_id=governance_event_id,
request=request,
duplicate_ids=duplicate_ids,
status="archived",
archived_entry_ids=archived_ids,
writes_km=bool(archived_ids),
writes_governance_audit=True,
audit_dispatch_id=audit_dispatch_id,
)
def _unique_ids(ids: list[str]) -> list[str]:
"""Normalize duplicate ids while preserving operator-visible order."""
result: list[str] = []
for raw in ids:
value = str(raw).strip()
if value and value not in result:
result.append(value[:120])
return result
def _validate_archive_request_against_plan(
group: KnowledgeReviewDraftDedupeGroup,
request: KnowledgeReviewDraftArchiveRequest,
duplicate_ids: list[str],
) -> None:
"""Ensure the owner action is based on the latest read model."""
if group.canonical_entry_id != request.canonical_entry_id:
raise KmReviewDraftArchiveError(
409,
"canonical_entry_id does not match the latest dedupe plan",
)
current_duplicate_ids = _unique_ids(group.duplicate_entry_ids)
if set(current_duplicate_ids) != set(duplicate_ids):
raise KmReviewDraftArchiveError(
409,
"duplicate_entry_ids does not match the latest dedupe plan",
)
async def _load_already_archived_duplicate_ids(
duplicate_ids: list[str],
*,
canonical_entry_id: str,
governance_event_id: str,
) -> list[str]:
"""Treat double-clicks as idempotent only when prior archive tags prove it."""
if not duplicate_ids:
return []
async with get_db_context() as db:
result = await db.execute(
select(KnowledgeEntryRecord).where(
KnowledgeEntryRecord.id.in_(duplicate_ids)
)
)
records = result.scalars().all()
archived: list[str] = []
for record in records:
tags = [str(tag) for tag in (record.tags or [])]
if (
_enum_value(record.status) == EntryStatus.ARCHIVED.value
and f"dedupe_canonical:{canonical_entry_id}" in tags
and f"governance_event:{governance_event_id}" in tags
):
archived.append(str(record.id))
return archived
async def _archive_duplicates_and_write_audit(
*,
governance_event_id: str,
request: KnowledgeReviewDraftArchiveRequest,
duplicate_ids: list[str],
) -> tuple[list[str], str]:
"""Soft-archive duplicate rows and append a terminal audit dispatch."""
now = now_taipei()
async with get_db_context() as db:
result = await db.execute(
select(KnowledgeEntryRecord).where(
KnowledgeEntryRecord.id.in_(duplicate_ids)
)
)
records = result.scalars().all()
records_by_id = {str(record.id): record for record in records}
missing = [entry_id for entry_id in duplicate_ids if entry_id not in records_by_id]
if missing:
raise KmReviewDraftArchiveError(
409,
f"duplicate KM drafts missing or no longer visible: {', '.join(missing[:3])}",
)
archived_ids: list[str] = []
for entry_id in duplicate_ids:
record = records_by_id[entry_id]
tags = [str(tag) for tag in (record.tags or [])]
if not _is_archive_candidate(
record,
governance_event_id=governance_event_id,
):
raise KmReviewDraftArchiveError(
409,
f"KM draft {entry_id} is no longer an archive candidate",
)
record.status = EntryStatus.ARCHIVED
record.tags = _append_archive_tags(
tags,
governance_event_id=governance_event_id,
canonical_entry_id=request.canonical_entry_id,
owner=request.owner,
archived_at=now.isoformat(),
)
record.updated_at = now
archived_ids.append(entry_id)
audit = GovernanceRemediationDispatch(
id=generate_uuid(),
governance_event_id=governance_event_id,
event_type="knowledge_degradation",
dispatch_status="succeeded",
decision_context=_build_archive_audit_context(
governance_event_id=governance_event_id,
request=request,
archived_ids=archived_ids,
),
executor_type="hermes_km_review_dedupe_owner_archive",
attempt_count=0,
max_attempts=1,
dispatched_at=taipei_now(),
started_at=taipei_now(),
completed_at=taipei_now(),
created_by=request.owner[:100],
)
db.add(audit)
await db.flush()
logger.info(
"km_review_draft_duplicates_archived",
governance_event_id=governance_event_id,
canonical_entry_id=request.canonical_entry_id,
duplicate_count=len(archived_ids),
audit_dispatch_id=audit.id,
)
return archived_ids, str(audit.id)
def _is_archive_candidate(
record: KnowledgeEntryRecord,
*,
governance_event_id: str,
) -> bool:
tags = [str(tag) for tag in (record.tags or [])]
return (
_enum_value(record.entry_type) == EntryType.AUTO_RUNBOOK.value
and _enum_value(record.status) == EntryStatus.REVIEW.value
and f"governance_event:{governance_event_id}" in tags
)
def _append_archive_tags(
tags: list[str],
*,
governance_event_id: str,
canonical_entry_id: str,
owner: str,
archived_at: str,
) -> list[str]:
additions = [
"archived_by:km_review_dedupe",
f"governance_event:{governance_event_id}",
f"dedupe_canonical:{canonical_entry_id}",
f"dedupe_owner:{owner[:80]}",
f"archived_at:{archived_at}",
]
merged = list(tags)
for tag in additions:
if tag not in merged:
merged.append(tag)
return merged
def _build_archive_audit_context(
*,
governance_event_id: str,
request: KnowledgeReviewDraftArchiveRequest,
archived_ids: list[str],
) -> dict[str, Any]:
return {
"schema_version": "km_review_draft_archive_audit_v1",
"decision_path": "owner_approved_archive_duplicates",
"workflow": {
"current_stage": "km_duplicate_archive_after_owner_approval",
"steps": [
"detected",
"queued_kb_healthcheck",
"draft_km_updates",
"waiting_owner_review",
"owner_approved_duplicate_archive",
"stale_ratio_recheck",
],
"next_action": "stale_ratio_recheck",
"writes_km": True,
"writes_governance_audit": True,
},
"owner_action": "review_canonical_and_archive_duplicate_drafts",
"owner": request.owner,
"governance_event_id": governance_event_id,
"canonical_entry_id": request.canonical_entry_id,
"archived_entry_ids": archived_ids,
"archived_count": len(archived_ids),
"dry_run": request.dry_run,
"owner_approved": request.owner_approved,
}
def _build_archive_response(
*,
governance_event_id: str,
request: KnowledgeReviewDraftArchiveRequest,
duplicate_ids: list[str],
status: Literal["dry_run", "archived", "noop_already_archived"],
archived_entry_ids: list[str] | None = None,
skipped_entry_ids: list[str] | None = None,
would_archive_entry_ids: list[str] | None = None,
writes_km: bool,
writes_governance_audit: bool,
audit_dispatch_id: str | None = None,
) -> KnowledgeReviewDraftArchiveResponse:
return KnowledgeReviewDraftArchiveResponse(
governance_event_id=governance_event_id,
canonical_entry_id=request.canonical_entry_id,
requested_duplicate_entry_ids=duplicate_ids,
archived_entry_ids=archived_entry_ids or [],
skipped_entry_ids=skipped_entry_ids or [],
would_archive_entry_ids=would_archive_entry_ids or [],
status=status,
owner=request.owner,
owner_approved=request.owner_approved,
dry_run=request.dry_run,
writes_km=writes_km,
writes_governance_audit=writes_governance_audit,
audit_dispatch_id=audit_dispatch_id,
generated_at=now_taipei(),
)
def _enum_value(value: Any) -> str:
return str(value.value if hasattr(value, "value") else value)