diff --git a/scylla/src/transport/speculative_execution.rs b/scylla/src/transport/speculative_execution.rs index ecc9d02c7..10b60c4b7 100644 --- a/scylla/src/transport/speculative_execution.rs +++ b/scylla/src/transport/speculative_execution.rs @@ -127,21 +127,17 @@ where } } res = async_tasks.select_next_some() => { - match res { - Some(r) => { - if !can_be_ignored(&r) { - return r; - } else { - last_error = Some(r) - } - }, - None => { - if async_tasks.is_empty() && retries_remaining == 0 { - return last_error.unwrap_or({ - Err(EMPTY_PLAN_ERROR) - }); - } - }, + if let Some(r) = res { + if !can_be_ignored(&r) { + return r; + } else { + last_error = Some(r) + } + } + if async_tasks.is_empty() && retries_remaining == 0 { + return last_error.unwrap_or({ + Err(EMPTY_PLAN_ERROR) + }); } } } diff --git a/scylla/tests/integration/retries.rs b/scylla/tests/integration/retries.rs index 92bf1613c..8aca5d197 100644 --- a/scylla/tests/integration/retries.rs +++ b/scylla/tests/integration/retries.rs @@ -164,3 +164,71 @@ async fn retries_occur() { Err(err) => panic!("{}", err), } } + +// See https://github.com/scylladb/scylla-rust-driver/issues/1085 +#[tokio::test] +#[ntest::timeout(30000)] +#[cfg(not(scylla_cloud_tests))] +async fn speculative_execution_panic_regression_test() { + use scylla_proxy::RunningProxy; + + setup_tracing(); + let test = |proxy_uris: [String; 3], translation_map, mut running_proxy: RunningProxy| async move { + let se = SimpleSpeculativeExecutionPolicy { + max_retry_count: 2, + retry_interval: Duration::from_millis(1), + }; + let profile = ExecutionProfile::builder() + .speculative_execution_policy(Some(Arc::new(se))) + .retry_policy(Box::new(FallthroughRetryPolicy)) + .build(); + // DB preparation phase + let session: Session = SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .default_execution_profile_handle(profile.into_handle()) + .build() + .await + .unwrap(); + + let ks = unique_keyspace_name(); + session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .await + .unwrap(); + + let mut q = session + .prepare("INSERT INTO t (a) VALUES (?)") + .await + .unwrap(); + q.set_is_idempotent(true); // this is to allow speculative execution to fire + let id: &[u8] = q.get_id(); + + let drop_connection_on_execute = RequestRule( + Condition::RequestOpcode(RequestOpcode::Execute) + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)) + .and(Condition::BodyContainsCaseSensitive(id.into())), + RequestReaction::drop_connection(), + ); + + running_proxy.running_nodes[0] + .change_request_rules(Some(vec![drop_connection_on_execute.clone()])); + running_proxy.running_nodes[1] + .change_request_rules(Some(vec![drop_connection_on_execute.clone()])); + running_proxy.running_nodes[2] + .change_request_rules(Some(vec![drop_connection_on_execute.clone()])); + + let _result = session.execute_unpaged(&q, (2,)).await.unwrap_err(); + + running_proxy + }; + let res = test_with_3_node_cluster(ShardAwareness::QueryNode, test).await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +}