feat(awooop): run ansible check-mode evidence worker
All checks were successful
CD Pipeline / tests (push) Successful in 1m28s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Successful in 5m9s
CD Pipeline / post-deploy-checks (push) Successful in 1m30s

This commit is contained in:
Your Name
2026-05-31 12:53:15 +08:00
parent 742980f398
commit 9080ba3670
8 changed files with 814 additions and 2 deletions

View File

@@ -609,6 +609,36 @@ class Settings(BaseSettings):
"(X-AwoooP-Operator-Key header)"
),
)
ENABLE_AWOOOP_ANSIBLE_CHECK_MODE_WORKER: bool = Field(
default=False,
description=(
"True=consume ansible_candidate_matched AOL rows and run "
"ansible-playbook --check --diff only. Apply remains disabled."
),
)
AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS: int = Field(
default=300,
ge=60,
description="AwoooP Ansible check-mode worker polling interval.",
)
AWOOOP_ANSIBLE_CHECK_MODE_BATCH_LIMIT: int = Field(
default=1,
ge=1,
le=5,
description="Maximum Ansible check-mode candidates claimed per worker tick.",
)
AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS: int = Field(
default=180,
ge=30,
le=600,
description="Timeout for one ansible-playbook --check --diff execution.",
)
AWOOOP_ANSIBLE_CHECK_MODE_STARTUP_SLEEP_SECONDS: int = Field(
default=120,
ge=0,
le=900,
description="Delay before the check-mode worker first tick after API startup.",
)
# ==========================================================================
# 統帥鐵律:禁止 SQLite (AWOOOI 憲法)

View File

@@ -0,0 +1,44 @@
"""AwoooP Ansible check-mode worker loop.
Runs only when explicitly enabled by settings. The worker consumes pending
``ansible_candidate_matched`` rows and records check-mode evidence; it never
executes Ansible apply.
"""
from __future__ import annotations
import asyncio
import structlog
from src.core.config import settings
from src.services.awooop_ansible_check_mode_service import run_pending_check_modes_once
logger = structlog.get_logger(__name__)
async def run_awooop_ansible_check_mode_loop() -> None:
if not settings.ENABLE_AWOOOP_ANSIBLE_CHECK_MODE_WORKER:
logger.info("awooop_ansible_check_mode_worker_disabled")
return
logger.info(
"awooop_ansible_check_mode_worker_started",
interval_seconds=settings.AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS,
batch_limit=settings.AWOOOP_ANSIBLE_CHECK_MODE_BATCH_LIMIT,
timeout_seconds=settings.AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS,
)
await asyncio.sleep(settings.AWOOOP_ANSIBLE_CHECK_MODE_STARTUP_SLEEP_SECONDS)
while True:
try:
result = await run_pending_check_modes_once(
limit=settings.AWOOOP_ANSIBLE_CHECK_MODE_BATCH_LIMIT,
timeout_seconds=settings.AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS,
)
if result.get("claimed") or result.get("blockers"):
logger.info("awooop_ansible_check_mode_worker_tick", **result)
except Exception as exc:
logger.warning("awooop_ansible_check_mode_worker_failed", error=str(exc))
await asyncio.sleep(settings.AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS)

View File

@@ -521,6 +521,22 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e))
# AwoooP Ansible check-mode worker.
# 只執行 ansible-playbook --check --diff 並回寫 automation_operation_log
# apply 仍必須走 approval gate本 worker 不寫 auto_repair_executions。
try:
from src.jobs.awooop_ansible_check_mode_job import (
run_awooop_ansible_check_mode_loop,
)
asyncio.create_task(run_awooop_ansible_check_mode_loop())
logger.info(
"awooop_ansible_check_mode_worker_scheduled",
enabled=settings.ENABLE_AWOOOP_ANSIBLE_CHECK_MODE_WORKER,
interval_seconds=settings.AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS,
)
except Exception as e:
logger.warning("awooop_ansible_check_mode_worker_schedule_failed", error=str(e))
# ADR-083 Phase 3: Evolver Agent每日— Playbook 自動合併 + 低信任封存
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 初始建立
try:

View File

@@ -105,6 +105,15 @@ _CATALOG: tuple[dict[str, Any], ...] = (
)
def get_ansible_catalog_item(catalog_id: str) -> dict[str, Any] | None:
"""Return one repo-known Ansible catalog item without exposing mutability."""
for item in _CATALOG:
if item["catalog_id"] == catalog_id:
return dict(item)
return None
def _get(row: dict[str, Any], key: str) -> Any:
return row.get(key)
@@ -156,6 +165,7 @@ def _is_ansible_operation(row: dict[str, Any]) -> bool:
def _ansible_record(row: dict[str, Any]) -> dict[str, Any]:
return {
"op_id": _get(row, "op_id"),
"parent_op_id": _get(row, "parent_op_id"),
"operation_type": _get(row, "operation_type"),
"status": _get(row, "status"),
"actor": _get(row, "actor"),
@@ -331,6 +341,9 @@ def build_ansible_decision_audit_payload(
"catalog_id": row["catalog_id"],
"playbook_path": row["playbook_path"],
"inventory_hosts": row["inventory_hosts"],
"supports_check_mode": row["supports_check_mode"],
"auto_apply_enabled": row["auto_apply_enabled"],
"approval_required": row["approval_required"],
"risk_level": row["risk_level"],
"match_score": row["match_score"],
"matched_keywords": row["matched_keywords"],

View File

@@ -0,0 +1,533 @@
"""Safe Ansible check-mode executor for AwoooP truth-chain evidence.
This service is deliberately dry-run only. It claims pending
``ansible_candidate_matched`` AOL rows, runs ``ansible-playbook --check --diff``,
and writes the result back as ``ansible_check_mode_executed``. It never enables
apply and never writes auto_repair_executions.
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import structlog
from sqlalchemy import text
from src.core.config import settings
from src.db.base import get_db_context
from src.services.awooop_ansible_audit_service import get_ansible_catalog_item
logger = structlog.get_logger(__name__)
_SAFE_HOST_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
_PLAYBOOK_PREFIX = Path("infra/ansible/playbooks")
_STDOUT_LIMIT = 20_000
_STDERR_LIMIT = 12_000
@dataclass(frozen=True)
class AnsibleCheckModeClaim:
op_id: str
source_candidate_op_id: str
incident_id: str
catalog_id: str
playbook_path: str
inventory_hosts: tuple[str, ...]
input_payload: dict[str, Any]
@dataclass(frozen=True)
class AnsibleCommandSpec:
command: list[str]
cwd: Path
env: dict[str, str]
playbook_abs_path: Path
inventory_abs_path: Path
@dataclass(frozen=True)
class AnsibleRunResult:
returncode: int
stdout: str
stderr: str
duration_ms: int
timed_out: bool = False
def _tail(text_value: str, limit: int) -> str:
if len(text_value) <= limit:
return text_value
return text_value[-limit:]
def _json_loads(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if isinstance(value, str):
try:
parsed = json.loads(value)
except json.JSONDecodeError:
return {}
return parsed if isinstance(parsed, dict) else {}
return {}
def _playbook_roots(module_path: Path | None = None) -> list[Path]:
resolved_module_path = (module_path or Path(__file__)).resolve()
return [
Path("/app/infra/ansible"),
Path.cwd() / "infra" / "ansible",
*(parent / "infra" / "ansible" for parent in resolved_module_path.parents),
]
def _runtime_blockers(
*,
playbook_root: Path | None = None,
repair_ssh_key_path: Path = Path("/etc/repair-ssh/id_ed25519"),
repair_known_hosts_path: Path = Path("/etc/repair-known-hosts/known_hosts"),
) -> list[str]:
root = playbook_root or next((path for path in _playbook_roots() if path.exists()), None)
blockers: list[str] = []
if shutil.which("ansible-playbook") is None:
blockers.append("ansible_playbook_binary_missing")
if root is None:
blockers.append("ansible_playbook_catalog_missing")
elif not (root / "inventory" / "hosts.yml").exists():
blockers.append("ansible_inventory_missing")
if not repair_ssh_key_path.is_file() or not os.access(repair_ssh_key_path, os.R_OK):
blockers.append("ansible_repair_ssh_key_missing")
if not repair_known_hosts_path.is_file() or not os.access(repair_known_hosts_path, os.R_OK):
blockers.append("ansible_repair_known_hosts_missing")
return blockers
def _safe_candidate(input_payload: dict[str, Any]) -> dict[str, Any]:
candidates = input_payload.get("executor_candidates")
if not isinstance(candidates, list) or not candidates:
raise ValueError("missing_executor_candidates")
for candidate in candidates:
if not isinstance(candidate, dict):
continue
catalog_id = str(candidate.get("catalog_id") or "")
catalog_item = get_ansible_catalog_item(catalog_id)
if not catalog_item:
continue
if catalog_item.get("supports_check_mode") is not True:
continue
if catalog_item.get("auto_apply_enabled") is True:
continue
playbook_path = str(candidate.get("playbook_path") or catalog_item.get("playbook_path") or "")
if playbook_path != str(catalog_item.get("playbook_path") or ""):
continue
inventory_hosts = candidate.get("inventory_hosts") or catalog_item.get("inventory_hosts") or []
if (
isinstance(inventory_hosts, list)
and inventory_hosts
and all(isinstance(host, str) and _SAFE_HOST_RE.fullmatch(host) for host in inventory_hosts)
):
return {
"catalog_id": catalog_id,
"playbook_path": playbook_path,
"inventory_hosts": tuple(inventory_hosts),
"risk_level": str(candidate.get("risk_level") or catalog_item.get("risk_level") or ""),
}
raise ValueError("no_safe_check_mode_candidate")
def build_ansible_check_mode_claim_input(
*,
source_candidate_op_id: str,
candidate_input: dict[str, Any],
) -> dict[str, Any]:
safe = _safe_candidate(candidate_input)
incident_id = str(candidate_input.get("incident_id") or "")
return {
"incident_id": incident_id,
"executor": "ansible",
"execution_backend": "ansible",
"execution_mode": "check_mode",
"check_mode": True,
"diff": True,
"apply_enabled": False,
"approval_required_before_apply": True,
"source_candidate_op_id": source_candidate_op_id,
"catalog_id": safe["catalog_id"],
"playbook_path": safe["playbook_path"],
"inventory_hosts": list(safe["inventory_hosts"]),
"risk_level": safe["risk_level"],
}
def _resolve_playbook_path(playbook_root: Path, playbook_path: str) -> Path:
relative = Path(playbook_path)
if relative.is_absolute() or not str(relative).startswith(str(_PLAYBOOK_PREFIX)):
raise ValueError("unsafe_playbook_path")
repo_root = playbook_root.parent.parent
resolved = (repo_root / relative).resolve()
allowed_root = (repo_root / _PLAYBOOK_PREFIX).resolve()
if allowed_root not in resolved.parents:
raise ValueError("playbook_outside_catalog")
if resolved.suffix not in {".yml", ".yaml"} or not resolved.exists():
raise ValueError("playbook_not_found")
return resolved
def build_ansible_check_mode_command(
*,
playbook_path: str,
inventory_hosts: tuple[str, ...],
playbook_root: Path | None = None,
repair_ssh_key_path: Path = Path("/etc/repair-ssh/id_ed25519"),
repair_known_hosts_path: Path = Path("/etc/repair-known-hosts/known_hosts"),
) -> AnsibleCommandSpec:
root = playbook_root or next((path for path in _playbook_roots() if path.exists()), None)
if root is None:
raise ValueError("ansible_playbook_catalog_missing")
inventory_path = (root / "inventory" / "hosts.yml").resolve()
if not inventory_path.exists():
raise ValueError("ansible_inventory_missing")
if not inventory_hosts or not all(_SAFE_HOST_RE.fullmatch(host) for host in inventory_hosts):
raise ValueError("unsafe_inventory_hosts")
playbook_abs = _resolve_playbook_path(root, playbook_path)
ssh_common_args = (
f"-o UserKnownHostsFile={repair_known_hosts_path} "
"-o IdentitiesOnly=yes -o BatchMode=yes"
)
extra_vars = {
"ansible_ssh_private_key_file": str(repair_ssh_key_path),
"ansible_ssh_common_args": ssh_common_args,
}
command = [
"ansible-playbook",
"-i",
str(inventory_path),
str(playbook_abs),
"--check",
"--diff",
"--limit",
",".join(inventory_hosts),
"--extra-vars",
json.dumps(extra_vars, ensure_ascii=False, separators=(",", ":")),
]
env = {
**os.environ,
"ANSIBLE_HOST_KEY_CHECKING": "true",
"ANSIBLE_RETRY_FILES_ENABLED": "false",
}
return AnsibleCommandSpec(
command=command,
cwd=root,
env=env,
playbook_abs_path=playbook_abs,
inventory_abs_path=inventory_path,
)
async def _run_ansible_command(spec: AnsibleCommandSpec, *, timeout_seconds: int) -> AnsibleRunResult:
started = time.monotonic()
process = await asyncio.create_subprocess_exec(
*spec.command,
cwd=str(spec.cwd),
env=spec.env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
timed_out = False
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(),
timeout=timeout_seconds,
)
except TimeoutError:
timed_out = True
process.kill()
stdout_bytes, stderr_bytes = await process.communicate()
duration_ms = int((time.monotonic() - started) * 1000)
return AnsibleRunResult(
returncode=124 if timed_out else int(process.returncode or 0),
stdout=stdout_bytes.decode("utf-8", "replace"),
stderr=stderr_bytes.decode("utf-8", "replace"),
duration_ms=duration_ms,
timed_out=timed_out,
)
def _build_result_payload(result: AnsibleRunResult) -> tuple[str, dict[str, Any], dict[str, Any], str | None]:
status = "success" if result.returncode == 0 else "failed"
stdout_tail = _tail(result.stdout, _STDOUT_LIMIT)
stderr_tail = _tail(result.stderr, _STDERR_LIMIT)
output = {
"executor": "ansible",
"execution_mode": "check_mode",
"check_mode": True,
"apply_enabled": False,
"approval_required_before_apply": True,
"returncode": result.returncode,
"timed_out": result.timed_out,
"stdout_tail": stdout_tail,
"stderr_tail": stderr_tail,
"next_required_step": "approval_required_before_ansible_apply",
}
dry_run_result = {
"check_mode_executed": True,
"apply_executed": False,
"safe_to_apply_without_approval": False,
"returncode": result.returncode,
"timed_out": result.timed_out,
"stdout_tail": stdout_tail,
"stderr_tail": stderr_tail,
}
error = None if result.returncode == 0 else (stderr_tail or f"ansible_check_mode_failed_rc_{result.returncode}")
return status, output, dry_run_result, error
async def claim_pending_check_modes(
*,
project_id: str = "awoooi",
limit: int = 1,
) -> list[AnsibleCheckModeClaim]:
"""Claim pending Ansible candidates by inserting pending check-mode rows."""
claims: list[AnsibleCheckModeClaim] = []
async with get_db_context(project_id) as db:
result = await db.execute(
text("""
SELECT
candidate.op_id,
candidate.input
FROM automation_operation_log candidate
WHERE candidate.operation_type = 'ansible_candidate_matched'
AND candidate.status = 'dry_run'
AND candidate.input ->> 'executor' = 'ansible'
AND COALESCE((candidate.dry_run_result ->> 'check_mode_executed')::boolean, false) = false
AND NOT EXISTS (
SELECT 1
FROM automation_operation_log existing
WHERE existing.parent_op_id = candidate.op_id
AND existing.operation_type IN (
'ansible_check_mode_executed',
'ansible_execution_skipped'
)
)
ORDER BY candidate.created_at ASC
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": max(1, limit)},
)
rows = result.mappings().all()
for row in rows:
source_op_id = str(row["op_id"])
candidate_input = _json_loads(row["input"])
try:
claim_input = build_ansible_check_mode_claim_input(
source_candidate_op_id=source_op_id,
candidate_input=candidate_input,
)
except ValueError as exc:
await _insert_skipped_candidate(
db,
source_candidate_op_id=source_op_id,
candidate_input=candidate_input,
reason=str(exc),
)
continue
inserted = await db.execute(
text("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, dry_run_result,
parent_op_id, tags
) VALUES (
'ansible_check_mode_executed',
'ansible_check_mode_worker',
'pending',
CAST(:input AS jsonb),
'{}'::jsonb,
CAST(:dry_run_result AS jsonb),
CAST(:parent_op_id AS uuid),
:tags
)
RETURNING op_id
"""),
{
"input": json.dumps(claim_input, ensure_ascii=False),
"dry_run_result": json.dumps({
"check_mode_executed": False,
"apply_executed": False,
"claim_state": "claimed",
}, ensure_ascii=False),
"parent_op_id": source_op_id,
"tags": ["ansible", "check_mode", "pending", "apply_locked"],
},
)
op_id = str(inserted.scalar_one())
claims.append(
AnsibleCheckModeClaim(
op_id=op_id,
source_candidate_op_id=source_op_id,
incident_id=str(claim_input.get("incident_id") or ""),
catalog_id=str(claim_input["catalog_id"]),
playbook_path=str(claim_input["playbook_path"]),
inventory_hosts=tuple(str(host) for host in claim_input["inventory_hosts"]),
input_payload=claim_input,
)
)
return claims
async def _insert_skipped_candidate(
db: Any,
*,
source_candidate_op_id: str,
candidate_input: dict[str, Any],
reason: str,
) -> None:
input_payload = {
"incident_id": str(candidate_input.get("incident_id") or ""),
"executor": "ansible",
"execution_backend": "ansible",
"execution_mode": "check_mode",
"check_mode": True,
"apply_enabled": False,
"source_candidate_op_id": source_candidate_op_id,
"not_used_reason": reason,
}
await db.execute(
text("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, dry_run_result,
parent_op_id, tags
) VALUES (
'ansible_execution_skipped',
'ansible_check_mode_worker',
'dry_run',
CAST(:input AS jsonb),
CAST(:output AS jsonb),
CAST(:dry_run_result AS jsonb),
CAST(:parent_op_id AS uuid),
:tags
)
"""),
{
"input": json.dumps(input_payload, ensure_ascii=False),
"output": json.dumps({
"not_used_reason": reason,
"decision_effect": "skipped_before_runtime",
}, ensure_ascii=False),
"dry_run_result": json.dumps({
"check_mode_executed": False,
"apply_executed": False,
"skipped": True,
"reason": reason,
}, ensure_ascii=False),
"parent_op_id": source_candidate_op_id,
"tags": ["ansible", "check_mode", "skipped", "apply_locked"],
},
)
async def finalize_check_mode_claim(
claim: AnsibleCheckModeClaim,
result: AnsibleRunResult,
*,
project_id: str = "awoooi",
) -> None:
status, output, dry_run_result, error = _build_result_payload(result)
async with get_db_context(project_id) as db:
await db.execute(
text("""
UPDATE automation_operation_log
SET status = :status,
output = CAST(:output AS jsonb),
dry_run_result = CAST(:dry_run_result AS jsonb),
error = :error,
duration_ms = :duration_ms,
stderr_feed_back = :stderr
WHERE op_id = CAST(:op_id AS uuid)
"""),
{
"status": status,
"output": json.dumps(output, ensure_ascii=False),
"dry_run_result": json.dumps(dry_run_result, ensure_ascii=False),
"error": _tail(error or "", 2000) or None,
"duration_ms": result.duration_ms,
"stderr": _tail(result.stderr, _STDERR_LIMIT),
"op_id": claim.op_id,
},
)
async def run_claimed_check_mode(
claim: AnsibleCheckModeClaim,
*,
timeout_seconds: int,
project_id: str = "awoooi",
) -> AnsibleRunResult:
try:
spec = build_ansible_check_mode_command(
playbook_path=claim.playbook_path,
inventory_hosts=claim.inventory_hosts,
)
result = await _run_ansible_command(spec, timeout_seconds=timeout_seconds)
except Exception as exc:
result = AnsibleRunResult(
returncode=1,
stdout="",
stderr=f"ansible_check_mode_runtime_error: {exc}",
duration_ms=0,
)
await finalize_check_mode_claim(claim, result, project_id=project_id)
logger.info(
"ansible_check_mode_claim_completed",
op_id=claim.op_id,
source_candidate_op_id=claim.source_candidate_op_id,
incident_id=claim.incident_id,
catalog_id=claim.catalog_id,
returncode=result.returncode,
timed_out=result.timed_out,
)
return result
async def run_pending_check_modes_once(
*,
project_id: str = "awoooi",
limit: int = 1,
timeout_seconds: int | None = None,
) -> dict[str, Any]:
blockers = _runtime_blockers()
if blockers:
logger.warning("ansible_check_mode_runtime_blocked", blockers=blockers)
return {"claimed": 0, "completed": 0, "failed": 0, "blockers": blockers}
claims = await claim_pending_check_modes(project_id=project_id, limit=limit)
completed = 0
failed = 0
for claim in claims:
result = await run_claimed_check_mode(
claim,
timeout_seconds=timeout_seconds or settings.AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS,
project_id=project_id,
)
completed += 1
if result.returncode != 0:
failed += 1
return {
"claimed": len(claims),
"completed": completed,
"failed": failed,
"blockers": [],
}

View File

@@ -691,18 +691,32 @@ def _execution_backend_summary(records: list[dict[str, Any]]) -> dict[str, Any]:
summary["ansible_considered_total"] += 1
summary["ansible_audit_record_total"] += len(ansible_records)
summary["ansible_candidate_total"] += len(candidates)
terminal_check_mode_parent_ids = {
str(row.get("parent_op_id"))
for row in ansible_records
if isinstance(row, dict)
and str(row.get("operation_type") or "") in {
"ansible_check_mode_executed",
"ansible_execution_skipped",
}
and row.get("parent_op_id")
}
for row in ansible_records:
if not isinstance(row, dict):
continue
operation_type = str(row.get("operation_type") or "")
if operation_type == "ansible_check_mode_executed":
status = str(row.get("status") or "").lower()
if operation_type == "ansible_check_mode_executed" and status != "pending":
summary["ansible_check_mode_total"] += 1
elif operation_type == "ansible_apply_executed":
summary["ansible_apply_total"] += 1
elif operation_type == "ansible_rollback_executed":
summary["ansible_rollback_total"] += 1
elif operation_type == "ansible_candidate_matched":
elif (
operation_type == "ansible_candidate_matched"
and str(row.get("op_id")) not in terminal_check_mode_parent_ids
):
summary["ansible_pending_check_mode_total"] += 1
return summary
@@ -1233,6 +1247,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
status,
incident_id,
run_id,
parent_op_id,
actor,
dry_run_result,
error,

View File

@@ -9,11 +9,16 @@ from src.services.awooop_ansible_audit_service import (
build_ansible_decision_audit_payload,
build_ansible_truth,
)
from src.services.awooop_ansible_check_mode_service import (
build_ansible_check_mode_claim_input,
build_ansible_check_mode_command,
)
from src.services.awooop_truth_chain_service import (
_ansible_playbook_roots,
_ansible_runtime_readiness,
_automation_quality_score_bucket,
_clean_row,
_execution_backend_summary,
_incident_fingerprints,
_summarize_gateway_mcp,
_truth_status,
@@ -855,3 +860,149 @@ def test_ansible_decision_audit_payload_is_dry_run_only() -> None:
assert payload["input"]["executor_candidates"]
assert payload["output"]["decision_effect"] == "audit_only"
assert payload["dry_run_result"]["check_mode_executed"] is False
def test_ansible_decision_audit_payload_exposes_check_mode_safety_flags() -> None:
incident = SimpleNamespace(
incident_id="INC-MOMO",
project_id="awoooi",
alert_category="database",
notification_type="TYPE-3",
severity=SimpleNamespace(value="P3"),
affected_services=["momo"],
signals=[
SimpleNamespace(
alert_name="MomoPostgresBackupFailed",
labels={"alertname": "MomoPostgresBackupFailed", "instance": "188"},
annotations={},
)
],
)
payload = build_ansible_decision_audit_payload(
incident=incident,
proposal_data={"source": "expert_system", "risk_level": "low"},
decision_path="manual_approval",
not_used_reason="candidate audit",
)
candidate = payload["input"]["executor_candidates"][0]
assert candidate["catalog_id"] == "ansible:188-ai-web"
assert candidate["supports_check_mode"] is True
assert candidate["auto_apply_enabled"] is False
assert candidate["approval_required"] is True
def test_ansible_check_mode_claim_input_keeps_apply_locked() -> None:
candidate_input = {
"incident_id": "INC-MOMO",
"executor": "ansible",
"executor_candidates": [
{
"catalog_id": "ansible:188-ai-web",
"playbook_path": "infra/ansible/playbooks/188-ai-web.yml",
"inventory_hosts": ["host_188"],
"risk_level": "medium",
}
],
}
claim = build_ansible_check_mode_claim_input(
source_candidate_op_id="00000000-0000-0000-0000-000000000001",
candidate_input=candidate_input,
)
assert claim["execution_mode"] == "check_mode"
assert claim["check_mode"] is True
assert claim["diff"] is True
assert claim["apply_enabled"] is False
assert claim["approval_required_before_apply"] is True
assert claim["playbook_path"] == "infra/ansible/playbooks/188-ai-web.yml"
def test_ansible_check_mode_claim_rejects_non_check_mode_catalog() -> None:
candidate_input = {
"incident_id": "INC-SSH",
"executor": "ansible",
"executor_candidates": [
{
"catalog_id": "ansible:restore-password-auth",
"playbook_path": "infra/ansible/playbooks/restore-password-auth.yml",
"inventory_hosts": ["host_188"],
"risk_level": "high",
}
],
}
try:
build_ansible_check_mode_claim_input(
source_candidate_op_id="00000000-0000-0000-0000-000000000002",
candidate_input=candidate_input,
)
except ValueError as exc:
assert str(exc) == "no_safe_check_mode_candidate"
else:
raise AssertionError("non-check-mode catalog should be rejected")
def test_ansible_check_mode_command_uses_check_diff_and_repair_ssh(tmp_path: Path) -> None:
playbook_root = tmp_path / "infra" / "ansible"
playbook_dir = playbook_root / "playbooks"
inventory_dir = playbook_root / "inventory"
playbook_dir.mkdir(parents=True)
inventory_dir.mkdir(parents=True)
(playbook_dir / "188-ai-web.yml").write_text("---\n- hosts: host_188\n tasks: []\n")
(inventory_dir / "hosts.yml").write_text("all: {}\n")
repair_key = tmp_path / "id_ed25519"
known_hosts = tmp_path / "known_hosts"
repair_key.write_text("key")
known_hosts.write_text("host key")
spec = build_ansible_check_mode_command(
playbook_path="infra/ansible/playbooks/188-ai-web.yml",
inventory_hosts=("host_188",),
playbook_root=playbook_root,
repair_ssh_key_path=repair_key,
repair_known_hosts_path=known_hosts,
)
assert "--check" in spec.command
assert "--diff" in spec.command
assert "--limit" in spec.command
assert "host_188" in spec.command
assert "ansible_ssh_private_key_file" in spec.command[-1]
assert str(repair_key) in spec.command[-1]
assert str(known_hosts) in spec.command[-1]
assert "apply" not in " ".join(spec.command)
def test_execution_backend_summary_subtracts_completed_check_mode_parent() -> None:
summary = _execution_backend_summary([
{
"execution": {
"ansible": {
"considered": True,
"candidate_catalog": {"candidates": [{"catalog_id": "ansible:188-ai-web"}]},
"records": [
{
"op_id": "candidate-1",
"operation_type": "ansible_candidate_matched",
"status": "dry_run",
},
{
"op_id": "check-1",
"parent_op_id": "candidate-1",
"operation_type": "ansible_check_mode_executed",
"status": "success",
},
],
},
"automation_operation_log": [],
"auto_repair_executions": [],
},
"automation_quality": {"facts": {}},
}
])
assert summary["ansible_check_mode_total"] == 1
assert summary["ansible_pending_check_mode_total"] == 0

View File

@@ -101,6 +101,16 @@ spec:
value: "80"
- name: PROMETHEUS_MULTIPROC_DIR
value: "/tmp/awoooi-prometheus-multiproc"
- name: ENABLE_AWOOOP_ANSIBLE_CHECK_MODE_WORKER
value: "true"
- name: AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS
value: "300"
- name: AWOOOP_ANSIBLE_CHECK_MODE_BATCH_LIMIT
value: "1"
- name: AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS
value: "180"
- name: AWOOOP_ANSIBLE_CHECK_MODE_STARTUP_SLEEP_SECONDS
value: "120"
# 2026-04-05 Claude Code: Sprint 3 — 掛載 SSH key 供 HostRepairAgent 使用
volumeMounts:
- name: repair-ssh-key