All checks were successful
CD Pipeline / tests (push) Successful in 1m51s
Code Review / ai-code-review (push) Successful in 30s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Successful in 42s
CD Pipeline / build-and-deploy (push) Successful in 8m21s
CD Pipeline / post-deploy-checks (push) Successful in 4m18s
98 lines
3.0 KiB
Python
98 lines
3.0 KiB
Python
from datetime import datetime
|
|
|
|
from src.api.v1.webhooks import (
|
|
_should_bypass_alertmanager_llm,
|
|
_should_use_alertmanager_rule_first,
|
|
)
|
|
from src.services.decision_manager import _should_escalate_auto_approve_rejection
|
|
from src.services.telegram_gateway import _format_resolved_guard_stamp
|
|
|
|
|
|
def test_host_resource_yaml_no_action_bypasses_llm():
|
|
rule_response = {
|
|
"rule_id": "host_resource_alert",
|
|
"suggested_action": "NO_ACTION",
|
|
"kubectl_command": "",
|
|
}
|
|
|
|
assert _should_bypass_alertmanager_llm(rule_response, "host_resource") is True
|
|
|
|
|
|
def test_generic_fallback_does_not_bypass_llm():
|
|
rule_response = {
|
|
"rule_id": "generic_fallback",
|
|
"suggested_action": "NO_ACTION",
|
|
"kubectl_command": "",
|
|
}
|
|
|
|
assert _should_bypass_alertmanager_llm(rule_response, "host_resource") is False
|
|
|
|
|
|
def test_non_host_category_does_not_bypass_llm():
|
|
rule_response = {
|
|
"rule_id": "host_resource_alert",
|
|
"suggested_action": "NO_ACTION",
|
|
"kubectl_command": "",
|
|
}
|
|
|
|
assert _should_bypass_alertmanager_llm(rule_response, "kubernetes") is False
|
|
|
|
|
|
def test_backup_failure_yaml_no_action_bypasses_llm():
|
|
rule_response = {
|
|
"rule_id": "host_backup_failed",
|
|
"suggested_action": "NO_ACTION",
|
|
"kubectl_command": "",
|
|
}
|
|
|
|
assert _should_bypass_alertmanager_llm(rule_response, "backup_failure") is True
|
|
|
|
|
|
def test_host_resource_ssh_rule_uses_rule_first():
|
|
rule_response = {
|
|
"rule_id": "host_resource_alert",
|
|
"suggested_action": "SSH_DIAGNOSE",
|
|
"kubectl_command": "ssh {host} 'df -h'",
|
|
}
|
|
|
|
assert _should_use_alertmanager_rule_first(rule_response, "host_resource") is True
|
|
|
|
|
|
def test_backup_failure_ssh_rule_uses_rule_first():
|
|
rule_response = {
|
|
"rule_id": "host_backup_failed",
|
|
"suggested_action": "SSH_DIAGNOSE",
|
|
"kubectl_command": "ssh {host} 'tail -80 backup.log'",
|
|
}
|
|
|
|
assert _should_use_alertmanager_rule_first(rule_response, "backup_failure") is True
|
|
|
|
|
|
def test_generic_fallback_does_not_use_rule_first():
|
|
rule_response = {
|
|
"rule_id": "generic_fallback",
|
|
"suggested_action": "SSH_DIAGNOSE",
|
|
"kubectl_command": "ssh {host} 'df -h'",
|
|
}
|
|
|
|
assert _should_use_alertmanager_rule_first(rule_response, "host_resource") is False
|
|
|
|
|
|
def test_manual_gate_reasons_escalate_to_emergency_intervention():
|
|
assert _should_escalate_auto_approve_rejection("no_executable_action") is True
|
|
assert _should_escalate_auto_approve_rejection("no_playbook") is True
|
|
assert _should_escalate_auto_approve_rejection("critical_operation") is False
|
|
|
|
|
|
def test_resolved_guard_stamp_without_timestamp_is_clean():
|
|
assert _format_resolved_guard_stamp(None) == "✅ 此事件已解決"
|
|
|
|
|
|
def test_resolved_guard_stamp_with_timestamp_formats_time():
|
|
resolved_at = datetime(2026, 4, 25, 0, 2)
|
|
|
|
assert (
|
|
_format_resolved_guard_stamp(resolved_at)
|
|
== "✅ 此事件已於 2026-04-25 00:02 解決"
|
|
)
|