diff --git a/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts index 29c6c5e0364..fe5a80d525b 100644 --- a/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts +++ b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts @@ -1,56 +1,22 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { ActiveJobsMetricQueueService } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'ActiveJobsMetricQueueServiceHealthIndicator'; +const INDICATOR_KEY = 'activeJobsMetricQueue'; +const SERVICE_NAME = 'ActiveJobsMetricQueueService'; @Injectable() -export class ActiveJobsMetricQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'activeJobsMetricQueue'; - +export class ActiveJobsMetricQueueServiceHealthIndicator extends QueueHealthIndicator { constructor( private activeJobsMetricQueueService: ActiveJobsMetricQueueService ) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.activeJobsMetricQueueService.isReady(); - - if (isReady) { - Logger.verbose('ActiveJobsMetricQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('ActiveJobsMetricQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'ActiveJobsMetric Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } - - async isActive(): Promise { - const isReady = this.activeJobsMetricQueueService.isReady(); - const isPaused = await this.activeJobsMetricQueueService.isPaused(); - - if (isReady && !isPaused) { - Logger.verbose('ActiveJobsMetricQueueService is active', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('ActiveJobsMetricQueueService is not active', LOG_CONTEXT); - - throw new HealthCheckError( - 'ActiveJobsMetric Queue Health', - this.getStatus(this.INDICATOR_KEY, false) + super( + activeJobsMetricQueueService, + INDICATOR_KEY, + SERVICE_NAME, + LOG_CONTEXT ); } } diff --git a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts index 018eaf9f246..efa011c3835 100644 --- a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts +++ b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts @@ -1,58 +1,21 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { CompletedJobsMetricQueueService } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'CompletedJobsMetricQueueServiceHealthIndicator'; - +const INDICATOR_KEY = 'completedJobsMetricQueue'; +const SERVICE_NAME = 'CompletedJobsMetricQueueService'; @Injectable() -export class CompletedJobsMetricQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'completedJobsMetricQueue'; - +export class CompletedJobsMetricQueueServiceHealthIndicator extends QueueHealthIndicator { constructor( private completedJobsMetricQueueService: CompletedJobsMetricQueueService ) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.completedJobsMetricQueueService.isReady(); - - if (isReady) { - Logger.verbose('CompletedJobsMetricQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('CompletedJobsMetricQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'CompletedJobsMetric Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } - async isActive(): Promise { - const isReady = this.completedJobsMetricQueueService.isReady(); - const isPaused = await this.completedJobsMetricQueueService.isPaused(); - - if (isReady && !isPaused) { - Logger.verbose('CompletedJobsMetricQueueService is active', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose( - 'CompletedJobsMetricQueueService is not active', + super( + completedJobsMetricQueueService, + INDICATOR_KEY, + SERVICE_NAME, LOG_CONTEXT ); - - throw new HealthCheckError( - 'CompletedJobsMetric Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); } } diff --git a/packages/application-generic/src/health/dal.health-indicator.ts b/packages/application-generic/src/health/dal.health-indicator.ts index a57754869e2..569e6d0f101 100644 --- a/packages/application-generic/src/health/dal.health-indicator.ts +++ b/packages/application-generic/src/health/dal.health-indicator.ts @@ -1,8 +1,4 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; +import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus'; import { Injectable } from '@nestjs/common'; import { DalService } from '@novu/dal'; diff --git a/packages/application-generic/src/health/health-indicator.interface.ts b/packages/application-generic/src/health/health-indicator.interface.ts new file mode 100644 index 00000000000..a84c57b8531 --- /dev/null +++ b/packages/application-generic/src/health/health-indicator.interface.ts @@ -0,0 +1,6 @@ +import { HealthIndicatorResult } from '@nestjs/terminus'; + +export interface IHealthIndicator { + isHealthy(): Promise; + isActive(): Promise; +} diff --git a/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts b/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts index 471d84c80f7..d408277f4a4 100644 --- a/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts +++ b/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts @@ -1,36 +1,15 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { InboundParseQueue } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'InboundParseQueueServiceHealthIndicator'; +const INDICATOR_KEY = 'inboundParseQueue'; +const SERVICE_NAME = 'InboundParseQueueService'; @Injectable() -export class InboundParseQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'inboundParseQueue'; - +export class InboundParseQueueServiceHealthIndicator extends QueueHealthIndicator { constructor(private inboundParseQueueService: InboundParseQueue) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.inboundParseQueueService.isReady(); - - if (isReady) { - Logger.verbose('InboundParseQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('InboundParseQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'InboundParse Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); + super(inboundParseQueueService, INDICATOR_KEY, SERVICE_NAME, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/health/queue-health-indicator.service.ts b/packages/application-generic/src/health/queue-health-indicator.service.ts new file mode 100644 index 00000000000..0c23216d091 --- /dev/null +++ b/packages/application-generic/src/health/queue-health-indicator.service.ts @@ -0,0 +1,67 @@ +import { + HealthCheckError, + HealthIndicator, + HealthIndicatorResult, +} from '@nestjs/terminus'; +import { Injectable, Logger } from '@nestjs/common'; + +import { QueueBaseService } from '../services'; +import { IHealthIndicator } from './health-indicator.interface'; + +@Injectable() +export abstract class QueueHealthIndicator + extends HealthIndicator + implements IHealthIndicator +{ + constructor( + private queueBaseService: QueueBaseService, + private indicatorKey: string, + private serviceName: string, + private logContext: string + ) { + super(); + } + + async isHealthy(): Promise { + return await this.handleHealthCheck(); + } + + async isActive(): Promise { + return await this.handleHealthCheck(true); + } + + async handleHealthCheck(checkActive = false) { + const isReady = this.queueBaseService.isReady(); + + if (!isReady) { + Logger.verbose(`${this.serviceName} is not ready`, this.logContext); + + throw new HealthCheckError( + `${this.serviceName} Health`, + this.getStatus(this.indicatorKey, false) + ); + } + + if (!checkActive) { + Logger.verbose(`${this.serviceName} is ready`, this.logContext); + + return this.getStatus(this.indicatorKey, true); + } + + const isPaused = await this.queueBaseService.isPaused(); + const isActive = isReady && !isPaused; + + if (!isActive) { + Logger.verbose(`${this.serviceName} is not active`, this.logContext); + + throw new HealthCheckError( + `${this.serviceName} Health`, + this.getStatus(this.indicatorKey, false) + ); + } + + Logger.verbose(`${this.serviceName} is active`, this.logContext); + + return this.getStatus(this.indicatorKey, true); + } +} diff --git a/packages/application-generic/src/health/standard-queue.health-indicator.ts b/packages/application-generic/src/health/standard-queue.health-indicator.ts index 5f3d4d7de9e..aa4c9a31908 100644 --- a/packages/application-generic/src/health/standard-queue.health-indicator.ts +++ b/packages/application-generic/src/health/standard-queue.health-indicator.ts @@ -1,54 +1,15 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { StandardQueueService } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'StandardQueueServiceHealthIndicator'; +const INDICATOR_KEY = 'standardQueue'; +const SERVICE_NAME = 'StandardQueueService'; @Injectable() -export class StandardQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'standardQueue'; - +export class StandardQueueServiceHealthIndicator extends QueueHealthIndicator { constructor(private standardQueueService: StandardQueueService) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.standardQueueService.isReady(); - - if (isReady) { - Logger.verbose('StandardQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('StandardQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'Standard Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } - - async isActive(): Promise { - const isReady = this.standardQueueService.isReady(); - const isPaused = await this.standardQueueService.isPaused(); - - if (isReady && !isPaused) { - Logger.verbose('StandardQueueService is active', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('StandardQueueService is not active', LOG_CONTEXT); - - throw new HealthCheckError( - 'Standard Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); + super(standardQueueService, INDICATOR_KEY, SERVICE_NAME, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts index 89e8d1074a5..96251dc6230 100644 --- a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts +++ b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts @@ -1,58 +1,24 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { SubscriberProcessQueueService } from '../services'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'SubscriberProcessQueueHealthIndicator'; +const INDICATOR_KEY = + ObservabilityBackgroundTransactionEnum.SUBSCRIBER_PROCESSING_QUEUE; +const SERVICE_NAME = 'SubscriberProcessQueueService'; @Injectable() -export class SubscriberProcessQueueHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = - ObservabilityBackgroundTransactionEnum.SUBSCRIBER_PROCESSING_QUEUE; - +export class SubscriberProcessQueueHealthIndicator extends QueueHealthIndicator { constructor( private subscriberProcessQueueService: SubscriberProcessQueueService ) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.subscriberProcessQueueService.isReady(); - - if (isReady) { - Logger.verbose('SubscriberProcessQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('SubscriberProcessQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'Subscriber Process Queue Service Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } - - async isActive(): Promise { - const isReady = this.subscriberProcessQueueService.isReady(); - const isPaused = await this.subscriberProcessQueueService.isPaused(); - - if (isReady && !isPaused) { - Logger.verbose('SubscriberProcessQueueService is active', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('SubscriberProcessQueueService is not active', LOG_CONTEXT); - - throw new HealthCheckError( - 'Subscriber Process Queue Service Health', - this.getStatus(this.INDICATOR_KEY, false) + super( + subscriberProcessQueueService, + INDICATOR_KEY, + SERVICE_NAME, + LOG_CONTEXT ); } } diff --git a/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts b/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts index 8ac5fb776de..9ef05994af1 100644 --- a/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts +++ b/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts @@ -1,36 +1,15 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { WebSocketsQueueService } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'WebSocketsQueueServiceHealthIndicator'; +const INDICATOR_KEY = 'webSocketsQueue'; +const SERVICE_NAME = 'WebSocketsQueueService'; @Injectable() -export class WebSocketsQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'webSocketsQueue'; - +export class WebSocketsQueueServiceHealthIndicator extends QueueHealthIndicator { constructor(private webSocketsQueueService: WebSocketsQueueService) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.webSocketsQueueService.isReady(); - - if (isReady) { - Logger.verbose('WebSocketsQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('WebSocketsQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'Ws Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); + super(webSocketsQueueService, INDICATOR_KEY, SERVICE_NAME, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/health/workflow-queue.health-indicator.ts b/packages/application-generic/src/health/workflow-queue.health-indicator.ts index 06c0ce447d5..6f5dd361a4a 100644 --- a/packages/application-generic/src/health/workflow-queue.health-indicator.ts +++ b/packages/application-generic/src/health/workflow-queue.health-indicator.ts @@ -1,54 +1,15 @@ -import { - HealthCheckError, - HealthIndicator, - HealthIndicatorResult, -} from '@nestjs/terminus'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { WorkflowQueueService } from '../services'; +import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'WorkflowQueueServiceHealthIndicator'; +const INDICATOR_KEY = 'workflowQueue'; +const SERVICE_NAME = 'WorkflowQueueService'; @Injectable() -export class WorkflowQueueServiceHealthIndicator extends HealthIndicator { - private INDICATOR_KEY = 'workflowQueue'; - +export class WorkflowQueueServiceHealthIndicator extends QueueHealthIndicator { constructor(private workflowQueueService: WorkflowQueueService) { - super(); - } - - async isHealthy(): Promise { - const isReady = this.workflowQueueService.isReady(); - - if (isReady) { - Logger.verbose('WorkflowQueueService is ready', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('WorkflowQueueService is not ready', LOG_CONTEXT); - - throw new HealthCheckError( - 'Workflow Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); - } - - async isActive(): Promise { - const isReady = this.workflowQueueService.isReady(); - const isPaused = await this.workflowQueueService.isPaused(); - - if (isReady && !isPaused) { - Logger.verbose('WorkflowQueueService is active', LOG_CONTEXT); - - return this.getStatus(this.INDICATOR_KEY, true); - } - - Logger.verbose('WorkflowQueueService is not active', LOG_CONTEXT); - - throw new HealthCheckError( - 'Workflow Queue Health', - this.getStatus(this.INDICATOR_KEY, false) - ); + super(workflowQueueService, INDICATOR_KEY, SERVICE_NAME, LOG_CONTEXT); } }