-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(infra): add active and completed job metrics with dedicated queues
Co-Author: Zac Clifton <[email protected]>
- Loading branch information
1 parent
de68b5a
commit 0edc37f
Showing
30 changed files
with
806 additions
and
391 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
NODE_ENV=local | ||
PORT=3004 | ||
FLEET_NAME="default" | ||
STORE_ENCRYPTION_KEY="<ENCRYPTION_KEY_MUST_BE_32_LONG>" | ||
|
||
# Novu | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import { Logger } from '@nestjs/common'; | ||
|
||
/**
 * Base URL of the heartbeat endpoint. The cron id is appended to it verbatim,
 * so a custom CRON_CHECKING_URL is expected to end with a path separator.
 */
const url = process.env.CRON_CHECKING_URL || 'https://uptime.betterstack.com/api/v1/heartbeat/';
const LOG_CONTEXT = 'cronHealth';

/**
 * Reports a heartbeat for the given cron job to the uptime monitoring endpoint.
 *
 * The call is only made when NOVU_MANAGED_SERVICE is set, NODE_ENV is
 * 'production' and a cron id is provided; otherwise the function is a no-op.
 * The heartbeat is best-effort: a non-200 response or a network failure is
 * logged and never thrown, so callers (for example event listeners) cannot be
 * broken by the monitoring service being unreachable.
 *
 * @param cronId identifier appended to the heartbeat URL; nothing is sent when omitted or empty
 */
export async function checkingForCronJob(cronId?: string) {
  // NOTE(review): NOVU_MANAGED_SERVICE is checked for truthiness here, so the string 'false'
  // also enables the heartbeat. Other code compares against 'true' — confirm which is intended.
  const isEnabled = process.env.NOVU_MANAGED_SERVICE && process.env.NODE_ENV === 'production';

  if (!isEnabled || !cronId || !url) {
    return;
  }

  Logger.verbose(`Calling health endpoint for ${cronId}`, LOG_CONTEXT);

  try {
    const response = await fetch(url + cronId);

    if (response.status !== 200) {
      Logger.error(`Failed calling better Uptime: ${response.status}`, LOG_CONTEXT);
    } else {
      Logger.verbose(`Response from better Uptime: ${response.status}`, LOG_CONTEXT);
    }
  } catch (error) {
    // fetch rejects on network-level failures (DNS, connection reset, ...); a heartbeat must not
    // propagate those to the caller.
    Logger.error({ error }, 'Failed calling better Uptime', LOG_CONTEXT);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
export * from './constants'; | ||
export * from './exceptions'; | ||
export * from './hmac'; | ||
export * from './cron-health'; |
72 changes: 72 additions & 0 deletions
72
apps/worker/src/app/workflow/services/active-jobs-metric.service.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import { Test } from '@nestjs/testing'; | ||
import { expect } from 'chai'; | ||
|
||
import { | ||
ActiveJobsMetricQueueService, | ||
ActiveJobsMetricWorkerService, | ||
StandardQueueService, | ||
WebSocketsQueueService, | ||
WorkflowQueueService, | ||
} from '@novu/application-generic'; | ||
|
||
import { ActiveJobsMetricService } from './active-jobs-metric.service'; | ||
|
||
import { WorkflowModule } from '../workflow.module'; | ||
|
||
let activeJobsMetricService: ActiveJobsMetricService;

/**
 * Builds an ActiveJobsMetricService by hand on top of the queue services resolved from the
 * WorkflowModule and verifies the state of its dedicated queue and worker.
 */
describe('Active Jobs Metric Service', () => {
  before(async () => {
    // Both spellings of the flag are set to 'false' before the module is compiled.
    process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
    process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

    const testingModule = await Test.createTestingModule({ imports: [WorkflowModule] }).compile();

    // Queues whose counts the metric service reads.
    const standardQueue = testingModule.get<StandardQueueService>(StandardQueueService);
    const webSocketsQueue = testingModule.get<WebSocketsQueueService>(WebSocketsQueueService);
    const workflowQueue = testingModule.get<WorkflowQueueService>(WorkflowQueueService);

    const metricQueue = new ActiveJobsMetricQueueService();
    const metricWorker = new ActiveJobsMetricWorkerService();

    activeJobsMetricService = new ActiveJobsMetricService(
      [standardQueue, webSocketsQueue, workflowQueue],
      metricQueue,
      metricWorker
    );
  });

  after(async () => {
    // Empty the metric queue first, then stop the queue and the worker.
    await activeJobsMetricService.activeJobsMetricQueueService.queue.drain();
    await activeJobsMetricService.gracefulShutdown();
  });

  it('should be initialised properly', async () => {
    const {
      activeJobsMetricQueueService: metricQueueService,
      activeJobsMetricWorkerService: metricWorkerService,
    } = activeJobsMetricService;

    // The queue-side service reports only queue fields ...
    const expectedQueueStatus = {
      queueIsPaused: false,
      queueName: 'metric-active-jobs',
      workerName: undefined,
      workerIsPaused: undefined,
      workerIsRunning: undefined,
    };
    // ... and the worker-side service reports only worker fields.
    const expectedWorkerStatus = {
      queueIsPaused: undefined,
      queueName: undefined,
      workerName: 'metric-active-jobs',
      workerIsPaused: false,
      workerIsRunning: true,
    };

    expect(activeJobsMetricService).to.be.ok;
    expect(activeJobsMetricService).to.have.all.keys(
      'activeJobsMetricQueueService',
      'activeJobsMetricWorkerService',
      'tokenList'
    );

    const queueStatus = await metricQueueService.bullMqService.getStatus();
    expect(queueStatus).to.deep.equal(expectedQueueStatus);

    const workerStatus = await metricWorkerService.bullMqService.getStatus();
    expect(workerStatus).to.deep.equal(expectedWorkerStatus);

    expect(metricWorkerService.worker.opts).to.deep.include({
      concurrency: 1,
      lockDuration: 900,
    });
  });
});
157 changes: 157 additions & 0 deletions
157
apps/worker/src/app/workflow/services/active-jobs-metric.service.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
import { | ||
ActiveJobsMetricQueueService, | ||
ActiveJobsMetricWorkerService, | ||
QueueBaseService, | ||
WorkerOptions, | ||
} from '@novu/application-generic'; | ||
import * as process from 'process'; | ||
import { JobTopicNameEnum } from '@novu/shared'; | ||
|
||
import { Inject, Injectable, Logger } from '@nestjs/common'; | ||
|
||
import { checkingForCronJob } from '../../shared/utils'; | ||
|
||
const LOG_CONTEXT = 'ActiveJobMetricService';
const METRIC_JOB_ID = 'metric-job';

/**
 * Runs a repeatable job on a dedicated queue that samples the waiting, delayed and active
 * job counts of every queue in `tokenList` and records them as New Relic custom metrics.
 *
 * The queue, the worker and the repeatable job are only created when NOVU_MANAGED_SERVICE is
 * 'true' and NEW_RELIC_LICENSE_KEY is set.
 *
 * No instance fields are declared besides the constructor parameters: the accompanying spec
 * asserts the exact set of own keys of this service.
 */
@Injectable()
export class ActiveJobsMetricService {
  constructor(
    @Inject('BULLMQ_LIST') private tokenList: QueueBaseService[],
    public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService,
    public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService
  ) {
    if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
      this.activeJobsMetricQueueService.createQueue();
      this.activeJobsMetricWorkerService.createWorker(this.getWorkerProcessor(), this.getWorkerOptions());

      this.activeJobsMetricWorkerService.worker.on('completed', async (job) => {
        // The listener's promise is not awaited by the emitter, so a failing heartbeat must be
        // handled here instead of surfacing as an unhandled rejection.
        try {
          await checkingForCronJob(process.env.ACTIVE_CRON_ID);
        } catch (error) {
          Logger.error({ error }, 'Failed reporting the cron heartbeat', LOG_CONTEXT);
        }
        Logger.verbose({ jobId: job.id }, 'Metric Completed Job', LOG_CONTEXT);
      });

      this.activeJobsMetricWorkerService.worker.on('failed', (job, error) => {
        Logger.error({ error, jobId: job?.id }, 'Metric Completed Job failed', LOG_CONTEXT);
      });

      // Fire-and-forget: the method handles its own errors.
      void this.addToQueueIfMetricJobExists();
    }
  }

  /**
   * Registers the repeatable metric job unless a repeatable job with METRIC_JOB_ID is already
   * registered on the metric queue. Despite its name, the job is added when it does NOT exist.
   * Errors are logged and never thrown.
   */
  private async addToQueueIfMetricJobExists(): Promise<void> {
    try {
      const repeatableJobs = await this.activeJobsMetricQueueService.queue.getRepeatableJobs();
      const exists = repeatableJobs.some((repeatableJob) => repeatableJob.id === METRIC_JOB_ID);

      Logger.debug(`metric job exists: ${exists}`, LOG_CONTEXT);

      if (exists) {
        return;
      }

      Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

      await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', {
        jobId: METRIC_JOB_ID,
        repeatJobKey: METRIC_JOB_ID,
        repeat: {
          immediately: true,
          // Six-field cron expression: fire every second.
          pattern: '* * * * * *',
        },
        removeOnFail: true,
        removeOnComplete: true,
        attempts: 1,
      });
    } catch (error) {
      Logger.error({ error }, 'Metric Job Exists function errored', LOG_CONTEXT);
    }
  }

  /** Options for the metric worker: one job at a time with a 900 ms lock duration. */
  private getWorkerOptions(): WorkerOptions {
    return {
      lockDuration: 900,
      concurrency: 1,
      settings: {},
    };
  }

  /**
   * Builds the processor executed for every run of the metric job.
   *
   * @returns an async function that samples every queue in `tokenList`; it rejects with the
   * original error (after logging it) when sampling or recording fails
   */
  private getWorkerProcessor() {
    return async (): Promise<void> => {
      Logger.verbose('metric job started', LOG_CONTEXT);
      const deploymentName = process.env.FLEET_NAME ?? 'default';

      try {
        for (const queueService of this.tokenList) {
          const { queue } = queueService.bullMqService;

          // The three counts are independent, so they are fetched concurrently.
          const [waitCount, delayedCount, activeCount] = await Promise.all([
            queue.getWaitingCount(),
            queue.getDelayedCount(),
            queue.getActiveCount(),
          ]);

          const metricPrefix = `ActiveJobsMetricQueueService/${deploymentName}/${queueService.topic}`;
          const samples: Array<[string, number]> = [
            [`${metricPrefix}/waiting`, waitCount],
            [`${metricPrefix}/delayed`, delayedCount],
            [`${metricPrefix}/active`, activeCount],
          ];

          if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
            Logger.log('Recording active, waiting, and delayed metrics', LOG_CONTEXT);

            // Loaded lazily so the agent is only required when reporting is enabled.
            const nr = require('newrelic');

            for (const [metricName, count] of samples) {
              nr.recordMetric(metricName, count);
              Logger.verbose(metricName, count, LOG_CONTEXT);
            }
          } else {
            for (const [metricName, count] of samples) {
              Logger.debug(metricName, count, LOG_CONTEXT);
            }
          }
        }
      } catch (error) {
        Logger.error({ error }, 'Error occured while processing metrics', LOG_CONTEXT);

        throw error;
      }
    };
  }

  /** Shuts down the metric queue service first and then the metric worker service. */
  public async gracefulShutdown(): Promise<void> {
    Logger.log('Shutting the Active Jobs Metric service down', LOG_CONTEXT);

    await this.activeJobsMetricQueueService.gracefulShutdown();
    await this.activeJobsMetricWorkerService.gracefulShutdown();

    Logger.log('Shutting down the Active Jobs Metric service has finished', LOG_CONTEXT);
  }

  /** Module-destroy hook; delegates to {@link gracefulShutdown}. */
  async onModuleDestroy(): Promise<void> {
    await this.gracefulShutdown();
  }
}
72 changes: 72 additions & 0 deletions
72
apps/worker/src/app/workflow/services/completed-jobs-metric.service.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import { Test } from '@nestjs/testing'; | ||
import { expect } from 'chai'; | ||
|
||
import { | ||
CompletedJobsMetricQueueService, | ||
CompletedJobsMetricWorkerService, | ||
StandardQueueService, | ||
WebSocketsQueueService, | ||
WorkflowQueueService, | ||
} from '@novu/application-generic'; | ||
|
||
import { CompletedJobsMetricService } from './completed-jobs-metric.service'; | ||
|
||
import { WorkflowModule } from '../workflow.module'; | ||
|
||
let completedJobsMetricService: CompletedJobsMetricService;

/**
 * Builds a CompletedJobsMetricService by hand on top of the queue services resolved from the
 * WorkflowModule and verifies the state of its dedicated queue and worker.
 */
describe('Completed Jobs Metric Service', () => {
  before(async () => {
    // Both spellings of the flag are set to 'false' before the module is compiled.
    process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
    process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

    const testingModule = await Test.createTestingModule({ imports: [WorkflowModule] }).compile();

    // Queues handed to the metric service as its list of monitored queues.
    const standardQueue = testingModule.get<StandardQueueService>(StandardQueueService);
    const webSocketsQueue = testingModule.get<WebSocketsQueueService>(WebSocketsQueueService);
    const workflowQueue = testingModule.get<WorkflowQueueService>(WorkflowQueueService);

    const metricQueue = new CompletedJobsMetricQueueService();
    const metricWorker = new CompletedJobsMetricWorkerService();

    completedJobsMetricService = new CompletedJobsMetricService(
      [standardQueue, webSocketsQueue, workflowQueue],
      metricQueue,
      metricWorker
    );
  });

  after(async () => {
    // Empty the metric queue first, then stop the queue and the worker.
    await completedJobsMetricService.completedJobsMetricQueueService.queue.drain();
    await completedJobsMetricService.gracefulShutdown();
  });

  it('should be initialised properly', async () => {
    const {
      completedJobsMetricQueueService: metricQueueService,
      completedJobsMetricWorkerService: metricWorkerService,
    } = completedJobsMetricService;

    // The queue-side service reports only queue fields ...
    const expectedQueueStatus = {
      queueIsPaused: false,
      queueName: 'metric-completed-jobs',
      workerName: undefined,
      workerIsPaused: undefined,
      workerIsRunning: undefined,
    };
    // ... and the worker-side service reports only worker fields.
    const expectedWorkerStatus = {
      queueIsPaused: undefined,
      queueName: undefined,
      workerName: 'metric-completed-jobs',
      workerIsPaused: false,
      workerIsRunning: true,
    };

    expect(completedJobsMetricService).to.be.ok;
    expect(completedJobsMetricService).to.have.all.keys(
      'completedJobsMetricQueueService',
      'completedJobsMetricWorkerService',
      'tokenList'
    );

    const queueStatus = await metricQueueService.bullMqService.getStatus();
    expect(queueStatus).to.deep.equal(expectedQueueStatus);

    const workerStatus = await metricWorkerService.bullMqService.getStatus();
    expect(workerStatus).to.deep.equal(expectedWorkerStatus);

    expect(metricWorkerService.worker.opts).to.deep.include({
      concurrency: 1,
      lockDuration: 900,
    });
  });
});
Oops, something went wrong.