Skip to content

Commit

Permalink
Fix flaky signal handlers on posix (#15751)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Dec 14, 2024
1 parent bd1c5e9 commit 20f9cf0
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 27 deletions.
2 changes: 2 additions & 0 deletions src/async/posix_event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub const FilePoll = struct {
const Request = JSC.DNS.InternalDNS.Request;
const LifecycleScriptSubprocessOutputReader = bun.install.LifecycleScriptSubprocess.OutputReader;
const BufferedReader = bun.io.BufferedReader;

pub const Owner = bun.TaggedPointerUnion(.{
FileSink,

Expand Down Expand Up @@ -386,6 +387,7 @@ pub const FilePoll = struct {
var handler: *BufferedReader = ptr.as(BufferedReader);
handler.onPoll(size_or_offset, poll.flags.contains(.hup));
},

@field(Owner.Tag, bun.meta.typeBaseName(@typeName(Process))) => {
log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {}) Process", .{poll.fd});
var loader = ptr.as(Process);
Expand Down
39 changes: 24 additions & 15 deletions src/bun.js/bindings/BunProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,19 @@ bool isSignalName(WTF::String input)
return signalNameToNumberMap->contains(input);
}

extern "C" void Bun__onSignalForJS(int signalNumber, Zig::GlobalObject* globalObject)
{
Process* process = jsCast<Process*>(globalObject->processObject());

String signalName = signalNumberToNameMap->get(signalNumber);
Identifier signalNameIdentifier = Identifier::fromString(globalObject->vm(), signalName);
MarkedArgumentBuffer args;
args.append(jsString(globalObject->vm(), signalNameIdentifier.string()));
args.append(jsNumber(signalNumber));

process->wrapped().emitForBindings(signalNameIdentifier, args);
}

#if OS(WINDOWS)
extern "C" uv_signal_t* Bun__UVSignalHandle__init(JSC::JSGlobalObject* lexicalGlobalObject, int signalNumber, void (*callback)(uv_signal_t*, int));
extern "C" uv_signal_t* Bun__UVSignalHandle__close(uv_signal_t*);
Expand All @@ -834,28 +847,20 @@ void signalHandler(int signalNumber)
void signalHandler(uv_signal_t* signal, int signalNumber)
#endif
{
#if OS(WINDOWS)
if (UNLIKELY(signalNumberToNameMap->find(signalNumber) == signalNumberToNameMap->end()))
return;

auto* context = ScriptExecutionContext::getMainThreadScriptExecutionContext();
if (UNLIKELY(!context))
return;

// signal handlers can be run on any thread
context->postTaskConcurrently([signalNumber](ScriptExecutionContext& context) {
JSGlobalObject* lexicalGlobalObject = context.jsGlobalObject();
Zig::GlobalObject* globalObject = jsCast<Zig::GlobalObject*>(lexicalGlobalObject);

Process* process = jsCast<Process*>(globalObject->processObject());

String signalName = signalNumberToNameMap->get(signalNumber);
Identifier signalNameIdentifier = Identifier::fromString(globalObject->vm(), signalName);
MarkedArgumentBuffer args;
args.append(jsString(globalObject->vm(), signalNameIdentifier.string()));
args.append(jsNumber(signalNumber));

process->wrapped().emitForBindings(signalNameIdentifier, args);
Bun__onSignalForJS(signalNumber, jsCast<Zig::GlobalObject*>(context.jsGlobalObject()));
});
#else

#endif
};

extern "C" void Bun__logUnhandledException(JSC::EncodedJSValue exception);
Expand Down Expand Up @@ -934,7 +939,8 @@ extern "C" void Bun__setChannelRef(GlobalObject* globalObject, bool enabled)
process->scriptExecutionContext()->unrefEventLoop();
}
}

extern "C" void Bun__ensureSignalHandler();
extern "C" void Bun__onPosixSignal(int signalNumber);
static void onDidChangeListeners(EventEmitter& eventEmitter, const Identifier& eventName, bool isAdded)
{
if (eventEmitter.scriptExecutionContext()->isMainThread()) {
Expand Down Expand Up @@ -1056,11 +1062,14 @@ static void onDidChangeListeners(EventEmitter& eventEmitter, const Identifier& e
#endif
};
#if !OS(WINDOWS)
Bun__ensureSignalHandler();
struct sigaction action;
memset(&action, 0, sizeof(struct sigaction));

// Set the handler in the action struct
action.sa_handler = signalHandler;
action.sa_handler = [](int signalNumber) {
Bun__onPosixSignal(signalNumber);
};

// Clear the sa_mask
sigemptyset(&action.sa_mask);
Expand Down
119 changes: 119 additions & 0 deletions src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ const ServerAllConnectionsClosedTask = @import("./api/server.zig").ServerAllConn
// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
PosixSignalTask,
AsyncGlobWalkTask,
AsyncTransformTask,
ReadFileTask,
Expand Down Expand Up @@ -782,6 +783,19 @@ pub const EventLoop = struct {
entered_event_loop_count: isize = 0,
concurrent_ref: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),

signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null else {},

pub export fn Bun__ensureSignalHandler() void {
if (Environment.isPosix) {
const vm = JSC.VirtualMachine.getMainThreadVM();
const this = vm.eventLoop();
if (this.signal_handler == null) {
this.signal_handler = PosixSignalHandle.new(.{});
@memset(&this.signal_handler.?.signals, 0);
}
}
}

pub const Debug = if (Environment.isDebug) struct {
is_inside_tick_queue: bool = false,
js_call_count_outside_tick_queue: usize = 0,
Expand Down Expand Up @@ -1256,6 +1270,9 @@ pub const EventLoop = struct {
var any: *bun.bundle_v2.DeferredBatchTask = task.get(bun.bundle_v2.DeferredBatchTask).?;
any.runOnJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(PosixSignalTask))) => {
PosixSignalTask.runFromJSThread(@intCast(task.asUintptr()), global);
},

else => {
bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
Expand Down Expand Up @@ -1310,6 +1327,12 @@ pub const EventLoop = struct {
pub fn tickConcurrentWithCount(this: *EventLoop) usize {
this.updateCounts();

if (comptime Environment.isPosix) {
if (this.signal_handler) |signal_handler| {
signal_handler.drain(this);
}
}

var concurrent = this.concurrent_tasks.popBatch();
const count = concurrent.count;
if (count == 0)
Expand Down Expand Up @@ -2293,3 +2316,99 @@ pub const EventLoopTaskPtr = union {
js: *ConcurrentTask,
mini: *JSC.AnyTaskWithExtraContext,
};

pub const PosixSignalHandle = struct {
const buffer_size = 8192;

signals: [buffer_size]u8 = undefined,

// Producer index (signal handler writes).
tail: std.atomic.Value(u16) = std.atomic.Value(u16).init(0),
// Consumer index (main thread reads).
head: std.atomic.Value(u16) = std.atomic.Value(u16).init(0),

const log = bun.Output.scoped(.PosixSignalHandle, true);

pub usingnamespace bun.New(@This());

/// Called by the signal handler (single producer).
/// Returns `true` if enqueued successfully, or `false` if the ring is full.
pub fn enqueue(this: *PosixSignalHandle, signal: u8) bool {
// Read the current tail and head (Acquire to ensure we have up‐to‐date values).
const old_tail = this.tail.load(.acquire);
const head_val = this.head.load(.acquire);

// Compute the next tail (wrapping around buffer_size).
const next_tail = (old_tail +% 1) % buffer_size;

// Check if the ring is full.
if (next_tail == (head_val % buffer_size)) {
// The ring buffer is full.
// We cannot block or wait here (since we're in a signal handler).
// So we just drop the signal or log if desired.
log("signal queue is full; dropping", .{});
return false;
}

// Store the signal into the ring buffer slot (Release to ensure data is visible).
@atomicStore(u8, &this.signals[old_tail % buffer_size], signal, .release);

// Publish the new tail (Release so that the consumer sees the updated tail).
this.tail.store(old_tail +% 1, .release);

JSC.VirtualMachine.getMainThreadVM().eventLoop().wakeup();

return true;
}

/// This is the signal handler entry point. Calls enqueue on the ring buffer.
/// Note: Must be minimal logic here. Only do atomics & signal‐safe calls.
export fn Bun__onPosixSignal(number: i32) void {
const vm = JSC.VirtualMachine.getMainThreadVM();
_ = vm.eventLoop().signal_handler.?.enqueue(@intCast(number));
}

/// Called by the main thread (single consumer).
/// Returns `null` if the ring is empty, or the next signal otherwise.
pub fn dequeue(this: *PosixSignalHandle) ?u8 {
// Read the current head and tail.
const old_head = this.head.load(.acquire);
const tail_val = this.tail.load(.acquire);

// If head == tail, the ring is empty.
if (old_head == tail_val) {
return null; // No available items
}

const slot_index = old_head % buffer_size;
// Acquire load of the stored signal to get the item.
const signal = @atomicRmw(u8, &this.signals[slot_index], .Xchg, 0, .acq_rel);

// Publish the updated head (Release).
this.head.store(old_head +% 1, .release);

return signal;
}

/// Drain as many signals as possible and enqueue them as tasks in the event loop.
/// Called by the main thread.
pub fn drain(this: *PosixSignalHandle, event_loop: *JSC.EventLoop) void {
while (this.dequeue()) |signal| {
// Example: wrap the signal into a Task structure
var posix_signal_task: PosixSignalTask = undefined;
var task = JSC.Task.init(&posix_signal_task);
task.setUintptr(signal);
event_loop.enqueueTask(task);
}
}
};

pub const PosixSignalTask = struct {
number: u8,
extern "C" fn Bun__onSignalForJS(number: i32, globalObject: *JSC.JSGlobalObject) void;

pub usingnamespace bun.New(@This());
pub fn runFromJSThread(number: u8, globalObject: *JSC.JSGlobalObject) void {
Bun__onSignalForJS(number, globalObject);
}
};
10 changes: 9 additions & 1 deletion src/bun.js/javascript.zig
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ pub const VirtualMachine = struct {
pub const VMHolder = struct {
pub threadlocal var vm: ?*VirtualMachine = null;
pub threadlocal var cached_global_object: ?*JSGlobalObject = null;
pub var main_thread_vm: *VirtualMachine = undefined;
pub export fn Bun__setDefaultGlobalObject(global: *JSGlobalObject) void {
if (vm) |vm_instance| {
vm_instance.global = global;
Expand All @@ -994,6 +995,10 @@ pub const VirtualMachine = struct {
return VMHolder.vm.?;
}

pub fn getMainThreadVM() *VirtualMachine {
return VMHolder.main_thread_vm;
}

pub fn mimeType(this: *VirtualMachine, str: []const u8) ?bun.http.MimeType {
return this.rareData().mimeTypeFromString(this.allocator, str);
}
Expand Down Expand Up @@ -1957,6 +1962,7 @@ pub const VirtualMachine = struct {

graph: ?*bun.StandaloneModuleGraph = null,
debugger: bun.CLI.Command.Debugger = .{ .unspecified = {} },
is_main_thread: bool = false,
};

pub var is_smol_mode = false;
Expand All @@ -1982,7 +1988,9 @@ pub const VirtualMachine = struct {
opts.env_loader,
);
var vm = VMHolder.vm.?;

if (opts.is_main_thread) {
VMHolder.main_thread_vm = vm;
}
vm.* = VirtualMachine{
.global = undefined,
.transpiler_store = RuntimeTranspilerStore.init(),
Expand Down
2 changes: 2 additions & 0 deletions src/bun_js.zig
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub const Run = struct {
.log = ctx.log,
.args = ctx.args,
.graph = graph_ptr,
.is_main_thread = true,
}),
.arena = arena,
.ctx = ctx,
Expand Down Expand Up @@ -198,6 +199,7 @@ pub const Run = struct {
.smol = ctx.runtime_options.smol,
.eval = ctx.runtime_options.eval.eval_and_print,
.debugger = ctx.runtime_options.debugger,
.is_main_thread = true,
},
),
.arena = arena,
Expand Down
1 change: 1 addition & 0 deletions src/cli/test_command.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ pub const TestCommand = struct {
.store_fd = true,
.smol = ctx.runtime_options.smol,
.debugger = ctx.runtime_options.debugger,
.is_main_thread = true,
},
);
vm.argv = ctx.passthrough;
Expand Down
8 changes: 8 additions & 0 deletions src/tagged_pointer.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ pub fn TaggedPointerUnion(comptime Types: anytype) type {
return this.repr.get(Type);
}

pub inline fn setUintptr(this: *This, value: AddressableSize) void {
this.repr._ptr = value;
}

pub inline fn asUintptr(this: This) AddressableSize {
return this.repr._ptr;
}

pub inline fn is(this: This, comptime Type: type) bool {
comptime assert_type(Type);
return this.repr.data == comptime @intFromEnum(@field(Tag, typeBaseName(@typeName(Type))));
Expand Down
Loading

0 comments on commit 20f9cf0

Please sign in to comment.