Skip to content

Commit

Permalink
refactor payload_codec, add specs
Browse files Browse the repository at this point in the history
  • Loading branch information
raghuramg committed Dec 26, 2024
1 parent 8f7e798 commit e94ae79
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 35 deletions.
12 changes: 10 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PATH
oj (~> 3.11)
ox (~> 2.14)
typhoeus (~> 1.4.0)
zlib (~> 3.2.1)

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -194,7 +195,7 @@ GEM
mime-types-data (~> 3.2015)
mime-types-data (3.2021.0901)
mini_mime (1.1.2)
mini_portile2 (2.8.1)
mini_portile2 (2.8.8)
minitest (5.14.4)
mongo (2.14.0)
bson (>= 4.8.2, < 5.0.0)
Expand All @@ -206,9 +207,14 @@ GEM
mustermann (1.1.1)
ruby2_keywords (~> 0.0.1)
nio4r (2.5.8)
nokogiri (1.18.0)
mini_portile2 (~> 2.8.2)
racc (~> 1.4)
oj (3.13.9)
ox (2.14.5)
parallel (1.20.1)
parallel_tests (4.7.2)
parallel
parser (3.0.1.1)
ast (~> 2.4.1)
pry (0.14.1)
Expand All @@ -218,7 +224,7 @@ GEM
byebug (~> 11.0)
pry (~> 0.10)
public_suffix (4.0.6)
racc (1.6.2)
racc (1.8.1)
rack (2.2.3)
rack-protection (2.1.0)
rack
Expand Down Expand Up @@ -317,6 +323,7 @@ GEM
websocket-extensions (0.1.5)
yard (0.9.26)
zeitwerk (2.5.1)
zlib (3.2.1)

PLATFORMS
ruby
Expand All @@ -326,6 +333,7 @@ DEPENDENCIES
event_source!
faker
mongoid
parallel_tests
pry
pry-byebug
rails (>= 6.1.4)
Expand Down
49 changes: 27 additions & 22 deletions lib/event_source/operations/payload_codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,56 @@

module EventSource
module Operations
# A class for handling payload compression and decompression using Dry-rb.
# A class for handling payload compression and decompression using Zlib.
class PayloadCodec
include Dry::Monads[:result] # For Success/Failure monads
include Dry::Monads::Do # For do notation
# include Dry::Monads[:result] # For Success/Failure monads
# include Dry::Monads::Do # For do notation
include Dry::Monads[:result, :do]
extend Dry::Initializer

# @!attribute [r] payload
# @return [String, Hash] The payload to be compressed or decompressed
option :payload, reader: :private

# Compresses the payload into a Base64-encoded compressed string
# Compresses the payload into a compressed string using Zlib.
#
# @return [Dry::Monads::Success<String>] if compression is successful
# @return [Dry::Monads::Failure<String>] if an error occurs
def compress
json_payload = yield validate_payload_for_compression
compressed_data = Zlib.deflate(json_payload)
json_payload = validate_payload_for_compression
return json_payload unless json_payload.success?

compressed_data = Zlib.deflate(json_payload.value!)
Success(compressed_data)
rescue StandardError => e
Failure("Compression failed: #{e.message}")
end

# Decompresses a Base64-encoded compressed payload back to its original form
# Decompresses a compressed payload back to its original form.
#
# @return [Dry::Monads::Success<String>] if decompression is successful
# @return [Dry::Monads::Failure<String>] if an error occurs
def decompress
decoded_data = yield validate_payload_for_decompression
decompressed_data = Zlib.inflate(decoded_data)
decompressed_data = Zlib.inflate(payload)

Success(decompressed_data)
rescue StandardError => e
Failure("Decompression failed: #{e.message}")
end

# Decompresses the payload if it is in binary encoding.
#
# @return [Dry::Monads::Success<String>] if payload is decompressed or not binary
# @return [Dry::Monads::Failure<String>] if decompression fails
def decompress_if_binary
return Success(payload) unless binary_payload?

decompress
end

private

# Validates the payload before compression
# Validates the payload before compression and ensures it is a Hash or String.
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
Expand All @@ -53,19 +66,11 @@ def validate_payload_for_compression
Success(payload.is_a?(Hash) ? payload.to_json : payload)
end

# Validates the payload before decompression
# Checks if the payload is in binary encoding.
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload_for_decompression
return Failure('Payload must be a Base64-encoded String') unless is_binary?(payload)

Success(payload)
rescue StandardError
Failure('Invalid Base64 string for decompression')
end

def is_binary?(payload)
# @param payload [String] The payload to check
# @return [Boolean] true if the payload is binary, false otherwise
def binary_payload?
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
Expand Down
14 changes: 3 additions & 11 deletions lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class BunnyQueueProxy
# @return [Bunny::Queue]
def initialize(channel_proxy, async_api_channel_item)
@channel_proxy = channel_proxy
@async_api_channel_item = async_api_channel_item
bindings = async_api_channel_item.bindings
@consumers = []

Expand Down Expand Up @@ -135,7 +134,7 @@ def on_receive_message(

subscriber = subscriber_klass.new
subscriber.channel = @subject.channel
payload = decompress_payload_if_required(payload)
payload = decompress_payload_if_binary(payload)
subscription_handler =
EventSource::Protocols::Amqp::BunnyConsumerHandler.new(
subscriber,
Expand All @@ -151,10 +150,9 @@ def on_receive_message(
subscriber = nil
end

def decompress_payload_if_required(payload)
return payload unless is_binary?(payload)
def decompress_payload_if_binary(payload)
output = EventSource::Operations::PayloadCodec.new(payload: payload).decompress_if_binary

output = EventSource::Operations::PayloadCodec.new(payload: payload).decompress
if output.success?
output.value!
else
Expand All @@ -163,12 +161,6 @@ def decompress_payload_if_required(payload)
end
end

def is_binary?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end

def find_executable(subscriber_klass, delivery_info)
subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass)

Expand Down
142 changes: 142 additions & 0 deletions spec/event_source/operations/payload_codec_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# frozen_string_literal: true

require 'rails_helper'
require 'zlib'

RSpec.describe EventSource::Operations::PayloadCodec do
subject { described_class.new(payload: test_payload) }

let(:test_payload) { { key: 'value' }.to_json }

describe '#compress' do
context 'when the payload is valid' do
it 'returns a compressed string' do
result = subject.compress

expect(result).to be_success
output = result.value!
expect(output.encoding).to eq(Encoding::BINARY)
expect { Zlib.inflate(output) }.not_to raise_error
end
end

context 'when the payload is invalid' do
let(:test_payload) { 12345 } # Invalid payload

it 'returns a failure with an error message' do
result = subject.compress

expect(result).to be_failure
expect(result.failure).to eq('Payload must be a Hash or String')
end
end
end

describe '#decompress' do
let(:compressed_payload) { Zlib.deflate(test_payload) }
subject { described_class.new(payload: compressed_payload) }

context 'when the payload is valid' do
it 'returns the decompressed string' do
result = subject.decompress

expect(result).to be_success
expect(result.value!).to eq(test_payload)
end
end

context 'when decompression fails' do
let(:compressed_payload) { 'invalid_data' }

it 'returns a failure with an error message' do
result = subject.decompress

expect(result).to be_failure
expect(result.failure).to match(/Decompression failed:/)
end
end
end

describe '#decompress_if_binary' do
context 'when the payload is not binary' do
it 'returns the original payload' do
result = subject.decompress_if_binary

expect(result).to be_success
expect(result.value!).to eq(test_payload)
end
end

context 'when the payload is binary' do
let(:compressed_payload) { Zlib.deflate(test_payload) }
subject { described_class.new(payload: compressed_payload.force_encoding(Encoding::BINARY)) }

it 'decompresses the payload' do
result = subject.decompress_if_binary

expect(result).to be_success
expect(result.value!).to eq(test_payload)
end
end
end

describe '#validate_payload_for_compression' do
context 'when the payload is a valid Hash' do
let(:test_payload) { { key: 'value' } }

it 'returns the JSON string representation' do
result = subject.send(:validate_payload_for_compression)

expect(result).to be_success
expect(result.value!).to eq(test_payload.to_json)
end
end

context 'when the payload is a valid String' do
let(:test_payload) { 'valid_string' }

it 'returns the string as is' do
result = subject.send(:validate_payload_for_compression)

expect(result).to be_success
expect(result.value!).to eq(test_payload)
end
end

context 'when the payload is invalid' do
let(:test_payload) { 12345 }

it 'returns a failure with an error message' do
result = subject.send(:validate_payload_for_compression)

expect(result).to be_failure
expect(result.failure).to eq('Payload must be a Hash or String')
end
end
end

describe '#binary_payload?' do

before do
subject { described_class.new(payload: test_payload) }
end

context 'when the payload is binary' do
let(:test_payload) { Zlib.deflate({ key: 'value' }.to_json) }

it 'returns true' do
result = subject.send(:binary_payload?)

expect(result).to be true
end
end

context 'when the payload is not binary' do
it 'returns false' do
result = subject.send(:binary_payload?)

expect(result).to be false
end
end
end
end

0 comments on commit e94ae79

Please sign in to comment.