From e65a2241eb123d30e0bc635e02deb6ab0d0fdfb6 Mon Sep 17 00:00:00 2001 From: p-fernandez Date: Mon, 16 Oct 2023 14:47:19 +0100 Subject: [PATCH] feat(worker): manage default workers config through env variables --- .../services/inbound-parse.queue.service.ts | 6 +- apps/api/src/config/env-validator.ts | 11 +- apps/inbound-mail/src/config/env-validator.ts | 11 +- apps/webhook/src/config/index.ts | 5 +- .../app/workflow/services/standard.worker.ts | 4 +- .../services/subscriber-process.worker.ts | 9 +- .../app/workflow/services/workflow.worker.ts | 6 +- apps/worker/src/config/env-validator.ts | 5 +- apps/ws/src/config/index.ts | 11 +- .../src/socket/services/web-socket.worker.ts | 14 ++- .../application-generic/src/config/index.ts | 1 + .../src/config/workers.config.spec.ts | 119 ++++++++++++++++++ .../application-generic/src/config/workers.ts | 66 ++++++++++ packages/application-generic/src/index.ts | 1 + 14 files changed, 233 insertions(+), 36 deletions(-) create mode 100644 packages/application-generic/src/config/index.ts create mode 100644 packages/application-generic/src/config/workers.config.spec.ts create mode 100644 packages/application-generic/src/config/workers.ts diff --git a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts b/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts index dc4b3cf24ca..208ede0aff3 100644 --- a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts +++ b/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts @@ -1,4 +1,5 @@ import { + getInboundParseMailWorkerOptions, InboundParseQueue, InboundParseWorker, Queue, @@ -29,10 +30,7 @@ export class InboundParseQueueService { } private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 90000, - concurrency: 200, - }; + return getInboundParseMailWorkerOptions(); } public getWorkerProcessor() { diff --git a/apps/api/src/config/env-validator.ts b/apps/api/src/config/env-validator.ts index 1bcf1ef6fb1..641ae7b9e45 100644 --- a/apps/api/src/config/env-validator.ts +++ b/apps/api/src/config/env-validator.ts @@ -1,5 +1,4 @@ -import { bool, json, makeValidator, port, str, num, url, ValidatorSpec } from 'envalid'; -import * as envalid from 'envalid'; +import { bool, cleanEnv, json, makeValidator, port, str, num, url, ValidatorSpec } from 'envalid'; const str32 = makeValidator((variable) => { if (!(typeof variable === 'string') || variable.length != 32) { @@ -73,6 +72,12 @@ const validators: { [K in keyof any]: ValidatorSpec } = { LAUNCH_DARKLY_SDK_KEY: str({ default: '', }), + WORKER_DEFAULT_CONCURRENCY: num({ + default: undefined, + }), + WORKER_DEFAULT_LOCK_DURATION: num({ + default: undefined, + }), }; if (process.env.STORAGE_SERVICE === 'AZURE') { @@ -120,5 +125,5 @@ if (process.env.NODE_ENV !== 'local' && process.env.NODE_ENV !== 'test') { } export function validateEnv() { - envalid.cleanEnv(process.env, validators); + cleanEnv(process.env, validators); } diff --git a/apps/inbound-mail/src/config/env-validator.ts b/apps/inbound-mail/src/config/env-validator.ts index 929d4e82f35..526933a74ef 100644 --- a/apps/inbound-mail/src/config/env-validator.ts +++ b/apps/inbound-mail/src/config/env-validator.ts @@ -1,5 +1,4 @@ -import { json, port, str, ValidatorSpec } from 'envalid'; -import * as envalid from 'envalid'; +import { cleanEnv, json, num, port, str, ValidatorSpec } from 'envalid'; // eslint-disable-next-line @typescript-eslint/no-explicit-any const validators: { [K in keyof any]: ValidatorSpec } = { @@ -12,8 +11,14 @@ const validators: { [K in keyof any]: ValidatorSpec } = { REDIS_TLS: json({ default: undefined, }), + WORKER_DEFAULT_CONCURRENCY: num({ + default: undefined, + }), + WORKER_DEFAULT_LOCK_DURATION: num({ + default: undefined, + }), }; export function validateEnv() { - envalid.cleanEnv(process.env, validators); + cleanEnv(process.env, validators); } diff --git a/apps/webhook/src/config/index.ts b/apps/webhook/src/config/index.ts index 0bb1d2e6c1c..8415f34f1d1 100644 --- a/apps/webhook/src/config/index.ts +++ b/apps/webhook/src/config/index.ts @@ -1,6 +1,5 @@ import * as dotenv from 'dotenv'; -import * as envalid from 'envalid'; -import { str, port } from 'envalid'; +import { cleanEnv, str, port } from 'envalid'; dotenv.config(); @@ -19,7 +18,7 @@ const { error } = dotenv.config({ path: pathToDotEnv }); if (error && !process.env.LAMBDA_TASK_ROOT) throw error; -envalid.cleanEnv(process.env, { +cleanEnv(process.env, { NODE_ENV: str({ choices: ['dev', 'test', 'production', 'ci', 'local'], default: 'local', diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index 42bf724c402..9253993de62 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -7,6 +7,7 @@ import { ObservabilityBackgroundTransactionEnum, } from '@novu/shared'; import { + getStandardWorkerOptions, INovuWorker, Job, PinoLogger, @@ -51,8 +52,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker private getWorkerOptions(): WorkerOptions { return { - lockDuration: 90000, - concurrency: 200, + ...getStandardWorkerOptions(), settings: { backoffStrategy: this.getBackoffStrategies(), }, diff --git a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts index ac114d90830..f70cb8574d4 100644 --- a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts +++ b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts @@ -2,12 +2,14 @@ import { Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { + getSubscriberProcessWorkerOptions, SubscriberJobBound, SubscriberProcessWorkerService, PinoLogger, SubscriberJobBoundCommand, storage, Store, + WorkerOptions, } from '@novu/application-generic'; const LOG_CONTEXT = 'SubscriberProcessWorker'; @@ -50,10 +52,7 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService { }; } - private getWorkerOpts() { - return { - lockDuration: 90000, - concurrency: 200, - }; + private getWorkerOpts(): WorkerOptions { + return getSubscriberProcessWorkerOptions(); } } diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index 34c67d1dd45..7bd417a00ef 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { + getWorkflowWorkerOptions, INovuWorker, PinoLogger, storage, @@ -24,10 +25,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker } private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 90000, - concurrency: 200, - }; + return getWorkflowWorkerOptions(); } private getWorkerProcessor(): WorkerProcessor { diff --git a/apps/worker/src/config/env-validator.ts b/apps/worker/src/config/env-validator.ts index f0351ba7fae..b83a8c5b02a 100644 --- a/apps/worker/src/config/env-validator.ts +++ b/apps/worker/src/config/env-validator.ts @@ -1,5 +1,4 @@ -import { json, port, str, num, ValidatorSpec, makeValidator } from 'envalid'; -import * as envalid from 'envalid'; +import { cleanEnv, json, port, str, num, ValidatorSpec, makeValidator } from 'envalid'; const str32 = makeValidator((variable) => { if (!(typeof variable === 'string') || variable.length != 32) { @@ -92,5 +91,5 @@ if (process.env.NODE_ENV !== 'local' && process.env.NODE_ENV !== 'test') { } export function validateEnv() { - envalid.cleanEnv(process.env, validators); + cleanEnv(process.env, validators); } diff --git a/apps/ws/src/config/index.ts b/apps/ws/src/config/index.ts index 84674eeed5a..dda548e2739 100644 --- a/apps/ws/src/config/index.ts +++ b/apps/ws/src/config/index.ts @@ -1,6 +1,5 @@ import * as dotenv from 'dotenv'; -import * as envalid from 'envalid'; -import { json, str, port } from 'envalid'; +import { cleanEnv, json, num, str, port } from 'envalid'; import { getContextPath, NovuComponentEnum } from '@novu/shared'; dotenv.config(); @@ -31,7 +30,7 @@ const { error } = dotenv.config({ path }); if (error && !process.env.LAMBDA_TASK_ROOT) throw error; -envalid.cleanEnv(process.env, { +cleanEnv(process.env, { NODE_ENV: str({ choices: ['dev', 'test', 'production', 'ci', 'local'], default: 'local', @@ -43,6 +42,12 @@ envalid.cleanEnv(process.env, { default: undefined, }), JWT_SECRET: str(), + WORKER_DEFAULT_CONCURRENCY: num({ + default: undefined, + }), + WORKER_DEFAULT_LOCK_DURATION: num({ + default: undefined, + }), }); export const CONTEXT_PATH = getContextPath(NovuComponentEnum.WS); diff --git a/apps/ws/src/socket/services/web-socket.worker.ts b/apps/ws/src/socket/services/web-socket.worker.ts index 60883aed266..47dcb560ca6 100644 --- a/apps/ws/src/socket/services/web-socket.worker.ts +++ b/apps/ws/src/socket/services/web-socket.worker.ts @@ -1,7 +1,12 @@ const nr = require('newrelic'); import { Injectable, Logger } from '@nestjs/common'; -import { INovuWorker, WebSocketsWorkerService } from '@novu/application-generic'; +import { + getWebSocketWorkerOptions, + INovuWorker, + WebSocketsWorkerService, + WorkerOptions, +} from '@novu/application-generic'; import { ExternalServicesRoute, ExternalServicesRouteCommand } from '../usecases/external-services-route'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; @@ -61,10 +66,7 @@ export class WebSocketWorker extends WebSocketsWorkerService implements INovuWor }; } - private getWorkerOpts() { - return { - lockDuration: 90000, - concurrency: 200, - }; + private getWorkerOpts(): WorkerOptions { + return getWebSocketWorkerOptions(); } } diff --git a/packages/application-generic/src/config/index.ts b/packages/application-generic/src/config/index.ts new file mode 100644 index 00000000000..3b9bdec5677 --- /dev/null +++ b/packages/application-generic/src/config/index.ts @@ -0,0 +1 @@ +export * from './workers'; diff --git a/packages/application-generic/src/config/workers.config.spec.ts b/packages/application-generic/src/config/workers.config.spec.ts new file mode 100644 index 00000000000..1c864736704 --- /dev/null +++ b/packages/application-generic/src/config/workers.config.spec.ts @@ -0,0 +1,119 @@ +import { + getInboundParseMailWorkerOptions, + getStandardWorkerOptions, + getSubscriberProcessWorkerOptions, + getWebSocketWorkerOptions, + getWorkflowWorkerOptions, +} from './workers'; + +describe('Workers Config', () => { + describe('Inbound Parse Mail Worker', () => { + it('should have the default values when no environment variable set', () => { + expect(getInboundParseMailWorkerOptions()).toEqual({ + concurrency: 200, + lockDuration: 90000, + }); + }); + + it('should have the values passed through the environment variables', () => { + process.env.WORKER_DEFAULT_CONCURRENCY = '100'; + process.env.WORKER_DEFAULT_LOCK_DURATION = '10'; + + expect(getInboundParseMailWorkerOptions()).toEqual({ + concurrency: 100, + lockDuration: 10, + }); + + process.env.WORKER_DEFAULT_CONCURRENCY = ''; + process.env.WORKER_DEFAULT_LOCK_DURATION = ''; + }); + }); + + describe('Standard Worker', () => { + it('should have the default values when no environment variable set', () => { + expect(getStandardWorkerOptions()).toEqual({ + concurrency: 200, + lockDuration: 90000, + }); + }); + + it('should have the values passed through the environment variables', () => { + process.env.WORKER_DEFAULT_CONCURRENCY = '100'; + process.env.WORKER_DEFAULT_LOCK_DURATION = '10'; + + expect(getStandardWorkerOptions()).toEqual({ + concurrency: 100, + lockDuration: 10, + }); + + process.env.WORKER_DEFAULT_CONCURRENCY = ''; + process.env.WORKER_DEFAULT_LOCK_DURATION = ''; + }); + }); + + describe('Subscriber Process Worker', () => { + it('should have the default values when no environment variable set', () => { + expect(getSubscriberProcessWorkerOptions()).toEqual({ + concurrency: 200, + lockDuration: 90000, + }); + }); + + it('should have the values passed through the environment variables', () => { + process.env.WORKER_DEFAULT_CONCURRENCY = '100'; + process.env.WORKER_DEFAULT_LOCK_DURATION = '10'; + + expect(getSubscriberProcessWorkerOptions()).toEqual({ + concurrency: 100, + lockDuration: 10, + }); + + process.env.WORKER_DEFAULT_CONCURRENCY = ''; + process.env.WORKER_DEFAULT_LOCK_DURATION = ''; + }); + }); + + describe('Web Socket Worker', () => { + it('should have the default values when no environment variable set', () => { + expect(getWebSocketWorkerOptions()).toEqual({ + concurrency: 200, + lockDuration: 90000, + }); + }); + + it('should have the values passed through the environment variables', () => { + process.env.WORKER_DEFAULT_CONCURRENCY = '100'; + process.env.WORKER_DEFAULT_LOCK_DURATION = '10'; + + expect(getWebSocketWorkerOptions()).toEqual({ + concurrency: 100, + lockDuration: 10, + }); + + process.env.WORKER_DEFAULT_CONCURRENCY = ''; + process.env.WORKER_DEFAULT_LOCK_DURATION = ''; + }); + }); + + describe('Workflow Worker', () => { + it('should have the default values when no environment variable set', () => { + expect(getWorkflowWorkerOptions()).toEqual({ + concurrency: 200, + lockDuration: 90000, + }); + }); + + it('should have the values passed through the environment variables', () => { + process.env.WORKER_DEFAULT_CONCURRENCY = '100'; + process.env.WORKER_DEFAULT_LOCK_DURATION = '10'; + + expect(getWorkflowWorkerOptions()).toEqual({ + concurrency: 100, + lockDuration: 10, + }); + + process.env.WORKER_DEFAULT_CONCURRENCY = ''; + process.env.WORKER_DEFAULT_LOCK_DURATION = ''; + }); + }); +}); diff --git a/packages/application-generic/src/config/workers.ts b/packages/application-generic/src/config/workers.ts new file mode 100644 index 00000000000..6dd073f8628 --- /dev/null +++ b/packages/application-generic/src/config/workers.ts @@ -0,0 +1,66 @@ +enum WorkerEnum { + INBOUND_PARSE_MAIL = 'InboundParseMailWorker', + SUBSCRIBER_PROCESS = 'SubscriberProcessWorker', + STANDARD = 'StandardWorker', + WEB_SOCKET = 'WebSocketWorker', + WORKFLOW = 'WorkflowWorker', +} + +interface IWorkerConfig { + concurrency: number; + lockDuration: number; +} + +type WorkersConfig = Record; + +const getDefaultConcurrency = () => + process.env.WORKER_DEFAULT_CONCURRENCY + ? Number(process.env.WORKER_DEFAULT_CONCURRENCY) + : undefined; + +const getDefaultLockDuration = () => + process.env.WORKER_DEFAULT_LOCK_DURATION + ? Number(process.env.WORKER_DEFAULT_LOCK_DURATION) + : undefined; + +const getWorkerConfig = (worker: WorkerEnum): IWorkerConfig => { + const workersConfig = { + [WorkerEnum.INBOUND_PARSE_MAIL]: { + concurrency: getDefaultConcurrency() ?? 200, + lockDuration: getDefaultLockDuration() ?? 90000, + }, + [WorkerEnum.SUBSCRIBER_PROCESS]: { + concurrency: getDefaultConcurrency() ?? 200, + lockDuration: getDefaultLockDuration() ?? 90000, + }, + [WorkerEnum.STANDARD]: { + concurrency: getDefaultConcurrency() ?? 200, + lockDuration: getDefaultLockDuration() ?? 90000, + }, + [WorkerEnum.WEB_SOCKET]: { + concurrency: getDefaultConcurrency() ?? 200, + lockDuration: getDefaultLockDuration() ?? 90000, + }, + [WorkerEnum.WORKFLOW]: { + concurrency: getDefaultConcurrency() ?? 200, + lockDuration: getDefaultLockDuration() ?? 90000, + }, + }; + + return workersConfig[worker]; +}; + +export const getInboundParseMailWorkerOptions = () => + getWorkerConfig(WorkerEnum.INBOUND_PARSE_MAIL); + +export const getSubscriberProcessWorkerOptions = () => + getWorkerConfig(WorkerEnum.SUBSCRIBER_PROCESS); + +export const getStandardWorkerOptions = () => + getWorkerConfig(WorkerEnum.STANDARD); + +export const getWebSocketWorkerOptions = () => + getWorkerConfig(WorkerEnum.WEB_SOCKET); + +export const getWorkflowWorkerOptions = () => + getWorkerConfig(WorkerEnum.WORKFLOW); diff --git a/packages/application-generic/src/index.ts b/packages/application-generic/src/index.ts index 86e982129dd..41754b61601 100644 --- a/packages/application-generic/src/index.ts +++ b/packages/application-generic/src/index.ts @@ -1,4 +1,5 @@ export * from './commands/index'; +export * from './config'; export * from './custom-providers'; export * from './factories/index'; export * from './health/index';