fix(api): surface stockplatform live runtime drift
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 1m44s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-30 09:50:25 +08:00
parent ce764f6453
commit b4a8dfbb03
8 changed files with 712 additions and 3 deletions

View File

@@ -334,6 +334,8 @@ jobs:
;;
apps/api/src/services/reboot_auto_recovery_drill_preflight.py)
;;
apps/api/src/services/stockplatform_public_api_runtime_readback.py)
;;
apps/api/src/services/iwooos_security_operating_system.py)
;;
apps/api/Dockerfile)
@@ -436,6 +438,8 @@ jobs:
;;
apps/api/tests/test_reboot_auto_recovery_slo_scorecard_api.py)
;;
apps/api/tests/test_stockplatform_public_api_runtime_readback.py)
;;
apps/api/tests/test_iwooos_security_operating_system.py)
;;
apps/api/tests/test_awoooi_production_deploy_readback_blocker.py)
@@ -628,6 +632,7 @@ jobs:
src/services/gitea_workflow_runner_owner_attestation_request.py \
src/services/reboot_auto_recovery_slo_scorecard.py \
src/services/reboot_auto_recovery_drill_preflight.py \
src/services/stockplatform_public_api_runtime_readback.py \
src/services/iwooos_security_operating_system.py \
src/services/awoooi_gitea_onboarding_warning_step_dashboard.py \
src/services/awoooi_gitea_onboarding_warning_step_owner_package.py \
@@ -685,6 +690,7 @@ jobs:
tests/test_gitea_private_inventory_p0_scorecard_api.py \
tests/test_gitea_workflow_runner_owner_attestation_request_api.py \
tests/test_reboot_auto_recovery_slo_scorecard_api.py \
tests/test_stockplatform_public_api_runtime_readback.py \
tests/test_iwooos_security_operating_system.py \
tests/test_awoooi_production_deploy_readback_blocker.py \
tests/test_awoooi_priority_work_order_readback_api.py \

View File

@@ -335,6 +335,7 @@ from src.services.awoooi_gitea_onboarding_warning_step_template_copy_receipt imp
load_latest_awoooi_gitea_onboarding_warning_step_template_copy_receipt,
)
from src.services.awoooi_priority_work_order_readback import (
apply_stockplatform_public_api_runtime_readback,
load_latest_awoooi_priority_work_order_readback,
)
from src.services.awoooi_status_cleanup_dashboard import (
@@ -442,6 +443,9 @@ from src.services.service_health_failure_notification_policy import (
from src.services.service_health_gap_matrix import (
load_latest_service_health_gap_matrix,
)
from src.services.stockplatform_public_api_runtime_readback import (
load_latest_stockplatform_public_api_runtime_readback,
)
router = APIRouter(prefix="/agents", tags=["Agent Teams"])
logger = get_logger("awoooi.agents")
@@ -1067,8 +1071,9 @@ async def get_delivery_closure_workbench() -> dict[str, Any]:
description=(
"讀取已提交的 AWOOOI P0/P1 主線工作順序快照;此端點只回傳"
"目前 active P0、已關閉 P0、下一步順序、禁止事項與 evidence refs。"
"不讀 raw sessions / SQLite、不呼叫 GitHub / Gitea live API、不讀 secret、"
"不註冊 runner、不觸發 workflow、不操作 host / Docker / K8s / DB / firewall。"
"會讀取 StockPlatform public HTTPS health/freshness/ingestion live "
"readback不讀 raw sessions / SQLite、不呼叫 GitHub / Gitea live API、"
"不讀 secret、不註冊 runner、不觸發 workflow、不操作 host / Docker / K8s / DB / firewall。"
),
)
async def get_awoooi_priority_work_order_readback() -> dict[str, Any]:
@@ -1077,6 +1082,10 @@ async def get_awoooi_priority_work_order_readback() -> dict[str, Any]:
payload = await asyncio.to_thread(
load_latest_awoooi_priority_work_order_readback
)
stockplatform_runtime = await asyncio.to_thread(
load_latest_stockplatform_public_api_runtime_readback
)
apply_stockplatform_public_api_runtime_readback(payload, stockplatform_runtime)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
@@ -1091,6 +1100,34 @@ async def get_awoooi_priority_work_order_readback() -> dict[str, Any]:
) from exc
@router.get(
"/stockplatform-public-api-runtime-readback",
response_model=dict[str, Any],
summary="取得 StockPlatform public API runtime readback",
description=(
"讀取 StockPlatform public web/API health、freshness 與 ingestion live "
"readback並與 P0-006 committed scorecard 比對。此端點只做 public HTTPS "
"probe不 SSH、不 Docker、不 restart、不寫 DB、不讀 secret、不觸發 workflow。"
),
)
async def get_stockplatform_public_api_runtime_readback() -> dict[str, Any]:
"""回傳 StockPlatform public API runtime live readback。"""
try:
payload = await asyncio.to_thread(
load_latest_stockplatform_public_api_runtime_readback
)
return redact_public_lan_topology(payload)
except (json.JSONDecodeError, ValueError) as exc:
logger.error(
"stockplatform_public_api_runtime_readback_invalid",
error=str(exc),
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="StockPlatform public API runtime readback 無效",
) from exc
@router.get(
"/product-awoooi-manifest-standard",
response_model=dict[str, Any],

View File

@@ -41,6 +41,123 @@ def load_latest_awoooi_priority_work_order_readback(
return payload
def apply_stockplatform_public_api_runtime_readback(
payload: dict[str, Any],
runtime_readback: dict[str, Any],
) -> None:
"""Overlay live StockPlatform public API truth onto the priority readback."""
runtime_ready = runtime_readback.get("status") == (
"stockplatform_public_api_runtime_ready"
)
runtime_blockers = _strings(runtime_readback.get("active_blockers"))
runtime_rollups = _dict(runtime_readback.get("rollups"))
runtime_readback_body = _dict(runtime_readback.get("readback"))
state = _dict(payload.setdefault("mainline_execution_state", {}))
state["stockplatform_public_api_runtime_status"] = str(
runtime_readback.get("status") or "unknown"
)
state["stockplatform_public_api_live_readback_state"] = (
"ready" if runtime_ready else "blocked"
)
state["stockplatform_public_api_active_blockers"] = runtime_blockers
state["stockplatform_public_api_live_drift_from_committed_scorecard"] = bool(
runtime_readback.get("live_drift_from_committed_scorecard") is True
)
for item in _list(payload.get("in_progress_or_blocked_in_priority_order")):
workplan = _dict(item)
if workplan.get("workplan_id") != "P0-006":
continue
evidence = _dict(workplan.setdefault("evidence", {}))
evidence["stockplatform_public_api_runtime_status"] = state[
"stockplatform_public_api_runtime_status"
]
evidence["stockplatform_public_api_runtime_ready"] = runtime_ready
evidence["stockplatform_public_api_active_blockers"] = runtime_blockers
evidence["stockplatform_public_api_http_502_count"] = _int(
runtime_rollups.get("http_502_count")
)
evidence["stockplatform_public_api_live_drift_from_committed_scorecard"] = (
state["stockplatform_public_api_live_drift_from_committed_scorecard"]
)
evidence["stockplatform_public_api_health_http_status"] = (
runtime_readback_body.get("api_health_http_status")
)
evidence["stockplatform_freshness_http_status"] = (
runtime_readback_body.get("freshness_http_status")
)
evidence["stockplatform_ingestion_http_status"] = (
runtime_readback_body.get("ingestion_http_status")
)
if runtime_ready:
continue
workplan["status"] = "blocked_stockplatform_public_api_runtime_drift"
workplan["safe_next_step"] = str(
runtime_readback.get("safe_next_step") or ""
)
workplan["reason"] = (
"Committed P0-006 scorecard says StockPlatform freshness and "
"ingestion are ok, but live public API readback is blocked. "
"This must remain visible before claiming the reboot SLO lane is "
"only waiting on a fresh boot window."
)
professional_fix = _dict(workplan.setdefault("professional_fix", {}))
professional_fix["action"] = (
"Recover the StockPlatform API container/runtime control path with "
"a separate bounded plan, then rerun public API readback. Do not "
"restart Docker daemon, reboot hosts, prune data, write DB rows, "
"trigger workflows, or read secrets from this lane."
)
professional_fix["owner"] = (
"stockplatform public API runtime readback plus P0-006 reboot SLO lane"
)
if runtime_ready:
_refresh_rollups_after_stockplatform_overlay(payload, state)
return
existing_blockers = _strings(state.get("active_p0_live_active_blockers"))
state["active_p0_live_active_blockers"] = _unique_strings(
existing_blockers
+ ["stockplatform_public_api_runtime_drift"]
+ runtime_blockers
)
state["active_p0_state"] = "blocked_stockplatform_public_api_runtime_drift"
state["next_executable_mainline_workplan_id"] = (
"P0-006-STOCKPLATFORM-PUBLIC-API-RUNTIME-READBACK"
)
state["next_executable_mainline_state"] = (
"blocked_live_stockplatform_public_api_requires_separate_runtime_"
"control_path_recovery_without_daemon_restart"
)
payload["status"] = "p0_006_blocked_stockplatform_public_api_runtime_drift"
payload["next_execution_order"] = [
(
"P0-006-STOCKPLATFORM-PUBLIC-API-RUNTIME-READBACK: live public "
"StockPlatform API is blocked while committed scorecard still says "
"freshness/ingestion are ok; fix runtime control-path evidence "
"before claiming the lane is only reboot-window gated."
),
(
"P0-006: keep reboot SLO timer live, but do not claim final SLO "
"closure while StockPlatform public API readback is blocked."
),
(
"P0-006-REBOOT-DRILL-PREFLIGHT-READBACK: preflight remains ready, "
"but reboot remains break-glass and is not authorized by this lane."
),
(
"NEXT: after StockPlatform public API readback is green or a "
"separate hard-blocker handoff is recorded, resume the next "
"blocker-free priority item."
),
]
_refresh_rollups_after_stockplatform_overlay(payload, state)
def _enrich_from_current_readbacks(payload: dict[str, Any]) -> None:
from src.services.awoooi_gitea_onboarding_warning_step_runtime_enablement_gate import (
load_latest_awoooi_gitea_onboarding_warning_step_runtime_enablement_gate,
@@ -314,6 +431,48 @@ def _enrich_from_current_readbacks(payload: dict[str, Any]) -> None:
)
def _refresh_rollups_after_stockplatform_overlay(
payload: dict[str, Any],
state: dict[str, Any],
) -> None:
rollups = _dict(payload.setdefault("rollups", {}))
active_blockers = _strings(state.get("active_p0_live_active_blockers"))
stock_ready = state.get("stockplatform_public_api_live_readback_state") == "ready"
stock_blocked = (
state.get("stockplatform_public_api_live_readback_state") == "blocked"
)
rollups["stockplatform_public_api_runtime_ready"] = stock_ready
rollups["stockplatform_public_api_runtime_blocked"] = stock_blocked
rollups["stockplatform_public_api_live_drift_from_committed_scorecard"] = bool(
state.get("stockplatform_public_api_live_drift_from_committed_scorecard")
is True
)
rollups["active_p0_live_active_blocker_count"] = len(active_blockers)
rollups["active_p0_event_gated_by_fresh_reboot_window_only"] = (
rollups.get("active_p0_event_gated_by_fresh_reboot_window_only") is True
and not stock_blocked
)
summary = _dict(payload.setdefault("summary", {}))
summary["status"] = str(payload.get("status") or "")
summary["active_p0_state"] = str(state.get("active_p0_state") or "")
summary["active_p0_live_active_blockers"] = active_blockers
summary["stockplatform_public_api_runtime_status"] = str(
state.get("stockplatform_public_api_runtime_status") or "unknown"
)
summary["stockplatform_public_api_runtime_ready"] = stock_ready
summary["stockplatform_public_api_live_drift_from_committed_scorecard"] = bool(
state.get("stockplatform_public_api_live_drift_from_committed_scorecard")
is True
)
summary["next_executable_mainline_workplan_id"] = str(
state.get("next_executable_mainline_workplan_id") or ""
)
summary["next_executable_mainline_state"] = str(
state.get("next_executable_mainline_state") or ""
)
def _set_rollups_and_summary(
*,
payload: dict[str, Any],
@@ -455,6 +614,17 @@ def _strings(value: Any) -> list[str]:
return [str(item) for item in _list(value)]
def _unique_strings(values: list[str]) -> list[str]:
seen: set[str] = set()
unique: list[str] = []
for value in values:
if value in seen:
continue
seen.add(value)
unique.append(value)
return unique
def _is_sha(value: str) -> bool:
return bool(_SHA_RE.fullmatch(value))

View File

@@ -0,0 +1,292 @@
"""StockPlatform public API runtime readback.
This readback probes only public HTTPS endpoints. It does not SSH, use Docker,
read secrets, write StockPlatform data, or trigger recovery actions.
"""
from __future__ import annotations
import json
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any, Callable
from src.services.reboot_auto_recovery_slo_scorecard import (
load_latest_reboot_auto_recovery_slo_scorecard,
)
_API_SCHEMA_VERSION = "stockplatform_public_api_runtime_readback_v1"
_DEFAULT_BASE_URL = "https://stock.wooo.work"
_DEFAULT_TIMEOUT_SECONDS = 4.0
Probe = Callable[[str, float], dict[str, Any]]
def load_latest_stockplatform_public_api_runtime_readback(
*,
base_url: str = _DEFAULT_BASE_URL,
timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
operations_dir: Path | None = None,
probe: Probe | None = None,
) -> dict[str, Any]:
"""Build a live public readback for StockPlatform web/API health."""
http_probe = probe or _http_probe
normalized_base_url = base_url.rstrip("/")
committed_scorecard = load_latest_reboot_auto_recovery_slo_scorecard(
operations_dir
)
committed_stock = _dict(committed_scorecard.get("stockplatform_data_freshness"))
endpoints = {
"public_web_healthz": "/healthz",
"public_api_healthz": "/api/healthz",
"freshness": "/api/v1/system/freshness",
"ingestion": "/api/v1/system/ingestion",
}
probes = {
name: _probe_endpoint(
http_probe,
f"{normalized_base_url}{path}",
timeout_seconds,
parse_json=name in {"freshness", "ingestion"},
)
for name, path in endpoints.items()
}
return _build_payload(
base_url=normalized_base_url,
timeout_seconds=timeout_seconds,
probes=probes,
committed_stockplatform=committed_stock,
)
def _build_payload(
*,
base_url: str,
timeout_seconds: float,
probes: dict[str, dict[str, Any]],
committed_stockplatform: dict[str, Any],
) -> dict[str, Any]:
web = _dict(probes.get("public_web_healthz"))
api = _dict(probes.get("public_api_healthz"))
freshness = _dict(probes.get("freshness"))
ingestion = _dict(probes.get("ingestion"))
freshness_json = _dict(freshness.get("json"))
ingestion_json = _dict(ingestion.get("json"))
checks = {
"public_web_health_ok": web.get("http_status") == 200,
"public_api_health_ok": api.get("http_status") == 200,
"freshness_readback_ok": freshness.get("http_status") == 200
and freshness_json.get("status") == "ok",
"ingestion_readback_ok": ingestion.get("http_status") == 200
and ingestion_json.get("status") == "ok",
}
active_blockers = _active_blockers(
api=api,
freshness=freshness,
ingestion=ingestion,
freshness_json=freshness_json,
ingestion_json=ingestion_json,
)
ready = all(checks.values())
committed_freshness_ok = committed_stockplatform.get("freshness_status") == "ok"
committed_ingestion_ok = committed_stockplatform.get("ingestion_status") == "ok"
committed_ok = committed_freshness_ok and committed_ingestion_ok
live_drift_from_committed_scorecard = committed_ok and not ready
status = (
"stockplatform_public_api_runtime_ready"
if ready
else "blocked_stockplatform_public_api_runtime_drift"
)
safe_next_step = (
"stockplatform_public_api_ready_keep_p0_006_reboot_window_gate"
if ready
else (
"recover_stockplatform_api_container_runtime_or_docker_control_path_"
"without_daemon_restart_then_rerun_public_api_readback"
)
)
http_statuses = {
name: _dict(probe).get("http_status") for name, probe in probes.items()
}
return {
"schema_version": _API_SCHEMA_VERSION,
"scope": "stockplatform_public_api_runtime_readback",
"priority": "P0-006",
"status": status,
"safe_next_step": safe_next_step,
"active_blockers": active_blockers,
"active_blocker_count": len(active_blockers),
"runtime_ready": ready,
"live_drift_from_committed_scorecard": live_drift_from_committed_scorecard,
"base_url": base_url,
"timeout_seconds": timeout_seconds,
"checks": checks,
"readback": {
"web_health_http_status": web.get("http_status"),
"api_health_http_status": api.get("http_status"),
"freshness_http_status": freshness.get("http_status"),
"ingestion_http_status": ingestion.get("http_status"),
"freshness_status": str(freshness_json.get("status") or "unknown"),
"ingestion_status": str(ingestion_json.get("status") or "unknown"),
"committed_freshness_status": str(
committed_stockplatform.get("freshness_status") or "unknown"
),
"committed_ingestion_status": str(
committed_stockplatform.get("ingestion_status") or "unknown"
),
"live_drift_from_committed_scorecard": (
live_drift_from_committed_scorecard
),
},
"rollups": {
"runtime_ready": ready,
"public_web_health_ok": checks["public_web_health_ok"],
"public_api_health_ok": checks["public_api_health_ok"],
"freshness_readback_ok": checks["freshness_readback_ok"],
"ingestion_readback_ok": checks["ingestion_readback_ok"],
"http_502_count": sum(
1 for status_code in http_statuses.values() if status_code == 502
),
"http_non_200_count": sum(
1
for status_code in http_statuses.values()
if isinstance(status_code, int) and status_code != 200
),
"committed_stockplatform_freshness_ok": committed_freshness_ok,
"committed_stockplatform_ingestion_ok": committed_ingestion_ok,
"live_drift_from_committed_scorecard": (
live_drift_from_committed_scorecard
),
},
"probes": probes,
"operation_boundaries": {
"read_only_public_https_probe": True,
"ssh_used": False,
"docker_command_performed": False,
"docker_restart_performed": False,
"docker_daemon_restart_performed": False,
"host_reboot_performed": False,
"service_restart_performed": False,
"database_write_or_restore_performed": False,
"stockplatform_manual_data_write_performed": False,
"secret_value_collection_allowed": False,
"github_api_used": False,
"workflow_trigger_performed": False,
"runtime_write_allowed": False,
},
}
def _active_blockers(
*,
api: dict[str, Any],
freshness: dict[str, Any],
ingestion: dict[str, Any],
freshness_json: dict[str, Any],
ingestion_json: dict[str, Any],
) -> list[str]:
blockers: list[str] = []
_append_http_blocker(blockers, "stockplatform_public_api_healthz", api)
_append_http_blocker(blockers, "stockplatform_freshness", freshness)
_append_http_blocker(blockers, "stockplatform_ingestion", ingestion)
if freshness.get("http_status") == 200 and freshness_json.get("status") != "ok":
blockers.extend(
f"stockplatform_freshness_{item}"
for item in _strings(freshness_json.get("blockers"))
)
if not _strings(freshness_json.get("blockers")):
blockers.append("stockplatform_freshness_status_not_ok")
if ingestion.get("http_status") == 200 and ingestion_json.get("status") != "ok":
blockers.extend(
f"stockplatform_ingestion_{item}"
for item in _strings(ingestion_json.get("blockers"))
)
if not _strings(ingestion_json.get("blockers")):
blockers.append("stockplatform_ingestion_status_not_ok")
return _unique_strings(blockers)
def _append_http_blocker(
blockers: list[str],
prefix: str,
probe: dict[str, Any],
) -> None:
http_status = probe.get("http_status")
if http_status == 200:
return
if isinstance(http_status, int):
blockers.append(f"{prefix}_http_{http_status}")
return
blockers.append(f"{prefix}_unreachable")
def _probe_endpoint(
probe: Probe,
url: str,
timeout_seconds: float,
*,
parse_json: bool,
) -> dict[str, Any]:
result = probe(url, timeout_seconds)
http_status = _int_or_none(result.get("http_status"))
body = str(result.get("body") or "")
payload: dict[str, Any] = {
"url": url,
"http_status": http_status,
"ok": http_status == 200,
"error": str(result.get("error") or ""),
}
if parse_json and http_status == 200:
try:
payload["json"] = json.loads(body)
except json.JSONDecodeError:
payload["json"] = {}
payload["error"] = "invalid_json"
return payload
def _http_probe(url: str, timeout_seconds: float) -> dict[str, Any]:
request = urllib.request.Request(
url,
headers={"User-Agent": "awoooi-stockplatform-public-api-readback"},
)
try:
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
body = response.read(8192).decode("utf-8", "replace")
return {"http_status": response.status, "body": body, "error": ""}
except urllib.error.HTTPError as exc:
body = exc.read(2048).decode("utf-8", "replace")
return {"http_status": exc.code, "body": body, "error": ""}
except Exception as exc: # noqa: BLE001 - readback must fail closed.
return {"http_status": None, "body": "", "error": type(exc).__name__}
def _dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _int_or_none(value: Any) -> int | None:
if isinstance(value, bool):
return int(value)
if isinstance(value, int | float):
return int(value)
return None
def _strings(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [str(item) for item in value if item is not None]
def _unique_strings(values: list[str]) -> list[str]:
seen: set[str] = set()
unique: list[str] = []
for value in values:
if value in seen:
continue
seen.add(value)
unique.append(value)
return unique

View File

@@ -7,8 +7,10 @@ import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1 import agents
from src.api.v1.agents import router
from src.services.awoooi_priority_work_order_readback import (
apply_stockplatform_public_api_runtime_readback,
load_latest_awoooi_priority_work_order_readback,
)
@@ -57,7 +59,14 @@ def test_awoooi_priority_work_order_readback_loader_returns_mainline_order():
assert payload["operation_boundaries"]["host_write_performed"] is False
def test_awoooi_priority_work_order_readback_endpoint_returns_snapshot():
def test_awoooi_priority_work_order_readback_endpoint_returns_snapshot(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(
agents,
"load_latest_stockplatform_public_api_runtime_readback",
_stockplatform_runtime_ready,
)
app = FastAPI()
app.include_router(router, prefix="/api/v1")
client = TestClient(app)
@@ -70,10 +79,43 @@ def test_awoooi_priority_work_order_readback_endpoint_returns_snapshot():
assert data["mainline_execution_state"]["active_p0_workplan_id"] == "P0-006"
assert data["mainline_execution_state"]["p0_004_template_copy_apply_gate_runtime_readback_state"] == "ready"
assert data["mainline_execution_state"]["reboot_drill_preflight_runtime_readback_state"] == "ready"
assert data["rollups"]["stockplatform_public_api_runtime_ready"] is True
assert data["next_execution_order"][0].startswith("P0-006:")
assert "do not reboot" in data["next_execution_order"][0]
def test_awoooi_priority_work_order_readback_overlays_live_stockplatform_drift():
payload = load_latest_awoooi_priority_work_order_readback()
apply_stockplatform_public_api_runtime_readback(
payload,
_stockplatform_runtime_blocked(),
)
state = payload["mainline_execution_state"]
in_progress = payload["in_progress_or_blocked_in_priority_order"][0]
evidence = in_progress["evidence"]
assert payload["status"] == "p0_006_blocked_stockplatform_public_api_runtime_drift"
assert state["active_p0_state"] == "blocked_stockplatform_public_api_runtime_drift"
assert state["stockplatform_public_api_live_readback_state"] == "blocked"
assert "stockplatform_public_api_runtime_drift" in state[
"active_p0_live_active_blockers"
]
assert evidence["stockplatform_public_api_runtime_ready"] is False
assert evidence["stockplatform_public_api_http_502_count"] == 3
assert evidence["stockplatform_public_api_health_http_status"] == 502
assert in_progress["status"] == "blocked_stockplatform_public_api_runtime_drift"
assert "Do not restart Docker daemon" in in_progress["professional_fix"]["action"]
assert payload["rollups"]["stockplatform_public_api_runtime_blocked"] is True
assert (
payload["rollups"]["active_p0_event_gated_by_fresh_reboot_window_only"]
is False
)
assert payload["next_execution_order"][0].startswith(
"P0-006-STOCKPLATFORM-PUBLIC-API-RUNTIME-READBACK"
)
def test_awoooi_priority_work_order_readback_normalizes_runtime_source_truth(
monkeypatch: pytest.MonkeyPatch,
):
@@ -130,3 +172,48 @@ def test_awoooi_priority_work_order_readback_rejects_reordered_active_p0(tmp_pat
with pytest.raises(ValueError, match="active_p0_workplan_id"):
load_latest_awoooi_priority_work_order_readback(operations_dir)
def _stockplatform_runtime_ready() -> dict:
return {
"schema_version": "stockplatform_public_api_runtime_readback_v1",
"status": "stockplatform_public_api_runtime_ready",
"safe_next_step": "stockplatform_public_api_ready_keep_p0_006_reboot_window_gate",
"active_blockers": [],
"runtime_ready": True,
"live_drift_from_committed_scorecard": False,
"readback": {
"api_health_http_status": 200,
"freshness_http_status": 200,
"ingestion_http_status": 200,
},
"rollups": {
"http_502_count": 0,
},
}
def _stockplatform_runtime_blocked() -> dict:
return {
"schema_version": "stockplatform_public_api_runtime_readback_v1",
"status": "blocked_stockplatform_public_api_runtime_drift",
"safe_next_step": (
"recover_stockplatform_api_container_runtime_or_docker_control_path_"
"without_daemon_restart_then_rerun_public_api_readback"
),
"active_blockers": [
"stockplatform_public_api_healthz_http_502",
"stockplatform_freshness_http_502",
"stockplatform_ingestion_http_502",
],
"runtime_ready": False,
"live_drift_from_committed_scorecard": True,
"readback": {
"api_health_http_status": 502,
"freshness_http_status": 502,
"ingestion_http_status": 502,
},
"rollups": {
"http_502_count": 3,
},
}

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import json
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1 import agents
from src.api.v1.agents import router
from src.services.stockplatform_public_api_runtime_readback import (
load_latest_stockplatform_public_api_runtime_readback,
)
def test_stockplatform_public_api_runtime_readback_blocks_live_502():
payload = load_latest_stockplatform_public_api_runtime_readback(
probe=_probe_public_web_ok_api_502
)
assert payload["schema_version"] == "stockplatform_public_api_runtime_readback_v1"
assert payload["priority"] == "P0-006"
assert payload["status"] == "blocked_stockplatform_public_api_runtime_drift"
assert payload["runtime_ready"] is False
assert payload["live_drift_from_committed_scorecard"] is True
assert payload["readback"]["api_health_http_status"] == 502
assert payload["readback"]["freshness_http_status"] == 502
assert payload["readback"]["ingestion_http_status"] == 502
assert payload["rollups"]["public_web_health_ok"] is True
assert payload["rollups"]["public_api_health_ok"] is False
assert payload["rollups"]["http_502_count"] == 3
assert payload["active_blockers"] == [
"stockplatform_public_api_healthz_http_502",
"stockplatform_freshness_http_502",
"stockplatform_ingestion_http_502",
]
assert (
payload["operation_boundaries"]["read_only_public_https_probe"] is True
)
assert payload["operation_boundaries"]["ssh_used"] is False
assert payload["operation_boundaries"]["docker_restart_performed"] is False
assert payload["operation_boundaries"]["docker_daemon_restart_performed"] is False
assert payload["operation_boundaries"]["host_reboot_performed"] is False
assert payload["operation_boundaries"]["database_write_or_restore_performed"] is False
assert payload["operation_boundaries"]["secret_value_collection_allowed"] is False
def test_stockplatform_public_api_runtime_readback_ready_when_live_green():
payload = load_latest_stockplatform_public_api_runtime_readback(probe=_probe_ok)
assert payload["status"] == "stockplatform_public_api_runtime_ready"
assert payload["runtime_ready"] is True
assert payload["live_drift_from_committed_scorecard"] is False
assert payload["active_blockers"] == []
assert payload["readback"]["freshness_status"] == "ok"
assert payload["readback"]["ingestion_status"] == "ok"
assert payload["rollups"]["http_502_count"] == 0
def test_stockplatform_public_api_runtime_endpoint_returns_readback(monkeypatch):
monkeypatch.setattr(
agents,
"load_latest_stockplatform_public_api_runtime_readback",
lambda: load_latest_stockplatform_public_api_runtime_readback(
probe=_probe_public_web_ok_api_502
),
)
app = FastAPI()
app.include_router(router, prefix="/api/v1")
client = TestClient(app)
response = client.get("/api/v1/agents/stockplatform-public-api-runtime-readback")
assert response.status_code == 200
data = response.json()
assert data["status"] == "blocked_stockplatform_public_api_runtime_drift"
assert data["active_blocker_count"] == 3
def _probe_public_web_ok_api_502(url: str, timeout_seconds: float) -> dict:
del timeout_seconds
if url.endswith("/healthz") and "/api/" not in url:
return {"http_status": 200, "body": "ok", "error": ""}
return {
"http_status": 502,
"body": "<html><h1>502 Bad Gateway</h1></html>",
"error": "",
}
def _probe_ok(url: str, timeout_seconds: float) -> dict:
del timeout_seconds
if url.endswith("/api/v1/system/freshness"):
return {"http_status": 200, "body": json.dumps({"status": "ok"}), "error": ""}
if url.endswith("/api/v1/system/ingestion"):
return {"http_status": 200, "body": json.dumps({"status": "ok"}), "error": ""}
return {"http_status": 200, "body": "ok", "error": ""}

View File

@@ -1,3 +1,20 @@
## 2026-06-30 — 09:50 P0-006 StockPlatform live public API drift readback
**照主線修正的問題**
- Production P0-006 committed scorecard 顯示 StockPlatform freshness / ingestion `ok`,但 live public `https://stock.wooo.work/api/*` 目前回 502若只看 committed scorecard會誤判 P0-006 只剩 fresh reboot window。
- 新增 `stockplatform_public_api_runtime_readback` service 與 GET `/api/v1/agents/stockplatform-public-api-runtime-readback`,只做 public HTTPS probe`/healthz``/api/healthz``/api/v1/system/freshness``/api/v1/system/ingestion`
- `GET /api/v1/agents/awoooi-priority-work-order-readback` 現在會把 live StockPlatform public API readback 疊到 priority ledger若 committed scorecard 是 ok 但 live API 502主線狀態改為 `p0_006_blocked_stockplatform_public_api_runtime_drift`,下一步固定為 `P0-006-STOCKPLATFORM-PUBLIC-API-RUNTIME-READBACK`
**驗證**
- Focused pytestStockPlatform runtime readback / priority readback / reboot SLO / Delivery Workbench / CD profile `43 passed`
- `py_compile`:新增 service、priority service、agents router 通過。
- Gitea runner pressure guard`workflow_files=11``auto_branch_events_on_110=0``generic_runner_labels=0`
- Gitea step env secret guard`no Gitea run/with secrets or legacy Telegram routes`
- `git diff --check`:通過。
- 本地 live readback 讀回 `stock_status=blocked_stockplatform_public_api_runtime_drift``stock_http_502_count=3`priority overlay 讀回 `priority_next=P0-006-STOCKPLATFORM-PUBLIC-API-RUNTIME-READBACK`
**邊界**:未重啟主機,未 restart Docker daemon / Nginx / K3s / DB / firewall未 prune / restore / DB write未讀 secret / token / raw sessions / SQLite / `.env`,未使用 GitHub / `gh` / GitHub API。
## 2026-06-30 — 09:24 production deploy readback bounded poll
**照主線處理的問題**

View File

@@ -315,10 +315,14 @@ def test_reboot_auto_recovery_slo_sources_stay_on_controlled_runtime_profile() -
"docs/operations/awoooi-reboot-auto-recovery-slo-scorecard.snapshot.json)",
"apps/api/src/services/reboot_auto_recovery_slo_scorecard.py)",
"apps/api/src/services/reboot_auto_recovery_drill_preflight.py)",
"apps/api/src/services/stockplatform_public_api_runtime_readback.py)",
"apps/api/tests/test_reboot_auto_recovery_slo_scorecard_api.py)",
"apps/api/tests/test_stockplatform_public_api_runtime_readback.py)",
"src/services/reboot_auto_recovery_slo_scorecard.py",
"src/services/reboot_auto_recovery_drill_preflight.py",
"src/services/stockplatform_public_api_runtime_readback.py",
"tests/test_reboot_auto_recovery_slo_scorecard_api.py",
"tests/test_stockplatform_public_api_runtime_readback.py",
"scripts/reboot-recovery/awoooi-reboot-auto-recovery-slo.service)",
"scripts/reboot-recovery/awoooi-reboot-auto-recovery-slo.timer)",
"scripts/reboot-recovery/install-reboot-auto-recovery-slo-110.sh)",