Skip to content

Commit

Permalink
Add subscriber name as suffix for executable container keys (#108)
Browse files Browse the repository at this point in the history
* Add subscriber name as suffix for executable container keys

* added specs
  • Loading branch information
raghuramg authored Feb 9, 2024
1 parent f4e83cb commit ed23a94
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 48 deletions.
70 changes: 24 additions & 46 deletions lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,41 +117,6 @@ def convert_subscriber_prefetch(options)

def resolve_subscriber_routing_keys(channel, operation); end

# def register_subscription(subscriber_klass, bindings)
# consumer_proxy = consumer_proxy_for(bindings)

# consumer_proxy.on_delivery do |delivery_info, metadata, payload|
# on_receive_message(
# subscriber_klass,
# delivery_info,
# metadata,
# payload
# )
# end

# subscribe_consumer(consumer_proxy)
# end

# def subscribe_consumer(consumer_proxy)
# @subject.subscribe_with(consumer_proxy)
# @consumers.push(consumer_proxy)
# end

# def consumer_proxy_for(bindings)
# operation_bindings = convert_to_consumer_options(bindings[:amqp])

# logger.debug 'consumer proxy options:'
# logger.debug operation_bindings.inspect

# BunnyConsumerProxy.new(
# @subject.channel,
# @subject,
# '',
# operation_bindings[:no_ack],
# operation_bindings[:exclusive]
# )
# end

def on_receive_message(
subscriber_klass,
delivery_info,
Expand All @@ -164,17 +129,7 @@ def on_receive_message(
logger.debug metadata.inspect
logger.debug payload.inspect

if delivery_info.routing_key
routing_key = [app_name, delivery_info.routing_key].join(delimiter)
executable = subscriber_klass.executable_for(routing_key)
end

unless executable
routing_key = [app_name, exchange_name].join(delimiter)
executable = subscriber_klass.executable_for(routing_key)
end

logger.debug "routing_key: #{routing_key}"
executable = find_executable(subscriber_klass, delivery_info)
return unless executable

subscriber = subscriber_klass.new
Expand All @@ -196,6 +151,13 @@ def on_receive_message(
subscriber = nil
end

def find_executable(subscriber_klass, delivery_info)
subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass)

find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) ||
find_default_executable(subscriber_klass, subscriber_suffix)
end

def respond_to_missing?(name, include_private); end

# Forward all missing method calls to the Bunny::Queue instance
Expand All @@ -205,6 +167,22 @@ def method_missing(name, *args)

private

def subscriber_klass_name_to_suffix(subscriber_klass)
subscriber_klass.name.downcase.gsub("::", '_')
end

def find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix)
return unless delivery_info.routing_key

routing_key = [app_name, delivery_info.routing_key].join(delimiter)
subscriber_klass.executable_for(routing_key + "_#{subscriber_suffix}")
end

def find_default_executable(subscriber_klass, subscriber_suffix)
default_routing_key = [app_name, exchange_name].join(delimiter)
subscriber_klass.executable_for(default_routing_key + "_#{subscriber_suffix}")
end

def delimiter
EventSource.delimiter(:amqp)
end
Expand Down
8 changes: 6 additions & 2 deletions lib/event_source/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,15 @@ def subscribe(subscription_name, &block)
unless formatted_publisher_key.gsub(delimiter, '_') == identifier
unique_key_elements.push(identifier)
end
logger.debug "Subscriber#susbcribe Unique_key #{unique_key_elements.join(delimiter)}"

return unless block_given?

subscriber_suffix = self.name.downcase.gsub('::', '_')
subscriber_unique_key = unique_key_elements.join(delimiter) + "_#{subscriber_suffix}"
logger.debug "Subscriber#susbcribe Unique key #{subscriber_unique_key}"

EventSource::Subscriber.executable_container[
unique_key_elements.join(delimiter)
subscriber_unique_key
] =
block
end
Expand Down
153 changes: 153 additions & 0 deletions spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,157 @@
# end
# end
end

context "executable lookup with subscriber suffix" do
let(:connection_manager) { EventSource::ConnectionManager.instance }
let!(:connection) { connection_manager.add_connection(my_server) }

let(:event_log_subscriber) do
Pathname.pwd.join(
"spec",
"rails_app",
"app",
"event_source",
"subscribers",
"event_log_subscriber.rb"
)
end

let(:enterprise_subscriber) do
Pathname.pwd.join(
"spec",
"rails_app",
"app",
"event_source",
"subscribers",
"enterprise_subscriber.rb"
)
end

let(:publish_resource) do
EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
.new
.call(
path:
Pathname.pwd.join(
"spec",
"support",
"asyncapi",
"amqp_audit_log_publish.yml"
)
)
.success
end

let(:subscribe_resource) do
EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
.new
.call(
path:
Pathname.pwd.join(
"spec",
"support",
"asyncapi",
"amqp_audit_log_subscribe.yml"
)
)
.success
end

let(:subscribe_two_resource) do
EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath
.new
.call(
path:
Pathname.pwd.join(
"spec",
"support",
"asyncapi",
"amqp_enterprise_subscribe.yml"
)
)
.success
end

let(:publish_channel) do
connection.add_channel(
"enroll.audit_log.events.created",
publish_resource.channels.first
)
end
let(:subscribe_channel) do
connection.add_channel(
"on_enroll.enroll.audit_log.events",
subscribe_resource.channels.first
)
end
let(:subscribe_two_channel) do
connection.add_channel(
"on_enroll.enroll.enterprise.events",
subscribe_two_resource.channels.first
)
end

let(:load_subscribers) do
[event_log_subscriber, enterprise_subscriber].each do |file|
require file.to_s
end
end

before do
allow(EventSource).to receive(:app_name).and_return("enroll")
connection.start unless connection.active?
publish_channel
subscribe_channel
subscribe_two_channel
load_subscribers
allow(subject).to receive(:exchange_name) { exchange_name }
end

let(:audit_log_proc) do
EventSource::Subscriber.executable_container[
"enroll.enroll.audit_log.events_subscribers_eventlogsubscriber"
]
end

let(:enterprise_advance_day_proc) do
EventSource::Subscriber.executable_container[
"enroll.enroll.enterprise.events.date_advanced_subscribers_enterprisesubscriber"
]
end

context "when routing key based executable is not found" do
let(:delivery_info) do
double(routing_key: "enroll.enterprise.events.date_advanced")
end

let(:exchange_name) { "enroll.audit_log.events" }

it "should return default audit log proc" do
executable =
subject.find_executable(
Subscribers::EventLogSubscriber,
delivery_info
)
expect(executable).to match(audit_log_proc)
end
end

context "when routing key based executable is found" do
let(:delivery_info) do
double(routing_key: "enroll.enterprise.events.date_advanced")
end

let(:exchange_name) { "enroll.enterprise.events" }

it "should return executable for the routing key" do
executable =
subject.find_executable(
Subscribers::EnterpriseSubscriber,
delivery_info
)
expect(executable).to match(enterprise_advance_day_proc)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

module Subscribers
# Subscriber will receive Enterprise requests like date change
class EnterpriseSubscriber
include ::EventSource::Subscriber[amqp: "enroll.enterprise.events"]

subscribe(:on_date_advanced) do |delivery_info, metadata, response|
logger.info "-" * 100 unless Rails.env.test?
logger.info "EnterpriseSubscriber#on_date_advanced, response: #{response}"

ack(delivery_info.delivery_tag)
rescue StandardError, SystemStackError => e
ack(delivery_info.delivery_tag)
end

subscribe(
:on_enroll_enterprise_events
) do |delivery_info, _metadata, response|
logger.info "-" * 100 unless Rails.env.test?
logger.info "EnterpriseSubscriber#on_enroll_enterprise_events, response: #{response}"

ack(delivery_info.delivery_tag)
rescue StandardError, SystemStackError => e
ack(delivery_info.delivery_tag)
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

module Subscribers
# Subscriber will receive Audit Log events
class EventLogSubscriber
include EventSource::Logging
include ::EventSource::Subscriber[amqp: "enroll.audit_log.events"]

subscribe(
:on_enroll_audit_log_events
) do |delivery_info, metadata, response|
logger.info "-" * 100 unless Rails.env.test?

subscriber_logger.info "EventLogEventsSubscriber#on_enroll_audit_log_events, response: #{response}"

ack(delivery_info.delivery_tag)
rescue StandardError, SystemStackError => e
ack(delivery_info.delivery_tag)
end

private

def subscriber_logger
@subscriber_logger ||=
Logger.new("#{Rails.root}/log/on_enroll_audit_log_events.log")
end
end
end
55 changes: 55 additions & 0 deletions spec/support/asyncapi/amqp_enterprise_subscribe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
---
asyncapi: 2.0.0
info:
title: Enroll App
version: 0.1.0
description: AMQP Subsribe configuration for the Enroll App services
contact:
name: IdeaCrew
url: https://ideacrew.com
email: [email protected]
license:
name: MIT
url: https://opensource.org/licenses/MIT

servers:
production:
url: "amqp://rabbitmq:5672/event_source"
protocol: :amqp
protocolVersion: "0.9.2"
description: RabbitMQ Production Server
development:
url: "amqp://rabbitmq:5672/event_source"
protocol: :amqp
protocolVersion: "0.9.2"
description: RabbitMQ Test Server
test:
url: "amqp://rabbitmq:5672/event_source"
protocol: :amqp
protocolVersion: "0.9.2"
description: RabbitMQ Test Server
channels:
on_enroll.enroll.enterprise.events:
bindings:
amqp:
is: queue
queue:
name: on_enroll.enroll.enterprise.events
durable: true
exclusive: false
auto_delete: false
vhost: "/"
subscribe:
bindings:
amqp:
ack: true
exclusive: false
routing_key: enroll.#
prefetch: 1
bindingVersion: "0.2.0"
operationId: on_enroll.enroll.enterprise.events
description: Events - for system wide changes

tags:
- name: linter_tag
description: placeholder that satisfies the linter

0 comments on commit ed23a94

Please sign in to comment.