Skip to content

Commit

Permalink
view: 👀 introduce more tracing instrumentation
Browse files Browse the repository at this point in the history
NB: this adds a `tap` dependency to the `penumbra-view` crate too.
  • Loading branch information
cratelyn committed May 7, 2024
1 parent 9b14688 commit b4b747b
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/view/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ rand_core = {workspace = true, features = ["getrandom"]}
serde = {workspace = true, features = ["derive"]}
serde_json = {workspace = true}
sha2 = {workspace = true}
tap = {workspace = true}
tendermint = {workspace = true}
tokio = {workspace = true, features = ["full"]}
tokio-stream = {workspace = true, features = ["sync"]}
Expand Down
29 changes: 23 additions & 6 deletions crates/view/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::stream::{self, StreamExt, TryStreamExt};
use penumbra_auction::auction::dutch::actions::view::ActionDutchAuctionWithdrawView;
use rand::Rng;
use rand_core::OsRng;
use tap::Tap;
use tokio::sync::{watch, RwLock};
use tokio_stream::wrappers::WatchStream;
use tonic::{async_trait, transport::Channel, Request, Response, Status};
Expand Down Expand Up @@ -91,14 +92,27 @@ pub struct ViewServer {

impl ViewServer {
/// Convenience method that calls [`Storage::load_or_initialize`] and then [`Self::new`].
#[instrument(
skip_all,
fields(
path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
url = %node,
)
)]
pub async fn load_or_initialize(
storage_path: Option<impl AsRef<Utf8Path>>,
fvk: &FullViewingKey,
node: Url,
) -> anyhow::Result<Self> {
let storage = Storage::load_or_initialize(storage_path, fvk, node.clone()).await?;
let storage = Storage::load_or_initialize(storage_path, fvk, node.clone())
.tap(|_| tracing::trace!("loading or initializing storage"))
.await?
.tap(|_| tracing::debug!("storage is ready"));

Self::new(storage, node).await
Self::new(storage, node)
.tap(|_| tracing::trace!("constructing view server"))
.await
.tap(|_| tracing::debug!("constructed view server"))
}

/// Constructs a new [`ViewService`], spawning a sync task internally.
Expand All @@ -109,16 +123,19 @@ impl ViewServer {
/// by this method, rather than calling it multiple times. That way, each clone
/// will be backed by the same scanning task, rather than each spawning its own.
pub async fn new(storage: Storage, node: Url) -> anyhow::Result<Self> {
let (worker, sct, error_slot, sync_height_rx) =
Worker::new(storage.clone(), node.clone()).await?;
let (worker, state_commitment_tree, error_slot, sync_height_rx) =
Worker::new(storage.clone(), node.clone())
.tap(|_| tracing::trace!("constructing view server worker"))
.await?
.tap(|_| tracing::debug!("constructed view server worker"));

tokio::spawn(worker.run());
tokio::spawn(worker.run()).tap(|_| tracing::debug!("spawned view server worker"));

Ok(Self {
storage,
error_slot,
sync_height_rx,
state_commitment_tree: sct,
state_commitment_tree,
node,
})
}
Expand Down
25 changes: 22 additions & 3 deletions crates/view/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ use r2d2_sqlite::{
SqliteConnectionManager,
};
use sha2::{Digest, Sha256};
use tap::{Tap, TapFallible};
use tokio::{
sync::broadcast::{self, error::RecvError},
task::spawn_blocking,
};
use tracing::{error_span, Instrument};
use url::Url;

use penumbra_app::params::AppParameters;
Expand Down Expand Up @@ -75,20 +77,37 @@ pub struct Storage {

impl Storage {
/// If the database at `storage_path` exists, [`Self::load`] it, otherwise, [`Self::initialize`] it.
#[tracing::instrument(
skip_all,
fields(
path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
url = %node,
)
)]
pub async fn load_or_initialize(
storage_path: Option<impl AsRef<Utf8Path>>,
fvk: &FullViewingKey,
node: Url,
) -> anyhow::Result<Self> {
if let Some(path) = storage_path.as_ref() {
if path.as_ref().exists() {
if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
if path.exists() {
tracing::debug!(?path, "database exists");
return Self::load(path).await;
} else {
tracing::debug!(?path, "database does not exist");
}
};

let mut client = AppQueryServiceClient::connect(node.to_string()).await?;
let mut client = AppQueryServiceClient::connect(node.to_string())
.instrument(error_span!("connecting to endpoint"))
.await
.tap_err(|error| {
tracing::error!(?error, "failed to connect to app query service endpoint")
})?
.tap(|_| tracing::debug!("connected to app query service endpoint"));
let params = client
.app_parameters(tonic::Request::new(AppParametersRequest {}))
.instrument(error_span!("getting app parameters"))
.await?
.into_inner()
.try_into()?;
Expand Down
16 changes: 14 additions & 2 deletions crates/view/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ use penumbra_proto::{
use penumbra_sct::{CommitmentSource, Nullifier};
use penumbra_transaction::Transaction;
use proto::core::app::v1::TransactionsByHeightRequest;
use tap::{Tap, TapFallible};
use tokio::sync::{watch, RwLock};
use tonic::transport::Channel;
use tracing::instrument;
use url::Url;

use crate::{
Expand All @@ -55,6 +57,10 @@ impl Worker {
/// - a shared, in-memory SCT instance;
/// - a shared error slot;
/// - a channel for notifying the client of sync progress.
#[instrument(
skip(storage),
fields(url = %node)
)]
pub async fn new(
storage: Storage,
node: Url,
Expand All @@ -67,7 +73,12 @@ impl Worker {
),
anyhow::Error,
> {
let fvk = storage.full_viewing_key().await?;
tracing::trace!("constructing view server worker");
let fvk = storage
.full_viewing_key()
.await
.context("failed to retrieve full viewing key from storage")?
.tap(|_| tracing::debug!("retrieved full viewing key"));

// Create a shared, in-memory SCT.
let sct = Arc::new(RwLock::new(storage.state_commitment_tree().await?));
Expand All @@ -83,7 +94,8 @@ impl Worker {
.with_context(|| "could not parse node URI")?
.connect()
.await
.with_context(|| "could not connect to grpc server")?;
.with_context(|| "could not connect to grpc server")
.tap_err(|error| tracing::error!(?error, "could not connect to grpc server"))?;

Ok((
Self {
Expand Down

0 comments on commit b4b747b

Please sign in to comment.