diff --git a/Makefile.PL b/Makefile.PL index d2a0e96a..64895b75 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -26,6 +26,7 @@ my %WriteMakefileArgs = ( "Alien::uPB::Core" => 0, "Check::UnitCheck" => 0, "Class::Method::Modifiers" => 0, + "Compress::Zstd" => "0.20", "Config::Any" => "0.33", "Devel::MAT::Dumper" => 0, "Future" => "0.50", @@ -113,6 +114,7 @@ my %FallbackPrereqs = ( "Alien::uPB::Core" => 0, "Check::UnitCheck" => 0, "Class::Method::Modifiers" => 0, + "Compress::Zstd" => "0.20", "Config::Any" => "0.33", "Devel::MAT::Dumper" => 0, "ExtUtils::MakeMaker" => 0, diff --git a/cpanfile b/cpanfile index 7bedab61..a693eedf 100644 --- a/cpanfile +++ b/cpanfile @@ -58,6 +58,7 @@ requires 'Getopt::Long'; requires 'Pod::Usage'; requires 'List::Util', '>= 1.63'; requires 'List::Keywords', '>= 0.11'; +requires 'Compress::Zstd', '>= 0.20'; # Integration requires 'Net::Async::OpenTracing', '>= 1.001'; requires 'Log::Any::Adapter::OpenTracing', '>= 0.001'; diff --git a/lib/Myriad/Registry.pm b/lib/Myriad/Registry.pm index 29ed804d..99812b57 100644 --- a/lib/Myriad/Registry.pm +++ b/lib/Myriad/Registry.pm @@ -142,7 +142,7 @@ Registers a new batch method for the given class. method add_batch ($pkg, $method, $code, $args) { $batch{$pkg}{$method} = { code => $code, - args => $args, + args => $args || {}, }; } diff --git a/lib/Myriad/Service/Attributes.pm b/lib/Myriad/Service/Attributes.pm index fe5f47af..0620ed81 100644 --- a/lib/Myriad/Service/Attributes.pm +++ b/lib/Myriad/Service/Attributes.pm @@ -97,6 +97,16 @@ sub rpc { Mark this as an async method which should be called repeatedly to generate arrayref batches of data. +Takes the following parameters as a hashref: + +=over 4 + +=item * C - compress all data, regardless of size + +=item * C - compress any data which would be larger than the given size after encoding, in bytes + +=back + field $id = 0; async method example_batch : Batch { return [ ++$id ]; @@ -119,6 +129,16 @@ sub batch { Indicates a method which should be called on startup, which given a L will emit events to that sink until it's done. +Takes the following parameters as a hashref: + +=over 4 + +=item * C - compress all data, regardless of size + +=item * C - compress any data which would be larger than the given size after encoding, in bytes + +=back + =cut sub emitter { diff --git a/lib/Myriad/Service/Implementation.pm b/lib/Myriad/Service/Implementation.pm index f1f71ead..b8c49bae 100644 --- a/lib/Myriad/Service/Implementation.pm +++ b/lib/Myriad/Service/Implementation.pm @@ -255,10 +255,12 @@ async method load () { }); await $self->subscription->create_from_source( - source => $spec->{src}->pause, - channel => $chan, - service => $service_name, - max_len => $spec->{args}{max_len}, + source => $spec->{src}->pause, + channel => $chan, + service => $service_name, + max_len => $spec->{args}{max_len}, + compress => $spec->{args}{compress}, + compress_threshold => $spec->{args}{compress_threshold}, ); } } @@ -288,12 +290,16 @@ async method load () { if (my $batches = $registry->batches_for(ref($self))) { for my $method (sort keys $batches->%*) { $log->tracef('Adding Batch %s for %s', $method, $service_name); - my $sink = $batches->{$method}{sink} = $ryu->sink(label => 'batch:' . $method); + my $spec = $batches->{$method}; + my $sink = $spec->{sink} = $ryu->sink(label => 'batch:' . $method); $sink->pause; await $self->subscription->create_from_source( - source => $sink->source, - channel => $method, - service => $service_name, + source => $sink->source, + channel => $method, + service => $service_name, + max_len => $spec->{args}{max_len}, + compress => $spec->{args}{compress}, + compress_threshold => $spec->{args}{compress_threshold}, ); } } diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 2583a774..ec0511e7 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -8,6 +8,7 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [ # AUTHORITY use Myriad::Util::UUID; +use Compress::Zstd (); use OpenTelemetry::Context; use OpenTelemetry::Trace; use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ); @@ -57,9 +58,12 @@ async method create_from_source (%args) { # we will make "check_for_overflow" aware about this stream after the service has started await $src->map($self->$curry::weak(method ($event) { $log->tracef('Subscription source %s adding an event: %s', $stream, $event); + my $data = encode_json_utf8($event); return $redis->xadd( encode_utf8($stream) => '*', - data => encode_json_utf8($event), + ($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold})) + ? (zstd => Compress::Zstd::compress($data)) + : (data => $data) ); }))->ordered_futures( low => 100, diff --git a/lib/Myriad/Transport/Redis.pm b/lib/Myriad/Transport/Redis.pm index 689f9545..d2653a2d 100644 --- a/lib/Myriad/Transport/Redis.pm +++ b/lib/Myriad/Transport/Redis.pm @@ -31,6 +31,7 @@ It should also cover retry for stateless calls. =cut use Class::Method::Modifiers qw(:all); +use Compress::Zstd (); use Sub::Util qw(subname); use Myriad::Redis::Pending; @@ -271,6 +272,7 @@ async method read_from_stream (%args) { my ($stream, $data) = $batch->@*; return map { my ($id, $args) = $_->@*; + $args->{data} = Compress::Zstd::decompress($args->{zstd}) if exists $args->{zstd}; +{ stream => $self->remove_prefix($stream), id => $id,