Skip to content

Commit

Permalink
pub/private cleanup (#1689)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jul 12, 2024
1 parent 70a9902 commit aa0c320
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 202 deletions.
16 changes: 0 additions & 16 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Future transforms won't be added to the public API while in alpha. But in these
| [RedisSinkSingle](#redissinksingle) || Beta |
| [Tee](#tee) || Alpha |
| [RequestThrottling](#requestthrottling) || Alpha |
<!--| [DebugRandomDelay](#debugrandomdelay) | ❌ | Alpha |-->

### CassandraSinkCluster

Expand Down Expand Up @@ -225,21 +224,6 @@ This transform will drop any messages it receives and return the supplied respon
# Fail
```

<!-- commented out until included in the public API
### DebugRandomDelay

Delay the transform chain at the position that this transform sits at.

```yaml
- DebugRandomDelay
# length of time to delay in milliseconds
delay: 1000
# optionally provide a distribution for a random amount to add to the base delay
distribution: 500
```
-->

### KafkaSinkCluster

This transform will route kafka messages to a broker within a Kafka cluster:
Expand Down
10 changes: 5 additions & 5 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ mod test_router;
mod token_ring;
pub mod topology;

pub type KeyspaceChanTx = watch::Sender<HashMap<String, KeyspaceMetadata>>;
pub type KeyspaceChanRx = watch::Receiver<HashMap<String, KeyspaceMetadata>>;
type KeyspaceChanTx = watch::Sender<HashMap<String, KeyspaceMetadata>>;
type KeyspaceChanRx = watch::Receiver<HashMap<String, KeyspaceMetadata>>;

const SYSTEM_KEYSPACES: [IdentifierRef<'static>; 3] = [
IdentifierRef::Quoted("system"),
Expand Down Expand Up @@ -106,7 +106,7 @@ impl TransformConfig for CassandraSinkClusterConfig {
}
}

pub struct CassandraSinkClusterBuilder {
struct CassandraSinkClusterBuilder {
contact_points: Vec<String>,
connection_factory: ConnectionFactory,
failed_requests: Counter,
Expand All @@ -118,7 +118,7 @@ pub struct CassandraSinkClusterBuilder {
}

impl CassandraSinkClusterBuilder {
pub fn new(
fn new(
contact_points: Vec<String>,
shotover_peers: Vec<ShotoverNode>,
chain_name: String,
Expand Down Expand Up @@ -205,7 +205,7 @@ pub struct ShotoverNode {
pub host_id: Uuid,
}

pub struct CassandraSinkCluster {
struct CassandraSinkCluster {
contact_points: Vec<String>,

connection_factory: ConnectionFactory,
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/cassandra/sink_cluster/murmur.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Taken from https://github.com/scylladb/scylla-rust-driver/blob/4a4fd0e5e785031956f560ecf22cb8653eea122b/scylla/src/routing.rs
//! Taken from <https://github.com/scylladb/scylla-rust-driver/blob/4a4fd0e5e785031956f560ecf22cb8653eea122b/scylla/src/routing.rs>
//! We cant import it as that would bring the openssl dependency into shotover.
use bytes::Buf;
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl TransformConfig for CassandraSinkSingleConfig {
}
}

pub struct CassandraSinkSingleBuilder {
struct CassandraSinkSingleBuilder {
version: Option<Version>,
address: String,
failed_requests: Counter,
Expand All @@ -67,7 +67,7 @@ pub struct CassandraSinkSingleBuilder {
}

impl CassandraSinkSingleBuilder {
pub fn new(
fn new(
address: String,
chain_name: String,
tls: Option<TlsConnector>,
Expand Down Expand Up @@ -115,7 +115,7 @@ impl TransformBuilder for CassandraSinkSingleBuilder {
}
}

pub struct CassandraSinkSingle {
struct CassandraSinkSingle {
version: Option<Version>,
address: String,
connection: Option<SinkConnection>,
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use std::time::Instant;

#[derive(Clone)]
pub struct Coalesce {
struct Coalesce {
flush_when_buffered_message_count: Option<usize>,
flush_when_millis_since_last_flush: Option<u128>,
buffer: Messages,
Expand Down
7 changes: 1 addition & 6 deletions shotover/src/transforms/debug/force_parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ use crate::message::Messages;
/// The use of this transform is to allow benchmarking the performance impact of parsing messages
/// without worrying about the performance impact of other transform logic.
/// It could also be used to ensure that messages round trip correctly when parsed.
#[cfg(feature = "alpha-transforms")]
use crate::transforms::TransformConfig;
use crate::transforms::TransformContextBuilder;
#[cfg(feature = "alpha-transforms")]
use crate::transforms::TransformContextConfig;
#[cfg(feature = "alpha-transforms")]
use crate::transforms::{DownChainProtocol, UpChainProtocol};
use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::Result;
Expand All @@ -26,7 +23,6 @@ pub struct DebugForceParseConfig {
pub parse_responses: bool,
}

#[cfg(feature = "alpha-transforms")]
#[typetag::serde(name = "DebugForceParse")]
#[async_trait(?Send)]
impl TransformConfig for DebugForceParseConfig {
Expand Down Expand Up @@ -61,7 +57,6 @@ pub struct DebugForceEncodeConfig {
}

const NAME: &str = "DebugForceEncode";
#[cfg(feature = "alpha-transforms")]
#[typetag::serde(name = "DebugForceEncode")]
#[async_trait(?Send)]
impl TransformConfig for DebugForceEncodeConfig {
Expand All @@ -87,7 +82,7 @@ impl TransformConfig for DebugForceEncodeConfig {
}

#[derive(Clone)]
pub struct DebugForceParse {
struct DebugForceParse {
parse_requests: bool,
parse_responses: bool,
encode_requests: bool,
Expand Down
5 changes: 2 additions & 3 deletions shotover/src/transforms/debug/log_to_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tracing::{error, info};
pub struct DebugLogToFileConfig;

const NAME: &str = "DebugLogToFile";
#[cfg(feature = "alpha-transforms")]
#[typetag::serde(name = "DebugLogToFile")]
#[async_trait(?Send)]
impl crate::transforms::TransformConfig for DebugLogToFileConfig {
Expand All @@ -40,7 +39,7 @@ impl crate::transforms::TransformConfig for DebugLogToFileConfig {
}
}

pub struct DebugLogToFileBuilder {
struct DebugLogToFileBuilder {
connection_counter: Arc<AtomicU64>,
}

Expand Down Expand Up @@ -77,7 +76,7 @@ impl TransformBuilder for DebugLogToFileBuilder {
}
}

pub struct DebugLogToFile {
struct DebugLogToFile {
request_counter: u64,
response_counter: u64,
requests: PathBuf,
Expand Down
3 changes: 2 additions & 1 deletion shotover/src/transforms/debug/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(feature = "alpha-transforms")]
pub mod force_parse;
#[cfg(feature = "alpha-transforms")]
pub mod log_to_file;
pub mod printer;
pub mod random_delay;
pub mod returner;
4 changes: 2 additions & 2 deletions shotover/src/transforms/debug/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TransformConfig for DebugPrinterConfig {
}

#[derive(Clone)]
pub struct DebugPrinter {
pub(crate) struct DebugPrinter {
counter: i32,
}

Expand All @@ -44,7 +44,7 @@ impl Default for DebugPrinter {
}

impl DebugPrinter {
pub fn new() -> DebugPrinter {
pub(crate) fn new() -> DebugPrinter {
DebugPrinter { counter: 0 }
}
}
Expand Down
31 changes: 0 additions & 31 deletions shotover/src/transforms/debug/random_delay.rs

This file was deleted.

6 changes: 3 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl ShotoverNodeConfig {
}

#[derive(Clone)]
pub struct ShotoverNode {
struct ShotoverNode {
pub address: KafkaAddress,
pub rack: StrBytes,
pub broker_id: BrokerId,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
}
}

pub struct KafkaSinkClusterBuilder {
struct KafkaSinkClusterBuilder {
// contains address and port
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
Expand Down Expand Up @@ -265,7 +265,7 @@ impl AtomicBrokerId {
}
}

pub struct KafkaSinkCluster {
struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
rack: StrBytes,
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl TransformConfig for KafkaSinkSingleConfig {
}
}

pub struct KafkaSinkSingleBuilder {
struct KafkaSinkSingleBuilder {
// contains address and port
address_port: u16,
connect_timeout: Duration,
Expand All @@ -63,7 +63,7 @@ pub struct KafkaSinkSingleBuilder {
}

impl KafkaSinkSingleBuilder {
pub fn new(
fn new(
address_port: u16,
_chain_name: String,
connect_timeout_ms: u64,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
}
}

pub struct KafkaSinkSingle {
struct KafkaSinkSingle {
address_port: u16,
connection: Option<SinkConnection>,
connect_timeout: Duration,
Expand Down
10 changes: 5 additions & 5 deletions shotover/src/transforms/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ impl TransformConfig for ConnectionBalanceAndPoolConfig {
}
}

pub struct ConnectionBalanceAndPoolBuilder {
pub max_connections: usize,
pub all_connections: Arc<Mutex<Vec<BufferedChain>>>,
pub chain_to_clone: Arc<TransformChainBuilder>,
struct ConnectionBalanceAndPoolBuilder {
max_connections: usize,
all_connections: Arc<Mutex<Vec<BufferedChain>>>,
chain_to_clone: Arc<TransformChainBuilder>,
}

impl TransformBuilder for ConnectionBalanceAndPoolBuilder {
Expand All @@ -71,7 +71,7 @@ impl TransformBuilder for ConnectionBalanceAndPoolBuilder {

/// Every cloned instance of ConnectionBalanceAndPool will use a new connection until `max_connections` clones are made.
/// Once this happens cloned instances will reuse connections from earlier clones.
pub struct ConnectionBalanceAndPool {
struct ConnectionBalanceAndPool {
active_connection: Option<BufferedChain>,
max_connections: usize,
all_connections: Arc<Mutex<Vec<BufferedChain>>>,
Expand Down
9 changes: 3 additions & 6 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,15 @@ pub mod filter;
pub mod kafka;
pub mod load_balance;
pub mod loopback;
pub mod noop;
pub mod null;
#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))]
pub mod opensearch;
pub mod parallel_map;
#[cfg(feature = "cassandra")]
#[cfg(all(feature = "alpha-transforms", feature = "cassandra"))]
pub mod protect;
pub mod query_counter;
#[cfg(feature = "redis")]
pub mod redis;
pub mod sampler;
pub mod tee;
#[cfg(feature = "cassandra")]
pub mod throttling;
Expand Down Expand Up @@ -265,8 +263,7 @@ pub trait Transform: Send {
///
/// * Terminating specific invariants
/// + Your transform can also choose not to call `requests_wrapper.call_next_transform()` if it sends the
/// messages to an external system or generates its own response to the query e.g.
/// [`crate::transforms::cassandra::sink_single::CassandraSinkSingle`].
/// messages to an external system or generates its own response to the query e.g. `CassandraSinkSingle`.
/// + This type of transform is called a Terminating transform (as no subsequent transforms in the chain will be called).
///
/// * Request/Response invariants:
Expand All @@ -276,7 +273,7 @@ pub trait Transform: Send {
/// But it must be returned eventually over the lifetime of the transform.
/// - If a transform deletes a request it must return a simulated response message with its request_id set to the deleted request.
/// * For in order protocols: this simulated message must be in the correct location within the list of responses
/// - The best way to achieve this is storing the [`MessageId`] of the message before the deleted message.
/// - The best way to achieve this is storing the [`crate::message::MessageId`] of the message before the deleted message.
/// - If a transform introduces a new request into the requests_wrapper the response must be located and
/// removed from the list of returned responses.
/// + For in order protocols, transforms must ensure that responses are kept in the same order in which they are received.
Expand Down
31 changes: 0 additions & 31 deletions shotover/src/transforms/noop.rs

This file was deleted.

Loading

0 comments on commit aa0c320

Please sign in to comment.