#!/usr/bin/env python3
"""Read-only Gitea/GitHub migration inventory tool.

This tool only uses `git ls-remote` (plus `git remote get-url`) to compare the
refs of two remotes; it never pushes, fetches, or writes to either remote.
Credentials embedded in remote URLs are redacted before any evidence is
written.
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit

# Hex object id as printed by `git ls-remote`: 40 characters for SHA-1
# repositories and 64 for SHA-256 ones. The lower bound of 7 is kept from the
# original pattern so abbreviated ids in hand-written input are still accepted.
SHA_RE = re.compile(r"^[0-9a-fA-F]{7,64}$")


@dataclass(frozen=True)
class GitRefs:
    """Refs advertised by one remote."""

    # Branch name -> commit SHA (the "refs/heads/" prefix is stripped).
    heads: dict[str, str]
    # Tag name -> SHA; when a peeled "^{}" entry exists its SHA is used.
    tags: dict[str, str]
    # Number of parsed "refs/tags/" entries, peeled "^{}" entries included.
    raw_tag_ref_count: int


def run_git(repo: Path, args: list[str]) -> subprocess.CompletedProcess[str]:
    """Run ``git <args>`` inside *repo* and return the completed process.

    A non-zero exit status does not raise; stdout and stderr are captured as
    text. ``OSError`` from ``subprocess.run`` (for example a missing ``git``
    executable or a missing *repo* directory) propagates to the caller.
    """
    return subprocess.run(
        ["git", *args],
        cwd=repo,
        check=False,
        capture_output=True,
        text=True,
    )


def require_git(repo: Path, args: list[str]) -> str:
    """Run ``git <args>`` inside *repo* and return its stdout.

    Raises:
        RuntimeError: if git exits with a non-zero status, or if the process
            cannot be started at all (git missing, *repo* not a directory).
    """
    try:
        result = run_git(repo, args)
    except OSError as exc:
        # Surface launch failures through the same exception type as git
        # failures so the single handler in main() covers both.
        raise RuntimeError(f"git {' '.join(args)} failed: {exc}") from exc
    if result.returncode != 0:
        stderr = result.stderr.strip() or result.stdout.strip()
        raise RuntimeError(f"git {' '.join(args)} failed: {stderr}")
    return result.stdout


def redact_url(url: str) -> str:
    """Remove userinfo from common URL formats before storing evidence."""
    if "://" in url:
        parts = urlsplit(url)
        # Strip everything up to the LAST "@": a password that itself contains
        # an unescaped "@" must not leave its tail behind in the output.
        host = parts.netloc.rpartition("@")[2]
        return urlunsplit(
            (parts.scheme, host, parts.path, parts.query, parts.fragment)
        )
    # scp-like syntax: user@host:path
    if "@" in url and ":" in url.split("@", 1)[1]:
        return url.split("@", 1)[1]
    return url


def remote_url(repo: Path, remote: str) -> str:
    """Return the redacted URL configured for *remote* in *repo*."""
    return redact_url(require_git(repo, ["remote", "get-url", remote]).strip())


def repo_slug_from_url(url: str) -> str:
    """Derive a repository slug (such as ``org/repo``) from a remote URL.

    Userinfo is removed first, then the path component is taken and a
    trailing ``.git`` is dropped. If nothing is left, the redacted URL itself
    is returned.
    """
    redacted = redact_url(url).removesuffix("/")
    if "://" in redacted:
        path = urlsplit(redacted).path.strip("/")
    elif ":" in redacted:
        # scp-like "host:path" form.
        path = redacted.split(":", 1)[1].strip("/")
    else:
        path = redacted.strip("/")
    return path.removesuffix(".git") or redacted


def parse_ls_remote(output: str, prefix: str) -> dict[str, str]:
    """Parse ``git ls-remote`` output into a ``{name: sha}`` mapping.

    Only lines whose ref starts with *prefix* are kept, and the prefix is
    removed from the returned names. Blank lines, lines without a ref column,
    and lines whose first column is not a hex object id are skipped.
    """
    refs: dict[str, str] = {}
    for line in output.splitlines():
        if not line.strip():
            continue
        try:
            sha, ref = line.split(None, 1)
        except ValueError:
            # Line has a single column only; not a "<sha> <ref>" record.
            continue
        if not SHA_RE.match(sha) or not ref.startswith(prefix):
            continue
        refs[ref.removeprefix(prefix)] = sha
    return refs


def refs_for_remote(repo: Path, remote: str) -> GitRefs:
    """Collect the branch and tag refs advertised by *remote*.

    Tag entries ending in ``^{}`` are the peeled form of a tag; when present,
    the peeled SHA replaces the SHA recorded for the plain tag name.

    Raises:
        RuntimeError: if either ``git ls-remote`` invocation fails.
    """
    heads_out = require_git(repo, ["ls-remote", "--heads", remote])
    tags_out = require_git(repo, ["ls-remote", "--tags", remote])
    raw_tags = parse_ls_remote(tags_out, "refs/tags/")
    tags = {
        name: sha for name, sha in raw_tags.items() if not name.endswith("^{}")
    }
    peeled = {
        name.removesuffix("^{}"): sha
        for name, sha in raw_tags.items()
        if name.endswith("^{}")
    }
    # Peeled entries win over the plain tag entry of the same name.
    tags.update(peeled)
    return GitRefs(
        heads=parse_ls_remote(heads_out, "refs/heads/"),
        tags=tags,
        raw_tag_ref_count=len(raw_tags),
    )


def compare_maps(left: dict[str, str], right: dict[str, str]) -> dict[str, object]:
    """Compare two ``{name: sha}`` mappings.

    Returns a dict with four sorted entries: ``only_left`` and ``only_right``
    (names present on one side only), ``sha_mismatch`` (records with ``name``,
    ``left_sha`` and ``right_sha`` for shared names whose SHAs differ) and
    ``matching`` (shared names whose SHAs are equal).
    """
    left_names = set(left)
    right_names = set(right)
    common = sorted(left_names & right_names)
    return {
        "only_left": sorted(left_names - right_names),
        "only_right": sorted(right_names - left_names),
        "sha_mismatch": [
            {
                "name": name,
                "left_sha": left[name],
                "right_sha": right[name],
            }
            for name in common
            if left[name] != right[name]
        ],
        "matching": [name for name in common if left[name] == right[name]],
    }


def build_inventory(repo: Path, gitea_remote: str, github_remote: str) -> dict[str, object]:
    """Build the full inventory comparing the Gitea and GitHub remotes.

    The status is ``"verified"`` only when branches and tags are identical on
    both sides and both sides have a ``main`` branch with the same SHA;
    otherwise it is ``"blocked"`` and ``blocking_reason`` lists every cause.

    Raises:
        RuntimeError: if any underlying git command fails.
    """
    gitea = refs_for_remote(repo, gitea_remote)
    github = refs_for_remote(repo, github_remote)
    gitea_url = remote_url(repo, gitea_remote)
    github_url = remote_url(repo, github_remote)
    head_diff = compare_maps(gitea.heads, github.heads)
    tag_diff = compare_maps(gitea.tags, github.tags)
    latest_sha_gitea = gitea.heads.get("main", "")
    latest_sha_github = github.heads.get("main", "")

    status = "verified"
    blocking_reasons: list[str] = []
    if head_diff["only_left"] or head_diff["only_right"] or head_diff["sha_mismatch"]:
        status = "blocked"
        blocking_reasons.append("branches 尚未完全對齊")
    if tag_diff["only_left"] or tag_diff["only_right"] or tag_diff["sha_mismatch"]:
        status = "blocked"
        blocking_reasons.append("tags 尚未完全對齊")
    if latest_sha_gitea and latest_sha_github and latest_sha_gitea != latest_sha_github:
        status = "blocked"
        blocking_reasons.append("main SHA 不一致")
    if not latest_sha_gitea or not latest_sha_github:
        status = "blocked"
        blocking_reasons.append("其中一端缺少 main")

    return {
        "repo_path": str(repo),
        "gitea_remote": gitea_remote,
        "github_remote": github_remote,
        "gitea_url_redacted": gitea_url,
        "github_url_redacted": github_url,
        "gitea_repo": repo_slug_from_url(gitea_url),
        "github_repo": repo_slug_from_url(github_url),
        "branch_count_gitea": len(gitea.heads),
        "branch_count_github": len(github.heads),
        "tag_count_gitea": len(gitea.tags),
        "tag_count_github": len(github.tags),
        "raw_tag_ref_count_gitea": gitea.raw_tag_ref_count,
        "raw_tag_ref_count_github": github.raw_tag_ref_count,
        "latest_sha_gitea": latest_sha_gitea,
        "latest_sha_github": latest_sha_github,
        "workflows_mapped": False,
        "webhooks_mapped": False,
        "secrets_inventory_only": True,
        "status": status,
        "blocking_reason": ";".join(blocking_reasons) if blocking_reasons else "",
        "heads": head_diff,
        "tags": tag_diff,
    }


def event_payload(inventory: dict[str, object], evidence_ref: str | None) -> dict[str, object]:
    """Project *inventory* onto the migration event schema.

    Only the summary fields are copied; the per-ref ``heads``/``tags`` diffs
    are left out. When *evidence_ref* is truthy it is attached as the single
    element of ``evidence_refs``.
    """
    payload = {
        "schema_version": "source_control_migration_event_v1",
        "gitea_repo": inventory["gitea_repo"],
        "github_repo": inventory["github_repo"],
        "branch_count_gitea": inventory["branch_count_gitea"],
        "branch_count_github": inventory["branch_count_github"],
        "tag_count_gitea": inventory["tag_count_gitea"],
        "tag_count_github": inventory["tag_count_github"],
        "latest_sha_gitea": inventory["latest_sha_gitea"],
        "latest_sha_github": inventory["latest_sha_github"],
        "workflows_mapped": inventory["workflows_mapped"],
        "webhooks_mapped": inventory["webhooks_mapped"],
        "secrets_inventory_only": inventory["secrets_inventory_only"],
        "status": inventory["status"],
        "blocking_reason": inventory["blocking_reason"],
    }
    if evidence_ref:
        payload["evidence_refs"] = [evidence_ref]
    return payload


def write_markdown(inventory: dict[str, object], path: Path) -> None:
    """Write a Markdown snapshot of *inventory* to *path* (UTF-8).

    The snapshot contains the summary table and the size of each diff bucket
    for branches and tags; individual ref names are not listed.
    """
    heads = inventory["heads"]
    tags = inventory["tags"]
    # Narrow the ``object`` values for type checkers.
    assert isinstance(heads, dict)
    assert isinstance(tags, dict)
    lines = [
        "# Source Control 遷移盤點快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
        f"| 狀態 | `{inventory['status']}` |",
        f"| Gitea remote | `{inventory['gitea_remote']}` |",
        f"| GitHub remote | `{inventory['github_remote']}` |",
        f"| Gitea repo | `{inventory['gitea_repo']}` |",
        f"| GitHub repo | `{inventory['github_repo']}` |",
        f"| Gitea URL | `{inventory['gitea_url_redacted']}` |",
        f"| GitHub URL | `{inventory['github_url_redacted']}` |",
        f"| Gitea 分支數 | `{inventory['branch_count_gitea']}` |",
        f"| GitHub 分支數 | `{inventory['branch_count_github']}` |",
        f"| Gitea tags | `{inventory['tag_count_gitea']}` |",
        f"| GitHub tags | `{inventory['tag_count_github']}` |",
        f"| Gitea main | `{inventory['latest_sha_gitea']}` |",
        f"| GitHub main | `{inventory['latest_sha_github']}` |",
        f"| 阻塞原因 | {inventory['blocking_reason'] or '無'} |",
        "",
        "## 分支差異",
        "",
        f"- 只在 Gitea:`{len(heads['only_left'])}`",
        f"- 只在 GitHub:`{len(heads['only_right'])}`",
        f"- SHA 不一致:`{len(heads['sha_mismatch'])}`",
        f"- SHA 一致:`{len(heads['matching'])}`",
        "",
        "## Tag 差異",
        "",
        f"- 只在 Gitea:`{len(tags['only_left'])}`",
        f"- 只在 GitHub:`{len(tags['only_right'])}`",
        f"- SHA 不一致:`{len(tags['sha_mismatch'])}`",
        f"- SHA 一致:`{len(tags['matching'])}`",
        "",
        "> 注意:本檔由 read-only inventory 工具產生,不包含 remote URL 內的帳密。",
        "",
    ]
    path.write_text("\n".join(lines), encoding="utf-8")


def main() -> int:
    """Command-line entry point.

    Returns 0 after the inventory has been emitted and 2 when a git command
    fails (the error message is printed to stderr).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo", default=".")
    parser.add_argument("--gitea-remote", default="gitea")
    parser.add_argument("--github-remote", default="origin")
    parser.add_argument("--output-json")
    parser.add_argument("--output-md")
    args = parser.parse_args()

    repo = Path(args.repo).resolve()
    try:
        inventory = build_inventory(repo, args.gitea_remote, args.github_remote)
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 2

    # Write the Markdown evidence first: the JSON payload references it, so it
    # must exist before the payload that points at it is emitted.
    if args.output_md:
        write_markdown(inventory, Path(args.output_md))

    payload = json.dumps(
        event_payload(inventory, args.output_md),
        ensure_ascii=False,
        indent=2,
    )
    if args.output_json:
        Path(args.output_json).write_text(payload + "\n", encoding="utf-8")
    else:
        print(payload)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())