Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the broadcast method to utilize the new queue #4526

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c599eee
feat: refactor the broadcast method to utilize the new queue
djabarovgeorge Oct 15, 2023
41dc6bd
fix: missing actor mapping
djabarovgeorge Oct 17, 2023
092193e
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 23, 2023
908f7dc
feat: handle broadcast in trigger event usecase
djabarovgeorge Oct 23, 2023
92bb3ae
feat: make to optional, because it could be broadcast
djabarovgeorge Oct 23, 2023
d1204c0
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 23, 2023
3931058
feat: remove redundant and before time mapping
djabarovgeorge Oct 25, 2023
8a80504
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 26, 2023
010ff5e
fix: remove standard queue service and add map trigger recipients as …
djabarovgeorge Oct 26, 2023
b0d91a1
Merge remote-tracking branch 'origin/NV-3018-refactor-the-broadcast-m…
djabarovgeorge Oct 26, 2023
601b977
fix: update pnpm lock
djabarovgeorge Oct 26, 2023
61449fd
fix: command validation & tenant mapping
djabarovgeorge Oct 26, 2023
e44c267
feat: split trigger commands by type
djabarovgeorge Oct 29, 2023
0079dc1
fix: command type
djabarovgeorge Oct 29, 2023
775a87b
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
djabarovgeorge Oct 29, 2023
bc81155
fix: tenant fetch from string
djabarovgeorge Oct 29, 2023
3bd3a71
fix: pnpm lock
djabarovgeorge Nov 2, 2023
81f5f0d
fix: pnpm lock
djabarovgeorge Nov 3, 2023
02614f8
Merge remote-tracking branch 'origin/NV-3018-refactor-the-broadcast-m…
djabarovgeorge Nov 3, 2023
d0d22e8
fix: pnpm lock
djabarovgeorge Nov 3, 2023
1de3c4e
Merge branch 'next' into NV-3018-refactor-the-broadcast-method-to-uti…
LetItRock Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions apps/api/src/app/events/events.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
TriggerRecipientSubscriber,
TriggerTenantContext,
} from '@novu/shared';
import { MapTriggerRecipients, SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';
import { SendTestEmail, SendTestEmailCommand } from '@novu/application-generic';

import {
BulkTriggerEventDto,
Expand All @@ -35,7 +35,6 @@ import { DataBooleanDto } from '../shared/dtos/data-wrapper-dto';
@ApiTags('Events')
export class EventsController {
constructor(
private mapTriggerRecipients: MapTriggerRecipients,
private cancelDelayedUsecase: CancelDelayed,
private triggerEventToAll: TriggerEventToAll,
private sendTestEmail: SendTestEmail,
Expand Down Expand Up @@ -119,7 +118,7 @@ export class EventsController {
@Body() body: TriggerEventToAllRequestDto
): Promise<TriggerEventResponseDto> {
const transactionId = body.transactionId || uuidv4();
const mappedActor = body.actor ? this.mapActor(body.actor) : null;
const mappedActor = this.mapActor(body.actor);
const mappedTenant = body.tenant ? this.mapTenant(body.tenant) : null;

return this.triggerEventToAll.execute(
Expand Down Expand Up @@ -187,7 +186,11 @@ export class EventsController {
private mapActor(actor?: TriggerRecipientSubscriber | null): ISubscribersDefine | null {
if (!actor) return null;

return this.mapTriggerRecipients.mapSubscriber(actor);
if (typeof actor === 'string') {
return { subscriberId: actor };
}

return actor;
}

private mapTenant(tenant?: TriggerTenantContext | null): ITenantDefine | null {
Expand Down
30 changes: 2 additions & 28 deletions apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,12 @@ import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import {
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
EventsDistributedLockService,
GetNovuProviderCredentials,
ProcessSubscriber,
ProcessTenant,
QueuesModule,
StorageHelperService,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
BaseApiQueuesModule,
} from '@novu/application-generic';

import { EventsController } from './events.controller';
Expand All @@ -39,24 +26,11 @@ import { LayoutsModule } from '../layouts/layouts.module';
import { TenantModule } from '../tenant/tenant.module';

const PROVIDERS = [
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
ProcessSubscriber,
ProcessTenant,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
Comment on lines -42 to -59
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suspect we have more redundant providers in the DI.

];

@Module({
Expand All @@ -73,7 +47,7 @@ const PROVIDERS = [
TopicsModule,
LayoutsModule,
TenantModule,
QueuesModule,
BaseApiQueuesModule,
],
controllers: [EventsController],
providers: [...PROVIDERS, ...USE_CASES],
Expand Down
Copy link
Contributor Author

@djabarovgeorge djabarovgeorge Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was debating with myself if to update the to type, to contain AddressingTypeEnum as well or not.
In the end decided to add additional addressingType param.
Will love to hear if anyone has any ideas regarding the new Command interface.

small note, if addressingType === AddressingTypeEnum.BROADCAST the to is redundant because we will trigger to all of the subscribers.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IsDefined, IsString, IsOptional, ValidateNested } from 'class-validator';
import { TriggerRecipients, TriggerRecipientSubscriber, TriggerTenantContext } from '@novu/shared';
import { AddressingTypeEnum, TriggerRecipients, TriggerRecipientSubscriber, TriggerTenantContext } from '@novu/shared';

import { EnvironmentWithUserCommand } from '../../../shared/commands/project.command';

Expand All @@ -14,8 +14,8 @@ export class ParseEventRequestCommand extends EnvironmentWithUserCommand {
@IsDefined()
overrides: Record<string, Record<string, unknown>>;

@IsDefined()
to: TriggerRecipients;
@IsOptional()
to?: TriggerRecipients;

@IsString()
@IsOptional()
Expand All @@ -28,4 +28,7 @@ export class ParseEventRequestCommand extends EnvironmentWithUserCommand {
@IsOptional()
@ValidateNested()
tenant?: TriggerTenantContext | null;

@IsOptional()
addressingType?: AddressingTypeEnum;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Injectable } from '@nestjs/common';
import { TriggerEventStatusEnum } from '@novu/shared';
import { MapTriggerRecipients } from '@novu/application-generic';

import { ProcessBulkTriggerCommand } from './process-bulk-trigger.command';

Expand All @@ -10,7 +9,7 @@ import { ParseEventRequest } from '../parse-event-request/parse-event-request.us

@Injectable()
export class ProcessBulkTrigger {
constructor(private parseEventRequest: ParseEventRequest, private mapTriggerRecipients: MapTriggerRecipients) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant 🧹

constructor(private parseEventRequest: ParseEventRequest) {}

async execute(command: ProcessBulkTriggerCommand) {
const results: TriggerEventResponseDto[] = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,34 @@
import { Injectable } from '@nestjs/common';
import * as _ from 'lodash';
import { SubscriberEntity, SubscriberRepository } from '@novu/dal';
import { TriggerEvent, TriggerEventCommand } from '@novu/application-generic';
import { TriggerEventStatusEnum } from '@novu/shared';
import { SubscriberRepository } from '@novu/dal';
import { AddressingTypeEnum, TriggerEventStatusEnum } from '@novu/shared';

import { TriggerEventToAllCommand } from './trigger-event-to-all.command';
import { ParseEventRequest, ParseEventRequestCommand } from '../parse-event-request';

@Injectable()
export class TriggerEventToAll {
constructor(private triggerEvent: TriggerEvent, private subscriberRepository: SubscriberRepository) {}
constructor(private subscriberRepository: SubscriberRepository, private parseEventRequest: ParseEventRequest) {}

public async execute(command: TriggerEventToAllCommand) {
const batchSize = 500;
let list: SubscriberEntity[] = [];

for await (const subscriber of this.subscriberRepository.findBatch(
{
_environmentId: command.environmentId,
_organizationId: command.organizationId,
},
'subscriberId',
{},
batchSize
)) {
list.push(subscriber);
if (list.length === batchSize) {
await this.trigger(command, list);
list = [];
}
}

if (list.length > 0) {
await this.trigger(command, list);
}

return {
acknowledged: true,
status: TriggerEventStatusEnum.PROCESSED,
transactionId: command.transactionId,
};
}

private async trigger(command: TriggerEventToAllCommand, list: SubscriberEntity[]) {
await this.triggerEvent.execute(
TriggerEventCommand.create({
await this.parseEventRequest.execute(
ParseEventRequestCommand.create({
userId: command.userId,
environmentId: command.environmentId,
organizationId: command.organizationId,
identifier: command.identifier,
payload: command.payload,
to: list.map((item) => ({
subscriberId: item.subscriberId,
})),
payload: command.payload || {},
addressingType: AddressingTypeEnum.BROADCAST,
transactionId: command.transactionId,
overrides: command.overrides,
overrides: command.overrides || {},
actor: command.actor,
tenant: command.tenant,
})
);

return {
acknowledged: true,
status: TriggerEventStatusEnum.PROCESSED,
transactionId: command.transactionId,
};
}
}
2 changes: 0 additions & 2 deletions apps/api/src/app/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export class HealthController {
private healthCheckService: HealthCheckService,
private cacheHealthIndicator: CacheServiceHealthIndicator,
private dalHealthIndicator: DalServiceHealthIndicator,
private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator,
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator
) {}

Expand All @@ -26,7 +25,6 @@ export class HealthController {
healthCheck(): Promise<HealthCheckResult> {
const checks: HealthIndicatorFunction[] = [
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isHealthy(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need it because we will use only the workflow queue in the API.

async () => this.workflowQueueHealthIndicator.isHealthy(),
async () => {
return {
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/app/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import { HealthController } from './health.controller';
import { BaseApiQueuesModule } from '@novu/application-generic';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';

@Module({
imports: [SharedModule, TerminusModule],
imports: [SharedModule, TerminusModule, BaseApiQueuesModule],
controllers: [HealthController],
providers: [],
})
Expand Down
7 changes: 3 additions & 4 deletions apps/api/src/app/inbound-parse/inbound-parse.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { CompileTemplate, QueuesModule } from '@novu/application-generic';
import { BaseApiQueuesModule, CompileTemplate } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { InboundParseController } from './inbound-parse.controller';
import { InboundParseQueueService } from './services/inbound-parse.queue.service';
import { GetMxRecord } from './usecases/get-mx-record/get-mx-record.usecase';
import { InboundEmailParse } from './usecases/inbound-email-parse/inbound-email-parse.usecase';

import { SharedModule } from '../shared/shared.module';
import { AuthModule } from '../auth/auth.module';

const PROVIDERS = [InboundParseQueueService, GetMxRecord, CompileTemplate];

@Module({
imports: [SharedModule, AuthModule, QueuesModule],
imports: [SharedModule, AuthModule, BaseApiQueuesModule],
controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES, QueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to export this one here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably not anymore. I think it can be safe to be removed.

exports: [...USE_CASES],
})
export class InboundParseModule implements NestModule {
configure(consumer: MiddlewareConsumer): MiddlewareConsumer | void {}
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ const PROVIDERS = [
version: packageJson.version,
})
),
QueuesModule,
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
Comment on lines -101 to -104
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need this one here therefore I removed it.
If someone knows the reason please let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added to provide the QueuesModule as provider in any module that is being dependant of SharedModule so in case of need QueuesModule would be available without having to inject it in said module.

exports: [...PROVIDERS, LoggerModule],
})
export class SharedModule {}
4 changes: 3 additions & 1 deletion apps/api/src/app/widgets/widgets.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Module } from '@nestjs/common';

import { BaseApiQueuesModule } from '@novu/application-generic';

import { USE_CASES } from './usecases';
import { WidgetsController } from './widgets.controller';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -8,7 +10,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module';
import { IntegrationModule } from '../integrations/integrations.module';

@Module({
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule],
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseApiQueuesModule],
providers: [...USE_CASES],
exports: [...USE_CASES],
controllers: [WidgetsController],
Expand Down
5 changes: 5 additions & 0 deletions libs/shared/src/types/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ export interface ITopic {
}

export type TriggerRecipientTopics = ITopic[];

export enum AddressingTypeEnum {
BROADCAST = 'broadcast',
MULTICAST = 'multicast',
}
1 change: 1 addition & 0 deletions packages/application-generic/src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { QueuesModule } from './queues.module';
export { BaseApiQueuesModule } from './queues.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can reuse the same export line.

16 changes: 16 additions & 0 deletions packages/application-generic/src/modules/queues.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,19 @@ const PROVIDERS: Provider[] = [
exports: [...PROVIDERS],
})
export class QueuesModule {}

const APP_PROVIDERS: Provider[] = [
InboundParseQueue,
InboundParseWorker,
InboundParseQueueServiceHealthIndicator,
WebSocketsQueueService,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
];

@Module({
providers: [...APP_PROVIDERS],
exports: [...APP_PROVIDERS],
})
export class BaseApiQueuesModule {}
Comment on lines +70 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you trying to gather all the QueuesModule dependencies used only in API app?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly, for two reasons

  1. that way we won't initialize redundant clients in API.
  2. in the near future (i hope) i want to try creating dynamic health validation, meaning if we inject a new HealthIndicator in the DI it will be validated out of the box, for example without adding it to the healthCheck in the api/src/app/health/health.controller.ts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now explained it is a great idea and didn't think about it (even though the clients should be only initialised in the custom providers if needed) and the services only when injected, if I am not wrong.
But the separation of the different services based on the App usage I like it. I would just suggest to create all them and name them as:
ApiQueuesModule
WorkerQueuesModule
WebSocketQueuesModule
etc.

I am happy to take that responsibility in a following task to this one so you don't have to take that work as we need this to move forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the small test I made locally before making the changes I saw that they do initialize even if not injected, i guess because they are singletons maybe?

Regarding the separation i totally agree, make it only for the API in order to keep the scope as small as possible for this PR as it is an extra for it. We definitely can update the other ones as well.

Once again I agree we can merge this one for now, the additional improvements can wait for now. I made this one as a preparation for the future.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ import {
IsString,
IsOptional,
ValidateNested,
IsEnum,
} from 'class-validator';
import { ISubscribersDefine, ITenantDefine } from '@novu/shared';
import {
AddressingTypeEnum,
ISubscribersDefine,
ITenantDefine,
} from '@novu/shared';

import { EnvironmentWithUserCommand } from '../../commands';

Expand All @@ -19,8 +24,8 @@ export class TriggerEventCommand extends EnvironmentWithUserCommand {
@IsDefined()
overrides: Record<string, Record<string, unknown>>;

@IsDefined()
to: ISubscribersDefine[];
@IsOptional()
to?: ISubscribersDefine[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind making this optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, it is related to this thought i had
#4526 (comment)
I was not sure how to provide extra data to parse-event and trigger-event usecases that we need to execute broadcast trigger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, wasn't sure what that meant.
I think we might need to explore to create specific command to each case sharing a base command.


@IsString()
@IsDefined()
Expand All @@ -33,4 +38,7 @@ export class TriggerEventCommand extends EnvironmentWithUserCommand {
@IsOptional()
@ValidateNested()
tenant?: ITenantDefine | null;

@IsOptional()
addressingType?: AddressingTypeEnum;
}
Loading
Loading