"""
Platform Runtime (Shadow Mode Shell)
====================================
AwoooP Phase 4: the first runtime shell. It only runs in shadow mode and does
not change legacy behaviour (ADR-106).
2026-05-04 ogt + Claude Sonnet 4.6

Shadow-mode guarantees:
1. Zero user-visible responses (no Telegram/Slack message is sent).
2. Zero destructive tool calls (every tool with is_destructive=true is blocked).
3. Every execution is recorded in awooop_run_state + step_journal (observable).
4. The budget_service hard kill applies as well (keeps shadow runs from
   overspending).

Idempotency (ADR-114):
(project_id, channel_type, provider_event_id) is a composite unique key.
Redis NX intercepts first (fast); the PG constraint is the last line of
defence (accurate).
"""

from __future__ import annotations

import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any

import structlog
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert

from src.db.awooop_models import AwoooPRunIdempotency, AwoooPRunState, AwoooPRunStepJournal
from src.db.base import get_db_context
from src.services.run_state_machine import LEASE_TTL_SECONDS, transition

logger = structlog.get_logger(__name__)

# Shadow mode settings
_SHADOW_ENABLED = True  # Fixed to True in Phase 4; Phase 6+ is driven by migration_mode

# Substrings that mark a tool name as destructive when no contract flag is set.
_DESTRUCTIVE_TOOL_KEYWORDS = frozenset({
    "delete", "drop", "remove", "kill", "terminate", "destroy",
    "rollback", "revert", "patch", "apply", "exec", "execute",
    "restart", "scale", "cordon", "drain",
})


# ─────────────────────────────────────────────────────────────────────────────
# UUID v7 (time-ordered)
# ─────────────────────────────────────────────────────────────────────────────

def _uuid7() -> uuid.UUID:
    """
    Generate a time-ordered UUID v7 (suitable as a database primary key).

    Layout (most significant bit first):
        48-bit Unix timestamp in ms | 4-bit version (7) | 12-bit rand_a |
        2-bit variant (0b10) | 62-bit rand_b

    Returns:
        A ``uuid.UUID`` whose ``version`` is 7 and whose ``variant`` is
        ``uuid.RFC_4122``.
    """
    now_ms = int(datetime.now(timezone.utc).timestamp() * 1000) & 0xFFFFFFFFFFFF
    seed = uuid.uuid4().int
    # Bits 64..75 and 0..61 of a uuid4 are random (the version nibble and the
    # variant bits sit above each of those ranges), so reuse them as entropy.
    rand_a = (seed >> 64) & 0xFFF
    rand_b = seed & 0x3FFFFFFFFFFFFFFF
    val = (now_ms << 80) | (0x7 << 76) | (rand_a << 64) | (0b10 << 62) | rand_b
    return uuid.UUID(int=val)


# ─────────────────────────────────────────────────────────────────────────────
# W3C traceparent generation
# ─────────────────────────────────────────────────────────────────────────────

def _new_trace_id() -> str:
    """
    Generate a W3C traceparent-compatible value.

    Returns:
        A string of the form ``00-<32 hex trace-id>-<16 hex span-id>-01``.
    """
    trace_id_hex = uuid.uuid4().hex       # 32 hex chars = 128-bit
    span_id_hex = uuid.uuid4().hex[:16]   # 16 hex chars = 64-bit
    return f"00-{trace_id_hex}-{span_id_hex}-01"


# ─────────────────────────────────────────────────────────────────────────────
# Idempotency
# ─────────────────────────────────────────────────────────────────────────────

_IDEMPOTENCY_REDIS_PREFIX = "awooop:run:idem:"
_IDEMPOTENCY_REDIS_TTL = 86400  # 24h


def _idempotency_key(project_id: str, channel_type: str, provider_event_id: str) -> str:
    """Build the Redis key for one (project, channel, provider event) triple."""
    return f"{_IDEMPOTENCY_REDIS_PREFIX}{project_id}:{channel_type}:{provider_event_id}"


async def check_idempotency(
    project_id: str,
    channel_type: str,
    provider_event_id: str,
) -> uuid.UUID | None:
    """
    Look up the run_id already registered for
    (project_id, channel_type, provider_event_id), if any.

    Layer 1: Redis lookup (fast path; entries are written with a 24h TTL).
    Layer 2: PostgreSQL awooop_run_idempotency (accurate).

    Failures in either layer are logged and treated as a miss, so this
    function never raises for lookup errors.

    Returns:
        The existing run_id, or None if the event has not been seen.
    """
    idem_key = _idempotency_key(project_id, channel_type, provider_event_id)

    # Layer 1: Redis
    try:
        # Imported lazily so that an import failure is handled like any other
        # Redis failure and the PostgreSQL layer is still consulted.
        from src.core.redis_client import get_redis

        redis = get_redis()
        cached = await redis.get(idem_key)
        if cached:
            run_id_str = cached.decode() if isinstance(cached, bytes) else cached
            logger.info(
                "idempotency_hit_redis",
                project_id=project_id,
                provider_event_id=provider_event_id,
                run_id=run_id_str,
            )
            return uuid.UUID(run_id_str)
    except Exception as exc:
        logger.warning("idempotency_redis_check_failed", error=str(exc))

    # Layer 2: PostgreSQL
    try:
        async with get_db_context(project_id) as db:
            result = await db.execute(
                select(AwoooPRunIdempotency.run_id).where(
                    AwoooPRunIdempotency.project_id == project_id,
                    AwoooPRunIdempotency.channel_type == channel_type,
                    AwoooPRunIdempotency.provider_event_id == provider_event_id,
                )
            )
            row = result.fetchone()
            if row:
                return uuid.UUID(str(row[0]))
    except Exception as exc:
        logger.warning("idempotency_pg_check_failed", error=str(exc))

    return None


async def _register_idempotency(
    project_id: str,
    channel_type: str,
    provider_event_id: str,
    run_id: uuid.UUID,
) -> None:
    """
    Write the idempotency record to Redis and PostgreSQL.

    Both writes are best-effort: failures are logged and swallowed.
    """
    idem_key = _idempotency_key(project_id, channel_type, provider_event_id)
    run_id_str = str(run_id)

    # Redis NX: if another worker already wrote the key, NX makes this a no-op.
    try:
        from src.core.redis_client import get_redis

        redis = get_redis()
        await redis.set(idem_key, run_id_str, ex=_IDEMPOTENCY_REDIS_TTL, nx=True)
    except Exception as exc:
        logger.warning("idempotency_redis_write_failed", error=str(exc))

    # PostgreSQL: insert, ignoring a conflict on the unique key.
    try:
        async with get_db_context(project_id) as db:
            stmt = pg_insert(AwoooPRunIdempotency).values(
                project_id=project_id,
                channel_type=channel_type,
                provider_event_id=provider_event_id,
                run_id=run_id,
            ).on_conflict_do_nothing(constraint="uix_run_idempotency_key")
            await db.execute(stmt)
    except Exception as exc:
        logger.warning("idempotency_pg_write_failed", error=str(exc))


# ─────────────────────────────────────────────────────────────────────────────
# Shadow destructive tool check
# ─────────────────────────────────────────────────────────────────────────────

def is_destructive_tool(tool_name: str, is_destructive_flag: bool = False) -> bool:
    """
    Decide whether a tool call is destructive. Shadow mode blocks all of them.

    Decision order:
    1. The MCP Gateway contract's is_destructive=True flag.
    2. The tool name contains a destructive keyword (fallback used when there
       is no contract flag). This is a case-insensitive substring match, so it
       errs on the side of blocking (e.g. "dispatch" contains "patch").

    Args:
        tool_name: Name of the tool; a None/empty name is treated as "".
        is_destructive_flag: Flag taken from the tool's contract, if available.

    Returns:
        True if the tool must be blocked as destructive.
    """
    if is_destructive_flag:
        return True
    tool_lower = (tool_name or "").lower()
    return any(kw in tool_lower for kw in _DESTRUCTIVE_TOOL_KEYWORDS)


# ─────────────────────────────────────────────────────────────────────────────
# Run creation
# ─────────────────────────────────────────────────────────────────────────────

async def create_run(
    *,
    project_id: str,
    agent_id: str,
    trigger_type: str,
    trigger_ref: str | None = None,
    input_payload: dict[str, Any] | None = None,
    channel_type: str | None = None,
    provider_event_id: str | None = None,
    timeout_seconds: int = 600,
) -> tuple[uuid.UUID, bool]:
    """
    Create a new run, or return the existing one for a repeated event.

    Idempotency is only applied when both channel_type and provider_event_id
    are given. Runs are created in state "pending" with
    is_shadow=_SHADOW_ENABLED, so no user response is produced.

    NOTE(review): the idempotency check and the registration are separate
    steps, so two workers handling the same event at the same time can both
    pass the check and each create a run; only the first idempotency row is
    kept. Confirm whether callers serialise deliveries per event.

    Returns:
        (run_id, is_duplicate) - is_duplicate=True means an idempotency hit.
    """
    has_idempotency_key = bool(channel_type and provider_event_id)

    # Idempotency check
    if has_idempotency_key:
        existing_run_id = await check_idempotency(project_id, channel_type, provider_event_id)
        if existing_run_id:
            logger.info(
                "run_creation_idempotent",
                project_id=project_id,
                channel_type=channel_type,
                provider_event_id=provider_event_id,
                existing_run_id=str(existing_run_id),
            )
            return existing_run_id, True

    run_id = _uuid7()
    trace_id = _new_trace_id()
    timeout_at = datetime.now(timezone.utc) + timedelta(seconds=timeout_seconds)

    # Hash a canonical JSON form (sorted keys, compact separators) so that the
    # digest does not depend on key order. Empty/None payloads get no digest.
    input_sha256 = None
    if input_payload:
        canonical = json.dumps(input_payload, sort_keys=True, separators=(",", ":"))
        input_sha256 = hashlib.sha256(canonical.encode()).hexdigest()

    async with get_db_context(project_id) as db:
        run = AwoooPRunState(
            run_id=run_id,
            project_id=project_id,
            agent_id=agent_id,
            state="pending",
            trace_id=trace_id,
            trigger_type=trigger_type,
            trigger_ref=trigger_ref,
            is_shadow=_SHADOW_ENABLED,
            input_sha256=input_sha256,
            timeout_at=timeout_at,
        )
        db.add(run)

    # Register the idempotency record once the run's DB context has exited.
    if has_idempotency_key:
        await _register_idempotency(project_id, channel_type, provider_event_id, run_id)

    logger.info(
        "run_created",
        run_id=str(run_id),
        project_id=project_id,
        agent_id=agent_id,
        is_shadow=_SHADOW_ENABLED,
        trace_id=trace_id,
        trigger_type=trigger_type,
    )
    return run_id, False


# ─────────────────────────────────────────────────────────────────────────────
# Shadow execution (Phase 4 main logic)
# ─────────────────────────────────────────────────────────────────────────────

async def shadow_execute(run: AwoooPRunState) -> None:
    """
    Execute one run in shadow mode.

    Phase 4 behaviour:
    - Resolve the active agent contract.
    - Walk the contract's tools without actually executing any of them,
      marking destructive ones as blocked.
    - Record one step_journal row per tool.
    - Transition the run to "completed" (no user response).

    Any exception is logged and the run is transitioned to "failed" with
    error code "E-RUN-001".

    The real LLM and channel adapter are only wired in from Phase 6 onwards.
    """
    run_id = run.run_id
    project_id = run.project_id
    agent_id = run.agent_id

    logger.info(
        "shadow_execute_start",
        run_id=str(run_id),
        project_id=project_id,
        agent_id=agent_id,
    )

    try:
        # Resolve the agent contract to obtain the tool list.
        from src.services.contract_service import get_active_body

        agent_contract = await get_active_body(
            project_id=project_id,
            contract_family="agent",
            contract_id=agent_id,
        )
        # Treat a missing contract, a missing key and an explicit null alike.
        tools = (agent_contract.get("tools") or []) if agent_contract else []

        # Shadow step journal: one row per tool, recording whether it is blocked.
        step_seq = 0
        async with get_db_context(project_id) as db:
            for tool in tools:
                tool_name = tool.get("tool_name", "unknown")
                # Honour the contract flag first, keyword match second.
                # Assumes the contract exposes the flag under "is_destructive",
                # as named in the module docstring - TODO confirm against the
                # MCP Gateway contract schema.
                blocked = is_destructive_tool(
                    tool_name,
                    bool(tool.get("is_destructive", False)),
                )
                step = AwoooPRunStepJournal(
                    run_id=run_id,
                    project_id=project_id,
                    step_seq=step_seq,
                    tool_name=tool_name,
                    mcp_gateway_id=tool.get("mcp_gateway_id"),
                    result_status="success" if not blocked else "pending",
                    was_blocked=blocked,
                    block_reason="shadow_mode_destructive_blocked" if blocked else None,
                )
                db.add(step)
                step_seq += 1

        # Complete the run (shadow mode: no user response).
        await transition(
            run_id=run_id,
            project_id=project_id,
            to_state="completed",
            step_count_delta=step_seq,
        )

        logger.info(
            "shadow_execute_completed",
            run_id=str(run_id),
            steps=step_seq,
        )

    except Exception as exc:
        logger.exception(
            "shadow_execute_failed",
            run_id=str(run_id),
            error=str(exc),
        )
        await transition(
            run_id=run_id,
            project_id=project_id,
            to_state="failed",
            error_code="E-RUN-001",
            error_detail=str(exc)[:500],
        )


async def get_run_status(run_id: uuid.UUID, project_id: str) -> dict[str, Any] | None:
    """
    Fetch the FSM status of a single run.

    The router layer goes through this service function instead of touching
    the database directly.

    Returns:
        A JSON-friendly dict describing the run, or None if no run matches
        both run_id and project_id.
    """
    async with get_db_context(project_id) as db:
        result = await db.execute(
            select(AwoooPRunState).where(
                AwoooPRunState.run_id == run_id,
                AwoooPRunState.project_id == project_id,
            )
        )
        run = result.scalar_one_or_none()
        if run is None:
            return None

        # Build the response while the DB context is still open so that
        # attribute access never depends on the object outliving its session.
        return {
            "run_id": str(run.run_id),
            "project_id": run.project_id,
            "agent_id": run.agent_id,
            "state": run.state,
            "is_shadow": run.is_shadow,
            "trace_id": run.trace_id,
            "attempt_count": run.attempt_count,
            # Guarded like the timestamp fields below: a NULL cost reads as 0.0.
            "cost_usd": float(run.cost_usd) if run.cost_usd is not None else 0.0,
            "step_count": run.step_count,
            "error_code": run.error_code,
            "created_at": run.created_at.isoformat() if run.created_at else None,
            "started_at": run.started_at.isoformat() if run.started_at else None,
            "completed_at": run.completed_at.isoformat() if run.completed_at else None,
        }