diff --git a/Cargo.lock b/Cargo.lock index 0c8b66655c0ec..624c7ff361eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8714,6 +8714,7 @@ dependencies = [ "risingwave_connector", "risingwave_expr", "risingwave_expr_impl", + "risingwave_hummock_sdk", "risingwave_pb", "risingwave_rpc_client", "risingwave_source", @@ -8744,6 +8745,7 @@ dependencies = [ "hex", "itertools 0.12.0", "parse-display", + "prost 0.12.1", "risingwave_common", "risingwave_pb", "tracing", @@ -8945,6 +8947,7 @@ dependencies = [ name = "risingwave_meta_model_v2" version = "1.5.0-alpha" dependencies = [ + "risingwave_hummock_sdk", "risingwave_pb", "sea-orm", "serde", @@ -8996,6 +8999,7 @@ dependencies = [ "regex", "risingwave_common", "risingwave_connector", + "risingwave_hummock_sdk", "risingwave_meta", "risingwave_meta_model_v2", "risingwave_pb", diff --git a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs index 7e56078e77ed2..02a85a94953c0 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs @@ -25,6 +25,6 @@ pub async fn list_version_deltas( let resp = meta_client .list_version_deltas(start_id, num_epochs, HummockEpoch::MAX) .await?; - println!("{:#?}", resp.version_deltas); + println!("{:#?}", resp); Ok(()) } diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs index d599ce2327861..bc63144547c9a 100644 --- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs +++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::HummockEpoch; use crate::CtlContext; @@ -51,11 +51,13 @@ pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<( /// added/removed for what reason (row deletion/compaction/etc.). pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - let mut base_version = meta_client - .risectl_get_checkpoint_hummock_version() - .await? - .checkpoint_version - .unwrap(); + let mut base_version = HummockVersion::from_rpc_protobuf( + &meta_client + .risectl_get_checkpoint_hummock_version() + .await? + .checkpoint_version + .unwrap(), + ); println!("replay starts"); println!("base version {}", base_version.id); let delta_fetch_size = 100; @@ -65,10 +67,10 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> { .list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX) .await .unwrap(); - if deltas.version_deltas.is_empty() { + if deltas.is_empty() { break; } - for delta in deltas.version_deltas { + for delta in deltas { if delta.prev_id != base_version.id { eprintln!("missing delta log for version {}", base_version.id); break; diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 7ed529ec02834..3843dd65f4ad5 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -28,7 +28,6 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer}; use risingwave_frontend::TableCatalog; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl}; @@ -75,7 +74,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result let hummock = context .hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?) .await?; - let version = hummock.inner().get_pinned_version().version(); + let version = hummock.inner().get_pinned_version().version().clone(); let sstable_store = hummock.sstable_store(); for level in version.get_combined_levels() { for sstable_info in &level.table_infos { diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 73df1156cd035..e279ccb38ba69 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -56,6 +56,7 @@ risingwave_common = { workspace = true } risingwave_common_service = { workspace = true } risingwave_connector = { workspace = true } risingwave_expr = { workspace = true } +risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs index 97269341d59f3..0c62a61cab738 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; use risingwave_common::error::Result; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, ScalarImpl}; -use risingwave_pb::hummock::HummockVersion; +use risingwave_hummock_sdk::version::HummockVersion; use serde_json::json; use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ea382a03e2c5a..c7753dd20e01a 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::Table; use risingwave_pb::common::WorkerNode; @@ -22,7 +23,7 @@ use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo, - HummockSnapshot, HummockVersion, HummockVersionDelta, + HummockSnapshot, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -223,15 +224,12 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0 .risectl_get_checkpoint_hummock_version() .await - .map(|v| v.checkpoint_version.unwrap()) + .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap())) } async fn list_version_deltas(&self) -> Result> { // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks. - self.0 - .list_version_deltas(0, u32::MAX, u64::MAX) - .await - .map(|v| v.version_deltas) + self.0.list_version_deltas(0, u32::MAX, u64::MAX).await } async fn list_branched_objects(&self) -> Result> { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 2bfd561f14a0f..819542c431a38 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -31,6 +31,7 @@ use risingwave_common::catalog::{ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ @@ -45,7 +46,7 @@ use risingwave_pb::ddl_service::{ use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo, - HummockSnapshot, HummockVersion, HummockVersionDelta, + HummockSnapshot, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index d7fa7533c4738..e0771ef973084 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -24,6 +24,7 @@ use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::java_binding::key_range::Bound; @@ -84,7 +85,10 @@ impl HummockJavaBindingIterator { let mut streams = Vec::with_capacity(read_plan.vnode_ids.len()); let key_range = read_plan.key_range.unwrap(); - let pin_version = PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0); + let pin_version = PinnedVersion::new( + HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()), + unbounded_channel().0, + ); let table_id = read_plan.table_id.into(); for vnode in read_plan.vnode_ids { diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml index 1d9992da8a832..f080645fc1c6a 100644 --- a/src/meta/model_v2/Cargo.toml +++ b/src/meta/model_v2/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } sea-orm = { version = "0.12.0", features = [ "sqlx-mysql", diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs index f59f0c6f95455..ee7f5120b28a7 100644 --- a/src/meta/model_v2/src/hummock_version_delta.rs +++ b/src/meta/model_v2/src/hummock_version_delta.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::hummock::HummockVersionDelta; +use risingwave_pb::hummock::PbHummockVersionDelta; use sea_orm::entity::prelude::*; use sea_orm::FromJsonQueryResult; use serde::{Deserialize, Serialize}; @@ -36,9 +36,9 @@ pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} -crate::derive_from_json_struct!(FullVersionDelta, HummockVersionDelta); +crate::derive_from_json_struct!(FullVersionDelta, PbHummockVersionDelta); -impl From for HummockVersionDelta { +impl From for PbHummockVersionDelta { fn from(value: Model) -> Self { let ret = value.full_version_delta.into_inner(); assert_eq!(value.id, ret.id as i64); diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index b734b62106495..c9b6619565cd7 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -23,6 +23,7 @@ rand = "0.8" regex = "1" risingwave_common = { workspace = true } risingwave_connector = { workspace = true } +risingwave_hummock_sdk = { workspace = true } risingwave_meta = { workspace = true } risingwave_meta_model_v2 = { workspace = true } risingwave_pb = { workspace = true } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 835032769266f..f5a7e5b7353b1 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -18,10 +18,10 @@ use std::time::Duration; use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID}; +use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService; use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent; -use risingwave_pb::hummock::version_update_payload::Payload; use risingwave_pb::hummock::*; use tonic::{Request, Response, Status, Streaming}; @@ -29,6 +29,7 @@ use crate::hummock::compaction::selector::ManualCompactionOption; use crate::hummock::{HummockManagerRef, VacuumManagerRef}; use crate::manager::FragmentManagerRef; use crate::RwReceiverStream; + pub struct HummockServiceImpl { hummock_manager: HummockManagerRef, vacuum_manager: VacuumManagerRef, @@ -83,7 +84,7 @@ impl HummockManagerService for HummockServiceImpl { let current_version = self.hummock_manager.get_current_version().await; Ok(Response::new(GetCurrentVersionResponse { status: None, - current_version: Some(current_version), + current_version: Some(current_version.to_protobuf()), })) } @@ -94,10 +95,12 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let (version, compaction_groups) = self .hummock_manager - .replay_version_delta(req.version_delta.unwrap()) + .replay_version_delta(HummockVersionDelta::from_rpc_protobuf( + &req.version_delta.unwrap(), + )) .await?; Ok(Response::new(ReplayVersionDeltaResponse { - version: Some(version), + version: Some(version.to_protobuf()), modified_compaction_groups: compaction_groups, })) } @@ -119,7 +122,7 @@ impl HummockManagerService for HummockServiceImpl { ) -> Result, Status> { let version = self.hummock_manager.disable_commit_epoch().await; Ok(Response::new(DisableCommitEpochResponse { - current_version: Some(version), + current_version: Some(version.to_protobuf()), })) } @@ -133,7 +136,12 @@ impl HummockManagerService for HummockServiceImpl { .list_version_deltas(req.start_id, req.num_limit, req.committed_epoch_limit) .await?; let resp = ListVersionDeltasResponse { - version_deltas: Some(version_deltas), + version_deltas: Some(PbHummockVersionDeltas { + version_deltas: version_deltas + .iter() + .map(HummockVersionDelta::to_protobuf) + .collect(), + }), }; Ok(Response::new(resp)) } @@ -415,15 +423,10 @@ impl HummockManagerService for HummockServiceImpl { request: Request, ) -> Result, Status> { let req = request.into_inner(); - let payload = self.hummock_manager.pin_version(req.context_id).await?; - match payload { - Payload::PinnedVersion(version) => Ok(Response::new(PinVersionResponse { - pinned_version: Some(version), - })), - Payload::VersionDeltas(_) => { - unreachable!("pin_version should not return version delta") - } - } + let version = self.hummock_manager.pin_version(req.context_id).await?; + Ok(Response::new(PinVersionResponse { + pinned_version: Some(version.to_protobuf()), + })) } async fn split_compaction_group( @@ -460,7 +463,7 @@ impl HummockManagerService for HummockServiceImpl { ) -> Result, Status> { let checkpoint_version = self.hummock_manager.get_checkpoint_version().await; Ok(Response::new(RiseCtlGetCheckpointVersionResponse { - checkpoint_version: Some(checkpoint_version), + checkpoint_version: Some(checkpoint_version.to_protobuf()), })) } diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index b7b63dcc6c164..1bd0be6551604 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -183,7 +183,7 @@ impl NotificationServiceImpl { MetaSnapshot { tables, - hummock_version: Some(hummock_version), + hummock_version: Some(hummock_version.to_protobuf()), version: Some(SnapshotVersion { catalog_version, ..Default::default() diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 0d3f37155a047..9d4294579d0dc 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -19,11 +19,11 @@ use anyhow::anyhow; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1}; use risingwave_backup::MetaSnapshotId; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::catalog::{ Connection, Database, Function, Index, Schema, Sink, Source, Table, View, }; -use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta, HummockVersionStats}; +use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::SystemParams; use risingwave_pb::user::UserInfo; @@ -46,10 +46,10 @@ impl MetaSnapshotV1Builder { } } - pub async fn build>( + pub async fn build( &mut self, id: MetaSnapshotId, - hummock_version_builder: D, + hummock_version_builder: impl Future, ) -> BackupResult<()> { self.snapshot.format_version = VERSION; self.snapshot.id = id; @@ -169,7 +169,8 @@ mod tests { use risingwave_backup::error::BackupError; use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1; use risingwave_common::system_param::system_params_for_test; - use risingwave_pb::hummock::{HummockVersion, HummockVersionStats}; + use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_pb::hummock::HummockVersionStats; use crate::backup_restore::meta_snapshot_builder; use crate::manager::model::SystemParamsModel; diff --git a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs index 644b581f03e30..3cb4a39b799c8 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs @@ -20,9 +20,9 @@ use itertools::Itertools; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2}; use risingwave_backup::MetaSnapshotId; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_meta_model_v2 as model_v2; -use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta}; +use risingwave_pb::hummock::PbHummockVersionDelta; use sea_orm::{EntityTrait, QueryOrder, TransactionTrait}; use crate::controller::SqlMetaStore; @@ -42,10 +42,10 @@ impl MetaSnapshotV2Builder { } } - pub async fn build>( + pub async fn build( &mut self, id: MetaSnapshotId, - hummock_version_builder: D, + hummock_version_builder: impl Future, ) -> BackupResult<()> { self.snapshot.format_version = VERSION; self.snapshot.id = id; @@ -68,7 +68,8 @@ impl MetaSnapshotV2Builder { .await .map_err(|e| BackupError::MetaStorage(e.into()))? .into_iter() - .map_into::(); + .map_into::() + .map(|pb_delta| HummockVersionDelta::from_persisted_protobuf(&pb_delta)); let hummock_version = { let mut redo_state = hummock_version; let mut max_log_id = None; diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 3e2d36eb86985..877f5d1a454a9 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -19,10 +19,11 @@ use risingwave_backup::meta_snapshot::Metadata; use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; use risingwave_backup::MetaSnapshotId; use risingwave_common::config::{MetaBackend, ObjectStoreConfig}; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::version_checkpoint_path; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint}; +use risingwave_pb::hummock::PbHummockVersionCheckpoint; use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1}; use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2}; @@ -83,8 +84,8 @@ async fn restore_hummock_version( .await, ); let checkpoint_path = version_checkpoint_path(hummock_storage_directory); - let checkpoint = HummockVersionCheckpoint { - version: Some(hummock_version.clone()), + let checkpoint = PbHummockVersionCheckpoint { + version: Some(hummock_version.to_protobuf()), // Ignore stale objects. Full GC will clear them. stale_objects: Default::default(), }; @@ -206,7 +207,8 @@ mod tests { use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1}; use risingwave_backup::storage::MetaSnapshotStorage; use risingwave_common::config::{MetaBackend, SystemConfig}; - use risingwave_pb::hummock::{HummockVersion, HummockVersionStats}; + use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::SystemParams; use crate::backup_restore::restore::restore_impl; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 334a2394de4fb..571caaa3dd6de 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -28,7 +28,9 @@ use risingwave_common::bail; use risingwave_common::catalog::TableId; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::util::tracing::TracingContext; -use risingwave_hummock_sdk::table_watermark::merge_multiple_new_table_watermarks; +use risingwave_hummock_sdk::table_watermark::{ + merge_multiple_new_table_watermarks, TableWatermarks, +}; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::catalog::table::TableType; use risingwave_pb::ddl_service::DdlProgress; @@ -1289,7 +1291,22 @@ fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpo } CommitEpochInfo::new( synced_ssts, - merge_multiple_new_table_watermarks(resps.iter().map(|resp| resp.table_watermarks.clone())), + merge_multiple_new_table_watermarks( + resps + .iter() + .map(|resp| { + resp.table_watermarks + .iter() + .map(|(table_id, watermarks)| { + ( + TableId::new(*table_id), + TableWatermarks::from_protobuf(watermarks), + ) + }) + .collect() + }) + .collect_vec(), + ), sst_to_worker, ) } diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 6aa64292b9db1..523c0d35f9cd5 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; @@ -20,8 +21,10 @@ use function_name::named; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, summarize_group_deltas, }; -use risingwave_pb::hummock::hummock_version_checkpoint::StaleObjects; -use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint}; +use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; +use risingwave_pb::hummock::PbHummockVersionCheckpoint; use crate::hummock::error::Result; use crate::hummock::manager::{read_lock, write_lock}; @@ -31,6 +34,32 @@ use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY}; const HUMMOCK_INIT_FLAG_KEY: &[u8] = b"hummock_init_flag"; +#[derive(Default)] +pub struct HummockVersionCheckpoint { + pub version: HummockVersion, + pub stale_objects: HashMap, +} + +impl HummockVersionCheckpoint { + pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self { + Self { + version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()), + stale_objects: checkpoint + .stale_objects + .iter() + .map(|(object_id, objects)| (*object_id as HummockSstableObjectId, objects.clone())) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint { + PbHummockVersionCheckpoint { + version: Some(self.version.to_protobuf()), + stale_objects: self.stale_objects.clone(), + } + } +} + /// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale /// objects from those delta logs. impl HummockManager { @@ -54,8 +83,8 @@ impl HummockManager { return Err(e.into()); } }; - let ckpt = HummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?; - Ok(ckpt) + let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?; + Ok(HummockVersionCheckpoint::from_protobuf(&ckpt)) } pub(super) async fn write_checkpoint( @@ -63,7 +92,7 @@ impl HummockManager { checkpoint: &HummockVersionCheckpoint, ) -> Result<()> { use prost::Message; - let buf = checkpoint.encode_to_vec(); + let buf = checkpoint.to_protobuf().encode_to_vec(); self.object_store .upload(&self.version_checkpoint_path, buf.into()) .await?; @@ -83,13 +112,13 @@ impl HummockManager { let current_version = &versioning.current_version; let old_checkpoint = &versioning.checkpoint; let new_checkpoint_id = current_version.id; - let old_checkpoint_id = old_checkpoint.version.as_ref().unwrap().id; + let old_checkpoint_id = old_checkpoint.version.id; if new_checkpoint_id < old_checkpoint_id + min_delta_log_num { return Ok(0); } let mut stale_objects = old_checkpoint.stale_objects.clone(); // `object_sizes` is used to calculate size of stale objects. - let mut object_sizes = object_size_map(old_checkpoint.version.as_ref().unwrap()); + let mut object_sizes = object_size_map(&old_checkpoint.version); for (_, version_delta) in versioning .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) @@ -120,7 +149,7 @@ impl HummockManager { ); } let new_checkpoint = HummockVersionCheckpoint { - version: Some(current_version.clone()), + version: current_version.clone(), stale_objects, }; drop(versioning_guard); @@ -129,11 +158,7 @@ impl HummockManager { // 3. hold write lock and update in memory state let mut versioning_guard = write_lock!(self, versioning).await; let versioning = versioning_guard.deref_mut(); - assert!( - versioning.checkpoint.version.is_none() - || new_checkpoint.version.as_ref().unwrap().id - >= versioning.checkpoint.version.as_ref().unwrap().id - ); + assert!(new_checkpoint.version.id >= versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; versioning.mark_objects_for_deletion(); @@ -191,11 +216,6 @@ impl HummockManager { #[named] pub async fn get_checkpoint_version(&self) -> HummockVersion { let versioning_guard = read_lock!(self, versioning).await; - versioning_guard - .checkpoint - .version - .as_ref() - .unwrap() - .clone() + versioning_guard.checkpoint.version.clone() } } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 2389fe1538dc2..515723fcaddea 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use function_name::named; use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, get_compaction_group_ssts, - get_member_table_ids, try_get_compaction_group_id_by_table_id, HummockVersionExt, - HummockVersionUpdateExt, TableGroupInfo, + get_member_table_ids, try_get_compaction_group_id_by_table_id, TableGroupInfo, }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::CompactionGroupId; @@ -44,7 +44,7 @@ use crate::hummock::error::{Error, Result}; use crate::hummock::manager::{drop_sst, read_lock, HummockManager}; use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; -use crate::manager::{IdCategory, MetaSrvEnv, TableId}; +use crate::manager::{IdCategory, MetaSrvEnv}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapTransaction, MetadataModel, TableFragments, ValTransaction, }; @@ -306,7 +306,9 @@ impl HummockManager { .len() as u64 - 1, ); - new_version_delta.removed_table_ids.push(*table_id); + new_version_delta + .removed_table_ids + .push(TableId::new(*table_id)); } // Remove empty group, GC SSTs and remove metric. @@ -462,7 +464,7 @@ impl HummockManager { table_ids: &[StateTableId], target_group_id: Option, partition_vnode_count: u32, - ) -> Result<(CompactionGroupId, BTreeMap)> { + ) -> Result<(CompactionGroupId, BTreeMap)> { let mut table_to_partition = BTreeMap::default(); if table_ids.is_empty() { return Ok((parent_group_id, table_to_partition)); diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index b069a31ce5bd3..f1b086720049a 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -18,10 +18,11 @@ use std::ops::DerefMut; use fail::fail_point; use function_name::named; use itertools::Itertools; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId, }; -use risingwave_pb::hummock::{HummockVersion, ValidationTask}; +use risingwave_pb::hummock::ValidationTask; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::{ diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 837564d6e6477..bfa396729aabb 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -20,7 +20,6 @@ use std::time::Duration; use function_name::named; use futures::{stream, StreamExt}; use itertools::Itertools; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::WorkerType; @@ -72,7 +71,7 @@ impl HummockManager { let versioning = versioning_guard.deref_mut(); let deltas_to_delete = versioning .hummock_version_deltas - .range(..=versioning.checkpoint.version.as_ref().unwrap().id) + .range(..=versioning.checkpoint.version.id) .map(|(k, _)| *k) .collect_vec(); // If there is any safe point, skip this to ensure meta backup has required delta logs to @@ -116,7 +115,7 @@ impl HummockManager { let mut tracked_object_ids = HashSet::from_iter(versioning_guard.current_version.get_object_ids()); for delta in versioning_guard.hummock_version_deltas.values() { - tracked_object_ids.extend(delta.get_gc_object_ids()); + tracked_object_ids.extend(delta.gc_object_ids.iter().cloned()); } tracked_object_ids }; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 20cb4a4765dc1..db4c1c5d50b87 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -37,8 +37,9 @@ use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, get_table_compaction_group_id_mapping, try_get_compaction_group_id_by_table_id, - BranchedSstInfo, HummockLevelsExt, HummockVersionExt, HummockVersionUpdateExt, + BranchedSstInfo, HummockLevelsExt, }; +use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{ version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, HummockCompactionTaskId, HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId, @@ -54,10 +55,9 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{ Event as ResponseEvent, PullTaskAck, }; use risingwave_pb::hummock::{ - version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion, - HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats, - IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableWatermarks, + CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, + HummockPinnedVersion, HummockSnapshot, HummockVersionStats, IntraLevelDelta, + PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -99,7 +99,7 @@ mod tests; mod versioning; pub use versioning::HummockVersionSafePoint; use versioning::*; -mod checkpoint; +pub(crate) mod checkpoint; mod compaction; mod worker; @@ -211,8 +211,6 @@ use risingwave_hummock_sdk::table_stats::{ use risingwave_object_store::object::{build_remote_object_store, ObjectError, ObjectStoreRef}; use risingwave_pb::catalog::Table; use risingwave_pb::hummock::level_handler::RunningCompactTask; -use risingwave_pb::hummock::version_update_payload::Payload; -use risingwave_pb::hummock::PbCompactionGroupInfo; use risingwave_pb::meta::relation::RelationInfo; /// Acquire write lock of the lock with `lock_name`. @@ -268,14 +266,14 @@ pub enum CompactionResumeTrigger { pub struct CommitEpochInfo { pub sstables: Vec, - pub new_table_watermarks: HashMap, + pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, } impl CommitEpochInfo { pub fn new( sstables: Vec, - new_table_watermarks: HashMap, + new_table_watermarks: HashMap, sst_to_context: HashMap, ) -> Self { Self { @@ -502,7 +500,7 @@ impl HummockManager { .insert(self.env.meta_store()) .await?; versioning_guard.checkpoint = HummockVersionCheckpoint { - version: Some(checkpoint_version.clone()), + version: checkpoint_version.clone(), stale_objects: Default::default(), }; self.write_checkpoint(&versioning_guard.checkpoint).await?; @@ -511,12 +509,7 @@ impl HummockManager { } else { // Read checkpoint from object store. versioning_guard.checkpoint = self.read_checkpoint().await?; - versioning_guard - .checkpoint - .version - .as_ref() - .cloned() - .unwrap() + versioning_guard.checkpoint.version.clone() }; versioning_guard.version_stats = HummockVersionStats::list(self.env.meta_store()) .await? @@ -585,10 +578,7 @@ impl HummockManager { /// Pin the current greatest hummock version. The pin belongs to `context_id` /// and will be unpinned when `context_id` is invalidated. #[named] - pub async fn pin_version( - &self, - context_id: HummockContextId, - ) -> Result { + pub async fn pin_version(&self, context_id: HummockContextId) -> Result { let mut versioning_guard = write_lock!(self, versioning).await; let _timer = start_measure_real_process_timer!(self); let versioning = versioning_guard.deref_mut(); @@ -601,7 +591,7 @@ impl HummockManager { }, ); let version_id = versioning.current_version.id; - let ret = Payload::PinnedVersion(versioning.current_version.clone()); + let ret = versioning.current_version.clone(); if context_pinned_version.min_pinned_id == INVALID_VERSION_ID || context_pinned_version.min_pinned_id > version_id { @@ -1786,7 +1776,7 @@ impl HummockManager { start_id: u64, num_limit: u32, committed_epoch_limit: HummockEpoch, - ) -> Result { + ) -> Result> { let versioning = read_lock!(self, versioning).await; let version_deltas = versioning .hummock_version_deltas @@ -1796,7 +1786,7 @@ impl HummockManager { .take(num_limit as _) .cloned() .collect(); - Ok(HummockVersionDeltas { version_deltas }) + Ok(version_deltas) } pub async fn init_metadata_for_version_replay( @@ -2047,7 +2037,7 @@ impl HummockManager { .last_key_value() .unwrap() .1 - .clone()], + .to_protobuf()], }), ); } @@ -2990,7 +2980,7 @@ impl HummockManager { rewrite_cg_ids.push(*cg_id); } - if let Some(levels) = current_version.get_levels().get(cg_id) { + if let Some(levels) = current_version.levels.get(cg_id) { if levels.member_table_ids.len() == 1 { restore_cg_to_partition_vnode.insert( *cg_id, @@ -3225,10 +3215,13 @@ fn init_selectors() -> HashMap { - unreachable!("should get full version") - } - Payload::PinnedVersion(version) => version, - }; + let version = hummock_manager.pin_version(context_id_1).await.unwrap(); assert_eq!( - version.get_id(), + version.id, init_version_id + commit_log_count + register_log_count ); assert_eq!( @@ -553,14 +548,9 @@ async fn test_hummock_manager_basic() { for _ in 0..2 { // should pin latest because deltas cannot contain INVALID_EPOCH - let version = match hummock_manager.pin_version(context_id_2).await.unwrap() { - Payload::VersionDeltas(_) => { - unreachable!("should get full version") - } - Payload::PinnedVersion(version) => version, - }; + let version = hummock_manager.pin_version(context_id_2).await.unwrap(); assert_eq!( - version.get_id(), + version.id, init_version_id + commit_log_count + register_log_count ); // pinned by context_id_1 diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 6fd7f33bb63a4..13388288f2dcd 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -20,22 +20,23 @@ use itertools::Itertools; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_initial_compaction_group_levels, get_compaction_group_ids, BranchedSstInfo, - HummockVersionExt, }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, }; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersion, - HummockVersionCheckpoint, HummockVersionDelta, HummockVersionStats, SstableInfo, TableStats, + CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, + SstableInfo, TableStats, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use crate::hummock::error::Result; +use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; use crate::hummock::manager::{commit_multi_var, read_lock, write_lock}; use crate::hummock::metrics_utils::{trigger_safepoint_stat, trigger_write_stop_stats}; @@ -399,12 +400,12 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; + use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId}; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - HummockPinnedVersion, HummockVersion, HummockVersionStats, KeyRange, Level, - OverlappingLevel, SstableInfo, + HummockPinnedVersion, HummockVersionStats, KeyRange, Level, OverlappingLevel, SstableInfo, }; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index cf32bc1e00b61..484dc86a832a9 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -18,22 +18,22 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; -use prost::Message; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - object_size_map, BranchedSstInfo, HummockVersionExt, + object_size_map, BranchedSstInfo, }; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersion, - HummockVersionCheckpoint, HummockVersionStats, LevelType, + CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, LevelType, }; use super::compaction::get_compression_algorithm; use super::compaction::selector::DynamicLevelSelectorCore; +use crate::hummock::checkpoint::HummockVersionCheckpoint; use crate::hummock::compaction::CompactStatus; use crate::rpc::metrics::MetaMetrics; @@ -47,7 +47,7 @@ pub fn trigger_version_stat( .set(current_version.max_committed_epoch as i64); metrics .version_size - .set(current_version.encoded_len() as i64); + .set(current_version.estimated_encode_len() as i64); metrics.safe_epoch.set(current_version.safe_epoch as i64); metrics.current_version_id.set(current_version.id as i64); metrics.version_stats.reset(); @@ -368,7 +368,7 @@ pub fn trigger_gc_stat( checkpoint: &HummockVersionCheckpoint, min_pinned_version_id: HummockVersionId, ) { - let current_version_object_size_map = object_size_map(checkpoint.version.as_ref().unwrap()); + let current_version_object_size_map = object_size_map(&checkpoint.version); let current_version_object_size = current_version_object_size_map.values().sum::(); let current_version_object_count = current_version_object_size_map.len(); let mut old_version_object_size = 0; diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 9b0b66fc1e03f..282d47d4c16f1 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -22,7 +22,10 @@ use async_trait::async_trait; use fail::fail_point; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::table_watermark::TableWatermarks; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, @@ -32,8 +35,8 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ - compact_task, CompactTask, HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest, - SubscribeCompactionEventResponse, TableWatermarks, VacuumTask, + compact_task, CompactTask, HummockSnapshot, SubscribeCompactionEventRequest, + SubscribeCompactionEventResponse, VacuumTask, }; use risingwave_rpc_client::error::{Result, RpcError}; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient}; @@ -94,7 +97,7 @@ impl MockHummockMetaClient { &self, epoch: HummockEpoch, sstables: Vec, - new_table_watermarks: HashMap, + new_table_watermarks: HashMap, ) -> Result<()> { let sst_to_worker = sstables .iter() diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index f61c20cf95ad5..b10d0b83da2b6 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_meta_model_v2::compaction_config::CompactionConfig; use risingwave_meta_model_v2::compaction_status::LevelHandlers; use risingwave_meta_model_v2::compaction_task::CompactionTask; @@ -22,9 +23,7 @@ use risingwave_meta_model_v2::{ hummock_pinned_version, hummock_version_delta, CompactionGroupId, CompactionTaskId, HummockVersionId, WorkerId, }; -use risingwave_pb::hummock::{ - CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionDelta, -}; +use risingwave_pb::hummock::{CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion}; use sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; use sea_orm::EntityTrait; @@ -192,7 +191,7 @@ impl Transactional for HummockVersionDelta { max_committed_epoch: Set(self.max_committed_epoch as _), safe_epoch: Set(self.safe_epoch as _), trivial_move: Set(self.trivial_move), - full_version_delta: Set(FullVersionDelta(self.clone())), + full_version_delta: Set(FullVersionDelta(self.to_protobuf())), }; hummock_version_delta::Entity::insert(m) .on_conflict( diff --git a/src/meta/src/hummock/model/version_delta.rs b/src/meta/src/hummock/model/version_delta.rs index a7d97790ce51a..3103f428a45f8 100644 --- a/src/meta/src/hummock/model/version_delta.rs +++ b/src/meta/src/hummock/model/version_delta.rs @@ -13,8 +13,9 @@ // limitations under the License. use prost::Message; +use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::HummockVersionId; -use risingwave_pb::hummock::HummockVersionDelta; +use risingwave_pb::hummock::PbHummockVersionDelta; use crate::hummock::model::HUMMOCK_VERSION_DELTA_CF_NAME; use crate::model::{MetadataModel, MetadataModelResult}; @@ -22,22 +23,22 @@ use crate::model::{MetadataModel, MetadataModelResult}; /// `HummockVersionDelta` tracks delta of `Sstables` in given version based on previous version. impl MetadataModel for HummockVersionDelta { type KeyType = HummockVersionId; - type PbType = HummockVersionDelta; + type PbType = PbHummockVersionDelta; fn cf_name() -> String { String::from(HUMMOCK_VERSION_DELTA_CF_NAME) } fn to_protobuf(&self) -> Self::PbType { - self.clone() + self.to_protobuf() } fn to_protobuf_encoded_vec(&self) -> Vec { - self.encode_to_vec() + self.to_protobuf().encode_to_vec() } fn from_protobuf(prost: Self::PbType) -> Self { - prost + Self::from_persisted_protobuf(&prost) } fn key(&self) -> MetadataModelResult { diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 74d0698c64bea..679687e87ed96 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -18,15 +18,14 @@ use std::time::Duration; use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::key_with_epoch; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; #[cfg(test)] use risingwave_pb::hummock::compact_task::TaskStatus; -use risingwave_pb::hummock::{ - CompactionConfig, HummockSnapshot, HummockVersion, KeyRange, SstableInfo, -}; +use risingwave_pb::hummock::{CompactionConfig, HummockSnapshot, KeyRange, SstableInfo}; use risingwave_pb::meta::add_worker_node_request::Property; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 6707108974c6a..c0881813c729d 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -166,7 +166,6 @@ macro_rules! for_all_metadata_models { $macro! { // These items should be included in a meta snapshot. // So be sure to update meta backup/restore when adding new items. - { risingwave_pb::hummock::HummockVersion }, { risingwave_pb::hummock::HummockVersionStats }, { crate::hummock::model::CompactionGroup }, { risingwave_pb::catalog::Database }, @@ -184,7 +183,7 @@ macro_rules! for_all_metadata_models { { crate::model::cluster::Worker }, { risingwave_pb::hummock::CompactTaskAssignment }, { crate::hummock::compaction::CompactStatus }, - { risingwave_pb::hummock::HummockVersionDelta }, + { risingwave_hummock_sdk::version::HummockVersionDelta }, { risingwave_pb::hummock::HummockPinnedSnapshot }, { risingwave_pb::hummock::HummockPinnedVersion }, } diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index b02b432963ada..0244c708ab821 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -14,12 +14,12 @@ use async_trait::async_trait; use futures::stream::BoxStream; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, }; use risingwave_pb::hummock::{ - HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest, - SubscribeCompactionEventResponse, VacuumTask, + HummockSnapshot, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, }; use tokio::sync::mpsc::UnboundedSender; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index c2288dea07c0e..fa8ce12c0cf34 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -36,6 +36,7 @@ use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::compaction_group::StateTableId; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, @@ -946,10 +947,13 @@ impl MetaClient { version_delta: HummockVersionDelta, ) -> Result<(HummockVersion, Vec)> { let req = ReplayVersionDeltaRequest { - version_delta: Some(version_delta), + version_delta: Some(version_delta.to_protobuf()), }; let resp = self.inner.replay_version_delta(req).await?; - Ok((resp.version.unwrap(), resp.modified_compaction_groups)) + Ok(( + HummockVersion::from_rpc_protobuf(&resp.version.unwrap()), + resp.modified_compaction_groups, + )) } pub async fn list_version_deltas( @@ -957,7 +961,7 @@ impl MetaClient { start_id: u64, num_limit: u32, committed_epoch_limit: HummockEpoch, - ) -> Result { + ) -> Result> { let req = ListVersionDeltasRequest { start_id, num_limit, @@ -968,7 +972,11 @@ impl MetaClient { .list_version_deltas(req) .await? .version_deltas - .unwrap()) + .unwrap() + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect()) } pub async fn trigger_compaction_deterministic( @@ -986,12 +994,14 @@ impl MetaClient { pub async fn disable_commit_epoch(&self) -> Result { let req = DisableCommitEpochRequest {}; - Ok(self - .inner - .disable_commit_epoch(req) - .await? - .current_version - .unwrap()) + Ok(HummockVersion::from_rpc_protobuf( + &self + .inner + .disable_commit_epoch(req) + .await? + .current_version + .unwrap(), + )) } pub async fn pin_specific_snapshot(&self, epoch: HummockEpoch) -> Result { @@ -1284,12 +1294,14 @@ impl HummockMetaClient for MetaClient { async fn get_current_version(&self) -> Result { let req = GetCurrentVersionRequest::default(); - Ok(self - .inner - .get_current_version(req) - .await? - .current_version - .unwrap()) + Ok(HummockVersion::from_rpc_protobuf( + &self + .inner + .get_current_version(req) + .await? + .current_version + .unwrap(), + )) } async fn pin_snapshot(&self) -> Result { diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index f4b4b7e84b285..f094d16010c23 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -37,10 +37,9 @@ use std::collections::HashSet; use std::hash::Hasher; use itertools::Itertools; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId}; use risingwave_pb::backup_service::{PbMetaSnapshotManifest, PbMetaSnapshotMetadata}; -use risingwave_pb::hummock::HummockVersion; use serde::{Deserialize, Serialize}; use crate::error::{BackupError, BackupResult}; diff --git a/src/storage/backup/src/meta_snapshot.rs b/src/storage/backup/src/meta_snapshot.rs index c42fcc5d5851f..e7049fbe0ae41 100644 --- a/src/storage/backup/src/meta_snapshot.rs +++ b/src/storage/backup/src/meta_snapshot.rs @@ -15,7 +15,7 @@ use std::fmt::{Display, Formatter}; use bytes::{Buf, BufMut}; -use risingwave_pb::hummock::HummockVersion; +use risingwave_hummock_sdk::version::HummockVersion; use crate::error::BackupResult; use crate::{xxhash64_checksum, xxhash64_verify, MetaSnapshotId}; diff --git a/src/storage/backup/src/meta_snapshot_v1.rs b/src/storage/backup/src/meta_snapshot_v1.rs index 731107723c2f4..76a8a548eacd7 100644 --- a/src/storage/backup/src/meta_snapshot_v1.rs +++ b/src/storage/backup/src/meta_snapshot_v1.rs @@ -18,10 +18,11 @@ use std::fmt::{Display, Formatter}; use bytes::{Buf, BufMut}; use itertools::Itertools; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_pb::catalog::{ Connection, Database, Function, Index, Schema, Sink, Source, Table, View, }; -use risingwave_pb::hummock::{CompactionGroup, HummockVersion, HummockVersionStats}; +use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats}; use risingwave_pb::meta::{SystemParams, TableFragments}; use risingwave_pb::user::UserInfo; @@ -127,7 +128,7 @@ impl ClusterMetadata { let default_cf_values = self.default_cf.values().collect_vec(); Self::encode_prost_message_list(&default_cf_keys, buf); Self::encode_prost_message_list(&default_cf_values, buf); - Self::encode_prost_message(&self.hummock_version, buf); + Self::encode_prost_message(&self.hummock_version.to_protobuf(), buf); Self::encode_prost_message(&self.version_stats, buf); Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf); Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf); @@ -153,7 +154,8 @@ impl ClusterMetadata { .into_iter() .zip_eq_fast(default_cf_values.into_iter()) .collect(); - let hummock_version = Self::decode_prost_message(&mut buf)?; + let hummock_version = + HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?); let version_stats = Self::decode_prost_message(&mut buf)?; let compaction_groups: Vec = Self::decode_prost_message_list(&mut buf)?; let table_fragments: Vec = Self::decode_prost_message_list(&mut buf)?; diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index 092cfd083783c..57bb43e24295a 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -15,8 +15,8 @@ use std::fmt::{Display, Formatter}; use bytes::{Buf, BufMut}; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_meta_model_v2 as model_v2; -use risingwave_pb::hummock::HummockVersion; use serde::{Deserialize, Serialize}; use crate::meta_snapshot::{MetaSnapshot, Metadata}; @@ -51,7 +51,7 @@ impl Display for MetadataV2 { impl Metadata for MetadataV2 { fn encode_to(&self, buf: &mut Vec) -> BackupResult<()> { put_with_len_prefix(buf, &self.cluster_id)?; - put_with_len_prefix(buf, &self.hummock_version)?; + put_with_len_prefix(buf, &self.hummock_version.to_protobuf())?; put_with_len_prefix(buf, &self.version_stats)?; put_with_len_prefix(buf, &self.compaction_configs)?; // TODO: other metadata @@ -63,13 +63,13 @@ impl Metadata for MetadataV2 { Self: Sized, { let cluster_id = get_with_len_prefix(&mut buf)?; - let hummock_version = get_with_len_prefix(&mut buf)?; + let pb_hummock_version = get_with_len_prefix(&mut buf)?; let version_stats = get_with_len_prefix(&mut buf)?; let compaction_configs = get_with_len_prefix(&mut buf)?; // TODO: other metadata Ok(Self { cluster_id, - hummock_version, + hummock_version: HummockVersion::from_persisted_protobuf(&pb_hummock_version), version_stats, compaction_configs, }) diff --git a/src/storage/hummock_sdk/Cargo.toml b/src/storage/hummock_sdk/Cargo.toml index 10752894f4cc7..e3327bdf993cc 100644 --- a/src/storage/hummock_sdk/Cargo.toml +++ b/src/storage/hummock_sdk/Cargo.toml @@ -19,6 +19,7 @@ easy-ext = "1" hex = "0.4" itertools = "0.12" parse-display = "0.8" +prost = { workspace = true } risingwave_common = { workspace = true } risingwave_pb = { workspace = true } tracing = "0.1" diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 807748cc77d6a..033f8ab54e471 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -21,10 +21,11 @@ use risingwave_common::catalog::TableId; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; +use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange, - GroupTableChange, HummockVersion, HummockVersionDelta, Level, LevelType, OverlappingLevel, - PbLevelType, PbTableWatermarks, SstableInfo, + GroupTableChange, Level, LevelType, OverlappingLevel, PbLevelType, PbTableWatermarks, + SstableInfo, }; use tracing::warn; @@ -32,7 +33,8 @@ use super::StateTableId; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; -use crate::table_watermark::{PbTableWatermarksExt, TableWatermarks, TableWatermarksIndex}; +use crate::table_watermark::{TableWatermarks, TableWatermarksIndex, VnodeWatermark}; +use crate::version::{HummockVersion, HummockVersionDelta}; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; pub struct GroupDeltasSummary { @@ -121,7 +123,6 @@ pub struct SstDeltaInfo { pub type BranchedSstInfo = HashMap; -#[easy_ext::ext(HummockVersionExt)] impl HummockVersion { pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels { self.levels @@ -195,9 +196,8 @@ impl HummockVersion { .iter() .map(|(table_id, table_watermarks)| { ( - TableId::from(*table_id), - TableWatermarks::from_protobuf(table_watermarks) - .build_index(self.max_committed_epoch), + *table_id, + table_watermarks.build_index(self.max_committed_epoch), ) }) .collect() @@ -208,20 +208,27 @@ impl HummockVersion { existing_table_ids: &[u32], ) -> BTreeMap { fn extract_single_table_watermark( - table_watermarks: &PbTableWatermarks, + table_watermarks: &TableWatermarks, safe_epoch: u64, ) -> Option { - if let Some(first_epoch_watermark) = table_watermarks.epoch_watermarks.first() { + if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() + { assert!( - first_epoch_watermark.epoch >= safe_epoch, + *first_epoch >= safe_epoch, "smallest epoch {} in table watermark should be at least safe epoch {}", - first_epoch_watermark.epoch, + first_epoch, safe_epoch ); - if first_epoch_watermark.epoch == safe_epoch { + if *first_epoch == safe_epoch { Some(PbTableWatermarks { - epoch_watermarks: vec![first_epoch_watermark.clone()], - is_ascending: table_watermarks.is_ascending, + epoch_watermarks: vec![PbEpochNewWatermarks { + watermarks: first_epoch_watermark + .iter() + .map(VnodeWatermark::to_protobuf) + .collect(), + epoch: *first_epoch, + }], + is_ascending: table_watermarks.direction.is_ascending(), }) } else { None @@ -233,12 +240,12 @@ impl HummockVersion { self.table_watermarks .iter() .filter_map(|(table_id, table_watermarks)| { - let u32_table_id = *table_id as _; + let u32_table_id = table_id.table_id(); if !existing_table_ids.contains(&u32_table_id) { None } else { extract_single_table_watermark(table_watermarks, self.safe_epoch) - .map(|table_watermarks| (*table_id, table_watermarks)) + .map(|table_watermarks| (table_id.table_id, table_watermarks)) } }) .collect() @@ -256,7 +263,6 @@ pub type SstSplitInfo = ( HummockSstableId, ); -#[easy_ext::ext(HummockVersionUpdateExt)] impl HummockVersion { pub fn count_new_ssts_in_group_split( &self, @@ -1249,13 +1255,12 @@ mod tests { use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; use risingwave_pb::hummock::{ - CompactionConfig, GroupConstruct, GroupDelta, GroupDestroy, HummockVersion, - HummockVersionDelta, IntraLevelDelta, Level, LevelType, OverlappingLevel, SstableInfo, + CompactionConfig, GroupConstruct, GroupDelta, GroupDestroy, IntraLevelDelta, Level, + LevelType, OverlappingLevel, SstableInfo, }; - use crate::compaction_group::hummock_version_ext::{ - build_initial_compaction_group_levels, HummockVersionExt, HummockVersionUpdateExt, - }; + use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; + use crate::version::{HummockVersion, HummockVersionDelta}; #[test] fn test_get_sst_object_ids() { diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index be041c1588465..8ac1515de5220 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -44,6 +44,7 @@ pub mod key_range; pub mod prost_key_range; pub mod table_stats; pub mod table_watermark; +pub mod version; pub use compact::*; diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 02da4999bebc3..40d799ce7256c 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -15,9 +15,9 @@ use std::borrow::Borrow; use std::collections::{HashMap, HashSet}; -use risingwave_pb::hummock::{HummockVersion, PbTableStats}; +use risingwave_pb::hummock::PbTableStats; -use crate::compaction_group::hummock_version_ext::HummockVersionExt; +use crate::version::HummockVersion; pub type TableStatsMap = HashMap; diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 2f76cff6cc5e4..50ecaec0ad69e 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -15,11 +15,14 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; +use std::mem::size_of; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; use bytes::Bytes; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; +use risingwave_common::catalog::TableId; +use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark}; @@ -288,9 +291,16 @@ impl WatermarkDirection { WatermarkDirection::Descending => key > watermark, } } + + pub fn is_ascending(&self) -> bool { + match self { + WatermarkDirection::Ascending => true, + WatermarkDirection::Descending => false, + } + } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, EstimateSize)] pub struct VnodeWatermark { vnode_bitmap: Arc, watermark: Bytes, @@ -323,11 +333,11 @@ impl VnodeWatermark { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct TableWatermarks { // later epoch at the back - watermarks: Vec<(HummockEpoch, Vec)>, - direction: WatermarkDirection, + pub(crate) watermarks: Vec<(HummockEpoch, Vec)>, + pub(crate) direction: WatermarkDirection, } impl TableWatermarks { @@ -359,6 +369,21 @@ impl TableWatermarks { } } + pub fn estimated_encode_len(&self) -> usize { + self.watermarks.len() * size_of::() + + self + .watermarks + .iter() + .map(|(_, watermarks)| { + watermarks + .iter() + .map(|watermark| watermark.estimated_size()) + .sum::() + }) + .sum::() + + size_of::() // for direction + } + pub fn add_new_epoch_watermarks( &mut self, epoch: HummockEpoch, @@ -411,106 +436,104 @@ impl TableWatermarks { } pub fn merge_multiple_new_table_watermarks( - table_watermarks_list: impl IntoIterator>, -) -> HashMap { - let mut ret: HashMap)> = HashMap::new(); + table_watermarks_list: impl IntoIterator>, +) -> HashMap { + let mut ret: HashMap>)> = + HashMap::new(); for table_watermarks in table_watermarks_list { for (table_id, new_table_watermarks) in table_watermarks { let epoch_watermarks = match ret.entry(table_id) { Entry::Occupied(entry) => { - let (is_ascending, epoch_watermarks) = entry.into_mut(); - assert_eq!(new_table_watermarks.is_ascending, *is_ascending); + let (direction, epoch_watermarks) = entry.into_mut(); + assert_eq!(&new_table_watermarks.direction, direction); epoch_watermarks } Entry::Vacant(entry) => { let (_, epoch_watermarks) = - entry.insert((new_table_watermarks.is_ascending, BTreeMap::new())); + entry.insert((new_table_watermarks.direction, BTreeMap::new())); epoch_watermarks } }; - for new_epoch_watermarks in new_table_watermarks.epoch_watermarks { + for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks { epoch_watermarks - .entry(new_epoch_watermarks.epoch) - .or_insert_with(|| PbEpochNewWatermarks { - watermarks: vec![], - epoch: new_epoch_watermarks.epoch, - }) - .watermarks - .extend(new_epoch_watermarks.watermarks); + .entry(new_epoch) + .or_insert_with(Vec::new) + .extend(new_epoch_watermarks); } } } ret.into_iter() - .map(|(table_id, (is_ascending, epoch_watermarks))| { + .map(|(table_id, (direction, epoch_watermarks))| { ( table_id, - PbTableWatermarks { - is_ascending, + TableWatermarks { + direction, // ordered from earlier epoch to later epoch - epoch_watermarks: epoch_watermarks.into_values().collect(), + watermarks: epoch_watermarks.into_iter().collect(), }, ) }) .collect() } -#[easy_ext::ext(PbTableWatermarksExt)] -impl PbTableWatermarks { - pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &PbTableWatermarks) { - assert_eq!(self.is_ascending, newly_added_watermarks.is_ascending); - assert!(self.epoch_watermarks.iter().map(|w| w.epoch).is_sorted()); +impl TableWatermarks { + pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) { + assert_eq!(self.direction, newly_added_watermarks.direction); + assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted()); assert!(newly_added_watermarks - .epoch_watermarks + .watermarks .iter() - .map(|w| w.epoch) + .map(|(epoch, _)| epoch) .is_sorted()); // ensure that the newly added watermarks have a later epoch than the previous latest epoch. - if let Some(prev_last_epoch_watermarks) = self.epoch_watermarks.last() && let Some(new_first_epoch_watermarks) = newly_added_watermarks.epoch_watermarks.first() { - assert!(prev_last_epoch_watermarks.epoch < new_first_epoch_watermarks.epoch); + if let Some((prev_last_epoch, _)) = self.watermarks.last() + && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first() { + assert!(prev_last_epoch < new_first_epoch); } - self.epoch_watermarks - .extend(newly_added_watermarks.epoch_watermarks.clone()); + self.watermarks.extend( + newly_added_watermarks + .watermarks + .iter() + .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())), + ); } pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) { - match self.epoch_watermarks.first() { + match self.watermarks.first() { None => { // return on empty watermark return; } - Some(earliest_epoch_watermark) => { - if earliest_epoch_watermark.epoch >= safe_epoch { + Some((earliest_epoch, _)) => { + if *earliest_epoch >= safe_epoch { // No stale epoch watermark needs to be cleared. return; } } } debug!("clear stale table watermark below epoch {}", safe_epoch); - let mut result_epoch_watermark = Vec::with_capacity(self.epoch_watermarks.len()); + let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); let mut unset_vnode: HashSet = (0..VirtualNode::COUNT) .map(VirtualNode::from_index) .collect(); - while let Some(epoch_watermark) = self.epoch_watermarks.last() { - if epoch_watermark.epoch >= safe_epoch { - let epoch_watermark = self.epoch_watermarks.pop().expect("have check Some"); - for watermark in &epoch_watermark.watermarks { - for vnode in - Bitmap::from(watermark.vnode_bitmap.as_ref().expect("should not be None")) - .iter_vnodes() - { + while let Some((epoch, _)) = self.watermarks.last() { + if *epoch >= safe_epoch { + let (epoch, watermarks) = self.watermarks.pop().expect("have check Some"); + for watermark in &watermarks { + for vnode in watermark.vnode_bitmap.iter_vnodes() { unset_vnode.remove(&vnode); } } - result_epoch_watermark.push(epoch_watermark); + result_epoch_watermark.push((epoch, watermarks)); } else { break; } } - while !unset_vnode.is_empty() && let Some(epoch_watermark) = self.epoch_watermarks.pop() { + while !unset_vnode.is_empty() && let Some((_, watermarks)) = self.watermarks.pop() { let mut new_vnode_watermarks = Vec::new(); - for vnode_watermark in &epoch_watermark.watermarks { + for vnode_watermark in watermarks { let mut set_vnode = Vec::new(); - for vnode in Bitmap::from(vnode_watermark.vnode_bitmap.as_ref().expect("should not be None")).iter_vnodes() { + for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { if unset_vnode.remove(&vnode) { set_vnode.push(vnode); } @@ -520,36 +543,34 @@ impl PbTableWatermarks { for vnode in set_vnode { builder.set(vnode.to_index(), true); } - let bitmap = builder.finish(); - new_vnode_watermarks.push(PbVnodeWatermark { - vnode_bitmap: Some(bitmap.to_protobuf()), - watermark: vnode_watermark.watermark.clone(), + let bitmap = Arc::new(builder.finish()); + new_vnode_watermarks.push(VnodeWatermark { + vnode_bitmap: bitmap, + watermark: vnode_watermark.watermark, }) } } if !new_vnode_watermarks.is_empty() { - if let Some(last_epoch_watermark) = result_epoch_watermark.last_mut() && last_epoch_watermark.epoch == safe_epoch { - last_epoch_watermark.watermarks.extend(new_vnode_watermarks); + if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut() && *last_epoch == safe_epoch { + last_watermarks.extend(new_vnode_watermarks); } else { - result_epoch_watermark.push(PbEpochNewWatermarks { - watermarks: new_vnode_watermarks, - // set epoch as safe epoch - epoch: safe_epoch, - }) + result_epoch_watermark.push((safe_epoch, new_vnode_watermarks)); } } } // epoch watermark are added from later epoch to earlier epoch. // reverse to ensure that earlier epochs are at the front result_epoch_watermark.reverse(); - assert!(result_epoch_watermark.is_sorted_by(|first, second| { - let ret = first.epoch.cmp(&second.epoch); - assert_ne!(ret, Ordering::Equal); - Some(ret) - })); - *self = PbTableWatermarks { - epoch_watermarks: result_epoch_watermark, - is_ascending: self.is_ascending, + assert!( + result_epoch_watermark.is_sorted_by(|(first_epoch, _), (second_epoch, _)| { + let ret = first_epoch.cmp(second_epoch); + assert_ne!(ret, Ordering::Equal); + Some(ret) + }) + ); + *self = TableWatermarks { + watermarks: result_epoch_watermark, + direction: self.direction, } } } @@ -566,18 +587,16 @@ mod tests { use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; - use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; - use risingwave_pb::hummock::{HummockVersion, PbTableWatermarks, PbVnodeWatermark}; - use crate::compaction_group::hummock_version_ext::HummockVersionExt; use crate::key::{ is_empty_key_range, map_table_key_range, prefix_slice_with_vnode, prefixed_range_with_vnode, TableKeyRange, }; use crate::table_watermark::{ - merge_multiple_new_table_watermarks, PbTableWatermarksExt, TableWatermarks, - TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, + WatermarkDirection, }; + use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); @@ -613,7 +632,7 @@ mod tests { direction, ); - let mut pb_table_watermark = table_watermarks.to_protobuf(); + let mut table_watermark_checkpoint = table_watermarks.clone(); let epoch3 = epoch2 + 1; let mut second_table_watermark = TableWatermarks::single_epoch( @@ -651,8 +670,8 @@ mod tests { direction, ); - pb_table_watermark.apply_new_table_watermarks(&second_table_watermark.to_protobuf()); - assert_eq!(table_watermarks.to_protobuf(), pb_table_watermark); + table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark); + assert_eq!(table_watermarks, table_watermark_checkpoint); } #[test] @@ -700,13 +719,13 @@ mod tests { direction, ); - let mut pb_table_watermarks = table_watermarks.to_protobuf(); - pb_table_watermarks.clear_stale_epoch_watermark(epoch1); - assert_eq!(pb_table_watermarks, table_watermarks.to_protobuf()); + let mut table_watermarks_checkpoint = table_watermarks.clone(); + table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1); + assert_eq!(table_watermarks_checkpoint, table_watermarks); - pb_table_watermarks.clear_stale_epoch_watermark(epoch2); + table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2); assert_eq!( - pb_table_watermarks, + table_watermarks_checkpoint, TableWatermarks { watermarks: vec![ ( @@ -733,12 +752,11 @@ mod tests { ], direction, } - .to_protobuf() ); - pb_table_watermarks.clear_stale_epoch_watermark(epoch3); + table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3); assert_eq!( - pb_table_watermarks, + table_watermarks_checkpoint, TableWatermarks { watermarks: vec![ ( @@ -758,12 +776,11 @@ mod tests { ], direction, } - .to_protobuf() ); - pb_table_watermarks.clear_stale_epoch_watermark(epoch4); + table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4); assert_eq!( - pb_table_watermarks, + table_watermarks_checkpoint, TableWatermarks { watermarks: vec![ ( @@ -783,12 +800,11 @@ mod tests { ], direction, } - .to_protobuf() ); - pb_table_watermarks.clear_stale_epoch_watermark(epoch5); + table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5); assert_eq!( - pb_table_watermarks, + table_watermarks_checkpoint, TableWatermarks { watermarks: vec![( epoch5, @@ -802,35 +818,34 @@ mod tests { )], direction, } - .to_protobuf() ); } #[test] fn test_merge_multiple_new_table_watermarks() { - fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> PbEpochNewWatermarks { - PbEpochNewWatermarks { - watermarks: bitmaps + fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Vec) { + ( + epoch, + bitmaps .into_iter() - .map(|bitmap| PbVnodeWatermark { - watermark: vec![1, 2, epoch as _], - vnode_bitmap: Some(bitmap.to_protobuf()), + .map(|bitmap| VnodeWatermark { + watermark: Bytes::from(vec![1, 2, epoch as _]), + vnode_bitmap: Arc::new(bitmap.clone()), }) .collect(), - epoch: epoch as _, - } + ) } fn build_table_watermark( vnodes: impl IntoIterator, epochs: impl IntoIterator, - ) -> PbTableWatermarks { + ) -> TableWatermarks { let bitmap = build_bitmap(vnodes); - PbTableWatermarks { - epoch_watermarks: epochs + TableWatermarks { + watermarks: epochs .into_iter() .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap])) .collect(), - is_ascending: true, + direction: WatermarkDirection::Ascending, } } let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]); @@ -838,27 +853,27 @@ mod tests { let table2_watermark = build_table_watermark(0..4, 1..3); let table3_watermark = build_table_watermark(0..4, 3..5); let mut first = HashMap::new(); - first.insert(1, table1_watermark1); - first.insert(2, table2_watermark.clone()); + first.insert(TableId::new(1), table1_watermark1); + first.insert(TableId::new(2), table2_watermark.clone()); let mut second = HashMap::new(); - second.insert(1, table1_watermark2); - second.insert(3, table3_watermark.clone()); + second.insert(TableId::new(1), table1_watermark2); + second.insert(TableId::new(3), table3_watermark.clone()); let result = merge_multiple_new_table_watermarks(vec![first, second]); let mut expected = HashMap::new(); expected.insert( - 1, - PbTableWatermarks { - epoch_watermarks: vec![ + TableId::new(1), + TableWatermarks { + watermarks: vec![ epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]), epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]), epoch_new_watermark(4, vec![&build_bitmap(0..3)]), epoch_new_watermark(5, vec![&build_bitmap(4..6)]), ], - is_ascending: true, + direction: WatermarkDirection::Ascending, }, ); - expected.insert(2, table2_watermark); - expected.insert(3, table3_watermark); + expected.insert(TableId::new(2), table2_watermark); + expected.insert(TableId::new(3), table3_watermark); assert_eq!(result, expected); } @@ -1053,16 +1068,16 @@ mod tests { }; let test_table_id = TableId::from(233); version.table_watermarks.insert( - test_table_id.table_id, - PbTableWatermarks { - epoch_watermarks: vec![PbEpochNewWatermarks { - watermarks: vec![PbVnodeWatermark { - watermark: watermark1.to_vec(), - vnode_bitmap: Some(build_bitmap(0..VirtualNode::COUNT).to_protobuf()), + test_table_id, + TableWatermarks { + watermarks: vec![( + EPOCH1, + vec![VnodeWatermark { + watermark: watermark1.clone(), + vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), }], - epoch: EPOCH1, - }], - is_ascending: true, + )], + direction: WatermarkDirection::Ascending, }, ); let committed_index = version diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs new file mode 100644 index 0000000000000..758d374e21321 --- /dev/null +++ b/src/storage/hummock_sdk/src/version.rs @@ -0,0 +1,192 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::mem::size_of; + +use prost::Message; +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::hummock_version::PbLevels; +use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; +use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; + +use crate::table_watermark::TableWatermarks; +use crate::{CompactionGroupId, HummockSstableObjectId}; + +#[derive(Debug, Clone, PartialEq)] +pub struct HummockVersion { + pub id: u64, + pub levels: HashMap, + pub max_committed_epoch: u64, + pub safe_epoch: u64, + pub table_watermarks: HashMap, +} + +impl Default for HummockVersion { + fn default() -> Self { + HummockVersion::from_protobuf_inner(&PbHummockVersion::default()) + } +} + +impl HummockVersion { + /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to + /// maintain backward compatibility. + pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self { + Self::from_protobuf_inner(pb_version) + } + + /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`. + /// We should maintain backward compatibility. + pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self { + Self::from_protobuf_inner(pb_version) + } + + fn from_protobuf_inner(pb_version: &PbHummockVersion) -> Self { + Self { + id: pb_version.id, + levels: pb_version + .levels + .iter() + .map(|(group_id, levels)| (*group_id as CompactionGroupId, levels.clone())) + .collect(), + max_committed_epoch: pb_version.max_committed_epoch, + safe_epoch: pb_version.safe_epoch, + table_watermarks: pb_version + .table_watermarks + .iter() + .map(|(table_id, table_watermark)| { + ( + TableId::new(*table_id), + TableWatermarks::from_protobuf(table_watermark), + ) + }) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersion { + PbHummockVersion { + id: self.id, + levels: self + .levels + .iter() + .map(|(group_id, levels)| (*group_id as _, levels.clone())) + .collect(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + table_watermarks: self + .table_watermarks + .iter() + .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf())) + .collect(), + } + } + + pub fn estimated_encode_len(&self) -> usize { + self.levels.len() * size_of::() + + self + .levels + .values() + .map(|level| level.encoded_len()) + .sum::() + + self.table_watermarks.len() * size_of::() + + self + .table_watermarks + .values() + .map(|table_watermark| table_watermark.estimated_encode_len()) + .sum::() + } +} + +#[derive(Debug, PartialEq, Clone)] +pub struct HummockVersionDelta { + pub id: u64, + pub prev_id: u64, + pub group_deltas: HashMap, + pub max_committed_epoch: u64, + pub safe_epoch: u64, + pub trivial_move: bool, + pub gc_object_ids: Vec, + pub new_table_watermarks: HashMap, + pub removed_table_ids: Vec, +} + +impl Default for HummockVersionDelta { + fn default() -> Self { + HummockVersionDelta::from_protobuf_inner(&PbHummockVersionDelta::default()) + } +} + +impl HummockVersionDelta { + /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`. + /// We should maintain backward compatibility. + pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self { + Self::from_protobuf_inner(delta) + } + + /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to + /// maintain backward compatibility. + pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self { + Self::from_protobuf_inner(delta) + } + + fn from_protobuf_inner(delta: &PbHummockVersionDelta) -> Self { + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta.group_deltas.clone(), + max_committed_epoch: delta.max_committed_epoch, + safe_epoch: delta.safe_epoch, + trivial_move: delta.trivial_move, + gc_object_ids: delta.gc_object_ids.clone(), + new_table_watermarks: delta + .new_table_watermarks + .iter() + .map(|(table_id, watermarks)| { + ( + TableId::new(*table_id), + TableWatermarks::from_protobuf(watermarks), + ) + }) + .collect(), + removed_table_ids: delta + .removed_table_ids + .iter() + .map(|table_id| TableId::new(*table_id)) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersionDelta { + PbHummockVersionDelta { + id: self.id, + prev_id: self.prev_id, + group_deltas: self.group_deltas.clone(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + trivial_move: self.trivial_move, + gc_object_ids: self.gc_object_ids.clone(), + new_table_watermarks: self + .new_table_watermarks + .iter() + .map(|(table_id, watermarks)| (table_id.table_id, watermarks.to_protobuf())) + .collect(), + removed_table_ids: self + .removed_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), + } + } +} diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 4edd044d5838c..be0ebe204d745 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -29,11 +29,11 @@ pub(crate) mod tests { use risingwave_common::util::epoch::Epoch; use risingwave_common_service::observer_manager::NotificationClient; use risingwave_hummock_sdk::can_concat; - use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, FullKey, TableKey, TABLE_PREFIX_LEN}; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; + use risingwave_hummock_sdk::version::HummockVersion; use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::selector::{ default_compaction_selector, ManualCompactionOption, @@ -44,7 +44,7 @@ pub(crate) mod tests { }; use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; use risingwave_pb::common::{HostAddress, WorkerType}; - use risingwave_pb::hummock::{CompactTask, HummockVersion, InputLevel, KeyRange, TableOption}; + use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, TableOption}; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ diff --git a/src/storage/hummock_test/src/mock_notification_client.rs b/src/storage/hummock_test/src/mock_notification_client.rs index 991a5a9d5bf84..998fcf39b010c 100644 --- a/src/storage/hummock_test/src/mock_notification_client.rs +++ b/src/storage/hummock_test/src/mock_notification_client.rs @@ -62,7 +62,7 @@ impl NotificationClient for MockNotificationClient { let hummock_version = self.hummock_manager.get_current_version().await; let meta_snapshot = MetaSnapshot { - hummock_version: Some(hummock_version), + hummock_version: Some(hummock_version.to_protobuf()), version: Some(Default::default()), meta_backup_manifest_id: Some(MetaBackupManifestId { id: 0 }), hummock_write_limits: Some(WriteLimits { diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index aa862d80085f7..37a7db52e7e1a 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -22,7 +22,6 @@ use risingwave_common::cache::CachePriority; use risingwave_common::catalog::hummock::CompactionFilterFlag; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, user_key}; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index e06f798fc76a9..9ef7350e28228 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -29,14 +29,13 @@ use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; use risingwave_meta::manager::MetaSrvEnv; use risingwave_pb::catalog::{PbTable, Table}; use risingwave_pb::common::WorkerNode; -use risingwave_pb::hummock::version_update_payload; use risingwave_storage::error::StorageResult; use risingwave_storage::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, FullKeyFilterKeyExtractor, RpcFilterKeyExtractorManager, }; use risingwave_storage::hummock::backup_reader::BackupReader; -use risingwave_storage::hummock::event_handler::HummockEvent; +use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::observer_manager::HummockObserverNode; @@ -75,9 +74,7 @@ pub async fn prepare_first_valid_version( .await; observer_manager.start().await; let hummock_version = match rx.recv().await { - Some(HummockEvent::VersionUpdate(version_update_payload::Payload::PinnedVersion( - version, - ))) => version, + Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, _ => unreachable!("should be full version"), }; @@ -257,14 +254,7 @@ impl HummockTestEnv { pub async fn commit_epoch(&self, epoch: u64) { let res = self.storage.seal_and_sync_epoch(epoch).await.unwrap(); self.meta_client - .commit_epoch_with_watermark( - epoch, - res.uncommitted_ssts, - res.table_watermarks - .into_iter() - .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf())) - .collect(), - ) + .commit_epoch_with_watermark(epoch, res.uncommitted_ssts, res.table_watermarks) .await .unwrap(); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 498c41c6e635d..5d043d42806c6 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -21,9 +21,7 @@ use arc_swap::ArcSwap; use await_tree::InstrumentAwait; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; -use risingwave_pb::hummock::version_update_payload::Payload; use tokio::spawn; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, trace, warn}; @@ -37,7 +35,7 @@ use crate::hummock::event_handler::refiller::CacheRefillerEvent; use crate::hummock::event_handler::uploader::{ HummockUploader, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, }; -use crate::hummock::event_handler::HummockEvent; +use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::store::version::{ HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, @@ -403,7 +401,7 @@ impl HummockEventHandler { }); } - fn handle_version_update(&mut self, version_payload: Payload) { + fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { let pinned_version = self .refiller .last_new_pinned_version() @@ -413,9 +411,9 @@ impl HummockEventHandler { let mut sst_delta_infos = vec![]; let newly_pinned_version = match version_payload { - Payload::VersionDeltas(version_deltas) => { - let mut version_to_apply = pinned_version.version(); - for version_delta in &version_deltas.version_deltas { + HummockVersionUpdate::VersionDeltas(version_deltas) => { + let mut version_to_apply = pinned_version.version().clone(); + for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch { sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta); @@ -425,7 +423,7 @@ impl HummockEventHandler { version_to_apply } - Payload::PinnedVersion(version) => version, + HummockVersionUpdate::PinnedVersion(version) => version, }; validate_table_key_range(&newly_pinned_version); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index a6722b0d77116..b39b5fca708b4 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockEpoch; -use risingwave_pb::hummock::version_update_payload; use tokio::sync::{mpsc, oneshot}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; @@ -31,6 +30,7 @@ pub mod refiller; pub mod uploader; pub use hummock_event_handler::HummockEventHandler; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use super::store::version::HummockReadVersion; @@ -41,6 +41,12 @@ pub struct BufferWriteRequest { pub grant_sender: oneshot::Sender<()>, } +#[derive(Debug)] +pub enum HummockVersionUpdate { + VersionDeltas(Vec), + PinnedVersion(HummockVersion), +} + pub enum HummockEvent { /// Notify that we may flush the shared buffer. BufferMayFlush, @@ -58,7 +64,7 @@ pub enum HummockEvent { Shutdown, - VersionUpdate(version_update_payload::Payload), + VersionUpdate(HummockVersionUpdate), ImmToUploader(ImmutableMemtable), diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 66357753fd038..a23bacb940ae0 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1146,8 +1146,9 @@ mod tests { use prometheus::core::GenericGauge; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{FullKey, TableKey}; + use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; - use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo}; + use risingwave_pb::hummock::{KeyRange, SstableInfo}; use spin::Mutex; use tokio::spawn; use tokio::sync::mpsc::unbounded_channel; diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index d56334b802355..94af950f42936 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -16,10 +16,9 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo, SstObjectIdRange}; -use risingwave_pb::hummock::{ - HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest, VacuumTask, -}; +use risingwave_pb::hummock::{HummockSnapshot, SubscribeCompactionEventRequest, VacuumTask}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; use tokio::sync::mpsc::UnboundedSender; diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 563d63f5c4705..14749c1fa1fc6 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -14,19 +14,16 @@ use std::collections::{BTreeMap, HashMap}; use std::iter::empty; -use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use auto_enums::auto_enum; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - HummockVersionExt, HummockVersionUpdateExt, -}; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID}; use risingwave_pb::hummock::hummock_version::Levels; -use risingwave_pb::hummock::{HummockVersion, Level}; +use risingwave_pb::hummock::PbLevel; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -145,7 +142,7 @@ impl PinnedVersion { self.version.levels.get(&compaction_group_id).unwrap() } - pub fn levels(&self, table_id: TableId) -> impl Iterator { + pub fn levels(&self, table_id: TableId) -> impl Iterator { #[auto_enum(Iterator)] match self.compaction_group_index.get(&table_id) { Some(compaction_group_id) => { @@ -172,8 +169,8 @@ impl PinnedVersion { } /// ret value can't be used as `HummockVersion`. it must be modified with delta - pub fn version(&self) -> HummockVersion { - self.version.deref().clone() + pub fn version(&self) -> &HummockVersion { + &self.version } } diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 3a0715e2417ad..9c455e33782aa 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -16,9 +16,9 @@ use std::collections::HashMap; use std::sync::Arc; use risingwave_common_service::observer_manager::{ObserverState, SubscribeHummock}; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_trace::TraceSpan; use risingwave_pb::catalog::Table; -use risingwave_pb::hummock::version_update_payload; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SubscribeResponse; @@ -26,7 +26,7 @@ use tokio::sync::mpsc::UnboundedSender; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManagerRef}; use crate::hummock::backup_reader::BackupReaderRef; -use crate::hummock::event_handler::HummockEvent; +use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; use crate::hummock::write_limiter::WriteLimiterRef; pub struct HummockObserverNode { @@ -72,7 +72,13 @@ impl ObserverState for HummockObserverNode { let _ = self .version_update_sender .send(HummockEvent::VersionUpdate( - version_update_payload::Payload::VersionDeltas(hummock_version_deltas), + HummockVersionUpdate::VersionDeltas( + hummock_version_deltas + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect(), + ), )) .inspect_err(|e| { tracing::error!("unable to send version delta: {:?}", e); @@ -118,11 +124,11 @@ impl ObserverState for HummockObserverNode { let _ = self .version_update_sender .send(HummockEvent::VersionUpdate( - version_update_payload::Payload::PinnedVersion( - snapshot + HummockVersionUpdate::PinnedVersion(HummockVersion::from_rpc_protobuf( + &snapshot .hummock_version .expect("should get hummock version"), - ), + )), )) .inspect_err(|e| { tracing::error!("unable to send full version: {:?}", e); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index bbdeec1ed67b3..3f1f34016d436 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -27,10 +27,9 @@ use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::ReadTableWatermark; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::HummockReadEpoch; -#[cfg(any(test, feature = "test"))] -use risingwave_pb::hummock::HummockVersion; -use risingwave_pb::hummock::{version_update_payload, SstableInfo}; +use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; @@ -44,7 +43,9 @@ use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::CompactorContext; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::refiller::CacheRefillConfig; -use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, ReadVersionMappingType}; +use crate::hummock::event_handler::{ + HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadVersionMappingType, +}; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::store::version::read_filter_for_batch; @@ -168,7 +169,7 @@ impl HummockStorage { observer_manager.start().await; let hummock_version = match event_rx.recv().await { - Some(HummockEvent::VersionUpdate(version_update_payload::Payload::PinnedVersion(version))) => version, + Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -542,7 +543,7 @@ impl HummockStorage { let version_id = version.id; self.hummock_event_sender .send(HummockEvent::VersionUpdate( - version_update_payload::Payload::PinnedVersion(version), + HummockVersionUpdate::PinnedVersion(version), )) .unwrap(); loop { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 8fcd81d6b75de..dc1c4afda73f8 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -30,8 +30,9 @@ use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::table_watermark::{ ReadTableWatermark, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; +use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; -use risingwave_pb::hummock::{HummockVersionDelta, LevelType, SstableInfo}; +use risingwave_pb::hummock::{LevelType, SstableInfo}; use sync_point::sync_point; use tracing::Instrument; @@ -59,9 +60,6 @@ use crate::monitor::{ }; use crate::store::{gen_min_epoch, ReadOptions, StateStoreIterExt, StreamTypeOfIter}; -// TODO: use a custom data structure to allow in-place update instead of proto -// pub type CommittedVersion = HummockVersion; - pub type CommittedVersion = PinnedVersion; /// Data not committed to Hummock. There are two types of staging data: diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 6404d80bb265f..9edfa8431f2e8 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -26,8 +26,9 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, }; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; -use risingwave_pb::hummock::{HummockVersion, SstableInfo}; +use risingwave_pb::hummock::SstableInfo; use tokio::sync::watch::Sender; use tokio::sync::Notify; diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index cf3e35b48c692..813e773fd17bf 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -32,9 +32,9 @@ use risingwave_common::config::{ use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::key::TableKey; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; -use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta}; use risingwave_rpc_client::{HummockMetaClient, MetaClient}; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; use risingwave_storage::hummock::{CachePolicy, HummockStorage}; @@ -296,8 +296,7 @@ async fn pull_version_deltas( let res = meta_client .list_version_deltas(0, u32::MAX, u64::MAX) .await - .unwrap() - .version_deltas; + .unwrap(); if let Err(err) = shutdown_tx.send(()) { tracing::warn!("Failed to send shutdown to heartbeat task: {:?}", err); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index d2acd7c754c74..2aab4679bb634 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -269,7 +269,7 @@ async fn compaction_test( ) .await .unwrap(); - let version = store.get_pinned_version().version(); + let version = store.get_pinned_version().version().clone(); let remote_version = meta_client.get_current_version().await.unwrap(); println!( "version-{}, remote version-{}",