Skip to content

Commit

Permalink
Merge pull request #4200 from novuhq/feat-digest-events-refactor
Browse files Browse the repository at this point in the history
feat(worker): refactor get digest events flow
  • Loading branch information
Pablo Fernández authored Sep 25, 2023
2 parents fda0dc9 + c3dfd91 commit c917be5
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { IsDefined } from 'class-validator';
import { JobEntity } from '@novu/dal';
import { BaseCommand } from '@novu/application-generic';

export class DigestEventsCommand extends BaseCommand {
@IsDefined()
_subscriberId: string;

@IsDefined()
currentJob: JobEntity;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { MessageRepository, JobRepository, JobStatusEnum } from '@novu/dal';
import {
StepTypeEnum,
Expand All @@ -9,11 +9,17 @@ import {
} from '@novu/shared';
import { DetailEnum, CreateExecutionDetails, CreateExecutionDetailsCommand } from '@novu/application-generic';

import { DigestEventsCommand } from './digest-events.command';
import { GetDigestEventsRegular } from './get-digest-events-regular.usecase';
import { GetDigestEventsBackoff } from './get-digest-events-backoff.usecase';

import { CreateLog } from '../../../../shared/logs';
import { PlatformException } from '../../../../shared/utils';

import { SendMessageCommand } from '../send-message.command';
import { SendMessageType } from '../send-message-type.usecase';
import { GetDigestEventsRegular } from './get-digest-events-regular.usecase';
import { GetDigestEventsBackoff } from './get-digest-events-backoff.usecase';

const LOG_CONTEXT = 'Digest';

@Injectable()
export class Digest extends SendMessageType {
Expand Down Expand Up @@ -64,14 +70,26 @@ export class Digest extends SendMessageType {
private async getEvents(command: SendMessageCommand) {
const currentJob = await this.jobRepository.findOne({ _environmentId: command.environmentId, _id: command.jobId });

if (!currentJob) {
const message = `Digest job ${command.jobId} is not found`;
Logger.error(message, LOG_CONTEXT);
throw new PlatformException(message);
}

const digestEventsCommand = DigestEventsCommand.create({
currentJob,
// backward compatibility - ternary needed to be removed once the queue renewed
_subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId,
});

if (
currentJob?.digest?.type === DigestTypeEnum.BACKOFF ||
(currentJob?.digest as IDigestRegularMetadata)?.backoff
) {
return this.getDigestEventsBackoff.execute(command);
return this.getDigestEventsBackoff.execute(digestEventsCommand);
}

return this.getDigestEventsRegular.execute(command);
return this.getDigestEventsRegular.execute(digestEventsCommand);
}

private async getJobsToUpdate(command: SendMessageCommand) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import { Injectable } from '@nestjs/common';
import { JobStatusEnum } from '@novu/dal';
import { IDigestRegularMetadata, StepTypeEnum } from '@novu/shared';
import { DigestFilterSteps, InstrumentUsecase } from '@novu/application-generic';
import { StepTypeEnum } from '@novu/shared';
import { InstrumentUsecase } from '@novu/application-generic';

import { SendMessageCommand } from '../send-message.command';
import { DigestEventsCommand } from './digest-events.command';
import { GetDigestEvents } from './get-digest-events.usecase';
import { PlatformException } from '../../../../shared/utils';

@Injectable()
export class GetDigestEventsBackoff extends GetDigestEvents {
@InstrumentUsecase()
public async execute(command: SendMessageCommand) {
const currentJob = await this.jobRepository.findOne({ _environmentId: command.environmentId, _id: command.jobId });
if (!currentJob) throw new PlatformException('Digest job is not found');
public async execute(command: DigestEventsCommand) {
const currentJob = command.currentJob;

const digestMeta = currentJob.digest as IDigestRegularMetadata | undefined;
const digestKey = digestMeta?.digestKey;
const digestValue = DigestFilterSteps.getNestedValue(currentJob.payload, digestKey);
const { digestKey, digestMeta, digestValue } = this.getJobDigest(currentJob);

const jobs = await this.jobRepository.find({
createdAt: {
Expand All @@ -25,12 +21,11 @@ export class GetDigestEventsBackoff extends GetDigestEvents {
_templateId: currentJob._templateId,
status: JobStatusEnum.COMPLETED,
type: StepTypeEnum.TRIGGER,
_environmentId: command.environmentId,
_environmentId: currentJob._environmentId,
...(digestKey && { [`payload.${digestKey}`]: digestValue }),
// backward compatibility - ternary needed to be removed once the queue renewed
_subscriberId: command._subscriberId ? command._subscriberId : command.subscriberId,
_subscriberId: command._subscriberId,
});

return this.filterJobs(currentJob, command.transactionId, jobs);
return this.filterJobs(currentJob, currentJob.transactionId, jobs);
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,42 @@
/* eslint-disable @typescript-eslint/ban-ts-comment */
import { Injectable } from '@nestjs/common';
import { sub } from 'date-fns';
import { IDigestRegularMetadata } from '@novu/shared';
import { DigestFilterSteps, InstrumentUsecase } from '@novu/application-generic';
import { InstrumentUsecase } from '@novu/application-generic';

import { PlatformException } from '../../../../shared/utils';
import { SendMessageCommand } from '../send-message.command';
import { DigestEventsCommand } from './digest-events.command';
import { GetDigestEvents } from './get-digest-events.usecase';

@Injectable()
export class GetDigestEventsRegular extends GetDigestEvents {
@InstrumentUsecase()
public async execute(command: SendMessageCommand) {
const currentJob = await this.jobRepository.findOne({
_environmentId: command.environmentId,
_id: command.jobId,
});
if (!currentJob) throw new PlatformException('Digest job is not found');
public async execute(command: DigestEventsCommand) {
const currentJob = command.currentJob;

const digestMeta = currentJob.digest as IDigestRegularMetadata | undefined;
const amount =
typeof digestMeta?.amount === 'number'
const { digestKey, digestMeta, digestValue } = this.getJobDigest(currentJob);

const amount = digestMeta
? typeof digestMeta.amount === 'number'
? digestMeta.amount
: // @ts-ignore
parseInt(digestMeta?.amount, 10);
const earliest = sub(new Date(currentJob.createdAt), {
// @ts-ignore
[digestMeta?.unit]: amount,
});
: parseInt(digestMeta.amount, 10)
: undefined;

const digestKey = digestMeta?.digestKey;
const digestValue = DigestFilterSteps.getNestedValue(currentJob.payload, digestKey);
const createdDate = new Date(currentJob.createdAt);
const subtractedTime = digestMeta
? {
[digestMeta.unit]: amount,
}
: {};
const earliest = sub(new Date(currentJob.createdAt), subtractedTime);

const jobs = await this.jobRepository.findJobsToDigest(
earliest,
currentJob._templateId,
command.environmentId,
// backward compatibility - ternary needed to be removed once the queue renewed
command._subscriberId ? command._subscriberId : command.subscriberId,
currentJob._environmentId,
command._subscriberId,
digestKey,
digestValue
);

return this.filterJobs(currentJob, command.transactionId, jobs);
return this.filterJobs(currentJob, currentJob.transactionId, jobs);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { JobRepository, JobEntity } from '@novu/dal';
import {
EnvironmentId,
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
IDigestBaseMetadata,
IDigestRegularMetadata,
StepTypeEnum,
} from '@novu/shared';
import {
Expand All @@ -16,10 +18,28 @@ import {

import { PlatformException } from '../../../../shared/utils';

const LOG_CONTEXT = 'GetDigestEvents';

@Injectable()
export abstract class GetDigestEvents {
constructor(protected jobRepository: JobRepository, private createExecutionDetails: CreateExecutionDetails) {}

protected getJobDigest(job: JobEntity): {
digestMeta: IDigestBaseMetadata | undefined;
digestKey: string | undefined;
digestValue: string | undefined;
} {
const digestMeta = job.digest as IDigestRegularMetadata | undefined;
const digestKey = digestMeta?.digestKey;
const digestValue = DigestFilterSteps.getNestedValue(job.payload, digestKey);

return {
digestKey,
digestMeta,
digestValue,
};
}

@Instrument()
protected async filterJobs(currentJob: JobEntity, transactionId: string, jobs: JobEntity[]) {
const digestMeta = currentJob?.digest as IDigestBaseMetadata | undefined;
Expand Down Expand Up @@ -51,7 +71,9 @@ export abstract class GetDigestEvents {
isRetry: false,
})
);
throw new PlatformException('Trigger job is not found');
const message = `Trigger job for jobId ${currentJob._id} is not found`;
Logger.error(message, LOG_CONTEXT);
throw new PlatformException(message);
}

const events = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ export class DigestFilterSteps {
};
}

public static getNestedValue<ObjectType>(payload: ObjectType, path?: string) {
public static getNestedValue<ObjectType>(
payload: ObjectType,
path?: string
): ObjectType | undefined {
if (!path || !payload) {
return undefined;
}
Expand Down

0 comments on commit c917be5

Please sign in to comment.