diff --git a/proto/backup_service.proto b/proto/backup_service.proto index 5c9f1364582f0..fbbbee2c4a6ee 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -47,7 +47,8 @@ message MetaSnapshotManifest { message MetaSnapshotMetadata { uint64 id = 1; uint64 hummock_version_id = 2; - uint64 max_committed_epoch = 3; + reserved 3; + reserved 'max_committed_epoch'; reserved 4; reserved 'safe_epoch'; optional uint32 format_version = 5; diff --git a/proto/hummock.proto b/proto/hummock.proto index f944bf2bf7d64..ef4e218c5fdc2 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -175,7 +175,7 @@ message HummockVersion { uint64 id = 1; // Levels of each compaction group map levels = 2; - uint64 max_committed_epoch = 3; + uint64 max_committed_epoch = 3 [deprecated = true]; reserved 4; reserved 'safe_epoch'; map table_watermarks = 5; @@ -191,7 +191,7 @@ message HummockVersionDelta { uint64 prev_id = 2; // Levels of each compaction group map group_deltas = 3; - uint64 max_committed_epoch = 4; + uint64 max_committed_epoch = 4 [deprecated = true]; reserved 5; reserved 'safe_epoch'; bool trivial_move = 6; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index a7f37bf505910..ef4852a665f72 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -103,7 +103,7 @@ workspace-hack = { path = "../workspace-hack" } assert_matches = "1" expect-test = "1.5" rand = { workspace = true } -risingwave_hummock_sdk = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["test"] } risingwave_test_runner = { workspace = true } [features] diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs index 550310e3d6b3f..44f4d24c71949 100644 --- a/src/meta/model_v2/src/hummock_version_delta.rs +++ b/src/meta/model_v2/src/hummock_version_delta.rs @@ -41,7 +41,6 @@ impl From for PbHummockVersionDelta { let ret = value.full_version_delta.to_protobuf(); assert_eq!(value.id, ret.id as i64); assert_eq!(value.prev_id, ret.prev_id as i64); - assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64); assert_eq!(value.trivial_move, ret.trivial_move); ret } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 06a390380abfb..d0e2ab020f488 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -81,6 +81,10 @@ impl InflightGraphInfo { } } + pub fn is_empty(&self) -> bool { + self.fragment_infos.is_empty() + } + /// Update worker nodes snapshot. We need to support incremental updates for it in the future. pub fn on_new_worker_node_map(&self, node_map: &HashMap) { for (node_id, actors) in &self.actor_map { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c1c14d2977bc6..87cb15fac4488 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -29,7 +29,6 @@ use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; -use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -597,7 +596,7 @@ impl GlobalBarrierManager { let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; let initial_invalid_state = BarrierManagerState::new( - TracedEpoch::new(Epoch(INVALID_EPOCH)), + None, InflightGraphInfo::default(), InflightSubscriptionInfo::default(), None, @@ -949,7 +948,14 @@ impl GlobalBarrierManager { } } - let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); + let Some((prev_epoch, curr_epoch)) = self.state.next_epoch_pair(&command) else { + // skip the command when there is nothing to do with the barrier + for mut notifier in notifiers { + notifier.notify_started(); + notifier.notify_collected(); + } + return Ok(()); + }; // Insert newly added creating job if let Command::CreateStreamingJob { @@ -1175,7 +1181,6 @@ impl GlobalBarrierManagerContext { change_log_delta: Default::default(), committed_epoch: epoch, tables_to_commit, - is_visible_table_committed_epoch: false, }; self.hummock_manager.commit_epoch(info).await?; Ok(()) @@ -1770,6 +1775,5 @@ fn collect_commit_epoch_info( change_log_delta: table_new_change_log, committed_epoch: epoch, tables_to_commit, - is_visible_table_committed_epoch: true, } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index efe164af77979..266e280ca48f4 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -326,16 +326,33 @@ impl GlobalBarrierManager { .context .hummock_manager .on_current_version(|version| { - let max_committed_epoch = version.max_committed_epoch_for_meta(); - for (table_id, info) in version.state_table_info.info() { - assert_eq!( - info.committed_epoch, max_committed_epoch, - "table {} with invisible epoch is not purged", - table_id + let state_table_info = version.state_table_info.info(); + let committed_epoch = state_table_info + .values() + .map(|info| info.committed_epoch) + .next(); + let existing_table_ids = info.existing_table_ids(); + for table_id in existing_table_ids { + assert!( + state_table_info.contains_key(&table_id), + "table id {table_id} not registered to hummock but in recovered job {:?}. hummock table info{:?}", + info.existing_table_ids().collect_vec(), + state_table_info ); } + if let Some(committed_epoch) = committed_epoch { + for (table_id, info) in version.state_table_info.info() { + assert_eq!( + info.committed_epoch, committed_epoch, + "table {} with invisible epoch is not purged", + table_id + ); + } + } ( - TracedEpoch::new(Epoch::from(max_committed_epoch)), + committed_epoch.map(|committed_epoch| { + TracedEpoch::new(Epoch::from(committed_epoch)) + }), version.id, ) }) @@ -388,30 +405,36 @@ impl GlobalBarrierManager { subscriptions_to_add: Default::default(), }); - // Use a different `curr_epoch` for each recovery attempt. - let new_epoch = prev_epoch.next(); - - let mut node_to_collect = control_stream_manager.inject_barrier( - None, - Some(mutation), - (&new_epoch, &prev_epoch), - &BarrierKind::Initial, - &info, - Some(&info), - Some(node_actors), - vec![], - vec![], - )?; - debug!(?node_to_collect, "inject initial barrier"); - while !node_to_collect.is_empty() { - let (worker_id, result) = control_stream_manager - .next_complete_barrier_response() - .await; - let resp = result?; - assert_eq!(resp.epoch, prev_epoch.value().0); - assert!(node_to_collect.remove(&worker_id)); - } - debug!("collected initial barrier"); + let new_epoch = if let Some(prev_epoch) = &prev_epoch { + // Use a different `curr_epoch` for each recovery attempt. + let new_epoch = prev_epoch.next(); + + let mut node_to_collect = control_stream_manager.inject_barrier( + None, + Some(mutation), + (&new_epoch, prev_epoch), + &BarrierKind::Initial, + &info, + Some(&info), + Some(node_actors), + vec![], + vec![], + )?; + debug!(?node_to_collect, "inject initial barrier"); + while !node_to_collect.is_empty() { + let (worker_id, result) = control_stream_manager + .next_complete_barrier_response() + .await; + let resp = result?; + assert_eq!(resp.epoch, prev_epoch.value().0); + assert!(node_to_collect.remove(&worker_id)); + } + debug!("collected initial barrier"); + Some(new_epoch) + } else { + assert!(info.is_empty()); + None + }; ( BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason), @@ -446,7 +469,7 @@ impl GlobalBarrierManager { CheckpointControl::new(self.context.clone(), create_mview_tracker).await; tracing::info!( - epoch = self.state.in_flight_prev_epoch().value().0, + epoch = self.state.in_flight_prev_epoch().map(|epoch| epoch.value().0), paused = ?self.state.paused_reason(), "recovery success" ); diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index fa2ead1b1df05..db2ded5629d7a 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::util::epoch::Epoch; use risingwave_pb::meta::PausedReason; use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; @@ -23,7 +24,7 @@ pub struct BarrierManagerState { /// /// There's no need to persist this field. On recovery, we will restore this from the latest /// committed snapshot in `HummockManager`. - in_flight_prev_epoch: TracedEpoch, + in_flight_prev_epoch: Option, /// Inflight running actors info. pub(crate) inflight_graph_info: InflightGraphInfo, @@ -36,7 +37,7 @@ pub struct BarrierManagerState { impl BarrierManagerState { pub fn new( - in_flight_prev_epoch: TracedEpoch, + in_flight_prev_epoch: Option, inflight_graph_info: InflightGraphInfo, inflight_subscription_info: InflightSubscriptionInfo, paused_reason: Option, @@ -60,16 +61,24 @@ impl BarrierManagerState { } } - pub fn in_flight_prev_epoch(&self) -> &TracedEpoch { - &self.in_flight_prev_epoch + pub fn in_flight_prev_epoch(&self) -> Option<&TracedEpoch> { + self.in_flight_prev_epoch.as_ref() } /// Returns the epoch pair for the next barrier, and updates the state. - pub fn next_epoch_pair(&mut self) -> (TracedEpoch, TracedEpoch) { - let prev_epoch = self.in_flight_prev_epoch.clone(); + pub fn next_epoch_pair(&mut self, command: &Command) -> Option<(TracedEpoch, TracedEpoch)> { + if self.inflight_graph_info.is_empty() + && !matches!(&command, Command::CreateStreamingJob { .. }) + { + return None; + }; + let in_flight_prev_epoch = self + .in_flight_prev_epoch + .get_or_insert_with(|| TracedEpoch::new(Epoch::now())); + let prev_epoch = in_flight_prev_epoch.clone(); let next_epoch = prev_epoch.next(); - self.in_flight_prev_epoch = next_epoch.clone(); - (prev_epoch, next_epoch) + *in_flight_prev_epoch = next_epoch.clone(); + Some((prev_epoch, next_epoch)) } /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 12419fd0993b1..8daea8e53928d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -65,7 +65,6 @@ pub struct CommitEpochInfo { pub change_log_delta: HashMap, pub committed_epoch: u64, pub tables_to_commit: HashSet, - pub is_visible_table_committed_epoch: bool, } impl HummockManager { @@ -79,7 +78,6 @@ impl HummockManager { change_log_delta, committed_epoch, tables_to_commit, - is_visible_table_committed_epoch, } = commit_info; let mut versioning_guard = self.versioning.write().await; let _timer = start_measure_real_process_timer!(self, "commit_epoch"); @@ -88,11 +86,12 @@ impl HummockManager { return Ok(()); } + assert!(!tables_to_commit.is_empty()); + let versioning: &mut Versioning = &mut versioning_guard; self.commit_epoch_sanity_check( committed_epoch, &tables_to_commit, - is_visible_table_committed_epoch, &sstables, &sst_to_context, &versioning.current_version, @@ -194,7 +193,6 @@ impl HummockManager { let time_travel_delta = version.pre_commit_epoch( committed_epoch, &tables_to_commit, - is_visible_table_committed_epoch, new_compaction_group, commit_sstables, &new_table_ids, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 68d903bd680dd..2bcf5b3c62fdc 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -221,7 +221,16 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); + + let committed_epoch = new_version_delta + .latest_version() + .state_table_info + .info() + .values() + .map(|info| info.committed_epoch) + .max() + .unwrap_or(INVALID_EPOCH); for (table_id, raw_group_id) in pairs { let mut group_id = *raw_group_id; @@ -265,7 +274,7 @@ impl HummockManager { .insert( TableId::new(*table_id), PbStateTableInfoDelta { - committed_epoch: INVALID_EPOCH, + committed_epoch, compaction_group_id: *raw_group_id, } ) @@ -293,7 +302,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let mut modified_groups: HashMap = HashMap::new(); // Remove member tables @@ -481,7 +490,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let new_sst_start_id = next_sstable_object_id( &self.env, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index f6d464590d02b..ce1cdfb364e75 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -167,7 +167,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let target_compaction_group_id = { // merge right_group_id to left_group_id and remove right_group_id diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index 7c0c27a07921a..052aad21ba096 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap HummockVersionTransaction<'a> { fn apply_compact_task(&mut self, compact_task: &CompactTask) { - let mut version_delta = self.new_delta(None); + let mut version_delta = self.new_delta(); let trivial_move = CompactStatus::is_trivial_move_task(compact_task); version_delta.trivial_move = trivial_move; diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 7d381e5f36fbf..cb4d56a1e79c6 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -190,7 +190,6 @@ impl HummockManager { &self, committed_epoch: HummockEpoch, tables_to_commit: &HashSet, - is_visible_table_committed_epoch: bool, sstables: &[LocalSstableInfo], sst_to_context: &HashMap, current_version: &HummockVersion, @@ -215,17 +214,6 @@ impl HummockManager { } } - if is_visible_table_committed_epoch - && committed_epoch <= current_version.max_committed_epoch_for_meta() - { - return Err(anyhow::anyhow!( - "Epoch {} <= max_committed_epoch {}", - committed_epoch, - current_version.max_committed_epoch_for_meta() - ) - .into()); - } - // sanity check on monotonically increasing table committed epoch for table_id in tables_to_commit { if let Some(info) = current_version.state_table_info.info().get(table_id) { diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 6fc6ba8110b0e..fa8cfc0c9cd56 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -22,7 +22,7 @@ use itertools::Itertools; use prometheus::Registry; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; +use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ssts; @@ -269,10 +269,6 @@ async fn test_hummock_transaction() { .await; // Get tables before committing epoch1. No tables should be returned. let current_version = hummock_manager.get_current_version().await; - assert_eq!( - current_version.max_committed_epoch_for_test(), - INVALID_EPOCH - ); let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); assert!(get_sorted_committed_object_ids(¤t_version, compaction_group_id).is_empty()); diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 74c2c45bff3ed..d935f12d6c660 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -43,9 +43,6 @@ fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) { } fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) { - metrics - .max_committed_epoch - .set(current_version.max_committed_epoch_for_meta() as i64); metrics .version_size .set(current_version.estimated_encode_len() as i64); @@ -97,13 +94,8 @@ impl<'a> HummockVersionTransaction<'a> { } } - pub(super) fn new_delta<'b>( - &'b mut self, - max_committed_epoch: Option, - ) -> SingleDeltaTransaction<'a, 'b> { - let delta = self - .latest_version() - .version_delta_after(max_committed_epoch); + pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> { + let delta = self.latest_version().version_delta_after(); SingleDeltaTransaction { version_txn: self, delta: Some(delta), @@ -123,19 +115,13 @@ impl<'a> HummockVersionTransaction<'a> { &mut self, committed_epoch: HummockEpoch, tables_to_commit: &HashSet, - is_visible_table_committed_epoch: bool, new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, ) -> HummockVersionDelta { - let new_max_committed_epoch = if is_visible_table_committed_epoch { - Some(committed_epoch) - } else { - None - }; - let mut new_version_delta = self.new_delta(new_max_committed_epoch); + let mut new_version_delta = self.new_delta(); new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index e396a0123b9b1..725d838711e9e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -255,7 +255,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); new_version_delta.with_latest_version(|version, delta| { version.may_fill_backward_compatible_state_table_info_delta(delta) }); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index bc87348c09308..ba54bc64969ad 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -162,6 +162,13 @@ impl HummockMetaClient for MockHummockMetaClient { .iter() .flat_map(|sstable| sstable.sst_info.table_ids.clone()) }) + .chain( + sync_result + .table_watermarks + .keys() + .map(|table_id| table_id.table_id), + ) + .chain(table_ids.iter().cloned()) .collect::>(); let new_table_fragment_info = if commit_table_ids @@ -216,7 +223,6 @@ impl HummockMetaClient for MockHummockMetaClient { .cloned() .map(TableId::from) .collect(), - is_visible_table_committed_epoch: true, }) .await .map_err(mock_err)?; diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 18d619c1e504f..37fa28f4fe260 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -53,7 +53,6 @@ pub struct MetaSnapshotMetadata { pub id: MetaSnapshotId, pub hummock_version_id: HummockVersionId, pub ssts: HashSet, - pub max_committed_epoch: u64, #[serde(default)] pub format_version: u32, pub remarks: Option, @@ -73,7 +72,6 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_object_ids(), - max_committed_epoch: v.max_committed_epoch_for_meta(), format_version, remarks, state_table_info: v @@ -116,7 +114,6 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { Self { id: m.id, hummock_version_id: m.hummock_version_id.to_u64(), - max_committed_epoch: m.max_committed_epoch, format_version: Some(m.format_version), remarks: m.remarks.clone(), state_table_info: m diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index b314d665c313d..a99dd765fd593 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -118,7 +118,6 @@ fn gen_version( let committed_epoch = test_epoch(new_epoch_idx as _); let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { id: new_epoch_idx as _, - max_committed_epoch: committed_epoch, ..Default::default() }); version.table_watermarks = (0..table_count) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 5ea75ec87897c..95f4b5e871220 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -544,9 +544,14 @@ impl HummockVersion { &version_delta.removed_table_ids, ); - if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch { - is_commit_epoch = true; - tracing::trace!("max committed epoch bumped but no table committed epoch is changed"); + #[expect(deprecated)] + { + if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch { + is_commit_epoch = true; + tracing::trace!( + "max committed epoch bumped but no table committed epoch is changed" + ); + } } // apply to `levels`, which is different compaction groups @@ -592,20 +597,12 @@ impl HummockVersion { ); self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) } - let max_committed_epoch = self.max_committed_epoch; let group_destroy = summary.group_destroy; let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { panic!("compaction group {} does not exist", compaction_group_id) }); - assert!( - max_committed_epoch <= version_delta.max_committed_epoch, - "new max commit epoch {} is older than the current max commit epoch {}", - version_delta.max_committed_epoch, - max_committed_epoch - ); if is_commit_epoch { - // `max_committed_epoch` increases. It must be a `commit_epoch` let GroupDeltasSummary { delete_sst_levels, delete_sst_ids_set, @@ -641,7 +638,7 @@ impl HummockVersion { } } } else { - // `max_committed_epoch` is not changed. The delta is caused by compaction. + // The delta is caused by compaction. levels.apply_compact_ssts( summary, self.state_table_info @@ -653,7 +650,10 @@ impl HummockVersion { } } self.id = version_delta.id; - self.max_committed_epoch = version_delta.max_committed_epoch; + #[expect(deprecated)] + { + self.max_committed_epoch = version_delta.max_committed_epoch; + } // apply to table watermark diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 254e151f7ec06..fd010e1c3e6ff 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -62,6 +62,7 @@ impl FrontendHummockVersion { } pub fn to_protobuf(&self) -> PbHummockVersion { + #[expect(deprecated)] PbHummockVersion { id: self.id.0, levels: Default::default(), @@ -174,6 +175,7 @@ impl FrontendHummockVersionDelta { } pub fn to_protobuf(&self) -> PbHummockVersionDelta { + #[expect(deprecated)] PbHummockVersionDelta { id: self.id.to_u64(), prev_id: self.prev_id.to_u64(), diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index e2d5a675f3e99..324e8a91cf4a3 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -726,8 +726,9 @@ mod tests { use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; - use risingwave_pb::hummock::PbHummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; + use crate::compaction_group::StaticCompactionGroupId; use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, @@ -1184,11 +1185,18 @@ mod tests { watermark3.clone(), ); + let test_table_id = TableId::from(233); + let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion { - max_committed_epoch: EPOCH1, + state_table_info: HashMap::from_iter([( + test_table_id.table_id, + StateTableInfo { + committed_epoch: EPOCH1, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + )]), ..Default::default() }); - let test_table_id = TableId::from(233); version.table_watermarks.insert( test_table_id, TableWatermarks { diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 7c6b368300329..3a58a7daa760c 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -91,6 +91,7 @@ fn refill_sstable_info( impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { fn from(p: (&HummockVersion, &HashSet)) -> Self { let (version, select_group) = p; + #[expect(deprecated)] Self { id: version.id, levels: version @@ -136,6 +137,7 @@ pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon)> for IncompleteHummockVersionDelta { fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { let (delta, select_group) = p; + #[expect(deprecated)] Self { id: delta.id, prev_id: delta.prev_id, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 46807a5138d09..6c7328bde6d57 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -219,6 +219,7 @@ impl HummockVersionStateTableInfo { pub struct HummockVersionCommon { pub id: HummockVersionId, pub levels: HashMap>, + #[deprecated] pub(crate) max_committed_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap>, @@ -277,6 +278,7 @@ where T: for<'a> From<&'a PbSstableInfo>, { fn from(pb_version: &PbHummockVersion) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version.id), levels: pb_version @@ -319,6 +321,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version: &HummockVersionCommon) -> Self { + #[expect(deprecated)] Self { id: version.id.0, levels: version @@ -348,6 +351,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version: HummockVersionCommon) -> Self { + #[expect(deprecated)] Self { id: version.id.0, levels: version @@ -413,13 +417,24 @@ impl HummockVersion { } } - pub fn max_committed_epoch_for_meta(&self) -> u64 { - self.max_committed_epoch - } - #[cfg(any(test, feature = "test"))] pub fn max_committed_epoch_for_test(&self) -> u64 { - self.max_committed_epoch + let committed_epoch = self + .state_table_info + .info() + .values() + .next() + .unwrap() + .committed_epoch; + assert!( + self.state_table_info + .info() + .values() + .all(|info| info.committed_epoch == committed_epoch), + "info: {:?}", + self.state_table_info.info() + ); + committed_epoch } pub fn table_committed_epoch(&self, table_id: TableId) -> Option { @@ -430,6 +445,7 @@ impl HummockVersion { } pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { + #[expect(deprecated)] let mut init_version = HummockVersion { id: FIRST_VERSION_ID, levels: Default::default(), @@ -450,19 +466,13 @@ impl HummockVersion { init_version } - pub fn version_delta_after(&self, max_committed_epoch: Option) -> HummockVersionDelta { - let max_committed_epoch = max_committed_epoch.unwrap_or(self.max_committed_epoch); - assert!( - max_committed_epoch >= self.max_committed_epoch, - "new max_committed_epoch {} less than prev max_committed_epoch: {}", - max_committed_epoch, - self.max_committed_epoch - ); + pub fn version_delta_after(&self) -> HummockVersionDelta { + #[expect(deprecated)] HummockVersionDelta { id: self.next_version_id(), prev_id: self.id, trivial_move: false, - max_committed_epoch, + max_committed_epoch: self.max_committed_epoch, group_deltas: Default::default(), new_table_watermarks: HashMap::new(), removed_table_ids: HashSet::new(), @@ -477,6 +487,7 @@ pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap>, + #[deprecated] pub(crate) max_committed_epoch: u64, pub trivial_move: bool, pub new_table_watermarks: HashMap, @@ -600,6 +611,7 @@ impl HummockVersionDelta { })) } + #[expect(deprecated)] pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch { self.max_committed_epoch } @@ -610,6 +622,7 @@ where T: for<'a> From<&'a PbSstableInfo>, { fn from(pb_version_delta: &PbHummockVersionDelta) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version_delta.id), prev_id: HummockVersionId(pb_version_delta.prev_id), @@ -665,6 +678,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version_delta: &HummockVersionDeltaCommon) -> Self { + #[expect(deprecated)] Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -704,6 +718,7 @@ where PbSstableInfo: From, { fn from(version_delta: HummockVersionDeltaCommon) -> Self { + #[expect(deprecated)] Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -743,6 +758,7 @@ where T: From, { fn from(pb_version_delta: PbHummockVersionDelta) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version_delta.id), prev_id: HummockVersionId(pb_version_delta.prev_id), diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 0e49b9de872e8..c1e1276025f36 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -335,11 +335,8 @@ async fn test_read_filter_basic() { assert_eq!(1, hummock_read_snapshot.0.len()); assert_eq!(0, hummock_read_snapshot.1.len()); assert_eq!( - read_version - .read() - .committed() - .max_committed_epoch_for_test(), - hummock_read_snapshot.2.max_committed_epoch_for_test() + read_version.read().committed().id, + hummock_read_snapshot.2.id, ); } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 9bd257c36b9ac..9e2de785fcdbf 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -25,7 +25,7 @@ use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; -use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, @@ -2420,7 +2420,7 @@ async fn test_table_watermark() { .get(&TEST_TABLE_ID) .unwrap() .clone(), - version.max_committed_epoch_for_test(), + epoch, ); assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); assert_eq!( @@ -2575,50 +2575,42 @@ async fn test_commit_multi_epoch() { HashMap::from_iter(object_ids.iter().map(|object_id| (*object_id, context_id))) }; let existing_table_id = TableId::new(1); - let initial_epoch = test_env - .manager - .get_current_version() - .await - .max_committed_epoch_for_test(); - - let commit_epoch = |epoch, - sst: SstableInfo, + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, sst: SstableInfo, new_table_fragment_info, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit.iter().cloned().collect(); + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst.object_id]), + sstables: vec![LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + }], new_table_fragment_info, - tables_to_commit: &[TableId], - is_visible_table_committed_epoch| { - let manager = &test_env.manager; - let tables_to_commit = tables_to_commit.iter().cloned().collect(); - async move { - manager - .commit_epoch(CommitEpochInfo { - new_table_watermarks: Default::default(), - sst_to_context: context_id_map(&[sst.object_id]), - sstables: vec![LocalSstableInfo { - table_stats: sst - .table_ids - .iter() - .map(|&table_id| { - ( - table_id, - TableStats { - total_compressed_size: 10, - ..Default::default() - }, - ) - }) - .collect(), - sst_info: sst, - }], - new_table_fragment_info, - change_log_delta: Default::default(), - committed_epoch: epoch, - tables_to_commit, - is_visible_table_committed_epoch, - }) - .await - .unwrap(); - } - }; + change_log_delta: Default::default(), + committed_epoch: epoch, + tables_to_commit, + }) + .await + .unwrap(); + } + }; let epoch1 = initial_epoch.next_epoch(); let sst1_epoch1 = SstableInfo { @@ -2638,7 +2630,6 @@ async fn test_commit_multi_epoch() { internal_table_ids: vec![existing_table_id], }, &[existing_table_id], - true, ) .await; @@ -2655,8 +2646,6 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch1); - let info = version .state_table_info .info() @@ -2687,7 +2676,6 @@ async fn test_commit_multi_epoch() { sst1_epoch2.clone(), NewTableFragmentInfo::None, &[existing_table_id], - true, ) .await; @@ -2708,8 +2696,6 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version .state_table_info .info() @@ -2740,7 +2726,6 @@ async fn test_commit_multi_epoch() { table_ids: HashSet::from_iter([new_table_id]), }, &[new_table_id], - false, ) .await; @@ -2759,8 +2744,6 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch1); assert_eq!(info.compaction_group_id, new_cg_id); @@ -2782,7 +2765,6 @@ async fn test_commit_multi_epoch() { sst2_epoch2.clone(), NewTableFragmentInfo::None, &[new_table_id], - false, ) .await; @@ -2801,8 +2783,6 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch2); assert_eq!(info.compaction_group_id, new_cg_id); @@ -2824,7 +2804,6 @@ async fn test_commit_multi_epoch() { sst_epoch3.clone(), NewTableFragmentInfo::None, &[existing_table_id, new_table_id], - true, ) .await; @@ -2865,8 +2844,6 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level3.table_infos.len(), 1); assert_eq!(sub_level3.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch3); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch3); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index ab38ccf33fb5c..3280af379fc2d 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::{test_epoch, EpochExt, MAX_EPOCH}; +use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH, MAX_EPOCH}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, @@ -423,10 +423,7 @@ async fn test_basic_v2() { async fn test_state_store_sync_v2() { let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; - let mut epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test() - .next_epoch(); + let mut epoch = INVALID_EPOCH.next_epoch(); // ingest 16B batch let mut batch1 = vec![ @@ -1040,9 +1037,7 @@ async fn test_reload_storage() { async fn test_delete_get_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); let batch1 = vec![ @@ -1132,9 +1127,7 @@ async fn test_delete_get_v2() { async fn test_multiple_epoch_sync_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); let batch1 = vec![ ( @@ -1310,9 +1303,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await; let table_id_set = HashSet::from_iter([local_hummock_storage.table_id()]); - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local_hummock_storage.init_for_test(epoch1).await.unwrap(); @@ -1421,8 +1412,7 @@ async fn test_replicated_local_hummock_storage() { }, change_log_delta: Default::default(), committed_epoch: epoch0, - tables_to_commit: Default::default(), - is_visible_table_committed_epoch: true, + tables_to_commit: HashSet::from_iter([TEST_TABLE_ID]), }) .await .unwrap(); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 15f4c74369fdf..7744b102761de 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -878,7 +878,7 @@ impl SyncedData { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::future::{poll_fn, Future}; use std::sync::Arc; use std::task::Poll; @@ -888,8 +888,9 @@ mod tests { use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; + use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::version::HummockVersion; - use risingwave_pb::hummock::PbHummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; use tokio::spawn; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; @@ -908,19 +909,17 @@ mod tests { #[tokio::test] async fn test_clear_shared_buffer() { - let epoch0 = test_epoch(233); let mut next_version_id = 1; - let mut make_new_version = |max_committed_epoch| { + let mut make_new_version = || { let id = next_version_id; next_version_id += 1; HummockVersion::from_rpc_protobuf(&PbHummockVersion { id, - max_committed_epoch, ..Default::default() }) }; - let initial_version = PinnedVersion::new(make_new_version(epoch0), unbounded_channel().0); + let initial_version = PinnedVersion::new(make_new_version(), unbounded_channel().0); let (version_update_tx, version_update_rx) = unbounded_channel(); let (refill_task_tx, mut refill_task_rx) = unbounded_channel(); @@ -961,8 +960,7 @@ mod tests { send_clear(initial_version.id()).await.unwrap(); // test normal refill finish - let epoch1 = epoch0 + 1; - let version1 = make_new_version(epoch1); + let version1 = make_new_version(); { version_update_tx .send(HummockVersionUpdate::PinnedVersion(Box::new( @@ -981,10 +979,8 @@ mod tests { } // test recovery with pending refill task - let epoch2 = epoch1 + 1; - let version2 = make_new_version(epoch2); - let epoch3 = epoch2 + 1; - let version3 = make_new_version(epoch3); + let version2 = make_new_version(); + let version3 = make_new_version(); { version_update_tx .send(HummockVersionUpdate::PinnedVersion(Box::new( @@ -1016,10 +1012,8 @@ mod tests { } // test recovery with later arriving version update - let epoch4 = epoch3 + 1; - let version4 = make_new_version(epoch4); - let epoch5 = epoch4 + 1; - let version5 = make_new_version(epoch5); + let version4 = make_new_version(); + let version5 = make_new_version(); { let mut rx = send_clear(version5.id); assert_pending(&mut rx).await; @@ -1046,7 +1040,13 @@ mod tests { let initial_version = PinnedVersion::new( HummockVersion::from_rpc_protobuf(&PbHummockVersion { id: 1, - max_committed_epoch: epoch0, + state_table_info: HashMap::from_iter([( + TEST_TABLE_ID.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + )]), ..Default::default() }), unbounded_channel().0, diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index 05bd5200b3486..2f711ede7cefc 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -91,7 +91,6 @@ impl HummockUploader { pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { id: epoch, - max_committed_epoch: epoch, ..Default::default() }); version.state_table_info.apply_delta(