Skip to content

Commit

Permalink
refactor(worker): singleton queue and fix inject issues with queues a…
Browse files Browse the repository at this point in the history
…nd workers (#4929)

* refactor(worker): singleton queue and fix inject issues

* fix: tests

* fix: tests for ws

* fix: ws service bootstrap

* fix: ws socket initialization

* fix: inbound mail service

* fix: ws tests

* fix: inbound mail

* fix: tests

* refactor: extract active workers

* fix: remove test

* fix: worker test

* feat: add cluster retry strategt

* fix: queue for api

* refactor: remove baseapi queues

* fix: remove other queu module instances

* fix: remove log

* fix: remove unused inboud parse worker service

* fix: use app shutdown toclose redis

* fix: dedicate memory instance for workers

* fix: flaky test

* refactor: rename active workers naming

* fix: pr refactor

* revert: worker creation

* feat: add types to worker init config

---------

Co-authored-by: Gosha <[email protected]>
  • Loading branch information
scopsy and djabarovgeorge authored Dec 3, 2023
1 parent 3e9ec4e commit b5e152f
Show file tree
Hide file tree
Showing 77 changed files with 679 additions and 926 deletions.
2 changes: 1 addition & 1 deletion apps/api/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ BLUEPRINT_CREATOR=645b648b36dd6d25f8650d37

CLIENT_SUCCESS_AUTH_REDIRECT=http://localhost:4200/auth/login

MONGO_URL=mongodb://localhost:27017/novu-test
MONGO_URL=mongodb://127.0.0.1:27017/novu-test
REDIS_PORT=6379
REDIS_HOST=localhost
REDIS_PREFIX=
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
GetNovuProviderCredentials,
StorageHelperService,
SendTestEmail,
BaseApiQueuesModule,
} from '@novu/application-generic';

import { EventsController } from './events.controller';
Expand All @@ -24,6 +23,7 @@ import { ExecutionDetailsModule } from '../execution-details/execution-details.m
import { TopicsModule } from '../topics/topics.module';
import { LayoutsModule } from '../layouts/layouts.module';
import { TenantModule } from '../tenant/tenant.module';
import { JobTopicNameEnum } from '@novu/shared';

const PROVIDERS = [
CreateExecutionDetails,
Expand All @@ -47,7 +47,6 @@ const PROVIDERS = [
TopicsModule,
LayoutsModule,
TenantModule,
BaseApiQueuesModule,
],
controllers: [EventsController],
providers: [...PROVIDERS, ...USE_CASES],
Expand Down
4 changes: 1 addition & 3 deletions apps/api/src/app/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import { BaseApiQueuesModule } from '@novu/application-generic';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';

@Module({
imports: [SharedModule, TerminusModule, BaseApiQueuesModule],
imports: [SharedModule, TerminusModule],
controllers: [HealthController],
providers: [],
})
Expand Down
7 changes: 3 additions & 4 deletions apps/api/src/app/inbound-parse/inbound-parse.module.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { BaseApiQueuesModule, CompileTemplate } from '@novu/application-generic';
import { CompileTemplate } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { InboundParseController } from './inbound-parse.controller';
import { InboundParseQueueService } from './services/inbound-parse.queue.service';
import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase';

import { SharedModule } from '../shared/shared.module';
import { AuthModule } from '../auth/auth.module';

const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate];
const PROVIDERS = [GetMxRecord, CompileTemplate];

@Module({
imports: [SharedModule, AuthModule, BaseApiQueuesModule],
imports: [SharedModule, AuthModule],
controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import {
BullMqService,
getInboundParseMailWorkerOptions,
InboundParseQueueService as InboundParseQueue,
InboundParseWorker,
Queue,
QueueOptions,
Worker,
WorkerBaseService,
WorkerOptions,
WorkflowInMemoryProviderService,
} from '@novu/application-generic';
import { JobTopicNameEnum } from '@novu/shared';
import { Injectable, Logger } from '@nestjs/common';
Expand All @@ -16,17 +14,14 @@ import { InboundEmailParseCommand } from '../usecases/inbound-email-parse/inboun
const LOG_CONTEXT = 'InboundParseQueueService';

@Injectable()
export class InboundParseQueueService {
public readonly queue: Queue;
public readonly worker: Worker;

export class InboundParseWorkerService extends WorkerBaseService {
constructor(
private emailParseUsecase: InboundEmailParse,
public readonly inboundParseQueue: InboundParseQueue,
public readonly inboundParseWorker: InboundParseWorker
public workflowInMemoryProviderService: WorkflowInMemoryProviderService
) {
this.inboundParseQueue.createQueue();
this.inboundParseWorker.createWorker(this.getWorkerProcessor(), this.getWorkerOptions());
super(JobTopicNameEnum.INBOUND_PARSE_MAIL, new BullMqService(workflowInMemoryProviderService));

this.createWorker(this.getWorkerProcessor(), this.getWorkerOptions());
}

private getWorkerOptions(): WorkerOptions {
Expand Down
3 changes: 2 additions & 1 deletion apps/api/src/app/integrations/integrations.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { USE_CASES } from './usecases';
import { IntegrationsController } from './integrations.controller';
import { AuthModule } from '../auth/auth.module';
import { CompileTemplate, CreateExecutionDetails, QueuesModule } from '@novu/application-generic';
import { JobTopicNameEnum } from '@novu/shared';

@Module({
imports: [SharedModule, QueuesModule, forwardRef(() => AuthModule)],
imports: [SharedModule, forwardRef(() => AuthModule)],
controllers: [IntegrationsController],
providers: [...USE_CASES, CompileTemplate, CreateExecutionDetails],
exports: [...USE_CASES],
Expand Down
39 changes: 24 additions & 15 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import { Module } from '@nestjs/common';
import {
ChangeRepository,
DalService,
UserRepository,
OrganizationRepository,
EnvironmentRepository,
ExecutionDetailsRepository,
NotificationTemplateRepository,
SubscriberRepository,
NotificationRepository,
MessageRepository,
NotificationGroupRepository,
MessageTemplateRepository,
MemberRepository,
LayoutRepository,
LogRepository,
FeedRepository,
IntegrationRepository,
ChangeRepository,
JobRepository,
FeedRepository,
LayoutRepository,
LogRepository,
MemberRepository,
MessageRepository,
MessageTemplateRepository,
NotificationGroupRepository,
NotificationRepository,
NotificationTemplateRepository,
OrganizationRepository,
SubscriberPreferenceRepository,
SubscriberRepository,
TenantRepository,
TopicRepository,
TopicSubscribersRepository,
TenantRepository,
UserRepository,
WorkflowOverrideRepository,
} from '@novu/dal';
import {
Expand All @@ -36,10 +36,12 @@ import {
getIsTopicNotificationEnabled,
InvalidateCacheService,
LoggerModule,
QueuesModule,
storageService,
} from '@novu/application-generic';

import * as packageJson from '../../../package.json';
import { JobTopicNameEnum } from '@novu/shared';

const DAL_MODELS = [
UserRepository,
Expand Down Expand Up @@ -93,6 +95,13 @@ const PROVIDERS = [

@Module({
imports: [
QueuesModule.forRoot([
JobTopicNameEnum.EXECUTION_LOG,
JobTopicNameEnum.WORKFLOW,
JobTopicNameEnum.WEB_SOCKETS,
JobTopicNameEnum.WORKFLOW,
JobTopicNameEnum.INBOUND_PARSE_MAIL,
]),
LoggerModule.forRoot(
createNestLoggingModuleOptions({
serviceName: packageJson.name,
Expand All @@ -101,6 +110,6 @@ const PROVIDERS = [
),
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
})
export class SharedModule {}
4 changes: 1 addition & 3 deletions apps/api/src/app/widgets/widgets.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { Module } from '@nestjs/common';

import { BaseApiQueuesModule } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { WidgetsController } from './widgets.controller';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -10,7 +8,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module';
import { IntegrationModule } from '../integrations/integrations.module';

@Module({
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseApiQueuesModule],
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule],
providers: [...USE_CASES],
exports: [...USE_CASES],
controllers: [WidgetsController],
Expand Down
1 change: 0 additions & 1 deletion apps/inbound-mail/src/server/inbound-mail.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ describe('Inbound Mail Service', () => {

it('should be initialised properly', async () => {
expect(inboundMailService).to.be.ok;
expect(inboundMailService).to.have.all.keys('inboundParseQueueService');
expect(inboundMailService.inboundParseQueueService.DEFAULT_ATTEMPTS).to.equal(3);
expect(inboundMailService.inboundParseQueueService.topic).to.equal('inbound-parse-mail');
expect(await inboundMailService.inboundParseQueueService.bullMqService.getStatus()).to.deep.equal({
Expand Down
16 changes: 13 additions & 3 deletions apps/inbound-mail/src/server/inbound-mail.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import { BullMqService, InboundParseQueueService, QueueBaseOptions } from '@novu/application-generic';
import {
BullMqService,
InboundParseQueueService,
QueueBaseOptions,
WorkflowInMemoryProviderService,
} from '@novu/application-generic';
import { JobTopicNameEnum } from '@novu/shared';

export class InboundMailService {
public inboundParseQueueService: InboundParseQueueService;

private workflowInMemoryProviderService: WorkflowInMemoryProviderService;
constructor() {
this.inboundParseQueueService = new InboundParseQueueService();
this.workflowInMemoryProviderService = new WorkflowInMemoryProviderService();
this.inboundParseQueueService = new InboundParseQueueService(this.workflowInMemoryProviderService);
}

async start() {
await this.workflowInMemoryProviderService.initialize();
}
}
4 changes: 3 additions & 1 deletion apps/inbound-mail/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Mailin extends events.EventEmitter {
this._smtp = null;
}

public start(options: object, callback: (err?) => void) {
public async start(options: object, callback: (err?) => void) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const _this = this;

Expand Down Expand Up @@ -447,6 +447,8 @@ class Mailin extends events.EventEmitter {
onRcptTo: onRcptTo,
});

await inboundMailService.start();

const server = new SMTPServer(smtpOptions);

this._smtp = server;
Expand Down
2 changes: 1 addition & 1 deletion apps/worker/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ REDIS_CLUSTER_FAMILY=
REDIS_CLUSTER_KEY_PREFIX=

# MongoDB
MONGO_URL=mongodb://localhost:27017/novu-test
MONGO_URL=mongodb://127.0.0.1:27017/novu-test

# Storage
S3_LOCAL_STACK=http://localhost:4566
Expand Down
26 changes: 6 additions & 20 deletions apps/worker/src/app/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
import { Controller, Get } from '@nestjs/common';
import { Controller, Get, Inject } from '@nestjs/common';
import { ApiExcludeController } from '@nestjs/swagger';
import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus';
import {
DalServiceHealthIndicator,
StandardQueueServiceHealthIndicator,
WorkflowQueueServiceHealthIndicator,
ActiveJobsMetricQueueServiceHealthIndicator,
CompletedJobsMetricQueueServiceHealthIndicator,
SubscriberProcessQueueHealthIndicator,
} from '@novu/application-generic';
import { DalServiceHealthIndicator, QueueHealthIndicator } from '@novu/application-generic';

import { version } from '../../../package.json';

@Controller('health-check')
@ApiExcludeController()
export class HealthController {
constructor(
@Inject('QUEUE_HEALTH_INDICATORS')
private healthIndicators: QueueHealthIndicator[],
private healthCheckService: HealthCheckService,
private dalHealthIndicator: DalServiceHealthIndicator,
private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator,
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator,
private activeJobsMetricQueueServiceHealthIndicator: ActiveJobsMetricQueueServiceHealthIndicator,
private completedJobsMetricQueueServiceHealthIndicator: CompletedJobsMetricQueueServiceHealthIndicator,
private subscriberProcessQueueHealthIndicator: SubscriberProcessQueueHealthIndicator
private dalHealthIndicator: DalServiceHealthIndicator
) {}

@Get()
@HealthCheck()
healthCheck(): Promise<HealthCheckResult> {
return this.healthCheckService.check([
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isActive(),
async () => this.workflowQueueHealthIndicator.isActive(),
async () => this.activeJobsMetricQueueServiceHealthIndicator.isActive(),
async () => this.completedJobsMetricQueueServiceHealthIndicator.isActive(),
async () => this.subscriberProcessQueueHealthIndicator.isActive(),
...this.healthIndicators.map((indicator) => async () => indicator.isHealthy()),
async () => {
return {
apiVersion: {
Expand Down
3 changes: 1 addition & 2 deletions apps/worker/src/app/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { QueuesModule } from '@novu/application-generic';

import { HealthController } from './health.controller';

import { SharedModule } from '../shared/shared.module';

@Module({
imports: [SharedModule, TerminusModule, QueuesModule],
imports: [SharedModule, TerminusModule],
controllers: [HealthController],
providers: [],
})
Expand Down
Loading

0 comments on commit b5e152f

Please sign in to comment.