Commit
Merge pull request #4 from pik-gane/parallelization
Parallelization
JacobLoe authored Mar 4, 2022
2 parents c4e49b3 + d552c1d commit 6802632
Showing 6 changed files with 149 additions and 42 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -3,6 +3,11 @@ __pycache__/
*.py[cod]
*$py.class

# ignore bbo output
/*/smac3*
/*/outcmaes
.idea

# C extensions
*.so

Binary file added src/plot_simulation_output.png
Binary file added src/plot_time_for_evaluation.png
54 changes: 40 additions & 14 deletions src/pyoptes/optimization/budget_allocation/target_function.py
@@ -13,6 +13,8 @@

import numpy as np
from ...epidemiological_models.si_model_on_transmissions import SIModelOnTransmissions
from functools import partial
from multiprocessing import cpu_count, Pool

global model, capacities, network

@@ -125,7 +127,7 @@ def prepare(use_real_data=False,
stopping_delay = 1,
verbose = False,
)


def get_n_inputs():
    """Get the length of the input vector needed for evaluate(),
@@ -134,10 +136,34 @@ def get_n_inputs():
    return model.n_nodes


def simulate_infection():
    model.reset()
    # run until detection:
    model.run()
    # return a vector of bools stating which farms are infected at the end:
    return model.is_infected[model.t, :]


def n_infected_animals(is_infected):
    return np.sum(capacities * is_infected)


def mean_square_and_stderr(n_infected_animals):
    values = n_infected_animals**2
    estimate = np.mean(values, axis=0)
    stderr = np.std(values, ddof=1, axis=0) / np.sqrt(values.shape[0])
    return estimate, stderr


def task(unused_simulation_index, aggregation):
    return aggregation(simulate_infection())


def evaluate(budget_allocation,
             n_simulations=1,
             statistic=np.mean # any function converting an array into a number or tuple of numbers
             ):
             n_simulations=1,
             aggregation=n_infected_animals,
             statistic=mean_square_and_stderr,
             parallel=False):
"""Run the SIModelOnTransmissions a single time, using the given budget
allocation, and return the number of nodes infected at the time the
simulation is stopped. Since the simulated process is a stochastic
@@ -151,7 +177,9 @@ def evaluate(budget_allocation,
- lambda a: np.percentile(a, 95)
- lambda a: np.mean(a**2)
@param budget_allocation: (array of floats) expected number of tests per
@param aggregation: any function converting an array of infection bools into an aggregated "damage"
@param parallel: (bool) whether the simulation runs are computed in parallel. Default is False.
@param budget_allocation: (array of floats) expected number of tests per
year, indexed by node
@param n_simulations: (optional int) number of epidemic simulation runs the
evaluation should be based on (default: 1)
@@ -168,15 +196,13 @@
    assert budget_allocation.size == model.daily_test_probabilities.size

    model.daily_test_probabilities = budget_allocation / 365

    n_infected_when_stopped = np.zeros(n_simulations)
    for sim in range(n_simulations):
        model.reset()
        # run until detection:
        model.run()
        # store simulation result:
        n_infected_when_stopped[sim] = (capacities * model.is_infected[model.t,:]).sum()

    if parallel:
        with Pool(cpu_count()) as pool:
            results = pool.map(partial(task, aggregation=aggregation), range(n_simulations))
    else:
        results = [task(sim, aggregation) for sim in range(n_simulations)]

    # return the requested statistic:
    return statistic(n_infected_when_stopped)
    return statistic(np.array(results))
# Note: other indicators are available via target_function.model
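
A minimal usage sketch of the extended evaluate() interface (illustrative only, assuming prepare() has already been called as in the test scripts below): the default aggregation counts infected animals and the default statistic returns a (mean-square, standard-error) pair, while both can be swapped for custom callables.

# illustrative sketch only, assuming fp.prepare(...) was already called:
import numpy as np
from pyoptes.optimization.budget_allocation import target_function as fp

x = np.full(fp.get_n_inputs(), 1.0)  # one expected test per node and year

# defaults: damage = number of infected animals, statistic = (mean square, std.err.)
ms_est, ms_stderr = fp.evaluate(x, n_simulations=100, parallel=True)

# custom choices: mean number of infected farms, computed sequentially
mean_farms = fp.evaluate(x,
                         n_simulations=100,
                         aggregation=np.sum,   # is_infected bools -> number of infected farms
                         statistic=np.mean,    # array of damages -> single number
                         parallel=False)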
73 changes: 73 additions & 0 deletions src/test_parallelization.py
@@ -0,0 +1,73 @@
"""
Test comparing the performance (time and simulation output) of the target function with and without parallelization.
"""

from time import time
import numpy as np
import pylab as plt
from pyoptes import set_seed
from pyoptes.optimization.budget_allocation import target_function as fp
from tqdm import tqdm

set_seed(2)

n_simulations = 10000
n_nodes = 120

fp.prepare(
    n_nodes=n_nodes, # instead of 60000, since this should suffice in the beginning
    capacity_distribution=np.random.lognormal, # this is more realistic than a uniform distribution
    delta_t_symptoms=60, # instead of 30, since this gave a clearer picture in Sara's simulations
    parallel=True)

total_budget = 1.0 * n_nodes

# init test strategy
weights = np.random.rand(n_nodes)
shares = weights / weights.sum()
x = shares * total_budget

a = time()
# y = np.mean([fp.evaluate(x, n_simulations=n_simulations, parallel=False) for _ in range(100)])
y = fp.evaluate(x, n_simulations=n_simulations, parallel=False)
b = time()-a
print('Non-parallel simulation')
print(f'Time for {n_simulations} simulations of: {b}')
print(f'y: {y}')


a = time()
# y = np.mean([fp.evaluate(x, n_simulations=n_simulations) for _ in range(100)])
y = fp.evaluate(x, n_simulations=n_simulations, parallel=True)
b = time()-a
print('Parallel simulation')
print(f'Time for {n_simulations} simulations of: {b}')
print(f'y: {y}')

n = [1, 100, 1000, 10000, 100000, 1000000]

tl = []
yl = []

tlp = []
ylp = []
for s in tqdm(n):
    a = time()
    yl.append(fp.evaluate(x, n_simulations=s, parallel=False))
    tl.append(time() - a)

    a = time()
    ylp.append(fp.evaluate(x, n_simulations=s, parallel=True))
    tlp.append(time() - a)

plt.plot(n, yl, label='y non-parallel')
plt.plot(n, ylp, label='y parallel')
plt.title('Simulation output for 1, 100, 1000, 10000, 100000, 1000000 evaluations')
plt.legend()
plt.show()

plt.plot(n, tl, label='time non-parallel')
plt.plot(n, tlp, label='time parallel')
plt.title('Evaluation time for 1, 100, 1000, 10000, 100000, 1000000 evaluations')
plt.legend()
plt.show()
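
One caveat when running a benchmark like this with parallel=True: multiprocessing starts worker processes, and on platforms using the 'spawn' start method (Windows, recent macOS) the importing module is re-executed in every worker. A hedged sketch of the same benchmark wrapped in a main guard to avoid that; the helper name run_benchmark is illustrative and not part of the repository:

# illustrative sketch: guard the benchmark so 'spawn'-based workers can import the module safely
from time import time

import numpy as np

from pyoptes import set_seed
from pyoptes.optimization.budget_allocation import target_function as fp


def run_benchmark(n_simulations=10000, n_nodes=120):
    set_seed(2)
    fp.prepare(n_nodes=n_nodes,
               capacity_distribution=np.random.lognormal,
               delta_t_symptoms=60)
    weights = np.random.rand(n_nodes)
    x = weights / weights.sum() * n_nodes  # total budget: one test per node and year

    start = time()
    y = fp.evaluate(x, n_simulations=n_simulations, parallel=True)
    print(f'{n_simulations} parallel simulations took {time() - start:.1f}s, y = {y}')


if __name__ == '__main__':
    run_benchmark()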
59 changes: 31 additions & 28 deletions src/test_target_function_waxman.py
@@ -8,6 +8,8 @@
import numpy as np
from scipy.stats import gaussian_kde as kde
import pylab as plt
from progressbar import progressbar

from pyoptes import set_seed
from pyoptes.optimization.budget_allocation import target_function as f

@@ -28,7 +30,8 @@
f.prepare(
    static_network=static_network,
    capacity_distribution=np.random.lognormal, # this is more realistic than a uniform distribution
    delta_t_symptoms=60
    delta_t_symptoms=60,
    # parallel=True
)
n_inputs = f.get_n_inputs()
print("n_inputs (=number of network nodes):", n_inputs)
@@ -47,32 +50,36 @@
#plt.show()

# evaluate f once at that input:
y = f.evaluate(
    x,
    n_simulations=100,
    statistic=lambda a: np.mean(a**2) # to focus on the tail of the distribution
)
ms_est, ms_stderr = f.evaluate(x, n_simulations=100)

rms_est = np.sqrt(ms_est)
rms_stderr = ms_stderr / (2 * np.sqrt(ms_est)) # std.err. of the square root of ms_est = std.err. of ms_est * derivative of the square root function

print("\nOne evaluation at random x:", rms_est, rms_stderr)

print("\nOne evaluation at random x:", y)
# for Malte:

def est_prob_and_stderr(is_infected):
    ps = np.mean(is_infected, axis=0)
    stderrs = np.sqrt(ps * (1-ps) / is_infected.shape[0])
    return (ps, stderrs)

# use a "non-aggregating" aggregation function plus the above statistic, to get node-specific infection probabilities:
ps, stderrs = f.evaluate(x, n_simulations=100, aggregation=lambda a: a, statistic=est_prob_and_stderr)

print("node infection probabilities:", ps)
print("corresponding std.errs.: ", stderrs)

evaluation_parms = {
    'n_simulations': 100,
    'statistic': np.mean #lambda a: np.percentile(a, 95)
    'n_simulations': 100
}

n_trials = 1000

def stderr(a):
    return np.std(a, ddof=1) / np.sqrt(np.size(a))


# evaluate f a number of times at the same input:
ys = np.array([f.evaluate(x, **evaluation_parms) for it in range(n_trials)])
logys = np.log(ys+1e-10)
print("Mean and std.err. of", n_trials, "evaluations at the same random x:", ys.mean(), stderr(ys))
print("Mean and std.err. of the log of", n_trials, "evaluations at that x:", logys.mean(), stderr(logys))

ys = np.array([np.sqrt(f.evaluate(x, **evaluation_parms)[0]) for it in progressbar(range(n_trials))])
logys = np.log(ys)
print("Mean and std.dev. of", n_trials, "evaluations at the same random x:", ys.mean(), ys.std())

# do the same for an x that is based on the total capacity of a node:

@@ -85,11 +92,9 @@ def stderr(a):
nx.draw(waxman, node_color=[[0,0,0,xi/x2max] for xi in x2], pos=pos)
#plt.show()

ys2 = np.array([f.evaluate(x2, **evaluation_parms) for it in range(n_trials)])
logys2 = np.log(ys2+1e-10)
print("\nMean and std.err. of", n_trials, "evaluations at a capacity-based x:", ys2.mean(), stderr(ys2))
print("Mean and std.err. of the log of", n_trials, "evaluations at that x:", logys2.mean(), stderr(logys2))

ys2 = np.array([np.sqrt(f.evaluate(x2, **evaluation_parms)[0]) for it in range(n_trials)])
logys2 = np.log(ys2)
print("Mean and std.dev. of", n_trials, "evaluations at a capacity-based x:", ys2.mean(), ys2.std())

# do the same for an x that is based on the total number of incoming transmissions per node:

@@ -106,11 +111,9 @@ def stderr(a):
nx.draw(waxman, node_color=[[0,0,0,xi/x3max] for xi in x3], pos=pos)
#plt.show()

ys3 = np.array([f.evaluate(x3, **evaluation_parms) for it in range(n_trials)])
logys3 = np.log(ys3+1e-10)
print("\nMean and std.err. of", n_trials, "evaluations at a transmissions-based x:", ys3.mean(), stderr(ys3))
print("Mean and std.err. of the log of", n_trials, "evaluations at that x:", logys3.mean(), stderr(logys3))

ys3 = np.array([np.sqrt(f.evaluate(x3, **evaluation_parms)[0]) for it in range(n_trials)])
logys3 = np.log(ys3)
print("Mean and std.dev. of", n_trials, "evaluations at a transmissions-based x:", ys3.mean(), ys3.std())

xs = np.linspace(ys3.min(), ys.max())
plt.figure()
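
The error propagation used in the script above, rms_stderr = ms_stderr / (2 * sqrt(ms_est)), is the first-order delta method (std.err. of g(X) is roughly |g'(X)| * std.err. of X, with g = sqrt). A small self-contained sketch of that conversion; the helper name sqrt_with_stderr is hypothetical and not part of the repository:

import numpy as np

def sqrt_with_stderr(ms_est, ms_stderr):
    # first-order delta method with g = sqrt:
    # std.err.(sqrt(ms_est)) ~= ms_stderr * |d/dm sqrt(m)| = ms_stderr / (2 * sqrt(ms_est))
    rms_est = np.sqrt(ms_est)
    rms_stderr = ms_stderr / (2.0 * rms_est)
    return rms_est, rms_stderr

# e.g. a mean-square estimate of 1.0e6 animals^2 with std.err. 5.0e4
# gives rms = 1000 animals with std.err. = 25 animals:
print(sqrt_with_stderr(1.0e6, 5.0e4))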
