#!/usr/bin/env python3
"""Read-only probe for Git remote refs.

This tool reads the remote refs of the given local repositories with
`git ls-remote --heads --tags` and compares them against the local
HEAD / branch. It does not fetch, clone, or push, and it never modifies
the remote.
"""

from __future__ import annotations

import argparse
import json
import subprocess
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit


def redact_url(value: str) -> str:
    """Return *value* with any user/credential part removed.

    Two shapes are handled:

    * scp-like syntax (``user@host:path``) -> ``host:path``
    * URLs with a scheme (``scheme://user:secret@host/path``) ->
      ``scheme://host/path``

    Anything else is returned unchanged.
    """
    if "://" not in value:
        # scp-like remote: drop everything up to the first "@", but only when
        # the remainder still looks like "host:path".
        _, at_sign, remainder = value.partition("@")
        if at_sign and ":" in remainder:
            return remainder
        return value
    parts = urlsplit(value)
    # Split on the LAST "@": a password may itself contain an unescaped "@",
    # and splitting on the first one would leak the tail of the secret.
    netloc = parts.netloc.rpartition("@")[2]
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


def run_git(repo: Path, args: list[str], timeout: int) -> subprocess.CompletedProcess[str]:
    """Run ``git <args>`` inside *repo* and capture its text output.

    The call never raises for the failure modes a probe is expected to meet;
    they are reported through the returned ``CompletedProcess`` instead:

    * timeout after *timeout* seconds -> return code 124
    * process could not be started (missing *repo* directory, ``git`` not
      installed, ...) -> return code 127
    """
    command = ["git", *args]
    try:
        return subprocess.run(
            command,
            cwd=repo,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(command, 124, "", "git command timeout")
    except OSError as exc:
        # One broken repo path must not abort the whole multi-repo probe;
        # callers treat a non-zero return code as "unreachable".
        return subprocess.CompletedProcess(command, 127, "", f"git could not be started: {exc}")


def git_value(repo: Path, args: list[str], timeout: int) -> str:
    """Return the stripped stdout of ``git <args>``, or ``""`` on failure."""
    result = run_git(repo, args, timeout)
    if result.returncode != 0:
        return ""
    return result.stdout.strip()


def parse_refs(output: str) -> tuple[dict[str, str], dict[str, str]]:
    """Parse ``git ls-remote`` output into ``(heads, tags)`` name -> SHA maps.

    Lines that do not consist of exactly two whitespace-separated fields are
    ignored, as are refs outside ``refs/heads/`` and ``refs/tags/``.
    """
    heads: dict[str, str] = {}
    tags: dict[str, str] = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) != 2:
            continue
        sha, ref = fields
        if ref.startswith("refs/heads/"):
            heads[ref.removeprefix("refs/heads/")] = sha
        elif ref.startswith("refs/tags/"):
            # A peeled entry ("<tag>^{}") shares the tag's name, so when both
            # lines are present the later one wins.
            tag = ref.removeprefix("refs/tags/").removesuffix("^{}")
            tags[tag] = sha
    return heads, tags


def parse_repo_arg(value: str) -> tuple[str, Path, str]:
    """Parse a ``--repo`` value of the form ``label=path`` or ``label=path=remote``.

    Returns ``(label, resolved_path, remote)``; *remote* defaults to
    ``"origin"``.

    Raises:
        argparse.ArgumentTypeError: if the value does not have two or three
            ``=``-separated parts, or if any part is empty.
    """
    parts = [part.strip() for part in value.split("=")]
    if len(parts) not in (2, 3):
        raise argparse.ArgumentTypeError("--repo 必須是 label=/absolute/path 或 label=/absolute/path=remote")
    label = parts[0]
    raw_path = parts[1]
    remote = parts[2] if len(parts) == 3 else "origin"
    # Validate the raw text: Path("").resolve() is the current directory, so
    # checking the resolved path could never detect an empty path.
    if not label or not raw_path or not remote:
        raise argparse.ArgumentTypeError("--repo label/path/remote 不可為空")
    return label, Path(raw_path).expanduser().resolve(), remote


def probe_repo(label: str, repo: Path, remote: str, timeout: int) -> dict[str, object]:
    """Probe one repository and return a JSON-serialisable report.

    The ``status`` field is one of:

    * ``aligned_current_branch`` - the remote has a head named like the local
      branch and its SHA equals the local HEAD
    * ``reachable_drift_or_unknown`` - the remote answered with heads, but
      the current branch is not aligned
    * ``reachable_no_heads`` - the remote answered without any heads
    * ``unreachable`` - ``git ls-remote`` failed; ``error_summary`` says why
    """
    local_head = git_value(repo, ["rev-parse", "HEAD"], timeout)
    local_branch = git_value(repo, ["branch", "--show-current"], timeout)
    remote_url = git_value(repo, ["remote", "get-url", remote], timeout)
    ls_remote = run_git(repo, ["ls-remote", "--heads", "--tags", remote], timeout)

    if ls_remote.returncode == 0:
        heads, tags = parse_refs(ls_remote.stdout)
        remote_branch_sha = heads.get(local_branch, "")
        if local_branch and remote_branch_sha and remote_branch_sha == local_head:
            status = "aligned_current_branch"
        elif heads:
            status = "reachable_drift_or_unknown"
        else:
            status = "reachable_no_heads"
        error_summary = ""
    else:
        heads = {}
        tags = {}
        remote_branch_sha = ""
        status = "unreachable"
        stderr = ls_remote.stderr.strip()
        if "Permission denied" in stderr:
            error_summary = "SSH 權限不足或 remote 不可讀"
        elif "Repository not found" in stderr:
            error_summary = "remote repo not found 或未授權"
        else:
            # Fall back to the last line git printed.
            tail = (stderr or ls_remote.stdout.strip()).splitlines()[-1:]
            error_summary = tail[0] if tail else "git ls-remote failed"

    return {
        "label": label,
        "repo_path": str(repo),
        "remote": remote,
        "remote_url_redacted": redact_url(remote_url),
        "status": status,
        "local_branch": local_branch,
        "local_head": local_head,
        "remote_current_branch_sha": remote_branch_sha,
        "head_count": len(heads),
        "tag_count": len(tags),
        "heads": [{"name": name, "sha": sha} for name, sha in sorted(heads.items())],
        "tags": [{"name": name, "sha": sha} for name, sha in sorted(tags.items())],
        "error_summary": error_summary,
    }


def build_payload(group_name: str, repo_args: list[tuple[str, Path, str]], timeout: int) -> dict[str, object]:
    """Probe every repo in *repo_args* and assemble the top-level report.

    The overall ``status`` is ``"ok"`` when no repo is unreachable and
    ``"partial"`` otherwise.
    """
    repos = [probe_repo(label, path, remote, timeout) for label, path, remote in repo_args]
    aligned = sum(1 for repo in repos if repo["status"] == "aligned_current_branch")
    unreachable = sum(1 for repo in repos if repo["status"] == "unreachable")
    return {
        "schema_version": "git_remote_refs_probe_v1",
        "group_name": group_name,
        "status": "ok" if unreachable == 0 else "partial",
        "repo_count": len(repos),
        "aligned_current_branch_count": aligned,
        "unreachable_count": unreachable,
        "repos": repos,
    }


def write_markdown(payload: dict[str, object], path: Path) -> None:
    """Render *payload* as a Markdown snapshot and write it to *path* (UTF-8).

    The document holds a summary table followed by one row per repo; SHAs are
    shortened to their first seven characters. Entries of ``payload["repos"]``
    that are not dicts are skipped.
    """
    lines = [
        "# Git Remote Refs Probe 快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
        f"| 群組 | `{payload['group_name']}` |",
        f"| 狀態 | `{payload['status']}` |",
        f"| repo 數 | `{payload['repo_count']}` |",
        f"| aligned current branch | `{payload['aligned_current_branch_count']}` |",
        f"| unreachable | `{payload['unreachable_count']}` |",
        "",
        "## Repo refs",
        "",
        "| Label | Remote URL | Status | Local branch | Local HEAD | Remote branch SHA | Heads | Tags | Error |",
        "|-------|------------|--------|--------------|------------|-------------------|-------|------|-------|",
    ]
    for repo in payload.get("repos", []):
        if not isinstance(repo, dict):
            continue
        lines.append(
            "| "
            + " | ".join(
                [
                    f"`{repo.get('label', '')}`",
                    f"`{repo.get('remote_url_redacted', '')}`",
                    f"`{repo.get('status', '')}`",
                    f"`{repo.get('local_branch', '')}`",
                    f"`{str(repo.get('local_head', ''))[:7]}`",
                    f"`{str(repo.get('remote_current_branch_sha', ''))[:7]}`",
                    f"`{repo.get('head_count', 0)}`",
                    f"`{repo.get('tag_count', 0)}`",
                    str(repo.get("error_summary", "") or "無"),
                ]
            )
            + " |"
        )
    lines.extend(
        [
            "",
            "> 注意:本檔只使用 `git ls-remote` 做 read-only refs 探測;未 fetch、未 clone、未 push。",
            "",
        ]
    )
    path.write_text("\n".join(lines), encoding="utf-8")


def main() -> int:
    """Parse the command line, probe the repos, and write both reports.

    Returns the process exit code (always 0 once the reports are written).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--group-name", required=True)
    parser.add_argument("--repo", action="append", type=parse_repo_arg, required=True)
    parser.add_argument("--output-json", required=True)
    parser.add_argument("--output-md", required=True)
    parser.add_argument("--timeout", type=int, default=10)
    args = parser.parse_args()

    payload = build_payload(args.group_name, args.repo, args.timeout)
    Path(args.output_json).write_text(
        json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    write_markdown(payload, Path(args.output_md))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())