From e76c1573e01ca99e33a4533fa38b940cafcaee60 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 25 Apr 2023 10:44:35 -0700 Subject: [PATCH] signal handling: User-defined interrupt handlers Interrupt handling is a tricky problem, not just in terms of implementation, but in terms of desired behavior: when an interrupt is received, which code should handle it? Julia's current answer to this is effectively to throw an `InterruptException` to the first task to hit a safepoint. While this seems sensible (the code that's running gets interrupted), it only really works for very basic numerical code. In the case that multiple tasks are running concurrently, or when try-catch handlers are registered, this system breaks down, and results in unpredictable behavior. This unpredictable behavior includes: - Interrupting background/runtime tasks which don't want to be interrupted, as they do little bits of important work (and are critical to library runtime functionality) - Interrupting only one task, when multiple coordinating tasks would want to receive the interrupt to safely terminate a computation - Interrupting only one library's task, when multiple libraries really would want to be notified about the interrupt The above behavior makes it nearly impossible to provide reliable Ctrl-C behavior, and results in very confused users who get stuck hitting Ctrl-C continuously, sometimes getting caught in a hang, sometimes triggering unrelated exception handling code they didn't mean to, sometimes getting a segfault, and very rarely getting the behavior they desire (with unpredictable safety of being able to continue using the active session as intended). This commit provides an alternative behavior for interrupts which is more predictable: user code may now register tasks as "interrupt handlers" (via `Base.register_interrupt_handler`), which will be guaranteed to receive an `InterruptException` whenever the session receives an interrupt signal. Additionally, unlike the previous behavior, no other tasks will receive `InterruptException`s; only explicitly registered handlers may receive them. This behavior allows one or more libraries to register handler tasks which will all be concurrently awoken to handle each interrupt and do whatever is necessary to safely interrupt any running code; the extent to which other tasks are interrupted is arbitrary and library-defined. For example, GPU libraries like AMDGPU.jl can register a handler to safely interrupt GPU kernels running on all GPU queues and do resource cleanup. Concurrently, a complex runtime like the scheduler in Dagger.jl can register a handler to interrupt running tasks on other workers when possible. This commit also adds a more convenient interface for when the REPL is running. When a Ctrl-C is received and the user is not at the REPL prompt, a TerminalMenus-powered prompt will be shown, where the user will have a variety of possible actions, including: - Ignore the interrupt (do nothing) - Activate all module's interrupt handlers - Activate a specific module's interrupt handlers - Disable the interrupt handler (reverting to the old Ctrl-C behavior) - Exit Julia gracefully (with `exit()`) - Exit Julia forcefully (with a `ccall` to `abort`) --- base/Base.jl | 1 + base/client.jl | 3 + base/interrupts.jl | 232 ++++++++++++++++++++++++++++++++++++++ src/gc.c | 1 + src/jl_exported_data.inc | 1 + src/jl_exported_funcs.inc | 1 + src/julia_threads.h | 2 + src/signal-handling.c | 30 +++++ src/signals-unix.c | 8 +- src/signals-win.c | 3 +- src/task.c | 7 +- src/threading.h | 2 + test/stress.jl | 25 ++++ 13 files changed, 311 insertions(+), 5 deletions(-) create mode 100644 base/interrupts.jl diff --git a/base/Base.jl b/base/Base.jl index ecc0f0e5522ede..548ee3e370675a 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -465,6 +465,7 @@ include("timing.jl") include("util.jl") include("client.jl") include("asyncmap.jl") +include("interrupts.jl") # deprecated functions include("deprecated.jl") diff --git a/base/client.jl b/base/client.jl index 6e30c9991e45ef..6f2a277e61dc1f 100644 --- a/base/client.jl +++ b/base/client.jl @@ -331,6 +331,8 @@ function exec_options(opts) banner = (opts.banner == 1) # --banner=yes end run_main_repl(interactiveinput, quiet, banner, history_file, color_set) + else + start_simple_interrupt_handler() end nothing end @@ -414,6 +416,7 @@ function run_main_repl(interactive::Bool, quiet::Bool, banner::Bool, history_fil if interactive && isassigned(REPL_MODULE_REF) invokelatest(REPL_MODULE_REF[]) do REPL + start_repl_interrupt_handler() term_env = get(ENV, "TERM", @static Sys.iswindows() ? "" : "dumb") term = REPL.Terminals.TTYTerminal(term_env, stdin, stdout, stderr) banner && Base.banner(term) diff --git a/base/interrupts.jl b/base/interrupts.jl new file mode 100644 index 00000000000000..3a3042ff7284a5 --- /dev/null +++ b/base/interrupts.jl @@ -0,0 +1,232 @@ +# Internal methods, only to be used to change to a different global interrupt handler +function _register_global_interrupt_handler(handler::Task) + handler_ptr = Base.pointer_from_objref(handler) + slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid}) + Intrinsics.atomic_pointerset(slot_ptr, handler_ptr, :release) +end +function _unregister_global_interrupt_handler() + slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid}) + Intrinsics.atomic_pointerset(slot_ptr, C_NULL, :release) +end + +const INTERRUPT_HANDLERS_LOCK = Threads.ReentrantLock() +const INTERRUPT_HANDLERS = Dict{Module,Vector{Task}}() +const INTERRUPT_HANDLER_RUNNING = Threads.Atomic{Bool}(false) + +""" + register_interrupt_handler(mod::Module, handler::Task) + +Registers the task `handler` to handle interrupts (such as from Ctrl-C). +Handlers are expected to sit idly within a `wait()` call or similar. When an +interrupt is received by Ctrl-C or manual SIGINT, one of two actions may +happen: + +If the REPL is not running (such as when running `julia myscript.jl`), then all +registered interrupt handlers will be woken with an `InterruptException()`, and +the handler may take whatever actions are necessary to gracefully interrupt any +associated running computations. It is expected that the handler will spawn +tasks to perform the graceful interrupt, so that the handler task may return +quickly to again calling `wait()` to catch future user interrupts. + +If the REPL is running, then the user will be presented with a terminal menu +which will allow them to do one of: +- Ignore the interrupt (do nothing) +- Activate all handlers for all modules +- Activate all handlers for a specific module +- Disable this interrupt handler logic (see below for details) +- Exit Julia gracefully (with `exit`) +- Exit Julia forcefully (with a `ccall` to `abort`) + +Note that if the interrupt handler logic is disabled by the above menu option, +Julia will fall back to the old Ctrl-C handling behavior, which has the +potential to cause crashes and undefined behavior (but can also interrupt more +kinds of code). If desired, the interrupt handler logic can be re-enabled by +calling `start_repl_interrupt_handler()`, which will disable the old Ctrl-C +handling behavior. + +To unregister a previously-registered handler, use +[`unregister_interrupt_handler`](@ref). + +!!! warn + Non-yielding tasks may block interrupt handlers from running; this means + that once an interrupt handler is registered, code like `while true end` + may become un-interruptible. +""" +function register_interrupt_handler(mod::Module, handler::Task) + if ccall(:jl_generating_output, Cint, ()) == 1 + throw(ConcurrencyViolationError("Interrupt handlers cannot be registered during precompilation.\nPlease register your handler later (possibly in your module's `__init__`).")) + end + lock(INTERRUPT_HANDLERS_LOCK) do + handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod) + push!(handlers, handler) + end +end + +""" + unregister_interrupt_handler(mod::Module, handler::Task) + +Unregisters the interrupt handler task `handler`; see +[`register_interrupt_handler`](@ref) for further details. +""" +function unregister_interrupt_handler(mod::Module, handler::Task) + if ccall(:jl_generating_output, Cint, ()) == 1 + throw(ConcurrencyViolationError("Interrupt handlers cannot be unregistered during precompilation.")) + end + lock(INTERRUPT_HANDLERS_LOCK) do + handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod) + deleteat!(handlers, findall(==(handler), handlers)) + end +end + +function _throwto_interrupt!(task::Task) + if task.state == :runnable + task._isexception = true + task.result = InterruptException() + schedule(task) + end +end + +function repl_interrupt_handler() + invokelatest(REPL_MODULE_REF[]) do REPL + TerminalMenus = REPL.TerminalMenus + + root_menu = TerminalMenus.RadioMenu( + [ + "Interrupt all", + "Interrupt only...", + "Ignore it", + "Stop handling interrupts", + "Exit Julia", + "Force-exit Julia", + ] + ) + + while true + try + # Wait to be interrupted + wait() + catch err + if !(err isa InterruptException) + rethrow(err) + end + + if length(lock(()->length(INTERRUPT_HANDLERS), INTERRUPT_HANDLERS_LOCK)) == 0 + println("No interrupt handlers were registered, ignoring interrupt...") + continue + end + + # Display root menu + @label display_root + choice = TerminalMenus.request("Interrupt received, select an action:", root_menu) + if choice == 1 + lock(INTERRUPT_HANDLERS_LOCK) do + for mod in keys(INTERRUPT_HANDLERS) + for handler in INTERRUPT_HANDLERS[mod] + if handler.state == :runnable + _throwto_interrupt!(handler) + end + end + end + end + elseif choice == 2 + # Display modules menu + mods = lock(INTERRUPT_HANDLERS_LOCK) do + collect(keys(INTERRUPT_HANDLERS)) + end + length(mods) > 0 || continue + mod_menu = TerminalMenus.RadioMenu(vcat(map(string, mods), "Go Back")) + @label display_mods + choice = TerminalMenus.request("Select a library to interrupt:", mod_menu) + if choice > length(mods) || choice == -1 + @goto display_root + else + lock(INTERRUPT_HANDLERS_LOCK) do + for handler in INTERRUPT_HANDLERS[mods[choice]] + _throwto_interrupt!(handler) + end + end + @goto display_mods + end + elseif choice == 3 || choice == -1 + # Do nothing + elseif choice == 4 + # Exit handler (caller will unregister us) + return + elseif choice == 5 + # Exit Julia cleanly + exit() + elseif choice == 6 + # Force an exit + ccall(:abort, Cvoid, ()) + end + end + end + end +end +function repl_interrupt_handler_checked() + try + repl_interrupt_handler() + catch err + # Some internal error, make sure we start a new handler + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() + start_repl_interrupt_handler() + rethrow() + end + # Clean exit + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() +end + +function start_repl_interrupt_handler() + if Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false + repl_interrupt_handler_task = errormonitor(Threads.@spawn repl_interrupt_handler_checked()) + _register_global_interrupt_handler(repl_interrupt_handler_task) + end +end + + +function simple_interrupt_handler() + while true + try + # Wait to be interrupted + wait() + catch err + if !(err isa InterruptException) + rethrow(err) + end + + # Interrupt all handlers + lock(INTERRUPT_HANDLERS_LOCK) do + for mod in keys(INTERRUPT_HANDLERS) + for handler in INTERRUPT_HANDLERS[mod] + if handler.state == :runnable + _throwto_interrupt!(handler) + end + end + end + end + end + end +end +function simple_interrupt_handler_checked() + try + simple_interrupt_handler() + catch err + # Some internal error, make sure we start a new handler + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() + start_simple_interrupt_handler() + rethrow() + end + # Clean exit + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() +end + +function start_simple_interrupt_handler() + if Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false + simple_interrupt_handler_task = errormonitor(Threads.@spawn simple_interrupt_handler_checked()) + _register_global_interrupt_handler(simple_interrupt_handler_task) + end +end diff --git a/src/gc.c b/src/gc.c index 5390b08cc9e8cf..50b7c1fe91f432 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2990,6 +2990,7 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq) gc_try_claim_and_push(mq, jl_emptytuple_type, NULL); gc_try_claim_and_push(mq, cmpswap_names, NULL); gc_try_claim_and_push(mq, jl_global_roots_table, NULL); + gc_try_claim_and_push(mq, jl_interrupt_handler, NULL); } // find unmarked objects that need to be finalized from the finalizer list "list". diff --git a/src/jl_exported_data.inc b/src/jl_exported_data.inc index 9c1a454020406f..49c4da47fc3f8b 100644 --- a/src/jl_exported_data.inc +++ b/src/jl_exported_data.inc @@ -54,6 +54,7 @@ XX(jl_int8_type) \ XX(jl_interconditional_type) \ XX(jl_interrupt_exception) \ + XX(jl_interrupt_handler) \ XX(jl_intrinsic_type) \ XX(jl_kwcall_func) \ XX(jl_lineinfonode_type) \ diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 7d54d13d699d01..855f150cd9464e 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -514,6 +514,7 @@ XX(jl_uncompress_ir) \ XX(jl_undefined_var_error) \ XX(jl_has_no_field_error) \ + XX(jl_unregister_interrupt_handler) \ XX(jl_value_ptr) \ XX(jl_ver_is_release) \ XX(jl_ver_major) \ diff --git a/src/julia_threads.h b/src/julia_threads.h index d4cbb88e619ba7..219c79992522ea 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -377,6 +377,8 @@ JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void); JL_DLLEXPORT void jl_wakeup_thread(int16_t tid); +JL_DLLEXPORT void jl_schedule_task(struct _jl_task_t *task); + #ifdef __cplusplus } #endif diff --git a/src/signal-handling.c b/src/signal-handling.c index 284ad359f3799e..079ce413fc1b60 100644 --- a/src/signal-handling.c +++ b/src/signal-handling.c @@ -303,6 +303,36 @@ static void jl_check_profile_autostop(void) } } +// Graceful interrupt handler + +JL_DLLEXPORT _Atomic(jl_task_t *) jl_interrupt_handler JL_GLOBALLY_ROOTED = NULL; +static _Atomic(int) handle_interrupt = 0; +JL_DLLEXPORT void jl_schedule_interrupt_handler(void) +{ + if (jl_atomic_exchange_relaxed(&handle_interrupt, 0) != 1) + return; + jl_task_t *handler = jl_atomic_load_relaxed(&jl_interrupt_handler); + if (!handler) + return; + assert(jl_is_task(handler)); + if (handler->ptls) + return; + if (jl_atomic_load_relaxed(&handler->_state) != JL_TASK_STATE_RUNNABLE) + return; + handler->result = jl_interrupt_exception; + handler->_isexception = 1; + jl_schedule_task(handler); +} +static int want_interrupt_handler(void) +{ + if (jl_atomic_load_relaxed(&jl_interrupt_handler)) { + // Set flag to trigger user handlers on next task switch + jl_atomic_store_relaxed(&handle_interrupt, 1); + return 1; + } + return 0; +} + #if defined(_WIN32) #include "signals-win.c" #else diff --git a/src/signals-unix.c b/src/signals-unix.c index 4c21d25d3622c3..65b547a990cf11 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -527,9 +527,9 @@ void usr2_handler(int sig, siginfo_t *info, void *ctx) int force = jl_check_force_sigint(); if (force || (!ptls->defer_signal && ptls->io_wait)) { jl_safepoint_consume_sigint(); + // Force a throw if (force) jl_safe_printf("WARNING: Force throwing a SIGINT\n"); - // Force a throw jl_clear_force_sigint(); jl_throw_in_ctx(ct, jl_interrupt_exception, sig, ctx); } @@ -767,7 +767,7 @@ static void *signal_listener(void *arg) profile = (sig == SIGUSR1); #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 199309L if (profile && !(info.si_code == SI_TIMER && - info.si_value.sival_ptr == &timerprof)) + info.si_value.sival_ptr == &timerprof)) profile = 0; #endif #endif @@ -780,6 +780,10 @@ static void *signal_listener(void *arg) else if (exit_on_sigint) { critical = 1; } + // FIXME: Skip this if force + else if (want_interrupt_handler()) { + continue; + } else { jl_try_deliver_sigint(); continue; diff --git a/src/signals-win.c b/src/signals-win.c index 5dd6b34558ca6d..70bd48d1f47cd0 100644 --- a/src/signals-win.c +++ b/src/signals-win.c @@ -221,7 +221,8 @@ static BOOL WINAPI sigint_handler(DWORD wsig) //This needs winapi types to guara if (!jl_ignore_sigint()) { if (exit_on_sigint) jl_exit(128 + sig); // 128 + SIGINT - jl_try_deliver_sigint(); + if (!want_interrupt_handler()) + jl_try_deliver_sigint(); } return 1; } diff --git a/src/task.c b/src/task.c index 1dab8688cb0796..f579d8090d0616 100644 --- a/src/task.c +++ b/src/task.c @@ -621,8 +621,12 @@ JL_NO_ASAN static void ctx_switch(jl_task_t *lastt) sanitizer_finish_switch_fiber(ptls->previous_task, jl_atomic_load_relaxed(&ptls->current_task)); } +JL_DLLIMPORT void jl_schedule_interrupt_handler(void); + JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER { + jl_schedule_interrupt_handler(); + jl_task_t *ct = jl_current_task; jl_ptls_t ptls = ct->ptls; jl_task_t *t = ptls->next_task; @@ -1140,7 +1144,7 @@ JL_DLLEXPORT void jl_task_wait() jl_apply(&wait_func, 1); ct->world_age = last_age; } - +#endif JL_DLLEXPORT void jl_schedule_task(jl_task_t *task) { static jl_function_t *sched_func = NULL; @@ -1154,7 +1158,6 @@ JL_DLLEXPORT void jl_schedule_task(jl_task_t *task) jl_apply(args, 2); ct->world_age = last_age; } -#endif // Do one-time initializations for task system void jl_init_tasks(void) JL_GC_DISABLED diff --git a/src/threading.h b/src/threading.h index 73d2cd73fb70d2..562880d017b8ba 100644 --- a/src/threading.h +++ b/src/threading.h @@ -29,6 +29,8 @@ void jl_gc_mark_threadfun(void *arg); void jl_gc_sweep_threadfun(void *arg); void jl_threadfun(void *arg); +extern _Atomic(jl_task_t*) jl_interrupt_handler JL_GLOBALLY_ROOTED; + #ifdef __cplusplus } #endif diff --git a/test/stress.jl b/test/stress.jl index b9fb720f0596ae..031853f03b9a74 100644 --- a/test/stress.jl +++ b/test/stress.jl @@ -77,6 +77,10 @@ end # !Sys.iswindows # sig 2 is SIGINT per the POSIX.1-1990 standard if !Sys.iswindows() Base.exit_on_sigint(false) + + # test old interrupt behavior + Base._unregister_global_interrupt_handler() + Base.INTERRUPT_HANDLER_RUNNING[] = false @test_throws InterruptException begin ccall(:kill, Cvoid, (Cint, Cint,), getpid(), 2) for i in 1:10 @@ -84,5 +88,26 @@ if !Sys.iswindows() ccall(:jl_gc_safepoint, Cvoid, ()) # wait for SIGINT to arrive end end + + # interrupt handlers + Base.start_simple_interrupt_handler() + let exc_ref = Ref{Any}() + handler = Threads.@spawn begin + try + wait() + catch exc + exc_ref[] = exc + end + end + yield() # let the handler start + Base.register_interrupt_handler(Base, handler) + ccall(:kill, Cvoid, (Cint, Cint,), getpid(), 2) + for i in 1:10 + Libc.systemsleep(0.1) + yield() # wait for the handler to be run + end + Base.unregister_interrupt_handler(Base, handler) + @test isassigned(exc_ref) && exc_ref[] isa InterruptException + end Base.exit_on_sigint(true) end