Skip to content

Commit

Permalink
feat(api, worker, application-generic, dal): Resolve preferences for …
Browse files Browse the repository at this point in the history
…stateless workflow executions (#6527)
  • Loading branch information
rifont authored Sep 20, 2024
1 parent a5a3669 commit e967567
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 72 deletions.
76 changes: 76 additions & 0 deletions apps/api/src/app/events/e2e/bridge-trigger.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,82 @@ contexts.forEach((context: Context) => {
expect(messageBodies).to.include('Hello there 1');
expect(messageBodies).to.include('Hello there 2');
});

it(`should deliver message if inApp is enabled via workflow preferences [${context.name}]`, async () => {
process.env.IS_WORKFLOW_PREFERENCES_ENABLED = 'true';
const workflowId = `enabled-inapp-workflow-${`${context.name}-${uuidv4()}`}`;
const newWorkflow = workflow(
workflowId,
async ({ step }) => {
await step.inApp('send-in-app', () => ({ body: 'Hello there 1' }));
},
{
preferences: {
channels: {
inApp: {
defaultValue: true,
},
},
},
}
);

await bridgeServer.start({ workflows: [newWorkflow] });

if (context.isStateful) {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
}

await triggerEvent(session, workflowId, subscriber, {}, bridge);
await session.awaitRunningJobs();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
templateIdentifier: workflowId,
channel: StepTypeEnum.IN_APP,
});

expect(sentMessages.length).to.be.eq(1);
});

it(`should NOT deliver message if inApp is disabled via workflow preferences [${context.name}]`, async () => {
process.env.IS_WORKFLOW_PREFERENCES_ENABLED = 'true';
const workflowId = `disabled-inapp-workflow-${`${context.name}-${uuidv4()}`}`;
const newWorkflow = workflow(
workflowId,
async ({ step }) => {
await step.inApp('send-in-app', () => ({ body: 'Hello there 1' }));
},
{
preferences: {
channels: {
inApp: {
defaultValue: false,
},
},
},
}
);

await bridgeServer.start({ workflows: [newWorkflow] });

if (context.isStateful) {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
}

await triggerEvent(session, workflowId, subscriber, {}, bridge);
await session.awaitRunningJobs();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
templateIdentifier: workflowId,
channel: StepTypeEnum.IN_APP,
});

expect(sentMessages.length).to.be.eq(0);
});
});
});

Expand Down
2 changes: 2 additions & 0 deletions apps/worker/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ IS_USE_MERGED_DIGEST_ID_ENABLED=true

BROADCAST_QUEUE_CHUNK_SIZE=100
MULTICAST_QUEUE_CHUNK_SIZE=100

IS_WORKFLOW_PREFERENCES_ENABLED=true
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export class RunJob {
events: job.digest?.events,
job,
tags: notification.tags || [],
statelessPreferences: job.preferences,
})
);

Expand All @@ -110,7 +111,7 @@ export class RunJob {
if (job.step.shouldStopOnFail || this.shouldBackoff(error)) {
shouldQueueNextJob = false;
}
throw new PlatformException(error.message);
throw error;
} finally {
if (shouldQueueNextJob) {
const newJob = await this.queueNextJob.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { IsDefined, IsOptional, IsString } from 'class-validator';
import { NotificationStepEntity, JobEntity } from '@novu/dal';
import { EnvironmentWithUserCommand } from '@novu/application-generic';
import { ExecuteOutput } from '@novu/framework';
import { WorkflowChannelPreferences } from '@novu/shared';

export class SendMessageCommand extends EnvironmentWithUserCommand {
@IsDefined()
Expand Down Expand Up @@ -50,4 +51,7 @@ export class SendMessageCommand extends EnvironmentWithUserCommand {

@IsDefined()
tags: string[];

@IsOptional()
statelessPreferences?: WorkflowChannelPreferences;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
DetailEnum,
ExecutionLogRoute,
ExecutionLogRouteCommand,
GetPreferences,
GetSubscriberGlobalPreference,
GetSubscriberGlobalPreferenceCommand,
GetSubscriberTemplatePreference,
Expand Down Expand Up @@ -96,30 +97,20 @@ export class SendMessage {
const stepType = command.step?.template?.type;

let bridgeResponse: ExecuteOutput | null = null;
if (![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as any)) {
if (![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as StepTypeEnum)) {
bridgeResponse = await this.executeBridgeJob.execute({
...command,
variables,
});
}
const isBridgeSkipped = bridgeResponse?.options?.skip;
const { filterResult, channelPreferenceResult } = await this.getStepExecutionHalt(
isBridgeSkipped,
command,
variables
);
const { stepCondition, channelPreference } = await this.evaluateFilters(isBridgeSkipped, command, variables);

if (!command.payload?.$on_boarding_trigger) {
this.sendProcessStepEvent(
command,
isBridgeSkipped,
filterResult,
channelPreferenceResult,
!!bridgeResponse?.outputs
);
this.sendProcessStepEvent(command, isBridgeSkipped, stepCondition, channelPreference, !!bridgeResponse?.outputs);
}

if (!filterResult?.passed || !channelPreferenceResult || isBridgeSkipped) {
if (!stepCondition?.passed || !channelPreference || isBridgeSkipped) {
await this.jobRepository.updateStatus(command.environmentId, command.jobId, JobStatusEnum.CANCELED);

await this.executionLogRoute.execute(
Expand All @@ -131,8 +122,10 @@ export class SendMessage {
isTest: false,
isRetry: false,
raw: JSON.stringify({
...(filterResult ? { filter: { conditions: filterResult?.conditions, passed: filterResult?.passed } } : {}),
...(channelPreferenceResult ? { preferences: { passed: channelPreferenceResult } } : {}),
...(stepCondition
? { filter: { conditions: stepCondition?.conditions, passed: stepCondition?.passed } }
: {}),
...(channelPreference ? { preferences: { passed: channelPreference } } : {}),
...(isBridgeSkipped ? { skip: isBridgeSkipped } : {}),
}),
})
Expand Down Expand Up @@ -201,39 +194,24 @@ export class SendMessage {
return { status: 'success' };
}

private async getStepExecutionHalt(
private async evaluateFilters(
bridgeSkip: boolean | undefined,
command: SendMessageCommand,
variables: IFilterVariables
): Promise<{ filterResult: IConditionsFilterResponse | null; channelPreferenceResult: boolean | null }> {
const skipHalt = this.shouldSkipHalt(bridgeSkip, command.job?.step?.bridgeUrl);
if (skipHalt) {
return { filterResult: { passed: true, conditions: [], variables: {} }, channelPreferenceResult: true };
): Promise<{ stepCondition: IConditionsFilterResponse | null; channelPreference: boolean | null }> {
if (bridgeSkip === true) {
return { stepCondition: { passed: true, conditions: [], variables: {} }, channelPreference: true };
}

const [filterResult, channelPreferenceResult] = await Promise.all([
this.filter(command, variables),
this.filterPreferredChannels(command.job),
const [stepCondition, channelPreference] = await Promise.all([
this.evaluateStepCondition(command, variables),
this.evaluateChannelPreference(command),
]);

return { filterResult, channelPreferenceResult };
}

/**
* This function checks if a bridge skip is happening.
*
* - If `bridgeSkip` is true (highest priority), skips all checks.
* - If `bridgeUrl` is provided, skips all checks (use `skip` option in workflow definition instead).
*
* @param bridgeSkip Whether to skip bridge checks (optional).
* @param bridgeUrl URL of the bridge (optional).
* @return True if bridge skip is happening, false otherwise.
*/
private shouldSkipHalt(bridgeSkip: boolean | undefined, bridgeUrl: string | undefined): boolean {
return bridgeSkip === true || !!bridgeUrl;
return { stepCondition, channelPreference };
}

private async filter(command: SendMessageCommand, variables: IFilterVariables) {
private async evaluateStepCondition(command: SendMessageCommand, variables: IFilterVariables) {
return await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: command.job.step.filters || [],
Expand Down Expand Up @@ -301,16 +279,19 @@ export class SendMessage {
}

@Instrument()
private async filterPreferredChannels(job: JobEntity): Promise<boolean> {
const template = await this.getNotificationTemplate({
private async evaluateChannelPreference(command: SendMessageCommand): Promise<boolean> {
const { job } = command;

const workflow = await this.getWorkflow({
_id: job._templateId,
environmentId: job._environmentId,
});
if (!template) {
throw new PlatformException(`Notification template ${job._templateId} is not found`);
}

if (template.critical || this.isActionStep(job)) {
/*
* The `critical` flag check is needed here for backward-compatibility of V1 Workflow Preferences only.
* V2 Workflow Preferences are stored on the Preference entity instead.
*/
if (workflow?.critical || this.isActionStep(job)) {
return true;
}

Expand Down Expand Up @@ -346,18 +327,40 @@ export class SendMessage {
return false;
}

const { preference } = await this.getSubscriberTemplatePreferenceUsecase.execute(
GetSubscriberTemplatePreferenceCommand.create({
organizationId: job._organizationId,
subscriberId: subscriber.subscriberId,
environmentId: job._environmentId,
template,
subscriber,
tenant: job.tenant,
})
);
let subscriberPreference: { enabled: boolean; channels: IPreferenceChannels };
if (command.statelessPreferences) {
/*
* Stateless Workflow executions do not have their definitions stored in the database.
* Their preferences are available in the command instead.
*
* TODO: Refactor the send-message flow to better handle stateless workflows
*/
const workflowPreference = GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences(
command.statelessPreferences
);
subscriberPreference = {
enabled: true,
channels: workflowPreference,
};
} else {
if (!workflow) {
throw new PlatformException(`Workflow with id '${job._templateId}' was not found`);
}

const { preference } = await this.getSubscriberTemplatePreferenceUsecase.execute(
GetSubscriberTemplatePreferenceCommand.create({
organizationId: job._organizationId,
subscriberId: subscriber.subscriberId,
environmentId: job._environmentId,
template: workflow,
subscriber,
tenant: job.tenant,
})
);
subscriberPreference = preference;
}

const result = this.stepPreferred(preference, job);
const result = this.stepPreferred(subscriberPreference, job);

if (!result) {
await this.executionLogRoute.execute(
Expand All @@ -368,7 +371,7 @@ export class SendMessage {
status: ExecutionDetailsStatusEnum.SUCCESS,
isTest: false,
isRetry: false,
raw: JSON.stringify(preference),
raw: JSON.stringify(subscriberPreference),
})
);
}
Expand Down Expand Up @@ -413,7 +416,7 @@ export class SendMessage {
_id: command._id,
}),
})
private async getNotificationTemplate({ _id, environmentId }: { _id: string; environmentId: string }) {
private async getWorkflow({ _id, environmentId }: { _id: string; environmentId: string }) {
return await this.notificationTemplateRepository.findById(_id, environmentId);
}

Expand All @@ -439,13 +442,13 @@ export class SendMessage {

@Instrument()
private stepPreferred(preference: { enabled: boolean; channels: IPreferenceChannels }, job: JobEntity) {
const templatePreferred = preference.enabled;
const workflowPreferred = preference.enabled;

const channelPreferred = Object.keys(preference.channels).some(
(channelKey) => channelKey === job.type && preference.channels[job.type]
);

return templatePreferred && channelPreferred;
return workflowPreferred && channelPreferred;
}

private isActionStep(job: JobEntity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export class SubscriberJobBound {
userId,
tenant,
bridgeUrl: command.bridge?.url,
preferences: command.bridge?.workflow?.preferences,
};

if (actor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ISubscribersDefine,
ITenantDefine,
ProvidersIdEnum,
WorkflowChannelPreferences,
} from '@novu/shared';

import { EnvironmentWithUserCommand } from '../../commands';
Expand Down Expand Up @@ -47,4 +48,6 @@ export class CreateNotificationJobsCommand extends EnvironmentWithUserCommand {
bridgeUrl?: string;

controls?: ControlsDto;

preferences?: WorkflowChannelPreferences;
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export class CreateNotificationJobs {
_actorId: command.actor?._id,
actorId: command.actor?.subscriberId,
}),
preferences: command.preferences,
};

jobs.push(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ export class GetPreferences {
}): Promise<IPreferenceChannels | undefined> {
const result = await this.getWorkflowChannelPreferences(command);

if (!result) {
return undefined;
}

return GetPreferences.mapWorkflowChannelPreferencesToChannelPreferences(
result,
);
Expand Down Expand Up @@ -94,11 +98,7 @@ export class GetPreferences {
/** Transform WorkflowChannelPreferences into IPreferenceChannels */
public static mapWorkflowChannelPreferencesToChannelPreferences(
workflowPreferences: WorkflowChannelPreferences,
): IPreferenceChannels | undefined {
if (!workflowPreferences) {
return undefined;
}

): IPreferenceChannels {
return {
in_app:
workflowPreferences.channels.in_app.defaultValue !== undefined
Expand Down
Loading

0 comments on commit e967567

Please sign in to comment.