Skip to content

Commit

Permalink
Fix LRANGE usage in non-stream redis (#105)
Browse files Browse the repository at this point in the history
We `LPUSH` onto front of processing queue, so we should
`LRANGE` the end of the queue when looking for messages
to reenqueue.
  • Loading branch information
jaymell authored Sep 6, 2024
1 parent b49857b commit 551f087
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

let mut conn = pool.get().await?;

let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, 0, 1).await?;
let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, -1, -1).await?;

// If the key is older than now, it means we should be processing keys
let validity_limit = KsuidMs::new(Some(OffsetDateTime::now_utc() - ack_deadline), None)
.to_string()
.into_bytes();

if !keys.is_empty() && keys[0] <= validity_limit {
let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?;
let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, -BATCH_SIZE, -1).await?;
for key in keys {
if key <= validity_limit {
let internal = internal_from_list(&key)?;
Expand Down

0 comments on commit 551f087

Please sign in to comment.