diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 8ce4cf1968ef2..1d308d4007515 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -88,25 +88,25 @@ pub(super) fn shrink_partition_cache( let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start)); for _ in 0..MAGIC_JITTER_PREVENTION { - if cursor.key().is_none() { + if cursor.prev().is_none() { break; } - cursor.move_prev(); } let start = cursor - .key() + .peek_prev() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.first_key_value().unwrap().0) .clone(); - let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end)); + let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start)); for _ in 0..MAGIC_JITTER_PREVENTION { - if cursor.key().is_none() { + if cursor.next().is_none() { break; } - cursor.move_next(); } let end = cursor - .key() + .peek_next() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.last_key_value().unwrap().0) .clone(); @@ -125,27 +125,27 @@ pub(super) fn shrink_partition_cache( let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start)); // go back for at most `MAGIC_JITTER_PREVENTION` entries for _ in 0..MAGIC_JITTER_PREVENTION { - if cursor.key().is_none() { + if cursor.prev().is_none() { break; } - cursor.move_prev(); capacity_remain -= 1; } let start = cursor - .key() + .peek_prev() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.first_key_value().unwrap().0) .clone(); let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start)); // go forward for at most `capacity_remain` entries for _ in 0..capacity_remain { - if cursor.key().is_none() { + if cursor.next().is_none() { break; } - cursor.move_next(); } let end = cursor - .key() + .peek_next() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.last_key_value().unwrap().0) .clone(); @@ -165,27 +165,27 @@ pub(super) fn shrink_partition_cache( let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end)); // go forward for at most `MAGIC_JITTER_PREVENTION` entries for _ in 0..MAGIC_JITTER_PREVENTION { - if cursor.key().is_none() { + if cursor.next().is_none() { break; } - cursor.move_next(); capacity_remain -= 1; } let end = cursor - .key() + .peek_next() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.last_key_value().unwrap().0) .clone(); let mut cursor = range_cache.inner().upper_bound(Bound::Included(&ck_end)); // go back for at most `capacity_remain` entries for _ in 0..capacity_remain { - if cursor.key().is_none() { + if cursor.prev().is_none() { break; } - cursor.move_prev(); } let start = cursor - .key() + .peek_prev() + .map(|(k, _)| k) .unwrap_or_else(|| range_cache.first_key_value().unwrap().0) .clone();