Files
awoooi/ops/runner/test_read_public_gitea_actions_queue.py
Your Name 5e9849ea32
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 3m1s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
fix(runner): mark non110 cd closure verified
2026-06-29 11:38:28 +08:00

187 lines
6.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import importlib.util
import json
import subprocess
import sys
from pathlib import Path
# Third ancestor (parents[2]) of this file's resolved path.
# NOTE(review): with this test living in ops/runner/, that should be the
# repository root — confirm if the file is ever moved.
ROOT = Path(__file__).resolve().parents[2]
# Script under test. Its filename contains hyphens, so it is not a valid module
# name for a plain ``import``; the tests load it by path or run it as a
# subprocess instead.
SCRIPT = ROOT / "ops/runner/read-public-gitea-actions-queue.py"
def _load_module():
    """Import the script at ``SCRIPT`` by file path and return the module.

    The module is registered in ``sys.modules`` under the name
    ``read_public_gitea_actions_queue`` before its code runs, so that anything
    executed during import can look the module up by name.

    Returns:
        The freshly executed module object.

    Raises:
        AssertionError: if no import spec (or no loader) can be built for
            ``SCRIPT``.
        Exception: whatever the script raises while executing; in that case
            the partially initialised module is removed from ``sys.modules``
            before the exception propagates.
    """
    spec = importlib.util.spec_from_file_location(
        "read_public_gitea_actions_queue",
        SCRIPT,
    )
    assert spec and spec.loader, f"cannot build an import spec for {SCRIPT}"
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later tests.
        sys.modules.pop(spec.name, None)
        raise
    return module
def _actions_html() -> str:
return """
<span data-tooltip-content="No matching online runner with label: awoooi-non110-ubuntu">
<span><b>ai-technology-watch.yaml #3857</b>:</span>Scheduled</div>
<span data-tooltip-content="Canceled">
<span><b>ai-technology-watch.yaml #3856</b>:</span>Scheduled</div>
<span data-tooltip-content="Waiting">
<span><b>e2e-health.yaml #3855</b>:</span>Scheduled</div>
"""
def _actions_html_full_run_items() -> str:
return """
<div class="flex-list run-list">
<div class="flex-item tw-items-center">
<div class="flex-item-leading">
<span data-tooltip-content="Running"></span>
</div>
<div class="flex-item-main">
<a class="flex-item-title" title="chore(deploy): retry non110 cd after ci image seed" href="/wooo/awoooi/actions/runs/3863">
chore(deploy): retry non110 cd after ci image seed
</a>
<div class="flex-item-body">
<span><b>cd.yaml #3863</b>:</span>Commit
<a href="/wooo/awoooi/commit/a70c6756d9e76c33143676eef82bab7a49ac1839">a70c6756d9</a>
</div>
</div>
</div>
<div class="flex-item tw-items-center">
<div class="flex-item-leading">
<span data-tooltip-content="No matching online runner with label: awoooi-non110-host"></span>
</div>
<div class="flex-item-main">
<a class="flex-item-title" title="fix(ci): restore non110 cd push trigger" href="/wooo/awoooi/actions/runs/3859">
fix(ci): restore non110 cd push trigger
</a>
<div class="flex-item-body">
<span><b>cd.yaml #3859</b>:</span>Commit
<a href="/wooo/awoooi/commit/f3634db18ec3e37381dd634f5f5b80d73f8a8e38">f3634db18e</a>
</div>
</div>
</div>
</div>
"""
def test_parse_visible_runs_extracts_no_matching_runner_label() -> None:
    """Check ``parse_visible_runs`` on the trimmed scheduled-runs fixture.

    The first parsed run must be #3857 of ``ai-technology-watch.yaml`` with
    kind ``Scheduled`` and the runner label taken from its "No matching online
    runner" tooltip; the second run (tooltip ``Canceled``) must report an
    empty label.
    """
    module = _load_module()
    runs = module.parse_visible_runs(_actions_html())
    # Fail with the parsed rows in the message rather than an IndexError below.
    assert len(runs) >= 2, runs
    newest, previous = runs[0], runs[1]
    assert newest["run_id"] == "3857"
    assert newest["workflow"] == "ai-technology-watch.yaml"
    assert newest["kind"] == "Scheduled"
    assert newest["no_matching_runner_label"] == "awoooi-non110-ubuntu"
    assert previous["no_matching_runner_label"] == ""
def test_parse_visible_runs_keeps_full_gitea_rows_aligned() -> None:
    """Check that fields from full run rows stay attached to the right run.

    With the two-row fixture, the first parsed run must carry the id,
    workflow, kind, status, title and commit SHA of row #3863, and the second
    must carry the id of row #3859 together with that row's missing-runner
    label.
    """
    module = _load_module()
    runs = module.parse_visible_runs(_actions_html_full_run_items())
    # Fail with the parsed rows in the message rather than an IndexError below.
    assert len(runs) >= 2, runs
    running_row, blocked_row = runs[0], runs[1]
    assert running_row["run_id"] == "3863"
    assert running_row["workflow"] == "cd.yaml"
    assert running_row["kind"] == "Commit"
    assert running_row["status"] == "Running"
    assert running_row["title"] == "chore(deploy): retry non110 cd after ci image seed"
    assert running_row["commit_sha"] == "a70c6756d9e76c33143676eef82bab7a49ac1839"
    assert blocked_row["run_id"] == "3859"
    assert blocked_row["no_matching_runner_label"] == "awoooi-non110-host"
def test_build_readback_reports_latest_visible_cd_run() -> None:
    """Check that ``build_readback`` surfaces the newest visible ``cd.yaml`` run.

    Inputs: the two-row fixture page, a 401 "token is required" actions-list
    response, and a 200 jobs response with no jobs. The readback section must
    describe run #3863 (status, kind, commit SHA) and the rollups must flag
    the current main CD run as visible and ``Running``.
    """
    module = _load_module()
    payload = module.build_readback(
        actions_html=_actions_html_full_run_items(),
        actions_list_http_status=401,
        actions_list_payload={"message": "token is required"},
        cd_jobs_http_status=200,
        cd_jobs_payload={"jobs": [], "total_count": 0},
    )
    readback = payload["readback"]
    rollups = payload["rollups"]
    assert readback["latest_visible_cd_run_id"] == "3863"
    assert readback["latest_visible_cd_run_status"] == "Running"
    assert readback["latest_visible_cd_run_kind"] == "Commit"
    assert readback["latest_visible_cd_run_commit_sha"] == (
        "a70c6756d9e76c33143676eef82bab7a49ac1839"
    )
    assert rollups["current_main_cd_run_visible"] is True
    assert rollups["current_main_cd_run_status"] == "Running"
def test_build_readback_sanitizes_actions_api_internal_url() -> None:
    """Check that ``build_readback`` does not echo an internal URL.

    The actions-list payload passed in contains a ``url`` pointing at a
    ``192.168.0.x`` address. The test serialises the whole result and asserts
    that no ``192.168.0.`` fragment appears, and also checks the schema
    version, the blocked status, the token-required message/rollup, the
    latest run lacking a matching runner, and the ``github_api_used``
    boundary flag.
    """
    module = _load_module()
    payload = module.build_readback(
        actions_html=_actions_html(),
        actions_list_http_status=401,
        actions_list_payload={
            "message": "token is required",
            "url": "http://192.168.0.110:3001/api/swagger",
        },
        cd_jobs_http_status=200,
        cd_jobs_payload={"jobs": [], "total_count": 0},
    )
    # Serialise everything so the leak check covers every nested field.
    serialized = json.dumps(payload, sort_keys=True)
    readback = payload["readback"]
    assert payload["schema_version"] == module.SCHEMA_VERSION
    assert payload["status"] == "blocked_no_matching_online_runner"
    assert readback["actions_list_without_token_message"] == (
        "token is required"
    )
    assert readback["latest_visible_no_matching_runner_run_id"] == "3857"
    assert readback["latest_visible_no_matching_runner_label"] == (
        "awoooi-non110-ubuntu"
    )
    assert payload["rollups"]["actions_list_requires_token"] is True
    assert payload["operation_boundaries"]["github_api_used"] is False
    assert "192.168.0." not in serialized
def test_cli_json_uses_fixture_files_without_network(tmp_path: Path) -> None:
    """Run the script as a subprocess on fixture files and check its JSON output.

    Every input (actions page HTML, actions-list JSON, CD run jobs JSON) is
    written under ``tmp_path`` and handed to the script through its
    ``--*-file`` options together with the matching HTTP status options and
    ``--json``.

    Args:
        tmp_path: pytest-provided temporary directory for the fixture files.

    Asserts that the process exits 0, that stdout parses as JSON with three
    visible runs, zero CD jobs and the expected missing-runner label, and that
    the ``192.168.0.`` address from the fixture does not appear in stdout.
    """
    html_path = tmp_path / "actions.html"
    list_path = tmp_path / "actions-list.json"
    jobs_path = tmp_path / "jobs.json"
    html_path.write_text(_actions_html(), encoding="utf-8")
    list_path.write_text(
        json.dumps(
            {
                "message": "token is required",
                "url": "http://192.168.0.110:3001/api/swagger",
            }
        ),
        encoding="utf-8",
    )
    jobs_path.write_text(json.dumps({"jobs": [], "total_count": 0}), encoding="utf-8")
    command = [
        sys.executable,
        str(SCRIPT),
        "--actions-html-file",
        str(html_path),
        "--actions-list-json-file",
        str(list_path),
        "--actions-list-http-status",
        "401",
        "--cd-run-jobs-json-file",
        str(jobs_path),
        "--cd-run-jobs-http-status",
        "200",
        "--json",
    ]
    result = subprocess.run(
        command,
        check=False,  # the return code is asserted below with full output
        text=True,
        capture_output=True,
        # Fail instead of hanging the test run if the script ever blocks.
        timeout=120,
    )
    assert result.returncode == 0, result.stdout + result.stderr
    payload = json.loads(result.stdout)
    readback = payload["readback"]
    assert readback["actions_page_visible_run_count"] == 3
    assert readback["cd_run_jobs_total_count"] == 0
    assert readback["latest_visible_no_matching_runner_label"] == (
        "awoooi-non110-ubuntu"
    )
    assert "192.168.0." not in result.stdout