Skip to content

Commit

Permalink
Do not retry requests that have been dropped by the user.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Sep 9, 2024
1 parent 426bb99 commit 18d2ace
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
4 changes: 3 additions & 1 deletion redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,9 @@ impl<C> Future for Request<C> {

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if this.request.is_none() {
// If the sender is closed, the caller is no longer waiting for the reply, and it is ambiguous
// whether they expect the side-effect of the request to happen or not.
if this.request.is_none() || this.request.as_ref().unwrap().sender.is_closed() {
return Poll::Ready(Next::Done);
}
let future = match this.future.as_mut().project() {
Expand Down
45 changes: 44 additions & 1 deletion redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod cluster_async {
};

use futures::prelude::*;
use futures_time::task::sleep;
use futures_time::{future::FutureExt, task::sleep};
use once_cell::sync::Lazy;
use std::ops::Add;

Expand Down Expand Up @@ -4142,6 +4142,49 @@ mod cluster_async {
.unwrap();
}

#[test]
fn test_async_cluster_do_not_retry_when_receiver_was_dropped() {
let name = "test_async_cluster_do_not_retry_when_receiver_was_dropped";
let cmd = cmd("FAKE_COMMAND");
let packed_cmd = cmd.get_packed_command();
let request_counter = Arc::new(AtomicU32::new(0));
let cloned_req_counter = request_counter.clone();
let MockEnv {
runtime,
async_connection: mut connection,
..
} = MockEnv::with_client_builder(
ClusterClient::builder(vec![&*format!("redis://{name}")])
.retries(5)
.retry_wait_formula(1, 1)
.min_retry_wait(2),
name,
move |received_cmd: &[u8], _| {
respond_startup(name, received_cmd)?;

if received_cmd == packed_cmd {
cloned_req_counter.fetch_add(1, Ordering::Relaxed);
return Err(Err((ErrorKind::TryAgain, "seriously, try again").into()));
}

Err(Ok(Value::Okay))
},
);

runtime.block_on(async move {
cmd.query_async::<_, Value>(&mut connection)
.timeout(futures_time::time::Duration::from_millis(1))
.await
.unwrap_err();
// we sleep here, to allow the cluster connection time to retry. We expect it won't, but without this
// sleep the test will complete before the the runtime gave the connection time to retry, which would've made the
// test pass regardless of whether the connection tries retrying or not.
sleep(Duration::from_millis(10).into()).await;
});

assert_eq!(request_counter.load(Ordering::Relaxed), 1);
}

#[cfg(feature = "tls-rustls")]
mod mtls_test {
use crate::support::mtls_test::create_cluster_client_from_cluster;
Expand Down

0 comments on commit 18d2ace

Please sign in to comment.