From f76c9fa95938671825ca18479b36c62c234e651f Mon Sep 17 00:00:00 2001 From: tiif Date: Fri, 29 Nov 2024 00:04:01 +0800 Subject: [PATCH] Implement blocking unnamed_socket --- src/concurrency/thread.rs | 2 + src/shims/unix/unnamed_socket.rs | 237 ++++++++++++------ .../libc/socketpair_block_read_twice.rs | 47 ++++ .../libc/socketpair_block_read_twice.stderr | 41 +++ .../libc/socketpair_block_write_twice.rs | 49 ++++ .../libc/socketpair_block_write_twice.stderr | 41 +++ .../fail-dep/libc/socketpair_read_blocking.rs | 12 - .../libc/socketpair_read_blocking.stderr | 5 +- .../libc/socketpair_write_blocking.rs | 16 -- .../libc/socketpair_write_blocking.stderr | 5 +- tests/pass-dep/libc/libc-socketpair.rs | 50 ++++ 11 files changed, 391 insertions(+), 114 deletions(-) create mode 100644 tests/fail-dep/libc/socketpair_block_read_twice.rs create mode 100644 tests/fail-dep/libc/socketpair_block_read_twice.stderr create mode 100644 tests/fail-dep/libc/socketpair_block_write_twice.rs create mode 100644 tests/fail-dep/libc/socketpair_block_write_twice.stderr delete mode 100644 tests/fail-dep/libc/socketpair_read_blocking.rs delete mode 100644 tests/fail-dep/libc/socketpair_write_blocking.rs diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index 59e2fdd428..730c27d016 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -159,6 +159,8 @@ pub enum BlockReason { Epoll, /// Blocked on eventfd. Eventfd, + /// Blocked on unnamed_socket. + UnnamedSocket, } /// The state of a thread. diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 40a76ea743..86ebe95762 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -36,6 +36,12 @@ struct AnonSocket { /// This flag is set to `true` if the peer's `readbuf` is non-empty at the time /// of closure. peer_lost_data: Cell, + /// A list of thread ids blocked because the buffer was empty. + /// Once another thread writes some bytes, these threads will be unblocked. + blocked_read_tid: RefCell>, + /// A list of thread ids blocked because the buffer was full. + /// Once another thread reads some bytes, these threads will be unblocked. + blocked_write_tid: RefCell>, is_nonblock: bool, } @@ -83,7 +89,7 @@ impl FileDescription for AnonSocket { fn read<'tcx>( &self, - _self_ref: &FileDescriptionRef, + self_ref: &FileDescriptionRef, _communicate_allowed: bool, ptr: Pointer, len: usize, @@ -100,33 +106,21 @@ impl FileDescription for AnonSocket { // corresponding ErrorKind variant. throw_unsup_format!("reading from the write end of a pipe"); }; - if readbuf.borrow().buf.is_empty() { - if self.peer_fd().upgrade().is_none() { - // Socketpair with no peer and empty buffer. - // 0 bytes successfully read indicates end-of-file. - return ecx.return_read_success(ptr, &[], 0, dest); - } else { - if self.is_nonblock { - // Non-blocking socketpair with writer and empty buffer. - // https://linux.die.net/man/2/read - // EAGAIN or EWOULDBLOCK can be returned for socket, - // POSIX.1-2001 allows either error to be returned for this case. - // Since there is no ErrorKind for EAGAIN, WouldBlock is used. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - // Blocking socketpair with writer and empty buffer. - // FIXME: blocking is currently not supported - throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet"); - } - } + + if readbuf.borrow().buf.is_empty() && self.is_nonblock { + // Non-blocking socketpair with writer and empty buffer. + // https://linux.die.net/man/2/read + // EAGAIN or EWOULDBLOCK can be returned for socket, + // POSIX.1-2001 allows either error to be returned for this case. + // Since there is no ErrorKind for EAGAIN, WouldBlock is used. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); } - // TODO: We might need to decide what to do if peer_fd is closed when read is blocked. - anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx) + anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx) } fn write<'tcx>( &self, - _self_ref: &FileDescriptionRef, + self_ref: &FileDescriptionRef, _communicate_allowed: bool, ptr: Pointer, len: usize, @@ -153,16 +147,11 @@ impl FileDescription for AnonSocket { }; let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); - if available_space == 0 { - if self.is_nonblock { - // Non-blocking socketpair with a full buffer. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - // Blocking socketpair with a full buffer. - throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet"); - } + if available_space == 0 && self.is_nonblock { + // Non-blocking socketpair with a full buffer. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); } - anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx) + anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx) } fn as_unix(&self) -> &dyn UnixFileDescription { @@ -172,81 +161,161 @@ impl FileDescription for AnonSocket { /// Write to AnonSocket based on the space available and return the written byte size. fn anonsocket_write<'tcx>( - available_space: usize, - peer_fd: &FileDescriptionRef, + weak_self_ref: WeakFileDescriptionRef, ptr: Pointer, len: usize, - dest: &MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { + let Some(self_ref) = weak_self_ref.upgrade() else { + // FIXME: We should raise a deadlock error if the self_ref upgrade failed. + throw_unsup_format!("This will be a deadlock error in future") + }; + let self_anonsocket = self_ref.downcast::().unwrap(); + let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else { + // If the upgrade from Weak to Rc fails, it indicates that all read ends have been + // closed. + return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest); + }; let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { // FIXME: This should return EBADF, but there's no nice way to do that as there's no // corresponding ErrorKind variant. throw_unsup_format!("writing to the reading end of a pipe") }; - let mut writebuf = writebuf.borrow_mut(); - // Remember this clock so `read` can synchronize with us. - ecx.release_clock(|clock| { - writebuf.clock.join(clock); - }); - // Do full write / partial write based on the space available. - let actual_write_size = len.min(available_space); - let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; - writebuf.buf.extend(&bytes[..actual_write_size]); + let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); + + if available_space == 0 { + // Blocking socketpair with a full buffer. + let dest = dest.clone(); + self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); + ecx.block_thread( + BlockReason::UnnamedSocket, + None, + callback!( + @capture<'tcx> { + weak_self_ref: WeakFileDescriptionRef, + ptr: Pointer, + len: usize, + dest: MPlaceTy<'tcx>, + } + @unblock = |this| { + anonsocket_write(weak_self_ref, ptr, len, dest, this) + } + ), + ); + } else { + let mut writebuf = writebuf.borrow_mut(); + // Remember this clock so `read` can synchronize with us. + ecx.release_clock(|clock| { + writebuf.clock.join(clock); + }); + // Do full write / partial write based on the space available. + let actual_write_size = len.min(available_space); + let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; + writebuf.buf.extend(&bytes[..actual_write_size]); - // Need to stop accessing peer_fd so that it can be notified. - drop(writebuf); + // Need to stop accessing peer_fd so that it can be notified. + drop(writebuf); - // Notification should be provided for peer fd as it became readable. - // The kernel does this even if the fd was already readable before, so we follow suit. - ecx.check_and_update_readiness(peer_fd)?; + // Notification should be provided for peer fd as it became readable. + // The kernel does this even if the fd was already readable before, so we follow suit. + ecx.check_and_update_readiness(&peer_fd)?; + let peer_anonsocket = peer_fd.downcast::().unwrap(); + // Unblock all threads that are currently blocked on peer_fd's read. + let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut()); + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; + } - ecx.return_write_success(actual_write_size, dest) + return ecx.return_write_success(actual_write_size, &dest); + } + interp_ok(()) } /// Read from AnonSocket and return the number of bytes read. fn anonsocket_read<'tcx>( - anonsocket: &AnonSocket, - peer_fd: Option, + weak_self_ref: WeakFileDescriptionRef, len: usize, ptr: Pointer, - dest: &MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let mut bytes = vec![0; len]; + let Some(self_ref) = weak_self_ref.upgrade() else { + // FIXME: We should raise a deadlock error if the self_ref upgrade failed. + throw_unsup_format!("This will be a deadlock error in future") + }; + let self_anonsocket = self_ref.downcast::().unwrap(); - let Some(readbuf) = &anonsocket.readbuf else { + let Some(readbuf) = &self_anonsocket.readbuf else { // FIXME: This should return EBADF, but there's no nice way to do that as there's no // corresponding ErrorKind variant. throw_unsup_format!("reading from the write end of a pipe") }; - let mut readbuf = readbuf.borrow_mut(); - - // Synchronize with all previous writes to this buffer. - // FIXME: this over-synchronizes; a more precise approach would be to - // only sync with the writes whose data we will read. - ecx.acquire_clock(&readbuf.clock); - - // Do full read / partial read based on the space available. - // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. - let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); - - // Need to drop before others can access the readbuf again. - drop(readbuf); - - // A notification should be provided for the peer file description even when it can - // only write 1 byte. This implementation is not compliant with the actual Linux kernel - // implementation. For optimization reasons, the kernel will only mark the file description - // as "writable" when it can write more than a certain number of bytes. Since we - // don't know what that *certain number* is, we will provide a notification every time - // a read is successful. This might result in our epoll emulation providing more - // notifications than the real system. - if let Some(peer_fd) = peer_fd { - ecx.check_and_update_readiness(&peer_fd)?; - } - ecx.return_read_success(ptr, &bytes, actual_read_size, dest) + if readbuf.borrow_mut().buf.is_empty() { + if self_anonsocket.peer_fd().upgrade().is_none() { + // Socketpair with no peer and empty buffer. + // 0 bytes successfully read indicates end-of-file. + return ecx.return_read_success(ptr, &[], 0, &dest); + } else { + // Blocking socketpair with writer and empty buffer. + let weak_self_ref = weak_self_ref.clone(); + self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread()); + ecx.block_thread( + BlockReason::UnnamedSocket, + None, + callback!( + @capture<'tcx> { + weak_self_ref: WeakFileDescriptionRef, + len: usize, + ptr: Pointer, + dest: MPlaceTy<'tcx>, + } + @unblock = |this| { + anonsocket_read(weak_self_ref, len, ptr, dest, this) + } + ), + ); + } + } else { + let mut bytes = vec![0; len]; + let mut readbuf = readbuf.borrow_mut(); + // Synchronize with all previous writes to this buffer. + // FIXME: this over-synchronizes; a more precise approach would be to + // only sync with the writes whose data we will read. + ecx.acquire_clock(&readbuf.clock); + + // Do full read / partial read based on the space available. + // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. + let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); + + // Need to drop before others can access the readbuf again. + drop(readbuf); + + // A notification should be provided for the peer file description even when it can + // only write 1 byte. This implementation is not compliant with the actual Linux kernel + // implementation. For optimization reasons, the kernel will only mark the file description + // as "writable" when it can write more than a certain number of bytes. Since we + // don't know what that *certain number* is, we will provide a notification every time + // a read is successful. This might result in our epoll emulation providing more + // notifications than the real system. + if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() { + ecx.check_and_update_readiness(&peer_fd)?; + let peer_anonsocket = peer_fd.downcast::().unwrap(); + // Unblock all threads that are currently blocked on peer_fd's write. + let waiting_threads = + std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut()); + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; + } + }; + + return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest); + } + interp_ok(()) } impl UnixFileDescription for AnonSocket { @@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { readbuf: Some(RefCell::new(Buffer::new())), peer_fd: OnceCell::new(), peer_lost_data: Cell::new(false), + blocked_read_tid: RefCell::new(Vec::new()), + blocked_write_tid: RefCell::new(Vec::new()), is_nonblock: is_sock_nonblock, }); let fd1 = fds.new_ref(AnonSocket { readbuf: Some(RefCell::new(Buffer::new())), peer_fd: OnceCell::new(), peer_lost_data: Cell::new(false), + blocked_read_tid: RefCell::new(Vec::new()), + blocked_write_tid: RefCell::new(Vec::new()), is_nonblock: is_sock_nonblock, }); @@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { readbuf: Some(RefCell::new(Buffer::new())), peer_fd: OnceCell::new(), peer_lost_data: Cell::new(false), + blocked_read_tid: RefCell::new(Vec::new()), + blocked_write_tid: RefCell::new(Vec::new()), is_nonblock, }); let fd1 = fds.new_ref(AnonSocket { readbuf: None, peer_fd: OnceCell::new(), peer_lost_data: Cell::new(false), + blocked_read_tid: RefCell::new(Vec::new()), + blocked_write_tid: RefCell::new(Vec::new()), is_nonblock, }); diff --git a/tests/fail-dep/libc/socketpair_block_read_twice.rs b/tests/fail-dep/libc/socketpair_block_read_twice.rs new file mode 100644 index 0000000000..d3e4c43f2b --- /dev/null +++ b/tests/fail-dep/libc/socketpair_block_read_twice.rs @@ -0,0 +1,47 @@ +//@ignore-target: windows # No libc socketpair on Windows +//~^ERROR: deadlocked +//~^^ERROR: deadlocked +// test_race depends on a deterministic schedule. +//@compile-flags: -Zmiri-preemption-rate=0 +//@error-in-other-file: deadlock + +use std::thread; + +// Test the behaviour of a thread being blocked on read, get unblocked, then blocked again. + +// The expected execution is +// 1. Thread 1 blocks. +// 2. Thread 2 blocks. +// 3. Thread 3 unblocks both thread 1 and thread 2. +// 4. Thread 1 reads. +// 5. Thread 2's `read` can never complete -> deadlocked. + +fn main() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let thread1 = thread::spawn(move || { + // Let this thread block on read. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(&buf, "abc".as_bytes()); + }); + let thread2 = thread::spawn(move || { + // Let this thread block on read. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + //~^ERROR: deadlocked + assert_eq!(res, 3); + assert_eq!(&buf, "abc".as_bytes()); + }); + let thread3 = thread::spawn(move || { + // Unblock thread1 by writing something. + let data = "abc".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); +} diff --git a/tests/fail-dep/libc/socketpair_block_read_twice.stderr b/tests/fail-dep/libc/socketpair_block_read_twice.stderr new file mode 100644 index 0000000000..ab807a579d --- /dev/null +++ b/tests/fail-dep/libc/socketpair_block_read_twice.stderr @@ -0,0 +1,41 @@ +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/socketpair_block_read_twice.rs:LL:CC + | +LL | thread2.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/socketpair_block_read_twice.rs:LL:CC + | +LL | let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/socketpair_block_read_twice.rs:LL:CC + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 4 previous errors + diff --git a/tests/fail-dep/libc/socketpair_block_write_twice.rs b/tests/fail-dep/libc/socketpair_block_write_twice.rs new file mode 100644 index 0000000000..4f951acb2c --- /dev/null +++ b/tests/fail-dep/libc/socketpair_block_write_twice.rs @@ -0,0 +1,49 @@ +//@ignore-target: windows # No libc socketpair on Windows +//~^ERROR: deadlocked +//~^^ERROR: deadlocked +// test_race depends on a deterministic schedule. +//@compile-flags: -Zmiri-preemption-rate=0 +//@error-in-other-file: deadlock + +use std::thread; + +// Test the behaviour of a thread being blocked on write, get unblocked, then blocked again. + +// The expected execution is +// 1. Thread 1 blocks. +// 2. Thread 2 blocks. +// 3. Thread 3 unblocks both thread 1 and thread 2. +// 4. Thread 1 reads. +// 5. Thread 2's `write` can never complete -> deadlocked. +fn main() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let arr1: [u8; 212992] = [1; 212992]; + // Exhaust the space in the buffer so the subsequent write will block. + let res = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) }; + assert_eq!(res, 212992); + let thread1 = thread::spawn(move || { + let data = "abc".as_bytes().as_ptr(); + // The write below will be blocked because the buffer is already full. + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + }); + let thread2 = thread::spawn(move || { + let data = "abc".as_bytes().as_ptr(); + // The write below will be blocked because the buffer is already full. + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + //~^ERROR: deadlocked + assert_eq!(res, 3); + }); + let thread3 = thread::spawn(move || { + // Unblock thread1 by freeing up some space. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(buf, [1, 1, 1]); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); +} diff --git a/tests/fail-dep/libc/socketpair_block_write_twice.stderr b/tests/fail-dep/libc/socketpair_block_write_twice.stderr new file mode 100644 index 0000000000..44cda11102 --- /dev/null +++ b/tests/fail-dep/libc/socketpair_block_write_twice.stderr @@ -0,0 +1,41 @@ +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/socketpair_block_write_twice.rs:LL:CC + | +LL | thread2.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/socketpair_block_write_twice.rs:LL:CC + | +LL | let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/socketpair_block_write_twice.rs:LL:CC + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 4 previous errors + diff --git a/tests/fail-dep/libc/socketpair_read_blocking.rs b/tests/fail-dep/libc/socketpair_read_blocking.rs deleted file mode 100644 index ffa4e36f0f..0000000000 --- a/tests/fail-dep/libc/socketpair_read_blocking.rs +++ /dev/null @@ -1,12 +0,0 @@ -//@ignore-target: windows # no libc socketpair on Windows - -// This is temporarily here because blocking on fd is not supported yet. -// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair - -fn main() { - let mut fds = [-1, -1]; - let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; - // The read below will be blocked because the buffer is empty. - let mut buf: [u8; 3] = [0; 3]; - let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; //~ERROR: blocking isn't supported -} diff --git a/tests/fail-dep/libc/socketpair_read_blocking.stderr b/tests/fail-dep/libc/socketpair_read_blocking.stderr index 16892614c6..caf23da115 100644 --- a/tests/fail-dep/libc/socketpair_read_blocking.stderr +++ b/tests/fail-dep/libc/socketpair_read_blocking.stderr @@ -1,10 +1,9 @@ -error: unsupported operation: socketpair/pipe/pipe2 read: blocking isn't supported yet +error: deadlock: the evaluated program deadlocked --> tests/fail-dep/libc/socketpair_read_blocking.rs:LL:CC | LL | let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair/pipe/pipe2 read: blocking isn't supported yet + | ^ the evaluated program deadlocked | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support = note: BACKTRACE: = note: inside `main` at tests/fail-dep/libc/socketpair_read_blocking.rs:LL:CC diff --git a/tests/fail-dep/libc/socketpair_write_blocking.rs b/tests/fail-dep/libc/socketpair_write_blocking.rs deleted file mode 100644 index e83197dfc0..0000000000 --- a/tests/fail-dep/libc/socketpair_write_blocking.rs +++ /dev/null @@ -1,16 +0,0 @@ -//@ignore-target: windows # no libc socketpair on Windows -// This is temporarily here because blocking on fd is not supported yet. -// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair -fn main() { - let mut fds = [-1, -1]; - let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; - // Write size > buffer capacity - // Used up all the space in the buffer. - let arr1: [u8; 212992] = [1; 212992]; - let _ = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) }; - let data = "abc".as_bytes().as_ptr(); - // The write below will be blocked as the buffer is full. - let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; //~ERROR: blocking isn't supported - let mut buf: [u8; 3] = [0; 3]; - let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; -} diff --git a/tests/fail-dep/libc/socketpair_write_blocking.stderr b/tests/fail-dep/libc/socketpair_write_blocking.stderr index a2fcf87578..2dc420d5f1 100644 --- a/tests/fail-dep/libc/socketpair_write_blocking.stderr +++ b/tests/fail-dep/libc/socketpair_write_blocking.stderr @@ -1,10 +1,9 @@ -error: unsupported operation: socketpair/pipe/pipe2 write: blocking isn't supported yet +error: deadlock: the evaluated program deadlocked --> tests/fail-dep/libc/socketpair_write_blocking.rs:LL:CC | LL | let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair/pipe/pipe2 write: blocking isn't supported yet + | ^ the evaluated program deadlocked | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support = note: BACKTRACE: = note: inside `main` at tests/fail-dep/libc/socketpair_write_blocking.rs:LL:CC diff --git a/tests/pass-dep/libc/libc-socketpair.rs b/tests/pass-dep/libc/libc-socketpair.rs index 64819e5767..bbf0e21595 100644 --- a/tests/pass-dep/libc/libc-socketpair.rs +++ b/tests/pass-dep/libc/libc-socketpair.rs @@ -10,6 +10,8 @@ fn main() { test_socketpair(); test_socketpair_threaded(); test_race(); + test_blocking_read(); + test_blocking_write(); } fn test_socketpair() { @@ -136,3 +138,51 @@ fn test_race() { thread::yield_now(); thread1.join().unwrap(); } + +// Test the behaviour of a socketpair getting blocked on read and subsequently unblocked. +fn test_blocking_read() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let thread1 = thread::spawn(move || { + // Let this thread block on read. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(&buf, "abc".as_bytes()); + }); + let thread2 = thread::spawn(move || { + // Unblock thread1 by doing writing something. + let data = "abc".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); +} + +// Test the behaviour of a socketpair getting blocked on write and subsequently unblocked. +fn test_blocking_write() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let arr1: [u8; 212992] = [1; 212992]; + // Exhaust the space in the buffer so the subsequent write will block. + let res = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) }; + assert_eq!(res, 212992); + let thread1 = thread::spawn(move || { + let data = "abc".as_bytes().as_ptr(); + // The write below will be blocked because the buffer is already full. + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + }); + let thread2 = thread::spawn(move || { + // Unblock thread1 by freeing up some space. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(buf, [1, 1, 1]); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); +}