feat(api): AuditLog + Langfuse Trace for SSH_COMMAND (Sprint 3 T5)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-06 14:38:59 +08:00
parent 02510d3d93
commit a4e11bfa92
2 changed files with 112 additions and 3 deletions

View File

@@ -200,11 +200,47 @@ class HostRepairAgent:
error="" if success else output,
)
async def _write_audit_log(
self,
uri: str,
success: bool,
output: str,
error: str | None,
duration_ms: int,
) -> None:
"""寫入 SSH_COMMAND 稽核日誌到 PostgreSQL。
2026-04-06 Claude Code: Sprint 3 T5
"""
try:
from src.db.base import get_db_context
from src.db.models import AuditLog
async with get_db_context() as db:
audit = AuditLog(
approval_id="auto_repair", # nullable=False — SSH_COMMAND 不走 approval flow
operation_type="SSH_COMMAND",
target_resource=uri[:200],
namespace="host-layer",
success=success,
error_message=error,
k8s_response={"output": output[:1000]} if output else None,
executed_by="auto_repair",
execution_duration_ms=duration_ms,
dry_run_passed=True,
dry_run_message=None,
)
db.add(audit)
await db.commit()
logger.info("ssh_command_audit_written", uri=uri, success=success)
except Exception as e:
logger.error("ssh_command_audit_failed", uri=uri, error=str(e))
# Do not re-raise — audit failure must not affect repair result
async def repair_by_uri(self, command: str, approved: bool = False) -> HostRepairResult:
"""
根據 URI scheme 路由至對應的執行路徑。
2026-04-06 Claude Code: Sprint 3 T3
2026-04-06 Claude Code: Sprint 3 T4 — Redis 冪等鎖防止重複修復
2026-04-06 Claude Code: Sprint 3 T5 — AuditLog + Langfuse Trace
"""
try:
uri = parse_uri_command(command)
@@ -263,13 +299,29 @@ class HostRepairAgent:
redis_lock = None # Redis 未連線fail open
if redis_lock is None:
return await _execute()
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
try:
await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms)
except Exception:
pass
return result
try:
acquired = await redis_lock.acquire()
except Exception:
# Redis 不可用fail open
return await _execute()
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
try:
await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms)
except Exception:
pass
return result
if not acquired:
# Redis 鎖已被其他 Pod 持有
@@ -281,13 +333,46 @@ class HostRepairAgent:
)
try:
return await _execute()
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
finally:
try:
await redis_lock.release()
except Exception:
pass
# AuditLog (fire and forget)
try:
await self._write_audit_log(
uri=command,
success=result.success,
output=result.output,
error=result.error or None,
duration_ms=duration_ms,
)
except Exception:
pass
# Langfuse trace (fire and forget)
try:
from src.services.langfuse_client import get_langfuse
lf = get_langfuse()
if lf:
trace = lf.trace(name="ssh_command_repair")
trace.span(
name=f"{uri.scheme}_execute",
input={"uri": command},
output={"success": result.success, "output": result.output[:500] if result.output else ""},
metadata={"duration_ms": duration_ms, "scheme": uri.scheme},
)
lf.flush()
except Exception as lf_err:
logger.debug("langfuse_trace_skipped", error=str(lf_err))
return result
async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
"""openclaw:// — 呼叫現有的 repair(layer, component) 邏輯"""
return await self.repair(layer=layer, component=component)

View File

@@ -273,6 +273,30 @@ class TestRepairByUri:
assert "Unsupported scheme" in result.error
class TestAuditLog:
@pytest.mark.asyncio
async def test_successful_repair_writes_audit_log(self):
"""成功修復應寫入 AuditLog 到 DB"""
from src.services.host_repair_agent import HostRepairAgent, HostRepairResult
from unittest.mock import patch, AsyncMock
agent = HostRepairAgent()
with patch.object(agent, "_execute_openclaw", new_callable=AsyncMock) as mock_oc, \
patch.object(agent, "_write_audit_log", new_callable=AsyncMock) as mock_audit:
mock_oc.return_value = HostRepairResult(
success=True, layer="docker-110", component="sentry", output="REPAIR_OK:sentry"
)
result = await agent.repair_by_uri("openclaw://docker-110/sentry")
assert result.success is True
assert mock_audit.called, "AuditLog should be called"
call_kwargs = mock_audit.call_args
assert call_kwargs is not None
class TestRepairLock:
@pytest.mark.asyncio
async def test_duplicate_repair_is_blocked(self):