Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blocking unnamed_socket #4072

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub enum BlockReason {
Epoll,
/// Blocked on eventfd.
Eventfd,
/// Blocked on unnamed_socket.
UnnamedSocket,
}

/// The state of a thread.
Expand Down
237 changes: 157 additions & 80 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ struct AnonSocket {
/// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
/// of closure.
peer_lost_data: Cell<bool>,
/// A list of thread ids blocked because the buffer was empty.
/// Once another thread writes some bytes, these threads will be unblocked.
blocked_read_tid: RefCell<Vec<ThreadId>>,
/// A list of thread ids blocked because the buffer was full.
/// Once another thread reads some bytes, these threads will be unblocked.
blocked_write_tid: RefCell<Vec<ThreadId>>,
is_nonblock: bool,
}

Expand Down Expand Up @@ -83,7 +89,7 @@ impl FileDescription for AnonSocket {

fn read<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
Expand All @@ -100,33 +106,21 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
if readbuf.borrow().buf.is_empty() {
if self.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, dest);
} else {
if self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with writer and empty buffer.
// FIXME: blocking is currently not supported
throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet");
}
}

if readbuf.borrow().buf.is_empty() && self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
oli-obk marked this conversation as resolved.
Show resolved Hide resolved
anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx)
tiif marked this conversation as resolved.
Show resolved Hide resolved
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
}

fn write<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
Expand All @@ -153,16 +147,11 @@ impl FileDescription for AnonSocket {
};
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with a full buffer.
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
}
if available_space == 0 && self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
}

fn as_unix(&self) -> &dyn UnixFileDescription {
Expand All @@ -172,81 +161,161 @@ impl FileDescription for AnonSocket {

/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
available_space: usize,
peer_fd: &FileDescriptionRef,
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
Comment on lines +170 to +173
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is big enough, so I decided to temporarily make this throws unsupported format.

I wanted to write a test for this, and potentially tweak the diagnostic a bit if it is too confusing (because of the reason stated here).

But I certainly won't mind putting it in this PR if anyone prefers.

let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
};
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
};
let mut writebuf = writebuf.borrow_mut();

// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());

if available_space == 0 {
// Blocking socketpair with a full buffer.
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_write(weak_self_ref, ptr, len, dest, this)
}
),
);
} else {
let mut writebuf = writebuf.borrow_mut();
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);
// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(peer_fd)?;
// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(&peer_fd)?;
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
// Unblock all threads that are currently blocked on peer_fd's read.
let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}

ecx.return_write_success(actual_write_size, dest)
return ecx.return_write_success(actual_write_size, &dest);
}
interp_ok(())
}

/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
anonsocket: &AnonSocket,
peer_fd: Option<FileDescriptionRef>,
weak_self_ref: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();

let Some(readbuf) = &anonsocket.readbuf else {
let Some(readbuf) = &self_anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe")
};
let mut readbuf = readbuf.borrow_mut();

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;
}

ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
if readbuf.borrow_mut().buf.is_empty() {
if self_anonsocket.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, &dest);
} else {
// Blocking socketpair with writer and empty buffer.
let weak_self_ref = weak_self_ref.clone();
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_read(weak_self_ref, len, ptr, dest, this)
}
),
);
}
} else {
let mut bytes = vec![0; len];
let mut readbuf = readbuf.borrow_mut();
// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() {
ecx.check_and_update_readiness(&peer_fd)?;
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
// Unblock all threads that are currently blocked on peer_fd's write.
let waiting_threads =
std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}
};

return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
}
interp_ok(())
}

impl UnixFileDescription for AnonSocket {
Expand Down Expand Up @@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock: is_sock_nonblock,
});
let fd1 = fds.new_ref(AnonSocket {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock: is_sock_nonblock,
});

Expand Down Expand Up @@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
readbuf: Some(RefCell::new(Buffer::new())),
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock,
});
let fd1 = fds.new_ref(AnonSocket {
readbuf: None,
peer_fd: OnceCell::new(),
peer_lost_data: Cell::new(false),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
is_nonblock,
});

Expand Down
47 changes: 47 additions & 0 deletions tests/fail-dep/libc/socketpair_block_read_twice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//@ignore-target: windows # No libc socketpair on Windows
//~^ERROR: deadlocked
//~^^ERROR: deadlocked
// test_race depends on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0
//@error-in-other-file: deadlock

use std::thread;

// Test the behaviour of a thread being blocked on read, get unblocked, then blocked again.

// The expected execution is
// 1. Thread 1 blocks.
// 2. Thread 2 blocks.
// 3. Thread 3 unblocks both thread 1 and thread 2.
// 4. Thread 1 reads.
// 5. Thread 2's `read` can never complete -> deadlocked.

fn main() {
let mut fds = [-1, -1];
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
let thread1 = thread::spawn(move || {
// Let this thread block on read.
let mut buf: [u8; 3] = [0; 3];
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
assert_eq!(res, 3);
assert_eq!(&buf, "abc".as_bytes());
});
let thread2 = thread::spawn(move || {
// Let this thread block on read.
let mut buf: [u8; 3] = [0; 3];
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
//~^ERROR: deadlocked
assert_eq!(res, 3);
assert_eq!(&buf, "abc".as_bytes());
});
let thread3 = thread::spawn(move || {
// Unblock thread1 by writing something.
let data = "abc".as_bytes().as_ptr();
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
assert_eq!(res, 3);
});
thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
Loading
Loading