From b0924797cdeb9e0c61b40f8bae4185a98d9cfc71 Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Wed, 28 Feb 2018 14:53:17 +0100 Subject: [PATCH 01/10] First draft on RecreatableTask and recreate queue --- recreatable_task.rb | 40 ++++++++++++++++++++++++++++++++++++++++ task.rb | 32 ++++++++++++++++++++++++++++++++ task_queue.rb | 43 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 recreatable_task.rb diff --git a/recreatable_task.rb b/recreatable_task.rb new file mode 100644 index 0000000..92fef32 --- /dev/null +++ b/recreatable_task.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module TaskQueue + # Mixin included by Tasks that may be recreated from a file. + module RecreatableTask + def self.included(base) + base.send(:include, InstanceMethods) + end + + # Add this as instance methods. + module InstanceMethods + # This method is the base execution unit for a given Task, it + # receives a Hash-like parameter collection defined by the + # `params_to_hash` resultant Hash. + def run!(*params) + # no-op + end + + # This method is intended to provide a nice Hash-based interface + # for all the parameters used in a given task and, that in case + # of system failure, might be recreated later. + # For this reason, user has to take in mind that the hash should + # always be serializable right away. + def params_to_hash + {} + end + + def to_task + task = Task.new(work_block: proc { recreatable_task.run!(params_to_hash) }) + task.recreatable = true + task.recreatable.freeze # Avoid further mutations on this. + task.recreatable_class = self.class.constantize + task.recreatable_class.freeze + task.recreatable_params = params_to_hash + task.recreatable_params.freeze + task + end + end + end +end diff --git a/task.rb b/task.rb index 44df00f..315f413 100644 --- a/task.rb +++ b/task.rb @@ -1,3 +1,6 @@ +# frozen_string_literal: true +require 'json' + module TaskQueue # Smallest unit of work that can be submitted to a TaskQueue # Not reusable, if you attempt to re-execute this, the worker will raise an exception @@ -9,6 +12,9 @@ class Task attr_accessor :completed attr_accessor :submitted attr_accessor :finished_successfully + attr_accessor :recreatable + attr_accessor :recreatable_class + attr_accessor :recreatable_params def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil) self.work_block = work_block @@ -18,9 +24,35 @@ def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil) self.name.freeze self.description.freeze + self.recreatable = false + self.recreatable_class = nil self.completed = false self.submitted = false self.finished_successfully = false end + + def self.from_recreatable_task!(file_path: nil) + raise 'Task file path was not provided' unless file_path.nil? + raise 'Task class was not provided' unless recreatable_subclass.nil? + + recreatable_task_hash = JSON.parse(File.read(file_path)) + recreatable_task = recreatable_task_hash['class'].constantize.new + + raise 'Recreatable task does not include `RecreatableTask` module' unless recreatable_task.class.included_modules.include?(TaskQueue::RecreatableTask) + + params = recreatable_task_hash['params'] + + raise "Unexpected parameter type, found #{params.class} expected Hash." unless params.is_a?(Hash) + + task = Task.new(work_block: proc { recreatable_task.run!(params) }) + task.recreatable = true + task.recreatable.freeze # Avoid further mutations on this. + task.recreatable_class = recreatable_task.constantize + task.recreatable_class.freeze + task.recreatable_params = params + task.recreatable_params.freeze + + task + end end end diff --git a/task_queue.rb b/task_queue.rb index 4607653..d4bd388 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -1,6 +1,10 @@ -require "set" -require_relative "task" -require_relative "queue_worker" +# frozen_string_literal: true + +require 'set' +require_relative 'task' +require_relative 'queue_worker' +require 'tmpdir' +require 'json' module TaskQueue # A queue that executes tasks in the order in which they were received @@ -20,6 +24,8 @@ def initialize(name:, number_of_workers: 1) @available_workers.add(worker) end + ObjectSpace.define_finalizer(self, self.class.finalize(name: name, number_of_workers: number_of_workers, tasks: queue.to_a)) + start_task_distributor end @@ -49,7 +55,7 @@ def available_worker def hand_out_work # get first available worker - while (worker = self.available_worker) + while (worker = available_worker) # if none are available, that's cool. break if worker.nil? @@ -100,5 +106,34 @@ def busy_worker_count return @busy_workers.count end end + + def self.finalizer(name: nil, number_of_workers: 1, tasks: nil) + return if tasks.nil? || name.nil? + path = Pathname.new(Dir.tmpdir).join(name, 'meta.json') + File.write(path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) + to_a(tasks) + .select { |task| task.recreatable && !task.completed } + .each_with_index do |task, index| + task_meta = { 'class' => task.recreatable_class.name, 'params' => task.recreatable_params } + File.write(Pathname.new(Dir.tmpdir).join(name, "#{index}.json"), JSON.pretty_generate(task_meta)) + end + end + + def self.from_recreated_tasks(name: nil) + path = Pathname.new(Dir.tmpdir).join(name, 'meta.json') + queue_meta = JSON.parse(File.read(path)) + queue = TaskQueue.new(name: queue_meta['name'], number_of_workers: queue_meta['number_of_workers']) + Dir.glob(Dir.tmpdir.join(name, '*.json')).sort do |json_file| + next if File.basename(json_file).eql?('meta.json') + queue.add_task_async(task: Task.from_recreatable_task!(file_path: Pathname.new(Dir.tmpdir).join(name, json_file))) + end + queue + end + + private + + def to_a(queue) + queue.size.times.map { queue.pop } + end end end From 677c792dcc8601ba18697d4aaae82e009056ab14 Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Wed, 28 Feb 2018 14:55:55 +0100 Subject: [PATCH 02/10] Typo --- recreatable_task.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/recreatable_task.rb b/recreatable_task.rb index 92fef32..e575b31 100644 --- a/recreatable_task.rb +++ b/recreatable_task.rb @@ -26,7 +26,7 @@ def params_to_hash end def to_task - task = Task.new(work_block: proc { recreatable_task.run!(params_to_hash) }) + task = Task.new(work_block: proc { run!(params_to_hash) }) task.recreatable = true task.recreatable.freeze # Avoid further mutations on this. task.recreatable_class = self.class.constantize From 0a9759385d29a9897f737e08232f20ff354a818a Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Wed, 28 Feb 2018 19:04:15 +0100 Subject: [PATCH 03/10] Added simple test and more additions --- recreatable_task.rb | 4 ++-- specs/work_queue_spec.rb | 34 ++++++++++++++++++++++++++++++++++ task.rb | 7 ++++--- task_queue.rb | 37 ++++++++++++++++++++++--------------- 4 files changed, 62 insertions(+), 20 deletions(-) diff --git a/recreatable_task.rb b/recreatable_task.rb index e575b31..622e3ac 100644 --- a/recreatable_task.rb +++ b/recreatable_task.rb @@ -12,7 +12,7 @@ module InstanceMethods # This method is the base execution unit for a given Task, it # receives a Hash-like parameter collection defined by the # `params_to_hash` resultant Hash. - def run!(*params) + def run!(params) # no-op end @@ -29,7 +29,7 @@ def to_task task = Task.new(work_block: proc { run!(params_to_hash) }) task.recreatable = true task.recreatable.freeze # Avoid further mutations on this. - task.recreatable_class = self.class.constantize + task.recreatable_class = self.class task.recreatable_class.freeze task.recreatable_params = params_to_hash task.recreatable_params.freeze diff --git a/specs/work_queue_spec.rb b/specs/work_queue_spec.rb index 27cd07d..8d19d1c 100644 --- a/specs/work_queue_spec.rb +++ b/specs/work_queue_spec.rb @@ -1,5 +1,17 @@ require_relative '../task_queue' +class MockRecreatableTask + include TaskQueue::RecreatableTask + + def run!(params) + puts "#{params}" + end + + def params_to_hash + { "one_param" => "Hello", "other_param" => "World" } + end +end + module TaskQueue describe TaskQueue do def wait_for_task_to_complete(task: nil) @@ -156,5 +168,27 @@ def wait_for_task_to_complete(task: nil) # expect that the arrays aren't equal because this was async expect(numbers).to_not eq(expected_results) end + + it 'Creates tasks from any RecreatableTask' do + recreatable_task = MockRecreatableTask.new + task = recreatable_task.to_task + expect(task).to_not be_nil + end + + it 'Should execute tasks from RecreatableTasks' do + recreatable_task = MockRecreatableTask.new + task = recreatable_task.to_task + queue = TaskQueue.new(name: 'test queue') + expect(STDOUT).to receive(:puts).with(recreatable_task.params_to_hash.to_s) + queue.add_task_async(task: task) + wait_for_task_to_complete(task: task) + end + + it 'Should call finalizer when the Queue is destroyed' do + # This is actually tricky to achieve, because even forcing + # garbage collection, the queue's finalizer is not getting + # called until the end of the program or the GC decides to + # release the object (calling GC.start does not trigger it). + end end end diff --git a/task.rb b/task.rb index 315f413..fcd5a89 100644 --- a/task.rb +++ b/task.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require 'json' +require_relative 'recreatable_task' module TaskQueue # Smallest unit of work that can be submitted to a TaskQueue @@ -32,13 +33,13 @@ def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil) end def self.from_recreatable_task!(file_path: nil) - raise 'Task file path was not provided' unless file_path.nil? - raise 'Task class was not provided' unless recreatable_subclass.nil? + raise 'Task file path was not provided' if file_path.nil? + raise 'Task class was not provided' if recreatable_subclass.nil? recreatable_task_hash = JSON.parse(File.read(file_path)) recreatable_task = recreatable_task_hash['class'].constantize.new - raise 'Recreatable task does not include `RecreatableTask` module' unless recreatable_task.class.included_modules.include?(TaskQueue::RecreatableTask) + raise 'Recreatable task does not include `RecreatableTask` module' unless recreatable_task.class.include?(RecreatableTask) params = recreatable_task_hash['params'] diff --git a/task_queue.rb b/task_queue.rb index d4bd388..73210d2 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -1,15 +1,18 @@ # frozen_string_literal: true require 'set' -require_relative 'task' -require_relative 'queue_worker' require 'tmpdir' require 'json' +require 'pathname' +require_relative 'task' +require_relative 'queue_worker' +require_relative 'recreatable_task' module TaskQueue # A queue that executes tasks in the order in which they were received class TaskQueue attr_reader :name + attr_reader :queue def initialize(name:, number_of_workers: 1) @name = name @@ -24,7 +27,7 @@ def initialize(name:, number_of_workers: 1) @available_workers.add(worker) end - ObjectSpace.define_finalizer(self, self.class.finalize(name: name, number_of_workers: number_of_workers, tasks: queue.to_a)) + ObjectSpace.define_finalizer(self, proc { puts "deallocate"; self.class.finalizer(name: name, number_of_workers: number_of_workers, tasks: @queue) }) start_task_distributor end @@ -77,7 +80,7 @@ def start_task_distributor loop do hand_out_work # only sleep if we have no workers or the queue is empty - Thread.stop if @available_workers.count == 0 || @queue.empty? + Thread.stop if @available_workers.count.zero? || @queue.empty? end end @@ -109,17 +112,27 @@ def busy_worker_count def self.finalizer(name: nil, number_of_workers: 1, tasks: nil) return if tasks.nil? || name.nil? - path = Pathname.new(Dir.tmpdir).join(name, 'meta.json') - File.write(path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) - to_a(tasks) - .select { |task| task.recreatable && !task.completed } + tasks = tasks.size.times.map { tasks.pop } + name = name.sub(' ', '_') + temp_dir = Pathname.new(Dir.tmpdir).join(name) + FileUtils.mkdir_p(temp_dir) unless File.directory?(temp_dir) + FileUtils.rm_rf("#{temp_dir}/.", secure: true) + meta_path = temp_dir.join('meta.json') + FileUtils.touch(meta_path) + File.write(meta_path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) + recreatable_tasks = tasks.select { |task| task.recreatable && !task.completed } + return if recreatable_tasks.count.zero? + recreatable_tasks .each_with_index do |task, index| task_meta = { 'class' => task.recreatable_class.name, 'params' => task.recreatable_params } - File.write(Pathname.new(Dir.tmpdir).join(name, "#{index}.json"), JSON.pretty_generate(task_meta)) + FileUtils.touch(temp_dir.join("#{index}.json")) + File.write(temp_dir.join("#{index}.json"), JSON.pretty_generate(task_meta)) end end def self.from_recreated_tasks(name: nil) + return nil if name.nil? + name = name.sub('_', ' ') path = Pathname.new(Dir.tmpdir).join(name, 'meta.json') queue_meta = JSON.parse(File.read(path)) queue = TaskQueue.new(name: queue_meta['name'], number_of_workers: queue_meta['number_of_workers']) @@ -129,11 +142,5 @@ def self.from_recreated_tasks(name: nil) end queue end - - private - - def to_a(queue) - queue.size.times.map { queue.pop } - end end end From 0fde06e0aaa6cbbb4aa5dffa1f2fadc5664371aa Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Wed, 28 Feb 2018 19:06:46 +0100 Subject: [PATCH 04/10] Typo --- task.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/task.rb b/task.rb index fcd5a89..f84d135 100644 --- a/task.rb +++ b/task.rb @@ -34,7 +34,6 @@ def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil) def self.from_recreatable_task!(file_path: nil) raise 'Task file path was not provided' if file_path.nil? - raise 'Task class was not provided' if recreatable_subclass.nil? recreatable_task_hash = JSON.parse(File.read(file_path)) recreatable_task = recreatable_task_hash['class'].constantize.new From 45dbad99bba4d89e7b0a5373042b3e437e870524 Mon Sep 17 00:00:00 2001 From: Jorge Date: Wed, 28 Feb 2018 20:26:57 +0100 Subject: [PATCH 05/10] Update task_queue.rb --- task_queue.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task_queue.rb b/task_queue.rb index 73210d2..6677f38 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -27,7 +27,7 @@ def initialize(name:, number_of_workers: 1) @available_workers.add(worker) end - ObjectSpace.define_finalizer(self, proc { puts "deallocate"; self.class.finalizer(name: name, number_of_workers: number_of_workers, tasks: @queue) }) + ObjectSpace.define_finalizer(self, proc { self.class.finalizer(name: name, number_of_workers: number_of_workers, tasks: @queue) }) start_task_distributor end From c4cb9cb681353a843b29fbda1d93a6af95a29486 Mon Sep 17 00:00:00 2001 From: Jorge Date: Wed, 28 Feb 2018 21:15:55 +0100 Subject: [PATCH 06/10] Update task.rb --- task.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task.rb b/task.rb index f84d135..4da54ea 100644 --- a/task.rb +++ b/task.rb @@ -47,7 +47,7 @@ def self.from_recreatable_task!(file_path: nil) task = Task.new(work_block: proc { recreatable_task.run!(params) }) task.recreatable = true task.recreatable.freeze # Avoid further mutations on this. - task.recreatable_class = recreatable_task.constantize + task.recreatable_class = recreatable_task.class task.recreatable_class.freeze task.recreatable_params = params task.recreatable_params.freeze From d872c4dc5f36132a14668c1ec4bcd26b122dbaad Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Wed, 28 Feb 2018 22:07:47 +0100 Subject: [PATCH 07/10] Fixed tests, all them going green :smiley: - Added a destructor test, using fork to ensure the Garbage Collector deallocates resources on execution end. --- queue_worker.rb | 6 +++-- specs/work_queue_spec.rb | 47 +++++++++++++++++++++++++++++++--------- task_queue.rb | 38 +++++++++++++++++--------------- 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/queue_worker.rb b/queue_worker.rb index 83e0a4e..4cfa0f9 100644 --- a/queue_worker.rb +++ b/queue_worker.rb @@ -32,8 +32,6 @@ def finish_task @busy = false @current_task.completed = true @current_task.completed.freeze # Sorry, you can't run this task again - @current_task.finished_successfully = true - @current_task.finished_successfully.freeze end begin @@ -82,6 +80,10 @@ def process(task:) work_block.call + # If execution reaches this point, we can ensure that no exceptions were raised. + @current_task.finished_successfully = true + @current_task.finished_successfully.freeze + # When work is done, set @busy to false so we can be assigned up a new work unit finish_task rescue StandardError => e diff --git a/specs/work_queue_spec.rb b/specs/work_queue_spec.rb index 8d19d1c..d278c5e 100644 --- a/specs/work_queue_spec.rb +++ b/specs/work_queue_spec.rb @@ -15,7 +15,7 @@ def params_to_hash module TaskQueue describe TaskQueue do def wait_for_task_to_complete(task: nil) - sleep 0.0001 until task.completed + sleep 0.001 until task.completed end it 'Executes one block of work with just 1 worker' do @@ -45,7 +45,7 @@ def wait_for_task_to_complete(task: nil) task = Task.new(work_block: proc { raise "Oh noes" }, ensure_block: proc { ensured = true }) queue.add_task_async(task: task) wait_for_task_to_complete(task: task) - }.to raise_error(RuntimeError, "Oh noes") + }.to raise_error(RuntimeError) expect(ensured).to be(true) end @@ -70,11 +70,12 @@ def wait_for_task_to_complete(task: nil) task = Task.new(work_block: proc { raise "Oh noes" }, ensure_block: proc { |success| ensured = true; success_task = success }) queue.add_task_async(task: task) wait_for_task_to_complete(task: task) - }.to raise_error(RuntimeError, "Oh noes") + + expect(task.finished_successfully).to be(false) + }.to raise_error(RuntimeError) expect(ensured).to be(true) expect(success_task).to be(false) - expect(task.finished_successfully).to be(false) end it 'Executes 2 blocks of work with just 1 worker' do @@ -183,12 +184,38 @@ def wait_for_task_to_complete(task: nil) queue.add_task_async(task: task) wait_for_task_to_complete(task: task) end - - it 'Should call finalizer when the Queue is destroyed' do - # This is actually tricky to achieve, because even forcing - # garbage collection, the queue's finalizer is not getting - # called until the end of the program or the GC decides to - # release the object (calling GC.start does not trigger it). + context 'it should call its destructor' do + it 'Should call finalizer when the Queue is destroyed' do + # creating pipe for IPC to get result from child process + # after it garbaged + # http://ruby-doc.org/core-2.0.0/IO.html#method-c-pipe + rd, wr = IO.pipe + + # forking + # https://ruby-doc.org/core-2.1.2/Process.html#method-c-fork + if fork + wr.close + called = rd.read + Process.wait + expect(called).to eq({ 'name' => 'test queue', 'number_of_workers' => 1 }.to_s) + rd.close + else + rd.close + # overriding TaskQueue.finalizer(...) + TaskQueue.singleton_class.class_eval do + define_method(:finalizer) do |arg| + proc { + wr.write({ 'name' => arg[:name], 'number_of_workers' => arg[:number_of_workers] }) + wr.close + } + end + end + + queue = TaskQueue.new(name: 'test queue') + queue = nil + GC.start + end + end end end end diff --git a/task_queue.rb b/task_queue.rb index 6677f38..fb4e57c 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -27,7 +27,7 @@ def initialize(name:, number_of_workers: 1) @available_workers.add(worker) end - ObjectSpace.define_finalizer(self, proc { self.class.finalizer(name: name, number_of_workers: number_of_workers, tasks: @queue) }) + ObjectSpace.define_finalizer(self, self.class.finalizer(name: name, number_of_workers: number_of_workers, tasks: @queue)) start_task_distributor end @@ -111,23 +111,25 @@ def busy_worker_count end def self.finalizer(name: nil, number_of_workers: 1, tasks: nil) - return if tasks.nil? || name.nil? - tasks = tasks.size.times.map { tasks.pop } - name = name.sub(' ', '_') - temp_dir = Pathname.new(Dir.tmpdir).join(name) - FileUtils.mkdir_p(temp_dir) unless File.directory?(temp_dir) - FileUtils.rm_rf("#{temp_dir}/.", secure: true) - meta_path = temp_dir.join('meta.json') - FileUtils.touch(meta_path) - File.write(meta_path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) - recreatable_tasks = tasks.select { |task| task.recreatable && !task.completed } - return if recreatable_tasks.count.zero? - recreatable_tasks - .each_with_index do |task, index| - task_meta = { 'class' => task.recreatable_class.name, 'params' => task.recreatable_params } - FileUtils.touch(temp_dir.join("#{index}.json")) - File.write(temp_dir.join("#{index}.json"), JSON.pretty_generate(task_meta)) - end + proc { + return if tasks.nil? || name.nil? + tasks = tasks.size.times.map { tasks.pop } + name = name.sub(' ', '_') + temp_dir = Pathname.new(Dir.tmpdir).join(name) + FileUtils.mkdir_p(temp_dir) unless File.directory?(temp_dir) + FileUtils.rm_rf("#{temp_dir}/.", secure: true) + meta_path = temp_dir.join('meta.json') + FileUtils.touch(meta_path) + File.write(meta_path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) + recreatable_tasks = tasks.select { |task| task.recreatable && !task.completed } + return if recreatable_tasks.count.zero? + recreatable_tasks + .each_with_index do |task, index| + task_meta = { 'class' => task.recreatable_class.name, 'params' => task.recreatable_params } + FileUtils.touch(temp_dir.join("#{index}.json")) + File.write(temp_dir.join("#{index}.json"), JSON.pretty_generate(task_meta)) + end + } end def self.from_recreated_tasks(name: nil) From 638203f6e4a55f055fe111bc5ef6eb09a8a6ff7e Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Thu, 1 Mar 2018 17:48:30 +0100 Subject: [PATCH 08/10] Nice and ready for review! --- recreatable_task.rb | 4 +- specs/work_queue_spec.rb | 141 ++++++++++++++++++++++++++++++++++++++- task.rb | 11 ++- task_queue.rb | 28 +++++--- 4 files changed, 166 insertions(+), 18 deletions(-) diff --git a/recreatable_task.rb b/recreatable_task.rb index 622e3ac..677224b 100644 --- a/recreatable_task.rb +++ b/recreatable_task.rb @@ -12,7 +12,7 @@ module InstanceMethods # This method is the base execution unit for a given Task, it # receives a Hash-like parameter collection defined by the # `params_to_hash` resultant Hash. - def run!(params) + def run!(**params) # no-op end @@ -25,6 +25,8 @@ def params_to_hash {} end + # This is a convenience operator to turn a RecreatableTask to a + # generic Task, in order to be correctly executed by any TaskQueue. def to_task task = Task.new(work_block: proc { run!(params_to_hash) }) task.recreatable = true diff --git a/specs/work_queue_spec.rb b/specs/work_queue_spec.rb index d278c5e..91b30ed 100644 --- a/specs/work_queue_spec.rb +++ b/specs/work_queue_spec.rb @@ -3,12 +3,29 @@ class MockRecreatableTask include TaskQueue::RecreatableTask - def run!(params) - puts "#{params}" + def run!(**params) + puts params.to_s end def params_to_hash - { "one_param" => "Hello", "other_param" => "World" } + { one_param: "Hello", other_param: "World" } + end +end + +class MockOrderedRecreatableTask + include TaskQueue::RecreatableTask + + attr_accessor :number + def initialize(number: nil) + self.number = number + end + + def run!(**params) + puts params.to_s + end + + def params_to_hash + { one_param: "Hello", other_param: "World", number: number } end end @@ -217,5 +234,123 @@ def wait_for_task_to_complete(task: nil) end end end + + it 'Stores in JSON format the meta information of the queue' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + tasks << MockRecreatableTask.new.to_task + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + meta_path = Pathname.new(Dir.tmpdir).join('test_queue', 'meta.json') + expect(File.exist?(meta_path)).to eq(true) + queue_meta = JSON.parse(File.read(meta_path), symbolize_names: true) + expect(queue_meta).to eq({ name: 'test_queue', number_of_workers: 1 }) + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end + + it 'Stores the correct number of pending tasks' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + task_number = 10 + task_number.times do + tasks << MockRecreatableTask.new.to_task + end + + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + + file_count = Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.count + + expect(file_count).to eq(task_number + 1) # Task count plus meta file. + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end + + it 'Stores the correct information of pending tasks in order to recreate them' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + tasks << MockRecreatableTask.new.to_task + + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.each do |json_file| + next if File.basename(json_file).eql?('meta.json') # Skip meta file + task_hash = JSON.parse(File.read(json_file), symbolize_names: true) + expect(task_hash).to eq({ class: MockRecreatableTask.to_s, params: { one_param: "Hello", other_param: "World" } }) + end + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end + + it 'Recreates correctly the task from the information stored' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + tasks << MockRecreatableTask.new.to_task + + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.each do |json_file| + next if File.basename(json_file).eql?('meta.json') # Skip meta file + recreatable_task = Task.from_recreatable_task!(file_path: json_file) + expect(recreatable_task.recreatable_class).to eq(MockRecreatableTask) + expect(recreatable_task.recreatable_params).to eq({ one_param: "Hello", other_param: "World" }) + end + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end + + it 'Recreates the whole TaskQueue from the information stored' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + task_number = 2 + task_number.times do + tasks << MockRecreatableTask.new.to_task + end + + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + + expect(STDOUT).to receive(:puts).with({ one_param: "Hello", other_param: "World" }.to_s).twice + recreated_queue = TaskQueue.from_recreated_tasks!(name: 'test queue') + sleep 0.001 until recreated_queue.task_count.zero? + expect(recreated_queue.name).to eq('test queue') + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end + + it 'Recreates the whole TaskQueue from the information stored in the correct order' do + queue = TaskQueue.new(name: 'test queue') + tasks = Queue.new + # We need at least one pending task to let the queue be stored + task_number = 4 + task_number.times do |i| + tasks << MockOrderedRecreatableTask.new(number: i).to_task + end + + allow(queue).to receive(:queue).and_return(tasks) + TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call + + task_number.times do |i| + expect(STDOUT).to receive(:puts).with({ one_param: "Hello", other_param: "World", number: i }.to_s) + end + + recreated_queue = TaskQueue.from_recreated_tasks!(name: 'test queue') + sleep 0.001 until recreated_queue.task_count.zero? + expect(recreated_queue.name).to eq('test queue') + + # Cleanup directory + FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true) + end end end diff --git a/task.rb b/task.rb index 4da54ea..6dedde6 100644 --- a/task.rb +++ b/task.rb @@ -32,15 +32,20 @@ def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil) self.finished_successfully = false end + # Factory method which allows, given a file_path containing + # the file with the required data to recreate the Task stored + # by a certain Queue. + # @param file_path: String + # @returns [Task] def self.from_recreatable_task!(file_path: nil) raise 'Task file path was not provided' if file_path.nil? - recreatable_task_hash = JSON.parse(File.read(file_path)) - recreatable_task = recreatable_task_hash['class'].constantize.new + recreatable_task_hash = JSON.parse(File.read(file_path), symbolize_names: true) + recreatable_task = Object.const_get(recreatable_task_hash[:class]).new raise 'Recreatable task does not include `RecreatableTask` module' unless recreatable_task.class.include?(RecreatableTask) - params = recreatable_task_hash['params'] + params = recreatable_task_hash[:params] raise "Unexpected parameter type, found #{params.class} expected Hash." unless params.is_a?(Hash) diff --git a/task_queue.rb b/task_queue.rb index fb4e57c..e83af80 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -12,7 +12,6 @@ module TaskQueue # A queue that executes tasks in the order in which they were received class TaskQueue attr_reader :name - attr_reader :queue def initialize(name:, number_of_workers: 1) @name = name @@ -114,32 +113,39 @@ def self.finalizer(name: nil, number_of_workers: 1, tasks: nil) proc { return if tasks.nil? || name.nil? tasks = tasks.size.times.map { tasks.pop } + return if tasks.count.zero? + recreatable_tasks = tasks.select { |task| task.recreatable && !task.completed } + return if recreatable_tasks.count.zero? name = name.sub(' ', '_') temp_dir = Pathname.new(Dir.tmpdir).join(name) FileUtils.mkdir_p(temp_dir) unless File.directory?(temp_dir) FileUtils.rm_rf("#{temp_dir}/.", secure: true) meta_path = temp_dir.join('meta.json') FileUtils.touch(meta_path) - File.write(meta_path, JSON.pretty_generate('name' => name, 'number_of_workers' => number_of_workers)) - recreatable_tasks = tasks.select { |task| task.recreatable && !task.completed } - return if recreatable_tasks.count.zero? + File.write(meta_path, JSON.pretty_generate(:name => name, :number_of_workers => number_of_workers)) recreatable_tasks .each_with_index do |task, index| - task_meta = { 'class' => task.recreatable_class.name, 'params' => task.recreatable_params } + task_meta = { :class => task.recreatable_class.name, :params => task.recreatable_params } FileUtils.touch(temp_dir.join("#{index}.json")) File.write(temp_dir.join("#{index}.json"), JSON.pretty_generate(task_meta)) end } end - def self.from_recreated_tasks(name: nil) + # Factory method. + # Creates a new TaskQueue given the name of the TaskQueue that was destroyed. + # + # @param name: String + # @returns [TaskQueue] with its recreatable tasks already added async. + def self.from_recreated_tasks!(name: nil) return nil if name.nil? - name = name.sub('_', ' ') + name = name.sub(' ', '_') path = Pathname.new(Dir.tmpdir).join(name, 'meta.json') - queue_meta = JSON.parse(File.read(path)) - queue = TaskQueue.new(name: queue_meta['name'], number_of_workers: queue_meta['number_of_workers']) - Dir.glob(Dir.tmpdir.join(name, '*.json')).sort do |json_file| - next if File.basename(json_file).eql?('meta.json') + return unless File.file?(path) + queue_meta = JSON.parse(File.read(path), symbolize_names: true) + queue = TaskQueue.new(name: queue_meta[:name].sub('_', ' '), number_of_workers: queue_meta[:number_of_workers]) + Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].sort.each do |json_file| + next if File.basename(json_file).eql?('meta.json') # Skip meta file queue.add_task_async(task: Task.from_recreatable_task!(file_path: Pathname.new(Dir.tmpdir).join(name, json_file))) end queue From a3db7d2dca5b49bbb5b8f4c0ed437ca0d203b9b1 Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Thu, 1 Mar 2018 20:12:04 +0100 Subject: [PATCH 09/10] Review comments --- README.md | 28 ++++++++++++++++++++++++++++ queue_worker.rb | 1 + recreatable_task.rb | 2 -- specs/work_queue_spec.rb | 2 +- task.rb | 5 ++--- task_queue.rb | 16 +++++++--------- 6 files changed, 39 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index d6240e6..ee64923 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,34 @@ task = Task.new(work_block: proc { queue.add_task_async(task: task) ``` +## Recreatable Tasks + +The tasks that are created from the mixin `RecreatableTask` can be recovered in future executions of the `TaskQueue` where their were enqueued originally. + +### Example + +```ruby +# We define a task that includes RecreatableTask +class HelloToRecreatableTask + include TaskQueue::RecreatableTask + + # The run! method receives a collection of params and defines the real execution of the task itself. + def run!(**params) + puts "Hello #{params}" + end + + # In case the queue gets deallocated with RecreatableTasks on its queue, the hash returned by this function will be stored. Make sure that all values are JSON encodable. + def params_to_hash + { to: "fastlane" } + end +end + +queue = TaskQueue(name: 'test queue') +task = HelloToRecreatableTask.new.to_task +queue.add_task_async(task: task) + +``` + ## Run tests ``` diff --git a/queue_worker.rb b/queue_worker.rb index 4cfa0f9..f76adbe 100644 --- a/queue_worker.rb +++ b/queue_worker.rb @@ -1,4 +1,5 @@ require "securerandom" + module TaskQueue # Class responsible for executing a single task # Designed to live as long as the owning queue lives diff --git a/recreatable_task.rb b/recreatable_task.rb index 677224b..9a27be9 100644 --- a/recreatable_task.rb +++ b/recreatable_task.rb @@ -1,5 +1,3 @@ -# frozen_string_literal: true - module TaskQueue # Mixin included by Tasks that may be recreated from a file. module RecreatableTask diff --git a/specs/work_queue_spec.rb b/specs/work_queue_spec.rb index 91b30ed..32c81fd 100644 --- a/specs/work_queue_spec.rb +++ b/specs/work_queue_spec.rb @@ -1,4 +1,4 @@ -require_relative '../task_queue' +require_relative "../task_queue" class MockRecreatableTask include TaskQueue::RecreatableTask diff --git a/task.rb b/task.rb index 6dedde6..b6475fc 100644 --- a/task.rb +++ b/task.rb @@ -1,6 +1,5 @@ -# frozen_string_literal: true -require 'json' -require_relative 'recreatable_task' +require "json" +require_relative "recreatable_task" module TaskQueue # Smallest unit of work that can be submitted to a TaskQueue diff --git a/task_queue.rb b/task_queue.rb index e83af80..e84cee4 100644 --- a/task_queue.rb +++ b/task_queue.rb @@ -1,12 +1,10 @@ -# frozen_string_literal: true - -require 'set' -require 'tmpdir' -require 'json' -require 'pathname' -require_relative 'task' -require_relative 'queue_worker' -require_relative 'recreatable_task' +require "set" +require "tmpdir" +require "json" +require "pathname" +require_relative "task" +require_relative "queue_worker" +require_relative "recreatable_task" module TaskQueue # A queue that executes tasks in the order in which they were received From fa26fa1f864744b1c42bf837597fdc01941ddca8 Mon Sep 17 00:00:00 2001 From: Jorge Revuelta Date: Thu, 1 Mar 2018 20:26:10 +0100 Subject: [PATCH 10/10] That was weird --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ee64923..6f79967 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ task = Task.new(work_block: proc { queue.add_task_async(task: task) ``` -## Recreatable Tasks +## Recreatable Tasks The tasks that are created from the mixin `RecreatableTask` can be recovered in future executions of the `TaskQueue` where their were enqueued originally.