Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): replace hummock protobuf strcut with rust struct #15386

Merged
merged 28 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f30ff7
feat(storage): replace pb with struct
Li0k Mar 1, 2024
163ff20
fix(storage): fix check
Li0k Mar 4, 2024
e866deb
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
940b233
refactor(storage): refactor and reduce copy
Li0k Mar 4, 2024
9a8ebbc
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
afd7469
fix(storage): address comments
Li0k Mar 8, 2024
9f9707e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 8, 2024
3bf2bf3
fix(storage): fix compile
Li0k Mar 8, 2024
4f72cba
fix(storage): fix check
Li0k Mar 8, 2024
0a75e90
fix(storage): fix check
Li0k Mar 8, 2024
5672d61
refactor(storage): refactor version
Li0k May 7, 2024
1758269
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
480fc62
fix(storage): fix compile
Li0k May 9, 2024
5fff8ae
fix(storage): fix check
Li0k May 9, 2024
b912567
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 9, 2024
ebcc00b
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 11, 2024
c5d4b7f
fix(storage): address comments
Li0k Jul 11, 2024
7a1fbb1
fix(storage): address check
Li0k Jul 12, 2024
705f7b0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
bf1addf
fix(storage): address comments
Li0k Jul 23, 2024
a22660b
fix(storage): fix compile
Li0k Jul 23, 2024
ee43cd3
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
75d72d4
fix(storage): fix sync_point and bench code
Li0k Jul 23, 2024
2de6a48
fix(storage): reorg and address comments
Li0k Jul 25, 2024
7eae479
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
6018f58
fix(storage): fix compile
Li0k Jul 25, 2024
ffc5ac6
fix(storage): fix test compile
Li0k Jul 25, 2024
66011da
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub async fn list_version(
let l0 = levels.l0.as_mut().unwrap();
for sub_level in &mut l0.sub_levels {
for t in &mut sub_level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
}

// l1 ~ lmax
for level in &mut levels.levels {
for t in &mut level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
});
Expand All @@ -63,18 +63,18 @@ pub async fn list_version(
println!(
"sub_level_id {} type {} sst_num {} size {}",
sub_level.sub_level_id,
sub_level.level_type().as_str_name(),
sub_level.level_type.as_str_name(),
sub_level.table_infos.len(),
sub_level.total_file_size
)
}
}

for level in levels.get_levels() {
for level in &levels.levels {
println!(
"level_idx {} type {} sst_num {} size {}",
level.level_idx,
level.level_type().as_str_name(),
level.level_type.as_str_name(),
level.table_infos.len(),
level.total_file_size
)
Expand Down
10 changes: 5 additions & 5 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::version::{Level, SstableInfo};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
Expand Down Expand Up @@ -83,11 +83,11 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
if let Some(object_id) = &args.object_id {
if *object_id == sstable_info.get_object_id() {
if *object_id == sstable_info.object_id {
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand All @@ -100,7 +100,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}

fn print_level(level: &Level, sst_info: &SstableInfo) {
println!("Level Type: {}", level.level_type);
println!("Level Type: {}", level.level_type.as_str_name());
println!("Level Idx: {}", level.level_idx);
if level.level_idx == 0 {
println!("L0 Sub-Level Idx: {}", level.sub_level_id);
Expand Down
7 changes: 3 additions & 4 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta, SstableInfo};
use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
use risingwave_object_store::object::ObjectStoreRef;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::{HummockVersionArchive, SstableInfo};
use risingwave_pb::hummock::HummockVersionArchive;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
Expand Down Expand Up @@ -109,8 +109,7 @@ async fn print_user_key_in_version(
.chain(cg.levels.iter())
{
for sstable_info in &level.table_infos {
use risingwave_hummock_sdk::key_range::KeyRange;
let key_range: KeyRange = sstable_info.key_range.as_ref().unwrap().into();
let key_range = &sstable_info.key_range;
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn remove_key_range_from_version(mut version: HummockVersion) -> HummockVersion
.chain(cg.l0.as_mut().unwrap().sub_levels.iter_mut())
{
for sst in &mut level.table_infos {
sst.key_range.take();
sst.remove_key_range();
}
}
}
Expand All @@ -107,7 +107,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
version_id: version.id as _,
max_committed_epoch: version.max_committed_epoch as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg).into(),
compaction_group: json!(cg.to_protobuf()).into(),
})
.collect()
}
Expand All @@ -117,16 +117,16 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
for cg in version.levels.into_values() {
for level in cg.levels.into_iter().chain(cg.l0.unwrap().sub_levels) {
for sst in level.table_infos {
let key_range = sst.key_range.unwrap();
let key_range = sst.key_range;
sstables.push(RwHummockSstable {
sstable_id: sst.sst_id as _,
object_id: sst.object_id as _,
compaction_group_id: cg.group_id as _,
level_id: level.level_idx as _,
sub_level_id: (level.level_idx == 0).then_some(level.sub_level_id as _),
level_type: level.level_type as _,
key_range_left: key_range.left,
key_range_right: key_range.right,
key_range_left: key_range.left.to_vec(),
key_range_right: key_range.right.to_vec(),
right_exclusive: key_range.right_exclusive,
file_size: sst.file_size as _,
meta_offset: sst.meta_offset as _,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
Expand Down Expand Up @@ -41,7 +44,12 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
max_committed_epoch: d.max_committed_epoch as _,
safe_epoch: d.visible_table_safe_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d.group_deltas).into(),
group_deltas: json!(d
.group_deltas
.into_iter()
.map(|(group_id, deltas)| (group_id, PbGroupDeltas::from(deltas)))
.collect::<HashMap<u64, PbGroupDeltas>>())
.into(),
})
.collect();
Ok(rows)
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/hummock_sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// use risingwave_hummock_sdk::version::SstableInfo;
Li0k marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_pb::hummock::PbSstableInfo;
use sea_orm::entity::prelude::*;
use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter};
Expand All @@ -24,12 +25,12 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub sst_id: HummockSstableObjectId,
pub object_id: HummockSstableObjectId,
pub sstable_info: SstableInfo,
pub sstable_info: SstableInfoV2Backend,
}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

crate::derive_from_blob!(SstableInfo, PbSstableInfo);
crate::derive_from_blob!(SstableInfoV2Backend, PbSstableInfo);
2 changes: 1 addition & 1 deletion src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async-trait = "0.1"
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = { workspace = true }
prost ={ workspace = true }
prost = { workspace = true }
rand = { workspace = true }
regex = "1"
risingwave_common = { workspace = true }
Expand Down
29 changes: 19 additions & 10 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use compact_task::PbTaskStatus;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
Expand Down Expand Up @@ -85,7 +87,7 @@ impl HummockManagerService for HummockServiceImpl {
let current_version = self.hummock_manager.get_current_version().await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
current_version: Some(current_version.to_protobuf()),
current_version: Some(current_version.into()),
}))
}

Expand All @@ -101,7 +103,7 @@ impl HummockManagerService for HummockServiceImpl {
))
.await?;
Ok(Response::new(ReplayVersionDeltaResponse {
version: Some(version.to_protobuf()),
version: Some(version.into()),
modified_compaction_groups: compaction_groups,
}))
}
Expand All @@ -123,7 +125,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<DisableCommitEpochResponse>, Status> {
let version = self.hummock_manager.disable_commit_epoch().await;
Ok(Response::new(DisableCommitEpochResponse {
current_version: Some(version.to_protobuf()),
current_version: Some(version.into()),
}))
}

Expand All @@ -139,8 +141,8 @@ impl HummockManagerService for HummockServiceImpl {
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
version_deltas: version_deltas
.iter()
.map(HummockVersionDelta::to_protobuf)
.into_iter()
.map(HummockVersionDelta::into)
.collect(),
}),
};
Expand Down Expand Up @@ -234,8 +236,12 @@ impl HummockManagerService for HummockServiceImpl {

// rewrite the key_range
match request.key_range {
Some(key_range) => {
option.key_range = key_range;
Some(pb_key_range) => {
option.key_range = KeyRange {
left: pb_key_range.left.into(),
right: pb_key_range.right.into(),
right_exclusive: pb_key_range.right_exclusive,
};
}

None => {
Expand Down Expand Up @@ -426,7 +432,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version = self.hummock_manager.pin_version(req.context_id).await?;
Ok(Response::new(PinVersionResponse {
pinned_version: Some(version.to_protobuf()),
pinned_version: Some(version.into()),
}))
}

Expand Down Expand Up @@ -464,7 +470,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
checkpoint_version: Some(checkpoint_version.to_protobuf()),
checkpoint_version: Some(checkpoint_version.into()),
}))
}

Expand Down Expand Up @@ -660,7 +666,10 @@ impl HummockManagerService for HummockServiceImpl {
let request = request.into_inner();
let ret = self
.hummock_manager
.cancel_compact_task(request.task_id, request.task_status())
.cancel_compact_task(
request.task_id,
PbTaskStatus::try_from(request.task_status).unwrap(),
)
.await?;

let response = Response::new(CancelCompactTaskResponse { ret });
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl NotificationServiceImpl {

Ok(MetaSnapshot {
tables,
hummock_version: Some(hummock_version.to_protobuf()),
hummock_version: Some(hummock_version.into()),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn restore_hummock_version(
);
let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
let checkpoint = PbHummockVersionCheckpoint {
version: Some(hummock_version.to_protobuf()),
version: Some(hummock_version.into()),
// Ignore stale objects. Full GC will clear them.
stale_objects: Default::default(),
};
Expand Down
11 changes: 4 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,15 +1160,15 @@ fn collect_commit_epoch_info(
for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
sst_to_worker.insert(sst_info.object_id, resp.worker_id);
LocalSstableInfo::new(
sst_info,
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
)
});
synced_ssts.extend(ssts_iter);
table_watermarks.push(resp.table_watermarks);
old_value_ssts.extend(resp.old_value_sstables);
old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
}
let new_table_fragment_info =
if let Command::CreateStreamingJob { info, .. } = &command_ctx.command {
Expand Down Expand Up @@ -1215,10 +1215,7 @@ fn collect_commit_epoch_info(
watermarks
.into_iter()
.map(|(table_id, watermarks)| {
(
TableId::new(table_id),
TableWatermarks::from_protobuf(&watermarks),
)
(TableId::new(table_id), TableWatermarks::from(&watermarks))
})
.collect()
})
Expand Down
12 changes: 5 additions & 7 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::version::{CompactTask, Levels};
use risingwave_pb::hummock::compact_task::{self, TaskType};

mod picker;
Expand All @@ -30,8 +31,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
use risingwave_pb::hummock::{CompactionConfig, LevelType};
pub use selector::{CompactionSelector, CompactionSelectorContext};

use self::selector::{EmergencySelector, LocalSelectorStatistic};
Expand Down Expand Up @@ -145,12 +145,11 @@ impl CompactStatus {
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
if task.task_type() != TaskType::Dynamic && task.task_type() != TaskType::Emergency {
if task.task_type != TaskType::Dynamic && task.task_type != TaskType::Emergency {
return false;
}

if task.input_ssts.len() != 2
|| task.input_ssts[0].level_type() != LevelType::Nonoverlapping
if task.input_ssts.len() != 2 || task.input_ssts[0].level_type != LevelType::Nonoverlapping
{
return false;
}
Expand All @@ -173,7 +172,7 @@ impl CompactStatus {

pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type() == TaskType::VnodeWatermark {
if task.task_type == TaskType::VnodeWatermark {
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
Expand All @@ -186,7 +185,6 @@ impl CompactStatus {
})
}

/// Declares a task as either succeeded, failed or canceled.
pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
Expand Down
Loading
Loading