Skip to content

Commit

Permalink
data bucket holds RestartableBucket (#33381)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington authored Sep 22, 2023
1 parent 456563b commit c750ac5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
27 changes: 17 additions & 10 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
OccupiedEnum,
},
restart::RestartableBucket,
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
Expand Down Expand Up @@ -107,6 +108,9 @@ pub struct Bucket<T: Copy + PartialEq + 'static> {
/// set to true once any entries have been deleted from the index.
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
at_least_one_entry_deleted: bool,

/// keep track of which index file this bucket is using so on restart we can try to reuse it
restartable_bucket: RestartableBucket,
}

impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
Expand All @@ -115,6 +119,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
restartable_bucket: RestartableBucket,
) -> Self {
let (index, _file_name) = BucketStorage::new(
Arc::clone(&drives),
Expand All @@ -125,16 +130,18 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
count,
);
stats.index.resize_grow(0, index.capacity_bytes());
let random = thread_rng().gen();

Self {
random: thread_rng().gen(),
random,
drives,
index,
data: vec![],
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
at_least_one_entry_deleted: false,
restartable_bucket,
}
}

Expand Down Expand Up @@ -570,7 +577,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
count += 1;
// grow relative to the current capacity
let new_capacity = (current_capacity * 110 / 100).max(anticipated_size);
let (mut index, _file_name) = BucketStorage::new_with_capacity(
let (mut index, file_name) = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
Expand All @@ -596,20 +603,14 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
let new_elem: &mut IndexEntry<T> = index.get_mut(new_ix);
*new_elem = *elem;
index.copying_entry(new_ix, &self.index, ix);
/*
let dbg_elem: IndexEntry = *new_elem;
assert_eq!(
Self::bucket_find_index_entry(&index, &elem.key, random).unwrap(),
(&dbg_elem, new_ix)
);
*/
}
}
if valid {
self.stats.index.update_max_size(index.capacity());
let mut items = self.reallocated.items.lock().unwrap();
items.index = Some(index);
self.reallocated.add_reallocation();
self.restartable_bucket.set_file(file_name, self.random);
break;
}
}
Expand Down Expand Up @@ -1075,7 +1076,13 @@ mod tests {
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let max_search = 2;
let mut bucket = Bucket::new(Arc::new(paths), max_search, Arc::default(), Arc::default());
let mut bucket = Bucket::new(
Arc::new(paths),
max_search,
Arc::default(),
Arc::default(),
RestartableBucket::default(),
);

let key = Pubkey::new_unique();
assert_eq!(bucket.read_value(&key), None);
Expand Down
1 change: 1 addition & 0 deletions bucket_map/src/bucket_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
self.max_search,
Arc::clone(&self.stats),
Arc::clone(&self.count),
self.restartable_bucket.clone(),
));
}
}
Expand Down

0 comments on commit c750ac5

Please sign in to comment.