diff --git a/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml index 1ec8b686c..522807b19 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml @@ -10,17 +10,18 @@ sources: behavior: Ignore buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - Coalesce: - flush_when_buffered_message_count: 2000 - # Use an unreasonably large timeout here so that integration tests dont break on slow hardware or a performance regression - flush_when_millis_since_last_flush: 1000000000 - - QueryCounter: - name: "DR chain" - - RedisSinkCluster: - first_contact_points: [ "127.0.0.1:2120", "127.0.0.1:2121", "127.0.0.1:2122", "127.0.0.1:2123", "127.0.0.1:2124", "127.0.0.1:2125" ] - connect_timeout_ms: 3000 + Single: + - QueryTypeFilter: + DenyList: [Read] + - Coalesce: + flush_when_buffered_message_count: 2000 + # Use an unreasonably large timeout here so that integration tests dont break on slow hardware or a performance regression + flush_when_millis_since_last_flush: 1000000000 + - QueryCounter: + name: "DR chain" + - RedisSinkCluster: + first_contact_points: [ "127.0.0.1:2120", "127.0.0.1:2121", "127.0.0.1:2122", "127.0.0.1:2123", "127.0.0.1:2124", "127.0.0.1:2125" ] + connect_timeout_ms: 3000 - QueryCounter: name: "Main chain" - RedisSinkCluster: diff --git a/shotover-proxy/tests/test-configs/tee/fail.yaml b/shotover-proxy/tests/test-configs/tee/fail.yaml index b30b69c8f..072c792a2 100644 --- a/shotover-proxy/tests/test-configs/tee/fail.yaml +++ b/shotover-proxy/tests/test-configs/tee/fail.yaml @@ -9,9 +9,10 @@ sources: behavior: FailOnMismatch buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "42" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "42" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml index d735a235c..d272c4d0f 100644 --- a/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml @@ -9,9 +9,10 @@ sources: behavior: FailOnMismatch buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "41" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "41" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/ignore.yaml b/shotover-proxy/tests/test-configs/tee/ignore.yaml index 2fbecae5d..c03352e6e 100644 --- a/shotover-proxy/tests/test-configs/tee/ignore.yaml +++ b/shotover-proxy/tests/test-configs/tee/ignore.yaml @@ -9,9 +9,10 @@ sources: behavior: Ignore buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "42" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "42" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml index 8a0d859c7..9befb4e80 100644 --- a/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml @@ -9,9 +9,10 @@ sources: behavior: Ignore buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "41" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "41" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/log.yaml b/shotover-proxy/tests/test-configs/tee/log.yaml index 5aff0384d..4430d9134 100644 --- a/shotover-proxy/tests/test-configs/tee/log.yaml +++ b/shotover-proxy/tests/test-configs/tee/log.yaml @@ -9,9 +9,10 @@ sources: behavior: LogWarningOnMismatch buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "42" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "42" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml index 10305bb4a..63d02a6b6 100644 --- a/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml @@ -9,9 +9,10 @@ sources: behavior: LogWarningOnMismatch buffer_size: 10000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "41" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "41" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/subchain.yaml b/shotover-proxy/tests/test-configs/tee/subchain.yaml index 9bc3bb6cb..a5a85171e 100644 --- a/shotover-proxy/tests/test-configs/tee/subchain.yaml +++ b/shotover-proxy/tests/test-configs/tee/subchain.yaml @@ -15,7 +15,8 @@ sources: connect_timeout_ms: 3000 buffer_size: 10000 chain: - - DebugReturner: - Redis: "42" + Single: + - DebugReturner: + Redis: "42" - DebugReturner: Redis: "42" diff --git a/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml index dd02461ef..bd3fcf609 100644 --- a/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml @@ -15,9 +15,10 @@ sources: remote_address: "127.0.0.1:1111" connect_timeout_ms: 3000 chain: - - QueryTypeFilter: - DenyList: [Read] - - DebugReturner: - Redis: "41" + Single: + - QueryTypeFilter: + DenyList: [Read] + - DebugReturner: + Redis: "41" - DebugReturner: Redis: "42" diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index a239b78c1..560abb7fe 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -6,6 +6,7 @@ use anyhow::Result; use async_trait::async_trait; use metrics::{register_counter, Counter}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use tracing::{debug, trace, warn}; pub struct TeeBuilder { @@ -106,12 +107,32 @@ pub enum ConsistencyBehavior { SubchainOnMismatch(BufferedChain), } +pub enum ChainMode { + Single(BufferedChain), + Multi(HashMap), +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum ChainModeConfig { + Single(TransformChainConfig), + Multi(HashMap), +} + +impl ChainModeConfig { + async fn get_builder(&self) -> Result { + match self { + ChainModeConfig::Single(chain) => chain.get_builder("tee_chain".to_string()).await, + ChainModeConfig::Multi(_chains) => todo!(), + } + } +} + #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TeeConfig { pub behavior: Option, pub timeout_micros: Option, - pub chain: TransformChainConfig, + pub chain: ChainModeConfig, pub buffer_size: Option, } @@ -146,7 +167,8 @@ impl TransformConfig for TeeConfig { } None => ConsistencyBehaviorBuilder::Ignore, }; - let tee_chain = self.chain.get_builder("tee_chain".to_string()).await?; + + let tee_chain = self.chain.get_builder().await?; Ok(Box::new(TeeBuilder::new( tee_chain, @@ -253,12 +275,16 @@ mod tests { use super::*; use crate::transforms::null::NullSinkConfig; + fn null_sink_chain() -> ChainModeConfig { + ChainModeConfig::Single(TransformChainConfig(vec![Box::new(NullSinkConfig)])) + } + #[tokio::test] async fn test_validate_subchain_valid() { let config = TeeConfig { behavior: None, timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + chain: null_sink_chain(), buffer_size: None, }; @@ -272,7 +298,10 @@ mod tests { let config = TeeConfig { behavior: None, timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]), + chain: ChainModeConfig::Single(TransformChainConfig(vec![ + Box::new(NullSinkConfig), + Box::new(NullSinkConfig), + ])), buffer_size: None, }; @@ -289,7 +318,7 @@ mod tests { let config = TeeConfig { behavior: Some(ConsistencyBehaviorConfig::Ignore), timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + chain: null_sink_chain(), buffer_size: None, }; let transform = config.get_builder("".to_owned()).await.unwrap(); @@ -302,7 +331,7 @@ mod tests { let config = TeeConfig { behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch), timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + chain: null_sink_chain(), buffer_size: None, }; let transform = config.get_builder("".to_owned()).await.unwrap(); @@ -317,7 +346,7 @@ mod tests { TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]), )), timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + chain: null_sink_chain(), buffer_size: None, }; @@ -336,7 +365,7 @@ mod tests { TransformChainConfig(vec![Box::new(NullSinkConfig)]), )), timeout_micros: None, - chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]), + chain: null_sink_chain(), buffer_size: None, };