diff --git a/Cargo.lock b/Cargo.lock index dd2076920..be718f93e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4313,6 +4313,7 @@ dependencies = [ "csv", "dashmap", "derivative", + "fnv", "futures", "generic-array", "governor", diff --git a/custom-transforms-example/src/redis_get_rewrite.rs b/custom-transforms-example/src/redis_get_rewrite.rs index 0a0650eb6..7d31f4260 100644 --- a/custom-transforms-example/src/redis_get_rewrite.rs +++ b/custom-transforms-example/src/redis_get_rewrite.rs @@ -2,9 +2,8 @@ use anyhow::Result; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use shotover::frame::{Frame, RedisFrame}; -use shotover::message::{MessageId, Messages}; +use shotover::message::{MessageIdSet, Messages}; use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; -use std::collections::HashSet; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] @@ -31,7 +30,7 @@ pub struct RedisGetRewriteBuilder { impl TransformBuilder for RedisGetRewriteBuilder { fn build(&self) -> Box { Box::new(RedisGetRewrite { - get_requests: HashSet::new(), + get_requests: MessageIdSet::default(), result: self.result.clone(), }) } @@ -42,7 +41,7 @@ impl TransformBuilder for RedisGetRewriteBuilder { } pub struct RedisGetRewrite { - get_requests: HashSet, + get_requests: MessageIdSet, result: String, } diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index e18b12df1..b0e714bdb 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -119,6 +119,7 @@ string = { version = "0.3.0", optional = true } xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true } dashmap = { version = "5.4.0", optional = true } atoi = { version = "2.0.0", optional = true } +fnv = "1.0.7" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index aacc87b6b..22d3a2070 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -13,11 +13,16 @@ use bytes::Bytes; #[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; use derivative::Derivative; +use fnv::FnvBuildHasher; use nonzero_ext::nonzero; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroU32; use std::time::Instant; +pub type MessageIdMap = HashMap; +pub type MessageIdSet = HashSet; + pub enum Metadata { #[cfg(feature = "cassandra")] Cassandra(CassandraMetadata), diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 2faea8926..9de065400 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -2,7 +2,7 @@ use self::node_pool::{get_accessible_owned_connection, NodePoolBuilder, Prepared use self::rewrite::MessageRewriter; use crate::frame::cassandra::{CassandraMetadata, Tracing}; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; -use crate::message::{Message, Messages, Metadata}; +use crate::message::{Message, MessageIdMap, Messages, Metadata}; use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; @@ -137,7 +137,7 @@ impl CassandraSinkClusterBuilder { shotover_peers, local_shotover_node, to_rewrite: vec![], - prepare_requests_to_destination_nodes: HashMap::new(), + prepare_requests_to_destination_nodes: MessageIdMap::default(), }; Self { diff --git a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs index bd10fdd16..ab893ed72 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs @@ -2,7 +2,7 @@ use super::node::ConnectionFactory; use super::node_pool::NodePool; use super::ShotoverNode; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; -use crate::message::{Message, Messages}; +use crate::message::{Message, MessageIdMap, Messages}; use crate::{ frame::{ cassandra::{parse_statement_single, Tracing}, @@ -20,7 +20,6 @@ use cql3_parser::common::{ use cql3_parser::select::{Select, SelectElement}; use futures::future::try_join_all; use itertools::Itertools; -use std::collections::HashMap; use std::fmt::Write; use std::net::{IpAddr, Ipv4Addr}; use uuid::Uuid; @@ -57,7 +56,7 @@ pub struct MessageRewriter { pub shotover_peers: Vec, pub local_shotover_node: ShotoverNode, pub to_rewrite: Vec, - pub prepare_requests_to_destination_nodes: HashMap, + pub prepare_requests_to_destination_nodes: MessageIdMap, } impl MessageRewriter { diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index c7ec91658..51614cb79 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -1,11 +1,11 @@ //! Various types required for defining a transform -use crate::message::{Message, MessageId, Messages}; +use self::chain::TransformAndMetrics; +use crate::message::{Message, MessageIdMap, Messages}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use core::fmt; use futures::Future; -use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::iter::Rev; use std::net::SocketAddr; @@ -14,8 +14,6 @@ use std::slice::IterMut; use tokio::sync::mpsc; use tokio::time::Instant; -use self::chain::TransformAndMetrics; - #[cfg(feature = "cassandra")] pub mod cassandra; pub mod chain; @@ -188,7 +186,7 @@ impl<'a> Wrapper<'a> { result } - pub fn clone_requests_into_hashmap(&self, destination: &mut HashMap) { + pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap) { for request in &self.requests { destination.insert(request.id(), request.clone()); } diff --git a/shotover/src/transforms/protect/mod.rs b/shotover/src/transforms/protect/mod.rs index 36051ac36..2f81ade14 100644 --- a/shotover/src/transforms/protect/mod.rs +++ b/shotover/src/transforms/protect/mod.rs @@ -1,7 +1,7 @@ use crate::frame::{ value::GenericValue, CassandraFrame, CassandraOperation, CassandraResult, Frame, }; -use crate::message::{Message, MessageId, Messages}; +use crate::message::{Message, MessageIdMap, Messages}; use crate::transforms::protect::key_management::KeyManager; pub use crate::transforms::protect::key_management::KeyManagerConfig; use crate::transforms::{Transform, TransformBuilder, Wrapper}; @@ -56,7 +56,7 @@ impl TransformConfig for ProtectConfig { .collect(), key_source: self.key_manager.build().await?, key_id: "XXXXXXX".to_string(), - requests: HashMap::new(), + requests: MessageIdMap::default(), })) } } @@ -69,7 +69,7 @@ pub struct Protect { // TODO this should be a function to create key_ids based on "something", e.g. primary key // for the moment this is just a string key_id: String, - requests: HashMap, + requests: MessageIdMap, } impl TransformBuilder for Protect { diff --git a/shotover/src/transforms/redis/cluster_ports_rewrite.rs b/shotover/src/transforms/redis/cluster_ports_rewrite.rs index 0396d6a35..f740ee96f 100644 --- a/shotover/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/redis/cluster_ports_rewrite.rs @@ -1,9 +1,6 @@ -use std::collections::HashMap; - use crate::frame::Frame; use crate::frame::RedisFrame; -use crate::message::MessageId; -use crate::message::Messages; +use crate::message::{MessageIdMap, Messages}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; @@ -39,7 +36,7 @@ impl TransformBuilder for RedisClusterPortsRewrite { #[derive(Clone)] pub struct RedisClusterPortsRewrite { new_port: u16, - request_type: HashMap, + request_type: MessageIdMap, } #[derive(Clone)] @@ -52,7 +49,7 @@ impl RedisClusterPortsRewrite { pub fn new(new_port: u16) -> Self { RedisClusterPortsRewrite { new_port, - request_type: HashMap::new(), + request_type: MessageIdMap::default(), } } } diff --git a/windsock/readme.md b/windsock/readme.md index 70f0d3db9..8d072d031 100644 --- a/windsock/readme.md +++ b/windsock/readme.md @@ -144,7 +144,7 @@ and graphs: TODO ### Just run every bench ```shell -> cargo windsock +> cargo windsock run-local ``` ### Run benches with matching tags and view all the results in one table