From d454d17a54c8e95a5c9c4d558616792ce3deb13b Mon Sep 17 00:00:00 2001 From: Richard Fontein <32132657+rifont@users.noreply.github.com> Date: Tue, 15 Oct 2024 23:13:33 +0200 Subject: [PATCH] feat(api): Add Novu-managed Bridge endpoint per environment (#6451) --- .cspell.json | 1 + apps/api/src/app/bridge/bridge.controller.ts | 32 ++-- .../dtos/validate-bridge-url-response.dto.ts | 5 +- apps/api/src/app/bridge/shared/types.ts | 5 - .../get-bridge-status.command.ts | 11 +- .../get-bridge-status.usecase.ts | 35 ++-- .../preview-step/preview-step.command.ts | 6 +- .../preview-step/preview-step.usecase.ts | 127 ++++-------- .../app/bridge/usecases/sync/sync.usecase.ts | 5 +- .../app/environments/environments.module.ts | 10 +- .../app/environments/novu-bridge-client.ts | 64 +++++++ .../environments/novu-bridge.controller.ts | 27 +++ .../app/environments/novu-bridge.module.ts | 25 +++ .../construct-framework-workflow.command.ts | 8 + .../construct-framework-workflow.usecase.ts | 181 ++++++++++++++++++ .../construct-framework-workflow/index.ts | 2 + .../src/app/environments/usecases/index.ts | 1 - .../src/app/events/e2e/bridge-trigger.e2e.ts | 69 ++++++- apps/api/src/app/events/events.controller.ts | 1 - .../parse-event-request.usecase.ts | 24 +-- .../send-test-email.command.ts | 6 +- .../send-test-email.usecase.ts | 11 +- apps/api/src/app/shared/shared.module.ts | 4 +- .../mappers/notification-template-mapper.ts | 4 +- .../upsert-workflow.usecase.ts | 18 +- .../workflows-v2/workflow.controller.e2e.ts | 24 ++- .../components/v2/BridgeUpdateModal.tsx | 2 +- apps/worker/src/.env.test | 2 + apps/worker/src/app/shared/shared.module.ts | 2 + .../execute-bridge-job.usecase.ts | 91 ++++----- apps/worker/src/config/env.validators.ts | 1 + .../create-workflow.usecase.ts | 5 +- .../execute-bridge-request.command.ts | 21 +- .../execute-bridge-request.usecase.ts | 177 ++++++++++++++--- .../get-decrypted-secret-key.command.ts | 3 + .../get-decrypted-secret-key.usecase.ts | 23 +++ .../get-decrypted-secret-key/index.ts | 2 + .../application-generic/src/usecases/index.ts | 1 + libs/application-generic/src/utils/bridge.ts | 24 +++ .../dto/workflows/workflow-commons-fields.ts | 4 +- .../shared/src/types/http/headers.types.ts | 1 + 41 files changed, 800 insertions(+), 265 deletions(-) create mode 100644 apps/api/src/app/environments/novu-bridge-client.ts create mode 100644 apps/api/src/app/environments/novu-bridge.controller.ts create mode 100644 apps/api/src/app/environments/novu-bridge.module.ts create mode 100644 apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.command.ts create mode 100644 apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.usecase.ts create mode 100644 apps/api/src/app/environments/usecases/construct-framework-workflow/index.ts create mode 100644 libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.command.ts create mode 100644 libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.usecase.ts create mode 100644 libs/application-generic/src/usecases/get-decrypted-secret-key/index.ts diff --git a/.cspell.json b/.cspell.json index f3ae64ad43c..7b49da98f4c 100644 --- a/.cspell.json +++ b/.cspell.json @@ -309,6 +309,7 @@ "mailin", "Mailjet", "mailparser", + "Maily", "Maizzle", "mansagroup", "mantine", diff --git a/apps/api/src/app/bridge/bridge.controller.ts b/apps/api/src/app/bridge/bridge.controller.ts index 526d125eb5b..792b0cbd1e2 100644 --- a/apps/api/src/app/bridge/bridge.controller.ts +++ b/apps/api/src/app/bridge/bridge.controller.ts @@ -37,7 +37,7 @@ import { CreateBridgeResponseDto } from './dtos/create-bridge-response.dto'; export class BridgeController { constructor( private syncUsecase: Sync, - private validateBridgeUrlUsecase: GetBridgeStatus, + private getBridgeStatus: GetBridgeStatus, private environmentRepository: EnvironmentRepository, private notificationTemplateRepository: NotificationTemplateRepository, private controlValuesRepository: ControlValuesRepository, @@ -49,21 +49,12 @@ export class BridgeController { @Get('/status') @UseGuards(UserAuthGuard) async health(@UserSession() user: UserSessionData) { - const environment = await this.environmentRepository.findOne({ _id: user.environmentId }); - if (!environment?.echo?.url) { - throw new BadRequestException('Bridge URL not found'); - } - - const result = await this.validateBridgeUrlUsecase.execute( + const result = await this.getBridgeStatus.execute( GetBridgeStatusCommand.create({ - bridgeUrl: environment.echo.url, + environmentId: user.environmentId, }) ); - if (result.status !== 'ok') { - throw new Error('Bridge URL is not accessible'); - } - return result; } @@ -79,10 +70,8 @@ export class BridgeController { PreviewStepCommand.create({ workflowId, stepId, - inputs: data.controls || data.inputs, controls: data.controls || data.inputs, - data: data.payload, - bridgeUrl: data.bridgeUrl, + payload: data.payload, environmentId: user.environmentId, organizationId: user.organizationId, userId: user._id, @@ -214,17 +203,22 @@ export class BridgeController { @Post('/validate') @ExternalApiAccessible() - async validateBridgeUrl(@Body() body: ValidateBridgeUrlRequestDto): Promise { + @UseGuards(UserAuthGuard) + async validateBridgeUrl( + @UserSession() user: UserSessionData, + @Body() body: ValidateBridgeUrlRequestDto + ): Promise { try { - const result = await this.validateBridgeUrlUsecase.execute( + const result = await this.getBridgeStatus.execute( GetBridgeStatusCommand.create({ - bridgeUrl: body.bridgeUrl, + environmentId: user.environmentId, + statelessBridgeUrl: body.bridgeUrl, }) ); return { isValid: result.status === 'ok' }; } catch (err: any) { - return { isValid: false }; + return { isValid: false, error: err.message }; } } } diff --git a/apps/api/src/app/bridge/dtos/validate-bridge-url-response.dto.ts b/apps/api/src/app/bridge/dtos/validate-bridge-url-response.dto.ts index 065636c4a3a..6ce03c6267a 100644 --- a/apps/api/src/app/bridge/dtos/validate-bridge-url-response.dto.ts +++ b/apps/api/src/app/bridge/dtos/validate-bridge-url-response.dto.ts @@ -1,6 +1,9 @@ -import { ApiProperty } from '@nestjs/swagger'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; export class ValidateBridgeUrlResponseDto { @ApiProperty() isValid: boolean; + + @ApiPropertyOptional() + error?: string; } diff --git a/apps/api/src/app/bridge/shared/types.ts b/apps/api/src/app/bridge/shared/types.ts index 3d631696b93..c1dbf76abff 100644 --- a/apps/api/src/app/bridge/shared/types.ts +++ b/apps/api/src/app/bridge/shared/types.ts @@ -30,8 +30,3 @@ export interface IWorkflowDefineStep { code: string; } - -export enum BridgeErrorCodeEnum { - BRIDGE_UNEXPECTED_RESPONSE = 'BRIDGE_UNEXPECTED_RESPONSE', - BRIDGE_ENDPOINT_NOT_FOUND = 'BRIDGE_ENDPOINT_NOT_FOUND', -} diff --git a/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.command.ts b/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.command.ts index 9547a8efa9b..27efc48887b 100644 --- a/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.command.ts +++ b/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.command.ts @@ -1,10 +1,5 @@ -import { IsUrl } from 'class-validator'; -import { BaseCommand } from '@novu/application-generic'; +import { EnvironmentLevelCommand } from '@novu/application-generic'; -export class GetBridgeStatusCommand extends BaseCommand { - @IsUrl({ - require_protocol: true, - require_tld: false, - }) - bridgeUrl: string; +export class GetBridgeStatusCommand extends EnvironmentLevelCommand { + statelessBridgeUrl?: string; } diff --git a/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.usecase.ts b/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.usecase.ts index b75dee2bc7a..0d4a66555fe 100644 --- a/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.usecase.ts +++ b/apps/api/src/app/bridge/usecases/get-bridge-status/get-bridge-status.usecase.ts @@ -1,34 +1,35 @@ -import { BadRequestException, Logger, Injectable } from '@nestjs/common'; -import axios from 'axios'; -import { HealthCheck, GetActionEnum, HttpQueryKeysEnum } from '@novu/framework'; +import { Logger, Injectable } from '@nestjs/common'; +import { HealthCheck, GetActionEnum } from '@novu/framework'; +import { ExecuteBridgeRequest, ExecuteBridgeRequestCommand, ExecuteBridgeRequestDto } from '@novu/application-generic'; +import { WorkflowOriginEnum } from '@novu/shared'; import { GetBridgeStatusCommand } from './get-bridge-status.command'; -const axiosInstance = axios.create(); - export const LOG_CONTEXT = 'GetBridgeStatusUsecase'; @Injectable() export class GetBridgeStatus { + constructor(private executeBridgeRequest: ExecuteBridgeRequest) {} + async execute(command: GetBridgeStatusCommand): Promise { try { - const bridgeActionUrl = new URL(command.bridgeUrl); - bridgeActionUrl.searchParams.set(HttpQueryKeysEnum.ACTION, GetActionEnum.HEALTH_CHECK); - - const response = await axiosInstance.get(bridgeActionUrl.toString(), { - headers: { - 'Bypass-Tunnel-Reminder': 'true', - 'content-type': 'application/json', - }, - }); + const response = (await this.executeBridgeRequest.execute( + ExecuteBridgeRequestCommand.create({ + environmentId: command.environmentId, + action: GetActionEnum.HEALTH_CHECK, + workflowOrigin: WorkflowOriginEnum.EXTERNAL, + statelessBridgeUrl: command.statelessBridgeUrl, + retriesLimit: 1, + }) + )) as ExecuteBridgeRequestDto; - return response.data; + return response; } catch (err: any) { Logger.error( - `Failed to verify Bridge endpoint ${command.bridgeUrl} with error: ${(err as Error).message || err}`, + `Failed to verify Bridge endpoint for environment ${command.environmentId} with error: ${(err as Error).message || err}`, (err as Error).stack, LOG_CONTEXT ); - throw new BadRequestException(`Bridge is not accessible. ${err.message}`); + throw err; } } } diff --git a/apps/api/src/app/bridge/usecases/preview-step/preview-step.command.ts b/apps/api/src/app/bridge/usecases/preview-step/preview-step.command.ts index eded591c535..e2e89bd2621 100644 --- a/apps/api/src/app/bridge/usecases/preview-step/preview-step.command.ts +++ b/apps/api/src/app/bridge/usecases/preview-step/preview-step.command.ts @@ -3,8 +3,6 @@ import { EnvironmentWithUserCommand } from '@novu/application-generic'; export class PreviewStepCommand extends EnvironmentWithUserCommand { workflowId: string; stepId: string; - inputs: any; - controls: any; - data: any; - bridgeUrl?: string; + controls: Record; + payload: Record; } diff --git a/apps/api/src/app/bridge/usecases/preview-step/preview-step.usecase.ts b/apps/api/src/app/bridge/usecases/preview-step/preview-step.usecase.ts index f5358cc0acb..96e546826a8 100644 --- a/apps/api/src/app/bridge/usecases/preview-step/preview-step.usecase.ts +++ b/apps/api/src/app/bridge/usecases/preview-step/preview-step.usecase.ts @@ -1,114 +1,53 @@ -import { createHmac } from 'crypto'; -import axios from 'axios'; -import { BadRequestException, Injectable } from '@nestjs/common'; -import { PostActionEnum, HttpQueryKeysEnum } from '@novu/framework'; - -import { EnvironmentRepository } from '@novu/dal'; -import { decryptApiKey } from '@novu/application-generic'; +import { Injectable } from '@nestjs/common'; +import { PostActionEnum, HttpQueryKeysEnum, Event, JobStatusEnum, ExecuteOutput } from '@novu/framework'; +import { ExecuteBridgeRequest, ExecuteBridgeRequestCommand } from '@novu/application-generic'; +import { WorkflowOriginEnum } from '@novu/shared'; import { PreviewStepCommand } from './preview-step.command'; -import { BridgeErrorCodeEnum } from '../../shared'; @Injectable() export class PreviewStep { - constructor(private environmentRepository: EnvironmentRepository) {} - - async execute(command: PreviewStepCommand) { - const environment = await this.environmentRepository.findOne({ _id: command.environmentId }); - const bridgeUrl = command.bridgeUrl || environment?.echo.url; - if (!bridgeUrl) { - throw new BadRequestException('Bridge URL not found'); - } - - const axiosInstance = axios.create(); - try { - const payload = this.mapPayload(command); - const novuSignatureHeader = this.buildNovuSignature(environment, payload); - const bridgeActionUrl = new URL(bridgeUrl); - bridgeActionUrl.searchParams.set(HttpQueryKeysEnum.ACTION, PostActionEnum.PREVIEW); - bridgeActionUrl.searchParams.set(HttpQueryKeysEnum.WORKFLOW_ID, command.workflowId); - bridgeActionUrl.searchParams.set(HttpQueryKeysEnum.STEP_ID, command.stepId); - - const response = await axiosInstance.post(bridgeActionUrl.toString(), payload, { - headers: { - 'content-type': 'application/json', - 'x-novu-signature': novuSignatureHeader, - 'novu-signature': novuSignatureHeader, + constructor(private executeBridgeRequest: ExecuteBridgeRequest) {} + + async execute(command: PreviewStepCommand): Promise { + const event = this.mapEvent(command); + + const response = (await this.executeBridgeRequest.execute( + ExecuteBridgeRequestCommand.create({ + environmentId: command.environmentId, + action: PostActionEnum.PREVIEW, + event, + searchParams: { + [HttpQueryKeysEnum.WORKFLOW_ID]: command.workflowId, + [HttpQueryKeysEnum.STEP_ID]: command.stepId, }, - }); - - if (!response.data?.outputs || !response.data?.metadata) { - throw new BadRequestException({ - code: BridgeErrorCodeEnum.BRIDGE_UNEXPECTED_RESPONSE, - message: JSON.stringify(response.data), - }); - } - - return response.data; - } catch (e: any) { - if (e?.response?.status === 404) { - throw new BadRequestException({ - code: BridgeErrorCodeEnum.BRIDGE_ENDPOINT_NOT_FOUND, - message: `Bridge Endpoint Was not found or not accessible. Endpoint: ${bridgeUrl}`, - }); - } - - if (e?.response?.status === 405) { - throw new BadRequestException({ - code: BridgeErrorCodeEnum.BRIDGE_ENDPOINT_NOT_FOUND, - message: `Bridge Endpoint is not properly configured. : ${bridgeUrl}`, - }); - } - - if (e.code === BridgeErrorCodeEnum.BRIDGE_UNEXPECTED_RESPONSE) { - throw e; - } + // TODO: pass the origin from the command + workflowOrigin: WorkflowOriginEnum.EXTERNAL, + retriesLimit: 1, + }) + )) as ExecuteOutput; - // todo add status indication - check if e?.response?.status === 400 here - if (e?.response?.data) { - throw new BadRequestException(e.response.data); - } - - throw new BadRequestException({ - code: BridgeErrorCodeEnum.BRIDGE_UNEXPECTED_RESPONSE, - message: `Un-expected Bridge response: ${e.message}`, - }); - } + return response; } - private mapPayload(command: PreviewStepCommand) { + private mapEvent(command: PreviewStepCommand): Omit { const payload = { - inputs: command.controls || command.inputs || {}, - controls: command.controls || command.inputs || {}, - data: command.data || {}, + /** @deprecated - use controls instead */ + inputs: command.controls || {}, + controls: command.controls || {}, + /** @deprecated - use payload instead */ + data: command.payload || {}, + payload: command.payload || {}, state: [ { stepId: 'trigger', - outputs: command.data || {}, + outputs: command.payload || {}, + state: { status: JobStatusEnum.COMPLETED }, }, ], + subscriber: {}, }; return payload; } - - private buildNovuSignature( - environment, - payload: { data: any; inputs: any; controls: any; state: { outputs: any; stepId: string }[] } - ) { - const timestamp = Date.now(); - const xNovuSignature = `t=${timestamp},v1=${this.createHmacByApiKey( - environment.apiKeys[0].key, - timestamp, - payload - )}`; - - return xNovuSignature; - } - - private createHmacByApiKey(secret: string, timestamp: number, payload) { - const publicKey = `${timestamp}.${JSON.stringify(payload)}`; - - return createHmac('sha256', decryptApiKey(secret)).update(publicKey).digest('hex'); - } } diff --git a/apps/api/src/app/bridge/usecases/sync/sync.usecase.ts b/apps/api/src/app/bridge/usecases/sync/sync.usecase.ts index b975268868c..b5556063423 100644 --- a/apps/api/src/app/bridge/usecases/sync/sync.usecase.ts +++ b/apps/api/src/app/bridge/usecases/sync/sync.usecase.ts @@ -50,10 +50,11 @@ export class Sync { let discover: DiscoverOutput | undefined; try { discover = (await this.executeBridgeRequest.execute({ - bridgeUrl: command.bridgeUrl, - apiKey: environment.apiKeys[0].key, + statelessBridgeUrl: command.bridgeUrl, + environmentId: command.environmentId, action: GetActionEnum.DISCOVER, retriesLimit: 1, + workflowOrigin: WorkflowOriginEnum.EXTERNAL, })) as DiscoverOutput; } catch (error: any) { throw new BadRequestException(`Bridge URL is not valid. ${error.message}`); diff --git a/apps/api/src/app/environments/environments.module.ts b/apps/api/src/app/environments/environments.module.ts index 4a1e0493d08..b3f228bcc77 100644 --- a/apps/api/src/app/environments/environments.module.ts +++ b/apps/api/src/app/environments/environments.module.ts @@ -1,13 +1,21 @@ import { forwardRef, Module } from '@nestjs/common'; + import { SharedModule } from '../shared/shared.module'; import { USE_CASES } from './usecases'; import { EnvironmentsController } from './environments.controller'; import { NotificationGroupsModule } from '../notification-groups/notification-groups.module'; import { AuthModule } from '../auth/auth.module'; import { LayoutsModule } from '../layouts/layouts.module'; +import { NovuBridgeModule } from './novu-bridge.module'; @Module({ - imports: [SharedModule, NotificationGroupsModule, forwardRef(() => AuthModule), forwardRef(() => LayoutsModule)], + imports: [ + SharedModule, + NotificationGroupsModule, + forwardRef(() => AuthModule), + forwardRef(() => LayoutsModule), + NovuBridgeModule, + ], controllers: [EnvironmentsController], providers: [...USE_CASES], exports: [...USE_CASES], diff --git a/apps/api/src/app/environments/novu-bridge-client.ts b/apps/api/src/app/environments/novu-bridge-client.ts new file mode 100644 index 00000000000..d7a3614898f --- /dev/null +++ b/apps/api/src/app/environments/novu-bridge-client.ts @@ -0,0 +1,64 @@ +import { Injectable, Inject, Scope } from '@nestjs/common'; +import type { Request, Response } from 'express'; + +import { Client, PostActionEnum, NovuRequestHandler, Workflow } from '@novu/framework'; +// @ts-expect-error - TODO: bundle CJS with @novu/framework +import { NovuHandler } from '@novu/framework/nest'; +import { GetDecryptedSecretKey, GetDecryptedSecretKeyCommand } from '@novu/application-generic'; +import { ConstructFrameworkWorkflow, ConstructFrameworkWorkflowCommand } from './usecases/construct-framework-workflow'; + +/* + * A custom framework name is specified for the Novu-managed Bridge endpoint + * to provide a clear distinction between Novu-managed and self-managed Bridge endpoints. + */ +export const frameworkName = 'novu-nest'; + +/** + * This class overrides the default NestJS Novu Bridge Client to allow for dynamic construction of + * workflows to serve on the Novu Bridge. + */ +@Injectable({ scope: Scope.REQUEST }) +export class NovuBridgeClient { + public novuRequestHandler: NovuRequestHandler | null = null; + + constructor( + @Inject(NovuHandler) private novuHandler: NovuHandler, + private constructFrameworkWorkflow: ConstructFrameworkWorkflow, + private getDecryptedSecretKey: GetDecryptedSecretKey + ) {} + + public async handleRequest(req: Request, res: Response) { + const secretKey = await this.getDecryptedSecretKey.execute( + GetDecryptedSecretKeyCommand.create({ + environmentId: req.params.environmentId, + }) + ); + + const workflows: Workflow[] = []; + + /* + * Only construct a workflow when dealing with a POST request to the Novu-managed Bridge endpoint. + * Non-POST requests don't have a `workflowId` query parameter, so we can't construct a workflow. + * Those non-POST requests are handled for the purpose of returning a successful health-check. + */ + if (Object.values(PostActionEnum).includes(req.query.action as PostActionEnum)) { + const programmaticallyConstructedWorkflow = await this.constructFrameworkWorkflow.execute( + ConstructFrameworkWorkflowCommand.create({ + environmentId: req.params.environmentId, + workflowId: req.query.workflowId as string, + }) + ); + + workflows.push(programmaticallyConstructedWorkflow); + } + + this.novuRequestHandler = new NovuRequestHandler({ + frameworkName, + workflows, + client: new Client({ secretKey, strictAuthentication: true }), + handler: this.novuHandler.handler, + }); + + await this.novuRequestHandler.createHandler()(req, res); + } +} diff --git a/apps/api/src/app/environments/novu-bridge.controller.ts b/apps/api/src/app/environments/novu-bridge.controller.ts new file mode 100644 index 00000000000..855166e62cf --- /dev/null +++ b/apps/api/src/app/environments/novu-bridge.controller.ts @@ -0,0 +1,27 @@ +import { Controller, Req, Res, Inject, Get, Post, Options } from '@nestjs/common'; +import { Request, Response } from 'express'; +import { ApiExcludeController } from '@nestjs/swagger'; +// @ts-expect-error - TODO: bundle CJS with @novu/framework +import { NovuClient } from '@novu/framework/nest'; +import { NovuBridgeClient } from './novu-bridge-client'; + +@Controller('/environments/:environmentId/bridge') +@ApiExcludeController() +export class NovuBridgeController { + constructor(@Inject(NovuClient) private novuService: NovuBridgeClient) {} + + @Get() + async handleGet(@Req() req: Request, @Res() res: Response) { + await this.novuService.handleRequest(req, res); + } + + @Post() + async handlePost(@Req() req: Request, @Res() res: Response) { + await this.novuService.handleRequest(req, res); + } + + @Options() + async handleOptions(@Req() req: Request, @Res() res: Response) { + await this.novuService.handleRequest(req, res); + } +} diff --git a/apps/api/src/app/environments/novu-bridge.module.ts b/apps/api/src/app/environments/novu-bridge.module.ts new file mode 100644 index 00000000000..c361bb0b53a --- /dev/null +++ b/apps/api/src/app/environments/novu-bridge.module.ts @@ -0,0 +1,25 @@ +import { Module } from '@nestjs/common'; +// @ts-expect-error - TODO: bundle CJS with @novu/framework +import { NovuClient, NovuHandler } from '@novu/framework/nest'; + +import { EnvironmentRepository, NotificationTemplateRepository } from '@novu/dal'; +import { GetDecryptedSecretKey } from '@novu/application-generic'; +import { NovuBridgeClient } from './novu-bridge-client'; +import { ConstructFrameworkWorkflow } from './usecases/construct-framework-workflow'; +import { NovuBridgeController } from './novu-bridge.controller'; + +@Module({ + controllers: [NovuBridgeController], + providers: [ + { + provide: NovuClient, + useClass: NovuBridgeClient, + }, + NovuHandler, + EnvironmentRepository, + NotificationTemplateRepository, + ConstructFrameworkWorkflow, + GetDecryptedSecretKey, + ], +}) +export class NovuBridgeModule {} diff --git a/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.command.ts b/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.command.ts new file mode 100644 index 00000000000..ed72ff190fe --- /dev/null +++ b/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.command.ts @@ -0,0 +1,8 @@ +import { EnvironmentLevelCommand } from '@novu/application-generic'; +import { IsDefined, IsObject, IsString } from 'class-validator'; + +export class ConstructFrameworkWorkflowCommand extends EnvironmentLevelCommand { + @IsString() + @IsDefined() + workflowId: string; +} diff --git a/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.usecase.ts b/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.usecase.ts new file mode 100644 index 00000000000..ca9619d64ae --- /dev/null +++ b/apps/api/src/app/environments/usecases/construct-framework-workflow/construct-framework-workflow.usecase.ts @@ -0,0 +1,181 @@ +import { Injectable, InternalServerErrorException } from '@nestjs/common'; +import { + ActionStep, + ChannelStep, + ChatOutput, + DelayOutput, + DigestOutput, + EmailOutput, + InAppOutput, + PushOutput, + SmsOutput, + Step, + StepOptions, + StepOutput, + Workflow, + workflow, +} from '@novu/framework'; +import { NotificationTemplateRepository, NotificationTemplateEntity, NotificationStepEntity } from '@novu/dal'; +import { StepTypeEnum } from '@novu/shared'; +import { ConstructFrameworkWorkflowCommand } from './construct-framework-workflow.command'; + +@Injectable() +export class ConstructFrameworkWorkflow { + constructor(private workflowsRepository: NotificationTemplateRepository) {} + + async execute(command: ConstructFrameworkWorkflowCommand): Promise { + const dbWorkflow = await this.getDbWorkflow(command.environmentId, command.workflowId); + + return this.constructFrameworkWorkflow(dbWorkflow); + } + + private async getDbWorkflow(environmentId: string, workflowId: string): Promise { + const foundWorkflow = await this.workflowsRepository.findByTriggerIdentifier(environmentId, workflowId); + + if (!foundWorkflow) { + throw new InternalServerErrorException(`Workflow ${workflowId} not found`); + } + + return foundWorkflow; + } + + private constructFrameworkWorkflow(newWorkflow: NotificationTemplateEntity): Workflow { + return workflow( + newWorkflow.triggers[0].identifier, + async ({ step }) => { + for await (const staticStep of newWorkflow.steps) { + await this.constructStep(step, staticStep); + } + }, + { + /* + * TODO: Workflow options are not needed currently, given that this endpoint + * focuses on execution only. However we should reconsider if we decide to + * expose Workflow options to the `workflow` function. + * + * preferences: foundWorkflow.preferences, + * tags: foundWorkflow.tags, + */ + } + ); + } + + private constructStep(step: Step, staticStep: NotificationStepEntity): StepOutput> { + const stepTemplate = staticStep.template; + + if (!stepTemplate) { + throw new InternalServerErrorException(`Step template not found for step ${staticStep.stepId}`); + } + + const stepType = stepTemplate.type; + const { stepId } = staticStep; + + if (!stepId) { + throw new InternalServerErrorException(`Step id not found for step ${staticStep.stepId}`); + } + + const stepControls = stepTemplate.controls; + + if (!stepControls) { + throw new InternalServerErrorException(`Step controls not found for step ${staticStep.stepId}`); + } + + switch (stepType) { + case StepTypeEnum.IN_APP: + return step.inApp( + // The step id is used internally by the framework to identify the step + stepId, + // The step callback function. Takes controls and returns the step outputs + async (controlValues) => { + // TODO: insert custom in-app hydration logic here. + return controlValues as InAppOutput; + }, + // Step options + this.constructChannelStepOptions(staticStep) + ); + case StepTypeEnum.EMAIL: + return step.email( + stepId, + async (controlValues) => { + // TODO: insert custom Maily.to hydration logic here. + return controlValues as EmailOutput; + }, + this.constructChannelStepOptions(staticStep) + ); + case StepTypeEnum.SMS: + return step.inApp( + stepId, + async (controlValues) => { + // TODO: insert custom SMS hydration logic here. + return controlValues as SmsOutput; + }, + this.constructChannelStepOptions(staticStep) + ); + case StepTypeEnum.CHAT: + return step.inApp( + stepId, + async (controlValues) => { + // TODO: insert custom chat hydration logic here. + return controlValues as ChatOutput; + }, + this.constructChannelStepOptions(staticStep) + ); + case StepTypeEnum.PUSH: + return step.inApp( + stepId, + async (controlValues) => { + // TODO: insert custom push hydration logic here. + return controlValues as PushOutput; + }, + this.constructChannelStepOptions(staticStep) + ); + case StepTypeEnum.DIGEST: + return step.digest( + stepId, + async (controlValues) => { + return controlValues as DigestOutput; + }, + this.constructActionStepOptions(staticStep) + ); + case StepTypeEnum.DELAY: + return step.delay( + stepId, + async (controlValues) => { + return controlValues as DelayOutput; + }, + this.constructActionStepOptions(staticStep) + ); + default: + throw new InternalServerErrorException(`Step type ${stepType} is not supported`); + } + } + + private constructChannelStepOptions(staticStep: NotificationStepEntity): Required[2]> { + return { + ...this.constructCommonStepOptions(staticStep), + // TODO: resolve this from the Step options + disableOutputSanitization: false, + // TODO: add providers + providers: {}, + }; + } + + private constructActionStepOptions(staticStep: NotificationStepEntity): Required[2]> { + return { + ...this.constructCommonStepOptions(staticStep), + }; + } + + private constructCommonStepOptions(staticStep: NotificationStepEntity): Required { + return { + /** @deprecated */ + inputSchema: staticStep.template!.controls!.schema, + controlSchema: staticStep.template!.controls!.schema, + /* + * TODO: add conditions + * Used to construct conditions defined with https://react-querybuilder.js.org/ or similar + */ + skip: (controlValues) => false, + }; + } +} diff --git a/apps/api/src/app/environments/usecases/construct-framework-workflow/index.ts b/apps/api/src/app/environments/usecases/construct-framework-workflow/index.ts new file mode 100644 index 00000000000..22b3e6ebbe3 --- /dev/null +++ b/apps/api/src/app/environments/usecases/construct-framework-workflow/index.ts @@ -0,0 +1,2 @@ +export * from './construct-framework-workflow.usecase'; +export * from './construct-framework-workflow.command'; diff --git a/apps/api/src/app/environments/usecases/index.ts b/apps/api/src/app/environments/usecases/index.ts index 267b0540a7a..98658a41aa8 100644 --- a/apps/api/src/app/environments/usecases/index.ts +++ b/apps/api/src/app/environments/usecases/index.ts @@ -8,7 +8,6 @@ import { UpdateEnvironment } from './update-environment/update-environment.useca import { GetMxRecord } from '../../inbound-parse/usecases/get-mx-record/get-mx-record.usecase'; export const USE_CASES = [ - // GetMxRecord, CreateEnvironment, UpdateEnvironment, diff --git a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts index 844d80d6d80..a88266a8d1e 100644 --- a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts +++ b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts @@ -12,12 +12,15 @@ import { } from '@novu/dal'; import { ChannelTypeEnum, + CreateWorkflowDto, ExecutionDetailsStatusEnum, JobStatusEnum, MessagesStatusEnum, StepTypeEnum, + WorkflowCreationSourceEnum, + WorkflowResponseDto, } from '@novu/shared'; -import { workflow } from '@novu/framework'; +import { workflow, channelStepSchemas } from '@novu/framework'; import { DetailEnum } from '@novu/application-generic'; import { BridgeServer } from '../../../../e2e/bridge.server'; @@ -31,7 +34,7 @@ const contexts: Context[] = [ ]; contexts.forEach((context: Context) => { - describe('Bridge Trigger', async () => { + describe('Self-Hosted Bridge Trigger', async () => { let session: UserSession; let bridgeServer: BridgeServer; const messageRepository = new MessageRepository(); @@ -1467,6 +1470,68 @@ contexts.forEach((context: Context) => { }); }); +describe('Novu-Hosted Bridge Trigger', () => { + let session: UserSession; + const messageRepository = new MessageRepository(); + let subscriber: SubscriberEntity; + let subscriberService: SubscribersService; + + beforeEach(async () => { + session = new UserSession(); + await session.initialize(); + subscriberService = new SubscribersService(session.organization._id, session.environment._id); + subscriber = await subscriberService.createSubscriber({ _id: session.subscriberId }); + }); + + it('should execute a Novu-managed workflow', async () => { + const createWorkflowDto: CreateWorkflowDto = { + name: 'Test Workflow', + description: 'Test Workflow', + __source: WorkflowCreationSourceEnum.DASHBOARD, + workflowId: 'test-workflow', + steps: [ + { + type: StepTypeEnum.IN_APP, + name: 'Test Step 1', + controls: { + schema: channelStepSchemas.in_app.output, + }, + controlValues: { + body: 'Test Body', + }, + }, + { + type: StepTypeEnum.IN_APP, + name: 'Test Step 2', + controls: { + schema: channelStepSchemas.in_app.output, + }, + controlValues: { + body: 'Test Body', + }, + }, + ], + }; + + const response = await session.testAgent.post(`/v2/workflows`).send(createWorkflowDto); + expect(response.status).to.be.eq(201); + + const responseData = response.body.data as WorkflowResponseDto; + + await triggerEvent(session, responseData.workflowId, subscriber._id, {}); + await session.awaitRunningJobs(); + + const sentMessages = await messageRepository.find({ + _environmentId: session.environment._id, + _subscriberId: session.subscriberProfile?._id, + templateIdentifier: responseData.workflowId, + channel: StepTypeEnum.IN_APP, + }); + + expect(sentMessages.length).to.be.eq(2); + }); +}); + async function syncWorkflow( session: UserSession, workflowsRepository: NotificationTemplateRepository, diff --git a/apps/api/src/app/events/events.controller.ts b/apps/api/src/app/events/events.controller.ts index 3746816c882..fcfe53eb6b7 100644 --- a/apps/api/src/app/events/events.controller.ts +++ b/apps/api/src/app/events/events.controller.ts @@ -172,7 +172,6 @@ export class EventsController { workflowId: body.workflowId, stepId: body.stepId, bridge: body.bridge, - inputs: body.controls || body.inputs, controls: body.controls || body.inputs, }) ); diff --git a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts index 2f82c29ed87..a7fb786815a 100644 --- a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts +++ b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts @@ -28,6 +28,7 @@ import { ReservedVariablesMap, TriggerContextTypeEnum, TriggerEventStatusEnum, + WorkflowOriginEnum, } from '@novu/shared'; import { WorkflowOverrideRepository, @@ -79,10 +80,13 @@ export class ParseEventRequest { public async execute(command: ParseEventRequestCommand) { const transactionId = command.transactionId || uuidv4(); - const { environment } = await this.isStatelessWorkflowAllowed(command.environmentId, command.bridgeUrl); + const { environment, statelessWorkflowAllowed } = await this.isStatelessWorkflowAllowed( + command.environmentId, + command.bridgeUrl + ); - if (environment) { - const discoveredWorkflow = await this.queryDiscoverWorkflow(command, environment); + if (environment && statelessWorkflowAllowed) { + const discoveredWorkflow = await this.queryDiscoverWorkflow(command); if (!discoveredWorkflow) { throw new UnprocessableEntityException('workflow_not_found'); @@ -184,19 +188,17 @@ export class ParseEventRequest { return await this.dispatchEvent(command, transactionId); } - private async queryDiscoverWorkflow( - command: ParseEventRequestCommand, - environment: EnvironmentEntity - ): Promise { + private async queryDiscoverWorkflow(command: ParseEventRequestCommand): Promise { if (!command.bridgeUrl) { return null; } const discover = (await this.executeBridgeRequest.execute( ExecuteBridgeRequestCommand.create({ - bridgeUrl: command.bridgeUrl, - apiKey: environment.apiKeys[0].key, + statelessBridgeUrl: command.bridgeUrl, + environmentId: command.environmentId, action: GetActionEnum.DISCOVER, + workflowOrigin: WorkflowOriginEnum.EXTERNAL, }) )) as ExecuteBridgeRequestDto; @@ -238,9 +240,7 @@ export class ParseEventRequest { throw new UnprocessableEntityException('Environment not found'); } - const statelessWorkflowAllowed = environment.name !== 'Production'; - - return { environment, statelessWorkflowAllowed }; + return { environment, statelessWorkflowAllowed: true }; } @Instrument() diff --git a/apps/api/src/app/events/usecases/send-test-email/send-test-email.command.ts b/apps/api/src/app/events/usecases/send-test-email/send-test-email.command.ts index c5f40c240ae..d77328544ce 100644 --- a/apps/api/src/app/events/usecases/send-test-email/send-test-email.command.ts +++ b/apps/api/src/app/events/usecases/send-test-email/send-test-email.command.ts @@ -7,7 +7,7 @@ export class SendTestEmailCommand extends EnvironmentWithUserCommand { contentType: 'customHtml' | 'editor'; @IsDefined() - payload: any; // eslint-disable-line @typescript-eslint/no-explicit-any + payload: Record; @IsDefined() @IsString() @@ -40,9 +40,7 @@ export class SendTestEmailCommand extends EnvironmentWithUserCommand { stepId?: string | null; @IsOptional() - inputs: any; - @IsOptional() - controls: any; + controls: Record; @IsOptional() @IsString() diff --git a/apps/api/src/app/events/usecases/send-test-email/send-test-email.usecase.ts b/apps/api/src/app/events/usecases/send-test-email/send-test-email.usecase.ts index 616f648e9cb..210a42bfdf9 100644 --- a/apps/api/src/app/events/usecases/send-test-email/send-test-email.usecase.ts +++ b/apps/api/src/app/events/usecases/send-test-email/send-test-email.usecase.ts @@ -97,9 +97,8 @@ export class SendTestEmail { PreviewStepCommand.create({ workflowId: command.workflowId, stepId: command.stepId, - inputs: command.controls || command.inputs, - controls: command.controls || command.inputs, - data: command.payload, + controls: command.controls, + payload: command.payload, environmentId: command.environmentId, organizationId: command.organizationId, userId: command.userId, @@ -110,8 +109,8 @@ export class SendTestEmail { throw new ApiException('Could not retrieve content from edge'); } - html = data.outputs.body; - subject = data.outputs.subject; + html = data.outputs.body as string; + subject = data.outputs.subject as string; if (data.providers && typeof data.providers === 'object') { bridgeProviderData = data.providers[integration.providerId] || {}; @@ -123,7 +122,7 @@ export class SendTestEmail { to: Array.isArray(email) ? email : [email], subject, html: html as string, - from: command.payload.$sender_email || integration?.credentials.from || 'no-reply@novu.co', + from: (command.payload.$sender_email as string) || integration?.credentials.from || 'no-reply@novu.co', }; await this.sendMessage(integration, mailData, mailFactory, command, bridgeProviderData); diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index 0cf22a587c2..320b9f32b5e 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -39,8 +39,9 @@ import { ExecuteBridgeRequest, ExecutionLogRoute, featureFlagsService, - injectCommunityAuthProviders, + GetDecryptedSecretKey, getFeatureFlag, + injectCommunityAuthProviders, InvalidateCacheService, LoggerModule, QueuesModule, @@ -114,6 +115,7 @@ const PROVIDERS = [ CreateExecutionDetails, ExecuteBridgeRequest, getFeatureFlag, + GetDecryptedSecretKey, ]; const IMPORTS = [ diff --git a/apps/api/src/app/workflows-v2/mappers/notification-template-mapper.ts b/apps/api/src/app/workflows-v2/mappers/notification-template-mapper.ts index 83db74077a7..540451b8342 100644 --- a/apps/api/src/app/workflows-v2/mappers/notification-template-mapper.ts +++ b/apps/api/src/app/workflows-v2/mappers/notification-template-mapper.ts @@ -83,11 +83,11 @@ function toStepResponseDto(step: NotificationStepEntity): StepResponseDto { }; } -function convertControls(step: NotificationStepEntity): ControlsSchema | undefined { +function convertControls(step: NotificationStepEntity): ControlsSchema { if (step.template?.controls) { return { schema: step.template.controls.schema }; } else { - return undefined; + throw new Error('Missing controls'); } } diff --git a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts index 96e821b7197..9122f90e248 100644 --- a/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts +++ b/apps/api/src/app/workflows-v2/usecases/upsert-workflow/upsert-workflow.usecase.ts @@ -223,6 +223,21 @@ export class UpsertWorkflowUseCase { return this.mapSingleStep(persistedWorkflow, step); }); + const seenStepIds = new Set(); + const duplicateStepIds = new Set(); + + steps.forEach((step) => { + if (seenStepIds.has(step.stepId)) { + duplicateStepIds.add(step.stepId); + } else { + seenStepIds.add(step.stepId); + } + }); + + if (duplicateStepIds.size > 0) { + throw new BadRequestException(`Duplicate stepIds are not allowed: ${Array.from(duplicateStepIds).join(', ')}`); + } + return steps; } @@ -244,7 +259,7 @@ export class UpsertWorkflowUseCase { return stepEntityToReturn; } - private buildBaseStepEntity(step: StepDto | (StepDto & { stepUuid: string })) { + private buildBaseStepEntity(step: StepDto | (StepDto & { stepUuid: string })): NotificationStep { return { template: { type: step.type, @@ -252,6 +267,7 @@ export class UpsertWorkflowUseCase { controls: step.controls, content: '', }, + stepId: slugifyName(step.name), name: step.name, }; } diff --git a/apps/api/src/app/workflows-v2/workflow.controller.e2e.ts b/apps/api/src/app/workflows-v2/workflow.controller.e2e.ts index 0558a4f0f98..097487ce5e5 100644 --- a/apps/api/src/app/workflows-v2/workflow.controller.e2e.ts +++ b/apps/api/src/app/workflows-v2/workflow.controller.e2e.ts @@ -15,7 +15,7 @@ import { WorkflowResponseDto, } from '@novu/shared'; import { randomBytes } from 'crypto'; -import { JsonSchema } from '@novu/framework'; +import { channelStepSchemas, JsonSchema } from '@novu/framework'; import { slugifyName } from '@novu/application-generic'; const v2Prefix = '/v2'; @@ -69,6 +69,16 @@ describe('Workflow Controller E2E API Testing', () => { const workflowCreated: WorkflowResponseDto = res.body.data; expect(workflowCreated.workflowId).to.include(`${slugifyName(nameSuffix)}-`); }); + + it('should throw error when creating workflow with duplicate step ids', async () => { + const nameSuffix = `Test Workflow${new Date().toString()}`; + const createWorkflowDto: CreateWorkflowDto = buildCreateWorkflowDto(nameSuffix, { + steps: [buildEmailStep(), buildEmailStep(), buildInAppStep(), buildInAppStep()], + }); + const res = await session.testAgent.post(`${v2Prefix}/workflows`).send(createWorkflowDto); + expect(res.status).to.be.equal(400); + expect(res.body.message).to.be.equal('Duplicate stepIds are not allowed: email-test-step, in-app-test-step'); + }); }); describe('Update Workflow Permutations', () => { @@ -228,6 +238,9 @@ async function createWorkflowAndValidate(nameSuffix: string = ''): Promise = {}): CreateWorkflowDto { return { __source: WorkflowCreationSourceEnum.EDITOR, name: TEST_WORKFLOW_NAME + nameSuffix, @@ -250,6 +266,7 @@ function buildCreateWorkflowDto(nameSuffix: string): CreateWorkflowDto { active: true, tags: TEST_TAGS, steps: [buildEmailStep(), buildInAppStep()], + ...overrides, }; } @@ -271,6 +288,9 @@ function buildStepWithoutUUid(stepInResponse: StepDto & { stepUuid: string }) { if (!stepInResponse.controls) { return { controlValues: stepInResponse.controlValues, + controls: { + schema: channelStepSchemas[stepInResponse.type].output, + }, name: stepInResponse.name, type: stepInResponse.type, }; diff --git a/apps/web/src/components/layout/components/v2/BridgeUpdateModal.tsx b/apps/web/src/components/layout/components/v2/BridgeUpdateModal.tsx index b5413bab76e..f432e1f2cfa 100644 --- a/apps/web/src/components/layout/components/v2/BridgeUpdateModal.tsx +++ b/apps/web/src/components/layout/components/v2/BridgeUpdateModal.tsx @@ -78,7 +78,7 @@ export const BridgeUpdateModal: FC = ({ isOpen, toggleOp result = await validateBridgeUrl({ bridgeUrl: url }); } if (!result.isValid) { - throw new Error('The provided URL is not the Novu Endpoint URL'); + throw new Error(result.error); } } diff --git a/apps/worker/src/.env.test b/apps/worker/src/.env.test index 1e54513db9b..636800112ff 100644 --- a/apps/worker/src/.env.test +++ b/apps/worker/src/.env.test @@ -84,3 +84,5 @@ BROADCAST_QUEUE_CHUNK_SIZE=100 MULTICAST_QUEUE_CHUNK_SIZE=100 IS_WORKFLOW_PREFERENCES_ENABLED=true + +API_ROOT_URL=http://localhost:1337 diff --git a/apps/worker/src/app/shared/shared.module.ts b/apps/worker/src/app/shared/shared.module.ts index 5ccc0dac16b..b596f212214 100644 --- a/apps/worker/src/app/shared/shared.module.ts +++ b/apps/worker/src/app/shared/shared.module.ts @@ -15,6 +15,7 @@ import { EventsDistributedLockService, ExecuteBridgeRequest, featureFlagsService, + GetDecryptedSecretKey, GetTenant, InvalidateCacheService, LoggerModule, @@ -116,6 +117,7 @@ const PROVIDERS = [ ...DAL_MODELS, ActiveJobsMetricService, ExecuteBridgeRequest, + GetDecryptedSecretKey, ]; @Module({ diff --git a/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts b/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts index 45c99bc7264..4bc3e65a90f 100644 --- a/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts @@ -14,6 +14,7 @@ import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, JobStatusEnum, + WorkflowOriginEnum, WorkflowTypeEnum, } from '@novu/shared'; import { Event, State, PostActionEnum, ExecuteOutput } from '@novu/framework'; @@ -26,7 +27,6 @@ import { ExecuteBridgeRequestCommand, } from '@novu/application-generic'; import { ExecuteBridgeJobCommand } from './execute-bridge-job.command'; -import { PlatformException } from '../../../shared/utils'; const LOG_CONTEXT = 'ExecuteBridgeJob'; @@ -57,7 +57,7 @@ export class ExecuteBridgeJob { $in: [WorkflowTypeEnum.ECHO, WorkflowTypeEnum.BRIDGE], }, }, - '_id triggers type' + '_id triggers type origin' ); } @@ -81,7 +81,7 @@ export class ExecuteBridgeJob { throw new Error(`Environment id ${command.environmentId} is not found`); } - if (!environment?.echo?.url && isStateful) { + if (!environment?.echo?.url && isStateful && workflow?.origin === WorkflowOriginEnum.EXTERNAL) { throw new Error(`Bridge URL is not set for environment id: ${environment._id}`); } @@ -109,9 +109,14 @@ export class ExecuteBridgeJob { : command.identifier; const bridgeResponse = await this.sendBridgeRequest({ - bridgeUrl: command.job.step.bridgeUrl ?? environment.echo.url, + environmentId: command.environmentId, + /* + * TODO: We fallback to external due to lack of backfilling origin for existing Workflows. + * Once we backfill the origin field for existing Workflows, we should remove the fallback. + */ + workflowOrigin: workflow?.origin || WorkflowOriginEnum.EXTERNAL, + statelessBridgeUrl: command.job.step.bridgeUrl, event: bridgeEvent, - apiKey: environment.apiKeys[0].key, job: command.job, searchParams: { workflowId, @@ -153,7 +158,7 @@ export class ExecuteBridgeJob { return payload; } - private async generateState(payload, command: ExecuteBridgeJobCommand) { + private async generateState(payload, command: ExecuteBridgeJobCommand): Promise { const previousJobs: State[] = []; let theJob = (await this.jobRepository.findOne({ _id: command.job._parentId, @@ -184,7 +189,7 @@ export class ExecuteBridgeJob { /* * Backward compatibility, If the first job is not a trigger, we need to add a trigger job to the state */ - private normalizeFirstJob(firstJob: JobEntity, previousJobs: State[], payload) { + private normalizeFirstJob(firstJob: JobEntity, previousJobs: State[], payload?: Record) { if (firstJob.type !== 'trigger') { previousJobs.push({ stepId: 'trigger', @@ -195,57 +200,57 @@ export class ExecuteBridgeJob { } private async sendBridgeRequest({ - bridgeUrl, + statelessBridgeUrl, event, - apiKey, job, searchParams, + workflowOrigin, + environmentId, }: Omit & { job: JobEntity; }): Promise { try { - const afterResponse = async (response) => { - const body = response?.body as string | undefined; - - if (response.statusCode >= 400) { - const createExecutionDetailsCommand: CreateExecutionDetailsCommand = { - ...CreateExecutionDetailsCommand.getDetailsFromJob(job), - detail: DetailEnum.FAILED_BRIDGE_RETRY, - source: ExecutionDetailsSourceEnum.INTERNAL, - status: ExecutionDetailsStatusEnum.WARNING, - isTest: false, - isRetry: false, - raw: JSON.stringify({ - url: bridgeUrl, - statusCode: response.statusCode, - retryCount: response.retryCount, - message: response.statusMessage, - ...(body && body?.length > 0 ? { raw: JSON.parse(body) } : {}), - }), - }; - - await this.createExecutionDetails.execute(createExecutionDetailsCommand); - } - - return response; - }; - return this.executeBridgeRequest.execute({ - bridgeUrl, + statelessBridgeUrl, event, - apiKey, action: PostActionEnum.EXECUTE, searchParams, - afterResponse: afterResponse.bind(this), + afterResponse: async (response) => { + const body = response?.body as string | undefined; + + if (response.statusCode >= 400) { + const createExecutionDetailsCommand: CreateExecutionDetailsCommand = { + ...CreateExecutionDetailsCommand.getDetailsFromJob(job), + detail: DetailEnum.FAILED_BRIDGE_RETRY, + source: ExecutionDetailsSourceEnum.INTERNAL, + status: ExecutionDetailsStatusEnum.WARNING, + isTest: false, + isRetry: false, + raw: JSON.stringify({ + url: statelessBridgeUrl, + statusCode: response.statusCode, + retryCount: response.retryCount, + message: response.statusMessage, + ...(body && body?.length > 0 ? { raw: JSON.parse(body) } : {}), + }), + }; + + await this.createExecutionDetails.execute(createExecutionDetailsCommand); + } + + return response; + }, + workflowOrigin, + environmentId, }) as Promise; } catch (error: any) { Logger.error(error, 'Error sending Bridge request:', LOG_CONTEXT); - let raw: { retryCount?: any; statusCode?: any; message: any; url: string }; + let raw: { retryCount?: number; statusCode?: number; message: string; url?: string }; if (error.response) { raw = { - url: bridgeUrl, + url: statelessBridgeUrl, statusCode: error.response?.statusCode, message: error.response?.statusMessage, ...(error.response?.retryCount ? { retryCount: error.response?.retryCount } : {}), @@ -253,12 +258,12 @@ export class ExecuteBridgeJob { }; } else if (error.message) { raw = { - url: bridgeUrl, + url: statelessBridgeUrl, message: error.message, }; } else { raw = { - url: bridgeUrl, + url: statelessBridgeUrl, message: 'An Unexpected Error Occurred', }; } @@ -279,7 +284,7 @@ export class ExecuteBridgeJob { } } - private async mapState(job: JobEntity, payload: any) { + private async mapState(job: JobEntity, payload: Record) { let output = {}; let state: State['state'] | null = null; let stepId: string | null = null; diff --git a/apps/worker/src/config/env.validators.ts b/apps/worker/src/config/env.validators.ts index 4ee8546a0f8..8c99fc6177b 100644 --- a/apps/worker/src/config/env.validators.ts +++ b/apps/worker/src/config/env.validators.ts @@ -60,6 +60,7 @@ export const envValidators = { NOTIFICATION_RETENTION_DAYS: num({ default: DEFAULT_NOTIFICATION_RETENTION_DAYS }), MESSAGE_GENERIC_RETENTION_DAYS: num({ default: DEFAULT_MESSAGE_GENERIC_RETENTION_DAYS }), MESSAGE_IN_APP_RETENTION_DAYS: num({ default: DEFAULT_MESSAGE_IN_APP_RETENTION_DAYS }), + API_ROOT_URL: str(), // Feature Flags ...Object.keys(FeatureFlagsKeysEnum).reduce( diff --git a/libs/application-generic/src/usecases/create-workflow/create-workflow.usecase.ts b/libs/application-generic/src/usecases/create-workflow/create-workflow.usecase.ts index 4a974112594..8e51a8e91e2 100644 --- a/libs/application-generic/src/usecases/create-workflow/create-workflow.usecase.ts +++ b/libs/application-generic/src/usecases/create-workflow/create-workflow.usecase.ts @@ -126,7 +126,10 @@ export class CreateWorkflow { } let triggerIdentifier: string; - if (command.type === WorkflowTypeEnum.BRIDGE) + if ( + command.type === WorkflowTypeEnum.BRIDGE && + command.origin === WorkflowOriginEnum.EXTERNAL + ) /* * Bridge workflows need to have the identifier preserved to ensure that * the Framework-defined identifier is the source of truth. diff --git a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.command.ts b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.command.ts index 12ad2599477..58618a39252 100644 --- a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.command.ts +++ b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.command.ts @@ -1,4 +1,4 @@ -import { IsDefined, IsOptional, IsString } from 'class-validator'; +import { IsDefined, IsOptional } from 'class-validator'; import { AfterResponseHook } from 'got'; import { CodeResult, @@ -10,20 +10,15 @@ import { PostActionEnum, HttpQueryKeysEnum, } from '@novu/framework'; -import { BaseCommand } from '../../commands'; - -export class ExecuteBridgeRequestCommand extends BaseCommand { - @IsString() - bridgeUrl: string; +import { WorkflowOriginEnum } from '@novu/shared'; +import { EnvironmentLevelCommand } from '../../commands'; +export class ExecuteBridgeRequestCommand extends EnvironmentLevelCommand { @IsOptional() event?: Omit; - @IsString() - apiKey: string; - @IsOptional() - searchParams?: Record; + searchParams?: Partial>; @IsOptional() afterResponse?: AfterResponseHook; @@ -33,6 +28,12 @@ export class ExecuteBridgeRequestCommand extends BaseCommand { @IsOptional() retriesLimit?: number; + + @IsDefined() + workflowOrigin: WorkflowOriginEnum; + + @IsOptional() + statelessBridgeUrl?: string; } // will generate the output type based on the action diff --git a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts index 6675b1410bc..c2c6217a00c 100644 --- a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts +++ b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts @@ -1,24 +1,47 @@ -import { Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { + Injectable, + Logger, + NotFoundException, + BadRequestException, + HttpException, +} from '@nestjs/common'; import got, { OptionsOfTextResponseBody, RequestError } from 'got'; -import { createHmac } from 'crypto'; +import { createHmac } from 'node:crypto'; import { PostActionEnum, HttpHeaderKeysEnum, HttpQueryKeysEnum, GetActionEnum, + ErrorCodeEnum, } from '@novu/framework'; +import { EnvironmentRepository } from '@novu/dal'; +import { HttpRequestHeaderKeysEnum, WorkflowOriginEnum } from '@novu/shared'; import { ExecuteBridgeRequestCommand, ExecuteBridgeRequestDto, } from './execute-bridge-request.command'; -import { decryptApiKey } from '../../encryption'; +import { + GetDecryptedSecretKey, + GetDecryptedSecretKeyCommand, +} from '../get-decrypted-secret-key'; +import { BRIDGE_EXECUTION_ERROR } from '../../utils'; export const DEFAULT_TIMEOUT = 15_000; // 15 seconds export const DEFAULT_RETRIES_LIMIT = 3; export const RETRYABLE_HTTP_CODES: number[] = [ 408, 413, 429, 500, 502, 503, 504, 521, 522, 524, ]; +const RETRYABLE_ERROR_CODES: string[] = [ + 'ETIMEDOUT', + 'ECONNRESET', + 'EADDRINUSE', + 'ECONNREFUSED', + 'EPIPE', + 'ENOTFOUND', + 'ENETUNREACH', + 'EAI_AGAIN', +]; const LOG_CONTEXT = 'ExecuteBridgeRequest'; @@ -35,11 +58,43 @@ type TunnelResponseError = { @Injectable() export class ExecuteBridgeRequest { + constructor( + private environmentRepository: EnvironmentRepository, + private getDecryptedSecretKey: GetDecryptedSecretKey, + ) {} + async execute( command: ExecuteBridgeRequestCommand, ): Promise> { + const environment = await this.environmentRepository.findOne({ + _id: command.environmentId, + }); + + if (!environment) { + throw new NotFoundException( + `Environment ${command.environmentId} not found`, + ); + } + + const secretKey = await this.getDecryptedSecretKey.execute( + GetDecryptedSecretKeyCommand.create({ + environmentId: command.environmentId, + }), + ); + const bridgeUrl = this.getBridgeUrl( + environment.bridge?.url || environment.echo?.url, + command.environmentId, + command.workflowOrigin, + command.statelessBridgeUrl, + ); + + Logger.log( + `Resolved bridge URL: ${bridgeUrl} for environment ${command.environmentId} and origin ${command.workflowOrigin}`, + LOG_CONTEXT, + ); + const retriesLimit = command.retriesLimit || DEFAULT_RETRIES_LIMIT; - const bridgeActionUrl = new URL(command.bridgeUrl); + const bridgeActionUrl = new URL(bridgeUrl); bridgeActionUrl.searchParams.set(HttpQueryKeysEnum.ACTION, command.action); Object.entries(command.searchParams || {}).forEach(([key, value]) => { bridgeActionUrl.searchParams.set(key, value); @@ -62,16 +117,7 @@ export class ExecuteBridgeRequest { return 2 ** attemptCount * 1000; }, statusCodes: RETRYABLE_HTTP_CODES, - errorCodes: [ - 'ETIMEDOUT', - 'ECONNRESET', - 'EADDRINUSE', - 'ECONNREFUSED', - 'EPIPE', - 'ENOTFOUND', - 'ENETUNREACH', - 'EAI_AGAIN', - ], + errorCodes: RETRYABLE_ERROR_CODES, }, hooks: { afterResponse: @@ -80,8 +126,8 @@ export class ExecuteBridgeRequest { }; const timestamp = Date.now(); - const novuSignatureHeader = `t=${timestamp},v1=${this.createHmacByApiKey( - command.apiKey, + const novuSignatureHeader = `t=${timestamp},v1=${this.createHmacBySecretKey( + secretKey, timestamp, command.event || {}, )}`; @@ -93,6 +139,8 @@ export class ExecuteBridgeRequest { : got.get; const headers = { + [HttpRequestHeaderKeysEnum.BYPASS_TUNNEL_REMINDER]: 'true', + [HttpRequestHeaderKeysEnum.CONTENT_TYPE]: 'application/json', [HttpHeaderKeysEnum.NOVU_SIGNATURE_DEPRECATED]: novuSignatureHeader, [HttpHeaderKeysEnum.NOVU_SIGNATURE]: novuSignatureHeader, }; @@ -105,19 +153,54 @@ export class ExecuteBridgeRequest { }).json(); } catch (error) { if (error instanceof RequestError) { - const body = JSON.parse(error.response.body as string); - if (body.code === TUNNEL_ERROR_CODE) { + let body: Record; + try { + body = JSON.parse(error.response.body as string); + } catch (e) { + // If the body is not valid JSON, we'll just use an empty object. + body = {}; + } + + if (Object.values(ErrorCodeEnum).includes(body.code as ErrorCodeEnum)) { + // Handle known Bridge errors. Propagate the error code and message. + throw new HttpException(body, error.response.statusCode); + } else if (body.code === TUNNEL_ERROR_CODE) { // Handle known tunnel errors const tunnelBody = body as TunnelResponseError; Logger.error( `Could not establish tunnel connection for \`${url}\`. Error: \`${tunnelBody.message}\``, LOG_CONTEXT, ); + throw new NotFoundException(BRIDGE_EXECUTION_ERROR.TUNNEL_NOT_FOUND); + } else if (error.response?.statusCode === 502) { + /* + * Tunnel was live, but the Bridge endpoint was down. + * 502 is thrown by the tunnel service when the Bridge endpoint is not reachable. + */ + Logger.error( + `Bridge endpoint unavailable for \`${url}\``, + LOG_CONTEXT, + ); + throw new BadRequestException( + BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_UNAVAILABLE, + ); + } else if (error.response?.statusCode === 404) { + // Bridge endpoint wasn't found. + Logger.error(`Bridge endpoint not found for \`${url}\``, LOG_CONTEXT); throw new NotFoundException( - `Unable to reach Bridge app. Run npx novu@latest dev in Local mode, or ensure your Bridge app deployment is available.`, + BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_NOT_FOUND, + ); + } else if (error.response?.statusCode === 405) { + // The Bridge endpoint didn't expose the required methods. + Logger.error( + `Bridge endpoint method not configured for \`${url}\``, + LOG_CONTEXT, + ); + throw new BadRequestException( + BRIDGE_EXECUTION_ERROR.BRIDGE_METHOD_NOT_CONFIGURED, ); } else { - // Handle unknown bridge request errors + // Unknown errors when calling the Bridge endpoint. Logger.error( `Unknown bridge request error calling \`${url}\`: \`${JSON.stringify( body, @@ -127,7 +210,7 @@ export class ExecuteBridgeRequest { throw error; } } else { - // Handle unknown errors + // Handle unknown, non-request errors. Logger.error( `Unknown bridge error calling \`${url}\``, error, @@ -138,11 +221,55 @@ export class ExecuteBridgeRequest { } } - private createHmacByApiKey(secret: string, timestamp: number, payload: any) { + private createHmacBySecretKey( + secretKey: string, + timestamp: number, + payload: unknown, + ) { const publicKey = `${timestamp}.${JSON.stringify(payload)}`; - return createHmac('sha256', decryptApiKey(secret)) - .update(publicKey) - .digest('hex'); + return createHmac('sha256', secretKey).update(publicKey).digest('hex'); + } + + /** + * Returns the bridge URL based on the workflow origin and statelessBridgeUrl. + * + * - Novu Cloud workflows go to the Novu API Bridge + * - External workflows go to the Client Bridge + * + * @param environmentBridgeUrl - The URL of the bridge app. + * @param environmentId - The ID of the environment. + * @param workflowOrigin - The origin of the workflow. + * @param statelessBridgeUrl - The URL of the stateless bridge app. + * @returns The correct bridge URL. + */ + private getBridgeUrl( + environmentBridgeUrl: string, + environmentId: string, + workflowOrigin: WorkflowOriginEnum, + statelessBridgeUrl?: string, + ): string { + if (statelessBridgeUrl) { + return statelessBridgeUrl; + } + + switch (workflowOrigin) { + case WorkflowOriginEnum.NOVU_CLOUD: + return `${this.getApiUrl()}/v1/environments/${environmentId}/bridge`; + case WorkflowOriginEnum.EXTERNAL: + return environmentBridgeUrl; + default: + throw new Error(`Unsupported workflow origin: ${workflowOrigin}`); + } + } + + private getApiUrl(): string { + const apiUrl = process.env.API_ROOT_URL; + + if (!apiUrl) { + throw new Error('API_ROOT_URL environment variable is not set'); + } + + return apiUrl; } } diff --git a/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.command.ts b/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.command.ts new file mode 100644 index 00000000000..eb0282be451 --- /dev/null +++ b/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.command.ts @@ -0,0 +1,3 @@ +import { EnvironmentLevelCommand } from '../../commands'; + +export class GetDecryptedSecretKeyCommand extends EnvironmentLevelCommand {} diff --git a/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.usecase.ts b/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.usecase.ts new file mode 100644 index 00000000000..cbb84c58f0d --- /dev/null +++ b/libs/application-generic/src/usecases/get-decrypted-secret-key/get-decrypted-secret-key.usecase.ts @@ -0,0 +1,23 @@ +import { Injectable, NotFoundException } from '@nestjs/common'; +import { EnvironmentRepository } from '@novu/dal'; +import { GetDecryptedSecretKeyCommand } from './get-decrypted-secret-key.command'; +import { decryptApiKey } from '../../encryption'; + +@Injectable() +export class GetDecryptedSecretKey { + constructor(private readonly environmentRepository: EnvironmentRepository) {} + + async execute(command: GetDecryptedSecretKeyCommand): Promise { + const environment = await this.environmentRepository.findOne({ + _id: command.environmentId, + }); + + if (!environment) { + throw new NotFoundException( + `Environment ${command.environmentId} not found`, + ); + } + + return decryptApiKey(environment.apiKeys[0].key); + } +} diff --git a/libs/application-generic/src/usecases/get-decrypted-secret-key/index.ts b/libs/application-generic/src/usecases/get-decrypted-secret-key/index.ts new file mode 100644 index 00000000000..259ae795f09 --- /dev/null +++ b/libs/application-generic/src/usecases/get-decrypted-secret-key/index.ts @@ -0,0 +1,2 @@ +export * from './get-decrypted-secret-key.usecase'; +export * from './get-decrypted-secret-key.command'; diff --git a/libs/application-generic/src/usecases/index.ts b/libs/application-generic/src/usecases/index.ts index ae434b0f813..2cb243028d6 100644 --- a/libs/application-generic/src/usecases/index.ts +++ b/libs/application-generic/src/usecases/index.ts @@ -45,3 +45,4 @@ export * from './execute-bridge-request'; export * from './upsert-preferences'; export * from './upsert-control-values'; export * from './get-preferences'; +export * from './get-decrypted-secret-key'; diff --git a/libs/application-generic/src/utils/bridge.ts b/libs/application-generic/src/utils/bridge.ts index c789c39fa9a..a9ecc69559c 100644 --- a/libs/application-generic/src/utils/bridge.ts +++ b/libs/application-generic/src/utils/bridge.ts @@ -35,3 +35,27 @@ export const isRegularDigestOutput = ( ): outputs is DigestRegularOutput => { return !isTimedDigestOutput(outputs) && !isLookBackDigestOutput(outputs); }; + +export const BRIDGE_EXECUTION_ERROR = { + TUNNEL_NOT_FOUND: { + code: 'TUNNEL_NOT_FOUND', + message: + 'Unable to establish tunnel connection. Run npx novu@latest dev in Local mode, or ensure your Tunnel app deployment is available.', + }, + BRIDGE_ENDPOINT_NOT_FOUND: { + code: 'BRIDGE_ENDPOINT_NOT_FOUND', + message: + 'Could not connect to Bridge Endpoint. Make sure you are running your local app server.', + }, + BRIDGE_ENDPOINT_UNAVAILABLE: { + code: 'BRIDGE_ENDPOINT_UNAVAILABLE', + message: + 'Unable to reach Bridge Endpoint. Run npx novu@latest dev in Local mode, or ensure your Bridge app deployment is available.', + }, + BRIDGE_METHOD_NOT_CONFIGURED: { + code: 'BRIDGE_METHOD_NOT_CONFIGURED', + message: + // eslint-disable-next-line max-len + 'Bridge Endpoint is not correctly configured. Ensure your `@novu/framework` integration exposes the `POST`, `GET`, and `OPTIONS` methods.', + }, +} satisfies Record; diff --git a/packages/shared/src/dto/workflows/workflow-commons-fields.ts b/packages/shared/src/dto/workflows/workflow-commons-fields.ts index dd3f6b2c6a3..4cdfc6687ab 100644 --- a/packages/shared/src/dto/workflows/workflow-commons-fields.ts +++ b/packages/shared/src/dto/workflows/workflow-commons-fields.ts @@ -39,8 +39,8 @@ export class StepDto { @IsDefined() type: StepTypeEnum; - @IsOptional() - controls?: ControlsSchema; + @IsObject() + controls: ControlsSchema; @IsObject() controlValues: Record; diff --git a/packages/shared/src/types/http/headers.types.ts b/packages/shared/src/types/http/headers.types.ts index 6310787f17c..d85a1be60d4 100644 --- a/packages/shared/src/types/http/headers.types.ts +++ b/packages/shared/src/types/http/headers.types.ts @@ -9,6 +9,7 @@ export enum HttpRequestHeaderKeysEnum { NOVU_ENVIRONMENT_ID = 'Novu-Environment-Id', NOVU_API_VERSION = 'Novu-API-Version', NOVU_USER_AGENT = 'Novu-User-Agent', + BYPASS_TUNNEL_REMINDER = 'Bypass-Tunnel-Reminder', } testHttpHeaderEnumValidity(HttpRequestHeaderKeysEnum);