Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Aug 30, 2024
1 parent abc5ee5 commit fc3b107
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
14 changes: 7 additions & 7 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) async fn send_raw<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(&producer.queue_key, payload.list_payload())
.lpush(&producer.queue_key, payload.into_list_payload())
.await
.map_err(QueueError::generic)
}
Expand Down Expand Up @@ -200,16 +200,16 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
for key in keys {
if key <= validity_limit {
let payload = InternalPayload::from_list_item(&key)?;

let refreshed_key = payload.list_payload();
if payload.num_receives >= max_receives {
let num_receives = payload.num_receives;
let refreshed_key = payload.into_list_payload();
if num_receives >= max_receives {
trace!(
num_receives = payload.num_receives,
num_receives = num_receives,
"Maximum attempts reached for message, not reenqueuing",
);
} else {
trace!(
num_receives = payload.num_receives,
num_receives = num_receives,
"Pushing back overdue task to queue"
);
let _: () = conn.rpush(queue_key, &refreshed_key).await?;
Expand All @@ -228,5 +228,5 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
}

fn regenerate_key(key: &[u8]) -> Result<RawPayload> {
Ok(InternalPayload::from_list_item(key)?.list_payload())
Ok(InternalPayload::from_list_item(key)?.into_list_payload())
}
32 changes: 19 additions & 13 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ struct InternalPayload {
}

impl InternalPayload {
// This method is a bit goofy b/c incrementing `num_receives` is
// done when deserializing. This could also be broken into a separate
// This method is a bit goofy b/c `num_receives` is incremented
// when instantiated. This could also be broken into a separate
// method for clarity, but doing so may be more error-prone since
// it would need to be called everywhere this method is called:
// the struct would need to be cloned and the increment method called
// everywhere this method is called:
fn from_list_item(payload: &[u8]) -> Result<Self> {
// All information is stored in the key in which the ID and the [optional]
// number of prior receives are separated by a `#`, and the JSON
Expand Down Expand Up @@ -135,10 +136,10 @@ impl InternalPayload {
})
}

// This method is a bit goofy b/c incrementing `num_receives` is
// done when deserializing. This could also be broken into a separate
// This method is a bit goofy b/c `num_receives` is incremented
// when instantiated. This could also be broken into a separate
// method for clarity, but doing so may be more error-prone since
// it would need to be called everywhere this method is called:
// it would need to be called most places this method is called:
fn from_stream_id(stream_id: &StreamId, payload_key: &str) -> Result<Self> {
let StreamId { map, .. } = stream_id;

Expand All @@ -163,19 +164,20 @@ impl InternalPayload {
})
}

fn list_payload(&self) -> Vec<u8> {
fn into_list_payload(mut self) -> Vec<u8> {
let id = delayed_key_id();

let mut result = Vec::with_capacity(id.len() + self.payload.len() + 1);
let num_receives = self.num_receives.to_string();
let mut result =
Vec::with_capacity(id.len() + num_receives.as_bytes().len() + self.payload.len() + 3);
result.extend(id.as_bytes());
result.push(b'#');
result.extend(self.num_receives.to_string().as_bytes());
result.extend(num_receives.as_bytes());
result.push(b'|');
result.extend(&self.payload);
result.append(&mut self.payload);
result
}

fn stream_payload(self, payload_key: &str) -> Vec<(&str, Vec<u8>)> {
fn into_stream_payload(self, payload_key: &str) -> Vec<(&str, Vec<u8>)> {
vec![
(payload_key, self.payload),
(
Expand Down Expand Up @@ -713,7 +715,11 @@ impl<R: RedisConnection> RedisProducer<R> {
.get()
.await
.map_err(QueueError::generic)?
.zadd(&self.delayed_queue_key, payload.list_payload(), timestamp)
.zadd(
&self.delayed_queue_key,
payload.into_list_payload(),
timestamp,
)
.await
.map_err(QueueError::generic)?;

Expand Down
6 changes: 3 additions & 3 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(super) async fn send_raw<R: RedisConnection>(
.xadd(
&producer.queue_key,
GENERATE_STREAM_ID,
&payload.stream_payload(&producer.payload_key),
&payload.into_stream_payload(&producer.payload_key),
)
.await
.map_err(QueueError::generic)
Expand Down Expand Up @@ -178,7 +178,7 @@ pub(super) async fn add_to_main_queue(
let _ = pipe.xadd(
main_queue_name,
GENERATE_STREAM_ID,
&payload.stream_payload(payload_key),
&payload.into_stream_payload(payload_key),
);
}

Expand Down Expand Up @@ -283,7 +283,7 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
let _ = pipe.xadd(
main_queue_name,
GENERATE_STREAM_ID,
&internal_payload.stream_payload(payload_key),
&internal_payload.into_stream_payload(payload_key),
);
}

Expand Down

0 comments on commit fc3b107

Please sign in to comment.