Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/subscription_compression #311

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ my %WriteMakefileArgs = (
"Alien::uPB::Core" => 0,
"Check::UnitCheck" => 0,
"Class::Method::Modifiers" => 0,
"Compress::Zstd" => "0.20",
"Config::Any" => "0.33",
"Devel::MAT::Dumper" => 0,
"Future" => "0.50",
Expand Down Expand Up @@ -113,6 +114,7 @@ my %FallbackPrereqs = (
"Alien::uPB::Core" => 0,
"Check::UnitCheck" => 0,
"Class::Method::Modifiers" => 0,
"Compress::Zstd" => "0.20",
"Config::Any" => "0.33",
"Devel::MAT::Dumper" => 0,
"ExtUtils::MakeMaker" => 0,
Expand Down
1 change: 1 addition & 0 deletions cpanfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ requires 'Getopt::Long';
requires 'Pod::Usage';
requires 'List::Util', '>= 1.63';
requires 'List::Keywords', '>= 0.11';
requires 'Compress::Zstd', '>= 0.20';
# Integration
requires 'Net::Async::OpenTracing', '>= 1.001';
requires 'Log::Any::Adapter::OpenTracing', '>= 0.001';
Expand Down
2 changes: 1 addition & 1 deletion lib/Myriad/Registry.pm
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Registers a new batch method for the given class.
method add_batch ($pkg, $method, $code, $args) {
$batch{$pkg}{$method} = {
code => $code,
args => $args,
args => $args || {},
};
}

Expand Down
20 changes: 20 additions & 0 deletions lib/Myriad/Service/Attributes.pm
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ sub rpc {
Mark this as an async method which should be called repeatedly to generate
arrayref batches of data.

Takes the following parameters as a hashref:

=over 4

=item * C<compress> - compress all data, regardless of size

=item * C<compress_threshold> - compress any data which would be larger than the given size after encoding, in bytes

=back

field $id = 0;
async method example_batch : Batch {
return [ ++$id ];
Expand All @@ -119,6 +129,16 @@ sub batch {
Indicates a method which should be called on startup, which given a
L<Ryu::Sink> will emit events to that sink until it's done.

Takes the following parameters as a hashref:

=over 4

=item * C<compress> - compress all data, regardless of size

=item * C<compress_threshold> - compress any data which would be larger than the given size after encoding, in bytes

=back

=cut

sub emitter {
Expand Down
22 changes: 14 additions & 8 deletions lib/Myriad/Service/Implementation.pm
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,12 @@ async method load () {
});

await $self->subscription->create_from_source(
source => $spec->{src}->pause,
channel => $chan,
service => $service_name,
max_len => $spec->{args}{max_len},
source => $spec->{src}->pause,
channel => $chan,
service => $service_name,
max_len => $spec->{args}{max_len},
compress => $spec->{args}{compress},
compress_threshold => $spec->{args}{compress_threshold},
);
}
}
Expand Down Expand Up @@ -288,12 +290,16 @@ async method load () {
if (my $batches = $registry->batches_for(ref($self))) {
for my $method (sort keys $batches->%*) {
$log->tracef('Adding Batch %s for %s', $method, $service_name);
my $sink = $batches->{$method}{sink} = $ryu->sink(label => 'batch:' . $method);
my $spec = $batches->{$method};
my $sink = $spec->{sink} = $ryu->sink(label => 'batch:' . $method);
$sink->pause;
await $self->subscription->create_from_source(
source => $sink->source,
channel => $method,
service => $service_name,
source => $sink->source,
channel => $method,
service => $service_name,
max_len => $spec->{args}{max_len},
compress => $spec->{args}{compress},
compress_threshold => $spec->{args}{compress_threshold},
);
}
}
Expand Down
6 changes: 5 additions & 1 deletion lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [
# AUTHORITY

use Myriad::Util::UUID;
use Compress::Zstd ();
use OpenTelemetry::Context;
use OpenTelemetry::Trace;
use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK );
Expand Down Expand Up @@ -57,9 +58,12 @@ async method create_from_source (%args) {
# we will make "check_for_overflow" aware about this stream after the service has started
await $src->map($self->$curry::weak(method ($event) {
$log->tracef('Subscription source %s adding an event: %s', $stream, $event);
my $data = encode_json_utf8($event);
return $redis->xadd(
encode_utf8($stream) => '*',
data => encode_json_utf8($event),
($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold}))
? (zstd => Compress::Zstd::compress($data))
: (data => $data)
);
}))->ordered_futures(
low => 100,
Expand Down
2 changes: 2 additions & 0 deletions lib/Myriad/Transport/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ It should also cover retry for stateless calls.
=cut

use Class::Method::Modifiers qw(:all);
use Compress::Zstd ();
use Sub::Util qw(subname);

use Myriad::Redis::Pending;
Expand Down Expand Up @@ -271,6 +272,7 @@ async method read_from_stream (%args) {
my ($stream, $data) = $batch->@*;
return map {
my ($id, $args) = $_->@*;
$args->{data} = Compress::Zstd::decompress($args->{zstd}) if exists $args->{zstd};
+{
stream => $self->remove_prefix($stream),
id => $id,
Expand Down
Loading