diff --git a/packages/backend/server/src/base/mutex/locker.ts b/packages/backend/server/src/base/mutex/locker.ts index 8f2dce4838ab8..9d16bfe2a4571 100644 --- a/packages/backend/server/src/base/mutex/locker.ts +++ b/packages/backend/server/src/base/mutex/locker.ts @@ -11,11 +11,12 @@ import { Lock } from './lock'; const lockScript = `local key = KEYS[1] local owner = ARGV[1] --- if lock is not exists or lock is owned by the owner --- then set lock to the owner and return 1, otherwise return 0 +-- if lock is not exists then set lock to the owner and return 1, otherwise return 0 -- if the lock is not released correctly due to unexpected reasons -- lock will be released after 60 seconds -if redis.call("get", key) == owner or redis.call("set", key, owner, "NX", "EX", 60) then +if redis.call("get", key) == owner then + return 0 +elseif redis.call("set", key, owner, "NX", "EX", 60) then return 1 else return 0 diff --git a/packages/backend/server/src/base/mutex/mutex.ts b/packages/backend/server/src/base/mutex/mutex.ts index d78f606317569..9ca7f64bb22e1 100644 --- a/packages/backend/server/src/base/mutex/mutex.ts +++ b/packages/backend/server/src/base/mutex/mutex.ts @@ -3,6 +3,7 @@ import { randomUUID } from 'node:crypto'; import { Inject, Injectable, Logger, Scope } from '@nestjs/common'; import { ModuleRef, REQUEST } from '@nestjs/core'; import type { Request } from 'express'; +import { nanoid } from 'nanoid'; import { GraphqlContext } from '../graphql'; import { retryable } from '../utils/promise'; @@ -14,7 +15,7 @@ export const MUTEX_WAIT = 100; @Injectable() export class Mutex { protected logger = new Logger(Mutex.name); - private readonly clusterIdentifier = `cluster:${randomUUID()}`; + private readonly clusterIdentifier = `cluster:${nanoid()}`; constructor(protected readonly locker: Locker) {} @@ -39,7 +40,10 @@ export class Mutex { * @param key resource key * @returns LockGuard */ - async acquire(key: string, owner: string = this.clusterIdentifier) { + async acquire( + key: string, + owner: string = `${this.clusterIdentifier}:${nanoid()}` + ) { try { return await retryable( () => this.locker.lock(owner, key), diff --git a/packages/backend/server/tests/mutex.spec.ts b/packages/backend/server/tests/mutex.spec.ts new file mode 100644 index 0000000000000..ca56265edbbbf --- /dev/null +++ b/packages/backend/server/tests/mutex.spec.ts @@ -0,0 +1,73 @@ +import { randomUUID } from 'node:crypto'; + +import { TestingModule } from '@nestjs/testing'; +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { Locker, Mutex } from '../src/base/mutex'; +import { SessionRedis } from '../src/base/redis'; +import { createTestingModule, sleep } from './utils'; + +const test = ava as TestFn<{ + module: TestingModule; + mutex: Mutex; + locker: Locker; + session: SessionRedis; +}>; + +test.beforeEach(async t => { + const module = await createTestingModule(); + + t.context.module = module; + t.context.mutex = module.get(Mutex); + t.context.locker = module.get(Locker); + t.context.session = module.get(SessionRedis); +}); + +test.afterEach(async t => { + await t.context.module.close(); +}); + +const lockerPrefix = randomUUID(); +test('should be able to acquire lock', async t => { + const { mutex } = t.context; + + { + t.truthy( + await mutex.acquire(`${lockerPrefix}1`), + 'should be able to acquire lock' + ); + t.falsy( + await mutex.acquire(`${lockerPrefix}1`), + 'should not be able to acquire lock again' + ); + } + + { + const lock1 = await mutex.acquire(`${lockerPrefix}2`); + t.truthy(lock1); + await lock1?.release(); + const lock2 = await mutex.acquire(`${lockerPrefix}2`); + t.truthy(lock2); + } +}); + +test('should be able to acquire lock parallel', async t => { + const { mutex, locker } = t.context; + const spyedLocker = Sinon.spy(locker, 'lock'); + const requestLock = async (key: string) => { + const lock = mutex.acquire(key); + await using _lock = await lock; + await t.throwsAsync(spyedLocker.lastCall.returnValue, { + message: `Failed to acquire lock for resource [${key}]`, + }); + await sleep(100); + }; + + await t.notThrowsAsync( + Promise.all( + Array.from({ length: 10 }, _ => requestLock(`${lockerPrefix}3`)) + ), + 'should be able to acquire lock parallel' + ); +});