From 0edc37f3acb2e1664d1535017c3e946b71e3c5bb Mon Sep 17 00:00:00 2001 From: Zac Clifton Date: Wed, 30 Aug 2023 16:25:31 -0400 Subject: [PATCH] feat(infra): add active and completed job metrics with dedicated queues Co-Author: Zac Clifton --- apps/worker/src/.env.production | 1 + apps/worker/src/.example.env | 1 + .../src/app/shared/utils/cron-health.ts | 19 ++ apps/worker/src/app/shared/utils/index.ts | 1 + .../active-jobs-metric.service.spec.ts | 72 +++++++ .../services/active-jobs-metric.service.ts | 157 ++++++++++++++++ .../completed-jobs-metric.service.spec.ts | 72 +++++++ .../services/completed-jobs-metric.service.ts | 151 +++++++++++++++ .../worker/src/app/workflow/services/index.ts | 3 +- .../services/job-metric.service.spec.ts | 73 -------- .../workflow/services/job-metric.service.ts | 175 ------------------ .../src/app/workflow/workflow.module.ts | 10 +- libs/shared/src/config/job-queue.ts | 3 +- .../src/custom-providers/index.ts | 4 - ...tive-jobs-metric-queue.health-indicator.ts | 38 ++++ ...eted-jobs-metric-queue.health-indicator.ts | 38 ++++ .../application-generic/src/health/index.ts | 3 +- .../job-metrics-queue.health-indicator.ts | 36 ---- .../src/modules/queues.module.ts | 11 +- .../services/bull-mq/bull-mq.service.spec.ts | 23 ++- .../src/services/bull-mq/bull-mq.service.ts | 9 +- .../active-jobs-metric-queue.service.spec.ts | 82 ++++++++ ...ts => active-jobs-metric-queue.service.ts} | 6 +- ...ompleted-jobs-metric-queue.service.spec.ts | 82 ++++++++ .../completed-jobs-metric-queue.service.ts | 17 ++ .../src/services/queues/index.ts | 6 +- .../queues/job-metrics-queue.service.spec.ts | 78 -------- ...s => active-jobs-metric-worker.service.ts} | 6 +- .../completed-jobs-metric-worker.service.ts | 14 ++ .../src/services/workers/index.ts | 6 +- 30 files changed, 806 insertions(+), 391 deletions(-) create mode 100644 apps/worker/src/app/shared/utils/cron-health.ts create mode 100644 apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts create mode 100644 apps/worker/src/app/workflow/services/active-jobs-metric.service.ts create mode 100644 apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts create mode 100644 apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts delete mode 100644 apps/worker/src/app/workflow/services/job-metric.service.spec.ts delete mode 100644 apps/worker/src/app/workflow/services/job-metric.service.ts create mode 100644 packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts create mode 100644 packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts delete mode 100644 packages/application-generic/src/health/job-metrics-queue.health-indicator.ts create mode 100644 packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts rename packages/application-generic/src/services/queues/{job-metrics-queue.service.ts => active-jobs-metric-queue.service.ts} (62%) create mode 100644 packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts create mode 100644 packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts delete mode 100644 packages/application-generic/src/services/queues/job-metrics-queue.service.spec.ts rename packages/application-generic/src/services/workers/{job-metrics-worker.service.ts => active-jobs-metric-worker.service.ts} (60%) create mode 100644 packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts diff --git a/apps/worker/src/.env.production b/apps/worker/src/.env.production index 1de6a00b0775..9d238b9613e6 100644 --- a/apps/worker/src/.env.production +++ b/apps/worker/src/.env.production @@ -1,5 +1,6 @@ NODE_ENV=production PORT=3004 +FLEET_NAME="default" # STORE_ENCRYPTION_KEY="" # Novu diff --git a/apps/worker/src/.example.env b/apps/worker/src/.example.env index 10ef63f35d95..3bae76e85a2f 100644 --- a/apps/worker/src/.example.env +++ b/apps/worker/src/.example.env @@ -1,5 +1,6 @@ NODE_ENV=local PORT=3004 +FLEET_NAME="default" STORE_ENCRYPTION_KEY="" # Novu diff --git a/apps/worker/src/app/shared/utils/cron-health.ts b/apps/worker/src/app/shared/utils/cron-health.ts new file mode 100644 index 000000000000..2e7bb4dd6cb7 --- /dev/null +++ b/apps/worker/src/app/shared/utils/cron-health.ts @@ -0,0 +1,19 @@ +import { Logger } from '@nestjs/common'; + +const url = process.env.CRON_CHECKING_URL + ? process.env.CRON_CHECKING_URL + : 'https://uptime.betterstack.com/api/v1/heartbeat/'; +const LOG_CONTEXT = 'cronHealth'; +export async function checkingForCronJob(cronId?: string) { + if (process.env.NOVU_MANAGED_SERVICE && process.env.NODE_ENV === 'production' && cronId && url) { + Logger.verbose(`Calling health endpoint for ${cronId}`); + + const response = await fetch(url + cronId); + + if (response.status !== 200) { + Logger.error(`Failed calling better Uptime: ${response.status}`, LOG_CONTEXT); + } else { + Logger.verbose(`Response from better Uptime: ${response.status}`, LOG_CONTEXT); + } + } +} diff --git a/apps/worker/src/app/shared/utils/index.ts b/apps/worker/src/app/shared/utils/index.ts index e283f25c44c3..42d722103bcc 100644 --- a/apps/worker/src/app/shared/utils/index.ts +++ b/apps/worker/src/app/shared/utils/index.ts @@ -1,3 +1,4 @@ export * from './constants'; export * from './exceptions'; export * from './hmac'; +export * from './cron-health'; diff --git a/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts b/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts new file mode 100644 index 000000000000..861bb2fd737e --- /dev/null +++ b/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts @@ -0,0 +1,72 @@ +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; + +import { + ActiveJobsMetricQueueService, + ActiveJobsMetricWorkerService, + StandardQueueService, + WebSocketsQueueService, + WorkflowQueueService, +} from '@novu/application-generic'; + +import { ActiveJobsMetricService } from './active-jobs-metric.service'; + +import { WorkflowModule } from '../workflow.module'; + +let activeJobsMetricService: ActiveJobsMetricService; + +describe('Active Jobs Metric Service', () => { + before(async () => { + process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + + const moduleRef = await Test.createTestingModule({ + imports: [WorkflowModule], + }).compile(); + + const standardService = moduleRef.get(StandardQueueService); + const webSocketsQueueService = moduleRef.get(WebSocketsQueueService); + const workflowQueueService = moduleRef.get(WorkflowQueueService); + + const activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); + const activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService(); + + activeJobsMetricService = new ActiveJobsMetricService( + [standardService, webSocketsQueueService, workflowQueueService], + activeJobsMetricQueueService, + activeJobsMetricWorkerService + ); + }); + + after(async () => { + await activeJobsMetricService.activeJobsMetricQueueService.queue.drain(); + await activeJobsMetricService.gracefulShutdown(); + }); + + it('should be initialised properly', async () => { + expect(activeJobsMetricService).to.be.ok; + expect(activeJobsMetricService).to.have.all.keys( + 'activeJobsMetricQueueService', + 'activeJobsMetricWorkerService', + 'tokenList' + ); + expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({ + queueIsPaused: false, + queueName: 'metric-active-jobs', + workerName: undefined, + workerIsPaused: undefined, + workerIsRunning: undefined, + }); + expect(await activeJobsMetricService.activeJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({ + queueIsPaused: undefined, + queueName: undefined, + workerName: 'metric-active-jobs', + workerIsPaused: false, + workerIsRunning: true, + }); + expect(activeJobsMetricService.activeJobsMetricWorkerService.worker.opts).to.deep.include({ + concurrency: 1, + lockDuration: 900, + }); + }); +}); diff --git a/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts new file mode 100644 index 000000000000..a1d20be7405d --- /dev/null +++ b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts @@ -0,0 +1,157 @@ +import { + ActiveJobsMetricQueueService, + ActiveJobsMetricWorkerService, + QueueBaseService, + WorkerOptions, +} from '@novu/application-generic'; +import * as process from 'process'; +import { JobTopicNameEnum } from '@novu/shared'; + +import { Inject, Injectable, Logger } from '@nestjs/common'; + +import { checkingForCronJob } from '../../shared/utils'; + +const LOG_CONTEXT = 'ActiveJobMetricService'; +const METRIC_JOB_ID = 'metric-job'; + +@Injectable() +export class ActiveJobsMetricService { + constructor( + @Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], + public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService, + public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService + ) { + if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { + this.activeJobsMetricQueueService.createQueue(); + this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); + + this.activeJobsMetricWorkerService.worker.on('completed', async (job) => { + await checkingForCronJob(process.env.ACTIVE_CRON_ID); + Logger.verbose({ jobId: job.id }, 'Metric Completed Job', LOG_CONTEXT); + }); + + this.activeJobsMetricWorkerService.worker.on('failed', async (job, error) => { + Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error); + }); + + this.addToQueueIfMetricJobExists(); + } + } + + private addToQueueIfMetricJobExists(): void { + Promise.resolve( + this.activeJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => { + let exists = false; + for (const jobElement of job) { + if (jobElement.id === METRIC_JOB_ID) { + exists = true; + } + } + + return exists; + }) + ) + .then(async (exists: boolean): Promise => { + Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT); + + if (!exists) { + Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); + + return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { + jobId: METRIC_JOB_ID, + repeatJobKey: METRIC_JOB_ID, + repeat: { + immediately: true, + pattern: '* * * * * *', + }, + removeOnFail: true, + removeOnComplete: true, + attempts: 1, + }); + } + + return undefined; + }) + .catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error)); + } + + private getWorkerOptions(): WorkerOptions { + return { + lockDuration: 900, + concurrency: 1, + settings: {}, + }; + } + + private getWorkerProcessor() { + return async () => { + return await new Promise(async (resolve, reject): Promise => { + Logger.verbose('metric job started', LOG_CONTEXT); + const deploymentName = process.env.FLEET_NAME ?? 'default'; + + try { + for (const queueService of this.tokenList) { + const waitCount = await queueService.bullMqService.queue.getWaitingCount(); + const delayedCount = await queueService.bullMqService.queue.getDelayedCount(); + const activeCount = await queueService.bullMqService.queue.getActiveCount(); + + if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { + Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length); + Logger.log('Recording active, waiting, and delayed metrics'); + + const nr = require('newrelic'); + nr.recordMetric( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, + waitCount + ); + nr.recordMetric( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`, + delayedCount + ); + nr.recordMetric( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, + activeCount + ); + + Logger.verbose(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount); + Logger.verbose( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`, + delayedCount + ); + Logger.verbose( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, + activeCount + ); + } else { + Logger.debug(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount); + Logger.debug( + `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`, + delayedCount + ); + Logger.debug(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, activeCount); + } + } + + return resolve(); + } catch (error) { + Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT); + + return reject(error); + } + }); + }; + } + + public async gracefulShutdown(): Promise { + Logger.log('Shutting the Active Jobs Metric service down', LOG_CONTEXT); + + await this.activeJobsMetricQueueService.gracefulShutdown(); + await this.activeJobsMetricWorkerService.gracefulShutdown(); + + Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT); + } + + async onModuleDestroy(): Promise { + await this.gracefulShutdown(); + } +} diff --git a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts new file mode 100644 index 000000000000..7f2702152d97 --- /dev/null +++ b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts @@ -0,0 +1,72 @@ +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; + +import { + CompletedJobsMetricQueueService, + CompletedJobsMetricWorkerService, + StandardQueueService, + WebSocketsQueueService, + WorkflowQueueService, +} from '@novu/application-generic'; + +import { CompletedJobsMetricService } from './completed-jobs-metric.service'; + +import { WorkflowModule } from '../workflow.module'; + +let completedJobsMetricService: CompletedJobsMetricService; + +describe('Completed Jobs Metric Service', () => { + before(async () => { + process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + + const moduleRef = await Test.createTestingModule({ + imports: [WorkflowModule], + }).compile(); + + const standardService = moduleRef.get(StandardQueueService); + const webSocketsQueueService = moduleRef.get(WebSocketsQueueService); + const workflowQueueService = moduleRef.get(WorkflowQueueService); + + const completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); + const completedJobsMetricWorkerService = new CompletedJobsMetricWorkerService(); + + completedJobsMetricService = new CompletedJobsMetricService( + [standardService, webSocketsQueueService, workflowQueueService], + completedJobsMetricQueueService, + completedJobsMetricWorkerService + ); + }); + + after(async () => { + await completedJobsMetricService.completedJobsMetricQueueService.queue.drain(); + await completedJobsMetricService.gracefulShutdown(); + }); + + it('should be initialised properly', async () => { + expect(completedJobsMetricService).to.be.ok; + expect(completedJobsMetricService).to.have.all.keys( + 'completedJobsMetricQueueService', + 'completedJobsMetricWorkerService', + 'tokenList' + ); + expect(await completedJobsMetricService.completedJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({ + queueIsPaused: false, + queueName: 'metric-completed-jobs', + workerName: undefined, + workerIsPaused: undefined, + workerIsRunning: undefined, + }); + expect(await completedJobsMetricService.completedJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({ + queueIsPaused: undefined, + queueName: undefined, + workerName: 'metric-completed-jobs', + workerIsPaused: false, + workerIsRunning: true, + }); + expect(completedJobsMetricService.completedJobsMetricWorkerService.worker.opts).to.deep.include({ + concurrency: 1, + lockDuration: 900, + }); + }); +}); diff --git a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts new file mode 100644 index 000000000000..fb9245c67c7f --- /dev/null +++ b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts @@ -0,0 +1,151 @@ +import { + CompletedJobsMetricQueueService, + CompletedJobsMetricWorkerService, + QueueBaseService, + WorkerOptions, +} from '@novu/application-generic'; +import * as process from 'process'; +import { JobTopicNameEnum } from '@novu/shared'; + +import { Inject, Injectable, Logger } from '@nestjs/common'; + +import { checkingForCronJob } from '../../shared/utils'; + +const LOG_CONTEXT = 'CompletedJobMetricService'; +const METRIC_JOB_ID = 'metric-job'; + +@Injectable() +export class CompletedJobsMetricService { + constructor( + @Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], + public readonly completedJobsMetricQueueService: CompletedJobsMetricQueueService, + public readonly completedJobsMetricWorkerService: CompletedJobsMetricWorkerService + ) { + if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { + this.completedJobsMetricQueueService.createQueue(); + this.completedJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); + + this.completedJobsMetricWorkerService.worker.on('completed', async (job) => { + await checkingForCronJob(process.env.ACTIVE_CRON_ID); + Logger.verbose('Metric Completed Job', job.id, LOG_CONTEXT); + }); + + this.completedJobsMetricWorkerService.worker.on('failed', async (job, error) => { + Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error); + }); + + this.addToQueueIfMetricJobExists(); + } + } + + private addToQueueIfMetricJobExists(): void { + Promise.resolve( + this.completedJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => { + let exists = false; + for (const jobElement of job) { + if (jobElement.id === METRIC_JOB_ID) { + exists = true; + } + } + + return exists; + }) + ) + .then(async (exists: boolean): Promise => { + Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT); + + if (!exists) { + Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); + + return await this.completedJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { + jobId: METRIC_JOB_ID, + repeatJobKey: METRIC_JOB_ID, + repeat: { + immediately: true, + pattern: '* * * * * *', + }, + removeOnFail: true, + removeOnComplete: true, + attempts: 1, + }); + } + + return undefined; + }) + .catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error)); + } + + private getWorkerOptions(): WorkerOptions { + return { + lockDuration: 900, + concurrency: 1, + settings: {}, + }; + } + + private getWorkerProcessor() { + return async () => { + return await new Promise(async (resolve, reject): Promise => { + Logger.verbose('metric job started', LOG_CONTEXT); + const deploymentName = process.env.FLEET_NAME ?? 'default'; + + try { + for (const queueService of this.tokenList) { + const metrics = await queueService.bullMqService.getQueueMetrics(0, 1); + const completeNumber = metrics.completed.count; + const failNumber = metrics.failed.count; + + if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { + Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length); + Logger.log('Recording active, waiting, and delayed metrics'); + + const nr = require('newrelic'); + nr.recordMetric( + `CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/completed`, + completeNumber + ); + nr.recordMetric( + `CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/failed`, + failNumber + ); + + Logger.verbose( + `CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/completed`, + completeNumber + ); + Logger.verbose( + `CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/failed`, + failNumber + ); + } else { + Logger.debug( + `CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/completed`, + completeNumber + ); + Logger.debug(`CompletedJobsMetricQueueService${deploymentName}/${queueService.topic}/failed`, failNumber); + } + } + + return resolve(); + } catch (error) { + Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT); + + return reject(error); + } + }); + }; + } + + public async gracefulShutdown(): Promise { + Logger.log('Shutting the Completed Jobs Metric service down', LOG_CONTEXT); + + await this.completedJobsMetricQueueService.gracefulShutdown(); + await this.completedJobsMetricWorkerService.gracefulShutdown(); + + Logger.log('Shutting down the Completed Jobs Metric service has finished', LOG_CONTEXT); + } + + async onModuleDestroy(): Promise { + await this.gracefulShutdown(); + } +} diff --git a/apps/worker/src/app/workflow/services/index.ts b/apps/worker/src/app/workflow/services/index.ts index 980e6415d875..228b77b38c16 100644 --- a/apps/worker/src/app/workflow/services/index.ts +++ b/apps/worker/src/app/workflow/services/index.ts @@ -1,4 +1,5 @@ -export * from './job-metric.service'; +export * from './active-jobs-metric.service'; +export * from './completed-jobs-metric.service'; export * from './standard.worker'; export * from './workflow.worker'; export * from './old-instance-workflow.worker'; diff --git a/apps/worker/src/app/workflow/services/job-metric.service.spec.ts b/apps/worker/src/app/workflow/services/job-metric.service.spec.ts deleted file mode 100644 index d4652f4614e0..000000000000 --- a/apps/worker/src/app/workflow/services/job-metric.service.spec.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { Test } from '@nestjs/testing'; -import { expect } from 'chai'; - -import { - JobMetricsQueueService, - JobMetricsWorkerService, - StandardQueueService, - WebSocketsQueueService, - WorkflowQueueService, -} from '@novu/application-generic'; - -import { JobMetricService } from './job-metric.service'; - -import { WorkflowModule } from '../workflow.module'; - -let jobMetricService: JobMetricService; - -describe('Job Metric Service', () => { - before(async () => { - process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - - const moduleRef = await Test.createTestingModule({ - imports: [WorkflowModule], - }).compile(); - - const standardService = moduleRef.get(StandardQueueService); - const webSocketsQueueService = moduleRef.get(WebSocketsQueueService); - const workflowQueueService = moduleRef.get(WorkflowQueueService); - - const jobMetricQueueService = new JobMetricsQueueService(); - const jobMetricWorkerService = new JobMetricsWorkerService(); - - jobMetricService = new JobMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - jobMetricQueueService, - jobMetricWorkerService - ); - }); - - after(async () => { - await jobMetricService.jobMetricsQueueService.queue.drain(); - await jobMetricService.gracefulShutdown(); - }); - - it('should be initialised properly', async () => { - expect(jobMetricService).to.be.ok; - expect(jobMetricService).to.have.all.keys('jobMetricsQueueService', 'jobMetricsWorkerService', 'tokenList'); - expect(await jobMetricService.jobMetricsQueueService.bullMqService.getStatus()).to.deep.equal({ - queueIsPaused: false, - queueName: 'metric', - workerName: undefined, - workerIsPaused: undefined, - workerIsRunning: undefined, - }); - expect(await jobMetricService.jobMetricsWorkerService.bullMqService.getStatus()).to.deep.equal({ - queueIsPaused: undefined, - queueName: undefined, - workerName: 'metric', - workerIsPaused: false, - workerIsRunning: true, - }); - expect(jobMetricService.jobMetricsWorkerService.worker.opts).to.deep.include({ - concurrency: 1, - lockDuration: 500, - }); - - expect(jobMetricService.jobMetricsWorkerService.worker.opts).to.deep.include({ - concurrency: 1, - lockDuration: 500, - }); - }); -}); diff --git a/apps/worker/src/app/workflow/services/job-metric.service.ts b/apps/worker/src/app/workflow/services/job-metric.service.ts deleted file mode 100644 index 0558e64fda0f..000000000000 --- a/apps/worker/src/app/workflow/services/job-metric.service.ts +++ /dev/null @@ -1,175 +0,0 @@ -const nr = require('newrelic'); -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { - JobMetricsQueueService, - JobMetricsWorkerService, - QueueBaseService, - WorkerOptions, -} from '@novu/application-generic'; -import { JobTopicNameEnum } from '@novu/shared'; -import { min, max, sum, mean } from 'simple-statistics'; - -interface IMetric { - count: number; - total: number; - min: number; - max: number; - sumOfSquares: number; -} - -function sumOfSquares(values: number[]): number { - const meanVal = mean(values); - - return sum(values.map((value) => (value - meanVal) ** 2)); -} - -const LOG_CONTEXT = 'JobMetricService'; -const METRIC_JOB_ID = 'metric-job'; - -/** - * TODO: This service should be split in 2 but have no mental capacity right now - * to think how to do. - */ -@Injectable() -export class JobMetricService { - constructor( - @Inject('BULL_MQ_TOKEN_LIST') - private tokenList: QueueBaseService[], - public readonly jobMetricsQueueService: JobMetricsQueueService, - public readonly jobMetricsWorkerService: JobMetricsWorkerService - ) { - this.jobMetricsQueueService.createQueue(); - this.jobMetricsWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); - - this.jobMetricsWorkerService.worker.on('completed', async (job, error) => { - await this.jobHasCompleted(job); - }); - - this.jobMetricsWorkerService.worker.on('failed', async (job, error) => { - await this.jobHasFailed(job, error); - }); - - this.addToQueueIfMetricJobExists(); - } - - private addToQueueIfMetricJobExists(): void { - Promise.resolve( - this.jobMetricsQueueService.queue.getRepeatableJobs().then((job): boolean => { - let exists = false; - for (const jobElement of job) { - if (jobElement.id === METRIC_JOB_ID) { - exists = true; - } - } - - return exists; - }) - ) - .then(async (exists: boolean): Promise => { - Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT); - - if (!exists) { - Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); - - return await this.jobMetricsQueueService.add(METRIC_JOB_ID, undefined, '', { - jobId: METRIC_JOB_ID, - repeatJobKey: METRIC_JOB_ID, - repeat: { - immediately: true, - pattern: '* * * * * *', - }, - removeOnFail: true, - removeOnComplete: true, - attempts: 1, - }); - } - - return undefined; - }) - .catch((error) => Logger.error(error, 'Metric Job Exists function errored', LOG_CONTEXT)); - } - - private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 500, - concurrency: 1, - settings: {}, - }; - } - - private getWorkerProcessor() { - return async () => { - return await new Promise(async (resolve, reject): Promise => { - Logger.verbose('metric job started', LOG_CONTEXT); - - try { - for (const queueService of this.tokenList) { - const metrics = await queueService.bullMqService.getQueueMetrics(); - - const completeNumber = metrics.completed.count; - const failNumber = metrics.failed.count; - - const successMetric: IMetric = { - count: metrics.completed.count, - total: completeNumber == 0 ? 0 : sum(metrics.completed.data), - min: completeNumber == 0 ? 0 : min(metrics.completed.data), - max: completeNumber == 0 ? 0 : max(metrics.completed.data), - sumOfSquares: completeNumber == 0 ? 0 : sumOfSquares(metrics.completed.data), - }; - - const failMetric: IMetric = { - count: metrics.failed.count, - total: failNumber == 0 ? 0 : sum(metrics.failed.data), - min: failNumber == 0 ? 0 : min(metrics.failed.data), - max: failNumber == 0 ? 0 : max(metrics.failed.data), - sumOfSquares: failNumber == 0 ? 0 : sumOfSquares(metrics.failed.data), - }; - - const waitCount = await queueService.bullMqService.queue.getWaitingCount(); - const delayedCount = await queueService.bullMqService.queue.getDelayedCount(); - const activeCount = await queueService.bullMqService.queue.getActiveCount(); - - if (process.env.NOVU_MANAGED_SERVICE === 'true') { - nr.recordMetric(`MetricQueueService/${queueService.topic}/completed`, successMetric); - nr.recordMetric(`MetricQueueService/${queueService.topic}/failed`, failMetric); - nr.recordMetric(`MetricQueueService/${queueService.topic}/waiting`, waitCount); - nr.recordMetric(`MetricQueueService/${queueService.topic}/delayed`, delayedCount); - nr.recordMetric(`MetricQueueService/${queueService.topic}/active`, activeCount); - } else { - Logger.debug(`MetricQueueService/${queueService.topic}/completed`, JSON.stringify(successMetric)); - Logger.debug(`MetricQueueService/${queueService.topic}/failed`, JSON.stringify(failMetric)); - Logger.debug(`MetricQueueService/${queueService.topic}/waiting`, waitCount); - Logger.debug(`MetricQueueService/${queueService.topic}/delayed`, delayedCount); - Logger.debug(`MetricQueueService/${queueService.topic}/active`, activeCount); - } - } - - return resolve(); - } catch (error) { - return reject(error); - } - }); - }; - } - - private async jobHasCompleted(job): Promise { - Logger.verbose('Metric job Completed', job.id, LOG_CONTEXT); - } - - private async jobHasFailed(job, error): Promise { - Logger.verbose('Metric job failed', error, LOG_CONTEXT); - } - - public async gracefulShutdown(): Promise { - Logger.log('Shutting the Queue service down', LOG_CONTEXT); - - await this.jobMetricsQueueService.gracefulShutdown(); - await this.jobMetricsWorkerService.gracefulShutdown(); - - Logger.log('Shutting down the Queue service has finished', LOG_CONTEXT); - } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } -} diff --git a/apps/worker/src/app/workflow/workflow.module.ts b/apps/worker/src/app/workflow/workflow.module.ts index 9f9f086d388c..de43d788e3a3 100644 --- a/apps/worker/src/app/workflow/workflow.module.ts +++ b/apps/worker/src/app/workflow/workflow.module.ts @@ -29,7 +29,13 @@ import { } from '@novu/application-generic'; import { JobRepository, MessageRepository, OrganizationRepository, SubscriberRepository } from '@novu/dal'; -import { JobMetricService, StandardWorker, WorkflowWorker, OldInstanceWorkflowWorker } from './services'; +import { + ActiveJobsMetricService, + CompletedJobsMetricService, + StandardWorker, + WorkflowWorker, + OldInstanceWorkflowWorker, +} from './services'; import { MessageMatcher, SendMessage, @@ -97,8 +103,10 @@ const USE_CASES = [ ]; const PROVIDERS: Provider[] = [ + ActiveJobsMetricService, BullMqService, bullMqTokenList, + CompletedJobsMetricService, StandardWorker, WorkflowWorker, OldInstanceBullMqService, diff --git a/libs/shared/src/config/job-queue.ts b/libs/shared/src/config/job-queue.ts index 50ec627998fd..e679470ef64f 100644 --- a/libs/shared/src/config/job-queue.ts +++ b/libs/shared/src/config/job-queue.ts @@ -1,6 +1,7 @@ export enum JobTopicNameEnum { + ACTIVE_JOBS_METRIC = 'metric-active-jobs', + COMPLETED_JOBS_METRIC = 'metric-completed-jobs', INBOUND_PARSE_MAIL = 'inbound-parse-mail', - METRICS = 'metric', STANDARD = 'standard', WEB_SOCKETS = 'ws_socket_queue', WORKFLOW = 'trigger-handler', diff --git a/packages/application-generic/src/custom-providers/index.ts b/packages/application-generic/src/custom-providers/index.ts index 0628feca0216..2fa688c3186e 100644 --- a/packages/application-generic/src/custom-providers/index.ts +++ b/packages/application-generic/src/custom-providers/index.ts @@ -1,7 +1,3 @@ -import { - StandardQueueServiceHealthIndicator, - WorkflowQueueServiceHealthIndicator, -} from '../health'; import { AnalyticsService, BullMqService, diff --git a/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts new file mode 100644 index 000000000000..00882815daca --- /dev/null +++ b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts @@ -0,0 +1,38 @@ +import { + HealthCheckError, + HealthIndicator, + HealthIndicatorResult, +} from '@nestjs/terminus'; +import { Injectable, Logger } from '@nestjs/common'; + +import { ActiveJobsMetricQueueService } from '../services'; + +const LOG_CONTEXT = 'ActiveJobsMetricQueueServiceHealthIndicator'; + +@Injectable() +export class ActiveJobsMetricQueueServiceHealthIndicator extends HealthIndicator { + private INDICATOR_KEY = 'activeJobsMetricQueue'; + + constructor( + private activeJobsMetricQueueService: ActiveJobsMetricQueueService + ) { + super(); + } + + async isHealthy(): Promise { + const isReady = this.activeJobsMetricQueueService.isReady(); + + if (isReady) { + Logger.verbose('ActiveJobsMetricQueueService is ready', LOG_CONTEXT); + + return this.getStatus(this.INDICATOR_KEY, true); + } + + Logger.verbose('ActiveJobsMetricQueueService is not ready', LOG_CONTEXT); + + throw new HealthCheckError( + 'ActiveJobsMetric Queue Health', + this.getStatus(this.INDICATOR_KEY, false) + ); + } +} diff --git a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts new file mode 100644 index 000000000000..9227aa2078a5 --- /dev/null +++ b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts @@ -0,0 +1,38 @@ +import { + HealthCheckError, + HealthIndicator, + HealthIndicatorResult, +} from '@nestjs/terminus'; +import { Injectable, Logger } from '@nestjs/common'; + +import { CompletedJobsMetricQueueService } from '../services'; + +const LOG_CONTEXT = 'CompletedJobsMetricQueueServiceHealthIndicator'; + +@Injectable() +export class CompletedJobsMetricQueueServiceHealthIndicator extends HealthIndicator { + private INDICATOR_KEY = 'completedJobsMetricQueue'; + + constructor( + private completedJobsMetricQueueService: CompletedJobsMetricQueueService + ) { + super(); + } + + async isHealthy(): Promise { + const isReady = this.completedJobsMetricQueueService.isReady(); + + if (isReady) { + Logger.verbose('CompletedJobsMetricQueueService is ready', LOG_CONTEXT); + + return this.getStatus(this.INDICATOR_KEY, true); + } + + Logger.verbose('CompletedJobsMetricQueueService is not ready', LOG_CONTEXT); + + throw new HealthCheckError( + 'CompletedJobsMetric Queue Health', + this.getStatus(this.INDICATOR_KEY, false) + ); + } +} diff --git a/packages/application-generic/src/health/index.ts b/packages/application-generic/src/health/index.ts index d10070d78323..11bd8ddc4747 100644 --- a/packages/application-generic/src/health/index.ts +++ b/packages/application-generic/src/health/index.ts @@ -1,7 +1,8 @@ +export * from './active-jobs-metric-queue.health-indicator'; export * from './cache.health-indicator'; +export * from './completed-jobs-metric-queue.health-indicator'; export * from './dal.health-indicator'; export * from './inbound-parse-queue.health-indicator'; -export * from './job-metrics-queue.health-indicator'; export * from './standard-queue.health-indicator'; export * from './web-sockets-queue.health-indicator'; export * from './workflow-queue.health-indicator'; diff --git a/packages/application-generic/src/health/job-metrics-queue.health-indicator.ts b/packages/application-generic/src/health/job-metrics-queue.health-indicator.ts deleted file mode 100644 index 2c67666be591..000000000000 --- a/packages/application-generic/src/health/job-metrics-queue.health-indicator.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; - -import { JobMetricsQueueService } from '../services'; - -const LOG_CONTEXT = 'JobMetricsQueueServiceHealthIndicator'; - -@Injectable() -export class JobMetricsQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'jobMetricsQueue'; - - constructor(private jobMetricsQueueService: JobMetricsQueueService) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.jobMetricsQueueService.isReady(); - - if (isReady) { - Logger.verbose('JobMetricsQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('JobMetricsQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'JobMetrics Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } -} diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 4b8cd11fdc57..4757ce4fb197 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -2,6 +2,8 @@ import { Module, Provider } from '@nestjs/common'; import { bullMqTokenList } from '../custom-providers'; import { + ActiveJobsMetricQueueServiceHealthIndicator, + CompletedJobsMetricQueueServiceHealthIndicator, InboundParseQueueServiceHealthIndicator, StandardQueueServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator, @@ -13,14 +15,17 @@ import { ReadinessService, } from '../services'; import { + ActiveJobsMetricQueueService, + CompletedJobsMetricQueueService, InboundParseQueue, StandardQueueService, WebSocketsQueueService, WorkflowQueueService, } from '../services/queues'; import { + ActiveJobsMetricWorkerService, + CompletedJobsMetricWorkerService, InboundParseWorker, - JobMetricsWorkerService, StandardWorkerService, WebSocketsWorkerService, WorkflowWorkerService, @@ -28,8 +33,12 @@ import { } from '../services/workers'; const PROVIDERS: Provider[] = [ + ActiveJobsMetricQueueService, + ActiveJobsMetricWorkerService, bullMqTokenList, BullMqService, + CompletedJobsMetricQueueService, + CompletedJobsMetricWorkerService, InboundParseQueue, InboundParseWorker, InboundParseQueueServiceHealthIndicator, diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts index 7cfabb6d8e3d..3e2f5d38dbcf 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts @@ -36,7 +36,7 @@ describe('BullMQ Service', () => { }); it('should create a queue properly with the default configuration', async () => { - const queueName = JobTopicNameEnum.METRICS; + const queueName = JobTopicNameEnum.ACTIVE_JOBS_METRIC; const queueOptions: QueueBaseOptions = {}; await bullMqService.createQueue(queueName, queueOptions); @@ -52,7 +52,7 @@ describe('BullMQ Service', () => { }); it('should create a worker properly with the default configuration', async () => { - const workerName = JobTopicNameEnum.METRICS; + const workerName = JobTopicNameEnum.ACTIVE_JOBS_METRIC; await bullMqService.createWorker(workerName, undefined, {}); expect(bullMqService.worker.name).toEqual(workerName); @@ -66,8 +66,11 @@ describe('BullMQ Service', () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; bullMqService = new BullMqService(); - const queue = bullMqService.createQueue(JobTopicNameEnum.METRICS, {}); - expect(queue.opts.prefix).toEqual('{metric}'); + const queue = bullMqService.createQueue( + JobTopicNameEnum.ACTIVE_JOBS_METRIC, + {} + ); + expect(queue.opts.prefix).toEqual('{metric-active-jobs}'); }); it('should not use prefix if a Redis provider is used and not in Cluster mode', async () => { @@ -75,7 +78,10 @@ describe('BullMQ Service', () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; bullMqService = new BullMqService(); - const queue = bullMqService.createQueue(JobTopicNameEnum.METRICS, {}); + const queue = bullMqService.createQueue( + JobTopicNameEnum.ACTIVE_JOBS_METRIC, + {} + ); expect(queue.opts.prefix).toEqual('bull'); }); @@ -84,8 +90,11 @@ describe('BullMQ Service', () => { process.env.MEMORY_DB_CLUSTER_SERVICE_HOST = ''; bullMqService = new BullMqService(); - const queue = bullMqService.createQueue(JobTopicNameEnum.METRICS, {}); - expect(queue.opts.prefix).toEqual('{metric}'); + const queue = bullMqService.createQueue( + JobTopicNameEnum.ACTIVE_JOBS_METRIC, + {} + ); + expect(queue.opts.prefix).toEqual('{metric-active-jobs}'); }); }); }); diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts index a748278f4781..e4b7c5dc27b2 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts @@ -106,10 +106,13 @@ export class BullMqService { return BullMqService.pro && BullMqService.haveProInstalled(); } - public async getQueueMetrics(): Promise { + public async getQueueMetrics( + start?: number, + end?: number + ): Promise { return { - completed: await this._queue.getMetrics('completed'), - failed: await this._queue.getMetrics('failed'), + completed: await this._queue.getMetrics('completed', start, end), + failed: await this._queue.getMetrics('failed', start, end), }; } diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts new file mode 100644 index 000000000000..c2de0de08ca5 --- /dev/null +++ b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts @@ -0,0 +1,82 @@ +import { Test } from '@nestjs/testing'; + +import { ActiveJobsMetricQueueService } from './active-jobs-metric-queue.service'; + +let activeJobsMetricQueueService: ActiveJobsMetricQueueService; + +describe('Job metrics Queue service', () => { + describe('General', () => { + beforeAll(async () => { + activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); + await activeJobsMetricQueueService.queue.drain(); + }); + + beforeEach(async () => { + await activeJobsMetricQueueService.queue.drain(); + }); + + afterEach(async () => { + await activeJobsMetricQueueService.queue.drain(); + }); + + afterAll(async () => { + await activeJobsMetricQueueService.gracefulShutdown(); + }); + + it('should be initialised properly', async () => { + expect(activeJobsMetricQueueService).toBeDefined(); + expect(Object.keys(activeJobsMetricQueueService)).toEqual( + expect.arrayContaining([ + 'topic', + 'DEFAULT_ATTEMPTS', + 'instance', + 'queue', + ]) + ); + expect(activeJobsMetricQueueService.DEFAULT_ATTEMPTS).toEqual(3); + expect(activeJobsMetricQueueService.topic).toEqual('metric-active-jobs'); + expect( + await activeJobsMetricQueueService.bullMqService.getStatus() + ).toEqual({ + queueIsPaused: false, + queueName: 'metric-active-jobs', + workerName: undefined, + workerIsPaused: undefined, + workerIsRunning: undefined, + }); + expect(await activeJobsMetricQueueService.isPaused()).toEqual(false); + expect(activeJobsMetricQueueService.queue).toMatchObject( + expect.objectContaining({ + _events: {}, + _eventsCount: 0, + _maxListeners: undefined, + name: 'metric-active-jobs', + jobsOpts: { + removeOnComplete: true, + }, + }) + ); + expect(activeJobsMetricQueueService.queue.opts.prefix).toEqual('bull'); + }); + }); + + describe('Cluster mode', () => { + beforeAll(async () => { + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; + + activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); + await activeJobsMetricQueueService.queue.obliterate(); + }); + + afterAll(async () => { + await activeJobsMetricQueueService.gracefulShutdown(); + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + }); + + it('should have prefix in cluster mode', async () => { + expect(activeJobsMetricQueueService.queue.opts.prefix).toEqual( + '{metric-active-jobs}' + ); + }); + }); +}); diff --git a/packages/application-generic/src/services/queues/job-metrics-queue.service.ts b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts similarity index 62% rename from packages/application-generic/src/services/queues/job-metrics-queue.service.ts rename to packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts index fade7682eb26..740cd3448db4 100644 --- a/packages/application-generic/src/services/queues/job-metrics-queue.service.ts +++ b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts @@ -3,12 +3,12 @@ import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './index'; -const LOG_CONTEXT = 'JobMetricsQueueService'; +const LOG_CONTEXT = 'ActiveJobsMetricQueueService'; @Injectable() -export class JobMetricsQueueService extends QueueBaseService { +export class ActiveJobsMetricQueueService extends QueueBaseService { constructor() { - super(JobTopicNameEnum.METRICS); + super(JobTopicNameEnum.ACTIVE_JOBS_METRIC); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts new file mode 100644 index 000000000000..c9b18528688d --- /dev/null +++ b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts @@ -0,0 +1,82 @@ +import { Test } from '@nestjs/testing'; + +import { CompletedJobsMetricQueueService } from './completed-jobs-metric-queue.service'; + +let completedJobsMetricQueueService: CompletedJobsMetricQueueService; + +describe('Job metrics Queue service', () => { + describe('General', () => { + beforeAll(async () => { + completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); + await completedJobsMetricQueueService.queue.drain(); + }); + + beforeEach(async () => { + await completedJobsMetricQueueService.queue.drain(); + }); + + afterEach(async () => { + await completedJobsMetricQueueService.queue.drain(); + }); + + afterAll(async () => { + await completedJobsMetricQueueService.gracefulShutdown(); + }); + + it('should be initialised properly', async () => { + expect(completedJobsMetricQueueService).toBeDefined(); + expect(Object.keys(completedJobsMetricQueueService)).toEqual( + expect.arrayContaining([ + 'topic', + 'DEFAULT_ATTEMPTS', + 'instance', + 'queue', + ]) + ); + expect(completedJobsMetricQueueService.DEFAULT_ATTEMPTS).toEqual(3); + expect(completedJobsMetricQueueService.topic).toEqual('metric-completed-jobs'); + expect( + await completedJobsMetricQueueService.bullMqService.getStatus() + ).toEqual({ + queueIsPaused: false, + queueName: 'metric-completed-jobs', + workerName: undefined, + workerIsPaused: undefined, + workerIsRunning: undefined, + }); + expect(await completedJobsMetricQueueService.isPaused()).toEqual(false); + expect(completedJobsMetricQueueService.queue).toMatchObject( + expect.objectContaining({ + _events: {}, + _eventsCount: 0, + _maxListeners: undefined, + name: 'metric-completed-jobs', + jobsOpts: { + removeOnComplete: true, + }, + }) + ); + expect(completedJobsMetricQueueService.queue.opts.prefix).toEqual('bull'); + }); + }); + + describe('Cluster mode', () => { + beforeAll(async () => { + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; + + completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); + await completedJobsMetricQueueService.queue.obliterate(); + }); + + afterAll(async () => { + await completedJobsMetricQueueService.gracefulShutdown(); + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + }); + + it('should have prefix in cluster mode', async () => { + expect(completedJobsMetricQueueService.queue.opts.prefix).toEqual( + '{metric-completed-jobs}' + ); + }); + }); +}); diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts new file mode 100644 index 000000000000..e4b4e84e9e93 --- /dev/null +++ b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts @@ -0,0 +1,17 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { JobTopicNameEnum } from '@novu/shared'; + +import { QueueBaseService } from './index'; + +const LOG_CONTEXT = 'CompletedJobsMetricQueueService'; + +@Injectable() +export class CompletedJobsMetricQueueService extends QueueBaseService { + constructor() { + super(JobTopicNameEnum.COMPLETED_JOBS_METRIC); + + Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); + + this.createQueue(); + } +} diff --git a/packages/application-generic/src/services/queues/index.ts b/packages/application-generic/src/services/queues/index.ts index 9987b685c815..2be05a0c9e94 100644 --- a/packages/application-generic/src/services/queues/index.ts +++ b/packages/application-generic/src/services/queues/index.ts @@ -1,15 +1,17 @@ import { QueueBaseService } from './queue-base.service'; +import { ActiveJobsMetricQueueService } from './active-jobs-metric-queue.service'; +import { CompletedJobsMetricQueueService } from './completed-jobs-metric-queue.service'; import { InboundParseQueueService } from './inbound-parse-queue.service'; -import { JobMetricsQueueService } from './job-metrics-queue.service'; import { StandardQueueService } from './standard-queue.service'; import { WebSocketsQueueService } from './web-sockets-queue.service'; import { WorkflowQueueService } from './workflow-queue.service'; export { QueueBaseService, + ActiveJobsMetricQueueService, + CompletedJobsMetricQueueService, InboundParseQueueService as InboundParseQueue, - JobMetricsQueueService, StandardQueueService, WebSocketsQueueService, WorkflowQueueService, diff --git a/packages/application-generic/src/services/queues/job-metrics-queue.service.spec.ts b/packages/application-generic/src/services/queues/job-metrics-queue.service.spec.ts deleted file mode 100644 index 9ed97a53d042..000000000000 --- a/packages/application-generic/src/services/queues/job-metrics-queue.service.spec.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { Test } from '@nestjs/testing'; - -import { JobMetricsQueueService } from './job-metrics-queue.service'; - -let jobMetricsQueueService: JobMetricsQueueService; - -describe('Job metrics Queue service', () => { - describe('General', () => { - beforeAll(async () => { - jobMetricsQueueService = new JobMetricsQueueService(); - await jobMetricsQueueService.queue.drain(); - }); - - beforeEach(async () => { - await jobMetricsQueueService.queue.drain(); - }); - - afterEach(async () => { - await jobMetricsQueueService.queue.drain(); - }); - - afterAll(async () => { - await jobMetricsQueueService.gracefulShutdown(); - }); - - it('should be initialised properly', async () => { - expect(jobMetricsQueueService).toBeDefined(); - expect(Object.keys(jobMetricsQueueService)).toEqual( - expect.arrayContaining([ - 'topic', - 'DEFAULT_ATTEMPTS', - 'instance', - 'queue', - ]) - ); - expect(jobMetricsQueueService.DEFAULT_ATTEMPTS).toEqual(3); - expect(jobMetricsQueueService.topic).toEqual('metric'); - expect(await jobMetricsQueueService.bullMqService.getStatus()).toEqual({ - queueIsPaused: false, - queueName: 'metric', - workerName: undefined, - workerIsPaused: undefined, - workerIsRunning: undefined, - }); - expect(await jobMetricsQueueService.isPaused()).toEqual(true); - expect(jobMetricsQueueService.queue).toMatchObject( - expect.objectContaining({ - _events: {}, - _eventsCount: 0, - _maxListeners: undefined, - name: 'metric', - jobsOpts: { - removeOnComplete: true, - }, - }) - ); - expect(jobMetricsQueueService.queue.opts.prefix).toEqual('bull'); - }); - }); - - describe('Cluster mode', () => { - beforeAll(async () => { - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - - jobMetricsQueueService = new JobMetricsQueueService(); - await jobMetricsQueueService.queue.obliterate(); - }); - - afterAll(async () => { - await jobMetricsQueueService.gracefulShutdown(); - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - }); - - it('should have prefix in cluster mode', async () => { - expect(jobMetricsQueueService.queue.opts.prefix).toEqual('{metric}'); - }); - }); -}); diff --git a/packages/application-generic/src/services/workers/job-metrics-worker.service.ts b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts similarity index 60% rename from packages/application-generic/src/services/workers/job-metrics-worker.service.ts rename to packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts index 82639c8a6751..8ea5b06aa7c6 100644 --- a/packages/application-generic/src/services/workers/job-metrics-worker.service.ts +++ b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts @@ -3,12 +3,12 @@ import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; -const LOG_CONTEXT = 'JobMetricsWorkerService'; +const LOG_CONTEXT = 'ActiveJobsMetricWorkerService'; @Injectable() -export class JobMetricsWorkerService extends WorkerBaseService { +export class ActiveJobsMetricWorkerService extends WorkerBaseService { constructor() { - super(JobTopicNameEnum.METRICS); + super(JobTopicNameEnum.ACTIVE_JOBS_METRIC); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts new file mode 100644 index 000000000000..2685dfc09c9a --- /dev/null +++ b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts @@ -0,0 +1,14 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { JobTopicNameEnum } from '@novu/shared'; + +import { WorkerBaseService } from './index'; + +const LOG_CONTEXT = 'CompletedJobsMetricWorkerService'; + +@Injectable() +export class CompletedJobsMetricWorkerService extends WorkerBaseService { + constructor() { + super(JobTopicNameEnum.COMPLETED_JOBS_METRIC); + Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); + } +} diff --git a/packages/application-generic/src/services/workers/index.ts b/packages/application-generic/src/services/workers/index.ts index f5717d960f47..ee6f6d9b4dd5 100644 --- a/packages/application-generic/src/services/workers/index.ts +++ b/packages/application-generic/src/services/workers/index.ts @@ -4,16 +4,18 @@ import { WorkerProcessor, } from './worker-base.service'; +import { ActiveJobsMetricWorkerService } from './active-jobs-metric-worker.service'; +import { CompletedJobsMetricWorkerService } from './completed-jobs-metric-worker.service'; import { InboundParseWorkerService } from './inbound-parse-worker.service'; -import { JobMetricsWorkerService } from './job-metrics-worker.service'; import { StandardWorkerService } from './standard-worker.service'; import { WebSocketsWorkerService } from './web-sockets-worker.service'; import { WorkflowWorkerService } from './workflow-worker.service'; import { OldInstanceWorkflowWorkerService } from './old-instance-workflow-worker.service'; export { + ActiveJobsMetricWorkerService, + CompletedJobsMetricWorkerService, InboundParseWorkerService as InboundParseWorker, - JobMetricsWorkerService, StandardWorkerService, WebSocketsWorkerService, WorkerBaseService,