-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(infra): add active and completed job metrics with dedicated queues
Co-Author: Zac Clifton <[email protected]>
- Loading branch information
1 parent
de68b5a
commit ea934b2
Showing
30 changed files
with
861 additions
and
391 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
NODE_ENV=local | ||
PORT=3004 | ||
FLEET_NAME="default" | ||
STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>" | ||
|
||
# Novu | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import { Logger } from '@nestjs/common'; | ||
|
||
const url = process.env.CRON_CHECKING_URL | ||
? process.env.CRON_CHECKING_URL | ||
: 'https://uptime.betterstack.com/api/v1/heartbeat/'; | ||
const LOG_CONTEXT = 'cronHealth'; | ||
export async function checkingForCronJob(cronId?: string) { | ||
if (process.env.NOVU_MANAGED_SERVICE && process.env.NODE_ENV === 'production' && cronId && url) { | ||
Logger.verbose(`Calling health endpoint for ${cronId}`); | ||
|
||
const response = await fetch(url + cronId); | ||
|
||
if (response.status !== 200) { | ||
Logger.error(`Failed calling better Uptime: ${response.status}`, LOG_CONTEXT); | ||
} else { | ||
Logger.verbose(`Response from better Uptime: ${response.status}`, LOG_CONTEXT); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
export * from './constants'; | ||
export * from './exceptions'; | ||
export * from './hmac'; | ||
export * from './cron-health'; |
122 changes: 122 additions & 0 deletions
122
apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import { Test, TestingModule } from '@nestjs/testing'; | ||
import { expect } from 'chai'; | ||
|
||
import { | ||
ActiveJobsMetricQueueService, | ||
ActiveJobsMetricWorkerService, | ||
StandardQueueService, | ||
WebSocketsQueueService, | ||
WorkflowQueueService, | ||
} from '@novu/application-generic'; | ||
|
||
import { ActiveJobsMetricService } from './active-jobs-metric.service'; | ||
|
||
import { WorkflowModule } from '../workflow.module'; | ||
|
||
let activeJobsMetricService: ActiveJobsMetricService; | ||
let standardService: StandardQueueService; | ||
let webSocketsQueueService: WebSocketsQueueService; | ||
let workflowQueueService: WorkflowQueueService; | ||
let moduleRef: TestingModule; | ||
|
||
before(async () => { | ||
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; | ||
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; | ||
|
||
moduleRef = await Test.createTestingModule({ | ||
imports: [WorkflowModule], | ||
}).compile(); | ||
|
||
standardService = moduleRef.get<StandardQueueService>(StandardQueueService); | ||
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService); | ||
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService); | ||
}); | ||
|
||
describe('Active Jobs Metric Service', () => { | ||
before(async () => { | ||
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; | ||
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false'; | ||
|
||
moduleRef = await Test.createTestingModule({ | ||
imports: [WorkflowModule], | ||
}).compile(); | ||
|
||
standardService = moduleRef.get<StandardQueueService>(StandardQueueService); | ||
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService); | ||
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService); | ||
|
||
const activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); | ||
const activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService(); | ||
|
||
activeJobsMetricService = new ActiveJobsMetricService([ | ||
standardService, | ||
webSocketsQueueService, | ||
workflowQueueService, | ||
]); | ||
}); | ||
|
||
describe('Environment variables not set', () => { | ||
beforeEach(() => { | ||
process.env.NOVU_MANAGED_SERVICE = 'false'; | ||
process.env.NEW_RELIC_LICENSE_KEY = ''; | ||
|
||
activeJobsMetricService = new ActiveJobsMetricService([ | ||
standardService, | ||
webSocketsQueueService, | ||
workflowQueueService, | ||
]); | ||
}); | ||
|
||
it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => { | ||
expect(activeJobsMetricService).to.be.ok; | ||
expect(activeJobsMetricService).to.have.all.keys('tokenList'); | ||
expect(await activeJobsMetricService.activeJobsMetricQueueService).to.not.be.ok; | ||
expect(await activeJobsMetricService.activeJobsMetricWorkerService).to.not.be.ok; | ||
}); | ||
}); | ||
|
||
describe('Environment variables configured', () => { | ||
beforeEach(async () => { | ||
process.env.NOVU_MANAGED_SERVICE = 'true'; | ||
process.env.NEW_RELIC_LICENSE_KEY = 'license'; | ||
|
||
activeJobsMetricService = new ActiveJobsMetricService([ | ||
standardService, | ||
webSocketsQueueService, | ||
workflowQueueService, | ||
]); | ||
}); | ||
|
||
after(async () => { | ||
await activeJobsMetricService.activeJobsMetricQueueService.queue.drain(); | ||
await activeJobsMetricService.gracefulShutdown(); | ||
}); | ||
|
||
it('should be initialised properly', async () => { | ||
expect(activeJobsMetricService).to.be.ok; | ||
expect(activeJobsMetricService).to.have.all.keys( | ||
'activeJobsMetricQueueService', | ||
'activeJobsMetricWorkerService', | ||
'tokenList' | ||
); | ||
expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({ | ||
queueIsPaused: false, | ||
queueName: 'metric-active-jobs', | ||
workerName: undefined, | ||
workerIsPaused: undefined, | ||
workerIsRunning: undefined, | ||
}); | ||
expect(await activeJobsMetricService.activeJobsMetricWorkerService.bullMqService.getStatus()).to.deep.equal({ | ||
queueIsPaused: undefined, | ||
queueName: undefined, | ||
workerName: 'metric-active-jobs', | ||
workerIsPaused: false, | ||
workerIsRunning: true, | ||
}); | ||
expect(activeJobsMetricService.activeJobsMetricWorkerService.worker.opts).to.deep.include({ | ||
concurrency: 1, | ||
lockDuration: 900, | ||
}); | ||
}); | ||
}); | ||
}); |
141 changes: 141 additions & 0 deletions
141
apps/worker/src/app/workflow/services/active-jobs-metric.service.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
import { | ||
ActiveJobsMetricQueueService, | ||
ActiveJobsMetricWorkerService, | ||
QueueBaseService, | ||
WorkerOptions, | ||
} from '@novu/application-generic'; | ||
import * as process from 'process'; | ||
import { JobTopicNameEnum } from '@novu/shared'; | ||
|
||
import { Inject, Injectable, Logger } from '@nestjs/common'; | ||
|
||
import { checkingForCronJob } from '../../shared/utils'; | ||
|
||
const LOG_CONTEXT = 'ActiveJobMetricService'; | ||
const METRIC_JOB_ID = 'metric-job'; | ||
|
||
@Injectable() | ||
export class ActiveJobsMetricService { | ||
public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService; | ||
public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService; | ||
|
||
constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[]) { | ||
if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) { | ||
this.activeJobsMetricQueueService = new ActiveJobsMetricQueueService(); | ||
this.activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService(); | ||
|
||
this.activeJobsMetricQueueService.createQueue(); | ||
this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions()); | ||
|
||
this.activeJobsMetricWorkerService.worker.on('completed', async (job) => { | ||
await checkingForCronJob(process.env.ACTIVE_CRON_ID); | ||
Logger.verbose({ jobId: job.id }, 'Metric Completed Job', LOG_CONTEXT); | ||
}); | ||
|
||
this.activeJobsMetricWorkerService.worker.on('failed', async (job, error) => { | ||
Logger.verbose('Metric Completed Job failed', LOG_CONTEXT, error); | ||
}); | ||
|
||
this.addToQueueIfMetricJobExists(); | ||
} | ||
} | ||
|
||
private addToQueueIfMetricJobExists(): void { | ||
Promise.resolve( | ||
this.activeJobsMetricQueueService.queue.getRepeatableJobs().then((job): boolean => { | ||
let exists = false; | ||
for (const jobElement of job) { | ||
if (jobElement.id === METRIC_JOB_ID) { | ||
exists = true; | ||
} | ||
} | ||
|
||
return exists; | ||
}) | ||
) | ||
.then(async (exists: boolean): Promise<void> => { | ||
Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT); | ||
|
||
if (!exists) { | ||
Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); | ||
|
||
return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { | ||
jobId: METRIC_JOB_ID, | ||
repeatJobKey: METRIC_JOB_ID, | ||
repeat: { | ||
immediately: true, | ||
pattern: '* * * * * *', | ||
}, | ||
removeOnFail: true, | ||
removeOnComplete: true, | ||
attempts: 1, | ||
}); | ||
} | ||
|
||
return undefined; | ||
}) | ||
.catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error)); | ||
} | ||
|
||
private getWorkerOptions(): WorkerOptions { | ||
return { | ||
lockDuration: 900, | ||
concurrency: 1, | ||
settings: {}, | ||
}; | ||
} | ||
|
||
private getWorkerProcessor() { | ||
return async () => { | ||
return await new Promise<void>(async (resolve, reject): Promise<void> => { | ||
Logger.verbose('metric job started', LOG_CONTEXT); | ||
const deploymentName = process.env.FLEET_NAME ?? 'default'; | ||
|
||
try { | ||
for (const queueService of this.tokenList) { | ||
const waitCount = await queueService.bullMqService.queue.getWaitingCount(); | ||
const delayedCount = await queueService.bullMqService.queue.getDelayedCount(); | ||
const activeCount = await queueService.bullMqService.queue.getActiveCount(); | ||
|
||
Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length); | ||
Logger.log('Recording active, waiting, and delayed metrics'); | ||
|
||
const nr = require('newrelic'); | ||
nr.recordMetric(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount); | ||
nr.recordMetric( | ||
`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`, | ||
delayedCount | ||
); | ||
nr.recordMetric(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, activeCount); | ||
|
||
Logger.verbose(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/waiting`, waitCount); | ||
Logger.verbose( | ||
`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/delayed`, | ||
delayedCount | ||
); | ||
Logger.verbose(`ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}/active`, activeCount); | ||
} | ||
|
||
return resolve(); | ||
} catch (error) { | ||
Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT); | ||
|
||
return reject(error); | ||
} | ||
}); | ||
}; | ||
} | ||
|
||
public async gracefulShutdown(): Promise<void> { | ||
Logger.log('Shutting the Active Jobs Metric service down', LOG_CONTEXT); | ||
|
||
await this.activeJobsMetricQueueService.gracefulShutdown(); | ||
await this.activeJobsMetricWorkerService.gracefulShutdown(); | ||
|
||
Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT); | ||
} | ||
|
||
async onModuleDestroy(): Promise<void> { | ||
await this.gracefulShutdown(); | ||
} | ||
} |
Oops, something went wrong.