Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to polling 3.x #457

Open
wants to merge 1 commit into
base: 1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ futures-lite = "1.10.1"
http = "0.2.1"
log = "0.4"
once_cell = "1"
polling = "2"
polling = "3"
slab = "0.4"
sluice = "0.5.4"
url = "2.1"
Expand Down
59 changes: 34 additions & 25 deletions src/agent/selector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use curl::multi::Socket;
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
use std::{
collections::{HashMap, HashSet},
hash::{BuildHasherDefault, Hasher},
io,
os::fd::BorrowedFd,
sync::Arc,
task::Waker,
time::Duration,
Expand All @@ -30,7 +31,7 @@ pub(crate) struct Selector {

/// Socket events that have occurred. We re-use this vec every call for
/// efficiency.
events: Vec<Event>,
events: Events,

/// Incrementing counter used to deduplicate registration operations.
tick: usize,
Expand All @@ -50,7 +51,7 @@ impl Selector {
poller: Arc::new(Poller::new()?),
sockets: HashMap::with_hasher(Default::default()),
bad_sockets: HashSet::with_hasher(Default::default()),
events: Vec::new(),
events: Events::new(),
tick: 0,
})
}
Expand Down Expand Up @@ -113,18 +114,21 @@ impl Selector {
}

/// Remove a socket from the selector and stop receiving events for it.
#[allow(unsafe_code)]
pub(crate) fn deregister(&mut self, socket: Socket) -> io::Result<()> {
// Remove this socket from our bookkeeping. If we recognize it, also
// remove it from the underlying poller.
if self.sockets.remove(&socket).is_some() {
self.bad_sockets.remove(&socket);

let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) };

// There's a good chance that the socket has already been closed.
// Depending on the poller implementation, it may have already
// forgotten about this socket (e.g. epoll). Therefore if we get an
// error back complaining that the socket is invalid, we can safely
// ignore it.
if let Err(e) = self.poller.delete(socket) {
if let Err(e) = self.poller.delete(&socket_as_fd) {
if !is_bad_socket_error(&e) && e.kind() != io::ErrorKind::PermissionDenied {
return Err(e);
}
Expand All @@ -144,7 +148,7 @@ impl Selector {
// We don't do this immediately after polling because the caller may
// choose to de-register a socket before the next call. That's why we
// wait until the last minute.
for event in self.events.drain(..) {
for event in self.events.iter() {
let socket = event.key as Socket;
if let Some(registration) = self.sockets.get_mut(&socket) {
// If the socket was already re-registered this tick, then we
Expand All @@ -160,6 +164,7 @@ impl Selector {
}
}
}
self.events.clear();

// Iterate over sockets that have been registered, but failed to be
// added to the underlying poller temporarily, and retry adding them.
Expand Down Expand Up @@ -204,33 +209,36 @@ impl Selector {
}
}

#[allow(unsafe_code)]
fn poller_add(poller: &Poller, socket: Socket, readable: bool, writable: bool) -> io::Result<()> {
// If this errors, we retry the operation as a modification instead. This is
// because this new socket might re-use a file descriptor that was
// previously closed, but is still registered with the poller. Retrying the
// operation as a modification is sufficient to handle this.
//
// This is especially common with the epoll backend.
if let Err(e) = poller.add(socket, Event {
key: socket as usize,
readable,
writable,
}) {
let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) };
if let Err(e) = unsafe {
poller.add(
&socket_as_fd,
Event::new(socket as usize, readable, writable),
)
} {
tracing::debug!(
"failed to add interest for socket {}, retrying as a modify: {}",
socket,
e
);
poller.modify(socket, Event {
key: socket as usize,
readable,
writable,
})?;
poller.modify(
&socket_as_fd,
Event::new(socket as usize, readable, writable),
)?;
}

Ok(())
}

#[allow(unsafe_code)]
fn poller_modify(
poller: &Poller,
socket: Socket,
Expand All @@ -239,21 +247,22 @@ fn poller_modify(
) -> io::Result<()> {
// If this errors, we retry the operation as an add instead. This is done
// because epoll is weird.
if let Err(e) = poller.modify(socket, Event {
key: socket as usize,
readable,
writable,
}) {
let socket_as_fd = unsafe { BorrowedFd::borrow_raw(socket) };
if let Err(e) = poller.modify(
&socket_as_fd,
Event::new(socket as usize, readable, writable),
) {
tracing::debug!(
"failed to modify interest for socket {}, retrying as an add: {}",
socket,
e
);
poller.add(socket, Event {
key: socket as usize,
readable,
writable,
})?;
unsafe {
poller.add(
&socket_as_fd,
Event::new(socket as usize, readable, writable),
)?;
}
}

Ok(())
Expand Down