diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index 12e2a15f055..eb5b045a1f1 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -99,7 +99,6 @@ const PROVIDERS = [ imports: [ QueuesModule.forRoot([ JobTopicNameEnum.EXECUTION_LOG, - JobTopicNameEnum.WORKFLOW, JobTopicNameEnum.WEB_SOCKETS, JobTopicNameEnum.WORKFLOW, JobTopicNameEnum.INBOUND_PARSE_MAIL, diff --git a/apps/api/src/bootstrap.ts b/apps/api/src/bootstrap.ts index 7cf551fb7b0..4e5aa65601c 100644 --- a/apps/api/src/bootstrap.ts +++ b/apps/api/src/bootstrap.ts @@ -126,7 +126,6 @@ export async function bootstrap(expressApp?): Promise { await app.listen(process.env.PORT); } - // Starts listening for shutdown hooks app.enableShutdownHooks(); Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`); diff --git a/apps/worker/src/app/workflow/services/execution-log.worker.ts b/apps/worker/src/app/workflow/services/execution-log.worker.ts index 62583b96ef1..2ba1dbe65b1 100644 --- a/apps/worker/src/app/workflow/services/execution-log.worker.ts +++ b/apps/worker/src/app/workflow/services/execution-log.worker.ts @@ -2,7 +2,6 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getExecutionLogWorkerOptions, - INovuWorker, PinoLogger, storage, Store, @@ -18,13 +17,12 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'ExecutionLogWorker'; @Injectable() -export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker { +export class ExecutionLogWorker extends ExecutionLogWorkerService { constructor( private createExecutionDetails: CreateExecutionDetails, public workflowInMemoryProviderService: WorkflowInMemoryProviderService ) { super(new BullMqService(workflowInMemoryProviderService)); - this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); } diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index 62c61eb8f3f..cbe4a39a94b 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -4,7 +4,6 @@ import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { BullMqService, getStandardWorkerOptions, - INovuWorker, Job, PinoLogger, StandardWorkerService, @@ -29,7 +28,7 @@ import { const LOG_CONTEXT = 'StandardWorker'; @Injectable() -export class StandardWorker extends StandardWorkerService implements INovuWorker { +export class StandardWorker extends StandardWorkerService { constructor( private handleLastFailedJob: HandleLastFailedJob, private runJob: RunJob, diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index c9e2cc0c6d8..822bb7b3a63 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -1,8 +1,7 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getWorkflowWorkerOptions, - INovuWorker, PinoLogger, storage, Store, @@ -19,7 +18,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'WorkflowWorker'; @Injectable() -export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker { +export class WorkflowWorker extends WorkflowWorkerService { constructor( private triggerEventUsecase: TriggerEvent, public workflowInMemoryProviderService: WorkflowInMemoryProviderService diff --git a/apps/worker/src/bootstrap.ts b/apps/worker/src/bootstrap.ts index 8bb26ef68be..7b93f27a1a1 100644 --- a/apps/worker/src/bootstrap.ts +++ b/apps/worker/src/bootstrap.ts @@ -66,12 +66,11 @@ export async function bootstrap(): Promise { app.use(bodyParser.json()); app.use(bodyParser.urlencoded({ extended: true })); - // Starts listening for shutdown hooks app.enableShutdownHooks(); Logger.log('BOOTSTRAPPED SUCCESSFULLY'); - await app.listen(process.env.PORT); + await app.init(); try { await startAppInfra(app); @@ -79,6 +78,8 @@ export async function bootstrap(): Promise { process.exit(1); } + await app.listen(process.env.PORT); + Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`); return app; diff --git a/apps/ws/src/bootstrap.ts b/apps/ws/src/bootstrap.ts index 3cf976b09de..ba346c44e93 100644 --- a/apps/ws/src/bootstrap.ts +++ b/apps/ws/src/bootstrap.ts @@ -10,6 +10,7 @@ import helmet from 'helmet'; import { BullMqService } from '@novu/application-generic'; import { version } from '../package.json'; import { getErrorInterceptor, Logger } from '@novu/application-generic'; +import { prepareAppInfra, startAppInfra } from './socket/services'; if (process.env.SENTRY_DSN) { Sentry.init({ @@ -27,6 +28,8 @@ export async function bootstrap() { app.useLogger(app.get(Logger)); app.flushLogs(); + await prepareAppInfra(app); + app.useGlobalInterceptors(getErrorInterceptor()); app.setGlobalPrefix(CONTEXT_PATH); @@ -42,5 +45,15 @@ export async function bootstrap() { app.useWebSocketAdapter(redisIoAdapter); + app.enableShutdownHooks(); + + await app.init(); + + try { + await startAppInfra(app); + } catch (e) { + process.exit(1); + } + await app.listen(process.env.PORT as string); } diff --git a/apps/ws/src/health/health.controller.ts b/apps/ws/src/health/health.controller.ts index 115395e0638..26ac141ed20 100644 --- a/apps/ws/src/health/health.controller.ts +++ b/apps/ws/src/health/health.controller.ts @@ -1,26 +1,28 @@ -import { Controller, Get } from '@nestjs/common'; -import { HealthCheck, HealthCheckResult, HealthCheckService, HttpHealthIndicator } from '@nestjs/terminus'; -import { DalServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator } from '@novu/application-generic'; +import { Controller, Get, Inject } from '@nestjs/common'; +import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus'; +import { DalServiceHealthIndicator, IHealthIndicator } from '@novu/application-generic'; import { version } from '../../package.json'; -import { WSHealthIndicator } from '../socket/services'; +import { WSServerHealthIndicator } from '../socket/services'; @Controller('v1/health-check') export class HealthController { constructor( private healthCheckService: HealthCheckService, private dalHealthIndicator: DalServiceHealthIndicator, - private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator, - private wsHealthIndicator: WSHealthIndicator + private wsServerHealthIndicator: WSServerHealthIndicator, + @Inject('QUEUE_HEALTH_INDICATORS') private indicators: IHealthIndicator[] ) {} @Get() @HealthCheck() async healthCheck(): Promise { + const indicatorHealthCheckFunctions = this.indicators.map((indicator) => async () => indicator.isHealthy()); + const result = await this.healthCheckService.check([ + ...indicatorHealthCheckFunctions, async () => this.dalHealthIndicator.isHealthy(), - async () => this.webSocketsQueueHealthIndicator.isHealthy(), - async () => this.wsHealthIndicator.isHealthy(), + async () => this.wsServerHealthIndicator.isHealthy(), async () => { return { apiVersion: { diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index 38e17e65d62..1cd3d409ad7 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -3,13 +3,13 @@ import { TerminusModule } from '@nestjs/terminus'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; -import { WSGateway } from '../socket/ws.gateway'; -import { WSHealthIndicator } from '../socket/services'; +import { WSServerHealthIndicator } from '../socket/services'; +import { SocketModule } from '../socket/socket.module'; -const PROVIDERS = [WSHealthIndicator, WSGateway]; +const PROVIDERS = [WSServerHealthIndicator]; @Module({ - imports: [TerminusModule, SharedModule], + imports: [TerminusModule, SharedModule, SocketModule], providers: PROVIDERS, controllers: [HealthController], }) diff --git a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts index a66029d724e..513e6578767 100644 --- a/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts +++ b/apps/ws/src/shared/subscriber-online/subscriber-online.service.ts @@ -20,6 +20,7 @@ export class SubscriberOnlineService { async handleDisconnection(subscriber: ISubscriberJwt, activeConnections: number) { const lastOnlineAt = new Date().toISOString(); let isOnline = false; + if (activeConnections > 1) { isOnline = true; } diff --git a/apps/ws/src/socket/services/cold-start.service.ts b/apps/ws/src/socket/services/cold-start.service.ts new file mode 100644 index 00000000000..3c75b823037 --- /dev/null +++ b/apps/ws/src/socket/services/cold-start.service.ts @@ -0,0 +1,24 @@ +import { INestApplication } from '@nestjs/common'; +import { INovuWorker, ReadinessService } from '@novu/application-generic'; +import { WebSocketWorker } from './web-socket.worker'; + +const getWorkers = (app: INestApplication): INovuWorker[] => { + const webSocketWorker = app.get(WebSocketWorker, { strict: false }); + + const workers: INovuWorker[] = [webSocketWorker]; + + return workers; +}; + +export const prepareAppInfra = async (app: INestApplication): Promise => { + const readinessService = app.get(ReadinessService); + const workers = getWorkers(app); + + await readinessService.pauseWorkers(workers); +}; + +export const startAppInfra = async (app: INestApplication): Promise => { + const readinessService = app.get(ReadinessService); + const workers = getWorkers(app); + await readinessService.enableWorkers(workers); +}; diff --git a/apps/ws/src/socket/services/index.ts b/apps/ws/src/socket/services/index.ts index 9c5f11999e9..ea05245c3c4 100644 --- a/apps/ws/src/socket/services/index.ts +++ b/apps/ws/src/socket/services/index.ts @@ -1,2 +1,3 @@ export { WebSocketWorker } from './web-socket.worker'; -export { WSHealthIndicator } from './ws-health-indicator.service'; +export { WSServerHealthIndicator } from './ws-server-health-indicator.service'; +export { prepareAppInfra, startAppInfra } from './cold-start.service'; diff --git a/apps/ws/src/socket/services/web-socket.worker.ts b/apps/ws/src/socket/services/web-socket.worker.ts index b216193b737..ff3f532fa26 100644 --- a/apps/ws/src/socket/services/web-socket.worker.ts +++ b/apps/ws/src/socket/services/web-socket.worker.ts @@ -4,7 +4,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { BullMqService, getWebSocketWorkerOptions, - INovuWorker, WebSocketsWorkerService, WorkerOptions, WorkflowInMemoryProviderService, @@ -16,7 +15,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'WebSocketWorker'; @Injectable() -export class WebSocketWorker extends WebSocketsWorkerService implements INovuWorker { +export class WebSocketWorker extends WebSocketsWorkerService { constructor( private externalServicesRoute: ExternalServicesRoute, private workflowInMemoryProviderService: WorkflowInMemoryProviderService diff --git a/apps/ws/src/socket/services/ws-health-indicator.service.ts b/apps/ws/src/socket/services/ws-server-health-indicator.service.ts similarity index 80% rename from apps/ws/src/socket/services/ws-health-indicator.service.ts rename to apps/ws/src/socket/services/ws-server-health-indicator.service.ts index f9be8125eb9..46d29d955ae 100644 --- a/apps/ws/src/socket/services/ws-health-indicator.service.ts +++ b/apps/ws/src/socket/services/ws-server-health-indicator.service.ts @@ -6,8 +6,8 @@ import { IHealthIndicator } from '@novu/application-generic'; import { WSGateway } from '../ws.gateway'; @Injectable() -export class WSHealthIndicator extends HealthIndicator implements IHealthIndicator { - private INDICATOR_KEY = 'ws'; +export class WSServerHealthIndicator extends HealthIndicator implements IHealthIndicator { + private INDICATOR_KEY = 'ws-server'; constructor(private wsGateway: WSGateway) { super(); diff --git a/apps/ws/src/socket/socket.module.ts b/apps/ws/src/socket/socket.module.ts index 2acb4d51d56..e2cf7dce103 100644 --- a/apps/ws/src/socket/socket.module.ts +++ b/apps/ws/src/socket/socket.module.ts @@ -1,5 +1,5 @@ -import { Inject, Module, OnApplicationShutdown, OnModuleInit, Provider } from '@nestjs/common'; -import { WebSocketsWorkerService, WorkflowInMemoryProviderService } from '@novu/application-generic'; +import { Module, OnApplicationShutdown, Provider } from '@nestjs/common'; +import { WorkflowInMemoryProviderService } from '@novu/application-generic'; import { WSGateway } from './ws.gateway'; import { SharedModule } from '../shared/shared.module'; @@ -9,7 +9,7 @@ import { WebSocketWorker } from './services'; const USE_CASES: Provider[] = [ExternalServicesRoute]; -const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker]; +const PROVIDERS: Provider[] = [WSGateway, WebSocketWorker]; const memoryQueueService = { provide: WorkflowInMemoryProviderService, diff --git a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts index cff06088138..901c42b0080 100644 --- a/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts +++ b/apps/ws/src/socket/usecases/external-services-route/external-services-route.usecase.ts @@ -60,12 +60,6 @@ export class ExternalServicesRoute { return; } - const connection = await this.connectionExist(command); - - if (!connection) { - return; - } - let unreadCount = this.extractCount(command.payload?.unreadCount); if (unreadCount === undefined) { @@ -91,12 +85,6 @@ export class ExternalServicesRoute { return; } - const connection = await this.connectionExist(command); - - if (!connection) { - return; - } - let unseenCount = this.extractCount(command.payload?.unseenCount); if (unseenCount === undefined) { diff --git a/apps/ws/src/socket/ws.gateway.ts b/apps/ws/src/socket/ws.gateway.ts index f6f451db552..7307096529b 100644 --- a/apps/ws/src/socket/ws.gateway.ts +++ b/apps/ws/src/socket/ws.gateway.ts @@ -1,17 +1,20 @@ const nr = require('newrelic'); +import { Server, Socket } from 'socket.io'; import { JwtService } from '@nestjs/jwt'; import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; import { Logger } from '@nestjs/common'; -import { Server, Socket } from 'socket.io'; import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; +import { IDestroy } from '@novu/application-generic'; import { SubscriberOnlineService } from '../shared/subscriber-online'; const LOG_CONTEXT = 'WSGateway'; @WebSocketGateway() -export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { +export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDestroy { + private isShutdown = false; + constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {} @WebSocketServer() @@ -82,7 +85,18 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { } } + /* + * This method is called when a client disconnects from the server. + * * When a shutdown is in progress, we opt out of updating the subscriber status, + * assuming that when the current instance goes down, another instance will take its place and handle the subscriber status update. + */ private async processDisconnectionRequest(connection: Socket) { + if (!this.isShutdown) { + await this.handlerSubscriberDisconnection(connection); + } + } + + private async handlerSubscriberDisconnection(connection: Socket) { const token = this.extractToken(connection); if (!token || token === 'null') { @@ -98,6 +112,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { await this.subscriberOnlineService.handleDisconnection(subscriber, activeConnections); } + private async getActiveConnections(socket: Socket, subscriberId: string) { + const activeSockets = await socket.in(subscriberId).fetchSockets(); + + return activeSockets.length; + } + private async processConnectionRequest(connection: Socket) { const token = this.extractToken(connection); @@ -129,9 +149,29 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect { socket.disconnect(); } - private async getActiveConnections(socket: Socket, subscriberId: string) { - const activeSockets = await socket.in(subscriberId).fetchSockets(); + async gracefulShutdown(): Promise { + try { + if (!this.server) { + Logger.error('WS server was not initialized while executing shutdown', LOG_CONTEXT); - return activeSockets.length; + return; + } + + Logger.log('Closing WS server for incoming new connections', LOG_CONTEXT); + this.server.close(); + + Logger.log('Disconnecting active sockets connections', LOG_CONTEXT); + this.server.sockets.disconnectSockets(); + } catch (e) { + Logger.error(e, 'Unexpected exception was thrown while graceful shut down was executed', LOG_CONTEXT); + throw e; + } finally { + Logger.log(`Graceful shutdown down has finished`, LOG_CONTEXT); + } + } + + async onModuleDestroy(): Promise { + this.isShutdown = true; + await this.gracefulShutdown(); } } diff --git a/libs/dal/package.json b/libs/dal/package.json index da644740d84..e5dba0264ff 100644 --- a/libs/dal/package.json +++ b/libs/dal/package.json @@ -25,7 +25,6 @@ "@aws-sdk/s3-request-presigner": "^3.382.0", "@faker-js/faker": "^6.0.0", "@novu/shared": "^0.22.0", - "@sendgrid/mail": "^7.4.2", "JSONStream": "^1.3.5", "archiver": "^5.0.0", "async": "^3.2.0", diff --git a/packages/application-generic/src/health/dal.health-indicator.ts b/packages/application-generic/src/health/dal.health-indicator.ts index 569e6d0f101..d4e4dab7794 100644 --- a/packages/application-generic/src/health/dal.health-indicator.ts +++ b/packages/application-generic/src/health/dal.health-indicator.ts @@ -1,9 +1,13 @@ import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus'; import { Injectable } from '@nestjs/common'; import { DalService } from '@novu/dal'; +import { IHealthIndicator } from './health-indicator.interface'; @Injectable() -export class DalServiceHealthIndicator extends HealthIndicator { +export class DalServiceHealthIndicator + extends HealthIndicator + implements IHealthIndicator +{ private INDICATOR_KEY = 'db'; constructor(private dalService: DalService) { @@ -15,4 +19,8 @@ export class DalServiceHealthIndicator extends HealthIndicator { return this.getStatus(this.INDICATOR_KEY, status); } + + isActive(): Promise { + return this.isHealthy(); + } } diff --git a/packages/application-generic/src/modules/index.ts b/packages/application-generic/src/modules/index.ts index c9d351ed23c..1ed7afc3bd1 100644 --- a/packages/application-generic/src/modules/index.ts +++ b/packages/application-generic/src/modules/index.ts @@ -1,2 +1,3 @@ export { QueuesModule } from './queues.module'; export { MetricsModule } from './metrics.module'; +export * from './interfaces'; diff --git a/packages/application-generic/src/modules/interfaces.ts b/packages/application-generic/src/modules/interfaces.ts new file mode 100644 index 00000000000..4d8fa06bc51 --- /dev/null +++ b/packages/application-generic/src/modules/interfaces.ts @@ -0,0 +1,4 @@ +export interface IDestroy { + gracefulShutdown?: () => Promise; + onModuleDestroy: () => Promise; +} diff --git a/packages/application-generic/src/services/readiness/readiness.service.ts b/packages/application-generic/src/services/readiness/readiness.service.ts index fa7336fce10..bf1b742ea64 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.ts @@ -1,14 +1,15 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; +import { HealthIndicatorResult, HealthIndicatorStatus } from '@nestjs/terminus'; +import { setTimeout } from 'timers/promises'; import { Worker } from '../bull-mq'; -import { setTimeout } from 'timers/promises'; -import { QueueHealthIndicator } from '../../health/queue-health-indicator.service'; -export interface INovuWorker { +import { IHealthIndicator } from '../../health'; +import { IDestroy } from '../../modules'; + +export interface INovuWorker extends IDestroy { readonly DEFAULT_ATTEMPTS: number; - gracefulShutdown: () => Promise; readonly topic: string; - onModuleDestroy: () => Promise; pause: () => Promise; resume: () => Promise; worker: Worker; @@ -20,7 +21,7 @@ const LOG_CONTEXT = 'ReadinessService'; export class ReadinessService { constructor( @Inject('QUEUE_HEALTH_INDICATORS') - private healthIndicators: QueueHealthIndicator[] + private healthIndicators: IHealthIndicator[] ) {} async areQueuesEnabled(): Promise { @@ -37,10 +38,7 @@ export class ReadinessService { } Logger.warn( - { - attempt: i, - message: `Some health indicator returned false when checking if queues are enabled ${i}/${retries}`, - }, + `Some health indicator returned false when checking if queues are enabled ${i}/${retries}`, LOG_CONTEXT ); @@ -56,7 +54,11 @@ export class ReadinessService { this.healthIndicators.map((health) => health.isHealthy()) ); - return healths.every((health) => !!health === true); + const statuses = healths.map( + (health: HealthIndicatorResult) => Object.values(health)[0].status + ); + + return statuses.every((status: HealthIndicatorStatus) => status === 'up'); } catch (error) { Logger.error( error, diff --git a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts index ab94787ae20..7ffad852290 100644 --- a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts +++ b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts @@ -16,6 +16,5 @@ export class ActiveJobsMetricWorkerService extends WorkerBaseService { JobTopicNameEnum.ACTIVE_JOBS_METRIC, new BullMqService(workflowInMemoryProvider) ); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts new file mode 100644 index 00000000000..e69de29bb2d diff --git a/packages/application-generic/src/services/workers/execution-log-worker.service.ts b/packages/application-generic/src/services/workers/execution-log-worker.service.ts index ba7ecc2fc84..a2c77527a47 100644 --- a/packages/application-generic/src/services/workers/execution-log-worker.service.ts +++ b/packages/application-generic/src/services/workers/execution-log-worker.service.ts @@ -1,15 +1,12 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; import { BullMqService } from '../bull-mq'; -import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'ExecutionLogWorkerService'; export class ExecutionLogWorkerService extends WorkerBaseService { constructor(public bullMqService: BullMqService) { super(JobTopicNameEnum.EXECUTION_LOG, bullMqService); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts b/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts new file mode 100644 index 00000000000..e69de29bb2d diff --git a/packages/application-generic/src/services/workers/standard-worker.service.ts b/packages/application-generic/src/services/workers/standard-worker.service.ts index 30375172d23..28c5bab4f49 100644 --- a/packages/application-generic/src/services/workers/standard-worker.service.ts +++ b/packages/application-generic/src/services/workers/standard-worker.service.ts @@ -1,15 +1,12 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; import { BullMqService } from '../bull-mq'; -import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'StandardWorkerService'; export class StandardWorkerService extends WorkerBaseService { constructor(public bullMqService: BullMqService) { super(JobTopicNameEnum.STANDARD, bullMqService); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts index 6b7ae89e466..b9ed28c01c5 100644 --- a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts +++ b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts @@ -1,19 +1,12 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { INovuWorker } from '../readiness'; import { WorkerBaseService } from './worker-base.service'; import { BullMqService } from '../bull-mq'; -import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'SubscriberProcessWorkerService'; -export class SubscriberProcessWorkerService - extends WorkerBaseService - implements INovuWorker -{ +export class SubscriberProcessWorkerService extends WorkerBaseService { constructor(private bullMqService: BullMqService) { super(JobTopicNameEnum.PROCESS_SUBSCRIBER, bullMqService); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts index 1cffb0dca12..87bf8efd7c4 100644 --- a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts +++ b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts @@ -1,4 +1,3 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; @@ -9,6 +8,5 @@ const LOG_CONTEXT = 'WebSocketsWorkerService'; export class WebSocketsWorkerService extends WorkerBaseService { constructor(public bullMqService: BullMqService) { super(JobTopicNameEnum.WEB_SOCKETS, bullMqService); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/worker-base.service.ts b/packages/application-generic/src/services/workers/worker-base.service.ts index 45a585c2425..1bcb1b033f9 100644 --- a/packages/application-generic/src/services/workers/worker-base.service.ts +++ b/packages/application-generic/src/services/workers/worker-base.service.ts @@ -1,14 +1,8 @@ -import { IJobData, JobTopicNameEnum } from '@novu/shared'; -import { Inject, Injectable, Logger } from '@nestjs/common'; - -import { - BullMqService, - JobsOptions, - Processor, - Worker, - WorkerOptions, -} from '../bull-mq'; -import { WorkflowInMemoryProviderService } from '../in-memory-provider'; +import { JobTopicNameEnum } from '@novu/shared'; +import { Logger } from '@nestjs/common'; + +import { BullMqService, Processor, Worker, WorkerOptions } from '../bull-mq'; +import { INovuWorker } from '../readiness'; const LOG_CONTEXT = 'WorkerService'; @@ -19,7 +13,7 @@ export type WorkerProcessor = export { WorkerOptions }; -export class WorkerBaseService { +export class WorkerBaseService implements INovuWorker { private instance: BullMqService; public readonly DEFAULT_ATTEMPTS = 3; diff --git a/packages/application-generic/src/services/workers/workflow-worker.service.ts b/packages/application-generic/src/services/workers/workflow-worker.service.ts index 29e7993b9db..de00fbb325b 100644 --- a/packages/application-generic/src/services/workers/workflow-worker.service.ts +++ b/packages/application-generic/src/services/workers/workflow-worker.service.ts @@ -1,15 +1,12 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; import { BullMqService } from '../bull-mq'; -import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'WorkflowWorkerService'; export class WorkflowWorkerService extends WorkerBaseService { constructor(public bullMqService: BullMqService) { super(JobTopicNameEnum.WORKFLOW, bullMqService); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/node/package.json b/packages/node/package.json index 06b137e0027..a702498edf1 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -50,7 +50,6 @@ "uuid": "^9.0.1" }, "devDependencies": { - "@sendgrid/mail": "^7.4.6", "@types/jest": "29.5.2", "@types/lodash.get": "^4.4.6", "@types/lodash.merge": "^4.6.6", diff --git a/packages/stateless/package.json b/packages/stateless/package.json index 5ac47dedc3a..8441733ff74 100644 --- a/packages/stateless/package.json +++ b/packages/stateless/package.json @@ -47,7 +47,6 @@ "lodash.merge": "^4.6.2" }, "devDependencies": { - "@sendgrid/mail": "^7.4.6", "@types/jest": "29.5.2", "@types/lodash.get": "^4.4.6", "@types/lodash.merge": "^4.6.6", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0dd9561b568..065bbb20aea 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1633,9 +1633,6 @@ importers: '@novu/shared': specifier: ^0.22.0 version: link:../shared - '@sendgrid/mail': - specifier: ^7.4.2 - version: 7.7.0 JSONStream: specifier: ^1.3.5 version: 1.3.5 @@ -2629,9 +2626,6 @@ importers: specifier: ^9.0.1 version: 9.0.1 devDependencies: - '@sendgrid/mail': - specifier: ^7.4.6 - version: 7.7.0 '@types/jest': specifier: 29.5.2 version: 29.5.2 @@ -3032,9 +3026,6 @@ importers: specifier: ^4.6.2 version: 4.6.2 devDependencies: - '@sendgrid/mail': - specifier: ^7.4.6 - version: 7.7.0 '@types/jest': specifier: 29.5.2 version: 29.5.2 @@ -4128,8 +4119,8 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless axios: - specifier: ^0.27.2 - version: 0.27.2 + specifier: ^1.6.2 + version: 1.6.2 devDependencies: '@istanbuljs/nyc-config-typescript': specifier: ^1.0.1 @@ -5352,7 +5343,7 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless '@sendgrid/mail': - specifier: ^7.4.6 + specifier: ^7.7.0 version: 7.7.0 devDependencies: '@istanbuljs/nyc-config-typescript': @@ -5948,8 +5939,8 @@ importers: specifier: ^0.22.0 version: link:../../packages/stateless twilio: - specifier: ^4.14.1 - version: 4.15.0 + specifier: ^4.19.3 + version: 4.19.3 devDependencies: '@istanbuljs/nyc-config-typescript': specifier: 1.0.2 @@ -20170,12 +20161,14 @@ packages: axios: 0.26.1 transitivePeerDependencies: - debug + dev: false /@sendgrid/helpers@7.7.0: resolution: {integrity: sha512-3AsAxfN3GDBcXoZ/y1mzAAbKzTtUZ5+ZrHOmWQ279AuaFXUNCh9bPnRpN504bgveTqoW+11IzPg3I0WVgDINpw==} engines: {node: '>= 6.0.0'} dependencies: deepmerge: 4.3.1 + dev: false /@sendgrid/mail@7.7.0: resolution: {integrity: sha512-5+nApPE9wINBvHSUxwOxkkQqM/IAAaBYoP9hw7WwgDNQPxraruVqHizeTitVtKGiqWCKm2mnjh4XGN3fvFLqaw==} @@ -20185,6 +20178,7 @@ packages: '@sendgrid/helpers': 7.7.0 transitivePeerDependencies: - debug + dev: false /@sentry-internal/tracing@7.47.0: resolution: {integrity: sha512-udpHnCzF8DQsWf0gQwd0XFGp6Y8MOiwnl8vGt2ohqZGS3m1+IxoRLXsSkD8qmvN6KKDnwbaAvYnK0z0L+AW95g==} @@ -23650,7 +23644,7 @@ packages: dependencies: '@storybook/client-logger': 7.4.2 memoizerific: 1.11.3 - qs: 6.11.1 + qs: 6.11.2 react: 17.0.2 react-dom: 17.0.2(react@17.0.2) @@ -27568,6 +27562,7 @@ packages: follow-redirects: 1.15.2 transitivePeerDependencies: - debug + dev: false /axios@0.27.2: resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} @@ -32336,7 +32331,7 @@ packages: eslint: 8.51.0 eslint-import-resolver-node: 0.3.7 eslint-module-utils: 2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-webpack@0.13.7)(eslint@8.51.0) - has: 1.0.3 + has: 1.0.4 is-core-module: 2.13.0 is-glob: 4.0.3 minimatch: 3.1.2 @@ -36780,7 +36775,7 @@ packages: pretty-format: 27.5.1 slash: 3.0.0 strip-json-comments: 3.1.1 - ts-node: 10.9.1(@types/node@16.11.7)(typescript@4.9.5) + ts-node: 10.9.1(@types/node@14.18.42)(typescript@4.9.5) transitivePeerDependencies: - bufferutil - canvas @@ -49415,6 +49410,23 @@ packages: - supports-color dev: false + /twilio@4.19.3: + resolution: {integrity: sha512-3X5Czl9Vg4QFl+2pnfMQ+H8YfEDQ4WeuAmqjUpbK65x0DfmxTCHuPEFWUKVZCJZew6iltJB/1whhVvIKETe54A==} + engines: {node: '>=14.0'} + dependencies: + axios: 1.6.2 + dayjs: 1.11.9 + https-proxy-agent: 5.0.1 + jsonwebtoken: 9.0.0 + qs: 6.11.2 + scmp: 2.1.0 + url-parse: 1.5.10 + xmlbuilder: 13.0.2 + transitivePeerDependencies: + - debug + - supports-color + dev: false + /type-check@0.3.2: resolution: {integrity: sha512-ZCmOJdvOWDBYJlzAoFkC+Q0+bUyEOS1ltgp1MGU03fqHG+dbi9tBFU2Rd9QKiDZFAYrhPh2JUf7rZRIuHRKtOg==} engines: {node: '>= 0.8.0'} diff --git a/providers/kannel/jest.config.js b/providers/kannel/jest.config.js index e86e13bab91..61faa20934a 100644 --- a/providers/kannel/jest.config.js +++ b/providers/kannel/jest.config.js @@ -2,4 +2,7 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', + moduleNameMapper: { + axios: 'axios/dist/node/axios.cjs', + }, }; diff --git a/providers/kannel/package.json b/providers/kannel/package.json index 6e2138c03c0..5c803bab1ad 100644 --- a/providers/kannel/package.json +++ b/providers/kannel/package.json @@ -33,7 +33,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "axios": "^0.27.2" + "axios": "^1.6.2" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "^1.0.1", diff --git a/providers/sendgrid/package.json b/providers/sendgrid/package.json index 1518b725c56..9ab67d9274d 100644 --- a/providers/sendgrid/package.json +++ b/providers/sendgrid/package.json @@ -46,7 +46,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "@sendgrid/mail": "^7.4.6" + "@sendgrid/mail": "^7.7.0" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "1.0.2", diff --git a/providers/twilio/jest.config.js b/providers/twilio/jest.config.js index 8cbf8940ccd..61faa20934a 100644 --- a/providers/twilio/jest.config.js +++ b/providers/twilio/jest.config.js @@ -2,4 +2,7 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', -}; \ No newline at end of file + moduleNameMapper: { + axios: 'axios/dist/node/axios.cjs', + }, +}; diff --git a/providers/twilio/package.json b/providers/twilio/package.json index a3afcb1fec9..94f69422949 100644 --- a/providers/twilio/package.json +++ b/providers/twilio/package.json @@ -45,7 +45,7 @@ }, "dependencies": { "@novu/stateless": "^0.22.0", - "twilio": "^4.14.1" + "twilio": "^4.19.3" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "1.0.2",