diff --git a/Cargo.toml b/Cargo.toml index e1ab8ef2..99300a77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ futures-lite = "1.10.1" http = "0.2.1" log = "0.4" once_cell = "1" -polling = "2" +polling = "3" slab = "0.4" sluice = "0.5.4" url = "2.1" diff --git a/src/agent/selector.rs b/src/agent/selector.rs index 2f789879..2021d106 100644 --- a/src/agent/selector.rs +++ b/src/agent/selector.rs @@ -1,9 +1,10 @@ use curl::multi::Socket; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use std::{ collections::{HashMap, HashSet}, hash::{BuildHasherDefault, Hasher}, io, + os::fd::BorrowedFd, sync::Arc, task::Waker, time::Duration, @@ -30,7 +31,7 @@ pub(crate) struct Selector { /// Socket events that have occurred. We re-use this vec every call for /// efficiency. - events: Vec, + events: Events, /// Incrementing counter used to deduplicate registration operations. tick: usize, @@ -50,7 +51,7 @@ impl Selector { poller: Arc::new(Poller::new()?), sockets: HashMap::with_hasher(Default::default()), bad_sockets: HashSet::with_hasher(Default::default()), - events: Vec::new(), + events: Events::new(), tick: 0, }) } @@ -113,18 +114,21 @@ impl Selector { } /// Remove a socket from the selector and stop receiving events for it. + #[allow(unsafe_code)] pub(crate) fn deregister(&mut self, socket: Socket) -> io::Result<()> { // Remove this socket from our bookkeeping. If we recognize it, also // remove it from the underlying poller. if self.sockets.remove(&socket).is_some() { self.bad_sockets.remove(&socket); + let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) }; + // There's a good chance that the socket has already been closed. // Depending on the poller implementation, it may have already // forgotten about this socket (e.g. epoll). Therefore if we get an // error back complaining that the socket is invalid, we can safely // ignore it. - if let Err(e) = self.poller.delete(socket) { + if let Err(e) = self.poller.delete(&socket_as_fd) { if !is_bad_socket_error(&e) && e.kind() != io::ErrorKind::PermissionDenied { return Err(e); } @@ -144,7 +148,7 @@ impl Selector { // We don't do this immediately after polling because the caller may // choose to de-register a socket before the next call. That's why we // wait until the last minute. - for event in self.events.drain(..) { + for event in self.events.iter() { let socket = event.key as Socket; if let Some(registration) = self.sockets.get_mut(&socket) { // If the socket was already re-registered this tick, then we @@ -160,6 +164,7 @@ impl Selector { } } } + self.events.clear(); // Iterate over sockets that have been registered, but failed to be // added to the underlying poller temporarily, and retry adding them. @@ -204,6 +209,7 @@ impl Selector { } } +#[allow(unsafe_code)] fn poller_add(poller: &Poller, socket: Socket, readable: bool, writable: bool) -> io::Result<()> { // If this errors, we retry the operation as a modification instead. This is // because this new socket might re-use a file descriptor that was @@ -211,26 +217,28 @@ fn poller_add(poller: &Poller, socket: Socket, readable: bool, writable: bool) - // operation as a modification is sufficient to handle this. // // This is especially common with the epoll backend. - if let Err(e) = poller.add(socket, Event { - key: socket as usize, - readable, - writable, - }) { + let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) }; + if let Err(e) = unsafe { + poller.add( + &socket_as_fd, + Event::new(socket as usize, readable, writable), + ) + } { tracing::debug!( "failed to add interest for socket {}, retrying as a modify: {}", socket, e ); - poller.modify(socket, Event { - key: socket as usize, - readable, - writable, - })?; + poller.modify( + &socket_as_fd, + Event::new(socket as usize, readable, writable), + )?; } Ok(()) } +#[allow(unsafe_code)] fn poller_modify( poller: &Poller, socket: Socket, @@ -239,21 +247,22 @@ fn poller_modify( ) -> io::Result<()> { // If this errors, we retry the operation as an add instead. This is done // because epoll is weird. - if let Err(e) = poller.modify(socket, Event { - key: socket as usize, - readable, - writable, - }) { + let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) }; + if let Err(e) = poller.modify( + &socket_as_fd, + Event::new(socket as usize, readable, writable), + ) { tracing::debug!( "failed to modify interest for socket {}, retrying as an add: {}", socket, e ); - poller.add(socket, Event { - key: socket as usize, - readable, - writable, - })?; + unsafe { + poller.add( + &socket_as_fd, + Event::new(socket as usize, readable, writable), + )?; + } } Ok(())