From c599eeed27f642924bb124df2b32988db379e979 Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 15 Oct 2023 12:32:21 +0300 Subject: [PATCH] feat: refactor the broadcast method to utilize the new queue --- apps/api/src/app/events/events.controller.ts | 19 ++---------- apps/api/src/app/events/events.module.ts | 30 ++----------------- .../process-bulk-trigger.usecase.ts | 3 +- .../trigger-event-to-all.usecase.ts | 13 ++++---- apps/api/src/app/health/health.controller.ts | 2 -- apps/api/src/app/health/health.module.ts | 5 ++-- .../app/inbound-parse/inbound-parse.module.ts | 7 ++--- apps/api/src/app/shared/shared.module.ts | 3 +- apps/api/src/app/widgets/widgets.module.ts | 4 ++- .../application-generic/src/modules/index.ts | 1 + .../src/modules/queues.module.ts | 15 ++++++++++ 11 files changed, 38 insertions(+), 64 deletions(-) diff --git a/apps/api/src/app/events/events.controller.ts b/apps/api/src/app/events/events.controller.ts index 47ddb117449..81de457dd03 100644 --- a/apps/api/src/app/events/events.controller.ts +++ b/apps/api/src/app/events/events.controller.ts @@ -1,14 +1,8 @@ import { Body, Controller, Delete, Param, Post, Scope, UseGuards } from '@nestjs/common'; import { ApiOkResponse, ApiExcludeEndpoint, ApiOperation, ApiTags } from '@nestjs/swagger'; import { v4 as uuidv4 } from 'uuid'; -import { - IJwtPayload, - ISubscribersDefine, - ITenantDefine, - TriggerRecipientSubscriber, - TriggerTenantContext, -} from '@novu/shared'; -import { MapTriggerRecipients, SendTestEmail, SendTestEmailCommand } from '@novu/application-generic'; +import { IJwtPayload, ITenantDefine, TriggerTenantContext } from '@novu/shared'; +import { SendTestEmail, SendTestEmailCommand } from '@novu/application-generic'; import { BulkTriggerEventDto, @@ -35,7 +29,6 @@ import { DataBooleanDto } from '../shared/dtos/data-wrapper-dto'; @ApiTags('Events') export class EventsController { constructor( - private mapTriggerRecipients: MapTriggerRecipients, private cancelDelayedUsecase: CancelDelayed, private triggerEventToAll: TriggerEventToAll, private sendTestEmail: SendTestEmail, @@ -118,7 +111,7 @@ export class EventsController { @Body() body: TriggerEventToAllRequestDto ): Promise { const transactionId = body.transactionId || uuidv4(); - const mappedActor = body.actor ? this.mapActor(body.actor) : null; + const mappedActor = typeof body.actor === 'string' ? { subscriberId: body.actor } : null; const mappedTenant = body.tenant ? this.mapTenant(body.tenant) : null; return this.triggerEventToAll.execute( @@ -183,12 +176,6 @@ export class EventsController { ); } - private mapActor(actor?: TriggerRecipientSubscriber | null): ISubscribersDefine | null { - if (!actor) return null; - - return this.mapTriggerRecipients.mapSubscriber(actor); - } - private mapTenant(tenant?: TriggerTenantContext | null): ITenantDefine | null { if (!tenant) return null; diff --git a/apps/api/src/app/events/events.module.ts b/apps/api/src/app/events/events.module.ts index 2df63fe6d4f..c11fcf92613 100644 --- a/apps/api/src/app/events/events.module.ts +++ b/apps/api/src/app/events/events.module.ts @@ -2,25 +2,12 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; import { - AddJob, - AddDelayJob, - MergeOrCreateDigest, CreateExecutionDetails, - CreateNotificationJobs, - DigestFilterSteps, - DigestFilterStepsBackoff, - DigestFilterStepsRegular, - DigestFilterStepsTimed, EventsDistributedLockService, GetNovuProviderCredentials, - ProcessSubscriber, - ProcessTenant, - QueuesModule, StorageHelperService, SendTestEmail, - StoreSubscriberJobs, - TriggerEvent, - MapTriggerRecipients, + BaseAppQueuesModule, } from '@novu/application-generic'; import { EventsController } from './events.controller'; @@ -39,24 +26,11 @@ import { LayoutsModule } from '../layouts/layouts.module'; import { TenantModule } from '../tenant/tenant.module'; const PROVIDERS = [ - AddJob, - AddDelayJob, - MergeOrCreateDigest, CreateExecutionDetails, - CreateNotificationJobs, - DigestFilterSteps, - DigestFilterStepsBackoff, - DigestFilterStepsRegular, - DigestFilterStepsTimed, GetNovuProviderCredentials, StorageHelperService, EventsDistributedLockService, - ProcessSubscriber, - ProcessTenant, SendTestEmail, - StoreSubscriberJobs, - TriggerEvent, - MapTriggerRecipients, ]; @Module({ @@ -73,7 +47,7 @@ const PROVIDERS = [ TopicsModule, LayoutsModule, TenantModule, - QueuesModule, + BaseAppQueuesModule, ], controllers: [EventsController], providers: [...PROVIDERS, ...USE_CASES], diff --git a/apps/api/src/app/events/usecases/process-bulk-trigger/process-bulk-trigger.usecase.ts b/apps/api/src/app/events/usecases/process-bulk-trigger/process-bulk-trigger.usecase.ts index 56751f3acd9..ea2c06ddfce 100644 --- a/apps/api/src/app/events/usecases/process-bulk-trigger/process-bulk-trigger.usecase.ts +++ b/apps/api/src/app/events/usecases/process-bulk-trigger/process-bulk-trigger.usecase.ts @@ -1,6 +1,5 @@ import { Injectable } from '@nestjs/common'; import { TriggerEventStatusEnum } from '@novu/shared'; -import { MapTriggerRecipients } from '@novu/application-generic'; import { ProcessBulkTriggerCommand } from './process-bulk-trigger.command'; @@ -10,7 +9,7 @@ import { ParseEventRequest } from '../parse-event-request/parse-event-request.us @Injectable() export class ProcessBulkTrigger { - constructor(private parseEventRequest: ParseEventRequest, private mapTriggerRecipients: MapTriggerRecipients) {} + constructor(private parseEventRequest: ParseEventRequest) {} async execute(command: ProcessBulkTriggerCommand) { const results: TriggerEventResponseDto[] = []; diff --git a/apps/api/src/app/events/usecases/trigger-event-to-all/trigger-event-to-all.usecase.ts b/apps/api/src/app/events/usecases/trigger-event-to-all/trigger-event-to-all.usecase.ts index e0a5ea90276..3e85d520e7b 100644 --- a/apps/api/src/app/events/usecases/trigger-event-to-all/trigger-event-to-all.usecase.ts +++ b/apps/api/src/app/events/usecases/trigger-event-to-all/trigger-event-to-all.usecase.ts @@ -1,14 +1,13 @@ import { Injectable } from '@nestjs/common'; -import * as _ from 'lodash'; import { SubscriberEntity, SubscriberRepository } from '@novu/dal'; -import { TriggerEvent, TriggerEventCommand } from '@novu/application-generic'; import { TriggerEventStatusEnum } from '@novu/shared'; import { TriggerEventToAllCommand } from './trigger-event-to-all.command'; +import { ParseEventRequest, ParseEventRequestCommand } from '../parse-event-request'; @Injectable() export class TriggerEventToAll { - constructor(private triggerEvent: TriggerEvent, private subscriberRepository: SubscriberRepository) {} + constructor(private subscriberRepository: SubscriberRepository, private parseEventRequest: ParseEventRequest) {} public async execute(command: TriggerEventToAllCommand) { const batchSize = 500; @@ -42,18 +41,18 @@ export class TriggerEventToAll { } private async trigger(command: TriggerEventToAllCommand, list: SubscriberEntity[]) { - await this.triggerEvent.execute( - TriggerEventCommand.create({ + await this.parseEventRequest.execute( + ParseEventRequestCommand.create({ userId: command.userId, environmentId: command.environmentId, organizationId: command.organizationId, identifier: command.identifier, - payload: command.payload, + payload: command.payload || {}, to: list.map((item) => ({ subscriberId: item.subscriberId, })), transactionId: command.transactionId, - overrides: command.overrides, + overrides: command.overrides || {}, actor: command.actor, tenant: command.tenant, }) diff --git a/apps/api/src/app/health/health.controller.ts b/apps/api/src/app/health/health.controller.ts index e877c785552..ee4f2b7f681 100644 --- a/apps/api/src/app/health/health.controller.ts +++ b/apps/api/src/app/health/health.controller.ts @@ -17,7 +17,6 @@ export class HealthController { private healthCheckService: HealthCheckService, private cacheHealthIndicator: CacheServiceHealthIndicator, private dalHealthIndicator: DalServiceHealthIndicator, - private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator, private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator ) {} @@ -26,7 +25,6 @@ export class HealthController { healthCheck(): Promise { const checks: HealthIndicatorFunction[] = [ async () => this.dalHealthIndicator.isHealthy(), - async () => this.standardQueueHealthIndicator.isHealthy(), async () => this.workflowQueueHealthIndicator.isHealthy(), async () => { return { diff --git a/apps/api/src/app/health/health.module.ts b/apps/api/src/app/health/health.module.ts index 533bb1810e1..00bfa8621bb 100644 --- a/apps/api/src/app/health/health.module.ts +++ b/apps/api/src/app/health/health.module.ts @@ -1,12 +1,13 @@ import { Module } from '@nestjs/common'; import { TerminusModule } from '@nestjs/terminus'; -import { HealthController } from './health.controller'; +import { BaseAppQueuesModule } from '@novu/application-generic'; +import { HealthController } from './health.controller'; import { SharedModule } from '../shared/shared.module'; @Module({ - imports: [SharedModule, TerminusModule], + imports: [SharedModule, TerminusModule, BaseAppQueuesModule], controllers: [HealthController], providers: [], }) diff --git a/apps/api/src/app/inbound-parse/inbound-parse.module.ts b/apps/api/src/app/inbound-parse/inbound-parse.module.ts index 18954928e0a..d2014276fca 100644 --- a/apps/api/src/app/inbound-parse/inbound-parse.module.ts +++ b/apps/api/src/app/inbound-parse/inbound-parse.module.ts @@ -1,11 +1,10 @@ import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; -import { CompileTemplate, QueuesModule } from '@novu/application-generic'; +import { BaseAppQueuesModule, CompileTemplate } from '@novu/application-generic'; import { USE_CASES } from './usecases'; import { InboundParseController } from './inbound-parse.controller'; import { InboundParseQueueService } from './services/inbound-parse.queue.service'; import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase'; -import { InboundEmailParse } from './usecases/inbound-email-parse/inbound-email-parse.usecase'; import { SharedModule } from '../shared/shared.module'; import { AuthModule } from '../auth/auth.module'; @@ -13,10 +12,10 @@ import { AuthModule } from '../auth/auth.module'; const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate]; @Module({ - imports: [SharedModule, AuthModule, QueuesModule], + imports: [SharedModule, AuthModule, BaseAppQueuesModule], controllers: [InboundParseController], providers: [...PROVIDERS, ...USE_CASES], - exports: [...USE_CASES, QueuesModule], + exports: [...USE_CASES], }) export class InboundParseModule implements NestModule { configure(consumer: MiddlewareConsumer): MiddlewareConsumer | void {} diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index eadfc5b2318..c0d2d7f993d 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -100,9 +100,8 @@ const PROVIDERS = [ version: packageJson.version, }) ), - QueuesModule, ], providers: [...PROVIDERS], - exports: [...PROVIDERS, LoggerModule, QueuesModule], + exports: [...PROVIDERS, LoggerModule], }) export class SharedModule {} diff --git a/apps/api/src/app/widgets/widgets.module.ts b/apps/api/src/app/widgets/widgets.module.ts index 43419f3ada8..0db3fc7b672 100644 --- a/apps/api/src/app/widgets/widgets.module.ts +++ b/apps/api/src/app/widgets/widgets.module.ts @@ -1,5 +1,7 @@ import { Module } from '@nestjs/common'; +import { BaseAppQueuesModule } from '@novu/application-generic'; + import { USE_CASES } from './usecases'; import { WidgetsController } from './widgets.controller'; import { SharedModule } from '../shared/shared.module'; @@ -8,7 +10,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module'; import { IntegrationModule } from '../integrations/integrations.module'; @Module({ - imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule], + imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseAppQueuesModule], providers: [...USE_CASES], exports: [...USE_CASES], controllers: [WidgetsController], diff --git a/packages/application-generic/src/modules/index.ts b/packages/application-generic/src/modules/index.ts index f5cdb6c5761..0fb985a789c 100644 --- a/packages/application-generic/src/modules/index.ts +++ b/packages/application-generic/src/modules/index.ts @@ -1 +1,2 @@ export { QueuesModule } from './queues.module'; +export { BaseAppQueuesModule } from './queues.module'; diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 9f925d91463..8eadf9868e0 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -67,3 +67,18 @@ const PROVIDERS: Provider[] = [ exports: [...PROVIDERS], }) export class QueuesModule {} + +const APP_PROVIDERS: Provider[] = [ + InboundParseQueue, + InboundParseWorker, + InboundParseQueueServiceHealthIndicator, + WebSocketsQueueService, + WebSocketsQueueServiceHealthIndicator, + WorkflowQueueService, + WorkflowQueueServiceHealthIndicator, +]; +@Module({ + providers: [...APP_PROVIDERS], + exports: [...APP_PROVIDERS], +}) +export class BaseAppQueuesModule {}