feat(awooop): 新增 truth-chain 查詢 API
This commit is contained in:
@@ -13,9 +13,11 @@ from src.api.v1.platform.events import router as events_router
|
||||
from src.api.v1.platform.operator_runs import router as operator_runs_router
|
||||
from src.api.v1.platform.runs import router as runs_router
|
||||
from src.api.v1.platform.tenants import router as tenants_router
|
||||
from src.api.v1.platform.truth_chain import router as truth_chain_router
|
||||
|
||||
router = APIRouter()
|
||||
router.include_router(events_router)
|
||||
router.include_router(truth_chain_router)
|
||||
# 2026-05-06 Codex: FastAPI 依註冊順序比對路由。Operator Console 的
|
||||
# `/runs/list` 必須排在 `/runs/{run_id}` 前面,否則 `list` 會被當成
|
||||
# run_id,造成前端 Run 監控頁 HTTP 422。
|
||||
|
||||
35
apps/api/src/api/v1/platform/truth_chain.py
Normal file
35
apps/api/src/api/v1/platform/truth_chain.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""AwoooP Operator Console — truth-chain read API."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
|
||||
from src.core.awooop_operator_auth import (
|
||||
AwoooPOperatorPrincipal,
|
||||
verify_awooop_operator,
|
||||
)
|
||||
from src.services.awooop_truth_chain_service import fetch_truth_chain
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get(
|
||||
"/truth-chain/{source_id}",
|
||||
summary="查詢 Telegram / Incident / Drift 真相鏈",
|
||||
description=(
|
||||
"T0 read-only endpoint. 聚合 incident、approval、evidence、MCP、"
|
||||
"automation_operation_log、drift repeat state 與 outbound mirror,"
|
||||
"讓 Operator Console 能判斷 Telegram 卡片目前卡在哪個流程節點。"
|
||||
),
|
||||
)
|
||||
async def get_truth_chain(
|
||||
source_id: str,
|
||||
project_id: str = Query("awoooi", description="租戶 ID"),
|
||||
operator: AwoooPOperatorPrincipal = Depends(verify_awooop_operator),
|
||||
) -> dict[str, Any]:
|
||||
# operator dependency intentionally gates this read API even though the
|
||||
# principal is not otherwise needed by the aggregation query.
|
||||
_ = operator
|
||||
return await fetch_truth_chain(source_id=source_id, project_id=project_id)
|
||||
641
apps/api/src/services/awooop_truth_chain_service.py
Normal file
641
apps/api/src/services/awooop_truth_chain_service.py
Normal file
@@ -0,0 +1,641 @@
|
||||
"""AwoooP read-only truth chain aggregation.
|
||||
|
||||
T0 only: this service does not mutate incident, approval, execution, or channel
|
||||
state. It stitches existing durable records into one operator-facing status so
|
||||
Telegram cards can be audited without guessing which subsystem owns the truth.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_MAX_ROWS = 100
|
||||
|
||||
|
||||
def _clean(value: Any) -> Any:
|
||||
"""Convert DB values into JSON-friendly primitives."""
|
||||
if isinstance(value, UUID):
|
||||
return str(value)
|
||||
if isinstance(value, (datetime, date)):
|
||||
return value.isoformat()
|
||||
if isinstance(value, Decimal):
|
||||
return float(value)
|
||||
if isinstance(value, dict):
|
||||
return {str(k): _clean(v) for k, v in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [_clean(v) for v in value]
|
||||
return value
|
||||
|
||||
|
||||
def _clean_row(row: Any) -> dict[str, Any]:
|
||||
return {key: _clean(value) for key, value in dict(row).items()}
|
||||
|
||||
|
||||
async def _fetch_all(db: Any, sql: str, params: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
result = await db.execute(text(sql), params)
|
||||
return [_clean_row(row) for row in result.mappings().all()]
|
||||
|
||||
|
||||
async def _fetch_one(
|
||||
db: Any,
|
||||
sql: str,
|
||||
params: dict[str, Any],
|
||||
) -> dict[str, Any] | None:
|
||||
result = await db.execute(text(sql), params)
|
||||
row = result.mappings().first()
|
||||
return _clean_row(row) if row else None
|
||||
|
||||
|
||||
def _source_type(source_id: str, incident: dict[str, Any] | None, drift: dict[str, Any] | None) -> str:
|
||||
if incident is not None:
|
||||
return "incident"
|
||||
if drift is not None:
|
||||
return "drift_report"
|
||||
try:
|
||||
UUID(source_id)
|
||||
except ValueError:
|
||||
return "unknown"
|
||||
return "run"
|
||||
|
||||
|
||||
def _failed_mcp_tools(evidence_rows: list[dict[str, Any]]) -> list[str]:
|
||||
failed: set[str] = set()
|
||||
for evidence in evidence_rows:
|
||||
mcp_health = evidence.get("mcp_health")
|
||||
if isinstance(mcp_health, dict):
|
||||
failed.update(str(tool) for tool, ok in mcp_health.items() if ok is False)
|
||||
return sorted(failed)
|
||||
|
||||
|
||||
def _approval_ids(approvals: list[dict[str, Any]]) -> list[str]:
|
||||
return [str(row["id"]) for row in approvals if row.get("id")]
|
||||
|
||||
|
||||
def _operation_ids(automation_ops: list[dict[str, Any]]) -> list[str]:
|
||||
return [str(row["op_id"]) for row in automation_ops if row.get("op_id")]
|
||||
|
||||
|
||||
def _truth_status(
|
||||
*,
|
||||
incident: dict[str, Any] | None,
|
||||
approvals: list[dict[str, Any]],
|
||||
evidence_rows: list[dict[str, Any]],
|
||||
automation_ops: list[dict[str, Any]],
|
||||
drift: dict[str, Any] | None,
|
||||
drift_repeat_count: int,
|
||||
gateway_mcp_total: int,
|
||||
legacy_mcp_total: int,
|
||||
outbound_visible_total: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Derive the current operator-visible truth-chain stage."""
|
||||
blockers: list[str] = []
|
||||
needs_human = False
|
||||
stage = "not_found"
|
||||
stage_status = "missing"
|
||||
|
||||
if drift is not None:
|
||||
stage = "dedup_or_repeat_updated" if drift_repeat_count > 1 else "received"
|
||||
stage_status = str(drift.get("status") or "unknown")
|
||||
interpretation = drift.get("interpretation") or {}
|
||||
confidence = interpretation.get("confidence") if isinstance(interpretation, dict) else None
|
||||
if stage_status == "pending":
|
||||
needs_human = True
|
||||
blockers.append("drift_report_pending_without_resolution")
|
||||
if confidence in (0, 0.0):
|
||||
blockers.append("drift_ai_confidence_zero")
|
||||
|
||||
if incident is not None:
|
||||
incident_status = str(incident.get("status") or "unknown")
|
||||
stage = "received"
|
||||
stage_status = incident_status.lower()
|
||||
if incident_status in {"RESOLVED", "CLOSED"}:
|
||||
stage = "resolved"
|
||||
stage_status = "success"
|
||||
elif evidence_rows:
|
||||
attempted = sum(int(row.get("sensors_attempted") or 0) for row in evidence_rows)
|
||||
succeeded = sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows)
|
||||
if attempted > 0 and succeeded == 0:
|
||||
stage = "evidence_degraded"
|
||||
stage_status = "warning"
|
||||
needs_human = True
|
||||
blockers.append("all_evidence_sensors_failed")
|
||||
else:
|
||||
stage = "evidence_collected"
|
||||
stage_status = "completed"
|
||||
|
||||
approval_statuses = {str(row.get("status") or "").upper() for row in approvals}
|
||||
approval_actions = " ".join(str(row.get("action") or "") for row in approvals).upper()
|
||||
if any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses):
|
||||
stage = "approval_required"
|
||||
stage_status = "waiting"
|
||||
needs_human = True
|
||||
elif "APPROVED" in approval_statuses and not automation_ops:
|
||||
if "NO_ACTION" in approval_actions:
|
||||
stage = "manual_required"
|
||||
stage_status = "blocked"
|
||||
needs_human = True
|
||||
blockers.append("approval_resolved_no_action_without_execution")
|
||||
else:
|
||||
stage = "execution_missing"
|
||||
stage_status = "blocked"
|
||||
needs_human = True
|
||||
blockers.append("approved_without_execution_record")
|
||||
|
||||
op_statuses = {str(row.get("status") or "").lower() for row in automation_ops}
|
||||
if op_statuses:
|
||||
if op_statuses & {"success", "completed"}:
|
||||
stage = "execution_succeeded"
|
||||
stage_status = "success"
|
||||
elif op_statuses & {"failed", "error"}:
|
||||
stage = "execution_failed"
|
||||
stage_status = "error"
|
||||
needs_human = True
|
||||
else:
|
||||
stage = "execution_started"
|
||||
stage_status = "running"
|
||||
|
||||
if incident_status == "INVESTIGATING" and automation_ops == [] and approvals:
|
||||
blockers.append("incident_still_investigating_after_approval")
|
||||
|
||||
if gateway_mcp_total == 0:
|
||||
blockers.append("awooop_mcp_gateway_audit_empty")
|
||||
if legacy_mcp_total == 0 and incident is not None:
|
||||
blockers.append("legacy_mcp_audit_missing")
|
||||
if outbound_visible_total == 0:
|
||||
blockers.append("outbound_mirror_not_visible_for_source")
|
||||
|
||||
return {
|
||||
"current_stage": stage,
|
||||
"stage_status": stage_status,
|
||||
"needs_human": needs_human,
|
||||
"blockers": blockers,
|
||||
}
|
||||
|
||||
|
||||
def _summarize_mcp(rows: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
by_tool: dict[str, dict[str, Any]] = {}
|
||||
success_count = 0
|
||||
failure_count = 0
|
||||
for row in rows:
|
||||
key = f"{row.get('mcp_server') or 'unknown'}:{row.get('tool_name') or 'unknown'}"
|
||||
item = by_tool.setdefault(
|
||||
key,
|
||||
{
|
||||
"mcp_server": row.get("mcp_server"),
|
||||
"tool_name": row.get("tool_name"),
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"last_error": None,
|
||||
},
|
||||
)
|
||||
if row.get("success") is True:
|
||||
item["success"] += 1
|
||||
success_count += 1
|
||||
else:
|
||||
item["failed"] += 1
|
||||
failure_count += 1
|
||||
item["last_error"] = row.get("error_message")
|
||||
return {
|
||||
"total": len(rows),
|
||||
"success": success_count,
|
||||
"failed": failure_count,
|
||||
"by_tool": list(by_tool.values()),
|
||||
}
|
||||
|
||||
|
||||
async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[str, Any]:
|
||||
"""Return a read-only truth chain for an incident, drift report, or run id."""
|
||||
async with get_db_context(project_id) as db:
|
||||
incident = await _fetch_one(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
incident_id,
|
||||
project_id,
|
||||
status::text AS status,
|
||||
severity::text AS severity,
|
||||
alertname,
|
||||
alert_category,
|
||||
notification_type,
|
||||
created_at,
|
||||
updated_at,
|
||||
resolved_at,
|
||||
verification_result,
|
||||
frequency_snapshot,
|
||||
decision_chain
|
||||
FROM incidents
|
||||
WHERE incident_id = :source_id
|
||||
AND (project_id = :project_id OR project_id IS NULL)
|
||||
""",
|
||||
{"source_id": source_id, "project_id": project_id},
|
||||
)
|
||||
|
||||
drift = await _fetch_one(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
report_id,
|
||||
namespace,
|
||||
status,
|
||||
triggered_by,
|
||||
high_count,
|
||||
medium_count,
|
||||
info_count,
|
||||
scanned_at,
|
||||
created_at,
|
||||
resolved_at,
|
||||
interpretation,
|
||||
narrative_text
|
||||
FROM drift_reports
|
||||
WHERE report_id = :source_id
|
||||
""",
|
||||
{"source_id": source_id},
|
||||
)
|
||||
|
||||
runs = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
run_id,
|
||||
project_id,
|
||||
agent_id,
|
||||
state,
|
||||
trigger_type,
|
||||
trigger_ref,
|
||||
is_shadow,
|
||||
step_count,
|
||||
created_at,
|
||||
started_at,
|
||||
completed_at,
|
||||
error_code,
|
||||
error_detail
|
||||
FROM awooop_run_state
|
||||
WHERE project_id = :project_id
|
||||
AND (run_id::text = :source_id OR trigger_ref = :source_id)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
|
||||
approvals: list[dict[str, Any]] = []
|
||||
evidence_rows: list[dict[str, Any]] = []
|
||||
timeline_events: list[dict[str, Any]] = []
|
||||
legacy_mcp_rows: list[dict[str, Any]] = []
|
||||
automation_ops: list[dict[str, Any]] = []
|
||||
km_entries: list[dict[str, Any]] = []
|
||||
|
||||
if incident is not None:
|
||||
incident_id = str(incident["incident_id"])
|
||||
approvals = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
id,
|
||||
incident_id,
|
||||
status::text AS status,
|
||||
risk_level::text AS risk_level,
|
||||
action,
|
||||
description,
|
||||
hit_count,
|
||||
created_at,
|
||||
updated_at,
|
||||
resolved_at,
|
||||
matched_playbook_id,
|
||||
extra_metadata,
|
||||
decision_fusion_details
|
||||
FROM approval_records
|
||||
WHERE incident_id = :incident_id
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"incident_id": incident_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
approval_ids = _approval_ids(approvals)
|
||||
evidence_rows = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
id,
|
||||
incident_id,
|
||||
matched_playbook_id,
|
||||
collected_at,
|
||||
collection_duration_ms,
|
||||
sensors_attempted,
|
||||
sensors_succeeded,
|
||||
evidence_summary,
|
||||
metrics_snapshot,
|
||||
mcp_health,
|
||||
verification_result,
|
||||
pre_execution_state,
|
||||
post_execution_state,
|
||||
self_healing_score,
|
||||
self_healing_detail
|
||||
FROM incident_evidence
|
||||
WHERE incident_id = :incident_id
|
||||
ORDER BY collected_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"incident_id": incident_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
timeline_events = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
id,
|
||||
event_type,
|
||||
status,
|
||||
title,
|
||||
description,
|
||||
actor,
|
||||
actor_role,
|
||||
risk_level,
|
||||
approval_id,
|
||||
incident_id,
|
||||
created_at
|
||||
FROM timeline_events
|
||||
WHERE incident_id = :incident_id
|
||||
OR approval_id = ANY(:approval_ids)
|
||||
ORDER BY created_at ASC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{
|
||||
"incident_id": incident_id,
|
||||
"approval_ids": approval_ids or ["__none__"],
|
||||
"limit": _MAX_ROWS,
|
||||
},
|
||||
)
|
||||
legacy_mcp_rows = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
id,
|
||||
session_id,
|
||||
flywheel_node,
|
||||
mcp_server,
|
||||
tool_name,
|
||||
duration_ms,
|
||||
success,
|
||||
error_message,
|
||||
incident_id,
|
||||
agent_role,
|
||||
created_at
|
||||
FROM mcp_audit_log
|
||||
WHERE incident_id = :incident_id
|
||||
ORDER BY created_at ASC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"incident_id": incident_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
automation_ops = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
op_id,
|
||||
operation_type,
|
||||
status,
|
||||
incident_id,
|
||||
run_id,
|
||||
actor,
|
||||
dry_run_result,
|
||||
error,
|
||||
duration_ms,
|
||||
tags,
|
||||
created_at
|
||||
FROM automation_operation_log
|
||||
WHERE coalesce(input::text, '') LIKE :needle
|
||||
OR coalesce(output::text, '') LIKE :needle
|
||||
OR coalesce(array_to_string(tags, ','), '') LIKE :needle
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"needle": f"%{incident_id}%", "limit": _MAX_ROWS},
|
||||
)
|
||||
km_entries = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
id,
|
||||
title,
|
||||
entry_type::text AS entry_type,
|
||||
status::text AS status,
|
||||
related_incident_id,
|
||||
related_approval_id,
|
||||
created_at
|
||||
FROM knowledge_entries
|
||||
WHERE related_incident_id = :incident_id
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"incident_id": incident_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
|
||||
drift_repeats: dict[str, Any] = {
|
||||
"occurrences_12h": 0,
|
||||
"first_scanned_at": None,
|
||||
"last_scanned_at": None,
|
||||
"reports": [],
|
||||
}
|
||||
if drift is not None:
|
||||
repeat_summary = await _fetch_one(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
count(*) AS occurrences_12h,
|
||||
min(scanned_at) AS first_scanned_at,
|
||||
max(scanned_at) AS last_scanned_at
|
||||
FROM drift_reports
|
||||
WHERE created_at > now() - interval '12 hours'
|
||||
AND namespace = :namespace
|
||||
AND status = :status
|
||||
AND high_count = :high_count
|
||||
AND medium_count = :medium_count
|
||||
AND info_count = :info_count
|
||||
""",
|
||||
{
|
||||
"namespace": drift["namespace"],
|
||||
"status": drift["status"],
|
||||
"high_count": drift["high_count"],
|
||||
"medium_count": drift["medium_count"],
|
||||
"info_count": drift["info_count"],
|
||||
},
|
||||
)
|
||||
repeat_reports = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT report_id, scanned_at, created_at, status, interpretation, narrative_text
|
||||
FROM drift_reports
|
||||
WHERE created_at > now() - interval '12 hours'
|
||||
AND namespace = :namespace
|
||||
AND status = :status
|
||||
AND high_count = :high_count
|
||||
AND medium_count = :medium_count
|
||||
AND info_count = :info_count
|
||||
ORDER BY scanned_at DESC
|
||||
LIMIT 20
|
||||
""",
|
||||
{
|
||||
"namespace": drift["namespace"],
|
||||
"status": drift["status"],
|
||||
"high_count": drift["high_count"],
|
||||
"medium_count": drift["medium_count"],
|
||||
"info_count": drift["info_count"],
|
||||
},
|
||||
)
|
||||
drift_repeats = {
|
||||
**(repeat_summary or {}),
|
||||
"reports": repeat_reports,
|
||||
}
|
||||
|
||||
gateway_mcp_rows = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
call_id,
|
||||
project_id,
|
||||
run_id,
|
||||
trace_id,
|
||||
agent_id,
|
||||
tool_name,
|
||||
result_status,
|
||||
block_gate,
|
||||
block_reason,
|
||||
latency_ms,
|
||||
created_at
|
||||
FROM awooop_mcp_gateway_audit
|
||||
WHERE project_id = :project_id
|
||||
AND (
|
||||
trace_id = :source_id
|
||||
OR run_id::text = :source_id
|
||||
)
|
||||
ORDER BY created_at ASC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS},
|
||||
)
|
||||
outbound_rows = await _fetch_all(
|
||||
db,
|
||||
"""
|
||||
SELECT
|
||||
message_id,
|
||||
project_id,
|
||||
run_id,
|
||||
channel_type,
|
||||
message_type,
|
||||
content_hash,
|
||||
content_preview,
|
||||
provider_message_id,
|
||||
send_status,
|
||||
queued_at,
|
||||
sent_at,
|
||||
triggered_by_state
|
||||
FROM awooop_outbound_message
|
||||
WHERE project_id = :project_id
|
||||
AND (
|
||||
run_id::text = :source_id
|
||||
OR content_preview ILIKE :needle
|
||||
)
|
||||
ORDER BY queued_at DESC
|
||||
LIMIT :limit
|
||||
""",
|
||||
{
|
||||
"source_id": source_id,
|
||||
"project_id": project_id,
|
||||
"needle": f"%{source_id}%",
|
||||
"limit": _MAX_ROWS,
|
||||
},
|
||||
)
|
||||
|
||||
source_type = _source_type(source_id, incident, drift)
|
||||
legacy_mcp_summary = _summarize_mcp(legacy_mcp_rows)
|
||||
truth_status = _truth_status(
|
||||
incident=incident,
|
||||
approvals=approvals,
|
||||
evidence_rows=evidence_rows,
|
||||
automation_ops=automation_ops,
|
||||
drift=drift,
|
||||
drift_repeat_count=int(drift_repeats.get("occurrences_12h") or 0),
|
||||
gateway_mcp_total=len(gateway_mcp_rows),
|
||||
legacy_mcp_total=legacy_mcp_summary["total"],
|
||||
outbound_visible_total=len(outbound_rows),
|
||||
)
|
||||
|
||||
evidence_totals = {
|
||||
"records": len(evidence_rows),
|
||||
"sensors_attempted": sum(int(row.get("sensors_attempted") or 0) for row in evidence_rows),
|
||||
"sensors_succeeded": sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows),
|
||||
"failed_tools": _failed_mcp_tools(evidence_rows),
|
||||
}
|
||||
|
||||
result = {
|
||||
"project_id": project_id,
|
||||
"source_id": source_id,
|
||||
"source_type": source_type,
|
||||
"found": incident is not None or drift is not None or bool(runs),
|
||||
"truth_status": truth_status,
|
||||
"linked_ids": {
|
||||
"incident_id": incident.get("incident_id") if incident else None,
|
||||
"approval_ids": _approval_ids(approvals),
|
||||
"run_ids": [row["run_id"] for row in runs],
|
||||
"drift_report_id": drift.get("report_id") if drift else None,
|
||||
"operation_ids": _operation_ids(automation_ops),
|
||||
},
|
||||
"incident": incident,
|
||||
"drift": {
|
||||
"report": drift,
|
||||
"repeat_state": drift_repeats,
|
||||
},
|
||||
"approvals": approvals,
|
||||
"evidence": {
|
||||
"summary": evidence_totals,
|
||||
"records": evidence_rows,
|
||||
},
|
||||
"mcp": {
|
||||
"awooop_gateway": {
|
||||
"total": len(gateway_mcp_rows),
|
||||
"records": gateway_mcp_rows,
|
||||
},
|
||||
"legacy": {
|
||||
**legacy_mcp_summary,
|
||||
"records": legacy_mcp_rows,
|
||||
},
|
||||
},
|
||||
"execution": {
|
||||
"automation_operation_log": automation_ops,
|
||||
"ansible": {
|
||||
"considered": False,
|
||||
"records": [],
|
||||
"not_used_reason": "no first-class Ansible executor audit record in current truth chain",
|
||||
},
|
||||
},
|
||||
"learning": {
|
||||
"knowledge_entries": km_entries,
|
||||
},
|
||||
"timeline_events": timeline_events,
|
||||
"channel": {
|
||||
"outbound_messages_visible": len(outbound_rows),
|
||||
"outbound_messages": outbound_rows,
|
||||
"visibility_note": (
|
||||
"If this is zero while Telegram delivered a card, the outbound mirror "
|
||||
"or RLS project context is not a reliable source of truth yet."
|
||||
),
|
||||
},
|
||||
}
|
||||
logger.info(
|
||||
"awooop_truth_chain_fetched",
|
||||
project_id=project_id,
|
||||
source_id=source_id,
|
||||
source_type=source_type,
|
||||
current_stage=truth_status["current_stage"],
|
||||
)
|
||||
return result
|
||||
48
apps/api/tests/test_awooop_truth_chain_service.py
Normal file
48
apps/api/tests/test_awooop_truth_chain_service.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from src.services.awooop_truth_chain_service import _truth_status
|
||||
|
||||
|
||||
def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
|
||||
status = _truth_status(
|
||||
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},
|
||||
approvals=[{"status": "APPROVED", "action": "未知操作 | NO_ACTION"}],
|
||||
evidence_rows=[{"sensors_attempted": 8, "sensors_succeeded": 0}],
|
||||
automation_ops=[],
|
||||
drift=None,
|
||||
drift_repeat_count=0,
|
||||
gateway_mcp_total=0,
|
||||
legacy_mcp_total=8,
|
||||
outbound_visible_total=0,
|
||||
)
|
||||
|
||||
assert status["current_stage"] == "manual_required"
|
||||
assert status["stage_status"] == "blocked"
|
||||
assert status["needs_human"] is True
|
||||
assert "approval_resolved_no_action_without_execution" in status["blockers"]
|
||||
assert "all_evidence_sensors_failed" in status["blockers"]
|
||||
assert "awooop_mcp_gateway_audit_empty" in status["blockers"]
|
||||
|
||||
|
||||
def test_truth_status_marks_repeated_pending_drift_as_human_needed() -> None:
|
||||
status = _truth_status(
|
||||
incident=None,
|
||||
approvals=[],
|
||||
evidence_rows=[],
|
||||
automation_ops=[],
|
||||
drift={
|
||||
"report_id": "7f858956",
|
||||
"status": "pending",
|
||||
"interpretation": {"confidence": 0.0},
|
||||
},
|
||||
drift_repeat_count=12,
|
||||
gateway_mcp_total=0,
|
||||
legacy_mcp_total=0,
|
||||
outbound_visible_total=0,
|
||||
)
|
||||
|
||||
assert status["current_stage"] == "dedup_or_repeat_updated"
|
||||
assert status["stage_status"] == "pending"
|
||||
assert status["needs_human"] is True
|
||||
assert "drift_report_pending_without_resolution" in status["blockers"]
|
||||
assert "drift_ai_confidence_zero" in status["blockers"]
|
||||
@@ -25,3 +25,13 @@ def test_recent_events_route_is_registered() -> None:
|
||||
]
|
||||
|
||||
assert "/events/recent" in paths
|
||||
|
||||
|
||||
def test_truth_chain_route_is_registered() -> None:
|
||||
paths = [
|
||||
route.path
|
||||
for route in router.routes
|
||||
if "GET" in getattr(route, "methods", set())
|
||||
]
|
||||
|
||||
assert "/truth-chain/{source_id}" in paths
|
||||
|
||||
@@ -1,3 +1,24 @@
|
||||
## 2026-05-12 | Truth-chain T0 read-only API 第一版
|
||||
|
||||
**背景**:完成 Telegram / AwoooP truth-chain live audit 後,下一步先做不改 runtime 的 T0 查詢端點,避免再只靠 Telegram 文案或人工 SQL 判斷流程卡點。
|
||||
|
||||
**本次實作**:
|
||||
- 新增 `GET /api/v1/platform/truth-chain/{source_id}`,沿用 AwoooP Operator Console auth。
|
||||
- 新增 `apps/api/src/services/awooop_truth_chain_service.py`,read-only 聚合 incident、drift、approval、evidence、legacy MCP、AwoooP MCP Gateway、automation_operation_log、KM、timeline、outbound mirror。
|
||||
- 對 B6C589 這類狀態矛盾,`truth_status` 會回 `manual_required` / `blocked` 並列出 blockers,例如 evidence sensors 全失敗、NO_ACTION 無 execution、AwoooP MCP Gateway audit 為空。
|
||||
- 對 Config Drift 這類重複 pending,`truth_status` 會回 `dedup_or_repeat_updated` / `pending`,並帶 12h repeat state。
|
||||
- Ansible 目前先明確回 `not_used_reason`,避免誤以為 AI 已把 Ansible 納入 first-class executor。
|
||||
|
||||
**驗證**:
|
||||
- `python -m py_compile apps/api/src/services/awooop_truth_chain_service.py apps/api/src/api/v1/platform/truth_chain.py` 通過。
|
||||
- `DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' python -m pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_platform_router_order.py apps/api/tests/test_awooop_operator_auth.py`:10 passed。
|
||||
|
||||
**整體進度**:
|
||||
- Wave 0:完成並已推版。
|
||||
- Wave 1:RLS/通知治理到 Wave1.3 完成並已推版;outbound app-role 可見性列為新紅燈。
|
||||
- Truth-chain T0:live audit、MASTER 收斂、read-only API 第一版完成;待推版與 production smoke。
|
||||
- 下一步:推 Gitea main,等部署後用 `INC-20260512-B6C589` 與 `7f858956` 打 live endpoint smoke;再進 T1 Channel Event hardening。
|
||||
|
||||
## 2026-05-12 | Telegram / AwoooP AI 自動化真相鏈 live audit
|
||||
|
||||
**背景**:統帥貼出 Telegram 低風險與 Config Drift 卡片,指出目前無法從訊息判斷是否重複、跑到哪個流程、是否真的 AI 自動修復、是否需要人工,以及 MCP / 自建 MCP / Ansible / Sentry / SignOz 是否實際參與。
|
||||
|
||||
@@ -366,6 +366,8 @@ source_event_received
|
||||
| T4 Drift fingerprint FSM | Drift 不再每小時新增不可辨識卡;同 fingerprint 更新 repeat_count / status / PR / adopt / rollback / ignore | `awoooi-prod HIGH=1 MEDIUM=30` 同組 12h 只更新同一 truth chain |
|
||||
| T5 Incident status reconciliation | approval、incident、evidence、execution、timeline 狀態一致;NO_ACTION 必須是明確 manual_required 或 resolved(noop) | B6C589 類事件不得再出現 incident investigating + approval approved/resolved + no execution/verification 的矛盾 |
|
||||
|
||||
**T0 first implementation(2026-05-12 22:50 台北)**:新增 read-only `GET /api/v1/platform/truth-chain/{source_id}`,由 Operator Console auth 保護,聚合 incident / drift / approval / evidence / legacy MCP / AwoooP MCP Gateway / automation_operation_log / KM / timeline / outbound mirror。此 endpoint 只揭露現況與缺口,不改任何 incident、approval、execution 或 Telegram state。
|
||||
|
||||
**當前紅線**:在 T0-T2 未完成前,任何「中低風險告警已有 AI 自動修復」都只能逐案查證,不能全域宣稱。Telegram 卡片必須誠實顯示 degraded / failed / pending_human,而不是只顯示 AI 研判摘要。
|
||||
|
||||
---
|
||||
@@ -1818,6 +1820,24 @@ Phase 6 完成後
|
||||
|
||||
---
|
||||
|
||||
### 2026-05-12 晚 (台北) — T0 truth-chain read API — 第一版查詢端點落地
|
||||
|
||||
**範圍**:
|
||||
- 新增 `services/awooop_truth_chain_service.py`:read-only 聚合 incident / drift / approval / evidence / legacy MCP / AwoooP MCP Gateway / automation_operation_log / KM / timeline / outbound mirror。
|
||||
- 新增 `api/v1/platform/truth_chain.py`:`GET /api/v1/platform/truth-chain/{source_id}`,沿用 Operator Console auth。
|
||||
- `api/v1/platform/__init__.py` 掛載 router;`test_platform_router_order.py` 補 route 註冊檢查;新增 `_truth_status` 單元測試。
|
||||
|
||||
**設計邊界**:
|
||||
- 只讀,不修改 incident、approval、execution、Telegram 或 drift 狀態。
|
||||
- 若 outbound mirror 在 RLS context 下不可見,API 會明確回傳 visibility note,不假裝資料存在。
|
||||
- Ansible 在 truth-chain 中先標示為 `not_used_reason`,直到 T3 declarative executor 有 first-class audit record。
|
||||
|
||||
**驗證**:
|
||||
- `python -m py_compile apps/api/src/services/awooop_truth_chain_service.py apps/api/src/api/v1/platform/truth_chain.py` 通過。
|
||||
- `DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' python -m pytest apps/api/tests/test_awooop_truth_chain_service.py apps/api/tests/test_platform_router_order.py apps/api/tests/test_awooop_operator_auth.py`:10 passed。
|
||||
|
||||
---
|
||||
|
||||
### 2026-04-20 晚 (台北) — C1-C4 全流程串接 — Playbook 鏈路保護(commit de2d34d)
|
||||
|
||||
**觸發**:統帥全景盤查 AI 自動化節點後,發現 Playbook 自動修復鏈路有 3 個結構性斷點。
|
||||
|
||||
Reference in New Issue
Block a user