From b5e152f32b0d06244e4ab7e5489d3016120bfefd Mon Sep 17 00:00:00 2001 From: Dima Grossman Date: Sun, 3 Dec 2023 17:11:45 +0200 Subject: [PATCH] refactor(worker): singleton queue and fix inject issues with queues and workers (#4929) * refactor(worker): singleton queue and fix inject issues * fix: tests * fix: tests for ws * fix: ws service bootstrap * fix: ws socket initialization * fix: inbound mail service * fix: ws tests * fix: inbound mail * fix: tests * refactor: extract active workers * fix: remove test * fix: worker test * feat: add cluster retry strategt * fix: queue for api * refactor: remove baseapi queues * fix: remove other queu module instances * fix: remove log * fix: remove unused inboud parse worker service * fix: use app shutdown toclose redis * fix: dedicate memory instance for workers * fix: flaky test * refactor: rename active workers naming * fix: pr refactor * revert: worker creation * feat: add types to worker init config --------- Co-authored-by: Gosha --- apps/api/src/.env.test | 2 +- apps/api/src/app/events/events.module.ts | 3 +- apps/api/src/app/health/health.module.ts | 4 +- .../app/inbound-parse/inbound-parse.module.ts | 7 +- ...ice.ts => inbound-parse.worker.service.ts} | 21 +-- .../app/integrations/integrations.module.ts | 3 +- apps/api/src/app/shared/shared.module.ts | 39 ++-- apps/api/src/app/widgets/widgets.module.ts | 4 +- .../src/server/inbound-mail.service.spec.ts | 1 - .../src/server/inbound-mail.service.ts | 16 +- apps/inbound-mail/src/server/index.ts | 4 +- apps/worker/src/.env.test | 2 +- .../src/app/health/health.controller.ts | 26 +-- apps/worker/src/app/health/health.module.ts | 3 +- apps/worker/src/app/shared/shared.module.ts | 50 +++-- .../active-jobs-metric.service.spec.ts | 105 ----------- .../services/active-jobs-metric.service.ts | 28 ++- .../workflow/services/cold-start.service.ts | 13 +- .../completed-jobs-metric.service.spec.ts | 102 ---------- .../services/completed-jobs-metric.service.ts | 134 ------------- .../services/execution-log.worker.spec.ts | 15 +- .../workflow/services/execution-log.worker.ts | 16 +- .../worker/src/app/workflow/services/index.ts | 1 - .../workflow/services/standard.worker.spec.ts | 31 +-- .../app/workflow/services/standard.worker.ts | 15 +- .../services/subscriber-process.worker.ts | 15 +- .../workflow/services/workflow.worker.spec.ts | 15 +- .../app/workflow/services/workflow.worker.ts | 11 +- .../src/app/workflow/workflow.module.ts | 59 +++--- apps/worker/src/config/worker-init.config.ts | 86 +++++++++ apps/ws/src/health/health.module.ts | 3 +- apps/ws/src/shared/shared.module.ts | 6 +- .../socket/services/web-socket.worker.spec.ts | 11 +- .../src/socket/services/web-socket.worker.ts | 9 +- apps/ws/src/socket/socket.module.ts | 26 ++- libs/shared/src/config/job-queue.ts | 1 - libs/testing/src/jobs.service.ts | 14 +- .../src/custom-providers/index.ts | 43 +---- ...eted-jobs-metric-queue.health-indicator.ts | 21 --- .../application-generic/src/health/index.ts | 2 +- .../application-generic/src/modules/index.ts | 1 - .../src/modules/queues.module.ts | 178 ++++++++++++------ .../services/bull-mq/bull-mq.service.spec.ts | 9 +- .../src/services/bull-mq/bull-mq.service.ts | 25 +-- .../providers/memory-db-cluster-provider.ts | 2 + .../providers/redis-provider.ts | 1 + .../active-jobs-metric-queue.service.spec.ts | 10 +- .../active-jobs-metric-queue.service.ts | 11 +- ...ompleted-jobs-metric-queue.service.spec.ts | 84 --------- .../completed-jobs-metric-queue.service.ts | 17 -- .../execution-log-queue.service.spec.ts | 10 +- .../queues/execution-log-queue.service.ts | 13 +- .../inbound-parse-queue.service.spec.ts | 10 +- .../queues/inbound-parse-queue.service.ts | 12 +- .../src/services/queues/index.ts | 2 - .../src/services/queues/queue-base.service.ts | 21 +-- .../queues/standard-queue.service.spec.ts | 10 +- .../services/queues/standard-queue.service.ts | 11 +- .../subscriber-process-queue.service.ts | 15 +- .../queues/web-sockets-queue.service.spec.ts | 10 +- .../queues/web-sockets-queue.service.ts | 11 +- .../queues/workflow-queue.service.spec.ts | 10 +- .../services/queues/workflow-queue.service.ts | 11 +- .../readiness/readiness.service.spec.ts | 29 ++- .../services/readiness/readiness.service.ts | 22 +-- .../active-jobs-metric-worker.service.ts | 11 +- .../completed-jobs-metric-worker.service.ts | 14 -- .../workers/execution-log-worker.service.ts | 7 +- .../workers/inbound-parse-worker.service.ts | 14 -- .../src/services/workers/index.ts | 4 - .../workers/standard-worker.service.ts | 7 +- .../subscriber-process-worker.service.ts | 9 +- .../workers/web-sockets-worker.service.ts | 8 +- .../services/workers/worker-base.service.ts | 12 +- .../workers/workflow-worker.service.ts | 7 +- .../select-integration.spec.ts | 8 +- .../trigger-event/trigger-event.usecase.ts | 2 - 77 files changed, 679 insertions(+), 926 deletions(-) rename apps/api/src/app/inbound-parse/services/{inbound-parse.queue.service.ts => inbound-parse.worker.service.ts} (66%) delete mode 100644 apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts delete mode 100644 apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts delete mode 100644 apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts create mode 100644 apps/worker/src/config/worker-init.config.ts delete mode 100644 packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts delete mode 100644 packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts delete mode 100644 packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts delete mode 100644 packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts delete mode 100644 packages/application-generic/src/services/workers/inbound-parse-worker.service.ts diff --git a/apps/api/src/.env.test b/apps/api/src/.env.test index 537e3774084..67226ed9c80 100644 --- a/apps/api/src/.env.test +++ b/apps/api/src/.env.test @@ -6,7 +6,7 @@ BLUEPRINT_CREATOR=645b648b36dd6d25f8650d37 CLIENT_SUCCESS_AUTH_REDIRECT=http://localhost:4200/auth/login -MONGO_URL=mongodb://localhost:27017/novu-test +MONGO_URL=mongodb://127.0.0.1:27017/novu-test REDIS_PORT=6379 REDIS_HOST=localhost REDIS_PREFIX= diff --git a/apps/api/src/app/events/events.module.ts b/apps/api/src/app/events/events.module.ts index 35d803a73fa..ab9e07e9982 100644 --- a/apps/api/src/app/events/events.module.ts +++ b/apps/api/src/app/events/events.module.ts @@ -7,7 +7,6 @@ import { GetNovuProviderCredentials, StorageHelperService, SendTestEmail, - BaseApiQueuesModule, } from '@novu/application-generic'; import { EventsController } from './events.controller'; @@ -24,6 +23,7 @@ import { ExecutionDetailsModule } from '../execution-details/execution-details.m import { TopicsModule } from '../topics/topics.module'; import { LayoutsModule } from '../layouts/layouts.module'; import { TenantModule } from '../tenant/tenant.module'; +import { JobTopicNameEnum } from '@novu/shared'; const PROVIDERS = [ CreateExecutionDetails, @@ -47,7 +47,6 @@ const PROVIDERS = [ TopicsModule, LayoutsModule, TenantModule, - BaseApiQueuesModule, ], controllers: [EventsController], providers: [...PROVIDERS, ...USE_CASES], diff --git a/apps/api/src/app/health/health.module.ts b/apps/api/src/app/health/health.module.ts index 8b00b5a1a41..b4a4002be7a 100644 --- a/apps/api/src/app/health/health.module.ts +++ b/apps/api/src/app/health/health.module.ts @@ -1,13 +1,11 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; -import { BaseApiQueuesModule } from '@novu/application-generic'; - import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; @Module({ - imports: [SharedModule, TerminusModule, BaseApiQueuesModule], + imports: [SharedModule, TerminusModule], controllers: [HealthController], providers: [], }) diff --git a/apps/api/src/app/inbound-parse/inbound-parse.module.ts b/apps/api/src/app/inbound-parse/inbound-parse.module.ts index d8209241fde..1396f677227 100644 --- a/apps/api/src/app/inbound-parse/inbound-parse.module.ts +++ b/apps/api/src/app/inbound-parse/inbound-parse.module.ts @@ -1,18 +1,17 @@ import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; -import { BaseApiQueuesModule, CompileTemplate } from '@novu/application-generic'; +import { CompileTemplate } from '@novu/application-generic'; import { USE_CASES } from './usecases'; import { InboundParseController } from './inbound-parse.controller'; -import { InboundParseQueueService } from './services/inbound-parse.queue.service'; import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase'; import { SharedModule } from '../shared/shared.module'; import { AuthModule } from '../auth/auth.module'; -const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate]; +const PROVIDERS = [GetMxRecord, CompileTemplate]; @Module({ - imports: [SharedModule, AuthModule, BaseApiQueuesModule], + imports: [SharedModule, AuthModule], controllers: [InboundParseController], providers: [...PROVIDERS, ...USE_CASES], exports: [...USE_CASES], diff --git a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts b/apps/api/src/app/inbound-parse/services/inbound-parse.worker.service.ts similarity index 66% rename from apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts rename to apps/api/src/app/inbound-parse/services/inbound-parse.worker.service.ts index 99a29e38c15..38ba9e4fc79 100644 --- a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts +++ b/apps/api/src/app/inbound-parse/services/inbound-parse.worker.service.ts @@ -1,11 +1,9 @@ import { + BullMqService, getInboundParseMailWorkerOptions, - InboundParseQueueService as InboundParseQueue, - InboundParseWorker, - Queue, - QueueOptions, - Worker, + WorkerBaseService, WorkerOptions, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { JobTopicNameEnum } from '@novu/shared'; import { Injectable, Logger } from '@nestjs/common'; @@ -16,17 +14,14 @@ import { InboundEmailParseCommand } from '../usecases/inbound-email-parse/inboun const LOG_CONTEXT = 'InboundParseQueueService'; @Injectable() -export class InboundParseQueueService { - public readonly queue: Queue; - public readonly worker: Worker; - +export class InboundParseWorkerService extends WorkerBaseService { constructor( private emailParseUsecase: InboundEmailParse, - public readonly inboundParseQueue: InboundParseQueue, - public readonly inboundParseWorker: InboundParseWorker + public workflowInMemoryProviderService: WorkflowInMemoryProviderService ) { - this.inboundParseQueue.createQueue(); - this.inboundParseWorker.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); + super(JobTopicNameEnum.INBOUND_PARSE_MAIL, new BullMqService(workflowInMemoryProviderService)); + + this.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); } private getWorkerOptions(): WorkerOptions { diff --git a/apps/api/src/app/integrations/integrations.module.ts b/apps/api/src/app/integrations/integrations.module.ts index 2e73e108954..e1ef63f12f9 100644 --- a/apps/api/src/app/integrations/integrations.module.ts +++ b/apps/api/src/app/integrations/integrations.module.ts @@ -5,9 +5,10 @@ import { USE_CASES } from './usecases'; import { IntegrationsController } from './integrations.controller'; import { AuthModule } from '../auth/auth.module'; import { CompileTemplate, CreateExecutionDetails, QueuesModule } from '@novu/application-generic'; +import { JobTopicNameEnum } from '@novu/shared'; @Module({ - imports: [SharedModule, QueuesModule, forwardRef(() => AuthModule)], + imports: [SharedModule, forwardRef(() => AuthModule)], controllers: [IntegrationsController], providers: [...USE_CASES, CompileTemplate, CreateExecutionDetails], exports: [...USE_CASES], diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index e77a17ec15e..946edf5bc34 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -1,27 +1,27 @@ import { Module } from '@nestjs/common'; import { + ChangeRepository, DalService, - UserRepository, - OrganizationRepository, EnvironmentRepository, ExecutionDetailsRepository, - NotificationTemplateRepository, - SubscriberRepository, - NotificationRepository, - MessageRepository, - NotificationGroupRepository, - MessageTemplateRepository, - MemberRepository, - LayoutRepository, - LogRepository, + FeedRepository, IntegrationRepository, - ChangeRepository, JobRepository, - FeedRepository, + LayoutRepository, + LogRepository, + MemberRepository, + MessageRepository, + MessageTemplateRepository, + NotificationGroupRepository, + NotificationRepository, + NotificationTemplateRepository, + OrganizationRepository, SubscriberPreferenceRepository, + SubscriberRepository, + TenantRepository, TopicRepository, TopicSubscribersRepository, - TenantRepository, + UserRepository, WorkflowOverrideRepository, } from '@novu/dal'; import { @@ -36,10 +36,12 @@ import { getIsTopicNotificationEnabled, InvalidateCacheService, LoggerModule, + QueuesModule, storageService, } from '@novu/application-generic'; import * as packageJson from '../../../package.json'; +import { JobTopicNameEnum } from '@novu/shared'; const DAL_MODELS = [ UserRepository, @@ -93,6 +95,13 @@ const PROVIDERS = [ @Module({ imports: [ + QueuesModule.forRoot([ + JobTopicNameEnum.EXECUTION_LOG, + JobTopicNameEnum.WORKFLOW, + JobTopicNameEnum.WEB_SOCKETS, + JobTopicNameEnum.WORKFLOW, + JobTopicNameEnum.INBOUND_PARSE_MAIL, + ]), LoggerModule.forRoot( createNestLoggingModuleOptions({ serviceName: packageJson.name, @@ -101,6 +110,6 @@ const PROVIDERS = [ ), ], providers: [...PROVIDERS], - exports: [...PROVIDERS, LoggerModule], + exports: [...PROVIDERS, LoggerModule, QueuesModule], }) export class SharedModule {} diff --git a/apps/api/src/app/widgets/widgets.module.ts b/apps/api/src/app/widgets/widgets.module.ts index 35fea2559f7..43419f3ada8 100644 --- a/apps/api/src/app/widgets/widgets.module.ts +++ b/apps/api/src/app/widgets/widgets.module.ts @@ -1,7 +1,5 @@ import { Module } from '@nestjs/common'; -import { BaseApiQueuesModule } from '@novu/application-generic'; - import { USE_CASES } from './usecases'; import { WidgetsController } from './widgets.controller'; import { SharedModule } from '../shared/shared.module'; @@ -10,7 +8,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module'; import { IntegrationModule } from '../integrations/integrations.module'; @Module({ - imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseApiQueuesModule], + imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule], providers: [...USE_CASES], exports: [...USE_CASES], controllers: [WidgetsController], diff --git a/apps/inbound-mail/src/server/inbound-mail.service.spec.ts b/apps/inbound-mail/src/server/inbound-mail.service.spec.ts index dbe2b925e00..e65e1343615 100644 --- a/apps/inbound-mail/src/server/inbound-mail.service.spec.ts +++ b/apps/inbound-mail/src/server/inbound-mail.service.spec.ts @@ -24,7 +24,6 @@ describe('Inbound Mail Service', () => { it('should be initialised properly', async () => { expect(inboundMailService).to.be.ok; - expect(inboundMailService).to.have.all.keys('inboundParseQueueService'); expect(inboundMailService.inboundParseQueueService.DEFAULT_ATTEMPTS).to.equal(3); expect(inboundMailService.inboundParseQueueService.topic).to.equal('inbound-parse-mail'); expect(await inboundMailService.inboundParseQueueService.bullMqService.getStatus()).to.deep.equal({ diff --git a/apps/inbound-mail/src/server/inbound-mail.service.ts b/apps/inbound-mail/src/server/inbound-mail.service.ts index 00dba59d02d..6c62eb91c77 100644 --- a/apps/inbound-mail/src/server/inbound-mail.service.ts +++ b/apps/inbound-mail/src/server/inbound-mail.service.ts @@ -1,10 +1,20 @@ -import { BullMqService, InboundParseQueueService, QueueBaseOptions } from '@novu/application-generic'; +import { + BullMqService, + InboundParseQueueService, + QueueBaseOptions, + WorkflowInMemoryProviderService, +} from '@novu/application-generic'; import { JobTopicNameEnum } from '@novu/shared'; export class InboundMailService { public inboundParseQueueService: InboundParseQueueService; - + private workflowInMemoryProviderService: WorkflowInMemoryProviderService; constructor() { - this.inboundParseQueueService = new InboundParseQueueService(); + this.workflowInMemoryProviderService = new WorkflowInMemoryProviderService(); + this.inboundParseQueueService = new InboundParseQueueService(this.workflowInMemoryProviderService); + } + + async start() { + await this.workflowInMemoryProviderService.initialize(); } } diff --git a/apps/inbound-mail/src/server/index.ts b/apps/inbound-mail/src/server/index.ts index d8ec1afc28d..4f2e9f6fb5e 100644 --- a/apps/inbound-mail/src/server/index.ts +++ b/apps/inbound-mail/src/server/index.ts @@ -54,7 +54,7 @@ class Mailin extends events.EventEmitter { this._smtp = null; } - public start(options: object, callback: (err?) => void) { + public async start(options: object, callback: (err?) => void) { // eslint-disable-next-line @typescript-eslint/no-this-alias const _this = this; @@ -447,6 +447,8 @@ class Mailin extends events.EventEmitter { onRcptTo: onRcptTo, }); + await inboundMailService.start(); + const server = new SMTPServer(smtpOptions); this._smtp = server; diff --git a/apps/worker/src/.env.test b/apps/worker/src/.env.test index e80f33fc46a..6d22b42f3f3 100644 --- a/apps/worker/src/.env.test +++ b/apps/worker/src/.env.test @@ -50,7 +50,7 @@ REDIS_CLUSTER_FAMILY= REDIS_CLUSTER_KEY_PREFIX= # MongoDB -MONGO_URL=mongodb://localhost:27017/novu-test +MONGO_URL=mongodb://127.0.0.1:27017/novu-test # Storage S3_LOCAL_STACK=http://localhost:4566 diff --git a/apps/worker/src/app/health/health.controller.ts b/apps/worker/src/app/health/health.controller.ts index 9c7abb80c47..1a71031996d 100644 --- a/apps/worker/src/app/health/health.controller.ts +++ b/apps/worker/src/app/health/health.controller.ts @@ -1,14 +1,7 @@ -import { Controller, Get } from '@nestjs/common'; +import { Controller, Get, Inject } from '@nestjs/common'; import { ApiExcludeController } from '@nestjs/swagger'; import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus'; -import { - DalServiceHealthIndicator, - StandardQueueServiceHealthIndicator, - WorkflowQueueServiceHealthIndicator, - ActiveJobsMetricQueueServiceHealthIndicator, - CompletedJobsMetricQueueServiceHealthIndicator, - SubscriberProcessQueueHealthIndicator, -} from '@novu/application-generic'; +import { DalServiceHealthIndicator, QueueHealthIndicator } from '@novu/application-generic'; import { version } from '../../../package.json'; @@ -16,13 +9,10 @@ import { version } from '../../../package.json'; @ApiExcludeController() export class HealthController { constructor( + @Inject('QUEUE_HEALTH_INDICATORS') + private healthIndicators: QueueHealthIndicator[], private healthCheckService: HealthCheckService, - private dalHealthIndicator: DalServiceHealthIndicator, - private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator, - private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator, - private activeJobsMetricQueueServiceHealthIndicator: ActiveJobsMetricQueueServiceHealthIndicator, - private completedJobsMetricQueueServiceHealthIndicator: CompletedJobsMetricQueueServiceHealthIndicator, - private subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator + private dalHealthIndicator: DalServiceHealthIndicator ) {} @Get() @@ -30,11 +20,7 @@ export class HealthController { healthCheck(): Promise { return this.healthCheckService.check([ async () => this.dalHealthIndicator.isHealthy(), - async () => this.standardQueueHealthIndicator.isActive(), - async () => this.workflowQueueHealthIndicator.isActive(), - async () => this.activeJobsMetricQueueServiceHealthIndicator.isActive(), - async () => this.completedJobsMetricQueueServiceHealthIndicator.isActive(), - async () => this.subscriberProcessQueueHealthIndicator.isActive(), + ...this.healthIndicators.map((indicator) => async () => indicator.isHealthy()), async () => { return { apiVersion: { diff --git a/apps/worker/src/app/health/health.module.ts b/apps/worker/src/app/health/health.module.ts index 91b37239653..533bb1810e1 100644 --- a/apps/worker/src/app/health/health.module.ts +++ b/apps/worker/src/app/health/health.module.ts @@ -1,13 +1,12 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; -import { QueuesModule } from '@novu/application-generic'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; @Module({ - imports: [SharedModule, TerminusModule, QueuesModule], + imports: [SharedModule, TerminusModule], controllers: [HealthController], providers: [], }) diff --git a/apps/worker/src/app/shared/shared.module.ts b/apps/worker/src/app/shared/shared.module.ts index 3a6eb744ab7..69832ffa317 100644 --- a/apps/worker/src/app/shared/shared.module.ts +++ b/apps/worker/src/app/shared/shared.module.ts @@ -1,27 +1,27 @@ import { Module } from '@nestjs/common'; import { + ChangeRepository, DalService, - UserRepository, - OrganizationRepository, EnvironmentRepository, ExecutionDetailsRepository, - NotificationTemplateRepository, - SubscriberRepository, - NotificationRepository, - MessageRepository, - NotificationGroupRepository, - MessageTemplateRepository, - MemberRepository, - LayoutRepository, - LogRepository, + FeedRepository, IntegrationRepository, - ChangeRepository, JobRepository, - FeedRepository, + LayoutRepository, + LogRepository, + MemberRepository, + MessageRepository, + MessageTemplateRepository, + NotificationGroupRepository, + NotificationRepository, + NotificationTemplateRepository, + OrganizationRepository, SubscriberPreferenceRepository, + SubscriberRepository, + TenantRepository, TopicRepository, TopicSubscribersRepository, - TenantRepository, + UserRepository, WorkflowOverrideRepository, } from '@novu/dal'; import { @@ -33,6 +33,7 @@ import { createNestLoggingModuleOptions, CreateNotificationJobs, CreateSubscriber, + CreateTenant, DalServiceHealthIndicator, DigestFilterSteps, DigestFilterStepsBackoff, @@ -41,21 +42,25 @@ import { distributedLockService, EventsDistributedLockService, featureFlagsService, + GetTenant, + getUseMergedDigestId, InvalidateCacheService, LoggerModule, + MetricsModule, ProcessSubscriber, + ProcessTenant, + QueuesModule, StorageHelperService, storageService, UpdateSubscriber, UpdateTenant, - GetTenant, - CreateTenant, - ProcessTenant, - getUseMergedDigestId, } from '@novu/application-generic'; import * as packageJson from '../../../package.json'; import { CreateLog } from './logs'; +import { JobTopicNameEnum } from '@novu/shared'; +import { ActiveJobsMetricService } from '../workflow/services'; +import { UNIQUE_WORKER_DEPENDENCIES } from '../../config/worker-init.config'; const DAL_MODELS = [ UserRepository, @@ -122,10 +127,17 @@ const PROVIDERS = [ CreateTenant, ProcessTenant, ...DAL_MODELS, + ActiveJobsMetricService, ]; @Module({ imports: [ + MetricsModule, + QueuesModule.forRoot( + UNIQUE_WORKER_DEPENDENCIES.length + ? [JobTopicNameEnum.ACTIVE_JOBS_METRIC, ...UNIQUE_WORKER_DEPENDENCIES] + : undefined + ), LoggerModule.forRoot( createNestLoggingModuleOptions({ serviceName: packageJson.name, @@ -134,6 +146,6 @@ const PROVIDERS = [ ), ], providers: [...PROVIDERS], - exports: [...PROVIDERS, LoggerModule], + exports: [...PROVIDERS, LoggerModule, QueuesModule], }) export class SharedModule {} diff --git a/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts b/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts deleted file mode 100644 index f7288066938..00000000000 --- a/apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { expect } from 'chai'; - -import { - MetricsService, - StandardQueueService, - WebSocketsQueueService, - WorkflowQueueService, -} from '@novu/application-generic'; - -import { ActiveJobsMetricService } from './active-jobs-metric.service'; - -import { WorkflowModule } from '../workflow.module'; - -let activeJobsMetricService: ActiveJobsMetricService; -let standardService: StandardQueueService; -let webSocketsQueueService: WebSocketsQueueService; -let workflowQueueService: WorkflowQueueService; -let metricsService: MetricsService; -let moduleRef: TestingModule; - -describe('Active Jobs Metric Service', () => { - before(async () => { - process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - - moduleRef = await Test.createTestingModule({ - imports: [WorkflowModule], - }).compile(); - - standardService = moduleRef.get(StandardQueueService); - webSocketsQueueService = moduleRef.get(WebSocketsQueueService); - workflowQueueService = moduleRef.get(WorkflowQueueService); - metricsService = moduleRef.get(MetricsService); - - activeJobsMetricService = new ActiveJobsMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - metricsService - ); - }); - - describe('Environment variables not set', () => { - beforeEach(() => { - process.env.NOVU_MANAGED_SERVICE = 'false'; - process.env.NEW_RELIC_LICENSE_KEY = ''; - - activeJobsMetricService = new ActiveJobsMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - metricsService - ); - }); - - it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => { - expect(activeJobsMetricService).to.be.ok; - expect(activeJobsMetricService).to.have.all.keys('tokenList', 'metricsService'); - expect(await activeJobsMetricService.activeJobsMetricQueueService).to.not.be.ok; - expect(await activeJobsMetricService.activeJobsMetricWorkerService).to.not.be.ok; - }); - }); - - describe('Environment variables configured', () => { - beforeEach(async () => { - process.env.NOVU_MANAGED_SERVICE = 'true'; - process.env.NEW_RELIC_LICENSE_KEY = 'license'; - - activeJobsMetricService = new ActiveJobsMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - metricsService - ); - }); - - after(async () => { - await activeJobsMetricService.activeJobsMetricQueueService.queue.drain(); - await activeJobsMetricService.gracefulShutdown(); - }); - - it('should be initialised properly', async () => { - expect(activeJobsMetricService).to.be.ok; - expect(activeJobsMetricService).to.have.all.keys( - 'activeJobsMetricQueueService', - 'activeJobsMetricWorkerService', - 'metricsService', - 'tokenList' - ); - expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({ - queueIsPaused: false, - queueName: 'metric-active-jobs', - workerName: undefined, - workerIsPaused: undefined, - workerIsRunning: undefined, - }); - expect(await activeJobsMetricService.activeJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({ - queueIsPaused: undefined, - queueName: undefined, - workerName: 'metric-active-jobs', - workerIsPaused: false, - workerIsRunning: true, - }); - expect(activeJobsMetricService.activeJobsMetricWorkerService.worker.opts).to.deep.include({ - concurrency: 1, - lockDuration: 900, - }); - }); - }); -}); diff --git a/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts index 971157f68c3..de227441478 100644 --- a/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts +++ b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts @@ -14,15 +14,13 @@ const METRIC_JOB_ID = 'metric-job'; @Injectable() export class ActiveJobsMetricService { - public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService; - public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService; - - constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], private metricsService: MetricsService) { + constructor( + @Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], + public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService, + public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService, + private metricsService: MetricsService + ) { if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { - this.activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); - this.activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService(); - - this.activeJobsMetricQueueService.createQueue(); this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); this.activeJobsMetricWorkerService.worker.on('completed', async (job) => { @@ -91,11 +89,11 @@ export class ActiveJobsMetricService { try { for (const queueService of this.tokenList) { - const waitCount = (queueService.bullMqService.queue as any).getGroupsJobsCount - ? await (queueService.bullMqService.queue as any).getGroupsJobsCount() - : await queueService.bullMqService.queue.getWaitingCount(); - const delayedCount = await queueService.bullMqService.queue.getDelayedCount(); - const activeCount = await queueService.bullMqService.queue.getActiveCount(); + const waitCount = (queueService.instance.queue as any).getGroupsJobsCount + ? await (queueService.instance.queue as any).getGroupsJobsCount() + : await queueService.instance.queue.getWaitingCount(); + const delayedCount = await queueService.instance.queue.getDelayedCount(); + const activeCount = await queueService.instance.queue.getActiveCount(); Logger.verbose('Recording active, waiting, and delayed metrics'); @@ -126,8 +124,4 @@ export class ActiveJobsMetricService { Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT); } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } } diff --git a/apps/worker/src/app/workflow/services/cold-start.service.ts b/apps/worker/src/app/workflow/services/cold-start.service.ts index ee0494a26db..bc60ed248d0 100644 --- a/apps/worker/src/app/workflow/services/cold-start.service.ts +++ b/apps/worker/src/app/workflow/services/cold-start.service.ts @@ -1,18 +1,8 @@ import { INestApplication } from '@nestjs/common'; import { INovuWorker, ReadinessService } from '@novu/application-generic'; -import { StandardWorker } from './standard.worker'; -import { SubscriberProcessWorker } from './subscriber-process.worker'; -import { WorkflowWorker } from './workflow.worker'; -import { ExecutionLogWorker } from './execution-log.worker'; - const getWorkers = (app: INestApplication): INovuWorker[] => { - const standardWorker = app.get(StandardWorker, { strict: false }); - const workflowWorker = app.get(WorkflowWorker, { strict: false }); - const subscriberProcessWorker = app.get(SubscriberProcessWorker, { strict: false }); - const executionLogWorker = app.get(ExecutionLogWorker, { strict: false }); - - const workers: INovuWorker[] = [standardWorker, workflowWorker, subscriberProcessWorker, executionLogWorker]; + const workers = app.get('ACTIVE_WORKERS'); return workers; }; @@ -27,5 +17,6 @@ export const prepareAppInfra = async (app: INestApplication): Promise => { export const startAppInfra = async (app: INestApplication): Promise => { const readinessService = app.get(ReadinessService); const workers = getWorkers(app); + await readinessService.enableWorkers(workers); }; diff --git a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts deleted file mode 100644 index 87cfc57fc79..00000000000 --- a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { expect } from 'chai'; - -import { - MetricsService, - StandardQueueService, - WebSocketsQueueService, - WorkflowQueueService, -} from '@novu/application-generic'; - -import { CompletedJobsMetricService } from './completed-jobs-metric.service'; - -import { WorkflowModule } from '../workflow.module'; - -let completedJobsMetricService: CompletedJobsMetricService; -let standardService: StandardQueueService; -let webSocketsQueueService: WebSocketsQueueService; -let workflowQueueService: WorkflowQueueService; -let metricsService: MetricsService; -let moduleRef: TestingModule; - -before(async () => { - process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - - moduleRef = await Test.createTestingModule({ - imports: [WorkflowModule], - }).compile(); - - standardService = moduleRef.get(StandardQueueService); - webSocketsQueueService = moduleRef.get(WebSocketsQueueService); - workflowQueueService = moduleRef.get(WorkflowQueueService); - metricsService = moduleRef.get(MetricsService); -}); - -describe('Completed Jobs Metric Service', () => { - describe('Environment variables not set', () => { - beforeEach(() => { - process.env.NOVU_MANAGED_SERVICE = 'false'; - process.env.NEW_RELIC_LICENSE_KEY = ''; - - completedJobsMetricService = new CompletedJobsMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - metricsService - ); - }); - - it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => { - expect(completedJobsMetricService).to.be.ok; - expect(completedJobsMetricService).to.have.all.keys('tokenList', 'metricsService'); - expect(await completedJobsMetricService.completedJobsMetricQueueService).to.not.be.ok; - expect(await completedJobsMetricService.completedJobsMetricWorkerService).to.not.be.ok; - }); - }); - - describe('Environment variables configured', () => { - beforeEach(async () => { - process.env.NOVU_MANAGED_SERVICE = 'true'; - process.env.NEW_RELIC_LICENSE_KEY = 'license'; - - completedJobsMetricService = new CompletedJobsMetricService( - [standardService, webSocketsQueueService, workflowQueueService], - metricsService - ); - }); - - after(async () => { - await completedJobsMetricService.completedJobsMetricQueueService.queue.drain(); - await completedJobsMetricService.gracefulShutdown(); - }); - - it('should be initialised properly', async () => { - expect(completedJobsMetricService).to.be.ok; - expect(completedJobsMetricService).to.have.all.keys( - 'completedJobsMetricQueueService', - 'completedJobsMetricWorkerService', - 'metricsService', - 'tokenList' - ); - expect(await completedJobsMetricService.completedJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({ - queueIsPaused: false, - queueName: 'metric-completed-jobs', - workerName: undefined, - workerIsPaused: undefined, - workerIsRunning: undefined, - }); - expect(await completedJobsMetricService.completedJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal( - { - queueIsPaused: undefined, - queueName: undefined, - workerName: 'metric-completed-jobs', - workerIsPaused: false, - workerIsRunning: true, - } - ); - expect(completedJobsMetricService.completedJobsMetricWorkerService.worker.opts).to.deep.include({ - concurrency: 1, - lockDuration: 900, - }); - }); - }); -}); diff --git a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts deleted file mode 100644 index e38510baae1..00000000000 --- a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { - CompletedJobsMetricQueueService, - CompletedJobsMetricWorkerService, - MetricsService, - QueueBaseService, - WorkerOptions, -} from '@novu/application-generic'; -import * as process from 'process'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { Inject, Injectable, Logger } from '@nestjs/common'; - -import { checkingForCronJob } from '../../shared/utils'; - -const LOG_CONTEXT = 'CompletedJobMetricService'; -const METRIC_JOB_ID = 'metric-job'; - -@Injectable() -export class CompletedJobsMetricService { - public readonly completedJobsMetricQueueService: CompletedJobsMetricQueueService; - public readonly completedJobsMetricWorkerService: CompletedJobsMetricWorkerService; - - constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], private metricsService: MetricsService) { - if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { - this.completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); - this.completedJobsMetricWorkerService = new CompletedJobsMetricWorkerService(); - - this.completedJobsMetricQueueService.createQueue(); - this.completedJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); - - this.completedJobsMetricWorkerService.worker.on('completed', async (job) => { - await checkingForCronJob(process.env.COMPLETED_CRON_ID); - Logger.verbose('Metric Completed Job', job.id, LOG_CONTEXT); - }); - - this.completedJobsMetricWorkerService.worker.on('failed', async (job, error) => { - Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error); - }); - - this.addToQueueIfMetricJobExists(); - } - } - - private addToQueueIfMetricJobExists(): void { - Promise.resolve( - this.completedJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => { - let exists = false; - for (const jobElement of job) { - if (jobElement.id === METRIC_JOB_ID) { - exists = true; - } - } - - return exists; - }) - ) - .then(async (exists: boolean): Promise => { - Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT); - - if (!exists) { - Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); - - return await this.completedJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { - jobId: METRIC_JOB_ID, - repeatJobKey: METRIC_JOB_ID, - repeat: { - immediately: true, - pattern: '0 * * * * *', - }, - removeOnFail: true, - removeOnComplete: true, - attempts: 1, - }); - } - - return undefined; - }) - .catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error)); - } - - private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 900, - concurrency: 1, - settings: {}, - }; - } - - private getWorkerProcessor() { - return async () => { - return await new Promise(async (resolve, reject): Promise => { - Logger.verbose('metric job started', LOG_CONTEXT); - const deploymentName = process.env.FLEET_NAME ?? 'default'; - - try { - for (const queueService of this.tokenList) { - const metrics = await queueService.bullMqService.getQueueMetrics(0, 1); - const completeNumber = metrics.completed.count; - const failNumber = metrics.failed.count; - - Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length); - Logger.verbose('Recording active, waiting, and delayed metrics'); - - this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/completed`, completeNumber); - this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/failed`, failNumber); - } - - return resolve(); - } catch (error) { - Logger.error({ error }, 'Error occurred while processing metrics', LOG_CONTEXT); - - return reject(error); - } - }); - }; - } - - public async gracefulShutdown(): Promise { - Logger.log('Shutting the Completed Jobs Metric service down', LOG_CONTEXT); - - if (this.completedJobsMetricQueueService) { - await this.completedJobsMetricQueueService.gracefulShutdown(); - } - if (this.completedJobsMetricWorkerService) { - await this.completedJobsMetricWorkerService.gracefulShutdown(); - } - - Logger.log('Shutting down the Completed Jobs Metric service has finished', LOG_CONTEXT); - } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } -} diff --git a/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts b/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts index 2f54909be0d..5aa24125322 100644 --- a/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts @@ -2,7 +2,12 @@ import { Test } from '@nestjs/testing'; import { expect } from 'chai'; import { setTimeout } from 'timers/promises'; -import { TriggerEvent, ExecutionLogQueueService, CreateExecutionDetails } from '@novu/application-generic'; +import { + TriggerEvent, + ExecutionLogQueueService, + CreateExecutionDetails, + WorkflowInMemoryProviderService, +} from '@novu/application-generic'; import { ExecutionLogWorker } from './execution-log.worker'; @@ -21,10 +26,13 @@ describe('ExecutionLog Worker', () => { }).compile(); const createExecutionDetails = moduleRef.get(CreateExecutionDetails); + const workflowInMemoryProviderService = moduleRef.get( + WorkflowInMemoryProviderService + ); - executionLogWorker = new ExecutionLogWorker(createExecutionDetails); + executionLogWorker = new ExecutionLogWorker(createExecutionDetails, workflowInMemoryProviderService); - executionLogQueueService = new ExecutionLogQueueService(); + executionLogQueueService = new ExecutionLogQueueService(workflowInMemoryProviderService); await executionLogQueueService.queue.obliterate(); }); @@ -35,7 +43,6 @@ describe('ExecutionLog Worker', () => { it('should be initialised properly', async () => { expect(executionLogWorker).to.be.ok; - expect(executionLogWorker).to.have.all.keys('DEFAULT_ATTEMPTS', 'instance', 'topic', 'createExecutionDetails'); expect(await executionLogWorker.bullMqService.getStatus()).to.deep.equal({ queueIsPaused: undefined, queueName: undefined, diff --git a/apps/worker/src/app/workflow/services/execution-log.worker.ts b/apps/worker/src/app/workflow/services/execution-log.worker.ts index 73b51f852a0..62583b96ef1 100644 --- a/apps/worker/src/app/workflow/services/execution-log.worker.ts +++ b/apps/worker/src/app/workflow/services/execution-log.worker.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getExecutionLogWorkerOptions, @@ -11,20 +11,22 @@ import { WorkerProcessor, CreateExecutionDetails, CreateExecutionDetailsCommand, + BullMqService, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const LOG_CONTEXT = 'ExecutionLogWorker'; @Injectable() export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker { - constructor(private createExecutionDetails: CreateExecutionDetails) { - super(); + constructor( + private createExecutionDetails: CreateExecutionDetails, + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super(new BullMqService(workflowInMemoryProviderService)); + this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); } - gracefulShutdown: () => Promise; - onModuleDestroy: () => Promise; - pause: () => Promise; - resume: () => Promise; private getWorkerOptions(): WorkerOptions { return getExecutionLogWorkerOptions(); diff --git a/apps/worker/src/app/workflow/services/index.ts b/apps/worker/src/app/workflow/services/index.ts index d2c7247dc97..400282e4d8d 100644 --- a/apps/worker/src/app/workflow/services/index.ts +++ b/apps/worker/src/app/workflow/services/index.ts @@ -1,5 +1,4 @@ export * from './active-jobs-metric.service'; -export * from './completed-jobs-metric.service'; export * from './standard.worker'; export * from './workflow.worker'; export * from './execution-log.worker'; diff --git a/apps/worker/src/app/workflow/services/standard.worker.spec.ts b/apps/worker/src/app/workflow/services/standard.worker.spec.ts index 869a428e9af..e60dcb18e27 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.spec.ts @@ -27,7 +27,7 @@ import { UserService, JobsService, } from '@novu/testing'; -import { StandardQueueService } from '@novu/application-generic'; +import { BullMqService, StandardQueueService, WorkflowInMemoryProviderService } from '@novu/application-generic'; import { StandardWorker } from './standard.worker'; @@ -39,6 +39,7 @@ import { SetJobAsFailed, WebhookFilterBackoffStrategy, } from '../usecases'; +import { SharedModule } from '../../shared/shared.module'; let standardQueueService: StandardQueueService; let standardWorker: StandardWorker; @@ -54,6 +55,9 @@ describe('Standard Worker', () => { let jobsService: JobsService; before(async () => { + const moduleRef = await Test.createTestingModule({ + imports: [WorkflowModule], + }).compile(); process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; @@ -90,14 +94,17 @@ describe('Standard Worker', () => { const templateService = new NotificationTemplateService(user._id, organization._id, environment._id); template = await templateService.createTemplate({ noFeedId: true, noLayoutId: true, noGroupId: true }); + const workflowInMemoryProviderService = moduleRef.get( + WorkflowInMemoryProviderService + ); - standardQueueService = new StandardQueueService(); + standardQueueService = new StandardQueueService(workflowInMemoryProviderService); await standardQueueService.queue.obliterate(); }); beforeEach(async () => { const moduleRef = await Test.createTestingModule({ - imports: [WorkflowModule], + imports: [WorkflowModule, SharedModule], }).compile(); const handleLastFailedJob = moduleRef.get(HandleLastFailedJob); @@ -105,13 +112,17 @@ describe('Standard Worker', () => { const setJobAsCompleted = moduleRef.get(SetJobAsCompleted); const setJobAsFailed = moduleRef.get(SetJobAsFailed); const webhookFilterBackoffStrategy = moduleRef.get(WebhookFilterBackoffStrategy); + const workflowInMemoryProviderService = moduleRef.get( + WorkflowInMemoryProviderService + ); standardWorker = new StandardWorker( handleLastFailedJob, runJob, setJobAsCompleted, setJobAsFailed, - webhookFilterBackoffStrategy + webhookFilterBackoffStrategy, + workflowInMemoryProviderService ); }); @@ -122,17 +133,7 @@ describe('Standard Worker', () => { it('should be initialised properly', async () => { expect(standardWorker).to.be.ok; - expect(standardWorker).to.have.all.keys( - 'DEFAULT_ATTEMPTS', - 'getBackoffStrategies', - 'handleLastFailedJob', - 'instance', - 'runJob', - 'setJobAsCompleted', - 'setJobAsFailed', - 'topic', - 'webhookFilterBackoffStrategy' - ); + expect(standardWorker.DEFAULT_ATTEMPTS).to.eql(3); expect(standardWorker.worker).to.deep.include({ _eventsCount: 1, diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index 9253993de62..62c61eb8f3f 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -1,12 +1,8 @@ const nr = require('newrelic'); import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; +import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { - ExecutionDetailsSourceEnum, - ExecutionDetailsStatusEnum, - IJobData, - ObservabilityBackgroundTransactionEnum, -} from '@novu/shared'; -import { + BullMqService, getStandardWorkerOptions, INovuWorker, Job, @@ -15,6 +11,7 @@ import { storage, Store, WorkerOptions, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { @@ -39,9 +36,11 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker @Inject(forwardRef(() => SetJobAsCompleted)) private setJobAsCompleted: SetJobAsCompleted, @Inject(forwardRef(() => SetJobAsFailed)) private setJobAsFailed: SetJobAsFailed, @Inject(forwardRef(() => WebhookFilterBackoffStrategy)) - private webhookFilterBackoffStrategy: WebhookFilterBackoffStrategy + private webhookFilterBackoffStrategy: WebhookFilterBackoffStrategy, + @Inject(forwardRef(() => WorkflowInMemoryProviderService)) + public workflowInMemoryProviderService: WorkflowInMemoryProviderService ) { - super(); + super(new BullMqService(workflowInMemoryProviderService)); this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); diff --git a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts index f70cb8574d4..71c2bfffa84 100644 --- a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts +++ b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { @@ -10,14 +10,21 @@ import { storage, Store, WorkerOptions, + INovuWorker, + BullMqService, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; const LOG_CONTEXT = 'SubscriberProcessWorker'; @Injectable() -export class SubscriberProcessWorker extends SubscriberProcessWorkerService { - constructor(private subscriberJobBoundUsecase: SubscriberJobBound) { - super(); +export class SubscriberProcessWorker extends SubscriberProcessWorkerService implements INovuWorker { + constructor( + private subscriberJobBoundUsecase: SubscriberJobBound, + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super(new BullMqService(workflowInMemoryProviderService)); + this.initWorker(this.getWorkerProcessor(), this.getWorkerOpts()); } diff --git a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts index 65b56bd5d00..c1f915a8dc9 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts @@ -2,7 +2,12 @@ import { Test } from '@nestjs/testing'; import { expect } from 'chai'; import { setTimeout } from 'timers/promises'; -import { TriggerEvent, WorkflowQueueService } from '@novu/application-generic'; +import { + BullMqService, + TriggerEvent, + WorkflowInMemoryProviderService, + WorkflowQueueService, +} from '@novu/application-generic'; import { WorkflowWorker } from './workflow.worker'; @@ -21,9 +26,12 @@ describe('Workflow Worker', () => { }).compile(); const triggerEventUseCase = moduleRef.get(TriggerEvent); - workflowWorker = new WorkflowWorker(triggerEventUseCase); + const workflowInMemoryProviderService = moduleRef.get( + WorkflowInMemoryProviderService + ); + workflowWorker = new WorkflowWorker(triggerEventUseCase, workflowInMemoryProviderService); - workflowQueueService = new WorkflowQueueService(); + workflowQueueService = new WorkflowQueueService(workflowInMemoryProviderService); await workflowQueueService.queue.obliterate(); }); @@ -34,7 +42,6 @@ describe('Workflow Worker', () => { it('should be initialised properly', async () => { expect(workflowWorker).to.be.ok; - expect(workflowWorker).to.have.all.keys('DEFAULT_ATTEMPTS', 'instance', 'topic', 'triggerEventUsecase'); expect(await workflowWorker.bullMqService.getStatus()).to.deep.equal({ queueIsPaused: undefined, queueName: undefined, diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index 7bd417a00ef..c9e2cc0c6d8 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; const nr = require('newrelic'); import { getWorkflowWorkerOptions, @@ -11,6 +11,8 @@ import { WorkflowWorkerService, WorkerOptions, WorkerProcessor, + BullMqService, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; @@ -18,8 +20,11 @@ const LOG_CONTEXT = 'WorkflowWorker'; @Injectable() export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker { - constructor(private triggerEventUsecase: TriggerEvent) { - super(); + constructor( + private triggerEventUsecase: TriggerEvent, + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super(new BullMqService(workflowInMemoryProviderService)); this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); } diff --git a/apps/worker/src/app/workflow/workflow.module.ts b/apps/worker/src/app/workflow/workflow.module.ts index 734592ec2f9..2468195446f 100644 --- a/apps/worker/src/app/workflow/workflow.module.ts +++ b/apps/worker/src/app/workflow/workflow.module.ts @@ -1,10 +1,8 @@ -import { Module, Provider } from '@nestjs/common'; +import { Module, OnApplicationShutdown, Provider } from '@nestjs/common'; import { AddDelayJob, MergeOrCreateDigest, AddJob, - BullMqService, - bullMqTokenList, BulkCreateExecutionDetails, CalculateLimitNovuIntegration, CompileEmailTemplate, @@ -18,7 +16,6 @@ import { GetSubscriberGlobalPreference, GetSubscriberTemplatePreference, ProcessTenant, - QueuesModule, SelectIntegration, SendTestEmail, SendTestEmailCommand, @@ -32,18 +29,10 @@ import { SubscriberJobBound, TriggerBroadcast, TriggerMulticast, - MetricsModule, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { JobRepository } from '@novu/dal'; -import { - ExecutionLogWorker, - ActiveJobsMetricService, - CompletedJobsMetricService, - StandardWorker, - WorkflowWorker, -} from './services'; - import { SendMessage, SendMessageChat, @@ -65,7 +54,7 @@ import { } from './usecases'; import { SharedModule } from '../shared/shared.module'; -import { SubscriberProcessWorker } from './services/subscriber-process.worker'; +import { ACTIVE_WORKERS } from '../../config/worker-init.config'; const REPOSITORIES = [JobRepository]; @@ -118,20 +107,36 @@ const USE_CASES = [ TriggerMulticast, ]; -const PROVIDERS: Provider[] = [ - ActiveJobsMetricService, - BullMqService, - bullMqTokenList, - CompletedJobsMetricService, - StandardWorker, - WorkflowWorker, - ExecutionLogWorker, - SubscriberProcessWorker, -]; +const PROVIDERS: Provider[] = []; +const activeWorkersToken: any = { + provide: 'ACTIVE_WORKERS', + useFactory: (...args: any[]) => { + return args; + }, + inject: ACTIVE_WORKERS, +}; + +const memoryQueueService = { + provide: WorkflowInMemoryProviderService, + useFactory: async () => { + const memoryService = new WorkflowInMemoryProviderService(); + + await memoryService.initialize(); + + return memoryService; + }, +}; @Module({ - imports: [SharedModule, QueuesModule, MetricsModule], + imports: [SharedModule], controllers: [], - providers: [...PROVIDERS, ...USE_CASES, ...REPOSITORIES], + providers: [memoryQueueService, ...ACTIVE_WORKERS, ...PROVIDERS, ...USE_CASES, ...REPOSITORIES, activeWorkersToken], + exports: [...PROVIDERS, ...USE_CASES, ...REPOSITORIES, activeWorkersToken], }) -export class WorkflowModule {} +export class WorkflowModule implements OnApplicationShutdown { + constructor(private workflowInMemoryProviderService: WorkflowInMemoryProviderService) {} + + async onApplicationShutdown() { + await this.workflowInMemoryProviderService.shutdown(); + } +} diff --git a/apps/worker/src/config/worker-init.config.ts b/apps/worker/src/config/worker-init.config.ts new file mode 100644 index 00000000000..a1fed41f978 --- /dev/null +++ b/apps/worker/src/config/worker-init.config.ts @@ -0,0 +1,86 @@ +import { Provider } from '@nestjs/common'; + +import { JobTopicNameEnum } from '@novu/shared'; + +import { ExecutionLogWorker, StandardWorker, WorkflowWorker } from '../app/workflow/services'; +import { SubscriberProcessWorker } from '../app/workflow/services/subscriber-process.worker'; + +type WorkerClass = + | typeof StandardWorker + | typeof WorkflowWorker + | typeof ExecutionLogWorker + | typeof SubscriberProcessWorker; + +type WorkerModuleTree = { workerClass: WorkerClass; queueDependencies: JobTopicNameEnum[] }; + +type WorkerDepTree = Partial>; + +export const WORKER_MAPPING: WorkerDepTree = { + [JobTopicNameEnum.STANDARD]: { + workerClass: StandardWorker, + queueDependencies: [ + JobTopicNameEnum.EXECUTION_LOG, + JobTopicNameEnum.WEB_SOCKETS, + JobTopicNameEnum.STANDARD, + JobTopicNameEnum.PROCESS_SUBSCRIBER, + ], + }, + [JobTopicNameEnum.WORKFLOW]: { + workerClass: WorkflowWorker, + queueDependencies: [ + JobTopicNameEnum.EXECUTION_LOG, + JobTopicNameEnum.PROCESS_SUBSCRIBER, + JobTopicNameEnum.STANDARD, + JobTopicNameEnum.WEB_SOCKETS, + ], + }, + [JobTopicNameEnum.EXECUTION_LOG]: { + workerClass: ExecutionLogWorker, + queueDependencies: [ + JobTopicNameEnum.EXECUTION_LOG, + JobTopicNameEnum.STANDARD, + JobTopicNameEnum.WEB_SOCKETS, + JobTopicNameEnum.PROCESS_SUBSCRIBER, + ], + }, + [JobTopicNameEnum.PROCESS_SUBSCRIBER]: { + workerClass: SubscriberProcessWorker, + queueDependencies: [JobTopicNameEnum.EXECUTION_LOG], + }, +}; + +const validQueueEntries = Object.keys(JobTopicNameEnum).map((key) => JobTopicNameEnum[key]); +const isQueueEntry = (queueName: string): queueName is JobTopicNameEnum => { + return validQueueEntries.includes(queueName); +}; + +export const workersToProcess = + process.env.ACTIVE_WORKERS?.split(',').map((queue) => { + const queueName = queue.trim(); + if (!isQueueEntry(queueName)) { + throw new Error(`Invalid queue name ${queueName}`); + } + + return queueName; + }) || []; + +const WORKER_DEPENDENCIES: JobTopicNameEnum[] = workersToProcess.reduce((history, worker) => { + const workerDependencies: JobTopicNameEnum[] = WORKER_MAPPING[worker]?.queueDependencies || []; + + return [...history, ...workerDependencies]; +}, []); + +export const UNIQUE_WORKER_DEPENDENCIES = [...new Set(WORKER_DEPENDENCIES)]; + +export const ACTIVE_WORKERS: Provider[] | any[] = []; + +if (!workersToProcess.length) { + ACTIVE_WORKERS.push(StandardWorker, WorkflowWorker, ExecutionLogWorker, SubscriberProcessWorker); +} else { + workersToProcess.forEach((queue) => { + const workerClass = WORKER_MAPPING[queue]?.workerClass; + if (workerClass) { + ACTIVE_WORKERS.push(workerClass); + } + }); +} diff --git a/apps/ws/src/health/health.module.ts b/apps/ws/src/health/health.module.ts index 4f570ccfbb0..411444eb423 100644 --- a/apps/ws/src/health/health.module.ts +++ b/apps/ws/src/health/health.module.ts @@ -1,12 +1,11 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; -import { QueuesModule } from '@novu/application-generic'; import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; @Module({ - imports: [TerminusModule, SharedModule, QueuesModule], + imports: [TerminusModule, SharedModule], controllers: [HealthController], }) export class HealthModule {} diff --git a/apps/ws/src/shared/shared.module.ts b/apps/ws/src/shared/shared.module.ts index cc1b68a371c..ee4606439cf 100644 --- a/apps/ws/src/shared/shared.module.ts +++ b/apps/ws/src/shared/shared.module.ts @@ -11,9 +11,10 @@ import { MessageRepository, MemberRepository, } from '@novu/dal'; -import { AnalyticsService, DalServiceHealthIndicator } from '@novu/application-generic'; +import { AnalyticsService, DalServiceHealthIndicator, QueuesModule } from '@novu/application-generic'; import { SubscriberOnlineService } from './subscriber-online'; +import { JobTopicNameEnum } from '@novu/shared'; const DAL_MODELS = [ UserRepository, @@ -50,6 +51,7 @@ const PROVIDERS = [analyticsService, dalService, DalServiceHealthIndicator, Subs @Module({ imports: [ + QueuesModule.forRoot([JobTopicNameEnum.WEB_SOCKETS]), JwtModule.register({ secretOrKeyProvider: () => process.env.JWT_SECRET as string, signOptions: { @@ -58,6 +60,6 @@ const PROVIDERS = [analyticsService, dalService, DalServiceHealthIndicator, Subs }), ], providers: [...PROVIDERS], - exports: [...PROVIDERS, JwtModule], + exports: [...PROVIDERS, JwtModule, QueuesModule], }) export class SharedModule {} diff --git a/apps/ws/src/socket/services/web-socket.worker.spec.ts b/apps/ws/src/socket/services/web-socket.worker.spec.ts index 3b6c7502ea6..075bedcfc21 100644 --- a/apps/ws/src/socket/services/web-socket.worker.spec.ts +++ b/apps/ws/src/socket/services/web-socket.worker.spec.ts @@ -2,7 +2,7 @@ import { Test } from '@nestjs/testing'; import { expect } from 'chai'; import { setTimeout } from 'timers/promises'; -import { WebSocketsQueueService } from '@novu/application-generic'; +import { WebSocketsQueueService, WorkflowInMemoryProviderService } from '@novu/application-generic'; import { WebSocketWorker } from './web-socket.worker'; @@ -22,9 +22,13 @@ describe('WebSocket Worker', () => { }).compile(); const externalServicesRoute = moduleRef.get(ExternalServicesRoute); - webSocketWorker = new WebSocketWorker(externalServicesRoute); + const workflowInMemoryProviderService = moduleRef.get( + WorkflowInMemoryProviderService + ); - webSocketsQueueService = new WebSocketsQueueService(); + webSocketWorker = new WebSocketWorker(externalServicesRoute, workflowInMemoryProviderService); + + webSocketsQueueService = new WebSocketsQueueService(workflowInMemoryProviderService); await webSocketsQueueService.queue.obliterate(); }); @@ -35,7 +39,6 @@ describe('WebSocket Worker', () => { it('should be initialised properly', async () => { expect(webSocketWorker).to.be.ok; - expect(webSocketWorker).to.have.all.keys('DEFAULT_ATTEMPTS', 'instance', 'externalServicesRoute', 'topic'); expect(await webSocketWorker.bullMqService.getStatus()).to.deep.equal({ queueIsPaused: undefined, queueName: undefined, diff --git a/apps/ws/src/socket/services/web-socket.worker.ts b/apps/ws/src/socket/services/web-socket.worker.ts index 106a70249a1..b216193b737 100644 --- a/apps/ws/src/socket/services/web-socket.worker.ts +++ b/apps/ws/src/socket/services/web-socket.worker.ts @@ -2,10 +2,12 @@ const nr = require('newrelic'); import { Injectable, Logger } from '@nestjs/common'; import { + BullMqService, getWebSocketWorkerOptions, INovuWorker, WebSocketsWorkerService, WorkerOptions, + WorkflowInMemoryProviderService, } from '@novu/application-generic'; import { ExternalServicesRoute, ExternalServicesRouteCommand } from '../usecases/external-services-route'; @@ -15,8 +17,11 @@ const LOG_CONTEXT = 'WebSocketWorker'; @Injectable() export class WebSocketWorker extends WebSocketsWorkerService implements INovuWorker { - constructor(private externalServicesRoute: ExternalServicesRoute) { - super(); + constructor( + private externalServicesRoute: ExternalServicesRoute, + private workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super(new BullMqService(workflowInMemoryProviderService)); this.initWorker(this.getWorkerProcessor(), this.getWorkerOpts()); } diff --git a/apps/ws/src/socket/socket.module.ts b/apps/ws/src/socket/socket.module.ts index 15173f355a1..2acb4d51d56 100644 --- a/apps/ws/src/socket/socket.module.ts +++ b/apps/ws/src/socket/socket.module.ts @@ -1,6 +1,5 @@ -import { Inject, Module, OnModuleInit, Provider } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; -import { WebSocketsWorkerService } from '@novu/application-generic'; +import { Inject, Module, OnApplicationShutdown, OnModuleInit, Provider } from '@nestjs/common'; +import { WebSocketsWorkerService, WorkflowInMemoryProviderService } from '@novu/application-generic'; import { WSGateway } from './ws.gateway'; import { SharedModule } from '../shared/shared.module'; @@ -12,9 +11,26 @@ const USE_CASES: Provider[] = [ExternalServicesRoute]; const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker]; +const memoryQueueService = { + provide: WorkflowInMemoryProviderService, + useFactory: async () => { + const memoryService = new WorkflowInMemoryProviderService(); + + await memoryService.initialize(); + + return memoryService; + }, +}; + @Module({ imports: [SharedModule], - providers: [...PROVIDERS, ...USE_CASES], + providers: [...PROVIDERS, ...USE_CASES, memoryQueueService], exports: [WSGateway], }) -export class SocketModule {} +export class SocketModule implements OnApplicationShutdown { + constructor(private workflowInMemoryProviderService: WorkflowInMemoryProviderService) {} + + async onApplicationShutdown() { + await this.workflowInMemoryProviderService.shutdown(); + } +} diff --git a/libs/shared/src/config/job-queue.ts b/libs/shared/src/config/job-queue.ts index b5abfee2482..3250ed05436 100644 --- a/libs/shared/src/config/job-queue.ts +++ b/libs/shared/src/config/job-queue.ts @@ -1,7 +1,6 @@ export enum JobTopicNameEnum { EXECUTION_LOG = 'execution-logs', ACTIVE_JOBS_METRIC = 'metric-active-jobs', - COMPLETED_JOBS_METRIC = 'metric-completed-jobs', INBOUND_PARSE_MAIL = 'inbound-parse-mail', STANDARD = 'standard', WEB_SOCKETS = 'ws_socket_queue', diff --git a/libs/testing/src/jobs.service.ts b/libs/testing/src/jobs.service.ts index 9ae0db65e35..fc2ec8743c9 100644 --- a/libs/testing/src/jobs.service.ts +++ b/libs/testing/src/jobs.service.ts @@ -12,11 +12,12 @@ export class JobsService { public standardQueue: Queue; public workflowQueue: Queue; public subscriberProcessQueue: Queue; - + public executionLogQueue: Queue; constructor(private isClusterMode?: boolean) { this.workflowQueue = new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue; this.standardQueue = new TestingQueueService(JobTopicNameEnum.STANDARD).queue; this.subscriberProcessQueue = new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue; + this.executionLogQueue = new TestingQueueService(JobTopicNameEnum.EXECUTION_LOG).queue; } public async queueGet(jobTopicName: JobTopicNameEnum, getter: 'getDelayed') { @@ -89,6 +90,8 @@ export class JobsService { activeStandardJobsCount, subscriberProcessQueueWaitingCount, subscriberProcessQueueActiveCount, + executionLogQueueWaitingCount, + executionLogQueueActiveCount, ] = await Promise.all([ this.workflowQueue.getActiveCount(), this.workflowQueue.getWaitingCount(), @@ -98,6 +101,9 @@ export class JobsService { this.subscriberProcessQueue.getWaitingCount(), this.subscriberProcessQueue.getActiveCount(), + + this.executionLogQueue.getWaitingCount(), + this.executionLogQueue.getActiveCount(), ]); const totalCount = @@ -106,7 +112,9 @@ export class JobsService { waitingStandardJobsCount + activeStandardJobsCount + subscriberProcessQueueWaitingCount + - subscriberProcessQueueActiveCount; + subscriberProcessQueueActiveCount + + executionLogQueueWaitingCount + + executionLogQueueActiveCount; return { totalCount, @@ -116,6 +124,8 @@ export class JobsService { activeStandardJobsCount, subscriberProcessQueueWaitingCount, subscriberProcessQueueActiveCount, + executionLogQueueWaitingCount, + executionLogQueueActiveCount, }; } } diff --git a/packages/application-generic/src/custom-providers/index.ts b/packages/application-generic/src/custom-providers/index.ts index 0593e21480b..934f2b80a41 100644 --- a/packages/application-generic/src/custom-providers/index.ts +++ b/packages/application-generic/src/custom-providers/index.ts @@ -4,7 +4,6 @@ import { CacheInMemoryProviderService, CacheService, DistributedLockService, - ExecutionLogQueueService, FeatureFlagsService, InboundParseQueueService, ReadinessService, @@ -12,6 +11,8 @@ import { SubscriberProcessQueueService, WebSocketsQueueService, WorkflowQueueService, + ExecutionLogQueueService, + WorkflowInMemoryProviderService, } from '../services'; import { GetIsApiRateLimitingEnabled, @@ -72,17 +73,6 @@ export const cacheInMemoryProviderService = { }, }; -export const bullMqService = { - provide: BullMqService, - useFactory: async (): Promise => { - const service = new BullMqService(); - - await service.initialize(); - - return service; - }, -}; - export const cacheService = { provide: CacheService, useFactory: async (): Promise => { @@ -122,32 +112,3 @@ export const distributedLockService = { return service; }, }; - -export const bullMqTokenList = { - provide: 'BULLMQ_LIST', - useFactory: ( - standardQueueService: StandardQueueService, - webSocketsQueueService: WebSocketsQueueService, - workflowQueueService: WorkflowQueueService, - subscriberProcessQueueService: SubscriberProcessQueueService, - executionLogQueueService: ExecutionLogQueueService, - inboundParseQueueService: InboundParseQueueService - ) => { - return [ - standardQueueService, - webSocketsQueueService, - workflowQueueService, - subscriberProcessQueueService, - executionLogQueueService, - inboundParseQueueService, - ]; - }, - inject: [ - StandardQueueService, - WebSocketsQueueService, - WorkflowQueueService, - SubscriberProcessQueueService, - ExecutionLogQueueService, - InboundParseQueueService, - ], -}; diff --git a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts deleted file mode 100644 index e969190a323..00000000000 --- a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { CompletedJobsMetricQueueService } from '../services/queues'; -import { QueueHealthIndicator } from './queue-health-indicator.service'; - -const LOG_CONTEXT = 'CompletedJobsMetricQueueServiceHealthIndicator'; -const INDICATOR_KEY = 'completedJobsMetricQueue'; -const SERVICE_NAME = 'CompletedJobsMetricQueueService'; -@Injectable() -export class CompletedJobsMetricQueueServiceHealthIndicator extends QueueHealthIndicator { - constructor( - private completedJobsMetricQueueService: CompletedJobsMetricQueueService - ) { - super( - completedJobsMetricQueueService, - INDICATOR_KEY, - SERVICE_NAME, - LOG_CONTEXT - ); - } -} diff --git a/packages/application-generic/src/health/index.ts b/packages/application-generic/src/health/index.ts index b86313e3d85..8a688e52ff3 100644 --- a/packages/application-generic/src/health/index.ts +++ b/packages/application-generic/src/health/index.ts @@ -1,9 +1,9 @@ export * from './active-jobs-metric-queue.health-indicator'; export * from './cache.health-indicator'; -export * from './completed-jobs-metric-queue.health-indicator'; export * from './dal.health-indicator'; export * from './inbound-parse-queue.health-indicator'; export * from './standard-queue.health-indicator'; export * from './web-sockets-queue.health-indicator'; export * from './workflow-queue.health-indicator'; export * from './subscriber-process-queue.health-indicator'; +export * from './queue-health-indicator.service'; diff --git a/packages/application-generic/src/modules/index.ts b/packages/application-generic/src/modules/index.ts index c079110c115..c9d351ed23c 100644 --- a/packages/application-generic/src/modules/index.ts +++ b/packages/application-generic/src/modules/index.ts @@ -1,3 +1,2 @@ export { QueuesModule } from './queues.module'; -export { BaseApiQueuesModule } from './queues.module'; export { MetricsModule } from './metrics.module'; diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 38ef0f38638..8efe65c361c 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -1,19 +1,21 @@ -import { Module, Provider } from '@nestjs/common'; +import { + DynamicModule, + Module, + OnApplicationShutdown, + Provider, +} from '@nestjs/common'; -import { bullMqTokenList } from '../custom-providers'; import { ActiveJobsMetricQueueServiceHealthIndicator, - CompletedJobsMetricQueueServiceHealthIndicator, InboundParseQueueServiceHealthIndicator, StandardQueueServiceHealthIndicator, SubscriberProcessQueueHealthIndicator, WebSocketsQueueServiceHealthIndicator, WorkflowQueueServiceHealthIndicator, } from '../health'; -import { ReadinessService } from '../services'; +import { ReadinessService, WorkflowInMemoryProviderService } from '../services'; import { ActiveJobsMetricQueueService, - CompletedJobsMetricQueueService, ExecutionLogQueueService, InboundParseQueueService, StandardQueueService, @@ -21,62 +23,122 @@ import { WebSocketsQueueService, WorkflowQueueService, } from '../services/queues'; -import { - ActiveJobsMetricWorkerService, - CompletedJobsMetricWorkerService, - InboundParseWorker, - StandardWorkerService, - SubscriberProcessWorkerService, - WebSocketsWorkerService, - WorkflowWorkerService, -} from '../services/workers'; +import { ActiveJobsMetricWorkerService } from '../services/workers'; +import { JobTopicNameEnum } from '@novu/shared'; -const PROVIDERS: Provider[] = [ - ActiveJobsMetricQueueService, - ActiveJobsMetricQueueServiceHealthIndicator, - ActiveJobsMetricWorkerService, - bullMqTokenList, - CompletedJobsMetricQueueService, - CompletedJobsMetricQueueServiceHealthIndicator, - CompletedJobsMetricWorkerService, - InboundParseQueueService, - InboundParseWorker, - InboundParseQueueServiceHealthIndicator, - ReadinessService, - StandardQueueService, - StandardQueueServiceHealthIndicator, - StandardWorkerService, - WebSocketsQueueService, - WebSocketsQueueServiceHealthIndicator, - WebSocketsWorkerService, - WorkflowQueueService, - ExecutionLogQueueService, - WorkflowQueueServiceHealthIndicator, - WorkflowWorkerService, - SubscriberProcessQueueService, - SubscriberProcessWorkerService, - SubscriberProcessQueueHealthIndicator, -]; +const memoryQueueService = { + provide: WorkflowInMemoryProviderService, + useFactory: async () => { + const memoryService = new WorkflowInMemoryProviderService(); -@Module({ - providers: [...PROVIDERS], - exports: [...PROVIDERS], -}) -export class QueuesModule {} + await memoryService.initialize(); -const APP_PROVIDERS: Provider[] = [ - InboundParseQueueService, - InboundParseWorker, - InboundParseQueueServiceHealthIndicator, - WebSocketsQueueService, - WebSocketsQueueServiceHealthIndicator, - WorkflowQueueService, - ExecutionLogQueueService, - WorkflowQueueServiceHealthIndicator, -]; + return memoryService; + }, +}; + +const INTERNAL_MODULE_PROVIDERS = [memoryQueueService]; +const BASE_PROVIDERS: Provider[] = [ReadinessService]; @Module({ - providers: [...APP_PROVIDERS], - exports: [...APP_PROVIDERS], + providers: [], + exports: [], }) -export class BaseApiQueuesModule {} +export class QueuesModule implements OnApplicationShutdown { + static forRoot(entities: JobTopicNameEnum[] = []): DynamicModule { + if (!entities.length) { + entities = Object.values(JobTopicNameEnum); + } + + const healthIndicators = []; + const tokenList = []; + const DYNAMIC_PROVIDERS = [...BASE_PROVIDERS]; + + for (const entity of entities) { + switch (entity) { + case JobTopicNameEnum.INBOUND_PARSE_MAIL: + healthIndicators.push(InboundParseQueueServiceHealthIndicator); + tokenList.push(InboundParseQueueService); + DYNAMIC_PROVIDERS.push( + InboundParseQueueService, + InboundParseQueueServiceHealthIndicator + ); + break; + case JobTopicNameEnum.WORKFLOW: + healthIndicators.push(WorkflowQueueServiceHealthIndicator); + tokenList.push(WorkflowQueueService); + DYNAMIC_PROVIDERS.push( + WorkflowQueueService, + WorkflowQueueServiceHealthIndicator + ); + break; + case JobTopicNameEnum.WEB_SOCKETS: + healthIndicators.push(WebSocketsQueueServiceHealthIndicator); + tokenList.push(WebSocketsQueueService); + DYNAMIC_PROVIDERS.push( + WebSocketsQueueService, + WebSocketsQueueServiceHealthIndicator + ); + break; + case JobTopicNameEnum.STANDARD: + tokenList.push(StandardQueueService); + DYNAMIC_PROVIDERS.push( + StandardQueueService, + StandardQueueServiceHealthIndicator + ); + break; + case JobTopicNameEnum.PROCESS_SUBSCRIBER: + healthIndicators.push(SubscriberProcessQueueHealthIndicator); + tokenList.push(SubscriberProcessQueueService); + DYNAMIC_PROVIDERS.push( + SubscriberProcessQueueService, + SubscriberProcessQueueHealthIndicator + ); + break; + case JobTopicNameEnum.EXECUTION_LOG: + tokenList.push(ExecutionLogQueueService); + DYNAMIC_PROVIDERS.push(ExecutionLogQueueService); + break; + case JobTopicNameEnum.ACTIVE_JOBS_METRIC: + healthIndicators.push(ActiveJobsMetricQueueServiceHealthIndicator); + tokenList.push(ActiveJobsMetricQueueService); + DYNAMIC_PROVIDERS.push( + ActiveJobsMetricQueueService, + ActiveJobsMetricQueueServiceHealthIndicator, + ActiveJobsMetricWorkerService + ); + break; + } + } + + DYNAMIC_PROVIDERS.push({ + provide: 'BULLMQ_LIST', + useFactory: (...args: any[]) => { + return args; + }, + inject: tokenList, + }); + + DYNAMIC_PROVIDERS.push({ + provide: 'QUEUE_HEALTH_INDICATORS', + useFactory: (...args: any[]) => { + return args; + }, + inject: healthIndicators, + }); + + return { + module: QueuesModule, + providers: [...DYNAMIC_PROVIDERS, ...INTERNAL_MODULE_PROVIDERS], + exports: [...DYNAMIC_PROVIDERS], + }; + } + + constructor( + private workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) {} + + async onApplicationShutdown() { + await this.workflowInMemoryProviderService.shutdown(); + } +} diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts index 3e2f5d38dbc..141b7f933fa 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.spec.ts @@ -5,6 +5,7 @@ import { QueueBaseOptions, WorkerOptions, } from './bull-mq.service'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let bullMqService: BullMqService; @@ -14,7 +15,7 @@ describe('BullMQ Service', () => { process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - bullMqService = new BullMqService(); + bullMqService = new BullMqService(new WorkflowInMemoryProviderService()); }); afterEach(async () => { @@ -65,7 +66,7 @@ describe('BullMQ Service', () => { process.env.MEMORY_DB_CLUSTER_SERVICE_HOST = 'localhost'; process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - bullMqService = new BullMqService(); + bullMqService = new BullMqService(new WorkflowInMemoryProviderService()); const queue = bullMqService.createQueue( JobTopicNameEnum.ACTIVE_JOBS_METRIC, {} @@ -77,7 +78,7 @@ describe('BullMQ Service', () => { process.env.MEMORY_DB_CLUSTER_SERVICE_HOST = ''; process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - bullMqService = new BullMqService(); + bullMqService = new BullMqService(new WorkflowInMemoryProviderService()); const queue = bullMqService.createQueue( JobTopicNameEnum.ACTIVE_JOBS_METRIC, {} @@ -89,7 +90,7 @@ describe('BullMQ Service', () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; process.env.MEMORY_DB_CLUSTER_SERVICE_HOST = ''; - bullMqService = new BullMqService(); + bullMqService = new BullMqService(new WorkflowInMemoryProviderService()); const queue = bullMqService.createQueue( JobTopicNameEnum.ACTIVE_JOBS_METRIC, {} diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts index 190b9ef18a0..c365bb81f87 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts @@ -39,23 +39,16 @@ export { BulkJobOptions, }; -@Injectable() export class BullMqService { private _queue: Queue; private _worker: Worker; - private workflowInMemoryProviderService: WorkflowInMemoryProviderService; public static readonly pro: boolean = process.env.NOVU_MANAGED_SERVICE !== undefined; - constructor() { - this.workflowInMemoryProviderService = - new WorkflowInMemoryProviderService(); - } - - public async initialize(): Promise { - await this.workflowInMemoryProviderService.initialize(); - } + constructor( + private workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) {} public get worker(): Worker { return this._worker; @@ -87,16 +80,6 @@ export class BullMqService { return BullMqService.pro && BullMqService.haveProInstalled(); } - public async getQueueMetrics( - start?: number, - end?: number - ): Promise { - return { - completed: await this._queue.getMetrics('completed', start, end), - failed: await this._queue.getMetrics('failed', start, end), - }; - } - /** * To avoid going crazy not understanding why jobs are not processed in cluster mode * Reference: @@ -252,8 +235,6 @@ export class BullMqService { await this._worker.close(); } - await this.workflowInMemoryProviderService.shutdown(); - Logger.log('Shutting down the BullMQ service has finished', LOG_CONTEXT); } diff --git a/packages/application-generic/src/services/in-memory-provider/providers/memory-db-cluster-provider.ts b/packages/application-generic/src/services/in-memory-provider/providers/memory-db-cluster-provider.ts index cf2d1b5adae..4e4a32bd27b 100644 --- a/packages/application-generic/src/services/in-memory-provider/providers/memory-db-cluster-provider.ts +++ b/packages/application-generic/src/services/in-memory-provider/providers/memory-db-cluster-provider.ts @@ -120,8 +120,10 @@ export const getMemoryDbCluster = ( enableAutoPipelining: enableAutoPipelining ?? false, enableOfflineQueue: false, redisOptions: { + maxRetriesPerRequest: null, tls, connectTimeout: 10000, + ...(password && { password }), ...(username && { username }), }, diff --git a/packages/application-generic/src/services/in-memory-provider/providers/redis-provider.ts b/packages/application-generic/src/services/in-memory-provider/providers/redis-provider.ts index f0574f0eeab..84e5f220368 100644 --- a/packages/application-generic/src/services/in-memory-provider/providers/redis-provider.ts +++ b/packages/application-generic/src/services/in-memory-provider/providers/redis-provider.ts @@ -91,6 +91,7 @@ export const getRedisInstance = (): Redis | undefined => { const options = { ...configOptions, + maxRetriesPerRequest: null, /* * Disabled in Prod as affects performance */ diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts index c2de0de08ca..552877c458d 100644 --- a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { ActiveJobsMetricQueueService } from './active-jobs-metric-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let activeJobsMetricQueueService: ActiveJobsMetricQueueService; describe('Job metrics Queue service', () => { describe('General', () => { beforeAll(async () => { - activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); + activeJobsMetricQueueService = new ActiveJobsMetricQueueService( + new WorkflowInMemoryProviderService() + ); await activeJobsMetricQueueService.queue.drain(); }); @@ -64,7 +68,9 @@ describe('Job metrics Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); + activeJobsMetricQueueService = new ActiveJobsMetricQueueService( + new WorkflowInMemoryProviderService() + ); await activeJobsMetricQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts index 21571e33425..66c412af75e 100644 --- a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts +++ b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts @@ -2,13 +2,20 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'ActiveJobsMetricQueueService'; @Injectable() export class ActiveJobsMetricQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.ACTIVE_JOBS_METRIC); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.ACTIVE_JOBS_METRIC, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts deleted file mode 100644 index 00939fee0e1..00000000000 --- a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { Test } from '@nestjs/testing'; - -import { CompletedJobsMetricQueueService } from './completed-jobs-metric-queue.service'; - -let completedJobsMetricQueueService: CompletedJobsMetricQueueService; - -describe('Job metrics Queue service', () => { - describe('General', () => { - beforeAll(async () => { - completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); - await completedJobsMetricQueueService.queue.drain(); - }); - - beforeEach(async () => { - await completedJobsMetricQueueService.queue.drain(); - }); - - afterEach(async () => { - await completedJobsMetricQueueService.queue.drain(); - }); - - afterAll(async () => { - await completedJobsMetricQueueService.gracefulShutdown(); - }); - - it('should be initialised properly', async () => { - expect(completedJobsMetricQueueService).toBeDefined(); - expect(Object.keys(completedJobsMetricQueueService)).toEqual( - expect.arrayContaining([ - 'topic', - 'DEFAULT_ATTEMPTS', - 'instance', - 'queue', - ]) - ); - expect(completedJobsMetricQueueService.DEFAULT_ATTEMPTS).toEqual(3); - expect(completedJobsMetricQueueService.topic).toEqual( - 'metric-completed-jobs' - ); - expect( - await completedJobsMetricQueueService.bullMqService.getStatus() - ).toEqual({ - queueIsPaused: false, - queueName: 'metric-completed-jobs', - workerName: undefined, - workerIsPaused: undefined, - workerIsRunning: undefined, - }); - expect(await completedJobsMetricQueueService.isPaused()).toEqual(false); - expect(completedJobsMetricQueueService.queue).toMatchObject( - expect.objectContaining({ - _events: {}, - _eventsCount: 0, - _maxListeners: undefined, - name: 'metric-completed-jobs', - jobsOpts: { - removeOnComplete: true, - }, - }) - ); - expect(completedJobsMetricQueueService.queue.opts.prefix).toEqual('bull'); - }); - }); - - describe('Cluster mode', () => { - beforeAll(async () => { - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - - completedJobsMetricQueueService = new CompletedJobsMetricQueueService(); - await completedJobsMetricQueueService.queue.obliterate(); - }); - - afterAll(async () => { - await completedJobsMetricQueueService.gracefulShutdown(); - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - }); - - it('should have prefix in cluster mode', async () => { - expect(completedJobsMetricQueueService.queue.opts.prefix).toEqual( - '{metric-completed-jobs}' - ); - }); - }); -}); diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts deleted file mode 100644 index 1ef74d97871..00000000000 --- a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; - -const LOG_CONTEXT = 'CompletedJobsMetricQueueService'; - -@Injectable() -export class CompletedJobsMetricQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.COMPLETED_JOBS_METRIC); - - Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); - - this.createQueue(); - } -} diff --git a/packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts b/packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts index 5ec2ed06479..ed7940d3363 100644 --- a/packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { ExecutionLogQueueService } from './execution-log-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let executionLogQueueService: ExecutionLogQueueService; describe('Execution Log Queue service', () => { describe('General', () => { beforeAll(async () => { - executionLogQueueService = new ExecutionLogQueueService(); + executionLogQueueService = new ExecutionLogQueueService( + new WorkflowInMemoryProviderService() + ); await executionLogQueueService.queue.drain(); }); @@ -62,7 +66,9 @@ describe('Execution Log Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - executionLogQueueService = new ExecutionLogQueueService(); + executionLogQueueService = new ExecutionLogQueueService( + new WorkflowInMemoryProviderService() + ); await executionLogQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/execution-log-queue.service.ts b/packages/application-generic/src/services/queues/execution-log-queue.service.ts index d9f80b7704b..2247b68d525 100644 --- a/packages/application-generic/src/services/queues/execution-log-queue.service.ts +++ b/packages/application-generic/src/services/queues/execution-log-queue.service.ts @@ -1,14 +1,21 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'ExecutionLogQueueService'; @Injectable() export class ExecutionLogQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.EXECUTION_LOG); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.EXECUTION_LOG, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts b/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts index 5ed92a3231d..bcc523fa2b1 100644 --- a/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { InboundParseQueueService } from './inbound-parse-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let inboundParseQueueService: InboundParseQueueService; describe('Inbound Parse Queue service', () => { describe('General', () => { beforeAll(async () => { - inboundParseQueueService = new InboundParseQueueService(); + inboundParseQueueService = new InboundParseQueueService( + new WorkflowInMemoryProviderService() + ); await inboundParseQueueService.queue.obliterate(); }); @@ -129,7 +133,9 @@ describe('Inbound Parse Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - inboundParseQueueService = new InboundParseQueueService(); + inboundParseQueueService = new InboundParseQueueService( + new WorkflowInMemoryProviderService() + ); await inboundParseQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts b/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts index 184b99dd09a..87d1b995f58 100644 --- a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts +++ b/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts @@ -3,14 +3,20 @@ import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; -import { QueueOptions } from '../bull-mq'; +import { BullMqService, QueueOptions } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'InboundParseQueueService'; @Injectable() export class InboundParseQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.INBOUND_PARSE_MAIL); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.INBOUND_PARSE_MAIL, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/index.ts b/packages/application-generic/src/services/queues/index.ts index 276ac42b137..f7d82ab9e0d 100644 --- a/packages/application-generic/src/services/queues/index.ts +++ b/packages/application-generic/src/services/queues/index.ts @@ -1,7 +1,6 @@ import { QueueBaseService } from './queue-base.service'; import { ActiveJobsMetricQueueService } from './active-jobs-metric-queue.service'; -import { CompletedJobsMetricQueueService } from './completed-jobs-metric-queue.service'; import { InboundParseQueueService } from './inbound-parse-queue.service'; import { StandardQueueService } from './standard-queue.service'; import { WebSocketsQueueService } from './web-sockets-queue.service'; @@ -12,7 +11,6 @@ import { ExecutionLogQueueService } from './execution-log-queue.service'; export { QueueBaseService, ActiveJobsMetricQueueService, - CompletedJobsMetricQueueService, InboundParseQueueService, StandardQueueService, WebSocketsQueueService, diff --git a/packages/application-generic/src/services/queues/queue-base.service.ts b/packages/application-generic/src/services/queues/queue-base.service.ts index 65f3ce4e094..9d82e07b8c8 100644 --- a/packages/application-generic/src/services/queues/queue-base.service.ts +++ b/packages/application-generic/src/services/queues/queue-base.service.ts @@ -12,17 +12,16 @@ import { const LOG_CONTEXT = 'QueueService'; export class QueueBaseService { - private instance: BullMqService; + public instance: BullMqService; public readonly DEFAULT_ATTEMPTS = 3; public queue: Queue; - constructor(public readonly topic: JobTopicNameEnum) { - this.instance = new BullMqService(); - } - - public get bullMqService(): BullMqService { - return this.instance; + constructor( + public readonly topic: JobTopicNameEnum, + public bullMqService: BullMqService + ) { + this.instance = bullMqService; } public createQueue(overrideOptions?: QueueOptions): void { @@ -67,10 +66,6 @@ export class QueueBaseService { ); } - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } - public async addMinimalJob( id: string, data?: any, @@ -119,4 +114,8 @@ export class QueueBaseService { ) { await this.instance.addBulk(data); } + + async onModuleDestroy(): Promise { + await this.gracefulShutdown(); + } } diff --git a/packages/application-generic/src/services/queues/standard-queue.service.spec.ts b/packages/application-generic/src/services/queues/standard-queue.service.spec.ts index 94becbba5fd..e7a2c0e5c3d 100644 --- a/packages/application-generic/src/services/queues/standard-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/standard-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { StandardQueueService } from './standard-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let standardQueueService: StandardQueueService; describe('Standard Queue service', () => { describe('General', () => { beforeAll(async () => { - standardQueueService = new StandardQueueService(); + standardQueueService = new StandardQueueService( + new WorkflowInMemoryProviderService() + ); await standardQueueService.queue.obliterate(); }); @@ -123,7 +127,9 @@ describe('Standard Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - standardQueueService = new StandardQueueService(); + standardQueueService = new StandardQueueService( + new WorkflowInMemoryProviderService() + ); await standardQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/standard-queue.service.ts b/packages/application-generic/src/services/queues/standard-queue.service.ts index 75febb14e1b..2c676da1f03 100644 --- a/packages/application-generic/src/services/queues/standard-queue.service.ts +++ b/packages/application-generic/src/services/queues/standard-queue.service.ts @@ -2,13 +2,20 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'StandardQueueService'; @Injectable() export class StandardQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.STANDARD); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.STANDARD, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts b/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts index 94e5670b973..e401067ff39 100644 --- a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts +++ b/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts @@ -1,13 +1,22 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; +import { AuthService } from '../auth'; @Injectable() export class SubscriberProcessQueueService extends QueueBaseService { private readonly LOG_CONTEXT = 'SubscriberProcessQueueService'; - constructor() { - super(JobTopicNameEnum.PROCESS_SUBSCRIBER); + constructor( + @Inject(forwardRef(() => WorkflowInMemoryProviderService)) + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.PROCESS_SUBSCRIBER, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, this.LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts b/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts index 5d8bddc2c67..1e9f980d126 100644 --- a/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { WebSocketsQueueService } from './web-sockets-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let webSocketsQueueService: WebSocketsQueueService; describe('WebSockets Queue service', () => { describe('General', () => { beforeAll(async () => { - webSocketsQueueService = new WebSocketsQueueService(); + webSocketsQueueService = new WebSocketsQueueService( + new WorkflowInMemoryProviderService() + ); await webSocketsQueueService.queue.obliterate(); }); @@ -116,7 +120,9 @@ describe('WebSockets Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - webSocketsQueueService = new WebSocketsQueueService(); + webSocketsQueueService = new WebSocketsQueueService( + new WorkflowInMemoryProviderService() + ); await webSocketsQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts b/packages/application-generic/src/services/queues/web-sockets-queue.service.ts index da5e4ee81cd..b2a92f7e36b 100644 --- a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts +++ b/packages/application-generic/src/services/queues/web-sockets-queue.service.ts @@ -2,13 +2,20 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'WebSocketsQueueService'; @Injectable() export class WebSocketsQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.WEB_SOCKETS); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.WEB_SOCKETS, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts b/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts index 115bb71f8d8..4e1cc688b52 100644 --- a/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts @@ -1,13 +1,17 @@ import { Test } from '@nestjs/testing'; import { WorkflowQueueService } from './workflow-queue.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let workflowQueueService: WorkflowQueueService; describe('Workflow Queue service', () => { describe('General', () => { beforeAll(async () => { - workflowQueueService = new WorkflowQueueService(); + workflowQueueService = new WorkflowQueueService( + new WorkflowInMemoryProviderService() + ); await workflowQueueService.queue.obliterate(); }); @@ -123,7 +127,9 @@ describe('Workflow Queue service', () => { beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - workflowQueueService = new WorkflowQueueService(); + workflowQueueService = new WorkflowQueueService( + new WorkflowInMemoryProviderService() + ); await workflowQueueService.queue.obliterate(); }); diff --git a/packages/application-generic/src/services/queues/workflow-queue.service.ts b/packages/application-generic/src/services/queues/workflow-queue.service.ts index 77d48b6de9c..65d47bd7916 100644 --- a/packages/application-generic/src/services/queues/workflow-queue.service.ts +++ b/packages/application-generic/src/services/queues/workflow-queue.service.ts @@ -2,13 +2,20 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { QueueBaseService } from './queue-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'WorkflowQueueService'; @Injectable() export class WorkflowQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.WORKFLOW); + constructor( + public workflowInMemoryProviderService: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.WORKFLOW, + new BullMqService(workflowInMemoryProviderService) + ); Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); diff --git a/packages/application-generic/src/services/readiness/readiness.service.spec.ts b/packages/application-generic/src/services/readiness/readiness.service.spec.ts index ef30899d704..0df0d921478 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.spec.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.spec.ts @@ -12,6 +12,7 @@ import { SubscriberProcessQueueHealthIndicator, WorkflowQueueServiceHealthIndicator, } from '../../health'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; let readinessService: ReadinessService; let standardQueueService: StandardQueueService; @@ -24,14 +25,20 @@ describe('Readiness Service', () => { process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - standardQueueService = new StandardQueueService(); - workflowQueueService = new WorkflowQueueService(); - subscriberProcessQueueService = new SubscriberProcessQueueService(); + standardQueueService = new StandardQueueService( + new WorkflowInMemoryProviderService() + ); + workflowQueueService = new WorkflowQueueService( + new WorkflowInMemoryProviderService() + ); + subscriberProcessQueueService = new SubscriberProcessQueueService( + new WorkflowInMemoryProviderService() + ); await Promise.all([ - standardQueueService.bullMqService.initialize(), - workflowQueueService.bullMqService.initialize(), - subscriberProcessQueueService.bullMqService.initialize(), + standardQueueService.workflowInMemoryProviderService.initialize(), + workflowQueueService.workflowInMemoryProviderService.initialize(), + subscriberProcessQueueService.workflowInMemoryProviderService.initialize(), ]); const standardQueueServiceHealthIndicator = @@ -41,11 +48,11 @@ describe('Readiness Service', () => { const subscriberProcessQueueHealthIndicator = new SubscriberProcessQueueHealthIndicator(subscriberProcessQueueService); - readinessService = new ReadinessService( + readinessService = new ReadinessService([ standardQueueServiceHealthIndicator, workflowQueueServiceHealthIndicator, - subscriberProcessQueueHealthIndicator - ); + subscriberProcessQueueHealthIndicator, + ]); }); afterAll(async () => { @@ -86,7 +93,9 @@ describe('Readiness Service', () => { }; }; - testWorker = new StandardWorkerService(); + testWorker = new StandardWorkerService( + new BullMqService(new WorkflowInMemoryProviderService()) + ); await testWorker.initWorker(getWorkerProcessor(), getWorkerOptions()); const [initialIsPaused, initialIsRunning] = await Promise.all([ diff --git a/packages/application-generic/src/services/readiness/readiness.service.ts b/packages/application-generic/src/services/readiness/readiness.service.ts index 01870b79210..fa7336fce10 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.ts @@ -1,14 +1,9 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { Worker } from '../bull-mq'; -import { - StandardQueueServiceHealthIndicator, - SubscriberProcessQueueHealthIndicator, - WebSocketsQueueServiceHealthIndicator, - WorkflowQueueServiceHealthIndicator, -} from '../../health'; import { setTimeout } from 'timers/promises'; +import { QueueHealthIndicator } from '../../health/queue-health-indicator.service'; export interface INovuWorker { readonly DEFAULT_ATTEMPTS: number; gracefulShutdown: () => Promise; @@ -24,9 +19,8 @@ const LOG_CONTEXT = 'ReadinessService'; @Injectable() export class ReadinessService { constructor( - private standardQueueServiceHealthIndicator: StandardQueueServiceHealthIndicator, - private workflowQueueServiceHealthIndicator: WorkflowQueueServiceHealthIndicator, - private subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator + @Inject('QUEUE_HEALTH_INDICATORS') + private healthIndicators: QueueHealthIndicator[] ) {} async areQueuesEnabled(): Promise { @@ -58,11 +52,9 @@ export class ReadinessService { private async checkServicesHealth() { try { - const healths = await Promise.all([ - this.standardQueueServiceHealthIndicator.isHealthy(), - this.workflowQueueServiceHealthIndicator.isHealthy(), - this.subscriberProcessQueueHealthIndicator.isHealthy(), - ]); + const healths = await Promise.all( + this.healthIndicators.map((health) => health.isHealthy()) + ); return healths.every((health) => !!health === true); } catch (error) { diff --git a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts index 8ea5b06aa7c..ab94787ae20 100644 --- a/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts +++ b/packages/application-generic/src/services/workers/active-jobs-metric-worker.service.ts @@ -2,13 +2,20 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'ActiveJobsMetricWorkerService'; @Injectable() export class ActiveJobsMetricWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.ACTIVE_JOBS_METRIC); + constructor( + private workflowInMemoryProvider: WorkflowInMemoryProviderService + ) { + super( + JobTopicNameEnum.ACTIVE_JOBS_METRIC, + new BullMqService(workflowInMemoryProvider) + ); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts b/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts deleted file mode 100644 index 2685dfc09c9..00000000000 --- a/packages/application-generic/src/services/workers/completed-jobs-metric-worker.service.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { WorkerBaseService } from './index'; - -const LOG_CONTEXT = 'CompletedJobsMetricWorkerService'; - -@Injectable() -export class CompletedJobsMetricWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.COMPLETED_JOBS_METRIC); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); - } -} diff --git a/packages/application-generic/src/services/workers/execution-log-worker.service.ts b/packages/application-generic/src/services/workers/execution-log-worker.service.ts index c661b62858d..ba7ecc2fc84 100644 --- a/packages/application-generic/src/services/workers/execution-log-worker.service.ts +++ b/packages/application-generic/src/services/workers/execution-log-worker.service.ts @@ -2,13 +2,14 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'ExecutionLogWorkerService'; -@Injectable() export class ExecutionLogWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.EXECUTION_LOG); + constructor(public bullMqService: BullMqService) { + super(JobTopicNameEnum.EXECUTION_LOG, bullMqService); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts b/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts deleted file mode 100644 index 6789c9d0d5e..00000000000 --- a/packages/application-generic/src/services/workers/inbound-parse-worker.service.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { WorkerBaseService } from './index'; - -const LOG_CONTEXT = 'InboundParseWorkerService'; - -@Injectable() -export class InboundParseWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.INBOUND_PARSE_MAIL); - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); - } -} diff --git a/packages/application-generic/src/services/workers/index.ts b/packages/application-generic/src/services/workers/index.ts index 5c11db3feba..63a19599279 100644 --- a/packages/application-generic/src/services/workers/index.ts +++ b/packages/application-generic/src/services/workers/index.ts @@ -5,8 +5,6 @@ import { } from './worker-base.service'; import { ActiveJobsMetricWorkerService } from './active-jobs-metric-worker.service'; -import { CompletedJobsMetricWorkerService } from './completed-jobs-metric-worker.service'; -import { InboundParseWorkerService } from './inbound-parse-worker.service'; import { StandardWorkerService } from './standard-worker.service'; import { SubscriberProcessWorkerService } from './subscriber-process-worker.service'; import { WebSocketsWorkerService } from './web-sockets-worker.service'; @@ -15,8 +13,6 @@ import { ExecutionLogWorkerService } from './execution-log-worker.service'; export { ActiveJobsMetricWorkerService, - CompletedJobsMetricWorkerService, - InboundParseWorkerService as InboundParseWorker, StandardWorkerService, SubscriberProcessWorkerService, WebSocketsWorkerService, diff --git a/packages/application-generic/src/services/workers/standard-worker.service.ts b/packages/application-generic/src/services/workers/standard-worker.service.ts index 052a06d4ad3..30375172d23 100644 --- a/packages/application-generic/src/services/workers/standard-worker.service.ts +++ b/packages/application-generic/src/services/workers/standard-worker.service.ts @@ -2,13 +2,14 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'StandardWorkerService'; -@Injectable() export class StandardWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.STANDARD); + constructor(public bullMqService: BullMqService) { + super(JobTopicNameEnum.STANDARD, bullMqService); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts index c2da7287d83..6b7ae89e466 100644 --- a/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts +++ b/packages/application-generic/src/services/workers/subscriber-process-worker.service.ts @@ -1,18 +1,19 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { INovuWorker } from '../readiness'; import { WorkerBaseService } from './worker-base.service'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'SubscriberProcessWorkerService'; -@Injectable() export class SubscriberProcessWorkerService extends WorkerBaseService implements INovuWorker { - constructor() { - super(JobTopicNameEnum.PROCESS_SUBSCRIBER); + constructor(private bullMqService: BullMqService) { + super(JobTopicNameEnum.PROCESS_SUBSCRIBER, bullMqService); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts index 21eca627bc4..1cffb0dca12 100644 --- a/packages/application-generic/src/services/workers/web-sockets-worker.service.ts +++ b/packages/application-generic/src/services/workers/web-sockets-worker.service.ts @@ -1,14 +1,14 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; +import { BullMqService } from '../bull-mq'; const LOG_CONTEXT = 'WebSocketsWorkerService'; -@Injectable() export class WebSocketsWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.WEB_SOCKETS); + constructor(public bullMqService: BullMqService) { + super(JobTopicNameEnum.WEB_SOCKETS, bullMqService); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/workers/worker-base.service.ts b/packages/application-generic/src/services/workers/worker-base.service.ts index fa722ca225e..45a585c2425 100644 --- a/packages/application-generic/src/services/workers/worker-base.service.ts +++ b/packages/application-generic/src/services/workers/worker-base.service.ts @@ -8,6 +8,7 @@ import { Worker, WorkerOptions, } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'WorkerService'; @@ -23,12 +24,11 @@ export class WorkerBaseService { public readonly DEFAULT_ATTEMPTS = 3; - constructor(public readonly topic: JobTopicNameEnum) { - this.instance = new BullMqService(); - } - - public get bullMqService(): BullMqService { - return this.instance; + constructor( + public readonly topic: JobTopicNameEnum, + public bullMqServiceInstance: BullMqService + ) { + this.instance = bullMqServiceInstance; } public get worker(): Worker { diff --git a/packages/application-generic/src/services/workers/workflow-worker.service.ts b/packages/application-generic/src/services/workers/workflow-worker.service.ts index 415aa040304..29e7993b9db 100644 --- a/packages/application-generic/src/services/workers/workflow-worker.service.ts +++ b/packages/application-generic/src/services/workers/workflow-worker.service.ts @@ -2,13 +2,14 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; import { WorkerBaseService } from './index'; +import { BullMqService } from '../bull-mq'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; const LOG_CONTEXT = 'WorkflowWorkerService'; -@Injectable() export class WorkflowWorkerService extends WorkerBaseService { - constructor() { - super(JobTopicNameEnum.WORKFLOW); + constructor(public bullMqService: BullMqService) { + super(JobTopicNameEnum.WORKFLOW, bullMqService); Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); } } diff --git a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts index 3074e13c03c..92777d4b162 100644 --- a/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts +++ b/packages/application-generic/src/usecases/select-integration/select-integration.spec.ts @@ -16,7 +16,11 @@ import { SelectIntegrationCommand } from './select-integration.command'; import { GetDecryptedIntegrations } from '../get-decrypted-integrations'; import { ConditionsFilter } from '../conditions-filter'; import { CompileTemplate } from '../compile-template'; -import { ExecutionLogQueueService } from '../../services'; +import { + BullMqService, + ExecutionLogQueueService, + WorkflowInMemoryProviderService, +} from '../../services'; const testIntegration: IntegrationEntity = { _environmentId: 'env-test-123', @@ -105,7 +109,7 @@ describe('select integration', function () { new JobRepository(), new TenantRepository(), new EnvironmentRepository(), - new ExecutionLogQueueService(), + new ExecutionLogQueueService(new WorkflowInMemoryProviderService()), new CompileTemplate() ), new TenantRepository() diff --git a/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts b/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts index 311a8662b1b..07fc9f7fb93 100644 --- a/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts +++ b/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts @@ -47,13 +47,11 @@ export class TriggerEvent { constructor( private processSubscriber: ProcessSubscriber, private integrationRepository: IntegrationRepository, - private subscriberRepository: SubscriberRepository, private jobRepository: JobRepository, private notificationTemplateRepository: NotificationTemplateRepository, private processTenant: ProcessTenant, private logger: PinoLogger, private mapTriggerRecipients: MapTriggerRecipients, - private subscriberProcessQueueService: SubscriberProcessQueueService, private triggerBroadcast: TriggerBroadcast, private triggerMulticast: TriggerMulticast ) {}