fix(awooop): classify no action audits correctly
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m11s
CD Pipeline / build-and-deploy (push) Successful in 3m39s
CD Pipeline / post-deploy-checks (push) Successful in 1m37s

This commit is contained in:
Your Name
2026-05-13 17:12:44 +08:00
parent c1e2567b15
commit 22beddc8a8
2 changed files with 110 additions and 8 deletions

View File

@@ -101,6 +101,40 @@ def _auto_repair_ids(auto_repair_executions: list[dict[str, Any]]) -> list[str]:
return [str(row["id"]) for row in auto_repair_executions if row.get("id")]
def _looks_like_no_action(value: Any) -> bool:
text = str(value or "").upper()
return (
"NO_ACTION" in text
or "NO-ACTION" in text
or "NOACTION" in text
or text.startswith("OBSERVE")
or text.startswith("INVESTIGATE")
)
def _approval_has_no_action(approvals: list[dict[str, Any]]) -> bool:
return any(_looks_like_no_action(row.get("action")) for row in approvals)
def _is_no_action_operation(row: dict[str, Any]) -> bool:
"""Return true for durable audit rows that represent observation, not repair."""
if str(row.get("operation_type") or "") != "playbook_executed":
return False
return any(
_looks_like_no_action(row.get(key))
for key in (
"input_action",
"output_action",
"output_reason",
"output_not_used_reason",
)
)
def _effective_execution_ops(automation_ops: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [row for row in automation_ops if not _is_no_action_operation(row)]
def build_incident_reconciliation(
*,
incident: dict[str, Any] | None,
@@ -255,7 +289,8 @@ def _truth_status(
if incident is not None:
incident_status = str(incident.get("status") or "unknown")
repair_rows = auto_repair_executions or []
has_execution_records = bool(automation_ops or repair_rows)
effective_ops = _effective_execution_ops(automation_ops)
has_execution_records = bool(effective_ops or repair_rows)
stage = "received"
stage_status = incident_status.lower()
if incident_status in {"RESOLVED", "CLOSED"}:
@@ -275,12 +310,13 @@ def _truth_status(
approval_statuses = {str(row.get("status") or "").upper() for row in approvals}
approval_actions = " ".join(str(row.get("action") or "") for row in approvals).upper()
approval_no_action = _approval_has_no_action(approvals)
if any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses):
stage = "approval_required"
stage_status = "waiting"
needs_human = True
elif "APPROVED" in approval_statuses and not has_execution_records:
if "NO_ACTION" in approval_actions:
if approval_no_action or "NO_ACTION" in approval_actions:
stage = "manual_required"
stage_status = "blocked"
needs_human = True
@@ -291,7 +327,7 @@ def _truth_status(
needs_human = True
blockers.append("approved_without_execution_record")
op_statuses = {str(row.get("status") or "").lower() for row in automation_ops}
op_statuses = {str(row.get("status") or "").lower() for row in effective_ops}
repair_successes = {row.get("success") for row in repair_rows}
if op_statuses or repair_successes:
if (op_statuses & {"success", "completed"}) or True in repair_successes:
@@ -372,12 +408,15 @@ def build_automation_quality(
evidence_succeeded = sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows)
gateway_total = int(gateway_mcp_summary.get("total") or 0)
legacy_total = int(legacy_mcp_summary.get("total") or 0)
automation_statuses = {str(row.get("status") or "").lower() for row in automation_ops}
effective_ops = _effective_execution_ops(automation_ops)
noop_ops = [row for row in automation_ops if _is_no_action_operation(row)]
automation_statuses = {str(row.get("status") or "").lower() for row in effective_ops}
auto_repair_successes = {row.get("success") for row in auto_repair_executions}
has_execution = bool(automation_ops or auto_repair_executions)
has_execution = bool(effective_ops or auto_repair_executions)
verification_result = _latest_verification_result(incident, evidence_rows)
approval_statuses = {str(row.get("status") or "").upper() for row in approvals}
approval_actions = " ".join(str(row.get("action") or "") for row in approvals).upper()
approval_no_action = _approval_has_no_action(approvals)
gate("source_persisted", "passed", str(incident.get("incident_id")))
gate("outbound_recorded", "passed" if outbound_rows else "missing", str(len(outbound_rows)))
@@ -399,14 +438,14 @@ def build_automation_quality(
if any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses):
gate("approval_state", "warning", "waiting_approval")
elif "APPROVED" in approval_statuses and "NO_ACTION" in approval_actions and not has_execution:
elif "APPROVED" in approval_statuses and (approval_no_action or "NO_ACTION" in approval_actions) and not has_execution:
gate("approval_state", "failed", "approved_no_action_without_execution")
elif approvals:
gate("approval_state", "passed", ",".join(sorted(approval_statuses)))
else:
gate("approval_state", "not_applicable", "no approval")
gate("execution_recorded", "passed" if has_execution else "missing", str(len(automation_ops) + len(auto_repair_executions)))
gate("execution_recorded", "passed" if has_execution else "missing", str(len(effective_ops) + len(auto_repair_executions)))
gate("auto_repair_recorded", "passed" if auto_repair_executions else "missing", str(len(auto_repair_executions)))
if not has_execution:
@@ -433,7 +472,7 @@ def build_automation_quality(
verdict = "execution_failed"
elif has_execution:
verdict = "execution_unverified"
elif "APPROVED" in approval_statuses and "NO_ACTION" in approval_actions:
elif "APPROVED" in approval_statuses and (approval_no_action or "NO_ACTION" in approval_actions):
verdict = "manual_required_no_action"
elif any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses):
verdict = "approval_required"
@@ -479,6 +518,8 @@ def build_automation_quality(
"legacy_mcp_total": legacy_total,
"approvals": len(approvals),
"automation_operation_records": len(automation_ops),
"effective_execution_records": len(effective_ops),
"noop_operation_records": len(noop_ops),
"auto_repair_execution_records": len(auto_repair_executions),
"verification_result": verification_result,
"knowledge_entries": len(km_entries),
@@ -960,6 +1001,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
error,
duration_ms,
tags,
input ->> 'action' AS input_action,
input ->> 'executor' AS input_executor,
input ->> 'execution_backend' AS input_execution_backend,
input ->> 'playbook_id' AS input_playbook_id,
@@ -967,6 +1009,8 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
input ->> 'ansible_playbook_path' AS input_ansible_playbook_path,
input ->> 'check_mode' AS input_check_mode,
input ->> 'not_used_reason' AS input_not_used_reason,
output ->> 'action' AS output_action,
output ->> 'reason' AS output_reason,
output ->> 'executor' AS output_executor,
output ->> 'execution_backend' AS output_execution_backend,
output ->> 'playbook_id' AS output_playbook_id,

View File

@@ -58,6 +58,33 @@ def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
assert "awooop_mcp_gateway_audit_empty" in status["blockers"]
def test_truth_status_does_not_treat_no_action_audit_as_execution() -> None:
status = _truth_status(
incident={"incident_id": "INC-1", "status": "RESOLVED"},
approvals=[{"status": "APPROVED", "action": "未知操作 | NO_ACTION"}],
evidence_rows=[{"sensors_attempted": 8, "sensors_succeeded": 6}],
automation_ops=[
{
"operation_type": "playbook_executed",
"status": "success",
"actor": "approval_execution",
"output_reason": "NO_ACTION",
"output_action": "未知操作 | NO_ACTION",
},
],
drift=None,
drift_repeat_count=0,
gateway_mcp_total=8,
legacy_mcp_total=8,
outbound_visible_total=1,
)
assert status["current_stage"] == "manual_required"
assert status["stage_status"] == "blocked"
assert status["needs_human"] is True
assert "approval_resolved_no_action_without_execution" in status["blockers"]
def test_truth_status_marks_repeated_pending_drift_as_human_needed() -> None:
status = _truth_status(
incident=None,
@@ -249,6 +276,37 @@ def test_automation_quality_marks_no_action_without_execution() -> None:
assert "execution_recorded" in quality["blockers"]
def test_automation_quality_ignores_no_action_audit_rows_as_execution() -> None:
quality = build_automation_quality(
incident={"incident_id": "INC-1", "status": "RESOLVED"},
approvals=[{"status": "APPROVED", "action": "未知操作 | NO_ACTION"}],
evidence_rows=[{"sensors_attempted": 8, "sensors_succeeded": 6}],
automation_ops=[
{
"operation_type": "playbook_executed",
"status": "success",
"actor": "approval_execution",
"output_reason": "NO_ACTION",
"output_action": "未知操作 | NO_ACTION",
},
],
auto_repair_executions=[],
gateway_mcp_summary={"total": 8},
legacy_mcp_summary={"total": 8},
outbound_rows=[{"message_id": "m1"}],
km_entries=[{"id": "km-1"}],
timeline_events=[{"id": "tl-1"}],
)
gates = {row["name"]: row["status"] for row in quality["gates"]}
assert quality["verdict"] == "manual_required_no_action"
assert quality["facts"]["automation_operation_records"] == 1
assert quality["facts"]["effective_execution_records"] == 0
assert quality["facts"]["noop_operation_records"] == 1
assert gates["execution_recorded"] == "missing"
assert gates["verification_recorded"] == "not_applicable"
def test_automation_quality_marks_verified_auto_repair() -> None:
quality = build_automation_quality(
incident={