Skip to content

Commit

Permalink
Merge pull request #352 from tom-binary/feature/also_instead_of_cancel
Browse files Browse the repository at this point in the history
Use `also` to avoid cancelling timeout-guarded `Future`s
  • Loading branch information
tom-binary authored Oct 22, 2024
2 parents 00c9252 + 1493f50 commit e6a9378
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ my %WriteMakefileArgs = (
"Config::Any" => "0.33",
"Data::Checks" => "0.09",
"Devel::MAT::Dumper" => 0,
"Future" => "0.50",
"Future" => "0.51",
"Future::AsyncAwait" => "0.66",
"Future::AsyncAwait::Hooks" => "0.02",
"Future::IO" => "0.15",
Expand Down Expand Up @@ -127,7 +127,7 @@ my %FallbackPrereqs = (
"Devel::MAT::Dumper" => 0,
"ExtUtils::MakeMaker" => 0,
"File::Spec" => 0,
"Future" => "0.50",
"Future" => "0.51",
"Future::AsyncAwait" => "0.66",
"Future::AsyncAwait::Hooks" => "0.02",
"Future::IO" => "0.15",
Expand Down
2 changes: 1 addition & 1 deletion cpanfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ requires 'Syntax::Keyword::Try', '>= 0.29';
requires 'Syntax::Keyword::Defer', '>= 0.10';
requires 'Syntax::Keyword::Match', '>= 0.15';
requires 'Syntax::Operator::Equ', '>= 0.10';
requires 'Future', '>= 0.50';
requires 'Future', '>= 0.51';
requires 'Future::Queue', '>= 0.52';
requires 'Future::AsyncAwait', '>= 0.66';
requires 'Future::AsyncAwait::Hooks', '>= 0.02';
Expand Down
12 changes: 9 additions & 3 deletions lib/Myriad.pm
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,10 @@ async method shutdown () {
} splice $shutdown_tasks->@*;

await Future->wait_any(
Future->wait_all(
$self->loop->timeout_future(after => $ENV{MYRIAD_SHUTDOWN_TIMEOUT} // 10),
also => Future->wait_all(
@shutdown_operations
),
$self->loop->timeout_future(after => 5)
);

$f->done unless $f->is_ready;
Expand Down Expand Up @@ -780,11 +780,17 @@ async method run () {

try {
# Run the startup tasks, order is important
for my $task ($startup_tasks->@*) {
while(my $task = shift $startup_tasks->@*) {
$log->tracef('Startup task %s', $task);
await $self->$task;
}
} catch ($e) {
try {
# Cancel all remaining tasks
$_->cancel for splice $startup_tasks->@*;
} catch ($e) {
$log->errorf('Unable to cancel pending startup tasks: %s', $e);
}
die "Startup tasks failed - $e";
}
$log->tracef('Startup tasks done');
Expand Down
2 changes: 1 addition & 1 deletion lib/Myriad/Mutex.pm
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async method acquire {
my $removed = $storage->when_key_changed($key);
await Future->wait_any(
$loop->delay_future(after => 3 + rand),
$removed->without_cancel,
also => $removed,
) if await $storage->get($key);
} else {
$log->debugf('Acquired mutex [%s]', $key);
Expand Down
4 changes: 2 additions & 2 deletions lib/Myriad/RPC/Implementation/Memory.pm
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ async method start () {
) if %$messages;
}), foreach => [ $self->rpc_list->@* ], concurrent => 0 + $self->rpc_list->@*);
await Future->wait_any(
$should_shutdown->without_cancel,
$self->loop->delay_future(after => 0.1)
$self->loop->delay_future(after => 0.1),
also => $should_shutdown,
);
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ async method start {
$log->tracef('Starting subscription handler client_id: %s', $client_id);
await $self->create_streams;
await Future->wait_any(
$should_shutdown->without_cancel,
$self->receive_items,
$self->check_for_overflow
$self->check_for_overflow,
also => $should_shutdown,
);
}

Expand Down

0 comments on commit e6a9378

Please sign in to comment.