Skip to content

Commit

Permalink
Merge pull request #5083 from novuhq/nv-3343-feature-flag-the-executi…
Browse files Browse the repository at this point in the history
…on-details-queue

Execution Detail - Route by FF to store by queue or directly to DAL layer
  • Loading branch information
djabarovgeorge authored Jan 17, 2024
2 parents cd9d5e2 + c5d680b commit 50d89a6
Show file tree
Hide file tree
Showing 28 changed files with 514 additions and 572 deletions.
6 changes: 6 additions & 0 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import {
QueuesModule,
storageService,
getIsApiIdempotencyEnabled,
ExecutionLogRoute,
CreateExecutionDetails,
getIsExecutionLogQueueEnabled,
} from '@novu/application-generic';

import * as packageJson from '../../../package.json';
Expand Down Expand Up @@ -95,6 +98,9 @@ const PROVIDERS = [
InvalidateCacheService,
storageService,
...DAL_MODELS,
ExecutionLogRoute,
CreateExecutionDetails,
getIsExecutionLogQueueEnabled,
];

@Module({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
import { NotFoundError } from 'rxjs';
import { Injectable, Logger } from '@nestjs/common';

import { JobRepository } from '@novu/dal';
import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum } from '@novu/shared';
import {
CreateExecutionDetailsCommand,
DetailEnum,
ExecutionLogQueueService,
InstrumentUsecase,
} from '@novu/application-generic';
import { DetailEnum, ExecutionLogRoute, ExecutionLogRouteCommand, InstrumentUsecase } from '@novu/application-generic';

import { HandleLastFailedJobCommand } from './handle-last-failed-job.command';

import { QueueNextJob, QueueNextJobCommand } from '../queue-next-job';
import { PlatformException } from '../../../shared/utils';
import { NotFoundError } from 'rxjs';

const LOG_CONTEXT = 'HandleLastFailedJob';

@Injectable()
export class HandleLastFailedJob {
constructor(
private executionLogQueueService: ExecutionLogQueueService,
private executionLogRoute: ExecutionLogRoute,
private queueNextJob: QueueNextJob,
private jobRepository: JobRepository
) {}
Expand All @@ -40,21 +35,18 @@ export class HandleLastFailedJob {
Logger.error(message, new NotFoundError(message), LOG_CONTEXT);
throw new PlatformException(message);
}
const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata();
await this.executionLogQueueService.add({
name: metadata._id,
data: CreateExecutionDetailsCommand.create({
...metadata,
...CreateExecutionDetailsCommand.getDetailsFromJob(job),

await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(job),
detail: DetailEnum.WEBHOOK_FILTER_FAILED_LAST_RETRY,
source: ExecutionDetailsSourceEnum.WEBHOOK,
status: ExecutionDetailsStatusEnum.PENDING,
isTest: false,
isRetry: true,
raw: JSON.stringify({ message: JSON.parse(error.message).message }),
}),
groupId: job._organizationId,
});
})
);

if (!job?.step?.shouldStopOnFail) {
await this.queueNextJob.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import {
} from '@novu/shared';
import {
DetailEnum,
CreateExecutionDetailsCommand,
GetUseMergedDigestId,
FeatureFlagCommand,
ExecutionLogQueueService,
ExecutionLogRoute,
ExecutionLogRouteCommand,
} from '@novu/application-generic';

import { GetDigestEventsRegular } from './get-digest-events-regular.usecase';
Expand All @@ -32,13 +32,13 @@ export class Digest extends SendMessageType {
constructor(
protected messageRepository: MessageRepository,
protected createLogUsecase: CreateLog,
protected executionLogQueueService: ExecutionLogQueueService,
protected executionLogRoute: ExecutionLogRoute,
protected jobRepository: JobRepository,
private getDigestEventsRegular: GetDigestEventsRegular,
private getDigestEventsBackoff: GetDigestEventsBackoff,
private getUseMergedDigestId: GetUseMergedDigestId
) {
super(messageRepository, createLogUsecase, executionLogQueueService);
super(messageRepository, createLogUsecase, executionLogRoute);
}

public async execute(command: SendMessageCommand) {
Expand All @@ -56,21 +56,18 @@ export class Digest extends SendMessageType {

const events = await getEvents(command, currentJob);
const nextJobs = await this.getJobsToUpdate(command);
const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata();
await this.executionLogQueueService.add({
name: metadata._id,
data: CreateExecutionDetailsCommand.create({
...metadata,
...CreateExecutionDetailsCommand.getDetailsFromJob(currentJob),

await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(command.job),
detail: DetailEnum.DIGEST_TRIGGERED_EVENTS,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.SUCCESS,
isTest: false,
isRetry: false,
raw: JSON.stringify(events),
}),
groupId: currentJob._organizationId,
});
})
);

await this.jobRepository.update(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {
} from '@novu/shared';
import {
DetailEnum,
CreateExecutionDetailsCommand,
Instrument,
ExecutionLogQueueService,
getNestedValue,
ExecutionLogRoute,
ExecutionLogRouteCommand,
} from '@novu/application-generic';

import { PlatformException } from '../../../../shared/utils';
Expand All @@ -20,7 +20,7 @@ const LOG_CONTEXT = 'GetDigestEvents';

@Injectable()
export abstract class GetDigestEvents {
constructor(protected jobRepository: JobRepository, private executionLogQueueService: ExecutionLogQueueService) {}
constructor(protected jobRepository: JobRepository, private executionLogRoute: ExecutionLogRoute) {}

@Instrument()
protected async filterJobs(currentJob: JobEntity, transactionId: string, jobs: JobEntity[]) {
Expand All @@ -41,20 +41,17 @@ export abstract class GetDigestEvents {
)) as Pick<JobEntity, '_id'>;

if (!currentTrigger) {
const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata();
await this.executionLogQueueService.add({
name: metadata._id,
data: CreateExecutionDetailsCommand.create({
...metadata,
...CreateExecutionDetailsCommand.getDetailsFromJob(currentJob),
await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(currentJob),
detail: DetailEnum.DIGEST_TRIGGERED_EVENTS,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.FAILED,
isTest: false,
isRetry: false,
}),
groupId: currentJob._organizationId,
});
})
);

const message = `Trigger job for jobId ${currentJob._id} is not found`;
Logger.error(message, LOG_CONTEXT);
throw new PlatformException(message);
Expand Down
Loading

0 comments on commit 50d89a6

Please sign in to comment.