Files
awoooi/apps/api/src/services/platform_operator_service.py
Your Name 7d92f0acd7
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m8s
CD Pipeline / build-and-deploy (push) Successful in 3m49s
CD Pipeline / post-deploy-checks (push) Successful in 1m25s
chore(rls): stage projects canary path
2026-05-12 21:25:24 +08:00

790 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AwoooP Operator Console — Platform Operator Service
====================================================
leWOOOgo modular design: DB access is centralized in the Service layer; routers never reference get_db directly.
ADR-106: AwoooP Agent Platform
2026-05-05 ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
import uuid
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
import structlog
from fastapi import HTTPException, status
from sqlalchemy import func, select, text, update
from sqlalchemy import or_ as sa_or
from src.db.awooop_models import (
AwoooPContractRevision,
AwoooPConversationEvent,
AwoooPMcpGatewayAudit,
AwoooPOutboundMessage,
AwoooPRunState,
AwoooPRunStepJournal,
)
from src.db.base import get_db_context
from src.services.audit_sink import write_audit
from src.services.awooop_approval_token import issue_approval_token, record_approval
from src.services.run_state_machine import transition
logger = structlog.get_logger(__name__)
# Hard cap on the number of contract revisions returned by list_contracts().
_MAX_CONTRACTS = 200
# Intended default / maximum page size for paginated listings such as
# list_runs() — NOTE(review): verify where these are enforced.
_DEFAULT_PER_PAGE = 50
_MAX_PER_PAGE = 200
# Upper bound applied to the `limit` argument of list_recent_channel_events().
_MAX_EVENTS = 100
# Row cap for each per-run query in get_run_detail() and for the merged timeline.
_MAX_TIMELINE_ITEMS = 100
# Maximum length of the compacted text produced by _truncate_step_summary().
_MAX_STEP_SUMMARY_CHARS = 128
# =============================================================================
# Tenants
# =============================================================================
async def list_tenants() -> dict[str, Any]:
    """List every AwoooP tenant for the Operator Console.

    Rows are read from the ``awooop_operator_list_projects()`` database
    function using the ``"awoooi"`` DB context. Per the original note, this
    listing is not filtered by RLS.

    Returns:
        ``{"tenants": [...], "total": <number of tenants returned>}`` where
        each tenant is a plain dict of the selected columns.
    """
    columns = (
        "project_id",
        "display_name",
        "migration_mode",
        "budget_limit_usd",
        "is_active",
        "created_at",
    )
    query = text("""
        SELECT
            project_id,
            display_name,
            migration_mode,
            budget_limit_usd,
            is_active,
            created_at
        FROM awooop_operator_list_projects()
    """)
    async with get_db_context("awoooi") as db:
        records = (await db.execute(query)).mappings().all()
        # Copy each row mapping into a plain dict with a fixed key order.
        tenants = [{column: record[column] for column in columns} for record in records]
    return {"tenants": tenants, "total": len(tenants)}
# =============================================================================
# Contracts
# =============================================================================
async def list_contracts(
    project_id: str | None,
    lifecycle_status: str | None,
) -> dict[str, Any]:
    """List contract revisions, newest first, with optional filters.

    Args:
        project_id: When not ``None``, only revisions of this project.
        lifecycle_status: When not ``None``, only revisions in this status.

    Returns:
        ``{"contracts": [...], "total": n}``. ``total`` counts every matching
        revision, while ``contracts`` holds at most ``_MAX_CONTRACTS`` rows.
    """
    revision = AwoooPContractRevision
    fields = (
        "revision_id",
        "contract_id",
        "contract_family",
        "lifecycle_status",
        "body_hash",
        "version_major",
        "version_minor",
        "created_at",
        "project_id",
    )

    query = select(revision).order_by(revision.created_at.desc())
    if project_id is not None:
        query = query.where(revision.project_id == project_id)
    if lifecycle_status is not None:
        query = query.where(revision.lifecycle_status == lifecycle_status)

    async with get_db_context("awoooi") as db:
        # Count the full filtered set before the row cap is applied.
        counted = await db.execute(select(func.count()).select_from(query.subquery()))
        total = counted.scalar_one()

        fetched = await db.execute(query.limit(_MAX_CONTRACTS))
        contracts = [
            {field: getattr(row, field) for field in fields}
            for row in fetched.scalars().all()
        ]
    return {"contracts": contracts, "total": total}
# =============================================================================
# Runs
# =============================================================================
async def list_runs(
    project_id: str | None,
    state: str | None,
    page: int,
    per_page: int,
) -> dict[str, Any]:
    """List runs, newest first, with optional filters and pagination.

    Args:
        project_id: When not ``None``, only runs of this project.
        state: When not ``None``, only runs currently in this state.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; clamped to the range ``1.._MAX_PER_PAGE``.

    Returns:
        ``{"runs": [...], "total": n, "page": p, "per_page": s}`` where
        ``total`` counts every matching run and ``page`` / ``per_page`` are
        the effective (clamped) values actually used for the query.
    """
    # Clamp the paging inputs: a page below 1 would yield a negative OFFSET,
    # and an unbounded per_page would let one request load the whole table.
    safe_page = max(1, page)
    safe_per_page = max(1, min(per_page, _MAX_PER_PAGE))

    async with get_db_context("awoooi") as db:
        stmt = select(AwoooPRunState).order_by(AwoooPRunState.created_at.desc())
        if project_id is not None:
            stmt = stmt.where(AwoooPRunState.project_id == project_id)
        if state is not None:
            stmt = stmt.where(AwoooPRunState.state == state)

        # Count the full filtered set before OFFSET/LIMIT are applied.
        count_stmt = select(func.count()).select_from(stmt.subquery())
        total = (await db.execute(count_stmt)).scalar_one()

        offset = (safe_page - 1) * safe_per_page
        result = await db.execute(stmt.offset(offset).limit(safe_per_page))
        runs = [
            {
                "run_id": r.run_id,
                "project_id": r.project_id,
                "agent_id": r.agent_id,
                "state": r.state,
                "is_shadow": r.is_shadow,
                "cost_usd": r.cost_usd,
                "step_count": r.step_count,
                "created_at": r.created_at,
                "timeout_at": r.timeout_at,
            }
            for r in result.scalars().all()
        ]
    return {
        "runs": runs,
        "total": total,
        "page": safe_page,
        "per_page": safe_per_page,
    }
def _timeline_item(
*,
ts: Any,
kind: str,
title: str,
status: str,
summary: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build one Operator Console timeline item."""
return {
"ts": ts,
"kind": kind,
"title": title,
"status": status,
"summary": summary,
"metadata": metadata or {},
}
def _utc_now_naive() -> datetime:
"""回傳與 AwoooP timestamp-without-timezone 欄位相容的 UTC 時間。"""
return datetime.now(UTC).replace(tzinfo=None)
def _truncate_step_summary(value: str | None) -> str | None:
"""壓縮 Step summary避免超過 DB 欄位與前端 timeline 需要的短摘要。"""
if not value:
return None
compact = " ".join(str(value).split())
if len(compact) <= _MAX_STEP_SUMMARY_CHARS:
return compact
return f"{compact[: _MAX_STEP_SUMMARY_CHARS - 1]}"
def _approval_step_title(tool_name: str, step_seq: int) -> str:
"""將 operator_console.* step 轉成人能一眼理解的 timeline 標題。"""
if tool_name == "operator_console.approve":
return f"人工審批 {step_seq}: 核准"
if tool_name == "operator_console.reject":
return f"人工審批 {step_seq}: 拒絕"
return f"Step {step_seq}: {tool_name}"
def _outbound_timeline_title(
channel_type: str,
message_type: str,
content_preview: str | None,
) -> str:
"""將 legacy Telegram outbound 分類成 Operator 看得懂的語義標題。"""
channel = channel_type.upper()
preview = content_preview or ""
if "RUNBOOK REVIEW" in preview:
return f"{channel}Runbook 待人工審核"
if "[AWOOOI CI/CD]" in preview or "AWOOOI CI/CD" in preview:
return f"{channel}CI/CD 狀態通知"
if "AI 治理警報" in preview:
return f"{channel}AI 治理警報"
if "HANDOFF REQUIRED" in preview or "AI 自動修復失敗" in preview:
return f"{channel}AI 自動修復失敗,已轉人工"
if "AUTO RESOLVED" in preview or "AI 自動修復完成" in preview:
return f"{channel}AI 自動修復完成"
if "ESCALATION" in preview or "事故升級" in preview:
return f"{channel}:事故升級通知"
if "ACTION REQUIRED" in preview:
return f"{channel}:告警審批卡"
fallback = {
"approval_request": "人工審批請求",
"error": "錯誤回覆",
"final": "處置結果",
"interim": "漸進式狀態回饋",
}.get(message_type, message_type)
return f"{channel}{fallback}"
# Attribute names copied verbatim into each section of the run-detail payload.
_RUN_DETAIL_RUN_FIELDS = (
    "run_id",
    "project_id",
    "agent_id",
    "state",
    "is_shadow",
    "trace_id",
    "trigger_type",
    "trigger_ref",
    "cost_usd",
    "step_count",
    "attempt_count",
    "max_attempts",
    "error_code",
    "error_detail",
    "created_at",
    "started_at",
    "completed_at",
    "timeout_at",
    "heartbeat_at",
)
_RUN_DETAIL_STEP_FIELDS = (
    "step_id",
    "step_seq",
    "tool_name",
    "result_status",
    "was_blocked",
    "block_reason",
    "error_code",
    "latency_ms",
    "created_at",
    "completed_at",
)
_RUN_DETAIL_INBOUND_FIELDS = (
    "event_id",
    "channel_type",
    "provider_event_id",
    "content_preview",
    "is_duplicate",
    "received_at",
)
_RUN_DETAIL_OUTBOUND_FIELDS = (
    "message_id",
    "channel_type",
    "message_type",
    "content_preview",
    "send_status",
    "send_error",
    "provider_message_id",
    "queued_at",
    "sent_at",
    "triggered_by_state",
)
_RUN_DETAIL_MCP_FIELDS = (
    "call_id",
    "tool_name",
    "result_status",
    "block_gate",
    "block_reason",
    "latency_ms",
    "created_at",
)


def _attrs_to_dict(obj: Any, names: tuple[str, ...]) -> dict[str, Any]:
    """Copy the named attributes of *obj* into a plain dict keyed by name."""
    return {name: getattr(obj, name) for name in names}


def _build_run_timeline(
    run: Any,
    inbound_events: list[Any],
    steps: list[Any],
    mcp_calls: list[Any],
    outbound_messages: list[Any],
) -> list[dict[str, Any]]:
    """Merge run lifecycle, inbound, step, MCP and outbound rows into one timeline.

    Items are sorted by timestamp (items without one sort at the run's
    creation time) and the result is capped at ``_MAX_TIMELINE_ITEMS``.
    """
    timeline: list[dict[str, Any]] = [
        _timeline_item(
            ts=run.created_at,
            kind="run",
            title="Run 建立",
            status=run.state,
            # Separator keeps trigger type and agent id from running together.
            summary=f"{run.trigger_type or 'unknown'} → {run.agent_id}",
            metadata={"trace_id": run.trace_id, "trigger_ref": run.trigger_ref},
        )
    ]
    if run.started_at:
        timeline.append(
            _timeline_item(
                ts=run.started_at,
                kind="run",
                title="Run 開始執行",
                status="running",
                summary=run.worker_id,
            )
        )

    timeline.extend(
        _timeline_item(
            ts=row.received_at,
            kind="inbound",
            title=f"{row.channel_type} 入站事件",
            status="duplicate" if row.is_duplicate else "received",
            summary=row.content_preview,
            metadata={"provider_event_id": row.provider_event_id},
        )
        for row in inbound_events
    )
    timeline.extend(
        _timeline_item(
            # Prefer the completion time; fall back to creation for open steps.
            ts=row.completed_at or row.created_at,
            kind=(
                "approval"
                if row.tool_name.startswith("operator_console.")
                else "step"
            ),
            title=_approval_step_title(row.tool_name, row.step_seq),
            status=row.result_status,
            summary=row.block_reason or row.error_code,
            metadata={
                "was_blocked": row.was_blocked,
                "latency_ms": row.latency_ms,
            },
        )
        for row in steps
    )
    timeline.extend(
        _timeline_item(
            ts=row.created_at,
            kind="mcp",
            title=f"MCP: {row.tool_name}",
            status=row.result_status,
            summary=row.block_reason,
            metadata={
                "block_gate": row.block_gate,
                "latency_ms": row.latency_ms,
            },
        )
        for row in mcp_calls
    )
    timeline.extend(
        _timeline_item(
            ts=row.sent_at or row.queued_at,
            kind="outbound",
            title=_outbound_timeline_title(
                row.channel_type,
                row.message_type,
                row.content_preview,
            ),
            status=row.send_status,
            summary=row.content_preview or row.send_error,
            metadata={
                "message_type": row.message_type,
                "provider_message_id": row.provider_message_id,
                "triggered_by_state": row.triggered_by_state,
            },
        )
        for row in outbound_messages
    )

    if run.completed_at:
        timeline.append(
            _timeline_item(
                ts=run.completed_at,
                kind="run",
                title="Run 結束",
                status=run.state,
                summary=run.error_detail or run.error_code,
            )
        )

    # Stable sort: items with equal timestamps keep their insertion order.
    timeline.sort(key=lambda item: item["ts"] or run.created_at)
    return timeline[:_MAX_TIMELINE_ITEMS]


async def get_run_detail(
    run_id: str,
    project_id: str | None = None,
) -> dict[str, Any]:
    """Return the full handling context of one run for the Run detail / Timeline view.

    Loads the run plus up to ``_MAX_TIMELINE_ITEMS`` rows each of its step
    journal, inbound conversation events, outbound messages and MCP gateway
    audit records, then merges them into a chronological timeline.

    Args:
        run_id: Run identifier as a UUID string.
        project_id: When given, the lookup is restricted to this project and
            this project's DB context is used; otherwise the ``"awoooi"``
            context is used.

    Returns:
        A dict with ``run``, ``steps``, ``inbound_events``,
        ``outbound_messages``, ``mcp_calls``, ``timeline`` and ``counts``.

    Raises:
        HTTPException: 422 when ``run_id`` is not a valid UUID; 404 when no
            matching run exists.
    """
    try:
        run_uuid = uuid.UUID(run_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"run_id 格式錯誤: {exc}",
        ) from exc

    async with get_db_context(project_id or "awoooi") as db:
        run_stmt = select(AwoooPRunState).where(AwoooPRunState.run_id == run_uuid)
        if project_id is not None:
            run_stmt = run_stmt.where(AwoooPRunState.project_id == project_id)
        run = (await db.execute(run_stmt)).scalar_one_or_none()
        if run is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"run {run_id!r} 不存在",
            )

        steps_result = await db.execute(
            select(AwoooPRunStepJournal)
            .where(AwoooPRunStepJournal.run_id == run_uuid)
            .order_by(AwoooPRunStepJournal.step_seq.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        steps = list(steps_result.scalars().all())

        # Inbound events are matched either by run_id or by the run's trigger
        # reference: as an event_id when it parses as a UUID, otherwise as a
        # provider_event_id.
        inbound_where = [AwoooPConversationEvent.run_id == run_uuid]
        if run.trigger_ref:
            try:
                trigger_event_uuid = uuid.UUID(run.trigger_ref)
            except ValueError:
                inbound_where.append(
                    AwoooPConversationEvent.provider_event_id == run.trigger_ref
                )
            else:
                inbound_where.append(
                    AwoooPConversationEvent.event_id == trigger_event_uuid
                )
        inbound_result = await db.execute(
            select(AwoooPConversationEvent)
            .where(sa_or(*inbound_where))
            .order_by(AwoooPConversationEvent.received_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        inbound_events = list(inbound_result.scalars().all())

        outbound_result = await db.execute(
            select(AwoooPOutboundMessage)
            .where(AwoooPOutboundMessage.run_id == run_uuid)
            .order_by(AwoooPOutboundMessage.queued_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        outbound_messages = list(outbound_result.scalars().all())

        mcp_result = await db.execute(
            select(AwoooPMcpGatewayAudit)
            .where(AwoooPMcpGatewayAudit.run_id == run_uuid)
            .order_by(AwoooPMcpGatewayAudit.created_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        mcp_calls = list(mcp_result.scalars().all())

        # Serialize before leaving the DB context so attribute access on the
        # loaded rows happens while the context is still active.
        step_items = [_attrs_to_dict(row, _RUN_DETAIL_STEP_FIELDS) for row in steps]
        inbound_items = [
            _attrs_to_dict(row, _RUN_DETAIL_INBOUND_FIELDS) for row in inbound_events
        ]
        outbound_items = [
            _attrs_to_dict(row, _RUN_DETAIL_OUTBOUND_FIELDS)
            for row in outbound_messages
        ]
        mcp_items = [_attrs_to_dict(row, _RUN_DETAIL_MCP_FIELDS) for row in mcp_calls]
        timeline = _build_run_timeline(
            run, inbound_events, steps, mcp_calls, outbound_messages
        )

        return {
            "run": _attrs_to_dict(run, _RUN_DETAIL_RUN_FIELDS),
            "steps": step_items,
            "inbound_events": inbound_items,
            "outbound_messages": outbound_items,
            "mcp_calls": mcp_items,
            "timeline": timeline,
            "counts": {
                "steps": len(step_items),
                "inbound_events": len(inbound_items),
                "outbound_messages": len(outbound_items),
                "mcp_calls": len(mcp_items),
                "timeline": len(timeline),
            },
        }
# =============================================================================
# Channel Events
# =============================================================================
async def list_recent_channel_events(
    *,
    project_id: str | None,
    channel_type: str | None,
    provider_prefix: str | None,
    limit: int,
) -> dict[str, Any]:
    """List the most recent channel events for the Operator Console.

    Args:
        project_id: When not ``None``, only events of this project.
        channel_type: When not ``None``, only events of this channel type.
        provider_prefix: When not ``None``, only events whose
            ``provider_event_id`` starts with this literal prefix.
        limit: Requested row count; clamped to ``1.._MAX_EVENTS``.

    Returns:
        ``{"events": [...], "total": n, "limit": effective_limit}`` where
        ``total`` is the number of events returned (not the table-wide count).
    """
    safe_limit = max(1, min(limit, _MAX_EVENTS))
    async with get_db_context("awoooi") as db:
        stmt = select(AwoooPConversationEvent).order_by(
            AwoooPConversationEvent.received_at.desc()
        )
        if project_id is not None:
            stmt = stmt.where(AwoooPConversationEvent.project_id == project_id)
        if channel_type is not None:
            stmt = stmt.where(AwoooPConversationEvent.channel_type == channel_type)
        if provider_prefix is not None:
            # autoescape makes "%" and "_" in the caller-supplied prefix match
            # literally instead of acting as LIKE wildcards.
            stmt = stmt.where(
                AwoooPConversationEvent.provider_event_id.startswith(
                    provider_prefix,
                    autoescape=True,
                )
            )
        result = await db.execute(stmt.limit(safe_limit))
        events = [
            {
                "event_id": r.event_id,
                "project_id": r.project_id,
                "channel_type": r.channel_type,
                "provider_event_id": r.provider_event_id,
                "channel_chat_id": r.channel_chat_id,
                "content_preview": r.content_preview,
                "is_duplicate": r.is_duplicate,
                "received_at": r.received_at,
            }
            for r in result.scalars().all()
        ]
    return {"events": events, "total": len(events), "limit": safe_limit}
# =============================================================================
# Approvals
# =============================================================================
async def list_approvals(
    project_id: str | None,
    run_id: str | None = None,
) -> dict[str, Any]:
    """List runs in the ``waiting_approval`` state, oldest first.

    Args:
        project_id: When not ``None``, only runs of this project.
        run_id: Optional UUID string narrowing the result to a single run.

    Returns:
        ``{"approvals": [...], "total": n, "items": [...]}``; ``approvals``
        and ``items`` are the same list.

    Raises:
        HTTPException: 422 when ``run_id`` is given but is not a valid UUID.
    """
    run_uuid: UUID | None = None
    if run_id:
        try:
            run_uuid = uuid.UUID(run_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"run_id 格式錯誤: {exc}",
            ) from exc

    conditions = [AwoooPRunState.state == "waiting_approval"]
    if project_id is not None:
        conditions.append(AwoooPRunState.project_id == project_id)
    if run_uuid is not None:
        conditions.append(AwoooPRunState.run_id == run_uuid)
    pending_stmt = (
        select(AwoooPRunState)
        .where(*conditions)
        .order_by(AwoooPRunState.created_at.asc())
    )

    exposed = ("run_id", "project_id", "agent_id", "created_at", "timeout_at")
    async with get_db_context("awoooi") as db:
        counted = await db.execute(
            select(func.count()).select_from(pending_stmt.subquery())
        )
        total = counted.scalar_one()
        pending = await db.execute(pending_stmt)
        items = [
            {field: getattr(row, field) for field in exposed}
            for row in pending.scalars().all()
        ]
    return {"approvals": items, "total": total, "items": items}
def _extract_token_jti(token: str) -> str | None:
    """Best-effort read of the ``jti`` claim from a dot-separated token.

    Treats the second ``.``-separated segment as base64url-encoded JSON
    (JWT-style) and returns its ``jti`` value. Any decoding problem yields
    ``None``; the token signature is not verified here.
    """
    import base64
    import json

    try:
        payload_b64 = token.split(".")[1]
        # Restore the "=" padding that base64url tokens usually omit.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return payload.get("jti")
    except (AttributeError, IndexError, TypeError, ValueError):
        return None


async def decide_approval(
    run_id: str,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> dict[str, Any]:
    """Approve or reject a run that is waiting for approval (ADR-116 Gate 5).

    On approval an approval token is issued and recorded, and the run moves
    to ``running``. On rejection the run moves to ``cancelled`` with error
    code ``E-APPR-REJECTED``. Either way the decision is written to the run
    step journal and to the audit sink; both writes are best-effort.

    Args:
        run_id: Run identifier as a UUID string.
        project_id: Project that must own the run.
        decision: ``"approve"`` or ``"reject"``.
        approver_id: Identifier of the operator making the decision.
        reason: Optional free-text reason.

    Returns:
        ``{"run_id", "decision", "new_state", "approval_token_jti"}``;
        ``approval_token_jti`` is ``None`` for rejections or when the token
        payload cannot be decoded.

    Raises:
        HTTPException: 422 for an unknown ``decision`` or malformed
            ``run_id``; 404 when the run does not exist for this project;
            409 when the run is not in ``waiting_approval``; 500 when
            recording the approval fails.
    """
    # Validate up front: without this, any value other than "approve"
    # (a typo, wrong casing) would fall through and cancel the run.
    if decision not in ("approve", "reject"):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"decision 格式錯誤: {decision!r} (需為 'approve' 或 'reject')",
        )
    try:
        run_uuid = uuid.UUID(run_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"run_id 格式錯誤: {exc}",
        ) from exc

    async with get_db_context(project_id) as db:
        result = await db.execute(
            select(AwoooPRunState).where(
                AwoooPRunState.run_id == run_uuid,
                AwoooPRunState.project_id == project_id,
            )
        )
        run = result.scalar_one_or_none()
        if run is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"run {run_id!r} 不存在或非此 project 所有",
            )
        if run.state != "waiting_approval":
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"run {run_id!r} 目前狀態為 {run.state!r},無法審核(需為 waiting_approval",
            )

    approval_token_jti: str | None = None
    new_state: str
    if decision == "approve":
        token = issue_approval_token(
            project_id=project_id,
            run_id=run_id,
            tool_name="operator_console_approve",
            approver_id=approver_id,
        )
        try:
            await record_approval(
                project_id=project_id,
                run_id=run_id,
                tool_name="operator_console_approve",
                approver_id=approver_id,
                token=token,
            )
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"核准記錄失敗: {exc}",
            ) from exc
        await transition(run_uuid, project_id, "running")
        new_state = "running"
        approval_token_jti = _extract_token_jti(token)
    else:
        await transition(
            run_uuid,
            project_id,
            "cancelled",
            error_code="E-APPR-REJECTED",
            error_detail=f"operator 拒絕: approver={approver_id!r}, reason={reason!r}",
        )
        new_state = "cancelled"

    # Journal the decision only after the state transition has succeeded.
    await _record_approval_decision_step(
        run_id=run_uuid,
        project_id=project_id,
        decision=decision,
        approver_id=approver_id,
        reason=reason,
    )

    # The audit write is best-effort: the decision has already taken effect,
    # so a failure here is logged and does not fail the request.
    try:
        await write_audit(
            project_id=project_id,
            action=f"run.approval.{decision}",
            resource_type="run",
            resource_id=run_id,
            details={
                "approver_id": approver_id,
                "decision": decision,
                "reason": reason,
                "new_state": new_state,
            },
            run_id=run_id,
        )
    except Exception as exc:
        logger.warning("approval_audit_write_failed", run_id=run_id, error=str(exc))

    return {
        "run_id": run_id,
        "decision": decision,
        "new_state": new_state,
        "approval_token_jti": approval_token_jti,
    }
async def _record_approval_decision_step(
    *,
    run_id: UUID,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> None:
    """Write an Operator Console approval decision into the run step journal.

    This is a governance / observability record, not the execution gate
    itself. A failure to write it must never block an approve / reject that
    has already completed, otherwise the manual decision flow would suffer a
    secondary failure. Every exception is therefore logged as a warning and
    swallowed.

    The new journal row takes the next ``step_seq`` for the run (current
    maximum + 1, starting at 1) and the run's ``step_count`` is incremented.
    """
    approved = decision == "approve"
    tool_name = "operator_console.approve" if approved else "operator_console.reject"
    summary = _truncate_step_summary(
        f"approver={approver_id}; decision={decision}; reason={reason or '-'}"
    )
    journal = AwoooPRunStepJournal
    try:
        async with get_db_context(project_id) as db:
            highest_seq = await db.execute(
                select(func.coalesce(func.max(journal.step_seq), 0)).where(
                    journal.run_id == run_id,
                    journal.project_id == project_id,
                )
            )
            next_seq = int(highest_seq.scalar_one()) + 1
            entry = journal(
                run_id=run_id,
                project_id=project_id,
                step_seq=next_seq,
                tool_name=tool_name,
                result_status="success",
                # The decision summary is stored in the block_reason column.
                block_reason=summary,
                completed_at=_utc_now_naive(),
            )
            db.add(entry)
            bump_step_count = (
                update(AwoooPRunState)
                .where(
                    AwoooPRunState.run_id == run_id,
                    AwoooPRunState.project_id == project_id,
                )
                .values(step_count=AwoooPRunState.step_count + 1)
            )
            await db.execute(bump_step_count)
        logger.info(
            "approval_decision_step_recorded",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            approver_id=approver_id,
        )
    except Exception as exc:
        logger.warning(
            "approval_decision_step_record_failed",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            error=str(exc),
        )