Skip to content

Commit

Permalink
Towards #47
Browse files Browse the repository at this point in the history
Replacing all functions with purrr/furrr
  • Loading branch information
mem48 committed Sep 11, 2024
1 parent 1d82b11 commit baf7b27
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 55 deletions.
5 changes: 0 additions & 5 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,17 @@ Imports:
collapse,
data.table,
dodgr,
doSNOW,
dplyr,
digest,
foreach,
future,
furrr,
future.apply,
geodist,
httr,
iotools,
stringr,
sf,
parallel,
lubridate,
purrr (>= 1.0),
pbapply,
readr (>= 2.0),
RcppSimdJson,
tidyr,
Expand Down
23 changes: 15 additions & 8 deletions R/atoc_export.R
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,22 @@ duplicate.stop_times_alt <- function(calendar, stop_times, ncores = 1) {
}

if (ncores == 1) {
stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
#stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
stop_times.dup <- purrr::map(stop_times_split, duplicate.stop_times.int, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
stop_times.dup <- pbapply::pblapply(stop_times_split,
duplicate.stop_times.int,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# stop_times.dup <- pbapply::pblapply(stop_times_split,
# duplicate.stop_times.int,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
res <- furrr::future_map(.x = stop_times_split,
.f = duplicate.stop_times.int,
.progress = TRUE)
future::plan(future::sequential)
}

stop_times.dup <- dplyr::bind_rows(stop_times.dup)
Expand Down
9 changes: 6 additions & 3 deletions R/atoc_shapes.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ ATOC_shapes <- function(gtfs) {
}
}

dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
#dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
dp.list <- purrr::map(dp.list, path_to_sf, verts = verts, .progress = TRUE)
dp.list <- unname(dp.list)
pairs$geometry <- sf::st_sfc(dp.list, crs = 4326)
rm(dp.list, verts)
Expand All @@ -110,7 +111,8 @@ ATOC_shapes <- function(gtfs) {
}

message(paste0(Sys.time()," Invert routes"))
pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
#pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
pairs_opp$geometry <- purrr::map(pairs_opp$geometry, invert_linestring, .progress = TRUE)
pairs_opp$geometry <- sf::st_as_sfc(pairs_opp$geometry, crs = 4326)
pairs_opp <- sf::st_as_sf(pairs_opp)
pairs_opp <- pairs_opp[, names(pairs)]
Expand All @@ -135,7 +137,8 @@ ATOC_shapes <- function(gtfs) {

message(paste0(Sys.time()," final formatting"))
rm(graph, pairs)
shape_res <- pbapply::pblapply(st_split, match_lines)
#shape_res <- pbapply::pblapply(st_split, match_lines)
shape_res <- purrr::map(st_split, match_lines, .progress = TRUE)

str5 <- lapply(shape_res, `[[`, 2)
shapes <- lapply(shape_res, `[[`, 1)
Expand Down
26 changes: 17 additions & 9 deletions R/gtfs_interpolate_times.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,24 @@ gtfs_interpolate_times <- function(gtfs, ncores = 1){
stop_times <- dplyr::group_split(stop_times)

if(ncores == 1){
stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
#stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
stop_times <- purrr::map(stop_times, stops_interpolate, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
stop_times <- pbapply::pblapply(stop_times,
stops_interpolate,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
# stop_times <- pbapply::pblapply(stop_times,
# stops_interpolate,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
keep <- furrr::future_map(.x = stop_times,
.f = stops_interpolate,
.progress = TRUE)
future::plan(future::sequential)

}

stop_times <- data.table::rbindlist(stop_times)
Expand Down
7 changes: 6 additions & 1 deletion R/stops_per_week_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ gtfs_trips_per_zone <- function(gtfs,
res <- dplyr::group_by(stop_times, zone_id)
res <- dplyr::group_split(res)
future::plan(future::multisession)
res <- future.apply::future_lapply(res, internal_trips_per_zone, by_mode, days_tot)
#res <- future.apply::future_lapply(res, internal_trips_per_zone, by_mode, days_tot)
res <- furrr::future_map(.x = res,
.f = internal_trips_per_zone,
by_mode = by_mode,
days_tot = days_tot,
.progress = TRUE)
future::plan(future::sequential)


Expand Down
80 changes: 51 additions & 29 deletions R/transxchange.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,28 @@ transxchange2gtfs <- function(path_in,
} else {
message(paste0(Sys.time(), " Importing TransXchange files, multicore"))

pb <- utils::txtProgressBar(max = length(files), style = 3)
progress <- function(n) utils::setTxtProgressBar(pb, n)
opts <- list(progress = progress, preschedule = FALSE)
cl <- parallel::makeCluster(ncores)
doSNOW::registerDoSNOW(cl)
boot <- foreach::foreach(i = seq_len(length(files)), .options.snow = opts)
res_all <- foreach::`%dopar%`(boot, {
UK2GTFS:::transxchange_import_try(files[i],
try_mode = try_mode)
})
parallel::stopCluster(cl)
rm(cl, boot, opts, pb, progress)
future::plan(future::multisession, workers = ncores)
res_all <- furrr::future_map(.x = files,
.f = transxchange_import_try,
run_debug = TRUE,
full_import = FALSE,
try_mode = try_mode,
.progress = TRUE)
future::plan(future::sequential)


# pb <- utils::txtProgressBar(max = length(files), style = 3)
# progress <- function(n) utils::setTxtProgressBar(pb, n)
# opts <- list(progress = progress, preschedule = FALSE)
# cl <- parallel::makeCluster(ncores)
# doSNOW::registerDoSNOW(cl)
# boot <- foreach::foreach(i = seq_len(length(files)), .options.snow = opts)
# res_all <- foreach::`%dopar%`(boot, {
# UK2GTFS:::transxchange_import_try(files[i],
# try_mode = try_mode)
# })
# parallel::stopCluster(cl)
# rm(cl, boot, opts, pb, progress)

res_all_message <- res_all[sapply(res_all, class) == "character"]
res_all <- res_all[sapply(res_all, class) == "list"]
Expand All @@ -173,23 +183,35 @@ transxchange2gtfs <- function(path_in,
message(" ")
message(paste0(Sys.time(), " Converting to GTFS, multicore"))

pb <- utils::txtProgressBar(min = 0, max = length(res_all), style = 3)
progress <- function(n) utils::setTxtProgressBar(pb, n)
opts <- list(progress = progress, preschedule = FALSE)
cl <- parallel::makeCluster(ncores)
doSNOW::registerDoSNOW(cl)
boot <- foreach::foreach(i = seq_len(length(res_all)), .options.snow = opts)
gtfs_all <- foreach::`%dopar%`(boot, {
UK2GTFS:::transxchange_export_try(res_all[[i]],
cal = cal,
naptan = naptan_trim,
scotland = scotland,
try_mode = try_mode)
# setTxtProgressBar(pb, i)
})

parallel::stopCluster(cl)
rm(cl, boot, opts, pb, progress)
future::plan(future::multisession, workers = ncores)
gtfs_all <- furrr::future_map(.x = res_all,
.f = transxchange_export_try,
run_debug = TRUE,
cal = cal,
naptan = naptan,
scotland = scotland,
try_mode = try_mode,
.progress = TRUE)
future::plan(future::sequential)


# pb <- utils::txtProgressBar(min = 0, max = length(res_all), style = 3)
# progress <- function(n) utils::setTxtProgressBar(pb, n)
# opts <- list(progress = progress, preschedule = FALSE)
# cl <- parallel::makeCluster(ncores)
# doSNOW::registerDoSNOW(cl)
# boot <- foreach::foreach(i = seq_len(length(res_all)), .options.snow = opts)
# gtfs_all <- foreach::`%dopar%`(boot, {
# UK2GTFS:::transxchange_export_try(res_all[[i]],
# cal = cal,
# naptan = naptan_trim,
# scotland = scotland,
# try_mode = try_mode)
# # setTxtProgressBar(pb, i)
# })
#
# parallel::stopCluster(cl)
# rm(cl, boot, opts, pb, progress)
}

unlink(file.path(tempdir(), "txc"), recursive = TRUE)
Expand Down

0 comments on commit baf7b27

Please sign in to comment.