diff --git a/.cargo/config b/.cargo/config
index 404305d73c..c5d8c8e32e 100644
--- a/.cargo/config
+++ b/.cargo/config
@@ -1,3 +1,5 @@
 [build]
 # Enable Tokio's `tracing` support for `tokio-console`
-rustflags = ["--cfg", "tokio_unstable"]
\ No newline at end of file
+# rustflags = ["--cfg", "tokio_unstable"]
+# Note(erwan): We decided to disable it for the time being,
+# I'm keeping this around to be able to reactivate it on a whim.
diff --git a/crates/bin/pd/src/main.rs b/crates/bin/pd/src/main.rs
index e8dd635958..48bdec77c3 100644
--- a/crates/bin/pd/src/main.rs
+++ b/crates/bin/pd/src/main.rs
@@ -120,10 +120,9 @@ async fn main() -> anyhow::Result<()> {
         "starting pd"
     );

-    let abci_server = tokio::task::Builder::new()
-        .name("abci_server")
-        .spawn(penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind))
-        .expect("failed to spawn abci server");
+    let abci_server = tokio::task::spawn(
+        penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind),
+    );

     let grpc_server =
         penumbra_app::rpc::router(&storage, cometbft_addr, enable_expensive_rpc)?;
@@ -148,10 +147,7 @@ async fn main() -> anyhow::Result<()> {
     // resolver if auto-https has been enabled.
     macro_rules! spawn_grpc_server {
         ($server:expr) => {
-            tokio::task::Builder::new()
-                .name("grpc_server")
-                .spawn($server.serve(make_svc))
-                .expect("failed to spawn grpc server")
+            tokio::task::spawn($server.serve(make_svc))
         };
     }
     let grpc_server = axum_server::bind(grpc_bind);
diff --git a/crates/cnidarium/src/snapshot.rs b/crates/cnidarium/src/snapshot.rs
index c59a652a12..7463db16a2 100644
--- a/crates/cnidarium/src/snapshot.rs
+++ b/crates/cnidarium/src/snapshot.rs
@@ -81,13 +81,11 @@ impl Snapshot {
             db: db.clone(),
         };

-        let (substore_value, substore_commitment_proof) = tokio::task::Builder::new()
-            .name("Snapshot::get_with_proof")
-            .spawn_blocking({
-                let span = span.clone();
-                move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
-            })?
-            .await??;
+        let (substore_value, substore_commitment_proof) = tokio::task::spawn_blocking({
+            let span = span.clone();
+            move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
+        })
+        .await??;

         proofs.push(substore_commitment_proof);

@@ -104,13 +102,11 @@ impl Snapshot {
                 db,
             };

-            let (_, main_commitment_proof) = tokio::task::Builder::new()
-                .name("Snapshot::get_with_proof")
-                .spawn_blocking({
-                    let span = span.clone();
-                    move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
-                })?
-                .await??;
+            let (_, main_commitment_proof) = tokio::task::spawn_blocking({
+                let span = span.clone();
+                move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
+            })
+            .await??;

             proofs.push(main_commitment_proof);
         }
@@ -172,10 +168,7 @@ impl Snapshot {
             "fetching root hash for substore"
         );

-        tokio::task::Builder::new()
-            .name("Snapshot::prefix_root_hash")
-            .spawn_blocking(move || span.in_scope(|| substore.root_hash()))?
-            .await?
+        tokio::task::spawn_blocking(move || span.in_scope(|| substore.root_hash())).await?
     }

     pub async fn root_hash(&self) -> Result<crate::RootHash> {
@@ -221,21 +214,15 @@ impl StateRead for Snapshot {
         };
         let key_hash = jmt::KeyHash::with::<sha2::Sha256>(key);

-        crate::future::SnapshotFuture(
-            tokio::task::Builder::new()
-                .name("Snapshot::get_raw")
-                .spawn_blocking(move || {
-                    span.in_scope(|| {
-                        let _start = std::time::Instant::now();
-                        let rsp = substore.get_jmt(key_hash);
-                        #[cfg(feature = "metrics")]
-                        metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION)
-                            .record(_start.elapsed());
-                        rsp
-                    })
-                })
-                .expect("spawning threads is possible"),
-        )
+        crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let _start = std::time::Instant::now();
+                let rsp = substore.get_jmt(key_hash);
+                #[cfg(feature = "metrics")]
+                metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION).record(_start.elapsed());
+                rsp
+            })
+        }))
     }

     /// Fetch a key from nonverifiable storage.
@@ -258,26 +245,21 @@ impl StateRead for Snapshot {
         };
         let key: Vec<u8> = key.to_vec();

-        crate::future::SnapshotFuture(
-            tokio::task::Builder::new()
-                .name("Snapshot::nonverifiable_get_raw")
-                .spawn_blocking(move || {
-                    span.in_scope(|| {
-                        let _start = std::time::Instant::now();
-
-                        let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
-                        let rsp = substore
-                            .rocksdb_snapshot
-                            .get_cf(cf_nonverifiable, key)
-                            .map_err(Into::into);
-                        #[cfg(feature = "metrics")]
-                        metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
-                            .record(_start.elapsed());
-                        rsp
-                    })
-                })
-                .expect("spawning threads is possible"),
-        )
+        crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let _start = std::time::Instant::now();
+
+                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
+                let rsp = substore
+                    .rocksdb_snapshot
+                    .get_cf(cf_nonverifiable, key)
+                    .map_err(Into::into);
+                #[cfg(feature = "metrics")]
+                metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
+                    .record(_start.elapsed());
+                rsp
+            })
+        }))
     }

     /// Returns a stream of all key-value pairs with the given prefix.
@@ -309,36 +291,33 @@ impl StateRead for Snapshot {
         // Since the JMT keys are hashed, we can't use a prefix iterator directly.
         // We need to first prefix range the key preimages column family, then use the hashed matches to fetch the values
         // from the JMT column family.
-        tokio::task::Builder::new()
-            .name("Snapshot::prefix_raw")
-            .spawn_blocking(move || {
-                span.in_scope(|| {
-                    let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
-                    let jmt_keys_iterator =
-                        substore
-                            .rocksdb_snapshot
-                            .iterator_cf_opt(cf_jmt_keys, options, mode);
-
-                    for tuple in jmt_keys_iterator {
-                        // For each key that matches the prefix, fetch the value from the JMT column family.
-                        let (key_preimage, _) = tuple?;
-
-                        let k = std::str::from_utf8(key_preimage.as_ref())
-                            .expect("saved jmt keys are utf-8 strings")
-                            .to_string();
-
-                        let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());
-
-                        let v = substore
-                            .get_jmt(key_hash)?
-                            .expect("keys in jmt_keys should have a corresponding value in jmt");
-
-                        tx_prefix_item.blocking_send(Ok((k, v)))?;
-                    }
-                    anyhow::Ok(())
-                })
+        tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
+                let jmt_keys_iterator =
+                    substore
+                        .rocksdb_snapshot
+                        .iterator_cf_opt(cf_jmt_keys, options, mode);
+
+                for tuple in jmt_keys_iterator {
+                    // For each key that matches the prefix, fetch the value from the JMT column family.
+                    let (key_preimage, _) = tuple?;
+
+                    let k = std::str::from_utf8(key_preimage.as_ref())
+                        .expect("saved jmt keys are utf-8 strings")
+                        .to_string();
+
+                    let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());
+
+                    let v = substore
+                        .get_jmt(key_hash)?
+                        .expect("keys in jmt_keys should have a corresponding value in jmt");
+
+                    tx_prefix_item.blocking_send(Ok((k, v)))?;
+                }
+                anyhow::Ok(())
             })
-            .expect("should be able to spawn_blocking");
+        });

         tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
     }
@@ -371,27 +350,23 @@ impl StateRead for Snapshot {
         let mode = rocksdb::IteratorMode::Start;

         let (tx_prefix_keys, rx_prefix_keys) = mpsc::channel(10);
-        tokio::task::Builder::new()
-            .name("Snapshot::prefix_keys")
-            .spawn_blocking(move || {
-                span.in_scope(|| {
-                    let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
-                    let iter =
-                        substore
-                            .rocksdb_snapshot
-                            .iterator_cf_opt(cf_jmt_keys, options, mode);
-
-                    for key_and_keyhash in iter {
-                        let (raw_preimage, _) = key_and_keyhash?;
-                        let preimage = std::str::from_utf8(raw_preimage.as_ref())
-                            .expect("saved jmt keys are utf-8 strings")
-                            .to_string();
-                        tx_prefix_keys.blocking_send(Ok(preimage))?;
-                    }
-                    anyhow::Ok(())
-                })
+        tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
+                let iter = substore
+                    .rocksdb_snapshot
+                    .iterator_cf_opt(cf_jmt_keys, options, mode);
+
+                for key_and_keyhash in iter {
+                    let (raw_preimage, _) = key_and_keyhash?;
+                    let preimage = std::str::from_utf8(raw_preimage.as_ref())
+                        .expect("saved jmt keys are utf-8 strings")
+                        .to_string();
+                    tx_prefix_keys.blocking_send(Ok(preimage))?;
+                }
+                anyhow::Ok(())
             })
-            .expect("should be able to spawn_blocking");
+        });

         tokio_stream::wrappers::ReceiverStream::new(rx_prefix_keys)
     }
@@ -421,23 +396,20 @@ impl StateRead for Snapshot {

         let (tx_prefix_query, rx_prefix_query) = mpsc::channel(10);

-        tokio::task::Builder::new()
-            .name("Snapshot::nonverifiable_prefix_raw")
-            .spawn_blocking(move || {
-                span.in_scope(|| {
-                    let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
-                    let iter =
-                        substore
-                            .rocksdb_snapshot
-                            .iterator_cf_opt(cf_nonverifiable, options, mode);
-                    for i in iter {
-                        let (key, value) = i?;
-                        tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
-                    }
-                    anyhow::Ok(())
-                })
+        tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
+                let iter =
+                    substore
+                        .rocksdb_snapshot
+                        .iterator_cf_opt(cf_nonverifiable, options, mode);
+                for i in iter {
+                    let (key, value) = i?;
+                    tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
+                }
+                anyhow::Ok(())
             })
-            .expect("should be able to spawn_blocking");
+        });

         tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
     }
@@ -512,32 +484,29 @@ impl StateRead for Snapshot {
         let prefix = prefix.to_vec();
         let (tx, rx) = mpsc::channel::<Result<(Vec<u8>, Vec<u8>)>>(10);

-        tokio::task::Builder::new()
-            .name("Snapshot::nonverifiable_range_raw")
-            .spawn_blocking(move || {
-                span.in_scope(|| {
-                    let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
-                    let iter =
-                        substore
-                            .rocksdb_snapshot
-                            .iterator_cf_opt(cf_nonverifiable, options, mode);
-
-                    for i in iter {
-                        let (key, value) = i?;
-
-                        // This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
-                        // that we want to support. In particular, we want to be able to do a prefix query that starts
-                        // at a particular key, and does not have an upper bound. Since we can't create an iterator that
-                        // cover this range, we have to filter out the keys that don't match the prefix.
-                        if !prefix.is_empty() && !key.starts_with(&prefix) {
-                            break;
-                        }
-                        tx.blocking_send(Ok((key.into(), value.into())))?;
+        tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
+                let iter =
+                    substore
+                        .rocksdb_snapshot
+                        .iterator_cf_opt(cf_nonverifiable, options, mode);
+
+                for i in iter {
+                    let (key, value) = i?;
+
+                    // This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
+                    // that we want to support. In particular, we want to be able to do a prefix query that starts
+                    // at a particular key, and does not have an upper bound. Since we can't create an iterator that
+                    // cover this range, we have to filter out the keys that don't match the prefix.
+                    if !prefix.is_empty() && !key.starts_with(&prefix) {
+                        break;
                     }
-                    Ok::<(), anyhow::Error>(())
-                })
+                    tx.blocking_send(Ok((key.into(), value.into())))?;
+                }
+                Ok::<(), anyhow::Error>(())
             })
-            .expect("should be able to spawn_blocking");
+        });

         Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
     }
diff --git a/crates/cnidarium/src/storage.rs b/crates/cnidarium/src/storage.rs
index 350ae8f86d..c1c7bed71f 100644
--- a/crates/cnidarium/src/storage.rs
+++ b/crates/cnidarium/src/storage.rs
@@ -52,51 +52,49 @@ impl Storage {
         let span = Span::current();
         let db_path = path.clone();
         // initializing main storage instance.
-        let prefixes = tokio::task::Builder::new()
-            .name("config_rocksdb")
-            .spawn_blocking(move || {
-                span.in_scope(|| {
-                    let mut opts = Options::default();
-                    opts.create_if_missing(true);
-                    opts.create_missing_column_families(true);
-                    tracing::info!(?path, "opening rocksdb config column");
-
-                    // Hack(erwan): RocksDB requires us to specify all the column families
-                    // that we want to use upfront. This is problematic when we are initializing
-                    // a new database, because the call to `DBCommon::list_cf` will fail
-                    // if the database manifest is not found. To work around this, we ignore
-                    // the error and assume that the database is empty.
-                    // Tracked in: https://github.com/rust-rocksdb/rust-rocksdb/issues/608
-                    let mut columns = DB::list_cf(&opts, path.clone()).unwrap_or_default();
-                    if columns.is_empty() {
-                        columns.push("config".to_string());
-                    }
-
-                    let db = DB::open_cf(&opts, path, columns).expect("can open database");
-                    let cf_config = db
-                        .cf_handle("config")
-                        .expect("config column family is created if missing");
-                    let config_iter = db.iterator_cf(cf_config, rocksdb::IteratorMode::Start);
-                    let mut prefixes = Vec::new();
-                    tracing::info!("reading prefixes from config column family");
-                    for i in config_iter {
-                        let (key, _) = i.expect("can read from iterator");
-                        prefixes.push(String::from_utf8(key.to_vec()).expect("prefix is utf8"));
+        let prefixes = tokio::task::spawn_blocking(move || {
+            span.in_scope(|| {
+                let mut opts = Options::default();
+                opts.create_if_missing(true);
+                opts.create_missing_column_families(true);
+                tracing::info!(?path, "opening rocksdb config column");
+
+                // Hack(erwan): RocksDB requires us to specify all the column families
+                // that we want to use upfront. This is problematic when we are initializing
+                // a new database, because the call to `DBCommon::list_cf` will fail
+                // if the database manifest is not found. To work around this, we ignore
+                // the error and assume that the database is empty.
+                // Tracked in: https://github.com/rust-rocksdb/rust-rocksdb/issues/608
+                let mut columns = DB::list_cf(&opts, path.clone()).unwrap_or_default();
+                if columns.is_empty() {
+                    columns.push("config".to_string());
+                }
+
+                let db = DB::open_cf(&opts, path, columns).expect("can open database");
+                let cf_config = db
+                    .cf_handle("config")
+                    .expect("config column family is created if missing");
+                let config_iter = db.iterator_cf(cf_config, rocksdb::IteratorMode::Start);
+                let mut prefixes = Vec::new();
+                tracing::info!("reading prefixes from config column family");
+                for i in config_iter {
+                    let (key, _) = i.expect("can read from iterator");
+                    prefixes.push(String::from_utf8(key.to_vec()).expect("prefix is utf8"));
+                }
+
+                for prefix in default_prefixes {
+                    if !prefixes.contains(&prefix) {
+                        db.put_cf(cf_config, prefix.as_bytes(), b"")
+                            .expect("can write to db");
+                        prefixes.push(prefix);
                     }
+                }

-                    for prefix in default_prefixes {
-                        if !prefixes.contains(&prefix) {
-                            db.put_cf(cf_config, prefix.as_bytes(), b"")
-                                .expect("can write to db");
-                            prefixes.push(prefix);
-                        }
-                    }
-
-                    std::mem::drop(db);
-                    prefixes
-                })
-            })?
-            .await?;
+                std::mem::drop(db);
+                prefixes
+            })
+        })
+        .await?;

         Storage::init(db_path, prefixes).await
     }
@@ -112,9 +110,8 @@ impl Storage {
     pub async fn init(path: PathBuf, prefixes: Vec<String>) -> Result<Self> {
         let span = Span::current();

-        tokio::task::Builder::new()
-            .name("open_rocksdb")
-            .spawn_blocking(move || {
+        tokio::task
+            ::spawn_blocking(move || {
                 span.in_scope(|| {
                     let mut substore_configs = Vec::new();
                     tracing::info!("initializing global store config");
@@ -230,7 +227,7 @@ impl Storage {
                         db: shared_db,
                     })))
                 })
-            })?
+            })
             .await?
     }

diff --git a/crates/cnidarium/src/store/substore.rs b/crates/cnidarium/src/store/substore.rs
index 9fc6b68521..058ca0441f 100644
--- a/crates/cnidarium/src/store/substore.rs
+++ b/crates/cnidarium/src/store/substore.rs
@@ -365,9 +365,8 @@ impl SubstoreStorage {
     ) -> Result<(RootHash, rocksdb::WriteBatch)> {
         let span = Span::current();

-        tokio::task::Builder::new()
-            .name("Storage::commit_inner_substore")
-            .spawn_blocking(move || {
+        tokio::task
+            ::spawn_blocking(move || {
                 span.in_scope(|| {
                     let jmt = jmt::Sha256Jmt::new(&self.substore_snapshot);
                     let unwritten_changes: Vec<_> = cache
@@ -440,7 +439,7 @@ impl SubstoreStorage {
                     Ok((root_hash, write_batch))
                 })
-            })?
+            })
            .await?
     }
 }
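
The change above is mechanical: every `tokio::task::Builder::new().name(..).spawn(..)` / `.spawn_blocking(..)` call site, which only compiles with the `--cfg tokio_unstable` rustflag this patch comments out and which returns an `io::Result<JoinHandle<_>>` that had to be unwrapped, is replaced by the plain `tokio::task::spawn` / `tokio::task::spawn_blocking` free functions, which return the `JoinHandle` directly. A minimal standalone sketch of the two patterns follows; the function names, the task name, and the workload are illustrative only and not taken from this patch.

use tokio::task::JoinHandle;

// Before: named tasks are visible to `tokio-console`, but `task::Builder`
// is only available when the crate is compiled with `--cfg tokio_unstable`,
// and spawning returns `std::io::Result<JoinHandle<_>>` that must be unwrapped.
#[cfg(tokio_unstable)]
fn spawn_named_blocking() -> std::io::Result<JoinHandle<u64>> {
    tokio::task::Builder::new()
        .name("example_blocking_work") // illustrative name, not from the patch
        .spawn_blocking(|| (0..1_000u64).sum::<u64>())
}

// After: the plain free function works without the unstable cfg and returns
// the `JoinHandle` directly, at the cost of losing the task name.
fn spawn_plain_blocking() -> JoinHandle<u64> {
    tokio::task::spawn_blocking(|| (0..1_000u64).sum::<u64>())
}

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    // Both variants must be called from within a Tokio runtime.
    let sum = spawn_plain_blocking().await?;
    println!("sum = {sum}");
    Ok(())
}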