From 2371cc62f3757e643766865f95f5a6f2b46b7cf8 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Fri, 8 Sep 2023 14:01:32 +0800 Subject: [PATCH] [KYUUBI #5216] Workaround for negative counter in SessionLimiter ### _Why are the changes needed?_ Fix: https://github.com/apache/kyuubi/issues/5216 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5217 from zhuyaogai/issue-5216. Closes #5216 b8d2e1796 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. cda3702e5 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. 36272d1b1 [Fantasy-Jay] fix test bug. 1e282d20f [Fantasy-Jay] Limit counter resource leak in SessionLimiter. 7fc389ff1 [Fantasy-Jay] Limit counter resource leak in SessionLimiter. Authored-by: Fantasy-Jay <13631435453@163.com> Signed-off-by: Cheng Pan --- .../kyuubi/session/SessionLimiter.scala | 9 +--- .../kyuubi/session/SessionLimiterSuite.scala | 50 +++++++++++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala index c8112d532db..8a1ebedf1a5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala @@ -95,7 +95,8 @@ class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit private def decrLimitCount(key: String): Unit = { _counters.get(key) match { - case count: AtomicInteger => count.decrementAndGet() + case count: AtomicInteger => + count.accumulateAndGet(1, (l, r) => if (l > 0) l - r else l) case _ => } } @@ -121,12 +122,6 @@ class SessionLimiterWithAccessControlListImpl( } } - override def decrement(userIpAddress: UserIpAddress): Unit = { - if (!unlimitedUsers.contains(userIpAddress.user)) { - super.decrement(userIpAddress) - } - } - private[kyuubi] def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = { this.unlimitedUsers = unlimitedUsers } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala index df75d15f26c..775239f9b09 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala @@ -20,8 +20,10 @@ import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.LongAdder import scala.collection.JavaConverters._ +import scala.util.Random import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException} +import org.apache.kyuubi.util.ThreadUtils class SessionLimiterSuite extends KyuubiFunSuite { @@ -149,4 +151,52 @@ class SessionLimiterSuite extends KyuubiFunSuite { assert(caught.getMessage.equals( "Connection denied because the user is in the deny user list. (user: user002)")) } + + test("test refresh unlimited users and deny users") { + val random: Random = new Random() + val latch = new CountDownLatch(600) + val userLimit = 100 + val ipAddressLimit = 101 + val userIpAddressLimit = 102 + val limiter = + SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, Set.empty, Set.empty) + val threadPool = ThreadUtils.newDaemonCachedThreadPool("test-refresh-config") + + def checkUserLimit(userIpAddress: UserIpAddress): Unit = { + for (i <- 0 until 200) { + threadPool.execute(() => { + try { + Thread.sleep(random.nextInt(200)) + limiter.increment(userIpAddress) + } catch { + case _: Throwable => + } finally { + Thread.sleep(random.nextInt(500)) + // finally call limiter#decrement method. + limiter.decrement(userIpAddress) + latch.countDown() + } + }) + } + } + + checkUserLimit(UserIpAddress("user001", "127.0.0.1")) + checkUserLimit(UserIpAddress("user002", "127.0.0.2")) + checkUserLimit(UserIpAddress("user003", "127.0.0.3")) + + Thread.sleep(100) + // set unlimited users and deny users + SessionLimiter.resetUnlimitedUsers(limiter, Set("user001")) + SessionLimiter.resetDenyUsers(limiter, Set("user002")) + + Thread.sleep(300) + // unset unlimited users and deny users + SessionLimiter.resetUnlimitedUsers(limiter, Set.empty) + SessionLimiter.resetDenyUsers(limiter, Set.empty) + + latch.await() + threadPool.shutdown() + limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values + .foreach(c => assert(c.get() == 0)) + } }