Skip to content

Commit

Permalink
Add cassandra_int_tests::clustesr_multi_rack_rf3
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 5, 2023
1 parent 4f89245 commit fb18f7e
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 65 deletions.
27 changes: 16 additions & 11 deletions shotover-proxy/tests/cassandra_int_tests/cluster/multi_rack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,30 +162,35 @@ pub async fn test(connection: &CassandraConnection) {
assert_eq!(out_of_rack_request, "0");
}

pub async fn test_topology_task(ca_path: Option<&str>) {
pub async fn test_topology_task(
ca_path: Option<&str>,
expected_nodes: Vec<SocketAddr>,
expected_racks: Vec<&'static str>,
token_count: usize,
) {
let nodes = run_topology_task(ca_path, None).await;

assert_eq!(nodes.len(), 3);
let mut possible_addresses: Vec<SocketAddr> = vec![
"172.16.1.2:9042".parse().unwrap(),
"172.16.1.3:9042".parse().unwrap(),
"172.16.1.4:9042".parse().unwrap(),
];
let mut possible_racks: Vec<&str> = vec!["rack1", "rack2", "rack3"];
assert_eq!(nodes.len(), expected_nodes.len());
let mut possible_addresses = expected_nodes;
let mut possible_racks = expected_racks;
for node in &nodes {
let address_index = possible_addresses
.iter()
.position(|x| *x == node.address)
.expect("Node did not contain a unique expected address");
.unwrap_or_else(|| {
panic!("Node did not contain a unique expected address. nodes: {nodes:#?}")
});
possible_addresses.remove(address_index);

let rack_index = possible_racks
.iter()
.position(|x| *x == node.rack)
.expect("Node did not contain a unique expected rack");
.unwrap_or_else(|| {
panic!("Node did not contain a unique expected rack. nodes: {nodes:#?}")
});
possible_racks.remove(rack_index);

assert_eq!(node.tokens.len(), 128);
assert_eq!(node.tokens.len(), token_count);
assert!(node.is_up);
}
}
95 changes: 93 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::future::join_all;
use futures::Future;
use rstest::rstest;
use rstest_reuse::{self, *};
use std::net::SocketAddr;
#[cfg(feature = "cassandra-cpp-driver-tests")]
use test_helpers::connection::cassandra::CassandraDriver::Datastax;
use test_helpers::connection::cassandra::Compression;
Expand Down Expand Up @@ -59,6 +60,26 @@ where
timestamp::test(&connection).await;
}

async fn standard_test_suite_default_config_compatible<Fut>(
connection_creator: impl Fn() -> Fut,
driver: CassandraDriver,
) where
Fut: Future<Output = CassandraConnection>,
{
// reuse a single connection a bunch to save time recreating connections
let connection = connection_creator().await;

keyspace::test(&connection).await;
table::test(&connection).await;
udt::test(&connection).await;
native_types::test(&connection).await;
collections::test(&connection, driver).await;
prepared_statements_simple::test(&connection, connection_creator).await;
prepared_statements_all::test(&connection).await;
batch_statements::test(&connection).await;
timestamp::test(&connection).await;
}

#[template]
#[rstest]
#[case::cdrs(CdrsTokio)]
Expand Down Expand Up @@ -234,7 +255,7 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
#[case::scylla(Scylla)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_multi_rack(#[case] driver: CassandraDriver) {
async fn cluster_multi_rack_1_per_rack(#[case] driver: CassandraDriver) {
let _compose =
docker_compose("tests/test-configs/cassandra/cluster-multi-rack/docker-compose.yaml");

Expand Down Expand Up @@ -275,7 +296,77 @@ async fn cluster_multi_rack(#[case] driver: CassandraDriver) {
shotover_rack3.shutdown_and_then_consume_events(&[]).await;
}

cluster::multi_rack::test_topology_task(None).await;
let expected_nodes: Vec<SocketAddr> = vec![
"172.16.1.2:9042".parse().unwrap(),
"172.16.1.3:9042".parse().unwrap(),
"172.16.1.4:9042".parse().unwrap(),
];
let expected_racks = vec!["rack1", "rack2", "rack3"];
cluster::multi_rack::test_topology_task(None, expected_nodes, expected_racks, 128).await;
}

// This is very slow, only test with one driver
#[rstest]
#[case::scylla(Scylla)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_multi_rack_3_per_rack(#[case] driver: CassandraDriver) {
let _compose = docker_compose(
"tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/docker-compose.yaml",
);

{
let shotover_rack1 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack1.yaml",
)
.with_log_name("Rack1")
.with_observability_port(9001)
.start()
.await;
let shotover_rack2 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack2.yaml",
)
.with_log_name("Rack2")
.with_observability_port(9002)
.start()
.await;
let shotover_rack3 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-3-per-rack/topology_rack3.yaml",
)
.with_log_name("Rack3")
.with_observability_port(9003)
.start()
.await;

let connection = || async {
let mut connection = CassandraConnectionBuilder::new("127.0.0.1", 9042, driver)
.build()
.await;
connection
.enable_schema_awaiter("172.16.1.2:9042", None)
.await;
connection
};
standard_test_suite_default_config_compatible(&connection, driver).await;

shotover_rack1.shutdown_and_then_consume_events(&[]).await;
shotover_rack2.shutdown_and_then_consume_events(&[]).await;
shotover_rack3.shutdown_and_then_consume_events(&[]).await;
}
let expected_nodes: Vec<SocketAddr> = vec![
"172.16.1.2:9042".parse().unwrap(),
"172.16.1.3:9042".parse().unwrap(),
"172.16.1.4:9042".parse().unwrap(),
"172.16.1.5:9042".parse().unwrap(),
"172.16.1.6:9042".parse().unwrap(),
"172.16.1.7:9042".parse().unwrap(),
"172.16.1.8:9042".parse().unwrap(),
"172.16.1.9:9042".parse().unwrap(),
"172.16.1.10:9042".parse().unwrap(),
];
let expected_racks = vec![
"rack1", "rack1", "rack1", "rack2", "rack2", "rack2", "rack3", "rack3", "rack3",
];
cluster::multi_rack::test_topology_task(None, expected_nodes, expected_racks, 16).await;
}

#[rstest]
Expand Down
29 changes: 18 additions & 11 deletions shotover-proxy/tests/cassandra_int_tests/prepared_statements_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn values() -> Vec<ResultValue> {
]
}

async fn insert(connection: &CassandraConnection) {
async fn insert(connection: &CassandraConnection, replication_factor: u32) {
#[cfg(feature = "cassandra-cpp-driver-tests")]
let datastax = matches!(connection, CassandraConnection::Datastax { .. });
#[cfg(not(feature = "cassandra-cpp-driver-tests"))]
Expand All @@ -31,7 +31,7 @@ async fn insert(connection: &CassandraConnection) {
if datastax {
// workaround cassandra-cpp not yet supporting binding decimal values
let prepared = connection
.prepare("INSERT INTO test_prepare_statements_all.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, 1.0, ?, ?, ?, ?, ?, ?, ?, ?, ?);")
.prepare(&format!("INSERT INTO test_prepare_statements_all{replication_factor}.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, 1.0, ?, ?, ?, ?, ?, ?, ?, ?, ?);"))
.await;

assert_eq!(
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn insert(connection: &CassandraConnection) {
);
} else {
let prepared = connection
.prepare("INSERT INTO test_prepare_statements_all.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);")
.prepare(&format!("INSERT INTO test_prepare_statements_all{replication_factor}.test (id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"))
.await;
assert_eq!(
connection.execute_prepared(&prepared, &values()).await,
Expand All @@ -70,12 +70,12 @@ async fn insert(connection: &CassandraConnection) {
}
}

async fn select(connection: &CassandraConnection) {
async fn select(connection: &CassandraConnection, replication_factor: u32) {
if let CassandraConnection::CdrsTokio { .. } = connection {
// workaround cdrs-tokio having broken encoding for bytes
assert_query_result(
connection,
"SELECT id, v0, v1, v3, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all.test WHERE id = 1",
&format!("SELECT id, v0, v1, v3, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all{replication_factor}.test WHERE id = 1"),
&[&[
ResultValue::Int(1),
ResultValue::Ascii("foo".to_owned()),
Expand All @@ -98,21 +98,28 @@ async fn select(connection: &CassandraConnection) {
} else {
assert_query_result(
connection,
"SELECT id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all.test WHERE id = 1",
&format!("SELECT id, v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13 FROM test_prepare_statements_all{replication_factor}.test WHERE id = 1"),
&[&values()],
)
.await;
}
}

pub async fn test(connection: &CassandraConnection) {
run_query(connection, "CREATE KEYSPACE test_prepare_statements_all WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
async fn setup(connection: &CassandraConnection, replication_factor: u32) {
run_query(connection, &format!("CREATE KEYSPACE test_prepare_statements_all{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await;
run_query(
connection,
"CREATE TABLE test_prepare_statements_all.test (id int PRIMARY KEY, v0 ascii, v1 bigint, v2 blob, v3 boolean, v4 decimal, v5 double, v6 float, v7 timestamp, v8 uuid, v9 inet, v10 date, v11 time, v12 smallint, v13 tinyint);",
&format!("CREATE TABLE test_prepare_statements_all{replication_factor}.test (id int PRIMARY KEY, v0 ascii, v1 bigint, v2 blob, v3 boolean, v4 decimal, v5 double, v6 float, v7 timestamp, v8 uuid, v9 inet, v10 date, v11 time, v12 smallint, v13 tinyint);"),
)
.await;
}

pub async fn test(connection: &CassandraConnection) {
setup(connection, 1).await;
insert(connection, 1).await;
select(connection, 1).await;

insert(connection).await;
select(connection).await;
setup(connection, 3).await;
insert(connection, 3).await;
select(connection, 3).await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use test_helpers::connection::cassandra::{
assert_query_result, assert_rows, run_query, CassandraConnection, ResultValue,
};

async fn delete(session: &CassandraConnection) {
async fn delete(session: &CassandraConnection, replication_factor: u32) {
let prepared = session
.prepare("DELETE FROM test_prepare_statements.table_1 WHERE id = ?;")
.prepare(&format!(
"DELETE FROM test_prepare_statements{replication_factor}.table_1 WHERE id = ?;"
))
.await;

assert_eq!(
Expand All @@ -17,15 +19,17 @@ async fn delete(session: &CassandraConnection) {

assert_query_result(
session,
"SELECT * FROM test_prepare_statements.table_1 where id = 1;",
&format!("SELECT * FROM test_prepare_statements{replication_factor}.table_1 where id = 1;"),
&[],
)
.await;
}

async fn insert(session: &CassandraConnection) {
async fn insert(session: &CassandraConnection, replication_factor: u32) {
let prepared = session
.prepare("INSERT INTO test_prepare_statements.table_1 (id) VALUES (?);")
.prepare(&format!(
"INSERT INTO test_prepare_statements{replication_factor}.table_1 (id) VALUES (?);"
))
.await;

assert_eq!(
Expand All @@ -50,9 +54,11 @@ async fn insert(session: &CassandraConnection) {
);
}

async fn select(session: &CassandraConnection) {
async fn select(session: &CassandraConnection, replication_factor: u32) {
let prepared = session
.prepare("SELECT id FROM test_prepare_statements.table_1 WHERE id = ?")
.prepare(&format!(
"SELECT id FROM test_prepare_statements{replication_factor}.table_1 WHERE id = ?"
))
.await;

let result_rows = session
Expand All @@ -65,14 +71,17 @@ async fn select(session: &CassandraConnection) {
async fn select_cross_connection<Fut>(
connection: &CassandraConnection,
connection_creator: impl Fn() -> Fut,
replication_factor: u32,
) where
Fut: Future<Output = CassandraConnection>,
{
let connection_before = connection_creator().await;

// query is purposely slightly different to past queries to avoid being cached
let prepared = connection
.prepare("SELECT id, id FROM test_prepare_statements.table_1 WHERE id = ?")
.prepare(&format!(
"SELECT id, id FROM test_prepare_statements{replication_factor}.table_1 WHERE id = ?"
))
.await;

let connection_after = connection_creator().await;
Expand All @@ -93,9 +102,13 @@ async fn select_cross_connection<Fut>(
);
}

async fn use_statement(session: &CassandraConnection) {
async fn use_statement(session: &CassandraConnection, replication_factor: u32) {
// Create prepared command with the correct keyspace
run_query(session, "USE test_prepare_statements;").await;
run_query(
session,
&format!("USE test_prepare_statements{replication_factor};"),
)
.await;
let prepared = session
.prepare("INSERT INTO table_1 (id) VALUES (?);")
.await;
Expand All @@ -109,7 +122,11 @@ async fn use_statement(session: &CassandraConnection) {
);

// change the keyspace to be incorrect
run_query(session, "USE test_prepare_statements_empty;").await;
run_query(
session,
&format!("USE test_prepare_statements_empty{replication_factor};"),
)
.await;

// observe that the query succeeded despite the keyspace being incorrect at the time.
assert_eq!(
Expand All @@ -120,25 +137,35 @@ async fn use_statement(session: &CassandraConnection) {
);
}

pub async fn test<Fut>(session: &CassandraConnection, connection_creator: impl Fn() -> Fut)
where
Fut: Future<Output = CassandraConnection>,
{
run_query(session, "CREATE KEYSPACE test_prepare_statements WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
run_query(session, "CREATE KEYSPACE test_prepare_statements_empty WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
async fn setup(session: &CassandraConnection, replication_factor: u32) {
run_query(session, &format!("CREATE KEYSPACE test_prepare_statements{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await;
run_query(session, &format!("CREATE KEYSPACE test_prepare_statements_empty{replication_factor} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor} }};")).await;
run_query(
session,
"CREATE TABLE test_prepare_statements.table_1 (id int PRIMARY KEY);",
&format!("CREATE TABLE test_prepare_statements{replication_factor}.table_1 (id int PRIMARY KEY);"),
)
.await;
}

insert(session).await;
select(session).await;
select_cross_connection(session, connection_creator).await;
delete(session).await;
use_statement(session).await;
pub async fn test<Fut>(session: &CassandraConnection, connection_creator: impl Fn() -> Fut)
where
Fut: Future<Output = CassandraConnection>,
{
setup(session, 1).await;
insert(session, 1).await;
select(session, 1).await;
select_cross_connection(session, &connection_creator, 1).await;
delete(session, 1).await;
use_statement(session, 1).await;

let cql = "SELECT * FROM system.local WHERE key = 'local'";
let prepared = session.prepare(cql).await;
session.execute_prepared(&prepared, &[]).await.unwrap();

setup(session, 3).await;
insert(session, 3).await;
select(session, 3).await;
select_cross_connection(session, &connection_creator, 3).await;
delete(session, 3).await;
use_statement(session, 3).await;
}
Loading

0 comments on commit fb18f7e

Please sign in to comment.