diff --git a/src/data_structures/synchronizer.ml b/src/data_structures/synchronizer.ml new file mode 100644 index 000000000..de19fb47b --- /dev/null +++ b/src/data_structures/synchronizer.ml @@ -0,0 +1,64 @@ +(* SPDX-License-Identifier: AGPL-3.0-or-later *) +(* Copyright © 2021-2024 OCamlPro *) +(* Written by the Owi programmers *) + +type ('get, 'write) t = + { mutex : Mutex.t + ; cond : Condition.t + ; getter : unit -> 'get option + ; writter : 'write -> Condition.t -> unit + ; mutable pledges : int + ; mutable closed : bool + } + +let init getter writter = + { mutex = Mutex.create () + ; cond = Condition.create () + ; getter + ; writter + ; pledges = 0 + ; closed = false + } + +let get synchro pledge = + let rec inner_loop synchro pledge = + match synchro.getter () with + | None when synchro.pledges = 0 || synchro.closed -> + Condition.broadcast synchro.cond; + None + | None -> + Condition.wait synchro.cond synchro.mutex; + inner_loop synchro pledge + | Some _ as v -> + if pledge then synchro.pledges <- synchro.pledges + 1; + v + in + Mutex.protect synchro.mutex (fun () -> inner_loop synchro pledge) + +let write v { writter; cond; mutex; _ } = + Mutex.protect mutex (fun () -> writter v cond) + +let make_pledge synchro = + Mutex.lock synchro.mutex; + synchro.pledges <- synchro.pledges + 1; + Mutex.unlock synchro.mutex + +let end_pledge synchro = + Mutex.lock synchro.mutex; + synchro.pledges <- synchro.pledges - 1; + Condition.broadcast synchro.cond; + Mutex.unlock synchro.mutex + +let fail q = + Mutex.lock q.mutex; + q.closed <- true; + Condition.broadcast q.cond; + Mutex.unlock q.mutex + +let rec work_while f q = + match get q true with + | None -> () + | Some v -> + let () = f v (fun v -> write v q) in + end_pledge q; + work_while f q diff --git a/src/data_structures/synchronizer.mli b/src/data_structures/synchronizer.mli new file mode 100644 index 000000000..f232361da --- /dev/null +++ b/src/data_structures/synchronizer.mli @@ -0,0 +1,16 @@ +type (!'get, !'write) t + +val init : + (unit -> 'get option) -> ('write -> Condition.t -> unit) -> ('get, 'write) t + +val get : ('get, 'write) t -> bool -> 'get option + +val write : 'write -> ('get, 'write) t -> unit + +val make_pledge : ('get, 'write) t -> unit + +val end_pledge : ('get, 'write) t -> unit + +val fail : ('get, 'write) t -> unit + +val work_while : ('get -> ('write -> unit) -> unit) -> ('get, 'write) t -> unit diff --git a/src/data_structures/wq.ml b/src/data_structures/wq.ml index 67e4a3f56..f602a293c 100644 --- a/src/data_structures/wq.ml +++ b/src/data_structures/wq.ml @@ -2,42 +2,13 @@ (* Copyright © 2021-2024 OCamlPro *) (* Written by the Owi programmers *) -type 'a t = - { mutex : Mutex.t - ; cond : Condition.t - ; queue : 'a Queue.t - ; mutable pledges : int - ; mutable failed : bool - } +type 'a t = ('a, 'a) Synchronizer.t -let pop q pledge = - Mutex.lock q.mutex; - let r = - try - while Queue.is_empty q.queue do - if q.pledges = 0 || q.failed then raise Exit; - Condition.wait q.cond q.mutex - done; - let v = Queue.pop q.queue in - if pledge then q.pledges <- q.pledges + 1; - Some v - with Exit -> - Condition.broadcast q.cond; - None - in - Mutex.unlock q.mutex; - r +let pop q pledge = Synchronizer.get q pledge -let make_pledge q = - Mutex.lock q.mutex; - q.pledges <- q.pledges + 1; - Mutex.unlock q.mutex +let make_pledge = Synchronizer.make_pledge -let end_pledge q = - Mutex.lock q.mutex; - q.pledges <- q.pledges - 1; - Condition.broadcast q.cond; - Mutex.unlock q.mutex +let end_pledge = Synchronizer.end_pledge let rec read_as_seq (q : 'a t) ~finalizer : 'a Seq.t = fun () -> @@ -47,23 +18,17 @@ let rec read_as_seq (q : 'a t) ~finalizer : 'a Seq.t = Nil | Some v -> Cons (v, read_as_seq q ~finalizer) -let push v q = - Mutex.lock q.mutex; - let was_empty = Queue.is_empty q.queue in - Queue.push v q.queue; - if was_empty then Condition.broadcast q.cond; - Mutex.unlock q.mutex +let push v q = Synchronizer.write v q + +let work_while f q = Synchronizer.work_while f q -let fail q = - Mutex.lock q.mutex; - q.failed <- true; - Condition.broadcast q.cond; - Mutex.unlock q.mutex +let fail = Synchronizer.fail let init () = - { mutex = Mutex.create () - ; cond = Condition.create () - ; queue = Queue.create () - ; pledges = 0 - ; failed = false - } + let q = Queue.create () in + let writter v condvar = + let was_empty = Queue.is_empty q in + Queue.push v q; + if was_empty then Condition.broadcast condvar + in + Synchronizer.init (fun () -> Queue.take_opt q) writter diff --git a/src/data_structures/wq.mli b/src/data_structures/wq.mli index df1da5f92..67f81f0f0 100644 --- a/src/data_structures/wq.mli +++ b/src/data_structures/wq.mli @@ -17,3 +17,5 @@ val end_pledge : 'a t -> unit val fail : 'a t -> unit val read_as_seq : 'a t -> finalizer:(unit -> unit) -> 'a Seq.t + +val work_while : ('a -> ('a -> unit) -> unit) -> 'a t -> unit diff --git a/src/dune b/src/dune index 3b449742b..cc8e4fad9 100644 --- a/src/dune +++ b/src/dune @@ -77,6 +77,7 @@ stack string_map syntax + synchronizer text text_lexer text_parser diff --git a/src/symbolic/symbolic_choice.ml b/src/symbolic/symbolic_choice.ml index 322d883c0..d522c642e 100644 --- a/src/symbolic/symbolic_choice.ml +++ b/src/symbolic/symbolic_choice.ml @@ -87,23 +87,19 @@ module CoreImpl = struct let add_init_task sched task = Wq.push task sched.work_queue - let rec work wls sched callback = - let rec handle_status (t : _ Schedulable.status) sched = + let work wls sched callback = + let rec handle_status (t : _ Schedulable.status) write_back = match t with | Stop -> () | Now x -> callback x - | Yield (_prio, f) -> Wq.push f sched.work_queue + | Yield (_prio, f) -> write_back f | Choice (m1, m2) -> - handle_status m1 sched; - handle_status m2 sched + handle_status m1 write_back; + handle_status m2 write_back in - match Wq.pop sched.work_queue true with - | None -> () - | Some f -> begin - handle_status (Schedulable.run f wls) sched; - Wq.end_pledge sched.work_queue; - work wls sched callback - end + Wq.work_while + (fun f write_back -> handle_status (Schedulable.run f wls) write_back) + sched.work_queue let spawn_worker sched wls_init callback callback_init callback_close = callback_init ();