Files
ewoooc/services/code_review_pipeline_service.py
OoO 0b13055466 feat(p38): host_health + ppt_audit DB 持久化(B-1 + B-2)
統帥要求:
1. 所有 6 個觀測頁的功能和數據都要完整寫入資料庫儲存
2. Ollama 切 GCP 順序 GCP-A → GCP-B → 111

盤點結果:
- 4/6 頁面已有 DB 表(ai_calls / learning_episodes / rag_query_log / ai_call_budgets)
- 2/6 頁面是即時查詢無歷史:host_health(HTTP probe)、ppt_audit(os.listdir)
- Ollama 99% 已合規,僅 1 處過時註解

修補(B-1):
- services/code_review_pipeline_service.py:207 註解更新
  「直呼內網 Ollama (192.168.0.188)」→ 「走 resolve_ollama_host 三主機級聯 ADR-027」

新增(B-2):
- migrations/029_create_host_health_probes.sql
  - 三主機健康歷史表(label/url/healthy/response_ms/error_msg)
  - 索引:probed_at / (host_label, probed_at)
  - 30 天保留(cron 清理)
- migrations/030_create_ppt_audit_results.sql
  - PPT 視覺審核結果表(status/issues_count/issues_found JSONB/confidence)
  - 索引:audited_at / pptx_filename / failed-only partial
- routes/admin_observability_routes.py:host_health_dashboard
  - 每次 probe 寫入 host_health_probes(失敗安全)
  - 新增 24h 健康趨勢卡片(uptime % / 平均 ms)
- routes/admin_observability_routes.py:ppt_audit_history
  - 從 ppt_audit_results 讀過去 7 日 audit 紀錄
  - 顯示審核時間/檔名/結果/問題數/信心度/耗時
- services/ppt_vision_service.py:check_ppt_file
  - 新增 _persist_audit_result() 跑完寫入 DB(status/issues/confidence/duration)
  - 失敗安全:DB 寫入失敗只 log warning,不擋主流程
- templates/admin/host_health.html + ppt_audit_history.html
  - 新增「24h 健康趨勢」card(host_health)
  - 新增「視覺審核歷史紀錄」card(ppt_audit)

DoD:
- 程式碼語法 ✓
- Jinja 平衡 ✓
- 失敗安全(DB 寫入或讀取失敗都不擋頁面渲染)✓

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 18:55:39 +08:00

800 lines
38 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/code_review_pipeline_service.py
Post-Deploy AI Agent Code Review Pipeline
觸發時機CD 健康檢查通過後,由 Gitea Action webhook 呼叫
Pipeline
Step 1 system 讀取變更檔案內容
Step 2 Hermes 程式碼掃描bugs / security / performance
Step 3 OpenClaw 架構品質評估Gemini 2.5 Flash
Step 4 ElephantAlpha 決策協調severity 判定 + auto-fix 裁量)
Step 5 NemoTron 行動派遣action_plans 寫入 + AiderHeal 觸發)
結果輸出:
- ai_insightstype='code_review_result'
- action_planstype='code_review_fix'
- Telegram 告警(啟動 / 完成 / 錯誤)
- 前端 /code-review/ 即時 polling 狀態
"""
import json
import logging
import os
import re
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
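# ADR-027 Phase 2 N3: HERMES_MODEL is still imported eagerly (a plain constant); HERMES_URL is resolved lazily.
# Every _hermes_scan call obtains the current host through get_hermes_url() (GCP preferred, 111 as fallback),
# so an import-time freeze cannot stop a host switch from taking effect.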
from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL
from config import get_hermes_url
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
logger = logging.getLogger(__name__)
# ── Global pipeline state (read by the front-end polling endpoint) ──────────
# Latest snapshot of the running pipeline's state dict; guarded by _pipeline_lock.
_current_pipeline: Dict[str, Any] = {}
_pipeline_lock = threading.Lock()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
# LOCKED-GEMINI: a whole-repo code-review diff can reach 100K+ tokens, beyond Ollama's 32K context.
# Phase 7 upgrade: with CODE_REVIEW_USE_CLAUDE=true the review goes to Claude Opus 4.7 (200K context, Arena code Elo 1548).
# Off by default; behaviour is then identical to Phase 6 (ADR-028 locked scenario #5).
REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
# Shared secret expected from the CD webhook (checked in verify_internal_token).
INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
# Master switch for automatic fixes; any value other than "true" (case-insensitive) disables them.
AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "true").lower() == "true"
# Dev-only override that lets the webhook through when INTERNAL_TOKEN is unset.
ALLOW_INSECURE_WEBHOOK = os.getenv("MOMO_ALLOW_INSECURE_INTERNAL_WEBHOOK_FOR_DEV", "").lower() == "true"
# Phase 7 frontier-upgrade feature flag — off by default; when on, _openclaw_assess tries Claude Opus 4.7 first.
CODE_REVIEW_USE_CLAUDE = os.getenv("CODE_REVIEW_USE_CLAUDE", "false").lower() == "true"
CLAUDE_REVIEW_MODEL = os.getenv("CLAUDE_MODEL", "claude-opus-4-7")
# ═══════════════════════════════════════════════════════════════════════════════
# Pipeline Class
# ═══════════════════════════════════════════════════════════════════════════════
class CodeReviewPipeline:
    """Five-step post-deploy code review pipeline.

    Intended to be driven by calling ``run()`` inside a daemon thread.
    """

    def __init__(self, commit_sha: str, changed_files: List[str],
                 branch: str = "main", deploy_type: str = "sync"):
        """Record the deployment metadata and publish the initial state.

        Args:
            commit_sha: Commit being reviewed; its first 8 characters go
                into ``pipeline_id``.
            changed_files: Paths changed by the deployment.  Only paths
                ending in ``.py``, ``.yaml``, ``.yml`` or ``.json`` that do
                not start with ``node_modules/`` or ``.git/`` are kept.
            branch: Branch name stored in the state.
            deploy_type: Deployment type, stored on the instance.
        """
        self.commit_sha = commit_sha
        self.branch = branch
        self.deploy_type = deploy_type
        self.started_at = datetime.now()
        stamp = self.started_at.strftime('%Y%m%d_%H%M%S')
        self.pipeline_id = f"cr_{commit_sha[:8]}_{stamp}"

        # Keep reviewable source/config files only; vendored and VCS paths are skipped.
        reviewable_suffixes = ('.py', '.yaml', '.yml', '.json')
        excluded_prefixes = ('node_modules/', '.git/')
        kept: List[str] = []
        for path in changed_files:
            if not path.endswith(reviewable_suffixes):
                continue
            if path.startswith(excluded_prefixes):
                continue
            kept.append(path)
        self.changed_files = kept

        zero_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        self.state: Dict[str, Any] = {
            "pipeline_id": self.pipeline_id,
            "commit_sha": commit_sha,
            "branch": branch,
            "changed_files": self.changed_files,
            "status": "running",
            "current_step": 0,
            "total_steps": 5,
            "steps": [],
            "findings": [],
            "severity_summary": zero_counts,
            "openclaw_report": "",
            "ea_decision": {},
            "auto_fix_triggered": False,
            "started_at": self.started_at.isoformat(),
            "completed_at": None,
            "message": "",
        }
        self._sync_global()
# ── State helpers ─────────────────────────────────────────────────────────
def _sync_global(self):
    """Publish a shallow copy of ``self.state`` as the module-level snapshot.

    The module-level ``_current_pipeline`` is rebound while holding
    ``_pipeline_lock``.
    """
    global _current_pipeline
    snapshot = dict(self.state)
    with _pipeline_lock:
        _current_pipeline = snapshot
def _step_start(self, num: int, name: str, agent: str):
    """Mark step ``num`` as running and append its record to the state.

    Args:
        num: Step number, stored as ``current_step``.
        name: Display name of the step.
        agent: Name of the agent responsible for the step.
    """
    step_record = {
        "step": num,
        "name": name,
        "agent": agent,
        "status": "running",
        "started_at": datetime.now().isoformat(),
        "completed_at": None,
        "summary": "",
    }
    self.state["current_step"] = num
    self.state["steps"].append(step_record)
    self._sync_global()
    logger.info("[CodeReview] ▶ Step %d/5 %s （%s）", num, name, agent)
def _step_done(self, num: int, summary: str, ok: bool = True):
    """Close step ``num`` with a result summary and publish the state.

    Args:
        num: Step number to close; every recorded step with this number is
            updated.
        summary: Human-readable outcome.  Stored truncated to 300 characters
            and logged truncated to 100.
        ok: ``True`` records status ``"ok"``, ``False`` records ``"error"``.
    """
    outcome = "ok" if ok else "error"
    for step in self.state["steps"]:
        if step["step"] == num:
            step["status"] = outcome
            step["completed_at"] = datetime.now().isoformat()
            step["summary"] = summary[:300]
    self._sync_global()
    # The marker previously evaluated to "" in both branches and the summary
    # was glued to the step number, so success and failure lines looked alike.
    logger.info("[CodeReview] %s Step %d: %s", "✓" if ok else "✗", num, summary[:100])
def _finish(self, status: str, message: str):
    """Record the final pipeline status, completion time and message.

    Args:
        status: Final status value written to the state.
        message: Final message written to the state.
    """
    self.state.update({
        "status": status,
        "completed_at": datetime.now().isoformat(),
        "message": message,
    })
    self._sync_global()
# ── Main pipeline ─────────────────────────────────────────────────────────
def run(self):
    """Execute the full five-step pipeline; meant to run in a daemon thread.

    Order of work: start notification, read files, Hermes scan, OpenClaw
    assessment, ElephantAlpha decision, NemoTron dispatch, then persistence
    and the completion notification.  When no file content can be read the
    pipeline finishes with status ``"skipped"``.

    Any exception is logged, every step still marked ``"running"`` is closed
    as ``"error"``, the pipeline finishes with status ``"error"`` and an
    error notification is sent.  Nothing is re-raised.
    """
    try:
        self._notify_start()

        # Step 1 ─ read files
        self._step_start(1, "讀取變更檔案", "system")
        file_contents = self._read_changed_files()
        if not file_contents:
            self._step_done(1, "無有效 Python/YAML 變更，跳過 Review")
            self._finish("skipped", "無有效變更檔案")
            return
        self._step_done(1, f"讀取 {len(file_contents)} 個檔案")

        # Step 2 ─ Hermes scan
        self._step_start(2, "Hermes 程式碼掃描", "Hermes")
        findings = self._hermes_scan(file_contents)
        cnt = self.state["severity_summary"]
        self._step_done(
            2,
            f"CRITICAL={cnt['critical']} HIGH={cnt['high']} MEDIUM={cnt['medium']} LOW={cnt['low']}",
        )

        # Step 3 ─ OpenClaw assessment
        self._step_start(3, "OpenClaw 架構品質評估", "OpenClaw")
        openclaw_report = self._openclaw_assess(file_contents, findings)
        self.state["openclaw_report"] = openclaw_report
        self._step_done(3, openclaw_report[:120] if openclaw_report else "（Gemini 未回應）")

        # Step 4 ─ ElephantAlpha orchestration
        self._step_start(4, "ElephantAlpha 決策協調", "ElephantAlpha")
        ea = self._ea_orchestrate(findings, openclaw_report)
        self.state["ea_decision"] = ea
        self._step_done(4, f"優先度={ea.get('priority','?')} auto_fix={ea.get('auto_fix',False)}")

        # Step 5 ─ NemoTron dispatch
        self._step_start(5, "NemoTron 行動派遣", "NemoTron")
        dispatch = self._nemotron_dispatch(ea, findings)
        # Both branches of this flag used to be the empty string, so the
        # summary never showed whether auto-fix was triggered.
        auto_fix_label = "是" if dispatch['auto_fix'] else "否"
        self._step_done(
            5,
            f"寫入 {dispatch['actions']} 筆 action_plans，自動修復={auto_fix_label}",
        )

        # Persist + notify
        self._save_to_db(findings, openclaw_report, ea)
        self._notify_complete(findings, openclaw_report, ea)
        self._finish("completed", "Pipeline 執行完成")
    except Exception as e:
        logger.error("[CodeReview] Pipeline 例外: %s", e, exc_info=True)
        # Close whatever step was in flight; otherwise the polled state keeps
        # showing it as "running" after the pipeline has already failed.
        for step in self.state["steps"]:
            if step.get("status") == "running":
                step["status"] = "error"
                step["completed_at"] = datetime.now().isoformat()
                step["summary"] = str(e)[:300]
        self._finish("error", str(e)[:200])
        self._notify_error(str(e))
# ── Step 1讀取檔案 ───────────────────────────────────────────────────────
def _read_changed_files(self) -> Dict[str, str]:
    """Read the changed files from disk, relative to the project root.

    The project root is taken as the parent of this module's directory.  At
    most the first 8 entries of ``self.changed_files`` are read, and each
    file's text is cut to 6000 characters with a truncation marker appended.

    Paths that resolve outside the project root (absolute paths, ``..``
    segments) are skipped: the file list comes from the deploy webhook and
    must not be able to pull arbitrary files into the review prompts.

    Returns:
        Mapping of relative path to file text for every file that could be
        read.  Unreadable files are left out.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    contents: Dict[str, str] = {}
    for rel_path in self.changed_files[:8]:  # at most 8 files
        abs_path = os.path.abspath(os.path.join(project_root, rel_path))
        try:
            inside_root = os.path.commonpath([project_root, abs_path]) == project_root
        except ValueError:
            # Raised e.g. for paths on different drives; treat as outside.
            inside_root = False
        if not inside_root:
            logger.warning("[CodeReview] 略過專案目錄外的路徑: %s", rel_path)
            continue
        try:
            with open(abs_path, encoding="utf-8", errors="ignore") as fh:
                raw = fh.read()
        except OSError:
            logger.debug("[CodeReview] 無法讀取 %s（部署路徑不同？）", rel_path)
            continue
        contents[rel_path] = raw[:6000] + ("\n... （截斷）" if len(raw) > 6000 else "")
    return contents
# ── Step 2Hermes 掃描 ───────────────────────────────────────────────────
def _hermes_scan(self, files: Dict[str, str]) -> List[Dict]:
    """Ask the Hermes model to scan the changed files and return findings.

    The host is obtained from ``get_hermes_url()`` on every call instead of
    at import time.  (The original note describes this as the ADR-027
    GCP-A → GCP-B → 111 cascade; the resolution logic itself lives outside
    this block.)

    Args:
        files: Mapping of relative path to file text.  Only the first four
            entries are placed in the prompt.

    Returns:
        The finding dicts parsed from the model's JSON reply, each with an
        upper-cased ``severity``.  ``[]`` when the request fails, the reply
        contains no JSON array, or the array cannot be parsed.

    Side effects:
        Adds the per-severity counts to ``state["severity_summary"]``, stores
        the findings in ``state["findings"]`` and publishes the state.  The
        counters are only touched once the whole reply has been validated, so
        a malformed reply cannot leave them half-updated.
    """
    try:
        import requests as _req
        files_text = "\n\n".join(
            f"### {name}\n```python\n{content}\n```"
            for name, content in list(files.items())[:4]
        )
        prompt = f"""你是資深程式碼審查工程師，請掃描以下程式碼並列出所有問題。
{files_text}
輸出格式：純 JSON 陣列，每項包含以下欄位：
- severity: "CRITICAL" | "HIGH" | "MEDIUM" | "LOW"
- type: "bug" | "security" | "performance" | "maintainability"
- file: 檔案名稱
- line_hint: 約略行號或函式名稱
- description: 問題說明（繁體中文，精簡一句）
- suggestion: 修復建議（繁體中文，精簡一句）
只輸出 JSON 陣列，不含其他文字。無問題時輸出 []"""
        # ADR-027 Phase 2 N3: resolve the Hermes host lazily; the provider
        # label follows whichever host was resolved.
        hermes_url = get_hermes_url()
        if '34.21.145.224' in hermes_url or '34.143.170.20' in hermes_url:
            provider_tag = 'gcp_ollama'
        elif '192.168.0.111' in hermes_url:
            provider_tag = 'ollama_111'
        else:
            provider_tag = 'ollama_other'
        # Phase 1 v5.0: track the Hermes scan through ai_call_logger.
        with log_ai_call(
            caller='code_review_hermes',
            provider=provider_tag,
            model=_HERMES_MODEL,
            request_id=f"cr-{self.commit_sha[:8]}",
            meta={'commit': self.commit_sha[:8], 'branch': self.branch,
                  'files': len(files), 'host': hermes_url},
        ) as _ctx:
            resp = _req.post(
                f"{hermes_url}/api/generate",
                json={"model": _HERMES_MODEL, "prompt": prompt,
                      "stream": False, "options": {"temperature": 0.1}},
                timeout=120,
            )
            resp.raise_for_status()
            body = resp.json()
            _ctx.set_tokens(
                input=body.get("prompt_eval_count", 0),
                output=body.get("eval_count", 0),
            )
        raw = body.get("response", "").strip()
        match = re.search(r"\[.*\]", raw, re.DOTALL)
        if not match:
            logger.warning("[CodeReview] Hermes 回應無 JSON: %s", raw[:200])
            return []

        # Model output is untrusted: keep dict items only and normalise the
        # severity once, so the summary counters and the later case-sensitive
        # "CRITICAL"/"HIGH" filters agree with each other.
        findings: List[Dict] = []
        for item in json.loads(match.group()):
            if not isinstance(item, dict):
                continue
            item["severity"] = str(item.get("severity") or "LOW").strip().upper()
            findings.append(item)

        summary = self.state["severity_summary"]
        for item in findings:
            sev = item["severity"].lower()
            if sev in summary:
                summary[sev] += 1
        self.state["findings"] = findings
        self._sync_global()
        return findings
    except Exception as e:
        logger.warning("[CodeReview] Hermes 掃描失敗: %s", e)
        return []
# ── Step 3OpenClaw 評估 ──────────────────────────────────────────────────
def _openclaw_assess(self, files: Dict[str, str], findings: List[Dict]) -> str:
    """Produce the OpenClaw quality assessment text for this deployment.

    Routing order:
        L1  Claude (``CLAUDE_REVIEW_MODEL``) — only when
            ``CODE_REVIEW_USE_CLAUDE`` is on and the service is available.
        L2  Gemini (``REVIEW_MODEL``) — only when ``GEMINI_API_KEY`` is set.
        L3  ElephantAlpha via ``elephant_service``.

    Each level falls through to the next on failure.  A raised exception in
    the Claude call is now handled the same way as in the Gemini and
    ElephantAlpha calls; before, it escaped and aborted the whole pipeline.

    Args:
        files: Mapping of relative path to file text; only names and lengths
            of the first five entries are sent.
        findings: Hermes findings; the first eight are sent.

    Returns:
        The assessment text, or ``""`` when every level failed.
    """
    sev = self.state["severity_summary"]
    findings_json = json.dumps(findings[:8], ensure_ascii=False, indent=2)
    files_list = "\n".join(f"- {k} （{len(v)} 字元）" for k, v in list(files.items())[:5])
    system = (
        "你是 OpenClaw 程式碼品質戰略分析師，以技術主管視角評估部署後程式碼。"
        "語言：繁體中文。風格：精準、數據導向、可執行建議。"
    )
    user_prompt = f"""【部署】Commit {self.commit_sha[:8]} @ {self.branch}
【變更檔案】
{files_list}
【Hermes 掃描摘要】CRITICAL={sev['critical']} HIGH={sev['high']} MEDIUM={sev['medium']} LOW={sev['low']}
【問題明細】
{findings_json}
請產出程式碼品質評估（使用 HTML <b> 標題150字以內
<b>🔍 整體風險等級</b>（一句理由）
<b>⚠️ 最需關注問題</b>TOP 2
<b>💡 架構優化方向</b>1條長期建議
<b>✅ 本次部署亮點</b>"""

    # ── L1: Claude, behind the feature flag ─────────────────────────────
    if CODE_REVIEW_USE_CLAUDE:
        try:
            from services.anthropic_service import anthropic_service
        except Exception as e:
            logger.warning("[CodeReview] Claude service import 失敗，退回 Gemini: %s", e)
            anthropic_service = None  # type: ignore
        if anthropic_service is not None and anthropic_service.is_available():
            with log_ai_call(
                caller='code_review_openclaw',
                provider='claude',
                model=CLAUDE_REVIEW_MODEL,
                request_id=f"cr-{self.commit_sha[:8]}",
                meta={
                    'commit': self.commit_sha[:8],
                    'branch': self.branch,
                    'flag': 'CODE_REVIEW_USE_CLAUDE',
                },
            ) as _ctx:
                try:
                    resp = anthropic_service.generate(
                        prompt=user_prompt,
                        system_prompt=system,
                        model=CLAUDE_REVIEW_MODEL,
                        max_tokens=2048,
                        temperature=0.2,  # code review should be precise
                        cache_system=True,
                        timeout=120,
                    )
                except Exception as e:
                    # Degrade to Gemini instead of aborting the pipeline,
                    # matching how the L2/L3 branches treat exceptions.
                    logger.warning("[CodeReview] Claude 呼叫例外，降級 Gemini: %s", e)
                    _ctx.set_error(f"{type(e).__name__}: {e}")
                    _ctx.fallback_to_caller('code_review_openclaw_gemini')
                else:
                    if resp.success:
                        _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
                        _ctx.set_cache_hit(resp.cache_hit)
                        _ctx.add_meta('cache_creation_tokens', resp.cache_creation_tokens)
                        _ctx.add_meta('cache_read_tokens', resp.cache_read_tokens)
                        return resp.content or ""
                    # Claude reported failure → fall back to Gemini (L2)
                    _ctx.set_error(resp.error or 'claude generate failed')
                    _ctx.fallback_to_caller('code_review_openclaw_gemini')
                    logger.warning(
                        "[CodeReview] Claude 失敗，降級 Gemini: %s", resp.error,
                    )
        else:
            logger.info(
                "[CodeReview] CODE_REVIEW_USE_CLAUDE=true 但 Claude 不可用（缺 API key 或 SDK），退回 Gemini",
            )

    # ── L2: Gemini ───────────────────────────────────────────────────────
    if GEMINI_API_KEY:
        with log_ai_call(
            caller='code_review_openclaw',
            provider='gemini',
            model=REVIEW_MODEL,
            request_id=f"cr-{self.commit_sha[:8]}",
            meta={'commit': self.commit_sha[:8], 'branch': self.branch},
        ) as _ctx:
            try:
                import google.generativeai as genai
                genai.configure(api_key=GEMINI_API_KEY)
                model = genai.GenerativeModel(
                    model_name=REVIEW_MODEL,
                    generation_config=genai.types.GenerationConfig(
                        temperature=0.3, max_output_tokens=1500,
                    ),
                    system_instruction=system,
                )
                resp = model.generate_content(user_prompt, request_options={"timeout": 90})
                try:
                    usage = getattr(resp, 'usage_metadata', None)
                    if usage is not None:
                        _ctx.set_tokens(
                            input=getattr(usage, 'prompt_token_count', 0) or 0,
                            output=getattr(usage, 'candidates_token_count', 0) or 0,
                        )
                except Exception as usage_err:
                    # Token accounting is best-effort; never lose the reply over it.
                    logger.debug("[CodeReview] Gemini usage metadata 讀取失敗: %s", usage_err)
                return resp.text or ""
            except Exception as e:
                logger.warning("[CodeReview] OpenClaw Gemini 失敗，降級 ElephantAlpha: %s", e)
                _ctx.set_error(f"{type(e).__name__}: {e}")
                _ctx.fallback_to_caller('code_review_elephant')

    # ── L3: ElephantAlpha fallback ───────────────────────────────────────
    with log_ai_call(
        caller='code_review_elephant',
        provider='nim_via_elephant',
        model='nvidia/llama-3.3-nemotron-super-49b-v1.5',
        request_id=f"cr-{self.commit_sha[:8]}",
        meta={'commit': self.commit_sha[:8], 'branch': self.branch},
    ) as _ctx:
        try:
            from services.elephant_service import elephant_service
            resp = elephant_service.generate(
                prompt=user_prompt,
                system_prompt=system,
                temperature=0.3,
                timeout=90,
            )
            if resp.success:
                _ctx.set_tokens(
                    input=getattr(resp, 'input_tokens', 0) or 0,
                    output=getattr(resp, 'output_tokens', 0) or 0,
                )
                return resp.content or ""
            else:
                _ctx.set_error(getattr(resp, 'error', 'elephant generate failed'))
        except Exception as e:
            logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e)
            _ctx.set_error(f"{type(e).__name__}: {e}")
    return ""
# ── Step 4ElephantAlpha 決策 ─────────────────────────────────────────────
def _ea_orchestrate(self, findings: List[Dict], openclaw_report: str) -> Dict:
    """Decide priority and auto-fix for the review, with a rule fallback.

    ElephantAlpha is asked first; a successful reply is passed through
    ``_guard_ea_decision``.  If the call fails, raises, or returns something
    unusable, a rule-based decision is built from the severity counters.

    Args:
        findings: Hermes findings.
        openclaw_report: OpenClaw assessment; its first 300 characters are
            included in the prompt.

    Returns:
        A dict with ``priority``, ``auto_fix``, ``reasoning``, ``fix_files``
        and ``human_review_needed``.
    """
    sev = self.state["severity_summary"]
    critical_n = sev["critical"]
    high_n = sev["high"]
    # Try ElephantAlpha for the fine-grained decision.
    try:
        from services.elephant_service import elephant_service
        top3 = json.dumps(
            [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3],
            ensure_ascii=False,
        )
        prompt = f"""你是 Elephant Alpha，負責協調 Code Review 後的修復決策。
【部署】commit={self.commit_sha[:8]} branch={self.branch}
【問題統計】CRITICAL={critical_n} HIGH={high_n} MEDIUM={sev['medium']} LOW={sev['low']}
【Top 問題】{top3}
【OpenClaw評估摘要】{openclaw_report[:300]}
請以 JSON 回答（不含其他文字）：
{{
"priority": "critical|high|medium|low",
"auto_fix": true|false,
"reasoning": "決策理由（繁體中文，一句話，需含具體數字）",
"fix_files": ["需自動修復的檔案最多5個所有有問題的檔案"],
"human_review_needed": false
}}
規則（依 ADR-020 全自動修復政策，覆寫 ADR-012 L3 HITL 對 code review 的限制）：
- 任何 finding不論 CRITICAL/HIGH/MEDIUM/LOW→ auto_fix=truehuman_review_needed=false
- 安全網是 Git revert + Gitea CI/CD 回滾，不依賴人工審查門檻
- priority 按最嚴重 severity 決定CRITICAL>HIGH>MEDIUM>LOW
- fix_files 填入所有有問題的檔案（最多 5 個AiderHeal 端會再限流）"""
        resp = elephant_service.generate(
            prompt=prompt,
            json_mode=True,
            temperature=0.1,
            timeout=60,
        )
        if resp.success:
            return self._guard_ea_decision(json.loads(resp.content), findings)
        logger.warning(
            "[CodeReview] ElephantAlpha 決策未成功，回退規則: %s",
            getattr(resp, "error", ""),
        )
    except Exception as e:
        logger.warning("[CodeReview] ElephantAlpha 決策失敗，回退規則: %s", e)

    # Rule fallback (ADR-020): any finding means auto_fix, subject to the
    # AUTO_FIX_ENABLED master switch.
    has_findings = len(findings) > 0
    if critical_n > 0:
        priority = "critical"
    elif high_n > 0:
        priority = "high"
    elif sev["medium"] > 0:
        priority = "medium"
    else:
        priority = "low"
    auto_fix = bool(has_findings and AUTO_FIX_ENABLED)
    # Order-preserving dedupe: a set would make the chosen five files vary
    # from run to run under string hash randomisation.
    fix_files = list(dict.fromkeys(
        f.get("file") for f in findings
        if isinstance(f, dict) and f.get("file")
    ))[:5]
    if auto_fix:
        outcome = "觸發 AiderHeal 自動修復（Git+CI/CD 為回滾安全網）"
    elif has_findings:
        # Findings exist but the master switch is off; the old text wrongly
        # reported "no findings" in this case.
        outcome = "自動修復主開關關閉（CODE_REVIEW_AUTO_FIX_ENABLED=false）"
    else:
        outcome = "無 finding，無需修復"
    return {
        "priority": priority,
        "auto_fix": auto_fix,
        "reasoning": (
            f"ADR-020 全自動修復：CRITICAL={critical_n} HIGH={high_n} "
            f"MEDIUM={sev['medium']} LOW={sev['low']}；{outcome}"
        ),
        "fix_files": fix_files,
        "human_review_needed": False,
    }
def _guard_ea_decision(self, decision: Dict, findings: List[Dict]) -> Dict:
    """Force an ElephantAlpha decision to comply with the ADR-020 policy.

    With any finding present ``auto_fix`` is set to the value of the
    ``AUTO_FIX_ENABLED`` master switch, and ``human_review_needed`` is always
    ``False``.  The model-supplied fields are validated rather than trusted:

    * ``priority`` must be one of critical/high/medium/low, otherwise it is
      derived from the severity counters;
    * ``fix_files`` is reduced to at most five unique non-empty strings; when
      the field is missing or not a list it is rebuilt from the findings.

    Args:
        decision: Parsed JSON decision from the model; mutated and returned.
        findings: Hermes findings.

    Returns:
        The same ``decision`` dict, updated in place.
    """
    sev = self.state["severity_summary"]
    if sev["critical"] > 0:
        derived_priority = "critical"
    elif sev["high"] > 0:
        derived_priority = "high"
    elif sev["medium"] > 0:
        derived_priority = "medium"
    else:
        derived_priority = "low"
    claimed = str(decision.get("priority") or "").strip().lower()
    priority = claimed if claimed in ("critical", "high", "medium", "low") else derived_priority

    raw_files = decision.get("fix_files")
    if not isinstance(raw_files, list):
        # Malformed or missing: fall back to the files named by the findings.
        raw_files = [f.get("file") for f in findings if isinstance(f, dict)]
    fix_files: List[str] = []
    for path in raw_files:
        if isinstance(path, str) and path and path not in fix_files:
            fix_files.append(path)

    has_findings = bool(findings)
    allowed_auto_fix = bool(has_findings and AUTO_FIX_ENABLED)
    if has_findings and not AUTO_FIX_ENABLED:
        logger.warning(
            "[CodeReview] auto_fix 被 CODE_REVIEW_AUTO_FIX_ENABLED=false 主開關擋下 priority=%s",
            priority,
        )
    decision["priority"] = priority
    decision["auto_fix"] = allowed_auto_fix
    decision["human_review_needed"] = False
    decision["fix_files"] = fix_files[:5]
    decision["reasoning"] = (
        f"{decision.get('reasoning', '')} "
        f"[ADR-020 全自動修復: auto_fix={'enabled' if allowed_auto_fix else 'flag_disabled'}, priority={priority}]"
    ).strip()
    return decision
# ── Step 5NemoTron 派遣 ──────────────────────────────────────────────────
def _nemotron_dispatch(self, ea: Dict, findings: List[Dict]) -> Dict:
    """Write ``action_plans`` rows for the files to fix and trigger AiderHeal.

    One row is inserted for each of the first three ``fix_files``; all rows
    are committed together.  If any insert fails the transaction is rolled
    back and the reported count is reset to 0, since none of the rows exist.
    AiderHeal is triggered whenever ``auto_fix`` is set and there are files
    to fix, regardless of whether the database write succeeded.

    Args:
        ea: Decision dict; ``auto_fix``, ``fix_files`` and ``priority`` are read.
        findings: Hermes findings; up to three per file go into the row metadata.

    Returns:
        ``{"actions": <rows written>, "auto_fix": <auto_fix flag>}``.
    """
    auto_fix = ea.get("auto_fix", False)
    fix_files = ea.get("fix_files") or []  # tolerate an explicit null
    priority_map = {"critical": 1, "high": 2, "medium": 3, "low": 4}
    priority_num = priority_map.get(ea.get("priority", "low"), 4)
    actions_created = 0
    session = get_session()
    try:
        # One action_plan per file that needs fixing.
        for fpath in fix_files[:3]:
            related = [f for f in findings if f.get("file") == fpath][:3]
            issue_text = ', '.join(str(f.get('description') or '')[:40] for f in related)
            desc = f"Code Review 修復：{fpath} — {issue_text}"
            session.execute(text("""
                INSERT INTO action_plans
                    (action_type, description, status, priority, metadata_json, created_at)
                VALUES
                    ('code_review_fix', :desc, :status, :priority, :meta, NOW())
            """), {
                "desc": desc[:500],
                "status": "auto_pending" if auto_fix else "auto_disabled",
                "priority": priority_num,
                "meta": json.dumps({
                    "pipeline_id": self.pipeline_id,
                    "commit_sha": self.commit_sha,
                    "file": fpath,
                    "auto_fix": auto_fix,
                    "ea_priority": ea.get("priority"),
                    "findings": related,
                }, ensure_ascii=False),
            })
            actions_created += 1
        session.commit()
    except Exception as e:
        logger.warning("[CodeReview] action_plans 寫入失敗: %s", e)
        session.rollback()
        # The rollback discarded every insert of this batch, so the earlier
        # increments no longer describe rows that exist.
        actions_created = 0
    finally:
        session.close()

    # Trigger AiderHeal (non-blocking).
    if auto_fix and fix_files:
        self.state["auto_fix_triggered"] = True
        self._sync_global()
        self._trigger_aider_heal(findings, fix_files)
    return {"actions": actions_created, "auto_fix": auto_fix}
def _trigger_aider_heal(self, findings: List[Dict], fix_files: List[str]):
    """Start a daemon thread that asks AiderHeal to fix up to two files.

    For each of the first two ``fix_files`` that has at least one finding,
    the most severe finding is passed to ``execute_code_fix``.  Each file is
    attempted independently, so a failure on one no longer prevents the next
    from being tried.

    Args:
        findings: Hermes findings used to pick the worst issue per file.
        fix_files: Candidate files; only the first two are processed.
    """
    severity_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}

    def _heal_worker():
        try:
            from services.aider_heal_executor import execute_code_fix
        except Exception as e:
            logger.error("[CodeReview] AiderHeal 觸發失敗: %s", e)
            return
        for fpath in fix_files[:2]:  # at most 2 files per run
            related = [f for f in findings if f.get("file") == fpath]
            if not related:
                continue
            # min() returns the first item with the lowest rank, the same
            # element sorted(...)[0] selected.
            worst = min(
                related,
                key=lambda x: severity_rank.get(x.get("severity", "LOW"), 3),
            )
            try:
                result = execute_code_fix(
                    error_type=f"code_review_{worst.get('type','bug')}",
                    error_message=worst.get("description", "Code Review 發現問題"),
                    target_file=fpath,
                    context={"suggestion": worst.get("suggestion", ""), "pipeline_id": self.pipeline_id},
                )
                message = result.get("message", "") if isinstance(result, dict) else result
                logger.info("[CodeReview] AiderHeal %s: %s", fpath, message)
            except Exception as e:
                logger.error("[CodeReview] AiderHeal 觸發失敗 (%s): %s", fpath, e)

    t = threading.Thread(target=_heal_worker, daemon=True, name=f"aider-heal-{self.commit_sha[:8]}")
    t.start()
# ── DB 持久化 ──────────────────────────────────────────────────────────────
def _save_to_db(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Persist the review result to ``ai_insights`` and queue its embedding.

    The stored ``content`` stays within 8000 characters, as before, but is
    now always valid JSON: findings are dropped from the end (and
    ``findings_truncated`` is set) until the document fits.  Cutting the
    serialized string instead produced JSON that ``get_history`` could not
    parse.  The embedding queue still receives the full, untrimmed result.

    Database errors are logged and rolled back; nothing is raised.

    Args:
        findings: Hermes findings.
        openclaw_report: OpenClaw assessment text.
        ea: ElephantAlpha decision.
    """
    max_content_chars = 8000
    period = datetime.now().strftime("%Y-%m-%d")

    payload = {
        "findings": list(findings),
        "openclaw_report": openclaw_report,
        "ea_decision": ea,
        "severity_summary": self.state["severity_summary"],
    }
    content_json = json.dumps(payload, ensure_ascii=False)
    while len(content_json) > max_content_chars and payload["findings"]:
        payload["findings"].pop()
        payload["findings_truncated"] = True
        content_json = json.dumps(payload, ensure_ascii=False)
    if len(content_json) > max_content_chars:
        # Still too large without any findings: keep a minimal document.
        payload = {
            "findings": [],
            "findings_truncated": True,
            "openclaw_report": (openclaw_report or "")[:2000],
            "ea_decision": {},
            "severity_summary": self.state["severity_summary"],
        }
        content_json = json.dumps(payload, ensure_ascii=False)

    session = get_session()
    try:
        row = session.execute(text("""
            INSERT INTO ai_insights
                (insight_type, content, confidence, created_by,
                 status, metadata_json, period, created_at)
            VALUES
                ('code_review_result', :content, :conf, 'code_review_pipeline',
                 'active', :meta, :period, NOW())
            RETURNING id
        """), {
            "content": content_json,
            "conf": 0.90,
            "meta": json.dumps({
                "pipeline_id": self.pipeline_id,
                "commit_sha": self.commit_sha,
                "branch": self.branch,
                "changed_files": self.changed_files,
                "auto_fix_triggered": self.state["auto_fix_triggered"],
            }, ensure_ascii=False),
            "period": period,
        }).fetchone()
        session.commit()
        if row:
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding
                enqueue_insight_embedding(row[0], "code_review_result", json.dumps({
                    "findings": findings,
                    "openclaw_report": openclaw_report,
                    "ea_decision": ea,
                }, ensure_ascii=False), period)
            except Exception as embed_err:
                # Embedding is best-effort; the insight row is already committed.
                logger.warning("[CodeReview] embedding queue enqueue failed: %s", embed_err)
        logger.info("[CodeReview] ai_insights 寫入成功 pipeline=%s", self.pipeline_id)
    except Exception as e:
        logger.error("[CodeReview] DB 寫入失敗: %s", e)
        session.rollback()
    finally:
        session.close()
# ── Telegram 通知 ─────────────────────────────────────────────────────────
def _notify_start(self):
    """Send the "review started" Telegram message; failures are only logged.

    The message uses HTML markup, so the webhook-supplied commit, branch and
    file names are escaped before being inserted.  Up to five changed files
    are listed, followed by a count of the rest.
    """
    try:
        import html
        from services.telegram_templates import _send_telegram_raw

        def esc(value) -> str:
            # Escape &, < and > only; quotes need no escaping in text nodes.
            return html.escape(str(value), quote=False)

        files_list = "\n".join(esc(f) for f in self.changed_files[:5])
        if len(self.changed_files) > 5:
            files_list += f"\n  （+{len(self.changed_files)-5} 個）"
        _send_telegram_raw(
            f"🔍 <b>Code Review 啟動</b>\n"
            f"══════════════════════════\n"
            f"📦 Commit <code>{esc(self.commit_sha[:8])}</code>  🌿 {esc(self.branch)}\n"
            f"📝 變更檔案：\n{files_list}\n"
            f"══════════════════════════\n"
            f"🤖 <i>Hermes → OpenClaw → Elephant Alpha → NemoTron</i>\n"
            f"📊 即時進度：https://mo.wooo.work/code-review/"
        )
    except Exception as e:
        logger.warning("[CodeReview] 啟動通知失敗: %s", e)
def _notify_complete(self, findings: List[Dict], openclaw_report: str, ea: Dict):
    """Send the "review completed" Telegram message; failures are only logged.

    The message lists the severity counters, up to three CRITICAL/HIGH
    findings, the first 400 characters of the OpenClaw report and the
    auto-fix status.

    Because the message is HTML, model-generated descriptions and file names
    are escaped.  The OpenClaw report is intentionally HTML, so it is not
    escaped; instead a tag cut in half by the 400-character limit is removed
    and an unclosed ``<b>`` is closed.

    Args:
        findings: Hermes findings.
        openclaw_report: OpenClaw assessment (HTML fragments).
        ea: ElephantAlpha decision; ``priority`` and ``auto_fix`` are read.
    """
    try:
        import html
        from services.telegram_templates import _send_telegram_raw

        def esc(value) -> str:
            return html.escape(str(value or ""), quote=False)

        sev = self.state["severity_summary"]
        priority = str(ea.get("priority") or "medium")  # tolerate null priority
        auto_fix = ea.get("auto_fix", False)
        icon = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}.get(priority, "🟡")
        top_issues = [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:3]
        issues_lines = "\n".join(
            f"  {'🔴' if f['severity']=='CRITICAL' else '🟠'} [{f['severity']}] "
            f"{esc(f.get('description'))} — <i>{esc(f.get('file'))}</i>"
            for f in top_issues
        ) or "  ✅ 無高風險問題"
        msg = (
            f"{icon} <b>Code Review 完成</b> · <code>{esc(self.commit_sha[:8])}</code>\n"
            f"══════════════════════════\n"
            f"🔴 CRITICAL <b>{sev['critical']}</b>  "
            f"🟠 HIGH <b>{sev['high']}</b>  "
            f"🟡 MEDIUM <b>{sev['medium']}</b>  "
            f"🟢 LOW <b>{sev['low']}</b>\n"
            f"══════════════════════════\n"
            f"<b>⚠️ 主要問題</b>\n{issues_lines}\n"
        )
        if openclaw_report:
            snippet = openclaw_report[:400]
            # Drop a trailing tag the slice cut in half, then rebalance <b>.
            snippet = re.sub(r"<[^>]*$", "", snippet)
            if snippet.count("<b>") > snippet.count("</b>"):
                snippet += "</b>"
            msg += f"\n{snippet}\n"
        total_issues = sev['critical'] + sev['high'] + sev['medium'] + sev['low']
        if auto_fix:
            fix_status = "🔧 已觸發自動修復（AiderHeal）"
        elif total_issues == 0:
            fix_status = "✅ 無需修復動作"
        else:
            fix_status = "🛑 自動修復主開關關閉（CODE_REVIEW_AUTO_FIX_ENABLED=false）"
        msg += (
            f"══════════════════════════\n"
            f"🤖 Elephant Alpha：<b>{esc(priority.upper())}</b>  {fix_status}\n"
            f"📊 完整報告：https://mo.wooo.work/code-review/"
        )
        _send_telegram_raw(msg)
    except Exception as e:
        logger.warning("[CodeReview] 完成通知失敗: %s", e)
def _notify_error(self, error: str):
    """Send the "pipeline failed" Telegram message; never raises.

    The message is HTML, and exception text commonly contains ``<`` or
    ``&``, so the error, commit and branch are escaped before being
    inserted.  A failure to send is logged instead of silently discarded.

    Args:
        error: Error description; its first 200 characters are sent.
    """
    try:
        import html
        from services.telegram_templates import _send_telegram_raw

        commit = html.escape(str(self.commit_sha[:8]), quote=False)
        branch = html.escape(str(self.branch), quote=False)
        detail = html.escape(str(error)[:200], quote=False)
        _send_telegram_raw(
            f"🚨 <b>Code Review Pipeline 失敗</b>\n"
            f"Commit：<code>{commit}</code> @ {branch}\n"
            f"錯誤：{detail}\n"
            f"📊 查看：https://mo.wooo.work/code-review/"
        )
    except Exception as e:
        logger.warning("[CodeReview] 錯誤通知失敗: %s", e)
# ═══════════════════════════════════════════════════════════════════════════════
# 公開 API
# ═══════════════════════════════════════════════════════════════════════════════
def trigger_post_deploy_review(
    commit_sha: str,
    changed_files: List[str],
    branch: str = "main",
    deploy_type: str = "sync",
) -> str:
    """Start a review pipeline on a background daemon thread.

    Args:
        commit_sha: Commit being reviewed.
        changed_files: Paths changed by the deployment.
        branch: Branch name.
        deploy_type: Deployment type.

    Returns:
        The ``pipeline_id`` of the pipeline that was started.
    """
    pipeline = CodeReviewPipeline(commit_sha, changed_files, branch, deploy_type)
    worker = threading.Thread(
        name=f"code-review-{commit_sha[:8]}",
        target=pipeline.run,
        daemon=True,
    )
    worker.start()
    file_count = len(pipeline.changed_files)
    logger.info("[CodeReview] 已派發 pipeline=%s files=%d", pipeline.pipeline_id, file_count)
    return pipeline.pipeline_id
def get_current_state() -> Dict[str, Any]:
    """Return a shallow copy of the latest published pipeline state.

    The copy is taken while holding ``_pipeline_lock``.
    """
    with _pipeline_lock:
        snapshot = dict(_current_pipeline)
    return snapshot
def get_history(limit: int = 20) -> List[Dict]:
    """Return past ``code_review_result`` records from ``ai_insights``.

    Rows are returned newest first.  Each row is decoded on its own:
    ``metadata_json`` and ``content`` are parsed separately and any value
    that is missing, unparsable or not a JSON object is treated as empty.
    One malformed row therefore yields a sparse entry instead of emptying
    the whole history, which is what happened before.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of summary dicts; ``[]`` when the query itself fails.
    """
    def _as_dict(value) -> Dict:
        # NOTE(review): also accepts values the driver already decoded to a
        # dict (e.g. a JSON/JSONB column) — confirm the actual column types.
        if isinstance(value, dict):
            return value
        if not value:
            return {}
        try:
            parsed = json.loads(value)
        except (TypeError, ValueError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    session = get_session()
    try:
        rows = session.execute(text("""
            SELECT id, content, confidence, metadata_json, created_at, status
            FROM ai_insights
            WHERE insight_type = 'code_review_result'
            ORDER BY created_at DESC
            LIMIT :lim
        """), {"lim": limit}).fetchall()
        results = []
        for r in rows:
            meta = _as_dict(r[3])
            content = _as_dict(r[1])
            sev = content.get("severity_summary")
            if not isinstance(sev, dict):
                sev = {}
            total_issues = sum(
                v for v in sev.values()
                if isinstance(v, (int, float)) and not isinstance(v, bool)
            )
            results.append({
                "id": r[0],
                "pipeline_id": meta.get("pipeline_id", ""),
                "commit_sha": str(meta.get("commit_sha") or "")[:8],
                "branch": meta.get("branch", ""),
                "changed_files": meta.get("changed_files", []),
                "severity_summary": sev,
                "total_issues": total_issues,
                "auto_fix": meta.get("auto_fix_triggered", False),
                "findings": content.get("findings", []),
                "openclaw_report": content.get("openclaw_report", ""),
                "ea_decision": content.get("ea_decision", {}),
                "created_at": r[4].isoformat() if r[4] else "",
                "status": r[5] or "active",
            })
        return results
    except Exception as e:
        logger.warning("[CodeReview] 歷史讀取失敗: %s", e)
        return []
    finally:
        session.close()
def verify_internal_token(request_token: str) -> bool:
    """Check the token sent by the CD webhook against ``INTERNAL_TOKEN``.

    When ``INTERNAL_TOKEN`` is not configured the request is rejected,
    unless the ``ALLOW_INSECURE_WEBHOOK`` dev override is on.

    The comparison is constant-time (``hmac.compare_digest``), so response
    timing does not reveal how much of a guessed token matched.

    Args:
        request_token: Token taken from the incoming request.

    Returns:
        ``True`` when the request may proceed, ``False`` otherwise.  A
        non-string token is always rejected.
    """
    import hmac

    if not INTERNAL_TOKEN:
        if ALLOW_INSECURE_WEBHOOK:
            logger.warning("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定，僅因 dev override 放行")
            return True
        logger.error("[CodeReview] INTERNAL_WEBHOOK_TOKEN 未設定，拒絕 webhook")
        return False
    if not isinstance(request_token, str):
        return False
    # compare_digest only accepts ASCII str, so compare the encoded bytes;
    # surrogatepass keeps the encoding lossless for odd header values.
    return hmac.compare_digest(
        request_token.encode("utf-8", "surrogatepass"),
        INTERNAL_TOKEN.encode("utf-8", "surrogatepass"),
    )