Skip to content

Commit

Permalink
Add instantaneous_domain_index
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 23, 2023
1 parent 256b202 commit 529e3e2
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 3 deletions.
6 changes: 6 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

(generate_opam_files true)

(implicit_transitive_deps false)

(source
(github ocaml-multicore/multicore-magic))

Expand All @@ -21,6 +23,10 @@
(depends
(ocaml
(>= 4.12.0))
(domain_shims
(and
(>= 0.1.0)
:with-test))
(alcotest
(and
(>= 1.7.0)
Expand Down
1 change: 1 addition & 0 deletions multicore-magic.opam
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ bug-reports: "https://github.com/ocaml-multicore/multicore-magic/issues"
depends: [
"dune" {>= "3.3"}
"ocaml" {>= "4.12.0"}
"domain_shims" {>= "0.1.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
"odoc" {>= "2.2.0" & with-doc}
]
Expand Down
1 change: 1 addition & 0 deletions src/Multicore_magic.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
include Padding
module Transparent_atomic = Transparent_atomic
include Index

let[@inline] fenceless_get (atomic : 'a Atomic.t) =
!(Sys.opaque_identity (Obj.magic atomic : 'a ref))
Expand Down
28 changes: 28 additions & 0 deletions src/Multicore_magic.mli
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,31 @@ module Transparent_atomic : sig
val incr : int t -> unit
val decr : int t -> unit
end

(** {1 Avoiding contention} *)

val instantaneous_domain_index : unit -> int
(** [instantaneous_domain_index ()] potentially (re)allocates and returns a
non-negative integer "index" for the current domain. The indices are
guaranteed to be unique among the domains that exist at a point in time.
Each call of [instantaneous_domain_index ()] may return a different index.
The intention is that the returned value can be used as an index into a
contention avoiding parallelism safe data structure. For example, a naïve
scalable increment of one counter from an array of counters could be done as
follows:
{[
let incr counters =
(* Assuming length of [counters] is a power of two and larger than
the number of domains. *)
let mask = Array.length counters - 1 in
let index = instantaneous_domain_index () in
Atomic.incr counters.(index land mask)
]}
The implementation ensures that the indices are allocated as densely as
possible at any given moment. This should allow allocating as many counters
as needed and essentially eliminate contention.
On OCaml 4 [instantaneous_domain_index ()] will always return [0]. *)
16 changes: 16 additions & 0 deletions src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@
(name Multicore_magic)
(public_name multicore-magic))

;;

(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(action
(copy index.ocaml4.ml index.ml)))

(rule
(enabled_if
(>= %{ocaml_version} 5.0.0))
(action
(copy index.ocaml5.ml index.ml)))

;;

(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
Expand Down
1 change: 1 addition & 0 deletions src/index.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let instantaneous_domain_index () = 0
159 changes: 159 additions & 0 deletions src/index.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
open struct
module Atomic = Transparent_atomic

(** We don't use the sign bit. *)
let bits_per_word = Sys.int_size - 1

let bit_index_of b =
(* As [b] contains exactly one non-zero bit, this could directly be optimized
using techniques described in
Using de Bruijn Sequences to Index a 1 in a Computer Word
by Leiserson, Prokop, and Randall. *)
let i, b =
if 32 < Sys.int_size && 1 lsl 0x20 <= b then (0x20, b lsr 0x20) else (0, b)
in
let i, b = if 1 lsl 0x10 <= b then (i + 0x10, b lsr 0x10) else (i, b) in
let i, b = if 1 lsl 0x08 <= b then (i + 0x08, b lsr 0x08) else (i, b) in
let i, b = if 1 lsl 0x04 <= b then (i + 0x04, b lsr 0x04) else (i, b) in
let i, b = if 1 lsl 0x02 <= b then (i + 0x02, b lsr 0x02) else (i, b) in
if 1 lsl 0x01 <= b then i + 0x01 else i

module Index_allocator : sig
type t

val create : unit -> t
val acquire : t -> int
val release : t -> int -> unit
end = struct
type t = int Atomic.t array Atomic.t

let create () = Atomic.make [||]

let release words bit_index =
let word_index = bit_index / bits_per_word in
let t = Atomic.get words in
let bit = 1 lsl (bit_index - (word_index * bits_per_word)) in
let word = Array.unsafe_get t word_index in
Atomic.fetch_and_add word (-bit) |> ignore

let rec acquire_rec words t i =
if i < Array.length t then
let word = Array.unsafe_get t i in
let before = Atomic.get word in
let alloc = before + 1 in
(* We don't use the sign bit. *)
if 0 < alloc then begin
let after = alloc lor before in
if Atomic.compare_and_set word before after then
(i * bits_per_word) + bit_index_of (after lxor before)
else acquire_rec words t i
end
else acquire_rec words t (i + 1)
else
let new_t =
Array.init ((Array.length t * 2) + 1) @@ fun i ->
if i < Array.length t then Array.unsafe_get t i else Atomic.make 0
in
Atomic.compare_and_set words t new_t |> ignore;
acquire words

and acquire words = acquire_rec words (Atomic.get words) 0
end

module Domain_index_allocator : sig
type t
type domain

val create : unit -> t
val set_on_first_get : t -> (t -> domain -> unit) -> unit
val new_domain : unit -> domain
val delete_domain : t -> domain -> unit
val get : t -> domain -> int
end = struct
type domain = int ref

type t = {
mutable _num_domains : int;
index_allocator : Index_allocator.t;
mutable on_first_get : t -> domain -> unit;
}

external num_domains_as_atomic : t -> int Atomic.t = "%identity"

let on_first_get _ _ = ()

let create () =
let index_allocator = Index_allocator.create () in
{ _num_domains = 0; index_allocator; on_first_get }
|> Padding.copy_as_padded

let set_on_first_get t on_first_get = t.on_first_get <- on_first_get

let unallocated_index = Int.max_int
and domain_exit_index = Int.max_int - 1

let new_domain () = ref unallocated_index |> Padding.copy_as_padded

let delete_domain t domain =
let index = !domain in
if index < domain_exit_index then begin
domain := domain_exit_index;
Index_allocator.release t.index_allocator index;
Atomic.decr (num_domains_as_atomic t)
end

let[@poll error] [@inline never] cas_domain domain before after =
!domain == before
&& begin
domain := after;
true
end

let[@inline never] rec instantaneous_domain_index t domain =
let index = !domain in
if index < Atomic.get (num_domains_as_atomic t) then index
else if index == domain_exit_index then
failwith
"Multicore_magic: instantaneous_domain_index called after domain exit"
else
let new_index = Index_allocator.acquire t.index_allocator in
if
new_index
< Atomic.get (num_domains_as_atomic t)
+ Bool.to_int (index == unallocated_index)
&& cas_domain domain index new_index
then begin
if index == unallocated_index then begin
Atomic.incr (num_domains_as_atomic t);
t.on_first_get t domain
end
else Index_allocator.release t.index_allocator index;
new_index
end
else begin
Index_allocator.release t.index_allocator new_index;
instantaneous_domain_index t domain
end

let[@inline] get t domain =
let index = !domain in
if index < Atomic.get (num_domains_as_atomic t) then index
else instantaneous_domain_index t domain
end

let key = Domain.DLS.new_key Domain_index_allocator.new_domain
let t = Domain_index_allocator.create ()

let release_index () =
let domain = Domain.DLS.get key in
Domain_index_allocator.delete_domain t domain

let () =
Domain_index_allocator.set_on_first_get t @@ fun _ _ ->
Domain.at_exit release_index
end

let instantaneous_domain_index () =
let domain = Domain.DLS.get key in
Domain_index_allocator.get t domain
72 changes: 72 additions & 0 deletions test/Multicore_magic_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,76 @@ let transparent_atomic v0 v1 v2 () =
assert (v2 = Multicore_magic.Transparent_atomic.fenceless_get x);
assert (v2 = Multicore_magic.Transparent_atomic.get x)

let test_instantaneous_domain_index () =
if Domain.recommended_domain_count () = 1 then begin
(* Probably running on OCaml 4. Almost nothing to test. *)
assert (0 = Multicore_magic.instantaneous_domain_index ())
end
else begin
let test_not_same () =
Domain.join @@ Domain.spawn
@@ fun () ->
let i0 = Multicore_magic.instantaneous_domain_index () in
let i1 =
Domain.join @@ Domain.spawn
@@ Multicore_magic.instantaneous_domain_index
in
assert (i0 != i1);
let i1' =
Domain.join @@ Domain.spawn
@@ Multicore_magic.instantaneous_domain_index
in
assert (i1 == i1')
in
test_not_same ();
let module Atomic = Multicore_magic.Transparent_atomic in
let stress () =
let n_domains = 7 in
let num_started = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let num_exited = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let failed = ref false |> Multicore_magic.copy_as_padded in

let check () =
let num_exited = Atomic.get num_exited in
let i = Multicore_magic.instantaneous_domain_index () in
let n = Atomic.get num_started - num_exited in
if i < 0 || n < i then failed := true
in

let domain () =
Atomic.incr num_started;
(* [Domain.DLS] is not thread-safe so it might be necessary to make sure
we get the index before spawning threads: *)
check ();
let threads =
Array.init (Random.int 5) @@ fun _ ->
()
|> Thread.create @@ fun () ->
for _ = 0 to Random.int 10 do
Unix.sleepf (Random.float 0.01);
check ()
done
in
Array.iter Thread.join threads;
Atomic.incr num_exited
in

let threads =
Array.init n_domains @@ fun _ ->
()
|> Thread.create @@ fun () ->
for _ = 0 to 100 do
Unix.sleepf (Random.float 0.01);
Domain.join (Domain.spawn domain)
done
in
Array.iter Thread.join threads;

assert (not !failed)
in
stress ()
end

let () =
Alcotest.run "multicore-magic"
[
Expand All @@ -106,4 +176,6 @@ let () =
[ Alcotest.test_case "" `Quick (transparent_atomic 4.2 1.01 7.6) ] );
( "transparent_atomic with ints",
[ Alcotest.test_case "" `Quick (transparent_atomic 42 101 76) ] );
( "instantaneous_domain_index",
[ Alcotest.test_case "" `Quick test_instantaneous_domain_index ] );
]
7 changes: 4 additions & 3 deletions test/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
(tests
(names Multicore_magic_test)
(libraries Multicore_magic alcotest))
(test
(name Multicore_magic_test)
(modules Multicore_magic_test)
(libraries Multicore_magic alcotest domain_shims threads.posix unix))

0 comments on commit 529e3e2

Please sign in to comment.