Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 4, 2023
1 parent 22f27be commit b6232f5
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 49 deletions.
23 changes: 12 additions & 11 deletions shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ sources:
behavior: Ignore
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- Coalesce:
flush_when_buffered_message_count: 2000
# Use an unreasonably large timeout here so that integration tests dont break on slow hardware or a performance regression
flush_when_millis_since_last_flush: 1000000000
- QueryCounter:
name: "DR chain"
- RedisSinkCluster:
first_contact_points: [ "127.0.0.1:2120", "127.0.0.1:2121", "127.0.0.1:2122", "127.0.0.1:2123", "127.0.0.1:2124", "127.0.0.1:2125" ]
connect_timeout_ms: 3000
Single:
- QueryTypeFilter:
DenyList: [Read]
- Coalesce:
flush_when_buffered_message_count: 2000
# Use an unreasonably large timeout here so that integration tests dont break on slow hardware or a performance regression
flush_when_millis_since_last_flush: 1000000000
- QueryCounter:
name: "DR chain"
- RedisSinkCluster:
first_contact_points: [ "127.0.0.1:2120", "127.0.0.1:2121", "127.0.0.1:2122", "127.0.0.1:2123", "127.0.0.1:2124", "127.0.0.1:2125" ]
connect_timeout_ms: 3000
- QueryCounter:
name: "Main chain"
- RedisSinkCluster:
Expand Down
9 changes: 5 additions & 4 deletions shotover-proxy/tests/test-configs/tee/fail.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: FailOnMismatch
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
- DebugReturner:
Redis: "42"
9 changes: 5 additions & 4 deletions shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: FailOnMismatch
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
- DebugReturner:
Redis: "42"
9 changes: 5 additions & 4 deletions shotover-proxy/tests/test-configs/tee/ignore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: Ignore
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
- DebugReturner:
Redis: "42"
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: Ignore
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
- DebugReturner:
Redis: "42"
9 changes: 5 additions & 4 deletions shotover-proxy/tests/test-configs/tee/log.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: LogWarningOnMismatch
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "42"
- DebugReturner:
Redis: "42"
9 changes: 5 additions & 4 deletions shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ sources:
behavior: LogWarningOnMismatch
buffer_size: 10000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
- DebugReturner:
Redis: "42"
5 changes: 3 additions & 2 deletions shotover-proxy/tests/test-configs/tee/subchain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ sources:
connect_timeout_ms: 3000
buffer_size: 10000
chain:
- DebugReturner:
Redis: "42"
Single:
- DebugReturner:
Redis: "42"
- DebugReturner:
Redis: "42"
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ sources:
remote_address: "127.0.0.1:1111"
connect_timeout_ms: 3000
chain:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
Single:
- QueryTypeFilter:
DenyList: [Read]
- DebugReturner:
Redis: "41"
- DebugReturner:
Redis: "42"
45 changes: 37 additions & 8 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::Result;
use async_trait::async_trait;
use metrics::{register_counter, Counter};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, trace, warn};

pub struct TeeBuilder {
Expand Down Expand Up @@ -106,12 +107,32 @@ pub enum ConsistencyBehavior {
SubchainOnMismatch(BufferedChain),
}

pub enum ChainMode {
Single(BufferedChain),
Multi(HashMap<String, BufferedChain>),
}

#[derive(Serialize, Deserialize, Debug)]
pub enum ChainModeConfig {
Single(TransformChainConfig),
Multi(HashMap<String, TransformChainConfig>),
}

impl ChainModeConfig {
async fn get_builder(&self) -> Result<TransformChainBuilder> {
match self {
ChainModeConfig::Single(chain) => chain.get_builder("tee_chain".to_string()).await,
ChainModeConfig::Multi(_chains) => todo!(),
}
}
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TeeConfig {
pub behavior: Option<ConsistencyBehaviorConfig>,
pub timeout_micros: Option<u64>,
pub chain: TransformChainConfig,
pub chain: ChainModeConfig,
pub buffer_size: Option<usize>,
}

Expand Down Expand Up @@ -146,7 +167,8 @@ impl TransformConfig for TeeConfig {
}
None => ConsistencyBehaviorBuilder::Ignore,
};
let tee_chain = self.chain.get_builder("tee_chain".to_string()).await?;

let tee_chain = self.chain.get_builder().await?;

Ok(Box::new(TeeBuilder::new(
tee_chain,
Expand Down Expand Up @@ -253,12 +275,16 @@ mod tests {
use super::*;
use crate::transforms::null::NullSinkConfig;

fn null_sink_chain() -> ChainModeConfig {
ChainModeConfig::Single(TransformChainConfig(vec![Box::new(NullSinkConfig)]))
}

#[tokio::test]
async fn test_validate_subchain_valid() {
let config = TeeConfig {
behavior: None,
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]),
chain: null_sink_chain(),
buffer_size: None,
};

Expand All @@ -272,7 +298,10 @@ mod tests {
let config = TeeConfig {
behavior: None,
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]),
chain: ChainModeConfig::Single(TransformChainConfig(vec![
Box::new(NullSinkConfig),
Box::new(NullSinkConfig),
])),
buffer_size: None,
};

Expand All @@ -289,7 +318,7 @@ mod tests {
let config = TeeConfig {
behavior: Some(ConsistencyBehaviorConfig::Ignore),
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]),
chain: null_sink_chain(),
buffer_size: None,
};
let transform = config.get_builder("".to_owned()).await.unwrap();
Expand All @@ -302,7 +331,7 @@ mod tests {
let config = TeeConfig {
behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch),
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]),
chain: null_sink_chain(),
buffer_size: None,
};
let transform = config.get_builder("".to_owned()).await.unwrap();
Expand All @@ -317,7 +346,7 @@ mod tests {
TransformChainConfig(vec![Box::new(NullSinkConfig), Box::new(NullSinkConfig)]),
)),
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]),
chain: null_sink_chain(),
buffer_size: None,
};

Expand All @@ -336,7 +365,7 @@ mod tests {
TransformChainConfig(vec![Box::new(NullSinkConfig)]),
)),
timeout_micros: None,
chain: TransformChainConfig(vec![Box::new(NullSinkConfig)]),
chain: null_sink_chain(),
buffer_size: None,
};

Expand Down

0 comments on commit b6232f5

Please sign in to comment.