-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
For Sender objects throws at runtime if base receiver doesn't have IOScheduler.
- Loading branch information
Showing
8 changed files
with
200 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
module concurrency.ioscheduler; | ||
|
||
import concurrency.sender : SenderObjectBase, isSender; | ||
import core.time : Duration; | ||
import concepts; | ||
import std.typecons : Nullable, nullable; | ||
|
||
void checkIOScheduler(T)() { | ||
import concurrency.sender : checkSender; | ||
import core.time : msecs; | ||
import std.traits : ReturnType; | ||
alias ReadSender = ReturnType!(T.read); | ||
checkSender!ReadSender(); | ||
// TODO: add other function checks | ||
} | ||
|
||
enum isIOScheduler(T) = is(typeof(checkIOScheduler!T)); | ||
|
||
struct Client { | ||
import std.socket : socket_t; | ||
version(Windows) { | ||
import core.sys.windows.windows : sockaddr, socklen_t; | ||
} else version(Posix) { | ||
import core.sys.posix.sys.socket : sockaddr, socklen_t; | ||
} | ||
|
||
socket_t fd; | ||
sockaddr addr; | ||
socklen_t addrlen; | ||
} | ||
|
||
/// polymorphic IOScheduler | ||
interface IOSchedulerObjectBase { | ||
import std.socket : socket_t; | ||
// TODO: read/write/close aren't just for sockets really | ||
SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe; | ||
SenderObjectBase!(Client) accept(socket_t fd) @safe; | ||
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @safe; | ||
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe; | ||
SenderObjectBase!(void) close(socket_t fd) @safe; | ||
} | ||
|
||
struct NullIOScheduler { | ||
import std.socket : socket_t; | ||
import concurrency.sender : ValueSender; | ||
|
||
string errorMsg; | ||
|
||
ValueSender!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { | ||
throw new Exception(errorMsg); | ||
} | ||
ValueSender!(Client) accept(socket_t fd) @safe { | ||
throw new Exception(errorMsg); | ||
} | ||
ValueSender!(socket_t) connect(socket_t fd, return string address, ushort port) @safe { | ||
throw new Exception(errorMsg); | ||
} | ||
ValueSender!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { | ||
throw new Exception(errorMsg); | ||
} | ||
ValueSender!(void) close(socket_t fd) @safe { | ||
throw new Exception(errorMsg); | ||
} | ||
} | ||
|
||
class IOSchedulerObject(S) : IOSchedulerObjectBase { | ||
import concurrency.sender : toSenderObject; | ||
S scheduler; | ||
this(S scheduler) { | ||
this.scheduler = scheduler; | ||
} | ||
|
||
SenderObjectBase!(ubyte[]) read(socket_t fd, return ubyte[] buffer, long offset = 0) @safe { | ||
return scheduler.read(fd, buffer, offset).toSenderObject(); | ||
} | ||
SenderObjectBase!(Client) accept(socket_t fd) @safe { | ||
return scheduler.accept(fd).toSenderObject(); | ||
} | ||
// TODO: is trusted because of scope string address | ||
SenderObjectBase!(socket_t) connect(socket_t fd, return string address, ushort port) @trusted { | ||
string adr = address; | ||
return scheduler.connect(fd, adr, port).toSenderObject(); | ||
} | ||
SenderObjectBase!(int) write(socket_t fd, return const(ubyte)[] buffer, long offset = 0) @safe { | ||
return scheduler.write(fd, buffer, offset).toSenderObject(); | ||
} | ||
SenderObjectBase!(void) close(socket_t fd) @safe { | ||
return scheduler.close(fd).toSenderObject(); | ||
} | ||
} | ||
|
||
IOSchedulerObjectBase toIOSchedulerObject(S)(S scheduler) { | ||
return new IOSchedulerObject!(S)(scheduler); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
module concurrency.operations.withioscheduler; | ||
|
||
import concurrency; | ||
import concurrency.receiver; | ||
import concurrency.sender; | ||
import concurrency.stoptoken; | ||
import concepts; | ||
import std.traits; | ||
|
||
auto withIOScheduler(Sender, IOScheduler)(Sender sender, IOScheduler ioScheduler) { | ||
return WithIOSchedulerSender!(Sender, IOScheduler)(sender, ioScheduler); | ||
} | ||
|
||
private struct WithIOSchedulerReceiver(Receiver, Value, IOScheduler) { | ||
Receiver receiver; | ||
IOScheduler ioScheduler; | ||
static if (is(Value == void)) { | ||
void setValue() @safe { | ||
receiver.setValue(); | ||
} | ||
} else { | ||
void setValue(Value value) @safe { | ||
receiver.setValue(value); | ||
} | ||
} | ||
|
||
void setDone() @safe nothrow { | ||
receiver.setDone(); | ||
} | ||
|
||
void setError(Throwable e) @safe nothrow { | ||
receiver.setError(e); | ||
} | ||
|
||
auto getIOScheduler() @safe nothrow { | ||
return ioScheduler; | ||
} | ||
|
||
mixin ForwardExtensionPoints!receiver; | ||
} | ||
|
||
struct WithIOSchedulerSender(Sender, IOScheduler) if (models!(Sender, isSender)) { | ||
alias Value = Sender.Value; | ||
Sender sender; | ||
IOScheduler ioScheduler; | ||
auto connect(Receiver)(return Receiver receiver) @safe return scope { | ||
alias R = WithIOSchedulerReceiver!(Receiver, Sender.Value, IOScheduler); | ||
// ensure NRVO | ||
auto op = sender.connect(R(receiver, ioScheduler)); | ||
return op; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.