Skip to content

Commit

Permalink
Allow multiple scripthashes' subscription in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Oct 30, 2021
1 parent 44bc27d commit fabc634
Showing 1 changed file with 79 additions and 11 deletions.
90 changes: 79 additions & 11 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,39 @@ impl Rpc {
client: &mut Client,
(scripthash,): &(ScriptHash,),
) -> Result<Value> {
let result = match client.scripthashes.entry(*scripthash) {
Entry::Occupied(e) => e.get().statushash(),
Entry::Vacant(e) => e.insert(self.new_status(*scripthash)?).statushash(),
};
Ok(json!(result))
self.scripthashes_subscribe(client, &[*scripthash])
.next()
.unwrap()
}

fn scripthashes_subscribe<'a>(
&self,
client: &'a mut Client,
scripthashes: &'a [ScriptHash],
) -> impl Iterator<Item = Result<Value>> + 'a {
let new_scripthashes: Vec<ScriptHash> = scripthashes
.iter()
.copied()
.filter(|scripthash| !client.scripthashes.contains_key(scripthash))
.collect();

let mut results: HashMap<ScriptHash, Result<ScriptHashStatus>> = new_scripthashes
.into_par_iter()
.map(|scripthash| (scripthash, self.new_status(scripthash)))
.collect();

scripthashes.iter().map(move |scripthash| {
let statushash = match client.scripthashes.entry(*scripthash) {
Entry::Occupied(e) => e.get().statushash(),
Entry::Vacant(e) => {
let status = results
.remove(scripthash)
.expect("missing scripthash status")?; // return an error for failed subscriptions
e.insert(status).statushash()
}
};
Ok(json!(statushash))
})
}

fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
Expand Down Expand Up @@ -412,16 +440,56 @@ impl Rpc {
}

fn handle_calls(&self, client: &mut Client, calls: Result<Calls, Value>) -> Value {
let calls: Calls = match calls {
Ok(calls) => calls,
Err(response) => return response, // JSON parsing failed - the response does not contain request id
};

match calls {
Ok(Calls::Batch(batch)) => json!(batch
.into_iter()
.map(|call| self.single_call(client, call))
.collect::<Vec<_>>()),
Ok(Calls::Single(call)) => json!(self.single_call(client, call)),
Err(response) => response, // JSON parsing may fail - the response does not contain request id
Calls::Batch(batch) => {
if let Some(result) = self.try_multi_call(client, &batch) {
return json!(result);
}
json!(batch
.into_iter()
.map(|result| self.single_call(client, result))
.collect::<Vec<Value>>())
}
Calls::Single(result) => self.single_call(client, result),
}
}

fn try_multi_call(
&self,
client: &mut Client,
calls: &[Result<Call, Value>],
) -> Option<Vec<Value>> {
// exit if any call failed to parse
let valid_calls = calls
.iter()
.map(|result| result.as_ref().ok())
.collect::<Option<Vec<&Call>>>()?;

// only "blockchain.scripthashes.subscribe" are supported
let scripthashes: Vec<ScriptHash> = valid_calls
.iter()
.map(|call| match &call.params {
Params::ScriptHashSubscribe((scripthash,)) => Some(*scripthash),
_ => None, // exit if any of the calls is not supported
})
.collect::<Option<Vec<ScriptHash>>>()?;

Some(
self.rpc_duration
.observe_duration("blockchain.scripthash.subscribe:multi", || {
self.scripthashes_subscribe(client, &scripthashes)
.zip(valid_calls)
.map(|(result, call)| call.response(result))
.collect::<Vec<Value>>()
}),
)
}

fn single_call(&self, client: &mut Client, call: Result<Call, Value>) -> Value {
let call = match call {
Ok(call) => call,
Expand Down

0 comments on commit fabc634

Please sign in to comment.