Skip to content

Commit

Permalink
Merge branch 'main' into redesign_windsock_cli
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 16, 2024
2 parents 732c6ba + ed2c332 commit 347bad3
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 317 deletions.
61 changes: 41 additions & 20 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{message_latency, CodecWriteError, Direction};
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
use crate::message::{Encodable, Message, MessageId, Messages, ProtocolType};
use anyhow::{anyhow, Result};
use bytes::BytesMut;
use kafka_protocol::messages::ApiKey;
Expand Down Expand Up @@ -57,14 +57,20 @@ impl CodecBuilder for KafkaCodecBuilder {
}
}

pub struct RequestInfo {
header: RequestHeader,
id: MessageId,
}

pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
// Some when Sink (because it receives responses)
request_header_rx: Option<mpsc::Receiver<RequestInfo>>,
direction: Direction,
}

impl KafkaDecoder {
pub fn new(
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
request_header_rx: Option<mpsc::Receiver<RequestInfo>>,
direction: Direction,
) -> Self {
KafkaDecoder {
Expand Down Expand Up @@ -100,19 +106,29 @@ impl Decoder for KafkaDecoder {
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header =
if let Some(rx) = self.request_header_rx.as_ref() {
Some(rx.recv().map_err(|_| {
CodecReadError::Parser(anyhow!("kafka encoder half was lost"))
})?)
} else {
None
};
Ok(Some(vec![Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka { request_header },
Some(received_at),
)]))
let message = if let Some(rx) = self.request_header_rx.as_ref() {
let RequestInfo { header, id } = rx
.recv()
.map_err(|_| CodecReadError::Parser(anyhow!("kafka encoder half was lost")))?;
let mut message = Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka {
request_header: Some(header),
},
Some(received_at),
);
message.set_request_id(id);
message
} else {
Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka {
request_header: None,
},
Some(received_at),
)
};
Ok(Some(vec![message]))
} else {
Ok(None)
}
Expand All @@ -121,13 +137,14 @@ impl Decoder for KafkaDecoder {

pub struct KafkaEncoder {
message_latency: Histogram,
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
// Some when Sink (because it sends requests)
request_header_tx: Option<mpsc::Sender<RequestInfo>>,
direction: Direction,
}

impl KafkaEncoder {
pub fn new(
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
request_header_tx: Option<mpsc::Sender<RequestInfo>>,
direction: Direction,
message_latency: Histogram,
) -> Self {
Expand All @@ -147,6 +164,7 @@ impl Encoder<Messages> for KafkaEncoder {
let start = dst.len();
m.ensure_message_type(MessageType::Kafka)
.map_err(CodecWriteError::Encoder)?;
let id = m.id();
let received_at = m.received_from_source_or_sink_at;
let result = match m.into_encodable() {
Encodable::Bytes(bytes) => {
Expand All @@ -161,8 +179,11 @@ impl Encoder<Messages> for KafkaEncoder {
let version = i16::from_be_bytes(dst[start + 6..start + 8].try_into().unwrap());
let api_key = ApiKey::try_from(api_key)
.map_err(|_| CodecWriteError::Encoder(anyhow!("unknown api key {api_key}")))?;
tx.send(RequestHeader { api_key, version })
.map_err(|e| CodecWriteError::Encoder(anyhow!(e)))?;
tx.send(RequestInfo {
header: RequestHeader { api_key, version },
id,
})
.map_err(|e| CodecWriteError::Encoder(anyhow!(e)))?;
}
if let Some(received_at) = received_at {
self.message_latency.record(received_at.elapsed());
Expand Down
102 changes: 87 additions & 15 deletions shotover/src/codec/opensearch.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction};
use crate::frame::{
opensearch::{HttpHead, RequestParts, ResponseParts},
Frame, MessageType, OpenSearchFrame,
};
use crate::message::{Encodable, Message, Messages};
use crate::{
frame::{
opensearch::{HttpHead, RequestParts, ResponseParts},
Frame, MessageType, OpenSearchFrame,
},
message::MessageId,
};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use http::{header, HeaderName, HeaderValue, Method, Request, Response};
use metrics::Histogram;
use std::{
sync::{Arc, Mutex},
sync::{mpsc, Arc, Mutex},
time::Instant,
};
use tokio_util::codec::{Decoder, Encoder};
Expand All @@ -35,9 +38,17 @@ impl CodecBuilder for OpenSearchCodecBuilder {
fn build(&self) -> (OpenSearchDecoder, OpenSearchEncoder) {
let last_outgoing_method = Arc::new(Mutex::new(None));

let (tx, rx) = match self.direction {
Direction::Source => (None, None),
Direction::Sink => {
let (tx, rx) = mpsc::channel();
(Some(tx), Some(rx))
}
};
(
OpenSearchDecoder::new(self.direction, last_outgoing_method.clone()),
OpenSearchDecoder::new(rx, self.direction, last_outgoing_method.clone()),
OpenSearchEncoder::new(
tx,
self.direction,
last_outgoing_method,
self.message_latency.clone(),
Expand All @@ -51,6 +62,8 @@ impl CodecBuilder for OpenSearchCodecBuilder {
}

pub struct OpenSearchDecoder {
// Some when Sink (because it receives responses)
request_header_rx: Option<mpsc::Receiver<MessageId>>,
direction: Direction,
state: State,
last_outgoing_method: Arc<Mutex<Option<Method>>>,
Expand All @@ -63,8 +76,13 @@ struct DecodeResult {
}

impl OpenSearchDecoder {
pub fn new(direction: Direction, last_outgoing_method: Arc<Mutex<Option<Method>>>) -> Self {
pub fn new(
request_header_rx: Option<mpsc::Receiver<MessageId>>,
direction: Direction,
last_outgoing_method: Arc<Mutex<Option<Method>>>,
) -> Self {
Self {
request_header_rx,
direction,
state: State::ParsingResponse,
last_outgoing_method,
Expand Down Expand Up @@ -233,10 +251,17 @@ impl Decoder for OpenSearchDecoder {
}

let body = src.split_to(content_length).freeze();
return Ok(Some(vec![Message::from_frame_at_instant(
let mut message = Message::from_frame_at_instant(
Frame::OpenSearch(OpenSearchFrame::new(http_headers, body)),
Some(received_at),
)]));
);
if let Some(rx) = self.request_header_rx.as_ref() {
let id = rx.recv().map_err(|_| {
CodecReadError::Parser(anyhow!("opensearch encoder half was lost"))
})?;
message.set_request_id(id);
}
return Ok(Some(vec![message]));
}
}
}
Expand All @@ -247,15 +272,19 @@ pub struct OpenSearchEncoder {
direction: Direction,
last_outgoing_method: Arc<Mutex<Option<Method>>>,
message_latency: Histogram,
// Some when Sink (because it sends requests)
request_header_tx: Option<mpsc::Sender<MessageId>>,
}

impl OpenSearchEncoder {
pub fn new(
request_header_tx: Option<mpsc::Sender<MessageId>>,
direction: Direction,
last_outgoing_method: Arc<Mutex<Option<Method>>>,
message_latency: Histogram,
) -> Self {
Self {
request_header_tx,
direction,
last_outgoing_method,
message_latency,
Expand All @@ -276,6 +305,10 @@ impl Encoder<Messages> for OpenSearchEncoder {
m.ensure_message_type(MessageType::OpenSearch)
.map_err(CodecWriteError::Encoder)?;
let received_at = m.received_from_source_or_sink_at;
if let Some(tx) = self.request_header_tx.as_ref() {
tx.send(m.id())
.map_err(|e| CodecWriteError::Encoder(anyhow!(e)))?;
}
let result = match m.into_encodable() {
Encodable::Bytes(bytes) => {
dst.extend_from_slice(&bytes);
Expand Down Expand Up @@ -344,13 +377,52 @@ impl Encoder<Messages> for OpenSearchEncoder {

#[cfg(test)]
mod opensearch_tests {
use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction};
use bytes::BytesMut;
use crate::{
codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction},
frame::{
opensearch::{HttpHead, RequestParts},
Frame, OpenSearchFrame,
},
message::Message,
};
use bytes::{Bytes, BytesMut};
use http::{Method, Version};
use tokio_util::codec::{Decoder, Encoder};

fn test_frame(raw_frame: &[u8], direction: Direction) {
fn assert_decode_encode_request(raw_frame: &[u8]) {
let (mut decoder, mut encoder) =
OpenSearchCodecBuilder::new(Direction::Source, "opensearch".to_owned()).build();

let message = decoder
.decode(&mut BytesMut::from(raw_frame))
.unwrap()
.unwrap();

let mut dest = BytesMut::new();
encoder.encode(message, &mut dest).unwrap();
assert_eq!(raw_frame, &dest);
}

fn assert_decode_encode_response(raw_frame: &[u8]) {
let (mut decoder, mut encoder) =
OpenSearchCodecBuilder::new(direction, "opensearch".to_owned()).build();
OpenSearchCodecBuilder::new(Direction::Sink, "opensearch".to_owned()).build();

// set the required state for decoding a response
encoder
.encode(
vec![Message::from_frame(Frame::OpenSearch(OpenSearchFrame {
headers: HttpHead::Request(RequestParts {
method: Method::GET,
uri: "/foo".parse().unwrap(),
version: Version::HTTP_11,
headers: Default::default(),
}),
body: Bytes::new(),
}))],
&mut BytesMut::new(),
)
.unwrap();

let message = decoder
.decode(&mut BytesMut::from(raw_frame))
.unwrap()
Expand Down Expand Up @@ -378,11 +450,11 @@ mod opensearch_tests {

#[test]
fn test_request() {
test_frame(&REQUEST, Direction::Source);
assert_decode_encode_request(&REQUEST);
}

#[test]
fn test_response() {
test_frame(&RESPONSE, Direction::Sink);
assert_decode_encode_response(&RESPONSE);
}
}
Loading

0 comments on commit 347bad3

Please sign in to comment.