Skip to content

Commit

Permalink
Record subscription channels and consumer groups
Browse files Browse the repository at this point in the history
Also includes subchannels.
  • Loading branch information
tom-binary committed May 30, 2024
1 parent 259c8a0 commit 8a28517
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ async method create_from_source (%args) {
source => $src,
max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH
} unless exists $args{subchannel_key};
await $redis->hset(
"subscription.channel",
map { encode_utf8($_) } $service, $args{channel}
);
my %seen_channel;
$self->adopt_future(
$src->unblocked->then($self->$curry::weak(async method {
Expand All @@ -69,6 +73,10 @@ async method create_from_source (%args) {
max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH
};
$seen_channel{$k} = 1;
await $redis->hset(
"subscription.subchannel.{$service}",
map { encode_utf8($_) } $args{channel}, $k
);
}
}
$log->tracef('Subscription source %s adding an event: %s', $target_stream, $event);
Expand Down Expand Up @@ -128,7 +136,18 @@ async method stop {

async method create_group($receiver) {
unless ($receiver->{group}) {
await $redis->create_group($receiver->{key}, $receiver->{group_name});
await $redis->create_group(
$receiver->{key},
$receiver->{group_name}
);
# Record mapping for services and groups - we can also retrieve this information
# from $redis->xinfo_group, but having the entire list in a hash is more convenient
# for larger deployments.
await $redis->hset(
'subscription.group',
$receiver->{key},
$receiver->{group_name}
);
$receiver->{group} = 1;
}
return;
Expand Down

0 comments on commit 8a28517

Please sign in to comment.