Skip to content

Commit

Permalink
fix: fixed durable delete flag for wrtie policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Volchkov Andrey authored and kptfh committed Feb 27, 2024
1 parent e8c316c commit ad2a98f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 8 deletions.
25 changes: 25 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,28 @@

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

target/
!.mvn/wrapper/maven-wrapper.jar

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ static ModifiableConfiguration getAerospikeConfiguration(GenericContainer contai
config.set(GRAPH_PREFIX, "test");
//!!! need to prevent small batches mutations as we use deferred locking approach !!!
config.set(BUFFER_SIZE, AEROSPIKE_BUFFER_SIZE);
config.set(TEST_ENVIRONMENT, true); //for test purposes only
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,28 @@ public static GenericContainer startAerospikeContainer(AerospikeProperties prope
.withStartupTimeout(startupTimeout);

aerospike.start();
configureEnterpriseServer(properties, aerospike);
return aerospike;
}

}
private static void configureEnterpriseServer(AerospikeProperties properties,
GenericContainer aerospikeContainer) {
AsadmCommandExecutor asadmCommandExecutor = new AsadmCommandExecutor(aerospikeContainer);
String namespace = properties.getNamespace();
/*
By default, the value of this metric is 90%, we set it to 100% to prevent stopping writes for the Aerospike
Enterprise container during high consumption of system memory. For the Aerospike Community Edition, this metric is not used.
Documentation: https://aerospike.com/docs/server/reference/configuration#stop-writes-sys-memory-pct
*/
log.info("Switching off 'stop-writes-sys-memory-pct'... ");
asadmCommandExecutor.execute(String.format("manage config namespace %s param stop-writes-sys-memory-pct to 100", namespace));
log.info("Success switching off 'stop-writes-sys-memory-pct'");

if (properties.isDurableDelete()) {
log.info("Setting up 'disallow-expunge' to true...");
asadmCommandExecutor.execute(String.format("manage config namespace %s param disallow-expunge to true", namespace));
log.info("Success setting up 'disallow-expunge' to true");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@

public class AerospikeProperties {

static final String AEROSPIKE_BEAN_NAME = "aerospike";

boolean enabled = true;
String dockerImage = "aerospike:ce-6.2.0.2";
String dockerImage = "aerospike/aerospike-server-enterprise:6.3.0.16_1";
String namespace = "TEST";
String host = "localhost";
int port = 3000;
boolean durableDelete = true;

public boolean isEnabled() {
return enabled;
Expand Down Expand Up @@ -72,4 +71,12 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}

public boolean isDurableDelete() {
return durableDelete;
}

public void setDurableDelete(boolean durableDelete) {
this.durableDelete = durableDelete;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.aerospike;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

public class AsadmCommandExecutor {

private static final Logger log = LoggerFactory.getLogger(AsadmCommandExecutor.class);

private final GenericContainer<?> aerospikeContainer;

public AsadmCommandExecutor(GenericContainer<?> aerospikeContainer) {
this.aerospikeContainer = aerospikeContainer;
}

public void execute(String command) {
try {
Container.ExecResult result = aerospikeContainer.execInContainer("asadm", "--enable", "-e", command);
logStdout(result);
if (result.getExitCode() != 0 || isBadResponse(result)) {
throw new IllegalStateException(String.format("Failed to execute \"asadm --enable -e '%s'\": \nstdout:\n%s\nstderr:\n%s",
command, result.getStdout(), result.getStderr()));
}
} catch (Exception ex) {
throw new IllegalStateException(String.format("Failed to execute \"asadm\"", ex));
}
}

private boolean isBadResponse(Container.ExecResult execResult) {
String stdout = execResult.getStdout();
/*
Example of the stdout without error:
~Set Namespace Param stop-writes-sys-memory-pct to 100~
Node|Response
728bb242e58c:3000|ok
Number of rows: 1
*/
return !stdout.contains("|ok");
}

private static void logStdout(Container.ExecResult result) {
log.debug("Aerospike asadm util stdout: \n{}\n{}", result.getStdout(), result.getStderr());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public WritePolicy writePolicy() {
writePolicy.totalTimeout = configuration.get(AEROSPIKE_WRITE_TIMEOUT);
writePolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
writePolicy.maxRetries = NO_RETRIES;
writePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);;
return writePolicy;
}

Expand All @@ -75,7 +76,7 @@ public WritePolicy deletePolicy() {
deletePolicy.expiration = -1;
deletePolicy.totalTimeout = configuration.get(AEROSPIKE_WRITE_TIMEOUT);
deletePolicy.socketTimeout = configuration.get(AEROSPIKE_SOCKET_TIMEOUT);
deletePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);
deletePolicy.durableDelete = !configuration.get(TEST_ENVIRONMENT);;
deletePolicy.maxRetries = NO_RETRIES;
return deletePolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static com.playtika.janusgraph.aerospike.ConfigOptions.IDS_NAMESPACE;
import static com.playtika.janusgraph.aerospike.ConfigOptions.NAMESPACE;
import static com.playtika.janusgraph.aerospike.ConfigOptions.SCAN_PARALLELISM;
import static com.playtika.janusgraph.aerospike.ConfigOptions.TEST_ENVIRONMENT;
import static com.playtika.janusgraph.aerospike.ConfigOptions.WAL_NAMESPACE;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.isEmptyNamespace;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.truncateNamespace;
Expand Down Expand Up @@ -50,7 +49,6 @@ public static ModifiableConfiguration getAerospikeConfiguration(GenericContainer
config.set(GRAPH_PREFIX, "test");
//!!! need to prevent small batches mutations as we use deferred locking approach !!!
config.set(BUFFER_SIZE, AEROSPIKE_BUFFER_SIZE);
config.set(TEST_ENVIRONMENT, true); //for test purposes only
config.set(SCAN_PARALLELISM, 100);
return config;
}
Expand Down

0 comments on commit ad2a98f

Please sign in to comment.