Files
awoooi/scripts/security/gitea-repo-inventory.py
Your Name 9e15fd08b3
All checks were successful
CD Pipeline / tests (push) Successful in 1m39s
Code Review / ai-code-review (push) Successful in 15s
CD Pipeline / build-and-deploy (push) Successful in 5m19s
CD Pipeline / post-deploy-checks (push) Successful in 2m11s
feat(web): land iwooos security posture surfaces
2026-05-25 20:35:52 +08:00

256 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Read-only Gitea repo 全量盤點工具。
此工具可查詢 Gitea org/user API endpoint，或吃管理介面匯出的 JSON。
執行期間不寫入 Gitea，也不會把 API token 寫入輸出檔。
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import quote, urlencode, urlsplit, urlunsplit
def redact_url(value: str) -> str:
    """Return ``value`` with any embedded credentials removed.

    Two shapes are recognised:

    * URLs with a scheme (``https://user:secret@host/path``): the userinfo
      part of the netloc is dropped; scheme, host, port, path, query and
      fragment are kept as they are.
    * scp-style remotes without a scheme (``git@host:owner/repo.git``): the
      text up to and including the first ``@`` is dropped, but only when the
      remainder contains a ``:``.

    Any other value is returned unchanged.
    """
    if "://" not in value:
        _, at_sign, remainder = value.partition("@")
        if at_sign and ":" in remainder:
            return remainder
        return value
    parts = urlsplit(value)
    # Split on the LAST "@": a host never contains one, but an unescaped
    # password can, and splitting on the first "@" would leave the tail of
    # that password in the "redacted" output.
    host = parts.netloc.rsplit("@", 1)[-1]
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
def api_get_json(url: str, token: str | None, timeout: int) -> tuple[int, object]:
    """Issue one GET request and return ``(status, decoded_body)``.

    Args:
        url: Fully built request URL.
        token: Optional API token; when truthy it is sent as
            ``Authorization: token <token>``.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The response status together with the JSON-decoded body. When the
        body is not valid JSON (on success or on an HTTP error response) the
        payload is ``{"message": <stripped body text>}`` instead, so callers
        always receive a value they can inspect.

    Raises:
        urllib.error.URLError: for failures that produce no HTTP response
            (for example connection refused); these are not caught here.
    """
    headers = {
        "Accept": "application/json",
        "User-Agent": "awoooi-security-inventory/1.0",
    }
    if token:
        headers["Authorization"] = f"token {token}"
    request = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.status
            raw = response.read()
    except urllib.error.HTTPError as exc:
        # HTTPError doubles as a response object: keep its code and body.
        status = exc.code
        raw = exc.read()
    # Decode leniently and parse in one place so that success and error
    # responses degrade the same way on a non-JSON body (e.g. an HTML page
    # served by a proxy) instead of raising out of the success path.
    body = raw.decode("utf-8", errors="replace")
    try:
        payload: object = json.loads(body)
    except json.JSONDecodeError:
        payload = {"message": body.strip()}
    return status, payload
def repo_summary(repo: dict[str, object], github_owner: str | None) -> dict[str, object]:
    """Reduce one raw repo record to the fields kept in the inventory.

    Args:
        repo: A single repo mapping as found in the API/export payload.
        github_owner: Optional owner used to build ``github_repo_candidate``
            as ``<github_owner>/<name>``; the candidate is ``""`` when the
            owner or the repo name is missing.

    Returns:
        A mapping with the repo identity, the ``private``/``empty``/
        ``archived`` flags, the default branch, and the clone/ssh URLs after
        they have been passed through ``redact_url``.
    """
    owner_field = repo.get("owner")
    # Only a mapping-shaped "owner" can supply a login.
    owner_name = owner_field.get("login") if isinstance(owner_field, dict) else None

    raw_full_name = str(repo.get("full_name") or "")
    # Prefer the explicit name; otherwise take the last segment of full_name.
    short_name = str(repo.get("name") or raw_full_name.rsplit("/", 1)[-1] or "")

    if raw_full_name:
        full_name = raw_full_name
    elif owner_name:
        full_name = f"{owner_name}/{short_name}"
    else:
        full_name = short_name

    http_remote = str(repo.get("clone_url") or repo.get("html_url") or "")
    ssh_remote = str(repo.get("ssh_url") or "")

    candidate = ""
    if github_owner and short_name:
        candidate = f"{github_owner}/{short_name}"

    return {
        "gitea_repo": full_name,
        "name": short_name,
        "owner": owner_name or "",
        "private": bool(repo.get("private", False)),
        "empty": bool(repo.get("empty", False)),
        "archived": bool(repo.get("archived", False)),
        "default_branch": str(repo.get("default_branch") or ""),
        "clone_url_redacted": redact_url(http_remote),
        "ssh_url_redacted": redact_url(ssh_remote),
        "github_repo_candidate": candidate,
    }
def load_export(path: Path) -> object:
    """Load a JSON export from ``path`` and return the decoded value.

    The file is decoded with ``utf-8-sig`` so that an export saved with a
    leading UTF-8 byte-order mark parses the same as one without it; plain
    ``utf-8`` would hand the BOM to ``json.loads``, which rejects it.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    return json.loads(path.read_text(encoding="utf-8-sig"))
def extract_repos(payload: object) -> list[dict[str, object]]:
    """Pull the list of repo mappings out of a decoded payload.

    A list payload is used directly. For a mapping payload the keys
    ``data``, ``repos`` and ``repositories`` are checked in that order and
    the first one holding a list is used. Non-mapping entries of the chosen
    list are dropped. Every other payload shape yields an empty list.
    """
    if isinstance(payload, dict):
        candidates = (payload.get(key) for key in ("data", "repos", "repositories"))
        rows = next((value for value in candidates if isinstance(value, list)), [])
    elif isinstance(payload, list):
        rows = payload
    else:
        rows = []
    return [row for row in rows if isinstance(row, dict)]
def build_inventory(
    *,
    base_url: str,
    org: str,
    github_owner: str | None,
    token_present: bool,
    http_status: int | None,
    payload: object,
    query_mode: str,
    query: str,
) -> dict[str, object]:
    """Assemble the inventory document from a decoded payload.

    The result records how the data was obtained (``query_mode``, ``query``,
    ``visibility_scope``, ``token_present``, ``http_status``), an overall
    ``status`` and the summarised repos.

    ``status`` is ``"ok"`` when repos were found through an export or an
    authenticated query, ``"partial"`` when repos were found without a token,
    and ``"blocked"`` when no repos were found. ``blocking_reason`` is a
    human-readable explanation for the latter two and ``""`` for ``"ok"``.
    """
    summaries = [repo_summary(item, github_owner) for item in extract_repos(payload)]

    if query_mode == "export":
        scope = "admin_export"
    else:
        scope = "authenticated" if token_present else "public_only"

    if not summaries:
        status = "blocked"
    elif scope == "public_only":
        status = "partial"
    else:
        status = "ok"

    if status == "ok":
        reason = ""
    elif status == "partial":
        reason = "未提供 token結果只代表公開可見 repoprivate/internal repos 仍需只讀 token 或管理匯出"
    elif http_status in (401, 403):
        reason = "Gitea API 需要只讀 token 或權限不足"
    elif query_mode == "search" and http_status == 200:
        reason = "Gitea public repo search 未回傳 repo可能沒有公開 repo 或需要只讀 token"
    elif http_status == 404:
        reason = "Gitea API 查無 org/user repos需確認 org 名稱或使用管理匯出"
    elif http_status is None:
        reason = "匯入檔案沒有可解析的 repo list"
    else:
        reason = f"Gitea API 回應無 repo listHTTP {http_status}"

    return {
        "schema_version": "gitea_repo_inventory_v1",
        "base_url": redact_url(base_url.rstrip("/")),
        "org": org,
        "github_owner": github_owner or "",
        "query_mode": query_mode,
        "query": query,
        "visibility_scope": scope,
        "token_present": token_present,
        "http_status": http_status,
        "status": status,
        "blocking_reason": reason,
        "repo_count": len(summaries),
        "repos": summaries,
    }
def write_markdown(inventory: dict[str, object], path: Path) -> None:
    """Render ``inventory`` as a Markdown snapshot and write it to ``path``.

    The document has a two-column summary table followed by one table row
    per repo mapping found under ``inventory["repos"]``; entries that are not
    mappings are skipped. The file is written as UTF-8.

    Raises:
        KeyError: if one of the mandatory summary keys is missing.
    """
    # Summary values that are rendered inside backticks, in display order.
    quoted_rows = [
        ("狀態", inventory["status"]),
        ("Gitea base URL", inventory["base_url"]),
        ("Org/User", inventory["org"]),
        ("GitHub owner 候選", inventory["github_owner"]),
        ("查詢模式", inventory.get("query_mode", "")),
        ("查詢字串", inventory.get("query", "")),
        ("可見性範圍", inventory.get("visibility_scope", "")),
        ("是否提供 token", inventory["token_present"]),
        ("HTTP status", inventory["http_status"]),
        ("Repo 數量", inventory["repo_count"]),
    ]

    doc = [
        "# Gitea Repo 全量盤點快照",
        "",
        "| 項目 | 值 |",
        "|------|----|",
    ]
    doc += [f"| {label} | `{value}` |" for label, value in quoted_rows]
    # The blocking reason is free text and is emitted without backticks.
    doc.append(f"| 阻塞原因 | {inventory['blocking_reason'] or ''} |")
    doc += [
        "",
        "## Repo 清單",
        "",
        "| Gitea repo | GitHub 候選 | default branch | private | archived |",
        "|------------|------------------|----------------|---------|----------|",
    ]

    repo_columns = (
        "gitea_repo",
        "github_repo_candidate",
        "default_branch",
        "private",
        "archived",
    )
    entries = inventory.get("repos")
    if isinstance(entries, list):
        for entry in entries:
            if isinstance(entry, dict):
                cells = " | ".join(f"`{entry.get(column, '')}`" for column in repo_columns)
                doc.append(f"| {cells} |")

    doc += [
        "",
        "> 注意:本檔由 read-only Gitea inventory 工具產生,不包含 API token 或 remote URL 帳密。",
        "",
    ]
    path.write_text("\n".join(doc), encoding="utf-8")
def _fetch_all_pages(
    endpoint: str,
    params: dict[str, str],
    token: str | None,
    timeout: int,
    max_pages: int = 100,
) -> tuple[int, object]:
    """Walk a paginated repo listing and return ``(status, payload)``.

    Pages are requested with an increasing ``page`` query parameter until a
    page contributes no new repos, a non-200 status is returned, or
    ``max_pages`` is reached (a guard against a server that never returns an
    empty page).

    When the first page fails or holds no repos, its status and payload are
    returned untouched so the caller can classify the failure. Otherwise the
    payload is the combined list of repo mappings and the status is that of
    the last request made.
    """
    collected: list[dict[str, object]] = []
    seen: set[str] = set()
    status: int = 0
    for page in range(1, max_pages + 1):
        url = f"{endpoint}?{urlencode({**params, 'page': str(page)})}"
        status, payload = api_get_json(url, token, timeout)
        rows = extract_repos(payload)
        if page == 1 and (status != 200 or not rows):
            return status, payload
        if status != 200:
            print(
                f"Gitea API 第 {page} 頁回應 HTTP {status}，盤點結果可能不完整",
                file=sys.stderr,
            )
            break
        fresh = []
        for row in rows:
            ident = (row.get("id"), row.get("full_name"))
            if ident != (None, None):
                # De-duplicate so a server that ignores ``page`` and keeps
                # returning the same rows ends the loop instead of inflating
                # the inventory.
                marker = repr(ident)
                if marker in seen:
                    continue
                seen.add(marker)
            fresh.append(row)
        if not fresh:
            break
        collected.extend(fresh)
    return status, collected


def main() -> int:
    """Command-line entry point.

    Reads repos either from ``--input-json`` or from the Gitea API (org,
    user or search scope), builds the inventory, and writes it to
    ``--output-json`` and ``--output-md``. The token is read from the
    environment variable named by ``--token-env``.

    ``--limit`` is used as the page size for every API scope and all pages
    are fetched, so the snapshot is not cut off at the first page.

    Returns:
        ``2`` when the inventory status is ``"blocked"`` (the reason is also
        printed to stderr), otherwise ``0``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default="http://192.168.0.110:3001")
    parser.add_argument("--org", default="wooo")
    parser.add_argument("--scope", choices=["org", "user", "search"], default="org")
    parser.add_argument("--query", default="")
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--github-owner", default="")
    parser.add_argument("--token-env", default="GITEA_READONLY_TOKEN")
    parser.add_argument("--input-json")
    parser.add_argument("--output-json", required=True)
    parser.add_argument("--output-md", required=True)
    parser.add_argument("--timeout", type=int, default=5)
    args = parser.parse_args()

    token = os.environ.get(args.token_env)
    http_status: int | None = None
    if args.input_json:
        payload = load_export(Path(args.input_json))
        query_mode = "export"
        query = "input-json"
    else:
        api_root = f"{args.base_url.rstrip('/')}/api/v1"
        params = {"limit": str(args.limit)}
        if args.scope == "search":
            if args.query:
                params["q"] = args.query
            endpoint = f"{api_root}/repos/search"
        else:
            quoted = quote(args.org, safe="")
            prefix = "orgs" if args.scope == "org" else "users"
            endpoint = f"{api_root}/{prefix}/{quoted}/repos"
        http_status, payload = _fetch_all_pages(endpoint, params, token, args.timeout)
        query_mode = args.scope
        query = args.query

    inventory = build_inventory(
        base_url=args.base_url,
        org=args.org,
        github_owner=args.github_owner or None,
        token_present=bool(token),
        http_status=http_status,
        payload=payload,
        query_mode=query_mode,
        query=query,
    )
    Path(args.output_json).write_text(
        json.dumps(inventory, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    write_markdown(inventory, Path(args.output_md))
    if inventory["status"] == "blocked":
        print(inventory["blocking_reason"], file=sys.stderr)
        return 2
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())