Files
awoooi/apps/api/tests/factories.py
OG T 49bfbd573c
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 2m34s
feat(test): B5 整合測試框架 — 真實 DB, 5/5 通過
新增:
- docker-compose.test.yml: CI 用臨時 pgvector PostgreSQL (port 15432)
- tests/factories.py: Incident/Approval/Knowledge/RAG 測試資料工廠
- tests/integration/test_b5_core_flows.py: 5 個 E2E 整合測試 (5/5 PASSED 1.03s)
- tests/integration/setup_test_schema.sql: CI schema 初始化 SQL
- cd.yaml: 新增 Integration Tests B5 step
- scripts/sync_dev_db.py: dev DB 同步工具

修正:
- .env.test: DATABASE_URL 指向 awoooi_dev (本機設定, gitignore 不入庫)

禁止 Mock 鐵律: 所有 DB 測試使用真實 PostgreSQL, 無 SQLite/MagicMock

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 11:22:57 +08:00

190 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
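AWOOOI test data factories
==========================
Standardized builder functions that supply test data for integration tests.
Records are built with the real ORM models where possible and written straight to the real DB.
Principles:
- No mocks — data is written directly to the real DB
- Going through the ORM models keeps column types correct
- ORM-backed factories (approval, knowledge entry) return the ORM object;
  raw-SQL factories (incident, RAG chunk) return a dict of the key fields
2026-04-10 Claude Sonnet 4.6 Asia/Taipei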
"""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.models import ApprovalRecord, KnowledgeEntryRecord
# Fixed UTC+08:00 offset applied to every timestamp the factories generate.
TAIPEI = timezone(timedelta(hours=8))


def _now() -> datetime:
    """Return the current time as a timezone-aware datetime in TAIPEI (UTC+8)."""
    return datetime.now(TAIPEI)
def _uuid() -> str:
    """Return a newly generated random UUID (version 4) as its canonical string."""
    fresh = uuid.uuid4()
    return str(fresh)
def _inc_id() -> str:
    """Generate an incident_id in the ``INC-YYYYMMDD-XXXXXX`` format.

    The date part is the current date in the TAIPEI timezone; the suffix is
    the first six hex digits of a random UUID4, upper-cased.
    """
    date_part = datetime.now(tz=TAIPEI).strftime("%Y%m%d")
    random_part = uuid.uuid4().hex[:6].upper()
    return "-".join(("INC", date_part, random_part))
# =============================================================================
# Approval Factory
# =============================================================================
async def create_approval(
    session: AsyncSession,
    *,
    action: str = "RESTART_POD",
    description: str = "Test approval for integration testing",
    risk_level: str = "MEDIUM",
    status: str = "PENDING",
    requested_by: str = "test-system",
) -> ApprovalRecord:
    """Create a test ApprovalRecord, flush it, and return the ORM object.

    The record is added to ``session`` and flushed; this function does not
    commit, so the caller stays in control of the transaction.

    Args:
        session: Async session the record is added to and flushed on.
        action: Value stored in the record's ``action`` field.
        description: Value stored in the record's ``description`` field.
        risk_level: Value stored in the record's ``risk_level`` field.
        status: Value stored in the record's ``status`` field.
        requested_by: Value stored in the record's ``requested_by`` field.

    Returns:
        The ApprovalRecord instance that was added and flushed. It is created
        with a fresh UUID id, ``required_signatures=1``,
        ``current_signatures=0``, ``hit_count=1`` and empty ``signatures``,
        ``blast_radius``, ``dry_run_checks`` and ``extra_metadata``.
    """
    # Read the clock once so a brand-new row has created_at == updated_at
    # (two separate reads would differ by a few microseconds).
    timestamp = _now()
    record = ApprovalRecord(
        id=_uuid(),
        action=action,
        description=description,
        risk_level=risk_level,
        status=status,
        required_signatures=1,
        current_signatures=0,
        signatures=[],
        blast_radius={},
        dry_run_checks=[],
        requested_by=requested_by,
        extra_metadata={},
        hit_count=1,
        created_at=timestamp,
        updated_at=timestamp,
    )
    session.add(record)
    await session.flush()
    return record
# =============================================================================
# Knowledge Entry Factory
# =============================================================================
async def create_knowledge_entry(
    session: AsyncSession,
    *,
    title: str = "Test Entry",
    content: str = "Test content for integration testing",
    category: str = "test",
    entry_type: str = "RUNBOOK",
    status: str = "DRAFT",
    tags: list[str] | None = None,
    created_by: str = "test",
) -> KnowledgeEntryRecord:
    """Create a test knowledge entry, flush it, and return the ORM object.

    The entry is added to ``session`` and flushed; this function does not
    commit, so the caller stays in control of the transaction.

    Args:
        session: Async session the entry is added to and flushed on.
        title: Value stored in the entry's ``title`` field.
        content: Value stored in the entry's ``content`` field.
        category: Value stored in the entry's ``category`` field.
        entry_type: Value stored in the entry's ``entry_type`` field.
        status: Value stored in the entry's ``status`` field.
        tags: Tags for the entry; ``None`` or an empty list stores ``[]``.
        created_by: Value stored in the entry's ``created_by`` field.

    Returns:
        The KnowledgeEntryRecord instance that was added and flushed. It is
        created with a fresh UUID id, ``source="HUMAN"`` and ``view_count=0``.
    """
    # Read the clock once so a brand-new row has created_at == updated_at
    # (two separate reads would differ by a few microseconds).
    timestamp = _now()
    entry = KnowledgeEntryRecord(
        id=_uuid(),
        title=title,
        content=content,
        category=category,
        entry_type=entry_type,
        status=status,
        # Copy so the record never shares a list object with the caller.
        tags=list(tags) if tags else [],
        source="HUMAN",
        view_count=0,
        created_by=created_by,
        created_at=timestamp,
        updated_at=timestamp,
    )
    session.add(entry)
    await session.flush()
    return entry
# =============================================================================
# Incident Factory (raw SQL — complex structure)
# =============================================================================
async def create_incident(
    session: AsyncSession,
    *,
    anomaly_type: str = "pod_crash",
    host: str = "mon",
    severity: str = "P1",
    status: str = "INVESTIGATING",
) -> dict:
    """Insert a test incident row with raw SQL and return its key fields.

    The row is written to the ``incidents`` table and the session is flushed;
    this function does not commit. ``signals`` receives a one-element JSON
    array describing the anomaly; ``affected_services``, ``decision_chain``
    and ``proposal_ids`` are empty arrays, ``outcome`` is an empty object,
    ``ttl_days`` is 30 and ``vectorized`` is false.

    Args:
        session: Async session the INSERT is executed and flushed on.
        anomaly_type: Stored as ``type`` inside the single signal.
        host: Stored as ``host`` inside the single signal.
        severity: Cast to the ``severity`` DB type for the column and also
            stored as ``severity`` inside the single signal.
        status: Cast to the ``incidentstatus`` DB type for the column.

    Returns:
        A dict with the generated ``incident_id`` plus the ``status`` and
        ``severity`` that were passed in.
    """
    import json

    from sqlalchemy import text

    incident_id = _inc_id()
    # Serialize with json.dumps so quotes, backslashes and control characters
    # in the arguments are escaped; hand-built f-string JSON would be invalid
    # for such values. Compact separators and ensure_ascii=False keep the
    # output byte-identical to the previous format for ordinary inputs.
    signals_json = json.dumps(
        [{"type": anomaly_type, "host": host, "severity": severity}],
        separators=(",", ":"),
        ensure_ascii=False,
    )
    await session.execute(
        text("""
INSERT INTO incidents
(incident_id, status, severity, signals, affected_services,
decision_chain, proposal_ids, outcome, created_at, updated_at, ttl_days, vectorized)
VALUES
(:id, CAST(:status AS incidentstatus), CAST(:severity AS severity),
CAST(:signals AS json), CAST(:aff AS json),
CAST(:dc AS json), CAST(:pids AS json), CAST(:outcome AS json),
:now, :now, 30, false)
"""),
        {
            "id": incident_id,
            "status": status,
            "severity": severity,
            "signals": signals_json,
            "aff": "[]",
            "dc": "[]",
            "pids": "[]",
            "outcome": "{}",
            # One timestamp feeds both created_at and updated_at.
            "now": _now(),
        },
    )
    await session.flush()
    return {"incident_id": incident_id, "status": status, "severity": severity}
# =============================================================================
# RAG Chunk Factory (raw SQL — depends on the vector extension)
# =============================================================================
async def create_rag_chunk(
    session: AsyncSession,
    *,
    source: str = "test",
    source_id: str | None = None,
    title: str = "Test Document",
    chunk_text: str = "This is a test chunk for integration testing.",
) -> dict:
    """Insert a RAG chunk row (without an embedding) and return its key fields.

    The row is written to ``rag_chunks`` with raw SQL, with ``metadata`` set
    to an empty JSON object, and the session is flushed (not committed).

    Args:
        session: Async session the INSERT is executed and flushed on.
        source: Value for the ``source`` column.
        source_id: Value for the ``source_id`` column; when falsy, a
            ``test-`` id with the first 8 characters of a fresh UUID is used.
        title: Value for the ``title`` column.
        chunk_text: Value for the ``chunk_text`` column.

    Returns:
        A dict with the generated ``id`` and the ``source`` and ``source_id``
        that were stored.
    """
    from sqlalchemy import text

    chunk_id = _uuid()
    if source_id:
        sid = source_id
    else:
        sid = f"test-{_uuid()[:8]}"

    statement = text("""
INSERT INTO rag_chunks (id, source, source_id, title, chunk_text, metadata, created_at)
VALUES (:id, :src, :sid, :title, :text, '{}'::jsonb, :now)
""")
    params = {
        "id": chunk_id,
        "src": source,
        "sid": sid,
        "title": title,
        "text": chunk_text,
        "now": _now(),
    }
    await session.execute(statement, params)
    await session.flush()
    return {"id": chunk_id, "source": source, "source_id": sid}