From 261793b7dd58876545e854d388693de33d8ce8fe Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 14 Sep 2023 09:39:58 +1000 Subject: [PATCH] transform configs: require Serialize --- custom-transforms-example/src/redis_get_rewrite.rs | 6 +++--- shotover/src/config/chain.rs | 4 ++-- shotover/src/config/mod.rs | 2 ++ shotover/src/config/topology.rs | 13 +++++++++++-- shotover/src/lib.rs | 4 ++-- shotover/src/message/mod.rs | 4 ++-- shotover/src/sources/cassandra.rs | 4 ++-- shotover/src/sources/kafka.rs | 4 ++-- shotover/src/sources/mod.rs | 8 +++++--- shotover/src/sources/opensearch.rs | 4 ++-- shotover/src/sources/redis.rs | 4 ++-- shotover/src/transforms/cassandra/peers_rewrite.rs | 6 +++--- .../src/transforms/cassandra/sink_cluster/mod.rs | 8 ++++---- shotover/src/transforms/cassandra/sink_single.rs | 6 +++--- shotover/src/transforms/coalesce.rs | 6 +++--- shotover/src/transforms/debug/force_parse.rs | 10 +++++----- shotover/src/transforms/debug/log_to_file.rs | 6 +++--- shotover/src/transforms/debug/printer.rs | 6 +++--- shotover/src/transforms/debug/returner.rs | 8 ++++---- .../distributed/tuneable_consistency_scatter.rs | 6 +++--- shotover/src/transforms/filter.rs | 6 +++--- shotover/src/transforms/kafka/sink_cluster.rs | 6 +++--- shotover/src/transforms/kafka/sink_single.rs | 6 +++--- shotover/src/transforms/load_balance.rs | 6 +++--- shotover/src/transforms/mod.rs | 2 +- shotover/src/transforms/null.rs | 6 +++--- shotover/src/transforms/opensearch/mod.rs | 6 +++--- shotover/src/transforms/parallel_map.rs | 6 +++--- shotover/src/transforms/protect/key_management.rs | 4 ++-- shotover/src/transforms/protect/mod.rs | 6 +++--- shotover/src/transforms/query_counter.rs | 5 +++-- shotover/src/transforms/redis/cache.rs | 11 +++++------ .../src/transforms/redis/cluster_ports_rewrite.rs | 5 +++-- shotover/src/transforms/redis/sink_cluster.rs | 8 ++++---- shotover/src/transforms/redis/sink_single.rs | 6 +++--- shotover/src/transforms/redis/timestamp_tagging.rs | 6 +++--- shotover/src/transforms/tee.rs | 8 ++++---- shotover/src/transforms/throttling.rs | 6 +++--- 38 files changed, 121 insertions(+), 107 deletions(-) diff --git a/custom-transforms-example/src/redis_get_rewrite.rs b/custom-transforms-example/src/redis_get_rewrite.rs index a9b679127..bf9c8a1ea 100644 --- a/custom-transforms-example/src/redis_get_rewrite.rs +++ b/custom-transforms-example/src/redis_get_rewrite.rs @@ -1,17 +1,17 @@ use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use shotover::frame::{Frame, RedisFrame}; use shotover::message::Messages; use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct RedisGetRewriteConfig { pub result: String, } -#[typetag::deserialize(name = "RedisGetRewrite")] +#[typetag::serde(name = "RedisGetRewrite")] #[async_trait(?Send)] impl TransformConfig for RedisGetRewriteConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/config/chain.rs b/shotover/src/config/chain.rs index 47c87cc0a..e3209d551 100644 --- a/shotover/src/config/chain.rs +++ b/shotover/src/config/chain.rs @@ -2,11 +2,11 @@ use crate::transforms::chain::TransformChainBuilder; use crate::transforms::{TransformBuilder, TransformConfig}; use anyhow::Result; use serde::de::{DeserializeSeed, Deserializer, MapAccess, SeqAccess, Visitor}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::fmt::{self, Debug}; use std::iter; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TransformChainConfig( #[serde(rename = "TransformChain", deserialize_with = "vec_transform_config")] diff --git a/shotover/src/config/mod.rs b/shotover/src/config/mod.rs index 96bbf8c6d..76669dfdd 100644 --- a/shotover/src/config/mod.rs +++ b/shotover/src/config/mod.rs @@ -1,3 +1,5 @@ +//! Config types, used for serializing/deserializing shotover configuration files + use anyhow::{Context, Result}; use serde::Deserialize; diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index 0cb6ee5da..116354b0f 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -3,12 +3,12 @@ use crate::sources::{Source, SourceConfig}; use crate::transforms::chain::TransformChainBuilder; use anyhow::{anyhow, Context, Result}; use itertools::Itertools; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::watch; use tracing::info; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct Topology { pub sources: HashMap, @@ -17,6 +17,7 @@ pub struct Topology { } impl Topology { + /// Load the topology.yaml from the provided path into a Topology instance pub fn from_file(filepath: &str) -> Result { let file = std::fs::File::open(filepath) .with_context(|| format!("Couldn't open the topology file {}", filepath))?; @@ -26,6 +27,14 @@ impl Topology { .with_context(|| format!("Failed to parse topology file {}", filepath)) } + /// Generate the yaml representation of this instance + pub fn serialize(&self) -> Result { + let mut output = vec![]; + let mut serializer = serde_yaml::Serializer::new(&mut output); + serde_yaml::with::singleton_map_recursive::serialize(self, &mut serializer)?; + Ok(String::from_utf8(output).unwrap()) + } + async fn build_chains(&self) -> Result>> { let mut result = HashMap::new(); for (source_name, chain_name) in &self.source_to_chain_mapping { diff --git a/shotover/src/lib.rs b/shotover/src/lib.rs index a1f106248..058c066d6 100644 --- a/shotover/src/lib.rs +++ b/shotover/src/lib.rs @@ -32,13 +32,13 @@ #![allow(clippy::needless_doctest_main)] pub mod codec; -mod config; +pub mod config; pub mod frame; pub mod message; mod observability; pub mod runner; mod server; -mod sources; +pub mod sources; pub mod tcp; pub mod tls; mod tracing_panic_handler; diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 7bc37240f..31d1d3363 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -14,7 +14,7 @@ use bytes::{Buf, Bytes}; use cassandra_protocol::compression::Compression; use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; use nonzero_ext::nonzero; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::num::NonZeroU32; pub enum Metadata { @@ -436,7 +436,7 @@ pub enum Encodable { Frame(Frame), } -#[derive(PartialEq, Debug, Clone, Deserialize)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] #[serde(deny_unknown_fields)] pub enum QueryType { Read, diff --git a/shotover/src/sources/cassandra.rs b/shotover/src/sources/cassandra.rs index 7906d09cb..ab3398a12 100644 --- a/shotover/src/sources/cassandra.rs +++ b/shotover/src/sources/cassandra.rs @@ -5,13 +5,13 @@ use crate::sources::{Source, Transport}; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::{watch, Semaphore}; use tokio::task::JoinHandle; use tracing::{error, info}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct CassandraConfig { pub listen_addr: String, diff --git a/shotover/src/sources/kafka.rs b/shotover/src/sources/kafka.rs index 402c9e63f..b1b892b86 100644 --- a/shotover/src/sources/kafka.rs +++ b/shotover/src/sources/kafka.rs @@ -4,13 +4,13 @@ use crate::sources::{Source, Transport}; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::{watch, Semaphore}; use tokio::task::JoinHandle; use tracing::{error, info}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct KafkaConfig { pub listen_addr: String, diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index 988d28964..130890ae8 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -1,10 +1,12 @@ +//! Sources used to listen for connections and send/recieve with the client. + use crate::sources::cassandra::{CassandraConfig, CassandraSource}; use crate::sources::kafka::{KafkaConfig, KafkaSource}; use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; use crate::sources::redis::{RedisConfig, RedisSource}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::task::JoinHandle; @@ -13,7 +15,7 @@ pub mod kafka; pub mod opensearch; pub mod redis; -#[derive(Deserialize, Debug, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] #[serde(deny_unknown_fields)] pub enum Transport { Tcp, @@ -39,7 +41,7 @@ impl Source { } } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub enum SourceConfig { Cassandra(CassandraConfig), diff --git a/shotover/src/sources/opensearch.rs b/shotover/src/sources/opensearch.rs index 934b571af..b0e449de3 100644 --- a/shotover/src/sources/opensearch.rs +++ b/shotover/src/sources/opensearch.rs @@ -3,13 +3,13 @@ use crate::server::TcpCodecListener; use crate::sources::{Source, Transport}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::{watch, Semaphore}; use tokio::task::JoinHandle; use tracing::{error, info}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct OpenSearchConfig { pub listen_addr: String, pub connection_limit: Option, diff --git a/shotover/src/sources/redis.rs b/shotover/src/sources/redis.rs index 0bc2fe244..6e4fb3577 100644 --- a/shotover/src/sources/redis.rs +++ b/shotover/src/sources/redis.rs @@ -4,13 +4,13 @@ use crate::sources::{Source, Transport}; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::{watch, Semaphore}; use tokio::task::JoinHandle; use tracing::{error, info}; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct RedisConfig { pub listen_addr: String, diff --git a/shotover/src/transforms/cassandra/peers_rewrite.rs b/shotover/src/transforms/cassandra/peers_rewrite.rs index 0ce804585..40cddf976 100644 --- a/shotover/src/transforms/cassandra/peers_rewrite.rs +++ b/shotover/src/transforms/cassandra/peers_rewrite.rs @@ -11,15 +11,15 @@ use cassandra_protocol::frame::events::{ServerEvent, StatusChange}; use cql3_parser::cassandra_statement::CassandraStatement; use cql3_parser::common::{FQName, Identifier}; use cql3_parser::select::SelectElement; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct CassandraPeersRewriteConfig { pub port: u16, } -#[typetag::deserialize(name = "CassandraPeersRewrite")] +#[typetag::serde(name = "CassandraPeersRewrite")] #[async_trait(?Send)] impl TransformConfig for CassandraPeersRewriteConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 400f1e0c7..d89a54eac 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -21,7 +21,7 @@ use metrics::{register_counter, Counter}; use node::{CassandraNode, ConnectionFactory}; use node_pool::{GetReplicaErr, KeyspaceMetadata, NodePool}; use rand::prelude::*; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; use std::time::Duration; @@ -47,7 +47,7 @@ const SYSTEM_KEYSPACES: [IdentifierRef<'static>; 3] = [ IdentifierRef::Quoted("system_distributed"), ]; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct CassandraSinkClusterConfig { /// contact points must be within the specified data_center and rack. @@ -61,7 +61,7 @@ pub struct CassandraSinkClusterConfig { pub read_timeout: Option, } -#[typetag::deserialize(name = "CassandraSinkCluster")] +#[typetag::serde(name = "CassandraSinkCluster")] #[async_trait(?Send)] impl TransformConfig for CassandraSinkClusterConfig { async fn get_builder(&self, chain_name: String) -> Result> { @@ -181,7 +181,7 @@ impl TransformBuilder for CassandraSinkClusterBuilder { } } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct ShotoverNode { pub address: SocketAddr, diff --git a/shotover/src/transforms/cassandra/sink_single.rs b/shotover/src/transforms/cassandra/sink_single.rs index 7c2fdba62..181e9ecb7 100644 --- a/shotover/src/transforms/cassandra/sink_single.rs +++ b/shotover/src/transforms/cassandra/sink_single.rs @@ -10,12 +10,12 @@ use async_trait::async_trait; use cassandra_protocol::frame::Version; use futures::stream::FuturesOrdered; use metrics::{register_counter, Counter}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; use tracing::trace; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct CassandraSinkSingleConfig { #[serde(rename = "remote_address")] @@ -25,7 +25,7 @@ pub struct CassandraSinkSingleConfig { pub read_timeout: Option, } -#[typetag::deserialize(name = "CassandraSinkSingle")] +#[typetag::serde(name = "CassandraSinkSingle")] #[async_trait(?Send)] impl TransformConfig for CassandraSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/coalesce.rs b/shotover/src/transforms/coalesce.rs index cb57ca153..66ca1a84f 100644 --- a/shotover/src/transforms/coalesce.rs +++ b/shotover/src/transforms/coalesce.rs @@ -2,7 +2,7 @@ use crate::message::Messages; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::time::Instant; #[derive(Debug, Clone)] @@ -13,14 +13,14 @@ pub struct Coalesce { last_write: Instant, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct CoalesceConfig { pub flush_when_buffered_message_count: Option, pub flush_when_millis_since_last_flush: Option, } -#[typetag::deserialize(name = "Coalesce")] +#[typetag::serde(name = "Coalesce")] #[async_trait(?Send)] impl TransformConfig for CoalesceConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/debug/force_parse.rs b/shotover/src/transforms/debug/force_parse.rs index 4892c7a2f..26d57e009 100644 --- a/shotover/src/transforms/debug/force_parse.rs +++ b/shotover/src/transforms/debug/force_parse.rs @@ -10,11 +10,11 @@ use crate::transforms::TransformConfig; use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// Messages that pass through this transform will be parsed. /// Must be individually enabled at the request or response level. -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct DebugForceParseConfig { pub parse_requests: bool, @@ -22,7 +22,7 @@ pub struct DebugForceParseConfig { } #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "DebugForceParse")] +#[typetag::serde(name = "DebugForceParse")] #[async_trait(?Send)] impl TransformConfig for DebugForceParseConfig { async fn get_builder(&self, _chain_name: String) -> Result> { @@ -37,7 +37,7 @@ impl TransformConfig for DebugForceParseConfig { /// Messages that pass through this transform will be parsed and then reencoded. /// Must be individually enabled at the request or response level. -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct DebugForceEncodeConfig { pub encode_requests: bool, @@ -45,7 +45,7 @@ pub struct DebugForceEncodeConfig { } #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "DebugForceEncode")] +#[typetag::serde(name = "DebugForceEncode")] #[async_trait(?Send)] impl TransformConfig for DebugForceEncodeConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/debug/log_to_file.rs b/shotover/src/transforms/debug/log_to_file.rs index bb5117c78..75a0f0d85 100644 --- a/shotover/src/transforms/debug/log_to_file.rs +++ b/shotover/src/transforms/debug/log_to_file.rs @@ -2,18 +2,18 @@ use crate::message::{Encodable, Message}; use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; use anyhow::{Context, Result}; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tracing::{error, info}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct DebugLogToFileConfig; #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "DebugLogToFile")] +#[typetag::serde(name = "DebugLogToFile")] #[async_trait(?Send)] impl crate::transforms::TransformConfig for DebugLogToFileConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/debug/printer.rs b/shotover/src/transforms/debug/printer.rs index 640298536..50e58ae34 100644 --- a/shotover/src/transforms/debug/printer.rs +++ b/shotover/src/transforms/debug/printer.rs @@ -2,14 +2,14 @@ use crate::message::Messages; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::info; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct DebugPrinterConfig; -#[typetag::deserialize(name = "DebugPrinter")] +#[typetag::serde(name = "DebugPrinter")] #[async_trait(?Send)] impl TransformConfig for DebugPrinterConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/debug/returner.rs b/shotover/src/transforms/debug/returner.rs index 96398a4f7..70f3f2351 100644 --- a/shotover/src/transforms/debug/returner.rs +++ b/shotover/src/transforms/debug/returner.rs @@ -3,16 +3,16 @@ use crate::message::{Message, Messages}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct DebugReturnerConfig { #[serde(flatten)] response: Response, } -#[typetag::deserialize(name = "DebugReturner")] +#[typetag::serde(name = "DebugReturner")] #[async_trait(?Send)] impl TransformConfig for DebugReturnerConfig { async fn get_builder(&self, _chain_name: String) -> Result> { @@ -20,7 +20,7 @@ impl TransformConfig for DebugReturnerConfig { } } -#[derive(Debug, Clone, Deserialize)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub enum Response { #[serde(skip)] diff --git a/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs b/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs index cc5249c47..e5d718761 100644 --- a/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs +++ b/shotover/src/transforms/distributed/tuneable_consistency_scatter.rs @@ -7,11 +7,11 @@ use anyhow::Result; use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tracing::{error, warn}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TuneableConsistencyScatterConfig { pub route_map: HashMap, @@ -19,7 +19,7 @@ pub struct TuneableConsistencyScatterConfig { pub read_consistency: i32, } -#[typetag::deserialize(name = "TuneableConsistencyScatter")] +#[typetag::serde(name = "TuneableConsistencyScatter")] #[async_trait(?Send)] impl TransformConfig for TuneableConsistencyScatterConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/filter.rs b/shotover/src/transforms/filter.rs index 47c5c4aef..6c591db3c 100644 --- a/shotover/src/transforms/filter.rs +++ b/shotover/src/transforms/filter.rs @@ -2,7 +2,7 @@ use crate::message::{Message, Messages, QueryType}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicBool, Ordering}; use super::Transforms; @@ -14,13 +14,13 @@ pub struct QueryTypeFilter { pub filter: QueryType, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct QueryTypeFilterConfig { pub filter: QueryType, } -#[typetag::deserialize(name = "QueryTypeFilter")] +#[typetag::serde(name = "QueryTypeFilter")] #[async_trait(?Send)] impl TransformConfig for QueryTypeFilterConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs index 7ae082039..c8e596fe5 100644 --- a/shotover/src/transforms/kafka/sink_cluster.rs +++ b/shotover/src/transforms/kafka/sink_cluster.rs @@ -21,7 +21,7 @@ use kafka_protocol::protocol::{Builder, StrBytes}; use rand::rngs::SmallRng; use rand::seq::{IteratorRandom, SliceRandom}; use rand::SeedableRng; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::Hasher; use std::net::SocketAddr; @@ -31,7 +31,7 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time::timeout; use uuid::Uuid; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct KafkaSinkClusterConfig { pub first_contact_points: Vec, @@ -44,7 +44,7 @@ pub struct KafkaSinkClusterConfig { use crate::transforms::TransformConfig; #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "KafkaSinkCluster")] +#[typetag::serde(name = "KafkaSinkCluster")] #[async_trait(?Send)] impl TransformConfig for KafkaSinkClusterConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index 36a61df4e..82649b260 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -9,12 +9,12 @@ use crate::transforms::util::{Request, Response}; use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; use tokio::time::timeout; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct KafkaSinkSingleConfig { #[serde(rename = "remote_address")] @@ -27,7 +27,7 @@ pub struct KafkaSinkSingleConfig { use crate::transforms::TransformConfig; #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "KafkaSinkSingle")] +#[typetag::serde(name = "KafkaSinkSingle")] #[async_trait(?Send)] impl TransformConfig for KafkaSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/load_balance.rs b/shotover/src/transforms/load_balance.rs index 12373ada1..390fbd875 100644 --- a/shotover/src/transforms/load_balance.rs +++ b/shotover/src/transforms/load_balance.rs @@ -5,11 +5,11 @@ use crate::transforms::chain::{BufferedChain, TransformChainBuilder}; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::Mutex; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ConnectionBalanceAndPoolConfig { pub name: String, @@ -17,7 +17,7 @@ pub struct ConnectionBalanceAndPoolConfig { pub chain: TransformChainConfig, } -#[typetag::deserialize(name = "ConnectionBalanceAndPool")] +#[typetag::serde(name = "ConnectionBalanceAndPool")] #[async_trait(?Send)] impl TransformConfig for ConnectionBalanceAndPoolConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index c043a30d8..a6f99877a 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -240,7 +240,7 @@ impl Transforms { } } -#[typetag::deserialize] +#[typetag::serde] #[async_trait(?Send)] pub trait TransformConfig: Debug { async fn get_builder(&self, chain_name: String) -> Result>; diff --git a/shotover/src/transforms/null.rs b/shotover/src/transforms/null.rs index 6a984f41a..bab52e015 100644 --- a/shotover/src/transforms/null.rs +++ b/shotover/src/transforms/null.rs @@ -2,13 +2,13 @@ use crate::message::Messages; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct NullSinkConfig; -#[typetag::deserialize(name = "NullSink")] +#[typetag::serde(name = "NullSink")] #[async_trait(?Send)] impl TransformConfig for NullSinkConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/opensearch/mod.rs b/shotover/src/transforms/opensearch/mod.rs index 55f952efd..1a84d10c2 100644 --- a/shotover/src/transforms/opensearch/mod.rs +++ b/shotover/src/transforms/opensearch/mod.rs @@ -11,19 +11,19 @@ use crate::{ }; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::sync::oneshot; use tracing::trace; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] pub struct OpenSearchSinkSingleConfig { #[serde(rename = "remote_address")] address: String, connect_timeout_ms: u64, } -#[typetag::deserialize(name = "OpenSearchSinkSingle")] +#[typetag::serde(name = "OpenSearchSinkSingle")] #[async_trait(?Send)] impl TransformConfig for OpenSearchSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/parallel_map.rs b/shotover/src/transforms/parallel_map.rs index b73cd56c6..bc718e3ee 100644 --- a/shotover/src/transforms/parallel_map.rs +++ b/shotover/src/transforms/parallel_map.rs @@ -8,7 +8,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::task::{Context, Poll}; use futures::Stream; use futures::StreamExt; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::future::Future; use std::pin::Pin; @@ -63,7 +63,7 @@ where } } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ParallelMapConfig { pub parallelism: u32, @@ -71,7 +71,7 @@ pub struct ParallelMapConfig { pub ordered_results: bool, } -#[typetag::deserialize(name = "ParallelMap")] +#[typetag::serde(name = "ParallelMap")] #[async_trait(?Send)] impl TransformConfig for ParallelMapConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/protect/key_management.rs b/shotover/src/transforms/protect/key_management.rs index 458930d47..fff5c7503 100644 --- a/shotover/src/transforms/protect/key_management.rs +++ b/shotover/src/transforms/protect/key_management.rs @@ -9,7 +9,7 @@ use base64::{engine::general_purpose, Engine as _}; use bytes::Bytes; use cached::proc_macro::cached; use chacha20poly1305::Key; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; #[derive(Clone, Debug)] @@ -18,7 +18,7 @@ pub enum KeyManager { Local(LocalKeyManagement), } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub enum KeyManagerConfig { AWSKms { diff --git a/shotover/src/transforms/protect/mod.rs b/shotover/src/transforms/protect/mod.rs index fa55a40c6..179b5a30f 100644 --- a/shotover/src/transforms/protect/mod.rs +++ b/shotover/src/transforms/protect/mod.rs @@ -11,7 +11,7 @@ use cql3_parser::cassandra_statement::CassandraStatement; use cql3_parser::common::Identifier; use cql3_parser::insert::InsertValues; use cql3_parser::select::SelectElement; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; mod aws_kms; @@ -20,7 +20,7 @@ mod key_management; mod local_kek; mod pkcs_11; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ProtectConfig { pub keyspace_table_columns: HashMap>>, @@ -31,7 +31,7 @@ pub struct ProtectConfig { use crate::transforms::TransformConfig; #[cfg(feature = "alpha-transforms")] -#[typetag::deserialize(name = "Protect")] +#[typetag::serde(name = "Protect")] #[async_trait(?Send)] impl TransformConfig for ProtectConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index fc9076a3f..e2df9b7f4 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -7,13 +7,14 @@ use anyhow::Result; use async_trait::async_trait; use metrics::{counter, register_counter}; use serde::Deserialize; +use serde::Serialize; #[derive(Debug, Clone)] pub struct QueryCounter { counter_name: String, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct QueryCounterConfig { pub name: String, @@ -93,7 +94,7 @@ fn get_redis_query_type(frame: &RedisFrame) -> Option { None } -#[typetag::deserialize(name = "QueryCounter")] +#[typetag::serde(name = "QueryCounter")] #[async_trait(?Send)] impl TransformConfig for QueryCounterConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/redis/cache.rs b/shotover/src/transforms/redis/cache.rs index d7c795e59..6873c3c52 100644 --- a/shotover/src/transforms/redis/cache.rs +++ b/shotover/src/transforms/redis/cache.rs @@ -12,7 +12,7 @@ use cql3_parser::common::{FQName, Identifier, Operand, RelationElement, Relation use cql3_parser::select::Select; use itertools::Itertools; use metrics::{register_counter, Counter}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use tracing::{error, warn}; @@ -48,15 +48,14 @@ enum CacheableState { Skip, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TableCacheSchemaConfig { partition_key: Vec, range_key: Vec, } -#[derive(Deserialize, Debug, Clone)] -#[serde(deny_unknown_fields)] +#[derive(Debug, Clone)] pub struct TableCacheSchema { partition_key: Vec, range_key: Vec, @@ -75,14 +74,14 @@ impl From<&TableCacheSchemaConfig> for TableCacheSchema { } } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RedisConfig { pub caching_schema: HashMap, pub chain: TransformChainConfig, } -#[typetag::deserialize(name = "RedisCache")] +#[typetag::serde(name = "RedisCache")] #[async_trait(?Send)] impl TransformConfig for RedisConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/redis/cluster_ports_rewrite.rs b/shotover/src/transforms/redis/cluster_ports_rewrite.rs index 1d3cc0496..8c13cf6cc 100644 --- a/shotover/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/redis/cluster_ports_rewrite.rs @@ -6,14 +6,15 @@ use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use serde::Deserialize; +use serde::Serialize; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RedisClusterPortsRewriteConfig { pub new_port: u16, } -#[typetag::deserialize(name = "RedisClusterPortsRewrite")] +#[typetag::serde(name = "RedisClusterPortsRewrite")] #[async_trait(?Send)] impl TransformConfig for RedisClusterPortsRewriteConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/redis/sink_cluster.rs b/shotover/src/transforms/redis/sink_cluster.rs index bea727127..0934710fd 100644 --- a/shotover/src/transforms/redis/sink_cluster.rs +++ b/shotover/src/transforms/redis/sink_cluster.rs @@ -25,7 +25,7 @@ use rand::rngs::SmallRng; use rand::seq::IteratorRandom; use rand::SeedableRng; use redis_protocol::types::Redirection; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap, HashSet}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; @@ -36,17 +36,17 @@ const SLOT_SIZE: usize = 16384; type ChannelMap = HashMap>>; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RedisSinkClusterConfig { pub first_contact_points: Vec, pub direct_destination: Option, pub tls: Option, - connection_count: Option, + pub connection_count: Option, pub connect_timeout_ms: u64, } -#[typetag::deserialize(name = "RedisSinkCluster")] +#[typetag::serde(name = "RedisSinkCluster")] #[async_trait(?Send)] impl TransformConfig for RedisSinkClusterConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/redis/sink_single.rs b/shotover/src/transforms/redis/sink_single.rs index ecaaa5743..80a6e645d 100644 --- a/shotover/src/transforms/redis/sink_single.rs +++ b/shotover/src/transforms/redis/sink_single.rs @@ -11,7 +11,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use futures::{FutureExt, SinkExt, StreamExt}; use metrics::{register_counter, Counter}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::pin::Pin; use std::time::Duration; @@ -20,7 +20,7 @@ use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::Instrument; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RedisSinkSingleConfig { #[serde(rename = "remote_address")] @@ -29,7 +29,7 @@ pub struct RedisSinkSingleConfig { pub connect_timeout_ms: u64, } -#[typetag::deserialize(name = "RedisSinkSingle")] +#[typetag::serde(name = "RedisSinkSingle")] #[async_trait(?Send)] impl TransformConfig for RedisSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { diff --git a/shotover/src/transforms/redis/timestamp_tagging.rs b/shotover/src/transforms/redis/timestamp_tagging.rs index 24613d68e..682f474b5 100644 --- a/shotover/src/transforms/redis/timestamp_tagging.rs +++ b/shotover/src/transforms/redis/timestamp_tagging.rs @@ -6,16 +6,16 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use bytes::Bytes; use itertools::Itertools; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::fmt::Write; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, trace}; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RedisTimestampTaggerConfig; -#[typetag::deserialize(name = "RedisTimestampTagger")] +#[typetag::serde(name = "RedisTimestampTagger")] #[async_trait(?Send)] impl TransformConfig for RedisTimestampTaggerConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index a86e88d6d..a239b78c1 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -5,7 +5,7 @@ use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms use anyhow::Result; use async_trait::async_trait; use metrics::{register_counter, Counter}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::{debug, trace, warn}; pub struct TeeBuilder { @@ -106,7 +106,7 @@ pub enum ConsistencyBehavior { SubchainOnMismatch(BufferedChain), } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TeeConfig { pub behavior: Option, @@ -115,7 +115,7 @@ pub struct TeeConfig { pub buffer_size: Option, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub enum ConsistencyBehaviorConfig { Ignore, @@ -124,7 +124,7 @@ pub enum ConsistencyBehaviorConfig { SubchainOnMismatch(TransformChainConfig), } -#[typetag::deserialize(name = "Tee")] +#[typetag::serde(name = "Tee")] #[async_trait(?Send)] impl TransformConfig for TeeConfig { async fn get_builder(&self, _chain_name: String) -> Result> { diff --git a/shotover/src/transforms/throttling.rs b/shotover/src/transforms/throttling.rs index 6f88469d9..90c7fea23 100644 --- a/shotover/src/transforms/throttling.rs +++ b/shotover/src/transforms/throttling.rs @@ -9,19 +9,19 @@ use governor::{ Quota, RateLimiter, }; use nonzero_ext::nonzero; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::num::NonZeroU32; use std::sync::Arc; use super::Transforms; -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct RequestThrottlingConfig { pub max_requests_per_second: NonZeroU32, } -#[typetag::deserialize(name = "RequestThrottling")] +#[typetag::serde(name = "RequestThrottling")] #[async_trait(?Send)] impl TransformConfig for RequestThrottlingConfig { async fn get_builder(&self, _chain_name: String) -> Result> {