diff --git a/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java b/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java index a2a8651a73..486ab75b61 100644 --- a/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java +++ b/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java @@ -8,6 +8,7 @@ import static com.gentics.mesh.search.verticle.eventhandler.RxUtil.retryWithDelay; import static com.gentics.mesh.search.verticle.eventhandler.Util.logElasticSearchError; +import java.net.SocketTimeoutException; import java.time.Duration; import java.util.List; import java.util.Objects; @@ -386,6 +387,7 @@ private Flowable generateRequests(MessageEvent messageE return Flowable.empty(); } try { + AtomicInteger retried = new AtomicInteger(options.getRetryLimit()); return this.mainEventhandler.handle(messageEvent) .doOnNext(request -> { if (log.isTraceEnabled()) { @@ -393,11 +395,18 @@ private Flowable generateRequests(MessageEvent messageE } idleChecker.addAndGetRequests(request.requestCount()); }) + .doOnError(err -> logElasticSearchError(err, () -> { + if (err instanceof SocketTimeoutException && retried.get() > 0) { + log.info("Transforming event " + messageEvent.event + " process timed out on retry #" + retried.getAndDecrement(), err); + idleChecker.decrementAndGetTransformations(); + } else { + log.error("Error transforming event " + messageEvent.event, err); + } + })) .retryWhen(retryWithDelay( Duration.ofMillis(options.getRetryInterval()), options.getRetryLimit())) - .doOnComplete( - () -> log.trace("Done transforming event {}. Transformations pending: {}", messageEvent.event, idleChecker.getTransformations())) + .doOnComplete(() -> log.trace("Done transforming event {}. Transformations pending: {}", messageEvent.event, idleChecker.getTransformations())) .doOnTerminate(idleChecker::decrementAndGetTransformations); } catch (Exception e) { // For safety to keep the verticle always running