Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): replace hummock protobuf strcut with rust struct #15386

Merged
merged 28 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f30ff7
feat(storage): replace pb with struct
Li0k Mar 1, 2024
163ff20
fix(storage): fix check
Li0k Mar 4, 2024
e866deb
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
940b233
refactor(storage): refactor and reduce copy
Li0k Mar 4, 2024
9a8ebbc
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
afd7469
fix(storage): address comments
Li0k Mar 8, 2024
9f9707e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 8, 2024
3bf2bf3
fix(storage): fix compile
Li0k Mar 8, 2024
4f72cba
fix(storage): fix check
Li0k Mar 8, 2024
0a75e90
fix(storage): fix check
Li0k Mar 8, 2024
5672d61
refactor(storage): refactor version
Li0k May 7, 2024
1758269
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
480fc62
fix(storage): fix compile
Li0k May 9, 2024
5fff8ae
fix(storage): fix check
Li0k May 9, 2024
b912567
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 9, 2024
ebcc00b
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 11, 2024
c5d4b7f
fix(storage): address comments
Li0k Jul 11, 2024
7a1fbb1
fix(storage): address check
Li0k Jul 12, 2024
705f7b0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
bf1addf
fix(storage): address comments
Li0k Jul 23, 2024
a22660b
fix(storage): fix compile
Li0k Jul 23, 2024
ee43cd3
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
75d72d4
fix(storage): fix sync_point and bench code
Li0k Jul 23, 2024
2de6a48
fix(storage): reorg and address comments
Li0k Jul 25, 2024
7eae479
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
6018f58
fix(storage): fix compile
Li0k Jul 25, 2024
ffc5ac6
fix(storage): fix test compile
Li0k Jul 25, 2024
66011da
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_hummock_sdk::{LocalSstableInfo, ProtoSerializeOwnExt};
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
Expand Down Expand Up @@ -177,15 +177,15 @@ impl StreamService for StreamServiceImpl {
table_stats,
}| GroupedSstableInfo {
compaction_group_id,
sst: Some(sst_info),
sst: Some(sst_info.to_protobuf_own()),
table_stats_map: to_prost_table_stats_map(table_stats),
},
)
.collect_vec(),
worker_id: self.env.worker_id(),
table_watermarks: table_watermarks
.into_iter()
.map(|(key, value)| (key.table_id, value.to_protobuf()))
.map(|(key, value)| (key.table_id, value.to_protobuf_own()))
.collect(),
}))
}
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::version::{Level, SstableInfo};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
Expand Down Expand Up @@ -156,7 +156,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}

fn print_level(level: &Level, sst_info: &SstableInfo) {
println!("Level Type: {}", level.level_type);
println!("Level Type: {}", level.level_type.as_str_name());
println!("Level Idx: {}", level.level_idx);
if level.level_idx == 0 {
println!("L0 Sub-Level Idx: {}", level.sub_level_id);
Expand Down
7 changes: 3 additions & 4 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta, SstableInfo};
use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
use risingwave_object_store::object::ObjectStoreRef;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::{HummockVersionArchive, SstableInfo};
use risingwave_pb::hummock::HummockVersionArchive;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
Expand Down Expand Up @@ -107,8 +107,7 @@ async fn print_user_key_in_version(
.chain(cg.levels.iter())
{
for sstable_info in &level.table_infos {
use risingwave_hummock_sdk::key_range::KeyRange;
let key_range: KeyRange = sstable_info.key_range.as_ref().unwrap().into();
let key_range = sstable_info.key_range.as_ref().unwrap();
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
level_id: level.level_idx as _,
sub_level_id: (level.level_idx > 0).then_some(level.sub_level_id as _),
level_type: level.level_type as _,
key_range_left: key_range.left,
key_range_right: key_range.right,
key_range_left: key_range.left.to_vec(),
key_range_right: key_range.right.to_vec(),
right_exclusive: key_range.right_exclusive,
file_size: sst.file_size as _,
meta_offset: sst.meta_offset as _,
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = { version = "1" }
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
Expand Down
11 changes: 9 additions & 2 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::ProtoSerializeExt;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
Expand Down Expand Up @@ -224,6 +226,7 @@ impl HummockManagerService for HummockServiceImpl {
&self,
request: Request<TriggerManualCompactionRequest>,
) -> Result<Response<TriggerManualCompactionResponse>, Status> {
use bytes::Bytes;
let request = request.into_inner();
let compaction_group_id = request.compaction_group_id;
let mut option = ManualCompactionOption {
Expand All @@ -234,8 +237,12 @@ impl HummockManagerService for HummockServiceImpl {

// rewrite the key_range
match request.key_range {
Some(key_range) => {
option.key_range = key_range;
Some(pb_key_range) => {
option.key_range = KeyRange {
left: Bytes::from(pb_key_range.left),
Li0k marked this conversation as resolved.
Show resolved Hide resolved
right: Bytes::from(pb_key_range.right),
right_exclusive: pb_key_range.right_exclusive,
};
}

None => {
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_hummock_sdk::ProtoSerializeExt;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::MetaResult;
use risingwave_pb::backup_service::MetaBackupManifestId;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
use risingwave_backup::MetaSnapshotId;
use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_hummock_sdk::{version_checkpoint_path, ProtoSerializeExt};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::hummock::PbHummockVersionCheckpoint;
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use risingwave_common::{bail, must_match};
use risingwave_hummock_sdk::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks,
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_hummock_sdk::version::SstableInfo;
use risingwave_hummock_sdk::{
ExtendedSstableInfo, HummockSstableObjectId, ProtoSerializeExt, ProtoSerializeOwnExt,
};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -1092,7 +1095,7 @@ fn collect_commit_epoch_info(
sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
ExtendedSstableInfo::new(
grouped.compaction_group_id,
sst_info,
SstableInfo::from_protobuf_own(sst_info),
grouped.table_stats_map,
)
});
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::version::{CompactTask, Levels};
use risingwave_pb::hummock::compact_task::{self, TaskType};

mod picker;
Expand All @@ -28,8 +29,7 @@ use std::sync::Arc;
use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
use risingwave_pb::hummock::{CompactionConfig, LevelType};
pub use selector::CompactionSelector;

use self::selector::LocalSelectorStatistic;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::fmt::Debug;
use std::ops::Range;

use itertools::Itertools;
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::version::SstableInfo;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo: Debug {
fn check_overlap(&self, a: &SstableInfo) -> bool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel};
use risingwave_hummock_sdk::version::{InputLevel, Level, Levels, OverlappingLevel};
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
use super::{
Expand Down Expand Up @@ -46,7 +46,7 @@ impl CompactionPicker for LevelCompactionPicker {
if l0.sub_levels.is_empty() {
return None;
}
if l0.sub_levels[0].level_type != LevelType::Nonoverlapping as i32
if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
&& l0.sub_levels[0].table_infos.len() > 1
{
stats.skip_by_overlapping += 1;
Expand Down Expand Up @@ -225,7 +225,7 @@ impl LevelCompactionPicker {
.into_iter()
.map(|table_infos| InputLevel {
level_idx: 0,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos,
})
.collect_vec();
Expand Down Expand Up @@ -393,7 +393,7 @@ pub mod tests {

let levels = vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![
generate_table(3, 1, 0, 50, 1),
generate_table(4, 1, 150, 200, 1),
Expand Down Expand Up @@ -457,7 +457,7 @@ pub mod tests {
let mut picker = create_compaction_picker_for_test();
let levels = vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![],
total_file_size: 0,
sub_level_id: 0,
Expand Down Expand Up @@ -522,7 +522,7 @@ pub mod tests {
let mut levels = Levels {
levels: vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![
generate_table(1, 1, 100, 399, 2),
generate_table(2, 1, 400, 699, 2),
Expand Down Expand Up @@ -570,7 +570,7 @@ pub mod tests {
]);
// We can set level_type only because the input above is valid.
for s in &mut l0.sub_levels {
s.level_type = LevelType::Nonoverlapping as i32;
s.level_type = LevelType::Nonoverlapping;
}
let levels = Levels {
l0: Some(l0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_hummock_sdk::version::Levels;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
Expand Down Expand Up @@ -55,22 +55,20 @@ impl EmergencyCompactionPicker {
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.filter(|level| level.level_type == LevelType::Overlapping)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition cause the number of l0 levels increase and the origin strategy
Expand Down
Loading
Loading