From df9ec62eaf4a3a6036281f8b05c7daeb949282d6 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 9 Apr 2024 01:07:26 +0200 Subject: [PATCH 1/3] Allow for workers dying in the middle of cleanup --- src/sch/Sch.jl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 52be64fba..48a66dcef 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -409,7 +409,15 @@ function cleanup_proc(state, p, log_sink) # If the worker process is still alive, clean it up if wid in workers() - remotecall_wait(_cleanup_proc, wid, state.uid, log_sink) + try + remotecall_wait(_cleanup_proc, wid, state.uid, log_sink) + catch ex + # We allow ProcessExitedException, which means that the worker + # shut down halfway through cleanup. + if !(ex isa ProcessExitedException) + rethrow() + end + end end timespan_finish(ctx, :cleanup_proc, (;worker=wid), nothing) From 6bf21b3f5826fb47b1ce77d40ec18dcae8972ce9 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 9 Apr 2024 13:06:52 +0200 Subject: [PATCH 2/3] Allow for dead workers in safepoint() --- src/sch/dynamic.jl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/sch/dynamic.jl b/src/sch/dynamic.jl index cac22c0ed..a0e689edf 100644 --- a/src/sch/dynamic.jl +++ b/src/sch/dynamic.jl @@ -32,9 +32,18 @@ function safepoint(state) if state.halt.set # Force dynamic thunks and listeners to terminate for (inp_chan,out_chan) in values(state.worker_chans) - close(inp_chan) - close(out_chan) + # Closing these channels will fail if the worker died, which we + # allow. 
+ try + close(inp_chan) + close(out_chan) + catch ex + if !(ex isa ProcessExitedException) + rethrow() + end + end end + # Throw out of scheduler throw(SchedulerHaltedException()) end From 3b0a35562f40e4cfd2039f06d0575590c06d7acc Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 9 Apr 2024 03:44:35 +0200 Subject: [PATCH 3/3] Use procs() when initializing EAGER_CONTEXT Using `myid()` with `workers()` meant that when the context was initialized with a single worker the processor list would be: `[OSProc(1), OSProc(1)]`. `procs()` will always include PID 1 and any other workers, which is what we want. --- src/sch/eager.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sch/eager.jl b/src/sch/eager.jl index b90646c27..8fcb10d92 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -6,7 +6,7 @@ const EAGER_STATE = Ref{Union{ComputeState,Nothing}}(nothing) function eager_context() if EAGER_CONTEXT[] === nothing - EAGER_CONTEXT[] = Context([myid(),workers()...]) + EAGER_CONTEXT[] = Context(procs()) end return EAGER_CONTEXT[] end