Skip to content

Commit

Permalink
BullMQ + DragonflyDB で Hashtag を使用しすべてをロックしないようにする
Browse files Browse the repository at this point in the history
  • Loading branch information
riku6460 committed Dec 22, 2024
1 parent 4ecfae0 commit 43f3d4f
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test-backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
env:
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down Expand Up @@ -100,7 +100,7 @@ jobs:
env:
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
4 changes: 2 additions & 2 deletions chart/templates/Deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ spec:
value: false
- name: DFLY_tcp_backlog
value: 2048
- name: DFLY_default_lua_flags
value: allow-undeclared-keys
- name: DFLY_lock_on_hashtags
value: true
- name: DFLY_pipeline_squash
value: 0
- name: DFLY_multi_exec_squash
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.local-db.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
2 changes: 1 addition & 1 deletion docker-compose_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
6 changes: 3 additions & 3 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ const $endedPollNotification: Provider = {

const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))),
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.DELIVER}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER, index)))),
inject: [DI.config],
};

const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))),
useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.INBOX}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX, index)))),
inject: [DI.config],
};

Expand All @@ -54,7 +54,7 @@ const $db: Provider = {

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))),
useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.RELATIONSHIP}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP, index)))),
inject: [DI.config],
};

Expand Down
12 changes: 6 additions & 6 deletions packages/backend/src/queue/QueueProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region deliver
this.deliverQueueWorkers = this.config.redisForDeliverQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
.map((config, index) => new Bull.Worker(`${QUEUE.DELIVER}-${index}`, (job) => this.deliverProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER, index),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
Expand All @@ -236,8 +236,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region inbox
this.inboxQueueWorkers = this.config.redisForInboxQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX),
.map((config, index) => new Bull.Worker(`${QUEUE.INBOX}-${index}`, (job) => this.inboxProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX, index),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
Expand Down Expand Up @@ -288,7 +288,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region relationship
this.relationshipQueueWorkers = this.config.redisForRelationshipQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
.map((config, index) => new Bull.Worker(`${QUEUE.RELATIONSHIP}-${index}`, (job) => {
switch (job.name) {
case 'follow': return this.relationshipProcessorService.processFollow(job);
case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
Expand All @@ -297,7 +297,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
}
}, {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP),
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP, index),
autorun: false,
concurrency: this.config.relationshipJobConcurrency ?? 16,
limiter: {
Expand Down
10 changes: 6 additions & 4 deletions packages/backend/src/queue/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export const QUEUE = {
WEBHOOK_DELIVER: 'webhookDeliver',
};

export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial<Bull.QueueOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial<Bull.QueueOptions>, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.QueueOptions {
const name = typeof index === 'number' ? `${queueName}-${index}` : queueName;
return {
...queueOptions,
connection: {
Expand All @@ -33,11 +34,12 @@ export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queu
return 1;
},
},
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
};
}

export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial<Bull.WorkerOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial<Bull.WorkerOptions>, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.WorkerOptions {
const name = typeof index === 'number' ? `${queueName}-${index}` : queueName;
return {
...workerOptions,
connection: {
Expand All @@ -52,6 +54,6 @@ export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, wor
return 1;
},
},
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
};
}
8 changes: 4 additions & 4 deletions packages/backend/src/server/web/ClientServerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,13 @@ export class ClientServerService {
queues: [
this.systemQueue,
this.endedPollNotificationQueue,
...this.deliverQueue.queues,
...this.inboxQueue.queues,
this.dbQueue,
...this.relationshipQueue.queues,
this.objectStorageQueue,
this.webhookDeliverQueue,
].map(q => new BullMQAdapter(q))
.concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
.concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
.concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))),
].map(q => new BullMQAdapter(q)),
serverAdapter,
});

Expand Down
2 changes: 1 addition & 1 deletion packages/backend/test/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
environment:
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down

0 comments on commit 43f3d4f

Please sign in to comment.