Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAREDIS-1117 - Improve doLock method to atomic. #518

Open
wants to merge 4 commits into
base: 2.7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* @author Christoph Strobl
* @author Mark Paluch
* @author André Prata
* @author Joongsoo Park
* @since 2.0
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {
Expand Down Expand Up @@ -149,32 +150,20 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat

return execute(name, connection -> {

if (isLockingCacheWriter()) {
doLock(name, connection);
}

try {

boolean put;

if (shouldExpireWithin(ttl)) {
put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent());
} else {
put = connection.setNX(key, value);
}
boolean put;

if (put) {
statistics.incPuts(name);
return null;
}

return connection.get(key);
} finally {
if (shouldExpireWithin(ttl)) {
put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent());
} else {
put = connection.setNX(key, value);
}

if (isLockingCacheWriter()) {
doUnlock(name, connection);
}
if (put) {
statistics.incPuts(name);
return null;
}

return connection.get(key);
});
}

Expand Down Expand Up @@ -204,27 +193,12 @@ public void clean(String name, byte[] pattern) {

execute(name, connection -> {

boolean wasLocked = false;

try {

if (isLockingCacheWriter()) {
doLock(name, connection);
wasLocked = true;
}

byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
.toArray(new byte[0][]);
byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
.toArray(new byte[0][]);

if (keys.length > 0) {
statistics.incDeletesBy(name, keys.length);
connection.del(keys);
}
} finally {

if (wasLocked && isLockingCacheWriter()) {
doUnlock(name, connection);
}
if (keys.length > 0) {
statistics.incDeletesBy(name, keys.length);
connection.del(keys);
}

return "OK";
Expand Down Expand Up @@ -264,7 +238,8 @@ public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheSt
* @param name the name of the cache to lock.
*/
void lock(String name) {
execute(name, connection -> doLock(name, connection));

executeLockFree(connection -> doLock(name, connection));
}

/**
Expand All @@ -276,16 +251,42 @@ void unlock(String name) {
executeLockFree(connection -> doUnlock(name, connection));
}

private Boolean doLock(String name, RedisConnection connection) {
/**
* Explicitly try set a write lock on a cache.
*
* @param name the name of the cache to lock.
* @param connection must not be {@literal null}.
*/
boolean tryLock(String name, RedisConnection connection) {
return connection.setNX(createCacheLockKey(name), new byte[0]);
}

private Long doUnlock(String name, RedisConnection connection) {
return connection.del(createCacheLockKey(name));
private void doLock(String name, RedisConnection connection) {

if (!isLockingCacheWriter()) {
return;
}

long lockWaitTimeNs = System.nanoTime();
try {

while (!tryLock(name, connection)) {
Thread.sleep(sleepTime.toMillis());
}
} catch (InterruptedException ex) {

// Re-interrupt current thread, to allow other participants to react.
Thread.currentThread().interrupt();

throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to acquire lock cache %s", name),
ex);
} finally {
statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}

boolean doCheckLock(String name, RedisConnection connection) {
return connection.exists(createCacheLockKey(name));
private Long doUnlock(String name, RedisConnection connection) {
return connection.del(createCacheLockKey(name));
}

/**
Expand All @@ -300,8 +301,19 @@ private <T> T execute(String name, Function<RedisConnection, T> callback) {
RedisConnection connection = connectionFactory.getConnection();
try {

checkAndPotentiallyWaitUntilUnlocked(name, connection);
return callback.apply(connection);
try {

if (isLockingCacheWriter()) {
doLock(name, connection);
}

return callback.apply(connection);
} finally {

if (isLockingCacheWriter()) {
doUnlock(name, connection);
}
}
} finally {
connection.close();
}
Expand All @@ -318,30 +330,6 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
}
}

private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {

if (!isLockingCacheWriter()) {
return;
}

long lockWaitTimeNs = System.nanoTime();
try {

while (doCheckLock(name, connection)) {
Thread.sleep(sleepTime.toMillis());
}
} catch (InterruptedException ex) {

// Re-interrupt current thread, to allow other participants to react.
Thread.currentThread().interrupt();

throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
ex);
} finally {
statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}

private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Joongsoo Park
*/
@RunWith(Parameterized.class)
public class DefaultRedisCacheWriterTests {
Expand Down Expand Up @@ -320,9 +321,9 @@ public void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() thro
DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50)) {

@Override
boolean doCheckLock(String name, RedisConnection connection) {
boolean tryLock(String name, RedisConnection connection) {
beforeWrite.countDown();
return super.doCheckLock(name, connection);
return super.tryLock(name, connection);
}
};

Expand All @@ -342,7 +343,7 @@ boolean doCheckLock(String name, RedisConnection connection) {

afterWrite.await();

assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to unlock")
assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to acquire lock cache")
.hasCauseInstanceOf(InterruptedException.class);
}

Expand Down