#!/usr/bin/env python3
"""Deterministic AWOOOI CI code review summary.

The workflow-level reviewer intentionally avoids printing matching source lines
so suspected secrets never leak into CI logs or Telegram. It produces a compact
JSON report for the notification layer, while the heavier LLM reviewer can be
plugged in behind the same report shape later.
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
from pathlib import Path
from typing import Any

# Matches well-known key prefixes (``AIza...``, ``sk-...``) or a
# ``name = value`` assignment whose name looks like a credential and whose
# value is at least 16 characters long.
SECRET_PATTERN = re.compile(
    r"(AIza[0-9A-Za-z_-]{20,}|sk-[A-Za-z0-9]{20,}|"
    r"(api[_-]?key|secret|token|password)\s*[:=]\s*['\"]?[A-Za-z0-9_./+=-]{16,})",
    re.IGNORECASE,
)

# Matches destructive shell / SQL / git commands.
HIGH_RISK_PATTERN = re.compile(
    r"(kubectl\s+delete|DROP\s+TABLE|TRUNCATE\s+TABLE|git\s+reset\s+--hard|rm\s+-rf\s+/)",
    re.IGNORECASE,
)


def _run(args: list[str], cwd: Path, check: bool = False) -> subprocess.CompletedProcess[str]:
    """Run *args* in *cwd* and capture stdout/stderr as text.

    Undecodable bytes are replaced instead of raising, so a single file with
    an unexpected encoding in the diff cannot abort the whole review.

    Raises:
        subprocess.CalledProcessError: only when *check* is true and the
            command exits with a non-zero status.
    """
    return subprocess.run(
        args,
        cwd=cwd,
        check=check,
        capture_output=True,
        text=True,
        errors="replace",
    )


def _git_lines(args: list[str], cwd: Path) -> list[str]:
    """Return the stripped, non-blank stdout lines of ``git <args>``.

    Returns an empty list when git exits with a non-zero status.
    """
    # core.quotepath=off keeps non-ASCII path names verbatim. With git's
    # default they are printed C-quoted ("\344\270\255.txt"), and that quoted
    # form does not match anything when it is later passed back as a pathspec.
    result = _run(["git", "-c", "core.quotepath=off", *args], cwd)
    if result.returncode != 0:
        return []
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]


def _resolve_range(base: str | None, head: str, cwd: Path) -> str:
    """Choose the git revision range to review.

    Preference order:
      1. ``base..head`` when *base* is non-empty, not made up solely of ``0``
         characters, and resolves to a commit;
      2. ``head^..head`` when *head* has a parent;
      3. *head* alone.
    """
    if base and base.strip("0"):
        base_ok = _run(["git", "rev-parse", "--verify", f"{base}^{{commit}}"], cwd)
        if base_ok.returncode == 0:
            return f"{base}..{head}"
    parent_ok = _run(["git", "rev-parse", "--verify", f"{head}^"], cwd)
    if parent_ok.returncode == 0:
        return f"{head}^..{head}"
    return head


def _changed_files(git_range: str, cwd: Path) -> list[str]:
    """List the paths changed in *git_range*.

    Falls back to ``git show --name-only`` on the right-hand revision when
    ``git diff --name-only`` reports nothing for the range.
    """
    files = _git_lines(["diff", "--name-only", git_range], cwd)
    if files:
        return files
    return _git_lines(["show", "--pretty=", "--name-only", git_range.split("..")[-1]], cwd)


def extract_added_lines(diff_text: str) -> list[str]:
    """Return the added lines of a unified diff, without the leading ``+``.

    File headers (``diff --git``, ``index``, ``---``, ``+++``) only occur
    before the first ``@@`` hunk header of a file, so they are skipped by
    position rather than by prefix. Filtering on the ``+++`` prefix instead
    would also discard added lines whose own content starts with ``++``.
    """
    added: list[str] = []
    in_hunk = False
    # Split on "\n" only: str.splitlines() also breaks on form feeds and
    # Unicode line separators, which would detach the tail of an added line
    # from its "+" marker and hide it from the scan.
    for line in diff_text.split("\n"):
        if line.startswith("@@"):
            in_hunk = True
        elif line.startswith("diff --git "):
            # Start of the next file's header section.
            in_hunk = False
        elif in_hunk and line.startswith("+"):
            added.append(line[1:])
    return added


def _added_lines_for_file(git_range: str, file_path: str, cwd: Path) -> list[str]:
    """Return the lines added to *file_path* within *git_range*.

    Returns an empty list when ``git diff`` exits with a non-zero status.
    """
    result = _run(
        ["git", "diff", "--unified=0", "--no-color", git_range, "--", file_path],
        cwd,
    )
    if result.returncode != 0:
        return []
    return extract_added_lines(result.stdout)


def count_check_issues(check_output: str) -> int:
    """Count the problems reported in ``git diff --check`` output.

    Each problem is printed as a ``path:line: message`` line, and whitespace
    problems are followed by an echo of the offending line prefixed with
    ``+``. Only the former are counted so one problem is not counted twice.
    Non-blank output that contains no recognisable issue line still counts
    as one problem, so unexpected output is never reported as clean.
    """
    reported = [line for line in check_output.split("\n") if line.strip()]
    issues = sum(1 for line in reported if not line.startswith("+"))
    if reported and not issues:
        return 1
    return issues


def _diff_check_count(git_range: str, cwd: Path) -> int:
    """Return how many problems ``git diff --check`` reports for *git_range*.

    A zero exit status means no problems. Any other status is treated as
    "problems found", and the count is derived from whatever was written to
    stdout (which is empty when git itself failed).
    """
    result = _run(["git", "diff", "--check", git_range], cwd)
    if result.returncode == 0:
        return 0
    return count_check_issues(result.stdout)


def build_report(base: str | None, head: str, cwd: Path) -> dict[str, Any]:
    """Scan the diff between *base* and *head* and build the review report.

    Only file names and counts are recorded; matching source lines are never
    copied into the report.

    Args:
        base: Base revision, or ``None`` / empty to review only *head*.
        head: Head revision under review.
        cwd: Working directory of the git repository.

    Returns:
        A JSON-serialisable dict with the keys ``range``, ``head``, ``files``,
        ``counts``, ``risk``, ``summary``, ``action``, ``top_issue`` and
        ``agents``.
    """
    git_range = _resolve_range(base, head, cwd)
    files = _changed_files(git_range, cwd)
    secret_files: list[str] = []
    high_risk_files: list[str] = []

    for file_path in files:
        added_lines = _added_lines_for_file(git_range, file_path, cwd)
        if any(SECRET_PATTERN.search(line) for line in added_lines):
            secret_files.append(file_path)
        if any(HIGH_RISK_PATTERN.search(line) for line in added_lines):
            high_risk_files.append(file_path)

    medium = _diff_check_count(git_range, cwd)
    counts = {
        "critical": len(secret_files),
        "high": len(high_risk_files),
        "medium": medium,
        "low": 0,
    }

    # The most severe non-empty bucket decides the overall risk level.
    if counts["critical"]:
        risk = "CRITICAL"
        summary = "疑似密鑰或高敏感憑證進入 diff,需立即人工確認。"
        action = "阻擋部署並清除憑證"
        top_issue = "敏感輸入異常:變更檔案中出現疑似 secret"
    elif counts["high"]:
        risk = "HIGH"
        summary = "偵測到破壞性操作語句,需確認是否符合變更窗口與回滾計畫。"
        action = "人工複核高風險操作"
        top_issue = "破壞性操作:kubectl delete / DROP / rm -rf 等模式"
    elif counts["medium"]:
        risk = "MEDIUM"
        summary = "格式或 whitespace 檢查有異常,建議在合併前修正。"
        action = "修正 diff check 註記"
        top_issue = "格式檢查異常:git diff --check 回報問題"
    else:
        risk = "LOW"
        summary = "未發現高風險問題,靜態掃描通過。"
        action = "無需修復動作"
        top_issue = "無"

    return {
        "range": git_range,
        "head": head,
        "files": files,
        "counts": counts,
        "risk": risk,
        "summary": summary,
        "action": action,
        "top_issue": top_issue,
        "agents": ["Hermes", "OpenClaw", "ElephantAlpha", "NemoTron"],
    }


def main() -> int:
    """Parse CLI arguments, write the JSON report, and print a short summary.

    The full report is written to ``--output``; only the risk level, the
    counts and the number of changed files are printed to stdout.

    Returns:
        Always ``0``; the risk level is conveyed through the report, not the
        exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base", default="")
    parser.add_argument("--head", required=True)
    parser.add_argument("--repo", default=".")
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    report = build_report(args.base or None, args.head, Path(args.repo).resolve())
    Path(args.output).write_text(
        json.dumps(report, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(
        "Code review report:",
        json.dumps(
            {
                "risk": report["risk"],
                "counts": report["counts"],
                "files": len(report["files"]),
            },
            ensure_ascii=False,
        ),
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())