diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 6b39806871f29..fc8fe8fbd0985 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -53,7 +53,7 @@ async fn test_read_version_basic() { { // single imm let sorted_items = gen_dummy_batch(1); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, @@ -87,7 +87,7 @@ async fn test_read_version_basic() { for i in 0..5 { epoch.inc_epoch(); let sorted_items = gen_dummy_batch(i + 2); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, @@ -275,7 +275,7 @@ async fn test_read_filter_basic() { { // single imm let sorted_items = gen_dummy_batch(epoch); - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 5d60de930a43d..1cd0fbb560699 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -137,6 +137,9 @@ pub async fn compact( } /// For compaction from shared buffer to level 0, this is the only function gets called. +/// +/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value. +/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None` async fn compact_shared_buffer( context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 7cbc35f9373b0..38b4e4d3935d8 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1317,7 +1317,7 @@ mod tests { TableKey(Bytes::from(dummy_table_key())), SharedBufferValue::Delete, )]; - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None); let tracker = match limiter { Some(limiter) => Some(limiter.require_memory(size as u64).await), None => None, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index dd167a532718f..a62631421c8c8 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -16,6 +16,7 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; +use std::mem::size_of_val; use std::ops::Bound::Included; use std::ops::{Bound, RangeBounds}; use std::sync::atomic::AtomicU64; @@ -121,6 +122,8 @@ impl SharedBufferKeyEntry { pub(crate) struct SharedBufferBatchInner { entries: Vec, new_values: Vec, + /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the + /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`. old_values: Option>, /// The epochs of the data in batch, sorted in ascending order (old to new) epochs: Vec, @@ -143,6 +146,9 @@ impl SharedBufferBatchInner { ) -> Self { assert!(!payload.is_empty()); debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key)); + if let Some(old_values) = &old_values { + assert_eq!(old_values.len(), payload.len()); + } let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); let mut entries = Vec::with_capacity(payload.len()); @@ -277,7 +283,7 @@ impl SharedBufferBatch { epoch: HummockEpoch, table_id: TableId, ) -> Self { - let size = Self::measure_batch_size(&sorted_items); + let size = Self::measure_batch_size(&sorted_items, old_values.as_deref()); Self { inner: Arc::new(SharedBufferBatchInner::new( @@ -311,7 +317,10 @@ impl SharedBufferBatch { .sum() } - pub fn measure_batch_size(batch_items: &[SharedBufferItem]) -> usize { + pub fn measure_batch_size( + batch_items: &[SharedBufferItem], + old_values: Option<&[Bytes]>, + ) -> usize { // size = Sum(length of full key + length of user value) batch_items .iter() @@ -325,7 +334,11 @@ impl SharedBufferBatch { } } }) - .sum() + .sum::() + + old_values + .iter() + .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len())) + .sum::() } pub fn filter(&self, table_id: TableId, table_key_range: &R) -> bool diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 7ab9d0396290c..a3823786613f9 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -29,6 +29,7 @@ use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; use crate::hummock::event_handler::hummock_event_handler::HummockEventSender; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; +use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::iterator::{ ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator, }; @@ -42,7 +43,6 @@ use crate::hummock::utils::{ }; use crate::hummock::write_limiter::WriteLimiterRef; use crate::hummock::{MemoryLimiter, SstableIterator}; -use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator}; use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic}; use crate::store::*; @@ -496,7 +496,8 @@ impl LocalHummockStorage { .start_timer(); let imm_size = if !sorted_items.is_empty() { - let size = SharedBufferBatch::measure_batch_size(&sorted_items); + let size = SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref()); + self.write_limiter.wait_permission(self.table_id).await; let limiter = self.memory_limiter.as_ref(); let tracker = if let Some(tracker) = limiter.try_require_memory(size as u64) { diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 7fdb838654b70..e7c8c6e45f7ea 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -201,7 +201,7 @@ macro_rules! dispatch_state_store { }}; } -#[cfg(debug_assertions)] +#[cfg(any(debug_assertions, test, feature = "test"))] pub mod verify { use std::fmt::Debug; use std::future::Future;