Skip to content

Commit

Permalink
perf: avoid sstable info copy in event handler (cherry-pick #15811) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored Apr 6, 2024
1 parent 08b7c93 commit 5de0c49
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn test_read_version_basic() {
.rev()
.collect::<Vec<_>>();

let dummy_sst = StagingSstableInfo::new(
let dummy_sst = Arc::new(StagingSstableInfo::new(
vec![
LocalSstableInfo::for_test(SstableInfo {
object_id: 1,
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn test_read_version_basic() {
epoch_id_vec_for_clear,
batch_id_vec_for_clear,
1,
);
));

{
read_version.update(VersionUpdate::Staging(StagingData::Sst(dummy_sst)));
Expand Down Expand Up @@ -368,7 +368,7 @@ async fn test_read_filter_basic() {

// // Update read version via staging SSTs
// let sst_id = 233;
// let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch);
// let staging_sst = Arc::new(gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch));
// read_version_vec.iter().for_each(|v| {
// v.write().update(VersionUpdate::Staging(StagingData::Sst(
// StagingSstableInfo::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,10 @@ impl HummockEventHandler {
// older data first
.rev()
.for_each(|staging_sstable_info| {
let staging_sstable_info_ref = Arc::new(staging_sstable_info);
self.for_each_read_version(|read_version| {
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
staging_sstable_info_ref.clone(),
)))
});
});
Expand Down Expand Up @@ -438,6 +439,7 @@ impl HummockEventHandler {

fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) {
// todo: do some prune for version update
let staging_sstable_info = Arc::new(staging_sstable_info);
self.for_each_read_version(|read_version| {
trace!("data_spilled. SST size {}", staging_sstable_info.imm_size());
read_version.update(VersionUpdate::Staging(StagingData::Sst(
Expand Down
16 changes: 8 additions & 8 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl StagingSstableInfo {
pub enum StagingData {
ImmMem(ImmutableMemtable),
MergedImmMem(ImmutableMemtable, Vec<ImmId>),
Sst(StagingSstableInfo),
Sst(Arc<StagingSstableInfo>),
}

pub enum VersionUpdate {
Expand All @@ -139,7 +139,7 @@ pub struct StagingVersion {
pub imm: VecDeque<ImmutableMemtable>,

// newer data comes first
pub sst: VecDeque<StagingSstableInfo>,
pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
Expand Down Expand Up @@ -285,7 +285,7 @@ impl HummockReadVersion {
StagingData::MergedImmMem(merged_imm, imm_ids) => {
self.add_merged_imm(merged_imm, imm_ids);
}
StagingData::Sst(staging_sst) => {
StagingData::Sst(staging_sst_ref) => {
// The following properties must be ensured:
// 1) self.staging.imm is sorted by imm id descendingly
// 2) staging_sst.imm_ids preserves the imm id partial
Expand All @@ -308,7 +308,7 @@ impl HummockReadVersion {
self.staging.imm.iter().map(|imm| imm.batch_id()).collect();

// intersected batch_id order from oldest to newest
let intersect_imm_ids = staging_sst
let intersect_imm_ids = staging_sst_ref
.imm_ids
.iter()
.rev()
Expand Down Expand Up @@ -343,15 +343,15 @@ impl HummockReadVersion {
staging_sst.epochs {:?},
local_imm_ids {:?},
intersect_imm_ids {:?}",
staging_sst.imm_size,
staging_sst.imm_ids,
staging_sst.epochs,
staging_sst_ref.imm_size,
staging_sst_ref.imm_ids,
staging_sst_ref.epochs,
local_imm_ids,
intersect_imm_ids,
);
}
}
self.staging.sst.push_front(staging_sst);
self.staging.sst.push_front(staging_sst_ref);
}
}
},
Expand Down

0 comments on commit 5de0c49

Please sign in to comment.