""" AwoooP Operator Console — Platform Operator Service ==================================================== leWOOOgo 積木化:DB 存取集中在 Service 層,Router 不直接引用 get_db。 ADR-106(AwoooP Agent Platform) 2026-05-05 ogt + Claude Sonnet 4.6 """ from __future__ import annotations import uuid from datetime import datetime from decimal import Decimal from typing import Any from uuid import UUID import structlog from fastapi import HTTPException, status from sqlalchemy import func, select from src.db.awooop_models import ( AwoooPContractRevision, AwoooPProject, AwoooPRunState, ) from src.db.base import get_db_context from src.services.audit_sink import write_audit from src.services.awooop_approval_token import issue_approval_token, record_approval from src.services.run_state_machine import transition logger = structlog.get_logger(__name__) _MAX_CONTRACTS = 200 _DEFAULT_PER_PAGE = 50 _MAX_PER_PAGE = 200 # ============================================================================= # Tenants # ============================================================================= async def list_tenants() -> dict[str, Any]: """列出所有 AwoooP 租戶(Operator Console,不依 RLS 過濾)。""" async with get_db_context("awoooi") as db: result = await db.execute( select(AwoooPProject).order_by(AwoooPProject.created_at.asc()) ) rows = list(result.scalars().all()) tenants = [ { "project_id": r.project_id, "display_name": r.display_name, "migration_mode": r.migration_mode, "budget_limit_usd": r.budget_limit_usd, "is_active": r.is_active, "created_at": r.created_at, } for r in rows ] return {"tenants": tenants, "total": len(tenants)} # ============================================================================= # Contracts # ============================================================================= async def list_contracts( project_id: str | None, lifecycle_status: str | None, ) -> dict[str, Any]: """列出合約 revisions(可 filter by project_id / lifecycle_status)。""" async with get_db_context("awoooi") as db: stmt = select(AwoooPContractRevision).order_by( AwoooPContractRevision.created_at.desc() ) if project_id is not None: stmt = stmt.where(AwoooPContractRevision.project_id == project_id) if lifecycle_status is not None: stmt = stmt.where( AwoooPContractRevision.lifecycle_status == lifecycle_status ) count_stmt = select(func.count()).select_from(stmt.subquery()) total_result = await db.execute(count_stmt) total = total_result.scalar_one() stmt = stmt.limit(_MAX_CONTRACTS) result = await db.execute(stmt) rows = list(result.scalars().all()) contracts = [ { "revision_id": r.revision_id, "contract_id": r.contract_id, "contract_family": r.contract_family, "lifecycle_status": r.lifecycle_status, "body_hash": r.body_hash, "version_major": r.version_major, "version_minor": r.version_minor, "created_at": r.created_at, "project_id": r.project_id, } for r in rows ] return {"contracts": contracts, "total": total} # ============================================================================= # Runs # ============================================================================= async def list_runs( project_id: str | None, state: str | None, page: int, per_page: int, ) -> dict[str, Any]: """列出 runs,支援 project_id、state filter 與分頁。""" async with get_db_context("awoooi") as db: stmt = select(AwoooPRunState).order_by(AwoooPRunState.created_at.desc()) if project_id is not None: stmt = stmt.where(AwoooPRunState.project_id == project_id) if state is not None: stmt = stmt.where(AwoooPRunState.state == state) count_stmt = select(func.count()).select_from(stmt.subquery()) total_result = await db.execute(count_stmt) total = total_result.scalar_one() offset = (page - 1) * per_page stmt = stmt.offset(offset).limit(per_page) result = await db.execute(stmt) rows = list(result.scalars().all()) runs = [ { "run_id": r.run_id, "project_id": r.project_id, "agent_id": r.agent_id, "state": r.state, "is_shadow": r.is_shadow, "cost_usd": r.cost_usd, "step_count": r.step_count, "created_at": r.created_at, "timeout_at": r.timeout_at, } for r in rows ] return {"runs": runs, "total": total, "page": page, "per_page": per_page} # ============================================================================= # Approvals # ============================================================================= async def list_approvals(project_id: str | None) -> dict[str, Any]: """列出所有 waiting_approval 狀態的 runs。""" async with get_db_context("awoooi") as db: stmt = ( select(AwoooPRunState) .where(AwoooPRunState.state == "waiting_approval") .order_by(AwoooPRunState.created_at.asc()) ) if project_id is not None: stmt = stmt.where(AwoooPRunState.project_id == project_id) count_stmt = select(func.count()).select_from(stmt.subquery()) total_result = await db.execute(count_stmt) total = total_result.scalar_one() result = await db.execute(stmt) rows = list(result.scalars().all()) items = [ { "run_id": r.run_id, "project_id": r.project_id, "agent_id": r.agent_id, "created_at": r.created_at, "timeout_at": r.timeout_at, } for r in rows ] return {"approvals": items, "total": total, "items": items} async def decide_approval( run_id: str, project_id: str, decision: str, approver_id: str, reason: str | None, ) -> dict[str, Any]: """核准或拒絕一個待審核的 run(ADR-116 Gate 5)。""" try: run_uuid = uuid.UUID(run_id) except ValueError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"run_id 格式錯誤: {exc}", ) from exc async with get_db_context(project_id) as db: result = await db.execute( select(AwoooPRunState).where( AwoooPRunState.run_id == run_uuid, AwoooPRunState.project_id == project_id, ) ) run = result.scalar_one_or_none() if run is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"run {run_id!r} 不存在或非此 project 所有", ) if run.state != "waiting_approval": raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"run {run_id!r} 目前狀態為 {run.state!r},無法審核(需為 waiting_approval)", ) approval_token_jti: str | None = None new_state: str if decision == "approve": token = issue_approval_token( project_id=project_id, run_id=run_id, tool_name="operator_console_approve", approver_id=approver_id, ) try: await record_approval( project_id=project_id, run_id=run_id, tool_name="operator_console_approve", approver_id=approver_id, token=token, ) except Exception as exc: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"核准記錄失敗: {exc}", ) from exc await transition(run_uuid, project_id, "running") new_state = "running" import base64 import json as _json try: p_b64 = token.split(".")[1] padding = 4 - len(p_b64) % 4 if padding != 4: p_b64 += "=" * padding payload = _json.loads(base64.urlsafe_b64decode(p_b64)) approval_token_jti = payload.get("jti") except Exception: approval_token_jti = None else: await transition( run_uuid, project_id, "cancelled", error_code="E-APPR-REJECTED", error_detail=f"operator 拒絕: approver={approver_id!r}, reason={reason!r}", ) new_state = "cancelled" try: await write_audit( project_id=project_id, action=f"run.approval.{decision}", resource_type="run", resource_id=run_id, details={ "approver_id": approver_id, "decision": decision, "reason": reason, "new_state": new_state, }, ) except Exception as exc: logger.warning("approval_audit_write_failed", run_id=run_id, error=str(exc)) return { "run_id": run_id, "decision": decision, "new_state": new_state, "approval_token_jti": approval_token_jti, }