Skip to content

Commit

Permalink
Merge pull request #5091 from novuhq/nv-3303-delay-with-filters-doesn…
Browse files Browse the repository at this point in the history
…t-filter-if-its-the-first-step

fix: filters dont apply to a delay that is the first step in workflow
  • Loading branch information
ainouzgali authored Jan 21, 2024
2 parents 55a350a + aa1e4d3 commit bb41dda
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 34 deletions.
72 changes: 72 additions & 0 deletions apps/api/src/app/events/e2e/trigger-event.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,78 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () {
expect(executionDetails.length).to.equal(1);
});

it('should filter a delay that is the first step in the workflow', async function () {
template = await session.createTemplate({
steps: [
{
type: StepTypeEnum.DELAY,
content: '',
metadata: {
unit: DigestUnitEnum.SECONDS,
amount: 2,
type: DelayTypeEnum.REGULAR,
},
filters: [
{
isNegated: false,
type: 'GROUP',
value: FieldLogicalOperatorEnum.AND,
children: [
{
on: FilterPartTypeEnum.PAYLOAD,
operator: FieldOperatorEnum.IS_DEFINED,
field: 'exclude',
value: '',
},
],
},
],
},
{
type: StepTypeEnum.EMAIL,
name: 'Message Name',
subject: 'Test email subject',
content: [{ type: EmailBlockTypeEnum.TEXT, content: 'This is a sample text block' }],
},
],
});

await axiosInstance.post(
`${session.serverUrl}${eventTriggerPath}`,
{
name: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
customVar: 'Testing of User Name',
},
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

await session.awaitRunningJobs(template?._id, true, 0);

const messagesAfter = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
channel: StepTypeEnum.EMAIL,
});

expect(messagesAfter.length).to.equal(1);

const executionDetails = await executionDetailsRepository.find({
_environmentId: session.environment._id,
_notificationTemplateId: template?._id,
channel: StepTypeEnum.DELAY,
detail: DetailEnum.FILTER_STEPS,
});

expect(executionDetails.length).to.equal(1);
});

it('should filter digest step', async function () {
const firstStepUuid = uuid();
template = await session.createTemplate({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import { JobEntity, JobRepository } from '@novu/dal';
import { AddJob, ConditionsFilter, ConditionsFilterCommand, InstrumentUsecase } from '@novu/application-generic';
import { AddJob, InstrumentUsecase } from '@novu/application-generic';

import { QueueNextJobCommand } from './queue-next-job.command';
import { StepTypeEnum } from '@novu/shared';

@Injectable()
export class QueueNextJob {
constructor(
private jobRepository: JobRepository,
@Inject(forwardRef(() => AddJob)) private addJobUsecase: AddJob,
private conditionsFilter: ConditionsFilter
) {}
constructor(private jobRepository: JobRepository, @Inject(forwardRef(() => AddJob)) private addJobUsecase: AddJob) {}

@InstrumentUsecase()
public async execute(command: QueueNextJobCommand): Promise<JobEntity | undefined> {
Expand All @@ -24,29 +19,12 @@ export class QueueNextJob {
return;
}

let filtered = false;

if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) {
const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
job,
})
);

filtered = !shouldRun.passed;
}

await this.addJobUsecase.execute({
userId: job._userId,
environmentId: job._environmentId,
organizationId: command.organizationId,
jobId: job._id,
job,
filtered,
});

return job;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { IsDefined, IsOptional } from 'class-validator';
import { IsDefined } from 'class-validator';
import { JobEntity } from '@novu/dal';

import { EnvironmentWithUserCommand } from '../../commands/project.command';
import { EnvironmentWithUserCommand } from '../../commands';

export class AddJobCommand extends EnvironmentWithUserCommand {
@IsDefined()
jobId: string;

@IsDefined()
job: JobEntity;

@IsOptional()
filtered?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import { AddDelayJob } from './add-delay-job.usecase';
import { MergeOrCreateDigestCommand } from './merge-or-create-digest.command';
import { MergeOrCreateDigest } from './merge-or-create-digest.usecase';
import { AddJobCommand } from './add-job.command';
import { DetailEnum } from '../../usecases';
import {
ConditionsFilter,
ConditionsFilterCommand,
DetailEnum,
} from '../../usecases';
import {
CalculateDelayService,
JobsOptions,
Expand Down Expand Up @@ -42,7 +46,9 @@ export class AddJob {
private mergeOrCreateDigestUsecase: MergeOrCreateDigest,
private addDelayJob: AddDelayJob,
@Inject(forwardRef(() => CalculateDelayService))
private calculateDelayService: CalculateDelayService
private calculateDelayService: CalculateDelayService,
@Inject(forwardRef(() => ConditionsFilter))
private conditionsFilter: ConditionsFilter
) {}

@InstrumentUsecase()
Expand All @@ -66,6 +72,26 @@ export class AddJob {
LOG_CONTEXT
);

let filtered = false;

if (
[StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(
job.type as StepTypeEnum
)
) {
const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
job,
})
);

filtered = !shouldRun.passed;
}

let digestAmount: number | undefined;
let digestCreationResult: DigestCreationResultEnum | undefined;
if (job.type === StepTypeEnum.DIGEST) {
Expand All @@ -78,7 +104,7 @@ export class AddJob {
Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);

digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({ job, filtered: command.filtered })
MergeOrCreateDigestCommand.create({ job, filtered })
);

if (digestCreationResult === DigestCreationResultEnum.MERGED) {
Expand Down Expand Up @@ -150,9 +176,9 @@ export class AddJob {
})
);

const delay = command.filtered ? 0 : digestAmount ?? delayAmount;
const delay = filtered ? 0 : digestAmount ?? delayAmount;

if ((digestAmount || delayAmount) && command.filtered) {
if ((digestAmount || delayAmount) && filtered) {
Logger.verbose(
`Delay for job ${job._id} will be 0 because job was filtered`,
LOG_CONTEXT
Expand Down

0 comments on commit bb41dda

Please sign in to comment.