diff --git a/apps/api/src/app/events/e2e/trigger-event.e2e.ts b/apps/api/src/app/events/e2e/trigger-event.e2e.ts index c3824df8d30..c6c9c79227d 100644 --- a/apps/api/src/app/events/e2e/trigger-event.e2e.ts +++ b/apps/api/src/app/events/e2e/trigger-event.e2e.ts @@ -450,6 +450,177 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () { expect(executionDetails.length).to.equal(0); }); + it('should digest events with filters', async function () { + template = await session.createTemplate({ + steps: [ + { + type: StepTypeEnum.DIGEST, + content: '', + metadata: { + unit: DigestUnitEnum.SECONDS, + amount: 2, + type: DelayTypeEnum.REGULAR, + }, + filters: [ + { + isNegated: false, + type: 'GROUP', + value: FieldLogicalOperatorEnum.AND, + children: [ + { + on: FilterPartTypeEnum.PAYLOAD, + operator: FieldOperatorEnum.IS_DEFINED, + field: 'exclude', + value: '', + }, + ], + }, + ], + }, + { + type: StepTypeEnum.SMS, + content: 'total digested: {{step.total_count}}', + }, + ], + }); + + await axiosInstance.post( + `${session.serverUrl}${eventTriggerPath}`, + { + name: template.triggers[0].identifier, + to: [subscriber.subscriberId], + payload: { + exclude: false, + }, + }, + { + headers: { + authorization: `ApiKey ${session.apiKey}`, + }, + } + ); + await axiosInstance.post( + `${session.serverUrl}${eventTriggerPath}`, + { + name: template.triggers[0].identifier, + to: [subscriber.subscriberId], + payload: { + exclude: false, + }, + }, + { + headers: { + authorization: `ApiKey ${session.apiKey}`, + }, + } + ); + + await session.awaitRunningJobs(template?._id, true, 0); + + const messagesAfter = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + channel: StepTypeEnum.SMS, + }); + + expect(messagesAfter.length).to.equal(1); + expect(messagesAfter && messagesAfter[0].content).to.include('total digested: 2'); + + const executionDetails = await executionDetailsRepository.find({ + _environmentId: session.environment._id, + _notificationTemplateId: template?._id, + channel: StepTypeEnum.DIGEST, + detail: DetailEnum.FILTER_STEPS, + }); + + expect(executionDetails.length).to.equal(0); + }); + + it('should not aggregate a filtered digest into a non filtered digest', async function () { + template = await session.createTemplate({ + steps: [ + { + type: StepTypeEnum.DIGEST, + content: '', + metadata: { + unit: DigestUnitEnum.SECONDS, + amount: 2, + type: DelayTypeEnum.REGULAR, + }, + filters: [ + { + isNegated: false, + type: 'GROUP', + value: FieldLogicalOperatorEnum.AND, + children: [ + { + on: FilterPartTypeEnum.PAYLOAD, + operator: FieldOperatorEnum.IS_DEFINED, + field: 'exclude', + value: '', + }, + ], + }, + ], + }, + { + type: StepTypeEnum.SMS, + content: 'total digested: {{step.total_count}}', + }, + ], + }); + + await axiosInstance.post( + `${session.serverUrl}${eventTriggerPath}`, + { + name: template.triggers[0].identifier, + to: [subscriber.subscriberId], + payload: { + exclude: false, + }, + }, + { + headers: { + authorization: `ApiKey ${session.apiKey}`, + }, + } + ); + await axiosInstance.post( + `${session.serverUrl}${eventTriggerPath}`, + { + name: template.triggers[0].identifier, + to: [subscriber.subscriberId], + payload: {}, + }, + { + headers: { + authorization: `ApiKey ${session.apiKey}`, + }, + } + ); + + await session.awaitRunningJobs(template?._id, true, 0); + + const messagesAfter = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + channel: StepTypeEnum.SMS, + }); + + expect(messagesAfter.length).to.equal(2); + expect(messagesAfter && messagesAfter[0].content).to.include('total digested: 1'); + expect(messagesAfter && messagesAfter[1].content).to.include('total digested: 0'); + + const executionDetails = await executionDetailsRepository.find({ + _environmentId: session.environment._id, + _notificationTemplateId: template?._id, + channel: StepTypeEnum.DIGEST, + detail: DetailEnum.FILTER_STEPS, + }); + + expect(executionDetails.length).to.equal(1); + }); + it('should not filter delay step', async function () { const firstStepUuid = uuid(); template = await session.createTemplate({ diff --git a/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx b/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx index 7767962e585..e9a13dd3405 100644 --- a/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx +++ b/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx @@ -86,6 +86,9 @@ const generateDetailByStepAndStatus = (status, step) => { } if (step.type === StepTypeEnum.DIGEST) { + if (status === JobStatusEnum.SKIPPED) { + return step.executionDetails?.at(-1)?.detail; + } const { digest } = step; return `Digesting events for ${digest.amount} ${digest.unit}`; diff --git a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts index 248e4a5dbb0..580e45e96e7 100644 --- a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts @@ -33,6 +33,7 @@ export class QueueNextJob { environmentId: command.environmentId, organizationId: command.organizationId, userId: command.userId, + job, }) ); diff --git a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts index 2fe5e88b5f7..adda840a823 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts @@ -75,7 +75,7 @@ export class AddJob { Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT); digestCreationResult = await this.mergeOrCreateDigestUsecase.execute( - MergeOrCreateDigestCommand.create({ job }) + MergeOrCreateDigestCommand.create({ job, filtered: command.filtered }) ); if (digestCreationResult === DigestCreationResultEnum.MERGED) { diff --git a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.command.ts b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.command.ts index 3b42e46e9a1..d1773ebfdb7 100644 --- a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.command.ts +++ b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.command.ts @@ -1,4 +1,4 @@ -import { IsDefined } from 'class-validator'; +import { IsDefined, IsOptional } from 'class-validator'; import { JobEntity } from '@novu/dal'; import { BaseCommand } from '../../commands/base.command'; @@ -6,4 +6,7 @@ import { BaseCommand } from '../../commands/base.command'; export class MergeOrCreateDigestCommand extends BaseCommand { @IsDefined() job: JobEntity; + + @IsOptional() + filtered?: boolean; } diff --git a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts index b4294e96d99..a2433522f87 100644 --- a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts @@ -56,12 +56,14 @@ export class MergeOrCreateDigest { const digestKey = digestMeta?.digestKey; const digestValue = getNestedValue(job.payload, digestKey); - const digestAction = await this.shouldDelayDigestOrMergeWithLock( - job, - digestKey, - digestValue, - digestMeta - ); + const digestAction = command.filtered + ? { digestResult: DigestCreationResultEnum.SKIPPED } + : await this.shouldDelayDigestOrMergeWithLock( + job, + digestKey, + digestValue, + digestMeta + ); switch (digestAction.digestResult) { case DigestCreationResultEnum.MERGED: @@ -71,7 +73,7 @@ export class MergeOrCreateDigest { digestAction.activeNotificationId ); case DigestCreationResultEnum.SKIPPED: - return await this.processSkippedDigest(job); + return await this.processSkippedDigest(job, command.filtered); case DigestCreationResultEnum.CREATED: return await this.processCreatedDigest(digestMeta, job); default: @@ -138,19 +140,23 @@ export class MergeOrCreateDigest { @Instrument() private async processSkippedDigest( - job: JobEntity + job: JobEntity, + filtered = false ): Promise { - await this.jobRepository.update( - { - _environmentId: job._environmentId, - _id: job._id, - }, - { - $set: { - status: JobStatusEnum.SKIPPED, + await Promise.all([ + this.jobRepository.update( + { + _environmentId: job._environmentId, + _id: job._id, }, - } - ); + { + $set: { + status: JobStatusEnum.SKIPPED, + }, + } + ), + this.digestSkippedExecutionDetails(job, filtered), + ]); return DigestCreationResultEnum.SKIPPED; } @@ -213,4 +219,23 @@ export class MergeOrCreateDigest { job._organizationId ); } + private async digestSkippedExecutionDetails( + job: JobEntity, + filtered: boolean + ): Promise { + const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); + await this.executionLogQueueService.add( + metadata._id, + CreateExecutionDetailsCommand.create({ + ...metadata, + ...CreateExecutionDetailsCommand.getDetailsFromJob(job), + detail: filtered ? DetailEnum.FILTER_STEPS : DetailEnum.DIGEST_SKIPPED, + source: ExecutionDetailsSourceEnum.INTERNAL, + status: ExecutionDetailsStatusEnum.SUCCESS, + isTest: false, + isRetry: false, + }), + job._organizationId + ); + } } diff --git a/packages/application-generic/src/usecases/create-execution-details/types/index.ts b/packages/application-generic/src/usecases/create-execution-details/types/index.ts index 079c087654b..290fa7a51c3 100644 --- a/packages/application-generic/src/usecases/create-execution-details/types/index.ts +++ b/packages/application-generic/src/usecases/create-execution-details/types/index.ts @@ -33,6 +33,7 @@ export enum DetailEnum { WEBHOOK_FILTER_FAILED_RETRY = 'Webhook filter failed, retry will be executed', WEBHOOK_FILTER_FAILED_LAST_RETRY = 'Failed to get response from remote webhook filter on last retry', DIGEST_MERGED = 'Digest was merged with other digest', + DIGEST_SKIPPED = 'Digest was skipped, first backoff event', DELAY_FINISHED = 'Delay is finished', PUSH_MISSING_DEVICE_TOKENS = 'Subscriber credentials is missing the tokens for sending a push notification message', VARIANT_CHOSEN = 'Variant was chosen by the provided condition criteria',