Support for Opensearch
sp-gupta committed Apr 10, 2023
1 parent 98ed07f commit dccae08
Showing 20 changed files with 3,021 additions and 588 deletions.
10 changes: 10 additions & 0 deletions pom.xml
@@ -65,6 +65,11 @@
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>9.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
@@ -75,6 +80,11 @@
<artifactId>connect-json</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
ElasticsearchClient.java
@@ -49,6 +49,7 @@
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
@@ -61,6 +62,7 @@
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.slf4j.Logger;
@@ -85,7 +87,7 @@
* in failure of the task.
*/
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class ElasticsearchClient {
public class ElasticsearchClient extends SearchClient {

private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);

@@ -111,7 +113,7 @@ public class ElasticsearchClient {
private final ConcurrentMap<Long, List<SinkRecordAndOffset>> inFlightRequests;
private final ElasticsearchSinkConnectorConfig config;
private final ErrantRecordReporter reporter;
private final RestHighLevelClient client;
protected final RestHighLevelClient client;
private final ExecutorService bulkExecutorService;
private final Time clock;
private final Lock inFlightRequestLock = new ReentrantLock();
@@ -157,7 +159,7 @@ public ElasticsearchClient(
this.bulkProcessor = BulkProcessor
.builder(buildConsumer(), buildListener(afterBulkCallback))
.setBulkActions(config.batchSize())
.setBulkSize(config.bulkSize())
.setBulkSize((ByteSizeValue) config.bulkSize())
.setConcurrentRequests(config.maxInFlightRequests() - 1) // 0 = no concurrent requests
.setFlushInterval(TimeValue.timeValueMillis(config.lingerMs()))
// Disabling bulk processor retries, because they only cover a small subset of errors
@@ -331,15 +333,16 @@ public boolean hasMapping(String index) {
* @param offsetState record's offset state
* @throws ConnectException if one of the requests failed
*/
public void index(SinkRecord record, DocWriteRequest<?> request, OffsetState offsetState) {
public void index(SinkRecord record, Object request, OffsetState offsetState) {
DocWriteRequest<?> docWriteRequest = (DocWriteRequest<?>) request;
throwIfFailed();

// TODO should we just pause partitions instead of blocking and failing the connector?
verifyNumBufferedRecords();

requestToSinkRecord.put(request, new SinkRecordAndOffset(record, offsetState));
requestToSinkRecord.put(docWriteRequest, new SinkRecordAndOffset(record, offsetState));
numBufferedRecords.incrementAndGet();
bulkProcessor.add(request);
bulkProcessor.add(docWriteRequest);
}

public void throwIfFailed() {
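The new SearchClient base type that ElasticsearchClient now extends is among the 20 changed files but is not shown in this excerpt. A minimal sketch of what it plausibly looks like, inferred from how ElasticsearchClient and ElasticsearchSinkTask use it (the class shape, the method set, and close() are assumptions; OffsetState is the connector's existing offset-tracking type):

package io.confluent.connect.elasticsearch;

import org.apache.kafka.connect.sink.SinkRecord;

public abstract class SearchClient {

  // Queues an engine-specific write request (an Elasticsearch or OpenSearch DocWriteRequest,
  // passed as Object so both engines fit one signature) for asynchronous bulk indexing.
  public abstract void index(SinkRecord record, Object request, OffsetState offsetState);

  // Rethrows any error recorded by the underlying bulk processor.
  public abstract void throwIfFailed();

  // Flushes buffered requests and releases the underlying REST client.
  public abstract void close();
}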
DataConverter.java → ElasticsearchDataConverter.java
@@ -52,9 +52,9 @@
import java.util.List;
import java.util.Map;

public class DataConverter {
public class ElasticsearchDataConverter implements SearchDataConverter {

private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
private static final Logger log = LoggerFactory.getLogger(ElasticsearchDataConverter.class);

private static final Converter JSON_CONVERTER;
protected static final String MAP_KEY = "key";
@@ -79,7 +79,7 @@ public class DataConverter {
*
* @param config connector config
*/
public DataConverter(ElasticsearchSinkConnectorConfig config) {
public ElasticsearchDataConverter(ElasticsearchSinkConnectorConfig config) {
this.config = config;
this.objectMapper = new ObjectMapper();
}
@@ -116,7 +116,7 @@ private String convertKey(Schema keySchema, Object key) {
}
}

public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
public Object convertRecord(SinkRecord record, String index) {
if (record.value() == null) {
switch (config.behaviorOnNullValues()) {
case IGNORE:
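Likewise, the SearchDataConverter interface that ElasticsearchDataConverter now implements is not part of this excerpt. Judging from the converter field and the convertRecord call in ElasticsearchSinkTask, a minimal form might be (the exact members are an assumption):

package io.confluent.connect.elasticsearch;

import org.apache.kafka.connect.sink.SinkRecord;

public interface SearchDataConverter {

  // Converts a Kafka Connect record into an engine-specific write request for the given index,
  // or returns null when the record should be skipped (e.g. a tombstone with behavior IGNORE).
  Object convertRecord(SinkRecord record, String index);
}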
ElasticsearchSinkConnector.java
@@ -15,12 +15,15 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.opensearch.OpensearchValidator;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
@@ -30,6 +33,7 @@
public class ElasticsearchSinkConnector extends SinkConnector {

private Map<String, String> configProperties;
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkConnector.class);

@Override
public String version() {
@@ -76,7 +80,14 @@ public ConfigDef config() {

@Override
public Config validate(Map<String, String> connectorConfigs) {
Validator validator = new Validator(connectorConfigs);
Validator validator;
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(connectorConfigs);
log.info(config.toString());
if (config.getServiceType().equals("Elasticsearch")) {
validator = new ElasticsearchValidator(connectorConfigs);
} else {
validator = new OpensearchValidator(connectorConfigs);
}
return validator.validate();
}
}
ElasticsearchSinkConnectorConfig.java
@@ -361,7 +361,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
);
private static final String DATA_STREAM_TIMESTAMP_DISPLAY = "Data Stream Timestamp Field";
private static final String DATA_STREAM_TIMESTAMP_DEFAULT = "";

private static final String SERVICE_TYPE_CONFIG =
"service.type";
private static final String SERVICE_TYPE_DISPLAY = "Elasticsearch or Opensearch";
private static final String SERVICE_TYPE_DEFAULT = "Elasticsearch";
private static final String SERVICE_TYPE_DOC = "Type of end service - Elasticsearch or Opensearch";
private static final String CONNECTOR_GROUP = "Connector";
private static final String DATA_CONVERSION_GROUP = "Data Conversion";
private static final String PROXY_GROUP = "Proxy";
@@ -583,6 +587,16 @@ private static void addConnectorConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
READ_TIMEOUT_MS_DISPLAY
).define(
SERVICE_TYPE_CONFIG,
Type.STRING,
SERVICE_TYPE_DEFAULT,
Importance.LOW,
SERVICE_TYPE_DOC,
CONNECTOR_GROUP,
++order,
Width.SHORT,
SERVICE_TYPE_DISPLAY
);
}

@@ -882,8 +896,14 @@ public BehaviorOnNullValues behaviorOnNullValues() {
return BehaviorOnNullValues.valueOf(getString(BEHAVIOR_ON_NULL_VALUES_CONFIG).toUpperCase());
}

public ByteSizeValue bulkSize() {
return new ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG));
public Object bulkSize() {
if (getServiceType().equals("Elasticsearch")) {
return new org.elasticsearch.common.unit.ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG));
} else {
return new org.opensearch.common.unit.ByteSizeValue(getLong(BULK_SIZE_BYTES_CONFIG));
}
}

public boolean compression() {
@@ -996,6 +1016,8 @@ public long retryBackoffMs() {
return getLong(RETRY_BACKOFF_MS_CONFIG);
}

public String getServiceType() {
return getString(SERVICE_TYPE_CONFIG);
}

private SecurityProtocol securityProtocol() {
return SecurityProtocol.valueOf(getString(SECURITY_PROTOCOL_CONFIG).toUpperCase());
}
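For reference, a short sketch of how the new service.type setting flows through the config class (illustrative only, not part of the commit; the connection.url value is a placeholder for the connector's existing required endpoint setting):

import java.util.HashMap;
import java.util.Map;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;

public class ServiceTypeExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "http://localhost:9200"); // placeholder endpoint
    props.put("service.type", "Opensearch");               // new key; defaults to "Elasticsearch"

    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    // The connector and task branch on this value to pick the OpenSearch or Elasticsearch
    // client, data converter, and validator.
    System.out.println(config.getServiceType()); // prints "Opensearch"
  }
}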
ElasticsearchSinkTask.java
@@ -21,6 +21,8 @@
import java.util.Set;
import java.util.function.BooleanSupplier;

import io.confluent.connect.elasticsearch.opensearch.OpensearchClient;
import io.confluent.connect.elasticsearch.opensearch.OpensearchDataConverter;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@@ -29,7 +31,6 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.elasticsearch.action.DocWriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,26 +41,26 @@ public class ElasticsearchSinkTask extends SinkTask {

private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);

private DataConverter converter;
private ElasticsearchClient client;
private SearchDataConverter converter;
private SearchClient client;
private ElasticsearchSinkConnectorConfig config;
private ErrantRecordReporter reporter;
private Set<String> existingMappings;
private Set<String> indexCache;
private OffsetTracker offsetTracker;
private PartitionPauser partitionPauser;
public String type;

@Override
public void start(Map<String, String> props) {
start(props, null);
}

// visible for testing
protected void start(Map<String, String> props, ElasticsearchClient client) {
protected void start(Map<String, String> props, SearchClient client) {
log.info("Starting ElasticsearchSinkTask.");

this.config = new ElasticsearchSinkConnectorConfig(props);
this.converter = new DataConverter(config);
this.existingMappings = new HashSet<>();
this.indexCache = new HashSet<>();
int offsetHighWaterMark = config.maxBufferedRecords() * 10;
@@ -79,8 +80,18 @@ protected void start(Map<String, String> props, ElasticsearchClient client) {
log.warn("AK versions prior to 2.6 do not support the errant record reporter.");
}
Runnable afterBulkCallback = () -> offsetTracker.updateOffsets();
this.client = client != null ? client
: new ElasticsearchClient(config, reporter, afterBulkCallback);
if (config.getServiceType().equals("Elasticsearch")) {
this.type = "Elasticsearch";
this.client = client != null ? client
: new ElasticsearchClient(config, reporter, afterBulkCallback);
this.converter = new ElasticsearchDataConverter(config);
} else {
log.info("Opensearch client.....");
this.type = "Opensearch";
this.client = client != null ? client
: new OpensearchClient(config, reporter, afterBulkCallback);
this.converter = new OpensearchDataConverter(config);
}

if (!config.flushSynchronously()) {
this.offsetTracker = new AsyncOffsetTracker(context);
@@ -254,7 +265,7 @@ private void tryWriteRecord(SinkRecord sinkRecord, OffsetState offsetState) {
ensureIndexExists(indexName);
checkMapping(indexName, sinkRecord);

DocWriteRequest<?> docWriteRequest = null;
Object docWriteRequest = null;
try {
docWriteRequest = converter.convertRecord(sinkRecord, indexName);
} catch (DataException convertException) {
Expand All @@ -270,7 +281,9 @@ private void tryWriteRecord(SinkRecord sinkRecord, OffsetState offsetState) {

if (docWriteRequest != null) {
logTrace("Adding {} to bulk processor.", sinkRecord);
log.info("Client: " + client.getClass());
client.index(sinkRecord, docWriteRequest, offsetState);
log.info("Called index method");
}
}
