Skip to content

Commit

Permalink
Merge pull request #4733 from novuhq/nv-3136-create-dedicated-service…
Browse files Browse the repository at this point in the history
…-for-workflow-and-cache-in-memory

feat(infra): create dedicated services for in-memory for wf and cache
  • Loading branch information
Pablo Fernández authored Nov 1, 2023
2 parents 737f930 + bef09a4 commit 3f7b019
Show file tree
Hide file tree
Showing 41 changed files with 358 additions and 244 deletions.
7 changes: 3 additions & 4 deletions apps/api/src/app/blueprint/e2e/get-grouped-blueprints.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import {
} from '@novu/shared';
import {
buildGroupedBlueprintsKey,
CacheInMemoryProviderService,
CacheService,
InMemoryProviderEnum,
InMemoryProviderService,
InvalidateCacheService,
} from '@novu/application-generic';

Expand All @@ -34,8 +33,8 @@ describe('Get grouped notification template blueprints - /blueprints/group-by-ca
let indexModuleStub: sinon.SinonStub;

before(async () => {
const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
const cacheService = new CacheService(inMemoryProviderService);
const cacheInMemoryProviderService = new CacheInMemoryProviderService();
const cacheService = new CacheService(cacheInMemoryProviderService);
await cacheService.initialize();
invalidateCache = new InvalidateCacheService(cacheService);

Expand Down
9 changes: 4 additions & 5 deletions apps/api/src/app/events/e2e/process-subscriber.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import { ChannelTypeEnum, ISubscribersDefine, IUpdateNotificationTemplateDto, St
import {
buildNotificationTemplateIdentifierKey,
buildNotificationTemplateKey,
CacheInMemoryProviderService,
CacheService,
InMemoryProviderEnum,
InMemoryProviderService,
InvalidateCacheService,
} from '@novu/application-generic';

Expand All @@ -29,15 +28,15 @@ describe('Trigger event - process subscriber /v1/events/trigger (POST)', functio
let subscriberService: SubscribersService;
let cacheService: CacheService;
let invalidateCache: InvalidateCacheService;
let inMemoryProviderService: InMemoryProviderService;
let cacheInMemoryProviderService: CacheInMemoryProviderService;

const subscriberRepository = new SubscriberRepository();
const messageRepository = new MessageRepository();
const notificationTemplateRepository = new NotificationTemplateRepository();

before(async () => {
inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
cacheService = new CacheService(inMemoryProviderService);
cacheInMemoryProviderService = new CacheInMemoryProviderService();
cacheService = new CacheService(cacheInMemoryProviderService);
await cacheService.initialize();
invalidateCache = new InvalidateCacheService(cacheService);
});
Expand Down
3 changes: 0 additions & 3 deletions apps/api/src/app/health/e2e/health-check.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import { UserSession } from '@novu/testing';
import { InMemoryProviderEnum, InMemoryProviderService } from '@novu/application-generic';
import { expect } from 'chai';

describe('Health-check', () => {
const session = new UserSession();

before(async () => {
const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);

await session.initialize();
});

Expand Down
13 changes: 6 additions & 7 deletions apps/api/src/app/widgets/e2e/get-count.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import { ChannelTypeEnum, InAppProviderIdEnum } from '@novu/shared';
import {
buildFeedKey,
buildMessageCountKey,
CacheInMemoryProviderService,
CacheService,
InMemoryProviderEnum,
InMemoryProviderService,
InvalidateCacheService,
} from '@novu/application-generic';

Expand All @@ -23,11 +22,11 @@ describe('Count - GET /widget/notifications/count', function () {
} | null = null;

let invalidateCache: InvalidateCacheService;
let inMemoryProviderService: InMemoryProviderService;
let cacheInMemoryProviderService: CacheInMemoryProviderService;

before(async () => {
inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
const cacheService = new CacheService(inMemoryProviderService);
cacheInMemoryProviderService = new CacheInMemoryProviderService();
const cacheService = new CacheService(cacheInMemoryProviderService);
await cacheService.initialize();
invalidateCache = new InvalidateCacheService(cacheService);
});
Expand Down Expand Up @@ -256,7 +255,7 @@ describe('Count - GET /widget/notifications/count', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down Expand Up @@ -319,7 +318,7 @@ describe('Count - GET /widget/notifications/count', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down
17 changes: 8 additions & 9 deletions apps/api/src/app/widgets/e2e/get-unseen-count.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import { ChannelTypeEnum } from '@novu/shared';
import {
buildFeedKey,
buildMessageCountKey,
CacheInMemoryProviderService,
CacheService,
InMemoryProviderEnum,
InMemoryProviderService,
InvalidateCacheService,
} from '@novu/application-generic';

Expand All @@ -22,12 +21,12 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () {
_id: string;
} | null = null;

let inMemoryProviderService: InMemoryProviderService;
let cacheInMemoryProviderService: CacheInMemoryProviderService;
let invalidateCache: InvalidateCacheService;

before(async () => {
inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
const cacheService = new CacheService(inMemoryProviderService);
cacheInMemoryProviderService = new CacheInMemoryProviderService();
const cacheService = new CacheService(cacheInMemoryProviderService);
await cacheService.initialize();
invalidateCache = new InvalidateCacheService(cacheService);
});
Expand Down Expand Up @@ -68,7 +67,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down Expand Up @@ -97,7 +96,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down Expand Up @@ -126,7 +125,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down Expand Up @@ -155,7 +154,7 @@ describe('Unseen Count - GET /widget/notifications/unseen', function () {

const messages = await messageRepository.findBySubscriberChannel(
session.environment._id,
subscriberProfile!._id,
String(subscriberProfile?._id),
ChannelTypeEnum.IN_APP
);
const messageId = messages[0]._id;
Expand Down
7 changes: 3 additions & 4 deletions apps/api/src/app/widgets/e2e/initialize-widget-session.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import { expect } from 'chai';
import { createHash } from '../../shared/helpers/hmac.service';
import {
buildIntegrationKey,
CacheInMemoryProviderService,
CacheService,
InMemoryProviderService,
InvalidateCacheService,
InMemoryProviderEnum,
} from '@novu/application-generic';

const integrationRepository = new IntegrationRepository();
Expand All @@ -19,8 +18,8 @@ describe('Initialize Session - /widgets/session/initialize (POST)', async () =>
let invalidateCache: InvalidateCacheService;

before(async () => {
const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
const cacheService = new CacheService(inMemoryProviderService);
const cacheInMemoryProviderService = new CacheInMemoryProviderService();
const cacheService = new CacheService(cacheInMemoryProviderService);
await cacheService.initialize();
invalidateCache = new InvalidateCacheService(cacheService);
});
Expand Down
4 changes: 0 additions & 4 deletions apps/worker/src/app/health/e2e/health-check.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { InMemoryProviderEnum, InMemoryProviderService } from '@novu/application-generic';
import { expect } from 'chai';
import * as request from 'supertest';
import * as defaults from 'superagent-defaults';
Expand All @@ -7,9 +6,6 @@ describe('Health-check', () => {
let testAgent;

before(async () => {
const inMemoryProviderService = new InMemoryProviderService(InMemoryProviderEnum.REDIS);
await inMemoryProviderService.delayUntilReadiness();

testAgent = defaults(request(`http://localhost:${process.env.PORT}`));
});

Expand Down
35 changes: 14 additions & 21 deletions packages/application-generic/src/custom-providers/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import {
AnalyticsService,
BullMqService,
CacheInMemoryProviderService,
CacheService,
DistributedLockService,
FeatureFlagsService,
InMemoryProviderEnum,
InMemoryProviderService,
ReadinessService,
OldInstanceBullMqService,
StandardQueueService,
SubscriberProcessQueueService,
WebSocketsQueueService,
WorkflowQueueService,
} from '../services';
import {
GetIsTopicNotificationEnabled,
GetUseMergedDigestId,
} from '../usecases';
import { SubscriberProcessQueueService } from '../services/queues/subscriber-process-queue.service';

export const featureFlagsService = {
provide: FeatureFlagsService,
Expand Down Expand Up @@ -52,13 +51,10 @@ export const getIsTopicNotificationEnabled = {
inject: [FeatureFlagsService],
};

export const inMemoryProviderService = {
provide: InMemoryProviderService,
useFactory: (
provider: InMemoryProviderEnum,
enableAutoPipelining?: boolean
): InMemoryProviderService => {
return new InMemoryProviderService(provider, enableAutoPipelining);
export const cacheInMemoryProviderService = {
provide: CacheInMemoryProviderService,
useFactory: (): CacheInMemoryProviderService => {
return new CacheInMemoryProviderService();
},
};

Expand Down Expand Up @@ -87,14 +83,10 @@ export const oldInstanceBullMqService = {
export const cacheService = {
provide: CacheService,
useFactory: async (): Promise<CacheService> => {
const enableAutoPipelining =
process.env.REDIS_CACHE_ENABLE_AUTOPIPELINING === 'false';
const factoryInMemoryProviderService = inMemoryProviderService.useFactory(
InMemoryProviderEnum.ELASTICACHE,
enableAutoPipelining
);
const factoryCacheInMemoryProviderService =
cacheInMemoryProviderService.useFactory();

const service = new CacheService(factoryInMemoryProviderService);
const service = new CacheService(factoryCacheInMemoryProviderService);

await service.initialize();

Expand All @@ -115,11 +107,12 @@ export const analyticsService = {
export const distributedLockService = {
provide: DistributedLockService,
useFactory: async (): Promise<DistributedLockService> => {
const factoryInMemoryProviderService = inMemoryProviderService.useFactory(
InMemoryProviderEnum.ELASTICACHE
);
const factoryCacheInMemoryProviderService =
cacheInMemoryProviderService.useFactory();

const service = new DistributedLockService(factoryInMemoryProviderService);
const service = new DistributedLockService(
factoryCacheInMemoryProviderService
);

await service.initialize();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { ActiveJobsMetricQueueService } from '../services';
import { ActiveJobsMetricQueueService } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'ActiveJobsMetricQueueServiceHealthIndicator';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from '@nestjs/terminus';
import { Injectable, Logger } from '@nestjs/common';

import { CacheService } from '../services';
import { CacheService } from '../services/cache';

const LOG_CONTEXT = 'CacheServiceHealthIndicator';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { CompletedJobsMetricQueueService } from '../services';
import { CompletedJobsMetricQueueService } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'CompletedJobsMetricQueueServiceHealthIndicator';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { InboundParseQueue } from '../services';
import { InboundParseQueue } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'InboundParseQueueServiceHealthIndicator';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from '@nestjs/terminus';
import { Injectable, Logger } from '@nestjs/common';

import { QueueBaseService } from '../services';
import { QueueBaseService } from '../services/queues/queue-base.service';
import { IHealthIndicator } from './health-indicator.interface';

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { StandardQueueService } from '../services';
import { StandardQueueService } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'StandardQueueServiceHealthIndicator';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { SubscriberProcessQueueService } from '../services';
import { SubscriberProcessQueueService } from '../services/queues';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { QueueHealthIndicator } from './queue-health-indicator.service';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { WebSocketsQueueService } from '../services';
import { WebSocketsQueueService } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'WebSocketsQueueServiceHealthIndicator';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';

import { WorkflowQueueService } from '../services';
import { WorkflowQueueService } from '../services/queues';
import { QueueHealthIndicator } from './queue-health-indicator.service';

const LOG_CONTEXT = 'WorkflowQueueServiceHealthIndicator';
Expand Down
7 changes: 3 additions & 4 deletions packages/application-generic/src/modules/queues.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
CompletedJobsMetricQueueServiceHealthIndicator,
InboundParseQueueServiceHealthIndicator,
StandardQueueServiceHealthIndicator,
SubscriberProcessQueueHealthIndicator,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueServiceHealthIndicator,
} from '../health';
Expand All @@ -15,6 +16,7 @@ import {
CompletedJobsMetricQueueService,
InboundParseQueue,
StandardQueueService,
SubscriberProcessQueueService,
WebSocketsQueueService,
WorkflowQueueService,
} from '../services/queues';
Expand All @@ -23,16 +25,13 @@ import {
CompletedJobsMetricWorkerService,
InboundParseWorker,
StandardWorkerService,
SubscriberProcessWorkerService,
WebSocketsWorkerService,
WorkflowWorkerService,
OldInstanceStandardWorkerService,
OldInstanceWorkflowWorkerService,
} from '../services/workers';

import { SubscriberProcessQueueService } from '../services/queues/subscriber-process-queue.service';
import { SubscriberProcessWorkerService } from '../services/workers/subscriber-process-worker.service';
import { SubscriberProcessQueueHealthIndicator } from '../health/subscriber-process-queue.health-indicator';

const PROVIDERS: Provider[] = [
ActiveJobsMetricQueueService,
ActiveJobsMetricQueueServiceHealthIndicator,
Expand Down
Loading

0 comments on commit 3f7b019

Please sign in to comment.