Skip to content

Commit

Permalink
Update redis-protocol 0.4 to 0.5 (#1556)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Apr 2, 2024
1 parent a10119f commit 6dff7b7
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 17 deletions.
21 changes: 17 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rcgen = "0.12.0"
subprocess = "0.2.7"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
csv = "1.2.0"
redis-protocol = { version = "4.0.1", features = ["decode-mut"] }
redis-protocol = { version = "5.0.0", features = ["bytes"] }
bincode = "1.3.1"
futures = "0.3"
hex = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ async fn read_redis_message(connection: &mut TcpStream) -> RedisFrame {
let mut buffer = BytesMut::new();
loop {
if let Ok(Some((result, len))) =
redis_protocol::resp2::decode::decode(&buffer.clone().freeze())
redis_protocol::resp2::decode::decode_bytes(&buffer.clone().freeze())
{
let _ = buffer.split_to(len);
return result;
Expand Down
2 changes: 0 additions & 2 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ redis = [
"dep:redis-protocol",
"dep:csv",
"dep:crc16",
"dep:bytes-utils"
]
opensearch = [
"dep:atoi",
Expand All @@ -52,7 +51,6 @@ atomic_enum = "0.3.0"
axum = { version = "0.7", default-features = false, features = ["tokio", "tracing", "http1"] }
pretty-hex = "0.4.0"
tokio-stream = "0.1.2"
bytes-utils = { version = "0.1.1", optional = true }
derivative = "2.1.1"
cached = { version = "0.49", features = ["async"], optional = true }
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::message::{Encodable, Message, MessageId, Messages};
use anyhow::{anyhow, Result};
use bytes::BytesMut;
use metrics::Histogram;
use redis_protocol::resp2::prelude::decode_mut;
use redis_protocol::resp2::prelude::encode_bytes;
use redis_protocol::resp2::decode::decode_bytes_mut;
use redis_protocol::resp2::encode::extend_encode;
use tokio_util::codec::{Decoder, Encoder};

#[derive(Clone)]
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Decoder for RedisDecoder {
// Once thats done we will have pubsub support for both RedisSinkSingle AND RedisSinkCluster. Progress!
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
match decode_mut(src)
match decode_bytes_mut(src)
.map_err(|e| CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame")))?
{
Some((frame, _size, bytes)) => {
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Encoder<Messages> for RedisEncoder {
}
Encodable::Frame(frame) => {
let item = frame.into_redis().unwrap();
encode_bytes(dst, &item)
extend_encode(dst, &item)
.map(|_| ())
.map_err(|e| anyhow!("Redis encoding error: {} - {:#?}", e, item))
}
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kafka::KafkaFrame;
#[cfg(feature = "opensearch")]
pub use opensearch::OpenSearchFrame;
#[cfg(feature = "redis")]
pub use redis_protocol::resp2::types::Frame as RedisFrame;
pub use redis_protocol::resp2::types::BytesFrame as RedisFrame;
use std::fmt::{Display, Formatter, Result as FmtResult};

#[cfg(feature = "cassandra")]
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Frame {
CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra)
}
#[cfg(feature = "redis")]
MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes)
MessageType::Redis => redis_protocol::resp2::decode::decode_bytes(&bytes)
.map(|x| Frame::Redis(x.unwrap().0))
.map_err(|e| anyhow!("{e:?}")),
#[cfg(feature = "kafka")]
Expand Down
33 changes: 30 additions & 3 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::transforms::{
use anyhow::{anyhow, bail, ensure, Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use bytes_utils::string::Str;
use derivative::Derivative;
use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
Expand All @@ -24,7 +23,8 @@ use metrics::counter;
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
use redis_protocol::types::Redirection;
use redis_protocol::bytes_utils::string::Str;
use redis_protocol::resp2::types::Resp2Frame;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -1054,7 +1054,7 @@ impl Transform for RedisSinkCluster {
let mut response = response?;
match response.frame() {
Some(Frame::Redis(frame)) => {
match frame.to_redirection() {
match Redirection::parse(frame) {
Some(Redirection::Moved { slot, server }) => {
debug!("Got MOVE {} {}", slot, server);

Expand Down Expand Up @@ -1088,6 +1088,33 @@ impl Transform for RedisSinkCluster {
}
}

enum Redirection {
Moved { slot: u16, server: String },
Ask { slot: u16, server: String },
}

impl Redirection {
fn parse(frame: &RedisFrame) -> Option<Redirection> {
match frame {
RedisFrame::Error(err) => {
let mut tokens = err.split(' ');
match tokens.next()? {
"MOVED" => Some(Redirection::Moved {
slot: tokens.next()?.parse().ok()?,
server: tokens.next()?.to_owned(),
}),
"ASK" => Some(Redirection::Ask {
slot: tokens.next()?.parse().ok()?,
server: tokens.next()?.to_owned(),
}),
_ => None,
}
}
_ => None,
}
}
}

#[derive(Clone, PartialEq, Eq, Hash, Derivative)]
#[derivative(Debug)]
pub struct UsernamePasswordToken {
Expand Down

0 comments on commit 6dff7b7

Please sign in to comment.