feat(awooop): expose ansible audit truth surface
Some checks failed
Code Review / ai-code-review (push) Successful in 10s
run-migration / migrate (push) Failing after 9s
CD Pipeline / tests (push) Successful in 2m21s
CD Pipeline / build-and-deploy (push) Successful in 3m50s
CD Pipeline / post-deploy-checks (push) Successful in 1m19s

This commit is contained in:
Your Name
2026-05-13 03:53:13 +08:00
parent feda8a0b4b
commit ca80972dc7
8 changed files with 449 additions and 7 deletions

View File

@@ -0,0 +1,262 @@
"""AwoooP Ansible audit helpers.
This module is intentionally non-executing. It exposes the Ansible audit
contract and repo-known playbook catalog so the truth chain can say whether
Ansible was actually considered or executed, without pretending that catalog
hints are runtime remediation.
"""
from __future__ import annotations
from typing import Any
ANSIBLE_OPERATION_TYPES = frozenset({
"ansible_candidate_matched",
"ansible_check_mode_executed",
"ansible_apply_executed",
"ansible_rollback_executed",
"ansible_execution_skipped",
})
_CATALOG: tuple[dict[str, Any], ...] = (
{
"catalog_id": "ansible:110-devops",
"playbook_path": "infra/ansible/playbooks/110-devops.yml",
"inventory_hosts": ["host_110"],
"domains": ["swap", "harbor", "sentry", "gitea", "langfuse", "bitan", "runner", "keepalived", "nginx"],
"keywords": [
"110",
"swap",
"harbor",
"sentry",
"gitea",
"langfuse",
"bitan",
"runner",
"github-runner",
"keepalived",
],
"supports_check_mode": True,
"auto_apply_enabled": False,
"approval_required": True,
"risk_level": "medium",
},
{
"catalog_id": "ansible:188-ai-web",
"playbook_path": "infra/ansible/playbooks/188-ai-web.yml",
"inventory_hosts": ["host_188"],
"domains": ["docker", "momo_backup", "signoz", "minio", "litellm", "n8n", "open_webui", "nginx"],
"keywords": [
"188",
"momo",
"backup",
"postgresql",
"pg_backup",
"signoz",
"minio",
"litellm",
"n8n",
"open-webui",
"openwebui",
"docker-registry",
],
"supports_check_mode": True,
"auto_apply_enabled": False,
"approval_required": True,
"risk_level": "medium",
},
{
"catalog_id": "ansible:nginx-sync",
"playbook_path": "infra/ansible/playbooks/nginx-sync.yml",
"inventory_hosts": ["host_110", "host_188"],
"domains": ["nginx", "proxy", "ollama_proxy", "tls"],
"keywords": ["nginx", "proxy", "ollama", "gcp", "tls", "cert", "502", "upstream"],
"supports_check_mode": True,
"auto_apply_enabled": False,
"approval_required": True,
"risk_level": "medium",
},
{
"catalog_id": "ansible:restore-password-auth",
"playbook_path": "infra/ansible/playbooks/restore-password-auth.yml",
"inventory_hosts": ["host_110", "host_120", "host_121", "host_188"],
"domains": ["ssh", "password_auth"],
"keywords": ["ssh", "passwordauthentication", "password auth", "login", "auth"],
"supports_check_mode": False,
"auto_apply_enabled": False,
"approval_required": True,
"risk_level": "high",
},
)
def _get(row: dict[str, Any], key: str) -> Any:
return row.get(key)
def _tags(row: dict[str, Any]) -> list[str]:
raw = _get(row, "tags")
if isinstance(raw, list):
return [str(item).lower() for item in raw]
if isinstance(raw, str):
return [part.strip().lower() for part in raw.split(",") if part.strip()]
return []
def _first_present(row: dict[str, Any], keys: tuple[str, ...]) -> Any:
for key in keys:
value = _get(row, key)
if value not in (None, ""):
return value
return None
def _is_ansible_operation(row: dict[str, Any]) -> bool:
operation_type = str(_get(row, "operation_type") or "").lower()
if operation_type in ANSIBLE_OPERATION_TYPES:
return True
if "ansible" in _tags(row):
return True
executor = str(
_first_present(
row,
(
"input_executor",
"input_execution_backend",
"output_executor",
"output_execution_backend",
),
)
or ""
).lower()
if executor == "ansible":
return True
playbook_path = str(
_first_present(row, ("input_playbook_path", "output_playbook_path", "input_ansible_playbook_path", "output_ansible_playbook_path"))
or ""
).lower()
return "infra/ansible/" in playbook_path or playbook_path.endswith(".yml") and "ansible" in playbook_path
def _ansible_record(row: dict[str, Any]) -> dict[str, Any]:
return {
"op_id": _get(row, "op_id"),
"operation_type": _get(row, "operation_type"),
"status": _get(row, "status"),
"actor": _get(row, "actor"),
"playbook_id": _first_present(row, ("input_playbook_id", "output_playbook_id")),
"playbook_path": _first_present(
row,
("input_playbook_path", "output_playbook_path", "input_ansible_playbook_path", "output_ansible_playbook_path"),
),
"check_mode": _first_present(row, ("input_check_mode", "output_check_mode")),
"not_used_reason": _first_present(row, ("input_not_used_reason", "output_not_used_reason")),
"dry_run_result": _get(row, "dry_run_result"),
"error": _get(row, "error"),
"duration_ms": _get(row, "duration_ms"),
"tags": _get(row, "tags"),
"created_at": _get(row, "created_at"),
}
def _flatten_text(value: Any, pieces: list[str], remaining: int = 80) -> int:
if remaining <= 0 or value is None:
return remaining
if isinstance(value, dict):
for key, item in value.items():
remaining = _flatten_text(key, pieces, remaining)
remaining = _flatten_text(item, pieces, remaining)
if remaining <= 0:
break
return remaining
if isinstance(value, list):
for item in value:
remaining = _flatten_text(item, pieces, remaining)
if remaining <= 0:
break
return remaining
pieces.append(str(value).lower())
return remaining - 1
def _source_haystack(incident: dict[str, Any] | None, drift: dict[str, Any] | None) -> str:
pieces: list[str] = []
_flatten_text(incident, pieces)
_flatten_text(drift, pieces)
return " ".join(pieces)
def _catalog_hints(incident: dict[str, Any] | None, drift: dict[str, Any] | None) -> dict[str, Any]:
haystack = _source_haystack(incident, drift)
candidates: list[dict[str, Any]] = []
unmatched: list[str] = []
for item in _CATALOG:
matched = [keyword for keyword in item["keywords"] if keyword in haystack]
public_item = {
key: value
for key, value in item.items()
if key
in {
"catalog_id",
"playbook_path",
"inventory_hosts",
"domains",
"supports_check_mode",
"auto_apply_enabled",
"approval_required",
"risk_level",
}
}
if matched:
candidates.append({
**public_item,
"match_score": len(matched),
"matched_keywords": matched,
})
else:
unmatched.append(item["catalog_id"])
candidates.sort(key=lambda row: (-int(row["match_score"]), str(row["catalog_id"])))
return {
"match_mode": "static_catalog_keyword_hint_v1",
"decision_effect": "none",
"available_count": len(_CATALOG),
"candidates": candidates,
"unmatched_catalog_ids": unmatched,
}
def build_ansible_truth(
automation_ops: list[dict[str, Any]],
*,
incident: dict[str, Any] | None,
drift: dict[str, Any] | None,
) -> dict[str, Any]:
"""Build the truth-chain Ansible section from audited facts and catalog hints."""
records = [_ansible_record(row) for row in automation_ops if _is_ansible_operation(row)]
return {
"considered": bool(records),
"records": records,
"audit_contract": {
"schema_version": "ansible_executor_audit_v1",
"operation_types": sorted(ANSIBLE_OPERATION_TYPES),
"required_audit_fields": [
"operation_type",
"status",
"actor",
"input.executor",
"input.playbook_path",
"input.check_mode",
"output.not_used_reason",
"dry_run_result",
],
"default_execution_mode": "catalog/dry-run audit only until approval execution is explicitly wired",
},
"candidate_catalog": _catalog_hints(incident, drift),
"not_used_reason": (
None
if records
else "no automation_operation_log row with Ansible operation type, tag, or executor backend for this source"
),
}

View File

@@ -17,6 +17,7 @@ import structlog
from sqlalchemy import text
from src.db.base import get_db_context
from src.services.awooop_ansible_audit_service import build_ansible_truth
logger = structlog.get_logger(__name__)
@@ -421,15 +422,30 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
error,
duration_ms,
tags,
input ->> 'executor' AS input_executor,
input ->> 'execution_backend' AS input_execution_backend,
input ->> 'playbook_id' AS input_playbook_id,
input ->> 'playbook_path' AS input_playbook_path,
input ->> 'ansible_playbook_path' AS input_ansible_playbook_path,
input ->> 'check_mode' AS input_check_mode,
input ->> 'not_used_reason' AS input_not_used_reason,
output ->> 'executor' AS output_executor,
output ->> 'execution_backend' AS output_execution_backend,
output ->> 'playbook_id' AS output_playbook_id,
output ->> 'playbook_path' AS output_playbook_path,
output ->> 'ansible_playbook_path' AS output_ansible_playbook_path,
output ->> 'check_mode' AS output_check_mode,
output ->> 'not_used_reason' AS output_not_used_reason,
created_at
FROM automation_operation_log
WHERE coalesce(input::text, '') LIKE :needle
WHERE incident_id::text = :incident_id
OR coalesce(input::text, '') LIKE :needle
OR coalesce(output::text, '') LIKE :needle
OR coalesce(array_to_string(tags, ','), '') LIKE :needle
ORDER BY created_at DESC
LIMIT :limit
""",
{"needle": f"%{incident_id}%", "limit": _MAX_ROWS},
{"incident_id": incident_id, "needle": f"%{incident_id}%", "limit": _MAX_ROWS},
)
km_entries = await _fetch_all(
db,
@@ -626,11 +642,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
},
"execution": {
"automation_operation_log": automation_ops,
"ansible": {
"considered": False,
"records": [],
"not_used_reason": "no first-class Ansible executor audit record in current truth chain",
},
"ansible": build_ansible_truth(automation_ops, incident=incident, drift=drift),
},
"learning": {
"knowledge_entries": km_entries,

View File

@@ -104,6 +104,11 @@ _AUTOMATION_STAGE_MAP = {
"capacity_recommendation": "investigator",
"quota_enforced": "safe",
"notification_formatted": "safe",
"ansible_candidate_matched": "ai_router",
"ansible_check_mode_executed": "executor",
"ansible_apply_executed": "executor",
"ansible_rollback_executed": "executor",
"ansible_execution_skipped": "safe",
}
_AUTOMATION_STATUS_MAP = {
"pending": "pending",