Skip to content

Commit

Permalink
Merge pull request #18 from ocaml-multicore/ctk21/parallel_for_reduce…
Browse files Browse the repository at this point in the history
…_div_con

Use same algorithm for parallel_for_reduce as parallel_for
  • Loading branch information
kayceesrk authored Aug 21, 2020
2 parents b2da603 + c8f3dc4 commit 2ac25c7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
38 changes: 21 additions & 17 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,31 @@ let teardown_pool pool =
done;
Array.iter Domain.join pool.domains

let parallel_for_reduce pool reduce_fun init ~chunk_size ~start ~finish ~body =
assert (chunk_size > 0);
let work s e =
let rec loop i acc =
if i > e then acc
else loop (i+1) (reduce_fun acc (body i))
in
loop s init
let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
end
in
let rec loop i acc =
if i+chunk_size > finish then
let p = async pool (fun _ -> work i finish) in
p::acc
let rec work s e =
if e - s < chunk_size then
let rec loop i acc =
if i > e then acc
else loop (i+1) (reduce_fun acc (body i))
in
loop s init
else begin
let p = async pool (fun _ -> work i (i+chunk_size-1)) in
loop (i+chunk_size) (p::acc)
let d = s + ((e - s) / 2) in
let p = async pool (fun _ -> work s d) in
let right = work (d+1) e in
let left = await pool p in
reduce_fun left right
end
in
let ps = loop start [] in
let results = List.map (await pool) ps in
List.fold_left reduce_fun init results
work start finish

let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
let chunk_size = if chunk_size > 0 then chunk_size
Expand Down
8 changes: 4 additions & 4 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ val await : pool -> 'a promise -> 'a
* be returned. If the task had raised an exception, then [await] raises the
* same exception. *)

val parallel_for: ?chunk_size:int -> start:int -> finish:int ->
val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> unit) -> pool -> unit
(** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but
* runs the for loop in parallel. The chunk size [c] determines the number of
Expand All @@ -37,9 +37,9 @@ val parallel_for: ?chunk_size:int -> start:int -> finish:int ->
* scheme.
*)

val parallel_for_reduce : pool -> ('a -> 'a -> 'a) -> 'a -> chunk_size:int ->
start:int -> finish:int -> body:(int -> 'a) -> 'a
(** [parallel_for_reduce p r i c s f b] is similar to [parallel_for] except
val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a
(** [parallel_for_reduce c s f b p r i] is similar to [parallel_for] except
* that the result returned by each iteration is reduced with [r] with initial
* value [i]. *)

Expand Down

0 comments on commit 2ac25c7

Please sign in to comment.