Skip to content

Commit

Permalink
compactblock: add unary rpc, fix pcli query
Browse files Browse the repository at this point in the history
This allows us to inspect compact blocks in Buf Studio (by exposing a unary
RPC) and fixes the broken query in `pcli q`.

As part of follow up work, we should eventually aim to replace all of the k/v
queries in pcli with real RPCs that are actually useful by other things than
just our Rust code.
  • Loading branch information
hdevalence committed Jan 30, 2024
1 parent 6cf8fcf commit 4b76d88
Show file tree
Hide file tree
Showing 8 changed files with 583 additions and 85 deletions.
20 changes: 20 additions & 0 deletions crates/bin/pcli/src/command/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Context, Result};

mod shielded_pool;
use colored_json::ToColoredJson;
use shielded_pool::ShieldedPool;
mod tx;
use tx::Tx;
Expand Down Expand Up @@ -115,6 +116,25 @@ impl QueryCmd {
return ibc.exec(app).await;
}

// TODO: this is a hack; we should replace all raw state key uses with RPC methods.
if let QueryCmd::ShieldedPool(ShieldedPool::CompactBlock { height }) = self {
use penumbra_proto::core::component::compact_block::v1alpha1::{
query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
CompactBlockRequest,
};
let mut client = CompactBlockQueryServiceClient::new(app.pd_channel().await?);
let compact_block = client
.compact_block(CompactBlockRequest { height: *height })
.await?
.into_inner()
.compact_block
.ok_or_else(|| anyhow!("compact block missing from response"))?;
let json = serde_json::to_string_pretty(&compact_block)?;

println!("{}", json.to_colored_json_auto()?);
return Ok(());
}

let key = match self {
QueryCmd::Tx(_)
| QueryCmd::Chain(_)
Expand Down
9 changes: 4 additions & 5 deletions crates/bin/pcli/src/command/query/shielded_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use colored_json::prelude::*;
use penumbra_compact_block::CompactBlock;
use penumbra_proto::DomainType;
use penumbra_sct::{CommitmentSource, NullificationInfo, Nullifier};
use penumbra_tct::StateCommitment;
Expand Down Expand Up @@ -30,11 +29,12 @@ pub enum ShieldedPool {

impl ShieldedPool {
pub fn key(&self) -> String {
use penumbra_compact_block::state_key as cb_state_key;
use penumbra_sct::state_key as sct_state_key;
match self {
ShieldedPool::Anchor { height } => sct_state_key::anchor_by_height(*height),
ShieldedPool::CompactBlock { height } => cb_state_key::compact_block(*height),
ShieldedPool::CompactBlock { .. } => {
unreachable!("should be handled at outer level via rpc");
}
ShieldedPool::Commitment { commitment } => sct_state_key::note_source(commitment),
ShieldedPool::Nullifier { nullifier } => {
sct_state_key::spent_nullifier_lookup(nullifier)
Expand All @@ -49,8 +49,7 @@ impl ShieldedPool {
serde_json::to_string_pretty(&anchor)?
}
ShieldedPool::CompactBlock { .. } => {
let compact_block = CompactBlock::decode(bytes)?;
serde_json::to_string_pretty(&compact_block)?
unreachable!("should be handled at outer level via rpc");
}
ShieldedPool::Commitment { .. } => {
let commitment_source = CommitmentSource::decode(bytes)?;
Expand Down
19 changes: 19 additions & 0 deletions crates/core/component/compact-block/src/component/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::{StreamExt, TryFutureExt, TryStreamExt};
use penumbra_chain::component::StateReadExt as _;
use penumbra_proto::core::component::compact_block::v1alpha1::{
query_service_server::QueryService, CompactBlockRangeRequest, CompactBlockRangeResponse,
CompactBlockRequest, CompactBlockResponse,
};
use tokio::sync::mpsc;
use tonic::Status;
Expand All @@ -30,6 +31,24 @@ impl QueryService for Server {
Box<dyn futures::Stream<Item = Result<CompactBlockRangeResponse, tonic::Status>> + Send>,
>;

async fn compact_block(
&self,
request: tonic::Request<CompactBlockRequest>,
) -> Result<tonic::Response<CompactBlockResponse>, Status> {
let snapshot = self.storage.latest_snapshot();

let height = request.get_ref().height;
let compact_block = snapshot
.compact_block(height)
.await
.map_err(|e| tonic::Status::internal(format!("error fetching block: {e:#}")))?
.ok_or_else(|| tonic::Status::not_found(format!("compact block {height} not found")))?;

Ok(tonic::Response::new(CompactBlockResponse {
compact_block: Some(compact_block.into()),
}))
}

#[instrument(
skip(self, request),
fields(
Expand Down
123 changes: 121 additions & 2 deletions crates/proto/src/gen/penumbra.core.component.compact_block.v1alpha1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,36 @@ impl ::prost::Name for CompactBlockRangeResponse {
)
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactBlockRequest {
#[prost(uint64, tag = "1")]
pub height: u64,
}
impl ::prost::Name for CompactBlockRequest {
const NAME: &'static str = "CompactBlockRequest";
const PACKAGE: &'static str = "penumbra.core.component.compact_block.v1alpha1";
fn full_name() -> ::prost::alloc::string::String {
::prost::alloc::format!(
"penumbra.core.component.compact_block.v1alpha1.{}", Self::NAME
)
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactBlockResponse {
#[prost(message, optional, tag = "1")]
pub compact_block: ::core::option::Option<CompactBlock>,
}
impl ::prost::Name for CompactBlockResponse {
const NAME: &'static str = "CompactBlockResponse";
const PACKAGE: &'static str = "penumbra.core.component.compact_block.v1alpha1";
fn full_name() -> ::prost::alloc::string::String {
::prost::alloc::format!(
"penumbra.core.component.compact_block.v1alpha1.{}", Self::NAME
)
}
}
/// Generated client implementations.
#[cfg(feature = "rpc")]
pub mod query_service_client {
Expand Down Expand Up @@ -266,7 +296,7 @@ pub mod query_service_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// Returns a stream of `CompactBlockRangeResponse`s.
/// Returns a stream of compact blocks, optionally keeping the stream alive for push notifications.
pub async fn compact_block_range(
&mut self,
request: impl tonic::IntoRequest<super::CompactBlockRangeRequest>,
Expand Down Expand Up @@ -297,6 +327,39 @@ pub mod query_service_client {
);
self.inner.server_streaming(req, path, codec).await
}
/// Returns a single compact block at a specific height.
///
/// Clients requesting multiple compact blocks should generally use the streaming RPC.
pub async fn compact_block(
&mut self,
request: impl tonic::IntoRequest<super::CompactBlockRequest>,
) -> std::result::Result<
tonic::Response<super::CompactBlockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/penumbra.core.component.compact_block.v1alpha1.QueryService/CompactBlock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"penumbra.core.component.compact_block.v1alpha1.QueryService",
"CompactBlock",
),
);
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
Expand All @@ -316,14 +379,24 @@ pub mod query_service_server {
>
+ Send
+ 'static;
/// Returns a stream of `CompactBlockRangeResponse`s.
/// Returns a stream of compact blocks, optionally keeping the stream alive for push notifications.
async fn compact_block_range(
&self,
request: tonic::Request<super::CompactBlockRangeRequest>,
) -> std::result::Result<
tonic::Response<Self::CompactBlockRangeStream>,
tonic::Status,
>;
/// Returns a single compact block at a specific height.
///
/// Clients requesting multiple compact blocks should generally use the streaming RPC.
async fn compact_block(
&self,
request: tonic::Request<super::CompactBlockRequest>,
) -> std::result::Result<
tonic::Response<super::CompactBlockResponse>,
tonic::Status,
>;
}
/// Query operations for the compact block component.
#[derive(Debug)]
Expand Down Expand Up @@ -454,6 +527,52 @@ pub mod query_service_server {
};
Box::pin(fut)
}
"/penumbra.core.component.compact_block.v1alpha1.QueryService/CompactBlock" => {
#[allow(non_camel_case_types)]
struct CompactBlockSvc<T: QueryService>(pub Arc<T>);
impl<
T: QueryService,
> tonic::server::UnaryService<super::CompactBlockRequest>
for CompactBlockSvc<T> {
type Response = super::CompactBlockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::CompactBlockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as QueryService>::compact_block(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CompactBlockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
Expand Down
Loading

0 comments on commit 4b76d88

Please sign in to comment.