Skip to content

Commit

Permalink
feat(echo): add regular delay step feature parity (#5670)
Browse files Browse the repository at this point in the history
* refactor(worker): add job

* feat(echo): add support for regular delay

* refactor(echo): after pr comments

* fix: update ref

* feat: add default type

* fix: types

* fix: frontend display

* fix: test definition for subscriber

---------

Co-authored-by: Dima Grossman <[email protected]>
  • Loading branch information
djabarovgeorge and scopsy authored Jun 13, 2024
1 parent cc23b1e commit 7fcc32d
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 84 deletions.
88 changes: 87 additions & 1 deletion apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,90 @@ describe('Echo Trigger ', async () => {
expect(messagesAfter.length).to.be.eq(1);
expect(messagesAfter[0].content).to.include('2 people liked your post');
});

it('should trigger the echo workflow with delay', async () => {
const workflowId = 'delay-workflow';
await echoServer.echo.workflow(
workflowId,
async ({ step }) => {
const delayResponse = await step.delay(
'delay-id',
async (inputs) => {
return {
type: 'regular',
amount: inputs.amount,
unit: inputs.unit,
};
},
{
inputSchema: {
type: 'object',
properties: {
amount: {
type: 'number',
default: 2,
},
unit: {
type: 'string',
enum: ['seconds', 'minutes', 'hours', 'days', 'weeks', 'months'],
default: 'seconds',
},
},
},
}
);

await step.sms(
'send-sms',
async () => {
const duration = delayResponse.duration;

return {
body: `people waited for ${duration} seconds`,
};
},
{
inputSchema: {
type: 'object',
properties: {},
},
}
);
},
{
payloadSchema: {
type: 'object',
properties: {
name: { type: 'string', default: 'default_name' },
},
required: [],
additionalProperties: false,
} as const,
}
);

await discoverAndSyncEcho(session);

const workflow = await workflowsRepository.findByTriggerIdentifier(session.environment._id, workflowId);
expect(workflow).to.be.ok;

if (!workflow) {
throw new Error('Workflow not found');
}

await triggerEvent(session, workflowId, subscriber);

await session.awaitRunningJobs(workflow?._id, true, 0);

const messagesAfter = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
channel: StepTypeEnum.SMS,
});

expect(messagesAfter.length).to.be.eq(1);
expect(messagesAfter[0].content).to.match(/people waited for \d+ seconds/);
});
});

async function syncWorkflow(session) {
Expand Down Expand Up @@ -472,10 +556,12 @@ async function triggerEvent(session, workflowId: string, subscriber, payload?: a
async function discoverAndSyncEcho(session: UserSession) {
const resultDiscover = await axios.get(echoServer.serverPath + '/echo?action=discover');

await session.testAgent.post(`/v1/echo/sync`).send({
const discoverResponse = await session.testAgent.post(`/v1/echo/sync`).send({
bridgeUrl: echoServer.serverPath + '/echo',
workflows: resultDiscover.data.workflows,
});

return discoverResponse;
}

async function markAllSubscriberMessagesAs(session: UserSession, subscriberId: string, markAs: MarkMessagesAsEnum) {
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
analyticsService,
cacheService,
CacheServiceHealthIndicator,
CalculateDelayService,
ComputeJobWaitDurationService,
createNestLoggingModuleOptions,
DalServiceHealthIndicator,
distributedLockService,
Expand Down Expand Up @@ -84,7 +84,7 @@ const PROVIDERS = [
analyticsService,
cacheService,
CacheServiceHealthIndicator,
CalculateDelayService,
ComputeJobWaitDurationService,
dalService,
DalServiceHealthIndicator,
distributedLockService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,30 +80,34 @@ const StepLogo = ({ status, type }) => {
);
};

const generateDetailByStepAndStatus = (status, step) => {
const generateDetailByStepAndStatus = (status, job) => {
if (status === JobStatusEnum.COMPLETED) {
return `Success! ${step.executionDetails?.at(-1)?.detail}`;
return `Success! ${job.executionDetails?.at(-1)?.detail}`;
}

if (step.type === StepTypeEnum.DIGEST) {
if (job.type === StepTypeEnum.DIGEST) {
if (status === JobStatusEnum.SKIPPED) {
return step.executionDetails?.at(-1)?.detail;
return job.executionDetails?.at(-1)?.detail;
}
const { digest } = step;
const { digest } = job;

if (!digest.amount && !digest.unit) return `Waiting to receive digest times from bridge endpoint`;

return `Digesting events for ${digest.amount} ${digest.unit}`;
}

if (step.type === StepTypeEnum.DELAY) {
const { digest, step: stepMetadata, payload } = step;
if (job.type === StepTypeEnum.DELAY) {
const { digest, step: stepMetadata, payload } = job;

if (!digest.amount && !digest.unit) return `Waiting to receive execution delay from bridge endpoint`;
if (stepMetadata.metadata.type === DelayTypeEnum.SCHEDULED) {
return `Delaying execution until ${payload[stepMetadata.metadata.delayPath]}`;
}

return `Delaying execution for ${digest.amount} ${digest.unit}`;
}

return step.executionDetails?.at(-1)?.detail;
return job.executionDetails?.at(-1)?.detail;
};

const getDetailsStyledComponentByStepStatus = (status) => {
Expand Down
4 changes: 2 additions & 2 deletions apps/worker/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
analyticsService,
BulkCreateExecutionDetails,
cacheService,
CalculateDelayService,
ComputeJobWaitDurationService,
CreateExecutionDetails,
createNestLoggingModuleOptions,
CreateNotificationJobs,
Expand Down Expand Up @@ -99,7 +99,7 @@ const PROVIDERS = [
analyticsService,
BulkCreateExecutionDetails,
cacheService,
CalculateDelayService,
ComputeJobWaitDurationService,
CreateExecutionDetails,
CreateLog,
CreateNotificationJobs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { JobRepository, JobStatusEnum } from '@novu/dal';
import { DelayTypeEnum, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum } from '@novu/shared';
import {
ApiException,
CalculateDelayService,
ComputeJobWaitDurationService,
DetailEnum,
ExecutionLogRoute,
ExecutionLogRouteCommand,
Expand All @@ -17,8 +17,8 @@ import { AddJobCommand } from './add-job.command';
export class AddDelayJob {
constructor(
private jobRepository: JobRepository,
@Inject(forwardRef(() => CalculateDelayService))
private calculateDelayService: CalculateDelayService,
@Inject(forwardRef(() => ComputeJobWaitDurationService))
private computeJobWaitDurationService: ComputeJobWaitDurationService,
@Inject(forwardRef(() => ExecutionLogRoute))
private executionLogRoute: ExecutionLogRoute
) {}
Expand All @@ -38,17 +38,11 @@ export class AddDelayJob {
let delay;

try {
delay = this.calculateDelayService.calculateDelay({
delay = this.computeJobWaitDurationService.calculateDelay({
stepMetadata: data.step.metadata,
payload: data.payload,
overrides: data.overrides,
// TODO: Remove fallback after other delay types are implemented.
bridgeResponse: command.bridgeResponse?.outputs
? {
type: DelayTypeEnum.REGULAR,
...command.bridgeResponse?.outputs,
}
: undefined,
bridgeResponse: command.bridgeResponse?.outputs,
});

await this.jobRepository.updateStatus(command.environmentId, data._id, JobStatusEnum.DELAYED);
Expand Down
45 changes: 30 additions & 15 deletions apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { AddJobCommand } from './add-job.command';
import { validateDigest } from './validation';
import { ModuleRef } from '@nestjs/core';
import {
CalculateDelayService,
ComputeJobWaitDurationService,
ConditionsFilter,
ConditionsFilterCommand,
DetailEnum,
Expand Down Expand Up @@ -52,8 +52,8 @@ export class AddJob {
private executionLogRoute: ExecutionLogRoute,
private mergeOrCreateDigestUsecase: MergeOrCreateDigest,
private addDelayJob: AddDelayJob,
@Inject(forwardRef(() => CalculateDelayService))
private calculateDelayService: CalculateDelayService,
@Inject(forwardRef(() => ComputeJobWaitDurationService))
private computeJobWaitDurationService: ComputeJobWaitDurationService,
@Inject(forwardRef(() => ConditionsFilter))
private conditionsFilter: ConditionsFilter,
private normalizeVariablesUsecase: NormalizeVariables,
Expand Down Expand Up @@ -170,18 +170,39 @@ export class AddJob {
filterVariables: IFilterVariables,
delayAmount: number | undefined
) {
command.bridgeResponse = await this.resonateUsecase.execute<
command.bridgeResponse = await this.fetchResonateData(command, filterVariables);
delayAmount = await this.addDelayJob.execute(command);

Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);

return delayAmount;
}

private async fetchResonateData(command: AddJobCommand, filterVariables: IFilterVariables) {
const response = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IBridgeDigestResponse>
>({
...command,
variables: filterVariables,
});
delayAmount = await this.addDelayJob.execute(command);

Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);
await this.jobRepository.updateOne(
{
_id: command.job._id,
_environmentId: command.environmentId,
},
{
$set: {
'digest.amount': response?.outputs?.amount,
'digest.unit': response?.outputs?.unit,
'digest.type': response?.outputs?.type,
// TODO: Add other types for scheduled etc..
},
}
);

return delayAmount;
return response;
}

private async handleDigest(
Expand All @@ -191,17 +212,11 @@ export class AddJob {
digestAmount: number | undefined,
filtered: boolean
) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IBridgeDigestResponse>
>({
...command,
variables: filterVariables,
});
const resonateResponse = await this.fetchResonateData(command, filterVariables);

validateDigest(job);

digestAmount = this.calculateDelayService.calculateDelay({
digestAmount = this.computeJobWaitDurationService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
import { DigestUnitEnum } from '@novu/shared';
import { CalculateDelayService } from './calculate-delay.service';
import { ComputeJobWaitDurationService } from './compute-job-wait-duration.service';

describe('Calculate Delay Service', function () {
describe('Compute Job Wait Duration Service', function () {
describe('toMilliseconds', function () {
const calculateDelayService = new CalculateDelayService();
const computeJobWaitDurationService = new ComputeJobWaitDurationService();

it('should convert seconds to milliseconds', function () {
const result = (calculateDelayService as any).toMilliseconds(
const result = (computeJobWaitDurationService as any).toMilliseconds(
5,
DigestUnitEnum.SECONDS
);
expect(result).toEqual(5000);
});

it('should convert minutes to milliseconds', function () {
const result = (calculateDelayService as any).toMilliseconds(
const result = (computeJobWaitDurationService as any).toMilliseconds(
5,
DigestUnitEnum.MINUTES
);
expect(result).toEqual(300000);
});

it('should convert hours to milliseconds', function () {
const result = (calculateDelayService as any).toMilliseconds(
const result = (computeJobWaitDurationService as any).toMilliseconds(
5,
DigestUnitEnum.HOURS
);
expect(result).toEqual(18000000);
});

it('should convert days to milliseconds', function () {
const result = (calculateDelayService as any).toMilliseconds(
const result = (computeJobWaitDurationService as any).toMilliseconds(
1,
DigestUnitEnum.DAYS
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
IBridgeDigestResponse,
} from '../../utils/require-inject';

export class CalculateDelayService {
export class ComputeJobWaitDurationService {
calculateDelay({
stepMetadata,
payload,
Expand All @@ -41,7 +41,6 @@ export class CalculateDelayService {
if (!delayPath) throw new ApiException(`Delay path not found`);

const delayDate = payload[delayPath];

const delay = differenceInMilliseconds(new Date(delayDate), new Date());

if (delay < 0) {
Expand All @@ -51,12 +50,10 @@ export class CalculateDelayService {
}

return delay;
}

const userUnit = castToDigestUnitEnum(bridgeResponse?.unit);
const userAmount = bridgeResponse?.amount;
} else if (isRegularDigest(digestType)) {
const userUnit = castToDigestUnitEnum(bridgeResponse?.unit);
const userAmount = bridgeResponse?.amount;

if (isRegularDigest(digestType)) {
if (this.isValidDelayOverride(overrides)) {
return this.toMilliseconds(
userAmount ?? (overrides.delay.amount as number),
Expand All @@ -70,9 +67,7 @@ export class CalculateDelayService {
userAmount ?? regularDigestMeta.amount,
userUnit ?? regularDigestMeta.unit
);
}

if (digestType === DigestTypeEnum.TIMED) {
} else if (digestType === DigestTypeEnum.TIMED) {
const timedDigestMeta = stepMetadata as IDigestTimedMetadata;

return TimedDigestDelayService.calculate({
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from './calculate-delay.service';
export * from './compute-job-wait-duration.service';
Loading

0 comments on commit 7fcc32d

Please sign in to comment.