diff --git a/examples/http_loader.rb b/examples/http_loader.rb
index 74dd430..fea0a0f 100644
--- a/examples/http_loader.rb
+++ b/examples/http_loader.rb
@@ -43,27 +43,43 @@ def initialize(host:, size: 4, timeout: 4)
     @host = host
     @size = size
     @timeout = timeout
+    @futures = {}
   end
 
-  def perform(operations)
+  def perform_on_wait(operations)
     # This fans out and starts off all the concurrent work, which starts and
     # immediately returns Concurrent::Promises::Future` objects for each operation.
+    operations.each do |operation|
+      future(operation)
+    end
+  end
+
+  def perform(operations)
+    # Defer to let other non-async loaders run to completion first.
+    defer
+
+    # Collect the futures (and possibly trigger any newly added ones)
     futures = operations.map do |operation|
-      Concurrent::Promises.future do
-        pool.with { |connection| operation.call(connection) }
-      end
+      future(operation)
     end
+
     # At this point, all of the concurrent work has been started.
 
     # This converges back in, waiting on each concurrent future to finish, and fulfilling each
     # (non-concurrent) Promise.rb promise.
     operations.each_with_index.each do |operation, index|
-      fulfill(operation, futures[index].value) # .value is a blocking call
+      fulfill(operation, futures[index].value!) # .value is a blocking call
     end
   end
 
   private
 
+  def future(operation)
+    @futures[operation] ||= Concurrent::Promises.future do
+      pool.with { |connection| operation.call(connection) }
+    end
+  end
+
   def pool
     @pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
       HTTP.persistent(@host)
diff --git a/lib/graphql/batch/executor.rb b/lib/graphql/batch/executor.rb
index dd64316..9d35c1d 100644
--- a/lib/graphql/batch/executor.rb
+++ b/lib/graphql/batch/executor.rb
@@ -53,6 +53,19 @@ def resolve(loader)
       @loading = was_loading
     end
 
+    def defer(loader)
+      while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
+        resolve(non_deferred_loader.last)
+      end
+    end
+
+    def on_wait
+      # FIXME: Better name?
+      @loaders.each do |_, loader|
+        loader.on_any_wait
+      end
+    end
+
     def tick
       resolve(@loaders.shift.last)
     end
diff --git a/lib/graphql/batch/loader.rb b/lib/graphql/batch/loader.rb
index 0484df5..ff2a441 100644
--- a/lib/graphql/batch/loader.rb
+++ b/lib/graphql/batch/loader.rb
@@ -42,13 +42,14 @@ def current_executor
       end
     end
 
-    attr_accessor :loader_key, :executor
+    attr_accessor :loader_key, :executor, :deferred
 
     def initialize
       @loader_key = nil
       @executor = nil
       @queue = nil
       @cache = nil
+      @deferred = false
     end
 
     def load(key)
@@ -66,6 +67,12 @@ def prime(key, value)
       cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
     end
 
+    def on_any_wait
+      return if resolved?
+      load_keys = queue # "Peek" the queue, but don't consume it.
+      perform_on_wait(load_keys)
+    end
+
     def resolve # :nodoc:
       return if resolved?
       load_keys = queue
@@ -88,6 +95,7 @@ def around_perform
     # For Promise#sync
     def wait # :nodoc:
       if executor
+        executor.on_wait
         executor.resolve(self)
       else
         resolve
@@ -126,6 +134,36 @@ def fulfilled?(key)
       promise.pending? && promise.source != self
     end
 
+    def perform_on_wait(keys)
+      # FIXME: Better name?
+      # Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
+      # Example:
+      #
+      #   def initialize
+      #     super()
+      #     @futures = {}
+      #   end
+      #
+      #   def perform_on_wait(keys)
+      #     keys.each do |key|
+      #       future(key)
+      #     end
+      #   end
+      #
+      #   def perform(keys)
+      #     defer # let other non-async loaders run to completion first.
+      #     keys.each do |key|
+      #       future(key).value!
+      #     end
+      #   end
+      #
+      #   def future(key)
+      #     @futures[key] ||= Concurrent::Promises.future do
+      #       # Perform the async operation
+      #     end
+      #   end
+    end
+
     # Must override to load the keys and call #fulfill for each key
     def perform(keys)
       raise NotImplementedError
     end
@@ -146,6 +184,13 @@ def finish_resolve(key)
       end
     end
 
+    def defer
+      @deferred = true
+      executor.defer(self)
+    ensure
+      @deferred = false
+    end
+
     def cache
       @cache ||= {}
     end
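
Usage sketch (illustration only, not part of the patch above): a hypothetical AsyncJSONLoader wired up against the new hooks. The loader class and its fetch_json helper are made up for the example; perform_on_wait, defer, fulfill, and the concurrent-ruby calls are the ones introduced or used in the diff. perform_on_wait starts the futures as soon as any loader begins waiting, and perform calls defer so non-async loaders resolve before it blocks on value!.

require "graphql/batch"
require "concurrent"

class AsyncJSONLoader < GraphQL::Batch::Loader
  def initialize
    super()
    @futures = {}
  end

  # Called through Executor#on_wait -> Loader#on_any_wait as soon as any
  # loader starts waiting: kick off the async work without blocking.
  def perform_on_wait(keys)
    keys.each { |key| future(key) }
  end

  def perform(keys)
    defer # let other non-async loaders run to completion first
    keys.each do |key|
      fulfill(key, future(key).value!) # value! blocks until the future resolves
    end
  end

  private

  def future(key)
    @futures[key] ||= Concurrent::Promises.future do
      fetch_json(key)
    end
  end

  # Placeholder for the real blocking I/O (HTTP request, file read, etc.).
  def fetch_json(key)
    { "id" => key }
  end
end

Inside GraphQL::Batch.batch { ... }, AsyncJSONLoader.for.load(key) then behaves like any other loader, except that its I/O is kicked off the first time any promise in the batch is waited on rather than when this loader itself resolves.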