Skip to content

Commit

Permalink
Revert "feat: from url and from iterator for streameruri (#28)"
Browse files Browse the repository at this point in the history
This reverts commit b824c71.
  • Loading branch information
chris-numeus committed Jul 6, 2024
1 parent f64ea68 commit a3efff8
Show file tree
Hide file tree
Showing 13 changed files with 26 additions and 82 deletions.
6 changes: 1 addition & 5 deletions sea-streamer-file/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ impl StreamerTrait for FileStreamer {

/// First check whether the file exists.
/// If not, depending on the options, either create it, or error.
async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> FileResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> FileResult<Self> {
if uri.nodes().is_empty() {
return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
}
Expand Down
6 changes: 1 addition & 5 deletions sea-streamer-kafka/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ impl Streamer for KafkaStreamer {
type ConsumerOptions = KafkaConsumerOptions;
type ProducerOptions = KafkaProducerOptions;

async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> KafkaResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> KafkaResult<Self> {
let admin = create_admin(&uri, &options).map_err(StreamErr::Backend)?;
let timeout = options.timeout().unwrap_or(Duration::from_secs(1));
spawn_blocking(move || admin.inner().fetch_cluster_id(timeout))
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-kafka/tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ async fn main() -> anyhow::Result<()> {
use sea_streamer_kafka::{AutoOffsetReset, KafkaConsumer, KafkaConsumerOptions, KafkaStreamer};
use sea_streamer_types::{
export::futures::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions, Message,
Producer, SeqPos, ShardId, StreamKey, Streamer, StreamerUri, Timestamp,
Producer, SeqPos, ShardId, StreamKey, Streamer, Timestamp,
};

let streamer = KafkaStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "localhost:9092".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
Default::default(),
)
Expand Down
6 changes: 1 addition & 5 deletions sea-streamer-redis/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ impl Streamer for RedisStreamer {
type ConsumerOptions = RedisConsumerOptions;
type ProducerOptions = RedisProducerOptions;

async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> RedisResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult<Self> {
if uri.protocol().is_none() {
return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
}
Expand Down
5 changes: 2 additions & 3 deletions sea-streamer-redis/tests/consumer-group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ async fn consumer_group() -> anyhow::Result<()> {
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
};
use sea_streamer_types::{
Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri,
Timestamp,
Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
};

const TEST: &str = "group-1";
Expand All @@ -30,7 +29,7 @@ async fn consumer_group() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
9 changes: 4 additions & 5 deletions sea-streamer-redis/tests/load-balanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn load_balance() -> anyhow::Result<()> {
use sea_streamer_runtime::{sleep, spawn_task};
use sea_streamer_types::{
export::futures::stream::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions,
Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
Message, Producer, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -39,7 +39,7 @@ async fn load_balance() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down Expand Up @@ -176,8 +176,7 @@ async fn failover() -> anyhow::Result<()> {
AutoCommit, AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
};
use sea_streamer_types::{
ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri,
Timestamp,
ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -193,7 +192,7 @@ async fn failover() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
5 changes: 2 additions & 3 deletions sea-streamer-redis/tests/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::sleep;
use sea_streamer_types::{
ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, StreamerUri,
Timestamp,
ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -30,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
8 changes: 4 additions & 4 deletions sea-streamer-redis/tests/resumable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> {
use sea_streamer_runtime::timeout;
use sea_streamer_types::{
Consumer, ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, Producer, ShardId,
StreamKey, Streamer, StreamerUri, Timestamp,
StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -37,7 +37,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down Expand Up @@ -126,7 +126,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> {
};
use sea_streamer_types::{
export::futures::StreamExt, Buffer, Consumer, ConsumerGroup, ConsumerId, ConsumerMode,
ConsumerOptions, Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
ConsumerOptions, Message, Producer, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -147,7 +147,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
5 changes: 2 additions & 3 deletions sea-streamer-redis/tests/seek-rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::{sleep, timeout};
use sea_streamer_types::{
Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer,
StreamerUri, Timestamp,
Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -35,7 +34,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-redis/tests/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async fn main() -> anyhow::Result<()> {
};
use sea_streamer_runtime::sleep;
use sea_streamer_types::{
ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri, Timestamp,
ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp,
};
use std::time::Duration;

Expand All @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
.parse::<StreamerUri>()
.parse()
.unwrap(),
options,
)
Expand Down
6 changes: 1 addition & 5 deletions sea-streamer-socket/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ impl Streamer for SeaStreamer {
type ConsumerOptions = SeaConsumerOptions;
type ProducerOptions = SeaProducerOptions;

async fn connect<S>(streamer: S, options: Self::ConnectOptions) -> SeaResult<Self>
where
S: Into<StreamerUri> + Send,
{
let uri = streamer.into();
async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> SeaResult<Self> {
let backend = match uri.protocol() {
Some(protocol) => match protocol {
#[cfg(feature = "backend-kafka")]
Expand Down
5 changes: 1 addition & 4 deletions sea-streamer-stdio/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ impl StreamerTrait for StdioStreamer {
type ProducerOptions = StdioProducerOptions;

/// Nothing will happen until you create a producer/consumer
async fn connect<S>(_: S, options: Self::ConnectOptions) -> StdioResult<Self>
where
S: Into<StreamerUri> + Send,
{
async fn connect(_: StreamerUri, options: Self::ConnectOptions) -> StdioResult<Self> {
let StdioConnectOptions { loopback } = options;
Ok(StdioStreamer { loopback })
}
Expand Down
39 changes: 3 additions & 36 deletions sea-streamer-types/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@ pub trait Streamer: Sized {
type ProducerOptions: ProducerOptions;

/// Establish a connection to the streaming server.
fn connect<S>(
streamer: S,
fn connect(
streamer: StreamerUri,
options: Self::ConnectOptions,
) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send
where
S: Into<StreamerUri> + Send;
) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send;

/// Flush and disconnect from the streaming server.
fn disconnect(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
Expand Down Expand Up @@ -100,20 +98,6 @@ impl Display for StreamerUri {
}
}

impl From<Url> for StreamerUri {
fn from(value: Url) -> Self {
Self { nodes: vec![value] }
}
}

impl FromIterator<Url> for StreamerUri {
fn from_iter<T: IntoIterator<Item = Url>>(iter: T) -> Self {
Self {
nodes: iter.into_iter().collect(),
}
}
}

impl StreamerUri {
pub fn zero() -> Self {
Self { nodes: Vec::new() }
Expand Down Expand Up @@ -363,23 +347,6 @@ mod test {
assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]);
}

#[test]
fn test_into_streamer_uri() {
let url: Url = "proto://sea-ql.org:1234".parse().unwrap();
let uri: StreamerUri = url.clone().into();
assert!(uri.nodes.len() == 1);
assert_eq!(url, uri.nodes.first().unwrap().clone());

let urls: [Url; 3] = [
"proto://sea-ql.org:1".parse().unwrap(),
"proto://sea-ql.org:2".parse().unwrap(),
"proto://sea-ql.org:3".parse().unwrap(),
];
let uri: StreamerUri = StreamerUri::from_iter(urls.clone().into_iter());
assert!(uri.nodes.len() == 3);
assert!(uri.nodes.iter().eq(urls.iter()));
}

#[test]
fn test_parse_stream_url_err() {
use crate::StreamKeyErr;
Expand Down

0 comments on commit a3efff8

Please sign in to comment.