Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable compression for requests to Elasticsearch #1994

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,14 @@ public interface ElasticsearchWriteOptions extends PipelineOptions {
Integer getSocketTimeout();

void setSocketTimeout(Integer socketTimeout);

@TemplateParameter.Boolean(
order = 28,
optional = true,
description = "Enable compression of requests.",
helpText = "Whether to compress requests to Elasticsearch. Defaults to: true.")
@Default.Boolean(true)
Boolean getCompressionEnabled();

void setCompressionEnabled(Boolean compressionEnabled);
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ public PDone expand(PCollection<String> jsonStrings) {
if (options().getSocketTimeout() != null) {
config = config.withSocketTimeout(options().getSocketTimeout());
}
if (options().getCompressionEnabled() != null) {
config = config.withCompressionEnabled(options().getCompressionEnabled());
}

ElasticsearchIO.Write elasticsearchWriter =
ElasticsearchIO.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ public abstract static class ConnectionConfiguration implements Serializable {

public abstract String getUserAgent();

public abstract boolean isCompressionEnabled();

abstract Builder builder();

@AutoValue.Builder
Expand Down Expand Up @@ -304,6 +306,8 @@ abstract static class Builder {

abstract Builder setUserAgent(String userAgent);

abstract Builder setCompressionEnabled(boolean compressionEnabled);

abstract ConnectionConfiguration build();
}

Expand All @@ -330,6 +334,7 @@ public static ConnectionConfiguration create(
.setTrustSelfSignedCerts(false)
.setDisableCertificateValidation(false)
.setUserAgent(userAgent)
.setCompressionEnabled(true)
.build();
}

Expand Down Expand Up @@ -485,6 +490,19 @@ public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) {
return builder().setConnectTimeout(connectTimeout).build();
}

/**
* Configure whether the REST client should compress requests using gzip content encoding and
* add the "Accept-Encoding: gzip". The default is true.
*
* @param compressionEnabled Whether to compress requests using gzip content encoding and add
* the "Accept-Encoding: gzip"
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
*/
public ConnectionConfiguration withCompressionEnabled(boolean compressionEnabled) {
return builder().setCompressionEnabled(compressionEnabled).build();
}

private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("address", getAddresses().toString()));
builder.add(DisplayData.item("index", getIndex()));
Expand Down Expand Up @@ -516,6 +534,9 @@ RestClient createClient() throws IOException {
}

restClientBuilder.setDefaultHeaders(this.configureDefaultHeaders());
if (isCompressionEnabled()) {
restClientBuilder.setCompressionEnabled(true);
}

if (isDisableCertificateValidation()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public void testElasticsearchExpandMethod() {
options.setMaxRetryAttempts(3);
options.setMaxRetryDuration(10L);
options.setSocketTimeout(1);
options.setCompressionEnabled(true);

// Create some dummy input data for testing
PCollection<String> input = testPipeline.apply(Create.of("json string 1", "json string 2"));
Expand Down