diff --git a/src/provenance_gc.rs b/src/provenance_gc.rs index 1f4539b5de..b3d715db9c 100644 --- a/src/provenance_gc.rs +++ b/src/provenance_gc.rs @@ -46,12 +46,6 @@ impl VisitProvenance for std::cell::RefCell { } } -impl VisitProvenance for std::vec::Vec { - fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { - // TODO: this is just a temporary change to get Vec to work in unblock_thread, might remove later. - } -} - impl VisitProvenance for BorTag { fn visit_provenance(&self, visit: &mut VisitWith<'_>) { visit(None, Some(*self)) diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 913c938e92..f09f3130c3 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -83,7 +83,7 @@ impl FileDescription for AnonSocket { fn read<'tcx>( &self, - _self_ref: &FileDescriptionRef, + self_ref: &FileDescriptionRef, _communicate_allowed: bool, ptr: Pointer, len: usize, @@ -114,14 +114,32 @@ impl FileDescription for AnonSocket { // Since there is no ErrorKind for EAGAIN, WouldBlock is used. return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); } else { + // TODO: move this to helper // Blocking socketpair with writer and empty buffer. - // FIXME: blocking is currently not supported - throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet"); + let peer_fd = self.peer_fd().clone(); + let dest = dest.clone(); + let weak_self_ref = self_ref.downgrade(); + ecx.block_thread( + BlockReason::UnnamedSocket, + None, + callback!( + @capture<'tcx> { + weak_self_ref: WeakFileDescriptionRef, + peer_fd: WeakFileDescriptionRef, + len: usize, + ptr: Pointer, + dest: MPlaceTy<'tcx>, + } + @unblock = |this| { + // TODO: We might need to decide what to do if peer_fd is closed when read is blocked. + anonsocket_read(weak_self_ref, peer_fd, len, ptr, &dest, this) + } + ), + ); } } } - // TODO: We might need to decide what to do if peer_fd is closed when read is blocked. - anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx) + interp_ok(()) } fn write<'tcx>( @@ -207,15 +225,16 @@ fn anonsocket_write<'tcx>( /// Read from AnonSocket and return the number of bytes read. fn anonsocket_read<'tcx>( - anonsocket: &AnonSocket, - peer_fd: Option, + self_ref: WeakFileDescriptionRef, + peer_fd: WeakFileDescriptionRef, len: usize, ptr: Pointer, dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { let mut bytes = vec![0; len]; - + let self_ref = self_ref.upgrade().unwrap(); // TODO: handle this later + let anonsocket = self_ref.downcast::().unwrap(); let Some(readbuf) = &anonsocket.readbuf else { // FIXME: This should return EBADF, but there's no nice way to do that as there's no // corresponding ErrorKind variant. @@ -242,6 +261,7 @@ fn anonsocket_read<'tcx>( // don't know what that *certain number* is, we will provide a notification every time // a read is successful. This might result in our epoll emulation providing more // notifications than the real system. + let peer_fd = peer_fd.upgrade(); // TODO: handle unwrap if let Some(peer_fd) = peer_fd { ecx.check_and_update_readiness(&peer_fd)?; } @@ -295,211 +315,7 @@ impl UnixFileDescription for AnonSocket { } interp_ok(epoll_ready_events) } - - fn close<'tcx>( - self: Box, - _communicate_allowed: bool, - ecx: &mut MiriInterpCx<'tcx>, - ) -> InterpResult<'tcx, io::Result<()>> { - if let Some(peer_fd) = self.peer_fd().upgrade() { - // If the current readbuf is non-empty when the file description is closed, - // notify the peer that data lost has happened in current file description. - if let Some(readbuf) = &self.readbuf { - if !readbuf.borrow().buf.is_empty() { - peer_fd.downcast::().unwrap().peer_lost_data.set(true); - } - } - // Notify peer fd that close has happened, since that can unblock reads and writes. - ecx.check_and_update_readiness(&peer_fd)?; - } - interp_ok(Ok(())) - } - - fn read<'tcx>( - &self, - self_ref: &FileDescriptionRef, - _communicate_allowed: bool, - ptr: Pointer, - len: usize, - dest: &MPlaceTy<'tcx>, - ecx: &mut MiriInterpCx<'tcx>, - ) -> InterpResult<'tcx> { - let mut bytes = vec![0; len]; - - // Always succeed on read size 0. - if len == 0 { - return ecx.return_read_success(ptr, &bytes, 0, dest); - } - - let Some(readbuf) = &self.readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("reading from the write end of a pipe"); - }; - if readbuf.borrow().buf.is_empty() { - if self.peer_fd().upgrade().is_none() { - // Socketpair with no peer and empty buffer. - // 0 bytes successfully read indicates end-of-file. - return ecx.return_read_success(ptr, &bytes, 0, dest); - } else { - if self.is_nonblock { - // Non-blocking socketpair with writer and empty buffer. - // https://linux.die.net/man/2/read - // EAGAIN or EWOULDBLOCK can be returned for socket, - // POSIX.1-2001 allows either error to be returned for this case. - // Since there is no ErrorKind for EAGAIN, WouldBlock is used. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - // TODO: move this to helper - // Blocking socketpair with writer and empty buffer. - let peer_fd = self.peer_fd().clone(); - let dest = dest.clone(); - let weak_self_ref = self_ref.downgrade(); - ecx.block_thread( - BlockReason::UnnamedSocket, - None, - callback!( - @capture<'tcx> { - weak_self_ref: WeakFileDescriptionRef, - peer_fd: WeakFileDescriptionRef, - bytes: Vec, - ptr: Pointer, - dest: MPlaceTy<'tcx>, - } - @unblock = |this| { - // TODO: We might need to decide what to do if peer_fd is closed when read is blocked. - anonsocket_read(weak_self_ref, peer_fd, &mut bytes[..], ptr, &dest, this) - } - ), - ); - } - } - } - interp_ok(()) - } - - fn write<'tcx>( - &self, - _self_ref: &FileDescriptionRef, - _communicate_allowed: bool, - ptr: Pointer, - len: usize, - dest: &MPlaceTy<'tcx>, - ecx: &mut MiriInterpCx<'tcx>, - ) -> InterpResult<'tcx> { - // Always succeed on write size 0. - // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") - if len == 0 { - return ecx.return_write_success(0, dest); - } - - // We are writing to our peer's readbuf. - let Some(peer_fd) = self.peer_fd().upgrade() else { - // If the upgrade from Weak to Rc fails, it indicates that all read ends have been - // closed. - return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest); - }; - - let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("writing to the reading end of a pipe"); - }; - let available_space = - MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); - if available_space == 0 { - if self.is_nonblock { - // Non-blocking socketpair with a full buffer. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - // Blocking socketpair with a full buffer. - throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet"); - } - } - anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx) - } -} - -/// Write to AnonSocket based on the space available and return the written byte size. -fn anonsocket_write<'tcx>( - available_space: usize, - peer_fd: &FileDescriptionRef, - ptr: Pointer, - len: usize, - dest: &MPlaceTy<'tcx>, - ecx: &mut MiriInterpCx<'tcx>, -) -> InterpResult<'tcx> { - let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("writing to the reading end of a pipe") - }; - let mut writebuf = writebuf.borrow_mut(); - - // Remember this clock so `read` can synchronize with us. - ecx.release_clock(|clock| { - writebuf.clock.join(clock); - }); - // Do full write / partial write based on the space available. - let actual_write_size = len.min(available_space); - let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?; - writebuf.buf.extend(&bytes[..actual_write_size]); - - // Need to stop accessing peer_fd so that it can be notified. - drop(writebuf); - - // Notification should be provided for peer fd as it became readable. - // The kernel does this even if the fd was already readable before, so we follow suit. - ecx.check_and_update_readiness(peer_fd)?; - - ecx.return_write_success(actual_write_size, dest) -} - -/// Read from AnonSocket and return the number of bytes read. -fn anonsocket_read<'tcx>( - self_ref: WeakFileDescriptionRef, - peer_fd: WeakFileDescriptionRef, - bytes: &mut [u8], - ptr: Pointer, - dest: &MPlaceTy<'tcx>, - ecx: &mut MiriInterpCx<'tcx>, -) -> InterpResult<'tcx> { - let self_ref = self_ref.upgrade().unwrap(); // TODO: handle this later - let anonsocket = self_ref.downcast::().unwrap(); - let Some(readbuf) = &anonsocket.readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("reading from the write end of a pipe") - }; - let mut readbuf = readbuf.borrow_mut(); - - // Synchronize with all previous writes to this buffer. - // FIXME: this over-synchronizes; a more precise approach would be to - // only sync with the writes whose data we will read. - ecx.acquire_clock(&readbuf.clock); - - // Do full read / partial read based on the space available. - // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. - let actual_read_size = readbuf.buf.read(bytes).unwrap(); - - // Need to drop before others can access the readbuf again. - drop(readbuf); - - // A notification should be provided for the peer file description even when it can - // only write 1 byte. This implementation is not compliant with the actual Linux kernel - // implementation. For optimization reasons, the kernel will only mark the file description - // as "writable" when it can write more than a certain number of bytes. Since we - // don't know what that *certain number* is, we will provide a notification every time - // a read is successful. This might result in our epoll emulation providing more - // notifications than the real system. - let peer_fd = peer_fd.upgrade(); // TODO: handle unwrap - if let Some(peer_fd) = peer_fd { - ecx.check_and_update_readiness(&peer_fd)?; - } - - ecx.return_read_success(ptr, bytes, actual_read_size, dest) } - impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// For more information on the arguments see the socketpair manpage: