Skip to content

Commit

Permalink
Merge branch 'main' into kafka-cluster-sasl
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Mar 8, 2024
2 parents 3dd0936 + 1ba1c48 commit 0ef6b28
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 102 deletions.
13 changes: 13 additions & 0 deletions shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,19 @@ impl Message {
}
}

/// This method should be called when generating a new request travelling down a seperate chain to an original request.
/// The generated request will share the same MessageId as the message it is diverged from.
pub fn from_frame_diverged(frame: Frame, diverged_from: &Message) -> Self {
Message {
codec_state: frame.as_codec_state(),
inner: Some(MessageInner::Modified { frame }),
meta_timestamp: None,
received_from_source_or_sink_at: diverged_from.received_from_source_or_sink_at,
id: diverged_from.id(),
request_id: None,
}
}

/// Same as [`Message::from_bytes`] but `received_from_source_or_sink_at` is set to None.
pub fn from_bytes(bytes: Bytes, codec_state: CodecState) -> Self {
Self::from_bytes_at_instant(bytes, codec_state, None)
Expand Down
4 changes: 0 additions & 4 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,6 @@ pub trait Transform: Send {
/// - When writing protocol generic transforms: always ensure this is upheld.
/// - When writing a transform specific to a protocol that is out of order: you can disregard this requirement
/// * This is currently only cassandra
/// * Deprecated invariants:
/// + Many transforms rely on the number of responses equalling the number of requests and that requests will be in the same order as the responses.
/// Currently shotover maintains this gaurantee for backwards compatibility
/// but the gaurantee will be removed as soon as the transforms have been altered to no longer rely on it.
///
/// # Naming
/// Transform also have different naming conventions.
Expand Down
218 changes: 120 additions & 98 deletions shotover/src/transforms/redis/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::chain::TransformChainConfig;
use crate::frame::{CassandraFrame, CassandraOperation, Frame, MessageType, RedisFrame};
use crate::message::{Message, Messages};
use crate::message::{Message, MessageIdMap, Messages, Metadata};
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::{
Transform, TransformBuilder, TransformConfig, TransformContextConfig, Wrapper,
Expand Down Expand Up @@ -124,6 +124,9 @@ impl TransformBuilder for SimpleRedisCacheBuilder {
cache_chain: self.cache_chain.build(),
caching_schema: self.caching_schema.clone(),
missed_requests: self.missed_requests.clone(),
pending_cache_requests: Default::default(),
cache_hit_cassandra_responses: vec![],
cache_miss_cassandra_requests: vec![],
})
}

Expand Down Expand Up @@ -151,122 +154,144 @@ pub struct SimpleRedisCache {
cache_chain: TransformChain,
caching_schema: HashMap<FQName, TableCacheSchema>,
missed_requests: Counter,
pending_cache_requests: MessageIdMap<Message>,

/// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation
cache_hit_cassandra_responses: Vec<Message>,
/// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation
cache_miss_cassandra_requests: Vec<Message>,
}

impl SimpleRedisCache {
fn build_cache_query(&mut self, cassandra_messages: &mut Messages) -> (Messages, Vec<usize>) {
let mut indices = Vec::with_capacity(cassandra_messages.len());
let redis_requests = cassandra_messages
.iter_mut()
.enumerate()
.filter_map(|(i, message)| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Query { query, .. },
..
})) = message.frame()
{
if let CacheableState::CacheRow = is_cacheable(query) {
if let Some(table_name) = query.get_table_name() {
if let Some(table_cache_schema) = self.caching_schema.get(table_name) {
match build_redis_key_from_cql3(query, table_cache_schema) {
Ok(address) => {
indices.push(i);
return Some(Message::from_frame_at_instant(
Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString("HGET".into()),
RedisFrame::BulkString(address.key),
RedisFrame::BulkString(address.field),
])),
message.received_from_source_or_sink_at,
));
}
Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option
}
fn build_cache_query(&mut self, request: &mut Message) -> Option<Message> {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Query { query, .. },
..
})) = request.frame()
{
if let CacheableState::CacheRow = is_cacheable(query) {
if let Some(table_name) = query.get_table_name() {
if let Some(table_cache_schema) = self.caching_schema.get(table_name) {
match build_redis_key_from_cql3(query, table_cache_schema) {
Ok(address) => {
return Some(Message::from_frame_diverged(
Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString("HGET".into()),
RedisFrame::BulkString(address.key),
RedisFrame::BulkString(address.field),
])),
request,
));
}
Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option
}
}
}
None
})
.collect();
(redis_requests, indices)
}
}

None
}

fn unwrap_cache_response(
&self,
mut redis_responses: Messages,
redis_indices: Vec<usize>,
cassandra_requests: &mut Messages,
) -> Vec<(Message, usize)> {
redis_responses
.iter_mut()
.zip(redis_indices)
.filter_map(|(redis_response, redis_index)| {
match redis_response.frame() {
Some(Frame::Redis(redis_frame)) => {
match redis_frame {
RedisFrame::Error(err) => {
error!("Redis cache server returned error: {err:?}");
None
}
RedisFrame::BulkString(redis_bytes) => {
match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None) {
Ok(mut response_frame) => {
if let Some(Frame::Cassandra(request_frame)) =
cassandra_requests[redis_index].frame()
{
if response_frame.version == request_frame.version {
response_frame.stream_id = request_frame.stream_id;
Some((
Message::from_frame_at_instant(
Frame::Cassandra(response_frame),
redis_response.received_from_source_or_sink_at
),
redis_index,
))
fn unwrap_cache_response(&mut self, redis_responses: Messages) {
for mut redis_response in redis_responses {
let original_request = self
.pending_cache_requests
.remove(
&redis_response
.request_id()
.expect("This must have a request, since we dont use redis pubsub"),
)
.expect("There must be a pending request, since we store a pending request for all redis requests");
let cassandra_frame = match redis_response.frame() {
Some(Frame::Redis(redis_frame)) => {
match redis_frame {
RedisFrame::Error(err) => {
error!("Redis cache server returned error: {err:?}");
None
}
RedisFrame::BulkString(redis_bytes) => {
match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None)
{
Ok(mut response_frame) => {
match original_request.metadata() {
Ok(Metadata::Cassandra(meta)) => {
if response_frame.version == meta.version {
response_frame.stream_id = meta.stream_id;
Some(response_frame)
} else {
// TODO: we should have some logic to convert to the
// expected version instead of just failing here
error!("Failed to use cache as mismatch between request version and cached response version");
None
}
} else {
error!("Failed to use cache as not cassandra request");
}
Ok(_) => {
error!("Not a cassandra request");
None
}
Err(err) => {
error!("invalid request {err:?}");
None
}
}
Err(err) => {
error!("Failed to decode cached cassandra message {err:?}");
None
}
}
Err(err) => {
error!("Failed to decode cached cassandra message {err:?}");
None
}
}
RedisFrame::Null => {
self.missed_requests.increment(1);
None
}
_ => None,
}
RedisFrame::Null => {
self.missed_requests.increment(1);
None
}
_ => None,
}
_ => None,
}
})
.collect()
_ => None,
};
match cassandra_frame {
Some(cassandra_frame) => {
self.cache_hit_cassandra_responses
.push(Message::from_frame_diverged(
Frame::Cassandra(cassandra_frame),
&redis_response,
));
}
None => self.cache_miss_cassandra_requests.push(original_request),
}
}
}

async fn read_from_cache(
&mut self,
cassandra_requests: &mut Messages,
local_addr: SocketAddr,
) -> Result<Vec<(Message, usize)>> {
let (redis_requests, redis_indices) = self.build_cache_query(cassandra_requests);
) -> Result<()> {
let mut redis_requests = Vec::with_capacity(cassandra_requests.len());

for mut cassandra_request in cassandra_requests.drain(..) {
match self.build_cache_query(&mut cassandra_request) {
// The request is cacheable, store the cassandra request for later and send the redis request
Some(redis_request) => {
self.pending_cache_requests
.insert(cassandra_request.id(), cassandra_request);
redis_requests.push(redis_request);
}
// The request is not cacheable, add it directly to the cache miss list
None => self.cache_miss_cassandra_requests.push(cassandra_request),
}
}

let redis_responses = self
.cache_chain
.process_request(Wrapper::new_with_addr(redis_requests, local_addr))
.await?;

Ok(self.unwrap_cache_response(redis_responses, redis_indices, cassandra_requests))
self.unwrap_cache_response(redis_responses);

Ok(())
}

/// Clears the cache for the entire table
Expand Down Expand Up @@ -584,31 +609,28 @@ impl Transform for SimpleRedisCache {
}

async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let cache_responses = self
.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr)
self.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr)
.await
.unwrap_or_else(|err| {
error!("Failed to fetch from cache: {err:?}");
vec![]
});

// remove requests we succesfully got back a cached response for
for (_, cache_index) in cache_responses.iter().rev() {
requests_wrapper.requests.remove(*cache_index);
}

.unwrap_or_else(|err| error!("Failed to fetch from cache: {err:?}"));

// send the cache misses to cassandra
// since requests_wrapper.requests is now empty we can just swap the two vectors to avoid reallocations
assert!(requests_wrapper.requests.is_empty());
std::mem::swap(
&mut requests_wrapper.requests,
&mut self.cache_miss_cassandra_requests,
);
let mut responses = self
.execute_upstream_and_write_to_cache(requests_wrapper)
.await?;

// mix cached response in with our non cached responses
for (cache_response, cache_index) in cache_responses.into_iter() {
responses.insert(cache_index, cache_response);
}
// add the cache hits to the final response
responses.append(&mut self.cache_hit_cassandra_responses);

Ok(responses)
}
}

#[cfg(test)]
mod test {
use crate::frame::cassandra::parse_statement_single;
Expand Down

0 comments on commit 0ef6b28

Please sign in to comment.