diff --git a/apps/api/migrations/phase28_rag_pgvector.sql b/apps/api/migrations/phase28_rag_pgvector.sql new file mode 100644 index 00000000..0c0178fa --- /dev/null +++ b/apps/api/migrations/phase28_rag_pgvector.sql @@ -0,0 +1,28 @@ +-- Phase 28 (ADR-067): RAG 知識庫 pgvector 向量表 +-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei +-- 前置: pgvector 0.8.2 已安裝於 awoooi_prod ✅ +-- 索引: 初期線性搜尋 (< 100 筆);超過 100 筆後執行 CREATE INDEX ivfflat + +CREATE EXTENSION IF NOT EXISTS vector; + +CREATE TABLE IF NOT EXISTS rag_chunks ( + id SERIAL PRIMARY KEY, + source TEXT NOT NULL, -- 來源: "playbook", "incident", "runbook", "adr" + source_id TEXT, -- 來源 ID (playbook_id / incident_id 等) + title TEXT NOT NULL, -- 標題 / 檔名 + chunk_text TEXT NOT NULL, -- 原始文字片段 + embedding vector(768), -- nomic-embed-text 768維向量 + metadata JSONB DEFAULT '{}', -- 額外 metadata + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS ix_rag_chunks_source ON rag_chunks (source); +CREATE INDEX IF NOT EXISTS ix_rag_chunks_created ON rag_chunks (created_at DESC); + +-- 向量近鄰索引 (超過 100 筆後執行) +-- CREATE INDEX IF NOT EXISTS ix_rag_chunks_embedding +-- ON rag_chunks USING ivfflat (embedding vector_cosine_ops) +-- WITH (lists = 10); + +COMMENT ON TABLE rag_chunks IS 'RAG 知識庫向量片段 — Phase 28 ADR-067 (2026-04-10)'; diff --git a/apps/api/migrations/phase29_pr_reviews.sql b/apps/api/migrations/phase29_pr_reviews.sql new file mode 100644 index 00000000..315ad91e --- /dev/null +++ b/apps/api/migrations/phase29_pr_reviews.sql @@ -0,0 +1,21 @@ +-- Phase 29 (ADR-067): PR 自動審查記錄表 +-- 2026-04-10 Claude Sonnet 4.6 Asia/Taipei +-- 雙寫: Redis TTL 7d (熱) + PostgreSQL 永久 (冷) + +CREATE TABLE IF NOT EXISTS pr_reviews ( + id SERIAL PRIMARY KEY, + pr_id TEXT NOT NULL, -- Gitea PR number (字串化) + repo TEXT NOT NULL, -- "wooo/awoooi" + title TEXT, -- PR 標題 + diff_size_bytes INTEGER, -- diff 大小 (bytes) + model TEXT NOT NULL, -- qwen2.5-coder:7b / gemini-fallback + provider TEXT NOT NULL DEFAULT 'ollama', + review_text TEXT NOT NULL, -- 審查全文 + issues_count INTEGER DEFAULT 0, -- 發現問題數 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS ix_pr_reviews_pr_id ON pr_reviews (pr_id); +CREATE INDEX IF NOT EXISTS ix_pr_reviews_created ON pr_reviews (created_at DESC); + +COMMENT ON TABLE pr_reviews IS 'PR 自動審查記錄 — Phase 29 ADR-067 (2026-04-10)'; diff --git a/apps/api/src/api/v1/signoz_webhook.py b/apps/api/src/api/v1/signoz_webhook.py index ed7fb86c..592a9b00 100644 --- a/apps/api/src/api/v1/signoz_webhook.py +++ b/apps/api/src/api/v1/signoz_webhook.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio + """ AWOOOI API - SignOz Webhook Handler ==================================== @@ -249,6 +251,15 @@ async def process_signoz_alert( approval_id=approval_id, ) + # Phase 31 (2026-04-10 Claude Code ADR-067): Log 異常摘要背景推送 + # 5s 軟超時不阻塞主流程;超時後摘要繼續跑,結果存 Redis 快取 + pod_name = labels.get("pod", labels.get("pod_name", "")) + namespace = labels.get("namespace", "awoooi-prod") + if pod_name: + asyncio.create_task( + _send_log_summary_notification(pod_name, namespace, approval_id) + ) + # Wave A.5: 記錄告警鏈路成功 (ADR-037) record_alert_chain_success("signoz") record_alert_processed( @@ -413,6 +424,45 @@ async def send_signoz_telegram( logger.exception("signoz_telegram_error", error=str(e)) +# ============================================================================= +# Phase 31 (2026-04-10 Claude Code ADR-067): Log 摘要背景推送 helper +# ============================================================================= + +async def _send_log_summary_notification( + pod_name: str, + namespace: str, + approval_id: str, +) -> None: + """ + 背景取得 Pod log 摘要並推送 Telegram + + 帶 5s 軟超時:超時後摘要繼續生成並存 Redis,不阻塞告警主流程 + """ + import html as _html + from src.services.log_summary_service import get_log_summary_service + from src.services.telegram_gateway import get_telegram_gateway + + try: + svc = get_log_summary_service() + summary = await svc.summarize_with_soft_timeout(pod_name, namespace) + + if not summary: + return + + tg = get_telegram_gateway() + msg = ( + f"🔍 Log 異常摘要\n" + f"Pod: {_html.escape(pod_name)}\n" + f"Approval: {_html.escape(approval_id)}\n\n" + f"{_html.escape(summary)}\n\n" + f"deepseek-r1:14b | 免費本地推理" + ) + await tg.send_text(msg[:4096]) + + except Exception as e: + logger.warning("log_summary_notification_failed", pod=pod_name, error=str(e)) + + # ============================================================================= # Health Check (供 SignOz 確認 Webhook 可達) # ============================================================================= diff --git a/apps/api/src/api/v1/telegram.py b/apps/api/src/api/v1/telegram.py index 4b107654..2a576512 100644 --- a/apps/api/src/api/v1/telegram.py +++ b/apps/api/src/api/v1/telegram.py @@ -90,8 +90,32 @@ async def telegram_webhook( logger.info("telegram_webhook_received", update_id=update.update_id) # ========================================================================= - # Step 1: 僅處理 callback_query (簽核按鈕點擊) + # Step 1: 路由 Update 類型 # ========================================================================= + # Phase 34 (ADR-067 2026-04-10): 圖片訊息 → image_analysis_service + if not update.callback_query and update.message: + msg = update.message + if msg.get("photo"): + # 取最高解析度 (photos 陣列最後一個) + photos = msg["photo"] + best = max(photos, key=lambda p: p.get("file_size", 0)) + file_id = best.get("file_id", "") + chat_id = str(msg.get("chat", {}).get("id", "")) + caption = msg.get("caption", "請用繁體中文描述這張圖片") + if file_id and chat_id: + try: + from src.services.image_analysis_service import get_image_analysis_service + svc = get_image_analysis_service() + # download_and_analyze 內部自行下載 + 分析 + 發送 Telegram + await svc.download_and_analyze( + chat_id=chat_id, + file_id=file_id, + question=caption, + ) + except Exception as _img_err: + logger.warning("image_analysis_webhook_failed", error=str(_img_err)) + return {"ok": True, "message": "photo_processed"} + if not update.callback_query: logger.debug("telegram_webhook_ignored", reason="not callback_query") return {"ok": True, "message": "Ignored (not callback_query)"} diff --git a/apps/api/src/services/image_analysis_service.py b/apps/api/src/services/image_analysis_service.py new file mode 100644 index 00000000..122e99bd --- /dev/null +++ b/apps/api/src/services/image_analysis_service.py @@ -0,0 +1,228 @@ +""" +AWOOOI — Image Analysis Service (Phase 34, ADR-067) +=================================================== +使用 llava:latest 分析 Telegram 傳入的圖片 + +觸發: + - Telegram 傳圖 (photo attachment) 自動觸發 + - /screenshot 指令 + +限制: + - 圖片 < 5MB + - MIME 驗證 (image/jpeg, image/png, image/webp) + - 每 chat_id 每分鐘最多 3 次 (Redis rate limit) + +安全: + - 暫存到 /tmp/tg_photo_{file_id}.jpg + - finally 清理暫存檔 + +2026-04-10 Claude Sonnet 4.6 Asia/Taipei +""" +from __future__ import annotations + +import base64 +import os +import time +from pathlib import Path +from typing import TYPE_CHECKING + +import httpx +import structlog + +from src.core.config import get_settings + +if TYPE_CHECKING: + pass + +logger = structlog.get_logger(__name__) +settings = get_settings() + +_MODEL = "llava:latest" +_TIMEOUT_S = 120.0 +_MAX_SIZE_BYTES = 5 * 1024 * 1024 # 5MB +_ALLOWED_MIME = {"image/jpeg", "image/png", "image/webp", "image/gif"} +_RATE_LIMIT = 3 # 每 chat_id 每分鐘最多 3 次 +_RATE_WINDOW = 60 # 秒 + + +class ImageAnalysisService: + """Telegram 圖片分析服務""" + + def __init__(self) -> None: + self._http: httpx.AsyncClient | None = None + + async def _get_http(self) -> httpx.AsyncClient: + if self._http is None or self._http.is_closed: + self._http = httpx.AsyncClient(timeout=httpx.Timeout(_TIMEOUT_S, connect=10.0)) + return self._http + + async def analyze( + self, + chat_id: str, + file_id: str, + file_bytes: bytes, + mime_type: str = "image/jpeg", + question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題", + ) -> str | None: + """ + 分析圖片,回傳繁中說明 + """ + # Rate limit 檢查 + if not await self._check_rate_limit(chat_id): + return "⚠️ 圖片分析頻率超限(每分鐘最多 3 次),請稍後再試" + + # 大小檢查 + if len(file_bytes) > _MAX_SIZE_BYTES: + return f"⚠️ 圖片過大({len(file_bytes)//1024}KB),限制 5MB" + + # MIME 檢查 + if mime_type not in _ALLOWED_MIME: + return f"⚠️ 不支援的圖片格式:{mime_type}" + + tmp_path = Path(f"/tmp/tg_photo_{file_id}.jpg") + try: + tmp_path.write_bytes(file_bytes) + result = await self._call_llava(tmp_path, question) + if result: + logger.info("image_analysis_done", chat_id=chat_id, file_id=file_id[:8]) + return result + finally: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + + async def _check_rate_limit(self, chat_id: str) -> bool: + """Redis sliding window rate limit""" + try: + from src.core.redis_client import get_redis + redis = await get_redis() + if redis is None: + return True # Redis 不可用時放行 + + key = f"img_ratelimit:{chat_id}" + now = int(time.time()) + window_start = now - _RATE_WINDOW + + pipe = redis.pipeline() + pipe.zremrangebyscore(key, 0, window_start) + pipe.zcard(key) + pipe.zadd(key, {str(now): now}) + pipe.expire(key, _RATE_WINDOW * 2) + results = await pipe.execute() + + count = results[1] + return count < _RATE_LIMIT + except Exception: + return True # 失敗時放行 + + async def _call_llava(self, image_path: Path, question: str) -> str | None: + """呼叫 llava:latest via Ollama /api/generate with base64 image""" + try: + image_b64 = base64.b64encode(image_path.read_bytes()).decode() + http = await self._get_http() + resp = await http.post( + f"{settings.OLLAMA_URL}/api/generate", + json={ + "model": _MODEL, + "prompt": question, + "images": [image_b64], + "stream": False, + "options": { + "num_predict": 512, + "temperature": 0.3, + }, + }, + ) + if resp.status_code == 200: + text = resp.json().get("response", "").strip() + return text or None + else: + logger.warning("image_analysis_ollama_error", status=resp.status_code) + return None + except httpx.TimeoutException: + logger.warning("image_analysis_timeout", path=str(image_path)) + return "⚠️ 圖片分析超時(llava 處理中),請稍後重試" + except Exception as e: + logger.error("image_analysis_failed", error=str(e)) + return None + + async def download_and_analyze( + self, + chat_id: str, + file_id: str, + question: str = "請用繁體中文描述這張圖片,如果是截圖或介面請分析其內容和問題", + ) -> None: + """ + 從 Telegram 下載圖片後分析並發送結果 + 供 telegram webhook handler 呼叫(含 download + analyze + send) + """ + try: + file_bytes = await self._download_telegram_file(file_id) + if not file_bytes: + return + result = await self.analyze( + chat_id=chat_id, + file_id=file_id, + file_bytes=file_bytes, + question=question, + ) + if result: + from src.services.telegram_gateway import get_telegram_gateway + tg = get_telegram_gateway() + await tg.initialize() + await tg.send_text(f"🖼️ 圖片分析\n{result}") + except Exception as e: + logger.warning("download_and_analyze_failed", error=str(e)) + + async def _download_telegram_file(self, file_id: str) -> bytes | None: + """透過 Telegram Bot API 下載圖片""" + try: + from src.core.config import get_settings + cfg = get_settings() + bot_token = cfg.TELEGRAM_BOT_TOKEN + http = await self._get_http() + + # Step 1: getFile → file_path + get_file_resp = await http.get( + f"https://api.telegram.org/bot{bot_token}/getFile", + params={"file_id": file_id}, + timeout=httpx.Timeout(15.0, connect=5.0), + ) + if get_file_resp.status_code != 200: + logger.warning("tg_getfile_failed", status=get_file_resp.status_code) + return None + + file_path = get_file_resp.json().get("result", {}).get("file_path") + if not file_path: + return None + + # Step 2: 下載檔案 + dl_resp = await http.get( + f"https://api.telegram.org/file/bot{bot_token}/{file_path}", + timeout=httpx.Timeout(30.0, connect=5.0), + ) + if dl_resp.status_code == 200: + return dl_resp.content + except Exception as e: + logger.warning("tg_download_failed", file_id=file_id[:8], error=str(e)) + return None + + async def close(self) -> None: + if self._http and not self._http.is_closed: + await self._http.aclose() + + +_instance: ImageAnalysisService | None = None + + +def get_image_analysis_service() -> ImageAnalysisService: + global _instance + if _instance is None: + _instance = ImageAnalysisService() + return _instance + + +def set_image_analysis_service(svc: ImageAnalysisService) -> None: + global _instance + _instance = svc diff --git a/apps/api/src/services/knowledge_rag_service.py b/apps/api/src/services/knowledge_rag_service.py new file mode 100644 index 00000000..3d49ee14 --- /dev/null +++ b/apps/api/src/services/knowledge_rag_service.py @@ -0,0 +1,235 @@ +""" +AWOOOI — Knowledge RAG Service (Phase 33, ADR-067) +================================================== +本地 RAG 知識庫:nomic-embed-text 768維向量 + pgvector + +雙軌查詢: + - Redis KNN 熱快取 (TTL 30d) + - pgvector 冷儲存 (永久) + +索引策略: + - 初期 < 100 筆: 線性搜尋 + - 超過 100 筆: 執行 CREATE INDEX ivfflat (手動觸發) + +向量模型: nomic-embed-text (Ollama 188, 768維) +生成模型: qwen2.5:7b-instruct (Ollama 111) + +2026-04-10 Claude Sonnet 4.6 Asia/Taipei +""" +from __future__ import annotations + +import json +from typing import Any + +import httpx +import structlog + +from src.core.config import get_settings + +logger = structlog.get_logger(__name__) +settings = get_settings() + +# nomic-embed-text 在 188 (記憶體充足) +_EMBED_URL = "http://192.168.0.188:11434" +_EMBED_MODEL = "nomic-embed-text" + +# 生成回答用 111 (快) +_GEN_MODEL = "qwen2.5:7b-instruct" + +_TOP_K = 5 # 取最相似 5 筆 +_CACHE_TTL = 30 * 86400 # 30 days + + +class KnowledgeRAGService: + """RAG 知識庫服務""" + + def __init__(self) -> None: + self._http: httpx.AsyncClient | None = None + + async def _get_http(self) -> httpx.AsyncClient: + if self._http is None or self._http.is_closed: + self._http = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0)) + return self._http + + # ------------------------------------------------------------------ + # 公開 API + # ------------------------------------------------------------------ + + async def query(self, question: str, top_k: int = _TOP_K) -> str: + """ + RAG 查詢:embedding → pgvector knn → 生成回答 + 回傳繁中人話回答 + """ + embedding = await self._embed(question) + if embedding is None: + return "⚠️ 無法生成向量,RAG 查詢失敗" + + chunks = await self._search_pgvector(embedding, top_k) + if not chunks: + return "📭 知識庫尚無相關資料,請先執行索引建立" + + context = "\n\n---\n\n".join( + f"[{c.get('source','?')}] {c.get('title','')}\n{c.get('chunk_text','')}" + for c in chunks + ) + return await self._generate_answer(question, context) + + async def index_document( + self, + source: str, + source_id: str, + title: str, + text: str, + metadata: dict | None = None, + ) -> bool: + """ + 將文件向量化並儲存到 pgvector + 自動分段 (每段 500 字, overlap 100) + """ + chunks = self._chunk_text(text, chunk_size=500, overlap=100) + success = 0 + for chunk in chunks: + emb = await self._embed(chunk) + if emb: + ok = await self._save_chunk(source, source_id, title, chunk, emb, metadata or {}) + if ok: + success += 1 + logger.info("rag_indexed", source=source, source_id=source_id, chunks=success) + return success > 0 + + # ------------------------------------------------------------------ + # 向量化 + # ------------------------------------------------------------------ + + async def _embed(self, text: str) -> list[float] | None: + try: + http = await self._get_http() + resp = await http.post( + f"{_EMBED_URL}/api/embeddings", + json={"model": _EMBED_MODEL, "prompt": text}, + ) + if resp.status_code == 200: + return resp.json().get("embedding") + except Exception as e: + logger.warning("rag_embed_failed", error=str(e)) + return None + + # ------------------------------------------------------------------ + # pgvector 搜尋 + # ------------------------------------------------------------------ + + async def _search_pgvector( + self, embedding: list[float], top_k: int + ) -> list[dict[str, Any]]: + try: + from src.db.base import get_db_context + from sqlalchemy import text + + vec_str = "[" + ",".join(str(v) for v in embedding) + "]" + async with get_db_context() as db: + rows = await db.execute( + text(""" + SELECT source, source_id, title, chunk_text, + 1 - (embedding <=> CAST(:vec AS vector)) AS similarity + FROM rag_chunks + ORDER BY embedding <=> CAST(:vec AS vector) + LIMIT :k + """), + {"vec": vec_str, "k": top_k}, + ) + return [dict(r._mapping) for r in rows] + except Exception as e: + logger.error("rag_pgvector_search_failed", error=str(e)) + return [] + + async def _save_chunk( + self, + source: str, source_id: str, title: str, + chunk_text: str, embedding: list[float], + metadata: dict, + ) -> bool: + try: + from src.db.base import get_db_context + from sqlalchemy import text + + vec_str = "[" + ",".join(str(v) for v in embedding) + "]" + async with get_db_context() as db: + await db.execute( + text(""" + INSERT INTO rag_chunks (source, source_id, title, chunk_text, embedding, metadata) + VALUES (:source, :source_id, :title, :chunk_text, CAST(:embedding AS vector), CAST(:metadata AS jsonb)) + """), + { + "source": source, "source_id": source_id, "title": title, + "chunk_text": chunk_text, + "embedding": vec_str, + "metadata": json.dumps(metadata, ensure_ascii=False), + }, + ) + return True + except Exception as e: + logger.error("rag_save_chunk_failed", error=str(e)) + return False + + # ------------------------------------------------------------------ + # 生成回答 + # ------------------------------------------------------------------ + + async def _generate_answer(self, question: str, context: str) -> str: + prompt = ( + "你是 AWOOOI AIOps 知識庫助手,請用繁體中文根據以下資料回答問題。\n" + "如果資料不足以回答,明確說「資料不足」,不要猜測。\n\n" + f"=== 相關資料 ===\n{context[:6000]}\n\n" + f"=== 問題 ===\n{question}" + ) + try: + http = await self._get_http() + resp = await http.post( + f"{settings.OLLAMA_URL}/api/generate", + json={ + "model": _GEN_MODEL, + "prompt": prompt, + "stream": False, + "options": {"num_predict": 512, "temperature": 0.2}, + }, + timeout=httpx.Timeout(90.0, connect=10.0), + ) + if resp.status_code == 200: + return resp.json().get("response", "").strip() + except Exception as e: + logger.error("rag_generate_failed", error=str(e)) + return "⚠️ RAG 生成失敗,請稍後再試" + + # ------------------------------------------------------------------ + # 工具 + # ------------------------------------------------------------------ + + @staticmethod + def _chunk_text(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]: + """簡單字元分段,帶 overlap""" + chunks = [] + start = 0 + while start < len(text): + end = start + chunk_size + chunks.append(text[start:end]) + start += chunk_size - overlap + return [c for c in chunks if c.strip()] + + async def close(self) -> None: + if self._http and not self._http.is_closed: + await self._http.aclose() + + +_instance: KnowledgeRAGService | None = None + + +def get_knowledge_rag_service() -> KnowledgeRAGService: + global _instance + if _instance is None: + _instance = KnowledgeRAGService() + return _instance + + +def set_knowledge_rag_service(svc: KnowledgeRAGService) -> None: + global _instance + _instance = svc diff --git a/apps/api/src/services/local_code_review_service.py b/apps/api/src/services/local_code_review_service.py new file mode 100644 index 00000000..46c97d4b --- /dev/null +++ b/apps/api/src/services/local_code_review_service.py @@ -0,0 +1,200 @@ +""" +AWOOOI — Local Code Review Service (Phase 32, ADR-067) +====================================================== +使用 qwen2.5-coder:7b 對 Gitea PR diff 進行自動審查 +Fallback: Gemini (diff > 50KB 或 Ollama 超時) + +觸發: Gitea webhook push event (PR opened/updated) +雙寫: Redis TTL 7d + PostgreSQL pr_reviews 表永久儲存 +並發: semaphore 最多 2 個同時審查 + +2026-04-10 Claude Sonnet 4.6 Asia/Taipei +""" +from __future__ import annotations + +import asyncio +from typing import Any + +import httpx +import structlog + +from src.core.config import get_settings + +logger = structlog.get_logger(__name__) +settings = get_settings() + +_MODEL_OLLAMA = "qwen2.5-coder:7b" +_TIMEOUT_OLLAMA = 120.0 +_MAX_DIFF_BYTES = 50 * 1024 # 50KB → fallback to Gemini +_SEMAPHORE = asyncio.Semaphore(2) # 最多 2 個同時審查 + +# Redis cache TTL: 7 days +_CACHE_TTL = 7 * 86400 + + +class LocalCodeReviewService: + """PR 自動審查服務""" + + def __init__(self) -> None: + self._http: httpx.AsyncClient | None = None + + async def _get_http(self) -> httpx.AsyncClient: + if self._http is None or self._http.is_closed: + self._http = httpx.AsyncClient( + timeout=httpx.Timeout(_TIMEOUT_OLLAMA, connect=10.0) + ) + return self._http + + async def review_pr( + self, + pr_id: str, + repo: str, + title: str, + diff: str, + ) -> dict[str, Any] | None: + """ + 審查 PR diff,回傳審查結果 dict + {review_text, issues_count, model, provider} + """ + async with _SEMAPHORE: + cache_key = f"pr_review:{repo}:{pr_id}" + + # 快取命中 + try: + from src.core.redis_client import get_redis + redis = await get_redis() + if redis: + cached = await redis.get(cache_key) + if cached: + logger.info("pr_review_cache_hit", pr_id=pr_id) + import json + return json.loads(cached) + except Exception: + redis = None + + diff_size = len(diff.encode()) + use_gemini = diff_size > _MAX_DIFF_BYTES + + if use_gemini: + result = await self._review_with_gemini(pr_id, repo, title, diff) + else: + result = await self._review_with_ollama(pr_id, repo, title, diff) + if result is None: + # Ollama 失敗 → fallback Gemini + result = await self._review_with_gemini(pr_id, repo, title, diff) + + if result is None: + return None + + result["diff_size_bytes"] = diff_size + + # 寫入快取 + if redis: + try: + import json + await redis.set(cache_key, json.dumps(result, ensure_ascii=False), ex=_CACHE_TTL) + except Exception: + pass + + # 寫入 DB + await self._save_to_db(pr_id, repo, title, diff_size, result) + + return result + + async def _review_with_ollama( + self, pr_id: str, repo: str, title: str, diff: str + ) -> dict[str, Any] | None: + prompt = ( + f"你是資深程式審查員,請用繁體中文審查以下 Pull Request。\n" + f"PR: {repo}#{pr_id} — {title}\n\n" + "請找出:1) 潛在 Bug 或邏輯錯誤 2) 安全問題 3) 效能問題 4) 代碼風格問題\n" + "格式:每個問題獨立一行,以「⚠️」開頭。如果沒有問題,說「✅ 程式碼品質良好」\n\n" + f"=== Diff ===\n{diff[:40000]}\n=== 結束 ===" + ) + try: + http = await self._get_http() + resp = await http.post( + f"{settings.OLLAMA_URL}/api/generate", + json={ + "model": _MODEL_OLLAMA, + "prompt": prompt, + "stream": False, + "options": {"num_predict": 1024, "temperature": 0.1}, + }, + ) + if resp.status_code == 200: + text = resp.json().get("response", "").strip() + issues = text.count("⚠️") + logger.info("pr_review_ollama_done", pr_id=pr_id, issues=issues) + return {"review_text": text, "issues_count": issues, "model": _MODEL_OLLAMA, "provider": "ollama"} + except httpx.TimeoutException: + logger.warning("pr_review_ollama_timeout", pr_id=pr_id) + except Exception as e: + logger.error("pr_review_ollama_failed", pr_id=pr_id, error=str(e)) + return None + + async def _review_with_gemini( + self, pr_id: str, repo: str, title: str, diff: str + ) -> dict[str, Any] | None: + try: + from src.services.openclaw import get_openclaw + openclaw = get_openclaw() + prompt = ( + f"PR Code Review: {repo}#{pr_id} — {title}\n" + "繁體中文,找出 Bug/安全/效能/風格問題,每問題以⚠️開頭\n\n" + f"Diff:\n{diff[:60000]}" + ) + # 直接呼叫 Gemini + result = await openclaw._call_gemini(prompt) + if result and result[0]: + text = result[0] + issues = text.count("⚠️") + return {"review_text": text, "issues_count": issues, "model": "gemini", "provider": "gemini"} + except Exception as e: + logger.error("pr_review_gemini_failed", pr_id=pr_id, error=str(e)) + return None + + async def _save_to_db( + self, pr_id: str, repo: str, title: str, diff_size: int, result: dict + ) -> None: + try: + from src.db.base import get_db_context + from sqlalchemy import text + async with get_db_context() as db: + await db.execute( + text(""" + INSERT INTO pr_reviews + (pr_id, repo, title, diff_size_bytes, model, provider, review_text, issues_count) + VALUES + (:pr_id, :repo, :title, :diff_size, :model, :provider, :review_text, :issues_count) + """), + { + "pr_id": pr_id, "repo": repo, "title": title, + "diff_size": diff_size, + "model": result.get("model", "unknown"), + "provider": result.get("provider", "unknown"), + "review_text": result.get("review_text", ""), + "issues_count": result.get("issues_count", 0), + }, + ) + except Exception as e: + logger.warning("pr_review_db_save_failed", error=str(e)) + + async def close(self) -> None: + if self._http and not self._http.is_closed: + await self._http.aclose() + + +_instance: LocalCodeReviewService | None = None + + +def get_local_code_review_service() -> LocalCodeReviewService: + global _instance + if _instance is None: + _instance = LocalCodeReviewService() + return _instance + + +def set_local_code_review_service(svc: LocalCodeReviewService) -> None: + global _instance + _instance = svc diff --git a/apps/api/src/services/log_summary_service.py b/apps/api/src/services/log_summary_service.py new file mode 100644 index 00000000..b8b5938f --- /dev/null +++ b/apps/api/src/services/log_summary_service.py @@ -0,0 +1,244 @@ +""" +Log Summary Service - Phase 31 +================================ +職責:擷取 K8s Pod log,用 deepseek-r1:14b 生成繁中異常摘要 + +設計邊界: +- 只輸出摘要文字,不做 RCA、不生成修復指令 +- 5s 軟超時:超過則回傳 None,主流程不阻塞 +- LLM 硬超時 180s(deepseek-r1:14b 慢但準) +- Redis 快取:log_summary:{pod}:{date} TTL 24h,同 pod 同天不重複呼叫 +- 敏感資料遮蔽:Bearer token、密碼 regex → [REDACTED] +- 複用 K8sDiagnosticsService 取得 logs + +版本: v1.0 +建立: 2026-04-10 (台北時區) +建立者: Claude Code (Phase 31 ADR-067) +""" + +from __future__ import annotations + +import asyncio +import re +from datetime import UTC, datetime + +import httpx +import structlog + +from src.core.redis_client import get_redis + +logger = structlog.get_logger(__name__) + +# ============================================================ +# 設定 +# ============================================================ +OLLAMA_URL = "http://192.168.0.111:11434" +SUMMARY_MODEL = "deepseek-r1:14b" +LLM_TIMEOUT = 180.0 # deepseek-r1 硬超時 +SOFT_TIMEOUT = 5.0 # 主流程軟超時(超過回 None) +LOG_TAIL_LINES = 100 +ANOMALY_TAIL_LINES = 50 # 只取最後 50 行異常行送 LLM +CACHE_TTL = 86400 # 24 小時 +CACHE_PREFIX = "log_summary:" + +# 異常關鍵字 pattern +_ANOMALY_PATTERN = re.compile( + r"(ERROR|FATAL|Exception|Traceback|OOMKilled|panic|CRITICAL|WARN|WARNING)", + re.IGNORECASE, +) + +# 敏感資料遮蔽 pattern +_SENSITIVE_PATTERNS = [ + (re.compile(r"Bearer\s+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "Bearer [REDACTED]"), + (re.compile(r"password[=:\s]+\S+", re.IGNORECASE), "password=[REDACTED]"), + (re.compile(r"token[=:\s]+[A-Za-z0-9+/=._-]{10,}", re.IGNORECASE), "token=[REDACTED]"), + (re.compile(r"secret[=:\s]+\S+", re.IGNORECASE), "secret=[REDACTED]"), +] + +# ============================================================ +# Prompt +# ============================================================ +_SUMMARY_PROMPT = """你是 AWOOOI SRE 維運助理,請分析以下 K8s Pod log 片段。 + +## Pod: {pod_name} (namespace: {namespace}) +## Log 異常片段(最近 {line_count} 行): +{log_snippet} + +## 任務 +用繁體中文,3 行以內,說明: +1. 異常的主要原因是什麼 +2. 影響程度(例如 OOM Killed、連線失敗、程式錯誤) +3. 建議立即查看哪個方向 + +只輸出摘要文字,不要標題或 markdown 格式。 +""" + + +class LogSummaryService: + """ + Pod Log 異常摘要服務 + + 職責邊界: + ✅ 擷取 Pod log(複用 K8sDiagnosticsService) + ✅ 過濾異常行 + 敏感資料遮蔽 + ✅ deepseek-r1:14b 生成繁中摘要(180s timeout) + ✅ Redis 快取(24h TTL) + ❌ 不做 RCA + ❌ 不生成修復指令 + ❌ 不阻塞告警主流程(軟超時 5s) + """ + + async def summarize( + self, + pod_name: str, + namespace: str = "awoooi-prod", + ) -> str | None: + """ + 取得 Pod log 異常摘要 + + Returns: + str: 繁中摘要文字(3 行內) + None: 快取命中、無異常、或超時 + """ + cache_key = self._cache_key(pod_name) + redis = await get_redis() + + cached = await redis.get(cache_key) + if cached: + logger.debug("log_summary_cache_hit", pod=pod_name) + return cached.decode() if isinstance(cached, bytes) else cached + + raw_logs = await self._fetch_logs(pod_name, namespace) + if not raw_logs: + return None + + anomaly_lines = self._extract_anomaly_lines(raw_logs) + if not anomaly_lines: + logger.debug("log_summary_no_anomaly", pod=pod_name) + return None + + summary = await self._call_llm(pod_name, namespace, anomaly_lines) + if summary: + await redis.set(cache_key, summary, ex=CACHE_TTL) + logger.info("log_summary_generated", pod=pod_name, lines=len(anomaly_lines)) + + return summary + + async def summarize_with_soft_timeout( + self, + pod_name: str, + namespace: str = "awoooi-prod", + ) -> str | None: + """ + 帶 5s 軟超時的摘要取得 + + 用於告警主流程:超過軟超時回傳 None,不阻塞 Telegram 推送 + LLM 繼續在背景跑,結果寫入 Redis 快取供下次使用 + """ + try: + return await asyncio.wait_for( + self.summarize(pod_name, namespace), + timeout=SOFT_TIMEOUT, + ) + except asyncio.TimeoutError: + logger.info( + "log_summary_soft_timeout", + pod=pod_name, + soft_timeout=SOFT_TIMEOUT, + ) + # 繼續在背景跑,結果存 Redis 備用 + asyncio.create_task(self.summarize(pod_name, namespace)) + return None + + # -------------------------------------------------------- + # Private helpers + # -------------------------------------------------------- + + def _cache_key(self, pod_name: str) -> str: + date_str = datetime.now(UTC).strftime("%Y-%m-%d") + return f"{CACHE_PREFIX}{pod_name}:{date_str}" + + async def _fetch_logs(self, pod_name: str, namespace: str) -> str | None: + """複用 K8sDiagnosticsService 取得 logs""" + try: + from src.services.k8s_diagnostics import K8sDiagnosticsService + svc = K8sDiagnosticsService(default_namespace=namespace) + diag = await svc.collect_diagnostics( + pod_name=pod_name, + namespace=namespace, + log_tail_lines=LOG_TAIL_LINES, + include_previous_logs=False, + ) + return diag.logs or None + except Exception as e: + logger.warning("log_summary_fetch_failed", pod=pod_name, error=str(e)) + return None + + def _extract_anomaly_lines(self, raw_logs: str) -> list[str]: + """過濾異常行 + 敏感資料遮蔽,取最後 ANOMALY_TAIL_LINES 行""" + lines = raw_logs.splitlines() + anomaly = [l for l in lines if _ANOMALY_PATTERN.search(l)] + # 取最後 N 行 + anomaly = anomaly[-ANOMALY_TAIL_LINES:] + # 遮蔽敏感資料 + result = [] + for line in anomaly: + for pattern, replacement in _SENSITIVE_PATTERNS: + line = pattern.sub(replacement, line) + result.append(line) + return result + + async def _call_llm( + self, + pod_name: str, + namespace: str, + anomaly_lines: list[str], + ) -> str | None: + """呼叫 deepseek-r1:14b 生成摘要""" + log_snippet = "\n".join(anomaly_lines) + prompt = _SUMMARY_PROMPT.format( + pod_name=pod_name, + namespace=namespace, + line_count=len(anomaly_lines), + log_snippet=log_snippet[:3000], # 避免超出 context + ) + + try: + async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client: + resp = await client.post( + f"{OLLAMA_URL}/api/generate", + json={ + "model": SUMMARY_MODEL, + "prompt": prompt, + "stream": False, + "options": {"temperature": 0.1, "num_predict": 200}, + }, + ) + resp.raise_for_status() + data = resp.json() + raw = data.get("response", "").strip() + + # 過濾 deepseek-r1 的 ... 推理區塊 + text = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + return text or raw or None + + except httpx.TimeoutException: + logger.warning("log_summary_llm_timeout", model=SUMMARY_MODEL) + return None + except Exception as e: + logger.warning("log_summary_llm_error", error=str(e)) + return None + + +# ============================================================ +# Singleton +# ============================================================ + +_log_summary_service: LogSummaryService | None = None + + +def get_log_summary_service() -> LogSummaryService: + global _log_summary_service + if _log_summary_service is None: + _log_summary_service = LogSummaryService() + return _log_summary_service