diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index d9f38b8235fc..a8ad3c6f78b7 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -24,7 +24,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/.github/workflows/test-backend.yml b/.github/workflows/test-backend.yml index 3a2744c85878..67517f395641 100644 --- a/.github/workflows/test-backend.yml +++ b/.github/workflows/test-backend.yml @@ -38,7 +38,7 @@ jobs: env: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 @@ -100,7 +100,7 @@ jobs: env: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/chart/templates/Deployment.yml b/chart/templates/Deployment.yml index 2d3c89e673b9..d45dfebcaa8e 100644 --- a/chart/templates/Deployment.yml +++ b/chart/templates/Deployment.yml @@ -44,8 +44,8 @@ spec: value: false - name: DFLY_tcp_backlog value: 2048 - - name: DFLY_default_lua_flags - value: allow-undeclared-keys + - name: DFLY_lock_on_hashtags + value: true - name: DFLY_pipeline_squash value: 0 - name: DFLY_multi_exec_squash diff --git a/docker-compose.local-db.yml b/docker-compose.local-db.yml index 25793bc4f852..db2c4a8b5c7a 100644 --- a/docker-compose.local-db.yml +++ b/docker-compose.local-db.yml @@ -12,7 +12,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/docker-compose_example.yml b/docker-compose_example.yml index 379bc3d77f7c..eceda396f353 100644 --- a/docker-compose_example.yml +++ b/docker-compose_example.yml @@ -32,7 +32,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index d89fff81ee19..e51ce09390e2 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -36,13 +36,13 @@ const $endedPollNotification: Provider = { const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))), + useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.DELIVER}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER, index)))), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))), + useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.INBOX}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX, index)))), inject: [DI.config], }; @@ -54,7 +54,7 @@ const $db: Provider = { const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))), + useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.RELATIONSHIP}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP, index)))), inject: [DI.config], }; diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index ac571db25902..026950cce672 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -208,8 +208,8 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region deliver this.deliverQueueWorkers = this.config.redisForDeliverQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { - ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER), + .map((config, index) => new Bull.Worker(`${QUEUE.DELIVER}-${index}`, (job) => this.deliverProcessorService.process(job), { + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER, index), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -236,8 +236,8 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region inbox this.inboxQueueWorkers = this.config.redisForInboxQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { - ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX), + .map((config, index) => new Bull.Worker(`${QUEUE.INBOX}-${index}`, (job) => this.inboxProcessorService.process(job), { + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX, index), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -288,7 +288,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region relationship this.relationshipQueueWorkers = this.config.redisForRelationshipQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + .map((config, index) => new Bull.Worker(`${QUEUE.RELATIONSHIP}-${index}`, (job) => { switch (job.name) { case 'follow': return this.relationshipProcessorService.processFollow(job); case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); @@ -297,7 +297,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for relationship`); } }, { - ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP), + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP, index), autorun: false, concurrency: this.config.relationshipJobConcurrency ?? 16, limiter: { diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 06edef1a2003..c3f2304d2459 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -18,7 +18,8 @@ export const QUEUE = { WEBHOOK_DELIVER: 'webhookDeliver', }; -export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { +export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.QueueOptions { + const name = typeof index === 'number' ? `${queueName}-${index}` : queueName; return { ...queueOptions, connection: { @@ -33,11 +34,12 @@ export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queu return 1; }, }, - prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, + prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`, }; } -export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { +export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.WorkerOptions { + const name = typeof index === 'number' ? `${queueName}-${index}` : queueName; return { ...workerOptions, connection: { @@ -52,6 +54,6 @@ export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, wor return 1; }, }, - prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, + prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`, }; } diff --git a/packages/backend/src/server/web/ClientServerService.ts b/packages/backend/src/server/web/ClientServerService.ts index 9c8ee94d6ae0..eef7c389bc76 100644 --- a/packages/backend/src/server/web/ClientServerService.ts +++ b/packages/backend/src/server/web/ClientServerService.ts @@ -245,13 +245,13 @@ export class ClientServerService { queues: [ this.systemQueue, this.endedPollNotificationQueue, + ...this.deliverQueue.queues, + ...this.inboxQueue.queues, this.dbQueue, + ...this.relationshipQueue.queues, this.objectStorageQueue, this.webhookDeliverQueue, - ].map(q => new BullMQAdapter(q)) - .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) - .concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) - .concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), + ].map(q => new BullMQAdapter(q)), serverAdapter, }); diff --git a/packages/backend/test/docker-compose.yml b/packages/backend/test/docker-compose.yml index 74659a952ae8..d54d61ae7420 100644 --- a/packages/backend/test/docker-compose.yml +++ b/packages/backend/test/docker-compose.yml @@ -8,7 +8,7 @@ services: environment: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4