From fabc6347629ebe712894c5a9544e1b866c1489ae Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 29 Oct 2021 11:33:34 +0300 Subject: [PATCH] Allow multiple scripthashes' subscription in parallel --- src/electrum.rs | 90 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 79 insertions(+), 11 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index b26c46de4..c9ffa00f7 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -297,11 +297,39 @@ impl Rpc { client: &mut Client, (scripthash,): &(ScriptHash,), ) -> Result { - let result = match client.scripthashes.entry(*scripthash) { - Entry::Occupied(e) => e.get().statushash(), - Entry::Vacant(e) => e.insert(self.new_status(*scripthash)?).statushash(), - }; - Ok(json!(result)) + self.scripthashes_subscribe(client, &[*scripthash]) + .next() + .unwrap() + } + + fn scripthashes_subscribe<'a>( + &self, + client: &'a mut Client, + scripthashes: &'a [ScriptHash], + ) -> impl Iterator> + 'a { + let new_scripthashes: Vec = scripthashes + .iter() + .copied() + .filter(|scripthash| !client.scripthashes.contains_key(scripthash)) + .collect(); + + let mut results: HashMap> = new_scripthashes + .into_par_iter() + .map(|scripthash| (scripthash, self.new_status(scripthash))) + .collect(); + + scripthashes.iter().map(move |scripthash| { + let statushash = match client.scripthashes.entry(*scripthash) { + Entry::Occupied(e) => e.get().statushash(), + Entry::Vacant(e) => { + let status = results + .remove(scripthash) + .expect("missing scripthash status")?; // return an error for failed subscriptions + e.insert(status).statushash() + } + }; + Ok(json!(statushash)) + }) } fn new_status(&self, scripthash: ScriptHash) -> Result { @@ -412,16 +440,56 @@ impl Rpc { } fn handle_calls(&self, client: &mut Client, calls: Result) -> Value { + let calls: Calls = match calls { + Ok(calls) => calls, + Err(response) => return response, // JSON parsing failed - the response does not contain request id + }; + match calls { - Ok(Calls::Batch(batch)) => json!(batch - .into_iter() - .map(|call| self.single_call(client, call)) - .collect::>()), - Ok(Calls::Single(call)) => json!(self.single_call(client, call)), - Err(response) => response, // JSON parsing may fail - the response does not contain request id + Calls::Batch(batch) => { + if let Some(result) = self.try_multi_call(client, &batch) { + return json!(result); + } + json!(batch + .into_iter() + .map(|result| self.single_call(client, result)) + .collect::>()) + } + Calls::Single(result) => self.single_call(client, result), } } + fn try_multi_call( + &self, + client: &mut Client, + calls: &[Result], + ) -> Option> { + // exit if any call failed to parse + let valid_calls = calls + .iter() + .map(|result| result.as_ref().ok()) + .collect::>>()?; + + // only "blockchain.scripthashes.subscribe" are supported + let scripthashes: Vec = valid_calls + .iter() + .map(|call| match &call.params { + Params::ScriptHashSubscribe((scripthash,)) => Some(*scripthash), + _ => None, // exit if any of the calls is not supported + }) + .collect::>>()?; + + Some( + self.rpc_duration + .observe_duration("blockchain.scripthash.subscribe:multi", || { + self.scripthashes_subscribe(client, &scripthashes) + .zip(valid_calls) + .map(|(result, call)| call.response(result)) + .collect::>() + }), + ) + } + fn single_call(&self, client: &mut Client, call: Result) -> Value { let call = match call { Ok(call) => call,