Skip to content

Commit

Permalink
[fix][io] Fix es index creation (#22654) (#22701)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored May 13, 2024
1 parent b178084 commit f1daa75
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean deleteIndex(String index) throws IOException {
public boolean deleteDocument(String index, String documentId) throws IOException {
final DeleteRequest req = new
DeleteRequest.Builder()
.index(config.getIndexName())
.index(index)
.id(documentId)
.build();

Expand All @@ -156,7 +156,7 @@ public boolean deleteDocument(String index, String documentId) throws IOExceptio
public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
final Map mapped = objectMapper.readValue(documentSource, Map.class);
final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
.index(config.getIndexName())
.index(index)
.document(mapped)
.id(documentId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,41 @@
*/
package org.apache.pulsar.io.elasticsearch;

import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
Expand All @@ -72,10 +74,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;

public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {

private static ElasticsearchContainer container;
Expand Down Expand Up @@ -149,6 +147,7 @@ public Object getNativeObject() {
});

when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -206,6 +205,16 @@ public final void send100Test() throws Exception {
verify(mockRecord, times(100)).ack();
}

@Test
public final void send1WithFormattedIndexTest() throws Exception {
map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
sink.open(map, mockSinkContext);
send(1);
verify(mockRecord, times(1)).ack();
String value = getHitIdAtIndex("test-formatted-index-*", 0);
assertTrue(StringUtils.isNotBlank(value));
}

@Test
public final void sendNoSchemaTest() throws Exception {

Expand Down

0 comments on commit f1daa75

Please sign in to comment.