From 1262e36799b280940649afd274ac15781034b477 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Tue, 9 Apr 2024 14:37:56 +0200 Subject: [PATCH] Some minor API improvements Signed-off-by: Heinz N. Gies --- .github/scripts/install-docker-images.sh | 7 -- Cargo.lock | 80 +++++++++---------- tremor-connectors-aws/Cargo.toml | 2 +- tremor-connectors-aws/tests/aws/streamer.rs | 2 +- tremor-connectors-gcp/Cargo.toml | 3 +- tremor-connectors-gcp/tests/gcp/gpub.rs | 4 +- tremor-connectors/src/harness.rs | 17 +++- tremor-connectors/src/impls/dns/client.rs | 1 - tremor-connectors/src/impls/http/client.rs | 1 - tremor-connectors/src/impls/ws/client.rs | 3 +- tremor-connectors/src/impls/ws/server.rs | 2 +- tremor-connectors/tests/bench.rs | 5 +- .../tests/clickhouse/more_complex_test.rs | 2 +- .../tests/clickhouse/simple_test.rs | 2 +- tremor-connectors/tests/elastic.rs | 14 ++-- tremor-connectors/tests/http/client.rs | 4 +- tremor-connectors/tests/http/server.rs | 5 +- tremor-connectors/tests/kafka/producer.rs | 6 +- tremor-connectors/tests/tcp/client.rs | 7 +- tremor-connectors/tests/tcp/server.rs | 5 +- tremor-connectors/tests/udp.rs | 5 +- tremor-connectors/tests/unix_socket.rs | 9 +-- tremor-connectors/tests/wal.rs | 4 +- tremor-connectors/tests/ws.rs | 10 +-- 24 files changed, 95 insertions(+), 105 deletions(-) delete mode 100755 .github/scripts/install-docker-images.sh diff --git a/.github/scripts/install-docker-images.sh b/.github/scripts/install-docker-images.sh deleted file mode 100755 index 0b5ec8f354..0000000000 --- a/.github/scripts/install-docker-images.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -for test in elastic kafka aws; do - image=$(grep "IMAGE:" tremor-connectors/tests/$test.rs | sed -e 's/.*"\(.*\)".*/\1/g') - version=$(grep "VERSION:" tremor-connectors/tests/$test.rs | sed -e 's/.*"\(.*\)".*/\1/g') - docker pull $image:$version -done \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index aec313d08c..026b7fb29a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,7 +617,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.6.0", + "bytes", "fastrand 2.0.2", "http 0.2.12", "hyper", @@ -653,7 +653,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.6.0", + "bytes", "fastrand 2.0.2", "http 0.2.12", "http-body 0.4.6", @@ -683,7 +683,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.6.0", + "bytes", "fastrand 2.0.2", "hex", "hmac 0.12.1", @@ -732,7 +732,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.6.0", + "bytes", "crypto-bigint 0.5.5", "form_urlencoded", "hex", @@ -769,7 +769,7 @@ checksum = "83fa43bc04a6b2441968faeab56e68da3812f978a670a5db32accbdcafddd12f" dependencies = [ "aws-smithy-http", "aws-smithy-types", - "bytes 1.6.0", + "bytes", "crc32c", "crc32fast", "hex", @@ -789,7 +789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" dependencies = [ "aws-smithy-types", - "bytes 1.6.0", + "bytes", "crc32fast", ] @@ -802,7 +802,7 @@ dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.6.0", + "bytes", "bytes-utils", "futures-core", "http 0.2.12", @@ -843,7 +843,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.6.0", + "bytes", "fastrand 2.0.2", "h2", "http 0.2.12", @@ -867,7 +867,7 @@ checksum = "ccb2b3a7030dc9a3c9a08ce0b25decea5130e9db19619d4dffbbff34f75fe850" dependencies = [ "aws-smithy-async", "aws-smithy-types", - "bytes 1.6.0", + "bytes", "http 0.2.12", "http 1.1.0", "pin-project-lite 0.2.14", @@ -883,7 +883,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abe14dceea1e70101d38fbf2a99e6a34159477c0fb95e68e05c66bd7ae4c3729" dependencies = [ "base64-simd", - "bytes 1.6.0", + "bytes", "bytes-utils", "futures-core", "http 0.2.12", @@ -1105,12 +1105,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - [[package]] name = "bytes" version = "1.6.0" @@ -1123,7 +1117,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" dependencies = [ - "bytes 1.6.0", + "bytes", "either", ] @@ -1414,7 +1408,7 @@ version = "4.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" dependencies = [ - "bytes 1.6.0", + "bytes", "memchr", ] @@ -1982,7 +1976,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6" dependencies = [ "base64 0.11.0", - "bytes 1.6.0", + "bytes", "dyn-clone", "lazy_static", "percent-encoding", @@ -2582,7 +2576,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ - "bytes 1.6.0", + "bytes", "fnv", "futures-core", "futures-sink", @@ -2752,7 +2746,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ - "bytes 1.6.0", + "bytes", "fnv", "itoa 1.0.11", ] @@ -2763,7 +2757,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ - "bytes 1.6.0", + "bytes", "fnv", "itoa 1.0.11", ] @@ -2774,7 +2768,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ - "bytes 1.6.0", + "bytes", "http 0.2.12", "pin-project-lite 0.2.14", ] @@ -2785,7 +2779,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ - "bytes 1.6.0", + "bytes", "http 1.1.0", ] @@ -2795,7 +2789,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ - "bytes 1.6.0", + "bytes", "futures-core", "http 1.1.0", "http-body 1.0.0", @@ -2867,7 +2861,7 @@ version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ - "bytes 1.6.0", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -2919,7 +2913,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.6.0", + "bytes", "hyper", "native-tls", "tokio", @@ -4227,7 +4221,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ - "bytes 1.6.0", + "bytes", "prost-derive", ] @@ -4237,7 +4231,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ - "bytes 1.6.0", + "bytes", "heck 0.3.3", "itertools 0.10.5", "lazy_static", @@ -4270,7 +4264,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ - "bytes 1.6.0", + "bytes", "prost", ] @@ -4539,7 +4533,7 @@ checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ "async-compression", "base64 0.21.7", - "bytes 1.6.0", + "bytes", "encoding_rs", "futures-core", "futures-util", @@ -5118,7 +5112,7 @@ dependencies = [ "async-tungstenite", "base64 0.21.7", "bitflags 1.3.2", - "bytes 1.6.0", + "bytes", "cfg-if", "dashmap", "flate2", @@ -6002,7 +5996,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", - "bytes 1.6.0", + "bytes", "libc", "mio", "num_cpus", @@ -6106,7 +6100,7 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ - "bytes 1.6.0", + "bytes", "futures-core", "futures-sink", "log", @@ -6120,7 +6114,7 @@ version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ - "bytes 1.6.0", + "bytes", "futures-core", "futures-sink", "pin-project-lite 0.2.14", @@ -6154,7 +6148,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.13.1", - "bytes 1.6.0", + "bytes", "futures-core", "futures-util", "h2", @@ -6419,7 +6413,7 @@ dependencies = [ "base64 0.21.7", "beef", "bimap", - "bytes 1.6.0", + "bytes", "chrono", "chrono-tz", "clickhouse-rs", @@ -6494,12 +6488,12 @@ name = "tremor-connectors-aws" version = "0.1.0" dependencies = [ "anyhow", - "async-channel 1.9.0", + "async-channel 2.2.0", "async-trait", "aws-config", "aws-sdk-s3", "aws-types", - "bytes 1.6.0", + "bytes", "log", "rand 0.8.5", "serde", @@ -6524,7 +6518,7 @@ dependencies = [ "async-channel 1.9.0", "async-stream", "async-trait", - "bytes 0.5.6", + "bytes", "env_logger 0.11.3", "futures", "googapis", @@ -6574,7 +6568,7 @@ version = "0.13.0-rc.16" dependencies = [ "anyhow", "byteorder", - "bytes 1.6.0", + "bytes", "error-chain", "libflate", "log", @@ -6837,7 +6831,7 @@ checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ "base64 0.13.1", "byteorder", - "bytes 1.6.0", + "bytes", "http 0.2.12", "httparse", "log", @@ -6857,7 +6851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" dependencies = [ "byteorder", - "bytes 1.6.0", + "bytes", "data-encoding", "http 0.2.12", "httparse", diff --git a/tremor-connectors-aws/Cargo.toml b/tremor-connectors-aws/Cargo.toml index aae5450a0b..cc2f11025f 100644 --- a/tremor-connectors-aws/Cargo.toml +++ b/tremor-connectors-aws/Cargo.toml @@ -18,7 +18,7 @@ tremor-config = { path = "../tremor-config" } async-trait = { version = "0.1", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive"] } anyhow = { version = "1", default-features = false } -async-channel = { version = "1.9", default-features = false } +async-channel = { version = "2", default-features = false } thiserror = { version = "1", default-features = false } tokio = { version = "1.34", default-features = false } log = { version = "0.4", default-features = false } diff --git a/tremor-connectors-aws/tests/aws/streamer.rs b/tremor-connectors-aws/tests/aws/streamer.rs index 9845e5c6b4..763c8a98a0 100644 --- a/tremor-connectors-aws/tests/aws/streamer.rs +++ b/tremor-connectors-aws/tests/aws/streamer.rs @@ -255,7 +255,7 @@ async fn connector_s3_consistent() -> anyhow::Result<()> { } async fn send_to_sink(harness: &Harness, event: &Event) -> anyhow::Result<()> { - harness.send_to_sink(event.clone(), IN).await?; + harness.send_to_sink(event.clone()).await?; Ok(()) } diff --git a/tremor-connectors-gcp/Cargo.toml b/tremor-connectors-gcp/Cargo.toml index c82165d289..4a39060dc4 100644 --- a/tremor-connectors-gcp/Cargo.toml +++ b/tremor-connectors-gcp/Cargo.toml @@ -25,8 +25,7 @@ log = { version = "0.4", default-features = true } serde = { version = "1.0", default-features = true, features = ["derive"] } thiserror = { version = "1", default-features = true } -bytes = { version = "0.5", default-features = true } - +bytes = { version = "1.6", default-features = true } googapis = { version = "0.6", default-features = true, features = [ "google-pubsub-v1", "google-cloud-bigquery-storage-v1", diff --git a/tremor-connectors-gcp/tests/gcp/gpub.rs b/tremor-connectors-gcp/tests/gcp/gpub.rs index fcb8637cc3..3eaca96dc4 100644 --- a/tremor-connectors-gcp/tests/gcp/gpub.rs +++ b/tremor-connectors-gcp/tests/gcp/gpub.rs @@ -150,7 +150,7 @@ async fn simple_publish() -> anyhow::Result<()> { data: (Value::String(format!("Event {i}").into()), literal!({})).into(), ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; } let mut received_messages = HashSet::new(); @@ -257,7 +257,7 @@ async fn simple_publish_with_timeout() -> anyhow::Result<()> { data: (Value::from("Event X"), literal!({})).into(), ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; timeout( Duration::from_secs(10), harness.wait_for_state(State::Failed), diff --git a/tremor-connectors/src/harness.rs b/tremor-connectors/src/harness.rs index f250d6b3f4..9e9c6f30d1 100644 --- a/tremor-connectors/src/harness.rs +++ b/tremor-connectors/src/harness.rs @@ -314,16 +314,29 @@ impl Harness { self.get_pipe(ERR) } - /// Send an event to the connector + /// Send an event to the connector to a specific port /// # Errors /// If the event could not be sent - pub async fn send_to_sink(&self, event: Event, port: Port<'static>) -> anyhow::Result<()> { + pub async fn send_to_sink_port(&self, event: Event, port: Port<'static>) -> anyhow::Result<()> { Ok(self .addr .send_sink(sink::Msg::Event { event, port }) .await?) } + /// Send an event to the connector to the `IN` port + /// # Errors + /// If the event could not be sent + pub async fn send_to_sink(&self, event: Event) -> anyhow::Result<()> { + Ok(self + .addr + .send_sink(sink::Msg::Event { + event, + port: Port::In, + }) + .await?) + } + /// sends a message to the source /// # Errors /// If the message could not be sent diff --git a/tremor-connectors/src/impls/dns/client.rs b/tremor-connectors/src/impls/dns/client.rs index 6c9d796c58..31837cae31 100644 --- a/tremor-connectors/src/impls/dns/client.rs +++ b/tremor-connectors/src/impls/dns/client.rs @@ -22,7 +22,6 @@ use std::sync::{ }; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tremor_common::ports::{ERR, OUT}; -use tremor_script::EventPayload; use tremor_system::event::DEFAULT_STREAM_ID; use tremor_value::prelude::*; use trust_dns_resolver::{ diff --git a/tremor-connectors/src/impls/http/client.rs b/tremor-connectors/src/impls/http/client.rs index 60fe9acb81..19968d7370 100644 --- a/tremor-connectors/src/impls/http/client.rs +++ b/tremor-connectors/src/impls/http/client.rs @@ -44,7 +44,6 @@ use tokio::{ time::timeout, }; use tremor_common::{time::nanotime, url::Url}; -use tremor_system::qsize; use tremor_value::prelude::*; // pipeline -> Sink -> http client diff --git a/tremor-connectors/src/impls/ws/client.rs b/tremor-connectors/src/impls/ws/client.rs index a434fc9b8e..6c49617a40 100644 --- a/tremor-connectors/src/impls/ws/client.rs +++ b/tremor-connectors/src/impls/ws/client.rs @@ -38,8 +38,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_rustls::TlsConnector; use tokio_tungstenite::client_async; use tremor_common::url::Url; -use tremor_script::EventOriginUri; -use tremor_system::{event::DEFAULT_STREAM_ID, qsize}; +use tremor_system::event::DEFAULT_STREAM_ID; use tremor_value::literal; const URL_SCHEME: &str = "tremor-ws-client"; diff --git a/tremor-connectors/src/impls/ws/server.rs b/tremor-connectors/src/impls/ws/server.rs index c563c95cb9..5c5ed678c8 100644 --- a/tremor-connectors/src/impls/ws/server.rs +++ b/tremor-connectors/src/impls/ws/server.rs @@ -28,7 +28,7 @@ use crate::{ tls::TLSServerConfig, ConnectionMeta, }, - ConnectorType, StreamIdGen, ACCEPT_TIMEOUT, + StreamIdGen, ACCEPT_TIMEOUT, }; use futures::StreamExt; diff --git a/tremor-connectors/tests/bench.rs b/tremor-connectors/tests/bench.rs index 71f6571ef1..9113c9b5c7 100644 --- a/tremor-connectors/tests/bench.rs +++ b/tremor-connectors/tests/bench.rs @@ -16,7 +16,6 @@ use log::{error, info}; use std::{io::Write, time::Duration}; use tempfile::NamedTempFile; use tokio::{sync::mpsc::channel, time::timeout}; -use tremor_common::ports::IN; use tremor_connectors::{harness::Harness, impls::bench}; use tremor_system::killswitch::KillSwitch; use tremor_value::prelude::*; @@ -51,7 +50,7 @@ async fn stop_after_events() -> anyhow::Result<()> { // echo pipeline for _ in 0..6 { let event = harness.out()?.get_event().await?; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; } anyhow::Ok(()) }); @@ -95,7 +94,7 @@ async fn stop_after_secs() -> anyhow::Result<()> { Ok(r) => r, Err(e) => return anyhow::Result::<()>::Err(e), }; - if let Err(e) = harness.send_to_sink(event, IN).await { + if let Err(e) = harness.send_to_sink(event).await { error!("Error sending event to sink: {e}"); } } diff --git a/tremor-connectors/tests/clickhouse/more_complex_test.rs b/tremor-connectors/tests/clickhouse/more_complex_test.rs index e556a42e9b..f0c73ea849 100644 --- a/tremor-connectors/tests/clickhouse/more_complex_test.rs +++ b/tremor-connectors/tests/clickhouse/more_complex_test.rs @@ -269,7 +269,7 @@ async fn test() -> Result<()> { ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; // Once the data has been inserted, we wait for the "I handled everything" // signal and check its properties. diff --git a/tremor-connectors/tests/clickhouse/simple_test.rs b/tremor-connectors/tests/clickhouse/simple_test.rs index eaf8c63814..89600f423e 100644 --- a/tremor-connectors/tests/clickhouse/simple_test.rs +++ b/tremor-connectors/tests/clickhouse/simple_test.rs @@ -112,7 +112,7 @@ async fn simple_insertion() -> anyhow::Result<()> { ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; // Once the data has been inserted, we wait for the "I handled everything" // signal and check its properties. diff --git a/tremor-connectors/tests/elastic.rs b/tremor-connectors/tests/elastic.rs index 954071d25a..d4f642b087 100644 --- a/tremor-connectors/tests/elastic.rs +++ b/tremor-connectors/tests/elastic.rs @@ -123,7 +123,7 @@ async fn connector_elastic() -> anyhow::Result<()> { transactional: false, ..Event::default() }; - harness.send_to_sink(event_not_batched, IN).await?; + harness.send_to_sink(event_not_batched).await?; let err_events = harness.err()?.get_events(); assert!(err_events.is_empty(), "Received err msgs: {err_events:?}"); let event = harness.out()?.get_event().await?; @@ -189,7 +189,7 @@ async fn connector_elastic() -> anyhow::Result<()> { transactional: false, ..Event::default() }; - harness.send_to_sink(event_not_batched, IN).await?; + harness.send_to_sink(event_not_batched).await?; let err_events = harness.err()?.get_events(); assert!(err_events.is_empty(), "Received err msgs: {err_events:?}"); let event = harness.out()?.get_event().await?; @@ -280,7 +280,7 @@ async fn connector_elastic() -> anyhow::Result<()> { data: (batched_data, batched_meta).into(), ..Event::default() }; - harness.send_to_sink(event_batched, IN).await?; + harness.send_to_sink(event_batched).await?; let out_event1 = harness.out()?.get_event().await?; let meta = out_event1.data.suffix().meta().clone_static(); @@ -387,7 +387,7 @@ async fn connector_elastic() -> anyhow::Result<()> { data: (update_data, update_meta).into(), ..Event::default() }; - harness.send_to_sink(update_event, IN).await?; + harness.send_to_sink(update_event).await?; let out_event = harness.out()?.get_event().await?; let meta = out_event.data.suffix().meta(); assert_eq!( @@ -442,7 +442,7 @@ async fn connector_elastic() -> anyhow::Result<()> { .into(), ..Event::default() }; - harness.send_to_sink(event.clone(), IN).await?; + harness.send_to_sink(event.clone()).await?; let cf = harness.get_pipe(IN)?.get_contraflow().await?; assert_eq!(CbAction::Fail, cf.cb); let err_event = harness.err()?.get_event().await?; @@ -587,7 +587,7 @@ async fn elastic_routing() -> anyhow::Result<()> { data: (batched_data, batched_meta).into(), ..Event::default() }; - harness.send_to_sink(event_batched, IN).await?; + harness.send_to_sink(event_batched).await?; let out_event1 = harness.out()?.get_event().await?; let meta = out_event1.data.suffix().meta(); assert_eq!( @@ -1223,7 +1223,7 @@ async fn send_one_event(harness: &mut Harness) -> anyhow::Result<()> { data: (data, meta).into(), ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; let success_event = harness .out() .expect("NO pipeline connected to port OUT") diff --git a/tremor-connectors/tests/http/client.rs b/tremor-connectors/tests/http/client.rs index d0c1ea2417..0d6999e7ff 100644 --- a/tremor-connectors/tests/http/client.rs +++ b/tremor-connectors/tests/http/client.rs @@ -26,7 +26,7 @@ use std::{ }; use tokio::task::{spawn, JoinHandle}; use tremor_common::url::HttpDefaults; -use tremor_common::{ports::IN, url::Url}; +use tremor_common::url::Url; use tremor_connectors::{ harness::Harness, impls::http::{self as http_impl, meta::content_type}, @@ -158,7 +158,7 @@ async fn rtt( harness.start().await?; harness.wait_for_connected().await?; harness.consume_initial_sink_contraflow().await?; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; let event = harness.out()?.get_event().await; let event = match event { Ok(event) => event, diff --git a/tremor-connectors/tests/http/server.rs b/tremor-connectors/tests/http/server.rs index 94d245705e..c1b6d5f2c6 100644 --- a/tremor-connectors/tests/http/server.rs +++ b/tremor-connectors/tests/http/server.rs @@ -21,7 +21,6 @@ use std::{ time::{Duration, Instant}, }; use tokio::time::timeout; -use tremor_common::ports::IN; use tremor_connectors::{ harness::Harness, impls::http::{meta::content_type, server}, @@ -60,7 +59,7 @@ where ..Event::default() }; // pass the processed event to the sink - connector.send_to_sink(event, IN).await?; + connector.send_to_sink(event).await?; } let (_out, _err) = connector.stop().await?; @@ -371,7 +370,7 @@ async fn https_server_test() -> Result<()> { data: (value, meta).into(), ..Event::default() }; - connector.send_to_sink(event, IN).await?; + connector.send_to_sink(event).await?; } anyhow::Ok(()) }); diff --git a/tremor-connectors/tests/kafka/producer.rs b/tremor-connectors/tests/kafka/producer.rs index 6862025930..2685cf9752 100644 --- a/tremor-connectors/tests/kafka/producer.rs +++ b/tremor-connectors/tests/kafka/producer.rs @@ -128,7 +128,7 @@ async fn connector_kafka_producer() -> anyhow::Result<()> { transactional: false, ..Event::default() }; - harness.send_to_sink(e1, IN).await?; + harness.send_to_sink(e1).await?; match timeout(Duration::from_secs(30), message_stream.next()) // first message, we might need to wait a little longer for the consumer to boot up and settle things with redpanda .await? { @@ -165,7 +165,7 @@ async fn connector_kafka_producer() -> anyhow::Result<()> { transactional: true, ..Event::default() }; - harness.send_to_sink(e2, IN).await?; + harness.send_to_sink(e2).await?; match timeout(Duration::from_secs(5), message_stream.next()).await? { Some(Ok(msg)) => { assert_eq!(Some("badger".as_bytes()), msg.key()); @@ -218,7 +218,7 @@ async fn connector_kafka_producer() -> anyhow::Result<()> { is_batch: true, ..Event::default() }; - harness.send_to_sink(batched_event, IN).await?; + harness.send_to_sink(batched_event).await?; let borrowed_batchman_msg = timeout(Duration::from_secs(2), message_stream.next()) .await? .expect("timeout waiting for batchman message") diff --git a/tremor-connectors/tests/tcp/client.rs b/tremor-connectors/tests/tcp/client.rs index ad981643d6..9c00919794 100644 --- a/tremor-connectors/tests/tcp/client.rs +++ b/tremor-connectors/tests/tcp/client.rs @@ -15,7 +15,6 @@ use crate::EchoServer; use std::time::Duration; use tokio::net::lookup_host; -use tremor_common::ports::IN; use tremor_connectors::{ harness::Harness, impls::tcp, @@ -88,7 +87,7 @@ async fn tcp_client_test(use_tls: bool) -> anyhow::Result<()> { transactional: true, ..Event::default() }; - connector.send_to_sink(event, IN).await?; + connector.send_to_sink(event).await?; let response = connector.out()?.get_event().await?; let localhost_ip = lookup_host(("localhost", 0)) .await? @@ -126,11 +125,11 @@ async fn tcp_client_test(use_tls: bool) -> anyhow::Result<()> { transactional: true, ..Event::default() }; - connector.send_to_sink(event.clone(), IN).await?; + connector.send_to_sink(event.clone()).await?; let mut cf = connector.get_pipe("in")?.get_contraflow().await?; while matches!(cf.cb, CbAction::Ack) { tokio::time::sleep(Duration::from_millis(100)).await; - connector.send_to_sink(event.clone(), IN).await?; + connector.send_to_sink(event.clone()).await?; cf = connector.get_pipe("in")?.get_contraflow().await?; } assert_eq!(CbAction::Fail, cf.cb); diff --git a/tremor-connectors/tests/tcp/server.rs b/tremor-connectors/tests/tcp/server.rs index c4c3d7b511..59989bf70f 100644 --- a/tremor-connectors/tests/tcp/server.rs +++ b/tremor-connectors/tests/tcp/server.rs @@ -20,7 +20,6 @@ use tokio::{ net::TcpStream, time::timeout, }; -use tremor_common::ports::IN; use tremor_connectors::{harness::Harness, impls::tcp, utils::integration::free_port}; use tremor_system::event::{Event, EventId}; use tremor_value::{literal, prelude::*, Value}; @@ -69,7 +68,7 @@ async fn server_event_routing() -> anyhow::Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(event1, IN).await?; + harness.send_to_sink(event1).await?; let mut buf = vec![0_u8; 8192]; let bytes_read = timeout(Duration::from_secs(2), socket1.read(&mut buf)).await??; let data = &buf[0..bytes_read]; @@ -89,7 +88,7 @@ async fn server_event_routing() -> anyhow::Result<()> { ..Event::default() }; - harness.send_to_sink(event2, IN).await?; + harness.send_to_sink(event2).await?; let bytes_read = timeout(Duration::from_secs(5), socket2.read(&mut buf)).await??; let data = &buf[0..bytes_read]; assert_eq!("fleek", &String::from_utf8_lossy(data)); diff --git a/tremor-connectors/tests/udp.rs b/tremor-connectors/tests/udp.rs index 1786f7b55f..e5695e0947 100644 --- a/tremor-connectors/tests/udp.rs +++ b/tremor-connectors/tests/udp.rs @@ -13,7 +13,6 @@ // limitations under the License. #![cfg(feature = "integration-tests-udp")] -use tremor_common::ports::IN; use tremor_connectors::harness::Harness; use tremor_connectors::impls::udp; use tremor_system::event::Event; @@ -49,7 +48,7 @@ async fn udp_no_bind() -> anyhow::Result<()> { data: (Value::String("badger".into()), literal!({})).into(), ..Event::default() }; - client_harness.send_to_sink(event1, IN).await?; + client_harness.send_to_sink(event1).await?; // send something to socket 2 let server_event = server_harness.out()?.get_event().await?; // send an event and route it via eventid to socket 2 @@ -95,7 +94,7 @@ async fn udp_bind() -> anyhow::Result<()> { data: (Value::String("badger".into()), literal!({})).into(), ..Event::default() }; - client_harness.send_to_sink(event1, IN).await?; + client_harness.send_to_sink(event1).await?; // send something to socket 2 let server_event = server_harness.out()?.get_event().await?; // send an event and route it via eventid to socket 2 diff --git a/tremor-connectors/tests/unix_socket.rs b/tremor-connectors/tests/unix_socket.rs index e86a2dc84a..3de47b2542 100644 --- a/tremor-connectors/tests/unix_socket.rs +++ b/tremor-connectors/tests/unix_socket.rs @@ -19,11 +19,10 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::UnixStream, }; -use tremor_common::ports::IN; use tremor_connectors::harness::Harness; use tremor_connectors::impls::unix_socket; use tremor_system::event::{Event, EventId}; -use tremor_value::{literal, prelude::*, Value}; +use tremor_value::prelude::*; #[tokio::test(flavor = "multi_thread")] async fn unix_socket() -> anyhow::Result<()> { @@ -92,7 +91,7 @@ async fn unix_socket() -> anyhow::Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - server_harness.send_to_sink(event1, IN).await?; + server_harness.send_to_sink(event1).await?; let mut buf = vec![0_u8; 8192]; let bytes_read = socket1.read(&mut buf).await?; @@ -108,7 +107,7 @@ async fn unix_socket() -> anyhow::Result<()> { ..Event::default() }; - client_harness.send_to_sink(event, IN).await?; + client_harness.send_to_sink(event).await?; // send something to socket 2 let server_event = server_harness.out()?.get_event().await?; // send an event and route it via eventid to socket 2 @@ -119,7 +118,7 @@ async fn unix_socket() -> anyhow::Result<()> { data: (Value::String("fleek".into()), Value::object()).into(), ..Event::default() }; - server_harness.send_to_sink(event2, IN).await?; + server_harness.send_to_sink(event2).await?; let client_event = client_harness.out()?.get_event().await?; assert_eq!("fleek", client_event.data.parts().0.to_string()); debug!("Received event 2 via client"); diff --git a/tremor-connectors/tests/wal.rs b/tremor-connectors/tests/wal.rs index debe1df7f7..383a6101a0 100644 --- a/tremor-connectors/tests/wal.rs +++ b/tremor-connectors/tests/wal.rs @@ -54,7 +54,7 @@ async fn wal() -> anyhow::Result<()> { transactional: false, ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; let event = harness.out()?.get_event().await?; // event is now transactional let ack_id = event.id.clone(); @@ -69,7 +69,7 @@ async fn wal() -> anyhow::Result<()> { transactional: true, ..Event::default() }; - harness.send_to_sink(event, IN).await?; + harness.send_to_sink(event).await?; // check that we got an ack for the event let cf = harness.get_pipe(IN)?.get_contraflow().await?; diff --git a/tremor-connectors/tests/ws.rs b/tremor-connectors/tests/ws.rs index 260103ba59..b4c2ac8e10 100644 --- a/tremor-connectors/tests/ws.rs +++ b/tremor-connectors/tests/ws.rs @@ -347,7 +347,7 @@ async fn ws_server_text_routing() -> Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(echo_back, IN).await?; + harness.send_to_sink(echo_back).await?; assert_eq!(ExpectMessage::Text("\"badger\"".into()), c1.expect()?); //cleanup @@ -390,7 +390,7 @@ async fn ws_client_binary_routing() -> Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(echo_back, IN).await?; + harness.send_to_sink(echo_back).await?; let data: Vec = br#""badger""#.to_vec(); assert_eq!(ExpectMessage::Binary(data), ts.expect()?); @@ -434,7 +434,7 @@ async fn ws_client_text_routing() -> Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(echo_back, IN).await?; + harness.send_to_sink(echo_back).await?; assert_eq!(ExpectMessage::Text(r#""badger""#.to_string()), ts.expect()?); @@ -516,7 +516,7 @@ async fn wss_server_text_routing() -> Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(echo_back, IN).await?; + harness.send_to_sink(echo_back).await?; assert_eq!( ExpectMessage::Text(r#""badger""#.to_string()), c1.expect().await? @@ -601,7 +601,7 @@ async fn wss_server_binary_routing() -> Result<()> { data: (Value::String("badger".into()), meta).into(), ..Event::default() }; - harness.send_to_sink(echo_back, IN).await?; + harness.send_to_sink(echo_back).await?; let data = br#""badger""#.to_vec(); assert_eq!(ExpectMessage::Binary(data), c1.expect().await?);