204 lines
7.4 KiB
Python
204 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
||
"""Git remote refs 只讀探測工具。
|
||
|
||
此工具用 `git ls-remote --heads --tags` 讀取指定本機 repo 的 remote refs,
|
||
並比對本機 HEAD / branch。它不 fetch、不 clone、不 push,也不修改 remote。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import subprocess
|
||
from pathlib import Path
|
||
from urllib.parse import urlsplit, urlunsplit
|
||
|
||
|
||
def redact_url(value: str) -> str:
    """Return *value* with any embedded credentials removed.

    Two remote spellings are handled:

    * scp-like ``user@host:path`` (no ``://``): everything up to and
      including the first ``@`` is dropped, but only when the remainder
      still contains a ``:``; any other scheme-less value is returned
      unchanged.
    * ``scheme://user:secret@host/path``: the userinfo part of the network
      location is dropped and the URL is reassembled from its components.

    Args:
        value: Remote URL or path as reported by git; may be empty.

    Returns:
        The same location without its ``user[:password]@`` prefix.
    """
    if "://" not in value:
        _, at_sign, remainder = value.partition("@")
        if at_sign and ":" in remainder:
            return remainder
        return value
    parts = urlsplit(value)
    # The host is whatever follows the LAST "@" of the netloc. Splitting on
    # the first "@" would leak the tail of a password that itself contains
    # an unescaped "@" (e.g. "user:p@ss@host" -> "ss@host").
    host = parts.netloc.rpartition("@")[2]
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
|
||
|
||
|
||
def run_git(repo: Path, args: list[str], timeout: int) -> subprocess.CompletedProcess[str]:
    """Run ``git <args>`` with *repo* as working directory and capture its output.

    The call does not raise for a failing, slow, or unlaunchable command;
    every outcome is reported through the returned object's ``returncode``:

    * git's own exit status when the process ran to completion,
    * ``124`` with stderr ``"git command timeout"`` when *timeout* expired,
    * ``127`` with the OS error text as stderr when the process could not be
      started (for example *repo* does not exist or ``git`` is not installed).

    Args:
        repo: Directory used as the working directory of the git process.
        args: Arguments placed after ``git`` on the command line.
        timeout: Maximum run time in seconds.

    Returns:
        A ``CompletedProcess`` whose ``stdout`` and ``stderr`` are text.
    """
    command = ["git", *args]
    try:
        return subprocess.run(
            command,
            cwd=repo,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(command, 124, "", "git command timeout")
    except OSError as exc:
        # A missing working directory or git executable surfaces as OSError
        # before git ever runs; report it like any other failed command so a
        # single bad repo entry does not abort the caller.
        return subprocess.CompletedProcess(command, 127, "", str(exc))
|
||
|
||
|
||
def git_value(repo: Path, args: list[str], timeout: int) -> str:
    """Return the stripped stdout of ``git <args>``, or ``""`` on failure.

    Args:
        repo: Directory the git command runs in.
        args: Arguments placed after ``git``.
        timeout: Maximum run time in seconds, forwarded to ``run_git``.

    Returns:
        Whitespace-stripped standard output when the command exited with
        status 0; an empty string for any non-zero exit status.
    """
    completed = run_git(repo, args, timeout)
    return completed.stdout.strip() if completed.returncode == 0 else ""
|
||
|
||
|
||
def parse_refs(output: str) -> tuple[dict[str, str], dict[str, str]]:
    """Split ``git ls-remote`` output into branch and tag mappings.

    Each usable line consists of exactly two whitespace-separated fields,
    ``<sha> <ref>``; every other line is ignored, as are refs outside
    ``refs/heads/`` and ``refs/tags/``.

    Args:
        output: Raw standard output of ``git ls-remote``.

    Returns:
        ``(heads, tags)``: two dicts mapping the short branch / tag name to
        its SHA. A trailing ``^{}`` is removed from tag names, so when both
        ``v1`` and ``v1^{}`` appear, the entry seen last wins.
    """
    branch_prefix = "refs/heads/"
    tag_prefix = "refs/tags/"
    peel_marker = "^{}"

    heads: dict[str, str] = {}
    tags: dict[str, str] = {}
    for raw_line in output.splitlines():
        fields = raw_line.split()
        if len(fields) == 2:
            object_id, ref_name = fields
            if ref_name.startswith(branch_prefix):
                heads[ref_name[len(branch_prefix):]] = object_id
            elif ref_name.startswith(tag_prefix):
                short_name = ref_name[len(tag_prefix):]
                if short_name.endswith(peel_marker):
                    short_name = short_name[: -len(peel_marker)]
                tags[short_name] = object_id
    return heads, tags
|
||
|
||
|
||
def parse_repo_arg(value: str) -> tuple[str, Path, str]:
    """Parse one ``--repo`` value into ``(label, path, remote)``.

    Accepted spellings are ``label=path`` and ``label=path=remote``; the
    remote name defaults to ``"origin"`` when omitted. Whitespace around
    each field is stripped, and the path has ``~`` expanded and is resolved
    to an absolute path.

    Args:
        value: Raw command-line text of a single ``--repo`` option.

    Returns:
        A ``(label, path, remote)`` tuple.

    Raises:
        argparse.ArgumentTypeError: If *value* does not contain two or three
            ``=``-separated fields, or if any field is empty.
    """
    fields = [field.strip() for field in value.split("=")]
    if len(fields) not in (2, 3):
        raise argparse.ArgumentTypeError("--repo 必須是 label=/absolute/path 或 label=/absolute/path=remote")
    label, raw_path = fields[0], fields[1]
    remote = fields[2] if len(fields) == 3 else "origin"
    # Validate the raw text: Path("").resolve() is the current directory, so
    # an emptiness check on the resolved path can never fire and "label="
    # would silently probe the working directory.
    if not label or not raw_path or not remote:
        raise argparse.ArgumentTypeError("--repo label/path/remote 不可為空")
    return label, Path(raw_path).expanduser().resolve(), remote
|
||
|
||
|
||
def probe_repo(label: str, repo: Path, remote: str, timeout: int) -> dict[str, object]:
    """Probe one local repository's remote and compare it with the local HEAD.

    Runs ``git rev-parse HEAD``, ``git branch --show-current``,
    ``git remote get-url <remote>`` and ``git ls-remote --heads --tags
    <remote>`` inside *repo*, then classifies the result:

    * ``aligned_current_branch``: the remote has a head named like the
      local branch and its SHA equals the local HEAD.
    * ``reachable_drift_or_unknown``: ``ls-remote`` succeeded and listed at
      least one head, but the condition above did not hold.
    * ``reachable_no_heads``: ``ls-remote`` succeeded without any heads.
    * ``unreachable``: ``ls-remote`` exited with a non-zero status.

    Args:
        label: Caller-chosen display name, copied into the result.
        repo: Path of the local repository.
        remote: Name of the remote to query.
        timeout: Per-command timeout in seconds.

    Returns:
        A JSON-serialisable dict describing the probe. ``error_summary`` is
        empty unless the status is ``unreachable``.
    """
    local_head = git_value(repo, ["rev-parse", "HEAD"], timeout)
    local_branch = git_value(repo, ["branch", "--show-current"], timeout)
    remote_url = git_value(repo, ["remote", "get-url", remote], timeout)
    ls_remote = run_git(repo, ["ls-remote", "--heads", "--tags", remote], timeout)

    heads: dict[str, str] = {}
    tags: dict[str, str] = {}
    remote_branch_sha = ""
    error_summary = ""

    if ls_remote.returncode == 0:
        heads, tags = parse_refs(ls_remote.stdout)
        remote_branch_sha = heads.get(local_branch, "")
        if local_branch and remote_branch_sha and remote_branch_sha == local_head:
            status = "aligned_current_branch"
        elif heads:
            status = "reachable_drift_or_unknown"
        else:
            status = "reachable_no_heads"
    else:
        status = "unreachable"
        stderr = ls_remote.stderr.strip()
        if "Permission denied" in stderr:
            error_summary = "SSH 權限不足或 remote 不可讀"
        elif "Repository not found" in stderr:
            error_summary = "remote repo not found 或未授權"
        else:
            # Fall back to the last line git printed (stderr preferred).
            tail = (stderr or ls_remote.stdout.strip()).splitlines()[-1:]
            error_summary = tail[0] if tail else "git ls-remote failed"
            # That line is raw git output and may quote the configured URL,
            # credentials included; keep it consistent with the redacted
            # URL reported below.
            if remote_url:
                error_summary = error_summary.replace(remote_url, redact_url(remote_url))

    return {
        "label": label,
        "repo_path": str(repo),
        "remote": remote,
        "remote_url_redacted": redact_url(remote_url),
        "status": status,
        "local_branch": local_branch,
        "local_head": local_head,
        "remote_current_branch_sha": remote_branch_sha,
        "head_count": len(heads),
        "tag_count": len(tags),
        "heads": [{"name": name, "sha": sha} for name, sha in sorted(heads.items())],
        "tags": [{"name": name, "sha": sha} for name, sha in sorted(tags.items())],
        "error_summary": error_summary,
    }
|
||
|
||
|
||
def build_payload(group_name: str, repo_args: list[tuple[str, Path, str]], timeout: int) -> dict[str, object]:
    """Probe every configured repository and assemble the report payload.

    Args:
        group_name: Name of the repository group, copied into the payload.
        repo_args: ``(label, path, remote)`` tuples, one per repository.
        timeout: Per-command timeout in seconds, forwarded to each probe.

    Returns:
        The report dict. Its top-level ``status`` is ``"ok"`` when no
        repository is ``unreachable`` and ``"partial"`` otherwise.
    """
    probed = []
    for label, path, remote in repo_args:
        probed.append(probe_repo(label, path, remote, timeout))

    statuses = [entry["status"] for entry in probed]
    aligned_total = statuses.count("aligned_current_branch")
    unreachable_total = statuses.count("unreachable")
    overall = "partial" if unreachable_total else "ok"

    return {
        "schema_version": "git_remote_refs_probe_v1",
        "group_name": group_name,
        "status": overall,
        "repo_count": len(probed),
        "aligned_current_branch_count": aligned_total,
        "unreachable_count": unreachable_total,
        "repos": probed,
    }
|
||
|
||
|
||
def write_markdown(payload: dict[str, object], path: Path) -> None:
    """Render *payload* as a Markdown snapshot and write it to *path*.

    The document contains a summary table (group, status, counters), one
    table row per entry of ``payload["repos"]`` and a closing note. Entries
    of ``repos`` that are not dicts are skipped. SHAs are shortened to their
    first seven characters and an empty error summary is shown as ``無``.

    Args:
        payload: Report dict; the summary keys ``group_name``, ``status``,
            ``repo_count``, ``aligned_current_branch_count`` and
            ``unreachable_count`` are required, ``repos`` is optional.
        path: Destination file, written as UTF-8 (overwritten if present).

    Raises:
        KeyError: If one of the required summary keys is missing.
    """

    def cell(value: object) -> str:
        # A raw "|" terminates a table cell (even inside a code span), so it
        # has to be backslash-escaped to keep the row's column count intact.
        return str(value).replace("|", "\\|")

    lines = [
        "# Git Remote Refs Probe 快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
        f"| 群組 | `{cell(payload['group_name'])}` |",
        f"| 狀態 | `{cell(payload['status'])}` |",
        f"| repo 數 | `{cell(payload['repo_count'])}` |",
        f"| aligned current branch | `{cell(payload['aligned_current_branch_count'])}` |",
        f"| unreachable | `{cell(payload['unreachable_count'])}` |",
        "",
        "## Repo refs",
        "",
        "| Label | Remote URL | Status | Local branch | Local HEAD | Remote branch SHA | Heads | Tags | Error |",
        "|-------|------------|--------|--------------|------------|-------------------|-------|------|-------|",
    ]
    for repo in payload.get("repos", []):
        if not isinstance(repo, dict):
            continue
        row = [
            f"`{cell(repo.get('label', ''))}`",
            f"`{cell(repo.get('remote_url_redacted', ''))}`",
            f"`{cell(repo.get('status', ''))}`",
            f"`{cell(repo.get('local_branch', ''))}`",
            f"`{cell(str(repo.get('local_head', ''))[:7])}`",
            f"`{cell(str(repo.get('remote_current_branch_sha', ''))[:7])}`",
            f"`{cell(repo.get('head_count', 0))}`",
            f"`{cell(repo.get('tag_count', 0))}`",
            cell(repo.get("error_summary", "") or "無"),
        ]
        lines.append("| " + " | ".join(row) + " |")
    lines.extend(
        [
            "",
            "> 注意:本檔只使用 `git ls-remote` 做 read-only refs 探測;未 fetch、未 clone、未 push。",
            "",
        ]
    )
    path.write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
|
||
def main() -> int:
    """Parse the command line, probe the repositories and write both reports.

    Reads ``--group-name``, one or more ``--repo`` values, ``--output-json``,
    ``--output-md`` and an optional ``--timeout`` (seconds, default 10) from
    the process arguments, then writes the payload as indented UTF-8 JSON
    and as a Markdown snapshot.

    Returns:
        Always ``0``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--group-name", required=True)
    cli.add_argument("--repo", action="append", type=parse_repo_arg, required=True)
    cli.add_argument("--output-json", required=True)
    cli.add_argument("--output-md", required=True)
    cli.add_argument("--timeout", type=int, default=10)
    options = cli.parse_args()

    report = build_payload(options.group_name, options.repo, options.timeout)

    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    json_target = Path(options.output_json)
    json_target.write_text(f"{serialized}\n", encoding="utf-8")

    markdown_target = Path(options.output_md)
    write_markdown(report, markdown_target)
    return 0
|
||
|
||
|
||
# Script entry point: run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|