From 5e0896fc7c6631c9044f8419829d412c03f1dea6 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Wed, 20 Nov 2024 10:12:25 +0100 Subject: [PATCH] lmdb: transactions improvements * Remove `remove_by_id` method * Add `Lmdb::delete` method * Re-use read transaction in some method instead of create a new one Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 3 +- crates/nostr-lmdb/src/store/lmdb/mod.rs | 163 +++++++++++------------- crates/nostr-lmdb/src/store/mod.rs | 68 +++++----- 3 files changed, 110 insertions(+), 124 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02eb22bf1..168a17150 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,7 +63,8 @@ * sdk: cleanup `Client` methods ([Yuki Kishimoto]) * relay-builder: port selection by using random port generation ([Yuki Kishimoto]) * lmdb: optimize vector initialization in unit tests ([Xiao Yu]) -* lmdb: commit also read txn and force sync after delete operations ([Yuki Kishimoto]) +* lmdb: commit also read txn ([Yuki Kishimoto]) +* lmdb: transactions improvements ([Yuki Kishimoto]) * nwc: increase default timeout to 60 secs ([Yuki Kishimoto]) * book: convert JS snippets to TypeScript ([Yuki Kishimoto]) * book: use `pyright` to check python snippets ([Yuki Kishimoto]) diff --git a/crates/nostr-lmdb/src/store/lmdb/mod.rs b/crates/nostr-lmdb/src/store/lmdb/mod.rs index e13a8b43d..2293e4870 100644 --- a/crates/nostr-lmdb/src/store/lmdb/mod.rs +++ b/crates/nostr-lmdb/src/store/lmdb/mod.rs @@ -135,18 +135,6 @@ impl Lmdb { }) } - pub(crate) fn force_sync(&self) -> Result<(), Error> { - self.env.force_sync()?; - Ok(()) - } - - // pub(crate) fn close(self) -> Result<(), Error> { - // self.env.force_sync()?; - // let closing_event = self.env.prepare_for_closing(); - // closing_event.wait(); - // Ok(()) - // } - /// Get a read transaction #[inline] pub(crate) fn read_txn(&self) -> Result { @@ -194,37 +182,35 @@ impl Lmdb { self.ac_index.put(txn, &ac_index_key, id)?; for tag in event.tags.iter() { - if let Some(tag_name) = tag.single_letter_tag() { - if let Some(tag_value) = tag.content() { - // Index by tag (with created_at and id) - let tc_index_key: Vec = index::make_tc_index_key( - &tag_name, - tag_value, - &event.created_at, - event.id.as_bytes(), - ); - self.tc_index.put(txn, &tc_index_key, id)?; - - // Index by author and tag (with created_at and id) - let atc_index_key: Vec = index::make_atc_index_key( - &event.pubkey.to_bytes(), - &tag_name, - tag_value, - &event.created_at, - event.id.as_bytes(), - ); - self.atc_index.put(txn, &atc_index_key, id)?; - - // Index by kind and tag (with created_at and id) - let ktc_index_key: Vec = index::make_ktc_index_key( - event.kind.as_u16(), - &tag_name, - tag_value, - &event.created_at, - event.id.as_bytes(), - ); - self.ktc_index.put(txn, &ktc_index_key, id)?; - } + if let (Some(tag_name), Some(tag_value)) = (tag.single_letter_tag(), tag.content()) { + // Index by author and tag (with created_at and id) + let atc_index_key: Vec = index::make_atc_index_key( + &event.pubkey.to_bytes(), + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + self.atc_index.put(txn, &atc_index_key, id)?; + + // Index by kind and tag (with created_at and id) + let ktc_index_key: Vec = index::make_ktc_index_key( + event.kind.as_u16(), + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + self.ktc_index.put(txn, &ktc_index_key, id)?; + + // Index by tag (with created_at and id) + let tc_index_key: Vec = index::make_tc_index_key( + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + self.tc_index.put(txn, &tc_index_key, id)?; } } @@ -233,6 +219,19 @@ impl Lmdb { /// Remove the event pub(crate) fn remove(&self, txn: &mut RwTxn, event: &DatabaseEvent) -> Result<(), Error> { + self.events.delete(txn, event.id())?; + + let ci_index_key: Vec = index::make_ci_index_key(&event.created_at, event.id()); + self.ci_index.delete(txn, &ci_index_key)?; + + let akc_index_key: Vec = + index::make_akc_index_key(event.author(), event.kind, &event.created_at, event.id()); + self.akc_index.delete(txn, &akc_index_key)?; + + let ac_index_key: Vec = + index::make_ac_index_key(event.author(), &event.created_at, event.id()); + self.ac_index.delete(txn, &ac_index_key)?; + for tag in event.iter_tags() { if let Some((tag_name, tag_value)) = tag.extract() { // Index by author and tag (with created_at and id) @@ -262,19 +261,6 @@ impl Lmdb { } } - let ac_index_key: Vec = - index::make_ac_index_key(event.author(), &event.created_at, event.id()); - self.ac_index.delete(txn, &ac_index_key)?; - - let ci_index_key: Vec = index::make_ci_index_key(&event.created_at, event.id()); - self.ci_index.delete(txn, &ci_index_key)?; - - let akc_index_key: Vec = - index::make_akc_index_key(event.author(), event.kind, &event.created_at, event.id()); - self.akc_index.delete(txn, &akc_index_key)?; - - self.events.delete(txn, event.id())?; - Ok(()) } @@ -324,6 +310,14 @@ impl Lmdb { Ok(output) } + pub fn delete(&self, read_txn: &RoTxn, txn: &mut RwTxn, filter: Filter) -> Result<(), Error> { + let events = self.single_filter_query(read_txn, filter)?; + for event in events.into_iter() { + self.remove(txn, &event)?; + } + Ok(()) + } + /// Find all events that match the filter fn single_filter_query<'a>( &self, @@ -627,24 +621,11 @@ impl Lmdb { Ok(None) } - /// Remove an event by ID - pub fn remove_by_id( - &self, - read_txn: &RoTxn, - write_txn: &mut RwTxn, - event_id: &[u8], - ) -> Result<(), Error> { - if let Some(event) = self.get_event_by_id(read_txn, event_id)? { - self.remove(write_txn, &event)?; - } - - Ok(()) - } - // Remove all replaceable events with the matching author-kind // Kind must be a replaceable (not parameterized replaceable) event kind pub fn remove_replaceable( &self, + read_txn: &RoTxn, txn: &mut RwTxn, coordinate: &Coordinate, until: Timestamp, @@ -653,9 +634,8 @@ impl Lmdb { return Err(Error::WrongEventKind); } - let read_txn = self.read_txn()?; let iter = self.akc_iter( - &read_txn, + read_txn, &coordinate.public_key.to_bytes(), coordinate.kind.as_u16(), Timestamp::zero(), @@ -664,7 +644,10 @@ impl Lmdb { for result in iter { let (_key, id) = result?; - self.remove_by_id(&read_txn, txn, id)?; + + if let Some(event) = self.get_event_by_id(read_txn, id)? { + self.remove(txn, &event)?; + } } Ok(()) @@ -674,6 +657,7 @@ impl Lmdb { // Kind must be a parameterized-replaceable event kind pub fn remove_parameterized_replaceable( &self, + read_txn: &RoTxn, txn: &mut RwTxn, coordinate: &Coordinate, until: Timestamp, @@ -682,9 +666,8 @@ impl Lmdb { return Err(Error::WrongEventKind); } - let read_txn = self.read_txn()?; let iter = self.atc_iter( - &read_txn, + read_txn, &coordinate.public_key.to_bytes(), &SingleLetterTag::lowercase(Alphabet::D), &coordinate.identifier, @@ -696,12 +679,10 @@ impl Lmdb { let (_key, id) = result?; // Our index doesn't have Kind embedded, so we have to check it - let event = self - .get_event_by_id(&read_txn, id)? - .ok_or(Error::NotFound)?; + let event = self.get_event_by_id(read_txn, id)?.ok_or(Error::NotFound)?; if event.kind == coordinate.kind.as_u16() { - self.remove_by_id(&read_txn, txn, id)?; + self.remove(txn, &event)?; } } @@ -750,8 +731,8 @@ impl Lmdb { let start_prefix = index::make_ci_index_key(until, &EVENT_ID_ALL_ZEROS); let end_prefix = index::make_ci_index_key(since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.ci_index.range(txn, &range)?) } @@ -772,8 +753,8 @@ impl Lmdb { ); let end_prefix = index::make_tc_index_key(tag_name, tag_value, since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.tc_index.range(txn, &range)?) } @@ -788,8 +769,8 @@ impl Lmdb { let start_prefix = index::make_ac_index_key(author, &until, &EVENT_ID_ALL_ZEROS); let end_prefix = index::make_ac_index_key(author, &since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.ac_index.range(txn, &range)?) } @@ -805,8 +786,8 @@ impl Lmdb { let start_prefix = index::make_akc_index_key(author, kind, &until, &EVENT_ID_ALL_ZEROS); let end_prefix = index::make_akc_index_key(author, kind, &since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.akc_index.range(txn, &range)?) } @@ -830,8 +811,8 @@ impl Lmdb { let end_prefix: Vec = index::make_atc_index_key(author, tag_name, tag_value, since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.atc_index.range(txn, &range)?) } @@ -855,8 +836,8 @@ impl Lmdb { let end_prefix = index::make_ktc_index_key(kind, tag_name, tag_value, since, &EVENT_ID_ALL_255); let range = ( - Bound::Included(&*start_prefix), - Bound::Excluded(&*end_prefix), + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), ); Ok(self.ktc_index.range(txn, &range)?) } diff --git a/crates/nostr-lmdb/src/store/mod.rs b/crates/nostr-lmdb/src/store/mod.rs index 3aae99279..11572b423 100644 --- a/crates/nostr-lmdb/src/store/mod.rs +++ b/crates/nostr-lmdb/src/store/mod.rs @@ -75,17 +75,17 @@ impl Store { let event = event.clone(); self.interact_with_fbb(move |db, fbb| { - // Acquire write transaction - let mut txn = db.write_txn()?; + // Acquire read transaction + let read_txn = db.read_txn()?; // Already exists - if db.has_event(&txn, event.id.as_bytes())? { + if db.has_event(&read_txn, event.id.as_bytes())? { //return Err(Error::Duplicate); return Ok(false); } // Reject event if ID was deleted - if db.is_deleted(&txn, &event.id)? { + if db.is_deleted(&read_txn, &event.id)? { //return Err(Error::Deleted); return Ok(false); } @@ -94,7 +94,7 @@ impl Store { // (non-parameterized) if event.kind.is_replaceable() { let coordinate: Coordinate = Coordinate::new(event.kind, event.pubkey); - if let Some(time) = db.when_is_coordinate_deleted(&txn, &coordinate)? { + if let Some(time) = db.when_is_coordinate_deleted(&read_txn, &coordinate)? { if event.created_at <= time { //return Err(Error::Deleted); return Ok(false); @@ -108,7 +108,7 @@ impl Store { if let Some(identifier) = event.tags.identifier() { let coordinate: Coordinate = Coordinate::new(event.kind, event.pubkey).identifier(identifier); - if let Some(time) = db.when_is_coordinate_deleted(&txn, &coordinate)? { + if let Some(time) = db.when_is_coordinate_deleted(&read_txn, &coordinate)? { if event.created_at <= time { //return Err(Error::Deleted); return Ok(false); @@ -117,17 +117,23 @@ impl Store { } } + // Acquire write transaction + let mut txn = db.write_txn()?; + // Remove replaceable events being replaced if event.kind.is_replaceable() { // Find replaceable event - if let Some(stored) = db.find_replaceable_event(&txn, &event.pubkey, event.kind)? { + if let Some(stored) = + db.find_replaceable_event(&read_txn, &event.pubkey, event.kind)? + { if stored.created_at > event.created_at { // return Err(Error::Replaced); + txn.abort(); return Ok(false); } let coordinate: Coordinate = Coordinate::new(event.kind, event.pubkey); - db.remove_replaceable(&mut txn, &coordinate, event.created_at)?; + db.remove_replaceable(&read_txn, &mut txn, &coordinate, event.created_at)?; } } @@ -139,14 +145,16 @@ impl Store { // Find param replaceable event if let Some(stored) = - db.find_parameterized_replaceable_event(&txn, &coordinate)? + db.find_parameterized_replaceable_event(&read_txn, &coordinate)? { if stored.created_at > event.created_at { // return Err(Error::Replaced); + txn.abort(); return Ok(false); } db.remove_parameterized_replaceable( + &read_txn, &mut txn, &coordinate, Timestamp::max(), @@ -155,32 +163,35 @@ impl Store { } } - // Store and index the event - db.store(&mut txn, fbb, &event)?; - // Handle deletion events if let Kind::EventDeletion = event.kind { - let invalid: bool = Self::handle_deletion_event(&db, &mut txn, &event)?; + let invalid: bool = Self::handle_deletion_event(&db, &read_txn, &mut txn, &event)?; if invalid { + txn.abort(); return Ok(false); } } - txn.commit()?; + // Store and index the event + db.store(&mut txn, fbb, &event)?; - // TODO: force_sync? + read_txn.commit()?; + txn.commit()?; Ok(true) }) .await? } - fn handle_deletion_event(db: &Lmdb, txn: &mut RwTxn, event: &Event) -> Result { - let read_txn = db.read_txn()?; - + fn handle_deletion_event( + db: &Lmdb, + read_txn: &RoTxn, + txn: &mut RwTxn, + event: &Event, + ) -> Result { for id in event.tags.event_ids() { - if let Some(target) = db.get_event_by_id(txn, id.as_bytes())? { + if let Some(target) = db.get_event_by_id(read_txn, id.as_bytes())? { // Author must match if target.author() != &event.pubkey.to_bytes() { return Ok(true); @@ -188,12 +199,10 @@ impl Store { // Mark as deleted and remove event db.mark_deleted(txn, id)?; - db.remove_by_id(&read_txn, txn, id.as_bytes())?; + db.remove(txn, &target)?; } } - read_txn.commit()?; - for coordinate in event.tags.coordinates() { // Author must match if coordinate.public_key != event.pubkey { @@ -205,9 +214,9 @@ impl Store { // Remove events (up to the created_at of the deletion event) if coordinate.kind.is_replaceable() { - db.remove_replaceable(txn, coordinate, event.created_at)?; + db.remove_replaceable(read_txn, txn, coordinate, event.created_at)?; } else if coordinate.kind.is_parameterized_replaceable() { - db.remove_parameterized_replaceable(txn, coordinate, event.created_at)?; + db.remove_parameterized_replaceable(read_txn, txn, coordinate, event.created_at)?; } } @@ -312,17 +321,13 @@ impl Store { pub async fn delete(&self, filter: Filter) -> Result<(), Error> { self.interact(move |db| { let read_txn = db.read_txn()?; - let events = db.query(&read_txn, vec![filter])?; - let mut txn = db.write_txn()?; - for event in events.into_iter() { - db.remove(&mut txn, &event)?; - } + + db.delete(&read_txn, &mut txn, filter)?; + read_txn.commit()?; txn.commit()?; - db.force_sync()?; - Ok(()) }) .await? @@ -333,7 +338,6 @@ impl Store { let mut txn = db.write_txn()?; db.wipe(&mut txn)?; txn.commit()?; - db.force_sync()?; Ok(()) }) .await?