Skip to content

Commit

Permalink
signal handling: User-defined interrupt handlers
Browse files Browse the repository at this point in the history
Interrupt handling is a tricky problem, not just in terms of
implementation, but in terms of desired behavior: when an interrupt is
received, which code should handle it? Julia's current answer to this is
effectively to throw an `InterruptException` to the first task to hit a
safepoint. While this seems sensible (the code that's running gets
interrupted), it only really works for very basic numerical code. In the case that
multiple tasks are running concurrently, or when try-catch handlers are
registered, this system breaks down, and results in unpredictable
behavior. This unpredictable behavior includes:

- Interrupting background/runtime tasks which don't want to be
  interrupted, as they do little bits of important work (and are
  critical to library runtime functionality)
- Interrupting only one task, when multiple coordinating tasks would
  want to receive the interrupt to safely terminate a computation
- Interrupting only one library's task, when multiple libraries really
  would want to be notified about the interrupt

The above behavior makes it nearly impossible to provide reliable Ctrl-C
behavior, and results in very confused users who get stuck hitting
Ctrl-C continuously, sometimes getting caught in a hang, sometimes
triggering unrelated exception handling code they didn't mean to,
sometimes getting a segfault, and very rarely getting the behavior they
desire (with unpredictable safety of being able to continue using the
active session as intended).

This commit provides an alternative behavior for interrupts which is
more predictable: user code may now register tasks as "interrupt
handlers" (via `Base.register_interrupt_handler`), which will be
guaranteed to receive an `InterruptException` whenever the session
receives an interrupt signal. Additionally, unlike the previous
behavior, no other tasks will receive `InterruptException`s; only
explicitly registered handlers may receive them.

This behavior allows one or more libraries to register handler tasks which
will all be concurrently awoken to handle each interrupt and do whatever
is necessary to safely interrupt any running code; the extent to which
other tasks are interrupted is arbitrary and library-defined. For
example, GPU libraries like AMDGPU.jl can register a handler to safely
interrupt GPU kernels running on all GPU queues and do resource cleanup.
Concurrently, a complex runtime like the scheduler in Dagger.jl can
register a handler to interrupt running tasks on other workers when
possible.

This commit also adds a more convenient interface for when the REPL is
running. When a Ctrl-C is received and the user is not at the REPL
prompt, a TerminalMenus-powered prompt will be shown, where the user
will have a variety of possible actions, including:
- Ignore the interrupt (do nothing)
- Activate all module's interrupt handlers
- Activate a specific module's interrupt handlers
- Disable the interrupt handler (reverting to the old Ctrl-C behavior)
- Exit Julia gracefully (with `exit()`)
- Exit Julia forcefully (with a `ccall` to `abort`)
  • Loading branch information
jpsamaroo committed Aug 4, 2023
1 parent d64d336 commit 2a087a7
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 8 deletions.
1 change: 1 addition & 0 deletions base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ include("timing.jl")
include("util.jl")
include("client.jl")
include("asyncmap.jl")
include("interrupts.jl")

# deprecated functions
include("deprecated.jl")
Expand Down
5 changes: 5 additions & 0 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ function exec_options(opts)
# remove filename from ARGS
global PROGRAM_FILE = arg_is_program ? popfirst!(ARGS) : ""

# start basic interrupt handler
start_simple_interrupt_handler(;force=true)

# Load Distributed module only if any of the Distributed options have been specified.
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machine_file != C_NULL)
if distributed_mode
Expand Down Expand Up @@ -324,6 +327,7 @@ function exec_options(opts)
end
end
end

if repl || is_interactive::Bool
if interactiveinput
banner = (opts.banner != 0) # --banner!=no
Expand Down Expand Up @@ -414,6 +418,7 @@ function run_main_repl(interactive::Bool, quiet::Bool, banner::Bool, history_fil

if interactive && isassigned(REPL_MODULE_REF)
invokelatest(REPL_MODULE_REF[]) do REPL
start_repl_interrupt_handler(;force=true)
term_env = get(ENV, "TERM", @static Sys.iswindows() ? "" : "dumb")
term = REPL.Terminals.TTYTerminal(term_env, stdin, stdout, stderr)
banner && Base.banner(term)
Expand Down
243 changes: 243 additions & 0 deletions base/interrupts.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# Internal methods, only to be used to change to a different global interrupt handler
function _register_global_interrupt_handler(handler::Task)
handler_ptr = Base.pointer_from_objref(handler)
slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid})
Intrinsics.atomic_pointerset(slot_ptr, handler_ptr, :release)
end
function _unregister_global_interrupt_handler()
slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid})
Intrinsics.atomic_pointerset(slot_ptr, C_NULL, :release)
end

const INTERRUPT_HANDLERS_LOCK = Threads.ReentrantLock()
const INTERRUPT_HANDLERS = Dict{Module,Vector{Task}}()
const INTERRUPT_HANDLER_RUNNING = Threads.Atomic{Bool}(false)

"""
register_interrupt_handler(mod::Module, handler::Task)
Registers the task `handler` to handle interrupts (such as from Ctrl-C).
Handlers are expected to sit idly within a `wait()` call or similar. When an
interrupt is received by Ctrl-C or manual SIGINT, one of two actions may
happen:
If the REPL is not running (such as when running `julia myscript.jl`), then all
registered interrupt handlers will be woken with an `InterruptException()`, and
the handler may take whatever actions are necessary to gracefully interrupt any
associated running computations. It is expected that the handler will spawn
tasks to perform the graceful interrupt, so that the handler task may return
quickly to again calling `wait()` to catch future user interrupts.
If the REPL is running, then the user will be presented with a terminal menu
which will allow them to do one of:
- Ignore the interrupt (do nothing)
- Activate all handlers for all modules
- Activate all handlers for a specific module
- Disable this interrupt handler logic (see below for details)
- Exit Julia gracefully (with `exit`)
- Exit Julia forcefully (with a `ccall` to `abort`)
Note that if the interrupt handler logic is disabled by the above menu option,
Julia will fall back to the old Ctrl-C handling behavior, which has the
potential to cause crashes and undefined behavior (but can also interrupt more
kinds of code). If desired, the interrupt handler logic can be re-enabled by
calling `start_repl_interrupt_handler()`, which will disable the old Ctrl-C
handling behavior.
To unregister a previously-registered handler, use
[`unregister_interrupt_handler`](@ref).
!!! warn
Non-yielding tasks may block interrupt handlers from running; this means
that once an interrupt handler is registered, code like `while true end`
may become un-interruptible.
"""
function register_interrupt_handler(mod::Module, handler::Task)
if ccall(:jl_generating_output, Cint, ()) == 1
throw(ConcurrencyViolationError("Interrupt handlers cannot be registered during precompilation.\nPlease register your handler later (possibly in your module's `__init__`)."))
end
lock(INTERRUPT_HANDLERS_LOCK) do
handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod)
push!(handlers, handler)
end
end

"""
unregister_interrupt_handler(mod::Module, handler::Task)
Unregisters the interrupt handler task `handler`; see
[`register_interrupt_handler`](@ref) for further details.
"""
function unregister_interrupt_handler(mod::Module, handler::Task)
if ccall(:jl_generating_output, Cint, ()) == 1
throw(ConcurrencyViolationError("Interrupt handlers cannot be unregistered during precompilation."))
end
lock(INTERRUPT_HANDLERS_LOCK) do
handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod)
deleteat!(handlers, findall(==(handler), handlers))
end
end

function _throwto_interrupt!(task::Task)
if task.state == :runnable
task._isexception = true
task.result = InterruptException()
try
schedule(task)
catch
end
end
end

# Simple (no TUI) interrupt handler

function simple_interrupt_handler()
last_time = 0.0
while true
try
# Wait to be interrupted
wait()
catch err
if !(err isa InterruptException)
rethrow(err)
end

# Force-interrupt root task if two interrupts in quick succession (< 1s)
now_time = time()
diff_time = now_time - last_time
last_time = now_time
if diff_time < 1
_throwto_interrupt!(Base.roottask)
end

# Interrupt all handlers
lock(INTERRUPT_HANDLERS_LOCK) do
for mod in keys(INTERRUPT_HANDLERS)
for handler in INTERRUPT_HANDLERS[mod]
if handler.state == :runnable
_throwto_interrupt!(handler)
end
end
end
end
end
end
end
function simple_interrupt_handler_checked()
try
simple_interrupt_handler()
catch err
# Some internal error, make sure we start a new handler
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
start_simple_interrupt_handler()
rethrow()
end
# Clean exit
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
end
function start_simple_interrupt_handler(; force::Bool=false)
if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force
simple_interrupt_handler_task = errormonitor(Threads.@spawn simple_interrupt_handler_checked())
_register_global_interrupt_handler(simple_interrupt_handler_task)
end
end

# REPL (TUI) interrupt handler

function repl_interrupt_handler()
invokelatest(REPL_MODULE_REF[]) do REPL
TerminalMenus = REPL.TerminalMenus

root_menu = TerminalMenus.RadioMenu(
[
"Interrupt all",
"Interrupt only...",
"Interrupt root task (REPL/script)",
"Ignore it",
"Stop handling interrupts",
"Exit Julia",
"Force-exit Julia",
]
)

while true
try
# Wait to be interrupted
wait()
catch err
if !(err isa InterruptException)
rethrow(err)
end

# Display root menu
@label display_root
choice = TerminalMenus.request("Interrupt received, select an action:", root_menu)
if choice == 1
lock(INTERRUPT_HANDLERS_LOCK) do
for mod in keys(INTERRUPT_HANDLERS)
for handler in INTERRUPT_HANDLERS[mod]
if handler.state == :runnable
_throwto_interrupt!(handler)
end
end
end
end
elseif choice == 2
# Display modules menu
mods = lock(INTERRUPT_HANDLERS_LOCK) do
collect(keys(INTERRUPT_HANDLERS))
end
mod_menu = TerminalMenus.RadioMenu(vcat(map(string, mods), "Go Back"))
@label display_mods
choice = TerminalMenus.request("Select a library to interrupt:", mod_menu)
if choice > length(mods) || choice == -1
@goto display_root
else
lock(INTERRUPT_HANDLERS_LOCK) do
for handler in INTERRUPT_HANDLERS[mods[choice]]
_throwto_interrupt!(handler)
end
end
@goto display_mods
end
elseif choice == 3
# Force-interrupt root task
_throwto_interrupt!(Base.roottask)
elseif choice == 4 || choice == -1
# Do nothing
elseif choice == 5
# Exit handler (caller will unregister us)
return
elseif choice == 6
# Exit Julia cleanly
exit()
elseif choice == 7
# Force an exit
ccall(:abort, Cvoid, ())
end
end
end
end
end
function repl_interrupt_handler_checked()
try
repl_interrupt_handler()
catch err
# Some internal error, make sure we start a new handler
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
start_repl_interrupt_handler()
rethrow()
end
# Clean exit
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
end
function start_repl_interrupt_handler(; force::Bool=false)
if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force
repl_interrupt_handler_task = errormonitor(Threads.@spawn repl_interrupt_handler_checked())
_register_global_interrupt_handler(repl_interrupt_handler_task)
end
end
1 change: 1 addition & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,7 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq)
gc_try_claim_and_push(mq, jl_emptytuple_type, NULL);
gc_try_claim_and_push(mq, cmpswap_names, NULL);
gc_try_claim_and_push(mq, jl_global_roots_table, NULL);
gc_try_claim_and_push(mq, jl_interrupt_handler, NULL);
}

// find unmarked objects that need to be finalized from the finalizer list "list".
Expand Down
1 change: 1 addition & 0 deletions src/jl_exported_data.inc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
XX(jl_int8_type) \
XX(jl_interconditional_type) \
XX(jl_interrupt_exception) \
XX(jl_interrupt_handler) \
XX(jl_intrinsic_type) \
XX(jl_kwcall_func) \
XX(jl_lineinfonode_type) \
Expand Down
2 changes: 2 additions & 0 deletions src/jl_exported_funcs.inc
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@
XX(jl_running_on_valgrind) \
XX(jl_safe_printf) \
XX(jl_SC_CLK_TCK) \
XX(jl_schedule_interrupt_handler) \
XX(jl_set_ARGS) \
XX(jl_set_const) \
XX(jl_set_errno) \
Expand Down Expand Up @@ -522,6 +523,7 @@
XX(jl_ver_string) \
XX(jl_vexceptionf) \
XX(jl_vprintf) \
XX(jl_wake_thread) \
XX(jl_wakeup_thread) \
XX(jl_write_compiler_output) \
XX(jl_yield) \
Expand Down
5 changes: 5 additions & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,13 @@ JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct);
extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers;
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void);

JL_DLLEXPORT int jl_wake_thread(int16_t tid);
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);

JL_DLLEXPORT void jl_schedule_task(struct _jl_task_t *task);

JL_DLLEXPORT void jl_schedule_interrupt_handler(void);

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 5 additions & 3 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
}


static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
JL_DLLEXPORT int jl_wake_thread(int16_t tid) JL_NOTSAFEPOINT
{
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
int8_t state = sleeping;
Expand Down Expand Up @@ -287,7 +287,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
}
else {
// something added to the sticky-queue: notify that thread
if (wake_thread(tid) && uvlock != ct) {
if (jl_wake_thread(tid) && uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
Expand All @@ -308,7 +308,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
for (tid = 0; tid < nthreads; tid++) {
if (tid != self)
anysleep |= wake_thread(tid);
anysleep |= jl_wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != ct && anysleep) {
Expand Down Expand Up @@ -378,6 +378,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
uint64_t start_cycles = 0;

while (1) {
jl_schedule_interrupt_handler();

jl_task_t *task = get_next_task(trypoptask, q);
if (task)
return task;
Expand Down
27 changes: 27 additions & 0 deletions src/signal-handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,33 @@ static void jl_check_profile_autostop(void)
}
}

// Graceful interrupt handler

static _Atomic(int) handle_interrupt = 0;
JL_DLLEXPORT void jl_schedule_interrupt_handler(void)
{
if (jl_atomic_exchange_relaxed(&handle_interrupt, 0) != 1)
return;
jl_task_t *handler = jl_atomic_load_relaxed(&jl_interrupt_handler);
if (!handler)
return;
assert(jl_is_task(handler));
if (jl_atomic_load_relaxed(&handler->_state) != JL_TASK_STATE_RUNNABLE)
return;
handler->result = jl_interrupt_exception;
handler->_isexception = 1;
jl_schedule_task(handler);
}
static int want_interrupt_handler(void)
{
if (jl_atomic_load_relaxed(&jl_interrupt_handler)) {
// Set flag to trigger user handlers on next task switch
jl_atomic_store_relaxed(&handle_interrupt, 1);
return 1;
}
return 0;
}

#if defined(_WIN32)
#include "signals-win.c"
#else
Expand Down
Loading

0 comments on commit 2a087a7

Please sign in to comment.