From dd6515625d980443ae1a71cf6332438cdbd49e86 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 21 Aug 2024 19:25:11 +0800 Subject: [PATCH] fix: fix log store truncate offset incorrect check (#18165) --- .../common/log_store_impl/kv_log_store/mod.rs | 7 +++++ .../log_store_impl/kv_log_store/reader.rs | 30 ++++++++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 63ee6762cfb30..f75767d22ea08 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1514,6 +1514,13 @@ mod tests { let chunk_ids = check_reader(&mut reader, data[1..].iter()).await; assert_eq!(2, chunk_ids.len()); + reader + .truncate(TruncateOffset::Chunk { + epoch: epoch2, + chunk_id: chunk_ids[0], + }) + .unwrap(); + reader .truncate(TruncateOffset::Barrier { epoch: epoch2 }) .unwrap(); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 63cc1bd4abf0c..5497b989a0873 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -591,24 +591,34 @@ impl LogReader for KvLogStoreReader { self.latest_offset )); } - if let Some(truncate_offset) = &self.truncate_offset { - if offset <= *truncate_offset { - return Err(anyhow!( - "truncate offset {:?} earlier than prev truncate offset {:?}", - offset, - truncate_offset - )); - } - } if offset.epoch() >= self.first_write_epoch.expect("should have init") { + if let Some(truncate_offset) = &self.truncate_offset { + if offset <= *truncate_offset { + return Err(anyhow!( + "truncate offset {:?} earlier than prev truncate offset {:?}", + offset, + truncate_offset + )); + } + } self.rx.truncate_buffer(offset); + self.truncate_offset = Some(offset); } else { // For historical data, no need to truncate at seq id level. Only truncate at barrier. if let TruncateOffset::Barrier { epoch } = &offset { + if let Some(truncate_offset) = &self.truncate_offset { + if offset <= *truncate_offset { + return Err(anyhow!( + "truncate offset {:?} earlier than prev truncate offset {:?}", + offset, + truncate_offset + )); + } + } self.rx.truncate_historical(*epoch); + self.truncate_offset = Some(offset); } } - self.truncate_offset = Some(offset); Ok(()) }