Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Q metric refactored after memorydb migration #4099

Merged
merged 6 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/worker/src/.env.production
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=production
PORT=3004
FLEET_NAME="default"
# STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/.example.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=local
PORT=3004
FLEET_NAME="default"
STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
8 changes: 7 additions & 1 deletion apps/worker/src/app/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
DalServiceHealthIndicator,
StandardQueueServiceHealthIndicator,
WorkflowQueueServiceHealthIndicator,
ActiveJobsMetricQueueServiceHealthIndicator,
CompletedJobsMetricQueueServiceHealthIndicator,
} from '@novu/application-generic';

import { version } from '../../../package.json';
Expand All @@ -16,7 +18,9 @@ export class HealthController {
private healthCheckService: HealthCheckService,
private dalHealthIndicator: DalServiceHealthIndicator,
private standardQueueHealthIndicator: StandardQueueServiceHealthIndicator,
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator
private workflowQueueHealthIndicator: WorkflowQueueServiceHealthIndicator,
private activeJobsMetricQueueServiceHealthIndicator: ActiveJobsMetricQueueServiceHealthIndicator,
private completedJobsMetricQueueServiceHealthIndicator: CompletedJobsMetricQueueServiceHealthIndicator
) {}

@Get()
Expand All @@ -26,6 +30,8 @@ export class HealthController {
async () => this.dalHealthIndicator.isHealthy(),
async () => this.standardQueueHealthIndicator.isHealthy(),
async () => this.workflowQueueHealthIndicator.isHealthy(),
async () => this.activeJobsMetricQueueServiceHealthIndicator.isHealthy(),
async () => this.completedJobsMetricQueueServiceHealthIndicator.isHealthy(),
async () => {
return {
apiVersion: {
Expand Down
19 changes: 19 additions & 0 deletions apps/worker/src/app/shared/utils/cron-health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Logger } from '@nestjs/common';

const url = process.env.CRON_CHECKING_URL
? process.env.CRON_CHECKING_URL
: 'https://uptime.betterstack.com/api/v1/heartbeat/';
const LOG_CONTEXT = 'cronHealth';
export async function checkingForCronJob(cronId?: string) {
if (process.env.NOVU_MANAGED_SERVICE && process.env.NODE_ENV === 'production' && cronId && url) {
Logger.verbose(`Calling health endpoint for ${cronId}`);

const response = await fetch(url + cronId);

if (response.status !== 200) {
Logger.error(`Failed calling better Uptime: ${response.status}`, LOG_CONTEXT);
} else {
Logger.verbose(`Response from better Uptime: ${response.status}`, LOG_CONTEXT);
}
}
}
1 change: 1 addition & 0 deletions apps/worker/src/app/shared/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './constants';
export * from './exceptions';
export * from './hmac';
export * from './cron-health';
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { Test, TestingModule } from '@nestjs/testing';
import { expect } from 'chai';

import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
StandardQueueService,
WebSocketsQueueService,
WorkflowQueueService,
} from '@novu/application-generic';

import { ActiveJobsMetricService } from './active-jobs-metric.service';

import { WorkflowModule } from '../workflow.module';

let activeJobsMetricService: ActiveJobsMetricService;
let standardService: StandardQueueService;
let webSocketsQueueService: WebSocketsQueueService;
let workflowQueueService: WorkflowQueueService;
let moduleRef: TestingModule;

before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);
});

describe('Active Jobs Metric Service', () => {
before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);

const activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
const activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

describe('Environment variables not set', () => {
beforeEach(() => {
process.env.NOVU_MANAGED_SERVICE = 'false';
process.env.NEW_RELIC_LICENSE_KEY = '';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => {
expect(activeJobsMetricService).to.be.ok;
expect(activeJobsMetricService).to.have.all.keys('tokenList');
expect(await activeJobsMetricService.activeJobsMetricQueueService).to.not.be.ok;
expect(await activeJobsMetricService.activeJobsMetricWorkerService).to.not.be.ok;
});
});

describe('Environment variables configured', () => {
beforeEach(async () => {
process.env.NOVU_MANAGED_SERVICE = 'true';
process.env.NEW_RELIC_LICENSE_KEY = 'license';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
});

after(async () => {
await activeJobsMetricService.activeJobsMetricQueueService.queue.drain();
await activeJobsMetricService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(activeJobsMetricService).to.be.ok;
expect(activeJobsMetricService).to.have.all.keys(
'activeJobsMetricQueueService',
'activeJobsMetricWorkerService',
'tokenList'
);
expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({
queueIsPaused: false,
queueName: 'metric-active-jobs',
workerName: undefined,
workerIsPaused: undefined,
workerIsRunning: undefined,
});
expect(await activeJobsMetricService.activeJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({
queueIsPaused: undefined,
queueName: undefined,
workerName: 'metric-active-jobs',
workerIsPaused: false,
workerIsRunning: true,
});
expect(activeJobsMetricService.activeJobsMetricWorkerService.worker.opts).to.deep.include({
concurrency: 1,
lockDuration: 900,
});
});
});
});
135 changes: 135 additions & 0 deletions apps/worker/src/app/workflow/services/active-jobs-metric.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
QueueBaseService,
WorkerOptions,
} from '@novu/application-generic';
import * as process from 'process';
import { JobTopicNameEnum } from '@novu/shared';

import { Inject, Injectable, Logger } from '@nestjs/common';

import { checkingForCronJob } from '../../shared/utils';

const LOG_CONTEXT = 'ActiveJobMetricService';
const METRIC_JOB_ID = 'metric-job';

@Injectable()
export class ActiveJobsMetricService {
public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService;
public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService;

constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[]) {
if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
this.activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
this.activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();

this.activeJobsMetricQueueService.createQueue();
this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions());

this.activeJobsMetricWorkerService.worker.on('completed', async (job) => {
await checkingForCronJob(process.env.ACTIVE_CRON_ID);
Logger.verbose({ jobId: job.id }, 'Metric Completed Job', LOG_CONTEXT);
});

this.activeJobsMetricWorkerService.worker.on('failed', async (job, error) => {
Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error);
});

this.addToQueueIfMetricJobExists();
}
}

private addToQueueIfMetricJobExists(): void {
Promise.resolve(
this.activeJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => {
let exists = false;
for (const jobElement of job) {
if (jobElement.id === METRIC_JOB_ID) {
exists = true;
}
}

return exists;
})
)
.then(async (exists: boolean): Promise<void> => {
Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT);

if (!exists) {
Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '* * * * * *',
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
});
}

return undefined;
})
.catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error));
}

private getWorkerOptions(): WorkerOptions {
return {
lockDuration: 900,
concurrency: 1,
settings: {},
};
}

private getWorkerProcessor() {
return async () => {
return await new Promise<void>(async (resolve, reject): Promise<void> => {
Logger.verbose('metric job started', LOG_CONTEXT);
const deploymentName = process.env.FLEET_NAME ?? 'default';

try {
for (const queueService of this.tokenList) {
const waitCount = await queueService.bullMqService.queue.getWaitingCount();
const delayedCount = await queueService.bullMqService.queue.getDelayedCount();
const activeCount = await queueService.bullMqService.queue.getActiveCount();

Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length);
Logger.log('Recording active, waiting, and delayed metrics');

const nr = require('newrelic');
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/waiting`, waitCount);
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/delayed`, delayedCount);
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/active`, activeCount);

Logger.verbose(`Queue/${deploymentName}/${queueService.topic}/waiting`, waitCount);
Logger.verbose(`Queue/${deploymentName}/${queueService.topic}/delayed`, delayedCount);
Logger.verbose(`Queue/${deploymentName}/${queueService.topic}/active`, activeCount);
}

return resolve();
} catch (error) {
Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT);

return reject(error);
}
});
};
}

public async gracefulShutdown(): Promise<void> {
Logger.log('Shutting the Active Jobs Metric service down', LOG_CONTEXT);

await this.activeJobsMetricQueueService.gracefulShutdown();
await this.activeJobsMetricWorkerService.gracefulShutdown();

Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT);
}

async onModuleDestroy(): Promise<void> {
await this.gracefulShutdown();
}
}
Loading