feat(ollama): Phase 31-34 ADR-067 — Log摘要/PR審查/RAG知識庫/圖片分析
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running
Phase 31: log_summary_service.py — deepseek-r1:14b K8s Pod日誌異常摘要
- 觸發: signoz_webhook 告警時背景呼叫
- Redis快取 log_summary:{pod}:{date} TTL 24h
- 敏感資料regex遮蔽
Phase 32: local_code_review_service.py — qwen2.5-coder:7b PR自動審查
- Fallback: Gemini (diff > 50KB 或 Ollama超時)
- semaphore 最多2個同時審查
- 雙寫: Redis TTL 7d + pr_reviews表 (phase29 migration)
Phase 33: knowledge_rag_service.py — nomic-embed-text 768維 pgvector RAG
- 向量化(188) + 生成(111) 雙Ollama
- rag_chunks表 (phase28 migration)
- 初期線性搜尋,>100筆啟用ivfflat索引
Phase 34: image_analysis_service.py — llava:latest Telegram圖片分析
- download_and_analyze: Bot API getFile → 下載 → llava → 回應
- Rate limit: 每chat_id每分鐘3次 (Redis sliding window)
- telegram.py webhook新增photo分支
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
28
apps/api/migrations/phase28_rag_pgvector.sql
Normal file
28
apps/api/migrations/phase28_rag_pgvector.sql
Normal file
@@ -0,0 +1,28 @@
|
||||
-- Phase 28 (ADR-067): pgvector table backing the RAG knowledge base
-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
-- Prerequisite: pgvector 0.8.2 is already installed on awoooi_prod
-- Indexing: linear scan at first (< 100 rows); once the table grows past
--           100 rows, run the commented-out CREATE INDEX ... ivfflat below.

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS rag_chunks (
    id          SERIAL PRIMARY KEY,
    source      TEXT NOT NULL,                      -- origin: "playbook", "incident", "runbook", "adr"
    source_id   TEXT,                               -- origin id (playbook_id / incident_id, ...)
    title       TEXT NOT NULL,                      -- title / file name
    chunk_text  TEXT NOT NULL,                      -- raw text fragment
    embedding   vector(768),                        -- nomic-embed-text 768-dimensional vector
    metadata    JSONB DEFAULT '{}',                 -- extra metadata
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS ix_rag_chunks_source  ON rag_chunks (source);
CREATE INDEX IF NOT EXISTS ix_rag_chunks_created ON rag_chunks (created_at DESC);

-- Approximate nearest-neighbour index (run once the table exceeds 100 rows)
-- CREATE INDEX IF NOT EXISTS ix_rag_chunks_embedding
--     ON rag_chunks USING ivfflat (embedding vector_cosine_ops)
--     WITH (lists = 10);

COMMENT ON TABLE rag_chunks IS 'RAG 知識庫向量片段 — Phase 28 ADR-067 (2026-04-10)';
|
||||
21
apps/api/migrations/phase29_pr_reviews.sql
Normal file
21
apps/api/migrations/phase29_pr_reviews.sql
Normal file
@@ -0,0 +1,21 @@
|
||||
-- Phase 29 (ADR-067): table recording automatic PR reviews
-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
-- Dual write: Redis with a 7-day TTL (hot) + PostgreSQL kept permanently (cold)

CREATE TABLE IF NOT EXISTS pr_reviews (
    id               SERIAL PRIMARY KEY,
    pr_id            TEXT NOT NULL,                     -- Gitea PR number (stored as a string)
    repo             TEXT NOT NULL,                     -- e.g. "wooo/awoooi"
    title            TEXT,                              -- PR title
    diff_size_bytes  INTEGER,                           -- diff size in bytes
    model            TEXT NOT NULL,                     -- qwen2.5-coder:7b / gemini-fallback
    provider         TEXT NOT NULL DEFAULT 'ollama',
    review_text      TEXT NOT NULL,                     -- full review text
    issues_count     INTEGER DEFAULT 0,                 -- number of issues found
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS ix_pr_reviews_pr_id   ON pr_reviews (pr_id);
CREATE INDEX IF NOT EXISTS ix_pr_reviews_created ON pr_reviews (created_at DESC);

COMMENT ON TABLE pr_reviews IS 'PR 自動審查記錄 — Phase 29 ADR-067 (2026-04-10)';
|
||||
@@ -1,5 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
"""
|
||||
AWOOOI API - SignOz Webhook Handler
|
||||
====================================
|
||||
@@ -249,6 +251,15 @@ async def process_signoz_alert(
|
||||
approval_id=approval_id,
|
||||
)
|
||||
|
||||
# Phase 31 (2026-04-10 Claude Code ADR-067): Log 異常摘要背景推送
|
||||
# 5s 軟超時不阻塞主流程;超時後摘要繼續跑,結果存 Redis 快取
|
||||
pod_name = labels.get("pod", labels.get("pod_name", ""))
|
||||
namespace = labels.get("namespace", "awoooi-prod")
|
||||
if pod_name:
|
||||
asyncio.create_task(
|
||||
_send_log_summary_notification(pod_name, namespace, approval_id)
|
||||
)
|
||||
|
||||
# Wave A.5: 記錄告警鏈路成功 (ADR-037)
|
||||
record_alert_chain_success("signoz")
|
||||
record_alert_processed(
|
||||
@@ -413,6 +424,45 @@ async def send_signoz_telegram(
|
||||
logger.exception("signoz_telegram_error", error=str(e))
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Phase 31 (2026-04-10 Claude Code ADR-067): Log 摘要背景推送 helper
|
||||
# =============================================================================
|
||||
|
||||
async def _send_log_summary_notification(
    pod_name: str,
    namespace: str,
    approval_id: str,
) -> None:
    """Fetch a Pod log summary in the background and push it to Telegram.

    The summary is requested through ``summarize_with_soft_timeout``; when
    that returns nothing (no summary available yet) no message is sent.

    Args:
        pod_name: Pod whose logs should be summarized.
        namespace: Kubernetes namespace of the Pod.
        approval_id: Approval identifier shown in the message.

    Any failure is logged as a warning and swallowed: this helper is meant
    to run as a fire-and-forget task and must not disturb the alert flow.
    """
    import html as _html

    from src.services.log_summary_service import get_log_summary_service
    from src.services.telegram_gateway import get_telegram_gateway

    # Maximum message length that was already enforced by the original slice.
    max_len = 4096

    try:
        svc = get_log_summary_service()
        summary = await svc.summarize_with_soft_timeout(pod_name, namespace)

        if not summary:
            return

        header = (
            f"🔍 <b>Log 異常摘要</b>\n"
            f"Pod: <code>{_html.escape(pod_name)}</code>\n"
            f"Approval: <code>{_html.escape(approval_id)}</code>\n\n"
        )
        footer = "\n\n<i>deepseek-r1:14b | 免費本地推理</i>"

        # Trim the *raw* summary, then escape it. Slicing the finished HTML
        # (the previous behaviour) could cut through a tag or an entity such
        # as "&amp;" and silently dropped the footer.
        budget = max(max_len - len(header) - len(footer), 0)
        raw = summary[:budget]
        body = _html.escape(raw)
        while len(body) > budget:
            # Escaping grew the text; drop as many raw characters as we are over.
            raw = raw[: len(raw) - (len(body) - budget)]
            body = _html.escape(raw)

        tg = get_telegram_gateway()
        await tg.send_text(header + body + footer)

    except Exception as e:
        logger.warning("log_summary_notification_failed", pod=pod_name, error=str(e))
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Health Check (供 SignOz 確認 Webhook 可達)
|
||||
# =============================================================================
|
||||
|
||||
@@ -90,8 +90,32 @@ async def telegram_webhook(
|
||||
logger.info("telegram_webhook_received", update_id=update.update_id)
|
||||
|
||||
# =========================================================================
|
||||
# Step 1: 僅處理 callback_query (簽核按鈕點擊)
|
||||
# Step 1: 路由 Update 類型
|
||||
# =========================================================================
|
||||
# Phase 34 (ADR-067 2026-04-10): 圖片訊息 → image_analysis_service
|
||||
if not update.callback_query and update.message:
|
||||
msg = update.message
|
||||
if msg.get("photo"):
|
||||
# 取最高解析度 (photos 陣列最後一個)
|
||||
photos = msg["photo"]
|
||||
best = max(photos, key=lambda p: p.get("file_size", 0))
|
||||
file_id = best.get("file_id", "")
|
||||
chat_id = str(msg.get("chat", {}).get("id", ""))
|
||||
caption = msg.get("caption", "請用繁體中文描述這張圖片")
|
||||
if file_id and chat_id:
|
||||
try:
|
||||
from src.services.image_analysis_service import get_image_analysis_service
|
||||
svc = get_image_analysis_service()
|
||||
# download_and_analyze 內部自行下載 + 分析 + 發送 Telegram
|
||||
await svc.download_and_analyze(
|
||||
chat_id=chat_id,
|
||||
file_id=file_id,
|
||||
question=caption,
|
||||
)
|
||||
except Exception as _img_err:
|
||||
logger.warning("image_analysis_webhook_failed", error=str(_img_err))
|
||||
return {"ok": True, "message": "photo_processed"}
|
||||
|
||||
if not update.callback_query:
|
||||
logger.debug("telegram_webhook_ignored", reason="not callback_query")
|
||||
return {"ok": True, "message": "Ignored (not callback_query)"}
|
||||
|
||||
228
apps/api/src/services/image_analysis_service.py
Normal file
228
apps/api/src/services/image_analysis_service.py
Normal file
@@ -0,0 +1,228 @@
|
||||
"""
|
||||
AWOOOI — Image Analysis Service (Phase 34, ADR-067)
|
||||
===================================================
|
||||
使用 llava:latest 分析 Telegram 傳入的圖片
|
||||
|
||||
觸發:
|
||||
- Telegram 傳圖 (photo attachment) 自動觸發
|
||||
- /screenshot 指令
|
||||
|
||||
限制:
|
||||
- 圖片 < 5MB
|
||||
- MIME 驗證 (image/jpeg, image/png, image/webp, image/gif)
|
||||
- 每 chat_id 每分鐘最多 3 次 (Redis rate limit)
|
||||
|
||||
安全:
|
||||
- 暫存到 /tmp/tg_photo_{file_id}.jpg
|
||||
- finally 清理暫存檔
|
||||
|
||||
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import get_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
settings = get_settings()
|
||||
|
||||
_MODEL = "llava:latest"
_TIMEOUT_S = 120.0
_MAX_SIZE_BYTES = 5 * 1024 * 1024  # 5 MB
_ALLOWED_MIME = {"image/jpeg", "image/png", "image/webp", "image/gif"}
_RATE_LIMIT = 3  # at most 3 analyses per chat_id per window
_RATE_WINDOW = 60  # window length in seconds


class ImageAnalysisService:
    """Analyze Telegram photos with the llava model served by Ollama."""

    def __init__(self) -> None:
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(timeout=httpx.Timeout(_TIMEOUT_S, connect=10.0))
        return self._http

    @staticmethod
    def _sniff_mime(data: bytes) -> str | None:
        """Guess the image MIME type from the leading magic bytes.

        Returns one of the JPEG / PNG / GIF / WebP MIME strings, or ``None``
        when the data does not start with a recognized signature.
        """
        if data.startswith(b"\xff\xd8\xff"):
            return "image/jpeg"
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        if data.startswith((b"GIF87a", b"GIF89a")):
            return "image/gif"
        if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
            return "image/webp"
        return None

    async def analyze(
        self,
        chat_id: str,
        file_id: str,
        file_bytes: bytes,
        mime_type: str = "image/jpeg",
        question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題",
    ) -> str | None:
        """Analyze an image and return the model's description.

        Args:
            chat_id: Chat the request came from; used as the rate-limit key.
            file_id: Telegram file id; only used for logging.
            file_bytes: Raw image content.
            mime_type: Declared MIME type, checked against ``_ALLOWED_MIME``.
            question: Prompt sent to the model together with the image.

        Returns:
            The model's answer, a user-facing warning string when the request
            is rejected (rate limit, size, format) or times out, or ``None``
            when the model produced nothing.
        """
        import tempfile

        # Rate limit check
        if not await self._check_rate_limit(chat_id):
            return "⚠️ 圖片分析頻率超限(每分鐘最多 3 次),請稍後再試"

        # Size check
        if len(file_bytes) > _MAX_SIZE_BYTES:
            return f"⚠️ 圖片過大({len(file_bytes)//1024}KB),限制 5MB"

        # MIME check
        if mime_type not in _ALLOWED_MIME:
            return f"⚠️ 不支援的圖片格式:{mime_type}"

        # mkstemp instead of a path derived from file_id: file_id arrives in
        # the webhook payload, and a name built from it collides (and gets
        # unlinked early) when the same photo is analyzed concurrently.
        fd, tmp_name = tempfile.mkstemp(prefix="tg_photo_", suffix=".jpg")
        tmp_path = Path(tmp_name)
        try:
            with os.fdopen(fd, "wb") as fh:
                fh.write(file_bytes)
            result = await self._call_llava(tmp_path, question)
            if result:
                logger.info("image_analysis_done", chat_id=chat_id, file_id=file_id[:8])
            return result
        finally:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError as e:
                logger.warning("image_analysis_tmp_cleanup_failed", error=str(e))

    async def _check_rate_limit(self, chat_id: str) -> bool:
        """Redis sorted-set sliding-window rate limit.

        Returns ``True`` when the request may proceed. Fails open: if Redis
        is unavailable or the check raises, the request is allowed.
        """
        try:
            from src.core.redis_client import get_redis

            redis = await get_redis()
            if redis is None:
                return True  # allow when Redis is unavailable

            key = f"img_ratelimit:{chat_id}"
            now = int(time.time())
            window_start = now - _RATE_WINDOW
            # The member must be unique per request. Using the second-resolution
            # timestamp as member made same-second requests overwrite each
            # other, so bursts were under-counted.
            member = f"{time.time_ns()}-{os.urandom(4).hex()}"

            pipe = redis.pipeline()
            pipe.zremrangebyscore(key, 0, window_start)
            pipe.zcard(key)
            pipe.zadd(key, {member: now})
            pipe.expire(key, _RATE_WINDOW * 2)
            results = await pipe.execute()

            # results[1] is the ZCARD taken before this request was added.
            return results[1] < _RATE_LIMIT
        except Exception as e:
            logger.warning("image_rate_limit_check_failed", error=str(e))
            return True  # allow on failure

    async def _call_llava(self, image_path: Path, question: str) -> str | None:
        """Call llava via Ollama ``/api/generate`` with a base64-encoded image."""
        try:
            image_b64 = base64.b64encode(image_path.read_bytes()).decode()
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _MODEL,
                    "prompt": question,
                    "images": [image_b64],
                    "stream": False,
                    "options": {
                        "num_predict": 512,
                        "temperature": 0.3,
                    },
                },
            )
            if resp.status_code != 200:
                logger.warning("image_analysis_ollama_error", status=resp.status_code)
                return None
            text = resp.json().get("response", "").strip()
            return text or None
        except httpx.TimeoutException:
            logger.warning("image_analysis_timeout", path=str(image_path))
            return "⚠️ 圖片分析超時(llava 處理中),請稍後重試"
        except Exception as e:
            logger.error("image_analysis_failed", error=str(e))
            return None

    async def download_and_analyze(
        self,
        chat_id: str,
        file_id: str,
        question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題",
    ) -> None:
        """Download a Telegram photo, analyze it and send the result.

        Called by the Telegram webhook handler; covers download, analysis and
        sending. All failures are logged and swallowed.
        """
        import html

        try:
            file_bytes = await self._download_telegram_file(file_id)
            if not file_bytes:
                return

            # Detect the real format from the bytes; without this the MIME
            # check in analyze() only ever saw its "image/jpeg" default.
            detected = self._sniff_mime(file_bytes) or "application/octet-stream"
            result = await self.analyze(
                chat_id=chat_id,
                file_id=file_id,
                file_bytes=file_bytes,
                mime_type=detected,
                question=question,
            )
            if result:
                from src.services.telegram_gateway import get_telegram_gateway

                tg = get_telegram_gateway()
                await tg.initialize()
                # The message carries HTML markup, so model output is escaped
                # (same as the log-summary notification does).
                await tg.send_text(f"🖼️ <b>圖片分析</b>\n{html.escape(result)}")
        except Exception as e:
            logger.warning("download_and_analyze_failed", error=str(e))

    async def _download_telegram_file(self, file_id: str) -> bytes | None:
        """Download a file through the Telegram Bot API.

        Returns the file content, or ``None`` when either request fails.
        """
        try:
            cfg = get_settings()
            bot_token = cfg.TELEGRAM_BOT_TOKEN
            http = await self._get_http()

            # Step 1: getFile -> file_path
            get_file_resp = await http.get(
                f"https://api.telegram.org/bot{bot_token}/getFile",
                params={"file_id": file_id},
                timeout=httpx.Timeout(15.0, connect=5.0),
            )
            if get_file_resp.status_code != 200:
                logger.warning("tg_getfile_failed", status=get_file_resp.status_code)
                return None

            file_path = get_file_resp.json().get("result", {}).get("file_path")
            if not file_path:
                return None

            # Step 2: download the file itself
            dl_resp = await http.get(
                f"https://api.telegram.org/file/bot{bot_token}/{file_path}",
                timeout=httpx.Timeout(30.0, connect=5.0),
            )
            if dl_resp.status_code == 200:
                return dl_resp.content
            logger.warning("tg_download_status", file_id=file_id[:8], status=dl_resp.status_code)
        except Exception as e:
            logger.warning("tg_download_failed", file_id=file_id[:8], error=str(e))
        return None

    async def close(self) -> None:
        """Close the HTTP client if it is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()
|
||||
|
||||
|
||||
_instance: ImageAnalysisService | None = None


def get_image_analysis_service() -> ImageAnalysisService:
    """Return the module-wide service, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = ImageAnalysisService()
    return _instance


def set_image_analysis_service(svc: ImageAnalysisService) -> None:
    """Replace the module-wide service with ``svc``."""
    global _instance
    _instance = svc
|
||||
235
apps/api/src/services/knowledge_rag_service.py
Normal file
235
apps/api/src/services/knowledge_rag_service.py
Normal file
@@ -0,0 +1,235 @@
|
||||
"""
|
||||
AWOOOI — Knowledge RAG Service (Phase 33, ADR-067)
|
||||
==================================================
|
||||
本地 RAG 知識庫:nomic-embed-text 768維向量 + pgvector
|
||||
|
||||
雙軌查詢:
|
||||
- Redis KNN 熱快取 (TTL 30d) — 本檔目前尚未實作,查詢僅走 pgvector
|
||||
- pgvector 冷儲存 (永久)
|
||||
|
||||
索引策略:
|
||||
- 初期 < 100 筆: 線性搜尋
|
||||
- 超過 100 筆: 執行 CREATE INDEX ivfflat (手動觸發)
|
||||
|
||||
向量模型: nomic-embed-text (Ollama 188, 768維)
|
||||
生成模型: qwen2.5:7b-instruct (Ollama 111)
|
||||
|
||||
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import get_settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
settings = get_settings()
|
||||
|
||||
# nomic-embed-text 在 188 (記憶體充足)
|
||||
_EMBED_URL = "http://192.168.0.188:11434"
_EMBED_MODEL = "nomic-embed-text"

# Answer generation goes to the Ollama instance configured in settings.OLLAMA_URL
_GEN_MODEL = "qwen2.5:7b-instruct"

_TOP_K = 5  # number of most similar chunks to retrieve
_CACHE_TTL = 30 * 86400  # 30 days


class KnowledgeRAGService:
    """RAG knowledge base: embed with Ollama, store/search in pgvector, answer with an LLM."""

    def __init__(self) -> None:
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0))
        return self._http

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def query(self, question: str, top_k: int = _TOP_K) -> str:
        """Answer ``question`` from the knowledge base.

        Pipeline: embed the question, fetch the ``top_k`` nearest chunks from
        pgvector, then generate an answer from them. Always returns a string;
        failures are reported as user-facing messages.
        """
        embedding = await self._embed(question)
        if embedding is None:
            return "⚠️ 無法生成向量,RAG 查詢失敗"

        chunks = await self._search_pgvector(embedding, top_k)
        if not chunks:
            return "📭 知識庫尚無相關資料,請先執行索引建立"

        context = "\n\n---\n\n".join(
            f"[{c.get('source','?')}] {c.get('title','')}\n{c.get('chunk_text','')}"
            for c in chunks
        )
        return await self._generate_answer(question, context)

    async def index_document(
        self,
        source: str,
        source_id: str,
        title: str,
        text: str,
        metadata: dict | None = None,
    ) -> bool:
        """Embed a document and store it in pgvector.

        The text is split into 500-character chunks with a 100-character
        overlap. Returns ``True`` when at least one chunk was stored.
        """
        stored = 0
        for chunk in self._chunk_text(text, chunk_size=500, overlap=100):
            emb = await self._embed(chunk)
            if not emb:
                continue
            if await self._save_chunk(source, source_id, title, chunk, emb, metadata or {}):
                stored += 1
        logger.info("rag_indexed", source=source, source_id=source_id, chunks=stored)
        return stored > 0

    # ------------------------------------------------------------------
    # Embedding
    # ------------------------------------------------------------------

    async def _embed(self, text: str) -> list[float] | None:
        """Return the embedding for ``text``, or ``None`` on any failure."""
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{_EMBED_URL}/api/embeddings",
                json={"model": _EMBED_MODEL, "prompt": text},
            )
            if resp.status_code == 200:
                return resp.json().get("embedding")
            logger.warning("rag_embed_bad_status", status=resp.status_code)
        except Exception as e:
            logger.warning("rag_embed_failed", error=str(e))
        return None

    # ------------------------------------------------------------------
    # pgvector search / storage
    # ------------------------------------------------------------------

    async def _search_pgvector(
        self, embedding: list[float], top_k: int
    ) -> list[dict[str, Any]]:
        """Return the ``top_k`` chunks closest to ``embedding`` (cosine distance).

        Returns an empty list when the query fails.
        """
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            vec_str = "[" + ",".join(str(v) for v in embedding) + "]"
            async with get_db_context() as db:
                # The embedding column is nullable; rows without a vector have
                # no distance and must not end up in the answer context.
                rows = await db.execute(
                    text("""
                        SELECT source, source_id, title, chunk_text,
                               1 - (embedding <=> CAST(:vec AS vector)) AS similarity
                        FROM rag_chunks
                        WHERE embedding IS NOT NULL
                        ORDER BY embedding <=> CAST(:vec AS vector)
                        LIMIT :k
                    """),
                    {"vec": vec_str, "k": top_k},
                )
                return [dict(r._mapping) for r in rows]
        except Exception as e:
            logger.error("rag_pgvector_search_failed", error=str(e))
            return []

    async def _save_chunk(
        self,
        source: str, source_id: str, title: str,
        chunk_text: str, embedding: list[float],
        metadata: dict,
    ) -> bool:
        """Insert one chunk with its embedding; return ``False`` on failure."""
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            vec_str = "[" + ",".join(str(v) for v in embedding) + "]"
            # NOTE(review): no explicit commit here — presumably get_db_context
            # commits on exit; confirm.
            async with get_db_context() as db:
                await db.execute(
                    text("""
                        INSERT INTO rag_chunks (source, source_id, title, chunk_text, embedding, metadata)
                        VALUES (:source, :source_id, :title, :chunk_text, CAST(:embedding AS vector), CAST(:metadata AS jsonb))
                    """),
                    {
                        "source": source, "source_id": source_id, "title": title,
                        "chunk_text": chunk_text,
                        "embedding": vec_str,
                        "metadata": json.dumps(metadata, ensure_ascii=False),
                    },
                )
            return True
        except Exception as e:
            logger.error("rag_save_chunk_failed", error=str(e))
            return False

    # ------------------------------------------------------------------
    # Answer generation
    # ------------------------------------------------------------------

    async def _generate_answer(self, question: str, context: str) -> str:
        """Generate an answer from ``context``; return a failure message on error."""
        prompt = (
            "你是 AWOOOI AIOps 知識庫助手,請用繁體中文根據以下資料回答問題。\n"
            "如果資料不足以回答,明確說「資料不足」,不要猜測。\n\n"
            f"=== 相關資料 ===\n{context[:6000]}\n\n"
            f"=== 問題 ===\n{question}"
        )
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _GEN_MODEL,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": 512, "temperature": 0.2},
                },
                timeout=httpx.Timeout(90.0, connect=10.0),
            )
            if resp.status_code == 200:
                answer = resp.json().get("response", "").strip()
                if answer:
                    return answer
                # An empty generation is a failure, not an answer.
                logger.warning("rag_generate_empty")
            else:
                logger.warning("rag_generate_bad_status", status=resp.status_code)
        except Exception as e:
            logger.error("rag_generate_failed", error=str(e))
        return "⚠️ RAG 生成失敗,請稍後再試"

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    @staticmethod
    def _chunk_text(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
        """Split ``text`` into character chunks that overlap by ``overlap``.

        Whitespace-only chunks are dropped. Splitting stops as soon as a
        chunk reaches the end of the text, so no trailing chunk consisting
        only of already-covered overlap is produced.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                not in ``[0, chunk_size)`` (the window would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        step = chunk_size - overlap
        chunks: list[str] = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            piece = text[start:end]
            if piece.strip():
                chunks.append(piece)
            if end >= len(text):
                break
            start += step
        return chunks

    async def close(self) -> None:
        """Close the HTTP client if it is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()
|
||||
|
||||
|
||||
_instance: KnowledgeRAGService | None = None


def get_knowledge_rag_service() -> KnowledgeRAGService:
    """Return the module-wide service, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = KnowledgeRAGService()
    return _instance


def set_knowledge_rag_service(svc: KnowledgeRAGService) -> None:
    """Replace the module-wide service with ``svc``."""
    global _instance
    _instance = svc
|
||||
200
apps/api/src/services/local_code_review_service.py
Normal file
200
apps/api/src/services/local_code_review_service.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""
|
||||
AWOOOI — Local Code Review Service (Phase 32, ADR-067)
|
||||
======================================================
|
||||
使用 qwen2.5-coder:7b 對 Gitea PR diff 進行自動審查
|
||||
Fallback: Gemini (diff > 50KB 或 Ollama 超時)
|
||||
|
||||
觸發: Gitea webhook push event (PR opened/updated)
|
||||
雙寫: Redis TTL 7d + PostgreSQL pr_reviews 表永久儲存
|
||||
並發: semaphore 最多 2 個同時審查
|
||||
|
||||
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import get_settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
settings = get_settings()
|
||||
|
||||
_MODEL_OLLAMA = "qwen2.5-coder:7b"
_TIMEOUT_OLLAMA = 120.0
_MAX_DIFF_BYTES = 50 * 1024  # above 50 KB the review goes straight to Gemini
_SEMAPHORE = asyncio.Semaphore(2)  # at most 2 reviews run at the same time

# Redis cache TTL: 7 days
_CACHE_TTL = 7 * 86400


class LocalCodeReviewService:
    """Automatic PR review: Ollama first, Gemini as fallback."""

    def __init__(self) -> None:
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(
                timeout=httpx.Timeout(_TIMEOUT_OLLAMA, connect=10.0)
            )
        return self._http

    async def review_pr(
        self,
        pr_id: str,
        repo: str,
        title: str,
        diff: str,
    ) -> dict[str, Any] | None:
        """Review a PR diff.

        Returns a dict with ``review_text``, ``issues_count``, ``model``,
        ``provider``, ``diff_size_bytes`` and ``diff_sha256``, or ``None``
        when both Ollama and Gemini fail. A successful result is written to
        Redis (7-day TTL) and to the ``pr_reviews`` table.
        """
        import hashlib
        import json

        cache_key = f"pr_review:{repo}:{pr_id}"
        diff_bytes = diff.encode()
        diff_size = len(diff_bytes)
        # The cache key only identifies the PR. The digest lets us tell
        # whether a cached review still matches the current diff, so an
        # updated PR is reviewed again instead of replaying a stale result.
        diff_digest = hashlib.sha256(diff_bytes).hexdigest()

        # Cache lookup happens before taking the semaphore so that cache hits
        # never queue behind running reviews.
        redis = None
        try:
            from src.core.redis_client import get_redis

            redis = await get_redis()
            if redis:
                cached = await redis.get(cache_key)
                if cached:
                    cached_result = json.loads(cached)
                    if (
                        isinstance(cached_result, dict)
                        and cached_result.get("diff_sha256") == diff_digest
                    ):
                        logger.info("pr_review_cache_hit", pr_id=pr_id)
                        return cached_result
        except Exception as e:
            logger.warning("pr_review_cache_read_failed", pr_id=pr_id, error=str(e))
            redis = None

        async with _SEMAPHORE:
            if diff_size > _MAX_DIFF_BYTES:
                result = await self._review_with_gemini(pr_id, repo, title, diff)
            else:
                result = await self._review_with_ollama(pr_id, repo, title, diff)
                if result is None:
                    # Ollama failed -> fall back to Gemini
                    result = await self._review_with_gemini(pr_id, repo, title, diff)

            if result is None:
                return None

            result["diff_size_bytes"] = diff_size
            result["diff_sha256"] = diff_digest

            # Write to cache (best effort)
            if redis:
                try:
                    await redis.set(cache_key, json.dumps(result, ensure_ascii=False), ex=_CACHE_TTL)
                except Exception as e:
                    logger.warning("pr_review_cache_write_failed", pr_id=pr_id, error=str(e))

            # Write to DB
            await self._save_to_db(pr_id, repo, title, diff_size, result)

            return result

    async def _review_with_ollama(
        self, pr_id: str, repo: str, title: str, diff: str
    ) -> dict[str, Any] | None:
        """Review with the local Ollama model; return ``None`` on any failure."""
        prompt = (
            f"你是資深程式審查員,請用繁體中文審查以下 Pull Request。\n"
            f"PR: {repo}#{pr_id} — {title}\n\n"
            "請找出:1) 潛在 Bug 或邏輯錯誤 2) 安全問題 3) 效能問題 4) 代碼風格問題\n"
            "格式:每個問題獨立一行,以「⚠️」開頭。如果沒有問題,說「✅ 程式碼品質良好」\n\n"
            f"=== Diff ===\n{diff[:40000]}\n=== 結束 ==="
        )
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _MODEL_OLLAMA,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": 1024, "temperature": 0.1},
                },
            )
            if resp.status_code != 200:
                logger.warning("pr_review_ollama_bad_status", pr_id=pr_id, status=resp.status_code)
                return None

            text = resp.json().get("response", "").strip()
            if not text:
                # An empty answer would be stored as a clean "0 issues"
                # review; treat it as a failure so the caller falls back.
                logger.warning("pr_review_ollama_empty", pr_id=pr_id)
                return None

            issues = text.count("⚠️")
            logger.info("pr_review_ollama_done", pr_id=pr_id, issues=issues)
            return {"review_text": text, "issues_count": issues, "model": _MODEL_OLLAMA, "provider": "ollama"}
        except httpx.TimeoutException:
            logger.warning("pr_review_ollama_timeout", pr_id=pr_id)
        except Exception as e:
            logger.error("pr_review_ollama_failed", pr_id=pr_id, error=str(e))
        return None

    async def _review_with_gemini(
        self, pr_id: str, repo: str, title: str, diff: str
    ) -> dict[str, Any] | None:
        """Review through the OpenClaw Gemini call; return ``None`` on any failure."""
        try:
            from src.services.openclaw import get_openclaw

            openclaw = get_openclaw()
            prompt = (
                f"PR Code Review: {repo}#{pr_id} — {title}\n"
                "繁體中文,找出 Bug/安全/效能/風格問題,每問題以⚠️開頭\n\n"
                f"Diff:\n{diff[:60000]}"
            )
            # Calls Gemini directly; the first element of the result is used
            # as the review text.
            result = await openclaw._call_gemini(prompt)
            if result and result[0]:
                text = result[0]
                issues = text.count("⚠️")
                return {"review_text": text, "issues_count": issues, "model": "gemini", "provider": "gemini"}
        except Exception as e:
            logger.error("pr_review_gemini_failed", pr_id=pr_id, error=str(e))
        return None

    async def _save_to_db(
        self, pr_id: str, repo: str, title: str, diff_size: int, result: dict
    ) -> None:
        """Insert the review into ``pr_reviews``; failures are logged, not raised."""
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            async with get_db_context() as db:
                await db.execute(
                    text("""
                        INSERT INTO pr_reviews
                        (pr_id, repo, title, diff_size_bytes, model, provider, review_text, issues_count)
                        VALUES
                        (:pr_id, :repo, :title, :diff_size, :model, :provider, :review_text, :issues_count)
                    """),
                    {
                        "pr_id": pr_id, "repo": repo, "title": title,
                        "diff_size": diff_size,
                        "model": result.get("model", "unknown"),
                        "provider": result.get("provider", "unknown"),
                        "review_text": result.get("review_text", ""),
                        "issues_count": result.get("issues_count", 0),
                    },
                )
        except Exception as e:
            logger.warning("pr_review_db_save_failed", error=str(e))

    async def close(self) -> None:
        """Close the HTTP client if it is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()
|
||||
|
||||
|
||||
_instance: LocalCodeReviewService | None = None


def get_local_code_review_service() -> LocalCodeReviewService:
    """Return the module-wide service, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = LocalCodeReviewService()
    return _instance


def set_local_code_review_service(svc: LocalCodeReviewService) -> None:
    """Replace the module-wide service with ``svc``."""
    global _instance
    _instance = svc
|
||||
244
apps/api/src/services/log_summary_service.py
Normal file
244
apps/api/src/services/log_summary_service.py
Normal file
@@ -0,0 +1,244 @@
|
||||
"""
|
||||
Log Summary Service - Phase 31
|
||||
================================
|
||||
職責:擷取 K8s Pod log,用 deepseek-r1:14b 生成繁中異常摘要
|
||||
|
||||
設計邊界:
|
||||
- 只輸出摘要文字,不做 RCA、不生成修復指令
|
||||
- 5s 軟超時:超過則回傳 None,主流程不阻塞
|
||||
- LLM 硬超時 180s(deepseek-r1:14b 慢但準)
|
||||
- Redis 快取:log_summary:{pod}:{date} TTL 24h,同 pod 同天不重複呼叫
|
||||
- 敏感資料遮蔽:Bearer token、密碼 regex → [REDACTED]
|
||||
- 複用 K8sDiagnosticsService 取得 logs
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-10 (台北時區)
|
||||
建立者: Claude Code (Phase 31 ADR-067)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.redis_client import get_redis
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================
|
||||
# 設定
|
||||
# ============================================================
|
||||
OLLAMA_URL = "http://192.168.0.111:11434"
|
||||
SUMMARY_MODEL = "deepseek-r1:14b"
|
||||
LLM_TIMEOUT = 180.0 # deepseek-r1 硬超時
|
||||
SOFT_TIMEOUT = 5.0 # 主流程軟超時(超過回 None)
|
||||
LOG_TAIL_LINES = 100
|
||||
ANOMALY_TAIL_LINES = 50 # 只取最後 50 行異常行送 LLM
|
||||
CACHE_TTL = 86400 # 24 小時
|
||||
CACHE_PREFIX = "log_summary:"
|
||||
|
||||
# 異常關鍵字 pattern
|
||||
_ANOMALY_PATTERN = re.compile(
|
||||
r"(ERROR|FATAL|Exception|Traceback|OOMKilled|panic|CRITICAL|WARN|WARNING)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# 敏感資料遮蔽 pattern
|
||||
_SENSITIVE_PATTERNS = [
|
||||
(re.compile(r"Bearer\s+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "Bearer [REDACTED]"),
|
||||
(re.compile(r"password[=:\s]+\S+", re.IGNORECASE), "password=[REDACTED]"),
|
||||
(re.compile(r"token[=:\s]+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "token=[REDACTED]"),
|
||||
(re.compile(r"secret[=:\s]+\S+", re.IGNORECASE), "secret=[REDACTED]"),
|
||||
]
|
||||
|
||||
# ============================================================
|
||||
# Prompt
|
||||
# ============================================================
|
||||
_SUMMARY_PROMPT = """你是 AWOOOI SRE 維運助理,請分析以下 K8s Pod log 片段。
|
||||
|
||||
## Pod: {pod_name} (namespace: {namespace})
|
||||
## Log 異常片段(最近 {line_count} 行):
|
||||
{log_snippet}
|
||||
|
||||
## 任務
|
||||
用繁體中文,3 行以內,說明:
|
||||
1. 異常的主要原因是什麼
|
||||
2. 影響程度(例如 OOM Killed、連線失敗、程式錯誤)
|
||||
3. 建議立即查看哪個方向
|
||||
|
||||
只輸出摘要文字,不要標題或 markdown 格式。
|
||||
"""
|
||||
|
||||
|
||||
class LogSummaryService:
|
||||
"""
|
||||
Pod Log 異常摘要服務
|
||||
|
||||
職責邊界:
|
||||
✅ 擷取 Pod log(複用 K8sDiagnosticsService)
|
||||
✅ 過濾異常行 + 敏感資料遮蔽
|
||||
✅ deepseek-r1:14b 生成繁中摘要(180s timeout)
|
||||
✅ Redis 快取(24h TTL)
|
||||
❌ 不做 RCA
|
||||
❌ 不生成修復指令
|
||||
❌ 不阻塞告警主流程(軟超時 5s)
|
||||
"""
|
||||
|
||||
async def summarize(
|
||||
self,
|
||||
pod_name: str,
|
||||
namespace: str = "awoooi-prod",
|
||||
) -> str | None:
|
||||
"""
|
||||
取得 Pod log 異常摘要
|
||||
|
||||
Returns:
|
||||
str: 繁中摘要文字(3 行內)
|
||||
None: 快取命中、無異常、或超時
|
||||
"""
|
||||
cache_key = self._cache_key(pod_name)
|
||||
redis = await get_redis()
|
||||
|
||||
cached = await redis.get(cache_key)
|
||||
if cached:
|
||||
logger.debug("log_summary_cache_hit", pod=pod_name)
|
||||
return cached.decode() if isinstance(cached, bytes) else cached
|
||||
|
||||
raw_logs = await self._fetch_logs(pod_name, namespace)
|
||||
if not raw_logs:
|
||||
return None
|
||||
|
||||
anomaly_lines = self._extract_anomaly_lines(raw_logs)
|
||||
if not anomaly_lines:
|
||||
logger.debug("log_summary_no_anomaly", pod=pod_name)
|
||||
return None
|
||||
|
||||
summary = await self._call_llm(pod_name, namespace, anomaly_lines)
|
||||
if summary:
|
||||
await redis.set(cache_key, summary, ex=CACHE_TTL)
|
||||
logger.info("log_summary_generated", pod=pod_name, lines=len(anomaly_lines))
|
||||
|
||||
return summary
|
||||
|
||||
async def summarize_with_soft_timeout(
|
||||
self,
|
||||
pod_name: str,
|
||||
namespace: str = "awoooi-prod",
|
||||
) -> str | None:
|
||||
"""
|
||||
帶 5s 軟超時的摘要取得
|
||||
|
||||
用於告警主流程:超過軟超時回傳 None,不阻塞 Telegram 推送
|
||||
LLM 繼續在背景跑,結果寫入 Redis 快取供下次使用
|
||||
"""
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
self.summarize(pod_name, namespace),
|
||||
timeout=SOFT_TIMEOUT,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.info(
|
||||
"log_summary_soft_timeout",
|
||||
pod=pod_name,
|
||||
soft_timeout=SOFT_TIMEOUT,
|
||||
)
|
||||
# 繼續在背景跑,結果存 Redis 備用
|
||||
asyncio.create_task(self.summarize(pod_name, namespace))
|
||||
return None
|
||||
|
||||
# --------------------------------------------------------
|
||||
# Private helpers
|
||||
# --------------------------------------------------------
|
||||
|
||||
def _cache_key(self, pod_name: str) -> str:
|
||||
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
|
||||
return f"{CACHE_PREFIX}{pod_name}:{date_str}"
|
||||
|
||||
async def _fetch_logs(self, pod_name: str, namespace: str) -> str | None:
|
||||
"""複用 K8sDiagnosticsService 取得 logs"""
|
||||
try:
|
||||
from src.services.k8s_diagnostics import K8sDiagnosticsService
|
||||
svc = K8sDiagnosticsService(default_namespace=namespace)
|
||||
diag = await svc.collect_diagnostics(
|
||||
pod_name=pod_name,
|
||||
namespace=namespace,
|
||||
log_tail_lines=LOG_TAIL_LINES,
|
||||
include_previous_logs=False,
|
||||
)
|
||||
return diag.logs or None
|
||||
except Exception as e:
|
||||
logger.warning("log_summary_fetch_failed", pod=pod_name, error=str(e))
|
||||
return None
|
||||
|
||||
def _extract_anomaly_lines(self, raw_logs: str) -> list[str]:
|
||||
"""過濾異常行 + 敏感資料遮蔽,取最後 ANOMALY_TAIL_LINES 行"""
|
||||
lines = raw_logs.splitlines()
|
||||
anomaly = [l for l in lines if _ANOMALY_PATTERN.search(l)]
|
||||
# 取最後 N 行
|
||||
anomaly = anomaly[-ANOMALY_TAIL_LINES:]
|
||||
# 遮蔽敏感資料
|
||||
result = []
|
||||
for line in anomaly:
|
||||
for pattern, replacement in _SENSITIVE_PATTERNS:
|
||||
line = pattern.sub(replacement, line)
|
||||
result.append(line)
|
||||
return result
|
||||
|
||||
async def _call_llm(
|
||||
self,
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
anomaly_lines: list[str],
|
||||
) -> str | None:
|
||||
"""呼叫 deepseek-r1:14b 生成摘要"""
|
||||
log_snippet = "\n".join(anomaly_lines)
|
||||
prompt = _SUMMARY_PROMPT.format(
|
||||
pod_name=pod_name,
|
||||
namespace=namespace,
|
||||
line_count=len(anomaly_lines),
|
||||
log_snippet=log_snippet[:3000], # 避免超出 context
|
||||
)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
|
||||
resp = await client.post(
|
||||
f"{OLLAMA_URL}/api/generate",
|
||||
json={
|
||||
"model": SUMMARY_MODEL,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 200},
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
raw = data.get("response", "").strip()
|
||||
|
||||
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
|
||||
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
||||
return text or raw or None
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.warning("log_summary_llm_timeout", model=SUMMARY_MODEL)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("log_summary_llm_error", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================
# Singleton
# ============================================================

# Process-wide instance, created on first use by get_log_summary_service().
_log_summary_service: LogSummaryService | None = None


def get_log_summary_service() -> LogSummaryService:
    """Return the shared LogSummaryService, creating it on the first call."""
    global _log_summary_service
    if _log_summary_service is not None:
        return _log_summary_service
    _log_summary_service = LogSummaryService()
    return _log_summary_service
|
||||
Reference in New Issue
Block a user