diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 63ee6762cfb30..f75767d22ea08 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1514,6 +1514,13 @@ mod tests { let chunk_ids = check_reader(&mut reader, data[1..].iter()).await; assert_eq!(2, chunk_ids.len()); + reader + .truncate(TruncateOffset::Chunk { + epoch: epoch2, + chunk_id: chunk_ids[0], + }) + .unwrap(); + reader .truncate(TruncateOffset::Barrier { epoch: epoch2 }) .unwrap(); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 63cc1bd4abf0c..5497b989a0873 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -591,24 +591,34 @@ impl LogReader for KvLogStoreReader { self.latest_offset )); } - if let Some(truncate_offset) = &self.truncate_offset { - if offset <= *truncate_offset { - return Err(anyhow!( - "truncate offset {:?} earlier than prev truncate offset {:?}", - offset, - truncate_offset - )); - } - } if offset.epoch() >= self.first_write_epoch.expect("should have init") { + if let Some(truncate_offset) = &self.truncate_offset { + if offset <= *truncate_offset { + return Err(anyhow!( + "truncate offset {:?} earlier than prev truncate offset {:?}", + offset, + truncate_offset + )); + } + } self.rx.truncate_buffer(offset); + self.truncate_offset = Some(offset); } else { // For historical data, no need to truncate at seq id level. Only truncate at barrier. if let TruncateOffset::Barrier { epoch } = &offset { + if let Some(truncate_offset) = &self.truncate_offset { + if offset <= *truncate_offset { + return Err(anyhow!( + "truncate offset {:?} earlier than prev truncate offset {:?}", + offset, + truncate_offset + )); + } + } self.rx.truncate_historical(*epoch); + self.truncate_offset = Some(offset); } } - self.truncate_offset = Some(offset); Ok(()) }