-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Client method returning a stream of rows #40
base: master
Are you sure you want to change the base?
Conversation
struct MyStream<'a, T: Presto + 'static> { | ||
inner: QueryInternal<'a, T>, | ||
phantom: std::marker::PhantomData<T>, | ||
} | ||
|
||
enum QueryInternal<'a, T: Presto + 'static> { | ||
Query { | ||
sql: String, | ||
client: &'a Client, | ||
}, | ||
Next { | ||
already: Vec<T>, | ||
next: Option<String>, | ||
client: &'a Client, | ||
}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge MyStream
and QueryInternal
into something like QueryStream
.
I can't see why a wrapper is necessary.
match self.inner { | ||
QueryInternal::Query { sql, client } => { | ||
let res = client.get_retry::<T>(sql).await?; | ||
let next = res.next_uri.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clone
seems unnecessary
))) | ||
} else if let Some(url) = next { | ||
let res = client.get_next_retry::<T>(&url).await?; | ||
let next = res.next_uri.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clone seems unnecessary
) -> impl futures::stream::Stream<Item = Result<T>> + std::marker::Unpin + '_ { | ||
let stream = MyStream { | ||
inner: QueryInternal::Query { sql, client: self }, | ||
phantom: std::marker::PhantomData, | ||
}; | ||
Box::pin(futures::stream::try_unfold(stream, MyStream::next_and_self)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should remove std::marker::Unpin
on the return type and let user decide which pin strategy to use, e.g. pin!
macro,Box::pin
.
Hi, thanks for the contribution. I have left some comments, please take a look. |
This is a convenience so users can just iterate over a stream of rows. It also opens up the possibility of using
TryStream
combinators, although honestly I haven't found those all that useful.