diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 93e8df26..dbd061bb 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -271,20 +271,23 @@ async method check_for_overflow () { push @emitters, $emitter; try { my $len = await $redis->stream_length($emitter->{stream}); - if ($len >= $emitter->{max_len}) { - unless ($emitter->{source}->is_paused) { - $emitter->{source}->pause; - $log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len}); - } + if ($len >= 0.75 * $emitter->{max_len}) { + # Try a regular cleanup if we're getting close to the limit await $redis->cleanup( stream => $emitter->{stream}, limit => $emitter->{max_len} ); - } else { - if($emitter->{source}->is_paused) { - $emitter->{source}->resume; - $log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len); + $len = await $redis->stream_length($emitter->{stream}); + } + + if ($len >= $emitter->{max_len}) { + unless ($emitter->{source}->is_paused) { + $emitter->{source}->pause; + $log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len}); } + } elsif ($emitter->{source}->is_paused) { + $emitter->{source}->resume; + $log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len); } } catch ($e) { $log->warnf('An error ocurred while trying to check on stream %s status - %s', $emitter->{stream}, $e);