Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Agents Metrics Accurate Again #4060

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,9 @@
"nextjs",
"vanillajs",
"quckstart",
"errmsg"
"errmsg",
"CHECKIN",
"checkin"
],
"flagWords": [],
"patterns": [
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/.env.production
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=production
PORT=3004
FLEET_NAME="default"
# STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/.example.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
NODE_ENV=local
PORT=3004
FLEET_NAME="default"
STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>"

# Novu
Expand Down
14 changes: 14 additions & 0 deletions apps/worker/src/app/shared/utils/cron-health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Logger } from '@nestjs/common';

// Base URL of the heartbeat endpoint. Uses CRON_CHECKIN_URL when it is set to a non-empty
// value, otherwise falls back to the Better Stack heartbeat API. The cron id is appended
// verbatim, so a custom URL must already end with whatever separator it needs.
const url = process.env.CRON_CHECKIN_URL
  ? process.env.CRON_CHECKIN_URL
  : 'https://uptime.betterstack.com/api/v1/heartbeat/';
const LOG_CONTEXT = 'cronHealth';

/**
 * Pings the heartbeat endpoint for the given cron job.
 *
 * The call is only made when NOVU_MANAGED_SERVICE is exactly 'true', NODE_ENV is
 * 'production' and a non-empty `cronId` is supplied; in every other case the function
 * returns without doing anything. Network failures are logged and swallowed so that a
 * monitoring outage never fails the caller.
 *
 * @param cronId - Identifier appended to the heartbeat URL; when omitted no request is sent.
 * @returns A promise that resolves once the check-in attempt has finished (it never rejects
 *          because of a failed request).
 */
export async function checkinForCronJob(cronId?: string): Promise<void> {
  // Compare against the literal 'true' (as the metric services do) so that
  // NOVU_MANAGED_SERVICE=false does not accidentally enable check-ins.
  const isManagedProduction =
    process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NODE_ENV === 'production';

  if (!isManagedProduction || !cronId) {
    return;
  }

  Logger.verbose(`Calling health endpoint for ${cronId}`, LOG_CONTEXT);

  try {
    const response = await fetch(url + cronId);
    Logger.debug(`Response from better Uptime: ${response.status}`, LOG_CONTEXT);
  } catch (error) {
    // Best-effort: the heartbeat is auxiliary, so log and carry on.
    Logger.error('Failed calling better Uptime', error, LOG_CONTEXT);
  }
}
1 change: 1 addition & 0 deletions apps/worker/src/app/shared/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './constants';
export * from './exceptions';
export * from './hmac';
export * from './cron-health';
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';

import { MetricQueueService } from './metric-queue.service';
import { MetricQueueActiveService } from './metric-queue-active.service';

import { WorkflowModule } from '../workflow.module';

let metricQueueService: MetricQueueService;
let metricQueueService: MetricQueueActiveService;

describe('Metric Queue service', () => {
beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

metricQueueService = moduleRef.get<MetricQueueService>(MetricQueueService);
metricQueueService = moduleRef.get<MetricQueueActiveService>(MetricQueueActiveService);
});

afterEach(async () => {
Expand All @@ -24,7 +24,7 @@ describe('Metric Queue service', () => {
expect(metricQueueService).to.be.ok;
expect(metricQueueService).to.have.all.keys('DEFAULT_ATTEMPTS', 'bullMqService', 'name', 'token_list');
expect(await metricQueueService.bullMqService.getRunningStatus()).to.deep.include({
queueName: 'metric',
queueName: 'metric-complete',
workerName: 'metric',
});
expect(metricQueueService.bullMqService.worker.opts).to.deep.include({
Expand Down
115 changes: 115 additions & 0 deletions apps/worker/src/app/workflow/services/metric-queue-active.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { WorkerOptions } from 'bullmq';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { QueueService } from '@novu/application-generic';
import { checkinForCronJob } from '../../shared/utils';
import * as process from 'process';
import { JobTopicNameEnum } from '@novu/shared';

const LOG_CONTEXT = 'MetricQueueService';
const METRIC_JOB_ID = 'metric-job';

/**
 * Queue service that periodically samples the waiting, delayed and active job counts of
 * every queue service injected through `BULLMQ_LIST`.
 *
 * On construction it creates a worker for the `METRICS_ACTIVE` topic, wires the worker's
 * 'completed' / 'failed' events to logging (and a cron check-in on completion), and makes
 * sure a repeatable job with id `METRIC_JOB_ID` exists to drive the sampling.
 *
 * When NOVU_MANAGED_SERVICE is 'true' and NEW_RELIC_LICENSE_KEY is set, the counts are
 * recorded as New Relic metrics named `Queue/<FLEET_NAME>/<queue name>/<state>`; otherwise
 * they are only written to the debug log.
 */
@Injectable()
export class MetricQueueActiveService extends QueueService<Record<string, never>> {
  /**
   * @param token_list - Queue services whose queues are sampled on every metric job run.
   */
  constructor(@Inject('BULLMQ_LIST') private token_list: QueueService[]) {
    super(JobTopicNameEnum.METRICS_ACTIVE);

    this.bullMqService.createWorker(this.name, this.getWorkerProcessor(), this.getWorkerOpts());

    this.bullMqService.worker.on('completed', async (job) => {
      await checkinForCronJob(process.env.ACTIVE_CRON_ID);
      Logger.verbose('Metric job Completed', job.id, LOG_CONTEXT);
    });

    // Failures are logged at error level so they remain visible when verbose logging is off.
    this.bullMqService.worker.on('failed', (_job, error) => {
      Logger.error('Metric job failed', error, LOG_CONTEXT);
    });

    this.addToQueueIfMetricJobExists();
  }

  /**
   * Fire-and-forget: registers the repeatable metric job unless one with the same id is
   * already present. Errors are logged rather than propagated, because this is invoked
   * from the constructor where a rejected promise could not be awaited.
   */
  private addToQueueIfMetricJobExists(): void {
    this.ensureRepeatableMetricJob().catch((error) =>
      Logger.error('Metric Job Exists function errored', error, LOG_CONTEXT)
    );
  }

  /** Looks up the queue's repeatable jobs and adds the metric job when it is missing. */
  private async ensureRepeatableMetricJob(): Promise<void> {
    const repeatableJobs = await this.bullMqService.queue.getRepeatableJobs();
    const exists = repeatableJobs.some((repeatableJob) => repeatableJob.id === METRIC_JOB_ID);

    if (exists) {
      Logger.debug('metric job exists: ' + exists, LOG_CONTEXT);

      return;
    }

    Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

    await this.addToQueue(METRIC_JOB_ID, undefined, '', {
      jobId: METRIC_JOB_ID,
      repeatJobKey: METRIC_JOB_ID,
      repeat: {
        immediately: true,
        // NOTE(review): six-field pattern - presumably the leading field is seconds, i.e. the
        // job fires every second. Confirm against the cron parser used by BullMQ.
        pattern: '* * * * * *',
      },
      removeOnFail: true,
      removeOnComplete: true,
      attempts: 1,
    });
  }

  /** Worker options: a single job at a time with a short lock. */
  private getWorkerOpts(): WorkerOptions {
    return {
      // NOTE(review): presumably milliseconds - confirm against BullMQ's WorkerOptions.
      lockDuration: 900,
      concurrency: 1,
      settings: {},
    } as WorkerOptions;
  }

  /**
   * Builds the processor executed for every metric job.
   *
   * @returns An async function that samples each queue in `token_list` and either records
   *          the counts in New Relic or logs them; it rejects with the original error if
   *          sampling or recording fails.
   */
  private getWorkerProcessor() {
    return async (): Promise<void> => {
      Logger.verbose('metric job started', LOG_CONTEXT);
      const deploymentName = process.env.FLEET_NAME ?? 'default';
      const licenseKey = process.env.NEW_RELIC_LICENSE_KEY;
      const isManagedService = process.env.NOVU_MANAGED_SERVICE === 'true';

      try {
        for (const queueService of this.token_list) {
          const { queue } = queueService.bullMqService;

          // The three counts are independent, so fetch them concurrently.
          const [waitCount, delayedCount, activeCount] = await Promise.all([
            queue.getWaitingCount(),
            queue.getDelayedCount(),
            queue.getActiveCount(),
          ]);

          const metricPrefix = `Queue/${deploymentName}/${queueService.name}`;

          // NOTE(review): a reviewer asked whether this managed-service condition should move
          // to service construction so that nothing is instantiated for self-hosted users.
          // The author was unsure and asked for guidance; their rationale for keeping it here
          // is that open-source deployments still get the counts written to the log, so
          // operators can scrape the data themselves. The question was left open.
          if (isManagedService && licenseKey) {
            Logger.verbose('active length', licenseKey.length);
            Logger.log('Recording active, waiting, and delayed metrics');

            // Loaded lazily so the agent is only required when metrics are actually reported.
            const nr = require('newrelic');
            nr.recordMetric(`${metricPrefix}/waiting`, waitCount);
            nr.recordMetric(`${metricPrefix}/delayed`, delayedCount);
            nr.recordMetric(`${metricPrefix}/active`, activeCount);

            Logger.verbose(`${metricPrefix}/waiting`, waitCount);
            Logger.verbose(`${metricPrefix}/delayed`, delayedCount);
            Logger.verbose(`${metricPrefix}/active`, activeCount);
          } else {
            Logger.debug(`${metricPrefix}/waiting`, waitCount);
            Logger.debug(`${metricPrefix}/delayed`, delayedCount);
            Logger.debug(`${metricPrefix}/active`, activeCount);
          }
        }
      } catch (error) {
        Logger.error('Error occurred while processing metrics', { error }, LOG_CONTEXT);

        // Re-throw so the worker marks the job as failed.
        throw error;
      }
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';

import { MetricQueueActiveService } from './metric-queue-active.service';

import { WorkflowModule } from '../workflow.module';

// Service instance under test; re-created for every test case.
let metricQueueService: MetricQueueActiveService;

describe('Metric Queue service', () => {
  beforeEach(async () => {
    const moduleRef = await Test.createTestingModule({
      imports: [WorkflowModule],
    }).compile();

    metricQueueService = moduleRef.get<MetricQueueActiveService>(MetricQueueActiveService);
  });

  afterEach(async () => {
    // Shut the service down after each test so every case starts from a fresh instance.
    await metricQueueService.gracefulShutdown();
  });

  it('should be initialised properly', async () => {
    expect(metricQueueService).to.be.ok;
    expect(metricQueueService).to.have.all.keys('DEFAULT_ATTEMPTS', 'bullMqService', 'name', 'token_list');
    // NOTE(review): 'metric-complete' is expected here although the service under test is
    // constructed with JobTopicNameEnum.METRICS_ACTIVE - confirm the enum value and adjust
    // queueName/workerName if they differ.
    expect(await metricQueueService.bullMqService.getRunningStatus()).to.deep.include({
      queueName: 'metric-complete',
      workerName: 'metric',
    });
    // Must match the options MetricQueueActiveService passes to createWorker
    // (concurrency 1, lockDuration 900).
    expect(metricQueueService.bullMqService.worker.opts).to.deep.include({
      concurrency: 1,
      lockDuration: 900,
    });
    expect(metricQueueService.bullMqService.worker.opts.connection).to.deep.include({
      host: 'localhost',
      port: 6379,
    });
  });
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { WorkerOptions } from 'bullmq';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { QueueService } from '@novu/application-generic';
import { checkinForCronJob } from '../../shared/utils';
import process from 'process';
import { JobTopicNameEnum } from '@novu/shared';

const LOG_CONTEXT = 'MetricQueueService';
const METRIC_JOB_ID = 'metric-job';

/**
 * Queue service that periodically reads the completed and failed job metrics of every
 * queue service injected through `BULLMQ_LIST`.
 *
 * On construction it creates a worker for the `METRICS_COMPLETED` topic, wires the worker's
 * 'completed' / 'failed' events to logging (and a cron check-in on completion), and makes
 * sure a repeatable job with id `METRIC_JOB_ID` exists to drive the sampling.
 *
 * When NOVU_MANAGED_SERVICE is 'true' and NEW_RELIC_LICENSE_KEY is set to a non-empty
 * value, the counts are recorded as New Relic metrics named
 * `Queue/<FLEET_NAME>/<queue name>/<state>`; otherwise they are only written to the debug log.
 */
@Injectable()
export class MetricQueueCompletedService extends QueueService<Record<string, never>> {
  /**
   * @param token_list - Queue services whose metrics are read on every metric job run.
   */
  constructor(@Inject('BULLMQ_LIST') private token_list: QueueService[]) {
    super(JobTopicNameEnum.METRICS_COMPLETED);

    this.bullMqService.createWorker(this.name, this.getWorkerProcessor(), this.getWorkerOpts());

    this.bullMqService.worker.on('completed', async () => {
      await checkinForCronJob(process.env.COMPLETE_CRON_ID);
      Logger.verbose(`metric job succeeded`, LOG_CONTEXT);
    });

    // Pass the log context (not the job object) as the trailing logger argument.
    this.bullMqService.worker.on('failed', (_job, error) => {
      Logger.error(`job failed to start: ${error}`, LOG_CONTEXT);
    });

    this.addToQueueIfMetricJobExists();
  }

  /**
   * Fire-and-forget: registers the repeatable metric job unless one with the same id is
   * already present. Errors are logged rather than propagated, because this is invoked
   * from the constructor where a rejected promise could not be awaited.
   */
  private addToQueueIfMetricJobExists(): void {
    this.ensureRepeatableMetricJob().catch((error) =>
      Logger.error('Metric Job Exists function errored', error, LOG_CONTEXT)
    );
  }

  /** Looks up the queue's repeatable jobs and adds the metric job when it is missing. */
  private async ensureRepeatableMetricJob(): Promise<void> {
    const repeatableJobs = await this.bullMqService.queue.getRepeatableJobs();
    const exists = repeatableJobs.some((repeatableJob) => repeatableJob.id === METRIC_JOB_ID);

    if (exists) {
      Logger.debug('metric job exists: ' + exists, LOG_CONTEXT);

      return;
    }

    Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

    await this.addToQueue(METRIC_JOB_ID, undefined, '', {
      jobId: METRIC_JOB_ID,
      repeatJobKey: METRIC_JOB_ID,
      repeat: {
        immediately: true,
        // Standard five-field cron expression: run once every minute.
        pattern: '* * * * *',
      },
      removeOnFail: true,
      removeOnComplete: true,
      attempts: 1,
    });
  }

  /** Worker options: a single job at a time with a short lock. */
  private getWorkerOpts(): WorkerOptions {
    return {
      // NOTE(review): presumably milliseconds - confirm against BullMQ's WorkerOptions.
      lockDuration: 900,
      concurrency: 1,
      settings: {},
    } as WorkerOptions;
  }

  /**
   * Builds the processor executed for every metric job.
   *
   * @returns An async function that reads each queue's completed/failed metrics and either
   *          records them in New Relic or logs them; it rejects with the original error if
   *          reading or recording fails.
   */
  private getWorkerProcessor() {
    return async (): Promise<void> => {
      Logger.verbose('metric job started', LOG_CONTEXT);
      const deploymentName = process.env.FLEET_NAME ?? 'default';
      // Read once and test for truthiness: dereferencing `.length` on an unset variable
      // would throw and fail every job when the managed-service flag is on without a key.
      const licenseKey = process.env.NEW_RELIC_LICENSE_KEY;
      const isManagedService = process.env.NOVU_MANAGED_SERVICE === 'true';

      try {
        for (const queueService of this.token_list) {
          const metrics = await queueService.bullMqService.getQueueMetrics(0, 1);

          const completeNumber = metrics.completed.count;
          const failNumber = metrics.failed.count;
          const metricPrefix = `Queue/${deploymentName}/${queueService.name}`;

          if (isManagedService && licenseKey) {
            Logger.verbose('completed length', licenseKey.length);
            Logger.log('Recording metrics');

            // Loaded lazily so the agent is only required when metrics are actually reported.
            const nr = require('newrelic');
            nr.recordMetric(`${metricPrefix}/completed`, completeNumber);
            nr.recordMetric(`${metricPrefix}/failed`, failNumber);

            Logger.verbose(`${metricPrefix}/completed`, completeNumber);
            Logger.verbose(`${metricPrefix}/failed`, failNumber);
          } else {
            Logger.debug(`${metricPrefix}/completed`, completeNumber);
            Logger.debug(`${metricPrefix}/failed`, failNumber);
          }
        }
      } catch (error) {
        Logger.error('Error occurred while processing metrics', { error }, LOG_CONTEXT);

        // Re-throw so the worker marks the job as failed.
        throw error;
      }
    };
  }
}
Loading