feat(ollama): Phase 31-34 ADR-067 — Log摘要/PR審查/RAG知識庫/圖片分析
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running

Phase 31: log_summary_service.py — deepseek-r1:14b K8s Pod日誌異常摘要
  - 觸發: signoz_webhook 告警時背景呼叫
  - Redis快取 log_summary:{pod}:{date} TTL 24h
  - 敏感資料regex遮蔽

Phase 32: local_code_review_service.py — qwen2.5-coder:7b PR自動審查
  - Fallback: Gemini (diff > 50KB 或 Ollama超時)
  - semaphore 最多2個同時審查
  - 雙寫: Redis TTL 7d + pr_reviews表 (phase29 migration)

Phase 33: knowledge_rag_service.py — nomic-embed-text 768維 pgvector RAG
  - 向量化(188) + 生成(111) 雙Ollama
  - rag_chunks表 (phase28 migration)
  - 初期線性搜尋,>100筆啟用ivfflat索引

Phase 34: image_analysis_service.py — llava:latest Telegram圖片分析
  - download_and_analyze: Bot API getFile → 下載 → llava → 回應
  - Rate limit: 每chat_id每分鐘3次 (Redis sliding window)
  - telegram.py webhook新增photo分支

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-10 01:50:22 +08:00
parent 89015d4527
commit 63e840ae42
8 changed files with 1031 additions and 1 deletions

View File

@@ -0,0 +1,28 @@
-- Phase 28 (ADR-067): pgvector table backing the RAG knowledge base
-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
-- Prerequisite: pgvector 0.8.2 is already installed on awoooi_prod ✅
-- Indexing: linear scan at first (< 100 rows); once past 100 rows, run CREATE INDEX ivfflat
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS rag_chunks (
id SERIAL PRIMARY KEY,
source TEXT NOT NULL, -- origin: "playbook", "incident", "runbook", "adr"
source_id TEXT, -- id within the origin (playbook_id / incident_id, etc.)
title TEXT NOT NULL, -- title / file name
chunk_text TEXT NOT NULL, -- raw text of this chunk
embedding vector(768), -- 768-dimension nomic-embed-text vector
metadata JSONB DEFAULT '{}', -- extra metadata
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS ix_rag_chunks_source ON rag_chunks (source);
CREATE INDEX IF NOT EXISTS ix_rag_chunks_created ON rag_chunks (created_at DESC);
-- Approximate nearest-neighbour index (to be run once the table exceeds 100 rows)
-- CREATE INDEX IF NOT EXISTS ix_rag_chunks_embedding
-- ON rag_chunks USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 10);
COMMENT ON TABLE rag_chunks IS 'RAG 知識庫向量片段 — Phase 28 ADR-067 (2026-04-10)';

View File

@@ -0,0 +1,21 @@
-- Phase 29 (ADR-067): table recording automated PR reviews
-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
-- Dual write: Redis with a 7-day TTL (hot) + PostgreSQL kept permanently (cold)
CREATE TABLE IF NOT EXISTS pr_reviews (
id SERIAL PRIMARY KEY,
pr_id TEXT NOT NULL, -- Gitea PR number (stored as a string)
repo TEXT NOT NULL, -- e.g. "wooo/awoooi"
title TEXT, -- PR title
diff_size_bytes INTEGER, -- size of the diff in bytes
model TEXT NOT NULL, -- qwen2.5-coder:7b / gemini-fallback
provider TEXT NOT NULL DEFAULT 'ollama',
review_text TEXT NOT NULL, -- full review text
issues_count INTEGER DEFAULT 0, -- number of issues found
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS ix_pr_reviews_pr_id ON pr_reviews (pr_id);
CREATE INDEX IF NOT EXISTS ix_pr_reviews_created ON pr_reviews (created_at DESC);
COMMENT ON TABLE pr_reviews IS 'PR 自動審查記錄 — Phase 29 ADR-067 (2026-04-10)';

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import asyncio
"""
AWOOOI API - SignOz Webhook Handler
====================================
@@ -249,6 +251,15 @@ async def process_signoz_alert(
approval_id=approval_id,
)
# Phase 31 (2026-04-10 Claude Code ADR-067): Log 異常摘要背景推送
# 5s 軟超時不阻塞主流程;超時後摘要繼續跑,結果存 Redis 快取
pod_name = labels.get("pod", labels.get("pod_name", ""))
namespace = labels.get("namespace", "awoooi-prod")
if pod_name:
asyncio.create_task(
_send_log_summary_notification(pod_name, namespace, approval_id)
)
# Wave A.5: 記錄告警鏈路成功 (ADR-037)
record_alert_chain_success("signoz")
record_alert_processed(
@@ -413,6 +424,45 @@ async def send_signoz_telegram(
logger.exception("signoz_telegram_error", error=str(e))
# =============================================================================
# Phase 31 (2026-04-10 Claude Code ADR-067): Log 摘要背景推送 helper
# =============================================================================
async def _send_log_summary_notification(
    pod_name: str,
    namespace: str,
    approval_id: str,
) -> None:
    """Fetch a pod's log summary and push it to Telegram, best effort.

    Calls ``summarize_with_soft_timeout`` on the log-summary service and, when
    it yields a non-empty summary, sends one HTML-formatted message through the
    Telegram gateway. Nothing is sent when no summary comes back. Failures
    while summarising or sending are logged and swallowed.

    Args:
        pod_name: Pod whose logs are summarised; shown in the message.
        namespace: Namespace passed to the summary service.
        approval_id: Approval identifier shown in the message.
    """
    import html as _html

    from src.services.log_summary_service import get_log_summary_service
    from src.services.telegram_gateway import get_telegram_gateway

    max_len = 4096  # Telegram's limit for the text of a single message

    try:
        svc = get_log_summary_service()
        summary = await svc.summarize_with_soft_timeout(pod_name, namespace)
        if not summary:
            return

        header = (
            f"🔍 <b>Log 異常摘要</b>\n"
            f"Pod: <code>{_html.escape(pod_name)}</code>\n"
            f"Approval: <code>{_html.escape(approval_id)}</code>\n\n"
        )
        footer = "\n\n<i>deepseek-r1:14b | 免費本地推理</i>"
        body = _html.escape(summary)

        # Trim only the summary so the surrounding tags always stay balanced;
        # slicing the fully rendered message could cut a tag or entity in half.
        budget = max(max_len - len(header) - len(footer), 0)
        if len(body) > budget:
            cut = body[: max(budget - 1, 0)]
            # After html.escape every "&" starts an entity; drop a partial one.
            amp = cut.rfind("&")
            if amp != -1 and ";" not in cut[amp:]:
                cut = cut[:amp]
            body = cut + "…" if budget else ""

        tg = get_telegram_gateway()
        # The final slice is only a safety net for a pathologically long header.
        await tg.send_text((header + body + footer)[:max_len])
    except Exception as e:
        logger.warning("log_summary_notification_failed", pod=pod_name, error=str(e))
# =============================================================================
# Health Check (供 SignOz 確認 Webhook 可達)
# =============================================================================

View File

@@ -90,8 +90,32 @@ async def telegram_webhook(
logger.info("telegram_webhook_received", update_id=update.update_id)
# =========================================================================
# Step 1: 僅處理 callback_query (簽核按鈕點擊)
# Step 1: 路由 Update 類型
# =========================================================================
# Phase 34 (ADR-067 2026-04-10): 圖片訊息 → image_analysis_service
if not update.callback_query and update.message:
msg = update.message
if msg.get("photo"):
# 取最高解析度 (photos 陣列最後一個)
photos = msg["photo"]
best = max(photos, key=lambda p: p.get("file_size", 0))
file_id = best.get("file_id", "")
chat_id = str(msg.get("chat", {}).get("id", ""))
caption = msg.get("caption", "請用繁體中文描述這張圖片")
if file_id and chat_id:
try:
from src.services.image_analysis_service import get_image_analysis_service
svc = get_image_analysis_service()
# download_and_analyze 內部自行下載 + 分析 + 發送 Telegram
await svc.download_and_analyze(
chat_id=chat_id,
file_id=file_id,
question=caption,
)
except Exception as _img_err:
logger.warning("image_analysis_webhook_failed", error=str(_img_err))
return {"ok": True, "message": "photo_processed"}
if not update.callback_query:
logger.debug("telegram_webhook_ignored", reason="not callback_query")
return {"ok": True, "message": "Ignored (not callback_query)"}

View File

@@ -0,0 +1,228 @@
"""
AWOOOI — Image Analysis Service (Phase 34, ADR-067)
===================================================
使用 llava:latest 分析 Telegram 傳入的圖片
觸發:
- Telegram 傳圖 (photo attachment) 自動觸發
- /screenshot 指令
限制:
- 圖片 < 5MB
- MIME 驗證 (image/jpeg, image/png, image/webp)
- 每 chat_id 每分鐘最多 3 次 (Redis rate limit)
安全:
- 暫存到 /tmp/tg_photo_{file_id}.jpg
- finally 清理暫存檔
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
"""
from __future__ import annotations
import base64
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING
import httpx
import structlog
from src.core.config import get_settings
if TYPE_CHECKING:
pass
logger = structlog.get_logger(__name__)
settings = get_settings()
_MODEL = "llava:latest"  # Ollama model tag used for image analysis
_TIMEOUT_S = 120.0  # default total timeout (seconds) of the shared HTTP client
_MAX_SIZE_BYTES = 5 * 1024 * 1024  # 5MB
_ALLOWED_MIME = {"image/jpeg", "image/png", "image/webp", "image/gif"}
_RATE_LIMIT = 3  # at most 3 analyses per chat_id per window
_RATE_WINDOW = 60  # window length in seconds


class ImageAnalysisService:
    """Analyse Telegram photos with the llava model served through Ollama."""

    def __init__(self) -> None:
        # Created lazily by _get_http() so construction performs no I/O.
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, (re)creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(timeout=httpx.Timeout(_TIMEOUT_S, connect=10.0))
        return self._http

    async def analyze(
        self,
        chat_id: str,
        file_id: str,
        file_bytes: bytes,
        mime_type: str = "image/jpeg",
        question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題",
    ) -> str | None:
        """Validate an image and ask llava about it.

        Size and MIME type are checked before the rate limiter so that a
        rejected image does not consume one of the chat's rate-limit slots.

        Args:
            chat_id: Chat the image came from; used as the rate-limit key.
            file_id: Telegram file id; only its first 8 characters are logged.
            file_bytes: Raw image content.
            mime_type: Declared MIME type, checked against ``_ALLOWED_MIME``.
            question: Prompt sent to the model together with the image.

        Returns:
            A user-facing warning string when the image is rejected, otherwise
            whatever ``_call_llava`` returns (the answer, a timeout notice, or
            ``None``).
        """
        if len(file_bytes) > _MAX_SIZE_BYTES:
            return f"⚠️ 圖片過大({len(file_bytes)//1024}KB,限制 5MB)"
        if mime_type not in _ALLOWED_MIME:
            return f"⚠️ 不支援的圖片格式:{mime_type}"
        if not await self._check_rate_limit(chat_id):
            return "⚠️ 圖片分析頻率超限(每分鐘最多 3 次),請稍後再試"

        import tempfile  # function-scope import: only this method needs it

        # mkstemp creates the file exclusively under an unpredictable name. The
        # Telegram-supplied file_id is deliberately kept out of the path.
        fd, tmp_name = tempfile.mkstemp(prefix="tg_photo_", suffix=".jpg")
        tmp_path = Path(tmp_name)
        try:
            with os.fdopen(fd, "wb") as fh:
                fh.write(file_bytes)
            result = await self._call_llava(tmp_path, question)
            if result:
                logger.info("image_analysis_done", chat_id=chat_id, file_id=file_id[:8])
            return result
        finally:
            # Best-effort cleanup: a leftover temp file must not mask the result.
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError as e:
                logger.debug("image_analysis_tmp_cleanup_failed", error=str(e))

    async def _check_rate_limit(self, chat_id: str) -> bool:
        """Return True when ``chat_id`` is still under the per-window limit.

        Uses a Redis sorted set per chat: entries older than the window are
        removed, the remaining ones counted, and the current attempt recorded
        (also when it ends up rejected). Fails open: if Redis is unavailable or
        any step raises, the request is allowed.
        """
        try:
            from src.core.redis_client import get_redis

            redis = await get_redis()
            if redis is None:
                return True  # no Redis client -> allow

            key = f"img_ratelimit:{chat_id}"
            now = time.time()
            # The member must be unique per attempt; a whole-second timestamp
            # would merge several requests made within the same second.
            member = f"{time.time_ns()}-{os.getpid()}"

            pipe = redis.pipeline()
            pipe.zremrangebyscore(key, 0, now - _RATE_WINDOW)
            pipe.zcard(key)
            pipe.zadd(key, {member: now})
            pipe.expire(key, _RATE_WINDOW * 2)
            results = await pipe.execute()
            return results[1] < _RATE_LIMIT
        except Exception as e:
            logger.warning("image_ratelimit_check_failed", error=str(e))
            return True  # fail open

    async def _call_llava(self, image_path: Path, question: str) -> str | None:
        """POST the base64-encoded image to Ollama ``/api/generate``.

        Returns:
            The stripped ``response`` text, a user-facing timeout notice when
            the HTTP call times out, or ``None`` for an empty answer, a non-200
            status, or any other error.
        """
        try:
            image_b64 = base64.b64encode(image_path.read_bytes()).decode()
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _MODEL,
                    "prompt": question,
                    "images": [image_b64],
                    "stream": False,
                    "options": {
                        "num_predict": 512,
                        "temperature": 0.3,
                    },
                },
            )
            if resp.status_code != 200:
                logger.warning("image_analysis_ollama_error", status=resp.status_code)
                return None
            return resp.json().get("response", "").strip() or None
        except httpx.TimeoutException:
            logger.warning("image_analysis_timeout", path=str(image_path))
            return "⚠️ 圖片分析超時(llava 處理中),請稍後重試"
        except Exception as e:
            logger.error("image_analysis_failed", error=str(e))
            return None

    async def download_and_analyze(
        self,
        chat_id: str,
        file_id: str,
        question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題",
    ) -> None:
        """Download a Telegram photo, analyse it, and send the result.

        Intended for the Telegram webhook handler. Nothing is sent when the
        download fails or the analysis yields nothing. All errors are logged
        and swallowed.
        """
        import html as _html

        try:
            file_bytes = await self._download_telegram_file(file_id)
            if not file_bytes:
                return
            result = await self.analyze(
                chat_id=chat_id,
                file_id=file_id,
                file_bytes=file_bytes,
                question=question,
            )
            if result:
                from src.services.telegram_gateway import get_telegram_gateway

                tg = get_telegram_gateway()
                await tg.initialize()
                # The message carries HTML markup, so model output is escaped
                # to keep stray "<" / "&" from breaking or injecting markup.
                await tg.send_text(f"🖼️ <b>圖片分析</b>\n{_html.escape(result)}")
        except Exception as e:
            logger.warning("download_and_analyze_failed", error=str(e))

    async def _download_telegram_file(self, file_id: str) -> bytes | None:
        """Download a file through the Telegram Bot API.

        Step 1 resolves ``file_id`` to a ``file_path`` via ``getFile``; step 2
        fetches the content. Returns ``None`` on any non-200 status, missing
        ``file_path``, or exception.
        """
        try:
            from src.core.config import get_settings

            cfg = get_settings()
            bot_token = cfg.TELEGRAM_BOT_TOKEN
            http = await self._get_http()

            # Step 1: getFile -> file_path
            get_file_resp = await http.get(
                f"https://api.telegram.org/bot{bot_token}/getFile",
                params={"file_id": file_id},
                timeout=httpx.Timeout(15.0, connect=5.0),
            )
            if get_file_resp.status_code != 200:
                logger.warning("tg_getfile_failed", status=get_file_resp.status_code)
                return None
            file_path = get_file_resp.json().get("result", {}).get("file_path")
            if not file_path:
                return None

            # Step 2: download the file content
            dl_resp = await http.get(
                f"https://api.telegram.org/file/bot{bot_token}/{file_path}",
                timeout=httpx.Timeout(30.0, connect=5.0),
            )
            if dl_resp.status_code == 200:
                return dl_resp.content
            logger.warning("tg_download_http_error", status=dl_resp.status_code)
        except Exception as e:
            logger.warning("tg_download_failed", file_id=file_id[:8], error=str(e))
        return None

    async def close(self) -> None:
        """Close the shared HTTP client if one is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()


_instance: ImageAnalysisService | None = None


def get_image_analysis_service() -> ImageAnalysisService:
    """Return the module-wide service instance, creating it on first use."""
    global _instance
    if _instance is None:
        _instance = ImageAnalysisService()
    return _instance


def set_image_analysis_service(svc: ImageAnalysisService) -> None:
    """Replace the module-wide service instance."""
    global _instance
    _instance = svc

View File

@@ -0,0 +1,235 @@
"""
AWOOOI — Knowledge RAG Service (Phase 33, ADR-067)
==================================================
本地 RAG 知識庫nomic-embed-text 768維向量 + pgvector
雙軌查詢:
- Redis KNN 熱快取 (TTL 30d)
- pgvector 冷儲存 (永久)
索引策略:
- 初期 < 100 筆: 線性搜尋
- 超過 100 筆: 執行 CREATE INDEX ivfflat (手動觸發)
向量模型: nomic-embed-text (Ollama 188, 768維)
生成模型: qwen2.5:7b-instruct (Ollama 111)
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
"""
from __future__ import annotations
import json
from typing import Any
import httpx
import structlog
from src.core.config import get_settings
logger = structlog.get_logger(__name__)
settings = get_settings()
# nomic-embed-text is served from host 188 (original note: ample memory there)
_EMBED_URL = "http://192.168.0.188:11434"
_EMBED_MODEL = "nomic-embed-text"
# Answers are generated on the Ollama instance at settings.OLLAMA_URL
_GEN_MODEL = "qwen2.5:7b-instruct"
_TOP_K = 5  # number of most similar chunks retrieved per query
_CACHE_TTL = 30 * 86400  # 30 days; NOTE(review): not referenced by the code below


class KnowledgeRAGService:
    """Retrieval-augmented question answering over the ``rag_chunks`` table.

    Text is embedded via Ollama, stored and searched in PostgreSQL through
    pgvector, and answers are generated by a second Ollama model. This class
    itself does not touch Redis.
    """

    def __init__(self) -> None:
        # Created lazily by _get_http() so construction performs no I/O.
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, (re)creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0))
        return self._http

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def query(self, question: str, top_k: int = _TOP_K) -> str:
        """Answer ``question`` from the knowledge base.

        Pipeline: embed the question, fetch the ``top_k`` nearest chunks, then
        generate an answer from them.

        Returns:
            The generated answer, or a user-facing notice when embedding fails,
            no chunks are found, or generation fails.
        """
        embedding = await self._embed(question)
        if embedding is None:
            return "⚠️ 無法生成向量RAG 查詢失敗"
        chunks = await self._search_pgvector(embedding, top_k)
        if not chunks:
            return "📭 知識庫尚無相關資料,請先執行索引建立"
        context = "\n\n---\n\n".join(
            f"[{c.get('source','?')}] {c.get('title','')}\n{c.get('chunk_text','')}"
            for c in chunks
        )
        return await self._generate_answer(question, context)

    async def index_document(
        self,
        source: str,
        source_id: str,
        title: str,
        text: str,
        metadata: dict | None = None,
    ) -> bool:
        """Split a document into chunks, embed each, and store them.

        Chunks are 500 characters long with a 100-character overlap. Chunks
        whose embedding or insert fails are skipped.

        Returns:
            True when at least one chunk was stored.
        """
        stored = 0
        for chunk in self._chunk_text(text, chunk_size=500, overlap=100):
            emb = await self._embed(chunk)
            if not emb:
                continue
            if await self._save_chunk(source, source_id, title, chunk, emb, metadata or {}):
                stored += 1
        logger.info("rag_indexed", source=source, source_id=source_id, chunks=stored)
        return stored > 0

    # ------------------------------------------------------------------
    # Embedding
    # ------------------------------------------------------------------
    async def _embed(self, text: str) -> list[float] | None:
        """Return the embedding of ``text`` from Ollama, or None on failure."""
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{_EMBED_URL}/api/embeddings",
                json={"model": _EMBED_MODEL, "prompt": text},
            )
            if resp.status_code == 200:
                return resp.json().get("embedding")
            logger.warning("rag_embed_http_error", status=resp.status_code)
        except Exception as e:
            logger.warning("rag_embed_failed", error=str(e))
        return None

    # ------------------------------------------------------------------
    # pgvector storage and search
    # ------------------------------------------------------------------
    @staticmethod
    def _to_vector_literal(embedding: list[float]) -> str:
        """Render an embedding as the ``[v1,v2,...]`` text cast to ``vector``."""
        return "[" + ",".join(str(v) for v in embedding) + "]"

    async def _search_pgvector(
        self, embedding: list[float], top_k: int
    ) -> list[dict[str, Any]]:
        """Return up to ``top_k`` chunks ordered by the ``<=>`` distance.

        Each row carries source, source_id, title, chunk_text and a
        ``similarity`` computed as ``1 - distance``. Rows without an embedding
        are excluded. Returns an empty list on any error.
        """
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            vec_str = self._to_vector_literal(embedding)
            async with get_db_context() as db:
                rows = await db.execute(
                    text(
                        """
                        SELECT source, source_id, title, chunk_text,
                               1 - (embedding <=> CAST(:vec AS vector)) AS similarity
                        FROM rag_chunks
                        WHERE embedding IS NOT NULL
                        ORDER BY embedding <=> CAST(:vec AS vector)
                        LIMIT :k
                        """
                    ),
                    {"vec": vec_str, "k": top_k},
                )
                return [dict(r._mapping) for r in rows]
        except Exception as e:
            logger.error("rag_pgvector_search_failed", error=str(e))
            return []

    async def _save_chunk(
        self,
        source: str, source_id: str, title: str,
        chunk_text: str, embedding: list[float],
        metadata: dict,
    ) -> bool:
        """Insert one chunk row; return True on success, False on any error.

        NOTE(review): no explicit commit is issued here; this presumably relies
        on ``get_db_context`` committing on exit. Confirm against that helper.
        """
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            async with get_db_context() as db:
                await db.execute(
                    text(
                        """
                        INSERT INTO rag_chunks (source, source_id, title, chunk_text, embedding, metadata)
                        VALUES (:source, :source_id, :title, :chunk_text, CAST(:embedding AS vector), CAST(:metadata AS jsonb))
                        """
                    ),
                    {
                        "source": source, "source_id": source_id, "title": title,
                        "chunk_text": chunk_text,
                        "embedding": self._to_vector_literal(embedding),
                        "metadata": json.dumps(metadata, ensure_ascii=False),
                    },
                )
            return True
        except Exception as e:
            logger.error("rag_save_chunk_failed", error=str(e))
            return False

    # ------------------------------------------------------------------
    # Answer generation
    # ------------------------------------------------------------------
    async def _generate_answer(self, question: str, context: str) -> str:
        """Ask the generation model to answer ``question`` from ``context``.

        Only the first 6000 characters of ``context`` are sent. Returns the
        stripped model response, or a user-facing failure notice when the call
        errors, returns a non-200 status, or produces an empty answer.
        """
        prompt = (
            "你是 AWOOOI AIOps 知識庫助手,請用繁體中文根據以下資料回答問題。\n"
            "如果資料不足以回答,明確說「資料不足」,不要猜測。\n\n"
            f"=== 相關資料 ===\n{context[:6000]}\n\n"
            f"=== 問題 ===\n{question}"
        )
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _GEN_MODEL,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": 512, "temperature": 0.2},
                },
                timeout=httpx.Timeout(90.0, connect=10.0),
            )
            if resp.status_code == 200:
                answer = resp.json().get("response", "").strip()
                if answer:
                    return answer
                logger.warning("rag_generate_empty")
            else:
                logger.warning("rag_generate_http_error", status=resp.status_code)
        except Exception as e:
            logger.error("rag_generate_failed", error=str(e))
        return "⚠️ RAG 生成失敗,請稍後再試"

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------
    @staticmethod
    def _chunk_text(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
        """Split ``text`` into character windows that overlap by ``overlap``.

        Windows start every ``chunk_size - overlap`` characters. Splitting
        stops as soon as a window reaches the end of the text, so no trailing
        chunk is produced that is entirely contained in the previous one.
        Whitespace-only windows are dropped.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
                in ``[0, chunk_size)`` (the window would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        step = chunk_size - overlap
        chunks: list[str] = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            piece = text[start:end]
            if piece.strip():
                chunks.append(piece)
            if end >= len(text):
                break
            start += step
        return chunks

    async def close(self) -> None:
        """Close the shared HTTP client if one is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()


_instance: KnowledgeRAGService | None = None


def get_knowledge_rag_service() -> KnowledgeRAGService:
    """Return the module-wide service instance, creating it on first use."""
    global _instance
    if _instance is None:
        _instance = KnowledgeRAGService()
    return _instance


def set_knowledge_rag_service(svc: KnowledgeRAGService) -> None:
    """Replace the module-wide service instance."""
    global _instance
    _instance = svc

View File

@@ -0,0 +1,200 @@
"""
AWOOOI — Local Code Review Service (Phase 32, ADR-067)
======================================================
使用 qwen2.5-coder:7b 對 Gitea PR diff 進行自動審查
Fallback: Gemini (diff > 50KB 或 Ollama 超時)
觸發: Gitea webhook push event (PR opened/updated)
雙寫: Redis TTL 7d + PostgreSQL pr_reviews 表永久儲存
並發: semaphore 最多 2 個同時審查
2026-04-10 Claude Sonnet 4.6 Asia/Taipei
"""
from __future__ import annotations
import asyncio
from typing import Any
import httpx
import structlog
from src.core.config import get_settings
logger = structlog.get_logger(__name__)
settings = get_settings()
_MODEL_OLLAMA = "qwen2.5-coder:7b"
_TIMEOUT_OLLAMA = 120.0  # total timeout (seconds) of the shared HTTP client
_MAX_DIFF_BYTES = 50 * 1024  # diffs above 50KB go straight to Gemini
_SEMAPHORE = asyncio.Semaphore(2)  # at most 2 reviews run concurrently
# Redis cache TTL: 7 days
_CACHE_TTL = 7 * 86400


class LocalCodeReviewService:
    """Automated PR review using a local Ollama model with a Gemini fallback."""

    def __init__(self) -> None:
        # Created lazily by _get_http() so construction performs no I/O.
        self._http: httpx.AsyncClient | None = None

    async def _get_http(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, (re)creating it if missing or closed."""
        if self._http is None or self._http.is_closed:
            self._http = httpx.AsyncClient(
                timeout=httpx.Timeout(_TIMEOUT_OLLAMA, connect=10.0)
            )
        return self._http

    @staticmethod
    def _count_issues(review_text: str) -> int:
        """Count issue markers in a review.

        Counts the base warning sign (U+26A0) so that both the emoji form
        (followed by U+FE0F) and the plain text form are recognised.
        """
        return review_text.count("\u26a0")

    async def review_pr(
        self,
        pr_id: str,
        repo: str,
        title: str,
        diff: str,
    ) -> dict[str, Any] | None:
        """Review a PR diff and return the review as a dict.

        A cached review is reused only when it was produced for the same diff
        (matched by SHA-256), so an updated PR is reviewed again instead of
        receiving the previous result. Diffs larger than ``_MAX_DIFF_BYTES``
        skip Ollama; a failed Ollama review falls back to Gemini. A fresh
        result is written to Redis (when available) and to the database.

        Returns:
            A dict with ``review_text``, ``issues_count``, ``model``,
            ``provider``, ``diff_size_bytes`` and ``diff_sha256``, or ``None``
            when every provider failed.
        """
        import hashlib
        import json

        cache_key = f"pr_review:{repo}:{pr_id}"
        diff_bytes = diff.encode()
        diff_size = len(diff_bytes)
        diff_sha = hashlib.sha256(diff_bytes).hexdigest()

        async with _SEMAPHORE:
            # Cache lookup; any cache problem disables caching for this call.
            redis = None
            try:
                from src.core.redis_client import get_redis

                redis = await get_redis()
                if redis:
                    cached = await redis.get(cache_key)
                    if cached:
                        cached_result = json.loads(cached)
                        if (
                            isinstance(cached_result, dict)
                            and cached_result.get("diff_sha256") == diff_sha
                        ):
                            logger.info("pr_review_cache_hit", pr_id=pr_id)
                            return cached_result
            except Exception as e:
                logger.warning("pr_review_cache_read_failed", pr_id=pr_id, error=str(e))
                redis = None

            result = None
            if diff_size <= _MAX_DIFF_BYTES:
                result = await self._review_with_ollama(pr_id, repo, title, diff)
            if result is None:
                # Diff too large for Ollama, or Ollama failed -> Gemini
                result = await self._review_with_gemini(pr_id, repo, title, diff)
            if result is None:
                return None

            result["diff_size_bytes"] = diff_size
            result["diff_sha256"] = diff_sha

            if redis:
                try:
                    await redis.set(
                        cache_key, json.dumps(result, ensure_ascii=False), ex=_CACHE_TTL
                    )
                except Exception as e:
                    logger.warning("pr_review_cache_write_failed", pr_id=pr_id, error=str(e))

            await self._save_to_db(pr_id, repo, title, diff_size, result)
            return result

    async def _review_with_ollama(
        self, pr_id: str, repo: str, title: str, diff: str
    ) -> dict[str, Any] | None:
        """Review with the local Ollama model.

        Only the first 40000 characters of the diff are sent. Returns ``None``
        on timeout, error, non-200 status, or an empty response, so the caller
        can fall back to Gemini.
        """
        prompt = (
            f"你是資深程式審查員,請用繁體中文審查以下 Pull Request。\n"
            f"PR: {repo}#{pr_id}: {title}\n\n"
            "請找出:1) 潛在 Bug 或邏輯錯誤 2) 安全問題 3) 效能問題 4) 代碼風格問題\n"
            "格式:每個問題獨立一行,以「⚠️」開頭。如果沒有問題,說「✅ 程式碼品質良好」\n\n"
            f"=== Diff ===\n{diff[:40000]}\n=== 結束 ==="
        )
        try:
            http = await self._get_http()
            resp = await http.post(
                f"{settings.OLLAMA_URL}/api/generate",
                json={
                    "model": _MODEL_OLLAMA,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": 1024, "temperature": 0.1},
                },
            )
            if resp.status_code != 200:
                logger.warning("pr_review_ollama_http_error", pr_id=pr_id, status=resp.status_code)
                return None
            text = resp.json().get("response", "").strip()
            if not text:
                # An empty review is a failure, not a clean bill of health.
                logger.warning("pr_review_ollama_empty", pr_id=pr_id)
                return None
            issues = self._count_issues(text)
            logger.info("pr_review_ollama_done", pr_id=pr_id, issues=issues)
            return {
                "review_text": text,
                "issues_count": issues,
                "model": _MODEL_OLLAMA,
                "provider": "ollama",
            }
        except httpx.TimeoutException:
            logger.warning("pr_review_ollama_timeout", pr_id=pr_id)
        except Exception as e:
            logger.error("pr_review_ollama_failed", pr_id=pr_id, error=str(e))
        return None

    async def _review_with_gemini(
        self, pr_id: str, repo: str, title: str, diff: str
    ) -> dict[str, Any] | None:
        """Review through the OpenClaw Gemini call.

        Only the first 60000 characters of the diff are sent. The first element
        of the value returned by ``_call_gemini`` is used as the review text.
        Returns ``None`` when that is empty or the call raises.
        """
        try:
            from src.services.openclaw import get_openclaw

            openclaw = get_openclaw()
            prompt = (
                f"PR Code Review: {repo}#{pr_id}: {title}\n"
                "繁體中文,找出 Bug/安全/效能/風格問題,每問題以⚠️開頭\n\n"
                f"Diff:\n{diff[:60000]}"
            )
            # NOTE(review): this calls a private method of the OpenClaw service.
            result = await openclaw._call_gemini(prompt)
            if result and result[0]:
                text = result[0]
                return {
                    "review_text": text,
                    "issues_count": self._count_issues(text),
                    "model": "gemini",
                    "provider": "gemini",
                }
        except Exception as e:
            logger.error("pr_review_gemini_failed", pr_id=pr_id, error=str(e))
        return None

    async def _save_to_db(
        self, pr_id: str, repo: str, title: str, diff_size: int, result: dict
    ) -> None:
        """Insert the review into ``pr_reviews``; failures are logged only.

        NOTE(review): no explicit commit is issued here; this presumably relies
        on ``get_db_context`` committing on exit. Confirm against that helper.
        """
        try:
            from src.db.base import get_db_context
            from sqlalchemy import text

            async with get_db_context() as db:
                await db.execute(
                    text(
                        """
                        INSERT INTO pr_reviews
                        (pr_id, repo, title, diff_size_bytes, model, provider, review_text, issues_count)
                        VALUES
                        (:pr_id, :repo, :title, :diff_size, :model, :provider, :review_text, :issues_count)
                        """
                    ),
                    {
                        "pr_id": pr_id, "repo": repo, "title": title,
                        "diff_size": diff_size,
                        "model": result.get("model", "unknown"),
                        "provider": result.get("provider", "unknown"),
                        "review_text": result.get("review_text", ""),
                        "issues_count": result.get("issues_count", 0),
                    },
                )
        except Exception as e:
            logger.warning("pr_review_db_save_failed", error=str(e))

    async def close(self) -> None:
        """Close the shared HTTP client if one is open."""
        if self._http and not self._http.is_closed:
            await self._http.aclose()


_instance: LocalCodeReviewService | None = None


def get_local_code_review_service() -> LocalCodeReviewService:
    """Return the module-wide service instance, creating it on first use."""
    global _instance
    if _instance is None:
        _instance = LocalCodeReviewService()
    return _instance


def set_local_code_review_service(svc: LocalCodeReviewService) -> None:
    """Replace the module-wide service instance."""
    global _instance
    _instance = svc

View File

@@ -0,0 +1,244 @@
"""
Log Summary Service - Phase 31
================================
職責:擷取 K8s Pod log用 deepseek-r1:14b 生成繁中異常摘要
設計邊界:
- 只輸出摘要文字,不做 RCA、不生成修復指令
- 5s 軟超時:超過則回傳 None主流程不阻塞
- LLM 硬超時 180sdeepseek-r1:14b 慢但準)
- Redis 快取log_summary:{pod}:{date} TTL 24h同 pod 同天不重複呼叫
- 敏感資料遮蔽Bearer token、密碼 regex → [REDACTED]
- 複用 K8sDiagnosticsService 取得 logs
版本: v1.0
建立: 2026-04-10 (台北時區)
建立者: Claude Code (Phase 31 ADR-067)
"""
from __future__ import annotations
import asyncio
import re
from datetime import UTC, datetime
import httpx
import structlog
from src.core.redis_client import get_redis
logger = structlog.get_logger(__name__)
# ============================================================
# 設定
# ============================================================
OLLAMA_URL = "http://192.168.0.111:11434"
SUMMARY_MODEL = "deepseek-r1:14b"
LLM_TIMEOUT = 180.0 # deepseek-r1 硬超時
SOFT_TIMEOUT = 5.0 # 主流程軟超時(超過回 None
LOG_TAIL_LINES = 100
ANOMALY_TAIL_LINES = 50 # 只取最後 50 行異常行送 LLM
CACHE_TTL = 86400 # 24 小時
CACHE_PREFIX = "log_summary:"
# 異常關鍵字 pattern
_ANOMALY_PATTERN = re.compile(
r"(ERROR|FATAL|Exception|Traceback|OOMKilled|panic|CRITICAL|WARN|WARNING)",
re.IGNORECASE,
)
# 敏感資料遮蔽 pattern
_SENSITIVE_PATTERNS = [
(re.compile(r"Bearer\s+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "Bearer [REDACTED]"),
(re.compile(r"password[=:\s]+\S+", re.IGNORECASE), "password=[REDACTED]"),
(re.compile(r"token[=:\s]+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "token=[REDACTED]"),
(re.compile(r"secret[=:\s]+\S+", re.IGNORECASE), "secret=[REDACTED]"),
]
# ============================================================
# Prompt
# ============================================================
_SUMMARY_PROMPT = """你是 AWOOOI SRE 維運助理,請分析以下 K8s Pod log 片段。
## Pod: {pod_name} (namespace: {namespace})
## Log 異常片段(最近 {line_count} 行):
{log_snippet}
## 任務
用繁體中文3 行以內,說明:
1. 異常的主要原因是什麼
2. 影響程度(例如 OOM Killed、連線失敗、程式錯誤
3. 建議立即查看哪個方向
只輸出摘要文字,不要標題或 markdown 格式。
"""
class LogSummaryService:
"""
Pod Log 異常摘要服務
職責邊界:
✅ 擷取 Pod log複用 K8sDiagnosticsService
✅ 過濾異常行 + 敏感資料遮蔽
✅ deepseek-r1:14b 生成繁中摘要180s timeout
✅ Redis 快取24h TTL
❌ 不做 RCA
❌ 不生成修復指令
❌ 不阻塞告警主流程(軟超時 5s
"""
async def summarize(
self,
pod_name: str,
namespace: str = "awoooi-prod",
) -> str | None:
"""
取得 Pod log 異常摘要
Returns:
str: 繁中摘要文字3 行內)
None: 快取命中、無異常、或超時
"""
cache_key = self._cache_key(pod_name)
redis = await get_redis()
cached = await redis.get(cache_key)
if cached:
logger.debug("log_summary_cache_hit", pod=pod_name)
return cached.decode() if isinstance(cached, bytes) else cached
raw_logs = await self._fetch_logs(pod_name, namespace)
if not raw_logs:
return None
anomaly_lines = self._extract_anomaly_lines(raw_logs)
if not anomaly_lines:
logger.debug("log_summary_no_anomaly", pod=pod_name)
return None
summary = await self._call_llm(pod_name, namespace, anomaly_lines)
if summary:
await redis.set(cache_key, summary, ex=CACHE_TTL)
logger.info("log_summary_generated", pod=pod_name, lines=len(anomaly_lines))
return summary
async def summarize_with_soft_timeout(
self,
pod_name: str,
namespace: str = "awoooi-prod",
) -> str | None:
"""
帶 5s 軟超時的摘要取得
用於告警主流程:超過軟超時回傳 None不阻塞 Telegram 推送
LLM 繼續在背景跑,結果寫入 Redis 快取供下次使用
"""
try:
return await asyncio.wait_for(
self.summarize(pod_name, namespace),
timeout=SOFT_TIMEOUT,
)
except asyncio.TimeoutError:
logger.info(
"log_summary_soft_timeout",
pod=pod_name,
soft_timeout=SOFT_TIMEOUT,
)
# 繼續在背景跑,結果存 Redis 備用
asyncio.create_task(self.summarize(pod_name, namespace))
return None
# --------------------------------------------------------
# Private helpers
# --------------------------------------------------------
def _cache_key(self, pod_name: str) -> str:
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
return f"{CACHE_PREFIX}{pod_name}:{date_str}"
async def _fetch_logs(self, pod_name: str, namespace: str) -> str | None:
"""複用 K8sDiagnosticsService 取得 logs"""
try:
from src.services.k8s_diagnostics import K8sDiagnosticsService
svc = K8sDiagnosticsService(default_namespace=namespace)
diag = await svc.collect_diagnostics(
pod_name=pod_name,
namespace=namespace,
log_tail_lines=LOG_TAIL_LINES,
include_previous_logs=False,
)
return diag.logs or None
except Exception as e:
logger.warning("log_summary_fetch_failed", pod=pod_name, error=str(e))
return None
def _extract_anomaly_lines(self, raw_logs: str) -> list[str]:
"""過濾異常行 + 敏感資料遮蔽,取最後 ANOMALY_TAIL_LINES 行"""
lines = raw_logs.splitlines()
anomaly = [l for l in lines if _ANOMALY_PATTERN.search(l)]
# 取最後 N 行
anomaly = anomaly[-ANOMALY_TAIL_LINES:]
# 遮蔽敏感資料
result = []
for line in anomaly:
for pattern, replacement in _SENSITIVE_PATTERNS:
line = pattern.sub(replacement, line)
result.append(line)
return result
async def _call_llm(
self,
pod_name: str,
namespace: str,
anomaly_lines: list[str],
) -> str | None:
"""呼叫 deepseek-r1:14b 生成摘要"""
log_snippet = "\n".join(anomaly_lines)
prompt = _SUMMARY_PROMPT.format(
pod_name=pod_name,
namespace=namespace,
line_count=len(anomaly_lines),
log_snippet=log_snippet[:3000], # 避免超出 context
)
try:
async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": SUMMARY_MODEL,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 200},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("response", "").strip()
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
return text or raw or None
except httpx.TimeoutException:
logger.warning("log_summary_llm_timeout", model=SUMMARY_MODEL)
return None
except Exception as e:
logger.warning("log_summary_llm_error", error=str(e))
return None
# ============================================================
# Singleton
# ============================================================
_log_summary_service: LogSummaryService | None = None
def get_log_summary_service() -> LogSummaryService:
global _log_summary_service
if _log_summary_service is None:
_log_summary_service = LogSummaryService()
return _log_summary_service