Skip to content

Commit

Permalink
Clean up before we hit the stream limit
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-binary committed May 30, 2024
1 parent 15cbf8b commit 1a66937
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,23 @@ async method check_for_overflow () {
push @emitters, $emitter;
try {
my $len = await $redis->stream_length($emitter->{stream});
if ($len >= $emitter->{max_len}) {
unless ($emitter->{source}->is_paused) {
$emitter->{source}->pause;
$log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len});
}
if ($len >= 0.75 * $emitter->{max_len}) {
# Try a regular cleanup if we're getting close to the limit
await $redis->cleanup(
stream => $emitter->{stream},
limit => $emitter->{max_len}
);
} else {
if($emitter->{source}->is_paused) {
$emitter->{source}->resume;
$log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len);
$len = await $redis->stream_length($emitter->{stream});
}

if ($len >= $emitter->{max_len}) {
unless ($emitter->{source}->is_paused) {
$emitter->{source}->pause;
$log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len});
}
} elsif ($emitter->{source}->is_paused) {
$emitter->{source}->resume;
$log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len);
}
} catch ($e) {
$log->warnf('An error ocurred while trying to check on stream %s status - %s', $emitter->{stream}, $e);
Expand Down

0 comments on commit 1a66937

Please sign in to comment.