Skip to content

Commit

Permalink
cherry-pick #15289
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Apr 2, 2024
1 parent cb5f183 commit 3a59888
Show file tree
Hide file tree
Showing 23 changed files with 561 additions and 588 deletions.
7 changes: 7 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
(left, right)
}

/// Returns the single virtual node that the given table key range belongs to.
///
/// The vnode index span of `range` is obtained from [`vnode_range`]; the first
/// index of that span is converted into a [`VirtualNode`].
///
/// # Panics
///
/// Panics if the span reported by [`vnode_range`] does not have a width of
/// exactly one, i.e. when the key range involves more than one vnode.
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
    let (start_index, end_index_exclusive) = vnode_range(range);
    // The caller must hand in a range confined to one vnode.
    assert_eq!(end_index_exclusive - start_index, 1);
    VirtualNode::from_index(start_index)
}

/// Converts user key to full key by appending `epoch` to the user key.
pub fn key_with_epoch(mut user_key: Vec<u8>, epoch: HummockEpoch) -> Vec<u8> {
let res = epoch.to_be();
Expand Down
138 changes: 40 additions & 98 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark};
use tracing::{debug, warn};

use crate::key::{prefix_slice_with_vnode, vnode_range, TableKey, TableKeyRange};
use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange};
use crate::HummockEpoch;

#[derive(Clone)]
Expand Down Expand Up @@ -102,79 +102,54 @@ impl TableWatermarksIndex {
self.read_watermark(vnode, HummockEpoch::MAX)
}

pub fn range_watermarks(
pub fn rewrite_range_with_table_watermark(
&self,
epoch: HummockEpoch,
key_range: &mut TableKeyRange,
) -> Option<ReadTableWatermark> {
let mut ret = BTreeMap::new();
let (left, right) = vnode_range(key_range);
if right - left == 1 {
// the table key range falls in a single vnode. No table watermark will be returned, and instead the key range
// will be modified.
let vnode = VirtualNode::from_index(left);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
match self.watermark_direction {
WatermarkDirection::Ascending => {
let overwrite_start_key = match &key_range.0 {
Included(start_key) | Excluded(start_key) => {
start_key.key_part() < watermark
}
Unbounded => true,
) {
let vnode = vnode(key_range);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
match self.watermark_direction {
WatermarkDirection::Ascending => {
let overwrite_start_key = match &key_range.0 {
Included(start_key) | Excluded(start_key) => {
start_key.key_part() < watermark
}
Unbounded => true,
};
if overwrite_start_key {
let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.1 {
Included(end_key) => end_key < &watermark_key,
Excluded(end_key) => end_key <= &watermark_key,
Unbounded => false,
};
if overwrite_start_key {
let watermark_key =
TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.1 {
Included(end_key) => end_key < &watermark_key,
Excluded(end_key) => end_key <= &watermark_key,
Unbounded => false,
};
if fully_filtered {
key_range.1 = Excluded(watermark_key.clone());
}
key_range.0 = Included(watermark_key);
if fully_filtered {
key_range.1 = Excluded(watermark_key.clone());
}
key_range.0 = Included(watermark_key);
}
WatermarkDirection::Descending => {
let overwrite_end_key = match &key_range.1 {
Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
Unbounded => true,
}
WatermarkDirection::Descending => {
let overwrite_end_key = match &key_range.1 {
Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
Unbounded => true,
};
if overwrite_end_key {
let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.0 {
Included(start_key) => start_key > &watermark_key,
Excluded(start_key) => start_key >= &watermark_key,
Unbounded => false,
};
if overwrite_end_key {
let watermark_key =
TableKey(prefix_slice_with_vnode(vnode, &watermark));
let fully_filtered = match &key_range.0 {
Included(start_key) => start_key > &watermark_key,
Excluded(start_key) => start_key >= &watermark_key,
Unbounded => false,
};
if fully_filtered {
*key_range =
(Included(watermark_key.clone()), Excluded(watermark_key));
} else {
key_range.1 = Included(watermark_key);
}
if fully_filtered {
*key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
} else {
key_range.1 = Included(watermark_key);
}
}
}
}
None
} else {
for i in left..right {
let vnode = VirtualNode::from_index(i);
if let Some(watermark) = self.read_watermark(vnode, epoch) {
assert!(ret.insert(vnode, watermark).is_none());
}
}
if ret.is_empty() {
None
} else {
Some(ReadTableWatermark {
direction: self.direction(),
vnode_watermarks: ret,
})
}
}
}

Expand Down Expand Up @@ -597,10 +572,7 @@ mod tests {
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;

use crate::key::{
is_empty_key_range, map_table_key_range, prefix_slice_with_vnode,
prefixed_range_with_vnode, TableKeyRange,
};
use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange};
use crate::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark,
WatermarkDirection,
Expand Down Expand Up @@ -960,42 +932,12 @@ mod tests {
Some(watermark2.clone())
);

// test read from multiple vnodes
{
let range = map_table_key_range((
Included(prefix_slice_with_vnode(
VirtualNode::from_index(1),
b"begin",
)),
Excluded(prefix_slice_with_vnode(VirtualNode::from_index(2), b"end")),
));
let mut range_mut = range.clone();
let read_watermarks = index.range_watermarks(EPOCH2, &mut range_mut).unwrap();
assert_eq!(range_mut, range);
assert_eq!(direction, read_watermarks.direction);
assert_eq!(2, read_watermarks.vnode_watermarks.len());
assert_eq!(
&watermark2,
read_watermarks
.vnode_watermarks
.get(&VirtualNode::from_index(1))
.unwrap()
);
assert_eq!(
&watermark2,
read_watermarks
.vnode_watermarks
.get(&VirtualNode::from_index(2))
.unwrap()
);
}

// watermark is watermark2
let check_watermark_range =
|query_range: (Bound<Bytes>, Bound<Bytes>),
output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
let mut range = build_watermark_range(direction, query_range);
assert!(index.range_watermarks(EPOCH2, &mut range).is_none());
index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
if let Some(output_range) = output_range {
assert_eq!(range, build_watermark_range(direction, output_range));
} else {
Expand Down
26 changes: 20 additions & 6 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub(crate) mod tests {
use risingwave_common_service::observer_manager::NotificationClient;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{next_key, FullKey, TableKey, TABLE_PREFIX_LEN};
use risingwave_hummock_sdk::key::{
next_key, prefix_slice_with_vnode, prefixed_range_with_vnode, FullKey, TableKey,
TABLE_PREFIX_LEN,
};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::version::HummockVersion;
Expand Down Expand Up @@ -144,7 +147,9 @@ pub(crate) mod tests {
value_size: usize,
epochs: Vec<u64>,
) {
let mut local = storage.new_local(Default::default()).await;
let mut local = storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;
// 1. add sstables
let val = b"0"[..].repeat(value_size);
local.init_for_test(epochs[0]).await.unwrap();
Expand Down Expand Up @@ -718,6 +723,8 @@ pub(crate) mod tests {
StaticCompactionGroupId::StateDefault.into(),
)
.await;

let vnode = VirtualNode::from_index(1);
for index in 0..kv_count {
epoch += 1;
let next_epoch = epoch + 1;
Expand All @@ -734,7 +741,7 @@ pub(crate) mod tests {

let mut prefix = BytesMut::default();
let random_key = rand::thread_rng().gen::<[u8; 32]>();
prefix.put_u16(1);
prefix.extend_from_slice(&vnode.to_be_bytes());
prefix.put_slice(random_key.as_slice());

storage
Expand Down Expand Up @@ -840,7 +847,10 @@ pub(crate) mod tests {
// 7. scan kv to check key table_id
let scan_result = global_storage
.scan(
(Bound::Unbounded, Bound::Unbounded),
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
vnode,
),
epoch,
None,
ReadOptions {
Expand Down Expand Up @@ -910,6 +920,7 @@ pub(crate) mod tests {
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
let vnode = VirtualNode::from_index(1);
let mut epoch_set = BTreeSet::new();

let mut local = storage
Expand All @@ -924,7 +935,7 @@ pub(crate) mod tests {
epoch_set.insert(epoch);
let mut prefix = BytesMut::default();
let random_key = rand::thread_rng().gen::<[u8; 32]>();
prefix.put_u16(1);
prefix.extend_from_slice(&vnode.to_be_bytes());
prefix.put_slice(random_key.as_slice());

local
Expand Down Expand Up @@ -1035,7 +1046,10 @@ pub(crate) mod tests {
// 6. scan kv to check key table_id
let scan_result = storage
.scan(
(Bound::Unbounded, Bound::Unbounded),
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
vnode,
),
epoch,
None,
ReadOptions {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ async fn test_failpoints_state_store_read_upload() {
.await
.unwrap();

let mut local = hummock_storage.new_local(NewLocalOptions::default()).await;
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;

let anchor = gen_key_from_str(VirtualNode::ZERO, "aa");
let mut batch1 = vec![
Expand Down
Loading

0 comments on commit 3a59888

Please sign in to comment.