From 1ba1c48975d7aca2b62da8ccc509c2cfc3b1e3e2 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 8 Mar 2024 14:59:05 +1100 Subject: [PATCH] RedisCache: port to MessageId invariants (#1516) --- shotover/src/message/mod.rs | 13 ++ shotover/src/transforms/mod.rs | 4 - shotover/src/transforms/redis/cache.rs | 218 ++++++++++++++----------- 3 files changed, 133 insertions(+), 102 deletions(-) diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 78a794806..5ae4731c4 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -135,6 +135,19 @@ impl Message { } } + /// This method should be called when generating a new request travelling down a seperate chain to an original request. + /// The generated request will share the same MessageId as the message it is diverged from. + pub fn from_frame_diverged(frame: Frame, diverged_from: &Message) -> Self { + Message { + codec_state: frame.as_codec_state(), + inner: Some(MessageInner::Modified { frame }), + meta_timestamp: None, + received_from_source_or_sink_at: diverged_from.received_from_source_or_sink_at, + id: diverged_from.id(), + request_id: None, + } + } + /// Same as [`Message::from_bytes`] but `received_from_source_or_sink_at` is set to None. pub fn from_bytes(bytes: Bytes, codec_state: CodecState) -> Self { Self::from_bytes_at_instant(bytes, codec_state, None) diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 1c1dd51d8..034d8cb09 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -323,10 +323,6 @@ pub trait Transform: Send { /// - When writing protocol generic transforms: always ensure this is upheld. /// - When writing a transform specific to a protocol that is out of order: you can disregard this requirement /// * This is currently only cassandra - /// * Deprecated invariants: - /// + Many transforms rely on the number of responses equalling the number of requests and that requests will be in the same order as the responses. - /// Currently shotover maintains this gaurantee for backwards compatibility - /// but the gaurantee will be removed as soon as the transforms have been altered to no longer rely on it. /// /// # Naming /// Transform also have different naming conventions. diff --git a/shotover/src/transforms/redis/cache.rs b/shotover/src/transforms/redis/cache.rs index a5948fb67..b05643f5e 100644 --- a/shotover/src/transforms/redis/cache.rs +++ b/shotover/src/transforms/redis/cache.rs @@ -1,6 +1,6 @@ use crate::config::chain::TransformChainConfig; use crate::frame::{CassandraFrame, CassandraOperation, Frame, MessageType, RedisFrame}; -use crate::message::{Message, Messages}; +use crate::message::{Message, MessageIdMap, Messages, Metadata}; use crate::transforms::chain::{TransformChain, TransformChainBuilder}; use crate::transforms::{ Transform, TransformBuilder, TransformConfig, TransformContextConfig, Wrapper, @@ -124,6 +124,9 @@ impl TransformBuilder for SimpleRedisCacheBuilder { cache_chain: self.cache_chain.build(), caching_schema: self.caching_schema.clone(), missed_requests: self.missed_requests.clone(), + pending_cache_requests: Default::default(), + cache_hit_cassandra_responses: vec![], + cache_miss_cassandra_requests: vec![], }) } @@ -151,122 +154,144 @@ pub struct SimpleRedisCache { cache_chain: TransformChain, caching_schema: HashMap, missed_requests: Counter, + pending_cache_requests: MessageIdMap, + + /// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation + cache_hit_cassandra_responses: Vec, + /// cleared by the end of every `Transform::transform` call, stored here to avoid reallocation + cache_miss_cassandra_requests: Vec, } impl SimpleRedisCache { - fn build_cache_query(&mut self, cassandra_messages: &mut Messages) -> (Messages, Vec) { - let mut indices = Vec::with_capacity(cassandra_messages.len()); - let redis_requests = cassandra_messages - .iter_mut() - .enumerate() - .filter_map(|(i, message)| { - if let Some(Frame::Cassandra(CassandraFrame { - operation: CassandraOperation::Query { query, .. }, - .. - })) = message.frame() - { - if let CacheableState::CacheRow = is_cacheable(query) { - if let Some(table_name) = query.get_table_name() { - if let Some(table_cache_schema) = self.caching_schema.get(table_name) { - match build_redis_key_from_cql3(query, table_cache_schema) { - Ok(address) => { - indices.push(i); - return Some(Message::from_frame_at_instant( - Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("HGET".into()), - RedisFrame::BulkString(address.key), - RedisFrame::BulkString(address.field), - ])), - message.received_from_source_or_sink_at, - )); - } - Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option - } + fn build_cache_query(&mut self, request: &mut Message) -> Option { + if let Some(Frame::Cassandra(CassandraFrame { + operation: CassandraOperation::Query { query, .. }, + .. + })) = request.frame() + { + if let CacheableState::CacheRow = is_cacheable(query) { + if let Some(table_name) = query.get_table_name() { + if let Some(table_cache_schema) = self.caching_schema.get(table_name) { + match build_redis_key_from_cql3(query, table_cache_schema) { + Ok(address) => { + return Some(Message::from_frame_diverged( + Frame::Redis(RedisFrame::Array(vec![ + RedisFrame::BulkString("HGET".into()), + RedisFrame::BulkString(address.key), + RedisFrame::BulkString(address.field), + ])), + request, + )); } + Err(_e) => {} // TODO match Err(()) here or just have build_redis_key_from_cql3 return Option } } } - None - }) - .collect(); - (redis_requests, indices) + } + } + + None } - fn unwrap_cache_response( - &self, - mut redis_responses: Messages, - redis_indices: Vec, - cassandra_requests: &mut Messages, - ) -> Vec<(Message, usize)> { - redis_responses - .iter_mut() - .zip(redis_indices) - .filter_map(|(redis_response, redis_index)| { - match redis_response.frame() { - Some(Frame::Redis(redis_frame)) => { - match redis_frame { - RedisFrame::Error(err) => { - error!("Redis cache server returned error: {err:?}"); - None - } - RedisFrame::BulkString(redis_bytes) => { - match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None) { - Ok(mut response_frame) => { - if let Some(Frame::Cassandra(request_frame)) = - cassandra_requests[redis_index].frame() - { - if response_frame.version == request_frame.version { - response_frame.stream_id = request_frame.stream_id; - Some(( - Message::from_frame_at_instant( - Frame::Cassandra(response_frame), - redis_response.received_from_source_or_sink_at - ), - redis_index, - )) + fn unwrap_cache_response(&mut self, redis_responses: Messages) { + for mut redis_response in redis_responses { + let original_request = self + .pending_cache_requests + .remove( + &redis_response + .request_id() + .expect("This must have a request, since we dont use redis pubsub"), + ) + .expect("There must be a pending request, since we store a pending request for all redis requests"); + let cassandra_frame = match redis_response.frame() { + Some(Frame::Redis(redis_frame)) => { + match redis_frame { + RedisFrame::Error(err) => { + error!("Redis cache server returned error: {err:?}"); + None + } + RedisFrame::BulkString(redis_bytes) => { + match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None) + { + Ok(mut response_frame) => { + match original_request.metadata() { + Ok(Metadata::Cassandra(meta)) => { + if response_frame.version == meta.version { + response_frame.stream_id = meta.stream_id; + Some(response_frame) } else { // TODO: we should have some logic to convert to the // expected version instead of just failing here error!("Failed to use cache as mismatch between request version and cached response version"); None } - } else { - error!("Failed to use cache as not cassandra request"); + } + Ok(_) => { + error!("Not a cassandra request"); + None + } + Err(err) => { + error!("invalid request {err:?}"); None } - } - Err(err) => { - error!("Failed to decode cached cassandra message {err:?}"); - None } } + Err(err) => { + error!("Failed to decode cached cassandra message {err:?}"); + None + } } - RedisFrame::Null => { - self.missed_requests.increment(1); - None - } - _ => None, } + RedisFrame::Null => { + self.missed_requests.increment(1); + None + } + _ => None, } - _ => None, } - }) - .collect() + _ => None, + }; + match cassandra_frame { + Some(cassandra_frame) => { + self.cache_hit_cassandra_responses + .push(Message::from_frame_diverged( + Frame::Cassandra(cassandra_frame), + &redis_response, + )); + } + None => self.cache_miss_cassandra_requests.push(original_request), + } + } } async fn read_from_cache( &mut self, cassandra_requests: &mut Messages, local_addr: SocketAddr, - ) -> Result> { - let (redis_requests, redis_indices) = self.build_cache_query(cassandra_requests); + ) -> Result<()> { + let mut redis_requests = Vec::with_capacity(cassandra_requests.len()); + + for mut cassandra_request in cassandra_requests.drain(..) { + match self.build_cache_query(&mut cassandra_request) { + // The request is cacheable, store the cassandra request for later and send the redis request + Some(redis_request) => { + self.pending_cache_requests + .insert(cassandra_request.id(), cassandra_request); + redis_requests.push(redis_request); + } + // The request is not cacheable, add it directly to the cache miss list + None => self.cache_miss_cassandra_requests.push(cassandra_request), + } + } let redis_responses = self .cache_chain .process_request(Wrapper::new_with_addr(redis_requests, local_addr)) .await?; - Ok(self.unwrap_cache_response(redis_responses, redis_indices, cassandra_requests)) + self.unwrap_cache_response(redis_responses); + + Ok(()) } /// Clears the cache for the entire table @@ -584,31 +609,28 @@ impl Transform for SimpleRedisCache { } async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { - let cache_responses = self - .read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr) + self.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr) .await - .unwrap_or_else(|err| { - error!("Failed to fetch from cache: {err:?}"); - vec![] - }); - - // remove requests we succesfully got back a cached response for - for (_, cache_index) in cache_responses.iter().rev() { - requests_wrapper.requests.remove(*cache_index); - } - + .unwrap_or_else(|err| error!("Failed to fetch from cache: {err:?}")); + + // send the cache misses to cassandra + // since requests_wrapper.requests is now empty we can just swap the two vectors to avoid reallocations + assert!(requests_wrapper.requests.is_empty()); + std::mem::swap( + &mut requests_wrapper.requests, + &mut self.cache_miss_cassandra_requests, + ); let mut responses = self .execute_upstream_and_write_to_cache(requests_wrapper) .await?; - // mix cached response in with our non cached responses - for (cache_response, cache_index) in cache_responses.into_iter() { - responses.insert(cache_index, cache_response); - } + // add the cache hits to the final response + responses.append(&mut self.cache_hit_cassandra_responses); Ok(responses) } } + #[cfg(test)] mod test { use crate::frame::cassandra::parse_statement_single;