Skip to content

Commit

Permalink
Opensearch
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 30, 2024
1 parent 6864e6b commit 2ce3fe7
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 39 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ jobs:
- name: Install ubuntu packages
run: shotover-proxy/build/install_ubuntu_packages.sh
- name: Install cargo-hack
run: cargo install cargo-hack --version 0.5.8
uses: taiki-e/install-action@v2
with:
tool: [email protected]
- name: Ensure that dev tools compiles and has no warnings with no features enabled
run: cargo clippy --locked ${{ matrix.cargo_flags }} --all-targets -- -D warnings
- name: Ensure that shotover-proxy compiles and has no warnings under every possible combination of features
# some things to explicitly point out:
# * clippy also reports rustc warnings and errors
# * clippy --all-targets is not run so we only build the shotover_proxy executable without the tests/benches
run: cargo hack --feature-powerset clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ jobs:
- name: Install cargo-hack
uses: taiki-e/install-action@v2
with:
tool: [email protected].4
tool: [email protected].16
- name: Ensure `cargo fmt --all` was run
run: cargo fmt --all -- --check
- name: Ensure that all crates compile and have no warnings under every possible combination of features
# some things to explicitly point out:
# * clippy also reports rustc warnings and errors
# * clippy --all-targets causes clippy to run against tests and examples which it doesn't do by default.
run: cargo hack --feature-powerset clippy --all-targets --locked -- -D warnings
run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --all-targets --locked -- -D warnings
- name: Report disk usage
run: |
df -h
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ alpha-transforms = ["shotover/alpha-transforms"]
cassandra = ["shotover/cassandra"]
kafka = ["shotover/kafka"]
redis = ["shotover/redis"]
opensearch = []
opensearch = ["shotover/opensearch"]
cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"]
rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"]
default = ["cassandra", "kafka", "redis", "opensearch"]
Expand Down
1 change: 0 additions & 1 deletion shotover-proxy/benches/windsock/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use shotover::{config::topology::Topology as ShotoverTopology, sources::SourceCo
pub enum Shotover {
None,
Standard,
#[allow(dead_code)]
ForcedMessageParsed,
}

Expand Down
1 change: 0 additions & 1 deletion shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub fn main() {
)
.unwrap();
}
tracing::info!("aaa");

#[cfg(feature = "cassandra")]
let cassandra_benches = itertools::iproduct!(
Expand Down
1 change: 0 additions & 1 deletion shotover-proxy/benches/windsock/shotover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use test_helpers::shotover_process::ShotoverProcessBuilder;
use tokio_bin_process::{bin_path, BinProcess};
use uuid::Uuid;

#[allow(dead_code)]
pub async fn shotover_process_custom_topology(
topology_contents: &str,
profiler: &ProfilerRunner,
Expand Down
55 changes: 38 additions & 17 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,31 @@ cassandra = [
"dep:version-compare",
"dep:aws-sdk-kms",
"dep:aws-config",
"dep:base64",
"dep:serde_json",
"dep:halfbrown",
"dep:chacha20poly1305",
"dep:generic-array",
"dep:hex",
"dep:bincode",
]
kafka = ["dep:kafka-protocol", "dep:dashmap", "dep:xxhash-rust"]
redis = ["dep:redis-protocol", "dep:csv"]
default = ["cassandra", "redis", "kafka"]
kafka = [
"dep:kafka-protocol",
"dep:dashmap",
"dep:xxhash-rust",
"dep:string",
]
redis = [
"dep:redis-protocol",
"dep:csv",
"dep:crc16",
]
opensearch = [
"dep:atoi",
"dep:http",
"dep:httparse",
]
default = ["cassandra", "redis", "kafka", "opensearch"]

[dependencies]
atomic_enum = "0.2.0"
Expand All @@ -44,7 +65,7 @@ futures.workspace = true
tokio.workspace = true
tokio-util.workspace = true
csv = { workspace = true, optional = true }
hex.workspace = true
hex = { workspace = true, optional = true }
async-trait.workspace = true
typetag.workspace = true
tokio-tungstenite = "0.21.0"
Expand All @@ -58,15 +79,15 @@ backtrace-ext = "0.2"
# Parsers
cql3-parser = { version = "0.4.0", optional = true }
serde.workspace = true
serde_json.workspace = true
serde_json = { workspace = true, optional = true }
serde_yaml.workspace = true
bincode.workspace = true
bincode = { workspace = true, optional = true }
num = { version = "0.4.0", features = ["serde"] }
uuid = { workspace = true }
bigdecimal = { version = "0.4.0", features = ["serde"] }
base64 = "0.21.0"
httparse = "1.8.0"
http = "0.2.9"
base64 = { version = "0.21.0", optional = true }
httparse = { version = "1.8.0", optional = true }
http = { version = "0.2.9", optional = true }

#Observability
metrics = "0.21.0"
Expand All @@ -75,36 +96,36 @@ tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper.workspace = true
halfbrown = "0.2.1"
halfbrown = { version = "0.2.1", optional = true }

# Transform dependencies
redis-protocol = { workspace = true, optional = true }
cassandra-protocol = { workspace = true, optional = true }
crc16 = "0.4.0"
crc16 = { version = "0.4.0", optional = true }
ordered-float.workspace = true

#Crypto
aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
strum_macros = "0.26"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.8.0", optional = true }
rustls = { version = "0.22.0" }
tokio-rustls = "0.25"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.0.1"
string = "0.3.0"
string = { version = "0.3.0", optional = true }
xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = "2.0.0"
atoi = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
hex-literal.workspace = true

# TODO: Optionionally compiling benches is quite tricky with criterion, maybe it would be easier with divan?
# For now just set requried features
# TODO: Optionally compiling benches is quite tricky with criterion, maybe it would be easier with divan?
# For now just set required features
[[bench]]
name = "benches"
harness = false
Expand Down
2 changes: 2 additions & 0 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio_util::codec::{Decoder, Encoder};
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub enum CodecState {
request_header: Option<RequestHeader>,
},
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch,
}

Expand Down
18 changes: 15 additions & 3 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use cassandra::{CassandraFrame, CassandraOperation, CassandraResult};
use cassandra_protocol::compression::Compression;
#[cfg(feature = "kafka")]
use kafka::KafkaFrame;
#[cfg(feature = "opensearch")]
pub use opensearch::OpenSearchFrame;
#[cfg(feature = "redis")]
pub use redis_protocol::resp2::types::Frame as RedisFrame;
Expand All @@ -18,6 +19,7 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;
Expand All @@ -32,6 +34,7 @@ pub enum MessageType {
#[cfg(feature = "kafka")]
Kafka,
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch,
}

Expand All @@ -44,6 +47,7 @@ impl From<&ProtocolType> for MessageType {
ProtocolType::Redis => Self::Redis,
#[cfg(feature = "kafka")]
ProtocolType::Kafka { .. } => Self::Kafka,
#[cfg(feature = "opensearch")]
ProtocolType::OpenSearch => Self::OpenSearch,
}
}
Expand All @@ -63,6 +67,7 @@ impl Frame {
request_header: None,
},
Frame::Dummy => CodecState::Dummy,
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => CodecState::OpenSearch,
}
}
Expand All @@ -79,29 +84,31 @@ pub enum Frame {
/// Represents a message that must exist due to shotover's requirement that every request has a corresponding response.
/// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch(OpenSearchFrame),
}

impl Frame {
pub fn from_bytes(
bytes: Bytes,
message_type: MessageType,
_codec_state: CodecState,
codec_state: CodecState,
) -> Result<Self> {
match message_type {
#[cfg(feature = "cassandra")]
MessageType::Cassandra => {
CassandraFrame::from_bytes(bytes, _codec_state.as_cassandra()).map(Frame::Cassandra)
CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra)
}
#[cfg(feature = "redis")]
MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes)
.map(|x| Frame::Redis(x.unwrap().0))
.map_err(|e| anyhow!("{e:?}")),
#[cfg(feature = "kafka")]
MessageType::Kafka => {
KafkaFrame::from_bytes(bytes, _codec_state.as_kafka()).map(Frame::Kafka)
KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka)
}
MessageType::Dummy => Ok(Frame::Dummy),
#[cfg(feature = "opensearch")]
MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)),
}
}
Expand All @@ -115,6 +122,7 @@ impl Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => "Kafka",
Frame::Dummy => "Dummy",
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => "OpenSearch",
}
}
Expand All @@ -128,6 +136,7 @@ impl Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => MessageType::Kafka,
Frame::Dummy => MessageType::Dummy,
#[cfg(feature = "opensearch")]
Frame::OpenSearch(_) => MessageType::OpenSearch,
}
}
Expand Down Expand Up @@ -177,8 +186,10 @@ impl Frame {
}
}

#[cfg(feature = "opensearch")]
pub fn into_opensearch(self) -> Result<OpenSearchFrame> {
match self {
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => Ok(frame),
frame => Err(anyhow!(
"Expected opensearch frame but received {} frame",
Expand All @@ -198,6 +209,7 @@ impl Display for Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(frame) => write!(f, "Kafka {}", frame),
Frame::Dummy => write!(f, "Shotover internal dummy message"),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame),
}
}
Expand Down
14 changes: 12 additions & 2 deletions shotover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,20 @@
any(
not(feature = "cassandra"),
not(feature = "redis"),
not(feature = "kafka")
not(feature = "kafka"),
not(feature = "opensearch"),
),
allow(dead_code, unused_imports)
allow(dead_code, unused_imports, unused_variables)
)]
#[cfg(all(
not(feature = "cassandra"),
not(feature = "redis"),
not(feature = "kafka"),
not(feature = "opensearch"),
))]
compile_error!(
"At least one protocol feature must be enabled, e.g. `cassandra`, `redis`, `kafka` or `opensearch`"
);

pub mod codec;
pub mod config;
Expand Down
Loading

0 comments on commit 2ce3fe7

Please sign in to comment.