From 70025e419b9d44fb6093bc4aa4bb406b60c806c7 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 13 Aug 2021 20:34:40 +0300 Subject: [PATCH] Add blockchain.outpoint.subscribe RPC --- src/electrum.rs | 55 +++++++++++++++++-- src/status.rs | 141 +++++++++++++++++++++++++++++++++++++++++++++++- src/tracker.rs | 10 +++- 3 files changed, 200 insertions(+), 6 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index c9ffa00f7..c29f4985b 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result}; use bitcoin::{ consensus::{deserialize, serialize}, hashes::hex::{FromHex, ToHex}, - BlockHash, Txid, + BlockHash, OutPoint, Txid, }; use crossbeam_channel::Receiver; use rayon::prelude::*; @@ -19,7 +19,7 @@ use crate::{ merkle::Proof, metrics::{self, Histogram, Metrics}, signals::Signal, - status::ScriptHashStatus, + status::{OutPointStatus, ScriptHashStatus}, tracker::Tracker, types::ScriptHash, }; @@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol) pub struct Client { tip: Option, scripthashes: HashMap, + outpoints: HashMap, } #[derive(Deserialize)] @@ -176,7 +177,25 @@ impl Rpc { } }) .collect::>>() - .context("failed to update status")?; + .context("failed to update scripthash status")?; + + notifications.extend( + client + .outpoints + .par_iter_mut() + .filter_map(|(outpoint, status)| -> Option> { + match self.tracker.update_outpoint_status(status, &self.daemon) { + Ok(true) => Some(Ok(notification( + "blockchain.outpoint.subscribe", + &[json!([outpoint.txid, outpoint.vout]), json!(status)], + ))), + Ok(false) => None, // outpoint status is the same + Err(e) => Some(Err(e)), + } + }) + .collect::>>() + .context("failed to update scripthash status")?, + ); if let Some(old_tip) = client.tip { let new_tip = self.tracker.chain().tip(); @@ -332,6 +351,28 @@ impl Rpc { }) } + fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result { + let outpoint = OutPoint::new(txid, vout); + Ok(match client.outpoints.entry(outpoint) { + Entry::Occupied(e) => json!(e.get()), + Entry::Vacant(e) => { + let outpoint = OutPoint::new(txid, vout); + let mut status = OutPointStatus::new(outpoint); + self.tracker + .update_outpoint_status(&mut status, &self.daemon)?; + json!(e.insert(status)) + } + }) + } + + fn outpoint_unsubscribe( + &self, + client: &mut Client, + (txid, vout): (Txid, u32), + ) -> Result { + Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout)))) + } + fn new_status(&self, scripthash: ScriptHash) -> Result { let mut status = ScriptHashStatus::new(scripthash); self.tracker @@ -505,6 +546,8 @@ impl Rpc { Params::Features => self.features(), Params::HeadersSubscribe => self.headers_subscribe(client), Params::MempoolFeeHistogram => self.get_fee_histogram(), + Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args), + Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args), Params::PeersSubscribe => Ok(json!([])), Params::Ping => Ok(Value::Null), Params::RelayFee => self.relayfee(), @@ -527,12 +570,13 @@ enum Params { Banner, BlockHeader((usize,)), BlockHeaders((usize, usize)), - TransactionBroadcast((String,)), Donation, EstimateFee((u16,)), Features, HeadersSubscribe, MempoolFeeHistogram, + OutPointSubscribe((Txid, u32)), // TODO: support spk_hint + OutPointUnsubscribe((Txid, u32)), PeersSubscribe, Ping, RelayFee, @@ -540,6 +584,7 @@ enum Params { ScriptHashGetHistory((ScriptHash,)), ScriptHashListUnspent((ScriptHash,)), ScriptHashSubscribe((ScriptHash,)), + TransactionBroadcast((String,)), TransactionGet(TxGetArgs), TransactionGetMerkle((Txid, usize)), Version((String, Version)), @@ -552,6 +597,8 @@ impl Params { "blockchain.block.headers" => Params::BlockHeaders(convert(params)?), "blockchain.estimatefee" => Params::EstimateFee(convert(params)?), "blockchain.headers.subscribe" => Params::HeadersSubscribe, + "blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?), + "blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?), "blockchain.relayfee" => Params::RelayFee, "blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?), "blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?), diff --git a/src/status.rs b/src/status.rs index e1e30a5cc..70b32a643 100644 --- a/src/status.rs +++ b/src/status.rs @@ -4,7 +4,7 @@ use bitcoin::{ Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid, }; use rayon::prelude::*; -use serde::ser::{Serialize, Serializer}; +use serde::ser::{Serialize, SerializeMap, Serializer}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; @@ -49,12 +49,26 @@ impl TxEntry { // Confirmation height of a transaction or its mempool state: // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool +#[derive(Copy, Clone, Eq, PartialEq)] enum Height { Confirmed { height: usize }, Unconfirmed { has_unconfirmed_inputs: bool }, } impl Height { + fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self { + let height = chain + .get_block_height(&blockhash) + .expect("missing block in chain"); + Self::Confirmed { height } + } + + fn unconfirmed(e: &crate::mempool::Entry) -> Self { + Self::Unconfirmed { + has_unconfirmed_inputs: e.has_unconfirmed_inputs, + } + } + fn as_i64(&self) -> i64 { match self { Self::Confirmed { height } => i64::try_from(*height).unwrap(), @@ -511,6 +525,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option { Some(StatusHash::from_engine(engine)) } +pub(crate) struct OutPointStatus { + outpoint: OutPoint, + funding: Option, + spending: Option<(Txid, Height)>, + tip: BlockHash, +} + +impl Serialize for OutPointStatus { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(None)?; + if let Some(funding) = &self.funding { + map.serialize_entry("height", &funding)?; + } + if let Some((txid, height)) = &self.spending { + map.serialize_entry("spender_txhash", &txid)?; + map.serialize_entry("spender_height", &height)?; + } + map.end() + } +} + +impl OutPointStatus { + pub(crate) fn new(outpoint: OutPoint) -> Self { + Self { + outpoint, + funding: None, + spending: None, + tip: BlockHash::default(), + } + } + + pub(crate) fn sync( + &mut self, + index: &Index, + mempool: &Mempool, + daemon: &Daemon, + ) -> Result { + let funding = self.sync_funding(index, daemon, mempool)?; + let spending = self.sync_spending(index, daemon, mempool)?; + let same_status = (self.funding == funding) && (self.spending == spending); + self.funding = funding; + self.spending = spending; + self.tip = index.chain().tip(); + Ok(!same_status) + } + + /// Return true iff current tip became unconfirmed + fn is_reorg(&self, chain: &Chain) -> bool { + chain.get_block_height(&self.tip).is_none() + } + + fn sync_funding( + &self, + index: &Index, + daemon: &Daemon, + mempool: &Mempool, + ) -> Result> { + let chain = index.chain(); + if !self.is_reorg(chain) { + if let Some(Height::Confirmed { .. }) = &self.funding { + return Ok(self.funding); + } + } + let mut confirmed = None; + daemon.for_blocks( + index.filter_by_txid(self.outpoint.txid), + |blockhash, block| { + if confirmed.is_none() { + for tx in block.txdata { + let txid = tx.txid(); + let output_len = u32::try_from(tx.output.len()).unwrap(); + if self.outpoint.txid == txid && self.outpoint.vout < output_len { + confirmed = Some(Height::from_blockhash(blockhash, chain)); + return; + } + } + } + }, + )?; + Ok(confirmed.or_else(|| { + mempool + .get(&self.outpoint.txid) + .map(|entry| Height::unconfirmed(entry)) + })) + } + + fn sync_spending( + &self, + index: &Index, + daemon: &Daemon, + mempool: &Mempool, + ) -> Result> { + let chain = index.chain(); + if !self.is_reorg(chain) { + if let Some((_, Height::Confirmed { .. })) = &self.spending { + return Ok(self.spending); + } + } + let spending_blockhashes = index.filter_by_spending(self.outpoint); + let mut confirmed = None; + daemon.for_blocks(spending_blockhashes, |blockhash, block| { + for tx in block.txdata { + for txi in &tx.input { + if txi.previous_output == self.outpoint { + // TODO: there should be only one spending input + assert!(confirmed.is_none(), "double spend of {}", self.outpoint); + confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain))); + return; + } + } + } + })?; + Ok(confirmed.or_else(|| { + let entries = mempool.filter_by_spending(&self.outpoint); + assert!(entries.len() <= 1, "double spend of {}", self.outpoint); + entries + .first() + .map(|entry| (entry.txid, Height::unconfirmed(entry))) + })) + } +} + #[cfg(test)] mod tests { use super::HistoryEntry; diff --git a/src/tracker.rs b/src/tracker.rs index 054ec23b6..bb086335d 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -11,7 +11,7 @@ use crate::{ mempool::{FeeHistogram, Mempool}, metrics::Metrics, signals::ExitFlag, - status::{Balance, ScriptHashStatus, UnspentEntry}, + status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry}, }; /// Electrum protocol subscriptions' tracker @@ -82,6 +82,14 @@ impl Tracker { status.get_balance(self.chain()) } + pub(crate) fn update_outpoint_status( + &self, + status: &mut OutPointStatus, + daemon: &Daemon, + ) -> Result { + status.sync(&self.index, &self.mempool, daemon) + } + pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option { // Note: there are two blocks with coinbase transactions having same txid (see BIP-30) self.index.filter_by_txid(txid).next()