Files
awoooi/scripts/ci_code_review.py
Your Name 639bb64788
Some checks failed
Code Review / ai-code-review (push) Successful in 31s
CD Pipeline / build-and-deploy (push) Failing after 5m23s
feat(flywheel): surface ai automation and code review
2026-04-30 00:09:25 +08:00

169 lines
5.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Deterministic AWOOOI CI code review summary.
The workflow-level reviewer intentionally avoids printing matching source lines
so suspected secrets never leak into CI logs or Telegram. It produces a compact
JSON report for the notification layer, while the heavier LLM reviewer can be
plugged in behind the same report shape later.
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
from pathlib import Path
from typing import Any
# Matches text that looks like a credential (case-insensitive):
#   * "AIza" followed by at least 20 characters from [0-9A-Za-z_-],
#   * "sk-" followed by at least 20 alphanumeric characters, or
#   * an api_key / api-key / apikey / secret / token / password keyword,
#     then ":" or "=", an optional quote, and a value of 16+ characters.
SECRET_PATTERN = re.compile(
    r"(AIza[0-9A-Za-z_-]{20,}|sk-[A-Za-z0-9]{20,}|"
    r"(api[_-]?key|secret|token|password)\s*[:=]\s*['\"]?[A-Za-z0-9_./+=-]{16,})",
    re.IGNORECASE,
)
# Matches destructive operations (case-insensitive): "kubectl delete",
# "DROP TABLE", "TRUNCATE TABLE", "git reset --hard", and "rm -rf" applied to
# a path that starts with "/".
HIGH_RISK_PATTERN = re.compile(
    r"(kubectl\s+delete|DROP\s+TABLE|TRUNCATE\s+TABLE|git\s+reset\s+--hard|rm\s+-rf\s+/)",
    re.IGNORECASE,
)
def _run(args: list[str], cwd: Path, check: bool = False) -> subprocess.CompletedProcess[str]:
    """Run a command without a shell and capture its output as text.

    Args:
        args: The command and its arguments as a list.
        cwd: Directory the command is executed in.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The completed process; ``stdout`` and ``stderr`` are ``str``.
    """
    return subprocess.run(
        args,
        cwd=cwd,
        check=check,
        capture_output=True,
        text=True,
        # Decode explicitly instead of relying on the locale codec with strict
        # error handling: a diff may contain bytes that are not valid UTF-8
        # (e.g. a Latin-1 source file), and a decode error must not crash the
        # review. Undecodable bytes become U+FFFD.
        encoding="utf-8",
        errors="replace",
    )
def _git_lines(args: list[str], cwd: Path) -> list[str]:
    """Run ``git <args>`` in *cwd* and return its stdout as cleaned lines.

    Each line is stripped of surrounding whitespace and blank lines are
    dropped. A non-zero git exit status yields an empty list.
    """
    completed = _run(["git"] + list(args), cwd)
    if completed.returncode:
        return []
    stripped = (raw.strip() for raw in completed.stdout.splitlines())
    return [entry for entry in stripped if entry]
def _resolve_range(base: str | None, head: str, cwd: Path) -> str:
    """Choose the git revision range to review.

    Preference order:
      1. ``base..head`` when *base* is non-empty, is not made up solely of
         ``0`` characters, and resolves to a commit.
      2. ``head^..head`` when *head* has a resolvable parent.
      3. *head* alone otherwise.
    """

    def resolves(revision: str) -> bool:
        # `git rev-parse --verify` exits 0 only if the revision can be resolved.
        probe = _run(["git", "rev-parse", "--verify", revision], cwd)
        return probe.returncode == 0

    if base and base.strip("0") and resolves(f"{base}^{{commit}}"):
        return f"{base}..{head}"

    parent = f"{head}^"
    if resolves(parent):
        return f"{parent}..{head}"
    return head
def _changed_files(git_range: str, cwd: Path) -> list[str]:
    """Return the paths changed in *git_range*.

    ``git diff --name-only`` is tried first. If it reports nothing, the files
    touched by the revision after the last ``..`` in *git_range* (or the whole
    string when there is no ``..``) are listed with ``git show`` instead.

    Args:
        git_range: A ``base..head`` range or a single revision.
        cwd: Repository working directory.

    Returns:
        Changed paths, or an empty list if git reports none.
    """
    # With git's default core.quotePath, paths containing non-ASCII bytes are
    # printed as double-quoted octal escapes. Those strings do not match when
    # later used as a pathspec, so such files would silently escape scanning.
    # Disabling quotepath makes git print them verbatim.
    unquoted = ["-c", "core.quotepath=false"]
    files = _git_lines([*unquoted, "diff", "--name-only", git_range], cwd)
    if files:
        return files
    tip = git_range.split("..")[-1]
    return _git_lines([*unquoted, "show", "--pretty=", "--name-only", tip], cwd)
def _added_lines_for_file(git_range: str, file_path: str, cwd: Path) -> list[str]:
    """Return the lines added to *file_path* within *git_range*.

    The leading ``+`` diff marker is removed from every returned line.

    Args:
        git_range: A ``base..head`` range or a single revision.
        file_path: Path of the file, relative to the repository.
        cwd: Repository working directory.

    Returns:
        Added lines, or an empty list when git fails or nothing was added.
    """

    def added_lines(patch: str) -> list[str]:
        # Only lines inside a hunk are content. Tracking the hunk header,
        # rather than filtering on a "+++" prefix, keeps added lines whose own
        # text starts with "++" (they appear as "+++..." in the patch).
        collected: list[str] = []
        in_hunk = False
        for line in patch.splitlines():
            if line.startswith("@@"):
                in_hunk = True
            elif line.startswith("diff --git "):
                in_hunk = False
            elif in_hunk and line.startswith("+"):
                collected.append(line[1:])
        return collected

    diff_opts = ["--unified=0", "--no-color"]
    result = _run(["git", "diff", *diff_opts, git_range, "--", file_path], cwd)
    lines = added_lines(result.stdout) if result.returncode == 0 else []
    if lines or ".." in git_range:
        return lines

    # Single-revision range: `git diff <rev>` compares the working tree with
    # <rev>, which is empty on a clean checkout. Fall back to the commit's own
    # patch so files listed via `git show` are actually scanned.
    shown = _run(
        ["git", "show", "--pretty=", *diff_opts, git_range, "--", file_path],
        cwd,
    )
    if shown.returncode != 0:
        return []
    return added_lines(shown.stdout)
def _diff_check_count(git_range: str, cwd: Path) -> int:
    """Count the problems reported by ``git diff --check`` for *git_range*.

    Args:
        git_range: A ``base..head`` range or a single revision.
        cwd: Repository working directory.

    Returns:
        ``0`` when git exits successfully, otherwise the number of report
        lines on stdout.
    """
    result = _run(["git", "diff", "--check", git_range], cwd)
    if result.returncode == 0:
        return 0
    # Each finding is printed as "<path>:<line>: <message>", normally followed
    # by the offending source line echoed with a leading "+". Counting the
    # echoed lines as well would inflate the number of issues, so skip them.
    return sum(
        1
        for line in result.stdout.splitlines()
        if line.strip() and not line.startswith("+")
    )
def build_report(base: str | None, head: str, cwd: Path) -> dict[str, Any]:
    """Build the review report for the commits between *base* and *head*.

    Added lines of every changed file are matched against ``SECRET_PATTERN``
    and ``HIGH_RISK_PATTERN``; only the names of matching files are recorded,
    never the matching text. The overall risk is the most severe category
    with a non-zero count: critical (secrets), high (destructive operations),
    medium (``git diff --check`` findings), otherwise low.

    Returns:
        A dict with the keys ``range``, ``head``, ``files``, ``counts``,
        ``risk``, ``summary``, ``action``, ``top_issue`` and ``agents``.
    """
    git_range = _resolve_range(base, head, cwd)
    files = _changed_files(git_range, cwd)

    secret_files: list[str] = []
    high_risk_files: list[str] = []
    detectors = (
        (SECRET_PATTERN, secret_files),
        (HIGH_RISK_PATTERN, high_risk_files),
    )
    for path in files:
        additions = _added_lines_for_file(git_range, path, cwd)
        for pattern, bucket in detectors:
            if any(pattern.search(text) for text in additions):
                bucket.append(path)

    counts = {
        "critical": len(secret_files),
        "high": len(high_risk_files),
        "medium": _diff_check_count(git_range, cwd),
        "low": 0,
    }

    # Ordered from most to least severe; the first category with a non-zero
    # count decides the verdict.
    verdicts = (
        (
            "critical",
            "CRITICAL",
            "疑似密鑰或高敏感憑證進入 diff需立即人工確認。",
            "阻擋部署並清除憑證",
            "敏感輸入異常:變更檔案中出現疑似 secret",
        ),
        (
            "high",
            "HIGH",
            "偵測到破壞性操作語句,需確認是否符合變更窗口與回滾計畫。",
            "人工複核高風險操作",
            "破壞性操作kubectl delete / DROP / rm -rf 等模式",
        ),
        (
            "medium",
            "MEDIUM",
            "格式或 whitespace 檢查有異常,建議在合併前修正。",
            "修正 diff check 註記",
            "格式檢查異常git diff --check 回報問題",
        ),
    )
    for severity, risk, summary, action, top_issue in verdicts:
        if counts[severity]:
            break
    else:
        risk = "LOW"
        summary = "未發現高風險問題,靜態掃描通過。"
        action = "無需修復動作"
        top_issue = ""

    report: dict[str, Any] = {
        "range": git_range,
        "head": head,
        "files": files,
        "counts": counts,
        "risk": risk,
        "summary": summary,
        "action": action,
        "top_issue": top_issue,
        "agents": ["Hermes", "OpenClaw", "ElephantAlpha", "NemoTron"],
    }
    return report
def main() -> int:
    """Command-line entry point.

    Parses ``--base``, ``--head``, ``--repo`` and ``--output``, writes the
    full report to the output path as indented UTF-8 JSON, prints a one-line
    digest (risk, counts, number of files) to stdout, and returns ``0``.
    """
    parser = argparse.ArgumentParser()
    option_specs = (
        ("--base", {"default": ""}),
        ("--head", {"required": True}),
        ("--repo", {"default": "."}),
        ("--output", {"required": True}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    # An empty --base is passed on as None.
    repo_root = Path(args.repo).resolve()
    report = build_report(args.base or None, args.head, repo_root)

    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    Path(args.output).write_text(serialized, encoding="utf-8")

    digest = {
        "risk": report["risk"],
        "counts": report["counts"],
        "files": len(report["files"]),
    }
    print("Code review report:", json.dumps(digest, ensure_ascii=False))
    return 0
# When executed as a script, exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())