Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signal handling: User-defined interrupt handlers #49541

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ include("timing.jl")
include("util.jl")
include("client.jl")
include("asyncmap.jl")
include("interrupts.jl")

# deprecated functions
include("deprecated.jl")
Expand Down
5 changes: 5 additions & 0 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ function exec_options(opts)
# remove filename from ARGS
global PROGRAM_FILE = arg_is_program ? popfirst!(ARGS) : ""

# start basic interrupt handler
start_simple_interrupt_handler(;force=true)

# Load Distributed module only if any of the Distributed options have been specified.
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machine_file != C_NULL)
if distributed_mode
Expand Down Expand Up @@ -323,6 +326,7 @@ function exec_options(opts)
end
end
end

if repl || is_interactive::Bool
b = opts.banner
auto = b == -1
Expand Down Expand Up @@ -413,6 +417,7 @@ function run_main_repl(interactive::Bool, quiet::Bool, banner::Symbol, history_f

if interactive && isassigned(REPL_MODULE_REF)
invokelatest(REPL_MODULE_REF[]) do REPL
start_repl_interrupt_handler(;force=true)
term_env = get(ENV, "TERM", @static Sys.iswindows() ? "" : "dumb")
global current_terminfo = load_terminfo(term_env)
term = REPL.Terminals.TTYTerminal(term_env, stdin, stdout, stderr)
Expand Down
243 changes: 243 additions & 0 deletions base/interrupts.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# Internal methods, only to be used to change to a different global interrupt handler
function _register_global_interrupt_handler(handler::Task)
handler_ptr = Base.pointer_from_objref(handler)
slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid})
Intrinsics.atomic_pointerset(slot_ptr, handler_ptr, :release)
end
function _unregister_global_interrupt_handler()
slot_ptr = cglobal(:jl_interrupt_handler, Ptr{Cvoid})
Intrinsics.atomic_pointerset(slot_ptr, C_NULL, :release)
end

const INTERRUPT_HANDLERS_LOCK = Threads.ReentrantLock()
const INTERRUPT_HANDLERS = Dict{Module,Vector{Task}}()
const INTERRUPT_HANDLER_RUNNING = Threads.Atomic{Bool}(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these atomics deprecated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you use instead? The @atomics interface doesn't provide an alternative for a single but entirely atomically accessed resource, as far as I'm aware.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can turn it into a struct, but there's only ever one instance of it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make it just a global, but currently those don't support custom atomic annotations (they are always just release/consume)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "clean" solution would be to have a struct with @atomic fields, stored in a global Ref. The constructor would check if the Ref is already assigned and refuse being instantiated otherwise.


"""
register_interrupt_handler(mod::Module, handler::Task)

Registers the task `handler` to handle interrupts (such as from Ctrl-C).
Handlers are expected to sit idly within a `wait()` call or similar. When an
interrupt is received by Ctrl-C or manual SIGINT, one of two actions may
happen:

If the REPL is not running (such as when running `julia myscript.jl`), then all
registered interrupt handlers will be woken with an `InterruptException()`, and
the handler may take whatever actions are necessary to gracefully interrupt any
associated running computations. It is expected that the handler will spawn
tasks to perform the graceful interrupt, so that the handler task may return
quickly to again calling `wait()` to catch future user interrupts.

If the REPL is running, then the user will be presented with a terminal menu
which will allow them to do one of:
- Ignore the interrupt (do nothing)
- Activate all handlers for all modules
- Activate all handlers for a specific module
- Disable this interrupt handler logic (see below for details)
- Exit Julia gracefully (with `exit`)
- Exit Julia forcefully (with a `ccall` to `abort`)

Note that if the interrupt handler logic is disabled by the above menu option,
Julia will fall back to the old Ctrl-C handling behavior, which has the
potential to cause crashes and undefined behavior (but can also interrupt more
kinds of code). If desired, the interrupt handler logic can be re-enabled by
calling `start_repl_interrupt_handler()`, which will disable the old Ctrl-C
handling behavior.

To unregister a previously-registered handler, use
[`unregister_interrupt_handler`](@ref).

!!! warn
Non-yielding tasks may block interrupt handlers from running; this means
that once an interrupt handler is registered, code like `while true end`
may become un-interruptible.
"""
function register_interrupt_handler(mod::Module, handler::Task)
if ccall(:jl_generating_output, Cint, ()) == 1
throw(ConcurrencyViolationError("Interrupt handlers cannot be registered during precompilation.\nPlease register your handler later (possibly in your module's `__init__`)."))
end
lock(INTERRUPT_HANDLERS_LOCK) do
handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod)
push!(handlers, handler)
end
end

"""
unregister_interrupt_handler(mod::Module, handler::Task)

Unregisters the interrupt handler task `handler`; see
[`register_interrupt_handler`](@ref) for further details.
"""
function unregister_interrupt_handler(mod::Module, handler::Task)
if ccall(:jl_generating_output, Cint, ()) == 1
throw(ConcurrencyViolationError("Interrupt handlers cannot be unregistered during precompilation."))
end
lock(INTERRUPT_HANDLERS_LOCK) do
handlers = get!(Vector{Task}, INTERRUPT_HANDLERS, mod)
deleteat!(handlers, findall(==(handler), handlers))
end
end

function _throwto_interrupt!(task::Task)
if task.state == :runnable
task._isexception = true
task.result = InterruptException()
try
schedule(task)
catch
end
end
end

# Simple (no TUI) interrupt handler

function simple_interrupt_handler()
last_time = 0.0
while true
try
# Wait to be interrupted
wait()
catch err
if !(err isa InterruptException)
rethrow(err)
end

# Force-interrupt root task if two interrupts in quick succession (< 1s)
now_time = time()
diff_time = now_time - last_time
last_time = now_time
if diff_time < 1
_throwto_interrupt!(Base.roottask)
end

# Interrupt all handlers
lock(INTERRUPT_HANDLERS_LOCK) do
for mod in keys(INTERRUPT_HANDLERS)
for handler in INTERRUPT_HANDLERS[mod]
if handler.state == :runnable
_throwto_interrupt!(handler)
end
end
end
end
end
end
end
function simple_interrupt_handler_checked()
try
simple_interrupt_handler()
catch err
# Some internal error, make sure we start a new handler
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
start_simple_interrupt_handler()
rethrow()
end
# Clean exit
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
end
function start_simple_interrupt_handler(; force::Bool=false)
if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force
simple_interrupt_handler_task = errormonitor(Threads.@spawn simple_interrupt_handler_checked())
_register_global_interrupt_handler(simple_interrupt_handler_task)
end
end

# REPL (TUI) interrupt handler

function repl_interrupt_handler()
invokelatest(REPL_MODULE_REF[]) do REPL
TerminalMenus = REPL.TerminalMenus

root_menu = TerminalMenus.RadioMenu(
[
"Interrupt all",
"Interrupt only...",
"Interrupt root task (REPL/script)",
"Ignore it",
"Stop handling interrupts",
"Exit Julia",
"Force-exit Julia",
]
)

while true
try
# Wait to be interrupted
wait()
catch err
if !(err isa InterruptException)
rethrow(err)
end

# Display root menu
@label display_root
choice = TerminalMenus.request("Interrupt received, select an action:", root_menu)
if choice == 1
lock(INTERRUPT_HANDLERS_LOCK) do
for mod in keys(INTERRUPT_HANDLERS)
for handler in INTERRUPT_HANDLERS[mod]
if handler.state == :runnable
_throwto_interrupt!(handler)
end
end
end
end
elseif choice == 2
# Display modules menu
mods = lock(INTERRUPT_HANDLERS_LOCK) do
collect(keys(INTERRUPT_HANDLERS))
end
mod_menu = TerminalMenus.RadioMenu(vcat(map(string, mods), "Go Back"))
@label display_mods
choice = TerminalMenus.request("Select a library to interrupt:", mod_menu)
if choice > length(mods) || choice == -1
@goto display_root
else
lock(INTERRUPT_HANDLERS_LOCK) do
for handler in INTERRUPT_HANDLERS[mods[choice]]
_throwto_interrupt!(handler)
end
end
@goto display_mods
end
elseif choice == 3
# Force-interrupt root task
_throwto_interrupt!(Base.roottask)
elseif choice == 4 || choice == -1
# Do nothing
elseif choice == 5
# Exit handler (caller will unregister us)
return
elseif choice == 6
# Exit Julia cleanly
exit()
elseif choice == 7
# Force an exit
ccall(:abort, Cvoid, ())
end
end
end
end
end
function repl_interrupt_handler_checked()
try
repl_interrupt_handler()
catch err
# Some internal error, make sure we start a new handler
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
start_repl_interrupt_handler()
rethrow()
end
# Clean exit
Threads.atomic_xchg!(INTERRUPT_HANDLER_RUNNING, false)
_unregister_global_interrupt_handler()
end
function start_repl_interrupt_handler(; force::Bool=false)
if (Threads.atomic_cas!(INTERRUPT_HANDLER_RUNNING, false, true) == false) || force
repl_interrupt_handler_task = errormonitor(Threads.@spawn repl_interrupt_handler_checked())
_register_global_interrupt_handler(repl_interrupt_handler_task)
end
end
1 change: 1 addition & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2981,6 +2981,7 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq)
gc_try_claim_and_push(mq, jl_emptytuple_type, NULL);
gc_try_claim_and_push(mq, cmpswap_names, NULL);
gc_try_claim_and_push(mq, jl_global_roots_table, NULL);
gc_try_claim_and_push(mq, jl_atomic_load_relaxed(&jl_interrupt_handler), NULL);
}

// find unmarked objects that need to be finalized from the finalizer list "list".
Expand Down
1 change: 1 addition & 0 deletions src/jl_exported_data.inc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
XX(jl_int8_type) \
XX(jl_interconditional_type) \
XX(jl_interrupt_exception) \
XX(jl_interrupt_handler) \
XX(jl_intrinsic_type) \
XX(jl_kwcall_func) \
XX(jl_lineinfonode_type) \
Expand Down
2 changes: 2 additions & 0 deletions src/jl_exported_funcs.inc
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@
XX(jl_running_on_valgrind) \
XX(jl_safe_printf) \
XX(jl_SC_CLK_TCK) \
XX(jl_schedule_interrupt_handler) \
XX(jl_set_ARGS) \
XX(jl_set_const) \
XX(jl_set_errno) \
Expand Down Expand Up @@ -523,6 +524,7 @@
XX(jl_ver_string) \
XX(jl_vexceptionf) \
XX(jl_vprintf) \
XX(jl_wake_thread) \
XX(jl_wakeup_thread) \
XX(jl_write_compiler_output) \
XX(jl_yield) \
Expand Down
5 changes: 5 additions & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,13 @@ JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct);
extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers;
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void);

JL_DLLEXPORT int jl_wake_thread(int16_t tid);
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);

JL_DLLEXPORT void jl_schedule_task(struct _jl_task_t *task);

JL_DLLEXPORT void jl_schedule_interrupt_handler(void);

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 5 additions & 3 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
}


static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
JL_DLLEXPORT int jl_wake_thread(int16_t tid) JL_NOTSAFEPOINT
{
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
int8_t state = sleeping;
Expand Down Expand Up @@ -287,7 +287,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
}
else {
// something added to the sticky-queue: notify that thread
if (wake_thread(tid) && uvlock != ct) {
if (jl_wake_thread(tid) && uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
Expand All @@ -308,7 +308,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
for (tid = 0; tid < nthreads; tid++) {
if (tid != self)
anysleep |= wake_thread(tid);
anysleep |= jl_wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != ct && anysleep) {
Expand Down Expand Up @@ -378,6 +378,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
uint64_t start_cycles = 0;

while (1) {
jl_schedule_interrupt_handler();

jl_task_t *task = get_next_task(trypoptask, q);
if (task)
return task;
Expand Down
27 changes: 27 additions & 0 deletions src/signal-handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,33 @@ static void jl_check_profile_autostop(void)
}
}

// Graceful interrupt handler

static _Atomic(int) handle_interrupt = 0;
JL_DLLEXPORT void jl_schedule_interrupt_handler(void)
{
if (jl_atomic_exchange_relaxed(&handle_interrupt, 0) != 1)
return;
jl_task_t *handler = jl_atomic_load_relaxed(&jl_interrupt_handler);
if (!handler)
return;
assert(jl_is_task(handler));
if (jl_atomic_load_relaxed(&handler->_state) != JL_TASK_STATE_RUNNABLE)
return;
handler->result = jl_interrupt_exception;
jl_atomic_store_relaxed(&handler->_isexception, 1);
jl_schedule_task(handler);
}
static int want_interrupt_handler(void)
{
if (jl_atomic_load_relaxed(&jl_interrupt_handler)) {
// Set flag to trigger user handlers on next task switch
jl_atomic_store_relaxed(&handle_interrupt, 1);
return 1;
}
return 0;
}

#if defined(_WIN32)
#include "signals-win.c"
#else
Expand Down
Loading