Skip to content

Commit

Permalink
fix(worker): Provide correct execution context to Echo endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
rifont committed Apr 16, 2024
1 parent 58f8ecf commit 8678c21
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .source
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ export class Digest extends SendMessageType {
})
);

const jobsToUpdate = [...nextJobs.map((job) => job._id), command.job._id];

await this.jobRepository.update(
{
_environmentId: command.environmentId,
_id: {
$in: nextJobs.map((job) => job._id),
$in: jobsToUpdate,
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ export class SendMessage {

const stepType = command.step?.template?.type;

const chimeraResponse = await this.chimeraConnector.execute<
SendMessageCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraChannelResponse> | null
>({
...command,
variables: shouldRun.variables,
});
let chimeraResponse: ExecuteOutput<IChimeraChannelResponse> | null = null;
if (!['digest', 'delay'].includes(stepType as any)) {
chimeraResponse = await this.chimeraConnector.execute<
SendMessageCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraChannelResponse> | null
>({
...command,
variables: shouldRun.variables,
});
}

if (!command.payload?.$on_boarding_trigger) {
const usedFilters = shouldRun?.conditions.reduce(ConditionsFilter.sumFilters, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common';

import { JobRepository, JobStatusEnum } from '@novu/dal';
import {
DelayTypeEnum,
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
StepTypeEnum,
Expand Down Expand Up @@ -46,7 +47,13 @@ export class AddDelayJob {
stepMetadata: data.step.metadata,
payload: data.payload,
overrides: data.overrides,
chimeraResponse: command.chimeraResponse?.outputs,
// TODO: Remove fallback after other delay types are implemented.
chimeraResponse: command.chimeraResponse?.outputs
? {
type: DelayTypeEnum.REGULAR,
...command.chimeraResponse?.outputs,
}
: undefined,
});

await this.jobRepository.updateStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ExecutionDetailsStatusEnum,
StepTypeEnum,
DigestCreationResultEnum,
DigestTypeEnum,
} from '@novu/shared';

import { AddDelayJob } from './add-delay-job.usecase';
Expand Down Expand Up @@ -35,6 +36,7 @@ import {
IUseCaseInterfaceInline,
requireInject,
} from '../../utils/require-inject';
import { IFilterVariables } from '../../utils/filter-processing-details';

export enum BackoffStrategiesEnum {
WEBHOOK_FILTER_BACKOFF = 'webhookFilterBackoff',
Expand Down Expand Up @@ -85,7 +87,7 @@ export class AddJob {
);

let filtered = false;

let filterVariables: IFilterVariables | undefined;
if (
[StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(
job.type as StepTypeEnum
Expand All @@ -102,24 +104,31 @@ export class AddJob {
})
);

filterVariables = shouldRun.variables;
filtered = !shouldRun.passed;
}

let digestAmount: number | undefined;
let digestCreationResult: DigestCreationResultEnum | undefined;
if (job.type === StepTypeEnum.DIGEST) {
const chimeraResponse = await this.chimeraConnector.execute<
AddJobCommand,
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>(command);
>({
...command,
variables: filterVariables,
});

validateDigest(job);

digestAmount = this.calculateDelayService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
chimeraResponse: chimeraResponse?.outputs,
// TODO: Remove fallback after other digest types are implemented.
chimeraResponse: chimeraResponse
? { type: DigestTypeEnum.REGULAR, ...chimeraResponse.outputs }
: undefined,
});

Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);
Expand Down Expand Up @@ -164,9 +173,12 @@ export class AddJob {

if (job.type === StepTypeEnum.DELAY) {
const chimeraResponse = await this.chimeraConnector.execute<
AddJobCommand,
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>(command);
>({
...command,
variables: filterVariables,
});

command.chimeraResponse = chimeraResponse;
delayAmount = await this.addDelayJob.execute(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ export class MergeOrCreateDigest {
): Promise<MergeOrCreateDigestResultType> {
const { job } = command;

const digestMeta = job.digest as IDigestBaseMetadata | undefined;
const digestMeta =
command.chimeraData ?? (job.digest as IDigestBaseMetadata | undefined);
const digestKey = command.chimeraData?.digestKey ?? digestMeta?.digestKey;
const digestValue = getNestedValue(job.payload, digestKey);

Expand All @@ -73,7 +74,10 @@ export class MergeOrCreateDigest {
case DigestCreationResultEnum.SKIPPED:
return await this.processSkippedDigest(job, command.filtered);
case DigestCreationResultEnum.CREATED:
return await this.processCreatedDigest(digestMeta, job);
return await this.processCreatedDigest(
digestMeta as IDigestBaseMetadata,
job
);
default:
throw new ApiException('Something went wrong with digest creation');
}
Expand Down

0 comments on commit 8678c21

Please sign in to comment.