-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Redis 5 #228
base: master
Are you sure you want to change the base?
Conversation
|
||
def toOptionSeqStringSeqEntry[K, F, V](mb: MultiBulk)(implicit deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Option[Seq[(K, Seq[StreamEntry[F, V]])]] = | ||
mb.responses.map { r => | ||
r.map(_.asInstanceOf[MultiBulk]).map(toStringSeqEntry[K,F,V]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could squeeze some perf by doing map
only once
r.map{ case mb: MultiBulk => toStringSeqEntry[K,F,V](mb)}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll include this in my next pull request.
StreamEntry(id, fields) | ||
} | ||
|
||
def toSeqEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Seq[StreamEntry[F, V]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
relatively similar to MultiBulkConverter.toSeqByteString
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the method signature is similar, the underlying reply from Redis is significantly different.
The Stream commands use nested arrays in their replies to a much greater extent than commands for the other data structures. For example, XRANGE
returns an array of two-element arrays, where the first element is the stream id and the second element is an array of field-value pairs. Borrowing JSON notation, this would be something like:
[
[ ID1, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
[ ID2, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
...
]
where ID
, FIELD
and VALUE
are all Bulk strings and []
indicates a MultiBulk array.
The XREAD
reply is even more nested, with an array containing two element arrays where the first element is the stream key and the second element is a sequence of entries, similar to the XRANGE
reply.
Because the replies are structured in such specific ways for Stream comamnds, I opted to put the decoding logic in a separate object (StreamEntryDecoder) rather than adding it to MultiBulkConverter.
I opted to use unsafe operations (casting to MultiBulk, accessing elements by index) for two reasons. First, I think it's best for decoding to break immediately and loudly if we get a reply that doesn't match our understanding/implementation of the Redis spec. Second, I don't see good fallback options, aside from silently dropping parts of the response entirely. Throwing an exception seems like the best response. It seems like there is precedent here in the use of head/tail in MultiBulkConverter.seqtoMapString
and MultiBulkConverter.toOptionStringByteString
, which will also throw exceptions if the array is too small.
How would you feel about introducing a new exception specifically for decode errors? This feels cleaner than emitting low-level exceptions like ClassCastException
, NoSuchElementException
, etc., and would allow callers to implement custom decode error handling if desired. I thought about reusing ReplyErrorException for this, but that seems best reserved for error messages from the Redis server itself. Perhaps ReplyDecodeException
?
|
||
private [redis] object StreamEntryDecoder { | ||
def toEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): StreamEntry[F, V] = { | ||
val r = mb.responses.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are using "unsafe" method here and there (.get
, r(1) (seq.apply(index))
I don't know how safe it is to do it here.
Maybe you could try to compare with what we did in this file
object MultiBulkConverter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refer to the comment above for the reasoning behind usage of "unsafe" methods.
Hi there!
As noted in my previous pull request, I've made some changes to support redis 5.0 and the new Streams commands. This branch is based on my features/redis-4.0 branch, so please disregard the common changes. (I'll rebase this if the other pull request is accepted.)
This is still a work-in-progress, but I think there is enough here to get meaningful feedback, especially on the interface and reply decoding for stream commands. If the current approach looks good, I'll add support for consumer groups and blocking stream commands.
Thanks,
-David