diff --git a/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts b/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts index 2cb01759938..ccd2095d05e 100644 --- a/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts +++ b/apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts @@ -45,18 +45,8 @@ describe('Delete Subscriber - /subscribers/:subscriberId (DELETE)', function () }, }); - const isDeleted = !(await subscriberRepository.findBySubscriberId(session.environment._id, '123')); - - expect(isDeleted).to.equal(true); - - const deletedSubscriber = ( - await subscriberRepository.findDeleted({ - _environmentId: session.environment._id, - subscriberId: '123', - }) - )?.[0]; - - expect(deletedSubscriber.deleted).to.equal(true); + const subscriber = await subscriberRepository.findBySubscriberId(session.environment._id, '123'); + expect(subscriber).to.be.null; }); it('should dispose subscriber relations to topic once he was removed', async () => { diff --git a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts index 47413afc9f8..b8f8985bdb9 100644 --- a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts +++ b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.spec.ts @@ -1,11 +1,9 @@ -import { Test } from '@nestjs/testing'; -import { SubscribersService, UserSession } from '@novu/testing'; -import { NotFoundException } from '@nestjs/common'; import { expect } from 'chai'; - +import { NotFoundException } from '@nestjs/common'; +import { SubscribersService, UserSession } from '@novu/testing'; +import { Test } from '@nestjs/testing'; import { RemoveSubscriber } from './remove-subscriber.usecase'; import { RemoveSubscriberCommand } from './remove-subscriber.command'; - import { SharedModule } from '../../../shared/shared.module'; import { SubscribersModule } from '../../subscribers.module'; @@ -41,8 +39,6 @@ describe('Remove Subscriber', function () { }); it('should throw a not found exception if subscriber to remove does not exist', async () => { - const subscriberService = new SubscribersService(session.organization._id, session.environment._id); - try { await useCase.execute( RemoveSubscriberCommand.create({ @@ -51,10 +47,9 @@ describe('Remove Subscriber', function () { organizationId: session.organization._id, }) ); - throw new Error('Should not reach here'); + expect(true, 'Should never reach this statement').to.be.false; } catch (e) { expect(e).to.be.instanceOf(NotFoundException); - expect(e.message).to.eql("Subscriber 'invalid-subscriber-id' was not found"); } }); }); diff --git a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts index 426fd1ec752..c570bdcd191 100644 --- a/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts +++ b/apps/api/src/app/subscribers/usecases/remove-subscriber/remove-subscriber.usecase.ts @@ -1,53 +1,88 @@ -import { Injectable } from '@nestjs/common'; -import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal'; -import { buildSubscriberKey, InvalidateCacheService } from '@novu/application-generic'; +import { Injectable, NotFoundException } from '@nestjs/common'; +import { + SubscriberRepository, + TopicSubscribersRepository, + SubscriberPreferenceRepository, + PreferencesRepository, +} from '@novu/dal'; +import { + buildSubscriberKey, + buildFeedKey, + buildMessageCountKey, + InvalidateCacheService, +} from '@novu/application-generic'; import { RemoveSubscriberCommand } from './remove-subscriber.command'; -import { GetSubscriber } from '../get-subscriber'; -import { ApiException } from '../../../shared/exceptions/api.exception'; @Injectable() export class RemoveSubscriber { constructor( private invalidateCache: InvalidateCacheService, private subscriberRepository: SubscriberRepository, - private getSubscriber: GetSubscriber, - private topicSubscribersRepository: TopicSubscribersRepository + private topicSubscribersRepository: TopicSubscribersRepository, + private subscriberPreferenceRepository: SubscriberPreferenceRepository, + private preferenceRepository: PreferencesRepository ) {} - async execute(command: RemoveSubscriberCommand) { - try { - const { environmentId: _environmentId, organizationId, subscriberId } = command; - const subscriber = await this.getSubscriber.execute({ - environmentId: _environmentId, - organizationId, - subscriberId, - }); - - await this.invalidateCache.invalidateByKey({ + async execute({ environmentId: _environmentId, subscriberId }: RemoveSubscriberCommand) { + await Promise.all([ + this.invalidateCache.invalidateByKey({ key: buildSubscriberKey({ - subscriberId: command.subscriberId, - _environmentId: command.environmentId, + subscriberId, + _environmentId, }), - }); + }), + this.invalidateCache.invalidateQuery({ + key: buildFeedKey().invalidate({ + subscriberId, + _environmentId, + }), + }), + this.invalidateCache.invalidateQuery({ + key: buildMessageCountKey().invalidate({ + subscriberId, + _environmentId, + }), + }), + ]); + + const subscriberInternalIds = await this.subscriberRepository._model.distinct('_id', { + subscriberId, + _environmentId, + }); + if (subscriberInternalIds.length === 0) { + throw new NotFoundException({ message: 'Subscriber was not found', externalSubscriberId: subscriberId }); + } + + await this.subscriberRepository.withTransaction(async () => { + /* + * Note about parallelism in transactions + * + * Running operations in parallel is not supported during a transaction. + * The use of Promise.all, Promise.allSettled, Promise.race, etc. to parallelize operations + * inside a transaction is undefined behaviour and should be avoided. + * + * Refer to https://mongoosejs.com/docs/transactions.html#note-about-parallelism-in-transactions + */ await this.subscriberRepository.delete({ - _environmentId: subscriber._environmentId, - _organizationId: subscriber._organizationId, - subscriberId: subscriber.subscriberId, + subscriberId, + _environmentId, }); await this.topicSubscribersRepository.delete({ - _environmentId: subscriber._environmentId, - _organizationId: subscriber._organizationId, - externalSubscriberId: subscriber.subscriberId, + _environmentId, + externalSubscriberId: subscriberId, }); - } catch (e) { - if (e instanceof DalException) { - throw new ApiException(e.message); - } - throw e; - } + await this.subscriberPreferenceRepository.delete({ + _environmentId, + _subscriberId: { $in: subscriberInternalIds }, + }); + await this.preferenceRepository.delete({ + _environmentId, + _subscriberId: { $in: subscriberInternalIds }, + }); + }); return { acknowledged: true, diff --git a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts index 96392730e7f..0c27b50258b 100644 --- a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts +++ b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts @@ -161,10 +161,7 @@ export class UpsertWorkflowUseCase { ); } - private async upsertUserWorkflowPreferences( - workflow: NotificationTemplateEntity, - command: UpsertWorkflowCommand - ): Promise { + private async upsertUserWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) { let preferences: WorkflowPreferences | null; if (command.workflowDto.preferences?.user !== undefined) { preferences = command.workflowDto.preferences.user; @@ -172,7 +169,7 @@ export class UpsertWorkflowUseCase { preferences = DEFAULT_WORKFLOW_PREFERENCES; } - return await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences( + await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences( UpsertUserWorkflowPreferencesCommand.create({ environmentId: workflow._environmentId, organizationId: workflow._organizationId, @@ -183,11 +180,8 @@ export class UpsertWorkflowUseCase { ); } - private async upsertWorkflowPreferences( - workflow: NotificationTemplateEntity, - command: UpsertWorkflowCommand - ): Promise { - return await this.upsertPreferencesUsecase.upsertWorkflowPreferences( + private async upsertWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) { + await this.upsertPreferencesUsecase.upsertWorkflowPreferences( UpsertWorkflowPreferencesCommand.create({ environmentId: workflow._environmentId, organizationId: workflow._organizationId, diff --git a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts index 7b9e3d388dd..a338cb4f70d 100644 --- a/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts +++ b/libs/application-generic/src/usecases/upsert-preferences/upsert-preferences.usecase.ts @@ -110,7 +110,14 @@ export class UpsertPreferences { throw new BadRequestException('Preference not found'); } - return this.deletePreferences(command, foundPreference?._id); + await this.deletePreferences(command, foundPreference?._id); + + /* + * TODO: Ideally we need to return the foundPreference with a deleted: true flag + * but the repository does not support this yet. For now we will make a compromise + * to avoid refactoring all the usages of this usecase. + */ + return foundPreference; } if (foundPreference) { @@ -162,7 +169,7 @@ export class UpsertPreferences { private async deletePreferences( command: UpsertPreferencesCommand, preferencesId: string, - ): Promise { + ) { return await this.preferencesRepository.delete({ _id: preferencesId, _environmentId: command.environmentId, diff --git a/libs/dal/src/repositories/base-repository.ts b/libs/dal/src/repositories/base-repository.ts index 37b27476dc2..76f734f55c2 100644 --- a/libs/dal/src/repositories/base-repository.ts +++ b/libs/dal/src/repositories/base-repository.ts @@ -6,7 +6,16 @@ import { DEFAULT_MESSAGE_IN_APP_RETENTION_DAYS, DEFAULT_NOTIFICATION_RETENTION_DAYS, } from '@novu/shared'; -import { FilterQuery, Model, ProjectionType, QueryOptions, QueryWithHelpers, Types, UpdateQuery } from 'mongoose'; +import { + ClientSession, + FilterQuery, + Model, + ProjectionType, + QueryOptions, + QueryWithHelpers, + Types, + UpdateQuery, +} from 'mongoose'; import { DalException } from '../shared'; export class BaseRepository { @@ -338,6 +347,10 @@ export class BaseRepository { protected mapEntities(data: any): T_MappedEntity[] { return plainToInstance(this.entity, JSON.parse(JSON.stringify(data))); } + + async withTransaction(fn: Parameters[0]) { + return (await this._model.db.startSession()).withTransaction(fn); + } } interface IOptions { diff --git a/libs/dal/src/repositories/preferences/preferences.repository.ts b/libs/dal/src/repositories/preferences/preferences.repository.ts index 96aa65f1ff5..e95fb5cfd8c 100644 --- a/libs/dal/src/repositories/preferences/preferences.repository.ts +++ b/libs/dal/src/repositories/preferences/preferences.repository.ts @@ -28,13 +28,6 @@ export class PreferencesRepository extends BaseRepository { const res: PreferencesEntity = await this.preferences.findDeleted(query); diff --git a/libs/dal/src/repositories/subscriber/subscriber.repository.ts b/libs/dal/src/repositories/subscriber/subscriber.repository.ts index 88db39aeaaa..bc60514b922 100644 --- a/libs/dal/src/repositories/subscriber/subscriber.repository.ts +++ b/libs/dal/src/repositories/subscriber/subscriber.repository.ts @@ -157,55 +157,19 @@ export class SubscriberRepository extends BaseRepository { return this.subscriber.estimatedDocumentCount(); } } + function mapToSubscriberObject(subscriberId: string) { return { subscriberId }; } + function regExpEscape(literalString: string): string { return literalString.replace(/[-[\]{}()*+!<=:?./\\^$|#\s,]/g, '\\$&'); } + function isErrorWithWriteErrors(e: unknown): e is { writeErrors?: any; message?: string; result?: any } { return typeof e === 'object' && e !== null && 'writeErrors' in e; }