Skip to content

Commit

Permalink
Merge pull request #35 from PlaytikaOSS/feature/locks-tests
Browse files Browse the repository at this point in the history
Replace add with put. More tests for locks
  • Loading branch information
kptfh authored May 22, 2024
2 parents 0ddc087 + e71cf11 commit 4412acc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ static List<AerospikeLock> processResults (
return locks;
}

private AerospikeLock putLock(Value batchId, Key lockKey, boolean checkBatchId) throws TemporaryLockingException{
AerospikeLock putLock(Value batchId, Key lockKey, boolean checkBatchId) throws TemporaryLockingException{
try {
aerospikeClient.add(putLockPolicy, lockKey, new Bin(BATCH_ID_BIN_NAME, batchId));
aerospikeClient.put(putLockPolicy, lockKey, new Bin(BATCH_ID_BIN_NAME, batchId));
logger.trace("acquired lock key=[{}], batchId=[{}]", lockKey, batchId);
return new AerospikeLock(LOCKED, lockKey);
} catch (AerospikeException ae) {
Expand Down Expand Up @@ -180,7 +180,7 @@ protected void checkExpectedValues(LOCKS batchLocks, List<AerospikeLock> keysLoc
expectedValuesOperations.checkExpectedValues(keysLocked, batchLocks.expectedValues());
}

private Value getBatchIdOfLock(Key lockKey){
Value getBatchIdOfLock(Key lockKey){
Record record = aerospikeClient.get(null, lockKey);
return getBatchId(record);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,89 @@
package nosql.batch.update.aerospike.lock;


import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Value;
import nosql.batch.update.aerospike.basic.AerospikeBasicExpectedValueOperations;
import nosql.batch.update.aerospike.lock.AerospikeLockOperations.LockResult;
import nosql.batch.update.lock.Lock;
import nosql.batch.update.lock.PermanentLockingException;
import nosql.batch.update.lock.TemporaryLockingException;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static nosql.batch.update.aerospike.AerospikeTestUtils.AEROSPIKE_PROPERTIES;
import static nosql.batch.update.aerospike.AerospikeTestUtils.getAerospikeClient;
import static nosql.batch.update.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager.generateBatchId;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class AerospikeLockOperationsTest {

static final GenericContainer aerospike = getAerospikeContainer();

static final AerospikeClient client = getAerospikeClient(aerospike);

static final AerospikeLockOperations aerospikeLockOperations = new AerospikeLockOperations(
client, new AerospikeBasicExpectedValueOperations(client), Executors.newFixedThreadPool(2));

private static int keyIncremental = 0;

@Test
public void shouldLockKey(){
Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++));
Value batchId = generateBatchId();
AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, true);
assertThat(lock.key).isEqualTo(key);

Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key);
assertThat(batchIdOfLock).isEqualTo(batchId);
assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString());
}

@Test
public void shouldFailIfLockTheSameKey(){
Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++));
Value batchId = generateBatchId();
AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, false);
assertThat(lock.key).isEqualTo(key);

Value batchId2 = generateBatchId();
assertThatThrownBy(() -> aerospikeLockOperations.putLock(batchId2, key, false))
.hasMessageContaining("Locked by concurrent update")
.hasMessageContaining(batchId.toString())
.hasMessageContaining(batchId2.toString());

Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key);
assertThat(batchIdOfLock).isEqualTo(batchId);
assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString());
}

@Test
public void shouldFailIfLockTheSameKeyInWal(){
Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++));
Value batchId = generateBatchId();
AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, true);
assertThat(lock.key).isEqualTo(key);

Value batchId2 = generateBatchId();
assertThatThrownBy(() -> aerospikeLockOperations.putLock(batchId2, key, true))
.hasMessageContaining("Locked by other batch update")
.hasMessageContaining(batchId.toString())
.hasMessageContaining(batchId2.toString());

Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key);
assertThat(batchIdOfLock).isEqualTo(batchId);
assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString());
}

@Test
public void shouldSuccess(){

Expand Down

0 comments on commit 4412acc

Please sign in to comment.