Skip to content

Commit

Permalink
Add GraphQL::Batch::Async
Browse files Browse the repository at this point in the history
  • Loading branch information
tgwizard committed Oct 14, 2024
1 parent 046a00b commit 864d9af
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 45 deletions.
9 changes: 4 additions & 5 deletions examples/http_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
# An example loader which is blocking and synchronous as a whole, but executes all of its operations concurrently.
module Loaders
class HTTPLoader < GraphQL::Batch::Loader
include GraphQL::Batch::Async

def initialize(host:, size: 4, timeout: 4)
super()
@host = host
Expand All @@ -46,7 +48,7 @@ def initialize(host:, size: 4, timeout: 4)
@futures = {}
end

def perform_on_wait(operations)
def perform_early(operations)
# This fans out and starts off all the concurrent work, which starts and
# immediately returns Concurrent::Promises::Future` objects for each operation.
operations.each do |operation|
Expand All @@ -55,17 +57,14 @@ def perform_on_wait(operations)
end

def perform(operations)
# Defer to let other non-async loaders run to completion first.
defer

# Collect the futures (and possibly trigger any newly added ones)
futures = operations.map do |operation|
future(operation)
end

# At this point, all of the concurrent work has been started.

# This converges back in, waiting on each concurrent future to finish, and fulfilling each
# Now it converges back in, waiting on each concurrent future to finish, and fulfilling each
# (non-concurrent) Promise.rb promise.
operations.each_with_index.each do |operation, index|
fulfill(operation, futures[index].value!) # .value is a blocking call
Expand Down
1 change: 1 addition & 0 deletions lib/graphql/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ def self.use(schema_defn, executor_class: GraphQL::Batch::Executor)

require_relative "batch/version"
require_relative "batch/loader"
require_relative "batch/async"
require_relative "batch/executor"
require_relative "batch/setup_multiplex"
25 changes: 25 additions & 0 deletions lib/graphql/batch/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module GraphQL::Batch
  # Mix into a GraphQL::Batch::Loader to start expensive operations early —
  # as soon as *any* loader begins waiting — instead of only when this
  # loader's own queue is resolved.
  #
  # Including classes implement #perform_early to kick off the async work
  # (non-blocking) and #perform to wait on it and fulfill each key.
  module Async
    # :nodoc:
    # Defers first so other, non-async loaders run to completion, giving the
    # early-started async work time to progress in the background, then
    # resolves this loader's queue via the normal Loader machinery.
    def resolve
      defer # Let other non-async loaders run to completion first.
      @peek_queue_index = nil # Queue is about to be consumed; reset the peek offset.
      super
    end

    # Called by the executor whenever any loader starts waiting. Peeks at the
    # portion of the queue not yet seen (without consuming it) and passes
    # those keys to #perform_early, so each key is started at most once.
    def on_any_loader_wait
      @peek_queue_index ||= 0
      peek_queue = queue[@peek_queue_index..]
      return if peek_queue.empty?
      # Advance the offset past everything peeked so far. (Assigning
      # `peek_queue.size` instead would discard the previous offset and
      # re-send already-peeked keys to #perform_early on the next wait.)
      @peek_queue_index += peek_queue.size
      perform_early(peek_queue)
    end

    # Override to trigger async operations for +keys+ without blocking.
    # May be invoked multiple times per batch, each time with keys that have
    # not been passed to it before.
    def perform_early(keys)
      raise NotImplementedError, "Implement GraphQL::Batch::Async#perform_early to trigger async operations early"
    end

    # Override to wait on the async operations for +keys+ and fulfill each one.
    def perform(keys)
      raise NotImplementedError, "Implement GraphQL::Batch::Async#perform to wait on the async operations"
    end
  end
end
9 changes: 6 additions & 3 deletions lib/graphql/batch/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,19 @@ def resolve(loader)
@loading = was_loading
end

def defer(_loader)
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
# Defer the resolution of the current loader, allowing other loaders to be resolved first.
# This is useful when the current loader has kicked off async or concurrent work, and doesn't need to
# block execution of the current thread until later.
def defer_to_other_loaders
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred && !loader.resolved? })
resolve(non_deferred_loader)
end
end

def on_wait
# FIXME: Better name?
@loaders.each do |_, loader|
loader.on_any_wait
loader.on_any_loader_wait
end
end

Expand Down
43 changes: 6 additions & 37 deletions lib/graphql/batch/loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ def prime(key, value)
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
end

def on_any_wait
return if resolved?
load_keys = queue # "Peek" the queue, but don't consume it.
# TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
# duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
perform_on_wait(load_keys)
# Called when any GraphQL::Batch::Loader starts waiting. May be called more than once per loader, if
# the loader is waiting multiple times. Will not be called once per promise.
#
# Use GraphQL::Batch::Async for the common way to use this.
def on_any_loader_wait
end

def resolve # :nodoc:
Expand Down Expand Up @@ -136,36 +135,6 @@ def fulfilled?(key)
promise.pending? && promise.source != self
end

def perform_on_wait(keys)
# FIXME: Better name?
# Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
# Example:
#
# def initialize
# super()
# @futures = {}
# end
#
# def perform_on_wait(keys)
# keys.each do |key|
# future(key)
# end
# end
#
# def perform(keys)
# defer # let other non-async loaders run to completion first.
# keys.each do |key|
# future(key).value!
# end
# end
#
# def future(key)
# @futures[key] ||= Concurrent::Promises.future do
# # Perform the async operation
# end
# end
end

# Must override to load the keys and call #fulfill for each key
def perform(keys)
raise NotImplementedError
Expand All @@ -188,7 +157,7 @@ def finish_resolve(key)

def defer
@deferred = true
executor.defer(self)
executor.defer_to_other_loaders
ensure
@deferred = false
end
Expand Down

0 comments on commit 864d9af

Please sign in to comment.