Skip to content

Commit

Permalink
feat(infra): add active and completed job metrics with dedicated queues
Browse files Browse the repository at this point in the history
Co-Author: Zac Clifton <[email protected]>
  • Loading branch information
Cliftonz authored and p-fernandez committed Sep 5, 2023
1 parent d00b9b3 commit 60613ae
Show file tree
Hide file tree
Showing 30 changed files with 861 additions and 391 deletions.
1 change: 1 addition & 0 deletions apps/worker/src/.env.production
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=production
PORT=3004
FLEET_NAME="default"
# STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/.example.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=local
PORT=3004
FLEET_NAME="default"
STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
19 changes: 19 additions & 0 deletions apps/worker/src/app/shared/utils/cron-health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Logger } from '@nestjs/common';

const url = process.env.CRON_CHECKING_URL
? process.env.CRON_CHECKING_URL
: 'https://uptime.betterstack.com/api/v1/heartbeat/';
const LOG_CONTEXT = 'cronHealth';
export async function checkingForCronJob(cronId?: string) {
if (process.env.NOVU_MANAGED_SERVICE && process.env.NODE_ENV === 'production' && cronId && url) {
Logger.verbose(`Calling health endpoint for ${cronId}`);

const response = await fetch(url + cronId);

if (response.status !== 200) {
Logger.error(`Failed calling better Uptime: ${response.status}`, LOG_CONTEXT);
} else {
Logger.verbose(`Response from better Uptime: ${response.status}`, LOG_CONTEXT);
}
}
}
1 change: 1 addition & 0 deletions apps/worker/src/app/shared/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './constants';
export * from './exceptions';
export * from './hmac';
export * from './cron-health';
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { Test, TestingModule } from '@nestjs/testing';
import { expect } from 'chai';

import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
StandardQueueService,
WebSocketsQueueService,
WorkflowQueueService,
} from '@novu/application-generic';

import { ActiveJobsMetricService } from './active-jobs-metric.service';

import { WorkflowModule } from '../workflow.module';

let activeJobsMetricService: ActiveJobsMetricService;
let standardService: StandardQueueService;
let webSocketsQueueService: WebSocketsQueueService;
let workflowQueueService: WorkflowQueueService;
let moduleRef: TestingModule;

before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);
});

describe('Active Jobs Metric Service', () => {
before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);

const activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
const activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

describe('Environment variables not set', () => {
beforeEach(() => {
process.env.NOVU_MANAGED_SERVICE = 'false';
process.env.NEW_RELIC_LICENSE_KEY = '';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => {
expect(activeJobsMetricService).to.be.ok;
expect(activeJobsMetricService).to.have.all.keys('tokenList');
expect(await activeJobsMetricService.activeJobsMetricQueueService).to.not.be.ok;
expect(await activeJobsMetricService.activeJobsMetricWorkerService).to.not.be.ok;
});
});

describe('Environment variables configured', () => {
beforeEach(async () => {
process.env.NOVU_MANAGED_SERVICE = 'true';
process.env.NEW_RELIC_LICENSE_KEY = 'license';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

after(async () => {
await activeJobsMetricService.activeJobsMetricQueueService.queue.drain();
await activeJobsMetricService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(activeJobsMetricService).to.be.ok;
expect(activeJobsMetricService).to.have.all.keys(
'activeJobsMetricQueueService',
'activeJobsMetricWorkerService',
'tokenList'
);
expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({
queueIsPaused: false,
queueName: 'metric-active-jobs',
workerName: undefined,
workerIsPaused: undefined,
workerIsRunning: undefined,
});
expect(await activeJobsMetricService.activeJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'metric-active-jobs',
workerIsPaused: false,
workerIsRunning: true,
});
expect(activeJobsMetricService.activeJobsMetricWorkerService.worker.opts).to.deep.include({
concurrency: 1,
lockDuration: 900,
});
});
});
});
141 changes: 141 additions & 0 deletions apps/worker/src/app/workflow/services/active-jobs-metric.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
QueueBaseService,
WorkerOptions,
} from '@novu/application-generic';
import * as process from 'process';
import { JobTopicNameEnum } from '@novu/shared';

import { Inject, Injectable, Logger } from '@nestjs/common';

import { checkingForCronJob } from '../../shared/utils';

const LOG_CONTEXT = 'ActiveJobMetricService';
const METRIC_JOB_ID = 'metric-job';

@Injectable()
export class ActiveJobsMetricService {
public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService;
public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService;

constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[]) {
if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
this.activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
this.activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();

this.activeJobsMetricQueueService.createQueue();
this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions());

this.activeJobsMetricWorkerService.worker.on('completed', async (job) => {
await checkingForCronJob(process.env.ACTIVE_CRON_ID);
Logger.verbose({ jobId: job.id }, 'Metric Completed Job', LOG_CONTEXT);
});

this.activeJobsMetricWorkerService.worker.on('failed', async (job, error) => {
Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error);
});

this.addToQueueIfMetricJobExists();
}
}

private addToQueueIfMetricJobExists(): void {
Promise.resolve(
this.activeJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => {
let exists = false;
for (const jobElement of job) {
if (jobElement.id === METRIC_JOB_ID) {
exists = true;
}
}

return exists;
})
)
.then(async (exists: boolean): Promise<void> => {
Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT);

if (!exists) {
Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '* * * * * *',
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
});
}

return undefined;
})
.catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error));
}

private getWorkerOptions(): WorkerOptions {
return {
lockDuration: 900,
concurrency: 1,
settings: {},
};
}

private getWorkerProcessor() {
return async () => {
return await new Promise<void>(async (resolve, reject): Promise<void> => {
Logger.verbose('metric job started', LOG_CONTEXT);
const deploymentName = process.env.FLEET_NAME ?? 'default';

try {
for (const queueService of this.tokenList) {
const waitCount = await queueService.bullMqService.queue.getWaitingCount();
const delayedCount = await queueService.bullMqService.queue.getDelayedCount();
const activeCount = await queueService.bullMqService.queue.getActiveCount();

Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length);
Logger.log('Recording active, waiting, and delayed metrics');

const nr = require('newrelic');
nr.recordMetric(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount);
nr.recordMetric(
`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`,
delayedCount
);
nr.recordMetric(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, activeCount);

Logger.verbose(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount);
Logger.verbose(
`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`,
delayedCount
);
Logger.verbose(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, activeCount);
}

return resolve();
} catch (error) {
Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT);

return reject(error);
}
});
};
}

public async gracefulShutdown(): Promise<void> {
Logger.log('Shutting the Active Jobs Metric service down', LOG_CONTEXT);

await this.activeJobsMetricQueueService.gracefulShutdown();
await this.activeJobsMetricWorkerService.gracefulShutdown();

Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT);
}

async onModuleDestroy(): Promise<void> {
await this.gracefulShutdown();
}
}
Loading

0 comments on commit 60613ae

Please sign in to comment.