Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the broadcast method to utilize the new queue #4526

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c599eee
feat: refactor the broadcast method to utilize the new queue
djabarovgeorge Oct 15, 2023
41dc6bd
fix: missing actor mapping
djabarovgeorge Oct 17, 2023
092193e
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 23, 2023
908f7dc
feat: handle broadcast in trigger event usecase
djabarovgeorge Oct 23, 2023
92bb3ae
feat: make to optional, because it could be broadcast
djabarovgeorge Oct 23, 2023
d1204c0
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 23, 2023
3931058
feat: remove redundant and before time mapping
djabarovgeorge Oct 25, 2023
8a80504
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 26, 2023
010ff5e
fix: remove standard queue service and add map trigger recipients as …
djabarovgeorge Oct 26, 2023
b0d91a1
Merge remote-tracking branch 'origin/NV-3018-refactor-the-broadcast-m…
djabarovgeorge Oct 26, 2023
601b977
fix: update pnpm lock
djabarovgeorge Oct 26, 2023
61449fd
fix: command validation & tenant mapping
djabarovgeorge Oct 26, 2023
e44c267
feat: split trigger commands by type
djabarovgeorge Oct 29, 2023
0079dc1
fix: command type
djabarovgeorge Oct 29, 2023
775a87b
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 29, 2023
bc81155
fix: tenant fetch from string
djabarovgeorge Oct 29, 2023
3bd3a71
fix: pnpm lock
djabarovgeorge Nov 2, 2023
81f5f0d
fix: pnpm lock
djabarovgeorge Nov 3, 2023
02614f8
Merge remote-tracking branch 'origin/NV-3018-refactor-the-broadcast-m…
djabarovgeorge Nov 3, 2023
d0d22e8
fix: pnpm lock
djabarovgeorge Nov 3, 2023
1de3c4e
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
LetItRock Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions apps/api/src/app/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import { Body, Controller, Delete, Param, Post, Scope, UseGuards } from '@nestjs/common';
import { ApiOkResponse, ApiExcludeEndpoint, ApiOperation, ApiTags } from '@nestjs/swagger';
import { v4 as uuidv4 } from 'uuid';
import {
IJwtPayload,
ISubscribersDefine,
ITenantDefine,
TriggerRecipientSubscriber,
TriggerTenantContext,
} from '@novu/shared';
import { MapTriggerRecipients, SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';
import { IJwtPayload, ITenantDefine, TriggerTenantContext } from '@novu/shared';
import { SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';

import {
BulkTriggerEventDto,
Expand All @@ -35,7 +29,6 @@ import { DataBooleanDto } from '../shared/dtos/data-wrapper-dto';
@ApiTags('Events')
export class EventsController {
constructor(
private mapTriggerRecipients: MapTriggerRecipients,
private cancelDelayedUsecase: CancelDelayed,
private triggerEventToAll: TriggerEventToAll,
private sendTestEmail: SendTestEmail,
Expand Down Expand Up @@ -118,7 +111,7 @@ export class EventsController {
@Body() body: TriggerEventToAllRequestDto
): Promise<TriggerEventResponseDto> {
const transactionId = body.transactionId || uuidv4();
const mappedActor = body.actor ? this.mapActor(body.actor) : null;
const mappedActor = typeof body.actor === 'string' ? { subscriberId: body.actor } : null;
djabarovgeorge marked this conversation as resolved.
Show resolved Hide resolved
const mappedTenant = body.tenant ? this.mapTenant(body.tenant) : null;

return this.triggerEventToAll.execute(
Expand Down Expand Up @@ -183,12 +176,6 @@ export class EventsController {
);
}

private mapActor(actor?: TriggerRecipientSubscriber | null): ISubscribersDefine | null {
if (!actor) return null;

return this.mapTriggerRecipients.mapSubscriber(actor);
}

private mapTenant(tenant?: TriggerTenantContext | null): ITenantDefine | null {
if (!tenant) return null;

Expand Down
30 changes: 2 additions & 28 deletions apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,12 @@ import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import {
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
EventsDistributedLockService,
GetNovuProviderCredentials,
ProcessSubscriber,
ProcessTenant,
QueuesModule,
StorageHelperService,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
BaseAppQueuesModule,
} from '@novu/application-generic';

import { EventsController } from './events.controller';
Expand All @@ -39,24 +26,11 @@ import { LayoutsModule } from '../layouts/layouts.module';
import { TenantModule } from '../tenant/tenant.module';

const PROVIDERS = [
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
ProcessSubscriber,
ProcessTenant,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
Comment on lines -42 to -59
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suspect we have more redundant providers in the DI.

];

@Module({
Expand All @@ -73,7 +47,7 @@ const PROVIDERS = [
TopicsModule,
LayoutsModule,
TenantModule,
QueuesModule,
BaseAppQueuesModule,
],
controllers: [EventsController],
providers: [...PROVIDERS, ...USE_CASES],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Injectable } from '@nestjs/common';
import { TriggerEventStatusEnum } from '@novu/shared';
import { MapTriggerRecipients } from '@novu/application-generic';

import { ProcessBulkTriggerCommand } from './process-bulk-trigger.command';

Expand All @@ -10,7 +9,7 @@ import { ParseEventRequest } from '../parse-event-request/parse-event-request.us

@Injectable()
export class ProcessBulkTrigger {
constructor(private parseEventRequest: ParseEventRequest, private mapTriggerRecipients: MapTriggerRecipients) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant 🧹

constructor(private parseEventRequest: ParseEventRequest) {}

async execute(command: ProcessBulkTriggerCommand) {
const results: TriggerEventResponseDto[] = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { Injectable } from '@nestjs/common';
import * as _ from 'lodash';
import { SubscriberEntity, SubscriberRepository } from '@novu/dal';
import { TriggerEvent, TriggerEventCommand } from '@novu/application-generic';
import { TriggerEventStatusEnum } from '@novu/shared';

import { TriggerEventToAllCommand } from './trigger-event-to-all.command';
import { ParseEventRequest, ParseEventRequestCommand } from '../parse-event-request';

@Injectable()
export class TriggerEventToAll {
constructor(private triggerEvent: TriggerEvent, private subscriberRepository: SubscriberRepository) {}
constructor(private subscriberRepository: SubscriberRepository, private parseEventRequest: ParseEventRequest) {}

public async execute(command: TriggerEventToAllCommand) {
const batchSize = 500;
Expand Down Expand Up @@ -42,18 +41,18 @@ export class TriggerEventToAll {
}

private async trigger(command: TriggerEventToAllCommand, list: SubscriberEntity[]) {
djabarovgeorge marked this conversation as resolved.
Show resolved Hide resolved
await this.triggerEvent.execute(
TriggerEventCommand.create({
await this.parseEventRequest.execute(
ParseEventRequestCommand.create({
userId: command.userId,
environmentId: command.environmentId,
organizationId: command.organizationId,
identifier: command.identifier,
payload: command.payload,
payload: command.payload || {},
to: list.map((item) => ({
subscriberId: item.subscriberId,
})),
transactionId: command.transactionId,
overrides: command.overrides,
overrides: command.overrides || {},
actor: command.actor,
tenant: command.tenant,
})
Expand Down
2 changes: 0 additions & 2 deletions apps/api/src/app/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export class HealthController {
private healthCheckService: HealthCheckService,
private cacheHealthIndicator: CacheServiceHealthIndicator,
private dalHealthIndicator: DalServiceHealthIndicator,
private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator,
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator
) {}

Expand All @@ -26,7 +25,6 @@ export class HealthController {
healthCheck(): Promise<HealthCheckResult> {
const checks: HealthIndicatorFunction[] = [
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isHealthy(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need it because we will use only the workflow queue in the API.

async () => this.workflowQueueHealthIndicator.isHealthy(),
async () => {
return {
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/app/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import { HealthController } from './health.controller';
import { BaseAppQueuesModule } from '@novu/application-generic';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';

@Module({
imports: [SharedModule, TerminusModule],
imports: [SharedModule, TerminusModule, BaseAppQueuesModule],
controllers: [HealthController],
providers: [],
})
Expand Down
7 changes: 3 additions & 4 deletions apps/api/src/app/inbound-parse/inbound-parse.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { CompileTemplate, QueuesModule } from '@novu/application-generic';
import { BaseAppQueuesModule, CompileTemplate } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { InboundParseController } from './inbound-parse.controller';
import { InboundParseQueueService } from './services/inbound-parse.queue.service';
import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase';
import { InboundEmailParse } from './usecases/inbound-email-parse/inbound-email-parse.usecase';

import { SharedModule } from '../shared/shared.module';
import { AuthModule } from '../auth/auth.module';

const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate];

@Module({
imports: [SharedModule, AuthModule, QueuesModule],
imports: [SharedModule, AuthModule, BaseAppQueuesModule],
controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES, QueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to export this one here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably not anymore. I think it can be safe to be removed.

exports: [...USE_CASES],
})
export class InboundParseModule implements NestModule {
configure(consumer: MiddlewareConsumer): MiddlewareConsumer | void {}
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ const PROVIDERS = [
version: packageJson.version,
})
),
QueuesModule,
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
Comment on lines -101 to -104
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need this one here therefore I removed it.
If someone knows the reason please let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added to provide the QueuesModule as provider in any module that is being dependant of SharedModule so in case of need QueuesModule would be available without having to inject it in said module.

exports: [...PROVIDERS, LoggerModule],
})
export class SharedModule {}
4 changes: 3 additions & 1 deletion apps/api/src/app/widgets/widgets.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Module } from '@nestjs/common';

import { BaseAppQueuesModule } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { WidgetsController } from './widgets.controller';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -8,7 +10,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module';
import { IntegrationModule } from '../integrations/integrations.module';

@Module({
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule],
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseAppQueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was needed because we removed QueueModule from the shared module.
Here we need BaseAppQueuesModule because we use WebSocketsQueueService.

providers: [...USE_CASES],
exports: [...USE_CASES],
controllers: [WidgetsController],
Expand Down
1 change: 1 addition & 0 deletions packages/application-generic/src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { QueuesModule } from './queues.module';
export { BaseAppQueuesModule } from './queues.module';
15 changes: 15 additions & 0 deletions packages/application-generic/src/modules/queues.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,18 @@ const PROVIDERS: Provider[] = [
exports: [...PROVIDERS],
})
export class QueuesModule {}

const APP_PROVIDERS: Provider[] = [
InboundParseQueue,
InboundParseWorker,
InboundParseQueueServiceHealthIndicator,
WebSocketsQueueService,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
];
@Module({
providers: [...APP_PROVIDERS],
exports: [...APP_PROVIDERS],
})
export class BaseAppQueuesModule {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the final solution but a temporary one, in this part, I extracted the relevant queues, so we wont initialize not relevant queues in the API APP.
Will make a discovery in order to check if we could take advantage of nest dynamic modules in order to make it more convenient.
https://docs.nestjs.com/fundamentals/dynamic-modules

Loading