Skip to content

Commit

Permalink
Merge branch 'main' into controlled_shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 1, 2024
2 parents 50040c6 + 372186d commit 225048d
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 10 deletions.
31 changes: 31 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,37 @@ async fn cluster_multi_rack_1_per_rack(#[case] driver: CassandraDriver) {
cluster::multi_rack::test_topology_task(None, expected_nodes, 128).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn cluster_multi_rack_1_per_rack_go_smoke_test() {
let _compose =
docker_compose("tests/test-configs/cassandra/cluster-multi-rack/docker-compose.yaml");

let shotover_rack1 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack1.yaml")
.with_log_name("Rack1")
.with_config("tests/test-configs/shotover-config/config1.yaml")
.start()
.await;
let shotover_rack2 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack2.yaml")
.with_log_name("Rack2")
.with_config("tests/test-configs/shotover-config/config2.yaml")
.start()
.await;
let shotover_rack3 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack3.yaml")
.with_log_name("Rack3")
.with_config("tests/test-configs/shotover-config/config3.yaml")
.start()
.await;

test_helpers::connection::cassandra::go::run_go_smoke_test().await;

shotover_rack1.shutdown_and_then_consume_events(&[]).await;
shotover_rack2.shutdown_and_then_consume_events(&[]).await;
shotover_rack3.shutdown_and_then_consume_events(&[]).await;
}

// This is very slow, only test with one driver
// We previously had this at 3 per rack but this was too much for the github actions runners and resulted in intermittent failures.
#[rstest]
Expand Down
28 changes: 20 additions & 8 deletions shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,14 +1006,22 @@ async fn get_master_id(connection: &mut Connection) -> String {
}

async fn is_cluster_replicas_ready(connection: &mut Connection, master_id: &str) -> bool {
let res = redis::cmd("CLUSTER")
tracing::debug!("Checking `CLUSTER REPLICAS {master_id}`");
let res = match redis::cmd("CLUSTER")
.arg("REPLICAS")
.arg(master_id)
.query_async(connection)
.await
.unwrap();
{
Ok(t) => t,
Err(e) => {
tracing::debug!("CLUSTER REPLICAS failed with {e:?}, retrying");
return false;
}
};
if let Value::Bulk(data) = res {
if let Some(Value::Data(data)) = data.first() {
tracing::debug!("CLUSTER REPLICAS returned [], retrying");
return !data.is_empty();
}
}
Expand All @@ -1030,17 +1038,21 @@ pub async fn test_cluster_ports_rewrite_nodes(connection: &mut Connection, new_p
assert_cluster_ports_rewrite_nodes(res, new_port);

// Get an id to use for cluster replicas test
let master_id = get_master_id(connection).await;
let mut master_id = get_master_id(connection).await;

let mut tries = 0;
let instant = std::time::Instant::now();
while !is_cluster_replicas_ready(connection, &master_id).await {
std::thread::sleep(std::time::Duration::from_millis(10));
tries += 1;
if tries > 500 {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;

if instant.elapsed() > Duration::from_secs(30) {
// Log we ran out of retries but let the following "CLUSTER REPLICAS" command give a more specific panic message
tracing::error!("CLUSTER REPLICAS never became ready");
panic!("CLUSTER REPLICAS never became ready");
}

// refetch master ID, sometimes redis incorrectly reports slave nodes as master nodes during startup.
master_id = get_master_id(connection).await;
}
tracing::info!("CLUSTER REPLICAS is ready after {:?}", instant.elapsed());

let res = redis::cmd("CLUSTER")
.arg("REPLICAS")
Expand Down
10 changes: 8 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ impl MessageRewriter {
CassandraOperation::Result(CassandraResult::Prepared(prepared)),
..
})) => Some(prepared),
// ignore errors here. In the case of all errors, one of them will be returned to the client later on.
Some(Frame::Cassandra(CassandraFrame {
operation:
CassandraOperation::Error(_),
..
})) => None,
other => {
tracing::error!("Response to Prepare query was not a Prepared, was instead: {other:?}");
warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {other:?}"));
Expand All @@ -361,8 +367,8 @@ impl MessageRewriter {
});

tracing::error!(
"Nodes did not return the same response to PREPARE statement {err_str}"
);
"Nodes did not return the same response to PREPARE statement {err_str}"
);
warnings.push(format!(
"Shotover: Nodes did not return the same response to PREPARE statement {err_str}"
));
Expand Down
12 changes: 12 additions & 0 deletions test-helpers/src/connection/cassandra/go.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::run_command_async;
use std::{path::Path, time::Duration};

pub async fn run_go_smoke_test() {
let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/cassandra/go");
tokio::time::timeout(
Duration::from_secs(60),
run_command_async(&project_dir, "go", &["run", "basic.go"]),
)
.await
.unwrap();
}
32 changes: 32 additions & 0 deletions test-helpers/src/connection/cassandra/go/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"fmt"

"github.com/gocql/gocql"
)

func main() {
fmt.Println("Starting go smoke test")

dc := "datacenter1"
cluster := gocql.NewCluster("127.0.0.1:9042")
cluster.Keyspace = "system"
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 5}
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(dc))
cluster.HostFilter = gocql.DataCentreHostFilter(dc)
cluster.WriteCoalesceWaitTime = 0
cluster.NumConns = 4
cluster.ProtoVersion = 4

session, err := cluster.CreateSession()
if err != nil {
panic(err)
}

session.Close()

fmt.Println("Success!")
}
10 changes: 10 additions & 0 deletions test-helpers/src/connection/cassandra/go/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module example/basic

go 1.20

require (
github.com/gocql/gocql v1.7.0 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)
17 changes: 17 additions & 0 deletions test-helpers/src/connection/cassandra/go/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
1 change: 1 addition & 0 deletions test-helpers/src/connection/cassandra/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod connection;
pub mod cql_ws;
pub mod go;
pub mod result_value;

pub use connection::{
Expand Down

0 comments on commit 225048d

Please sign in to comment.