From a4e11bfa922dcf71f36a9f2e8b6b822c12a3cce7 Mon Sep 17 00:00:00 2001
From: OG T
Date: Mon, 6 Apr 2026 14:38:59 +0800
Subject: [PATCH] feat(api): AuditLog + Langfuse Trace for SSH_COMMAND (Sprint 3 T5)

Co-Authored-By: Claude Sonnet 4.6
---
 apps/api/src/services/host_repair_agent.py | 120 ++++++++++++++++++++-
 apps/api/tests/test_host_repair_agent.py   |  27 +++++
 2 files changed, 144 insertions(+), 3 deletions(-)

diff --git a/apps/api/src/services/host_repair_agent.py b/apps/api/src/services/host_repair_agent.py
index e53c486e..1bf80c02 100644
--- a/apps/api/src/services/host_repair_agent.py
+++ b/apps/api/src/services/host_repair_agent.py
@@ -200,11 +200,116 @@ class HostRepairAgent:
             error="" if success else output,
         )
 
+    async def _write_audit_log(
+        self,
+        uri: str,
+        success: bool,
+        output: str,
+        error: str | None,
+        duration_ms: int,
+    ) -> None:
+        """Write an SSH_COMMAND audit record to PostgreSQL.
+
+        Best-effort: any failure is logged and swallowed so that auditing
+        can never change the outcome of the repair itself.
+
+        2026-04-06 Claude Code: Sprint 3 T5
+        """
+        try:
+            # Function-scope imports, kept as in the original change.
+            from src.db.base import get_db_context
+            from src.db.models import AuditLog
+
+            async with get_db_context() as db:
+                audit = AuditLog(
+                    # approval_id is nullable=False; SSH_COMMAND does not go through the approval flow.
+                    approval_id="auto_repair",
+                    operation_type="SSH_COMMAND",
+                    target_resource=uri[:200],
+                    namespace="host-layer",
+                    success=success,
+                    error_message=error,
+                    k8s_response={"output": output[:1000]} if output else None,
+                    executed_by="auto_repair",
+                    execution_duration_ms=duration_ms,
+                    dry_run_passed=True,
+                    dry_run_message=None,
+                )
+                db.add(audit)
+                await db.commit()
+            logger.info("ssh_command_audit_written", uri=uri, success=success)
+        except Exception as e:
+            # Do not re-raise: audit failure must not affect repair result.
+            logger.error("ssh_command_audit_failed", uri=uri, error=str(e))
+
+    @staticmethod
+    async def _run_timed(execute) -> tuple[HostRepairResult, int]:
+        """Await ``execute()`` and return ``(result, elapsed_ms)``; exceptions propagate."""
+        import time
+
+        started = time.monotonic()
+        result = await execute()
+        return result, int((time.monotonic() - started) * 1000)
+
+    def _trace_langfuse(
+        self,
+        command: str,
+        scheme: str,
+        result: HostRepairResult,
+        duration_ms: int,
+    ) -> None:
+        """Emit a Langfuse trace for one executed repair; never raises."""
+        try:
+            from src.services.langfuse_client import get_langfuse
+
+            lf = get_langfuse()
+            if not lf:
+                return
+            trace = lf.trace(name="ssh_command_repair")
+            trace.span(
+                name=f"{scheme}_execute",
+                input={"uri": command},
+                output={
+                    "success": result.success,
+                    "output": result.output[:500] if result.output else "",
+                },
+                metadata={"duration_ms": duration_ms, "scheme": scheme},
+            )
+            # NOTE(review): flush() is called synchronously from async code;
+            # confirm it cannot stall the event loop for long.
+            lf.flush()
+        except Exception as lf_err:
+            logger.debug("langfuse_trace_skipped", error=str(lf_err))
+
+    async def _record_execution(
+        self,
+        command: str,
+        scheme: str,
+        result: HostRepairResult,
+        duration_ms: int,
+    ) -> None:
+        """Record a finished repair in the AuditLog and in Langfuse.
+
+        Both sinks are best-effort: a failure is logged, never raised.
+        """
+        try:
+            await self._write_audit_log(
+                uri=command,
+                success=result.success,
+                output=result.output,
+                error=result.error or None,
+                duration_ms=duration_ms,
+            )
+        except Exception as audit_err:
+            logger.error("ssh_command_audit_failed", uri=command, error=str(audit_err))
+        self._trace_langfuse(command, scheme, result, duration_ms)
+
     async def repair_by_uri(self, command: str, approved: bool = False) -> HostRepairResult:
         """
         根據 URI scheme 路由至對應的執行路徑。
         2026-04-06 Claude Code: Sprint 3 T3
         2026-04-06 Claude Code: Sprint 3 T4 — Redis 冪等鎖防止重複修復
+        2026-04-06 Claude Code: Sprint 3 T5 — AuditLog + Langfuse Trace
         """
         try:
             uri = parse_uri_command(command)
@@ -263,13 +368,17 @@ class HostRepairAgent:
             redis_lock = None  # Redis 未連線,fail open
 
         if redis_lock is None:
-            return await _execute()
+            result, duration_ms = await self._run_timed(_execute)
+            await self._record_execution(command, uri.scheme, result, duration_ms)
+            return result
 
         try:
             acquired = await redis_lock.acquire()
         except Exception:
             # Redis 不可用,fail open
-            return await _execute()
+            result, duration_ms = await self._run_timed(_execute)
+            await self._record_execution(command, uri.scheme, result, duration_ms)
+            return result
 
         if not acquired:
             # Redis 鎖已被其他 Pod 持有
@@ -281,13 +390,18 @@ class HostRepairAgent:
             )
 
         try:
-            return await _execute()
+            result, duration_ms = await self._run_timed(_execute)
         finally:
             try:
                 await redis_lock.release()
             except Exception:
                 pass
 
+        # Recorded only after the lock is released, so a slow audit or trace
+        # write never extends the time the lock is held.
+        await self._record_execution(command, uri.scheme, result, duration_ms)
+        return result
+
     async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
         """openclaw:// — 呼叫現有的 repair(layer, component) 邏輯"""
         return await self.repair(layer=layer, component=component)
diff --git a/apps/api/tests/test_host_repair_agent.py b/apps/api/tests/test_host_repair_agent.py
index 38e70c0f..13272d80 100644
--- a/apps/api/tests/test_host_repair_agent.py
+++ b/apps/api/tests/test_host_repair_agent.py
@@ -273,6 +273,33 @@ class TestRepairByUri:
         assert "Unsupported scheme" in result.error
 
 
+class TestAuditLog:
+    @pytest.mark.asyncio
+    async def test_successful_repair_writes_audit_log(self):
+        """A successful repair must write exactly one AuditLog record."""
+        from src.services.host_repair_agent import HostRepairAgent, HostRepairResult
+        from unittest.mock import patch, AsyncMock
+
+        agent = HostRepairAgent()
+        command = "openclaw://docker-110/sentry"
+
+        with patch.object(agent, "_execute_openclaw", new_callable=AsyncMock) as mock_oc, \
+             patch.object(agent, "_write_audit_log", new_callable=AsyncMock) as mock_audit:
+
+            mock_oc.return_value = HostRepairResult(
+                success=True, layer="docker-110", component="sentry", output="REPAIR_OK:sentry"
+            )
+
+            result = await agent.repair_by_uri(command)
+
+        assert result.success is True
+        mock_audit.assert_awaited_once()
+        audit_kwargs = mock_audit.call_args.kwargs
+        assert audit_kwargs["uri"] == command
+        assert audit_kwargs["success"] is True
+        assert audit_kwargs["duration_ms"] >= 0
+
+
 class TestRepairLock:
     @pytest.mark.asyncio
     async def test_duplicate_repair_is_blocked(self):