Skip to content

Commit

Permalink
add an ability to deliver messages asynchronously via adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
prog-supdex committed Oct 4, 2023
1 parent bef401d commit ebc7e13
Show file tree
Hide file tree
Showing 16 changed files with 571 additions and 36 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,34 @@ Or you can use `collect`
end
```

## Async delivering messages

If you use `Rails` application or you use `ActiveJob`, you can deliver messages using `ActiveJob`

### Configuration

You have the next configuration

```ruby
GraphQL::AnyCable.configure do |config|
# ... other configurations
config.delivery_method = "inline" # the default value "inline", also can be "active_job"
config.queue = "default" # the name of ActiveJob queue
config.job_class = "GraphQL::Jobs::TriggerJob" # the name executor job
end
```

`delivery_method` can be either `inline` or `active_job`.
`inline` means that delivering messaging will work sync.
`active_job` - It will add delivering messages operations to `ActiveJob` with queue `default` and using job `GraphQL::Jobs::TriggerJob`

You can change the queue or job_class by changing it in the configuration

Or you can run code

```ruby
GraphQL::AnyCable.delivery_method = "active_job", { queue: "broadcasting", job_class: "GraphQL::Jobs::TriggerJob" }
```

## Testing applications which use `graphql-anycable`

Expand Down
1 change: 1 addition & 0 deletions graphql-anycable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "railties"
spec.add_development_dependency "rake", ">= 12.3.3"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "activejob", "~> 6.0"
end
14 changes: 14 additions & 0 deletions lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require_relative "graphql/anycable/config"
require_relative "graphql/anycable/railtie" if defined?(Rails)
require_relative "graphql/anycable/stats"
require_relative "graphql/anycable/delivery_adapter"
require_relative "graphql/subscriptions/anycable_subscriptions"

module GraphQL
Expand All @@ -25,6 +26,19 @@ def self.stats(**options)
Stats.new(**options).collect
end

def self.delivery_method=(args)
method_name, options = Array(args)
options ||= {}

config.delivery_method = method_name
config.queue = options[:queue] if options[:queue]
config.job_class = options[:job_class] if options[:job_class]
end

def self.delivery_adapter(object)
DeliveryAdapter.lookup(executor_object: object)
end

module_function

def redis
Expand Down
25 changes: 25 additions & 0 deletions lib/graphql/adapters/active_job_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class ActiveJobAdapter < BaseAdapter
def trigger(...)
executor_class_job.set(queue: config.queue).perform_later(
executor_object,
executor_method,
...
)
end

private

def executor_class_job
config.job_class.constantize
end

def config
GraphQL::AnyCable.config
end
end
end
end
18 changes: 18 additions & 0 deletions lib/graphql/adapters/base_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class BaseAdapter
attr_reader :executor_object, :executor_method

def initialize(executor_object:)
@executor_object = executor_object
@executor_method = executor_object.class::EXECUTOR_METHOD_NAME
end

def trigger
raise NoMethodError, "#{__method__} method should be implemented in concrete class"
end
end
end
end
11 changes: 11 additions & 0 deletions lib/graphql/adapters/inline_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class InlineAdapter < BaseAdapter
def trigger(...)
executor_object.public_send(executor_method, ...)
end
end
end
end
32 changes: 32 additions & 0 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ class Config < Anyway::Config
attr_config use_redis_object_on_cleanup: true
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.

attr_config delivery_method: "inline", queue: "default", job_class: "GraphQL::Jobs::TriggerJob"

def job_class=(value)
ensure_value_is_not_blank!("job_class", value)

super
end

def queue=(value)
ensure_value_is_not_blank!("queue", value)

super
end

def delivery_method=(value)
ensure_value_is_not_blank!("delivery_method", value)

super
end

private

def empty_value?(value)
value.nil? || value == ""
end

def ensure_value_is_not_blank!(name, value)
return unless empty_value?(value)

raise_validation_error("#{name} can not be blank")
end
end
end
end
25 changes: 25 additions & 0 deletions lib/graphql/anycable/delivery_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require "graphql/adapters/base_adapter"
require "graphql/adapters/inline_adapter"
require "graphql/adapters/active_job_adapter"

module GraphQL
module AnyCable
class DeliveryAdapter
class << self
def lookup(options)
adapter_class_name = config.delivery_method.to_s.split("_").map(&:capitalize).join

Adapters.const_get("#{adapter_class_name}Adapter").new(**(options || {}))
rescue NameError => e
raise e.class, "Delivery adapter :#{config.delivery_method} haven't been found", e.backtrace
end

def config
GraphQL::AnyCable.config
end
end
end
end
end
9 changes: 9 additions & 0 deletions lib/graphql/anycable/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
module GraphQL
module AnyCable
class Railtie < ::Rails::Railtie
initializer "graphql_anycable.load_trigger_job" do
ActiveSupport.on_load(:active_job) do
require "graphql/jobs/trigger_job"
require "graphql/serializers/anycable_subscription_serializer"

ActiveJob::Serializers.add_serializers(GraphQL::Serializers::AnyCableSubscriptionSerializer)
end
end

rake_tasks do
path = File.expand_path(__dir__)
Dir.glob("#{path}/tasks/**/*.rake").each { |f| load f }
Expand Down
11 changes: 11 additions & 0 deletions lib/graphql/jobs/trigger_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module GraphQL
module Jobs
class TriggerJob < ActiveJob::Base
def perform(executor_object, execute_method, event_name, args = {}, object = nil, options = {})
executor_object.public_send(execute_method, event_name, args, object, **options)
end
end
end
end
19 changes: 19 additions & 0 deletions lib/graphql/serializers/anycable_subscription_serializer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module GraphQL
module Serializers
class AnyCableSubscriptionSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize?(argument)
argument.kind_of?(GraphQL::Subscriptions::AnyCableSubscriptions)
end

def serialize(subscription)
super(subscription.collected_arguments)
end

def deserialize(payload)
GraphQL::Subscriptions::AnyCableSubscriptions.new(**payload)
end
end
end
end
15 changes: 15 additions & 0 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,21 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

attr_reader :collected_arguments
alias_method :trigger_sync, :trigger

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
EXECUTOR_METHOD_NAME = "trigger_sync" # method, who executes the sync method "trigger"

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
@serializer = serializer

@collected_arguments = collect_arguments(serializer, rest)

super
end

Expand Down Expand Up @@ -206,6 +213,10 @@ def delete_channel_subscriptions(channel_or_id)
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)
end

def trigger(...)
AnyCable.delivery_adapter(self).trigger(...)
end

private

def anycable
Expand Down Expand Up @@ -243,6 +254,10 @@ def fetch_channel_istate(channel)
def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def collect_arguments(serializer, payload)
payload.merge(serializer: serializer)
end
end
end
end
Expand Down
90 changes: 89 additions & 1 deletion spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# frozen_string_literal: true

require "active_job"
require "graphql/jobs/trigger_job"

RSpec.describe GraphQL::AnyCable do
subject do
AnycableSchema.execute(
Expand Down Expand Up @@ -263,9 +266,94 @@

describe ".stats" do
it "calls Graphql::AnyCable::Stats" do
allow_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect)
expect_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect)

described_class.stats
end
end

describe ".delivery_adapter" do
context "when config.delivery_method is inline" do
around do |ex|
old_value = GraphQL::AnyCable.config.delivery_method
GraphQL::AnyCable.config.delivery_method = "inline"

ex.run

GraphQL::AnyCable.config.delivery_method = old_value
end

it "calls InlineAdapter" do
expect(GraphQL::Adapters::InlineAdapter).to receive(:new).with(executor_object: "any_object")

described_class.delivery_adapter("any_object")
end
end

context "when config.delivery_method is active_job" do
around do |ex|
old_value = GraphQL::AnyCable.config.delivery_method
GraphQL::AnyCable.config.delivery_method = "active_job"

ex.run

GraphQL::AnyCable.config.delivery_method = old_value
end

it "calls ActiveJobAdapter" do
expect(GraphQL::Adapters::ActiveJobAdapter).to receive(:new).with(executor_object: "any_object")

described_class.delivery_adapter("any_object")
end
end
end

describe ".delivery_method" do
let(:config) { GraphQL::AnyCable.config }

after do
config.delivery_method = "inline"
config.queue = "default"
config.job_class = "GraphQL::Jobs::TriggerJob"
end

it "changes config" do
expect(config.delivery_method).to eq("inline")
expect(config.queue).to eq("default")
expect(config.job_class).to eq("GraphQL::Jobs::TriggerJob")

described_class.delivery_method = :active_job, { queue: "test", job_class: "CustomJob" }

expect(config.delivery_method).to eq(:active_job)
expect(config.queue).to eq("test")
expect(config.job_class).to eq("CustomJob")
end

context "when entered empty delivery_method" do
it "raises an error" do
expect { described_class.delivery_method = nil }.to raise_error(
Anyway::Config::ValidationError,
/delivery_method can not be blank/,
)
end
end

context "when entered invalid queue" do
it "raises an error" do
expect { described_class.delivery_method = "inline", { queue: "" } }.to raise_error(
Anyway::Config::ValidationError,
/queue can not be blank/,
)
end
end

context "when entered invalid job_class" do
it "raises an error" do
expect { described_class.delivery_method = "inline", { job_class: "" } }.to raise_error(
Anyway::Config::ValidationError,
/job_class can not be blank/,
)
end
end
end
end
Loading

0 comments on commit ebc7e13

Please sign in to comment.