Skip to content

Commit

Permalink
fix: Make full use of tokio ecosystem if the tokio feature is enabled…
Browse files Browse the repository at this point in the history
… on Unix (#336)
  • Loading branch information
DataTriny authored Jan 8, 2024
1 parent 432dd7a commit c034802
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 55 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions platforms/unix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,27 @@ edition = "2021"

[features]
default = ["async-io"]
async-io = ["atspi/async-std", "zbus/async-io"]
tokio = ["dep:tokio", "atspi/tokio", "zbus/tokio"]
async-io = ["dep:async-channel", "dep:async-lock", "dep:futures-util", "atspi/async-std", "zbus/async-io"]
tokio = ["dep:tokio", "dep:tokio-stream", "atspi/tokio", "zbus/tokio"]

[dependencies]
accesskit = { version = "0.12.2", path = "../../common" }
accesskit_consumer = { version = "0.17.0", path = "../../consumer" }
async-channel = "2.1.1"
async-lock = "2.7.0"
async-once-cell = "0.5.3"
atspi = { version = "0.19", default-features = false }
futures-lite = "1.13"
futures-util = "0.3.27"
once_cell = "1.17.1"
serde = "1.0"
tokio = { version = "1.32.0", optional = true, features = ["rt", "net", "time"] }
zbus = { version = "3.14", default-features = false }

# async-io support
async-channel = { version = "2.1.1", optional = true }
async-lock = { version = "2.7.0", optional = true }
futures-util = { version = "0.3.27", optional = true }

# tokio support
tokio-stream = { version = "0.1.14", optional = true }
[dependencies.tokio]
version = "1.32.0"
optional = true
features = ["macros", "net", "rt", "sync", "time"]
86 changes: 43 additions & 43 deletions platforms/unix/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use accesskit::{ActionHandler, NodeId, Rect, Role, TreeUpdate};
use accesskit_consumer::{DetachedNode, FilterResult, Node, Tree, TreeChangeHandler, TreeState};
#[cfg(not(feature = "tokio"))]
use async_channel::Sender;
use async_once_cell::Lazy;
use atspi::{InterfaceSet, Live, State};
Expand All @@ -26,6 +27,8 @@ use std::{
Arc, Mutex, Weak,
},
};
#[cfg(feature = "tokio")]
use tokio::sync::mpsc::UnboundedSender as Sender;

struct AdapterChangeHandler<'a> {
adapter: &'a AdapterImpl,
Expand Down Expand Up @@ -274,34 +277,35 @@ impl AdapterImpl {
}
}

pub(crate) async fn send_message(&self, message: Message) {
#[cfg(not(feature = "tokio"))]
self.messages.send(message).await.unwrap();
#[cfg(feature = "tokio")]
self.messages.send(message).unwrap();
}

async fn register_interfaces(&self, id: NodeId, new_interfaces: InterfaceSet) {
self.messages
.send(Message::RegisterInterfaces {
adapter_id: self.id,
context: Arc::downgrade(&self.context),
node_id: id,
interfaces: new_interfaces,
})
.await
.unwrap();
self.send_message(Message::RegisterInterfaces {
adapter_id: self.id,
context: Arc::downgrade(&self.context),
node_id: id,
interfaces: new_interfaces,
})
.await;
}

async fn unregister_interfaces(&self, id: NodeId, old_interfaces: InterfaceSet) {
self.messages
.send(Message::UnregisterInterfaces {
adapter_id: self.id,
node_id: id,
interfaces: old_interfaces,
})
.await
.unwrap();
self.send_message(Message::UnregisterInterfaces {
adapter_id: self.id,
node_id: id,
interfaces: old_interfaces,
})
.await;
}

pub(crate) async fn emit_object_event(&self, target: ObjectId, event: ObjectEvent) {
self.messages
.send(Message::EmitEvent(Event::Object { target, event }))
.await
.unwrap();
self.send_message(Message::EmitEvent(Event::Object { target, event }))
.await;
}

fn set_root_window_bounds(&self, bounds: WindowBounds) {
Expand Down Expand Up @@ -336,17 +340,15 @@ impl AdapterImpl {
}

async fn window_activated(&self, window: &NodeWrapper<'_>) {
self.messages
.send(Message::EmitEvent(Event::Window {
target: ObjectId::Node {
adapter: self.id,
node: window.id(),
},
name: window.name().unwrap_or_default(),
event: WindowEvent::Activated,
}))
.await
.unwrap();
self.send_message(Message::EmitEvent(Event::Window {
target: ObjectId::Node {
adapter: self.id,
node: window.id(),
},
name: window.name().unwrap_or_default(),
event: WindowEvent::Activated,
}))
.await;
self.emit_object_event(
ObjectId::Node {
adapter: self.id,
Expand All @@ -366,17 +368,15 @@ impl AdapterImpl {
}

async fn window_deactivated(&self, window: &NodeWrapper<'_>) {
self.messages
.send(Message::EmitEvent(Event::Window {
target: ObjectId::Node {
adapter: self.id,
node: window.id(),
},
name: window.name().unwrap_or_default(),
event: WindowEvent::Deactivated,
}))
.await
.unwrap();
self.send_message(Message::EmitEvent(Event::Window {
target: ObjectId::Node {
adapter: self.id,
node: window.id(),
},
name: window.name().unwrap_or_default(),
event: WindowEvent::Deactivated,
}))
.await;
self.emit_object_event(
ObjectId::Node {
adapter: self.id,
Expand Down
32 changes: 26 additions & 6 deletions platforms/unix/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@

use accesskit::{ActionHandler, ActionRequest};
use accesskit_consumer::Tree;
#[cfg(not(feature = "tokio"))]
use async_channel::Sender;
#[cfg(not(feature = "tokio"))]
use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use async_once_cell::OnceCell as AsyncOnceCell;
use atspi::proxy::bus::StatusProxy;
use futures_util::{pin_mut, select, StreamExt};
#[cfg(not(feature = "tokio"))]
use futures_util::{pin_mut as pin, select, StreamExt};
use once_cell::sync::OnceCell;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
#[cfg(feature = "tokio")]
use tokio::{
pin, select,
sync::{mpsc::UnboundedSender as Sender, Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard},
};
#[cfg(feature = "tokio")]
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use zbus::{Connection, Task};

use crate::{
Expand Down Expand Up @@ -170,10 +180,21 @@ impl ActivationContext {
async fn listen(session_bus: Connection) -> zbus::Result<()> {
let status = StatusProxy::new(&session_bus).await?;
let changes = status.receive_is_enabled_changed().await.fuse();
pin_mut!(changes);
let (tx, rx) = async_channel::unbounded();
let messages = rx.fuse();
pin_mut!(messages);
pin!(changes);

#[cfg(not(feature = "tokio"))]
let (tx, messages) = {
let (tx, rx) = async_channel::unbounded();
let messages = rx.fuse();
(tx, messages)
};
#[cfg(feature = "tokio")]
let (tx, messages) = {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let messages = UnboundedReceiverStream::new(rx).fuse();
(tx, messages)
};
pin!(messages);
let mut atspi_bus = None;

loop {
Expand Down Expand Up @@ -206,7 +227,6 @@ async fn listen(session_bus: Connection) -> zbus::Result<()> {
process_adapter_message(atspi_bus, message).await?;
}
}
complete => return Ok(()),
}
}
}
Expand Down

0 comments on commit c034802

Please sign in to comment.