69 lines
2.7 KiB
Python
69 lines
2.7 KiB
Python
"""
|
|
operation_parser SSH recognition tests
======================================
2026-05-02 ogt + Claude Sonnet 4.6: fix for the bug where a manually approved
SSH action got stuck.

Verifies:
- ssh host '...'        -> SSH_HOST + host
- ssh user@host ...     -> SSH_HOST + host
- ssh -o opt host ...   -> SSH_HOST + host
- the existing kubectl / Chinese-language / restart routes are not
  mistakenly captured by the SSH rule
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from src.services.executor import OperationType
|
|
from src.services.operation_parser import parse_operation_from_action
|
|
|
|
|
|
class TestSSHRecognition:
    """Check that ``ssh ...`` action strings parse as ``SSH_HOST`` operations.

    Every test feeds one raw action string to ``parse_operation_from_action``
    and asserts both the resulting operation type and the extracted target
    (``resource_name``).
    """

    def test_ssh_with_bare_ip(self):
        """A bare IP directly after ``ssh`` is reported as the resource name."""
        parsed = parse_operation_from_action(
            "ssh 192.168.0.110 'docker image prune -a -f'"
        )
        assert parsed.operation_type == OperationType.SSH_HOST
        assert parsed.resource_name == "192.168.0.110"

    def test_ssh_with_user_at_host(self):
        """For ``user@host`` only the host part is expected as the resource name."""
        parsed = parse_operation_from_action("ssh wooo@192.168.0.110 'df -h'")
        assert parsed.operation_type == OperationType.SSH_HOST
        assert parsed.resource_name == "192.168.0.110"

    def test_ssh_with_options(self):
        """A leading ``-o key=value`` option must not be taken for the host."""
        parsed = parse_operation_from_action(
            "ssh -o StrictHostKeyChecking=accept-new wooo@192.168.0.188 'systemctl restart ollama'"
        )
        assert parsed.operation_type == OperationType.SSH_HOST
        assert parsed.resource_name == "192.168.0.188"

    def test_ssh_with_named_alias(self):
        """A named alias in the host position is returned as-is."""
        # An alias must be treated as a host too (the backend host whitelist
        # will still gate it afterwards).
        parsed = parse_operation_from_action(
            "ssh backup 'ls -lah /home/ollama/backup'"
        )
        assert parsed.operation_type == OperationType.SSH_HOST
        assert parsed.resource_name == "backup"

    def test_ssh_with_diagnostic_combo(self):
        """A remote command with quotes, ``;`` and a pipe still yields the host."""
        parsed = parse_operation_from_action(
            "ssh 192.168.0.110 'echo \"=== CPU TOP ===\"; ps aux --sort=-%cpu | head -15'"
        )
        assert parsed.operation_type == OperationType.SSH_HOST
        assert parsed.resource_name == "192.168.0.110"
|
|
|
|
|
|
class TestSSHDoesNotBreakExistingRoutes:
    """Regression checks for non-SSH action strings.

    Each test passes an action string that does not start with ``ssh`` to
    ``parse_operation_from_action`` and asserts the operation type (and, where
    applicable, the resource name) that the test expects for that route.
    """

    def test_kubectl_rollout_still_works(self):
        """``kubectl rollout restart deployment/<name>`` maps to RESTART_DEPLOYMENT."""
        parsed = parse_operation_from_action(
            "kubectl rollout restart deployment/awoooi-api"
        )
        assert parsed.operation_type == OperationType.RESTART_DEPLOYMENT
        assert parsed.resource_name == "awoooi-api"

    def test_chinese_restart_still_works(self):
        """A Chinese-language restart request maps to RESTART_DEPLOYMENT."""
        parsed = parse_operation_from_action("重新啟動 awoooi-worker 服務")
        assert parsed.operation_type == OperationType.RESTART_DEPLOYMENT
        assert parsed.resource_name == "awoooi-worker"

    def test_kubectl_get_routes_to_investigate(self):
        """``kubectl get ...`` maps to INVESTIGATE."""
        parsed = parse_operation_from_action("kubectl get pods -n awoooi-prod")
        assert parsed.operation_type == OperationType.INVESTIGATE

    def test_unrelated_text_still_returns_none(self):
        """Unrecognised text yields a result whose ``operation_type`` is None."""
        parsed = parse_operation_from_action("(未設)")
        assert parsed.operation_type is None
|