From 9249703884065ab90d3bd26857552dc8bba6ef64 Mon Sep 17 00:00:00 2001 From: p-fernandez Date: Tue, 31 Oct 2023 19:22:10 +0000 Subject: [PATCH 1/2] feat(infra): create dedicated services for in-memory for wf and cache --- .../e2e/get-grouped-blueprints.e2e.ts | 7 +- .../app/events/e2e/process-subscriber.e2e.ts | 9 +- .../src/app/health/e2e/health-check.e2e.ts | 3 - apps/api/src/app/widgets/e2e/get-count.e2e.ts | 13 +-- .../app/widgets/e2e/get-unseen-count.e2e.ts | 17 ++- .../e2e/initialize-widget-session.e2e.ts | 7 +- .../src/app/health/e2e/health-check.e2e.ts | 4 - .../src/custom-providers/index.ts | 35 +++--- ...tive-jobs-metric-queue.health-indicator.ts | 2 +- .../src/health/cache.health-indicator.ts | 2 +- ...eted-jobs-metric-queue.health-indicator.ts | 2 +- .../inbound-parse-queue.health-indicator.ts | 2 +- .../health/queue-health-indicator.service.ts | 2 +- .../health/standard-queue.health-indicator.ts | 2 +- ...bscriber-process-queue.health-indicator.ts | 2 +- .../web-sockets-queue.health-indicator.ts | 2 +- .../health/workflow-queue.health-indicator.ts | 2 +- .../src/modules/queues.module.ts | 7 +- .../src/services/bull-mq/bull-mq.service.ts | 44 ++------ .../bull-mq/old-instance-bull-mq.service.ts | 3 +- .../src/services/cache/cache-service.spec.ts | 36 ++---- .../src/services/cache/cache.service.ts | 24 ++-- .../distributed-lock.service.spec.ts | 25 ++--- .../distributed-lock.service.ts | 13 ++- .../cache-in-memory-provider.service.ts | 105 ++++++++++++++++++ .../in-memory-provider.service.spec.ts | 15 +-- .../in-memory-provider.service.ts | 37 ++---- .../src/services/in-memory-provider/index.ts | 2 + .../workflow-in-memory-provider.service.ts | 86 ++++++++++++++ .../active-jobs-metric-queue.service.ts | 2 +- .../completed-jobs-metric-queue.service.ts | 2 +- .../queues/inbound-parse-queue.service.ts | 2 +- .../services/queues/standard-queue.service.ts | 2 +- .../subscriber-process-queue.service.ts | 1 + .../queues/web-sockets-queue.service.ts | 2 +- .../services/queues/workflow-queue.service.ts | 2 +- .../create-subscriber.spec.ts | 21 ++-- ...in-memory-cluster-mode-enabled.use-case.ts | 6 +- .../get-system-critical-flag.test.ts | 12 ++ .../update-subscriber.spec.ts | 18 ++- pnpm-lock.yaml | 22 ++-- 41 files changed, 358 insertions(+), 244 deletions(-) create mode 100644 packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts create mode 100644 packages/application-generic/src/services/in-memory-provider/workflow-in-memory-provider.service.ts diff --git a/apps/api/src/app/blueprint/e2e/get-grouped-blueprints.e2e.ts b/apps/api/src/app/blueprint/e2e/get-grouped-blueprints.e2e.ts index 67e4a5893f3..a981955e169 100644 --- a/apps/api/src/app/blueprint/e2e/get-grouped-blueprints.e2e.ts +++ b/apps/api/src/app/blueprint/e2e/get-grouped-blueprints.e2e.ts @@ -13,9 +13,8 @@ import { } from '@novu/shared'; import { buildGroupedBlueprintsKey, + CacheInMemoryProviderService, CacheService, - InMemoryProviderEnum, - InMemoryProviderService, InvalidateCacheService, } from '@novu/application-generic'; @@ -34,8 +33,8 @@ describe('Get grouped notification template blueprints - /blueprints/group-by-ca let indexModuleStub: sinon.SinonStub; before(async () => { - const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - const cacheService = new CacheService(inMemoryProviderService); + const cacheInMemoryProviderService = new CacheInMemoryProviderService(); + const cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); invalidateCache = new InvalidateCacheService(cacheService); diff --git a/apps/api/src/app/events/e2e/process-subscriber.e2e.ts b/apps/api/src/app/events/e2e/process-subscriber.e2e.ts index 22fe8f871c6..0e92775950c 100644 --- a/apps/api/src/app/events/e2e/process-subscriber.e2e.ts +++ b/apps/api/src/app/events/e2e/process-subscriber.e2e.ts @@ -12,9 +12,8 @@ import { ChannelTypeEnum, ISubscribersDefine, IUpdateNotificationTemplateDto, St import { buildNotificationTemplateIdentifierKey, buildNotificationTemplateKey, + CacheInMemoryProviderService, CacheService, - InMemoryProviderEnum, - InMemoryProviderService, InvalidateCacheService, } from '@novu/application-generic'; @@ -29,15 +28,15 @@ describe('Trigger event - process subscriber /v1/events/trigger (POST)', functio let subscriberService: SubscribersService; let cacheService: CacheService; let invalidateCache: InvalidateCacheService; - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; const subscriberRepository = new SubscriberRepository(); const messageRepository = new MessageRepository(); const notificationTemplateRepository = new NotificationTemplateRepository(); before(async () => { - inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - cacheService = new CacheService(inMemoryProviderService); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); invalidateCache = new InvalidateCacheService(cacheService); }); diff --git a/apps/api/src/app/health/e2e/health-check.e2e.ts b/apps/api/src/app/health/e2e/health-check.e2e.ts index 8644e96ba64..a5b1d0e3670 100644 --- a/apps/api/src/app/health/e2e/health-check.e2e.ts +++ b/apps/api/src/app/health/e2e/health-check.e2e.ts @@ -1,13 +1,10 @@ import { UserSession } from '@novu/testing'; -import { InMemoryProviderEnum, InMemoryProviderService } from '@novu/application-generic'; import { expect } from 'chai'; describe('Health-check', () => { const session = new UserSession(); before(async () => { - const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - await session.initialize(); }); diff --git a/apps/api/src/app/widgets/e2e/get-count.e2e.ts b/apps/api/src/app/widgets/e2e/get-count.e2e.ts index 835e31b919c..b6fa214b411 100644 --- a/apps/api/src/app/widgets/e2e/get-count.e2e.ts +++ b/apps/api/src/app/widgets/e2e/get-count.e2e.ts @@ -6,9 +6,8 @@ import { ChannelTypeEnum, InAppProviderIdEnum } from '@novu/shared'; import { buildFeedKey, buildMessageCountKey, + CacheInMemoryProviderService, CacheService, - InMemoryProviderEnum, - InMemoryProviderService, InvalidateCacheService, } from '@novu/application-generic'; @@ -23,11 +22,11 @@ describe('Count - GET /widget/notifications/count', function () { } | null = null; let invalidateCache: InvalidateCacheService; - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; before(async () => { - inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - const cacheService = new CacheService(inMemoryProviderService); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + const cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); invalidateCache = new InvalidateCacheService(cacheService); }); @@ -256,7 +255,7 @@ describe('Count - GET /widget/notifications/count', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; @@ -319,7 +318,7 @@ describe('Count - GET /widget/notifications/count', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; diff --git a/apps/api/src/app/widgets/e2e/get-unseen-count.e2e.ts b/apps/api/src/app/widgets/e2e/get-unseen-count.e2e.ts index d1a7c6aa39a..e2325e59b1e 100644 --- a/apps/api/src/app/widgets/e2e/get-unseen-count.e2e.ts +++ b/apps/api/src/app/widgets/e2e/get-unseen-count.e2e.ts @@ -6,9 +6,8 @@ import { ChannelTypeEnum } from '@novu/shared'; import { buildFeedKey, buildMessageCountKey, + CacheInMemoryProviderService, CacheService, - InMemoryProviderEnum, - InMemoryProviderService, InvalidateCacheService, } from '@novu/application-generic'; @@ -22,12 +21,12 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () { _id: string; } | null = null; - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; let invalidateCache: InvalidateCacheService; before(async () => { - inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - const cacheService = new CacheService(inMemoryProviderService); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + const cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); invalidateCache = new InvalidateCacheService(cacheService); }); @@ -68,7 +67,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; @@ -97,7 +96,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; @@ -126,7 +125,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; @@ -155,7 +154,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () { const messages = await messageRepository.findBySubscriberChannel( session.environment._id, - subscriberProfile!._id, + String(subscriberProfile?._id), ChannelTypeEnum.IN_APP ); const messageId = messages[0]._id; diff --git a/apps/api/src/app/widgets/e2e/initialize-widget-session.e2e.ts b/apps/api/src/app/widgets/e2e/initialize-widget-session.e2e.ts index 1fa1bc924f1..e2f5b0292e5 100644 --- a/apps/api/src/app/widgets/e2e/initialize-widget-session.e2e.ts +++ b/apps/api/src/app/widgets/e2e/initialize-widget-session.e2e.ts @@ -5,10 +5,9 @@ import { expect } from 'chai'; import { createHash } from '../../shared/helpers/hmac.service'; import { buildIntegrationKey, + CacheInMemoryProviderService, CacheService, - InMemoryProviderService, InvalidateCacheService, - InMemoryProviderEnum, } from '@novu/application-generic'; const integrationRepository = new IntegrationRepository(); @@ -19,8 +18,8 @@ describe('Initialize Session - /widgets/session/initialize (POST)', async () => let invalidateCache: InvalidateCacheService; before(async () => { - const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - const cacheService = new CacheService(inMemoryProviderService); + const cacheInMemoryProviderService = new CacheInMemoryProviderService(); + const cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); invalidateCache = new InvalidateCacheService(cacheService); }); diff --git a/apps/worker/src/app/health/e2e/health-check.e2e.ts b/apps/worker/src/app/health/e2e/health-check.e2e.ts index 55fa750b699..9bcc4f98e80 100644 --- a/apps/worker/src/app/health/e2e/health-check.e2e.ts +++ b/apps/worker/src/app/health/e2e/health-check.e2e.ts @@ -1,4 +1,3 @@ -import { InMemoryProviderEnum, InMemoryProviderService } from '@novu/application-generic'; import { expect } from 'chai'; import * as request from 'supertest'; import * as defaults from 'superagent-defaults'; @@ -7,9 +6,6 @@ describe('Health-check', () => { let testAgent; before(async () => { - const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS); - await inMemoryProviderService.delayUntilReadiness(); - testAgent = defaults(request(`http://localhost:${process.env.PORT}`)); }); diff --git a/packages/application-generic/src/custom-providers/index.ts b/packages/application-generic/src/custom-providers/index.ts index f1a174be828..ba39d9671ad 100644 --- a/packages/application-generic/src/custom-providers/index.ts +++ b/packages/application-generic/src/custom-providers/index.ts @@ -1,14 +1,14 @@ import { AnalyticsService, BullMqService, + CacheInMemoryProviderService, CacheService, DistributedLockService, FeatureFlagsService, - InMemoryProviderEnum, - InMemoryProviderService, ReadinessService, OldInstanceBullMqService, StandardQueueService, + SubscriberProcessQueueService, WebSocketsQueueService, WorkflowQueueService, } from '../services'; @@ -16,7 +16,6 @@ import { GetIsTopicNotificationEnabled, GetUseMergedDigestId, } from '../usecases'; -import { SubscriberProcessQueueService } from '../services/queues/subscriber-process-queue.service'; export const featureFlagsService = { provide: FeatureFlagsService, @@ -52,13 +51,10 @@ export const getIsTopicNotificationEnabled = { inject: [FeatureFlagsService], }; -export const inMemoryProviderService = { - provide: InMemoryProviderService, - useFactory: ( - provider: InMemoryProviderEnum, - enableAutoPipelining?: boolean - ): InMemoryProviderService => { - return new InMemoryProviderService(provider, enableAutoPipelining); +export const cacheInMemoryProviderService = { + provide: CacheInMemoryProviderService, + useFactory: (): CacheInMemoryProviderService => { + return new CacheInMemoryProviderService(); }, }; @@ -87,14 +83,10 @@ export const oldInstanceBullMqService = { export const cacheService = { provide: CacheService, useFactory: async (): Promise => { - const enableAutoPipelining = - process.env.REDIS_CACHE_ENABLE_AUTOPIPELINING === 'false'; - const factoryInMemoryProviderService = inMemoryProviderService.useFactory( - InMemoryProviderEnum.ELASTICACHE, - enableAutoPipelining - ); + const factoryCacheInMemoryProviderService = + cacheInMemoryProviderService.useFactory(); - const service = new CacheService(factoryInMemoryProviderService); + const service = new CacheService(factoryCacheInMemoryProviderService); await service.initialize(); @@ -115,11 +107,12 @@ export const analyticsService = { export const distributedLockService = { provide: DistributedLockService, useFactory: async (): Promise => { - const factoryInMemoryProviderService = inMemoryProviderService.useFactory( - InMemoryProviderEnum.ELASTICACHE - ); + const factoryCacheInMemoryProviderService = + cacheInMemoryProviderService.useFactory(); - const service = new DistributedLockService(factoryInMemoryProviderService); + const service = new DistributedLockService( + factoryCacheInMemoryProviderService + ); await service.initialize(); diff --git a/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts index fe5a80d525b..26a0176dacc 100644 --- a/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts +++ b/packages/application-generic/src/health/active-jobs-metric-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { ActiveJobsMetricQueueService } from '../services'; +import { ActiveJobsMetricQueueService } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'ActiveJobsMetricQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/cache.health-indicator.ts b/packages/application-generic/src/health/cache.health-indicator.ts index 5c2a765e50f..1e000b83083 100644 --- a/packages/application-generic/src/health/cache.health-indicator.ts +++ b/packages/application-generic/src/health/cache.health-indicator.ts @@ -5,7 +5,7 @@ import { } from '@nestjs/terminus'; import { Injectable, Logger } from '@nestjs/common'; -import { CacheService } from '../services'; +import { CacheService } from '../services/cache'; const LOG_CONTEXT = 'CacheServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts index efa011c3835..e969190a323 100644 --- a/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts +++ b/packages/application-generic/src/health/completed-jobs-metric-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { CompletedJobsMetricQueueService } from '../services'; +import { CompletedJobsMetricQueueService } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'CompletedJobsMetricQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts b/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts index d408277f4a4..c73f651add2 100644 --- a/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts +++ b/packages/application-generic/src/health/inbound-parse-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { InboundParseQueue } from '../services'; +import { InboundParseQueue } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'InboundParseQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/queue-health-indicator.service.ts b/packages/application-generic/src/health/queue-health-indicator.service.ts index 16954b233e3..90e8b466165 100644 --- a/packages/application-generic/src/health/queue-health-indicator.service.ts +++ b/packages/application-generic/src/health/queue-health-indicator.service.ts @@ -5,7 +5,7 @@ import { } from '@nestjs/terminus'; import { Injectable, Logger } from '@nestjs/common'; -import { QueueBaseService } from '../services'; +import { QueueBaseService } from '../services/queues/queue-base.service'; import { IHealthIndicator } from './health-indicator.interface'; @Injectable() diff --git a/packages/application-generic/src/health/standard-queue.health-indicator.ts b/packages/application-generic/src/health/standard-queue.health-indicator.ts index aa4c9a31908..dbee0dce903 100644 --- a/packages/application-generic/src/health/standard-queue.health-indicator.ts +++ b/packages/application-generic/src/health/standard-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { StandardQueueService } from '../services'; +import { StandardQueueService } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'StandardQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts index 96251dc6230..d9e6a9384ee 100644 --- a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts +++ b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { SubscriberProcessQueueService } from '../services'; +import { SubscriberProcessQueueService } from '../services/queues'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { QueueHealthIndicator } from './queue-health-indicator.service'; diff --git a/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts b/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts index 9ef05994af1..8211ecdcda1 100644 --- a/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts +++ b/packages/application-generic/src/health/web-sockets-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WebSocketsQueueService } from '../services'; +import { WebSocketsQueueService } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'WebSocketsQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/health/workflow-queue.health-indicator.ts b/packages/application-generic/src/health/workflow-queue.health-indicator.ts index 6f5dd361a4a..34fa1011f39 100644 --- a/packages/application-generic/src/health/workflow-queue.health-indicator.ts +++ b/packages/application-generic/src/health/workflow-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowQueueService } from '../services'; +import { WorkflowQueueService } from '../services/queues'; import { QueueHealthIndicator } from './queue-health-indicator.service'; const LOG_CONTEXT = 'WorkflowQueueServiceHealthIndicator'; diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 9f925d91463..4c4e091aacc 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -6,6 +6,7 @@ import { CompletedJobsMetricQueueServiceHealthIndicator, InboundParseQueueServiceHealthIndicator, StandardQueueServiceHealthIndicator, + SubscriberProcessQueueHealthIndicator, WebSocketsQueueServiceHealthIndicator, WorkflowQueueServiceHealthIndicator, } from '../health'; @@ -15,6 +16,7 @@ import { CompletedJobsMetricQueueService, InboundParseQueue, StandardQueueService, + SubscriberProcessQueueService, WebSocketsQueueService, WorkflowQueueService, } from '../services/queues'; @@ -23,16 +25,13 @@ import { CompletedJobsMetricWorkerService, InboundParseWorker, StandardWorkerService, + SubscriberProcessWorkerService, WebSocketsWorkerService, WorkflowWorkerService, OldInstanceStandardWorkerService, OldInstanceWorkflowWorkerService, } from '../services/workers'; -import { SubscriberProcessQueueService } from '../services/queues/subscriber-process-queue.service'; -import { SubscriberProcessWorkerService } from '../services/workers/subscriber-process-worker.service'; -import { SubscriberProcessQueueHealthIndicator } from '../health/subscriber-process-queue.health-indicator'; - const PROVIDERS: Provider[] = [ ActiveJobsMetricQueueService, ActiveJobsMetricQueueServiceHealthIndicator, diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts index a788754d2dd..190b9ef18a0 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts @@ -15,10 +15,7 @@ import { import { Injectable, Logger } from '@nestjs/common'; import { IEventJobData, IJobData, JobTopicNameEnum } from '@novu/shared'; -import { - InMemoryProviderEnum, - InMemoryProviderService, -} from '../in-memory-provider'; +import { WorkflowInMemoryProviderService } from '../in-memory-provider'; interface IQueueMetrics { completed: Metrics; @@ -46,35 +43,18 @@ export { export class BullMqService { private _queue: Queue; private _worker: Worker; - private inMemoryProviderService: InMemoryProviderService; + private workflowInMemoryProviderService: WorkflowInMemoryProviderService; public static readonly pro: boolean = process.env.NOVU_MANAGED_SERVICE !== undefined; constructor() { - this.inMemoryProviderService = new InMemoryProviderService( - this.selectProvider() - ); - } - - /** - * Rules for the provider selection: - * - For our self hosted users we assume all of them have a single node Redis - * instance. - * - For Novu we will use MemoryDB. We fallback to a Redis Cluster configuration - * if MemoryDB not configured properly. That's happening in the provider - * mapping in the /in-memory-provider/providers/index.ts - */ - private selectProvider(): InMemoryProviderEnum { - if (process.env.IS_DOCKER_HOSTED) { - return InMemoryProviderEnum.REDIS; - } - - return InMemoryProviderEnum.MEMORY_DB; + this.workflowInMemoryProviderService = + new WorkflowInMemoryProviderService(); } public async initialize(): Promise { - await this.inMemoryProviderService.delayUntilReadiness(); + await this.workflowInMemoryProviderService.initialize(); } public get worker(): Worker { @@ -129,11 +109,7 @@ export class BullMqService { * */ private generatePrefix(prefix: JobTopicNameEnum): string { - const isClusterMode = this.inMemoryProviderService.isClusterMode(); - const providerConfigured = - this.inMemoryProviderService.getProvider.configured; - - if (isClusterMode || providerConfigured !== InMemoryProviderEnum.REDIS) { + if (this.workflowInMemoryProviderService.providerInUseIsInClusterMode()) { return `{${prefix}}`; } @@ -142,7 +118,7 @@ export class BullMqService { public createQueue(topic: JobTopicNameEnum, queueOptions: QueueOptions) { const config = { - connection: this.inMemoryProviderService.inMemoryProviderClient, + connection: this.workflowInMemoryProviderService.getClient(), ...(queueOptions?.defaultJobOptions && { defaultJobOptions: { ...queueOptions.defaultJobOptions, @@ -184,7 +160,7 @@ export class BullMqService { const { concurrency, connection, lockDuration, settings } = workerOptions; const config = { - connection: this.inMemoryProviderService.inMemoryProviderClient, + connection: this.workflowInMemoryProviderService.getClient(), ...(concurrency && { concurrency }), ...(lockDuration && { lockDuration }), ...(settings && { settings }), @@ -276,7 +252,7 @@ export class BullMqService { await this._worker.close(); } - await this.inMemoryProviderService.shutdown(); + await this.workflowInMemoryProviderService.shutdown(); Logger.log('Shutting down the BullMQ service has finished', LOG_CONTEXT); } @@ -304,7 +280,7 @@ export class BullMqService { } public isClientReady(): boolean { - return this.inMemoryProviderService.isClientReady(); + return this.workflowInMemoryProviderService.isReady(); } public async isQueuePaused(): Promise { diff --git a/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts index 211102f443e..d9cd7b4bacf 100644 --- a/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts +++ b/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts @@ -49,7 +49,8 @@ export class OldInstanceBullMqService { constructor() { if (this.shouldInstantiate()) { this.inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.OLD_INSTANCE_REDIS + InMemoryProviderEnum.OLD_INSTANCE_REDIS, + true ); this.enabled = true; } else { diff --git a/packages/application-generic/src/services/cache/cache-service.spec.ts b/packages/application-generic/src/services/cache/cache-service.spec.ts index 545bf45b548..91c413e0aa4 100644 --- a/packages/application-generic/src/services/cache/cache-service.spec.ts +++ b/packages/application-generic/src/services/cache/cache-service.spec.ts @@ -5,13 +5,7 @@ import { splitKey, } from './cache.service'; -import { - InMemoryProviderEnum, - InMemoryProviderService, -} from '../in-memory-provider'; - -const enableAutoPipelining = - process.env.REDIS_CACHE_ENABLE_AUTOPIPELINING === 'true'; +import { CacheInMemoryProviderService } from '../in-memory-provider'; /** * TODO: Maybe create a Test single Redis instance to be able to run it in the @@ -19,24 +13,20 @@ const enableAutoPipelining = */ describe.skip('Cache Service - Redis Instance - Non Cluster Mode', () => { let cacheService: CacheService; - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.REDIS, - enableAutoPipelining - ); - await inMemoryProviderService.delayUntilReadiness(); - expect(inMemoryProviderService.isClusterMode()).toBe(false); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + expect(cacheInMemoryProviderService.isCluster).toBe(false); - cacheService = new CacheService(inMemoryProviderService); + cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); }); afterAll(async () => { - await inMemoryProviderService.shutdown(); + await cacheInMemoryProviderService.shutdown(); }); it('should be instantiated properly', async () => { @@ -80,24 +70,20 @@ describe.skip('Cache Service - Redis Instance - Non Cluster Mode', () => { describe('Cache Service - Cluster Mode', () => { let cacheService: CacheService; - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; beforeAll(async () => { process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.REDIS, - enableAutoPipelining - ); - await inMemoryProviderService.delayUntilReadiness(); - expect(inMemoryProviderService.isClusterMode()).toBe(true); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + expect(cacheInMemoryProviderService.isCluster).toBe(true); - cacheService = new CacheService(inMemoryProviderService); + cacheService = new CacheService(cacheInMemoryProviderService); await cacheService.initialize(); }); afterAll(async () => { - await inMemoryProviderService.shutdown(); + await cacheInMemoryProviderService.shutdown(); }); it('should be instantiated properly', async () => { diff --git a/packages/application-generic/src/services/cache/cache.service.ts b/packages/application-generic/src/services/cache/cache.service.ts index 3623d420bde..2a2829a4659 100644 --- a/packages/application-generic/src/services/cache/cache.service.ts +++ b/packages/application-generic/src/services/cache/cache.service.ts @@ -2,8 +2,8 @@ import { Injectable, Logger } from '@nestjs/common'; import { QUERY_PREFIX } from './key-builders'; import { + CacheInMemoryProviderService, InMemoryProviderClient, - InMemoryProviderService, Pipeline, } from '../in-memory-provider'; import { addJitter } from '../../resilience'; @@ -30,23 +30,27 @@ export type CachingConfig = { }; export class CacheService implements ICacheService { - private client: InMemoryProviderClient; private cacheTtl: number; private readonly TTL_VARIANT_PERCENTAGE = 0.1; - constructor(private inMemoryProviderService: InMemoryProviderService) {} + constructor( + private cacheInMemoryProviderService: CacheInMemoryProviderService + ) {} public async initialize(): Promise { Logger.log('Initiated cache service', LOG_CONTEXT); - await this.inMemoryProviderService.delayUntilReadiness(); + await this.cacheInMemoryProviderService.initialize(); - this.client = this.inMemoryProviderService.inMemoryProviderClient; - this.cacheTtl = this.inMemoryProviderService.inMemoryProviderConfig.ttl; + this.cacheTtl = this.cacheInMemoryProviderService.getTtl(); + } + + public get client(): InMemoryProviderClient { + return this.cacheInMemoryProviderService.getClient(); } public getStatus(): string { - return this.client?.status; + return this.cacheInMemoryProviderService.getClientStatus(); } public getTtl(): number { @@ -54,7 +58,7 @@ export class CacheService implements ICacheService { } public cacheEnabled(): boolean { - const isEnabled = this.inMemoryProviderService.isClientReady(); + const isEnabled = this.cacheInMemoryProviderService.isReady(); if (!isEnabled) { Logger.log('Cache service is not enabled', LOG_CONTEXT); } @@ -113,7 +117,7 @@ export class CacheService implements ICacheService { pipeline.sadd(credentials, query); pipeline.expire( credentials, - this.inMemoryProviderService.inMemoryProviderConfig.ttl + + this.cacheInMemoryProviderService.getTtl() + this.getTtlInSeconds(options) ); @@ -189,7 +193,7 @@ export class CacheService implements ICacheService { if (client) { return new Promise((resolve, reject) => { - const stream = this.inMemoryProviderService.inMemoryScan(pattern); + const stream = this.cacheInMemoryProviderService.inMemoryScan(pattern); stream.on('data', function (keys) { if (keys.length) { diff --git a/packages/application-generic/src/services/distributed-lock/distributed-lock.service.spec.ts b/packages/application-generic/src/services/distributed-lock/distributed-lock.service.spec.ts index b4d8ad0f3c6..a8ade37edcd 100644 --- a/packages/application-generic/src/services/distributed-lock/distributed-lock.service.spec.ts +++ b/packages/application-generic/src/services/distributed-lock/distributed-lock.service.spec.ts @@ -7,7 +7,7 @@ import { FeatureFlagsService } from '../feature-flags.service'; import { InMemoryProviderClient, InMemoryProviderEnum, - InMemoryProviderService, + CacheInMemoryProviderService, } from '../in-memory-provider'; const originalRedisCacheServiceHost = (process.env.REDIS_CACHE_SERVICE_HOST = @@ -38,20 +38,18 @@ describe('Distributed Lock Service', () => { }); describe('In-memory provider service set', () => { - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; let distributedLockService: DistributedLockService; beforeEach(async () => { - inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.REDIS - ); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); - await inMemoryProviderService.delayUntilReadiness(); + await cacheInMemoryProviderService.initialize(); - expect(inMemoryProviderService.getStatus()).toEqual('ready'); + expect(cacheInMemoryProviderService.getClientStatus()).toEqual('ready'); distributedLockService = new DistributedLockService( - inMemoryProviderService + cacheInMemoryProviderService ); await distributedLockService.initialize(); }); @@ -290,7 +288,7 @@ describe('Distributed Lock Service', () => { }); describe('Bypass - In-memory provider service not set', () => { - let inMemoryProviderService: InMemoryProviderService; + let cacheInMemoryProviderService: CacheInMemoryProviderService; let distributedLockService: DistributedLockService; beforeEach(async () => { @@ -299,10 +297,11 @@ describe('Distributed Lock Service', () => { process.env.REDIS_CLUSTER_SERVICE_HOST = ''; process.env.REDIS_CLUSTER_SERVICE_PORTS = ''; - inMemoryProviderService = new InMemoryProviderService(undefined); - expect(inMemoryProviderService.inMemoryProviderConfig.host).toEqual( - 'localhost' - ); + cacheInMemoryProviderService = new CacheInMemoryProviderService(); + expect( + cacheInMemoryProviderService.inMemoryProviderService + .inMemoryProviderConfig.host + ).toEqual('localhost'); distributedLockService = new DistributedLockService(undefined); // If no initializing the service is like the client is not properly set }); diff --git a/packages/application-generic/src/services/distributed-lock/distributed-lock.service.ts b/packages/application-generic/src/services/distributed-lock/distributed-lock.service.ts index 3b94fe6cb63..952790a10c7 100644 --- a/packages/application-generic/src/services/distributed-lock/distributed-lock.service.ts +++ b/packages/application-generic/src/services/distributed-lock/distributed-lock.service.ts @@ -4,8 +4,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InMemoryProviderClient, - InMemoryProviderEnum, - InMemoryProviderService, + CacheInMemoryProviderService, } from '../in-memory-provider'; const LOG_CONTEXT = 'DistributedLock'; @@ -22,11 +21,13 @@ export class DistributedLockService { public lockCounter = {}; public shuttingDown = false; - constructor(private inMemoryProviderService: InMemoryProviderService) {} + constructor( + private cacheInMemoryProviderService: CacheInMemoryProviderService + ) {} async initialize(): Promise { - await this.inMemoryProviderService.delayUntilReadiness(); - this.startup(this.inMemoryProviderService.inMemoryProviderClient); + await this.cacheInMemoryProviderService.initialize(); + this.startup(this.cacheInMemoryProviderService.getClient()); } public startup( @@ -103,7 +104,7 @@ export class DistributedLockService { } finally { this.shuttingDown = false; this.distributedLock = undefined; - await this.inMemoryProviderService.shutdown(); + await this.cacheInMemoryProviderService.shutdown(); Logger.verbose('Redlock shutdown', LOG_CONTEXT); } } diff --git a/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts new file mode 100644 index 00000000000..3344a7e24c6 --- /dev/null +++ b/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts @@ -0,0 +1,105 @@ +import { Logger } from '@nestjs/common'; + +import { InMemoryProviderService } from './in-memory-provider.service'; +import { + InMemoryProviderEnum, + InMemoryProviderClient, + ScanStream, +} from './types'; + +import { GetIsInMemoryClusterModeEnabled } from '../../usecases'; + +const LOG_CONTEXT = 'CacheInMemoryProviderService'; + +export class CacheInMemoryProviderService { + public inMemoryProviderService: InMemoryProviderService; + public isCluster: boolean; + private getIsInMemoryClusterModeEnabled: GetIsInMemoryClusterModeEnabled; + + constructor() { + this.getIsInMemoryClusterModeEnabled = + new GetIsInMemoryClusterModeEnabled(); + + const provider = this.selectProvider(); + this.isCluster = this.isClusterMode(); + + const enableAutoPipelining = + process.env.REDIS_CACHE_ENABLE_AUTOPIPELINING === 'true'; + + this.inMemoryProviderService = new InMemoryProviderService( + provider, + this.isCluster, + enableAutoPipelining + ); + } + + /** + * Rules for the provider selection: + * - For our self hosted users we assume all of them have a single node Redis + * instance. + * - For Novu we will use Elasticache. We fallback to a Redis Cluster configuration + * if Elasticache not configured properly. That's happening in the provider + * mapping in the /in-memory-provider/providers/index.ts + */ + private selectProvider(): InMemoryProviderEnum { + if (process.env.IS_DOCKER_HOSTED) { + return InMemoryProviderEnum.REDIS; + } + + return InMemoryProviderEnum.ELASTICACHE; + } + + private descriptiveLogMessage(message) { + return `[Provider: ${this.selectProvider()}] ${message}`; + } + + private isClusterMode(): boolean { + const isClusterModeEnabled = this.getIsInMemoryClusterModeEnabled.execute(); + + Logger.log( + this.descriptiveLogMessage( + `Cluster mode ${ + isClusterModeEnabled ? 'is' : 'is not' + } enabled for ${LOG_CONTEXT}` + ), + LOG_CONTEXT + ); + + return isClusterModeEnabled; + } + + public async initialize(): Promise { + await this.inMemoryProviderService.delayUntilReadiness(); + } + + public getClient(): InMemoryProviderClient { + return this.inMemoryProviderService.inMemoryProviderClient; + } + + public getClientStatus(): string { + return this.getClient().status; + } + + public getTtl(): number { + return this.inMemoryProviderService.inMemoryProviderConfig.ttl; + } + + public inMemoryScan(pattern: string): ScanStream { + return this.inMemoryProviderService.inMemoryScan(pattern); + } + + public isReady(): boolean { + return this.inMemoryProviderService.isClientReady(); + } + + public providerInUseIsInClusterMode(): boolean { + const providerConfigured = + this.inMemoryProviderService.getProvider.configured; + + return this.isCluster || providerConfigured !== InMemoryProviderEnum.REDIS; + } + + public async shutdown(): Promise { + await this.inMemoryProviderService.shutdown(); + } +} diff --git a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.spec.ts b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.spec.ts index d863d0030d9..82874c9e927 100644 --- a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.spec.ts +++ b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.spec.ts @@ -6,10 +6,9 @@ let inMemoryProviderService: InMemoryProviderService; describe('In-memory Provider Service', () => { describe('Non cluster mode', () => { beforeEach(async () => { - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; - inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.REDIS + InMemoryProviderEnum.REDIS, + false ); await inMemoryProviderService.delayUntilReadiness(); @@ -45,8 +44,6 @@ describe('In-memory Provider Service', () => { }); it('should instantiate the provider properly', async () => { - expect(inMemoryProviderService.isClusterMode()).toEqual(false); - const { inMemoryProviderClient } = inMemoryProviderService; expect(inMemoryProviderClient.status).toEqual('ready'); @@ -84,10 +81,9 @@ describe('In-memory Provider Service', () => { describe('Cluster mode', () => { beforeEach(async () => { - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; - inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.REDIS + InMemoryProviderEnum.REDIS, + true ); await inMemoryProviderService.delayUntilReadiness(); @@ -102,6 +98,7 @@ describe('In-memory Provider Service', () => { it('enableAutoPipelining is enabled', async () => { const clusterWithPipelining = new InMemoryProviderService( InMemoryProviderEnum.REDIS, + true, true ); await clusterWithPipelining.delayUntilReadiness(); @@ -147,8 +144,6 @@ describe('In-memory Provider Service', () => { }); it('should instantiate the provider properly', async () => { - expect(inMemoryProviderService.isClusterMode()).toEqual(true); - const { inMemoryProviderClient } = inMemoryProviderService; expect(inMemoryProviderClient.status).toEqual('ready'); diff --git a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts index 9023e308b51..a3e4b3f0479 100644 --- a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts +++ b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts @@ -16,12 +16,9 @@ import { ScanStream, } from './types'; -import { GetIsInMemoryClusterModeEnabled } from '../../usecases'; - const LOG_CONTEXT = 'InMemoryProviderService'; export class InMemoryProviderService { - private getIsInMemoryClusterModeEnabled: GetIsInMemoryClusterModeEnabled; public inMemoryProviderClient: InMemoryProviderClient; public inMemoryProviderConfig: InMemoryProviderConfig; @@ -29,15 +26,13 @@ export class InMemoryProviderService { constructor( private provider: InMemoryProviderEnum, + private isCluster: boolean, private enableAutoPipelining?: boolean ) { Logger.log( this.descriptiveLogMessage('In-memory provider service initialized'), LOG_CONTEXT ); - - this.getIsInMemoryClusterModeEnabled = - new GetIsInMemoryClusterModeEnabled(); this.inMemoryProviderClient = this.buildClient(provider); } @@ -45,7 +40,7 @@ export class InMemoryProviderService { selected: InMemoryProviderEnum; configured: InMemoryProviderEnum; } { - const config = this.isClusterMode() + const config = this.isCluster ? getClientAndConfigForCluster(this.provider) : getClientAndConfig(); @@ -55,7 +50,7 @@ export class InMemoryProviderService { }; } - private descriptiveLogMessage(message) { + protected descriptiveLogMessage(message) { return `[Provider: ${this.provider}] ${message}`; } @@ -65,9 +60,7 @@ export class InMemoryProviderService { return this.oldInstanceInMemoryProviderSetup(); } - const isClusterMode = this.isClusterMode(); - - return isClusterMode + return this.isCluster ? this.inMemoryClusterProviderSetup(provider) : this.inMemoryProviderSetup(); } @@ -114,24 +107,8 @@ export class InMemoryProviderService { return this.isProviderClientReady(this.getStatus()); } - public isClusterMode(): boolean { - const isClusterModeEnabled = this.getIsInMemoryClusterModeEnabled.execute(); - - Logger.log( - this.descriptiveLogMessage( - `Cluster mode ${ - isClusterModeEnabled ? 'is' : 'is not' - } enabled for InMemoryProviderService` - ), - LOG_CONTEXT - ); - - return isClusterModeEnabled; - } - public getClusterOptions(): ClusterOptions | undefined { - const isClusterMode = this.isClusterMode(); - if (this.inMemoryProviderClient && isClusterMode) { + if (this.inMemoryProviderClient && this.isCluster) { return this.inMemoryProviderClient.options; } } @@ -140,7 +117,7 @@ export class InMemoryProviderService { if (this.inMemoryProviderClient) { if ( this.provider === InMemoryProviderEnum.OLD_INSTANCE_REDIS || - !this.isClusterMode() + !this.isCluster ) { const options: RedisOptions = this.inMemoryProviderClient.options; @@ -404,7 +381,7 @@ export class InMemoryProviderService { } public inMemoryScan(pattern: string): ScanStream { - if (this.isClusterMode()) { + if (this.isCluster) { const client = this.inMemoryProviderClient as Cluster; return client.sscanStream(pattern); diff --git a/packages/application-generic/src/services/in-memory-provider/index.ts b/packages/application-generic/src/services/in-memory-provider/index.ts index 63a4025e519..283190ecd4c 100644 --- a/packages/application-generic/src/services/in-memory-provider/index.ts +++ b/packages/application-generic/src/services/in-memory-provider/index.ts @@ -1,2 +1,4 @@ +export * from './cache-in-memory-provider.service'; export * from './in-memory-provider.service'; +export * from './workflow-in-memory-provider.service'; export * from './types'; diff --git a/packages/application-generic/src/services/in-memory-provider/workflow-in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/workflow-in-memory-provider.service.ts new file mode 100644 index 00000000000..54ba363d0d6 --- /dev/null +++ b/packages/application-generic/src/services/in-memory-provider/workflow-in-memory-provider.service.ts @@ -0,0 +1,86 @@ +import { Logger } from '@nestjs/common'; + +import { InMemoryProviderService } from './in-memory-provider.service'; +import { InMemoryProviderEnum, InMemoryProviderClient } from './types'; + +import { GetIsInMemoryClusterModeEnabled } from '../../usecases'; + +const LOG_CONTEXT = 'WorkflowInMemoryProviderService'; + +export class WorkflowInMemoryProviderService { + public inMemoryProviderService: InMemoryProviderService; + public isCluster: boolean; + private getIsInMemoryClusterModeEnabled: GetIsInMemoryClusterModeEnabled; + + constructor() { + this.getIsInMemoryClusterModeEnabled = + new GetIsInMemoryClusterModeEnabled(); + + const provider = this.selectProvider(); + this.isCluster = this.isClusterMode(); + + this.inMemoryProviderService = new InMemoryProviderService( + provider, + this.isCluster, + false + ); + } + + /** + * Rules for the provider selection: + * - For our self hosted users we assume all of them have a single node Redis + * instance. + * - For Novu we will use MemoryDB. We fallback to a Redis Cluster configuration + * if MemoryDB not configured properly. That's happening in the provider + * mapping in the /in-memory-provider/providers/index.ts + */ + private selectProvider(): InMemoryProviderEnum { + if (process.env.IS_DOCKER_HOSTED) { + return InMemoryProviderEnum.REDIS; + } + + return InMemoryProviderEnum.MEMORY_DB; + } + + private descriptiveLogMessage(message) { + return `[Provider: ${this.selectProvider()}] ${message}`; + } + + private isClusterMode(): boolean { + const isClusterModeEnabled = this.getIsInMemoryClusterModeEnabled.execute(); + + Logger.log( + this.descriptiveLogMessage( + `Cluster mode ${ + isClusterModeEnabled ? 'is' : 'is not' + } enabled for ${LOG_CONTEXT}` + ), + LOG_CONTEXT + ); + + return isClusterModeEnabled; + } + + public async initialize(): Promise { + await this.inMemoryProviderService.delayUntilReadiness(); + } + + public getClient(): InMemoryProviderClient { + return this.inMemoryProviderService.inMemoryProviderClient; + } + + public isReady(): boolean { + return this.inMemoryProviderService.isClientReady(); + } + + public providerInUseIsInClusterMode(): boolean { + const providerConfigured = + this.inMemoryProviderService.getProvider.configured; + + return this.isCluster || providerConfigured !== InMemoryProviderEnum.REDIS; + } + + public async shutdown(): Promise { + await this.inMemoryProviderService.shutdown(); + } +} diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts index 740cd3448db..21571e33425 100644 --- a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts +++ b/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; const LOG_CONTEXT = 'ActiveJobsMetricQueueService'; diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts index e4b4e84e9e9..1ef74d97871 100644 --- a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts +++ b/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; const LOG_CONTEXT = 'CompletedJobsMetricQueueService'; diff --git a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts b/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts index 6dc20a8ec4d..184b99dd09a 100644 --- a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts +++ b/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; import { QueueOptions } from '../bull-mq'; diff --git a/packages/application-generic/src/services/queues/standard-queue.service.ts b/packages/application-generic/src/services/queues/standard-queue.service.ts index db9418482f3..75febb14e1b 100644 --- a/packages/application-generic/src/services/queues/standard-queue.service.ts +++ b/packages/application-generic/src/services/queues/standard-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; const LOG_CONTEXT = 'StandardQueueService'; diff --git a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts b/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts index fd957326701..94e5670b973 100644 --- a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts +++ b/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts @@ -1,5 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; + import { QueueBaseService } from './queue-base.service'; @Injectable() diff --git a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts b/packages/application-generic/src/services/queues/web-sockets-queue.service.ts index 95db5d32e49..da5e4ee81cd 100644 --- a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts +++ b/packages/application-generic/src/services/queues/web-sockets-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; const LOG_CONTEXT = 'WebSocketsQueueService'; diff --git a/packages/application-generic/src/services/queues/workflow-queue.service.ts b/packages/application-generic/src/services/queues/workflow-queue.service.ts index da98d93d312..77d48b6de9c 100644 --- a/packages/application-generic/src/services/queues/workflow-queue.service.ts +++ b/packages/application-generic/src/services/queues/workflow-queue.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './index'; +import { QueueBaseService } from './queue-base.service'; const LOG_CONTEXT = 'WorkflowQueueService'; diff --git a/packages/application-generic/src/usecases/create-subscriber/create-subscriber.spec.ts b/packages/application-generic/src/usecases/create-subscriber/create-subscriber.spec.ts index 970fc9314d9..dafe16337c3 100644 --- a/packages/application-generic/src/usecases/create-subscriber/create-subscriber.spec.ts +++ b/packages/application-generic/src/usecases/create-subscriber/create-subscriber.spec.ts @@ -7,30 +7,29 @@ import { CreateSubscriberCommand } from './create-subscriber.command'; import { CacheService, + CacheInMemoryProviderService, InMemoryProviderEnum, InMemoryProviderService, InvalidateCacheService, } from '../../services'; import { UpdateSubscriber } from '../update-subscriber'; -const inMemoryProviderService = { - provide: InMemoryProviderService, - useFactory: async (): Promise => { - const inMemoryProvider = new InMemoryProviderService( - InMemoryProviderEnum.REDIS - ); +const cacheInMemoryProviderService = { + provide: CacheInMemoryProviderService, + useFactory: async (): Promise => { + const cacheInMemoryProvider = new CacheInMemoryProviderService(); - return inMemoryProvider; + return cacheInMemoryProvider; }, }; const cacheService = { provide: CacheService, useFactory: async () => { - const factoryInMemoryProviderService = - await inMemoryProviderService.useFactory(); + const factoryCacheInMemoryProviderService = + await cacheInMemoryProviderService.useFactory(); - const service = new CacheService(factoryInMemoryProviderService); + const service = new CacheService(factoryCacheInMemoryProviderService); await service.initialize(); return service; @@ -44,7 +43,7 @@ describe('Create Subscriber', function () { beforeEach(async () => { const moduleRef = await Test.createTestingModule({ imports: [SubscriberRepository, InvalidateCacheService], - providers: [UpdateSubscriber, inMemoryProviderService, cacheService], + providers: [UpdateSubscriber, cacheInMemoryProviderService, cacheService], }).compile(); session = new UserSession(); diff --git a/packages/application-generic/src/usecases/get-feature-flag/get-is-in-memory-cluster-mode-enabled.use-case.ts b/packages/application-generic/src/usecases/get-feature-flag/get-is-in-memory-cluster-mode-enabled.use-case.ts index cbab8036512..03572fa0dce 100644 --- a/packages/application-generic/src/usecases/get-feature-flag/get-is-in-memory-cluster-mode-enabled.use-case.ts +++ b/packages/application-generic/src/usecases/get-feature-flag/get-is-in-memory-cluster-mode-enabled.use-case.ts @@ -7,9 +7,9 @@ import { GetSystemCriticalFlag } from './get-system-critical-flag.use-case'; @Injectable() export class GetIsInMemoryClusterModeEnabled extends GetSystemCriticalFlag { execute(): boolean { - const value = - process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED ?? - process.env.IN_MEMORY_CLUSTER_MODE_ENABLED; + const value = process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED + ? process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED + : process.env.IN_MEMORY_CLUSTER_MODE_ENABLED; const fallbackValue = false; const defaultValue = this.prepareBooleanStringSystemCriticalFlag( value, diff --git a/packages/application-generic/src/usecases/get-feature-flag/get-system-critical-flag.test.ts b/packages/application-generic/src/usecases/get-feature-flag/get-system-critical-flag.test.ts index 164079ace41..39bcd15cdcf 100644 --- a/packages/application-generic/src/usecases/get-feature-flag/get-system-critical-flag.test.ts +++ b/packages/application-generic/src/usecases/get-feature-flag/get-system-critical-flag.test.ts @@ -40,6 +40,18 @@ describe('Get System Critical Flag', () => { const result = getIsInMemoryClusterModeEnabled.execute(); expect(result).toEqual(true); }); + + it('should return new environment variable value when is set even if it is false', async () => { + // TODO: Temporary coexistence to replace env variable name + process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; + process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'true'; + + const getIsInMemoryClusterModeEnabled = + new GetIsInMemoryClusterModeEnabled(); + + const result = getIsInMemoryClusterModeEnabled.execute(); + expect(result).toEqual(false); + }); }); describe('SystemCriticalFlagEnum.IS_REQUEST_RATE_LIMITING_ENABLED', () => { diff --git a/packages/application-generic/src/usecases/update-subscriber/update-subscriber.spec.ts b/packages/application-generic/src/usecases/update-subscriber/update-subscriber.spec.ts index 3f0e399c135..c60ea29d3f1 100644 --- a/packages/application-generic/src/usecases/update-subscriber/update-subscriber.spec.ts +++ b/packages/application-generic/src/usecases/update-subscriber/update-subscriber.spec.ts @@ -6,19 +6,17 @@ import { UpdateSubscriber } from './update-subscriber.usecase'; import { UpdateSubscriberCommand } from './update-subscriber.command'; import { CacheService, + CacheInMemoryProviderService, InvalidateCacheService, - InMemoryProviderService, InMemoryProviderEnum, } from '../../services'; -const inMemoryProviderService = { - provide: InMemoryProviderService, - useFactory: async (): Promise => { - const inMemoryProvider = new InMemoryProviderService( - InMemoryProviderEnum.REDIS - ); +const cacheInMemoryProviderService = { + provide: CacheInMemoryProviderService, + useFactory: async (): Promise => { + const cacheInMemoryProvider = new CacheInMemoryProviderService(); - return inMemoryProvider; + return cacheInMemoryProvider; }, }; @@ -26,7 +24,7 @@ const cacheService = { provide: CacheService, useFactory: async () => { const factoryInMemoryProviderService = - await inMemoryProviderService.useFactory(); + await cacheInMemoryProviderService.useFactory(); return new CacheService(factoryInMemoryProviderService); }, @@ -39,7 +37,7 @@ describe('Update Subscriber', function () { beforeEach(async () => { const moduleRef = await Test.createTestingModule({ imports: [SubscriberRepository, InvalidateCacheService], - providers: [UpdateSubscriber, inMemoryProviderService, cacheService], + providers: [UpdateSubscriber, cacheInMemoryProviderService, cacheService], }).compile(); session = new UserSession(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1245d22dbca..6cd3f28c998 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12668,7 +12668,7 @@ packages: '@types/react': optional: true dependencies: - '@babel/runtime': 7.21.0 + '@babel/runtime': 7.23.2 '@emotion/babel-plugin': 11.10.6 '@emotion/cache': 11.10.7 '@emotion/serialize': 1.1.1 @@ -25560,14 +25560,6 @@ packages: acorn: ^8 dependencies: acorn: 8.10.0 - dev: true - - /acorn-import-assertions@1.9.0(acorn@8.8.2): - resolution: {integrity: sha512-cmMwop9x+8KFhxvKrKfPYmN6/pKTYYHBqLa0DfvVZcKMJWNyWLnaqND7dx/qn66R7ewM1UX5XMaDVP5wlVTaVA==} - peerDependencies: - acorn: ^8 - dependencies: - acorn: 8.8.2 /acorn-jsx@5.3.2(acorn@7.4.1): resolution: {integrity: sha512-rq9s+JNhf0IChjtDXxllJ7g41oZk5SlXtp0LHwyA5cejwn7vKmKp4pPri6YEePv2PU65sAsegbXtIinmDFDXgQ==} @@ -31408,7 +31400,7 @@ packages: eslint: 8.51.0 eslint-import-resolver-node: 0.3.7 eslint-module-utils: 2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-webpack@0.13.7)(eslint@8.51.0) - has: 1.0.3 + has: 1.0.4 is-core-module: 2.13.0 is-glob: 4.0.3 minimatch: 3.1.2 @@ -31486,7 +31478,7 @@ packages: damerau-levenshtein: 1.0.8 emoji-regex: 9.2.2 eslint: 8.51.0 - has: 1.0.3 + has: 1.0.4 jsx-ast-utils: 3.3.3 language-tags: 1.0.5 minimatch: 3.1.2 @@ -50106,8 +50098,8 @@ packages: '@webassemblyjs/ast': 1.11.1 '@webassemblyjs/wasm-edit': 1.11.1 '@webassemblyjs/wasm-parser': 1.11.1 - acorn: 8.8.2 - acorn-import-assertions: 1.9.0(acorn@8.8.2) + acorn: 8.10.0 + acorn-import-assertions: 1.9.0(acorn@8.10.0) browserslist: 4.21.10 chrome-trace-event: 1.0.3 enhanced-resolve: 5.15.0 @@ -50146,8 +50138,8 @@ packages: '@webassemblyjs/ast': 1.11.1 '@webassemblyjs/wasm-edit': 1.11.1 '@webassemblyjs/wasm-parser': 1.11.1 - acorn: 8.8.2 - acorn-import-assertions: 1.9.0(acorn@8.8.2) + acorn: 8.10.0 + acorn-import-assertions: 1.9.0(acorn@8.10.0) browserslist: 4.21.10 chrome-trace-event: 1.0.3 enhanced-resolve: 5.15.0 From bef09a4b7429a1bfbb62f61469a526d60e0ee3a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fern=C3=A1ndez?= Date: Wed, 1 Nov 2023 15:43:34 +0000 Subject: [PATCH 2/2] Update packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts Co-authored-by: Richard Fontein <32132657+rifont@users.noreply.github.com> --- .../in-memory-provider/cache-in-memory-provider.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts index 3344a7e24c6..7bfb925c97d 100644 --- a/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts +++ b/packages/application-generic/src/services/in-memory-provider/cache-in-memory-provider.service.ts @@ -59,7 +59,7 @@ export class CacheInMemoryProviderService { Logger.log( this.descriptiveLogMessage( `Cluster mode ${ - isClusterModeEnabled ? 'is' : 'is not' + isClusterModeEnabled ? 'IS' : 'IS NOT' } enabled for ${LOG_CONTEXT}` ), LOG_CONTEXT