From 9b5735638aeb7ffe52e59062943c59fa4852ebd3 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 16 Sep 2021 17:23:34 -0500 Subject: [PATCH 1/3] Delegate Future implementation to Next struct Previously, reading values out of Subscriber asynchronously would involve awaiting an &mut Subscriber, which causes re-polling the same future after it yields a value. This is considered 'improper' for a Future, and more closely maps to the Stream trait. Unfortunately, Stream has not been standardized and so implementing that trait would require pulling in the 'futures' dependency, which is not yet 1.0 and subject to change. Instead, this implements a Stream-like poll_next method on Subscriber, and exposes that from an implementation of Future for a new Next<'_> type, which can be constructed by calling '.next()' on a Subscriber. This allows for a more traditional interaction with futures that mimics how other libraries produce values. Note that it is still possible to indefinitely poll on an &mut Next<'_> type to pull every value out of the subscriber, but it is less intuitive to perform that operation and it likely won't occur in downstream projects. --- src/subscriber.rs | 100 +++++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 42 deletions(-) diff --git a/src/subscriber.rs b/src/subscriber.rs index 49d6aae23..5912afaa2 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -107,9 +107,9 @@ type Senders = Map, SyncSender>>)>; /// ``` /// Aynchronous, non-blocking subscriber: /// -/// `Subscription` implements `Future>`. +/// `Subscription` provides a `next` method which returns an `impl Future>`. /// -/// `while let Some(event) = (&mut subscriber).await { /* use it */ }` +/// `while let Some(event) = subscriber.next().await { /* use it */ }` pub struct Subscriber { id: usize, rx: Receiver>>, @@ -125,6 +125,44 @@ impl Drop for Subscriber { } impl Subscriber { + /// Creates a future that resolves to the next value of the + /// subscriber, or None if the backing `Db` shuts down + pub fn next(&mut self) -> impl Future> + '_ { + Next { subscriber: self } + } + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + match self.rx.try_recv() { + Ok(future_rx) => future_rx, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Poll::Ready(None) + } + } + }; + + match Future::poll(Pin::new(&mut future_rx), cx) { + Poll::Ready(Some(event)) => return Poll::Ready(event), + Poll::Ready(None) => continue, + Poll::Pending => { + self.existing = Some(future_rx); + return Poll::Pending; + } + } + } + let mut home = self.home.write(); + let entry = home.get_mut(&self.id).unwrap(); + entry.0 = Some(cx.waker().clone()); + Poll::Pending + } + /// Attempts to wait for a value on this `Subscriber`, returning /// an error if no event arrives within the provided `Duration` /// or if the backing `Db` shuts down. @@ -165,42 +203,6 @@ impl Subscriber { } } -impl Future for Subscriber { - type Output = Option; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - loop { - let mut future_rx = if let Some(future_rx) = self.existing.take() { - future_rx - } else { - match self.rx.try_recv() { - Ok(future_rx) => future_rx, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - return Poll::Ready(None) - } - } - }; - - match Future::poll(Pin::new(&mut future_rx), cx) { - Poll::Ready(Some(event)) => return Poll::Ready(event), - Poll::Ready(None) => continue, - Poll::Pending => { - self.existing = Some(future_rx); - return Poll::Pending; - } - } - } - let mut home = self.home.write(); - let entry = home.get_mut(&self.id).unwrap(); - entry.0 = Some(cx.waker().clone()); - Poll::Pending - } -} - impl Iterator for Subscriber { type Item = Event; @@ -216,6 +218,23 @@ impl Iterator for Subscriber { } } +#[doc(hidden)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +struct Next<'a> { + subscriber: &'a mut Subscriber, +} + +impl<'a> Future for Next<'a> { + type Output = Option; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + Pin::new(&mut *self.subscriber).poll_next(cx) + } +} + #[derive(Debug, Default)] pub(crate) struct Subscribers { watched: RwLock, Arc>>>, @@ -239,10 +258,7 @@ impl Drop for Subscribers { } impl Subscribers { - pub(crate) fn register( - &self, - prefix: &[u8] - ) -> Subscriber { + pub(crate) fn register(&self, prefix: &[u8]) -> Subscriber { self.ever_used.store(true, Relaxed); let r_mu = { let r_mu = self.watched.read(); From da7eb266f7027e1fc0cacc957bade6a8f346dd68 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 16 Sep 2021 17:41:52 -0500 Subject: [PATCH 2/3] Rename .next to .next_event to avoid conflict with Iterator --- src/subscriber.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/subscriber.rs b/src/subscriber.rs index 5912afaa2..e34ca9f5d 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -109,7 +109,7 @@ type Senders = Map, SyncSender>>)>; /// /// `Subscription` provides a `next` method which returns an `impl Future>`. /// -/// `while let Some(event) = subscriber.next().await { /* use it */ }` +/// `while let Some(event) = subscriber.next_event().await { /* use it */ }` pub struct Subscriber { id: usize, rx: Receiver>>, @@ -127,7 +127,7 @@ impl Drop for Subscriber { impl Subscriber { /// Creates a future that resolves to the next value of the /// subscriber, or None if the backing `Db` shuts down - pub fn next(&mut self) -> impl Future> + '_ { + pub fn next_event(&mut self) -> impl Future> + '_ { Next { subscriber: self } } From bd3697f38330fe4052361af4aed16104f4debbfd Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 16 Sep 2021 17:44:19 -0500 Subject: [PATCH 3/3] Fix doctest --- src/tree.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tree.rs b/src/tree.rs index f472e9ef7..02fda1f36 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -891,7 +891,7 @@ impl Tree { /// # let config = sled::Config::new().temporary(true); /// # let db = config.open().unwrap(); /// # let mut subscriber = db.watch_prefix(vec![]); - /// while let Some(event) = (&mut subscriber).await { + /// while let Some(event) = subscriber.next_event().await { /// /* use it */ /// } /// # }