diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 03aab46fe..c974d8a86 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -153,6 +153,9 @@ jobs: pallet_balances_tests, pallet_transaction_payment_tests, state_tests, + tungstenite_client_test, + ws_client_test, + state_tests, runtime_update_sync, runtime_update_async, ] diff --git a/examples/examples/transfer_with_tungstenite_client.rs b/examples/examples/transfer_with_tungstenite_client.rs index e60695190..2fefa986c 100755 --- a/examples/examples/transfer_with_tungstenite_client.rs +++ b/examples/examples/transfer_with_tungstenite_client.rs @@ -55,8 +55,22 @@ fn main() { let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; println!("[+] Bob's Free Balance is {}\n", bob_balance); - // Generate extrinsic. - let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), 1000000000000); + // We first generate an extrinsic that will fail to be executed due to missing funds. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + println!( + "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", + alice.public(), + bob + ); + println!("[+] Composed extrinsic: {:?}\n", xt); + + // Send and watch extrinsic until it fails onchain. + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + println!("[+] Extrinsic did not get included due to: {:?}\n", result); + + // This time, we generate an extrinsic that will succeed. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); println!( "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", alice.public(), @@ -70,7 +84,7 @@ fn main() { .unwrap() .block_hash .unwrap(); - println!("[+] Extrinsic got included. Hash: {:?}\n", block_hash); + println!("[+] Extrinsic got included. Block Hash: {:?}\n", block_hash); // Verify that Bob's free Balance increased. let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; diff --git a/examples/examples/transfer_with_ws_client.rs b/examples/examples/transfer_with_ws_client.rs index 14581bd07..2adb39bbf 100755 --- a/examples/examples/transfer_with_ws_client.rs +++ b/examples/examples/transfer_with_ws_client.rs @@ -54,8 +54,22 @@ fn main() { let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; println!("[+] Bob's Free Balance is {}\n", bob_balance); - // Generate extrinsic. - let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), 1000000000000); + // We first generate an extrinsic that will fail to be executed due to missing funds. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + println!( + "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", + alice.public(), + bob + ); + println!("[+] Composed extrinsic: {:?}\n", xt); + + // Send and watch extrinsic until it fails onchain. + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + println!("[+] Extrinsic did not get included due to: {:?}\n", result); + + // This time, we generate an extrinsic that will succeed. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); println!( "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", alice.public(), @@ -69,7 +83,7 @@ fn main() { .unwrap() .block_hash .unwrap(); - println!("[+] Extrinsic got included. Hash: {:?}\n", block_hash); + println!("[+] Extrinsic got included. Block Hash: {:?}\n", block_hash); // Verify that Bob's free Balance increased. let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; diff --git a/src/rpc/error.rs b/src/rpc/error.rs index b459f23d8..8f7e635f5 100644 --- a/src/rpc/error.rs +++ b/src/rpc/error.rs @@ -22,6 +22,7 @@ pub type Result = core::result::Result; #[derive(Debug)] pub enum Error { SerdeJson(serde_json::error::Error), + ExtrinsicFailed(String), MpscSend(String), InvalidUrl(String), RecvError(String), diff --git a/src/rpc/helpers.rs b/src/rpc/helpers.rs new file mode 100644 index 000000000..5dcd5524a --- /dev/null +++ b/src/rpc/helpers.rs @@ -0,0 +1,148 @@ +/* + Copyright 2019 Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +use alloc::{ + format, + string::{String, ToString}, +}; +use serde_json::Value; + +pub fn read_subscription_id(value: &Value) -> Option { + value["result"].as_str().map(|str| str.to_string()) +} + +pub fn read_error_message(value: &Value, msg: &str) -> String { + match value["error"].as_str() { + Some(error_message) => error_message.to_string(), + None => format!("Unexpected Response: {}", msg), + } +} + +pub fn subscription_id_matches(value: &Value, subscription_id: &str) -> bool { + match value["params"]["subscription"].as_str() { + Some(retrieved_subscription_id) => subscription_id == retrieved_subscription_id, + None => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn read_valid_subscription_response() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "result": subcription_id, + "id": 43, + "and_so_on": "test", + }); + + let maybe_subcription_id = read_subscription_id(&value); + assert_eq!(maybe_subcription_id, Some(subcription_id.to_string())); + } + + #[test] + fn read_invalid_subscription_response() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "error": subcription_id, + "id": 43, + "and_so_on": "test", + }); + + let maybe_subcription_id = read_subscription_id(&value); + assert!(maybe_subcription_id.is_none()); + } + + #[test] + fn read_error_message_returns_error_if_available() { + let error_message = "some_error_message"; + let value = json!({ + "error": error_message, + "id": 43, + "and_so_on": "test", + }); + + let msg = serde_json::to_string(&value).unwrap(); + + let message = read_error_message(&value, &msg); + assert!(message.contains(error_message)); + assert!(message.contains("error")); + } + + #[test] + fn read_error_message_returns_full_msg_if_error_is_not_available() { + let error_message = "some_error_message"; + let value = json!({ + "result": error_message, + "id": 43, + "and_so_on": "test", + }); + + let msg = serde_json::to_string(&value).unwrap(); + + let message = read_error_message(&value, &msg); + assert!(message.contains(&msg)); + } + + #[test] + fn subscription_id_matches_returns_true_for_equal_id() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "subscription": subcription_id, + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(subscription_id_matches(&value, subcription_id)); + } + + #[test] + fn subscription_id_matches_returns_false_for_not_equal_id() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "subscription": "something else", + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(!subscription_id_matches(&value, subcription_id)); + } + + #[test] + fn subscription_id_matches_returns_false_for_missing_subscription() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "result": subcription_id, + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(!subscription_id_matches(&value, subcription_id)); + } +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 73d07801f..ff79acdea 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -35,6 +35,8 @@ pub use jsonrpsee_client::JsonrpseeClient; pub mod jsonrpsee_client; pub mod error; +#[cfg(any(feature = "ws-client", feature = "tungstenite-client"))] +mod helpers; pub use error::{Error, Result}; diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 983d31743..906f11ec5 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -15,7 +15,7 @@ */ use crate::rpc::{ - to_json_req, tungstenite_client::subscription::TungsteniteSubscriptionWrapper, + helpers, to_json_req, tungstenite_client::subscription::TungsteniteSubscriptionWrapper, Error as RpcClientError, Request, Result, Subscribe, }; use ac_primitives::RpcParams; @@ -134,9 +134,22 @@ fn subscribe_to_server( // Subscribe to server socket.send(Message::Text(json_req))?; + // Read the first message response - must be the subscription id. + let msg = read_until_text_message(&mut socket)?; + let value: Value = serde_json::from_str(&msg)?; + + let subcription_id = match helpers::read_subscription_id(&value) { + Some(id) => id, + None => { + let message = helpers::read_error_message(&value, &msg); + result_in.send(message)?; + return Ok(()) + }, + }; + loop { let msg = read_until_text_message(&mut socket)?; - send_message_to_client(result_in.clone(), msg.as_str())?; + send_message_to_client(result_in.clone(), &msg, &subcription_id)?; } } @@ -147,19 +160,18 @@ pub fn do_reconnect(error: &RpcClientError) -> bool { ) } -fn send_message_to_client(result_in: ThreadOut, message: &str) -> Result<()> { - debug!("got on_subscription_msg {}", message); +fn send_message_to_client( + result_in: ThreadOut, + message: &str, + subscription_id: &str, +) -> Result<()> { + info!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; - match value["id"].as_str() { - Some(_idstr) => { - warn!("Expected subscription, but received an id response instead: {:?}", value); - }, - None => { - let message = serde_json::to_string(&value["params"]["result"])?; - result_in.send(message)?; - }, - }; + if helpers::subscription_id_matches(&value, subscription_id) { + result_in.send(serde_json::to_string(&value["params"]["result"])?)?; + } + Ok(()) } diff --git a/src/rpc/tungstenite_client/subscription.rs b/src/rpc/tungstenite_client/subscription.rs index f06abdd42..75b84e265 100644 --- a/src/rpc/tungstenite_client/subscription.rs +++ b/src/rpc/tungstenite_client/subscription.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::{HandleSubscription, Result}; +use crate::rpc::{Error, HandleSubscription, Result}; use core::marker::PhantomData; use serde::de::DeserializeOwned; use std::sync::mpsc::Receiver; @@ -42,7 +42,7 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_e) => return None, }; - Some(serde_json::from_str(¬ification).map_err(|e| e.into())) + Some(serde_json::from_str(¬ification).map_err(|_| Error::ExtrinsicFailed(notification))) } async fn unsubscribe(self) -> Result<()> { diff --git a/src/rpc/ws_client/client.rs b/src/rpc/ws_client/client.rs index d5ba7ac03..bd7b53bcf 100644 --- a/src/rpc/ws_client/client.rs +++ b/src/rpc/ws_client/client.rs @@ -50,7 +50,7 @@ impl WsRpcClient { impl Request for WsRpcClient { async fn request(&self, method: &str, params: RpcParams) -> Result { let json_req = to_json_req(method, params)?; - let response = self.direct_rpc_request(json_req, RequestHandler::default())??; + let response = self.direct_rpc_request(json_req, RequestHandler)??; let deserialized_value: R = serde_json::from_str(&response)?; Ok(deserialized_value) } diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 2ea1eac7a..714658043 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::Error as RpcClientError; +use crate::rpc::{helpers, Error as RpcClientError}; use log::*; use std::{fmt::Debug, sync::mpsc::Sender as ThreadOut}; use ws::{CloseCode, Handler, Handshake, Message, Result as WsResult, Sender}; @@ -33,7 +33,7 @@ pub(crate) trait HandleMessage { type ThreadMessage; type Context; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()>; + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()>; } // Clippy says request is never used, even though it is.. @@ -83,7 +83,7 @@ impl HandleMessage for RequestHandler { type ThreadMessage = RpcMessage; type Context = MessageContext; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()> { + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; let msg = &context.msg; @@ -101,34 +101,59 @@ impl HandleMessage for RequestHandler { } #[derive(Default, Debug, PartialEq, Eq, Clone)] -pub(crate) struct SubscriptionHandler {} +pub(crate) struct SubscriptionHandler { + subscription_id: Option, +} impl HandleMessage for SubscriptionHandler { type ThreadMessage = String; type Context = MessageContext; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()> { + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; - let msg = &context.msg; + let msg = &context.msg.as_text()?; info!("got on_subscription_msg {}", msg); - let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; + let value: serde_json::Value = serde_json::from_str(msg).map_err(Box::new)?; - match value["id"].as_str() { - Some(_idstr) => { - warn!("Expected subscription, but received an id response instead: {:?}", value); - }, + let send_result = match self.subscription_id.as_ref() { + Some(id) => handle_subscription_message(result, &value, id), None => { - let answer = serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?; - - if let Err(e) = result.send(answer) { - // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); - out.close(CloseCode::Normal)?; + self.subscription_id = helpers::read_subscription_id(&value); + if self.subscription_id.is_none() { + send_error_response(result, &value, msg) + } else { + Ok(()) } }, }; + + if let Err(e) = send_result { + // This may happen if the receiver has unsubscribed. + trace!("SendError: {:?}. will close ws", e); + out.close(CloseCode::Normal)?; + }; Ok(()) } } + +fn handle_subscription_message( + result: &ThreadOut, + value: &serde_json::Value, + subscription_id: &str, +) -> Result<(), RpcClientError> { + if helpers::subscription_id_matches(value, subscription_id) { + result.send(serde_json::to_string(&value["params"]["result"])?)?; + } + Ok(()) +} + +fn send_error_response( + result: &ThreadOut, + value: &serde_json::Value, + msg: &str, +) -> Result<(), RpcClientError> { + result.send(helpers::read_error_message(value, msg))?; + Ok(()) +} diff --git a/src/rpc/ws_client/subscription.rs b/src/rpc/ws_client/subscription.rs index 0358e7ad7..0f86d9808 100644 --- a/src/rpc/ws_client/subscription.rs +++ b/src/rpc/ws_client/subscription.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::{HandleSubscription, Result}; +use crate::rpc::{Error, HandleSubscription, Result}; use core::marker::PhantomData; use serde::de::DeserializeOwned; use std::sync::mpsc::Receiver; @@ -44,7 +44,7 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_) => return None, }; - Some(serde_json::from_str(¬ification).map_err(|e| e.into())) + Some(serde_json::from_str(¬ification).map_err(|_| Error::ExtrinsicFailed(notification))) } async fn unsubscribe(self) -> Result<()> { diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs new file mode 100755 index 000000000..8d4694ee6 --- /dev/null +++ b/testing/examples/tungstenite_client_test.rs @@ -0,0 +1,62 @@ +/* + Copyright 2019 Supercomputing Systems AG + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use sp_core::{ + crypto::{Pair, Ss58Codec}, + sr25519, +}; +use sp_runtime::MultiAddress; +use substrate_api_client::{ + ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, + extrinsic::BalancesExtrinsics, + rpc::TungsteniteRpcClient, + Api, GetAccountInformation, SubmitAndWatch, XtStatus, +}; + +fn main() { + // Setup + let alice: sr25519::Pair = Pair::from_string( + "0xe5be9a5092b81bca64be81d212e7f2f9eba183bb7a90954f7b76361f6edb5c0a", + None, + ) + .unwrap(); + let client = TungsteniteRpcClient::with_default_url(100); + let mut api = Api::::new(client).unwrap(); + api.set_signer(ExtrinsicSigner::::new(alice.clone())); + + let bob = sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty") + .unwrap(); + let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; + + // Check for failed extrinsic failed onchain + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); + assert!(format!("{:?}", result).contains("FundsUnavailable")); + + // Check directly failed extrinsic (before actually submitted to a block) + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + assert!(format!("{:?}", result).contains("ExtrinsicFailed")); + + // Check for successful extrinisc + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); + let _block_hash = api + .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) + .unwrap() + .block_hash + .unwrap(); + let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; + assert!(bob_new_balance > bob_balance); +} diff --git a/testing/examples/ws_client_test.rs b/testing/examples/ws_client_test.rs new file mode 100755 index 000000000..10dbd0866 --- /dev/null +++ b/testing/examples/ws_client_test.rs @@ -0,0 +1,62 @@ +/* + Copyright 2019 Supercomputing Systems AG + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use sp_core::{ + crypto::{Pair, Ss58Codec}, + sr25519, +}; +use sp_runtime::MultiAddress; +use substrate_api_client::{ + ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, + extrinsic::BalancesExtrinsics, + rpc::WsRpcClient, + Api, GetAccountInformation, SubmitAndWatch, XtStatus, +}; + +fn main() { + // Setup + let alice: sr25519::Pair = Pair::from_string( + "0xe5be9a5092b81bca64be81d212e7f2f9eba183bb7a90954f7b76361f6edb5c0a", + None, + ) + .unwrap(); + let client = WsRpcClient::with_default_url(); + let mut api = Api::::new(client).unwrap(); + api.set_signer(ExtrinsicSigner::::new(alice.clone())); + + let bob = sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty") + .unwrap(); + let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; + + // Check for failed extrinsic failed onchain + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); + assert!(format!("{:?}", result).contains("FundsUnavailable")); + + // Check directly failed extrinsic (before actually submitted to a block) + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + assert!(format!("{:?}", result).contains("ExtrinsicFailed")); + + // Check for successful extrinisc + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); + let _block_hash = api + .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) + .unwrap() + .block_hash + .unwrap(); + let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; + assert!(bob_new_balance > bob_balance); +}