diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs b/shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs index 6dc1ad7ef..c44ae8ec6 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs @@ -162,30 +162,35 @@ pub async fn test(connection: &CassandraConnection) { assert_eq!(out_of_rack_request, "0"); } -pub async fn test_topology_task(ca_path: Option<&str>) { +pub async fn test_topology_task( + ca_path: Option<&str>, + expected_nodes: Vec<SocketAddr>, + expected_racks: Vec<&'static str>, + token_count: usize, +) { let nodes = run_topology_task(ca_path, None).await; - assert_eq!(nodes.len(), 3); - let mut possible_addresses: Vec<SocketAddr> = vec![ - "172.16.1.2:9042".parse().unwrap(), - "172.16.1.3:9042".parse().unwrap(), - "172.16.1.4:9042".parse().unwrap(), - ]; - let mut possible_racks: Vec<&str> = vec!["rack1", "rack2", "rack3"]; + assert_eq!(nodes.len(), expected_nodes.len()); + let mut possible_addresses = expected_nodes; + let mut possible_racks = expected_racks; for node in &nodes { let address_index = possible_addresses .iter() .position(|x| *x == node.address) - .expect("Node did not contain a unique expected address"); + .unwrap_or_else(|| { + panic!("Node did not contain a unique expected address. nodes: {nodes:#?}") + }); possible_addresses.remove(address_index); let rack_index = possible_racks .iter() .position(|x| *x == node.rack) - .expect("Node did not contain a unique expected rack"); + .unwrap_or_else(|| { + panic!("Node did not contain a unique expected rack. 
nodes: {nodes:#?}") + }); possible_racks.remove(rack_index); - assert_eq!(node.tokens.len(), 128); + assert_eq!(node.tokens.len(), token_count); assert!(node.is_up); } } diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index 854bb25c5..364c56ae9 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -8,6 +8,7 @@ use futures::future::join_all; use futures::Future; use rstest::rstest; use rstest_reuse::{self, *}; +use std::net::SocketAddr; #[cfg(feature = "cassandra-cpp-driver-tests")] use test_helpers::connection::cassandra::CassandraDriver::Datastax; use test_helpers::connection::cassandra::Compression; @@ -59,6 +60,26 @@ where timestamp::test(&connection).await; } +async fn standard_test_suite_default_config_compatible<Fut>( + connection_creator: impl Fn() -> Fut, + driver: CassandraDriver, +) where + Fut: Future<Output = CassandraConnection>, +{ + // reuse a single connection a bunch to save time recreating connections + let connection = connection_creator().await; + + keyspace::test(&connection).await; + table::test(&connection).await; + udt::test(&connection).await; + native_types::test(&connection).await; + collections::test(&connection, driver).await; + prepared_statements_simple::test(&connection, connection_creator).await; + prepared_statements_all::test(&connection).await; + batch_statements::test(&connection).await; + timestamp::test(&connection).await; +} + #[template] #[rstest] #[case::cdrs(CdrsTokio)] @@ -234,7 +255,7 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) { #[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))] #[case::scylla(Scylla)] #[tokio::test(flavor = "multi_thread")] -async fn cluster_multi_rack(#[case] driver: CassandraDriver) { +async fn cluster_multi_rack_1_per_rack(#[case] driver: CassandraDriver) { let _compose = docker_compose("tests/test-configs/cassandra/cluster-multi-rack/docker-compose.yaml"); 
@@ -275,7 +296,77 @@ async fn cluster_multi_rack(#[case] driver: CassandraDriver) { shotover_rack3.shutdown_and_then_consume_events(&[]).await; } - cluster::multi_rack::test_topology_task(None).await; + let expected_nodes: Vec<SocketAddr> = vec![ + "172.16.1.2:9042".parse().unwrap(), + "172.16.1.3:9042".parse().unwrap(), + "172.16.1.4:9042".parse().unwrap(), + ]; + let expected_racks = vec!["rack1", "rack2", "rack3"]; + cluster::multi_rack::test_topology_task(None, expected_nodes, expected_racks, 128).await; +} + +// This is very slow, only test with one driver +#[rstest] +#[case::scylla(Scylla)] +#[tokio::test(flavor = "multi_thread")] +async fn cluster_multi_rack_3_per_rack(#[case] driver: CassandraDriver) { + let _compose = docker_compose( + "tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/docker-compose.yaml", + ); + + { + let shotover_rack1 = shotover_process( + "tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack1.yaml", + ) + .with_log_name("Rack1") + .with_observability_port(9001) + .start() + .await; + let shotover_rack2 = shotover_process( + "tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack2.yaml", + ) + .with_log_name("Rack2") + .with_observability_port(9002) + .start() + .await; + let shotover_rack3 = shotover_process( + "tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack3.yaml", + ) + .with_log_name("Rack3") + .with_observability_port(9003) + .start() + .await; + + let connection = || async { + let mut connection = CassandraConnectionBuilder::new("127.0.0.1", 9042, driver) + .build() + .await; + connection + .enable_schema_awaiter("172.16.1.2:9042", None) + .await; + connection + }; + standard_test_suite_default_config_compatible(&connection, driver).await; + + shotover_rack1.shutdown_and_then_consume_events(&[]).await; + shotover_rack2.shutdown_and_then_consume_events(&[]).await; + shotover_rack3.shutdown_and_then_consume_events(&[]).await; + } + let expected_nodes: Vec<SocketAddr> = vec![ + 
"172.16.1.2:9042".parse().unwrap(), + "172.16.1.3:9042".parse().unwrap(), + "172.16.1.4:9042".parse().unwrap(), + "172.16.1.5:9042".parse().unwrap(), + "172.16.1.6:9042".parse().unwrap(), + "172.16.1.7:9042".parse().unwrap(), + "172.16.1.8:9042".parse().unwrap(), + "172.16.1.9:9042".parse().unwrap(), + "172.16.1.10:9042".parse().unwrap(), + ]; + let expected_racks = vec![ + "rack1", "rack1", "rack1", "rack2", "rack2", "rack2", "rack3", "rack3", "rack3", + ]; + cluster::multi_rack::test_topology_task(None, expected_nodes, expected_racks, 16).await; } #[rstest] diff --git a/shotover-proxy/tests/cassandra_int_tests/prepared_statements_all.rs b/shotover-proxy/tests/cassandra_int_tests/prepared_statements_all.rs index 523cf9289..7c8cd4e79 100644 --- a/shotover-proxy/tests/cassandra_int_tests/prepared_statements_all.rs +++ b/shotover-proxy/tests/cassandra_int_tests/prepared_statements_all.rs @@ -22,7 +22,7 @@ fn values() -> Vec<ResultValue> { ] } -async fn insert(connection: &CassandraConnection) { +async fn insert(connection: &CassandraConnection, replication_factor: u32) { #[cfg(feature = "cassandra-cpp-driver-tests")] let datastax = matches!(connection, CassandraConnection::Datastax { .. 
}); #[cfg(not(feature = "cassandra-cpp-driver-tests"))] @@ -31,7 +31,7 @@ async fn insert(connection: &CassandraConnection) { if datastax { // workaround cassandra-cpp not yet supporting binding decimal values let prepared = connection - .prepare("INSERT INTO test_prepare_statements_all.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, 1.0, ?, ?, ?, ?, ?, ?, ?, ?, ?);") + .prepare(&format!("INSERT INTO test_prepare_statements_all{replication_factor}.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, 1.0, ?, ?, ?, ?, ?, ?, ?, ?, ?);")) .await; assert_eq!( @@ -61,7 +61,7 @@ async fn insert(connection: &CassandraConnection) { ); } else { let prepared = connection - .prepare("INSERT INTO test_prepare_statements_all.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);") + .prepare(&format!("INSERT INTO test_prepare_statements_all{replication_factor}.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);")) .await; assert_eq!( connection.execute_prepared(&prepared, &values()).await, @@ -70,12 +70,12 @@ async fn insert(connection: &CassandraConnection) { } } -async fn select(connection: &CassandraConnection) { +async fn select(connection: &CassandraConnection, replication_factor: u32) { if let CassandraConnection::CdrsTokio { .. 
} = connection { // workaround cdrs-tokio having broken encoding for bytes assert_query_result( connection, - "SELECT id, v0, v1, v3, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all.test WHERE id = 1", + &format!("SELECT id, v0, v1, v3, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all{replication_factor}.test WHERE id = 1"), &[&[ ResultValue::Int(1), ResultValue::Ascii("foo".to_owned()), @@ -98,21 +98,28 @@ async fn select(connection: &CassandraConnection) { } else { assert_query_result( connection, - "SELECT id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all.test WHERE id = 1", + &format!("SELECT id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all{replication_factor}.test WHERE id = 1"), &[&values()], ) .await; } } -pub async fn test(connection: &CassandraConnection) { - run_query(connection, "CREATE KEYSPACE test_prepare_statements_all WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; +async fn setup(connection: &CassandraConnection, replication_factor: u32) { + run_query(connection, &format!("CREATE KEYSPACE test_prepare_statements_all{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await; run_query( connection, - "CREATE TABLE test_prepare_statements_all.test (id int PRIMARY KEY, v0 ascii, v1 bigint, v2 blob, v3 boolean, v4 decimal, v5 double, v6 float, v7 timestamp, v8 uuid, v9 inet, v10 date, v11 time, v12 smallint, v13 tinyint);", + &format!("CREATE TABLE test_prepare_statements_all{replication_factor}.test (id int PRIMARY KEY, v0 ascii, v1 bigint, v2 blob, v3 boolean, v4 decimal, v5 double, v6 float, v7 timestamp, v8 uuid, v9 inet, v10 date, v11 time, v12 smallint, v13 tinyint);"), ) .await; +} + +pub async fn test(connection: &CassandraConnection) { + setup(connection, 1).await; + insert(connection, 
1).await; + select(connection, 1).await; - insert(connection).await; - select(connection).await; + setup(connection, 3).await; + insert(connection, 3).await; + select(connection, 3).await; } diff --git a/shotover-proxy/tests/cassandra_int_tests/prepared_statements_simple.rs b/shotover-proxy/tests/cassandra_int_tests/prepared_statements_simple.rs index e3605e313..6e8ab6fd6 100644 --- a/shotover-proxy/tests/cassandra_int_tests/prepared_statements_simple.rs +++ b/shotover-proxy/tests/cassandra_int_tests/prepared_statements_simple.rs @@ -3,9 +3,11 @@ use test_helpers::connection::cassandra::{ assert_query_result, assert_rows, run_query, CassandraConnection, ResultValue, }; -async fn delete(session: &CassandraConnection) { +async fn delete(session: &CassandraConnection, replication_factor: u32) { let prepared = session - .prepare("DELETE FROM test_prepare_statements.table_1 WHERE id = ?;") + .prepare(&format!( + "DELETE FROM test_prepare_statements{replication_factor}.table_1 WHERE id = ?;" + )) .await; assert_eq!( @@ -17,15 +19,17 @@ async fn delete(session: &CassandraConnection) { assert_query_result( session, - "SELECT * FROM test_prepare_statements.table_1 where id = 1;", + &format!("SELECT * FROM test_prepare_statements{replication_factor}.table_1 where id = 1;"), &[], ) .await; } -async fn insert(session: &CassandraConnection) { +async fn insert(session: &CassandraConnection, replication_factor: u32) { let prepared = session - .prepare("INSERT INTO test_prepare_statements.table_1 (id) VALUES (?);") + .prepare(&format!( + "INSERT INTO test_prepare_statements{replication_factor}.table_1 (id) VALUES (?);" + )) .await; assert_eq!( @@ -50,9 +54,11 @@ async fn insert(session: &CassandraConnection) { ); } -async fn select(session: &CassandraConnection) { +async fn select(session: &CassandraConnection, replication_factor: u32) { let prepared = session - .prepare("SELECT id FROM test_prepare_statements.table_1 WHERE id = ?") + .prepare(&format!( + "SELECT id FROM 
test_prepare_statements{replication_factor}.table_1 WHERE id = ?" + )) .await; let result_rows = session @@ -65,6 +71,7 @@ async fn select_cross_connection( connection: &CassandraConnection, connection_creator: impl Fn() -> Fut, + replication_factor: u32, ) where Fut: Future<Output = CassandraConnection>, { @@ -72,7 +79,9 @@ async fn select_cross_connection( // query is purposely slightly different to past queries to avoid being cached let prepared = connection - .prepare("SELECT id, id FROM test_prepare_statements.table_1 WHERE id = ?") + .prepare(&format!( + "SELECT id, id FROM test_prepare_statements{replication_factor}.table_1 WHERE id = ?" + )) .await; let connection_after = connection_creator().await; @@ -93,9 +102,13 @@ ); } -async fn use_statement(session: &CassandraConnection) { +async fn use_statement(session: &CassandraConnection, replication_factor: u32) { // Create prepared command with the correct keyspace - run_query(session, "USE test_prepare_statements;").await; + run_query( + session, + &format!("USE test_prepare_statements{replication_factor};"), + ) + .await; let prepared = session .prepare("INSERT INTO table_1 (id) VALUES (?);") .await; @@ -109,7 +122,11 @@ async fn use_statement(session: &CassandraConnection) { ); // change the keyspace to be incorrect - run_query(session, "USE test_prepare_statements_empty;").await; + run_query( + session, + &format!("USE test_prepare_statements_empty{replication_factor};"), + ) + .await; // observe that the query succeeded despite the keyspace being incorrect at the time. 
assert_eq!( @@ -120,25 +137,35 @@ ); } -pub async fn test<Fut>(session: &CassandraConnection, connection_creator: impl Fn() -> Fut) -where - Fut: Future<Output = CassandraConnection>, -{ - run_query(session, "CREATE KEYSPACE test_prepare_statements WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; - run_query(session, "CREATE KEYSPACE test_prepare_statements_empty WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; +async fn setup(session: &CassandraConnection, replication_factor: u32) { + run_query(session, &format!("CREATE KEYSPACE test_prepare_statements{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await; + run_query(session, &format!("CREATE KEYSPACE test_prepare_statements_empty{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await; run_query( session, - "CREATE TABLE test_prepare_statements.table_1 (id int PRIMARY KEY);", + &format!("CREATE TABLE test_prepare_statements{replication_factor}.table_1 (id int PRIMARY KEY);"), ) .await; +} - insert(session).await; - select(session).await; - select_cross_connection(session, connection_creator).await; - delete(session).await; - use_statement(session).await; +pub async fn test<Fut>(session: &CassandraConnection, connection_creator: impl Fn() -> Fut) +where + Fut: Future<Output = CassandraConnection>, +{ + setup(session, 1).await; + insert(session, 1).await; + select(session, 1).await; + select_cross_connection(session, &connection_creator, 1).await; + delete(session, 1).await; + use_statement(session, 1).await; let cql = "SELECT * FROM system.local WHERE key = 'local'"; let prepared = session.prepare(cql).await; session.execute_prepared(&prepared, &[]).await.unwrap(); + + setup(session, 3).await; + insert(session, 3).await; + select(session, 3).await; + select_cross_connection(session, 
&connection_creator, 3).await; + delete(session, 3).await; + use_statement(session, 3).await; } diff --git a/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/docker-compose.yaml b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/docker-compose.yaml new file mode 100644 index 000000000..c2ceceaaf --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/docker-compose.yaml @@ -0,0 +1,103 @@ +version: "3.3" +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 + +services: + cassandra1_1: + image: &image library/cassandra:4.0.6 + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + CASSANDRA_SEEDS: "cassandra1_1,cassandra1_2,cassandra1_3,cassandra2_1,cassandra2_2,cassandra2_3,cassandra3_1,cassandra3_2,cassandra3_3" + CASSANDRA_CLUSTER_NAME: TestCluster + CASSANDRA_RACK: rack1 + CASSANDRA_DC: dc1 + CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch + MAX_HEAP_SIZE: "400M" + MIN_HEAP_SIZE: "400M" + HEAP_NEWSIZE: "48M" + volumes: &volumes + - type: tmpfs + target: /var/lib/cassandra + cassandra1_2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + CASSANDRA_RACK: rack1 + volumes: *volumes + cassandra1_3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + CASSANDRA_RACK: rack1 + volumes: *volumes + + cassandra2_1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + environment: + <<: *environment + CASSANDRA_RACK: rack2 + volumes: *volumes + cassandra2_2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.6 + environment: + <<: *environment + CASSANDRA_RACK: rack2 + volumes: *volumes + cassandra2_3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.7 + environment: + <<: 
*environment + CASSANDRA_RACK: rack2 + volumes: *volumes + + cassandra3_1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.8 + environment: + <<: *environment + CASSANDRA_RACK: rack3 + volumes: *volumes + cassandra3_2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.9 + environment: + <<: *environment + CASSANDRA_RACK: rack3 + volumes: *volumes + cassandra3_3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.10 + environment: + <<: *environment + CASSANDRA_RACK: rack3 + volumes: *volumes diff --git a/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack1.yaml b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack1.yaml new file mode 100644 index 000000000..75c525e1e --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack1.yaml @@ -0,0 +1,23 @@ +--- +sources: + - Cassandra: + name: "cassandra" + listen_addr: "127.0.0.1:9042" + chain: + - CassandraSinkCluster: + first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack2.yaml b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack2.yaml new file mode 100644 index 000000000..2bc0b4f94 --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack2.yaml @@ -0,0 +1,23 @@ +--- +sources: + - Cassandra: + 
name: "cassandra" + listen_addr: "127.0.0.2:9042" + chain: + - CassandraSinkCluster: + first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] + local_shotover_host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack3.yaml b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack3.yaml new file mode 100644 index 000000000..146819d5e --- /dev/null +++ b/shotover-proxy/tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack3.yaml @@ -0,0 +1,23 @@ +--- +sources: + - Cassandra: + name: "cassandra" + listen_addr: "127.0.0.3:9042" + chain: + - CassandraSinkCluster: + first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] + local_shotover_host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + shotover_nodes: + - address: "127.0.0.1:9042" + data_center: "dc1" + rack: "rack1" + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" + - address: "127.0.0.2:9042" + data_center: "dc1" + rack: "rack2" + host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4" + - address: "127.0.0.3:9042" + data_center: "dc1" + rack: "rack3" + host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b" + connect_timeout_ms: 3000 diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index d89a54eac..65c7f0c7e 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -378,7 +378,7 @@ impl CassandraSinkCluster { .ok_or_else(|| anyhow!("ran out of nodes 
to send prepare messages to"))?; match self .pool - .nodes() + .nodes_mut() .iter_mut() .find(|node| node.host_id == next_host_id) .ok_or_else(|| anyhow!("node {next_host_id} has dissapeared"))? @@ -516,7 +516,7 @@ // If any errors occurred close the connection as we can no // longer make any guarantees about the current state of the connection if !is_use_statement_successful(response) { - self.pool.nodes()[node_index].outbound = None; + self.pool.nodes_mut()[node_index].outbound = None; } } diff --git a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs index 076badfb3..5a9244a29 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/node_pool.rs @@ -30,9 +30,18 @@ pub enum GetReplicaErr { NoRoutingKey, } +#[derive(Debug, Clone, PartialEq)] +pub enum ReplicationStrategy { + SimpleStrategy, + NetworkTopologyStrategy, + LocalStrategy, + Unknown, +} + #[derive(Debug, Clone, PartialEq)] pub struct KeyspaceMetadata { pub replication_factor: usize, + pub replication_strategy: ReplicationStrategy, } // Values in the builder are shared between transform instances that come from the same transform in the topology.yaml @@ -70,10 +79,14 @@ pub struct NodePool { } impl NodePool { - pub fn nodes(&mut self) -> &mut [CassandraNode] { + pub fn nodes_mut(&mut self) -> &mut [CassandraNode] { &mut self.nodes } + pub fn nodes(&self) -> &[CassandraNode] { + &self.nodes + } + /// if the node list has been updated use the new list, copying over any existing connections pub fn update_nodes(&mut self, nodes_rx: &mut watch::Receiver<Vec<CassandraNode>>) { let mut new_nodes = nodes_rx.borrow_and_update().clone(); @@ -90,6 +103,11 @@ impl NodePool { } self.nodes = new_nodes; self.token_map = TokenMap::new(self.nodes.as_slice()); + tracing::debug!( + "nodes updated, nodes={:#?}\ntokens={:#?}", + self.nodes, + self.token_map + ); } pub fn 
report_issue_with_node(&mut self, address: SocketAddr) { @@ -201,10 +219,7 @@ impl NodePool { let replica_host_ids = self .token_map - .iter_replica_nodes_capped( - Murmur3Token::generate(&routing_key), - keyspace.replication_factor, - ) + .iter_replica_nodes(self.nodes(), Murmur3Token::generate(&routing_key), keyspace) .collect::<Vec<_>>(); let mut nodes: Vec<&mut CassandraNode> = self @@ -227,9 +242,16 @@ // An execute message is being delivered outside of CassandraSinkCluster's designated rack. The only cases this can occur is when: // The client correctly routes to the shotover node that reports it has the token in its rack, however the destination cassandra node has since gone down and is now inaccessible. // or + // ReplicationStrategy::SimpleStrategy is used with a replication factor > 1 + // or // The clients token aware routing is broken. + #[cfg(debug_assertions)] + tracing::warn!("No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation."); self.out_of_rack_requests.increment(1); } + tracing::debug!( + "Shotover with designated rack {rack:?} found replica nodes {replica_host_ids:?}" + ); Ok(nodes) } diff --git a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs index bb8317d29..507232386 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs @@ -111,7 +111,7 @@ impl MessageRewriter { // This is purely an optimization: To avoid opening these connections sequentially later on, we open them concurrently now. 
try_join_all( - pool.nodes() + pool.nodes_mut() .iter_mut() .filter(|x| destination_nodes.contains(&x.host_id)) .map(|node| node.get_connection(connection_factory)), diff --git a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs index 7909095f0..f6705f168 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/test_router.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/test_router.rs @@ -4,7 +4,7 @@ mod test_token_aware_router { use super::super::routing_key::calculate_routing_key; use crate::transforms::cassandra::sink_cluster::node::CassandraNode; use crate::transforms::cassandra::sink_cluster::node_pool::{ - NodePoolBuilder, PreparedMetadata, + NodePoolBuilder, PreparedMetadata, ReplicationStrategy, }; use crate::transforms::cassandra::sink_cluster::{KeyspaceChanRx, KeyspaceChanTx}; use cassandra_protocol::consistency::Consistency::One; @@ -36,6 +36,7 @@ mod test_token_aware_router { let keyspace_metadata = KeyspaceMetadata { replication_factor: 3, + replication_strategy: ReplicationStrategy::SimpleStrategy, }; let (keyspaces_tx, mut keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) = diff --git a/shotover/src/transforms/cassandra/sink_cluster/token_map.rs b/shotover/src/transforms/cassandra/sink_cluster/token_map.rs index fae5fd981..cadba13bb 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/token_map.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/token_map.rs @@ -3,6 +3,8 @@ use cassandra_protocol::token::Murmur3Token; use std::collections::BTreeMap; use uuid::Uuid; +use super::node_pool::{KeyspaceMetadata, ReplicationStrategy}; + #[derive(Debug, Clone)] pub struct TokenMap { token_ring: BTreeMap<Murmur3Token, Uuid>, @@ -19,15 +21,31 @@ impl TokenMap { } /// Returns nodes starting at given token and going in the direction of replicas. 
- pub fn iter_replica_nodes_capped( - &self, + pub fn iter_replica_nodes<'a>( + &'a self, + nodes: &'a [CassandraNode], token: Murmur3Token, - replica_count: usize, + keyspace: &'a KeyspaceMetadata, ) -> impl Iterator<Item = Uuid> + '_ { + let mut racks_used = vec![]; self.token_ring .range(token..) .chain(self.token_ring.iter()) - .take(replica_count) + .filter(move |(_, host_id)| { + if let ReplicationStrategy::NetworkTopologyStrategy = keyspace.replication_strategy + { + let rack = &nodes.iter().find(|x| x.host_id == **host_id).unwrap().rack; + if racks_used.contains(&rack) { + false + } else { + racks_used.push(rack); + true + } + } else { + true + } + }) + .take(keyspace.replication_factor) .map(|(_, node)| *node) } } @@ -98,7 +116,33 @@ fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) { let token_map = TokenMap::new(prepare_nodes().as_slice()); let nodes = token_map - .iter_replica_nodes_capped(token, node_host_ids.len()) + .iter_replica_nodes( + &[ + CassandraNode::new( + "127.0.0.1:9042".parse().unwrap(), + "rack1".to_owned(), + vec![], + NODE_1, + ), + CassandraNode::new( + "127.0.0.2:9042".parse().unwrap(), + "rack1".to_owned(), + vec![], + NODE_2, + ), + CassandraNode::new( + "127.0.0.3:9042".parse().unwrap(), + "rack1".to_owned(), + vec![], + NODE_3, + ), + ], + token, + &KeyspaceMetadata { + replication_factor: node_host_ids.len(), + replication_strategy: ReplicationStrategy::SimpleStrategy, + }, + ) .collect_vec(); assert_eq!(nodes, node_host_ids); diff --git a/shotover/src/transforms/cassandra/sink_cluster/topology.rs b/shotover/src/transforms/cassandra/sink_cluster/topology.rs index ccdf26d7b..642967782 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/topology.rs @@ -235,6 +235,8 @@ async fn fetch_current_nodes( } mod system_keyspaces { + use crate::transforms::cassandra::sink_cluster::node_pool::ReplicationStrategy; + use super::*; use 
std::str::FromStr; @@ -308,7 +310,10 @@ mod system_keyspaces { anyhow!("Could not parse replication factor as an integer",) })?; - KeyspaceMetadata { replication_factor } + KeyspaceMetadata { + replication_factor, + replication_strategy: ReplicationStrategy::SimpleStrategy, + } } "org.apache.cassandra.locator.NetworkTopologyStrategy" | "NetworkTopologyStrategy" => { @@ -330,17 +335,20 @@ mod system_keyspaces { KeyspaceMetadata { replication_factor: data_center_rf, + replication_strategy: ReplicationStrategy::NetworkTopologyStrategy, } } "org.apache.cassandra.locator.LocalStrategy" | "LocalStrategy" => { KeyspaceMetadata { replication_factor: 1, + replication_strategy: ReplicationStrategy::LocalStrategy, } } _ => { tracing::warn!("Unrecognised replication strategy: {strategy_name:?}"); KeyspaceMetadata { replication_factor: 1, + replication_strategy: ReplicationStrategy::Unknown, } } } @@ -584,6 +592,8 @@ mod system_peers { #[cfg(test)] mod test_system_keyspaces { + use crate::transforms::cassandra::sink_cluster::node_pool::ReplicationStrategy; + use super::*; #[test] @@ -612,7 +622,8 @@ mod test_system_keyspaces { ( "test".into(), KeyspaceMetadata { - replication_factor: 2 + replication_factor: 2, + replication_strategy: ReplicationStrategy::SimpleStrategy, } ) ) @@ -647,7 +658,8 @@ mod test_system_keyspaces { ( "test".into(), KeyspaceMetadata { - replication_factor: 3 + replication_factor: 3, + replication_strategy: ReplicationStrategy::NetworkTopologyStrategy, } ) ) diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index c15291c30..2b406155b 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -27,7 +27,7 @@ pub fn new_moto() -> DockerCompose { docker_compose("tests/transforms/docker-compose-moto.yaml") } -pub static IMAGE_WAITERS: [Image; 10] = [ +pub static IMAGE_WAITERS: [Image; 11] = [ Image { name: "motoserver/moto", log_regex_to_wait_for: r"Press CTRL\+C to quit", @@ -60,6 +60,11 @@ 
pub static IMAGE_WAITERS: [Image; 10] = [ log_regex_to_wait_for: r"Startup complete", timeout: Duration::from_secs(120), }, + Image { + name: "library/cassandra:4.0.6", + log_regex_to_wait_for: r"Startup complete", + timeout: Duration::from_secs(120), + }, Image { name: "shotover/cassandra-test:4.0.6-r1", log_regex_to_wait_for: r"Startup complete",