Skip to content

Commit

Permalink
Fail to startup when a source name is reused
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 28, 2024
1 parent 81ddc61 commit 478556f
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 15 deletions.
8 changes: 4 additions & 4 deletions shotover-proxy/tests/runner/runner_int_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ async fn test_shotover_shutdown_when_topology_invalid_topology_subchains() {
Caused by:
Topology errors
redis source:
redis chain:
redis1 source:
redis1 chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
redis source:
redis chain:
redis2 source:
redis2 chain:
TuneableConsistencyScatter:
a_chain_1 chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
---
sources:
- Cassandra:
name: "cassandra"
name: "cassandra1"
listen_addr: "127.0.0.1:9043"
chain:
- CassandraSinkSingle:
remote_address: "172.16.1.2:9042"
connect_timeout_ms: 3000

- Cassandra:
name: "cassandra"
name: "cassandra2"
listen_addr: "127.0.0.1:9044"
chain:
- CassandraPeersRewrite:
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/test-configs/invalid_subchains.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
---
sources:
- Redis:
name: "redis"
name: "redis1"
listen_addr: "127.0.0.1:6379"
chain:
- NullSink
- NullSink
- DebugPrinter
- Redis:
name: "redis"
name: "redis2"
listen_addr: "127.0.0.1:6379"
chain:
- DebugPrinter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
sources:
- Redis:
name: "redis"
name: "redis1"
listen_addr: "127.0.0.1:6379"
connection_limit: 3000000
chain:
Expand All @@ -10,7 +10,7 @@ sources:
- DebugReturner:
Redis: "42"
- Redis:
name: "redis"
name: "redis2"
listen_addr: "127.0.0.1:6380"
connection_limit: 3000000
chain:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/test-configs/tee/switch_chain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ sources:
- DebugReturner:
Redis: "a"
- Redis:
name: "redis-3"
name: "redis-2"
listen_addr: "127.0.0.1:6372"
connection_limit:
chain:
Expand Down
45 changes: 41 additions & 4 deletions shotover/src/config/topology.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::sources::{Source, SourceConfig};
use anyhow::{anyhow, Context, Result};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fmt::Write;
use tokio::sync::watch;
use tracing::info;

Expand Down Expand Up @@ -36,6 +38,21 @@ impl Topology {
let mut sources: Vec<Source> = Vec::new();

let mut topology_errors = String::new();

let mut duplicated_names = vec![];
for source in &self.sources {
let name = source.get_name();
if self.sources.iter().filter(|x| x.get_name() == name).count() > 1 {
duplicated_names.push(name);
}
}
for name in duplicated_names.iter().unique() {
writeln!(
topology_errors,
"Source name {name:?} occurred more than once. Make sure all source names are unique. The names will be used in logging and metrics."
)?;
}

for source in &self.sources {
match source.get_source(trigger_shutdown_rx.clone()).await {
Ok(source) => sources.push(source),
Expand Down Expand Up @@ -481,6 +498,26 @@ foo source:
assert_eq!(error, expected);
}

#[tokio::test]
async fn test_validate_repeated_source_names() {
let expected = r#"Topology errors
Source name "foo" occurred more than once. Make sure all source names are unique. The names will be used in logging and metrics.
"#;

let mut sources = create_source_from_chain(vec![Box::new(NullSinkConfig)]);
sources.extend(create_source_from_chain(vec![Box::new(NullSinkConfig)]));

let topology = Topology { sources };
let (_sender, trigger_shutdown_rx) = watch::channel::<bool>(false);
let error = topology
.run_chains(trigger_shutdown_rx)
.await
.unwrap_err()
.to_string();

assert_eq!(error, expected);
}

#[tokio::test]
async fn test_validate_chain_multiple_subchains() {
let (_sender, trigger_shutdown_rx) = watch::channel::<bool>(false);
Expand All @@ -495,13 +532,13 @@ foo source:
.to_string();

let expected = r#"Topology errors
redis source:
redis chain:
redis1 source:
redis1 chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
redis source:
redis chain:
redis2 source:
redis2 chain:
TuneableConsistencyScatter:
a_chain_1 chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Expand Down
13 changes: 13 additions & 0 deletions shotover/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,17 @@ impl SourceConfig {
SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await,
}
}

pub(crate) fn get_name(&self) -> &str {
match self {
#[cfg(feature = "cassandra")]
SourceConfig::Cassandra(c) => &c.name,
#[cfg(feature = "redis")]
SourceConfig::Redis(r) => &r.name,
#[cfg(feature = "kafka")]
SourceConfig::Kafka(r) => &r.name,
#[cfg(feature = "opensearch")]
SourceConfig::OpenSearch(r) => &r.name,
}
}
}

0 comments on commit 478556f

Please sign in to comment.