Skip to content

Commit

Permalink
SE: Regression test for the panic
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorak-mmk committed Oct 10, 2024
1 parent c5fb18d commit 630f8c3
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,71 @@ async fn retries_occur() {
Err(err) => panic!("{}", err),
}
}

// See https://github.com/scylladb/scylla-rust-driver/issues/1085
#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn speculative_execution_panic_regression_test() {
use scylla_proxy::RunningProxy;

setup_tracing();
let test = |proxy_uris: [String; 3], translation_map, mut running_proxy: RunningProxy| async move {
let se = SimpleSpeculativeExecutionPolicy {
max_retry_count: 2,
retry_interval: Duration::from_millis(1),
};
let profile = ExecutionProfile::builder()
.speculative_execution_policy(Some(Arc::new(se)))
.retry_policy(Box::new(FallthroughRetryPolicy))
.build();
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.default_execution_profile_handle(profile.into_handle())
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query_unpaged("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let mut q = session
.prepare("INSERT INTO t (a) VALUES (?)")
.await
.unwrap();
q.set_is_idempotent(true); // this is to allow speculative execution to fire
let id: &[u8] = q.get_id();

let drop_connection_on_execute = RequestRule(
Condition::RequestOpcode(RequestOpcode::Execute)
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent))
.and(Condition::BodyContainsCaseSensitive(id.into())),
RequestReaction::drop_connection(),
);

running_proxy.running_nodes[0]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[1]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));
running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_connection_on_execute.clone()]));

let _result = session.execute_unpaged(&q, (2,)).await.unwrap_err();

running_proxy
};
let res = test_with_3_node_cluster(ShardAwareness::QueryNode, test).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

0 comments on commit 630f8c3

Please sign in to comment.