From c3dfd912e6c57e09d06db1694cfa756c6a59ac37 Mon Sep 17 00:00:00 2001 From: p-fernandez Date: Mon, 25 Sep 2023 09:58:27 +0100 Subject: [PATCH] feat(worker): refactor get digest events flow --- .../digest/digest-events.command.ts | 11 +++++ .../send-message/digest/digest.usecase.ts | 28 +++++++++--- .../get-digest-events-backoff.usecase.ts | 23 ++++------ .../get-digest-events-regular.usecase.ts | 45 +++++++++---------- .../digest/get-digest-events.usecase.ts | 26 ++++++++++- .../digest-filter-steps.usecase.ts | 5 ++- 6 files changed, 91 insertions(+), 47 deletions(-) create mode 100644 apps/worker/src/app/workflow/usecases/send-message/digest/digest-events.command.ts diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/digest-events.command.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/digest-events.command.ts new file mode 100644 index 00000000000..223c6b9ea1a --- /dev/null +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/digest-events.command.ts @@ -0,0 +1,11 @@ +import { IsDefined } from 'class-validator'; +import { JobEntity } from '@novu/dal'; +import { BaseCommand } from '@novu/application-generic'; + +export class DigestEventsCommand extends BaseCommand { + @IsDefined() + _subscriberId: string; + + @IsDefined() + currentJob: JobEntity; +} diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts index 8b7b607f35c..37a7f88c307 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { MessageRepository, JobRepository, JobStatusEnum } from '@novu/dal'; import { StepTypeEnum, @@ -9,11 +9,17 @@ import { } from '@novu/shared'; import { DetailEnum, CreateExecutionDetails, CreateExecutionDetailsCommand } from '@novu/application-generic'; +import { DigestEventsCommand } from './digest-events.command'; +import { GetDigestEventsRegular } from './get-digest-events-regular.usecase'; +import { GetDigestEventsBackoff } from './get-digest-events-backoff.usecase'; + import { CreateLog } from '../../../../shared/logs'; +import { PlatformException } from '../../../../shared/utils'; + import { SendMessageCommand } from '../send-message.command'; import { SendMessageType } from '../send-message-type.usecase'; -import { GetDigestEventsRegular } from './get-digest-events-regular.usecase'; -import { GetDigestEventsBackoff } from './get-digest-events-backoff.usecase'; + +const LOG_CONTEXT = 'Digest'; @Injectable() export class Digest extends SendMessageType { @@ -64,14 +70,26 @@ export class Digest extends SendMessageType { private async getEvents(command: SendMessageCommand) { const currentJob = await this.jobRepository.findOne({ _environmentId: command.environmentId, _id: command.jobId }); + if (!currentJob) { + const message = `Digest job ${command.jobId} is not found`; + Logger.error(message, LOG_CONTEXT); + throw new PlatformException(message); + } + + const digestEventsCommand = DigestEventsCommand.create({ + currentJob, + // backward compatibility - ternary needed to be removed once the queue renewed + _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, + }); + if ( currentJob?.digest?.type === DigestTypeEnum.BACKOFF || (currentJob?.digest as IDigestRegularMetadata)?.backoff ) { - return this.getDigestEventsBackoff.execute(command); + return this.getDigestEventsBackoff.execute(digestEventsCommand); } - return this.getDigestEventsRegular.execute(command); + return this.getDigestEventsRegular.execute(digestEventsCommand); } private async getJobsToUpdate(command: SendMessageCommand) { diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-backoff.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-backoff.usecase.ts index 0b9aadcfa61..59e5e1eb632 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-backoff.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-backoff.usecase.ts @@ -1,22 +1,18 @@ import { Injectable } from '@nestjs/common'; import { JobStatusEnum } from '@novu/dal'; -import { IDigestRegularMetadata, StepTypeEnum } from '@novu/shared'; -import { DigestFilterSteps, InstrumentUsecase } from '@novu/application-generic'; +import { StepTypeEnum } from '@novu/shared'; +import { InstrumentUsecase } from '@novu/application-generic'; -import { SendMessageCommand } from '../send-message.command'; +import { DigestEventsCommand } from './digest-events.command'; import { GetDigestEvents } from './get-digest-events.usecase'; -import { PlatformException } from '../../../../shared/utils'; @Injectable() export class GetDigestEventsBackoff extends GetDigestEvents { @InstrumentUsecase() - public async execute(command: SendMessageCommand) { - const currentJob = await this.jobRepository.findOne({ _environmentId: command.environmentId, _id: command.jobId }); - if (!currentJob) throw new PlatformException('Digest job is not found'); + public async execute(command: DigestEventsCommand) { + const currentJob = command.currentJob; - const digestMeta = currentJob.digest as IDigestRegularMetadata | undefined; - const digestKey = digestMeta?.digestKey; - const digestValue = DigestFilterSteps.getNestedValue(currentJob.payload, digestKey); + const { digestKey, digestMeta, digestValue } = this.getJobDigest(currentJob); const jobs = await this.jobRepository.find({ createdAt: { @@ -25,12 +21,11 @@ export class GetDigestEventsBackoff extends GetDigestEvents { _templateId: currentJob._templateId, status: JobStatusEnum.COMPLETED, type: StepTypeEnum.TRIGGER, - _environmentId: command.environmentId, + _environmentId: currentJob._environmentId, ...(digestKey && { [`payload.${digestKey}`]: digestValue }), - // backward compatibility - ternary needed to be removed once the queue renewed - _subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId, + _subscriberId: command._subscriberId, }); - return this.filterJobs(currentJob, command.transactionId, jobs); + return this.filterJobs(currentJob, currentJob.transactionId, jobs); } } diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-regular.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-regular.usecase.ts index fc0fa8d9ad4..975807fd4bf 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-regular.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events-regular.usecase.ts @@ -1,47 +1,42 @@ /* eslint-disable @typescript-eslint/ban-ts-comment */ import { Injectable } from '@nestjs/common'; import { sub } from 'date-fns'; -import { IDigestRegularMetadata } from '@novu/shared'; -import { DigestFilterSteps, InstrumentUsecase } from '@novu/application-generic'; +import { InstrumentUsecase } from '@novu/application-generic'; -import { PlatformException } from '../../../../shared/utils'; -import { SendMessageCommand } from '../send-message.command'; +import { DigestEventsCommand } from './digest-events.command'; import { GetDigestEvents } from './get-digest-events.usecase'; @Injectable() export class GetDigestEventsRegular extends GetDigestEvents { @InstrumentUsecase() - public async execute(command: SendMessageCommand) { - const currentJob = await this.jobRepository.findOne({ - _environmentId: command.environmentId, - _id: command.jobId, - }); - if (!currentJob) throw new PlatformException('Digest job is not found'); + public async execute(command: DigestEventsCommand) { + const currentJob = command.currentJob; - const digestMeta = currentJob.digest as IDigestRegularMetadata | undefined; - const amount = - typeof digestMeta?.amount === 'number' + const { digestKey, digestMeta, digestValue } = this.getJobDigest(currentJob); + + const amount = digestMeta + ? typeof digestMeta.amount === 'number' ? digestMeta.amount - : // @ts-ignore - parseInt(digestMeta?.amount, 10); - const earliest = sub(new Date(currentJob.createdAt), { - // @ts-ignore - [digestMeta?.unit]: amount, - }); + : parseInt(digestMeta.amount, 10) + : undefined; - const digestKey = digestMeta?.digestKey; - const digestValue = DigestFilterSteps.getNestedValue(currentJob.payload, digestKey); + const createdDate = new Date(currentJob.createdAt); + const subtractedTime = digestMeta + ? { + [digestMeta.unit]: amount, + } + : {}; + const earliest = sub(new Date(currentJob.createdAt), subtractedTime); const jobs = await this.jobRepository.findJobsToDigest( earliest, currentJob._templateId, - command.environmentId, - // backward compatibility - ternary needed to be removed once the queue renewed - command._subscriberId ? command._subscriberId : command.subscriberId, + currentJob._environmentId, + command._subscriberId, digestKey, digestValue ); - return this.filterJobs(currentJob, command.transactionId, jobs); + return this.filterJobs(currentJob, currentJob.transactionId, jobs); } } diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts index be40a7032b3..9dba660a1c2 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts @@ -1,9 +1,11 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { JobRepository, JobEntity } from '@novu/dal'; import { + EnvironmentId, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, IDigestBaseMetadata, + IDigestRegularMetadata, StepTypeEnum, } from '@novu/shared'; import { @@ -16,10 +18,28 @@ import { import { PlatformException } from '../../../../shared/utils'; +const LOG_CONTEXT = 'GetDigestEvents'; + @Injectable() export abstract class GetDigestEvents { constructor(protected jobRepository: JobRepository, private createExecutionDetails: CreateExecutionDetails) {} + protected getJobDigest(job: JobEntity): { + digestMeta: IDigestBaseMetadata | undefined; + digestKey: string | undefined; + digestValue: string | undefined; + } { + const digestMeta = job.digest as IDigestRegularMetadata | undefined; + const digestKey = digestMeta?.digestKey; + const digestValue = DigestFilterSteps.getNestedValue(job.payload, digestKey); + + return { + digestKey, + digestMeta, + digestValue, + }; + } + @Instrument() protected async filterJobs(currentJob: JobEntity, transactionId: string, jobs: JobEntity[]) { const digestMeta = currentJob?.digest as IDigestBaseMetadata | undefined; @@ -51,7 +71,9 @@ export abstract class GetDigestEvents { isRetry: false, }) ); - throw new PlatformException('Trigger job is not found'); + const message = `Trigger job for jobId ${currentJob._id} is not found`; + Logger.error(message, LOG_CONTEXT); + throw new PlatformException(message); } const events = [ diff --git a/packages/application-generic/src/usecases/digest-filter-steps/digest-filter-steps.usecase.ts b/packages/application-generic/src/usecases/digest-filter-steps/digest-filter-steps.usecase.ts index 7e2c2d52bec..bea403dd0ad 100644 --- a/packages/application-generic/src/usecases/digest-filter-steps/digest-filter-steps.usecase.ts +++ b/packages/application-generic/src/usecases/digest-filter-steps/digest-filter-steps.usecase.ts @@ -59,7 +59,10 @@ export class DigestFilterSteps { }; } - public static getNestedValue(payload: ObjectType, path?: string) { + public static getNestedValue( + payload: ObjectType, + path?: string + ): ObjectType | undefined { if (!path || !payload) { return undefined; }