Skip to content

Commit

Permalink
pd: save work streaming compact blocks by avoiding needless deseriali…
Browse files Browse the repository at this point in the history
…zation (#4008)

## Describe your changes

This pulls a proto type out of the state and passes it to the RPC
directly, rather than deserializing through a domain type (which
involves another round of allocations and some crypto operations to
parse field elements). This should help reduce load for RPC servers
streaming compact blocks to clients.

We could in principle go further and pull bytes directly out of the
state and pass them directly to the client (avoiding the
bytes>proto>bytes conversion) but that would be more work (I'm not sure
exactly how or if Tonic supports that), and this is lower-hanging fruit.

## Issue ticket number and link

No issue, just an observation after I looked at Grafana and took a few
minutes to change

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

> This only changes the RPC implementation, not any consensus logic; it
should be safe to cherry-pick and deploy on a release branch.
  • Loading branch information
hdevalence authored Mar 12, 2024
1 parent 7c96b45 commit 3a9db32
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
6 changes: 3 additions & 3 deletions crates/core/component/compact-block/src/component/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl QueryService for Server {
// Future iterations of this work should start by moving block serialization
// outside of the `send_op` future, and investigate if long blocking sends can
// happen for benign reasons (i.e not caused by the client).
tx_blocks.send(Ok(compact_block.into())).await?;
tx_blocks.send(Ok(compact_block)).await?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
}

Expand Down Expand Up @@ -172,7 +172,7 @@ impl QueryService for Server {
.expect("no error fetching block")
.expect("compact block for in-range height must be present");
tx_blocks
.send(Ok(block.into()))
.send(Ok(block))
.await
.map_err(|_| tonic::Status::cancelled("client closed connection"))?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
Expand Down Expand Up @@ -201,7 +201,7 @@ impl QueryService for Server {
.map_err(|e| tonic::Status::internal(e.to_string()))?
.expect("compact block for in-range height must be present");
tx_blocks
.send(Ok(block.into()))
.send(Ok(block))
.await
.map_err(|_| tonic::Status::cancelled("channel closed"))?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
Expand Down
15 changes: 13 additions & 2 deletions crates/core/component/compact-block/src/component/view.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use crate::{state_key, CompactBlock};
use crate::state_key;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use cnidarium::StateRead;
use futures::Stream;
use futures::StreamExt;
use penumbra_proto::DomainType;
use penumbra_proto::{penumbra::core::component::compact_block::v1::CompactBlock, Message};
use std::pin::Pin;

#[async_trait]
pub trait StateReadExt: StateRead {
/// Returns a stream of [`CompactBlock`]s starting from `start_height`.
///
/// Note: this method returns the proto type from `penumbra_proto`, rather
/// than deserializing into the domain type, because the primary use is in
/// serving RPC requests, where the proto type will be re-serialized and
/// sent to clients.
fn stream_compact_block(
&self,
start_height: u64,
Expand All @@ -31,6 +36,12 @@ pub trait StateReadExt: StateRead {
.boxed()
}

/// Returns a single [`CompactBlock`] at the given `height`.
///
/// Note: this method returns the proto type from `penumbra_proto`, rather
/// than deserializing into the domain type, because the primary use is in
/// serving RPC requests, where the proto type will be re-serialized and
/// sent to clients.
async fn compact_block(&self, height: u64) -> Result<Option<CompactBlock>> {
Ok(self
.nonverifiable_get_raw(state_key::compact_block(height).as_bytes())
Expand Down
2 changes: 1 addition & 1 deletion crates/test/mock-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl MockClient {
.compact_block(height)
.await?
.ok_or_else(|| anyhow::anyhow!("missing compact block for height {}", height))?;
self.scan_block(compact_block)?;
self.scan_block(compact_block.try_into()?)?;
let (latest_height, root) = self.latest_height_and_sct_root();
anyhow::ensure!(latest_height == height, "latest height should be updated");
let expected_root = state
Expand Down

0 comments on commit 3a9db32

Please sign in to comment.