246 lines
8.4 KiB
Python
246 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Gemini fallback kill-switch contract."""
|
|
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from services.ai_provider import AIProviderService, AIResponse
|
|
from services.gemini_service import GeminiService
|
|
|
|
# Base directory for every scan in this module: the parent of the directory
# that contains this file (presumably the repository root -- the helpers below
# run ``git ls-files`` from it and look for ``services``/``routes`` under it).
ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def _production_python_files():
    """Yield the Python source files that the static contract checks scan.

    Every ``*.py`` file found recursively under ``ROOT/services`` and
    ``ROOT/routes`` is yielded first (sorted within each folder), followed by
    ``ROOT/scheduler.py`` and ``ROOT/config.py`` when those files exist.
    """
    for package in ("services", "routes"):
        # Sorting keeps the scan order (and any failure output) deterministic.
        for source in sorted((ROOT / package).rglob("*.py")):
            yield source

    for name in ("scheduler.py", "config.py"):
        candidate = ROOT / name
        if not candidate.exists():
            continue
        yield candidate
|
|
|
|
|
|
def _rel(path: Path) -> str:
    """Return *path* relative to ``ROOT`` as a forward-slash separated string.

    Raises ``ValueError`` (from ``Path.relative_to``) if *path* is not
    located under ``ROOT``.
    """
    relative = path.relative_to(ROOT)
    return relative.as_posix()
|
|
|
|
|
|
def _tracked_text_files():
    """Yield git-tracked files under ``ROOT`` whose content decodes as UTF-8.

    The file list comes from ``git ls-files -z`` executed in ``ROOT``; a
    failing git command raises ``subprocess.CalledProcessError``. Entries that
    are not regular files on disk, or whose content is not valid UTF-8, are
    skipped.
    """
    listing = subprocess.run(
        ["git", "ls-files", "-z"],
        capture_output=True,
        check=True,
        cwd=ROOT,
    ).stdout

    # ``-z`` output is NUL-separated; filter(None, ...) drops the empty chunk
    # left behind by the trailing separator.
    for entry in filter(None, listing.split(b"\0")):
        candidate = ROOT / entry.decode("utf-8")
        if not candidate.is_file():
            continue
        try:
            # Decoding is only a probe here; callers read the file themselves.
            candidate.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            continue
        yield candidate
|
|
|
|
|
|
def test_gemini_guard_defaults_disabled(monkeypatch):
    """With both guard variables unset, the guard blocks Gemini.

    Even though ``GEMINI_API_KEY`` is present in the environment, the guard
    must report the fallback as disabled and hand back an empty key.
    """
    import services.gemini_guard as guard

    monkeypatch.setenv("GEMINI_API_KEY", "test-key")
    for switch in ("GEMINI_API_HARD_DISABLED", "GEMINI_FALLBACK_ENABLED"):
        monkeypatch.delenv(switch, raising=False)

    fallback_enabled = guard.is_gemini_fallback_enabled("test")
    assert fallback_enabled is False

    api_key = guard.get_gemini_api_key("test")
    assert api_key == ""
|
|
|
|
|
|
def test_gemini_guard_hard_switch_blocks_enabled_fallback(monkeypatch):
    """``GEMINI_FALLBACK_ENABLED=true`` alone is not enough to unlock Gemini.

    With ``GEMINI_API_HARD_DISABLED`` left unset, the guard must still report
    the fallback as disabled and return an empty key.
    """
    import services.gemini_guard as guard

    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    overrides = {
        "GEMINI_FALLBACK_ENABLED": "true",
        "GEMINI_API_KEY": "test-key",
    }
    for name, value in overrides.items():
        monkeypatch.setenv(name, value)

    fallback_enabled = guard.is_gemini_fallback_enabled("test")
    assert fallback_enabled is False

    api_key = guard.get_gemini_api_key("test")
    assert api_key == ""
|
|
|
|
|
|
def test_gemini_guard_requires_explicit_emergency_unlock(monkeypatch):
    """Gemini is unlocked only when both switches are set explicitly.

    ``GEMINI_API_HARD_DISABLED=false`` together with
    ``GEMINI_FALLBACK_ENABLED=true`` must enable the fallback and expose the
    configured API key.
    """
    import services.gemini_guard as guard

    overrides = {
        "GEMINI_API_HARD_DISABLED": "false",
        "GEMINI_FALLBACK_ENABLED": "true",
        "GEMINI_API_KEY": "test-key",
    }
    for name, value in overrides.items():
        monkeypatch.setenv(name, value)

    fallback_enabled = guard.is_gemini_fallback_enabled("test")
    assert fallback_enabled is True

    api_key = guard.get_gemini_api_key("test")
    assert api_key == "test-key"
|
|
|
|
|
|
def test_gemini_guard_respects_context_allowlist(monkeypatch):
    """An unlocked guard still honours ``GEMINI_ALLOWED_CONTEXTS``.

    With only ``code_review`` allowlisted, that context is enabled while a
    key request from the ``openclaw_strategy`` context yields an empty string.
    """
    import services.gemini_guard as guard

    overrides = {
        "GEMINI_API_HARD_DISABLED": "false",
        "GEMINI_FALLBACK_ENABLED": "true",
        "GEMINI_ALLOWED_CONTEXTS": "code_review",
        "GEMINI_API_KEY": "test-key",
    }
    for name, value in overrides.items():
        monkeypatch.setenv(name, value)

    allowed_context_enabled = guard.is_gemini_fallback_enabled("code_review")
    assert allowed_context_enabled is True

    blocked_context_key = guard.get_gemini_api_key("openclaw_strategy")
    assert blocked_context_key == ""
|
|
|
|
|
|
def test_ai_provider_does_not_call_gemini_when_guard_disabled(monkeypatch):
    """``AIProviderService._gemini_fallback`` must not invoke the fallback.

    With both guard variables unset, the failed Ollama response is expected
    back (still unsuccessful, still attributed to ``ollama``) with an error
    mentioning ``GEMINI_API_HARD_DISABLED=true``; the fallback callable raises
    if it is ever called.
    """
    for switch in ("GEMINI_API_HARD_DISABLED", "GEMINI_FALLBACK_ENABLED"):
        monkeypatch.delenv(switch, raising=False)

    def forbidden_fallback():
        # Tripwire: reaching this means the guard did not block the call.
        raise AssertionError("Gemini fallback must be blocked by default")

    provider = AIProviderService(default_provider="ollama")
    ollama_failure = AIResponse(
        success=False,
        content="",
        model="qwen2.5-coder:7b",
        provider="ollama",
        error="ollama down",
    )

    outcome = provider._gemini_fallback(ollama_failure, forbidden_fallback)

    assert outcome.success is False
    assert outcome.provider == "ollama"
    assert "GEMINI_API_HARD_DISABLED=true" in (outcome.error or "")
|
|
|
|
|
|
def test_gemini_service_check_connection_is_blocked_by_default(monkeypatch):
    """``GeminiService.generate`` must fail fast while the guard is disabled.

    ``_ensure_initialized`` is replaced with a tripwire so the test fails if
    SDK initialization is attempted; the returned result must be unsuccessful
    and mention ``GEMINI_API_HARD_DISABLED=true``.

    NOTE(review): the test name refers to ``check_connection`` but the body
    exercises ``generate`` -- confirm which entry point is intended.
    """
    for switch in ("GEMINI_API_HARD_DISABLED", "GEMINI_FALLBACK_ENABLED"):
        monkeypatch.delenv(switch, raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    def forbidden_init():
        raise AssertionError("SDK initialization must not run when fallback is disabled")

    gemini = GeminiService()
    monkeypatch.setattr(gemini, "_ensure_initialized", forbidden_init)

    outcome = gemini.generate("hello")

    assert outcome.success is False
    assert "GEMINI_API_HARD_DISABLED=true" in (outcome.error or "")
|
|
|
|
|
|
def test_mcp_collector_does_not_initialize_gemini_when_guard_disabled(monkeypatch):
    """``MCPCollectorService._ensure_init`` must refuse to set up Gemini.

    With both guard variables unset and an API key present, ``_ensure_init``
    is expected to return ``False`` and leave ``_genai`` as ``None``.
    """
    import services.mcp_collector_service as mcp_mod

    monkeypatch.setenv("GEMINI_API_KEY", "test-key")
    for switch in ("GEMINI_API_HARD_DISABLED", "GEMINI_FALLBACK_ENABLED"):
        monkeypatch.delenv(switch, raising=False)

    collector = mcp_mod.MCPCollectorService()

    initialized = collector._ensure_init()
    assert initialized is False
    assert collector._genai is None
|
|
|
|
|
|
def test_openclaw_direct_gemini_call_is_blocked_by_default(monkeypatch):
    """The strategist's ``_call_gemini`` returns ``None`` while the guard is off.

    Both guard variables are unset and an API key is present, so a ``None``
    result shows the call was refused rather than skipped for a missing key.
    """
    import services.openclaw_strategist_service as svc

    monkeypatch.setenv("GEMINI_API_KEY", "test-key")
    for switch in ("GEMINI_API_HARD_DISABLED", "GEMINI_FALLBACK_ENABLED"):
        monkeypatch.delenv(switch, raising=False)

    reply = svc._call_gemini("system", "user", caller="openclaw_qa_gemini_fallback")
    assert reply is None
|
|
|
|
|
|
def test_no_direct_gemini_api_key_env_read_outside_guard_or_config():
    """Fail if production code reads ``GEMINI_API_KEY`` from the environment.

    Only ``config.py`` and ``services/gemini_guard.py`` may read the variable
    directly; every other file returned by ``_production_python_files`` is
    scanned as text for a direct read.

    The scan covers all common spellings of an environment read, not just
    ``os.getenv("GEMINI_API_KEY")``:

    * ``os.getenv(...)`` / a bare imported ``getenv(...)``
    * ``os.environ.get(...)`` / ``environ.get(...)``
    * ``os.environ[...]`` / ``environ[...]`` used as a read (a subscript that
      is the target of a plain ``=`` assignment is a write and is ignored)
    """
    allowed = {"config.py", "services/gemini_guard.py"}
    quoted_key = r"['\"]GEMINI_API_KEY['\"]"
    pattern = re.compile(
        # Function-style reads: getenv("...") and environ.get("...").
        rf"\b(?:getenv|environ\.get)\(\s*{quoted_key}"
        # Subscript reads: environ["..."], but not ``environ["..."] = value``
        # (``==`` comparisons are still reads, hence the nested lookahead).
        rf"|\benviron\[\s*{quoted_key}\s*\](?!\s*=(?!=))"
    )

    offenders = [
        _rel(path)
        for path in _production_python_files()
        if _rel(path) not in allowed
        and pattern.search(path.read_text(encoding="utf-8"))
    ]

    assert offenders == []
|
|
|
|
|
|
def test_gemini_outbound_files_are_guarded():
    """Every production file that can reach Gemini must be known and guarded.

    A file counts as "outbound" when its text contains any of the SDK/HTTP
    marker strings below. Two independent conditions are checked for each
    such file: it must be on the allowlist, and its text must reference
    ``get_gemini_api_key``.
    """
    outbound_markers = (
        "google.generativeai",
        "genai.configure",
        "GenerativeModel",
        "generate_content(",
        "generateContent?key=",
    )
    allowed = {
        "routes/openclaw_bot_routes.py",
        "services/code_review_pipeline_service.py",
        "services/gemini_service.py",
        "services/mcp_collector_service.py",
        "services/openclaw_strategist_service.py",
    }
    outside_allowlist = []
    missing_guard = []

    for source in _production_python_files():
        body = source.read_text(encoding="utf-8")
        if not any(marker in body for marker in outbound_markers):
            continue

        name = _rel(source)
        if name not in allowed:
            outside_allowlist.append(name)
        # Checked for allowlisted files too: being listed does not exempt a
        # file from going through the guard.
        if "get_gemini_api_key" not in body:
            missing_guard.append(name)

    assert outside_allowlist == []
    assert missing_guard == []
|
|
|
|
|
|
def test_tracked_secret_manifests_do_not_contain_live_credentials():
    """Kubernetes manifests must not contain real-looking credentials.

    Scans every ``*.yaml`` file found recursively under ``ROOT/k8s``, plus
    ``ROOT/k8s 2/03-secrets.yaml`` when it exists, against a set of
    credential-shaped regexes. The password patterns skip values that start
    with ``<`` (via the ``(?!<)`` lookahead).
    """
    live_secret_patterns = {
        "Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
        "Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
        "LINE token": re.compile(r"[A-Za-z0-9+/=]{80,}"),
        "hardcoded password": re.compile(
            r"(POSTGRES_PASSWORD|LOGIN_PASSWORD|APP_PASSWORD|SECRET_KEY):\s*"
            r"['\"](?!<)[^'\"]{6,}['\"]"
        ),
        "inline URL password": re.compile(r"://[^:\s/]+:(?!<)[^@\s]+@"),
    }

    manifests = list(ROOT.joinpath("k8s").rglob("*.yaml"))
    legacy_secret = ROOT / "k8s 2" / "03-secrets.yaml"
    if legacy_secret.exists():
        manifests.append(legacy_secret)

    offenders = []
    for manifest in manifests:
        content = manifest.read_text(encoding="utf-8")
        offenders.extend(
            f"{manifest.relative_to(ROOT).as_posix()}: {label}"
            for label, pattern in live_secret_patterns.items()
            if pattern.search(content)
        )

    assert offenders == []
|
|
|
|
|
|
def test_tracked_text_files_do_not_contain_known_live_tokens():
    """No git-tracked text file may contain a token-shaped secret.

    Each file yielded by ``_tracked_text_files`` is searched with every
    pattern below; each hit is reported as ``"<relative path>: <label>"``.
    """
    live_token_patterns = {
        "Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
        "Google OAuth access token": re.compile(r"ya29\.[0-9A-Za-z_-]{20,}"),
        "Google OAuth refresh token": re.compile(r"1//0[0-9A-Za-z_-]{20,}"),
        "Google OAuth client secret": re.compile(r"GOCSPX-[0-9A-Za-z_-]{12,}"),
        "Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
        "Ollama cloud API key": re.compile(r"\b[0-9a-f]{32}\.[A-Za-z0-9_-]{12,}\b"),
        "Superset default password": re.compile(r"Wooo_Superset_\d{4}"),
    }

    offenders = []
    for tracked in _tracked_text_files():
        content = tracked.read_text(encoding="utf-8")
        offenders.extend(
            f"{_rel(tracked)}: {label}"
            for label, pattern in live_token_patterns.items()
            if pattern.search(content)
        )

    assert offenders == []
|