From 84154f3e5eb82f99cbe941ee855cdb114f348094 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 10 Sep 2024 19:08:24 +0800 Subject: [PATCH] refactor(storage): cache recent versions and unify batch query and time travel --- src/storage/Cargo.toml | 2 +- src/storage/hummock_trace/src/opts.rs | 2 - .../event_handler/hummock_event_handler.rs | 41 ++- src/storage/src/hummock/local_version/mod.rs | 1 + .../hummock/local_version/recent_versions.rs | 283 ++++++++++++++++++ .../src/hummock/store/hummock_storage.rs | 109 ++++--- src/storage/src/hummock/utils.rs | 21 +- src/storage/src/store.rs | 3 - .../src/table/batch_table/storage_table.rs | 4 - 9 files changed, 362 insertions(+), 104 deletions(-) create mode 100644 src/storage/src/hummock/local_version/recent_versions.rs diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 2886c4e4e23f7..6a6bde4b146e0 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -96,7 +96,7 @@ workspace-hack = { path = "../workspace-hack" } bincode = "1" criterion = { workspace = true, features = ["async_futures", "async_tokio"] } expect-test = "1" -risingwave_hummock_sdk = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["test"] } risingwave_test_runner = { workspace = true } uuid = { version = "1", features = ["v4"] } diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 5d480cca96b58..13b4b49b2022b 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -109,7 +109,6 @@ pub struct TracedReadOptions { pub retention_seconds: Option, pub table_id: TracedTableId, pub read_version_from_backup: bool, - pub read_version_from_time_travel: bool, } impl TracedReadOptions { @@ -125,7 +124,6 @@ impl TracedReadOptions { retention_seconds: None, table_id: TracedTableId { table_id }, read_version_from_backup: false, - read_version_from_time_travel: false, } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f2aa2ea7fd88d..c1621f71a3f4a 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -50,6 +50,7 @@ use crate::hummock::event_handler::{ ReadOnlyRwLockRef, }; use crate::hummock::local_version::pinned_version::PinnedVersion; +use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::store::version::{ HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, }; @@ -197,7 +198,7 @@ pub struct HummockEventHandler { local_read_version_mapping: HashMap, version_update_notifier_tx: Arc>, - pinned_version: Arc>, + recent_versions: Arc>, write_conflict_detector: Option>, uploader: HummockUploader, @@ -355,7 +356,10 @@ impl HummockEventHandler { hummock_event_rx, version_update_rx, version_update_notifier_tx, - pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), + recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new( + pinned_version, + 60, + ))), write_conflict_detector, read_version_mapping, local_read_version_mapping: Default::default(), @@ -371,8 +375,8 @@ impl HummockEventHandler { self.version_update_notifier_tx.clone() } - pub fn pinned_version(&self) -> Arc> { - self.pinned_version.clone() + pub fn recent_versions(&self) -> Arc> { + self.recent_versions.clone() } pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping { @@ -629,8 +633,9 @@ impl HummockEventHandler { 
.metrics .event_handler_on_apply_version_update .start_timer(); - self.pinned_version - .store(Arc::new(new_pinned_version.clone())); + self.recent_versions.rcu(|prev_recent_versions| { + prev_recent_versions.with_new_version(new_pinned_version.clone()) + }); { self.for_each_read_version( @@ -663,7 +668,10 @@ impl HummockEventHandler { // TODO: should we change the logic when supporting partial ckpt? if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch( - self.pinned_version.load().visible_table_committed_epoch(), + self.recent_versions + .load() + .latest_version() + .visible_table_committed_epoch(), )); } @@ -789,13 +797,13 @@ impl HummockEventHandler { is_replicated, vnodes, } => { - let pinned_version = self.pinned_version.load(); + let pinned_version = self.recent_versions.load().latest_version().clone(); let instance_id = self.generate_instance_id(); let basic_read_version = Arc::new(RwLock::new( HummockReadVersion::new_with_replication_option( table_id, instance_id, - (**pinned_version).clone(), + pinned_version, is_replicated, vnodes, ), @@ -992,7 +1000,7 @@ mod tests { ); let event_tx = event_handler.event_sender(); - let latest_version = event_handler.pinned_version.clone(); + let latest_version = event_handler.recent_versions.clone(); let latest_version_update_tx = event_handler.version_update_notifier_tx.clone(); let send_clear = |version_id| { @@ -1018,12 +1026,15 @@ mod tests { let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap(); assert_eq!(old_version.version(), initial_version.version()); assert_eq!(new_version.version(), &version1); - assert_eq!(latest_version.load().version(), initial_version.version()); + assert_eq!( + latest_version.load().latest_version().version(), + initial_version.version() + ); let mut changed = latest_version_update_tx.subscribe(); refill_finish_tx.send(()).unwrap(); changed.changed().await.unwrap(); - assert_eq!(latest_version.load().version(), &version1); + assert_eq!(latest_version.load().latest_version().version(), &version1); } // test recovery with pending refill task @@ -1050,11 +1061,11 @@ mod tests { refill_task_rx.recv().await.unwrap(); assert_eq!(old_version3.version(), &version2); assert_eq!(new_version3.version(), &version3); - assert_eq!(latest_version.load().version(), &version1); + assert_eq!(latest_version.load().latest_version().version(), &version1); let rx = send_clear(version3.id); rx.await.unwrap(); - assert_eq!(latest_version.load().version(), &version3); + assert_eq!(latest_version.load().latest_version().version(), &version3); } async fn assert_pending(fut: &mut (impl Future + Unpin)) { @@ -1081,7 +1092,7 @@ mod tests { ))) .unwrap(); rx.await.unwrap(); - assert_eq!(latest_version.load().version(), &version5); + assert_eq!(latest_version.load().latest_version().version(), &version5); } } diff --git a/src/storage/src/hummock/local_version/mod.rs b/src/storage/src/hummock/local_version/mod.rs index 578e123c6574e..4a45c8dc9075c 100644 --- a/src/storage/src/hummock/local_version/mod.rs +++ b/src/storage/src/hummock/local_version/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. 
pub mod pinned_version; +pub mod recent_versions; diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs new file mode 100644 index 0000000000000..ff90bc0c9bbe2 --- /dev/null +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -0,0 +1,283 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; + +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::HummockEpoch; + +use crate::hummock::local_version::pinned_version::PinnedVersion; + +pub struct RecentVersions { + latest_version: PinnedVersion, + is_latest_committed: bool, + recent_versions: Vec, // earlier version at the front + max_version_num: usize, +} + +impl RecentVersions { + pub fn new(version: PinnedVersion, max_version_num: usize) -> Self { + assert!(max_version_num > 0); + Self { + latest_version: version, + is_latest_committed: true, // The first version is always treated as committed epochs + recent_versions: Vec::new(), + max_version_num, + } + } + + fn has_table_committed(&self, new_version: &PinnedVersion) -> bool { + let mut has_table_committed = false; + for (table_id, info) in new_version.version().state_table_info.info() { + if let Some(prev_info) = self + .latest_version + .version() + .state_table_info + .info() + .get(table_id) + { + match info.committed_epoch.cmp(&prev_info.committed_epoch) { + Ordering::Less => { + unreachable!( + "table {} has regress committed epoch {}, prev committed epoch {}", + table_id, info.committed_epoch, prev_info.committed_epoch + ); + } + Ordering::Equal => {} + Ordering::Greater => { + has_table_committed = true; + } + } + } else { + has_table_committed = true; + } + } + has_table_committed + } + + #[must_use] + pub fn with_new_version(&self, version: PinnedVersion) -> Self { + assert!(version.version().id > self.latest_version.version().id); + let is_committed = self.has_table_committed(&version); + let recent_versions = if self.is_latest_committed { + let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num { + assert_eq!(self.recent_versions.len(), self.max_version_num); + &self.recent_versions[1..] + } else { + &self.recent_versions[..] + }; + let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1); + recent_versions.extend(prev_recent_versions.iter().cloned()); + recent_versions.push(self.latest_version.clone()); + recent_versions + } else { + self.recent_versions.clone() + }; + Self { + latest_version: version, + is_latest_committed: is_committed, + recent_versions, + max_version_num: self.max_version_num, + } + } + + pub fn latest_version(&self) -> &PinnedVersion { + &self.latest_version + } + + /// Return the latest version that is safe to read `epoch` on `table_id`. 
+ /// + /// `safe to read` means that the `committed_epoch` of the `table_id` in the version won't be greater than the given `epoch` + pub fn get_safe_version( + &self, + table_id: TableId, + epoch: HummockEpoch, + ) -> Option { + let Some(info) = self + .latest_version + .version() + .state_table_info + .info() + .get(&table_id) + else { + return Some(self.latest_version.clone()); + }; + if info.committed_epoch <= epoch { + return Some(self.latest_version.clone()); + } + self.get_safe_version_from_recent(table_id, epoch) + } + + fn get_safe_version_from_recent( + &self, + table_id: TableId, + epoch: HummockEpoch, + ) -> Option { + let result = self.recent_versions.binary_search_by(|version| { + let committed_epoch = version + .version() + .state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch) + .unwrap_or(0); + committed_epoch.cmp(&epoch) + }); + match result { + Ok(index) => Some(self.recent_versions[index].clone()), + Err(index) => { + // `index` is index of the first version that has `committed_epoch` greater than `epoch` + // or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch` + if index >= self.recent_versions.len() { + assert_eq!(index, self.recent_versions.len()); + self.recent_versions.last().cloned() + } else if index == 0 { + None + } else { + self.recent_versions.get(index - 1).cloned() + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use risingwave_common::catalog::TableId; + use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; + use tokio::sync::mpsc::unbounded_channel; + + use crate::hummock::local_version::pinned_version::PinnedVersion; + use crate::hummock::local_version::recent_versions::RecentVersions; + + const TEST_TABLE_ID1: TableId = TableId::new(233); + const TEST_TABLE_ID2: TableId = TableId::new(234); + + fn gen_pin_version( + version_id: u64, + table_committed_epoch: impl IntoIterator, + ) -> PinnedVersion { + PinnedVersion::new( + HummockVersion::from_rpc_protobuf(&PbHummockVersion { + id: version_id, + state_table_info: HashMap::from_iter(table_committed_epoch.into_iter().map( + |(table_id, committed_epoch)| { + ( + table_id.table_id, + StateTableInfo { + committed_epoch, + safe_epoch: 0, + compaction_group_id: 0, + }, + ) + }, + )), + ..Default::default() + }), + unbounded_channel().0, + ) + } + + fn assert_query_equal( + recent_version: &RecentVersions, + expected: &[(TableId, u64, Option<&PinnedVersion>)], + ) { + for (table_id, epoch, expected_version) in expected { + let version = recent_version.get_safe_version(*table_id, *epoch); + assert_eq!( + version.as_ref().map(|version| version.id()), + expected_version.map(|version| version.id()) + ); + } + } + + #[test] + fn test_basic() { + let epoch1 = 233; + let epoch0 = epoch1 - 1; + let epoch2 = epoch1 + 1; + let epoch3 = epoch2 + 1; + let epoch4 = epoch3 + 1; + let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]); + // with at most 2 historical versions + let recent_versions = RecentVersions::new(version1.clone(), 2); + assert!(recent_versions.recent_versions.is_empty()); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version1)), + ], + ); + + let recent_versions = + recent_versions.with_new_version(gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)])); + 
assert_eq!(recent_versions.recent_versions.len(), 1); + assert!(!recent_versions.is_latest_committed); + + let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]); + let recent_versions = recent_versions.with_new_version(version3.clone()); + assert_eq!(recent_versions.recent_versions.len(), 1); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version3)), + (TEST_TABLE_ID1, epoch3, Some(&version3)), + ], + ); + + let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]); + let recent_versions = recent_versions.with_new_version(version4.clone()); + assert_eq!(recent_versions.recent_versions.len(), 2); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version4)), + (TEST_TABLE_ID1, epoch3, Some(&version4)), + (TEST_TABLE_ID2, epoch0, Some(&version3)), + (TEST_TABLE_ID2, epoch1, Some(&version4)), + (TEST_TABLE_ID2, epoch2, Some(&version4)), + ], + ); + + let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]); + let recent_versions = recent_versions.with_new_version(version5.clone()); + assert_eq!(recent_versions.recent_versions.len(), 2); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, None), + (TEST_TABLE_ID1, epoch2, Some(&version4)), + (TEST_TABLE_ID1, epoch3, Some(&version5)), + (TEST_TABLE_ID1, epoch4, Some(&version5)), + (TEST_TABLE_ID2, epoch0, Some(&version3)), + (TEST_TABLE_ID2, epoch1, Some(&version5)), + (TEST_TABLE_ID2, epoch2, Some(&version5)), + ], + ); + } +} diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 82b98c5f4fb39..bd321197be45d 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; use std::future::Future; -use std::ops::{Bound, Deref}; +use std::ops::Bound; use std::sync::Arc; use arc_swap::ArcSwap; @@ -50,9 +50,10 @@ use crate::hummock::event_handler::{ }; use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; +use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache; -use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch}; +use crate::hummock::utils::wait_for_epoch; use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef}; use crate::hummock::{ HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator, @@ -97,7 +98,7 @@ pub struct HummockStorage { version_update_notifier_tx: Arc>, - pinned_version: Arc>, + recent_versions: Arc>, hummock_version_reader: HummockVersionReader, @@ -223,7 +224,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), hummock_event_sender: event_tx.clone(), _version_update_sender: version_update_tx, - pinned_version: hummock_event_handler.pinned_version(), + recent_versions: hummock_event_handler.recent_versions(), 
hummock_version_reader: HummockVersionReader::new( sstable_store, state_store_metrics.clone(), @@ -260,14 +261,12 @@ impl HummockStorage { ) -> StorageResult> { let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone())); - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? + self.build_read_version_tuple(epoch, read_options.table_id, key_range) + .await? }; if is_empty_key_range(&key_range) { @@ -285,14 +284,12 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? + self.build_read_version_tuple(epoch, read_options.table_id, key_range) + .await? }; self.hummock_version_reader @@ -306,14 +303,12 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? + self.build_read_version_tuple(epoch, read_options.table_id, key_range) + .await? 
}; self.hummock_version_reader @@ -321,12 +316,11 @@ impl HummockStorage { .await } - async fn build_read_version_by_time_travel( + async fn get_time_travel_version( &self, epoch: u64, table_id: TableId, - key_range: TableKeyRange, - ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + ) -> StorageResult { let fetch = async { let pb_version = self .hummock_meta_client @@ -335,7 +329,6 @@ impl HummockStorage { .inspect_err(|e| tracing::error!("{}", e.to_report_string())) .map_err(|e| HummockError::meta_error(e.to_report_string()))?; let version = HummockVersion::from_rpc_protobuf(&pb_version); - validate_safe_epoch(&version, table_id, epoch)?; let (tx, _rx) = unbounded_channel(); Ok(PinnedVersion::new(version, tx)) }; @@ -343,9 +336,7 @@ impl HummockStorage { .simple_time_travel_version_cache .get_or_insert(epoch, fetch) .await?; - Ok(get_committed_read_version_tuple( - version, table_id, key_range, epoch, - )) + Ok(version) } async fn build_read_version_tuple_from_backup( @@ -359,16 +350,12 @@ impl HummockStorage { .try_get_hummock_version(table_id, epoch) .await { - Ok(Some(backup_version)) => { - validate_safe_epoch(backup_version.version(), table_id, epoch)?; - - Ok(get_committed_read_version_tuple( - backup_version, - table_id, - key_range, - epoch, - )) - } + Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple( + backup_version, + table_id, + key_range, + epoch, + )), Ok(None) => Err(HummockError::read_backup_error(format!( "backup include epoch {} not found", epoch @@ -378,27 +365,34 @@ impl HummockStorage { } } - fn build_read_version_tuple( + async fn build_read_version_tuple( &self, epoch: u64, table_id: TableId, key_range: TableKeyRange, ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { - let pinned_version = self.pinned_version.load(); - validate_safe_epoch(pinned_version.version(), table_id, epoch)?; - let table_committed_epoch = pinned_version + let recent_versions = self.recent_versions.load(); + let pinned_version = recent_versions.latest_version().clone(); + let state_table_info = pinned_version .version() .state_table_info .info() - .get(&table_id) - .map(|info| info.committed_epoch); + .get(&table_id); // check epoch if lower mce - let ret = if let Some(table_committed_epoch) = table_committed_epoch - && epoch <= table_committed_epoch + let ret = if let Some(info) = state_table_info + && epoch <= info.committed_epoch { + let version = if epoch >= info.safe_epoch { + recent_versions.latest_version().clone() + } else if let Some(version) = recent_versions.get_safe_version(table_id, epoch) { + version + } else { + drop(recent_versions); + self.get_time_travel_version(epoch, table_id).await? 
+ }; // read committed_version directly without build snapshot - get_committed_read_version_tuple((**pinned_version).clone(), table_id, key_range, epoch) + get_committed_read_version_tuple(version, table_id, key_range, epoch) } else { let vnode = vnode(&key_range); let mut matched_replicated_read_version_cnt = 0; @@ -438,7 +432,7 @@ impl HummockStorage { vnode.to_index(), epoch, matched_replicated_read_version_cnt, - table_committed_epoch, + state_table_info, ); } else { tracing::debug!( @@ -446,15 +440,10 @@ impl HummockStorage { table_id, vnode.to_index(), epoch, - table_committed_epoch + state_table_info ); } - get_committed_read_version_tuple( - (**pinned_version).clone(), - table_id, - key_range, - epoch, - ) + get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch) } else { if read_version_vec.len() != 1 { let read_version_vnodes = read_version_vec @@ -537,8 +526,9 @@ impl HummockStorage { self.buffer_tracker.get_memory_limiter().clone() } + #[cfg(any(test, feature = "test"))] pub fn get_pinned_version(&self) -> PinnedVersion { - self.pinned_version.load().deref().deref().clone() + self.recent_versions.load().latest_version().clone() } pub fn backup_reader(&self) -> BackupReaderRef { @@ -604,7 +594,7 @@ impl StateStoreRead for HummockStorage { key_range: TableKeyRange, options: ReadLogOptions, ) -> StorageResult { - let version = (**self.pinned_version.load()).clone(); + let version = self.recent_versions.load().latest_version().clone(); let iter = self .hummock_version_reader .iter_log(version, epoch_range, key_range, options) @@ -655,8 +645,9 @@ impl HummockStorage { epoch: u64, ) -> StorageResult { let table_ids = self - .pinned_version + .recent_versions .load() + .latest_version() .version() .state_table_info .info() @@ -675,7 +666,7 @@ impl HummockStorage { .send(HummockVersionUpdate::PinnedVersion(Box::new(version))) .unwrap(); loop { - if self.pinned_version.load().id() >= version_id { + if self.recent_versions.load().latest_version().id() >= version_id { break; } @@ -686,7 +677,7 @@ impl HummockStorage { pub async fn wait_version(&self, version: HummockVersion) { use tokio::task::yield_now; loop { - if self.pinned_version.load().id() >= version.id { + if self.recent_versions.load().latest_version().id() >= version.id { break; } @@ -736,7 +727,7 @@ impl HummockStorage { pub async fn wait_version_update(&self, old_id: HummockVersionId) -> HummockVersionId { use tokio::task::yield_now; loop { - let cur_id = self.pinned_version.load().id(); + let cur_id = self.recent_versions.load().latest_version().id(); if cur_id > old_id { return cur_id; } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 3f2d1f989f529..c2f6cbafed79b 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -30,11 +30,10 @@ use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use super::{HummockError, HummockResult, SstableStoreRef}; +use super::{HummockError, SstableStoreRef}; use crate::error::StorageResult; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; @@ -72,24 +71,6 @@ where !too_left && !too_right } -pub fn validate_safe_epoch( - version: &HummockVersion, - table_id: TableId, - epoch: u64, -) -> HummockResult<()> { - 
if let Some(info) = version.state_table_info.info().get(&table_id) - && epoch < info.safe_epoch - { - return Err(HummockError::expired_epoch( - table_id, - info.safe_epoch, - epoch, - )); - } - - Ok(()) -} - pub fn filter_single_sst(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool where R: RangeBounds>, diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 91f79231f6939..480b24d5defb9 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -502,7 +502,6 @@ pub struct ReadOptions { /// Read from historical hummock version of meta snapshot backup. /// It should only be used by `StorageTable` for batch query. pub read_version_from_backup: bool, - pub read_version_from_time_travel: bool, } impl From for ReadOptions { @@ -515,7 +514,6 @@ impl From for ReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, - read_version_from_time_travel: value.read_version_from_time_travel, } } } @@ -530,7 +528,6 @@ impl From for TracedReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, - read_version_from_time_travel: value.read_version_from_time_travel, } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 7a0ad76cce4a5..52a7655288c13 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -361,7 +361,6 @@ impl StorageTableInner { ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode( &pk, @@ -382,7 +381,6 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - read_version_from_time_travel: read_time_travel, cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -487,14 +485,12 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); async move { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - read_version_from_time_travel: read_time_travel, prefetch_options, cache_policy, ..Default::default()
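
Illustrative sketch (not part of the patch): the heart of the new `RecentVersions::get_safe_version` path is a binary search over the cached versions' per-table committed epochs, falling back to the time-travel query only when even the earliest cached version is too new for the requested epoch. The stand-alone Rust below re-implements that lookup against stand-in types; `CachedVersion`, `RecentVersionsSketch`, and the plain `HashMap` of committed epochs are assumptions made for illustration and replace `PinnedVersion` and `state_table_info` from the real code.

```rust
use std::collections::HashMap;

struct CachedVersion {
    id: u64,
    /// table_id -> committed epoch of that table in this version.
    committed: HashMap<u32, u64>,
}

struct RecentVersionsSketch {
    latest: CachedVersion,
    /// Older versions, earliest first (same ordering as in the patch).
    recent: Vec<CachedVersion>,
}

impl RecentVersionsSketch {
    /// Return a version whose committed epoch for `table_id` does not exceed `epoch`,
    /// or `None` when even the earliest cached version is too new and the caller
    /// must fall back to a time-travel query.
    fn get_safe_version(&self, table_id: u32, epoch: u64) -> Option<&CachedVersion> {
        let committed = |v: &CachedVersion| v.committed.get(&table_id).copied().unwrap_or(0);
        if committed(&self.latest) <= epoch {
            return Some(&self.latest);
        }
        // Committed epochs are non-decreasing across cached versions, so a binary
        // search locates the newest version still readable at `epoch`.
        match self.recent.binary_search_by(|v| committed(v).cmp(&epoch)) {
            Ok(index) => Some(&self.recent[index]),
            Err(index) => {
                // `index` is the first position whose committed epoch exceeds `epoch`.
                if index == self.recent.len() {
                    self.recent.last()
                } else if index == 0 {
                    None
                } else {
                    Some(&self.recent[index - 1])
                }
            }
        }
    }
}

fn main() {
    let version = |id, epoch| CachedVersion {
        id,
        committed: HashMap::from([(1u32, epoch)]),
    };
    let cache = RecentVersionsSketch {
        latest: version(3, 30),
        recent: vec![version(1, 10), version(2, 20)],
    };
    assert_eq!(cache.get_safe_version(1, 30).map(|v| v.id), Some(3));
    assert_eq!(cache.get_safe_version(1, 25).map(|v| v.id), Some(2));
    assert_eq!(cache.get_safe_version(1, 15).map(|v| v.id), Some(1));
    assert!(cache.get_safe_version(1, 5).is_none()); // would fall back to time travel
}
```

In the patch itself the cached entries are `PinnedVersion` handles (which keep the version pinned) rather than plain structs, the cache is bounded by `max_version_num` (60 in the event handler), and the `None` case is what routes `build_read_version_tuple` into `get_time_travel_version`.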