Skip to content

Commit

Permalink
Merge pull request novuhq#4544 from novuhq/inf-93-environment-variabl…
Browse files Browse the repository at this point in the history
…es-for-workers-configuration

feat(worker): manage default workers config through env variables
  • Loading branch information
Cliftonz authored Oct 30, 2023
2 parents c1b6289 + 233dc05 commit 0a3e6ea
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
getInboundParseMailWorkerOptions,
InboundParseQueue,
InboundParseWorker,
Queue,
Expand Down Expand Up @@ -29,10 +30,7 @@ export class InboundParseQueueService {
}

private getWorkerOptions(): WorkerOptions {
return {
lockDuration: 90000,
concurrency: 200,
};
return getInboundParseMailWorkerOptions();
}

public getWorkerProcessor() {
Expand Down
11 changes: 8 additions & 3 deletions apps/api/src/config/env-validator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { bool, json, makeValidator, port, str, num, url, ValidatorSpec } from 'envalid';
import * as envalid from 'envalid';
import { bool, cleanEnv, json, makeValidator, port, str, num, url, ValidatorSpec } from 'envalid';

const str32 = makeValidator((variable) => {
if (!(typeof variable === 'string') || variable.length != 32) {
Expand Down Expand Up @@ -73,6 +72,12 @@ const validators: { [K in keyof any]: ValidatorSpec<any[K]> } = {
LAUNCH_DARKLY_SDK_KEY: str({
default: '',
}),
WORKER_DEFAULT_CONCURRENCY: num({
default: undefined,
}),
WORKER_DEFAULT_LOCK_DURATION: num({
default: undefined,
}),
};

if (process.env.STORAGE_SERVICE === 'AZURE') {
Expand Down Expand Up @@ -120,5 +125,5 @@ if (process.env.NODE_ENV !== 'local' && process.env.NODE_ENV !== 'test') {
}

export function validateEnv() {
envalid.cleanEnv(process.env, validators);
cleanEnv(process.env, validators);
}
11 changes: 8 additions & 3 deletions apps/inbound-mail/src/config/env-validator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { json, port, str, ValidatorSpec } from 'envalid';
import * as envalid from 'envalid';
import { cleanEnv, json, num, port, str, ValidatorSpec } from 'envalid';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const validators: { [K in keyof any]: ValidatorSpec<any[K]> } = {
Expand All @@ -12,8 +11,14 @@ const validators: { [K in keyof any]: ValidatorSpec<any[K]> } = {
REDIS_TLS: json({
default: undefined,
}),
WORKER_DEFAULT_CONCURRENCY: num({
default: undefined,
}),
WORKER_DEFAULT_LOCK_DURATION: num({
default: undefined,
}),
};

export function validateEnv() {
envalid.cleanEnv(process.env, validators);
cleanEnv(process.env, validators);
}
5 changes: 2 additions & 3 deletions apps/webhook/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as dotenv from 'dotenv';
import * as envalid from 'envalid';
import { str, port } from 'envalid';
import { cleanEnv, str, port } from 'envalid';

dotenv.config();

Expand All @@ -19,7 +18,7 @@ const { error } = dotenv.config({ path: pathToDotEnv });

if (error && !process.env.LAMBDA_TASK_ROOT) throw error;

envalid.cleanEnv(process.env, {
cleanEnv(process.env, {
NODE_ENV: str({
choices: ['dev', 'test', 'production', 'ci', 'local'],
default: 'local',
Expand Down
4 changes: 2 additions & 2 deletions apps/worker/src/app/workflow/services/standard.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ObservabilityBackgroundTransactionEnum,
} from '@novu/shared';
import {
getStandardWorkerOptions,
INovuWorker,
Job,
PinoLogger,
Expand Down Expand Up @@ -51,8 +52,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker

private getWorkerOptions(): WorkerOptions {
return {
lockDuration: 90000,
concurrency: 200,
...getStandardWorkerOptions(),
settings: {
backoffStrategy: this.getBackoffStrategies(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import {
getSubscriberProcessWorkerOptions,
SubscriberJobBound,
SubscriberProcessWorkerService,
PinoLogger,
SubscriberJobBoundCommand,
storage,
Store,
WorkerOptions,
} from '@novu/application-generic';

const LOG_CONTEXT = 'SubscriberProcessWorker';
Expand Down Expand Up @@ -50,10 +52,7 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService {
};
}

private getWorkerOpts() {
return {
lockDuration: 90000,
concurrency: 200,
};
private getWorkerOpts(): WorkerOptions {
return getSubscriberProcessWorkerOptions();
}
}
6 changes: 2 additions & 4 deletions apps/worker/src/app/workflow/services/workflow.worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getWorkflowWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Expand All @@ -24,10 +25,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker
}

private getWorkerOptions(): WorkerOptions {
return {
lockDuration: 90000,
concurrency: 200,
};
return getWorkflowWorkerOptions();
}

private getWorkerProcessor(): WorkerProcessor {
Expand Down
5 changes: 2 additions & 3 deletions apps/worker/src/config/env-validator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { json, port, str, num, ValidatorSpec, makeValidator } from 'envalid';
import * as envalid from 'envalid';
import { cleanEnv, json, port, str, num, ValidatorSpec, makeValidator } from 'envalid';

const str32 = makeValidator((variable) => {
if (!(typeof variable === 'string') || variable.length != 32) {
Expand Down Expand Up @@ -92,5 +91,5 @@ if (process.env.NODE_ENV !== 'local' && process.env.NODE_ENV !== 'test') {
}

export function validateEnv() {
envalid.cleanEnv(process.env, validators);
cleanEnv(process.env, validators);
}
11 changes: 8 additions & 3 deletions apps/ws/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as dotenv from 'dotenv';
import * as envalid from 'envalid';
import { json, str, port } from 'envalid';
import { cleanEnv, json, num, str, port } from 'envalid';
import { getContextPath, NovuComponentEnum } from '@novu/shared';

dotenv.config();
Expand Down Expand Up @@ -31,7 +30,7 @@ const { error } = dotenv.config({ path });

if (error && !process.env.LAMBDA_TASK_ROOT) throw error;

envalid.cleanEnv(process.env, {
cleanEnv(process.env, {
NODE_ENV: str({
choices: ['dev', 'test', 'production', 'ci', 'local'],
default: 'local',
Expand All @@ -43,6 +42,12 @@ envalid.cleanEnv(process.env, {
default: undefined,
}),
JWT_SECRET: str(),
WORKER_DEFAULT_CONCURRENCY: num({
default: undefined,
}),
WORKER_DEFAULT_LOCK_DURATION: num({
default: undefined,
}),
});

export const CONTEXT_PATH = getContextPath(NovuComponentEnum.WS);
14 changes: 8 additions & 6 deletions apps/ws/src/socket/services/web-socket.worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
const nr = require('newrelic');
import { Injectable, Logger } from '@nestjs/common';

import { INovuWorker, WebSocketsWorkerService } from '@novu/application-generic';
import {
getWebSocketWorkerOptions,
INovuWorker,
WebSocketsWorkerService,
WorkerOptions,
} from '@novu/application-generic';

import { ExternalServicesRoute, ExternalServicesRouteCommand } from '../usecases/external-services-route';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
Expand Down Expand Up @@ -61,10 +66,7 @@ export class WebSocketWorker extends WebSocketsWorkerService implements INovuWor
};
}

private getWorkerOpts() {
return {
lockDuration: 90000,
concurrency: 200,
};
private getWorkerOpts(): WorkerOptions {
return getWebSocketWorkerOptions();
}
}
1 change: 1 addition & 0 deletions packages/application-generic/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './workers';
119 changes: 119 additions & 0 deletions packages/application-generic/src/config/workers.config.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {
getInboundParseMailWorkerOptions,
getStandardWorkerOptions,
getSubscriberProcessWorkerOptions,
getWebSocketWorkerOptions,
getWorkflowWorkerOptions,
} from './workers';

describe('Workers Config', () => {
describe('Inbound Parse Mail Worker', () => {
it('should have the default values when no environment variable set', () => {
expect(getInboundParseMailWorkerOptions()).toEqual({
concurrency: 200,
lockDuration: 90000,
});
});

it('should have the values passed through the environment variables', () => {
process.env.WORKER_DEFAULT_CONCURRENCY = '100';
process.env.WORKER_DEFAULT_LOCK_DURATION = '10';

expect(getInboundParseMailWorkerOptions()).toEqual({
concurrency: 100,
lockDuration: 10,
});

process.env.WORKER_DEFAULT_CONCURRENCY = '';
process.env.WORKER_DEFAULT_LOCK_DURATION = '';
});
});

describe('Standard Worker', () => {
it('should have the default values when no environment variable set', () => {
expect(getStandardWorkerOptions()).toEqual({
concurrency: 200,
lockDuration: 90000,
});
});

it('should have the values passed through the environment variables', () => {
process.env.WORKER_DEFAULT_CONCURRENCY = '100';
process.env.WORKER_DEFAULT_LOCK_DURATION = '10';

expect(getStandardWorkerOptions()).toEqual({
concurrency: 100,
lockDuration: 10,
});

process.env.WORKER_DEFAULT_CONCURRENCY = '';
process.env.WORKER_DEFAULT_LOCK_DURATION = '';
});
});

describe('Subscriber Process Worker', () => {
it('should have the default values when no environment variable set', () => {
expect(getSubscriberProcessWorkerOptions()).toEqual({
concurrency: 200,
lockDuration: 90000,
});
});

it('should have the values passed through the environment variables', () => {
process.env.WORKER_DEFAULT_CONCURRENCY = '100';
process.env.WORKER_DEFAULT_LOCK_DURATION = '10';

expect(getSubscriberProcessWorkerOptions()).toEqual({
concurrency: 100,
lockDuration: 10,
});

process.env.WORKER_DEFAULT_CONCURRENCY = '';
process.env.WORKER_DEFAULT_LOCK_DURATION = '';
});
});

describe('Web Socket Worker', () => {
it('should have the default values when no environment variable set', () => {
expect(getWebSocketWorkerOptions()).toEqual({
concurrency: 200,
lockDuration: 90000,
});
});

it('should have the values passed through the environment variables', () => {
process.env.WORKER_DEFAULT_CONCURRENCY = '100';
process.env.WORKER_DEFAULT_LOCK_DURATION = '10';

expect(getWebSocketWorkerOptions()).toEqual({
concurrency: 100,
lockDuration: 10,
});

process.env.WORKER_DEFAULT_CONCURRENCY = '';
process.env.WORKER_DEFAULT_LOCK_DURATION = '';
});
});

describe('Workflow Worker', () => {
it('should have the default values when no environment variable set', () => {
expect(getWorkflowWorkerOptions()).toEqual({
concurrency: 200,
lockDuration: 90000,
});
});

it('should have the values passed through the environment variables', () => {
process.env.WORKER_DEFAULT_CONCURRENCY = '100';
process.env.WORKER_DEFAULT_LOCK_DURATION = '10';

expect(getWorkflowWorkerOptions()).toEqual({
concurrency: 100,
lockDuration: 10,
});

process.env.WORKER_DEFAULT_CONCURRENCY = '';
process.env.WORKER_DEFAULT_LOCK_DURATION = '';
});
});
});
Loading

0 comments on commit 0a3e6ea

Please sign in to comment.