Skip to content

Commit

Permalink
feat: re-export used packages (google#371)
Browse files Browse the repository at this point in the history
## Problem
Library users might get stuck with or ran into issues while using tarpc because of incompatible third party libraries. in particular, tokio_serde and tokio_util.

## Solution
This PR does the following:

1. re-export tokio_serde as part of feature serde-transport, because the end user imports it to use some serde-transport APIs.
2. Update third library packages to latest release and fix resulting issues from that.

## Important Notes
tokio_util 7.3 DelayQueue::poll_expired API changed [0] therefore, InFlightRequests::poll_expired now returns Poll<Option<u64>>

[0] https://docs.rs/tokio-util/latest/tokio_util/time/delay_queue/struct.DelayQueue.html#method.poll_expired
  • Loading branch information
kkharji authored Jul 15, 2022
1 parent 4c8ba41 commit 0e10228
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 34 deletions.
8 changes: 4 additions & 4 deletions example-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ anyhow = "1.0"
clap = { version = "3.0.0-rc.9", features = ["derive"] }
log = "0.4"
futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
rand = "0.8"
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" }
tracing-opentelemetry = "0.15"
tracing-subscriber = "0.2"
tracing-opentelemetry = "0.17"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}

[lib]
name = "service"
Expand Down
2 changes: 1 addition & 1 deletion tarpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" }
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.6.9", features = ["time"] }
tokio-util = { version = "0.7.3", features = ["time"] }
tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = [
"attributes",
Expand Down
5 changes: 3 additions & 2 deletions tarpc/examples/custom_transport.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use tarpc::context::Context;
use tarpc::serde_transport as transport;
use tarpc::server::{BaseChannel, Channel};
use tarpc::{context::Context, tokio_serde::formats::Bincode};
use tarpc::tokio_serde::formats::Bincode;
use tarpc::tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio::net::{UnixListener, UnixStream};
use tokio_util::codec::length_delimited::LengthDelimitedCodec;

#[tarpc::service]
pub trait PingService {
Expand Down
2 changes: 1 addition & 1 deletion tarpc/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ use tarpc::{
client, context,
serde_transport::tcp,
server::{self, Channel},
tokio_serde::formats::Json,
};
use tokio::net::ToSocketAddrs;
use tokio_serde::formats::Json;
use tracing::info;
use tracing_subscriber::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion tarpc/examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use futures::{future, prelude::*};
use tarpc::{
client, context,
server::{incoming::Incoming, BaseChannel},
tokio_serde::formats::Json,
};
use tokio_serde::formats::Json;
use tracing_subscriber::prelude::*;

pub mod add {
Expand Down
6 changes: 1 addition & 5 deletions tarpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,7 @@ where
// Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
// because there can temporarily be zero in-flight rquests. Therefore, there is no need to
// track the status like is done with pending and cancelled requests.
if let Poll::Ready(Some(_)) = self
.in_flight_requests()
.poll_expired(cx)
.map_err(ChannelError::Timer)?
{
if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx) {
// Expired requests are considered complete; there is no compelling reason to send a
// cancellation message to the server, since it will have already exhausted its
// allotted processing time.
Expand Down
11 changes: 4 additions & 7 deletions tarpc/src/client/in_flight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,9 @@ impl<Resp> InFlightRequests<Resp> {

/// Yields a request that has expired, completing it with a TimedOut error.
/// The caller should send cancellation messages for any yielded request ID.
pub fn poll_expired(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
self.deadlines.poll_expired(cx).map_ok(|expired| {
let request_id = expired.into_inner();
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
self.deadlines.poll_expired(cx).map(|expired| {
let request_id = expired?.into_inner();
if let Some(request_data) = self.request_data.remove(&request_id) {
let _entered = request_data.span.enter();
tracing::error!("DeadlineExceeded");
Expand All @@ -131,7 +128,7 @@ impl<Resp> InFlightRequests<Resp> {
.response_completion
.send(Err(DeadlineExceededError));
}
request_id
Some(request_id)
})
}
}
2 changes: 1 addition & 1 deletion tarpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
pub use serde;

#[cfg(feature = "serde-transport")]
pub use tokio_serde;
pub use {tokio_serde, tokio_util};

#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
Expand Down
6 changes: 1 addition & 5 deletions tarpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,7 @@ where
Poll::Pending | Poll::Ready(None) => Closed,
};

let expiration_status = match self
.in_flight_requests_mut()
.poll_expired(cx)
.map_err(ChannelError::Timer)?
{
let expiration_status = match self.in_flight_requests_mut().poll_expired(cx) {
// No need to send a response, since the client wouldn't be waiting for one
// anymore.
Poll::Ready(Some(_)) => Ready,
Expand Down
12 changes: 5 additions & 7 deletions tarpc/src/server/in_flight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ impl InFlightRequests {
}

/// Yields a request that has expired, aborting any ongoing processing of that request.
pub fn poll_expired(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
pub fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<u64>> {
if self.deadlines.is_empty() {
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
// This is a workaround for DelayQueue not always treating this case correctly.
return Poll::Ready(None);
}
self.deadlines.poll_expired(cx).map_ok(|expired| {
self.deadlines.poll_expired(cx).map(|expired| {
let expired = expired?;
if let Some(RequestData {
abort_handle, span, ..
}) = self.request_data.remove(expired.get_ref())
Expand All @@ -113,7 +111,7 @@ impl InFlightRequests {
abort_handle.abort();
tracing::error!("DeadlineExceeded");
}
expired.into_inner()
Some(expired.into_inner())
})
}
}
Expand Down Expand Up @@ -161,7 +159,7 @@ mod tests {

assert_matches!(
in_flight_requests.poll_expired(&mut noop_context()),
Poll::Ready(Some(Ok(_)))
Poll::Ready(Some(_))
);
assert_matches!(
abortable_future.poll_unpin(&mut noop_context()),
Expand Down

0 comments on commit 0e10228

Please sign in to comment.