Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: digest with passing filters are not aggregated #4992

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions apps/api/src/app/events/e2e/trigger-event.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,177 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () {
expect(executionDetails.length).to.equal(0);
});

it('should digest events with filters', async function () {
  // Template: a digest step gated by a payload filter (payload.exclude must be
  // defined), followed by an SMS that renders the digested event count.
  template = await session.createTemplate({
    steps: [
      {
        type: StepTypeEnum.DIGEST,
        content: '',
        metadata: {
          unit: DigestUnitEnum.SECONDS,
          amount: 2,
          type: DelayTypeEnum.REGULAR,
        },
        filters: [
          {
            isNegated: false,
            type: 'GROUP',
            value: FieldLogicalOperatorEnum.AND,
            children: [
              {
                on: FilterPartTypeEnum.PAYLOAD,
                operator: FieldOperatorEnum.IS_DEFINED,
                field: 'exclude',
                value: '',
              },
            ],
          },
        ],
      },
      {
        type: StepTypeEnum.SMS,
        content: 'total digested: {{step.total_count}}',
      },
    ],
  });

  // Both triggers carry a payload that satisfies the digest filter
  // (payload.exclude is defined), so both events should be digested together.
  const triggerEvent = (payload: Record<string, unknown>) =>
    axiosInstance.post(
      `${session.serverUrl}${eventTriggerPath}`,
      {
        name: template.triggers[0].identifier,
        to: [subscriber.subscriberId],
        payload,
      },
      {
        headers: {
          authorization: `ApiKey ${session.apiKey}`,
        },
      }
    );

  await triggerEvent({ exclude: false });
  await triggerEvent({ exclude: false });

  await session.awaitRunningJobs(template?._id, true, 0);

  const messagesAfter = await messageRepository.find({
    _environmentId: session.environment._id,
    _subscriberId: subscriber._id,
    channel: StepTypeEnum.SMS,
  });

  // A single SMS aggregating both events proves the digests were merged.
  expect(messagesAfter.length).to.equal(1);
  expect(messagesAfter && messagesAfter[0].content).to.include('total digested: 2');

  // No FILTER_STEPS execution detail should exist: the digest filter passed.
  const executionDetails = await executionDetailsRepository.find({
    _environmentId: session.environment._id,
    _notificationTemplateId: template?._id,
    channel: StepTypeEnum.DIGEST,
    detail: DetailEnum.FILTER_STEPS,
  });

  expect(executionDetails.length).to.equal(0);
});

it('should not aggregate a filtered digest into a non filtered digest', async function () {
  // Template: a digest step gated by a payload filter (payload.exclude must be
  // defined), followed by an SMS that renders the digested event count.
  template = await session.createTemplate({
    steps: [
      {
        type: StepTypeEnum.DIGEST,
        content: '',
        metadata: {
          unit: DigestUnitEnum.SECONDS,
          amount: 2,
          type: DelayTypeEnum.REGULAR,
        },
        filters: [
          {
            isNegated: false,
            type: 'GROUP',
            value: FieldLogicalOperatorEnum.AND,
            children: [
              {
                on: FilterPartTypeEnum.PAYLOAD,
                operator: FieldOperatorEnum.IS_DEFINED,
                field: 'exclude',
                value: '',
              },
            ],
          },
        ],
      },
      {
        type: StepTypeEnum.SMS,
        content: 'total digested: {{step.total_count}}',
      },
    ],
  });

  const triggerEvent = (payload: Record<string, unknown>) =>
    axiosInstance.post(
      `${session.serverUrl}${eventTriggerPath}`,
      {
        name: template.triggers[0].identifier,
        to: [subscriber.subscriberId],
        payload,
      },
      {
        headers: {
          authorization: `ApiKey ${session.apiKey}`,
        },
      }
    );

  // First event passes the digest filter; second (empty payload) does not.
  await triggerEvent({ exclude: false });
  await triggerEvent({});

  await session.awaitRunningJobs(template?._id, true, 0);

  const messagesAfter = await messageRepository.find({
    _environmentId: session.environment._id,
    _subscriberId: subscriber._id,
    channel: StepTypeEnum.SMS,
  });

  // Two separate SMS messages: the filtered digest must not swallow the
  // non-filtered event, so each event produces its own message.
  expect(messagesAfter.length).to.equal(2);
  expect(messagesAfter && messagesAfter[0].content).to.include('total digested: 1');
  expect(messagesAfter && messagesAfter[1].content).to.include('total digested: 0');

  // Exactly one FILTER_STEPS execution detail: the second event's digest step
  // was skipped by the filter.
  const executionDetails = await executionDetailsRepository.find({
    _environmentId: session.environment._id,
    _notificationTemplateId: template?._id,
    channel: StepTypeEnum.DIGEST,
    detail: DetailEnum.FILTER_STEPS,
  });

  expect(executionDetails.length).to.equal(1);
});

it('should not filter delay step', async function () {
const firstStepUuid = uuid();
template = await session.createTemplate({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ const generateDetailByStepAndStatus = (status, step) => {
}

if (step.type === StepTypeEnum.DIGEST) {
if (status === JobStatusEnum.SKIPPED) {
return step.executionDetails?.at(-1)?.detail;
}
const { digest } = step;

return `Digesting events for ${digest.amount} ${digest.unit}`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class QueueNextJob {
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
job,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditions filter did not have access to the payload, so all digests were created regardless of the filter.

})
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class AddJob {
Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);

digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({ job })
MergeOrCreateDigestCommand.create({ job, filtered: command.filtered })
);

if (digestCreationResult === DigestCreationResultEnum.MERGED) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { IsDefined } from 'class-validator';
import { IsDefined, IsOptional } from 'class-validator';
import { JobEntity } from '@novu/dal';

import { BaseCommand } from '../../commands/base.command';

// Command for the MergeOrCreateDigest use case: decides whether a digest job
// is merged into an in-flight digest, created as a new digest, or skipped.
export class MergeOrCreateDigestCommand extends BaseCommand {
  // The digest job being processed.
  @IsDefined()
  job: JobEntity;

  // When true, the digest step's filter conditions excluded this event, so the
  // digest is skipped (recorded with a FILTER_STEPS execution detail) instead
  // of being delayed or merged.
  @IsOptional()
  filtered?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ export class MergeOrCreateDigest {
const digestKey = digestMeta?.digestKey;
const digestValue = getNestedValue(job.payload, digestKey);

const digestAction = await this.shouldDelayDigestOrMergeWithLock(
job,
digestKey,
digestValue,
digestMeta
);
const digestAction = command.filtered
? { digestResult: DigestCreationResultEnum.SKIPPED }
: await this.shouldDelayDigestOrMergeWithLock(
job,
digestKey,
digestValue,
digestMeta
);

switch (digestAction.digestResult) {
case DigestCreationResultEnum.MERGED:
Expand All @@ -71,7 +73,7 @@ export class MergeOrCreateDigest {
digestAction.activeNotificationId
);
case DigestCreationResultEnum.SKIPPED:
return await this.processSkippedDigest(job);
return await this.processSkippedDigest(job, command.filtered);
case DigestCreationResultEnum.CREATED:
return await this.processCreatedDigest(digestMeta, job);
default:
Expand Down Expand Up @@ -138,19 +140,23 @@ export class MergeOrCreateDigest {

@Instrument()
private async processSkippedDigest(
job: JobEntity
job: JobEntity,
filtered = false
): Promise<DigestCreationResultEnum> {
await this.jobRepository.update(
{
_environmentId: job._environmentId,
_id: job._id,
},
{
$set: {
status: JobStatusEnum.SKIPPED,
await Promise.all([
this.jobRepository.update(
{
_environmentId: job._environmentId,
_id: job._id,
},
}
);
{
$set: {
status: JobStatusEnum.SKIPPED,
},
}
),
this.digestSkippedExecutionDetails(job, filtered),
]);

return DigestCreationResultEnum.SKIPPED;
}
Expand Down Expand Up @@ -213,4 +219,23 @@ export class MergeOrCreateDigest {
job._organizationId
);
}
// Records an execution detail explaining why this digest job was skipped:
// FILTER_STEPS when the step's filter conditions excluded the event,
// DIGEST_SKIPPED otherwise (e.g. the first backoff event).
private async digestSkippedExecutionDetails(
  job: JobEntity,
  filtered: boolean
): Promise<void> {
  const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata();
  // Enqueue the detail asynchronously via the execution log queue rather than
  // writing it inline with the job update.
  await this.executionLogQueueService.add(
    metadata._id,
    CreateExecutionDetailsCommand.create({
      ...metadata,
      ...CreateExecutionDetailsCommand.getDetailsFromJob(job),
      detail: filtered ? DetailEnum.FILTER_STEPS : DetailEnum.DIGEST_SKIPPED,
      source: ExecutionDetailsSourceEnum.INTERNAL,
      status: ExecutionDetailsStatusEnum.SUCCESS,
      isTest: false,
      isRetry: false,
    }),
    job._organizationId
  );
Comment on lines +222 to +239
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Show execution details for a skipped digest — for the backoff case as well. Previously we only showed 'step created'.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great addition!

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export enum DetailEnum {
WEBHOOK_FILTER_FAILED_RETRY = 'Webhook filter failed, retry will be executed',
WEBHOOK_FILTER_FAILED_LAST_RETRY = 'Failed to get response from remote webhook filter on last retry',
DIGEST_MERGED = 'Digest was merged with other digest',
DIGEST_SKIPPED = 'Digest was skipped, first backoff event',
DELAY_FINISHED = 'Delay is finished',
PUSH_MISSING_DEVICE_TOKENS = 'Subscriber credentials is missing the tokens for sending a push notification message',
VARIANT_CHOSEN = 'Variant was chosen by the provided condition criteria',
Expand Down
Loading