-
Notifications
You must be signed in to change notification settings - Fork 0
/
benchmark_test.py
95 lines (76 loc) · 2.94 KB
/
benchmark_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import json
from io import BytesIO
import avro.io
import avro.schema
import fastavro
import pytest
class TestBenchmark:
    """Benchmark Avro binary encoding/decoding with ``avro`` and ``fastavro``.

    Each library gets an ``*_encoder`` and ``*_decoder`` fixture returning a
    one-argument callable. The parametrized ``encoder`` / ``decoder`` fixtures
    select one of them by name so every test runs once per library.

    For both libraries the schema is prepared once, outside the returned
    callable, so the timed code covers only per-message work and not schema
    parsing.
    """

    @pytest.fixture(scope="session")
    def avro_schema_json(self):
        """Return the schema loaded from ``fixtures/schema.json``.

        The path is relative to the current working directory.
        """
        with open("fixtures/schema.json", "r", encoding="utf-8") as schema_file:
            return json.load(schema_file)

    @pytest.fixture(scope="session")
    def avro_encoder(self, avro_schema_json):
        """Return a callable that serializes a message to bytes with ``avro``."""
        schema = avro.schema.SchemaFromJSONData(avro_schema_json)

        def encode(message):
            writer = avro.io.DatumWriter(schema)
            buffer = BytesIO()
            writer.write(message, avro.io.BinaryEncoder(buffer))
            return buffer.getvalue()

        return encode

    @pytest.fixture(scope="session")
    def avro_decoder(self, avro_schema_json):
        """Return a callable that deserializes bytes with ``avro``."""
        schema = avro.schema.SchemaFromJSONData(avro_schema_json)

        def decode(encoded_message):
            reader = avro.io.DatumReader(schema)
            buffer = BytesIO(encoded_message)
            return reader.read(avro.io.BinaryDecoder(buffer))

        return decode

    @pytest.fixture(scope="session")
    def fastavro_encoder(self, avro_schema_json):
        """Return a callable that serializes a message to bytes with ``fastavro``."""
        # Parse once up front: handing the raw dict to schemaless_writer would
        # make fastavro re-parse the schema inside every timed call, whereas
        # the avro fixtures build their schema object outside the closure.
        parsed_schema = fastavro.parse_schema(avro_schema_json)

        def encode(message):
            buffer = BytesIO()
            fastavro.schemaless_writer(buffer, parsed_schema, message)
            return buffer.getvalue()

        return encode

    @pytest.fixture(scope="session")
    def fastavro_decoder(self, avro_schema_json):
        """Return a callable that deserializes bytes with ``fastavro``.

        The returned callable raises ``TypeError`` when its argument is not a
        ``bytes`` instance.
        """
        # Parse once up front, for the same reason as in fastavro_encoder.
        parsed_schema = fastavro.parse_schema(avro_schema_json)

        def decode(encoded_message):
            if not isinstance(encoded_message, bytes):
                raise TypeError(
                    "encoded_message must be bytes, got "
                    + type(encoded_message).__name__
                )
            return fastavro.schemaless_reader(
                BytesIO(encoded_message), parsed_schema
            )

        return decode

    @pytest.fixture(scope="session")
    def record_data(self):
        """Return the sample record loaded from ``fixtures/record.json``.

        The path is relative to the current working directory.
        """
        with open("fixtures/record.json", "r", encoding="utf-8") as record_file:
            return json.load(record_file)

    @pytest.fixture(scope="session")
    def record_binary(self, avro_encoder, record_data):
        """Return the sample record serialized by the ``avro`` encoder."""
        return avro_encoder(record_data)

    @pytest.fixture(params=["avro_encoder", "fastavro_encoder"])
    def encoder(self, request):
        """Return each encoder fixture in turn, looked up by name."""
        return request.getfixturevalue(request.param)

    @pytest.fixture(params=["avro_decoder", "fastavro_decoder"])
    def decoder(self, request):
        """Return each decoder fixture in turn, looked up by name."""
        return request.getfixturevalue(request.param)

    @pytest.mark.benchmark(group="Encoders")
    def test_avro_encoder(self, benchmark, encoder, record_data):
        """Benchmark encoding the sample record.

        Despite the name, this runs for every parametrized ``encoder``
        (avro and fastavro); the name is kept so test ids stay stable.
        """
        benchmark.pedantic(
            target=encoder,
            args=[record_data],
            iterations=10,
            rounds=10000,
            warmup_rounds=100,
        )

    @pytest.mark.benchmark(group="Decoders")
    def test_decoder(self, benchmark, decoder, record_binary):
        """Benchmark decoding the pre-encoded record with every ``decoder``."""
        benchmark.pedantic(
            target=decoder,
            args=[record_binary],
            iterations=10,
            rounds=10000,
            warmup_rounds=100,
        )