diff --git a/source/concurrency/io/iouring.d b/source/concurrency/io/iouring.d index 9d788ac..ca40bc1 100644 --- a/source/concurrency/io/iouring.d +++ b/source/concurrency/io/iouring.d @@ -314,10 +314,12 @@ struct RunOp(Sender, Receiver) { import concurrency.operations.dofinally; import concurrency.operations.whenall; import concurrency.operations.withscheduler; + import concurrency.operations.withioscheduler; alias RunSender = JustFromSender!(void delegate() @trusted shared); alias SenderWithScheduler = WithSchedulerSender!(Sender, IOUringScheduler); - alias ValueSender = DoFinallySender!(SenderWithScheduler, void delegate() @safe nothrow shared); + alias SenderWithIOScheduler = WithIOSchedulerSender!(SenderWithScheduler, IOUringScheduler); + alias ValueSender = DoFinallySender!(SenderWithIOScheduler, void delegate() @safe nothrow shared); alias CombinedSender = WhenAllSender!(ValueSender, RunSender); alias Op = OpType!(CombinedSender, Receiver); @@ -333,8 +335,9 @@ struct RunOp(Sender, Receiver) { this(IOUringContext* context, Sender sender, return Receiver receiver) @trusted return scope { this.context = context; shared IOUringContext* sharedContext = cast(shared)context; + auto scheduler = IOUringScheduler(sharedContext); op = whenAll( - sender.withScheduler(IOUringScheduler(cast(shared)context)).doFinally(() @safe nothrow shared { + sender.withScheduler(scheduler).withIOScheduler(scheduler).doFinally(() @safe nothrow shared { stopSource.stop(); sharedContext.wakeup(); }), @@ -522,7 +525,7 @@ struct ReadOperation(Receiver) { } struct AcceptSender { - import concurrency.scheduler : Client; + import concurrency.ioscheduler : Client; import std.socket : socket_t; alias Value = Client; shared IOUringContext* context; @@ -540,7 +543,7 @@ struct AcceptSender { struct AcceptOperation(Receiver) { import core.sys.posix.sys.socket : sockaddr, socklen_t; import core.sys.posix.netinet.in_; - import concurrency.scheduler : Client; + import concurrency.ioscheduler : Client; import std.socket : socket_t; socket_t fd; diff --git a/source/concurrency/io/package.d b/source/concurrency/io/package.d index d78a08a..25ff857 100644 --- a/source/concurrency/io/package.d +++ b/source/concurrency/io/package.d @@ -1,7 +1,7 @@ module concurrency.io; import concurrency.io.iouring; -import concurrency.scheduler : Client; +import concurrency.ioscheduler : Client; import std.socket : socket_t; @@ -19,7 +19,7 @@ struct ReadAsyncSender { long offset; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = receiver.getScheduler().read(fd, buffer, offset).connect(receiver); + auto op = receiver.getIOScheduler().read(fd, buffer, offset).connect(receiver); return op; } } @@ -33,7 +33,7 @@ struct AcceptAsyncSender { socket_t fd; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = receiver.getScheduler().accept(fd).connect(receiver); + auto op = receiver.getIOScheduler().accept(fd).connect(receiver); return op; } } @@ -50,7 +50,7 @@ struct ConnectAsyncSender { ushort port; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = receiver.getScheduler().connect(fd, address, port).connect(receiver); + auto op = receiver.getIOScheduler().connect(fd, address, port).connect(receiver); return op; } } @@ -66,7 +66,7 @@ struct WriteAsyncSender { long offset; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = receiver.getScheduler().write(fd, buffer, offset).connect(receiver); + auto op = receiver.getIOScheduler().write(fd, buffer, offset).connect(receiver); return op; } } @@ -80,7 +80,7 @@ struct CloseAsyncSender { socket_t fd; auto connect(Receiver)(return Receiver receiver) @safe return scope { // ensure NRVO - auto op = receiver.getScheduler().close(fd).connect(receiver); + auto op = receiver.getIOScheduler().close(fd).connect(receiver); return op; } } diff --git a/source/concurrency/ioscheduler.d b/source/concurrency/ioscheduler.d new file mode 100644 index 0000000..de3e07f --- /dev/null +++ b/source/concurrency/ioscheduler.d @@ -0,0 +1,94 @@ +module concurrency.ioscheduler; + +import concurrency.sender : SenderObjectBase, isSender; +import core.time : Duration; +import concepts; +import std.typecons : Nullable, nullable; + +void checkIOScheduler(T)() { + import concurrency.sender : checkSender; + import core.time : msecs; + import std.traits : ReturnType; + alias ReadSender = ReturnType!(T.read); + checkSender!ReadSender(); + // TODO: add other function checks +} + +enum isIOScheduler(T) = is(typeof(checkIOScheduler!T)); + +struct Client { + import std.socket : socket_t; + version(Windows) { + import core.sys.windows.windows : sockaddr, socklen_t; + } else version(Posix) { + import core.sys.posix.sys.socket : sockaddr, socklen_t; + } + + socket_t fd; + sockaddr addr; + socklen_t addrlen; +} + +/// polymorphic IOScheduler +interface IOSchedulerObjectBase { + import std.socket : socket_t; + // TODO: read/write/close aren't just for sockets really + SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; + SenderObjectBase!(Client) accept(socket_t fd) @safe; + SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe; + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe; + SenderObjectBase!(void) close(socket_t fd) @safe; +} + +struct NullIOScheduler { + import std.socket : socket_t; + import concurrency.sender : ValueSender; + + string errorMsg; + + ValueSender!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { + throw new Exception(errorMsg); + } + ValueSender!(Client) accept(socket_t fd) @safe { + throw new Exception(errorMsg); + } + ValueSender!(socket_t) connect(socket_t fd, return string address, ushort port) @safe { + throw new Exception(errorMsg); + } + ValueSender!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { + throw new Exception(errorMsg); + } + ValueSender!(void) close(socket_t fd) @safe { + throw new Exception(errorMsg); + } +} + +class IOSchedulerObject(S) : IOSchedulerObjectBase { + import concurrency.sender : toSenderObject; + S scheduler; + this(S scheduler) { + this.scheduler = scheduler; + } + + SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { + return scheduler.read(fd, buffer, offset).toSenderObject(); + } + SenderObjectBase!(Client) accept(socket_t fd) @safe { + return scheduler.accept(fd).toSenderObject(); + } + // TODO: is trusted because of scope string address + SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @trusted { + string adr = address; + return scheduler.connect(fd, adr, port).toSenderObject(); + } + SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { + return scheduler.write(fd, buffer, offset).toSenderObject(); + } + SenderObjectBase!(void) close(socket_t fd) @safe { + return scheduler.close(fd).toSenderObject(); + } +} + +IOSchedulerObjectBase toIOSchedulerObject(S)(S scheduler) { + return new IOSchedulerObject!(S)(scheduler); +} diff --git a/source/concurrency/operations/withioscheduler.d b/source/concurrency/operations/withioscheduler.d new file mode 100644 index 0000000..8b53981 --- /dev/null +++ b/source/concurrency/operations/withioscheduler.d @@ -0,0 +1,52 @@ +module concurrency.operations.withioscheduler; + +import concurrency; +import concurrency.receiver; +import concurrency.sender; +import concurrency.stoptoken; +import concepts; +import std.traits; + +auto withIOScheduler(Sender, IOScheduler)(Sender sender, IOScheduler ioScheduler) { + return WithIOSchedulerSender!(Sender, IOScheduler)(sender, ioScheduler); +} + +private struct WithIOSchedulerReceiver(Receiver, Value, IOScheduler) { + Receiver receiver; + IOScheduler ioScheduler; + static if (is(Value == void)) { + void setValue() @safe { + receiver.setValue(); + } + } else { + void setValue(Value value) @safe { + receiver.setValue(value); + } + } + + void setDone() @safe nothrow { + receiver.setDone(); + } + + void setError(Throwable e) @safe nothrow { + receiver.setError(e); + } + + auto getIOScheduler() @safe nothrow { + return ioScheduler; + } + + mixin ForwardExtensionPoints!receiver; +} + +struct WithIOSchedulerSender(Sender, IOScheduler) if (models!(Sender, isSender)) { + alias Value = Sender.Value; + Sender sender; + IOScheduler ioScheduler; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + alias R = WithIOSchedulerReceiver!(Receiver, Sender.Value, IOScheduler); + // ensure NRVO + auto op = sender.connect(R(receiver, ioScheduler)); + return op; + } +} diff --git a/source/concurrency/receiver.d b/source/concurrency/receiver.d index f9846fb..09ab76c 100644 --- a/source/concurrency/receiver.d +++ b/source/concurrency/receiver.d @@ -25,12 +25,19 @@ mixin template ForwardExtensionPoints(alias receiver) { auto getScheduler() nothrow @safe { return receiver.getScheduler(); } + + static if (__traits(hasMember, receiver, "getIOScheduler")) { + auto getIOScheduler() nothrow @safe { + return receiver.getIOScheduler(); + } + } } /// A polymorphic receiver of type T interface ReceiverObjectBase(T) { import concurrency.stoptoken : StopToken; import concurrency.scheduler : SchedulerObjectBase; + import concurrency.ioscheduler : IOSchedulerObjectBase; static assert(models!(ReceiverObjectBase!T, isReceiver)); static if (is(T == void)) void setValue() @safe; @@ -40,6 +47,7 @@ interface ReceiverObjectBase(T) { void setError(Throwable e) nothrow @safe; shared(StopToken) getStopToken() nothrow @safe; SchedulerObjectBase getScheduler() scope nothrow @safe; + IOSchedulerObjectBase getIOScheduler() scope nothrow @safe; } struct NullReceiver(T) { diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index da24ae3..d068b42 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -17,59 +17,12 @@ void checkScheduler(T)() { enum isScheduler(T) = is(typeof(checkScheduler!T)); -struct Client { - import std.socket : socket_t; - version(Windows) { - import core.sys.windows.windows : sockaddr, socklen_t; - } else version(Posix) { - import core.sys.posix.sys.socket : sockaddr, socklen_t; - } - - socket_t fd; - sockaddr addr; - socklen_t addrlen; -} - /// polymorphic Scheduler interface SchedulerObjectBase { - import std.socket : socket_t; SenderObjectBase!void schedule() @safe; SenderObjectBase!void scheduleAfter(Duration d) @safe; - // TODO: do these belong here? - SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; - SenderObjectBase!(Client) accept(socket_t fd) @safe; - SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe; - SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe; - SenderObjectBase!(void) close(socket_t fd) @safe; } - -// We can pull the LocalThreadExecutor (and its schedule/scheduleAfter) out into a specialized context. -// Just like we did with the iouring context - -// The interesting bit is that the syncWait algorithm then might be inferred as @nogc - -// The question remains how we would want to integrate these. -// With iouring we created a runner that would take a sender and would inject the scheduler and allow itself to steal the current thread. - -// That last part is important, we don't want to spawn a thread just to run timers, we can do it perfectly fine on the current thread. -// Same with iouring or other event loops. - -// That said, we can, if we want to, move the event loop to another thread. - -// The only thing we can't do is cross schedule timers from one thread to another. -// Well, that is not true, we can create two context objects that expose a Scheduler - - - - - - -// Guess we just have to write it and see.... - -// Dietmar Kuhl used a iocontext with a run function that allows running it on the current thread. -// In rant I had the iocontext's runner return a sender so you could await that. - class SchedulerObject(S) : SchedulerObjectBase { import concurrency.sender : toSenderObject; S scheduler; @@ -84,43 +37,6 @@ class SchedulerObject(S) : SchedulerObjectBase { SenderObjectBase!void scheduleAfter(Duration d) @safe { return scheduler.scheduleAfter(d).toSenderObject(); } - SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { - static if (__traits(hasMember, S, "read")) { - return scheduler.read(fd, buffer, offset).toSenderObject(); - } else { - throw new Exception("`read` not implemented on "~S.stringof); - } - } - SenderObjectBase!(Client) accept(socket_t fd) @safe { - static if (__traits(hasMember, S, "accept")) { - return scheduler.accept(fd).toSenderObject(); - } else { - throw new Exception("`accept` not implemented on "~S.stringof); - } - } - // TODO: is trusted because of scope string address - SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @trusted { - static if (__traits(hasMember, S, "connect")) { - string adr = address; - return scheduler.connect(fd, adr, port).toSenderObject(); - } else { - throw new Exception("`connect` not implemented on "~S.stringof); - } - } - SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { - static if (__traits(hasMember, S, "write")) { - return scheduler.write(fd, buffer, offset).toSenderObject(); - } else { - throw new Exception("`write` not implemented on "~S.stringof); - } - } - SenderObjectBase!(void) close(socket_t fd) @safe { - static if (__traits(hasMember, S, "close")) { - return scheduler.close(fd).toSenderObject(); - } else { - throw new Exception("`close` not implemented on "~S.stringof); - } - } } SchedulerObjectBase toSchedulerObject(S)(S scheduler) { diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index 8b84a78..43324b6 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -42,7 +42,7 @@ import core.time : Duration; /// checks that T is a Sender void checkSender(T)() @safe { - import concurrency.scheduler : SchedulerObjectBase; + import concurrency.ioscheduler : NullIOScheduler; import concurrency.stoptoken : StopToken; T t = T.init; struct Scheduler { @@ -75,6 +75,10 @@ void checkSender(T)() @safe { Scheduler getScheduler() @safe nothrow { return Scheduler.init; } + + NullIOScheduler getIOScheduler() @safe nothrow { + return NullIOScheduler("Testing NullIOScheduler"); + } } scope receiver = Receiver.init; @@ -225,10 +229,12 @@ template toReceiverObject(T) { import concurrency.receiver; import concurrency.stoptoken : StopToken; import concurrency.scheduler : SchedulerObjectBase; + import concurrency.ioscheduler : IOSchedulerObjectBase; return new class(receiver) ReceiverObjectBase!T { Receiver receiver; SchedulerObjectBase scheduler; + IOSchedulerObjectBase ioScheduler; this(Receiver receiver) { this.receiver = receiver; } @@ -262,6 +268,21 @@ template toReceiverObject(T) { } return scheduler; } + + IOSchedulerObjectBase getIOScheduler() nothrow @safe scope { + import concurrency.ioscheduler : toIOSchedulerObject; + static if (__traits(hasMember, receiver, "getIOScheduler")) { + if (ioScheduler is null) { + ioScheduler = receiver.getIOScheduler().toIOSchedulerObject; + } + } else { + import concurrency.ioscheduler : NullIOScheduler; + if (ioScheduler is null) { + ioScheduler = NullIOScheduler("Type " ~Receiver.stringof ~ " doesn't have IOScheduler.").toIOSchedulerObject; + } + } + return ioScheduler; + } }; } } diff --git a/tests/ut/concurrency/io.d b/tests/ut/concurrency/io.d index e54afcb..e5c0348 100644 --- a/tests/ut/concurrency/io.d +++ b/tests/ut/concurrency/io.d @@ -72,3 +72,14 @@ unittest { closeSocket(socket); closeSocket(fd); } + +@safe +@("acceptAsync.missing.ioscheduler") +unittest { + import concurrency.io.socket; + import concurrency.sender; + import std.socket; + acceptAsync(cast(socket_t)0) + .toSenderObject + .syncWait().value.shouldThrow; +}