diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 84fa03c7b71d6..df92a6f0d0aa8 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -61,6 +61,11 @@ pub struct SstableStreamIterator { task_progress: Arc, io_retry_times: usize, max_io_retry_times: usize, + + // key range cache + key_range_left: FullKey>, + key_range_right: FullKey>, + key_range_right_exclusive: bool, } impl SstableStreamIterator { @@ -87,6 +92,17 @@ impl SstableStreamIterator { sstable_store: SstableStoreRef, max_io_retry_times: usize, ) -> Self { + // filter the block meta with key range + let block_metas = filter_block_metas( + &block_metas, + &existing_table_ids, + sstable_info.key_range.clone(), + ); + + let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec(); + let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec(); + let key_range_right_exclusive = sstable_info.key_range.right_exclusive; + Self { block_stream: None, block_iter: None, @@ -99,6 +115,9 @@ impl SstableStreamIterator { task_progress, io_retry_times: 0, max_io_retry_times, + key_range_left, + key_range_right, + key_range_right_exclusive, } } @@ -141,6 +160,16 @@ impl SstableStreamIterator { // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is // also valid and pointing on the block's first KV-pair. + let seek_key = if let Some(seek_key) = seek_key { + if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() { + Some(self.key_range_left.to_ref()) + } else { + Some(seek_key) + } + } else { + Some(self.key_range_left.to_ref()) + }; + if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) { block_iter.seek(seek_key); @@ -150,7 +179,8 @@ impl SstableStreamIterator { } } - self.prune_from_valid_block_iter().await + self.prune_from_valid_block_iter().await?; + Ok(()) } /// Loads a new block, creates a new iterator for it, and stores that iterator in @@ -217,14 +247,47 @@ impl SstableStreamIterator { self.prune_from_valid_block_iter().await?; } + if !self.is_valid() { + return Ok(()); + } + + // Check if we need to skip the block. + let key = self + .block_iter + .as_ref() + .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info())) + .key(); + + if self.exceed_key_range_right(key) { + self.block_iter = None; + } + Ok(()) } pub fn key(&self) -> FullKey<&[u8]> { - self.block_iter + let key = self + .block_iter .as_ref() .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info())) - .key() + .key(); + + assert!( + !self.exceed_key_range_left(key), + "key {:?} key_range_left {:?}", + key, + self.key_range_left.to_ref() + ); + + assert!( + !self.exceed_key_range_right(key), + "key {:?} key_range_right {:?} key_range_right_exclusive {}", + key, + self.key_range_right.to_ref(), + self.key_range_right_exclusive + ); + + key } pub fn value(&self) -> HummockValue<&[u8]> { @@ -255,6 +318,18 @@ impl SstableStreamIterator { fn need_recreate_io_stream(&self) -> bool { self.io_retry_times < self.max_io_retry_times } + + fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool { + key.cmp(&self.key_range_left.to_ref()).is_lt() + } + + fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool { + if self.key_range_right_exclusive { + key.cmp(&self.key_range_right.to_ref()).is_ge() + } else { + key.cmp(&self.key_range_right.to_ref()).is_gt() + } + } } impl Drop for SstableStreamIterator { @@ -373,7 +448,7 @@ impl ConcatSstableIterator { None => self.key_range.clone(), }; - let block_metas = Self::filter_block_metas( + let block_metas = filter_block_metas( &sstable.meta.block_metas, &self.existing_table_ids, filter_key_range, @@ -409,94 +484,6 @@ impl ConcatSstableIterator { } Ok(()) } - - pub fn filter_block_metas( - block_metas: &Vec, - existing_table_ids: &HashSet, - key_range: KeyRange, - ) -> Vec { - if block_metas.is_empty() { - return vec![]; - } - - let mut start_index = if key_range.left.is_empty() { - 0 - } else { - // start_index points to the greatest block whose smallest_key <= seek_key. - block_metas - .partition_point(|block| { - KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key) - != Ordering::Less - }) - .saturating_sub(1) - }; - - let mut end_index = if key_range.right.is_empty() { - block_metas.len() - } else { - let ret = block_metas.partition_point(|block| { - KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) - != Ordering::Greater - }); - - if ret == 0 { - // not found - return vec![]; - } - - ret - } - .saturating_sub(1); - - // skip blocks that are not in existing_table_ids - while start_index <= end_index { - let start_block_table_id = block_metas[start_index].table_id().table_id(); - if existing_table_ids.contains(&start_block_table_id) { - break; - } - - // skip this table_id - let old_start_index = start_index; - let block_metas_to_search = &block_metas[start_index..=end_index]; - - start_index += block_metas_to_search.partition_point(|block_meta| { - block_meta.table_id().table_id() == start_block_table_id - }); - - if old_start_index == start_index { - // no more blocks with the same table_id - break; - } - } - - while start_index <= end_index { - let end_block_table_id = block_metas[end_index].table_id().table_id(); - if existing_table_ids.contains(&end_block_table_id) { - break; - } - - let old_end_index = end_index; - let block_metas_to_search = &block_metas[start_index..=end_index]; - - end_index = start_index - + block_metas_to_search - .partition_point(|block_meta| { - block_meta.table_id().table_id() < end_block_table_id - }) - .saturating_sub(1); - - if end_index == old_end_index { - // no more blocks with the same table_id - break; - } - } - - if start_index > end_index { - return vec![]; - } - - block_metas[start_index..=end_index].to_vec() - } } impl HummockIterator for ConcatSstableIterator { @@ -637,18 +624,103 @@ impl> HummockIterator for MonitoredCompa } } +pub(crate) fn filter_block_metas( + block_metas: &Vec, + existing_table_ids: &HashSet, + key_range: KeyRange, +) -> Vec { + if block_metas.is_empty() { + return vec![]; + } + + let mut start_index = if key_range.left.is_empty() { + 0 + } else { + // start_index points to the greatest block whose smallest_key <= seek_key. + block_metas + .partition_point(|block| { + KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key) + != Ordering::Less + }) + .saturating_sub(1) + }; + + let mut end_index = if key_range.right.is_empty() { + block_metas.len() + } else { + let ret = block_metas.partition_point(|block| { + KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) + != Ordering::Greater + }); + + if ret == 0 { + // not found + return vec![]; + } + + ret + } + .saturating_sub(1); + + // skip blocks that are not in existing_table_ids + while start_index <= end_index { + let start_block_table_id = block_metas[start_index].table_id().table_id(); + if existing_table_ids.contains(&start_block_table_id) { + break; + } + + // skip this table_id + let old_start_index = start_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + + start_index += block_metas_to_search + .partition_point(|block_meta| block_meta.table_id().table_id() == start_block_table_id); + + if old_start_index == start_index { + // no more blocks with the same table_id + break; + } + } + + while start_index <= end_index { + let end_block_table_id = block_metas[end_index].table_id().table_id(); + if existing_table_ids.contains(&end_block_table_id) { + break; + } + + let old_end_index = end_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + + end_index = start_index + + block_metas_to_search + .partition_point(|block_meta| block_meta.table_id().table_id() < end_block_table_id) + .saturating_sub(1); + + if end_index == old_end_index { + // no more blocks with the same table_id + break; + } + } + + if start_index > end_index { + return vec![]; + } + + block_metas[start_index..=end_index].to_vec() +} + #[cfg(test)] mod tests { use std::cmp::Ordering; use std::collections::HashSet; use risingwave_common::catalog::TableId; - use risingwave_hummock_sdk::key::{next_full_key, prev_full_key, FullKey}; + use risingwave_hummock_sdk::key::{next_full_key, prev_full_key, FullKey, FullKeyTracker}; use risingwave_hummock_sdk::key_range::KeyRange; use crate::hummock::compactor::ConcatSstableIterator; use crate::hummock::iterator::test_utils::mock_sstable_store; - use crate::hummock::iterator::HummockIterator; + use crate::hummock::iterator::{HummockIterator, MergeIterator}; use crate::hummock::test_utils::{ default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of, TEST_KEYS_COUNT, @@ -866,14 +938,12 @@ mod tests { #[tokio::test] async fn test_filter_block_metas() { + use crate::hummock::compactor::iterator::filter_block_metas; + { let block_metas = Vec::default(); - let ret = ConcatSstableIterator::filter_block_metas( - &block_metas, - &HashSet::default(), - KeyRange::default(), - ); + let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default()); assert!(ret.is_empty()); } @@ -894,7 +964,7 @@ mod tests { }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![1_u32, 2, 3].into_iter()), KeyRange::default(), @@ -933,7 +1003,7 @@ mod tests { }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![2_u32, 3].into_iter()), KeyRange::default(), @@ -972,7 +1042,7 @@ mod tests { }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![1_u32, 2_u32].into_iter()), KeyRange::default(), @@ -1010,7 +1080,7 @@ mod tests { ..Default::default() }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![2_u32].into_iter()), KeyRange::default(), @@ -1049,7 +1119,7 @@ mod tests { ..Default::default() }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![2_u32].into_iter()), KeyRange::default(), @@ -1089,7 +1159,7 @@ mod tests { }, ]; - let ret = ConcatSstableIterator::filter_block_metas( + let ret = filter_block_metas( &block_metas, &HashSet::from_iter(vec![2_u32].into_iter()), KeyRange::default(), @@ -1105,4 +1175,79 @@ mod tests { ); } } + + #[tokio::test] + async fn test_iterator_same_obj() { + let sstable_store = mock_sstable_store().await; + + let table_info = gen_test_sstable_info( + default_builder_opt_for_test(), + 1_u64, + (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))), + sstable_store.clone(), + ) + .await; + + let split_key = test_key_of(5000).encode(); + + let mut sst_1 = table_info.clone(); + sst_1.key_range.right = split_key.clone().into(); + sst_1.key_range.right_exclusive = true; + + let total_key_count = sst_1.total_key_count; + let mut sst_2 = table_info.clone(); + sst_2.sst_id = sst_1.sst_id + 1; + sst_2.key_range.left = split_key.clone().into(); + + { + // test concate + let mut full_key_tracker = FullKeyTracker::>::new(FullKey::default()); + + let mut iter = ConcatSstableIterator::for_test( + vec![0], + vec![sst_1.clone(), sst_2.clone()], + KeyRange::default(), + sstable_store.clone(), + ); + + iter.rewind().await.unwrap(); + + let mut key_count = 0; + while iter.is_valid() { + let is_new_user_key = full_key_tracker.observe(iter.key()); + assert!(is_new_user_key); + key_count += 1; + iter.next().await.unwrap(); + } + + assert_eq!(total_key_count, key_count); + } + + { + let mut full_key_tracker = FullKeyTracker::>::new(FullKey::default()); + let concat_1 = ConcatSstableIterator::for_test( + vec![0], + vec![sst_1.clone()], + KeyRange::default(), + sstable_store.clone(), + ); + + let concat_2 = ConcatSstableIterator::for_test( + vec![0], + vec![sst_2.clone()], + KeyRange::default(), + sstable_store.clone(), + ); + + let mut key_count = 0; + let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]); + iter.rewind().await.unwrap(); + while iter.is_valid() { + full_key_tracker.observe(iter.key()); + key_count += 1; + iter.next().await.unwrap(); + } + assert_eq!(total_key_count, key_count); + } + } }