169 lines
5.3 KiB
Python
Executable File
169 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""Deterministic AWOOOI CI code review summary.
|
||
|
||
The workflow-level reviewer intentionally avoids printing matching source lines
|
||
so suspected secrets never leak into CI logs or Telegram. It produces a compact
|
||
JSON report for the notification layer, while the heavier LLM reviewer can be
|
||
plugged in behind the same report shape later.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# Heuristic for credential-looking text on an added line (case-insensitive).
# One alternative per literal; the pieces concatenate into a single pattern:
#   1. tokens starting with "AIza" followed by 20+ key characters
#      (presumably Google-style API keys -- confirm with the rule owner),
#   2. tokens starting with "sk-" followed by 20+ alphanumerics
#      (presumably provider secret keys -- confirm with the rule owner),
#   3. an assignment such as ``api_key = "..."`` / ``token: ...`` whose value
#      is at least 16 characters long.
SECRET_PATTERN = re.compile(
    r"(AIza[0-9A-Za-z_-]{20,}|"
    r"sk-[A-Za-z0-9]{20,}|"
    r"(api[_-]?key|secret|token|password)\s*[:=]\s*['\"]?[A-Za-z0-9_./+=-]{16,})",
    flags=re.IGNORECASE,
)
|
||
# Destructive command / SQL statements worth a human look (case-insensitive):
# ``kubectl delete``, ``DROP TABLE``, ``TRUNCATE TABLE``, ``git reset --hard``
# and ``rm -rf`` aimed at a path that starts with "/".
HIGH_RISK_PATTERN = re.compile(
    r"(kubectl\s+delete|DROP\s+TABLE|TRUNCATE\s+TABLE|git\s+reset\s+--hard|rm\s+-rf\s+/)",
    flags=re.IGNORECASE,
)
|
||
|
||
|
||
def _run(args: list[str], cwd: Path, check: bool = False) -> subprocess.CompletedProcess[str]:
|
||
return subprocess.run(
|
||
args,
|
||
cwd=cwd,
|
||
check=check,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
|
||
|
||
def _git_lines(args: list[str], cwd: Path) -> list[str]:
    """Run ``git <args>`` in *cwd* and return its non-blank output lines.

    Each line is stripped of surrounding whitespace. A failing git command
    yields an empty list instead of raising.
    """
    completed = _run(["git"] + list(args), cwd)
    if completed.returncode:
        return []
    stripped = (raw.strip() for raw in completed.stdout.splitlines())
    return [entry for entry in stripped if entry]
|
||
|
||
|
||
def _resolve_range(base: str | None, head: str, cwd: Path) -> str:
    """Pick the git revision range to review.

    Preference order:
      1. ``base..head`` when *base* is non-empty, is not made up solely of
         ``0`` characters, and resolves to a commit;
      2. ``head^..head`` when *head* has a parent;
      3. *head* alone otherwise.
    """

    def _resolves(revision: str) -> bool:
        # ``rev-parse --verify`` exits 0 only when the revision can be resolved.
        return _run(["git", "rev-parse", "--verify", revision], cwd).returncode == 0

    # Stripping "0" leaves an empty string for an all-zero placeholder SHA.
    usable_base = bool(base) and bool(base.strip("0"))
    if usable_base and _resolves(f"{base}^{{commit}}"):
        return f"{base}..{head}"

    first_parent = f"{head}^"
    if _resolves(first_parent):
        return f"{first_parent}..{head}"
    return head
|
||
|
||
|
||
def _changed_files(git_range: str, cwd: Path) -> list[str]:
    """List the file paths touched by *git_range*.

    ``git diff --name-only`` is tried first. When it reports nothing, the
    revision after the last ``..`` in the range (or the whole string when
    there is no ``..``) is inspected with ``git show --name-only`` instead.
    """
    from_diff = _git_lines(["diff", "--name-only", git_range], cwd)
    if not from_diff:
        *_, tip = git_range.split("..")
        return _git_lines(["show", "--pretty=", "--name-only", tip], cwd)
    return from_diff
|
||
|
||
|
||
def _added_lines_for_file(git_range: str, file_path: str, cwd: Path) -> list[str]:
    """Return the text of every line added to *file_path* within *git_range*.

    Args:
        git_range: Either an ``A..B`` range or a single revision.
        file_path: Path of the file, relative to the repository root.
        cwd: Directory in which git is executed.

    Returns:
        The added lines with their leading ``+`` marker removed, in diff
        order. An empty list is returned when the git command fails.
    """
    patch_flags = ["--unified=0", "--no-color", git_range, "--", file_path]
    if ".." in git_range:
        command = ["git", "diff", *patch_flags]
    else:
        # A bare revision means no base/parent could be resolved. ``git diff
        # <rev>`` would compare against the working tree (empty in a clean
        # checkout), so show the commit's own patch instead -- the same
        # fallback `_changed_files` uses to list that commit's files.
        command = ["git", "show", "--pretty=", *patch_flags]

    result = _run(command, cwd)
    if result.returncode != 0:
        return []

    added: list[str] = []
    inside_hunk = False
    # Split on "\n" only: str.splitlines() also breaks on form feeds and other
    # separators, which would hide the tail of an added line from the scan.
    for line in result.stdout.split("\n"):
        if line.startswith("@@"):
            inside_hunk = True
        elif line.startswith("diff --git"):
            # A new file header begins; its "+++ b/..." line is not content.
            inside_hunk = False
        elif inside_hunk and line.startswith("+"):
            # Inside a hunk every "+" line is real content, including lines
            # whose own text starts with "++" (rendered as "+++...").
            added.append(line[1:])
    return added
|
||
|
||
|
||
def _diff_check_count(git_range: str, cwd: Path) -> int:
    """Count the non-blank lines printed by ``git diff --check``.

    A zero exit status yields 0. Otherwise the result is the number of
    non-blank stdout lines (which is also 0 if git printed nothing there).
    """
    outcome = _run(["git", "diff", "--check", git_range], cwd)
    if outcome.returncode != 0:
        return sum(1 for text in outcome.stdout.splitlines() if text.strip())
    return 0
|
||
|
||
|
||
def build_report(base: str | None, head: str, cwd: Path) -> dict[str, Any]:
    """Scan the diff between *base* and *head* and build the review report.

    Added lines of every changed file are tested against ``SECRET_PATTERN``
    and ``HIGH_RISK_PATTERN``; ``git diff --check`` output supplies the medium
    count. Only counts and the changed-file list go into the report -- the
    matching lines themselves are never included.

    Returns:
        A dict with the keys ``range``, ``head``, ``files``, ``counts``,
        ``risk``, ``summary``, ``action``, ``top_issue`` and ``agents``.
    """
    git_range = _resolve_range(base, head, cwd)
    files = _changed_files(git_range, cwd)

    secret_files: list[str] = []
    high_risk_files: list[str] = []
    detectors = (
        (SECRET_PATTERN, secret_files),
        (HIGH_RISK_PATTERN, high_risk_files),
    )
    for path in files:
        additions = _added_lines_for_file(git_range, path, cwd)
        for pattern, bucket in detectors:
            if any(pattern.search(text) for text in additions):
                bucket.append(path)

    counts = {
        "critical": len(secret_files),
        "high": len(high_risk_files),
        "medium": _diff_check_count(git_range, cwd),
        "low": 0,
    }

    # The most severe non-zero bucket decides the verdict:
    # (risk, summary, action, top_issue).
    if counts["critical"]:
        verdict = (
            "CRITICAL",
            "疑似密鑰或高敏感憑證進入 diff,需立即人工確認。",
            "阻擋部署並清除憑證",
            "敏感輸入異常:變更檔案中出現疑似 secret",
        )
    elif counts["high"]:
        verdict = (
            "HIGH",
            "偵測到破壞性操作語句,需確認是否符合變更窗口與回滾計畫。",
            "人工複核高風險操作",
            "破壞性操作:kubectl delete / DROP / rm -rf 等模式",
        )
    elif counts["medium"]:
        verdict = (
            "MEDIUM",
            "格式或 whitespace 檢查有異常,建議在合併前修正。",
            "修正 diff check 註記",
            "格式檢查異常:git diff --check 回報問題",
        )
    else:
        verdict = (
            "LOW",
            "未發現高風險問題,靜態掃描通過。",
            "無需修復動作",
            "無",
        )
    risk, summary, action, top_issue = verdict

    return {
        "range": git_range,
        "head": head,
        "files": files,
        "counts": counts,
        "risk": risk,
        "summary": summary,
        "action": action,
        "top_issue": top_issue,
        "agents": ["Hermes", "OpenClaw", "ElephantAlpha", "NemoTron"],
    }
|
||
|
||
|
||
def main() -> int:
    """Command-line entry point: build the report, save it, print a digest.

    Options: ``--base`` (optional), ``--head`` (required), ``--repo``
    (defaults to ``.``) and ``--output`` (required path of the JSON report).
    Always returns 0.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--base", default="")
    cli.add_argument("--head", required=True)
    cli.add_argument("--repo", default=".")
    cli.add_argument("--output", required=True)
    options = cli.parse_args()

    repo_root = Path(options.repo).resolve()
    # An empty --base is passed on as None.
    report = build_report(options.base or None, options.head, repo_root)

    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    Path(options.output).write_text(serialized, encoding="utf-8")

    # The console digest carries the risk, the counts and the number of files.
    digest = {
        "risk": report["risk"],
        "counts": report["counts"],
        "files": len(report["files"]),
    }
    print("Code review report:", json.dumps(digest, ensure_ascii=False))
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|