add ParallelPopulationOptimizer
alyst committed Jul 24, 2015
1 parent 8b77c68 commit 4429258
Showing 6 changed files with 330 additions and 0 deletions.
1 change: 1 addition & 0 deletions REQUIRE
@@ -3,3 +3,4 @@ FactCheck
StatsBase
Distributions
Compat
MessageUtils
2 changes: 2 additions & 0 deletions src/BlackBoxOptim.jl
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat

export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
bboptimize, bbsetup, compare_optimizers,
ParallelPopulationOptimizer,

DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -169,6 +170,7 @@ include("resampling_memetic_search.jl")
include("simultaneous_perturbation_stochastic_approximation.jl")
include("generating_set_search.jl")
include("direct_search_with_probabilistic_descent.jl")
include("parallel_population_optimizer.jl")

# Fitness
# include("fitness/fitness_types.jl") FIXME merge it with fitness.jl
1 change: 1 addition & 0 deletions src/optimization_methods.jl
@@ -14,6 +14,7 @@ ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
:simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
:generating_set_search => GeneratingSetSearcher,
:probabilistic_descent => direct_search_probabilistic_descent,
:parallel_population_optimizer => parallel_population_optimizer,
)

MethodNames = sort!(collect(keys(ValidMethods)))
305 changes: 305 additions & 0 deletions src/parallel_population_optimizer.jl
@@ -0,0 +1,305 @@
using MessageUtils

ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
:WorkerMethod => :de_rand_1_bin, # worker population optimization method
:NWorkers => 2, # number of workers
:MigrationSize => 1, # number of "migrant" individuals to send to the master
:MigrationPeriod => 100, # number of worker iterations before sending migrants
:ArchiveCapacity => 10 # ParallelPseudoEvaluator archive capacity
)

# metrics from worker optimizer
type WorkerMetrics
num_evals::Int # number of function evals
num_steps::Int # number of steps
num_better::Int # number of steps that improved best fitness
num_migrated::Int # number of migrants the worker received
num_better_migrated::Int # number of migrants accepted (because fitness improved)

WorkerMetrics() = new(0, 0, 0, 0, 0)
end

function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
a.num_evals = b.num_evals
a.num_steps = b.num_steps
a.num_better = b.num_better
a.num_migrated = b.num_migrated
a.num_better_migrated = b.num_better_migrated
return a
end

# fake evaluator for ParallelPopulationOptimizer;
# it doesn't evaluate anything itself, but stores
# the metrics coming from the workers' evaluators
type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
problem::P
archive::TopListArchive{F}
workers_metrics::Vector{WorkerMetrics} # function evals per worker etc
last_fitness::F
end

num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)
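
# [editor's sketch, not part of this commit] a worked example of the
# aggregation above, with two workers reporting hypothetical eval counts:
m1 = WorkerMetrics(); m1.num_evals = 100
m2 = WorkerMetrics(); m2.num_evals = 150
mapreduce(x -> x.num_evals, +, 0, [m1, m2]) # == 250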

function ParallelPseudoEvaluator{P<:OptimizationProblem}(
problem::P, nworkers::Int;
archiveCapacity::Int = 10)
ParallelPseudoEvaluator{fitness_type(problem), P}(
problem,
TopListArchive(fitness_scheme(problem), numdims(problem), archiveCapacity),
WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
nafitness(fitness_scheme(problem)))
end

# message with the candidate passed between the workers and the master
immutable CandidateMessage{F}
worker::Int # origin of candidate
metrics::WorkerMetrics # current origin worker metrics
candi::Candidate{F}
end

# Parallel population optimizer
# runs nworkers population optimizers on separate worker processes.
# At regular intervals each worker sends a random member of its population
# to the master, and the master redirects it to one of the other workers.
type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
from_workers::MessageUtils.SyncObjRef{RemoteChannel}
to_workers::Vector{MessageUtils.SyncObjRef{RemoteChannel}}
evaluator::ParallelPseudoEvaluator{F, P}

ParallelPopulationOptimizer(from_workers::MessageUtils.SyncObjRef{RemoteChannel},
to_workers::Vector{MessageUtils.SyncObjRef{RemoteChannel}},
evaluator::ParallelPseudoEvaluator{F, P}) =
new(from_workers, to_workers, evaluator)
end

nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)

# reads a worker's message, stores the worker's metrics and updates the best fitness using the archive
function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
copy!(ppe.workers_metrics[msg.worker], msg.metrics)
if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only candidates with known fitness
add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
end
end

# outer parallel population optimizer constructor that
# also spawns worker tasks
function ParallelPopulationOptimizer{P<:OptimizationProblem}(
problem::P, optimizer_generator::Function, nworkers::Int,
migrationSize::Int = 1, migrationPeriod::Int = 100,
archiveCapacity::Int = 10)
F = fitness_type(problem)
info("Constructing parallel optimizer...")
from_workers = channel(T=CandidateMessage{F})
to_workers = MessageUtils.SyncObjRef{RemoteChannel}[channel(T=CandidateMessage{F}) for i in 1:nworkers]
workers_started = channel(T=Bool) # FIXME do we need to wait for the worker?
for i in 1:nworkers
info("Initializing worker #$i...")
pid = i+1
@spawnat pid run_worker(i, workers_started, problem, optimizer_generator,
from_workers, to_workers[i],
migrationSize, migrationPeriod)
end
# wait until all the workers are started
# FIXME is it required?
nstarted = 0
while nstarted < nworkers
take!(workers_started)
nstarted += 1
end
info("All workers started")
ParallelPopulationOptimizer{F, P}(from_workers, to_workers,
ParallelPseudoEvaluator(problem, length(to_workers);
archiveCapacity = archiveCapacity))
end
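
# [editor's sketch, not part of this commit] the start-up handshake above,
# reduced to its essentials (assumes a worker process with pid 2 exists):
started = channel(T=Bool)      # synchronized channel reference
@spawnat 2 put!(started, true) # the worker signals readiness
take!(started)                 # the master blocks until the signal arrives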

function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
worker_method = params[:WorkerMethod]
info( "Using $worker_method as worker method for parallel optimization")
optimizer_func = ValidMethods[worker_method]

ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
params[:NWorkers], params[:MigrationSize], params[:MigrationPeriod],
params[:ArchiveCapacity])
end

# redirects candidate to another worker
function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
# redirect to the other parallel task
#println("redirecting from $(msg.worker)")
recv_ix = sample(1:(length(ppopt.to_workers)-1))
if recv_ix >= msg.worker # skip the index of the origin worker
recv_ix += 1
end
msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
msg.candi.tag = 0
put!(ppopt.to_workers[recv_ix], msg)
#println("redirecting done")
end
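
# [editor's sketch, not part of this commit] the index shift above samples a
# receiver uniformly among the other workers; standalone, with made-up values:
nworkers_total = 4; origin = 2
recv_ix = rand(1:(nworkers_total-1)) # uniform over the 3 non-origin slots
if recv_ix >= origin
    recv_ix += 1                     # skip over the origin worker's slot
end
@assert recv_ix != origin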

function step!(ppopt::ParallelPopulationOptimizer)
#println("main#: n_evals=$(num_evals(ppopt.evaluator))")
last_better = num_better(ppopt.evaluator)
candidate = take!(ppopt.from_workers)
#println("candidate=$candidate")
store!(ppopt.evaluator, candidate)
redirect(ppopt, candidate)
return num_better(ppopt.evaluator) - last_better
end

# finalize the master: wait for the workers to shut down,
# collect their best candidates
function finalize!(ppopt::ParallelPopulationOptimizer, evaluator::ParallelPseudoEvaluator)
# send a special terminating message with worker index -12345
F = fitness_type(evaluator)
for to_worker in ppopt.to_workers
put!(to_worker, CandidateMessage{F}(-12345, WorkerMetrics(), Candidate{F}(Individual())))
end
# wait until all workers finish;
# the last candidate each worker sends is the best of its population
n_finished = 0
while n_finished < nworkers(ppopt)
msg = take!(ppopt.from_workers)
if msg.worker < 0 # HACK a negative worker index marks that the worker has finished
msg = CandidateMessage{F}(-msg.worker, msg.metrics, msg.candi) # fix the worker Id
store!(evaluator, msg) # store the best candidate
n_finished += 1
info("Worker #$(msg.worker) finished")
end
end
info("Parallel optimizer finished. Metrics per worker: $(evaluator.workers_metrics)")
end

# wraps the worker's population optimizer
# and communicates with the master
type PopulationOptimizerWrapper{O<:PopulationOptimizer,E<:Evaluator}
id::Int # worker's Id
optimizer::O
evaluator::E
to_master::MessageUtils.SyncObjRef{RemoteChannel} # outgoing candidates
from_master::MessageUtils.SyncObjRef{RemoteChannel} # incoming candidates
migrationSize::Int # size of the migrating group
migrationPeriod::Int # number of iterations between the migrations

metrics::WorkerMetrics # current metrics
is_stopping::Bool # if the worker is in stopping mode
end

# outer wrapper ctor;
# starts the "background" migrants-receiving task
function PopulationOptimizerWrapper{O<:PopulationOptimizer,E<:Evaluator}(
id::Int, optimizer::O, evaluator::E,
to_master::MessageUtils.SyncObjRef{RemoteChannel},
from_master::MessageUtils.SyncObjRef{RemoteChannel},
migrationSize = 1, migrationPeriod = 100)
wrapper = PopulationOptimizerWrapper{O,E}(id, optimizer, evaluator,
to_master, from_master,
migrationSize, migrationPeriod,
WorkerMetrics(), false)
# task that reads immigrants from the master
@schedule begin
while !wrapper.is_stopping
#println("receiving imigrants...")
recv_imigrants!(wrapper)
#println("imigrants task yield()")
yield() # return to normal worker processing
end
end

return wrapper
end
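
# [editor's sketch, not part of this commit] the cooperative scheduling used
# here, in isolation: a background task and a main loop trade control via yield()
done = false
bg = @schedule while !done
    yield()   # the background task hands control back, like recv_imigrants!()
end
for step in 1:3
    yield()   # the main loop lets the background task run, like run!()
end
done = true
wait(bg)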

function send_emigrants(wrapper::PopulationOptimizerWrapper)
pop = population(wrapper.optimizer)
# prepare the group of emigrants
migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
for migrant_ix in migrant_ixs
migrant = acquire_candi(pop, migrant_ix)
# send them outward
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
put!(wrapper.to_master, CandidateMessage{fitness_type(pop)}(wrapper.id, wrapper.metrics, migrant))
# FIXME check that the reuse of the candidate does not affect
# the migrants while they wait to be sent
release_candi(pop, migrant)
end
end

# send the best candidate to the master to acknowledge worker shutdown
function finalize!(wrapper::PopulationOptimizerWrapper)
wrapper.is_stopping = true
# send the best candidate
pop = population(wrapper.optimizer)
best_candi = acquire_candi(pop)
copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
best_candi.fitness = best_fitness(wrapper.evaluator.archive)
best_candi.index = -1 # we don't know it
best_candi.tag = 0
# HACK send negative worker index to acknowledge the worker is finishing
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
put!(wrapper.to_master, CandidateMessage{fitness_type(pop)}(-wrapper.id, wrapper.metrics, best_candi))
release_candi(pop, best_candi)
end

# receive migrants (called from "background" task)
function recv_imigrants!(wrapper::PopulationOptimizerWrapper)
pop = population(wrapper.optimizer)
msg = take!(wrapper.from_master)
if msg.worker == -12345 # special index sent by master to indicate termination
finalize!(wrapper)
return
end

# assign the migrant to a random population index
migrant_ix = sample(1:popsize(pop))
candidates = Vector{candidate_type(pop)}()
sizehint!(candidates, 2)
push!(candidates, acquire_candi(pop, migrant_ix))
push!(candidates, acquire_candi(pop, msg.candi))
candidates[end].index = migrant_ix # override the incoming index
rank_by_fitness!(wrapper.evaluator, candidates)
wrapper.metrics.num_migrated += 1
wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
end
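
# [editor's note, not part of this commit] giving the migrant the resident's
# index turns migration into an ordinary selection step: tell!() keeps
# whichever of the pair is fitter, e.g. when minimizing fitness:
resident_fitness = 0.8; migrant_fitness = 0.5
migrant_accepted = migrant_fitness < resident_fitness # == true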

# run the wrapper (called in the "main" task)
function run!(wrapper::PopulationOptimizerWrapper)
while !wrapper.is_stopping
wrapper.metrics.num_steps += 1
#println("$(wrapper.metrics.num_steps)-th iteration")
if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
#info("$(myid()): sending migrants")
send_emigrants(wrapper) # send migrants before processing this step
#println("run!() task yield()")
yield() # switch to migrants receiving task
end
# normal ask/tell sequence
candidates = ask(wrapper.optimizer)
rank_by_fitness!(wrapper.evaluator, candidates)
wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
end
end

# Function that the master process spawns at each worker process:
# creates and runs the worker wrapper
function run_worker(id::Int,
workers_started::MessageUtils.SyncObjRef{RemoteChannel},
problem::OptimizationProblem,
optimizer_generator::Function,
to_master::MessageUtils.SyncObjRef{RemoteChannel},
from_master::MessageUtils.SyncObjRef{RemoteChannel},
migrationSize, migrationPeriod)
info("Initializing parallel optimization worker #$id at task=$(myid())")
wrapper = PopulationOptimizerWrapper(id,
optimizer_generator(problem),
ProblemEvaluator(problem),
to_master, from_master,
migrationSize, migrationPeriod)
# signal the master that the worker has started
put!(workers_started, true)
info("Starting worker #$id")
run!(wrapper)
info("Worker #$id stopped")
end
1 change: 1 addition & 0 deletions test/runtests.jl
@@ -18,6 +18,7 @@ my_tests = [
"test_differential_evolution.jl",
"test_adaptive_differential_evolution.jl",
"test_natural_evolution_strategies.jl",
"test_parallel_population_optimizer.jl",

"test_toplevel_bboptimize.jl",
"test_smoketest_bboptimize.jl",
20 changes: 20 additions & 0 deletions test/test_parallel_population_optimizer.jl
@@ -0,0 +1,20 @@
facts("Parallel population optimizer") do

if nprocs() < 4
addprocs(4-nprocs())
end

@everywhere using BlackBoxOptim, MessageUtils

rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2

b, f = bboptimize(rosenbrock2d; method = :parallel_population_optimizer,
search_space = [(-5.0, 5.0), (-2.0, 2.0)], max_time = 100.0,
parameters = @compat Dict{Symbol,Any}(
:ShowTrace => true,
:MigrationSize => 2,
:MigrationPeriod => 100))
@fact size(b) => (2,)
@fact typeof(f) => Float64
@fact f => less_than(100.0)
end
