diff --git a/src/Configure.jl b/src/Configure.jl index fb1d1aa6e..ffcceca8d 100644 --- a/src/Configure.jl +++ b/src/Configure.jl @@ -44,7 +44,21 @@ function assert_operators_well_defined(T, options::Options) end # Check for errors before they happen -function test_option_configuration(T, options::Options) +function test_option_configuration( + parallelism, datasets::Vector{D}, saved_state, options::Options +) where {T,D<:Dataset{T}} + if options.deterministic && parallelism != :serial + error("Determinism is only guaranteed for serial mode.") + end + if parallelism == :multithreading && Threads.nthreads() == 1 + @warn "You are using multithreading mode, but only one thread is available. Try starting julia with `--threads=auto`." + end + if any(d -> d.X_units !== nothing || d.y_units !== nothing, datasets) && + options.dimensional_constraint_penalty === nothing && + saved_state === nothing + @warn "You are using dimensional constraints, but `dimensional_constraint_penalty` was not set. The default penalty of `1000.0` will be used." + end + for op in (options.operators.binops..., options.operators.unaops...) if is_anonymous_function(op) throw( @@ -81,25 +95,18 @@ function test_dataset_configuration( ) end - if size(dataset.X, 2) > 10000 - if !options.batching - debug( - verbosity > 0, - "Note: you are running with more than 10,000 datapoints. You should consider turning on batching (`options.batching`), and also if you need that many datapoints. Unless you have a large amount of noise (in which case you should smooth your dataset first), generally < 10,000 datapoints is enough to find a functional form.", - ) - end + if size(dataset.X, 2) > 10000 && !options.batching && verbosity > 0 + @info "Note: you are running with more than 10,000 datapoints. You should consider turning on batching (`options.batching`), and also if you need that many datapoints. Unless you have a large amount of noise (in which case you should smooth your dataset first), generally < 10,000 datapoints is enough to find a functional form." end - if !(typeof(options.elementwise_loss) <: SupervisedLoss) - if dataset.weighted - if !(3 in [m.nargs - 1 for m in methods(options.elementwise_loss)]) - throw( - AssertionError( - "When you create a custom loss function, and are using weights, you need to define your loss function with three scalar arguments: f(prediction, target, weight).", - ), - ) - end - end + if !(typeof(options.elementwise_loss) <: SupervisedLoss) && + dataset.weighted && + !(3 in [m.nargs - 1 for m in methods(options.elementwise_loss)]) + throw( + AssertionError( + "When you create a custom loss function, and are using weights, you need to define your loss function with three scalar arguments: f(prediction, target, weight).", + ), + ) end end @@ -188,14 +195,15 @@ end function copy_definition_to_workers(op, procs, options::Options, verbosity) name = nameof(op) - debug_inline(verbosity > 0, "Copying definition of $op to workers...") + verbosity > 0 && @info "Copying definition of $op to workers..." src_ms = methods(op).ms # Thanks https://discourse.julialang.org/t/easy-way-to-send-custom-function-to-distributed-workers/22118/2 @everywhere procs @eval function $name end for m in src_ms @everywhere procs @eval $m end - return debug(verbosity > 0, "Finished!") + verbosity > 0 && @info "Finished!" + return nothing end function test_function_on_workers(example_inputs, op, procs) @@ -209,7 +217,7 @@ function test_function_on_workers(example_inputs, op, procs) end function activate_env_on_workers(procs, project_path::String, options::Options, verbosity) - debug(verbosity > 0, "Activating environment on workers.") + verbosity > 0 && @info "Activating environment on workers." @everywhere procs begin Base.MainInclude.eval( quote @@ -223,7 +231,7 @@ end function import_module_on_workers(procs, filename::String, options::Options, verbosity) included_local = !("SymbolicRegression" in [k.name for (k, v) in Base.loaded_modules]) if included_local - debug_inline(verbosity > 0, "Importing local module ($filename) on workers...") + verbosity > 0 && @info "Importing local module ($filename) on workers..." @everywhere procs begin # Parse functions on every worker node Base.MainInclude.eval( @@ -233,18 +241,18 @@ function import_module_on_workers(procs, filename::String, options::Options, ver end, ) end - debug(verbosity > 0, "Finished!") + verbosity > 0 && @info "Finished!" else - debug_inline(verbosity > 0, "Importing installed module on workers...") + verbosity > 0 && @info "Importing installed module on workers..." @everywhere procs begin - Base.MainInclude.eval(using SymbolicRegression) + Base.MainInclude.eval(:(using SymbolicRegression)) end - debug(verbosity > 0, "Finished!") + verbosity > 0 && @info "Finished!" end end function test_module_on_workers(procs, options::Options, verbosity) - debug_inline(verbosity > 0, "Testing module on workers...") + verbosity > 0 && @info "Testing module on workers..." futures = [] for proc in procs push!( @@ -255,14 +263,15 @@ function test_module_on_workers(procs, options::Options, verbosity) for future in futures fetch(future) end - return debug(verbosity > 0, "Finished!") + verbosity > 0 && @info "Finished!" + return nothing end function test_entire_pipeline( procs, dataset::Dataset{T}, options::Options, verbosity ) where {T<:DATA_TYPE} futures = [] - debug_inline(verbosity > 0, "Testing entire pipeline on workers...") + verbosity > 0 && @info "Testing entire pipeline on workers..." for proc in procs push!( futures, @@ -293,5 +302,40 @@ function test_entire_pipeline( for future in futures fetch(future) end - return debug(verbosity > 0, "Finished!") + verbosity > 0 && @info "Finished!" + return nothing +end + +function configure_workers(; + procs::Union{Vector{Int},Nothing}, + numprocs::Int, + addprocs_function::Function, + options::Options, + project_path, + file, + exeflags::Cmd, + verbosity, + example_dataset::Dataset, + runtests::Bool, +) + (procs, we_created_procs) = if procs === nothing + (addprocs_function(numprocs; lazy=false, exeflags), true) + else + (procs, false) + end + + if we_created_procs + activate_env_on_workers(procs, project_path, options, verbosity) + import_module_on_workers(procs, file, options, verbosity) + end + move_functions_to_workers(procs, options, example_dataset, verbosity) + if runtests + test_module_on_workers(procs, options, verbosity) + end + + if runtests + test_entire_pipeline(procs, example_dataset, options, verbosity) + end + + return (procs, we_created_procs) end diff --git a/src/Dataset.jl b/src/Dataset.jl index cb3d109ef..3819984d9 100644 --- a/src/Dataset.jl +++ b/src/Dataset.jl @@ -10,7 +10,7 @@ import DynamicQuantities: DEFAULT_DIM_BASE_TYPE import ...InterfaceDynamicQuantitiesModule: get_si_units, get_sym_units -import ..UtilsModule: subscriptify +import ..UtilsModule: subscriptify, get_base_type import ..ProgramConstantsModule: BATCH_DIM, FEATURE_DIM, DATA_TYPE, LOSS_TYPE #! format: off import ...deprecate_varmap @@ -103,12 +103,12 @@ function Dataset( display_variable_names=variable_names, y_variable_name::Union{String,Nothing}=nothing, extra::NamedTuple=NamedTuple(), - loss_type::Type{Linit}=Nothing, + loss_type::Type{L}=Nothing, X_units::Union{AbstractVector,Nothing}=nothing, y_units=nothing, # Deprecated: varMap=nothing, -) where {T<:DATA_TYPE,Linit} +) where {T<:DATA_TYPE,L} Base.require_one_based_indexing(X) y !== nothing && Base.require_one_based_indexing(y) # Deprecation warning: @@ -142,7 +142,12 @@ function Dataset( sum(y) / n end end - out_loss_type = (Linit === Nothing) ? T : Linit + out_loss_type = if L === Nothing + T <: Complex ? get_base_type(T) : T + else + L + end + use_baseline = true baseline = one(out_loss_type) y_si_units = get_si_units(T, y_units) diff --git a/src/HallOfFame.jl b/src/HallOfFame.jl index 76ae26c3b..68e4af8f4 100644 --- a/src/HallOfFame.jl +++ b/src/HallOfFame.jl @@ -4,7 +4,7 @@ import DynamicExpressions: Node, string_tree import ..UtilsModule: split_string import ..CoreModule: MAX_DEGREE, Options, Dataset, DATA_TYPE, LOSS_TYPE, relu import ..ComplexityModule: compute_complexity -import ..PopMemberModule: PopMember, copy_pop_member +import ..PopMemberModule: PopMember import ..LossFunctionsModule: eval_loss import ..InterfaceDynamicExpressionsModule: format_dimensions using Printf: @sprintf @@ -60,12 +60,9 @@ function HallOfFame( ) end -function copy_hall_of_fame( - hof::HallOfFame{T,L} -)::HallOfFame{T,L} where {T<:DATA_TYPE,L<:LOSS_TYPE} +function Base.copy(hof::HallOfFame) return HallOfFame( - [copy_pop_member(member) for member in hof.members], - [exists for exists in hof.exists], + [copy(member) for member in hof.members], [exists for exists in hof.exists] ) end @@ -98,7 +95,7 @@ function calculate_pareto_frontier( end end if betterThanAllSmaller - push!(dominating, copy_pop_member(member)) + push!(dominating, copy(member)) end end return dominating diff --git a/src/MLJInterface.jl b/src/MLJInterface.jl index 7252525a7..434b9ac18 100644 --- a/src/MLJInterface.jl +++ b/src/MLJInterface.jl @@ -39,6 +39,7 @@ function modelexpr(model_name::Symbol) numprocs::Union{Int,Nothing} = nothing procs::Union{Vector{Int},Nothing} = nothing addprocs_function::Union{Function,Nothing} = nothing + heap_size_hint_in_bytes::Union{Integer,Nothing} = nothing runtests::Bool = true loss_type::L = Nothing selection_method::Function = choose_best @@ -166,6 +167,7 @@ function _update(m, verbosity, old_fitresult, old_cache, X, y, w, options) numprocs=m.numprocs, procs=m.procs, addprocs_function=m.addprocs_function, + heap_size_hint_in_bytes=m.heap_size_hint_in_bytes, runtests=m.runtests, saved_state=(old_fitresult === nothing ? nothing : old_fitresult.state), return_state=true, @@ -490,6 +492,11 @@ function tag_with_docstring(model_name::Symbol, description::String, bottom_matt which is the number of processes to use, as well as the `lazy` keyword argument. For example, if set up on a slurm cluster, you could pass `addprocs_function = addprocs_slurm`, which will set up slurm processes. + - `heap_size_hint_in_bytes::Union{Int,Nothing}=nothing`: On Julia 1.9+, you may set the `--heap-size-hint` + flag on Julia processes, recommending garbage collection once a process + is close to the recommended size. This is important for long-running distributed + jobs where each process has an independent memory, and can help avoid + out-of-memory errors. By default, this is set to `Sys.free_memory() / numprocs`. - `runtests::Bool=true`: Whether to run (quick) tests before starting the search, to see if there will be any problems during the equation search related to the host environment. diff --git a/src/Migration.jl b/src/Migration.jl index 97f816bce..db711c68d 100644 --- a/src/Migration.jl +++ b/src/Migration.jl @@ -3,7 +3,7 @@ module MigrationModule using StatsBase: StatsBase import ..CoreModule: Options, DATA_TYPE, LOSS_TYPE import ..PopulationModule: Population -import ..PopMemberModule: PopMember, copy_pop_member_reset_birth +import ..PopMemberModule: PopMember, reset_birth! import ..UtilsModule: poisson_sample """ @@ -33,9 +33,8 @@ function migrate!( migrants = StatsBase.sample(migrant_candidates, num_replace; replace=true) for (i, migrant) in zip(locations, migrants) - base_pop.members[i] = copy_pop_member_reset_birth( - migrant; deterministic=options.deterministic - ) + base_pop.members[i] = copy(migrant) + reset_birth!(base_pop.members[i]; options.deterministic) end return nothing end diff --git a/src/PopMember.jl b/src/PopMember.jl index 72e1c86e9..9ca2f5cc6 100644 --- a/src/PopMember.jl +++ b/src/PopMember.jl @@ -109,10 +109,8 @@ function PopMember( ) end -function copy_pop_member( - p::PopMember{T,L} -)::PopMember{T,L} where {T<:DATA_TYPE,L<:LOSS_TYPE} - tree = copy_node(p.tree) +function Base.copy(p::PopMember{T,L})::PopMember{T,L} where {T<:DATA_TYPE,L<:LOSS_TYPE} + tree = copy(p.tree) score = copy(p.score) loss = copy(p.loss) birth = copy(p.birth) @@ -122,12 +120,9 @@ function copy_pop_member( return PopMember{T,L}(tree, score, loss, birth, complexity, ref, parent) end -function copy_pop_member_reset_birth( - p::PopMember{T,L}; deterministic::Bool -)::PopMember{T,L} where {T<:DATA_TYPE,L<:LOSS_TYPE} - new_member = copy_pop_member(p) - new_member.birth = get_birth_order(; deterministic=deterministic) - return new_member +function reset_birth!(p::PopMember; deterministic::Bool) + p.birth = get_birth_order(; deterministic) + return p end # Can read off complexity directly from pop members diff --git a/src/Population.jl b/src/Population.jl index d1d9ac9c1..5749dcfaa 100644 --- a/src/Population.jl +++ b/src/Population.jl @@ -8,7 +8,7 @@ import ..ComplexityModule: compute_complexity import ..LossFunctionsModule: score_func, update_baseline_loss! import ..AdaptiveParsimonyModule: RunningSearchStatistics import ..MutationFunctionsModule: gen_random_tree -import ..PopMemberModule: PopMember, copy_pop_member +import ..PopMemberModule: PopMember import ..UtilsModule: bottomk_fast, argmin_fast # A list of members of the population, with easy constructors, # which allow for random generation of new populations @@ -92,8 +92,8 @@ function Population( ) end -function copy_population(pop::P)::P where {P<:Population} - return Population([copy_pop_member(pm) for pm in pop.members]) +function Base.copy(pop::P)::P where {P<:Population} + return Population([copy(pm) for pm in pop.members]) end # Sample random members of the population, and make a new one diff --git a/src/RegularizedEvolution.jl b/src/RegularizedEvolution.jl index b1af07090..d74ab3394 100644 --- a/src/RegularizedEvolution.jl +++ b/src/RegularizedEvolution.jl @@ -1,6 +1,5 @@ module RegularizedEvolutionModule -import Random: shuffle! import DynamicExpressions: string_tree import ..CoreModule: Options, Dataset, RecordType, DATA_TYPE, LOSS_TYPE import ..PopMemberModule: PopMember diff --git a/src/SearchUtils.jl b/src/SearchUtils.jl index 2c4d78afb..a94044103 100644 --- a/src/SearchUtils.jl +++ b/src/SearchUtils.jl @@ -8,12 +8,14 @@ using Distributed import StatsBase: mean import ..UtilsModule: subscriptify -import ..CoreModule: Dataset, Options +import ..CoreModule: Dataset, Options, MAX_DEGREE import ..ComplexityModule: compute_complexity -import ..PopulationModule: Population, copy_population +import ..PopulationModule: Population +import ..PopMemberModule: PopMember import ..HallOfFameModule: - HallOfFame, copy_hall_of_fame, calculate_pareto_frontier, string_dominating_pareto_curve + HallOfFame, calculate_pareto_frontier, string_dominating_pareto_curve import ..ProgressBarsModule: WrappedProgressBar, set_multiline_postfix!, manually_iterate! +import ..AdaptiveParsimonyModule: update_frequencies! function next_worker(worker_assignment::Dict{Tuple{Int,Int},Int}, procs::Vector{Int})::Int job_counts = Dict(proc => 0 for proc in procs) @@ -27,22 +29,39 @@ function next_worker(worker_assignment::Dict{Tuple{Int,Int},Int}, procs::Vector{ return least_busy_worker end -function next_worker(worker_assignment::Dict{Tuple{Int,Int},Int}, procs::Nothing)::Int - return 0 +function assign_next_worker!(worker_assignment; pop, out, parallelism, procs)::Int + if parallelism == :multiprocessing + worker_idx = next_worker(worker_assignment, procs) + worker_assignment[(out, pop)] = worker_idx + return worker_idx + else + return 0 + end end -macro sr_spawner(parallel, p, expr) - quote - if $(esc(parallel)) == :serial - $(esc(expr)) - elseif $(esc(parallel)) == :multiprocessing - @spawnat($(esc(p)), $(esc(expr))) - elseif $(esc(parallel)) == :multithreading - Threads.@spawn($(esc(expr))) +function initialize_worker_assignment() + return Dict{Tuple{Int,Int},Int}() +end + +macro sr_spawner(expr, kws...) + # Extract parallelism and worker_idx parameters from kws + @assert length(kws) == 2 + @assert all(ex -> ex.head == :(=), kws) + @assert any(ex -> ex.args[1] == :parallelism, kws) + @assert any(ex -> ex.args[1] == :worker_idx, kws) + parallelism = kws[findfirst(ex -> ex.args[1] == :parallelism, kws)].args[2] + worker_idx = kws[findfirst(ex -> ex.args[1] == :worker_idx, kws)].args[2] + return quote + if $(parallelism) == :serial + $(expr) + elseif $(parallelism) == :multiprocessing + @spawnat($(worker_idx), $(expr)) + elseif $(parallelism) == :multithreading + Threads.@spawn($(expr)) else error("Invalid parallel type.") end - end + end |> esc end function init_dummy_pops( @@ -284,7 +303,7 @@ function load_saved_hall_of_fame(saved_state) else hall_of_fame end - return [copy_hall_of_fame(hof) for hof in hall_of_fame] + return [copy(hof) for hof in hall_of_fame] end load_saved_hall_of_fame(::Nothing)::Nothing = nothing @@ -300,10 +319,30 @@ function get_population( end function load_saved_population(saved_state; out::Int, pop::Int) saved_pop = get_population(saved_state[1]; out=out, pop=pop) - return copy_population(saved_pop) + return copy(saved_pop) end load_saved_population(::Nothing; kws...) = nothing +""" + get_cur_maxsize(; options, total_cycles, cycles_remaining) + +For searches where the maxsize gradually increases, this function returns the +current maxsize. +""" +function get_cur_maxsize(; options::Options, total_cycles::Int, cycles_remaining::Int) + cycles_elapsed = total_cycles - cycles_remaining + fraction_elapsed = 1.0f0 * cycles_elapsed / total_cycles + in_warmup_period = fraction_elapsed <= options.warmup_maxsize_by + + if options.warmup_maxsize_by > 0 && in_warmup_period + return 3 + floor( + Int, (options.maxsize - 3) * fraction_elapsed / options.warmup_maxsize_by + ) + else + return options.maxsize + end +end + function construct_datasets( X, y, @@ -345,4 +384,22 @@ function construct_datasets( ] end +function update_hall_of_fame!( + hall_of_fame::HallOfFame, members::Vector{PM}, options::Options +) where {PM<:PopMember} + for member in members + size = compute_complexity(member, options) + valid_size = 0 < size < options.maxsize + MAX_DEGREE + if !valid_size + continue + end + not_filled = !hall_of_fame.exists[size] + better_than_current = member.score < hall_of_fame.members[size].score + if not_filled || better_than_current + hall_of_fame.members[size] = copy(member) + hall_of_fame.exists[size] = true + end + end +end + end diff --git a/src/SingleIteration.jl b/src/SingleIteration.jl index 3c804042b..9a8fe5c56 100644 --- a/src/SingleIteration.jl +++ b/src/SingleIteration.jl @@ -3,8 +3,7 @@ module SingleIterationModule import DynamicExpressions: Node, string_tree, simplify_tree, combine_operators import ..CoreModule: Options, Dataset, RecordType, DATA_TYPE, LOSS_TYPE import ..ComplexityModule: compute_complexity -import ..UtilsModule: debug -import ..PopMemberModule: copy_pop_member, generate_reference +import ..PopMemberModule: generate_reference import ..PopulationModule: Population, finalize_scores, best_sub_pop import ..HallOfFameModule: HallOfFame import ..AdaptiveParsimonyModule: RunningSearchStatistics @@ -84,7 +83,7 @@ function s_r_cycle( score < best_examples_seen.members[size].score ) best_examples_seen.exists[size] = true - best_examples_seen.members[size] = copy_pop_member(member) + best_examples_seen.members[size] = copy(member) end end first_loop = false diff --git a/src/SymbolicRegression.jl b/src/SymbolicRegression.jl index 5bd65b804..38f28f645 100644 --- a/src/SymbolicRegression.jl +++ b/src/SymbolicRegression.jl @@ -192,8 +192,7 @@ import .CoreModule: erf, erfc, atanh_clip -import .UtilsModule: - debug, debug_inline, is_anonymous_function, recursive_merge, json3_write +import .UtilsModule: is_anonymous_function, recursive_merge, json3_write import .ComplexityModule: compute_complexity import .CheckConstraintsModule: check_constraints import .AdaptiveParsimonyModule: @@ -206,17 +205,17 @@ import .MutationFunctionsModule: crossover_trees import .InterfaceDynamicExpressionsModule: @extend_operators import .LossFunctionsModule: eval_loss, score_func, update_baseline_loss! -import .PopMemberModule: PopMember, copy_pop_member, copy_pop_member_reset_birth -import .PopulationModule: - Population, copy_population, best_sub_pop, record_population, best_of_sample +import .PopMemberModule: PopMember, reset_birth! +import .PopulationModule: Population, best_sub_pop, record_population, best_of_sample import .HallOfFameModule: HallOfFame, calculate_pareto_frontier, string_dominating_pareto_curve import .SingleIterationModule: s_r_cycle, optimize_and_simplify_population import .ProgressBarsModule: WrappedProgressBar -import .RecorderModule: @recorder, find_iteration_from_record +import .RecorderModule: @recorder, is_recording, find_iteration_from_record import .MigrationModule: migrate! import .SearchUtilsModule: - next_worker, + assign_next_worker!, + initialize_worker_assignment, @sr_spawner, watch_stream, close_reader!, @@ -233,7 +232,9 @@ import .SearchUtilsModule: init_dummy_pops, load_saved_hall_of_fame, load_saved_population, - construct_datasets + construct_datasets, + get_cur_maxsize, + update_hall_of_fame! include("deprecates.jl") include("Configure.jl") @@ -292,6 +293,11 @@ which is useful for debugging and profiling. which is the number of processes to use, as well as the `lazy` keyword argument. For example, if set up on a slurm cluster, you could pass `addprocs_function = addprocs_slurm`, which will set up slurm processes. +- `heap_size_hint_in_bytes::Union{Int,Nothing}=nothing`: On Julia 1.9+, you may set the `--heap-size-hint` + flag on Julia processes, recommending garbage collection once a process + is close to the recommended size. This is important for long-running distributed + jobs where each process has an independent memory, and can help avoid + out-of-memory errors. By default, this is set to `Sys.free_memory() / numprocs`. - `runtests::Bool=true`: Whether to run (quick) tests before starting the search, to see if there will be any problems during the equation search related to the host environment. @@ -339,19 +345,20 @@ function equation_search( numprocs::Union{Int,Nothing}=nothing, procs::Union{Vector{Int},Nothing}=nothing, addprocs_function::Union{Function,Nothing}=nothing, + heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing, runtests::Bool=true, saved_state=nothing, return_state::Union{Bool,Nothing}=nothing, - loss_type::Type{Linit}=Nothing, + loss_type::Type{L}=Nothing, verbosity::Union{Integer,Nothing}=nothing, progress::Union{Bool,Nothing}=nothing, X_units::Union{AbstractVector,Nothing}=nothing, y_units=nothing, - v_dim_out::Val{dim_out}=Val(nothing), + v_dim_out::Val{DIM_OUT}=Val(nothing), # Deprecated: multithreaded=nothing, varMap=nothing, -) where {T<:DATA_TYPE,Linit,dim_out} +) where {T<:DATA_TYPE,L,DIM_OUT} if multithreaded !== nothing error( "`multithreaded` is deprecated. Use the `parallelism` argument instead. " * @@ -364,10 +371,6 @@ function equation_search( @assert length(weights) == length(y) weights = reshape(weights, size(y)) end - if T <: Complex && loss_type == Nothing - get_base_type(::Type{Complex{BT}}) where {BT} = BT - loss_type = get_base_type(T) - end datasets = construct_datasets( X, @@ -378,7 +381,7 @@ function equation_search( y_variable_names, X_units, y_units, - loss_type, + L, ) return equation_search( @@ -389,12 +392,13 @@ function equation_search( numprocs=numprocs, procs=procs, addprocs_function=addprocs_function, + heap_size_hint_in_bytes=heap_size_hint_in_bytes, runtests=runtests, saved_state=saved_state, return_state=return_state, verbosity=verbosity, progress=progress, - v_dim_out=Val(dim_out), + v_dim_out=Val(DIM_OUT), ) end @@ -425,13 +429,14 @@ function equation_search( numprocs::Union{Int,Nothing}=nothing, procs::Union{Vector{Int},Nothing}=nothing, addprocs_function::Union{Function,Nothing}=nothing, + heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing, runtests::Bool=true, saved_state=nothing, return_state::Union{Bool,Nothing}=nothing, verbosity::Union{Int,Nothing}=nothing, progress::Union{Bool,Nothing}=nothing, - v_dim_out::Val{dim_out}=Val(nothing), -) where {dim_out,T<:DATA_TYPE,L<:LOSS_TYPE,D<:Dataset{T,L}} + v_dim_out::Val{DIM_OUT}=Val(nothing), +) where {DIM_OUT,T<:DATA_TYPE,L<:LOSS_TYPE,D<:Dataset{T,L}} v_concurrency, concurrency = if parallelism in (:multithreading, "multithreading") (Val(:multithreading), :multithreading) elseif parallelism in (:multiprocessing, "multiprocessing") @@ -457,7 +462,7 @@ function equation_search( ) # TODO: Still not type stable. Should be able to pass `Val{return_state}`. - should_return_state = if options.return_state === nothing + _return_state = if options.return_state === nothing return_state === nothing ? false : return_state else @assert( @@ -467,13 +472,23 @@ function equation_search( options.return_state end - _v_dim_out = if dim_out === nothing + v_dim_out = if DIM_OUT === nothing length(datasets) > 1 ? Val(2) : Val(1) else - Val(dim_out) + Val(DIM_OUT) + end + _numprocs::Int = if numprocs === nothing && procs === nothing + 4 + elseif numprocs !== nothing && procs === nothing + numprocs + elseif numprocs === nothing && procs !== nothing + length(procs) + else + @assert length(procs) == numprocs + numprocs end - verbosity = if verbosity === nothing && options.verbosity === nothing + _verbosity = if verbosity === nothing && options.verbosity === nothing 1 elseif verbosity === nothing options.verbosity @@ -483,9 +498,10 @@ function equation_search( error( "You cannot set `verbosity` in both the search parameters `Options` and the call to `equation_search`.", ) + 1 end - progress = if progress === nothing && options.progress === nothing - (verbosity > 0) && length(datasets) == 1 + _progress = if progress === nothing && options.progress === nothing + (_verbosity > 0) && length(datasets) == 1 elseif progress === nothing options.progress elseif options.progress === nothing @@ -494,63 +510,72 @@ function equation_search( error( "You cannot set `progress` in both the search parameters `Options` and the call to `equation_search`.", ) + false end - if progress + if _progress @assert( length(datasets) == 1, "You cannot display a progress bar for multi-output searches." ) - @assert(verbosity > 0, "You cannot display a progress bar with `verbosity=0`.") + @assert(_verbosity > 0, "You cannot display a progress bar with `verbosity=0`.") end + _addprocs_function = addprocs_function === nothing ? addprocs : addprocs_function + + exeflags = if VERSION >= v"1.9" && concurrency == :multiprocessing + heap_size_hint_in_megabytes = floor( + Int, ( + if heap_size_hint_in_bytes === nothing + (Sys.free_memory() / _numprocs) + else + heap_size_hint_in_bytes + end + ) / 1024^2 + ) + _verbosity > 0 && + heap_size_hint_in_bytes === nothing && + @info "Automatically setting `--heap-size-hint=$(heap_size_hint_in_megabytes)M` on each Julia process. You can configure this with the `heap_size_hint_in_bytes` parameter." + + `--heap-size=$(heap_size_hint_in_megabytes)M` + else + `` + end + + # Underscores here mean that we have mutated the variable return _equation_search( v_concurrency, - _v_dim_out, + v_dim_out, datasets, niterations, options, - numprocs, + _numprocs, procs, - addprocs_function, + _addprocs_function, + exeflags, runtests, saved_state, - verbosity, - progress, - Val(should_return_state), + _verbosity, + _progress, + Val(_return_state), ) end function _equation_search( - ::Val{parallelism}, - ::Val{dim_out}, + ::Val{PARALLELISM}, + ::Val{DIM_OUT}, datasets::Vector{D}, niterations::Int, options::Options, - numprocs::Union{Int,Nothing}, + numprocs::Int, procs::Union{Vector{Int},Nothing}, - addprocs_function::Union{Function,Nothing}, + addprocs_function::Function, + exeflags::Cmd, runtests::Bool, saved_state, verbosity, progress, - ::Val{should_return_state}, -) where {T<:DATA_TYPE,L<:LOSS_TYPE,D<:Dataset{T,L},parallelism,should_return_state,dim_out} - if options.deterministic - if parallelism != :serial - error("Determinism is only guaranteed for serial mode.") - end - end - if parallelism == :multithreading - if Threads.nthreads() == 1 - @warn "You are using multithreading mode, but only one thread is available. Try starting julia with `--threads=auto`." - end - end - if any(d -> d.X_units !== nothing || d.y_units !== nothing, datasets) - if options.dimensional_constraint_penalty === nothing && saved_state === nothing - @warn "You are using dimensional constraints, but `dimensional_constraint_penalty` was not set. The default penalty of `1000.0` will be used." - end - end - + ::Val{RETURN_STATE}, +) where {T<:DATA_TYPE,L<:LOSS_TYPE,D<:Dataset{T,L},PARALLELISM,RETURN_STATE,DIM_OUT} stdin_reader = watch_stream(stdin) if options.define_helper_functions @@ -559,11 +584,10 @@ function _equation_search( example_dataset = datasets[1] nout = size(datasets, 1) - @assert (nout == 1 || dim_out == 2) + @assert (nout == 1 || DIM_OUT == 2) if runtests - test_option_configuration(T, options) - # Testing the first output variable is the same: + test_option_configuration(PARALLELISM, datasets, saved_state, options) test_dataset_configuration(example_dataset, options, verbosity) end @@ -576,45 +600,38 @@ function _equation_search( end # Start a population on every process # Store the population, hall of fame - allPopsType = if parallelism == :serial - Tuple{Population,HallOfFame,RecordType,Float64} - elseif parallelism == :multiprocessing + WorkerOutputType = if PARALLELISM == :serial + Tuple{Population{T,L},HallOfFame{T,L},RecordType,Float64} + elseif PARALLELISM == :multiprocessing Future else Task end - allPops = [allPopsType[] for j in 1:nout] - init_pops = [allPopsType[] for j in 1:nout] + # Persistent storage of last-saved population for final return: + returnPops = init_dummy_pops(options.populations, datasets, options) + # Best 10 members from each population for migration: + bestSubPops = init_dummy_pops(options.populations, datasets, options) + + # Pointers to populations on each worker: + worker_output = [WorkerOutputType[] for j in 1:nout] + # Initialize storage for workers tasks = [Task[] for j in 1:nout] # Set up a channel to send finished populations back to head node - channels = if parallelism == :multiprocessing + channels = if PARALLELISM == :multiprocessing [[RemoteChannel(1) for i in 1:(options.populations)] for j in 1:nout] else [[Channel(1) for i in 1:(options.populations)] for j in 1:nout] # (Unused for :serial) end - # This is a recorder for populations, but is not actually used for processing, just - # for the final return. - returnPops = init_dummy_pops(options.populations, datasets, options) - # These initial populations are discarded: - bestSubPops = init_dummy_pops(options.populations, datasets, options) - - actualMaxsize = options.maxsize + MAX_DEGREE - # TODO: Should really be one per population too. all_running_search_statistics = [ RunningSearchStatistics(; options=options) for j in 1:nout ] - record = RecordType("options" => "$(options)") - - curmaxsizes = if iszero(options.warmup_maxsize_by) - fill(options.maxsize, nout) - else - fill(convert(typeof(options.maxsize), 3), nout) - end + record = RecordType() + @recorder record["options"] = "$(options)" # Records the number of evaluations: # Real numbers indicate use of batching. @@ -624,37 +641,22 @@ function _equation_search( ########################################################################## ### Distributed code: ########################################################################## - if parallelism == :multiprocessing - if addprocs_function === nothing - addprocs_function = addprocs - end - if numprocs === nothing && procs === nothing - numprocs = 4 - procs = addprocs_function(numprocs; lazy=false) - we_created_procs = true - elseif numprocs === nothing - numprocs = length(procs) - elseif procs === nothing - procs = addprocs_function(numprocs; lazy=false) - we_created_procs = true - end - - if we_created_procs - project_path = splitdir(Pkg.project().path)[1] - activate_env_on_workers(procs, project_path, options, verbosity) - import_module_on_workers(procs, @__FILE__, options, verbosity) - end - move_functions_to_workers(procs, options, example_dataset, verbosity) - if runtests - test_module_on_workers(procs, options, verbosity) - end - - if runtests - test_entire_pipeline(procs, example_dataset, options, verbosity) - end + if PARALLELISM == :multiprocessing + (procs, we_created_procs) = configure_workers(; + procs, + numprocs, + addprocs_function, + options, + project_path=splitdir(Pkg.project().path)[1], + file=@__FILE__, + exeflags, + verbosity, + example_dataset, + runtests, + ) end # Get the next worker process to give a job: - worker_assignment = Dict{Tuple{Int,Int},Int}() + worker_assignment = initialize_worker_assignment() hallOfFame = load_saved_hall_of_fame(saved_state) hallOfFame = if hallOfFame === nothing @@ -675,106 +677,101 @@ function _equation_search( hallOfFame::Vector{HallOfFame{T,L}} for j in 1:nout, i in 1:(options.populations) - worker_idx = next_worker(worker_assignment, procs) - if parallelism == :multiprocessing - worker_assignment[(j, i)] = worker_idx - end - + worker_idx = assign_next_worker!( + worker_assignment; out=j, pop=i, parallelism=PARALLELISM, procs + ) saved_pop = load_saved_population(saved_state; out=j, pop=i) - if saved_pop !== nothing && length(saved_pop.members) == options.population_size - saved_pop::Population{T,L} - ## Update losses: - for member in saved_pop.members - score, result_loss = score_func(datasets[j], member, options) - member.score = score - member.loss = result_loss - end - copy_pop = copy_population(saved_pop) - new_pop = @sr_spawner parallelism worker_idx ( - copy_pop, HallOfFame(options, T, L), RecordType(), 0.0 - ) - else - if saved_pop !== nothing - @warn "Recreating population (output=$(j), population=$(i)), as the saved one doesn't have the correct number of members." + new_pop = + if saved_pop !== nothing && length(saved_pop.members) == options.population_size + saved_pop::Population{T,L} + ## Update losses: + for member in saved_pop.members + score, result_loss = score_func(datasets[j], member, options) + member.score = score + member.loss = result_loss + end + copy_pop = copy(saved_pop) + @sr_spawner( + begin + (copy_pop, HallOfFame(options, T, L), RecordType(), 0.0) + end, + parallelism = PARALLELISM, + worker_idx = worker_idx + ) + else + if saved_pop !== nothing + @warn "Recreating population (output=$(j), population=$(i)), as the saved one doesn't have the correct number of members." + end + @sr_spawner( + begin + ( + Population( + datasets[j]; + population_size=options.population_size, + nlength=3, + options=options, + nfeatures=datasets[j].nfeatures, + ), + HallOfFame(options, T, L), + RecordType(), + Float64(options.population_size), + ) + end, + parallelism = PARALLELISM, + worker_idx = worker_idx + ) + # This involves population_size evaluations, on the full dataset: end - new_pop = @sr_spawner parallelism worker_idx ( - Population( - datasets[j]; - population_size=options.population_size, - nlength=3, - options=options, - nfeatures=datasets[j].nfeatures, - ), - HallOfFame(options, T, L), - RecordType(), - Float64(options.population_size), - ) - # This involves population_size evaluations, on the full dataset: - end - push!(init_pops[j], new_pop) + push!(worker_output[j], new_pop) end + total_cycles = options.populations * niterations + cycles_remaining = [total_cycles for j in 1:nout] + curmaxsizes = [ + get_cur_maxsize(; options, total_cycles, cycles_remaining=cycles_remaining[j]) for + j in 1:nout + ] # 2. Start the cycle on every process: for j in 1:nout, i in 1:(options.populations) dataset = datasets[j] running_search_statistics = all_running_search_statistics[j] curmaxsize = curmaxsizes[j] @recorder record["out$(j)_pop$(i)"] = RecordType() - worker_idx = next_worker(worker_assignment, procs) - if parallelism == :multiprocessing - worker_assignment[(j, i)] = worker_idx - end + worker_idx = assign_next_worker!( + worker_assignment; out=j, pop=i, parallelism=PARALLELISM, procs + ) # TODO - why is this needed?? # Multi-threaded doesn't like to fetch within a new task: c_rss = deepcopy(running_search_statistics) - updated_pop = @sr_spawner parallelism worker_idx let - in_pop = if parallelism in (:multiprocessing, :multithreading) - fetch(init_pops[j][i])[1] - else - init_pops[j][i][1] - end - - cur_record = RecordType() - @recorder cur_record["out$(j)_pop$(i)"] = RecordType( - "iteration0" => record_population(in_pop, options) - ) - tmp_num_evals = 0.0 - normalize_frequencies!(c_rss) - tmp_pop, tmp_best_seen, evals_from_cycle = s_r_cycle( - dataset, - in_pop, - options.ncycles_per_iteration, - curmaxsize, - c_rss; - verbosity=verbosity, - options=options, - record=cur_record, - ) - tmp_num_evals += evals_from_cycle - tmp_pop, evals_from_optimize = optimize_and_simplify_population( - dataset, tmp_pop, options, curmaxsize, cur_record - ) - tmp_num_evals += evals_from_optimize - if options.batching - for i_member in 1:(options.maxsize + MAX_DEGREE) - score, result_loss = score_func( - dataset, tmp_best_seen.members[i_member], options - ) - tmp_best_seen.members[i_member].score = score - tmp_best_seen.members[i_member].loss = result_loss - tmp_num_evals += 1 + last_pop = worker_output[j][i] + updated_pop = @sr_spawner( + begin + in_pop = if PARALLELISM in (:multiprocessing, :multithreading) + fetch(last_pop)[1] + else + last_pop[1] end - end - (tmp_pop, tmp_best_seen, cur_record, tmp_num_evals) - end - push!(allPops[j], updated_pop) + _dispatch_s_r_cycle(; + pop=i, + out=j, + iteration=0, + dataset, + options, + verbosity, + in_pop, + curmaxsize, + running_search_statistics=c_rss, + ) + end, + parallelism = PARALLELISM, + worker_idx = worker_idx + ) + worker_output[j][i] = updated_pop end - debug(verbosity > 0, "Started!") + verbosity > 0 && @info "Started!" start_time = time() - total_cycles = options.populations * niterations - cycles_remaining = [total_cycles for j in 1:nout] if progress #TODO: need to iterate this on the max cycles remaining! sum_cycle_remaining = sum(cycles_remaining) @@ -790,10 +787,10 @@ function _equation_search( print_every_n_seconds = 5 equation_speed = Float32[] - if parallelism in (:multiprocessing, :multithreading) + if PARALLELISM in (:multiprocessing, :multithreading) for j in 1:nout, i in 1:(options.populations) # Start listening for each population to finish: - t = @async put!(channels[j][i], fetch(allPops[j][i])) + t = @async put!(channels[j][i], fetch(worker_output[j][i])) push!(tasks[j], t) end end @@ -818,14 +815,14 @@ function _equation_search( j, i = all_idx[kappa] # Check if error on population: - if parallelism in (:multiprocessing, :multithreading) + if PARALLELISM in (:multiprocessing, :multithreading) if istaskfailed(tasks[j][i]) fetch(tasks[j][i]) error("Task failed for population") end end # Non-blocking check if a population is ready: - population_ready = if parallelism in (:multiprocessing, :multithreading) + population_ready = if PARALLELISM in (:multiprocessing, :multithreading) # TODO: Implement type assertions based on parallelism. isready(channels[j][i]) else @@ -838,52 +835,30 @@ function _equation_search( start_work_monitor!(resource_monitor) # Take the fetch operation from the channel since its ready (cur_pop, best_seen, cur_record, cur_num_evals) = - if parallelism in (:multiprocessing, :multithreading) + if PARALLELISM in (:multiprocessing, :multithreading) take!(channels[j][i]) else - allPops[j][i] + worker_output[j][i] end - cur_pop::Population - best_seen::HallOfFame + cur_pop::Population{T,L} + best_seen::HallOfFame{T,L} cur_record::RecordType cur_num_evals::Float64 - returnPops[j][i] = copy_population(cur_pop) + returnPops[j][i] = copy(cur_pop) bestSubPops[j][i] = best_sub_pop(cur_pop; topn=options.topn) @recorder record = recursive_merge(record, cur_record) num_evals[j][i] += cur_num_evals - dataset = datasets[j] curmaxsize = curmaxsizes[j] - #Try normal copy... - bestPops = Population([ - member for pop in bestSubPops[j] for member in pop.members - ]) - - ################################################################### - # Hall Of Fame updating ########################################### - for (i_member, member) in enumerate( - Iterators.flatten((cur_pop.members, best_seen.members[best_seen.exists])) - ) - part_of_cur_pop = i_member <= length(cur_pop.members) + for member in cur_pop.members size = compute_complexity(member, options) - - if part_of_cur_pop - update_frequencies!(all_running_search_statistics[j]; size=size) - end - actualMaxsize = options.maxsize + MAX_DEGREE - - valid_size = 0 < size < actualMaxsize - if valid_size - already_filled = hallOfFame[j].exists[size] - better_than_current = member.score < hallOfFame[j].members[size].score - if !already_filled || better_than_current - hallOfFame[j].members[size] = copy_pop_member(member) - hallOfFame[j].exists[size] = true - end - end + update_frequencies!(all_running_search_statistics[j]; size) end - ################################################################### + #! format: off + update_hall_of_fame!(hallOfFame[j], cur_pop.members, options) + update_hall_of_fame!(hallOfFame[j], best_seen.members[best_seen.exists], options) + #! format: on # Dominating pareto curve - must be better than all simpler equations dominating = calculate_pareto_frontier(hallOfFame[j]) @@ -910,8 +885,11 @@ function _equation_search( ################################################################### # Migration ####################################################### if options.migration + best_of_each = Population([ + member for pop in bestSubPops[j] for member in pop.members + ]) migrate!( - bestPops.members => cur_pop, options; frac=options.fraction_replaced + best_of_each.members => cur_pop, options; frac=options.fraction_replaced ) end if options.hof_migration && length(dominating) > 0 @@ -923,77 +901,42 @@ function _equation_search( if cycles_remaining[j] == 0 break end - worker_idx = next_worker(worker_assignment, procs) - if parallelism == :multiprocessing - worker_assignment[(j, i)] = worker_idx - end - @recorder begin + worker_idx = assign_next_worker!( + worker_assignment; out=j, pop=i, parallelism=PARALLELISM, procs + ) + iteration = if is_recording(options) key = "out$(j)_pop$(i)" - iteration = find_iteration_from_record(key, record) + 1 + find_iteration_from_record(key, record) + 1 + else + 0 end c_rss = deepcopy(all_running_search_statistics[j]) - c_cur_pop = copy_population(cur_pop) - allPops[j][i] = @sr_spawner parallelism worker_idx let - cur_record = RecordType() - @recorder cur_record[key] = RecordType( - "iteration$(iteration)" => record_population(c_cur_pop, options) - ) - tmp_num_evals = 0.0 - normalize_frequencies!(c_rss) - # TODO: Could the dataset objects themselves be modified during the search?? - # Perhaps inside the evaluation kernels? - # It shouldn't be too expensive to copy the dataset. - tmp_pop, tmp_best_seen, evals_from_cycle = s_r_cycle( - dataset, - c_cur_pop, - options.ncycles_per_iteration, - curmaxsize, - c_rss; - verbosity=verbosity, - options=options, - record=cur_record, - ) - tmp_num_evals += evals_from_cycle - tmp_pop, evals_from_optimize = optimize_and_simplify_population( - dataset, tmp_pop, options, curmaxsize, cur_record - ) - tmp_num_evals += evals_from_optimize - - # Update scores if using batching: - if options.batching - for i_member in 1:(options.maxsize + MAX_DEGREE) - if tmp_best_seen.exists[i_member] - score, result_loss = score_func( - dataset, tmp_best_seen.members[i_member], options - ) - tmp_best_seen.members[i_member].score = score - tmp_best_seen.members[i_member].loss = result_loss - tmp_num_evals += 1 - end - end - end - - (tmp_pop, tmp_best_seen, cur_record, tmp_num_evals) - end - if parallelism in (:multiprocessing, :multithreading) - tasks[j][i] = @async put!(channels[j][i], fetch(allPops[j][i])) + in_pop = copy(cur_pop) + worker_output[j][i] = @sr_spawner( + begin + _dispatch_s_r_cycle(; + pop=i, + out=j, + iteration, + dataset, + options, + verbosity, + in_pop, + curmaxsize, + running_search_statistics=c_rss, + ) + end, + parallelism = PARALLELISM, + worker_idx = worker_idx + ) + if PARALLELISM in (:multiprocessing, :multithreading) + tasks[j][i] = @async put!(channels[j][i], fetch(worker_output[j][i])) end - cycles_elapsed = total_cycles - cycles_remaining[j] - if options.warmup_maxsize_by > 0 - fraction_elapsed = 1.0f0 * cycles_elapsed / total_cycles - if fraction_elapsed > options.warmup_maxsize_by - curmaxsizes[j] = options.maxsize - else - curmaxsizes[j] = - 3 + floor( - Int, - (options.maxsize - 3) * fraction_elapsed / - options.warmup_maxsize_by, - ) - end - end + curmaxsizes[j] = get_cur_maxsize(; + options, total_cycles, cycles_remaining=cycles_remaining[j] + ) stop_work_monitor!(resource_monitor) move_window!(all_running_search_statistics[j]) if progress @@ -1005,7 +948,7 @@ function _equation_search( options, equation_speed, head_node_occupation, - parallelism, + PARALLELISM, ) end end @@ -1045,7 +988,7 @@ function _equation_search( total_cycles, cycles_remaining, head_node_occupation, - parallelism, + parallelism=PARALLELISM, width=options.terminal_width, ) end @@ -1069,11 +1012,11 @@ function _equation_search( close_reader!(stdin_reader) # Safely close all processes or threads - if parallelism == :multiprocessing + if PARALLELISM == :multiprocessing we_created_procs && rmprocs(procs) - elseif parallelism == :multithreading + elseif PARALLELISM == :multithreading for j in 1:nout, i in 1:(options.populations) - wait(allPops[j][i]) + wait(worker_output[j][i]) end end @@ -1083,11 +1026,54 @@ function _equation_search( @recorder json3_write(record, options.recorder_file) - if should_return_state - return (returnPops, (dim_out == 1 ? only(hallOfFame) : hallOfFame)) + if RETURN_STATE + return (returnPops, (DIM_OUT == 1 ? only(hallOfFame) : hallOfFame)) else - return (dim_out == 1 ? only(hallOfFame) : hallOfFame) + return (DIM_OUT == 1 ? only(hallOfFame) : hallOfFame) + end +end + +function _dispatch_s_r_cycle(; + pop::Int, + out::Int, + iteration::Int, + dataset::Dataset, + options::Options, + verbosity, + in_pop::Population, + curmaxsize::Int, + running_search_statistics, +) + record = RecordType() + @recorder record["out$(out)_pop$(pop)"] = RecordType( + "iteration$(iteration)" => record_population(in_pop, options) + ) + num_evals = 0.0 + normalize_frequencies!(running_search_statistics) + out_pop, best_seen, evals_from_cycle = s_r_cycle( + dataset, + in_pop, + options.ncycles_per_iteration, + curmaxsize, + running_search_statistics; + verbosity=verbosity, + options=options, + record=record, + ) + num_evals += evals_from_cycle + out_pop, evals_from_optimize = optimize_and_simplify_population( + dataset, out_pop, options, curmaxsize, record + ) + num_evals += evals_from_optimize + if options.batching + for i_member in 1:(options.maxsize + MAX_DEGREE) + score, result_loss = score_func(dataset, best_seen.members[i_member], options) + best_seen.members[i_member].score = score + best_seen.members[i_member].loss = result_loss + num_evals += 1 + end end + return (out_pop, best_seen, record, num_evals) end include("MLJInterface.jl") diff --git a/src/Utils.jl b/src/Utils.jl index 4ec2f1b2f..6021f6eb8 100644 --- a/src/Utils.jl +++ b/src/Utils.jl @@ -4,18 +4,6 @@ module UtilsModule import Printf: @printf import MacroTools: splitdef, combinedef -function debug(verbosity, string...) - if verbosity > 0 - println(string...) - end -end - -function debug_inline(verbosity, string...) - if verbosity > 0 - print(string...) - end -end - const pseudo_time = Ref(0) function get_birth_order(; deterministic=false)::Int @@ -42,6 +30,8 @@ recursive_merge(x::AbstractDict...) = merge(recursive_merge, x...) recursive_merge(x...) = x[end] recursive_merge() = error("Unexpected input.") +get_base_type(::Type{Complex{BT}}) where {BT} = BT + const subscripts = ('₀', '₁', '₂', '₃', '₄', '₅', '₆', '₇', '₈', '₉') function subscriptify(number::Integer) return join([subscripts[i + 1] for i in reverse(digits(number))])