From 529e3e281c481d125e0cf475d26e6647486491df Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Tue, 21 Nov 2023 15:21:23 +0200 Subject: [PATCH] Add `instantaneous_domain_index` --- dune-project | 6 ++ multicore-magic.opam | 1 + src/Multicore_magic.ml | 1 + src/Multicore_magic.mli | 28 ++++++ src/dune | 16 ++++ src/index.ocaml4.ml | 1 + src/index.ocaml5.ml | 159 +++++++++++++++++++++++++++++++++++ test/Multicore_magic_test.ml | 72 ++++++++++++++++ test/dune | 7 +- 9 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 src/index.ocaml4.ml create mode 100644 src/index.ocaml5.ml diff --git a/dune-project b/dune-project index db9487f..bdfef9f 100644 --- a/dune-project +++ b/dune-project @@ -4,6 +4,8 @@ (generate_opam_files true) +(implicit_transitive_deps false) + (source (github ocaml-multicore/multicore-magic)) @@ -21,6 +23,10 @@ (depends (ocaml (>= 4.12.0)) + (domain_shims + (and + (>= 0.1.0) + :with-test)) (alcotest (and (>= 1.7.0) diff --git a/multicore-magic.opam b/multicore-magic.opam index 2386dbe..9c20da0 100644 --- a/multicore-magic.opam +++ b/multicore-magic.opam @@ -9,6 +9,7 @@ bug-reports: "https://github.com/ocaml-multicore/multicore-magic/issues" depends: [ "dune" {>= "3.3"} "ocaml" {>= "4.12.0"} + "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.7.0" & with-test} "odoc" {>= "2.2.0" & with-doc} ] diff --git a/src/Multicore_magic.ml b/src/Multicore_magic.ml index 8290619..daa313c 100644 --- a/src/Multicore_magic.ml +++ b/src/Multicore_magic.ml @@ -1,5 +1,6 @@ include Padding module Transparent_atomic = Transparent_atomic +include Index let[@inline] fenceless_get (atomic : 'a Atomic.t) = !(Sys.opaque_identity (Obj.magic atomic : 'a ref)) diff --git a/src/Multicore_magic.mli b/src/Multicore_magic.mli index 2d6b72f..ec817e9 100644 --- a/src/Multicore_magic.mli +++ b/src/Multicore_magic.mli @@ -168,3 +168,31 @@ module Transparent_atomic : sig val incr : int t -> unit val decr : int t -> unit end + +(** {1 Avoiding contention} *) + 
+val instantaneous_domain_index : unit -> int +(** [instantaneous_domain_index ()] potentially (re)allocates and returns a + non-negative integer "index" for the current domain. The indices are + guaranteed to be unique among the domains that exist at a point in time. + Each call of [instantaneous_domain_index ()] may return a different index. + + The intention is that the returned value can be used as an index into a + contention-avoiding, parallelism-safe data structure. For example, a naïve + scalable increment of one counter from an array of counters could be done as + follows: + + {[ + let incr counters = + (* Assuming length of [counters] is a power of two and larger than + the number of domains. *) + let mask = Array.length counters - 1 in + let index = instantaneous_domain_index () in + Atomic.incr counters.(index land mask) + ]} + + The implementation ensures that the indices are allocated as densely as + possible at any given moment. This should allow allocating as many counters + as needed and essentially eliminate contention. + + On OCaml 4 [instantaneous_domain_index ()] will always return [0]. *) diff --git a/src/dune b/src/dune index d70e574..e3ffb92 100644 --- a/src/dune +++ b/src/dune @@ -2,6 +2,22 @@ (name Multicore_magic) (public_name multicore-magic)) +;; + +(rule + (enabled_if + (< %{ocaml_version} 5.0.0)) + (action + (copy index.ocaml4.ml index.ml))) + +(rule + (enabled_if + (>= %{ocaml_version} 5.0.0)) + (action + (copy index.ocaml5.ml index.ml))) + +;; + (rule (enabled_if (< %{ocaml_version} 5.0.0)) diff --git a/src/index.ocaml4.ml b/src/index.ocaml4.ml new file mode 100644 index 0000000..d63761d --- /dev/null +++ b/src/index.ocaml4.ml @@ -0,0 +1 @@ +let instantaneous_domain_index () = 0 diff --git a/src/index.ocaml5.ml b/src/index.ocaml5.ml new file mode 100644 index 0000000..d067ff4 --- /dev/null +++ b/src/index.ocaml5.ml @@ -0,0 +1,159 @@ +open struct + module Atomic = Transparent_atomic + + (** We don't use the sign bit. 
*) + let bits_per_word = Sys.int_size - 1 (* number of indices tracked by each bitmap word *) + + let bit_index_of b = (* Returns the zero-based position of the highest set bit of [b] by testing the shift widths 32, 16, 8, 4, 2 and 1 in turn. *) + (* As [b] contains exactly one non-zero bit, this could directly be optimized + using techniques described in + + Using de Bruijn Sequences to Index a 1 in a Computer Word + by Leiserson, Prokop, and Randall. *) + let i, b = + if 32 < Sys.int_size && 1 lsl 0x20 <= b then (0x20, b lsr 0x20) else (0, b) + in + let i, b = if 1 lsl 0x10 <= b then (i + 0x10, b lsr 0x10) else (i, b) in + let i, b = if 1 lsl 0x08 <= b then (i + 0x08, b lsr 0x08) else (i, b) in + let i, b = if 1 lsl 0x04 <= b then (i + 0x04, b lsr 0x04) else (i, b) in + let i, b = if 1 lsl 0x02 <= b then (i + 0x02, b lsr 0x02) else (i, b) in + if 1 lsl 0x01 <= b then i + 0x01 else i + + module Index_allocator : sig + type t + + val create : unit -> t + val acquire : t -> int + val release : t -> int -> unit + end = struct + type t = int Atomic.t array Atomic.t (* growable array of bitmap words; a set bit marks an index as allocated *) + + let create () = Atomic.make [||] + + let release words bit_index = (* Clears the bit of [bit_index] by subtracting it, so the bit must currently be set. *) + let word_index = bit_index / bits_per_word in + let t = Atomic.get words in + let bit = 1 lsl (bit_index - (word_index * bits_per_word)) in + let word = Array.unsafe_get t word_index in + Atomic.fetch_and_add word (-bit) |> ignore + + let rec acquire_rec words t i = (* Scans the words of the snapshot [t] from position [i] for a clear bit to claim. *) + if i < Array.length t then + let word = Array.unsafe_get t i in + let before = Atomic.get word in + let alloc = before + 1 in (* the carry stops at the lowest clear bit of [before] *) + (* We don't use the sign bit. 
*) + if 0 < alloc then begin + let after = alloc lor before in (* [before] with its lowest clear bit set *) + if Atomic.compare_and_set word before after then + (i * bits_per_word) + bit_index_of (after lxor before) + else acquire_rec words t i + end + else acquire_rec words t (i + 1) + else (* Every word of [t] is full: try to publish a larger array that shares the existing words, then rescan whether or not the CAS won. *) + let new_t = + Array.init ((Array.length t * 2) + 1) @@ fun i -> + if i < Array.length t then Array.unsafe_get t i else Atomic.make 0 + in + Atomic.compare_and_set words t new_t |> ignore; + acquire words + + and acquire words = acquire_rec words (Atomic.get words) 0 + end + + module Domain_index_allocator : sig + type t + type domain + + val create : unit -> t + val set_on_first_get : t -> (t -> domain -> unit) -> unit + val new_domain : unit -> domain + val delete_domain : t -> domain -> unit + val get : t -> domain -> int + end = struct + type domain = int ref + + type t = { + mutable _num_domains : int; + index_allocator : Index_allocator.t; + mutable on_first_get : t -> domain -> unit; + } + + external num_domains_as_atomic : t -> int Atomic.t = "%identity" (* NOTE(review): views the record itself as an [int Atomic.t] so that [_num_domains] can be accessed atomically; presumably relies on it being the first field - confirm. *) + + let on_first_get _ _ = () + + let create () = + let index_allocator = Index_allocator.create () in + { _num_domains = 0; index_allocator; on_first_get } + |> Padding.copy_as_padded + + let set_on_first_get t on_first_get = t.on_first_get <- on_first_get + + let unallocated_index = Int.max_int + and domain_exit_index = Int.max_int - 1 (* sentinels held in a domain cell: no index acquired yet / index released at domain exit *) + + let new_domain () = ref unallocated_index |> Padding.copy_as_padded + + let delete_domain t domain = + let index = !domain in + if index < domain_exit_index then begin + domain := domain_exit_index; + Index_allocator.release t.index_allocator index; + Atomic.decr (num_domains_as_atomic t) + end + + let[@poll error] [@inline never] cas_domain domain before after = (* Plain compare-and-set of the domain cell. NOTE(review): [@poll error] presumably keeps the body free of poll points so that threads of the same domain cannot interleave - confirm. *) + !domain == before + && begin + domain := after; + true + end + + let[@inline never] rec instantaneous_domain_index t domain = (* Slow path of [get]: acquires a fresh index and retries until it is accepted. *) + let index = !domain in + if index < Atomic.get (num_domains_as_atomic t) then index + else if index == domain_exit_index then + failwith 
+ "Multicore_magic: instantaneous_domain_index called after domain exit" + else + let new_index = Index_allocator.acquire t.index_allocator in (* kept only if it is below the domain count, which is taken one higher when this domain has no index yet *) + if + new_index + < Atomic.get (num_domains_as_atomic t) + + Bool.to_int (index == unallocated_index) + && cas_domain domain index new_index + then begin + if index == unallocated_index then begin + Atomic.incr (num_domains_as_atomic t); + t.on_first_get t domain + end + else Index_allocator.release t.index_allocator index; + new_index + end + else begin + Index_allocator.release t.index_allocator new_index; + instantaneous_domain_index t domain + end + + let[@inline] get t domain = (* Fast path: the cached index is reused while it stays below the current domain count. *) + let index = !domain in + if index < Atomic.get (num_domains_as_atomic t) then index + else instantaneous_domain_index t domain + end + + let key = Domain.DLS.new_key Domain_index_allocator.new_domain + let t = Domain_index_allocator.create () + + let release_index () = + let domain = Domain.DLS.get key in + Domain_index_allocator.delete_domain t domain + + let () = (* When a domain first obtains an index, register an at-exit hook that releases it. *) + Domain_index_allocator.set_on_first_get t @@ fun _ _ -> + Domain.at_exit release_index +end + +let instantaneous_domain_index () = + let domain = Domain.DLS.get key in + Domain_index_allocator.get t domain diff --git a/test/Multicore_magic_test.ml b/test/Multicore_magic_test.ml index 4ea9b39..4b8ef6e 100644 --- a/test/Multicore_magic_test.ml +++ b/test/Multicore_magic_test.ml @@ -82,6 +82,76 @@ let transparent_atomic v0 v1 v2 () = assert (v2 = Multicore_magic.Transparent_atomic.fenceless_get x); assert (v2 = Multicore_magic.Transparent_atomic.get x) +let test_instantaneous_domain_index () = + if Domain.recommended_domain_count () = 1 then begin + (* Probably running on OCaml 4. Almost nothing to test. 
*) + assert (0 = Multicore_magic.instantaneous_domain_index ()) + end + else begin + let test_not_same () = (* A child domain gets an index different from its live parent; a second child spawned after the first was joined gets the first child's index again. *) + Domain.join @@ Domain.spawn + @@ fun () -> + let i0 = Multicore_magic.instantaneous_domain_index () in + let i1 = + Domain.join @@ Domain.spawn + @@ Multicore_magic.instantaneous_domain_index + in + assert (i0 != i1); + let i1' = + Domain.join @@ Domain.spawn + @@ Multicore_magic.instantaneous_domain_index + in + assert (i1 == i1') + in + test_not_same (); + let module Atomic = Multicore_magic.Transparent_atomic in + let stress () = + let n_domains = 7 in + let num_started = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let num_exited = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let failed = ref false |> Multicore_magic.copy_as_padded in + + let check () = (* Flags a failure unless the index lies in [0, n], where [n] counts the test domains that have started but not yet exited. *) + let num_exited = Atomic.get num_exited in + let i = Multicore_magic.instantaneous_domain_index () in + let n = Atomic.get num_started - num_exited in + if i < 0 || n < i then failed := true + in + + let domain () = + Atomic.incr num_started; + (* [Domain.DLS] is not thread-safe so it might be necessary to make sure + we get the index before spawning threads: *) + check (); + let threads = + Array.init (Random.int 5) @@ fun _ -> + () + |> Thread.create @@ fun () -> + for _ = 0 to Random.int 10 do + Unix.sleepf (Random.float 0.01); + check () + done + in + Array.iter Thread.join threads; + Atomic.incr num_exited + in + + let threads = + Array.init n_domains @@ fun _ -> + () + |> Thread.create @@ fun () -> + for _ = 0 to 100 do + Unix.sleepf (Random.float 0.01); + Domain.join (Domain.spawn domain) + done + in + Array.iter Thread.join threads; + + assert (not !failed) + in + stress () + end + let () = Alcotest.run "multicore-magic" [ @@ -106,4 +176,6 @@ let () = [ Alcotest.test_case "" `Quick (transparent_atomic 4.2 1.01 7.6) ] ); ( "transparent_atomic with ints", [ Alcotest.test_case "" `Quick (transparent_atomic 42 101 76) ] ); + ( "instantaneous_domain_index", + [ Alcotest.test_case "" 
`Quick test_instantaneous_domain_index ] ); ] diff --git a/test/dune b/test/dune index ded08a3..eb86f37 100644 --- a/test/dune +++ b/test/dune @@ -1,3 +1,4 @@ -(tests - (names Multicore_magic_test) - (libraries Multicore_magic alcotest)) +(test + (name Multicore_magic_test) + (modules Multicore_magic_test) + (libraries Multicore_magic alcotest domain_shims threads.posix unix))