From c750ac5d38d78c801df58652043c140107fad6be Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Fri, 22 Sep 2023 14:42:08 -0700 Subject: [PATCH] data bucket holds RestartableBucket (#33381) --- bucket_map/src/bucket.rs | 27 +++++++++++++++++---------- bucket_map/src/bucket_api.rs | 1 + 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index f9a01ac8e591a7..00fa5dc79f14ad 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -11,6 +11,7 @@ use { DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots, OccupiedEnum, }, + restart::RestartableBucket, MaxSearch, RefCount, }, rand::{thread_rng, Rng}, @@ -107,6 +108,9 @@ pub struct Bucket { /// set to true once any entries have been deleted from the index. /// Deletes indicate that there can be free slots and that the full search range must be searched for an entry. at_least_one_entry_deleted: bool, + + /// keep track of which index file this bucket is using so on restart we can try to reuse it + restartable_bucket: RestartableBucket, } impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { @@ -115,6 +119,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { max_search: MaxSearch, stats: Arc, count: Arc, + restartable_bucket: RestartableBucket, ) -> Self { let (index, _file_name) = BucketStorage::new( Arc::clone(&drives), @@ -125,9 +130,10 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { count, ); stats.index.resize_grow(0, index.capacity_bytes()); + let random = thread_rng().gen(); Self { - random: thread_rng().gen(), + random, drives, index, data: vec![], @@ -135,6 +141,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { reallocated: Reallocated::default(), anticipated_size: 0, at_least_one_entry_deleted: false, + restartable_bucket, } } @@ -570,7 +577,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { count += 1; // grow relative to the current capacity let new_capacity = (current_capacity * 110 / 100).max(anticipated_size); - let (mut index, _file_name) = BucketStorage::new_with_capacity( + let (mut index, file_name) = BucketStorage::new_with_capacity( Arc::clone(&self.drives), 1, std::mem::size_of::>() as u64, @@ -596,13 +603,6 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { let new_elem: &mut IndexEntry = index.get_mut(new_ix); *new_elem = *elem; index.copying_entry(new_ix, &self.index, ix); - /* - let dbg_elem: IndexEntry = *new_elem; - assert_eq!( - Self::bucket_find_index_entry(&index, &elem.key, random).unwrap(), - (&dbg_elem, new_ix) - ); - */ } } if valid { @@ -610,6 +610,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket { let mut items = self.reallocated.items.lock().unwrap(); items.index = Some(index); self.reallocated.add_reallocation(); + self.restartable_bucket.set_file(file_name, self.random); break; } } @@ -1075,7 +1076,13 @@ mod tests { let paths: Vec = vec![tmpdir.path().to_path_buf()]; assert!(!paths.is_empty()); let max_search = 2; - let mut bucket = Bucket::new(Arc::new(paths), max_search, Arc::default(), Arc::default()); + let mut bucket = Bucket::new( + Arc::new(paths), + max_search, + Arc::default(), + Arc::default(), + RestartableBucket::default(), + ); let key = Pubkey::new_unique(); assert_eq!(bucket.read_value(&key), None); diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index d81b4b52f3cc34..e5449a814a5be9 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -97,6 +97,7 @@ impl BucketApi { self.max_search, Arc::clone(&self.stats), Arc::clone(&self.count), + self.restartable_bucket.clone(), )); } }