diff --git a/proto/hummock.proto b/proto/hummock.proto
index 19b7e036c9686..34da6e4e132a3 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -833,6 +833,7 @@ message CancelCompactTaskResponse {
 
 message GetVersionByEpochRequest {
   uint64 epoch = 1;
+  uint32 table_id = 2;
 }
 
 message GetVersionByEpochResponse {
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 08291e5b163d5..0b09f3c4d4e11 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -20,6 +20,7 @@ mod m20240702_080451_system_param_value;
 mod m20240702_084927_unnecessary_fk;
 mod m20240726_063833_auto_schema_change;
 mod m20240806_143329_add_rate_limit_to_source_catalog;
+mod m20240820_081248_add_time_travel_per_table_epoch;
 
 pub struct Migrator;
 
@@ -45,6 +46,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240702_084927_unnecessary_fk::Migration),
             Box::new(m20240726_063833_auto_schema_change::Migration),
             Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration),
+            Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration),
         ]
     }
 }
diff --git a/src/meta/model_v2/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs b/src/meta/model_v2/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs
new file mode 100644
index 0000000000000..85d9475aa8f01
--- /dev/null
+++ b/src/meta/model_v2/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs
@@ -0,0 +1,207 @@
+use sea_orm_migration::prelude::*;
+
+/// Re-keys `hummock_epoch_to_version` from `(epoch)` to `(epoch, table_id)` by adding a
+/// `table_id` column and rebuilding the primary key.
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+/// Table name as spelled in the raw SQL statements below.
+const TABLE_NAME: &str = "hummock_epoch_to_version";
+
+/// Adds the `table_id` column as `BIGINT NOT NULL DEFAULT 0`.
+///
+/// A primary key column cannot hold NULL, so adding the column as nullable would make the
+/// subsequent `ADD PRIMARY KEY (epoch, table_id)` fail once the table already has rows. The
+/// default of 0 backfills those rows instead.
+async fn add_table_id_column(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
+    manager
+        .alter_table(
+            Table::alter()
+                .table(HummockEpochToVersion::Table)
+                .add_column(
+                    ColumnDef::new(HummockEpochToVersion::TableId)
+                        .big_integer()
+                        .not_null()
+                        .default(0),
+                )
+                .to_owned(),
+        )
+        .await
+}
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    /// Adds `table_id` and switches the primary key to `(epoch, table_id)`.
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        // modify PK
+        match manager.get_database_backend() {
+            sea_orm::DatabaseBackend::MySql => {
+                add_table_id_column(manager).await?;
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::MySql,
+                        format!("ALTER TABLE {TABLE_NAME} DROP PRIMARY KEY, ADD PRIMARY KEY (epoch, table_id)"),
+                    ))
+                    .await?;
+            }
+            sea_orm::DatabaseBackend::Postgres => {
+                add_table_id_column(manager).await?;
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::Postgres,
+                        format!("ALTER TABLE {TABLE_NAME} DROP CONSTRAINT {TABLE_NAME}_pkey"),
+                    ))
+                    .await?;
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::Postgres,
+                        format!("ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch, table_id)"),
+                    ))
+                    .await?;
+            }
+            sea_orm::DatabaseBackend::Sqlite => {
+                // sqlite is not for prod usage, so recreating the table is fine.
+                manager
+                    .drop_table(
+                        sea_orm_migration::prelude::Table::drop()
+                            .table(HummockEpochToVersion::Table)
+                            .if_exists()
+                            .cascade()
+                            .to_owned(),
+                    )
+                    .await?;
+
+                manager
+                    .create_table(
+                        Table::create()
+                            .table(HummockEpochToVersion::Table)
+                            .if_not_exists()
+                            .col(
+                                ColumnDef::new(HummockEpochToVersion::Epoch)
+                                    .big_integer()
+                                    .not_null(),
+                            )
+                            .col(
+                                ColumnDef::new(HummockEpochToVersion::TableId)
+                                    .big_integer()
+                                    .not_null(),
+                            )
+                            .col(
+                                ColumnDef::new(HummockEpochToVersion::VersionId)
+                                    .big_integer()
+                                    .not_null(),
+                            )
+                            .primary_key(
+                                Index::create()
+                                    .col(HummockEpochToVersion::Epoch)
+                                    .col(HummockEpochToVersion::TableId),
+                            )
+                            .to_owned(),
+                    )
+                    .await?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Drops `table_id` and restores `(epoch)` as the primary key.
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        // The downgrade for MySql and Postgres may fail with a PK conflict: once `table_id` is
+        // dropped, several rows may share the same `epoch`.
+        match manager.get_database_backend() {
+            sea_orm::DatabaseBackend::MySql => {
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::MySql,
+                        format!("ALTER TABLE {TABLE_NAME} DROP PRIMARY KEY"),
+                    ))
+                    .await?;
+                manager
+                    .alter_table(
+                        Table::alter()
+                            .table(HummockEpochToVersion::Table)
+                            .drop_column(HummockEpochToVersion::TableId)
+                            .to_owned(),
+                    )
+                    .await?;
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::MySql,
+                        format!("ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch)"),
+                    ))
+                    .await?;
+            }
+            sea_orm::DatabaseBackend::Postgres => {
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::Postgres,
+                        format!("ALTER TABLE {TABLE_NAME} DROP CONSTRAINT {TABLE_NAME}_pkey"),
+                    ))
+                    .await?;
+                manager
+                    .alter_table(
+                        Table::alter()
+                            .table(HummockEpochToVersion::Table)
+                            .drop_column(HummockEpochToVersion::TableId)
+                            .to_owned(),
+                    )
+                    .await?;
+                manager
+                    .get_connection()
+                    .execute(sea_orm::Statement::from_string(
+                        sea_orm::DatabaseBackend::Postgres,
+                        format!("ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch)"),
+                    ))
+                    .await?;
+            }
+            sea_orm::DatabaseBackend::Sqlite => {
+                manager
+                    .drop_table(
+                        sea_orm_migration::prelude::Table::drop()
+                            .table(HummockEpochToVersion::Table)
+                            .if_exists()
+                            .cascade()
+                            .to_owned(),
+                    )
+                    .await?;
+
+                manager
+                    .create_table(
+                        Table::create()
+                            .table(HummockEpochToVersion::Table)
+                            .if_not_exists()
+                            .col(
+                                ColumnDef::new(HummockEpochToVersion::Epoch)
+                                    .big_integer()
+                                    .not_null()
+                                    .primary_key(),
+                            )
+                            .col(
+                                ColumnDef::new(HummockEpochToVersion::VersionId)
+                                    .big_integer()
+                                    .not_null(),
+                            )
+                            .to_owned(),
+                    )
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Identifiers of the `hummock_epoch_to_version` table and its columns.
+#[derive(DeriveIden)]
+enum HummockEpochToVersion {
+    Table,
+    Epoch,
+    TableId,
+    VersionId,
+}
diff --git a/src/meta/model_v2/src/hummock_epoch_to_version.rs b/src/meta/model_v2/src/hummock_epoch_to_version.rs
index
181b1b320bc54..f54551aa80178 100644 --- a/src/meta/model_v2/src/hummock_epoch_to_version.rs +++ b/src/meta/model_v2/src/hummock_epoch_to_version.rs @@ -22,6 +22,8 @@ use crate::{Epoch, HummockVersionId}; pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub epoch: Epoch, + #[sea_orm(primary_key, auto_increment = false)] + pub table_id: i64, pub version_id: HummockVersionId, } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 21e203d8440bd..3217f63a8bb5a 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -710,8 +710,11 @@ impl HummockManagerService for HummockServiceImpl { &self, request: Request, ) -> Result, Status> { - let GetVersionByEpochRequest { epoch } = request.into_inner(); - let version = self.hummock_manager.epoch_to_version(epoch).await?; + let GetVersionByEpochRequest { epoch, table_id } = request.into_inner(); + let version = self + .hummock_manager + .epoch_to_version(epoch, table_id) + .await?; Ok(Response::new(GetVersionByEpochResponse { version: Some(version.to_protobuf()), })) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e7bca768437f2..e92e91c8503d0 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -232,7 +232,7 @@ impl HummockManager { is_visible_table_committed_epoch, new_compaction_group, commit_sstables, - new_table_ids, + &new_table_ids, new_table_watermarks, change_log_delta, ); @@ -289,6 +289,9 @@ impl HummockManager { .values() .map(|g| (g.group_id, g.parent_group_id)) .collect(); + let time_travel_tables_to_commit = table_compaction_group_mapping + .iter() + .filter(|(table_id, _)| tables_to_commit.contains(table_id)); let mut txn = sql_store.conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( @@ -297,6 +300,8 @@ impl HummockManager { time_travel_delta, 
&group_parents, &versioning.last_time_travel_snapshot_sst_ids, + time_travel_tables_to_commit, + committed_epoch, ) .await?; commit_multi_var_with_provided_txn!( diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 61c1e820fab0c..0b6ef73e52605 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -36,7 +37,7 @@ use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; use sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; use sea_orm::{ - ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, + ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, TransactionTrait, }; @@ -101,6 +102,7 @@ impl HummockManager { .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), ) .order_by_desc(hummock_epoch_to_version::Column::Epoch) + .order_by_asc(hummock_epoch_to_version::Column::VersionId) .one(&txn) .await?; let Some(version_watermark) = version_watermark else { @@ -275,9 +277,19 @@ impl HummockManager { /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`. /// /// The resulted version is complete, i.e. with correct `SstableInfo`. 
- pub async fn epoch_to_version(&self, query_epoch: HummockEpoch) -> Result { + pub async fn epoch_to_version( + &self, + query_epoch: HummockEpoch, + table_id: u32, + ) -> Result { let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; let epoch_to_version = hummock_epoch_to_version::Entity::find() + .filter( + Condition::any() + .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id))) + // for backward compatibility + .add(hummock_epoch_to_version::Column::TableId.eq(0)), + ) .filter( hummock_epoch_to_version::Column::Epoch .lte(risingwave_meta_model_v2::Epoch::try_from(query_epoch).unwrap()), @@ -362,7 +374,19 @@ impl HummockManager { delta: HummockVersionDelta, group_parents: &HashMap, skip_sst_ids: &HashSet, + tables_to_commit: impl Iterator, + committed_epoch: u64, ) -> Result>> { + let select_groups = group_parents + .iter() + .filter_map(|(cg_id, _)| { + if should_ignore_group(find_root_group(*cg_id, group_parents)) { + None + } else { + Some(*cg_id) + } + }) + .collect::>(); async fn write_sstable_infos( sst_infos: impl Iterator, txn: &DatabaseTransaction, @@ -388,35 +412,23 @@ impl HummockManager { Ok(count) } - let epoch = delta.visible_table_committed_epoch(); - let version_id: u64 = delta.id.to_u64(); - let m = hummock_epoch_to_version::ActiveModel { - epoch: Set(epoch.try_into().unwrap()), - version_id: Set(version_id.try_into().unwrap()), - }; - hummock_epoch_to_version::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_epoch_to_version::Column::Epoch) - // The existing row must be inserted by the common committed epoch of created MVs. - // While any duplicate row must be inserted by MVs still in creation. - // So the row shouldn't be updated. 
- .do_nothing() - .to_owned(), - ) - .do_nothing() - .exec(txn) - .await?; + for (table_id, cg_id) in tables_to_commit { + if !select_groups.contains(cg_id) { + continue; + } + let version_id: u64 = delta.id.to_u64(); + let m = hummock_epoch_to_version::ActiveModel { + epoch: Set(committed_epoch.try_into().unwrap()), + table_id: Set(table_id.table_id.into()), + version_id: Set(version_id.try_into().unwrap()), + }; + // There should be no conflict rows. + hummock_epoch_to_version::Entity::insert(m) + .exec(txn) + .await?; + } + let mut version_sst_ids = None; - let select_groups = group_parents - .iter() - .filter_map(|(cg_id, _)| { - if should_ignore_group(find_root_group(*cg_id, group_parents)) { - None - } else { - Some(*cg_id) - } - }) - .collect::>(); if let Some(version) = version { version_sst_ids = Some( version diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index aa0ead3cef2aa..9a795608f7e1a 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -122,7 +122,7 @@ impl<'a> HummockVersionTransaction<'a> { is_visible_table_committed_epoch: bool, new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, commit_sstables: BTreeMap>, - new_table_ids: HashMap, + new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, ) -> HummockVersionDelta { @@ -175,7 +175,7 @@ impl<'a> HummockVersionTransaction<'a> { // update state table info new_version_delta.with_latest_version(|version, delta| { - for (table_id, cg_id) in &new_table_ids { + for (table_id, cg_id) in new_table_ids { assert!( !version.state_table_info.info().contains_key(table_id), "newly added table exists previously: {:?}", diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 1cdd8547c8247..499d9df0958c4 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ 
b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -348,7 +348,11 @@ impl HummockMetaClient for MockHummockMetaClient { )) } - async fn get_version_by_epoch(&self, _epoch: HummockEpoch) -> Result { + async fn get_version_by_epoch( + &self, + _epoch: HummockEpoch, + _table_id: u32, + ) -> Result { unimplemented!() } } diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index df42a0da3ff35..bb62875b3fae1 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -66,5 +66,9 @@ pub trait HummockMetaClient: Send + Sync + 'static { BoxStream<'static, CompactionEventItem>, )>; - async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result; + async fn get_version_by_epoch( + &self, + epoch: HummockEpoch, + table_id: u32, + ) -> Result; } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index db66e60c91eeb..41706b72becb4 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1432,8 +1432,12 @@ impl MetaClient { Ok(resp.ret) } - pub async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { - let req = GetVersionByEpochRequest { epoch }; + pub async fn get_version_by_epoch( + &self, + epoch: HummockEpoch, + table_id: u32, + ) -> Result { + let req = GetVersionByEpochRequest { epoch, table_id }; let resp = self.inner.get_version_by_epoch(req).await?; Ok(resp.version.unwrap()) } @@ -1607,8 +1611,12 @@ impl HummockMetaClient for MetaClient { Ok((request_sender, Box::pin(stream))) } - async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { - self.get_version_by_epoch(epoch).await + async fn get_version_by_epoch( + &self, + epoch: HummockEpoch, + table_id: u32, + ) -> Result { + self.get_version_by_epoch(epoch, table_id).await } } diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 4445a74884d5a..d123558acc50b 100644 --- 
a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -130,7 +130,11 @@ impl HummockMetaClient for MonitoredHummockMetaClient { self.meta_client.subscribe_compaction_event().await } - async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { - self.meta_client.get_version_by_epoch(epoch).await + async fn get_version_by_epoch( + &self, + epoch: HummockEpoch, + table_id: u32, + ) -> Result { + self.meta_client.get_version_by_epoch(epoch, table_id).await } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index b64752fca7fd6..82b98c5f4fb39 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -330,7 +330,7 @@ impl HummockStorage { let fetch = async { let pb_version = self .hummock_meta_client - .get_version_by_epoch(epoch) + .get_version_by_epoch(epoch, table_id.table_id()) .await .inspect_err(|e| tracing::error!("{}", e.to_report_string())) .map_err(|e| HummockError::meta_error(e.to_report_string()))?;