diff --git a/apps/api/src/app/events/e2e/trigger-event.e2e.ts b/apps/api/src/app/events/e2e/trigger-event.e2e.ts index c6c9c79227d..28286cac47b 100644 --- a/apps/api/src/app/events/e2e/trigger-event.e2e.ts +++ b/apps/api/src/app/events/e2e/trigger-event.e2e.ts @@ -159,6 +159,78 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () { expect(executionDetails.length).to.equal(1); }); + it('should filter a delay that is the first step in the workflow', async function () { + template = await session.createTemplate({ + steps: [ + { + type: StepTypeEnum.DELAY, + content: '', + metadata: { + unit: DigestUnitEnum.SECONDS, + amount: 2, + type: DelayTypeEnum.REGULAR, + }, + filters: [ + { + isNegated: false, + type: 'GROUP', + value: FieldLogicalOperatorEnum.AND, + children: [ + { + on: FilterPartTypeEnum.PAYLOAD, + operator: FieldOperatorEnum.IS_DEFINED, + field: 'exclude', + value: '', + }, + ], + }, + ], + }, + { + type: StepTypeEnum.EMAIL, + name: 'Message Name', + subject: 'Test email subject', + content: [{ type: EmailBlockTypeEnum.TEXT, content: 'This is a sample text block' }], + }, + ], + }); + + await axiosInstance.post( + `${session.serverUrl}${eventTriggerPath}`, + { + name: template.triggers[0].identifier, + to: [subscriber.subscriberId], + payload: { + customVar: 'Testing of User Name', + }, + }, + { + headers: { + authorization: `ApiKey ${session.apiKey}`, + }, + } + ); + + await session.awaitRunningJobs(template?._id, true, 0); + + const messagesAfter = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + channel: StepTypeEnum.EMAIL, + }); + + expect(messagesAfter.length).to.equal(1); + + const executionDetails = await executionDetailsRepository.find({ + _environmentId: session.environment._id, + _notificationTemplateId: template?._id, + channel: StepTypeEnum.DELAY, + detail: DetailEnum.FILTER_STEPS, + }); + + expect(executionDetails.length).to.equal(1); + }); + it('should filter digest step', async function () { const firstStepUuid = uuid(); template = await session.createTemplate({ diff --git a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts index 580e45e96e7..537e346501d 100644 --- a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts @@ -1,17 +1,12 @@ import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { JobEntity, JobRepository } from '@novu/dal'; -import { AddJob, ConditionsFilter, ConditionsFilterCommand, InstrumentUsecase } from '@novu/application-generic'; +import { AddJob, InstrumentUsecase } from '@novu/application-generic'; import { QueueNextJobCommand } from './queue-next-job.command'; -import { StepTypeEnum } from '@novu/shared'; @Injectable() export class QueueNextJob { - constructor( - private jobRepository: JobRepository, - @Inject(forwardRef(() => AddJob)) private addJobUsecase: AddJob, - private conditionsFilter: ConditionsFilter - ) {} + constructor(private jobRepository: JobRepository, @Inject(forwardRef(() => AddJob)) private addJobUsecase: AddJob) {} @InstrumentUsecase() public async execute(command: QueueNextJobCommand): Promise { @@ -24,29 +19,12 @@ export class QueueNextJob { return; } - let filtered = false; - - if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) { - const shouldRun = await this.conditionsFilter.filter( - ConditionsFilterCommand.create({ - filters: job.step.filters || [], - environmentId: command.environmentId, - organizationId: command.organizationId, - userId: command.userId, - job, - }) - ); - - filtered = !shouldRun.passed; - } - await this.addJobUsecase.execute({ userId: job._userId, environmentId: job._environmentId, organizationId: command.organizationId, jobId: job._id, job, - filtered, }); return job; diff --git a/packages/application-generic/src/usecases/add-job/add-job.command.ts b/packages/application-generic/src/usecases/add-job/add-job.command.ts index 7ac6932b12b..561ac800870 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.command.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.command.ts @@ -1,7 +1,7 @@ -import { IsDefined, IsOptional } from 'class-validator'; +import { IsDefined } from 'class-validator'; import { JobEntity } from '@novu/dal'; -import { EnvironmentWithUserCommand } from '../../commands/project.command'; +import { EnvironmentWithUserCommand } from '../../commands'; export class AddJobCommand extends EnvironmentWithUserCommand { @IsDefined() @@ -9,7 +9,4 @@ export class AddJobCommand extends EnvironmentWithUserCommand { @IsDefined() job: JobEntity; - - @IsOptional() - filtered?: boolean; } diff --git a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts index 0ec29f232ce..20f2b9e77f9 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts @@ -11,7 +11,11 @@ import { AddDelayJob } from './add-delay-job.usecase'; import { MergeOrCreateDigestCommand } from './merge-or-create-digest.command'; import { MergeOrCreateDigest } from './merge-or-create-digest.usecase'; import { AddJobCommand } from './add-job.command'; -import { DetailEnum } from '../../usecases'; +import { + ConditionsFilter, + ConditionsFilterCommand, + DetailEnum, +} from '../../usecases'; import { CalculateDelayService, JobsOptions, @@ -42,7 +46,9 @@ export class AddJob { private mergeOrCreateDigestUsecase: MergeOrCreateDigest, private addDelayJob: AddDelayJob, @Inject(forwardRef(() => CalculateDelayService)) - private calculateDelayService: CalculateDelayService + private calculateDelayService: CalculateDelayService, + @Inject(forwardRef(() => ConditionsFilter)) + private conditionsFilter: ConditionsFilter ) {} @InstrumentUsecase() @@ -66,6 +72,26 @@ export class AddJob { LOG_CONTEXT ); + let filtered = false; + + if ( + [StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes( + job.type as StepTypeEnum + ) + ) { + const shouldRun = await this.conditionsFilter.filter( + ConditionsFilterCommand.create({ + filters: job.step.filters || [], + environmentId: command.environmentId, + organizationId: command.organizationId, + userId: command.userId, + job, + }) + ); + + filtered = !shouldRun.passed; + } + let digestAmount: number | undefined; let digestCreationResult: DigestCreationResultEnum | undefined; if (job.type === StepTypeEnum.DIGEST) { @@ -78,7 +104,7 @@ export class AddJob { Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT); digestCreationResult = await this.mergeOrCreateDigestUsecase.execute( - MergeOrCreateDigestCommand.create({ job, filtered: command.filtered }) + MergeOrCreateDigestCommand.create({ job, filtered }) ); if (digestCreationResult === DigestCreationResultEnum.MERGED) { @@ -150,9 +176,9 @@ export class AddJob { }) ); - const delay = command.filtered ? 0 : digestAmount ?? delayAmount; + const delay = filtered ? 0 : digestAmount ?? delayAmount; - if ((digestAmount || delayAmount) && command.filtered) { + if ((digestAmount || delayAmount) && filtered) { Logger.verbose( `Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT