feat(awooop): audit ansible decision candidates
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m1s
CD Pipeline / build-and-deploy (push) Successful in 3m33s
CD Pipeline / post-deploy-checks (push) Successful in 1m17s

This commit is contained in:
Your Name
2026-05-13 04:07:23 +08:00
parent f61747aeac
commit 3799e0db0d
5 changed files with 278 additions and 1 deletions

View File

@@ -8,8 +8,16 @@ hints are runtime remediation.
from __future__ import annotations
import json
from typing import Any
import structlog
from sqlalchemy import text
from src.db.base import get_db_context
logger = structlog.get_logger(__name__)
ANSIBLE_OPERATION_TYPES = frozenset({
"ansible_candidate_matched",
@@ -27,6 +35,9 @@ _CATALOG: tuple[dict[str, Any], ...] = (
"domains": ["swap", "harbor", "sentry", "gitea", "langfuse", "bitan", "runner", "keepalived", "nginx"],
"keywords": [
"110",
"docker",
"container",
"dockercontainerunhealthy",
"swap",
"harbor",
"sentry",
@@ -49,6 +60,9 @@ _CATALOG: tuple[dict[str, Any], ...] = (
"domains": ["docker", "momo_backup", "signoz", "minio", "litellm", "n8n", "open_webui", "nginx"],
"keywords": [
"188",
"docker",
"container",
"dockercontainerunhealthy",
"momo",
"backup",
"postgresql",
@@ -260,3 +274,160 @@ def build_ansible_truth(
else "no automation_operation_log row with Ansible operation type, tag, or executor backend for this source"
),
}
def _incident_public_dict(incident: Any) -> dict[str, Any]:
    """Flatten an incident into a plain dict of the fields exposed for audit.

    ``None`` yields an empty dict and a dict is handed back unchanged (the
    same object, not a copy). Any other object is read attribute by
    attribute, so a missing attribute becomes ``None`` (or an empty
    list/dict for the collection-valued fields).

    Args:
        incident: ``None``, an already-flattened dict, or an object
            carrying incident attributes.

    Returns:
        A dict with the keys ``incident_id``, ``project_id``, ``alertname``,
        ``alert_category``, ``notification_type``, ``severity``,
        ``affected_services`` and ``signals`` for object inputs.
    """
    if incident is None:
        return {}
    if isinstance(incident, dict):
        return incident

    def _attr(obj: Any, name: str) -> Any:
        # Missing attributes are treated as absent rather than as errors.
        return getattr(obj, name, None)

    raw_severity = _attr(incident, "severity")
    signal_rows: list[dict[str, Any]] = [
        {
            "alert_name": _attr(sig, "alert_name"),
            "labels": _attr(sig, "labels") or {},
            "annotations": _attr(sig, "annotations") or {},
        }
        for sig in (_attr(incident, "signals") or [])
    ]

    return {
        "incident_id": _attr(incident, "incident_id"),
        "project_id": _attr(incident, "project_id"),
        "alertname": _attr(incident, "alertname"),
        "alert_category": _attr(incident, "alert_category"),
        "notification_type": _attr(incident, "notification_type"),
        # Enum-like severities expose ``.value``; plain values pass through.
        "severity": getattr(raw_severity, "value", raw_severity),
        "affected_services": _attr(incident, "affected_services") or [],
        "signals": signal_rows,
    }
def build_ansible_decision_audit_payload(
    *,
    incident: Any,
    proposal_data: dict[str, Any],
    decision_path: str,
    not_used_reason: str,
) -> dict[str, Any] | None:
    """Build the audit-log payload describing Ansible candidates for a decision.

    The incident is flattened and matched against the catalog hints. When
    the catalog offers no candidate the function returns ``None``; otherwise
    it returns a dict describing a dry-run row with ``operation_type``,
    ``status``, ``input``, ``output``, ``dry_run_result`` and ``tags``.

    Args:
        incident: Incident object, dict, or ``None``.
        proposal_data: Proposal fields; ``source``, ``risk_level``,
            ``action`` and ``kubectl_command`` are read when present.
        decision_path: Label of the decision branch, stored in the input.
        not_used_reason: Explanation stored in the output and the
            dry-run result.

    Returns:
        The payload dict, or ``None`` when there are no candidates.
    """
    incident_fields = _incident_public_dict(incident)
    catalog_hints = _catalog_hints(incident_fields, None)
    matched = catalog_hints.get("candidates") or []
    if not matched:
        return None

    exported_fields = (
        "catalog_id",
        "playbook_path",
        "inventory_hosts",
        "risk_level",
        "match_score",
        "matched_keywords",
    )

    audit_input: dict[str, Any] = {
        "incident_id": str(incident_fields.get("incident_id") or ""),
        "executor": "ansible",
        "execution_backend": "ansible",
        "decision_path": decision_path,
        "check_mode": True,
        "apply_enabled": False,
        "approval_required": True,
        "candidate_catalog_schema": catalog_hints["match_mode"],
        # Only the first five candidates are recorded.
        "executor_candidates": [
            {field: candidate[field] for field in exported_fields}
            for candidate in matched[:5]
        ],
        "proposal_source": proposal_data.get("source", ""),
        "proposal_risk_level": proposal_data.get("risk_level", ""),
    }
    # Prefer the generic action, fall back to the kubectl command, and cap
    # the preview at 240 characters.
    preview_source = (
        proposal_data.get("action")
        or proposal_data.get("kubectl_command")
        or ""
    )
    audit_input["proposal_action_preview"] = str(preview_source)[:240]

    audit_output = {
        "not_used_reason": not_used_reason,
        "decision_effect": "audit_only",
        "next_required_step": "wire approval_execution to Ansible check-mode before apply",
    }
    dry_run_summary = {
        "check_mode_executed": False,
        "candidate_count": len(matched),
        "reason": not_used_reason,
    }

    return {
        "operation_type": "ansible_candidate_matched",
        "status": "dry_run",
        "input": audit_input,
        "output": audit_output,
        "dry_run_result": dry_run_summary,
        "tags": ["ansible", "decision", "candidate", "check_mode_pending"],
    }
async def record_ansible_decision_audit(
    *,
    incident: Any,
    proposal_data: dict[str, Any],
    decision_path: str,
    not_used_reason: str,
) -> bool:
    """Write a best-effort Ansible candidate audit row for one decision.

    The payload is built first; when the catalog has no candidate nothing
    is written. Otherwise the function looks for an existing
    ``ansible_candidate_matched`` row with the same ``incident_id`` and
    inserts a new row into ``automation_operation_log`` only when none
    exists. Any exception raised while talking to the database is logged as
    a warning and swallowed.

    Args:
        incident: Incident object, dict, or ``None``.
        proposal_data: Proposal fields forwarded to the payload builder.
        decision_path: Label of the decision branch being audited.
        not_used_reason: Explanation of why Ansible was not used.

    Returns:
        ``True`` when a new row was inserted; ``False`` when there was no
        candidate, a row already existed, or the database write failed.
    """
    payload = build_ansible_decision_audit_payload(
        incident=incident,
        proposal_data=proposal_data,
        decision_path=decision_path,
        not_used_reason=not_used_reason,
    )
    if payload is None:
        return False

    incident_id = payload["input"]["incident_id"]

    # The payload builder accepts dict incidents as well as objects, so the
    # project lookup must handle both: ``getattr`` on a dict always misses
    # and would silently route the row to the default project.
    if isinstance(incident, dict):
        project_id = incident.get("project_id")
    else:
        project_id = getattr(incident, "project_id", None)
    project_id = project_id or "awoooi"

    try:
        async with get_db_context(str(project_id)) as db:
            # NOTE(review): the lookup and the insert are separate
            # statements; whether concurrent writers can both pass the
            # check depends on get_db_context's transaction settings.
            existing = await db.execute(
                text("""
                    SELECT op_id
                    FROM automation_operation_log
                    WHERE operation_type = 'ansible_candidate_matched'
                    AND input ->> 'incident_id' = :incident_id
                    AND input ->> 'executor' = 'ansible'
                    LIMIT 1
                """),
                {"incident_id": incident_id},
            )
            if existing.scalar() is not None:
                # One audit row per incident is enough.
                return False

            await db.execute(
                text("""
                    INSERT INTO automation_operation_log (
                        operation_type, actor, status,
                        input, output, dry_run_result, tags
                    ) VALUES (
                        :operation_type,
                        'decision_manager',
                        :status,
                        CAST(:input AS jsonb),
                        CAST(:output AS jsonb),
                        CAST(:dry_run_result AS jsonb),
                        :tags
                    )
                """),
                {
                    "operation_type": payload["operation_type"],
                    "status": payload["status"],
                    # JSON columns are bound as text and cast server-side.
                    "input": json.dumps(payload["input"], ensure_ascii=False),
                    "output": json.dumps(payload["output"], ensure_ascii=False),
                    "dry_run_result": json.dumps(
                        payload["dry_run_result"], ensure_ascii=False
                    ),
                    "tags": payload["tags"],
                },
            )
            return True
    except Exception as exc:
        # Best-effort: an audit failure must not propagate to the caller.
        logger.warning(
            "ansible_decision_audit_write_failed",
            incident_id=incident_id,
            error=str(exc),
        )
        return False

View File

@@ -1790,6 +1790,25 @@ class DecisionManager:
token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
await self._save_token(token)
try:
from src.services.awooop_ansible_audit_service import (
record_ansible_decision_audit as _record_ansible_decision_audit,
)
_fire_and_forget(
_record_ansible_decision_audit(
incident=incident,
proposal_data=token.proposal_data,
decision_path="auto_execute",
not_used_reason=(
"auto_execute selected existing executor path; "
"Ansible check-mode is not wired yet"
),
)
)
except Exception as _ansible_audit_err:
logger.debug("ansible_decision_audit_schedule_error", error=str(_ansible_audit_err))
# 觸發自動執行 (非阻塞)
_fire_and_forget(
self._auto_execute(incident, token)
@@ -1813,6 +1832,24 @@ class DecisionManager:
),
)
)
try:
from src.services.awooop_ansible_audit_service import (
record_ansible_decision_audit as _record_ansible_decision_audit,
)
_fire_and_forget(
_record_ansible_decision_audit(
incident=incident,
proposal_data=token.proposal_data,
decision_path="manual_approval",
not_used_reason=(
"manual approval required; Ansible check-mode "
"is not wired to approval execution yet"
),
)
)
except Exception as _ansible_audit_err:
logger.debug("ansible_decision_audit_schedule_error", error=str(_ansible_audit_err))
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)

View File

@@ -1,6 +1,11 @@
from __future__ import annotations
from src.services.awooop_ansible_audit_service import build_ansible_truth
from types import SimpleNamespace
from src.services.awooop_ansible_audit_service import (
build_ansible_decision_audit_payload,
build_ansible_truth,
)
from src.services.awooop_truth_chain_service import _clean_row, _truth_status
@@ -107,3 +112,39 @@ def test_ansible_truth_keeps_catalog_hint_separate_from_runtime_use() -> None:
assert truth["candidate_catalog"]["candidates"][0]["catalog_id"] == "ansible:nginx-sync"
assert truth["candidate_catalog"]["candidates"][0]["approval_required"] is True
assert truth["candidate_catalog"]["decision_effect"] == "none"
def test_ansible_decision_audit_payload_is_dry_run_only() -> None:
    """A Docker-unhealthy incident yields an audit-only, dry-run payload."""
    unhealthy_signal = SimpleNamespace(
        alert_name="DockerContainerUnhealthy",
        labels={
            "alertname": "DockerContainerUnhealthy",
            "container": "bitan-pharmacy-bitan-1",
        },
        annotations={},
    )
    incident = SimpleNamespace(
        incident_id="INC-DOCKER",
        project_id="awoooi",
        alert_category="infrastructure",
        notification_type="TYPE-3",
        severity=SimpleNamespace(value="P3"),
        affected_services=["bitan-pharmacy-bitan-1"],
        signals=[unhealthy_signal],
    )
    proposal = {"source": "expert_system", "risk_level": "low", "action": "NO_ACTION"}

    payload = build_ansible_decision_audit_payload(
        incident=incident,
        proposal_data=proposal,
        decision_path="manual_approval",
        not_used_reason="manual approval required; Ansible check-mode is not wired yet",
    )

    assert payload is not None
    assert payload["operation_type"] == "ansible_candidate_matched"
    assert payload["status"] == "dry_run"

    # The input must describe a check-mode-only, approval-gated candidate.
    audit_input = payload["input"]
    assert audit_input["executor"] == "ansible"
    assert audit_input["check_mode"] is True
    assert audit_input["apply_enabled"] is False
    assert audit_input["approval_required"] is True
    assert audit_input["executor_candidates"]

    # Nothing may have been executed or applied.
    assert payload["output"]["decision_effect"] == "audit_only"
    assert payload["dry_run_result"]["check_mode_executed"] is False

View File

@@ -59,6 +59,25 @@
- `audit_contract.schema_version=ansible_executor_audit_v1`
- Caveat:下一個 migration push 仍需 live 驗證 `run-migration` audit seed 是否完全通過;本輪 workflow 修正後沒有新的 migration 觸發可重跑。
**T3 第二段本地實作**
- `awooop_ansible_audit_service.py` 新增 decision audit payload/writer
- 只有 static catalog 有候選 playbook 時才寫 `automation_operation_log`
- operation_type=`ansible_candidate_matched`
- status=`dry_run`
- `input.executor=ansible`、`check_mode=true`、`apply_enabled=false`、`approval_required=true`
- `output.decision_effect=audit_only`
- `decision_manager` 在 auto-execute / manual-approval 分支都排程 best-effort audit write
- 不改 executor。
- 不跑 Ansible。
- 不阻塞決策和 Telegram。
- Docker/container 類 incident 也會命中 Ansible catalog hint,讓 B6C589 這類事件後續新 decision 能留下 Ansible candidate audit row。
- 本地驗證:
- `py_compile`:pass。
- `ruff --select F,E9`:pass。
- `pytest test_awooop_truth_chain_service.py test_platform_router_order.py test_awooop_operator_auth.py -q`:14 passed。
- `git diff --check`:pass。
- 待推版與 production smoke。
## 2026-05-12 | run-migration audit seed 再修正
**背景**:Gitea `run-migration` 的 `Seed asset_discovery_run (audit)` 再次失敗:

View File

@@ -1926,6 +1926,15 @@ Phase 6 完成後
- B6C589 truth-chain smoke:`manual_required/blocked`、`mcp_gateway_total=8`、`execution.ansible.considered=false`、`records=0`、not_used_reason 清楚顯示沒有 Ansible audit record。
- 下一個 migration push 仍需驗證 `run-migration` audit seed live gate,因本輪 workflow 修正後未再新增 migration 觸發重跑。
**T3 第二段本地追加**
- `decision_manager` 在 auto-execute / manual-approval 分支新增 best-effort Ansible candidate audit write。
- 僅在 catalog 有候選 playbook 時寫 `automation_operation_log`:
`operation_type=ansible_candidate_matched`、`status=dry_run`、
`input.check_mode=true`、`input.apply_enabled=false`、
`output.decision_effect=audit_only`。
- 這仍不是 Ansible 執行器;它只讓 truth-chain 能看到 AI decision path 曾考慮 Ansible candidate,以及為何未進入 check-mode/apply。
- 本地 `py_compile` / `ruff F,E9` / 14 個 truth-chain/operator/router tests 通過;待推版和 production smoke。
---
### 2026-04-20 晚 (台北) — C1-C4 全流程串接 — Playbook 鏈路保護(commit de2d34d)