feat(awooop): 新增 truth-chain 查詢 API
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m16s
CD Pipeline / build-and-deploy (push) Successful in 3m29s
CD Pipeline / post-deploy-checks (push) Successful in 1m19s

This commit is contained in:
Your Name
2026-05-12 22:55:36 +08:00
parent 56228dbb79
commit f7c84530d6
7 changed files with 777 additions and 0 deletions

View File

@@ -13,9 +13,11 @@ from src.api.v1.platform.events import router as events_router
from src.api.v1.platform.operator_runs import router as operator_runs_router
from src.api.v1.platform.runs import router as runs_router
from src.api.v1.platform.tenants import router as tenants_router
from src.api.v1.platform.truth_chain import router as truth_chain_router
router = APIRouter()
router.include_router(events_router)
router.include_router(truth_chain_router)
# 2026-05-06 Codex: FastAPI 依註冊順序比對路由。Operator Console 的
# `/runs/list` 必須排在 `/runs/{run_id}` 前面,否則 `list` 會被當成
# run_id造成前端 Run 監控頁 HTTP 422。

View File

@@ -0,0 +1,35 @@
"""AwoooP Operator Console — truth-chain read API."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends, Query
from src.core.awooop_operator_auth import (
AwoooPOperatorPrincipal,
verify_awooop_operator,
)
from src.services.awooop_truth_chain_service import fetch_truth_chain
router = APIRouter()
@router.get(
"/truth-chain/{source_id}",
summary="查詢 Telegram / Incident / Drift 真相鏈",
description=(
"T0 read-only endpoint. 聚合 incident、approval、evidence、MCP、"
"automation_operation_log、drift repeat state 與 outbound mirror"
"讓 Operator Console 能判斷 Telegram 卡片目前卡在哪個流程節點。"
),
)
async def get_truth_chain(
source_id: str,
project_id: str = Query("awoooi", description="租戶 ID"),
operator: AwoooPOperatorPrincipal = Depends(verify_awooop_operator),
) -> dict[str, Any]:
# operator dependency intentionally gates this read API even though the
# principal is not otherwise needed by the aggregation query.
_ = operator
return await fetch_truth_chain(source_id=source_id, project_id=project_id)

View File

@@ -0,0 +1,641 @@
"""AwoooP read-only truth chain aggregation.
T0 only: this service does not mutate incident, approval, execution, or channel
state. It stitches existing durable records into one operator-facing status so
Telegram cards can be audited without guessing which subsystem owns the truth.
"""
from __future__ import annotations
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import text
from src.db.base import get_db_context
logger = structlog.get_logger(__name__)
_MAX_ROWS = 100
def _clean(value: Any) -> Any:
"""Convert DB values into JSON-friendly primitives."""
if isinstance(value, UUID):
return str(value)
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, dict):
return {str(k): _clean(v) for k, v in value.items()}
if isinstance(value, list):
return [_clean(v) for v in value]
return value
def _clean_row(row: Any) -> dict[str, Any]:
return {key: _clean(value) for key, value in dict(row).items()}
async def _fetch_all(db: Any, sql: str, params: dict[str, Any]) -> list[dict[str, Any]]:
result = await db.execute(text(sql), params)
return [_clean_row(row) for row in result.mappings().all()]
async def _fetch_one(
db: Any,
sql: str,
params: dict[str, Any],
) -> dict[str, Any] | None:
result = await db.execute(text(sql), params)
row = result.mappings().first()
return _clean_row(row) if row else None
def _source_type(source_id: str, incident: dict[str, Any] | None, drift: dict[str, Any] | None) -> str:
if incident is not None:
return "incident"
if drift is not None:
return "drift_report"
try:
UUID(source_id)
except ValueError:
return "unknown"
return "run"
def _failed_mcp_tools(evidence_rows: list[dict[str, Any]]) -> list[str]:
failed: set[str] = set()
for evidence in evidence_rows:
mcp_health = evidence.get("mcp_health")
if isinstance(mcp_health, dict):
failed.update(str(tool) for tool, ok in mcp_health.items() if ok is False)
return sorted(failed)
def _approval_ids(approvals: list[dict[str, Any]]) -> list[str]:
return [str(row["id"]) for row in approvals if row.get("id")]
def _operation_ids(automation_ops: list[dict[str, Any]]) -> list[str]:
return [str(row["op_id"]) for row in automation_ops if row.get("op_id")]
def _truth_status(
*,
incident: dict[str, Any] | None,
approvals: list[dict[str, Any]],
evidence_rows: list[dict[str, Any]],
automation_ops: list[dict[str, Any]],
drift: dict[str, Any] | None,
drift_repeat_count: int,
gateway_mcp_total: int,
legacy_mcp_total: int,
outbound_visible_total: int,
) -> dict[str, Any]:
"""Derive the current operator-visible truth-chain stage."""
blockers: list[str] = []
needs_human = False
stage = "not_found"
stage_status = "missing"
if drift is not None:
stage = "dedup_or_repeat_updated" if drift_repeat_count > 1 else "received"
stage_status = str(drift.get("status") or "unknown")
interpretation = drift.get("interpretation") or {}
confidence = interpretation.get("confidence") if isinstance(interpretation, dict) else None
if stage_status == "pending":
needs_human = True
blockers.append("drift_report_pending_without_resolution")
if confidence in (0, 0.0):
blockers.append("drift_ai_confidence_zero")
if incident is not None:
incident_status = str(incident.get("status") or "unknown")
stage = "received"
stage_status = incident_status.lower()
if incident_status in {"RESOLVED", "CLOSED"}:
stage = "resolved"
stage_status = "success"
elif evidence_rows:
attempted = sum(int(row.get("sensors_attempted") or 0) for row in evidence_rows)
succeeded = sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows)
if attempted > 0 and succeeded == 0:
stage = "evidence_degraded"
stage_status = "warning"
needs_human = True
blockers.append("all_evidence_sensors_failed")
else:
stage = "evidence_collected"
stage_status = "completed"
approval_statuses = {str(row.get("status") or "").upper() for row in approvals}
approval_actions = " ".join(str(row.get("action") or "") for row in approvals).upper()
if any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses):
stage = "approval_required"
stage_status = "waiting"
needs_human = True
elif "APPROVED" in approval_statuses and not automation_ops:
if "NO_ACTION" in approval_actions:
stage = "manual_required"
stage_status = "blocked"
needs_human = True
blockers.append("approval_resolved_no_action_without_execution")
else:
stage = "execution_missing"
stage_status = "blocked"
needs_human = True
blockers.append("approved_without_execution_record")
op_statuses = {str(row.get("status") or "").lower() for row in automation_ops}
if op_statuses:
if op_statuses & {"success", "completed"}:
stage = "execution_succeeded"
stage_status = "success"
elif op_statuses & {"failed", "error"}:
stage = "execution_failed"
stage_status = "error"
needs_human = True
else:
stage = "execution_started"
stage_status = "running"
if incident_status == "INVESTIGATING" and automation_ops == [] and approvals:
blockers.append("incident_still_investigating_after_approval")
if gateway_mcp_total == 0:
blockers.append("awooop_mcp_gateway_audit_empty")
if legacy_mcp_total == 0 and incident is not None:
blockers.append("legacy_mcp_audit_missing")
if outbound_visible_total == 0:
blockers.append("outbound_mirror_not_visible_for_source")
return {
"current_stage": stage,
"stage_status": stage_status,
"needs_human": needs_human,
"blockers": blockers,
}
def _summarize_mcp(rows: list[dict[str, Any]]) -> dict[str, Any]:
by_tool: dict[str, dict[str, Any]] = {}
success_count = 0
failure_count = 0
for row in rows:
key = f"{row.get('mcp_server') or 'unknown'}:{row.get('tool_name') or 'unknown'}"
item = by_tool.setdefault(
key,
{
"mcp_server": row.get("mcp_server"),
"tool_name": row.get("tool_name"),
"success": 0,
"failed": 0,
"last_error": None,
},
)
if row.get("success") is True:
item["success"] += 1
success_count += 1
else:
item["failed"] += 1
failure_count += 1
item["last_error"] = row.get("error_message")
return {
"total": len(rows),
"success": success_count,
"failed": failure_count,
"by_tool": list(by_tool.values()),
}
async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[str, Any]:
"""Return a read-only truth chain for an incident, drift report, or run id."""
async with get_db_context(project_id) as db:
incident = await _fetch_one(
db,
"""
SELECT
incident_id,
project_id,
status::text AS status,
severity::text AS severity,
alertname,
alert_category,
notification_type,
created_at,
updated_at,
resolved_at,
verification_result,
frequency_snapshot,
decision_chain
FROM incidents
WHERE incident_id = :source_id
AND (project_id = :project_id OR project_id IS NULL)
""",
{"source_id": source_id, "project_id": project_id},
)
drift = await _fetch_one(
db,
"""
SELECT
report_id,
namespace,
status,
triggered_by,
high_count,
medium_count,
info_count,
scanned_at,
created_at,
resolved_at,
interpretation,
narrative_text
FROM drift_reports
WHERE report_id = :source_id
""",
{"source_id": source_id},
)
runs = await _fetch_all(
db,
"""
SELECT
run_id,
project_id,
agent_id,
state,
trigger_type,
trigger_ref,
is_shadow,
step_count,
created_at,
started_at,
completed_at,
error_code,
error_detail
FROM awooop_run_state
WHERE project_id = :project_id
AND (run_id::text = :source_id OR trigger_ref = :source_id)
ORDER BY created_at DESC
LIMIT :limit
""",
{"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS},
)
approvals: list[dict[str, Any]] = []
evidence_rows: list[dict[str, Any]] = []
timeline_events: list[dict[str, Any]] = []
legacy_mcp_rows: list[dict[str, Any]] = []
automation_ops: list[dict[str, Any]] = []
km_entries: list[dict[str, Any]] = []
if incident is not None:
incident_id = str(incident["incident_id"])
approvals = await _fetch_all(
db,
"""
SELECT
id,
incident_id,
status::text AS status,
risk_level::text AS risk_level,
action,
description,
hit_count,
created_at,
updated_at,
resolved_at,
matched_playbook_id,
extra_metadata,
decision_fusion_details
FROM approval_records
WHERE incident_id = :incident_id
ORDER BY created_at DESC
LIMIT :limit
""",
{"incident_id": incident_id, "limit": _MAX_ROWS},
)
approval_ids = _approval_ids(approvals)
evidence_rows = await _fetch_all(
db,
"""
SELECT
id,
incident_id,
matched_playbook_id,
collected_at,
collection_duration_ms,
sensors_attempted,
sensors_succeeded,
evidence_summary,
metrics_snapshot,
mcp_health,
verification_result,
pre_execution_state,
post_execution_state,
self_healing_score,
self_healing_detail
FROM incident_evidence
WHERE incident_id = :incident_id
ORDER BY collected_at DESC
LIMIT :limit
""",
{"incident_id": incident_id, "limit": _MAX_ROWS},
)
timeline_events = await _fetch_all(
db,
"""
SELECT
id,
event_type,
status,
title,
description,
actor,
actor_role,
risk_level,
approval_id,
incident_id,
created_at
FROM timeline_events
WHERE incident_id = :incident_id
OR approval_id = ANY(:approval_ids)
ORDER BY created_at ASC
LIMIT :limit
""",
{
"incident_id": incident_id,
"approval_ids": approval_ids or ["__none__"],
"limit": _MAX_ROWS,
},
)
legacy_mcp_rows = await _fetch_all(
db,
"""
SELECT
id,
session_id,
flywheel_node,
mcp_server,
tool_name,
duration_ms,
success,
error_message,
incident_id,
agent_role,
created_at
FROM mcp_audit_log
WHERE incident_id = :incident_id
ORDER BY created_at ASC
LIMIT :limit
""",
{"incident_id": incident_id, "limit": _MAX_ROWS},
)
automation_ops = await _fetch_all(
db,
"""
SELECT
op_id,
operation_type,
status,
incident_id,
run_id,
actor,
dry_run_result,
error,
duration_ms,
tags,
created_at
FROM automation_operation_log
WHERE coalesce(input::text, '') LIKE :needle
OR coalesce(output::text, '') LIKE :needle
OR coalesce(array_to_string(tags, ','), '') LIKE :needle
ORDER BY created_at DESC
LIMIT :limit
""",
{"needle": f"%{incident_id}%", "limit": _MAX_ROWS},
)
km_entries = await _fetch_all(
db,
"""
SELECT
id,
title,
entry_type::text AS entry_type,
status::text AS status,
related_incident_id,
related_approval_id,
created_at
FROM knowledge_entries
WHERE related_incident_id = :incident_id
ORDER BY created_at DESC
LIMIT :limit
""",
{"incident_id": incident_id, "limit": _MAX_ROWS},
)
drift_repeats: dict[str, Any] = {
"occurrences_12h": 0,
"first_scanned_at": None,
"last_scanned_at": None,
"reports": [],
}
if drift is not None:
repeat_summary = await _fetch_one(
db,
"""
SELECT
count(*) AS occurrences_12h,
min(scanned_at) AS first_scanned_at,
max(scanned_at) AS last_scanned_at
FROM drift_reports
WHERE created_at > now() - interval '12 hours'
AND namespace = :namespace
AND status = :status
AND high_count = :high_count
AND medium_count = :medium_count
AND info_count = :info_count
""",
{
"namespace": drift["namespace"],
"status": drift["status"],
"high_count": drift["high_count"],
"medium_count": drift["medium_count"],
"info_count": drift["info_count"],
},
)
repeat_reports = await _fetch_all(
db,
"""
SELECT report_id, scanned_at, created_at, status, interpretation, narrative_text
FROM drift_reports
WHERE created_at > now() - interval '12 hours'
AND namespace = :namespace
AND status = :status
AND high_count = :high_count
AND medium_count = :medium_count
AND info_count = :info_count
ORDER BY scanned_at DESC
LIMIT 20
""",
{
"namespace": drift["namespace"],
"status": drift["status"],
"high_count": drift["high_count"],
"medium_count": drift["medium_count"],
"info_count": drift["info_count"],
},
)
drift_repeats = {
**(repeat_summary or {}),
"reports": repeat_reports,
}
gateway_mcp_rows = await _fetch_all(
db,
"""
SELECT
call_id,
project_id,
run_id,
trace_id,
agent_id,
tool_name,
result_status,
block_gate,
block_reason,
latency_ms,
created_at
FROM awooop_mcp_gateway_audit
WHERE project_id = :project_id
AND (
trace_id = :source_id
OR run_id::text = :source_id
)
ORDER BY created_at ASC
LIMIT :limit
""",
{"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS},
)
outbound_rows = await _fetch_all(
db,
"""
SELECT
message_id,
project_id,
run_id,
channel_type,
message_type,
content_hash,
content_preview,
provider_message_id,
send_status,
queued_at,
sent_at,
triggered_by_state
FROM awooop_outbound_message
WHERE project_id = :project_id
AND (
run_id::text = :source_id
OR content_preview ILIKE :needle
)
ORDER BY queued_at DESC
LIMIT :limit
""",
{
"source_id": source_id,
"project_id": project_id,
"needle": f"%{source_id}%",
"limit": _MAX_ROWS,
},
)
source_type = _source_type(source_id, incident, drift)
legacy_mcp_summary = _summarize_mcp(legacy_mcp_rows)
truth_status = _truth_status(
incident=incident,
approvals=approvals,
evidence_rows=evidence_rows,
automation_ops=automation_ops,
drift=drift,
drift_repeat_count=int(drift_repeats.get("occurrences_12h") or 0),
gateway_mcp_total=len(gateway_mcp_rows),
legacy_mcp_total=legacy_mcp_summary["total"],
outbound_visible_total=len(outbound_rows),
)
evidence_totals = {
"records": len(evidence_rows),
"sensors_attempted": sum(int(row.get("sensors_attempted") or 0) for row in evidence_rows),
"sensors_succeeded": sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows),
"failed_tools": _failed_mcp_tools(evidence_rows),
}
result = {
"project_id": project_id,
"source_id": source_id,
"source_type": source_type,
"found": incident is not None or drift is not None or bool(runs),
"truth_status": truth_status,
"linked_ids": {
"incident_id": incident.get("incident_id") if incident else None,
"approval_ids": _approval_ids(approvals),
"run_ids": [row["run_id"] for row in runs],
"drift_report_id": drift.get("report_id") if drift else None,
"operation_ids": _operation_ids(automation_ops),
},
"incident": incident,
"drift": {
"report": drift,
"repeat_state": drift_repeats,
},
"approvals": approvals,
"evidence": {
"summary": evidence_totals,
"records": evidence_rows,
},
"mcp": {
"awooop_gateway": {
"total": len(gateway_mcp_rows),
"records": gateway_mcp_rows,
},
"legacy": {
**legacy_mcp_summary,
"records": legacy_mcp_rows,
},
},
"execution": {
"automation_operation_log": automation_ops,
"ansible": {
"considered": False,
"records": [],
"not_used_reason": "no first-class Ansible executor audit record in current truth chain",
},
},
"learning": {
"knowledge_entries": km_entries,
},
"timeline_events": timeline_events,
"channel": {
"outbound_messages_visible": len(outbound_rows),
"outbound_messages": outbound_rows,
"visibility_note": (
"If this is zero while Telegram delivered a card, the outbound mirror "
"or RLS project context is not a reliable source of truth yet."
),
},
}
logger.info(
"awooop_truth_chain_fetched",
project_id=project_id,
source_id=source_id,
source_type=source_type,
current_stage=truth_status["current_stage"],
)
return result

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from src.services.awooop_truth_chain_service import _truth_status
def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
status = _truth_status(
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},
approvals=[{"status": "APPROVED", "action": "未知操作 | NO_ACTION"}],
evidence_rows=[{"sensors_attempted": 8, "sensors_succeeded": 0}],
automation_ops=[],
drift=None,
drift_repeat_count=0,
gateway_mcp_total=0,
legacy_mcp_total=8,
outbound_visible_total=0,
)
assert status["current_stage"] == "manual_required"
assert status["stage_status"] == "blocked"
assert status["needs_human"] is True
assert "approval_resolved_no_action_without_execution" in status["blockers"]
assert "all_evidence_sensors_failed" in status["blockers"]
assert "awooop_mcp_gateway_audit_empty" in status["blockers"]
def test_truth_status_marks_repeated_pending_drift_as_human_needed() -> None:
status = _truth_status(
incident=None,
approvals=[],
evidence_rows=[],
automation_ops=[],
drift={
"report_id": "7f858956",
"status": "pending",
"interpretation": {"confidence": 0.0},
},
drift_repeat_count=12,
gateway_mcp_total=0,
legacy_mcp_total=0,
outbound_visible_total=0,
)
assert status["current_stage"] == "dedup_or_repeat_updated"
assert status["stage_status"] == "pending"
assert status["needs_human"] is True
assert "drift_report_pending_without_resolution" in status["blockers"]
assert "drift_ai_confidence_zero" in status["blockers"]

View File

@@ -25,3 +25,13 @@ def test_recent_events_route_is_registered() -> None:
]
assert "/events/recent" in paths
def test_truth_chain_route_is_registered() -> None:
paths = [
route.path
for route in router.routes
if "GET" in getattr(route, "methods", set())
]
assert "/truth-chain/{source_id}" in paths

View File

@@ -1,3 +1,24 @@
## 2026-05-12 | Truth-chain T0 read-only API 第一版
**背景**:完成 Telegram / AwoooP truth-chain live audit 後,下一步先做不改 runtime 的 T0 查詢端點,避免再只靠 Telegram 文案或人工 SQL 判斷流程卡點。
**本次實作**
- 新增 `GET /api/v1/platform/truth-chain/{source_id}`,沿用 AwoooP Operator Console auth。
- 新增 `apps/api/src/services/awooop_truth_chain_service.py`read-only 聚合 incident、drift、approval、evidence、legacy MCP、AwoooP MCP Gateway、automation_operation_log、KM、timeline、outbound mirror。
- 對 B6C589 這類狀態矛盾,`truth_status` 會回 `manual_required` / `blocked` 並列出 blockers例如 evidence sensors 全失敗、NO_ACTION 無 execution、AwoooP MCP Gateway audit 為空。
- 對 Config Drift 這類重複 pending`truth_status` 會回 `dedup_or_repeat_updated` / `pending`,並帶 12h repeat state。
- Ansible 目前先明確回 `not_used_reason`,避免誤以為 AI 已把 Ansible 納入 first-class executor。
**驗證**
- `python -m py_compile apps/api/src/services/awooop_truth_chain_service.py apps/api/src/api/v1/platform/truth_chain.py` 通過。
- `DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' python -m pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_platform_router_order.py apps/api/tests/test_awooop_operator_auth.py`10 passed。
**整體進度**
- Wave 0完成並已推版。
- Wave 1RLS/通知治理到 Wave1.3 完成並已推版outbound app-role 可見性列為新紅燈。
- Truth-chain T0live audit、MASTER 收斂、read-only API 第一版完成;待推版與 production smoke。
- 下一步:推 Gitea main等部署後用 `INC-20260512-B6C589``7f858956` 打 live endpoint smoke再進 T1 Channel Event hardening。
## 2026-05-12 | Telegram / AwoooP AI 自動化真相鏈 live audit
**背景**:統帥貼出 Telegram 低風險與 Config Drift 卡片,指出目前無法從訊息判斷是否重複、跑到哪個流程、是否真的 AI 自動修復、是否需要人工,以及 MCP / 自建 MCP / Ansible / Sentry / SignOz 是否實際參與。

View File

@@ -366,6 +366,8 @@ source_event_received
| T4 Drift fingerprint FSM | Drift 不再每小時新增不可辨識卡;同 fingerprint 更新 repeat_count / status / PR / adopt / rollback / ignore | `awoooi-prod HIGH=1 MEDIUM=30` 同組 12h 只更新同一 truth chain |
| T5 Incident status reconciliation | approval、incident、evidence、execution、timeline 狀態一致NO_ACTION 必須是明確 manual_required 或 resolved(noop) | B6C589 類事件不得再出現 incident investigating + approval approved/resolved + no execution/verification 的矛盾 |
**T0 first implementation2026-05-12 22:50 台北)**:新增 read-only `GET /api/v1/platform/truth-chain/{source_id}`,由 Operator Console auth 保護,聚合 incident / drift / approval / evidence / legacy MCP / AwoooP MCP Gateway / automation_operation_log / KM / timeline / outbound mirror。此 endpoint 只揭露現況與缺口,不改任何 incident、approval、execution 或 Telegram state。
**當前紅線**:在 T0-T2 未完成前,任何「中低風險告警已有 AI 自動修復」都只能逐案查證不能全域宣稱。Telegram 卡片必須誠實顯示 degraded / failed / pending_human而不是只顯示 AI 研判摘要。
---
@@ -1818,6 +1820,24 @@ Phase 6 完成後
---
### 2026-05-12 晚 (台北) — T0 truth-chain read API — 第一版查詢端點落地
**範圍**
- 新增 `services/awooop_truth_chain_service.py`read-only 聚合 incident / drift / approval / evidence / legacy MCP / AwoooP MCP Gateway / automation_operation_log / KM / timeline / outbound mirror。
- 新增 `api/v1/platform/truth_chain.py``GET /api/v1/platform/truth-chain/{source_id}`,沿用 Operator Console auth。
- `api/v1/platform/__init__.py` 掛載 router`test_platform_router_order.py` 補 route 註冊檢查;新增 `_truth_status` 單元測試。
**設計邊界**
- 只讀,不修改 incident、approval、execution、Telegram 或 drift 狀態。
- 若 outbound mirror 在 RLS context 下不可見API 會明確回傳 visibility note不假裝資料存在。
- Ansible 在 truth-chain 中先標示為 `not_used_reason`,直到 T3 declarative executor 有 first-class audit record。
**驗證**
- `python -m py_compile apps/api/src/services/awooop_truth_chain_service.py apps/api/src/api/v1/platform/truth_chain.py` 通過。
- `DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' python -m pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_platform_router_order.py apps/api/tests/test_awooop_operator_auth.py`10 passed。
---
### 2026-04-20 晚 (台北) — C1-C4 全流程串接 — Playbook 鏈路保護commit de2d34d
**觸發**:統帥全景盤查 AI 自動化節點後,發現 Playbook 自動修復鏈路有 3 個結構性斷點。