From ac16c6ba2d23e71aac56e59ea4feb87d2a748754 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 5 Aug 2023 16:31:08 +0300 Subject: [PATCH 01/17] added default content type filter --- beacon_node/http_api/src/lib.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7d1475809a7..1d78846ba47 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -74,7 +74,7 @@ use version::{ use warp::http::StatusCode; use warp::sse::Event; use warp::Reply; -use warp::{http::Response, Filter}; +use warp::{http::Response, Filter, Rejection}; use warp_utils::{ query::multi_key_query, task::{blocking_json_task, blocking_response_task}, @@ -4000,7 +4000,7 @@ pub fn serve( // Define the ultimate set of routes that will be provided to the server. // Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`). - let routes = warp::get() + let routes = warp::get().and(default_content_type()) .and( get_beacon_genesis .uor(get_beacon_state_root) @@ -4070,7 +4070,7 @@ pub fn serve( ) .boxed() .uor( - warp::post().and( + warp::post().and(default_content_type()).and( warp::header::exact("Content-Type", "application/octet-stream") // Routes which expect `application/octet-stream` go within this `and`. .and(post_beacon_blocks_ssz.uor(post_beacon_blocks_v2_ssz)) @@ -4145,6 +4145,18 @@ pub fn serve( Ok(http_server) } +// Default content-type to application/json if no header was provided +fn default_content_type() -> impl Filter + Clone { + warp::header::optional::("accept") + .map(|content_type: Option| { + if content_type.is_none() { + warp::reply::with_header(warp::reply(), "Content-Type", "application/json"); + } + }) + .untuple_one() + .boxed() +} + /// Publish a message to the libp2p pubsub network. fn publish_pubsub_message( network_tx: &UnboundedSender>, From 599b1938e61cfd1f1e8363f40a0161408868fe6d Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 9 Aug 2023 12:35:36 +0300 Subject: [PATCH 02/17] added init sqlite code --- Cargo.lock | 2 + slasher/Cargo.toml | 5 +- slasher/src/database.rs | 1 + slasher/src/database/interface.rs | 14 ++ slasher/src/database/sqlite_impl.rs | 322 ++++++++++++++++++++++++++++ slasher/src/error.rs | 9 + 6 files changed, 352 insertions(+), 1 deletion(-) create mode 100644 slasher/src/database/sqlite_impl.rs diff --git a/Cargo.lock b/Cargo.lock index a24087c3a06..103659236ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7068,6 +7068,7 @@ dependencies = [ name = "slasher" version = "0.1.0" dependencies = [ + "base64 0.21.2", "bincode", "byteorder", "ethereum_ssz", @@ -7085,6 +7086,7 @@ dependencies = [ "parking_lot 0.12.1", "rand 0.8.5", "rayon", + "rusqlite", "safe_arith", "serde", "serde_derive", diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index bfa7b5f64c5..6a7920e173d 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Michael Sproul "] edition = "2021" [features] -default = ["lmdb"] +default = ["rusqlite"] mdbx = ["dep:mdbx"] lmdb = ["lmdb-rkv", "lmdb-rkv-sys"] @@ -31,6 +31,9 @@ tree_hash_derive = "0.5.0" types = { path = "../consensus/types" } strum = { version = "0.24.1", features = ["derive"] } +base64 = "0.21.2" +rusqlite = { version = "*", optional = true} + # MDBX is pinned at the last version with Windows and macOS support. mdbx = { package = "libmdbx", git = "https://github.com/sigp/libmdbx-rs", tag = "v0.1.4", optional = true } lmdb-rkv = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true } diff --git a/slasher/src/database.rs b/slasher/src/database.rs index 49d2b00a4cd..bf9e37b530e 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -1,6 +1,7 @@ pub mod interface; mod lmdb_impl; mod mdbx_impl; +mod sqlite_impl; use crate::{ metrics, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, Config, Error, diff --git a/slasher/src/database/interface.rs b/slasher/src/database/interface.rs index 5bb920383c3..e18f578a47f 100644 --- a/slasher/src/database/interface.rs +++ b/slasher/src/database/interface.rs @@ -7,6 +7,8 @@ use std::path::PathBuf; use crate::database::lmdb_impl; #[cfg(feature = "mdbx")] use crate::database::mdbx_impl; +#[cfg(feature = "rusqlite")] +use crate::database::sqlite_impl; #[derive(Debug)] pub enum Environment { @@ -14,6 +16,8 @@ pub enum Environment { Mdbx(mdbx_impl::Environment), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Environment), + #[cfg(feature = "rusqlite")] + Sqlite(sqlite_impl::Environment), Disabled, } @@ -23,6 +27,8 @@ pub enum RwTransaction<'env> { Mdbx(mdbx_impl::RwTransaction<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::RwTransaction<'env>), + #[cfg(feature = "rusqlite")] + Sqlite(sqlite_impl::RwTransaction<'env>), Disabled(PhantomData<&'env ()>), } @@ -32,6 +38,8 @@ pub enum Database<'env> { Mdbx(mdbx_impl::Database<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Database<'env>), + #[cfg(feature = "rusqlite")] + Sqlite(sqlite_impl::Database<'env>), Disabled(PhantomData<&'env ()>), } @@ -54,6 +62,8 @@ pub enum Cursor<'env> { Mdbx(mdbx_impl::Cursor<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Cursor<'env>), + #[cfg(feature = "rusqlite")] + Sqlite(sqlite_impl::Cursor<'env>), Disabled(PhantomData<&'env ()>), } @@ -67,6 +77,8 @@ impl Environment { DatabaseBackend::Mdbx => mdbx_impl::Environment::new(config).map(Environment::Mdbx), #[cfg(feature = "lmdb")] DatabaseBackend::Lmdb => lmdb_impl::Environment::new(config).map(Environment::Lmdb), + #[cfg(feature = "rusqlite")] + DatabaseBackend::Sqlite => sqlite_impl::Environment::new(config).map(Environment::Sqlite), DatabaseBackend::Disabled => Err(Error::SlasherDatabaseBackendDisabled), } } @@ -114,6 +126,8 @@ impl<'env> RwTransaction<'env> { (Self::Mdbx(txn), Database::Mdbx(db)) => txn.get(db, key), #[cfg(feature = "lmdb")] (Self::Lmdb(txn), Database::Lmdb(db)) => txn.get(db, key), + #[cfg(feature = "rusqlite")] + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.get(db, key), _ => Err(Error::MismatchedDatabaseVariant), } } diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs new file mode 100644 index 00000000000..93ef73471ca --- /dev/null +++ b/slasher/src/database/sqlite_impl.rs @@ -0,0 +1,322 @@ +#![cfg(feature = "rusqlite")] +use std::fmt; +use std::{ + borrow::{Borrow, Cow}, + path::PathBuf, +}; +use base64::{Engine as _, engine::{general_purpose}}; + +use crate::{ + database::{ + interface::{Key, OpenDatabases, Value}, + *, + }, + Config, Error, +}; + +const BASE_DB: &str = "slasher_db"; + +#[derive(Debug)] +pub struct Environment { + _db_count: usize, + db: rusqlite::Connection, +} + +#[derive(Debug)] +pub struct Database<'env> { + env: &'env Environment, + table_name: &'env str, +} + +#[derive(Debug)] +pub struct RwTransaction<'env> { + // txn: Option>, + _phantom: PhantomData<&'env ()>, +} + +impl<'env> Drop for RwTransaction<'env> { + fn drop(&mut self) { + // Perform any necessary cleanup or resource deallocation here + // This code will be automatically executed when an instance of MyStruct goes out of scope. + } +} + +#[derive(Debug)] +pub struct Cursor<'env> { + db: &'env Database<'env>, + current_key: Option>, +} + +/* +pub struct WriteTransaction<'env>(redb::WriteTransaction<'env>); + +impl<'env> fmt::Debug for WriteTransaction<'env> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "InternalStruct {{ /* fields and their values */ }}") + } +} +*/ + +impl Environment { + pub fn new(config: &Config) -> Result { + let db_path = match config.database_path.join(BASE_DB).as_path().to_str() { + Some(path) => path.to_string(), + None => "".to_string(), + }; + let database = rusqlite::Connection::open(db_path)?; + + Ok(Environment { + _db_count: MAX_NUM_DBS, + db: database, + }) + } + + + pub fn create_databases(&self) -> Result { + let indexed_attestation_db = self.create_table(INDEXED_ATTESTATION_DB)?; + let indexed_attestation_id_db = self.create_table(INDEXED_ATTESTATION_ID_DB)?; + let attesters_db = self.create_table(ATTESTERS_DB)?; + let attesters_max_targets_db = self.create_table(ATTESTERS_MAX_TARGETS_DB)?; + let min_targets_db = self.create_table(MIN_TARGETS_DB)?; + let max_targets_db = self.create_table(MAX_TARGETS_DB)?; + let current_epochs_db = self.create_table(CURRENT_EPOCHS_DB)?; + let proposers_db = self.create_table(PROPOSERS_DB)?; + let metadata_db = self.create_table(METADATA_DB)?; + + Ok(OpenDatabases { + indexed_attestation_db, + indexed_attestation_id_db, + attesters_db, + attesters_max_targets_db, + min_targets_db, + max_targets_db, + current_epochs_db, + proposers_db, + metadata_db, + }) + } + + pub fn create_table<'env>( + &'env self, + table_name: &'env str, + ) -> Result, Error> { + + let create_table_command = format!( + "CREATE TABLE {} ( + key INTEGER PRIMARY KEY, + value INTEGER + );", table_name); + + self.db.execute(&create_table_command, ())?; + + Ok(crate::Database::Sqlite(Database { + table_name, + env: self, + })) + } + + pub fn filenames(&self, config: &Config) -> Vec { + vec![config.database_path.join(BASE_DB)] + } + + pub fn begin_rw_txn(&self) -> Result { + Ok(RwTransaction { + _phantom: PhantomData, + }) + } +} + + +impl<'env> Database<'env> { +} + +struct QueryResult { + value: Vec +} + +impl<'env> RwTransaction<'env> { + pub fn get + ?Sized>( + &'env self, + db: &Database<'env>, + key: &K, + ) -> Result>, Error> { + let encoded_key = general_purpose::STANDARD.encode(&key); + let query_statement = format!("SELECT value FROM {} where key = {}", db.table_name, encoded_key); + let database = &db.env.db; + let mut stmt = database.prepare(&query_statement)?; + + let result = stmt.query_row([], |row| { + Ok(QueryResult { + value: row.get(0)?, + }) + })?; + + Ok(Some(Cow::from(result.value))) + } +/* + pub fn put, V: AsRef<[u8]>>( + &mut self, + db: &Database, + key: K, + value: V, + ) -> Result<(), Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(db.table_name); + let database = &db.env.db; + let tx = database.begin_write()?; + { + let mut table = tx.open_table(table_definition)?; + table.insert(key.as_ref().borrow(), value.as_ref().borrow())?; + } + tx.commit()?; + Ok(()) + } + + pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(db.table_name); + let database = &db.env.db; + let tx = database.begin_write()?; + { + let mut table = tx.open_table(table_definition)?; + table.remove(key.as_ref().borrow())?; + } + tx.commit()?; + Ok(()) + } + + pub fn cursor<'a>(&'a mut self, db: &'a Database<'a>) -> Result, Error> { + Ok(Cursor { + db, + current_key: None, + }) + } + + pub fn commit(self) -> Result<(), Error> { + Ok(()) + + match self.txn.unwrap().commit() { + Ok(_) => { + self.txn = None; + Ok(()) + } + Err(_) => panic!(), + } + } + */ +} +/* +impl<'env> Cursor<'env> { + pub fn first_key(&mut self) -> Result, Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_read()?; + + let table = tx.open_table(table_definition)?; + + let first = table + .iter()? + .next() + .map(|x| x.map(|(key, _)| (key.value()).to_vec())); + + if let Some(owned_key) = first { + let owned_key = owned_key?; + self.current_key = Some(Cow::from(owned_key)); + Ok(self.current_key.clone()) + } else { + Ok(None) + } + } + + pub fn last_key(&mut self) -> Result>, Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_read()?; + + let table = tx.open_table(table_definition)?; + + let last = table + .iter()? + .rev() + .next_back() + .map(|x| x.map(|(key, _)| (key.value()).to_vec())); + + if let Some(owned_key) = last { + let owned_key = owned_key?; + self.current_key = Some(Cow::from(owned_key)); + return Ok(self.current_key.clone()); + } + Ok(None) + } + + pub fn next_key(&mut self) -> Result>, Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_read()?; + + if let Some(current_key) = &self.current_key.clone() { + let range: std::ops::RangeFrom<&[u8]> = current_key..; + let table = tx.open_table(table_definition)?; + + let next = table + .range(range)? + .next() + .map(|x| x.map(|(key, _)| (key.value()).to_vec())); + + if let Some(owned_key) = next { + let owned_key = owned_key?; + self.current_key = Some(Cow::from(owned_key)); + return Ok(self.current_key.clone()); + } + } + Ok(None) + } + + pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { + if let Some(key) = &self.current_key { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_read()?; + let table = tx.open_table(table_definition)?; + let result = table.get(key.as_ref())?; + + if let Some(access_guard) = result { + let value = access_guard.value().to_vec().clone(); + return Ok(Some((key.clone(), Cow::from(value)))); + } + } + Ok(None) + } + + pub fn delete_current(&mut self) -> Result<(), Error> { + if let Some(key) = &self.current_key { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_write()?; + { + let mut table = tx.open_table(table_definition)?; + table.remove(key.as_ref())?; + } + tx.commit()?; + } + Ok(()) + } + + pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(self.db.table_name); + let database = &self.db.env.db; + let tx = database.begin_write()?; + { + let mut table = tx.open_table(table_definition)?; + table.insert(key.as_ref().borrow(), value.as_ref().borrow())?; + } + tx.commit()?; + Ok(()) + } +}*/ \ No newline at end of file diff --git a/slasher/src/error.rs b/slasher/src/error.rs index b939c281e9f..a423d7a7e7b 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -8,6 +8,8 @@ pub enum Error { DatabaseMdbxError(mdbx::Error), #[cfg(feature = "lmdb")] DatabaseLmdbError(lmdb::Error), + #[cfg(feature = "rusqlite")] + DatabaseSqliteError(rusqlite::Error), SlasherDatabaseBackendDisabled, MismatchedDatabaseVariant, DatabaseIOError(io::Error), @@ -88,6 +90,13 @@ impl From for Error { } } +#[cfg(feature = "rusqlite")] +impl From for Error { + fn from(e: rusqlite::Error) -> Self { + Error::DatabaseSqliteError(e.into()) + } +} + impl From for Error { fn from(e: io::Error) -> Self { Error::DatabaseIOError(e) From 8f8f0bc890a33b97be32017acc88d1a56a4a9bba Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 25 Aug 2023 21:49:58 +0300 Subject: [PATCH 03/17] sqlite changes --- beacon_node/http_api/src/builder_states.rs | 72 +++++ beacon_node/http_api/src/lib.rs | 73 ++++- beacon_node/http_api/tests/tests.rs | 101 +++++++ lighthouse/Cargo.toml | 4 +- lighthouse/tests/beacon_node.rs | 4 +- slasher/Cargo.toml | 3 +- slasher/src/config.rs | 6 +- slasher/src/database.rs | 2 - slasher/src/database/interface.rs | 46 ++- slasher/src/database/sqlite_impl.rs | 315 ++++++++++----------- slasher/src/error.rs | 4 +- slasher/tests/backend.rs | 6 +- slasher/tests/proposer_slashings.rs | 9 +- slasher/tests/random.rs | 2 +- slasher/tests/wrap_around.rs | 2 +- 15 files changed, 451 insertions(+), 198 deletions(-) create mode 100644 beacon_node/http_api/src/builder_states.rs diff --git a/beacon_node/http_api/src/builder_states.rs b/beacon_node/http_api/src/builder_states.rs new file mode 100644 index 00000000000..90203f2d60c --- /dev/null +++ b/beacon_node/http_api/src/builder_states.rs @@ -0,0 +1,72 @@ +use crate::StateId; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use safe_arith::SafeArith; +use state_processing::per_block_processing::get_expected_withdrawals; +use state_processing::state_advance::partial_state_advance; +use std::sync::Arc; +use types::{BeaconState, EthSpec, ForkName, Slot, Withdrawals}; + +const MAX_EPOCH_LOOKAHEAD: u64 = 2; + +/// Get the withdrawals computed from the specified state, that will be included in the block +/// that gets built on the specified state. +pub fn get_next_withdrawals( + chain: &Arc>, + mut state: BeaconState, + state_id: StateId, + proposal_slot: Slot, +) -> Result, warp::Rejection> { + get_next_withdrawals_sanity_checks(chain, &state, proposal_slot)?; + + // advance the state to the epoch of the proposal slot. + let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch()); + let (state_root, _, _) = state_id.root(chain)?; + if proposal_epoch != state.current_epoch() { + if let Err(e) = + partial_state_advance(&mut state, Some(state_root), proposal_slot, &chain.spec) + { + return Err(warp_utils::reject::custom_server_error(format!( + "failed to advance to the epoch of the proposal slot: {:?}", + e + ))); + } + } + + match get_expected_withdrawals(&state, &chain.spec) { + Ok(withdrawals) => Ok(withdrawals), + Err(e) => Err(warp_utils::reject::custom_server_error(format!( + "failed to get expected withdrawal: {:?}", + e + ))), + } +} + +fn get_next_withdrawals_sanity_checks( + chain: &BeaconChain, + state: &BeaconState, + proposal_slot: Slot, +) -> Result<(), warp::Rejection> { + if proposal_slot <= state.slot() { + return Err(warp_utils::reject::custom_bad_request( + "proposal slot must be greater than the pre-state slot".to_string(), + )); + } + + let fork = chain.spec.fork_name_at_slot::(proposal_slot); + if let ForkName::Base | ForkName::Altair | ForkName::Merge = fork { + return Err(warp_utils::reject::custom_bad_request( + "the specified state is a pre-capella state.".to_string(), + )); + } + + let look_ahead_limit = MAX_EPOCH_LOOKAHEAD + .safe_mul(T::EthSpec::slots_per_epoch()) + .map_err(warp_utils::reject::arith_error)?; + if proposal_slot >= state.slot() + look_ahead_limit { + return Err(warp_utils::reject::custom_bad_request(format!( + "proposal slot is greater than or equal to the look ahead limit: {look_ahead_limit}" + ))); + } + + Ok(()) +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 284709fb793..3aa10139b0b 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -10,6 +10,7 @@ mod attester_duties; mod block_id; mod block_packing_efficiency; mod block_rewards; +mod builder_states; mod database; mod metrics; mod proposer_duties; @@ -32,6 +33,7 @@ use beacon_chain::{ }; use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; +use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; use eth2::types::{ @@ -2291,6 +2293,60 @@ pub fn serve( }, ); + /* + * builder/states + */ + + let builder_states_path = eth_v1 + .and(warp::path("builder")) + .and(warp::path("states")) + .and(chain_filter.clone()); + + // GET builder/states/{state_id}/expected_withdrawals + let get_expected_withdrawals = builder_states_path + .clone() + .and(task_spawner_filter.clone()) + .and(warp::path::param::()) + .and(warp::path("expected_withdrawals")) + .and(warp::query::()) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .then( + |chain: Arc>, + task_spawner: TaskSpawner, + state_id: StateId, + query: api_types::ExpectedWithdrawalsQuery, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let (state, execution_optimistic, finalized) = state_id.state(&chain)?; + let proposal_slot = query.proposal_slot.unwrap_or(state.slot() + 1); + let withdrawals = + get_next_withdrawals::(&chain, state, state_id, proposal_slot)?; + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(withdrawals.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json( + &api_types::ExecutionOptimisticFinalizedResponse { + data: withdrawals, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }, + ) + .into_response()), + } + }) + }, + ); + /* * beacon/rewards */ @@ -4438,7 +4494,7 @@ pub fn serve( // Define the ultimate set of routes that will be provided to the server. // Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`). - let routes = warp::get().and(default_content_type()) + let routes = warp::get() .and( get_beacon_genesis .uor(get_beacon_state_root) @@ -4503,12 +4559,13 @@ pub fn serve( .uor(get_lighthouse_block_packing_efficiency) .uor(get_lighthouse_merge_readiness) .uor(get_events) + .uor(get_expected_withdrawals) .uor(lighthouse_log_events.boxed()) .recover(warp_utils::reject::handle_rejection), ) .boxed() .uor( - warp::post().and(default_content_type()).and( + warp::post().and( warp::header::exact("Content-Type", "application/octet-stream") // Routes which expect `application/octet-stream` go within this `and`. .and( @@ -4588,18 +4645,6 @@ pub fn serve( Ok(http_server) } -// Default content-type to application/json if no header was provided -fn default_content_type() -> impl Filter + Clone { - warp::header::optional::("accept") - .map(|content_type: Option| { - if content_type.is_none() { - warp::reply::with_header(warp::reply(), "Content-Type", "application/json"); - } - }) - .untuple_one() - .boxed() -} - /// Publish a message to the libp2p pubsub network. fn publish_pubsub_message( network_tx: &UnboundedSender>, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 3c72441c02f..46cc55591c7 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -28,6 +28,7 @@ use sensitive_url::SensitiveUrl; use slot_clock::SlotClock; use state_processing::per_block_processing::get_expected_withdrawals; use state_processing::per_slot_processing; +use state_processing::state_advance::partial_state_advance; use std::convert::TryInto; use std::sync::Arc; use tokio::time::Duration; @@ -4341,6 +4342,72 @@ impl ApiTester { self } + pub async fn test_get_expected_withdrawals_invalid_state(self) -> Self { + let state_id = CoreStateId::Root(Hash256::zero()); + + let result = self.client.get_expected_withdrawals(&state_id).await; + + match result { + Err(e) => { + assert_eq!(e.status().unwrap(), 404); + } + _ => panic!("query did not fail correctly"), + } + + self + } + + pub async fn test_get_expected_withdrawals_capella(self) -> Self { + let slot = self.chain.slot().unwrap(); + let state_id = CoreStateId::Slot(slot); + + // calculate the expected withdrawals + let (mut state, _, _) = StateId(state_id).state(&self.chain).unwrap(); + let proposal_slot = state.slot() + 1; + let proposal_epoch = proposal_slot.epoch(E::slots_per_epoch()); + let (state_root, _, _) = StateId(state_id).root(&self.chain).unwrap(); + if proposal_epoch != state.current_epoch() { + let _ = partial_state_advance( + &mut state, + Some(state_root), + proposal_slot, + &self.chain.spec, + ); + } + let expected_withdrawals = get_expected_withdrawals(&state, &self.chain.spec).unwrap(); + + // fetch expected withdrawals from the client + let result = self.client.get_expected_withdrawals(&state_id).await; + match result { + Ok(withdrawal_response) => { + assert_eq!(withdrawal_response.execution_optimistic, Some(false)); + assert_eq!(withdrawal_response.finalized, Some(false)); + assert_eq!(withdrawal_response.data, expected_withdrawals.to_vec()); + } + Err(e) => { + println!("{:?}", e); + panic!("query failed incorrectly"); + } + } + + self + } + + pub async fn test_get_expected_withdrawals_pre_capella(self) -> Self { + let state_id = CoreStateId::Head; + + let result = self.client.get_expected_withdrawals(&state_id).await; + + match result { + Err(e) => { + assert_eq!(e.status().unwrap(), 400); + } + _ => panic!("query did not fail correctly"), + } + + self + } + pub async fn test_get_events_altair(self) -> Self { let topics = vec![EventTopic::ContributionAndProof]; let mut events_future = self @@ -5123,3 +5190,37 @@ async fn optimistic_responses() { .test_check_optimistic_responses() .await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn expected_withdrawals_invalid_pre_capella() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) + .await + .test_get_expected_withdrawals_pre_capella() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn expected_withdrawals_invalid_state() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) + .await + .test_get_expected_withdrawals_invalid_state() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn expected_withdrawals_valid_capella() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) + .await + .test_get_expected_withdrawals_capella() + .await; +} diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 8003236f2df..e1eff8e9fe2 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -7,7 +7,7 @@ autotests = false rust-version = "1.68.2" [features] -default = ["slasher-lmdb"] +default = ["slasher-sqlite"] # Writes debugging .ssz files to /tmp during block processing. write_ssz_files = ["beacon_node/write_ssz_files"] # Compiles the BLS crypto code so that the binary is portable across machines. @@ -24,6 +24,8 @@ gnosis = [] slasher-mdbx = ["slasher/mdbx"] # Support slasher LMDB backend. slasher-lmdb = ["slasher/lmdb"] +# Support slasher LMDB backend. +slasher-sqlite = ["slasher/sqlite"] # Use jemalloc. jemalloc = ["malloc_utils/jemalloc"] diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 37d224dbc31..f3b5857377d 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1997,11 +1997,11 @@ fn slasher_backend_override_to_default() { CommandLineTest::new() .flag("slasher", None) .flag("slasher-max-db-size", Some("1")) - .flag("slasher-backend", Some("lmdb")) + .flag("slasher-backend", Some("sqlite")) .run_with_zero_port() .with_config(|config| { let slasher_config = config.slasher.as_ref().unwrap(); - assert_eq!(slasher_config.backend, slasher::DatabaseBackend::Lmdb); + assert_eq!(slasher_config.backend, slasher::DatabaseBackend::Sqlite); }); } diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index 6a7920e173d..a381221c891 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Michael Sproul "] edition = "2021" [features] -default = ["rusqlite"] +default = ["sqlite"] mdbx = ["dep:mdbx"] lmdb = ["lmdb-rkv", "lmdb-rkv-sys"] +sqlite = ["dep:rusqlite"] [dependencies] bincode = "1.3.1" diff --git a/slasher/src/config.rs b/slasher/src/config.rs index 361621d176b..44f07b746ea 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -17,7 +17,9 @@ pub const DEFAULT_BROADCAST: bool = false; pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Mdbx; #[cfg(feature = "lmdb")] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Lmdb; -#[cfg(not(any(feature = "mdbx", feature = "lmdb")))] +#[cfg(feature = "sqlite")] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Sqlite; +#[cfg(not(any(feature = "mdbx", feature = "lmdb", feature = "sqlite")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Disabled; pub const MAX_HISTORY_LENGTH: usize = 1 << 16; @@ -62,6 +64,8 @@ pub enum DatabaseBackend { Mdbx, #[cfg(feature = "lmdb")] Lmdb, + #[cfg(feature = "sqlite")] + Sqlite, Disabled, } diff --git a/slasher/src/database.rs b/slasher/src/database.rs index bf9e37b530e..bc0e0e63851 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -273,7 +273,6 @@ impl SlasherDB { }; db = db.migrate()?; - let mut txn = db.begin_rw_txn()?; if let Some(on_disk_config) = db.load_config(&mut txn)? { let current_disk_config = db.config.disk_config(); @@ -706,7 +705,6 @@ impl SlasherDB { let (slot, _) = ProposerKey::parse(key_bytes)?; if slot < min_slot { cursor.delete_current()?; - // End the loop if there is no next entry. if cursor.next_key()?.is_none() { break; diff --git a/slasher/src/database/interface.rs b/slasher/src/database/interface.rs index e18f578a47f..87c6e1f01ad 100644 --- a/slasher/src/database/interface.rs +++ b/slasher/src/database/interface.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; use crate::database::lmdb_impl; #[cfg(feature = "mdbx")] use crate::database::mdbx_impl; -#[cfg(feature = "rusqlite")] +#[cfg(feature = "sqlite")] use crate::database::sqlite_impl; #[derive(Debug)] @@ -16,7 +16,7 @@ pub enum Environment { Mdbx(mdbx_impl::Environment), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Environment), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] Sqlite(sqlite_impl::Environment), Disabled, } @@ -27,7 +27,7 @@ pub enum RwTransaction<'env> { Mdbx(mdbx_impl::RwTransaction<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::RwTransaction<'env>), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] Sqlite(sqlite_impl::RwTransaction<'env>), Disabled(PhantomData<&'env ()>), } @@ -38,7 +38,7 @@ pub enum Database<'env> { Mdbx(mdbx_impl::Database<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Database<'env>), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] Sqlite(sqlite_impl::Database<'env>), Disabled(PhantomData<&'env ()>), } @@ -62,7 +62,7 @@ pub enum Cursor<'env> { Mdbx(mdbx_impl::Cursor<'env>), #[cfg(feature = "lmdb")] Lmdb(lmdb_impl::Cursor<'env>), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] Sqlite(sqlite_impl::Cursor<'env>), Disabled(PhantomData<&'env ()>), } @@ -77,8 +77,10 @@ impl Environment { DatabaseBackend::Mdbx => mdbx_impl::Environment::new(config).map(Environment::Mdbx), #[cfg(feature = "lmdb")] DatabaseBackend::Lmdb => lmdb_impl::Environment::new(config).map(Environment::Lmdb), - #[cfg(feature = "rusqlite")] - DatabaseBackend::Sqlite => sqlite_impl::Environment::new(config).map(Environment::Sqlite), + #[cfg(feature = "sqlite")] + DatabaseBackend::Sqlite => { + sqlite_impl::Environment::new(config).map(Environment::Sqlite) + } DatabaseBackend::Disabled => Err(Error::SlasherDatabaseBackendDisabled), } } @@ -89,6 +91,8 @@ impl Environment { Self::Mdbx(env) => env.create_databases(), #[cfg(feature = "lmdb")] Self::Lmdb(env) => env.create_databases(), + #[cfg(feature = "sqlite")] + Self::Sqlite(env) => env.create_databases(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -99,6 +103,8 @@ impl Environment { Self::Mdbx(env) => env.begin_rw_txn().map(RwTransaction::Mdbx), #[cfg(feature = "lmdb")] Self::Lmdb(env) => env.begin_rw_txn().map(RwTransaction::Lmdb), + #[cfg(feature = "sqlite")] + Self::Sqlite(env) => env.begin_rw_txn().map(RwTransaction::Sqlite), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -110,6 +116,8 @@ impl Environment { Self::Mdbx(env) => env.filenames(config), #[cfg(feature = "lmdb")] Self::Lmdb(env) => env.filenames(config), + #[cfg(feature = "sqlite")] + Self::Sqlite(env) => env.filenames(config), _ => vec![], } } @@ -126,7 +134,7 @@ impl<'env> RwTransaction<'env> { (Self::Mdbx(txn), Database::Mdbx(db)) => txn.get(db, key), #[cfg(feature = "lmdb")] (Self::Lmdb(txn), Database::Lmdb(db)) => txn.get(db, key), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] (Self::Sqlite(txn), Database::Sqlite(db)) => txn.get(db, key), _ => Err(Error::MismatchedDatabaseVariant), } @@ -143,6 +151,8 @@ impl<'env> RwTransaction<'env> { (Self::Mdbx(txn), Database::Mdbx(db)) => txn.put(db, key, value), #[cfg(feature = "lmdb")] (Self::Lmdb(txn), Database::Lmdb(db)) => txn.put(db, key, value), + #[cfg(feature = "sqlite")] + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.put(db, key, value), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -153,16 +163,20 @@ impl<'env> RwTransaction<'env> { (Self::Mdbx(txn), Database::Mdbx(db)) => txn.del(db, key), #[cfg(feature = "lmdb")] (Self::Lmdb(txn), Database::Lmdb(db)) => txn.del(db, key), + #[cfg(feature = "sqlite")] + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.del(db, key), _ => Err(Error::MismatchedDatabaseVariant), } } - pub fn cursor<'a>(&'a mut self, db: &Database) -> Result, Error> { + pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result, Error> { match (self, db) { #[cfg(feature = "mdbx")] (Self::Mdbx(txn), Database::Mdbx(db)) => txn.cursor(db).map(Cursor::Mdbx), #[cfg(feature = "lmdb")] (Self::Lmdb(txn), Database::Lmdb(db)) => txn.cursor(db).map(Cursor::Lmdb), + #[cfg(feature = "sqlite")] + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.cursor(db).map(Cursor::Sqlite), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -173,6 +187,8 @@ impl<'env> RwTransaction<'env> { Self::Mdbx(txn) => txn.commit(), #[cfg(feature = "lmdb")] Self::Lmdb(txn) => txn.commit(), + #[cfg(feature = "sqlite")] + Self::Sqlite(txn) => txn.commit(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -186,6 +202,8 @@ impl<'env> Cursor<'env> { Cursor::Mdbx(cursor) => cursor.first_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.first_key(), + #[cfg(feature = "sqlite")] + Cursor::Sqlite(cursor) => cursor.first_key(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -197,6 +215,8 @@ impl<'env> Cursor<'env> { Cursor::Mdbx(cursor) => cursor.last_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.last_key(), + #[cfg(feature = "sqlite")] + Cursor::Sqlite(cursor) => cursor.last_key(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -207,6 +227,8 @@ impl<'env> Cursor<'env> { Cursor::Mdbx(cursor) => cursor.next_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.next_key(), + #[cfg(feature = "sqlite")] + Cursor::Sqlite(cursor) => cursor.next_key(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -218,6 +240,8 @@ impl<'env> Cursor<'env> { Cursor::Mdbx(cursor) => cursor.get_current(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.get_current(), + #[cfg(feature = "sqlite")] + Cursor::Sqlite(cursor) => cursor.get_current(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -228,6 +252,8 @@ impl<'env> Cursor<'env> { Cursor::Mdbx(cursor) => cursor.delete_current(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.delete_current(), + #[cfg(feature = "sqlite")] + Cursor::Sqlite(cursor) => cursor.delete_current(), _ => Err(Error::MismatchedDatabaseVariant), } } @@ -238,6 +264,8 @@ impl<'env> Cursor<'env> { Self::Mdbx(cursor) => cursor.put(key, value), #[cfg(feature = "lmdb")] Self::Lmdb(cursor) => cursor.put(key, value), + #[cfg(feature = "sqlite")] + Self::Sqlite(cursor) => cursor.put(key, value), _ => Err(Error::MismatchedDatabaseVariant), } } diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 93ef73471ca..125fbb89195 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,10 +1,11 @@ -#![cfg(feature = "rusqlite")] +#![cfg(feature = "sqlite")] +use base64::{engine::general_purpose, Engine as _}; +use rusqlite::{params, OptionalExtension, ToSql}; use std::fmt; use std::{ borrow::{Borrow, Cow}, path::PathBuf, }; -use base64::{Engine as _, engine::{general_purpose}}; use crate::{ database::{ @@ -16,10 +17,23 @@ use crate::{ const BASE_DB: &str = "slasher_db"; +impl<'env> Database<'env> {} + +struct QueryResult { + id: u8, + value: Vec, +} + +struct FullQueryResult { + id: u8, + key: Vec, + value: Vec, +} + #[derive(Debug)] pub struct Environment { _db_count: usize, - db: rusqlite::Connection, + db_path: String, } #[derive(Debug)] @@ -44,34 +58,23 @@ impl<'env> Drop for RwTransaction<'env> { #[derive(Debug)] pub struct Cursor<'env> { db: &'env Database<'env>, - current_key: Option>, + current_id: Option, } -/* -pub struct WriteTransaction<'env>(redb::WriteTransaction<'env>); - -impl<'env> fmt::Debug for WriteTransaction<'env> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "InternalStruct {{ /* fields and their values */ }}") - } -} -*/ - impl Environment { pub fn new(config: &Config) -> Result { let db_path = match config.database_path.join(BASE_DB).as_path().to_str() { Some(path) => path.to_string(), None => "".to_string(), }; - let database = rusqlite::Connection::open(db_path)?; - + let _ = rusqlite::Connection::open(&db_path)?; + Ok(Environment { _db_count: MAX_NUM_DBS, - db: database, + db_path, }) } - pub fn create_databases(&self) -> Result { let indexed_attestation_db = self.create_table(INDEXED_ATTESTATION_DB)?; let indexed_attestation_id_db = self.create_table(INDEXED_ATTESTATION_ID_DB)?; @@ -100,21 +103,25 @@ impl Environment { &'env self, table_name: &'env str, ) -> Result, Error> { - - let create_table_command = format!( + let create_table_command = format!( "CREATE TABLE {} ( - key INTEGER PRIMARY KEY, - value INTEGER - );", table_name); + id INTEGER PRIMARY KEY AUTOINCREMENT, + key BLOB UNIQUE, + value BLOB + );", + table_name + ); - self.db.execute(&create_table_command, ())?; + let database = rusqlite::Connection::open(&self.db_path)?; + + database.execute(&create_table_command, ())?; Ok(crate::Database::Sqlite(Database { table_name, env: self, })) } - + pub fn filenames(&self, config: &Config) -> Vec { vec![config.database_path.join(BASE_DB)] } @@ -126,197 +133,187 @@ impl Environment { } } - -impl<'env> Database<'env> { -} - -struct QueryResult { - value: Vec -} - impl<'env> RwTransaction<'env> { pub fn get + ?Sized>( &'env self, db: &Database<'env>, key: &K, ) -> Result>, Error> { - let encoded_key = general_purpose::STANDARD.encode(&key); - let query_statement = format!("SELECT value FROM {} where key = {}", db.table_name, encoded_key); - let database = &db.env.db; + let query_statement = format!("SELECT * FROM {} where key =:key;", db.table_name); + let database = rusqlite::Connection::open(&db.env.db_path)?; let mut stmt = database.prepare(&query_statement)?; - - let result = stmt.query_row([], |row| { - Ok(QueryResult { - value: row.get(0)?, + + let query_result = stmt + .query_row([key.as_ref()], |row| { + Ok(FullQueryResult { + id: row.get(0)?, + key: row.get(1)?, + value: row.get(2)?, + }) }) - })?; + .optional()?; - Ok(Some(Cow::from(result.value))) + match query_result { + Some(result) => Ok(Some(Cow::from(result.value))), + None => Ok(None), + } } -/* + pub fn put, V: AsRef<[u8]>>( &mut self, db: &Database, key: K, value: V, ) -> Result<(), Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(db.table_name); - let database = &db.env.db; - let tx = database.begin_write()?; - { - let mut table = tx.open_table(table_definition)?; - table.insert(key.as_ref().borrow(), value.as_ref().borrow())?; - } - tx.commit()?; + let insert_statement = format!( + "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", + db.table_name + ); + let database = rusqlite::Connection::open(&db.env.db_path)?; + let _ = database.execute(&insert_statement, params![key.as_ref(), value.as_ref()])?; Ok(()) } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(db.table_name); - let database = &db.env.db; - let tx = database.begin_write()?; - { - let mut table = tx.open_table(table_definition)?; - table.remove(key.as_ref().borrow())?; - } - tx.commit()?; + let encoded_key = general_purpose::STANDARD.encode(&key); + let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); + let database = rusqlite::Connection::open(&db.env.db_path)?; + let _ = database.execute(&delete_statement, [encoded_key])?; Ok(()) } - pub fn cursor<'a>(&'a mut self, db: &'a Database<'a>) -> Result, Error> { + pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result, Error> { Ok(Cursor { db, - current_key: None, + current_id: None, }) } pub fn commit(self) -> Result<(), Error> { Ok(()) - - match self.txn.unwrap().commit() { - Ok(_) => { - self.txn = None; - Ok(()) - } - Err(_) => panic!(), - } } - */ } -/* + impl<'env> Cursor<'env> { pub fn first_key(&mut self) -> Result, Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_read()?; - - let table = tx.open_table(table_definition)?; - - let first = table - .iter()? - .next() - .map(|x| x.map(|(key, _)| (key.value()).to_vec())); - - if let Some(owned_key) = first { - let owned_key = owned_key?; - self.current_key = Some(Cow::from(owned_key)); - Ok(self.current_key.clone()) - } else { - Ok(None) + let query_statement = format!("SELECT id, key FROM {} ORDER BY id ASC", self.db.table_name); + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let mut stmt = database.prepare(&query_statement)?; + let mut query_result = stmt.query_map([], |row| { + Ok(QueryResult { + id: row.get(0)?, + value: row.get(1)?, + }) + })?; + + match query_result.next() { + Some(result) => { + let r: QueryResult = result?; + let key = Cow::from(r.value); + self.current_id = Some(r.id.clone()); + Ok(Some(key)) + } + None => Ok(None), } } pub fn last_key(&mut self) -> Result>, Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_read()?; - - let table = tx.open_table(table_definition)?; - - let last = table - .iter()? - .rev() - .next_back() - .map(|x| x.map(|(key, _)| (key.value()).to_vec())); - - if let Some(owned_key) = last { - let owned_key = owned_key?; - self.current_key = Some(Cow::from(owned_key)); - return Ok(self.current_key.clone()); + let query_statement = format!("SELECT * FROM {} ORDER BY id ASC", self.db.table_name); + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let mut stmt = database.prepare(&query_statement)?; + + let mut query_result = stmt.query_map([], |row| { + Ok(FullQueryResult { + id: row.get(0)?, + key: row.get(1)?, + value: row.get(2)?, + }) + })?; + + match query_result.last() { + Some(result) => { + let r: FullQueryResult = result?; + let key = Cow::from(r.key); + self.current_id = Some(r.id.clone()); + Ok(Some(key)) + } + None => Ok(None), } - Ok(None) } pub fn next_key(&mut self) -> Result>, Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_read()?; - - if let Some(current_key) = &self.current_key.clone() { - let range: std::ops::RangeFrom<&[u8]> = current_key..; - let table = tx.open_table(table_definition)?; - - let next = table - .range(range)? - .next() - .map(|x| x.map(|(key, _)| (key.value()).to_vec())); - - if let Some(owned_key) = next { - let owned_key = owned_key?; - self.current_key = Some(Cow::from(owned_key)); - return Ok(self.current_key.clone()); + let mut query_statement = "".to_string(); + if let Some(current_id) = &self.current_id { + query_statement = format!( + "SELECT id, key FROM {} where id > {} ORDER BY id ASC", + self.db.table_name, current_id + ); + } else { + query_statement = format!("SELECT id, key FROM {} ORDER BY id ASC", self.db.table_name); + } + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let mut stmt = database.prepare(&query_statement)?; + + let mut query_result = stmt.query_map([], |row| { + Ok(QueryResult { + id: row.get(0)?, + value: row.get(1)?, + }) + })?; + + match query_result.next() { + Some(result) => { + let r: QueryResult = result?; + let key = Cow::from(r.value); + self.current_id = Some(r.id.clone()); + Ok(Some(key)) } + None => Ok(None), } - Ok(None) } pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { - if let Some(key) = &self.current_key { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_read()?; - let table = tx.open_table(table_definition)?; - let result = table.get(key.as_ref())?; - - if let Some(access_guard) = result { - let value = access_guard.value().to_vec().clone(); - return Ok(Some((key.clone(), Cow::from(value)))); + if let Some(current_id) = &self.current_id { + let query_statement = format!( + "SELECT id, key, value FROM {} where id=?1", + self.db.table_name + ); + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let mut stmt = database.prepare(&query_statement)?; + let query_result = stmt + .query_row([current_id], |row| { + Ok(FullQueryResult { + id: row.get(0)?, + key: row.get(1)?, + value: row.get(2)?, + }) + }) + .optional()?; + + if let Some(result) = query_result { + return Ok(Some((Cow::from(result.key), Cow::from(result.value)))); } } Ok(None) } pub fn delete_current(&mut self) -> Result<(), Error> { - if let Some(key) = &self.current_key { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_write()?; - { - let mut table = tx.open_table(table_definition)?; - table.remove(key.as_ref())?; - } - tx.commit()?; + if let Some(current_id) = &self.current_id { + let delete_statement = format!("DELETE FROM {} WHERE id=?1", self.db.table_name); + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let _ = database.execute(&delete_statement, [current_id])?; + self.current_id = None; } Ok(()) } pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { - let table_definition: TableDefinition<'_, &[u8], &[u8]> = - TableDefinition::new(self.db.table_name); - let database = &self.db.env.db; - let tx = database.begin_write()?; - { - let mut table = tx.open_table(table_definition)?; - table.insert(key.as_ref().borrow(), value.as_ref().borrow())?; - } - tx.commit()?; + let insert_statement = format!( + "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", + self.db.table_name + ); + let database = rusqlite::Connection::open(&self.db.env.db_path)?; + let _ = database.execute(&insert_statement, params![key.as_ref(), value.as_ref()])?; Ok(()) } -}*/ \ No newline at end of file +} diff --git a/slasher/src/error.rs b/slasher/src/error.rs index a423d7a7e7b..c925706ee74 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -8,7 +8,7 @@ pub enum Error { DatabaseMdbxError(mdbx::Error), #[cfg(feature = "lmdb")] DatabaseLmdbError(lmdb::Error), - #[cfg(feature = "rusqlite")] + #[cfg(feature = "sqlite")] DatabaseSqliteError(rusqlite::Error), SlasherDatabaseBackendDisabled, MismatchedDatabaseVariant, @@ -90,7 +90,7 @@ impl From for Error { } } -#[cfg(feature = "rusqlite")] +#[cfg(feature = "sqlite")] impl From for Error { fn from(e: rusqlite::Error) -> Self { Error::DatabaseSqliteError(e.into()) diff --git a/slasher/tests/backend.rs b/slasher/tests/backend.rs index fd1a6ae14f6..4c91eccdfbc 100644 --- a/slasher/tests/backend.rs +++ b/slasher/tests/backend.rs @@ -1,11 +1,11 @@ -#![cfg(feature = "lmdb")] +#![cfg(any(feature = "mdbx", feature = "lmdb"))] use slasher::{config::MDBX_DATA_FILENAME, Config, DatabaseBackend, DatabaseBackendOverride}; use std::fs::File; use tempfile::tempdir; #[test] -#[cfg(all(feature = "mdbx", feature = "lmdb"))] +#[cfg(all(feature = "mdbx", feature = "lmdb", feature = "sqlite"))] fn override_no_existing_db() { let tempdir = tempdir().unwrap(); let mut config = Config::new(tempdir.path().into()); @@ -41,7 +41,7 @@ fn no_override_with_existing_mdbx_db() { } #[test] -#[cfg(all(not(feature = "mdbx"), feature = "lmdb"))] +#[cfg(all(not(feature = "mdbx"), feature = "lmdb", not(feature = "sqlite")))] fn failed_override_with_existing_mdbx_db() { let tempdir = tempdir().unwrap(); let mut config = Config::new(tempdir.path().into()); diff --git a/slasher/tests/proposer_slashings.rs b/slasher/tests/proposer_slashings.rs index 3b7b8ed583c..13a3edcbbca 100644 --- a/slasher/tests/proposer_slashings.rs +++ b/slasher/tests/proposer_slashings.rs @@ -1,4 +1,4 @@ -#![cfg(any(feature = "mdbx", feature = "lmdb"))] +#![cfg(any(feature = "mdbx", feature = "lmdb", feature = "sqlite"))] use logging::test_logger; use slasher::{ @@ -18,17 +18,22 @@ fn empty_pruning() { #[test] fn block_pruning() { + println!("START"); let slots_per_epoch = E::slots_per_epoch(); let tempdir = tempdir().unwrap(); + println!("....whty"); let mut config = Config::new(tempdir.path().into()); + println!("....whty2"); config.chunk_size = 2; config.history_length = 2; - + println!("....whty3"); let slasher = Slasher::::open(config.clone(), test_logger()).unwrap(); + println!("....whty4"); let current_epoch = Epoch::from(2 * config.history_length); // Pruning the empty database should be safe. + slasher.prune_database(Epoch::new(0)).unwrap(); slasher.prune_database(current_epoch).unwrap(); diff --git a/slasher/tests/random.rs b/slasher/tests/random.rs index 968a4dbb688..d3de050443e 100644 --- a/slasher/tests/random.rs +++ b/slasher/tests/random.rs @@ -1,4 +1,4 @@ -#![cfg(any(feature = "mdbx", feature = "lmdb"))] +#![cfg(any(feature = "mdbx", feature = "lmdb", feature = "sqlite"))] use logging::test_logger; use rand::prelude::*; diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs index d2c876d3630..9ad9614f41c 100644 --- a/slasher/tests/wrap_around.rs +++ b/slasher/tests/wrap_around.rs @@ -1,4 +1,4 @@ -#![cfg(any(feature = "mdbx", feature = "lmdb"))] +#![cfg(any(feature = "mdbx", feature = "lmdb", feature = "sqlite"))] use logging::test_logger; use slasher::{test_utils::indexed_att, Config, Slasher}; From 9efa8c58ca9c530d5339eca417bdd28ec677c935 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 25 Aug 2023 21:52:02 +0300 Subject: [PATCH 04/17] remove uncessary import --- Cargo.lock | 1 - slasher/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db67f646ce9..b41afa4ecf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7090,7 +7090,6 @@ dependencies = [ name = "slasher" version = "0.1.0" dependencies = [ - "base64 0.21.2", "bincode", "byteorder", "ethereum_ssz", diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index a381221c891..ab50a57001c 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -32,7 +32,6 @@ tree_hash_derive = "0.5.0" types = { path = "../consensus/types" } strum = { version = "0.24.1", features = ["derive"] } -base64 = "0.21.2" rusqlite = { version = "*", optional = true} # MDBX is pinned at the last version with Windows and macOS support. From b439002a147a3f003bcdb771e76d277af33d3458 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 25 Aug 2023 21:56:37 +0300 Subject: [PATCH 05/17] remove import --- slasher/src/database/sqlite_impl.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 125fbb89195..67fc951c555 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,5 +1,4 @@ #![cfg(feature = "sqlite")] -use base64::{engine::general_purpose, Engine as _}; use rusqlite::{params, OptionalExtension, ToSql}; use std::fmt; use std::{ @@ -175,10 +174,9 @@ impl<'env> RwTransaction<'env> { } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { - let encoded_key = general_purpose::STANDARD.encode(&key); let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); let database = rusqlite::Connection::open(&db.env.db_path)?; - let _ = database.execute(&delete_statement, [encoded_key])?; + let _ = database.execute(&delete_statement, [key.as_ref()])?; Ok(()) } From 7217d95c3216ddf9c585dfa2abe8ad3ed64e6106 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 25 Aug 2023 22:11:13 +0300 Subject: [PATCH 06/17] cleanup --- slasher/src/database/sqlite_impl.rs | 6 +++--- slasher/src/error.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 67fc951c555..8676952c5b0 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -208,7 +208,7 @@ impl<'env> Cursor<'env> { Some(result) => { let r: QueryResult = result?; let key = Cow::from(r.value); - self.current_id = Some(r.id.clone()); + self.current_id = Some(r.id); Ok(Some(key)) } None => Ok(None), @@ -232,7 +232,7 @@ impl<'env> Cursor<'env> { Some(result) => { let r: FullQueryResult = result?; let key = Cow::from(r.key); - self.current_id = Some(r.id.clone()); + self.current_id = Some(r.id); Ok(Some(key)) } None => Ok(None), @@ -263,7 +263,7 @@ impl<'env> Cursor<'env> { Some(result) => { let r: QueryResult = result?; let key = Cow::from(r.value); - self.current_id = Some(r.id.clone()); + self.current_id = Some(r.id); Ok(Some(key)) } None => Ok(None), diff --git a/slasher/src/error.rs b/slasher/src/error.rs index c925706ee74..a6a5dfdd137 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -93,7 +93,7 @@ impl From for Error { #[cfg(feature = "sqlite")] impl From for Error { fn from(e: rusqlite::Error) -> Self { - Error::DatabaseSqliteError(e.into()) + Error::DatabaseSqliteError(e) } } From 2e440ccc0e05d3abd550a35138dfe747fd330ecd Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 17 Sep 2023 19:23:06 +0300 Subject: [PATCH 07/17] improved queries --- Cargo.lock | 20 ++++---- slasher/src/database/sqlite_impl.rs | 78 +++++++++++++---------------- slasher/tests/proposer_slashings.rs | 6 --- 3 files changed, 45 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b72b91f9c2..613e0cd043a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9068,16 +9068,6 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" -[[package]] -name = "whoami" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" -dependencies = [ - "wasm-bindgen", - "web-sys", -] - [[package]] name = "which" version = "4.4.0" @@ -9089,6 +9079,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "whoami" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "widestring" version = "0.4.3" diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 8676952c5b0..98549f3ea7a 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -19,14 +19,14 @@ const BASE_DB: &str = "slasher_db"; impl<'env> Database<'env> {} struct QueryResult { - id: u8, - value: Vec, + id: Option, + value: Option>, } struct FullQueryResult { - id: u8, - key: Vec, - value: Vec, + id: Option, + key: Option>, + value: Option>, } #[derive(Debug)] @@ -153,7 +153,7 @@ impl<'env> RwTransaction<'env> { .optional()?; match query_result { - Some(result) => Ok(Some(Cow::from(result.value))), + Some(result) => Ok(Some(Cow::from(result.value.unwrap_or_default()))), None => Ok(None), } } @@ -194,33 +194,31 @@ impl<'env> RwTransaction<'env> { impl<'env> Cursor<'env> { pub fn first_key(&mut self) -> Result, Error> { - let query_statement = format!("SELECT id, key FROM {} ORDER BY id ASC", self.db.table_name); + let query_statement = format!("SELECT MIN(id), key, value FROM {}", self.db.table_name); let database = rusqlite::Connection::open(&self.db.env.db_path)?; let mut stmt = database.prepare(&query_statement)?; - let mut query_result = stmt.query_map([], |row| { - Ok(QueryResult { + let mut query_result = stmt.query_row([], |row| { + Ok(FullQueryResult { id: row.get(0)?, - value: row.get(1)?, + key: row.get(1)?, + value: row.get(2)?, }) })?; - match query_result.next() { - Some(result) => { - let r: QueryResult = result?; - let key = Cow::from(r.value); - self.current_id = Some(r.id); - Ok(Some(key)) - } - None => Ok(None), - } + if query_result.id.is_some() { + let key = Cow::from(query_result.key.unwrap_or_default()); + self.current_id = query_result.id; + return Ok(Some(key)) + } + Ok(None) } pub fn last_key(&mut self) -> Result>, Error> { - let query_statement = format!("SELECT * FROM {} ORDER BY id ASC", self.db.table_name); + let query_statement = format!("SELECT MAX(id), key, value FROM {}", self.db.table_name); let database = rusqlite::Connection::open(&self.db.env.db_path)?; let mut stmt = database.prepare(&query_statement)?; - let mut query_result = stmt.query_map([], |row| { + let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { id: row.get(0)?, key: row.get(1)?, @@ -228,46 +226,40 @@ impl<'env> Cursor<'env> { }) })?; - match query_result.last() { - Some(result) => { - let r: FullQueryResult = result?; - let key = Cow::from(r.key); - self.current_id = Some(r.id); - Ok(Some(key)) - } - None => Ok(None), - } + if query_result.id.is_some() { + let key = Cow::from(query_result.key.unwrap_or_default()); + self.current_id = query_result.id; + return Ok(Some(key)) + } + Ok(None) } pub fn next_key(&mut self) -> Result>, Error> { let mut query_statement = "".to_string(); if let Some(current_id) = &self.current_id { query_statement = format!( - "SELECT id, key FROM {} where id > {} ORDER BY id ASC", + "SELECT MIN(id), key FROM {} where id > {}", self.db.table_name, current_id ); } else { - query_statement = format!("SELECT id, key FROM {} ORDER BY id ASC", self.db.table_name); + query_statement = format!("SELECT MIN(id), key FROM {}", self.db.table_name); } let database = rusqlite::Connection::open(&self.db.env.db_path)?; let mut stmt = database.prepare(&query_statement)?; - let mut query_result = stmt.query_map([], |row| { + let mut query_result = stmt.query_row([], |row| { Ok(QueryResult { id: row.get(0)?, value: row.get(1)?, }) })?; - match query_result.next() { - Some(result) => { - let r: QueryResult = result?; - let key = Cow::from(r.value); - self.current_id = Some(r.id); - Ok(Some(key)) - } - None => Ok(None), - } + if query_result.id.is_some() { + let key = Cow::from(query_result.value.unwrap_or_default()); + self.current_id = query_result.id; + return Ok(Some(key)) + } + Ok(None) } pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { @@ -289,7 +281,7 @@ impl<'env> Cursor<'env> { .optional()?; if let Some(result) = query_result { - return Ok(Some((Cow::from(result.key), Cow::from(result.value)))); + return Ok(Some((Cow::from(result.key.unwrap_or_default()), Cow::from(result.value.unwrap_or_default())))); } } Ok(None) diff --git a/slasher/tests/proposer_slashings.rs b/slasher/tests/proposer_slashings.rs index 13a3edcbbca..4554ac5a164 100644 --- a/slasher/tests/proposer_slashings.rs +++ b/slasher/tests/proposer_slashings.rs @@ -18,18 +18,12 @@ fn empty_pruning() { #[test] fn block_pruning() { - println!("START"); let slots_per_epoch = E::slots_per_epoch(); - let tempdir = tempdir().unwrap(); - println!("....whty"); let mut config = Config::new(tempdir.path().into()); - println!("....whty2"); config.chunk_size = 2; config.history_length = 2; - println!("....whty3"); let slasher = Slasher::::open(config.clone(), test_logger()).unwrap(); - println!("....whty4"); let current_epoch = Epoch::from(2 * config.history_length); // Pruning the empty database should be safe. From 230f12fb49b4cb3172bc9e86ff6829315c2798d9 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 17 Sep 2023 19:25:53 +0300 Subject: [PATCH 08/17] fmt --- slasher/src/database/sqlite_impl.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 98549f3ea7a..4513ece01e9 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -208,8 +208,8 @@ impl<'env> Cursor<'env> { if query_result.id.is_some() { let key = Cow::from(query_result.key.unwrap_or_default()); self.current_id = query_result.id; - return Ok(Some(key)) - } + return Ok(Some(key)); + } Ok(None) } @@ -229,8 +229,8 @@ impl<'env> Cursor<'env> { if query_result.id.is_some() { let key = Cow::from(query_result.key.unwrap_or_default()); self.current_id = query_result.id; - return Ok(Some(key)) - } + return Ok(Some(key)); + } Ok(None) } @@ -257,8 +257,8 @@ impl<'env> Cursor<'env> { if query_result.id.is_some() { let key = Cow::from(query_result.value.unwrap_or_default()); self.current_id = query_result.id; - return Ok(Some(key)) - } + return Ok(Some(key)); + } Ok(None) } @@ -281,7 +281,10 @@ impl<'env> Cursor<'env> { .optional()?; if let Some(result) = query_result { - return Ok(Some((Cow::from(result.key.unwrap_or_default()), Cow::from(result.value.unwrap_or_default())))); + return Ok(Some(( + Cow::from(result.key.unwrap_or_default()), + Cow::from(result.value.unwrap_or_default()), + ))); } } Ok(None) From 0cc3ffb84cdbfc89c644067b05e35d5ba28bd003 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 17 Sep 2023 21:20:01 +0300 Subject: [PATCH 09/17] improve query --- slasher/src/database/sqlite_impl.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 4513ece01e9..0ee32b899b5 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -19,12 +19,12 @@ const BASE_DB: &str = "slasher_db"; impl<'env> Database<'env> {} struct QueryResult { - id: Option, + id: Option, value: Option>, } struct FullQueryResult { - id: Option, + id: Option, key: Option>, value: Option>, } @@ -57,7 +57,7 @@ impl<'env> Drop for RwTransaction<'env> { #[derive(Debug)] pub struct Cursor<'env> { db: &'env Database<'env>, - current_id: Option, + current_id: Option, } impl Environment { @@ -103,7 +103,7 @@ impl Environment { table_name: &'env str, ) -> Result, Error> { let create_table_command = format!( - "CREATE TABLE {} ( + "CREATE TABLE IF NOT EXISTS {} ( id INTEGER PRIMARY KEY AUTOINCREMENT, key BLOB UNIQUE, value BLOB From f5a1a3222b76dbb77fe33849bd60421814ffc892 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 22 Sep 2023 14:46:35 +0300 Subject: [PATCH 10/17] stash statement --- slasher/src/database/sqlite_impl.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 0ee32b899b5..a601b8572ab 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -140,7 +140,7 @@ impl<'env> RwTransaction<'env> { ) -> Result>, Error> { let query_statement = format!("SELECT * FROM {} where key =:key;", db.table_name); let database = rusqlite::Connection::open(&db.env.db_path)?; - let mut stmt = database.prepare(&query_statement)?; + let mut stmt = database.prepare_cached(&query_statement)?; let query_result = stmt .query_row([key.as_ref()], |row| { @@ -169,14 +169,16 @@ impl<'env> RwTransaction<'env> { db.table_name ); let database = rusqlite::Connection::open(&db.env.db_path)?; - let _ = database.execute(&insert_statement, params![key.as_ref(), value.as_ref()])?; + let mut stmt = database.prepare_cached(&insert_statement)?; + stmt.execute(params![key.as_ref(), value.as_ref()])?; Ok(()) } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); let database = rusqlite::Connection::open(&db.env.db_path)?; - let _ = database.execute(&delete_statement, [key.as_ref()])?; + let mut stmt = database.prepare_cached(&delete_statement)?; + stmt.execute(params![key.as_ref()])?; Ok(()) } @@ -196,7 +198,7 @@ impl<'env> Cursor<'env> { pub fn first_key(&mut self) -> Result, Error> { let query_statement = format!("SELECT MIN(id), key, value FROM {}", self.db.table_name); let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare(&query_statement)?; + let mut stmt = database.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { id: row.get(0)?, @@ -216,7 +218,7 @@ impl<'env> Cursor<'env> { pub fn last_key(&mut self) -> Result>, Error> { let query_statement = format!("SELECT MAX(id), key, value FROM {}", self.db.table_name); let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare(&query_statement)?; + let mut stmt = database.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { @@ -245,7 +247,7 @@ impl<'env> Cursor<'env> { query_statement = format!("SELECT MIN(id), key FROM {}", self.db.table_name); } let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare(&query_statement)?; + let mut stmt = database.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(QueryResult { @@ -269,7 +271,7 @@ impl<'env> Cursor<'env> { self.db.table_name ); let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare(&query_statement)?; + let mut stmt = database.prepare_cached(&query_statement)?; let query_result = stmt .query_row([current_id], |row| { Ok(FullQueryResult { @@ -306,7 +308,8 @@ impl<'env> Cursor<'env> { self.db.table_name ); let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let _ = database.execute(&insert_statement, params![key.as_ref(), value.as_ref()])?; + let mut stmt = database.prepare_cached(&insert_statement)?; + stmt.execute(params![key.as_ref(), value.as_ref()])?; Ok(()) } } From e703c24f6a0818402deb791652ff8810c247bc6e Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 30 Sep 2023 10:01:12 +0300 Subject: [PATCH 11/17] sqlite use transaction --- Cargo.lock | 1 + consensus/types/src/indexed_attestation.rs | 5 + slasher/Cargo.toml | 1 + slasher/src/database.rs | 72 +++---- slasher/src/database/interface.rs | 90 ++++----- slasher/src/database/sqlite_impl.rs | 220 +++++++++++---------- 6 files changed, 192 insertions(+), 197 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9abac3be490..cb4ed1befe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7130,6 +7130,7 @@ version = "0.1.0" dependencies = [ "bincode", "byteorder", + "derivative", "ethereum_ssz", "ethereum_ssz_derive", "filesystem", diff --git a/consensus/types/src/indexed_attestation.rs b/consensus/types/src/indexed_attestation.rs index c59cbef307e..70656d66c59 100644 --- a/consensus/types/src/indexed_attestation.rs +++ b/consensus/types/src/indexed_attestation.rs @@ -40,6 +40,11 @@ impl IndexedAttestation { /// /// Spec v0.12.1 pub fn is_double_vote(&self, other: &Self) -> bool { + println!("{:?}", self.data.target.epoch); + println!("{:?}", other.data.target.epoch); + println!("{:?}", self.data); + println!("{:?}", other.data); + self.data.target.epoch == other.data.target.epoch && self.data != other.data } diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index efeb5cb5922..2602bc622d3 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -13,6 +13,7 @@ sqlite = ["dep:rusqlite"] [dependencies] bincode = { workspace = true } byteorder = { workspace = true } +derivative = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } flate2 = { version = "1.0.14", features = ["zlib"], default-features = false } diff --git a/slasher/src/database.rs b/slasher/src/database.rs index bc0e0e63851..d5ec7d4a232 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -448,9 +448,8 @@ impl SlasherDB { // Store the new indexed attestation at the end of the current table. let db = &self.databases.indexed_attestation_db; - let mut cursor = txn.cursor(db)?; - let indexed_att_id = match cursor.last_key()? { + let indexed_att_id = match txn.last_key(db)? { // First ID is 1 so that 0 can be used to represent `null` in `CompactAttesterRecord`. None => 1, Some(key_bytes) => IndexedAttestationId::parse(key_bytes)? + 1, @@ -459,8 +458,7 @@ impl SlasherDB { let attestation_key = IndexedAttestationId::new(indexed_att_id); let data = indexed_attestation.as_ssz_bytes(); - cursor.put(attestation_key.as_ref(), &data)?; - drop(cursor); + txn.put(db, attestation_key.as_ref(), &data)?; // Update the (epoch, hash) to ID mapping. self.put_indexed_attestation_id(txn, &id_key, attestation_key)?; @@ -692,27 +690,24 @@ impl SlasherDB { .saturating_sub(self.config.history_length) .start_slot(E::slots_per_epoch()); - let mut cursor = txn.cursor(&self.databases.proposers_db)?; + let db = &self.databases.proposers_db; // Position cursor at first key, bailing out if the database is empty. - if cursor.first_key()?.is_none() { + if txn.first_key(db)?.is_none() { return Ok(()); } - loop { - let (key_bytes, _) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?; - - let (slot, _) = ProposerKey::parse(key_bytes)?; + let should_delete = |key: &[u8]| -> Result { + let mut should_delete = false; + let (slot, _) = ProposerKey::parse(Cow::from(key))?; if slot < min_slot { - cursor.delete_current()?; - // End the loop if there is no next entry. - if cursor.next_key()?.is_none() { - break; - } - } else { - break; + should_delete = true; } - } + + Ok(should_delete) + }; + + txn.delete_while(&self.databases.proposers_db, should_delete)?; Ok(()) } @@ -726,38 +721,31 @@ impl SlasherDB { .saturating_add(1u64) .saturating_sub(self.config.history_length as u64); - // Collect indexed attestation IDs to delete. - let mut indexed_attestation_ids = vec![]; - - let mut cursor = txn.cursor(&self.databases.indexed_attestation_id_db)?; + let db = &self.databases.indexed_attestation_id_db; // Position cursor at first key, bailing out if the database is empty. - if cursor.first_key()?.is_none() { + if txn.first_key(db)?.is_none() { return Ok(()); } - loop { - let (key_bytes, value) = cursor - .get_current()? - .ok_or(Error::MissingIndexedAttestationIdKey)?; - - let (target_epoch, _) = IndexedAttestationIdKey::parse(key_bytes)?; - + let should_delete = |key: &[u8]| -> Result { + let (target_epoch, _) = IndexedAttestationIdKey::parse(Cow::from(key))?; if target_epoch < min_epoch { - indexed_attestation_ids.push(IndexedAttestationId::new( - IndexedAttestationId::parse(value)?, - )); + return Ok(true); + } - cursor.delete_current()?; + Ok(false) + }; - if cursor.next_key()?.is_none() { - break; - } - } else { - break; - } - } - drop(cursor); + let indexed_attestation_ids: Vec = txn + .delete_while(&self.databases.proposers_db, should_delete)? + .into_iter() + .map(|value| { + IndexedAttestationId::new( + IndexedAttestationId::parse(Cow::from(value)).unwrap_or_default(), + ) + }) + .collect(); // Delete the indexed attestations. // Optimisation potential: use a cursor here. diff --git a/slasher/src/database/interface.rs b/slasher/src/database/interface.rs index 87c6e1f01ad..46119322ed9 100644 --- a/slasher/src/database/interface.rs +++ b/slasher/src/database/interface.rs @@ -56,17 +56,6 @@ pub struct OpenDatabases<'env> { pub metadata_db: Database<'env>, } -#[derive(Debug)] -pub enum Cursor<'env> { - #[cfg(feature = "mdbx")] - Mdbx(mdbx_impl::Cursor<'env>), - #[cfg(feature = "lmdb")] - Lmdb(lmdb_impl::Cursor<'env>), - #[cfg(feature = "sqlite")] - Sqlite(sqlite_impl::Cursor<'env>), - Disabled(PhantomData<&'env ()>), -} - pub type Key<'a> = Cow<'a, [u8]>; pub type Value<'a> = Cow<'a, [u8]>; @@ -169,103 +158,92 @@ impl<'env> RwTransaction<'env> { } } - pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result, Error> { + pub fn first_key(&mut self, db: &Database) -> Result, Error> { match (self, db) { - #[cfg(feature = "mdbx")] - (Self::Mdbx(txn), Database::Mdbx(db)) => txn.cursor(db).map(Cursor::Mdbx), - #[cfg(feature = "lmdb")] - (Self::Lmdb(txn), Database::Lmdb(db)) => txn.cursor(db).map(Cursor::Lmdb), - #[cfg(feature = "sqlite")] - (Self::Sqlite(txn), Database::Sqlite(db)) => txn.cursor(db).map(Cursor::Sqlite), - _ => Err(Error::MismatchedDatabaseVariant), - } - } - - pub fn commit(self) -> Result<(), Error> { - match self { - #[cfg(feature = "mdbx")] - Self::Mdbx(txn) => txn.commit(), - #[cfg(feature = "lmdb")] - Self::Lmdb(txn) => txn.commit(), - #[cfg(feature = "sqlite")] - Self::Sqlite(txn) => txn.commit(), - _ => Err(Error::MismatchedDatabaseVariant), - } - } -} - -impl<'env> Cursor<'env> { - /// Return the first key in the current database while advancing the cursor's position. - pub fn first_key(&mut self) -> Result, Error> { - match self { #[cfg(feature = "mdbx")] Cursor::Mdbx(cursor) => cursor.first_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.first_key(), #[cfg(feature = "sqlite")] - Cursor::Sqlite(cursor) => cursor.first_key(), + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.first_key(db), _ => Err(Error::MismatchedDatabaseVariant), } } /// Return the last key in the current database while advancing the cursor's position. - pub fn last_key(&mut self) -> Result, Error> { - match self { + pub fn last_key(&mut self, db: &Database) -> Result, Error> { + match (self, db) { #[cfg(feature = "mdbx")] Cursor::Mdbx(cursor) => cursor.last_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.last_key(), #[cfg(feature = "sqlite")] - Cursor::Sqlite(cursor) => cursor.last_key(), + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.last_key(db), _ => Err(Error::MismatchedDatabaseVariant), } } - pub fn next_key(&mut self) -> Result, Error> { - match self { + pub fn next_key(&mut self, db: &Database) -> Result, Error> { + match (self, db) { #[cfg(feature = "mdbx")] Cursor::Mdbx(cursor) => cursor.next_key(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.next_key(), #[cfg(feature = "sqlite")] - Cursor::Sqlite(cursor) => cursor.next_key(), + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.next_key(db), _ => Err(Error::MismatchedDatabaseVariant), } } /// Get the key value pair at the current position. - pub fn get_current(&mut self) -> Result, Error> { - match self { + pub fn get_current(&mut self, db: &Database) -> Result, Error> { + match (self, db) { #[cfg(feature = "mdbx")] Cursor::Mdbx(cursor) => cursor.get_current(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.get_current(), #[cfg(feature = "sqlite")] - Cursor::Sqlite(cursor) => cursor.get_current(), + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.get_current(db), _ => Err(Error::MismatchedDatabaseVariant), } } - pub fn delete_current(&mut self) -> Result<(), Error> { - match self { + pub fn delete_current(&mut self, db: &Database) -> Result<(), Error> { + match (self, db) { #[cfg(feature = "mdbx")] Cursor::Mdbx(cursor) => cursor.delete_current(), #[cfg(feature = "lmdb")] Cursor::Lmdb(cursor) => cursor.delete_current(), #[cfg(feature = "sqlite")] - Cursor::Sqlite(cursor) => cursor.delete_current(), + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.delete_current(db), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn delete_while( + &mut self, + db: &Database, + f: impl Fn(&[u8]) -> Result, + ) -> Result>, Error> { + match (self, db) { + #[cfg(feature = "mdbx")] + (Self::Mdbx(txn), Database::Mdbx(db)) => txn.del(db, key), + #[cfg(feature = "lmdb")] + (Self::Lmdb(txn), Database::Lmdb(db)) => txn.del(db, key), + #[cfg(feature = "sqlite")] + (Self::Sqlite(txn), Database::Sqlite(db)) => txn.delete_while(db, f), _ => Err(Error::MismatchedDatabaseVariant), } } - pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { + pub fn commit(self) -> Result<(), Error> { match self { #[cfg(feature = "mdbx")] - Self::Mdbx(cursor) => cursor.put(key, value), + Self::Mdbx(txn) => txn.commit(), #[cfg(feature = "lmdb")] - Self::Lmdb(cursor) => cursor.put(key, value), + Self::Lmdb(txn) => txn.commit(), #[cfg(feature = "sqlite")] - Self::Sqlite(cursor) => cursor.put(key, value), + Self::Sqlite(txn) => txn.commit(), _ => Err(Error::MismatchedDatabaseVariant), } } diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index a601b8572ab..0298c162d34 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,6 +1,7 @@ #![cfg(feature = "sqlite")] -use rusqlite::{params, OptionalExtension, ToSql}; -use std::fmt; +use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection}; +use std::{fmt, collections::HashMap}; +use derivative::Derivative; use std::{ borrow::{Borrow, Cow}, path::PathBuf, @@ -20,7 +21,7 @@ impl<'env> Database<'env> {} struct QueryResult { id: Option, - value: Option>, + key: Option>, } struct FullQueryResult { @@ -33,6 +34,7 @@ struct FullQueryResult { pub struct Environment { _db_count: usize, db_path: String, + conn: Connection, } #[derive(Debug)] @@ -41,36 +43,27 @@ pub struct Database<'env> { table_name: &'env str, } -#[derive(Debug)] +#[derive(Derivative)] +#[derivative(Debug)] pub struct RwTransaction<'env> { - // txn: Option>, + db_path: String, + cursor: HashMap, + txn: Transaction<'env>, _phantom: PhantomData<&'env ()>, } -impl<'env> Drop for RwTransaction<'env> { - fn drop(&mut self) { - // Perform any necessary cleanup or resource deallocation here - // This code will be automatically executed when an instance of MyStruct goes out of scope. - } -} - -#[derive(Debug)] -pub struct Cursor<'env> { - db: &'env Database<'env>, - current_id: Option, -} - impl Environment { pub fn new(config: &Config) -> Result { let db_path = match config.database_path.join(BASE_DB).as_path().to_str() { Some(path) => path.to_string(), None => "".to_string(), }; - let _ = rusqlite::Connection::open(&db_path)?; + let conn = rusqlite::Connection::open(&db_path)?; Ok(Environment { _db_count: MAX_NUM_DBS, db_path, + conn }) } @@ -126,8 +119,13 @@ impl Environment { } pub fn begin_rw_txn(&self) -> Result { + + Ok(RwTransaction { _phantom: PhantomData, + db_path: self.db_path.clone(), + cursor: HashMap::new(), + txn: self.conn.unchecked_transaction()?, }) } } @@ -139,8 +137,7 @@ impl<'env> RwTransaction<'env> { key: &K, ) -> Result>, Error> { let query_statement = format!("SELECT * FROM {} where key =:key;", db.table_name); - let database = rusqlite::Connection::open(&db.env.db_path)?; - let mut stmt = database.prepare_cached(&query_statement)?; + let mut stmt = self.txn.prepare_cached(&query_statement)?; let query_result = stmt .query_row([key.as_ref()], |row| { @@ -154,7 +151,9 @@ impl<'env> RwTransaction<'env> { match query_result { Some(result) => Ok(Some(Cow::from(result.value.unwrap_or_default()))), - None => Ok(None), + None => { + Ok(None) + }, } } @@ -168,37 +167,28 @@ impl<'env> RwTransaction<'env> { "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", db.table_name ); - let database = rusqlite::Connection::open(&db.env.db_path)?; - let mut stmt = database.prepare_cached(&insert_statement)?; - stmt.execute(params![key.as_ref(), value.as_ref()])?; + self.txn.execute(&insert_statement, params![key.as_ref().to_owned(), value.as_ref().to_owned()])?; Ok(()) } - + pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); - let database = rusqlite::Connection::open(&db.env.db_path)?; - let mut stmt = database.prepare_cached(&delete_statement)?; - stmt.execute(params![key.as_ref()])?; + self.txn.execute(&delete_statement, params![key.as_ref().to_owned()])?; Ok(()) } - pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result, Error> { - Ok(Cursor { - db, - current_id: None, - }) - } - - pub fn commit(self) -> Result<(), Error> { + pub fn delete_current(&mut self, db: &Database) -> Result<(), Error> { + if let Some(current_id) = self.cursor.get(db.table_name) { + let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); + self.txn.execute(&delete_statement, params![current_id.to_owned()])?; + self.cursor.remove(db.table_name); + } Ok(()) } -} -impl<'env> Cursor<'env> { - pub fn first_key(&mut self) -> Result, Error> { - let query_statement = format!("SELECT MIN(id), key, value FROM {}", self.db.table_name); - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare_cached(&query_statement)?; + pub fn first_key(&mut self, db: &Database) -> Result, Error> { + let query_statement = format!("SELECT MIN(id), key, value FROM {}", db.table_name); + let mut stmt = self.txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { id: row.get(0)?, @@ -207,18 +197,17 @@ impl<'env> Cursor<'env> { }) })?; - if query_result.id.is_some() { - let key = Cow::from(query_result.key.unwrap_or_default()); - self.current_id = query_result.id; - return Ok(Some(key)); - } + if let Some(key) = query_result.key { + self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + return Ok(Some(Cow::from(key))); + } + Ok(None) } - pub fn last_key(&mut self) -> Result>, Error> { - let query_statement = format!("SELECT MAX(id), key, value FROM {}", self.db.table_name); - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare_cached(&query_statement)?; + pub fn last_key(&mut self, db: &Database) -> Result>, Error> { + let query_statement = format!("SELECT MAX(id), key, value FROM {}", db.table_name); + let mut stmt = self.txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { @@ -228,50 +217,67 @@ impl<'env> Cursor<'env> { }) })?; - if query_result.id.is_some() { - let key = Cow::from(query_result.key.unwrap_or_default()); - self.current_id = query_result.id; - return Ok(Some(key)); - } + if let Some(key) = query_result.key { + self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + return Ok(Some(Cow::from(key))); + } + Ok(None) } - pub fn next_key(&mut self) -> Result>, Error> { + pub fn next_key(&mut self, db: &Database) -> Result>, Error> { + let mut query_statement = "".to_string(); - if let Some(current_id) = &self.current_id { - query_statement = format!( - "SELECT MIN(id), key FROM {} where id > {}", - self.db.table_name, current_id - ); - } else { - query_statement = format!("SELECT MIN(id), key FROM {}", self.db.table_name); - } - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare_cached(&query_statement)?; - let mut query_result = stmt.query_row([], |row| { - Ok(QueryResult { - id: row.get(0)?, - value: row.get(1)?, - }) - })?; + let query_result = match self.cursor.get(db.table_name) { + Some(current_key) => { + query_statement = format!( + "SELECT MIN(id), key FROM {} where id >?1", + db.table_name + ); + + let mut stmt = self.txn.prepare_cached(&query_statement)?; + + let mut query_result = stmt.query_row(params![current_key], |row| { + Ok(QueryResult { + id: row.get(0)?, + key: row.get(1)?, + }) + })?; + + query_result + }, + None => { + query_statement = format!("SELECT MIN(id), key FROM {}", db.table_name); + + let mut stmt = self.txn.prepare_cached(&query_statement)?; + + let mut query_result = stmt.query_row([], |row| { + Ok(QueryResult { + id: row.get(0)?, + key: row.get(1)?, + }) + })?; + + query_result + }, + }; - if query_result.id.is_some() { - let key = Cow::from(query_result.value.unwrap_or_default()); - self.current_id = query_result.id; - return Ok(Some(key)); + if let Some(key) = query_result.key { + self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + return Ok(Some(Cow::from(key))); } + Ok(None) } - pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { - if let Some(current_id) = &self.current_id { + pub fn get_current(&mut self, db: &Database) -> Result, Value<'env>)>, Error> { + if let Some(current_id) = self.cursor.get(db.table_name) { let query_statement = format!( "SELECT id, key, value FROM {} where id=?1", - self.db.table_name + db.table_name ); - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare_cached(&query_statement)?; + let mut stmt = self.txn.prepare_cached(&query_statement)?; let query_result = stmt .query_row([current_id], |row| { Ok(FullQueryResult { @@ -292,24 +298,40 @@ impl<'env> Cursor<'env> { Ok(None) } - pub fn delete_current(&mut self) -> Result<(), Error> { - if let Some(current_id) = &self.current_id { - let delete_statement = format!("DELETE FROM {} WHERE id=?1", self.db.table_name); - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let _ = database.execute(&delete_statement, [current_id])?; - self.current_id = None; - } - Ok(()) + pub fn delete_while( + &mut self, + db: &Database, + f: impl Fn(&[u8]) -> Result, + ) -> Result>, Error> { + let mut deleted_values: Vec> = vec![]; + if let Some(current_key) = &self.cursor.get(db.table_name) { + let query_statement = format!( + "SELECT id, key, value FROM {} where id>=?1", + db.table_name + ); + let mut stmt = self.txn.prepare(&query_statement)?; + let rows = stmt.query_map(params![current_key], |row| { + Ok(FullQueryResult { + id: row.get(0)?, + key: row.get(1)?, + value: row.get(2)?, + }) + })?; + + for row in rows { + let query_result = row?; + + if f(&query_result.key.unwrap())? { + let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); + self.txn.execute(&delete_statement, params![query_result.id.unwrap()])?; + } + } + }; + Ok(deleted_values) } - pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { - let insert_statement = format!( - "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", - self.db.table_name - ); - let database = rusqlite::Connection::open(&self.db.env.db_path)?; - let mut stmt = database.prepare_cached(&insert_statement)?; - stmt.execute(params![key.as_ref(), value.as_ref()])?; + pub fn commit(mut self) -> Result<(), Error> { + self.txn.commit()?; Ok(()) } } From c676dec655a5fd5337c480daccd6cf1bba166c71 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 30 Sep 2023 10:28:48 +0300 Subject: [PATCH 12/17] fixes --- consensus/types/src/indexed_attestation.rs | 5 ----- slasher/src/database/sqlite_impl.rs | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/consensus/types/src/indexed_attestation.rs b/consensus/types/src/indexed_attestation.rs index 70656d66c59..c59cbef307e 100644 --- a/consensus/types/src/indexed_attestation.rs +++ b/consensus/types/src/indexed_attestation.rs @@ -40,11 +40,6 @@ impl IndexedAttestation { /// /// Spec v0.12.1 pub fn is_double_vote(&self, other: &Self) -> bool { - println!("{:?}", self.data.target.epoch); - println!("{:?}", other.data.target.epoch); - println!("{:?}", self.data); - println!("{:?}", other.data); - self.data.target.epoch == other.data.target.epoch && self.data != other.data } diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 0298c162d34..1b5cb5ad9be 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -120,7 +120,7 @@ impl Environment { pub fn begin_rw_txn(&self) -> Result { - + Ok(RwTransaction { _phantom: PhantomData, db_path: self.db_path.clone(), @@ -330,7 +330,7 @@ impl<'env> RwTransaction<'env> { Ok(deleted_values) } - pub fn commit(mut self) -> Result<(), Error> { + pub fn commit(self) -> Result<(), Error> { self.txn.commit()?; Ok(()) } From 2f8a0f2e436e82dab904cdadd6ce5a731f7d0692 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 30 Sep 2023 22:06:32 +0300 Subject: [PATCH 13/17] r2d2 --- Cargo.lock | 2 ++ slasher/Cargo.toml | 2 ++ slasher/file.db | 0 slasher/src/database/interface.rs | 1 + slasher/src/database/sqlite_impl.rs | 14 ++++++++++---- 5 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 slasher/file.db diff --git a/Cargo.lock b/Cargo.lock index fcf5111db93..f4a1d8fdcc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7144,6 +7144,8 @@ dependencies = [ "lru 0.7.8", "maplit", "parking_lot 0.12.1", + "r2d2", + "r2d2_sqlite", "rand", "rayon", "rusqlite", diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index 2602bc622d3..fa5e47a80ba 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -34,6 +34,8 @@ types = { workspace = true } strum = { workspace = true } rusqlite = { version = "*", optional = true} +r2d2 = "0.8.10" +r2d2_sqlite = "0.21.0" # MDBX is pinned at the last version with Windows and macOS support. mdbx = { package = "libmdbx", git = "https://github.com/sigp/libmdbx-rs", tag = "v0.1.4", optional = true } diff --git a/slasher/file.db b/slasher/file.db new file mode 100644 index 00000000000..e69de29bb2d diff --git a/slasher/src/database/interface.rs b/slasher/src/database/interface.rs index 46119322ed9..30b63096d7a 100644 --- a/slasher/src/database/interface.rs +++ b/slasher/src/database/interface.rs @@ -88,6 +88,7 @@ impl Environment { pub fn begin_rw_txn(&self) -> Result { match self { + /* */ #[cfg(feature = "mdbx")] Self::Mdbx(env) => env.begin_rw_txn().map(RwTransaction::Mdbx), #[cfg(feature = "lmdb")] diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 1b5cb5ad9be..2e45e07da15 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,4 +1,6 @@ #![cfg(feature = "sqlite")] +use r2d2::PooledConnection; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection}; use std::{fmt, collections::HashMap}; use derivative::Derivative; @@ -34,7 +36,7 @@ struct FullQueryResult { pub struct Environment { _db_count: usize, db_path: String, - conn: Connection, + conn: PooledConnection } #[derive(Debug)] @@ -58,7 +60,9 @@ impl Environment { Some(path) => path.to_string(), None => "".to_string(), }; - let conn = rusqlite::Connection::open(&db_path)?; + let manager = SqliteConnectionManager::file(&db_path); + let pool = r2d2::Pool::builder().build(manager).unwrap(); + let conn = pool.get().unwrap(); Ok(Environment { _db_count: MAX_NUM_DBS, @@ -114,13 +118,15 @@ impl Environment { })) } + pub fn db_path(&self) -> String { + return self.db_path.clone(); + } + pub fn filenames(&self, config: &Config) -> Vec { vec![config.database_path.join(BASE_DB)] } pub fn begin_rw_txn(&self) -> Result { - - Ok(RwTransaction { _phantom: PhantomData, db_path: self.db_path.clone(), From 6b94d1191a2a7dae0025bd2e95063a24befc3d2d Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 10 Oct 2023 00:23:17 +0300 Subject: [PATCH 14/17] pragma --- slasher/src/database/sqlite_impl.rs | 65 +++++++++++++++++++---------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 2e45e07da15..f465d7e799d 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,5 +1,5 @@ #![cfg(feature = "sqlite")] -use r2d2::PooledConnection; +use r2d2::{PooledConnection, Pool}; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection}; use std::{fmt, collections::HashMap}; @@ -36,7 +36,7 @@ struct FullQueryResult { pub struct Environment { _db_count: usize, db_path: String, - conn: PooledConnection + pool: Pool } #[derive(Debug)] @@ -50,7 +50,7 @@ pub struct Database<'env> { pub struct RwTransaction<'env> { db_path: String, cursor: HashMap, - txn: Transaction<'env>, + conn: PooledConnection, _phantom: PhantomData<&'env ()>, } @@ -62,12 +62,11 @@ impl Environment { }; let manager = SqliteConnectionManager::file(&db_path); let pool = r2d2::Pool::builder().build(manager).unwrap(); - let conn = pool.get().unwrap(); Ok(Environment { _db_count: MAX_NUM_DBS, db_path, - conn + pool }) } @@ -127,15 +126,20 @@ impl Environment { } pub fn begin_rw_txn(&self) -> Result { + + let conn: PooledConnection = self.pool.get().unwrap(); + conn.pragma_update(None, "journal_mode", "wal"); + conn.pragma_update(None, "synchronous", "NORMAL"); Ok(RwTransaction { _phantom: PhantomData, db_path: self.db_path.clone(), cursor: HashMap::new(), - txn: self.conn.unchecked_transaction()?, + conn, }) } } + impl<'env> RwTransaction<'env> { pub fn get + ?Sized>( &'env self, @@ -143,7 +147,9 @@ impl<'env> RwTransaction<'env> { key: &K, ) -> Result>, Error> { let query_statement = format!("SELECT * FROM {} where key =:key;", db.table_name); - let mut stmt = self.txn.prepare_cached(&query_statement)?; + + let txn = self.conn.unchecked_transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let query_result = stmt .query_row([key.as_ref()], |row| { @@ -173,20 +179,26 @@ impl<'env> RwTransaction<'env> { "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", db.table_name ); - self.txn.execute(&insert_statement, params![key.as_ref().to_owned(), value.as_ref().to_owned()])?; + let txn = self.conn.transaction()?; + txn.execute(&insert_statement, params![key.as_ref().to_owned(), value.as_ref().to_owned()])?; + txn.commit()?; Ok(()) } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); - self.txn.execute(&delete_statement, params![key.as_ref().to_owned()])?; + let txn = self.conn.transaction()?; + txn.execute(&delete_statement, params![key.as_ref().to_owned()])?; + txn.commit()?; Ok(()) } pub fn delete_current(&mut self, db: &Database) -> Result<(), Error> { if let Some(current_id) = self.cursor.get(db.table_name) { let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); - self.txn.execute(&delete_statement, params![current_id.to_owned()])?; + let txn = self.conn.transaction()?; + txn.execute(&delete_statement, params![current_id.to_owned()])?; + txn.commit()?; self.cursor.remove(db.table_name); } Ok(()) @@ -194,7 +206,8 @@ impl<'env> RwTransaction<'env> { pub fn first_key(&mut self, db: &Database) -> Result, Error> { let query_statement = format!("SELECT MIN(id), key, value FROM {}", db.table_name); - let mut stmt = self.txn.prepare_cached(&query_statement)?; + let txn = self.conn.transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { id: row.get(0)?, @@ -213,7 +226,8 @@ impl<'env> RwTransaction<'env> { pub fn last_key(&mut self, db: &Database) -> Result>, Error> { let query_statement = format!("SELECT MAX(id), key, value FROM {}", db.table_name); - let mut stmt = self.txn.prepare_cached(&query_statement)?; + let txn = self.conn.transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { @@ -241,8 +255,8 @@ impl<'env> RwTransaction<'env> { "SELECT MIN(id), key FROM {} where id >?1", db.table_name ); - - let mut stmt = self.txn.prepare_cached(&query_statement)?; + let txn = self.conn.transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row(params![current_key], |row| { Ok(QueryResult { @@ -255,8 +269,8 @@ impl<'env> RwTransaction<'env> { }, None => { query_statement = format!("SELECT MIN(id), key FROM {}", db.table_name); - - let mut stmt = self.txn.prepare_cached(&query_statement)?; + let txn = self.conn.transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(QueryResult { @@ -283,7 +297,8 @@ impl<'env> RwTransaction<'env> { "SELECT id, key, value FROM {} where id=?1", db.table_name ); - let mut stmt = self.txn.prepare_cached(&query_statement)?; + let txn = self.conn.transaction()?; + let mut stmt = txn.prepare_cached(&query_statement)?; let query_result = stmt .query_row([current_id], |row| { Ok(FullQueryResult { @@ -315,7 +330,8 @@ impl<'env> RwTransaction<'env> { "SELECT id, key, value FROM {} where id>=?1", db.table_name ); - let mut stmt = self.txn.prepare(&query_statement)?; + + let mut stmt = self.conn.prepare(&query_statement)?; let rows = stmt.query_map(params![current_key], |row| { Ok(FullQueryResult { id: row.get(0)?, @@ -323,21 +339,24 @@ impl<'env> RwTransaction<'env> { value: row.get(2)?, }) })?; - + let txn = self.conn.unchecked_transaction()?; for row in rows { let query_result = row?; if f(&query_result.key.unwrap())? { let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); - self.txn.execute(&delete_statement, params![query_result.id.unwrap()])?; + txn.execute(&delete_statement, params![query_result.id.unwrap()])?; } } + + txn.commit()?; }; Ok(deleted_values) } - pub fn commit(self) -> Result<(), Error> { - self.txn.commit()?; + pub fn commit(mut self) -> Result<(), Error> { + let txn = self.conn.transaction()?; + txn.commit()?; Ok(()) } -} +} \ No newline at end of file From 2a15f53515e79aca07d46d2344236e7b59af60e7 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 10 Oct 2023 22:11:20 +0300 Subject: [PATCH 15/17] prepare cache statement --- slasher/src/database/sqlite_impl.rs | 53 +++++++++++++---------------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index f465d7e799d..930aef74625 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,7 +1,7 @@ #![cfg(feature = "sqlite")] use r2d2::{PooledConnection, Pool}; use r2d2_sqlite::SqliteConnectionManager; -use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection}; +use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection, named_params}; use std::{fmt, collections::HashMap}; use derivative::Derivative; use std::{ @@ -148,11 +148,10 @@ impl<'env> RwTransaction<'env> { ) -> Result>, Error> { let query_statement = format!("SELECT * FROM {} where key =:key;", db.table_name); - let txn = self.conn.unchecked_transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + let mut stmt = self.conn.prepare_cached(&query_statement)?; let query_result = stmt - .query_row([key.as_ref()], |row| { + .query_row(named_params![":key": key.as_ref()], |row| { Ok(FullQueryResult { id: row.get(0)?, key: row.get(1)?, @@ -176,29 +175,26 @@ impl<'env> RwTransaction<'env> { value: V, ) -> Result<(), Error> { let insert_statement = format!( - "INSERT OR REPLACE INTO {} (key, value) VALUES (?1, ?2)", + "INSERT OR REPLACE INTO {} (key, value) VALUES (:key, :value)", db.table_name ); - let txn = self.conn.transaction()?; - txn.execute(&insert_statement, params![key.as_ref().to_owned(), value.as_ref().to_owned()])?; - txn.commit()?; + let mut stmt = self.conn.prepare_cached(&insert_statement)?; + stmt.execute(named_params![":key": key.as_ref().to_owned(), ":value": value.as_ref().to_owned()])?; Ok(()) } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { - let delete_statement = format!("DELETE FROM {} WHERE key=?1", db.table_name); - let txn = self.conn.transaction()?; - txn.execute(&delete_statement, params![key.as_ref().to_owned()])?; - txn.commit()?; + let delete_statement = format!("DELETE FROM {} WHERE key=:id", db.table_name); + let mut stmt = self.conn.prepare_cached(&delete_statement)?; + stmt.execute(named_params![":id": key.as_ref().to_owned()])?; Ok(()) } pub fn delete_current(&mut self, db: &Database) -> Result<(), Error> { if let Some(current_id) = self.cursor.get(db.table_name) { - let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); - let txn = self.conn.transaction()?; - txn.execute(&delete_statement, params![current_id.to_owned()])?; - txn.commit()?; + let delete_statement = format!("DELETE FROM {} WHERE id=:id", db.table_name); + let mut stmt = self.conn.prepare_cached(&delete_statement)?; + stmt.execute(named_params![":id": current_id.to_owned()])?; self.cursor.remove(db.table_name); } Ok(()) @@ -252,13 +248,13 @@ impl<'env> RwTransaction<'env> { let query_result = match self.cursor.get(db.table_name) { Some(current_key) => { query_statement = format!( - "SELECT MIN(id), key FROM {} where id >?1", + "SELECT MIN(id), key FROM {} where id >:id", db.table_name ); let txn = self.conn.transaction()?; let mut stmt = txn.prepare_cached(&query_statement)?; - let mut query_result = stmt.query_row(params![current_key], |row| { + let mut query_result = stmt.query_row(named_params![":id": current_key], |row| { Ok(QueryResult { id: row.get(0)?, key: row.get(1)?, @@ -294,13 +290,13 @@ impl<'env> RwTransaction<'env> { pub fn get_current(&mut self, db: &Database) -> Result, Value<'env>)>, Error> { if let Some(current_id) = self.cursor.get(db.table_name) { let query_statement = format!( - "SELECT id, key, value FROM {} where id=?1", + "SELECT id, key, value FROM {} where id=:id", db.table_name ); let txn = self.conn.transaction()?; let mut stmt = txn.prepare_cached(&query_statement)?; let query_result = stmt - .query_row([current_id], |row| { + .query_row(named_params![":id": current_id], |row| { Ok(FullQueryResult { id: row.get(0)?, key: row.get(1)?, @@ -327,36 +323,33 @@ impl<'env> RwTransaction<'env> { let mut deleted_values: Vec> = vec![]; if let Some(current_key) = &self.cursor.get(db.table_name) { let query_statement = format!( - "SELECT id, key, value FROM {} where id>=?1", + "SELECT id, key, value FROM {} where id>=:id", db.table_name ); - let mut stmt = self.conn.prepare(&query_statement)?; - let rows = stmt.query_map(params![current_key], |row| { + let mut stmt = self.conn.prepare_cached(&query_statement)?; + let rows = stmt.query_map(named_params![":id": current_key], |row| { Ok(FullQueryResult { id: row.get(0)?, key: row.get(1)?, value: row.get(2)?, }) })?; - let txn = self.conn.unchecked_transaction()?; + + let delete_statement = format!("DELETE FROM {} WHERE id=:id", db.table_name); + let mut stmt = self.conn.prepare_cached(&delete_statement)?; for row in rows { let query_result = row?; if f(&query_result.key.unwrap())? { - let delete_statement = format!("DELETE FROM {} WHERE id=?1", db.table_name); - txn.execute(&delete_statement, params![query_result.id.unwrap()])?; + stmt.execute(named_params![":id": query_result.id.unwrap()])?; } } - - txn.commit()?; }; Ok(deleted_values) } pub fn commit(mut self) -> Result<(), Error> { - let txn = self.conn.transaction()?; - txn.commit()?; Ok(()) } } \ No newline at end of file From 4d51964713fe4cc0faa8cccd827fa28d5094bfd2 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Thu, 12 Oct 2023 23:21:01 +0300 Subject: [PATCH 16/17] remove id --- slasher/src/database/sqlite_impl.rs | 91 ++++++++++++----------------- 1 file changed, 38 insertions(+), 53 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index 930aef74625..f27ddfb8da4 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -22,12 +22,10 @@ const BASE_DB: &str = "slasher_db"; impl<'env> Database<'env> {} struct QueryResult { - id: Option, key: Option>, } struct FullQueryResult { - id: Option, key: Option>, value: Option>, } @@ -49,7 +47,7 @@ pub struct Database<'env> { #[derivative(Debug)] pub struct RwTransaction<'env> { db_path: String, - cursor: HashMap, + cursor: HashMap>, conn: PooledConnection, _phantom: PhantomData<&'env ()>, } @@ -100,7 +98,6 @@ impl Environment { ) -> Result, Error> { let create_table_command = format!( "CREATE TABLE IF NOT EXISTS {} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, key BLOB UNIQUE, value BLOB );", @@ -153,9 +150,8 @@ impl<'env> RwTransaction<'env> { let query_result = stmt .query_row(named_params![":key": key.as_ref()], |row| { Ok(FullQueryResult { - id: row.get(0)?, - key: row.get(1)?, - value: row.get(2)?, + key: row.get(0)?, + value: row.get(1)?, }) }) .optional()?; @@ -184,36 +180,34 @@ impl<'env> RwTransaction<'env> { } pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { - let delete_statement = format!("DELETE FROM {} WHERE key=:id", db.table_name); + let delete_statement = format!("DELETE FROM {} WHERE key=:key", db.table_name); let mut stmt = self.conn.prepare_cached(&delete_statement)?; - stmt.execute(named_params![":id": key.as_ref().to_owned()])?; + stmt.execute(named_params![":key": key.as_ref().to_owned()])?; Ok(()) } pub fn delete_current(&mut self, db: &Database) -> Result<(), Error> { if let Some(current_id) = self.cursor.get(db.table_name) { - let delete_statement = format!("DELETE FROM {} WHERE id=:id", db.table_name); + let delete_statement = format!("DELETE FROM {} WHERE key=:key", db.table_name); let mut stmt = self.conn.prepare_cached(&delete_statement)?; - stmt.execute(named_params![":id": current_id.to_owned()])?; + stmt.execute(named_params![":key": current_id.to_owned()])?; self.cursor.remove(db.table_name); } Ok(()) } pub fn first_key(&mut self, db: &Database) -> Result, Error> { - let query_statement = format!("SELECT MIN(id), key, value FROM {}", db.table_name); - let txn = self.conn.transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + let query_statement = format!("SELECT MIN(key), value FROM {}", db.table_name); + let mut stmt = self.conn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { - id: row.get(0)?, - key: row.get(1)?, - value: row.get(2)?, + key: row.get(0)?, + value: row.get(1)?, }) })?; if let Some(key) = query_result.key { - self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + self.cursor.insert(db.table_name.to_string(), key.clone()); return Ok(Some(Cow::from(key))); } @@ -221,20 +215,18 @@ impl<'env> RwTransaction<'env> { } pub fn last_key(&mut self, db: &Database) -> Result>, Error> { - let query_statement = format!("SELECT MAX(id), key, value FROM {}", db.table_name); - let txn = self.conn.transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + let query_statement = format!("SELECT MAX(key), value FROM {}", db.table_name); + let mut stmt = self.conn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(FullQueryResult { - id: row.get(0)?, - key: row.get(1)?, - value: row.get(2)?, + key: row.get(0)?, + value: row.get(1)?, }) })?; if let Some(key) = query_result.key { - self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + self.cursor.insert(db.table_name.to_string(), key.clone()); return Ok(Some(Cow::from(key))); } @@ -248,30 +240,26 @@ impl<'env> RwTransaction<'env> { let query_result = match self.cursor.get(db.table_name) { Some(current_key) => { query_statement = format!( - "SELECT MIN(id), key FROM {} where id >:id", + "SELECT MIN(key) FROM {} where key >:key", db.table_name ); - let txn = self.conn.transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + let mut stmt = self.conn.prepare_cached(&query_statement)?; - let mut query_result = stmt.query_row(named_params![":id": current_key], |row| { + let mut query_result = stmt.query_row(named_params![":key": current_key], |row| { Ok(QueryResult { - id: row.get(0)?, - key: row.get(1)?, + key: row.get(0)?, }) })?; query_result }, None => { - query_statement = format!("SELECT MIN(id), key FROM {}", db.table_name); - let txn = self.conn.transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + query_statement = format!("SELECT MIN(key) FROM {}", db.table_name); + let mut stmt = self.conn.prepare_cached(&query_statement)?; let mut query_result = stmt.query_row([], |row| { Ok(QueryResult { - id: row.get(0)?, - key: row.get(1)?, + key: row.get(0)?, }) })?; @@ -280,7 +268,7 @@ impl<'env> RwTransaction<'env> { }; if let Some(key) = query_result.key { - self.cursor.insert(db.table_name.to_string(), query_result.id.unwrap_or_default()); + self.cursor.insert(db.table_name.to_string(), key.clone()); return Ok(Some(Cow::from(key))); } @@ -290,17 +278,15 @@ impl<'env> RwTransaction<'env> { pub fn get_current(&mut self, db: &Database) -> Result, Value<'env>)>, Error> { if let Some(current_id) = self.cursor.get(db.table_name) { let query_statement = format!( - "SELECT id, key, value FROM {} where id=:id", + "SELECT key, value FROM {} where key=:key", db.table_name ); - let txn = self.conn.transaction()?; - let mut stmt = txn.prepare_cached(&query_statement)?; + let mut stmt = self.conn.prepare_cached(&query_statement)?; let query_result = stmt - .query_row(named_params![":id": current_id], |row| { + .query_row(named_params![":key": current_id], |row| { Ok(FullQueryResult { - id: row.get(0)?, - key: row.get(1)?, - value: row.get(2)?, + key: row.get(0)?, + value: row.get(1)?, }) }) .optional()?; @@ -323,26 +309,25 @@ impl<'env> RwTransaction<'env> { let mut deleted_values: Vec> = vec![]; if let Some(current_key) = &self.cursor.get(db.table_name) { let query_statement = format!( - "SELECT id, key, value FROM {} where id>=:id", + "SELECT key, value FROM {} where key>=:key", db.table_name ); let mut stmt = self.conn.prepare_cached(&query_statement)?; - let rows = stmt.query_map(named_params![":id": current_key], |row| { + let rows = stmt.query_map(named_params![":key": current_key], |row| { Ok(FullQueryResult { - id: row.get(0)?, - key: row.get(1)?, - value: row.get(2)?, + key: row.get(0)?, + value: row.get(1)?, }) })?; - let delete_statement = format!("DELETE FROM {} WHERE id=:id", db.table_name); + let delete_statement = format!("DELETE FROM {} WHERE key=:key", db.table_name); let mut stmt = self.conn.prepare_cached(&delete_statement)?; for row in rows { let query_result = row?; - - if f(&query_result.key.unwrap())? { - stmt.execute(named_params![":id": query_result.id.unwrap()])?; + let key = query_result.key.unwrap(); + if f(&key)? { + stmt.execute(named_params![":key": key])?; } } }; From d552265b036170bf6a4a7006505300aa5964224f Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 15 Oct 2023 18:40:30 +0300 Subject: [PATCH 17/17] additional pragmas --- slasher/src/database/sqlite_impl.rs | 83 +++++++++++++---------------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/slasher/src/database/sqlite_impl.rs b/slasher/src/database/sqlite_impl.rs index f27ddfb8da4..445dd3da9be 100644 --- a/slasher/src/database/sqlite_impl.rs +++ b/slasher/src/database/sqlite_impl.rs @@ -1,13 +1,13 @@ #![cfg(feature = "sqlite")] -use r2d2::{PooledConnection, Pool}; -use r2d2_sqlite::SqliteConnectionManager; -use rusqlite::{params, OptionalExtension, ToSql, Transaction, Connection, named_params}; -use std::{fmt, collections::HashMap}; use derivative::Derivative; +use r2d2::{Pool, PooledConnection}; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{named_params, params, Connection, OptionalExtension, ToSql, Transaction}; use std::{ borrow::{Borrow, Cow}, path::PathBuf, }; +use std::{collections::HashMap, fmt}; use crate::{ database::{ @@ -34,7 +34,7 @@ struct FullQueryResult { pub struct Environment { _db_count: usize, db_path: String, - pool: Pool + pool: Pool, } #[derive(Debug)] @@ -64,7 +64,7 @@ impl Environment { Ok(Environment { _db_count: MAX_NUM_DBS, db_path, - pool + pool, }) } @@ -123,10 +123,11 @@ impl Environment { } pub fn begin_rw_txn(&self) -> Result { - let conn: PooledConnection = self.pool.get().unwrap(); conn.pragma_update(None, "journal_mode", "wal"); conn.pragma_update(None, "synchronous", "NORMAL"); + conn.pragma_update(None, "locking_mode", "EXCLUSIVE"); + conn.pragma_update(None, "cache_size", "10000"); Ok(RwTransaction { _phantom: PhantomData, db_path: self.db_path.clone(), @@ -136,7 +137,6 @@ impl Environment { } } - impl<'env> RwTransaction<'env> { pub fn get + ?Sized>( &'env self, @@ -158,9 +158,7 @@ impl<'env> RwTransaction<'env> { match query_result { Some(result) => Ok(Some(Cow::from(result.value.unwrap_or_default()))), - None => { - Ok(None) - }, + None => Ok(None), } } @@ -175,10 +173,12 @@ impl<'env> RwTransaction<'env> { db.table_name ); let mut stmt = self.conn.prepare_cached(&insert_statement)?; - stmt.execute(named_params![":key": key.as_ref().to_owned(), ":value": value.as_ref().to_owned()])?; + stmt.execute( + named_params![":key": key.as_ref().to_owned(), ":value": value.as_ref().to_owned()], + )?; Ok(()) } - + pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { let delete_statement = format!("DELETE FROM {} WHERE key=:key", db.table_name); let mut stmt = self.conn.prepare_cached(&delete_statement)?; @@ -209,7 +209,7 @@ impl<'env> RwTransaction<'env> { if let Some(key) = query_result.key { self.cursor.insert(db.table_name.to_string(), key.clone()); return Ok(Some(Cow::from(key))); - } + } Ok(None) } @@ -228,43 +228,35 @@ impl<'env> RwTransaction<'env> { if let Some(key) = query_result.key { self.cursor.insert(db.table_name.to_string(), key.clone()); return Ok(Some(Cow::from(key))); - } + } Ok(None) } pub fn next_key(&mut self, db: &Database) -> Result>, Error> { - let mut query_statement = "".to_string(); let query_result = match self.cursor.get(db.table_name) { - Some(current_key) => { - query_statement = format!( - "SELECT MIN(key) FROM {} where key >:key", - db.table_name - ); + Some(current_key) => { + query_statement = format!("SELECT MIN(key) FROM {} where key >:key", db.table_name); let mut stmt = self.conn.prepare_cached(&query_statement)?; - - let mut query_result = stmt.query_row(named_params![":key": current_key], |row| { - Ok(QueryResult { - key: row.get(0)?, - }) - })?; + + let mut query_result = stmt + .query_row(named_params![":key": current_key], |row| { + Ok(QueryResult { key: row.get(0)? }) + })?; query_result - }, + } None => { query_statement = format!("SELECT MIN(key) FROM {}", db.table_name); let mut stmt = self.conn.prepare_cached(&query_statement)?; - - let mut query_result = stmt.query_row([], |row| { - Ok(QueryResult { - key: row.get(0)?, - }) - })?; + + let mut query_result = + stmt.query_row([], |row| Ok(QueryResult { key: row.get(0)? }))?; query_result - }, + } }; if let Some(key) = query_result.key { @@ -275,12 +267,13 @@ impl<'env> RwTransaction<'env> { Ok(None) } - pub fn get_current(&mut self, db: &Database) -> Result, Value<'env>)>, Error> { + pub fn get_current( + &mut self, + db: &Database, + ) -> Result, Value<'env>)>, Error> { if let Some(current_id) = self.cursor.get(db.table_name) { - let query_statement = format!( - "SELECT key, value FROM {} where key=:key", - db.table_name - ); + let query_statement = + format!("SELECT key, value FROM {} where key=:key", db.table_name); let mut stmt = self.conn.prepare_cached(&query_statement)?; let query_result = stmt .query_row(named_params![":key": current_id], |row| { @@ -308,11 +301,9 @@ impl<'env> RwTransaction<'env> { ) -> Result>, Error> { let mut deleted_values: Vec> = vec![]; if let Some(current_key) = &self.cursor.get(db.table_name) { - let query_statement = format!( - "SELECT key, value FROM {} where key>=:key", - db.table_name - ); - + let query_statement = + format!("SELECT key, value FROM {} where key>=:key", db.table_name); + let mut stmt = self.conn.prepare_cached(&query_statement)?; let rows = stmt.query_map(named_params![":key": current_key], |row| { Ok(FullQueryResult { @@ -337,4 +328,4 @@ impl<'env> RwTransaction<'env> { pub fn commit(mut self) -> Result<(), Error> { Ok(()) } -} \ No newline at end of file +}