Skip to content

Commit

Permalink
IPF performance (#542)
Browse files Browse the repository at this point in the history
* Updates pyarrow version

* IPF benchmarking

* embed ipynb

---------

Co-authored-by: pveigadecamargo <[email protected]>
Co-authored-by: Renata Imai <[email protected]>
  • Loading branch information
3 people authored Jun 21, 2024
1 parent 68a74fd commit a7339be
Show file tree
Hide file tree
Showing 10 changed files with 1,193 additions and 141 deletions.
1 change: 1 addition & 0 deletions .github/workflows/documentation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
- name: Build documentation
run: |
jupyter nbconvert --to rst docs/source/validation_benchmarking/IPF_benchmark.ipynb
sphinx-build -M latexpdf docs/source docs/source/_static
sphinx-build -b html docs/source docs/build
python3 -m zipfile -c AequilibraE.zip docs/build
Expand Down
144 changes: 131 additions & 13 deletions aequilibrae/distribution/ipf_core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ from cython.parallel import parallel, prange
from libc.stdlib cimport malloc, free



def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations=200, tolerance=0.001, cores = 0):
def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations=200, tolerance=0.001, cores = 0, warn=True):
assert target_attractions.shape[0] == seed_matrix.shape[1]
assert target_productions.shape[0] == seed_matrix.shape[0]

cdef int max_iter = max_iterations
cdef double toler = tolerance
cdef int cpus = cores

assert target_attractions.shape[0] == seed_matrix.shape[1]
assert target_productions.shape[0] == seed_matrix.shape[0]
cdef int cpus = 0

if cores < 0:
cpus = max(1, mp.cpu_count() + cores)
Expand All @@ -24,11 +22,11 @@ def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations
elif cores > 0:
cpus = min(mp.cpu_count(), cores)

mat_prod_tot = np.zeros_like(target_productions)
factor_prod = np.zeros_like(target_productions)
mat_prod_tot = np.zeros_like(target_productions, np.float64)
factor_prod = np.zeros_like(target_productions, np.float64)

mat_attr_tot = np.zeros_like(target_attractions)
factor_attr = np.zeros_like(target_attractions)
mat_attr_tot = np.zeros_like(target_attractions, np.float64)
factor_attr = np.zeros_like(target_attractions, np.float64)

cdef double [:] prod_tot = mat_prod_tot
cdef double [:] prod_tgt = target_productions
Expand All @@ -37,13 +35,28 @@ def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations
cdef double [:] attr_tot = mat_attr_tot
cdef double [:] attr_tgt = target_attractions
cdef double [:] attr_factor = factor_attr
cdef double [:, :] flows = seed_matrix

iter, err = _fratar(flows, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)
if err > tolerance:
if seed_matrix.dtype == np.float32:
iter, err = _ipf_core32(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)
else:
iter, err = _ipf_core64(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)

if err > tolerance and warn:
warnings.warn(f"Could not reach convergence in {iter} iterations: {err}")
return iter, err

cdef _ipf_core64(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus):

cdef double [:, :] flows = seed_matrix
return _fratar(flows, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)


cdef _ipf_core32(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus):

cdef float [:, :] flows_single = seed_matrix
return _fratar_sing(flows_single, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
Expand Down Expand Up @@ -192,3 +205,108 @@ cpdef double _calc_err(double[:] p_factor,
err = err if err > 1 / a_factor[j] else 1 / a_factor[j]
err = err if err > a_factor[j] else a_factor[j]
return err


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cdef _fratar_sing(float[:, :] flows,
double[:] prod_tot,
double[:] prod_tgt,
double[:] prod_factor,
double[:] attr_tot,
double[:] attr_tgt,
double[:] attr_factor,
int max_iter,
double toler,
int cpus) noexcept:

cdef double err = 1.0
cdef int iter = 0
cdef long long i, j
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# we zero everyone that needs to be zero to be able to skip them in the future
for i in prange(I, nogil=True):
for j in range(J):
if prod_tgt[i] + attr_tgt[j] == 0:
flows[i, j] = 0

for iter in range(max_iter):
_total_prods_sing(flows, prod_tgt, prod_tot, cpus)
err = _factors(prod_tgt, prod_tot, prod_factor, cpus)
for i in prange(I, nogil=True, num_threads=cpus):
if prod_tgt[i] > 0:
for j in range(J):
flows[i, j] = flows[i, j] * prod_factor[i]


_total_attra_sing(flows, prod_tgt, attr_tot, cpus)
err = _factors(attr_tgt, attr_tot, attr_factor, cpus)
for i in prange(I, nogil=True, num_threads=cpus):
if prod_tgt[i] > 0:
for j in range(J):
flows[i, j] = flows[i, j] * attr_factor[j]

# FUNCTION TO COMPUTE THE ERROR
err = _calc_err(prod_factor, attr_factor)
if (err - 1) < toler:
break

return iter, err - 1


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_attra_sing(float[:, :] flows,
double[:] prod_tgt,
double[:] attr_tot,
int cpus) noexcept:

cdef long long i, j, jk
cdef double *local_buf
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# Computes factors
with nogil, parallel( num_threads=cpus):
local_buf = <double *> malloc(sizeof(double) * J)
for j in range(J):
local_buf[j] = 0
attr_tot[j] = 0

for i in prange(I):
if prod_tgt[i] == 0:
continue
for jk in range(J):
# for jk in array_of_indices_of_non_zeros:
local_buf[jk] += flows[i, jk]

with gil:
for j in range(J):
attr_tot[j] += local_buf[j]

free(local_buf)


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_prods_sing(float[:, :] flows,
double[:] prod_tgt,
double[:] prod_tot,
int cpus) noexcept nogil:

cdef long long i, j
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# Calculate the row totals (prods) from the flows
for i in prange(I, num_threads=cpus):
prod_tot[i] = 0
if prod_tgt[i] == 0:
continue
for j in range(J):
prod_tot[i] += flows[i, j]
3 changes: 2 additions & 1 deletion docs/requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ keplergl
sphinx-gallery
sphinx-copybutton
sphinx-design
sphinx-git
sphinx-git
nbconvert
Binary file added docs/source/images/ipf_runtime_32vs64bits.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/source/images/ipf_runtime_aequilibrae_vs_benchmark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/source/images/ipf_runtime_vs_num_cores.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/source/validation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ of Traffic Assignment, linked below.
:maxdepth: 1

validation_benchmarking/traffic_assignment
validation_benchmarking/ipf_performance
validation_benchmarking/IPF_benchmark
Loading

0 comments on commit a7339be

Please sign in to comment.