Files
awoooi/scripts/security/source-control-migration-inventory.py
Your Name 9e15fd08b3
All checks were successful
CD Pipeline / tests (push) Successful in 1m39s
Code Review / ai-code-review (push) Successful in 15s
CD Pipeline / build-and-deploy (push) Successful in 5m19s
CD Pipeline / post-deploy-checks (push) Successful in 2m11s
feat(web): land iwooos security posture surfaces
2026-05-25 20:35:52 +08:00

289 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
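#!/usr/bin/env python3
"""Read-only inventory tool for a Gitea/GitHub migration.

This tool only uses `git ls-remote` to compare the refs on both sides; it never
pushes, never fetches, and never writes to either remote.
Credentials embedded in remote URLs are redacted before any evidence is written.
"""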
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
# Matches an object id made of 7 to 40 hexadecimal digits (either case);
# parse_ls_remote() skips any ls-remote line whose first field fails this check.
# NOTE(review): a 64-digit SHA-256 object id would not match — confirm whether
# SHA-256 repositories need to be supported.
SHA_RE = re.compile(r"^[0-9a-fA-F]{7,40}$")
@dataclass(frozen=True)
class GitRefs:
    """Immutable snapshot of the branch and tag refs listed for one remote."""

    # Branch name (``refs/heads/`` prefix removed) -> object id.
    heads: dict[str, str]
    # Tag name (``refs/tags/`` prefix and any ``^{}`` suffix removed) -> object id.
    tags: dict[str, str]
    # Number of entries parsed under ``refs/tags/`` before ``^{}`` entries
    # were folded into their tag names.
    raw_tag_ref_count: int
def run_git(repo: Path, args: list[str]) -> subprocess.CompletedProcess[str]:
    """Run ``git`` with *args* inside *repo* and return the finished process.

    A non-zero exit status does not raise here (``check=False``); the caller
    inspects ``returncode``. stdout and stderr are captured as text.
    """
    command = ["git", *args]
    completed = subprocess.run(
        command,
        cwd=repo,
        capture_output=True,
        text=True,
        check=False,
    )
    return completed
def require_git(repo: Path, args: list[str]) -> str:
    """Run a git command in *repo* and return its stdout.

    Raises:
        RuntimeError: if git exits with a non-zero status. The message carries
            git's stderr, or its stdout when stderr is blank.
    """
    completed = run_git(repo, args)
    if completed.returncode == 0:
        return completed.stdout
    detail = completed.stderr.strip() or completed.stdout.strip()
    raise RuntimeError(f"git {' '.join(args)} failed: {detail}")
def redact_url(url: str) -> str:
    """Remove userinfo (``user[:password]@``) from a remote URL.

    Two shapes are handled:

    * ``scheme://[userinfo@]host/path`` - the URL is split, everything in the
      network location up to and including the last ``@`` is dropped, and the
      URL is reassembled.
    * scp-like ``user@host:path`` - everything up to and including the first
      ``@`` is dropped.

    Any other string is returned unchanged.
    """
    if "://" in url:
        parts = urlsplit(url)
        # The host starts after the LAST "@". Splitting on the first one would
        # leave the tail of a password that itself contains an unescaped "@"
        # (e.g. "user:p@ss@host" -> "ss@host") in the stored evidence.
        host = parts.netloc.rpartition("@")[2]
        return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
    # scp-like syntax: user@host:path
    _user, at_sign, remainder = url.partition("@")
    if at_sign and ":" in remainder:
        return remainder
    return url
def remote_url(repo: Path, remote: str) -> str:
    """Return the URL configured for *remote*, with any userinfo removed.

    The URL is read with ``git remote get-url``; a failing git call surfaces
    as the ``RuntimeError`` raised by ``require_git``.
    """
    configured = require_git(repo, ["remote", "get-url", remote])
    return redact_url(configured.strip())
def repo_slug_from_url(url: str) -> str:
    """Derive a repository slug (for example ``owner/name``) from a remote URL.

    The URL is redacted first. For ``scheme://`` URLs the slug comes from the
    URL path; for strings containing ``:`` it is the text after the first
    colon; otherwise the whole string is used. Surrounding slashes and one
    trailing ``.git`` are removed. If nothing remains, the redacted URL
    (minus one trailing slash) is returned instead.
    """
    cleaned = redact_url(url).removesuffix("/")
    if "://" in cleaned:
        raw_path = urlsplit(cleaned).path
    else:
        _host, colon, tail = cleaned.partition(":")
        raw_path = tail if colon else cleaned
    slug = raw_path.strip("/").removesuffix(".git")
    return slug if slug else cleaned
def parse_ls_remote(output: str, prefix: str) -> dict[str, str]:
    """Parse ``git ls-remote`` output into a ``{ref name: object id}`` mapping.

    Only lines of the form ``<id> <ref>`` are kept, where ``<id>`` matches
    ``SHA_RE`` and ``<ref>`` starts with *prefix*; the prefix is removed from
    the resulting key. Blank or malformed lines are skipped silently. When a
    name occurs more than once, the last occurrence wins.
    """
    parsed: dict[str, str] = {}
    for raw_line in output.splitlines():
        fields = raw_line.split(None, 1)
        if len(fields) != 2:
            # Blank line or a line without a ref column.
            continue
        object_id, ref_name = fields
        if SHA_RE.match(object_id) and ref_name.startswith(prefix):
            parsed[ref_name.removeprefix(prefix)] = object_id
    return parsed
def refs_for_remote(repo: Path, remote: str) -> GitRefs:
    """List the branches and tags of *remote* using ``git ls-remote``.

    Tag entries whose name ends in ``^{}`` (the peeled form ls-remote prints
    for a tag) are folded into the plain tag name, and their object id
    replaces the id recorded for the plain entry. ``raw_tag_ref_count`` counts
    the tag entries before that folding.

    Raises:
        RuntimeError: propagated from ``require_git`` when git fails.
    """
    heads_listing = require_git(repo, ["ls-remote", "--heads", remote])
    tags_listing = require_git(repo, ["ls-remote", "--tags", remote])
    raw_tags = parse_ls_remote(tags_listing, "refs/tags/")

    peel_marker = "^{}"
    direct: dict[str, str] = {}
    peeled: dict[str, str] = {}
    for tag_name, object_id in raw_tags.items():
        if tag_name.endswith(peel_marker):
            peeled[tag_name.removesuffix(peel_marker)] = object_id
        else:
            direct[tag_name] = object_id

    # Peeled ids take precedence over the ids of the plain tag entries.
    resolved_tags = {**direct, **peeled}
    return GitRefs(
        heads=parse_ls_remote(heads_listing, "refs/heads/"),
        tags=resolved_tags,
        raw_tag_ref_count=len(raw_tags),
    )
def compare_maps(left: dict[str, str], right: dict[str, str]) -> dict[str, object]:
    """Diff two ``{name: sha}`` mappings.

    Returns a dict with four entries, every list sorted by name:

    * ``only_left`` / ``only_right`` - names present on just one side.
    * ``sha_mismatch`` - ``{"name", "left_sha", "right_sha"}`` records for
      names present on both sides with different values.
    * ``matching`` - names present on both sides with equal values.
    """
    mismatched: list[dict[str, str]] = []
    identical: list[str] = []
    for name in sorted(left.keys() & right.keys()):
        left_sha = left[name]
        right_sha = right[name]
        if left_sha == right_sha:
            identical.append(name)
        else:
            mismatched.append(
                {"name": name, "left_sha": left_sha, "right_sha": right_sha}
            )
    return {
        "only_left": sorted(left.keys() - right.keys()),
        "only_right": sorted(right.keys() - left.keys()),
        "sha_mismatch": mismatched,
        "matching": identical,
    }
def build_inventory(repo: Path, gitea_remote: str, github_remote: str) -> dict[str, object]:
    """Compare the refs of two remotes and summarise the migration state.

    Args:
        repo: Local repository in which the git commands are run.
        gitea_remote: Name of the remote treated as the Gitea side.
        github_remote: Name of the remote treated as the GitHub side.

    Returns:
        A dict holding the redacted URLs, repo slugs, branch/tag counts, the
        ``main`` SHA of each side, the full branch and tag diffs (``heads`` /
        ``tags``), a ``status`` of ``"verified"`` or ``"blocked"``, and
        ``blocking_reason`` - every reason found, joined with ``"; "``
        (empty when verified).

    Raises:
        RuntimeError: propagated from the git helpers when a git call fails.
    """
    gitea = refs_for_remote(repo, gitea_remote)
    github = refs_for_remote(repo, github_remote)
    gitea_url = remote_url(repo, gitea_remote)
    github_url = remote_url(repo, github_remote)
    head_diff = compare_maps(gitea.heads, github.heads)
    tag_diff = compare_maps(gitea.tags, github.tags)
    # The branch compared as "latest" is hard-coded to ``main``; a side
    # without that branch yields an empty string.
    latest_sha_gitea = gitea.heads.get("main", "")
    latest_sha_github = github.heads.get("main", "")

    blocking_reasons: list[str] = []
    if head_diff["only_left"] or head_diff["only_right"] or head_diff["sha_mismatch"]:
        blocking_reasons.append("branches 尚未完全對齊")
    if tag_diff["only_left"] or tag_diff["only_right"] or tag_diff["sha_mismatch"]:
        blocking_reasons.append("tags 尚未完全對齊")
    if latest_sha_gitea and latest_sha_github and latest_sha_gitea != latest_sha_github:
        blocking_reasons.append("main SHA 不一致")
    if not latest_sha_gitea or not latest_sha_github:
        blocking_reasons.append("其中一端缺少 main")
    # Any recorded reason blocks the migration.
    status = "blocked" if blocking_reasons else "verified"

    return {
        "repo_path": str(repo),
        "gitea_remote": gitea_remote,
        "github_remote": github_remote,
        "gitea_url_redacted": gitea_url,
        "github_url_redacted": github_url,
        "gitea_repo": repo_slug_from_url(gitea_url),
        "github_repo": repo_slug_from_url(github_url),
        "branch_count_gitea": len(gitea.heads),
        "branch_count_github": len(github.heads),
        "tag_count_gitea": len(gitea.tags),
        "tag_count_github": len(github.tags),
        "raw_tag_ref_count_gitea": gitea.raw_tag_ref_count,
        "raw_tag_ref_count_github": github.raw_tag_ref_count,
        "latest_sha_gitea": latest_sha_gitea,
        "latest_sha_github": latest_sha_github,
        "workflows_mapped": False,
        "webhooks_mapped": False,
        "secrets_inventory_only": True,
        "status": status,
        # Joined with a visible separator so that several reasons do not run
        # together into one unreadable string.
        "blocking_reason": "; ".join(blocking_reasons),
        "heads": head_diff,
        "tags": tag_diff,
    }
def event_payload(inventory: dict[str, object], evidence_ref: str | None) -> dict[str, object]:
    """Build the ``source_control_migration_event_v1`` payload from an inventory.

    Copies the summary fields of *inventory* (the detailed ``heads`` / ``tags``
    diffs and the URLs are not included) and, when *evidence_ref* is non-empty,
    adds it as the single item of ``evidence_refs``.

    Raises:
        KeyError: if *inventory* lacks one of the copied fields.
    """
    copied_fields = (
        "gitea_repo",
        "github_repo",
        "branch_count_gitea",
        "branch_count_github",
        "tag_count_gitea",
        "tag_count_github",
        "latest_sha_gitea",
        "latest_sha_github",
        "workflows_mapped",
        "webhooks_mapped",
        "secrets_inventory_only",
        "status",
        "blocking_reason",
    )
    payload: dict[str, object] = {
        "schema_version": "source_control_migration_event_v1",
    }
    for field_name in copied_fields:
        payload[field_name] = inventory[field_name]
    if evidence_ref:
        payload["evidence_refs"] = [evidence_ref]
    return payload
def write_markdown(inventory: dict[str, object], path: Path) -> None:
    """Write a Markdown snapshot of *inventory* to *path* (UTF-8).

    The file holds a summary table followed by per-category counts for the
    branch diff (``inventory["heads"]``) and the tag diff
    (``inventory["tags"]``). An existing file at *path* is overwritten.
    """
    heads = inventory["heads"]
    tags = inventory["tags"]
    # Narrow the ``object`` values to dicts before indexing into them.
    assert isinstance(heads, dict)
    assert isinstance(tags, dict)
    lines = [
        "# Source Control 遷移盤點快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
        f"| 狀態 | `{inventory['status']}` |",
        f"| Gitea remote | `{inventory['gitea_remote']}` |",
        f"| GitHub remote | `{inventory['github_remote']}` |",
        f"| Gitea repo | `{inventory['gitea_repo']}` |",
        f"| GitHub repo | `{inventory['github_repo']}` |",
        f"| Gitea URL | `{inventory['gitea_url_redacted']}` |",
        f"| GitHub URL | `{inventory['github_url_redacted']}` |",
        f"| Gitea 分支數 | `{inventory['branch_count_gitea']}` |",
        f"| GitHub 分支數 | `{inventory['branch_count_github']}` |",
        f"| Gitea tags | `{inventory['tag_count_gitea']}` |",
        f"| GitHub tags | `{inventory['tag_count_github']}` |",
        f"| Gitea main | `{inventory['latest_sha_gitea']}` |",
        f"| GitHub main | `{inventory['latest_sha_github']}` |",
        f"| 阻塞原因 | {inventory['blocking_reason'] or ''} |",
        "",
        "## 分支差異",
        "",
        # Every bullet uses the same "label：`count`" shape; the two
        # "只在 ..." labels previously had no separator before the count.
        f"- 只在 Gitea：`{len(heads['only_left'])}`",
        f"- 只在 GitHub：`{len(heads['only_right'])}`",
        f"- SHA 不一致：`{len(heads['sha_mismatch'])}`",
        f"- SHA 一致：`{len(heads['matching'])}`",
        "",
        "## Tag 差異",
        "",
        f"- 只在 Gitea：`{len(tags['only_left'])}`",
        f"- 只在 GitHub：`{len(tags['only_right'])}`",
        f"- SHA 不一致：`{len(tags['sha_mismatch'])}`",
        f"- SHA 一致：`{len(tags['matching'])}`",
        "",
        "> 注意：本檔由 read-only inventory 工具產生，不包含 remote URL 內的帳密。",
        "",
    ]
    path.write_text("\n".join(lines), encoding="utf-8")
def main() -> int:
    """Command-line entry point.

    Builds the inventory for the selected remotes, emits the event payload as
    JSON (to ``--output-json`` when given, otherwise to stdout) and, when
    ``--output-md`` is given, writes the Markdown snapshot there; that path is
    also recorded in the payload as its evidence reference.

    Returns:
        ``0`` on success, ``2`` when building the inventory raised
        ``RuntimeError`` (the message is printed to stderr).
    """
    parser = argparse.ArgumentParser()
    defaulted_flags = (
        ("--repo", "."),
        ("--gitea-remote", "gitea"),
        ("--github-remote", "origin"),
    )
    for flag, default in defaulted_flags:
        parser.add_argument(flag, default=default)
    for flag in ("--output-json", "--output-md"):
        parser.add_argument(flag)
    options = parser.parse_args()

    repo_root = Path(options.repo).resolve()
    try:
        inventory = build_inventory(
            repo_root, options.gitea_remote, options.github_remote
        )
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 2

    event = event_payload(inventory, options.output_md)
    serialized = json.dumps(event, ensure_ascii=False, indent=2)
    if options.output_json:
        Path(options.output_json).write_text(serialized + "\n", encoding="utf-8")
    else:
        print(serialized)

    if options.output_md:
        write_markdown(inventory, Path(options.output_md))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())