Add v5 benches (#1432)
conorbros authored Jan 30, 2024
1 parent a897a4d commit 22f242b
Showing 3 changed files with 329 additions and 7 deletions.
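The cassandra.rs diff below repeats a single encode/decode benchmark pattern, instantiated once per combination of message type, protocol version, and compression. As a reading aid only — the commit itself writes every block out long-hand — the pattern condenses to roughly the following, where `bench_codec_pair` is a hypothetical helper and the `BenchmarkGroup`/`WallTime` imports are assumptions taken from criterion's public API:

    use criterion::{measurement::WallTime, BenchmarkGroup}; // plus the imports already at the top of the file

    // Hypothetical condensation (not part of the commit) of the repeated bench blocks.
    fn bench_codec_pair(
        group: &mut BenchmarkGroup<'_, WallTime>,
        name: &str,
        compression: &str,
        version: Version,
        messages: Vec<Message>,
    ) {
        // Encode: a fresh encoder is put straight into post-handshake state so it
        // frames messages with the requested version/compression from the first call.
        let (_, mut encoder) =
            CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();
        encoder.set_startup_state_ext(compression.to_string(), version);

        group.bench_function(format!("encode_{name}"), |b| {
            b.iter_batched(
                || messages.clone(),
                |messages| {
                    let mut bytes = BytesMut::new();
                    encoder.encode(messages, &mut bytes).unwrap();
                    bytes
                },
                BatchSize::SmallInput,
            )
        });

        // Decode: the bytes are produced in the setup closure so only decoding is timed.
        let (mut decoder, mut encoder) =
            CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();
        encoder.set_startup_state_ext(compression.to_string(), version);

        group.bench_function(format!("decode_{name}"), |b| {
            b.iter_batched(
                || {
                    let mut bytes = BytesMut::new();
                    encoder.encode(messages.clone(), &mut bytes).unwrap();
                    bytes
                },
                |mut bytes| decoder.decode(&mut bytes).unwrap(),
                BatchSize::SmallInput,
            )
        });
    }

The diff instantiates this pattern eight times — system.local query and result messages, protocol v4 and v5, each with and without LZ4 — for sixteen benchmarks in the cassandra_codec group, all run through the usual cargo bench Criterion harness.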
323 changes: 317 additions & 6 deletions shotover/benches/benches/codec/cassandra.rs
@@ -3,7 +3,7 @@ use cassandra_protocol::frame::message_result::{
ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags, TableSpec,
};
use cassandra_protocol::frame::Version;
-use criterion::{black_box, criterion_group, BatchSize, Criterion};
use criterion::{criterion_group, BatchSize, Criterion};
use shotover::codec::cassandra::CassandraCodecBuilder;
use shotover::codec::{CodecBuilder, Direction};
use shotover::frame::{
@@ -12,7 +12,7 @@ use shotover::frame::{
CassandraFrame, CassandraOperation, CassandraResult, Frame,
};
use shotover::message::Message;
-use tokio_util::codec::Encoder;
use tokio_util::codec::{Decoder, Encoder};

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("cassandra_codec");
@@ -33,17 +33,127 @@ fn criterion_benchmark(c: &mut Criterion) {
let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

group.bench_function("encode_system.local_query", |b| {
encoder.set_startup_state_ext("NONE".to_string(), Version::V4);

group.bench_function("encode_system.local_query_v4_no_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V4);

group.bench_function("decode_system.local_query_v4_no_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
stream_id: 1,
tracing: Tracing::Request(false),
warnings: vec![],
operation: CassandraOperation::Query {
query: Box::new(parse_statement_single("SELECT * FROM system.local;")),
params: Box::default(),
},
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V4);

group.bench_function("encode_system.local_query_v4_lz4_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V4);

group.bench_function("decode_system.local_query_v4_lz4_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
stream_id: 0,
tracing: Tracing::Response(None),
warnings: vec![],
operation: CassandraOperation::Result(peers_v2_result()),
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V5);

group.bench_function("encode_system.local_result_v4_no_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
-black_box(bytes)
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V5);

group.bench_function("decode_system.local_result_v4_no_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
@@ -58,14 +58,215 @@ fn criterion_benchmark(c: &mut Criterion) {
let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

group.bench_function("encode_system.local_result", |b| {
encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("encode_system.local_result_v4_lz4_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
-black_box(bytes)
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("decode_system.local_result_v4_lz4_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V5,
stream_id: 1,
tracing: Tracing::Request(false),
warnings: vec![],
operation: CassandraOperation::Query {
query: Box::new(parse_statement_single("SELECT * FROM system.local;")),
params: Box::default(),
},
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V5);

group.bench_function("encode_system.local_query_v5_no_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("decode_system.local_query_v5_no_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V5,
stream_id: 1,
tracing: Tracing::Request(false),
warnings: vec![],
operation: CassandraOperation::Query {
query: Box::new(parse_statement_single("SELECT * FROM system.local;")),
params: Box::default(),
},
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("encode_system.local_query_v5_lz4_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("decode_system.local_query_v5_lz4_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V5,
stream_id: 0,
tracing: Tracing::Response(None),
warnings: vec![],
operation: CassandraOperation::Result(peers_v2_result()),
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V5);

group.bench_function("encode_system.local_result_v5_no_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("NONE".to_string(), Version::V5);

group.bench_function("decode_system.local_result_v5_no_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
}

{
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V5,
stream_id: 0,
tracing: Tracing::Response(None),
warnings: vec![],
operation: CassandraOperation::Result(peers_v2_result()),
}))];

let (_, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("encode_system.local_result_v5_lz4_compression", |b| {
b.iter_batched(
|| messages.clone(),
|messages| {
let mut bytes = BytesMut::new();
encoder.encode(messages, &mut bytes).unwrap();
bytes
},
BatchSize::SmallInput,
)
});

let (mut decoder, mut encoder) =
CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();

encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

group.bench_function("decode_system.local_result_v5_lz4_compression", |b| {
b.iter_batched(
|| {
let mut bytes = BytesMut::new();
encoder.encode(messages.clone(), &mut bytes).unwrap();
bytes
},
|mut bytes| decoder.decode(&mut bytes).unwrap(),
BatchSize::SmallInput,
)
});
2 changes: 1 addition & 1 deletion shotover/benches/benches/codec/kafka.rs
@@ -87,7 +87,7 @@ fn criterion_benchmark(c: &mut Criterion) {
for (message, _) in KAFKA_REQUESTS {
input.extend_from_slice(message);
}
group.bench_function("kafka_decode_all", |b| {
group.bench_function("decode_all", |b| {
b.iter_batched(
|| {
(
11 changes: 11 additions & 0 deletions shotover/src/codec/cassandra.rs
@@ -14,6 +14,7 @@ use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::Identifier;
use lz4_flex::{block::get_maximum_output_size, compress_into, decompress};
use metrics::{register_counter, Counter, Histogram};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
@@ -773,6 +773,16 @@ impl Encoder<Messages> for CassandraEncoder {
}

impl CassandraEncoder {
pub fn set_startup_state_ext(&mut self, compression: String, version: Version) {
let mut startup_map = HashMap::new();
startup_map.insert("COMPRESSION".into(), compression.to_string());
let startup = BodyReqStartup { map: startup_map };

set_startup_state(&mut self.compression, &mut self.version, version, &startup);
self.handshake_complete
.store(true, std::sync::atomic::Ordering::Relaxed);
}

fn encode_frame(
&mut self,
dst: &mut BytesMut,
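The `set_startup_state_ext` helper added above fabricates a STARTUP body carrying the chosen COMPRESSION value, pushes it through the codec's existing `set_startup_state` path, and flags the handshake as complete, so a freshly built encoder/decoder pair behaves as if version and compression had already been negotiated. A minimal round-trip sketch, assuming it sits next to the benchmarks (the `peers_v2_result` helper and the frame/codec imports come from that bench file):

    // Hedged sketch, not part of the commit: exercise the codec at v5 + LZ4
    // without performing a real STARTUP handshake.
    fn roundtrip_v5_lz4() {
        let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
            version: Version::V5,
            stream_id: 0,
            tracing: Tracing::Response(None),
            warnings: vec![],
            operation: CassandraOperation::Result(peers_v2_result()),
        }))];

        let (mut decoder, mut encoder) =
            CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build();
        // Jump straight to post-handshake state: protocol v5 with LZ4 compression.
        encoder.set_startup_state_ext("LZ4".to_string(), Version::V5);

        // Encode the batch, then decode the produced bytes back into messages.
        let mut bytes = BytesMut::new();
        encoder.encode(messages, &mut bytes).unwrap();
        let _decoded = decoder.decode(&mut bytes).unwrap();
    }

As in the decode benchmarks, setting the state on the encoder half is enough for the paired decoder to read the frames it produces.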
