From 162b0aff3df0fedf9e8c257a7987fd5a2b50a109 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Sat, 20 May 2023 13:19:54 +0300 Subject: [PATCH] Add `Mvar` --- src/kcas_data/kcas_data.ml | 1 + src/kcas_data/magic_option.ml | 9 +++++++-- src/kcas_data/magic_option.mli | 3 +++ src/kcas_data/mvar.ml | 36 ++++++++++++++++++++++++++++++++++ src/kcas_data/mvar.mli | 33 +++++++++++++++++++++++++++++++ src/kcas_data/mvar_intf.ml | 33 +++++++++++++++++++++++++++++++ test/kcas_data/dune | 6 ++++++ test/kcas_data/mvar_test.ml | 22 +++++++++++++++++++++ 8 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 src/kcas_data/mvar.ml create mode 100644 src/kcas_data/mvar.mli create mode 100644 src/kcas_data/mvar_intf.ml create mode 100644 test/kcas_data/mvar_test.ml diff --git a/src/kcas_data/kcas_data.ml b/src/kcas_data/kcas_data.ml index 8dd8aead..7de88887 100644 --- a/src/kcas_data/kcas_data.ml +++ b/src/kcas_data/kcas_data.ml @@ -56,6 +56,7 @@ module Stack = Stack (** {1 Communication and synchronization primitives} *) +module Mvar = Mvar module Promise = Promise (** {1 Linked data structures} *) diff --git a/src/kcas_data/magic_option.ml b/src/kcas_data/magic_option.ml index f8efc427..5782e256 100644 --- a/src/kcas_data/magic_option.ml +++ b/src/kcas_data/magic_option.ml @@ -1,3 +1,5 @@ +open Kcas + type 'a t = 'a let none = ref () @@ -7,8 +9,11 @@ external some : 'a -> 'a t = "%identity" let is_none x = x == none [@@inline] let is_some x = x != none [@@inline] -let get_or_retry x = if is_none x then Kcas.Retry.later () else x +let get_or_retry x = if is_none x then Retry.later () else x [@@inline] +let put_or_retry v x = if is_none x then some v else Retry.later () [@@inline] +let take_or_retry x = if is_none x then Retry.later () else none [@@inline] external get_unsafe : 'a t -> 'a = "%identity" -let to_option x = if is_none x then None else Some x +let to_option x = if is_none x then None else Some x [@@inline] +let of_option = function None -> none | Some x -> some x [@@inline] diff --git a/src/kcas_data/magic_option.mli b/src/kcas_data/magic_option.mli index ec0fc696..c219087b 100644 --- a/src/kcas_data/magic_option.mli +++ b/src/kcas_data/magic_option.mli @@ -7,5 +7,8 @@ val some : 'a -> 'a t val is_none : 'a t -> bool val is_some : 'a t -> bool val get_or_retry : 'a t -> 'a +val put_or_retry : 'a -> 'a t -> 'a t +val take_or_retry : 'a t -> 'a t val get_unsafe : 'a t -> 'a val to_option : 'a t -> 'a option +val of_option : 'a option -> 'a t diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml new file mode 100644 index 00000000..256301f5 --- /dev/null +++ b/src/kcas_data/mvar.ml @@ -0,0 +1,36 @@ +open Kcas + +type 'a t = 'a Magic_option.t Loc.t + +let create x_opt = Loc.make (Magic_option.of_option x_opt) + +module Xt = struct + let is_empty ~xt mv = Magic_option.is_none (Xt.get ~xt mv) + + let try_put ~xt mv value = + Magic_option.is_none + (Xt.compare_and_swap ~xt mv Magic_option.none (Magic_option.some value)) + + let put ~xt mv value = + Xt.unsafe_modify ~xt mv (Magic_option.put_or_retry value) + + let take_opt ~xt mv = + Magic_option.to_option (Xt.exchange ~xt mv Magic_option.none) + + let take ~xt mv = + Magic_option.get_unsafe (Xt.unsafe_update ~xt mv Magic_option.take_or_retry) + + let peek ~xt mv = Magic_option.get_or_retry (Xt.get ~xt mv) + let peek_opt ~xt mv = Magic_option.to_option (Xt.get ~xt mv) +end + +let is_empty mv = Magic_option.is_none (Loc.get mv) +let put mv value = Loc.modify mv (Magic_option.put_or_retry value) + +let try_put mv value = + Loc.compare_and_set mv Magic_option.none (Magic_option.some value) + +let take mv = Magic_option.get_unsafe (Loc.update mv Magic_option.take_or_retry) +let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) +let peek mv = Loc.get_as Magic_option.get_or_retry mv +let peek_opt mv = Magic_option.to_option (Loc.get mv) diff --git a/src/kcas_data/mvar.mli b/src/kcas_data/mvar.mli new file mode 100644 index 00000000..12a2b8df --- /dev/null +++ b/src/kcas_data/mvar.mli @@ -0,0 +1,33 @@ +open Kcas + +(** Synchronizing variable. + + A synchronizing variable is essentially equivalent to a ['a option Loc.t] + with blocking semantics on both {!take} and {!put}. + + {b NOTE}: The current implementation is not guaranteed to be fair or + scalable. In other words, when multiple producers block on {!put} or + multiple consumers block on {!take} the operations are not queued and it is + possible for a particular producer or consumer to starve. *) + +(** {1 Common interface} *) + +type !'a t +(** The type of a synchronizing variable that may contain a value of type + ['a]. *) + +val create : 'a option -> 'a t +(** [create x_opt] returns a new synchronizing variable that will either be + empty when [x_opt] is [None] or full when [x_opt] is [Some x]. *) + +(** {1 Compositional interface} *) + +module Xt : + Mvar_intf.Ops + with type 'a t := 'a t + with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn +(** Explicit transaction passing on synchronizing variables. *) + +(** {1 Non-compositional interface} *) + +include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/mvar_intf.ml b/src/kcas_data/mvar_intf.ml new file mode 100644 index 00000000..3df088f2 --- /dev/null +++ b/src/kcas_data/mvar_intf.ml @@ -0,0 +1,33 @@ +module type Ops = sig + type 'a t + type ('x, 'fn) fn + + val is_empty : ('x, 'a t -> bool) fn + (** [is_empty mv] determines whether the synchronizing variable [mv] contains + a value or not. *) + + val put : ('x, 'a t -> 'a -> unit) fn + (** [put mv x] fills the synchronizing variable [mv] with the value [v] or + blocks until the variable becomes empty. *) + + val try_put : ('x, 'a t -> 'a -> bool) fn + (** [try_put mv x] tries to fill the synchronizing variable [mv] with the + value [v] and returns [true] on success or [false] in case the variable is + full. *) + + val take : ('x, 'a t -> 'a) fn + (** [take mv] removes and returns the current value of the synchronizing + variable [mv] or blocks waiting until the variable is filled. *) + + val take_opt : ('x, 'a t -> 'a option) fn + (** [take_opt mv] removes and returns the current value of the synchronizing + variable [mv] or returns [None] in case the variable is empty. *) + + val peek : ('x, 'a t -> 'a) fn + (** [peek mv] returns the current value of the synchronizing variable [mv] or + blocks waiting until the variable is filled. *) + + val peek_opt : ('x, 'a t -> 'a option) fn + (** [peek_opt mv] returns the current value of the synchronizing variable [mv] + or returns [None] in case the variable is empty. *) +end diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 933d8811..5ebb8fd1 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -16,6 +16,12 @@ (libraries kcas kcas_data) (package kcas_data)) +(test + (name mvar_test) + (modules mvar_test) + (libraries kcas kcas_data) + (package kcas_data)) + (test (name queue_test) (modules queue_test) diff --git a/test/kcas_data/mvar_test.ml b/test/kcas_data/mvar_test.ml new file mode 100644 index 00000000..912ef5c7 --- /dev/null +++ b/test/kcas_data/mvar_test.ml @@ -0,0 +1,22 @@ +open Kcas +open Kcas_data + +let () = + let mv = Mvar.create (Some 101) in + assert (not (Mvar.is_empty mv)); + assert (Mvar.take mv = 101); + assert (Mvar.is_empty mv); + assert (Mvar.take_opt mv = None); + Mvar.put mv 42; + let running = Mvar.create None in + let d = + Domain.spawn @@ fun () -> + Mvar.put running (); + Xt.commit { tx = Mvar.Xt.put mv 76 } + in + assert (Mvar.take running = ()); + assert (Xt.commit { tx = Mvar.Xt.take mv } = 42); + Domain.join d; + assert (Mvar.take mv = 76); + + Printf.printf "Test Mvar OK!\n%!"