diff --git a/CHANGES.md b/CHANGES.md index 58a5256..9ff14ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,7 @@ +## Next release + +* `parallel_find` function that stops early (#129, #130) + ## v0.5.0 This release includes: diff --git a/lib/task.ml b/lib/task.ml index 3599925..5e014dd 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -250,3 +250,35 @@ let parallel_scan pool op elements = prefix_s end + (* [parallel_find] looks for an index in [start..finish] on which [body] returns [Some _]. [work] halves the range recursively, handing the left half to [async], until [e - s < chunk_size]; such ranges are scanned sequentially. The result is the final content of [found]. *) +let parallel_find (type a) ?(chunk_size=0) ~start ~finish ~body pool = + let pd = get_pool_data pool in + let found : a option Atomic.t = Atomic.make None in (* shared result cell, written by any task whose call to [body] returns [Some _] *) + let chunk_size = if chunk_size > 0 then chunk_size (* a positive [chunk_size] is used as given *) + else begin + let n_domains = (Array.length pd.domains) + 1 in + let n_tasks = finish - start + 1 in + if n_domains = 1 then n_tasks + else max 1 (n_tasks/(8*n_domains)) (* default when [chunk_size] is not positive: the whole range if there is a single domain, otherwise sized for about 8 chunks per domain, and at least 1 *) + end + in + let rec work pool fn s e = + if e - s < chunk_size then + let i = ref s in + while !i <= e && Option.is_none (Atomic.get found) do (* sequential scan; [found] is re-read before every call to [fn], so the scan stops once any task has stored a result *) + begin match fn !i with + | None -> () + | Some _ as some -> Atomic.set found some + end; + incr i; + done + else if Option.is_some (Atomic.get found) then () (* a result is already known: do not split further *) + else begin + let d = s + ((e - s) / 2) in + let left = async pool (fun _ -> work pool fn s d) in (* left half is handed to [async], right half is searched by the current call *) + work pool fn (d+1) e; + await pool left (* wait for the left half before returning *) + end + in + work pool body start finish; + Atomic.get found diff --git a/lib/task.mli b/lib/task.mli index 16baeac..1efcb37 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -87,3 +87,20 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array Must be called with a call to {!run} in the dynamic scope to handle the internal algebraic effects for task synchronization. *) + +val parallel_find : ?chunk_size:int -> start:int -> finish:int -> + body:(int -> 'a option) -> pool -> 'a option +(** [parallel_find ~start ~finish ~body pool] calls [body] in parallel + on the indices from [start] to [finish], in any order, until at + least one of them returns [Some v]. The result is one of the [Some v] values returned by [body], or [None] if no call returned [Some _]. 
+ + Search stops when a value is found, but there is no guarantee that + it stops as early as possible: other calls to [body] may happen in + parallel or afterwards. + + See {!parallel_for} for the description of the [chunk_size] + parameter and the scheduling strategy. + + Must be called with a call to {!run} in the dynamic scope to + handle the internal algebraic effects for task synchronization. +*) diff --git a/test/dune b/test/dune index 346e8e1..7df454b 100644 --- a/test/dune +++ b/test/dune @@ -81,6 +81,12 @@ (modules test_task) (modes native)) +(test + (name test_parallel_find) + (libraries domainslib) + (modules test_parallel_find) + (modes native)) + (test (name test_deadlock) (libraries domainslib) diff --git a/test/test_parallel_find.ml b/test/test_parallel_find.ml new file mode 100644 index 0000000..d3c7041 --- /dev/null +++ b/test/test_parallel_find.ml @@ -0,0 +1,41 @@ +let len = 1_000_000 +let nb_needles = 4 + +let () = Random.init 42 (* fixed seed, so the needle positions are the same on every run *) + +let needles = + Array.init nb_needles (fun _ -> Random.int len) + +let input = + let t = Array.make len false in + needles |> Array.iter (fun needle -> + t.(needle) <- true + ); + t + +open Domainslib + +let search_needle pool ~chunk_size = + Task.parallel_find pool ~chunk_size ~start:0 ~finish:(len - 1) ~body:(fun i -> + if input.(i) then Some i + else None + ) + +let test_search pool ~chunk_size = + match search_needle pool ~chunk_size with + | None -> assert false (* [input] contains at least one [true] cell, so a needle must be found *) + | Some needle -> + assert (Array.exists ((=) needle) needles) (* any of the needles is an acceptable answer *) + +let () = + (* [num_domains] is the number of *new* domains spawned by the pool + performing computations in addition to the current domain. 
*) + let num_domains = Domain.recommended_domain_count () - 1 in + Printf.eprintf "test_parallel_find on %d domains.\n" (num_domains + 1); + let pool = Task.setup_pool ~num_domains ~name:"pool" () in + Task.run pool begin fun () -> + [0; 16; 32; 1000] |> List.iter (fun chunk_size -> (* 0 makes [parallel_find] pick its default chunk size *) + test_search pool ~chunk_size) + end; + Task.teardown_pool pool; + prerr_endline "Success.";