#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ services/code_review_pipeline_service.py Post-Deploy AI Agent Code Review Pipeline 觸發時機:CD 健康檢查通過後,由 Gitea Action webhook 呼叫 Pipeline: Step 1 system 讀取變更檔案內容 Step 2 Hermes 程式碼掃描(bugs / security / performance) Step 3 OpenClaw 架構品質評估(Ollama-first,Gemini 僅備援) Step 4 ElephantAlpha 決策協調(severity 判定 + auto-fix 裁量) Step 5 NemoTron 行動派遣(action_plans 寫入 + AiderHeal 觸發) 結果輸出: - ai_insights(type='code_review_result') - action_plans(type='code_review_fix') - Telegram 告警(啟動 / 完成 / 錯誤) - 前端 /code-review/ 即時 polling 狀態 """ import json import logging import os import re import threading from datetime import datetime from typing import Any, Dict, List, Optional import requests from database.manager import get_session from sqlalchemy import text # ADR-027:Code Review 走 OllamaService 取得三主機級聯 retry。 from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1 from services.action_plan_dedupe import active_code_review_action_exists from services.gemini_guard import gemini_disabled_message, get_gemini_api_key logger = logging.getLogger(__name__) def _env_bool(name: str, default: str = "false") -> bool: return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"} def _env_float(name: str, default: str) -> float: try: return float(os.getenv(name, default)) except (TypeError, ValueError): return float(default) # ── Pipeline 全域狀態(供前端 polling)───────────────────────────────────── _current_pipeline: Dict[str, Any] = {} _pipeline_lock = threading.Lock() # Gemini 僅作 Code Review Ollama/Claude 主路徑都失敗後的最後雲端備援。 REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash") CODE_REVIEW_OLLAMA_MODEL = os.getenv( "CODE_REVIEW_OLLAMA_MODEL", os.getenv("OPENCLAW_OLLAMA_MODEL", "qwen2.5-coder:7b"), ) CODE_REVIEW_OLLAMA_TIMEOUT = int(os.getenv("CODE_REVIEW_OLLAMA_TIMEOUT", "15")) CODE_REVIEW_OLLAMA_SECONDARY_MODEL = os.getenv( 
"CODE_REVIEW_OLLAMA_SECONDARY_MODEL", "gemma3:4b", ) CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT = int( os.getenv("CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT", "25") ) CODE_REVIEW_OLLAMA_FALLBACK_MODEL = os.getenv( "CODE_REVIEW_OLLAMA_FALLBACK_MODEL", _HERMES_MODEL, ) CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT = int( os.getenv("CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT", "20") ) CODE_REVIEW_OLLAMA_NUM_PREDICT = int(os.getenv("CODE_REVIEW_OLLAMA_NUM_PREDICT", "384")) CODE_REVIEW_OLLAMA_KEEP_ALIVE = os.getenv("CODE_REVIEW_OLLAMA_KEEP_ALIVE", "5m") CODE_REVIEW_ALLOW_111_FALLBACK = _env_bool("CODE_REVIEW_ALLOW_111_FALLBACK", "false") CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED = _env_bool( "CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED", "true", ) CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT = _env_float( "CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT", "2.5", ) CODE_REVIEW_HERMES_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "35")) CODE_REVIEW_HERMES_PRIMARY_MODEL = os.getenv( "CODE_REVIEW_HERMES_PRIMARY_MODEL", CODE_REVIEW_OLLAMA_MODEL, ) CODE_REVIEW_HERMES_SECONDARY_MODEL = os.getenv( "CODE_REVIEW_HERMES_SECONDARY_MODEL", CODE_REVIEW_OLLAMA_SECONDARY_MODEL, ) CODE_REVIEW_HERMES_FALLBACK_MODEL = os.getenv( "CODE_REVIEW_HERMES_FALLBACK_MODEL", CODE_REVIEW_OLLAMA_FALLBACK_MODEL, ) CODE_REVIEW_HERMES_PRIMARY_TIMEOUT = int( os.getenv("CODE_REVIEW_HERMES_PRIMARY_TIMEOUT", os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "15")) ) CODE_REVIEW_HERMES_SECONDARY_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_SECONDARY_TIMEOUT", "45")) CODE_REVIEW_HERMES_FALLBACK_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_FALLBACK_TIMEOUT", "20")) CODE_REVIEW_HERMES_NUM_PREDICT = int(os.getenv("CODE_REVIEW_HERMES_NUM_PREDICT", "384")) CODE_REVIEW_HERMES_MAX_FILES = int(os.getenv("CODE_REVIEW_HERMES_MAX_FILES", "2")) CODE_REVIEW_HERMES_MAX_CHARS = int(os.getenv("CODE_REVIEW_HERMES_MAX_CHARS", "900")) CODE_REVIEW_HERMES_LLM_SCAN_ENABLED = ( os.getenv("CODE_REVIEW_HERMES_LLM_SCAN_ENABLED", "false").lower() == "true" ) INTERNAL_TOKEN = 
os.getenv("INTERNAL_WEBHOOK_TOKEN", "") AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "true").lower() == "true" ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true" AIDER_AUTO_FIX_FILE_PATTERN = re.compile( r"^(services|routes|database)/(?:[a-zA-Z0-9_]+/)*[a-zA-Z0-9_]+\.py$" ) # Phase 7 Frontier 升級 feature flag — 預設 OFF;啟用後只作 Ollama 失敗後的雲端備援。 CODE_REVIEW_USE_CLAUDE = os.getenv("CODE_REVIEW_USE_CLAUDE", "false").lower() == "true" CLAUDE_REVIEW_MODEL = os.getenv("CLAUDE_MODEL", "claude-opus-4-7") def _aider_allowed_fix_files(files: List[str]) -> List[str]: """回傳 ADR-020 允許交給 AiderHeal 自動修復的檔案。""" return [f for f in files if AIDER_AUTO_FIX_FILE_PATTERN.match(f or "")] def _code_review_ollama_host_reachable(host: str) -> bool: """Short-circuit dead GCP Ollama hosts before a generate timeout.""" if not CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED: return True try: resp = requests.get( f"{str(host or '').rstrip('/')}/api/version", timeout=CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT, ) return resp.status_code == 200 except Exception as exc: logger.warning("[CodeReview] Ollama host preflight failed host=%s error=%s", host, exc) return False def _code_review_ollama_model_available(host: str, model: str) -> bool: """Skip a host/model pair when Ollama reports the requested model is absent.""" if not CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED or not model: return True try: resp = requests.get( f"{str(host or '').rstrip('/')}/api/tags", timeout=CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT, ) if resp.status_code != 200: return True names = { str(item.get("name") or "") for item in (resp.json().get("models") or []) if isinstance(item, dict) } if model in names: return True if ":" not in model and f"{model}:latest" in names: return True logger.warning( "[CodeReview] Ollama model preflight failed host=%s model=%s available=%s", host, model, sorted(name for name in names if name)[:12], ) return False except Exception as exc: 
logger.warning("[CodeReview] Ollama model preflight fail-open host=%s model=%s error=%s", host, model, exc) return True # ═══════════════════════════════════════════════════════════════════════════════ # Pipeline Class # ═══════════════════════════════════════════════════════════════════════════════ class CodeReviewPipeline: """ 5-Step post-deploy code review pipeline. Call pipeline.run() inside a daemon thread. """ def __init__(self, commit_sha: str, changed_files: List[str], branch: str = "main", deploy_type: str = "sync"): self.commit_sha = commit_sha self.branch = branch self.deploy_type = deploy_type self.started_at = datetime.now() self.pipeline_id = f"cr_{commit_sha[:8]}_{self.started_at.strftime('%Y%m%d_%H%M%S')}" # 只 review Python + YAML 檔(跳過靜態資源) self.changed_files = [ f for f in changed_files if f.endswith(('.py', '.yaml', '.yml', '.json')) and not f.startswith(('node_modules/', '.git/')) ] self.state: Dict[str, Any] = { "pipeline_id": self.pipeline_id, "commit_sha": commit_sha, "branch": branch, "changed_files": self.changed_files, "status": "running", "current_step": 0, "total_steps": 5, "steps": [], "findings": [], "severity_summary": {"critical": 0, "high": 0, "medium": 0, "low": 0}, "openclaw_report": "", "ea_decision": {}, "auto_fix_triggered": False, "started_at": self.started_at.isoformat(), "completed_at": None, "message": "", } self._sync_global() # ── State helpers ───────────────────────────────────────────────────────── def _sync_global(self): global _current_pipeline with _pipeline_lock: _current_pipeline = dict(self.state) def _step_start(self, num: int, name: str, agent: str): self.state["current_step"] = num self.state["steps"].append({ "step": num, "name": name, "agent": agent, "status": "running", "started_at": datetime.now().isoformat(), "completed_at": None, "summary": "", }) self._sync_global() logger.info("[CodeReview] ▶ Step %d/5 %s (%s)", num, name, agent) def _step_done(self, num: int, summary: str, ok: bool = True): for s in 
self.state["steps"]: if s["step"] == num: s["status"] = "ok" if ok else "error" s["completed_at"] = datetime.now().isoformat() s["summary"] = summary[:300] self._sync_global() logger.info("[CodeReview] %s Step %d — %s", "✓" if ok else "✗", num, summary[:100]) def _finish(self, status: str, message: str): self.state["status"] = status self.state["completed_at"] = datetime.now().isoformat() self.state["message"] = message self._sync_global() # ── Main pipeline ───────────────────────────────────────────────────────── def run(self): """Execute full pipeline. Designed to run in daemon thread.""" try: self._notify_start() # Step 1 ─ read files self._step_start(1, "讀取變更檔案", "system") file_contents = self._read_changed_files() if not file_contents: self._step_done(1, "無有效 Python/YAML 變更,跳過 Review") self._finish("skipped", "無有效變更檔案") return self._step_done(1, f"讀取 {len(file_contents)} 個檔案") # Step 2 ─ Hermes scan self._step_start(2, "Hermes 程式碼掃描", "Hermes") findings = self._hermes_scan(file_contents) cnt = self.state["severity_summary"] self._step_done(2, f"CRITICAL={cnt['critical']} HIGH={cnt['high']} MEDIUM={cnt['medium']} LOW={cnt['low']}") # Step 3 ─ OpenClaw assessment self._step_start(3, "OpenClaw 架構品質評估", "OpenClaw") openclaw_report = self._openclaw_assess(file_contents, findings) self.state["openclaw_report"] = openclaw_report self._step_done(3, openclaw_report[:120] if openclaw_report else "(OpenClaw 未回應)") # Step 4 ─ ElephantAlpha orchestration self._step_start(4, "ElephantAlpha 決策協調", "ElephantAlpha") ea = self._ea_orchestrate(findings, openclaw_report) self.state["ea_decision"] = ea self._step_done(4, f"優先度={ea.get('priority','?')} auto_fix={ea.get('auto_fix',False)}") # Step 5 ─ NemoTron dispatch self._step_start(5, "NemoTron 行動派遣", "NemoTron") dispatch = self._nemotron_dispatch(ea, findings) self._step_done(5, f"寫入 {dispatch['actions']} 筆 action_plans,自動修復={'是' if dispatch['auto_fix'] else '否'}") # Persist + notify self._save_to_db(findings, openclaw_report, 
ea) self._notify_complete(findings, openclaw_report, ea) self._finish("completed", "Pipeline 執行完成") except Exception as e: logger.error("[CodeReview] Pipeline 例外: %s", e, exc_info=True) self._finish("error", str(e)[:200]) self._notify_error(str(e)) # ── Step 1:讀取檔案 ─────────────────────────────────────────────────────── def _read_changed_files(self) -> Dict[str, str]: project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) contents: Dict[str, str] = {} for rel_path in self.changed_files[:8]: # 最多 8 個檔案 abs_path = os.path.join(project_root, rel_path) try: with open(abs_path, encoding="utf-8", errors="ignore") as fh: raw = fh.read() contents[rel_path] = raw[:6000] + ("\n... (截斷)" if len(raw) > 6000 else "") except OSError: logger.debug("[CodeReview] 無法讀取 %s(部署路徑不同?)", rel_path) return contents # ── Step 2:Hermes 掃描 ─────────────────────────────────────────────────── def _hermes_scan(self, files: Dict[str, str]) -> List[Dict]: """走 GCP-A → GCP-B;只有 CODE_REVIEW_ALLOW_111_FALLBACK=true 才落到 111。""" try: if not CODE_REVIEW_HERMES_LLM_SCAN_ENABLED: findings = self._static_code_scan(files) for f in findings: sev = f.get("severity", "LOW").lower() if sev in self.state["severity_summary"]: self.state["severity_summary"][sev] += 1 self.state["findings"] = findings self._sync_global() return findings compact_files = [] for name, content in list(files.items())[:CODE_REVIEW_HERMES_MAX_FILES]: clipped = content[:CODE_REVIEW_HERMES_MAX_CHARS] if len(content) > CODE_REVIEW_HERMES_MAX_CHARS: clipped += "\n... 
(截斷,僅掃描前段重點)" compact_files.append(f"### {name}\n```python\n{clipped}\n```") files_text = "\n\n".join(compact_files) prompt = f"""你是資深程式碼審查工程師,請掃描以下程式碼並列出所有問題。 {files_text} 輸出格式:純 JSON 陣列,每項包含以下欄位: - severity: "CRITICAL" | "HIGH" | "MEDIUM" | "LOW" - type: "bug" | "security" | "performance" | "maintainability" - file: 檔案名稱 - line_hint: 約略行號或函式名稱 - description: 問題說明(繁體中文,精簡一句) - suggestion: 修復建議(繁體中文,精簡一句) 只輸出 JSON 陣列,不含其他文字。無問題時輸出 []""" from services.ollama_service import ( OLLAMA_HOST_FALLBACK, OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OllamaService, get_host_label, get_provider_tag, ) hermes_attempts = [ ( "primary_code_scan", OLLAMA_HOST_PRIMARY, CODE_REVIEW_HERMES_PRIMARY_MODEL, CODE_REVIEW_HERMES_PRIMARY_TIMEOUT, ), ( "secondary_fast_scan", OLLAMA_HOST_SECONDARY, CODE_REVIEW_HERMES_SECONDARY_MODEL, CODE_REVIEW_HERMES_SECONDARY_TIMEOUT, ), ] if CODE_REVIEW_ALLOW_111_FALLBACK: hermes_attempts.append(( "lan_111_hermes_scan", OLLAMA_HOST_FALLBACK, CODE_REVIEW_HERMES_FALLBACK_MODEL, CODE_REVIEW_HERMES_FALLBACK_TIMEOUT, )) findings = None last_error = None for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate( hermes_attempts, start=1, ): with log_ai_call( caller='code_review_hermes', provider=get_provider_tag(host), model=model_name, request_id=f"cr-{self.commit_sha[:8]}", meta={'commit': self.commit_sha[:8], 'branch': self.branch, 'files': len(files), 'route': 'ollama_first', 'attempt': attempt_index, 'attempt_key': attempt_key, 'max_files': CODE_REVIEW_HERMES_MAX_FILES, 'max_chars': CODE_REVIEW_HERMES_MAX_CHARS, 'timeout_s': timeout_s}, ) as _ctx: _ctx.add_meta('host', host) _ctx.add_meta('host_label', get_host_label(host)) if not _code_review_ollama_host_reachable(host): last_error = ( "ollama host preflight failed " f"host={host} timeout={CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT}s" ) _ctx.add_meta('preflight', 'api_version') _ctx.set_error(last_error) continue if not _code_review_ollama_model_available(host, model_name): last_error = 
f"ollama model preflight failed host={host} model={model_name}" _ctx.add_meta('preflight', 'api_tags') _ctx.set_error(last_error) continue ollama = OllamaService(host=host, model=model_name) resp = ollama.generate( prompt=prompt, model=model_name, temperature=0.1, timeout=timeout_s, keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE, options={"num_predict": CODE_REVIEW_HERMES_NUM_PREDICT}, ) actual_host = resp.host or host _ctx.set_provider(get_provider_tag(actual_host)) _ctx.set_model(resp.model or model_name) _ctx.set_tokens( input=resp.input_tokens, output=resp.output_tokens, ) _ctx.add_meta('host', actual_host) _ctx.add_meta('host_label', get_host_label(actual_host)) if resp.model and resp.model != model_name: _ctx.add_meta('requested_model', model_name) if not resp.success: last_error = resp.error or 'ollama generate failed' _ctx.set_error(last_error) continue raw = (resp.content or "").strip() match = re.search(r"\[.*\]", raw, re.DOTALL) if not match: last_error = f"missing JSON array: {raw[:120]}" _ctx.set_error(last_error) logger.warning("[CodeReview] Hermes 回應無 JSON: %s", raw[:200]) continue try: findings = json.loads(match.group()) except Exception as exc: last_error = f"json parse failed: {type(exc).__name__}: {exc}" _ctx.set_error(last_error) continue break if findings is None: logger.warning("[CodeReview] Hermes 本地掃描全部失敗: %s", last_error) return [] for f in findings: sev = f.get("severity", "LOW").lower() if sev in self.state["severity_summary"]: self.state["severity_summary"][sev] += 1 self.state["findings"] = findings self._sync_global() return findings except Exception as e: logger.warning("[CodeReview] Hermes 掃描失敗: %s", e) return [] def _static_code_scan(self, files: Dict[str, str]) -> List[Dict]: """Hermes LLM 關閉時的快速 deterministic 掃描,避免部署後卡 Ollama timeout。""" findings: List[Dict] = [] patterns = [ ("HIGH", "security", r"\beval\s*\(", "使用 eval() 有安全風險", "改用安全 parser 或白名單映射"), ("HIGH", "security", r"\bexec\s*\(", "使用 exec() 有安全風險", "移除動態執行或改成明確函式"), 
("MEDIUM", "maintainability", r"except\s+Exception\s*:\s*pass\b", "例外被靜默吞掉", "改用 logger.exception 並保留可診斷訊息"), ("MEDIUM", "maintainability", r"except\s*:\s*pass\b", "裸 except 被靜默吞掉", "指定例外類型並記錄錯誤"), ] request_pattern = re.compile(r"requests\.(get|post|put|delete|patch)\s*\(") secret_pattern = re.compile( r"(api[_-]?key|token|password|secret)\s*=\s*['\"][^'\"]{12,}['\"]", re.IGNORECASE, ) for path, content in files.items(): lines = content.splitlines() for line_no, line in enumerate(lines, start=1): stripped = line.strip() request_block = "\n".join(lines[line_no - 1:line_no + 5]) if request_pattern.search(stripped) and "timeout=" not in request_block: findings.append({ "severity": "MEDIUM", "type": "performance", "file": path, "line_hint": str(line_no), "description": "HTTP request 未設定 timeout,可能拖住 worker", "suggestion": "加入明確 timeout 並處理例外", }) if secret_pattern.search(stripped) and "os.getenv" not in stripped: findings.append({ "severity": "HIGH", "type": "security", "file": path, "line_hint": str(line_no), "description": "疑似硬編碼敏感字串", "suggestion": "改用環境變數或 secret store", }) for severity, issue_type, pattern, description, suggestion in patterns: if re.search(pattern, stripped): findings.append({ "severity": severity, "type": issue_type, "file": path, "line_hint": str(line_no), "description": description, "suggestion": suggestion, }) if len(findings) >= 8: return findings[:8] return findings # ── Step 3:OpenClaw 評估 ────────────────────────────────────────────────── def _openclaw_assess(self, files: Dict[str, str], findings: List[Dict]) -> str: """ 路由優先序: L1 (預設) → Ollama GCP-A → GCP-B L1b (flag CODE_REVIEW_ALLOW_111_FALLBACK=true) → 111 最後備援 L2 (flag CODE_REVIEW_USE_CLAUDE=true) → Claude Opus 4.7 雲端備援 L3 (Gemini guard 顯式解鎖) → Gemini 雲端備援 L4 (雲端未顯式開啟時) → deterministic 本地降級摘要 L5 (雲端顯式開啟但失敗時) → ElephantAlpha via NIM/OpenRouter """ sev = self.state["severity_summary"] findings_json = json.dumps(findings[:8], ensure_ascii=False, indent=2) files_list = "\n".join(f"- {k} 
({len(v)} 字元)" for k, v in list(files.items())[:5]) system = ( "你是 OpenClaw 程式碼品質戰略分析師,以技術主管視角評估部署後程式碼。" "語言:繁體中文。風格:精準、數據導向、可執行建議。" ) user_prompt = f"""【部署】Commit {self.commit_sha[:8]} @ {self.branch} 【變更檔案】 {files_list} 【Hermes 掃描摘要】CRITICAL={sev['critical']} HIGH={sev['high']} MEDIUM={sev['medium']} LOW={sev['low']} 【問題明細】 {findings_json} 請產出程式碼品質評估(使用 HTML 標題,150字以內): 🔍 整體風險等級(一句理由) ⚠️ 最需關注問題(TOP 2) 💡 架構優化方向(1條長期建議) ✅ 本次部署亮點""" # ── L1:Ollama-first — GCP-A → GCP-B(111 需顯式開 flag)────────────── from services.ollama_service import ( OLLAMA_HOST_FALLBACK, OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OllamaService, get_host_label, get_provider_tag, ) gemini_api_key = get_gemini_api_key("code_review") cloud_fallback_available = CODE_REVIEW_USE_CLAUDE or bool(gemini_api_key) fallback_caller = 'code_review_openclaw' if CODE_REVIEW_USE_CLAUDE else ( 'code_review_openclaw_gemini' if gemini_api_key else 'code_review_elephant' ) if not cloud_fallback_available: fallback_caller = 'code_review_local_degraded' ollama_attempts = [ ( "primary_code", OLLAMA_HOST_PRIMARY, CODE_REVIEW_OLLAMA_MODEL, CODE_REVIEW_OLLAMA_TIMEOUT, ), ( "secondary_fast", OLLAMA_HOST_SECONDARY, CODE_REVIEW_OLLAMA_SECONDARY_MODEL, CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT, ), ] if CODE_REVIEW_ALLOW_111_FALLBACK: ollama_attempts.append(( "lan_111_hermes", OLLAMA_HOST_FALLBACK, CODE_REVIEW_OLLAMA_FALLBACK_MODEL, CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT, )) last_ollama_error = None for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate( ollama_attempts, start=1, ): with log_ai_call( caller='code_review_openclaw', provider=get_provider_tag(host), model=model_name, request_id=f"cr-{self.commit_sha[:8]}", meta={ 'commit': self.commit_sha[:8], 'branch': self.branch, 'route': 'ollama_first', 'attempt': attempt_index, 'attempt_key': attempt_key, 'timeout_s': timeout_s, }, ) as _ctx: _ctx.add_meta('host', host) _ctx.add_meta('host_label', get_host_label(host)) if not 
_code_review_ollama_host_reachable(host): last_ollama_error = ( "ollama host preflight failed " f"host={host} timeout={CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT}s" ) _ctx.add_meta('preflight', 'api_version') _ctx.set_error(last_ollama_error) if attempt_index == len(ollama_attempts): _ctx.fallback_to_caller(fallback_caller) continue if not _code_review_ollama_model_available(host, model_name): last_ollama_error = f"ollama model preflight failed host={host} model={model_name}" _ctx.add_meta('preflight', 'api_tags') _ctx.set_error(last_ollama_error) if attempt_index == len(ollama_attempts): _ctx.fallback_to_caller(fallback_caller) continue ollama = OllamaService(host=host, model=model_name) resp = ollama.generate( prompt=user_prompt, system_prompt=system, model=model_name, temperature=0.2, timeout=timeout_s, keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE, options={"num_predict": CODE_REVIEW_OLLAMA_NUM_PREDICT}, ) actual_host = resp.host or host _ctx.set_provider(get_provider_tag(actual_host)) _ctx.set_model(resp.model or model_name) _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens) _ctx.add_meta('host', actual_host) _ctx.add_meta('host_label', get_host_label(actual_host)) if resp.model and resp.model != model_name: _ctx.add_meta('requested_model', model_name) if resp.success and (resp.content or '').strip(): return resp.content or "" last_ollama_error = resp.error or 'ollama generate failed' _ctx.set_error(last_ollama_error) if attempt_index == len(ollama_attempts): _ctx.fallback_to_caller(fallback_caller) logger.warning( "[CodeReview] OpenClaw 本地 Ollama 鏈全部失敗: %s", last_ollama_error, ) if not cloud_fallback_available: logger.warning( "[CodeReview] 111/cloud fallback 未啟用,回傳 deterministic 本地降級評估", ) return self._build_local_openclaw_degraded_report( files=files, findings=findings, last_error=last_ollama_error, ) # ── L1:Phase 7 Frontier — Claude Opus 4.7(程式碼能力 #1)──────────── # feature flag 預設 OFF;ON 時只作 Ollama 失敗後的雲端備援。 if CODE_REVIEW_USE_CLAUDE: try: from 
services.anthropic_service import anthropic_service except Exception as e: logger.warning("[CodeReview] Claude service import 失敗,改走 Gemini 備援: %s", e) anthropic_service = None # type: ignore if anthropic_service is not None and anthropic_service.is_available(): with log_ai_call( caller='code_review_openclaw', provider='claude', model=CLAUDE_REVIEW_MODEL, request_id=f"cr-{self.commit_sha[:8]}", meta={ 'commit': self.commit_sha[:8], 'branch': self.branch, 'flag': 'CODE_REVIEW_USE_CLAUDE', }, ) as _ctx: resp = anthropic_service.generate( prompt=user_prompt, system_prompt=system, # ephemeral cache(5 分鐘 TTL,省 ~90% 成本) model=CLAUDE_REVIEW_MODEL, max_tokens=2048, temperature=0.2, # code review 要精確 cache_system=True, timeout=120, ) if resp.success: _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens) _ctx.set_cache_hit(resp.cache_hit) _ctx.add_meta('cache_creation_tokens', resp.cache_creation_tokens) _ctx.add_meta('cache_read_tokens', resp.cache_read_tokens) return resp.content or "" # Claude 失敗 → fallback 到 Gemini 備援(L3) _ctx.set_error(resp.error or 'claude generate failed') _ctx.fallback_to_caller('code_review_openclaw_gemini') logger.warning( "[CodeReview] Claude 失敗,改走 Gemini 備援: %s", resp.error, ) else: logger.info( "[CodeReview] CODE_REVIEW_USE_CLAUDE=true 但 Claude 不可用(缺 API key 或 SDK),改走下一層備援", ) # ── L3:Gemini — 僅作 Ollama/Claude 都失敗後的備援 ─────────────────── if gemini_api_key: with log_ai_call( caller='code_review_openclaw_gemini', provider='gemini', model=REVIEW_MODEL, request_id=f"cr-{self.commit_sha[:8]}", meta={ 'commit': self.commit_sha[:8], 'branch': self.branch, 'fallback_from': 'code_review_openclaw_ollama', }, ) as _ctx: try: import google.generativeai as genai genai.configure(api_key=gemini_api_key) model = genai.GenerativeModel( model_name=REVIEW_MODEL, generation_config=genai.types.GenerationConfig( temperature=0.3, max_output_tokens=1500, ), system_instruction=system, ) resp = model.generate_content(user_prompt, request_options={"timeout": 
90}) try: usage = getattr(resp, 'usage_metadata', None) if usage is not None: _ctx.set_tokens( input=getattr(usage, 'prompt_token_count', 0) or 0, output=getattr(usage, 'candidates_token_count', 0) or 0, ) except Exception: logger.debug("[CodeReview] Gemini usage metadata parse failed", exc_info=True) return resp.text or "" except Exception as e: logger.warning("[CodeReview] OpenClaw Gemini 失敗,降級 ElephantAlpha: %s", e) _ctx.set_error(f"{type(e).__name__}: {e}") _ctx.fallback_to_caller('code_review_elephant') else: logger.info("[CodeReview] 跳過 Gemini 備援:%s", gemini_disabled_message("code_review")) # 降級:ElephantAlpha via OpenRouter(OPENROUTER_API_KEY 容器內一定有) # Phase 1 v5.0 logger 追蹤 with log_ai_call( caller='code_review_elephant', provider='nim_via_elephant', model='nvidia/llama-3.3-nemotron-super-49b-v1.5', request_id=f"cr-{self.commit_sha[:8]}", meta={'commit': self.commit_sha[:8], 'branch': self.branch}, ) as _ctx: try: from services.elephant_service import elephant_service resp = elephant_service.generate( prompt=user_prompt, system_prompt=system, temperature=0.3, timeout=90, ) if resp.success: # ElephantResponse 已含 input_tokens/output_tokens _ctx.set_tokens( input=getattr(resp, 'input_tokens', 0) or 0, output=getattr(resp, 'output_tokens', 0) or 0, ) return resp.content or "" else: _ctx.set_error(getattr(resp, 'error', 'elephant generate failed')) except Exception as e: logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e) _ctx.set_error(f"{type(e).__name__}: {e}") return "" def _build_local_openclaw_degraded_report( self, *, files: Dict[str, str], findings: List[Dict], last_error: Optional[str], ) -> str: sev = self.state["severity_summary"] risk = "低" if sev["critical"]: risk = "嚴重" elif sev["high"]: risk = "高" elif sev["medium"]: risk = "中" top = [f for f in findings[:2] if f.get("description")] top_text = ";".join( f"{f.get('file', 'unknown')}:{f.get('description')}" for f in top ) or "本次 deterministic scan 未發現高風險問題" error_text = (last_error or 
"local Ollama unavailable")[:120] return ( f"🔍 整體風險等級 {risk}。" f"本地掃描完成 {len(files)} 檔,CRITICAL={sev['critical']}、HIGH={sev['high']}、" f"MEDIUM={sev['medium']}、LOW={sev['low']}。" f"⚠️ 最需關注問題 {top_text}。" f"💡 架構優化方向 GCP-A/GCP-B OpenClaw 不可用時暫停 111 重分析,避免拖高 fallback 主機負載。" f"✅ 本次部署亮點 已以本地降級報告收斂,未呼叫 Gemini;最後錯誤:{error_text}" ) # ── Step 4:ElephantAlpha 決策 ───────────────────────────────────────────── def _ea_orchestrate(self, findings: List[Dict], openclaw_report: str) -> Dict: sev = self.state["severity_summary"] critical_n = sev["critical"] high_n = sev["high"] # 嘗試呼叫 ElephantAlpha 做精細判斷 try: from services.elephant_service import elephant_service top3 = json.dumps( [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3], ensure_ascii=False, ) prompt = f"""你是 Elephant Alpha,負責協調 Code Review 後的修復決策。 【部署】commit={self.commit_sha[:8]} branch={self.branch} 【問題統計】CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']} 【Top 問題】{top3} 【OpenClaw評估摘要】{openclaw_report[:300]} 請以 JSON 回答(不含其他文字): {{ "priority": "critical|high|medium|low", "auto_fix": true|false, "reasoning": "決策理由(繁體中文,一句話,需含具體數字)", "fix_files": ["需自動修復的檔案(最多5個,所有有問題的檔案)"], "human_review_needed": false }} 規則(依 ADR-020 全自動修復政策,覆寫 ADR-012 L3 HITL 對 code review 的限制): - 任何 finding(不論 CRITICAL/HIGH/MEDIUM/LOW)→ auto_fix=true,human_review_needed=false - 安全網是 Git revert + Gitea CI/CD 回滾,不依賴人工審查門檻 - priority 按最嚴重 severity 決定:CRITICAL>HIGH>MEDIUM>LOW - fix_files 填入所有有問題的檔案(最多 5 個,AiderHeal 端會再限流)""" resp = elephant_service.generate( prompt=prompt, json_mode=True, temperature=0.1, timeout=60, ) if resp.success: return self._guard_ea_decision(json.loads(resp.content), findings) except Exception as e: logger.warning("[CodeReview] ElephantAlpha 決策失敗,回退規則: %s", e) # 規則 fallback:ADR-020 全自動修復政策。任何 finding 一律 auto_fix=true。 has_findings = len(findings) > 0 priority = ( "critical" if critical_n > 0 else "high" if high_n > 0 else "medium" if sev["medium"] > 0 else "low" if sev["low"] > 0 else 
"low" ) auto_fix = bool(has_findings and AUTO_FIX_ENABLED) fix_files = list({ f.get("file", "") for f in findings if f.get("file") })[:5] return { "priority": priority, "auto_fix": auto_fix, "reasoning": f"ADR-020 全自動修復:CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}," + ("觸發 AiderHeal 自動修復(Git+CI/CD 為回滾安全網)" if auto_fix else "無 finding,無需修復"), "fix_files": fix_files, "human_review_needed": False, } def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict: """ADR-020 全自動修復政策:有 finding 一律 auto_fix=true,僅受 AUTO_FIX_ENABLED 主開關控制。""" sev = self.state["severity_summary"] priority = (decision.get("priority") or "").lower() or ( "critical" if sev["critical"] > 0 else "high" if sev["high"] > 0 else "medium" if sev["medium"] > 0 else "low" ) has_findings = bool(findings) allowed_auto_fix = bool(has_findings and AUTO_FIX_ENABLED) if has_findings and not AUTO_FIX_ENABLED: logger.warning( "[CodeReview] auto_fix 被 CODE_REVIEW_AUTO_FIX_ENABLED=false 主開關擋下 priority=%s", priority, ) decision["priority"] = priority decision["auto_fix"] = allowed_auto_fix decision["human_review_needed"] = False decision["reasoning"] = ( f"{decision.get('reasoning', '')} " f"[ADR-020 全自動修復: auto_fix={'enabled' if allowed_auto_fix else 'flag_disabled'}, priority={priority}]" ).strip() return decision # ── Step 5:NemoTron 派遣 ────────────────────────────────────────────────── def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict: auto_fix = ea.get("auto_fix", False) fix_files = ea.get("fix_files", []) allowed_fix_files = _aider_allowed_fix_files(fix_files) priority_map = {"critical": 1, "high": 2, "medium": 3, "low": 4} priority_num = priority_map.get(ea.get("priority", "low"), 4) actions_created = 0 session = get_session() try: # 每個需修復的檔案建立一筆 action_plan for fpath in fix_files[:3]: if active_code_review_action_exists(session, fpath): logger.info("[CodeReview] skip duplicate active action_plan file=%s", fpath) continue related = [f for f in 
findings if f.get("file") == fpath][:3] desc = f"Code Review 修復:{fpath}|{', '.join(f.get('description','')[:40] for f in related)}" session.execute(text(""" INSERT INTO action_plans (action_type, description, status, priority, metadata_json, created_at) VALUES ('code_review_fix', :desc, :status, :priority, :meta, NOW()) """), { "desc": desc[:500], "status": ( "auto_pending" if auto_fix and fpath in allowed_fix_files else "auto_skipped_whitelist" if auto_fix else "auto_disabled" ), "priority": priority_num, "meta": json.dumps({ "pipeline_id": self.pipeline_id, "commit_sha": self.commit_sha, "file": fpath, "auto_fix": auto_fix, "aider_auto_fix_allowed": fpath in allowed_fix_files, "ea_priority": ea.get("priority"), "findings": related, }, ensure_ascii=False), }) actions_created += 1 session.commit() except Exception as e: logger.warning("[CodeReview] action_plans 寫入失敗: %s", e) session.rollback() finally: session.close() # 觸發 AiderHeal(非阻塞) if auto_fix and allowed_fix_files: self.state["auto_fix_triggered"] = True self._sync_global() self._trigger_aider_heal(findings, allowed_fix_files) elif auto_fix and fix_files: self.state["auto_fix_skipped_whitelist"] = True self._sync_global() logger.info("[CodeReview] AiderHeal skipped: no files matched ADR-020 whitelist files=%s", fix_files) return { "actions": actions_created, "auto_fix": auto_fix, "aider_fix_files": allowed_fix_files, } def _trigger_aider_heal(self, findings: List[Dict], fix_files: List[str]): """非阻塞觸發 AiderHeal 自動修復""" def _heal_worker(): try: from services.aider_heal_executor import execute_code_fix for fpath in fix_files[:2]: # 最多同時修 2 個檔案 related = [f for f in findings if f.get("file") == fpath] if not related: continue worst = sorted(related, key=lambda x: {"CRITICAL":0,"HIGH":1,"MEDIUM":2,"LOW":3}.get(x.get("severity","LOW"),3))[0] result = execute_code_fix( error_type=f"code_review_{worst.get('type','bug')}", error_message=worst.get("description", "Code Review 發現問題"), target_file=fpath, 
context={"suggestion": worst.get("suggestion", ""), "pipeline_id": self.pipeline_id}, ) logger.info("[CodeReview] AiderHeal %s → %s", fpath, result.get("message", "")) except Exception as e: logger.error("[CodeReview] AiderHeal 觸發失敗: %s", e) t = threading.Thread(target=_heal_worker, daemon=True, name=f"aider-heal-{self.commit_sha[:8]}") t.start() # ── DB 持久化 ────────────────────────────────────────────────────────────── def _save_to_db(self, findings: List[Dict], openclaw_report: str, ea: Dict): session = get_session() try: row = session.execute(text(""" INSERT INTO ai_insights (insight_type, content, confidence, created_by, status, metadata_json, period, created_at) VALUES ('code_review_result', :content, :conf, 'code_review_pipeline', 'active', :meta, :period, NOW()) RETURNING id """), { "content": json.dumps({ "findings": findings, "openclaw_report": openclaw_report, "ea_decision": ea, "severity_summary": self.state["severity_summary"], }, ensure_ascii=False)[:8000], "conf": 0.90, "meta": json.dumps({ "pipeline_id": self.pipeline_id, "commit_sha": self.commit_sha, "branch": self.branch, "changed_files": self.changed_files, "auto_fix_triggered": self.state["auto_fix_triggered"], }, ensure_ascii=False), "period": datetime.now().strftime("%Y-%m-%d"), }).fetchone() session.commit() if row: try: from services.openclaw_learning_service import enqueue_insight_embedding enqueue_insight_embedding(row[0], "code_review_result", json.dumps({ "findings": findings, "openclaw_report": openclaw_report, "ea_decision": ea, }, ensure_ascii=False), datetime.now().strftime("%Y-%m-%d")) except Exception as embed_err: logger.warning("[CodeReview] embedding queue enqueue failed: %s", embed_err) logger.info("[CodeReview] ai_insights 寫入成功 pipeline=%s", self.pipeline_id) except Exception as e: logger.error("[CodeReview] DB 寫入失敗: %s", e) session.rollback() finally: session.close() # ── Telegram 通知 ───────────────────────────────────────────────────────── def _notify_start(self): try: from 
def _notify_complete(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Send the Telegram summary for a finished review.

    The message shows the per-severity counts from
    ``self.state["severity_summary"]``, up to three CRITICAL/HIGH findings,
    the first 400 characters of the OpenClaw report (when present) and a
    one-line auto-fix status derived from the decision dict.

    Any exception raised while building or sending the message is logged as
    a warning and swallowed.
    """
    try:
        from services.telegram_templates import _send_telegram_raw

        counts = self.state["severity_summary"]
        level = ea.get("priority", "medium")
        wants_fix = ea.get("auto_fix", False)
        requested = ea.get("fix_files", [])
        permitted = _aider_allowed_fix_files(requested)

        badges = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}
        badge = badges.get(level, "🟡")

        # Walk every finding before slicing, exactly like the filter-then-slice form.
        severe = []
        for item in findings:
            if item.get("severity") in ("CRITICAL", "HIGH"):
                severe.append(item)
        severe = severe[:3]

        rendered = []
        for item in severe:
            dot = "🔴" if item["severity"] == "CRITICAL" else "🟠"
            rendered.append(
                f" {dot} [{item['severity']}] {item.get('description','')} — {item.get('file','')}"
            )
        issue_block = "\n".join(rendered) or " ✅ 無高風險問題"

        divider = "══════════════════════════\n"
        parts = [
            f"{badge} Code Review 完成 · {self.commit_sha[:8]}\n",
            divider,
            f"🔴 CRITICAL {counts['critical']} ",
            f"🟠 HIGH {counts['high']} ",
            f"🟡 MEDIUM {counts['medium']} ",
            f"🟢 LOW {counts['low']}\n",
            divider,
            f"⚠️ 主要問題\n{issue_block}\n",
        ]

        if openclaw_report:
            parts.append(f"\n{openclaw_report[:400]}\n")

        # Auto-fix status: triggered / blocked by whitelist / nothing to do / disabled.
        if wants_fix and permitted:
            status = "🔧 已觸發自動修復(AiderHeal)"
        elif wants_fix and requested:
            status = "⚠️ 不在自動修復白名單,需人工處理"
        elif counts['critical'] + counts['high'] + counts['medium'] + counts['low'] == 0:
            status = "✅ 無需修復動作"
        else:
            status = "🛑 自動修復主開關關閉(CODE_REVIEW_AUTO_FIX_ENABLED=false)"

        parts.append(divider)
        parts.append(f"🤖 Elephant Alpha:{level.upper()} {status}\n")
        parts.append("📊 完整報告:https://mo.wooo.work/code-review/")

        _send_telegram_raw("".join(parts))
    except Exception as e:
        logger.warning("[CodeReview] 完成通知失敗: %s", e)
def verify_internal_token(request_token: str) -> bool:
    """Check that a webhook request carries the configured internal token.

    When ``INTERNAL_TOKEN`` is empty the request is rejected, unless
    ``ALLOW_INSECURE_WEBHOOK`` is set, in which case it is accepted with a
    warning. Otherwise the supplied token must equal ``INTERNAL_TOKEN``.

    The comparison uses ``hmac.compare_digest`` so the time taken does not
    reveal how many leading characters of a guess were correct.

    Args:
        request_token: Token supplied by the caller; anything that is not a
            ``str`` (for example ``None`` when the header is missing) is
            rejected.

    Returns:
        True when the request may proceed, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not INTERNAL_TOKEN:
        if ALLOW_INSECURE_WEBHOOK:
            logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,僅因 dev override 放行")
            return True
        logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,拒絕 webhook")
        return False

    if not isinstance(request_token, str):
        return False

    # compare_digest only accepts ASCII-only str, so compare bytes instead.
    # "surrogatepass" keeps encoding from raising on malformed input.
    supplied = request_token.encode("utf-8", "surrogatepass")
    expected = str(INTERNAL_TOKEN).encode("utf-8", "surrogatepass")
    return hmac.compare_digest(supplied, expected)