Skip to content

Commit

Permalink
Add timeouts to standalone pipeline requests. (#706)
Browse files Browse the repository at this point in the history
Co-authored-by: nihohit <[email protected]>
  • Loading branch information
shachlanAmazon and nihohit authored Dec 20, 2023
1 parent 536df38 commit 50a944a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
9 changes: 3 additions & 6 deletions babushka-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl Client {
count: usize,
routing: Option<RoutingInfo>,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
(async move {
run_with_timeout(self.request_timeout, async move {
match self.internal_client {
ClientWrapper::Standalone(ref mut client) => {
client.send_packed_commands(cmd, offset, count).await
Expand All @@ -146,11 +146,8 @@ impl Client {
Some(RoutingInfo::SingleNode(route)) => route,
_ => SingleNodeRoutingInfo::Random,
};
run_with_timeout(
self.request_timeout,
client.route_pipeline(cmd, offset, count, route),
)
.await

client.route_pipeline(cmd, offset, count, route).await
}
}
})
Expand Down
26 changes: 26 additions & 0 deletions babushka-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,30 @@ mod shared_client_tests {
assert!(err.is_timeout(), "{err}");
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_request_pipeline_timeout(#[values(false, true)] use_cluster: bool) {
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
request_timeout: Some(1),
shared_server: true,
..Default::default()
},
)
.await;

let mut pipeline = redis::pipe();
pipeline.blpop("foo", 0.0); // 0 timeout blocks indefinitely
let result = test_basics
.client
.req_packed_commands(&pipeline, 0, 1, None)
.await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_timeout(), "{err}");
});
}
}

0 comments on commit 50a944a

Please sign in to comment.