From fe177277fc6d6e63ad59de6b39eac69aa652904f Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Wed, 6 Nov 2024 15:14:52 +0200 Subject: [PATCH 1/6] fix(api): Clear Inbox and NC cached entries on subscriber deletion --- .../remove-subscriber.usecase.ts | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts index 426fd1ec752..52b1feed85e 100644 --- a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts +++ b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts @@ -1,6 +1,11 @@ import { Injectable } from '@nestjs/common'; import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal'; -import { buildSubscriberKey, InvalidateCacheService } from '@novu/application-generic'; +import { + buildSubscriberKey, + buildFeedKey, + buildMessageCountKey, + InvalidateCacheService, +} from '@novu/application-generic'; import { RemoveSubscriberCommand } from './remove-subscriber.command'; import { GetSubscriber } from '../get-subscriber'; @@ -24,12 +29,26 @@ export class RemoveSubscriber { subscriberId, }); - await this.invalidateCache.invalidateByKey({ - key: buildSubscriberKey({ - subscriberId: command.subscriberId, - _environmentId: command.environmentId, + await Promise.all([ + this.invalidateCache.invalidateByKey({ + key: buildSubscriberKey({ + subscriberId: command.subscriberId, + _environmentId: command.environmentId, + }), }), - }); + this.invalidateCache.invalidateQuery({ + key: buildFeedKey().invalidate({ + subscriberId, + _environmentId: command.environmentId, + }), + }), + this.invalidateCache.invalidateQuery({ + key: buildMessageCountKey().invalidate({ + subscriberId, + _environmentId: command.environmentId, + }), + }), + ]); await this.subscriberRepository.delete({ _environmentId: subscriber._environmentId, From 90afe2dd77c01792dcdacf71905da6a51b8b26b0 Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Wed, 6 Nov 2024 15:15:34 +0200 Subject: [PATCH 2/6] chore(api): Simplify subscriber repository 1. Simplify the delete method to delete a subscriber without fetching them first. 2. Remove unused methods --- .../subscriber/subscriber.repository.ts | 42 ++----------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/libs/dal/src/repositories/subscriber/subscriber.repository.ts b/libs/dal/src/repositories/subscriber/subscriber.repository.ts index 88db39aeaaa..bc60514b922 100644 --- a/libs/dal/src/repositories/subscriber/subscriber.repository.ts +++ b/libs/dal/src/repositories/subscriber/subscriber.repository.ts @@ -157,55 +157,19 @@ export class SubscriberRepository extends BaseRepository { return this.subscriber.estimatedDocumentCount(); } } + function mapToSubscriberObject(subscriberId: string) { return { subscriberId }; } + function regExpEscape(literalString: string): string { return literalString.replace(/[-[\]{}()*+!<=:?./\\^$|#\s,]/g, '\\$&'); } + function isErrorWithWriteErrors(e: unknown): e is { writeErrors?: any; message?: string; result?: any } { return typeof e === 'object' && e !== null && 'writeErrors' in e; } From 5f74fc82ea28aa3f50a8933b633079f768b574e2 Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Wed, 6 Nov 2024 16:23:00 +0200 Subject: [PATCH 3/6] feat(dal): Expose Mongoose transactions --- libs/dal/src/repositories/base-repository.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/libs/dal/src/repositories/base-repository.ts b/libs/dal/src/repositories/base-repository.ts index 37b27476dc2..76f734f55c2 100644 --- a/libs/dal/src/repositories/base-repository.ts +++ b/libs/dal/src/repositories/base-repository.ts @@ -6,7 +6,16 @@ import { DEFAULT_MESSAGE_IN_APP_RETENTION_DAYS, DEFAULT_NOTIFICATION_RETENTION_DAYS, } from '@novu/shared'; -import { FilterQuery, Model, ProjectionType, QueryOptions, QueryWithHelpers, Types, UpdateQuery } from 'mongoose'; +import { + ClientSession, + FilterQuery, + Model, + ProjectionType, + QueryOptions, + QueryWithHelpers, + Types, + UpdateQuery, +} from 'mongoose'; import { DalException } from '../shared'; export class BaseRepository { @@ -338,6 +347,10 @@ export class BaseRepository { protected mapEntities(data: any): T_MappedEntity[] { return plainToInstance(this.entity, JSON.parse(JSON.stringify(data))); } + + async withTransaction(fn: Parameters[0]) { + return (await this._model.db.startSession()).withTransaction(fn); + } } interface IOptions { From f5fcff54e941b73310967c75239eca7d05ba0ef2 Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Wed, 6 Nov 2024 16:55:54 +0200 Subject: [PATCH 4/6] fix(api): Remove subscribers along with their related entities --- .../subscribers/e2e/remove-subscriber.e2e.ts | 14 +-- .../remove-subscriber.spec.ts | 13 +-- .../remove-subscriber.usecase.ts | 104 ++++++++++-------- .../preferences/preferences.repository.ts | 7 -- 4 files changed, 66 insertions(+), 72 deletions(-) diff --git a/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts b/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts index 2cb01759938..ccd2095d05e 100644 --- a/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts +++ b/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts @@ -45,18 +45,8 @@ describe('Delete Subscriber - /subscribers/:subscriberId (DELETE)', function () }, }); - const isDeleted = !(await subscriberRepository.findBySubscriberId(session.environment._id, '123')); - - expect(isDeleted).to.equal(true); - - const deletedSubscriber = ( - await subscriberRepository.findDeleted({ - _environmentId: session.environment._id, - subscriberId: '123', - }) - )?.[0]; - - expect(deletedSubscriber.deleted).to.equal(true); + const subscriber = await subscriberRepository.findBySubscriberId(session.environment._id, '123'); + expect(subscriber).to.be.null; }); it('should dispose subscriber relations to topic once he was removed', async () => { diff --git a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts index 47413afc9f8..b8f8985bdb9 100644 --- a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts +++ b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts @@ -1,11 +1,9 @@ -import { Test } from '@nestjs/testing'; -import { SubscribersService, UserSession } from '@novu/testing'; -import { NotFoundException } from '@nestjs/common'; import { expect } from 'chai'; - +import { NotFoundException } from '@nestjs/common'; +import { SubscribersService, UserSession } from '@novu/testing'; +import { Test } from '@nestjs/testing'; import { RemoveSubscriber } from './remove-subscriber.usecase'; import { RemoveSubscriberCommand } from './remove-subscriber.command'; - import { SharedModule } from '../../../shared/shared.module'; import { SubscribersModule } from '../../subscribers.module'; @@ -41,8 +39,6 @@ describe('Remove Subscriber', function () { }); it('should throw a not found exception if subscriber to remove does not exist', async () => { - const subscriberService = new SubscribersService(session.organization._id, session.environment._id); - try { await useCase.execute( RemoveSubscriberCommand.create({ @@ -51,10 +47,9 @@ describe('Remove Subscriber', function () { organizationId: session.organization._id, }) ); - throw new Error('Should not reach here'); + expect(true, 'Should never reach this statement').to.be.false; } catch (e) { expect(e).to.be.instanceOf(NotFoundException); - expect(e.message).to.eql("Subscriber 'invalid-subscriber-id' was not found"); } }); }); diff --git a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts index 52b1feed85e..c570bdcd191 100644 --- a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts +++ b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts @@ -1,5 +1,10 @@ -import { Injectable } from '@nestjs/common'; -import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal'; +import { Injectable, NotFoundException } from '@nestjs/common'; +import { + SubscriberRepository, + TopicSubscribersRepository, + SubscriberPreferenceRepository, + PreferencesRepository, +} from '@novu/dal'; import { buildSubscriberKey, buildFeedKey, @@ -8,65 +13,76 @@ import { } from '@novu/application-generic'; import { RemoveSubscriberCommand } from './remove-subscriber.command'; -import { GetSubscriber } from '../get-subscriber'; -import { ApiException } from '../../../shared/exceptions/api.exception'; @Injectable() export class RemoveSubscriber { constructor( private invalidateCache: InvalidateCacheService, private subscriberRepository: SubscriberRepository, - private getSubscriber: GetSubscriber, - private topicSubscribersRepository: TopicSubscribersRepository + private topicSubscribersRepository: TopicSubscribersRepository, + private subscriberPreferenceRepository: SubscriberPreferenceRepository, + private preferenceRepository: PreferencesRepository ) {} - async execute(command: RemoveSubscriberCommand) { - try { - const { environmentId: _environmentId, organizationId, subscriberId } = command; - const subscriber = await this.getSubscriber.execute({ - environmentId: _environmentId, - organizationId, - subscriberId, - }); - - await Promise.all([ - this.invalidateCache.invalidateByKey({ - key: buildSubscriberKey({ - subscriberId: command.subscriberId, - _environmentId: command.environmentId, - }), + async execute({ environmentId: _environmentId, subscriberId }: RemoveSubscriberCommand) { + await Promise.all([ + this.invalidateCache.invalidateByKey({ + key: buildSubscriberKey({ + subscriberId, + _environmentId, }), - this.invalidateCache.invalidateQuery({ - key: buildFeedKey().invalidate({ - subscriberId, - _environmentId: command.environmentId, - }), + }), + this.invalidateCache.invalidateQuery({ + key: buildFeedKey().invalidate({ + subscriberId, + _environmentId, }), - this.invalidateCache.invalidateQuery({ - key: buildMessageCountKey().invalidate({ - subscriberId, - _environmentId: command.environmentId, - }), + }), + this.invalidateCache.invalidateQuery({ + key: buildMessageCountKey().invalidate({ + subscriberId, + _environmentId, }), - ]); + }), + ]); + + const subscriberInternalIds = await this.subscriberRepository._model.distinct('_id', { + subscriberId, + _environmentId, + }); + if (subscriberInternalIds.length === 0) { + throw new NotFoundException({ message: 'Subscriber was not found', externalSubscriberId: subscriberId }); + } + + await this.subscriberRepository.withTransaction(async () => { + /* + * Note about parallelism in transactions + * + * Running operations in parallel is not supported during a transaction. + * The use of Promise.all, Promise.allSettled, Promise.race, etc. to parallelize operations + * inside a transaction is undefined behaviour and should be avoided. + * + * Refer to https://mongoosejs.com/docs/transactions.html#note-about-parallelism-in-transactions + */ await this.subscriberRepository.delete({ - _environmentId: subscriber._environmentId, - _organizationId: subscriber._organizationId, - subscriberId: subscriber.subscriberId, + subscriberId, + _environmentId, }); await this.topicSubscribersRepository.delete({ - _environmentId: subscriber._environmentId, - _organizationId: subscriber._organizationId, - externalSubscriberId: subscriber.subscriberId, + _environmentId, + externalSubscriberId: subscriberId, }); - } catch (e) { - if (e instanceof DalException) { - throw new ApiException(e.message); - } - throw e; - } + await this.subscriberPreferenceRepository.delete({ + _environmentId, + _subscriberId: { $in: subscriberInternalIds }, + }); + await this.preferenceRepository.delete({ + _environmentId, + _subscriberId: { $in: subscriberInternalIds }, + }); + }); return { acknowledged: true, diff --git a/libs/dal/src/repositories/preferences/preferences.repository.ts b/libs/dal/src/repositories/preferences/preferences.repository.ts index 96aa65f1ff5..e95fb5cfd8c 100644 --- a/libs/dal/src/repositories/preferences/preferences.repository.ts +++ b/libs/dal/src/repositories/preferences/preferences.repository.ts @@ -28,13 +28,6 @@ export class PreferencesRepository extends BaseRepository { const res: PreferencesEntity = await this.preferences.findDeleted(query); From 98bf8290b71f5f5ca0e3efeed7c215fb1e9363f3 Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Thu, 7 Nov 2024 22:56:00 +0200 Subject: [PATCH 5/6] fixup! fix(api): Remove subscribers along with their related entities --- .../upsert-workflow/upsert-workflow.usecase.ts | 14 ++++---------- .../upsert-preferences.usecase.ts | 11 +++++++++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts index d31a30094db..12d3e67fa6e 100644 --- a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts +++ b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts @@ -152,10 +152,7 @@ export class UpsertWorkflowUseCase { ); } - private async upsertUserWorkflowPreferences( - workflow: NotificationTemplateEntity, - command: UpsertWorkflowCommand - ): Promise { + private async upsertUserWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) { let preferences: WorkflowPreferences | null; if (command.workflowDto.preferences?.user !== undefined) { preferences = command.workflowDto.preferences.user; @@ -163,7 +160,7 @@ export class UpsertWorkflowUseCase { preferences = DEFAULT_WORKFLOW_PREFERENCES; } - return await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences( + await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences( UpsertUserWorkflowPreferencesCommand.create({ environmentId: workflow._environmentId, organizationId: workflow._organizationId, @@ -174,11 +171,8 @@ export class UpsertWorkflowUseCase { ); } - private async upsertWorkflowPreferences( - workflow: NotificationTemplateEntity, - command: UpsertWorkflowCommand - ): Promise { - return await this.upsertPreferencesUsecase.upsertWorkflowPreferences( + private async upsertWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) { + await this.upsertPreferencesUsecase.upsertWorkflowPreferences( UpsertWorkflowPreferencesCommand.create({ environmentId: workflow._environmentId, organizationId: workflow._organizationId, diff --git a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts index 7b9e3d388dd..9f00974a8b9 100644 --- a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts +++ b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts @@ -110,7 +110,14 @@ export class UpsertPreferences { throw new BadRequestException('Preference not found'); } - return this.deletePreferences(command, foundPreference?._id); + await this.deletePreferences(command, foundPreference?._id); + + /* + * Ideally we need to return the foundPreference with a deleted: true flag + * but the repository does not support this yet. For now we will make a compromise + * to avoid refactoring all the usages of this usecase. + */ + return foundPreference; } if (foundPreference) { @@ -162,7 +169,7 @@ export class UpsertPreferences { private async deletePreferences( command: UpsertPreferencesCommand, preferencesId: string, - ): Promise { + ) { return await this.preferencesRepository.delete({ _id: preferencesId, _environmentId: command.environmentId, From 4e1a6f1aa23d6bcac316e117d2a73524e1dda4cf Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Fri, 8 Nov 2024 10:14:41 +0200 Subject: [PATCH 6/6] Update libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts Co-authored-by: Richard Fontein <32132657+rifont@users.noreply.github.com> --- .../usecases/upsert-preferences/upsert-preferences.usecase.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts index 9f00974a8b9..a338cb4f70d 100644 --- a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts +++ b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts @@ -113,7 +113,7 @@ export class UpsertPreferences { await this.deletePreferences(command, foundPreference?._id); /* - * Ideally we need to return the foundPreference with a deleted: true flag + * TODO: Ideally we need to return the foundPreference with a deleted: true flag * but the repository does not support this yet. For now we will make a compromise * to avoid refactoring all the usages of this usecase. */