Support elasticsearch.byte_size (#1852)
* Modified code for `byte_size` parameter

1. Send the byte size limit parameter to the bulk processor.
2. Check whether the total byte size of the bulk request has reached that limit and, if so, send the bulk request.

* Refactor the code of the bulk framework

The bulk request is now aware of its own limits through `FsCrawlerBulkRequest#isOverTheLimit()`.
The bulk request computes its own current size whenever a new entity is added (`FsCrawlerBulkRequest#add(T request)`). A standalone sketch of this flushing policy follows at the end of this message.

We are adding a number of unit tests for both `FsCrawlerBulkRequest` and `FsCrawlerBulkProcessor`.

Closes #1835.
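
For orientation, here is a minimal, self-contained sketch of the flushing policy this change introduces. It deliberately does not use the FSCrawler classes (every name in it is made up for illustration, and the setting names in the comments are only indicative); the real logic lives in `FsCrawlerBulkRequest#isOverTheLimit()` and `FsCrawlerBulkProcessor#executeIfNeeded()` in the diff below.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of the bulk buffering: flush when EITHER the number of queued
// operations or their accumulated serialized size crosses its limit.
class ToyBulkBuffer {
    private final int maxActions;   // roughly what elasticsearch.bulk_size controls
    private final long maxBytes;    // roughly what elasticsearch.byte_size controls, in bytes
    private final List<String> queued = new ArrayList<>();
    private long totalBytes = 0;

    ToyBulkBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    void add(String serializedDoc) {
        queued.add(serializedDoc);
        // Only pay the byte-counting cost when a byte limit is actually configured.
        if (maxBytes > 0) {
            totalBytes += serializedDoc.getBytes(StandardCharsets.UTF_8).length;
        }
        if (isOverTheLimit()) {
            flush();
        }
    }

    boolean isOverTheLimit() {
        return (maxBytes > 0 && totalBytes >= maxBytes)
                || (maxActions > 0 && queued.size() >= maxActions);
    }

    void flush() {
        System.out.printf("flushing %d docs (%d bytes)%n", queued.size(), totalBytes);
        queued.clear();
        totalBytes = 0;
    }

    public static void main(String[] args) {
        ToyBulkBuffer buffer = new ToyBulkBuffer(100, 40);      // 100 actions or 40 bytes
        buffer.add("{\"foo\":\"bar\"}");                        // well under both limits
        buffer.add("{\"foo\":\"a much longer document...\"}");  // total size now exceeds 40 bytes: flushes
    }
}
```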

---------

Co-authored-by: kamal-sharma <[email protected]>
dadoonet and kamalsharma9001 authored Mar 29, 2024
1 parent c35bc18 commit 43dc79a
Showing 12 changed files with 394 additions and 10 deletions.
@@ -235,6 +235,7 @@ public void start() throws ElasticsearchClientException {
ElasticsearchBulkRequest::new)
.setBulkActions(settings.getElasticsearch().getBulkSize())
.setFlushInterval(settings.getElasticsearch().getFlushInterval())
.setByteSize(settings.getElasticsearch().getByteSize())
.build();
}

@@ -19,6 +19,7 @@

package fr.pilato.elasticsearch.crawler.fs.framework.bulk;

import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue;
import fr.pilato.elasticsearch.crawler.fs.framework.TimeValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,6 +44,7 @@ public class FsCrawlerBulkProcessor<
private static final Logger logger = LogManager.getLogger(FsCrawlerBulkProcessor.class);

private final int bulkActions;
private final ByteSizeValue byteSize;
private final Listener<O, Req, Res> listener;
private final Engine<O, Req, Res> engine;
private Req bulkRequest;
@@ -55,12 +57,14 @@ public FsCrawlerBulkProcessor(Engine<O, Req, Res> engine,
Listener<O, Req, Res> listener,
int bulkActions,
TimeValue flushInterval,
ByteSizeValue byteSize,
Supplier<Req> requestSupplier) {
this.engine = engine;
this.listener = listener;
this.bulkActions = bulkActions;
this.byteSize = byteSize;
this.requestSupplier = requestSupplier;
this.bulkRequest = requestSupplier.get();
this.bulkRequest = supplyRequestWithLimits(requestSupplier, bulkActions, byteSize);
this.listener.setBulkProcessor(this);

if (flushInterval != null) {
@@ -119,10 +123,10 @@ private void ensureOpen() {
throw new IllegalStateException("bulk process already closed");
}
}

private void executeIfNeeded() {
ensureOpen();
if (isOverTheLimit()) {
if (bulkRequest.isOverTheLimit()) {
execute();
}
}
@@ -136,7 +140,7 @@ private void executeWhenNeeded() {

private void execute() {
final Req bulkRequest = this.bulkRequest;
this.bulkRequest = requestSupplier.get();
this.bulkRequest = supplyRequestWithLimits(requestSupplier, bulkActions, byteSize);
final long executionId = executionIdGen.incrementAndGet();

// execute in a blocking fashion...
@@ -153,10 +157,13 @@ private void execute() {
}
}

private boolean isOverTheLimit() {
return (bulkActions != -1) && (bulkRequest.numberOfActions() >= bulkActions);
Req supplyRequestWithLimits(Supplier<Req> requestSupplier, int bulkActions, ByteSizeValue byteSize) {
Req bulkRequest = requestSupplier.get();
bulkRequest.maxNumberOfActions(bulkActions);
bulkRequest.maxBulkSize(byteSize);
return bulkRequest;
}

public Listener<O, Req, Res> getListener() {
return listener;
}
@@ -171,6 +178,7 @@ public static class Builder<O extends FsCrawlerOperation<O>,

private int bulkActions;
private TimeValue flushInterval;
private ByteSizeValue byteSize;
private final Engine<O, Req, Res> engine;
private final Listener<O, Req, Res> listener;
private final Supplier<Req> requestSupplier;
@@ -190,9 +198,14 @@ public Builder<O, Req, Res> setFlushInterval(TimeValue flushInterval) {
this.flushInterval = flushInterval;
return this;
}


public Builder<O, Req, Res> setByteSize(ByteSizeValue byteSizeValue) {
this.byteSize = byteSizeValue;
return this;
}

public FsCrawlerBulkProcessor<O, Req, Res> build() {
return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, requestSupplier);
return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, byteSize, requestSupplier);
}
}

@@ -19,22 +19,60 @@

package fr.pilato.elasticsearch.crawler.fs.framework.bulk;

import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;

import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.serialize;

public abstract class FsCrawlerBulkRequest<T extends FsCrawlerOperation<T>> {

private final Logger logger = LogManager.getLogger(FsCrawlerBulkRequest.class);

private final List<T> operations = new ArrayList<>();
private int totalByteSize = 0;
private int maxNumberOfActions;
private ByteSizeValue maxBulkSize;

public int numberOfActions() {
return operations.size();
}

public int totalByteSize() {
return totalByteSize;
}

public void maxNumberOfActions(int maxNumberOfActions) {
this.maxNumberOfActions = maxNumberOfActions;
}

public void maxBulkSize(ByteSizeValue maxBulkSize) {
this.maxBulkSize = maxBulkSize;
}

public void add(T request) {
operations.add(request);
// There's a cost of serializing the request. We need to take it into account
// and only compute the size if we need to.
if (maxBulkSize != null && maxBulkSize.getBytes() > 0) {
// TODO maybe we should just add the serialized request to the T object as an optional payload?
String jsonValue = serialize(request);
byte[] bytes = jsonValue.getBytes();
totalByteSize += bytes.length;
}
}

public List<T> getOperations() {
return operations;
}

boolean isOverTheLimit() {
logger.trace("Checking if we need to flush the bulk processor: [{}] >= [{}] actions, [{}] >= [{}] bytes",
numberOfActions(), maxNumberOfActions, totalByteSize(), maxBulkSize != null ? maxBulkSize.getBytes() : null);
return (maxBulkSize != null && maxBulkSize.getBytes() > 0 && totalByteSize >= maxBulkSize.getBytes()) ||
(maxNumberOfActions > 0 && numberOfActions() >= maxNumberOfActions);
}
}
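
To make the new accounting concrete, here is a rough usage sketch in the style of the tests added below. It is not part of the commit: it assumes it sits next to those tests so it can reuse the `TestBulkRequest`, `TestOperation`, and `TestBean` helpers and call the package-private `isOverTheLimit()`.

```java
// Illustration only (not part of the diff); the helper classes and same-package
// access to isOverTheLimit() are assumptions borrowed from the tests below.
@Test
public void byteAndActionLimitsSketch() {
    TestBulkRequest request = new TestBulkRequest();
    request.maxNumberOfActions(3);                              // flush after 3 operations...
    request.maxBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)); // ...or ~1 MB of serialized JSON

    request.add(new TestOperation(new TestBean("bar")));
    // one action and a few dozen bytes: neither limit is reached yet
    assertFalse(request.isOverTheLimit());

    request.add(new TestOperation(new TestBean("bar")));
    request.add(new TestOperation(new TestBean("bar")));
    // three actions >= maxNumberOfActions, so the bulk processor would now flush
    assertTrue(request.isOverTheLimit());
}
```
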
@@ -0,0 +1,158 @@
/*
* Licensed to David Pilato under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package fr.pilato.elasticsearch.crawler.fs.framework.bulk;

import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeUnit;
import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue;
import fr.pilato.elasticsearch.crawler.fs.framework.TimeValue;
import fr.pilato.elasticsearch.crawler.fs.test.framework.AbstractFSCrawlerTestCase;
import org.junit.Test;

import java.io.IOException;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.serialize;
import static org.junit.Assert.*;

public class FsCrawlerBulkProcessorTest extends AbstractFSCrawlerTestCase {
private static final TestBean PAYLOAD = new TestBean("bar");
private static final int PAYLOAD_SIZE = serialize(PAYLOAD).getBytes().length + 12 /* for the json payload field overhead */;

@Test
public void testBulkProcessorMaxActions() throws IOException {
int maxActions = randomIntBetween(1, 1000);
TestBulkListener listener = new TestBulkListener();
FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor =
new FsCrawlerBulkProcessor<>(
new TestEngine(),
listener,
maxActions,
null,
new ByteSizeValue(1, ByteSizeUnit.MB),
TestBulkRequest::new);

generatePayload(bulkProcessor, 1, maxActions - 1);
assertEquals(0, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions, 1);
assertEquals(1, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions + 1, 1);
bulkProcessor.close();
assertEquals(2, listener.nbSuccessfulExecutions);
}

@Test
public void testBulkProcessorNullSize() throws IOException {
int maxActions = randomIntBetween(1, 1000);
TestBulkListener listener = new TestBulkListener();
FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor =
new FsCrawlerBulkProcessor<>(
new TestEngine(),
listener,
maxActions,
null,
null,
TestBulkRequest::new);

generatePayload(bulkProcessor, 1, maxActions - 1);
assertEquals(0, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions, 1);
assertEquals(1, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions + 1, 1);
bulkProcessor.close();
assertEquals(2, listener.nbSuccessfulExecutions);
}

@Test
public void testBulkProcessorZeroSize() throws IOException {
int maxActions = randomIntBetween(1, 1000);
TestBulkListener listener = new TestBulkListener();
FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor =
new FsCrawlerBulkProcessor<>(
new TestEngine(),
listener,
maxActions,
null,
new ByteSizeValue(0, ByteSizeUnit.MB),
TestBulkRequest::new);

generatePayload(bulkProcessor, 1, maxActions - 1);
assertEquals(0, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions, 1);
assertEquals(1, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions + 1, 1);
bulkProcessor.close();
assertEquals(2, listener.nbSuccessfulExecutions);
}

@Test
public void testBulkProcessorMaxSize() throws IOException {
int maxActions = randomIntBetween(1, 1000);
TestBulkListener listener = new TestBulkListener();
FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor =
new FsCrawlerBulkProcessor<>(
new TestEngine(),
listener,
0,
null,
new ByteSizeValue((long) maxActions * PAYLOAD_SIZE, ByteSizeUnit.BYTES),
TestBulkRequest::new);

generatePayload(bulkProcessor, 1, maxActions - 1);
assertEquals(0, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions, 1);
assertEquals(1, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, maxActions + 1, maxActions - 1);
assertEquals(1, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, 2 * maxActions, 1);
assertEquals(2, listener.nbSuccessfulExecutions);
generatePayload(bulkProcessor, 2 * maxActions + 1, 1);
bulkProcessor.close();
assertEquals(3, listener.nbSuccessfulExecutions);
}

@Test
public void testBulkProcessorFlushInterval() throws IOException, InterruptedException {
int maxActions = randomIntBetween(1, 1000);
TimeValue flushInterval = TimeValue.timeValueMillis(randomIntBetween(500, 2000));
TestBulkListener listener = new TestBulkListener();
FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor =
new FsCrawlerBulkProcessor<>(new TestEngine(), listener, 0, flushInterval, null, TestBulkRequest::new);

// We don't load the bulk processor immediately
Thread.sleep(100);

generatePayload(bulkProcessor, 1, maxActions);
assertEquals(0, listener.nbSuccessfulExecutions);

Thread.sleep(flushInterval.millis());

assertEquals(1, listener.nbSuccessfulExecutions);
bulkProcessor.close();
assertEquals(1, listener.nbSuccessfulExecutions);
}

private void generatePayload(FsCrawlerBulkProcessor<TestOperation, TestBulkRequest, TestBulkResponse> bulkProcessor, int start, int size) {
for (int i = start; i < start + size; i++) {
logger.trace("Adding a new operation [{}]", i);
bulkProcessor.add(new TestOperation(PAYLOAD));
}
}

}