diff --git a/src/desktop/clipboard.rs b/src/desktop/clipboard.rs index 65b80e98d..9d10541e3 100644 --- a/src/desktop/clipboard.rs +++ b/src/desktop/clipboard.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; +use futures_util::{Stream, StreamExt}; use zbus::zvariant::{DeserializeDict, OwnedFd, OwnedObjectPath, SerializeDict, Type, Value}; use super::Session; @@ -117,27 +118,30 @@ impl<'a> Clipboard<'a> { #[doc(alias = "SelectionOwnerChanged")] pub async fn receive_selection_owner_changed( &self, - ) -> Result<(Session, SelectionOwnerChanged)> { - let (object_path, options) = self + ) -> Result> { + Ok(self .0 .signal::<(OwnedObjectPath, SelectionOwnerChanged)>("SelectionOwnerChanged") - .await?; - let session = Session::new(object_path).await?; - - Ok((session, options)) + .await? + .filter_map(|(p, o)| async move { Session::new(p).await.map(|s| (s, o)).ok() })) } /// # Specifications /// /// See also [`SelectionTransfer`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Clipboard.SelectionTransfer). #[doc(alias = "SelectionTransfer")] - pub async fn receive_selection_transfer(&self) -> Result<(Session, String, u32)> { - let (object_path, mime_type, serial) = self + pub async fn receive_selection_transfer( + &self, + ) -> Result> { + Ok(self .0 .signal::<(OwnedObjectPath, String, u32)>("SelectionTransfer") - .await?; - let session = Session::new(object_path).await?; - - Ok((session, mime_type, serial)) + .await? + .filter_map(|(p, mime_type, serial)| async move { + Session::new(p) + .await + .map(|session| (session, mime_type, serial)) + .ok() + })) } } diff --git a/src/desktop/global_shortcuts.rs b/src/desktop/global_shortcuts.rs index 08f3d517b..abce749fa 100644 --- a/src/desktop/global_shortcuts.rs +++ b/src/desktop/global_shortcuts.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, fmt::Debug, time::Duration}; -use futures_util::TryFutureExt; +use futures_util::{Stream, TryFutureExt}; use serde::{Deserialize, Serialize}; use zbus::zvariant::{ DeserializeDict, ObjectPath, OwnedObjectPath, OwnedValue, SerializeDict, Type, @@ -277,7 +277,7 @@ impl<'a> GlobalShortcuts<'a> { /// /// See also [`Activated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.Activated). #[doc(alias = "Activated")] - pub async fn receive_activated(&self) -> Result { + pub async fn receive_activated(&self) -> Result, Error> { self.0.signal("Activated").await } @@ -287,7 +287,7 @@ impl<'a> GlobalShortcuts<'a> { /// /// See also [`Deactivated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.Deactivated). #[doc(alias = "Deactivated")] - pub async fn receive_deactivated(&self) -> Result { + pub async fn receive_deactivated(&self) -> Result, Error> { self.0.signal("Deactivated").await } @@ -298,7 +298,9 @@ impl<'a> GlobalShortcuts<'a> { /// /// See also [`ShortcutsChanged`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-GlobalShortcuts.ShortcutsChanged). #[doc(alias = "ShortcutsChanged")] - pub async fn receive_shortcuts_changed(&self) -> Result { + pub async fn receive_shortcuts_changed( + &self, + ) -> Result, Error> { self.0.signal("ShortcutsChanged").await } } diff --git a/src/desktop/inhibit.rs b/src/desktop/inhibit.rs index c0e6d0a81..3384e0b79 100644 --- a/src/desktop/inhibit.rs +++ b/src/desktop/inhibit.rs @@ -4,6 +4,7 @@ //! //! ```rust,no_run //! use std::{thread, time}; +//! use futures_util::StreamExt; //! //! use ashpd::{ //! desktop::inhibit::{InhibitFlags, InhibitProxy, SessionState}, @@ -16,7 +17,7 @@ //! //! let session = proxy.create_monitor(&identifier).await?; //! -//! let state = proxy.receive_state_changed().await?; +//! let state = proxy.receive_state_changed().await?.next().await.unwrap(); //! match state.session_state() { //! SessionState::Running => (), //! SessionState::QueryEnd => { @@ -39,7 +40,7 @@ //! ``` use enumflags2::{bitflags, BitFlags}; -use futures_util::TryFutureExt; +use futures_util::{Stream, TryFutureExt}; use serde::Deserialize; use serde_repr::{Deserialize_repr, Serialize_repr}; use zbus::zvariant::{DeserializeDict, OwnedObjectPath, SerializeDict, Type}; @@ -220,7 +221,7 @@ impl<'a> InhibitProxy<'a> { /// See also [`StateChanged`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Inhibit.StateChanged). #[doc(alias = "StateChanged")] #[doc(alias = "XdpPortal::session-state-changed")] - pub async fn receive_state_changed(&self) -> Result { + pub async fn receive_state_changed(&self) -> Result, Error> { self.0.signal("StateChanged").await } diff --git a/src/desktop/location.rs b/src/desktop/location.rs index 747d8a608..8f4ad2bb3 100644 --- a/src/desktop/location.rs +++ b/src/desktop/location.rs @@ -5,33 +5,32 @@ //! desktop::location::{Accuracy, LocationProxy}, //! WindowIdentifier, //! }; -//! use futures_util::TryFutureExt; +//! use futures_util::{FutureExt, StreamExt}; //! //! async fn run() -> ashpd::Result<()> { //! let proxy = LocationProxy::new().await?; //! let identifier = WindowIdentifier::default(); -//! //! let session = proxy //! .create_session(None, None, Some(Accuracy::Street)) //! .await?; -//! -//! let (_, location) = futures_util::try_join!( -//! proxy.start(&session, &identifier).into_future(), -//! proxy.receive_location_updated().into_future() -//! )?; -//! +//! let mut stream = proxy.receive_location_updated().await?; +//! let (_, location) = futures_util::join!( +//! proxy +//! .start(&session, &identifier) +//! .map(|e| e.expect("Couldn't start session")), +//! stream.next().map(|e| e.expect("Stream is exhausted")) +//! ); //! println!("{}", location.accuracy()); //! println!("{}", location.longitude()); //! println!("{}", location.latitude()); //! session.close().await?; -//! //! Ok(()) //! } //! ``` use std::fmt::Debug; -use futures_util::TryFutureExt; +use futures_util::{Stream, TryFutureExt}; use serde::Deserialize; use serde_repr::Serialize_repr; use zbus::zvariant::{DeserializeDict, OwnedObjectPath, SerializeDict, Type}; @@ -209,7 +208,7 @@ impl<'a> LocationProxy<'a> { /// See also [`LocationUpdated`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Location.LocationUpdated). #[doc(alias = "LocationUpdated")] #[doc(alias = "XdpPortal::location-updated")] - pub async fn receive_location_updated(&self) -> Result { + pub async fn receive_location_updated(&self) -> Result, Error> { self.0.signal("LocationUpdated").await } diff --git a/src/desktop/memory_monitor.rs b/src/desktop/memory_monitor.rs index 6e3d77426..e4f4d2fdc 100644 --- a/src/desktop/memory_monitor.rs +++ b/src/desktop/memory_monitor.rs @@ -2,15 +2,23 @@ //! //! ```rust,no_run //! use ashpd::desktop::memory_monitor::MemoryMonitor; +//! use futures_util::StreamExt; //! //! async fn run() -> ashpd::Result<()> { //! let proxy = MemoryMonitor::new().await?; -//! let level = proxy.receive_low_memory_warning().await?; +//! let level = proxy +//! .receive_low_memory_warning() +//! .await? +//! .next() +//! .await +//! .expect("Stream exhausted"); //! println!("{}", level); //! Ok(()) //! } //! ``` +use futures_util::Stream; + use crate::{proxy::Proxy, Error}; /// The interface provides information about low system memory to sandboxed @@ -39,7 +47,7 @@ impl<'a> MemoryMonitor<'a> { /// /// See also [`LowMemoryWarning`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-MemoryMonitor.LowMemoryWarning). #[doc(alias = "LowMemoryWarning")] - pub async fn receive_low_memory_warning(&self) -> Result { + pub async fn receive_low_memory_warning(&self) -> Result, Error> { self.0.signal("LowMemoryWarning").await } } diff --git a/src/desktop/network_monitor.rs b/src/desktop/network_monitor.rs index 4e59cd507..68d66b4e7 100644 --- a/src/desktop/network_monitor.rs +++ b/src/desktop/network_monitor.rs @@ -19,6 +19,7 @@ use std::fmt; +use futures_util::Stream; use serde_repr::Deserialize_repr; use zbus::zvariant::{DeserializeDict, Type}; @@ -165,7 +166,7 @@ impl<'a> NetworkMonitor<'a> { /// # Specifications /// /// See also [`changed`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-NetworkMonitor.changed). - pub async fn receive_changed(&self) -> Result<(), Error> { + pub async fn receive_changed(&self) -> Result, Error> { self.0.signal("changed").await } } diff --git a/src/desktop/notification.rs b/src/desktop/notification.rs index 0fe73c180..e970c9d65 100644 --- a/src/desktop/notification.rs +++ b/src/desktop/notification.rs @@ -2,6 +2,7 @@ //! //! ```rust,no_run //! use std::{thread, time}; +//! use futures_util::StreamExt; //! //! use ashpd::desktop::{ //! notification::{Action, Button, Notification, NotificationProxy, Priority}, @@ -27,7 +28,12 @@ //! ) //! .await?; //! -//! let action = proxy.receive_action_invoked().await?; +//! let action = proxy +//! .receive_action_invoked() +//! .await? +//! .next() +//! .await +//! .expect("Stream exhausted"); //! match action.name() { //! "copy" => (), // Copy something to clipboard //! "delete" => (), // Delete the file @@ -46,6 +52,7 @@ use std::{fmt, str::FromStr}; +use futures_util::Stream; use serde::{self, Deserialize, Serialize}; use zbus::zvariant::{OwnedValue, SerializeDict, Type, Value}; @@ -300,7 +307,7 @@ impl<'a> NotificationProxy<'a> { /// See also [`ActionInvoked`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Notification.ActionInvoked). #[doc(alias = "ActionInvoked")] #[doc(alias = "XdpPortal::notification-action-invoked")] - pub async fn receive_action_invoked(&self) -> Result { + pub async fn receive_action_invoked(&self) -> Result, Error> { self.0.signal("ActionInvoked").await } diff --git a/src/desktop/session.rs b/src/desktop/session.rs index 101c334d9..53191e400 100644 --- a/src/desktop/session.rs +++ b/src/desktop/session.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt::Debug}; +use futures_util::Stream; use serde::{Serialize, Serializer}; use zbus::zvariant::{ObjectPath, OwnedValue, Signature, Type}; @@ -51,7 +52,7 @@ impl<'a> Session<'a> { /// /// See also [`Closed`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Session.Closed). #[doc(alias = "Closed")] - pub async fn receive_closed(&self) -> Result { + pub async fn receive_closed(&self) -> Result, Error> { self.0.signal("Closed").await } diff --git a/src/desktop/settings.rs b/src/desktop/settings.rs index 770abd37e..7229ad8f6 100644 --- a/src/desktop/settings.rs +++ b/src/desktop/settings.rs @@ -1,5 +1,6 @@ //! ```rust,no_run //! use ashpd::desktop::settings::Settings; +//! use futures_util::StreamExt; //! //! async fn run() -> ashpd::Result<()> { //! let proxy = Settings::new().await?; @@ -12,7 +13,12 @@ //! let settings = proxy.read_all(&["org.gnome.desktop.interface"]).await?; //! println!("{:#?}", settings); //! -//! let setting = proxy.receive_setting_changed().await?; +//! let setting = proxy +//! .receive_setting_changed() +//! .await? +//! .next() +//! .await +//! .expect("Stream exhausted"); //! println!("{}", setting.namespace()); //! println!("{}", setting.key()); //! println!("{:#?}", setting.value()); @@ -23,7 +29,8 @@ use std::{collections::HashMap, convert::TryFrom, fmt::Debug}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use futures_util::Stream; +use serde::{Deserialize, Serialize}; use zbus::zvariant::{OwnedValue, Type, Value}; use crate::{proxy::Proxy, Error}; @@ -73,6 +80,37 @@ pub enum ColorScheme { PreferLight, } +impl TryFrom for ColorScheme { + type Error = Error; + + fn try_from(value: OwnedValue) -> Result { + TryFrom::::try_from(value.into()) + } +} + +impl TryFrom> for ColorScheme { + type Error = Error; + + fn try_from(value: Value) -> Result { + Ok(match u32::try_from(value)? { + 1 => ColorScheme::PreferDark, + 2 => ColorScheme::PreferLight, + _ => ColorScheme::NoPreference, + }) + } +} + +impl TryFrom for ColorScheme { + type Error = Error; + + fn try_from(value: Setting) -> Result { + Self::try_from(value.2) + } +} + +const APPEARANCE_NAMESPACE: &str = "org.freedesktop.appearance"; +const COLOR_SCHEME_KEY: &str = "color-scheme"; + /// The interface provides read-only access to a small number of host settings /// required for toolkits similar to XSettings. It is not for general purpose /// settings. @@ -131,46 +169,21 @@ impl<'a> Settings<'a> { #[doc(alias = "Read")] pub async fn read(&self, namespace: &str, key: &str) -> Result where - T: TryFrom + DeserializeOwned + Type, + T: TryFrom, Error: From<>::Error>, { let value = self.0.call::("Read", &(namespace, key)).await?; - if let Some(v) = value.downcast_ref::>() { + if let Some(v) = value.downcast_ref::() { T::try_from(v.to_owned()).map_err(From::from) } else { T::try_from(value).map_err(From::from) } } - /// Reads the value of namespace: `org.freedesktop.appearance` and - /// `color-scheme` key. + /// Retrieves the system's preferred color scheme pub async fn color_scheme(&self) -> Result { - let scheme = match self - .read::("org.freedesktop.appearance", "color-scheme") - .await? - { - 1 => ColorScheme::PreferDark, - 2 => ColorScheme::PreferLight, - _ => ColorScheme::NoPreference, - }; - Ok(scheme) - } - - /// Listen to changes of the namespace `org.freedesktop.appearance` for - /// `color-scheme` key. - pub async fn receive_color_scheme_changed(&self) -> Result { - loop { - let setting = self.receive_setting_changed().await?; - if setting.namespace() == "org.freedesktop.appearance" - && setting.key() == "color-scheme" - { - return Ok(match u32::try_from(setting.value()) { - Ok(1) => ColorScheme::PreferDark, - Ok(2) => ColorScheme::PreferLight, - _ => ColorScheme::NoPreference, - }); - } - } + self.read::(APPEARANCE_NAMESPACE, COLOR_SCHEME_KEY) + .await } /// Signal emitted when a setting changes. @@ -179,7 +192,7 @@ impl<'a> Settings<'a> { /// /// See also [`SettingChanged`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Settings.SettingChanged). #[doc(alias = "SettingChanged")] - pub async fn receive_setting_changed(&self) -> Result { + pub async fn receive_setting_changed(&self) -> Result, Error> { self.0.signal("SettingChanged").await } } diff --git a/src/documents/file_transfer.rs b/src/documents/file_transfer.rs index 9aad627e7..1baa66b88 100644 --- a/src/documents/file_transfer.rs +++ b/src/documents/file_transfer.rs @@ -24,6 +24,7 @@ use std::{collections::HashMap, os::unix::prelude::AsRawFd}; +use futures_util::Stream; use zbus::zvariant::{Fd, SerializeDict, Type, Value}; use crate::{proxy::Proxy, Error}; @@ -187,7 +188,7 @@ impl<'a> FileTransfer<'a> { /// /// See also [`TransferClosed`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-FileTransfer.TransferClosed). #[doc(alias = "TransferClosed")] - pub async fn transfer_closed(&self) -> Result { + pub async fn transfer_closed(&self) -> Result, Error> { self.0.signal("TransferClosed").await } } diff --git a/src/flatpak/mod.rs b/src/flatpak/mod.rs index c4cbbf355..121110889 100644 --- a/src/flatpak/mod.rs +++ b/src/flatpak/mod.rs @@ -33,6 +33,7 @@ use std::{ }; use enumflags2::{bitflags, BitFlags}; +use futures_util::Stream; use serde::Serialize; use serde_repr::{Deserialize_repr, Serialize_repr}; use zbus::zvariant::{Fd, OwnedObjectPath, SerializeDict, Type}; @@ -272,7 +273,7 @@ impl<'a> Flatpak<'a> { /// Emitted when a process starts by [`spawn()`][`Flatpak::spawn`]. #[doc(alias = "SpawnStarted")] - pub async fn receive_spawn_started(&self) -> Result<(u32, u32), Error> { + pub async fn receive_spawn_started(&self) -> Result, Error> { self.0.signal("SpawnStarted").await } @@ -284,7 +285,7 @@ impl<'a> Flatpak<'a> { /// See also [`SpawnExited`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Flatpak.SpawnExited). #[doc(alias = "SpawnExited")] #[doc(alias = "XdpPortal::spawn-exited")] - pub async fn receive_spawn_existed(&self) -> Result<(u32, u32), Error> { + pub async fn receive_spawn_existed(&self) -> Result, Error> { self.0.signal("SpawnExited").await } diff --git a/src/flatpak/update_monitor.rs b/src/flatpak/update_monitor.rs index 9e9fd8bda..3df0b0cc4 100644 --- a/src/flatpak/update_monitor.rs +++ b/src/flatpak/update_monitor.rs @@ -5,6 +5,7 @@ //! //! ```rust,no_run //! use ashpd::{flatpak::Flatpak, WindowIdentifier}; +//! use futures_util::StreamExt; //! //! async fn run() -> ashpd::Result<()> { //! let proxy = Flatpak::new().await?; @@ -13,13 +14,19 @@ //! let info = monitor.receive_update_available().await?; //! //! monitor.update(&WindowIdentifier::default()).await?; -//! let progress = monitor.receive_progress().await?; +//! let progress = monitor +//! .receive_progress() +//! .await? +//! .next() +//! .await +//! .expect("Stream exhausted"); //! println!("{:#?}", progress); //! //! Ok(()) //! } //! ``` +use futures_util::Stream; use serde_repr::{Deserialize_repr, Serialize_repr}; use zbus::zvariant::{DeserializeDict, ObjectPath, SerializeDict, Type}; @@ -126,7 +133,7 @@ impl<'a> UpdateMonitor<'a> { /// See also [`Progress`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Flatpak-UpdateMonitor.Progress). #[doc(alias = "Progress")] #[doc(alias = "XdpPortal::update-progress")] - pub async fn receive_progress(&self) -> Result { + pub async fn receive_progress(&self) -> Result, Error> { self.0.signal("Progress").await } @@ -137,7 +144,7 @@ impl<'a> UpdateMonitor<'a> { /// See also [`UpdateAvailable`](https://flatpak.github.io/xdg-desktop-portal/index.html#gdbus-signal-org-freedesktop-portal-Flatpak-UpdateMonitor.UpdateAvailable). #[doc(alias = "UpdateAvailable")] #[doc(alias = "XdpPortal::update-available")] - pub async fn receive_update_available(&self) -> Result { + pub async fn receive_update_available(&self) -> Result, Error> { self.0.signal("UpdateAvailable").await } diff --git a/src/proxy.rs b/src/proxy.rs index a710ee52f..a8c06c1eb 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,10 +1,13 @@ -use std::{fmt::Debug, ops::Deref}; +use std::{fmt::Debug, future::ready, ops::Deref}; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use zbus::zvariant::{ObjectPath, OwnedValue, Type}; +#[cfg(feature = "tracing")] +use zbus::Message; + use crate::{ desktop::{HandleToken, Request}, Error, PortalError, @@ -154,32 +157,39 @@ impl<'a> Proxy<'a> { .map_err(From::from) } - pub(crate) async fn signal(&self, signal_name: &'static str) -> Result + pub(crate) async fn signal(&self, name: &'static str) -> Result, Error> where - R: for<'de> Deserialize<'de> + Type + Debug, + I: for<'de> Deserialize<'de> + Type + Debug, { - #[cfg(feature = "tracing")] - tracing::info!( - "Listening to signal '{}' on '{}'", - signal_name, - self.interface() - ); - let mut stream = self - .0 - .receive_signal(signal_name) - .await - .map_err::(From::from)?; - let message = stream.next().await.ok_or(Error::NoResponse)?; - #[cfg(feature = "tracing")] - tracing::info!( - "Received signal '{}' on '{}'", - signal_name, - self.interface() - ); - let content = message.body::()?; - #[cfg(feature = "tracing")] - tracing::debug!("With body {:#?}", content); - Ok(content) + Ok(self.0.receive_signal(name).await?.filter_map({ + #[cfg(not(feature = "tracing"))] + { + move |msg| ready(msg.body().ok()) + } + #[cfg(feature = "tracing")] + { + let ifc = self.interface().to_owned(); + move |msg| ready(trace_body(name, &ifc, msg)) + } + })) + } +} + +#[cfg(feature = "tracing")] +fn trace_body(name: &'static str, ifc: &str, msg: impl AsRef) -> Option +where + I: for<'de> Deserialize<'de> + Type + Debug, +{ + tracing::info!("Received signal '{name}' on '{ifc}'"); + match msg.as_ref().body() { + Ok(body) => { + tracing::debug!("With body {body:#?}"); + Some(body) + } + Err(e) => { + tracing::warn!("Error obtaining body: {e:#?}"); + None + } } }