Skip to content

Commit

Permalink
Expose streams for signals (#152)
Browse files Browse the repository at this point in the history
* Add ColorScheme stream

* Add tracing

* Replace signal fn

* Switch to futures_lite::StreamExt

* Avoid duplicate schemes

* Rename constants

* Message for reasoning about about futures-lite

The reason futures-lites is used is that its StreamExt::filter_map takes a simple FnMut that maps to an Option in contrast to the one from futures-utils, which maps to a future. In some cases it simplifies implementation a lot and also prevents the problem of the stream not beeing Unpin

* Fix doc tests

* Fix typo

* Remove futures_lite

* Remove receive_color_scheme_changed_with_init

* Remove unused

* Cargo fmt
  • Loading branch information
d2weber authored Aug 11, 2023
1 parent ee3777f commit 230bde9
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 101 deletions.
28 changes: 16 additions & 12 deletions src/desktop/clipboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use std::collections::HashMap;

use futures_util::{Stream, StreamExt};
use zbus::zvariant::{DeserializeDict, OwnedFd, OwnedObjectPath, SerializeDict, Type, Value};

use super::Session;
Expand Down Expand Up @@ -117,27 +118,30 @@ impl<'a> Clipboard<'a> {
#[doc(alias = "SelectionOwnerChanged")]
pub async fn receive_selection_owner_changed(
&self,
) -> Result<(Session, SelectionOwnerChanged)> {
let (object_path, options) = self
) -> Result<impl Stream<Item = (Session, SelectionOwnerChanged)>> {
Ok(self
.0
.signal::<(OwnedObjectPath, SelectionOwnerChanged)>("SelectionOwnerChanged")
.await?;
let session = Session::new(object_path).await?;

Ok((session, options))
.await?
.filter_map(|(p, o)| async move { Session::new(p).await.map(|s| (s, o)).ok() }))
}

/// # Specifications
///
/// See also [`SelectionTransfer`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Clipboard.SelectionTransfer).
#[doc(alias = "SelectionTransfer")]
pub async fn receive_selection_transfer(&self) -> Result<(Session, String, u32)> {
let (object_path, mime_type, serial) = self
pub async fn receive_selection_transfer(
&self,
) -> Result<impl Stream<Item = (Session, String, u32)>> {
Ok(self
.0
.signal::<(OwnedObjectPath, String, u32)>("SelectionTransfer")
.await?;
let session = Session::new(object_path).await?;

Ok((session, mime_type, serial))
.await?
.filter_map(|(p, mime_type, serial)| async move {
Session::new(p)
.await
.map(|session| (session, mime_type, serial))
.ok()
}))
}
}
10 changes: 6 additions & 4 deletions src/desktop/global_shortcuts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{collections::HashMap, fmt::Debug, time::Duration};

use futures_util::TryFutureExt;
use futures_util::{Stream, TryFutureExt};
use serde::{Deserialize, Serialize};
use zbus::zvariant::{
DeserializeDict, ObjectPath, OwnedObjectPath, OwnedValue, SerializeDict, Type,
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<'a> GlobalShortcuts<'a> {
///
/// See also [`Activated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.Activated).
#[doc(alias = "Activated")]
pub async fn receive_activated(&self) -> Result<Activated, Error> {
pub async fn receive_activated(&self) -> Result<impl Stream<Item = Activated>, Error> {
self.0.signal("Activated").await
}

Expand All @@ -287,7 +287,7 @@ impl<'a> GlobalShortcuts<'a> {
///
/// See also [`Deactivated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.Deactivated).
#[doc(alias = "Deactivated")]
pub async fn receive_deactivated(&self) -> Result<Deactivated, Error> {
pub async fn receive_deactivated(&self) -> Result<impl Stream<Item = Deactivated>, Error> {
self.0.signal("Deactivated").await
}

Expand All @@ -298,7 +298,9 @@ impl<'a> GlobalShortcuts<'a> {
///
/// See also [`ShortcutsChanged`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.ShortcutsChanged).
#[doc(alias = "ShortcutsChanged")]
pub async fn receive_shortcuts_changed(&self) -> Result<ShortcutsChanged, Error> {
pub async fn receive_shortcuts_changed(
&self,
) -> Result<impl Stream<Item = ShortcutsChanged>, Error> {
self.0.signal("ShortcutsChanged").await
}
}
7 changes: 4 additions & 3 deletions src/desktop/inhibit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//!
//! ```rust,no_run
//! use std::{thread, time};
//! use futures_util::StreamExt;
//!
//! use ashpd::{
//! desktop::inhibit::{InhibitFlags, InhibitProxy, SessionState},
Expand All @@ -16,7 +17,7 @@
//!
//! let session = proxy.create_monitor(&identifier).await?;
//!
//! let state = proxy.receive_state_changed().await?;
//! let state = proxy.receive_state_changed().await?.next().await.unwrap();
//! match state.session_state() {
//! SessionState::Running => (),
//! SessionState::QueryEnd => {
Expand All @@ -39,7 +40,7 @@
//! ```

use enumflags2::{bitflags, BitFlags};
use futures_util::TryFutureExt;
use futures_util::{Stream, TryFutureExt};
use serde::Deserialize;
use serde_repr::{Deserialize_repr, Serialize_repr};
use zbus::zvariant::{DeserializeDict, OwnedObjectPath, SerializeDict, Type};
Expand Down Expand Up @@ -220,7 +221,7 @@ impl<'a> InhibitProxy<'a> {
/// See also [`StateChanged`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Inhibit.StateChanged).
#[doc(alias = "StateChanged")]
#[doc(alias = "XdpPortal::session-state-changed")]
pub async fn receive_state_changed(&self) -> Result<InhibitState, Error> {
pub async fn receive_state_changed(&self) -> Result<impl Stream<Item = InhibitState>, Error> {
self.0.signal("StateChanged").await
}

Expand Down
21 changes: 10 additions & 11 deletions src/desktop/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,32 @@
//! desktop::location::{Accuracy, LocationProxy},
//! WindowIdentifier,
//! };
//! use futures_util::TryFutureExt;
//! use futures_util::{FutureExt, StreamExt};
//!
//! async fn run() -> ashpd::Result<()> {
//! let proxy = LocationProxy::new().await?;
//! let identifier = WindowIdentifier::default();
//!
//! let session = proxy
//! .create_session(None, None, Some(Accuracy::Street))
//! .await?;
//!
//! let (_, location) = futures_util::try_join!(
//! proxy.start(&session, &identifier).into_future(),
//! proxy.receive_location_updated().into_future()
//! )?;
//!
//! let mut stream = proxy.receive_location_updated().await?;
//! let (_, location) = futures_util::join!(
//! proxy
//! .start(&session, &identifier)
//! .map(|e| e.expect("Couldn't start session")),
//! stream.next().map(|e| e.expect("Stream is exhausted"))
//! );
//! println!("{}", location.accuracy());
//! println!("{}", location.longitude());
//! println!("{}", location.latitude());
//! session.close().await?;
//!
//! Ok(())
//! }
//! ```

use std::fmt::Debug;

use futures_util::TryFutureExt;
use futures_util::{Stream, TryFutureExt};
use serde::Deserialize;
use serde_repr::Serialize_repr;
use zbus::zvariant::{DeserializeDict, OwnedObjectPath, SerializeDict, Type};
Expand Down Expand Up @@ -209,7 +208,7 @@ impl<'a> LocationProxy<'a> {
/// See also [`LocationUpdated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Location.LocationUpdated).
#[doc(alias = "LocationUpdated")]
#[doc(alias = "XdpPortal::location-updated")]
pub async fn receive_location_updated(&self) -> Result<Location, Error> {
pub async fn receive_location_updated(&self) -> Result<impl Stream<Item = Location>, Error> {
self.0.signal("LocationUpdated").await
}

Expand Down
12 changes: 10 additions & 2 deletions src/desktop/memory_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@
//!
//! ```rust,no_run
//! use ashpd::desktop::memory_monitor::MemoryMonitor;
//! use futures_util::StreamExt;
//!
//! async fn run() -> ashpd::Result<()> {
//! let proxy = MemoryMonitor::new().await?;
//! let level = proxy.receive_low_memory_warning().await?;
//! let level = proxy
//! .receive_low_memory_warning()
//! .await?
//! .next()
//! .await
//! .expect("Stream exhausted");
//! println!("{}", level);
//! Ok(())
//! }
//! ```

use futures_util::Stream;

use crate::{proxy::Proxy, Error};

/// The interface provides information about low system memory to sandboxed
Expand Down Expand Up @@ -39,7 +47,7 @@ impl<'a> MemoryMonitor<'a> {
///
/// See also [`LowMemoryWarning`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-MemoryMonitor.LowMemoryWarning).
#[doc(alias = "LowMemoryWarning")]
pub async fn receive_low_memory_warning(&self) -> Result<i32, Error> {
pub async fn receive_low_memory_warning(&self) -> Result<impl Stream<Item = i32>, Error> {
self.0.signal("LowMemoryWarning").await
}
}
3 changes: 2 additions & 1 deletion src/desktop/network_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::fmt;

use futures_util::Stream;
use serde_repr::Deserialize_repr;
use zbus::zvariant::{DeserializeDict, Type};

Expand Down Expand Up @@ -165,7 +166,7 @@ impl<'a> NetworkMonitor<'a> {
/// # Specifications
///
/// See also [`changed`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-NetworkMonitor.changed).
pub async fn receive_changed(&self) -> Result<(), Error> {
pub async fn receive_changed(&self) -> Result<impl Stream<Item = ()>, Error> {
self.0.signal("changed").await
}
}
11 changes: 9 additions & 2 deletions src/desktop/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! ```rust,no_run
//! use std::{thread, time};
//! use futures_util::StreamExt;
//!
//! use ashpd::desktop::{
//! notification::{Action, Button, Notification, NotificationProxy, Priority},
Expand All @@ -27,7 +28,12 @@
//! )
//! .await?;
//!
//! let action = proxy.receive_action_invoked().await?;
//! let action = proxy
//! .receive_action_invoked()
//! .await?
//! .next()
//! .await
//! .expect("Stream exhausted");
//! match action.name() {
//! "copy" => (), // Copy something to clipboard
//! "delete" => (), // Delete the file
Expand All @@ -46,6 +52,7 @@

use std::{fmt, str::FromStr};

use futures_util::Stream;
use serde::{self, Deserialize, Serialize};
use zbus::zvariant::{OwnedValue, SerializeDict, Type, Value};

Expand Down Expand Up @@ -300,7 +307,7 @@ impl<'a> NotificationProxy<'a> {
/// See also [`ActionInvoked`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Notification.ActionInvoked).
#[doc(alias = "ActionInvoked")]
#[doc(alias = "XdpPortal::notification-action-invoked")]
pub async fn receive_action_invoked(&self) -> Result<Action, Error> {
pub async fn receive_action_invoked(&self) -> Result<impl Stream<Item = Action>, Error> {
self.0.signal("ActionInvoked").await
}

Expand Down
3 changes: 2 additions & 1 deletion src/desktop/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, fmt::Debug};

use futures_util::Stream;
use serde::{Serialize, Serializer};
use zbus::zvariant::{ObjectPath, OwnedValue, Signature, Type};

Expand Down Expand Up @@ -51,7 +52,7 @@ impl<'a> Session<'a> {
///
/// See also [`Closed`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Session.Closed).
#[doc(alias = "Closed")]
pub async fn receive_closed(&self) -> Result<SessionDetails, Error> {
pub async fn receive_closed(&self) -> Result<impl Stream<Item = SessionDetails>, Error> {
self.0.signal("Closed").await
}

Expand Down
Loading

0 comments on commit 230bde9

Please sign in to comment.