Skip to content

Commit

Permalink
penumbra: remove Builder factory
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Apr 6, 2024
1 parent 8b06546 commit 4dfb1c0
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 202 deletions.
4 changes: 3 additions & 1 deletion .cargo/config
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[build]
# Enable Tokio's `tracing` support for `tokio-console`
rustflags = ["--cfg", "tokio_unstable"]
# rustflags = ["--cfg", "tokio_unstable"]
# Note(erwan): We decided to disable it for the time being,
# I'm keeping this around to be able to reactivate it on a whim.
12 changes: 4 additions & 8 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ async fn main() -> anyhow::Result<()> {
"starting pd"
);

let abci_server = tokio::task::Builder::new()
.name("abci_server")
.spawn(penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind))
.expect("failed to spawn abci server");
let abci_server = tokio::task::spawn(
penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind),
);

let grpc_server =
penumbra_app::rpc::router(&storage, cometbft_addr, enable_expensive_rpc)?;
Expand All @@ -148,10 +147,7 @@ async fn main() -> anyhow::Result<()> {
// resolver if auto-https has been enabled.
macro_rules! spawn_grpc_server {
($server:expr) => {
tokio::task::Builder::new()
.name("grpc_server")
.spawn($server.serve(make_svc))
.expect("failed to spawn grpc server")
tokio::task::spawn($server.serve(make_svc))
};
}
let grpc_server = axum_server::bind(grpc_bind);
Expand Down
253 changes: 111 additions & 142 deletions crates/cnidarium/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,11 @@ impl Snapshot {
db: db.clone(),
};

let (substore_value, substore_commitment_proof) = tokio::task::Builder::new()
.name("Snapshot::get_with_proof")
.spawn_blocking({
let span = span.clone();
move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
})?
.await??;
let (substore_value, substore_commitment_proof) = tokio::task::spawn_blocking({
let span = span.clone();
move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
})
.await??;

proofs.push(substore_commitment_proof);

Expand All @@ -104,13 +102,11 @@ impl Snapshot {
db,
};

let (_, main_commitment_proof) = tokio::task::Builder::new()
.name("Snapshot::get_with_proof")
.spawn_blocking({
let span = span.clone();
move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
})?
.await??;
let (_, main_commitment_proof) = tokio::task::spawn_blocking({
let span = span.clone();
move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
})
.await??;

proofs.push(main_commitment_proof);
}
Expand Down Expand Up @@ -172,10 +168,7 @@ impl Snapshot {
"fetching root hash for substore"
);

tokio::task::Builder::new()
.name("Snapshot::prefix_root_hash")
.spawn_blocking(move || span.in_scope(|| substore.root_hash()))?
.await?
tokio::task::spawn_blocking(move || span.in_scope(|| substore.root_hash())).await?
}

pub async fn root_hash(&self) -> Result<crate::RootHash> {
Expand Down Expand Up @@ -221,21 +214,15 @@ impl StateRead for Snapshot {
};
let key_hash = jmt::KeyHash::with::<sha2::Sha256>(key);

crate::future::SnapshotFuture(
tokio::task::Builder::new()
.name("Snapshot::get_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();
let rsp = substore.get_jmt(key_hash);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
})
.expect("spawning threads is possible"),
)
crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();
let rsp = substore.get_jmt(key_hash);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION).record(_start.elapsed());
rsp
})
}))
}

/// Fetch a key from nonverifiable storage.
Expand All @@ -258,26 +245,21 @@ impl StateRead for Snapshot {
};
let key: Vec<u8> = key.to_vec();

crate::future::SnapshotFuture(
tokio::task::Builder::new()
.name("Snapshot::nonverifiable_get_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();

let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let rsp = substore
.rocksdb_snapshot
.get_cf(cf_nonverifiable, key)
.map_err(Into::into);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
})
.expect("spawning threads is possible"),
)
crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();

let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let rsp = substore
.rocksdb_snapshot
.get_cf(cf_nonverifiable, key)
.map_err(Into::into);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
}))
}

/// Returns a stream of all key-value pairs with the given prefix.
Expand Down Expand Up @@ -309,36 +291,33 @@ impl StateRead for Snapshot {
// Since the JMT keys are hashed, we can't use a prefix iterator directly.
// We need to first prefix range the key preimages column family, then use the hashed matches to fetch the values
// from the JMT column family.
tokio::task::Builder::new()
.name("Snapshot::prefix_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let jmt_keys_iterator =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let jmt_keys_iterator =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
}
Expand Down Expand Up @@ -371,27 +350,23 @@ impl StateRead for Snapshot {
let mode = rocksdb::IteratorMode::Start;
let (tx_prefix_keys, rx_prefix_keys) = mpsc::channel(10);

tokio::task::Builder::new()
.name("Snapshot::prefix_keys")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let iter = substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_keys)
}
Expand Down Expand Up @@ -421,23 +396,20 @@ impl StateRead for Snapshot {

let (tx_prefix_query, rx_prefix_query) = mpsc::channel(10);

tokio::task::Builder::new()
.name("Snapshot::nonverifiable_prefix_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
}
Expand Down Expand Up @@ -512,32 +484,29 @@ impl StateRead for Snapshot {
let prefix = prefix.to_vec();

let (tx, rx) = mpsc::channel::<Result<(Vec<u8>, Vec<u8>)>>(10);
tokio::task::Builder::new()
.name("Snapshot::nonverifiable_range_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);

for i in iter {
let (key, value) = i?;

// This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
// that we want to support. In particular, we want to be able to do a prefix query that starts
// at a particular key, and does not have an upper bound. Since we can't create an iterator that
// cover this range, we have to filter out the keys that don't match the prefix.
if !prefix.is_empty() && !key.starts_with(&prefix) {
break;
}
tx.blocking_send(Ok((key.into(), value.into())))?;
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);

for i in iter {
let (key, value) = i?;

// This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
// that we want to support. In particular, we want to be able to do a prefix query that starts
// at a particular key, and does not have an upper bound. Since we can't create an iterator that
// cover this range, we have to filter out the keys that don't match the prefix.
if !prefix.is_empty() && !key.starts_with(&prefix) {
break;
}
Ok::<(), anyhow::Error>(())
})
tx.blocking_send(Ok((key.into(), value.into())))?;
}
Ok::<(), anyhow::Error>(())
})
.expect("should be able to spawn_blocking");
});

Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
}
Expand Down
Loading

0 comments on commit 4dfb1c0

Please sign in to comment.