Skip to content

Commit

Permalink
Merge pull request #5 from minuscorp/recreatable-tasks
Browse files Browse the repository at this point in the history
Tasks that are `RecreatableTask` should be recreated by the queue where it was added.
  • Loading branch information
taquitos authored Mar 1, 2018
2 parents ffa7145 + fa26fa1 commit 88c5dff
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 9 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,34 @@ task = Task.new(work_block: proc {
queue.add_task_async(task: task)
```

## Recreatable Tasks

The tasks that are created from the mixin `RecreatableTask` can be recovered in future executions of the `TaskQueue` where their were enqueued originally.

### Example

```ruby
# We define a task that includes RecreatableTask
class HelloToRecreatableTask
include TaskQueue::RecreatableTask

# The run! method receives a collection of params and defines the real execution of the task itself.
def run!(**params)
puts "Hello #{params}"
end

# In case the queue gets deallocated with RecreatableTasks on its queue, the hash returned by this function will be stored. Make sure that all values are JSON encodable.
def params_to_hash
{ to: "fastlane" }
end
end

queue = TaskQueue(name: 'test queue')
task = HelloToRecreatableTask.new.to_task
queue.add_task_async(task: task)

```

## Run tests

```
Expand Down
7 changes: 5 additions & 2 deletions queue_worker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "securerandom"

module TaskQueue
# Class responsible for executing a single task
# Designed to live as long as the owning queue lives
Expand Down Expand Up @@ -32,8 +33,6 @@ def finish_task
@busy = false
@current_task.completed = true
@current_task.completed.freeze # Sorry, you can't run this task again
@current_task.finished_successfully = true
@current_task.finished_successfully.freeze
end

begin
Expand Down Expand Up @@ -82,6 +81,10 @@ def process(task:)

work_block.call

# If execution reaches this point, we can ensure that no exceptions were raised.
@current_task.finished_successfully = true
@current_task.finished_successfully.freeze

# When work is done, set @busy to false so we can be assigned up a new work unit
finish_task
rescue StandardError => e
Expand Down
40 changes: 40 additions & 0 deletions recreatable_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module TaskQueue
# Mixin included by Tasks that may be recreated from a file.
module RecreatableTask
def self.included(base)
base.send(:include, InstanceMethods)
end

# Add this as instance methods.
module InstanceMethods
# This method is the base execution unit for a given Task, it
# receives a Hash-like parameter collection defined by the
# `params_to_hash` resultant Hash.
def run!(**params)
# no-op
end

# This method is intended to provide a nice Hash-based interface
# for all the parameters used in a given task and, that in case
# of system failure, might be recreated later.
# For this reason, user has to take in mind that the hash should
# always be serializable right away.
def params_to_hash
{}
end

# This is a convenience operator to turn a RecreatableTask to a
# generic Task, in order to be correctly executed by any TaskQueue.
def to_task
task = Task.new(work_block: proc { run!(params_to_hash) })
task.recreatable = true
task.recreatable.freeze # Avoid further mutations on this.
task.recreatable_class = self.class
task.recreatable_class.freeze
task.recreatable_params = params_to_hash
task.recreatable_params.freeze
task
end
end
end
end
206 changes: 201 additions & 5 deletions specs/work_queue_spec.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
require_relative '../task_queue'
require_relative "../task_queue"

class MockRecreatableTask
include TaskQueue::RecreatableTask

def run!(**params)
puts params.to_s
end

def params_to_hash
{ one_param: "Hello", other_param: "World" }
end
end

class MockOrderedRecreatableTask
include TaskQueue::RecreatableTask

attr_accessor :number
def initialize(number: nil)
self.number = number
end

def run!(**params)
puts params.to_s
end

def params_to_hash
{ one_param: "Hello", other_param: "World", number: number }
end
end

module TaskQueue
describe TaskQueue do
def wait_for_task_to_complete(task: nil)
sleep 0.0001 until task.completed
sleep 0.001 until task.completed
end

it 'Executes one block of work with just 1 worker' do
Expand Down Expand Up @@ -33,7 +62,7 @@ def wait_for_task_to_complete(task: nil)
task = Task.new(work_block: proc { raise "Oh noes" }, ensure_block: proc { ensured = true })
queue.add_task_async(task: task)
wait_for_task_to_complete(task: task)
}.to raise_error(RuntimeError, "Oh noes")
}.to raise_error(RuntimeError)

expect(ensured).to be(true)
end
Expand All @@ -58,11 +87,12 @@ def wait_for_task_to_complete(task: nil)
task = Task.new(work_block: proc { raise "Oh noes" }, ensure_block: proc { |success| ensured = true; success_task = success })
queue.add_task_async(task: task)
wait_for_task_to_complete(task: task)
}.to raise_error(RuntimeError, "Oh noes")

expect(task.finished_successfully).to be(false)
}.to raise_error(RuntimeError)

expect(ensured).to be(true)
expect(success_task).to be(false)
expect(task.finished_successfully).to be(false)
end

it 'Executes 2 blocks of work with just 1 worker' do
Expand Down Expand Up @@ -156,5 +186,171 @@ def wait_for_task_to_complete(task: nil)
# expect that the arrays aren't equal because this was async
expect(numbers).to_not eq(expected_results)
end

it 'Creates tasks from any RecreatableTask' do
recreatable_task = MockRecreatableTask.new
task = recreatable_task.to_task
expect(task).to_not be_nil
end

it 'Should execute tasks from RecreatableTasks' do
recreatable_task = MockRecreatableTask.new
task = recreatable_task.to_task
queue = TaskQueue.new(name: 'test queue')
expect(STDOUT).to receive(:puts).with(recreatable_task.params_to_hash.to_s)
queue.add_task_async(task: task)
wait_for_task_to_complete(task: task)
end
context 'it should call its destructor' do
it 'Should call finalizer when the Queue is destroyed' do
# creating pipe for IPC to get result from child process
# after it garbaged
# http://ruby-doc.org/core-2.0.0/IO.html#method-c-pipe
rd, wr = IO.pipe

# forking
# https://ruby-doc.org/core-2.1.2/Process.html#method-c-fork
if fork
wr.close
called = rd.read
Process.wait
expect(called).to eq({ 'name' => 'test queue', 'number_of_workers' => 1 }.to_s)
rd.close
else
rd.close
# overriding TaskQueue.finalizer(...)
TaskQueue.singleton_class.class_eval do
define_method(:finalizer) do |arg|
proc {
wr.write({ 'name' => arg[:name], 'number_of_workers' => arg[:number_of_workers] })
wr.close
}
end
end

queue = TaskQueue.new(name: 'test queue')
queue = nil
GC.start
end
end
end

it 'Stores in JSON format the meta information of the queue' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
tasks << MockRecreatableTask.new.to_task
allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call
meta_path = Pathname.new(Dir.tmpdir).join('test_queue', 'meta.json')
expect(File.exist?(meta_path)).to eq(true)
queue_meta = JSON.parse(File.read(meta_path), symbolize_names: true)
expect(queue_meta).to eq({ name: 'test_queue', number_of_workers: 1 })

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end

it 'Stores the correct number of pending tasks' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
task_number = 10
task_number.times do
tasks << MockRecreatableTask.new.to_task
end

allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call

file_count = Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.count

expect(file_count).to eq(task_number + 1) # Task count plus meta file.

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end

it 'Stores the correct information of pending tasks in order to recreate them' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
tasks << MockRecreatableTask.new.to_task

allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call
Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.each do |json_file|
next if File.basename(json_file).eql?('meta.json') # Skip meta file
task_hash = JSON.parse(File.read(json_file), symbolize_names: true)
expect(task_hash).to eq({ class: MockRecreatableTask.to_s, params: { one_param: "Hello", other_param: "World" } })
end

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end

it 'Recreates correctly the task from the information stored' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
tasks << MockRecreatableTask.new.to_task

allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call
Dir[Pathname.new(Dir.tmpdir).join('test_queue', '*.json')].select { |file| File.file?(file) }.each do |json_file|
next if File.basename(json_file).eql?('meta.json') # Skip meta file
recreatable_task = Task.from_recreatable_task!(file_path: json_file)
expect(recreatable_task.recreatable_class).to eq(MockRecreatableTask)
expect(recreatable_task.recreatable_params).to eq({ one_param: "Hello", other_param: "World" })
end

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end

it 'Recreates the whole TaskQueue from the information stored' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
task_number = 2
task_number.times do
tasks << MockRecreatableTask.new.to_task
end

allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call

expect(STDOUT).to receive(:puts).with({ one_param: "Hello", other_param: "World" }.to_s).twice
recreated_queue = TaskQueue.from_recreated_tasks!(name: 'test queue')
sleep 0.001 until recreated_queue.task_count.zero?
expect(recreated_queue.name).to eq('test queue')

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end

it 'Recreates the whole TaskQueue from the information stored in the correct order' do
queue = TaskQueue.new(name: 'test queue')
tasks = Queue.new
# We need at least one pending task to let the queue be stored
task_number = 4
task_number.times do |i|
tasks << MockOrderedRecreatableTask.new(number: i).to_task
end

allow(queue).to receive(:queue).and_return(tasks)
TaskQueue.finalizer(name: 'test queue', number_of_workers: 1, tasks: tasks).call

task_number.times do |i|
expect(STDOUT).to receive(:puts).with({ one_param: "Hello", other_param: "World", number: i }.to_s)
end

recreated_queue = TaskQueue.from_recreated_tasks!(name: 'test queue')
sleep 0.001 until recreated_queue.task_count.zero?
expect(recreated_queue.name).to eq('test queue')

# Cleanup directory
FileUtils.rm_rf(Pathname.new(Dir.tmpdir).join('test_queue'), secure: true)
end
end
end
36 changes: 36 additions & 0 deletions task.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
require "json"
require_relative "recreatable_task"

module TaskQueue
# Smallest unit of work that can be submitted to a TaskQueue
# Not reusable, if you attempt to re-execute this, the worker will raise an exception
Expand All @@ -9,6 +12,9 @@ class Task
attr_accessor :completed
attr_accessor :submitted
attr_accessor :finished_successfully
attr_accessor :recreatable
attr_accessor :recreatable_class
attr_accessor :recreatable_params

def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil)
self.work_block = work_block
Expand All @@ -18,9 +24,39 @@ def initialize(name: nil, description: nil, work_block: nil, ensure_block: nil)

self.name.freeze
self.description.freeze
self.recreatable = false
self.recreatable_class = nil
self.completed = false
self.submitted = false
self.finished_successfully = false
end

# Factory method which allows, given a file_path containing
# the file with the required data to recreate the Task stored
# by a certain Queue.
# @param file_path: String
# @returns [Task]
def self.from_recreatable_task!(file_path: nil)
raise 'Task file path was not provided' if file_path.nil?

recreatable_task_hash = JSON.parse(File.read(file_path), symbolize_names: true)
recreatable_task = Object.const_get(recreatable_task_hash[:class]).new

raise 'Recreatable task does not include `RecreatableTask` module' unless recreatable_task.class.include?(RecreatableTask)

params = recreatable_task_hash[:params]

raise "Unexpected parameter type, found #{params.class} expected Hash." unless params.is_a?(Hash)

task = Task.new(work_block: proc { recreatable_task.run!(params) })
task.recreatable = true
task.recreatable.freeze # Avoid further mutations on this.
task.recreatable_class = recreatable_task.class
task.recreatable_class.freeze
task.recreatable_params = params
task.recreatable_params.freeze

task
end
end
end
Loading

0 comments on commit 88c5dff

Please sign in to comment.