diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts index f48b38684e34..59ef2aaffe99 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts @@ -247,12 +247,13 @@ export class SendMessageInApp extends SendMessageBase { ); await this.webSocketsQueueService.bullMqService.add( - 'sendMessage-received-' + message._id, + `sendMessage-received-${message._id}`, { event: WebSocketEventEnum.RECEIVED, userId: command._subscriberId, + _environmentId: command.environmentId, payload: { - message, + messageId: message._id, }, }, {}, @@ -260,7 +261,7 @@ export class SendMessageInApp extends SendMessageBase { ); await this.webSocketsQueueService.bullMqService.add( - 'sendMessage-unseen-' + message._id, + `sendMessage-unseen-${message._id}`, { event: WebSocketEventEnum.UNSEEN, userId: command._subscriberId, @@ -271,7 +272,7 @@ export class SendMessageInApp extends SendMessageBase { ); await this.webSocketsQueueService.bullMqService.add( - 'sendMessage-unread-' + message._id, + `sendMessage-unread-${message._id}`, { event: WebSocketEventEnum.UNREAD, userId: command._subscriberId, diff --git a/apps/ws/package.json b/apps/ws/package.json index 260f4c2f51a0..50946b414732 100644 --- a/apps/ws/package.json +++ b/apps/ws/package.json @@ -16,7 +16,7 @@ "start:debug": "nest start --debug --watch", "start:prod": "node dist/src/main", "lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix", - "test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --timeout 10000 --require ts-node/register --exit --file e2e/setup.ts 'src/**/**/*.spec.ts'" + "test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --timeout 10000 --require ts-node/register --exit --file e2e/setup.ts './src/**/*.spec.ts'" }, "dependencies": { "@godaddy/terminus": "^4.3.1", diff --git a/apps/ws/src/.env.test b/apps/ws/src/.env.test index 5cfd29236825..35a085e17042 100644 --- a/apps/ws/src/.env.test +++ b/apps/ws/src/.env.test @@ -9,3 +9,5 @@ NODE_ENV=test GLOBAL_CONTEXT_PATH= WS_CONTEXT_PATH= + +LOGGING_LEVEL=error diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.command.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.command.ts index 9b81b73613bf..a5ea694c024d 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.command.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.command.ts @@ -1,6 +1,7 @@ import { IsDefined, IsOptional, IsString } from 'class-validator'; import { BaseCommand } from '@novu/application-generic'; +import { MessageEntity } from '@novu/dal'; export class ExternalServicesRouteCommand extends BaseCommand { @IsDefined() @@ -12,7 +13,16 @@ export class ExternalServicesRouteCommand extends BaseCommand { event: string; @IsOptional() - payload: Record; + payload?: { + /* + * TODO: We shouldn't import DAL here but this is temporary as we will remove + * the ability of send full message + */ + message?: MessageEntity; + messageId?: string; + unreadCount?: number; + unseenCount?: number; + }; @IsString() @IsOptional() diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts index c6d74b4a60ce..4c3350121fc0 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts @@ -1,17 +1,25 @@ import * as sinon from 'sinon'; -import { MessageRepository } from '@novu/dal'; +import { EnvironmentRepository, MessageRepository, UserRepository } from '@novu/dal'; import { WebSocketEventEnum } from '@novu/shared'; import { ExternalServicesRoute } from './external-services-route.usecase'; import { ExternalServicesRouteCommand } from './external-services-route.command'; import { WSGateway } from '../../ws.gateway'; +const environmentId = EnvironmentRepository.createObjectId(); +const userId = UserRepository.createObjectId(); + describe('ExternalServicesRoute', () => { let externalServicesRoute: ExternalServicesRoute; let wsGatewayStub; let messageRepository: MessageRepository; + let findByIdStub: sinon.Stub; + let getCountStub: sinon.Stub; beforeEach(() => { + findByIdStub = sinon.stub(MessageRepository.prototype, 'findById'); + getCountStub = sinon.stub(MessageRepository.prototype, 'getCount'); + wsGatewayStub = { sendMessage: sinon.stub(), server: { @@ -24,225 +32,215 @@ describe('ExternalServicesRoute', () => { } as WSGateway; messageRepository = new MessageRepository(); - externalServicesRoute = new ExternalServicesRoute(wsGatewayStub, messageRepository); }); + afterEach(() => { + findByIdStub.restore(); + getCountStub.restore(); + }); + it('should send unseen count change when event is "unseen_count_changed"', async () => { - const messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(5)); + getCountStub.resolves(Promise.resolve(5)); await externalServicesRoute.execute( ExternalServicesRouteCommand.create({ event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: {}, }) ); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNSEEN, { unseenCount: 5, hasMore: false, }); - - messageRepositoryStub.restore(); }); it('should send unread count change when event is "unread_count_changed"', async () => { - const messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); + getCountStub.resolves(Promise.resolve(10)); await externalServicesRoute.execute( ExternalServicesRouteCommand.create({ event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: {}, }) ); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNREAD, { unreadCount: 10, hasMore: false, }); - - messageRepositoryStub.restore(); }); it('should send general message when event is neither "unseen_count_changed" nor "unread_count_changed"', async () => { - const messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); + const messageId = MessageRepository.createObjectId(); + + findByIdStub.resolves(Promise.resolve({ _id: messageId })); const command: ExternalServicesRouteCommand = { event: WebSocketEventEnum.RECEIVED, - userId: 'userId', - payload: { data: 'payloadData' }, + userId, + payload: { messageId }, }; await externalServicesRoute.execute(command); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, 'userId', WebSocketEventEnum.RECEIVED, { - data: 'payloadData', + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.RECEIVED, { + message: { + _id: messageId, + }, }); - - messageRepositoryStub.restore(); }); it('should skip getCount query if unseen count provided', async () => { - let messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); + getCountStub.resolves(Promise.resolve(10)); + let command: ExternalServicesRouteCommand = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unseenCount: 5 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNSEEN, { unseenCount: 5, hasMore: false, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); command = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', - payload: { unseenCount: '4' }, + userId, + _environmentId: environmentId, + payload: { unseenCount: 4 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNSEEN, { unseenCount: 4, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(20)); + getCountStub.resolves(Promise.resolve(20)); command = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', - } as any; + userId, + _environmentId: environmentId, + }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNSEEN, { unseenCount: 20, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(21)); + getCountStub.resolves(Promise.resolve(21)); command = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', - payload: { unseenCount: null }, + userId, + _environmentId: environmentId, + payload: { unseenCount: undefined }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNSEEN, { unseenCount: 21, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(22)); + getCountStub.resolves(Promise.resolve(22)); command = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unseenCount: undefined }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNSEEN, { unseenCount: 22, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(23)); + getCountStub.resolves(Promise.resolve(23)); command = { event: WebSocketEventEnum.UNSEEN, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unseenCount: 0 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), 'userId', WebSocketEventEnum.UNSEEN, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNSEEN, { unseenCount: 0, }); - messageRepositoryStub.restore(); }); it('should skip getCount query if unread count provided', async () => { - let messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); + getCountStub.resolves(Promise.resolve(10)); + let command: ExternalServicesRouteCommand = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unreadCount: 5 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNREAD, { unreadCount: 5, hasMore: false, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(10)); command = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', - payload: { unreadCount: '4' }, + userId, + _environmentId: environmentId, + payload: { unreadCount: 4 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNREAD, { unreadCount: 4, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(20)); + getCountStub.resolves(Promise.resolve(20)); command = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', - } as any; + userId, + _environmentId: environmentId, + }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNREAD, { unreadCount: 20, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(21)); + getCountStub.resolves(Promise.resolve(21)); command = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', - payload: { unreadCount: null }, + userId, + _environmentId: environmentId, + payload: { unreadCount: undefined }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNREAD, { unreadCount: 21, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(22)); + getCountStub.resolves(Promise.resolve(22)); command = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unreadCount: undefined }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNREAD, { unreadCount: 22, }); - messageRepositoryStub.restore(); - messageRepositoryStub = sinon.stub(MessageRepository.prototype, 'getCount').resolves(Promise.resolve(23)); + getCountStub.resolves(Promise.resolve(23)); command = { event: WebSocketEventEnum.UNREAD, - userId: 'userId', - _environmentId: 'envId', + userId, + _environmentId: environmentId, payload: { unreadCount: 0 }, }; await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), 'userId', WebSocketEventEnum.UNREAD, { + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNREAD, { unreadCount: 0, }); - messageRepositoryStub.restore(); }); }); diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts index 6943107b6410..cbe31b3e8112 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { MessageRepository } from '@novu/dal'; import { ChannelTypeEnum, WebSocketEventEnum } from '@novu/shared'; @@ -8,6 +8,8 @@ import { ExternalServicesRouteCommand } from './external-services-route.command' import { WSGateway } from '../../ws.gateway'; import { IUnreadCountPaginationIndication, IUnseenCountPaginationIndication } from './types'; +const LOG_CONTEXT = 'ExternalServicesRoute'; + @Injectable() export class ExternalServicesRoute { public readonly bullMqService: BullMqService; @@ -28,7 +30,25 @@ export class ExternalServicesRoute { return; } - await this.wsGateway.sendMessage(command.userId, command.event, command.payload); + if (command.event === WebSocketEventEnum.RECEIVED) { + // TODO: Retro-compatibility for a bit just in case stalled messages + if (command.payload?.message) { + Logger.verbose('Sending full message in the payload', LOG_CONTEXT); + await this.wsGateway.sendMessage(command.userId, command.event, command.payload); + + return; + } + + // Now we will only send the messageId in the event to reduce RAM consumption in-memory + const messageId = command.payload?.messageId; + if (messageId) { + Logger.verbose('Sending messageId in the payload, we need to retrieve the full message', LOG_CONTEXT); + const message = await this.messageRepository.findById(messageId); + await this.wsGateway.sendMessage(command.userId, command.event, { message }); + + return; + } + } } private async sendUnreadCountChange(command: ExternalServicesRouteCommand) { diff --git a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts index ecc60e201823..9023e308b51d 100644 --- a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts +++ b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts @@ -252,13 +252,6 @@ export class InMemoryProviderService { const { getClient, getConfig, isClientReady } = getClientAndConfig(); - console.log( - getClient(), - getConfig(), - isClientReady(this.provider), - LOG_CONTEXT - ); - this.isProviderClientReady = isClientReady; this.inMemoryProviderConfig = getConfig(); const { host, port, ttl } = getConfig();