Skip to content

Commit

Permalink
Create Synchronzier module
Browse files Browse the repository at this point in the history
  • Loading branch information
krtab authored and zapashcanon committed Aug 6, 2024
1 parent 113a08b commit 22e4151
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 62 deletions.
64 changes: 64 additions & 0 deletions src/data_structures/synchronizer.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
(* SPDX-License-Identifier: AGPL-3.0-or-later *)
(* Copyright © 2021-2024 OCamlPro *)
(* Written by the Owi programmers *)

type ('get, 'write) t =
{ mutex : Mutex.t
; cond : Condition.t
; getter : unit -> 'get option
; writter : 'write -> Condition.t -> unit
; mutable pledges : int
; mutable closed : bool
}

let init getter writter =
{ mutex = Mutex.create ()
; cond = Condition.create ()
; getter
; writter
; pledges = 0
; closed = false
}

let get synchro pledge =
let rec inner_loop synchro pledge =
match synchro.getter () with
| None when synchro.pledges = 0 || synchro.closed ->
Condition.broadcast synchro.cond;
None
| None ->
Condition.wait synchro.cond synchro.mutex;
inner_loop synchro pledge
| Some _ as v ->
if pledge then synchro.pledges <- synchro.pledges + 1;
v
in
Mutex.protect synchro.mutex (fun () -> inner_loop synchro pledge)

let write v { writter; cond; mutex; _ } =
Mutex.protect mutex (fun () -> writter v cond)

let make_pledge synchro =
Mutex.lock synchro.mutex;
synchro.pledges <- synchro.pledges + 1;
Mutex.unlock synchro.mutex

let end_pledge synchro =
Mutex.lock synchro.mutex;
synchro.pledges <- synchro.pledges - 1;
Condition.broadcast synchro.cond;
Mutex.unlock synchro.mutex

let fail q =
Mutex.lock q.mutex;
q.closed <- true;
Condition.broadcast q.cond;
Mutex.unlock q.mutex

let rec work_while f q =
match get q true with
| None -> ()
| Some v ->
let () = f v (fun v -> write v q) in
end_pledge q;
work_while f q
16 changes: 16 additions & 0 deletions src/data_structures/synchronizer.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
type (!'get, !'write) t

val init :
(unit -> 'get option) -> ('write -> Condition.t -> unit) -> ('get, 'write) t

val get : ('get, 'write) t -> bool -> 'get option

val write : 'write -> ('get, 'write) t -> unit

val make_pledge : ('get, 'write) t -> unit

val end_pledge : ('get, 'write) t -> unit

val fail : ('get, 'write) t -> unit

val work_while : ('get -> ('write -> unit) -> unit) -> ('get, 'write) t -> unit
65 changes: 15 additions & 50 deletions src/data_structures/wq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,13 @@
(* Copyright © 2021-2024 OCamlPro *)
(* Written by the Owi programmers *)

type 'a t =
{ mutex : Mutex.t
; cond : Condition.t
; queue : 'a Queue.t
; mutable pledges : int
; mutable failed : bool
}
type 'a t = ('a, 'a) Synchronizer.t

let pop q pledge =
Mutex.lock q.mutex;
let r =
try
while Queue.is_empty q.queue do
if q.pledges = 0 || q.failed then raise Exit;
Condition.wait q.cond q.mutex
done;
let v = Queue.pop q.queue in
if pledge then q.pledges <- q.pledges + 1;
Some v
with Exit ->
Condition.broadcast q.cond;
None
in
Mutex.unlock q.mutex;
r
let pop q pledge = Synchronizer.get q pledge

let make_pledge q =
Mutex.lock q.mutex;
q.pledges <- q.pledges + 1;
Mutex.unlock q.mutex
let make_pledge = Synchronizer.make_pledge

let end_pledge q =
Mutex.lock q.mutex;
q.pledges <- q.pledges - 1;
Condition.broadcast q.cond;
Mutex.unlock q.mutex
let end_pledge = Synchronizer.end_pledge

let rec read_as_seq (q : 'a t) ~finalizer : 'a Seq.t =
fun () ->
Expand All @@ -47,23 +18,17 @@ let rec read_as_seq (q : 'a t) ~finalizer : 'a Seq.t =
Nil
| Some v -> Cons (v, read_as_seq q ~finalizer)

let push v q =
Mutex.lock q.mutex;
let was_empty = Queue.is_empty q.queue in
Queue.push v q.queue;
if was_empty then Condition.broadcast q.cond;
Mutex.unlock q.mutex
let push v q = Synchronizer.write v q

let work_while f q = Synchronizer.work_while f q

let fail q =
Mutex.lock q.mutex;
q.failed <- true;
Condition.broadcast q.cond;
Mutex.unlock q.mutex
let fail = Synchronizer.fail

let init () =
{ mutex = Mutex.create ()
; cond = Condition.create ()
; queue = Queue.create ()
; pledges = 0
; failed = false
}
let q = Queue.create () in
let writter v condvar =
let was_empty = Queue.is_empty q in
Queue.push v q;
if was_empty then Condition.broadcast condvar
in
Synchronizer.init (fun () -> Queue.take_opt q) writter
2 changes: 2 additions & 0 deletions src/data_structures/wq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ val end_pledge : 'a t -> unit
val fail : 'a t -> unit

val read_as_seq : 'a t -> finalizer:(unit -> unit) -> 'a Seq.t

val work_while : ('a -> ('a -> unit) -> unit) -> 'a t -> unit
1 change: 1 addition & 0 deletions src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
stack
string_map
syntax
synchronizer
text
text_lexer
text_parser
Expand Down
20 changes: 8 additions & 12 deletions src/symbolic/symbolic_choice.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,19 @@ module CoreImpl = struct

let add_init_task sched task = Wq.push task sched.work_queue

let rec work wls sched callback =
let rec handle_status (t : _ Schedulable.status) sched =
let work wls sched callback =
let rec handle_status (t : _ Schedulable.status) write_back =
match t with
| Stop -> ()
| Now x -> callback x
| Yield (_prio, f) -> Wq.push f sched.work_queue
| Yield (_prio, f) -> write_back f
| Choice (m1, m2) ->
handle_status m1 sched;
handle_status m2 sched
handle_status m1 write_back;
handle_status m2 write_back
in
match Wq.pop sched.work_queue true with
| None -> ()
| Some f -> begin
handle_status (Schedulable.run f wls) sched;
Wq.end_pledge sched.work_queue;
work wls sched callback
end
Wq.work_while
(fun f write_back -> handle_status (Schedulable.run f wls) write_back)
sched.work_queue

let spawn_worker sched wls_init callback callback_init callback_close =
callback_init ();
Expand Down

0 comments on commit 22e4151

Please sign in to comment.