From f4afe88464238bda2cbb8d48e40a86cb7388b920 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Tue, 26 Oct 2021 13:09:03 +0300 Subject: [PATCH] Collect RPC events into batches Related to https://github.com/romanz/electrs/issues/464 --- src/electrum.rs | 17 +++++-- src/server.rs | 125 +++++++++++++++++++++++++++++------------------- src/tracker.rs | 3 +- 3 files changed, 88 insertions(+), 57 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index be38d12aa..2ca73fbe2 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -17,7 +17,7 @@ use crate::{ config::{Config, ELECTRS_VERSION}, daemon::{self, extract_bitcoind_error, Daemon}, merkle::Proof, - metrics::{self, Histogram}, + metrics::{self, Histogram, Metrics}, signals::Signal, status::ScriptHashStatus, tracker::Tracker, @@ -122,15 +122,15 @@ pub struct Rpc { impl Rpc { /// Perform initial index sync (may take a while on first run). - pub fn new(config: &Config) -> Result { - let tracker = Tracker::new(config)?; - let rpc_duration = tracker.metrics().histogram_vec( + pub fn new(config: &Config, metrics: Metrics) -> Result { + let rpc_duration = metrics.histogram_vec( "rpc_duration", "RPC duration (in seconds)", "method", metrics::default_duration_buckets(), ); + let tracker = Tracker::new(config, metrics)?; let signal = Signal::new(); let daemon = Daemon::connect(config, signal.exit_flag(), tracker.metrics())?; let cache = Cache::new(tracker.metrics()); @@ -395,7 +395,14 @@ impl Rpc { })) } - pub fn handle_request(&self, client: &mut Client, line: &str) -> String { + pub fn handle_requests(&self, client: &mut Client, lines: &[String]) -> Vec { + lines + .iter() + .map(|line| self.handle_request(client, &line)) + .collect() + } + + fn handle_request(&self, client: &mut Client, line: &str) -> String { let error_msg_no_id = |err| error_msg(Value::Null, RpcError::Standard(err)); let response: Value = match serde_json::from_str(line) { // parse JSON from str diff --git a/src/server.rs b/src/server.rs index 2ee0171bd..c431d865e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,12 +5,14 @@ use rayon::prelude::*; use std::{ collections::hash_map::HashMap, io::{BufRead, BufReader, Write}, + iter::once, net::{Shutdown, TcpListener, TcpStream}, }; use crate::{ config::Config, electrum::{Client, Rpc}, + metrics::{self, Metrics}, signals::ExitError, thread::spawn, }; @@ -60,7 +62,16 @@ pub fn run() -> Result<()> { fn serve() -> Result<()> { let config = Config::from_args(); - let mut rpc = Rpc::new(&config)?; + let metrics = Metrics::new(config.monitoring_addr)?; + + let server_batch_size = metrics.histogram_vec( + "server_batch_size", + "# of server events handled in a single batch", + "type", + metrics::default_size_buckets(), + ); + + let mut rpc = Rpc::new(&config, metrics)?; let (server_tx, server_rx) = unbounded(); if !config.disable_electrum_rpc { @@ -77,32 +88,29 @@ fn serve() -> Result<()> { return Ok(()); } peers = notify_peers(&rpc, peers); // peers are disconnected on error. - loop { - select! { - // Handle signals for graceful shutdown - recv(rpc.signal().receiver()) -> result => { - result.context("signal channel disconnected")?; - rpc.signal().exit_flag().poll().context("RPC server interrupted")?; - }, - // Handle new blocks' notifications - recv(new_block_rx) -> result => match result { - Ok(_) => break, // sync and update - Err(_) => { - info!("disconnected from bitcoind"); - return Ok(()); - } - }, - // Handle Electrum RPC requests - recv(server_rx) -> event => { - let event = event.context("server disconnected")?; - handle_event(&rpc, &mut peers, event); - }, - default(config.wait_duration) => break, // sync and update - }; - // continue RPC processing (if more requests are pending) - if server_rx.is_empty() { - break; - } + select! { + // Handle signals for graceful shutdown + recv(rpc.signal().receiver()) -> result => { + result.context("signal channel disconnected")?; + rpc.signal().exit_flag().poll().context("RPC server interrupted")?; + }, + // Handle new blocks' notifications + recv(new_block_rx) -> result => match result { + Ok(_) => (), // sync and update + Err(_) => { + info!("disconnected from bitcoind"); + return Ok(()); + } + }, + // Handle Electrum RPC requests + recv(server_rx) -> event => { + let first = once(event.context("server disconnected")?); + let rest = server_rx.iter().take(server_rx.len()); + let events: Vec = first.chain(rest).collect(); + server_batch_size.observe("recv", events.len()); + handle_events(&rpc, &mut peers, events); + }, + default(config.wait_duration) => (), // sync and update } } } @@ -140,35 +148,52 @@ enum Message { Done, } -fn handle_event(rpc: &Rpc, peers: &mut HashMap, event: Event) { - let Event { msg, peer_id } = event; - match msg { - Message::New(stream) => { - debug!("{}: connected", peer_id); - peers.insert(peer_id, Peer::new(peer_id, stream)); - } - Message::Request(line) => { - let result = match peers.get_mut(&peer_id) { - Some(peer) => handle_request(rpc, peer, &line), - None => return, // unknown peer - }; - if let Err(e) = result { - error!("{}: disconnecting due to {}", peer_id, e); - peers.remove(&peer_id).unwrap().disconnect(); +fn handle_events(rpc: &Rpc, peers: &mut HashMap, events: Vec) { + let mut events_by_peer = HashMap::>::new(); + events + .into_iter() + .for_each(|e| events_by_peer.entry(e.peer_id).or_default().push(e.msg)); + for (peer_id, messages) in events_by_peer { + handle_peer_events(rpc, peers, peer_id, messages); + } +} + +fn handle_peer_events( + rpc: &Rpc, + peers: &mut HashMap, + peer_id: usize, + messages: Vec, +) { + let mut lines = vec![]; + let mut done = false; + for msg in messages { + match msg { + Message::New(stream) => { + debug!("{}: connected", peer_id); + peers.insert(peer_id, Peer::new(peer_id, stream)); + } + Message::Request(line) => lines.push(line), + Message::Done => { + done = true; + break; } } - Message::Done => { - // already disconnected, just remove from peers' map - peers.remove(&peer_id); + } + let result = match peers.get_mut(&peer_id) { + Some(peer) => { + let responses = rpc.handle_requests(&mut peer.client, &lines); + peer.send(responses) } + None => return, // unknown peer + }; + if let Err(e) = result { + error!("{}: disconnecting due to {}", peer_id, e); + peers.remove(&peer_id).unwrap().disconnect(); + } else if done { + peers.remove(&peer_id); // already disconnected, just remove from peers' map } } -fn handle_request(rpc: &Rpc, peer: &mut Peer, line: &str) -> Result<()> { - let response = rpc.handle_request(&mut peer.client, line); - peer.send(vec![response]) -} - fn accept_loop(listener: TcpListener, server_tx: Sender) -> Result<()> { for (peer_id, conn) in listener.incoming().enumerate() { let stream = conn.context("failed to accept")?; diff --git a/src/tracker.rs b/src/tracker.rs index b2d9c057b..054ec23b6 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -23,8 +23,7 @@ pub struct Tracker { } impl Tracker { - pub fn new(config: &Config) -> Result { - let metrics = Metrics::new(config.monitoring_addr)?; + pub fn new(config: &Config, metrics: Metrics) -> Result { let store = DBStore::open(&config.db_path, config.auto_reindex)?; let chain = Chain::new(config.network); Ok(Self {