Skip to content

Commit

Permalink
Merge pull request #10 from ncthbrt/persistent-querying
Browse files Browse the repository at this point in the history
Added persistent queries
  • Loading branch information
ncthbrt authored May 19, 2018
2 parents e72ae15 + 90296cd commit 7e12da2
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 27 deletions.
1 change: 1 addition & 0 deletions .bsb.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
33093
109 changes: 109 additions & 0 deletions __tests__/Actor_Test.re
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ module StringMap = Map.Make(StringCompare);
external createMockPersistenceEngine : unit => persistenceEngine =
"MockPersistenceEngine";

[@bs.module "nact/test/mock-persistence-engine"] [@bs.new]
external createPersistenceEngineWithData : 'a => persistenceEngine =
"MockPersistenceEngine";

let toPersistentEvents = (key, events) =>
events
|> Belt.Array.mapWithIndex(_, (i, e) =>
{"data": e, "sequenceNumber": i + 1, "key": key, "createdAt": i}
);

let (?:) = v => resolve(v);

let (>=>) = (promise1, promise2) => then_(promise2, promise1);
Expand Down Expand Up @@ -541,6 +551,105 @@ describe("Persistent Actor", () => {
});
});

describe("Persistent Query", () => {
testPromise("correctly replays events", () => {
let system =
start(
~persistenceEngine=
createPersistenceEngineWithData(
Js.Dict.fromList([
(
"calculator",
toPersistentEvents(
"calculator",
[|`Add(10), `Subtract(2), `Add(3)|],
),
),
]),
),
(),
);
let query =
persistentQuery(
~key="calculator",
system,
(total, msg) =>
resolve(
switch (msg) {
| `Add(number) => total + number
| `Subtract(number) => total - number
},
),
0,
);
query() >=> (result => expect(result) |> toEqual(11) |> resolve);
});
testPromise("can specify a custom decoder", () => {
let decoder = json =>
switch (json |> unsafeDecoder) {
| `Add(number) => `Add(number * 2)
| x => x
};
let system =
start(
~persistenceEngine=
createPersistenceEngineWithData(
Js.Dict.fromList([
(
"calculator",
toPersistentEvents(
"calculator",
[|`Add(10), `Subtract(2), `Add(3)|],
),
),
]),
),
(),
);
let query =
persistentQuery(
~key="calculator",
~decoder,
~encoder=id => Obj.magic(id),
system,
total =>
fun
| `Add(number) => resolve(total + number)
| `Subtract(number) => resolve(total - number),
0,
);
query() >=> (result => expect(result) |> toEqual(24) |> resolve);
});
testPromise("rejects after throwing an exception", () => {
let system =
start(
~persistenceEngine=
createPersistenceEngineWithData(
Js.Dict.fromList([
(
"calculator",
toPersistentEvents(
"calculator",
[|`Add(10), `Subtract(2), `Add(3)|],
),
),
]),
),
(),
);
let query =
persistentQuery(
~key="calculator",
system,
((), _) => raise(TragicException),
(),
);
query()
>=> (() => resolve(fail("This should have thrown")))
>/=> ((_) => resolve(pass));
});
});

describe("useStatefulSupervisionPolicy", () =>
testPromise("can be used to construct stateful supervision policies", () => {
let system = start();
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "reason-nact",
"version": "4.0.1",
"version": "4.1.0",
"description": "let reason-nact = (node.js, reason, actors) ⇒ your µ services have never been so typed",
"repository": "https://github.com/ncthbrt/reason-nact",
"author": "Nick Cuthbert (https://github.com/ncthbrt)",
Expand Down Expand Up @@ -44,6 +44,6 @@
"nyc": "^11.3.0"
},
"dependencies": {
"nact": "^7.0.2"
"nact": "^7.1.2"
}
}
44 changes: 40 additions & 4 deletions src/Nact.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ function useStatefulSupervisionPolicy(f, initialState) {

function spawn(name, shutdownAfter, onCrash, param, func, initialState) {
var options = {
initialState: initialState,
shutdownAfter: Js_null_undefined.fromOption(shutdownAfter),
onCrash: mapSupervisionFunction(onCrash)
};
var f = function (possibleState, msg, ctx) {
var state = Belt_Option.getWithDefault((possibleState == null) ? /* None */0 : [possibleState], initialState);
var f = function (state, msg, ctx) {
try {
return Curry._3(func, state, msg, mapCtx(ctx));
}
Expand All @@ -193,6 +193,7 @@ function spawn(name, shutdownAfter, onCrash, param, func, initialState) {
function spawnStateless(name, shutdownAfter, param, func) {
var options = {
shutdownAfter: Js_null_undefined.fromOption(shutdownAfter),
initialState: undefined,
onCrash: undefined
};
var f = function (msg, ctx) {
Expand Down Expand Up @@ -221,6 +222,7 @@ function spawnPersistent(key, name, shutdownAfter, snapshotEvery, onCrash, decod
return unsafeEncoder(prim);
}));
var options = {
initialState: initialState,
shutdownAfter: Js_null_undefined.fromOption(shutdownAfter),
onCrash: mapSupervisionFunction(onCrash),
snapshotEvery: Js_null_undefined.fromOption(snapshotEvery),
Expand All @@ -230,9 +232,8 @@ function spawnPersistent(key, name, shutdownAfter, snapshotEvery, onCrash, decod
snapshotDecoder: stateDecoder$1
};
var f = function (state, msg, ctx) {
var state$1 = (state == null) ? initialState : state;
try {
return Curry._3(func, state$1, msg, mapPersistentCtx(ctx));
return Curry._3(func, state, msg, mapPersistentCtx(ctx));
}
catch (raw_err){
return Promise.reject(Js_exn.internalToOCamlException(raw_err));
Expand All @@ -242,6 +243,40 @@ function spawnPersistent(key, name, shutdownAfter, snapshotEvery, onCrash, decod
return /* ActorRef */[untypedRef];
}

function persistentQuery(key, snapshotKey, cacheDuration, snapshotEvery, decoder, stateDecoder, encoder, stateEncoder, param, func, initialState) {
var decoder$1 = Belt_Option.getWithDefault(decoder, (function (prim) {
return unsafeDecoder(prim);
}));
var stateDecoder$1 = Belt_Option.getWithDefault(stateDecoder, (function (prim) {
return unsafeDecoder(prim);
}));
var stateEncoder$1 = Belt_Option.getWithDefault(stateEncoder, (function (prim) {
return unsafeEncoder(prim);
}));
var encoder$1 = Belt_Option.getWithDefault(encoder, (function (prim) {
return unsafeEncoder(prim);
}));
var options = {
initialState: initialState,
cacheDuration: Js_null_undefined.fromOption(cacheDuration),
snapshotEvery: Js_null_undefined.fromOption(snapshotEvery),
snapshotKey: Js_null_undefined.fromOption(snapshotKey),
encoder: encoder$1,
decoder: decoder$1,
snapshotEncoder: stateEncoder$1,
snapshotDecoder: stateDecoder$1
};
var f = function (state, msg) {
try {
return Curry._2(func, state, msg);
}
catch (raw_err){
return Promise.reject(Js_exn.internalToOCamlException(raw_err));
}
};
return Nact.persistentQuery(param[0], f, key, options);
}

function stop(param) {
Nact.stop(param[0]);
return /* () */0;
Expand Down Expand Up @@ -355,6 +390,7 @@ exports.useStatefulSupervisionPolicy = useStatefulSupervisionPolicy;
exports.spawn = spawn;
exports.spawnStateless = spawnStateless;
exports.spawnPersistent = spawnPersistent;
exports.persistentQuery = persistentQuery;
exports.spawnAdapter = spawnAdapter;
exports.start = start;
exports.stop = stop;
Expand Down
57 changes: 44 additions & 13 deletions src/Nact.re
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ type statelessActor('msg, 'parentMsg) =
type persistentActor('state, 'msg, 'parentMsg) =
('state, 'msg, persistentCtx('msg, 'parentMsg)) => Js.Promise.t('state);

type persistentQuery('state) = unit => Js.Promise.t('state);

let useStatefulSupervisionPolicy = (f, initialState) => {
let state = ref(initialState);
(msg, err, ctx) => {
Expand All @@ -201,25 +203,22 @@ let spawn =
initialState,
) => {
let options = {
"initialState": Js.Nullable.return(initialState),
"shutdownAfter": fromOption(shutdownAfter),
"onCrash": mapSupervisionFunction(onCrash),
};
let f = (possibleState: Js.nullable('state), msg: 'msg, ctx) => {
let state =
possibleState
|. Js.Nullable.toOption
|. Belt.Option.getWithDefault(initialState);
let f = (state, msg: 'msg, ctx) =>
try (func(state, msg, mapCtx(ctx))) {
| err => reject(err)
};
};
let untypedRef = Nact_bindings.spawn(parent, f, fromOption(name), options);
ActorRef(untypedRef);
};

let spawnStateless = (~name=?, ~shutdownAfter=?, ActorRef(parent), func) => {
let options = {
"shutdownAfter": fromOption(shutdownAfter),
"initialState": Js.Nullable.undefined,
"onCrash": mapSupervisionFunction(None),
};
let f = (msg, ctx) =>
Expand Down Expand Up @@ -253,6 +252,7 @@ let spawnPersistent =
stateEncoder |. Belt.Option.getWithDefault(unsafeEncoder);
let encoder = encoder |. Belt.Option.getWithDefault(unsafeEncoder);
let options: Nact_bindings.persistentActorOptions('msg, 'parentMsg, 'state) = {
"initialState": initialState,
"shutdownAfter": fromOption(shutdownAfter),
"onCrash": mapSupervisionFunction(onCrash),
"snapshotEvery": fromOption(snapshotEvery),
Expand All @@ -261,21 +261,52 @@ let spawnPersistent =
"snapshotEncoder": stateEncoder,
"snapshotDecoder": stateDecoder,
};
let f = (state, msg, ctx) => {
let state =
switch (Js.Nullable.toOption(state)) {
| None => initialState
| Some(state) => state
};
let f = (state, msg, ctx) =>
try (func(state, msg, mapPersistentCtx(ctx))) {
| err => reject(err)
};
};
let untypedRef =
Nact_bindings.spawnPersistent(parent, f, key, fromOption(name), options);
ActorRef(untypedRef);
};

let persistentQuery =
(
~key,
~snapshotKey=?,
~cacheDuration=?,
~snapshotEvery=?,
~decoder=?,
~stateDecoder=?,
~encoder=?,
~stateEncoder=?,
ActorRef(actor),
func,
initialState,
) => {
let decoder = decoder |. Belt.Option.getWithDefault(unsafeDecoder);
let stateDecoder =
stateDecoder |. Belt.Option.getWithDefault(unsafeDecoder);
let stateEncoder =
stateEncoder |. Belt.Option.getWithDefault(unsafeEncoder);
let encoder = encoder |. Belt.Option.getWithDefault(unsafeEncoder);
let options: Nact_bindings.persistentQueryOptions('msg, 'state) = {
"initialState": initialState,
"cacheDuration": fromOption(cacheDuration),
"snapshotEvery": fromOption(snapshotEvery),
"snapshotKey": fromOption(snapshotKey),
"encoder": encoder,
"decoder": decoder,
"snapshotEncoder": stateEncoder,
"snapshotDecoder": stateDecoder,
};
let f = (state, msg) =>
try (func(state, msg)) {
| err => reject(err)
};
Nact_bindings.persistentQuery(actor, f, key, options);
};

let stop = (ActorRef(reference)) => Nact_bindings.stop(reference);

let dispatch = (ActorRef(recipient), msg) =>
Expand Down
18 changes: 18 additions & 0 deletions src/Nact.rei
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ let spawnPersistent:
) =>
actorRef('msg);

type persistentQuery('state) = unit => Js.Promise.t('state);

let persistentQuery:
(
~key: string,
~snapshotKey: string=?,
~cacheDuration: int=?,
~snapshotEvery: int=?,
~decoder: decoder('msg)=?,
~stateDecoder: decoder('state)=?,
~encoder: encoder('msg)=?,
~stateEncoder: encoder('state)=?,
actorRef('parent),
('state, 'msg) => Js.Promise.t('state),
'state
) =>
persistentQuery('state);

let spawnAdapter:
(~name: string=?, actorRef('parentMsg), 'msg => 'parentMsg) =>
actorRef('msg);
Expand Down
Loading

0 comments on commit 7e12da2

Please sign in to comment.