Skip to content

Commit

Permalink
Merge pull request #9 from ismailhabib/logical-clock
Browse files Browse the repository at this point in the history
feat: Add a logical clock to all the messages created within an actor
  • Loading branch information
ismailhabib authored Oct 23, 2018
2 parents 4895476 + bf26359 commit 316ddce
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
10 changes: 10 additions & 0 deletions src/Actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export type MailBoxMessage<T> = {
type: ValidActorMethodPropNames<T>;
payload: PayloadPropNames<T>[];
senderAddress: Address | null;
id: number;
callback: (error?: any, result?: any) => void;
};

Expand Down Expand Up @@ -71,12 +72,14 @@ function createProxy<T>(
targetAddressorActorRef,
prop as any,
sender || null,
sender ? actorSystem._getMessageId(sender) : 0,
...payload
)
: actorSystem.sendMessage(
targetAddressorActorRef,
prop as any,
sender || null,
sender ? actorSystem._getMessageId(sender) : 0,
...payload
);
};
Expand Down Expand Up @@ -111,6 +114,7 @@ export abstract class Actor<InitParam = undefined> {

private timerId: any | null;
private currentPromise: Promise<any> | CancellablePromise<any> | undefined;
private _currentMessageId = 0;

constructor(
name: string,
Expand All @@ -126,9 +130,14 @@ export abstract class Actor<InitParam = undefined> {
});
}

get currentMessageId() {
return this._currentMessageId++;
}

pushToMailbox = <K extends ValidActorMethodPropNames<this>, L extends PayloadPropNames<this>>(
type: K,
senderAddress: Address | null,
id: number,
...payload: L[]
): Promise<any> => {
this.log(
Expand All @@ -154,6 +163,7 @@ export abstract class Actor<InitParam = undefined> {
type,
payload,
senderAddress,
id,
callback: (error, result) => {
if (error) {
reject(error);
Expand Down
50 changes: 31 additions & 19 deletions src/ActorSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ export class ActorSystem {
const actorRef = this.findActor(interActorSystemMessage.targetAddress);
this.log("The destination address is", interActorSystemMessage.targetAddress);
if (actorRef) {
const { mode, type, payload, senderAddress } = interActorSystemMessage;
const { mode, type, payload, id, senderAddress } = interActorSystemMessage;
if (mode === "send") {
this.log(
`Sending the message to the appropriate actor. Type: ${type}, sender: ${senderAddress}, and payload:`,
payload
);
this.sendMessageAndWait(actorRef, type, senderAddress, ...payload);
this.sendMessageAndWait(actorRef, type, senderAddress, id, ...payload);
} else {
this.log(
`Sending the question to the appropriate actor. Type: ${type}, sender: ${senderAddress}, and payload:`,
payload
);
this.sendMessageAndWait(actorRef, type, senderAddress, ...payload).then(
this.sendMessageAndWait(actorRef, type, senderAddress, id, ...payload).then(
message => {
this.log(
`Received an answer, sending the answer "${message}" for the question with type: ${type}, sender: ${senderAddress}, and payload:`,
Expand Down Expand Up @@ -111,12 +111,7 @@ export class ActorSystem {
};

removeActor = (refOrAddress: ActorRef<any> | Address) => {
let address: Address;
if (refOrAddress instanceof ActorRef) {
address = refOrAddress.address;
} else {
address = refOrAddress;
}
const address = addressOf(refOrAddress);

if (address.actorSystemName !== this.name) {
throw new Error("Cannot remove actor that does not belong to this actor system");
Expand All @@ -130,8 +125,12 @@ export class ActorSystem {
}
};

ref = <T>(address: Address): ActorRef<T> => {
return new ActorRef<T>(address, this);
ref = <T>(addressOrActorRef: Address | ActorRef<T>): ActorRef<T> => {
if (addressOrActorRef instanceof ActorRef) {
return addressOrActorRef;
} else {
return new ActorRef<T>(addressOrActorRef, this);
}
};

findActor = <T>(address: Address): ActorRef<T> | null => {
Expand All @@ -153,9 +152,10 @@ export class ActorSystem {
target: ActorRef<any> | Address,
type: string,
senderAddress: Address | null,
id: number,
...payload: any[]
): void => {
this.sendMessageAndWait(target, type, senderAddress, ...payload).then(
this.sendMessageAndWait(target, type, senderAddress, id, ...payload).then(
() => {
/* do nothing */
},
Expand All @@ -179,6 +179,7 @@ export class ActorSystem {
target: ActorRef<any> | Address,
type: string,
senderAddress: Address | null,
id: number,
...payload: any[]
): Promise<any> => {
this.log(
Expand All @@ -190,18 +191,13 @@ export class ActorSystem {
"Payload",
payload
);
let address: Address;
if (target instanceof ActorRef) {
address = target.address;
} else {
address = target;
}
const address = addressOf(target);

if (this.isLocalAddress(address)) {
const actor = this.actorRegistry[address.localAddress];
if (actor) {
this.log("Found the actor. Sending the message");
return actor.pushToMailbox(type, senderAddress, ...payload);
return actor.pushToMailbox(type, senderAddress, id, ...payload);
} else {
this.log("Unable to find the actor. It might have died");
return Promise.reject("Actor not found");
Expand All @@ -218,6 +214,7 @@ export class ActorSystem {
targetAddress: address,
senderAddress: senderAddress,
type: type,
id: id,
payload: payload
},
message => resolve(message)
Expand All @@ -234,6 +231,11 @@ export class ActorSystem {
return address.actorSystemName === this.name;
}

_getMessageId(addressOrActorRef: Address | ActorRef<any>) {
const address = addressOf(addressOrActorRef);
const actor = this.actorRegistry[address.localAddress];
return actor.currentMessageId;
}
private log(...message: any[]) {
if (
(process.env && process.env.ACTRIX_DEBUG) ||
Expand All @@ -243,3 +245,13 @@ export class ActorSystem {
}
}
}

function addressOf(addressOrActorRef: Address | ActorRef<any>) {
let address: Address;
if (addressOrActorRef instanceof ActorRef) {
address = addressOrActorRef.address;
} else {
address = addressOrActorRef;
}
return address;
}
1 change: 1 addition & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type InterActorSystemMessage =
mode: "send" | "ask"; // 'send' is probably no longer needed
type: string;
payload: any[];
id: number;
targetAddress: Address;
senderAddress: Address | null;
}
Expand Down

0 comments on commit 316ddce

Please sign in to comment.