diff --git a/proto/hummock.proto b/proto/hummock.proto
index ed08064d1f0f..a746d1675e83 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -535,7 +535,7 @@ message VacuumTask {
 // Scan object store to get candidate orphan SSTs.
 message FullScanTask {
-  uint64 sst_retention_time_sec = 1;
+  uint64 sst_retention_watermark = 1;
   optional string prefix = 2;
   optional string start_after = 3;
   optional uint64 limit = 4;
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index ab56c9f7e405..45703554c236 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -32,13 +32,14 @@ message BarrierCompleteResponse {
   string request_id = 1;
   common.Status status = 2;
   repeated CreateMviewProgress create_mview_progress = 3;
-  message GroupedSstableInfo {
+  message LocalSstableInfo {
     reserved 1;
    reserved "compaction_group_id";
     hummock.SstableInfo sst = 2;
     map<uint32, hummock.TableStats> table_stats_map = 3;
+    uint64 created_at = 4;
   }
-  repeated GroupedSstableInfo synced_sstables = 4;
+  repeated LocalSstableInfo synced_sstables = 4;
   uint32 worker_id = 5;
   map<uint32, hummock.TableWatermarks> table_watermarks = 6;
   repeated hummock.SstableInfo old_value_sstables = 7;
diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model_v2/src/hummock_sequence.rs
index 58156c33266f..04f8d6dc5540 100644
--- a/src/meta/model_v2/src/hummock_sequence.rs
+++ b/src/meta/model_v2/src/hummock_sequence.rs
@@ -18,6 +18,8 @@ pub const COMPACTION_TASK_ID: &str = "compaction_task";
 pub const COMPACTION_GROUP_ID: &str = "compaction_group";
 pub const SSTABLE_OBJECT_ID: &str = "sstable_object";
 pub const META_BACKUP_ID: &str = "meta_backup";
+/// The read & write of `now` differs from the other sequences. It merely reuses the hummock_sequence table.
+pub const HUMMOCK_NOW: &str = "now";
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)]
 #[sea_orm(table_name = "hummock_sequence")]
diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs
index 6373c027f15d..a11d77d5408a 100644
--- a/src/meta/service/src/hummock_service.rs
+++ b/src/meta/service/src/hummock_service.rs
@@ -283,7 +283,8 @@ impl HummockManagerService for HummockServiceImpl {
     ) -> Result<Response<TriggerFullGcResponse>, Status> {
         let req = request.into_inner();
         self.hummock_manager
-            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)?;
+            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
+            .await?;
         Ok(Response::new(TriggerFullGcResponse { status: None }))
     }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index c1c14d2977bc..bdfb5a30ad53 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1669,6 +1669,7 @@ fn collect_resp_info(
             LocalSstableInfo::new(
                 sst_info.into(),
                 from_prost_table_stats_map(grouped.table_stats_map),
+                grouped.created_at,
             )
         });
         synced_ssts.extend(ssts_iter);
diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs
index f05706402e7a..c4e35ddb921e 100644
--- a/src/meta/src/hummock/manager/compaction/mod.rs
+++ b/src/meta/src/hummock/manager/compaction/mod.rs
@@ -1129,6 +1129,7 @@ impl HummockManager {
     /// or the task is not owned by `context_id` when `context_id` is not None.
     pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
+        // TODO: add sanity check for serverless compaction
         let mut guard = self.compaction.write().await;
         let deterministic_mode = self.env.opts.compaction_deterministic_test;
         let compaction: &mut Compaction = &mut guard;
diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs
index 7d381e5f36fb..230cb29608bc 100644
--- a/src/meta/src/hummock/manager/context.rs
+++ b/src/meta/src/hummock/manager/context.rs
@@ -241,6 +241,19 @@ impl HummockManager {
             }
         }
+        // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
+        if !sstables.is_empty() {
+            // sanity check to ensure SSTs to commit have not been full GCed yet.
+            let now = self.now().await?;
+            let sst_retention_watermark =
+                now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
+            for sst in sstables {
+                if sst.created_at < sst_retention_watermark {
+                    return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+                }
+            }
+        }
+
         async {
             if !self.env.opts.enable_committed_sst_sanity_check {
                 return;
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index 06b211f9f127..d6a38c766477 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -16,17 +16,21 @@ use std::cmp;
 use std::collections::HashSet;
 use std::ops::Bound::{Excluded, Included};
 use std::ops::DerefMut;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 
 use anyhow::Context;
 use futures::{stream, StreamExt};
 use itertools::Itertools;
 use parking_lot::Mutex;
 use risingwave_hummock_sdk::HummockSstableObjectId;
+use risingwave_meta_model_migration::OnConflict;
+use risingwave_meta_model_v2::hummock_sequence;
+use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW;
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::FullScanTask;
+use sea_orm::{ActiveValue, EntityTrait};
 
 use crate::hummock::error::{Error, Result};
 use crate::hummock::manager::commit_multi_var;
@@ -177,13 +181,13 @@
     /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`.
     ///
     /// Returns Ok(false) if there is no worker available.
-    pub fn start_full_gc(
+    pub async fn start_full_gc(
         &self,
         sst_retention_time: Duration,
         prefix: Option<String>,
     ) -> Result<bool> {
         self.metrics.full_gc_trigger_count.inc();
-        // Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
+        // Set a minimum sst_retention_time.
         let sst_retention_time = cmp::max(
             sst_retention_time,
             Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
         );
@@ -205,9 +209,13 @@
             }
             Some(compactor) => compactor,
         };
+        let sst_retention_watermark = self
+            .now()
+            .await?
+            .saturating_sub(sst_retention_time.as_secs());
         compactor
             .send_event(ResponseEvent::FullScanTask(FullScanTask {
-                sst_retention_time_sec: sst_retention_time.as_secs(),
+                sst_retention_watermark,
                 prefix,
                 start_after,
                 limit,
@@ -265,6 +273,50 @@ impl HummockManager {
         tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
         Ok(selected_object_number)
     }
+
+    pub async fn now(&self) -> Result<u64> {
+        let mut guard = self.now.lock().await;
+        let new_now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("Clock may have gone backwards")
+            .as_secs();
+        if new_now < *guard {
+            return Err(anyhow::anyhow!(format!(
+                "unexpected decreasing now, old={}, new={}",
+                *guard, new_now
+            ))
+            .into());
+        }
+        *guard = new_now;
+        drop(guard);
+        // Persist now so that it stays non-decreasing even after a meta node reboot.
+        if let Some(sql) = self.sql_store() {
+            let m = hummock_sequence::ActiveModel {
+                name: ActiveValue::Set(HUMMOCK_NOW.into()),
+                seq: ActiveValue::Set(new_now.try_into().unwrap()),
+            };
+            hummock_sequence::Entity::insert(m)
+                .on_conflict(
+                    OnConflict::column(hummock_sequence::Column::Name)
+                        .update_column(hummock_sequence::Column::Seq)
+                        .to_owned(),
+                )
+                .exec(&sql.conn)
+                .await?;
+        }
+        Ok(new_now)
+    }
+
+    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
+        let Some(sql) = self.sql_store() else {
+            return Ok(None);
+        };
+        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_string())
+            .one(&sql.conn)
+            .await?
+            .map(|m| m.seq.try_into().unwrap());
+        Ok(now)
+    }
 }
 
 pub struct FullGcState {
@@ -399,6 +451,7 @@ mod tests {
             Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,),
             None
         )
+        .await
         .unwrap());
 
     let mut receiver = compactor_manager.add_compactor(context_id);
@@ -408,36 +461,28 @@
            Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1),
            None
        )
+        .await
        .unwrap());
-    let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+    let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
         ResponseEvent::FullScanTask(task) => task,
         _ => {
             panic!()
         }
     };
-    // min_sst_retention_time_sec override user provided value.
-    assert_eq!(
-        hummock_manager.env.opts.min_sst_retention_time_sec,
-        full_scan_task.sst_retention_time_sec
-    );
     assert!(hummock_manager
         .start_full_gc(
             Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
             None
         )
+        .await
         .unwrap());
-    let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+    let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
         ResponseEvent::FullScanTask(task) => task,
         _ => {
             panic!()
         }
     };
-    // min_sst_retention_time_sec doesn't override user provided value.
-    assert_eq!(
-        hummock_manager.env.opts.min_sst_retention_time_sec + 1,
-        full_scan_task.sst_retention_time_sec
-    );
 
     // Empty input results immediate return, without waiting heartbeat.
     hummock_manager
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 8a87565a21b8..d984f9238cd7 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -36,6 +36,7 @@ use risingwave_pb::hummock::{
 };
 use risingwave_pb::meta::subscribe_response::Operation;
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::Mutex;
 use tonic::Streaming;
 
 use crate::hummock::compaction::CompactStatus;
@@ -109,6 +110,7 @@ pub struct HummockManager {
     // and suggest types with a certain priority.
     pub compaction_state: CompactionState,
     full_gc_state: FullGcState,
+    now: Mutex<u64>,
 }
 
 pub type HummockManagerRef = Arc<HummockManager>;
@@ -279,6 +281,7 @@ impl HummockManager {
             compactor_streams_change_tx,
             compaction_state: CompactionState::new(),
             full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
+            now: Mutex::new(0),
         };
         let instance = Arc::new(instance);
         instance.init_time_travel_state().await?;
@@ -296,6 +299,9 @@ impl HummockManager {
     /// Load state from meta store.
     async fn load_meta_store_state(&self) -> Result<()> {
+        let now = self.load_now().await?;
+        *self.now.lock().await = now.unwrap_or(0);
+
         let mut compaction_guard = self.compaction.write().await;
         let mut versioning_guard = self.versioning.write().await;
         let mut context_info_guard = self.context_info.write().await;
diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs
index f816400368b6..82b5dda90dd6 100644
--- a/src/meta/src/hummock/manager/tests.rs
+++ b/src/meta/src/hummock/manager/tests.rs
@@ -89,6 +89,7 @@ fn gen_local_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> Local
     LocalSstableInfo {
         sst_info: gen_sstable_info(sst_id, table_ids, epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     }
 }
 
 fn get_compaction_group_object_ids(
@@ -795,6 +796,31 @@ async fn test_invalid_sst_id() {
         );
     }
 
+    // reject because the SST's timestamp is below the watermark
+    let ssts_below_watermark = ssts
+        .iter()
+        .map(|s| LocalSstableInfo {
+            sst_info: s.sst_info.clone(),
+            table_stats: s.table_stats.clone(),
+            created_at: 0,
+        })
+        .collect();
+    let error = hummock_meta_client
+        .commit_epoch(
+            epoch,
+            SyncResult {
+                uncommitted_ssts: ssts_below_watermark,
+                ..Default::default()
+            },
+            false,
+        )
+        .await
+        .unwrap_err();
+    assert!(error
+        .as_report()
+        .to_string()
+        .contains("is rejected from being committed since it's below watermark"));
+
     hummock_meta_client
         .commit_epoch(
             epoch,
@@ -1276,6 +1302,7 @@ async fn test_version_stats() {
                 .iter()
                 .map(|table_id| (*table_id, table_stats_change.clone()))
                 .collect(),
+            created_at: u64::MAX,
         })
         .collect_vec();
     hummock_meta_client
@@ -1385,6 +1412,7 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_commit() {
                 },
             ),
         ]),
+        created_at: u64::MAX,
     };
     hummock_meta_client
         .commit_epoch(
@@ -1466,10 +1494,12 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_basic()
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![100, 101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     hummock_meta_client
         .commit_epoch(
@@ -1550,6 +1580,7 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_non_triv
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100, 101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     hummock_manager
         .register_table_ids_for_test(&[(100, 2), (101, 2)])
         .await
@@ -1631,11 +1662,13 @@ async fn test_move_state_tables_to_dedicated_compaction_group_trivial_expired()
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![100, 101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let mut sst_3 = sst_2.clone();
     let mut sst_4 = sst_1.clone();
@@ -1767,16 +1800,19 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_bottom_l
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100], epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![101, 102], epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     let sst_3 = LocalSstableInfo {
         sst_info: gen_sstable_info(12, vec![103], epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     hummock_meta_client
@@ -1907,10 +1943,12 @@ async fn test_compaction_task_expiration_due_to_split_group() {
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100, 101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     hummock_meta_client
@@ -2245,10 +2283,12 @@ async fn test_unregister_moved_table() {
     let sst_1 = LocalSstableInfo {
         sst_info: gen_sstable_info(10, vec![100], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![100, 101], test_epoch(20)),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     hummock_meta_client
diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs
index eb0b2655d004..0cefe157d849 100644
--- a/src/meta/src/hummock/manager/timer_task.rs
+++ b/src/meta/src/hummock/manager/timer_task.rs
@@ -344,6 +344,7 @@ impl HummockManager {
                             hummock_manager.env.opts.min_sst_retention_time_sec;
                         if hummock_manager
                             .start_full_gc(Duration::from_secs(retention_sec), None)
+                            .await
                             .is_ok()
                         {
                             tracing::info!("Start full GC from meta node.");
                         }
diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs
index 9e35e1e31a66..db61841b31c3 100644
--- a/src/storage/hummock_sdk/src/lib.rs
+++ b/src/storage/hummock_sdk/src/lib.rs
@@ -177,13 +177,15 @@ pub struct SyncResult {
 pub struct LocalSstableInfo {
     pub sst_info: SstableInfo,
     pub table_stats: TableStatsMap,
+    pub created_at: u64,
 }
 
 impl LocalSstableInfo {
-    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap) -> Self {
+    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
         Self {
             sst_info,
             table_stats,
+            created_at,
         }
     }
 
@@ -191,6 +193,7 @@ impl LocalSstableInfo {
         Self {
             sst_info,
             table_stats: Default::default(),
+            created_at: u64::MAX,
         }
     }
diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs
index 9bd257c36b9a..0e45aeb7e833 100644
--- a/src/storage/hummock_test/src/hummock_storage_tests.rs
+++ b/src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -2608,6 +2608,7 @@ async fn test_commit_multi_epoch() {
                     })
                     .collect(),
                 sst_info: sst,
+                created_at: u64::MAX,
             }],
             new_table_fragment_info,
             change_log_delta: Default::default(),
diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs
index e68a2de4b00a..1de752339841 100644
--- a/src/storage/hummock_test/src/vacuum_tests.rs
+++ b/src/storage/hummock_test/src/vacuum_tests.rs
@@ -91,7 +91,7 @@ async fn test_full_scan() {
     let object_metadata_iter =
         Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok)));
     let task = FullScanTask {
-        sst_retention_time_sec: 10000,
+        sst_retention_watermark: 0,
         prefix: None,
         start_after: None,
         limit: None,
@@ -102,7 +102,7 @@ async fn test_full_scan() {
     assert!(scan_result.is_empty());
 
     let task = FullScanTask {
-        sst_retention_time_sec: 6000,
+        sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(),
         prefix: None,
         start_after: None,
         limit: None,
@@ -113,7 +113,7 @@ async fn test_full_scan() {
     assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]);
 
     let task = FullScanTask {
-        sst_retention_time_sec: 2000,
+        sst_retention_watermark: u64::MAX,
         prefix: None,
         start_after: None,
         limit: None,
diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs
index 49caf0ba0256..24e7e14e02e0 100644
--- a/src/storage/src/hummock/sstable/builder.rs
+++ b/src/storage/src/hummock/sstable/builder.rs
@@ -14,6 +14,7 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use bytes::{Bytes, BytesMut};
 use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN};
@@ -540,8 +541,20 @@ impl SstableBuilder {
         }
 
         let writer_output = self.writer.finish(meta).await?;
+        // The timestamp is only used during full GC.
+        //
+        // Ideally the object store object's last_modified should be used.
+        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
+        //
+        // The local timestamp below is expected to precede the last_modified of the object store object, given that the object store object is created afterward.
+        // It should help alleviate the clock drift issue.
+
+        let now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("Clock may have gone backwards")
+            .as_secs();
         Ok(SstableBuilderOutput::<W::Output> {
-            sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
+            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
             writer_output,
             stats: SstableBuilderOutputStats {
                 bloom_filter_size,
diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs
index 17095e994e28..09c8f512adbe 100644
--- a/src/storage/src/hummock/vacuum.rs
+++ b/src/storage/src/hummock/vacuum.rs
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Sub;
 use std::sync::Arc;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use futures::{StreamExt, TryStreamExt};
 use risingwave_hummock_sdk::HummockSstableObjectId;
@@ -60,12 +58,6 @@ impl Vacuum {
         full_scan_task: FullScanTask,
         metadata_iter: ObjectMetadataIter,
     ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
-        let timestamp_watermark = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .sub(Duration::from_secs(full_scan_task.sst_retention_time_sec))
-            .as_secs_f64();
-
         let mut total_object_count = 0;
         let mut total_object_size = 0;
         let mut next_start_after: Option<String> = None;
@@ -83,7 +75,7 @@ impl Vacuum {
                     next_start_after = Some(o.key.clone());
                     tracing::debug!(next_start_after, "set next start after");
                 }
-                if o.last_modified < timestamp_watermark {
+                if o.last_modified < full_scan_task.sst_retention_watermark as f64 {
                     Some(Ok(SstableStore::get_object_id_from_path(&o.key)))
                 } else {
                     None
@@ -109,7 +101,7 @@ impl Vacuum {
         sstable_store: SstableStoreRef,
     ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
         tracing::info!(
-            timestamp = full_scan_task.sst_retention_time_sec,
+            sst_retention_watermark = full_scan_task.sst_retention_watermark,
             prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")),
            start_after = full_scan_task.start_after,
            limit = full_scan_task.limit,
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 242f21c17272..139f58a696f4 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -24,7 +24,7 @@ use futures::StreamExt;
 use itertools::Itertools;
 use risingwave_common::error::tonic::extra::Score;
 use risingwave_pb::stream_service::barrier_complete_response::{
-    GroupedSstableInfo, PbCreateMviewProgress,
+    PbCreateMviewProgress, PbLocalSstableInfo,
 };
 use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
 use thiserror_ext::AsReport;
@@ -494,9 +494,11 @@ impl LocalBarrierWorker {
                         |LocalSstableInfo {
                              sst_info,
                              table_stats,
-                         }| GroupedSstableInfo {
+                             created_at,
+                         }| PbLocalSstableInfo {
                             sst: Some(sst_info.into()),
                             table_stats_map: to_prost_table_stats_map(table_stats),
+                            created_at,
                         },
                     )
                     .collect_vec(),
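
Note: the sketch below restates the core mechanism the patch introduces — a monotonic, persisted "now" on the meta node and the retention watermark derived from it — in a self-contained form. It is only an illustration under stated assumptions: `PersistedSeq` and `retention_sec` are stand-ins for the real `hummock_sequence` upsert and `min_sst_retention_time_sec` option shown in the diff, and error handling is simplified to `String`.

use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;

/// Hypothetical stand-in for the `hummock_sequence` row keyed by "now"; the
/// real code upserts it via SeaORM (OnConflict on the name column).
#[derive(Default)]
struct PersistedSeq(Mutex<u64>);

impl PersistedSeq {
    async fn store(&self, v: u64) {
        *self.0.lock().await = v;
    }
    async fn load(&self) -> Option<u64> {
        Some(*self.0.lock().await)
    }
}

struct MonotonicNow {
    now: Mutex<u64>,     // in-memory high watermark, seconds since UNIX epoch
    store: PersistedSeq, // survives meta node restarts
}

impl MonotonicNow {
    /// Returns a timestamp that never decreases within a process and, because
    /// every value is written back to the store, not across restarts either.
    async fn now(&self) -> Result<u64, String> {
        let mut guard = self.now.lock().await;
        let wall = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if wall < *guard {
            return Err(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, wall
            ));
        }
        *guard = wall;
        drop(guard);
        self.store.store(wall).await;
        Ok(wall)
    }

    /// Watermark carried by `FullScanTask`: objects whose `last_modified`
    /// (or SSTs whose `created_at`) fall below it become GC candidates.
    async fn sst_retention_watermark(&self, retention_sec: u64) -> Result<u64, String> {
        Ok(self.now().await?.saturating_sub(retention_sec))
    }
}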
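
Similarly, the commit-side rejection added in context.rs can be read as the following standalone check. The helper name and its free-standing form are illustrative; in the diff this logic lives inline in the commit sanity check and returns a Hummock error instead of a `String`.

/// Rejects an SST whose build-time `created_at` already lies below the
/// retention watermark: such an object may have been removed by a concurrent
/// full GC, so committing it could reference a deleted object.
/// All values are UNIX timestamps in seconds.
fn check_sst_above_watermark(
    sst_id: u64,
    created_at: u64,
    now: u64,
    min_sst_retention_time_sec: u64,
) -> Result<(), String> {
    let sst_retention_watermark = now.saturating_sub(min_sst_retention_time_sec);
    if created_at < sst_retention_watermark {
        return Err(format!(
            "SST {sst_id} is rejected from being committed since it's below watermark: \
             SST timestamp {created_at}, meta node timestamp {now}, \
             retention_sec {min_sst_retention_time_sec}, watermark {sst_retention_watermark}"
        ));
    }
    Ok(())
}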