-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
271 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,271 @@ | ||
# End to end overview | ||
|
||
This document will use valkey as an example for explaining the end to end flow of messages through shotover. | ||
The same flow within shotover is used for all protocols, so this document should still be useful if you are working with another protocol. | ||
|
||
The general flow of messages though shotover looks like: | ||
|
||
![ | ||
Client -> ValkeyCodec -> ValkeySource -> Some transform -> Another transform -> ValkeySinkCluster -> ValkeyCodec -> Valkey | ||
](end-to-end-overview.png) | ||
|
||
## The client | ||
|
||
A user sends a valkey command through their client: | ||
|
||
1. The user calls: `client.set("foo", "bar")`. | ||
2. The client translates the `set(..)` arguments into a RESP request that looks like: ["SET", "foo", "bar"] | ||
3. A hash is taken of the key "foo" which is used to choose which shotover node to send the request to. | ||
4. The RESP request is converted into the RESP wire format, which is purely ascii except for user data: | ||
|
||
```text | ||
*3 | ||
$3 | ||
SET | ||
$3 | ||
foo | ||
$3 | ||
bar | ||
``` | ||
|
||
`*3` means an array with 3 elements. | ||
The first element is `$3\nSET`, which means a string of length 3 containing `SET`. | ||
The second and third arguments are also strings of length 3: `$3\nfoo` and `$3\nbar` | ||
|
||
5. The bytes of the message are sent over a TCP connection to the chosen shotover node. In this example, no such connection exists so a new one is made. | ||
|
||
## Shotover accepts a new connection | ||
|
||
When [ValkeySource](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/sources/valkey.rs#L54) is created during shotover startup, it creates a `TcpCodecListener` and then calls [TcpCodecListener::run](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L160) which listens in a background task for incoming TCP connections on the sources configured port. | ||
`TcpCodecListener` accepts a new connection from the valkey client and constructs and runs a `Handler` type, which manages the connection. | ||
The Handler type creates: | ||
|
||
* read/write tasks around the TCP connection. | ||
* A `ValkeyEncoder` and `ValkeyDecoder` pair is created from [ValkeyCodecBuilder](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L449). | ||
* The `ValkeyEncoder` is given to the [write task](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L517) | ||
* The `ValkeyDecoder` is given to the [read task](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L467) | ||
* a new [transform chain](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L208) instance to handle the requests coming in from this connection. | ||
* This transform chain instance handles a single connection passing from the client to valkey and isolates it from other connections. | ||
|
||
The handler type then [continues to run](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L677), routing requests and responses between the transform chain and the client connection read/write tasks. | ||
|
||
## ValkeyDecoder | ||
|
||
The `tokio_util` crate provides an [Encoder](https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Encoder.html) trait and a [Decoder](https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html) trait. | ||
|
||
Through this interface: | ||
|
||
* we provide the logic for how to encode and decode messages into and out of a buffer of bytes by implementing the traits. | ||
* tokio provides the logic for reading and writing the bytes from the actual TCP connection via the [FramedWrite](https://docs.rs/tokio-util/latest/tokio_util/codec/struct.FramedWrite.html) and [FramedRead](https://docs.rs/tokio-util/latest/tokio_util/codec/struct.FramedRead.html) types. | ||
|
||
Since TCP itself provides a stream of bytes without any application level framing [^1] it is up to the database protocol itself to implement framing on top of TCP. | ||
So the logic of a `Decoder` implementation must gracefully handle incomplete messages. Leaving any half received messages in the buffer. | ||
|
||
Protocols like kafka and cassandra achieve framing by including a message length in bytes in the header. This is great for shotover since it means we can avoid parsing the entire message when its not needed. | ||
However Valkey does not have a header so we always need to parse the entire message to find out where it ends. | ||
|
||
The [ValkeyDecoder](https://github.com/shotover/shotover-proxy/blob/d7547741d8b10c5f64f133e1145bf843f7fb57ec/shotover/src/codec/valkey.rs#L74) is an example of a `Decoder` implementation. | ||
|
||
Lets step through the `ValkeyDecoder` implementation: | ||
|
||
[^1]: Framing is how a protocol defines where individual messages begin and end. | ||
|
||
### Reading a message | ||
|
||
The first thing `ValkeyDecoder::decode` does is attempt to parse a valkey message from the beginning of the bytes. | ||
This is done by calling [decode_bytes_mut](https://docs.rs/redis-protocol/latest/redis_protocol/resp2/decode/fn.decode_bytes_mut.html) from the `redis-protocol` crate. | ||
There are a few possible return values: | ||
|
||
* Failure to parse because the message is not fully received yet - in this case we return `None` so that the `FramedRead` will call us again when more bytes have been received. | ||
* Any other kind of parse error - we bubble up the error, eventually resulting in the connection being terminated. | ||
* a message is successfully returned - we continue on and pass the message to the next stage. | ||
|
||
In this case the parsed message forms a structure of: | ||
|
||
```rust | ||
ValkeyFrame::Array(vec![ | ||
ValkeyFrame::BulkString("SET"), | ||
ValkeyFrame::BulkString("foo"), | ||
ValkeyFrame::BulkString("bar"), | ||
]) | ||
``` | ||
|
||
### Constructing a `Message` | ||
|
||
All messages in shotover are stored in a [Message](https://docs.rs/shotover/latest/shotover/message/struct.Message.html) type which is passed through each transform. | ||
`Message` abstracts over all the different protocols supported by shotover. | ||
|
||
The `ValkeyDecoder` constructs a message by calling `Message::from_bytes_and_frame_at_instant`. | ||
We pass in the raw bytes of the message and the parsed frame of the message, as well as a timestamp which is used purely for metrics. | ||
Protocols with better framing mechanisms will use a different constructor to avoid parsing the whole request unless its really needed. | ||
|
||
When the Message is created a new ID is generated and stored in the Message. This ID is a randomly generated 128bit integer used by transforms to match responses with their corresponding requests. This value is meaningful only within shotover and is not part of the redis protocol. | ||
Lets say in this example our message is assigned the ID `0xd12ac2704d19e53ef3fea94b4885c950`. | ||
|
||
`ValkeyDecoder::decode` then `return`s the `Message` to the caller: `tokio_util` `FramedRead`. | ||
|
||
## codec to transform glue | ||
|
||
The `Message` then goes through a few steps before it actually reaches a transform. | ||
|
||
1. The [read task created by the Handler](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L468) | ||
1. The message is read from the `FramedRead` | ||
2. The message is sent through a [tokio channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html) | ||
* This logic is specifically run in a separate task to enable decoding of incoming requests to run in parallel of any messages currently being process by transforms (calling tokio async code will execute on the same core unless a task is used) | ||
2. The [Handler::run_loop](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/server.rs#L677) method loops for the lifetime of the incoming connection and: | ||
1. Listens for requests from the read task over the channel | ||
2. If there are any requests, all pending requests are collected into a batch (`Vec<Message>`) In our case the client is sending requests serially, waiting for responses each time. So this batch will contain only a single request. | ||
3. Creates a [ChainState](https://github.com/shotover/shotover-proxy/blob/4eed01edf42e7a9adca7016854dcbb6f08a25f68/shotover/src/transforms/mod.rs#L149). `ChainState` contains all the chain level state accessed by transforms. This includes things like the batch of requests, the address the client connected to, a flag to allow transforms to force close the connection. | ||
Transforms are free to alter the `ChainState` and the next transform in the chain will receive the same altered `ChainState`. | ||
4. Calls [TransformChain::process_request](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/transforms/chain.rs#L162) passing it the `ChainState`. | ||
3. `TransformChain::process_request`: | ||
1. Inserts the list of transforms in the chain into `ChainState` | ||
2. Calls `ChainState::call_next_transform` | ||
4. `ChainState::call_next_transform`: | ||
1. Pops the first transform from the list of transforms. | ||
2. Calls the transforms `transform` method, beginning execution of the transform. | ||
|
||
## Some Transform | ||
|
||
The first transform in the chain begins executing. | ||
Most transforms look something like this: | ||
|
||
```rust | ||
async fn transform<'shorter, 'longer: 'shorter>( | ||
&mut self, | ||
chain_state: &'shorter mut ChainState<'longer>, | ||
) -> Result<Messages> { | ||
// iterate over all requests | ||
for request in &mut chain_state.requests { | ||
// each request is of type Message | ||
if let Some(Frame::Valkey(frame)) = request.frame() { | ||
// Calling `frame` on the request returns the parsed frame of the message. | ||
// This assertion is silly, but would pass in the example request we are working through | ||
assert_eq!( | ||
frame, | ||
ValkeyFrame::Array(vec![ | ||
ValkeyFrame::BulkString(Bytes::from_static("SET")), | ||
ValkeyFrame::BulkString(Bytes::from_static("foo")), | ||
ValkeyFrame::BulkString(Bytes::from_static("bar")), | ||
]) | ||
) | ||
|
||
// At this point the transform is able to read and or rewrite the request as it pleases. | ||
// But for this example we will assume that no rewriting occurs. | ||
} | ||
} | ||
|
||
let mut responses = chain_state.call_next_transform().await?; | ||
|
||
for response in responses.iter_mut() { | ||
if let Some(Frame::Valkey(frame)) = request.frame() { | ||
// do something with the responses | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
``` | ||
|
||
At the point where `call_next_transform` is called, the next transform in the chain is popped from the list in `ChainState` and executed. | ||
Execution of this transform asynchronously waits until the request is completely sent. | ||
|
||
In the case of `RedisSinkCluster` (the sink transform used in this example) `call_next_transform` will also block until a response for each request has been received. But that is legacy behavior that should be fixed in the future. | ||
|
||
## Another Transform | ||
|
||
Another transform is called. | ||
This is the same as the previous section. | ||
However this time it pops the final transform from the list of transforms and executes it, in this scenario the final transform is `ValkeySinkCluster`. | ||
|
||
## ValkeySinkCluster | ||
|
||
The [ValkeySinkCluster](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/transforms/valkey/sink_cluster.rs#L1022) transform is quite complex so I will only describe it at a high level and assume it is configured in cluster hiding mode: | ||
|
||
1. For each request in `ChainState` | ||
1. Determine how to route the request via [RoutingInfo::for_command_frame](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/transforms/valkey/sink_cluster.rs#L726), in this case, since we are routing a `set` with key of `foo` we get `RoutingInfo::Slot(hash_of(foo))`. | ||
2. [Lookup the computed slot value](https://github.com/shotover/shotover-proxy/blob/de0d1a3fafb92cf1875dd9ca79b277faf3cb3e77/shotover/src/transforms/valkey/sink_cluster.rs#L239) against the list of redis nodes to find which redis node should handle this slot. | ||
3. Send the request to the redis node. A new outgoing connection is created if it does not exist yet. | ||
|
||
Other functionality of ValkeySinkCluster not listed above includes: | ||
|
||
* fetching and managing the metadata required for routing requests. | ||
* working in either cluster hiding or handling mode according to the configuration. | ||
|
||
## SinkConnection | ||
|
||
The standard way to form an outgoing connection is with [SinkConnection](https://github.com/shotover/shotover-proxy/blob/33f49fc1976df84ed538c9f58dd51a160b642968/shotover/src/connection.rs) | ||
However, out of the 6 sink transforms that shotover has currently, `ValkeySinkCluster` is the only sink transform not to use `SinkConnection`. | ||
This is only for legacy reasons, so to give a better overview of shotover, I'll be pretending that `ValkeySinkCluster` does actually use `SinkConnection`. | ||
|
||
The `SinkConnection` type contains a single TCP connection and allows sending and receiving shotover `Message`s with it. | ||
When the `SinkConnection` is created it runs [spawn_read_write_tasks](https://github.com/shotover/shotover-proxy/blob/33f49fc1976df84ed538c9f58dd51a160b642968/shotover/src/connection.rs#L253) which creates the tokio tasks for reading and writing to the outgoing connection. | ||
|
||
In our scenario the transform called [SinkConnection::send](https://github.com/shotover/shotover-proxy/blob/33f49fc1976df84ed538c9f58dd51a160b642968/shotover/src/connection.rs#L110) which sends a batch of requests to the writer task over a channel. | ||
|
||
The writer task then writes the message to `FramedWrite` which encodes the message to the TCP connection via `ValkeySinkCluster`. | ||
|
||
## ValkeyEncoder | ||
|
||
Earlier we talked about the `Encoder` and `Decoder` traits. | ||
The [ValkeyEncoder](https://github.com/shotover/shotover-proxy/blob/33f49fc1976df84ed538c9f58dd51a160b642968/shotover/src/codec/valkey.rs#L67) is an example of an `Encoder` implementation. | ||
|
||
The logic for `ValkeyEncoder::encode` looks like: | ||
|
||
1. The `into_encodable` method is called on each request. This method returns the most efficient way to encode the request. | ||
* If the message is marked as modified by the transforms, the parsed redis frame is returned, the encoder must reencode the bytes from the frame. | ||
* If the message is not marked as modified, the raw bytes are returned and the encoder can simply write the raw bytes to the socket which is much faster. | ||
|
||
In our example the request is unmodified so we take the fast path by directly writing the bytes. | ||
|
||
You will recall that the original message was: | ||
|
||
```text | ||
*3 | ||
$3 | ||
SET | ||
$3 | ||
foo | ||
$3 | ||
bar | ||
``` | ||
|
||
That message is what is written out over TCP. | ||
|
||
Additionally sink `ValkeyEncoder` sends a message to its sink `ValkeyDecoder` counterpart. | ||
`Encoders` and `Decoders` are always created in pairs allowing them to be assigned a shared channel at creation. This allows them to share state which is a requirement for working around stateful protocols. | ||
The message sent to the `ValkeyEncoder` is a [RequestInfo](https://github.com/shotover/shotover-proxy/blob/33f49fc1976df84ed538c9f58dd51a160b642968/shotover/src/codec/valkey.rs#L52) which tells the decoder: | ||
|
||
* the request ID of the next response it will receive. | ||
* in our case the ID was originally set to `0xd12ac2704d19e53ef3fea94b4885c950` | ||
* Whether the last request was a request to enter pubsub mode. | ||
* In our case it is a simple `SET` request, so we are not entering pubsub mode. | ||
|
||
## Valkey Instance | ||
|
||
The request is received by Valkey. | ||
|
||
Valkey modifies its internal value of `foo` to contain `bar` and then sends back a success response. | ||
The success response is encoded in RESP as: | ||
|
||
```text | ||
+OK | ||
``` | ||
|
||
## ValkeyDecoder | ||
|
||
Now the `ValkeyDecoder` is used again, but this time its for decoding a response instead of a request. Parsing a response has a few extra complexities that we don't have to deal with when parsing a request. | ||
|
||
The response is parsed as `ValkeyFrame::SimpleString("+OK")` | ||
|
||
### pubsub | ||
|
||
Due to various oddities of different protocols we often need to add extra logic to an `Encoder` or `Decoder`. | ||
In the case of Valkey, we need some special logic to handle pubsub messages. | ||
Usually shotover operates | ||
|
||
## SinkConnection | ||
|
||
## ValkeySinkCluster | ||
|
||
ValkeySinkCluster then requests the |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.