Commit

Merge branch 'main' into aws_cli

conorbros authored Jan 29, 2024
2 parents 2092727 + bb92aea commit 6ab1c36
Showing 8 changed files with 160 additions and 143 deletions.
@@ -1,159 +1,23 @@
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use cassandra_protocol::frame::message_result::{
ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags, TableSpec,
};
use cassandra_protocol::frame::Version;
use criterion::{black_box, criterion_group, BatchSize, Criterion};
use shotover::codec::cassandra::CassandraCodecBuilder;
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, Direction};
use shotover::frame::{
cassandra::{parse_statement_single, Tracing},
value::{GenericValue, IntSize},
CassandraFrame, CassandraOperation, CassandraResult, Frame,
};
use shotover::message::{Message, ProtocolType};
use tokio_util::codec::{Decoder, Encoder};

const KAFKA_REQUESTS: &[(&[u8], &str)] = &[
(
include_bytes!("kafka_requests/metadata.bin"),
"request_metadata",
),
(
include_bytes!("kafka_requests/list_offsets.bin"),
"request_list_offsets",
),
(include_bytes!("kafka_requests/fetch.bin"), "request_fetch"),
(
include_bytes!("kafka_requests/produce.bin"),
"request_produce",
),
];
use shotover::message::Message;
use tokio_util::codec::Encoder;

fn criterion_benchmark(c: &mut Criterion) {
super::init();
let mut group = c.benchmark_group("codec");
let mut group = c.benchmark_group("cassandra_codec");
group.noise_threshold(0.2);

for (message, file_name) in KAFKA_REQUESTS {
{
let mut input = BytesMut::new();
input.extend_from_slice(message);
group.bench_function(format!("kafka_decode_{file_name}"), |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Source, "kafka".to_owned()).build(),
input.clone(),
)
},
|((mut decoder, _encoder), mut input)| {
let mut result = decoder.decode(&mut input).unwrap().unwrap();
for message in &mut result {
message.frame();
}
black_box(result)
},
BatchSize::SmallInput,
)
});
}
{
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
request_header: None,
},
);
// force the message to be parsed and clear raw message
message.frame();
message.invalidate_cache();

let messages = vec![message];

group.bench_function(format!("kafka_encode_{file_name}"), |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Sink, "kafka".to_owned()).build(),
messages.clone(),
)
},
|((_decoder, mut encoder), messages)| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
black_box(bytes)
},
BatchSize::SmallInput,
)
});
}
}

{
let mut input = BytesMut::new();
for (message, _) in KAFKA_REQUESTS {
input.extend_from_slice(message);
}
group.bench_function("kafka_decode_all", |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Source, "kafka".to_owned()).build(),
input.clone(),
)
},
|((mut decoder, _encoder), mut input)| {
let mut result = decoder.decode(&mut input).unwrap().unwrap();
for message in &mut result {
message.frame();
}
black_box(result)
},
BatchSize::SmallInput,
)
});
}

{
let mut messages = vec![];
for (message, _) in KAFKA_REQUESTS {
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
request_header: None,
},
);
// force the message to be parsed and clear raw message
message.frame();
message.invalidate_cache();

messages.push(message);
}

group.bench_function("kafka_encode_all", |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Sink, "kafka".to_owned()).build(),
messages.clone(),
)
},
|((_decoder, mut encoder), messages)| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
black_box(bytes)
},
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
@@ -169,7 +33,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

group.bench_function("encode_cassandra_system.local_query", |b| {
group.bench_function("encode_system.local_query", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
@@ -194,7 +58,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

group.bench_function("encode_cassandra_system.local_result", |b| {
group.bench_function("encode_system.local_result", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
147 changes: 147 additions & 0 deletions shotover/benches/benches/codec/kafka.rs
@@ -0,0 +1,147 @@
use bytes::{Bytes, BytesMut};
use criterion::{black_box, criterion_group, BatchSize, Criterion};
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, Direction};
use shotover::message::{Message, ProtocolType};
use tokio_util::codec::{Decoder, Encoder};

const KAFKA_REQUESTS: &[(&[u8], &str)] = &[
(
include_bytes!("kafka_requests/metadata.bin"),
"request_metadata",
),
(
include_bytes!("kafka_requests/list_offsets.bin"),
"request_list_offsets",
),
(include_bytes!("kafka_requests/fetch.bin"), "request_fetch"),
(
include_bytes!("kafka_requests/produce.bin"),
"request_produce",
),
];

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("kafka_codec");
group.noise_threshold(0.2);

for (message, file_name) in KAFKA_REQUESTS {
{
let mut input = BytesMut::new();
input.extend_from_slice(message);
group.bench_function(format!("decode_{file_name}"), |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Source, "kafka".to_owned()).build(),
input.clone(),
)
},
|((mut decoder, _encoder), mut input)| {
let mut result = decoder.decode(&mut input).unwrap().unwrap();
for message in &mut result {
message.frame();
}
black_box(result)
},
BatchSize::SmallInput,
)
});
}
{
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
request_header: None,
},
);
// force the message to be parsed and clear raw message
message.frame();
message.invalidate_cache();

let messages = vec![message];

group.bench_function(format!("encode_{file_name}"), |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Sink, "kafka".to_owned()).build(),
messages.clone(),
)
},
|((_decoder, mut encoder), messages)| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
black_box(bytes)
},
BatchSize::SmallInput,
)
});
}
}

{
let mut input = BytesMut::new();
for (message, _) in KAFKA_REQUESTS {
input.extend_from_slice(message);
}
group.bench_function("kafka_decode_all", |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Source, "kafka".to_owned()).build(),
input.clone(),
)
},
|((mut decoder, _encoder), mut input)| {
let mut result = decoder.decode(&mut input).unwrap().unwrap();
for message in &mut result {
message.frame();
}
black_box(result)
},
BatchSize::SmallInput,
)
});
}

{
let mut messages = vec![];
for (message, _) in KAFKA_REQUESTS {
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
request_header: None,
},
);
// force the message to be parsed and clear raw message
message.frame();
message.invalidate_cache();

messages.push(message);
}

group.bench_function("encode_all", |b| {
b.iter_batched(
|| {
(
// recreate codec since it is stateful
KafkaCodecBuilder::new(Direction::Sink, "kafka".to_owned()).build(),
messages.clone(),
)
},
|((_decoder, mut encoder), messages)| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
black_box(bytes)
},
BatchSize::SmallInput,
)
});
}
}

criterion_group!(benches, criterion_benchmark);
2 changes: 2 additions & 0 deletions shotover/benches/benches/codec/mod.rs
@@ -0,0 +1,2 @@
pub mod cassandra;
pub mod kafka;
6 changes: 5 additions & 1 deletion shotover/benches/benches/main.rs
@@ -8,4 +8,8 @@ fn init() {
std::env::set_var("RUST_LIB_BACKTRACE", "0");
}

criterion_main!(chain::benches, codec::benches);
criterion_main!(
chain::benches,
codec::kafka::benches,
codec::cassandra::benches
);
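
For reference, a minimal sketch (not part of this commit) of the decode path the new `kafka_codec` group measures, with the criterion harness stripped away. It uses only calls that appear in the diff above; the function name `decode_one` is hypothetical.

use bytes::BytesMut;
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, Direction};
use tokio_util::codec::Decoder;

fn decode_one(raw: &[u8]) {
    // The codec is stateful, which is why the benchmarks rebuild it for
    // every batch; a one-off decode can build it once.
    let (mut decoder, _encoder) =
        KafkaCodecBuilder::new(Direction::Source, "kafka".to_owned()).build();

    let mut input = BytesMut::new();
    input.extend_from_slice(raw);

    if let Ok(Some(mut messages)) = decoder.decode(&mut input) {
        for message in &mut messages {
            // Force full parsing, exactly as the benchmarks do.
            message.frame();
        }
    }
}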
