diff --git a/Cargo.lock b/Cargo.lock index f7af3936a355c3..0aff8af65fa8a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1768,6 +1768,7 @@ dependencies = [ "libz-sys", "md-5", "md4", + "memchr", "node_resolver", "num-bigint", "num-bigint-dig", @@ -4150,9 +4151,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.2" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" diff --git a/cli/args/mod.rs b/cli/args/mod.rs index ea79aaa464e6a5..3f6c0177b38af2 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -1050,10 +1050,10 @@ impl CliOptions { } pub fn node_ipc_fd(&self) -> Option { - let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); + let maybe_node_channel_fd = std::env::var("NODE_CHANNEL_FD").ok(); if let Some(node_channel_fd) = maybe_node_channel_fd { // Remove so that child processes don't inherit this environment variable. - std::env::remove_var("DENO_CHANNEL_FD"); + std::env::remove_var("NODE_CHANNEL_FD"); node_channel_fd.parse::().ok() } else { None diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 00afb64eb83935..5c410f09007a8e 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -55,6 +55,7 @@ libc.workspace = true libz-sys.workspace = true md-5 = { version = "0.10.5", features = ["oid"] } md4 = "0.10.2" +memchr = "2.7.4" node_resolver.workspace = true num-bigint.workspace = true num-bigint-dig = "0.8.2" diff --git a/ext/node/benchmarks/child_process_ipc.mjs b/ext/node/benchmarks/child_process_ipc.mjs index 0486972dc3b7b2..39377cd8c2a64a 100644 --- a/ext/node/benchmarks/child_process_ipc.mjs +++ b/ext/node/benchmarks/child_process_ipc.mjs @@ -5,10 +5,20 @@ import { setImmediate } from "node:timers"; if (process.env.CHILD) { const len = +process.env.CHILD; const msg = ".".repeat(len); + let waiting = false; const send = () => { - while (process.send(msg)); + while ( + process.send(msg, undefined, undefined, (_e) => { + if (waiting) { + waiting = false; + setImmediate(send); + } + }) + ); // Wait: backlog of unsent messages exceeds threshold - setImmediate(send); + // once the message is sent, the callback will be called + // and we'll resume + waiting = true; }; send(); } else { diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 2c86505779a40c..de57ff6bfc35c9 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -30,6 +30,7 @@ pub use deno_package_json::PackageJson; pub use node_resolver::PathClean; pub use ops::ipc::ChildPipeFd; pub use ops::ipc::IpcJsonStreamResource; +pub use ops::ipc::IpcRefTracker; use ops::vm; pub use ops::vm::create_v8_context; pub use ops::vm::init_global_template; @@ -378,6 +379,8 @@ deno_core::extension!(deno_node, ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_ipc_write, ops::ipc::op_node_ipc_read, + ops::ipc::op_node_ipc_ref, + ops::ipc::op_node_ipc_unref, ops::process::op_node_process_kill, ops::process::op_process_abort, ], diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index dc0c086c1230c6..4849a5c6cf2817 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -15,23 +15,33 @@ mod impl_ { use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::AtomicUsize; + use std::task::ready; use std::task::Context; use std::task::Poll; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; use 
deno_core::op2;
+  use deno_core::serde;
+  use deno_core::serde::Serializer;
   use deno_core::serde_json;
+  use deno_core::v8;
   use deno_core::AsyncRefCell;
   use deno_core::CancelFuture;
   use deno_core::CancelHandle;
+  use deno_core::ExternalOpsTracker;
   use deno_core::OpState;
   use deno_core::RcRef;
   use deno_core::ResourceId;
+  use deno_core::ToV8;
+  use memchr::memchr;
   use pin_project_lite::pin_project;
-  use tokio::io::AsyncBufRead;
+  use serde::Serialize;
+  use tokio::io::AsyncRead;
   use tokio::io::AsyncWriteExt;
-  use tokio::io::BufReader;
+  use tokio::io::ReadBuf;
 
   #[cfg(unix)]
   use tokio::net::unix::OwnedReadHalf;
@@ -43,6 +53,116 @@ mod impl_ {
   #[cfg(windows)]
   type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
 
+  /// Wrapper around a v8 value that implements Serialize.
+  struct SerializeWrapper<'a, 'b>(
+    RefCell<&'b mut v8::HandleScope<'a>>,
+    v8::Local<'a, v8::Value>,
+  );
+
+  impl<'a, 'b> Serialize for SerializeWrapper<'a, 'b> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+      S: Serializer,
+    {
+      serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
+    }
+  }
+
+  /// Serialize a v8 value directly into a serde serializer.
+  /// This allows us to go from v8 values to JSON without having to
+  /// deserialize into a `serde_json::Value` and then reserialize to JSON.
+  fn serialize_v8_value<'a, S: Serializer>(
+    scope: &mut v8::HandleScope<'a>,
+    value: v8::Local<'a, v8::Value>,
+    ser: S,
+  ) -> Result<S::Ok, S::Error> {
+    use serde::ser::Error;
+    if value.is_null_or_undefined() {
+      ser.serialize_unit()
+    } else if value.is_number() || value.is_number_object() {
+      let num_value = value.number_value(scope).unwrap();
+      if (num_value as i64 as f64) == num_value {
+        ser.serialize_i64(num_value as i64)
+      } else {
+        ser.serialize_f64(num_value)
+      }
+    } else if value.is_string() {
+      let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
+      ser.serialize_str(&str)
+    } else if value.is_string_object() {
+      let str =
+        deno_core::serde_v8::to_utf8(value.to_string(scope).unwrap(), scope);
+      ser.serialize_str(&str)
+    } else if value.is_boolean() {
+      ser.serialize_bool(value.is_true())
+    } else if value.is_boolean_object() {
+      ser.serialize_bool(value.boolean_value(scope))
+    } else if value.is_array() {
+      use serde::ser::SerializeSeq;
+      let array = v8::Local::<v8::Array>::try_from(value).unwrap();
+      let length = array.length();
+      let mut seq = ser.serialize_seq(Some(length as usize))?;
+      for i in 0..length {
+        let element = array.get_index(scope, i).unwrap();
+        seq
+          .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
+      }
+      seq.end()
+    } else if value.is_object() {
+      use serde::ser::SerializeMap;
+      if value.is_array_buffer_view() {
+        let buffer = v8::Local::<v8::ArrayBufferView>::try_from(value).unwrap();
+        let mut buf = vec![0u8; buffer.byte_length()];
+        let copied = buffer.copy_contents(&mut buf);
+        assert_eq!(copied, buf.len());
+        return ser.serialize_bytes(&buf);
+      }
+      let object = value.to_object(scope).unwrap();
+      // node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects)
+      // we need to respect the `toJSON` method if it exists.
+      let to_json_key = v8::String::new_from_utf8(
+        scope,
+        b"toJSON",
+        v8::NewStringType::Internalized,
+      )
+      .unwrap()
+      .into();
+      if let Some(to_json) = object.get(scope, to_json_key) {
+        if to_json.is_function() {
+          let to_json = v8::Local::<v8::Function>::try_from(to_json).unwrap();
+          let json_value = to_json.call(scope, object.into(), &[]).unwrap();
+          return serialize_v8_value(scope, json_value, ser);
+        }
+      }
+
+      let keys = object
+        .get_own_property_names(
+          scope,
+          v8::GetPropertyNamesArgs {
+            ..Default::default()
+          },
+        )
+        .unwrap();
+      let num_keys = keys.length();
+      let mut map = ser.serialize_map(Some(num_keys as usize))?;
+      for i in 0..num_keys {
+        let key = keys.get_index(scope, i).unwrap();
+        let key_str = key.to_rust_string_lossy(scope);
+        let value = object.get(scope, key).unwrap();
+        map.serialize_entry(
+          &key_str,
+          &SerializeWrapper(RefCell::new(scope), value),
+        )?;
+      }
+      map.end()
+    } else {
+      // TODO(nathanwhit): better error message
+      Err(S::Error::custom(deno_core::error::type_error(
+        "Unsupported type",
+      )))
+    }
+  }
+
   // Open IPC pipe from bootstrap options.
   #[op2]
   #[smi]
@@ -53,25 +173,66 @@ mod impl_ {
       Some(child_pipe_fd) => child_pipe_fd.0,
       None => return Ok(None),
     };
-
+    let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
     Ok(Some(
-      state.resource_table.add(IpcJsonStreamResource::new(fd)?),
+      state
+        .resource_table
+        .add(IpcJsonStreamResource::new(fd, ref_tracker)?),
    ))
  }

  #[op2(async)]
-  pub async fn op_node_ipc_write(
+  pub fn op_node_ipc_write<'a>(
+    scope: &mut v8::HandleScope<'a>,
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
-    #[serde] value: serde_json::Value,
-  ) -> Result<(), AnyError> {
+    value: v8::Local<'a, v8::Value>,
+    // using an array as an "out parameter".
+    // index 0 is a boolean indicating whether the queue is under the limit.
+    //
+    // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
+    // supported by `op2` currently.
+    queue_ok: v8::Local<'a, v8::Array>,
+  ) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
+    let mut serialized = Vec::with_capacity(64);
+    let mut ser = serde_json::Serializer::new(&mut serialized);
+    serialize_v8_value(scope, value, &mut ser).map_err(|e| {
+      deno_core::error::type_error(format!(
+        "failed to serialize json value: {e}"
+      ))
+    })?;
+    serialized.push(b'\n');
+
    let stream = state
      .borrow()
      .resource_table
      .get::<IpcJsonStreamResource>(rid)
      .map_err(|_| bad_resource_id())?;
-    stream.write_msg(value).await?;
-    Ok(())
+    let old = stream
+      .queued_bytes
+      .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
+    if old + serialized.len() > 2 * INITIAL_CAPACITY {
+      // sending messages too fast
+      let v = false.to_v8(scope)?;
+      queue_ok.set_index(scope, 0, v);
+    }
+    Ok(async move {
+      stream.clone().write_msg_bytes(&serialized).await?;
+      stream
+        .queued_bytes
+        .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
+      Ok(())
+    })
+  }
+
+  /// Value signaling that the other end of the IPC channel has closed.
+  ///
+  /// Node reserves objects of this form (`{ "cmd": "NODE_"`)
+  /// for internal use, so we use it here as well to avoid breaking anyone.
+  fn stop_sentinel() -> serde_json::Value {
+    serde_json::json!({
+      "cmd": "NODE_CLOSE"
+    })
  }

  #[op2(async)]
@@ -89,7 +250,92 @@ mod impl_ {
    let cancel = stream.cancel.clone();
    let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
    let msgs = stream.read_msg().or_cancel(cancel).await??;
-    Ok(msgs)
+    if let Some(msg) = msgs {
+      Ok(msg)
+    } else {
+      Ok(stop_sentinel())
+    }
+  }
+
+  #[op2(fast)]
+  pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
+    let stream = state
+      .resource_table
+      .get::<IpcJsonStreamResource>(rid)
+      .expect("Invalid resource ID");
+    stream.ref_tracker.ref_();
+  }
+
+  #[op2(fast)]
+  pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
+    let stream = state
+      .resource_table
+      .get::<IpcJsonStreamResource>(rid)
+      .expect("Invalid resource ID");
+    stream.ref_tracker.unref();
+  }
+
+  /// Tracks whether the IPC resource is currently
+  /// refed, and allows refing/unrefing it.
+  pub struct IpcRefTracker {
+    refed: AtomicBool,
+    tracker: OpsTracker,
+  }
+
+  /// A little wrapper so we don't have to get an
+  /// `ExternalOpsTracker` for tests. When we aren't
+  /// cfg(test), this will get optimized out.
+  enum OpsTracker {
+    External(ExternalOpsTracker),
+    #[cfg(test)]
+    Test,
+  }
+
+  impl OpsTracker {
+    fn ref_(&self) {
+      match self {
+        Self::External(tracker) => tracker.ref_op(),
+        #[cfg(test)]
+        Self::Test => {}
+      }
+    }
+
+    fn unref(&self) {
+      match self {
+        Self::External(tracker) => tracker.unref_op(),
+        #[cfg(test)]
+        Self::Test => {}
+      }
+    }
+  }
+
+  impl IpcRefTracker {
+    pub fn new(tracker: ExternalOpsTracker) -> Self {
+      Self {
+        refed: AtomicBool::new(false),
+        tracker: OpsTracker::External(tracker),
+      }
+    }
+
+    #[cfg(test)]
+    fn new_test() -> Self {
+      Self {
+        refed: AtomicBool::new(false),
+        tracker: OpsTracker::Test,
+      }
+    }
+
+    fn ref_(&self) {
+      if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) {
+        self.tracker.ref_();
+      }
+    }
+
+    fn unref(&self) {
+      if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) {
+        self.tracker.unref();
+      }
+    }
  }

  pub struct IpcJsonStreamResource {
@@ -99,6 +345,8 @@ mod impl_ {
    #[cfg(windows)]
    write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
    cancel: Rc<CancelHandle>,
+    queued_bytes: AtomicUsize,
+    ref_tracker: IpcRefTracker,
  }

  impl deno_core::Resource for IpcJsonStreamResource {
@@ -134,64 +382,56 @@ mod impl_ {
  }

  impl IpcJsonStreamResource {
-    pub fn new(stream: i64) -> Result<Self, AnyError> {
+    pub fn new(
+      stream: i64,
+      ref_tracker: IpcRefTracker,
+    ) -> Result<Self, AnyError> {
      let (read_half, write_half) = pipe(stream as _)?;
      Ok(Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
        write_half: AsyncRefCell::new(write_half),
        cancel: Default::default(),
+        queued_bytes: Default::default(),
+        ref_tracker,
      })
    }

-    #[cfg(unix)]
-    #[cfg(test)]
-    fn from_stream(stream: UnixStream) -> Self {
+    #[cfg(all(unix, test))]
+    fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
      let (read_half, write_half) = stream.into_split();
      Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
        write_half: AsyncRefCell::new(write_half),
        cancel: Default::default(),
+        queued_bytes: Default::default(),
+        ref_tracker,
      }
    }

-    #[cfg(windows)]
-    #[cfg(test)]
-    fn from_stream(pipe: NamedPipeClient) -> Self {
+    #[cfg(all(windows, test))]
+    fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
      let (read_half, write_half) = tokio::io::split(pipe);
      Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
        write_half: AsyncRefCell::new(write_half),
        cancel: Default::default(),
+        queued_bytes: Default::default(),
+        ref_tracker,
      }
    }
-    async fn write_msg(
+    /// Writes a newline-terminated JSON message to the IPC pipe.
+    async fn write_msg_bytes(
      self: Rc<Self>,
-      msg: serde_json::Value,
+      msg: &[u8],
    ) -> Result<(), AnyError> {
      let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
-      // Perf note: We do not benefit from writev here because
-      // we are always allocating a buffer for serialization anyways.
-      let mut buf = Vec::new();
-      serde_json::to_writer(&mut buf, &msg)?;
-      buf.push(b'\n');
-      write_half.write_all(&buf).await?;
+      write_half.write_all(msg).await?;
      Ok(())
    }
  }

-  #[inline]
-  fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
-    #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
-    // Safety: haystack of valid length. neon_memchr can handle unaligned
-    // data.
-    return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) };
-
-    #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
-    return haystack.iter().position(|&b| b == needle);
-  }
-
  // Initial capacity of the buffered reader and the JSON backing buffer.
  //
  // This is a tradeoff between memory usage and performance on large messages.
@@ -199,41 +439,91 @@ mod impl_ {
  // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
  const INITIAL_CAPACITY: usize = 1024 * 64;

+  /// A buffer for reading from the IPC pipe.
+  /// Similar to the internal buffer of `tokio::io::BufReader`.
+  ///
+  /// This exists to provide buffered reading while granting mutable access
+  /// to the internal buffer (which isn't exposed through `tokio::io::BufReader`
+  /// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input
+  /// buffer for parsing, so this allows us to use the read buffer directly as the
+  /// input buffer without a copy (provided the message fits).
+  struct ReadBuffer {
+    buffer: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+  }
+
+  impl ReadBuffer {
+    fn new() -> Self {
+      Self {
+        buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(),
+        pos: 0,
+        cap: 0,
+      }
+    }
+
+    fn get_mut(&mut self) -> &mut [u8] {
+      &mut self.buffer
+    }
+
+    fn available_mut(&mut self) -> &mut [u8] {
+      &mut self.buffer[self.pos..self.cap]
+    }
+
+    fn consume(&mut self, n: usize) {
+      self.pos = std::cmp::min(self.pos + n, self.cap);
+    }
+
+    fn needs_fill(&self) -> bool {
+      self.pos >= self.cap
+    }
+  }
+
  // JSON serialization stream over IPC pipe.
  //
  // `\n` is used as a delimiter between messages.
  struct IpcJsonStream {
    #[cfg(unix)]
-    pipe: BufReader<OwnedReadHalf>,
+    pipe: OwnedReadHalf,
    #[cfg(windows)]
-    pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
+    pipe: tokio::io::ReadHalf<NamedPipeClient>,
    buffer: Vec<u8>,
+    read_buffer: ReadBuffer,
  }

  impl IpcJsonStream {
    #[cfg(unix)]
    fn new(pipe: OwnedReadHalf) -> Self {
      Self {
-        pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+        pipe,
        buffer: Vec::with_capacity(INITIAL_CAPACITY),
+        read_buffer: ReadBuffer::new(),
      }
    }

    #[cfg(windows)]
    fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
      Self {
-        pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+        pipe,
        buffer: Vec::with_capacity(INITIAL_CAPACITY),
+        read_buffer: ReadBuffer::new(),
      }
    }

-    async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
+    async fn read_msg(
+      &mut self,
+    ) -> Result<Option<serde_json::Value>, AnyError> {
      let mut json = None;
-      let nread =
-        read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?;
+      let nread = read_msg_inner(
+        &mut self.pipe,
+        &mut self.buffer,
+        &mut json,
+        &mut self.read_buffer,
+      )
+      .await?;
      if nread == 0 {
        // EOF.
- return Ok(serde_json::Value::Null); + return Ok(None); } let json = match json { @@ -250,7 +540,7 @@ mod impl_ { self.buffer.set_len(0); } - Ok(json) + Ok(Some(json)) } } @@ -263,6 +553,7 @@ mod impl_ { // The number of bytes appended to buf. This can be less than buf.len() if // the buffer was not empty when the operation was started. read: usize, + read_buffer: &'a mut ReadBuffer, } } @@ -270,43 +561,41 @@ mod impl_ { reader: &'a mut R, buf: &'a mut Vec, json: &'a mut Option, + read_buffer: &'a mut ReadBuffer, ) -> ReadMsgInner<'a, R> where - R: AsyncBufRead + ?Sized + Unpin, + R: AsyncRead + ?Sized + Unpin, { ReadMsgInner { reader, buf, json, read: 0, + read_buffer, } } - fn read_msg_internal( + fn read_msg_internal( mut reader: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec, + read_buffer: &mut ReadBuffer, json: &mut Option, read: &mut usize, ) -> Poll> { loop { let (done, used) = { - let available = match reader.as_mut().poll_fill_buf(cx) { - std::task::Poll::Ready(t) => t?, - std::task::Poll::Pending => return std::task::Poll::Pending, - }; + // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer. + if read_buffer.needs_fill() { + let mut read_buf = ReadBuf::new(read_buffer.get_mut()); + ready!(reader.as_mut().poll_read(cx, &mut read_buf))?; + read_buffer.cap = read_buf.filled().len(); + read_buffer.pos = 0; + } + let available = read_buffer.available_mut(); if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. - // - // Safety: It is ok to overwrite the contents because - // we don't need to copy it into the buffer and the length will be reset. - let available = unsafe { - std::slice::from_raw_parts_mut( - available.as_ptr() as *mut u8, - available.len(), - ) - }; json.replace( simd_json::from_slice(&mut available[..i + 1]) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, @@ -323,7 +612,7 @@ mod impl_ { } }; - reader.as_mut().consume(used); + read_buffer.consume(used); *read += used; if done || used == 0 { return Poll::Ready(Ok(mem::replace(read, 0))); @@ -331,81 +620,30 @@ mod impl_ { } } - impl Future for ReadMsgInner<'_, R> { + impl Future for ReadMsgInner<'_, R> { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) - } - } - - #[cfg(all(target_os = "macos", target_arch = "aarch64"))] - mod neon { - use std::arch::aarch64::*; - - pub unsafe fn neon_memchr( - str: &[u8], - c: u8, - length: usize, - ) -> Option { - let end = str.as_ptr().wrapping_add(length); - - // Alignment handling - let mut ptr = str.as_ptr(); - while ptr < end && (ptr as usize) & 0xF != 0 { - if *ptr == c { - return Some(ptr as usize - str.as_ptr() as usize); - } - ptr = ptr.wrapping_add(1); - } - - let search_char = vdupq_n_u8(c); - - while ptr.wrapping_add(16) <= end { - let chunk = vld1q_u8(ptr); - let comparison = vceqq_u8(chunk, search_char); - - // Check first 64 bits - let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); - if result0 != 0 { - return Some( - (ptr as usize - str.as_ptr() as usize) - + result0.trailing_zeros() as usize / 8, - ); - } - - // Check second 64 bits - let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); - if result1 != 0 { - return Some( - (ptr as usize - str.as_ptr() as usize) - + 8 - + result1.trailing_zeros() as usize / 8, - ); - } - - ptr = ptr.wrapping_add(16); - } - - // Handle remaining unaligned 
characters - while ptr < end { - if *ptr == c { - return Some(ptr as usize - str.as_ptr() as usize); - } - ptr = ptr.wrapping_add(1); - } - - None + read_msg_internal( + Pin::new(*me.reader), + cx, + me.buf, + me.read_buffer, + me.json, + me.read, + ) } } #[cfg(test)] mod tests { use super::IpcJsonStreamResource; - use deno_core::serde_json; use deno_core::serde_json::json; + use deno_core::v8; + use deno_core::JsRuntime; use deno_core::RcRef; + use deno_core::RuntimeOptions; use std::rc::Rc; #[allow(clippy::unused_async)] @@ -414,7 +652,10 @@ mod impl_ { let (a, b) = tokio::net::UnixStream::pair().unwrap(); /* Similar to how ops would use the resource */ - let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + let a = Rc::new(IpcJsonStreamResource::from_stream( + a, + super::IpcRefTracker::new_test(), + )); (a, b) } @@ -434,7 +675,10 @@ mod impl_ { server.connect().await.unwrap(); /* Similar to how ops would use the resource */ - let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + let client = Rc::new(IpcJsonStreamResource::from_stream( + client, + super::IpcRefTracker::new_test(), + )); (client, server) } @@ -467,10 +711,9 @@ mod impl_ { let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; loop { - let msgs = ipc.read_msg().await?; - if msgs == serde_json::Value::Null { + let Some(msgs) = ipc.read_msg().await? else { break; - } + }; bytes += msgs.as_str().unwrap().len(); if start.elapsed().as_secs() > 5 { break; @@ -501,10 +744,13 @@ mod impl_ { Ok::<_, std::io::Error>(()) }); - ipc.clone().write_msg(json!("hello")).await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?; + let msgs = ipc.read_msg().await?.unwrap(); assert_eq!(msgs, json!("world")); child.await??; @@ -512,6 +758,12 @@ mod impl_ { Ok(()) } + fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec { + let mut buf = deno_core::serde_json::to_vec(&v).unwrap(); + buf.push(b'\n'); + buf + } + #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box> { let (ipc, mut fd2) = pair().await; @@ -527,11 +779,17 @@ mod impl_ { Ok::<_, std::io::Error>(()) }); - ipc.clone().write_msg(json!("hello")).await?; - ipc.clone().write_msg(json!("world")).await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("world"))) + .await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?; + let msgs = ipc.read_msg().await?.unwrap(); assert_eq!(msgs, json!("foo")); child.await??; @@ -566,5 +824,58 @@ mod impl_ { let empty = b""; assert_eq!(super::memchr(b'\n', empty), None); } + + fn wrap_expr(s: &str) -> String { + format!("(function () {{ return {s}; }})()") + } + + fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String { + let val = runtime.execute_script("", js).unwrap(); + let scope = &mut runtime.handle_scope(); + let val = v8::Local::new(scope, val); + let mut buf = Vec::new(); + let mut ser = deno_core::serde_json::Serializer::new(&mut buf); + super::serialize_v8_value(scope, val, &mut ser).unwrap(); + String::from_utf8(buf).unwrap() + } + + #[test] + fn ipc_serialization() { + let mut runtime = JsRuntime::new(RuntimeOptions::default()); + + let cases = [ + ("'hello'", "\"hello\""), + ("1", "1"), + ("1.5", "1.5"), + ("Number.NaN", "null"), + ("Infinity", "null"), + ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 
1).to_string()), + ( + "Number.MIN_SAFE_INTEGER", + &(-(2i64.pow(53) - 1)).to_string(), + ), + ("[1, 2, 3]", "[1,2,3]"), + ("new Uint8Array([1,2,3])", "[1,2,3]"), + ( + "{ a: 1.5, b: { c: new ArrayBuffer(5) }}", + r#"{"a":1.5,"b":{"c":{}}}"#, + ), + ("new Number(1)", "1"), + ("new Boolean(true)", "true"), + ("true", "true"), + (r#"new String("foo")"#, "\"foo\""), + ("null", "null"), + ( + r#"{ a: "field", toJSON() { return "custom"; } }"#, + "\"custom\"", + ), + ]; + + for (input, expect) in cases { + let js = wrap_expr(input); + let actual = serialize_js_to_json(&mut runtime, js); + assert_eq!(actual, expect); + } + } } } diff --git a/ext/node/ops/vm.rs b/ext/node/ops/vm.rs index df631a51f9e223..a44d84975f883a 100644 --- a/ext/node/ops/vm.rs +++ b/ext/node/ops/vm.rs @@ -140,8 +140,7 @@ mod tests { #[test] fn test_run_in_this_context() { let platform = v8::new_default_platform(0, false).make_shared(); - v8::V8::initialize_platform(platform); - v8::V8::initialize(); + deno_core::JsRuntime::init_platform(Some(platform)); let isolate = &mut v8::Isolate::new(Default::default()); diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts index 2aba88d635dd90..bb38b746c13082 100644 --- a/ext/node/polyfills/child_process.ts +++ b/ext/node/polyfills/child_process.ts @@ -115,7 +115,8 @@ export function fork( // more const v8Flags: string[] = []; if (Array.isArray(execArgv)) { - for (let index = 0; index < execArgv.length; index++) { + let index = 0; + while (index < execArgv.length) { const flag = execArgv[index]; if (flag.startsWith("--max-old-space-size")) { execArgv.splice(index, 1); @@ -123,6 +124,16 @@ export function fork( } else if (flag.startsWith("--enable-source-maps")) { // https://github.com/denoland/deno/issues/21750 execArgv.splice(index, 1); + } else if (flag.startsWith("-C") || flag.startsWith("--conditions")) { + let rm = 1; + if (flag.indexOf("=") === -1) { + // --conditions foo + // so remove the next argument as well. 
+ rm = 2; + } + execArgv.splice(index, rm); + } else { + index++; } } } @@ -825,7 +836,17 @@ export function execFileSync( function setupChildProcessIpcChannel() { const fd = op_node_child_ipc_pipe(); if (typeof fd != "number" || fd < 0) return; - setupChannel(process, fd); + const control = setupChannel(process, fd); + process.on("newListener", (name: string) => { + if (name === "message" || name === "disconnect") { + control.refCounted(); + } + }); + process.on("removeListener", (name: string) => { + if (name === "message" || name === "disconnect") { + control.unrefCounted(); + } + }); } internals.__setupChildProcessIpcChannel = setupChildProcessIpcChannel; diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index cabae63ee4e49f..2dcf0e78207209 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -7,7 +7,12 @@ // deno-lint-ignore-file prefer-primordials import { core, internals } from "ext:core/mod.js"; -import { op_node_ipc_read, op_node_ipc_write } from "ext:core/ops"; +import { + op_node_ipc_read, + op_node_ipc_ref, + op_node_ipc_unref, + op_node_ipc_write, +} from "ext:core/ops"; import { ArrayIsArray, ArrayPrototypeFilter, @@ -17,13 +22,14 @@ import { ArrayPrototypeSort, ArrayPrototypeUnshift, ObjectHasOwn, + StringPrototypeStartsWith, StringPrototypeToUpperCase, } from "ext:deno_node/internal/primordials.mjs"; import { assert } from "ext:deno_node/_util/asserts.ts"; import { EventEmitter } from "node:events"; import { os } from "ext:deno_node/internal_binding/constants.ts"; -import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; +import { notImplemented } from "ext:deno_node/_utils.ts"; import { Readable, Stream, Writable } from "node:stream"; import { isWindows } from "ext:deno_node/_util/os.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; @@ -31,6 +37,7 @@ import { AbortError, ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, + ERR_IPC_CHANNEL_CLOSED, ERR_UNKNOWN_SIGNAL, } from "ext:deno_node/internal/errors.ts"; import { Buffer } from "node:buffer"; @@ -46,6 +53,7 @@ import { import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs"; export function mapValues( record: Readonly>, @@ -97,6 +105,19 @@ export function stdioStringToArray( return options; } +const kClosesNeeded = Symbol("_closesNeeded"); +const kClosesReceived = Symbol("_closesReceived"); + +// We only want to emit a close event for the child process when all of +// the writable streams have closed. The value of `child[kClosesNeeded]` should be 1 + +// the number of opened writable streams (note this excludes `stdin`). +function maybeClose(child: ChildProcess) { + child[kClosesReceived]++; + if (child[kClosesNeeded] === child[kClosesReceived]) { + child.emit("close", child.exitCode, child.signalCode); + } +} + export class ChildProcess extends EventEmitter { /** * The exit code of the child process. This property will be `null` until the child process exits. 
@@ -152,8 +173,13 @@ export class ChildProcess extends EventEmitter { null, ]; + disconnect?: () => void; + #process!: Deno.ChildProcess; #spawned = Promise.withResolvers(); + [kClosesNeeded] = 1; + [kClosesReceived] = 0; + canDisconnect = false; constructor( command: string, @@ -218,13 +244,23 @@ export class ChildProcess extends EventEmitter { if (stdout === "pipe") { assert(this.#process.stdout); + this[kClosesNeeded]++; this.stdout = Readable.fromWeb(this.#process.stdout); + this.stdout.on("close", () => { + maybeClose(this); + }); } if (stderr === "pipe") { assert(this.#process.stderr); + this[kClosesNeeded]++; this.stderr = Readable.fromWeb(this.#process.stderr); + this.stderr.on("close", () => { + maybeClose(this); + }); } + // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their + // close events (like above) this.stdio[0] = this.stdin; this.stdio[1] = this.stdout; @@ -259,6 +295,10 @@ export class ChildProcess extends EventEmitter { const pipeFd = internals.getPipeFd(this.#process); if (typeof pipeFd == "number") { setupChannel(this, pipeFd); + this[kClosesNeeded]++; + this.on("disconnect", () => { + maybeClose(this); + }); } (async () => { @@ -271,7 +311,7 @@ export class ChildProcess extends EventEmitter { this.emit("exit", exitCode, signalCode); await this.#_waitForChildStreamsToClose(); this.#closePipes(); - this.emit("close", exitCode, signalCode); + maybeClose(this); }); })(); } catch (err) { @@ -304,7 +344,7 @@ export class ChildProcess extends EventEmitter { } /* Cancel any pending IPC I/O */ - if (this.implementsDisconnect) { + if (this.canDisconnect) { this.disconnect?.(); } @@ -321,10 +361,6 @@ export class ChildProcess extends EventEmitter { this.#process.unref(); } - disconnect() { - warnNotImplemented("ChildProcess.prototype.disconnect"); - } - async #_waitForChildStreamsToClose() { const promises = [] as Array>; // Don't close parent process stdin if that's passed through @@ -359,6 +395,16 @@ export class ChildProcess extends EventEmitter { assert(this.stdin); this.stdin.destroy(); } + /// TODO(nathanwhit): for some reason when the child process exits + /// and the child end of the named pipe closes, reads still just return `Pending` + /// instead of returning that 0 bytes were read (to signal the pipe died). + /// For now, just forcibly disconnect, but in theory I think we could miss messages + /// that haven't been read yet. 
+ if (Deno.build.os === "windows") { + if (this.canDisconnect) { + this.disconnect?.(); + } + } } } @@ -1099,18 +1145,109 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } -export function setupChannel(target, ipc) { +const kControlDisconnect = Symbol("kControlDisconnect"); +const kPendingMessages = Symbol("kPendingMessages"); + +// controls refcounting for the IPC channel +class Control extends EventEmitter { + #channel: number; + #refs: number = 0; + #refExplicitlySet = false; + #connected = true; + [kPendingMessages] = []; + constructor(channel: number) { + super(); + this.#channel = channel; + } + + #ref() { + if (this.#connected) { + op_node_ipc_ref(this.#channel); + } + } + + #unref() { + if (this.#connected) { + op_node_ipc_unref(this.#channel); + } + } + + [kControlDisconnect]() { + this.#unref(); + this.#connected = false; + } + + refCounted() { + if (++this.#refs === 1 && !this.#refExplicitlySet) { + this.#ref(); + } + } + + unrefCounted() { + if (--this.#refs === 0 && !this.#refExplicitlySet) { + this.#unref(); + this.emit("unref"); + } + } + + ref() { + this.#refExplicitlySet = true; + this.#ref(); + } + + unref() { + this.#refExplicitlySet = false; + this.#unref(); + } +} + +type InternalMessage = { + cmd: `NODE_${string}`; +}; + +// deno-lint-ignore no-explicit-any +function isInternal(msg: any): msg is InternalMessage { + if (msg && typeof msg === "object") { + const cmd = msg["cmd"]; + if (typeof cmd === "string") { + return StringPrototypeStartsWith(cmd, "NODE_"); + } + } + return false; +} + +function internalCmdName(msg: InternalMessage): string { + return StringPrototypeSlice(msg.cmd, 5); +} + +// deno-lint-ignore no-explicit-any +export function setupChannel(target: any, ipc: number) { + const control = new Control(ipc); + target.channel = control; + async function readLoop() { try { while (true) { if (!target.connected || target.killed) { return; } - const msg = await op_node_ipc_read(ipc); - if (msg == null) { - // Channel closed. - target.disconnect(); - return; + const prom = op_node_ipc_read(ipc); + // there will always be a pending read promise, + // but it shouldn't keep the event loop from exiting + core.unrefOpPromise(prom); + const msg = await prom; + if (isInternal(msg)) { + const cmd = internalCmdName(msg); + if (cmd === "CLOSE") { + // Channel closed. + target.disconnect(); + return; + } else { + // TODO(nathanwhit): once we add support for sending + // handles, if we want to support deno-node IPC interop, + // we'll need to handle the NODE_HANDLE_* messages here. 
+ continue; + } } process.nextTick(handleMessage, msg); @@ -1126,9 +1263,29 @@ export function setupChannel(target, ipc) { } function handleMessage(msg) { - target.emit("message", msg); + if (!target.channel) { + return; + } + if (target.listenerCount("message") !== 0) { + target.emit("message", msg); + return; + } + + ArrayPrototypePush(target.channel[kPendingMessages], msg); } + target.on("newListener", () => { + nextTick(() => { + if (!target.channel || !target.listenerCount("message")) { + return; + } + for (const msg of target.channel[kPendingMessages]) { + target.emit("message", msg); + } + target.channel[kPendingMessages] = []; + }); + }); + target.send = function (message, handle, options, callback) { if (typeof handle === "function") { callback = handle; @@ -1151,32 +1308,55 @@ export function setupChannel(target, ipc) { notImplemented("ChildProcess.send with handle"); } - op_node_ipc_write(ipc, message) + if (!target.connected) { + const err = new ERR_IPC_CHANNEL_CLOSED(); + if (typeof callback === "function") { + console.error("ChildProcess.send with callback"); + process.nextTick(callback, err); + } else { + nextTick(() => target.emit("error", err)); + } + return false; + } + + // signals whether the queue is within the limit. + // if false, the sender should slow down. + // this acts as a backpressure mechanism. + const queueOk = [true]; + control.refCounted(); + op_node_ipc_write(ipc, message, queueOk) .then(() => { + control.unrefCounted(); if (callback) { process.nextTick(callback, null); } }); + return queueOk[0]; }; target.connected = true; target.disconnect = function () { - if (!this.connected) { - this.emit("error", new Error("IPC channel is already disconnected")); + if (!target.connected) { + target.emit("error", new Error("IPC channel is already disconnected")); return; } - this.connected = false; + target.connected = false; + target.canDisconnect = false; + control[kControlDisconnect](); process.nextTick(() => { + target.channel = null; core.close(ipc); target.emit("disconnect"); }); }; - target.implementsDisconnect = true; + target.canDisconnect = true; // Start reading messages from the channel. 
readLoop(); + + return control; } export default { diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index ecf6ef49bb9e01..69fb5cf29d9cb1 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -345,14 +345,15 @@ fn create_command( }); /* One end returned to parent process (this) */ - let pipe_rid = Some( - state - .resource_table - .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), - ); + let pipe_rid = Some(state.resource_table.add( + deno_node::IpcJsonStreamResource::new( + fd1 as _, + deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), + )?, + )); - /* The other end passed to child process via DENO_CHANNEL_FD */ - command.env("DENO_CHANNEL_FD", format!("{}", ipc)); + /* The other end passed to child process via NODE_CHANNEL_FD */ + command.env("NODE_CHANNEL_FD", format!("{}", ipc)); return Ok((command, pipe_rid)); } @@ -470,14 +471,15 @@ fn create_command( } /* One end returned to parent process (this) */ - let pipe_fd = Some( - state - .resource_table - .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), - ); - - /* The other end passed to child process via DENO_CHANNEL_FD */ - command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); + let pipe_fd = Some(state.resource_table.add( + deno_node::IpcJsonStreamResource::new( + hd1 as i64, + deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), + )?, + )); + + /* The other end passed to child process via NODE_CHANNEL_FD */ + command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64)); return Ok((command, pipe_fd)); } diff --git a/tests/unit_node/child_process_test.ts b/tests/unit_node/child_process_test.ts index cfac0b5a97ad50..d613d298944fc5 100644 --- a/tests/unit_node/child_process_test.ts +++ b/tests/unit_node/child_process_test.ts @@ -9,8 +9,10 @@ import { assertNotStrictEquals, assertStrictEquals, assertStringIncludes, + assertThrows, } from "@std/assert"; import * as path from "@std/path"; +import { setTimeout } from "node:timers"; const { spawn, spawnSync, execFile, execFileSync, ChildProcess } = CP; @@ -63,6 +65,7 @@ Deno.test("[node/child_process disconnect] the method exists", async () => { const deferred = withTimeout(); const childProcess = spawn(Deno.execPath(), ["--help"], { env: { NO_COLOR: "true" }, + stdio: ["pipe", "pipe", "pipe", "ipc"], }); try { childProcess.disconnect(); @@ -855,3 +858,191 @@ Deno.test( assertEquals(output.stderr, null); }, ); + +Deno.test( + async function ipcSerialization() { + const timeout = withTimeout(); + const script = ` + if (typeof process.send !== "function") { + console.error("process.send is not a function"); + process.exit(1); + } + + class BigIntWrapper { + constructor(value) { + this.value = value; + } + toJSON() { + return this.value.toString(); + } + } + + const makeSab = (arr) => { + const sab = new SharedArrayBuffer(arr.length); + const buf = new Uint8Array(sab); + for (let i = 0; i < arr.length; i++) { + buf[i] = arr[i]; + } + return buf; + }; + + + const inputs = [ + "foo", + { + foo: "bar", + }, + 42, + true, + null, + new Uint8Array([1, 2, 3]), + { + foo: new Uint8Array([1, 2, 3]), + bar: makeSab([4, 5, 6]), + }, + [1, { foo: 2 }, [3, 4]], + new BigIntWrapper(42n), + ]; + for (const input of inputs) { + process.send(input); + } + `; + const file = await Deno.makeTempFile(); + await Deno.writeTextFile(file, script); + const child = CP.fork(file, [], { + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + const expect = [ + "foo", + { + foo: "bar", + }, + 42, + true, + null, + [1, 2, 3], + { + foo: [1, 2, 3], + 
bar: [4, 5, 6], + }, + [1, { foo: 2 }, [3, 4]], + "42", + ]; + let i = 0; + + child.on("message", (message) => { + assertEquals(message, expect[i]); + i++; + }); + child.on("close", () => timeout.resolve()); + await timeout.promise; + assertEquals(i, expect.length); + }, +); + +Deno.test(async function childProcessExitsGracefully() { + const testdataDir = path.join( + path.dirname(path.fromFileUrl(import.meta.url)), + "testdata", + ); + const script = path.join( + testdataDir, + "node_modules", + "foo", + "index.js", + ); + const p = Promise.withResolvers(); + const cp = CP.fork(script, [], { + cwd: testdataDir, + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + cp.on("close", () => p.resolve()); + + await p.promise; +}); + +Deno.test(async function killMultipleTimesNoError() { + const loop = ` + while (true) { + await new Promise((resolve) => setTimeout(resolve, 10000)); + } + `; + + const timeout = withTimeout(); + const file = await Deno.makeTempFile(); + await Deno.writeTextFile(file, loop); + const child = CP.fork(file, [], { + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + child.on("close", () => { + timeout.resolve(); + }); + child.kill(); + child.kill(); + + // explicitly calling disconnect after kill should throw + assertThrows(() => child.disconnect()); + + await timeout.promise; +}); + +// Make sure that you receive messages sent before a "message" event listener is set up +Deno.test(async function bufferMessagesIfNoListener() { + const code = ` + process.on("message", (_) => { + process.channel.unref(); + }); + process.send("hello"); + process.send("world"); + console.error("sent messages"); + `; + const file = await Deno.makeTempFile(); + await Deno.writeTextFile(file, code); + const timeout = withTimeout(); + const child = CP.fork(file, [], { + stdio: ["inherit", "inherit", "pipe", "ipc"], + }); + + let got = 0; + child.on("message", (message) => { + if (got++ === 0) { + assertEquals(message, "hello"); + } else { + assertEquals(message, "world"); + } + }); + child.on("close", () => { + timeout.resolve(); + }); + let stderr = ""; + child.stderr?.on("data", (data) => { + stderr += data; + if (stderr.includes("sent messages")) { + // now that we've set up the listeners, and the child + // has sent the messages, we can let it exit + child.send("ready"); + } + }); + await timeout.promise; + assertEquals(got, 2); +}); + +Deno.test(async function sendAfterClosedThrows() { + const code = ``; + const file = await Deno.makeTempFile(); + await Deno.writeTextFile(file, code); + const timeout = withTimeout(); + const child = CP.fork(file, [], { + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); + child.on("error", (err) => { + assert("code" in err); + assertEquals(err.code, "ERR_IPC_CHANNEL_CLOSED"); + timeout.resolve(); + }); + child.on("close", () => { + child.send("ready"); + }); + + await timeout.promise; +});
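
For reviewers who want to exercise the new behavior end to end, here is a minimal, hypothetical usage sketch (the file names parent.mjs and child.mjs are illustrative and not part of this patch). It drives what the patch implements on the Node-compat side: fork() IPC over NODE_CHANNEL_FD, the backpressure signal returned by send() (false once the queue of unsent bytes exceeds 2 * INITIAL_CAPACITY), buffering of messages that arrive before a "message" listener is attached, and the ref-counting that lets the event loop exit once the channel is disconnected.

// parent.mjs -- hypothetical driver, not part of the patch.
import { fork } from "node:child_process";

const child = fork("./child.mjs", [], {
  stdio: ["inherit", "inherit", "inherit", "ipc"],
});

// Messages the child sends before this listener exists are buffered by the
// channel (kPendingMessages) and delivered on the next tick after a listener
// is attached.
child.once("message", (msg) => {
  console.log("from child:", msg);
  // Tear the channel down: "disconnect" fires, the channel is unref'd, and
  // "close" follows once all stdio streams have closed (maybeClose).
  child.disconnect();
});

// `send` returns false once the queue of unsent bytes is over the limit;
// the callback runs after the serialized message has been written to the pipe.
const underLimit = child.send({ cmd: "start" }, (err) => {
  if (err) console.error("send failed:", err);
});
if (!underLimit) {
  console.log("backpressure: wait for send callbacks before sending more");
}

// child.mjs -- hypothetical child. On the child side the channel only keeps
// the event loop alive while a "message"/"disconnect" listener is attached.
process.on("message", (msg) => {
  process.send({ reply: `got ${msg.cmd}` });
});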