Skip to content

Commit

Permalink
Merge branch 'main' into update_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 8, 2024
2 parents ef4ed11 + bd7fbe8 commit 30dd96e
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 115 deletions.
11 changes: 10 additions & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,21 @@ impl Bench for KafkaBench {
let shotover_ip = shotover_instance
.as_ref()
.map(|x| x.instance.private_ip().to_string());
let shotover_connect_ip = match self.topology {
KafkaTopology::Single => kafka_instances
.first()
.as_ref()
.map(|x| x.instance.connect_ip().to_string()),
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => shotover_instance
.as_ref()
.map(|x| x.instance.connect_ip().to_string()),
};

let mut profiler = CloudProfilerRunner::new(
self.name(),
profiling,
profiler_instances,
&Some(kafka_ip.clone()),
&shotover_connect_ip,
)
.await;

Expand Down
15 changes: 11 additions & 4 deletions shotover-proxy/benches/windsock/redis/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,17 @@ impl Bench for RedisBench {
let shotover_ip = shotover_instance
.as_ref()
.map(|x| x.instance.private_ip().to_string());

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances, &shotover_ip)
.await;
let shotover_connect_ip = shotover_instance
.as_ref()
.map(|x| x.instance.connect_ip().to_string());

let mut profiler = CloudProfilerRunner::new(
self.name(),
profiling,
profiler_instances,
&shotover_connect_ip,
)
.await;

let (_, running_shotover) = futures::join!(
redis_instances.run(self.encryption),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ networks:

services:
cassandra1_1:
image: &image library/cassandra:4.0.6
image: &image shotover/cassandra-test:4.0.6-r1
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
Expand All @@ -21,6 +21,7 @@ services:
CASSANDRA_RACK: rack1
CASSANDRA_DC: datacenter1
CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
CASSANDRA_INITIAL_TOKENS: -1581530985297236285,-2672828095989448545,-3768869996224386834,-5547504297047973963,-6525649749564088038,-737154337806074529,-8144673045261131336,1774614508436541522,2792465913178968537,3610137845791319332,4685893727016898871,5421809419727792512,6501182227175394170,703113892096508709,7744702232855986142,8712066475934521641
MAX_HEAP_SIZE: "400M"
MIN_HEAP_SIZE: "400M"
HEAP_NEWSIZE: "48M"
Expand All @@ -35,6 +36,7 @@ services:
environment:
<<: *environment
CASSANDRA_RACK: rack1
CASSANDRA_INITIAL_TOKENS: -1498046278553887682,-2233961971264781322,-3309717852490360862,-4127389785102711656,-418673471106286025,-5145241189845138672,-6216741806185171484,-7657010036087754724,-8501386683578916482,1792210777652841446,3382215330166740084,5001238625863783381,5979384078379897457,7758018379203484585,824846534574305947,8854060279438422874
volumes: *volumes

cassandra2_1:
Expand All @@ -45,6 +47,7 @@ services:
environment:
<<: *environment
CASSANDRA_RACK: rack2
CASSANDRA_INITIAL_TOKENS: -113196755271768530,-1209238655506706819,-2987872956330293948,-3966018408846408023,-5585041704543451321,-7175046257057349959,-8142410500135885458,1822477002911605486,3262745232814188724,4334245849154221537,5352097253896648552,6169769186508999347,7245525067734578886,7981440760445472527,9060813567893074185,978100355420443729
volumes: *volumes
cassandra2_2:
image: *image
Expand All @@ -54,6 +57,7 @@ services:
environment:
<<: *environment
CASSANDRA_RACK: rack2
CASSANDRA_INITIAL_TOKENS: -1332542431349867968,-2576062437030459942,-365178188271332470,-3655435244478061599,-4391350937188955240,-5467106818414534780,-6284778751026885574,-7302630155769312590,-8374130772109345403,1224826364242566167,2843849659939609464,3821995112455723540,5600629413279310668,6696671313514248957,7787968424206461216,8632345071697622974
volumes: *volumes

cassandra3_1:
Expand All @@ -64,6 +68,7 @@ services:
environment:
<<: *environment
CASSANDRA_RACK: rack3
CASSANDRA_INITIAL_TOKENS: -1219526640376827828,-2197672092892941903,-3816695388589985200,-5406699941103883838,-6374064184182419337,-7617584189863011311,-8696956997310612969,1655149560681697589,2746446671373909848,3590823318865071605,5031091548767654843,559107660446759300,6102592165107687656,7120443569850114671,7938115502462465466,9013871383688045005
volumes: *volumes
cassandra3_2:
image: *image
Expand All @@ -73,4 +78,5 @@ services:
environment:
<<: *environment
CASSANDRA_RACK: rack3
CASSANDRA_INITIAL_TOKENS: -2403785647040412411,-3248162294531574167,-4339459405223786427,-5435501305458724716,-7214135606282311845,-8192281058798425921,-963517417137829172,107983199202203640,1125834603944630655,1943506536556981450,3019262417782560989,3755178110493454630,4834550917941056288,6078070923621648260,7045435166700183759,8635439719214082397
volumes: *volumes
2 changes: 1 addition & 1 deletion shotover/benches/benches/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use shotover::transforms::throttling::RequestThrottlingConfig;
use shotover::transforms::{TransformConfig, Wrapper};

fn criterion_benchmark(c: &mut Criterion) {
super::init();
crate::init();
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("transform");
group.noise_threshold(0.2);
Expand Down
1 change: 1 addition & 0 deletions shotover/benches/benches/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use shotover::message::Message;
use tokio_util::codec::{Decoder, Encoder};

fn criterion_benchmark(c: &mut Criterion) {
crate::init();
let mut group = c.benchmark_group("cassandra_codec");
group.noise_threshold(0.2);

Expand Down
1 change: 1 addition & 0 deletions shotover/benches/benches/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const KAFKA_REQUESTS: &[(&[u8], &str)] = &[
];

fn criterion_benchmark(c: &mut Criterion) {
crate::init();
let mut group = c.benchmark_group("kafka_codec");
group.noise_threshold(0.2);

Expand Down
4 changes: 4 additions & 0 deletions shotover/benches/benches/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use criterion::criterion_main;
use metrics_exporter_prometheus::PrometheusBuilder;

mod chain;
mod codec;

// Shared one-time setup called at the top of every criterion benchmark
// (chain.rs / codec benches call `crate::init()`).
fn init() {
    // Force backtraces for panics, but keep library-error backtraces off.
    std::env::set_var("RUST_BACKTRACE", "1");
    std::env::set_var("RUST_LIB_BACKTRACE", "0");

    // Install a Prometheus recorder as the global `metrics` sink.
    // NOTE(review): presumably this is so code under benchmark that emits
    // metrics has a recorder to write to — confirm against the transforms.
    // `.ok()` deliberately ignores the Err returned when a global recorder
    // is already set (init() is invoked once per benchmark group).
    let recorder = PrometheusBuilder::new().build_recorder();
    metrics::set_boxed_recorder(Box::new(recorder)).ok();
}

criterion_main!(
Expand Down
85 changes: 36 additions & 49 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ pub struct CassandraDecoder {
version: Arc<AtomicVersionState>,
compression: Arc<AtomicCompressionState>,
handshake_complete: Arc<AtomicBool>,
messages: Vec<Message>,
current_use_keyspace: Option<Identifier>,
direction: Direction,
version_counter: VersionCounter,
Expand All @@ -194,7 +193,6 @@ impl CassandraDecoder {
version,
compression,
handshake_complete,
messages: vec![],
current_use_keyspace: None,
direction,
version_counter,
Expand Down Expand Up @@ -575,58 +573,47 @@ impl Decoder for CassandraDecoder {
let handshake_complete = self.handshake_complete.load(Ordering::Relaxed);
let received_at = Instant::now();

loop {
match self.check_size(src, version, compression, handshake_complete) {
Ok(frame_len) => {
let mut messages = self
.decode_frame(
src,
frame_len,
version,
compression,
handshake_complete,
received_at,
)
.map_err(CodecReadError::Parser)?;

for message in messages.iter_mut() {
if let Ok(Metadata::Cassandra(CassandraMetadata {
opcode: Opcode::Query | Opcode::Batch,
..
})) = message.metadata()
{
if let Some(keyspace) = get_use_keyspace(message) {
self.current_use_keyspace = Some(keyspace);
}

if let Some(keyspace) = &self.current_use_keyspace {
set_default_keyspace(message, keyspace);
}
match self.check_size(src, version, compression, handshake_complete) {
Ok(frame_len) => {
let mut messages = self
.decode_frame(
src,
frame_len,
version,
compression,
handshake_complete,
received_at,
)
.map_err(CodecReadError::Parser)?;

for message in messages.iter_mut() {
if let Ok(Metadata::Cassandra(CassandraMetadata {
opcode: Opcode::Query | Opcode::Batch,
..
})) = message.metadata()
{
if let Some(keyspace) = get_use_keyspace(message) {
self.current_use_keyspace = Some(keyspace);
}
}

self.messages.append(&mut messages);
}
Err(CheckFrameSizeError::NotEnoughBytes) => {
if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
if let Some(keyspace) = &self.current_use_keyspace {
set_default_keyspace(message, keyspace);
}
}
}
Err(CheckFrameSizeError::UnsupportedVersion(version)) => {
return Err(reject_protocol_version(version));
}
Err(CheckFrameSizeError::UnsupportedCompression(msg)) => {
return Err(CodecReadError::Parser(anyhow!(msg)));
}
err => {
return Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
)))
}
Ok(Some(messages))
}
Err(CheckFrameSizeError::NotEnoughBytes) => Ok(None),
Err(CheckFrameSizeError::UnsupportedVersion(version)) => {
Err(reject_protocol_version(version))
}
Err(CheckFrameSizeError::UnsupportedCompression(msg)) => {
Err(CodecReadError::Parser(anyhow!(msg)))
}
err => Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
))),
}
}
}
Expand Down
39 changes: 17 additions & 22 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use kafka_protocol::messages::ApiKey;
use metrics::Histogram;
use std::sync::mpsc;
Expand Down Expand Up @@ -59,7 +59,6 @@ impl CodecBuilder for KafkaCodecBuilder {

pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
messages: Messages,
direction: Direction,
}

Expand All @@ -70,7 +69,6 @@ impl KafkaDecoder {
) -> Self {
KafkaDecoder {
request_header_rx,
messages: vec![],
direction,
}
}
Expand All @@ -95,31 +93,28 @@ impl Decoder for KafkaDecoder {

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
loop {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header = if let Some(rx) = self.request_header_rx.as_ref() {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header =
if let Some(rx) = self.request_header_rx.as_ref() {
Some(rx.recv().map_err(|_| {
CodecReadError::Parser(anyhow!("kafka encoder half was lost"))
})?)
} else {
None
};
self.messages.push(Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka { request_header },
Some(received_at),
));
} else if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
}
Ok(Some(vec![Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka { request_header },
Some(received_at),
)]))
} else {
Ok(None)
}
}
}
Expand Down
46 changes: 17 additions & 29 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::{Frame, MessageType};
use crate::message::{Encodable, Message, Messages};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use metrics::Histogram;
use redis_protocol::resp2::prelude::decode_mut;
use redis_protocol::resp2::prelude::encode_bytes;
Expand Down Expand Up @@ -47,16 +47,12 @@ pub struct RedisEncoder {
}

pub struct RedisDecoder {
messages: Messages,
direction: Direction,
}

impl RedisDecoder {
pub fn new(direction: Direction) -> Self {
Self {
messages: Vec::new(),
direction,
}
Self { direction }
}
}

Expand All @@ -66,30 +62,22 @@ impl Decoder for RedisDecoder {

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
loop {
match decode_mut(src).map_err(|e| {
CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame"))
})? {
Some((frame, _size, bytes)) => {
tracing::debug!(
"{}: incoming redis message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
self.messages.push(Message::from_bytes_and_frame_at_instant(
bytes,
Frame::Redis(frame),
Some(received_at),
));
}
None => {
if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
}
}
match decode_mut(src)
.map_err(|e| CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame")))?
{
Some((frame, _size, bytes)) => {
tracing::debug!(
"{}: incoming redis message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
Ok(Some(vec![Message::from_bytes_and_frame_at_instant(
bytes,
Frame::Redis(frame),
Some(received_at),
)]))
}
None => Ok(None),
}
}
}
Expand Down
Loading

0 comments on commit 30dd96e

Please sign in to comment.