Skip to content

Commit

Permalink
[fix][io] Fix es index creation (#22654)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored and Technoboy- committed May 8, 2024
1 parent 06387de commit b86ebaa
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean deleteIndex(String index) throws IOException {
public boolean deleteDocument(String index, String documentId) throws IOException {
final DeleteRequest req = new
DeleteRequest.Builder()
.index(config.getIndexName())
.index(index)
.id(documentId)
.build();

Expand All @@ -156,7 +156,7 @@ public boolean deleteDocument(String index, String documentId) throws IOExceptio
public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
final Map mapped = objectMapper.readValue(documentSource, Map.class);
final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
.index(config.getIndexName())
.index(index)
.document(mapped)
.id(documentId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand Down Expand Up @@ -152,6 +154,7 @@ public Object getNativeObject() {
});

when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -209,6 +212,16 @@ public final void send100Test() throws Exception {
verify(mockRecord, times(100)).ack();
}

@Test
public final void send1WithFormattedIndexTest() throws Exception {
map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
sink.open(map, mockSinkContext);
send(1);
verify(mockRecord, times(1)).ack();
String value = getHitIdAtIndex("test-formatted-index-*", 0);
assertTrue(StringUtils.isNotBlank(value));
}

@Test
public final void sendNoSchemaTest() throws Exception {

Expand Down

0 comments on commit b86ebaa

Please sign in to comment.