Skip to content

Commit

Permalink
Fix error caused by rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
tiif committed Dec 6, 2024
1 parent c689ee8 commit 5c1f3f8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 218 deletions.
6 changes: 0 additions & 6 deletions src/provenance_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ impl<T: VisitProvenance> VisitProvenance for std::cell::RefCell<T> {
}
}

impl<T: VisitProvenance> VisitProvenance for std::vec::Vec<T> {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// TODO: this is just a temporary change to get Vec<u8> to work in unblock_thread, might remove later.
}
}

impl VisitProvenance for BorTag {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
visit(None, Some(*self))
Expand Down
240 changes: 28 additions & 212 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl FileDescription for AnonSocket {

fn read<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
Expand Down Expand Up @@ -114,14 +114,32 @@ impl FileDescription for AnonSocket {
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// TODO: move this to helper
// Blocking socketpair with writer and empty buffer.
// FIXME: blocking is currently not supported
throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet");
let peer_fd = self.peer_fd().clone();
let dest = dest.clone();
let weak_self_ref = self_ref.downgrade();
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
peer_fd: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
anonsocket_read(weak_self_ref, peer_fd, len, ptr, &dest, this)
}
),
);
}
}
}
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx)
interp_ok(())
}

fn write<'tcx>(
Expand Down Expand Up @@ -207,15 +225,16 @@ fn anonsocket_write<'tcx>(

/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
anonsocket: &AnonSocket,
peer_fd: Option<FileDescriptionRef>,
self_ref: WeakFileDescriptionRef,
peer_fd: WeakFileDescriptionRef,
len: usize,
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];

let self_ref = self_ref.upgrade().unwrap(); // TODO: handle this later
let anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
let Some(readbuf) = &anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
Expand All @@ -242,6 +261,7 @@ fn anonsocket_read<'tcx>(
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
let peer_fd = peer_fd.upgrade(); // TODO: handle unwrap
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;
}
Expand Down Expand Up @@ -295,211 +315,7 @@ impl UnixFileDescription for AnonSocket {
}
interp_ok(epoll_ready_events)
}

fn close<'tcx>(
self: Box<Self>,
_communicate_allowed: bool,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, io::Result<()>> {
if let Some(peer_fd) = self.peer_fd().upgrade() {
// If the current readbuf is non-empty when the file description is closed,
// notify the peer that data lost has happened in current file description.
if let Some(readbuf) = &self.readbuf {
if !readbuf.borrow().buf.is_empty() {
peer_fd.downcast::<AnonSocket>().unwrap().peer_lost_data.set(true);
}
}
// Notify peer fd that close has happened, since that can unblock reads and writes.
ecx.check_and_update_readiness(&peer_fd)?;
}
interp_ok(Ok(()))
}

fn read<'tcx>(
&self,
self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];

// Always succeed on read size 0.
if len == 0 {
return ecx.return_read_success(ptr, &bytes, 0, dest);
}

let Some(readbuf) = &self.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
if readbuf.borrow().buf.is_empty() {
if self.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &bytes, 0, dest);
} else {
if self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// TODO: move this to helper
// Blocking socketpair with writer and empty buffer.
let peer_fd = self.peer_fd().clone();
let dest = dest.clone();
let weak_self_ref = self_ref.downgrade();
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
peer_fd: WeakFileDescriptionRef,
bytes: Vec<u8>,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
anonsocket_read(weak_self_ref, peer_fd, &mut bytes[..], ptr, &dest, this)
}
),
);
}
}
}
interp_ok(())
}

fn write<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if len == 0 {
return ecx.return_write_success(0, dest);
}

// We are writing to our peer's readbuf.
let Some(peer_fd) = self.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
};

let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe");
};
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with a full buffer.
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
}
}
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
}
}

/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
available_space: usize,
peer_fd: &FileDescriptionRef,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
};
let mut writebuf = writebuf.borrow_mut();

// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(peer_fd)?;

ecx.return_write_success(actual_write_size, dest)
}

/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
self_ref: WeakFileDescriptionRef,
peer_fd: WeakFileDescriptionRef,
bytes: &mut [u8],
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let self_ref = self_ref.upgrade().unwrap(); // TODO: handle this later
let anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
let Some(readbuf) = &anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe")
};
let mut readbuf = readbuf.borrow_mut();

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(bytes).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
let peer_fd = peer_fd.upgrade(); // TODO: handle unwrap
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;
}

ecx.return_read_success(ptr, bytes, actual_read_size, dest)
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// For more information on the arguments see the socketpair manpage:
Expand Down

0 comments on commit 5c1f3f8

Please sign in to comment.