feat(flywheel): surface ai automation and code review
Some checks failed
Code Review / ai-code-review (push) Successful in 31s
CD Pipeline / build-and-deploy (push) Failing after 5m23s

This commit is contained in:
Your Name
2026-04-30 00:09:25 +08:00
parent d197e2785d
commit 639bb64788
8 changed files with 734 additions and 26 deletions

168
scripts/ci_code_review.py Executable file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Deterministic AWOOOI CI code review summary.
The workflow-level reviewer intentionally avoids printing matching source lines
so suspected secrets never leak into CI logs or Telegram. It produces a compact
JSON report for the notification layer, while the heavier LLM reviewer can be
plugged in behind the same report shape later.
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
from pathlib import Path
from typing import Any
SECRET_PATTERN = re.compile(
r"(AIza[0-9A-Za-z_-]{20,}|sk-[A-Za-z0-9]{20,}|"
r"(api[_-]?key|secret|token|password)\s*[:=]\s*['\"]?[A-Za-z0-9_./+=-]{16,})",
re.IGNORECASE,
)
HIGH_RISK_PATTERN = re.compile(
r"(kubectl\s+delete|DROP\s+TABLE|TRUNCATE\s+TABLE|git\s+reset\s+--hard|rm\s+-rf\s+/)",
re.IGNORECASE,
)
def _run(args: list[str], cwd: Path, check: bool = False) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=cwd,
check=check,
capture_output=True,
text=True,
)
def _git_lines(args: list[str], cwd: Path) -> list[str]:
result = _run(["git", *args], cwd)
if result.returncode != 0:
return []
return [line.strip() for line in result.stdout.splitlines() if line.strip()]
def _resolve_range(base: str | None, head: str, cwd: Path) -> str:
if base and base.strip("0"):
base_ok = _run(["git", "rev-parse", "--verify", f"{base}^{{commit}}"], cwd)
if base_ok.returncode == 0:
return f"{base}..{head}"
parent_ok = _run(["git", "rev-parse", "--verify", f"{head}^"], cwd)
if parent_ok.returncode == 0:
return f"{head}^..{head}"
return head
def _changed_files(git_range: str, cwd: Path) -> list[str]:
files = _git_lines(["diff", "--name-only", git_range], cwd)
if files:
return files
return _git_lines(["show", "--pretty=", "--name-only", git_range.split("..")[-1]], cwd)
def _added_lines_for_file(git_range: str, file_path: str, cwd: Path) -> list[str]:
result = _run(["git", "diff", "--unified=0", "--no-color", git_range, "--", file_path], cwd)
if result.returncode != 0:
return []
return [
line[1:]
for line in result.stdout.splitlines()
if line.startswith("+") and not line.startswith("+++")
]
def _diff_check_count(git_range: str, cwd: Path) -> int:
result = _run(["git", "diff", "--check", git_range], cwd)
if result.returncode == 0:
return 0
return len([line for line in result.stdout.splitlines() if line.strip()])
def build_report(base: str | None, head: str, cwd: Path) -> dict[str, Any]:
git_range = _resolve_range(base, head, cwd)
files = _changed_files(git_range, cwd)
secret_files: list[str] = []
high_risk_files: list[str] = []
for file_path in files:
added_lines = _added_lines_for_file(git_range, file_path, cwd)
if any(SECRET_PATTERN.search(line) for line in added_lines):
secret_files.append(file_path)
if any(HIGH_RISK_PATTERN.search(line) for line in added_lines):
high_risk_files.append(file_path)
medium = _diff_check_count(git_range, cwd)
counts = {
"critical": len(secret_files),
"high": len(high_risk_files),
"medium": medium,
"low": 0,
}
if counts["critical"]:
risk = "CRITICAL"
summary = "疑似密鑰或高敏感憑證進入 diff需立即人工確認。"
action = "阻擋部署並清除憑證"
top_issue = "敏感輸入異常:變更檔案中出現疑似 secret"
elif counts["high"]:
risk = "HIGH"
summary = "偵測到破壞性操作語句,需確認是否符合變更窗口與回滾計畫。"
action = "人工複核高風險操作"
top_issue = "破壞性操作kubectl delete / DROP / rm -rf 等模式"
elif counts["medium"]:
risk = "MEDIUM"
summary = "格式或 whitespace 檢查有異常,建議在合併前修正。"
action = "修正 diff check 註記"
top_issue = "格式檢查異常git diff --check 回報問題"
else:
risk = "LOW"
summary = "未發現高風險問題,靜態掃描通過。"
action = "無需修復動作"
top_issue = ""
return {
"range": git_range,
"head": head,
"files": files,
"counts": counts,
"risk": risk,
"summary": summary,
"action": action,
"top_issue": top_issue,
"agents": ["Hermes", "OpenClaw", "ElephantAlpha", "NemoTron"],
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--base", default="")
parser.add_argument("--head", required=True)
parser.add_argument("--repo", default=".")
parser.add_argument("--output", required=True)
args = parser.parse_args()
report = build_report(args.base or None, args.head, Path(args.repo).resolve())
Path(args.output).write_text(
json.dumps(report, ensure_ascii=False, indent=2),
encoding="utf-8",
)
print(
"Code review report:",
json.dumps(
{
"risk": report["risk"],
"counts": report["counts"],
"files": len(report["files"]),
},
ensure_ascii=False,
),
)
return 0
if __name__ == "__main__":
raise SystemExit(main())