承接 Wave 6/7/8 多 engineer 在 agent 限額前完成的代碼,補 commit 解 production HEAD 隱性 import error(decision_fusion 已被 decision_manager 引用但檔案 untracked)。

新增(後端核心):
- decision_fusion.py (562 行) — P2.1 方法 III(OpenClaw + Hermes + Elephant 三 LLM 融合)
- aiops_timeline.py + aiops_timeline_service.py — critic B4 修復 /api/v1/aiops/timeline endpoint,DB 存取抽到 service 層遵守 leWOOOgo 積木化
- migrations/p2_decision_fusion_columns.sql + rollback — approval_records fusion 欄位

修改(後端整合):
- decision_manager.py — fusion 三斷鏈修補(critic B1+B2+B3):
  · B1: 寫 _evidence_snapshot_ref 到 token.proposal_data
  · B2: fusion 前計算 complexity_score 並寫 token
  · B3: fusion composite 寫 token.proposal_data["decision_fusion"]
- auto_approve.py — fusion + consensus 認識(critic B3+B5):
  · composite > 0.7 → auto_execute_eligible bypass min_confidence
  · source=consensus_engine + score>=0.6 → 規則可信路徑
- consensus_engine.py — db-fix _save_consensus 重用 agent_sessions
- governance_agent.py — db-fix _alert PG 寫入 ai_governance_events
- approval_db.py — fusion 3 欄位 + 2 partial index + CheckConstraint
- db/models.py — schema 對齊 migration
- core/config.py — vuln #1 修復:OLLAMA_URL/_FALLBACK_URL field_validator 拒絕公網 IP + 外部域名,僅允許私網/loopback/K8s SVC 白名單
- core/feature_flags.py — P2 fusion + consensus flags
- main.py — governance_agent lifespan 啟動
- failover_alerter.py — Wave8-X2: in-memory dedup fallback(Redis 拒絕後不 fail-open)
- ollama_*.py — metrics 整合 + recovery 改善
- auto_repair_service.py — verifier 接線

新增(測試 2438 行):
- test_decision_fusion.py / test_governance_agent.py / test_consensus_integration.py
- test_p2_db_fixes.py / test_wave8_fusion_fixes.py
- test_config_url_validation.py(vuln #1 12 tests)
- test_failover_alerter.py +Wave8-X2 in-memory dedup 補測

驗收: 116 tests pass (decision_fusion + wave8_fusion + config_url + consensus + governance + p2_db_fixes + failover_alerter)

Conflict resolution:
- 3 檔(config.py + auto_approve.py + decision_manager.py)git stash pop 衝突,保留 stashed (engineer 最終版),補回 ValueError 「公網 IP」字樣對齊 test

Note: 此 commit 解 production HEAD 隱性 import error。

仍未修: vuln #4 prompt injection / debugger B14 quota fail-closed / B25-B26 drain_pending_tasks / B8 governance fail alert

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Multiple Engineers (Wave 6/7/8) <noreply@anthropic.com>
134 lines
4.7 KiB
Python
134 lines
4.7 KiB
Python
"""
OLLAMA URL endpoint poisoning 防護測試 — Wave8-X2

vuln #1:OLLAMA_URL / OLLAMA_FALLBACK_URL 缺 IP allowlist 校驗
修法:pydantic field_validator 拒絕非 private/loopback/known-hostname

2026-04-27 Wave8-X2 by Claude — vuln #1 + B14 + alerter memory dedup
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import pytest
|
||
from pydantic import ValidationError
|
||
|
||
|
||
# =============================================================================
|
||
# Helpers
|
||
# =============================================================================
|
||
|
||
|
||
def _make_settings(**kwargs):
    """Build a ``Settings`` instance from baseline values plus overrides.

    Args:
        **kwargs: Field values that replace (or extend) the baseline
            ``DATABASE_URL`` / ``OLLAMA_URL`` / ``OLLAMA_FALLBACK_URL``
            entries defined below.

    Returns:
        The ``Settings`` object constructed from the merged field values.
        Any exception raised by the ``Settings`` constructor propagates
        to the caller unchanged.
    """
    # Imported inside the helper (not at module top), as in the original.
    from src.core.config import Settings

    defaults = {
        "DATABASE_URL": "postgresql://u:p@localhost:5432/test",
        "OLLAMA_URL": "http://192.168.0.111:11434",
        "OLLAMA_FALLBACK_URL": "",
    }
    # Caller-supplied values win over the baseline on key collisions.
    merged = {**defaults, **kwargs}
    return Settings(**merged)
|
||
|
||
|
||
# =============================================================================
|
||
# #1: 公網 IP 應被拒絕
|
||
# =============================================================================
|
||
|
||
|
||
def test_public_ip_rejected_in_ollama_url():
    """OLLAMA_URL set to the public IP 8.8.8.8 must be rejected.

    Models the endpoint-poisoning scenario; the validation error message
    is expected to contain the text "公網".
    """
    public_url = "http://8.8.8.8:11434"
    with pytest.raises(ValidationError, match="公網"):
        _make_settings(OLLAMA_URL=public_url)
|
||
|
||
|
||
def test_public_ip_rejected_in_ollama_fallback_url():
    """OLLAMA_FALLBACK_URL set to the public IP 1.1.1.1 must be rejected.

    The fallback field is expected to fail with the same "公網" message
    as the primary URL field.
    """
    public_url = "http://1.1.1.1:11434"
    with pytest.raises(ValidationError, match="公網"):
        _make_settings(OLLAMA_FALLBACK_URL=public_url)
|
||
|
||
|
||
# =============================================================================
|
||
# #2: 外部域名應被拒絕
|
||
# =============================================================================
|
||
|
||
|
||
def test_external_domain_rejected():
    """OLLAMA_URL pointing at the external domain attacker.com must fail.

    The host is neither an IP address nor an allowlisted hostname; the
    validation error message is expected to contain "外部域名".
    """
    external_url = "http://attacker.com:11434"
    with pytest.raises(ValidationError, match="外部域名"):
        _make_settings(OLLAMA_URL=external_url)
|
||
|
||
|
||
def test_external_domain_fallback_rejected():
    """OLLAMA_FALLBACK_URL pointing at an external domain must also fail.

    The validation error message is expected to contain "外部域名".
    """
    external_url = "http://evil.example.com:11434"
    with pytest.raises(ValidationError, match="外部域名"):
        _make_settings(OLLAMA_FALLBACK_URL=external_url)
|
||
|
||
|
||
# =============================================================================
|
||
# #3: 私網 IP 應通過
|
||
# =============================================================================
|
||
|
||
|
||
def test_private_ip_192_168_accepted():
    """192.168.0.111 is an RFC 1918 private address and must be accepted."""
    settings = _make_settings(OLLAMA_URL="http://192.168.0.111:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_URL == "http://192.168.0.111:11434"
|
||
|
||
|
||
def test_private_ip_10_x_accepted():
    """10.0.0.5 is an RFC 1918 private address and must be accepted."""
    settings = _make_settings(OLLAMA_URL="http://10.0.0.5:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_URL == "http://10.0.0.5:11434"
|
||
|
||
|
||
def test_private_ip_172_16_accepted():
    """172.16.0.10 is an RFC 1918 private address and must be accepted.

    This case exercises the OLLAMA_FALLBACK_URL field.
    """
    settings = _make_settings(OLLAMA_FALLBACK_URL="http://172.16.0.10:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_FALLBACK_URL == "http://172.16.0.10:11434"
|
||
|
||
|
||
# =============================================================================
|
||
# #4: localhost / loopback 應通過
|
||
# =============================================================================
|
||
|
||
|
||
def test_localhost_accepted():
    """``localhost`` is expected to be an allowlisted hostname and pass."""
    settings = _make_settings(OLLAMA_URL="http://localhost:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_URL == "http://localhost:11434"
|
||
|
||
|
||
def test_loopback_ip_accepted():
    """127.0.0.1 is a loopback address and must be accepted."""
    settings = _make_settings(OLLAMA_URL="http://127.0.0.1:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_URL == "http://127.0.0.1:11434"
|
||
|
||
|
||
# =============================================================================
|
||
# #5: 已知 K8s Service hostname 應通過
|
||
# =============================================================================
|
||
|
||
|
||
def test_known_k8s_svc_ollama_svc_accepted():
    """``ollama-svc`` is expected to be an allowlisted K8s Service name."""
    settings = _make_settings(OLLAMA_URL="http://ollama-svc:11434")
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_URL == "http://ollama-svc:11434"
|
||
|
||
|
||
def test_known_k8s_svc_ollama_fallback_svc_accepted():
    """``ollama-fallback-svc`` is expected to be allowlisted and pass.

    This case exercises the OLLAMA_FALLBACK_URL field.
    """
    settings = _make_settings(
        OLLAMA_FALLBACK_URL="http://ollama-fallback-svc:11434"
    )
    # The stored value must come back exactly as supplied.
    assert settings.OLLAMA_FALLBACK_URL == "http://ollama-fallback-svc:11434"
|
||
|
||
|
||
# =============================================================================
|
||
# #6: 空字串應通過(OLLAMA_FALLBACK_URL 預設值)
|
||
# =============================================================================
|
||
|
||
|
||
def test_empty_string_fallback_url_accepted():
    """An empty OLLAMA_FALLBACK_URL (the "not configured" value) must pass."""
    settings = _make_settings(OLLAMA_FALLBACK_URL="")
    # The empty string must be preserved rather than rejected or rewritten.
    assert settings.OLLAMA_FALLBACK_URL == ""
|