1243 lines
58 KiB
Python
1243 lines
58 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
services/code_review_pipeline_service.py
|
||
Post-Deploy AI Agent Code Review Pipeline
|
||
|
||
觸發時機:CD 健康檢查通過後,由 Gitea Action webhook 呼叫
|
||
Pipeline:
|
||
Step 1 system 讀取變更檔案內容
|
||
Step 2 Hermes 程式碼掃描(bugs / security / performance)
|
||
Step 3 OpenClaw 架構品質評估(Ollama-first,Gemini 僅備援)
|
||
Step 4 ElephantAlpha 決策協調(severity 判定 + auto-fix 裁量)
|
||
Step 5 NemoTron 行動派遣(action_plans 寫入 + AiderHeal 觸發)
|
||
|
||
結果輸出:
|
||
- ai_insights(type='code_review_result')
|
||
- action_plans(type='code_review_fix')
|
||
- Telegram 告警(啟動 / 完成 / 錯誤)
|
||
- 前端 /code-review/ 即時 polling 狀態
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import threading
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import requests
|
||
from database.manager import get_session
|
||
from sqlalchemy import text
|
||
# ADR-027:Code Review 走 OllamaService 取得三主機級聯 retry。
|
||
from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL
|
||
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
|
||
from services.action_plan_dedupe import active_code_review_action_exists
|
||
from services.gemini_guard import gemini_disabled_message, get_gemini_api_key
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _env_bool(name: str, default: str = "false") -> bool:
|
||
return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}
|
||
|
||
|
||
def _env_float(name: str, default: str) -> float:
|
||
try:
|
||
return float(os.getenv(name, default))
|
||
except (TypeError, ValueError):
|
||
return float(default)
|
||
|
||
|
||
# ── Pipeline global state (polled by the frontend) ─────────────────────────
# Snapshot of the most recent pipeline's state dict; replaced by
# CodeReviewPipeline._sync_global() while holding _pipeline_lock.
_current_pipeline: Dict[str, Any] = {}
_pipeline_lock = threading.Lock()

# Gemini is only the last-resort cloud fallback, used after the Ollama/Claude
# main paths of the code review have both failed.
REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")

# Ollama attempt chain used by the OpenClaw assessment step: primary,
# secondary and (optional) fallback model, each with its own timeout.
# NOTE: the int(...) conversions below raise ValueError at import time when
# the environment holds a non-numeric value.
CODE_REVIEW_OLLAMA_MODEL = os.getenv(
    "CODE_REVIEW_OLLAMA_MODEL",
    os.getenv("OPENCLAW_OLLAMA_MODEL", "qwen2.5-coder:7b"),
)
CODE_REVIEW_OLLAMA_TIMEOUT = int(os.getenv("CODE_REVIEW_OLLAMA_TIMEOUT", "15"))
CODE_REVIEW_OLLAMA_SECONDARY_MODEL = os.getenv(
    "CODE_REVIEW_OLLAMA_SECONDARY_MODEL",
    "gemma3:4b",
)
CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT = int(
    os.getenv("CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT", "25")
)
CODE_REVIEW_OLLAMA_FALLBACK_MODEL = os.getenv(
    "CODE_REVIEW_OLLAMA_FALLBACK_MODEL",
    _HERMES_MODEL,
)
CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT = int(
    os.getenv("CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT", "20")
)
# Passed to OllamaService.generate() as options["num_predict"] / keep_alive.
CODE_REVIEW_OLLAMA_NUM_PREDICT = int(os.getenv("CODE_REVIEW_OLLAMA_NUM_PREDICT", "384"))
CODE_REVIEW_OLLAMA_KEEP_ALIVE = os.getenv("CODE_REVIEW_OLLAMA_KEEP_ALIVE", "5m")
# When true, OLLAMA_HOST_FALLBACK (the "111" host) is appended as a last
# Ollama attempt in both the Hermes and OpenClaw chains.
CODE_REVIEW_ALLOW_111_FALLBACK = _env_bool("CODE_REVIEW_ALLOW_111_FALLBACK", "false")
# Preflight probes (/api/version and /api/tags) issued before each generate call.
CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED = _env_bool(
    "CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED",
    "true",
)
CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT = _env_float(
    "CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT",
    "2.5",
)

# Hermes (code scan step) settings; models default to the OpenClaw chain above.
# NOTE(review): CODE_REVIEW_HERMES_TIMEOUT defaults to "35" here, while the
# primary timeout below falls back to the same env var with default "15";
# this constant is not referenced elsewhere in the visible part of the file.
CODE_REVIEW_HERMES_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "35"))
CODE_REVIEW_HERMES_PRIMARY_MODEL = os.getenv(
    "CODE_REVIEW_HERMES_PRIMARY_MODEL",
    CODE_REVIEW_OLLAMA_MODEL,
)
CODE_REVIEW_HERMES_SECONDARY_MODEL = os.getenv(
    "CODE_REVIEW_HERMES_SECONDARY_MODEL",
    CODE_REVIEW_OLLAMA_SECONDARY_MODEL,
)
CODE_REVIEW_HERMES_FALLBACK_MODEL = os.getenv(
    "CODE_REVIEW_HERMES_FALLBACK_MODEL",
    CODE_REVIEW_OLLAMA_FALLBACK_MODEL,
)
CODE_REVIEW_HERMES_PRIMARY_TIMEOUT = int(
    os.getenv("CODE_REVIEW_HERMES_PRIMARY_TIMEOUT", os.getenv("CODE_REVIEW_HERMES_TIMEOUT", "15"))
)
CODE_REVIEW_HERMES_SECONDARY_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_SECONDARY_TIMEOUT", "45"))
CODE_REVIEW_HERMES_FALLBACK_TIMEOUT = int(os.getenv("CODE_REVIEW_HERMES_FALLBACK_TIMEOUT", "20"))
CODE_REVIEW_HERMES_NUM_PREDICT = int(os.getenv("CODE_REVIEW_HERMES_NUM_PREDICT", "384"))
# Prompt budget for the Hermes LLM scan: number of files and characters per file.
CODE_REVIEW_HERMES_MAX_FILES = int(os.getenv("CODE_REVIEW_HERMES_MAX_FILES", "2"))
CODE_REVIEW_HERMES_MAX_CHARS = int(os.getenv("CODE_REVIEW_HERMES_MAX_CHARS", "900"))
# When false (the default) _hermes_scan uses the regex-based static scan
# instead of calling an LLM. Only the literal "true" (any case) enables it.
CODE_REVIEW_HERMES_LLM_SCAN_ENABLED = (
    os.getenv("CODE_REVIEW_HERMES_LLM_SCAN_ENABLED", "false").lower() == "true"
)
INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
# Master switch for auto-fix decisions (see _ea_orchestrate / _guard_ea_decision).
AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "true").lower() == "true"
ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true"
# Only .py files under services/, routes/ or database/ whose path segments
# consist of letters, digits and underscores match this pattern.
AIDER_AUTO_FIX_FILE_PATTERN = re.compile(
    r"^(services|routes|database)/(?:[a-zA-Z0-9_]+/)*[a-zA-Z0-9_]+\.py$"
)
# Matches the "最後錯誤:..." tail that _public_openclaw_report() strips from reports.
_OPENCLAW_RAW_ERROR_RE = re.compile(r"[;;]?\s*最後錯誤:[^<\n\r]+")

# Phase 7 Frontier upgrade feature flag — OFF by default; when enabled Claude
# is used only as a cloud fallback after Ollama has failed.
CODE_REVIEW_USE_CLAUDE = os.getenv("CODE_REVIEW_USE_CLAUDE", "false").lower() == "true"
CLAUDE_REVIEW_MODEL = os.getenv("CLAUDE_MODEL", "claude-opus-4-7")
|
||
|
||
|
||
def _aider_allowed_fix_files(files: List[str]) -> List[str]:
    """Return the files that ADR-020 allows AiderHeal to auto-fix.

    Args:
        files: Candidate relative paths. The value usually originates from
            LLM-produced JSON, so ``None``, a single bare string, or
            non-string entries are tolerated instead of raising.

    Returns:
        The entries (in input order) that match
        ``AIDER_AUTO_FIX_FILE_PATTERN``.
    """
    if not files:
        return []
    if isinstance(files, str):
        # A bare string would otherwise be iterated character by character.
        files = [files]
    return [
        candidate
        for candidate in files
        # Pattern.match() raises TypeError for non-str input; skip such entries.
        if isinstance(candidate, str) and AIDER_AUTO_FIX_FILE_PATTERN.match(candidate)
    ]
|
||
|
||
|
||
def _public_openclaw_report(report: Any) -> str:
|
||
"""Hide model timeout internals from post-deploy reports shown to operators."""
|
||
text = str(report or "")
|
||
if not text:
|
||
return ""
|
||
text = _OPENCLAW_RAW_ERROR_RE.sub(";AI 延伸分析暫時略過,已以本地掃描完成部署後檢查", text)
|
||
text = text.replace("本地降級報告", "本地掃描報告")
|
||
text = text.replace("deterministic scan", "本地掃描")
|
||
text = text.replace("OpenClaw", "AI 架構檢查")
|
||
return text
|
||
|
||
|
||
def _code_review_ollama_host_reachable(host: str) -> bool:
    """Probe ``<host>/api/version`` so dead Ollama hosts are skipped quickly.

    Returns True when the preflight is disabled or the probe answers with
    HTTP 200. Any exception raised by the probe is logged and treated as
    "unreachable" (False).
    """
    if CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED:
        try:
            version_url = f"{str(host or '').rstrip('/')}/api/version"
            probe = requests.get(
                version_url,
                timeout=CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT,
            )
            is_up = probe.status_code == 200
        except Exception as exc:
            logger.warning("[CodeReview] Ollama host preflight failed host=%s error=%s", host, exc)
            is_up = False
        return is_up
    return True
|
||
|
||
|
||
def _code_review_ollama_model_available(host: str, model: str) -> bool:
    """Check via ``<host>/api/tags`` whether *model* is installed on *host*.

    The check is fail-open: it returns True when the preflight is disabled,
    *model* is empty, the endpoint answers with a non-200 status, or the
    probe raises. It returns False only when the tag list was read and
    contains neither *model* nor, for names without a tag, ``model:latest``.
    """
    if not (CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_ENABLED and model):
        return True
    try:
        tags_url = f"{str(host or '').rstrip('/')}/api/tags"
        tags_resp = requests.get(
            tags_url,
            timeout=CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT,
        )
        if tags_resp.status_code != 200:
            return True

        installed = set()
        for entry in (tags_resp.json().get("models") or []):
            if isinstance(entry, dict):
                installed.add(str(entry.get("name") or ""))

        if model in installed:
            return True
        # An untagged model name also matches its ":latest" tag.
        if ":" not in model and f"{model}:latest" in installed:
            return True

        logger.warning(
            "[CodeReview] Ollama model preflight failed host=%s model=%s available=%s",
            host,
            model,
            sorted(tag for tag in installed if tag)[:12],
        )
        return False
    except Exception as exc:
        logger.warning("[CodeReview] Ollama model preflight fail-open host=%s model=%s error=%s", host, model, exc)
        return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# Pipeline Class
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
class CodeReviewPipeline:
|
||
"""
|
||
5-Step post-deploy code review pipeline.
|
||
Call pipeline.run() inside a daemon thread.
|
||
"""
|
||
|
||
def __init__(self, commit_sha: str, changed_files: List[str],
|
||
branch: str = "main", deploy_type: str = "sync"):
|
||
self.commit_sha = commit_sha
|
||
self.branch = branch
|
||
self.deploy_type = deploy_type
|
||
self.started_at = datetime.now()
|
||
self.pipeline_id = f"cr_{commit_sha[:8]}_{self.started_at.strftime('%Y%m%d_%H%M%S')}"
|
||
|
||
# 只 review Python + YAML 檔(跳過靜態資源)
|
||
self.changed_files = [
|
||
f for f in changed_files
|
||
if f.endswith(('.py', '.yaml', '.yml', '.json'))
|
||
and not f.startswith(('node_modules/', '.git/'))
|
||
]
|
||
|
||
self.state: Dict[str, Any] = {
|
||
"pipeline_id": self.pipeline_id,
|
||
"commit_sha": commit_sha,
|
||
"branch": branch,
|
||
"changed_files": self.changed_files,
|
||
"status": "running",
|
||
"current_step": 0,
|
||
"total_steps": 5,
|
||
"steps": [],
|
||
"findings": [],
|
||
"severity_summary": {"critical": 0, "high": 0, "medium": 0, "low": 0},
|
||
"openclaw_report": "",
|
||
"ea_decision": {},
|
||
"auto_fix_triggered": False,
|
||
"started_at": self.started_at.isoformat(),
|
||
"completed_at": None,
|
||
"message": "",
|
||
}
|
||
self._sync_global()
|
||
|
||
# ── State helpers ─────────────────────────────────────────────────────────
|
||
|
||
def _sync_global(self):
    """Publish a snapshot of ``self.state`` to the module-level pipeline state.

    A deep copy is taken so the published snapshot no longer shares the
    nested ``steps`` / ``findings`` / ``severity_summary`` containers with
    the pipeline, which keeps mutating them between syncs without holding
    the lock. With the previous shallow ``dict(...)`` copy, code holding
    the snapshot observed those in-place mutations.
    """
    import copy  # local import keeps this change self-contained

    global _current_pipeline
    # Copy outside the lock so the lock is held only for the reference swap.
    snapshot = copy.deepcopy(self.state)
    with _pipeline_lock:
        _current_pipeline = snapshot
|
||
|
||
def _step_start(self, num: int, name: str, agent: str):
    """Record that step *num* has started and publish the updated state.

    Appends a new step entry with status ``"running"`` to
    ``state["steps"]`` and sets ``state["current_step"]``.
    """
    step_entry = {
        "step": num,
        "name": name,
        "agent": agent,
        "status": "running",
        "started_at": datetime.now().isoformat(),
        "completed_at": None,
        "summary": "",
    }
    self.state["current_step"] = num
    self.state["steps"].append(step_entry)
    self._sync_global()
    logger.info("[CodeReview] ▶ Step %d/5 %s (%s)", num, name, agent)
|
||
|
||
def _step_done(self, num: int, summary: str, ok: bool = True):
    """Mark every recorded step numbered *num* as finished and publish state.

    The step status becomes ``"ok"`` or ``"error"`` depending on *ok*; the
    stored summary is clipped to 300 characters and the logged one to 100.
    """
    final_status = "ok" if ok else "error"
    matching_steps = (entry for entry in self.state["steps"] if entry["step"] == num)
    for entry in matching_steps:
        entry["status"] = final_status
        entry["completed_at"] = datetime.now().isoformat()
        entry["summary"] = summary[:300]
    self._sync_global()
    marker = "✓" if ok else "✗"
    logger.info("[CodeReview] %s Step %d — %s", marker, num, summary[:100])
|
||
|
||
def _finish(self, status: str, message: str):
|
||
self.state["status"] = status
|
||
self.state["completed_at"] = datetime.now().isoformat()
|
||
self.state["message"] = message
|
||
self._sync_global()
|
||
|
||
# ── Main pipeline ─────────────────────────────────────────────────────────
|
||
|
||
def run(self):
    """Execute the full pipeline; designed to run inside a daemon thread.

    Steps: (1) read changed files, (2) Hermes scan, (3) OpenClaw
    assessment, (4) ElephantAlpha orchestration, (5) NemoTron dispatch,
    followed by persistence and the completion notification. When no file
    content could be read the pipeline finishes with status ``"skipped"``.

    Any exception is logged, the pipeline is finished with status
    ``"error"`` and the error notification is sent. Steps still marked
    ``"running"`` at that point are flipped to ``"error"`` so the polled
    state does not show a step that runs forever.
    """
    try:
        self._notify_start()

        # Step 1 - read files
        self._step_start(1, "讀取變更檔案", "system")
        file_contents = self._read_changed_files()
        if not file_contents:
            self._step_done(1, "無有效 Python/YAML 變更,跳過 Review")
            self._finish("skipped", "無有效變更檔案")
            return
        self._step_done(1, f"讀取 {len(file_contents)} 個檔案")

        # Step 2 - Hermes scan
        self._step_start(2, "Hermes 程式碼掃描", "Hermes")
        findings = self._hermes_scan(file_contents)
        cnt = self.state["severity_summary"]
        self._step_done(2, f"CRITICAL={cnt['critical']} HIGH={cnt['high']} MEDIUM={cnt['medium']} LOW={cnt['low']}")

        # Step 3 - OpenClaw assessment
        self._step_start(3, "OpenClaw 架構品質評估", "OpenClaw")
        openclaw_report = self._openclaw_assess(file_contents, findings)
        self.state["openclaw_report"] = openclaw_report
        self._step_done(3, openclaw_report[:120] if openclaw_report else "(OpenClaw 未回應)")

        # Step 4 - ElephantAlpha orchestration
        self._step_start(4, "ElephantAlpha 決策協調", "ElephantAlpha")
        ea = self._ea_orchestrate(findings, openclaw_report)
        self.state["ea_decision"] = ea
        self._step_done(4, f"優先度={ea.get('priority','?')} auto_fix={ea.get('auto_fix',False)}")

        # Step 5 - NemoTron dispatch
        self._step_start(5, "NemoTron 行動派遣", "NemoTron")
        dispatch = self._nemotron_dispatch(ea, findings)
        self._step_done(5, f"寫入 {dispatch['actions']} 筆 action_plans,自動修復={'是' if dispatch['auto_fix'] else '否'}")

        # Persist + notify
        self._save_to_db(findings, openclaw_report, ea)
        self._notify_complete(findings, openclaw_report, ea)
        self._finish("completed", "Pipeline 執行完成")

    except Exception as e:
        logger.error("[CodeReview] Pipeline 例外: %s", e, exc_info=True)
        # Close out the step that was in flight; otherwise it would stay
        # "running" in the state polled by the frontend.
        failed_at = datetime.now().isoformat()
        for step in self.state.get("steps", []):
            if step.get("status") == "running":
                step["status"] = "error"
                step["completed_at"] = failed_at
                step["summary"] = str(e)[:300]
        self._finish("error", str(e)[:200])
        self._notify_error(str(e))
|
||
|
||
# ── Step 1:讀取檔案 ───────────────────────────────────────────────────────
|
||
|
||
def _read_changed_files(self) -> Dict[str, str]:
    """Read the contents of the changed files from the deployed project tree.

    At most the first 8 entries of ``self.changed_files`` are read, and
    each file is clipped to 6000 characters (a truncation marker is
    appended when clipped). Unreadable files are skipped.

    The paths originate from the deploy webhook, so each one is resolved
    and must stay inside the project root. ``../`` segments, absolute
    paths and symlinks pointing outside are rejected, because the contents
    are later forwarded to LLM providers.

    Returns:
        Mapping of relative path -> (possibly truncated) file content.
    """
    project_root = os.path.realpath(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    )
    contents: Dict[str, str] = {}
    for rel_path in self.changed_files[:8]:  # at most 8 files
        abs_path = os.path.realpath(os.path.join(project_root, rel_path))
        try:
            inside_root = os.path.commonpath([project_root, abs_path]) == project_root
        except ValueError:
            # commonpath() raises e.g. for paths on different drives.
            inside_root = False
        if not inside_root:
            logger.warning(
                "[CodeReview] refusing to read path outside project root: %s",
                rel_path,
            )
            continue
        try:
            with open(abs_path, encoding="utf-8", errors="ignore") as fh:
                raw = fh.read()
            contents[rel_path] = raw[:6000] + ("\n... (截斷)" if len(raw) > 6000 else "")
        except OSError:
            logger.debug("[CodeReview] 無法讀取 %s(部署路徑不同?)", rel_path)
    return contents
|
||
|
||
# ── Step 2:Hermes 掃描 ───────────────────────────────────────────────────
|
||
|
||
def _hermes_scan(self, files: Dict[str, str]) -> List[Dict]:
    """Scan the changed files for issues and record the findings in state.

    When ``CODE_REVIEW_HERMES_LLM_SCAN_ENABLED`` is false the regex-based
    ``_static_code_scan`` is used. Otherwise a compact prompt (limited by
    ``CODE_REVIEW_HERMES_MAX_FILES`` / ``CODE_REVIEW_HERMES_MAX_CHARS``) is
    sent to the Ollama hosts in order: primary, secondary, and the
    fallback ("111") host only when ``CODE_REVIEW_ALLOW_111_FALLBACK`` is
    true. The first attempt that yields a parseable JSON array wins.

    Args:
        files: Mapping of relative path -> file content.

    Returns:
        The list of finding dicts, or ``[]`` when every attempt failed or
        an unexpected exception occurred (best-effort, never raises).
    """
    try:
        if not CODE_REVIEW_HERMES_LLM_SCAN_ENABLED:
            return self._hermes_commit_findings(self._static_code_scan(files))

        compact_files = []
        for name, content in list(files.items())[:CODE_REVIEW_HERMES_MAX_FILES]:
            clipped = content[:CODE_REVIEW_HERMES_MAX_CHARS]
            if len(content) > CODE_REVIEW_HERMES_MAX_CHARS:
                clipped += "\n... (截斷,僅掃描前段重點)"
            compact_files.append(f"### {name}\n```python\n{clipped}\n```")
        files_text = "\n\n".join(compact_files)

        prompt = f"""你是資深程式碼審查工程師,請掃描以下程式碼並列出所有問題。

{files_text}

輸出格式:純 JSON 陣列,每項包含以下欄位:
- severity: "CRITICAL" | "HIGH" | "MEDIUM" | "LOW"
- type: "bug" | "security" | "performance" | "maintainability"
- file: 檔案名稱
- line_hint: 約略行號或函式名稱
- description: 問題說明(繁體中文,精簡一句)
- suggestion: 修復建議(繁體中文,精簡一句)

只輸出 JSON 陣列,不含其他文字。無問題時輸出 []"""

        from services.ollama_service import (
            OLLAMA_HOST_FALLBACK,
            OLLAMA_HOST_PRIMARY,
            OLLAMA_HOST_SECONDARY,
            OllamaService,
            get_host_label,
            get_provider_tag,
        )

        # (attempt key, host, model, timeout seconds), tried in order.
        hermes_attempts = [
            (
                "primary_code_scan",
                OLLAMA_HOST_PRIMARY,
                CODE_REVIEW_HERMES_PRIMARY_MODEL,
                CODE_REVIEW_HERMES_PRIMARY_TIMEOUT,
            ),
            (
                "secondary_fast_scan",
                OLLAMA_HOST_SECONDARY,
                CODE_REVIEW_HERMES_SECONDARY_MODEL,
                CODE_REVIEW_HERMES_SECONDARY_TIMEOUT,
            ),
        ]
        if CODE_REVIEW_ALLOW_111_FALLBACK:
            hermes_attempts.append((
                "lan_111_hermes_scan",
                OLLAMA_HOST_FALLBACK,
                CODE_REVIEW_HERMES_FALLBACK_MODEL,
                CODE_REVIEW_HERMES_FALLBACK_TIMEOUT,
            ))
        findings = None
        last_error = None

        for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate(
            hermes_attempts,
            start=1,
        ):
            with log_ai_call(
                caller='code_review_hermes',
                provider=get_provider_tag(host),
                model=model_name,
                request_id=f"cr-{self.commit_sha[:8]}",
                meta={'commit': self.commit_sha[:8], 'branch': self.branch,
                      'files': len(files), 'route': 'ollama_first',
                      'attempt': attempt_index,
                      'attempt_key': attempt_key,
                      'max_files': CODE_REVIEW_HERMES_MAX_FILES,
                      'max_chars': CODE_REVIEW_HERMES_MAX_CHARS,
                      'timeout_s': timeout_s},
            ) as _ctx:
                _ctx.add_meta('host', host)
                _ctx.add_meta('host_label', get_host_label(host))
                # Cheap preflights: skip dead hosts / missing models
                # instead of waiting for a generate timeout.
                if not _code_review_ollama_host_reachable(host):
                    last_error = (
                        "ollama host preflight failed "
                        f"host={host} timeout={CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT}s"
                    )
                    _ctx.add_meta('preflight', 'api_version')
                    _ctx.set_error(last_error)
                    continue
                if not _code_review_ollama_model_available(host, model_name):
                    last_error = f"ollama model preflight failed host={host} model={model_name}"
                    _ctx.add_meta('preflight', 'api_tags')
                    _ctx.set_error(last_error)
                    continue
                ollama = OllamaService(host=host, model=model_name)
                resp = ollama.generate(
                    prompt=prompt,
                    model=model_name,
                    temperature=0.1,
                    timeout=timeout_s,
                    keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE,
                    options={"num_predict": CODE_REVIEW_HERMES_NUM_PREDICT},
                )
                # The response reports the host/model that actually served
                # the call; record those in the call log.
                actual_host = resp.host or host
                _ctx.set_provider(get_provider_tag(actual_host))
                _ctx.set_model(resp.model or model_name)
                _ctx.set_tokens(
                    input=resp.input_tokens,
                    output=resp.output_tokens,
                )
                _ctx.add_meta('host', actual_host)
                _ctx.add_meta('host_label', get_host_label(actual_host))
                if resp.model and resp.model != model_name:
                    _ctx.add_meta('requested_model', model_name)
                if not resp.success:
                    last_error = resp.error or 'ollama generate failed'
                    _ctx.set_error(last_error)
                    continue
                raw = (resp.content or "").strip()
                # Take the outermost [...] span; models may wrap the JSON in prose.
                match = re.search(r"\[.*\]", raw, re.DOTALL)
                if not match:
                    last_error = f"missing JSON array: {raw[:120]}"
                    _ctx.set_error(last_error)
                    logger.warning("[CodeReview] Hermes 回應無 JSON: %s", raw[:200])
                    continue
                try:
                    findings = json.loads(match.group())
                except Exception as exc:
                    last_error = f"json parse failed: {type(exc).__name__}: {exc}"
                    _ctx.set_error(last_error)
                    continue
                break

        if findings is None:
            logger.warning("[CodeReview] Hermes 本地掃描全部失敗: %s", last_error)
            return []

        return self._hermes_commit_findings(findings)

    except Exception as e:
        logger.warning("[CodeReview] Hermes 掃描失敗: %s", e)
        return []


def _hermes_commit_findings(self, findings: Any) -> List[Dict]:
    """Validate findings, tally severities, and publish them to state.

    LLM output is untrusted: entries that are not dicts are dropped and a
    missing or non-string ``severity`` is counted as LOW, so one malformed
    item no longer discards the whole scan. Recognised severities are
    stored upper-cased so the count key and the stored value agree.
    """
    if isinstance(findings, list):
        valid = [item for item in findings if isinstance(item, dict)]
    else:
        valid = []
    summary = self.state["severity_summary"]
    for item in valid:
        sev = str(item.get("severity") or "LOW").strip().lower()
        if sev in summary:
            summary[sev] += 1
            item["severity"] = sev.upper()
    self.state["findings"] = valid
    self._sync_global()
    return valid
|
||
|
||
def _static_code_scan(self, files: Dict[str, str]) -> List[Dict]:
|
||
"""Hermes LLM 關閉時的快速 deterministic 掃描,避免部署後卡 Ollama timeout。"""
|
||
findings: List[Dict] = []
|
||
patterns = [
|
||
("HIGH", "security", r"\beval\s*\(", "使用 eval() 有安全風險", "改用安全 parser 或白名單映射"),
|
||
("HIGH", "security", r"\bexec\s*\(", "使用 exec() 有安全風險", "移除動態執行或改成明確函式"),
|
||
("MEDIUM", "maintainability", r"except\s+Exception\s*:\s*pass\b", "例外被靜默吞掉", "改用 logger.exception 並保留可診斷訊息"),
|
||
("MEDIUM", "maintainability", r"except\s*:\s*pass\b", "裸 except 被靜默吞掉", "指定例外類型並記錄錯誤"),
|
||
]
|
||
request_pattern = re.compile(r"requests\.(get|post|put|delete|patch)\s*\(")
|
||
secret_pattern = re.compile(
|
||
r"(api[_-]?key|token|password|secret)\s*=\s*['\"][^'\"]{12,}['\"]",
|
||
re.IGNORECASE,
|
||
)
|
||
for path, content in files.items():
|
||
lines = content.splitlines()
|
||
for line_no, line in enumerate(lines, start=1):
|
||
stripped = line.strip()
|
||
request_block = "\n".join(lines[line_no - 1:line_no + 5])
|
||
if request_pattern.search(stripped) and "timeout=" not in request_block:
|
||
findings.append({
|
||
"severity": "MEDIUM",
|
||
"type": "performance",
|
||
"file": path,
|
||
"line_hint": str(line_no),
|
||
"description": "HTTP request 未設定 timeout,可能拖住 worker",
|
||
"suggestion": "加入明確 timeout 並處理例外",
|
||
})
|
||
if secret_pattern.search(stripped) and "os.getenv" not in stripped:
|
||
findings.append({
|
||
"severity": "HIGH",
|
||
"type": "security",
|
||
"file": path,
|
||
"line_hint": str(line_no),
|
||
"description": "疑似硬編碼敏感字串",
|
||
"suggestion": "改用環境變數或 secret store",
|
||
})
|
||
for severity, issue_type, pattern, description, suggestion in patterns:
|
||
if re.search(pattern, stripped):
|
||
findings.append({
|
||
"severity": severity,
|
||
"type": issue_type,
|
||
"file": path,
|
||
"line_hint": str(line_no),
|
||
"description": description,
|
||
"suggestion": suggestion,
|
||
})
|
||
if len(findings) >= 8:
|
||
return findings[:8]
|
||
return findings
|
||
|
||
# ── Step 3:OpenClaw 評估 ──────────────────────────────────────────────────
|
||
|
||
def _openclaw_assess(self, files: Dict[str, str], findings: List[Dict]) -> str:
    """
    Produce the OpenClaw architecture/quality assessment text for this deploy.

    Routing priority:
    L1  (default) -> Ollama GCP-A -> GCP-B
    L1b (flag CODE_REVIEW_ALLOW_111_FALLBACK=true) -> host 111 as last Ollama attempt
    L2  (flag CODE_REVIEW_USE_CLAUDE=true) -> Claude Opus 4.7 cloud fallback
    L3  (Gemini guard explicitly unlocked) -> Gemini cloud fallback
    L4  (no cloud fallback explicitly enabled) -> deterministic local degraded summary
    L5  (cloud explicitly enabled but failed) -> ElephantAlpha via NIM/OpenRouter

    Args:
        files: Mapping of relative path -> file content; only names and
            lengths of the first 5 files go into the prompt.
        findings: Hermes findings; the first 8 are embedded as JSON.

    Returns:
        The report text from the first provider that succeeds, the local
        degraded report when no cloud fallback is enabled, or "" when every
        enabled provider failed.
    """
    sev = self.state["severity_summary"]
    findings_json = json.dumps(findings[:8], ensure_ascii=False, indent=2)
    files_list = "\n".join(f"- {k} ({len(v)} 字元)" for k, v in list(files.items())[:5])

    system = (
        "你是 OpenClaw 程式碼品質戰略分析師,以技術主管視角評估部署後程式碼。"
        "語言:繁體中文。風格:精準、數據導向、可執行建議。"
    )
    user_prompt = f"""【部署】Commit {self.commit_sha[:8]} @ {self.branch}
【變更檔案】
{files_list}

【Hermes 掃描摘要】CRITICAL={sev['critical']} HIGH={sev['high']} MEDIUM={sev['medium']} LOW={sev['low']}
【問題明細】
{findings_json}

請產出程式碼品質評估(使用 HTML <b> 標題,150字以內):
<b>🔍 整體風險等級</b>(一句理由)
<b>⚠️ 最需關注問題</b>(TOP 2)
<b>💡 架構優化方向</b>(1條長期建議)
<b>✅ 本次部署亮點</b>"""

    # ── L1: Ollama-first — GCP-A -> GCP-B (host 111 requires the explicit flag) ──
    from services.ollama_service import (
        OLLAMA_HOST_FALLBACK,
        OLLAMA_HOST_PRIMARY,
        OLLAMA_HOST_SECONDARY,
        OllamaService,
        get_host_label,
        get_provider_tag,
    )

    gemini_api_key = get_gemini_api_key("code_review")
    cloud_fallback_available = CODE_REVIEW_USE_CLAUDE or bool(gemini_api_key)
    # Caller label handed to _ctx.fallback_to_caller() when the last Ollama
    # attempt fails; it names the stage that will run next.
    fallback_caller = 'code_review_openclaw' if CODE_REVIEW_USE_CLAUDE else (
        'code_review_openclaw_gemini' if gemini_api_key else 'code_review_elephant'
    )
    if not cloud_fallback_available:
        fallback_caller = 'code_review_local_degraded'
    # (attempt key, host, model, timeout seconds), tried in order.
    ollama_attempts = [
        (
            "primary_code",
            OLLAMA_HOST_PRIMARY,
            CODE_REVIEW_OLLAMA_MODEL,
            CODE_REVIEW_OLLAMA_TIMEOUT,
        ),
        (
            "secondary_fast",
            OLLAMA_HOST_SECONDARY,
            CODE_REVIEW_OLLAMA_SECONDARY_MODEL,
            CODE_REVIEW_OLLAMA_SECONDARY_TIMEOUT,
        ),
    ]
    if CODE_REVIEW_ALLOW_111_FALLBACK:
        ollama_attempts.append((
            "lan_111_hermes",
            OLLAMA_HOST_FALLBACK,
            CODE_REVIEW_OLLAMA_FALLBACK_MODEL,
            CODE_REVIEW_OLLAMA_FALLBACK_TIMEOUT,
        ))
    last_ollama_error = None

    for attempt_index, (attempt_key, host, model_name, timeout_s) in enumerate(
        ollama_attempts,
        start=1,
    ):
        with log_ai_call(
            caller='code_review_openclaw',
            provider=get_provider_tag(host),
            model=model_name,
            request_id=f"cr-{self.commit_sha[:8]}",
            meta={
                'commit': self.commit_sha[:8],
                'branch': self.branch,
                'route': 'ollama_first',
                'attempt': attempt_index,
                'attempt_key': attempt_key,
                'timeout_s': timeout_s,
            },
        ) as _ctx:
            _ctx.add_meta('host', host)
            _ctx.add_meta('host_label', get_host_label(host))
            # Preflight 1: host must answer /api/version.
            if not _code_review_ollama_host_reachable(host):
                last_ollama_error = (
                    "ollama host preflight failed "
                    f"host={host} timeout={CODE_REVIEW_OLLAMA_HOST_PREFLIGHT_TIMEOUT}s"
                )
                _ctx.add_meta('preflight', 'api_version')
                _ctx.set_error(last_ollama_error)
                # Only the final attempt records the hand-off to the next stage.
                if attempt_index == len(ollama_attempts):
                    _ctx.fallback_to_caller(fallback_caller)
                continue
            # Preflight 2: requested model must be listed by /api/tags.
            if not _code_review_ollama_model_available(host, model_name):
                last_ollama_error = f"ollama model preflight failed host={host} model={model_name}"
                _ctx.add_meta('preflight', 'api_tags')
                _ctx.set_error(last_ollama_error)
                if attempt_index == len(ollama_attempts):
                    _ctx.fallback_to_caller(fallback_caller)
                continue
            ollama = OllamaService(host=host, model=model_name)
            resp = ollama.generate(
                prompt=user_prompt,
                system_prompt=system,
                model=model_name,
                temperature=0.2,
                timeout=timeout_s,
                keep_alive=CODE_REVIEW_OLLAMA_KEEP_ALIVE,
                options={"num_predict": CODE_REVIEW_OLLAMA_NUM_PREDICT},
            )
            # Record the host/model reported by the response itself.
            actual_host = resp.host or host
            _ctx.set_provider(get_provider_tag(actual_host))
            _ctx.set_model(resp.model or model_name)
            _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
            _ctx.add_meta('host', actual_host)
            _ctx.add_meta('host_label', get_host_label(actual_host))
            if resp.model and resp.model != model_name:
                _ctx.add_meta('requested_model', model_name)
            # A successful call with blank content is treated as a failure.
            if resp.success and (resp.content or '').strip():
                return resp.content or ""
            last_ollama_error = resp.error or 'ollama generate failed'
            _ctx.set_error(last_ollama_error)
            if attempt_index == len(ollama_attempts):
                _ctx.fallback_to_caller(fallback_caller)

    logger.warning(
        "[CodeReview] OpenClaw 本地 Ollama 鏈全部失敗: %s",
        last_ollama_error,
    )

    # L4: no cloud provider explicitly enabled -> deterministic local summary.
    if not cloud_fallback_available:
        logger.warning(
            "[CodeReview] 111/cloud fallback 未啟用,回傳 deterministic 本地降級評估",
        )
        return self._build_local_openclaw_degraded_report(
            files=files,
            findings=findings,
            last_error=last_ollama_error,
        )

    # ── L2: Phase 7 Frontier — Claude Opus 4.7 ────────────────────────────
    # Feature flag is OFF by default; when ON, Claude is used only as the
    # cloud fallback after Ollama has failed.
    if CODE_REVIEW_USE_CLAUDE:
        try:
            from services.anthropic_service import anthropic_service
        except Exception as e:
            logger.warning("[CodeReview] Claude service import 失敗,改走 Gemini 備援: %s", e)
            anthropic_service = None  # type: ignore

        if anthropic_service is not None and anthropic_service.is_available():
            with log_ai_call(
                caller='code_review_openclaw',
                provider='claude',
                model=CLAUDE_REVIEW_MODEL,
                request_id=f"cr-{self.commit_sha[:8]}",
                meta={
                    'commit': self.commit_sha[:8],
                    'branch': self.branch,
                    'flag': 'CODE_REVIEW_USE_CLAUDE',
                },
            ) as _ctx:
                resp = anthropic_service.generate(
                    prompt=user_prompt,
                    system_prompt=system,  # ephemeral cache (5-minute TTL, saves ~90% cost, per the original author's note)
                    model=CLAUDE_REVIEW_MODEL,
                    max_tokens=2048,
                    temperature=0.2,  # code review needs to be precise
                    cache_system=True,
                    timeout=120,
                )
                # NOTE(review): unlike the Ollama path, a successful response
                # with empty content is returned as "" without falling back.
                if resp.success:
                    _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
                    _ctx.set_cache_hit(resp.cache_hit)
                    _ctx.add_meta('cache_creation_tokens', resp.cache_creation_tokens)
                    _ctx.add_meta('cache_read_tokens', resp.cache_read_tokens)
                    return resp.content or ""
                # Claude failed -> fall through to the Gemini fallback (L3).
                _ctx.set_error(resp.error or 'claude generate failed')
                _ctx.fallback_to_caller('code_review_openclaw_gemini')
                logger.warning(
                    "[CodeReview] Claude 失敗,改走 Gemini 備援: %s", resp.error,
                )
        else:
            logger.info(
                "[CodeReview] CODE_REVIEW_USE_CLAUDE=true 但 Claude 不可用(缺 API key 或 SDK),改走下一層備援",
            )

    # ── L3: Gemini — only a fallback after Ollama/Claude have both failed ──
    if gemini_api_key:
        with log_ai_call(
            caller='code_review_openclaw_gemini',
            provider='gemini',
            model=REVIEW_MODEL,
            request_id=f"cr-{self.commit_sha[:8]}",
            meta={
                'commit': self.commit_sha[:8],
                'branch': self.branch,
                'fallback_from': 'code_review_openclaw_ollama',
            },
        ) as _ctx:
            try:
                import google.generativeai as genai
                genai.configure(api_key=gemini_api_key)
                model = genai.GenerativeModel(
                    model_name=REVIEW_MODEL,
                    generation_config=genai.types.GenerationConfig(
                        temperature=0.3, max_output_tokens=1500,
                    ),
                    system_instruction=system,
                )
                resp = model.generate_content(user_prompt, request_options={"timeout": 90})
                # Token accounting is best-effort; a parse failure must not
                # discard an otherwise usable response.
                try:
                    usage = getattr(resp, 'usage_metadata', None)
                    if usage is not None:
                        _ctx.set_tokens(
                            input=getattr(usage, 'prompt_token_count', 0) or 0,
                            output=getattr(usage, 'candidates_token_count', 0) or 0,
                        )
                except Exception:
                    logger.debug("[CodeReview] Gemini usage metadata parse failed", exc_info=True)
                return resp.text or ""
            except Exception as e:
                logger.warning("[CodeReview] OpenClaw Gemini 失敗,降級 ElephantAlpha: %s", e)
                _ctx.set_error(f"{type(e).__name__}: {e}")
                _ctx.fallback_to_caller('code_review_elephant')
    else:
        logger.info("[CodeReview] 跳過 Gemini 備援:%s", gemini_disabled_message("code_review"))

    # L5 degrade: ElephantAlpha via OpenRouter (per the original comment,
    # OPENROUTER_API_KEY is always present inside the container — not
    # verifiable from this file).
    # Phase 1 v5.0 logger tracking.
    with log_ai_call(
        caller='code_review_elephant',
        provider='nim_via_elephant',
        model='nvidia/llama-3.3-nemotron-super-49b-v1.5',
        request_id=f"cr-{self.commit_sha[:8]}",
        meta={'commit': self.commit_sha[:8], 'branch': self.branch},
    ) as _ctx:
        try:
            from services.elephant_service import elephant_service
            resp = elephant_service.generate(
                prompt=user_prompt,
                system_prompt=system,
                temperature=0.3,
                timeout=90,
            )
            if resp.success:
                # Token counts are read defensively via getattr with a 0 default.
                _ctx.set_tokens(
                    input=getattr(resp, 'input_tokens', 0) or 0,
                    output=getattr(resp, 'output_tokens', 0) or 0,
                )
                return resp.content or ""
            else:
                _ctx.set_error(getattr(resp, 'error', 'elephant generate failed'))
        except Exception as e:
            logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e)
            _ctx.set_error(f"{type(e).__name__}: {e}")

    # Every enabled provider failed; the caller treats "" as "no response".
    return ""
|
||
|
||
def _build_local_openclaw_degraded_report(
|
||
self,
|
||
*,
|
||
files: Dict[str, str],
|
||
findings: List[Dict],
|
||
last_error: Optional[str],
|
||
) -> str:
|
||
sev = self.state["severity_summary"]
|
||
risk = "低"
|
||
if sev["critical"]:
|
||
risk = "嚴重"
|
||
elif sev["high"]:
|
||
risk = "高"
|
||
elif sev["medium"]:
|
||
risk = "中"
|
||
top = [f for f in findings[:2] if f.get("description")]
|
||
top_text = ";".join(
|
||
f"{f.get('file', 'unknown')}:{f.get('description')}" for f in top
|
||
) or "本次本地掃描未發現高風險問題"
|
||
return (
|
||
f"<b>🔍 整體風險等級</b> {risk}。"
|
||
f"本地掃描完成 {len(files)} 檔,CRITICAL={sev['critical']}、HIGH={sev['high']}、"
|
||
f"MEDIUM={sev['medium']}、LOW={sev['low']}。"
|
||
f"<b>⚠️ 最需關注問題</b> {top_text}。"
|
||
f"<b>💡 架構優化方向</b> AI 架構檢查延伸分析不可用時,以本地掃描收斂,避免拖高模型主機負載。"
|
||
f"<b>✅ 本次部署亮點</b> 已以本地掃描報告收斂,未呼叫 Gemini;AI 延伸分析暫時略過。"
|
||
)
|
||
|
||
# ── Step 4:ElephantAlpha 決策 ─────────────────────────────────────────────
|
||
|
||
def _ea_orchestrate(self, findings: List[Dict], openclaw_report: str) -> Dict:
    """Decide priority and auto-fix for the review (ElephantAlpha step).

    ElephantAlpha is asked for a JSON decision first; its answer is passed
    through ``_guard_ea_decision``. When the service is unavailable, fails
    or returns unusable JSON, a rule-based decision following the ADR-020
    full-auto-fix policy is returned instead.

    Args:
        findings: Hermes findings (dicts with at least ``severity`` / ``file``).
        openclaw_report: OpenClaw report; its first 300 characters go into
            the prompt.

    Returns:
        Dict with ``priority``, ``auto_fix``, ``reasoning``, ``fix_files``
        and ``human_review_needed``.
    """
    sev = self.state["severity_summary"]
    critical_n = sev["critical"]
    high_n = sev["high"]

    # Try ElephantAlpha for a fine-grained decision.
    try:
        from services.elephant_service import elephant_service

        top3 = json.dumps(
            [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3],
            ensure_ascii=False,
        )
        prompt = f"""你是 Elephant Alpha,負責協調 Code Review 後的修復決策。

【部署】commit={self.commit_sha[:8]} branch={self.branch}
【問題統計】CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}
【Top 問題】{top3}
【OpenClaw評估摘要】{openclaw_report[:300]}

請以 JSON 回答(不含其他文字):
{{
"priority": "critical|high|medium|low",
"auto_fix": true|false,
"reasoning": "決策理由(繁體中文,一句話,需含具體數字)",
"fix_files": ["需自動修復的檔案(最多5個,所有有問題的檔案)"],
"human_review_needed": false
}}

規則(依 ADR-020 全自動修復政策,覆寫 ADR-012 L3 HITL 對 code review 的限制):
- 任何 finding(不論 CRITICAL/HIGH/MEDIUM/LOW)→ auto_fix=true,human_review_needed=false
- 安全網是 Git revert + Gitea CI/CD 回滾,不依賴人工審查門檻
- priority 按最嚴重 severity 決定:CRITICAL>HIGH>MEDIUM>LOW
- fix_files 填入所有有問題的檔案(最多 5 個,AiderHeal 端會再限流)"""

        resp = elephant_service.generate(
            prompt=prompt,
            json_mode=True,
            temperature=0.1,
            timeout=60,
        )
        if resp.success:
            return self._guard_ea_decision(json.loads(resp.content), findings)
        # Previously a failed response fell through to the rules silently.
        logger.warning(
            "[CodeReview] ElephantAlpha 決策失敗,回退規則: %s",
            getattr(resp, "error", None) or "elephant generate failed",
        )
    except Exception as e:
        logger.warning("[CodeReview] ElephantAlpha 決策失敗,回退規則: %s", e)

    # Rule fallback: ADR-020 full-auto-fix policy — any finding means
    # auto_fix=true, subject to the AUTO_FIX_ENABLED master switch.
    has_findings = len(findings) > 0
    if critical_n > 0:
        priority = "critical"
    elif high_n > 0:
        priority = "high"
    elif sev["medium"] > 0:
        priority = "medium"
    else:
        priority = "low"
    auto_fix = bool(has_findings and AUTO_FIX_ENABLED)
    # De-duplicate while keeping finding order. A set made both the order
    # and the choice of the 5 kept files vary between runs.
    fix_files = list(dict.fromkeys(
        f.get("file") for f in findings
        if isinstance(f.get("file"), str) and f.get("file")
    ))[:5]

    if auto_fix:
        outcome = "觸發 AiderHeal 自動修復(Git+CI/CD 為回滾安全網)"
    elif has_findings:
        # Findings exist but the master switch is off; the old text wrongly
        # claimed there were no findings in this case.
        outcome = "CODE_REVIEW_AUTO_FIX_ENABLED=false,自動修復已停用"
    else:
        outcome = "無 finding,無需修復"

    return {
        "priority": priority,
        "auto_fix": auto_fix,
        "reasoning": f"ADR-020 全自動修復:CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']},"
                     + outcome,
        "fix_files": fix_files,
        "human_review_needed": False,
    }
|
||
|
||
def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict:
    """Enforce the ADR-020 full auto-fix policy on an Elephant Alpha decision.

    The model's answer is advisory only: ``auto_fix`` and
    ``human_review_needed`` are always overwritten, and ``priority`` is kept
    only when it is one of the four known levels.

    Args:
        decision: Parsed JSON object returned by the model. Mutated in place.
        findings: Findings from the scan step. Any non-empty list requests an
            auto-fix, subject to the ``AUTO_FIX_ENABLED`` master switch.

    Returns:
        The same ``decision`` dict with ``priority``, ``auto_fix``,
        ``human_review_needed`` and ``reasoning`` normalised.
    """
    sev = self.state["severity_summary"]
    known_levels = ("critical", "high", "medium", "low")

    raw_priority = decision.get("priority")
    priority = raw_priority.strip().lower() if isinstance(raw_priority, str) else ""
    if priority not in known_levels:
        # An unknown value (for example the echoed template
        # "critical|high|medium|low") would silently map to the lowest
        # numeric priority in _nemotron_dispatch, so derive the level from
        # the severity counters instead.
        priority = next(
            (level for level in known_levels[:3] if sev[level] > 0),
            "low",
        )

    has_findings = bool(findings)
    allowed_auto_fix = bool(has_findings and AUTO_FIX_ENABLED)
    if has_findings and not AUTO_FIX_ENABLED:
        logger.warning(
            "[CodeReview] auto_fix 被 CODE_REVIEW_AUTO_FIX_ENABLED=false 主開關擋下 priority=%s",
            priority,
        )

    decision["priority"] = priority
    decision["auto_fix"] = allowed_auto_fix
    decision["human_review_needed"] = False
    # `or ''` keeps a JSON null reasoning from being rendered as "None".
    decision["reasoning"] = (
        f"{decision.get('reasoning') or ''} "
        f"[ADR-020 全自動修復: auto_fix={'enabled' if allowed_auto_fix else 'flag_disabled'}, priority={priority}]"
    ).strip()
    return decision
|
||
|
||
# ── Step 5:NemoTron 派遣 ──────────────────────────────────────────────────
|
||
|
||
def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict:
    """Write one ``action_plans`` row per flagged file and trigger AiderHeal.

    Args:
        ea: Elephant Alpha decision; reads ``auto_fix``, ``fix_files`` and
            ``priority``.
        findings: All findings from the scan step; up to three per file are
            attached to each action plan.

    Returns:
        A dict with ``actions`` (rows actually committed), ``auto_fix`` and
        ``aider_fix_files`` (the files that passed ``_aider_allowed_fix_files``).
    """
    auto_fix = ea.get("auto_fix", False)
    # A JSON null for fix_files must behave like "no files" rather than
    # raising on the slice below, which sits outside the try block.
    fix_files = ea.get("fix_files") or []
    allowed_fix_files = _aider_allowed_fix_files(fix_files)
    priority_map = {"critical": 1, "high": 2, "medium": 3, "low": 4}
    priority_num = priority_map.get(ea.get("priority", "low"), 4)

    actions_created = 0
    session = get_session()
    try:
        # One action_plan per file that needs fixing (first three files only).
        for fpath in fix_files[:3]:
            if active_code_review_action_exists(session, fpath):
                logger.info("[CodeReview] skip duplicate active action_plan file=%s", fpath)
                continue

            related = [f for f in findings if f.get("file") == fpath][:3]
            # str(... or "") tolerates findings whose description is null, so
            # one odd finding cannot abort the whole batch.
            summary = ", ".join(str(f.get("description") or "")[:40] for f in related)
            desc = f"Code Review 修復:{fpath}|{summary}"

            aider_allowed = fpath in allowed_fix_files
            if not auto_fix:
                status = "auto_disabled"
            elif aider_allowed:
                status = "auto_pending"
            else:
                status = "auto_skipped_whitelist"

            session.execute(text("""
                INSERT INTO action_plans
                (action_type, description, status, priority, metadata_json, created_at)
                VALUES
                ('code_review_fix', :desc, :status, :priority, :meta, NOW())
            """), {
                "desc": desc[:500],
                "status": status,
                "priority": priority_num,
                "meta": json.dumps({
                    "pipeline_id": self.pipeline_id,
                    "commit_sha": self.commit_sha,
                    "file": fpath,
                    "auto_fix": auto_fix,
                    "aider_auto_fix_allowed": aider_allowed,
                    "ea_priority": ea.get("priority"),
                    "findings": related,
                }, ensure_ascii=False),
            })
            actions_created += 1
        session.commit()
    except Exception as e:
        logger.warning("[CodeReview] action_plans 寫入失敗: %s", e)
        session.rollback()
        # Everything in this transaction was rolled back, so nothing was created.
        actions_created = 0
    finally:
        session.close()

    # Trigger AiderHeal (non-blocking).
    if auto_fix and allowed_fix_files:
        self.state["auto_fix_triggered"] = True
        self._sync_global()
        self._trigger_aider_heal(findings, allowed_fix_files)
    elif auto_fix and fix_files:
        self.state["auto_fix_skipped_whitelist"] = True
        self._sync_global()
        logger.info("[CodeReview] AiderHeal skipped: no files matched ADR-020 whitelist files=%s", fix_files)

    return {
        "actions": actions_created,
        "auto_fix": auto_fix,
        "aider_fix_files": allowed_fix_files,
    }
|
||
|
||
def _trigger_aider_heal(self, findings: List[Dict], fix_files: List[str]):
    """Start AiderHeal fixes on a daemon thread and return immediately.

    Args:
        findings: All findings from the scan step.
        fix_files: Candidate files; only the first two are processed, one
            after the other, and files without a matching finding are skipped.

    For each file the most severe finding is handed to ``execute_code_fix``.
    Errors are logged and never propagate to the caller.
    """
    severity_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}

    def _heal_worker():
        try:
            from services.aider_heal_executor import execute_code_fix
        except Exception as e:
            logger.error("[CodeReview] AiderHeal 觸發失敗: %s", e)
            return

        for fpath in fix_files[:2]:  # at most two files per run
            # Each file gets its own try so a failure on the first file does
            # not silently skip the second one.
            try:
                related = [f for f in findings if f.get("file") == fpath]
                if not related:
                    continue
                # min() returns the first finding with the lowest rank, the
                # same element a stable sort would put at index 0.
                worst = min(
                    related,
                    key=lambda x: severity_rank.get(x.get("severity", "LOW"), 3),
                )
                result = execute_code_fix(
                    error_type=f"code_review_{worst.get('type','bug')}",
                    error_message=worst.get("description", "Code Review 發現問題"),
                    target_file=fpath,
                    context={"suggestion": worst.get("suggestion", ""), "pipeline_id": self.pipeline_id},
                )
                logger.info("[CodeReview] AiderHeal %s → %s", fpath, result.get("message", ""))
            except Exception as e:
                logger.error("[CodeReview] AiderHeal 觸發失敗 file=%s: %s", fpath, e)

    t = threading.Thread(target=_heal_worker, daemon=True, name=f"aider-heal-{self.commit_sha[:8]}")
    t.start()
|
||
|
||
# ── DB 持久化 ──────────────────────────────────────────────────────────────
|
||
|
||
def _save_to_db(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Insert the review result into ``ai_insights`` and queue its embedding.

    Args:
        findings: Findings from the scan step.
        openclaw_report: Text of the OpenClaw assessment.
        ea: Elephant Alpha decision dict.

    The stored ``content`` is capped at 8000 characters. Instead of slicing
    the serialized string (which yields invalid JSON that ``get_history``
    cannot decode), the payload is trimmed — trailing findings first, then
    the report — and marked with ``"truncated": true``.
    Database errors are logged and rolled back; nothing is raised.
    """
    max_content_chars = 8000

    def _dump(kept_findings, report, truncated):
        """Serialize the content payload for the given findings/report."""
        payload = {
            "findings": kept_findings,
            "openclaw_report": report,
            "ea_decision": ea,
            "severity_summary": self.state["severity_summary"],
        }
        if truncated:
            payload["truncated"] = True
        return json.dumps(payload, ensure_ascii=False)

    session = get_session()
    try:
        content_json = _dump(findings, openclaw_report, False)
        if len(content_json) > max_content_chars:
            kept = list(findings)
            report = (openclaw_report or "")[:1000]
            content_json = _dump(kept, report, True)
            while len(content_json) > max_content_chars and (kept or report):
                if kept:
                    kept.pop()  # drop from the tail, like the old string slice did
                else:
                    report = report[: len(report) // 2]
                content_json = _dump(kept, report, True)
            if len(content_json) > max_content_chars:
                # Even the decision alone is too large: keep only the counters.
                content_json = json.dumps({
                    "severity_summary": self.state["severity_summary"],
                    "truncated": True,
                }, ensure_ascii=False)

        row = session.execute(text("""
            INSERT INTO ai_insights
            (insight_type, content, confidence, created_by,
            status, metadata_json, period, created_at)
            VALUES
            ('code_review_result', :content, :conf, 'code_review_pipeline',
            'active', :meta, :period, NOW())
            RETURNING id
        """), {
            "content": content_json,
            "conf": 0.90,
            "meta": json.dumps({
                "pipeline_id": self.pipeline_id,
                "commit_sha": self.commit_sha,
                "branch": self.branch,
                "changed_files": self.changed_files,
                "auto_fix_triggered": self.state["auto_fix_triggered"],
            }, ensure_ascii=False),
            "period": datetime.now().strftime("%Y-%m-%d"),
        }).fetchone()
        session.commit()

        if row:
            # Embedding is best-effort: a queue failure must not undo the insert.
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding
                enqueue_insight_embedding(row[0], "code_review_result", json.dumps({
                    "findings": findings,
                    "openclaw_report": openclaw_report,
                    "ea_decision": ea,
                }, ensure_ascii=False), datetime.now().strftime("%Y-%m-%d"))
            except Exception as embed_err:
                logger.warning("[CodeReview] embedding queue enqueue failed: %s", embed_err)
        logger.info("[CodeReview] ai_insights 寫入成功 pipeline=%s", self.pipeline_id)
    except Exception as e:
        logger.error("[CodeReview] DB 寫入失敗: %s", e)
        session.rollback()
    finally:
        session.close()
|
||
|
||
# ── Telegram 通知 ─────────────────────────────────────────────────────────
|
||
|
||
def _notify_start(self):
    """Send the "review started" Telegram message.

    Lists up to five changed files plus a "+N" line for the rest. Any failure
    (import or send) is logged as a warning and swallowed.
    """
    try:
        from services.telegram_templates import _send_telegram_raw

        bullet_lines = [f" • {path}" for path in self.changed_files[:5]]
        extra = len(self.changed_files) - 5
        if extra > 0:
            bullet_lines.append(f" (+{extra} 個)")
        files_list = "\n".join(bullet_lines)

        divider = "══════════════════════════\n"
        message = "".join([
            "🔍 <b>Code Review 啟動</b>\n",
            divider,
            f"📦 Commit <code>{self.commit_sha[:8]}</code> 🌿 {self.branch}\n",
            f"📝 變更檔案:\n{files_list}\n",
            divider,
            "🤖 <i>Hermes → OpenClaw → Elephant Alpha → NemoTron</i>\n",
            "📊 即時進度:https://mo.wooo.work/code-review/",
        ])
        _send_telegram_raw(message)
    except Exception as e:
        logger.warning("[CodeReview] 啟動通知失敗: %s", e)
|
||
|
||
def _notify_complete(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Send the "review finished" Telegram summary.

    Args:
        findings: Findings from the scan step; the first three CRITICAL/HIGH
            ones are listed.
        openclaw_report: OpenClaw assessment; its first 400 characters are
            appended when non-empty.
        ea: Elephant Alpha decision; reads ``priority``, ``auto_fix`` and
            ``fix_files``.

    Any failure is logged as a warning and swallowed.
    """
    try:
        import html

        from services.telegram_templates import _send_telegram_raw

        sev = self.state["severity_summary"]
        priority = ea.get("priority", "medium")
        auto_fix = ea.get("auto_fix", False)
        fix_files = ea.get("fix_files") or []  # tolerate a JSON null
        allowed_fix_files = _aider_allowed_fix_files(fix_files)

        def _esc(value):
            # The message is built with <b>/<i>/<code> tags, so it is
            # presumably sent in HTML parse mode; model-produced text with a
            # raw "<" or "&" would then make the send fail.
            return html.escape(str(value or ""), quote=False)

        icon = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}.get(priority, "🟡")
        top_issues = [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3]
        issues_lines = "\n".join(
            f" {'🔴' if f['severity']=='CRITICAL' else '🟠'} [{f['severity']}] "
            f"{_esc(f.get('description'))} — <i>{_esc(f.get('file'))}</i>"
            for f in top_issues
        ) or " ✅ 無高風險問題"

        msg = (
            f"{icon} <b>Code Review 完成</b> · <code>{self.commit_sha[:8]}</code>\n"
            f"══════════════════════════\n"
            f"🔴 CRITICAL <b>{sev['critical']}</b> "
            f"🟠 HIGH <b>{sev['high']}</b> "
            f"🟡 MEDIUM <b>{sev['medium']}</b> "
            f"🟢 LOW <b>{sev['low']}</b>\n"
            f"══════════════════════════\n"
            f"<b>⚠️ 主要問題</b>\n{issues_lines}\n"
        )
        if openclaw_report:
            # NOTE(review): left unescaped because the report may already
            # carry Telegram markup — confirm against its producer.
            msg += f"\n{openclaw_report[:400]}\n"

        if auto_fix and allowed_fix_files:
            fix_status = "🔧 已觸發自動修復(AiderHeal)"
        elif auto_fix and fix_files:
            fix_status = "⚠️ 不在自動修復白名單,需人工處理"
        elif sev['critical'] + sev['high'] + sev['medium'] + sev['low'] == 0:
            fix_status = "✅ 無需修復動作"
        else:
            fix_status = "🛑 自動修復主開關關閉(CODE_REVIEW_AUTO_FIX_ENABLED=false)"
        msg += (
            f"══════════════════════════\n"
            f"🤖 Elephant Alpha:<b>{priority.upper()}</b> {fix_status}\n"
            f"📊 完整報告:https://mo.wooo.work/code-review/"
        )
        _send_telegram_raw(msg)
    except Exception as e:
        logger.warning("[CodeReview] 完成通知失敗: %s", e)
|
||
|
||
def _notify_error(self, error: str):
    """Send the "pipeline failed" Telegram alert.

    Args:
        error: Error text; only its first 200 characters are sent.

    Any failure while sending is logged with a traceback and swallowed.
    """
    try:
        import html

        from services.telegram_templates import _send_telegram_raw

        # Exception text frequently contains "<...>" (reprs, HTML bodies).
        # The message uses HTML tags, so presumably HTML parse mode applies
        # and unescaped text would make the alert itself fail to send.
        safe_error = html.escape(str(error)[:200], quote=False)
        safe_branch = html.escape(str(self.branch), quote=False)
        _send_telegram_raw(
            f"🚨 <b>Code Review Pipeline 失敗</b>\n"
            f"Commit:<code>{self.commit_sha[:8]}</code> @ {safe_branch}\n"
            f"錯誤:{safe_error}\n"
            f"📊 查看:https://mo.wooo.work/code-review/"
        )
    except Exception as exc:
        logger.warning("[CodeReview] 失敗通知發送失敗: %s", exc, exc_info=True)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# 公開 API
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def trigger_post_deploy_review(
    commit_sha: str,
    changed_files: List[str],
    branch: str = "main",
    deploy_type: str = "sync",
) -> str:
    """Launch a review pipeline on a background daemon thread.

    Called by the webhook endpoint in routes/code_review_routes.py.

    Args:
        commit_sha: Deployed commit; its first 8 characters name the thread.
        changed_files: Files changed by the deploy.
        branch: Branch that was deployed.
        deploy_type: Passed through to ``CodeReviewPipeline``.

    Returns:
        The ``pipeline_id`` of the pipeline that was started.
    """
    review = CodeReviewPipeline(commit_sha, changed_files, branch, deploy_type)
    worker_name = f"code-review-{commit_sha[:8]}"
    threading.Thread(target=review.run, name=worker_name, daemon=True).start()
    logger.info(
        "[CodeReview] 已派發 pipeline=%s files=%d",
        review.pipeline_id,
        len(review.changed_files),
    )
    return review.pipeline_id
|
||
|
||
|
||
def get_current_state() -> Dict[str, Any]:
    """Return a shallow copy of the live pipeline state for frontend polling.

    The copy is taken under ``_pipeline_lock``; a non-empty
    ``openclaw_report`` is passed through ``_public_openclaw_report`` before
    being returned.
    """
    with _pipeline_lock:
        snapshot = dict(_current_pipeline)
    raw_report = snapshot.get("openclaw_report")
    if raw_report:
        snapshot["openclaw_report"] = _public_openclaw_report(raw_report)
    return snapshot
|
||
|
||
|
||
def get_history(limit: int = 20) -> List[Dict]:
    """Return the most recent ``code_review_result`` rows from ``ai_insights``.

    Args:
        limit: Maximum number of rows, newest first.

    Returns:
        One summary dict per row. A row whose JSON columns cannot be decoded
        is still returned with empty defaults. On a database error the
        failure is logged and an empty list is returned.
    """
    def _as_dict(raw, row_id):
        """Decode one JSON column into a dict; anything unusable becomes {}."""
        if isinstance(raw, dict):
            # In case the driver already decoded the column (its type is not
            # visible from here).
            return raw
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            logger.debug("[CodeReview] history row JSON decode failed id=%s", row_id, exc_info=True)
            return {}
        return parsed if isinstance(parsed, dict) else {}

    session = get_session()
    try:
        rows = session.execute(text("""
            SELECT id, content, confidence, metadata_json, created_at, status
            FROM ai_insights
            WHERE insight_type = 'code_review_result'
            ORDER BY created_at DESC
            LIMIT :lim
        """), {"lim": limit}).fetchall()

        results = []
        for r in rows:
            # Decode the two columns independently so a bad metadata_json
            # does not discard a valid content payload (and vice versa).
            meta = _as_dict(r[3], r[0])
            content = _as_dict(r[1], r[0])

            sev = content.get("severity_summary")
            if not isinstance(sev, dict):
                sev = {}
            # Only numeric counters are summed, so one odd value cannot blank
            # the entire history through the outer except.
            total_issues = sum(v for v in sev.values() if isinstance(v, (int, float)))

            results.append({
                "id": r[0],
                "pipeline_id": meta.get("pipeline_id", ""),
                "commit_sha": str(meta.get("commit_sha") or "")[:8],
                "branch": meta.get("branch", ""),
                "changed_files": meta.get("changed_files", []),
                "severity_summary": sev,
                "total_issues": total_issues,
                "auto_fix": meta.get("auto_fix_triggered", False),
                "findings": content.get("findings", []),
                "openclaw_report": _public_openclaw_report(content.get("openclaw_report", "")),
                "ea_decision": content.get("ea_decision", {}),
                "created_at": r[4].isoformat() if r[4] else "",
                "status": r[5] or "active",
            })
        return results
    except Exception as e:
        logger.warning("[CodeReview] 歷史讀取失敗: %s", e)
        return []
    finally:
        session.close()
|
||
|
||
|
||
def verify_internal_token(request_token: str) -> bool:
    """Check the token sent by the CD webhook against ``INTERNAL_TOKEN``.

    Args:
        request_token: Token supplied by the caller; a non-string value
            (for example ``None``) is rejected.

    Returns:
        True when the token matches. When ``INTERNAL_TOKEN`` is unset the
        request is rejected unless ``ALLOW_INSECURE_WEBHOOK`` is enabled.
    """
    import hmac

    if not INTERNAL_TOKEN:
        if ALLOW_INSECURE_WEBHOOK:
            logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,僅因 dev override 放行")
            return True
        logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定,拒絕 webhook")
        return False

    if not isinstance(request_token, str):
        return False
    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Both sides are encoded because compare_digest only
    # accepts ASCII str, and a caller may send arbitrary text.
    return hmac.compare_digest(
        request_token.encode("utf-8"),
        str(INTERNAL_TOKEN).encode("utf-8"),
    )
|