diff --git a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts index 64d1d7665a9..dd0b09cdb9d 100644 --- a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts +++ b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts @@ -765,6 +765,82 @@ contexts.forEach((context: Context) => { expect(messageBodies).to.include('Hello there 1'); expect(messageBodies).to.include('Hello there 2'); }); + + it(`should deliver message if inApp is enabled via workflow preferences [${context.name}]`, async () => { + process.env.IS_WORKFLOW_PREFERENCES_ENABLED = 'true'; + const workflowId = `enabled-inapp-workflow-${`${context.name}-${uuidv4()}`}`; + const newWorkflow = workflow( + workflowId, + async ({ step }) => { + await step.inApp('send-in-app', () => ({ body: 'Hello there 1' })); + }, + { + preferences: { + channels: { + inApp: { + defaultValue: true, + }, + }, + }, + } + ); + + await bridgeServer.start({ workflows: [newWorkflow] }); + + if (context.isStateful) { + await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer); + } + + await triggerEvent(session, workflowId, subscriber, {}, bridge); + await session.awaitRunningJobs(); + + const sentMessages = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + templateIdentifier: workflowId, + channel: StepTypeEnum.IN_APP, + }); + + expect(sentMessages.length).to.be.eq(1); + }); + + it(`should NOT deliver message if inApp is disabled via workflow preferences [${context.name}]`, async () => { + process.env.IS_WORKFLOW_PREFERENCES_ENABLED = 'true'; + const workflowId = `disabled-inapp-workflow-${`${context.name}-${uuidv4()}`}`; + const newWorkflow = workflow( + workflowId, + async ({ step }) => { + await step.inApp('send-in-app', () => ({ body: 'Hello there 1' })); + }, + { + preferences: { + channels: { + inApp: { + defaultValue: false, + }, + }, + }, + } + ); + + await bridgeServer.start({ workflows: [newWorkflow] }); + + if (context.isStateful) { + await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer); + } + + await triggerEvent(session, workflowId, subscriber, {}, bridge); + await session.awaitRunningJobs(); + + const sentMessages = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + templateIdentifier: workflowId, + channel: StepTypeEnum.IN_APP, + }); + + expect(sentMessages.length).to.be.eq(0); + }); }); }); diff --git a/apps/worker/src/.env.test b/apps/worker/src/.env.test index 75042df4b1b..1e54513db9b 100644 --- a/apps/worker/src/.env.test +++ b/apps/worker/src/.env.test @@ -82,3 +82,5 @@ IS_USE_MERGED_DIGEST_ID_ENABLED=true BROADCAST_QUEUE_CHUNK_SIZE=100 MULTICAST_QUEUE_CHUNK_SIZE=100 + +IS_WORKFLOW_PREFERENCES_ENABLED=true diff --git a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts index e75c02a543d..6863acb511d 100644 --- a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts @@ -99,6 +99,7 @@ export class RunJob { events: job.digest?.events, job, tags: notification.tags || [], + statelessPreferences: job.preferences, }) ); @@ -110,7 +111,7 @@ export class RunJob { if (job.step.shouldStopOnFail || this.shouldBackoff(error)) { shouldQueueNextJob = false; } - throw new PlatformException(error.message); + throw error; } finally { if (shouldQueueNextJob) { const newJob = await this.queueNextJob.execute( diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.command.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.command.ts index 700bf32be6d..8c5d709f2aa 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.command.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.command.ts @@ -2,6 +2,7 @@ import { IsDefined, IsOptional, IsString } from 'class-validator'; import { NotificationStepEntity, JobEntity } from '@novu/dal'; import { EnvironmentWithUserCommand } from '@novu/application-generic'; import { ExecuteOutput } from '@novu/framework'; +import { WorkflowChannelPreferences } from '@novu/shared'; export class SendMessageCommand extends EnvironmentWithUserCommand { @IsDefined() @@ -50,4 +51,7 @@ export class SendMessageCommand extends EnvironmentWithUserCommand { @IsDefined() tags: string[]; + + @IsOptional() + statelessPreferences?: WorkflowChannelPreferences; } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts index 3834c208843..f768996d585 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts @@ -19,6 +19,7 @@ import { DetailEnum, ExecutionLogRoute, ExecutionLogRouteCommand, + GetPreferences, GetSubscriberGlobalPreference, GetSubscriberGlobalPreferenceCommand, GetSubscriberTemplatePreference, @@ -96,30 +97,20 @@ export class SendMessage { const stepType = command.step?.template?.type; let bridgeResponse: ExecuteOutput | null = null; - if (![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as any)) { + if (![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as StepTypeEnum)) { bridgeResponse = await this.executeBridgeJob.execute({ ...command, variables, }); } const isBridgeSkipped = bridgeResponse?.options?.skip; - const { filterResult, channelPreferenceResult } = await this.getStepExecutionHalt( - isBridgeSkipped, - command, - variables - ); + const { stepCondition, channelPreference } = await this.evaluateFilters(isBridgeSkipped, command, variables); if (!command.payload?.$on_boarding_trigger) { - this.sendProcessStepEvent( - command, - isBridgeSkipped, - filterResult, - channelPreferenceResult, - !!bridgeResponse?.outputs - ); + this.sendProcessStepEvent(command, isBridgeSkipped, stepCondition, channelPreference, !!bridgeResponse?.outputs); } - if (!filterResult?.passed || !channelPreferenceResult || isBridgeSkipped) { + if (!stepCondition?.passed || !channelPreference || isBridgeSkipped) { await this.jobRepository.updateStatus(command.environmentId, command.jobId, JobStatusEnum.CANCELED); await this.executionLogRoute.execute( @@ -131,8 +122,10 @@ export class SendMessage { isTest: false, isRetry: false, raw: JSON.stringify({ - ...(filterResult ? { filter: { conditions: filterResult?.conditions, passed: filterResult?.passed } } : {}), - ...(channelPreferenceResult ? { preferences: { passed: channelPreferenceResult } } : {}), + ...(stepCondition + ? { filter: { conditions: stepCondition?.conditions, passed: stepCondition?.passed } } + : {}), + ...(channelPreference ? { preferences: { passed: channelPreference } } : {}), ...(isBridgeSkipped ? { skip: isBridgeSkipped } : {}), }), }) @@ -201,39 +194,24 @@ export class SendMessage { return { status: 'success' }; } - private async getStepExecutionHalt( + private async evaluateFilters( bridgeSkip: boolean | undefined, command: SendMessageCommand, variables: IFilterVariables - ): Promise<{ filterResult: IConditionsFilterResponse | null; channelPreferenceResult: boolean | null }> { - const skipHalt = this.shouldSkipHalt(bridgeSkip, command.job?.step?.bridgeUrl); - if (skipHalt) { - return { filterResult: { passed: true, conditions: [], variables: {} }, channelPreferenceResult: true }; + ): Promise<{ stepCondition: IConditionsFilterResponse | null; channelPreference: boolean | null }> { + if (bridgeSkip === true) { + return { stepCondition: { passed: true, conditions: [], variables: {} }, channelPreference: true }; } - const [filterResult, channelPreferenceResult] = await Promise.all([ - this.filter(command, variables), - this.filterPreferredChannels(command.job), + const [stepCondition, channelPreference] = await Promise.all([ + this.evaluateStepCondition(command, variables), + this.evaluateChannelPreference(command), ]); - return { filterResult, channelPreferenceResult }; - } - - /** - * This function checks if a bridge skip is happening. - * - * - If `bridgeSkip` is true (highest priority), skips all checks. - * - If `bridgeUrl` is provided, skips all checks (use `skip` option in workflow definition instead). - * - * @param bridgeSkip Whether to skip bridge checks (optional). - * @param bridgeUrl URL of the bridge (optional). - * @return True if bridge skip is happening, false otherwise. - */ - private shouldSkipHalt(bridgeSkip: boolean | undefined, bridgeUrl: string | undefined): boolean { - return bridgeSkip === true || !!bridgeUrl; + return { stepCondition, channelPreference }; } - private async filter(command: SendMessageCommand, variables: IFilterVariables) { + private async evaluateStepCondition(command: SendMessageCommand, variables: IFilterVariables) { return await this.conditionsFilter.filter( ConditionsFilterCommand.create({ filters: command.job.step.filters || [], @@ -301,16 +279,19 @@ export class SendMessage { } @Instrument() - private async filterPreferredChannels(job: JobEntity): Promise { - const template = await this.getNotificationTemplate({ + private async evaluateChannelPreference(command: SendMessageCommand): Promise { + const { job } = command; + + const workflow = await this.getWorkflow({ _id: job._templateId, environmentId: job._environmentId, }); - if (!template) { - throw new PlatformException(`Notification template ${job._templateId} is not found`); - } - if (template.critical || this.isActionStep(job)) { + /* + * The `critical` flag check is needed here for backward-compatibility of V1 Workflow Preferences only. + * V2 Workflow Preferences are stored on the Preference entity instead. + */ + if (workflow?.critical || this.isActionStep(job)) { return true; } @@ -346,18 +327,40 @@ export class SendMessage { return false; } - const { preference } = await this.getSubscriberTemplatePreferenceUsecase.execute( - GetSubscriberTemplatePreferenceCommand.create({ - organizationId: job._organizationId, - subscriberId: subscriber.subscriberId, - environmentId: job._environmentId, - template, - subscriber, - tenant: job.tenant, - }) - ); + let subscriberPreference: { enabled: boolean; channels: IPreferenceChannels }; + if (command.statelessPreferences) { + /* + * Stateless Workflow executions do not have their definitions stored in the database. + * Their preferences are available in the command instead. + * + * TODO: Refactor the send-message flow to better handle stateless workflows + */ + const workflowPreference = GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences( + command.statelessPreferences + ); + subscriberPreference = { + enabled: true, + channels: workflowPreference, + }; + } else { + if (!workflow) { + throw new PlatformException(`Workflow with id '${job._templateId}' was not found`); + } + + const { preference } = await this.getSubscriberTemplatePreferenceUsecase.execute( + GetSubscriberTemplatePreferenceCommand.create({ + organizationId: job._organizationId, + subscriberId: subscriber.subscriberId, + environmentId: job._environmentId, + template: workflow, + subscriber, + tenant: job.tenant, + }) + ); + subscriberPreference = preference; + } - const result = this.stepPreferred(preference, job); + const result = this.stepPreferred(subscriberPreference, job); if (!result) { await this.executionLogRoute.execute( @@ -368,7 +371,7 @@ export class SendMessage { status: ExecutionDetailsStatusEnum.SUCCESS, isTest: false, isRetry: false, - raw: JSON.stringify(preference), + raw: JSON.stringify(subscriberPreference), }) ); } @@ -413,7 +416,7 @@ export class SendMessage { _id: command._id, }), }) - private async getNotificationTemplate({ _id, environmentId }: { _id: string; environmentId: string }) { + private async getWorkflow({ _id, environmentId }: { _id: string; environmentId: string }) { return await this.notificationTemplateRepository.findById(_id, environmentId); } @@ -439,13 +442,13 @@ export class SendMessage { @Instrument() private stepPreferred(preference: { enabled: boolean; channels: IPreferenceChannels }, job: JobEntity) { - const templatePreferred = preference.enabled; + const workflowPreferred = preference.enabled; const channelPreferred = Object.keys(preference.channels).some( (channelKey) => channelKey === job.type && preference.channels[job.type] ); - return templatePreferred && channelPreferred; + return workflowPreferred && channelPreferred; } private isActionStep(job: JobEntity) { diff --git a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts index ba7c28c60a5..55b3cdea56b 100644 --- a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts @@ -136,6 +136,7 @@ export class SubscriberJobBound { userId, tenant, bridgeUrl: command.bridge?.url, + preferences: command.bridge?.workflow?.preferences, }; if (actor) { diff --git a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.command.ts b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.command.ts index c091fe11e13..284a9c538c1 100644 --- a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.command.ts +++ b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.command.ts @@ -7,6 +7,7 @@ import { ISubscribersDefine, ITenantDefine, ProvidersIdEnum, + WorkflowChannelPreferences, } from '@novu/shared'; import { EnvironmentWithUserCommand } from '../../commands'; @@ -47,4 +48,6 @@ export class CreateNotificationJobsCommand extends EnvironmentWithUserCommand { bridgeUrl?: string; controls?: ControlsDto; + + preferences?: WorkflowChannelPreferences; } diff --git a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts index d44a211c2ee..a4a8bb2eeec 100644 --- a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts +++ b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts @@ -113,6 +113,7 @@ export class CreateNotificationJobs { _actorId: command.actor?._id, actorId: command.actor?.subscriberId, }), + preferences: command.preferences, }; jobs.push(job); diff --git a/libs/application-generic/src/usecases/get-preferences/get-preferences.usecase.ts b/libs/application-generic/src/usecases/get-preferences/get-preferences.usecase.ts index 5559eebb795..fbfa0f30167 100644 --- a/libs/application-generic/src/usecases/get-preferences/get-preferences.usecase.ts +++ b/libs/application-generic/src/usecases/get-preferences/get-preferences.usecase.ts @@ -61,6 +61,10 @@ export class GetPreferences { }): Promise { const result = await this.getWorkflowChannelPreferences(command); + if (!result) { + return undefined; + } + return GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences( result, ); @@ -94,11 +98,7 @@ export class GetPreferences { /** Transform WorkflowChannelPreferences into IPreferenceChannels */ public static mapWorkflowChannelPreferencesToChannelPreferences( workflowPreferences: WorkflowChannelPreferences, - ): IPreferenceChannels | undefined { - if (!workflowPreferences) { - return undefined; - } - + ): IPreferenceChannels { return { in_app: workflowPreferences.channels.in_app.defaultValue !== undefined diff --git a/libs/application-generic/src/usecases/get-subscriber-template-preference/get-subscriber-template-preference.usecase.ts b/libs/application-generic/src/usecases/get-subscriber-template-preference/get-subscriber-template-preference.usecase.ts index fc1e72a5b76..d205859fcd2 100644 --- a/libs/application-generic/src/usecases/get-subscriber-template-preference/get-subscriber-template-preference.usecase.ts +++ b/libs/application-generic/src/usecases/get-subscriber-template-preference/get-subscriber-template-preference.usecase.ts @@ -57,6 +57,10 @@ export class GetSubscriberTemplatePreference { } const initialActiveChannels = await this.getActiveChannels(command); + + /** + * V1 preference object. + */ const subscriberPreference = await this.subscriberPreferenceRepository.findOne( { @@ -71,6 +75,9 @@ export class GetSubscriberTemplatePreference { const templateChannelPreference = command.template.preferenceSettings; + /** + * V2 preference object. + */ const subscriberWorkflowPreferences = await this.getPreferences.getWorkflowChannelPreferences({ environmentId: command.environmentId, @@ -79,10 +86,11 @@ export class GetSubscriberTemplatePreference { templateId: command.template._id, }); - const subscriberPreferenceChannels = - GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences( - subscriberWorkflowPreferences, - ) || subscriberPreference?.channels; + const subscriberPreferenceChannels = subscriberWorkflowPreferences + ? GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences( + subscriberWorkflowPreferences, + ) + : subscriberPreference?.channels; const workflowOverrideChannelPreference = workflowOverride?.preferenceSettings; diff --git a/libs/dal/src/repositories/job/job.entity.ts b/libs/dal/src/repositories/job/job.entity.ts index bc5eac1c2f1..85c5d43d8f4 100644 --- a/libs/dal/src/repositories/job/job.entity.ts +++ b/libs/dal/src/repositories/job/job.entity.ts @@ -1,4 +1,10 @@ -import { StepTypeEnum, IWorkflowStepMetadata, JobStatusEnum, ITenantDefine } from '@novu/shared'; +import { + StepTypeEnum, + IWorkflowStepMetadata, + JobStatusEnum, + ITenantDefine, + WorkflowChannelPreferences, +} from '@novu/shared'; import { Types } from 'mongoose'; import { NotificationStepEntity } from '../notification-template'; @@ -41,6 +47,7 @@ export class JobEntity { _actorId?: string; actorId?: string; stepOutput?: Record; + preferences?: WorkflowChannelPreferences; } export type JobDBModel = ChangePropsValueType< diff --git a/libs/dal/src/repositories/job/job.schema.ts b/libs/dal/src/repositories/job/job.schema.ts index 915336f0843..72a48f69fb3 100644 --- a/libs/dal/src/repositories/job/job.schema.ts +++ b/libs/dal/src/repositories/job/job.schema.ts @@ -130,6 +130,7 @@ const jobSchema = new Schema( }, expireAt: Schema.Types.Date, stepOutput: Schema.Types.Mixed, + preferences: Schema.Types.Mixed, }, schemaOptions );