Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the broadcast method to utilize the new queue #4526

Conversation

djabarovgeorge
Copy link
Contributor

What change does this PR introduce?

Refactor the new broadcast method to utilize the new trigger-handler and process-subscriber queues, instead of doing that at the moment of the trigger.

Why was this change needed?

Other information (Screenshots)

@linear
Copy link

linear bot commented Oct 15, 2023

NV-3018 Refactor the `/broadcast` method, to utilize the new queue

What?

Refactor the new broadcast method to utilize the new trigger-handler and process-subscriber queues, instead of doing that at the moment of the trigger.

Why? (Context)

Definition of Done

Comment on lines 71 to 84
const APP_PROVIDERS: Provider[] = [
InboundParseQueue,
InboundParseWorker,
InboundParseQueueServiceHealthIndicator,
WebSocketsQueueService,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
];
@Module({
providers: [...APP_PROVIDERS],
exports: [...APP_PROVIDERS],
})
export class BaseAppQueuesModule {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the final solution but a temporary one, in this part, I extracted the relevant queues, so we wont initialize not relevant queues in the API APP.
Will make a discovery in order to check if we could take advantage of nest dynamic modules in order to make it more convenient.
https://docs.nestjs.com/fundamentals/dynamic-modules

@@ -8,7 +10,7 @@ import { SubscribersModule } from '../subscribers/subscribers.module';
import { IntegrationModule } from '../integrations/integrations.module';

@Module({
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule],
imports: [SharedModule, SubscribersModule, AuthModule, IntegrationModule, BaseAppQueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was needed because we removed QueueModule from the shared module.
Here we need BaseAppQueuesModule because we use WebSocketsQueueService.

Comment on lines -103 to -104
QueuesModule,
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need this one here therefore I removed it.
If someone knows the reason please let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added to provide the QueuesModule as provider in any module that is being dependant of SharedModule so in case of need QueuesModule would be available without having to inject it in said module.

controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES, QueuesModule],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to export this one here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably not anymore. I think it can be safe to be removed.

@@ -26,7 +25,6 @@ export class HealthController {
healthCheck(): Promise<HealthCheckResult> {
const checks: HealthIndicatorFunction[] = [
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isHealthy(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need it because we will use only the workflow queue in the API.

@@ -10,7 +9,7 @@ import { ParseEventRequest } from '../parse-event-request/parse-event-request.us

@Injectable()
export class ProcessBulkTrigger {
constructor(private parseEventRequest: ParseEventRequest, private mapTriggerRecipients: MapTriggerRecipients) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant 🧹

Comment on lines -42 to -59
AddJob,
AddDelayJob,
MergeOrCreateDigest,
CreateExecutionDetails,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
ProcessSubscriber,
ProcessTenant,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
MapTriggerRecipients,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suspect we have more redundant providers in the DI.

@djabarovgeorge djabarovgeorge changed the title feat: refactor the broadcast method to utilize the new queue Refactor the broadcast method to utilize the new queue Oct 15, 2023
Base automatically changed from refactor-queue-health-indicator to next October 17, 2023 14:03
recipients: command.to,
transactionId: command.transactionId,
userId: command.userId,
actor: actorProcessed,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed that in the previous state is was using mappedActor Does anyone knows if there were particular reason? if there is i will refactor in order to pass mappedActor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally have no idea.
@ainouzgali tagging you because it is my memory about the implementation of mapped actors was made by you, but I might be wrong, so apologies in advance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe only the subscriberId is needed, so thats why only the mapping was sent. But the command MapTriggerRecipientsCommand is expecting ISubscriberDefine, not SubscriberEntity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djabarovgeorge , a few days ago my PR for adding actor was merged, let me know if you need help resolving conflicts or reintegrating it. As it seems we touched a lot of the same logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing it up, i removed all of the actor/tenant i saw before the trigger event use-case (there was one redundant due to duplication in the flow), so now the trigger event use-case is responsible for the mapping.

Copy link
Contributor Author

@djabarovgeorge djabarovgeorge Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was debating with myself if to update the to type, to contain AddressingTypeEnum as well or not.
In the end decided to add additional addressingType param.
Will love to hear if anyone has any ideas regarding the new Command interface.

small note, if addressingType === AddressingTypeEnum.BROADCAST the to is redundant because we will trigger to all of the subscribers.

controllers: [InboundParseController],
providers: [...PROVIDERS, ...USE_CASES],
exports: [...USE_CASES, QueuesModule],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably not anymore. I think it can be safe to be removed.

Comment on lines -103 to -104
QueuesModule,
],
providers: [...PROVIDERS],
exports: [...PROVIDERS, LoggerModule, QueuesModule],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added to provide the QueuesModule as provider in any module that is being dependant of SharedModule so in case of need QueuesModule would be available without having to inject it in said module.

Comment on lines +71 to +85
const APP_PROVIDERS: Provider[] = [
InboundParseQueue,
InboundParseWorker,
InboundParseQueueServiceHealthIndicator,
WebSocketsQueueService,
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
];

@Module({
providers: [...APP_PROVIDERS],
exports: [...APP_PROVIDERS],
})
export class BaseApiQueuesModule {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you trying to gather all the QueuesModule dependencies used only in API app?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly, for two reasons

  1. that way we won't initialize redundant clients in API.
  2. in the near future (i hope) i want to try creating dynamic health validation, meaning if we inject a new HealthIndicator in the DI it will be validated out of the box, for example without adding it to the healthCheck in the api/src/app/health/health.controller.ts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now explained it is a great idea and didn't think about it (even though the clients should be only initialised in the custom providers if needed) and the services only when injected, if I am not wrong.
But the separation of the different services based on the App usage I like it. I would just suggest to create all them and name them as:
ApiQueuesModule
WorkerQueuesModule
WebSocketQueuesModule
etc.

I am happy to take that responsibility in a following task to this one so you don't have to take that work as we need this to move forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the small test I made locally before making the changes I saw that they do initialize even if not injected, i guess because they are singletons maybe?

Regarding the separation i totally agree, make it only for the API in order to keep the scope as small as possible for this PR as it is an extra for it. We definitely can update the other ones as well.

Once again I agree we can merge this one for now, the additional improvements can wait for now. I made this one as a preparation for the future.

@@ -1 +1,2 @@
export { QueuesModule } from './queues.module';
export { BaseApiQueuesModule } from './queues.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can reuse the same export line.

@IsDefined()
to: ISubscribersDefine[];
@IsOptional()
to?: ISubscribersDefine[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind making this optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, it is related to this thought i had
#4526 (comment)
I was not sure how to provide extra data to parse-event and trigger-event usecases that we need to execute broadcast trigger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, wasn't sure what that meant.
I think we might need to explore to create specific command to each case sharing a base command.

recipients: command.to,
transactionId: command.transactionId,
userId: command.userId,
actor: actorProcessed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally have no idea.
@ainouzgali tagging you because it is my memory about the implementation of mapped actors was made by you, but I might be wrong, so apologies in advance.

Copy link
Contributor

@p-fernandez p-fernandez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌟

@djabarovgeorge djabarovgeorge merged commit 7722653 into next Nov 3, 2023
25 of 27 checks passed
@djabarovgeorge djabarovgeorge deleted the NV-3018-refactor-the-broadcast-method-to-utilize-the-new-queue branch November 3, 2023 09:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants