diff --git a/shotover/benches/benches/chain.rs b/shotover/benches/benches/chain.rs index b05037845..4c0f4b59c 100644 --- a/shotover/benches/benches/chain.rs +++ b/shotover/benches/benches/chain.rs @@ -3,10 +3,11 @@ use cassandra_protocol::compression::Compression; use cassandra_protocol::{consistency::Consistency, frame::Version, query::QueryParams}; use criterion::{criterion_group, BatchSize, Criterion}; use hex_literal::hex; +use shotover::codec::CodecState; use shotover::frame::cassandra::{parse_statement_single, Tracing}; use shotover::frame::RedisFrame; use shotover::frame::{CassandraFrame, CassandraOperation, Frame}; -use shotover::message::{Message, MessageIdMap, ProtocolType, QueryType}; +use shotover::message::{Message, MessageIdMap, QueryType}; use shotover::transforms::cassandra::peers_rewrite::CassandraPeersRewrite; use shotover::transforms::chain::{TransformChain, TransformChainBuilder}; use shotover::transforms::debug::returner::{DebugReturner, Response}; @@ -210,7 +211,7 @@ fn criterion_benchmark(c: &mut Criterion) { ) .to_vec(), ), - ProtocolType::Cassandra { + CodecState::Cassandra { compression: Compression::None, }, )], @@ -264,7 +265,7 @@ fn criterion_benchmark(c: &mut Criterion) { } .encode(Compression::None) .into(), - ProtocolType::Cassandra { + CodecState::Cassandra { compression: Compression::None, }, )], diff --git a/shotover/benches/benches/codec/kafka.rs b/shotover/benches/benches/codec/kafka.rs index 94f92d33d..313f285f8 100644 --- a/shotover/benches/benches/codec/kafka.rs +++ b/shotover/benches/benches/codec/kafka.rs @@ -1,8 +1,8 @@ use bytes::{Bytes, BytesMut}; use criterion::{black_box, criterion_group, BatchSize, Criterion}; use shotover::codec::kafka::KafkaCodecBuilder; -use shotover::codec::{CodecBuilder, Direction}; -use shotover::message::{Message, ProtocolType}; +use shotover::codec::{CodecBuilder, CodecState, Direction}; +use shotover::message::Message; use tokio_util::codec::{Decoder, Encoder}; const KAFKA_REQUESTS: &[(&[u8], &str)] = &[ @@ -76,7 +76,7 @@ fn criterion_benchmark(c: &mut Criterion) { { let mut message = Message::from_bytes( Bytes::from(message.to_vec()), - ProtocolType::Kafka { + CodecState::Kafka { request_header: None, }, ); @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (message, _) in KAFKA_REQUESTS { let mut message = Message::from_bytes( Bytes::from(message.to_vec()), - ProtocolType::Kafka { + CodecState::Kafka { request_header: None, }, ); diff --git a/shotover/src/codec/cassandra.rs b/shotover/src/codec/cassandra.rs index bdfd32e7d..fdad538e4 100644 --- a/shotover/src/codec/cassandra.rs +++ b/shotover/src/codec/cassandra.rs @@ -1,4 +1,5 @@ use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction}; +use crate::codec::CodecState; use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing}; use crate::frame::{CassandraFrame, Frame, MessageType}; use crate::message::{Encodable, Message, Messages, Metadata}; @@ -358,7 +359,7 @@ impl CassandraDecoder { let message = Message::from_bytes_at_instant( bytes.freeze(), - crate::message::ProtocolType::Cassandra { + CodecState::Cassandra { compression: if compressed { compression } else { @@ -510,7 +511,7 @@ impl CassandraDecoder { envelopes.push(Message::from_bytes_at_instant( envelope, - crate::message::ProtocolType::Cassandra { + CodecState::Cassandra { compression: Compression::None, }, Some(received_at), diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index 85892a0ba..fcd92b5bf 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -1,7 +1,7 @@ use super::{message_latency, CodecWriteError, Direction}; -use crate::codec::{CodecBuilder, CodecReadError}; +use crate::codec::{CodecBuilder, CodecReadError, CodecState}; use crate::frame::MessageType; -use crate::message::{Encodable, Message, MessageId, Messages, ProtocolType}; +use crate::message::{Encodable, Message, MessageId, Messages}; use anyhow::{anyhow, Result}; use bytes::BytesMut; use kafka_protocol::messages::ApiKey; @@ -112,7 +112,7 @@ impl Decoder for KafkaDecoder { .map_err(|_| CodecReadError::Parser(anyhow!("kafka encoder half was lost")))?; let mut message = Message::from_bytes_at_instant( bytes.freeze(), - ProtocolType::Kafka { + CodecState::Kafka { request_header: Some(header), }, Some(received_at), @@ -122,7 +122,7 @@ impl Decoder for KafkaDecoder { } else { Message::from_bytes_at_instant( bytes.freeze(), - ProtocolType::Kafka { + CodecState::Kafka { request_header: None, }, Some(received_at), diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 465a46f1c..7091db8dd 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -1,6 +1,6 @@ //! parsed AST-like representations of messages -use crate::{codec::CodecState, message::ProtocolType}; +use crate::codec::CodecState; use anyhow::{anyhow, Result}; use bytes::Bytes; #[cfg(feature = "cassandra")] @@ -33,9 +33,9 @@ pub enum MessageType { Cassandra, #[cfg(feature = "kafka")] Kafka, - Dummy, #[cfg(feature = "opensearch")] OpenSearch, + Dummy, } impl MessageType { @@ -68,17 +68,18 @@ impl MessageType { } } -impl From<&ProtocolType> for MessageType { - fn from(value: &ProtocolType) -> Self { +impl From<&CodecState> for MessageType { + fn from(value: &CodecState) -> Self { match value { #[cfg(feature = "cassandra")] - ProtocolType::Cassandra { .. } => Self::Cassandra, + CodecState::Cassandra { .. } => Self::Cassandra, #[cfg(feature = "redis")] - ProtocolType::Redis => Self::Redis, + CodecState::Redis => Self::Redis, #[cfg(feature = "kafka")] - ProtocolType::Kafka { .. } => Self::Kafka, + CodecState::Kafka { .. } => Self::Kafka, #[cfg(feature = "opensearch")] - ProtocolType::OpenSearch => Self::OpenSearch, + CodecState::OpenSearch => Self::OpenSearch, + CodecState::Dummy => Self::Dummy, } } } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 1fa6d5e2d..78a794806 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -1,7 +1,5 @@ //! Message and supporting types - used to hold a message/query/result going between the client and database -#[cfg(feature = "kafka")] -use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; #[cfg(feature = "cassandra")] use crate::frame::{cassandra, cassandra::CassandraMetadata}; @@ -10,8 +8,6 @@ use crate::frame::{redis::redis_query_type, RedisFrame}; use crate::frame::{Frame, MessageType}; use anyhow::{anyhow, Context, Result}; use bytes::Bytes; -#[cfg(feature = "cassandra")] -use cassandra_protocol::compression::Compression; use derivative::Derivative; use fnv::FnvBuildHasher; use nonzero_ext::nonzero; @@ -34,39 +30,6 @@ pub enum Metadata { OpenSearch, } -#[derive(PartialEq)] -pub enum ProtocolType { - #[cfg(feature = "cassandra")] - Cassandra { compression: Compression }, - #[cfg(feature = "redis")] - Redis, - #[cfg(feature = "kafka")] - Kafka { - request_header: Option, - }, - #[cfg(feature = "opensearch")] - OpenSearch, -} - -impl From<&ProtocolType> for CodecState { - fn from(value: &ProtocolType) -> Self { - match value { - #[cfg(feature = "cassandra")] - ProtocolType::Cassandra { compression } => Self::Cassandra { - compression: *compression, - }, - #[cfg(feature = "redis")] - ProtocolType::Redis => Self::Redis, - #[cfg(feature = "kafka")] - ProtocolType::Kafka { request_header } => Self::Kafka { - request_header: *request_header, - }, - #[cfg(feature = "opensearch")] - ProtocolType::OpenSearch => Self::OpenSearch, - } - } -} - pub type Messages = Vec; /// Unique identifier for the message assigned by shotover at creation time. @@ -121,16 +84,16 @@ impl Message { /// Providing just the bytes results in better performance when only the raw bytes are available. pub fn from_bytes_at_instant( bytes: Bytes, - protocol_type: ProtocolType, + codec_state: CodecState, received_from_source_or_sink_at: Option, ) -> Self { Message { inner: Some(MessageInner::RawBytes { bytes, - message_type: MessageType::from(&protocol_type), + message_type: MessageType::from(&codec_state), }), meta_timestamp: None, - codec_state: CodecState::from(&protocol_type), + codec_state, received_from_source_or_sink_at, id: rand::random(), request_id: None, @@ -173,8 +136,8 @@ impl Message { } /// Same as [`Message::from_bytes`] but `received_from_source_or_sink_at` is set to None. - pub fn from_bytes(bytes: Bytes, protocol_type: ProtocolType) -> Self { - Self::from_bytes_at_instant(bytes, protocol_type, None) + pub fn from_bytes(bytes: Bytes, codec_state: CodecState) -> Self { + Self::from_bytes_at_instant(bytes, codec_state, None) } /// Same as [`Message::from_frame`] but `received_from_source_or_sink_at` is set to None.