Skip to content

Commit

Permalink
Transforms use dynamic dispatch (#1443)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 31, 2024
1 parent 18f941f commit 4c84f75
Show file tree
Hide file tree
Showing 37 changed files with 324 additions and 430 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ This assists us in knowing when to make the next release a breaking release and

## 0.3.0

## shotover rust API

`TransformBuilder::build` now returns `Box<dyn Transform>` instead of `Transforms`.
This means that custom transforms should implement the builder as:

```rust
impl TransformBuilder for CustomBuilder {
fn build(&self) -> Box<dyn Transform> {
Box::new(CustomTransform::new())
}
}
```

Instead of:

```rust
impl TransformBuilder for CustomBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(CustomTransform::new())
}
}
```

### metrics

The prometheus metrics were renamed to better follow the official reccomended naming scheme: <https://prometheus.io/docs/practices/naming/>
Expand Down
15 changes: 10 additions & 5 deletions custom-transforms-example/src/redis_get_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use shotover::frame::{Frame, RedisFrame};
use shotover::message::Messages;
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct RedisGetRewriteConfig {
pub result: String,
}

const NAME: &str = "RedisGetRewrite";
#[typetag::serde(name = "RedisGetRewrite")]
#[async_trait(?Send)]
impl TransformConfig for RedisGetRewriteConfig {
Expand All @@ -27,14 +28,14 @@ pub struct RedisGetRewriteBuilder {
}

impl TransformBuilder for RedisGetRewriteBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(Box::new(RedisGetRewrite {
fn build(&self) -> Box<dyn Transform> {
Box::new(RedisGetRewrite {
result: self.result.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"RedisGetRewrite"
NAME
}
}

Expand All @@ -44,6 +45,10 @@ pub struct RedisGetRewrite {

#[async_trait]
impl Transform for RedisGetRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let mut get_indices = vec![];
for (i, message) in requests_wrapper.requests.iter_mut().enumerate() {
Expand Down
1 change: 0 additions & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ ordered-float.workspace = true
#Crypto
aws-config = "1.0.0"
aws-sdk-kms = "1.1.0"
strum_macros = "0.26"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
kafka-protocol = "0.8.0"
Expand Down
1 change: 1 addition & 0 deletions shotover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#![deny(clippy::print_stdout)]
#![deny(clippy::print_stderr)]
#![allow(clippy::needless_doctest_main)]
#![allow(clippy::box_default)]

pub mod codec;
pub mod config;
Expand Down
14 changes: 1 addition & 13 deletions shotover/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
use crate::config::topology::Topology;
use crate::config::Config;
use crate::observability::LogFilterHttpExporter;
use crate::transforms::Transforms;
use crate::transforms::Wrapper;
use anyhow::Context;
use anyhow::{anyhow, Result};
use clap::{crate_version, Parser};
Expand All @@ -13,7 +11,7 @@ use std::net::SocketAddr;
use tokio::runtime::{self, Runtime};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
use tracing::{debug, error, info};
use tracing::{error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::fmt::format::DefaultFields;
Expand Down Expand Up @@ -302,16 +300,6 @@ async fn run(
info!(configuration = ?config);
info!(topology = ?topology);

debug!(
"Transform overhead size on stack is {}",
std::mem::size_of::<Transforms>()
);

debug!(
"Wrapper overhead size on stack is {}",
std::mem::size_of::<Wrapper<'_>>()
);

match topology.run_chains(trigger_shutdown_rx).await {
Ok(sources) => {
futures::future::join_all(sources.into_iter().map(|x| x.into_join_handle())).await;
Expand Down
13 changes: 9 additions & 4 deletions shotover/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::frame::{
};
use crate::message::{Message, Messages};
use crate::transforms::cassandra::peers_rewrite::CassandraOperation::Event;
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};
use anyhow::Result;
use async_trait::async_trait;
use cassandra_protocol::frame::events::{ServerEvent, StatusChange};
Expand All @@ -19,6 +19,7 @@ pub struct CassandraPeersRewriteConfig {
pub port: u16,
}

const NAME: &str = "CassandraPeersRewrite";
#[typetag::serde(name = "CassandraPeersRewrite")]
#[async_trait(?Send)]
impl TransformConfig for CassandraPeersRewriteConfig {
Expand All @@ -43,17 +44,21 @@ impl CassandraPeersRewrite {
}

impl TransformBuilder for CassandraPeersRewrite {
fn build(&self) -> Transforms {
Transforms::CassandraPeersRewrite(self.clone())
fn build(&self) -> Box<dyn Transform> {
Box::new(self.clone())
}

fn get_name(&self) -> &'static str {
"CassandraPeersRewrite"
NAME
}
}

#[async_trait]
impl Transform for CassandraPeersRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
// Find the indices of queries to system.peers & system.peers_v2
// we need to know which columns in which CQL queries in which messages have system peers
Expand Down
15 changes: 10 additions & 5 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use cassandra_protocol::events::ServerEvent;
Expand Down Expand Up @@ -62,6 +62,7 @@ pub struct CassandraSinkClusterConfig {
pub read_timeout: Option<u64>,
}

const NAME: &str = "CassandraSinkCluster";
#[typetag::serde(name = "CassandraSinkCluster")]
#[async_trait(?Send)]
impl TransformConfig for CassandraSinkClusterConfig {
Expand Down Expand Up @@ -152,8 +153,8 @@ impl CassandraSinkClusterBuilder {
}

impl TransformBuilder for CassandraSinkClusterBuilder {
fn build(&self) -> crate::transforms::Transforms {
Transforms::CassandraSinkCluster(Box::new(CassandraSinkCluster {
fn build(&self) -> Box<dyn Transform> {
Box::new(CassandraSinkCluster {
contact_points: self.contact_points.clone(),
message_rewriter: self.message_rewriter.clone(),
control_connection: None,
Expand All @@ -170,11 +171,11 @@ impl TransformBuilder for CassandraSinkClusterBuilder {
keyspaces_rx: self.keyspaces_rx.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx: self.task_handshake_tx.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"CassandraSinkCluster"
NAME
}

fn is_terminating(&self) -> bool {
Expand Down Expand Up @@ -718,6 +719,10 @@ fn is_use_statement_successful(response: Option<Result<Response>>) -> bool {

#[async_trait]
impl Transform for CassandraSinkCluster {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
self.send_message(requests_wrapper.requests).await
}
Expand Down
13 changes: 9 additions & 4 deletions shotover/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::frame::cassandra::CassandraMetadata;
use crate::message::{Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::Response;
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::frame::Version;
Expand All @@ -25,6 +25,7 @@ pub struct CassandraSinkSingleConfig {
pub read_timeout: Option<u64>,
}

const NAME: &str = "CassandraSinkSingle";
#[typetag::serde(name = "CassandraSinkSingle")]
#[async_trait(?Send)]
impl TransformConfig for CassandraSinkSingleConfig {
Expand Down Expand Up @@ -77,8 +78,8 @@ impl CassandraSinkSingleBuilder {
}

impl TransformBuilder for CassandraSinkSingleBuilder {
fn build(&self) -> Transforms {
Transforms::CassandraSinkSingle(CassandraSinkSingle {
fn build(&self) -> Box<dyn Transform> {
Box::new(CassandraSinkSingle {
outbound: None,
version: self.version,
address: self.address.clone(),
Expand All @@ -92,7 +93,7 @@ impl TransformBuilder for CassandraSinkSingleBuilder {
}

fn get_name(&self) -> &'static str {
"CassandraSinkSingle"
NAME
}

fn is_terminating(&self) -> bool {
Expand Down Expand Up @@ -168,6 +169,10 @@ impl CassandraSinkSingle {

#[async_trait]
impl Transform for CassandraSinkSingle {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
self.send_message(requests_wrapper.requests).await
}
Expand Down
21 changes: 4 additions & 17 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::message::Messages;
use crate::transforms::{TransformBuilder, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::{anyhow, Result};
use derivative::Derivative;
use futures::TryFutureExt;
Expand Down Expand Up @@ -51,18 +51,13 @@ impl BufferedChainMessages {
/// Transform chains can be of arbitary complexity and a transform can even have its own set of child transform chains.
/// Transform chains are defined by the user in Shotover's configuration file and are linked to sources.
///
/// The transform chain is a vector of mutable references to the enum [Transforms] (which is an enum dispatch wrapper around the various transform types).
#[derive(Derivative)]
#[derivative(Debug)]
/// The transform chain is a vector of mutable references to the enum [Transform] (which is an enum dispatch wrapper around the various transform types).
pub struct TransformChain {
pub name: &'static str,
pub chain: InnerChain,

#[derivative(Debug = "ignore")]
chain_total: Counter,
#[derivative(Debug = "ignore")]
chain_failures: Counter,
#[derivative(Debug = "ignore")]
chain_batch_size: Histogram,
}

Expand Down Expand Up @@ -194,27 +189,19 @@ impl TransformChain {
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct TransformAndMetrics {
pub transform: Transforms,
#[derivative(Debug = "ignore")]
pub transform: Box<dyn Transform>,
pub transform_total: Counter,
#[derivative(Debug = "ignore")]
pub transform_failures: Counter,
#[derivative(Debug = "ignore")]
pub transform_latency: Histogram,
#[derivative(Debug = "ignore")]
pub transform_pushed_total: Counter,
#[derivative(Debug = "ignore")]
pub transform_pushed_failures: Counter,
#[derivative(Debug = "ignore")]
pub transform_pushed_latency: Histogram,
}

impl TransformAndMetrics {
#[cfg(test)]
pub fn new(transform: Transforms) -> Self {
pub fn new(transform: Box<dyn Transform>) -> Self {
TransformAndMetrics {
transform,
transform_total: Counter::noop(),
Expand Down
Loading

0 comments on commit 4c84f75

Please sign in to comment.