Skip to content

Commit

Permalink
Tune queue limit (#2488)
Browse files Browse the repository at this point in the history
* Tune queue limit

* clean wait jons too
  • Loading branch information
mei23 authored Sep 30, 2023
1 parent 82cde8b commit 3c4c0a0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .config/example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ autoAdmin: true
# Number of worker processes
#clusterLimit: 1

# Job concurrency per worker
# Job concurrency per worker (Default: deliver=cpu x 8, inbox=cpu x 1)
# deliverJobConcurrency: 128
# inboxJobConcurrency: 16

# Job rate limiter
# Job rate limiter (Default: nolimit, 変更しないことを推奨)
# deliverJobPerSec: 128
# inboxJobPerSec: 16

Expand Down
10 changes: 8 additions & 2 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { DriveFile } from '../models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { IActivity } from '../remote/activitypub/type';
import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
import { cpus } from 'os';

function renderError(e: Error): any {
return {
Expand All @@ -27,6 +28,9 @@ const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');

export const deliverJobConcurrency = config.deliverJobConcurrency || ((cpus().length || 4) * 8);
export const inboxJobConcurrency = config.inboxJobConcurrency || ((cpus().length || 4) * 1);

deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
Expand Down Expand Up @@ -207,8 +211,8 @@ export function createCleanRemoteFilesJob() {

export default function() {
if (!envOption.onlyServer) {
deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
deliverQueue.process(deliverJobConcurrency, processDeliver);
inboxQueue.process(inboxJobConcurrency, processInbox);
processDb(dbQueue);
procesObjectStorage(objectStorageQueue);
}
Expand All @@ -219,9 +223,11 @@ export function destroy() {
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
deliverQueue.clean(0, 'delayed');
deliverQueue.clean(0, 'wait');

inboxQueue.once('cleaned', (jobs, status) => {
inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
inboxQueue.clean(0, 'delayed');
inboxQueue.clean(0, 'wait');
}
4 changes: 2 additions & 2 deletions src/queue/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import config from '../config';
import { initialize as initializeQueue } from './initialize';
import { DbJobData, DeliverJobData, InboxJobData, ObjectStorageJobData } from './types';

export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16);
export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || -1);
export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || -1);
export const dbQueue = initializeQueue<DbJobData>('db');
export const objectStorageQueue = initializeQueue<ObjectStorageJobData>('objectStorage');

0 comments on commit 3c4c0a0

Please sign in to comment.