Skip to content

Commit

Permalink
feat(api): invalidate stale workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
djabarovgeorge committed Nov 7, 2024
1 parent b427d72 commit 0cebb2d
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions apps/api/src/app/bridge/usecases/sync/sync.usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import {
} from '@novu/dal';
import {
AnalyticsService,
buildNotificationTemplateIdentifierKey,
buildNotificationTemplateKey,
CreateWorkflow,
CreateWorkflowCommand,
ExecuteBridgeRequest,
InvalidateCacheService,
NotificationStep,
UpdateWorkflow,
UpdateWorkflowCommand,
Expand Down Expand Up @@ -40,6 +43,7 @@ export class Sync {
private environmentRepository: EnvironmentRepository,
private executeBridgeRequest: ExecuteBridgeRequest,
private analyticsService: AnalyticsService,
private invalidateCacheService: InvalidateCacheService,
private upsertPreferences: UpsertPreferences
) {}
async execute(command: SyncCommand): Promise<CreateBridgeResponseDto> {
Expand Down Expand Up @@ -81,6 +85,8 @@ export class Sync {
});
}

await this.invalidateStaleWorkflows(command, discover);

const persistedWorkflowsInBridge = await this.createWorkflows(command, discover.workflows);

await this.disposeOldWorkflows(command, persistedWorkflowsInBridge);
Expand All @@ -90,6 +96,47 @@ export class Sync {
return persistedWorkflowsInBridge;
}

private async invalidateStaleWorkflows(command: SyncCommand, discover: DiscoverOutput) {
const existingWorkflows = await this.notificationTemplateRepository.find(
{
_environmentId: command.environmentId,
type: {
$in: [WorkflowTypeEnum.ECHO, WorkflowTypeEnum.BRIDGE],
},
},
{
projection: {
'triggers.0.identifier': 1,
_id: 1,
},
}
);
const discoverWorkflowIdentifiers = discover.workflows.map((workflow) => workflow.workflowId);
const staleWorkflows = existingWorkflows.filter(
(_worklfow) => !discoverWorkflowIdentifiers.includes(_worklfow.triggers[0].identifier)
);
const staleWorkflowIdentifiers = staleWorkflows.map((workflow) => workflow.triggers[0].identifier);
const staleWorkflowIds = staleWorkflows.map((workflow) => workflow._id);

for (const identifier of staleWorkflowIdentifiers) {
await this.invalidateCacheService.invalidateByKey({
key: buildNotificationTemplateIdentifierKey({
templateIdentifier: identifier,
_environmentId: command.environmentId,
}),
});
}

for (const _id of staleWorkflowIds) {
await this.invalidateCacheService.invalidateByKey({
key: buildNotificationTemplateKey({
_id,
_environmentId: command.environmentId,
}),
});
}
}

private async updateBridgeUrl(command: SyncCommand): Promise<void> {
await this.environmentRepository.update(
{ _id: command.environmentId },
Expand Down

0 comments on commit 0cebb2d

Please sign in to comment.