From bd86ef6d3d639ec4ffa344736163961d231f4ae7 Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 24 Sep 2023 13:47:28 +0300 Subject: [PATCH 1/4] refactor: use conditions filter --- .../src/app/events/e2e/trigger-event.e2e.ts | 4 +- apps/api/src/app/shared/dtos/step-filter.ts | 4 +- .../create-notification-template.command.ts | 4 +- .../worker/src/app/workflow/usecases/index.ts | 1 - .../usecases/message-matcher/index.ts | 1 - .../message-matcher.command.ts | 2 +- .../message-matcher.usecase.spec.ts | 580 +++++---- .../message-matcher.usecase.ts | 1047 ++++++++--------- .../queue-next-job/queue-next-job.usecase.ts | 27 +- .../send-message/send-message.usecase.ts | 56 +- .../src/app/workflow/workflow.module.ts | 2 - .../notification-template.entity.ts | 10 +- packages/application-generic/package.json | 1 + .../conditions-filter.command.ts | 9 +- .../conditions-filter.usecase.ts | 167 ++- .../select-integration.spec.ts | 1 + .../select-integration.usecase.ts | 14 +- .../select-variant/select-variant.usecase.ts | 4 +- pnpm-lock.yaml | 45 +- 19 files changed, 1038 insertions(+), 941 deletions(-) diff --git a/apps/api/src/app/events/e2e/trigger-event.e2e.ts b/apps/api/src/app/events/e2e/trigger-event.e2e.ts index 441939b0fc7..a44757a65fa 100644 --- a/apps/api/src/app/events/e2e/trigger-event.e2e.ts +++ b/apps/api/src/app/events/e2e/trigger-event.e2e.ts @@ -307,7 +307,7 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () { detail: DetailEnum.FILTER_STEPS, }); - expect(executionDetails).to.not.be.ok; + expect(executionDetails?.detail).to.be.equal('Step was filtered based on steps filters'); }); it('should not filter delay step', async function () { @@ -388,7 +388,7 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () { detail: DetailEnum.FILTER_STEPS, }); - expect(executionDetails).to.not.be.ok; + expect(executionDetails?.detail).to.be.equal('Step was filtered based on steps filters'); }); it('should use conditions to select integration', async function () { diff --git a/apps/api/src/app/shared/dtos/step-filter.ts b/apps/api/src/app/shared/dtos/step-filter.ts index cd06682db44..f7a98fb827e 100644 --- a/apps/api/src/app/shared/dtos/step-filter.ts +++ b/apps/api/src/app/shared/dtos/step-filter.ts @@ -115,12 +115,12 @@ type FilterParts = export class StepFilter { @ApiProperty() - isNegated: boolean; + isNegated?: boolean; @ApiProperty({ enum: ['BOOLEAN', 'TEXT', 'DATE', 'NUMBER', 'STATEMENT', 'LIST', 'MULTI_LIST', 'GROUP'], }) - type: BuilderFieldType; + type?: BuilderFieldType; @ApiProperty({ enum: ['AND', 'OR'], diff --git a/apps/api/src/app/workflows/usecases/create-notification-template/create-notification-template.command.ts b/apps/api/src/app/workflows/usecases/create-notification-template/create-notification-template.command.ts index 33f41c2bf74..a264390d2b6 100644 --- a/apps/api/src/app/workflows/usecases/create-notification-template/create-notification-template.command.ts +++ b/apps/api/src/app/workflows/usecases/create-notification-template/create-notification-template.command.ts @@ -136,10 +136,10 @@ export class NotificationStep extends NotificationStepVariant { } export class MessageFilter { - isNegated: boolean; + isNegated?: boolean; @IsString() - type: BuilderFieldType; + type?: BuilderFieldType; @IsString() value: BuilderGroupValues; diff --git a/apps/worker/src/app/workflow/usecases/index.ts b/apps/worker/src/app/workflow/usecases/index.ts index b65ed1e1357..288f42daf8d 100644 --- a/apps/worker/src/app/workflow/usecases/index.ts +++ b/apps/worker/src/app/workflow/usecases/index.ts @@ -1,5 +1,4 @@ export * from './handle-last-failed-job'; -export * from './message-matcher'; export * from './queue-next-job'; export * from './run-job'; export * from './send-message'; diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/index.ts b/apps/worker/src/app/workflow/usecases/message-matcher/index.ts index 169e541f397..e69de29bb2d 100644 --- a/apps/worker/src/app/workflow/usecases/message-matcher/index.ts +++ b/apps/worker/src/app/workflow/usecases/message-matcher/index.ts @@ -1 +0,0 @@ -export { MessageMatcher } from './message-matcher.usecase'; diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts index e93efa04d43..762f8d11bd0 100644 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts +++ b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts @@ -2,7 +2,7 @@ import { IsDefined, IsString } from 'class-validator'; import { NotificationStepEntity, JobEntity } from '@novu/dal'; import { EnvironmentWithUserCommand } from '@novu/application-generic'; -export class MessageMatcherCommand extends EnvironmentWithUserCommand { +class MessageMatcherCommand extends EnvironmentWithUserCommand { @IsDefined() @IsString() identifier: string; diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts index e716e042800..73643a774f4 100644 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts +++ b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts @@ -2,17 +2,16 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import axios from 'axios'; import { Duration, sub } from 'date-fns'; + import { FilterParts, FilterPartTypeEnum, FILTER_TO_LABEL, StepTypeEnum, TimeOperatorEnum } from '@novu/shared'; import { JobEntity, MessageTemplateEntity, NotificationStepEntity } from '@novu/dal'; - -import { MessageMatcher } from './message-matcher.usecase'; -import type { SendMessageCommand } from '../send-message/send-message.command'; +import { ConditionsFilter, ConditionsFilterCommand } from '@novu/application-generic'; describe('Message filter matcher', function () { const createExecutionDetails = { execute: sinon.stub(), }; - const messageMatcher = new MessageMatcher( + const conditionsFilter = new ConditionsFilter( undefined as any, createExecutionDetails as any, undefined as any, @@ -22,8 +21,8 @@ describe('Message filter matcher', function () { ); it('should filter correct message by the filter value', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'OR', [ { operator: 'EQUAL', @@ -32,20 +31,20 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 'firstVar', + variables: { + payload: { + varField: 'firstVar', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should match a message for AND filter group', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'EQUAL', @@ -60,21 +59,21 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 'firstVar', - secondField: 'secondVar', + variables: { + payload: { + varField: 'firstVar', + secondField: 'secondVar', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should not match AND group for single bad item', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Title', 'AND', [ { operator: 'EQUAL', @@ -89,21 +88,21 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 'firstVar', - secondField: 'secondVarBad', + variables: { + payload: { + varField: 'firstVar', + secondField: 'secondVarBad', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(false); }); it('should match a NOT_EQUAL for EQUAL var', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'EQUAL', @@ -118,21 +117,21 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 'firstVar', - secondField: 'secondVarBad', + variables: { + payload: { + varField: 'firstVar', + secondField: 'secondVarBad', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should match a EQUAL for a boolean var', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'EQUAL', @@ -141,34 +140,36 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: true, + variables: { + payload: { + varField: true, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should fall thru for no filters item', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ step: makeStep('Correct Match 2', 'OR', []) }), - { - payload: { - varField: 'firstVar', - secondField: 'secondVarBad', + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ + step: makeStep('Correct Match 2', 'OR', []), + variables: { + payload: { + varField: 'firstVar', + secondField: 'secondVarBad', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should get larger payload var then filter value', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'LARGER', @@ -177,20 +178,20 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should get smaller payload var then filter value', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'SMALLER', @@ -199,20 +200,20 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 0, + variables: { + payload: { + varField: 0, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should get larger or equal payload var then filter value', async function () { - let matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + let matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'LARGER_EQUAL', @@ -221,18 +222,18 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'LARGER_EQUAL', @@ -241,19 +242,19 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should check if value is defined in payload', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'IS_DEFINED', @@ -262,20 +263,20 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - emailMessage: 'This works', + variables: { + payload: { + emailMessage: 'This works', + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should check if key is defined or not in subscriber data', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'IS_DEFINED', @@ -284,32 +285,32 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.SUBSCRIBER, }, ]), - }), - { - subscriber: { - firstName: '', - lastName: '', - email: '', - subscriberId: '', - deleted: false, - createdAt: '', - updatedAt: '', - _id: '', - _organizationId: '', - _environmentId: '', - data: { - nested_Key: 'nestedValue', + variables: { + subscriber: { + firstName: '', + lastName: '', + email: '', + subscriberId: '', + deleted: false, + createdAt: '', + updatedAt: '', + _id: '', + _organizationId: '', + _environmentId: '', + data: { + nested_Key: 'nestedValue', + }, }, }, - } + }) ); expect(matchedMessage.passed).to.equal(false); }); it('should get nested custom subscriber data', async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'OR', [ { operator: 'EQUAL', @@ -318,32 +319,32 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.SUBSCRIBER, }, ]), - }), - { - subscriber: { - firstName: '', - lastName: '', - email: '', - subscriberId: '', - deleted: false, - createdAt: '', - updatedAt: '', - _id: '', - _organizationId: '', - _environmentId: '', - data: { - nestedKey: 'nestedValue', + variables: { + subscriber: { + firstName: '', + lastName: '', + email: '', + subscriberId: '', + deleted: false, + createdAt: '', + updatedAt: '', + _id: '', + _organizationId: '', + _environmentId: '', + data: { + nestedKey: 'nestedValue', + }, }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it("should return false with nested data that doesn't exist", async function () { - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'OR', [ { operator: 'EQUAL', @@ -352,24 +353,24 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - data: { - nestedKey: { - childKey: 'nestedValue', + variables: { + payload: { + data: { + nestedKey: { + childKey: 'nestedValue', + }, }, }, }, - } + }) ); expect(matchedMessage.passed).to.equal(false); }); it('should get smaller or equal payload var then filter value', async function () { - let matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + let matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'SMALLER_EQUAL', @@ -378,18 +379,18 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 0, + variables: { + payload: { + varField: 0, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'SMALLER_EQUAL', @@ -398,20 +399,20 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); expect(matchedMessage.passed).to.equal(true); }); it('should handle now filters', async function () { - let matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + let matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: { _templateId: '123', template: { @@ -425,17 +426,18 @@ describe('Message filter matcher', function () { } as MessageTemplateEntity, filters: undefined, }, - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); + expect(matchedMessage.passed).to.equal(true); - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: { _templateId: '123', template: { @@ -449,16 +451,17 @@ describe('Message filter matcher', function () { } as MessageTemplateEntity, filters: [], }, - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); + expect(matchedMessage.passed).to.equal(true); - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: { _templateId: '123', template: { @@ -479,16 +482,17 @@ describe('Message filter matcher', function () { }, ], }, - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); + expect(matchedMessage.passed).to.equal(true); - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: { _templateId: '123', template: { @@ -509,13 +513,14 @@ describe('Message filter matcher', function () { }, ], }, - }), - { - payload: { - varField: 3, + variables: { + payload: { + varField: 3, + }, }, - } + }) ); + expect(matchedMessage.passed).to.equal(true); }); @@ -526,8 +531,8 @@ describe('Message filter matcher', function () { }) ); - const matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + const matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', undefined, [ { operator: 'EQUAL', @@ -537,10 +542,8 @@ describe('Message filter matcher', function () { webhookUrl: 'www.user.com/webhook', }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); expect(matchedMessage.passed).to.equal(true); @@ -555,8 +558,8 @@ describe('Message filter matcher', function () { }) ); - let matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + let matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'OR', [ { operator: 'EQUAL', @@ -572,10 +575,8 @@ describe('Message filter matcher', function () { webhookUrl: 'www.user.com/webhook', }, ]), - }), - { - payload: { payloadVarField: true }, - } + variables: { payload: { payloadVarField: true } }, + }) ); let requestsCount = gotGetStub.callCount; @@ -585,8 +586,8 @@ describe('Message filter matcher', function () { //Reorder children order to make sure it is not random - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'OR', [ { operator: 'EQUAL', @@ -602,10 +603,8 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { payloadVarField: true }, - } + variables: { payload: { payloadVarField: true } }, + }) ); requestsCount = gotGetStub.callCount; @@ -623,8 +622,8 @@ describe('Message filter matcher', function () { }) ); - let matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + let matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'EQUAL', @@ -640,10 +639,8 @@ describe('Message filter matcher', function () { webhookUrl: 'www.user.com/webhook', }, ]), - }), - { - payload: { payloadVarField: false }, - } + variables: { payload: { payloadVarField: false } }, + }) ); let requestsCount = gotGetStub.callCount; @@ -653,8 +650,8 @@ describe('Message filter matcher', function () { //Reorder children order to make sure it is not random - matchedMessage = await messageMatcher.filter( - sendMessageCommand({ + matchedMessage = await conditionsFilter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { operator: 'EQUAL', @@ -670,10 +667,8 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { payloadVarField: false }, - } + variables: { payload: { payloadVarField: false } }, + }) ); requestsCount = gotGetStub.callCount; @@ -697,7 +692,7 @@ describe('Message filter matcher', function () { describe('isOnline', () => { it('allows to process multiple filter parts', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber()) } as any, createExecutionDetails as any, undefined as any, @@ -705,8 +700,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE, @@ -719,16 +714,15 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { payloadVarField: true }, - } + variables: { payload: { payloadVarField: true } }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it("doesn't allow to process if the subscriber has no online fields set and filter is true", async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve({ @@ -742,24 +736,23 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE, value: true, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(false); }); it("doesn't allow to process if the subscriber has no online fields set and filter is false", async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve({ @@ -773,24 +766,23 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE, value: false, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(false); }); it('allows to process if the subscriber is online', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber()) } as any, createExecutionDetails as any, undefined as any, @@ -798,24 +790,23 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE, value: true, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it("doesn't allow to process if the subscriber is not online", async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: false })) } as any, createExecutionDetails as any, undefined as any, @@ -823,26 +814,25 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE, value: true, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(false); }); }); describe('isOnlineInLast', () => { it('allows to process multiple filter parts', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: true }, { subDuration: { minutes: 3 } })), } as any, @@ -852,8 +842,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -867,16 +857,15 @@ describe('Message filter matcher', function () { on: FilterPartTypeEnum.PAYLOAD, }, ]), - }), - { - payload: { payloadVarField: true }, - } + variables: { payload: { payloadVarField: true } }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it("doesn't allow to process if the subscriber with no online fields set", async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve({ @@ -890,8 +879,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -899,16 +888,15 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.MINUTES, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(false); }); it('allows to process if the subscriber is still online', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: true }, { subDuration: { minutes: 10 } })), } as any, @@ -918,8 +906,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -927,16 +915,15 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.MINUTES, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it('allows to process if the subscriber was online in last 5 min', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 4 } })), } as any, @@ -946,8 +933,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -955,16 +942,15 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.MINUTES, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it("doesn't allow to process if the subscriber was online more that last 5 min", async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 6 } })), } as any, @@ -974,8 +960,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -983,16 +969,15 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.MINUTES, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(false); }); it('allows to process if the subscriber was online in last 1 hour', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { minutes: 30 } })), } as any, @@ -1002,8 +987,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -1011,16 +996,15 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.HOURS, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); it('allows to process if the subscriber was online in last 1 day', async () => { - const matcher = new MessageMatcher( + const filter = new ConditionsFilter( { findOne: () => Promise.resolve(getSubscriber({ isOnline: false }, { subDuration: { hours: 23 } })), } as any, @@ -1030,8 +1014,8 @@ describe('Message filter matcher', function () { undefined as any, undefined as any ); - const matchedMessage = await matcher.filter( - sendMessageCommand({ + const matchedMessage = await filter.filter( + mapConditionsFilterCommand({ step: makeStep('Correct Match', 'AND', [ { on: FilterPartTypeEnum.IS_ONLINE_IN_LAST, @@ -1039,11 +1023,10 @@ describe('Message filter matcher', function () { timeOperator: TimeOperatorEnum.DAYS, }, ]), - }), - { - payload: {}, - } + variables: { payload: {} }, + }) ); + expect(matchedMessage.passed).to.equal(true); }); }); @@ -1051,7 +1034,7 @@ describe('Message filter matcher', function () { describe('it summarize used filters based on condition', () => { it('should add a passed condition', () => { - const result = MessageMatcher.sumFilters( + const result = ConditionsFilter.sumFilters( { filters: [], failedFilters: [], @@ -1074,7 +1057,7 @@ describe('Message filter matcher', function () { }); it('should add a failed condition', () => { - const result = MessageMatcher.sumFilters( + const result = ConditionsFilter.sumFilters( { filters: [], failedFilters: [], @@ -1097,7 +1080,7 @@ describe('Message filter matcher', function () { }); it('should add online for both cases of online', () => { - let result = MessageMatcher.sumFilters( + let result = ConditionsFilter.sumFilters( { filters: [], failedFilters: [], @@ -1118,7 +1101,7 @@ describe('Message filter matcher', function () { expect(result.filters.length).to.eq(1); expect(result.filters).to.contain('online'); - result = MessageMatcher.sumFilters( + result = ConditionsFilter.sumFilters( { filters: [], failedFilters: [], @@ -1172,21 +1155,20 @@ function makeStep( }; } -function sendMessageCommand({ step }: { step: NotificationStepEntity }): SendMessageCommand { +function mapConditionsFilterCommand({ + step, + variables, +}: { + step: NotificationStepEntity; + variables?: any; +}): ConditionsFilterCommand { return { - identifier: '123', - payload: {}, - overrides: {}, + variables: { ...(variables || {}) }, + filters: [], step, environmentId: '123', organizationId: '123', userId: '123', - transactionId: '123', - notificationId: '123', - _templateId: '123', - subscriberId: '1234', - _subscriberId: '123', - jobId: '123', job: { _notificationId: '123', transactionId: '123', diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts index 2a8f508e0b2..4a04f92edfb 100644 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts @@ -1,525 +1,522 @@ -import * as _ from 'lodash'; -import axios from 'axios'; -import { Injectable } from '@nestjs/common'; -import { parseISO, differenceInMinutes, differenceInHours, differenceInDays } from 'date-fns'; -import { - FilterParts, - IWebhookFilterPart, - IRealtimeOnlineFilterPart, - IOnlineInLastFilterPart, - FILTER_TO_LABEL, - FilterPartTypeEnum, - ICondition, - TimeOperatorEnum, - ChannelTypeEnum, - IPreviousStepFilterPart, - PreviousStepTypeEnum, - ExecutionDetailsSourceEnum, - ExecutionDetailsStatusEnum, -} from '@novu/shared'; -import { - SubscriberEntity, - EnvironmentRepository, - SubscriberRepository, - StepFilter, - ExecutionDetailsRepository, - MessageRepository, - JobRepository, -} from '@novu/dal'; -import { - DetailEnum, - CreateExecutionDetails, - CreateExecutionDetailsCommand, - buildSubscriberKey, - CachedEntity, - Instrument, - Filter, - FilterProcessingDetails, - IFilterVariables, -} from '@novu/application-generic'; -import { EmailEventStatusEnum } from '@novu/stateless'; - -import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, createHash, PlatformException } from '../../../shared/utils'; -import { MessageMatcherCommand } from './message-matcher.command'; - -const differenceIn = (currentDate: Date, lastDate: Date, timeOperator: TimeOperatorEnum) => { - if (timeOperator === TimeOperatorEnum.MINUTES) { - return differenceInMinutes(currentDate, lastDate); - } - - if (timeOperator === TimeOperatorEnum.HOURS) { - return differenceInHours(currentDate, lastDate); - } - - return differenceInDays(currentDate, lastDate); -}; - -@Injectable() -export class MessageMatcher extends Filter { - constructor( - private subscriberRepository: SubscriberRepository, - private createExecutionDetails: CreateExecutionDetails, - private environmentRepository: EnvironmentRepository, - private executionDetailsRepository: ExecutionDetailsRepository, - private messageRepository: MessageRepository, - private jobRepository: JobRepository - ) { - super(); - } - - public async filter( - command: MessageMatcherCommand, - variables: IFilterVariables, - prefiltering = false - ): Promise<{ - passed: boolean; - conditions: ICondition[]; - }> { - const { step } = command; - if (!step?.filters || !Array.isArray(step?.filters)) { - return { - passed: true, - conditions: [], - }; - } - if (step.filters?.length) { - const details: FilterProcessingDetails[] = []; - - const foundFilter = await this.findAsync(step.filters, async (filter) => { - const filterProcessingDetails = new FilterProcessingDetails(); - filterProcessingDetails.addFilter(filter, variables); - - const children = filter.children; - const noRules = !children || (Array.isArray(children) && children.length === 0); - if (noRules) { - return true; - } - - const singleRule = !children || (Array.isArray(children) && children.length === 1); - if (singleRule) { - const result = await this.processFilter(variables, children[0], command, filterProcessingDetails); - if (!prefiltering) { - await this.createExecutionDetails.execute( - CreateExecutionDetailsCommand.create({ - ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), - detail: DetailEnum.PROCESSING_STEP_FILTER, - source: ExecutionDetailsSourceEnum.INTERNAL, - status: ExecutionDetailsStatusEnum.PENDING, - isTest: false, - isRetry: false, - raw: filterProcessingDetails.toString(), - }) - ); - } - - details.push(filterProcessingDetails); - - return result; - } - - const result = await this.handleGroupFilters(filter, variables, command, filterProcessingDetails); - if (!prefiltering) { - await this.createExecutionDetails.execute( - CreateExecutionDetailsCommand.create({ - ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), - detail: DetailEnum.PROCESSING_STEP_FILTER, - source: ExecutionDetailsSourceEnum.INTERNAL, - status: ExecutionDetailsStatusEnum.PENDING, - isTest: false, - isRetry: false, - raw: filterProcessingDetails.toString(), - }) - ); - } - - details.push(filterProcessingDetails); - - return result; - }); - - const conditions = details - .map((detail) => detail.toObject().conditions) - .reduce((conditionsArray, collection) => [...collection, ...conditionsArray], []); - - return { - passed: !!foundFilter, - conditions: conditions, - }; - } - - return { - passed: true, - conditions: [], - }; - } - - public static sumFilters( - summary: { - filters: string[]; - failedFilters: string[]; - passedFilters: string[]; - }, - condition: ICondition - ) { - let type: string = condition.filter?.toLowerCase(); - - if (condition.filter === FILTER_TO_LABEL.isOnline || condition.filter === FILTER_TO_LABEL.isOnlineInLast) { - type = 'online'; - } - - return Filter.sumFilters(summary, condition, type); - } - - private async handleGroupFilters( - filter: StepFilter, - variables: IFilterVariables, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - if (filter.value === 'OR') { - return await this.handleOrFilters(filter, variables, command, filterProcessingDetails); - } - - if (filter.value === 'AND') { - return await this.handleAndFilters(filter, variables, command, filterProcessingDetails); - } - - return false; - } - - private splitFilters(filter: StepFilter) { - const webhookFilters = filter.children.filter((childFilter) => childFilter.on === 'webhook'); - - const otherFilters = filter.children.filter((childFilter) => childFilter.on !== 'webhook'); - - return { webhookFilters, otherFilters }; - } - - private async handleAndFilters( - filter: StepFilter, - variables: IFilterVariables, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - const { webhookFilters, otherFilters } = this.splitFilters(filter); - - const matchedOtherFilters = await this.filterAsync(otherFilters, (i) => - this.processFilter(variables, i, command, filterProcessingDetails) - ); - if (otherFilters.length !== matchedOtherFilters.length) { - return false; - } - - const matchedWebhookFilters = await this.filterAsync(webhookFilters, (i) => - this.processFilter(variables, i, command, filterProcessingDetails) - ); - - return matchedWebhookFilters.length === webhookFilters.length; - } - - private async handleOrFilters( - filter: StepFilter, - variables: IFilterVariables, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - const { webhookFilters, otherFilters } = this.splitFilters(filter); - - const foundFilter = await this.findAsync(otherFilters, (i) => - this.processFilter(variables, i, command, filterProcessingDetails) - ); - if (foundFilter) { - return true; - } - - return !!(await this.findAsync(webhookFilters, (i) => - this.processFilter(variables, i, command, filterProcessingDetails) - )); - } - - private async processPreviousStep( - filter: IPreviousStepFilterPart, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - const job = await this.jobRepository.findOne({ - transactionId: command.transactionId, - // backward compatibility - ternary needed to be removed once the queue renewed - _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, - _environmentId: command.environmentId, - _organizationId: command.organizationId, - 'step.uuid': filter.step, - }); - - if (!job) { - return true; - } - - const message = await this.messageRepository.findOne({ - _jobId: job._id, - _environmentId: command.environmentId, - // backward compatibility - ternary needed to be removed once the queue renewed - _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, - transactionId: command.transactionId, - }); - - if (!message) { - return true; - } - - const label = FILTER_TO_LABEL[filter.on]; - const field = filter.stepType; - const expected = 'true'; - const operator = 'EQUAL'; - - if (message?.channel === ChannelTypeEnum.EMAIL) { - const count = await this.executionDetailsRepository.count({ - _jobId: command.job._parentId, - _messageId: message._id, - _environmentId: command.environmentId, - webhookStatus: EmailEventStatusEnum.OPENED, - }); - - const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - ? count === 0 - : count > 0; - - filterProcessingDetails.addCondition({ - filter: label, - field, - expected, - actual: `${passed}`, - operator, - passed, - }); - - return passed; - } - - const value = [PreviousStepTypeEnum.SEEN, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - ? message.seen - : message.read; - const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - ? value === false - : value; - - filterProcessingDetails.addCondition({ - filter: label, - field, - expected, - actual: `${passed}`, - operator, - passed, - }); - - return passed; - } - - private async processIsOnline( - filter: IRealtimeOnlineFilterPart | IOnlineInLastFilterPart, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - const subscriber = await this.subscriberRepository.findOne({ - _id: command.subscriberId, - _organizationId: command.organizationId, - _environmentId: command.environmentId, - }); - - const hasNoOnlineFieldsSet = - typeof subscriber?.isOnline === 'undefined' && typeof subscriber?.lastOnlineAt === 'undefined'; - const isOnlineString = `${subscriber?.isOnline ?? ''}`; - const lastOnlineAtString = `${subscriber?.lastOnlineAt ?? ''}`; - // the old subscriber created before the is online functionality should not be processed - if (hasNoOnlineFieldsSet) { - filterProcessingDetails.addCondition({ - filter: FILTER_TO_LABEL[filter.on], - field: 'isOnline', - expected: `${filter.value}`, - actual: `${filter.on === FilterPartTypeEnum.IS_ONLINE ? isOnlineString : lastOnlineAtString}`, - operator: filter.on === FilterPartTypeEnum.IS_ONLINE ? 'EQUAL' : filter.timeOperator, - passed: false, - }); - - return false; - } - - const isOnlineMatch = subscriber?.isOnline === filter.value; - if (filter.on === FilterPartTypeEnum.IS_ONLINE) { - filterProcessingDetails.addCondition({ - filter: FILTER_TO_LABEL[filter.on], - field: 'isOnline', - expected: `${filter.value}`, - actual: isOnlineString, - operator: 'EQUAL', - passed: isOnlineMatch, - }); - - return isOnlineMatch; - } - - const currentDate = new Date(); - const lastOnlineAt = subscriber?.lastOnlineAt ? parseISO(subscriber?.lastOnlineAt) : new Date(); - const diff = differenceIn(currentDate, lastOnlineAt, filter.timeOperator); - const result = subscriber?.isOnline || (!subscriber?.isOnline && diff >= 0 && diff <= filter.value); - - filterProcessingDetails.addCondition({ - filter: FILTER_TO_LABEL[filter.on], - field: subscriber?.isOnline ? 'isOnline' : 'lastOnlineAt', - expected: subscriber?.isOnline ? 'true' : `${filter.value}`, - actual: `${subscriber?.isOnline ? 'true' : diff}`, - operator: filter.timeOperator, - passed: result, - }); - - return result; - } - - private async getWebhookResponse( - child: IWebhookFilterPart, - variables: IFilterVariables, - command: MessageMatcherCommand - ): Promise | undefined> { - if (!child.webhookUrl) return undefined; - - const payload = await this.buildPayload(variables, command); - - const hmac = await this.buildHmac(command); - - const config = { - headers: { - 'nv-hmac-256': hmac, - }, - }; - - try { - return await axios.post(child.webhookUrl, payload, config).then((response) => { - return response.data as Record; - }); - } catch (err) { - throw new Error( - JSON.stringify({ - message: err.message, - data: EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, - }) - ); - } - } - - private async buildPayload(variables: IFilterVariables, command: MessageMatcherCommand) { - if (process.env.NODE_ENV === 'test') return variables; - - const payload: Partial<{ - subscriber: SubscriberEntity | null; - payload: Record; - identifier: string; - channel: string; - providerId: string; - }> = {}; - - if (variables.subscriber) { - payload.subscriber = variables.subscriber; - } else { - payload.subscriber = await this.subscriberRepository.findBySubscriberId( - command.environmentId, - command.subscriberId - ); - } - - if (variables.payload) { - payload.payload = variables.payload; - } - - payload.identifier = command.identifier; - payload.channel = command.job.type; - - if (command.job.providerId) { - payload.providerId = command.job.providerId; - } - - return payload; - } - - private async buildHmac(command: MessageMatcherCommand): Promise { - if (process.env.NODE_ENV === 'test') return ''; - - const environment = await this.environmentRepository.findOne({ - _id: command.environmentId, - _organizationId: command.organizationId, - }); - if (!environment) throw new PlatformException('Environment is not found'); - - return createHash(environment.apiKeys[0].key, command.environmentId); - } - - private async processFilter( - variables: IFilterVariables, - child: FilterParts, - command: MessageMatcherCommand, - filterProcessingDetails: FilterProcessingDetails - ): Promise { - let passed = false; - - if (child.on === FilterPartTypeEnum.WEBHOOK) { - if (process.env.NODE_ENV === 'test') return true; - - const res = await this.getWebhookResponse(child, variables, command); - passed = this.processFilterEquality({ payload: undefined, webhook: res }, child, filterProcessingDetails); - } - - if (child.on === FilterPartTypeEnum.PAYLOAD || child.on === FilterPartTypeEnum.SUBSCRIBER) { - passed = this.processFilterEquality(variables, child, filterProcessingDetails); - } - - if (child.on === FilterPartTypeEnum.IS_ONLINE || child.on === FilterPartTypeEnum.IS_ONLINE_IN_LAST) { - passed = await this.processIsOnline(child, command, filterProcessingDetails); - } - - if (child.on === FilterPartTypeEnum.PREVIOUS_STEP) { - passed = await this.processPreviousStep(child, command, filterProcessingDetails); - } - - return passed; - } - - @Instrument() - public async getFilterData(command: MessageMatcherCommand) { - const subscriberFilterExist = command.step?.filters?.find((filter) => { - return filter?.children?.find((item) => item?.on === 'subscriber'); - }); - - let subscriber; - - if (subscriberFilterExist) { - subscriber = await this.getSubscriberBySubscriberId({ - subscriberId: command.subscriberId, - _environmentId: command.environmentId, - }); - } - - return { - subscriber, - payload: command.job.payload, - }; - } - - @CachedEntity({ - builder: (command: { subscriberId: string; _environmentId: string }) => - buildSubscriberKey({ - _environmentId: command._environmentId, - subscriberId: command.subscriberId, - }), - }) - public async getSubscriberBySubscriberId({ - subscriberId, - _environmentId, - }: { - subscriberId: string; - _environmentId: string; - }) { - return await this.subscriberRepository.findOne({ - _environmentId, - subscriberId, - }); - } -} +/* + * import * as _ from 'lodash'; + * import axios from 'axios'; + * import { Injectable } from '@nestjs/common'; + * import { parseISO, differenceInMinutes, differenceInHours, differenceInDays } from 'date-fns'; + * import { + * FilterParts, + * IWebhookFilterPart, + * IRealtimeOnlineFilterPart, + * IOnlineInLastFilterPart, + * FILTER_TO_LABEL, + * FilterPartTypeEnum, + * ICondition, + * TimeOperatorEnum, + * ChannelTypeEnum, + * IPreviousStepFilterPart, + * PreviousStepTypeEnum, + * ExecutionDetailsSourceEnum, + * ExecutionDetailsStatusEnum, + * } from '@novu/shared'; + * import { + * SubscriberEntity, + * EnvironmentRepository, + * SubscriberRepository, + * StepFilter, + * ExecutionDetailsRepository, + * MessageRepository, + * JobRepository, + * } from '@novu/dal'; + * import { + * DetailEnum, + * CreateExecutionDetails, + * CreateExecutionDetailsCommand, + * buildSubscriberKey, + * CachedEntity, + * Instrument, + * Filter, + * FilterProcessingDetails, + * IFilterVariables, + * } from '@novu/application-generic'; + * import { EmailEventStatusEnum } from '@novu/stateless'; + * + * import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, createHash, PlatformException } from '../../../shared/utils'; + * import { MessageMatcherCommand } from './message-matcher.command'; + * + * const differenceIn = (currentDate: Date, lastDate: Date, timeOperator: TimeOperatorEnum) => { + * if (timeOperator === TimeOperatorEnum.MINUTES) { + * return differenceInMinutes(currentDate, lastDate); + * } + * + * if (timeOperator === TimeOperatorEnum.HOURS) { + * return differenceInHours(currentDate, lastDate); + * } + * + * return differenceInDays(currentDate, lastDate); + * }; + * + * @Injectable() + * class MessageMatcher extends Filter { + * constructor( + * private subscriberRepository: SubscriberRepository, + * private createExecutionDetails: CreateExecutionDetails, + * private environmentRepository: EnvironmentRepository, + * private executionDetailsRepository: ExecutionDetailsRepository, + * private messageRepository: MessageRepository, + * private jobRepository: JobRepository + * ) { + * super(); + * } + * + * public async filter( + * command: MessageMatcherCommand, + * variables: IFilterVariables + * ): Promise<{ + * passed: boolean; + * conditions: ICondition[]; + * }> { + * const { step } = command; + * if (!step?.filters || !Array.isArray(step?.filters)) { + * return { + * passed: true, + * conditions: [], + * }; + * } + * if (step.filters?.length) { + * const details: FilterProcessingDetails[] = []; + * + * const foundFilter = await this.findAsync(step.filters, async (filter) => { + * const filterProcessingDetails = new FilterProcessingDetails(); + * filterProcessingDetails.addFilter(filter, variables); + * + * const children = filter.children; + * const noRules = !children || (Array.isArray(children) && children.length === 0); + * if (noRules) { + * return true; + * } + * + * const singleRule = !children || (Array.isArray(children) && children.length === 1); + * if (singleRule) { + * const result = await this.processFilter(variables, children[0], command, filterProcessingDetails); + * await this.createExecutionDetails.execute( + * CreateExecutionDetailsCommand.create({ + * ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), + * detail: DetailEnum.PROCESSING_STEP_FILTER, + * source: ExecutionDetailsSourceEnum.INTERNAL, + * status: ExecutionDetailsStatusEnum.PENDING, + * isTest: false, + * isRetry: false, + * raw: filterProcessingDetails.toString(), + * }) + * ); + * + * details.push(filterProcessingDetails); + * + * return result; + * } + * + * const result = await this.handleGroupFilters(filter, variables, command, filterProcessingDetails); + * await this.createExecutionDetails.execute( + * CreateExecutionDetailsCommand.create({ + * ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), + * detail: DetailEnum.PROCESSING_STEP_FILTER, + * source: ExecutionDetailsSourceEnum.INTERNAL, + * status: ExecutionDetailsStatusEnum.PENDING, + * isTest: false, + * isRetry: false, + * raw: filterProcessingDetails.toString(), + * }) + * ); + * + * details.push(filterProcessingDetails); + * + * return result; + * }); + * + * const conditions = details + * .map((detail) => detail.toObject().conditions) + * .reduce((conditionsArray, collection) => [...collection, ...conditionsArray], []); + * + * return { + * passed: !!foundFilter, + * conditions: conditions, + * }; + * } + * + * return { + * passed: true, + * conditions: [], + * }; + * } + * + * public static sumFilters( + * summary: { + * filters: string[]; + * failedFilters: string[]; + * passedFilters: string[]; + * }, + * condition: ICondition + * ) { + * let type: string = condition.filter?.toLowerCase(); + * + * if (condition.filter === FILTER_TO_LABEL.isOnline || condition.filter === FILTER_TO_LABEL.isOnlineInLast) { + * type = 'online'; + * } + * + * return Filter.sumFilters(summary, condition, type); + * } + * + * private async handleGroupFilters( + * filter: StepFilter, + * variables: IFilterVariables, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * if (filter.value === 'OR') { + * return await this.handleOrFilters(filter, variables, command, filterProcessingDetails); + * } + * + * if (filter.value === 'AND') { + * return await this.handleAndFilters(filter, variables, command, filterProcessingDetails); + * } + * + * return false; + * } + * + * private splitFilters(filter: StepFilter) { + * const webhookFilters = filter.children.filter((childFilter) => childFilter.on === 'webhook'); + * + * const otherFilters = filter.children.filter((childFilter) => childFilter.on !== 'webhook'); + * + * return { webhookFilters, otherFilters }; + * } + * + * private async handleAndFilters( + * filter: StepFilter, + * variables: IFilterVariables, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * const { webhookFilters, otherFilters } = this.splitFilters(filter); + * + * const matchedOtherFilters = await this.filterAsync(otherFilters, (i) => + * this.processFilter(variables, i, command, filterProcessingDetails) + * ); + * if (otherFilters.length !== matchedOtherFilters.length) { + * return false; + * } + * + * const matchedWebhookFilters = await this.filterAsync(webhookFilters, (i) => + * this.processFilter(variables, i, command, filterProcessingDetails) + * ); + * + * return matchedWebhookFilters.length === webhookFilters.length; + * } + * + * private async handleOrFilters( + * filter: StepFilter, + * variables: IFilterVariables, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * const { webhookFilters, otherFilters } = this.splitFilters(filter); + * + * const foundFilter = await this.findAsync(otherFilters, (i) => + * this.processFilter(variables, i, command, filterProcessingDetails) + * ); + * if (foundFilter) { + * return true; + * } + * + * return !!(await this.findAsync(webhookFilters, (i) => + * this.processFilter(variables, i, command, filterProcessingDetails) + * )); + * } + * + * private async processPreviousStep( + * filter: IPreviousStepFilterPart, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * const job = await this.jobRepository.findOne({ + * transactionId: command.transactionId, + * // backward compatibility - ternary needed to be removed once the queue renewed + * _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, + * _environmentId: command.environmentId, + * _organizationId: command.organizationId, + * 'step.uuid': filter.step, + * }); + * + * if (!job) { + * return true; + * } + * + * const message = await this.messageRepository.findOne({ + * _jobId: job._id, + * _environmentId: command.environmentId, + * // backward compatibility - ternary needed to be removed once the queue renewed + * _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, + * transactionId: command.transactionId, + * }); + * + * if (!message) { + * return true; + * } + * + * const label = FILTER_TO_LABEL[filter.on]; + * const field = filter.stepType; + * const expected = 'true'; + * const operator = 'EQUAL'; + * + * if (message?.channel === ChannelTypeEnum.EMAIL) { + * const count = await this.executionDetailsRepository.count({ + * _jobId: command.job._parentId, + * _messageId: message._id, + * _environmentId: command.environmentId, + * webhookStatus: EmailEventStatusEnum.OPENED, + * }); + * + * const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) + * ? count === 0 + * : count > 0; + * + * filterProcessingDetails.addCondition({ + * filter: label, + * field, + * expected, + * actual: `${passed}`, + * operator, + * passed, + * }); + * + * return passed; + * } + * + * const value = [PreviousStepTypeEnum.SEEN, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) + * ? message.seen + * : message.read; + * const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) + * ? value === false + * : value; + * + * filterProcessingDetails.addCondition({ + * filter: label, + * field, + * expected, + * actual: `${passed}`, + * operator, + * passed, + * }); + * + * return passed; + * } + * + * private async processIsOnline( + * filter: IRealtimeOnlineFilterPart | IOnlineInLastFilterPart, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * const subscriber = await this.subscriberRepository.findOne({ + * _id: command.subscriberId, + * _organizationId: command.organizationId, + * _environmentId: command.environmentId, + * }); + * + * const hasNoOnlineFieldsSet = + * typeof subscriber?.isOnline === 'undefined' && typeof subscriber?.lastOnlineAt === 'undefined'; + * const isOnlineString = `${subscriber?.isOnline ?? ''}`; + * const lastOnlineAtString = `${subscriber?.lastOnlineAt ?? ''}`; + * // the old subscriber created before the is online functionality should not be processed + * if (hasNoOnlineFieldsSet) { + * filterProcessingDetails.addCondition({ + * filter: FILTER_TO_LABEL[filter.on], + * field: 'isOnline', + * expected: `${filter.value}`, + * actual: `${filter.on === FilterPartTypeEnum.IS_ONLINE ? isOnlineString : lastOnlineAtString}`, + * operator: filter.on === FilterPartTypeEnum.IS_ONLINE ? 'EQUAL' : filter.timeOperator, + * passed: false, + * }); + * + * return false; + * } + * + * const isOnlineMatch = subscriber?.isOnline === filter.value; + * if (filter.on === FilterPartTypeEnum.IS_ONLINE) { + * filterProcessingDetails.addCondition({ + * filter: FILTER_TO_LABEL[filter.on], + * field: 'isOnline', + * expected: `${filter.value}`, + * actual: isOnlineString, + * operator: 'EQUAL', + * passed: isOnlineMatch, + * }); + * + * return isOnlineMatch; + * } + * + * const currentDate = new Date(); + * const lastOnlineAt = subscriber?.lastOnlineAt ? parseISO(subscriber?.lastOnlineAt) : new Date(); + * const diff = differenceIn(currentDate, lastOnlineAt, filter.timeOperator); + * const result = subscriber?.isOnline || (!subscriber?.isOnline && diff >= 0 && diff <= filter.value); + * + * filterProcessingDetails.addCondition({ + * filter: FILTER_TO_LABEL[filter.on], + * field: subscriber?.isOnline ? 'isOnline' : 'lastOnlineAt', + * expected: subscriber?.isOnline ? 'true' : `${filter.value}`, + * actual: `${subscriber?.isOnline ? 'true' : diff}`, + * operator: filter.timeOperator, + * passed: result, + * }); + * + * return result; + * } + * + * private async getWebhookResponse( + * child: IWebhookFilterPart, + * variables: IFilterVariables, + * command: MessageMatcherCommand + * ): Promise | undefined> { + * if (!child.webhookUrl) return undefined; + * + * const payload = await this.buildPayload(variables, command); + * + * const hmac = await this.buildHmac(command); + * + * const config = { + * headers: { + * 'nv-hmac-256': hmac, + * }, + * }; + * + * try { + * return await axios.post(child.webhookUrl, payload, config).then((response) => { + * return response.data as Record; + * }); + * } catch (err) { + * throw new Error( + * JSON.stringify({ + * message: err.message, + * data: EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, + * }) + * ); + * } + * } + * + * private async buildPayload(variables: IFilterVariables, command: MessageMatcherCommand) { + * if (process.env.NODE_ENV === 'test') return variables; + * + * const payload: Partial<{ + * subscriber: SubscriberEntity | null; + * payload: Record; + * identifier: string; + * channel: string; + * providerId: string; + * }> = {}; + * + * if (variables.subscriber) { + * payload.subscriber = variables.subscriber; + * } else { + * payload.subscriber = await this.subscriberRepository.findBySubscriberId( + * command.environmentId, + * command.subscriberId + * ); + * } + * + * if (variables.payload) { + * payload.payload = variables.payload; + * } + * + * payload.identifier = command.identifier; + * payload.channel = command.job.type; + * + * if (command.job.providerId) { + * payload.providerId = command.job.providerId; + * } + * + * return payload; + * } + * + * private async buildHmac(command: MessageMatcherCommand): Promise { + * if (process.env.NODE_ENV === 'test') return ''; + * + * const environment = await this.environmentRepository.findOne({ + * _id: command.environmentId, + * _organizationId: command.organizationId, + * }); + * if (!environment) throw new PlatformException('Environment is not found'); + * + * return createHash(environment.apiKeys[0].key, command.environmentId); + * } + * + * private async processFilter( + * variables: IFilterVariables, + * child: FilterParts, + * command: MessageMatcherCommand, + * filterProcessingDetails: FilterProcessingDetails + * ): Promise { + * let passed = false; + * + * if (child.on === FilterPartTypeEnum.WEBHOOK) { + * if (process.env.NODE_ENV === 'test') return true; + * + * const res = await this.getWebhookResponse(child, variables, command); + * passed = this.processFilterEquality({ payload: undefined, webhook: res }, child, filterProcessingDetails); + * } + * + * if (child.on === FilterPartTypeEnum.PAYLOAD || child.on === FilterPartTypeEnum.SUBSCRIBER) { + * passed = this.processFilterEquality(variables, child, filterProcessingDetails); + * } + * + * if (child.on === FilterPartTypeEnum.IS_ONLINE || child.on === FilterPartTypeEnum.IS_ONLINE_IN_LAST) { + * passed = await this.processIsOnline(child, command, filterProcessingDetails); + * } + * + * if (child.on === FilterPartTypeEnum.PREVIOUS_STEP) { + * passed = await this.processPreviousStep(child, command, filterProcessingDetails); + * } + * + * return passed; + * } + * + * @Instrument() + * public async getFilterData(command: MessageMatcherCommand) { + * const subscriberFilterExist = command.step?.filters?.find((filter) => { + * return filter?.children?.find((item) => item?.on === 'subscriber'); + * }); + * + * let subscriber; + * + * if (subscriberFilterExist) { + * subscriber = await this.getSubscriberBySubscriberId({ + * subscriberId: command.subscriberId, + * _environmentId: command.environmentId, + * }); + * } + * + * return { + * subscriber, + * payload: command.job.payload, + * }; + * } + * + * @CachedEntity({ + * builder: (command: { subscriberId: string; _environmentId: string }) => + * buildSubscriberKey({ + * _environmentId: command._environmentId, + * subscriberId: command.subscriberId, + * }), + * }) + * public async getSubscriberBySubscriberId({ + * subscriberId, + * _environmentId, + * }: { + * subscriberId: string; + * _environmentId: string; + * }) { + * return await this.subscriberRepository.findOne({ + * _environmentId, + * subscriberId, + * }); + * } + * } + */ diff --git a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts index aa7b2cbea33..248e4a5dbb0 100644 --- a/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/queue-next-job/queue-next-job.usecase.ts @@ -1,18 +1,16 @@ import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { JobEntity, JobRepository } from '@novu/dal'; -import { AddJob, InstrumentUsecase } from '@novu/application-generic'; +import { AddJob, ConditionsFilter, ConditionsFilterCommand, InstrumentUsecase } from '@novu/application-generic'; import { QueueNextJobCommand } from './queue-next-job.command'; -import { MessageMatcher } from '../message-matcher/message-matcher.usecase'; import { StepTypeEnum } from '@novu/shared'; -import { MessageMatcherCommand } from '../message-matcher/message-matcher.command'; @Injectable() export class QueueNextJob { constructor( private jobRepository: JobRepository, @Inject(forwardRef(() => AddJob)) private addJobUsecase: AddJob, - private messageMatcher: MessageMatcher + private conditionsFilter: ConditionsFilter ) {} @InstrumentUsecase() @@ -29,19 +27,14 @@ export class QueueNextJob { let filtered = false; if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) { - const messageMatcherCommand = MessageMatcherCommand.create({ - step: job.step, - job: job, - userId: command.userId, - transactionId: job.transactionId, - _subscriberId: job._subscriberId, - environmentId: job._environmentId, - organizationId: job._organizationId, - subscriberId: job.subscriberId, - identifier: job.identifier, - }); - const payload = await this.messageMatcher.getFilterData(messageMatcherCommand); - const shouldRun = await this.messageMatcher.filter(messageMatcherCommand, payload, true); + const shouldRun = await this.conditionsFilter.filter( + ConditionsFilterCommand.create({ + filters: job.step.filters || [], + environmentId: command.environmentId, + organizationId: command.organizationId, + userId: command.userId, + }) + ); filtered = !shouldRun.passed; } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts index d68528ac9f0..851158fadf1 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts @@ -19,6 +19,8 @@ import { GetSubscriberTemplatePreference, GetSubscriberTemplatePreferenceCommand, Instrument, + ConditionsFilterCommand, + ConditionsFilter, } from '@novu/application-generic'; import { JobEntity, @@ -37,8 +39,6 @@ import { SendMessageChat } from './send-message-chat.usecase'; import { SendMessagePush } from './send-message-push.usecase'; import { Digest } from './digest'; import { PlatformException } from '../../../shared/utils'; -import { MessageMatcher } from '../message-matcher'; -import { MessageMatcherCommand } from '../message-matcher/message-matcher.command'; @Injectable() export class SendMessage { @@ -54,7 +54,8 @@ export class SendMessage { private notificationTemplateRepository: NotificationTemplateRepository, private jobRepository: JobRepository, private sendMessageDelay: SendMessageDelay, - private matchMessage: MessageMatcher, + private conditionsFilter: ConditionsFilter, + private subscriberRepository: SubscriberRepository, private analyticsService: AnalyticsService ) {} @@ -66,7 +67,7 @@ export class SendMessage { const stepType = command.step?.template?.type; if (!command.payload?.$on_boarding_trigger) { - const usedFilters = shouldRun.conditions.reduce(MessageMatcher.sumFilters, { + const usedFilters = shouldRun.conditions.reduce(ConditionsFilter.sumFilters, { filters: [], failedFilters: [], passedFilters: [], @@ -144,21 +145,14 @@ export class SendMessage { } private async filter(command: SendMessageCommand) { - const messageMatcherCommand = MessageMatcherCommand.create({ - step: command.job.step, - job: command.job, - userId: command.userId, - transactionId: command.job.transactionId, - _subscriberId: command.job._subscriberId, - environmentId: command.job._environmentId, - organizationId: command.job._organizationId, - subscriberId: command.job.subscriberId, - identifier: command.job.identifier, - }); - - const data = await this.matchMessage.getFilterData(messageMatcherCommand); - - const shouldRun = await this.matchMessage.filter(messageMatcherCommand, data); + const shouldRun = await this.conditionsFilter.filter( + ConditionsFilterCommand.create({ + filters: command.job.step.filters || [], + environmentId: command.environmentId, + organizationId: command.organizationId, + userId: command.userId, + }) + ); if (!shouldRun.passed) { await this.createExecutionDetails.execute( @@ -170,7 +164,7 @@ export class SendMessage { isTest: false, isRetry: false, raw: JSON.stringify({ - payload: data, + payload: shouldRun.variables, filters: command.step.filters, }), }) @@ -194,7 +188,7 @@ export class SendMessage { return true; } - const subscriber = await this.matchMessage.getSubscriberBySubscriberId({ + const subscriber = await this.getSubscriberBySubscriberId({ _environmentId: job._environmentId, subscriberId: job.subscriberId, }); @@ -240,6 +234,26 @@ export class SendMessage { return await this.notificationTemplateRepository.findById(_id, environmentId); } + @CachedEntity({ + builder: (command: { subscriberId: string; _environmentId: string }) => + buildSubscriberKey({ + _environmentId: command._environmentId, + subscriberId: command.subscriberId, + }), + }) + public async getSubscriberBySubscriberId({ + subscriberId, + _environmentId, + }: { + subscriberId: string; + _environmentId: string; + }) { + return await this.subscriberRepository.findOne({ + _environmentId, + subscriberId, + }); + } + @Instrument() private stepPreferred(preference: { enabled: boolean; channels: IPreferenceChannels }, job: JobEntity) { const templatePreferred = preference.enabled; diff --git a/apps/worker/src/app/workflow/workflow.module.ts b/apps/worker/src/app/workflow/workflow.module.ts index 3868164b76a..2161d9ae1ef 100644 --- a/apps/worker/src/app/workflow/workflow.module.ts +++ b/apps/worker/src/app/workflow/workflow.module.ts @@ -38,7 +38,6 @@ import { } from './services'; import { - MessageMatcher, SendMessage, SendMessageChat, SendMessageDelay, @@ -84,7 +83,6 @@ const USE_CASES = [ GetSubscriberPreference, GetSubscriberTemplatePreference, HandleLastFailedJob, - MessageMatcher, ProcessTenant, QueueNextJob, RunJob, diff --git a/libs/dal/src/repositories/notification-template/notification-template.entity.ts b/libs/dal/src/repositories/notification-template/notification-template.entity.ts index 0793332186d..fb6c225f9ad 100644 --- a/libs/dal/src/repositories/notification-template/notification-template.entity.ts +++ b/libs/dal/src/repositories/notification-template/notification-template.entity.ts @@ -13,6 +13,7 @@ import { IStepVariant, INotificationTrigger, TriggerTypeEnum, + IMessageFilter, } from '@novu/shared'; import { MessageTemplateEntity } from '../message-template'; @@ -121,13 +122,10 @@ export class NotificationStepEntity extends StepVariantEntity implements INotifi variants?: StepVariantEntity[]; } -export class StepFilter { - isNegated: boolean; - - type: BuilderFieldType; - +export class StepFilter implements IMessageFilter { + isNegated?: boolean; + type?: BuilderFieldType; value: BuilderGroupValues; - children: FilterParts[]; } diff --git a/packages/application-generic/package.json b/packages/application-generic/package.json index 63cc61b9368..b8c0482b818 100644 --- a/packages/application-generic/package.json +++ b/packages/application-generic/package.json @@ -123,6 +123,7 @@ "codecov": "^3.5.0", "cpx": "^1.5.0", "dotenv": "^8.2.0", + "chai": "^4.2.0", "jest": "^27.1.0", "npm-run-all": "^4.1.5", "nyc": "^15.1.0", diff --git a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.command.ts b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.command.ts index 225e995c4e7..dc40c940033 100644 --- a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.command.ts +++ b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.command.ts @@ -1,8 +1,11 @@ -import { StepFilter } from '@novu/dal'; import { IsDefined } from 'class-validator'; -import { EnvironmentWithUserCommand } from '../../commands'; + +import { StepFilter } from '@novu/dal'; import { IJob, INotificationTemplateStep } from '@novu/shared'; +import { EnvironmentWithUserCommand } from '../../commands'; +import { IFilterVariables } from '../../utils/filter-processing-details'; + export class ConditionsFilterCommand extends EnvironmentWithUserCommand { @IsDefined() filters: StepFilter[]; @@ -10,4 +13,6 @@ export class ConditionsFilterCommand extends EnvironmentWithUserCommand { job?: IJob; step?: INotificationTemplateStep; + + variables?: IFilterVariables; } diff --git a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts index 6056e29b991..cef5cc4ac66 100644 --- a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts +++ b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts @@ -8,6 +8,8 @@ import { SubscriberEntity, SubscriberRepository, JobRepository, + TenantEntity, + TenantRepository, } from '@novu/dal'; import { ChannelTypeEnum, @@ -37,31 +39,38 @@ import { import { EmailEventStatusEnum } from '@novu/stateless'; import { PlatformException } from '../../utils/exceptions'; import { createHash } from '../../utils/hmac'; - +import { Instrument } from '../../instrumentation'; +import { CachedEntity } from '../../services/cache/interceptors/cached-entity.interceptor'; +import { buildSubscriberKey } from '../../services/cache/key-builders/entities'; @Injectable() export class ConditionsFilter extends Filter { constructor( + private subscriberRepository: SubscriberRepository, private messageRepository: MessageRepository, private executionDetailsRepository: ExecutionDetailsRepository, - private subscriberRepository: SubscriberRepository, private jobRepository: JobRepository, + private tenantRepository: TenantRepository, private environmentRepository: EnvironmentRepository ) { super(); } - public async filter( - command: ConditionsFilterCommand, - variables: IFilterVariables - ): Promise<{ + public async filter(command: ConditionsFilterCommand): Promise<{ passed: boolean; conditions: ICondition[]; + variables: IFilterVariables; }> { - const { filters } = command; + const variables = await this.normalizeVariablesData(command); + const filters = command.filters?.length + ? command.filters + : command.step?.filters?.length + ? command.step.filters + : []; if (!filters || !Array.isArray(filters) || filters.length === 0) { return { passed: true, conditions: [], + variables: variables, }; } @@ -114,10 +123,31 @@ export class ConditionsFilter extends Filter { return { passed: !!foundFilter, - conditions: conditions, + conditions, + variables, }; } + public static sumFilters( + summary: { + filters: string[]; + failedFilters: string[]; + passedFilters: string[]; + }, + condition: ICondition + ) { + let type: string = condition.filter?.toLowerCase(); + + if ( + condition.filter === FILTER_TO_LABEL.isOnline || + condition.filter === FILTER_TO_LABEL.isOnlineInLast + ) { + type = 'online'; + } + + return Filter.sumFilters(summary, condition, type); + } + private async processPreviousStep( filter: IPreviousStepFilterPart, command: ConditionsFilterCommand, @@ -214,9 +244,8 @@ export class ConditionsFilter extends Filter { command: ConditionsFilterCommand, filterProcessingDetails: FilterProcessingDetails ): Promise { - const subscriber = await this.subscriberRepository.findOne({ - _id: command.job.subscriberId, - _organizationId: command.organizationId, + const subscriber = await this.getSubscriberBySubscriberId({ + subscriberId: command.job.subscriberId, _environmentId: command.environmentId, }); @@ -448,11 +477,32 @@ export class ConditionsFilter extends Filter { command: ConditionsFilterCommand, filterProcessingDetails: FilterProcessingDetails ): Promise { - const matchedOtherFilters = await this.filterAsync(filter.children, (i) => + const { webhookFilters, otherFilters } = this.splitFilters(filter); + + const matchedOtherFilters = await this.filterAsync(otherFilters, (i) => this.processFilter(variables, i, command, filterProcessingDetails) ); + if (otherFilters.length !== matchedOtherFilters.length) { + return false; + } + + const matchedWebhookFilters = await this.filterAsync(webhookFilters, (i) => + this.processFilter(variables, i, command, filterProcessingDetails) + ); + + return matchedWebhookFilters.length === webhookFilters.length; + } - return filter.children.length === matchedOtherFilters.length; + private splitFilters(filter: StepFilter) { + const webhookFilters = filter.children.filter( + (childFilter) => childFilter.on === 'webhook' + ); + + const otherFilters = filter.children.filter( + (childFilter) => childFilter.on !== 'webhook' + ); + + return { webhookFilters, otherFilters }; } private async handleOrFilters( @@ -461,11 +511,98 @@ export class ConditionsFilter extends Filter { command: ConditionsFilterCommand, filterProcessingDetails: FilterProcessingDetails ): Promise { - const foundFilter = await this.findAsync(filter.children, (i) => + const { webhookFilters, otherFilters } = this.splitFilters(filter); + + const foundFilter = await this.findAsync(otherFilters, (i) => this.processFilter(variables, i, command, filterProcessingDetails) ); + if (foundFilter) { + return true; + } - return !!foundFilter; + return !!(await this.findAsync(webhookFilters, (i) => + this.processFilter(variables, i, command, filterProcessingDetails) + )); + } + + @Instrument() + private async normalizeVariablesData(command: ConditionsFilterCommand) { + const filterVariables: IFilterVariables = {}; + + filterVariables.subscriber = await this.fetchSubscriberIfMissing(command); + filterVariables.tenant = await this.fetchTenantIfMissing(command); + filterVariables.payload = command.variables?.payload + ? command.variables?.payload + : command.job?.payload ?? undefined; + + return filterVariables; + } + + private async fetchSubscriberIfMissing( + command: ConditionsFilterCommand + ): Promise { + if (command.variables?.subscriber) { + return command.variables.subscriber; + } + + const subscriberFilterExist = command.step?.filters?.find((filter) => { + return filter?.children?.find((item) => item?.on === 'subscriber'); + }); + + if (subscriberFilterExist && command.job) { + return ( + (await this.getSubscriberBySubscriberId({ + subscriberId: command.job.subscriberId, + _environmentId: command.environmentId, + })) ?? undefined + ); + } + + return undefined; + } + + private async fetchTenantIfMissing( + command: ConditionsFilterCommand + ): Promise { + if (command.variables?.tenant) { + return command.variables.tenant; + } + + const tenantIdentifier = command.job?.tenant?.identifier; + const tenantFilterExist = command.step?.filters?.find((filter) => { + return filter?.children?.find((item) => item?.on === 'subscriber'); + }); + + if (tenantFilterExist && tenantIdentifier && command.job) { + return ( + (await this.tenantRepository.findOne({ + _environmentId: command.job._environmentId, + identifier: tenantIdentifier, + })) ?? undefined + ); + } + + return undefined; + } + + @CachedEntity({ + builder: (command: { subscriberId: string; _environmentId: string }) => + buildSubscriberKey({ + _environmentId: command._environmentId, + subscriberId: command.subscriberId, + }), + }) + public async getSubscriberBySubscriberId({ + subscriberId, + _environmentId, + }: { + subscriberId: string; + _environmentId: string; + }) { + return await this.subscriberRepository.findOne({ + _environmentId, + subscriberId, + }); } } diff --git a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts index d4e13df8da5..5db74822639 100644 --- a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts +++ b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts @@ -106,6 +106,7 @@ describe('select integration', function () { new ExecutionDetailsRepository(), new SubscriberRepository(), new JobRepository(), + new TenantRepository(), new EnvironmentRepository() ), new TenantRepository(), diff --git a/packages/application-generic/src/usecases/select-integration/select-integration.usecase.ts b/packages/application-generic/src/usecases/select-integration/select-integration.usecase.ts index c41e64a8031..6f4b36ea4c5 100644 --- a/packages/application-generic/src/usecases/select-integration/select-integration.usecase.ts +++ b/packages/application-generic/src/usecases/select-integration/select-integration.usecase.ts @@ -8,8 +8,9 @@ import { import { CHANNELS_WITH_PRIMARY } from '@novu/shared'; import { SelectIntegrationCommand } from './select-integration.command'; -import { ConditionsFilter } from '../conditions-filter/conditions-filter.usecase'; -import { buildIntegrationKey, CachedQuery } from '../../services/cache'; +import { ConditionsFilter } from '../conditions-filter'; +import { CachedQuery } from '../../services/cache/interceptors/cached-query.interceptor'; +import { buildIntegrationKey } from '../../services/cache/key-builders/queries'; import { FeatureFlagCommand, GetIsMultiProviderConfigurationEnabled, @@ -96,11 +97,12 @@ export class SelectIntegration { environmentId: command.environmentId, organizationId: command.organizationId, userId: command.userId, - }), - { - tenant, - } + variables: { + tenant, + }, + }) ); + if (passed) { integration = currentIntegration; break; diff --git a/packages/application-generic/src/usecases/select-variant/select-variant.usecase.ts b/packages/application-generic/src/usecases/select-variant/select-variant.usecase.ts index 83d8062e4a7..8e6714d381e 100644 --- a/packages/application-generic/src/usecases/select-variant/select-variant.usecase.ts +++ b/packages/application-generic/src/usecases/select-variant/select-variant.usecase.ts @@ -43,8 +43,8 @@ export class SelectVariant { userId: command.userId, step: command.step, job: command.job, - }), - command.filterData + variables: command.filterData, + }) ); if (passed) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c27dc259b78..1f2a01e6aa8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -394,7 +394,7 @@ importers: nodemon: 2.0.22 prettier: 2.8.7 sinon: 9.2.4 - ts-jest: 27.1.5_4aafjbpmnrfjtrzkyohogv4jce + ts-jest: 27.1.5_cnngzrja2umb46xxazlucyx2qu ts-loader: 9.4.2_rggdtlzfqxxwxudp3onsqdyocm ts-node: 10.9.1_wh2cnrlliuuxb2etxm2m3ttgna tsconfig-paths: 4.1.2 @@ -1547,6 +1547,7 @@ importers: analytics-node: ^6.2.0 axios: ^1.5.0 bullmq: ^3.10.2 + chai: ^4.2.0 class-transformer: ^0.5.1 class-validator: ^0.14.0 codecov: ^3.5.0 @@ -1657,6 +1658,7 @@ importers: '@types/analytics-node': 3.1.11 '@types/jest': 29.5.2 '@types/sinon': 9.0.11 + chai: 4.3.7 codecov: 3.8.3 cpx: 1.5.0 dotenv: 8.6.0 @@ -1667,7 +1669,7 @@ importers: rimraf: 3.0.2 sinon: 9.2.4 ts-jest: 27.1.5_jt6hjr4g6reedzkbayzypxjbqa - ts-node: 10.9.1_263kxb4upkllfszsccwxso4xq4 + ts-node: 10.9.1_wh2cnrlliuuxb2etxm2m3ttgna typescript: 4.9.5 packages/cli: @@ -28762,7 +28764,7 @@ packages: dependencies: '@types/node': 14.18.42 cosmiconfig: 8.2.0 - ts-node: 10.9.1_u2ngtadnsu6rqlw26gb7xq6vqq + ts-node: 10.9.1_wh2cnrlliuuxb2etxm2m3ttgna typescript: 4.9.5 /cosmiconfig/5.2.1: @@ -36486,7 +36488,7 @@ packages: pretty-format: 27.5.1 slash: 3.0.0 strip-json-comments: 3.1.1 - ts-node: 10.9.1_263kxb4upkllfszsccwxso4xq4 + ts-node: 10.9.1_wh2cnrlliuuxb2etxm2m3ttgna transitivePeerDependencies: - bufferutil - canvas @@ -50301,7 +50303,7 @@ packages: yargs-parser: 20.2.9 dev: true - /ts-jest/27.1.5_4aafjbpmnrfjtrzkyohogv4jce: + /ts-jest/27.1.5_cnngzrja2umb46xxazlucyx2qu: resolution: {integrity: sha512-Xv6jBQPoBEvBq/5i2TeSG9tt/nqkbpcurrEG1b+2yfBrcJelOZF9Ml6dmyMh7bcW9JyFbRYpR5rxROSlBLTZHA==} engines: {node: ^10.13.0 || ^12.13.0 || ^14.15.0 || >=15.0.0} hasBin: true @@ -50322,7 +50324,6 @@ packages: esbuild: optional: true dependencies: - '@babel/core': 7.22.11 bs-logger: 0.2.6 fast-json-stable-stringify: 2.1.0 jest: 27.5.1_ts-node@10.9.1 @@ -50504,37 +50505,6 @@ packages: webpack: 5.88.2 dev: true - /ts-node/10.9.1_263kxb4upkllfszsccwxso4xq4: - resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==} - hasBin: true - peerDependencies: - '@swc/core': '>=1.2.50' - '@swc/wasm': '>=1.2.50' - '@types/node': '*' - typescript: '>=2.7' - peerDependenciesMeta: - '@swc/core': - optional: true - '@swc/wasm': - optional: true - dependencies: - '@cspotcode/source-map-support': 0.8.1 - '@tsconfig/node10': 1.0.9 - '@tsconfig/node12': 1.0.11 - '@tsconfig/node14': 1.0.3 - '@tsconfig/node16': 1.0.3 - '@types/node': 16.11.7 - acorn: 8.10.0 - acorn-walk: 8.2.0 - arg: 4.1.3 - create-require: 1.1.1 - diff: 4.0.2 - make-error: 1.3.6 - typescript: 4.9.5 - v8-compile-cache-lib: 3.0.1 - yn: 3.1.1 - dev: true - /ts-node/10.9.1_3oc4l4vkwjasz4xtxrjz3zw4u4: resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==} hasBin: true @@ -50661,6 +50631,7 @@ packages: typescript: 4.9.5 v8-compile-cache-lib: 3.0.1 yn: 3.1.1 + dev: true /ts-node/10.9.1_wh2cnrlliuuxb2etxm2m3ttgna: resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==} From befcfa96da09b6fec810f8fc3864742f6d9f30b2 Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 24 Sep 2023 14:18:24 +0300 Subject: [PATCH 2/4] fix: spec build --- .../src/usecases/select-integration/select-integration.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts index 5db74822639..586b4c37cad 100644 --- a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts +++ b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts @@ -102,9 +102,9 @@ describe('select integration', function () { // @ts-ignore new GetDecryptedIntegrations(), new ConditionsFilter( + new SubscriberRepository(), new MessageRepository(), new ExecutionDetailsRepository(), - new SubscriberRepository(), new JobRepository(), new TenantRepository(), new EnvironmentRepository() From ab3a32f336f98d01c1d894ed1fe67dfa3c176f76 Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 26 Sep 2023 12:39:41 +0300 Subject: [PATCH 3/4] feat: delete old use case --- .../conditions-filter.usecase.spec.ts} | 0 .../usecases/message-matcher/index.ts | 0 .../message-matcher.command.ts | 25 - .../message-matcher.usecase.ts | 522 ------------------ 4 files changed, 547 deletions(-) rename apps/worker/src/app/workflow/{usecases/message-matcher/message-matcher.usecase.spec.ts => specs/conditions-filter.usecase.spec.ts} (100%) delete mode 100644 apps/worker/src/app/workflow/usecases/message-matcher/index.ts delete mode 100644 apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts delete mode 100644 apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts b/apps/worker/src/app/workflow/specs/conditions-filter.usecase.spec.ts similarity index 100% rename from apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.spec.ts rename to apps/worker/src/app/workflow/specs/conditions-filter.usecase.spec.ts diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/index.ts b/apps/worker/src/app/workflow/usecases/message-matcher/index.ts deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts deleted file mode 100644 index 762f8d11bd0..00000000000 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.command.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { IsDefined, IsString } from 'class-validator'; -import { NotificationStepEntity, JobEntity } from '@novu/dal'; -import { EnvironmentWithUserCommand } from '@novu/application-generic'; - -class MessageMatcherCommand extends EnvironmentWithUserCommand { - @IsDefined() - @IsString() - identifier: string; - - @IsDefined() - step: NotificationStepEntity; - - @IsString() - @IsDefined() - transactionId: string; - - @IsDefined() - subscriberId: string; - - @IsDefined() - _subscriberId: string; - - @IsDefined() - job: JobEntity; -} diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts deleted file mode 100644 index 4a04f92edfb..00000000000 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts +++ /dev/null @@ -1,522 +0,0 @@ -/* - * import * as _ from 'lodash'; - * import axios from 'axios'; - * import { Injectable } from '@nestjs/common'; - * import { parseISO, differenceInMinutes, differenceInHours, differenceInDays } from 'date-fns'; - * import { - * FilterParts, - * IWebhookFilterPart, - * IRealtimeOnlineFilterPart, - * IOnlineInLastFilterPart, - * FILTER_TO_LABEL, - * FilterPartTypeEnum, - * ICondition, - * TimeOperatorEnum, - * ChannelTypeEnum, - * IPreviousStepFilterPart, - * PreviousStepTypeEnum, - * ExecutionDetailsSourceEnum, - * ExecutionDetailsStatusEnum, - * } from '@novu/shared'; - * import { - * SubscriberEntity, - * EnvironmentRepository, - * SubscriberRepository, - * StepFilter, - * ExecutionDetailsRepository, - * MessageRepository, - * JobRepository, - * } from '@novu/dal'; - * import { - * DetailEnum, - * CreateExecutionDetails, - * CreateExecutionDetailsCommand, - * buildSubscriberKey, - * CachedEntity, - * Instrument, - * Filter, - * FilterProcessingDetails, - * IFilterVariables, - * } from '@novu/application-generic'; - * import { EmailEventStatusEnum } from '@novu/stateless'; - * - * import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, createHash, PlatformException } from '../../../shared/utils'; - * import { MessageMatcherCommand } from './message-matcher.command'; - * - * const differenceIn = (currentDate: Date, lastDate: Date, timeOperator: TimeOperatorEnum) => { - * if (timeOperator === TimeOperatorEnum.MINUTES) { - * return differenceInMinutes(currentDate, lastDate); - * } - * - * if (timeOperator === TimeOperatorEnum.HOURS) { - * return differenceInHours(currentDate, lastDate); - * } - * - * return differenceInDays(currentDate, lastDate); - * }; - * - * @Injectable() - * class MessageMatcher extends Filter { - * constructor( - * private subscriberRepository: SubscriberRepository, - * private createExecutionDetails: CreateExecutionDetails, - * private environmentRepository: EnvironmentRepository, - * private executionDetailsRepository: ExecutionDetailsRepository, - * private messageRepository: MessageRepository, - * private jobRepository: JobRepository - * ) { - * super(); - * } - * - * public async filter( - * command: MessageMatcherCommand, - * variables: IFilterVariables - * ): Promise<{ - * passed: boolean; - * conditions: ICondition[]; - * }> { - * const { step } = command; - * if (!step?.filters || !Array.isArray(step?.filters)) { - * return { - * passed: true, - * conditions: [], - * }; - * } - * if (step.filters?.length) { - * const details: FilterProcessingDetails[] = []; - * - * const foundFilter = await this.findAsync(step.filters, async (filter) => { - * const filterProcessingDetails = new FilterProcessingDetails(); - * filterProcessingDetails.addFilter(filter, variables); - * - * const children = filter.children; - * const noRules = !children || (Array.isArray(children) && children.length === 0); - * if (noRules) { - * return true; - * } - * - * const singleRule = !children || (Array.isArray(children) && children.length === 1); - * if (singleRule) { - * const result = await this.processFilter(variables, children[0], command, filterProcessingDetails); - * await this.createExecutionDetails.execute( - * CreateExecutionDetailsCommand.create({ - * ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), - * detail: DetailEnum.PROCESSING_STEP_FILTER, - * source: ExecutionDetailsSourceEnum.INTERNAL, - * status: ExecutionDetailsStatusEnum.PENDING, - * isTest: false, - * isRetry: false, - * raw: filterProcessingDetails.toString(), - * }) - * ); - * - * details.push(filterProcessingDetails); - * - * return result; - * } - * - * const result = await this.handleGroupFilters(filter, variables, command, filterProcessingDetails); - * await this.createExecutionDetails.execute( - * CreateExecutionDetailsCommand.create({ - * ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), - * detail: DetailEnum.PROCESSING_STEP_FILTER, - * source: ExecutionDetailsSourceEnum.INTERNAL, - * status: ExecutionDetailsStatusEnum.PENDING, - * isTest: false, - * isRetry: false, - * raw: filterProcessingDetails.toString(), - * }) - * ); - * - * details.push(filterProcessingDetails); - * - * return result; - * }); - * - * const conditions = details - * .map((detail) => detail.toObject().conditions) - * .reduce((conditionsArray, collection) => [...collection, ...conditionsArray], []); - * - * return { - * passed: !!foundFilter, - * conditions: conditions, - * }; - * } - * - * return { - * passed: true, - * conditions: [], - * }; - * } - * - * public static sumFilters( - * summary: { - * filters: string[]; - * failedFilters: string[]; - * passedFilters: string[]; - * }, - * condition: ICondition - * ) { - * let type: string = condition.filter?.toLowerCase(); - * - * if (condition.filter === FILTER_TO_LABEL.isOnline || condition.filter === FILTER_TO_LABEL.isOnlineInLast) { - * type = 'online'; - * } - * - * return Filter.sumFilters(summary, condition, type); - * } - * - * private async handleGroupFilters( - * filter: StepFilter, - * variables: IFilterVariables, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * if (filter.value === 'OR') { - * return await this.handleOrFilters(filter, variables, command, filterProcessingDetails); - * } - * - * if (filter.value === 'AND') { - * return await this.handleAndFilters(filter, variables, command, filterProcessingDetails); - * } - * - * return false; - * } - * - * private splitFilters(filter: StepFilter) { - * const webhookFilters = filter.children.filter((childFilter) => childFilter.on === 'webhook'); - * - * const otherFilters = filter.children.filter((childFilter) => childFilter.on !== 'webhook'); - * - * return { webhookFilters, otherFilters }; - * } - * - * private async handleAndFilters( - * filter: StepFilter, - * variables: IFilterVariables, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * const { webhookFilters, otherFilters } = this.splitFilters(filter); - * - * const matchedOtherFilters = await this.filterAsync(otherFilters, (i) => - * this.processFilter(variables, i, command, filterProcessingDetails) - * ); - * if (otherFilters.length !== matchedOtherFilters.length) { - * return false; - * } - * - * const matchedWebhookFilters = await this.filterAsync(webhookFilters, (i) => - * this.processFilter(variables, i, command, filterProcessingDetails) - * ); - * - * return matchedWebhookFilters.length === webhookFilters.length; - * } - * - * private async handleOrFilters( - * filter: StepFilter, - * variables: IFilterVariables, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * const { webhookFilters, otherFilters } = this.splitFilters(filter); - * - * const foundFilter = await this.findAsync(otherFilters, (i) => - * this.processFilter(variables, i, command, filterProcessingDetails) - * ); - * if (foundFilter) { - * return true; - * } - * - * return !!(await this.findAsync(webhookFilters, (i) => - * this.processFilter(variables, i, command, filterProcessingDetails) - * )); - * } - * - * private async processPreviousStep( - * filter: IPreviousStepFilterPart, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * const job = await this.jobRepository.findOne({ - * transactionId: command.transactionId, - * // backward compatibility - ternary needed to be removed once the queue renewed - * _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, - * _environmentId: command.environmentId, - * _organizationId: command.organizationId, - * 'step.uuid': filter.step, - * }); - * - * if (!job) { - * return true; - * } - * - * const message = await this.messageRepository.findOne({ - * _jobId: job._id, - * _environmentId: command.environmentId, - * // backward compatibility - ternary needed to be removed once the queue renewed - * _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, - * transactionId: command.transactionId, - * }); - * - * if (!message) { - * return true; - * } - * - * const label = FILTER_TO_LABEL[filter.on]; - * const field = filter.stepType; - * const expected = 'true'; - * const operator = 'EQUAL'; - * - * if (message?.channel === ChannelTypeEnum.EMAIL) { - * const count = await this.executionDetailsRepository.count({ - * _jobId: command.job._parentId, - * _messageId: message._id, - * _environmentId: command.environmentId, - * webhookStatus: EmailEventStatusEnum.OPENED, - * }); - * - * const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - * ? count === 0 - * : count > 0; - * - * filterProcessingDetails.addCondition({ - * filter: label, - * field, - * expected, - * actual: `${passed}`, - * operator, - * passed, - * }); - * - * return passed; - * } - * - * const value = [PreviousStepTypeEnum.SEEN, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - * ? message.seen - * : message.read; - * const passed = [PreviousStepTypeEnum.UNREAD, PreviousStepTypeEnum.UNSEEN].includes(filter.stepType) - * ? value === false - * : value; - * - * filterProcessingDetails.addCondition({ - * filter: label, - * field, - * expected, - * actual: `${passed}`, - * operator, - * passed, - * }); - * - * return passed; - * } - * - * private async processIsOnline( - * filter: IRealtimeOnlineFilterPart | IOnlineInLastFilterPart, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * const subscriber = await this.subscriberRepository.findOne({ - * _id: command.subscriberId, - * _organizationId: command.organizationId, - * _environmentId: command.environmentId, - * }); - * - * const hasNoOnlineFieldsSet = - * typeof subscriber?.isOnline === 'undefined' && typeof subscriber?.lastOnlineAt === 'undefined'; - * const isOnlineString = `${subscriber?.isOnline ?? ''}`; - * const lastOnlineAtString = `${subscriber?.lastOnlineAt ?? ''}`; - * // the old subscriber created before the is online functionality should not be processed - * if (hasNoOnlineFieldsSet) { - * filterProcessingDetails.addCondition({ - * filter: FILTER_TO_LABEL[filter.on], - * field: 'isOnline', - * expected: `${filter.value}`, - * actual: `${filter.on === FilterPartTypeEnum.IS_ONLINE ? isOnlineString : lastOnlineAtString}`, - * operator: filter.on === FilterPartTypeEnum.IS_ONLINE ? 'EQUAL' : filter.timeOperator, - * passed: false, - * }); - * - * return false; - * } - * - * const isOnlineMatch = subscriber?.isOnline === filter.value; - * if (filter.on === FilterPartTypeEnum.IS_ONLINE) { - * filterProcessingDetails.addCondition({ - * filter: FILTER_TO_LABEL[filter.on], - * field: 'isOnline', - * expected: `${filter.value}`, - * actual: isOnlineString, - * operator: 'EQUAL', - * passed: isOnlineMatch, - * }); - * - * return isOnlineMatch; - * } - * - * const currentDate = new Date(); - * const lastOnlineAt = subscriber?.lastOnlineAt ? parseISO(subscriber?.lastOnlineAt) : new Date(); - * const diff = differenceIn(currentDate, lastOnlineAt, filter.timeOperator); - * const result = subscriber?.isOnline || (!subscriber?.isOnline && diff >= 0 && diff <= filter.value); - * - * filterProcessingDetails.addCondition({ - * filter: FILTER_TO_LABEL[filter.on], - * field: subscriber?.isOnline ? 'isOnline' : 'lastOnlineAt', - * expected: subscriber?.isOnline ? 'true' : `${filter.value}`, - * actual: `${subscriber?.isOnline ? 'true' : diff}`, - * operator: filter.timeOperator, - * passed: result, - * }); - * - * return result; - * } - * - * private async getWebhookResponse( - * child: IWebhookFilterPart, - * variables: IFilterVariables, - * command: MessageMatcherCommand - * ): Promise | undefined> { - * if (!child.webhookUrl) return undefined; - * - * const payload = await this.buildPayload(variables, command); - * - * const hmac = await this.buildHmac(command); - * - * const config = { - * headers: { - * 'nv-hmac-256': hmac, - * }, - * }; - * - * try { - * return await axios.post(child.webhookUrl, payload, config).then((response) => { - * return response.data as Record; - * }); - * } catch (err) { - * throw new Error( - * JSON.stringify({ - * message: err.message, - * data: EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, - * }) - * ); - * } - * } - * - * private async buildPayload(variables: IFilterVariables, command: MessageMatcherCommand) { - * if (process.env.NODE_ENV === 'test') return variables; - * - * const payload: Partial<{ - * subscriber: SubscriberEntity | null; - * payload: Record; - * identifier: string; - * channel: string; - * providerId: string; - * }> = {}; - * - * if (variables.subscriber) { - * payload.subscriber = variables.subscriber; - * } else { - * payload.subscriber = await this.subscriberRepository.findBySubscriberId( - * command.environmentId, - * command.subscriberId - * ); - * } - * - * if (variables.payload) { - * payload.payload = variables.payload; - * } - * - * payload.identifier = command.identifier; - * payload.channel = command.job.type; - * - * if (command.job.providerId) { - * payload.providerId = command.job.providerId; - * } - * - * return payload; - * } - * - * private async buildHmac(command: MessageMatcherCommand): Promise { - * if (process.env.NODE_ENV === 'test') return ''; - * - * const environment = await this.environmentRepository.findOne({ - * _id: command.environmentId, - * _organizationId: command.organizationId, - * }); - * if (!environment) throw new PlatformException('Environment is not found'); - * - * return createHash(environment.apiKeys[0].key, command.environmentId); - * } - * - * private async processFilter( - * variables: IFilterVariables, - * child: FilterParts, - * command: MessageMatcherCommand, - * filterProcessingDetails: FilterProcessingDetails - * ): Promise { - * let passed = false; - * - * if (child.on === FilterPartTypeEnum.WEBHOOK) { - * if (process.env.NODE_ENV === 'test') return true; - * - * const res = await this.getWebhookResponse(child, variables, command); - * passed = this.processFilterEquality({ payload: undefined, webhook: res }, child, filterProcessingDetails); - * } - * - * if (child.on === FilterPartTypeEnum.PAYLOAD || child.on === FilterPartTypeEnum.SUBSCRIBER) { - * passed = this.processFilterEquality(variables, child, filterProcessingDetails); - * } - * - * if (child.on === FilterPartTypeEnum.IS_ONLINE || child.on === FilterPartTypeEnum.IS_ONLINE_IN_LAST) { - * passed = await this.processIsOnline(child, command, filterProcessingDetails); - * } - * - * if (child.on === FilterPartTypeEnum.PREVIOUS_STEP) { - * passed = await this.processPreviousStep(child, command, filterProcessingDetails); - * } - * - * return passed; - * } - * - * @Instrument() - * public async getFilterData(command: MessageMatcherCommand) { - * const subscriberFilterExist = command.step?.filters?.find((filter) => { - * return filter?.children?.find((item) => item?.on === 'subscriber'); - * }); - * - * let subscriber; - * - * if (subscriberFilterExist) { - * subscriber = await this.getSubscriberBySubscriberId({ - * subscriberId: command.subscriberId, - * _environmentId: command.environmentId, - * }); - * } - * - * return { - * subscriber, - * payload: command.job.payload, - * }; - * } - * - * @CachedEntity({ - * builder: (command: { subscriberId: string; _environmentId: string }) => - * buildSubscriberKey({ - * _environmentId: command._environmentId, - * subscriberId: command.subscriberId, - * }), - * }) - * public async getSubscriberBySubscriberId({ - * subscriberId, - * _environmentId, - * }: { - * subscriberId: string; - * _environmentId: string; - * }) { - * return await this.subscriberRepository.findOne({ - * _environmentId, - * subscriberId, - * }); - * } - * } - */ From 5ca053f5685a05ee4b71d120c90ffa5f6640f187 Mon Sep 17 00:00:00 2001 From: Gosha Date: Mon, 2 Oct 2023 20:38:04 +0300 Subject: [PATCH 4/4] feat: update after pr comments --- .../conditions-filter.usecase.ts | 45 ++++++++++++++----- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts index cef5cc4ac66..018530fe4e9 100644 --- a/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts +++ b/packages/application-generic/src/usecases/conditions-filter/conditions-filter.usecase.ts @@ -23,6 +23,8 @@ import { IWebhookFilterPart, PreviousStepTypeEnum, TimeOperatorEnum, + IMessageFilter, + INotificationTemplateStep, } from '@novu/shared'; import { Filter } from '../../utils/filter'; import { @@ -42,6 +44,7 @@ import { createHash } from '../../utils/hmac'; import { Instrument } from '../../instrumentation'; import { CachedEntity } from '../../services/cache/interceptors/cached-entity.interceptor'; import { buildSubscriberKey } from '../../services/cache/key-builders/entities'; + @Injectable() export class ConditionsFilter extends Filter { constructor( @@ -61,11 +64,8 @@ export class ConditionsFilter extends Filter { variables: IFilterVariables; }> { const variables = await this.normalizeVariablesData(command); - const filters = command.filters?.length - ? command.filters - : command.step?.filters?.length - ? command.step.filters - : []; + const filters = this.extractFilters(command); + if (!filters || !Array.isArray(filters) || filters.length === 0) { return { passed: true, @@ -128,6 +128,14 @@ export class ConditionsFilter extends Filter { }; } + private extractFilters(command: ConditionsFilterCommand) { + return command.filters?.length + ? command.filters + : command.step?.filters?.length + ? command.step.filters + : []; + } + public static sumFilters( summary: { filters: string[]; @@ -529,8 +537,19 @@ export class ConditionsFilter extends Filter { private async normalizeVariablesData(command: ConditionsFilterCommand) { const filterVariables: IFilterVariables = {}; - filterVariables.subscriber = await this.fetchSubscriberIfMissing(command); - filterVariables.tenant = await this.fetchTenantIfMissing(command); + const combinedFilters = [ + command.step, + ...(command.step?.variants || []), + ].flatMap((variant) => (variant?.filters ? variant?.filters : [])); + + filterVariables.subscriber = await this.fetchSubscriberIfMissing( + command, + combinedFilters + ); + filterVariables.tenant = await this.fetchTenantIfMissing( + command, + combinedFilters + ); filterVariables.payload = command.variables?.payload ? command.variables?.payload : command.job?.payload ?? undefined; @@ -539,13 +558,14 @@ export class ConditionsFilter extends Filter { } private async fetchSubscriberIfMissing( - command: ConditionsFilterCommand + command: ConditionsFilterCommand, + filters: IMessageFilter[] ): Promise { if (command.variables?.subscriber) { return command.variables.subscriber; } - const subscriberFilterExist = command.step?.filters?.find((filter) => { + const subscriberFilterExist = filters?.find((filter) => { return filter?.children?.find((item) => item?.on === 'subscriber'); }); @@ -562,15 +582,16 @@ export class ConditionsFilter extends Filter { } private async fetchTenantIfMissing( - command: ConditionsFilterCommand + command: ConditionsFilterCommand, + filters: IMessageFilter[] ): Promise { if (command.variables?.tenant) { return command.variables.tenant; } const tenantIdentifier = command.job?.tenant?.identifier; - const tenantFilterExist = command.step?.filters?.find((filter) => { - return filter?.children?.find((item) => item?.on === 'subscriber'); + const tenantFilterExist = filters?.find((filter) => { + return filter?.children?.find((item) => item?.on === 'tenant'); }); if (tenantFilterExist && tenantIdentifier && command.job) {