Skip to content

Commit

Permalink
feat: refactor the broadcast method to utilize the new queue
Browse files Browse the repository at this point in the history
  • Loading branch information
djabarovgeorge committed Oct 15, 2023
1 parent 88ec98c commit c599eee
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 64 deletions.
19 changes: 3 additions & 16 deletions apps/api/src/app/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import { Body, Controller, Delete, Param, Post, Scope, UseGuards } from '@nestjs/common';
import { ApiOkResponse, ApiExcludeEndpoint, ApiOperation, ApiTags } from '@nestjs/swagger';
import { v4 as uuidv4 } from 'uuid';
import {
IJwtPayload,
ISubscribersDefine,
ITenantDefine,
TriggerRecipientSubscriber,
TriggerTenantContext,
} from '@novu/shared';
import { MapTriggerRecipients, SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';
import { IJwtPayload, ITenantDefine, TriggerTenantContext } from '@novu/shared';
import { SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';

import {
BulkTriggerEventDto,
Expand All @@ -35,7 +29,6 @@ import { DataBooleanDto } from '../shared/dtos/data-wrapper-dto';
@ApiTags('Events')
export class EventsController {
constructor(
private mapTriggerRecipients: MapTriggerRecipients,
private cancelDelayedUsecase: CancelDelayed,
private triggerEventToAll: TriggerEventToAll,
private sendTestEmail: SendTestEmail,
Expand Down Expand Up @@ -118,7 +111,7 @@ export class EventsController {
@Body() body: TriggerEventToAllRequestDto
): Promise<TriggerEventResponseDto> {
const transactionId = body.transactionId || uuidv4();
const mappedActor = body.actor ? this.mapActor(body.actor) : null;
const mappedActor = typeof body.actor === 'string' ? { subscriberId: body.actor } : null;
const mappedTenant = body.tenant ? this.mapTenant(body.tenant) : null;

return this.triggerEventToAll.execute(
Expand Down Expand Up @@ -183,12 +176,6 @@ export class EventsController {
);
}

private mapActor(actor?: TriggerRecipientSubscriber | null): ISubscribersDefine | null {
if (!actor) return null;

return this.mapTriggerRecipients.mapSubscriber(actor);
}

private mapTenant(tenant?: TriggerTenantContext | null): ITenantDefine | null {
if (!tenant) return null;

Expand Down
30 changes: 2 additions & 28 deletions apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,12 @@ import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import {
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
EventsDistributedLockService,
GetNovuProviderCredentials,
ProcessSubscriber,
ProcessTenant,
QueuesModule,
StorageHelperService,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
BaseAppQueuesModule,
} from '@novu/application-generic';

import { EventsController } from './events.controller';
Expand All @@ -39,24 +26,11 @@ import { LayoutsModule } from '../layouts/layouts.module';
import { TenantModule } from '../tenant/tenant.module';

const PROVIDERS = [
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
ProcessSubscriber,
ProcessTenant,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
];

@Module({
Expand All @@ -73,7 +47,7 @@ const PROVIDERS = [
TopicsModule,
LayoutsModule,
TenantModule,
QueuesModule,
BaseAppQueuesModule,
],
controllers: [EventsController],
providers: [...PROVIDERS, ...USE_CASES],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Injectable } from '@nestjs/common';
import { TriggerEventStatusEnum } from '@novu/shared';
import { MapTriggerRecipients } from '@novu/application-generic';

import { ProcessBulkTriggerCommand } from './process-bulk-trigger.command';

Expand All @@ -10,7 +9,7 @@ import { ParseEventRequest } from '../parse-event-request/parse-event-request.us

@Injectable()
export class ProcessBulkTrigger {
constructor(private parseEventRequest: ParseEventRequest, private mapTriggerRecipients: MapTriggerRecipients) {}
constructor(private parseEventRequest: ParseEventRequest) {}

async execute(command: ProcessBulkTriggerCommand) {
const results: TriggerEventResponseDto[] = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { Injectable } from '@nestjs/common';
import * as _ from 'lodash';
import { SubscriberEntity, SubscriberRepository } from '@novu/dal';
import { TriggerEvent, TriggerEventCommand } from '@novu/application-generic';
import { TriggerEventStatusEnum } from '@novu/shared';

import { TriggerEventToAllCommand } from './trigger-event-to-all.command';
import { ParseEventRequest, ParseEventRequestCommand } from '../parse-event-request';

@Injectable()
export class TriggerEventToAll {
constructor(private triggerEvent: TriggerEvent, private subscriberRepository: SubscriberRepository) {}
constructor(private subscriberRepository: SubscriberRepository, private parseEventRequest: ParseEventRequest) {}

public async execute(command: TriggerEventToAllCommand) {
const batchSize = 500;
Expand Down Expand Up @@ -42,18 +41,18 @@ export class TriggerEventToAll {
}

private async trigger(command: TriggerEventToAllCommand, list: SubscriberEntity[]) {
await this.triggerEvent.execute(
TriggerEventCommand.create({
await this.parseEventRequest.execute(
ParseEventRequestCommand.create({
userId: command.userId,
environmentId: command.environmentId,
organizationId: command.organizationId,
identifier: command.identifier,
payload: command.payload,
payload: command.payload || {},
to: list.map((item) => ({
subscriberId: item.subscriberId,
})),
transactionId: command.transactionId,
overrides: command.overrides,
overrides: command.overrides || {},
actor: command.actor,
tenant: command.tenant,
})
Expand Down
2 changes: 0 additions & 2 deletions apps/api/src/app/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export class HealthController {
private healthCheckService: HealthCheckService,
private cacheHealthIndicator: CacheServiceHealthIndicator,
private dalHealthIndicator: DalServiceHealthIndicator,
private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator,
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator
) {}

Expand All @@ -26,7 +25,6 @@ export class HealthController {
healthCheck(): Promise<HealthCheckResult> {
const checks: HealthIndicatorFunction[] = [
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isHealthy(),
async () => this.workflowQueueHealthIndicator.isHealthy(),
async () => {
return {
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/app/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import { HealthController } from './health.controller';
import { BaseAppQueuesModule } from '@novu/application-generic';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';

@Module({
imports: [SharedModule, TerminusModule],
imports: [SharedModule, TerminusModule, BaseAppQueuesModule],
controllers: [HealthController],
providers: [],
})
Expand Down
7 changes: 3 additions & 4 deletions apps/api/src/app/inbound-parse/inbound-parse.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { CompileTemplate, QueuesModule } from '@novu/application-generic';
import { BaseAppQueuesModule, CompileTemplate } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { InboundParseController } from './inbound-parse.controller';
import { InboundParseQueueService } from './services/inbound-parse.queue.service';
import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase';
import { InboundEmailParse } from './usecases/inbound-email-parse/inbound-email-parse.usecase';

import { SharedModule } from '../shared/shared.module';
import { AuthModule } from '../auth/auth.module';

const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate];

@Module({
imports: [SharedModule, AuthModule, QueuesModule],
imports: [SharedModule, AuthModule, BaseAppQueuesModule],
controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES, QueuesModule],
exports: [...USE_CASES],
})
export class InboundParseModule implements NestModule {
configure(consumer: MiddlewareConsumer): MiddlewareConsumer | void {}
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ const PROVIDERS = [
version: packageJson.version,
})
),
QueuesModule,
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
exports: [...PROVIDERS, LoggerModule],
})
export class SharedModule {}
4 changes: 3 additions & 1 deletion apps/api/src/app/widgets/widgets.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Module } from '@nestjs/common';

import { BaseAppQueuesModule } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { WidgetsController } from './widgets.controller';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -8,7 +10,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module';
import { IntegrationModule } from '../integrations/integrations.module';

@Module({
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule],
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseAppQueuesModule],
providers: [...USE_CASES],
exports: [...USE_CASES],
controllers: [WidgetsController],
Expand Down
1 change: 1 addition & 0 deletions packages/application-generic/src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { QueuesModule } from './queues.module';
export { BaseAppQueuesModule } from './queues.module';
15 changes: 15 additions & 0 deletions packages/application-generic/src/modules/queues.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,18 @@ const PROVIDERS: Provider[] = [
exports: [...PROVIDERS],
})
export class QueuesModule {}

const APP_PROVIDERS: Provider[] = [
InboundParseQueue,
InboundParseWorker,
InboundParseQueueServiceHealthIndicator,
WebSocketsQueueService,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
];
@Module({
providers: [...APP_PROVIDERS],
exports: [...APP_PROVIDERS],
})
export class BaseAppQueuesModule {}

0 comments on commit c599eee

Please sign in to comment.