From 1116d743da7ac7d492281e9cc5729a23d572aaf0 Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 28 Nov 2023 13:08:55 +0200 Subject: [PATCH 01/12] refactor: readiness service to receive indicator list --- .../src/modules/queues.module.ts | 39 ++++++++++++++----- .../readiness/readiness.service.spec.ts | 6 +-- .../services/readiness/readiness.service.ts | 36 +++++++---------- 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 86ed8e7d489..12bef3dc5d5 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -31,7 +31,27 @@ import { WorkflowWorkerService, } from '../services/workers'; -const PROVIDERS: Provider[] = [ +export const workerIndicatorList = { + provide: 'INDICATOR_LIST', + useFactory: ( + standardQueueServiceHealthIndicator: StandardQueueServiceHealthIndicator, + workflowQueueServiceHealthIndicator: WorkflowQueueServiceHealthIndicator, + subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator + ) => { + return [ + standardQueueServiceHealthIndicator, + workflowQueueServiceHealthIndicator, + subscriberProcessQueueHealthIndicator, + ]; + }, + inject: [ + StandardQueueServiceHealthIndicator, + WorkflowQueueServiceHealthIndicator, + SubscriberProcessQueueHealthIndicator, + ], +}; + +const WORKER_PROVIDERS: Provider[] = [ ActiveJobsMetricQueueService, ActiveJobsMetricQueueServiceHealthIndicator, ActiveJobsMetricWorkerService, @@ -42,12 +62,14 @@ const PROVIDERS: Provider[] = [ InboundParseQueue, InboundParseWorker, InboundParseQueueServiceHealthIndicator, + workerIndicatorList, + StandardQueueServiceHealthIndicator, + WebSocketsQueueServiceHealthIndicator, + SubscriberProcessQueueHealthIndicator, ReadinessService, StandardQueueService, - StandardQueueServiceHealthIndicator, StandardWorkerService, WebSocketsQueueService, - WebSocketsQueueServiceHealthIndicator, WebSocketsWorkerService, WorkflowQueueService, ExecutionLogQueueService, @@ -55,16 +77,15 @@ const PROVIDERS: Provider[] = [ WorkflowWorkerService, SubscriberProcessQueueService, SubscriberProcessWorkerService, - SubscriberProcessQueueHealthIndicator, ]; @Module({ - providers: [...PROVIDERS], - exports: [...PROVIDERS], + providers: [...WORKER_PROVIDERS], + exports: [...WORKER_PROVIDERS], }) export class QueuesModule {} -const APP_PROVIDERS: Provider[] = [ +const API_PROVIDERS: Provider[] = [ InboundParseQueue, InboundParseWorker, InboundParseQueueServiceHealthIndicator, @@ -76,7 +97,7 @@ const APP_PROVIDERS: Provider[] = [ ]; @Module({ - providers: [...APP_PROVIDERS], - exports: [...APP_PROVIDERS], + providers: [...API_PROVIDERS], + exports: [...API_PROVIDERS], }) export class BaseApiQueuesModule {} diff --git a/packages/application-generic/src/services/readiness/readiness.service.spec.ts b/packages/application-generic/src/services/readiness/readiness.service.spec.ts index ef30899d704..e1a941cd6c2 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.spec.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.spec.ts @@ -41,11 +41,11 @@ describe('Readiness Service', () => { const subscriberProcessQueueHealthIndicator = new SubscriberProcessQueueHealthIndicator(subscriberProcessQueueService); - readinessService = new ReadinessService( + readinessService = new ReadinessService([ standardQueueServiceHealthIndicator, workflowQueueServiceHealthIndicator, - subscriberProcessQueueHealthIndicator - ); + subscriberProcessQueueHealthIndicator, + ]); }); afterAll(async () => { diff --git a/packages/application-generic/src/services/readiness/readiness.service.ts b/packages/application-generic/src/services/readiness/readiness.service.ts index be3ebe40ad4..8cdf09cf0a4 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.ts @@ -1,14 +1,11 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { HealthIndicatorResult, HealthIndicatorStatus } from '@nestjs/terminus'; +import { setTimeout } from 'timers/promises'; import { Worker } from '../bull-mq'; -import { - StandardQueueServiceHealthIndicator, - SubscriberProcessQueueHealthIndicator, - WebSocketsQueueServiceHealthIndicator, - WorkflowQueueServiceHealthIndicator, -} from '../../health'; -import { setTimeout } from 'timers/promises'; +import { IHealthIndicator } from '../../health'; + export interface INovuWorker { readonly DEFAULT_ATTEMPTS: number; gracefulShutdown: () => Promise; @@ -24,9 +21,7 @@ const LOG_CONTEXT = 'ReadinessService'; @Injectable() export class ReadinessService { constructor( - private standardQueueServiceHealthIndicator: StandardQueueServiceHealthIndicator, - private workflowQueueServiceHealthIndicator: WorkflowQueueServiceHealthIndicator, - private subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator + @Inject('INDICATOR_LIST') private indicators: IHealthIndicator[] ) {} async areQueuesEnabled(): Promise { @@ -43,10 +38,7 @@ export class ReadinessService { } Logger.warn( - { - attempt: i, - message: `Some health indicator returned false when checking if queues are enabled ${i}/${retries}`, - }, + `Some health indicator returned false when checking if queues are enabled ${i}/${retries}`, LOG_CONTEXT ); @@ -58,13 +50,15 @@ export class ReadinessService { private async checkServicesHealth() { try { - const healths = await Promise.all([ - this.standardQueueServiceHealthIndicator.isHealthy(), - this.workflowQueueServiceHealthIndicator.isHealthy(), - this.subscriberProcessQueueHealthIndicator.isHealthy(), - ]); + const healths = await Promise.all( + this.indicators.map((indicator) => indicator.isHealthy()) + ); + + const statuses = healths.map( + (health: HealthIndicatorResult) => Object.values(health)[0].status + ); - return healths.every((health) => !!health === true); + return statuses.every((status: HealthIndicatorStatus) => status === 'up'); } catch (error) { Logger.error( error, From 501f26cb66cd8ecfb83a5ac78ae6d9b80e42f800 Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 28 Nov 2023 18:37:59 +0200 Subject: [PATCH 02/12] feat: add readiness module and setup --- apps/worker/src/bootstrap.ts | 5 +- apps/ws/src/app.module.ts | 2 + apps/ws/src/bootstrap.ts | 7 +++ apps/ws/src/health/health.controller.ts | 17 +++---- apps/ws/src/health/health.module.ts | 8 ++-- apps/ws/src/readiness/readiness.module.ts | 47 +++++++++++++++++++ .../src/socket/services/cold-start.service.ts | 24 ++++++++++ apps/ws/src/socket/services/index.ts | 3 +- ... => ws-server-health-indicator.service.ts} | 4 +- apps/ws/src/socket/socket.module.ts | 6 +-- .../src/health/dal.health-indicator.ts | 10 +++- .../src/modules/queues.module.ts | 5 +- .../active-jobs-metric-worker.service.ts | 1 - .../completed-jobs-metric-worker.service.ts | 1 - .../workers/execution-log-worker.service.ts | 1 - .../workers/inbound-parse-worker.service.ts | 1 - .../workers/standard-worker.service.ts | 1 - .../subscriber-process-worker.service.ts | 1 - .../workers/web-sockets-worker.service.ts | 1 - .../workers/workflow-worker.service.ts | 1 - 20 files changed, 112 insertions(+), 34 deletions(-) create mode 100644 apps/ws/src/readiness/readiness.module.ts create mode 100644 apps/ws/src/socket/services/cold-start.service.ts rename apps/ws/src/socket/services/{ws-health-indicator.service.ts => ws-server-health-indicator.service.ts} (80%) diff --git a/apps/worker/src/bootstrap.ts b/apps/worker/src/bootstrap.ts index a4c56373962..24994941dbc 100644 --- a/apps/worker/src/bootstrap.ts +++ b/apps/worker/src/bootstrap.ts @@ -66,15 +66,16 @@ export async function bootstrap(): Promise { app.use(bodyParser.json()); app.use(bodyParser.urlencoded({ extended: true })); - // Starts listening for shutdown hooks app.enableShutdownHooks(); Logger.log('BOOTSTRAPPED SUCCESSFULLY'); - await app.listen(process.env.PORT); + await app.init(); await startAppInfra(app); + await app.listen(process.env.PORT); + Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`); return app; diff --git a/apps/ws/src/app.module.ts b/apps/ws/src/app.module.ts index fd953ff36bf..c8398407570 100644 --- a/apps/ws/src/app.module.ts +++ b/apps/ws/src/app.module.ts @@ -8,10 +8,12 @@ import { AppService } from './app.service'; import { SharedModule } from './shared/shared.module'; import { HealthModule } from './health/health.module'; import { SocketModule } from './socket/socket.module'; +import { ReadinessModule } from './readiness/readiness.module'; const packageJson = require('../package.json'); const modules = [ + ReadinessModule, SharedModule, HealthModule, SocketModule, diff --git a/apps/ws/src/bootstrap.ts b/apps/ws/src/bootstrap.ts index 3cf976b09de..6fe5f37425d 100644 --- a/apps/ws/src/bootstrap.ts +++ b/apps/ws/src/bootstrap.ts @@ -10,6 +10,7 @@ import helmet from 'helmet'; import { BullMqService } from '@novu/application-generic'; import { version } from '../package.json'; import { getErrorInterceptor, Logger } from '@novu/application-generic'; +import { prepareAppInfra, startAppInfra } from './socket/services'; if (process.env.SENTRY_DSN) { Sentry.init({ @@ -27,6 +28,8 @@ export async function bootstrap() { app.useLogger(app.get(Logger)); app.flushLogs(); + await prepareAppInfra(app); + app.useGlobalInterceptors(getErrorInterceptor()); app.setGlobalPrefix(CONTEXT_PATH); @@ -42,5 +45,9 @@ export async function bootstrap() { app.useWebSocketAdapter(redisIoAdapter); + await app.init(); + + await startAppInfra(app); + await app.listen(process.env.PORT as string); } diff --git a/apps/ws/src/health/health.controller.ts b/apps/ws/src/health/health.controller.ts index 115395e0638..36cc64ceb13 100644 --- a/apps/ws/src/health/health.controller.ts +++ b/apps/ws/src/health/health.controller.ts @@ -1,26 +1,23 @@ -import { Controller, Get } from '@nestjs/common'; -import { HealthCheck, HealthCheckResult, HealthCheckService, HttpHealthIndicator } from '@nestjs/terminus'; -import { DalServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator } from '@novu/application-generic'; +import { Controller, Get, Inject } from '@nestjs/common'; +import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus'; +import { IHealthIndicator } from '@novu/application-generic'; import { version } from '../../package.json'; -import { WSHealthIndicator } from '../socket/services'; @Controller('v1/health-check') export class HealthController { constructor( private healthCheckService: HealthCheckService, - private dalHealthIndicator: DalServiceHealthIndicator, - private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator, - private wsHealthIndicator: WSHealthIndicator + @Inject('INDICATOR_LIST') private indicators: IHealthIndicator[] ) {} @Get() @HealthCheck() async healthCheck(): Promise { + const indicatorHealthCheckFunctions = this.indicators.map((indicator) => async () => indicator.isHealthy()); + const result = await this.healthCheckService.check([ - async () => this.dalHealthIndicator.isHealthy(), - async () => this.webSocketsQueueHealthIndicator.isHealthy(), - async () => this.wsHealthIndicator.isHealthy(), + ...indicatorHealthCheckFunctions, async () => { return { apiVersion: { diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index e613d53072d..f10d0cb383f 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -1,16 +1,14 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; -import { QueuesModule } from '@novu/application-generic'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; -import { WSGateway } from '../socket/ws.gateway'; -import { WSHealthIndicator } from '../socket/services'; +import { ReadinessModule } from '../readiness/readiness.module'; -const PROVIDERS = [WSHealthIndicator, WSGateway]; +const PROVIDERS = []; @Module({ - imports: [TerminusModule, SharedModule, QueuesModule], + imports: [TerminusModule, SharedModule, ReadinessModule], providers: PROVIDERS, controllers: [HealthController], }) diff --git a/apps/ws/src/readiness/readiness.module.ts b/apps/ws/src/readiness/readiness.module.ts new file mode 100644 index 00000000000..f808ddec3cf --- /dev/null +++ b/apps/ws/src/readiness/readiness.module.ts @@ -0,0 +1,47 @@ +import { Module, Provider } from '@nestjs/common'; +import { + DalServiceHealthIndicator, + IHealthIndicator, + ReadinessService, + WebSocketsQueueService, + WebSocketsQueueServiceHealthIndicator, +} from '@novu/application-generic'; +import { WSServerHealthIndicator } from '../socket/services'; +import { SharedModule } from '../shared/shared.module'; +import { SocketModule } from '../socket/socket.module'; + +export const indicatorList = { + provide: 'INDICATOR_LIST', + useFactory: ( + wsServerHealthIndicator: WSServerHealthIndicator, + webSocketsQueueServiceHealthIndicator: WebSocketsQueueServiceHealthIndicator, + dalServiceHealthIndicator: DalServiceHealthIndicator + ) => { + const indicators: IHealthIndicator[] = [ + wsServerHealthIndicator, + webSocketsQueueServiceHealthIndicator, + dalServiceHealthIndicator, + ]; + + return indicators; + }, + inject: [WSServerHealthIndicator, WebSocketsQueueServiceHealthIndicator, DalServiceHealthIndicator], +}; + +const PROVIDERS: Provider[] = [ + WSServerHealthIndicator, + WebSocketsQueueServiceHealthIndicator, + DalServiceHealthIndicator, + + WebSocketsQueueService, + + indicatorList, + ReadinessService, +]; + +@Module({ + imports: [SharedModule, SocketModule], + providers: [...PROVIDERS], + exports: [...PROVIDERS], +}) +export class ReadinessModule {} diff --git a/apps/ws/src/socket/services/cold-start.service.ts b/apps/ws/src/socket/services/cold-start.service.ts new file mode 100644 index 00000000000..3c75b823037 --- /dev/null +++ b/apps/ws/src/socket/services/cold-start.service.ts @@ -0,0 +1,24 @@ +import { INestApplication } from '@nestjs/common'; +import { INovuWorker, ReadinessService } from '@novu/application-generic'; +import { WebSocketWorker } from './web-socket.worker'; + +const getWorkers = (app: INestApplication): INovuWorker[] => { + const webSocketWorker = app.get(WebSocketWorker, { strict: false }); + + const workers: INovuWorker[] = [webSocketWorker]; + + return workers; +}; + +export const prepareAppInfra = async (app: INestApplication): Promise => { + const readinessService = app.get(ReadinessService); + const workers = getWorkers(app); + + await readinessService.pauseWorkers(workers); +}; + +export const startAppInfra = async (app: INestApplication): Promise => { + const readinessService = app.get(ReadinessService); + const workers = getWorkers(app); + await readinessService.enableWorkers(workers); +}; diff --git a/apps/ws/src/socket/services/index.ts b/apps/ws/src/socket/services/index.ts index 9c5f11999e9..ea05245c3c4 100644 --- a/apps/ws/src/socket/services/index.ts +++ b/apps/ws/src/socket/services/index.ts @@ -1,2 +1,3 @@ export { WebSocketWorker } from './web-socket.worker'; -export { WSHealthIndicator } from './ws-health-indicator.service'; +export { WSServerHealthIndicator } from './ws-server-health-indicator.service'; +export { prepareAppInfra, startAppInfra } from './cold-start.service'; diff --git a/apps/ws/src/socket/services/ws-health-indicator.service.ts b/apps/ws/src/socket/services/ws-server-health-indicator.service.ts similarity index 80% rename from apps/ws/src/socket/services/ws-health-indicator.service.ts rename to apps/ws/src/socket/services/ws-server-health-indicator.service.ts index f9be8125eb9..46d29d955ae 100644 --- a/apps/ws/src/socket/services/ws-health-indicator.service.ts +++ b/apps/ws/src/socket/services/ws-server-health-indicator.service.ts @@ -6,8 +6,8 @@ import { IHealthIndicator } from '@novu/application-generic'; import { WSGateway } from '../ws.gateway'; @Injectable() -export class WSHealthIndicator extends HealthIndicator implements IHealthIndicator { - private INDICATOR_KEY = 'ws'; +export class WSServerHealthIndicator extends HealthIndicator implements IHealthIndicator { + private INDICATOR_KEY = 'ws-server'; constructor(private wsGateway: WSGateway) { super(); diff --git a/apps/ws/src/socket/socket.module.ts b/apps/ws/src/socket/socket.module.ts index 15173f355a1..a2868ef2590 100644 --- a/apps/ws/src/socket/socket.module.ts +++ b/apps/ws/src/socket/socket.module.ts @@ -1,6 +1,4 @@ -import { Inject, Module, OnModuleInit, Provider } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; -import { WebSocketsWorkerService } from '@novu/application-generic'; +import { Module, Provider } from '@nestjs/common'; import { WSGateway } from './ws.gateway'; import { SharedModule } from '../shared/shared.module'; @@ -10,7 +8,7 @@ import { WebSocketWorker } from './services'; const USE_CASES: Provider[] = [ExternalServicesRoute]; -const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker]; +const PROVIDERS: Provider[] = [WSGateway, WebSocketWorker]; @Module({ imports: [SharedModule], diff --git a/packages/application-generic/src/health/dal.health-indicator.ts b/packages/application-generic/src/health/dal.health-indicator.ts index 569e6d0f101..d4e4dab7794 100644 --- a/packages/application-generic/src/health/dal.health-indicator.ts +++ b/packages/application-generic/src/health/dal.health-indicator.ts @@ -1,9 +1,13 @@ import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus'; import { Injectable } from '@nestjs/common'; import { DalService } from '@novu/dal'; +import { IHealthIndicator } from './health-indicator.interface'; @Injectable() -export class DalServiceHealthIndicator extends HealthIndicator { +export class DalServiceHealthIndicator + extends HealthIndicator + implements IHealthIndicator +{ private INDICATOR_KEY = 'db'; constructor(private dalService: DalService) { @@ -15,4 +19,8 @@ export class DalServiceHealthIndicator extends HealthIndicator { return this.getStatus(this.INDICATOR_KEY, status); } + + isActive(): Promise { + return this.isHealthy(); + } } diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 12bef3dc5d5..325b7be2b7b 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -4,6 +4,7 @@ import { bullMqTokenList } from '../custom-providers'; import { ActiveJobsMetricQueueServiceHealthIndicator, CompletedJobsMetricQueueServiceHealthIndicator, + IHealthIndicator, InboundParseQueueServiceHealthIndicator, StandardQueueServiceHealthIndicator, SubscriberProcessQueueHealthIndicator, @@ -38,11 +39,13 @@ export const workerIndicatorList = { workflowQueueServiceHealthIndicator: WorkflowQueueServiceHealthIndicator, subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator ) => { - return [ + const indicatorList: IHealthIndicator[] = [ standardQueueServiceHealthIndicator, workflowQueueServiceHealthIndicator, subscriberProcessQueueHealthIndicator, ]; + + return indicatorList; }, inject: [ StandardQueueServiceHealthIndicator, diff --git a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts index 8ea5b06aa7c..d60ea040de1 100644 --- a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts +++ b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'ActiveJobsMetricWorkerService'; export class ActiveJobsMetricWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.ACTIVE_JOBS_METRIC); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts index 2685dfc09c9..924307c921a 100644 --- a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts +++ b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'CompletedJobsMetricWorkerService'; export class CompletedJobsMetricWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.COMPLETED_JOBS_METRIC); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/execution-log-worker.service.ts b/packages/application-generic/src/services/workers/execution-log-worker.service.ts index c661b62858d..cca09d1a6a7 100644 --- a/packages/application-generic/src/services/workers/execution-log-worker.service.ts +++ b/packages/application-generic/src/services/workers/execution-log-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'ExecutionLogWorkerService'; export class ExecutionLogWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.EXECUTION_LOG); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts b/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts index 6789c9d0d5e..e6c181dd2b3 100644 --- a/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts +++ b/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'InboundParseWorkerService'; export class InboundParseWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.INBOUND_PARSE_MAIL); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/standard-worker.service.ts b/packages/application-generic/src/services/workers/standard-worker.service.ts index 052a06d4ad3..ff5395c0d00 100644 --- a/packages/application-generic/src/services/workers/standard-worker.service.ts +++ b/packages/application-generic/src/services/workers/standard-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'StandardWorkerService'; export class StandardWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.STANDARD); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts index c2da7287d83..72e194db1f2 100644 --- a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts +++ b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts @@ -13,6 +13,5 @@ export class SubscriberProcessWorkerService { constructor() { super(JobTopicNameEnum.PROCESS_SUBSCRIBER); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts index 21eca627bc4..64acbf65d46 100644 --- a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts +++ b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'WebSocketsWorkerService'; export class WebSocketsWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.WEB_SOCKETS); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/workflow-worker.service.ts b/packages/application-generic/src/services/workers/workflow-worker.service.ts index 415aa040304..07ea0e46220 100644 --- a/packages/application-generic/src/services/workers/workflow-worker.service.ts +++ b/packages/application-generic/src/services/workers/workflow-worker.service.ts @@ -9,6 +9,5 @@ const LOG_CONTEXT = 'WorkflowWorkerService'; export class WorkflowWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.WORKFLOW); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } From 1e3ca93e90910cf8b2c76860b47293cd85a814a6 Mon Sep 17 00:00:00 2001 From: Gosha Date: Thu, 30 Nov 2023 14:24:41 +0200 Subject: [PATCH 03/12] feat: set is online as false on connection disconnection --- .../subscriber-online/subscriber-online.service.ts | 7 ++----- apps/ws/src/socket/ws.gateway.ts | 9 +-------- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts index a66029d724e..7f05fec5b63 100644 --- a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts +++ b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts @@ -17,12 +17,9 @@ export class SubscriberOnlineService { await this.updateOnlineStatus(subscriber, { isOnline }); } - async handleDisconnection(subscriber: ISubscriberJwt, activeConnections: number) { + async handleDisconnection(subscriber: ISubscriberJwt) { const lastOnlineAt = new Date().toISOString(); - let isOnline = false; - if (activeConnections > 1) { - isOnline = true; - } + const isOnline = false; await this.updateOnlineStatus(subscriber, { isOnline, lastOnlineAt }); } diff --git a/apps/ws/src/socket/ws.gateway.ts b/apps/ws/src/socket/ws.gateway.ts index f6f451db552..99230603978 100644 --- a/apps/ws/src/socket/ws.gateway.ts +++ b/apps/ws/src/socket/ws.gateway.ts @@ -94,8 +94,7 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { return; } - const activeConnections = await this.getActiveConnections(connection, subscriber._id); - await this.subscriberOnlineService.handleDisconnection(subscriber, activeConnections); + await this.subscriberOnlineService.handleDisconnection(subscriber); } private async processConnectionRequest(connection: Socket) { @@ -128,10 +127,4 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { private disconnect(socket: Socket) { socket.disconnect(); } - - private async getActiveConnections(socket: Socket, subscriberId: string) { - const activeSockets = await socket.in(subscriberId).fetchSockets(); - - return activeSockets.length; - } } From b9394a142cfd014bbb7dacd5f4c6430f45519db7 Mon Sep 17 00:00:00 2001 From: Gosha Date: Thu, 30 Nov 2023 14:26:38 +0200 Subject: [PATCH 04/12] feat: remove redundant call to redis adaptor --- .../external-services-route.usecase.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts index cff06088138..901c42b0080 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts @@ -60,12 +60,6 @@ export class ExternalServicesRoute { return; } - const connection = await this.connectionExist(command); - - if (!connection) { - return; - } - let unreadCount = this.extractCount(command.payload?.unreadCount); if (unreadCount === undefined) { @@ -91,12 +85,6 @@ export class ExternalServicesRoute { return; } - const connection = await this.connectionExist(command); - - if (!connection) { - return; - } - let unseenCount = this.extractCount(command.payload?.unseenCount); if (unseenCount === undefined) { From 9da5ba2e9a8a062c0145b8e062028b1c91b53731 Mon Sep 17 00:00:00 2001 From: Gosha Date: Thu, 30 Nov 2023 19:32:12 +0200 Subject: [PATCH 05/12] feat: add graceful shutdown on module destroy --- apps/api/src/bootstrap.ts | 1 - .../workflow/services/execution-log.worker.ts | 3 +- .../app/workflow/services/standard.worker.ts | 10 ++----- .../app/workflow/services/workflow.worker.ts | 3 +- apps/ws/src/bootstrap.ts | 2 ++ .../src/socket/services/web-socket.worker.ts | 9 ++---- apps/ws/src/socket/ws.gateway.ts | 30 +++++++++++++++++-- .../application-generic/src/modules/index.ts | 1 + .../src/modules/interfaces.ts | 4 +++ .../services/readiness/readiness.service.ts | 5 ++-- .../subscriber-process-worker.service.ts | 6 +--- .../services/workers/worker-base.service.ts | 3 +- 12 files changed, 46 insertions(+), 31 deletions(-) create mode 100644 packages/application-generic/src/modules/interfaces.ts diff --git a/apps/api/src/bootstrap.ts b/apps/api/src/bootstrap.ts index 7cf551fb7b0..4e5aa65601c 100644 --- a/apps/api/src/bootstrap.ts +++ b/apps/api/src/bootstrap.ts @@ -126,7 +126,6 @@ export async function bootstrap(expressApp?): Promise { await app.listen(process.env.PORT); } - // Starts listening for shutdown hooks app.enableShutdownHooks(); Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`); diff --git a/apps/worker/src/app/workflow/services/execution-log.worker.ts b/apps/worker/src/app/workflow/services/execution-log.worker.ts index 73b51f852a0..d9efb8ede68 100644 --- a/apps/worker/src/app/workflow/services/execution-log.worker.ts +++ b/apps/worker/src/app/workflow/services/execution-log.worker.ts @@ -2,7 +2,6 @@ import { Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getExecutionLogWorkerOptions, - INovuWorker, PinoLogger, storage, Store, @@ -16,7 +15,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'ExecutionLogWorker'; @Injectable() -export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker { +export class ExecutionLogWorker extends ExecutionLogWorkerService { constructor(private createExecutionDetails: CreateExecutionDetails) { super(); this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index 9253993de62..cd0d235e820 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -1,14 +1,8 @@ const nr = require('newrelic'); import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; -import { - ExecutionDetailsSourceEnum, - ExecutionDetailsStatusEnum, - IJobData, - ObservabilityBackgroundTransactionEnum, -} from '@novu/shared'; +import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { getStandardWorkerOptions, - INovuWorker, Job, PinoLogger, StandardWorkerService, @@ -32,7 +26,7 @@ import { const LOG_CONTEXT = 'StandardWorker'; @Injectable() -export class StandardWorker extends StandardWorkerService implements INovuWorker { +export class StandardWorker extends StandardWorkerService { constructor( private handleLastFailedJob: HandleLastFailedJob, private runJob: RunJob, diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index 7bd417a00ef..960541982a0 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -2,7 +2,6 @@ import { Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getWorkflowWorkerOptions, - INovuWorker, PinoLogger, storage, Store, @@ -17,7 +16,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'WorkflowWorker'; @Injectable() -export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker { +export class WorkflowWorker extends WorkflowWorkerService { constructor(private triggerEventUsecase: TriggerEvent) { super(); diff --git a/apps/ws/src/bootstrap.ts b/apps/ws/src/bootstrap.ts index 6fe5f37425d..a19e9994ccc 100644 --- a/apps/ws/src/bootstrap.ts +++ b/apps/ws/src/bootstrap.ts @@ -45,6 +45,8 @@ export async function bootstrap() { app.useWebSocketAdapter(redisIoAdapter); + app.enableShutdownHooks(); + await app.init(); await startAppInfra(app); diff --git a/apps/ws/src/socket/services/web-socket.worker.ts b/apps/ws/src/socket/services/web-socket.worker.ts index 106a70249a1..c4af549b92b 100644 --- a/apps/ws/src/socket/services/web-socket.worker.ts +++ b/apps/ws/src/socket/services/web-socket.worker.ts @@ -1,12 +1,7 @@ const nr = require('newrelic'); import { Injectable, Logger } from '@nestjs/common'; -import { - getWebSocketWorkerOptions, - INovuWorker, - WebSocketsWorkerService, - WorkerOptions, -} from '@novu/application-generic'; +import { getWebSocketWorkerOptions, WebSocketsWorkerService, WorkerOptions } from '@novu/application-generic'; import { ExternalServicesRoute, ExternalServicesRouteCommand } from '../usecases/external-services-route'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; @@ -14,7 +9,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'WebSocketWorker'; @Injectable() -export class WebSocketWorker extends WebSocketsWorkerService implements INovuWorker { +export class WebSocketWorker extends WebSocketsWorkerService { constructor(private externalServicesRoute: ExternalServicesRoute) { super(); diff --git a/apps/ws/src/socket/ws.gateway.ts b/apps/ws/src/socket/ws.gateway.ts index 99230603978..ed5ac468450 100644 --- a/apps/ws/src/socket/ws.gateway.ts +++ b/apps/ws/src/socket/ws.gateway.ts @@ -1,17 +1,18 @@ const nr = require('newrelic'); +import { Server, Socket } from 'socket.io'; import { JwtService } from '@nestjs/jwt'; import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; import { Logger } from '@nestjs/common'; -import { Server, Socket } from 'socket.io'; import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; +import { IDestroy } from '@novu/application-generic'; import { SubscriberOnlineService } from '../shared/subscriber-online'; const LOG_CONTEXT = 'WSGateway'; @WebSocketGateway() -export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { +export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDestroy { constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {} @WebSocketServer() @@ -127,4 +128,29 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { private disconnect(socket: Socket) { socket.disconnect(); } + + async gracefulShutdown(): Promise { + try { + if (!this.server) { + Logger.error('WS server was not initialized while executing shutdown', LOG_CONTEXT); + + return; + } + + Logger.log('Closing WS server for incoming new connections', LOG_CONTEXT); + this.server.close(); + + Logger.log('Disconnecting active sockets connections', LOG_CONTEXT); + this.server.sockets.disconnectSockets(); + } catch (e) { + Logger.error(e, 'Unexpected exception was thrown while graceful shut down was executed', LOG_CONTEXT); + throw e; + } finally { + Logger.error(`Graceful shutdown down has finished`, LOG_CONTEXT); + } + } + + async onModuleDestroy(): Promise { + await this.gracefulShutdown(); + } } diff --git a/packages/application-generic/src/modules/index.ts b/packages/application-generic/src/modules/index.ts index bdbed523286..32de3e456ee 100644 --- a/packages/application-generic/src/modules/index.ts +++ b/packages/application-generic/src/modules/index.ts @@ -1,2 +1,3 @@ export { QueuesModule } from './queues.module'; export { BaseApiQueuesModule } from './queues.module'; +export * from './interfaces'; diff --git a/packages/application-generic/src/modules/interfaces.ts b/packages/application-generic/src/modules/interfaces.ts new file mode 100644 index 00000000000..4d8fa06bc51 --- /dev/null +++ b/packages/application-generic/src/modules/interfaces.ts @@ -0,0 +1,4 @@ +export interface IDestroy { + gracefulShutdown?: () => Promise; + onModuleDestroy: () => Promise; +} diff --git a/packages/application-generic/src/services/readiness/readiness.service.ts b/packages/application-generic/src/services/readiness/readiness.service.ts index 8cdf09cf0a4..1d1bdbb7fc5 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.ts @@ -5,12 +5,11 @@ import { setTimeout } from 'timers/promises'; import { Worker } from '../bull-mq'; import { IHealthIndicator } from '../../health'; +import { IDestroy } from '../../modules'; -export interface INovuWorker { +export interface INovuWorker extends IDestroy { readonly DEFAULT_ATTEMPTS: number; - gracefulShutdown: () => Promise; readonly topic: string; - onModuleDestroy: () => Promise; pause: () => Promise; resume: () => Promise; worker: Worker; diff --git a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts index 72e194db1f2..0ef52365b1b 100644 --- a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts +++ b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts @@ -1,16 +1,12 @@ import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { INovuWorker } from '../readiness'; import { WorkerBaseService } from './worker-base.service'; const LOG_CONTEXT = 'SubscriberProcessWorkerService'; @Injectable() -export class SubscriberProcessWorkerService - extends WorkerBaseService - implements INovuWorker -{ +export class SubscriberProcessWorkerService extends WorkerBaseService { constructor() { super(JobTopicNameEnum.PROCESS_SUBSCRIBER); } diff --git a/packages/application-generic/src/services/workers/worker-base.service.ts b/packages/application-generic/src/services/workers/worker-base.service.ts index fa722ca225e..6fbc0612a25 100644 --- a/packages/application-generic/src/services/workers/worker-base.service.ts +++ b/packages/application-generic/src/services/workers/worker-base.service.ts @@ -8,6 +8,7 @@ import { Worker, WorkerOptions, } from '../bull-mq'; +import { INovuWorker } from '../readiness'; const LOG_CONTEXT = 'WorkerService'; @@ -18,7 +19,7 @@ export type WorkerProcessor = export { WorkerOptions }; -export class WorkerBaseService { +export class WorkerBaseService implements INovuWorker { private instance: BullMqService; public readonly DEFAULT_ATTEMPTS = 3; From 0dfe1286fdb82d709e7fac7672472fbf8c362ee1 Mon Sep 17 00:00:00 2001 From: Gosha Date: Mon, 4 Dec 2023 20:35:24 +0200 Subject: [PATCH 06/12] feat: update after next merge --- apps/ws/src/app.module.ts | 2 - apps/ws/src/health/health.controller.ts | 2 +- apps/ws/src/health/health.module.ts | 3 +- apps/ws/src/readiness/readiness.module.ts | 47 ----------------------- 4 files changed, 2 insertions(+), 52 deletions(-) delete mode 100644 apps/ws/src/readiness/readiness.module.ts diff --git a/apps/ws/src/app.module.ts b/apps/ws/src/app.module.ts index c8398407570..fd953ff36bf 100644 --- a/apps/ws/src/app.module.ts +++ b/apps/ws/src/app.module.ts @@ -8,12 +8,10 @@ import { AppService } from './app.service'; import { SharedModule } from './shared/shared.module'; import { HealthModule } from './health/health.module'; import { SocketModule } from './socket/socket.module'; -import { ReadinessModule } from './readiness/readiness.module'; const packageJson = require('../package.json'); const modules = [ - ReadinessModule, SharedModule, HealthModule, SocketModule, diff --git a/apps/ws/src/health/health.controller.ts b/apps/ws/src/health/health.controller.ts index 36cc64ceb13..8075d118834 100644 --- a/apps/ws/src/health/health.controller.ts +++ b/apps/ws/src/health/health.controller.ts @@ -8,7 +8,7 @@ import { version } from '../../package.json'; export class HealthController { constructor( private healthCheckService: HealthCheckService, - @Inject('INDICATOR_LIST') private indicators: IHealthIndicator[] + @Inject('QUEUE_HEALTH_INDICATORS') private indicators: IHealthIndicator[] ) {} @Get() diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index f10d0cb383f..86d06e49dc8 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -3,12 +3,11 @@ import { TerminusModule } from '@nestjs/terminus'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; -import { ReadinessModule } from '../readiness/readiness.module'; const PROVIDERS = []; @Module({ - imports: [TerminusModule, SharedModule, ReadinessModule], + imports: [TerminusModule, SharedModule], providers: PROVIDERS, controllers: [HealthController], }) diff --git a/apps/ws/src/readiness/readiness.module.ts b/apps/ws/src/readiness/readiness.module.ts deleted file mode 100644 index f808ddec3cf..00000000000 --- a/apps/ws/src/readiness/readiness.module.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Module, Provider } from '@nestjs/common'; -import { - DalServiceHealthIndicator, - IHealthIndicator, - ReadinessService, - WebSocketsQueueService, - WebSocketsQueueServiceHealthIndicator, -} from '@novu/application-generic'; -import { WSServerHealthIndicator } from '../socket/services'; -import { SharedModule } from '../shared/shared.module'; -import { SocketModule } from '../socket/socket.module'; - -export const indicatorList = { - provide: 'INDICATOR_LIST', - useFactory: ( - wsServerHealthIndicator: WSServerHealthIndicator, - webSocketsQueueServiceHealthIndicator: WebSocketsQueueServiceHealthIndicator, - dalServiceHealthIndicator: DalServiceHealthIndicator - ) => { - const indicators: IHealthIndicator[] = [ - wsServerHealthIndicator, - webSocketsQueueServiceHealthIndicator, - dalServiceHealthIndicator, - ]; - - return indicators; - }, - inject: [WSServerHealthIndicator, WebSocketsQueueServiceHealthIndicator, DalServiceHealthIndicator], -}; - -const PROVIDERS: Provider[] = [ - WSServerHealthIndicator, - WebSocketsQueueServiceHealthIndicator, - DalServiceHealthIndicator, - - WebSocketsQueueService, - - indicatorList, - ReadinessService, -]; - -@Module({ - imports: [SharedModule, SocketModule], - providers: [...PROVIDERS], - exports: [...PROVIDERS], -}) -export class ReadinessModule {} From 96edddee3d2565a2f18d09b7bf6e39e7c6575cfb Mon Sep 17 00:00:00 2001 From: Gosha Date: Mon, 4 Dec 2023 21:27:34 +0200 Subject: [PATCH 07/12] feat: bring back dal and sw server health checks --- apps/ws/src/health/health.controller.ts | 7 ++++++- apps/ws/src/health/health.module.ts | 4 +++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/ws/src/health/health.controller.ts b/apps/ws/src/health/health.controller.ts index 8075d118834..26ac141ed20 100644 --- a/apps/ws/src/health/health.controller.ts +++ b/apps/ws/src/health/health.controller.ts @@ -1,13 +1,16 @@ import { Controller, Get, Inject } from '@nestjs/common'; import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus'; -import { IHealthIndicator } from '@novu/application-generic'; +import { DalServiceHealthIndicator, IHealthIndicator } from '@novu/application-generic'; import { version } from '../../package.json'; +import { WSServerHealthIndicator } from '../socket/services'; @Controller('v1/health-check') export class HealthController { constructor( private healthCheckService: HealthCheckService, + private dalHealthIndicator: DalServiceHealthIndicator, + private wsServerHealthIndicator: WSServerHealthIndicator, @Inject('QUEUE_HEALTH_INDICATORS') private indicators: IHealthIndicator[] ) {} @@ -18,6 +21,8 @@ export class HealthController { const result = await this.healthCheckService.check([ ...indicatorHealthCheckFunctions, + async () => this.dalHealthIndicator.isHealthy(), + async () => this.wsServerHealthIndicator.isHealthy(), async () => { return { apiVersion: { diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index 86d06e49dc8..5bfb63c3ae5 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -3,8 +3,10 @@ import { TerminusModule } from '@nestjs/terminus'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; +import { WSServerHealthIndicator } from '../socket/services'; +import { WSGateway } from '../socket/ws.gateway'; -const PROVIDERS = []; +const PROVIDERS = [WSServerHealthIndicator, WSGateway]; @Module({ imports: [TerminusModule, SharedModule], From 36533ffb615b86777e21ef53eabff2217f5faf07 Mon Sep 17 00:00:00 2001 From: Gosha Date: Mon, 4 Dec 2023 22:16:53 +0200 Subject: [PATCH 08/12] feat: init inbound worker --- .../app/inbound-parse/inbound-parse.module.ts | 27 +++++++++++++++---- apps/api/src/app/shared/shared.module.ts | 1 - 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/apps/api/src/app/inbound-parse/inbound-parse.module.ts b/apps/api/src/app/inbound-parse/inbound-parse.module.ts index 1396f677227..fbb71fb78fa 100644 --- a/apps/api/src/app/inbound-parse/inbound-parse.module.ts +++ b/apps/api/src/app/inbound-parse/inbound-parse.module.ts @@ -1,5 +1,5 @@ -import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; -import { CompileTemplate } from '@novu/application-generic'; +import { MiddlewareConsumer, Module, NestModule, OnApplicationShutdown } from '@nestjs/common'; +import { CompileTemplate, WorkflowInMemoryProviderService } from '@novu/application-generic'; import { USE_CASES } from './usecases'; import { InboundParseController } from './inbound-parse.controller'; @@ -7,15 +7,32 @@ import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase'; import { SharedModule } from '../shared/shared.module'; import { AuthModule } from '../auth/auth.module'; +import { InboundParseWorkerService } from './services/inbound-parse.worker.service'; -const PROVIDERS = [GetMxRecord, CompileTemplate]; +const PROVIDERS = [GetMxRecord, CompileTemplate, InboundParseWorkerService]; + +const memoryQueueService = { + provide: WorkflowInMemoryProviderService, + useFactory: async () => { + const memoryService = new WorkflowInMemoryProviderService(); + + await memoryService.initialize(); + + return memoryService; + }, +}; @Module({ imports: [SharedModule, AuthModule], controllers: [InboundParseController], - providers: [...PROVIDERS, ...USE_CASES], + providers: [...PROVIDERS, ...USE_CASES, memoryQueueService], exports: [...USE_CASES], }) -export class InboundParseModule implements NestModule { +export class InboundParseModule implements NestModule, OnApplicationShutdown { + constructor(private workflowInMemoryProviderService: WorkflowInMemoryProviderService) {} configure(consumer: MiddlewareConsumer): MiddlewareConsumer | void {} + + async onApplicationShutdown() { + await this.workflowInMemoryProviderService.shutdown(); + } } diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index 946edf5bc34..270688cfb6c 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -97,7 +97,6 @@ const PROVIDERS = [ imports: [ QueuesModule.forRoot([ JobTopicNameEnum.EXECUTION_LOG, - JobTopicNameEnum.WORKFLOW, JobTopicNameEnum.WEB_SOCKETS, JobTopicNameEnum.WORKFLOW, JobTopicNameEnum.INBOUND_PARSE_MAIL, From d745611eee9619035be44d669a6ec31ab12fb77c Mon Sep 17 00:00:00 2001 From: Gosha Date: Mon, 4 Dec 2023 22:17:19 +0200 Subject: [PATCH 09/12] feat: kill process in failure --- apps/ws/src/bootstrap.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/ws/src/bootstrap.ts b/apps/ws/src/bootstrap.ts index 6fe5f37425d..c6376fe7693 100644 --- a/apps/ws/src/bootstrap.ts +++ b/apps/ws/src/bootstrap.ts @@ -47,7 +47,11 @@ export async function bootstrap() { await app.init(); - await startAppInfra(app); + try { + await startAppInfra(app); + } catch (e) { + process.exit(1); + } await app.listen(process.env.PORT as string); } From 60deaad1b12db0725cbfe205fe80fb5043613143 Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 5 Dec 2023 00:12:45 +0200 Subject: [PATCH 10/12] fix: remove duplicated sw gateway --- apps/ws/src/health/health.module.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index 5bfb63c3ae5..1cd3d409ad7 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -4,12 +4,12 @@ import { TerminusModule } from '@nestjs/terminus'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; import { WSServerHealthIndicator } from '../socket/services'; -import { WSGateway } from '../socket/ws.gateway'; +import { SocketModule } from '../socket/socket.module'; -const PROVIDERS = [WSServerHealthIndicator, WSGateway]; +const PROVIDERS = [WSServerHealthIndicator]; @Module({ - imports: [TerminusModule, SharedModule], + imports: [TerminusModule, SharedModule, SocketModule], providers: PROVIDERS, controllers: [HealthController], }) From 1ccd8926188b02f8a8418559e0fa9498742aa8bb Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 5 Dec 2023 12:20:01 +0200 Subject: [PATCH 11/12] feat: revert is online support and skip update on shutdown --- .../subscriber-online.service.ts | 8 ++++-- apps/ws/src/socket/ws.gateway.ts | 25 +++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts index 7f05fec5b63..513e6578767 100644 --- a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts +++ b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts @@ -17,9 +17,13 @@ export class SubscriberOnlineService { await this.updateOnlineStatus(subscriber, { isOnline }); } - async handleDisconnection(subscriber: ISubscriberJwt) { + async handleDisconnection(subscriber: ISubscriberJwt, activeConnections: number) { const lastOnlineAt = new Date().toISOString(); - const isOnline = false; + let isOnline = false; + + if (activeConnections > 1) { + isOnline = true; + } await this.updateOnlineStatus(subscriber, { isOnline, lastOnlineAt }); } diff --git a/apps/ws/src/socket/ws.gateway.ts b/apps/ws/src/socket/ws.gateway.ts index ed5ac468450..7307096529b 100644 --- a/apps/ws/src/socket/ws.gateway.ts +++ b/apps/ws/src/socket/ws.gateway.ts @@ -13,6 +13,8 @@ const LOG_CONTEXT = 'WSGateway'; @WebSocketGateway() export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDestroy { + private isShutdown = false; + constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {} @WebSocketServer() @@ -83,7 +85,18 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDes } } + /* + * This method is called when a client disconnects from the server. + * * When a shutdown is in progress, we opt out of updating the subscriber status, + * assuming that when the current instance goes down, another instance will take its place and handle the subscriber status update. + */ private async processDisconnectionRequest(connection: Socket) { + if (!this.isShutdown) { + await this.handlerSubscriberDisconnection(connection); + } + } + + private async handlerSubscriberDisconnection(connection: Socket) { const token = this.extractToken(connection); if (!token || token === 'null') { @@ -95,7 +108,14 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDes return; } - await this.subscriberOnlineService.handleDisconnection(subscriber); + const activeConnections = await this.getActiveConnections(connection, subscriber._id); + await this.subscriberOnlineService.handleDisconnection(subscriber, activeConnections); + } + + private async getActiveConnections(socket: Socket, subscriberId: string) { + const activeSockets = await socket.in(subscriberId).fetchSockets(); + + return activeSockets.length; } private async processConnectionRequest(connection: Socket) { @@ -146,11 +166,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDes Logger.error(e, 'Unexpected exception was thrown while graceful shut down was executed', LOG_CONTEXT); throw e; } finally { - Logger.error(`Graceful shutdown down has finished`, LOG_CONTEXT); + Logger.log(`Graceful shutdown down has finished`, LOG_CONTEXT); } } async onModuleDestroy(): Promise { + this.isShutdown = true; await this.gracefulShutdown(); } } From 773dde0d63692fc79224190f75a02c578a1ec493 Mon Sep 17 00:00:00 2001 From: Dima Grossman Date: Tue, 5 Dec 2023 14:06:16 +0200 Subject: [PATCH 12/12] chore(providers): Update axios versions (#4940) * chore(providers): Update kannel axios * refactor(providers): remove unused imports * fix: axios for twillio --- libs/dal/package.json | 1 - packages/node/package.json | 1 - packages/stateless/package.json | 1 - pnpm-lock.yaml | 46 +++++++++++++++++++++------------ providers/kannel/jest.config.js | 3 +++ providers/kannel/package.json | 2 +- providers/sendgrid/package.json | 2 +- providers/twilio/jest.config.js | 5 +++- providers/twilio/package.json | 2 +- 9 files changed, 39 insertions(+), 24 deletions(-) diff --git a/libs/dal/package.json b/libs/dal/package.json index da644740d84..e5dba0264ff 100644 --- a/libs/dal/package.json +++ b/libs/dal/package.json @@ -25,7 +25,6 @@ "@aws-sdk/s3-request-presigner": "^3.382.0", "@faker-js/faker": "^6.0.0", "@novu/shared": "^0.22.0", - "@sendgrid/mail": "^7.4.2", "JSONStream": "^1.3.5", "archiver": "^5.0.0", "async": "^3.2.0", diff --git a/packages/node/package.json b/packages/node/package.json index 06b137e0027..a702498edf1 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -50,7 +50,6 @@ "uuid": "^9.0.1" }, "devDependencies": { - "@sendgrid/mail": "^7.4.6", "@types/jest": "29.5.2", "@types/lodash.get": "^4.4.6", "@types/lodash.merge": "^4.6.6", diff --git a/packages/stateless/package.json b/packages/stateless/package.json index 5ac47dedc3a..8441733ff74 100644 --- a/packages/stateless/package.json +++ b/packages/stateless/package.json @@ -47,7 +47,6 @@ "lodash.merge": "^4.6.2" }, "devDependencies": { - "@sendgrid/mail": "^7.4.6", "@types/jest": "29.5.2", "@types/lodash.get": "^4.4.6", "@types/lodash.merge": "^4.6.6", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0dd9561b568..065bbb20aea 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1633,9 +1633,6 @@ importers: '@novu/shared': specifier: ^0.22.0 version: link:../shared - '@sendgrid/mail': - specifier: ^7.4.2 - version: 7.7.0 JSONStream: specifier: ^1.3.5 version: 1.3.5 @@ -2629,9 +2626,6 @@ importers: specifier: ^9.0.1 version: 9.0.1 devDependencies: - '@sendgrid/mail': - specifier: ^7.4.6 - version: 7.7.0 '@types/jest': specifier: 29.5.2 version: 29.5.2 @@ -3032,9 +3026,6 @@ importers: specifier: ^4.6.2 version: 4.6.2 devDependencies: - '@sendgrid/mail': - specifier: ^7.4.6 - version: 7.7.0 '@types/jest': specifier: 29.5.2 version: 29.5.2 @@ -4128,8 +4119,8 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless axios: - specifier: ^0.27.2 - version: 0.27.2 + specifier: ^1.6.2 + version: 1.6.2 devDependencies: '@istanbuljs/nyc-config-typescript': specifier: ^1.0.1 @@ -5352,7 +5343,7 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless '@sendgrid/mail': - specifier: ^7.4.6 + specifier: ^7.7.0 version: 7.7.0 devDependencies: '@istanbuljs/nyc-config-typescript': @@ -5948,8 +5939,8 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless twilio: - specifier: ^4.14.1 - version: 4.15.0 + specifier: ^4.19.3 + version: 4.19.3 devDependencies: '@istanbuljs/nyc-config-typescript': specifier: 1.0.2 @@ -20170,12 +20161,14 @@ packages: axios: 0.26.1 transitivePeerDependencies: - debug + dev: false /@sendgrid/helpers@7.7.0: resolution: {integrity: sha512-3AsAxfN3GDBcXoZ/y1mzAAbKzTtUZ5+ZrHOmWQ279AuaFXUNCh9bPnRpN504bgveTqoW+11IzPg3I0WVgDINpw==} engines: {node: '>= 6.0.0'} dependencies: deepmerge: 4.3.1 + dev: false /@sendgrid/mail@7.7.0: resolution: {integrity: sha512-5+nApPE9wINBvHSUxwOxkkQqM/IAAaBYoP9hw7WwgDNQPxraruVqHizeTitVtKGiqWCKm2mnjh4XGN3fvFLqaw==} @@ -20185,6 +20178,7 @@ packages: '@sendgrid/helpers': 7.7.0 transitivePeerDependencies: - debug + dev: false /@sentry-internal/tracing@7.47.0: resolution: {integrity: sha512-udpHnCzF8DQsWf0gQwd0XFGp6Y8MOiwnl8vGt2ohqZGS3m1+IxoRLXsSkD8qmvN6KKDnwbaAvYnK0z0L+AW95g==} @@ -23650,7 +23644,7 @@ packages: dependencies: '@storybook/client-logger': 7.4.2 memoizerific: 1.11.3 - qs: 6.11.1 + qs: 6.11.2 react: 17.0.2 react-dom: 17.0.2(react@17.0.2) @@ -27568,6 +27562,7 @@ packages: follow-redirects: 1.15.2 transitivePeerDependencies: - debug + dev: false /axios@0.27.2: resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} @@ -32336,7 +32331,7 @@ packages: eslint: 8.51.0 eslint-import-resolver-node: 0.3.7 eslint-module-utils: 2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-webpack@0.13.7)(eslint@8.51.0) - has: 1.0.3 + has: 1.0.4 is-core-module: 2.13.0 is-glob: 4.0.3 minimatch: 3.1.2 @@ -36780,7 +36775,7 @@ packages: pretty-format: 27.5.1 slash: 3.0.0 strip-json-comments: 3.1.1 - ts-node: 10.9.1(@types/node@16.11.7)(typescript@4.9.5) + ts-node: 10.9.1(@types/node@14.18.42)(typescript@4.9.5) transitivePeerDependencies: - bufferutil - canvas @@ -49415,6 +49410,23 @@ packages: - supports-color dev: false + /twilio@4.19.3: + resolution: {integrity: sha512-3X5Czl9Vg4QFl+2pnfMQ+H8YfEDQ4WeuAmqjUpbK65x0DfmxTCHuPEFWUKVZCJZew6iltJB/1whhVvIKETe54A==} + engines: {node: '>=14.0'} + dependencies: + axios: 1.6.2 + dayjs: 1.11.9 + https-proxy-agent: 5.0.1 + jsonwebtoken: 9.0.0 + qs: 6.11.2 + scmp: 2.1.0 + url-parse: 1.5.10 + xmlbuilder: 13.0.2 + transitivePeerDependencies: + - debug + - supports-color + dev: false + /type-check@0.3.2: resolution: {integrity: sha512-ZCmOJdvOWDBYJlzAoFkC+Q0+bUyEOS1ltgp1MGU03fqHG+dbi9tBFU2Rd9QKiDZFAYrhPh2JUf7rZRIuHRKtOg==} engines: {node: '>= 0.8.0'} diff --git a/providers/kannel/jest.config.js b/providers/kannel/jest.config.js index e86e13bab91..61faa20934a 100644 --- a/providers/kannel/jest.config.js +++ b/providers/kannel/jest.config.js @@ -2,4 +2,7 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', + moduleNameMapper: { + axios: 'axios/dist/node/axios.cjs', + }, }; diff --git a/providers/kannel/package.json b/providers/kannel/package.json index 6e2138c03c0..5c803bab1ad 100644 --- a/providers/kannel/package.json +++ b/providers/kannel/package.json @@ -33,7 +33,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "axios": "^0.27.2" + "axios": "^1.6.2" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "^1.0.1", diff --git a/providers/sendgrid/package.json b/providers/sendgrid/package.json index 1518b725c56..9ab67d9274d 100644 --- a/providers/sendgrid/package.json +++ b/providers/sendgrid/package.json @@ -46,7 +46,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "@sendgrid/mail": "^7.4.6" + "@sendgrid/mail": "^7.7.0" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "1.0.2", diff --git a/providers/twilio/jest.config.js b/providers/twilio/jest.config.js index 8cbf8940ccd..61faa20934a 100644 --- a/providers/twilio/jest.config.js +++ b/providers/twilio/jest.config.js @@ -2,4 +2,7 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', -}; \ No newline at end of file + moduleNameMapper: { + axios: 'axios/dist/node/axios.cjs', + }, +}; diff --git a/providers/twilio/package.json b/providers/twilio/package.json index a3afcb1fec9..94f69422949 100644 --- a/providers/twilio/package.json +++ b/providers/twilio/package.json @@ -45,7 +45,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "twilio": "^4.14.1" + "twilio": "^4.19.3" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "1.0.2",