Skip to content

Commit

Permalink
Opensearch
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 30, 2024
1 parent 6864e6b commit 7c7c421
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 30 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ jobs:
- name: Install ubuntu packages
run: shotover-proxy/build/install_ubuntu_packages.sh
- name: Install cargo-hack
run: cargo install cargo-hack --version 0.5.8
run: cargo install cargo-hack --version 0.6.16
- name: Ensure that dev tools compiles and has no warnings with no features enabled
run: cargo clippy --locked ${{ matrix.cargo_flags }} --all-targets -- -D warnings
- name: Ensure that shotover-proxy compiles and has no warnings under every possible combination of features
# some things to explicitly point out:
# * clippy also reports rustc warnings and errors
# * clippy --all-targets is not run so we only build the shotover_proxy executable without the tests/benches
run: cargo hack --feature-powerset clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ alpha-transforms = ["shotover/alpha-transforms"]
cassandra = ["shotover/cassandra"]
kafka = ["shotover/kafka"]
redis = ["shotover/redis"]
opensearch = []
opensearch = ["shotover/opensearch"]
cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"]
rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"]
default = ["cassandra", "kafka", "redis", "opensearch"]
Expand Down
41 changes: 28 additions & 13 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,25 @@ cassandra = [
"dep:version-compare",
"dep:aws-sdk-kms",
"dep:aws-config",
"dep:base64",
"dep:serde_json",
"dep:halfbrown",
"dep:chacha20poly1305",
"dep:generic-array",
]
kafka = ["dep:kafka-protocol", "dep:dashmap", "dep:xxhash-rust"]
redis = ["dep:redis-protocol", "dep:csv"]
default = ["cassandra", "redis", "kafka"]
kafka = [
"dep:kafka-protocol",
"dep:dashmap",
"dep:xxhash-rust",
"dep:string"
]
redis = [
"dep:redis-protocol",
"dep:csv",
"dep:crc16"
]
opensearch = ["dep:atoi"]
default = ["cassandra", "redis", "kafka", "opensearch"]

[dependencies]
atomic_enum = "0.2.0"
Expand Down Expand Up @@ -58,13 +73,13 @@ backtrace-ext = "0.2"
# Parsers
cql3-parser = { version = "0.4.0", optional = true }
serde.workspace = true
serde_json.workspace = true
serde_json = { workspace = true, optional = true }
serde_yaml.workspace = true
bincode.workspace = true
num = { version = "0.4.0", features = ["serde"] }
uuid = { workspace = true }
bigdecimal = { version = "0.4.0", features = ["serde"] }
base64 = "0.21.0"
base64 = { version = "0.21.0", optional = true }
httparse = "1.8.0"
http = "0.2.9"

Expand All @@ -75,36 +90,36 @@ tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper.workspace = true
halfbrown = "0.2.1"
halfbrown = { version = "0.2.1", optional = true }

# Transform dependencies
redis-protocol = { workspace = true, optional = true }
cassandra-protocol = { workspace = true, optional = true }
crc16 = "0.4.0"
crc16 = { version = "0.4.0", optional = true }
ordered-float.workspace = true

#Crypto
aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
strum_macros = "0.26"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.8.0", optional = true }
rustls = { version = "0.22.0" }
tokio-rustls = "0.25"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.0.1"
string = "0.3.0"
string = { version = "0.3.0", optional = true }
xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = "2.0.0"
atoi = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
hex-literal.workspace = true

# TODO: Optionionally compiling benches is quite tricky with criterion, maybe it would be easier with divan?
# For now just set requried features
# TODO: Optionally compiling benches is quite tricky with criterion, maybe it would be easier with divan?
# For now just set required features
[[bench]]
name = "benches"
harness = false
Expand Down
2 changes: 2 additions & 0 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio_util::codec::{Decoder, Encoder};
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub enum CodecState {
request_header: Option<RequestHeader>,
},
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch,
}

Expand Down
15 changes: 12 additions & 3 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum MessageType {
#[cfg(feature = "kafka")]
Kafka,
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch,
}

Expand All @@ -44,6 +45,7 @@ impl From<&ProtocolType> for MessageType {
ProtocolType::Redis => Self::Redis,
#[cfg(feature = "kafka")]
ProtocolType::Kafka { .. } => Self::Kafka,
#[cfg(feature = "opensearch")]
ProtocolType::OpenSearch => Self::OpenSearch,
}
}
Expand All @@ -63,6 +65,7 @@ impl Frame {
request_header: None,
},
Frame::Dummy => CodecState::Dummy,
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => CodecState::OpenSearch,
}
}
Expand All @@ -79,29 +82,31 @@ pub enum Frame {
/// Represents a message that has must exist due to shotovers requirement that every request has a corresponding response.
/// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch(OpenSearchFrame),
}

impl Frame {
pub fn from_bytes(
bytes: Bytes,
message_type: MessageType,
_codec_state: CodecState,
codec_state: CodecState,
) -> Result<Self> {
match message_type {
#[cfg(feature = "cassandra")]
MessageType::Cassandra => {
CassandraFrame::from_bytes(bytes, _codec_state.as_cassandra()).map(Frame::Cassandra)
CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra)
}
#[cfg(feature = "redis")]
MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes)
.map(|x| Frame::Redis(x.unwrap().0))
.map_err(|e| anyhow!("{e:?}")),
#[cfg(feature = "kafka")]
MessageType::Kafka => {
KafkaFrame::from_bytes(bytes, _codec_state.as_kafka()).map(Frame::Kafka)
KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka)
}
MessageType::Dummy => Ok(Frame::Dummy),
#[cfg(feature = "opensearch")]
MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)),
}
}
Expand All @@ -115,6 +120,7 @@ impl Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => "Kafka",
Frame::Dummy => "Dummy",
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => "OpenSearch",
}
}
Expand All @@ -128,6 +134,7 @@ impl Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => MessageType::Kafka,
Frame::Dummy => MessageType::Dummy,
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => MessageType::OpenSearch,
}
}
Expand Down Expand Up @@ -179,6 +186,7 @@ impl Frame {

pub fn into_opensearch(self) -> Result<OpenSearchFrame> {
match self {
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => Ok(frame),
frame => Err(anyhow!(
"Expected opensearch frame but received {} frame",
Expand All @@ -198,6 +206,7 @@ impl Display for Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(frame) => write!(f, "Kafka {}", frame),
Frame::Dummy => write!(f, "Shotover internal dummy message"),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame),
}
}
Expand Down
14 changes: 12 additions & 2 deletions shotover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,20 @@
any(
not(feature = "cassandra"),
not(feature = "redis"),
not(feature = "kafka")
not(feature = "kafka"),
not(feature = "opensearch"),
),
allow(dead_code, unused_imports)
allow(dead_code, unused_imports, unused_variables)
)]
#[cfg(all(
not(feature = "cassandra"),
not(feature = "redis"),
not(feature = "kafka"),
not(feature = "opensearch"),
))]
compile_error!(
"At least one protocol feature must be enabled, e.g. `cassandra`, `redis`, `kafka` or `opensearch`"
);

pub mod codec;
pub mod config;
Expand Down
15 changes: 12 additions & 3 deletions shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ pub enum Metadata {
Redis,
#[cfg(feature = "kafka")]
Kafka,
#[cfg(feature = "opensearch")]
OpenSearch,
}

#[derive(PartialEq)]
pub enum ProtocolType {
#[cfg(feature = "cassandra")]
Cassandra {
compression: Compression,
},
Cassandra { compression: Compression },
#[cfg(feature = "redis")]
Redis,
#[cfg(feature = "kafka")]
Kafka {
request_header: Option<RequestHeader>,
},
#[cfg(feature = "opensearch")]
OpenSearch,
}

Expand All @@ -67,6 +67,7 @@ impl From<&ProtocolType> for CodecState {
ProtocolType::Kafka { request_header } => Self::Kafka {
request_header: *request_header,
},
#[cfg(feature = "opensearch")]
ProtocolType::OpenSearch => Self::OpenSearch,
}
}
Expand Down Expand Up @@ -275,6 +276,7 @@ impl Message {
#[cfg(feature = "kafka")]
MessageType::Kafka => todo!(),
MessageType::Dummy => nonzero!(1u32),
#[cfg(feature = "opensearch")]
MessageType::OpenSearch => todo!(),
},
MessageInner::Modified { frame } | MessageInner::Parsed { frame, .. } => match frame {
Expand All @@ -285,6 +287,7 @@ impl Message {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => todo!(),
Frame::Dummy => nonzero!(1u32),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => todo!(),
},
})
Expand Down Expand Up @@ -312,6 +315,7 @@ impl Message {
#[cfg(feature = "kafka")]
Some(Frame::Kafka(_)) => todo!(),
Some(Frame::Dummy) => todo!(),
#[cfg(feature = "opensearch")]
Some(Frame::OpenSearch(_)) => todo!(),
None => QueryType::ReadWrite,
}
Expand Down Expand Up @@ -350,6 +354,7 @@ impl Message {
Metadata::Kafka => return Err(anyhow!(_error).context(
"A generic error cannot be formed because the kafka protocol does not support it",
)),
#[cfg(feature = "opensearch")]
Metadata::OpenSearch => unimplemented!()
}))
}
Expand All @@ -372,6 +377,7 @@ impl Message {
#[cfg(feature = "kafka")]
MessageType::Kafka => Ok(Metadata::Kafka),
MessageType::Dummy => Err(anyhow!("Dummy has no metadata")),
#[cfg(feature = "opensearch")]
MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")),
},
MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame {
Expand All @@ -382,6 +388,7 @@ impl Message {
#[cfg(feature = "redis")]
Frame::Redis(_) => Ok(Metadata::Redis),
Frame::Dummy => Err(anyhow!("dummy has no metadata")),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")),
},
}
Expand Down Expand Up @@ -412,6 +419,7 @@ impl Message {
Metadata::Redis => unimplemented!(),
#[cfg(feature = "kafka")]
Metadata::Kafka => unimplemented!(),
#[cfg(feature = "opensearch")]
Metadata::OpenSearch => unimplemented!(),
},
// reachable with feature = cassandra
Expand Down Expand Up @@ -451,6 +459,7 @@ impl Message {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => None,
Frame::Dummy => None,
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => None,
}
}
Expand Down
6 changes: 6 additions & 0 deletions shotover/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::sources::cassandra::{CassandraConfig, CassandraSource};
#[cfg(feature = "kafka")]
use crate::sources::kafka::{KafkaConfig, KafkaSource};
#[cfg(feature = "opensearch")]
use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource};
#[cfg(feature = "redis")]
use crate::sources::redis::{RedisConfig, RedisSource};
Expand All @@ -16,6 +17,7 @@ use tokio::task::JoinHandle;
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;
Expand All @@ -35,6 +37,7 @@ pub enum Source {
Redis(RedisSource),
#[cfg(feature = "kafka")]
Kafka(KafkaSource),
#[cfg(feature = "opensearch")]
OpenSearch(OpenSearchSource),
}

Expand All @@ -47,6 +50,7 @@ impl Source {
Source::Redis(r) => r.join_handle,
#[cfg(feature = "kafka")]
Source::Kafka(r) => r.join_handle,
#[cfg(feature = "opensearch")]
Source::OpenSearch(o) => o.join_handle,
}
}
Expand All @@ -61,6 +65,7 @@ pub enum SourceConfig {
Redis(RedisConfig),
#[cfg(feature = "kafka")]
Kafka(KafkaConfig),
#[cfg(feature = "opensearch")]
OpenSearch(OpenSearchConfig),
}

Expand All @@ -76,6 +81,7 @@ impl SourceConfig {
SourceConfig::Redis(r) => r.get_source(trigger_shutdown_rx).await,
#[cfg(feature = "kafka")]
SourceConfig::Kafka(r) => r.get_source(trigger_shutdown_rx).await,
#[cfg(feature = "opensearch")]
SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await,
}
}
Expand Down
Loading

0 comments on commit 7c7c421

Please sign in to comment.