diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts index 59ef2aaffe99..1184e0981565 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts @@ -247,7 +247,7 @@ export class SendMessageInApp extends SendMessageBase { ); await this.webSocketsQueueService.bullMqService.add( - `sendMessage-received-${message._id}`, + message._id, { event: WebSocketEventEnum.RECEIVED, userId: command._subscriberId, @@ -260,28 +260,6 @@ export class SendMessageInApp extends SendMessageBase { command.organizationId ); - await this.webSocketsQueueService.bullMqService.add( - `sendMessage-unseen-${message._id}`, - { - event: WebSocketEventEnum.UNSEEN, - userId: command._subscriberId, - _environmentId: command.environmentId, - }, - {}, - command.organizationId - ); - - await this.webSocketsQueueService.bullMqService.add( - `sendMessage-unread-${message._id}`, - { - event: WebSocketEventEnum.UNREAD, - userId: command._subscriberId, - _environmentId: command.environmentId, - }, - {}, - command.organizationId - ); - await this.createExecutionDetails.execute( CreateExecutionDetailsCommand.create({ ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts index 4c3350121fc0..c8b1a54d77bd 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.spec.ts @@ -1,5 +1,5 @@ import * as sinon from 'sinon'; -import { EnvironmentRepository, MessageRepository, UserRepository } from '@novu/dal'; +import { EnvironmentRepository, MessageEntity, MessageRepository, UserRepository } from '@novu/dal'; import { WebSocketEventEnum } from '@novu/shared'; import { ExternalServicesRoute } from './external-services-route.usecase'; @@ -7,32 +7,45 @@ import { ExternalServicesRouteCommand } from './external-services-route.command' import { WSGateway } from '../../ws.gateway'; const environmentId = EnvironmentRepository.createObjectId(); +const messageId = 'message-id-1'; const userId = UserRepository.createObjectId(); +const commandReceivedMessage = ExternalServicesRouteCommand.create({ + event: WebSocketEventEnum.RECEIVED, + userId, + _environmentId: environmentId, + payload: { + message: { + _id: messageId, + _environmentId: environmentId, + // etc... + } as MessageEntity, + }, +}); + +const createWsGatewayStub = (result) => { + return { + sendMessage: sinon.stub(), + server: { + sockets: { + in: sinon.stub().returns({ + fetchSockets: sinon.stub().resolves(result), + }), + }, + }, + } as WSGateway; +}; + describe('ExternalServicesRoute', () => { let externalServicesRoute: ExternalServicesRoute; let wsGatewayStub; - let messageRepository: MessageRepository; let findByIdStub: sinon.Stub; let getCountStub: sinon.Stub; + const messageRepository = new MessageRepository(); beforeEach(() => { findByIdStub = sinon.stub(MessageRepository.prototype, 'findById'); getCountStub = sinon.stub(MessageRepository.prototype, 'getCount'); - - wsGatewayStub = { - sendMessage: sinon.stub(), - server: { - sockets: { - in: sinon.stub().returns({ - fetchSockets: sinon.stub().resolves([{ id: 'socketId' }]), - }), - }, - }, - } as WSGateway; - - messageRepository = new MessageRepository(); - externalServicesRoute = new ExternalServicesRoute(wsGatewayStub, messageRepository); }); afterEach(() => { @@ -40,207 +53,196 @@ describe('ExternalServicesRoute', () => { getCountStub.restore(); }); - it('should send unseen count change when event is "unseen_count_changed"', async () => { - getCountStub.resolves(Promise.resolve(5)); - - await externalServicesRoute.execute( - ExternalServicesRouteCommand.create({ - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: {}, - }) - ); - - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 5, - hasMore: false, + describe('User is not online', () => { + beforeEach(() => { + wsGatewayStub = createWsGatewayStub([]); + externalServicesRoute = new ExternalServicesRoute(wsGatewayStub, messageRepository); }); - }); - it('should send unread count change when event is "unread_count_changed"', async () => { - getCountStub.resolves(Promise.resolve(10)); + it('should not send any message to the web socket if user is not online', async () => { + getCountStub.resolves(Promise.resolve(5)); - await externalServicesRoute.execute( - ExternalServicesRouteCommand.create({ - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: {}, - }) - ); + await externalServicesRoute.execute(commandReceivedMessage); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNREAD, { - unreadCount: 10, - hasMore: false, + sinon.assert.calledOnceWithExactly(wsGatewayStub.server.sockets.in, userId); + sinon.assert.calledOnceWithExactly(wsGatewayStub.server.sockets.in(userId).fetchSockets); + sinon.assert.notCalled(wsGatewayStub.sendMessage); }); }); - it('should send general message when event is neither "unseen_count_changed" nor "unread_count_changed"', async () => { - const messageId = MessageRepository.createObjectId(); - - findByIdStub.resolves(Promise.resolve({ _id: messageId })); - - const command: ExternalServicesRouteCommand = { - event: WebSocketEventEnum.RECEIVED, - userId, - payload: { messageId }, - }; - - await externalServicesRoute.execute(command); - - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.RECEIVED, { - message: { - _id: messageId, - }, - }); - }); - - it('should skip getCount query if unseen count provided', async () => { - getCountStub.resolves(Promise.resolve(10)); - - let command: ExternalServicesRouteCommand = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: { unseenCount: 5 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 5, - hasMore: false, - }); - - command = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: { unseenCount: 4 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 4, - }); - - getCountStub.resolves(Promise.resolve(20)); - command = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 20, + describe('User is online', () => { + beforeEach(() => { + wsGatewayStub = createWsGatewayStub([{ id: 'socket-id' }]); + externalServicesRoute = new ExternalServicesRoute(wsGatewayStub, messageRepository); + findByIdStub.resolves(Promise.resolve({ _id: messageId })); }); - getCountStub.resolves(Promise.resolve(21)); - command = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: { unseenCount: undefined }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 21, - }); - - getCountStub.resolves(Promise.resolve(22)); - command = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: { unseenCount: undefined }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 22, - }); + it('should send message, unseen count and unread count change when event is received', async () => { + getCountStub.resolves(Promise.resolve(5)); - getCountStub.resolves(Promise.resolve(23)); - command = { - event: WebSocketEventEnum.UNSEEN, - userId, - _environmentId: environmentId, - payload: { unseenCount: 0 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNSEEN, { - unseenCount: 0, - }); - }); - - it('should skip getCount query if unread count provided', async () => { - getCountStub.resolves(Promise.resolve(10)); - - let command: ExternalServicesRouteCommand = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: { unreadCount: 5 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNREAD, { - unreadCount: 5, - hasMore: false, - }); + await externalServicesRoute.execute(commandReceivedMessage); - command = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: { unreadCount: 4 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNREAD, { - unreadCount: 4, + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(0), userId, WebSocketEventEnum.RECEIVED, { + message: { + _id: messageId, + }, + }); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.RECEIVED, { + unseenCount: 5, + hasMore: false, + }); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.RECEIVED, { + unreadCount: 5, + hasMore: false, + }); }); - getCountStub.resolves(Promise.resolve(20)); - command = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNREAD, { - unreadCount: 20, - }); + it('should skip getCount query if unseen count provided', async () => { + getCountStub.resolves(Promise.resolve(10)); - getCountStub.resolves(Promise.resolve(21)); - command = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: { unreadCount: undefined }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNREAD, { - unreadCount: 21, + let command: ExternalServicesRouteCommand = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + payload: { unseenCount: 5 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 5, + hasMore: false, + }); + + command = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + payload: { unseenCount: 4 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 4, + }); + + getCountStub.resolves(Promise.resolve(20)); + command = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 20, + }); + + getCountStub.resolves(Promise.resolve(21)); + command = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + payload: { unseenCount: undefined }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 21, + }); + + getCountStub.resolves(Promise.resolve(22)); + command = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + payload: { unseenCount: undefined }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 22, + }); + + getCountStub.resolves(Promise.resolve(23)); + command = { + event: WebSocketEventEnum.UNSEEN, + userId, + _environmentId: environmentId, + payload: { unseenCount: 0 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNSEEN, { + unseenCount: 0, + }); }); - getCountStub.resolves(Promise.resolve(22)); - command = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: { unreadCount: undefined }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNREAD, { - unreadCount: 22, - }); + it('should skip getCount query if unread count provided', async () => { + getCountStub.resolves(Promise.resolve(10)); - getCountStub.resolves(Promise.resolve(23)); - command = { - event: WebSocketEventEnum.UNREAD, - userId, - _environmentId: environmentId, - payload: { unreadCount: 0 }, - }; - await externalServicesRoute.execute(command); - sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNREAD, { - unreadCount: 0, + let command: ExternalServicesRouteCommand = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + payload: { unreadCount: 5 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledOnceWithExactly(wsGatewayStub.sendMessage, userId, WebSocketEventEnum.UNREAD, { + unreadCount: 5, + hasMore: false, + }); + + command = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + payload: { unreadCount: 4 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(1), userId, WebSocketEventEnum.UNREAD, { + unreadCount: 4, + }); + + getCountStub.resolves(Promise.resolve(20)); + command = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(2), userId, WebSocketEventEnum.UNREAD, { + unreadCount: 20, + }); + + getCountStub.resolves(Promise.resolve(21)); + command = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + payload: { unreadCount: undefined }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(3), userId, WebSocketEventEnum.UNREAD, { + unreadCount: 21, + }); + + getCountStub.resolves(Promise.resolve(22)); + command = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + payload: { unreadCount: undefined }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(4), userId, WebSocketEventEnum.UNREAD, { + unreadCount: 22, + }); + + getCountStub.resolves(Promise.resolve(23)); + command = { + event: WebSocketEventEnum.UNREAD, + userId, + _environmentId: environmentId, + payload: { unreadCount: 0 }, + }; + await externalServicesRoute.execute(command); + sinon.assert.calledWithMatch(wsGatewayStub.sendMessage.getCall(5), userId, WebSocketEventEnum.UNREAD, { + unreadCount: 0, + }); }); }); }); diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts index cbe31b3e8112..4d2173328bcd 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts @@ -2,7 +2,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { MessageRepository } from '@novu/dal'; import { ChannelTypeEnum, WebSocketEventEnum } from '@novu/shared'; -import { BullMqService } from '@novu/application-generic'; import { ExternalServicesRouteCommand } from './external-services-route.command'; import { WSGateway } from '../../ws.gateway'; @@ -12,42 +11,41 @@ const LOG_CONTEXT = 'ExternalServicesRoute'; @Injectable() export class ExternalServicesRoute { - public readonly bullMqService: BullMqService; + constructor(private wsGateway: WSGateway, private messageRepository: MessageRepository) {} - constructor(private wsGateway: WSGateway, private messageRepository: MessageRepository) { - this.bullMqService = new BullMqService(); - } public async execute(command: ExternalServicesRouteCommand) { - if (command.event === WebSocketEventEnum.UNSEEN) { - await this.sendUnseenCountChange(command); - - return; - } - - if (command.event === WebSocketEventEnum.UNREAD) { - await this.sendUnreadCountChange(command); - - return; - } + const isOnline = await this.connectionExist(command); + if (isOnline) { + if (command.event === WebSocketEventEnum.RECEIVED) { + await this.processReceivedEvent(command); + } - if (command.event === WebSocketEventEnum.RECEIVED) { - // TODO: Retro-compatibility for a bit just in case stalled messages - if (command.payload?.message) { - Logger.verbose('Sending full message in the payload', LOG_CONTEXT); - await this.wsGateway.sendMessage(command.userId, command.event, command.payload); + if (command.event === WebSocketEventEnum.UNSEEN) { + await this.sendUnseenCountChange(command); + } - return; + if (command.event === WebSocketEventEnum.UNREAD) { + await this.sendUnreadCountChange(command); } + } + } - // Now we will only send the messageId in the event to reduce RAM consumption in-memory - const messageId = command.payload?.messageId; - if (messageId) { - Logger.verbose('Sending messageId in the payload, we need to retrieve the full message', LOG_CONTEXT); - const message = await this.messageRepository.findById(messageId); - await this.wsGateway.sendMessage(command.userId, command.event, { message }); + private async processReceivedEvent(command: ExternalServicesRouteCommand): Promise { + const { message, messageId } = command.payload || {}; + // TODO: Retro-compatibility for a bit just in case stalled messages + if (message) { + Logger.verbose('Sending full message in the payload', LOG_CONTEXT); + await this.wsGateway.sendMessage(command.userId, command.event, command.payload); + } else if (messageId) { + Logger.verbose('Sending messageId in the payload, we need to retrieve the full message', LOG_CONTEXT); + const storedMessage = await this.messageRepository.findById(messageId); + await this.wsGateway.sendMessage(command.userId, command.event, { message: storedMessage }); + } - return; - } + // Only recalculate the counts if we send a messageId/message. + if (message || messageId) { + await this.sendUnseenCountChange(command); + await this.sendUnreadCountChange(command); } }