From 3c4c0a0126225454430d0f1d6bdc0856c42fd7df Mon Sep 17 00:00:00 2001 From: MeiMei <30769358+mei23@users.noreply.github.com> Date: Sat, 30 Sep 2023 21:59:03 +0900 Subject: [PATCH] Tune queue limit (#2488) * Tune queue limit * clean wait jons too --- .config/example.yml | 4 ++-- src/queue/index.ts | 10 ++++++++-- src/queue/queues.ts | 4 ++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.config/example.yml b/.config/example.yml index 6810c9b919..bd16a9ae30 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -95,11 +95,11 @@ autoAdmin: true # Number of worker processes #clusterLimit: 1 -# Job concurrency per worker +# Job concurrency per worker (Default: deliver=cpu x 8, inbox=cpu x 1) # deliverJobConcurrency: 128 # inboxJobConcurrency: 16 -# Job rate limiter +# Job rate limiter (Default: nolimit, 変更しないことを推奨) # deliverJobPerSec: 128 # inboxJobPerSec: 16 diff --git a/src/queue/index.ts b/src/queue/index.ts index 7638745fd8..91213f10c0 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -13,6 +13,7 @@ import { DriveFile } from '../models/entities/drive-file'; import { getJobInfo } from './get-job-info'; import { IActivity } from '../remote/activitypub/type'; import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; +import { cpus } from 'os'; function renderError(e: Error): any { return { @@ -27,6 +28,9 @@ const inboxLogger = queueLogger.createSubLogger('inbox'); const dbLogger = queueLogger.createSubLogger('db'); const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); +export const deliverJobConcurrency = config.deliverJobConcurrency || ((cpus().length || 4) * 8); +export const inboxJobConcurrency = config.inboxJobConcurrency || ((cpus().length || 4) * 1); + deliverQueue .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) @@ -207,8 +211,8 @@ export function createCleanRemoteFilesJob() { export default function() { if (!envOption.onlyServer) { - deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); - inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); + deliverQueue.process(deliverJobConcurrency, processDeliver); + inboxQueue.process(inboxJobConcurrency, processInbox); processDb(dbQueue); procesObjectStorage(objectStorageQueue); } @@ -219,9 +223,11 @@ export function destroy() { deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); deliverQueue.clean(0, 'delayed'); + deliverQueue.clean(0, 'wait'); inboxQueue.once('cleaned', (jobs, status) => { inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); inboxQueue.clean(0, 'delayed'); + inboxQueue.clean(0, 'wait'); } diff --git a/src/queue/queues.ts b/src/queue/queues.ts index ea5f6cf682..28fdc0730a 100644 --- a/src/queue/queues.ts +++ b/src/queue/queues.ts @@ -2,7 +2,7 @@ import config from '../config'; import { initialize as initializeQueue } from './initialize'; import { DbJobData, DeliverJobData, InboxJobData, ObjectStorageJobData } from './types'; -export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); -export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); +export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || -1); +export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || -1); export const dbQueue = initializeQueue('db'); export const objectStorageQueue = initializeQueue('objectStorage');