""" Contract Lifecycle Service =========================== AwoooP Phase 3: 合約生命週期管理(ADR-107/ADR-112) 2026-05-04 ogt + Claude Sonnet 4.6 生命週期狀態機: draft → published → active → revoked ↑ ↓(新 active 把舊的設為 revoked) 操作: draft() — 建立 draft revision(schema 驗證 + body_hash) publish() — HMAC 簽章驗證後 draft → published activate() — approval 確認後 published → active + outbox get_active() — runtime 唯一讀取路徑(只返回 active revision) 安全機制: - body_hash = sha256(canonical JSON)(ADR-112) - publish() 需 HMAC 簽章(settings.CONTRACT_HMAC_KEY) - activate() 需 Redis multi_sig 確認(ADR-112 approval workflow) - 所有操作寫入 audit_log """ from __future__ import annotations import hashlib import hmac import json from datetime import datetime, timezone from typing import Any from uuid import UUID import structlog from pydantic import ValidationError from src.core.config import settings from src.db.awooop_models import AwoooPContractRevision from src.models.awooop_contracts import validate_contract_body from src.repositories import contract_repository logger = structlog.get_logger(__name__) # ───────────────────────────────────────────────────────────────────────────── # 錯誤定義 # ───────────────────────────────────────────────────────────────────────────── class ContractError(Exception): """合約操作基礎錯誤""" def __init__(self, error_code: str, message: str) -> None: self.error_code = error_code super().__init__(f"[{error_code}] {message}") class ContractSchemaError(ContractError): """body_json 不符合 schema""" def __init__(self, family: str, details: str) -> None: super().__init__("E-CONTRACT-001", f"Contract family={family} schema 驗證失敗: {details}") class ContractSignatureError(ContractError): """HMAC 簽章驗證失敗""" def __init__(self) -> None: super().__init__("E-CONTRACT-002", "Contract publish 簽章驗證失敗") class ContractStateError(ContractError): """非法狀態轉換""" def __init__(self, from_state: str, to_state: str) -> None: super().__init__( "E-CONTRACT-003", f"非法狀態轉換 {from_state!r} → {to_state!r}", ) class ContractApprovalError(ContractError): """缺少必要的 activation approval""" def __init__(self, revision_id: str) -> None: super().__init__( "E-CONTRACT-004", f"revision {revision_id} 尚未取得足夠的 approval 簽核", ) class ContractNotFoundError(ContractError): """Revision 不存在""" def __init__(self, revision_id: str) -> None: super().__init__( "E-CONTRACT-005", f"Revision {revision_id!r} 不存在或無權限存取", ) # ───────────────────────────────────────────────────────────────────────────── # Body hash(ADR-112 artifact integrity) # ───────────────────────────────────────────────────────────────────────────── def _compute_body_hash(body_json: dict[str, Any]) -> str: """ 計算 body_json 的 SHA-256 hex digest。 使用 canonical JSON(sorted keys, no spaces)確保確定性。 """ canonical = json.dumps(body_json, sort_keys=True, separators=(",", ":"), ensure_ascii=False) return hashlib.sha256(canonical.encode("utf-8")).hexdigest() def _verify_publish_signature( revision_id: str, body_hash: str, publisher_id: str, signature: str, ) -> bool: """ 驗證 publish HMAC 簽章。 message = f"{revision_id}:{body_hash}:{publisher_id}" secret = settings.CONTRACT_HMAC_KEY(base64 or hex) """ secret = getattr(settings, "CONTRACT_HMAC_KEY", "") if not secret: # 未設定 HMAC key → 開發環境放行(但記錄 warning) logger.warning( "contract_hmac_key_not_set", warning="CONTRACT_HMAC_KEY 未設定,publish 簽章驗證跳過(非 production 行為)", ) return True message = f"{revision_id}:{body_hash}:{publisher_id}".encode("utf-8") expected = hmac.new( secret.encode("utf-8"), message, hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected, signature) # ───────────────────────────────────────────────────────────────────────────── # Multi-sig approval(ADR-112 activation approval) # ───────────────────────────────────────────────────────────────────────────── _APPROVAL_KEY_PREFIX = "contract:approval:" _APPROVAL_REQUIRED = 1 # Phase 3:1 人核准即可;Phase 5+ 升為 2 async def _check_activation_approval(revision_id: str, project_id: str) -> bool: """ 檢查 Redis 中是否有足夠的 activation approval。 key = contract:approval:{project_id}:{revision_id} value = JSON list of approver IDs """ try: from src.core.redis_client import get_redis redis = get_redis() key = f"{_APPROVAL_KEY_PREFIX}{project_id}:{revision_id}" raw = await redis.get(key) if not raw: return False approvers = json.loads(raw.decode() if isinstance(raw, bytes) else raw) return len(approvers) >= _APPROVAL_REQUIRED except Exception as exc: logger.warning("contract_approval_check_failed", revision_id=revision_id, error=str(exc)) return False async def record_activation_approval( revision_id: str, project_id: str, approver_id: str, ) -> int: """ 記錄一個 approver 的核准簽名。 Returns: 目前收到的 approval 數。 """ from src.core.redis_client import get_redis redis = get_redis() key = f"{_APPROVAL_KEY_PREFIX}{project_id}:{revision_id}" raw = await redis.get(key) approvers: list[str] = json.loads(raw.decode() if isinstance(raw, bytes) else raw or "[]") if approver_id not in approvers: approvers.append(approver_id) await redis.set(key, json.dumps(approvers), ex=86400) # 24h TTL logger.info( "contract_approval_recorded", revision_id=revision_id, approver_id=approver_id, total_approvals=len(approvers), ) return len(approvers) # ───────────────────────────────────────────────────────────────────────────── # Core lifecycle operations # ───────────────────────────────────────────────────────────────────────────── async def draft( *, project_id: str, contract_family: str, contract_id: str, version_major: int, version_minor: int, body_json: dict[str, Any], body_schema_version: str = "v1.0", ) -> AwoooPContractRevision: """ Step 1: 建立 draft revision。 - 驗證 body_json 符合 contract_family 的 Pydantic schema - 計算 body_hash(sha256 canonical JSON) - 寫入 DB(lifecycle_status='draft') - 寫入 audit log draft revision 不可被 runtime 讀取(get_active() 只返回 active)。 """ # Schema 驗證 try: validate_contract_body(contract_family, body_json) except ValidationError as exc: raise ContractSchemaError(contract_family, exc.json(indent=0)) from exc except ValueError as exc: raise ContractSchemaError(contract_family, str(exc)) from exc body_hash = _compute_body_hash(body_json) revision = await contract_repository.create_draft( project_id=project_id, contract_family=contract_family, contract_id=contract_id, version_major=version_major, version_minor=version_minor, body_json=body_json, body_hash=body_hash, body_schema_version=body_schema_version, ) await _write_audit( project_id=project_id, action="contract.drafted", resource_type="contract_revision", resource_id=str(revision.revision_id), details={ "contract_family": contract_family, "contract_id": contract_id, "version": f"{version_major}.{version_minor}", "body_hash": body_hash, }, ) return revision async def publish( *, revision_id: UUID, project_id: str, publisher_id: str, signature: str, ) -> AwoooPContractRevision: """ Step 2: draft → published。 - 讀取 revision(驗證 lifecycle_status='draft') - HMAC 簽章驗證(publisher_id + body_hash + revision_id) - 更新 lifecycle_status='published' - 寫入 audit log """ revision = await contract_repository.get_revision(revision_id, project_id) if revision is None: raise ContractNotFoundError(str(revision_id)) if revision.lifecycle_status != "draft": raise ContractStateError(revision.lifecycle_status, "published") if not _verify_publish_signature( str(revision_id), revision.body_hash, publisher_id, signature ): raise ContractSignatureError() published_at = datetime.now(timezone.utc) revision = await contract_repository.mark_published( revision_id=revision_id, project_id=project_id, publisher_id=publisher_id, publish_signature=signature, published_at=published_at, ) await _write_audit( project_id=project_id, action="contract.published", resource_type="contract_revision", resource_id=str(revision_id), details={ "publisher_id": publisher_id, "published_at": published_at.isoformat(), "body_hash": revision.body_hash, }, ) return revision async def activate( *, revision_id: UUID, project_id: str, activator_id: str, bypass_approval: bool = False, ) -> AwoooPContractRevision: """ Step 3: published → active。 - 讀取 revision(驗證 lifecycle_status='published') - 確認 Redis approval(除非 bypass_approval=True) - 更新 active pointer(UPSERT awooop_active_revisions) - 舊 active revision → revoked - 寫入 outbox event(ADR-113) - 寫入 audit log """ revision = await contract_repository.get_revision(revision_id, project_id) if revision is None: raise ContractNotFoundError(str(revision_id)) if revision.lifecycle_status != "published": raise ContractStateError(revision.lifecycle_status, "active") if not bypass_approval: approved = await _check_activation_approval(str(revision_id), project_id) if not approved: raise ContractApprovalError(str(revision_id)) # 找舊 active revision(如果有) old_revision = await contract_repository.get_active_revision( project_id=project_id, contract_family=revision.contract_family, contract_id=revision.contract_id, ) old_revision_id = old_revision.revision_id if old_revision else None revision = await contract_repository.mark_active( revision_id=revision_id, project_id=project_id, contract_family=revision.contract_family, contract_id=revision.contract_id, old_revision_id=old_revision_id, ) await _write_audit( project_id=project_id, action="contract.activated", resource_type="contract_revision", resource_id=str(revision_id), details={ "activator_id": activator_id, "old_revision_id": str(old_revision_id) if old_revision_id else None, "contract_family": revision.contract_family, "contract_id": revision.contract_id, }, ) return revision async def get_active( *, project_id: str, contract_family: str, contract_id: str, verify_hash: bool = True, ) -> AwoooPContractRevision | None: """ Runtime 讀取路徑:只返回 active revision。 verify_hash=True(預設):從 DB 讀取後驗證 body_hash, 確保 body_json 未被竄改(ADR-112 artifact integrity)。 """ revision = await contract_repository.get_active_revision( project_id=project_id, contract_family=contract_family, contract_id=contract_id, ) if revision is None: return None if verify_hash: computed = _compute_body_hash(revision.body_json) if computed != revision.body_hash: logger.error( "contract_hash_mismatch", revision_id=str(revision.revision_id), expected=revision.body_hash, computed=computed, ) raise ContractError( "E-CONTRACT-006", f"revision {revision.revision_id} body_hash 不符(資料可能被竄改)", ) return revision async def get_active_body( *, project_id: str, contract_family: str, contract_id: str, ) -> dict[str, Any] | None: """ 便利方法:直接返回 body_json(含 hash 驗證)。 None = 沒有 active revision。 """ revision = await get_active( project_id=project_id, contract_family=contract_family, contract_id=contract_id, ) return revision.body_json if revision else None # ───────────────────────────────────────────────────────────────────────────── # Audit log helper # ───────────────────────────────────────────────────────────────────────────── async def _write_audit( *, project_id: str, action: str, resource_type: str, resource_id: str, details: dict[str, Any], ) -> None: """寫入 audit_log(非阻擋,失敗只 warning)""" try: from sqlalchemy import text as sa_text from src.db.base import get_db_context async with get_db_context(project_id) as db: await db.execute( sa_text(""" INSERT INTO audit_logs (project_id, action, resource_type, resource_id, details) VALUES (:project_id, :action, :resource_type, :resource_id, :details::jsonb) """), { "project_id": project_id, "action": action, "resource_type": resource_type, "resource_id": resource_id, "details": json.dumps(details), }, ) except Exception as exc: logger.warning( "contract_audit_write_failed", action=action, resource_id=resource_id, error=str(exc), )