fix: dedup staging sst on storage table read (#14664)
hzxa21 authored Jan 23, 2024
1 parent c75db2f commit 4c953eb
Showing 7 changed files with 133 additions and 50 deletions.
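Before the per-file diff, a minimal, self-contained sketch of the bug and the fix. The `StagingSst` type and `collect_staging_ssts` helper below are illustrative stand-ins, not RisingWave APIs: when imms from several shards of the same table are compacted into one staging SST, that SST is registered in every shard's read version, so a batch read that merges the shards' staging data used to see it more than once. The fix keeps a `HashSet` of seen object ids while merging, mirroring the `seen_sst_ids` dedup added to `read_filter_for_batch` in `version.rs`.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the staging SST metadata in the diff below;
// not a RisingWave type.
struct StagingSst {
    object_id: u64,
    min_epoch: u64,
}

// Merge per-shard staging SSTs for a batch read, keeping each physical SST
// once. Mirrors the `seen_sst_ids` dedup added to `read_filter_for_batch`.
fn collect_staging_ssts(
    per_shard: Vec<Vec<StagingSst>>,
    max_committed_epoch: u64,
) -> Vec<StagingSst> {
    let mut seen_sst_ids = HashSet::new();
    let mut out = Vec::new();
    for shard_ssts in per_shard {
        out.extend(shard_ssts.into_iter().filter(|sst| {
            // An SST compacted from the imms of several shards is registered in
            // every shard's read version, so the same object id can appear here
            // more than once; `insert` returns false on the repeat visits.
            sst.min_epoch > max_committed_epoch && seen_sst_ids.insert(sst.object_id)
        }));
    }
    out
}

fn main() {
    // Two shards both hold staging SST 233 (compacted from their imms).
    let shards = vec![
        vec![StagingSst { object_id: 233, min_epoch: 1 }],
        vec![StagingSst { object_id: 233, min_epoch: 1 }],
    ];
    // With a max committed epoch of 0, the SST is uncommitted and visible,
    // but it must be returned exactly once.
    assert_eq!(collect_staging_ssts(shards, 0).len(), 1);
}
```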
88 changes: 77 additions & 11 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
@@ -20,7 +20,7 @@ use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range};
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_pb::hummock::{KeyRange, SstableInfo};
use risingwave_storage::hummock::iterator::test_utils::{
@@ -31,7 +31,7 @@ use risingwave_storage::hummock::store::version::{
read_filter_for_batch, read_filter_for_local, HummockReadVersion, StagingData,
StagingSstableInfo, VersionUpdate,
};
use risingwave_storage::hummock::test_utils::gen_dummy_batch;
use risingwave_storage::hummock::test_utils::{gen_dummy_batch, gen_dummy_sst_info};

use crate::test_utils::prepare_first_valid_version;

@@ -52,15 +52,13 @@ async fn test_read_version_basic() {
let kv_pairs = gen_dummy_batch(epoch);
let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let imm = SharedBufferBatch::build_shared_buffer_batch(
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
sorted_items,
size,
vec![],
TableId::from(table_id),
None,
None,
);

read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));
@@ -91,15 +89,13 @@ async fn test_read_version_basic() {
let kv_pairs = gen_dummy_batch(epoch);
let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let imm = SharedBufferBatch::build_shared_buffer_batch(
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
sorted_items,
size,
vec![],
TableId::from(table_id),
None,
None,
);

read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));
@@ -278,15 +274,13 @@ async fn test_read_filter_basic() {
let kv_pairs = gen_dummy_batch(epoch);
let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs);
let size = SharedBufferBatch::measure_batch_size(&sorted_items);
let imm = SharedBufferBatch::build_shared_buffer_batch(
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
sorted_items,
size,
vec![],
TableId::from(table_id),
None,
None,
);

read_version
@@ -344,3 +338,75 @@ async fn test_read_filter_basic() {
}
}
}

#[tokio::test]
async fn test_read_filter_for_batch_issue_14659() {
use std::ops::Bound::Unbounded;

let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;

let (pinned_version, _, _) =
prepare_first_valid_version(env, hummock_manager_ref, worker_node).await;

const NUM_SHARDS: u64 = 2;
let table_id = TableId::from(2);
let epoch = 1;
let mut read_version_vec = vec![];
let mut imms = vec![];

// Populate IMMs
for i in 0..NUM_SHARDS {
let read_version = Arc::new(RwLock::new(HummockReadVersion::new(
table_id,
pinned_version.clone(),
)));

let items = SharedBufferBatch::build_shared_buffer_item_batches(gen_dummy_batch(i));
let size = SharedBufferBatch::measure_batch_size(&items);
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
items,
size,
vec![],
table_id,
);

imms.push(imm.clone());

read_version
.write()
.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));

read_version_vec.push(read_version);
}

// Update read version via staging SSTs
let sst_id = 233;
let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch);
read_version_vec.iter().for_each(|v| {
v.write().update(VersionUpdate::Staging(StagingData::Sst(
StagingSstableInfo::new(
vec![LocalSstableInfo::for_test(staging_sst.clone())],
vec![epoch],
imms.iter().map(|imm| imm.batch_id()).collect_vec(),
imms.iter().map(|imm| imm.size()).sum(),
),
)));
});

// Build the batch read snapshot at the max epoch
let (_, hummock_read_snapshot) = read_filter_for_batch(
HummockEpoch::MAX,
table_id,
(Unbounded, Unbounded),
read_version_vec,
)
.unwrap();

// No imms should be provided
assert_eq!(0, hummock_read_snapshot.0.len());
// Only 1 staging sst is provided
assert_eq!(1, hummock_read_snapshot.1.len());
}
20 changes: 5 additions & 15 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -619,7 +619,7 @@ mod tests {

#[tokio::test]
async fn test_generate_splits_in_order() {
let imm1 = ImmutableMemtable::build_shared_buffer_batch(
let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
3,
0,
vec![(
@@ -629,10 +629,8 @@
1024 * 1024,
vec![],
TableId::new(1),
None,
None,
);
let imm2 = ImmutableMemtable::build_shared_buffer_batch(
let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
3,
0,
vec![(
@@ -642,11 +640,9 @@
(1024 + 256) * 1024,
vec![],
TableId::new(1),
None,
None,
);

let imm3 = ImmutableMemtable::build_shared_buffer_batch(
let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
2,
0,
vec![(
@@ -656,10 +652,8 @@
(1024 + 512) * 1024,
vec![],
TableId::new(1),
None,
None,
);
let imm4 = ImmutableMemtable::build_shared_buffer_batch(
let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
3,
0,
vec![(
@@ -669,11 +663,9 @@
(1024 + 512) * 1024,
vec![],
TableId::new(1),
None,
None,
);

let imm5 = ImmutableMemtable::build_shared_buffer_batch(
let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
3,
0,
vec![(
@@ -683,8 +675,6 @@
(1024 + 256) * 1024,
vec![],
TableId::new(2),
None,
None,
);

let storage_opts = StorageOpts {
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/uploader.rs
@@ -1222,7 +1222,7 @@ mod tests {
size,
vec![],
TEST_TABLE_ID,
None,
LocalInstanceId::default(),
tracker,
)
}
45 changes: 31 additions & 14 deletions src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
@@ -584,7 +584,7 @@ impl SharedBufferBatch {
size: usize,
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
table_id: TableId,
instance_id: Option<LocalInstanceId>,
instance_id: LocalInstanceId,
tracker: Option<MemoryTracker>,
) -> Self {
let inner = SharedBufferBatchInner::new(
Expand All @@ -599,7 +599,7 @@ impl SharedBufferBatch {
SharedBufferBatch {
inner: Arc::new(inner),
table_id,
instance_id: instance_id.unwrap_or_default(),
instance_id,
}
}

Expand Down Expand Up @@ -642,6 +642,31 @@ impl SharedBufferBatch {
}
vnodes
}

#[cfg(any(test, feature = "test"))]
pub fn build_shared_buffer_batch_for_test(
epoch: HummockEpoch,
spill_offset: u16,
sorted_items: Vec<SharedBufferItem>,
size: usize,
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
table_id: TableId,
) -> Self {
let inner = SharedBufferBatchInner::new(
table_id,
epoch,
spill_offset,
sorted_items,
delete_ranges,
size,
None,
);
SharedBufferBatch {
inner: Arc::new(inner),
table_id,
instance_id: LocalInstanceId::default(),
}
}
}

/// Iterate all the items in the shared buffer batch
@@ -991,7 +1016,7 @@ mod tests {
output.reverse();
assert_eq!(output, shared_buffer_items);

let batch = SharedBufferBatch::build_shared_buffer_batch(
let batch = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
vec![],
Expand All @@ -1007,8 +1032,6 @@ mod tests {
),
],
TableId::new(0),
None,
None,
);
assert_eq!(batch.start_table_key().as_ref(), "a".as_bytes());
assert_eq!(
@@ -1174,15 +1197,13 @@
Bound::Excluded(Bytes::from(b"eee".to_vec())),
),
];
let shared_buffer_batch = SharedBufferBatch::build_shared_buffer_batch(
let shared_buffer_batch = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
vec![],
0,
delete_ranges,
Default::default(),
None,
None,
);
assert_eq!(
epoch,
@@ -1467,15 +1488,13 @@
];
let sorted_items1 = transform_shared_buffer(shared_buffer_items1);
let size = SharedBufferBatch::measure_batch_size(&sorted_items1);
let imm1 = SharedBufferBatch::build_shared_buffer_batch(
let imm1 = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
sorted_items1,
size,
delete_ranges,
table_id,
None,
None,
);

let epoch = 2;
Expand Down Expand Up @@ -1513,15 +1532,13 @@ mod tests {
];
let sorted_items2 = transform_shared_buffer(shared_buffer_items2);
let size = SharedBufferBatch::measure_batch_size(&sorted_items2);
let imm2 = SharedBufferBatch::build_shared_buffer_batch(
let imm2 = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
sorted_items2,
size,
delete_ranges,
table_id,
None,
None,
);

let imms = vec![imm2, imm1];
2 changes: 1 addition & 1 deletion src/storage/src/hummock/store/local_hummock_storage.rs
@@ -496,7 +496,7 @@ impl LocalHummockStorage {
size,
delete_ranges,
table_id,
Some(instance_id),
instance_id,
Some(tracker),
);
self.spill_offset += 1;
19 changes: 13 additions & 6 deletions src/storage/src/hummock/store/version.rs
@@ -537,22 +537,29 @@ pub fn read_filter_for_batch(

let mut imm_vec = Vec::default();
let mut sst_vec = Vec::default();
let mut seen_imm_ids = HashSet::new();
let mut seen_sst_ids = HashSet::new();

// Only keep the staging data whose epoch is greater than max_mce to avoid data duplication
let (min_epoch, max_epoch) = (max_mce_version.max_committed_epoch(), epoch);
// prune imm and sst with max_mce
for (staging_imms, staging_ssts) in staging_vec {
imm_vec.extend(
staging_imms
.into_iter()
.filter(|imm| imm.min_epoch() > min_epoch && imm.min_epoch() <= max_epoch),
);
imm_vec.extend(staging_imms.into_iter().filter(|imm| {
// There shouldn't be duplicated IMMs because imm merge only operates within a single shard.
assert!(seen_imm_ids.insert(imm.batch_id()));
imm.min_epoch() > min_epoch && imm.min_epoch() <= max_epoch
}));

sst_vec.extend(staging_ssts.into_iter().filter(|staging_sst| {
assert!(
staging_sst.get_max_epoch() <= min_epoch || staging_sst.get_min_epoch() > min_epoch
);
staging_sst.min_epoch > min_epoch
// Dedup staging SSTs across shards. Duplicates can happen in the following case:
// - Table 1 Shard 1 produces IMM 1
// - Table 1 Shard 2 produces IMM 2
// - IMM 1 and IMM 2 are compacted into SST 1 as a Staging SST
// - SST 1 is added to both Shard 1's and Shard 2's read version
staging_sst.min_epoch > min_epoch && seen_sst_ids.insert(staging_sst.object_id)
}));
}

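For reference, the assertion and the pruning predicate in this hunk encode an invariant worth spelling out: a staging SST never straddles the max committed epoch, so it is either fully committed (served by the committed version) or fully uncommitted (served as staging data). A standalone sketch of that predicate, with hypothetical names rather than the RisingWave API:

```rust
// Illustrative predicate with hypothetical names; mirrors the assert and
// epoch filter in `read_filter_for_batch` above.
fn staging_sst_visible(sst_min_epoch: u64, sst_max_epoch: u64, max_committed_epoch: u64) -> bool {
    // Invariant: the SST lies entirely on one side of the committed epoch.
    assert!(sst_max_epoch <= max_committed_epoch || sst_min_epoch > max_committed_epoch);
    // Keep only SSTs whose data is not yet in the committed version.
    sst_min_epoch > max_committed_epoch
}

fn main() {
    // Epochs (2..=3) vs. committed epoch 1: fully uncommitted, served as staging.
    assert!(staging_sst_visible(2, 3, 1));
    // Epochs (1..=1) vs. committed epoch 1: already committed, pruned.
    assert!(!staging_sst_visible(1, 1, 1));
}
```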