fix(awooop): mark inbound-only truth chains received
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m19s
CD Pipeline / build-and-deploy (push) Successful in 3m24s
CD Pipeline / post-deploy-checks (push) Successful in 1m25s

This commit is contained in:
Your Name
2026-05-13 21:38:43 +08:00
parent 365d93f07e
commit a524e468e4
2 changed files with 26 additions and 0 deletions

View File

@@ -305,6 +305,7 @@ def _truth_status(
gateway_mcp_total: int,
legacy_mcp_total: int,
outbound_visible_total: int,
inbound_visible_total: int = 0,
auto_repair_executions: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Derive the current operator-visible truth-chain stage."""
@@ -324,6 +325,10 @@ def _truth_status(
if confidence in (0, 0.0):
blockers.append("drift_ai_confidence_zero")
if incident is None and drift is None and inbound_visible_total > 0:
stage = "inbound_received"
stage_status = "observed"
if incident is not None:
incident_status = str(incident.get("status") or "unknown")
repair_rows = auto_repair_executions or []
@@ -1284,6 +1289,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
gateway_mcp_total=len(gateway_mcp_rows),
legacy_mcp_total=legacy_mcp_summary["total"],
outbound_visible_total=len(outbound_rows),
inbound_visible_total=len(inbound_rows),
auto_repair_executions=auto_repair_executions,
)
if incident is None and drift is None and not runs and gateway_mcp_rows:

View File

@@ -91,6 +91,26 @@ def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
assert "awooop_mcp_gateway_audit_empty" in status["blockers"]
def test_truth_status_marks_inbound_only_source_as_received() -> None:
status = _truth_status(
incident=None,
approvals=[],
evidence_rows=[],
automation_ops=[],
drift=None,
drift_repeat_count=0,
gateway_mcp_total=0,
legacy_mcp_total=0,
outbound_visible_total=0,
inbound_visible_total=1,
)
assert status["current_stage"] == "inbound_received"
assert status["stage_status"] == "observed"
assert status["needs_human"] is False
assert "awooop_mcp_gateway_audit_empty" in status["blockers"]
def test_truth_status_does_not_treat_no_action_audit_as_execution() -> None:
status = _truth_status(
incident={"incident_id": "INC-1", "status": "RESOLVED"},