Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): replace PbHummockVersion with new HummockVersion struct #14101

Merged
merged 12 commits into from
Dec 26, 2023
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ pub async fn list_version_deltas(
let resp = meta_client
.list_version_deltas(start_id, num_epochs, HummockEpoch::MAX)
.await?;
println!("{:#?}", resp.version_deltas);
println!("{:#?}", resp);
Ok(())
}
18 changes: 10 additions & 8 deletions src/ctl/src/cmd_impl/hummock/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockEpoch;

use crate::CtlContext;
Expand Down Expand Up @@ -51,11 +51,13 @@ pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<(
/// added/removed for what reason (row deletion/compaction/etc.).
pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let mut base_version = meta_client
.risectl_get_checkpoint_hummock_version()
.await?
.checkpoint_version
.unwrap();
let mut base_version = HummockVersion::from_rpc_protobuf(
&meta_client
.risectl_get_checkpoint_hummock_version()
.await?
.checkpoint_version
.unwrap(),
);
println!("replay starts");
println!("base version {}", base_version.id);
let delta_fetch_size = 100;
Expand All @@ -65,10 +67,10 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
.list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
.await
.unwrap();
if deltas.version_deltas.is_empty() {
if deltas.is_empty() {
break;
}
for delta in deltas.version_deltas {
for delta in deltas {
if delta.prev_id != base_version.id {
eprintln!("missing delta log for version {}", base_version.id);
break;
Expand Down
3 changes: 1 addition & 2 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
Expand Down Expand Up @@ -75,7 +74,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
let hummock = context
.hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
.await?;
let version = hummock.inner().get_pinned_version().version();
let version = hummock.inner().get_pinned_version().version().clone();
let sstable_store = hummock.sstable_store();
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ risingwave_common = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_expr = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::hummock::HummockVersion;
use risingwave_hummock_sdk::version::HummockVersion;
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};
Expand Down
10 changes: 4 additions & 6 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
use std::collections::HashMap;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot, HummockVersion, HummockVersionDelta,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -223,15 +224,12 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0
.risectl_get_checkpoint_hummock_version()
.await
.map(|v| v.checkpoint_version.unwrap())
.map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
}

async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
// FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
self.0
.list_version_deltas(0, u32::MAX, u64::MAX)
.await
.map(|v| v.version_deltas)
self.0.list_version_deltas(0, u32::MAX, u64::MAX).await
}

async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_common::catalog::{
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
Expand All @@ -45,7 +46,7 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot, HummockVersion, HummockVersionDelta,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down
6 changes: 5 additions & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_common::util::select_all;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::java_binding::key_range::Bound;
Expand Down Expand Up @@ -84,7 +85,10 @@ impl HummockJavaBindingIterator {

let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
let pin_version = PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0);
let pin_version = PinnedVersion::new(
HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
unbounded_channel().0,
);
let table_id = read_plan.table_id.into();

for vnode in read_plan.vnode_ids {
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
"sqlx-mysql",
Expand Down
6 changes: 3 additions & 3 deletions src/meta/model_v2/src/hummock_version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::hummock::HummockVersionDelta;
use risingwave_pb::hummock::PbHummockVersionDelta;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};
Expand All @@ -36,9 +36,9 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(FullVersionDelta, HummockVersionDelta);
crate::derive_from_json_struct!(FullVersionDelta, PbHummockVersionDelta);

impl From<Model> for HummockVersionDelta {
impl From<Model> for PbHummockVersionDelta {
fn from(value: Model) -> Self {
let ret = value.full_version_delta.into_inner();
assert_eq!(value.id, ret.id as i64);
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rand = "0.8"
regex = "1"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
35 changes: 19 additions & 16 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID};
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::version_update_payload::Payload;
use risingwave_pb::hummock::*;
use tonic::{Request, Response, Status, Streaming};

use crate::hummock::compaction::selector::ManualCompactionOption;
use crate::hummock::{HummockManagerRef, VacuumManagerRef};
use crate::manager::FragmentManagerRef;
use crate::RwReceiverStream;

pub struct HummockServiceImpl {
hummock_manager: HummockManagerRef,
vacuum_manager: VacuumManagerRef,
Expand Down Expand Up @@ -83,7 +84,7 @@ impl HummockManagerService for HummockServiceImpl {
let current_version = self.hummock_manager.get_current_version().await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
current_version: Some(current_version),
current_version: Some(current_version.to_protobuf()),
}))
}

Expand All @@ -94,10 +95,12 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let (version, compaction_groups) = self
.hummock_manager
.replay_version_delta(req.version_delta.unwrap())
.replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
&req.version_delta.unwrap(),
))
.await?;
Ok(Response::new(ReplayVersionDeltaResponse {
version: Some(version),
version: Some(version.to_protobuf()),
modified_compaction_groups: compaction_groups,
}))
}
Expand All @@ -119,7 +122,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<DisableCommitEpochResponse>, Status> {
let version = self.hummock_manager.disable_commit_epoch().await;
Ok(Response::new(DisableCommitEpochResponse {
current_version: Some(version),
current_version: Some(version.to_protobuf()),
}))
}

Expand All @@ -133,7 +136,12 @@ impl HummockManagerService for HummockServiceImpl {
.list_version_deltas(req.start_id, req.num_limit, req.committed_epoch_limit)
.await?;
let resp = ListVersionDeltasResponse {
version_deltas: Some(version_deltas),
version_deltas: Some(PbHummockVersionDeltas {
version_deltas: version_deltas
.iter()
.map(HummockVersionDelta::to_protobuf)
.collect(),
}),
};
Ok(Response::new(resp))
}
Expand Down Expand Up @@ -415,15 +423,10 @@ impl HummockManagerService for HummockServiceImpl {
request: Request<PinVersionRequest>,
) -> Result<Response<PinVersionResponse>, Status> {
let req = request.into_inner();
let payload = self.hummock_manager.pin_version(req.context_id).await?;
match payload {
Payload::PinnedVersion(version) => Ok(Response::new(PinVersionResponse {
pinned_version: Some(version),
})),
Payload::VersionDeltas(_) => {
unreachable!("pin_version should not return version delta")
}
}
let version = self.hummock_manager.pin_version(req.context_id).await?;
Ok(Response::new(PinVersionResponse {
pinned_version: Some(version.to_protobuf()),
}))
}

async fn split_compaction_group(
Expand Down Expand Up @@ -460,7 +463,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
checkpoint_version: Some(checkpoint_version),
checkpoint_version: Some(checkpoint_version.to_protobuf()),
}))
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl NotificationServiceImpl {

MetaSnapshot {
tables,
hummock_version: Some(hummock_version),
hummock_version: Some(hummock_version.to_protobuf()),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use anyhow::anyhow;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};
use risingwave_backup::MetaSnapshotId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::catalog::{
Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
};
use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta, HummockVersionStats};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::user::UserInfo;

Expand All @@ -46,10 +46,10 @@ impl<S: MetaStore> MetaSnapshotV1Builder<S> {
}
}

pub async fn build<D: Future<Output = HummockVersion>>(
pub async fn build(
&mut self,
id: MetaSnapshotId,
hummock_version_builder: D,
hummock_version_builder: impl Future<Output = HummockVersion>,
) -> BackupResult<()> {
self.snapshot.format_version = VERSION;
self.snapshot.id = id;
Expand Down Expand Up @@ -169,7 +169,8 @@ mod tests {
use risingwave_backup::error::BackupError;
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_common::system_param::system_params_for_test;
use risingwave_pb::hummock::{HummockVersion, HummockVersionStats};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::hummock::HummockVersionStats;

use crate::backup_restore::meta_snapshot_builder;
use crate::manager::model::SystemParamsModel;
Expand Down
Loading
Loading