From c8f3dc4caf6dc2e70b32c039d4233d9cc1e10122 Mon Sep 17 00:00:00 2001 From: Tom Kelly Date: Fri, 21 Aug 2020 09:57:28 +0100 Subject: [PATCH] mirror the divide-conquer algorithm used for parallel_for with parallel_for_reduce --- lib/task.ml | 38 +++++++++++++++++++++----------------- lib/task.mli | 8 ++++---- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index aa72580..d8a7d24 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -57,27 +57,31 @@ let teardown_pool pool = done; Array.iter Domain.join pool.domains -let parallel_for_reduce pool reduce_fun init ~chunk_size ~start ~finish ~body = - assert (chunk_size > 0); - let work s e = - let rec loop i acc = - if i > e then acc - else loop (i+1) (reduce_fun acc (body i)) - in - loop s init +let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init = + let chunk_size = if chunk_size > 0 then chunk_size + else begin + let n_domains = (Array.length pool.domains) + 1 in + let n_tasks = finish - start + 1 in + if n_domains = 1 then n_tasks + else max 1 (n_tasks/(8*n_domains)) + end in - let rec loop i acc = - if i+chunk_size > finish then - let p = async pool (fun _ -> work i finish) in - p::acc + let rec work s e = + if e - s < chunk_size then + let rec loop i acc = + if i > e then acc + else loop (i+1) (reduce_fun acc (body i)) + in + loop s init else begin - let p = async pool (fun _ -> work i (i+chunk_size-1)) in - loop (i+chunk_size) (p::acc) + let d = s + ((e - s) / 2) in + let p = async pool (fun _ -> work s d) in + let right = work (d+1) e in + let left = await pool p in + reduce_fun left right end in - let ps = loop start [] in - let results = List.map (await pool) ps in - List.fold_left reduce_fun init results + work start finish let parallel_for ?(chunk_size=0) ~start ~finish ~body pool = let chunk_size = if chunk_size > 0 then chunk_size diff --git a/lib/task.mli b/lib/task.mli index 52e6a84..7e47790 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -27,7 +27,7 @@ val await : pool -> 'a promise -> 'a * be returned. If the task had raised an exception, then [await] raises the * same exception. *) -val parallel_for: ?chunk_size:int -> start:int -> finish:int -> +val parallel_for : ?chunk_size:int -> start:int -> finish:int -> body:(int -> unit) -> pool -> unit (** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but * runs the for loop in parallel. The chunk size [c] determines the number of @@ -37,9 +37,9 @@ val parallel_for: ?chunk_size:int -> start:int -> finish:int -> * scheme. *) -val parallel_for_reduce : pool -> ('a -> 'a -> 'a) -> 'a -> chunk_size:int -> - start:int -> finish:int -> body:(int -> 'a) -> 'a -(** [parallel_for_reduce p r i c s f b] is similar to [parallel_for] except +val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int -> + body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a +(** [parallel_for_reduce c s f b p r i] is similar to [parallel_for] except * that the result returned by each iteration is reduced with [r] with initial * value [i]. *)