From c6bfb417699c2eeaed7544b6f2383a45f58a6ce2 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 01:58:22 -0300 Subject: [PATCH 01/12] win IPC yay --- src/bun.js/api/bun/subprocess.zig | 50 +++-- src/bun.js/ipc.zig | 337 ++++++++++++++++++++++++++++-- src/bun.js/javascript.zig | 52 +++-- src/deps/libuv.zig | 4 +- 4 files changed, 398 insertions(+), 45 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 3dd2291cf51ba1..49f0fac92ed1ad 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -805,7 +805,12 @@ pub const Subprocess = struct { pub fn disconnect(this: *Subprocess) void { if (this.ipc_mode == .none) return; - this.ipc.socket.close(0, null); + if (Environment.isWindows) { + this.ipc.pipe.data = this; + this.ipc.close(Subprocess); + } else { + this.ipc.socket.close(0, null); + } this.ipc_mode = .none; } @@ -2193,11 +2198,6 @@ pub const Subprocess = struct { } if (args.get(globalThis, "ipc")) |val| { - if (Environment.isWindows) { - globalThis.throwTODO("TODO: IPC is not yet supported on Windows"); - return .zero; - } - if (val.isCell() and val.isCallable(globalThis.vm())) { // In the future, we should add a way to use a different IPC serialization format, specifically `json`. // but the only use case this has is doing interop with node.js IPC and other programs. @@ -2228,6 +2228,24 @@ pub const Subprocess = struct { env_array.capacity = env_array.items.len; } + const pipe_prefix = "BUN_INTERNAL_IPC_PIPE=\\\\.\\pipe\\BUN_IPC_"; + var pipe_env_bytes: [pipe_prefix.len + 37]u8 = undefined; + + const pipe_name_bytes = pipe_env_bytes["BUN_INTERNAL_IPC_PIPE=".len..]; + + if (ipc_mode != .none) { + if (comptime is_sync) { + globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{}); + return .zero; + } + env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in uv_spawn"); + + const uuid = globalThis.bunVM().rareData().nextUUID(); + const pipe_env = std.fmt.bufPrintZ(&pipe_env_bytes, "{s}{s}", .{ pipe_prefix, uuid }) catch |err| return globalThis.handleError(err, "in uv_spawn"); + + env_array.appendAssumeCapacity(pipe_env); + } + env_array.append(allocator, null) catch { globalThis.throwOutOfMemory(); return .zero; @@ -2239,6 +2257,15 @@ pub const Subprocess = struct { globalThis.throwOutOfMemory(); return .zero; }; + subprocess.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }; + if (ipc_mode != .none) { + const errno = subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes); + if (errno != 0) { + alloc.destroy(subprocess); + globalThis.throwValue(bun.sys.Error.fromCodeInt(errno, .uv_spawn).toJSC(globalThis)); + return .zero; + } + } var uv_stdio = [3]uv.uv_stdio_container_s{ stdio[0].setUpChildIoUvSpawn(0, &subprocess.pipes[0], true, bun.invalid_fd) catch |err| { @@ -2286,6 +2313,7 @@ pub const Subprocess = struct { .pid = subprocess.pid, .pidfd = 0, .stdin = Writable.initWithPipe(stdio[0], &subprocess.pipes[0], globalThis) catch { + alloc.destroy(subprocess); globalThis.throwOutOfMemory(); return .zero; }, @@ -2295,16 +2323,15 @@ pub const Subprocess = struct { .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, - .ipc = undefined, - .ipc_callback = undefined, + .ipc = subprocess.ipc, + .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined, .flags = .{ .is_sync = is_sync, }, }; - subprocess.pid.data = subprocess; - std.debug.assert(ipc_mode == .none); //TODO: + subprocess.pid.data = subprocess; const out = if (comptime !is_sync) subprocess.toJS(globalThis) else .zero; subprocess.this_jsvalue = out; @@ -3396,8 +3423,7 @@ pub const Subprocess = struct { } } - pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void { - // uSocket is already freed so calling .close() on the socket can segfault + pub fn handleIPCClose(this: *Subprocess) void { this.ipc_mode = .none; this.updateHasPendingActivity(); } diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 9d8af127b7df31..5a5569a67af52e 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -94,14 +94,14 @@ pub fn decodeIPCMessage( pub const Socket = uws.NewSocketHandler(false); -pub const IPCData = struct { +const SocketIPCData = struct { socket: Socket, incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well outgoing: IPCBuffer = .{}, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, - pub fn writeVersionPacket(this: *IPCData) void { + pub fn writeVersionPacket(this: *SocketIPCData) void { if (Environment.allow_assert) { std.debug.assert(this.has_written_version == 0); } @@ -120,7 +120,7 @@ pub const IPCData = struct { } } - pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + pub fn serializeAndSend(ipc_data: *SocketIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { if (Environment.allow_assert) { std.debug.assert(ipc_data.has_written_version == 1); } @@ -156,17 +156,162 @@ pub const IPCData = struct { } }; -/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers -/// -/// `Context` must be a struct that implements this interface: -/// struct { -/// globalThis: ?*JSGlobalObject, -/// ipc: IPCData, -/// -/// fn handleIPCMessage(*Context, DecodedIPCMessage) void -/// fn handleIPCClose(*Context, Socket) void -/// } -pub fn NewIPCHandler(comptime Context: type) type { +const NamedPipeIPCData = struct { + const uv = bun.windows.libuv; + + pipe: uv.uv_pipe_t, + incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well + outgoing: IPCBuffer = .{}, + connected: bool = false, + has_written_version: if (Environment.allow_assert) u1 else u0 = 0, + connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t), + server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t), + current_payload_len: usize = 0, + + pub fn processSend(this: *NamedPipeIPCData) void { + const bytes = this.outgoing.list.slice(); + log("processSend {d}", .{bytes.len}); + if (bytes.len == 0) return; + + const req = bun.new(uv.uv_write_t, std.mem.zeroes(uv.uv_write_t)); + req.data = @ptrCast(this); + req.write_buffer = uv.uv_buf_t.init(bytes); + log("processSend write_buffer {d}", .{req.write_buffer.len}); + this.current_payload_len = bytes.len; + const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.uvWriteCallback).int(); + if (write_err < 0) { + Output.printErrorln("Failed write IPC version", .{}); + return; + } + } + + fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { + const this = bun.cast(*NamedPipeIPCData, req.data); + log("uvWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); + defer bun.destroy(req); + if (status.errEnum()) |_| { + Output.printErrorln("Failed write IPC data", .{}); + return; + } + const n = this.current_payload_len; + if (n == this.outgoing.list.len) { + this.outgoing.cursor = 0; + this.outgoing.list.len = 0; + } else { + this.outgoing.cursor += @intCast(n); + this.processSend(); + } + } + + pub fn writeVersionPacket(this: *NamedPipeIPCData) void { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 0); + } + const VersionPacket = extern struct { + type: IPCMessageType align(1) = .Version, + version: u32 align(1) = ipcVersion, + }; + + if (Environment.allow_assert) { + this.has_written_version = 1; + } + const bytes = comptime std.mem.asBytes(&VersionPacket{}); + // enqueue to be sent after connecting + var list = this.outgoing.list.listManaged(bun.default_allocator); + list.appendSlice(bytes) catch bun.outOfMemory(); + if (this.connected) { + this.processSend(); + } + } + + pub fn serializeAndSend(this: *NamedPipeIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 1); + } + + const serialized = value.serialize(globalThis) orelse return false; + defer serialized.deinit(); + + const size: u32 = @intCast(serialized.data.len); + log("serializeAndSend {d}", .{size}); + + const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size; + + this.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM"); + const start_offset = this.outgoing.list.len; + + this.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); + this.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size); + this.outgoing.list.appendSliceAssumeCapacity(serialized.data); + + std.debug.assert(this.outgoing.list.len == start_offset + payload_length); + + if (start_offset == 0) { + std.debug.assert(this.outgoing.cursor == 0); + if (this.connected) { + this.processSend(); + } + } + + return true; + } + + pub fn close(this: *NamedPipeIPCData, comptime Context: type) void { + if (this.server.loop != null) { + _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onServerClose); + } else { + _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onClose); + } + } + + pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + log("configureServer", .{}); + const ipc_pipe = &this.server; + + var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 0); + if (errno != 0) { + return errno; + } + ipc_pipe.data = @ptrCast(instance); + errno = uv.uv_pipe_bind2(ipc_pipe, named_pipe.ptr, named_pipe.len, 0); + if (errno != 0) { + return errno; + } + errno = uv.uv_listen(@ptrCast(ipc_pipe), 0, NewNamedPipeIPCHandler(Context).onNewClientConnect); + if (errno != 0) { + return errno; + } + + uv.uv_pipe_pending_instances(ipc_pipe, 1); + + uv.uv_unref(@ptrCast(ipc_pipe)); + + this.writeVersionPacket(); + return 0; + } + + pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + log("configureClient", .{}); + const ipc_pipe = &this.pipe; + var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 1); + if (errno != 0) { + return errno; + } + ipc_pipe.data = @ptrCast(instance); + this.connect_req.data = @ptrCast(instance); + errno = uv.uv_pipe_connect2(&this.connect_req, ipc_pipe, named_pipe.ptr, named_pipe.len, 0, NewNamedPipeIPCHandler(Context).onConnect); + if (errno != 0) { + return errno; + } + + this.writeVersionPacket(); + return 0; + } +}; + +pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else NamedPipeIPCData; + +fn NewSocketIPCHandler(comptime Context: type) type { return struct { pub fn onOpen( _: *anyopaque, @@ -183,13 +328,13 @@ pub fn NewIPCHandler(comptime Context: type) type { pub fn onClose( this: *Context, - socket: Socket, + _: Socket, _: c_int, _: ?*anyopaque, ) void { // ?! does uSockets .close call onClose? log("onClose\n", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); } pub fn onData( @@ -208,7 +353,7 @@ pub fn NewIPCHandler(comptime Context: type) type { if (this.globalThis) |global| { break :brk global; } - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -227,7 +372,7 @@ pub fn NewIPCHandler(comptime Context: type) type { }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -257,7 +402,7 @@ pub fn NewIPCHandler(comptime Context: type) type { }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -318,3 +463,157 @@ pub fn NewIPCHandler(comptime Context: type) type { ) void {} }; } + +fn NewNamedPipeIPCHandler(comptime Context: type) type { + const uv = bun.windows.libuv; + return struct { + fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + const this: *Context = @ptrCast(@alignCast(handle.data)); + + var available = this.ipc.incoming.available(); + if (available.len < suggested_size) { + this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); + available = this.ipc.incoming.available(); + } + log("uvStreamAllocCallback {d}", .{suggested_size}); + buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) }; + } + + fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + log("uvStreamReadCallback {d}", .{nread}); + const this: *Context = @ptrCast(@alignCast(handle.data)); + if (nread <= 0) { + switch (nread) { + 0 => { + // EAGAIN or EWOULDBLOCK + return; + }, + uv.UV_EOF => { + _ = uv.uv_read_stop(@ptrCast(handle)); + this.ipc.close(Context); + }, + else => { + _ = uv.uv_read_stop(@ptrCast(handle)); + this.ipc.close(Context); + }, + } + + // when nread < 0 buffer maybe not point to a valid address + return; + } + + this.ipc.incoming.len += @as(u32, @truncate(buffer.len)); + var slice = this.ipc.incoming.slice(); + const globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) { + .Pointer => this.globalThis, + .Optional => brk: { + if (this.globalThis) |global| { + break :brk global; + } + this.handleIPCClose(); + this.ipc.close(Context); + return; + }, + else => @panic("Unexpected globalThis type: " ++ @typeName(@TypeOf(this.globalThis))), + }; + while (true) { + const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { + error.NotEnoughBytes => { + // copy the remaining bytes to the start of the buffer + bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); + this.ipc.incoming.len = @truncate(slice.len); + log("hit NotEnoughBytes2", .{}); + return; + }, + error.InvalidFormat => { + Output.printErrorln("InvalidFormatError during IPC message handling", .{}); + this.handleIPCClose(); + this.ipc.close(Context); + return; + }, + }; + + this.handleIPCMessage(result.message); + + if (result.bytes_consumed < slice.len) { + slice = slice[result.bytes_consumed..]; + } else { + // clear the buffer + this.ipc.incoming.len = 0; + return; + } + } + } + + pub fn onNewClientConnect(req: *uv.uv_stream_t, status: c_int) callconv(.C) void { + log("onNewClientConnect {d}", .{status}); + if (status < 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + const this = bun.cast(*Context, req.data); + const client = &this.ipc.pipe; + const server = &this.ipc.server; + if (uv.uv_pipe_init(uv.Loop.get(), client, 1) != 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + client.data = server.data; + + if (uv.uv_accept(@ptrCast(server), @ptrCast(client)) == 0) { + if (this.ipc.connected) { + this.ipc.close(Context); + return; + } + this.ipc.connected = true; + _ = uv.uv_read_start(@ptrCast(client), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.processSend(); + } else { + this.ipc.close(Context); + } + } + pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void { + log("onConnect {d}", .{status}); + if (status < 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + const this = bun.cast(*Context, req.data); + _ = uv.uv_read_start(@ptrCast(&this.ipc.pipe), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.connected = true; + this.ipc.processSend(); + } + pub fn onServerClose(handler: *anyopaque) callconv(.C) void { + log("onServerClose", .{}); + const event = bun.cast(*uv.uv_pipe_t, handler); + const this = bun.cast(*Context, event.data); + this.handleIPCClose(); + } + + pub fn onClose(handler: *anyopaque) callconv(.C) void { + log("onClose", .{}); + const event = bun.cast(*uv.uv_pipe_t, handler); + const this = bun.cast(*Context, event.data); + if (this.ipc.server.loop != null) { + _ = uv.uv_close(@ptrCast(&this.ipc.server), onServerClose); + return; + } + this.handleIPCClose(); + } + }; +} + +/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers +/// +/// `Context` must be a struct that implements this interface: +/// struct { +/// globalThis: ?*JSGlobalObject, +/// ipc: IPCData, +/// +/// fn handleIPCMessage(*Context, DecodedIPCMessage) void +/// fn handleIPCClose(*Context) void +/// } +pub fn NewIPCHandler(comptime Context: type) type { + const IPCHandler = if (Environment.isWindows) NewNamedPipeIPCHandler else NewSocketIPCHandler; + return IPCHandler(Context); +} diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index d06d8b0701e555..ff28f6bb02f48a 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -95,6 +95,7 @@ const Lock = @import("../lock.zig").Lock; const BuildMessage = JSC.BuildMessage; const ResolveMessage = JSC.ResolveMessage; const Async = bun.Async; +const uv = bun.windows.libuv; pub const OpaqueCallback = *const fn (current: ?*anyopaque) callconv(.C) void; pub fn OpaqueWrap(comptime Context: type, comptime Function: fn (this: *Context) void) OpaqueCallback { @@ -760,11 +761,22 @@ pub const VirtualMachine = struct { this.hide_bun_stackframes = false; } - if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| { - if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { - this.initIPCInstance(bun.toFD(fd)); + if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_PIPE")) |kv| { + if (Environment.isWindows) { + this.initIPCInstance(kv.value.value); } else { + Output.printErrorln("Failed to connect into BUN_INTERNAL_IPC_PIPE", .{}); + } + } + if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| { + if (Environment.isWindows) { Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{}); + } else { + if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { + this.initIPCInstance(bun.toFD(fd)); + } else { + Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{}); + } } } @@ -3000,7 +3012,8 @@ pub const VirtualMachine = struct { pub const IPCInstance = struct { globalThis: ?*JSGlobalObject, - uws_context: *uws.SocketContext, + context: if (Environment.isWindows) u0 else *uws.SocketContext, + ipc: IPC.IPCData, pub fn handleIPCMessage( @@ -3023,36 +3036,51 @@ pub const VirtualMachine = struct { } } - pub fn handleIPCClose(this: *IPCInstance, _: IPC.Socket) void { + pub fn handleIPCClose(this: *IPCInstance) void { JSC.markBinding(@src()); if (this.globalThis) |global| { var vm = global.bunVM(); vm.ipc = null; Process__emitDisconnectEvent(global); } - uws.us_socket_context_free(0, this.uws_context); + + if (!Environment.isWindows) { + uws.us_socket_context_free(0, this.context); + } bun.default_allocator.destroy(this); } pub const Handlers = IPC.NewIPCHandler(IPCInstance); }; - pub fn initIPCInstance(this: *VirtualMachine, fd: bun.FileDescriptor) void { + pub fn initIPCInstance(this: *VirtualMachine, source: if (Environment.isWindows) []const u8 else bun.FileDescriptor) void { + this.event_loop.ensureWaker(); + if (Environment.isWindows) { - Output.warn("IPC is not supported on Windows", .{}); + var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); + instance.* = .{ + .globalThis = this.global, + .context = 0, + .ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }, + }; + const errno = instance.ipc.configureClient(IPCInstance, instance, source); + if (errno != 0) { + @panic("Unable to start IPC"); + } + this.ipc = instance; return; } - this.event_loop.ensureWaker(); + const context = uws.us_create_socket_context(0, this.event_loop_handle.?, @sizeOf(usize), .{}).?; IPC.Socket.configure(context, true, *IPCInstance, IPCInstance.Handlers); - var instance = bun.default_allocator.create(IPCInstance) catch @panic("OOM"); + var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); instance.* = .{ .globalThis = this.global, - .uws_context = context, + .context = context, .ipc = undefined, }; - const socket = IPC.Socket.fromFd(context, fd, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); + const socket = IPC.Socket.fromFd(context, source, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); socket.setTimeout(0); instance.ipc = .{ .socket = socket }; diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 2267766fedd344..bdb3eb3dfaed2b 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -950,7 +950,7 @@ const struct_unnamed_385 = extern struct { write_reqs_pending: c_uint, shutdown_req: [*c]uv_shutdown_t, }; -pub const uv_connection_cb = ?*const fn ([*c]uv_stream_t, c_int) callconv(.C) void; +pub const uv_connection_cb = ?*const fn (*uv_stream_t, c_int) callconv(.C) void; const struct_unnamed_389 = extern struct { connection_cb: uv_connection_cb, }; @@ -1520,7 +1520,7 @@ const union_unnamed_441 = extern union { connect: struct_unnamed_443, }; pub const uv_connect_t = struct_uv_connect_s; -pub const uv_connect_cb = ?*const fn ([*c]uv_connect_t, c_int) callconv(.C) void; +pub const uv_connect_cb = ?*const fn (*uv_connect_t, c_int) callconv(.C) void; pub const struct_uv_connect_s = extern struct { data: ?*anyopaque, type: uv_req_type, From be51758acbe0565396cbab6b43ed32741a26e36d Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 02:07:58 -0300 Subject: [PATCH 02/12] oopsie --- src/bun.js/ipc.zig | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 5a5569a67af52e..cadfd45541c34b 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -169,7 +169,7 @@ const NamedPipeIPCData = struct { current_payload_len: usize = 0, pub fn processSend(this: *NamedPipeIPCData) void { - const bytes = this.outgoing.list.slice(); + const bytes = this.outgoing.list.slice()[this.outgoing.cursor..]; log("processSend {d}", .{bytes.len}); if (bytes.len == 0) return; @@ -519,10 +519,6 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { while (true) { const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { error.NotEnoughBytes => { - // copy the remaining bytes to the start of the buffer - bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); - this.ipc.incoming.len = @truncate(slice.len); - log("hit NotEnoughBytes2", .{}); return; }, error.InvalidFormat => { From 935baa1015e3151483e2bb284c35b5676a5303d0 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 02:16:06 -0300 Subject: [PATCH 03/12] oopsie 2 --- src/bun.js/ipc.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index cadfd45541c34b..9448769bfbaad9 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -309,7 +309,7 @@ const NamedPipeIPCData = struct { } }; -pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else NamedPipeIPCData; +pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else SocketIPCData; fn NewSocketIPCHandler(comptime Context: type) type { return struct { From 42f71a77365d3dc6e1c1c36453e451201df9732f Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 02:29:43 -0300 Subject: [PATCH 04/12] we need this --- src/bun.js/ipc.zig | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 9448769bfbaad9..8498dd6b0a5bcf 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -519,6 +519,10 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { while (true) { const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { error.NotEnoughBytes => { + // copy the remaining bytes to the start of the buffer + bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); + this.ipc.incoming.len = @truncate(slice.len); + log("hit NotEnoughBytes2", .{}); return; }, error.InvalidFormat => { From 489ff14e4c8a9f6bdb497b4ae2288fe93010754d Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 12:00:36 -0300 Subject: [PATCH 05/12] WIP mixins --- src/bun.js/api/bun/subprocess.zig | 49 +++++---- src/bun.js/ipc.zig | 93 ++++++++-------- src/bun.js/javascript.zig | 15 +-- src/bun.js/webcore/blob.zig | 9 +- src/deps/libuv.zig | 172 ++++++++++++++++++++++++++++-- src/sys.zig | 8 ++ 6 files changed, 253 insertions(+), 93 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 49f0fac92ed1ad..b2cfc0684e3ee3 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -411,7 +411,7 @@ pub const Subprocess = struct { .pipe => { if (this.pipe == .buffer) { if (Environment.isWindows) { - uv.uv_ref(@ptrCast(&this.pipe.buffer.stream)); + this.pipe.buffer.stream.unref(); return; } if (this.pipe.buffer.stream.poll_ref) |poll| { @@ -428,7 +428,7 @@ pub const Subprocess = struct { .pipe => { if (this.pipe == .buffer) { if (Environment.isWindows) { - uv.uv_unref(@ptrCast(&this.pipe.buffer.stream)); + this.pipe.buffer.stream.unref(); return; } if (this.pipe.buffer.stream.poll_ref) |poll| { @@ -554,7 +554,7 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - if (uv.uv_is_closed(@ptrCast(this.pipe.buffer.stream))) { + if (this.pipe.buffer.stream.isClosed()) { return false; } this.pipe.buffer.closeCallback = callback; @@ -958,7 +958,7 @@ pub const Subprocess = struct { if (this.pipe) |pipe| { pipe.data = this; - _ = uv.uv_close(@ptrCast(pipe), BufferedPipeInput.uvClosedCallback); + pipe.close(BufferedPipeInput.uvClosedCallback); } } @@ -1242,7 +1242,7 @@ pub const Subprocess = struct { } } - fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + fn uvStreamReadCallback(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); if (nread <= 0) { switch (nread) { @@ -1252,7 +1252,7 @@ pub const Subprocess = struct { }, uv.UV_EOF => { this.status = .{ .done = {} }; - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.flushBufferedDataIntoReadableStream(); }, else => { @@ -1261,7 +1261,7 @@ pub const Subprocess = struct { }; const err = rt.errEnum() orelse bun.C.E.CANCELED; this.status = .{ .err = bun.sys.Error.fromCode(err, .read) }; - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.signalStreamError(); }, } @@ -1274,7 +1274,7 @@ pub const Subprocess = struct { this.flushBufferedDataIntoReadableStream(); } - fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + fn uvStreamAllocCallback(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); var size: usize = 0; var available = this.internal_buffer.available(); @@ -1296,7 +1296,7 @@ pub const Subprocess = struct { } buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(size) }; if (size == 0) { - _ = uv.uv_read_stop(@ptrCast(@alignCast(handle))); + handle.readStop(); this.status = .{ .done = {} }; } } @@ -1305,7 +1305,7 @@ pub const Subprocess = struct { if (Environment.isWindows) { if (this.status == .pending) { this.stream.data = this; - _ = uv.uv_read_start(@ptrCast(this.stream), BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); + _ = this.stream.readStart(BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); } return; } @@ -1557,12 +1557,12 @@ pub const Subprocess = struct { .pending => { if (Environment.isWindows) { needCallbackCall = false; - _ = uv.uv_read_stop(@ptrCast(&this.stream)); - if (uv.uv_is_closed(@ptrCast(&this.stream))) { + this.stream.readStop(); + if (this.stream.isClosed()) { this.readable_stream_ref.deinit(); this.closeCallback.run(); } else { - _ = uv.uv_close(@ptrCast(&this.stream), BufferedOutput.uvClosedCallback); + this.stream.close(BufferedOutput.uvClosedCallback); } } else { this.stream.close(); @@ -1602,7 +1602,9 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - _ = uv.uv_ref(@ptrCast(this.pipe.stream)); + if (this.pipe.stream) |stream| { + stream.unref(); + } } else if (this.pipe.poll_ref) |poll| { poll.enableKeepingProcessAlive(JSC.VirtualMachine.get()); } @@ -1615,7 +1617,9 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - _ = uv.uv_unref(@ptrCast(this.pipe.stream)); + if (this.pipe.stream) |stream| { + stream.unref(); + } } else if (this.pipe.poll_ref) |poll| { poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); } @@ -2259,11 +2263,9 @@ pub const Subprocess = struct { }; subprocess.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }; if (ipc_mode != .none) { - const errno = subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes); - if (errno != 0) { + if (subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes).asErr()) |err| { alloc.destroy(subprocess); - globalThis.throwValue(bun.sys.Error.fromCodeInt(errno, .uv_spawn).toJSC(globalThis)); - return .zero; + globalThis.throwValue(err.toJSC(globalThis)); } } @@ -3203,14 +3205,11 @@ pub const Subprocess = struct { ) !uv.uv_stdio_container_s { return switch (stdio) { .array_buffer, .blob, .pipe => { - if (uv.uv_pipe_init(uv.Loop.get(), pipe, 0) != 0) { - return error.FailedToCreatePipe; - } + try pipe.init(uv.Loop.get(), false).unwrap(); + if (fd != bun.invalid_fd) { // we receive a FD so we open this into our pipe - if (uv.uv_pipe_open(pipe, bun.uvfdcast(fd)).errEnum()) |_| { - return error.FailedToCreatePipe; - } + try pipe.open(bun.uvfdcast(fd)).unwrap(); return uv.uv_stdio_container_s{ .flags = @intCast(uv.UV_INHERIT_STREAM), .data = .{ .stream = @ptrCast(pipe) }, diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 8498dd6b0a5bcf..21c302a26fbc2a 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -11,6 +11,7 @@ const Allocator = std.mem.Allocator; const JSC = @import("root").bun.JSC; const JSValue = JSC.JSValue; const JSGlobalObject = JSC.JSGlobalObject; +const Maybe = JSC.Maybe; pub const log = Output.scoped(.IPC, false); @@ -162,11 +163,12 @@ const NamedPipeIPCData = struct { pipe: uv.uv_pipe_t, incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well outgoing: IPCBuffer = .{}, + current_payload_len: usize = 0, + connected: bool = false, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t), server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t), - current_payload_len: usize = 0, pub fn processSend(this: *NamedPipeIPCData) void { const bytes = this.outgoing.list.slice()[this.outgoing.cursor..]; @@ -178,16 +180,16 @@ const NamedPipeIPCData = struct { req.write_buffer = uv.uv_buf_t.init(bytes); log("processSend write_buffer {d}", .{req.write_buffer.len}); this.current_payload_len = bytes.len; - const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.uvWriteCallback).int(); + const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.onWriteCallback).int(); if (write_err < 0) { Output.printErrorln("Failed write IPC version", .{}); return; } } - fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { + fn onWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { const this = bun.cast(*NamedPipeIPCData, req.data); - log("uvWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); + log("onWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); defer bun.destroy(req); if (status.errEnum()) |_| { Output.printErrorln("Failed write IPC data", .{}); @@ -258,54 +260,41 @@ const NamedPipeIPCData = struct { pub fn close(this: *NamedPipeIPCData, comptime Context: type) void { if (this.server.loop != null) { - _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onServerClose); + this.server.close(NewNamedPipeIPCHandler(Context).onServerClose); } else { - _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onClose); + this.pipe.close(NewNamedPipeIPCHandler(Context).onClose); } } - pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) Maybe(void) { log("configureServer", .{}); const ipc_pipe = &this.server; - var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 0); - if (errno != 0) { - return errno; + if (ipc_pipe.init(uv.Loop.get(), false).asErr()) |err| { + return .{ .err = err }; } ipc_pipe.data = @ptrCast(instance); - errno = uv.uv_pipe_bind2(ipc_pipe, named_pipe.ptr, named_pipe.len, 0); - if (errno != 0) { - return errno; - } - errno = uv.uv_listen(@ptrCast(ipc_pipe), 0, NewNamedPipeIPCHandler(Context).onNewClientConnect); - if (errno != 0) { - return errno; + if (ipc_pipe.listenNamedPipe(named_pipe, 0, NewNamedPipeIPCHandler(Context).onNewClientConnect).asErr()) |err| { + return .{ .err = err }; } - uv.uv_pipe_pending_instances(ipc_pipe, 1); + ipc_pipe.setPendingInstances(1); - uv.uv_unref(@ptrCast(ipc_pipe)); + ipc_pipe.unref(); this.writeVersionPacket(); - return 0; + return .{ .result = {} }; } - pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) !void { log("configureClient", .{}); const ipc_pipe = &this.pipe; - var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 1); - if (errno != 0) { - return errno; - } + try ipc_pipe.init(uv.Loop.get(), true).unwrap(); ipc_pipe.data = @ptrCast(instance); this.connect_req.data = @ptrCast(instance); - errno = uv.uv_pipe_connect2(&this.connect_req, ipc_pipe, named_pipe.ptr, named_pipe.len, 0, NewNamedPipeIPCHandler(Context).onConnect); - if (errno != 0) { - return errno; - } + try ipc_pipe.connect(&this.connect_req, named_pipe, NewNamedPipeIPCHandler(Context).onConnect).unwrap(); this.writeVersionPacket(); - return 0; } }; @@ -467,7 +456,7 @@ fn NewSocketIPCHandler(comptime Context: type) type { fn NewNamedPipeIPCHandler(comptime Context: type) type { const uv = bun.windows.libuv; return struct { - fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + fn onStreamAlloc(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { const this: *Context = @ptrCast(@alignCast(handle.data)); var available = this.ipc.incoming.available(); @@ -475,12 +464,12 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); available = this.ipc.incoming.available(); } - log("uvStreamAllocCallback {d}", .{suggested_size}); + log("onStreamAlloc {d}", .{suggested_size}); buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) }; } - fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { - log("uvStreamReadCallback {d}", .{nread}); + fn onRead(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + log("onRead {d}", .{nread}); const this: *Context = @ptrCast(@alignCast(handle.data)); if (nread <= 0) { switch (nread) { @@ -489,11 +478,11 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { return; }, uv.UV_EOF => { - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.ipc.close(Context); }, else => { - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.ipc.close(Context); }, } @@ -554,22 +543,26 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { const this = bun.cast(*Context, req.data); const client = &this.ipc.pipe; const server = &this.ipc.server; - if (uv.uv_pipe_init(uv.Loop.get(), client, 1) != 0) { + client.init(uv.Loop.get(), true).unwrap() catch { Output.printErrorln("Failed to connect IPC pipe", .{}); return; - } + }; client.data = server.data; - if (uv.uv_accept(@ptrCast(server), @ptrCast(client)) == 0) { - if (this.ipc.connected) { + switch (server.accept(client)) { + .err => { this.ipc.close(Context); return; - } - this.ipc.connected = true; - _ = uv.uv_read_start(@ptrCast(client), uvStreamAllocCallback, uvStreamReadCallback); - this.ipc.processSend(); - } else { - this.ipc.close(Context); + }, + .result => { + this.ipc.connected = true; + client.readStart(onStreamAlloc, onRead).unwrap() catch { + this.ipc.close(Context); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + this.ipc.processSend(); + }, } } pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void { @@ -579,7 +572,11 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { return; } const this = bun.cast(*Context, req.data); - _ = uv.uv_read_start(@ptrCast(&this.ipc.pipe), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.pipe.readStart(onStreamAlloc, onRead).unwrap() catch { + this.ipc.close(Context); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; this.ipc.connected = true; this.ipc.processSend(); } @@ -595,7 +592,7 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { const event = bun.cast(*uv.uv_pipe_t, handler); const this = bun.cast(*Context, event.data); if (this.ipc.server.loop != null) { - _ = uv.uv_close(@ptrCast(&this.ipc.server), onServerClose); + this.ipc.server.close(onServerClose); return; } this.handleIPCClose(); diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index ff28f6bb02f48a..82d3843a54a78c 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -3016,6 +3016,8 @@ pub const VirtualMachine = struct { ipc: IPC.IPCData, + pub usingnamespace bun.New(@This()); + pub fn handleIPCMessage( this: *IPCInstance, message: IPC.DecodedIPCMessage, @@ -3057,16 +3059,17 @@ pub const VirtualMachine = struct { this.event_loop.ensureWaker(); if (Environment.isWindows) { - var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); - instance.* = .{ + var instance = IPCInstance.new(.{ .globalThis = this.global, .context = 0, .ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }, + }); + instance.ipc.configureClient(IPCInstance, instance, source) catch { + instance.destroy(); + Output.printErrorln("Unable to start IPC pipe", .{}); + return; }; - const errno = instance.ipc.configureClient(IPCInstance, instance, source); - if (errno != 0) { - @panic("Unable to start IPC"); - } + this.ipc = instance; return; } diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 85a7dc2c87f3a2..0489803cfb675f 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -2959,17 +2959,18 @@ pub const Blob = struct { var pipe_ptr = &(this.store.?.data.file.pipe); if (store.data.file.pipe.loop == null) { - if (libuv.uv_pipe_init(libuv.Loop.get(), pipe_ptr, 0) != 0) { + pipe_ptr.init(libuv.Loop.get(), false).unwrap() catch { pipe_ptr.loop = null; globalThis.throwInvalidArguments("Failed to create UVStreamSink", .{}); return JSValue.jsUndefined(); - } + }; + const file_fd = bun.uvfdcast(fd); - if (libuv.uv_pipe_open(pipe_ptr, file_fd).errEnum()) |err| { + pipe_ptr.open(file_fd).unwrap() catch |err| { pipe_ptr.loop = null; globalThis.throwInvalidArguments("Failed to create UVStreamSink: uv_pipe_open({d}) {}", .{ file_fd, err }); return JSValue.jsUndefined(); - } + }; } var sink = JSC.WebCore.UVStreamSink.init(globalThis.allocator(), @ptrCast(pipe_ptr), null) catch |err| { diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index bdb3eb3dfaed2b..6b82d6dc1d2a9f 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -1,4 +1,5 @@ const bun = @import("root").bun; +const Maybe = bun.JSC.Maybe; const WORD = c_ushort; const LARGE_INTEGER = i64; @@ -279,7 +280,6 @@ pub const UV_IF_NAMESIZE = @as(c_int, 16) + @as(c_int, 1); pub const uv__queue = struct_uv__queue; pub const uv_req_s = struct_uv_req_s; -pub const uv_handle_s = Handle; pub const uv_prepare_s = struct_uv_prepare_s; pub const uv_check_s = struct_uv_check_s; pub const uv_idle_s = struct_uv_idle_s; @@ -438,6 +438,10 @@ fn HandleMixin(comptime Type: type) type { pub fn isActive(this: *const Type) bool { return uv_is_active(@ptrCast(this)) != 0; } + + pub fn isClosed(this: *const Type) bool { + return uv_is_closed(@ptrCast(this)); + } }; } @@ -458,7 +462,104 @@ fn ReqMixin(comptime Type: type) type { } }; } -pub const uv_handle_t = Handle; + +// https://docs.libuv.org/en/v1.x/stream.html +fn StreamMixin(comptime Type: type) type { + return struct { + + // pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; + // pub extern fn uv_write2(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t, cb: uv_write_cb) ReturnCode; + pub fn getWriteQueueSize(this: *Type) usize { + return uv_stream_get_write_queue_size(@ptrCast(this)); + } + + pub fn listen(this: *Type, backlog: i32, cb: uv_connection_cb) Maybe(void) { + const rc = uv_listen(@ptrCast(this), backlog, cb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn accept(this: *Type, client: *Type) Maybe(void) { + const rc = uv_accept(@ptrCast(this), @ptrCast(client)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .accept, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub const stream_read_cb = ?*const fn (*uv_stream_t, isize, *const uv_buf_t) callconv(.C) void; + pub const stream_alloc_cb = ?*const fn (*uv_stream_t, usize, *uv_buf_t) callconv(.C) void; + + pub fn readStart(this: *Type, alloc_cb: stream_alloc_cb, read_cb: stream_read_cb) Maybe(void) { + const rc = uv_read_start(@ptrCast(this), @ptrCast(alloc_cb), @ptrCast(read_cb)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn readStop(this: *Type) void { + // always succeed see https://docs.libuv.org/en/v1.x/stream.html#c.uv_read_stop + _ = uv_read_stop(@ptrCast(this)); + } + + pub fn tryWrite(this: *Type, buffer: *uv_buf_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffer), 1); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWrite2(this: *Type, buffer: *uv_buf_t, send_handle: *uv_stream_t) ReturnCode { + const rc = uv_try_write2(@ptrCast(this), @ptrCast(buffer), 1, send_handle); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWriteMany(this: *Type, buffers: []uv_buf_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWriteMany2(this: *Type, buffers: []uv_buf_t, send_handle: *uv_stream_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len), send_handle); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn isReadable(this: *Type) bool { + return uv_is_readable(@ptrCast(this)) != 0; + } + + pub fn isWritable(this: *@This()) bool { + return uv_is_writable(@ptrCast(this)) != 0; + } + }; +} + +pub const uv_handle_s = extern struct { + data: ?*anyopaque, + loop: ?*uv_loop_t, + type: uv_handle_type, + close_cb: uv_close_cb, + handle_queue: struct_uv__queue, + u: union_unnamed_378, + endgame_next: [*c]uv_handle_t, + flags: c_uint, + + pub usingnamespace HandleMixin(@This()); +}; +pub const uv_handle_t = uv_handle_s; const union_unnamed_375 = extern union { fd: c_int, reserved: [4]?*anyopaque, @@ -895,7 +996,7 @@ const union_unnamed_380 = extern union { }; pub const uv_alloc_cb = ?*const fn (*uv_handle_t, usize, *uv_buf_t) callconv(.C) void; pub const uv_stream_t = struct_uv_stream_s; -/// *uv.uv_handle_t is actually *uv_stream_t, just changed to avoid dependency loop error on Zig +/// *uv_handle_t is actually *uv_stream_t, just changed to avoid dependency loop error on Zig pub const uv_read_cb = ?*const fn (*uv_handle_t, isize, *const uv_buf_t) callconv(.C) void; const struct_unnamed_382 = extern struct { overlapped: OVERLAPPED, @@ -974,6 +1075,9 @@ pub const struct_uv_stream_s = extern struct { activecnt: c_int, read_req: uv_read_t, stream: union_unnamed_384, + + pub usingnamespace HandleMixin(@This()); + pub usingnamespace StreamMixin(@This()); }; const union_unnamed_390 = extern union { fd: c_int, @@ -1216,6 +1320,54 @@ pub const struct_uv_pipe_s = extern struct { handle: HANDLE, name: [*]WCHAR, pipe: union_unnamed_405, + + pub usingnamespace HandleMixin(@This()); + pub usingnamespace StreamMixin(@This()); + + pub fn init(this: *@This(), loop: *Loop, isIPC: bool) Maybe(void) { + @memset(std.mem.asBytes(this), 0); + + const rc = uv_pipe_init(loop, this, if (isIPC) 1 else 0); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .pipe, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn listenNamedPipe(this: *@This(), named_pipe: []const u8, backlog: i32, onClientConnect: uv_connection_cb) Maybe(void) { + if (this.bind(named_pipe, 0).asErr()) |err| { + return .{ .err = err }; + } + return this.listen(backlog, onClientConnect); + } + + pub fn bind(this: *@This(), named_pipe: []const u8, flags: i32) Maybe(void) { + const rc = uv_pipe_bind2(this, named_pipe.ptr, named_pipe.len, @intCast(flags)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .bind2, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn connect(this: *@This(), req: *uv_connect_t, name: []const u8, cb: uv_connect_cb) Maybe(void) { + const rc = uv_pipe_connect2(req, this, @ptrCast(name.ptr), name.len, 0, cb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .connect2, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn open(this: *@This(), file: uv_file) Maybe(void) { + const rc = uv_pipe_open(this, file); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .open, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn setPendingInstances(this: *@This(), count: i32) void { + uv_pipe_pending_instances(this, count); + } }; pub const uv_pipe_t = struct_uv_pipe_s; const union_unnamed_416 = extern union { @@ -1907,14 +2059,14 @@ pub extern fn uv_buf_init(base: [*]u8, len: c_uint) uv_buf_t; pub extern fn uv_pipe(fds: *[2]uv_file, read_flags: c_int, write_flags: c_int) ReturnCode; pub extern fn uv_socketpair(@"type": c_int, protocol: c_int, socket_vector: [*c]uv_os_sock_t, flags0: c_int, flags1: c_int) c_int; pub extern fn uv_stream_get_write_queue_size(stream: [*c]const uv_stream_t) usize; -pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) c_int; -pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) c_int; -pub extern fn uv_read_start([*c]uv_stream_t, alloc_cb: uv_alloc_cb, read_cb: uv_read_cb) c_int; +pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) ReturnCode; +pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) ReturnCode; +pub extern fn uv_read_start([*c]uv_stream_t, alloc_cb: uv_alloc_cb, read_cb: uv_read_cb) ReturnCode; pub extern fn uv_read_stop([*c]uv_stream_t) c_int; pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; pub extern fn uv_write2(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t, cb: uv_write_cb) ReturnCode; pub extern fn uv_try_write(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint) ReturnCode; -pub extern fn uv_try_write2(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t) c_int; +pub extern fn uv_try_write2(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t) ReturnCode; pub extern fn uv_is_readable(handle: *const uv_stream_t) c_int; pub extern fn uv_is_writable(handle: *const uv_stream_t) c_int; pub extern fn uv_stream_set_blocking(handle: *uv_stream_t, blocking: c_int) c_int; @@ -1977,12 +2129,12 @@ pub extern fn uv_tty_get_vterm_state(state: [*c]uv_tty_vtermstate_t) c_int; pub extern fn uv_guess_handle(file: uv_file) uv_handle_type; pub const UV_PIPE_NO_TRUNCATE: c_int = 1; const enum_unnamed_462 = c_uint; -pub extern fn uv_pipe_init(*uv_loop_t, handle: *uv_pipe_t, ipc: c_int) c_int; +pub extern fn uv_pipe_init(*uv_loop_t, handle: *uv_pipe_t, ipc: c_int) ReturnCode; pub extern fn uv_pipe_open([*c]uv_pipe_t, file: uv_file) ReturnCode; pub extern fn uv_pipe_bind(handle: *uv_pipe_t, name: [*]const u8) c_int; -pub extern fn uv_pipe_bind2(handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint) c_int; +pub extern fn uv_pipe_bind2(handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint) ReturnCode; pub extern fn uv_pipe_connect(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, cb: uv_connect_cb) void; -pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) c_int; +pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) ReturnCode; pub extern fn uv_pipe_getsockname(handle: *const uv_pipe_t, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_getpeername(handle: *const uv_pipe_t, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_pending_instances(handle: *uv_pipe_t, count: c_int) void; diff --git a/src/sys.zig b/src/sys.zig index f9783754521fff..8181c8955e81b6 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -126,6 +126,14 @@ pub const Tag = enum(u8) { uv_spawn, uv_pipe, pipe, + connect, + connect2, + accept, + bind, + bind2, + listen, + try_write, + try_write2, WriteFile, NtQueryDirectoryFile, From 11aaff9087c59d57285fb529f104453e5dc35597 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 14:18:36 -0300 Subject: [PATCH 06/12] more wrappers --- src/bun.js/api/bun/subprocess.zig | 108 ++++++++----------- src/bun.js/ipc.zig | 168 ++++++++++++++++++------------ src/deps/libuv.zig | 144 +++++++++++++++---------- 3 files changed, 231 insertions(+), 189 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index b2cfc0684e3ee3..2ec3f0e68ec097 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -847,7 +847,6 @@ pub const Subprocess = struct { pub const BufferedPipeInput = struct { remain: []const u8 = "", - input_buffer: uv.uv_buf_t = std.mem.zeroes(uv.uv_buf_t), write_req: uv.uv_write_t = std.mem.zeroes(uv.uv_write_t), pipe: ?*uv.uv_pipe_t, poll_ref: ?*Async.FilePoll = null, @@ -864,8 +863,7 @@ pub const Subprocess = struct { this.writeAllowBlocking(is_sync); } - pub fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { - const this = bun.cast(*BufferedPipeInput, req.data); + pub fn onWrite(this: *BufferedPipeInput, status: uv.ReturnCode) void { if (this.pipe == null) return; if (status.errEnum()) |_| { log("uv_write({d}) fail: {d}", .{ this.remain.len, status.int() }); @@ -884,7 +882,6 @@ pub const Subprocess = struct { var to_write = this.remain; - this.input_buffer = uv.uv_buf_t.init(to_write); if (allow_blocking) { while (true) { if (to_write.len == 0) { @@ -892,35 +889,33 @@ pub const Subprocess = struct { this.close(); return; } - const status = uv.uv_try_write(@ptrCast(pipe), @ptrCast(&this.input_buffer), 1); - if (status.errEnum()) |err| { - if (err == bun.C.E.AGAIN) { - //EAGAIN - this.write_req.data = this; - const write_err = uv.uv_write(&this.write_req, @ptrCast(pipe), @ptrCast(&this.input_buffer), 1, BufferedPipeInput.uvWriteCallback).int(); - if (write_err < 0) { - log("uv_write({d}) fail: {d}", .{ this.remain.len, write_err }); - this.deinit(); + switch (pipe.tryWrite(to_write)) { + .err => |err| { + const errno = err.getErrno(); + if (errno == bun.C.E.AGAIN) { + //EAGAIN + this.write_req.write(@ptrCast(pipe), to_write, this, BufferedPipeInput.onWrite).unwrap() catch |write_err| { + log("uv_write({d}) fail: {}", .{ this.remain.len, write_err }); + this.deinit(); + }; + return; } + log("uv_try_write({d}) fail: {}", .{ to_write.len, errno }); + this.deinit(); return; - } - // fail - log("uv_try_write({d}) fail: {d}", .{ to_write.len, status.int() }); - this.deinit(); - return; + }, + .result => |bytes_written| { + this.written += bytes_written; + this.remain = this.remain[@min(bytes_written, this.remain.len)..]; + to_write = to_write[bytes_written..]; + }, } - const bytes_written: usize = @intCast(status.int()); - this.written += bytes_written; - this.remain = this.remain[@min(bytes_written, this.remain.len)..]; - to_write = to_write[bytes_written..]; } } else { - this.write_req.data = this; - const err = uv.uv_write(&this.write_req, @ptrCast(pipe), @ptrCast(&this.input_buffer), 1, BufferedPipeInput.uvWriteCallback).int(); - if (err < 0) { - log("uv_write({d}) fail: {d}", .{ this.remain.len, err }); + this.write_req.write(@ptrCast(pipe), to_write, this, BufferedPipeInput.onWrite).unwrap() catch |err| { + log("uv_write({d}) fail: {}", .{ this.remain.len, err }); this.deinit(); - } + }; } } @@ -942,9 +937,7 @@ pub const Subprocess = struct { } } - fn uvClosedCallback(handler: *anyopaque) callconv(.C) void { - const event = bun.cast(*uv.uv_pipe_t, handler); - var this = bun.cast(*BufferedPipeInput, event.data); + fn uvClosedCallback(this: *BufferedPipeInput) void { if (this.deinit_onclose) { this.destroy(); } @@ -957,8 +950,7 @@ pub const Subprocess = struct { } if (this.pipe) |pipe| { - pipe.data = this; - pipe.close(BufferedPipeInput.uvClosedCallback); + pipe.close(this, BufferedPipeInput.uvClosedCallback); } } @@ -1242,40 +1234,22 @@ pub const Subprocess = struct { } } - fn uvStreamReadCallback(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { - const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); - if (nread <= 0) { - switch (nread) { - 0 => { - // EAGAIN or EWOULDBLOCK - return; - }, - uv.UV_EOF => { - this.status = .{ .done = {} }; - handle.readStop(); - this.flushBufferedDataIntoReadableStream(); - }, - else => { - const rt = uv.ReturnCodeI64{ - .value = @intCast(nread), - }; - const err = rt.errEnum() orelse bun.C.E.CANCELED; - this.status = .{ .err = bun.sys.Error.fromCode(err, .read) }; - handle.readStop(); - this.signalStreamError(); - }, - } - - // when nread < 0 buffer maybe not point to a valid address - return; + fn onReadError(this: *BufferedOutput, err: bun.C.E) void { + if (err == bun.C.E.OF) { + this.status = .{ .done = {} }; + this.flushBufferedDataIntoReadableStream(); + } else { + this.status = .{ .err = bun.sys.Error.fromCode(err, .read) }; + this.signalStreamError(); } + } + fn onStreamRead(this: *BufferedOutput, buffer: []const u8) void { this.internal_buffer.len += @as(u32, @truncate(buffer.len)); this.flushBufferedDataIntoReadableStream(); } - fn uvStreamAllocCallback(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { - const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); + fn onReadAlloc(this: *BufferedOutput, suggested_size: usize) []u8 { var size: usize = 0; var available = this.internal_buffer.available(); if (this.auto_sizer) |auto_sizer| { @@ -1294,18 +1268,20 @@ pub const Subprocess = struct { size = suggested_size; } } - buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(size) }; + if (size == 0) { - handle.readStop(); + this.stream.readStop(); this.status = .{ .done = {} }; + return ""; } + return available.ptr[0..@intCast(size)]; } pub fn readAll(this: *BufferedOutput) void { if (Environment.isWindows) { if (this.status == .pending) { this.stream.data = this; - _ = this.stream.readStart(BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); + _ = this.stream.readStart(this, onReadAlloc, onReadError, onStreamRead); } return; } @@ -1543,9 +1519,7 @@ pub const Subprocess = struct { } } - fn uvClosedCallback(handler: *anyopaque) callconv(.C) void { - const event = bun.cast(*uv.uv_pipe_t, handler); - var this = bun.cast(*BufferedOutput, event.data); + fn uvClosedCallback(this: *BufferedOutput) void { this.readable_stream_ref.deinit(); this.closeCallback.run(); } @@ -1562,7 +1536,7 @@ pub const Subprocess = struct { this.readable_stream_ref.deinit(); this.closeCallback.run(); } else { - this.stream.close(BufferedOutput.uvClosedCallback); + this.stream.close(this, BufferedOutput.uvClosedCallback); } } else { this.stream.close(); diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 21c302a26fbc2a..7228004893ae32 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -39,6 +39,24 @@ pub const IPCMessageType = enum(u8) { pub const IPCBuffer = struct { list: bun.ByteList = .{}, cursor: u32 = 0, + + pub fn reset(this: *IPCBuffer) void { + this.cursor = 0; + this.list.len = 0; + } + + pub fn size(this: *const IPCBuffer) usize { + return this.list.len - this.cursor; + } + + pub fn slice(this: *IPCBuffer) []const u8 { + return this.list.slice()[this.cursor..]; + } + + pub fn deinit(this: *IPCBuffer) void { + this.reset(); + this.list.deinitWithAllocator(bun.default_allocator); + } }; /// Given potentially unfinished buffer `data`, attempt to decode and process a message from it. @@ -163,7 +181,8 @@ const NamedPipeIPCData = struct { pipe: uv.uv_pipe_t, incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well outgoing: IPCBuffer = .{}, - current_payload_len: usize = 0, + current_payload: IPCBuffer = .{}, + write_req: uv.uv_write_t = std.mem.zeroes(uv.uv_write_t), connected: bool = false, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, @@ -171,38 +190,65 @@ const NamedPipeIPCData = struct { server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t), pub fn processSend(this: *NamedPipeIPCData) void { - const bytes = this.outgoing.list.slice()[this.outgoing.cursor..]; + if (this.current_payload.size() > 0) { + // we have some pending async request, the next outgoing data will be processed after this finish + return; + } + + var bytes = this.outgoing.slice(); log("processSend {d}", .{bytes.len}); if (bytes.len == 0) return; - const req = bun.new(uv.uv_write_t, std.mem.zeroes(uv.uv_write_t)); - req.data = @ptrCast(this); - req.write_buffer = uv.uv_buf_t.init(bytes); - log("processSend write_buffer {d}", .{req.write_buffer.len}); - this.current_payload_len = bytes.len; - const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.onWriteCallback).int(); - if (write_err < 0) { - Output.printErrorln("Failed write IPC version", .{}); - return; + while (true) { + switch (this.pipe.tryWrite(bytes)) { + .err => |err| { + if (err.getErrno() != bun.C.E.AGAIN) { + Output.printErrorln("Failed to write outgoing data", .{}); + return; + } + + // ok we hit EGAIN and need to go async + + if (this.current_payload.size() > 0) { + // just wait the current request finish to send the next outgoing data + return; + } + + // current payload is empty we can just swap with outgoing + const temp = this.current_payload; + this.current_payload = this.outgoing; + this.outgoing = temp; + + // enqueue the write + this.write_req.write(@ptrCast(&this.pipe), bytes, this, onWriteCallback).unwrap() catch { + Output.printErrorln("Failed to write outgoing data", .{}); + return; + }; + }, + .result => |written| { + bytes = bytes[0..written]; + + if (bytes.len == 0) { + this.outgoing.reset(); + return; + } + this.outgoing.cursor += @intCast(written); + }, + } } } - fn onWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { - const this = bun.cast(*NamedPipeIPCData, req.data); - log("onWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); - defer bun.destroy(req); + fn onWriteCallback(this: *NamedPipeIPCData, status: uv.ReturnCode) void { + log("onWriteCallback {d} {d}", .{ status.int(), this.current_payload.size() }); if (status.errEnum()) |_| { - Output.printErrorln("Failed write IPC data", .{}); + Output.printErrorln("Failed to write outgoing data", .{}); return; } - const n = this.current_payload_len; - if (n == this.outgoing.list.len) { - this.outgoing.cursor = 0; - this.outgoing.list.len = 0; - } else { - this.outgoing.cursor += @intCast(n); - this.processSend(); - } + // success means that we send all the data + this.current_payload.reset(); + + // process pending outgoing data + this.processSend(); } pub fn writeVersionPacket(this: *NamedPipeIPCData) void { @@ -260,9 +306,11 @@ const NamedPipeIPCData = struct { pub fn close(this: *NamedPipeIPCData, comptime Context: type) void { if (this.server.loop != null) { - this.server.close(NewNamedPipeIPCHandler(Context).onServerClose); + const context = @as(*Context, @ptrCast(@alignCast(this.server.data))); + this.server.close(context, NewNamedPipeIPCHandler(Context).onServerClose); } else { - this.pipe.close(NewNamedPipeIPCHandler(Context).onClose); + const context = @as(*Context, @ptrCast(@alignCast(this.pipe.data))); + this.pipe.close(context, NewNamedPipeIPCHandler(Context).onClose); } } @@ -296,6 +344,12 @@ const NamedPipeIPCData = struct { this.writeVersionPacket(); } + + pub fn deinit(this: *NamedPipeIPCData) void { + this.outgoing.deinit(); + this.current_payload.deinit(); + this.incoming.deinitWithAllocator(bun.default_allocator); + } }; pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else SocketIPCData; @@ -413,16 +467,15 @@ fn NewSocketIPCHandler(comptime Context: type) type { context: *Context, socket: Socket, ) void { - const to_write = context.ipc.outgoing.list.ptr[context.ipc.outgoing.cursor..context.ipc.outgoing.list.len]; + const to_write = context.ipc.outgoing.slice(); if (to_write.len == 0) { - context.ipc.outgoing.cursor = 0; - context.ipc.outgoing.list.len = 0; + context.ipc.outgoing.reset(); return; } + const n = socket.write(to_write, false); if (n == to_write.len) { - context.ipc.outgoing.cursor = 0; - context.ipc.outgoing.list.len = 0; + context.ipc.outgoing.reset(); } else if (n > 0) { context.ipc.outgoing.cursor += @intCast(n); } @@ -456,41 +509,23 @@ fn NewSocketIPCHandler(comptime Context: type) type { fn NewNamedPipeIPCHandler(comptime Context: type) type { const uv = bun.windows.libuv; return struct { - fn onStreamAlloc(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { - const this: *Context = @ptrCast(@alignCast(handle.data)); - + fn onReadAlloc(this: *Context, suggested_size: usize) []u8 { var available = this.ipc.incoming.available(); if (available.len < suggested_size) { this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); available = this.ipc.incoming.available(); } - log("onStreamAlloc {d}", .{suggested_size}); - buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) }; + log("onReadAlloc {d}", .{suggested_size}); + return available.ptr[0..suggested_size]; } - fn onRead(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { - log("onRead {d}", .{nread}); - const this: *Context = @ptrCast(@alignCast(handle.data)); - if (nread <= 0) { - switch (nread) { - 0 => { - // EAGAIN or EWOULDBLOCK - return; - }, - uv.UV_EOF => { - handle.readStop(); - this.ipc.close(Context); - }, - else => { - handle.readStop(); - this.ipc.close(Context); - }, - } - - // when nread < 0 buffer maybe not point to a valid address - return; - } + fn onReadError(this: *Context, err: bun.C.E) void { + log("onReadError {}", .{err}); + this.ipc.close(Context); + } + fn onRead(this: *Context, buffer: []const u8) void { + log("onRead {d}", .{buffer.len}); this.ipc.incoming.len += @as(u32, @truncate(buffer.len)); var slice = this.ipc.incoming.slice(); const globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) { @@ -556,7 +591,7 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { }, .result => { this.ipc.connected = true; - client.readStart(onStreamAlloc, onRead).unwrap() catch { + client.readStart(this, onReadAlloc, onReadError, onRead).unwrap() catch { this.ipc.close(Context); Output.printErrorln("Failed to connect IPC pipe", .{}); return; @@ -572,7 +607,7 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { return; } const this = bun.cast(*Context, req.data); - this.ipc.pipe.readStart(onStreamAlloc, onRead).unwrap() catch { + this.ipc.pipe.readStart(this, onReadAlloc, onReadError, onRead).unwrap() catch { this.ipc.close(Context); Output.printErrorln("Failed to connect IPC pipe", .{}); return; @@ -580,22 +615,21 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { this.ipc.connected = true; this.ipc.processSend(); } - pub fn onServerClose(handler: *anyopaque) callconv(.C) void { + + pub fn onServerClose(this: *Context) void { log("onServerClose", .{}); - const event = bun.cast(*uv.uv_pipe_t, handler); - const this = bun.cast(*Context, event.data); this.handleIPCClose(); + this.ipc.deinit(); } - pub fn onClose(handler: *anyopaque) callconv(.C) void { + pub fn onClose(this: *Context) void { log("onClose", .{}); - const event = bun.cast(*uv.uv_pipe_t, handler); - const this = bun.cast(*Context, event.data); if (this.ipc.server.loop != null) { - this.ipc.server.close(onServerClose); + this.ipc.server.close(this, onServerClose); return; } this.handleIPCClose(); + this.ipc.deinit(); } }; } diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 6b82d6dc1d2a9f..883ecddbd2626d 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -415,8 +415,21 @@ fn HandleMixin(comptime Type: type) type { pub fn setData(handle: *Type, ptr: ?*anyopaque) void { uv_handle_set_data(@ptrCast(handle), ptr); } - pub fn close(this: *Type, cb: uv_close_cb) void { - uv_close(@ptrCast(this), @ptrCast(cb)); + + pub fn close(this: *Type, context: anytype, comptime onClose: ?*const (fn (@TypeOf(context)) void)) void { + if (comptime onClose) |callback| { + this.data = @ptrCast(context); + const Wrapper = struct { + pub fn uvCloseCb(handler: *anyopaque) callconv(.C) void { + const handle = bun.cast(*uv_handle_t, handler); + callback(@ptrCast(@alignCast(handle.data))); + } + }; + uv_close(@ptrCast(this), @ptrCast(&Wrapper.uvCloseCb)); + return; + } + + uv_close(@ptrCast(this), null); } pub fn hasRef(this: *const Type) bool { @@ -489,11 +502,27 @@ fn StreamMixin(comptime Type: type) type { return .{ .result = {} }; } - pub const stream_read_cb = ?*const fn (*uv_stream_t, isize, *const uv_buf_t) callconv(.C) void; - pub const stream_alloc_cb = ?*const fn (*uv_stream_t, usize, *uv_buf_t) callconv(.C) void; - - pub fn readStart(this: *Type, alloc_cb: stream_alloc_cb, read_cb: stream_read_cb) Maybe(void) { - const rc = uv_read_start(@ptrCast(this), @ptrCast(alloc_cb), @ptrCast(read_cb)); + pub fn readStart(this: *Type, context: anytype, comptime alloc_cb: *const (fn (@TypeOf(context), suggested_size: usize) []u8), comptime error_cb: *const (fn (@TypeOf(context), err: bun.C.E) void), comptime read_cb: *const (fn (@TypeOf(context), data: []const u8) void)) Maybe(void) { + const Context = @TypeOf(context); + this.data = @ptrCast(context); + const Wrapper = struct { + pub fn uvAllocb(req: *uv_stream_t, suggested_size: usize, buffer: *uv_buf_t) callconv(.C) void { + const context_data: Context = @ptrCast(@alignCast(req.data)); + buffer.* = uv_buf_t.init(alloc_cb(context_data, suggested_size)); + } + pub fn uvReadcb(req: *uv_stream_t, nreads: isize, buffer: *uv_buf_t) callconv(.C) void { + const context_data: Context = @ptrCast(@alignCast(req.data)); + if (nreads == 0) return; // EAGAIN or EWOULDBLOCK + if (nreads < 0) { + req.readStop(); + const rc = ReturnCodeI64{ .value = nreads }; + error_cb(context_data, rc.errEnum() orelse bun.C.E.CANCELED); + } else { + read_cb(context_data, buffer.slice()); + } + } + }; + const rc = uv_read_start(@ptrCast(this), @ptrCast(&Wrapper.uvAllocb), @ptrCast(&Wrapper.uvReadcb)); if (rc.errno()) |errno| { return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; } @@ -505,32 +534,46 @@ fn StreamMixin(comptime Type: type) type { _ = uv_read_stop(@ptrCast(this)); } - pub fn tryWrite(this: *Type, buffer: *uv_buf_t) Maybe(usize) { - const rc = uv_try_write(@ptrCast(this), @ptrCast(buffer), 1); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; + pub fn write(this: *Type, input: []const u8, context: anytype, comptime onWrite: ?*const (fn (@TypeOf(context), status: ReturnCode) void)) Maybe(void) { + if (comptime onWrite) |callback| { + const Context = @TypeOf(context); + + const Wrapper = struct { + pub fn uvWriteCb(req: *uv_write_t, status: ReturnCode) callconv(.C) void { + const context_data: Context = @ptrCast(@alignCast(req.data)); + bun.destroy(req); + callback(context_data, status); + } + }; + var uv_data = bun.new(uv_write_t, std.mem.zeroes(uv_write_t)); + uv_data.data = context; + uv_data.write_buffer = uv_buf_t.init(input); + + const rc = uv_write(uv_data, @ptrCast(this), @ptrCast(&uv_data.write_buffer), 1, &Wrapper.uvWriteCb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .write, .from_libuv = true } }; + } + return .{ .result = {} }; } - return .{ .result = @intCast(rc.int()) }; - } - pub fn tryWrite2(this: *Type, buffer: *uv_buf_t, send_handle: *uv_stream_t) ReturnCode { - const rc = uv_try_write2(@ptrCast(this), @ptrCast(buffer), 1, send_handle); + var req: uv_write_t = std.mem.zeroes(uv_write_t); + const rc = uv_write(&req, this, @ptrCast(&uv_buf_t.init(input)), 1, null); if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; + return .{ .err = .{ .errno = errno, .syscall = .write, .from_libuv = true } }; } - return .{ .result = @intCast(rc.int()) }; + return .{ .result = {} }; } - pub fn tryWriteMany(this: *Type, buffers: []uv_buf_t) Maybe(usize) { - const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len)); + pub fn tryWrite(this: *Type, input: []const u8) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(&uv_buf_t.init(input)), 1); if (rc.errno()) |errno| { return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; } return .{ .result = @intCast(rc.int()) }; } - pub fn tryWriteMany2(this: *Type, buffers: []uv_buf_t, send_handle: *uv_stream_t) Maybe(usize) { - const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len), send_handle); + pub fn tryWrite2(this: *Type, input: []const u8, send_handle: *uv_stream_t) ReturnCode { + const rc = uv_try_write2(@ptrCast(this), @ptrCast(&uv_buf_t.init(input)), 1, send_handle); if (rc.errno()) |errno| { return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; } @@ -1279,6 +1322,31 @@ pub const struct_uv_write_s = extern struct { write_buffer: uv_buf_t, event_handle: HANDLE, wait_handle: HANDLE, + + pub fn write(req: *@This(), stream: *uv_stream_t, input: []const u8, context: anytype, comptime onWrite: ?*const (fn (@TypeOf(context), status: ReturnCode) void)) Maybe(void) { + if (comptime onWrite) |callback| { + const Wrapper = struct { + pub fn uvWriteCb(handler: *uv_write_t, status: ReturnCode) callconv(.C) void { + callback(@ptrCast(@alignCast(handler.data)), status); + } + }; + + req.data = context; + req.write_buffer = uv_buf_t.init(input); + + const rc = uv_write(req, stream, @ptrCast(&req.write_buffer), 1, &Wrapper.uvWriteCb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .write, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + const rc = uv_write(req, stream, @ptrCast(&uv_buf_t.init(input)), 1, null); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .write, .from_libuv = true } }; + } + return .{ .result = {} }; + } }; pub const uv_write_t = struct_uv_write_s; const union_unnamed_415 = extern union { @@ -2703,37 +2771,3 @@ pub const ReturnCodeI64 = extern struct { }; pub const addrinfo = std.os.windows.ws2_32.addrinfo; - -fn WriterMixin(comptime Type: type) type { - return struct { - pub fn write(mixin: *Type, input: []const u8, context: anytype, comptime onWrite: ?*const (fn (*@TypeOf(context), status: ReturnCode) void)) ReturnCode { - if (comptime onWrite) |callback| { - const Context = @TypeOf(context); - var data = bun.new(uv_write_t); - - data.data = context; - const Wrapper = struct { - uv_data: uv_write_t, - context: Context, - buf: uv_buf_t, - - pub fn uvWriteCb(req: *uv_write_t, status: ReturnCode) callconv(.C) void { - const this: *@This() = @fieldParentPtr(@This(), "uv_data", req); - const context_data = this.context; - bun.destroy(this); - callback(context_data, @enumFromInt(status)); - } - }; - var wrap = bun.new(Wrapper, Wrapper{ - .wrapper = undefined, - .context = context, - .buf = uv_buf_t.init(input), - }); - - return uv_write(&wrap.uv_data, @ptrCast(mixin), @ptrCast(&wrap.buf), 1, &Wrapper.uvWriteCb); - } - - return uv_write(null, mixin, @ptrCast(&uv_buf_t.init(input)), 1, null); - } - }; -} From c09d4b85110c2a2f983f4a42128bc7b36f135c3e Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 14:54:43 -0300 Subject: [PATCH 07/12] more wrappers --- src/bun.js/ipc.zig | 19 +++++++++---------- src/deps/libuv.zig | 32 +++++++++++++++++++++----------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 7228004893ae32..b789fb24b5d38d 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -322,7 +322,7 @@ const NamedPipeIPCData = struct { return .{ .err = err }; } ipc_pipe.data = @ptrCast(instance); - if (ipc_pipe.listenNamedPipe(named_pipe, 0, NewNamedPipeIPCHandler(Context).onNewClientConnect).asErr()) |err| { + if (ipc_pipe.listenNamedPipe(named_pipe, 0, instance, NewNamedPipeIPCHandler(Context).onNewClientConnect).asErr()) |err| { return .{ .err = err }; } @@ -340,7 +340,7 @@ const NamedPipeIPCData = struct { try ipc_pipe.init(uv.Loop.get(), true).unwrap(); ipc_pipe.data = @ptrCast(instance); this.connect_req.data = @ptrCast(instance); - try ipc_pipe.connect(&this.connect_req, named_pipe, NewNamedPipeIPCHandler(Context).onConnect).unwrap(); + try ipc_pipe.connect(&this.connect_req, named_pipe, instance, NewNamedPipeIPCHandler(Context).onConnect).unwrap(); this.writeVersionPacket(); } @@ -569,13 +569,12 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { } } - pub fn onNewClientConnect(req: *uv.uv_stream_t, status: c_int) callconv(.C) void { - log("onNewClientConnect {d}", .{status}); - if (status < 0) { + pub fn onNewClientConnect(this: *Context, status: uv.ReturnCode) void { + log("onNewClientConnect {d}", .{status.int()}); + if (status.errEnum()) |_| { Output.printErrorln("Failed to connect IPC pipe", .{}); return; } - const this = bun.cast(*Context, req.data); const client = &this.ipc.pipe; const server = &this.ipc.server; client.init(uv.Loop.get(), true).unwrap() catch { @@ -600,13 +599,13 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { }, } } - pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void { - log("onConnect {d}", .{status}); - if (status < 0) { + + pub fn onConnect(this: *Context, status: uv.ReturnCode) void { + log("onConnect {d}", .{status.int()}); + if (status.errEnum()) |_| { Output.printErrorln("Failed to connect IPC pipe", .{}); return; } - const this = bun.cast(*Context, req.data); this.ipc.pipe.readStart(this, onReadAlloc, onReadError, onRead).unwrap() catch { this.ipc.close(Context); Output.printErrorln("Failed to connect IPC pipe", .{}); diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 883ecddbd2626d..d0cf16690eb050 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -479,15 +479,18 @@ fn ReqMixin(comptime Type: type) type { // https://docs.libuv.org/en/v1.x/stream.html fn StreamMixin(comptime Type: type) type { return struct { - - // pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; - // pub extern fn uv_write2(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t, cb: uv_write_cb) ReturnCode; pub fn getWriteQueueSize(this: *Type) usize { return uv_stream_get_write_queue_size(@ptrCast(this)); } - pub fn listen(this: *Type, backlog: i32, cb: uv_connection_cb) Maybe(void) { - const rc = uv_listen(@ptrCast(this), backlog, cb); + pub fn listen(this: *Type, backlog: i32, context: anytype, comptime onConnect: *const (fn (@TypeOf(context), ReturnCode) void)) Maybe(void) { + this.data = @ptrCast(context); + const Wrapper = struct { + pub fn uvConnectCb(handle: *uv_stream_t, status: ReturnCode) callconv(.C) void { + onConnect(@ptrCast(@alignCast(handle.data)), status); + } + }; + const rc = uv_listen(@ptrCast(this), backlog, &Wrapper.uvConnectCb); if (rc.errno()) |errno| { return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; } @@ -1094,7 +1097,7 @@ const struct_unnamed_385 = extern struct { write_reqs_pending: c_uint, shutdown_req: [*c]uv_shutdown_t, }; -pub const uv_connection_cb = ?*const fn (*uv_stream_t, c_int) callconv(.C) void; +pub const uv_connection_cb = ?*const fn (*uv_stream_t, ReturnCode) callconv(.C) void; const struct_unnamed_389 = extern struct { connection_cb: uv_connection_cb, }; @@ -1402,11 +1405,11 @@ pub const struct_uv_pipe_s = extern struct { return .{ .result = {} }; } - pub fn listenNamedPipe(this: *@This(), named_pipe: []const u8, backlog: i32, onClientConnect: uv_connection_cb) Maybe(void) { + pub fn listenNamedPipe(this: *@This(), named_pipe: []const u8, backlog: i32, context: anytype, comptime onClientConnect: *const (fn (@TypeOf(context), ReturnCode) void)) Maybe(void) { if (this.bind(named_pipe, 0).asErr()) |err| { return .{ .err = err }; } - return this.listen(backlog, onClientConnect); + return this.listen(backlog, context, onClientConnect); } pub fn bind(this: *@This(), named_pipe: []const u8, flags: i32) Maybe(void) { @@ -1417,8 +1420,15 @@ pub const struct_uv_pipe_s = extern struct { return .{ .result = {} }; } - pub fn connect(this: *@This(), req: *uv_connect_t, name: []const u8, cb: uv_connect_cb) Maybe(void) { - const rc = uv_pipe_connect2(req, this, @ptrCast(name.ptr), name.len, 0, cb); + pub fn connect(this: *@This(), req: *uv_connect_t, name: []const u8, context: anytype, comptime onConnect: *const (fn (@TypeOf(context), ReturnCode) void)) Maybe(void) { + this.data = @ptrCast(context); + const Wrapper = struct { + pub fn uvConnectCb(handle: *uv_connect_t, status: ReturnCode) callconv(.C) void { + onConnect(@ptrCast(@alignCast(handle.data)), status); + } + }; + + const rc = uv_pipe_connect2(req, this, @ptrCast(name.ptr), name.len, 0, &Wrapper.uvConnectCb); if (rc.errno()) |errno| { return .{ .err = .{ .errno = errno, .syscall = .connect2, .from_libuv = true } }; } @@ -1740,7 +1750,7 @@ const union_unnamed_441 = extern union { connect: struct_unnamed_443, }; pub const uv_connect_t = struct_uv_connect_s; -pub const uv_connect_cb = ?*const fn (*uv_connect_t, c_int) callconv(.C) void; +pub const uv_connect_cb = ?*const fn (*uv_connect_t, ReturnCode) callconv(.C) void; pub const struct_uv_connect_s = extern struct { data: ?*anyopaque, type: uv_req_type, From 378469ecb6d8f69d937ccf5d30d7ad0cc3075473 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 16:09:41 -0300 Subject: [PATCH 08/12] more clenaup + match some suggested names --- src/bun.js/api/bun/subprocess.zig | 14 +++++++------- src/bun.js/ipc.zig | 8 ++++---- src/deps/libuv.zig | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 2ec3f0e68ec097..9774646d411df6 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -863,7 +863,7 @@ pub const Subprocess = struct { this.writeAllowBlocking(is_sync); } - pub fn onWrite(this: *BufferedPipeInput, status: uv.ReturnCode) void { + pub fn onWriteComplete(this: *BufferedPipeInput, status: uv.ReturnCode) void { if (this.pipe == null) return; if (status.errEnum()) |_| { log("uv_write({d}) fail: {d}", .{ this.remain.len, status.int() }); @@ -894,7 +894,7 @@ pub const Subprocess = struct { const errno = err.getErrno(); if (errno == bun.C.E.AGAIN) { //EAGAIN - this.write_req.write(@ptrCast(pipe), to_write, this, BufferedPipeInput.onWrite).unwrap() catch |write_err| { + this.write_req.write(@ptrCast(pipe), to_write, this, onWriteComplete).unwrap() catch |write_err| { log("uv_write({d}) fail: {}", .{ this.remain.len, write_err }); this.deinit(); }; @@ -912,7 +912,7 @@ pub const Subprocess = struct { } } } else { - this.write_req.write(@ptrCast(pipe), to_write, this, BufferedPipeInput.onWrite).unwrap() catch |err| { + this.write_req.write(@ptrCast(pipe), to_write, this, onWriteComplete).unwrap() catch |err| { log("uv_write({d}) fail: {}", .{ this.remain.len, err }); this.deinit(); }; @@ -937,7 +937,7 @@ pub const Subprocess = struct { } } - fn uvClosedCallback(this: *BufferedPipeInput) void { + fn onStreamClosed(this: *BufferedPipeInput) void { if (this.deinit_onclose) { this.destroy(); } @@ -950,7 +950,7 @@ pub const Subprocess = struct { } if (this.pipe) |pipe| { - pipe.close(this, BufferedPipeInput.uvClosedCallback); + pipe.close(this, onStreamClosed); } } @@ -1519,7 +1519,7 @@ pub const Subprocess = struct { } } - fn uvClosedCallback(this: *BufferedOutput) void { + fn onStreamClosed(this: *BufferedOutput) void { this.readable_stream_ref.deinit(); this.closeCallback.run(); } @@ -1536,7 +1536,7 @@ pub const Subprocess = struct { this.readable_stream_ref.deinit(); this.closeCallback.run(); } else { - this.stream.close(this, BufferedOutput.uvClosedCallback); + this.stream.close(this, onStreamClosed); } } else { this.stream.close(); diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index b789fb24b5d38d..31d177e47452ec 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -220,7 +220,7 @@ const NamedPipeIPCData = struct { this.outgoing = temp; // enqueue the write - this.write_req.write(@ptrCast(&this.pipe), bytes, this, onWriteCallback).unwrap() catch { + this.write_req.write(@ptrCast(&this.pipe), bytes, this, onWriteComplete).unwrap() catch { Output.printErrorln("Failed to write outgoing data", .{}); return; }; @@ -238,8 +238,8 @@ const NamedPipeIPCData = struct { } } - fn onWriteCallback(this: *NamedPipeIPCData, status: uv.ReturnCode) void { - log("onWriteCallback {d} {d}", .{ status.int(), this.current_payload.size() }); + fn onWriteComplete(this: *NamedPipeIPCData, status: uv.ReturnCode) void { + log("onWriteComplete {d} {d}", .{ status.int(), this.current_payload.size() }); if (status.errEnum()) |_| { Output.printErrorln("Failed to write outgoing data", .{}); return; @@ -326,7 +326,7 @@ const NamedPipeIPCData = struct { return .{ .err = err }; } - ipc_pipe.setPendingInstances(1); + ipc_pipe.setPendingInstancesCount(1); ipc_pipe.unref(); diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index d0cf16690eb050..af3df7cadace1e 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -1443,7 +1443,7 @@ pub const struct_uv_pipe_s = extern struct { return .{ .result = {} }; } - pub fn setPendingInstances(this: *@This(), count: i32) void { + pub fn setPendingInstancesCount(this: *@This(), count: i32) void { uv_pipe_pending_instances(this, count); } }; From 59307ff8e1e221c225496e2da38f528c73311175 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 20:28:05 -0300 Subject: [PATCH 09/12] fix cwd with relative path and fix a bunch of tests --- src/bun.js/api/bun/subprocess.zig | 37 ++++++ test/js/bun/spawn/spawn.fixture.js | 44 +++++++ test/js/bun/spawn/spawn.ipc.test.ts | 37 ++++++ test/js/bun/spawn/spawn.test.ts | 174 +++++++++++++++++----------- 4 files changed, 225 insertions(+), 67 deletions(-) create mode 100644 test/js/bun/spawn/spawn.fixture.js create mode 100644 test/js/bun/spawn/spawn.ipc.test.ts diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 9774646d411df6..314ee764e57845 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -2259,7 +2259,30 @@ pub const Subprocess = struct { }; var cwd_resolver = bun.path.PosixToWinNormalizer{}; + var joined_buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; + if (!std.fs.path.isAbsoluteWindows(cwd)) { + // we need the absolute path so we can resolveCWDZ + var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; + var parts = [_]string{ + cwd, + }; + + const application_cwd = bun.getcwd(&buf) catch |err| { + alloc.destroy(subprocess); + return globalThis.handleError(err, "in uv_spawn"); + }; + buf[application_cwd.len] = std.fs.path.sep; + const file_path = bun.path.joinAbsStringBuf( + buf[0 .. application_cwd.len + 1], + &joined_buf, + &parts, + .auto, + ); + + joined_buf[file_path.len] = 0; + cwd = joined_buf[0..file_path.len :0]; + } const options = uv.uv_process_options_t{ .exit_cb = uvExitCallback, .args = @ptrCast(argv.items[0 .. argv.items.len - 1 :null]), @@ -3202,6 +3225,20 @@ pub const Subprocess = struct { .path => |pathlike| { _ = pathlike; @panic("TODO"); + // var file_path: [bun.MAX_PATH_BYTES]u8 = undefined; + // const path_fd = try bun.sys.open( + // pathlike.path.sliceZ(&file_path), + // std.os.O.WRONLY | std.os.O.CREAT | std.os.O.NONBLOCK, + // 0o664, + // ).unwrap(); + + // try pipe.init(uv.Loop.get(), false).unwrap(); + // try pipe.open(bun.uvfdcast(path_fd)).unwrap(); + + // return uv.uv_stdio_container_s{ + // .flags = @intCast(uv.UV_INHERIT_STREAM), + // .data = .{ .stream = @ptrCast(pipe) }, + // }; }, .inherit => uv.uv_stdio_container_s{ .flags = uv.UV_INHERIT_FD, diff --git a/test/js/bun/spawn/spawn.fixture.js b/test/js/bun/spawn/spawn.fixture.js new file mode 100644 index 00000000000000..6ef61d91f0e301 --- /dev/null +++ b/test/js/bun/spawn/spawn.fixture.js @@ -0,0 +1,44 @@ +import fs from "fs"; +const [command, argument] = process.argv.slice(2); + +try { + switch (command) { + case "sleep": + Bun.sleepSync(parseFloat(argument || "0") * 1000); + break; + case "echo": { + console.log(argument || ""); + break; + } + case "printenv": { + console.log(process.env[argument] || ""); + break; + } + case "false": { + process.exit(1); + } + case "true": { + process.exit(0); + } + case "cat": { + if (fs.existsSync(argument)) { + // cat file + const writer = Bun.stdout.writer(); + writer.write(fs.readFileSync(argument)); + writer.flush(); + } else if (typeof argument == "string") { + // cat text + const writer = Bun.stdout.writer(); + writer.write(argument); + writer.flush(); + } else { + // echo + const writer = Bun.stdout.writer(); + writer.write(await Bun.readableStreamToText(Bun.stdin)); + writer.flush(); + } + } + default: + break; + } +} catch {} diff --git a/test/js/bun/spawn/spawn.ipc.test.ts b/test/js/bun/spawn/spawn.ipc.test.ts new file mode 100644 index 00000000000000..330620072b101f --- /dev/null +++ b/test/js/bun/spawn/spawn.ipc.test.ts @@ -0,0 +1,37 @@ +// @known-failing-on-windows: 1 failing +import { spawn } from "bun"; +import { describe, expect, it } from "bun:test"; +import { gcTick, bunExe } from "harness"; +import path from "path"; + +describe("ipc", () => { + it("the subprocess should be defined and the child should send", done => { + gcTick(); + const returned_subprocess = spawn([bunExe(), path.join(__dirname, "bun-ipc-child.js")], { + ipc: (message, subProcess) => { + expect(subProcess).toBe(returned_subprocess); + expect(message).toBe("hello"); + subProcess.kill(); + done(); + gcTick(); + }, + }); + }); + + it("the subprocess should receive the parent message and respond back", done => { + gcTick(); + + const parentMessage = "I am your father"; + const childProc = spawn([bunExe(), path.join(__dirname, "bun-ipc-child-respond.js")], { + ipc: (message, subProcess) => { + expect(message).toBe(`pong:${parentMessage}`); + subProcess.kill(); + done(); + gcTick(); + }, + }); + + childProc.send(parentMessage); + gcTick(); + }); +}); diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index 860bb25fe2c35f..50792f30574d72 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -5,6 +5,8 @@ import { gcTick as _gcTick, bunExe, bunEnv } from "harness"; import { rmSync, writeFileSync } from "node:fs"; import path from "path"; +const help_fixture = path.join(import.meta.dir, "spawn.fixture.js"); + for (let [gcTick, label] of [ [_gcTick, "gcTick"], // [() => {}, "no gc tick"], @@ -15,7 +17,11 @@ for (let [gcTick, label] of [ const hugeString = "hello".repeat(10000).slice(); it("as an array", () => { - const { stdout } = spawnSync(["echo", "hi"]); + const { stdout } = spawnSync([bunExe(), help_fixture, "echo", "hi"], { + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, + }); gcTick(); // stdout is a Buffer const text = stdout!.toString(); @@ -25,8 +31,11 @@ for (let [gcTick, label] of [ it("Uint8Array works as stdin", async () => { const { stdout, stderr } = spawnSync({ - cmd: ["cat"], + cmd: [bunExe(), help_fixture, "cat"], stdin: new TextEncoder().encode(hugeString), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); expect(stdout!.toString()).toBe(hugeString); @@ -36,11 +45,11 @@ for (let [gcTick, label] of [ it("check exit code", async () => { const { exitCode: exitCode1 } = spawnSync({ - cmd: ["ls"], + cmd: [bunExe(), help_fixture, "echo", "1"], }); gcTick(); const { exitCode: exitCode2 } = spawnSync({ - cmd: ["false"], + cmd: [bunExe(), help_fixture, "false"], }); gcTick(); expect(exitCode1).toBe(0); @@ -51,8 +60,11 @@ for (let [gcTick, label] of [ it("throws errors for invalid arguments", async () => { expect(() => { spawnSync({ - cmd: ["echo", "hi"], + cmd: [ bunExe(), help_fixture, "echo", "hi"], cwd: "./this-should-not-exist", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); }).toThrow("No such file or directory"); }); @@ -64,10 +76,13 @@ for (let [gcTick, label] of [ it("as an array", async () => { gcTick(); await (async () => { - const { stdout } = spawn(["echo", "hello"], { + const { stdout } = spawn([bunExe(), help_fixture, "echo", "hello"], { stdout: "pipe", stderr: null, stdin: null, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const text = await new Response(stdout).text(); @@ -78,11 +93,12 @@ for (let [gcTick, label] of [ it("as an array with options object", async () => { gcTick(); - const { stdout } = spawn(["printenv", "FOO"], { + const { stdout } = spawn([bunExe(), help_fixture, "printenv", "FOO"], { cwd: "/tmp", env: { ...process.env, FOO: "bar", + BUN_DEBUG_QUIET_LOGS: "1", }, stdin: null, stdout: "pipe", @@ -98,9 +114,12 @@ for (let [gcTick, label] of [ rmSync("/tmp/out.123.txt", { force: true }); gcTick(); const { exited } = spawn({ - cmd: ["cat"], + cmd: [bunExe(), help_fixture, "cat"], stdin: new TextEncoder().encode(hugeString), stdout: Bun.file("/tmp/out.123.txt"), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); await exited; @@ -110,11 +129,11 @@ for (let [gcTick, label] of [ it("check exit code", async () => { const exitCode1 = await spawn({ - cmd: ["ls"], + cmd: [bunExe(), "help"], }).exited; gcTick(); const exitCode2 = await spawn({ - cmd: ["false"], + cmd: [bunExe(), help_fixture, "false"], }).exited; gcTick(); expect(exitCode1).toBe(0); @@ -124,7 +143,10 @@ for (let [gcTick, label] of [ it("nothing to stdout and sleeping doesn't keep process open 4ever", async () => { const proc = spawn({ - cmd: ["sleep", "0.1"], + cmd: [bunExe(), help_fixture, "sleep", "0.1"], + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); for await (const _ of proc.stdout) { @@ -139,7 +161,7 @@ for (let [gcTick, label] of [ await new Promise(resolve => { var counter = 0; spawn({ - cmd: ["ls"], + cmd: [bunExe(), "help"], stdin: "ignore", stdout: "ignore", stderr: "ignore", @@ -153,7 +175,7 @@ for (let [gcTick, label] of [ }); spawn({ - cmd: ["false"], + cmd: [bunExe(), help_fixture, "false"], stdin: "ignore", stdout: "ignore", stderr: "ignore", @@ -177,10 +199,13 @@ for (let [gcTick, label] of [ it.skip("Uint8Array works as stdout", () => { gcTick(); const stdout_buffer = new Uint8Array(11); - const { stdout } = spawnSync(["echo", "hello world"], { + const { stdout } = spawnSync([bunExe(), help_fixture, "echo", "hello world"], { stdout: stdout_buffer, stderr: null, stdin: null, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const text = new TextDecoder().decode(stdout); @@ -193,10 +218,13 @@ for (let [gcTick, label] of [ it.skip("Uint8Array works as stdout when is smaller than output", () => { gcTick(); const stdout_buffer = new Uint8Array(5); - const { stdout } = spawnSync(["echo", "hello world"], { + const { stdout } = spawnSync([bunExe(), help_fixture, "echo", "hello world"], { stdout: stdout_buffer, stderr: null, stdin: null, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const text = new TextDecoder().decode(stdout); @@ -209,10 +237,13 @@ for (let [gcTick, label] of [ it.skip("Uint8Array works as stdout when is the exactly size than output", () => { gcTick(); const stdout_buffer = new Uint8Array(12); - const { stdout } = spawnSync(["echo", "hello world"], { + const { stdout } = spawnSync([bunExe(), help_fixture, "echo", "hello world"], { stdout: stdout_buffer, stderr: null, stdin: null, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const text = new TextDecoder().decode(stdout); @@ -225,10 +256,13 @@ for (let [gcTick, label] of [ it.skip("Uint8Array works as stdout when is larger than output", () => { gcTick(); const stdout_buffer = new Uint8Array(15); - const { stdout } = spawnSync(["echo", "hello world"], { + const { stdout } = spawnSync([bunExe(), help_fixture, "echo", "hello world"], { stdout: stdout_buffer, stderr: null, stdin: null, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const text = new TextDecoder().decode(stdout); @@ -242,9 +276,12 @@ for (let [gcTick, label] of [ rmSync("/tmp/out.123.txt", { force: true }); gcTick(); const { exited } = spawn({ - cmd: ["cat"], + cmd: [bunExe(), help_fixture, "cat"], stdin: new Blob([new TextEncoder().encode(hugeString)]), stdout: Bun.file("/tmp/out.123.txt"), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); await exited; @@ -255,8 +292,11 @@ for (let [gcTick, label] of [ rmSync("/tmp/out.123.txt", { force: true }); gcTick(); const { exited } = spawn({ - cmd: ["echo", "hello"], + cmd: [bunExe(), help_fixture, "echo", "hello"], stdout: Bun.file("/tmp/out.123.txt"), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); await exited; @@ -268,9 +308,12 @@ for (let [gcTick, label] of [ await write(Bun.file("/tmp/out.456.txt"), "hello there!"); gcTick(); const { stdout } = spawn({ - cmd: ["cat"], + cmd: [bunExe(), help_fixture, "cat"], stdout: "pipe", stdin: Bun.file("/tmp/out.456.txt"), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); expect(await readableStreamToText(stdout!)).toBe("hello there!"); @@ -283,9 +326,12 @@ for (let [gcTick, label] of [ gcTick(); const { exited } = spawn({ - cmd: ["cat"], + cmd: [bunExe(), help_fixture, "cat"], stdout: Bun.file("/tmp/out.123.txt"), stdin: Bun.file("/tmp/out.456.txt"), + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); await exited; @@ -298,8 +344,11 @@ for (let [gcTick, label] of [ await Bun.write("/tmp/out.txt", hugeString); gcTick(); const { stdout } = spawn({ - cmd: ["cat", "/tmp/out.txt"], + cmd: [bunExe(), help_fixture, "cat", "/tmp/out.txt"], stdout: "pipe", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); @@ -311,8 +360,11 @@ for (let [gcTick, label] of [ it("kill(1) works", async () => { const process = spawn({ - cmd: ["bash", "-c", "sleep 1000"], + cmd: [bunExe(), help_fixture, "sleep", "1000"], stdout: "pipe", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const prom = process.exited; @@ -322,8 +374,11 @@ for (let [gcTick, label] of [ it("kill() works", async () => { const process = spawn({ - cmd: ["bash", "-c", "sleep 1000"], + cmd: [bunExe(), help_fixture, "sleep", "1000"], stdout: "pipe", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); gcTick(); const prom = process.exited; @@ -333,11 +388,14 @@ for (let [gcTick, label] of [ it("stdin can be read and stdout can be written", async () => { const proc = spawn({ - cmd: ["bash", import.meta.dir + "/bash-echo.sh"], + cmd: [bunExe(), help_fixture, "cat"], stdout: "pipe", stdin: "pipe", lazy: true, stderr: "inherit", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); var stdout = proc.stdout; @@ -369,19 +427,25 @@ for (let [gcTick, label] of [ describe("pipe", () => { function huge() { return spawn({ - cmd: ["echo", hugeString], + cmd: [bunExe(), help_fixture, "cat"], stdout: "pipe", - stdin: "pipe", + stdin: Buffer.from(hugeString), stderr: "inherit", lazy: true, + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); } function helloWorld() { return spawn({ - cmd: ["echo", "hello"], + cmd: [bunExe(), help_fixture, "echo", "hello"], stdout: "pipe", stdin: "ignore", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); } @@ -452,43 +516,14 @@ for (let [gcTick, label] of [ } }); - describe("ipc", () => { - it("the subprocess should be defined and the child should send", done => { - gcTick(); - const returned_subprocess = spawn([bunExe(), path.join(__dirname, "bun-ipc-child.js")], { - ipc: (message, subProcess) => { - expect(subProcess).toBe(returned_subprocess); - expect(message).toBe("hello"); - subProcess.kill(); - done(); - gcTick(); - }, - }); - }); - - it("the subprocess should receive the parent message and respond back", done => { - gcTick(); - - const parentMessage = "I am your father"; - const childProc = spawn([bunExe(), path.join(__dirname, "bun-ipc-child-respond.js")], { - ipc: (message, subProcess) => { - expect(message).toBe(`pong:${parentMessage}`); - subProcess.kill(); - done(); - gcTick(); - }, - }); - - childProc.send(parentMessage); - gcTick(); - }); - }); - it("throws errors for invalid arguments", async () => { expect(() => { spawnSync({ - cmd: ["echo", "hi"], + cmd: [bunExe(), help_fixture, "echo", "hi"], cwd: "./this-should-not-exist", + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, }); }).toThrow("No such file or directory"); }); @@ -505,6 +540,7 @@ if (!process.env.BUN_FEATURE_FLAG_FORCE_WAITER_THREAD) { // Both flags are necessary to force this condition "BUN_FEATURE_FLAG_FORCE_WAITER_THREAD": "1", "BUN_GARBAGE_COLLECTOR_LEVEL": "1", + BUN_DEBUG_QUIET_LOGS: "1", }, stderr: "inherit", stdout: "inherit", @@ -518,7 +554,7 @@ describe("spawn unref and kill should not hang", () => { it("kill and await exited", async () => { for (let i = 0; i < 10; i++) { const proc = spawn({ - cmd: ["sleep", "0.001"], + cmd: [bunExe(), help_fixture, "sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -532,7 +568,7 @@ describe("spawn unref and kill should not hang", () => { it("unref", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0.001"], + cmd: [bunExe(), help_fixture, "sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -546,7 +582,7 @@ describe("spawn unref and kill should not hang", () => { it("kill and unref", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0.001"], + cmd: [bunExe(), help_fixture, "sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -561,7 +597,7 @@ describe("spawn unref and kill should not hang", () => { it("unref and kill", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0.001"], + cmd: [bunExe(), help_fixture, "sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -588,7 +624,7 @@ async function runTest(sleep: string, order = ["sleep", "kill", "unref", "exited console.log("running", order.join(",")); for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", sleep], + cmd: [bunExe(), help_fixture, "sleep", sleep], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -658,7 +694,11 @@ it("#3480", async () => { var server = Bun.serve({ port: 0, fetch: (req, res) => { - Bun.spawnSync(["echo", "1"], {}); + Bun.spawnSync([bunExe(), help_fixture, "echo", "1"], { + env: { + BUN_DEBUG_QUIET_LOGS: "1", + }, + }); return new Response("Hello world!"); }, }); From d4ce4e103f6e7c6b2312c4002db66283f417b84f Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 26 Jan 2024 23:28:49 +0000 Subject: [PATCH 10/12] [autofix.ci] apply automated fixes --- test/js/bun/spawn/spawn.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index 50792f30574d72..e83b82abbf7e0c 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -60,7 +60,7 @@ for (let [gcTick, label] of [ it("throws errors for invalid arguments", async () => { expect(() => { spawnSync({ - cmd: [ bunExe(), help_fixture, "echo", "hi"], + cmd: [bunExe(), help_fixture, "echo", "hi"], cwd: "./this-should-not-exist", env: { BUN_DEBUG_QUIET_LOGS: "1", From 1accffaf05268b042b9031a6c849b964064b5d8b Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 20:31:21 -0300 Subject: [PATCH 11/12] remove know failing mark --- test/js/bun/spawn/spawn.ipc.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/js/bun/spawn/spawn.ipc.test.ts b/test/js/bun/spawn/spawn.ipc.test.ts index 330620072b101f..f492aa3ce1cc79 100644 --- a/test/js/bun/spawn/spawn.ipc.test.ts +++ b/test/js/bun/spawn/spawn.ipc.test.ts @@ -1,4 +1,3 @@ -// @known-failing-on-windows: 1 failing import { spawn } from "bun"; import { describe, expect, it } from "bun:test"; import { gcTick, bunExe } from "harness"; From f5f71e56e1c8e96f2690a45e2e2f58fda2fb8ac5 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Sat, 27 Jan 2024 13:24:54 -0300 Subject: [PATCH 12/12] add .path support so we can use Bun.file as stdin/stdout/stderr --- src/bun.js/api/bun/subprocess.zig | 53 +++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 314ee764e57845..0ef151fb623075 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -3223,22 +3223,43 @@ pub const Subprocess = struct { .data = .{ .fd = bun.uvfdcast(_fd) }, }, .path => |pathlike| { - _ = pathlike; - @panic("TODO"); - // var file_path: [bun.MAX_PATH_BYTES]u8 = undefined; - // const path_fd = try bun.sys.open( - // pathlike.path.sliceZ(&file_path), - // std.os.O.WRONLY | std.os.O.CREAT | std.os.O.NONBLOCK, - // 0o664, - // ).unwrap(); - - // try pipe.init(uv.Loop.get(), false).unwrap(); - // try pipe.open(bun.uvfdcast(path_fd)).unwrap(); - - // return uv.uv_stdio_container_s{ - // .flags = @intCast(uv.UV_INHERIT_STREAM), - // .data = .{ .stream = @ptrCast(pipe) }, - // }; + var path_buf: [bun.MAX_PATH_BYTES]u8 = undefined; + var resolver = bun.path.PosixToWinNormalizer{}; + const flag: i32 = (if (isReadable) os.O.RDONLY else std.os.O.WRONLY); + + var joined_buf: [bun.MAX_PATH_BYTES]u8 = undefined; + var slice = pathlike.sliceZ(&path_buf); + if (!std.fs.path.isAbsoluteWindows(slice)) { + // we need the absolute path so we can open + var cwd_buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; + + var parts = [_]string{ + slice, + }; + + const application_cwd = try bun.getcwd(&cwd_buf); + cwd_buf[application_cwd.len] = std.fs.path.sep; + const file_path = bun.path.joinAbsStringBuf( + cwd_buf[0 .. application_cwd.len + 1], + &joined_buf, + &parts, + .auto, + ); + + joined_buf[file_path.len] = 0; + slice = joined_buf[0..file_path.len :0]; + } + + const path_fd = try bun.sys.open( + try resolver.resolveCWDZ(slice), + flag | std.os.O.CREAT | std.os.O.NONBLOCK, + 0o664, + ).unwrap(); + + return uv.uv_stdio_container_s{ + .flags = uv.UV_INHERIT_FD, + .data = .{ .fd = bun.uvfdcast(path_fd) }, + }; }, .inherit => uv.uv_stdio_container_s{ .flags = uv.UV_INHERIT_FD,