Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel_find : ... -> (unit -> 'a option) -> 'a option #90

Merged
merged 1 commit into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Next release

* `parallel_find` function that stops early (#129, #130)

## v0.5.0

This release includes:
Expand Down
32 changes: 32 additions & 0 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,35 @@ let parallel_scan pool op elements =

prefix_s
end

(* Parallel search over the inclusive index range [start, finish].
   The range is split recursively in halves; each leaf is scanned
   sequentially, and every worker gives up as soon as it observes that a
   result has been published. Returns the published result, if any. *)
let parallel_find (type a) ?(chunk_size=0) ~start ~finish ~body pool =
  let pd = get_pool_data pool in
  (* Slot shared by all workers: holds a result once some call to [body]
     has produced one. *)
  let result : a option Atomic.t = Atomic.make None in
  let is_done () = Option.is_some (Atomic.get result) in
  (* Leaf size: the caller's value when positive, otherwise derived from
     the range size and the number of domains taking part. *)
  let grain =
    if chunk_size > 0 then chunk_size
    else
      let workers = Array.length pd.domains + 1 in
      let total = finish - start + 1 in
      if workers = 1 then total else max 1 (total / (8 * workers))
  in
  (* Sequential scan of [i .. hi], abandoned once a result is published. *)
  let rec scan f i hi =
    if i <= hi && not (is_done ()) then begin
      (match f i with
       | Some _ as hit -> Atomic.set result hit
       | None -> ());
      scan f (i + 1) hi
    end
  in
  let rec search pool f lo hi =
    if hi - lo < grain then scan f lo hi
    else if not (is_done ()) then begin
      (* Spawn the lower half as a task, handle the upper half here,
         then wait for the spawned task. *)
      let mid = lo + ((hi - lo) / 2) in
      let lower = async pool (fun _ -> search pool f lo mid) in
      search pool f (mid + 1) hi;
      await pool lower
    end
  in
  search pool body start finish;
  Atomic.get result
17 changes: 17 additions & 0 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,20 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array

Must be called with a call to {!run} in the dynamic scope to handle the
internal algebraic effects for task synchronization. *)

val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> 'a option) -> pool -> 'a option
(** [parallel_find ~start ~finish ~body pool] calls [body] in parallel
on the indices from [start] to [finish] (both inclusive), in any order,
until at least one of them returns [Some v].

Returns [Some v] for a value [v] produced by one of the calls to
[body], or [None] if every call returned [None]. When several indices
yield a result, which one is returned is unspecified. If
[finish < start], [body] is never called and the result is [None].

The search stops once a value is found, but there is no guarantee
that it stops as early as possible: other calls to [body] may happen
in parallel or afterwards.

See {!parallel_for} for the description of the [chunk_size]
parameter and the scheduling strategy.

Must be called with a call to {!run} in the dynamic scope to
handle the internal algebraic effects for task synchronization.
*)
6 changes: 6 additions & 0 deletions test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
(modules test_task)
(modes native))

; Test for Task.parallel_find: built from the test_parallel_find module,
; linked against domainslib, native mode only.
(test
(name test_parallel_find)
(libraries domainslib)
(modules test_parallel_find)
(modes native))

(test
(name test_deadlock)
(libraries domainslib)
Expand Down
41 changes: 41 additions & 0 deletions test/test_parallel_find.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
(* Size of the haystack and number of needles hidden in it. *)
let len = 1_000_000
let nb_needles = 4

(* Fixed seed so that every run draws the same needle positions. *)
let () = Random.init 42

(* Positions of the needles, drawn uniformly in [0, len). *)
let needles = Array.init nb_needles (fun _ -> Random.int len)

(* The haystack: [true] exactly at the needle positions. *)
let input =
  let haystack = Array.make len false in
  Array.iter (fun pos -> haystack.(pos) <- true) needles;
  haystack

open Domainslib

(* Searches [input] in parallel and returns the index of some cell holding
   [true], if the search finds one. *)
let search_needle pool ~chunk_size =
  let probe i = if input.(i) then Some i else None in
  Task.parallel_find ~chunk_size ~start:0 ~finish:(len - 1) ~body:probe pool

(* Runs one search with the given [chunk_size] and checks that it reports
   an index that is one of the planted needles. *)
let test_search pool ~chunk_size =
  match search_needle pool ~chunk_size with
  | Some found -> assert (Array.mem found needles)
  | None -> assert false

let () =
  (* The pool spawns [extra_domains] *new* domains; the current domain
     performs computations as well, hence the [+ 1] in the message. *)
  let extra_domains = Domain.recommended_domain_count () - 1 in
  Printf.eprintf "test_parallel_find on %d domains.\n" (extra_domains + 1);
  let pool = Task.setup_pool ~num_domains:extra_domains ~name:"pool" () in
  (* Exercise the default chunk size (0) and a few explicit ones. *)
  Task.run pool (fun () ->
    List.iter
      (fun chunk_size -> test_search pool ~chunk_size)
      [0; 16; 32; 1000]);
  Task.teardown_pool pool;
  prerr_endline "Success."