Skip to content

Commit

Permalink
Allow compression of Elasticsearch requests (#31601)
Browse files Browse the repository at this point in the history
  • Loading branch information
Amar3tto authored Jun 17, 2024
1 parent e2d6246 commit 5feba0d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 7.x"
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 7.x"

def elastic_search_version = "7.13.4"
def elastic_search_version = "7.17.22"

dependencies {
testImplementation project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntimeMigration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void beforeClass() throws IOException {

// Start the container. This step might take some time...
container.start();
client = ElasticsearchIOTestUtils.clientFromContainer(container);
client = ElasticsearchIOTestUtils.clientFromContainer(container, true);
setDefaultTemplate(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void beforeClass() throws IOException {

// Start the container. This step might take some time...
container.start();
client = ElasticsearchIOTestUtils.clientFromContainer(container);
client = ElasticsearchIOTestUtils.clientFromContainer(container, true);
setDefaultTemplate(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ applyJavaNature(
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: Common"
ext.summary = "Common test classes for ElasticsearchIO"

def elastic_search_version = "7.9.2"
def elastic_search_version = "7.17.22"

dependencies {
testImplementation library.java.jackson_databind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,17 @@ static int countByMatch(
}

static RestClient clientFromContainer(ElasticsearchContainer container) {
return clientFromContainer(container, false);
}

static RestClient clientFromContainer(
ElasticsearchContainer container, boolean isCompressionEnabled) {
return RestClient.builder(
new HttpHost(
container.getContainerIpAddress(),
container.getMappedPort(ELASTICSEARCH_DEFAULT_PORT),
"http"))
.setCompressionEnabled(isCompressionEnabled)
.build();
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/elasticsearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ dependencies {
implementation library.java.slf4j_api
implementation "org.apache.httpcomponents:httpasyncclient:4.1.4"
implementation "org.apache.httpcomponents:httpcore-nio:4.4.12"
implementation "org.elasticsearch.client:elasticsearch-rest-client:7.9.2"
implementation "org.elasticsearch.client:elasticsearch-rest-client:7.17.22"
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ public abstract static class ConnectionConfiguration implements Serializable {

public abstract boolean isTrustSelfSignedCerts();

public abstract boolean isCompressionEnabled();

abstract Builder builder();

@AutoValue.Builder
Expand Down Expand Up @@ -378,6 +380,8 @@ abstract static class Builder {

abstract Builder setTrustSelfSignedCerts(boolean trustSelfSignedCerts);

abstract Builder setCompressionEnabled(boolean compressionEnabled);

abstract ConnectionConfiguration build();
}

Expand All @@ -399,6 +403,7 @@ public static ConnectionConfiguration create(String[] addresses, String index, S
.setIndex(index)
.setType(type)
.setTrustSelfSignedCerts(false)
.setCompressionEnabled(true)
.build();
}

Expand All @@ -418,6 +423,7 @@ public static ConnectionConfiguration create(String[] addresses, String index) {
.setIndex(index)
.setType("")
.setTrustSelfSignedCerts(false)
.setCompressionEnabled(true)
.build();
}

Expand All @@ -435,6 +441,7 @@ public static ConnectionConfiguration create(String[] addresses) {
.setIndex("")
.setType("")
.setTrustSelfSignedCerts(false)
.setCompressionEnabled(true)
.build();
}

Expand Down Expand Up @@ -634,6 +641,19 @@ public ConnectionConfiguration withTrustSelfSignedCerts(boolean trustSelfSignedC
return builder().setTrustSelfSignedCerts(trustSelfSignedCerts).build();
}

/**
* Configure whether the REST client should compress requests using gzip content encoding and
* add the "Accept-Encoding: gzip". The default is true.
*
* @param compressionEnabled Whether to compress requests using gzip content encoding and add
* the "Accept-Encoding: gzip"
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
*/
public ConnectionConfiguration withCompressionEnabled(boolean compressionEnabled) {
return builder().setCompressionEnabled(compressionEnabled).build();
}

/**
* If set, overwrites the default max retry timeout (30000ms) in the Elastic {@link RestClient}
* and the default socket timeout (30000ms) in the {@link RequestConfig} of the Elastic {@link
Expand Down Expand Up @@ -670,6 +690,7 @@ private void populateDisplayData(DisplayData.Builder builder) {
builder.addIfNotNull(DisplayData.item("socketTimeout", getSocketTimeout()));
builder.addIfNotNull(DisplayData.item("connectTimeout", getConnectTimeout()));
builder.addIfNotNull(DisplayData.item("trustSelfSignedCerts", isTrustSelfSignedCerts()));
builder.addIfNotNull(DisplayData.item("compressionEnabled", isCompressionEnabled()));
}

private SSLContext getSSLContext() throws IOException {
Expand Down Expand Up @@ -717,6 +738,9 @@ RestClient createClient() throws IOException {
Header[] headerList = new Header[getDefaultHeaders().size()];
restClientBuilder.setDefaultHeaders(getDefaultHeaders().toArray(headerList));
}
if (isCompressionEnabled()) {
restClientBuilder.setCompressionEnabled(true);
}

restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
Expand Down

0 comments on commit 5feba0d

Please sign in to comment.