From dccae08f0dbdec8cea57600154bcfa8dd303bac8 Mon Sep 17 00:00:00 2001
From: sp-gupta
Date: Thu, 6 Apr 2023 10:44:37 +0530
Subject: [PATCH] Support for Opensearch

---
 pom.xml | 10 +
 .../elasticsearch/ElasticsearchClient.java | 15 +-
 ...r.java => ElasticsearchDataConverter.java} | 8 +-
 .../ElasticsearchSinkConnector.java | 13 +-
 .../ElasticsearchSinkConnectorConfig.java | 28 +-
 .../elasticsearch/ElasticsearchSinkTask.java | 29 +-
 .../elasticsearch/ElasticsearchValidator.java | 496 ++++++++++++
 .../connect/elasticsearch/SearchClient.java | 46 ++
 .../elasticsearch/SearchDataConverter.java | 7 +
 .../elasticsearch/SyncOffsetTracker.java | 4 +-
 .../connect/elasticsearch/Validator.java | 488 +----------
 .../opensearch/ConfigCallbackHandler.java | 370 +++++++++
 .../elasticsearch/opensearch/Mapping.java | 255 ++++++
 .../opensearch/OpensearchClient.java | 758 ++++++++++++++++++
 .../opensearch/OpensearchDataConverter.java | 426 ++++++++++
 .../opensearch/OpensearchValidator.java | 499 ++++++++++++
 ...SearchElasticsearchDataConverterTest.java} | 44 +-
 .../ElasticsearchClientTest.java | 24 +-
 .../connect/elasticsearch/MappingTest.java | 3 +-
 .../connect/elasticsearch/ValidatorTest.java | 86 +-
 20 files changed, 3021 insertions(+), 588 deletions(-)
 rename src/main/java/io/confluent/connect/elasticsearch/{DataConverter.java => ElasticsearchDataConverter.java} (98%)
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/ElasticsearchValidator.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/SearchClient.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/SearchDataConverter.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/opensearch/ConfigCallbackHandler.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/opensearch/Mapping.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchClient.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchDataConverter.java
 create mode 100644 src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchValidator.java
 rename src/test/java/io/confluent/connect/elasticsearch/{DataConverterTest.java => ElasticSearchElasticsearchDataConverterTest.java} (91%)

diff --git a/pom.xml b/pom.xml
index eefaea298..51ad6e355 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,11 @@
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>9.5.0</version>
+    </dependency>
       org.apache.kafka
       connect-api
@@ -75,6 +80,11 @@
       connect-json
       provided
+    <dependency>
+      <groupId>org.opensearch.client</groupId>
+      <artifactId>opensearch-rest-high-level-client</artifactId>
+      <version>2.6.0</version>
+    </dependency>
       org.elasticsearch.client
       elasticsearch-rest-high-level-client
diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
index 1610bf727..94f25c900 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
@@ -49,6 +49,7 @@
 import org.elasticsearch.action.bulk.BulkProcessor.Listener;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
@@ -61,6 +62,7 @@
 import org.elasticsearch.client.indices.GetMappingsResponse;
 import org.elasticsearch.client.indices.PutMappingRequest;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.VersionType;
 import org.slf4j.Logger;
@@ -85,7 +87,7 @@
  * in failure of the task.
  */
 @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
-public class ElasticsearchClient {
+public class ElasticsearchClient extends SearchClient{

   private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);

@@ -111,7 +113,7 @@ public class ElasticsearchClient {
   private final ConcurrentMap> inFlightRequests;
   private final ElasticsearchSinkConnectorConfig config;
   private final ErrantRecordReporter reporter;
-  private final RestHighLevelClient client;
+  protected final RestHighLevelClient client;
   private final ExecutorService bulkExecutorService;
   private final Time clock;
   private final Lock inFlightRequestLock = new ReentrantLock();
@@ -157,7 +159,7 @@ public ElasticsearchClient(
     this.bulkProcessor = BulkProcessor
         .builder(buildConsumer(), buildListener(afterBulkCallback))
         .setBulkActions(config.batchSize())
-        .setBulkSize(config.bulkSize())
+        .setBulkSize((ByteSizeValue) config.bulkSize())
         .setConcurrentRequests(config.maxInFlightRequests() - 1) // 0 = no concurrent requests
         .setFlushInterval(TimeValue.timeValueMillis(config.lingerMs()))
         // Disabling bulk processor retries, because they only cover a small subset of errors
@@ -331,15 +333,16 @@ public boolean hasMapping(String index) {
    * @param offsetState record's offset state
    * @throws ConnectException if one of the requests failed
    */
-  public void index(SinkRecord record, DocWriteRequest request, OffsetState offsetState) {
+  public void index(SinkRecord record, Object request, OffsetState offsetState) {
+    DocWriteRequest docWriteRequest = (DocWriteRequest) request;
     throwIfFailed();

     // TODO should we just pause partitions instead of blocking and failing the connector?
verifyNumBufferedRecords(); - requestToSinkRecord.put(request, new SinkRecordAndOffset(record, offsetState)); + requestToSinkRecord.put(docWriteRequest, new SinkRecordAndOffset(record, offsetState)); numBufferedRecords.incrementAndGet(); - bulkProcessor.add(request); + bulkProcessor.add(docWriteRequest); } public void throwIfFailed() { diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchDataConverter.java similarity index 98% rename from src/main/java/io/confluent/connect/elasticsearch/DataConverter.java rename to src/main/java/io/confluent/connect/elasticsearch/ElasticsearchDataConverter.java index 53085bc0a..dab4c8096 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchDataConverter.java @@ -52,9 +52,9 @@ import java.util.List; import java.util.Map; -public class DataConverter { +public class ElasticsearchDataConverter implements SearchDataConverter { - private static final Logger log = LoggerFactory.getLogger(DataConverter.class); + private static final Logger log = LoggerFactory.getLogger(ElasticsearchDataConverter.class); private static final Converter JSON_CONVERTER; protected static final String MAP_KEY = "key"; @@ -79,7 +79,7 @@ public class DataConverter { * * @param config connector config */ - public DataConverter(ElasticsearchSinkConnectorConfig config) { + public ElasticsearchDataConverter(ElasticsearchSinkConnectorConfig config) { this.config = config; this.objectMapper = new ObjectMapper(); } @@ -116,7 +116,7 @@ private String convertKey(Schema keySchema, Object key) { } } - public DocWriteRequest convertRecord(SinkRecord record, String index) { + public Object convertRecord(SinkRecord record, String index) { if (record.value() == null) { switch (config.behaviorOnNullValues()) { case IGNORE: diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java index 070a95037..0517783ea 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java @@ -15,12 +15,15 @@ package io.confluent.connect.elasticsearch; +import io.confluent.connect.elasticsearch.opensearch.OpensearchValidator; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -30,6 +33,7 @@ public class ElasticsearchSinkConnector extends SinkConnector { private Map configProperties; + private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class); @Override public String version() { @@ -76,7 +80,14 @@ public ConfigDef config() { @Override public Config validate(Map connectorConfigs) { - Validator validator = new Validator(connectorConfigs); + Validator validator; + ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(connectorConfigs); + log.info(config.toString()); + if(config.getServiceType().equals("Elasticsearch")){ + validator = new ElasticsearchValidator(connectorConfigs); + } else { + 
validator = new OpensearchValidator(connectorConfigs); + } return validator.validate(); } } diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 8dd1e80ce..2f2a61a76 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -361,7 +361,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { ); private static final String DATA_STREAM_TIMESTAMP_DISPLAY = "Data Stream Timestamp Field"; private static final String DATA_STREAM_TIMESTAMP_DEFAULT = ""; - + private static final String SERVICE_TYPE_CONFIG = + "service.type"; + private static final String SERVICE_TYPE_DISPLAY = "Elasticsearch or Opensearch"; + private static final String SERVICE_TYPE_DEFAULT = "Elasticsearch"; + private static final String SERVICE_TYPE_DOC = "Type of end service - Elasticsearch or opensearch"; private static final String CONNECTOR_GROUP = "Connector"; private static final String DATA_CONVERSION_GROUP = "Data Conversion"; private static final String PROXY_GROUP = "Proxy"; @@ -583,6 +587,16 @@ private static void addConnectorConfigs(ConfigDef configDef) { ++order, Width.SHORT, READ_TIMEOUT_MS_DISPLAY + ).define( + SERVICE_TYPE_CONFIG, + Type.STRING, + SERVICE_TYPE_DEFAULT, + Importance.LOW, + SERVICE_TYPE_DOC, + CONNECTOR_GROUP, + ++order, + Width.SHORT, + SERVICE_TYPE_DISPLAY ); } @@ -882,8 +896,14 @@ public BehaviorOnNullValues behaviorOnNullValues() { return BehaviorOnNullValues.valueOf(getString(BEHAVIOR_ON_NULL_VALUES_CONFIG).toUpperCase()); } - public ByteSizeValue bulkSize() { - return new ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG)); + public Object bulkSize() { + if(getServiceType().equals("Elasticsearch")){ + return new org.elasticsearch.common.unit.ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG)); + } + else{ + return new org.opensearch.common.unit.ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG)); + } + } public boolean compression() { @@ -996,6 +1016,8 @@ public long retryBackoffMs() { return getLong(RETRY_BACKOFF_MS_CONFIG); } + public String getServiceType() { return getString(SERVICE_TYPE_CONFIG); } + private SecurityProtocol securityProtocol() { return SecurityProtocol.valueOf(getString(SECURITY_PROTOCOL_CONFIG).toUpperCase()); } diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 8ff18ac71..dc10854bf 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -21,6 +21,8 @@ import java.util.Set; import java.util.function.BooleanSupplier; +import io.confluent.connect.elasticsearch.opensearch.OpensearchClient; +import io.confluent.connect.elasticsearch.opensearch.OpensearchDataConverter; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; @@ -29,7 +31,6 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTaskContext; -import org.elasticsearch.action.DocWriteRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +41,15 @@ public class ElasticsearchSinkTask extends 
SinkTask { private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class); - private DataConverter converter; - private ElasticsearchClient client; + private SearchDataConverter converter; + private SearchClient client; private ElasticsearchSinkConnectorConfig config; private ErrantRecordReporter reporter; private Set existingMappings; private Set indexCache; private OffsetTracker offsetTracker; private PartitionPauser partitionPauser; + public String type; @Override public void start(Map props) { @@ -55,11 +57,10 @@ public void start(Map props) { } // visible for testing - protected void start(Map props, ElasticsearchClient client) { + protected void start(Map props, SearchClient client) { log.info("Starting ElasticsearchSinkTask."); this.config = new ElasticsearchSinkConnectorConfig(props); - this.converter = new DataConverter(config); this.existingMappings = new HashSet<>(); this.indexCache = new HashSet<>(); int offsetHighWaterMark = config.maxBufferedRecords() * 10; @@ -79,8 +80,18 @@ protected void start(Map props, ElasticsearchClient client) { log.warn("AK versions prior to 2.6 do not support the errant record reporter."); } Runnable afterBulkCallback = () -> offsetTracker.updateOffsets(); - this.client = client != null ? client - : new ElasticsearchClient(config, reporter, afterBulkCallback); + if(config.getServiceType().equals("Elasticsearch")){ + this.type = "Elasticsearch"; + this.client = client != null ? client + : new ElasticsearchClient(config, reporter, afterBulkCallback); + this.converter = new ElasticsearchDataConverter(config); + } else { + log.info("Opensearch client....."); + this.type = "Opensearch"; + this.client = client != null ? client + : new OpensearchClient(config, reporter, afterBulkCallback); + this.converter = new OpensearchDataConverter(config); + } if (!config.flushSynchronously()) { this.offsetTracker = new AsyncOffsetTracker(context); @@ -254,7 +265,7 @@ private void tryWriteRecord(SinkRecord sinkRecord, OffsetState offsetState) { ensureIndexExists(indexName); checkMapping(indexName, sinkRecord); - DocWriteRequest docWriteRequest = null; + Object docWriteRequest = null; try { docWriteRequest = converter.convertRecord(sinkRecord, indexName); } catch (DataException convertException) { @@ -270,7 +281,9 @@ private void tryWriteRecord(SinkRecord sinkRecord, OffsetState offsetState) { if (docWriteRequest != null) { logTrace("Adding {} to bulk processor.", sinkRecord); + log.info("Client: " + client.getClass()); client.index(sinkRecord, docWriteRequest, offsetState); + log.info("Called index method"); } } diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchValidator.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchValidator.java new file mode 100644 index 000000000..1599811e8 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchValidator.java @@ -0,0 +1,496 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.elasticsearch; + +import org.apache.http.HttpHost; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.config.SslConfigs; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.client.core.MainResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DataStreamType; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PORT_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; 
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod; + +public class ElasticsearchValidator extends Validator{ + + private static final Logger log = LoggerFactory.getLogger(ElasticsearchValidator.class); + + private static final String CONNECTOR_V11_COMPATIBLE_ES_VERSION = "7.0.0"; + private static final String DATA_STREAM_COMPATIBLE_ES_VERSION = "7.9.0"; + + private ElasticsearchSinkConnectorConfig config; + private Map values; + private List validations; + private ClientFactory clientFactory; + + public ElasticsearchValidator(Map props) { + this(props, null); + } + + // Exposed for testing + protected ElasticsearchValidator(Map props, ClientFactory clientFactory) { + try { + this.config = new ElasticsearchSinkConnectorConfig(props); + } catch (ConfigException e) { + // some configs are invalid + } + + this.clientFactory = clientFactory == null ? this::createClient : clientFactory; + validations = ElasticsearchSinkConnectorConfig.CONFIG.validate(props); + values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity())); + } + + public Config validate() { + if (config == null) { + // individual configs are invalid, no point in validating combinations + return new Config(validations); + } + + validateCredentials(); + validateDataStreamConfigs(); + validateIgnoreConfigs(); + validateKerberos(); + validateLingerMs(); + validateMaxBufferedRecords(); + validateProxy(); + validateSsl(); + + if (!hasErrors()) { + // no point in connection validation if previous ones fails + try (RestHighLevelClient client = clientFactory.client()) { + validateConnection(client); + validateVersion(client); + } catch (IOException e) { + log.warn("Closing the client failed.", e); + } catch (Throwable e) { + log.error("Failed to create client to verify connection. ", e); + addErrorMessage(CONNECTION_URL_CONFIG, "Failed to create client to verify connection. " + + e.getMessage()); + } + } + + return new Config(validations); + } + + private void validateCredentials() { + boolean onlyOneSet = config.username() != null ^ config.password() != null; + if (onlyOneSet) { + String errorMessage = String.format( + "Both '%s' and '%s' must be set.", CONNECTION_USERNAME_CONFIG, CONNECTION_PASSWORD_CONFIG + ); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + } + + private void validateDataStreamConfigs() { + if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' must be set.", + DATA_STREAM_DATASET_CONFIG, + DATA_STREAM_TYPE_CONFIG + ); + addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); + } + + if (config.isDataStream() && config.writeMethod() == WriteMethod.UPSERT) { + String errorMessage = String.format( + "Upserts are not supported with data streams. 
%s must not be %s if %s and %s are set.", + WRITE_METHOD_CONFIG, + WriteMethod.UPSERT, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(WRITE_METHOD_CONFIG, errorMessage); + } + + if (config.isDataStream() && config.behaviorOnNullValues() == BehaviorOnNullValues.DELETE) { + String errorMessage = String.format( + "Deletes are not supported with data streams. %s must not be %s if %s and %s are set.", + BEHAVIOR_ON_NULL_VALUES_CONFIG, + BehaviorOnNullValues.DELETE, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(BEHAVIOR_ON_NULL_VALUES_CONFIG, errorMessage); + } + + if (!config.isDataStream() && !config.dataStreamTimestampField().isEmpty()) { + String errorMessage = String.format( + "Mapping a field to the '@timestamp' field is only necessary for data streams. " + + "%s must not be set if %s and %s are not set.", + DATA_STREAM_TIMESTAMP_CONFIG, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(DATA_STREAM_TIMESTAMP_CONFIG, errorMessage); + } + } + + private void validateIgnoreConfigs() { + if (config.ignoreKey() && !config.ignoreKeyTopics().isEmpty()) { + String errorMessage = String.format( + "'%s' can not be set if '%s' is true.", IGNORE_KEY_TOPICS_CONFIG, IGNORE_KEY_CONFIG + ); + addErrorMessage(IGNORE_KEY_CONFIG, errorMessage); + addErrorMessage(IGNORE_KEY_TOPICS_CONFIG, errorMessage); + } + + if (config.ignoreSchema() && !config.ignoreSchemaTopics().isEmpty()) { + String errorMessage = String.format( + "'%s' can not be set if '%s' is true.", IGNORE_SCHEMA_TOPICS_CONFIG, IGNORE_SCHEMA_CONFIG + ); + addErrorMessage(IGNORE_SCHEMA_CONFIG, errorMessage); + addErrorMessage(IGNORE_SCHEMA_TOPICS_CONFIG, errorMessage); + } + } + + private void validateKerberos() { + boolean onlyOneSet = config.kerberosUserPrincipal() != null ^ config.keytabPath() != null; + if (onlyOneSet) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' must be set.", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + } + + if (config.isKerberosEnabled()) { + // currently do not support Kerberos with regular auth + if (config.isAuthenticatedConnection()) { + String errorMessage = String.format( + "Either only Kerberos (%s, %s) or connection credentials (%s, %s) must be set.", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG, + CONNECTION_USERNAME_CONFIG, + CONNECTION_PASSWORD_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + + // currently do not support Kerberos with proxy + if (config.isBasicProxyConfigured()) { + String errorMessage = String.format( + "Kerberos (%s, %s) is not supported with proxy settings (%s).", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG, + PROXY_HOST_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + } + + } + + private void validateLingerMs() { + if (config.lingerMs() > config.flushTimeoutMs()) { + String errorMessage = String.format( + "'%s' (%d) can not be larger than '%s' (%d).", + LINGER_MS_CONFIG, config.lingerMs(), FLUSH_TIMEOUT_MS_CONFIG, config.flushTimeoutMs() + ); + 
addErrorMessage(LINGER_MS_CONFIG, errorMessage); + addErrorMessage(FLUSH_TIMEOUT_MS_CONFIG, errorMessage); + } + } + + private void validateMaxBufferedRecords() { + if (config.maxBufferedRecords() < config.batchSize() * config.maxInFlightRequests()) { + String errorMessage = String.format( + "'%s' (%d) must be larger than or equal to '%s' (%d) x %s (%d).", + MAX_BUFFERED_RECORDS_CONFIG, config.maxBufferedRecords(), + BATCH_SIZE_CONFIG, config.batchSize(), + MAX_IN_FLIGHT_REQUESTS_CONFIG, config.maxInFlightRequests() + ); + + addErrorMessage(MAX_BUFFERED_RECORDS_CONFIG, errorMessage); + addErrorMessage(BATCH_SIZE_CONFIG, errorMessage); + addErrorMessage(MAX_IN_FLIGHT_REQUESTS_CONFIG, errorMessage); + } + } + + private void validateProxy() { + if (!config.isBasicProxyConfigured()) { + if (!config.proxyUsername().isEmpty()) { + String errorMessage = String.format( + "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_USERNAME_CONFIG + ); + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + + if (config.proxyPassword() != null) { + String errorMessage = String.format( + "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_PASSWORD_CONFIG + ); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + } else { + boolean onlyOneSet = config.proxyUsername().isEmpty() ^ config.proxyPassword() == null; + if (onlyOneSet) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' can be set.", + PROXY_USERNAME_CONFIG, + PROXY_PASSWORD_CONFIG + ); + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + } + } + } + + private void validateSsl() { + Map sslConfigs = config.originalsWithPrefix(SSL_CONFIG_PREFIX); + if (!config.isSslEnabled()) { + if (!sslConfigs.isEmpty()) { + String errorMessage = String.format( + "'%s' must be set to '%s' to use SSL configs.", + SECURITY_PROTOCOL_CONFIG, + SecurityProtocol.SSL + ); + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + } else { + if (sslConfigs.isEmpty()) { + String errorMessage = String.format( + "At least these SSL configs ('%s', '%s', '%s', and '%s') must be present for SSL" + + " support. Otherwise set '%s' to '%s'.", + SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SECURITY_PROTOCOL_CONFIG, + SecurityProtocol.PLAINTEXT + ); + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + } + } + + private void validateVersion(RestHighLevelClient client) { + MainResponse response; + try { + response = client.info(RequestOptions.DEFAULT); + } catch (IOException | ElasticsearchStatusException e) { + // Same error messages as from validating the connection for IOException. + // Insufficient privileges to validate the version number if caught + // ElasticsearchStatusException. + return; + } + String esVersionNumber = response.getVersion().getNumber(); + if (config.isDataStream() + && compareVersions(esVersionNumber, DATA_STREAM_COMPATIBLE_ES_VERSION) < 0) { + String errorMessage = String.format( + "Elasticsearch version %s is not compatible with data streams. 
Elasticsearch" + + "version must be at least %s.", + esVersionNumber, + DATA_STREAM_COMPATIBLE_ES_VERSION + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); + } + if (compareVersions(esVersionNumber, CONNECTOR_V11_COMPATIBLE_ES_VERSION) < 0) { + String errorMessage = String.format( + "Connector version %s is not compatible with Elasticsearch version %s. Elasticsearch " + + "version must be at least %s.", + Version.getVersion(), + esVersionNumber, + CONNECTOR_V11_COMPATIBLE_ES_VERSION + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + } + } + + /** + * Compares versionNumber to compatibleVersion. + * + * @return a negative integer, zero, or a positive integer if + * versionNumber is less than, equal to, or greater + * than compatibleVersion. + */ + private int compareVersions(String versionNumber, String compatibleVersion) { + String[] versionSplit = versionNumber.split("\\."); + String[] compatibleSplit = compatibleVersion.split("\\."); + + for (int i = 0; i < Math.min(versionSplit.length, compatibleSplit.length); i++) { + String versionSplitBeforeSuffix = versionSplit[i].split("-")[0]; + String compatibleSplitBeforeSuffix = compatibleSplit[i].split("-")[0]; + int comparison = Integer.compare( + Integer.parseInt(versionSplitBeforeSuffix), + Integer.parseInt(compatibleSplitBeforeSuffix) + ); + if (comparison != 0) { + return comparison; + } + } + return versionSplit.length - compatibleSplit.length; + } + + private void validateConnection(RestHighLevelClient client) { + boolean successful; + String exceptionMessage = ""; + try { + successful = client.ping(RequestOptions.DEFAULT); + } catch (ElasticsearchStatusException e) { + switch (e.status()) { + case FORBIDDEN: + // ES is up, but user is not authorized to ping server + successful = true; + break; + default: + successful = false; + exceptionMessage = String.format("Error message: %s", e.getMessage()); + } + } catch (Exception e) { + successful = false; + exceptionMessage = String.format("Error message: %s", e.getMessage()); + } + if (!successful) { + String errorMessage = String.format( + "Could not connect to Elasticsearch. %s", + exceptionMessage + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + + if (config.isAuthenticatedConnection()) { + errorMessage = String.format( + "Could not authenticate the user. Check the '%s' and '%s'. %s", + CONNECTION_USERNAME_CONFIG, + CONNECTION_PASSWORD_CONFIG, + exceptionMessage + ); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + + if (config.isSslEnabled()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your SSL settings.%s", + exceptionMessage + ); + + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + + if (config.isKerberosEnabled()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your Kerberos settings. %s", + exceptionMessage + ); + + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + } + + if (config.isBasicProxyConfigured()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your proxy settings. 
%s", + exceptionMessage + ); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + addErrorMessage(PROXY_PORT_CONFIG, errorMessage); + + if (config.isProxyWithAuthenticationConfigured()) { + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + } + } + } + } + + private void addErrorMessage(String property, String error) { + values.get(property).addErrorMessage(error); + } + + private RestHighLevelClient createClient() { + ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); + return new RestHighLevelClient( + RestClient + .builder( + config.connectionUrls() + .stream() + .map(HttpHost::create) + .collect(Collectors.toList()) + .toArray(new HttpHost[config.connectionUrls().size()]) + ) + .setHttpClientConfigCallback(configCallbackHandler) + ); + } + + private boolean hasErrors() { + for (ConfigValue config : validations) { + if (!config.errorMessages().isEmpty()) { + return true; + } + } + + return false; + } + + interface ClientFactory { + RestHighLevelClient client(); + } +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/SearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/SearchClient.java new file mode 100644 index 000000000..51b3cc63c --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/SearchClient.java @@ -0,0 +1,46 @@ +package io.confluent.connect.elasticsearch; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SearchClient{ + + public String type; + private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class); + public Object version() { + return null; + } + + public void throwIfFailed() { + } + + public void flush() { + } + + public void close() { + } + + public boolean hasMapping(String index) { + return false; + } + + public void createMapping(String index, Schema valueSchema) { + } + + public void waitForInFlightRequests() { + } + + public boolean isFailed() { + return false; + } + + public boolean createIndexOrDataStream(String index) { + return false; + } + + public void index(SinkRecord sinkRecord, Object docWriteRequest, OffsetState offsetState) { + log.info("I am in search client...."); + } +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/SearchDataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/SearchDataConverter.java new file mode 100644 index 000000000..3224b682c --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/SearchDataConverter.java @@ -0,0 +1,7 @@ +package io.confluent.connect.elasticsearch; + +import org.apache.kafka.connect.sink.SinkRecord; + +public interface SearchDataConverter { + Object convertRecord(SinkRecord sinkRecord, String indexName); +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/SyncOffsetTracker.java b/src/main/java/io/confluent/connect/elasticsearch/SyncOffsetTracker.java index 9383f61f7..c635828e4 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/SyncOffsetTracker.java +++ b/src/main/java/io/confluent/connect/elasticsearch/SyncOffsetTracker.java @@ -29,9 +29,9 @@ */ public class SyncOffsetTracker implements OffsetTracker { - private ElasticsearchClient client; + private SearchClient client; - public SyncOffsetTracker(ElasticsearchClient client) { + public SyncOffsetTracker(SearchClient client) { this.client = client; } diff --git 
a/src/main/java/io/confluent/connect/elasticsearch/Validator.java b/src/main/java/io/confluent/connect/elasticsearch/Validator.java index 0dc577cec..96d569997 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/Validator.java +++ b/src/main/java/io/confluent/connect/elasticsearch/Validator.java @@ -1,496 +1,14 @@ -/* - * Copyright 2020 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - package io.confluent.connect.elasticsearch; -import org.apache.http.HttpHost; import org.apache.kafka.common.config.Config; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; -import org.apache.kafka.common.config.SslConfigs; -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol; - -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DataStreamType; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG; -import static 
io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PORT_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod; public class Validator { - private static final Logger log = LoggerFactory.getLogger(Validator.class); - - private static final String CONNECTOR_V11_COMPATIBLE_ES_VERSION = "7.0.0"; - private static final String DATA_STREAM_COMPATIBLE_ES_VERSION = "7.9.0"; - - private ElasticsearchSinkConnectorConfig config; - private Map values; - private List validations; - private ClientFactory clientFactory; - - public Validator(Map props) { - this(props, null); - } - - // Exposed for testing - protected Validator(Map props, ClientFactory clientFactory) { - try { - this.config = new ElasticsearchSinkConnectorConfig(props); - } catch (ConfigException e) { - // some configs are invalid - } - - this.clientFactory = clientFactory == null ? this::createClient : clientFactory; - validations = ElasticsearchSinkConnectorConfig.CONFIG.validate(props); - values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity())); - } - - public Config validate() { - if (config == null) { - // individual configs are invalid, no point in validating combinations - return new Config(validations); - } - - validateCredentials(); - validateDataStreamConfigs(); - validateIgnoreConfigs(); - validateKerberos(); - validateLingerMs(); - validateMaxBufferedRecords(); - validateProxy(); - validateSsl(); - - if (!hasErrors()) { - // no point in connection validation if previous ones fails - try (RestHighLevelClient client = clientFactory.client()) { - validateConnection(client); - validateVersion(client); - } catch (IOException e) { - log.warn("Closing the client failed.", e); - } catch (Throwable e) { - log.error("Failed to create client to verify connection. ", e); - addErrorMessage(CONNECTION_URL_CONFIG, "Failed to create client to verify connection. 
" - + e.getMessage()); - } - } - - return new Config(validations); - } - - private void validateCredentials() { - boolean onlyOneSet = config.username() != null ^ config.password() != null; - if (onlyOneSet) { - String errorMessage = String.format( - "Both '%s' and '%s' must be set.", CONNECTION_USERNAME_CONFIG, CONNECTION_PASSWORD_CONFIG - ); - addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); - addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); - } - } - - private void validateDataStreamConfigs() { - if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) { - String errorMessage = String.format( - "Either both or neither '%s' and '%s' must be set.", - DATA_STREAM_DATASET_CONFIG, - DATA_STREAM_TYPE_CONFIG - ); - addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); - addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); - } - - if (config.isDataStream() && config.writeMethod() == WriteMethod.UPSERT) { - String errorMessage = String.format( - "Upserts are not supported with data streams. %s must not be %s if %s and %s are set.", - WRITE_METHOD_CONFIG, - WriteMethod.UPSERT, - DATA_STREAM_TYPE_CONFIG, - DATA_STREAM_DATASET_CONFIG - ); - addErrorMessage(WRITE_METHOD_CONFIG, errorMessage); - } - - if (config.isDataStream() && config.behaviorOnNullValues() == BehaviorOnNullValues.DELETE) { - String errorMessage = String.format( - "Deletes are not supported with data streams. %s must not be %s if %s and %s are set.", - BEHAVIOR_ON_NULL_VALUES_CONFIG, - BehaviorOnNullValues.DELETE, - DATA_STREAM_TYPE_CONFIG, - DATA_STREAM_DATASET_CONFIG - ); - addErrorMessage(BEHAVIOR_ON_NULL_VALUES_CONFIG, errorMessage); - } - - if (!config.isDataStream() && !config.dataStreamTimestampField().isEmpty()) { - String errorMessage = String.format( - "Mapping a field to the '@timestamp' field is only necessary for data streams. 
" - + "%s must not be set if %s and %s are not set.", - DATA_STREAM_TIMESTAMP_CONFIG, - DATA_STREAM_TYPE_CONFIG, - DATA_STREAM_DATASET_CONFIG - ); - addErrorMessage(DATA_STREAM_TIMESTAMP_CONFIG, errorMessage); - } - } - - private void validateIgnoreConfigs() { - if (config.ignoreKey() && !config.ignoreKeyTopics().isEmpty()) { - String errorMessage = String.format( - "'%s' can not be set if '%s' is true.", IGNORE_KEY_TOPICS_CONFIG, IGNORE_KEY_CONFIG - ); - addErrorMessage(IGNORE_KEY_CONFIG, errorMessage); - addErrorMessage(IGNORE_KEY_TOPICS_CONFIG, errorMessage); + public Config validate() { + return new Config(new LinkedList<>()); } - - if (config.ignoreSchema() && !config.ignoreSchemaTopics().isEmpty()) { - String errorMessage = String.format( - "'%s' can not be set if '%s' is true.", IGNORE_SCHEMA_TOPICS_CONFIG, IGNORE_SCHEMA_CONFIG - ); - addErrorMessage(IGNORE_SCHEMA_CONFIG, errorMessage); - addErrorMessage(IGNORE_SCHEMA_TOPICS_CONFIG, errorMessage); - } - } - - private void validateKerberos() { - boolean onlyOneSet = config.kerberosUserPrincipal() != null ^ config.keytabPath() != null; - if (onlyOneSet) { - String errorMessage = String.format( - "Either both or neither '%s' and '%s' must be set.", - KERBEROS_PRINCIPAL_CONFIG, - KERBEROS_KEYTAB_PATH_CONFIG - ); - addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); - addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); - } - - if (config.isKerberosEnabled()) { - // currently do not support Kerberos with regular auth - if (config.isAuthenticatedConnection()) { - String errorMessage = String.format( - "Either only Kerberos (%s, %s) or connection credentials (%s, %s) must be set.", - KERBEROS_PRINCIPAL_CONFIG, - KERBEROS_KEYTAB_PATH_CONFIG, - CONNECTION_USERNAME_CONFIG, - CONNECTION_PASSWORD_CONFIG - ); - addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); - addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); - addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); - addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); - } - - // currently do not support Kerberos with proxy - if (config.isBasicProxyConfigured()) { - String errorMessage = String.format( - "Kerberos (%s, %s) is not supported with proxy settings (%s).", - KERBEROS_PRINCIPAL_CONFIG, - KERBEROS_KEYTAB_PATH_CONFIG, - PROXY_HOST_CONFIG - ); - addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); - addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); - addErrorMessage(PROXY_HOST_CONFIG, errorMessage); - } - } - - } - - private void validateLingerMs() { - if (config.lingerMs() > config.flushTimeoutMs()) { - String errorMessage = String.format( - "'%s' (%d) can not be larger than '%s' (%d).", - LINGER_MS_CONFIG, config.lingerMs(), FLUSH_TIMEOUT_MS_CONFIG, config.flushTimeoutMs() - ); - addErrorMessage(LINGER_MS_CONFIG, errorMessage); - addErrorMessage(FLUSH_TIMEOUT_MS_CONFIG, errorMessage); - } - } - - private void validateMaxBufferedRecords() { - if (config.maxBufferedRecords() < config.batchSize() * config.maxInFlightRequests()) { - String errorMessage = String.format( - "'%s' (%d) must be larger than or equal to '%s' (%d) x %s (%d).", - MAX_BUFFERED_RECORDS_CONFIG, config.maxBufferedRecords(), - BATCH_SIZE_CONFIG, config.batchSize(), - MAX_IN_FLIGHT_REQUESTS_CONFIG, config.maxInFlightRequests() - ); - - addErrorMessage(MAX_BUFFERED_RECORDS_CONFIG, errorMessage); - addErrorMessage(BATCH_SIZE_CONFIG, errorMessage); - addErrorMessage(MAX_IN_FLIGHT_REQUESTS_CONFIG, errorMessage); - } - } - - private void validateProxy() { - if 
(!config.isBasicProxyConfigured()) { - if (!config.proxyUsername().isEmpty()) { - String errorMessage = String.format( - "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_USERNAME_CONFIG - ); - addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); - addErrorMessage(PROXY_HOST_CONFIG, errorMessage); - } - - if (config.proxyPassword() != null) { - String errorMessage = String.format( - "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_PASSWORD_CONFIG - ); - addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); - addErrorMessage(PROXY_HOST_CONFIG, errorMessage); - } - } else { - boolean onlyOneSet = config.proxyUsername().isEmpty() ^ config.proxyPassword() == null; - if (onlyOneSet) { - String errorMessage = String.format( - "Either both or neither '%s' and '%s' can be set.", - PROXY_USERNAME_CONFIG, - PROXY_PASSWORD_CONFIG - ); - addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); - addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); - } - } - } - - private void validateSsl() { - Map sslConfigs = config.originalsWithPrefix(SSL_CONFIG_PREFIX); - if (!config.isSslEnabled()) { - if (!sslConfigs.isEmpty()) { - String errorMessage = String.format( - "'%s' must be set to '%s' to use SSL configs.", - SECURITY_PROTOCOL_CONFIG, - SecurityProtocol.SSL - ); - addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); - } - } else { - if (sslConfigs.isEmpty()) { - String errorMessage = String.format( - "At least these SSL configs ('%s', '%s', '%s', and '%s') must be present for SSL" - + " support. Otherwise set '%s' to '%s'.", - SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - SECURITY_PROTOCOL_CONFIG, - SecurityProtocol.PLAINTEXT - ); - addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); - } - } - } - - private void validateVersion(RestHighLevelClient client) { - MainResponse response; - try { - response = client.info(RequestOptions.DEFAULT); - } catch (IOException | ElasticsearchStatusException e) { - // Same error messages as from validating the connection for IOException. - // Insufficient privileges to validate the version number if caught - // ElasticsearchStatusException. - return; - } - String esVersionNumber = response.getVersion().getNumber(); - if (config.isDataStream() - && compareVersions(esVersionNumber, DATA_STREAM_COMPATIBLE_ES_VERSION) < 0) { - String errorMessage = String.format( - "Elasticsearch version %s is not compatible with data streams. Elasticsearch" - + "version must be at least %s.", - esVersionNumber, - DATA_STREAM_COMPATIBLE_ES_VERSION - ); - addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); - addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); - addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); - } - if (compareVersions(esVersionNumber, CONNECTOR_V11_COMPATIBLE_ES_VERSION) < 0) { - String errorMessage = String.format( - "Connector version %s is not compatible with Elasticsearch version %s. Elasticsearch " - + "version must be at least %s.", - Version.getVersion(), - esVersionNumber, - CONNECTOR_V11_COMPATIBLE_ES_VERSION - ); - addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); - } - } - - /** - * Compares versionNumber to compatibleVersion. - * - * @return a negative integer, zero, or a positive integer if - * versionNumber is less than, equal to, or greater - * than compatibleVersion. 
- */ - private int compareVersions(String versionNumber, String compatibleVersion) { - String[] versionSplit = versionNumber.split("\\."); - String[] compatibleSplit = compatibleVersion.split("\\."); - - for (int i = 0; i < Math.min(versionSplit.length, compatibleSplit.length); i++) { - String versionSplitBeforeSuffix = versionSplit[i].split("-")[0]; - String compatibleSplitBeforeSuffix = compatibleSplit[i].split("-")[0]; - int comparison = Integer.compare( - Integer.parseInt(versionSplitBeforeSuffix), - Integer.parseInt(compatibleSplitBeforeSuffix) - ); - if (comparison != 0) { - return comparison; - } - } - return versionSplit.length - compatibleSplit.length; - } - - private void validateConnection(RestHighLevelClient client) { - boolean successful; - String exceptionMessage = ""; - try { - successful = client.ping(RequestOptions.DEFAULT); - } catch (ElasticsearchStatusException e) { - switch (e.status()) { - case FORBIDDEN: - // ES is up, but user is not authorized to ping server - successful = true; - break; - default: - successful = false; - exceptionMessage = String.format("Error message: %s", e.getMessage()); - } - } catch (Exception e) { - successful = false; - exceptionMessage = String.format("Error message: %s", e.getMessage()); - } - if (!successful) { - String errorMessage = String.format( - "Could not connect to Elasticsearch. %s", - exceptionMessage - ); - addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); - - if (config.isAuthenticatedConnection()) { - errorMessage = String.format( - "Could not authenticate the user. Check the '%s' and '%s'. %s", - CONNECTION_USERNAME_CONFIG, - CONNECTION_PASSWORD_CONFIG, - exceptionMessage - ); - addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); - addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); - } - - if (config.isSslEnabled()) { - errorMessage = String.format( - "Could not connect to Elasticsearch. Check your SSL settings.%s", - exceptionMessage - ); - - addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); - } - - if (config.isKerberosEnabled()) { - errorMessage = String.format( - "Could not connect to Elasticsearch. Check your Kerberos settings. %s", - exceptionMessage - ); - - addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); - addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); - } - - if (config.isBasicProxyConfigured()) { - errorMessage = String.format( - "Could not connect to Elasticsearch. Check your proxy settings. 
%s", - exceptionMessage - ); - addErrorMessage(PROXY_HOST_CONFIG, errorMessage); - addErrorMessage(PROXY_PORT_CONFIG, errorMessage); - - if (config.isProxyWithAuthenticationConfigured()) { - addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); - addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); - } - } - } - } - - private void addErrorMessage(String property, String error) { - values.get(property).addErrorMessage(error); - } - - private RestHighLevelClient createClient() { - ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); - return new RestHighLevelClient( - RestClient - .builder( - config.connectionUrls() - .stream() - .map(HttpHost::create) - .collect(Collectors.toList()) - .toArray(new HttpHost[config.connectionUrls().size()]) - ) - .setHttpClientConfigCallback(configCallbackHandler) - ); - } - - private boolean hasErrors() { - for (ConfigValue config : validations) { - if (!config.errorMessages().isEmpty()) { - return true; - } - } - - return false; - } - - interface ClientFactory { - RestHighLevelClient client(); - } } diff --git a/src/main/java/io/confluent/connect/elasticsearch/opensearch/ConfigCallbackHandler.java b/src/main/java/io/confluent/connect/elasticsearch/opensearch/ConfigCallbackHandler.java new file mode 100644 index 000000000..503cd10a0 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/opensearch/ConfigCallbackHandler.java @@ -0,0 +1,370 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.elasticsearch.opensearch; + +import com.sun.security.auth.module.Krb5LoginModule; +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.KerberosCredentials; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.Lookup; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.nio.conn.NoopIOSessionStrategy; +import org.apache.http.nio.conn.SchemeIOSessionStrategy; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; +import org.apache.http.nio.reactor.ConnectingIOReactor; +import org.apache.http.nio.reactor.IOReactorException; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.apache.kafka.connect.errors.ConnectException; +import org.opensearch.client.RestClientBuilder.HttpClientConfigCallback; +import org.ietf.jgss.GSSCredential; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.Oid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +public class ConfigCallbackHandler implements HttpClientConfigCallback { + + private static final Logger log = LoggerFactory.getLogger(ConfigCallbackHandler.class); + + private static final Oid SPNEGO_OID = spnegoOid(); + + private final ElasticsearchSinkConnectorConfig config; + + public ConfigCallbackHandler(ElasticsearchSinkConnectorConfig config) { + this.config = config; + } + + /** + * Customizes the client according to the configurations and starts the connection reaping thread. 
+ * + * @param builder the HttpAsyncClientBuilder + * @return the builder + */ + @Override + public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder) { + RequestConfig requestConfig = RequestConfig.custom() + .setContentCompressionEnabled(config.compression()) + .setConnectTimeout(config.connectionTimeoutMs()) + .setConnectionRequestTimeout(config.readTimeoutMs()) + .setSocketTimeout(config.readTimeoutMs()) + .build(); + + builder.setConnectionManager(createConnectionManager()) + .setDefaultRequestConfig(requestConfig); + + configureAuthentication(builder); + + if (config.isKerberosEnabled()) { + configureKerberos(builder); + } + + if (config.isSslEnabled()) { + configureSslContext(builder); + } + + if (config.isKerberosEnabled() && config.isSslEnabled()) { + log.info("Using Kerberos and SSL connection to {}.", config.connectionUrls()); + } else if (config.isKerberosEnabled()) { + log.info("Using Kerberos connection to {}.", config.connectionUrls()); + } else if (config.isSslEnabled()) { + log.info("Using SSL connection to {}.", config.connectionUrls()); + } else { + log.info("Using unsecured connection to {}.", config.connectionUrls()); + } + + return builder; + } + + /** + * Configures HTTP authentication and proxy authentication according to the client configuration. + * + * @param builder the HttpAsyncClientBuilder + */ + private void configureAuthentication(HttpAsyncClientBuilder builder) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (config.isAuthenticatedConnection()) { + config.connectionUrls().forEach(url -> credentialsProvider.setCredentials( + new AuthScope(HttpHost.create(url)), + new UsernamePasswordCredentials(config.username(), config.password().value()) + ) + ); + builder.setDefaultCredentialsProvider(credentialsProvider); + } + + if (config.isBasicProxyConfigured()) { + HttpHost proxy = new HttpHost(config.proxyHost(), config.proxyPort()); + builder.setProxy(proxy); + + if (config.isProxyWithAuthenticationConfigured()) { + credentialsProvider.setCredentials( + new AuthScope(proxy), + new UsernamePasswordCredentials(config.proxyUsername(), config.proxyPassword().value()) + ); + } + + builder.setDefaultCredentialsProvider(credentialsProvider); + } + } + + /** + * Creates a connection manager for the client. + * + * @return the connection manager + */ + private PoolingNHttpClientConnectionManager createConnectionManager() { + try { + PoolingNHttpClientConnectionManager cm; + IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setConnectTimeout(config.connectionTimeoutMs()) + .setSoTimeout(config.readTimeoutMs()) + .build(); + ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); + + if (config.isSslEnabled()) { + HostnameVerifier hostnameVerifier = config.shouldDisableHostnameVerification() + ? 
new NoopHostnameVerifier() + : SSLConnectionSocketFactory.getDefaultHostnameVerifier(); + Registry reg = RegistryBuilder.create() + .register("http", NoopIOSessionStrategy.INSTANCE) + .register("https", new SSLIOSessionStrategy(sslContext(), hostnameVerifier)) + .build(); + + cm = new PoolingNHttpClientConnectionManager(ioReactor, reg); + } else { + cm = new PoolingNHttpClientConnectionManager(ioReactor); + } + + // Allowing up to two http connections per processing thread to a given host + int maxPerRoute = Math.max(10, config.maxInFlightRequests() * 2); + cm.setDefaultMaxPerRoute(maxPerRoute); + // And for the global limit, with multiply the per-host limit + // by the number of potential different ES hosts + cm.setMaxTotal(maxPerRoute * config.connectionUrls().size()); + + log.debug("Connection pool config: maxPerRoute: {}, maxTotal {}", + cm.getDefaultMaxPerRoute(), + cm.getMaxTotal()); + + return cm; + } catch (IOReactorException e) { + throw new ConnectException("Unable to open ElasticsearchClient.", e); + } + } + + /** + * Configures the client to use Kerberos authentication. Overrides any proxy or basic auth + * credentials. + * + * @param builder the HttpAsyncClientBuilder to configure + * @return the configured builder + */ + private HttpAsyncClientBuilder configureKerberos(HttpAsyncClientBuilder builder) { + GSSManager gssManager = GSSManager.getInstance(); + Lookup authSchemeRegistry = + RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()) + .build(); + builder.setDefaultAuthSchemeRegistry(authSchemeRegistry); + + try { + LoginContext loginContext = loginContext(); + GSSCredential credential = Subject.doAs( + loginContext.getSubject(), + (PrivilegedExceptionAction) () -> gssManager.createCredential( + null, + GSSCredential.DEFAULT_LIFETIME, + SPNEGO_OID, + GSSCredential.INITIATE_ONLY + ) + ); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope( + AuthScope.ANY_HOST, + AuthScope.ANY_PORT, + AuthScope.ANY_REALM, + AuthSchemes.SPNEGO + ), + new KerberosCredentials(credential) + ); + builder.setDefaultCredentialsProvider(credentialsProvider); + } catch (PrivilegedActionException e) { + throw new ConnectException(e); + } + + return builder; + } + + /** + * Configures the client to use SSL if configured. + * + * @param builder the HttpAsyncClientBuilder + */ + private void configureSslContext(HttpAsyncClientBuilder builder) { + HostnameVerifier hostnameVerifier = config.shouldDisableHostnameVerification() + ? new NoopHostnameVerifier() + : SSLConnectionSocketFactory.getDefaultHostnameVerifier(); + + SSLContext sslContext = sslContext(); + builder.setSSLContext(sslContext); + builder.setSSLHostnameVerifier(hostnameVerifier); + builder.setSSLStrategy(new SSLIOSessionStrategy(sslContext, hostnameVerifier)); + } + + /** + * Gets the SslContext for the client. + */ + private SSLContext sslContext() { + SslFactory sslFactory = new SslFactory(Mode.CLIENT, null, false); + sslFactory.configure(config.sslConfigs()); + + try { + // try AK <= 2.2 first + log.debug("Trying AK 2.2 SslFactory methods."); + return (SSLContext) SslFactory.class.getDeclaredMethod("sslContext").invoke(sslFactory); + } catch (Exception e) { + // must be running AK 2.3+ + log.debug("Could not find AK 2.2 SslFactory methods. 
Trying AK 2.3+ methods for SslFactory."); + + Object sslEngine; + try { + // try AK <= 2.6 second + sslEngine = SslFactory.class.getDeclaredMethod("sslEngineBuilder").invoke(sslFactory); + log.debug("Using AK 2.2-2.5 SslFactory methods."); + } catch (Exception ex) { + // must be running AK 2.6+ + log.debug( + "Could not find AK 2.3-2.5 SslFactory methods. Trying AK 2.6+ methods for SslFactory." + ); + try { + sslEngine = SslFactory.class.getDeclaredMethod("sslEngineFactory").invoke(sslFactory); + log.debug("Using AK 2.6+ SslFactory methods."); + } catch (Exception exc) { + throw new ConnectException("Failed to find methods for SslFactory.", exc); + } + } + + try { + return (SSLContext) sslEngine + .getClass() + .getDeclaredMethod("sslContext") + .invoke(sslEngine); + } catch (Exception ex) { + throw new ConnectException("Could not create SSLContext.", ex); + } + } + } + + /** + * Logs in and returns a login context for the given kerberos user principle. + * + * @return the login context + * @throws PrivilegedActionException if the login failed + */ + private LoginContext loginContext() throws PrivilegedActionException { + Configuration conf = new Configuration() { + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return new AppConfigurationEntry[] { + new AppConfigurationEntry( + Krb5LoginModule.class.getName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + kerberosConfigs() + ) + }; + } + }; + + return AccessController.doPrivileged( + (PrivilegedExceptionAction) () -> { + Subject subject = new Subject( + false, + Collections.singleton(new KerberosPrincipal(config.kerberosUserPrincipal())), + new HashSet<>(), + new HashSet<>() + ); + LoginContext loginContext = new LoginContext( + "ElasticsearchSinkConnector", + subject, + null, + conf + ); + loginContext.login(); + return loginContext; + } + ); + } + + /** + * Creates the Kerberos configurations. + * + * @return map of kerberos configs + */ + private Map kerberosConfigs() { + Map configs = new HashMap<>(); + configs.put("useTicketCache", "true"); + configs.put("renewTGT", "true"); + configs.put("useKeyTab", "true"); + configs.put("keyTab", config.keytabPath()); + //Krb5 in GSS API needs to be refreshed so it does not throw the error + //Specified version of key is not available + configs.put("refreshKrb5Config", "true"); + configs.put("principal", config.kerberosUserPrincipal()); + configs.put("storeKey", "false"); + configs.put("doNotPrompt", "true"); + return configs; + } + + private static Oid spnegoOid() { + try { + return new Oid("1.3.6.1.5.5.2"); + } catch (GSSException gsse) { + throw new ConnectException(gsse); + } + } +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/opensearch/Mapping.java b/src/main/java/io/confluent/connect/elasticsearch/opensearch/Mapping.java new file mode 100644 index 000000000..14938ed84 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/opensearch/Mapping.java @@ -0,0 +1,255 @@ +package io.confluent.connect.elasticsearch.opensearch; + +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + + +import java.math.BigDecimal; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; + +import java.io.IOException; + +public class Mapping { + + // Elasticsearch types + public static final String BOOLEAN_TYPE = "boolean"; + public static final String BYTE_TYPE = "byte"; + public static final String BINARY_TYPE = "binary"; + public static final String SHORT_TYPE = "short"; + public static final String INTEGER_TYPE = "integer"; + public static final String LONG_TYPE = "long"; + public static final String FLOAT_TYPE = "float"; + public static final String DOUBLE_TYPE = "double"; + public static final String STRING_TYPE = "string"; + public static final String TEXT_TYPE = "text"; + public static final String KEYWORD_TYPE = "keyword"; + public static final String DATE_TYPE = "date"; + + // Elasticsearch mapping fields + private static final String DEFAULT_VALUE_FIELD = "null_value"; + private static final String FIELDS_FIELD = "fields"; + private static final String IGNORE_ABOVE_FIELD = "ignore_above"; + public static final String KEY_FIELD = "key"; + private static final String KEYWORD_FIELD = "keyword"; + private static final String PROPERTIES_FIELD = "properties"; + private static final String TYPE_FIELD = "type"; + public static final String VALUE_FIELD = "value"; + + /** + * Build mapping from the provided schema. + * + * @param schema The schema used to build the mapping. 
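+ *               For illustration (an assumed usage example, not part of the original patch),
+ *               a Connect schema built with {@code SchemaBuilder} can be converted directly:
+ *               <pre>{@code
+ *               Schema schema = SchemaBuilder.struct()
+ *                   .field("name", Schema.STRING_SCHEMA)
+ *                   .field("age", Schema.INT32_SCHEMA)
+ *                   .build();
+ *               XContentBuilder mapping = Mapping.buildMapping(schema);
+ *               }</pre>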
+ * @return the schema as a JSON mapping + */ + public static XContentBuilder buildMapping(Schema schema) { + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + buildMapping(schema, builder); + } + builder.endObject(); + return builder; + } catch (IOException e) { + throw new ConnectException("Failed to build mapping for schema " + schema, e); + } + } + + private static XContentBuilder buildMapping(Schema schema, XContentBuilder builder) + throws IOException { + + if (schema == null) { + throw new DataException("Cannot infer mapping without schema."); + } + + // Handle logical types + XContentBuilder logicalConversion = inferLogicalMapping(builder, schema); + if (logicalConversion != null) { + return logicalConversion; + } + + Schema.Type schemaType = schema.type(); + switch (schema.type()) { + case ARRAY: + return buildMapping(schema.valueSchema(), builder); + + case MAP: + return buildMap(schema, builder); + + case STRUCT: + return buildStruct(schema, builder); + + default: + return inferPrimitive(builder, getElasticsearchType(schemaType), schema.defaultValue()); + } + } + + private static void addTextMapping(XContentBuilder builder) throws IOException { + // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings + builder.startObject(FIELDS_FIELD); + { + builder.startObject(KEYWORD_FIELD); + { + builder.field(TYPE_FIELD, KEYWORD_TYPE); + builder.field(IGNORE_ABOVE_FIELD, 256); + } + builder.endObject(); + } + builder.endObject(); + } + + private static XContentBuilder buildMap(Schema schema, XContentBuilder builder) + throws IOException { + + builder.startObject(PROPERTIES_FIELD); + { + builder.startObject(KEY_FIELD); + { + buildMapping(schema.keySchema(), builder); + } + builder.endObject(); + builder.startObject(VALUE_FIELD); + { + buildMapping(schema.valueSchema(), builder); + } + builder.endObject(); + } + return builder.endObject(); + } + + private static XContentBuilder buildStruct(Schema schema, XContentBuilder builder) + throws IOException { + + builder.startObject(PROPERTIES_FIELD); + { + for (Field field : schema.fields()) { + builder.startObject(field.name()); + { + buildMapping(field.schema(), builder); + } + builder.endObject(); + } + } + return builder.endObject(); + } + + private static XContentBuilder inferPrimitive( + XContentBuilder builder, + String type, + Object defaultValue + ) throws IOException { + + if (type == null) { + throw new DataException(String.format("Invalid primitive type %s.", type)); + } + + builder.field(TYPE_FIELD, type); + if (type.equals(TEXT_TYPE)) { + addTextMapping(builder); + } + + if (defaultValue == null) { + return builder; + } + + switch (type) { + case BYTE_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (byte) defaultValue); + case SHORT_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (short) defaultValue); + case INTEGER_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (int) defaultValue); + case LONG_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (long) defaultValue); + case FLOAT_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (float) defaultValue); + case DOUBLE_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (double) defaultValue); + case BOOLEAN_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, (boolean) defaultValue); + case DATE_TYPE: + return builder.field(DEFAULT_VALUE_FIELD, ((java.util.Date) defaultValue).getTime()); + /* + * IGNORE default values for text and binary types as this is not supported by ES side. 
+ * see https://www.elastic.co/guide/en/elasticsearch/reference/current/text.html and + * https://www.elastic.co/guide/en/elasticsearch/reference/current/binary.html for details. + */ + case STRING_TYPE: + case TEXT_TYPE: + case BINARY_TYPE: + return builder; + default: + throw new DataException("Invalid primitive type " + type + "."); + } + } + + private static XContentBuilder inferLogicalMapping(XContentBuilder builder, Schema schema) + throws IOException { + + if (schema.name() == null) { + return null; + } + + switch (schema.name()) { + case Date.LOGICAL_NAME: + case Time.LOGICAL_NAME: + case Timestamp.LOGICAL_NAME: + return inferPrimitive(builder, DATE_TYPE, schema.defaultValue()); + case Decimal.LOGICAL_NAME: + Double defaultValue = schema.defaultValue() != null ? ((BigDecimal) schema.defaultValue()) + .doubleValue() : null; + return inferPrimitive(builder, DOUBLE_TYPE, defaultValue); + default: + // User-defined type or unknown built-in + return null; + } + } + + // visible for testing + protected static String getElasticsearchType(Schema.Type schemaType) { + switch (schemaType) { + case BOOLEAN: + return BOOLEAN_TYPE; + case INT8: + return BYTE_TYPE; + case INT16: + return SHORT_TYPE; + case INT32: + return INTEGER_TYPE; + case INT64: + return LONG_TYPE; + case FLOAT32: + return FLOAT_TYPE; + case FLOAT64: + return DOUBLE_TYPE; + case STRING: + return TEXT_TYPE; + case BYTES: + return BINARY_TYPE; + default: + return null; + } + } +} + diff --git a/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchClient.java new file mode 100644 index 000000000..55a53bedd --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchClient.java @@ -0,0 +1,758 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.elasticsearch.opensearch; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; + +import io.confluent.connect.elasticsearch.*; +import org.apache.http.HttpHost; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.action.bulk.BulkProcessor.Listener; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.core.MainResponse; +import org.opensearch.client.indices.CreateDataStreamRequest; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetMappingsRequest; +import org.opensearch.client.indices.GetMappingsResponse; +import org.opensearch.client.indices.PutMappingRequest; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.index.VersionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; +import static java.util.stream.Collectors.toList; + +/** + * Based on Elasticsearch's BulkProcessor, which is responsible for building batches based on size + * and linger time (not grouped by partitions) and limiting the concurrency (max number of + * in-flight requests). + * + *
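+ * <p>Sketch (illustrative assumption, not taken from the original patch) of the expected life
+ * cycle of this client from a sink task; {@code config}, {@code reporter}, {@code index},
+ * {@code record}, {@code request} and {@code offsetState} are placeholders.
+ * <pre>{@code
+ *   OpensearchClient client = new OpensearchClient(config, reporter, () -> { });
+ *   client.createIndexOrDataStream(index);
+ *   client.index(record, request, offsetState);   // request built by OpensearchDataConverter
+ *   client.flush();
+ *   client.close();
+ * }</pre>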

+ * <p>Batch processing is asynchronous. BulkProcessor delegates the bulk calls to a separate thread
+ * pool. Retries are handled synchronously in each batch thread.
+ *
+ * <p>
If all the retries fail, the exception is reported via an atomic reference to an error, + * which is checked and thrown from a subsequent call to the task's put method and that results + * in failure of the task. + */ +@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") +public class OpensearchClient extends SearchClient{ + + private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class); + + private static final long WAIT_TIME_MS = 10; + private static final long CLOSE_WAIT_TIME_MS = 5_000; + private static final String RESOURCE_ALREADY_EXISTS_EXCEPTION = + "resource_already_exists_exception"; + private static final String VERSION_CONFLICT_EXCEPTION = "version_conflict_engine_exception"; + private static final Set MALFORMED_DOC_ERRORS = new HashSet<>( + Arrays.asList( + "strict_dynamic_mapping_exception", + "mapper_parsing_exception", + "illegal_argument_exception", + "action_request_validation_exception" + ) + ); + private static final String UNKNOWN_VERSION_TAG = "Unknown"; + + protected final AtomicInteger numBufferedRecords; + private final AtomicReference error; + protected final BulkProcessor bulkProcessor; + private final ConcurrentMap, SinkRecordAndOffset> requestToSinkRecord; + private final ConcurrentMap> inFlightRequests; + private final ElasticsearchSinkConnectorConfig config; + private final ErrantRecordReporter reporter; + private final RestHighLevelClient client; + private final ExecutorService bulkExecutorService; + private final Time clock; + private final Lock inFlightRequestLock = new ReentrantLock(); + private final Condition inFlightRequestsUpdated = inFlightRequestLock.newCondition(); + private final String openSearchVersion; + + @SuppressWarnings("deprecation") + public OpensearchClient( + ElasticsearchSinkConnectorConfig config, + ErrantRecordReporter reporter, + Runnable afterBulkCallback + ) { + this.bulkExecutorService = Executors.newFixedThreadPool(config.maxInFlightRequests()); + this.numBufferedRecords = new AtomicInteger(0); + this.error = new AtomicReference<>(); + this.requestToSinkRecord = new ConcurrentHashMap<>(); + this.inFlightRequests = reporter != null ? new ConcurrentHashMap<>() : null; + this.config = config; + this.reporter = reporter; + this.clock = Time.SYSTEM; + + ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); + RestClientBuilder client = RestClient.builder( + config.connectionUrls() + .stream() + .map(HttpHost::create) + .collect(toList()) + .toArray(new HttpHost[config.connectionUrls().size()])) + .setHttpClientConfigCallback(configCallbackHandler); + + RestHighLevelClient clientBuilder = new RestHighLevelClient(client); + + openSearchVersion = getServerVersion(client); + + this.client = clientBuilder; + this.bulkProcessor = BulkProcessor + .builder(buildConsumer(), buildListener(afterBulkCallback)) + .setBulkActions(config.batchSize()) + .setBulkSize((ByteSizeValue) config.bulkSize()) + .setConcurrentRequests(config.maxInFlightRequests() - 1) // 0 = no concurrent requests + .setFlushInterval(TimeValue.timeValueMillis(config.lingerMs())) + // Disabling bulk processor retries, because they only cover a small subset of errors + // (see https://github.com/elastic/elasticsearch/issues/71159) + // We are doing retries in the async thread instead. + .setBackoffPolicy(BackoffPolicy.noBackoff()) + .build(); + } + + /** + * Elastic High level Rest Client 7.17 has a compatibility mode to support ES 8. 
Checks the + * version number of ES to determine if we should be running in compatibility mode while using + * HLRC 7.17 to talk to ES. + */ + + private String getServerVersion(RestClientBuilder client) { + RestHighLevelClient highLevelClient = new RestHighLevelClient(client); + MainResponse response; + String esVersionNumber = UNKNOWN_VERSION_TAG; + try { + response = highLevelClient.info(RequestOptions.DEFAULT); + esVersionNumber = response.getVersion().getNumber(); + } catch (Exception e) { + // Same error messages as from validating the connection for IOException. + // Insufficient privileges to validate the version number if caught + // ElasticsearchStatusException. + log.warn("Failed to get ES server version", e); + } + return esVersionNumber; + } + + private BiConsumer> buildConsumer() { + return (req, lis) -> + // Executes a synchronous bulk request in a background thread, with synchronous retries. + // We don't use bulkAsync because we can't retry from its callback (see + // https://github.com/confluentinc/kafka-connect-elasticsearch/pull/575) + // BulkProcessor is the one guaranteeing that no more than maxInFlightRequests batches + // are started at the same time (a new consumer is not called until all others are finished), + // which means we don't need to limit the executor pending task queue. + + // Result is ignored because everything is reported via the corresponding ActionListener. + bulkExecutorService.submit(() -> { + try { + BulkResponse bulkResponse = callWithRetries( + "execute bulk request", + () -> client.bulk(req, RequestOptions.DEFAULT) + ); + lis.onResponse(bulkResponse); + } catch (Exception ex) { + lis.onFailure(ex); + } catch (Throwable ex) { + lis.onFailure(new ConnectException("Bulk request failed", ex)); + } + }); + } + + /** + * Returns the underlying Elasticsearch client. + * + * @return the underlying RestHighLevelClient + */ + public RestHighLevelClient client() { + return client; + } + + /** + * Closes the ElasticsearchClient. + * + * @throws ConnectException if all the records fail to flush before the timeout. + */ + public void close() { + try { + if (!bulkProcessor.awaitClose(config.flushTimeoutMs(), TimeUnit.MILLISECONDS)) { + throw new ConnectException( + "Failed to process outstanding requests in time while closing the ElasticsearchClient." + ); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ConnectException( + "Interrupted while processing all in-flight requests on ElasticsearchClient close.", e + ); + } finally { + closeResources(); + } + } + + /** + * Creates an index or data stream. Will not recreate the index or data stream if + * it already exists. Will create a data stream instead of an index if the data stream + * configurations are set. + * + * @param name the name of the index or data stream to create + * @return true if the index or data stream was created, false if it already exists + */ + public boolean createIndexOrDataStream(String name) { + if (indexExists(name)) { + return false; + } + return config.isDataStream() ? createDataStream(name) : createIndex(name); + } + + /** + * Creates a mapping for the given index and schema. 
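+ * <p>Illustrative sketch (an assumption about caller behavior, not from the original patch):
+ * callers are expected to check for an existing mapping before creating one, e.g.
+ * <pre>{@code
+ *   if (!client.hasMapping(index)) {
+ *     client.createMapping(index, record.valueSchema());
+ *   }
+ * }</pre>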
+ * + * @param index the index to create the mapping for + * @param schema the schema to map + */ + public void createMapping(String index, Schema schema) { + PutMappingRequest request = new PutMappingRequest(index).source(Mapping.buildMapping(schema)); + callWithRetries( + String.format("create mapping for index %s with schema %s", index, schema), + () -> client.indices().putMapping(request, RequestOptions.DEFAULT) + ); + } + + + /** + * Triggers a flush of any buffered records. + */ + public void flush() { + bulkProcessor.flush(); + } + + public void waitForInFlightRequests() { + inFlightRequestLock.lock(); + try { + while (numBufferedRecords.get() > 0) { + inFlightRequestsUpdated.await(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ConnectException(e); + } finally { + inFlightRequestLock.unlock(); + } + } + + /** + * Checks whether the index already has a mapping or not. + * @param index the index to check + * @return true if a mapping exists, false if it does not + */ + public boolean hasMapping(String index) { + MappingMetadata mapping = mapping(index); + return mapping != null && mapping.sourceAsMap() != null && !mapping.sourceAsMap().isEmpty(); + } + + /** + * Buffers a record to index. Will ensure that there are no concurrent requests for the same + * document id when either the DLQ is configured or + * {@link ElasticsearchSinkConnectorConfig#IGNORE_KEY_CONFIG} is set to false because + * they require the use of a map keyed by document id. + * + *
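+ * <p>A minimal usage sketch (illustrative, with placeholder variables) showing how the request
+ * produced by the data converter is handed to this method:
+ * <pre>{@code
+ *   Object request = converter.convertRecord(record, index);
+ *   if (request != null) {
+ *     client.index(record, request, offsetState);
+ *   }
+ * }</pre>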

+ * <p>This call is usually asynchronous, but can block in any of the following scenarios:
+ * <ul>
+ *   <li>A new batch is finished (e.g. max batch size has been reached) and
+ *   the overall number of threads (max in flight requests) are in use.
+ *   <li>The maximum number of buffered records has been reached.
+ * </ul>
+ * + * @param record the record to index + * @param request the associated request to send + * @param offsetState record's offset state + * @throws ConnectException if one of the requests failed + */ + public void index(SinkRecord record, Object request, OffsetState offsetState) { + log.info("I am in index.... " + request); + throwIfFailed(); + + // TODO should we just pause partitions instead of blocking and failing the connector? + verifyNumBufferedRecords(); + requestToSinkRecord.put((DocWriteRequest) request, new SinkRecordAndOffset(record, offsetState)); + numBufferedRecords.incrementAndGet(); + bulkProcessor.add((DocWriteRequest) request); + } + + public void throwIfFailed() { + if (isFailed()) { + try { + close(); + } catch (ConnectException e) { + // if close fails, want to still throw the original exception + log.warn("Couldn't close elasticsearch client", e); + } + throw error.get(); + } + } + + /** + * Wait for internal buffer to be less than max.buffered.records configuration + */ + private void verifyNumBufferedRecords() { + long maxWaitTime = clock.milliseconds() + config.flushTimeoutMs(); + while (numBufferedRecords.get() >= config.maxBufferedRecords()) { + clock.sleep(WAIT_TIME_MS); + if (clock.milliseconds() > maxWaitTime) { + throw new ConnectException( + String.format("Could not make space in the internal buffer fast enough. " + + "Consider increasing %s or %s.", + FLUSH_TIMEOUT_MS_CONFIG, + MAX_BUFFERED_RECORDS_CONFIG + ) + ); + } + } + } + + static class SinkRecordAndOffset { + + private final SinkRecord sinkRecord; + private final OffsetState offsetState; + + public SinkRecordAndOffset(SinkRecord sinkRecord, OffsetState offsetState) { + this.sinkRecord = sinkRecord; + this.offsetState = offsetState; + } + } + + /** + * Checks whether the index exists. + * + * @param index the index to check + * @return true if it exists, false if it does not + */ + public boolean indexExists(String index) { + GetIndexRequest request = new GetIndexRequest(index); + return callWithRetries( + "check if index " + index + " exists", + () -> client.indices().exists(request, RequestOptions.DEFAULT) + ); + } + + /** + * Creates a listener with callback functions to handle completed requests for the BulkProcessor. + * + * @return the listener + */ + private BulkProcessor.Listener buildListener(Runnable afterBulkCallback) { + return new Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + if (inFlightRequests != null) { + List sinkRecords = request.requests().stream() + .map(requestToSinkRecord::get) + .collect(toList()); + + inFlightRequests.put(executionId, sinkRecords); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + List> requests = request.requests(); + int idx = 0; + for (BulkItemResponse bulkItemResponse : response) { + DocWriteRequest req = idx < requests.size() ? 
requests.get(idx) : null; + boolean failed = handleResponse(bulkItemResponse, req, executionId); + if (!failed && req != null) { + requestToSinkRecord.get(req).offsetState.markProcessed(); + } + idx++; + } + + afterBulkCallback.run(); + + bulkFinished(executionId, request); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + log.warn("Bulk request {} failed", executionId, failure); + error.compareAndSet(null, new ConnectException("Bulk request failed", failure)); + bulkFinished(executionId, request); + } + + private void bulkFinished(long executionId, BulkRequest request) { + request.requests().forEach(requestToSinkRecord::remove); + removeFromInFlightRequests(executionId); + inFlightRequestLock.lock(); + try { + numBufferedRecords.addAndGet(-request.requests().size()); + inFlightRequestsUpdated.signalAll(); + } finally { + inFlightRequestLock.unlock(); + } + } + }; + } + + /** + * Calls the specified function with retries and backoffs until the retries are exhausted or the + * function succeeds. + * + * @param description description of the attempted action in present tense + * @param function the function to call and retry + * @param the return type of the function + * @return the return value of the called function + */ + private T callWithRetries(String description, Callable function) { + return RetryUtil.callWithRetries( + description, + function, + config.maxRetries() + 1, + config.retryBackoffMs() + ); + } + + /** + * Closes all the connection and thread resources of the client. + */ + private void closeResources() { + bulkExecutorService.shutdown(); + try { + if (!bulkExecutorService.awaitTermination(CLOSE_WAIT_TIME_MS, TimeUnit.MILLISECONDS)) { + bulkExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + bulkExecutorService.shutdownNow(); + Thread.currentThread().interrupt(); + log.warn("Interrupted while awaiting for executor service shutdown.", e); + } + + try { + client.close(); + } catch (IOException e) { + log.warn("Failed to close Elasticsearch client.", e); + } + } + + /** + * Creates a data stream. Will not recreate the data stream if it already exists. + * + * @param dataStream the data stream to create given in the form {type}-{dataset}-{topic} + * @return true if the data stream was created, false if it already exists + */ + private boolean createDataStream(String dataStream) { + CreateDataStreamRequest request = new CreateDataStreamRequest(dataStream); + return callWithRetries( + "create data stream " + dataStream, + () -> { + try { + client.indices().createDataStream(request, RequestOptions.DEFAULT); + } catch (OpenSearchStatusException | IOException e) { + if (!e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) { + throw e; + } + return false; + } + return true; + } + ); + } + + /** + * Creates an index. Will not recreate the index if it already exists. + * + * @param index the index to create + * @return true if the index was created, false if it already exists + */ + private boolean createIndex(String index) { + CreateIndexRequest request = new CreateIndexRequest(index); + return callWithRetries( + "create index " + index, + () -> { + try { + client.indices().create(request, RequestOptions.DEFAULT); + } catch (OpenSearchStatusException | IOException e) { + if (!e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) { + throw e; + } + return false; + } + return true; + } + ); + } + + /** + * Processes a response from a {@link org.elasticsearch.action.bulk.BulkItemRequest}. 
+ * Successful responses are ignored. Failed responses are reported to the DLQ and handled + * according to configuration (ignore or fail). Version conflicts are ignored. + * + * @param response the response to process + * @param request the request which generated the response + * @param executionId the execution id of the request + * @return true if the record was not successfully processed, and we should not commit its offset + */ + protected boolean handleResponse(BulkItemResponse response, + DocWriteRequest request, + long executionId) { + log.info("Reached here..."); + if (response.isFailed()) { + log.info("Response failed Sparsh....." + response); + for (String error : MALFORMED_DOC_ERRORS) { + if (response.getFailureMessage().contains(error)) { + boolean failed = handleMalformedDocResponse(response); + if (!failed) { + reportBadRecord(response, executionId); + } + return failed; + } + } + if (response.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION)) { + // Now check if this version conflict is caused by external version number + // which was set by us (set explicitly to the topic's offset), in which case + // the version conflict is due to a repeated or out-of-order message offset + // and thus can be ignored, since the newer value (higher offset) should + // remain the key's value in any case. + if (request == null || request.versionType() != VersionType.EXTERNAL) { + log.warn("{} version conflict for operation {} on document '{}' version {}" + + " in index '{}'.", + request != null ? request.versionType() : "UNKNOWN", + response.getOpType(), + response.getId(), + response.getVersion(), + response.getIndex() + ); + + log.trace("{} version conflict for operation {} on document '{}' version {}" + + " in index '{}'", + request != null ? request.versionType() : "UNKNOWN", + response.getOpType(), + response.getId(), + response.getVersion(), + response.getIndex() + ); + // Maybe this was a race condition? Put it in the DLQ in case someone + // wishes to investigate. + reportBadRecord(response, executionId); + } else { + // This is an out-of-order or (more likely) repeated topic offset. Allow the + // higher offset's value for this key to remain. + // + // Note: For external version conflicts, response.getVersion() will be returned as -1, + // but we have the actual version number for this record because we set it in + // the request. + log.debug("Ignoring EXTERNAL version conflict for operation {} on" + + " document '{}' version {} in index '{}'.", + response.getOpType(), + response.getId(), + request.version(), + response.getIndex() + ); + } + return false; + } + + error.compareAndSet( + null, + new ConnectException( + "Indexing record failed -> Response status: " + + response.getFailure().getStatus() + + ",\n Index: " + response.getFailure().getIndex() + + ",\n Document Id: " + response.getFailure().getId()) + ); + return true; + } + return false; + } + + /** + * Handle a failed response as a result of a malformed document. Depending on the configuration, + * ignore or fail. + * + * @param response the failed response from ES + * @return true if the record was not successfully processed, and we should not commit its offset + */ + private boolean handleMalformedDocResponse(BulkItemResponse response) { + String errorMsg = String.format( + "Encountered an illegal document error -> Response status: '%s',\n" + + "Index: '%s',\n Document Id: '%s'. 
\n" + + "Ignoring and will not index record.", + response.getFailure().getStatus(), + response.getFailure().getIndex(), + response.getFailure().getId() + ); + switch (config.behaviorOnMalformedDoc()) { + case IGNORE: + log.debug(errorMsg); + return false; + case WARN: + log.warn(errorMsg); + return false; + case FAIL: + default: + log.error( + "Encountered an illegal document error -> Response status: '{}',\n " + + "Index: '{}',\n Document Id: '{}'\n" + + " To ignore future records like this," + + " change the configuration '{}' to '{}'.", + response.getFailure().getStatus(), + response.getFailure().getIndex(), + response.getFailure().getId(), + ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, + BehaviorOnMalformedDoc.IGNORE + ); + error.compareAndSet( + null, + new ConnectException( + "Indexing record failed -> Response status: " + + response.getFailure().getStatus() + + ",\n Index: " + response.getFailure().getIndex() + + ",\n Document Id: " + response.getFailure().getId()) + ); + return true; + } + } + + /** + * Whether there is a failed response. + * + * @return true if a response has failed, false if none have failed + */ + public boolean isFailed() { + return error.get() != null; + } + + /** + * Gets the mapping for an index. + * + * @param index the index to fetch the mapping for + * @return the MappingMetadata for the index + */ + private MappingMetadata mapping(String index) { + GetMappingsRequest request = new GetMappingsRequest().indices(index); + GetMappingsResponse response = callWithRetries( + "get mapping for index " + index, + () -> client.indices().getMapping(request, RequestOptions.DEFAULT) + ); + return response.mappings().get(index); + } + + /** + * Removes the mapping for bulk request id to records being written. + * + * @param executionId the execution id of the bulk request + */ + private void removeFromInFlightRequests(long executionId) { + if (inFlightRequests != null) { + inFlightRequests.remove(executionId); + } + } + + /** + * Reports a bad record to the DLQ. + * + * @param response the failed response from ES + * @param executionId the execution id of the request associated with the response + */ + private synchronized void reportBadRecord(BulkItemResponse response, + long executionId) { + + // RCCA-7507 : Don't push to DLQ if we receive Internal version conflict on data streams + if (response.getFailureMessage().contains(VERSION_CONFLICT_EXCEPTION) + && config.isDataStream()) { + log.info("Skipping DLQ insertion for DataStream type."); + return; + } + if (reporter != null) { + List sinkRecords = + inFlightRequests.getOrDefault(executionId, new ArrayList<>()); + SinkRecordAndOffset original = sinkRecords.size() > response.getItemId() + ? sinkRecords.get(response.getItemId()) + : null; + if (original != null) { + // log only status, index and document id for record failure + reporter.report( + original.sinkRecord, + new ReportingException( + "Indexing failed -> Response status: " + response.getFailure().getStatus() + + ",\n Index: " + response.getFailure().getIndex() + + ",\n Document Id: " + response.getFailure().getId()) + ); + } + } + } + + /** + * Exception that swallows the stack trace used for reporting errors from Elasticsearch + * (mapper_parser_exception, illegal_argument_exception, and action_request_validation_exception) + * resulting from bad records using the AK 2.6 reporter DLQ interface. 
+ */ + @SuppressWarnings("serial") + public static class ReportingException extends RuntimeException { + + public ReportingException(String message) { + super(message); + } + + /** + * This method is overridden to swallow the stack trace. + * + * @return Throwable + */ + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + } +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchDataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchDataConverter.java new file mode 100644 index 000000000..0ccf67bcb --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchDataConverter.java @@ -0,0 +1,426 @@ +package io.confluent.connect.elasticsearch.opensearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.confluent.connect.elasticsearch.ElasticsearchDataConverter; +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig; +import io.confluent.connect.elasticsearch.SearchDataConverter; +import org.apache.kafka.connect.data.*; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.Converter; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.index.VersionType; +import org.opensearch.common.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.*; + +public class OpensearchDataConverter implements SearchDataConverter { + + private static final Logger log = LoggerFactory.getLogger(ElasticsearchDataConverter.class); + + private static final Converter JSON_CONVERTER; + protected static final String MAP_KEY = "key"; + protected static final String MAP_VALUE = "value"; + protected static final String TIMESTAMP_FIELD = "@timestamp"; + + private ObjectMapper objectMapper; + + static { + JSON_CONVERTER = new JsonConverter(); + JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false); + } + + private final ElasticsearchSinkConnectorConfig config; + + /** + * Create a DataConverter, specifying how map entries with string keys within record + * values should be written to JSON. Compact map entries are written as + * "entryKey": "entryValue", while the non-compact form are written as a nested + * document such as {"key": "entryKey", "value": "entryValue"}. All map entries + * with non-string keys are always written as nested documents. 
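+ * <p>For example (illustrative, restating the behavior described above), a map containing the
+ * single entry {@code "k1" -> "v1"} is serialized either as
+ * <pre>{@code
+ *   {"k1": "v1"}                       // compact map entries
+ *   [{"key": "k1", "value": "v1"}]     // non-compact (nested document) map entries
+ * }</pre>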
+ * + * @param config connector config + */ + public OpensearchDataConverter(ElasticsearchSinkConnectorConfig config) { + this.config = config; + this.objectMapper = new ObjectMapper(); + } + + private String convertKey(Schema keySchema, Object key) { + if (key == null) { + throw new DataException("Key is used as document id and can not be null."); + } + if (String.valueOf(key).isEmpty()) { + throw new DataException("Key is used as document id and can not be empty."); + } + + final Schema.Type schemaType; + if (keySchema == null) { + schemaType = ConnectSchema.schemaType(key.getClass()); + if (schemaType == null) { + throw new DataException( + "Java class " + key.getClass() + " does not have corresponding schema type." + ); + } + } else { + schemaType = keySchema.type(); + } + + switch (schemaType) { + case INT8: + case INT16: + case INT32: + case INT64: + case STRING: + return String.valueOf(key); + default: + throw new DataException(schemaType.name() + " is not supported as the document id."); + } + } + + public Object convertRecord(SinkRecord record, String index) { + if (record.value() == null) { + switch (config.behaviorOnNullValues()) { + case IGNORE: + log.trace("Ignoring {} with null value.", recordString(record)); + return null; + case DELETE: + if (record.key() == null) { + // Since the record key is used as the ID of the index to delete and we don't have a key + // for this record, we can't delete anything anyways, so we ignore the record. + // We can also disregard the value of the ignoreKey parameter, since even if it's true + // the resulting index we'd try to delete would be based solely off topic/partition/ + // offset information for the SinkRecord. Since that information is guaranteed to be + // unique per message, we can be confident that there wouldn't be any corresponding + // index present in ES to delete anyways. + log.trace( + "Ignoring {} with null key, since the record key is used as the ID of the index", + recordString(record) + ); + return null; + } + // Will proceed as normal, ultimately creating a DeleteRequest + log.trace("Deleting {} from Elasticsearch", recordString(record)); + break; + case FAIL: + default: + throw new DataException( + String.format( + "%s has a null value (to ignore future records like" + + " this change the configuration property '%s' from '%s' to '%s')", + recordString(record), + ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, + ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.FAIL, + ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.IGNORE + ) + ); + } + } + + final String id = config.shouldIgnoreKey(record.topic()) + ? String.format("%s+%d+%d", record.topic(), record.kafkaPartition(), record.kafkaOffset()) + : convertKey(record.keySchema(), record.key()); + + // delete + if (record.value() == null) { + return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record); + } + + String payload = getPayload(record); + payload = maybeAddTimestamp(payload, record.timestamp()); + + // index + switch (config.writeMethod()) { + case UPSERT: + return new UpdateRequest(index, id) + .doc(payload, XContentType.JSON) + .upsert(payload, XContentType.JSON) + .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)); + case INSERT: + DocWriteRequest.OpType opType = config.isDataStream() ? 
DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX; + return maybeAddExternalVersioning( + new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType), + record + ); + default: + return null; // shouldn't happen + } + } + + private String getPayload(SinkRecord record) { + if (record.value() == null) { + return null; + } + + Schema schema = config.shouldIgnoreSchema(record.topic()) + ? record.valueSchema() + : preProcessSchema(record.valueSchema()); + Object value = config.shouldIgnoreSchema(record.topic()) + ? record.value() + : preProcessValue(record.value(), record.valueSchema(), schema); + + byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value); + return new String(rawJsonPayload, StandardCharsets.UTF_8); + } + + private String maybeAddTimestamp(String payload, Long timestamp) { + if (!config.isDataStream()) { + return payload; + } + try { + JsonNode jsonNode = objectMapper.readTree(payload); + + if (!jsonNode.isObject()) { + throw new DataException("Top level payload contains data of Json type " + + jsonNode.getNodeType() + ". Required Json object."); + } + + if (!config.dataStreamTimestampField().isEmpty()) { + for (String timestampField : config.dataStreamTimestampField()) { + if (jsonNode.has(timestampField)) { + ((ObjectNode) jsonNode).put(TIMESTAMP_FIELD, jsonNode.get(timestampField).asText()); + return objectMapper.writeValueAsString(jsonNode); + } else { + log.debug("Timestamp field {} is not present in payload. This record may fail or " + + "be skipped", + timestampField); + } + } + } else { + ((ObjectNode) jsonNode).put(TIMESTAMP_FIELD, timestamp); + return objectMapper.writeValueAsString(jsonNode); + } + } catch (JsonProcessingException e) { + // Should not happen if the payload was retrieved correctly. + } + return payload; + } + + /** + * In many cases, we explicitly set the record version using the topic's offset. + * This version will, in turn, be checked by Elasticsearch and will throw a versioning + * error if the request represents an equivalent or older version of the record. + * + * @param request the request currently being constructed for `record` + * @param record the record to be processed + * @return the (possibly modified) request which was passed in + */ + private DocWriteRequest maybeAddExternalVersioning( + DocWriteRequest request, + SinkRecord record + ) { + if (!config.isDataStream() && !config.shouldIgnoreKey(record.topic())) { + request.versionType(VersionType.EXTERNAL); + request.version(record.kafkaOffset()); + } + + return request; + } + + // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch + // expects a different JSON format from the current JSON converter provides. Rather than + // completely rewrite a converter for Elasticsearch, we will refactor the JSON converter to + // support customized translation. The pre process is no longer needed once we have the JSON + // converter refactored. 
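+ // Illustration (assumed behavior inferred from the code below, not part of the original patch):
+ // a Decimal logical type is widened to float64, and a map with non-string keys becomes an
+ // array of {key, value} structs, e.g.
+ //   schema in : SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()
+ //   schema out: array of struct { key: int32, value: string }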
+ // visible for testing + Schema preProcessSchema(Schema schema) { + if (schema == null) { + return null; + } + // Handle logical types + String schemaName = schema.name(); + if (schemaName != null) { + switch (schemaName) { + case Decimal.LOGICAL_NAME: + return copySchemaBasics(schema, SchemaBuilder.float64()).build(); + case Date.LOGICAL_NAME: + case Time.LOGICAL_NAME: + case Timestamp.LOGICAL_NAME: + return schema; + default: + // User type or unknown logical type + break; + } + } + + Schema.Type schemaType = schema.type(); + switch (schemaType) { + case ARRAY: + return preProcessArraySchema(schema); + case MAP: + return preProcessMapSchema(schema); + case STRUCT: + return preProcessStructSchema(schema); + default: + return schema; + } + } + + private Schema preProcessArraySchema(Schema schema) { + Schema valSchema = preProcessSchema(schema.valueSchema()); + return copySchemaBasics(schema, SchemaBuilder.array(valSchema)).build(); + } + + private Schema preProcessMapSchema(Schema schema) { + Schema keySchema = schema.keySchema(); + Schema valueSchema = schema.valueSchema(); + String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); + String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); + Schema preprocessedKeySchema = preProcessSchema(keySchema); + Schema preprocessedValueSchema = preProcessSchema(valueSchema); + if (config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) { + SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema); + return copySchemaBasics(schema, result).build(); + } + Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) + .field(MAP_KEY, preprocessedKeySchema) + .field(MAP_VALUE, preprocessedValueSchema) + .build(); + return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build(); + } + + private Schema preProcessStructSchema(Schema schema) { + SchemaBuilder builder = copySchemaBasics(schema, SchemaBuilder.struct().name(schema.name())); + for (Field field : schema.fields()) { + builder.field(field.name(), preProcessSchema(field.schema())); + } + return builder.build(); + } + + private SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) { + if (source.isOptional()) { + target.optional(); + } + if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) { + final Object defaultVal = preProcessValue(source.defaultValue(), source, target); + target.defaultValue(defaultVal); + } + return target; + } + + // visible for testing + Object preProcessValue(Object value, Schema schema, Schema newSchema) { + // Handle missing schemas and acceptable null values + if (schema == null) { + return value; + } + + if (value == null) { + return preProcessNullValue(schema); + } + + // Handle logical types + String schemaName = schema.name(); + if (schemaName != null) { + Object result = preProcessLogicalValue(schemaName, value); + if (result != null) { + return result; + } + } + + Schema.Type schemaType = schema.type(); + switch (schemaType) { + case ARRAY: + return preProcessArrayValue(value, schema, newSchema); + case MAP: + return preProcessMapValue(value, schema, newSchema); + case STRUCT: + return preProcessStructValue(value, schema, newSchema); + default: + return value; + } + } + + private Object preProcessNullValue(Schema schema) { + if (schema.defaultValue() != null) { + return schema.defaultValue(); + } + if (schema.isOptional()) { + return null; + } + throw new DataException("null value 
for field that is required and has no default value"); + } + + // @returns the decoded logical value or null if this isn't a known logical type + private Object preProcessLogicalValue(String schemaName, Object value) { + switch (schemaName) { + case Decimal.LOGICAL_NAME: + return ((BigDecimal) value).doubleValue(); + case Date.LOGICAL_NAME: + case Time.LOGICAL_NAME: + case Timestamp.LOGICAL_NAME: + return value; + default: + // User-defined type or unknown built-in + return null; + } + } + + private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) { + Collection collection = (Collection) value; + List result = new ArrayList<>(); + for (Object element: collection) { + result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema())); + } + return result; + } + + private Object preProcessMapValue(Object value, Schema schema, Schema newSchema) { + Schema keySchema = schema.keySchema(); + Schema valueSchema = schema.valueSchema(); + Schema newValueSchema = newSchema.valueSchema(); + Map map = (Map) value; + if (config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) { + Map processedMap = new HashMap<>(); + for (Map.Entry entry: map.entrySet()) { + processedMap.put( + preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), + preProcessValue(entry.getValue(), valueSchema, newValueSchema) + ); + } + return processedMap; + } + List mapStructs = new ArrayList<>(); + for (Map.Entry entry: map.entrySet()) { + Struct mapStruct = new Struct(newValueSchema); + Schema mapKeySchema = newValueSchema.field(MAP_KEY).schema(); + Schema mapValueSchema = newValueSchema.field(MAP_VALUE).schema(); + mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, mapKeySchema)); + mapStruct.put(MAP_VALUE, preProcessValue(entry.getValue(), valueSchema, mapValueSchema)); + mapStructs.add(mapStruct); + } + return mapStructs; + } + + private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) { + Struct struct = (Struct) value; + Struct newStruct = new Struct(newSchema); + for (Field field : schema.fields()) { + Schema newFieldSchema = newSchema.field(field.name()).schema(); + Object converted = preProcessValue(struct.get(field), field.schema(), newFieldSchema); + newStruct.put(field.name(), converted); + } + return newStruct; + } + + private static String recordString(SinkRecord record) { + return String.format( + "record from topic=%s partition=%s offset=%s", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset() + ); + } +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchValidator.java b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchValidator.java new file mode 100644 index 000000000..6384d43c4 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/opensearch/OpensearchValidator.java @@ -0,0 +1,499 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.elasticsearch.opensearch; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig; +import io.confluent.connect.elasticsearch.Validator; +import org.apache.http.HttpHost; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.config.SslConfigs; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.client.core.MainResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.confluent.connect.elasticsearch.Version; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DataStreamType; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PORT_CONFIG; +import static 
io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod; + +public class OpensearchValidator extends Validator { + + private static final Logger log = LoggerFactory.getLogger(OpensearchValidator.class); + + private static final String CONNECTOR_V11_COMPATIBLE_ES_VERSION = "7.0.0"; + private static final String DATA_STREAM_COMPATIBLE_ES_VERSION = "7.9.0"; + + private ElasticsearchSinkConnectorConfig config; + private Map values; + private List validations; + private ClientFactory clientFactory; + + public OpensearchValidator(Map props) { + this(props, null); + } + + // Exposed for testing + protected OpensearchValidator(Map props, ClientFactory clientFactory) { + try { + this.config = new ElasticsearchSinkConnectorConfig(props); + } catch (ConfigException e) { + // some configs are invalid + } + + this.clientFactory = clientFactory == null ? this::createClient : clientFactory; + validations = ElasticsearchSinkConnectorConfig.CONFIG.validate(props); + values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity())); + } + + public Config validate() { + if (config == null) { + // individual configs are invalid, no point in validating combinations + return new Config(validations); + } + + validateCredentials(); + validateDataStreamConfigs(); + validateIgnoreConfigs(); + validateKerberos(); + validateLingerMs(); + validateMaxBufferedRecords(); + validateProxy(); + validateSsl(); + + if (!hasErrors()) { + // no point in connection validation if previous ones fails + try (RestHighLevelClient client = clientFactory.client()) { + validateConnection(client); + validateVersion(client); + } catch (IOException e) { + log.warn("Closing the client failed.", e); + } catch (Throwable e) { + log.error("Failed to create client to verify connection. ", e); + addErrorMessage(CONNECTION_URL_CONFIG, "Failed to create client to verify connection. " + + e.getMessage()); + } + } + + return new Config(validations); + } + + private void validateCredentials() { + boolean onlyOneSet = config.username() != null ^ config.password() != null; + if (onlyOneSet) { + String errorMessage = String.format( + "Both '%s' and '%s' must be set.", CONNECTION_USERNAME_CONFIG, CONNECTION_PASSWORD_CONFIG + ); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + } + + private void validateDataStreamConfigs() { + if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' must be set.", + DATA_STREAM_DATASET_CONFIG, + DATA_STREAM_TYPE_CONFIG + ); + addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); + } + + if (config.isDataStream() && config.writeMethod() == WriteMethod.UPSERT) { + String errorMessage = String.format( + "Upserts are not supported with data streams. 
%s must not be %s if %s and %s are set.", + WRITE_METHOD_CONFIG, + WriteMethod.UPSERT, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(WRITE_METHOD_CONFIG, errorMessage); + } + + if (config.isDataStream() && config.behaviorOnNullValues() == BehaviorOnNullValues.DELETE) { + String errorMessage = String.format( + "Deletes are not supported with data streams. %s must not be %s if %s and %s are set.", + BEHAVIOR_ON_NULL_VALUES_CONFIG, + BehaviorOnNullValues.DELETE, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(BEHAVIOR_ON_NULL_VALUES_CONFIG, errorMessage); + } + + if (!config.isDataStream() && !config.dataStreamTimestampField().isEmpty()) { + String errorMessage = String.format( + "Mapping a field to the '@timestamp' field is only necessary for data streams. " + + "%s must not be set if %s and %s are not set.", + DATA_STREAM_TIMESTAMP_CONFIG, + DATA_STREAM_TYPE_CONFIG, + DATA_STREAM_DATASET_CONFIG + ); + addErrorMessage(DATA_STREAM_TIMESTAMP_CONFIG, errorMessage); + } + } + + private void validateIgnoreConfigs() { + if (config.ignoreKey() && !config.ignoreKeyTopics().isEmpty()) { + String errorMessage = String.format( + "'%s' can not be set if '%s' is true.", IGNORE_KEY_TOPICS_CONFIG, IGNORE_KEY_CONFIG + ); + addErrorMessage(IGNORE_KEY_CONFIG, errorMessage); + addErrorMessage(IGNORE_KEY_TOPICS_CONFIG, errorMessage); + } + + if (config.ignoreSchema() && !config.ignoreSchemaTopics().isEmpty()) { + String errorMessage = String.format( + "'%s' can not be set if '%s' is true.", IGNORE_SCHEMA_TOPICS_CONFIG, IGNORE_SCHEMA_CONFIG + ); + addErrorMessage(IGNORE_SCHEMA_CONFIG, errorMessage); + addErrorMessage(IGNORE_SCHEMA_TOPICS_CONFIG, errorMessage); + } + } + + private void validateKerberos() { + boolean onlyOneSet = config.kerberosUserPrincipal() != null ^ config.keytabPath() != null; + if (onlyOneSet) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' must be set.", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + } + + if (config.isKerberosEnabled()) { + // currently do not support Kerberos with regular auth + if (config.isAuthenticatedConnection()) { + String errorMessage = String.format( + "Either only Kerberos (%s, %s) or connection credentials (%s, %s) must be set.", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG, + CONNECTION_USERNAME_CONFIG, + CONNECTION_PASSWORD_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + + // currently do not support Kerberos with proxy + if (config.isBasicProxyConfigured()) { + String errorMessage = String.format( + "Kerberos (%s, %s) is not supported with proxy settings (%s).", + KERBEROS_PRINCIPAL_CONFIG, + KERBEROS_KEYTAB_PATH_CONFIG, + PROXY_HOST_CONFIG + ); + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + } + + } + + private void validateLingerMs() { + if (config.lingerMs() > config.flushTimeoutMs()) { + String errorMessage = String.format( + "'%s' (%d) can not be larger than '%s' (%d).", + LINGER_MS_CONFIG, config.lingerMs(), FLUSH_TIMEOUT_MS_CONFIG, config.flushTimeoutMs() + ); + 
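Stepping back from the individual checks for a moment: the validate() entry point above aggregates every error recorded through addErrorMessage(...) into the returned Config. Below is a hedged sketch of how a caller might drive this validator and read those per-property errors back out; how the connector actually selects between OpensearchValidator and ElasticsearchValidator is not shown in this hunk, so the wiring here is an assumption.

// Illustrative only -- not the connector's actual wiring. With the public one-arg
// constructor, validate() builds a real client to verify the connection when the
// individual configs are otherwise valid.
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import io.confluent.connect.elasticsearch.opensearch.OpensearchValidator;

final class ValidateSketch {
  static Config validate(Map<String, String> connectorProps) {
    Config result = new OpensearchValidator(connectorProps).validate();
    for (ConfigValue value : result.configValues()) {
      if (!value.errorMessages().isEmpty()) {
        System.out.printf("Invalid value for %s: %s%n", value.name(), value.errorMessages());
      }
    }
    return result;
  }
}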
addErrorMessage(LINGER_MS_CONFIG, errorMessage); + addErrorMessage(FLUSH_TIMEOUT_MS_CONFIG, errorMessage); + } + } + + private void validateMaxBufferedRecords() { + if (config.maxBufferedRecords() < config.batchSize() * config.maxInFlightRequests()) { + String errorMessage = String.format( + "'%s' (%d) must be larger than or equal to '%s' (%d) x %s (%d).", + MAX_BUFFERED_RECORDS_CONFIG, config.maxBufferedRecords(), + BATCH_SIZE_CONFIG, config.batchSize(), + MAX_IN_FLIGHT_REQUESTS_CONFIG, config.maxInFlightRequests() + ); + + addErrorMessage(MAX_BUFFERED_RECORDS_CONFIG, errorMessage); + addErrorMessage(BATCH_SIZE_CONFIG, errorMessage); + addErrorMessage(MAX_IN_FLIGHT_REQUESTS_CONFIG, errorMessage); + } + } + + private void validateProxy() { + if (!config.isBasicProxyConfigured()) { + if (!config.proxyUsername().isEmpty()) { + String errorMessage = String.format( + "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_USERNAME_CONFIG + ); + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + + if (config.proxyPassword() != null) { + String errorMessage = String.format( + "'%s' must be set to use '%s'.", PROXY_HOST_CONFIG, PROXY_PASSWORD_CONFIG + ); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + } + } else { + boolean onlyOneSet = config.proxyUsername().isEmpty() ^ config.proxyPassword() == null; + if (onlyOneSet) { + String errorMessage = String.format( + "Either both or neither '%s' and '%s' can be set.", + PROXY_USERNAME_CONFIG, + PROXY_PASSWORD_CONFIG + ); + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + } + } + } + + private void validateSsl() { + Map sslConfigs = config.originalsWithPrefix(SSL_CONFIG_PREFIX); + if (!config.isSslEnabled()) { + if (!sslConfigs.isEmpty()) { + String errorMessage = String.format( + "'%s' must be set to '%s' to use SSL configs.", + SECURITY_PROTOCOL_CONFIG, + SecurityProtocol.SSL + ); + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + } else { + if (sslConfigs.isEmpty()) { + String errorMessage = String.format( + "At least these SSL configs ('%s', '%s', '%s', and '%s') must be present for SSL" + + " support. Otherwise set '%s' to '%s'.", + SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SECURITY_PROTOCOL_CONFIG, + SecurityProtocol.PLAINTEXT + ); + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + } + } + + private void validateVersion(RestHighLevelClient client) { + MainResponse response; + try { + response = client.info(RequestOptions.DEFAULT); + } catch (IOException | OpenSearchStatusException e) { + // Same error messages as from validating the connection for IOException. + // Insufficient privileges to validate the version number if caught + // ElasticsearchStatusException. + return; + } + String esVersionNumber = response.getVersion().getNumber(); + if (config.isDataStream() + && compareVersions(esVersionNumber, DATA_STREAM_COMPATIBLE_ES_VERSION) < 0) { + String errorMessage = String.format( + "Elasticsearch version %s is not compatible with data streams. 
Elasticsearch" + + "version must be at least %s.", + esVersionNumber, + DATA_STREAM_COMPATIBLE_ES_VERSION + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_TYPE_CONFIG, errorMessage); + addErrorMessage(DATA_STREAM_DATASET_CONFIG, errorMessage); + } + if (compareVersions(esVersionNumber, CONNECTOR_V11_COMPATIBLE_ES_VERSION) < 0) { + String errorMessage = String.format( + "Connector version %s is not compatible with Elasticsearch version %s. Elasticsearch " + + "version must be at least %s.", + Version.getVersion(), + esVersionNumber, + CONNECTOR_V11_COMPATIBLE_ES_VERSION + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + } + } + + /** + * Compares versionNumber to compatibleVersion. + * + * @return a negative integer, zero, or a positive integer if + * versionNumber is less than, equal to, or greater + * than compatibleVersion. + */ + private int compareVersions(String versionNumber, String compatibleVersion) { + String[] versionSplit = versionNumber.split("\\."); + String[] compatibleSplit = compatibleVersion.split("\\."); + + for (int i = 0; i < Math.min(versionSplit.length, compatibleSplit.length); i++) { + String versionSplitBeforeSuffix = versionSplit[i].split("-")[0]; + String compatibleSplitBeforeSuffix = compatibleSplit[i].split("-")[0]; + int comparison = Integer.compare( + Integer.parseInt(versionSplitBeforeSuffix), + Integer.parseInt(compatibleSplitBeforeSuffix) + ); + if (comparison != 0) { + return comparison; + } + } + return versionSplit.length - compatibleSplit.length; + } + + private void validateConnection(RestHighLevelClient client) { + boolean successful; + String exceptionMessage = ""; + try { + successful = client.ping(RequestOptions.DEFAULT); + } catch (OpenSearchStatusException e) { + switch (e.status()) { + case FORBIDDEN: + // ES is up, but user is not authorized to ping server + successful = true; + break; + default: + successful = false; + exceptionMessage = String.format("Error message: %s", e.getMessage()); + } + } catch (Exception e) { + successful = false; + exceptionMessage = String.format("Error message: %s", e.getMessage()); + } + if (!successful) { + String errorMessage = String.format( + "Could not connect to Elasticsearch. %s", + exceptionMessage + ); + addErrorMessage(CONNECTION_URL_CONFIG, errorMessage); + + if (config.isAuthenticatedConnection()) { + errorMessage = String.format( + "Could not authenticate the user. Check the '%s' and '%s'. %s", + CONNECTION_USERNAME_CONFIG, + CONNECTION_PASSWORD_CONFIG, + exceptionMessage + ); + addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage); + addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage); + } + + if (config.isSslEnabled()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your SSL settings.%s", + exceptionMessage + ); + + addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage); + } + + if (config.isKerberosEnabled()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your Kerberos settings. %s", + exceptionMessage + ); + + addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage); + addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage); + } + + if (config.isBasicProxyConfigured()) { + errorMessage = String.format( + "Could not connect to Elasticsearch. Check your proxy settings. 
%s", + exceptionMessage + ); + addErrorMessage(PROXY_HOST_CONFIG, errorMessage); + addErrorMessage(PROXY_PORT_CONFIG, errorMessage); + + if (config.isProxyWithAuthenticationConfigured()) { + addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage); + addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage); + } + } + } + } + + private void addErrorMessage(String property, String error) { + values.get(property).addErrorMessage(error); + } + + private RestHighLevelClient createClient() { + ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); + return new RestHighLevelClient( + RestClient + .builder( + config.connectionUrls() + .stream() + .map(HttpHost::create) + .collect(Collectors.toList()) + .toArray(new HttpHost[config.connectionUrls().size()]) + ) + .setHttpClientConfigCallback(configCallbackHandler) + ); + } + + private boolean hasErrors() { + for (ConfigValue config : validations) { + if (!config.errorMessages().isEmpty()) { + return true; + } + } + + return false; + } + + interface ClientFactory { + RestHighLevelClient client(); + } +} diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticSearchElasticsearchDataConverterTest.java similarity index 91% rename from src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java rename to src/test/java/io/confluent/connect/elasticsearch/ElasticSearchElasticsearchDataConverterTest.java index 23edff5be..e81ccd382 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticSearchElasticsearchDataConverterTest.java @@ -34,18 +34,18 @@ import java.util.List; import java.util.Map; -import static io.confluent.connect.elasticsearch.DataConverter.MAP_KEY; -import static io.confluent.connect.elasticsearch.DataConverter.MAP_VALUE; -import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD; +import static io.confluent.connect.elasticsearch.ElasticsearchDataConverter.MAP_KEY; +import static io.confluent.connect.elasticsearch.ElasticsearchDataConverter.MAP_VALUE; +import static io.confluent.connect.elasticsearch.ElasticsearchDataConverter.TIMESTAMP_FIELD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; -public class DataConverterTest { +public class ElasticSearchElasticsearchDataConverterTest { - private DataConverter converter; + private ElasticsearchDataConverter converter; private Map props; private String key; @@ -62,7 +62,7 @@ public void setUp() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true"); props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); key = "key"; topic = "topic"; partition = 0; @@ -209,7 +209,7 @@ public void stringKeyedMapNonCompactFormat() { // Use the older non-compact format for map entries with string keys props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "false"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(origSchema); 
assertEquals( @@ -244,7 +244,7 @@ public void stringKeyedMapCompactFormat() { // Use the newer compact format for map entries with string keys props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), @@ -289,7 +289,7 @@ public void optionalFieldsWithoutDefaults() { testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); // Have to test maps with useCompactMapEntries set to true and set to false props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "false"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); } @@ -315,7 +315,7 @@ public void ignoreOnNullValue() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.IGNORE.name()); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); SinkRecord sinkRecord = createSinkRecordWithValue(null); assertNull(converter.convertRecord(sinkRecord, index)); @@ -328,7 +328,7 @@ public void throwExceptionOnEmptyOrNullKey() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.IGNORE.name()); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); @@ -348,7 +348,7 @@ public void deleteOnNullValue() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name()); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); SinkRecord sinkRecord = createSinkRecordWithValue(null); DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index); @@ -362,7 +362,7 @@ public void ignoreDeleteOnNullValueWithNullKey() { props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name()); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); key = null; @@ -376,7 +376,7 @@ public void failOnNullValue() { 
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name()); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); SinkRecord sinkRecord = createSinkRecordWithValue(null); try { @@ -402,7 +402,7 @@ public SinkRecord createSinkRecordWithValue(Object value) { @Test public void testDoNotInjectPayloadTimestampIfNotDataStream() { - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); SinkRecord sinkRecord = createSinkRecordWithValue(struct); @@ -416,7 +416,7 @@ public void testDoNotInjectPayloadTimestampIfNotDataStream() { public void testDoNotInjectMissingPayloadTimestampIfDataStreamAndTimestampMapNotFound() { configureDataStream(); props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG, "timestampFieldNotPresent"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); SinkRecord sinkRecord = createSinkRecordWithValue(struct); @@ -428,7 +428,7 @@ public void testDoNotInjectMissingPayloadTimestampIfDataStreamAndTimestampMapNot @Test public void testInjectPayloadTimestampIfDataStreamAndNoTimestampMapSet() { configureDataStream(); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); SinkRecord sinkRecord = createSinkRecordWithValue(struct); @@ -441,7 +441,7 @@ public void testInjectPayloadTimestampIfDataStreamAndNoTimestampMapSet() { @Test public void testInjectPayloadTimestampEvenIfAlreadyExistsAndTimestampMapNotSet() { configureDataStream(); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); schema = SchemaBuilder .struct() .name("struct") @@ -462,7 +462,7 @@ public void testMapPayloadTimestampIfDataStreamSetAndOneTimestampMapSet() { String timestampFieldMap = "onefield"; configureDataStream(); props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG, timestampFieldMap); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); schema = SchemaBuilder .struct() .name("struct") @@ -483,7 +483,7 @@ public void testMapPayloadTimestampByPriorityIfMultipleTimestampMapsSet() { String timestampFieldToUse = "two"; configureDataStream(); props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_CONFIG, "one, two, field"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); schema 
= SchemaBuilder .struct() .name("struct") @@ -503,7 +503,7 @@ public void testMapPayloadTimestampByPriorityIfMultipleTimestampMapsSet() { @Test(expected = DataException.class) public void testExceptionWhenNonObjectPayloadInDataStream() { configureDataStream(); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); SinkRecord record = new SinkRecord("t", 0, Schema.STRING_SCHEMA, key, SchemaBuilder.array(Schema.STRING_SCHEMA).build(), Arrays.asList("a", "b"), offset, recordTimestamp, TimestampType.CREATE_TIME); @@ -514,7 +514,7 @@ public void testExceptionWhenNonObjectPayloadInDataStream() { public void testDoNotAddExternalVersioningIfDataStream() { configureDataStream(); props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); - converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(props)); Schema preProcessedSchema = converter.preProcessSchema(schema); Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); SinkRecord sinkRecord = createSinkRecordWithValue(struct); diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java index bd527eb01..e4a437c20 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java @@ -89,7 +89,7 @@ public class ElasticsearchClientTest { private static ElasticsearchContainer container; - private DataConverter converter; + private ElasticsearchDataConverter converter; private ElasticsearchHelperClient helperClient; private ElasticsearchSinkConnectorConfig config; private Map props; @@ -115,7 +115,7 @@ public void setup() { props.put(IGNORE_KEY_CONFIG, "true"); props.put(LINGER_MS_CONFIG, "1000"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); helperClient = new ElasticsearchHelperClient(container.getConnectionUrl(), config, container.shouldStartClientInCompatibilityMode()); helperClient.waitForConnection(30000); @@ -337,7 +337,7 @@ public void testDeleteRecord() throws Exception { props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name()); props.put(IGNORE_KEY_CONFIG, "false"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); client.createIndexOrDataStream(index); @@ -360,7 +360,7 @@ public void testUpsertRecords() throws Exception { props.put(WRITE_METHOD_CONFIG, WriteMethod.UPSERT.name()); props.put(IGNORE_KEY_CONFIG, "false"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); client.createIndexOrDataStream(index); @@ -404,7 +404,7 @@ public void testUpsertRecords() throws Exception { public void testIgnoreBadRecord() throws Exception { props.put(BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, BehaviorOnMalformedDoc.IGNORE.name()); config = new ElasticsearchSinkConnectorConfig(props); - 
converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); client.createIndexOrDataStream(index); @@ -474,7 +474,7 @@ public void testRetryRecordsOnSocketTimeoutFailure() throws Exception { props.put(RETRY_BACKOFF_MS_CONFIG, "1000"); props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); // mock bulk processor to throw errors ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); @@ -502,7 +502,7 @@ public void testReporter() throws Exception { props.put(IGNORE_KEY_CONFIG, "false"); props.put(BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, BehaviorOnMalformedDoc.IGNORE.name()); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); when(reporter.report(any(), any())) @@ -609,7 +609,7 @@ private List causeExternalVersionConflictError(ElasticsearchClient c public void testExternalVersionConflictReporterNotCalled() throws Exception { props.put(IGNORE_KEY_CONFIG, "false"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); ElasticsearchClient client = new ElasticsearchClient(config, reporter, () -> offsetTracker.updateOffsets()); @@ -636,7 +636,7 @@ public void testExternalVersionConflictReporterNotCalled() throws Exception { public void testHandleResponseInternalVersionConflictReporterCalled() throws Exception { props.put(IGNORE_KEY_CONFIG, "false"); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); @@ -669,7 +669,7 @@ public void testNoVersionConflict() throws Exception { props.put(IGNORE_KEY_CONFIG, "false"); props.put(WRITE_METHOD_CONFIG, WriteMethod.UPSERT.name()); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); ErrantRecordReporter reporter2 = mock(ErrantRecordReporter.class); @@ -710,7 +710,7 @@ public void testSsl() throws Exception { props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword()); props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword()); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); helperClient = new ElasticsearchHelperClient(address, config, @@ -735,7 +735,7 @@ public void testWriteDataStreamInjectTimestamp() throws Exception { props.put(DATA_STREAM_TYPE_CONFIG, DATA_STREAM_TYPE); props.put(DATA_STREAM_DATASET_CONFIG, DATA_STREAM_DATASET); config = new ElasticsearchSinkConnectorConfig(props); - converter = new DataConverter(config); + converter = new ElasticsearchDataConverter(config); ElasticsearchClient client = new 
ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets()); index = createIndexName(TOPIC); diff --git a/src/test/java/io/confluent/connect/elasticsearch/MappingTest.java b/src/test/java/io/confluent/connect/elasticsearch/MappingTest.java index 2be3b54aa..371db21fe 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/MappingTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/MappingTest.java @@ -15,7 +15,6 @@ package io.confluent.connect.elasticsearch; -import com.github.tomakehurst.wiremock.common.Json; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import java.io.ByteArrayOutputStream; @@ -182,7 +181,7 @@ private void verifyMapping(Schema schema, JsonObject mapping) { } } - DataConverter converter = new DataConverter(new ElasticsearchSinkConnectorConfig(ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>()))); + ElasticsearchDataConverter converter = new ElasticsearchDataConverter(new ElasticsearchSinkConnectorConfig(ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>()))); Schema.Type schemaType = schema.type(); switch (schemaType) { case ARRAY: diff --git a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java index 3dba9ec8a..1dfaff12d 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java @@ -70,7 +70,7 @@ public class ValidatorTest { private MainResponse mockInfoResponse; private Map props; private RestHighLevelClient mockClient; - private Validator validator; + private ElasticsearchValidator validator; @Before public void setup() throws IOException { @@ -85,14 +85,14 @@ public void setup() throws IOException { @Test public void testValidDefaultConfig() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); } @Test public void testInvalidIndividualConfigs() { - validator = new Validator(new HashMap<>(), () -> mockClient); + validator = new ElasticsearchValidator(new HashMap<>(), () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_URL_CONFIG, "Missing required configuration"); } @@ -101,7 +101,7 @@ public void testInvalidIndividualConfigs() { public void testValidUpsertDeleteOnDefaultConfig() { props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); props.put(WRITE_METHOD_CONFIG, "upsert"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); } @@ -109,7 +109,7 @@ public void testValidUpsertDeleteOnDefaultConfig() { @Test public void testInvalidCredentials() { props.put(CONNECTION_USERNAME_CONFIG, "username"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_USERNAME_CONFIG, "must be set"); @@ -117,7 +117,7 @@ public void testInvalidCredentials() { props.remove(CONNECTION_USERNAME_CONFIG); props.put(CONNECTION_PASSWORD_CONFIG, "password"); - validator = new Validator(props); + validator = new ElasticsearchValidator(props); result = validator.validate(); assertHasErrorMessage(result, CONNECTION_USERNAME_CONFIG, "must be set"); assertHasErrorMessage(result, 
CONNECTION_PASSWORD_CONFIG, "must be set"); @@ -126,7 +126,7 @@ public void testInvalidCredentials() { @Test public void testClientThrowsElasticsearchStatusException() throws IOException { when(mockClient.ping(any(RequestOptions.class))).thenThrow(new ElasticsearchStatusException("Deleted resource.", RestStatus.GONE)); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_URL_CONFIG, "Could not connect to Elasticsearch. Error message: Deleted resource."); } @@ -134,7 +134,7 @@ public void testClientThrowsElasticsearchStatusException() throws IOException { @Test public void testValidCredentials() { // username and password not set - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -142,7 +142,7 @@ public void testValidCredentials() { // both set props.put(CONNECTION_USERNAME_CONFIG, "username"); props.put(CONNECTION_PASSWORD_CONFIG, "password"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertNoErrors(result); @@ -151,7 +151,7 @@ public void testValidCredentials() { @Test public void testInvalidMissingOneDataStreamConfig() { props.put(DATA_STREAM_DATASET_CONFIG, "a_valid_dataset"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, DATA_STREAM_DATASET_CONFIG, "must be set"); assertHasErrorMessage(result, DATA_STREAM_TYPE_CONFIG, "must be set"); @@ -161,13 +161,13 @@ public void testInvalidMissingOneDataStreamConfig() { public void testInvalidUpsertDeleteOnValidDataStreamConfigs() { props.put(DATA_STREAM_DATASET_CONFIG, "a_valid_dataset"); props.put(DATA_STREAM_TYPE_CONFIG, "logs"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); props.put(WRITE_METHOD_CONFIG, "upsert"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertHasErrorMessage(result, BEHAVIOR_ON_NULL_VALUES_CONFIG, "must not be"); @@ -180,7 +180,7 @@ public void testInvalidIgnoreConfigs() { props.put(IGNORE_KEY_TOPICS_CONFIG, "some,topics"); props.put(IGNORE_SCHEMA_CONFIG, "true"); props.put(IGNORE_SCHEMA_TOPICS_CONFIG, "some,other,topics"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, IGNORE_KEY_CONFIG, "is true"); @@ -194,7 +194,7 @@ public void testValidIgnoreConfigs() { // topics configs not set props.put(IGNORE_KEY_CONFIG, "true"); props.put(IGNORE_SCHEMA_CONFIG, "true"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -204,7 +204,7 @@ public void testValidIgnoreConfigs() { props.put(IGNORE_KEY_TOPICS_CONFIG, "some,topics"); props.put(IGNORE_SCHEMA_CONFIG, "false"); props.put(IGNORE_SCHEMA_TOPICS_CONFIG, "some,other,topics"); - validator = 
new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertNoErrors(result); @@ -213,7 +213,7 @@ public void testValidIgnoreConfigs() { @Test public void testInvalidKerberos() throws IOException { props.put(KERBEROS_PRINCIPAL_CONFIG, "principal"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, KERBEROS_PRINCIPAL_CONFIG, "must be set"); @@ -224,7 +224,7 @@ public void testInvalidKerberos() throws IOException { props.put(KERBEROS_PRINCIPAL_CONFIG, "principal"); props.put(KERBEROS_KEYTAB_PATH_CONFIG, keytab.toString()); props.put(PROXY_HOST_CONFIG, "proxy.com"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertHasErrorMessage(result, KERBEROS_PRINCIPAL_CONFIG, "not supported with proxy settings"); @@ -235,7 +235,7 @@ public void testInvalidKerberos() throws IOException { props.remove(PROXY_HOST_CONFIG); props.put(CONNECTION_USERNAME_CONFIG, "username"); props.put(CONNECTION_PASSWORD_CONFIG, "password"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertHasErrorMessage(result, KERBEROS_PRINCIPAL_CONFIG, "Either only Kerberos"); @@ -249,7 +249,7 @@ public void testInvalidKerberos() throws IOException { @Test public void testValidKerberos() throws IOException { // kerberos configs not set - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -258,7 +258,7 @@ public void testValidKerberos() throws IOException { Path keytab = Files.createTempFile("es", ".keytab"); props.put(KERBEROS_PRINCIPAL_CONFIG, "principal"); props.put(KERBEROS_KEYTAB_PATH_CONFIG, keytab.toString()); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertNoErrors(result); @@ -269,7 +269,7 @@ public void testValidKerberos() throws IOException { public void testInvalidLingerMs() { props.put(LINGER_MS_CONFIG, "1001"); props.put(FLUSH_TIMEOUT_MS_CONFIG, "1000"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, LINGER_MS_CONFIG, "can not be larger than"); @@ -280,7 +280,7 @@ public void testInvalidLingerMs() { public void testValidLingerMs() { props.put(LINGER_MS_CONFIG, "999"); props.put(FLUSH_TIMEOUT_MS_CONFIG, "1000"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -291,7 +291,7 @@ public void testInvalidMaxBufferedRecords() { props.put(MAX_BUFFERED_RECORDS_CONFIG, "1"); props.put(BATCH_SIZE_CONFIG, "2"); props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "2"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, MAX_BUFFERED_RECORDS_CONFIG, "must be larger than or equal to"); @@ -304,7 +304,7 @@ public void testValidMaxBufferedRecords() { 
props.put(MAX_BUFFERED_RECORDS_CONFIG, "5"); props.put(BATCH_SIZE_CONFIG, "2"); props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "2"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -315,7 +315,7 @@ public void testInvalidProxy() { props.put(PROXY_HOST_CONFIG, ""); props.put(PROXY_USERNAME_CONFIG, "username"); props.put(PROXY_PASSWORD_CONFIG, "password"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, PROXY_HOST_CONFIG, " must be set to use"); @@ -326,7 +326,7 @@ public void testInvalidProxy() { props.put(PROXY_HOST_CONFIG, "proxy"); props.put(PROXY_PASSWORD_CONFIG, "password"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertHasErrorMessage(result, PROXY_USERNAME_CONFIG, "Either both or neither"); @@ -336,7 +336,7 @@ public void testInvalidProxy() { @Test public void testValidProxy() { props.put(PROXY_HOST_CONFIG, "proxy"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -344,7 +344,7 @@ public void testValidProxy() { props.put(PROXY_HOST_CONFIG, "proxy"); props.put(PROXY_USERNAME_CONFIG, "password"); props.put(PROXY_PASSWORD_CONFIG, "password"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertNoErrors(result); @@ -354,7 +354,7 @@ public void testValidProxy() { public void testInvalidSsl() { // no SSL props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name()); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, SECURITY_PROTOCOL_CONFIG, "At least these SSL configs "); @@ -365,7 +365,7 @@ public void testInvalidSsl() { props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "b"); props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "c"); props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "d"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertHasErrorMessage(result, SECURITY_PROTOCOL_CONFIG, "to use SSL configs"); @@ -373,7 +373,7 @@ public void testInvalidSsl() { @Test public void testIncompatibleESVersionWithConnector() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); when(mockInfoResponse.getVersion().getNumber()).thenReturn("6.0.0"); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_URL_CONFIG, "not compatible with Elasticsearch"); @@ -381,7 +381,7 @@ public void testIncompatibleESVersionWithConnector() { @Test public void testCompatibleESVersionWithConnector() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); String[] compatibleESVersions = {"7.0.0", "7.9.3", "7.10.0", "7.12.1", "8.0.0", "10.10.10"}; for (String version : compatibleESVersions) { 
when(mockInfoResponse.getVersion().getNumber()).thenReturn(version); @@ -395,7 +395,7 @@ public void testCompatibleESVersionWithConnector() { public void testValidSsl() { // no SSL props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name()); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -406,7 +406,7 @@ public void testValidSsl() { props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "b"); props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "c"); props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "d"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); result = validator.validate(); assertNoErrors(result); @@ -414,7 +414,7 @@ public void testValidSsl() { @Test public void testValidConnection() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertNoErrors(result); @@ -423,7 +423,7 @@ public void testValidConnection() { @Test public void testInvalidConnection() throws IOException { when(mockClient.ping(eq(RequestOptions.DEFAULT))).thenReturn(false); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_URL_CONFIG, "Could not connect to Elasticsearch."); @@ -432,7 +432,7 @@ public void testInvalidConnection() throws IOException { @Test public void testInvalidConnectionThrows() throws IOException { when(mockClient.ping(eq(RequestOptions.DEFAULT))).thenThrow(new IOException("i iz fake")); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); assertHasErrorMessage(result, CONNECTION_URL_CONFIG, "Could not connect to Elasticsearch."); @@ -442,7 +442,7 @@ public void testInvalidConnectionThrows() throws IOException { public void testTimestampMappingDataStreamSet() { configureDataStream(); props.put(DATA_STREAM_TIMESTAMP_CONFIG, "one, two, fields"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); @@ -452,7 +452,7 @@ public void testTimestampMappingDataStreamSet() { @Test public void testTimestampMappingDataStreamNotSet() { props.put(DATA_STREAM_TIMESTAMP_CONFIG, "one, two, fields"); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); Config result = validator.validate(); @@ -462,7 +462,7 @@ public void testTimestampMappingDataStreamNotSet() { @Test public void testIncompatibleVersionDataStreamSet() { configureDataStream(); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); when(mockInfoResponse.getVersion().getNumber()).thenReturn("7.8.1"); Config result = validator.validate(); @@ -473,7 +473,7 @@ public void testIncompatibleVersionDataStreamSet() { @Test public void testIncompatibleVersionDataStreamNotSet() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); String[] incompatibleESVersions = {"7.8.0", "7.7.1", "7.6.2", "7.2.0", "7.1.1", 
"7.0.0-rc2"}; for (String version : incompatibleESVersions) { when(mockInfoResponse.getVersion().getNumber()).thenReturn(version); @@ -485,7 +485,7 @@ public void testIncompatibleVersionDataStreamNotSet() { @Test public void testCompatibleVersionDataStreamNotSet() { - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); String[] compatibleESVersions = {"7.9.0", "7.9.3", "7.9.3-amd64", "7.10.0", "7.10.2", "7.11.0", "7.11.2", "7.12.0", "7.12.1", "8.0.0", "10.10.10", "10.1.10", "10.1.1", "8.10.10"}; for (String version : compatibleESVersions) { @@ -499,7 +499,7 @@ public void testCompatibleVersionDataStreamNotSet() { @Test public void testCompatibleVersionDataStreamSet() { configureDataStream(); - validator = new Validator(props, () -> mockClient); + validator = new ElasticsearchValidator(props, () -> mockClient); String[] compatibleESVersions = {"7.9.0", "7.9.3", "7.9.3-amd64", "7.10.0", "7.10.2", "7.11.0", "7.11.2", "7.12.0", "7.12.1", "8.0.0", "10.10.10", "10.1.10", "10.1.1", "8.10.10"}; for (String version : compatibleESVersions) {