
Commit

add ParallelPopulationOptimizer
alyst committed Aug 10, 2015
1 parent ba049c5 commit 20a2a69
Showing 7 changed files with 360 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/BlackBoxOptim.jl
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat

export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
bboptimize, bbsetup, compare_optimizers,
ParallelPopulationOptimizer,

DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -177,6 +178,7 @@ include("resampling_memetic_search.jl")
include("simultaneous_perturbation_stochastic_approximation.jl")
include("generating_set_search.jl")
include("direct_search_with_probabilistic_descent.jl")
include("parallel_population_optimizer.jl")

# Fitness
# include("fitness/fitness_types.jl") FIXME merge it with fitness.jl
1 change: 1 addition & 0 deletions src/optimization_methods.jl
@@ -14,6 +14,7 @@ const ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
:simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
:generating_set_search => GeneratingSetSearcher,
:probabilistic_descent => direct_search_probabilistic_descent,
:parallel_population_optimizer => parallel_population_optimizer,
)

const MethodNames = sort!(collect(keys(ValidMethods)))
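A quick sanity check that the new method symbol is registered (a sketch; ValidMethods and MethodNames are internal constants, so they are accessed here via module-qualified names):

using BlackBoxOptim
# the new optimizer should appear among the registered method symbols
@assert haskey(BlackBoxOptim.ValidMethods, :parallel_population_optimizer)
println(BlackBoxOptim.MethodNames)  # sorted list of all valid method names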
334 changes: 334 additions & 0 deletions src/parallel_population_optimizer.jl
@@ -0,0 +1,334 @@
const ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
:WorkerMethod => :adaptive_de_rand_1_bin_radiuslimited, # worker population optimization method
:NWorkers => 2, # number of workers
:MigrationSize => 1, # number of "migrant" individuals to send to the master
:MigrationPeriod => 100, # number of worker iterations before sending migrants
:ArchiveCapacity => 10, # ParallelPseudoEvaluator archive capacity
:ToWorkerChannelCapacity => 100, # how many unread messages the master->worker channel can store
:FromWorkersChannelCapacity => 1000 # how many unread messages the workers->master channel can store
)

# metrics from worker optimizer
type WorkerMetrics
num_evals::Int # number of function evals
num_steps::Int # number of steps
num_better::Int # number of steps that improved best fitness
num_migrated::Int # number of migrants the worker received
num_better_migrated::Int # number of migrants accepted (because fitness improved)

WorkerMetrics() = new(0, 0, 0, 0, 0)
end

function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
a.num_evals = b.num_evals
a.num_steps = b.num_steps
a.num_better = b.num_better
a.num_migrated = b.num_migrated
a.num_better_migrated = b.num_better_migrated
return a
end

# fake evaluator for ParallelPopulationOptimizer;
# it doesn't evaluate anything itself, but stores some
# metrics from the workers' evaluators
type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
problem::P
archive::TopListArchive{F}
workers_metrics::Vector{WorkerMetrics} # function evals per worker etc
last_fitness::F
end

num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)

function ParallelPseudoEvaluator{P<:OptimizationProblem}(
problem::P, nworkers::Int;
ArchiveCapacity::Int = 10)
ParallelPseudoEvaluator{fitness_type(problem), P}(
problem,
TopListArchive(fitness_scheme(problem), numdims(problem), ArchiveCapacity),
WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
nafitness(fitness_scheme(problem)))
end

# message with the candidate passed between the workers and the master
immutable CandidateMessage{F}
worker::Int # origin of candidate
metrics::WorkerMetrics # current origin worker metrics
candi::Candidate{F}
end

typealias WorkerChannel{F} Channel{CandidateMessage{F}}
typealias WorkerChannelRef{F} RemoteRef{WorkerChannel{F}}

# Parallel population optimizer
# starts nworkers parallel population optimizers.
# At regular intervals, the workers send randomly chosen members of their populations to the master process,
# and the master redirects them to the other workers
type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
final_fitnesses::Vector{RemoteRef{Channel{Any}}} # references to the results of run_worker() spawned via @spawnat on each worker
from_workers::WorkerChannelRef{F}
to_workers::Vector{WorkerChannelRef{F}}
evaluator::ParallelPseudoEvaluator{F, P}

ParallelPopulationOptimizer(final_fitnesses::Vector{RemoteRef{Channel{Any}}},
from_workers::WorkerChannelRef{F},
to_workers::Vector{WorkerChannelRef{F}},
evaluator::ParallelPseudoEvaluator{F, P}) =
new(final_fitnesses, from_workers, to_workers, evaluator)
end

nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)

# reads a worker's message, stores the worker metrics, and updates the best fitness in the archive
function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
copy!(ppe.workers_metrics[msg.worker], msg.metrics)
if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only the candidates with the known fitness
add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
end
end

# check that the workers are still running:
# their RemoteRefs should not be ready,
# but if there was an exception in a worker,
# it would be thrown into the main thread
function check_workers_running{T}(workers::AbstractVector{RemoteRef{T}})
for i in eachindex(workers)
if isready(workers[i])
fetch(workers[i]) # fetch the worker, this should trigger an exception
# no exception, but the worker should not be ready
error("Worker #i is finished before the master shutdown")
end
end
return false
end

# outer parallel population optimizer constructor that
# also spawns worker tasks
function ParallelPopulationOptimizer{P<:OptimizationProblem}(
problem::P, optimizer_generator::Function, nworkers::Int,
MigrationSize::Int = 1, MigrationPeriod::Int = 100,
ArchiveCapacity::Int = 10,
ToWorkerChannelCapacity::Int = 1000,
FromWorkersChannelCapacity::Int = nworkers * ToWorkerChannelCapacity)
F = fitness_type(problem)
info("Constructing parallel optimizer...")
from_workers = RemoteRef(() -> WorkerChannel{F}(FromWorkersChannelCapacity))
to_workers = WorkerChannelRef{F}[RemoteRef(() -> WorkerChannel{F}(ToWorkerChannelCapacity), i+1) for i in 1:nworkers]
workers_started = RemoteRef(() -> Channel{Int}(nworkers)) # FIXME do we need to wait for the worker?
all_ready = RemoteRef(() -> Channel{Bool}(1))
final_fitnesses = map(function(id)
info("Initializing worker #$id...");
pid = id+1;
@spawnat pid run_worker(id, workers_started, all_ready, problem, optimizer_generator,
from_workers, to_workers[id],
MigrationSize, MigrationPeriod)
end, 1:nworkers)
# wait until all the workers are started
# FIXME is it required?
nstarted = 0
while nstarted < nworkers
check_workers_running(final_fitnesses)
worker_id = take!(workers_started)
info("Worker #$worker_id is ready")
nstarted += 1
end
put!(all_ready, true)
info("All workers ready")
ParallelPopulationOptimizer{F, P}(final_fitnesses, from_workers, to_workers,
ParallelPseudoEvaluator(problem, nworkers;
ArchiveCapacity = ArchiveCapacity))
end

function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
worker_method = params[:WorkerMethod]
info( "Using $worker_method as worker method for parallel optimization")
optimizer_func = ValidMethods[worker_method]

ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
params[:NWorkers], params[:MigrationSize], params[:MigrationPeriod],
params[:ArchiveCapacity],
params[:ToWorkerChannelCapacity], params[:FromWorkersChannelCapacity])
end

# redirects candidate to another worker
function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
# redirect to the other parallel task
#println("redirecting from $(msg.worker)")
recv_ix = sample(1:(length(ppopt.to_workers)-1))
if recv_ix >= msg.worker # skip the index of the origin worker
recv_ix += 1
end
msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
msg.candi.tag = 0
put!(ppopt.to_workers[recv_ix], msg)
#println("redirecting done")
end

function step!{F}(ppopt::ParallelPopulationOptimizer{F})
#println("main#: n_evals=$(num_evals(ppopt.evaluator))")
check_workers_running(ppopt.final_fitnesses)
last_better = num_better(ppopt.evaluator)
candidate = take!(ppopt.from_workers)::CandidateMessage{F}
#println("candidate=$candidate")
store!(ppopt.evaluator, candidate)
redirect(ppopt, candidate)
return num_better(ppopt.evaluator) - last_better
end

const FINAL_CANDIDATE = -12345 # special terminating candidate with worker index

# finalize the master: wait for the workers shutdown,
# get their best candidates
function finalize!{F}(ppopt::ParallelPopulationOptimizer{F})
info("Finalizing parallel optimizer...")
# send special terminating candidate
for to_worker in ppopt.to_workers
put!(to_worker, CandidateMessage{F}(FINAL_CANDIDATE, WorkerMetrics(),
Candidate{F}(Individual())))
end
# wait until all workers finish
# the last candidates being sent are the best in the population
info("Waiting for the workers to finish...")
for i in eachindex(ppopt.final_fitnesses)
msg = fetch(ppopt.final_fitnesses[i])::CandidateMessage{F}
@assert msg.worker == i
store!(ppopt.evaluator, msg) # store the best candidate
info("Worker #$(msg.worker) finished")
end
info("Parallel optimizer finished. Metrics per worker: $(ppopt.evaluator.workers_metrics)")
end

# wraps the worker's population optimizer
# and communicates with the master
type PopulationOptimizerWrapper{F,O<:PopulationOptimizer,E<:Evaluator}
id::Int # worker's Id
optimizer::O
evaluator::E
to_master::WorkerChannelRef{F} # outgoing candidates
from_master::WorkerChannelRef{F} # incoming candidates
migrationSize::Int # size of the migrating group
migrationPeriod::Int # number of iterations between the migrations

metrics::WorkerMetrics # current metrics
is_stopping::Bool # if the optimizer is being shut down
can_receive::Condition # condition recv_task waits for
can_run::Condition # condition run!() task waits for
recv_task::Task # task that continuously executes recv_immigrants()

function Base.call{F,O,E}(::Type{PopulationOptimizerWrapper},
id::Int, optimizer::O, evaluator::E,
to_master::WorkerChannelRef{F}, from_master::WorkerChannelRef{F},
migrationSize::Int = 1, migrationPeriod::Int = 100)
res = new{F,O,E}(id, optimizer, evaluator,
to_master, from_master,
migrationSize, migrationPeriod,
WorkerMetrics(), false, Condition(), Condition())
# "background" migrants receiver task
res.recv_task = @schedule while !res.is_stopping
wait(res.can_receive)
recv_immigrants!(res)
notify(res.can_run)
end
return res
end
end

function send_emigrants{F}(wrapper::PopulationOptimizerWrapper{F})
pop = population(wrapper.optimizer)
# prepare the group of emigrants
migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
for migrant_ix in migrant_ixs
migrant = acquire_candi(pop, migrant_ix)
# send it to the master
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
put!(wrapper.to_master, CandidateMessage{F}(wrapper.id, wrapper.metrics, migrant))
# FIXME check that the reuse of the candidate does not affect
# the migrants while they wait to be sent
release_candi(pop, migrant)
end
end

# receive migrants (called from "background" task)
function recv_immigrants!{F}(wrapper::PopulationOptimizerWrapper{F})
pop = population(wrapper.optimizer)
msg = take!(wrapper.from_master)::CandidateMessage{F}
if msg.worker == FINAL_CANDIDATE # special index sent by master to indicate termination
wrapper.is_stopping = true
return
end

# assign migrants to random population indices
migrant_ix = sample(1:popsize(pop))
candidates = sizehint!(Vector{candidate_type(pop)}(), 2)
push!(candidates, acquire_candi(pop, migrant_ix))
push!(candidates, acquire_candi(pop, msg.candi))
candidates[end].index = migrant_ix # override the incoming index
rank_by_fitness!(wrapper.evaluator, candidates)
wrapper.metrics.num_migrated += 1
wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
end

# run the wrapper (called in the "main" task)
function run!(wrapper::PopulationOptimizerWrapper)
while !wrapper.is_stopping
if istaskdone(wrapper.recv_task)
error("recv_task has completed prematurely")
end
wrapper.metrics.num_steps += 1
#println("$(wrapper.metrics.num_steps)-th iteration")
if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
send_emigrants(wrapper) # send this worker's migrants before switching to the receiving task
notify(wrapper.can_receive) # switch to migrants receiving task
wait(wrapper.can_run)
end
# normal ask/tell sequence
candidates = ask(wrapper.optimizer)
rank_by_fitness!(wrapper.evaluator, candidates)
wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
end
finalize!(wrapper.optimizer, wrapper.evaluator)
end

# returns the candidate message with final metrics and the best candidate
function final_fitness{F}(wrapper::PopulationOptimizerWrapper{F})
@assert wrapper.is_stopping
# send the best candidate
pop = population(wrapper.optimizer)
best_candi = acquire_candi(pop)
copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
best_candi.fitness = best_fitness(wrapper.evaluator.archive)
best_candi.index = -1 # we don't know it
best_candi.tag = 0
# HACK send negative worker index to acknowledge the worker is finishing
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
CandidateMessage{F}(wrapper.id, wrapper.metrics, best_candi)
end

# Function that the master process spawns at each worker process.
# Creates and runs the worker wrapper.
function run_worker{F}(id::Int,
workers_started::RemoteRef{Channel{Int}},
all_ready::RemoteRef{Channel{Bool}},
problem::OptimizationProblem,
optimizer_generator::Function,
to_master::WorkerChannelRef{F},
from_master::WorkerChannelRef{F},
migrationSize, migrationPeriod)
info("Initializing parallel optimization worker #$id at task=$(myid())")
wrapper = PopulationOptimizerWrapper(id,
optimizer_generator(problem),
ProblemEvaluator(problem),
to_master, from_master,
migrationSize, migrationPeriod)
# the immigrant-receiving task is created inside the wrapper constructor
put!(workers_started, id)
info("Waiting for the master start signal...")
fetch(all_ready) # wait until the master and other workers are ready
info("Running worker #$id")
run!(wrapper)
info("Worker #$id stopped")
final_fitness(wrapper) # return the message with the final metrics and the best candidate
end
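
For orientation, a minimal usage sketch of the new optimizer (assumptions: worker processes are added up front and BlackBoxOptim is loaded on them, mirroring test/helper.jl below; the keyword names follow ParallelPopulationOptimizer_DefaultParameters and the MaxTime value is arbitrary):

# set up worker processes and load the package on them
addprocs(2)
@everywhere using BlackBoxOptim

rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2

res = bboptimize(rosenbrock2d; Method = :parallel_population_optimizer,
                 SearchSpace = [(-5.0, 5.0), (-2.0, 2.0)], MaxTime = 10.0,
                 NWorkers = 2, MigrationSize = 1, MigrationPeriod = 100)
best_fitness(res)   # best Rosenbrock value found across all workers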
5 changes: 5 additions & 0 deletions test/helper.jl
@@ -3,3 +3,8 @@ using FactCheck
using Compat

NumTestRepetitions = 100

if nprocs() < 4
addprocs(4-nprocs()) # add procs for parallel population optimizer
end
@everywhere using BlackBoxOptim
1 change: 1 addition & 0 deletions test/runtests.jl
@@ -18,6 +18,7 @@ my_tests = [
"test_differential_evolution.jl",
"test_adaptive_differential_evolution.jl",
"test_natural_evolution_strategies.jl",
"test_parallel_population_optimizer.jl",

"test_toplevel_bboptimize.jl",
"test_smoketest_bboptimize.jl",
13 changes: 13 additions & 0 deletions test/test_parallel_population_optimizer.jl
@@ -0,0 +1,13 @@
facts("Parallel population optimizer") do

@everywhere using BlackBoxOptim

rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2

res = bboptimize(rosenbrock2d; Method = :parallel_population_optimizer,
SearchSpace = [(-5.0, 5.0), (-2.0, 2.0)], MaxTime = 100.0,
ShowTrace = true, MigrationSize = 2, MigrationPeriod = 100)
@fact size(best_candidate(res)) => (2,)
@fact typeof(best_fitness(res)) => Float64
@fact best_fitness(res) => less_than(100.0)
end
