forked from scylladb/scylla-cluster-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
admission_control_overload_test.py
104 lines (85 loc) · 4.19 KB
/
admission_control_overload_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import time
from invoke import exceptions
from sdcm.tester import ClusterTester
from sdcm.db_stats import PrometheusDBStats
from sdcm.utils.common import skip_optional_stage
class AdmissionControlOverloadTest(ClusterTester):
    """
    Test Scylla load with cassandra-stress to trigger admission control for both read and write
    """
    max_transport_requests = 0

    def check_prometheus_metrics(self, start_time, now):
        """
        Check whether admission control blocked any request in the given time window.

        Queries the first monitor node's Prometheus for
        'scylla_transport_requests_blocked_memory' between start_time and now and
        inspects every returned sample of every returned series.

        :param start_time: start of the queried window (the caller passes time.time() values)
        :param now: end of the queried window
        :return: True if any sample in the window is greater than zero, False otherwise
        """
        prometheus = PrometheusDBStats(self.monitors.nodes[0].external_address)
        node_procs_blocked = 'scylla_transport_requests_blocked_memory'
        node_procs_res = prometheus.query(node_procs_blocked, start_time, now)
        is_admission_control_triggered = False
        # `or []` tolerates a query that yields nothing instead of failing on iteration.
        for node in node_procs_res or []:
            # Scan the whole window, not only its first sample, so blocking that starts
            # mid-window is noticed; an empty 'values' list simply yields no match.
            # float() is used because sample values may be spelled in forms int() rejects
            # (e.g. exponent notation or NaN); NaN compares as not greater than zero.
            if any(float(sample[1]) > 0 for sample in node['values']):
                self.log.info('Admission control was triggered')
                is_admission_control_triggered = True
        return is_admission_control_triggered

    def run_load(self, job_num, job_cmd, is_prepare=False):
        """
        Run a stress command, either as a preload or as the monitored main load.

        With is_prepare=True (and the 'prepare_write' stage not skipped) the command is
        run with the 'preload-' prefix and its results are collected.
        With is_prepare=False (and the 'main_load' stage not skipped) the command is
        started and Prometheus is polled every 10 seconds until all of the stress
        thread's futures are done; whenever admission control is seen, the stress
        thread is killed.

        :param job_num: passed as stress_num to run_stress_thread
        :param job_cmd: stress command to run
        :param is_prepare: True to run the preload stage, False to run the main load
        :return: True if admission control was observed during the main load, else False
                 (always False for the preload stage or when the stage is skipped)
        """
        is_ever_triggered = False
        if is_prepare and not skip_optional_stage('prepare_write'):
            prepare_stress_queue = self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num, prefix='preload-',
                                                          stats_aggregate_cmds=False)
            self.get_stress_results(prepare_stress_queue)
        elif not is_prepare and not skip_optional_stage('main_load'):
            stress = self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num,
                                            stats_aggregate_cmds=False)
            start_time = time.time()
            while not all(future.done() for future in stress.results_futures):
                now = time.time()
                if self.check_prometheus_metrics(start_time=start_time, now=now):
                    is_ever_triggered = True
                    self.kill_stress_thread()
                # Slide the window so the next poll covers only the time since this one.
                start_time = now
                time.sleep(10)
            # Results are collected only when the load was not killed by the trigger above.
            if not is_ever_triggered:
                try:
                    self.get_stress_results(stress)
                except exceptions.CommandTimedOut as ex:
                    self.log.debug('some c-s timed out\n%s', ex)
        return is_ever_triggered

    def run_admission_control(self, prepare_base_cmd, stress_base_cmd, job_num):
        """
        Optionally preload data, run the main load and assert admission control was triggered.

        :param prepare_base_cmd: preload stress command; a falsy value skips the preload
        :param stress_base_cmd: main stress command expected to trigger admission control
        :param job_num: passed through to run_load for both stages
        """
        # run a write workload
        if prepare_base_cmd:
            self.run_load(job_num=job_num, job_cmd=prepare_base_cmd, is_prepare=True)
            time.sleep(10)
        # run workload load
        is_admission_control_triggered = self.run_load(job_num=job_num, job_cmd=stress_base_cmd)
        # NOTE(review): self.start_time is not set in this class - presumably provided by ClusterTester.
        self.verify_no_drops_and_errors(starting_from=self.start_time)
        self.assertTrue(is_admission_control_triggered, 'Admission Control wasn\'t triggered')

    def read_admission_control_load(self):
        """
        Test steps:
        1. Run a write workload as a preparation
        2. Run a read workload
        """
        self.log.debug('Started running read test')
        base_cmd_prepare = self.params.get('prepare_write_cmd')
        base_cmd_r = self.params.get('stress_cmd_r')
        self.run_admission_control(base_cmd_prepare, base_cmd_r, job_num=8)
        self.log.debug('Finished running read test')

    def write_admission_control_load(self):
        """
        Test steps:
        1. Run a write workload; no preparation step is needed
        """
        self.log.debug('Started running write test')
        base_cmd_w = self.params.get('stress_cmd_w')
        self.run_admission_control(None, base_cmd_w, job_num=16)
        self.log.debug('Finished running write test')

    def test_admission_control(self):
        """
        Run the write scenario, restart Scylla on the first DB node, then run the read scenario.
        """
        self.write_admission_control_load()
        node = self.db_cluster.nodes[0]
        node.stop_scylla(verify_up=False, verify_down=True)
        node.start_scylla()
        self.read_admission_control_load()
        self.log.info('Admission Control Test has finished with success')