From bf26359f39aa4dbb26cac6e28cc32e512eb3a607 Mon Sep 17 00:00:00 2001 From: Habib Date: Tue, 23 Oct 2018 07:45:39 +0200 Subject: [PATCH] feat: Add a logical clock to all the messages created within an actor --- src/Actor.ts | 10 ++++++++++ src/ActorSystem.ts | 50 ++++++++++++++++++++++++++++------------------ src/interfaces.ts | 1 + 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/Actor.ts b/src/Actor.ts index c15a1f4..3bee47e 100644 --- a/src/Actor.ts +++ b/src/Actor.ts @@ -6,6 +6,7 @@ export type MailBoxMessage = { type: ValidActorMethodPropNames; payload: PayloadPropNames[]; senderAddress: Address | null; + id: number; callback: (error?: any, result?: any) => void; }; @@ -71,12 +72,14 @@ function createProxy( targetAddressorActorRef, prop as any, sender || null, + sender ? actorSystem._getMessageId(sender) : 0, ...payload ) : actorSystem.sendMessage( targetAddressorActorRef, prop as any, sender || null, + sender ? actorSystem._getMessageId(sender) : 0, ...payload ); }; @@ -111,6 +114,7 @@ export abstract class Actor { private timerId: any | null; private currentPromise: Promise | CancellablePromise | undefined; + private _currentMessageId = 0; constructor( name: string, @@ -126,9 +130,14 @@ export abstract class Actor { }); } + get currentMessageId() { + return this._currentMessageId++; + } + pushToMailbox = , L extends PayloadPropNames>( type: K, senderAddress: Address | null, + id: number, ...payload: L[] ): Promise => { this.log( @@ -154,6 +163,7 @@ export abstract class Actor { type, payload, senderAddress, + id, callback: (error, result) => { if (error) { reject(error); diff --git a/src/ActorSystem.ts b/src/ActorSystem.ts index 99f6edc..505732d 100644 --- a/src/ActorSystem.ts +++ b/src/ActorSystem.ts @@ -62,19 +62,19 @@ export class ActorSystem { const actorRef = this.findActor(interActorSystemMessage.targetAddress); this.log("The destination address is", interActorSystemMessage.targetAddress); if (actorRef) { - const { mode, type, payload, senderAddress } = interActorSystemMessage; + const { mode, type, payload, id, senderAddress } = interActorSystemMessage; if (mode === "send") { this.log( `Sending the message to the appropriate actor. Type: ${type}, sender: ${senderAddress}, and payload:`, payload ); - this.sendMessageAndWait(actorRef, type, senderAddress, ...payload); + this.sendMessageAndWait(actorRef, type, senderAddress, id, ...payload); } else { this.log( `Sending the question to the appropriate actor. Type: ${type}, sender: ${senderAddress}, and payload:`, payload ); - this.sendMessageAndWait(actorRef, type, senderAddress, ...payload).then( + this.sendMessageAndWait(actorRef, type, senderAddress, id, ...payload).then( message => { this.log( `Received an answer, sending the answer "${message}" for the question with type: ${type}, sender: ${senderAddress}, and payload:`, @@ -111,12 +111,7 @@ export class ActorSystem { }; removeActor = (refOrAddress: ActorRef | Address) => { - let address: Address; - if (refOrAddress instanceof ActorRef) { - address = refOrAddress.address; - } else { - address = refOrAddress; - } + const address = addressOf(refOrAddress); if (address.actorSystemName !== this.name) { throw new Error("Cannot remove actor that does not belong to this actor system"); @@ -130,8 +125,12 @@ export class ActorSystem { } }; - ref = (address: Address): ActorRef => { - return new ActorRef(address, this); + ref = (addressOrActorRef: Address | ActorRef): ActorRef => { + if (addressOrActorRef instanceof ActorRef) { + return addressOrActorRef; + } else { + return new ActorRef(addressOrActorRef, this); + } }; findActor = (address: Address): ActorRef | null => { @@ -153,9 +152,10 @@ export class ActorSystem { target: ActorRef | Address, type: string, senderAddress: Address | null, + id: number, ...payload: any[] ): void => { - this.sendMessageAndWait(target, type, senderAddress, ...payload).then( + this.sendMessageAndWait(target, type, senderAddress, id, ...payload).then( () => { /* do nothing */ }, @@ -179,6 +179,7 @@ export class ActorSystem { target: ActorRef | Address, type: string, senderAddress: Address | null, + id: number, ...payload: any[] ): Promise => { this.log( @@ -190,18 +191,13 @@ export class ActorSystem { "Payload", payload ); - let address: Address; - if (target instanceof ActorRef) { - address = target.address; - } else { - address = target; - } + const address = addressOf(target); if (this.isLocalAddress(address)) { const actor = this.actorRegistry[address.localAddress]; if (actor) { this.log("Found the actor. Sending the message"); - return actor.pushToMailbox(type, senderAddress, ...payload); + return actor.pushToMailbox(type, senderAddress, id, ...payload); } else { this.log("Unable to find the actor. It might have died"); return Promise.reject("Actor not found"); @@ -218,6 +214,7 @@ export class ActorSystem { targetAddress: address, senderAddress: senderAddress, type: type, + id: id, payload: payload }, message => resolve(message) @@ -234,6 +231,11 @@ export class ActorSystem { return address.actorSystemName === this.name; } + _getMessageId(addressOrActorRef: Address | ActorRef) { + const address = addressOf(addressOrActorRef); + const actor = this.actorRegistry[address.localAddress]; + return actor.currentMessageId; + } private log(...message: any[]) { if ( (process.env && process.env.ACTRIX_DEBUG) || @@ -243,3 +245,13 @@ export class ActorSystem { } } } + +function addressOf(addressOrActorRef: Address | ActorRef) { + let address: Address; + if (addressOrActorRef instanceof ActorRef) { + address = addressOrActorRef.address; + } else { + address = addressOrActorRef; + } + return address; +} diff --git a/src/interfaces.ts b/src/interfaces.ts index 946865b..ebe23f0 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -31,6 +31,7 @@ export type InterActorSystemMessage = mode: "send" | "ask"; // 'send' is probably no longer needed type: string; payload: any[]; + id: number; targetAddress: Address; senderAddress: Address | null; }