From ba75b7864253057f54f08448020d730eba4cf4d0 Mon Sep 17 00:00:00 2001 From: gamgyul163 Date: Tue, 26 Nov 2024 19:25:39 +0900 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20flush=20=ED=95=A0=20?= =?UTF-8?q?=EB=95=8C=20redis=EB=A5=BC=20=EC=9D=B4=EC=9A=A9=ED=95=B4=20?= =?UTF-8?q?=EB=9D=BD=ED=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Backend/apps/api/src/chats/chats.service.ts | 32 ++++++++++++--------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/Backend/apps/api/src/chats/chats.service.ts b/Backend/apps/api/src/chats/chats.service.ts index e92791d9..1781116d 100644 --- a/Backend/apps/api/src/chats/chats.service.ts +++ b/Backend/apps/api/src/chats/chats.service.ts @@ -8,7 +8,6 @@ import { firstValueFrom } from 'rxjs'; @Injectable() export class ChatsService { - private isFlushing = false; constructor( @InjectRedis() private readonly redisClient: Redis, private readonly httpService: HttpService, @@ -37,7 +36,7 @@ export class ChatsService { filteringResult: true, }; const chatString = JSON.stringify(chat); - await this.redisClient.multi().publish(`${channelId}:chat`, chatString).lpush('chatQueue', chatId).exec(); + await this.redisClient.multi().publish(`${channelId}:chat`, chatString).rpush('chatQueue', chatId).exec(); this.clovaFiltering(chat); } @@ -85,19 +84,26 @@ export class ChatsService { } async flushChat() { - if (!this.isFlushing) { - this.isFlushing = true; - while (true) { - const frontChatId = await this.redisClient.lindex('chatQueue', 0); - const chatString = await this.redisClient.hget('chatCache', frontChatId); - if (!chatString) { - break; - } else { - const chat = JSON.parse(chatString); - await this.redisClient.multi().rpush(`${chat.channelId}:chats`, chatString).lpop('chatQueue').exec(); + const lockKey = 'chat:flush:lock'; + const lock = await this.redisClient.set(lockKey, 'lock', 'NX'); + + try { + if (lockKey) { + while (true) { + const frontChatId = await this.redisClient.lindex('chatQueue', 0); + const chatString = await this.redisClient.hget('chatCache', frontChatId); + if (!chatString) { + break; + } else { + const chat = JSON.parse(chatString); + await this.redisClient.multi().rpush(`${chat.channelId}:chats`, chatString).lpop('chatQueue').exec(); + } } } - this.isFlushing = false; + } catch (err) { + console.log(err); + } finally { + await this.redisClient.del(lockKey); } } }