diff --git a/Gemfile.lock b/Gemfile.lock index 4a70de83..9313dc1e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -22,6 +22,7 @@ PATH oj (~> 3.11) ox (~> 2.14) typhoeus (~> 1.4.0) + zlib (~> 3.2.1) GEM remote: https://rubygems.org/ @@ -194,7 +195,7 @@ GEM mime-types-data (~> 3.2015) mime-types-data (3.2021.0901) mini_mime (1.1.2) - mini_portile2 (2.8.1) + mini_portile2 (2.8.8) minitest (5.14.4) mongo (2.14.0) bson (>= 4.8.2, < 5.0.0) @@ -206,9 +207,14 @@ GEM mustermann (1.1.1) ruby2_keywords (~> 0.0.1) nio4r (2.5.8) + nokogiri (1.18.0) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) oj (3.13.9) ox (2.14.5) parallel (1.20.1) + parallel_tests (4.7.2) + parallel parser (3.0.1.1) ast (~> 2.4.1) pry (0.14.1) @@ -218,7 +224,7 @@ GEM byebug (~> 11.0) pry (~> 0.10) public_suffix (4.0.6) - racc (1.6.2) + racc (1.8.1) rack (2.2.3) rack-protection (2.1.0) rack @@ -317,6 +323,7 @@ GEM websocket-extensions (0.1.5) yard (0.9.26) zeitwerk (2.5.1) + zlib (3.2.1) PLATFORMS ruby @@ -326,6 +333,7 @@ DEPENDENCIES event_source! faker mongoid + parallel_tests pry pry-byebug rails (>= 6.1.4) diff --git a/lib/event_source/operations/payload_codec.rb b/lib/event_source/operations/payload_codec.rb index a1922f92..83d97921 100644 --- a/lib/event_source/operations/payload_codec.rb +++ b/lib/event_source/operations/payload_codec.rb @@ -7,43 +7,56 @@ module EventSource module Operations - # A class for handling payload compression and decompression using Dry-rb. + # A class for handling payload compression and decompression using Zlib. class PayloadCodec - include Dry::Monads[:result] # For Success/Failure monads - include Dry::Monads::Do # For do notation + # include Dry::Monads[:result] # For Success/Failure monads + # include Dry::Monads::Do # For do notation + include Dry::Monads[:result, :do] extend Dry::Initializer + # @!attribute [r] payload + # @return [String, Hash] The payload to be compressed or decompressed option :payload, reader: :private - # Compresses the payload into a Base64-encoded compressed string + # Compresses the payload into a compressed string using Zlib. # # @return [Dry::Monads::Success] if compression is successful # @return [Dry::Monads::Failure] if an error occurs def compress - json_payload = yield validate_payload_for_compression - compressed_data = Zlib.deflate(json_payload) + json_payload = validate_payload_for_compression + return json_payload unless json_payload.success? + compressed_data = Zlib.deflate(json_payload.value!) Success(compressed_data) rescue StandardError => e Failure("Compression failed: #{e.message}") end - # Decompresses a Base64-encoded compressed payload back to its original form + # Decompresses a compressed payload back to its original form. # # @return [Dry::Monads::Success] if decompression is successful # @return [Dry::Monads::Failure] if an error occurs def decompress - decoded_data = yield validate_payload_for_decompression - decompressed_data = Zlib.inflate(decoded_data) + decompressed_data = Zlib.inflate(payload) Success(decompressed_data) rescue StandardError => e Failure("Decompression failed: #{e.message}") end + # Decompresses the payload if it is in binary encoding. + # + # @return [Dry::Monads::Success] if payload is decompressed or not binary + # @return [Dry::Monads::Failure] if decompression fails + def decompress_if_binary + return Success(payload) unless binary_payload? + + decompress + end + private - # Validates the payload before compression + # Validates the payload before compression and ensures it is a Hash or String. # # @return [Dry::Monads::Success] if the payload is valid # @return [Dry::Monads::Failure] if the payload is invalid @@ -53,19 +66,11 @@ def validate_payload_for_compression Success(payload.is_a?(Hash) ? payload.to_json : payload) end - # Validates the payload before decompression + # Checks if the payload is in binary encoding. # - # @return [Dry::Monads::Success] if the payload is valid - # @return [Dry::Monads::Failure] if the payload is invalid - def validate_payload_for_decompression - return Failure('Payload must be a Base64-encoded String') unless is_binary?(payload) - - Success(payload) - rescue StandardError - Failure('Invalid Base64 string for decompression') - end - - def is_binary?(payload) + # @param payload [String] The payload to check + # @return [Boolean] true if the payload is binary, false otherwise + def binary_payload? return false unless payload.respond_to?(:encoding) payload.encoding == Encoding::BINARY diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index db7a1141..175c9719 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -25,7 +25,6 @@ class BunnyQueueProxy # @return [Bunny::Queue] def initialize(channel_proxy, async_api_channel_item) @channel_proxy = channel_proxy - @async_api_channel_item = async_api_channel_item bindings = async_api_channel_item.bindings @consumers = [] @@ -135,7 +134,7 @@ def on_receive_message( subscriber = subscriber_klass.new subscriber.channel = @subject.channel - payload = decompress_payload_if_required(payload) + payload = decompress_payload_if_binary(payload) subscription_handler = EventSource::Protocols::Amqp::BunnyConsumerHandler.new( subscriber, @@ -151,10 +150,9 @@ def on_receive_message( subscriber = nil end - def decompress_payload_if_required(payload) - return payload unless is_binary?(payload) + def decompress_payload_if_binary(payload) + output = EventSource::Operations::PayloadCodec.new(payload: payload).decompress_if_binary - output = EventSource::Operations::PayloadCodec.new(payload: payload).decompress if output.success? output.value! else @@ -163,12 +161,6 @@ def decompress_payload_if_required(payload) end end - def is_binary?(payload) - return false unless payload.respond_to?(:encoding) - - payload.encoding == Encoding::BINARY - end - def find_executable(subscriber_klass, delivery_info) subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass) diff --git a/spec/event_source/operations/payload_codec_spec.rb b/spec/event_source/operations/payload_codec_spec.rb new file mode 100644 index 00000000..f717d929 --- /dev/null +++ b/spec/event_source/operations/payload_codec_spec.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'zlib' + +RSpec.describe EventSource::Operations::PayloadCodec do + subject { described_class.new(payload: test_payload) } + + let(:test_payload) { { key: 'value' }.to_json } + + describe '#compress' do + context 'when the payload is valid' do + it 'returns a compressed string' do + result = subject.compress + + expect(result).to be_success + output = result.value! + expect(output.encoding).to eq(Encoding::BINARY) + expect { Zlib.inflate(output) }.not_to raise_error + end + end + + context 'when the payload is invalid' do + let(:test_payload) { 12345 } # Invalid payload + + it 'returns a failure with an error message' do + result = subject.compress + + expect(result).to be_failure + expect(result.failure).to eq('Payload must be a Hash or String') + end + end + end + + describe '#decompress' do + let(:compressed_payload) { Zlib.deflate(test_payload) } + subject { described_class.new(payload: compressed_payload) } + + context 'when the payload is valid' do + it 'returns the decompressed string' do + result = subject.decompress + + expect(result).to be_success + expect(result.value!).to eq(test_payload) + end + end + + context 'when decompression fails' do + let(:compressed_payload) { 'invalid_data' } + + it 'returns a failure with an error message' do + result = subject.decompress + + expect(result).to be_failure + expect(result.failure).to match(/Decompression failed:/) + end + end + end + + describe '#decompress_if_binary' do + context 'when the payload is not binary' do + it 'returns the original payload' do + result = subject.decompress_if_binary + + expect(result).to be_success + expect(result.value!).to eq(test_payload) + end + end + + context 'when the payload is binary' do + let(:compressed_payload) { Zlib.deflate(test_payload) } + subject { described_class.new(payload: compressed_payload.force_encoding(Encoding::BINARY)) } + + it 'decompresses the payload' do + result = subject.decompress_if_binary + + expect(result).to be_success + expect(result.value!).to eq(test_payload) + end + end + end + + describe '#validate_payload_for_compression' do + context 'when the payload is a valid Hash' do + let(:test_payload) { { key: 'value' } } + + it 'returns the JSON string representation' do + result = subject.send(:validate_payload_for_compression) + + expect(result).to be_success + expect(result.value!).to eq(test_payload.to_json) + end + end + + context 'when the payload is a valid String' do + let(:test_payload) { 'valid_string' } + + it 'returns the string as is' do + result = subject.send(:validate_payload_for_compression) + + expect(result).to be_success + expect(result.value!).to eq(test_payload) + end + end + + context 'when the payload is invalid' do + let(:test_payload) { 12345 } + + it 'returns a failure with an error message' do + result = subject.send(:validate_payload_for_compression) + + expect(result).to be_failure + expect(result.failure).to eq('Payload must be a Hash or String') + end + end + end + + describe '#binary_payload?' do + + before do + subject { described_class.new(payload: test_payload) } + end + + context 'when the payload is binary' do + let(:test_payload) { Zlib.deflate({ key: 'value' }.to_json) } + + it 'returns true' do + result = subject.send(:binary_payload?) + + expect(result).to be true + end + end + + context 'when the payload is not binary' do + it 'returns false' do + result = subject.send(:binary_payload?) + + expect(result).to be false + end + end + end +end