Skip to content

Commit

Permalink
Replica.MASTER for read operations
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Apr 10, 2024
1 parent 608f7cc commit f8b959b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.Replica;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;
import io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -51,8 +52,10 @@ public ClientPolicy clientPolicy() {

public BatchPolicy batchPolicy() {
BatchPolicy batchPolicy = new BatchPolicy();
batchPolicy.replica = Replica.MASTER;
batchPolicy.totalTimeout = configuration.get(AEROSPIKE_WRITE_TIMEOUT);
batchPolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
batchPolicy.respondAllKeys = true;
return batchPolicy;
}

Expand All @@ -71,7 +74,7 @@ public WritePolicy writePolicy() {
writePolicy.totalTimeout = configuration.get(AEROSPIKE_WRITE_TIMEOUT);
writePolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
writePolicy.maxRetries = NO_RETRIES;
writePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);;
writePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);
return writePolicy;
}

Expand All @@ -80,13 +83,14 @@ public WritePolicy deletePolicy() {
deletePolicy.expiration = -1;
deletePolicy.totalTimeout = configuration.get(AEROSPIKE_WRITE_TIMEOUT);
deletePolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
deletePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);;
deletePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);
deletePolicy.maxRetries = NO_RETRIES;
return deletePolicy;
}

public Policy readPolicy() {
Policy readPolicy = new Policy();
readPolicy.replica = Replica.MASTER;
readPolicy.sendKey = true;
readPolicy.totalTimeout = configuration.get(AEROSPIKE_READ_TIMEOUT);
readPolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.aerospike.client.Value;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.policy.Replica;
import com.aerospike.client.policy.WritePolicy;
import com.playtika.janusgraph.aerospike.AerospikePolicyProvider;
import com.playtika.janusgraph.aerospike.operations.AerospikeOperations;
import nosql.batch.update.aerospike.lock.AerospikeExpectedValuesOperations;
import nosql.batch.update.aerospike.lock.AerospikeLock;
Expand All @@ -28,15 +30,12 @@ public class BatchExpectedValueOperations

private static final Logger logger = LoggerFactory.getLogger(BatchExpectedValueOperations.class);

private static final WritePolicy checkValuesPolicy = new WritePolicy();
static {
checkValuesPolicy.respondAllOps = true;
}

private final AerospikeOperations aerospikeOperations;
private final WritePolicy checkValuesPolicy;

public BatchExpectedValueOperations(AerospikeOperations aerospikeOperations) {
this.aerospikeOperations = aerospikeOperations;
this.checkValuesPolicy = buildCheckValuesPolicy(aerospikeOperations.getAerospikePolicyProvider());
}

@Override
Expand Down Expand Up @@ -118,4 +117,11 @@ private boolean checkValue(Key key, Value column, Value expectedValue, byte[] ac
return false;
}
}

private static WritePolicy buildCheckValuesPolicy(AerospikePolicyProvider policyProvider){
WritePolicy checkValuesPolicy = new WritePolicy(policyProvider.writePolicy());
checkValuesPolicy.replica = Replica.MASTER;
checkValuesPolicy.respondAllOps = true;
return checkValuesPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class BatchLocks implements AerospikeBatchLocks<Map<Key, ExpectedValue>> {

private final AerospikeOperations aerospikeOperations;
private Map<String, Map<Value, Map<Value, Value>>> locksByStore;
private final Map<String, Map<Value, Map<Value, Value>>> locksByStore;
private final List<Key> keysToLock;
private final Map<Key, ExpectedValue> expectedValues;

Expand Down Expand Up @@ -63,4 +63,9 @@ public Map<Key, ExpectedValue> expectedValues() {
public Map<String, Map<Value, Map<Value, Value>>> getLocksByStore() {
return locksByStore;
}

@Override
public String toString(){
return locksByStore.toString();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<aerospike-client.version>6.2.0</aerospike-client.version>
<janusgraph.version>0.5.3</janusgraph.version>

<aerospike-batch-updater.version>0.0.22</aerospike-batch-updater.version>
<aerospike-batch-updater.version>0.0.23</aerospike-batch-updater.version>
<metrics.version>4.1.18</metrics.version>
<netty-all.version>4.1.60.Final</netty-all.version>

Expand Down

0 comments on commit f8b959b

Please sign in to comment.