diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index dacfb8b95e..2f121ab748 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -46,6 +46,7 @@ * @author Christoph Strobl * @author Mark Paluch * @author André Prata + * @author Joongsoo Park * @since 2.0 */ class DefaultRedisCacheWriter implements RedisCacheWriter { @@ -149,32 +150,20 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat return execute(name, connection -> { - if (isLockingCacheWriter()) { - doLock(name, connection); - } - - try { - - boolean put; - - if (shouldExpireWithin(ttl)) { - put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent()); - } else { - put = connection.setNX(key, value); - } + boolean put; - if (put) { - statistics.incPuts(name); - return null; - } - - return connection.get(key); - } finally { + if (shouldExpireWithin(ttl)) { + put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent()); + } else { + put = connection.setNX(key, value); + } - if (isLockingCacheWriter()) { - doUnlock(name, connection); - } + if (put) { + statistics.incPuts(name); + return null; } + + return connection.get(key); }); } @@ -204,27 +193,12 @@ public void clean(String name, byte[] pattern) { execute(name, connection -> { - boolean wasLocked = false; - - try { - - if (isLockingCacheWriter()) { - doLock(name, connection); - wasLocked = true; - } - - byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) - .toArray(new byte[0][]); + byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) + .toArray(new byte[0][]); - if (keys.length > 0) { - statistics.incDeletesBy(name, keys.length); - connection.del(keys); - } - } finally { - - if (wasLocked && isLockingCacheWriter()) { - doUnlock(name, connection); - } + if (keys.length > 0) { + statistics.incDeletesBy(name, keys.length); + connection.del(keys); } return "OK"; @@ -264,7 +238,8 @@ public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheSt * @param name the name of the cache to lock. */ void lock(String name) { - execute(name, connection -> doLock(name, connection)); + + executeLockFree(connection -> doLock(name, connection)); } /** @@ -276,16 +251,42 @@ void unlock(String name) { executeLockFree(connection -> doUnlock(name, connection)); } - private Boolean doLock(String name, RedisConnection connection) { + /** + * Explicitly try set a write lock on a cache. + * + * @param name the name of the cache to lock. + * @param connection must not be {@literal null}. + */ + boolean tryLock(String name, RedisConnection connection) { return connection.setNX(createCacheLockKey(name), new byte[0]); } - private Long doUnlock(String name, RedisConnection connection) { - return connection.del(createCacheLockKey(name)); + private void doLock(String name, RedisConnection connection) { + + if (!isLockingCacheWriter()) { + return; + } + + long lockWaitTimeNs = System.nanoTime(); + try { + + while (!tryLock(name, connection)) { + Thread.sleep(sleepTime.toMillis()); + } + } catch (InterruptedException ex) { + + // Re-interrupt current thread, to allow other participants to react. + Thread.currentThread().interrupt(); + + throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to acquire lock cache %s", name), + ex); + } finally { + statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); + } } - boolean doCheckLock(String name, RedisConnection connection) { - return connection.exists(createCacheLockKey(name)); + private Long doUnlock(String name, RedisConnection connection) { + return connection.del(createCacheLockKey(name)); } /** @@ -300,8 +301,19 @@ private T execute(String name, Function callback) { RedisConnection connection = connectionFactory.getConnection(); try { - checkAndPotentiallyWaitUntilUnlocked(name, connection); - return callback.apply(connection); + try { + + if (isLockingCacheWriter()) { + doLock(name, connection); + } + + return callback.apply(connection); + } finally { + + if (isLockingCacheWriter()) { + doUnlock(name, connection); + } + } } finally { connection.close(); } @@ -318,30 +330,6 @@ private void executeLockFree(Consumer callback) { } } - private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) { - - if (!isLockingCacheWriter()) { - return; - } - - long lockWaitTimeNs = System.nanoTime(); - try { - - while (doCheckLock(name, connection)) { - Thread.sleep(sleepTime.toMillis()); - } - } catch (InterruptedException ex) { - - // Re-interrupt current thread, to allow other participants to react. - Thread.currentThread().interrupt(); - - throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name), - ex); - } finally { - statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); - } - } - private static boolean shouldExpireWithin(@Nullable Duration ttl) { return ttl != null && !ttl.isZero() && !ttl.isNegative(); } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index c8b8da8338..1d2a784aff 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -45,6 +45,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Joongsoo Park */ @RunWith(Parameterized.class) public class DefaultRedisCacheWriterTests { @@ -320,9 +321,9 @@ public void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() thro DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50)) { @Override - boolean doCheckLock(String name, RedisConnection connection) { + boolean tryLock(String name, RedisConnection connection) { beforeWrite.countDown(); - return super.doCheckLock(name, connection); + return super.tryLock(name, connection); } }; @@ -342,7 +343,7 @@ boolean doCheckLock(String name, RedisConnection connection) { afterWrite.await(); - assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to unlock") + assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to acquire lock cache") .hasCauseInstanceOf(InterruptedException.class); }