diff --git a/apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts b/apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts index 93c955aa53a..98068eecbc7 100644 --- a/apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts +++ b/apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts @@ -435,6 +435,90 @@ describe('Echo Trigger ', async () => { expect(messagesAfter.length).to.be.eq(1); expect(messagesAfter[0].content).to.include('2 people liked your post'); }); + + it('should trigger the echo workflow with delay', async () => { + const workflowId = 'delay-workflow'; + await echoServer.echo.workflow( + workflowId, + async ({ step }) => { + const delayResponse = await step.delay( + 'delay-id', + async (inputs) => { + return { + type: 'regular', + amount: inputs.amount, + unit: inputs.unit, + }; + }, + { + inputSchema: { + type: 'object', + properties: { + amount: { + type: 'number', + default: 2, + }, + unit: { + type: 'string', + enum: ['seconds', 'minutes', 'hours', 'days', 'weeks', 'months'], + default: 'seconds', + }, + }, + }, + } + ); + + await step.sms( + 'send-sms', + async () => { + const duration = delayResponse.duration; + + return { + body: `people waited for ${duration} seconds`, + }; + }, + { + inputSchema: { + type: 'object', + properties: {}, + }, + } + ); + }, + { + payloadSchema: { + type: 'object', + properties: { + name: { type: 'string', default: 'default_name' }, + }, + required: [], + additionalProperties: false, + } as const, + } + ); + + await discoverAndSyncEcho(session); + + const workflow = await workflowsRepository.findByTriggerIdentifier(session.environment._id, workflowId); + expect(workflow).to.be.ok; + + if (!workflow) { + throw new Error('Workflow not found'); + } + + await triggerEvent(session, workflowId, subscriber); + + await session.awaitRunningJobs(workflow?._id, true, 0); + + const messagesAfter = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: subscriber._id, + channel: StepTypeEnum.SMS, + }); + + expect(messagesAfter.length).to.be.eq(1); + expect(messagesAfter[0].content).to.match(/people waited for \d+ seconds/); + }); }); async function syncWorkflow(session) { @@ -472,10 +556,12 @@ async function triggerEvent(session, workflowId: string, subscriber, payload?: a async function discoverAndSyncEcho(session: UserSession) { const resultDiscover = await axios.get(echoServer.serverPath + '/echo?action=discover'); - await session.testAgent.post(`/v1/echo/sync`).send({ + const discoverResponse = await session.testAgent.post(`/v1/echo/sync`).send({ bridgeUrl: echoServer.serverPath + '/echo', workflows: resultDiscover.data.workflows, }); + + return discoverResponse; } async function markAllSubscriberMessagesAs(session: UserSession, subscriberId: string, markAs: MarkMessagesAsEnum) { diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index ddb20327fc6..404e021e54a 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -28,7 +28,7 @@ import { analyticsService, cacheService, CacheServiceHealthIndicator, - CalculateDelayService, + ComputeJobWaitDurationService, createNestLoggingModuleOptions, DalServiceHealthIndicator, distributedLockService, @@ -84,7 +84,7 @@ const PROVIDERS = [ analyticsService, cacheService, CacheServiceHealthIndicator, - CalculateDelayService, + ComputeJobWaitDurationService, dalService, DalServiceHealthIndicator, distributedLockService, diff --git a/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx b/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx index e9a13dd3405..9f641cf0882 100644 --- a/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx +++ b/apps/web/src/components/execution-detail/ExecutionDetailsStepHeader.tsx @@ -80,22 +80,26 @@ const StepLogo = ({ status, type }) => { ); }; -const generateDetailByStepAndStatus = (status, step) => { +const generateDetailByStepAndStatus = (status, job) => { if (status === JobStatusEnum.COMPLETED) { - return `Success! ${step.executionDetails?.at(-1)?.detail}`; + return `Success! ${job.executionDetails?.at(-1)?.detail}`; } - if (step.type === StepTypeEnum.DIGEST) { + if (job.type === StepTypeEnum.DIGEST) { if (status === JobStatusEnum.SKIPPED) { - return step.executionDetails?.at(-1)?.detail; + return job.executionDetails?.at(-1)?.detail; } - const { digest } = step; + const { digest } = job; + + if (!digest.amount && !digest.unit) return `Waiting to receive digest times from bridge endpoint`; return `Digesting events for ${digest.amount} ${digest.unit}`; } - if (step.type === StepTypeEnum.DELAY) { - const { digest, step: stepMetadata, payload } = step; + if (job.type === StepTypeEnum.DELAY) { + const { digest, step: stepMetadata, payload } = job; + + if (!digest.amount && !digest.unit) return `Waiting to receive execution delay from bridge endpoint`; if (stepMetadata.metadata.type === DelayTypeEnum.SCHEDULED) { return `Delaying execution until ${payload[stepMetadata.metadata.delayPath]}`; } @@ -103,7 +107,7 @@ const generateDetailByStepAndStatus = (status, step) => { return `Delaying execution for ${digest.amount} ${digest.unit}`; } - return step.executionDetails?.at(-1)?.detail; + return job.executionDetails?.at(-1)?.detail; }; const getDetailsStyledComponentByStepStatus = (status) => { diff --git a/apps/worker/src/app/shared/shared.module.ts b/apps/worker/src/app/shared/shared.module.ts index 7905076ec05..97ccf7d6141 100644 --- a/apps/worker/src/app/shared/shared.module.ts +++ b/apps/worker/src/app/shared/shared.module.ts @@ -28,7 +28,7 @@ import { analyticsService, BulkCreateExecutionDetails, cacheService, - CalculateDelayService, + ComputeJobWaitDurationService, CreateExecutionDetails, createNestLoggingModuleOptions, CreateNotificationJobs, @@ -99,7 +99,7 @@ const PROVIDERS = [ analyticsService, BulkCreateExecutionDetails, cacheService, - CalculateDelayService, + ComputeJobWaitDurationService, CreateExecutionDetails, CreateLog, CreateNotificationJobs, diff --git a/apps/worker/src/app/workflow/usecases/add-job/add-delay-job.usecase.ts b/apps/worker/src/app/workflow/usecases/add-job/add-delay-job.usecase.ts index e083be54c20..fe56be11833 100644 --- a/apps/worker/src/app/workflow/usecases/add-job/add-delay-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/add-job/add-delay-job.usecase.ts @@ -4,7 +4,7 @@ import { JobRepository, JobStatusEnum } from '@novu/dal'; import { DelayTypeEnum, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum } from '@novu/shared'; import { ApiException, - CalculateDelayService, + ComputeJobWaitDurationService, DetailEnum, ExecutionLogRoute, ExecutionLogRouteCommand, @@ -17,8 +17,8 @@ import { AddJobCommand } from './add-job.command'; export class AddDelayJob { constructor( private jobRepository: JobRepository, - @Inject(forwardRef(() => CalculateDelayService)) - private calculateDelayService: CalculateDelayService, + @Inject(forwardRef(() => ComputeJobWaitDurationService)) + private computeJobWaitDurationService: ComputeJobWaitDurationService, @Inject(forwardRef(() => ExecutionLogRoute)) private executionLogRoute: ExecutionLogRoute ) {} @@ -38,17 +38,11 @@ export class AddDelayJob { let delay; try { - delay = this.calculateDelayService.calculateDelay({ + delay = this.computeJobWaitDurationService.calculateDelay({ stepMetadata: data.step.metadata, payload: data.payload, overrides: data.overrides, - // TODO: Remove fallback after other delay types are implemented. - bridgeResponse: command.bridgeResponse?.outputs - ? { - type: DelayTypeEnum.REGULAR, - ...command.bridgeResponse?.outputs, - } - : undefined, + bridgeResponse: command.bridgeResponse?.outputs, }); await this.jobRepository.updateStatus(command.environmentId, data._id, JobStatusEnum.DELAYED); diff --git a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts index 47226155483..070f21f267e 100644 --- a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts @@ -15,7 +15,7 @@ import { AddJobCommand } from './add-job.command'; import { validateDigest } from './validation'; import { ModuleRef } from '@nestjs/core'; import { - CalculateDelayService, + ComputeJobWaitDurationService, ConditionsFilter, ConditionsFilterCommand, DetailEnum, @@ -52,8 +52,8 @@ export class AddJob { private executionLogRoute: ExecutionLogRoute, private mergeOrCreateDigestUsecase: MergeOrCreateDigest, private addDelayJob: AddDelayJob, - @Inject(forwardRef(() => CalculateDelayService)) - private calculateDelayService: CalculateDelayService, + @Inject(forwardRef(() => ComputeJobWaitDurationService)) + private computeJobWaitDurationService: ComputeJobWaitDurationService, @Inject(forwardRef(() => ConditionsFilter)) private conditionsFilter: ConditionsFilter, private normalizeVariablesUsecase: NormalizeVariables, @@ -170,18 +170,39 @@ export class AddJob { filterVariables: IFilterVariables, delayAmount: number | undefined ) { - command.bridgeResponse = await this.resonateUsecase.execute< + command.bridgeResponse = await this.fetchResonateData(command, filterVariables); + delayAmount = await this.addDelayJob.execute(command); + + Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT); + + return delayAmount; + } + + private async fetchResonateData(command: AddJobCommand, filterVariables: IFilterVariables) { + const response = await this.resonateUsecase.execute< AddJobCommand & { variables: IFilterVariables }, ExecuteOutput >({ ...command, variables: filterVariables, }); - delayAmount = await this.addDelayJob.execute(command); - Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT); + await this.jobRepository.updateOne( + { + _id: command.job._id, + _environmentId: command.environmentId, + }, + { + $set: { + 'digest.amount': response?.outputs?.amount, + 'digest.unit': response?.outputs?.unit, + 'digest.type': response?.outputs?.type, + // TODO: Add other types for scheduled etc.. + }, + } + ); - return delayAmount; + return response; } private async handleDigest( @@ -191,17 +212,11 @@ export class AddJob { digestAmount: number | undefined, filtered: boolean ) { - const resonateResponse = await this.resonateUsecase.execute< - AddJobCommand & { variables: IFilterVariables }, - ExecuteOutput - >({ - ...command, - variables: filterVariables, - }); + const resonateResponse = await this.fetchResonateData(command, filterVariables); validateDigest(job); - digestAmount = this.calculateDelayService.calculateDelay({ + digestAmount = this.computeJobWaitDurationService.calculateDelay({ stepMetadata: job.digest, payload: job.payload, overrides: job.overrides, diff --git a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.spec.ts b/libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.spec.ts similarity index 58% rename from libs/application-generic/src/services/calculate-delay/calculate-delay.service.spec.ts rename to libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.spec.ts index d8171d58362..2f5181bfec5 100644 --- a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.spec.ts +++ b/libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.spec.ts @@ -1,12 +1,12 @@ import { DigestUnitEnum } from '@novu/shared'; -import { CalculateDelayService } from './calculate-delay.service'; +import { ComputeJobWaitDurationService } from './compute-job-wait-duration.service'; -describe('Calculate Delay Service', function () { +describe('Compute Job Wait Duration Service', function () { describe('toMilliseconds', function () { - const calculateDelayService = new CalculateDelayService(); + const computeJobWaitDurationService = new ComputeJobWaitDurationService(); it('should convert seconds to milliseconds', function () { - const result = (calculateDelayService as any).toMilliseconds( + const result = (computeJobWaitDurationService as any).toMilliseconds( 5, DigestUnitEnum.SECONDS ); @@ -14,7 +14,7 @@ describe('Calculate Delay Service', function () { }); it('should convert minutes to milliseconds', function () { - const result = (calculateDelayService as any).toMilliseconds( + const result = (computeJobWaitDurationService as any).toMilliseconds( 5, DigestUnitEnum.MINUTES ); @@ -22,7 +22,7 @@ describe('Calculate Delay Service', function () { }); it('should convert hours to milliseconds', function () { - const result = (calculateDelayService as any).toMilliseconds( + const result = (computeJobWaitDurationService as any).toMilliseconds( 5, DigestUnitEnum.HOURS ); @@ -30,7 +30,7 @@ describe('Calculate Delay Service', function () { }); it('should convert days to milliseconds', function () { - const result = (calculateDelayService as any).toMilliseconds( + const result = (computeJobWaitDurationService as any).toMilliseconds( 1, DigestUnitEnum.DAYS ); diff --git a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts b/libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.ts similarity index 93% rename from libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts rename to libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.ts index 0a7f4268b27..dea2fb80238 100644 --- a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts +++ b/libs/application-generic/src/services/calculate-delay/compute-job-wait-duration.service.ts @@ -18,7 +18,7 @@ import { IBridgeDigestResponse, } from '../../utils/require-inject'; -export class CalculateDelayService { +export class ComputeJobWaitDurationService { calculateDelay({ stepMetadata, payload, @@ -41,7 +41,6 @@ export class CalculateDelayService { if (!delayPath) throw new ApiException(`Delay path not found`); const delayDate = payload[delayPath]; - const delay = differenceInMilliseconds(new Date(delayDate), new Date()); if (delay < 0) { @@ -51,12 +50,10 @@ export class CalculateDelayService { } return delay; - } - - const userUnit = castToDigestUnitEnum(bridgeResponse?.unit); - const userAmount = bridgeResponse?.amount; + } else if (isRegularDigest(digestType)) { + const userUnit = castToDigestUnitEnum(bridgeResponse?.unit); + const userAmount = bridgeResponse?.amount; - if (isRegularDigest(digestType)) { if (this.isValidDelayOverride(overrides)) { return this.toMilliseconds( userAmount ?? (overrides.delay.amount as number), @@ -70,9 +67,7 @@ export class CalculateDelayService { userAmount ?? regularDigestMeta.amount, userUnit ?? regularDigestMeta.unit ); - } - - if (digestType === DigestTypeEnum.TIMED) { + } else if (digestType === DigestTypeEnum.TIMED) { const timedDigestMeta = stepMetadata as IDigestTimedMetadata; return TimedDigestDelayService.calculate({ diff --git a/libs/application-generic/src/services/calculate-delay/index.ts b/libs/application-generic/src/services/calculate-delay/index.ts index 17e0936c066..f7a54a684de 100644 --- a/libs/application-generic/src/services/calculate-delay/index.ts +++ b/libs/application-generic/src/services/calculate-delay/index.ts @@ -1 +1 @@ -export * from './calculate-delay.service'; +export * from './compute-job-wait-duration.service'; diff --git a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts index dca201f919f..fe0ec58f81f 100644 --- a/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts +++ b/libs/application-generic/src/usecases/create-notification-jobs/create-notification-jobs.usecase.ts @@ -20,7 +20,7 @@ import { import { InstrumentUsecase } from '../../instrumentation'; import { CreateNotificationJobsCommand } from './create-notification-jobs.command'; import { PlatformException } from '../../utils/exceptions'; -import { CalculateDelayService } from '../../services'; +import { ComputeJobWaitDurationService } from '../../services'; const LOG_CONTEXT = 'CreateNotificationUseCase'; type NotificationJob = Omit; @@ -30,8 +30,8 @@ export class CreateNotificationJobs { constructor( private digestFilterSteps: DigestFilterSteps, private notificationRepository: NotificationRepository, - @Inject(forwardRef(() => CalculateDelayService)) - private calculateDelayService: CalculateDelayService + @Inject(forwardRef(() => ComputeJobWaitDurationService)) + private computeJobWaitDurationService: ComputeJobWaitDurationService ) {} @InstrumentUsecase() @@ -171,7 +171,7 @@ export class CreateNotificationJobs { const delay = delayedSteps .map((step) => - this.calculateDelayService.calculateDelay({ + this.computeJobWaitDurationService.calculateDelay({ stepMetadata: step.metadata, payload: command.payload, overrides: command.overrides, diff --git a/libs/testing/tsconfig.build.json b/libs/testing/tsconfig.build.json index 5313562485c..d5bc2cbf1d4 100644 --- a/libs/testing/tsconfig.build.json +++ b/libs/testing/tsconfig.build.json @@ -7,10 +7,8 @@ "outDir": "./dist", "rootDir": "./src", "esModuleInterop": false, + "sourceMap": true, "types": ["node"] }, - "include": [ - "src/**/*" - ] + "include": ["src/**/*"] } - diff --git a/libs/testing/tsconfig.json b/libs/testing/tsconfig.json index 483cdf2122e..8ccb331c6fa 100644 --- a/libs/testing/tsconfig.json +++ b/libs/testing/tsconfig.json @@ -3,6 +3,7 @@ "compilerOptions": { "strictNullChecks": true, "types": ["node"], - "esModuleInterop": false + "esModuleInterop": false, + "sourceMap": true } } diff --git a/packages/echo/src/client.test.ts b/packages/echo/src/client.test.ts index 21c399beb3c..4092d648d5c 100644 --- a/packages/echo/src/client.test.ts +++ b/packages/echo/src/client.test.ts @@ -8,7 +8,10 @@ import { StepNotFoundError, WorkflowNotFoundError, } from './errors'; -import { IEvent } from './types'; +import { IEvent, Step } from './types'; +import { delayOutputSchema } from './schemas'; +import { FromSchema } from 'json-schema-to-ts'; +import { emailChannelSchemas } from './schemas/steps/channels/email.schema'; describe('Echo Client', () => { let echo: Echo; @@ -157,6 +160,7 @@ describe('Echo Client', () => { })); await step.delay('delay', async () => ({ + type: 'regular', amount: 1, unit: 'hours', })); @@ -342,11 +346,17 @@ describe('Echo Client', () => { beforeEach(() => {}); it('should execute workflow successfully when action is execute and data is provided', async () => { + const delayConfiguration: FromSchema = { type: 'regular', unit: 'seconds', amount: 1 }; + const emailConfiguration: FromSchema = { + body: 'Test Body', + subject: 'Subject', + }; await echo.workflow('test-workflow', async ({ step }) => { - await step.email('send-email', async () => ({ body: 'Test Body', subject: 'Subject' })); + await step.email('send-email', async () => emailConfiguration); + await step.delay('delay', async () => delayConfiguration); }); - const event: IEvent = { + const emailEvent: IEvent = { action: 'execute', data: {}, workflowId: 'test-workflow', @@ -356,24 +366,52 @@ describe('Echo Client', () => { inputs: {}, }; - const executionResult = await echo.executeWorkflow(event); - - expect(executionResult).toBeDefined(); - expect(executionResult.outputs).toBeDefined(); - if (!executionResult.outputs) throw new Error('executionResult.outputs is undefined'); - - const body = (executionResult.outputs as any).body as string; - expect(body).toBe('Test Body'); - - const subject = (executionResult.outputs as any).subject as string; - expect(subject).toBe('Subject'); - - expect(executionResult.providers).toEqual({}); - - const metadata = executionResult.metadata; + const emailExecutionResult = await echo.executeWorkflow(emailEvent); + + expect(emailExecutionResult).toBeDefined(); + expect(emailExecutionResult.outputs).toBeDefined(); + if (!emailExecutionResult.outputs) throw new Error('executionResult.outputs is undefined'); + const body = (emailExecutionResult.outputs as any).body as string; + expect(body).toBe(emailConfiguration.body); + const subject = (emailExecutionResult.outputs as any).subject as string; + expect(subject).toBe(emailConfiguration.subject); + expect(emailExecutionResult.providers).toEqual({}); + const metadata = emailExecutionResult.metadata; expect(metadata.status).toBe('success'); expect(metadata.error).toBe(false); expect(metadata.duration).toEqual(expect.any(Number)); + + const delayEvent: IEvent = { + action: 'execute', + data: {}, + workflowId: 'test-workflow', + stepId: 'delay', + subscriber: {}, + state: [ + { + stepId: 'send-email', + outputs: {}, + state: { + status: 'completed', + error: undefined, + }, + }, + ], + inputs: {}, + }; + + const delayExecutionResult = await echo.executeWorkflow(delayEvent); + + expect(delayExecutionResult).toBeDefined(); + expect(delayExecutionResult.outputs).toBeDefined(); + if (!delayExecutionResult.outputs) throw new Error('executionResult.outputs is undefined'); + const unit = (delayExecutionResult.outputs as any).unit as string; + expect(unit).toBe(delayConfiguration.unit); + const amount = (delayExecutionResult.outputs as any).amount as string; + expect(amount).toBe(delayConfiguration.amount); + expect(delayExecutionResult.providers).toEqual({}); + const type = (delayExecutionResult.outputs as any).type as string; + expect(type).toBe(delayConfiguration.type); }); it('should throw error on execute action without data', async () => { diff --git a/packages/echo/src/schemas/steps/actions/delay.schema.ts b/packages/echo/src/schemas/steps/actions/delay.schema.ts index f03ff3802e4..1c2500fd7af 100644 --- a/packages/echo/src/schemas/steps/actions/delay.schema.ts +++ b/packages/echo/src/schemas/steps/actions/delay.schema.ts @@ -3,6 +3,10 @@ import { Schema } from '../../../types/schema.types'; export const delayOutputSchema = { type: 'object', properties: { + type: { + enum: ['regular'], + default: 'regular', + }, amount: { type: 'number' }, unit: { type: 'string',