diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index afda5ba0e7449..133daa8cd6a68 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -144,7 +144,7 @@ public boolean deleteIndex(String index) throws IOException { public boolean deleteDocument(String index, String documentId) throws IOException { final DeleteRequest req = new DeleteRequest.Builder() - .index(config.getIndexName()) + .index(index) .id(documentId) .build(); @@ -156,7 +156,7 @@ public boolean deleteDocument(String index, String documentId) throws IOExceptio public boolean indexDocument(String index, String documentId, String documentSource) throws IOException { final Map mapped = objectMapper.readValue(documentSource, Map.class); final IndexRequest indexRequest = new IndexRequest.Builder<>() - .index(config.getIndexName()) + .index(index) .document(mapped) .id(documentId) .build(); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 62592f5f09b3f..4a16caf3ede40 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -18,39 +18,41 @@ */ package org.apache.pulsar.io.elasticsearch; -import co.elastic.clients.transport.ElasticsearchTransport; -import com.fasterxml.jackson.core.JsonParseException; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.schema.GenericObject; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.api.schema.GenericSchema; -import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; -import org.apache.pulsar.client.api.schema.SchemaBuilder; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.schema.KeyValue; -import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.apache.pulsar.common.schema.SchemaType; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; - +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import co.elastic.clients.transport.ElasticsearchTransport; +import com.fasterxml.jackson.core.JsonParseException; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.elasticsearch.client.BulkProcessor; @@ -72,10 +74,6 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; - -import static org.testng.Assert.assertNull; -import static org.testng.Assert.fail; - public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { private static ElasticsearchContainer container; @@ -149,6 +147,7 @@ public Object getNativeObject() { }); when(mockRecord.getSchema()).thenAnswer((Answer>>) invocation -> kvSchema); + when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis())); } @AfterMethod(alwaysRun = true) @@ -206,6 +205,16 @@ public final void send100Test() throws Exception { verify(mockRecord, times(100)).ack(); } + @Test + public final void send1WithFormattedIndexTest() throws Exception { + map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}"); + sink.open(map, mockSinkContext); + send(1); + verify(mockRecord, times(1)).ack(); + String value = getHitIdAtIndex("test-formatted-index-*", 0); + assertTrue(StringUtils.isNotBlank(value)); + } + @Test public final void sendNoSchemaTest() throws Exception {