Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Apr 22, 2024
1 parent a136122 commit ad0e146
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 10 deletions.
6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_read_version_basic() {
{
// single imm
let sorted_items = gen_dummy_batch(1);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down Expand Up @@ -87,7 +87,7 @@ async fn test_read_version_basic() {
for i in 0..5 {
epoch.inc_epoch();
let sorted_items = gen_dummy_batch(i + 2);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down Expand Up @@ -275,7 +275,7 @@ async fn test_read_filter_basic() {
{
// single imm
let sorted_items = gen_dummy_batch(epoch);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ pub async fn compact(
}

/// For compaction from shared buffer to level 0, this is the only function gets called.
///
/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value.
/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None`
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ mod tests {
TableKey(Bytes::from(dummy_table_key())),
SharedBufferValue::Delete,
)];
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let tracker = match limiter {
Some(limiter) => Some(limiter.require_memory(size as u64).await),
None => None,
Expand Down
19 changes: 16 additions & 3 deletions src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::cmp::Ordering;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::size_of_val;
use std::ops::Bound::Included;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -121,6 +122,8 @@ impl SharedBufferKeyEntry {
pub(crate) struct SharedBufferBatchInner {
entries: Vec<SharedBufferKeyEntry>,
new_values: Vec<VersionedSharedBufferValue>,
/// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
/// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
old_values: Option<Vec<Bytes>>,
/// The epochs of the data in batch, sorted in ascending order (old to new)
epochs: Vec<HummockEpoch>,
Expand All @@ -143,6 +146,9 @@ impl SharedBufferBatchInner {
) -> Self {
assert!(!payload.is_empty());
debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
if let Some(old_values) = &old_values {
assert_eq!(old_values.len(), payload.len());
}

let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
let mut entries = Vec::with_capacity(payload.len());
Expand Down Expand Up @@ -277,7 +283,7 @@ impl SharedBufferBatch {
epoch: HummockEpoch,
table_id: TableId,
) -> Self {
let size = Self::measure_batch_size(&sorted_items);
let size = Self::measure_batch_size(&sorted_items, old_values.as_deref());

Self {
inner: Arc::new(SharedBufferBatchInner::new(
Expand Down Expand Up @@ -311,7 +317,10 @@ impl SharedBufferBatch {
.sum()
}

pub fn measure_batch_size(batch_items: &[SharedBufferItem]) -> usize {
pub fn measure_batch_size(
batch_items: &[SharedBufferItem],
old_values: Option<&[Bytes]>,
) -> usize {
// size = Sum(length of full key + length of user value)
batch_items
.iter()
Expand All @@ -325,7 +334,11 @@ impl SharedBufferBatch {
}
}
})
.sum()
.sum::<usize>()
+ old_values
.iter()
.flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
.sum::<usize>()
}

pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator,
};
Expand All @@ -42,7 +43,6 @@ use crate::hummock::utils::{
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{MemoryLimiter, SstableIterator};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;
Expand Down Expand Up @@ -496,7 +496,8 @@ impl LocalHummockStorage {
.start_timer();

let imm_size = if !sorted_items.is_empty() {
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

self.write_limiter.wait_permission(self.table_id).await;
let limiter = self.memory_limiter.as_ref();
let tracker = if let Some(tracker) = limiter.try_require_memory(size as u64) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ macro_rules! dispatch_state_store {
}};
}

#[cfg(debug_assertions)]
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
use std::fmt::Debug;
use std::future::Future;
Expand Down

0 comments on commit ad0e146

Please sign in to comment.