diff --git a/shotover/benches/benches/codec/cassandra.rs b/shotover/benches/benches/codec/cassandra.rs index 36d7c0ba9..b04bb4eb2 100644 --- a/shotover/benches/benches/codec/cassandra.rs +++ b/shotover/benches/benches/codec/cassandra.rs @@ -12,7 +12,7 @@ use shotover::frame::{ CassandraFrame, CassandraOperation, CassandraResult, Frame, }; use shotover::message::Message; -use tokio_util::codec::Encoder; +use tokio_util::codec::{Decoder, Encoder}; fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("cassandra_codec"); @@ -30,20 +30,88 @@ fn criterion_benchmark(c: &mut Criterion) { }, }))]; - let (_, mut encoder) = + let (mut decoder, mut encoder) = CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); - group.bench_function("encode_cassandra_system.local_query", |b| { - b.iter_batched( - || messages.clone(), - |messages| { - let mut bytes = BytesMut::new(); - encoder.encode(messages, &mut bytes).unwrap(); - black_box(bytes) - }, - BatchSize::SmallInput, - ) - }); + encoder.set_startup_state_ext("NONE".to_string(), Version::V4); + + group.bench_function( + "encode_cassandra_system.local_query_v4_no_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_query_v4_no_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + stream_id: 1, + tracing: Tracing::Request(false), + warnings: vec![], + operation: CassandraOperation::Query { + query: Box::new(parse_statement_single("SELECT * FROM system.local;")), + params: Box::default(), + }, + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("NONE".to_string(), Version::V4); + + group.bench_function( + "encode_cassandra_system.local_query_v4_lz4_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_query_v4_lz4_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); } { @@ -55,20 +123,271 @@ fn criterion_benchmark(c: &mut Criterion) { operation: CassandraOperation::Result(peers_v2_result()), }))]; - let (_, mut encoder) = + let (mut decoder, mut encoder) = CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); - group.bench_function("encode_cassandra_system.local_result", |b| { - b.iter_batched( - || messages.clone(), - |messages| { - let mut bytes = BytesMut::new(); - encoder.encode(messages, &mut bytes).unwrap(); - black_box(bytes) - }, - BatchSize::SmallInput, - ) - }); + encoder.set_startup_state_ext("NONE".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_result_v4_no_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_result_v4_no_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V4, + stream_id: 0, + tracing: Tracing::Response(None), + warnings: vec![], + operation: CassandraOperation::Result(peers_v2_result()), + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("LZ4".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_result_v4_lz4_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_result_v4_lz4_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V5, + stream_id: 1, + tracing: Tracing::Request(false), + warnings: vec![], + operation: CassandraOperation::Query { + query: Box::new(parse_statement_single("SELECT * FROM system.local;")), + params: Box::default(), + }, + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("NONE".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_query_v5_no_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_query_v5_no_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V5, + stream_id: 1, + tracing: Tracing::Request(false), + warnings: vec![], + operation: CassandraOperation::Query { + query: Box::new(parse_statement_single("SELECT * FROM system.local;")), + params: Box::default(), + }, + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("LZ4".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_query_v5_lz4_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_query_v5_lz4_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V5, + stream_id: 0, + tracing: Tracing::Response(None), + warnings: vec![], + operation: CassandraOperation::Result(peers_v2_result()), + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("NONE".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_result_v5_no_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_result_v5_no_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); + } + + { + let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame { + version: Version::V5, + stream_id: 0, + tracing: Tracing::Response(None), + warnings: vec![], + operation: CassandraOperation::Result(peers_v2_result()), + }))]; + + let (mut decoder, mut encoder) = + CassandraCodecBuilder::new(Direction::Sink, "cassandra".to_owned()).build(); + + encoder.set_startup_state_ext("LZ4".to_string(), Version::V5); + + group.bench_function( + "encode_cassandra_system.local_result_v5_lz4_compression", + |b| { + b.iter_batched( + || messages.clone(), + |messages| { + let mut bytes = BytesMut::new(); + encoder.encode(messages, &mut bytes).unwrap(); + black_box(bytes) + }, + BatchSize::SmallInput, + ) + }, + ); + + group.bench_function( + "decode_cassandra_system.local_result_v5_lz4_compression", + |b| { + b.iter_batched( + || { + let mut bytes = BytesMut::new(); + encoder.encode(messages.clone(), &mut bytes).unwrap(); + bytes + }, + |mut bytes| decoder.decode(&mut bytes).unwrap(), + BatchSize::SmallInput, + ) + }, + ); } } diff --git a/shotover/src/codec/cassandra.rs b/shotover/src/codec/cassandra.rs index b2d5cfb83..ff4739a20 100644 --- a/shotover/src/codec/cassandra.rs +++ b/shotover/src/codec/cassandra.rs @@ -14,6 +14,7 @@ use cql3_parser::cassandra_statement::CassandraStatement; use cql3_parser::common::Identifier; use lz4_flex::{block::get_maximum_output_size, compress_into, decompress}; use metrics::{register_counter, Counter, Histogram}; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -773,6 +774,16 @@ impl Encoder for CassandraEncoder { } impl CassandraEncoder { + pub fn set_startup_state_ext(&mut self, compression: String, version: Version) { + let mut startup_map = HashMap::new(); + startup_map.insert("COMPRESSION".into(), compression.to_string()); + let startup = BodyReqStartup { map: startup_map }; + + set_startup_state(&mut self.compression, &mut self.version, version, &startup); + self.handshake_complete + .store(true, std::sync::atomic::Ordering::Relaxed); + } + fn encode_frame( &mut self, dst: &mut BytesMut,