#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Gemini fallback kill-switch contract.

These tests pin three things:

* the ``services.gemini_guard`` helpers report the fallback as disabled unless
  ``GEMINI_API_HARD_DISABLED=false`` and ``GEMINI_FALLBACK_ENABLED=true`` are
  both set (and the calling context passes ``GEMINI_ALLOWED_CONTEXTS``);
* production modules that contain Gemini outbound markers are on an explicit
  allowlist and reference ``get_gemini_api_key``;
* tracked manifests and text files do not match known credential patterns.
"""

import re
import subprocess
from collections.abc import Iterator
from pathlib import Path

from services.ai_provider import AIProviderService, AIResponse
from services.gemini_service import GeminiService

# Repository root: the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]


def _production_python_files() -> Iterator[Path]:
    """Yield the Python files scanned by the static checks below.

    Covers every ``*.py`` under ``services/`` and ``routes/`` (sorted per
    folder) plus ``scheduler.py`` and ``config.py`` at the root when present.
    """
    for folder in ("services", "routes"):
        yield from sorted((ROOT / folder).rglob("*.py"))
    for filename in ("scheduler.py", "config.py"):
        path = ROOT / filename
        if path.exists():
            yield path


def _rel(path: Path) -> str:
    """Return *path* relative to ``ROOT`` using forward slashes."""
    return path.relative_to(ROOT).as_posix()


def _tracked_text_files() -> Iterator[Path]:
    """Yield git-tracked regular files whose content decodes as UTF-8.

    Raises:
        subprocess.CalledProcessError: if ``git ls-files`` exits non-zero
            (``check=True``).
    """
    result = subprocess.run(
        ["git", "ls-files", "-z"],
        cwd=ROOT,
        check=True,
        capture_output=True,
    )
    # ``-z`` separates entries with NUL bytes; the trailing separator leaves
    # an empty final chunk that must be skipped.
    for raw in result.stdout.split(b"\0"):
        if not raw:
            continue
        path = ROOT / raw.decode("utf-8")
        if path.is_file():
            try:
                # Decoding is only a probe: files that are not valid UTF-8
                # are skipped rather than scanned.
                path.read_text(encoding="utf-8")
            except UnicodeDecodeError:
                continue
            yield path


def test_gemini_guard_defaults_disabled(monkeypatch):
    """With neither switch set, the guard is off and hides the API key."""
    from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled

    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    assert is_gemini_fallback_enabled("test") is False
    assert get_gemini_api_key("test") == ""


def test_gemini_guard_hard_switch_blocks_enabled_fallback(monkeypatch):
    """Enabling the fallback alone is not enough while the hard switch is unset."""
    from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled

    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    assert is_gemini_fallback_enabled("test") is False
    assert get_gemini_api_key("test") == ""


def test_gemini_guard_requires_explicit_emergency_unlock(monkeypatch):
    """Both switches set explicitly unlock the fallback and expose the key."""
    from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled

    monkeypatch.setenv("GEMINI_API_HARD_DISABLED", "false")
    monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
    # An allowlist inherited from the developer/CI environment could exclude
    # the "test" context (see the allowlist test below); clear it so this
    # test only exercises the two switches.
    monkeypatch.delenv("GEMINI_ALLOWED_CONTEXTS", raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    assert is_gemini_fallback_enabled("test") is True
    assert get_gemini_api_key("test") == "test-key"


def test_gemini_guard_respects_context_allowlist(monkeypatch):
    """Only contexts named in ``GEMINI_ALLOWED_CONTEXTS`` pass the guard."""
    from services.gemini_guard import get_gemini_api_key, is_gemini_fallback_enabled

    monkeypatch.setenv("GEMINI_API_HARD_DISABLED", "false")
    monkeypatch.setenv("GEMINI_FALLBACK_ENABLED", "true")
    monkeypatch.setenv("GEMINI_ALLOWED_CONTEXTS", "code_review")
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    assert is_gemini_fallback_enabled("code_review") is True
    assert get_gemini_api_key("openclaw_strategy") == ""


def test_ai_provider_does_not_call_gemini_when_guard_disabled(monkeypatch):
    """``_gemini_fallback`` returns the failed response without invoking the callback."""
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)

    service = AIProviderService(default_provider="ollama")
    failed_ollama = AIResponse(
        success=False,
        content="",
        model="qwen2.5-coder:7b",
        provider="ollama",
        error="ollama down",
    )

    def forbidden_fallback():
        # Reaching this callback means the guard let the fallback through.
        raise AssertionError("Gemini fallback must be blocked by default")

    result = service._gemini_fallback(failed_ollama, forbidden_fallback)

    assert result.success is False
    assert result.provider == "ollama"
    assert "GEMINI_API_HARD_DISABLED=true" in (result.error or "")


def test_gemini_service_check_connection_is_blocked_by_default(monkeypatch):
    """``GeminiService.generate`` fails fast without touching SDK initialization."""
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    service = GeminiService()

    def forbidden_init():
        # Patched onto the instance so any initialization attempt fails loudly.
        raise AssertionError("SDK initialization must not run when fallback is disabled")

    monkeypatch.setattr(service, "_ensure_initialized", forbidden_init)

    result = service.generate("hello")

    assert result.success is False
    assert "GEMINI_API_HARD_DISABLED=true" in (result.error or "")


def test_mcp_collector_does_not_initialize_gemini_when_guard_disabled(monkeypatch):
    """``MCPCollectorService._ensure_init`` reports False and leaves ``_genai`` unset."""
    import services.mcp_collector_service as mcp_mod

    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    service = mcp_mod.MCPCollectorService()

    assert service._ensure_init() is False
    assert service._genai is None


def test_openclaw_direct_gemini_call_is_blocked_by_default(monkeypatch):
    """``_call_gemini`` returns ``None`` when neither switch is set."""
    import services.openclaw_strategist_service as svc

    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)
    monkeypatch.delenv("GEMINI_FALLBACK_ENABLED", raising=False)
    monkeypatch.setenv("GEMINI_API_KEY", "test-key")

    assert svc._call_gemini("system", "user", caller="openclaw_qa_gemini_fallback") is None


def test_no_direct_gemini_api_key_env_read_outside_guard_or_config():
    """Only ``config.py`` and the guard module may ``os.getenv`` the API key."""
    allowed = {"config.py", "services/gemini_guard.py"}
    offenders = []
    pattern = re.compile(r"os\.getenv\(\s*['\"]GEMINI_API_KEY['\"]")

    for path in _production_python_files():
        if _rel(path) in allowed:
            continue
        if pattern.search(path.read_text(encoding="utf-8")):
            offenders.append(_rel(path))

    assert offenders == []


def test_gemini_outbound_files_are_guarded():
    """Files with Gemini outbound markers must be allowlisted and use the guard."""
    allowed = {
        "routes/openclaw_bot_routes.py",
        "services/code_review_pipeline_service.py",
        "services/gemini_service.py",
        "services/mcp_collector_service.py",
        "services/openclaw_strategist_service.py",
    }
    outbound_markers = (
        "google.generativeai",
        "genai.configure",
        "GenerativeModel",
        "generate_content(",
        "generateContent?key=",
    )
    offenders = []  # files with outbound markers that are not allowlisted
    unguarded = []  # files with outbound markers that never mention the guard

    for path in _production_python_files():
        text = path.read_text(encoding="utf-8")
        has_outbound = any(marker in text for marker in outbound_markers)
        if not has_outbound:
            continue
        rel = _rel(path)
        if rel not in allowed:
            offenders.append(rel)
        if "get_gemini_api_key" not in text:
            unguarded.append(rel)

    assert offenders == []
    assert unguarded == []


def test_tracked_secret_manifests_do_not_contain_live_credentials():
    """YAML manifests under ``k8s/`` must not match credential patterns."""
    # Sorted so that a failure lists offenders in a stable order.
    tracked_secret_files = sorted(ROOT.joinpath("k8s").rglob("*.yaml"))
    legacy_secret = ROOT / "k8s 2" / "03-secrets.yaml"
    if legacy_secret.exists():
        tracked_secret_files.append(legacy_secret)

    live_secret_patterns = {
        "Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
        "Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
        "LINE token": re.compile(r"[A-Za-z0-9+/=]{80,}"),
        # ``(?!<)`` lets quoted values that start with ``<`` through.
        "hardcoded password": re.compile(
            r"(POSTGRES_PASSWORD|LOGIN_PASSWORD|APP_PASSWORD|SECRET_KEY):\s*"
            r"['\"](?!<)[^'\"]{6,}['\"]"
        ),
        "inline URL password": re.compile(r"://[^:\s/]+:(?!<)[^@\s]+@"),
    }

    offenders = []
    for path in tracked_secret_files:
        text = path.read_text(encoding="utf-8")
        for label, pattern in live_secret_patterns.items():
            if pattern.search(text):
                offenders.append(f"{_rel(path)}: {label}")

    assert offenders == []


def test_tracked_text_files_do_not_contain_known_live_tokens():
    """No git-tracked UTF-8 file may match a known live-token pattern."""
    live_token_patterns = {
        "Google API key": re.compile(r"AIza[0-9A-Za-z_-]{20,}"),
        "Google OAuth access token": re.compile(r"ya29\.[0-9A-Za-z_-]{20,}"),
        "Google OAuth refresh token": re.compile(r"1//0[0-9A-Za-z_-]{20,}"),
        "Google OAuth client secret": re.compile(r"GOCSPX-[0-9A-Za-z_-]{12,}"),
        "Telegram bot token": re.compile(r"\d{8,12}:[A-Za-z0-9_-]{30,}"),
        "Ollama cloud API key": re.compile(r"\b[0-9a-f]{32}\.[A-Za-z0-9_-]{12,}\b"),
        "Superset default password": re.compile(r"Wooo_Superset_\d{4}"),
    }

    offenders = []
    for path in _tracked_text_files():
        text = path.read_text(encoding="utf-8")
        for label, pattern in live_token_patterns.items():
            if pattern.search(text):
                offenders.append(f"{_rel(path)}: {label}")

    assert offenders == []