From 5bec14c2dc14e1ff304f9abf373c646dd366df63 Mon Sep 17 00:00:00 2001 From: DataTriny Date: Sat, 6 Jan 2024 18:56:47 +0100 Subject: [PATCH] fix: Run our own async executor on Unix --- Cargo.lock | 3 +- platforms/unix/Cargo.toml | 5 +- platforms/unix/src/adapter.rs | 302 +++++++++++++++----------------- platforms/unix/src/atspi/bus.rs | 28 ++- platforms/unix/src/context.rs | 208 ++++++++++------------ platforms/unix/src/executor.rs | 171 ++++++++++++++++++ platforms/unix/src/lib.rs | 1 + platforms/unix/src/node.rs | 187 +++++++++----------- platforms/unix/src/util.rs | 32 +--- 9 files changed, 529 insertions(+), 408 deletions(-) create mode 100644 platforms/unix/src/executor.rs diff --git a/Cargo.lock b/Cargo.lock index bde8d4c5..0e718436 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,8 +75,9 @@ dependencies = [ "accesskit", "accesskit_consumer", "async-channel 2.1.1", - "async-lock", + "async-executor", "async-once-cell", + "async-task", "atspi", "futures-lite", "futures-util", diff --git a/platforms/unix/Cargo.toml b/platforms/unix/Cargo.toml index c7edb80a..6afa89a1 100644 --- a/platforms/unix/Cargo.toml +++ b/platforms/unix/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" [features] default = ["async-io"] -async-io = ["dep:async-channel", "dep:async-lock", "dep:futures-util", "atspi/async-std", "zbus/async-io"] +async-io = ["dep:async-channel", "dep:async-executor", "dep:async-task", "dep:futures-util", "atspi/async-std", "zbus/async-io"] tokio = ["dep:tokio", "dep:tokio-stream", "atspi/tokio", "zbus/tokio"] [dependencies] @@ -27,7 +27,8 @@ zbus = { version = "3.14", default-features = false } # async-io support async-channel = { version = "2.1.1", optional = true } -async-lock = { version = "2.7.0", optional = true } +async-executor = { version = "1.5.0", optional = true } +async-task = { version = "4.3.0", optional = true } futures-util = { version = "0.3.27", optional = true } # tokio support diff --git a/platforms/unix/src/adapter.rs b/platforms/unix/src/adapter.rs index 3cbf68b7..76043f6a 100644 --- a/platforms/unix/src/adapter.rs +++ b/platforms/unix/src/adapter.rs @@ -8,10 +8,10 @@ use crate::{ interfaces::{Event, ObjectEvent, WindowEvent}, ObjectId, }, - context::{ActivationContext, AppContext, Context}, + context::{AppContext, Context}, filters::{filter, filter_detached}, node::NodeWrapper, - util::{block_on, WindowBounds}, + util::WindowBounds, }; use accesskit::{ActionHandler, NodeId, Rect, Role, TreeUpdate}; use accesskit_consumer::{DetachedNode, FilterResult, Node, Tree, TreeChangeHandler, TreeState}; @@ -36,63 +36,52 @@ struct AdapterChangeHandler<'a> { impl AdapterChangeHandler<'_> { fn add_node(&mut self, node: &Node) { - block_on(async { - let role = node.role(); - let is_root = node.is_root(); - let node = NodeWrapper::Node { - adapter: self.adapter.id, - node, - }; - let interfaces = node.interfaces(); - self.adapter - .register_interfaces(node.id(), interfaces) - .await; - if is_root && role == Role::Window { - let adapter_index = AppContext::read().adapter_index(self.adapter.id).unwrap(); - self.adapter.window_created(adapter_index, node.id()).await; - } + let role = node.role(); + let is_root = node.is_root(); + let node = NodeWrapper::Node { + adapter: self.adapter.id, + node, + }; + let interfaces = node.interfaces(); + self.adapter.register_interfaces(node.id(), interfaces); + if is_root && role == Role::Window { + let adapter_index = AppContext::read().adapter_index(self.adapter.id).unwrap(); + self.adapter.window_created(adapter_index, node.id()); + } - let live = node.live(); - if live != Live::None { - if let Some(name) = node.name() { - self.adapter - .emit_object_event( - ObjectId::Node { - adapter: self.adapter.id, - node: node.id(), - }, - ObjectEvent::Announcement(name, live), - ) - .await; - } + let live = node.live(); + if live != Live::None { + if let Some(name) = node.name() { + self.adapter.emit_object_event( + ObjectId::Node { + adapter: self.adapter.id, + node: node.id(), + }, + ObjectEvent::Announcement(name, live), + ); } - }) + } } fn remove_node(&mut self, node: &DetachedNode) { - block_on(async { - let role = node.role(); - let is_root = node.is_root(); - let node = NodeWrapper::DetachedNode { + let role = node.role(); + let is_root = node.is_root(); + let node = NodeWrapper::DetachedNode { + adapter: self.adapter.id, + node, + }; + if is_root && role == Role::Window { + self.adapter.window_destroyed(node.id()); + } + self.adapter.emit_object_event( + ObjectId::Node { adapter: self.adapter.id, - node, - }; - if is_root && role == Role::Window { - self.adapter.window_destroyed(node.id()).await; - } - self.adapter - .emit_object_event( - ObjectId::Node { - adapter: self.adapter.id, - node: node.id(), - }, - ObjectEvent::StateChanged(State::Defunct, true), - ) - .await; - self.adapter - .unregister_interfaces(node.id(), node.interfaces()) - .await; - }); + node: node.id(), + }, + ObjectEvent::StateChanged(State::Defunct, true), + ); + self.adapter + .unregister_interfaces(node.id(), node.interfaces()); } } @@ -124,18 +113,12 @@ impl TreeChangeHandler for AdapterChangeHandler<'_> { let old_interfaces = old_wrapper.interfaces(); let new_interfaces = new_wrapper.interfaces(); let kept_interfaces = old_interfaces & new_interfaces; - block_on(async { - self.adapter - .unregister_interfaces(new_wrapper.id(), old_interfaces ^ kept_interfaces) - .await; - self.adapter - .register_interfaces(new_node.id(), new_interfaces ^ kept_interfaces) - .await; - let bounds = *self.adapter.context.read_root_window_bounds(); - new_wrapper - .notify_changes(&bounds, self.adapter, &old_wrapper) - .await; - }); + self.adapter + .unregister_interfaces(new_wrapper.id(), old_interfaces ^ kept_interfaces); + self.adapter + .register_interfaces(new_node.id(), new_interfaces ^ kept_interfaces); + let bounds = *self.adapter.context.read_root_window_bounds(); + new_wrapper.notify_changes(&bounds, self.adapter, &old_wrapper); } } @@ -145,53 +128,43 @@ impl TreeChangeHandler for AdapterChangeHandler<'_> { new_node: Option<&Node>, current_state: &TreeState, ) { - block_on(async { - if let Some(root_window) = root_window(current_state) { - if old_node.is_none() && new_node.is_some() { - self.adapter - .window_activated(&NodeWrapper::Node { - adapter: self.adapter.id, - node: &root_window, - }) - .await; - } else if old_node.is_some() && new_node.is_none() { - self.adapter - .window_deactivated(&NodeWrapper::Node { - adapter: self.adapter.id, - node: &root_window, - }) - .await; - } + if let Some(root_window) = root_window(current_state) { + if old_node.is_none() && new_node.is_some() { + self.adapter.window_activated(&NodeWrapper::Node { + adapter: self.adapter.id, + node: &root_window, + }); + } else if old_node.is_some() && new_node.is_none() { + self.adapter.window_deactivated(&NodeWrapper::Node { + adapter: self.adapter.id, + node: &root_window, + }); } - if let Some(node) = new_node.map(|node| NodeWrapper::Node { - adapter: self.adapter.id, - node, - }) { - self.adapter - .emit_object_event( - ObjectId::Node { - adapter: self.adapter.id, - node: node.id(), - }, - ObjectEvent::StateChanged(State::Focused, true), - ) - .await; - } - if let Some(node) = old_node.map(|node| NodeWrapper::DetachedNode { - adapter: self.adapter.id, - node, - }) { - self.adapter - .emit_object_event( - ObjectId::Node { - adapter: self.adapter.id, - node: node.id(), - }, - ObjectEvent::StateChanged(State::Focused, false), - ) - .await; - } - }); + } + if let Some(node) = new_node.map(|node| NodeWrapper::Node { + adapter: self.adapter.id, + node, + }) { + self.adapter.emit_object_event( + ObjectId::Node { + adapter: self.adapter.id, + node: node.id(), + }, + ObjectEvent::StateChanged(State::Focused, true), + ); + } + if let Some(node) = old_node.map(|node| NodeWrapper::DetachedNode { + adapter: self.adapter.id, + node, + }) { + self.adapter.emit_object_event( + ObjectId::Node { + adapter: self.adapter.id, + node: node.id(), + }, + ObjectEvent::StateChanged(State::Focused, false), + ); + } } fn node_removed(&mut self, node: &DetachedNode, _: &TreeState) { @@ -210,18 +183,18 @@ pub(crate) struct AdapterImpl { impl AdapterImpl { fn new( id: usize, + messages: Sender, initial_state: TreeUpdate, is_window_focused: bool, root_window_bounds: WindowBounds, action_handler: Box, ) -> Self { let tree = Tree::new(initial_state, is_window_focused); - let (messages, context) = { + let context = { let mut app_context = AppContext::write(); - let messages = app_context.messages.clone().unwrap(); let context = Context::new(tree, action_handler, root_window_bounds); app_context.push_adapter(id, &context); - (messages, context) + context }; AdapterImpl { id, @@ -230,7 +203,7 @@ impl AdapterImpl { } } - pub(crate) async fn register_tree(&self) { + pub(crate) fn register_tree(&self) { fn add_children( node: Node<'_>, to_add: &mut Vec<(NodeId, InterfaceSet)>, @@ -270,42 +243,39 @@ impl AdapterImpl { }; for (id, interfaces) in objects_to_add { - self.register_interfaces(id, interfaces).await; + self.register_interfaces(id, interfaces); if id == root_id { - self.window_created(adapter_index, id).await; + self.window_created(adapter_index, id); } } } - pub(crate) async fn send_message(&self, message: Message) { + pub(crate) fn send_message(&self, message: Message) { #[cfg(not(feature = "tokio"))] - self.messages.send(message).await.unwrap(); + let _ = self.messages.try_send(message); #[cfg(feature = "tokio")] - self.messages.send(message).unwrap(); + let _ = self.messages.send(message); } - async fn register_interfaces(&self, id: NodeId, new_interfaces: InterfaceSet) { + fn register_interfaces(&self, id: NodeId, new_interfaces: InterfaceSet) { self.send_message(Message::RegisterInterfaces { adapter_id: self.id, context: Arc::downgrade(&self.context), node_id: id, interfaces: new_interfaces, - }) - .await; + }); } - async fn unregister_interfaces(&self, id: NodeId, old_interfaces: InterfaceSet) { + fn unregister_interfaces(&self, id: NodeId, old_interfaces: InterfaceSet) { self.send_message(Message::UnregisterInterfaces { adapter_id: self.id, node_id: id, interfaces: old_interfaces, - }) - .await; + }); } - pub(crate) async fn emit_object_event(&self, target: ObjectId, event: ObjectEvent) { - self.send_message(Message::EmitEvent(Event::Object { target, event })) - .await; + pub(crate) fn emit_object_event(&self, target: ObjectId, event: ObjectEvent) { + self.send_message(Message::EmitEvent(Event::Object { target, event })); } fn set_root_window_bounds(&self, bounds: WindowBounds) { @@ -325,7 +295,7 @@ impl AdapterImpl { tree.update_host_focus_state_and_process_changes(is_focused, &mut handler); } - async fn window_created(&self, adapter_index: usize, window: NodeId) { + fn window_created(&self, adapter_index: usize, window: NodeId) { self.emit_object_event( ObjectId::Root, ObjectEvent::ChildAdded( @@ -335,11 +305,10 @@ impl AdapterImpl { node: window, }, ), - ) - .await; + ); } - async fn window_activated(&self, window: &NodeWrapper<'_>) { + fn window_activated(&self, window: &NodeWrapper<'_>) { self.send_message(Message::EmitEvent(Event::Window { target: ObjectId::Node { adapter: self.id, @@ -347,27 +316,24 @@ impl AdapterImpl { }, name: window.name().unwrap_or_default(), event: WindowEvent::Activated, - })) - .await; + })); self.emit_object_event( ObjectId::Node { adapter: self.id, node: window.id(), }, ObjectEvent::StateChanged(State::Active, true), - ) - .await; + ); self.emit_object_event( ObjectId::Root, ObjectEvent::ActiveDescendantChanged(ObjectId::Node { adapter: self.id, node: window.id(), }), - ) - .await; + ); } - async fn window_deactivated(&self, window: &NodeWrapper<'_>) { + fn window_deactivated(&self, window: &NodeWrapper<'_>) { self.send_message(Message::EmitEvent(Event::Window { target: ObjectId::Node { adapter: self.id, @@ -375,27 +341,24 @@ impl AdapterImpl { }, name: window.name().unwrap_or_default(), event: WindowEvent::Deactivated, - })) - .await; + })); self.emit_object_event( ObjectId::Node { adapter: self.id, node: window.id(), }, ObjectEvent::StateChanged(State::Active, false), - ) - .await; + ); } - async fn window_destroyed(&self, window: NodeId) { + fn window_destroyed(&self, window: NodeId) { self.emit_object_event( ObjectId::Root, ObjectEvent::ChildRemoved(ObjectId::Node { adapter: self.id, node: window, }), - ) - .await; + ); } } @@ -413,16 +376,13 @@ impl Drop for AdapterImpl { fn drop(&mut self) { AppContext::write().remove_adapter(self.id); let root_id = self.context.read_tree().state().root_id(); - block_on(async { - self.emit_object_event( - ObjectId::Root, - ObjectEvent::ChildRemoved(ObjectId::Node { - adapter: self.id, - node: root_id, - }), - ) - .await; - }); + self.emit_object_event( + ObjectId::Root, + ObjectEvent::ChildRemoved(ObjectId::Node { + adapter: self.id, + node: root_id, + }), + ); } } @@ -431,6 +391,7 @@ pub(crate) type LazyAdapter = Pin>>>; static NEXT_ADAPTER_ID: AtomicUsize = AtomicUsize::new(0); pub struct Adapter { + messages: Sender, id: usize, r#impl: LazyAdapter, is_window_focused: Arc, @@ -444,6 +405,8 @@ impl Adapter { action_handler: Box, ) -> Self { let id = NEXT_ADAPTER_ID.fetch_add(1, Ordering::SeqCst); + let messages = AppContext::read().messages.clone(); + let messages_copy = messages.clone(); let is_window_focused = Arc::new(AtomicBool::new(false)); let is_window_focused_copy = is_window_focused.clone(); let root_window_bounds = Arc::new(Mutex::new(Default::default())); @@ -454,6 +417,7 @@ impl Adapter { let root_window_bounds = *root_window_bounds_copy.lock().unwrap(); AdapterImpl::new( id, + messages_copy, source(), is_window_focused, root_window_bounds, @@ -464,14 +428,25 @@ impl Adapter { )); let adapter = Self { id, + messages: messages.clone(), r#impl: r#impl.clone(), is_window_focused, root_window_bounds, }; - block_on(async move { ActivationContext::activate_eventually(id, r#impl).await }); + adapter.send_message(Message::AddAdapter { + id, + adapter: r#impl, + }); adapter } + pub(crate) fn send_message(&self, message: Message) { + #[cfg(not(feature = "tokio"))] + let _ = self.messages.try_send(message); + #[cfg(feature = "tokio")] + let _ = self.messages.send(message); + } + pub fn set_root_window_bounds(&self, outer: Rect, inner: Rect) { let new_bounds = WindowBounds::new(outer, inner); { @@ -502,13 +477,18 @@ impl Adapter { impl Drop for Adapter { fn drop(&mut self) { - block_on(async { - ActivationContext::remove_adapter(self.id).await; - }) + self.send_message(Message::RemoveAdapter { id: self.id }); } } pub(crate) enum Message { + AddAdapter { + id: usize, + adapter: LazyAdapter, + }, + RemoveAdapter { + id: usize, + }, RegisterInterfaces { adapter_id: usize, context: Weak, diff --git a/platforms/unix/src/atspi/bus.rs b/platforms/unix/src/atspi/bus.rs index 2565c476..d076a39e 100644 --- a/platforms/unix/src/atspi/bus.rs +++ b/platforms/unix/src/atspi/bus.rs @@ -6,6 +6,7 @@ use crate::{ atspi::{interfaces::*, ObjectId}, context::{AppContext, Context}, + executor::{Executor, Task}, PlatformNode, PlatformRootNode, }; use accesskit::NodeId; @@ -22,22 +23,41 @@ use zbus::{ Address, Connection, ConnectionBuilder, Result, }; -#[derive(Clone)] pub(crate) struct Bus { conn: Connection, + _task: Task<()>, socket_proxy: SocketProxy<'static>, } impl Bus { - pub(crate) async fn new(session_bus: &Connection) -> zbus::Result { + pub(crate) async fn new( + session_bus: &Connection, + executor: &Executor<'_>, + ) -> zbus::Result { let address = match var("AT_SPI_BUS_ADDRESS") { Ok(address) if !address.is_empty() => address, _ => BusProxy::new(session_bus).await?.get_address().await?, }; let address: Address = address.as_str().try_into()?; - let conn = ConnectionBuilder::address(address)?.build().await?; + let conn = ConnectionBuilder::address(address)? + .internal_executor(false) + .build() + .await?; + let conn_copy = conn.clone(); + let _task = executor.spawn( + async move { + loop { + conn_copy.executor().tick().await; + } + }, + "accesskit_atspi_bus_task", + ); let socket_proxy = SocketProxy::new(&conn).await?; - let mut bus = Bus { conn, socket_proxy }; + let mut bus = Bus { + conn, + _task, + socket_proxy, + }; bus.register_root_node().await?; Ok(bus) } diff --git a/platforms/unix/src/context.rs b/platforms/unix/src/context.rs index cc1496f0..80303c6f 100644 --- a/platforms/unix/src/context.rs +++ b/platforms/unix/src/context.rs @@ -6,28 +6,29 @@ use accesskit::{ActionHandler, ActionRequest}; use accesskit_consumer::Tree; #[cfg(not(feature = "tokio"))] -use async_channel::Sender; -#[cfg(not(feature = "tokio"))] -use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; -use async_once_cell::OnceCell as AsyncOnceCell; +use async_channel::{Receiver, Sender}; use atspi::proxy::bus::StatusProxy; #[cfg(not(feature = "tokio"))] use futures_util::{pin_mut as pin, select, StreamExt}; use once_cell::sync::OnceCell; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; +use std::{ + sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}, + thread, +}; #[cfg(feature = "tokio")] use tokio::{ pin, select, - sync::{mpsc::UnboundedSender as Sender, Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}, + sync::mpsc::{UnboundedReceiver as Receiver, UnboundedSender as Sender}, }; #[cfg(feature = "tokio")] use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; -use zbus::{Connection, Task}; +use zbus::{Connection, ConnectionBuilder}; use crate::{ adapter::{LazyAdapter, Message}, atspi::{interfaces::Event, map_or_ignoring_broken_pipe, Bus, OwnedObjectAddress}, - util::WindowBounds, + executor::Executor, + util::{block_on, WindowBounds}, }; pub(crate) struct Context { @@ -73,7 +74,7 @@ impl AdapterAndContext { static APP_CONTEXT: OnceCell>> = OnceCell::new(); pub(crate) struct AppContext { - pub(crate) messages: Option>, + pub(crate) messages: Sender, pub(crate) name: Option, pub(crate) toolkit_name: Option, pub(crate) toolkit_version: Option, @@ -83,29 +84,43 @@ pub(crate) struct AppContext { } impl AppContext { - fn get_or_init<'a>() -> RwLockWriteGuard<'a, Self> { - APP_CONTEXT - .get_or_init(|| { - Arc::new(RwLock::new(Self { - messages: None, - name: None, - toolkit_name: None, - toolkit_version: None, - id: None, - desktop_address: None, - adapters: Vec::new(), + fn get_or_init<'a>() -> &'a Arc> { + APP_CONTEXT.get_or_init(|| { + #[cfg(not(feature = "tokio"))] + let (tx, rx) = async_channel::unbounded(); + #[cfg(feature = "tokio")] + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + thread::spawn(|| { + let executor = Executor::new(); + block_on(executor.run(async { + if let Ok(session_bus) = ConnectionBuilder::session() { + if let Ok(session_bus) = session_bus.internal_executor(false).build().await + { + run_event_loop(&executor, session_bus, rx).await.unwrap(); + } + } })) - }) - .write() - .unwrap() + }); + + Arc::new(RwLock::new(Self { + messages: tx, + name: None, + toolkit_name: None, + toolkit_version: None, + id: None, + desktop_address: None, + adapters: Vec::new(), + })) + }) } pub(crate) fn read<'a>() -> RwLockReadGuard<'a, AppContext> { - APP_CONTEXT.get().unwrap().read().unwrap() + AppContext::get_or_init().read().unwrap() } pub(crate) fn write<'a>() -> RwLockWriteGuard<'a, AppContext> { - APP_CONTEXT.get().unwrap().write().unwrap() + AppContext::get_or_init().write().unwrap() } pub(crate) fn adapter_index(&self, id: usize) -> Result { @@ -124,140 +139,111 @@ impl AppContext { } } -pub(crate) struct ActivationContext { - _task: Option>, - adapters: Vec<(usize, LazyAdapter)>, -} - -static ACTIVATION_CONTEXT: AsyncOnceCell>> = AsyncOnceCell::new(); - -impl ActivationContext { - async fn get_or_init<'a>() -> AsyncMutexGuard<'a, ActivationContext> { - ACTIVATION_CONTEXT - .get_or_init(async { - let task = Connection::session().await.ok().map(|session_bus| { - let session_bus_copy = session_bus.clone(); - session_bus.executor().spawn( - async move { - listen(session_bus_copy).await.unwrap(); - }, - "accesskit_task", - ) - }); - Arc::new(AsyncMutex::new(ActivationContext { - _task: task, - adapters: Vec::new(), - })) - }) - .await - .lock() - .await - } - - pub(crate) async fn activate_eventually(id: usize, adapter: LazyAdapter) { - let mut activation_context = ActivationContext::get_or_init().await; - activation_context.adapters.push((id, adapter)); - let is_a11y_enabled = AppContext::get_or_init().messages.is_some(); - if is_a11y_enabled { - let adapter = &activation_context.adapters.last().unwrap().1; - adapter.as_ref().await; - } - } - - pub(crate) async fn remove_adapter(id: usize) { - if let Some(activation_context) = ACTIVATION_CONTEXT.get() { - let mut context = activation_context.lock().await; - if let Ok(index) = context - .adapters - .binary_search_by(|adapter| adapter.0.cmp(&id)) - { - context.adapters.remove(index); +async fn run_event_loop( + executor: &Executor<'_>, + session_bus: Connection, + rx: Receiver, +) -> zbus::Result<()> { + let session_bus_copy = session_bus.clone(); + let _session_bus_task = executor.spawn( + async move { + loop { + session_bus_copy.executor().tick().await; } - } - } -} + }, + "accesskit_session_bus_task", + ); -async fn listen(session_bus: Connection) -> zbus::Result<()> { let status = StatusProxy::new(&session_bus).await?; let changes = status.receive_is_enabled_changed().await.fuse(); pin!(changes); #[cfg(not(feature = "tokio"))] - let (tx, messages) = { - let (tx, rx) = async_channel::unbounded(); - let messages = rx.fuse(); - (tx, messages) - }; + let messages = rx.fuse(); #[cfg(feature = "tokio")] - let (tx, messages) = { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let messages = UnboundedReceiverStream::new(rx).fuse(); - (tx, messages) - }; + let messages = UnboundedReceiverStream::new(rx).fuse(); pin!(messages); + let mut atspi_bus = None; + let mut adapters: Vec<(usize, LazyAdapter)> = Vec::new(); loop { select! { change = changes.next() => { - atspi_bus = if let Some(change) = change { + atspi_bus = None; + if let Some(change) = change { if change.get().await? { - map_or_ignoring_broken_pipe(Bus::new(&session_bus).await, None, Some)? - } else { - None + atspi_bus = map_or_ignoring_broken_pipe(Bus::new(&session_bus, executor).await, None, Some)?; } - } else { - None - }; - { - let mut app_context = AppContext::get_or_init(); - app_context.messages = Some(tx.clone()); } if atspi_bus.is_some() { - if let Some(activation_context) = ACTIVATION_CONTEXT.get() { - let activation_context = activation_context.lock().await; - for (_, adapter) in &activation_context.adapters { - adapter.as_ref().await.register_tree().await; - } + for (_, adapter) in &adapters { + adapter.as_ref().await.register_tree(); } } } message = messages.next() => { - if let Some((message, atspi_bus)) = message.zip(atspi_bus.as_ref()) { - process_adapter_message(atspi_bus, message).await?; + if let Some(message) = message { + process_adapter_message(&atspi_bus, &mut adapters, message).await?; } } } } } -async fn process_adapter_message(bus: &Bus, message: Message) -> zbus::Result<()> { +async fn process_adapter_message( + atspi_bus: &Option, + adapters: &mut Vec<(usize, LazyAdapter)>, + message: Message, +) -> zbus::Result<()> { match message { + Message::AddAdapter { id, adapter } => { + adapters.push((id, adapter)); + if atspi_bus.is_some() { + let adapter = &adapters.last_mut().unwrap().1; + adapter.as_ref().await.register_tree(); + } + } + Message::RemoveAdapter { id } => { + if let Ok(index) = adapters.binary_search_by(|adapter| adapter.0.cmp(&id)) { + adapters.remove(index); + } + } Message::RegisterInterfaces { adapter_id, context, node_id, interfaces, } => { - bus.register_interfaces(adapter_id, context, node_id, interfaces) - .await? + if let Some(bus) = atspi_bus { + bus.register_interfaces(adapter_id, context, node_id, interfaces) + .await? + } } Message::UnregisterInterfaces { adapter_id, node_id, interfaces, } => { - bus.unregister_interfaces(adapter_id, node_id, interfaces) - .await? + if let Some(bus) = atspi_bus { + bus.unregister_interfaces(adapter_id, node_id, interfaces) + .await? + } } Message::EmitEvent(Event::Object { target, event }) => { - bus.emit_object_event(target, event).await? + if let Some(bus) = atspi_bus { + bus.emit_object_event(target, event).await? + } } Message::EmitEvent(Event::Window { target, name, event, - }) => bus.emit_window_event(target, name, event).await?, + }) => { + if let Some(bus) = atspi_bus { + bus.emit_window_event(target, name, event).await?; + } + } } Ok(()) diff --git a/platforms/unix/src/executor.rs b/platforms/unix/src/executor.rs new file mode 100644 index 00000000..a7055837 --- /dev/null +++ b/platforms/unix/src/executor.rs @@ -0,0 +1,171 @@ +// Copyright 2024 The AccessKit Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (found in +// the LICENSE-APACHE file) or the MIT license (found in +// the LICENSE-MIT file), at your option. + +// Derived from zbus. +// Copyright 2024 Zeeshan Ali Khan. +// Licensed under the MIT license (found in the LICENSE-MIT file). + +#[cfg(not(feature = "tokio"))] +use async_executor::Executor as AsyncExecutor; +#[cfg(not(feature = "tokio"))] +use async_task::Task as AsyncTask; +#[cfg(feature = "tokio")] +use std::marker::PhantomData; +#[cfg(not(feature = "tokio"))] +use std::sync::Arc; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +#[cfg(feature = "tokio")] +use tokio::task::JoinHandle; + +/// A wrapper around the underlying runtime/executor. +/// +/// This is used to run asynchronous tasks internally and allows integration with various runtimes. +/// See [`crate::Connection::executor`] for an example of integration with external runtimes. +/// +/// **Note:** You can (and should) completely ignore this type when building with `tokio` feature +/// enabled. +#[cfg(not(feature = "tokio"))] +#[derive(Debug, Clone)] +pub(crate) struct Executor<'a> { + executor: Arc>, +} +#[cfg(feature = "tokio")] +#[derive(Debug, Clone)] +pub(crate) struct Executor<'a> { + phantom: PhantomData<&'a ()>, +} + +impl<'a> Executor<'a> { + /// Spawns a task onto the executor. + pub(crate) fn spawn( + &self, + future: impl Future + Send + 'static, + #[allow(unused)] name: &str, + ) -> Task { + #[cfg(not(feature = "tokio"))] + { + Task(Some(self.executor.spawn(future))) + } + + #[cfg(feature = "tokio")] + { + #[cfg(tokio_unstable)] + { + Task(Some( + tokio::task::Builder::new() + .name(name) + .spawn(future) + // SAFETY: Looking at the code, this call always returns an `Ok`. + .unwrap(), + )) + } + #[cfg(not(tokio_unstable))] + { + Task(Some(tokio::task::spawn(future))) + } + } + } + + /// Create a new `Executor`. + pub(crate) fn new() -> Self { + #[cfg(not(feature = "tokio"))] + { + Self { + executor: Arc::new(AsyncExecutor::new()), + } + } + + #[cfg(feature = "tokio")] + { + Self { + phantom: PhantomData, + } + } + } + + /// Runs the executor until the given future completes. + /// + /// With `tokio` feature enabled, it just awaits on the `future`. + pub(crate) async fn run(&self, future: impl Future) -> T { + #[cfg(not(feature = "tokio"))] + { + self.executor.run(future).await + } + #[cfg(feature = "tokio")] + { + future.await + } + } +} + +/// A wrapper around the task API of the underlying runtime/executor. +/// +/// This follows the semantics of `async_task::Task` on drop: +/// +/// * it will be cancelled, rather than detached. For detaching, use the `detach` method. +/// * errors from the task cancellation will will be ignored. If you need to know about task errors, +/// convert the task to a `FallibleTask` using the `fallible` method. +#[cfg(not(feature = "tokio"))] +#[derive(Debug)] +pub(crate) struct Task(Option>); +#[cfg(feature = "tokio")] +#[derive(Debug)] +pub(crate) struct Task(Option>); + +impl Task { + /// Detaches the task to let it keep running in the background. + #[allow(unused_mut)] + #[allow(unused)] + pub(crate) fn detach(mut self) { + #[cfg(not(feature = "tokio"))] + { + self.0.take().expect("async_task::Task is none").detach() + } + + #[cfg(feature = "tokio")] + { + self.0.take().expect("tokio::task::JoinHandle is none"); + } + } +} + +impl Drop for Task { + fn drop(&mut self) { + #[cfg(feature = "tokio")] + { + if let Some(join_handle) = self.0.take() { + join_handle.abort(); + } + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + #[cfg(not(feature = "tokio"))] + { + Pin::new(&mut self.get_mut().0.as_mut().expect("async_task::Task is none")).poll(cx) + } + + #[cfg(feature = "tokio")] + { + Pin::new( + &mut self + .get_mut() + .0 + .as_mut() + .expect("tokio::task::JoinHandle is none"), + ) + .poll(cx) + .map(|r| r.expect("tokio::task::JoinHandle error")) + } + } +} diff --git a/platforms/unix/src/lib.rs b/platforms/unix/src/lib.rs index 05fbff32..32084e35 100644 --- a/platforms/unix/src/lib.rs +++ b/platforms/unix/src/lib.rs @@ -9,6 +9,7 @@ extern crate zbus; mod adapter; mod atspi; mod context; +mod executor; mod filters; mod node; mod util; diff --git a/platforms/unix/src/node.rs b/platforms/unix/src/node.rs index 2fc1f919..3470ed51 100644 --- a/platforms/unix/src/node.rs +++ b/platforms/unix/src/node.rs @@ -486,172 +486,151 @@ impl<'a> NodeWrapper<'a> { self.node_state().numeric_value() } - pub(crate) async fn notify_changes( + pub(crate) fn notify_changes( &self, window_bounds: &WindowBounds, adapter: &AdapterImpl, old: &NodeWrapper<'_>, ) { - self.notify_state_changes(adapter, old).await; - self.notify_property_changes(adapter, old).await; - self.notify_bounds_changes(window_bounds, adapter, old) - .await; - self.notify_children_changes(adapter, old).await; + self.notify_state_changes(adapter, old); + self.notify_property_changes(adapter, old); + self.notify_bounds_changes(window_bounds, adapter, old); + self.notify_children_changes(adapter, old); } - async fn notify_state_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { + fn notify_state_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { let adapter_id = self.adapter(); let old_state = old.state(true); let new_state = self.state(true); let changed_states = old_state ^ new_state; for state in changed_states.iter() { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::StateChanged(state, new_state.contains(state)), - ) - .await; + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::StateChanged(state, new_state.contains(state)), + ); } } - async fn notify_property_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { + fn notify_property_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { let adapter_id = self.adapter(); let name = self.name(); if name != old.name() { let name = name.unwrap_or_default(); - adapter - .emit_object_event( + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::PropertyChanged(Property::Name(name.clone())), + ); + + let live = self.live(); + if live != AtspiLive::None { + adapter.emit_object_event( ObjectId::Node { adapter: adapter_id, node: self.id(), }, - ObjectEvent::PropertyChanged(Property::Name(name.clone())), - ) - .await; - - let live = self.live(); - if live != AtspiLive::None { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::Announcement(name, live), - ) - .await; + ObjectEvent::Announcement(name, live), + ); } } let description = self.description(); if description != old.description() { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::PropertyChanged(Property::Description(description)), - ) - .await; + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::PropertyChanged(Property::Description(description)), + ); } let parent_id = self.parent_id(); if parent_id != old.parent_id() { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::PropertyChanged(Property::Parent(self.filtered_parent())), - ) - .await; + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::PropertyChanged(Property::Parent(self.filtered_parent())), + ); } let role = self.role(); if role != old.role() { - adapter - .emit_object_event( + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::PropertyChanged(Property::Role(role)), + ); + } + if let Some(value) = self.current_value() { + if Some(value) != old.current_value() { + adapter.emit_object_event( ObjectId::Node { adapter: adapter_id, node: self.id(), }, - ObjectEvent::PropertyChanged(Property::Role(role)), - ) - .await; - } - if let Some(value) = self.current_value() { - if Some(value) != old.current_value() { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::PropertyChanged(Property::Value(value)), - ) - .await; + ObjectEvent::PropertyChanged(Property::Value(value)), + ); } } } - async fn notify_bounds_changes( + fn notify_bounds_changes( &self, window_bounds: &WindowBounds, adapter: &AdapterImpl, old: &NodeWrapper<'_>, ) { if self.raw_bounds_and_transform() != old.raw_bounds_and_transform() { - adapter - .emit_object_event( - ObjectId::Node { - adapter: self.adapter(), - node: self.id(), - }, - ObjectEvent::BoundsChanged(self.extents(window_bounds)), - ) - .await; + adapter.emit_object_event( + ObjectId::Node { + adapter: self.adapter(), + node: self.id(), + }, + ObjectEvent::BoundsChanged(self.extents(window_bounds)), + ); } } - async fn notify_children_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { + fn notify_children_changes(&self, adapter: &AdapterImpl, old: &NodeWrapper<'_>) { let adapter_id = self.adapter(); let old_children = old.child_ids().collect::>(); let filtered_children = self.filtered_child_ids().collect::>(); for (index, child) in filtered_children.iter().enumerate() { if !old_children.contains(child) { - adapter - .emit_object_event( + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::ChildAdded( + index, ObjectId::Node { adapter: adapter_id, - node: self.id(), + node: *child, }, - ObjectEvent::ChildAdded( - index, - ObjectId::Node { - adapter: adapter_id, - node: *child, - }, - ), - ) - .await; + ), + ); } } for child in old_children.into_iter() { if !filtered_children.contains(&child) { - adapter - .emit_object_event( - ObjectId::Node { - adapter: adapter_id, - node: self.id(), - }, - ObjectEvent::ChildRemoved(ObjectId::Node { - adapter: adapter_id, - node: child, - }), - ) - .await; + adapter.emit_object_event( + ObjectId::Node { + adapter: adapter_id, + node: self.id(), + }, + ObjectEvent::ChildRemoved(ObjectId::Node { + adapter: adapter_id, + node: child, + }), + ); } } } diff --git a/platforms/unix/src/util.rs b/platforms/unix/src/util.rs index 38b71a94..239fd600 100644 --- a/platforms/unix/src/util.rs +++ b/platforms/unix/src/util.rs @@ -5,38 +5,20 @@ use accesskit::{Point, Rect}; use atspi::CoordType; -#[cfg(feature = "tokio")] -use once_cell::sync::Lazy; #[cfg(not(feature = "tokio"))] pub(crate) fn block_on(future: F) -> F::Output { - zbus::block_on(future) + futures_lite::future::block_on(future) } -#[cfg(feature = "tokio")] -pub(crate) static TOKIO_RT: Lazy = Lazy::new(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("create tokio runtime"); - let handle = rt.handle().clone(); - std::thread::Builder::new() - .name("accesskit-tokio".into()) - .spawn(move || { - rt.block_on(async { - let duration = std::time::Duration::from_secs(86400); - loop { - tokio::time::sleep(duration).await; - } - }); - }) - .expect("launch tokio runtime thread"); - handle -}); - #[cfg(feature = "tokio")] pub(crate) fn block_on(future: F) -> F::Output { - TOKIO_RT.block_on(future) + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .expect("launch of single-threaded tokio runtime"); + runtime.block_on(future) } #[derive(Clone, Copy, Default)]