Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): store old value in memtable #15301

Merged
merged 8 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ message HummockVersionDelta {
reserved "gc_object_ids";
map<uint32, TableWatermarks> new_table_watermarks = 8;
repeated uint32 removed_table_ids = 9;
message ChangeLogDelta {
EpochNewChangeLog new_log = 1;
// only logs in epoch later than truncate_epoch will be preserved
uint64 truncate_epoch = 2;
}
map<uint32, ChangeLogDelta> change_log_delta = 10;
}

message HummockVersionDeltas {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message BarrierCompleteResponse {
repeated GroupedSstableInfo synced_sstables = 4;
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
Expand Down
15 changes: 14 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::pending;
use std::iter::empty;
use std::mem::{replace, take};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -30,6 +31,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks,
};
Expand Down Expand Up @@ -1144,11 +1146,12 @@ pub type BarrierManagerRef = GlobalBarrierManagerContext;
fn collect_commit_epoch_info(
resps: Vec<BarrierCompleteResponse>,
command_ctx: &CommandContext,
_epochs: &Vec<u64>,
epochs: &Vec<u64>,
) -> CommitEpochInfo {
let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
let mut table_watermarks = Vec::with_capacity(resps.len());
let mut old_value_ssts = Vec::with_capacity(resps.len());
for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
Expand All @@ -1161,6 +1164,7 @@ fn collect_commit_epoch_info(
});
synced_ssts.extend(ssts_iter);
table_watermarks.push(resp.table_watermarks);
old_value_ssts.extend(resp.old_value_sstables);
}
let new_table_fragment_info = if let Command::CreateStreamingJob {
table_fragments, ..
Expand All @@ -1179,6 +1183,14 @@ fn collect_commit_epoch_info(
None
};

let table_new_change_log = build_table_change_log_delta(
old_value_ssts.into_iter(),
synced_ssts.iter().map(|sst| &sst.sst_info),
epochs,
// TODO: pass log store table id and the corresponding truncate_epoch
empty(),
);

CommitEpochInfo::new(
synced_ssts,
merge_multiple_new_table_watermarks(
Expand All @@ -1199,5 +1211,6 @@ fn collect_commit_epoch_info(
),
sst_to_worker,
new_table_fragment_info,
table_new_change_log,
)
}
7 changes: 7 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub struct CommitEpochInfo {
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
pub new_table_fragment_info: Option<NewTableFragmentInfo>,
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
}

impl CommitEpochInfo {
Expand All @@ -271,12 +272,14 @@ impl CommitEpochInfo {
new_table_watermarks: HashMap<TableId, TableWatermarks>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
new_table_fragment_info: Option<NewTableFragmentInfo>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
) -> Self {
Self {
sstables,
new_table_watermarks,
sst_to_context,
new_table_fragment_info,
change_log_delta,
}
}

Expand All @@ -290,6 +293,7 @@ impl CommitEpochInfo {
HashMap::new(),
sst_to_context,
None,
HashMap::new(),
)
}
}
Expand Down Expand Up @@ -1624,6 +1628,7 @@ impl HummockManager {
new_table_watermarks,
sst_to_context,
new_table_fragment_info,
change_log_delta,
} = commit_info;
let mut versioning_guard = write_lock!(self, versioning).await;
let _timer = start_measure_real_process_timer!(self);
Expand Down Expand Up @@ -1659,6 +1664,7 @@ impl HummockManager {
);
new_version_delta.max_committed_epoch = epoch;
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

let mut table_compaction_group_mapping = old_version.build_compaction_group_info();

Expand Down Expand Up @@ -3475,6 +3481,7 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto
type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta;
use tokio::sync::mpsc::error::SendError;

use super::compaction::CompactionSelector;
Expand Down
16 changes: 15 additions & 1 deletion src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use fail::fail_point;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
Expand Down Expand Up @@ -156,13 +157,25 @@ impl HummockMetaClient for MockHummockMetaClient {
}

async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()> {
let version: HummockVersion = self.hummock_manager.get_current_version().await;
let sst_to_worker = sync_result
.uncommitted_ssts
.iter()
.map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id))
.collect();
let new_table_watermark = sync_result.table_watermarks;

let table_change_log = build_table_change_log_delta(
sync_result
.old_value_ssts
.into_iter()
.map(|sst| sst.sst_info),
sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
&vec![epoch],
version
.levels
.values()
.flat_map(|group| group.member_table_ids.iter().map(|table_id| (*table_id, 0))),
);
self.hummock_manager
.commit_epoch(
epoch,
Expand All @@ -175,6 +188,7 @@ impl HummockMetaClient for MockHummockMetaClient {
new_table_watermark,
sst_to_worker,
None,
table_change_log,
),
)
.await
Expand Down
7 changes: 4 additions & 3 deletions src/storage/benches/bench_imm_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::hummock::compactor::merge_imms_in_memory;
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferValue,
};

fn gen_interleave_shared_buffer_batch(
batch_size: usize,
Expand All @@ -34,7 +35,7 @@ fn gen_interleave_shared_buffer_batch(
TableKey(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())),
));
}
let batch = SharedBufferBatch::for_test(batch_data, epoch, Default::default());
Expand Down
7 changes: 3 additions & 4 deletions src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use risingwave_storage::hummock::iterator::{
Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
};
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator,
SharedBufferBatch, SharedBufferBatchIterator, SharedBufferValue,
};
use risingwave_storage::hummock::value::HummockValue;

fn gen_interleave_shared_buffer_batch_iter(
batch_size: usize,
Expand All @@ -39,7 +38,7 @@ fn gen_interleave_shared_buffer_batch_iter(
TableKey(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())),
));
}
let batch = SharedBufferBatch::for_test(batch_data, 2333, Default::default());
Expand Down Expand Up @@ -69,7 +68,7 @@ fn gen_interleave_shared_buffer_batch_enum_iter(
TableKey(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
SharedBufferValue::Insert(Bytes::copy_from_slice("value".as_bytes())),
));
}
let batch = SharedBufferBatch::for_test(batch_data, 2333, Default::default());
Expand Down
Loading
Loading