diff --git a/shotover-proxy/tests/runner/runner_int_tests.rs b/shotover-proxy/tests/runner/runner_int_tests.rs index cf59e4a93..fe6e44ffc 100644 --- a/shotover-proxy/tests/runner/runner_int_tests.rs +++ b/shotover-proxy/tests/runner/runner_int_tests.rs @@ -95,13 +95,13 @@ async fn test_shotover_shutdown_when_topology_invalid_topology_subchains() { Caused by: Topology errors - redis source: - redis chain: + redis1 source: + redis1 chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. - redis source: - redis chain: + redis2 source: + redis2 chain: TuneableConsistencyScatter: a_chain_1 chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. diff --git a/shotover-proxy/tests/test-configs/cassandra/peers-rewrite/topology.yaml b/shotover-proxy/tests/test-configs/cassandra/peers-rewrite/topology.yaml index af36d736f..9b1f3e7b3 100644 --- a/shotover-proxy/tests/test-configs/cassandra/peers-rewrite/topology.yaml +++ b/shotover-proxy/tests/test-configs/cassandra/peers-rewrite/topology.yaml @@ -1,7 +1,7 @@ --- sources: - Cassandra: - name: "cassandra" + name: "cassandra1" listen_addr: "127.0.0.1:9043" chain: - CassandraSinkSingle: @@ -9,7 +9,7 @@ sources: connect_timeout_ms: 3000 - Cassandra: - name: "cassandra" + name: "cassandra2" listen_addr: "127.0.0.1:9044" chain: - CassandraPeersRewrite: diff --git a/shotover-proxy/tests/test-configs/invalid_subchains.yaml b/shotover-proxy/tests/test-configs/invalid_subchains.yaml index cc99cd550..3e1551b7b 100644 --- a/shotover-proxy/tests/test-configs/invalid_subchains.yaml +++ b/shotover-proxy/tests/test-configs/invalid_subchains.yaml @@ -1,14 +1,14 @@ --- sources: - Redis: - name: "redis" + name: "redis1" listen_addr: "127.0.0.1:6379" chain: - NullSink - NullSink - DebugPrinter - Redis: - name: "redis" + name: "redis2" listen_addr: "127.0.0.1:6379" chain: - DebugPrinter diff --git a/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml b/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml index 981f69414..39131b4bb 100644 --- a/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml +++ b/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml @@ -1,7 +1,7 @@ --- sources: - Redis: - name: "redis" + name: "redis1" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 chain: @@ -10,7 +10,7 @@ sources: - DebugReturner: Redis: "42" - Redis: - name: "redis" + name: "redis2" listen_addr: "127.0.0.1:6380" connection_limit: 3000000 chain: diff --git a/shotover-proxy/tests/test-configs/tee/switch_chain.yaml b/shotover-proxy/tests/test-configs/tee/switch_chain.yaml index 9af3ed9e1..f6c435ff0 100644 --- a/shotover-proxy/tests/test-configs/tee/switch_chain.yaml +++ b/shotover-proxy/tests/test-configs/tee/switch_chain.yaml @@ -15,7 +15,7 @@ sources: - DebugReturner: Redis: "a" - Redis: - name: "redis-3" + name: "redis-2" listen_addr: "127.0.0.1:6372" connection_limit: chain: diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index 374677ea5..bdc3dd398 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -1,6 +1,8 @@ use crate::sources::{Source, SourceConfig}; use anyhow::{anyhow, Context, Result}; +use itertools::Itertools; use serde::{Deserialize, Serialize}; +use std::fmt::Write; use tokio::sync::watch; use tracing::info; @@ -36,6 +38,21 @@ impl Topology { let mut sources: Vec = Vec::new(); let mut topology_errors = String::new(); + + let mut duplicated_names = vec![]; + for source in &self.sources { + let name = source.get_name(); + if self.sources.iter().filter(|x| x.get_name() == name).count() > 1 { + duplicated_names.push(name); + } + } + for name in duplicated_names.iter().unique() { + writeln!( + topology_errors, + "Source name {name:?} occurred more than once. Make sure all source names are unique. The names will be used in logging and metrics." + )?; + } + for source in &self.sources { match source.get_source(trigger_shutdown_rx.clone()).await { Ok(source) => sources.push(source), @@ -481,6 +498,26 @@ foo source: assert_eq!(error, expected); } + #[tokio::test] + async fn test_validate_repeated_source_names() { + let expected = r#"Topology errors +Source name "foo" occurred more than once. Make sure all source names are unique. The names will be used in logging and metrics. +"#; + + let mut sources = create_source_from_chain(vec![Box::new(NullSinkConfig)]); + sources.extend(create_source_from_chain(vec![Box::new(NullSinkConfig)])); + + let topology = Topology { sources }; + let (_sender, trigger_shutdown_rx) = watch::channel::(false); + let error = topology + .run_chains(trigger_shutdown_rx) + .await + .unwrap_err() + .to_string(); + + assert_eq!(error, expected); + } + #[tokio::test] async fn test_validate_chain_multiple_subchains() { let (_sender, trigger_shutdown_rx) = watch::channel::(false); @@ -495,13 +532,13 @@ foo source: .to_string(); let expected = r#"Topology errors -redis source: - redis chain: +redis1 source: + redis1 chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. -redis source: - redis chain: +redis2 source: + redis2 chain: TuneableConsistencyScatter: a_chain_1 chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index 3a32c1a33..646577182 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -85,4 +85,17 @@ impl SourceConfig { SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await, } } + + pub(crate) fn get_name(&self) -> &str { + match self { + #[cfg(feature = "cassandra")] + SourceConfig::Cassandra(c) => &c.name, + #[cfg(feature = "redis")] + SourceConfig::Redis(r) => &r.name, + #[cfg(feature = "kafka")] + SourceConfig::Kafka(r) => &r.name, + #[cfg(feature = "opensearch")] + SourceConfig::OpenSearch(r) => &r.name, + } + } }