Files
ewoooc/tests/test_gemini_fallback_guard.py
OoO d6d8777e41
All checks were successful
CD Pipeline / deploy (push) Successful in 1m12s
V10.601 收斂 Gemini 與密鑰治理
2026-06-06 14:52:46 +08:00

246 lines
8.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Gemini fallback kill-switch contract."""
import re
import subprocess
from pathlib import Path
from services.ai_provider import AIProviderService, AIResponse
from services.gemini_service import GeminiService
ROOT = Path(__file__).resolve().parents[1]
def _production_python_files():
for folder in ("services", "routes"):
yield from sorted((ROOT / folder).rglob("*.py"))
for filename in ("scheduler.py", "config.py"):
path = ROOT / filename
if path.exists():
yield path
def _rel(path: Path) -> str:
return path.relative_to(ROOT).as_posix()
def _tracked_text_files():
result = subprocess.run(
["git", "ls-files", "-z"],
cwd=ROOT,
check=True,
capture_output=True,
)
for raw in result.stdout.split(b"\0"):
if not raw:
continue
path = ROOT / raw.decode("utf-8")
if path.is_file():
try:
path.read_text(encoding="utf-8")
except UnicodeDecodeError:
continue
yield path
def test_gemini_guard_defaults_disabled(monkeypatch):
from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
assert is_gemini_fallback_enabled("test") is False
assert get_gemini_api_key("test") == ""
def test_gemini_guard_hard_switch_blocks_enabled_fallback(monkeypatch):
from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
assert is_gemini_fallback_enabled("test") is False
assert get_gemini_api_key("test") == ""
def test_gemini_guard_requires_explicit_emergency_unlock(monkeypatch):
from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled
monkeypatch.setenv("GEMINI_API_HARD_DISABLED", "false")
monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
assert is_gemini_fallback_enabled("test") is True
assert get_gemini_api_key("test") == "test-key"
def test_gemini_guard_respects_context_allowlist(monkeypatch):
from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled
monkeypatch.setenv("GEMINI_API_HARD_DISABLED", "false")
monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
monkeypatch.setenv("GEMINI_ALLOWED_CONTEXTS", "code_review")
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
assert is_gemini_fallback_enabled("code_review") is True
assert get_gemini_api_key("openclaw_strategy") == ""
def test_ai_provider_does_not_call_gemini_when_guard_disabled(monkeypatch):
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
service = AIProviderService(default_provider="ollama")
failed_ollama = AIResponse(
success=False,
content="",
model="qwen2.5-coder:7b",
provider="ollama",
error="ollama down",
)
def forbidden_fallback():
raise AssertionError("Gemini fallback must be blocked by default")
result = service._gemini_fallback(failed_ollama, forbidden_fallback)
assert result.success is False
assert result.provider == "ollama"
assert "GEMINI_API_HARD_DISABLED=true" in (result.error or "")
def test_gemini_service_check_connection_is_blocked_by_default(monkeypatch):
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
service = GeminiService()
def forbidden_init():
raise AssertionError("SDK initialization must not run when fallback is disabled")
monkeypatch.setattr(service, "_ensure_initialized", forbidden_init)
result = service.generate("hello")
assert result.success is False
assert "GEMINI_API_HARD_DISABLED=true" in (result.error or "")
def test_mcp_collector_does_not_initialize_gemini_when_guard_disabled(monkeypatch):
import services.mcp_collector_service as mcp_mod
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
service = mcp_mod.MCPCollectorService()
assert service._ensure_init() is False
assert service._genai is None
def test_openclaw_direct_gemini_call_is_blocked_by_default(monkeypatch):
import services.openclaw_strategist_service as svc
monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
assert svc._call_gemini("system", "user", caller="openclaw_qa_gemini_fallback") is None
def test_no_direct_gemini_api_key_env_read_outside_guard_or_config():
allowed = {"config.py", "services/gemini_guard.py"}
offenders = []
pattern = re.compile(r"os\.getenv\(\s*['\"]GEMINI_API_KEY['\"]")
for path in _production_python_files():
if _rel(path) in allowed:
continue
if pattern.search(path.read_text(encoding="utf-8")):
offenders.append(_rel(path))
assert offenders == []
def test_gemini_outbound_files_are_guarded():
allowed = {
"routes/openclaw_bot_routes.py",
"services/code_review_pipeline_service.py",
"services/gemini_service.py",
"services/mcp_collector_service.py",
"services/openclaw_strategist_service.py",
}
outbound_markers = (
"google.generativeai",
"genai.configure",
"GenerativeModel",
"generate_content(",
"generateContent?key=",
)
offenders = []
unguarded = []
for path in _production_python_files():
text = path.read_text(encoding="utf-8")
has_outbound = any(marker in text for marker in outbound_markers)
if not has_outbound:
continue
rel = _rel(path)
if rel not in allowed:
offenders.append(rel)
if "get_gemini_api_key" not in text:
unguarded.append(rel)
assert offenders == []
assert unguarded == []
def test_tracked_secret_manifests_do_not_contain_live_credentials():
tracked_secret_files = list(ROOT.joinpath("k8s").rglob("*.yaml"))
legacy_secret = ROOT / "k8s 2" / "03-secrets.yaml"
if legacy_secret.exists():
tracked_secret_files.append(legacy_secret)
live_secret_patterns = {
"Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
"Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
"LINE token": re.compile(r"[A-Za-z0-9+/=]{80,}"),
"hardcoded password": re.compile(
r"(POSTGRES_PASSWORD|LOGIN_PASSWORD|APP_PASSWORD|SECRET_KEY):\s*"
r"['\"](?!<)[^'\"]{6,}['\"]"
),
"inline URL password": re.compile(r"://[^:\s/]+:(?!<)[^@\s]+@"),
}
offenders = []
for path in tracked_secret_files:
text = path.read_text(encoding="utf-8")
for label, pattern in live_secret_patterns.items():
if pattern.search(text):
offenders.append(f"{path.relative_to(ROOT).as_posix()}: {label}")
assert offenders == []
def test_tracked_text_files_do_not_contain_known_live_tokens():
live_token_patterns = {
"Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
"Google OAuth access token": re.compile(r"ya29\.[0-9A-Za-z_-]{20,}"),
"Google OAuth refresh token": re.compile(r"1//0[0-9A-Za-z_-]{20,}"),
"Google OAuth client secret": re.compile(r"GOCSPX-[0-9A-Za-z_-]{12,}"),
"Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
"Ollama cloud API key": re.compile(r"\b[0-9a-f]{32}\.[A-Za-z0-9_-]{12,}\b"),
"Superset default password": re.compile(r"Wooo_Superset_\d{4}"),
}
offenders = []
for path in _tracked_text_files():
text = path.read_text(encoding="utf-8")
for label, pattern in live_token_patterns.items():
if pattern.search(text):
offenders.append(f"{_rel(path)}: {label}")
assert offenders == []