Skip to content

Commit

Permalink
checkpoint: seems to compile
Browse files Browse the repository at this point in the history
  • Loading branch information
hdevalence committed Sep 23, 2023
1 parent 1e8b1b8 commit 52a3dbb
Show file tree
Hide file tree
Showing 23 changed files with 1,554 additions and 1,746 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/bin/pcli/src/command/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ impl QueryCmd {
QueryCmd::Key { key } => key.clone(),
};

use penumbra_proto::storage::v1alpha1::query_service_client::QueryServiceClient;
use penumbra_proto::core::app::v1alpha1::query_service_client::QueryServiceClient;
let mut client = QueryServiceClient::new(app.pd_channel().await?);

let req = penumbra_proto::storage::v1alpha1::KeyValueRequest {
let req = penumbra_proto::core::app::v1alpha1::KeyValueRequest {
key: key.clone(),
..Default::default()
};
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pcli/src/command/query/ibc_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ibc_types::core::channel::ChannelEnd;
use ibc_types::core::connection::ConnectionEnd;
use ibc_types::lightclients::tendermint::client_state::ClientState as TendermintClientState;

use penumbra_proto::storage::v1alpha1::{
use penumbra_proto::core::app::v1alpha1::{
query_service_client::QueryServiceClient as StorageQueryServiceClient, KeyValueRequest,
};
use penumbra_proto::DomainType;
Expand Down
204 changes: 0 additions & 204 deletions crates/bin/pd/src/info/specific.rs
Original file line number Diff line number Diff line change
@@ -1,204 +0,0 @@
use std::pin::Pin;
use std::sync::Arc;

use async_stream::try_stream;
use futures::StreamExt;
use futures::TryStreamExt;
use penumbra_asset::{asset, Value};
use penumbra_chain::component::AppHashRead;
use penumbra_chain::component::StateReadExt as _;
use penumbra_dex::component::router::RouteAndFill;
use penumbra_dex::component::router::RoutingParams;
use penumbra_dex::{
component::{PositionRead, StateReadExt},
lp::{position, position::Position},
DirectedTradingPair, SwapExecution, TradingPair,
};
use penumbra_governance::StateReadExt as _;
use penumbra_proto::{
self as proto,
client::v1alpha1::{
specific_query_service_server::SpecificQueryService, BatchSwapOutputDataRequest,
DenomMetadataByIdRequest, KeyValueRequest, KeyValueResponse, ProposalInfoRequest,
ProposalInfoResponse, ProposalRateDataRequest, ProposalRateDataResponse,
ValidatorStatusRequest,
},
StateReadProto as _,
};
use penumbra_sct::component::StateReadExt as _;
use penumbra_shielded_pool::component::SupplyRead as _;
use penumbra_stake::rate::RateData;
use penumbra_stake::StateReadExt as _;

use penumbra_proto::DomainType;
use penumbra_storage::StateDelta;
use penumbra_storage::StateRead;

use tonic::Status;
use tracing::instrument;

// We need to use the tracing-futures version of Instrument,
// because we want to instrument a Stream, and the Stream trait
// isn't in std, and the tracing::Instrument trait only works with
// (stable) std types.
//use tracing_futures::Instrument;

use super::Info;

#[tonic::async_trait]
impl SpecificQueryService for Info {



#[instrument(skip(self, request))]
async fn proposal_info(
&self,
request: tonic::Request<ProposalInfoRequest>,
) -> Result<tonic::Response<ProposalInfoResponse>, Status> {
let state = self.storage.latest_snapshot();
state
.check_chain_id(&request.get_ref().chain_id)
.await
.map_err(|e| tonic::Status::unknown(format!("chain_id not OK: {e}")))?;
let proposal_id = request.into_inner().proposal_id;

let start_block_height = state
.proposal_voting_start(proposal_id)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
.ok_or_else(|| tonic::Status::unknown(format!("proposal {proposal_id} not found")))?;

let start_position = state
.proposal_voting_start_position(proposal_id)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
.ok_or_else(|| tonic::Status::unknown(format!("proposal {proposal_id} not found")))?;

Ok(tonic::Response::new(ProposalInfoResponse {
start_block_height,
start_position: start_position.into(),
}))
}

type ProposalRateDataStream = Pin<
Box<dyn futures::Stream<Item = Result<ProposalRateDataResponse, tonic::Status>> + Send>,
>;

#[instrument(skip(self, request))]
async fn proposal_rate_data(
&self,
request: tonic::Request<ProposalRateDataRequest>,
) -> Result<tonic::Response<Self::ProposalRateDataStream>, Status> {
let state = self.storage.latest_snapshot();
state
.check_chain_id(&request.get_ref().chain_id)
.await
.map_err(|e| tonic::Status::unknown(format!("chain_id not OK: {e}")))?;
let proposal_id = request.into_inner().proposal_id;

use penumbra_governance::state_key;

let s = state.prefix(&state_key::all_rate_data_at_proposal_start(proposal_id));
Ok(tonic::Response::new(
s.map_ok(|i: (String, RateData)| {
let (_key, rate_data) = i;
ProposalRateDataResponse {
rate_data: Some(rate_data.into()),
}
})
.map_err(|e: anyhow::Error| {
tonic::Status::unavailable(format!("error getting prefix value from storage: {e}"))
})
// TODO: how do we instrument a Stream
//.instrument(Span::current())
.boxed(),
))
}

#[instrument(skip(self, request))]
async fn key_value(
&self,
request: tonic::Request<KeyValueRequest>,
) -> Result<tonic::Response<KeyValueResponse>, Status> {
let state = self.storage.latest_snapshot();
// We map the error here to avoid including `tonic` as a dependency
// in the `chain` crate, to support its compilation to wasm.
state
.check_chain_id(&request.get_ref().chain_id)
.await
.map_err(|e| tonic::Status::unknown(format!("chain_id not OK: {e}")))?;

let request = request.into_inner();
tracing::debug!(?request);

if request.key.is_empty() {
return Err(Status::invalid_argument("key is empty"));
}

// TODO(erwan): we are unconditionally generating the proof here; we shouldn't do that if the
// request doesn't ask for it
let (some_value, proof) = state
.get_with_proof_to_apphash(request.key.into_bytes())
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(KeyValueResponse {
value: some_value.map(Into::into),
proof: if request.proof {
Some(ibc_proto::ibc::core::commitment::v1::MerkleProof {
proofs: proof
.proofs
.into_iter()
.map(|p| {
let mut encoded = Vec::new();
prost::Message::encode(&p, &mut encoded).expect("able to encode proof");
prost::Message::decode(&*encoded).expect("able to decode proof")
})
.collect(),
})
} else {
None
},
}))
}

type PrefixValueStream =
Pin<Box<dyn futures::Stream<Item = Result<PrefixValueResponse, tonic::Status>> + Send>>;

#[instrument(skip(self, request))]
async fn prefix_value(
&self,
request: tonic::Request<PrefixValueRequest>,
) -> Result<tonic::Response<Self::PrefixValueStream>, Status> {
let state = self.storage.latest_snapshot();
state
.check_chain_id(&request.get_ref().chain_id)
.await
.map_err(|e| tonic::Status::unknown(format!("chain_id not OK: {e}")))?;
let request = request.into_inner();
tracing::debug!(?request);

if request.prefix.is_empty() {
return Err(Status::invalid_argument("prefix is empty"));
}

Ok(tonic::Response::new(
state
.prefix_raw(&request.prefix)
.map_ok(|i: (String, Vec<u8>)| {
let (key, value) = i;
PrefixValueResponse { key, value }
})
.map_err(|e: anyhow::Error| {
tonic::Status::unavailable(format!(
"error getting prefix value from storage: {e}"
))
})
// TODO: how do we instrument a Stream
//.instrument(Span::current())
.boxed(),
))
}


}
82 changes: 62 additions & 20 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ use pd::testnet::{
join::testnet_join,
};
use pd::upgrade::{self, Upgrade};
use penumbra_proto::client::v1alpha1::{
oblivious_query_service_server::ObliviousQueryServiceServer,
specific_query_service_server::SpecificQueryServiceServer,
tendermint_proxy_service_server::TendermintProxyServiceServer,
};
use penumbra_proto::util::tendermint_proxy::v1alpha1::tendermint_proxy_service_server::TendermintProxyServiceServer;
use penumbra_storage::{StateDelta, Storage};
use penumbra_tendermint_proxy::TendermintProxy;
use penumbra_tower_trace::remote_addr;
Expand Down Expand Up @@ -335,6 +331,38 @@ async fn main() -> anyhow::Result<()> {

let ibc = penumbra_ibc::component::rpc::IbcQuery::new(storage.clone());

// TODO: Once we migrate to Tonic 0.10.0, we'll be able to use the
// `Routes` structure to have each component define a method that
// returns a `Routes` with all of its query services bundled inside.
//
// This means we won't have to import all this shit and recite every
// single service -- we can e.g., have the app crate assemble all of
// its components' query services into a single `Routes` and then
// just add that to the gRPC server.

use penumbra_proto::core::{
app::v1alpha1::query_service_server::QueryServiceServer as AppQueryServiceServer,
component::{
chain::v1alpha1::query_service_server::QueryServiceServer as ChainQueryServiceServer,
compact_block::v1alpha1::query_service_server::QueryServiceServer as CompactBlockQueryServiceServer,
dex::v1alpha1::query_service_server::QueryServiceServer as DexQueryServiceServer,
governance::v1alpha1::query_service_server::QueryServiceServer as GovernanceQueryServiceServer,
sct::v1alpha1::query_service_server::QueryServiceServer as SctQueryServiceServer,
shielded_pool::v1alpha1::query_service_server::QueryServiceServer as ShieldedPoolQueryServiceServer,
stake::v1alpha1::query_service_server::QueryServiceServer as StakeQueryServiceServer,
},
};
use tonic_web::enable as we;

use penumbra_app::rpc::Server as AppServer;
use penumbra_chain::component::rpc::Server as ChainServer;
use penumbra_compact_block::component::rpc::Server as CompactBlockServer;
use penumbra_dex::component::rpc::Server as DexServer;
use penumbra_governance::component::rpc::Server as GovernanceServer;
use penumbra_sct::component::rpc::Server as SctServer;
use penumbra_shielded_pool::component::rpc::Server as ShieldedPoolServer;
use penumbra_stake::component::rpc::Server as StakeServer;

let grpc_server = Server::builder()
.trace_fn(|req| match remote_addr(req) {
Some(remote_addr) => {
Expand All @@ -352,24 +380,38 @@ async fn main() -> anyhow::Result<()> {
// new blocks.
// .timeout(std::time::Duration::from_secs(7))
// Wrap each of the gRPC services in a tonic-web proxy:
.add_service(tonic_web::enable(ObliviousQueryServiceServer::new(
info.clone(),
.add_service(we(AppQueryServiceServer::new(AppServer::new(
storage.clone(),
))))
.add_service(we(ChainQueryServiceServer::new(ChainServer::new(
storage.clone(),
))))
.add_service(we(CompactBlockQueryServiceServer::new(
CompactBlockServer::new(storage.clone()),
)))
.add_service(tonic_web::enable(SpecificQueryServiceServer::new(
info.clone(),
.add_service(we(DexQueryServiceServer::new(DexServer::new(
storage.clone(),
))))
.add_service(we(GovernanceQueryServiceServer::new(
GovernanceServer::new(storage.clone()),
)))
.add_service(tonic_web::enable(ClientQueryServer::new(ibc.clone())))
.add_service(tonic_web::enable(ChannelQueryServer::new(ibc.clone())))
.add_service(tonic_web::enable(ConnectionQueryServer::new(ibc.clone())))
.add_service(tonic_web::enable(TendermintProxyServiceServer::new(
tm_proxy.clone(),
.add_service(we(SctQueryServiceServer::new(SctServer::new(
storage.clone(),
))))
.add_service(we(ShieldedPoolQueryServiceServer::new(
ShieldedPoolServer::new(storage.clone()),
)))
.add_service(tonic_web::enable(
tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(penumbra_proto::FILE_DESCRIPTOR_SET)
.build()
.with_context(|| "could not configure grpc reflection service")?,
));
.add_service(we(StakeQueryServiceServer::new(StakeServer::new(
storage.clone(),
))))
.add_service(we(ClientQueryServer::new(ibc.clone())))
.add_service(we(ChannelQueryServer::new(ibc.clone())))
.add_service(we(ConnectionQueryServer::new(ibc.clone())))
.add_service(we(TendermintProxyServiceServer::new(tm_proxy.clone())))
.add_service(we(tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(penumbra_proto::FILE_DESCRIPTOR_SET)
.build()
.with_context(|| "could not configure grpc reflection service")?));

let grpc_server = if let Some(domain) = grpc_auto_https {
use pd::auto_https::Wrapper;
Expand Down
Loading

0 comments on commit 52a3dbb

Please sign in to comment.