From 9ddbf8b503d45f1cf67044cbd1a3285cdc8b362a Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 25 Apr 2023 10:44:35 -0700 Subject: [PATCH] signal handling: User-defined interrupt handlers Interrupt handling is a tricky problem, not just in terms of implementation, but in terms of desired behavior: when an interrupt is received, which code should handle it? Julia's current answer to this is effectively to throw an `InterruptException` to the first task to hit a safepoint. While this seems sensible (the code that's running gets interrupted), it only really works for very basic numerical code. In the case that multiple tasks are running concurrently, or when try-catch handlers are registered, this system breaks down, and results in unpredictable behavior. This unpredictable behavior includes: - Interrupting background/runtime tasks which don't want to be interrupted, as they do little bits of important work (and are critical to library runtime functionality) - Interrupting only one task, when multiple coordinating tasks would want to receive the interrupt to safely terminate a computation - Interrupting only one library's task, when multiple libraries really would want to be notified about the interrupt The above behavior makes it nearly impossible to provide reliable Ctrl-C behavior, and results in very confused users who get stuck hitting Ctrl-C continuously, sometimes getting caught in a hang, sometimes triggering unrelated exception handling code they didn't mean to, sometimes getting a segfault, and very rarely getting the behavior they desire (with unpredictable safety of being able to continue using the active session as intended). This commit provides an alternative behavior for interrupts which is more predictable: user code may now register tasks as "interrupt handlers" (via `Base.register_interrupt_handler`), which will be guaranteed to receive an `InterruptException` whenever the session receives an interrupt signal. Additionally, unlike the previous behavior, no other tasks will receive `InterruptException`s; only explicitly registered handlers may receive them. This behavior allows one or more libraries to register handler tasks which will all be concurrently awoken to handle each interrupt and do whatever is necessary to safely interrupt any running code; the extent to which other tasks are interrupted is arbitrary and library-defined. For example, GPU libraries like AMDGPU.jl can register a handler to safely interrupt GPU kernels running on all GPU queues and do resource cleanup. Concurrently, a complex runtime like the scheduler in Dagger.jl can register a handler to interrupt running tasks on other workers when possible. This commit also adds a more convenient interface for when the REPL is running. When a Ctrl-C is received and the user is not at the REPL prompt, a TerminalMenus-powered prompt will be shown, where the user will have a variety of possible actions, including: - Ignore the interrupt (do nothing) - Activate all module's interrupt handlers - Activate a specific module's interrupt handlers - Disable the interrupt handler (reverting to the old Ctrl-C behavior) - Exit Julia gracefully (with `exit()`) - Exit Julia forcefully (with a `ccall` to `abort`) --- base/Base.jl | 1 + base/client.jl | 5 + base/interrupts.jl | 243 ++++++++++++++++++++++++++++++++++++++ src/gc.c | 1 + src/jl_exported_data.inc | 1 + src/jl_exported_funcs.inc | 2 + src/julia_threads.h | 5 + src/partr.c | 8 +- src/signal-handling.c | 27 +++++ src/signals-unix.c | 18 ++- src/signals-win.c | 3 +- src/task.c | 5 +- src/threading.h | 2 + test/stress.jl | 25 ++++ 14 files changed, 338 insertions(+), 8 deletions(-) create mode 100644 base/interrupts.jl diff --git a/base/Base.jl b/base/Base.jl index fbc39e8104fba..0ad52b04bb25a 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -465,6 +465,7 @@ include("timing.jl") include("util.jl") include("client.jl") include("asyncmap.jl") +include("interrupts.jl") # deprecated functions include("deprecated.jl") diff --git a/base/client.jl b/base/client.jl index 0b11a330cf179..f4e42591e4aed 100644 --- a/base/client.jl +++ b/base/client.jl @@ -260,6 +260,9 @@ function exec_options(opts) # remove filename from ARGS global PROGRAM_FILE = arg_is_program ? popfirst!(ARGS) : "" + # start basic interrupt handler + start_simple_interrupt_handler(;force=true) + # Load Distributed module only if any of the Distributed options have been specified. distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machine_file != C_NULL) if distributed_mode @@ -323,6 +326,7 @@ function exec_options(opts) end end end + if repl || is_interactive::Bool b = opts.banner auto = b == -1 @@ -413,6 +417,7 @@ function run_main_repl(interactive::Bool, quiet::Bool, banner::Symbol, history_f if interactive && isassigned(REPL_MODULE_REF) invokelatest(REPL_MODULE_REF[]) do REPL + start_repl_interrupt_handler(;force=true) term_env = get(ENV, "TERM", @static Sys.iswindows() ? "" : "dumb") global current_terminfo = load_terminfo(term_env) term = REPL.Terminals.TTYTerminal(term_env, stdin, stdout, stderr) diff --git a/base/interrupts.jl b/base/interrupts.jl new file mode 100644 index 0000000000000..4dc7ba9ed93f7 --- /dev/null +++ b/base/interrupts.jl @@ -0,0 +1,243 @@ +# Internal methods, only to be used to change to a different global interrupt handler +function _register_global_interrupt_handler(handler::Task) + handler_ptr = Base.pointer_from_objref(handler) + slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid}) + Intrinsics.atomic_pointerset(slot_ptr, handler_ptr, :release) +end +function _unregister_global_interrupt_handler() + slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid}) + Intrinsics.atomic_pointerset(slot_ptr, C_NULL, :release) +end + +const INTERRUPT_HANDLERS_LOCK = Threads.ReentrantLock() +const INTERRUPT_HANDLERS = Dict{Module,Vector{Task}}() +const INTERRUPT_HANDLER_RUNNING = Threads.Atomic{Bool}(false) + +""" + register_interrupt_handler(mod::Module, handler::Task) + +Registers the task `handler` to handle interrupts (such as from Ctrl-C). +Handlers are expected to sit idly within a `wait()` call or similar. When an +interrupt is received by Ctrl-C or manual SIGINT, one of two actions may +happen: + +If the REPL is not running (such as when running `julia myscript.jl`), then all +registered interrupt handlers will be woken with an `InterruptException()`, and +the handler may take whatever actions are necessary to gracefully interrupt any +associated running computations. It is expected that the handler will spawn +tasks to perform the graceful interrupt, so that the handler task may return +quickly to again calling `wait()` to catch future user interrupts. + +If the REPL is running, then the user will be presented with a terminal menu +which will allow them to do one of: +- Ignore the interrupt (do nothing) +- Activate all handlers for all modules +- Activate all handlers for a specific module +- Disable this interrupt handler logic (see below for details) +- Exit Julia gracefully (with `exit`) +- Exit Julia forcefully (with a `ccall` to `abort`) + +Note that if the interrupt handler logic is disabled by the above menu option, +Julia will fall back to the old Ctrl-C handling behavior, which has the +potential to cause crashes and undefined behavior (but can also interrupt more +kinds of code). If desired, the interrupt handler logic can be re-enabled by +calling `start_repl_interrupt_handler()`, which will disable the old Ctrl-C +handling behavior. + +To unregister a previously-registered handler, use +[`unregister_interrupt_handler`](@ref). + +!!! warn + Non-yielding tasks may block interrupt handlers from running; this means + that once an interrupt handler is registered, code like `while true end` + may become un-interruptible. +""" +function register_interrupt_handler(mod::Module, handler::Task) + if ccall(:jl_generating_output, Cint, ()) == 1 + throw(ConcurrencyViolationError("Interrupt handlers cannot be registered during precompilation.\nPlease register your handler later (possibly in your module's `__init__`).")) + end + lock(INTERRUPT_HANDLERS_LOCK) do + handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod) + push!(handlers, handler) + end +end + +""" + unregister_interrupt_handler(mod::Module, handler::Task) + +Unregisters the interrupt handler task `handler`; see +[`register_interrupt_handler`](@ref) for further details. +""" +function unregister_interrupt_handler(mod::Module, handler::Task) + if ccall(:jl_generating_output, Cint, ()) == 1 + throw(ConcurrencyViolationError("Interrupt handlers cannot be unregistered during precompilation.")) + end + lock(INTERRUPT_HANDLERS_LOCK) do + handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod) + deleteat!(handlers, findall(==(handler), handlers)) + end +end + +function _throwto_interrupt!(task::Task) + if task.state == :runnable + task._isexception = true + task.result = InterruptException() + try + schedule(task) + catch + end + end +end + +# Simple (no TUI) interrupt handler + +function simple_interrupt_handler() + last_time = 0.0 + while true + try + # Wait to be interrupted + wait() + catch err + if !(err isa InterruptException) + rethrow(err) + end + + # Force-interrupt root task if two interrupts in quick succession (< 1s) + now_time = time() + diff_time = now_time - last_time + last_time = now_time + if diff_time < 1 + _throwto_interrupt!(Base.roottask) + end + + # Interrupt all handlers + lock(INTERRUPT_HANDLERS_LOCK) do + for mod in keys(INTERRUPT_HANDLERS) + for handler in INTERRUPT_HANDLERS[mod] + if handler.state == :runnable + _throwto_interrupt!(handler) + end + end + end + end + end + end +end +function simple_interrupt_handler_checked() + try + simple_interrupt_handler() + catch err + # Some internal error, make sure we start a new handler + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() + start_simple_interrupt_handler() + rethrow() + end + # Clean exit + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() +end +function start_simple_interrupt_handler(; force::Bool=false) + if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force + simple_interrupt_handler_task = errormonitor(Threads.@spawn simple_interrupt_handler_checked()) + _register_global_interrupt_handler(simple_interrupt_handler_task) + end +end + +# REPL (TUI) interrupt handler + +function repl_interrupt_handler() + invokelatest(REPL_MODULE_REF[]) do REPL + TerminalMenus = REPL.TerminalMenus + + root_menu = TerminalMenus.RadioMenu( + [ + "Interrupt all", + "Interrupt only...", + "Interrupt root task (REPL/script)", + "Ignore it", + "Stop handling interrupts", + "Exit Julia", + "Force-exit Julia", + ] + ) + + while true + try + # Wait to be interrupted + wait() + catch err + if !(err isa InterruptException) + rethrow(err) + end + + # Display root menu + @label display_root + choice = TerminalMenus.request("Interrupt received, select an action:", root_menu) + if choice == 1 + lock(INTERRUPT_HANDLERS_LOCK) do + for mod in keys(INTERRUPT_HANDLERS) + for handler in INTERRUPT_HANDLERS[mod] + if handler.state == :runnable + _throwto_interrupt!(handler) + end + end + end + end + elseif choice == 2 + # Display modules menu + mods = lock(INTERRUPT_HANDLERS_LOCK) do + collect(keys(INTERRUPT_HANDLERS)) + end + mod_menu = TerminalMenus.RadioMenu(vcat(map(string, mods), "Go Back")) + @label display_mods + choice = TerminalMenus.request("Select a library to interrupt:", mod_menu) + if choice > length(mods) || choice == -1 + @goto display_root + else + lock(INTERRUPT_HANDLERS_LOCK) do + for handler in INTERRUPT_HANDLERS[mods[choice]] + _throwto_interrupt!(handler) + end + end + @goto display_mods + end + elseif choice == 3 + # Force-interrupt root task + _throwto_interrupt!(Base.roottask) + elseif choice == 4 || choice == -1 + # Do nothing + elseif choice == 5 + # Exit handler (caller will unregister us) + return + elseif choice == 6 + # Exit Julia cleanly + exit() + elseif choice == 7 + # Force an exit + ccall(:abort, Cvoid, ()) + end + end + end + end +end +function repl_interrupt_handler_checked() + try + repl_interrupt_handler() + catch err + # Some internal error, make sure we start a new handler + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() + start_repl_interrupt_handler() + rethrow() + end + # Clean exit + Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false) + _unregister_global_interrupt_handler() +end +function start_repl_interrupt_handler(; force::Bool=false) + if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force + repl_interrupt_handler_task = errormonitor(Threads.@spawn repl_interrupt_handler_checked()) + _register_global_interrupt_handler(repl_interrupt_handler_task) + end +end diff --git a/src/gc.c b/src/gc.c index cf04641d1fb69..d296f9a444cd0 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2981,6 +2981,7 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq) gc_try_claim_and_push(mq, jl_emptytuple_type, NULL); gc_try_claim_and_push(mq, cmpswap_names, NULL); gc_try_claim_and_push(mq, jl_global_roots_table, NULL); + gc_try_claim_and_push(mq, jl_atomic_load_relaxed(&jl_interrupt_handler), NULL); } // find unmarked objects that need to be finalized from the finalizer list "list". diff --git a/src/jl_exported_data.inc b/src/jl_exported_data.inc index 2acde218a104c..e3b603d8c43f0 100644 --- a/src/jl_exported_data.inc +++ b/src/jl_exported_data.inc @@ -55,6 +55,7 @@ XX(jl_int8_type) \ XX(jl_interconditional_type) \ XX(jl_interrupt_exception) \ + XX(jl_interrupt_handler) \ XX(jl_intrinsic_type) \ XX(jl_kwcall_func) \ XX(jl_lineinfonode_type) \ diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 228c88109f322..966d6a24f8be6 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -416,6 +416,7 @@ XX(jl_running_on_valgrind) \ XX(jl_safe_printf) \ XX(jl_SC_CLK_TCK) \ + XX(jl_schedule_interrupt_handler) \ XX(jl_set_ARGS) \ XX(jl_set_const) \ XX(jl_set_errno) \ @@ -523,6 +524,7 @@ XX(jl_ver_string) \ XX(jl_vexceptionf) \ XX(jl_vprintf) \ + XX(jl_wake_thread) \ XX(jl_wakeup_thread) \ XX(jl_write_compiler_output) \ XX(jl_yield) \ diff --git a/src/julia_threads.h b/src/julia_threads.h index d4cbb88e619ba..f31a23372bdd7 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -375,8 +375,13 @@ JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct); extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers; JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void); +JL_DLLEXPORT int jl_wake_thread(int16_t tid); JL_DLLEXPORT void jl_wakeup_thread(int16_t tid); +JL_DLLEXPORT void jl_schedule_task(struct _jl_task_t *task); + +JL_DLLEXPORT void jl_schedule_interrupt_handler(void); + #ifdef __cplusplus } #endif diff --git a/src/partr.c b/src/partr.c index 75d6d832fe78f..2e12ac800ccdc 100644 --- a/src/partr.c +++ b/src/partr.c @@ -241,7 +241,7 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) } -static int wake_thread(int16_t tid) JL_NOTSAFEPOINT +JL_DLLEXPORT int jl_wake_thread(int16_t tid) JL_NOTSAFEPOINT { jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; int8_t state = sleeping; @@ -287,7 +287,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT } else { // something added to the sticky-queue: notify that thread - if (wake_thread(tid) && uvlock != ct) { + if (jl_wake_thread(tid) && uvlock != ct) { // check if we need to notify uv_run too jl_fence(); jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; @@ -308,7 +308,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT int nthreads = jl_atomic_load_acquire(&jl_n_threads); for (tid = 0; tid < nthreads; tid++) { if (tid != self) - anysleep |= wake_thread(tid); + anysleep |= jl_wake_thread(tid); } // check if we need to notify uv_run too if (uvlock != ct && anysleep) { @@ -378,6 +378,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, uint64_t start_cycles = 0; while (1) { + jl_schedule_interrupt_handler(); + jl_task_t *task = get_next_task(trypoptask, q); if (task) return task; diff --git a/src/signal-handling.c b/src/signal-handling.c index 284ad359f3799..5ab2c45557ec7 100644 --- a/src/signal-handling.c +++ b/src/signal-handling.c @@ -303,6 +303,33 @@ static void jl_check_profile_autostop(void) } } +// Graceful interrupt handler + +static _Atomic(int) handle_interrupt = 0; +JL_DLLEXPORT void jl_schedule_interrupt_handler(void) +{ + if (jl_atomic_exchange_relaxed(&handle_interrupt, 0) != 1) + return; + jl_task_t *handler = jl_atomic_load_relaxed(&jl_interrupt_handler); + if (!handler) + return; + assert(jl_is_task(handler)); + if (jl_atomic_load_relaxed(&handler->_state) != JL_TASK_STATE_RUNNABLE) + return; + handler->result = jl_interrupt_exception; + jl_atomic_store_relaxed(&handler->_isexception, 1); + jl_schedule_task(handler); +} +static int want_interrupt_handler(void) +{ + if (jl_atomic_load_relaxed(&jl_interrupt_handler)) { + // Set flag to trigger user handlers on next task switch + jl_atomic_store_relaxed(&handle_interrupt, 1); + return 1; + } + return 0; +} + #if defined(_WIN32) #include "signals-win.c" #else diff --git a/src/signals-unix.c b/src/signals-unix.c index b2056947e2b8a..7beed3492ec7c 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -527,9 +527,9 @@ void usr2_handler(int sig, siginfo_t *info, void *ctx) int force = jl_check_force_sigint(); if (force || (!ptls->defer_signal && ptls->io_wait)) { jl_safepoint_consume_sigint(); + // Force a throw if (force) jl_safe_printf("WARNING: Force throwing a SIGINT\n"); - // Force a throw jl_clear_force_sigint(); jl_throw_in_ctx(ct, jl_interrupt_exception, sig, ctx); } @@ -602,6 +602,16 @@ JL_DLLEXPORT void jl_profile_stop_timer(void) #endif #endif // HAVE_MACH +static void jl_deliver_handled_sigint(void) +{ + jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[0]; + jl_wake_libuv(); + jl_atomic_store_release(&other->signal_request, 2); + // This also makes sure `sleep` is aborted. + pthread_kill(other->system_id, SIGUSR2); + jl_wake_thread(0); +} + static void allocate_segv_handler(void) { struct sigaction act; @@ -767,7 +777,7 @@ static void *signal_listener(void *arg) profile = (sig == SIGUSR1); #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 199309L if (profile && !(info.si_code == SI_TIMER && - info.si_value.sival_ptr == &timerprof)) + info.si_value.sival_ptr == &timerprof)) profile = 0; #endif #endif @@ -780,6 +790,10 @@ static void *signal_listener(void *arg) else if (exit_on_sigint) { critical = 1; } + else if (want_interrupt_handler()) { + jl_deliver_handled_sigint(); + continue; + } else { jl_try_deliver_sigint(); continue; diff --git a/src/signals-win.c b/src/signals-win.c index 7cd3b02462851..bc4fae966b4c6 100644 --- a/src/signals-win.c +++ b/src/signals-win.c @@ -221,7 +221,8 @@ static BOOL WINAPI sigint_handler(DWORD wsig) //This needs winapi types to guara if (!jl_ignore_sigint()) { if (exit_on_sigint) jl_exit(128 + sig); // 128 + SIGINT - jl_try_deliver_sigint(); + if (!want_interrupt_handler()) + jl_try_deliver_sigint(); } return 1; } diff --git a/src/task.c b/src/task.c index 73d9033f0cb50..73fe0f68af078 100644 --- a/src/task.c +++ b/src/task.c @@ -623,6 +623,8 @@ JL_NO_ASAN static void ctx_switch(jl_task_t *lastt) JL_DLLEXPORT void jl_switch(void) JL_NOTSAFEPOINT_LEAVE JL_NOTSAFEPOINT_ENTER { + jl_schedule_interrupt_handler(); + jl_task_t *ct = jl_current_task; jl_ptls_t ptls = ct->ptls; jl_task_t *t = ptls->next_task; @@ -1140,7 +1142,7 @@ JL_DLLEXPORT void jl_task_wait() jl_apply(&wait_func, 1); ct->world_age = last_age; } - +#endif JL_DLLEXPORT void jl_schedule_task(jl_task_t *task) { static jl_function_t *sched_func = NULL; @@ -1154,7 +1156,6 @@ JL_DLLEXPORT void jl_schedule_task(jl_task_t *task) jl_apply(args, 2); ct->world_age = last_age; } -#endif // Do one-time initializations for task system void jl_init_tasks(void) JL_GC_DISABLED diff --git a/src/threading.h b/src/threading.h index 73d2cd73fb70d..003aa004e53cf 100644 --- a/src/threading.h +++ b/src/threading.h @@ -29,6 +29,8 @@ void jl_gc_mark_threadfun(void *arg); void jl_gc_sweep_threadfun(void *arg); void jl_threadfun(void *arg); +extern JL_DLLIMPORT _Atomic(jl_task_t*) jl_interrupt_handler JL_GLOBALLY_ROOTED; + #ifdef __cplusplus } #endif diff --git a/test/stress.jl b/test/stress.jl index b9fb720f0596a..031853f03b9a7 100644 --- a/test/stress.jl +++ b/test/stress.jl @@ -77,6 +77,10 @@ end # !Sys.iswindows # sig 2 is SIGINT per the POSIX.1-1990 standard if !Sys.iswindows() Base.exit_on_sigint(false) + + # test old interrupt behavior + Base._unregister_global_interrupt_handler() + Base.INTERRUPT_HANDLER_RUNNING[] = false @test_throws InterruptException begin ccall(:kill, Cvoid, (Cint, Cint,), getpid(), 2) for i in 1:10 @@ -84,5 +88,26 @@ if !Sys.iswindows() ccall(:jl_gc_safepoint, Cvoid, ()) # wait for SIGINT to arrive end end + + # interrupt handlers + Base.start_simple_interrupt_handler() + let exc_ref = Ref{Any}() + handler = Threads.@spawn begin + try + wait() + catch exc + exc_ref[] = exc + end + end + yield() # let the handler start + Base.register_interrupt_handler(Base, handler) + ccall(:kill, Cvoid, (Cint, Cint,), getpid(), 2) + for i in 1:10 + Libc.systemsleep(0.1) + yield() # wait for the handler to be run + end + Base.unregister_interrupt_handler(Base, handler) + @test isassigned(exc_ref) && exc_ref[] isa InterruptException + end Base.exit_on_sigint(true) end