From 36bd165adf06854f3454f8a3fc2c960b5bcc325c Mon Sep 17 00:00:00 2001 From: Serhii Plyhun Date: Mon, 4 Jul 2022 13:18:34 +0200 Subject: [PATCH] Make use of timeout attempts --- .../mesh/search/verticle/ElasticsearchProcessVerticle.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java b/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java index 370055288e..a447b758e3 100644 --- a/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java +++ b/elasticsearch/src/main/java/com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.java @@ -388,6 +388,7 @@ private Flowable generateRequests(MessageEvent messageE return Flowable.empty(); } try { + AtomicInteger retried = new AtomicInteger(options.getRetryLimit()); return this.mainEventhandler.handle(messageEvent) .doOnNext(request -> { if (log.isTraceEnabled()) { @@ -396,9 +397,8 @@ private Flowable generateRequests(MessageEvent messageE idleChecker.addAndGetRequests(request.requestCount()); }) .doOnError(err -> logElasticSearchError(err, () -> { - if (err instanceof SocketTimeoutException - || (err instanceof CompositeException && ((CompositeException) err).getExceptions().stream().anyMatch(SocketTimeoutException.class::isInstance))) { - log.info("Transforming event " + messageEvent.event + " process timed out", err); + if (err instanceof SocketTimeoutException && retried.get() > 0) { + log.info("Transforming event " + messageEvent.event + " process timed out on retry #" + retried.getAndDecrement(), err); idleChecker.decrementAndGetTransformations(); } else { log.error("Error transforming event " + messageEvent.event, err);