Files
awoooi/apps/api/src/services/platform_operator_service.py
Your Name 9d02ab8080
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m6s
CD Pipeline / build-and-deploy (push) Successful in 3m30s
CD Pipeline / post-deploy-checks (push) Successful in 2m12s
feat(awooop): surface mcp investigation evidence
2026-05-18 13:55:27 +08:00

1506 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AwoooP Operator Console — Platform Operator Service
====================================================
leWOOOgo 積木化：DB 存取集中在 Service 層，Router 不直接引用 get_db。
ADR-106：AwoooP Agent Platform
2026-05-05 ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
import re
import uuid
from collections import defaultdict
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
import structlog
from fastapi import HTTPException, status
from sqlalchemy import func, select, text, update
from sqlalchemy import or_ as sa_or
from src.db.awooop_models import (
AwoooPContractRevision,
AwoooPConversationEvent,
AwoooPMcpGatewayAudit,
AwoooPOutboundMessage,
AwoooPRunState,
AwoooPRunStepJournal,
)
from src.db.base import get_db_context
from src.db.models import MCPAuditLog
from src.services.audit_sink import write_audit
from src.services.awooop_approval_token import issue_approval_token, record_approval
from src.services.awooop_truth_chain_service import (
_summarize_gateway_mcp,
_summarize_mcp,
)
from src.services.run_state_machine import transition
logger = structlog.get_logger(__name__)

# Upper bound on contract revisions returned by list_contracts (total is still
# counted over all matches).
_MAX_CONTRACTS = 200
# Pagination defaults/caps; not referenced in this part of the file —
# presumably consumed by the router or later code (TODO confirm).
_DEFAULT_PER_PAGE = 50
_MAX_PER_PAGE = 200
# list_recent_channel_events clamps the requested limit to [1, _MAX_EVENTS].
_MAX_EVENTS = 100
# Per-source row cap used by get_run_detail queries, the final timeline length
# cap, and the default limit of _fetch_run_legacy_mcp_history.
_MAX_TIMELINE_ITEMS = 100
# Default row cap for inbound/outbound sidecar rows loaded for run lists, and
# the floor used by _list_filter_context_limit.
_MAX_LIST_CONTEXT_ROWS = 500
# Maximum length of a compacted step summary (see _truncate_step_summary).
_MAX_STEP_SUMMARY_CHARS = 128
# Per-incident ADR-100 remediation history rows requested for runs.
_REMEDIATION_HISTORY_LIMIT = 20
# Incident ids: "INC-" + 8 digits (YYYYMMDD) + "-" + 4 or more uppercase
# alphanumerics; \b anchors allow extraction from free text.
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
# Accepted values for the remediation_status filter; these are exactly the
# status values _run_remediation_list_summary can produce.
_REMEDIATION_STATUS_FILTERS = {
    "mcp_observed",
    "no_evidence",
    "read_only_dry_run",
    "write_observed",
    "blocked",
    "observed",
}
# =============================================================================
# Tenants
# =============================================================================
async def list_tenants() -> dict[str, Any]:
    """List every AwoooP tenant for the Operator Console (not RLS-filtered).

    Rows come from the ``awooop_operator_list_projects()`` DB function.

    Returns:
        ``{"tenants": [...], "total": <number of tenants>}``, where each tenant
        carries project_id, display_name, migration_mode, budget_limit_usd,
        is_active and created_at.
    """
    tenant_columns = (
        "project_id",
        "display_name",
        "migration_mode",
        "budget_limit_usd",
        "is_active",
        "created_at",
    )
    async with get_db_context("awoooi") as db:
        result = await db.execute(
            text("""
            SELECT
                project_id,
                display_name,
                migration_mode,
                budget_limit_usd,
                is_active,
                created_at
            FROM awooop_operator_list_projects()
            """)
        )
        tenants = [
            {column: mapping[column] for column in tenant_columns}
            for mapping in result.mappings().all()
        ]
    return {"tenants": tenants, "total": len(tenants)}
# =============================================================================
# Contracts
# =============================================================================
async def list_contracts(
    project_id: str | None,
    lifecycle_status: str | None,
) -> dict[str, Any]:
    """List contract revisions newest-first, optionally filtered.

    Args:
        project_id: Only revisions of this project when given.
        lifecycle_status: Only revisions in this lifecycle status when given.

    Returns:
        ``{"contracts": [...], "total": n}``; ``total`` counts every matching
        revision even though at most ``_MAX_CONTRACTS`` are returned.
    """
    conditions = []
    if project_id is not None:
        conditions.append(AwoooPContractRevision.project_id == project_id)
    if lifecycle_status is not None:
        conditions.append(
            AwoooPContractRevision.lifecycle_status == lifecycle_status
        )

    query = select(AwoooPContractRevision).order_by(
        AwoooPContractRevision.created_at.desc()
    )
    for condition in conditions:
        query = query.where(condition)

    async with get_db_context("awoooi") as db:
        counted = await db.execute(
            select(func.count()).select_from(query.subquery())
        )
        total = counted.scalar_one()
        fetched = await db.execute(query.limit(_MAX_CONTRACTS))
        contracts = [
            {
                "revision_id": revision.revision_id,
                "contract_id": revision.contract_id,
                "contract_family": revision.contract_family,
                "lifecycle_status": revision.lifecycle_status,
                "body_hash": revision.body_hash,
                "version_major": revision.version_major,
                "version_minor": revision.version_minor,
                "created_at": revision.created_at,
                "project_id": revision.project_id,
            }
            for revision in fetched.scalars().all()
        ]
    return {"contracts": contracts, "total": total}
# =============================================================================
# Runs
# =============================================================================
async def list_runs(
    project_id: str | None,
    state: str | None,
    remediation_status: str | None,
    incident_id: str | None,
    page: int,
    per_page: int,
) -> dict[str, Any]:
    """List runs newest-first with optional filters and 1-based pagination.

    ``project_id`` and ``state`` are applied in SQL. ``remediation_status`` and
    ``incident_id`` depend on derived remediation evidence. When either is set,
    every SQL-matching run is loaded, summarized, filtered in memory and then
    paginated, and ``total`` is the post-filter count.

    Raises:
        HTTPException: 422 when ``remediation_status`` or ``incident_id`` is
            not an accepted value.
    """
    _validate_remediation_status_filter(remediation_status)
    _validate_incident_id_filter(incident_id)
    start = (page - 1) * per_page

    async with get_db_context("awoooi") as db:
        query = select(AwoooPRunState).order_by(AwoooPRunState.created_at.desc())
        if project_id is not None:
            query = query.where(AwoooPRunState.project_id == project_id)
        if state is not None:
            query = query.where(AwoooPRunState.state == state)

        if not (remediation_status or incident_id):
            # Plain listing: count and paginate entirely in SQL.
            count_result = await db.execute(
                select(func.count()).select_from(query.subquery())
            )
            total = count_result.scalar_one()
            page_result = await db.execute(query.offset(start).limit(per_page))
            page_rows = list(page_result.scalars().all())
            inbound_map, outbound_map = await _load_run_message_context(
                db, page_rows
            )
            summaries = await _build_run_remediation_summaries(
                runs=page_rows,
                inbound_by_run=inbound_map,
                outbound_by_run=outbound_map,
            )
        else:
            # Evidence-based filters: summarize every candidate, then page.
            candidate_result = await db.execute(query)
            candidates = list(candidate_result.scalars().all())
            inbound_map, outbound_map = await _load_run_message_context(
                db,
                candidates,
                limit=_list_filter_context_limit(len(candidates)),
            )
            summaries = await _build_run_remediation_summaries(
                runs=candidates,
                inbound_by_run=inbound_map,
                outbound_by_run=outbound_map,
            )
            matching = []
            for candidate in candidates:
                candidate_summary = summaries.get(candidate.run_id)
                if _remediation_summary_matches_status(
                    candidate_summary, remediation_status
                ) and _remediation_summary_matches_incident_id(
                    candidate_summary, incident_id
                ):
                    matching.append(candidate)
            total = len(matching)
            page_rows = matching[start : start + per_page]

        runs = [
            {
                "run_id": run.run_id,
                "project_id": run.project_id,
                "agent_id": run.agent_id,
                "state": run.state,
                "is_shadow": run.is_shadow,
                "cost_usd": run.cost_usd,
                "step_count": run.step_count,
                "created_at": run.created_at,
                "timeout_at": run.timeout_at,
                "remediation_summary": summaries.get(run.run_id),
            }
            for run in page_rows
        ]
    return {"runs": runs, "total": total, "page": page, "per_page": per_page}
def _timeline_item(
*,
ts: Any,
kind: str,
title: str,
status: str,
summary: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build one Operator Console timeline item."""
return {
"ts": ts,
"kind": kind,
"title": title,
"status": status,
"summary": summary,
"metadata": metadata or {},
}
def _utc_now_naive() -> datetime:
"""回傳與 AwoooP timestamp-without-timezone 欄位相容的 UTC 時間。"""
return datetime.now(UTC).replace(tzinfo=None)
def _truncate_step_summary(value: str | None) -> str | None:
"""壓縮 Step summary避免超過 DB 欄位與前端 timeline 需要的短摘要。"""
if not value:
return None
compact = " ".join(str(value).split())
if len(compact) <= _MAX_STEP_SUMMARY_CHARS:
return compact
return f"{compact[: _MAX_STEP_SUMMARY_CHARS - 1]}"
def _approval_step_title(tool_name: str, step_seq: int) -> str:
"""將 operator_console.* step 轉成人能一眼理解的 timeline 標題。"""
if tool_name == "operator_console.approve":
return f"人工審批 {step_seq}: 核准"
if tool_name == "operator_console.reject":
return f"人工審批 {step_seq}: 拒絕"
return f"Step {step_seq}: {tool_name}"
def _outbound_timeline_title(
channel_type: str,
message_type: str,
content_preview: str | None,
) -> str:
"""將 legacy Telegram outbound 分類成 Operator 看得懂的語義標題。"""
channel = channel_type.upper()
preview = content_preview or ""
if "RUNBOOK REVIEW" in preview:
return f"{channel}Runbook 待人工審核"
if "[AWOOOI CI/CD]" in preview or "AWOOOI CI/CD" in preview:
return f"{channel}CI/CD 狀態通知"
if "AI 治理警報" in preview:
return f"{channel}AI 治理警報"
if "HANDOFF REQUIRED" in preview or "AI 自動修復失敗" in preview:
return f"{channel}AI 自動修復失敗,已轉人工"
if "AUTO RESOLVED" in preview or "AI 自動修復完成" in preview:
return f"{channel}AI 自動修復完成"
if "ESCALATION" in preview or "事故升級" in preview:
return f"{channel}:事故升級通知"
if "ACTION REQUIRED" in preview:
return f"{channel}:告警審批卡"
fallback = {
"approval_request": "人工審批請求",
"error": "錯誤回覆",
"final": "處置結果",
"interim": "漸進式狀態回饋",
}.get(message_type, message_type)
return f"{channel}{fallback}"
def _mcp_gateway_summary_row(row: AwoooPMcpGatewayAudit) -> dict[str, Any]:
"""Convert SQLAlchemy audit rows into the truth-chain summary shape."""
return {
"agent_id": row.agent_id,
"tool_name": row.tool_name,
"result_status": row.result_status,
"block_gate": row.block_gate,
"gate_result": row.gate_result or {},
}
def _as_dict(value: Any) -> dict[str, Any]:
"""Return dict payloads defensively; DB JSON fields may be null or stale."""
return value if isinstance(value, dict) else {}
def _append_unique(values: list[str], candidate: Any) -> None:
"""Append non-empty string once while preserving discovery order."""
text_value = str(candidate or "").strip()
if text_value and text_value not in values:
values.append(text_value)
def _append_incident_ids_from_text(values: list[str], text_value: Any) -> None:
    """Scan free text (legacy payloads) for incident ids and append new ones to ``values``.

    Falsy input is ignored; non-string input is scanned via ``str()``.
    """
    if text_value:
        for match in _INCIDENT_ID_RE.findall(str(text_value)):
            _append_unique(values, match)
def _append_incident_ids_from_source_envelope(values: list[str], envelope: Any) -> None:
    """Append incident ids listed in a channel event's ``source_refs.incident_ids``.

    ``incident_ids`` may be a list or a single scalar. Missing or non-dict
    envelope parts are tolerated.
    """
    refs = _as_dict(_as_dict(envelope).get("source_refs"))
    raw_ids = refs.get("incident_ids")
    candidates = raw_ids if isinstance(raw_ids, list) else [raw_ids]
    for candidate in candidates:
        _append_unique(values, candidate)
def _collect_run_incident_ids(
    *,
    run: AwoooPRunState,
    inbound_events: list[AwoooPConversationEvent],
    outbound_messages: list[AwoooPOutboundMessage],
) -> list[str]:
    """Gather the distinct incident ids that link a run back to legacy incident evidence.

    Scanned sources, in order:
    - the run's trigger_ref and error_detail;
    - each inbound event's source envelope, content preview and redacted content;
    - each outbound message's content preview and send error.
    """
    collected: list[str] = []
    for text_source in (run.trigger_ref, run.error_detail):
        _append_incident_ids_from_text(collected, text_source)
    for event in inbound_events:
        _append_incident_ids_from_source_envelope(collected, event.source_envelope)
        for text_source in (event.content_preview, event.content_redacted):
            _append_incident_ids_from_text(collected, text_source)
    for message in outbound_messages:
        for text_source in (message.content_preview, message.send_error):
            _append_incident_ids_from_text(collected, text_source)
    return collected
async def _load_run_message_context(
    db: Any,
    runs: list[AwoooPRunState],
    *,
    limit: int = _MAX_LIST_CONTEXT_ROWS,
) -> tuple[
    dict[UUID, list[AwoooPConversationEvent]],
    dict[UUID, list[AwoooPOutboundMessage]],
]:
    """Load inbound/outbound sidecar rows that link list-page runs to incidents.

    An inbound event is attributed to a run when any of these holds, checked
    in order:
    1. its run_id is one of the runs;
    2. a run's trigger_ref equals the event's provider_event_id;
    3. a UUID-shaped trigger_ref equals the event's event_id.

    Each query is capped at ``limit`` rows overall, newest first. With many
    runs, older context may therefore be omitted.

    Args:
        db: Open async DB session.
        runs: Runs to load context for.
        limit: Maximum rows per query (inbound and outbound separately).

    Returns:
        ``(inbound_by_run, outbound_by_run)`` keyed by run_id. Runs without
        rows are absent.
    """
    if not runs:
        return {}, {}

    run_ids = [run.run_id for run in runs]
    run_ids_set = set(run_ids)
    trigger_refs: list[str] = []
    trigger_ref_to_run: dict[str, UUID] = {}
    trigger_event_ids: list[UUID] = []
    # Keyed by canonical str(UUID): the DB returns event_id normalized, so a
    # trigger_ref written as uppercase / braced / hyphen-less UUID must be
    # normalized the same way, or matched events would be dropped below.
    trigger_event_id_to_run: dict[str, UUID] = {}
    for run in runs:
        if not run.trigger_ref:
            continue
        trigger_ref = str(run.trigger_ref)
        trigger_refs.append(trigger_ref)
        trigger_ref_to_run[trigger_ref] = run.run_id
        try:
            trigger_event_id = uuid.UUID(trigger_ref)
        except ValueError:
            continue
        trigger_event_ids.append(trigger_event_id)
        trigger_event_id_to_run[str(trigger_event_id)] = run.run_id

    inbound_filters = [AwoooPConversationEvent.run_id.in_(run_ids)]
    if trigger_refs:
        inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(trigger_refs))
    if trigger_event_ids:
        inbound_filters.append(AwoooPConversationEvent.event_id.in_(trigger_event_ids))
    inbound_result = await db.execute(
        select(AwoooPConversationEvent)
        .where(sa_or(*inbound_filters))
        .order_by(AwoooPConversationEvent.received_at.desc())
        .limit(limit)
    )
    inbound_by_run: dict[UUID, list[AwoooPConversationEvent]] = defaultdict(list)
    for event in inbound_result.scalars().all():
        target_run_id = event.run_id if event.run_id in run_ids_set else None
        if target_run_id is None and event.provider_event_id:
            # Guarded so a null provider_event_id is never looked up as "None".
            target_run_id = trigger_ref_to_run.get(str(event.provider_event_id))
        if target_run_id is None:
            target_run_id = trigger_event_id_to_run.get(str(event.event_id))
        if target_run_id is not None:
            inbound_by_run[target_run_id].append(event)

    outbound_result = await db.execute(
        select(AwoooPOutboundMessage)
        .where(AwoooPOutboundMessage.run_id.in_(run_ids))
        .order_by(AwoooPOutboundMessage.queued_at.desc())
        .limit(limit)
    )
    outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]] = defaultdict(list)
    for message in outbound_result.scalars().all():
        outbound_by_run[message.run_id].append(message)
    return dict(inbound_by_run), dict(outbound_by_run)
def _list_filter_context_limit(candidate_count: int) -> int:
return min(max(candidate_count * 4, _MAX_LIST_CONTEXT_ROWS), 20_000)
def _route_label_from_remediation(item: dict[str, Any]) -> str:
"""Render remediation MCP route consistently with Telegram / Work Items."""
return "/".join(
str(part)
for part in (
item.get("agent_id"),
item.get("tool_name"),
item.get("required_scope"),
)
if part
) or "--"
def _route_label_from_legacy_mcp(record: dict[str, Any]) -> str:
"""Render self-built/legacy MCP evidence as agent/tool/scope for list UX."""
tool = record.get("tool_name")
server = record.get("mcp_server")
tool_label = ".".join(str(part) for part in (server, tool) if part) or tool
return "/".join(
str(part)
for part in (
record.get("agent_role"),
tool_label,
"read",
)
if part
) or "--"
def _remediation_timeline_status(item: dict[str, Any]) -> str:
if item.get("success") is False or item.get("allowed") is False:
return "failed"
if item.get("verification_result_preview") == "success":
return "success"
return "warning"
def _remediation_timeline_summary(item: dict[str, Any]) -> str:
    """Render a space-separated key=value summary of a remediation item (max 500 chars)."""
    fields = (
        ("incident", item.get("incident_id") or "--"),
        ("mode", item.get("mode") or "--"),
        ("preview", item.get("verification_result_preview") or "--"),
        ("route", _route_label_from_remediation(item)),
        ("writes_incident", item.get("writes_incident_state")),
        ("writes_auto_repair", item.get("writes_auto_repair_result")),
    )
    return " ".join(f"{name}={value}" for name, value in fields)[:500]
def _legacy_mcp_timeline_status(record: dict[str, Any]) -> str:
if record.get("success") is True:
return "success"
if record.get("success") is False:
return "failed"
return "warning"
def _legacy_mcp_timeline_summary(record: dict[str, Any]) -> str:
return (
f"incident={record.get('incident_id') or '--'} "
f"agent={record.get('agent_role') or '--'} "
f"node={record.get('flywheel_node') or '--'} "
f"duration_ms={record.get('duration_ms') if record.get('duration_ms') is not None else '--'} "
f"error={record.get('error_message') or '--'}"
)[:500]
def _run_remediation_list_summary(
*,
run: AwoooPRunState,
incident_ids: list[str],
items: list[dict[str, Any]],
legacy_mcp_records: list[dict[str, Any]] | None = None,
errors: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
"""Summarize durable ADR-100 dry-run and MCP investigation evidence for list UX."""
sorted_items = sorted(
(item for item in items if isinstance(item, dict)),
key=lambda item: str(item.get("created_at") or ""),
reverse=True,
)
sorted_mcp_records = sorted(
(record for record in (legacy_mcp_records or []) if isinstance(record, dict)),
key=lambda record: str(record.get("created_at") or ""),
reverse=True,
)
latest = sorted_items[0] if sorted_items else {}
latest_mcp = sorted_mcp_records[0] if sorted_mcp_records else {}
writes_incident = latest.get("writes_incident_state")
writes_auto_repair = latest.get("writes_auto_repair_result")
route = (
_route_label_from_remediation(latest)
if latest
else _route_label_from_legacy_mcp(latest_mcp)
if latest_mcp
else "--"
)
write_observed = writes_incident is True or writes_auto_repair is True
is_read_only = (
bool(latest)
and latest.get("required_scope") == "read"
and writes_incident is False
and writes_auto_repair is False
)
mcp_total = len(sorted_mcp_records)
mcp_success = sum(1 for record in sorted_mcp_records if record.get("success") is True)
mcp_failed = sum(1 for record in sorted_mcp_records if record.get("success") is False)
if not sorted_items:
status_value = "mcp_observed" if mcp_total > 0 else "no_evidence"
elif latest.get("success") is False or latest.get("allowed") is False:
status_value = "blocked"
elif write_observed:
status_value = "write_observed"
elif is_read_only:
status_value = "read_only_dry_run"
else:
status_value = "observed"
return {
"schema_version": "awooop_run_remediation_summary_v1",
"source": "alert_operation_log" if sorted_items else "mcp_audit_log" if mcp_total > 0 else "none",
"incident_ids": incident_ids,
"total": len(sorted_items),
"evidence_total": len(sorted_items) + mcp_total,
"status": status_value,
"has_dry_run": bool(sorted_items),
"has_mcp_investigation": mcp_total > 0,
"is_read_only": is_read_only,
"human_gate_open": run.state == "waiting_approval",
"latest_at": latest.get("created_at"),
"latest_preview": latest.get("verification_result_preview"),
"latest_mode": latest.get("mode"),
"latest_route": route,
"latest_agent_id": latest.get("agent_id") or latest_mcp.get("agent_role"),
"latest_tool_name": latest.get("tool_name") or latest_mcp.get("tool_name"),
"latest_required_scope": latest.get("required_scope") or ("read" if latest_mcp else None),
"writes_incident_state": writes_incident,
"writes_auto_repair_result": writes_auto_repair,
"mcp_observation_total": mcp_total,
"mcp_observation_success": mcp_success,
"mcp_observation_failed": mcp_failed,
"latest_mcp_server": latest_mcp.get("mcp_server"),
"errors": errors or [],
}
def _validate_remediation_status_filter(value: str | None) -> None:
    """Reject a remediation_status filter that is not one of the known statuses.

    Raises:
        HTTPException: 422 listing the accepted values (sorted).
    """
    if value is None or value in _REMEDIATION_STATUS_FILTERS:
        return
    accepted = ", ".join(sorted(_REMEDIATION_STATUS_FILTERS))
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail=f"remediation_status 必須是: {accepted}",
    )
def _validate_incident_id_filter(value: str | None) -> None:
    """Reject an incident_id filter that is not a complete INC-YYYYMMDD-XXXX id.

    Raises:
        HTTPException: 422 when the whole value does not match the incident id pattern.
    """
    if value is not None and _INCIDENT_ID_RE.fullmatch(value) is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="incident_id 格式錯誤,必須是 INC-YYYYMMDD-XXXX",
        )
def _remediation_summary_matches_status(
summary: dict[str, Any] | None,
remediation_status: str | None,
) -> bool:
if remediation_status is None:
return True
status_value = str((summary or {}).get("status") or "no_evidence")
return status_value == remediation_status
def _remediation_summary_matches_incident_id(
summary: dict[str, Any] | None,
incident_id: str | None,
) -> bool:
if incident_id is None:
return True
incident_ids = (summary or {}).get("incident_ids")
return isinstance(incident_ids, list) and incident_id in incident_ids
async def _build_run_remediation_summaries(
    *,
    runs: list[AwoooPRunState],
    inbound_by_run: dict[UUID, list[AwoooPConversationEvent]],
    outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]],
) -> dict[UUID, dict[str, Any]]:
    """Compute a remediation evidence summary for each run without writing state.

    Steps:
    1. Derive each run's incident ids from the run and its sidecar messages.
    2. Fetch ADR-100 history once per distinct incident. A failure is logged
       and attached to the affected runs' ``errors`` instead of raising.
    3. Fetch legacy MCP audit rows for all incidents in one query.
    4. Fold the per-run evidence via ``_run_remediation_list_summary``.
    """
    if not runs:
        return {}

    incident_ids_by_run: dict[UUID, list[str]] = {}
    distinct_incident_ids: list[str] = []
    for run in runs:
        linked = _collect_run_incident_ids(
            run=run,
            inbound_events=inbound_by_run.get(run.run_id, []),
            outbound_messages=outbound_by_run.get(run.run_id, []),
        )
        incident_ids_by_run[run.run_id] = linked
        for linked_id in linked:
            _append_unique(distinct_incident_ids, linked_id)

    histories_by_incident: dict[str, list[dict[str, Any]]] = {}
    legacy_mcp_by_incident: dict[str, list[dict[str, Any]]] = {}
    errors_by_incident: dict[str, dict[str, str]] = {}
    if distinct_incident_ids:
        from src.services.adr100_remediation_service import Adr100RemediationService

        history_service = Adr100RemediationService(record_history=False)
        for incident_id in distinct_incident_ids:
            try:
                response = await history_service.history(
                    limit=_REMEDIATION_HISTORY_LIMIT,
                    incident_id=incident_id,
                )
                histories_by_incident[incident_id] = [
                    entry for entry in response.get("items", []) if isinstance(entry, dict)
                ]
            except Exception as exc:
                logger.warning(
                    "run_list_remediation_history_fetch_failed",
                    incident_id=incident_id,
                    error=str(exc),
                )
                errors_by_incident[incident_id] = {
                    "incident_id": incident_id,
                    "error": str(exc),
                }
        legacy_limit = min(
            max(len(distinct_incident_ids) * _REMEDIATION_HISTORY_LIMIT, 100),
            5_000,
        )
        legacy_mcp_by_incident = await _fetch_legacy_mcp_by_incident_ids(
            distinct_incident_ids,
            limit=legacy_limit,
        )

    summaries: dict[UUID, dict[str, Any]] = {}
    for run in runs:
        linked = incident_ids_by_run.get(run.run_id, [])
        summaries[run.run_id] = _run_remediation_list_summary(
            run=run,
            incident_ids=linked,
            items=[
                entry
                for linked_id in linked
                for entry in histories_by_incident.get(linked_id, [])
            ],
            legacy_mcp_records=[
                record
                for linked_id in linked
                for record in legacy_mcp_by_incident.get(linked_id, [])
            ],
            errors=[
                errors_by_incident[linked_id]
                for linked_id in linked
                if linked_id in errors_by_incident
            ],
        )
    return summaries
def _timeline_sort_key(item: dict[str, Any], fallback_ts: Any) -> str:
"""Normalize mixed DB datetime / ISO string timestamps for timeline sorting."""
value = item.get("ts") or fallback_ts
if hasattr(value, "isoformat"):
return value.isoformat()
return str(value or "")
def _summarize_run_remediation_by_work_item(
items: list[dict[str, Any]],
) -> list[dict[str, Any]]:
summary: dict[str, dict[str, Any]] = {}
for item in items:
key = str(item.get("work_item_id") or item.get("incident_id") or item.get("id"))
if key not in summary:
summary[key] = {
"work_item_id": item.get("work_item_id"),
"incident_id": item.get("incident_id"),
"count": 0,
"latest_at": item.get("created_at"),
"latest_preview": item.get("verification_result_preview"),
"latest_mode": item.get("mode"),
"latest_route": _route_label_from_remediation(item),
}
summary[key]["count"] += 1
return list(summary.values())
async def _fetch_run_remediation_history(
    incident_ids: list[str],
    *,
    limit: int = _REMEDIATION_HISTORY_LIMIT,
) -> dict[str, Any]:
    """Collect durable ADR-100 remediation dry-run history for the given incidents.

    Each incident is queried separately through a read-only service instance
    (``record_history=False``). A failing incident is logged and reported under
    ``errors`` rather than raised. Items are merged newest-first. ``total``
    counts every merged item, while ``items`` / ``by_work_item`` cover only
    the first ``limit``.
    """
    payload: dict[str, Any] = {
        "schema_version": "awooop_run_remediation_evidence_v1",
        "source": "alert_operation_log",
        "incident_ids": [],
        "total": 0,
        "limit": limit,
        "items": [],
        "by_work_item": [],
        "errors": [],
    }
    if not incident_ids:
        return payload

    from src.services.adr100_remediation_service import Adr100RemediationService

    history_service = Adr100RemediationService(record_history=False)
    merged: list[dict[str, Any]] = []
    failures: list[dict[str, str]] = []
    for incident_id in incident_ids:
        try:
            response = await history_service.history(limit=limit, incident_id=incident_id)
            merged.extend(
                entry for entry in response.get("items", []) if isinstance(entry, dict)
            )
        except Exception as exc:
            logger.warning(
                "run_remediation_history_fetch_failed",
                incident_id=incident_id,
                error=str(exc),
            )
            failures.append({"incident_id": incident_id, "error": str(exc)})

    merged.sort(key=lambda entry: str(entry.get("created_at") or ""), reverse=True)
    visible = merged[:limit]
    payload.update(
        incident_ids=incident_ids,
        total=len(merged),
        items=visible,
        by_work_item=_summarize_run_remediation_by_work_item(visible),
        errors=failures,
    )
    return payload
def _legacy_mcp_record(row: MCPAuditLog) -> dict[str, Any]:
return {
"id": row.id,
"session_id": row.session_id,
"flywheel_node": row.flywheel_node,
"mcp_server": row.mcp_server,
"tool_name": row.tool_name,
"duration_ms": row.duration_ms,
"success": row.success,
"error_message": row.error_message,
"incident_id": row.incident_id,
"agent_role": row.agent_role,
"created_at": row.created_at,
}
async def _fetch_legacy_mcp_by_incident_ids(
    incident_ids: list[str],
    *,
    limit: int,
) -> dict[str, list[dict[str, Any]]]:
    """Fetch legacy/self-built MCP audit rows for list summaries, grouped by incident id.

    A single query returns at most ``limit`` rows overall, newest first. Rows
    without an incident_id are skipped. Returns {} for no incident ids.
    """
    if not incident_ids:
        return {}
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    async with get_db_context("awoooi") as db:
        query = (
            select(MCPAuditLog)
            .where(MCPAuditLog.incident_id.in_(incident_ids))
            .order_by(MCPAuditLog.created_at.desc())
            .limit(limit)
        )
        fetched = await db.execute(query)
        for row in fetched.scalars().all():
            if row.incident_id:
                grouped[row.incident_id].append(_legacy_mcp_record(row))
    return dict(grouped)
async def _fetch_run_legacy_mcp_history(
    incident_ids: list[str],
    *,
    limit: int = _MAX_TIMELINE_ITEMS,
) -> dict[str, Any]:
    """Fetch legacy/self-built MCP audit rows linked to a run through its incident ids.

    Returns at most ``limit`` records (newest first). ``total`` is the number
    of returned records, and ``summary`` is computed by ``_summarize_mcp``.
    """
    records: list[dict[str, Any]] = []
    if incident_ids:
        async with get_db_context("awoooi") as db:
            query = (
                select(MCPAuditLog)
                .where(MCPAuditLog.incident_id.in_(incident_ids))
                .order_by(MCPAuditLog.created_at.desc())
                .limit(limit)
            )
            fetched = await db.execute(query)
            records = [_legacy_mcp_record(row) for row in fetched.scalars().all()]
    return {
        "schema_version": "awooop_run_legacy_mcp_evidence_v1",
        "source": "mcp_audit_log",
        "incident_ids": incident_ids if incident_ids else [],
        "total": len(records),
        "limit": limit,
        "records": records,
        "summary": _summarize_mcp(records),
    }
async def get_run_detail(
    run_id: str,
    project_id: str | None = None,
) -> dict[str, Any]:
    """Return one Run's handling context for the AwoooP Run detail / Timeline view.

    Args:
        run_id: Run UUID as a string.
        project_id: Optional tenant scope. When given, the DB context is opened
            for that project and the run must belong to it.

    Returns:
        dict with ``run``, ``steps``, ``inbound_events``, ``outbound_messages``,
        ``mcp_calls``, ``mcp_gateway``, ``mcp_legacy``, ``remediation_history``,
        ``timeline`` (sorted by timestamp, capped at ``_MAX_TIMELINE_ITEMS``)
        and ``counts``. Each per-source query is capped at
        ``_MAX_TIMELINE_ITEMS`` rows.

    Raises:
        HTTPException: 422 if ``run_id`` is not a UUID, 404 if the run is not found.
    """
    try:
        run_uuid = uuid.UUID(run_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"run_id 格式錯誤: {exc}",
        ) from exc

    def _mcp_item(row: AwoooPMcpGatewayAudit) -> dict[str, Any]:
        # gate_result is a JSON column that may be null or non-dict.
        gate_result = row.gate_result if isinstance(row.gate_result, dict) else {}
        return {
            "call_id": row.call_id,
            "agent_id": row.agent_id,
            "tool_name": row.tool_name,
            "result_status": row.result_status,
            "block_gate": row.block_gate,
            "block_reason": row.block_reason,
            "latency_ms": row.latency_ms,
            "created_at": row.created_at,
            "required_scope": gate_result.get("required_scope"),
            "policy_enforced": gate_result.get("policy_enforced"),
            "is_shadow": gate_result.get("is_shadow"),
            "gate_result": gate_result,
        }

    async with get_db_context(project_id or "awoooi") as db:
        run_stmt = select(AwoooPRunState).where(AwoooPRunState.run_id == run_uuid)
        if project_id is not None:
            run_stmt = run_stmt.where(AwoooPRunState.project_id == project_id)
        run_result = await db.execute(run_stmt)
        run = run_result.scalar_one_or_none()
        if run is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"run {run_id!r} 不存在",
            )

        steps_result = await db.execute(
            select(AwoooPRunStepJournal)
            .where(AwoooPRunStepJournal.run_id == run_uuid)
            .order_by(AwoooPRunStepJournal.step_seq.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        steps = list(steps_result.scalars().all())

        # Attribute the trigger event the same way list pages do
        # (_load_run_message_context). provider_event_id is always matched,
        # and event_id is matched as well when trigger_ref is UUID-shaped.
        # Previously a UUID-shaped ref matched event_id only.
        inbound_where = [AwoooPConversationEvent.run_id == run_uuid]
        if run.trigger_ref:
            trigger_ref = str(run.trigger_ref)
            inbound_where.append(AwoooPConversationEvent.provider_event_id == trigger_ref)
            try:
                trigger_event_uuid = uuid.UUID(trigger_ref)
            except ValueError:
                trigger_event_uuid = None
            if trigger_event_uuid is not None:
                inbound_where.append(AwoooPConversationEvent.event_id == trigger_event_uuid)
        inbound_result = await db.execute(
            select(AwoooPConversationEvent)
            .where(sa_or(*inbound_where))
            .order_by(AwoooPConversationEvent.received_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        inbound_events = list(inbound_result.scalars().all())

        outbound_result = await db.execute(
            select(AwoooPOutboundMessage)
            .where(AwoooPOutboundMessage.run_id == run_uuid)
            .order_by(AwoooPOutboundMessage.queued_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        outbound_messages = list(outbound_result.scalars().all())

        mcp_result = await db.execute(
            select(AwoooPMcpGatewayAudit)
            .where(AwoooPMcpGatewayAudit.run_id == run_uuid)
            .order_by(AwoooPMcpGatewayAudit.created_at.asc())
            .limit(_MAX_TIMELINE_ITEMS)
        )
        mcp_calls = list(mcp_result.scalars().all())

        # All ORM attribute access happens while the session is still open.
        run_payload = {
            "run_id": run.run_id,
            "project_id": run.project_id,
            "agent_id": run.agent_id,
            "state": run.state,
            "is_shadow": run.is_shadow,
            "trace_id": run.trace_id,
            "trigger_type": run.trigger_type,
            "trigger_ref": run.trigger_ref,
            "cost_usd": run.cost_usd,
            "step_count": run.step_count,
            "attempt_count": run.attempt_count,
            "max_attempts": run.max_attempts,
            "error_code": run.error_code,
            "error_detail": run.error_detail,
            "created_at": run.created_at,
            "started_at": run.started_at,
            "completed_at": run.completed_at,
            "timeout_at": run.timeout_at,
            "heartbeat_at": run.heartbeat_at,
        }
        step_items = [
            {
                "step_id": row.step_id,
                "step_seq": row.step_seq,
                "tool_name": row.tool_name,
                "result_status": row.result_status,
                "was_blocked": row.was_blocked,
                "block_reason": row.block_reason,
                "error_code": row.error_code,
                "latency_ms": row.latency_ms,
                "created_at": row.created_at,
                "completed_at": row.completed_at,
            }
            for row in steps
        ]
        inbound_items = [
            {
                "event_id": row.event_id,
                "channel_type": row.channel_type,
                "provider_event_id": row.provider_event_id,
                "content_preview": row.content_preview,
                "is_duplicate": row.is_duplicate,
                "received_at": row.received_at,
            }
            for row in inbound_events
        ]
        outbound_items = [
            {
                "message_id": row.message_id,
                "channel_type": row.channel_type,
                "message_type": row.message_type,
                "content_preview": row.content_preview,
                "send_status": row.send_status,
                "send_error": row.send_error,
                "provider_message_id": row.provider_message_id,
                "queued_at": row.queued_at,
                "sent_at": row.sent_at,
                "triggered_by_state": row.triggered_by_state,
            }
            for row in outbound_messages
        ]
        mcp_items = [_mcp_item(row) for row in mcp_calls]
        mcp_gateway_summary = _summarize_gateway_mcp(
            [_mcp_gateway_summary_row(row) for row in mcp_calls]
        )
        incident_ids = _collect_run_incident_ids(
            run=run,
            inbound_events=inbound_events,
            outbound_messages=outbound_messages,
        )

        leading_timeline: list[dict[str, Any]] = [
            _timeline_item(
                ts=run.created_at,
                kind="run",
                title="Run 建立",
                status=run.state,
                # Separator between trigger type and agent was missing
                # (rendered e.g. "webhookagent-x").
                summary=f"{run.trigger_type or 'unknown'} → {run.agent_id}",
                metadata={"trace_id": run.trace_id, "trigger_ref": run.trigger_ref},
            )
        ]
        if run.started_at:
            leading_timeline.append(
                _timeline_item(
                    ts=run.started_at,
                    kind="run",
                    title="Run 開始執行",
                    status="running",
                    summary=run.worker_id,
                )
            )
        for row in inbound_events:
            leading_timeline.append(
                _timeline_item(
                    ts=row.received_at,
                    kind="inbound",
                    title=f"{row.channel_type} 入站事件",
                    status="duplicate" if row.is_duplicate else "received",
                    summary=row.content_preview,
                    metadata={"provider_event_id": row.provider_event_id},
                )
            )
        for row in steps:
            is_approval_step = row.tool_name.startswith("operator_console.")
            leading_timeline.append(
                _timeline_item(
                    ts=row.completed_at or row.created_at,
                    kind="approval" if is_approval_step else "step",
                    title=_approval_step_title(row.tool_name, row.step_seq),
                    status=row.result_status,
                    summary=row.block_reason or row.error_code,
                    metadata={
                        "was_blocked": row.was_blocked,
                        "latency_ms": row.latency_ms,
                    },
                )
            )
        for row in mcp_calls:
            gate_result = row.gate_result if isinstance(row.gate_result, dict) else {}
            scope = gate_result.get("required_scope")
            policy_enforced = gate_result.get("policy_enforced")
            summary = row.block_reason
            if summary is None:
                summary = (
                    f"agent={row.agent_id or 'unknown'}"
                    f" scope={scope or 'unknown'}"
                    f" policy_enforced={policy_enforced}"
                )
            leading_timeline.append(
                _timeline_item(
                    ts=row.created_at,
                    kind="mcp",
                    title=f"MCP: {row.tool_name}",
                    status=row.result_status,
                    summary=summary,
                    metadata={
                        "agent_id": row.agent_id,
                        "block_gate": row.block_gate,
                        "required_scope": scope,
                        "policy_enforced": policy_enforced,
                        "latency_ms": row.latency_ms,
                    },
                )
            )

        trailing_timeline: list[dict[str, Any]] = []
        for row in outbound_messages:
            trailing_timeline.append(
                _timeline_item(
                    ts=row.sent_at or row.queued_at,
                    kind="outbound",
                    title=_outbound_timeline_title(
                        row.channel_type,
                        row.message_type,
                        row.content_preview,
                    ),
                    status=row.send_status,
                    summary=row.content_preview or row.send_error,
                    metadata={
                        "message_type": row.message_type,
                        "provider_message_id": row.provider_message_id,
                        "triggered_by_state": row.triggered_by_state,
                    },
                )
            )
        if run.completed_at:
            trailing_timeline.append(
                _timeline_item(
                    ts=run.completed_at,
                    kind="run",
                    title="Run 結束",
                    status=run.state,
                    summary=run.error_detail or run.error_code,
                )
            )
        fallback_ts = run.created_at

    # These helpers open their own DB sessions.
    legacy_mcp_history = await _fetch_run_legacy_mcp_history(incident_ids)
    remediation_history = await _fetch_run_remediation_history(incident_ids)

    history_timeline: list[dict[str, Any]] = []
    for record in legacy_mcp_history.get("records", []):
        if not isinstance(record, dict):
            continue
        tool_route = "/".join(
            part
            for part in (
                str(record.get("mcp_server") or ""),
                str(record.get("tool_name") or ""),
            )
            if part
        ) or "unknown"
        history_timeline.append(
            _timeline_item(
                ts=record.get("created_at"),
                kind="mcp",
                title=f"Legacy MCP: {tool_route}",
                status=_legacy_mcp_timeline_status(record),
                summary=_legacy_mcp_timeline_summary(record),
                metadata={
                    "incident_id": record.get("incident_id"),
                    "agent_role": record.get("agent_role"),
                    "flywheel_node": record.get("flywheel_node"),
                    "history_source": "mcp_audit_log",
                },
            )
        )
    for item in remediation_history.get("items", []):
        if not isinstance(item, dict):
            continue
        history_timeline.append(
            _timeline_item(
                ts=item.get("created_at"),
                kind="remediation",
                title="ADR-100 補救試跑",
                status=_remediation_timeline_status(item),
                summary=_remediation_timeline_summary(item),
                metadata={
                    "incident_id": item.get("incident_id"),
                    "work_item_id": item.get("work_item_id"),
                    "mcp_route": _route_label_from_remediation(item),
                    "writes_incident_state": item.get("writes_incident_state"),
                    "writes_auto_repair_result": item.get("writes_auto_repair_result"),
                    "history_source": "alert_operation_log",
                },
            )
        )

    # sorted() is stable, so entries with equal timestamps keep the order
    # run → inbound → steps → gateway MCP → legacy MCP → remediation →
    # outbound → completion.
    timeline = sorted(
        leading_timeline + history_timeline + trailing_timeline,
        key=lambda entry: _timeline_sort_key(entry, fallback_ts),
    )[:_MAX_TIMELINE_ITEMS]
    return {
        "run": run_payload,
        "steps": step_items,
        "inbound_events": inbound_items,
        "outbound_messages": outbound_items,
        "mcp_calls": mcp_items,
        "mcp_gateway": mcp_gateway_summary,
        "mcp_legacy": legacy_mcp_history,
        "remediation_history": remediation_history,
        "timeline": timeline,
        "counts": {
            "steps": len(step_items),
            "inbound_events": len(inbound_items),
            "outbound_messages": len(outbound_items),
            "mcp_calls": len(mcp_items),
            "legacy_mcp_calls": legacy_mcp_history.get("total", 0),
            "remediation_history": remediation_history.get("total", 0),
            "timeline": len(timeline),
        },
    }
# =============================================================================
# Channel Events
# =============================================================================
async def list_recent_channel_events(
    *,
    project_id: str | None,
    channel_type: str | None,
    provider_prefix: str | None,
    limit: int,
) -> dict[str, Any]:
    """List recent channel events for the Operator Console convergence/mirror view.

    Args:
        project_id: Restrict to this project when given; ``None`` means no
            project filter.
        channel_type: Restrict to this channel type when given.
        provider_prefix: Restrict to events whose ``provider_event_id`` starts
            with this prefix. The prefix is matched literally: ``%`` and ``_``
            are escaped rather than treated as SQL ``LIKE`` wildcards.
        limit: Requested maximum number of events, clamped to
            ``[1, _MAX_EVENTS]``.

    Returns:
        ``{"events": [...], "total": <number of events returned>,
        "limit": <clamped limit>}``, events ordered by ``received_at``
        descending (newest first).
    """
    safe_limit = max(1, min(limit, _MAX_EVENTS))
    async with get_db_context("awoooi") as db:
        stmt = select(AwoooPConversationEvent).order_by(
            AwoooPConversationEvent.received_at.desc()
        )
        if project_id is not None:
            stmt = stmt.where(AwoooPConversationEvent.project_id == project_id)
        if channel_type is not None:
            stmt = stmt.where(AwoooPConversationEvent.channel_type == channel_type)
        if provider_prefix is not None:
            # autoescape=True escapes LIKE metacharacters in the prefix, so
            # e.g. "tg_" no longer also matches "tgX..." ("_" is a single-char
            # wildcard in LIKE) and a literal "%" is not a match-anything.
            stmt = stmt.where(
                AwoooPConversationEvent.provider_event_id.startswith(
                    provider_prefix, autoescape=True
                )
            )
        result = await db.execute(stmt.limit(safe_limit))
        rows = list(result.scalars().all())
        # Build plain dicts while the session is still open so no ORM
        # attribute access happens after the context manager exits.
        events = [
            {
                "event_id": r.event_id,
                "project_id": r.project_id,
                "channel_type": r.channel_type,
                "provider_event_id": r.provider_event_id,
                "channel_chat_id": r.channel_chat_id,
                "content_preview": r.content_preview,
                "is_duplicate": r.is_duplicate,
                "received_at": r.received_at,
            }
            for r in rows
        ]
    return {"events": events, "total": len(events), "limit": safe_limit}
# =============================================================================
# Approvals
# =============================================================================
async def list_approvals(
    project_id: str | None,
    run_id: str | None = None,
    remediation_status: str | None = None,
) -> dict[str, Any]:
    """List runs in ``waiting_approval``, oldest ``created_at`` first.

    Args:
        project_id: Restrict to this project when given.
        run_id: Restrict to a single run (UUID string) when non-empty.
        remediation_status: When non-empty, keep only runs whose remediation
            summary matches this status (validated up front by
            ``_validate_remediation_status_filter``).

    Returns:
        ``{"approvals": items, "total": len(items), "items": items}``. Each
        item carries run identity/timing fields plus its
        ``remediation_summary``. ``approvals`` and ``items`` are the same list.

    Raises:
        HTTPException: 422 if ``run_id`` is not a valid UUID. Whatever
            ``_validate_remediation_status_filter`` raises for an invalid
            ``remediation_status`` (presumably an HTTPException; confirm).
    """
    _validate_remediation_status_filter(remediation_status)
    run_uuid: UUID | None = None
    if run_id:
        try:
            run_uuid = uuid.UUID(run_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"run_id 格式錯誤: {exc}",
            ) from exc
    async with get_db_context("awoooi") as db:
        stmt = (
            select(AwoooPRunState)
            .where(AwoooPRunState.state == "waiting_approval")
            .order_by(AwoooPRunState.created_at.asc())
        )
        if project_id is not None:
            stmt = stmt.where(AwoooPRunState.project_id == project_id)
        if run_uuid is not None:
            stmt = stmt.where(AwoooPRunState.run_id == run_uuid)
        result = await db.execute(stmt)
        rows = list(result.scalars().all())
        inbound_by_run, outbound_by_run = await _load_run_message_context(db, rows)
        remediation_summaries = await _build_run_remediation_summaries(
            runs=rows,
            inbound_by_run=inbound_by_run,
            outbound_by_run=outbound_by_run,
        )
        if remediation_status:
            rows = [
                row
                for row in rows
                if _remediation_summary_matches_status(
                    remediation_summaries.get(row.run_id),
                    remediation_status,
                )
            ]
        items = [
            {
                "run_id": r.run_id,
                "project_id": r.project_id,
                "agent_id": r.agent_id,
                "created_at": r.created_at,
                "timeout_at": r.timeout_at,
                "remediation_summary": remediation_summaries.get(r.run_id),
            }
            for r in rows
        ]
    # The row query has no LIMIT, so a separate COUNT(*) would only repeat
    # this number. It could also disagree with len(items) if runs change state
    # between two statements. Derive total from what is actually returned.
    return {"approvals": items, "total": len(items), "items": items}
def _extract_approval_token_jti(token: str) -> str | None:
    """Best-effort read of the ``jti`` claim from a JWT-shaped approval token.

    The payload segment (second ``.``-separated part) is base64url-decoded
    WITHOUT any signature verification. The value is only echoed back to the
    caller for traceability and must not be used for authorization.

    Returns:
        The ``jti`` claim when the payload is a JSON object with a string
        ``jti``; otherwise ``None``.
    """
    import base64
    import json as _json

    try:
        payload_b64 = token.split(".")[1]
        # JWT segments omit base64 "=" padding; restore it to a multiple of 4.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        payload = _json.loads(base64.urlsafe_b64decode(payload_b64))
    except Exception:
        # Deliberately broad: by the time this runs the approval is already
        # recorded and the run transitioned, so a malformed token must not
        # turn the request into an error.
        return None
    jti = payload.get("jti") if isinstance(payload, dict) else None
    return jti if isinstance(jti, str) else None


async def decide_approval(
    run_id: str,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> dict[str, Any]:
    """Approve or reject a run that is waiting for approval (ADR-116 Gate 5).

    Args:
        run_id: UUID string of the run.
        project_id: Project that must own the run.
        decision: ``"approve"`` issues and records an approval token and moves
            the run to ``running``. Any other value cancels the run with
            ``E-APPR-REJECTED``.
        approver_id: Operator identity written to the token, journal and audit.
        reason: Optional free-text reason.

    Returns:
        ``{"run_id", "decision", "new_state", "approval_token_jti"}``. The jti
        is only set on approval and is ``None`` if it cannot be read from the
        issued token.

    Raises:
        HTTPException: 422 for a malformed ``run_id``; 404 if the run does not
            exist for ``project_id``; 409 if the run is not in
            ``waiting_approval``; 500 if recording the approval fails.
    """
    try:
        run_uuid = uuid.UUID(run_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"run_id 格式錯誤: {exc}",
        ) from exc

    # The session is only needed for the pre-check. The transition and the
    # writes below are not passed this session, so release it first.
    async with get_db_context(project_id) as db:
        result = await db.execute(
            select(AwoooPRunState).where(
                AwoooPRunState.run_id == run_uuid,
                AwoooPRunState.project_id == project_id,
            )
        )
        run = result.scalar_one_or_none()
        if run is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"run {run_id!r} 不存在或非此 project 所有",
            )
        if run.state != "waiting_approval":
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"run {run_id!r} 目前狀態為 {run.state!r},無法審核(需為 waiting_approval",
            )

    # NOTE(review): the state check above and transition() below are not
    # atomic, so two concurrent decisions could both pass the check.
    # Presumably transition() rejects invalid state changes; confirm.
    approval_token_jti: str | None = None
    new_state: str
    if decision == "approve":
        token = issue_approval_token(
            project_id=project_id,
            run_id=run_id,
            tool_name="operator_console_approve",
            approver_id=approver_id,
        )
        try:
            await record_approval(
                project_id=project_id,
                run_id=run_id,
                tool_name="operator_console_approve",
                approver_id=approver_id,
                token=token,
            )
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"核准記錄失敗: {exc}",
            ) from exc
        await transition(run_uuid, project_id, "running")
        new_state = "running"
        approval_token_jti = _extract_approval_token_jti(token)
    else:
        # NOTE(review): every non-"approve" value (including typos) cancels
        # the run. Confirm the router restricts `decision` to the allowed
        # literals.
        await transition(
            run_uuid,
            project_id,
            "cancelled",
            error_code="E-APPR-REJECTED",
            error_detail=f"operator 拒絕: approver={approver_id!r}, reason={reason!r}",
        )
        new_state = "cancelled"

    # Same journal entry for both outcomes; the helper swallows its own errors.
    await _record_approval_decision_step(
        run_id=run_uuid,
        project_id=project_id,
        decision=decision,
        approver_id=approver_id,
        reason=reason,
    )

    try:
        await write_audit(
            project_id=project_id,
            action=f"run.approval.{decision}",
            resource_type="run",
            resource_id=run_id,
            details={
                "approver_id": approver_id,
                "decision": decision,
                "reason": reason,
                "new_state": new_state,
            },
            run_id=run_id,
        )
    except Exception as exc:
        # Audit is best-effort; the decision has already been applied.
        logger.warning("approval_audit_write_failed", run_id=run_id, error=str(exc))
    return {
        "run_id": run_id,
        "decision": decision,
        "new_state": new_state,
        "approval_token_jti": approval_token_jti,
    }
async def _record_approval_decision_step(
    *,
    run_id: UUID,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> None:
    """Append the operator's approve/reject decision to the Run Step Journal.

    The journal row is a governance/observability record, not the execution
    gate itself. Any failure while writing it is logged as a warning and
    swallowed. The approve/reject has already been applied, and a journal
    error must not cascade into a second failure of the human-decision flow.

    The new row uses ``step_seq = max(existing step_seq for the run, or 0) + 1``.
    The run's ``step_count`` is incremented in the same session.
    """
    if decision == "approve":
        journal_tool = "operator_console.approve"
    else:
        journal_tool = "operator_console.reject"
    note = _truncate_step_summary(
        f"approver={approver_id}; decision={decision}; reason={reason or '-'}"
    )
    try:
        async with get_db_context(project_id) as db:
            highest_seq_stmt = select(
                func.coalesce(func.max(AwoooPRunStepJournal.step_seq), 0)
            ).where(
                AwoooPRunStepJournal.run_id == run_id,
                AwoooPRunStepJournal.project_id == project_id,
            )
            highest_seq = (await db.execute(highest_seq_stmt)).scalar_one()
            # NOTE(review): max()+1 is not concurrency-safe. Two simultaneous
            # writers for one run could pick the same step_seq. Any resulting
            # error ends up in the warning branch below.
            next_seq = int(highest_seq) + 1
            journal_row = AwoooPRunStepJournal(
                run_id=run_id,
                project_id=project_id,
                step_seq=next_seq,
                tool_name=journal_tool,
                result_status="success",
                block_reason=note,
                completed_at=_utc_now_naive(),
            )
            db.add(journal_row)
            bump_step_count = (
                update(AwoooPRunState)
                .where(
                    AwoooPRunState.run_id == run_id,
                    AwoooPRunState.project_id == project_id,
                )
                .values(step_count=AwoooPRunState.step_count + 1)
            )
            await db.execute(bump_step_count)
        logger.info(
            "approval_decision_step_recorded",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            approver_id=approver_id,
        )
    except Exception as exc:
        logger.warning(
            "approval_decision_step_record_failed",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            error=str(exc),
        )