-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(api): Subscriber deletion side-effects #6872
Changes from all commits
fe17727
90afe2d
5f74fc8
f5fcff5
98bf829
4e1a6f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,9 @@ | ||
import { Test } from '@nestjs/testing'; | ||
import { SubscribersService, UserSession } from '@novu/testing'; | ||
import { NotFoundException } from '@nestjs/common'; | ||
import { expect } from 'chai'; | ||
|
||
import { NotFoundException } from '@nestjs/common'; | ||
import { SubscribersService, UserSession } from '@novu/testing'; | ||
import { Test } from '@nestjs/testing'; | ||
import { RemoveSubscriber } from './remove-subscriber.usecase'; | ||
import { RemoveSubscriberCommand } from './remove-subscriber.command'; | ||
|
||
import { SharedModule } from '../../../shared/shared.module'; | ||
import { SubscribersModule } from '../../subscribers.module'; | ||
|
||
|
@@ -41,8 +39,6 @@ describe('Remove Subscriber', function () { | |
}); | ||
|
||
it('should throw a not found exception if subscriber to remove does not exist', async () => { | ||
const subscriberService = new SubscribersService(session.organization._id, session.environment._id); | ||
|
||
try { | ||
await useCase.execute( | ||
RemoveSubscriberCommand.create({ | ||
|
@@ -51,10 +47,9 @@ describe('Remove Subscriber', function () { | |
organizationId: session.organization._id, | ||
}) | ||
); | ||
throw new Error('Should not reach here'); | ||
expect(true, 'Should never reach this statement').to.be.false; | ||
} catch (e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A small refactoring without throwing an error will trigger the catch block again and will provide a better message for the test results. |
||
expect(e).to.be.instanceOf(NotFoundException); | ||
expect(e.message).to.eql("Subscriber 'invalid-subscriber-id' was not found"); | ||
} | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,88 @@ | ||
import { Injectable } from '@nestjs/common'; | ||
import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal'; | ||
import { buildSubscriberKey, InvalidateCacheService } from '@novu/application-generic'; | ||
import { Injectable, NotFoundException } from '@nestjs/common'; | ||
import { | ||
SubscriberRepository, | ||
TopicSubscribersRepository, | ||
SubscriberPreferenceRepository, | ||
PreferencesRepository, | ||
} from '@novu/dal'; | ||
import { | ||
buildSubscriberKey, | ||
buildFeedKey, | ||
buildMessageCountKey, | ||
InvalidateCacheService, | ||
} from '@novu/application-generic'; | ||
|
||
import { RemoveSubscriberCommand } from './remove-subscriber.command'; | ||
import { GetSubscriber } from '../get-subscriber'; | ||
import { ApiException } from '../../../shared/exceptions/api.exception'; | ||
|
||
@Injectable() | ||
export class RemoveSubscriber { | ||
constructor( | ||
private invalidateCache: InvalidateCacheService, | ||
private subscriberRepository: SubscriberRepository, | ||
private getSubscriber: GetSubscriber, | ||
private topicSubscribersRepository: TopicSubscribersRepository | ||
private topicSubscribersRepository: TopicSubscribersRepository, | ||
private subscriberPreferenceRepository: SubscriberPreferenceRepository, | ||
private preferenceRepository: PreferencesRepository | ||
) {} | ||
|
||
async execute(command: RemoveSubscriberCommand) { | ||
try { | ||
const { environmentId: _environmentId, organizationId, subscriberId } = command; | ||
const subscriber = await this.getSubscriber.execute({ | ||
environmentId: _environmentId, | ||
organizationId, | ||
subscriberId, | ||
}); | ||
|
||
await this.invalidateCache.invalidateByKey({ | ||
async execute({ environmentId: _environmentId, subscriberId }: RemoveSubscriberCommand) { | ||
await Promise.all([ | ||
this.invalidateCache.invalidateByKey({ | ||
key: buildSubscriberKey({ | ||
subscriberId: command.subscriberId, | ||
_environmentId: command.environmentId, | ||
subscriberId, | ||
_environmentId, | ||
}), | ||
}); | ||
}), | ||
this.invalidateCache.invalidateQuery({ | ||
key: buildFeedKey().invalidate({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clear inbox entries from cache |
||
subscriberId, | ||
_environmentId, | ||
}), | ||
}), | ||
this.invalidateCache.invalidateQuery({ | ||
key: buildMessageCountKey().invalidate({ | ||
subscriberId, | ||
_environmentId, | ||
}), | ||
}), | ||
]); | ||
|
||
const subscriberInternalIds = await this.subscriberRepository._model.distinct('_id', { | ||
subscriberId, | ||
_environmentId, | ||
}); | ||
|
||
if (subscriberInternalIds.length === 0) { | ||
throw new NotFoundException({ message: 'Subscriber was not found', externalSubscriberId: subscriberId }); | ||
} | ||
|
||
await this.subscriberRepository.withTransaction(async () => { | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete all related records atomically. Postgres, I miss you! |
||
* Note about parallelism in transactions | ||
* | ||
SokratisVidros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Running operations in parallel is not supported during a transaction. | ||
* The use of Promise.all, Promise.allSettled, Promise.race, etc. to parallelize operations | ||
* inside a transaction is undefined behaviour and should be avoided. | ||
* | ||
* Refer to https://mongoosejs.com/docs/transactions.html#note-about-parallelism-in-transactions | ||
*/ | ||
await this.subscriberRepository.delete({ | ||
_environmentId: subscriber._environmentId, | ||
_organizationId: subscriber._organizationId, | ||
subscriberId: subscriber.subscriberId, | ||
subscriberId, | ||
_environmentId, | ||
}); | ||
|
||
await this.topicSubscribersRepository.delete({ | ||
_environmentId: subscriber._environmentId, | ||
_organizationId: subscriber._organizationId, | ||
externalSubscriberId: subscriber.subscriberId, | ||
_environmentId, | ||
externalSubscriberId: subscriberId, | ||
}); | ||
} catch (e) { | ||
if (e instanceof DalException) { | ||
throw new ApiException(e.message); | ||
} | ||
throw e; | ||
} | ||
await this.subscriberPreferenceRepository.delete({ | ||
_environmentId, | ||
_subscriberId: { $in: subscriberInternalIds }, | ||
}); | ||
await this.preferenceRepository.delete({ | ||
_environmentId, | ||
_subscriberId: { $in: subscriberInternalIds }, | ||
}); | ||
}); | ||
|
||
return { | ||
acknowledged: true, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,13 +28,6 @@ export class PreferencesRepository extends BaseRepository<PreferencesDBModel, Pr | |
return this.mapEntity(item); | ||
} | ||
|
||
async delete(query: PreferencesQuery) { | ||
const item = await this.findOne({ _id: query._id, _environmentId: query._environmentId }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for an extra query when trying to delete a record. The number of affected rows returned by mongoose indicates if the record was found or not. |
||
if (!item) throw new DalException(`Could not find preferences with id ${query._id}`); | ||
|
||
return await this.preferences.delete({ _id: item._id, _environmentId: item._environmentId }); | ||
} | ||
|
||
async findDeleted(query: PreferencesQuery): Promise<PreferencesEntity> { | ||
const res: PreferencesEntity = await this.preferences.findDeleted(query); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -157,55 +157,19 @@ export class SubscriberRepository extends BaseRepository<SubscriberDBModel, Subs | |
); | ||
} | ||
|
||
async delete(query: SubscriberDeleteQuery) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clean ups for unused methods |
||
const requestQuery: SubscriberDeleteQuery = { | ||
_environmentId: query._environmentId, | ||
subscriberId: query.subscriberId, | ||
}; | ||
|
||
const foundSubscriber = await this.findOne(requestQuery); | ||
|
||
if (!foundSubscriber) { | ||
throw new DalException(`Could not find subscriber ${query.subscriberId} to delete`); | ||
} | ||
|
||
return await this.subscriber.delete(requestQuery); | ||
} | ||
|
||
async deleteMany(query: SubscriberDeleteManyQuery) { | ||
const requestQuery: SubscriberDeleteManyQuery = { | ||
_environmentId: query._environmentId, | ||
subscriberId: query.subscriberId, | ||
}; | ||
|
||
if (query._id) { | ||
requestQuery._id = query._id; | ||
} | ||
|
||
return await this.subscriber.delete(requestQuery); | ||
} | ||
|
||
async findDeleted(query: SubscriberQuery) { | ||
const requestQuery: SubscriberQuery = { | ||
_environmentId: query._environmentId, | ||
subscriberId: query.subscriberId, | ||
}; | ||
|
||
const res = await this.subscriber.findDeleted(requestQuery); | ||
|
||
return this.mapEntity(res); | ||
} | ||
|
||
async estimatedDocumentCount(): Promise<number> { | ||
return this.subscriber.estimatedDocumentCount(); | ||
} | ||
} | ||
|
||
function mapToSubscriberObject(subscriberId: string) { | ||
return { subscriberId }; | ||
} | ||
|
||
function regExpEscape(literalString: string): string { | ||
return literalString.replace(/[-[\]{}()*+!<=:?./\\^$|#\s,]/g, '\\$&'); | ||
} | ||
|
||
function isErrorWithWriteErrors(e: unknown): e is { writeErrors?: any; message?: string; result?: any } { | ||
return typeof e === 'object' && e !== null && 'writeErrors' in e; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplify the E2E so that the
findDeleted
can be removed.