Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test jsonrpsee v0.20 #691

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 14 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ url = { version = "2.0.0", optional = true }

# websocket dependent features
futures = { version = "0.3", optional = true }
jsonrpsee = { version = "0.16", optional = true, features = ["async-client", "client-ws-transport", "jsonrpsee-types"] }
jsonrpsee = { version = "0.20", optional = true, features = ["async-client", "client-ws-transport-native-tls", "jsonrpsee-types"] }
tokio = { version = "1.34", optional = true }
tungstenite = { version = "0.21", optional = true, features = ["native-tls"] }
ws = { version = "0.9.2", optional = true, features = ["ssl"] }

Expand Down Expand Up @@ -80,7 +81,7 @@ sync-api = ["ac-compose-macros/sync-api", "maybe-async/is_sync"]
# Use the `jsonrpsee` crate for websocket communication. Does provide sync and async support but needs a tokio runtime.
# Provides convenience functions such as subscription callbacks.
# Most examples use the `jsonrpsee` feature and can be used for reference.
jsonrpsee-client = ["std", "jsonrpsee", "futures"]
jsonrpsee-client = ["std", "jsonrpsee", "futures", "tokio"]

# Use the `tungstenite` crate for websocket communication. No async support but has some reconnection capabilities.
# See the example `transfer_with_tungstenite_client` on how to use it.
Expand Down
40 changes: 34 additions & 6 deletions src/rpc/jsonrpsee_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
use crate::rpc::{Error, Request, Result, RpcParams, Subscribe};
use futures::executor::block_on;
use jsonrpsee::{
client_transport::ws::{Uri, WsTransportClientBuilder},
client_transport::ws::{Url, WsTransportClientBuilder},
core::{
client::{Client, ClientBuilder, ClientT, SubscriptionClientT},
traits::ToRpcParams,
},
};
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use serde_json::{value::RawValue, Value};
use std::sync::Arc;
use tokio::runtime::Handle;

pub use subscription::SubscriptionWrapper;

Expand All @@ -43,13 +44,13 @@ impl JsonrpseeClient {
}

pub async fn async_new(url: &str) -> Result<Self> {
let uri: Uri = url.parse().map_err(|e| Error::Client(Box::new(e)))?;
let uri: Url = url.parse().map_err(|e| Error::Client(Box::new(e)))?;
let (tx, rx) = WsTransportClientBuilder::default()
.build(uri)
.await
.map_err(|e| Error::Client(Box::new(e)))?;
let client = ClientBuilder::default()
.max_notifs_per_subscription(4096)
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(tx, rx);
Ok(Self { inner: Arc::new(client) })
}
Expand All @@ -64,11 +65,38 @@ impl Request for JsonrpseeClient {
.map_err(|e| Error::Client(Box::new(e)))
}
}

#[maybe_async::sync_impl]
impl Request for JsonrpseeClient {
fn request<R: DeserializeOwned>(&self, method: &str, params: RpcParams) -> Result<R> {
block_on(self.inner.request(method, RpcParamsWrapper(params)))
.map_err(|e| Error::Client(Box::new(e)))
let handle = match Handle::try_current() {
Ok(handle) => handle,
Err(_) => {
// We are not inside a tokio runtime, so lets start one.
let rt =
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
rt.handle().clone()
},
};

let client = self.inner.clone();
let method_string = method.to_string();

// The inner jsonrpsee client must not deserialize to the `R` value, because the return value must
// implement `Send`. But we do not want to enforce the `R` value to implement this solely because we need
// to de-async something. Therefore, the deserialization must happen outside of the newly spawned thread.
// We need to spawn a new thread because tokio does not allow the blocking of the main thread:
// ERROR: Cannot block the current thread from within a runtime.
// This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks.
let answer: Value = std::thread::spawn(move || {
handle.block_on(client.request(&method_string, RpcParamsWrapper(params)))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't use block_on on tokio multi-thread it will panic.

The best would would be to change this to:

#[maybe_async::sync_impl]
impl Request for JsonrpseeClient {
	fn request<serde_json::Value>(&self, method: &str, params: RpcParams) -> Result<R> {
                let (tx, rx) = tokio::sync::oneshot();
 
                tokio::spawn(async move {
                    let res = self.inner.request(method, RpcParamsWrapper(params))).await.map_err(|e| Error::Client(Box::new(e)));
                    tx.send(res);
                });

                let rx = rx.blocking_recv().unwrap();
		let deserialized_value: R = serde_json::from_value(rx)?;
		Ok(deserialized_value)
	}
}

Otherwise, you need to create a single threaded tokio-runtime https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.new_current_thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with the solution you're providing is, that it enforces the creation of a tokio task / thread outside of the request function, therefore breaking our current sync-api. With the current examples, it runs into the following runtime error at the blocking_recv() part:

ERROR: Cannot block the current thread from within a runtime.
This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks.

I'm not entirely sure what we should do here - either provide an all-inclusive function with a handle try_current matching to whatever the user does outside, or enforce either tokio or non-tokio threading.

In the long run, probably the best thing would be to either remove sync-api altogether on our side or introduce a sync-api for jsonrspee without tokio enforcement.

Copy link

@niklasad1 niklasad1 Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see sorry for giving a bad suggestion. That was news to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bad suggestion at all - it really helped me along the way. Thank you very much!

})
.join()
.unwrap()
.map_err(|e| Error::Client(Box::new(e)))?;

let deserialized_value: R = serde_json::from_value(answer)?;
Ok(deserialized_value)
}
}

Expand Down