Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AnonSocket::read/write for blocking socketpair #4037

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 80 additions & 44 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
let mut readbuf = readbuf.borrow_mut();
if readbuf.buf.is_empty() {
if readbuf.borrow().buf.is_empty() {
if self.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
Expand All @@ -167,31 +166,8 @@ impl FileDescription for AnonSocket {
}
}
}

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(&mut bytes).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = self.peer_fd().upgrade() {
ecx.check_and_update_readiness(&peer_fd)?;
}

ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
anonsocket_read(self, self.peer_fd().upgrade(), &mut bytes, ptr, dest, ecx)
Comment on lines +169 to +170
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not entirely sure if closing peer_fd when it is blocking should be allowed. I currently prefer to leave the TODO here and revisit it again when implementing blocking (and write a testcase for this).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that closing one side wakes up the other side, and makes it return an EOF read.

I'm a bit surprised that all that if readbuf.borrow().buf.is_empty() { stuff is not insider the helper, but 🤷 I guess it can be moved later if it turns out to be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'm a bit surprised that all that if readbuf.borrow().buf.is_empty() { stuff is not insider the helper,

Ah right I should've moved them inside helper, I guess I will just separate this into another commit when I implement the blocking operation.

}

fn write<'tcx>(
Expand Down Expand Up @@ -221,9 +197,8 @@ impl FileDescription for AnonSocket {
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe");
};
let mut writebuf = writebuf.borrow_mut();
let data_size = writebuf.buf.len();
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
Expand All @@ -233,24 +208,85 @@ impl FileDescription for AnonSocket {
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
}
}
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
}
}

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);
/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
available_space: usize,
peer_fd: &FileDescriptionRef,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
};
let mut writebuf = writebuf.borrow_mut();

// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
writebuf.clock.join(clock);
});
// Do full write / partial write based on the space available.
let actual_write_size = len.min(available_space);
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
writebuf.buf.extend(&bytes[..actual_write_size]);

// Need to stop accessing peer_fd so that it can be notified.
drop(writebuf);

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
ecx.check_and_update_readiness(peer_fd)?;

ecx.return_write_success(actual_write_size, dest)
}

// Notification should be provided for peer fd as it became readable.
// The kernel does this even if the fd was already readable before, so we follow suit.
/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
tiif marked this conversation as resolved.
Show resolved Hide resolved
anonsocket: &AnonSocket,
peer_fd: Option<FileDescriptionRef>,
bytes: &mut [u8],
ptr: Pointer,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(readbuf) = &anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe")
};
let mut readbuf = readbuf.borrow_mut();

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);

// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(bytes).unwrap();

// Need to drop before others can access the readbuf again.
drop(readbuf);

// A notification should be provided for the peer file description even when it can
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
// implementation. For optimization reasons, the kernel will only mark the file description
// as "writable" when it can write more than a certain number of bytes. Since we
// don't know what that *certain number* is, we will provide a notification every time
// a read is successful. This might result in our epoll emulation providing more
// notifications than the real system.
if let Some(peer_fd) = peer_fd {
ecx.check_and_update_readiness(&peer_fd)?;

ecx.return_write_success(actual_write_size, dest)
}

ecx.return_read_success(ptr, bytes, actual_read_size, dest)
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
Expand Down