From 8678c2139fd95ebd33b93aca73031a3c313e45fd Mon Sep 17 00:00:00 2001 From: Richard Fontein <32132657+rifont@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:37:03 +0100 Subject: [PATCH] fix(worker): Provide correct execution context to Echo endpoints --- .source | 2 +- .../send-message/digest/digest.usecase.ts | 4 +++- .../send-message/send-message.usecase.ts | 17 +++++++------ .../usecases/add-job/add-delay-job.usecase.ts | 9 ++++++- .../src/usecases/add-job/add-job.usecase.ts | 24 ++++++++++++++----- .../add-job/merge-or-create-digest.usecase.ts | 8 +++++-- 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/.source b/.source index c7398957a7a..f4c7ef42b87 160000 --- a/.source +++ b/.source @@ -1 +1 @@ -Subproject commit c7398957a7aabb717dab081d59b1da1750f4b7de +Subproject commit f4c7ef42b873d5c3a67d561f36258bb313414a91 diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts index 342a055c0b3..28e1827ca9d 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts @@ -71,11 +71,13 @@ export class Digest extends SendMessageType { }) ); + const jobsToUpdate = [...nextJobs.map((job) => job._id), command.job._id]; + await this.jobRepository.update( { _environmentId: command.environmentId, _id: { - $in: nextJobs.map((job) => job._id), + $in: jobsToUpdate, }, }, { diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts index 5cfe288974a..febe515fcca 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts @@ -88,13 +88,16 @@ export class SendMessage { const stepType = command.step?.template?.type; - const chimeraResponse = await this.chimeraConnector.execute< - SendMessageCommand & { variables: IFilterVariables }, - ExecuteOutput | null - >({ - ...command, - variables: shouldRun.variables, - }); + let chimeraResponse: ExecuteOutput | null = null; + if (!['digest', 'delay'].includes(stepType as any)) { + chimeraResponse = await this.chimeraConnector.execute< + SendMessageCommand & { variables: IFilterVariables }, + ExecuteOutput | null + >({ + ...command, + variables: shouldRun.variables, + }); + } if (!command.payload?.$on_boarding_trigger) { const usedFilters = shouldRun?.conditions.reduce(ConditionsFilter.sumFilters, { diff --git a/packages/application-generic/src/usecases/add-job/add-delay-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-delay-job.usecase.ts index 8462de9586c..b6efe9d3e31 100644 --- a/packages/application-generic/src/usecases/add-job/add-delay-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-delay-job.usecase.ts @@ -2,6 +2,7 @@ import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; import { JobRepository, JobStatusEnum } from '@novu/dal'; import { + DelayTypeEnum, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum, @@ -46,7 +47,13 @@ export class AddDelayJob { stepMetadata: data.step.metadata, payload: data.payload, overrides: data.overrides, - chimeraResponse: command.chimeraResponse?.outputs, + // TODO: Remove fallback after other delay types are implemented. + chimeraResponse: command.chimeraResponse?.outputs + ? { + type: DelayTypeEnum.REGULAR, + ...command.chimeraResponse?.outputs, + } + : undefined, }); await this.jobRepository.updateStatus( diff --git a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts index 482bf453622..a433019f9dc 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts @@ -5,6 +5,7 @@ import { ExecutionDetailsStatusEnum, StepTypeEnum, DigestCreationResultEnum, + DigestTypeEnum, } from '@novu/shared'; import { AddDelayJob } from './add-delay-job.usecase'; @@ -35,6 +36,7 @@ import { IUseCaseInterfaceInline, requireInject, } from '../../utils/require-inject'; +import { IFilterVariables } from '../../utils/filter-processing-details'; export enum BackoffStrategiesEnum { WEBHOOK_FILTER_BACKOFF = 'webhookFilterBackoff', @@ -85,7 +87,7 @@ export class AddJob { ); let filtered = false; - + let filterVariables: IFilterVariables | undefined; if ( [StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes( job.type as StepTypeEnum @@ -102,6 +104,7 @@ export class AddJob { }) ); + filterVariables = shouldRun.variables; filtered = !shouldRun.passed; } @@ -109,9 +112,12 @@ export class AddJob { let digestCreationResult: DigestCreationResultEnum | undefined; if (job.type === StepTypeEnum.DIGEST) { const chimeraResponse = await this.chimeraConnector.execute< - AddJobCommand, + AddJobCommand & { variables: IFilterVariables }, ExecuteOutput - >(command); + >({ + ...command, + variables: filterVariables, + }); validateDigest(job); @@ -119,7 +125,10 @@ export class AddJob { stepMetadata: job.digest, payload: job.payload, overrides: job.overrides, - chimeraResponse: chimeraResponse?.outputs, + // TODO: Remove fallback after other digest types are implemented. + chimeraResponse: chimeraResponse + ? { type: DigestTypeEnum.REGULAR, ...chimeraResponse.outputs } + : undefined, }); Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT); @@ -164,9 +173,12 @@ export class AddJob { if (job.type === StepTypeEnum.DELAY) { const chimeraResponse = await this.chimeraConnector.execute< - AddJobCommand, + AddJobCommand & { variables: IFilterVariables }, ExecuteOutput - >(command); + >({ + ...command, + variables: filterVariables, + }); command.chimeraResponse = chimeraResponse; delayAmount = await this.addDelayJob.execute(command); diff --git a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts index a0fabba8b0c..9a8600c4775 100644 --- a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts @@ -50,7 +50,8 @@ export class MergeOrCreateDigest { ): Promise { const { job } = command; - const digestMeta = job.digest as IDigestBaseMetadata | undefined; + const digestMeta = + command.chimeraData ?? (job.digest as IDigestBaseMetadata | undefined); const digestKey = command.chimeraData?.digestKey ?? digestMeta?.digestKey; const digestValue = getNestedValue(job.payload, digestKey); @@ -73,7 +74,10 @@ export class MergeOrCreateDigest { case DigestCreationResultEnum.SKIPPED: return await this.processSkippedDigest(job, command.filtered); case DigestCreationResultEnum.CREATED: - return await this.processCreatedDigest(digestMeta, job); + return await this.processCreatedDigest( + digestMeta as IDigestBaseMetadata, + job + ); default: throw new ApiException('Something went wrong with digest creation'); }