diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index da2bcc29..93e8df26 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -52,6 +52,10 @@ async method create_from_source (%args) { source => $src, max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH } unless exists $args{subchannel_key}; + await $redis->hset( + "subscription.channel", + map { encode_utf8($_) } $service, $args{channel} + ); my %seen_channel; $self->adopt_future( $src->unblocked->then($self->$curry::weak(async method { @@ -69,6 +73,10 @@ async method create_from_source (%args) { max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH }; $seen_channel{$k} = 1; + await $redis->hset( + "subscription.subchannel.{$service}", + map { encode_utf8($_) } $args{channel}, $k + ); } } $log->tracef('Subscription source %s adding an event: %s', $target_stream, $event); @@ -128,7 +136,18 @@ async method stop { async method create_group($receiver) { unless ($receiver->{group}) { - await $redis->create_group($receiver->{key}, $receiver->{group_name}); + await $redis->create_group( + $receiver->{key}, + $receiver->{group_name} + ); + # Record mapping for services and groups - we can also retrieve this information + # from $redis->xinfo_group, but having the entire list in a hash is more convenient + # for larger deployments. + await $redis->hset( + 'subscription.group', + $receiver->{key}, + $receiver->{group_name} + ); $receiver->{group} = 1; } return;