Files
awoooi/scripts/security/git-remote-refs-probe.py
Your Name 9e15fd08b3
All checks were successful
CD Pipeline / tests (push) Successful in 1m39s
Code Review / ai-code-review (push) Successful in 15s
CD Pipeline / build-and-deploy (push) Successful in 5m19s
CD Pipeline / post-deploy-checks (push) Successful in 2m11s
feat(web): land iwooos security posture surfaces
2026-05-25 20:35:52 +08:00

204 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Git remote refs 只讀探測工具。
此工具用 `git ls-remote --heads --tags` 讀取指定本機 repo 的 remote refs，
並比對本機 HEAD / branch。它不 fetch、不 clone、不 push，也不修改 remote。
"""
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
def redact_url(value: str) -> str:
    """Return *value* with any embedded ``user[:password]@`` credentials removed.

    Two remote spellings are handled:

    * scp-like (``user@host:path``, no ``://``): everything up to and
      including the first ``@`` is dropped when the remainder contains a
      ``:``; any other scheme-less value is returned unchanged.
    * URL form (``scheme://[userinfo@]host/...``): the userinfo part of the
      network location is dropped and the URL is reassembled from its
      remaining components.

    Args:
        value: Remote URL as reported by git; may be empty.

    Returns:
        The same location without the userinfo component.
    """
    if "://" not in value:
        _, at_sign, remainder = value.partition("@")
        if at_sign and ":" in remainder:
            return remainder
        return value

    parts = urlsplit(value)
    # The host starts after the LAST "@" of the netloc. Splitting on the
    # first one would leak the tail of a password that itself contains "@"
    # (e.g. "user:p@ss@host" would yield "ss@host").
    host = parts.netloc.rpartition("@")[2]
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
def run_git(repo: Path, args: list[str], timeout: int) -> subprocess.CompletedProcess[str]:
    """Run ``git <args>`` with *repo* as working directory and capture its output.

    The call never raises for a timeout or for a command that cannot be
    started; both are reported as a failed ``CompletedProcess`` so that one
    broken repo does not abort a probe covering several repos.

    Args:
        repo: Directory used as the working directory of the git process.
        args: Arguments placed after the ``git`` executable name.
        timeout: Seconds to wait before giving up on the command.

    Returns:
        The completed process with text ``stdout``/``stderr``. On timeout the
        return code is 124 and ``stderr`` is ``"git command timeout"``. When
        the process cannot be started (working directory missing or not a
        directory, ``git`` not found, ...) the return code is 127 and
        ``stderr`` carries the OS error text.
    """
    command = ["git", *args]
    try:
        return subprocess.run(
            command,
            cwd=repo,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(command, 124, "", "git command timeout")
    except OSError as exc:
        # subprocess.run raises instead of returning when the cwd or the
        # executable is missing; surface that as an ordinary failure.
        return subprocess.CompletedProcess(
            command, 127, "", f"git command could not start: {exc}"
        )
def git_value(repo: Path, args: list[str], timeout: int) -> str:
    """Return the stripped stdout of ``git <args>`` run in *repo*.

    Any non-zero exit status is collapsed to an empty string, so callers
    cannot tell "command failed" apart from "command printed nothing".
    """
    completed = run_git(repo, args, timeout)
    return completed.stdout.strip() if completed.returncode == 0 else ""
def parse_refs(output: str) -> tuple[dict[str, str], dict[str, str]]:
    """Split ``git ls-remote`` output into branch and tag lookup tables.

    Each usable line consists of exactly two whitespace-separated fields,
    ``<sha> <ref>``; every other line is ignored, as are refs outside
    ``refs/heads/`` and ``refs/tags/``.

    A trailing ``^{}`` is stripped from tag names, so a peeled entry shares
    its key with the plain tag entry and whichever line appears later in
    *output* determines the stored sha.

    Returns:
        ``(heads, tags)``, each mapping the short ref name to its sha.
    """
    branch_prefix = "refs/heads/"
    tag_prefix = "refs/tags/"
    heads: dict[str, str] = {}
    tags: dict[str, str] = {}

    for raw_line in output.splitlines():
        try:
            sha, ref = raw_line.split()
        except ValueError:
            # Not exactly two fields: blank line or unexpected format.
            continue

        if ref.startswith(branch_prefix):
            heads[ref[len(branch_prefix):]] = sha
        elif ref.startswith(tag_prefix):
            tag_name = ref[len(tag_prefix):].removesuffix("^{}")
            tags[tag_name] = sha

    return heads, tags
def parse_repo_arg(value: str) -> tuple[str, Path, str]:
    """Parse one ``--repo`` value of the form ``label=path`` or ``label=path=remote``.

    The value is split on every ``=``, so a path that itself contains ``=``
    is not supported. Each field is stripped of surrounding whitespace, the
    path is user-expanded and resolved, and the remote defaults to
    ``"origin"`` when omitted.

    Args:
        value: Raw command-line text for a single ``--repo`` option.

    Returns:
        ``(label, resolved_path, remote)``.

    Raises:
        argparse.ArgumentTypeError: If the value does not have two or three
            ``=``-separated fields, or if any field is empty.
    """
    fields = [field.strip() for field in value.split("=")]
    if len(fields) not in (2, 3):
        raise argparse.ArgumentTypeError("--repo 必須是 label=/absolute/path 或 label=/absolute/path=remote")

    label, raw_path = fields[0], fields[1]
    remote = fields[2] if len(fields) == 3 else "origin"

    # Validate the raw path text: once resolved, an empty path becomes the
    # current working directory and can no longer be recognised as empty.
    if not label or not raw_path or not remote:
        raise argparse.ArgumentTypeError("--repo label/path/remote 不可為空")

    return label, Path(raw_path).expanduser().resolve(), remote
def probe_repo(label: str, repo: Path, remote: str, timeout: int) -> dict[str, object]:
    """Compare the local checkout in *repo* with the refs advertised by *remote*.

    Local HEAD, the current branch name and the remote URL are read with
    local git commands; the remote refs come from
    ``git ls-remote --heads --tags``.

    The resulting ``status`` is one of:

    * ``aligned_current_branch`` - the remote head named like the local
      branch exists and has the same sha as local HEAD;
    * ``reachable_drift_or_unknown`` - ls-remote succeeded and listed heads,
      but the condition above does not hold;
    * ``reachable_no_heads`` - ls-remote succeeded without listing any head;
    * ``unreachable`` - ls-remote exited non-zero (``error_summary`` is then
      filled in).

    Returns:
        A JSON-serialisable dict describing this repo.
    """
    head_sha = git_value(repo, ["rev-parse", "HEAD"], timeout)
    branch = git_value(repo, ["branch", "--show-current"], timeout)
    url = git_value(repo, ["remote", "get-url", remote], timeout)
    listing = run_git(repo, ["ls-remote", "--heads", "--tags", remote], timeout)

    # Start from the values reported for a failed listing; the success
    # branch below overwrites what it knows better.
    heads: dict[str, str] = {}
    tags: dict[str, str] = {}
    remote_branch_sha = ""
    error_summary = ""

    if listing.returncode != 0:
        status = "unreachable"
        err_text = listing.stderr.strip()
        if "Permission denied" in err_text:
            error_summary = "SSH 權限不足或 remote 不可讀"
        elif "Repository not found" in err_text:
            error_summary = "remote repo not found 或未授權"
        else:
            # Fall back to the last line git printed, preferring stderr.
            detail_lines = (err_text or listing.stdout.strip()).splitlines()
            error_summary = detail_lines[-1] if detail_lines else "git ls-remote failed"
    else:
        heads, tags = parse_refs(listing.stdout)
        remote_branch_sha = heads.get(branch, "")
        is_aligned = (
            bool(branch)
            and bool(remote_branch_sha)
            and remote_branch_sha == head_sha
        )
        if is_aligned:
            status = "aligned_current_branch"
        elif heads:
            status = "reachable_drift_or_unknown"
        else:
            status = "reachable_no_heads"

    head_rows = [{"name": name, "sha": heads[name]} for name in sorted(heads)]
    tag_rows = [{"name": name, "sha": tags[name]} for name in sorted(tags)]

    return {
        "label": label,
        "repo_path": str(repo),
        "remote": remote,
        "remote_url_redacted": redact_url(url),
        "status": status,
        "local_branch": branch,
        "local_head": head_sha,
        "remote_current_branch_sha": remote_branch_sha,
        "head_count": len(heads),
        "tag_count": len(tags),
        "heads": head_rows,
        "tags": tag_rows,
        "error_summary": error_summary,
    }
def build_payload(group_name: str, repo_args: list[tuple[str, Path, str]], timeout: int) -> dict[str, object]:
    """Probe every repo in *repo_args* and assemble the group-level report.

    The overall ``status`` is ``"ok"`` when no repo is ``unreachable`` and
    ``"partial"`` otherwise; drift alone does not change it.

    Args:
        group_name: Name recorded verbatim in the report.
        repo_args: ``(label, path, remote)`` triples, one per repo.
        timeout: Per-git-command timeout in seconds.

    Returns:
        A JSON-serialisable dict with summary counters and per-repo results.
    """
    repos = []
    for label, path, remote in repo_args:
        repos.append(probe_repo(label, path, remote, timeout))

    statuses = [entry["status"] for entry in repos]
    aligned_total = statuses.count("aligned_current_branch")
    unreachable_total = statuses.count("unreachable")
    overall = "partial" if unreachable_total else "ok"

    return {
        "schema_version": "git_remote_refs_probe_v1",
        "group_name": group_name,
        "status": overall,
        "repo_count": len(repos),
        "aligned_current_branch_count": aligned_total,
        "unreachable_count": unreachable_total,
        "repos": repos,
    }
def write_markdown(payload: dict[str, object], path: Path) -> None:
    """Render *payload* as a Markdown snapshot and write it to *path* (UTF-8).

    The document holds a summary table, one table row per repo and a closing
    note. Entries of ``payload["repos"]`` that are not dicts are skipped, and
    both shas are shortened to their first seven characters.

    Every value placed in a table cell has ``|`` escaped as ``\\|``: an
    unescaped pipe ends the cell (even inside a code span) and would shift
    all following columns of that row.

    Args:
        payload: Report dict; the summary keys ``group_name``, ``status``,
            ``repo_count``, ``aligned_current_branch_count`` and
            ``unreachable_count`` must be present.
        path: Destination file; it is overwritten.
    """

    def cell(value: object) -> str:
        # Keep arbitrary text (labels, URLs, git error lines) inside its cell.
        return str(value).replace("|", "\\|")

    def code(value: object) -> str:
        return f"`{cell(value)}`"

    lines = [
        "# Git Remote Refs Probe 快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
        f"| 群組 | {code(payload['group_name'])} |",
        f"| 狀態 | {code(payload['status'])} |",
        f"| repo 數 | {code(payload['repo_count'])} |",
        f"| aligned current branch | {code(payload['aligned_current_branch_count'])} |",
        f"| unreachable | {code(payload['unreachable_count'])} |",
        "",
        "## Repo refs",
        "",
        "| Label | Remote URL | Status | Local branch | Local HEAD | Remote branch SHA | Heads | Tags | Error |",
        "|-------|------------|--------|--------------|------------|-------------------|-------|------|-------|",
    ]

    for repo in payload.get("repos", []):
        if not isinstance(repo, dict):
            continue
        cells = [
            code(repo.get("label", "")),
            code(repo.get("remote_url_redacted", "")),
            code(repo.get("status", "")),
            code(repo.get("local_branch", "")),
            code(str(repo.get("local_head", ""))[:7]),
            code(str(repo.get("remote_current_branch_sha", ""))[:7]),
            code(repo.get("head_count", 0)),
            code(repo.get("tag_count", 0)),
            # The error text is shown as plain text, not as a code span.
            cell(repo.get("error_summary", "") or ""),
        ]
        lines.append("| " + " | ".join(cells) + " |")

    lines.extend(
        [
            "",
            "> 注意:本檔只使用 `git ls-remote` 做 read-only refs 探測;未 fetch、未 clone、未 push。",
            "",
        ]
    )
    path.write_text("\n".join(lines), encoding="utf-8")
def main() -> int:
    """Command-line entry point: probe the given repos and write both reports.

    Parses ``--group-name``, one or more ``--repo`` values, the two output
    paths and an optional ``--timeout`` (seconds, default 10), then writes the
    payload as indented JSON followed by a newline and as a Markdown snapshot.

    Returns:
        Always 0; unreachable repos are reported in the payload, not through
        the exit status.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--group-name", required=True)
    cli.add_argument("--repo", action="append", type=parse_repo_arg, required=True)
    cli.add_argument("--output-json", required=True)
    cli.add_argument("--output-md", required=True)
    cli.add_argument("--timeout", type=int, default=10)
    options = cli.parse_args()

    report = build_payload(options.group_name, options.repo, options.timeout)

    serialized = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
    Path(options.output_json).write_text(serialized, encoding="utf-8")
    write_markdown(report, Path(options.output_md))
    return 0
# Script entry: run the CLI and hand main()'s return value to the interpreter
# as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)