forked from chdb-io/chdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_parallel.py
executable file
·58 lines (41 loc) · 1.4 KB
/
test_parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!python3
import concurrent.futures
import time
import sys
import unittest
import chdb
from utils import data_file
# run query parallel in n thread and benchmark
thread_count = 10
query_count = 50
if len(sys.argv) == 2:
thread_count = int(sys.argv[1])
elif len(sys.argv) == 3:
thread_count = int(sys.argv[1])
query_count = int(sys.argv[2])
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=thread_count)
def run_query(query, fmt):
res = chdb.query(query, fmt)
if len(res) < 2000:
print(f"Error: result size is not correct {res.bytes()}")
exit(1)
def run_queries(query, fmt, count=query_count):
for _ in range(count):
run_query(query, fmt)
def run_queries_parallel(query, fmt, parallel=thread_count, count=query_count):
for _ in range(parallel):
thread_pool.submit(run_queries, query, fmt, count // parallel)
def wait():
thread_pool.shutdown(wait=True)
def benchmark(query, fmt, parallel=thread_count, count=query_count):
time_start = time.time()
run_queries_parallel(query, fmt, parallel, count)
wait()
time_end = time.time()
print("Time cost:", time_end - time_start, "s")
print("QPS:", count / (time_end - time_start))
class TestParallel(unittest.TestCase):
def test_parallel(self):
benchmark(f"SELECT * FROM file('{data_file}', Parquet) LIMIT 10", "Arrow")
if __name__ == '__main__':
unittest.main()