Files
ewoooc/services/code_review_pipeline_service.py

1129 lines
53 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/code_review_pipeline_service.py
Post-Deploy AI Agent Code Review Pipeline

觸發時機:CD 健康檢查通過後,由 Gitea Action webhook 呼叫

Pipeline:
  Step 1  system         讀取變更檔案內容
  Step 2  Hermes         程式碼掃描(bugs / security / performance)
  Step 3  OpenClaw       架構品質評估(Ollama-first,Gemini 僅備援)
  Step 4  ElephantAlpha  決策協調(severity 判定 + auto-fix 裁量)
  Step 5  NemoTron       行動派遣(action_plans 寫入 + AiderHeal 觸發)

結果輸出:
  - ai_insights(type='code_review_result')
  - action_plans(type='code_review_fix')
  - Telegram 告警(啟動 / 完成 / 錯誤)
  - 前端 /code-review/ 即時 polling 狀態
"""
import json
import logging
import os
import re
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
# ADR-027Code Review 走 OllamaService 取得三主機級聯 retry。
from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.action_plan_dedupe import active_code_review_action_exists
from services.gemini_guard import gemini_disabled_message, get_gemini_api_key
logger = logging.getLogger(__name__)
def _env_bool(name: str, default: str = "false") -> bool:
    """Interpret environment variable *name* as a boolean flag.

    The value (or *default* when the variable is unset) is stripped and
    lower-cased; it counts as true only when it is one of "1", "true",
    "yes" or "on".
    """
    truthy_values = {"1", "true", "yes", "on"}
    raw_value = os.getenv(name, default)
    normalized = raw_value.strip().lower()
    return normalized in truthy_values
# ── Pipeline global state (polled by the frontend) ──────────────────────────
# Most recently published pipeline state; written by CodeReviewPipeline._sync_global
# and read by get_current_state().
_current_pipeline: Dict[str, Any] = {}
# Guards every read and write of _current_pipeline.
_pipeline_lock = threading.Lock()
# Gemini is only the last cloud fallback for code review, after the Ollama/Claude paths fail.
REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
# Model for the first Ollama attempt of the OpenClaw assessment; falls back to
# OPENCLAW_OLLAMA_MODEL and then to a built-in default.
CODE_REVIEW_OLLAMA_MODEL = os.getenv(
"CODE_REVIEW_OLLAMA_MODEL",
os.getenv("OPENCLAW_OLLAMA_MODEL", "qwen2.5-coder:7b"),
)
# Timeout passed to the first Ollama attempt (units are whatever OllamaService.generate expects
# — presumably seconds; confirm against that service).
CODE_REVIEW_OLLAMA_TIMEOUT = int(os.getenv("CODE_REVIEW_OLLAMA_TIMEOUT", "15"))
# Model / timeout for the second Ollama attempt.
CODE_REVIEW_OLLAMA_SECONDARY_MODEL = os.getenv(
"CODE_REVIEW_OLLAMA_SECONDARY_MODEL",
"gemma3:4b",
)
CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT = int(
os.getenv("CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT", "60")
)
# Model / timeout for the optional third attempt on OLLAMA_HOST_FALLBACK
# (only used when CODE_REVIEW_ALLOW_111_FALLBACK is true). Defaults to the Hermes model.
CODE_REVIEW_OLLAMA_FALLBACK_MODEL = os.getenv(
"CODE_REVIEW_OLLAMA_FALLBACK_MODEL",
_HERMES_MODEL,
)
CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT = int(
os.getenv("CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT", "20")
)
# Sent as options["num_predict"] on OpenClaw Ollama calls.
CODE_REVIEW_OLLAMA_NUM_PREDICT = int(os.getenv("CODE_REVIEW_OLLAMA_NUM_PREDICT", "384"))
# Sent as keep_alive on both Hermes and OpenClaw Ollama calls.
CODE_REVIEW_OLLAMA_KEEP_ALIVE = os.getenv("CODE_REVIEW_OLLAMA_KEEP_ALIVE", "5m")
# When true, a third attempt against OLLAMA_HOST_FALLBACK is appended to the attempt lists.
CODE_REVIEW_ALLOW_111_FALLBACK = _env_bool("CODE_REVIEW_ALLOW_111_FALLBACK", "false")
# NOTE(review): not referenced elsewhere in this file; its default (35) also differs from the
# default (15) used when the same env var seeds CODE_REVIEW_HERMES_PRIMARY_TIMEOUT below — confirm intent.
CODE_REVIEW_HERMES_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "35"))
# Hermes (step 2) models default to the corresponding OpenClaw/Ollama models above.
CODE_REVIEW_HERMES_PRIMARY_MODEL = os.getenv(
"CODE_REVIEW_HERMES_PRIMARY_MODEL",
CODE_REVIEW_OLLAMA_MODEL,
)
CODE_REVIEW_HERMES_SECONDARY_MODEL = os.getenv(
"CODE_REVIEW_HERMES_SECONDARY_MODEL",
CODE_REVIEW_OLLAMA_SECONDARY_MODEL,
)
CODE_REVIEW_HERMES_FALLBACK_MODEL = os.getenv(
"CODE_REVIEW_HERMES_FALLBACK_MODEL",
CODE_REVIEW_OLLAMA_FALLBACK_MODEL,
)
# Primary Hermes timeout: CODE_REVIEW_HERMES_PRIMARY_TIMEOUT, else CODE_REVIEW_HERMES_TIMEOUT, else 15.
CODE_REVIEW_HERMES_PRIMARY_TIMEOUT = int(
os.getenv("CODE_REVIEW_HERMES_PRIMARY_TIMEOUT", os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "15"))
)
CODE_REVIEW_HERMES_SECONDARY_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_SECONDARY_TIMEOUT", "45"))
CODE_REVIEW_HERMES_FALLBACK_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_FALLBACK_TIMEOUT", "20"))
# Sent as options["num_predict"] on Hermes Ollama calls.
CODE_REVIEW_HERMES_NUM_PREDICT = int(os.getenv("CODE_REVIEW_HERMES_NUM_PREDICT", "384"))
# Prompt budget for the Hermes LLM scan: number of files and characters per file.
CODE_REVIEW_HERMES_MAX_FILES = int(os.getenv("CODE_REVIEW_HERMES_MAX_FILES", "2"))
CODE_REVIEW_HERMES_MAX_CHARS = int(os.getenv("CODE_REVIEW_HERMES_MAX_CHARS", "900"))
# When false (default), step 2 uses the regex-based _static_code_scan instead of an LLM.
# Note: only the exact string "true" (case-insensitive) enables it, unlike _env_bool.
CODE_REVIEW_HERMES_LLM_SCAN_ENABLED = (
os.getenv("CODE_REVIEW_HERMES_LLM_SCAN_ENABLED", "false").lower() == "true"
)
# Shared secret checked by verify_internal_token(); empty means "not configured".
INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
# Master switch for auto-fix decisions (see _ea_orchestrate / _guard_ea_decision).
AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "true").lower() == "true"
# Dev-only override that lets verify_internal_token() accept requests when no token is configured.
ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true"
# Paths eligible for AiderHeal auto-fix: .py files under services/, routes/ or database/
# whose directory and file names contain only letters, digits and underscores.
AIDER_AUTO_FIX_FILE_PATTERN = re.compile(
r"^(services|routes|database)/(?:[a-zA-Z0-9_]+/)*[a-zA-Z0-9_]+\.py$"
)
# Phase 7 Frontier upgrade feature flag — OFF by default; when enabled, Claude is used only
# as a cloud fallback after the Ollama attempts fail.
CODE_REVIEW_USE_CLAUDE = os.getenv("CODE_REVIEW_USE_CLAUDE", "false").lower() == "true"
CLAUDE_REVIEW_MODEL = os.getenv("CLAUDE_MODEL", "claude-opus-4-7")
def _aider_allowed_fix_files(files: List[str]) -> List[str]:
    """Return the entries of *files* that ADR-020 allows AiderHeal to auto-fix.

    An entry is kept when it matches ``AIDER_AUTO_FIX_FILE_PATTERN``; falsy
    entries (``None``, ``""``) are matched as the empty string and therefore
    dropped. Input order is preserved.
    """
    permitted: List[str] = []
    for candidate in files:
        if AIDER_AUTO_FIX_FILE_PATTERN.match(candidate or ""):
            permitted.append(candidate)
    return permitted
# ═══════════════════════════════════════════════════════════════════════════════
# Pipeline Class
# ═══════════════════════════════════════════════════════════════════════════════
class CodeReviewPipeline:
    """
    Five-step post-deploy code review pipeline.

    Build an instance, then invoke ``run()`` from a daemon thread.
    """

    def __init__(self, commit_sha: str, changed_files: List[str],
                 branch: str = "main", deploy_type: str = "sync"):
        """Record the deploy metadata, filter reviewable files and publish the initial state.

        Args:
            commit_sha: Commit identifier; its first 8 characters are used in the pipeline id.
            changed_files: Paths reported as changed by the deploy.
            branch: Branch name, stored in the state.
            deploy_type: Stored on the instance as given.
        """
        self.commit_sha = commit_sha
        self.branch = branch
        self.deploy_type = deploy_type
        self.started_at = datetime.now()
        timestamp = self.started_at.strftime('%Y%m%d_%H%M%S')
        self.pipeline_id = f"cr_{commit_sha[:8]}_{timestamp}"

        # Keep only .py/.yaml/.yml/.json files and drop anything under node_modules/ or .git/.
        reviewable_suffixes = ('.py', '.yaml', '.yml', '.json')
        excluded_prefixes = ('node_modules/', '.git/')
        kept: List[str] = []
        for path in changed_files:
            if not path.endswith(reviewable_suffixes):
                continue
            if path.startswith(excluded_prefixes):
                continue
            kept.append(path)
        self.changed_files = kept

        initial_state: Dict[str, Any] = {
            "pipeline_id": self.pipeline_id,
            "commit_sha": commit_sha,
            "branch": branch,
            "changed_files": self.changed_files,
            "status": "running",
            "current_step": 0,
            "total_steps": 5,
            "steps": [],
            "findings": [],
            "severity_summary": {"critical": 0, "high": 0, "medium": 0, "low": 0},
            "openclaw_report": "",
            "ea_decision": {},
            "auto_fix_triggered": False,
            "started_at": self.started_at.isoformat(),
            "completed_at": None,
            "message": "",
        }
        self.state = initial_state
        self._sync_global()
# ── State helpers ─────────────────────────────────────────────────────────
def _sync_global(self):
    """Publish a snapshot of ``self.state`` to the module-level ``_current_pipeline``.

    A deep copy is taken under ``_pipeline_lock``. A shallow ``dict(...)`` copy
    would share the nested ``steps`` / ``findings`` / ``severity_summary``
    objects with this pipeline, which keeps mutating them from its worker
    thread while pollers read the published snapshot.
    """
    global _current_pipeline
    import copy  # local import: keeps this block self-contained

    with _pipeline_lock:
        _current_pipeline = copy.deepcopy(self.state)
def _step_start(self, num: int, name: str, agent: str):
    """Mark step *num* as the current step and append a "running" record for it.

    The record carries the step number, display name, agent name and start
    timestamp; the updated state is then published and the start is logged.
    """
    self.state["current_step"] = num
    step_record = {
        "step": num,
        "name": name,
        "agent": agent,
        "status": "running",
        "started_at": datetime.now().isoformat(),
        "completed_at": None,
        "summary": "",
    }
    self.state["steps"].append(step_record)
    self._sync_global()
    logger.info("[CodeReview] ▶ Step %d/5 %s (%s)", num, name, agent)
def _step_done(self, num: int, summary: str, ok: bool = True):
    """Close every recorded step whose number is *num*.

    Sets its status to "ok" or "error", stamps the completion time and stores
    the summary (first 300 characters), then publishes the state and logs the
    first 100 characters of the summary.
    """
    outcome = "ok" if ok else "error"
    clipped_summary = summary[:300]
    matching = (entry for entry in self.state["steps"] if entry["step"] == num)
    for entry in matching:
        entry["status"] = outcome
        entry["completed_at"] = datetime.now().isoformat()
        entry["summary"] = clipped_summary
    self._sync_global()
    # NOTE(review): both marker strings are empty in the source as received —
    # the original symbols may have been lost; confirm against the repository.
    logger.info("[CodeReview] %s Step %d%s", "" if ok else "", num, summary[:100])
def _finish(self, status: str, message: str):
    """Store the final *status*, a completion timestamp and *message*, then publish the state."""
    final_fields = {
        "status": status,
        "completed_at": datetime.now().isoformat(),
        "message": message,
    }
    self.state.update(final_fields)
    self._sync_global()
# ── Main pipeline ─────────────────────────────────────────────────────────
def run(self):
    """Run all five pipeline steps in order; meant to be the target of a daemon thread.

    Flow: start notification → read files → Hermes scan → OpenClaw assessment →
    ElephantAlpha decision → NemoTron dispatch → persist → completion
    notification. When no file content could be read the pipeline finishes as
    "skipped". Any exception is logged, stored as an "error" finish and
    reported through the error notification.
    """
    try:
        self._notify_start()

        # Step 1 ─ read files
        self._step_start(1, "讀取變更檔案", "system")
        contents = self._read_changed_files()
        if not contents:
            self._step_done(1, "無有效 Python/YAML 變更,跳過 Review")
            self._finish("skipped", "無有效變更檔案")
            return
        self._step_done(1, f"讀取 {len(contents)} 個檔案")

        # Step 2 ─ Hermes scan
        self._step_start(2, "Hermes 程式碼掃描", "Hermes")
        findings = self._hermes_scan(contents)
        tally = self.state["severity_summary"]
        scan_summary = (
            f"CRITICAL={tally['critical']} HIGH={tally['high']} "
            f"MEDIUM={tally['medium']} LOW={tally['low']}"
        )
        self._step_done(2, scan_summary)

        # Step 3 ─ OpenClaw assessment
        self._step_start(3, "OpenClaw 架構品質評估", "OpenClaw")
        report = self._openclaw_assess(contents, findings)
        self.state["openclaw_report"] = report
        if report:
            report_summary = report[:120]
        else:
            report_summary = "OpenClaw 未回應)"
        self._step_done(3, report_summary)

        # Step 4 ─ ElephantAlpha orchestration
        self._step_start(4, "ElephantAlpha 決策協調", "ElephantAlpha")
        decision = self._ea_orchestrate(findings, report)
        self.state["ea_decision"] = decision
        self._step_done(4, f"優先度={decision.get('priority','?')} auto_fix={decision.get('auto_fix',False)}")

        # Step 5 ─ NemoTron dispatch
        self._step_start(5, "NemoTron 行動派遣", "NemoTron")
        outcome = self._nemotron_dispatch(decision, findings)
        self._step_done(5, f"寫入 {outcome['actions']} 筆 action_plans自動修復={'' if outcome['auto_fix'] else ''}")

        # Persist + notify
        self._save_to_db(findings, report, decision)
        self._notify_complete(findings, report, decision)
        self._finish("completed", "Pipeline 執行完成")
    except Exception as e:
        logger.error("[CodeReview] Pipeline 例外: %s", e, exc_info=True)
        self._finish("error", str(e)[:200])
        self._notify_error(str(e))
# ── Step 1讀取檔案 ───────────────────────────────────────────────────────
def _read_changed_files(self) -> Dict[str, str]:
    """Read up to 8 of ``self.changed_files`` relative to the project root.

    The project root is the parent of this module's directory. File names come
    from the deploy webhook, so each one is normalised and must stay inside the
    project root; absolute paths and ``..`` escapes are skipped instead of being
    read and forwarded to the review models.

    Returns:
        Mapping of the original relative path to at most 6000 characters of
        content, with a truncation marker appended when the file is longer.
        Unreadable files are skipped.
    """
    max_chars = 6000
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    # Compare against "root + separator" so "/srv/app-evil" is not taken for "/srv/app".
    root_prefix = project_root if project_root.endswith(os.sep) else project_root + os.sep
    contents: Dict[str, str] = {}
    for rel_path in self.changed_files[:8]:  # at most 8 files
        abs_path = os.path.abspath(os.path.join(project_root, rel_path))
        if not abs_path.startswith(root_prefix):
            logger.warning("[CodeReview] skip path outside project root: %s", rel_path)
            continue
        try:
            with open(abs_path, encoding="utf-8", errors="ignore") as fh:
                # One extra character is enough to know whether truncation happened.
                raw = fh.read(max_chars + 1)
        except OSError:
            logger.debug("[CodeReview] 無法讀取 %s(部署路徑不同?)", rel_path)
            continue
        contents[rel_path] = raw[:max_chars] + ("\n... (截斷)" if len(raw) > max_chars else "")
    return contents
# ── Step 2Hermes 掃描 ───────────────────────────────────────────────────
def _hermes_scan(self, files: Dict[str, str]) -> List[Dict]:
"""Step 2: scan the changed files and return a list of finding dicts.

With CODE_REVIEW_HERMES_LLM_SCAN_ENABLED off, the regex-based
``_static_code_scan`` is used. Otherwise a compact prompt is sent to Ollama on
the primary host, then the secondary host; the fallback host is tried only when
CODE_REVIEW_ALLOW_111_FALLBACK is true. The first response that yields a
parseable JSON array wins.

On success the per-severity counters and ``state["findings"]`` are updated and
published. Returns ``[]`` when every attempt fails or any exception is raised.
"""
try:
# LLM scan disabled: deterministic scan, tally severities, publish, return.
if not CODE_REVIEW_HERMES_LLM_SCAN_ENABLED:
findings = self._static_code_scan(files)
for f in findings:
sev = f.get("severity", "LOW").lower()
if sev in self.state["severity_summary"]:
self.state["severity_summary"][sev] += 1
self.state["findings"] = findings
self._sync_global()
return findings
# Build a size-limited prompt: first MAX_FILES files, MAX_CHARS characters each.
compact_files = []
for name, content in list(files.items())[:CODE_REVIEW_HERMES_MAX_FILES]:
clipped = content[:CODE_REVIEW_HERMES_MAX_CHARS]
if len(content) > CODE_REVIEW_HERMES_MAX_CHARS:
clipped += "\n... (截斷,僅掃描前段重點)"
compact_files.append(f"### {name}\n```python\n{clipped}\n```")
files_text = "\n\n".join(compact_files)
prompt = f"""你是資深程式碼審查工程師,請掃描以下程式碼並列出所有問題。
{files_text}
輸出格式:純 JSON 陣列,每項包含以下欄位:
- severity: "CRITICAL" | "HIGH" | "MEDIUM" | "LOW"
- type: "bug" | "security" | "performance" | "maintainability"
- file: 檔案名稱
- line_hint: 約略行號或函式名稱
- description: 問題說明(繁體中文,精簡一句)
- suggestion: 修復建議(繁體中文,精簡一句)
只輸出 JSON 陣列,不含其他文字。無問題時輸出 []"""
# Imported lazily inside the method rather than at module import time.
from services.ollama_service import (
OLLAMA_HOST_FALLBACK,
OLLAMA_HOST_PRIMARY,
OLLAMA_HOST_SECONDARY,
OllamaService,
get_host_label,
get_provider_tag,
)
# Ordered attempts: (attempt key, host, model, timeout).
hermes_attempts = [
(
"primary_code_scan",
OLLAMA_HOST_PRIMARY,
CODE_REVIEW_HERMES_PRIMARY_MODEL,
CODE_REVIEW_HERMES_PRIMARY_TIMEOUT,
),
(
"secondary_fast_scan",
OLLAMA_HOST_SECONDARY,
CODE_REVIEW_HERMES_SECONDARY_MODEL,
CODE_REVIEW_HERMES_SECONDARY_TIMEOUT,
),
]
# The fallback host is opt-in.
if CODE_REVIEW_ALLOW_111_FALLBACK:
hermes_attempts.append((
"lan_111_hermes_scan",
OLLAMA_HOST_FALLBACK,
CODE_REVIEW_HERMES_FALLBACK_MODEL,
CODE_REVIEW_HERMES_FALLBACK_TIMEOUT,
))
# findings stays None until one attempt produces parseable JSON.
findings = None
last_error = None
for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate(
hermes_attempts,
start=1,
):
# Each attempt is recorded through log_ai_call with its own metadata.
with log_ai_call(
caller='code_review_hermes',
provider=get_provider_tag(host),
model=model_name,
request_id=f"cr-{self.commit_sha[:8]}",
meta={'commit': self.commit_sha[:8], 'branch': self.branch,
'files': len(files), 'route': 'ollama_first',
'attempt': attempt_index,
'attempt_key': attempt_key,
'max_files': CODE_REVIEW_HERMES_MAX_FILES,
'max_chars': CODE_REVIEW_HERMES_MAX_CHARS,
'timeout_s': timeout_s},
) as _ctx:
ollama = OllamaService(host=host, model=model_name)
resp = ollama.generate(
prompt=prompt,
model=model_name,
temperature=0.1,
timeout=timeout_s,
keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE,
options={"num_predict": CODE_REVIEW_HERMES_NUM_PREDICT},
)
# Log the host/model reported by the response, which may differ from the request.
actual_host = resp.host or host
_ctx.set_provider(get_provider_tag(actual_host))
_ctx.set_model(resp.model or model_name)
_ctx.set_tokens(
input=resp.input_tokens,
output=resp.output_tokens,
)
_ctx.add_meta('host', actual_host)
_ctx.add_meta('host_label', get_host_label(actual_host))
if resp.model and resp.model != model_name:
_ctx.add_meta('requested_model', model_name)
# Failed call: remember the error and move on to the next attempt.
if not resp.success:
last_error = resp.error or 'ollama generate failed'
_ctx.set_error(last_error)
continue
raw = (resp.content or "").strip()
# Greedy match from the first "[" to the last "]" to pull the JSON array out of any surrounding text.
match = re.search(r"\[.*\]", raw, re.DOTALL)
if not match:
last_error = f"missing JSON array: {raw[:120]}"
_ctx.set_error(last_error)
logger.warning("[CodeReview] Hermes 回應無 JSON: %s", raw[:200])
continue
try:
findings = json.loads(match.group())
except Exception as exc:
last_error = f"json parse failed: {type(exc).__name__}: {exc}"
_ctx.set_error(last_error)
continue
# First parseable response wins.
break
# Every attempt failed: report no findings (state counters are left untouched).
if findings is None:
logger.warning("[CodeReview] Hermes 本地掃描全部失敗: %s", last_error)
return []
# Tally severities; values outside critical/high/medium/low are not counted.
for f in findings:
sev = f.get("severity", "LOW").lower()
if sev in self.state["severity_summary"]:
self.state["severity_summary"][sev] += 1
self.state["findings"] = findings
self._sync_global()
return findings
except Exception as e:
# Best-effort step: any failure degrades to "no findings".
logger.warning("[CodeReview] Hermes 掃描失敗: %s", e)
return []
def _static_code_scan(self, files: Dict[str, str]) -> List[Dict]:
    """Fast deterministic line-by-line scan used when the Hermes LLM scan is disabled.

    For every line (whitespace-stripped) it flags, in this order: a
    ``requests.<verb>(`` call without ``timeout=`` on the same line, a
    hard-coded secret-looking assignment that does not mention ``os.getenv``,
    and the regex rules in ``line_rules``. At most 8 findings are returned.
    """
    result_cap = 8
    line_rules = [
        ("HIGH", "security", r"\beval\s*\(", "使用 eval() 有安全風險", "改用安全 parser 或白名單映射"),
        ("HIGH", "security", r"\bexec\s*\(", "使用 exec() 有安全風險", "移除動態執行或改成明確函式"),
        ("MEDIUM", "maintainability", r"except\s+Exception\s*:\s*pass\b", "例外被靜默吞掉", "改用 logger.exception 並保留可診斷訊息"),
        ("MEDIUM", "maintainability", r"except\s*:\s*pass\b", "裸 except 被靜默吞掉", "指定例外類型並記錄錯誤"),
    ]
    http_call_re = re.compile(r"requests\.(get|post|put|delete|patch)\s*\(")
    secret_re = re.compile(
        r"(api[_-]?key|token|password|secret)\s*=\s*['\"][^'\"]{12,}['\"]",
        re.IGNORECASE,
    )
    collected: List[Dict] = []

    def _record(severity, issue_type, path, line_no, description, suggestion):
        # Every finding uses the same six keys, in this order.
        collected.append({
            "severity": severity,
            "type": issue_type,
            "file": path,
            "line_hint": str(line_no),
            "description": description,
            "suggestion": suggestion,
        })

    for path, content in files.items():
        line_no = 0
        for raw_line in content.splitlines():
            line_no += 1
            text_line = raw_line.strip()
            if "timeout=" not in text_line and http_call_re.search(text_line):
                _record("MEDIUM", "performance", path, line_no,
                        "HTTP request 未設定 timeout可能拖住 worker",
                        "加入明確 timeout 並處理例外")
            if "os.getenv" not in text_line and secret_re.search(text_line):
                _record("HIGH", "security", path, line_no,
                        "疑似硬編碼敏感字串",
                        "改用環境變數或 secret store")
            for severity, issue_type, rule, description, suggestion in line_rules:
                if re.search(rule, text_line):
                    _record(severity, issue_type, path, line_no, description, suggestion)
            # Stop as soon as the cap is reached; a single line may overshoot it.
            if len(collected) >= result_cap:
                return collected[:result_cap]
    return collected
# ── Step 3OpenClaw 評估 ──────────────────────────────────────────────────
def _openclaw_assess(self, files: Dict[str, str], findings: List[Dict]) -> str:
"""
Step 3: produce the OpenClaw quality assessment text for this deploy.

Routing order, as implemented below:
L1  (default) → Ollama on the primary host, then the secondary host
L1b (CODE_REVIEW_ALLOW_111_FALLBACK=true) → Ollama on the fallback host as the last local attempt
L2  (CODE_REVIEW_USE_CLAUDE=true) → Claude via anthropic_service
L3  (get_gemini_api_key("code_review") returns a key) → Gemini
L4  (neither Claude nor Gemini enabled) → deterministic local degraded report
L5  (cloud fallback enabled but failed) → elephant_service (described in the original
    notes as ElephantAlpha via NIM/OpenRouter)

Returns the first non-failing tier's text, or "" when every tier fails.
"""
sev = self.state["severity_summary"]
# The prompt carries at most 8 findings and lists at most 5 files (name + length only).
findings_json = json.dumps(findings[:8], ensure_ascii=False, indent=2)
files_list = "\n".join(f"- {k} ({len(v)} 字元)" for k, v in list(files.items())[:5])
system = (
"你是 OpenClaw 程式碼品質戰略分析師,以技術主管視角評估部署後程式碼。"
"語言:繁體中文。風格:精準、數據導向、可執行建議。"
)
user_prompt = f"""【部署】Commit {self.commit_sha[:8]} @ {self.branch}
【變更檔案】
{files_list}
【Hermes 掃描摘要】CRITICAL={sev['critical']} HIGH={sev['high']} MEDIUM={sev['medium']} LOW={sev['low']}
【問題明細】
{findings_json}
請產出程式碼品質評估(使用 HTML <b> 標題150字以內
<b>🔍 整體風險等級</b>(一句理由)
<b>⚠️ 最需關注問題</b>TOP 2
<b>💡 架構優化方向</b>1條長期建議
<b>✅ 本次部署亮點</b>"""
# ── L1: Ollama-first — primary → secondary; the fallback host needs its flag ──
from services.ollama_service import (
OLLAMA_HOST_FALLBACK,
OLLAMA_HOST_PRIMARY,
OLLAMA_HOST_SECONDARY,
OllamaService,
get_host_label,
get_provider_tag,
)
gemini_api_key = get_gemini_api_key("code_review")
cloud_fallback_available = CODE_REVIEW_USE_CLAUDE or bool(gemini_api_key)
# Caller name recorded as the fallback target when the last Ollama attempt fails:
# Claude if enabled, else Gemini if a key exists, else the local degraded report.
fallback_caller = 'code_review_openclaw' if CODE_REVIEW_USE_CLAUDE else (
'code_review_openclaw_gemini' if gemini_api_key else 'code_review_elephant'
)
if not cloud_fallback_available:
fallback_caller = 'code_review_local_degraded'
# Ordered attempts: (attempt key, host, model, timeout).
ollama_attempts = [
(
"primary_code",
OLLAMA_HOST_PRIMARY,
CODE_REVIEW_OLLAMA_MODEL,
CODE_REVIEW_OLLAMA_TIMEOUT,
),
(
"secondary_fast",
OLLAMA_HOST_SECONDARY,
CODE_REVIEW_OLLAMA_SECONDARY_MODEL,
CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT,
),
]
if CODE_REVIEW_ALLOW_111_FALLBACK:
ollama_attempts.append((
"lan_111_hermes",
OLLAMA_HOST_FALLBACK,
CODE_REVIEW_OLLAMA_FALLBACK_MODEL,
CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT,
))
last_ollama_error = None
for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate(
ollama_attempts,
start=1,
):
with log_ai_call(
caller='code_review_openclaw',
provider=get_provider_tag(host),
model=model_name,
request_id=f"cr-{self.commit_sha[:8]}",
meta={
'commit': self.commit_sha[:8],
'branch': self.branch,
'route': 'ollama_first',
'attempt': attempt_index,
'attempt_key': attempt_key,
'timeout_s': timeout_s,
},
) as _ctx:
ollama = OllamaService(host=host, model=model_name)
resp = ollama.generate(
prompt=user_prompt,
system_prompt=system,
model=model_name,
temperature=0.2,
timeout=timeout_s,
keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE,
options={"num_predict": CODE_REVIEW_OLLAMA_NUM_PREDICT},
)
# Log the host/model reported by the response, which may differ from the request.
actual_host = resp.host or host
_ctx.set_provider(get_provider_tag(actual_host))
_ctx.set_model(resp.model or model_name)
_ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
_ctx.add_meta('host', actual_host)
_ctx.add_meta('host_label', get_host_label(actual_host))
if resp.model and resp.model != model_name:
_ctx.add_meta('requested_model', model_name)
# A successful but blank answer is treated as a failure.
if resp.success and (resp.content or '').strip():
return resp.content or ""
last_ollama_error = resp.error or 'ollama generate failed'
_ctx.set_error(last_ollama_error)
# Only the final failed attempt records which tier takes over.
if attempt_index == len(ollama_attempts):
_ctx.fallback_to_caller(fallback_caller)
logger.warning(
"[CodeReview] OpenClaw 本地 Ollama 鏈全部失敗: %s",
last_ollama_error,
)
# ── L4: no cloud tier enabled → deterministic local report ──
if not cloud_fallback_available:
logger.warning(
"[CodeReview] 111/cloud fallback 未啟用,回傳 deterministic 本地降級評估",
)
return self._build_local_openclaw_degraded_report(
files=files,
findings=findings,
last_error=last_ollama_error,
)
# ── L2: Phase 7 Frontier — Claude ──
# Feature flag is OFF by default; when ON, Claude is only a cloud fallback after Ollama fails.
if CODE_REVIEW_USE_CLAUDE:
try:
from services.anthropic_service import anthropic_service
except Exception as e:
logger.warning("[CodeReview] Claude service import 失敗,改走 Gemini 備援: %s", e)
anthropic_service = None # type: ignore
if anthropic_service is not None and anthropic_service.is_available():
with log_ai_call(
caller='code_review_openclaw',
provider='claude',
model=CLAUDE_REVIEW_MODEL,
request_id=f"cr-{self.commit_sha[:8]}",
meta={
'commit': self.commit_sha[:8],
'branch': self.branch,
'flag': 'CODE_REVIEW_USE_CLAUDE',
},
) as _ctx:
resp = anthropic_service.generate(
prompt=user_prompt,
system_prompt=system, # ephemeral cache, 5-minute TTL, ~90% cost saving (per the original note)
model=CLAUDE_REVIEW_MODEL,
max_tokens=2048,
temperature=0.2, # code review needs precision
cache_system=True,
timeout=120,
)
if resp.success:
_ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
_ctx.set_cache_hit(resp.cache_hit)
_ctx.add_meta('cache_creation_tokens', resp.cache_creation_tokens)
_ctx.add_meta('cache_read_tokens', resp.cache_read_tokens)
return resp.content or ""
# Claude failed → fall through to the Gemini tier (L3)
_ctx.set_error(resp.error or 'claude generate failed')
_ctx.fallback_to_caller('code_review_openclaw_gemini')
logger.warning(
"[CodeReview] Claude 失敗,改走 Gemini 備援: %s", resp.error,
)
else:
logger.info(
"[CodeReview] CODE_REVIEW_USE_CLAUDE=true 但 Claude 不可用(缺 API key 或 SDK改走下一層備援",
)
# ── L3: Gemini — only a fallback after both Ollama and Claude failed ──
if gemini_api_key:
with log_ai_call(
caller='code_review_openclaw_gemini',
provider='gemini',
model=REVIEW_MODEL,
request_id=f"cr-{self.commit_sha[:8]}",
meta={
'commit': self.commit_sha[:8],
'branch': self.branch,
'fallback_from': 'code_review_openclaw_ollama',
},
) as _ctx:
try:
import google.generativeai as genai
genai.configure(api_key=gemini_api_key)
model = genai.GenerativeModel(
model_name=REVIEW_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=0.3, max_output_tokens=1500,
),
system_instruction=system,
)
resp = model.generate_content(user_prompt, request_options={"timeout": 90})
# Token accounting is best-effort; a failure here must not discard the answer.
try:
usage = getattr(resp, 'usage_metadata', None)
if usage is not None:
_ctx.set_tokens(
input=getattr(usage, 'prompt_token_count', 0) or 0,
output=getattr(usage, 'candidates_token_count', 0) or 0,
)
except Exception:
logger.debug("[CodeReview] Gemini usage metadata parse failed", exc_info=True)
return resp.text or ""
except Exception as e:
logger.warning("[CodeReview] OpenClaw Gemini 失敗,降級 ElephantAlpha: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
_ctx.fallback_to_caller('code_review_elephant')
else:
logger.info("[CodeReview] 跳過 Gemini 備援:%s", gemini_disabled_message("code_review"))
# ── L5: degrade to elephant_service (the original note says OPENROUTER_API_KEY is
# always present in the container — not verifiable from this file) ──
# Tracked through the Phase 1 v5.0 AI call logger.
with log_ai_call(
caller='code_review_elephant',
provider='nim_via_elephant',
model='nvidia/llama-3.3-nemotron-super-49b-v1.5',
request_id=f"cr-{self.commit_sha[:8]}",
meta={'commit': self.commit_sha[:8], 'branch': self.branch},
) as _ctx:
try:
from services.elephant_service import elephant_service
resp = elephant_service.generate(
prompt=user_prompt,
system_prompt=system,
temperature=0.3,
timeout=90,
)
if resp.success:
# Token counts are read defensively with getattr in case the response lacks them.
_ctx.set_tokens(
input=getattr(resp, 'input_tokens', 0) or 0,
output=getattr(resp, 'output_tokens', 0) or 0,
)
return resp.content or ""
else:
_ctx.set_error(getattr(resp, 'error', 'elephant generate failed'))
except Exception as e:
logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
# Every tier failed.
return ""
def _build_local_openclaw_degraded_report(
    self,
    *,
    files: Dict[str, str],
    findings: List[Dict],
    last_error: Optional[str],
) -> str:
    """Build the deterministic HTML-tagged report used when no model tier is available.

    The text is assembled from the severity counters in ``self.state``, the
    first two findings that have a description, the number of files, and the
    first 120 characters of *last_error* (or a default message).
    """
    counts = self.state["severity_summary"]

    # First non-zero severity decides the risk label.
    # NOTE(review): the labels for "high" and "medium" (and the default) are empty
    # strings in the source as received — characters may have been lost; confirm.
    risk_label = ""
    for level, label in (("critical", "嚴重"), ("high", ""), ("medium", "")):
        if counts[level]:
            risk_label = label
            break

    highlighted = []
    for item in findings[:2]:
        if item.get("description"):
            highlighted.append(f"{item.get('file', 'unknown')}{item.get('description')}")
    top_summary = "".join(highlighted)
    if not top_summary:
        top_summary = "本次 deterministic scan 未發現高風險問題"

    error_excerpt = (last_error or "local Ollama unavailable")[:120]

    sections = [
        f"<b>🔍 整體風險等級</b> {risk_label}",
        f"本地掃描完成 {len(files)}CRITICAL={counts['critical']}、HIGH={counts['high']}",
        f"MEDIUM={counts['medium']}、LOW={counts['low']}",
        f"<b>⚠️ 最需關注問題</b> {top_summary}",
        f"<b>💡 架構優化方向</b> GCP-A/GCP-B OpenClaw 不可用時暫停 111 重分析,避免拖高 fallback 主機負載。",
        f"<b>✅ 本次部署亮點</b> 已以本地降級報告收斂,未呼叫 Gemini最後錯誤{error_excerpt}",
    ]
    return "".join(sections)
# ── Step 4ElephantAlpha 決策 ─────────────────────────────────────────────
def _ea_orchestrate(self, findings: List[Dict], openclaw_report: str) -> Dict:
    """Step 4: decide priority, auto-fix and the files to fix.

    ElephantAlpha is asked first for a JSON decision, which is then passed
    through ``_guard_ea_decision``. If the service is unavailable, fails, or
    returns something unusable, a rule-based decision is produced instead.

    Rule-based fallback (ADR-020 full auto-fix policy):
      * priority follows the most severe non-zero severity counter;
      * auto_fix is true whenever there is at least one finding and
        ``AUTO_FIX_ENABLED`` is on;
      * fix_files holds the first five distinct files in finding order. The
        order is deterministic; a ``set`` here would make the surviving five
        vary between runs.

    Returns:
        Dict with ``priority``, ``auto_fix``, ``reasoning``, ``fix_files`` and
        ``human_review_needed``.
    """
    sev = self.state["severity_summary"]
    critical_n = sev["critical"]
    high_n = sev["high"]

    # Try ElephantAlpha for a finer-grained decision.
    try:
        from services.elephant_service import elephant_service
        top3 = json.dumps(
            [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3],
            ensure_ascii=False,
        )
        prompt = f"""你是 Elephant Alpha負責協調 Code Review 後的修復決策。
【部署】commit={self.commit_sha[:8]} branch={self.branch}
【問題統計】CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}
【Top 問題】{top3}
【OpenClaw評估摘要】{openclaw_report[:300]}
請以 JSON 回答(不含其他文字):
{{
"priority": "critical|high|medium|low",
"auto_fix": true|false,
"reasoning": "決策理由(繁體中文,一句話,需含具體數字)",
"fix_files": ["需自動修復的檔案最多5個所有有問題的檔案"],
"human_review_needed": false
}}
規則(依 ADR-020 全自動修復政策,覆寫 ADR-012 L3 HITL 對 code review 的限制):
- 任何 finding不論 CRITICAL/HIGH/MEDIUM/LOW→ auto_fix=truehuman_review_needed=false
- 安全網是 Git revert + Gitea CI/CD 回滾,不依賴人工審查門檻
- priority 按最嚴重 severity 決定CRITICAL>HIGH>MEDIUM>LOW
- fix_files 填入所有有問題的檔案(最多 5 個AiderHeal 端會再限流)"""
        resp = elephant_service.generate(
            prompt=prompt,
            json_mode=True,
            temperature=0.1,
            timeout=60,
        )
        if resp.success:
            return self._guard_ea_decision(json.loads(resp.content), findings)
    except Exception as e:
        logger.warning("[CodeReview] ElephantAlpha 決策失敗,回退規則: %s", e)

    # Rule fallback: ADR-020 full auto-fix policy — any finding means auto_fix=true.
    has_findings = len(findings) > 0
    if critical_n > 0:
        priority = "critical"
    elif high_n > 0:
        priority = "high"
    elif sev["medium"] > 0:
        priority = "medium"
    else:
        priority = "low"
    auto_fix = bool(has_findings and AUTO_FIX_ENABLED)

    # Ordered de-duplication keeps the selection stable across runs.
    fix_files = list(dict.fromkeys(
        f.get("file") for f in findings
        if isinstance(f, dict) and isinstance(f.get("file"), str) and f.get("file")
    ))[:5]

    if auto_fix:
        outcome_text = "觸發 AiderHeal 自動修復Git+CI/CD 為回滾安全網)"
    elif has_findings:
        # Findings exist but the master switch is off — do not claim "no finding".
        outcome_text = "CODE_REVIEW_AUTO_FIX_ENABLED=false,未觸發自動修復"
    else:
        outcome_text = "無 finding無需修復"

    return {
        "priority": priority,
        "auto_fix": auto_fix,
        "reasoning": f"ADR-020 全自動修復CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}"
        + outcome_text,
        "fix_files": fix_files,
        "human_review_needed": False,
    }
def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict:
    """Force a model-produced decision to comply with the ADR-020 full auto-fix policy.

    The decision comes from an LLM, so its fields are validated first:
      * ``priority`` must be one of critical/high/medium/low (case-insensitive);
        otherwise it is derived from the severity counters in ``self.state``;
      * ``fix_files`` is normalised to at most five distinct non-empty strings.
        A lone string is wrapped in a list; a missing or otherwise invalid
        value is replaced by the files named in *findings*.

    Policy: any finding means ``auto_fix=True`` unless the ``AUTO_FIX_ENABLED``
    master switch is off; ``human_review_needed`` is always False.

    Returns:
        The same dict (mutated in place) when *decision* is a dict, otherwise
        a new dict.
    """
    if not isinstance(decision, dict):
        decision = {}

    sev = self.state["severity_summary"]
    if sev["critical"] > 0:
        derived_priority = "critical"
    elif sev["high"] > 0:
        derived_priority = "high"
    elif sev["medium"] > 0:
        derived_priority = "medium"
    else:
        derived_priority = "low"

    raw_priority = decision.get("priority")
    priority = raw_priority.strip().lower() if isinstance(raw_priority, str) else ""
    if priority not in ("critical", "high", "medium", "low"):
        priority = derived_priority

    raw_files = decision.get("fix_files")
    if isinstance(raw_files, str):
        # A bare string would otherwise be iterated character by character downstream.
        raw_files = [raw_files]
    elif not isinstance(raw_files, (list, tuple)):
        raw_files = [f.get("file") for f in findings if isinstance(f, dict)]
    fix_files = list(dict.fromkeys(
        path for path in raw_files if isinstance(path, str) and path
    ))[:5]

    has_findings = bool(findings)
    allowed_auto_fix = bool(has_findings and AUTO_FIX_ENABLED)
    if has_findings and not AUTO_FIX_ENABLED:
        logger.warning(
            "[CodeReview] auto_fix 被 CODE_REVIEW_AUTO_FIX_ENABLED=false 主開關擋下 priority=%s",
            priority,
        )

    decision["priority"] = priority
    decision["auto_fix"] = allowed_auto_fix
    decision["human_review_needed"] = False
    decision["fix_files"] = fix_files
    decision["reasoning"] = (
        f"{decision.get('reasoning', '')} "
        f"[ADR-020 全自動修復: auto_fix={'enabled' if allowed_auto_fix else 'flag_disabled'}, priority={priority}]"
    ).strip()
    return decision
# ── Step 5NemoTron 派遣 ──────────────────────────────────────────────────
def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict:
    """Step 5: record ``action_plans`` rows for the files to fix and trigger AiderHeal.

    One row is inserted for each of the first three ``fix_files`` that has no
    active code-review action yet. Row status is ``auto_pending`` when auto-fix
    is on and the file is on the AiderHeal whitelist, ``auto_skipped_whitelist``
    when auto-fix is on but the file is not whitelisted, and ``auto_disabled``
    otherwise. All rows are committed together; on any DB error the batch is
    rolled back and the reported action count is reset to 0, because none of
    the rows were persisted.

    AiderHeal is started (non-blocking) only for whitelisted files.

    Returns:
        ``{"actions": rows persisted, "auto_fix": bool, "aider_fix_files": [...]}``
    """
    auto_fix = ea.get("auto_fix", False)
    fix_files = ea.get("fix_files", [])
    allowed_fix_files = _aider_allowed_fix_files(fix_files)
    priority_map = {"critical": 1, "high": 2, "medium": 3, "low": 4}
    priority_num = priority_map.get(ea.get("priority", "low"), 4)
    actions_created = 0
    session = get_session()
    try:
        # One action_plan per file that needs fixing.
        for fpath in fix_files[:3]:
            if active_code_review_action_exists(session, fpath):
                logger.info("[CodeReview] skip duplicate active action_plan file=%s", fpath)
                continue
            related = [f for f in findings if f.get("file") == fpath][:3]
            # ``or ''`` guards against findings whose description is None.
            related_text = ', '.join(str(f.get('description') or '')[:40] for f in related)
            desc = f"Code Review 修復:{fpath}{related_text}"
            is_whitelisted = fpath in allowed_fix_files
            if not auto_fix:
                status = "auto_disabled"
            elif is_whitelisted:
                status = "auto_pending"
            else:
                status = "auto_skipped_whitelist"
            session.execute(text("""
INSERT INTO action_plans
(action_type, description, status, priority, metadata_json, created_at)
VALUES
('code_review_fix', :desc, :status, :priority, :meta, NOW())
"""), {
                "desc": desc[:500],
                "status": status,
                "priority": priority_num,
                "meta": json.dumps({
                    "pipeline_id": self.pipeline_id,
                    "commit_sha": self.commit_sha,
                    "file": fpath,
                    "auto_fix": auto_fix,
                    "aider_auto_fix_allowed": is_whitelisted,
                    "ea_priority": ea.get("priority"),
                    "findings": related,
                }, ensure_ascii=False),
            })
            actions_created += 1
        session.commit()
    except Exception as e:
        logger.warning("[CodeReview] action_plans 寫入失敗: %s", e)
        session.rollback()
        # The whole batch was rolled back, so nothing was actually written.
        actions_created = 0
    finally:
        session.close()

    # Trigger AiderHeal (non-blocking).
    if auto_fix and allowed_fix_files:
        self.state["auto_fix_triggered"] = True
        self._sync_global()
        self._trigger_aider_heal(findings, allowed_fix_files)
    elif auto_fix and fix_files:
        self.state["auto_fix_skipped_whitelist"] = True
        self._sync_global()
        logger.info("[CodeReview] AiderHeal skipped: no files matched ADR-020 whitelist files=%s", fix_files)
    return {
        "actions": actions_created,
        "auto_fix": auto_fix,
        "aider_fix_files": allowed_fix_files,
    }
def _trigger_aider_heal(self, findings: List[Dict], fix_files: List[str]):
    """Start a daemon thread that asks AiderHeal to fix up to two files.

    For each of the first two *fix_files* that has findings, the most severe
    finding (CRITICAL < HIGH < MEDIUM < LOW, first one wins on ties) is handed
    to ``execute_code_fix``. Any exception inside the worker is logged and
    ends the worker.
    """
    severity_rank = {"CRITICAL":0,"HIGH":1,"MEDIUM":2,"LOW":3}

    def _heal_worker():
        try:
            from services.aider_heal_executor import execute_code_fix
            for target in fix_files[:2]:  # at most 2 files per run
                matches = [item for item in findings if item.get("file") == target]
                if matches:
                    worst = min(
                        matches,
                        key=lambda item: severity_rank.get(item.get("severity","LOW"),3),
                    )
                    result = execute_code_fix(
                        error_type=f"code_review_{worst.get('type','bug')}",
                        error_message=worst.get("description", "Code Review 發現問題"),
                        target_file=target,
                        context={"suggestion": worst.get("suggestion", ""), "pipeline_id": self.pipeline_id},
                    )
                    logger.info("[CodeReview] AiderHeal %s%s", target, result.get("message", ""))
        except Exception as e:
            logger.error("[CodeReview] AiderHeal 觸發失敗: %s", e)

    worker = threading.Thread(
        target=_heal_worker,
        daemon=True,
        name=f"aider-heal-{self.commit_sha[:8]}",
    )
    worker.start()
# ── DB 持久化 ──────────────────────────────────────────────────────────────
def _save_to_db(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Persist the review result as an ``ai_insights`` row and queue it for embedding.

    The ``content`` column receives a JSON document limited to 8000 characters.
    The limit is enforced by shrinking the payload (dropping trailing findings,
    then halving the report) rather than slicing the serialized string: a
    sliced string is cut mid-document, is no longer valid JSON and cannot be
    read back by ``get_history``. When anything was dropped, the document
    carries ``"truncated": true``.

    DB errors are logged and rolled back; embedding-queue errors are logged
    only. Nothing is raised to the caller.
    """
    max_content_len = 8000
    severity_summary = self.state["severity_summary"]

    def _bounded_content() -> str:
        # Serialize, and shrink the payload until the JSON fits the limit.
        kept = list(findings)
        report = openclaw_report or ""
        trimmed = False
        while True:
            payload = {
                "findings": kept,
                "openclaw_report": report,
                "ea_decision": ea,
                "severity_summary": severity_summary,
            }
            if trimmed:
                payload["truncated"] = True
            encoded = json.dumps(payload, ensure_ascii=False)
            if len(encoded) <= max_content_len:
                return encoded
            trimmed = True
            if kept:
                kept = kept[:-1]
            elif report:
                report = report[: len(report) // 2]
            else:
                # Findings and report are already empty, so the decision is what overflows.
                return json.dumps(
                    {"severity_summary": severity_summary, "truncated": True},
                    ensure_ascii=False,
                )

    period = datetime.now().strftime("%Y-%m-%d")
    session = get_session()
    try:
        row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by,
status, metadata_json, period, created_at)
VALUES
('code_review_result', :content, :conf, 'code_review_pipeline',
'active', :meta, :period, NOW())
RETURNING id
"""), {
            "content": _bounded_content(),
            "conf": 0.90,
            "meta": json.dumps({
                "pipeline_id": self.pipeline_id,
                "commit_sha": self.commit_sha,
                "branch": self.branch,
                "changed_files": self.changed_files,
                "auto_fix_triggered": self.state["auto_fix_triggered"],
            }, ensure_ascii=False),
            "period": period,
        }).fetchone()
        session.commit()
        if row:
            # Embedding is best-effort and must not undo the committed insight.
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding
                enqueue_insight_embedding(row[0], "code_review_result", json.dumps({
                    "findings": findings,
                    "openclaw_report": openclaw_report,
                    "ea_decision": ea,
                }, ensure_ascii=False), period)
            except Exception as embed_err:
                logger.warning("[CodeReview] embedding queue enqueue failed: %s", embed_err)
        logger.info("[CodeReview] ai_insights 寫入成功 pipeline=%s", self.pipeline_id)
    except Exception as e:
        logger.error("[CodeReview] DB 寫入失敗: %s", e)
        session.rollback()
    finally:
        session.close()
# ── Telegram 通知 ─────────────────────────────────────────────────────────
def _notify_start(self):
    """Send the "review started" Telegram message; failures are logged, never raised.

    The message lists up to five changed files plus a count of the remainder.
    """
    try:
        from services.telegram_templates import _send_telegram_raw

        shown = self.changed_files[:5]
        files_list = "\n".join(f"{path}" for path in shown)
        hidden_count = len(self.changed_files) - 5
        if hidden_count > 0:
            files_list += f"\n +{hidden_count} 個)"

        message_parts = [
            f"🔍 <b>Code Review 啟動</b>\n",
            f"══════════════════════════\n",
            f"📦 Commit <code>{self.commit_sha[:8]}</code> 🌿 {self.branch}\n",
            f"📝 變更檔案:\n{files_list}\n",
            f"══════════════════════════\n",
            f"🤖 <i>Hermes → OpenClaw → Elephant Alpha → NemoTron</i>\n",
            f"📊 即時進度https://mo.wooo.work/code-review/",
        ]
        _send_telegram_raw("".join(message_parts))
    except Exception as e:
        logger.warning("[CodeReview] 啟動通知失敗: %s", e)
def _notify_complete(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Send the "review finished" Telegram summary; failures are logged, never raised.

    The message uses HTML tags (<b>, <code>, <i>), so text that originates from
    model output or file names (finding descriptions, file paths, priority) is
    HTML-escaped before interpolation. An unescaped "<" or "&" could otherwise
    break the markup and lose the notification.
    """
    try:
        from html import escape
        from services.telegram_templates import _send_telegram_raw

        sev = self.state["severity_summary"]
        # A missing or None priority falls back to "medium" instead of crashing on .upper().
        priority = str(ea.get("priority") or "medium")
        auto_fix = ea.get("auto_fix", False)
        fix_files = ea.get("fix_files", [])
        allowed_fix_files = _aider_allowed_fix_files(fix_files)
        icon = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}.get(priority, "🟡")
        top_issues = [
            f for f in findings
            if isinstance(f, dict) and f.get("severity") in ("CRITICAL", "HIGH")
        ][:3]
        issues_lines = "\n".join(
            f" {'🔴' if f['severity']=='CRITICAL' else '🟠'} [{f['severity']}] "
            f"{escape(str(f.get('description','')), quote=False)}"
            f" — <i>{escape(str(f.get('file','')), quote=False)}</i>"
            for f in top_issues
        ) or " ✅ 無高風險問題"
        msg = (
            f"{icon} <b>Code Review 完成</b> · <code>{self.commit_sha[:8]}</code>\n"
            f"══════════════════════════\n"
            f"🔴 CRITICAL <b>{sev['critical']}</b> "
            f"🟠 HIGH <b>{sev['high']}</b> "
            f"🟡 MEDIUM <b>{sev['medium']}</b> "
            f"🟢 LOW <b>{sev['low']}</b>\n"
            f"══════════════════════════\n"
            f"<b>⚠️ 主要問題</b>\n{issues_lines}\n"
        )
        if openclaw_report:
            # The report is intentionally HTML (the prompt asks for <b> headings), so it is not escaped.
            # NOTE(review): slicing at 400 characters can cut a tag in half — confirm how
            # _send_telegram_raw handles unbalanced markup.
            msg += f"\n{openclaw_report[:400]}\n"
        if auto_fix and allowed_fix_files:
            fix_status = "🔧 已觸發自動修復AiderHeal"
        elif auto_fix and fix_files:
            fix_status = "⚠️ 不在自動修復白名單,需人工處理"
        elif sev['critical'] + sev['high'] + sev['medium'] + sev['low'] == 0:
            fix_status = "✅ 無需修復動作"
        else:
            fix_status = "🛑 自動修復主開關關閉CODE_REVIEW_AUTO_FIX_ENABLED=false"
        msg += (
            f"══════════════════════════\n"
            f"🤖 Elephant Alpha<b>{escape(priority.upper(), quote=False)}</b> {fix_status}\n"
            f"📊 完整報告https://mo.wooo.work/code-review/"
        )
        _send_telegram_raw(msg)
    except Exception as e:
        logger.warning("[CodeReview] 完成通知失敗: %s", e)
def _notify_error(self, error: str):
    """Send the "pipeline failed" Telegram message; failures are logged, never raised.

    The message uses HTML tags, and exception text frequently contains "<", ">"
    or "&" (for example reprs such as ``<class 'X'>``). The error excerpt and
    the branch name are therefore HTML-escaped; the excerpt is truncated
    before escaping so an entity is never cut in half.
    """
    try:
        from html import escape
        from services.telegram_templates import _send_telegram_raw

        safe_error = escape(str(error)[:200], quote=False)
        safe_branch = escape(str(self.branch), quote=False)
        _send_telegram_raw(
            f"🚨 <b>Code Review Pipeline 失敗</b>\n"
            f"Commit<code>{self.commit_sha[:8]}</code> @ {safe_branch}\n"
            f"錯誤:{safe_error}\n"
            f"📊 查看https://mo.wooo.work/code-review/"
        )
    except Exception as exc:
        logger.warning("[CodeReview] 失敗通知發送失敗: %s", exc, exc_info=True)
# ═══════════════════════════════════════════════════════════════════════════════
# 公開 API
# ═══════════════════════════════════════════════════════════════════════════════
def trigger_post_deploy_review(
    commit_sha: str,
    changed_files: List[str],
    branch: str = "main",
    deploy_type: str = "sync",
) -> str:
    """
    Start a code review pipeline in a background daemon thread.

    Builds a ``CodeReviewPipeline`` for the deploy, runs it on a thread named
    after the first 8 characters of the commit, and returns immediately.

    Returns:
        The new pipeline's ``pipeline_id``.
    """
    pipeline = CodeReviewPipeline(commit_sha, changed_files, branch, deploy_type)
    thread_name = f"code-review-{commit_sha[:8]}"
    worker = threading.Thread(target=pipeline.run, daemon=True, name=thread_name)
    worker.start()
    reviewable_count = len(pipeline.changed_files)
    logger.info("[CodeReview] 已派發 pipeline=%s files=%d", pipeline.pipeline_id, reviewable_count)
    return pipeline.pipeline_id
def get_current_state() -> Dict[str, Any]:
    """Return a shallow copy of the most recently published pipeline state (for frontend polling)."""
    with _pipeline_lock:
        snapshot = dict(_current_pipeline)
    return snapshot
def get_history(limit: int = 20) -> List[Dict]:
    """Return the most recent ``code_review_result`` insights, newest first.

    Each row's ``metadata_json`` and ``content`` columns are decoded
    independently, and anything that is not the expected shape (invalid JSON, a
    non-object document, non-numeric severity counts, a missing commit sha) is
    replaced with an empty default. One malformed row therefore no longer
    raises and wipes the whole history.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts describing each review; ``[]`` when the query fails.
    """
    def _load_object(raw, row_id, column):
        # Decode one JSON column; anything but a JSON object becomes {}.
        if not raw:
            return {}
        try:
            decoded = json.loads(raw)
        except Exception:
            logger.debug(
                "[CodeReview] history row JSON decode failed id=%s column=%s",
                row_id, column, exc_info=True,
            )
            return {}
        return decoded if isinstance(decoded, dict) else {}

    session = get_session()
    try:
        rows = session.execute(text("""
SELECT id, content, confidence, metadata_json, created_at, status
FROM ai_insights
WHERE insight_type = 'code_review_result'
ORDER BY created_at DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
        results = []
        for r in rows:
            meta = _load_object(r[3], r[0], "metadata_json")
            content = _load_object(r[1], r[0], "content")
            sev = content.get("severity_summary")
            if not isinstance(sev, dict):
                sev = {}
            # Only real numbers count towards the total (bool is excluded on purpose).
            total_issues = sum(
                v for v in sev.values()
                if isinstance(v, (int, float)) and not isinstance(v, bool)
            )
            results.append({
                "id": r[0],
                "pipeline_id": meta.get("pipeline_id", ""),
                "commit_sha": str(meta.get("commit_sha") or "")[:8],
                "branch": meta.get("branch", ""),
                "changed_files": meta.get("changed_files", []),
                "severity_summary": sev,
                "total_issues": total_issues,
                "auto_fix": meta.get("auto_fix_triggered", False),
                "findings": content.get("findings", []),
                "openclaw_report": content.get("openclaw_report", ""),
                "ea_decision": content.get("ea_decision", {}),
                "created_at": r[4].isoformat() if r[4] else "",
                "status": r[5] or "active",
            })
        return results
    except Exception as e:
        logger.warning("[CodeReview] 歷史讀取失敗: %s", e)
        return []
    finally:
        session.close()
def verify_internal_token(request_token: str) -> bool:
    """Check the token sent with the CD webhook against ``INTERNAL_TOKEN``.

    When no token is configured the request is rejected, unless the dev-only
    ``ALLOW_INSECURE_WEBHOOK`` override is on. Otherwise the tokens are
    compared with ``hmac.compare_digest``, so the comparison time does not
    reveal how many leading characters matched. Both sides are encoded to
    UTF-8 bytes because ``compare_digest`` only accepts ASCII-only ``str``.

    Args:
        request_token: Token supplied by the caller; a non-string value
            (e.g. ``None`` for a missing header) is rejected.

    Returns:
        True when the request may proceed.
    """
    import hmac  # local import: keeps this block self-contained

    if not INTERNAL_TOKEN:
        if ALLOW_INSECURE_WEBHOOK:
            logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,僅因 dev override 放行")
            return True
        logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,拒絕 webhook")
        return False
    if not isinstance(request_token, str):
        return False
    return hmac.compare_digest(
        request_token.encode("utf-8"),
        INTERNAL_TOKEN.encode("utf-8"),
    )