diff --git a/Makefile.PL b/Makefile.PL index b8d74761..93880a47 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -31,7 +31,7 @@ my %WriteMakefileArgs = ( "Config::Any" => "0.33", "Data::Checks" => "0.09", "Devel::MAT::Dumper" => 0, - "Future" => "0.50", + "Future" => "0.51", "Future::AsyncAwait" => "0.66", "Future::AsyncAwait::Hooks" => "0.02", "Future::IO" => "0.15", @@ -127,7 +127,7 @@ my %FallbackPrereqs = ( "Devel::MAT::Dumper" => 0, "ExtUtils::MakeMaker" => 0, "File::Spec" => 0, - "Future" => "0.50", + "Future" => "0.51", "Future::AsyncAwait" => "0.66", "Future::AsyncAwait::Hooks" => "0.02", "Future::IO" => "0.15", diff --git a/cpanfile b/cpanfile index 17f43a53..3bb615a5 100644 --- a/cpanfile +++ b/cpanfile @@ -11,7 +11,7 @@ requires 'Syntax::Keyword::Try', '>= 0.29'; requires 'Syntax::Keyword::Defer', '>= 0.10'; requires 'Syntax::Keyword::Match', '>= 0.15'; requires 'Syntax::Operator::Equ', '>= 0.10'; -requires 'Future', '>= 0.50'; +requires 'Future', '>= 0.51'; requires 'Future::Queue', '>= 0.52'; requires 'Future::AsyncAwait', '>= 0.66'; requires 'Future::AsyncAwait::Hooks', '>= 0.02'; diff --git a/lib/Myriad.pm b/lib/Myriad.pm index 4de9e7b1..96dd37b3 100644 --- a/lib/Myriad.pm +++ b/lib/Myriad.pm @@ -580,10 +580,10 @@ async method shutdown () { } splice $shutdown_tasks->@*; await Future->wait_any( - Future->wait_all( + $self->loop->timeout_future(after => $ENV{MYRIAD_SHUTDOWN_TIMEOUT} // 10), + also => Future->wait_all( @shutdown_operations ), - $self->loop->timeout_future(after => 5) ); $f->done unless $f->is_ready; @@ -780,11 +780,17 @@ async method run () { try { # Run the startup tasks, order is important - for my $task ($startup_tasks->@*) { + while(my $task = shift $startup_tasks->@*) { $log->tracef('Startup task %s', $task); await $self->$task; } } catch ($e) { + try { + # Cancel all remaining tasks + $_->cancel for splice $startup_tasks->@*; + } catch ($e) { + $log->errorf('Unable to cancel pending startup tasks: %s', $e); + } die "Startup tasks failed - $e"; } $log->tracef('Startup tasks done'); diff --git a/lib/Myriad/Mutex.pm b/lib/Myriad/Mutex.pm index 35c0de67..f724046a 100644 --- a/lib/Myriad/Mutex.pm +++ b/lib/Myriad/Mutex.pm @@ -52,7 +52,7 @@ async method acquire { my $removed = $storage->when_key_changed($key); await Future->wait_any( $loop->delay_future(after => 3 + rand), - $removed->without_cancel, + also => $removed, ) if await $storage->get($key); } else { $log->debugf('Acquired mutex [%s]', $key); diff --git a/lib/Myriad/RPC/Implementation/Memory.pm b/lib/Myriad/RPC/Implementation/Memory.pm index 39ca1ba7..205cdcfb 100644 --- a/lib/Myriad/RPC/Implementation/Memory.pm +++ b/lib/Myriad/RPC/Implementation/Memory.pm @@ -87,8 +87,8 @@ async method start () { ) if %$messages; }), foreach => [ $self->rpc_list->@* ], concurrent => 0 + $self->rpc_list->@*); await Future->wait_any( - $should_shutdown->without_cancel, - $self->loop->delay_future(after => 0.1) + $self->loop->delay_future(after => 0.1), + also => $should_shutdown, ); } } diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 7b147839..454f29f3 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -139,9 +139,9 @@ async method start { $log->tracef('Starting subscription handler client_id: %s', $client_id); await $self->create_streams; await Future->wait_any( - $should_shutdown->without_cancel, $self->receive_items, - $self->check_for_overflow + $self->check_for_overflow, + also => $should_shutdown, ); }