fix(awooop): authenticate approval decisions
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m3s
CD Pipeline / build-and-deploy (push) Successful in 3m28s
CD Pipeline / post-deploy-checks (push) Successful in 1m25s

This commit is contained in:
OG T
2026-05-06 13:05:51 +08:00
parent e6eae5cdc4
commit c696b99ccf
9 changed files with 294 additions and 6 deletions

View File

@@ -15,9 +15,13 @@ from decimal import Decimal
from typing import Any, Literal
from uuid import UUID
from fastapi import APIRouter, Query
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, Field
from src.core.awooop_operator_auth import (
AwoooPOperatorPrincipal,
verify_awooop_operator,
)
from src.services.platform_operator_service import (
decide_approval as decide_approval_svc,
list_approvals as list_approvals_svc,
@@ -65,7 +69,10 @@ class ListApprovalsResponse(BaseModel):
class DecideApprovalRequest(BaseModel):
project_id: str = Field(..., description="租戶 ID")
decision: Literal["approve", "reject"] = Field(..., description="核准或拒絕")
approver_id: str = Field(..., description="審核人 IDplatform_subject_id 或 operator email")
approver_id: str | None = Field(
default=None,
description="Deprecated. Ignored; approver comes from trusted operator headers.",
)
reason: str | None = Field(None, description="決策原因(可選)")
@@ -127,11 +134,12 @@ async def list_approvals(
async def decide_approval(
run_id: str,
body: DecideApprovalRequest,
operator: AwoooPOperatorPrincipal = Depends(verify_awooop_operator),
) -> dict[str, Any]:
return await decide_approval_svc(
run_id=run_id,
project_id=body.project_id,
decision=body.decision,
approver_id=body.approver_id,
approver_id=operator.operator_id,
reason=body.reason,
)

View File

@@ -0,0 +1,126 @@
"""
AwoooP Operator authentication boundary.
ADR-116 Gate 5 approval decisions must not trust browser-supplied identities.
This module accepts a short-lived operator identity only when it is paired with
the server-side AwoooP operator key.
"""
from __future__ import annotations
import re
import secrets
from dataclasses import dataclass
from typing import Annotated
import structlog
from fastapi import Header, HTTPException, status
from src.core.config import settings
logger = structlog.get_logger(__name__)
_OPERATOR_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.:@-]{1,127}$")
_PROD_ENVS = {"prod", "production"}
@dataclass(frozen=True, slots=True)
class AwoooPOperatorPrincipal:
"""Authenticated AwoooP operator principal."""
operator_id: str
auth_method: str
def _auth_error(detail: str = "Operator authentication required") -> HTTPException:
return HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)
def _clean_operator_id(operator_id: str | None) -> str:
if operator_id is None:
raise _auth_error()
cleaned = operator_id.strip()
if not _OPERATOR_ID_RE.fullmatch(cleaned):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Invalid operator identity",
)
return cleaned
def authenticate_awooop_operator_headers(
operator_id: str | None,
operator_key: str | None,
*,
configured_key: str | None = None,
environment: str | None = None,
) -> AwoooPOperatorPrincipal:
"""Validate trusted AwoooP operator headers.
Args:
operator_id: Value from ``X-AwoooP-Operator-Id``.
operator_key: Value from ``X-AwoooP-Operator-Key``.
configured_key: Server-side shared key. Defaults to settings.
environment: Runtime environment. Defaults to settings.
Returns:
Authenticated operator principal.
Raises:
HTTPException: 401 when authentication is missing/invalid, or 422 for
malformed operator identity.
"""
cleaned_operator_id = _clean_operator_id(operator_id)
expected_key = (
settings.AWOOOP_OPERATOR_API_KEY
if configured_key is None
else configured_key
)
runtime_env = (environment or settings.ENVIRONMENT or "").lower()
if not expected_key:
if runtime_env in _PROD_ENVS:
logger.critical(
"awooop_operator_key_missing_in_production",
environment=runtime_env,
)
raise _auth_error()
logger.warning(
"awooop_operator_key_skipped_dev_only",
environment=runtime_env,
operator_id=cleaned_operator_id,
)
return AwoooPOperatorPrincipal(
operator_id=cleaned_operator_id,
auth_method="dev_header",
)
if not operator_key:
logger.warning("awooop_operator_key_missing", operator_id=cleaned_operator_id)
raise _auth_error()
if not secrets.compare_digest(operator_key, expected_key):
logger.warning("awooop_operator_key_invalid", operator_id=cleaned_operator_id)
raise _auth_error()
return AwoooPOperatorPrincipal(
operator_id=cleaned_operator_id,
auth_method="operator_api_key",
)
async def verify_awooop_operator(
x_awooop_operator_id: Annotated[
str | None,
Header(alias="X-AwoooP-Operator-Id"),
] = None,
x_awooop_operator_key: Annotated[
str | None,
Header(alias="X-AwoooP-Operator-Key"),
] = None,
) -> AwoooPOperatorPrincipal:
"""FastAPI dependency for operator mutation endpoints."""
return authenticate_awooop_operator_headers(
operator_id=x_awooop_operator_id,
operator_key=x_awooop_operator_key,
)

View File

@@ -602,6 +602,13 @@ class Settings(BaseSettings):
default="",
description="API Key for K8s admin endpoints (X-K8s-Api-Key header)",
)
AWOOOP_OPERATOR_API_KEY: str = Field(
default="",
description=(
"API key for AwoooP operator mutation endpoints "
"(X-AwoooP-Operator-Key header)"
),
)
# ==========================================================================
# 統帥鐵律:禁止 SQLite (AWOOOI 憲法)