diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBusV2.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBusV2.java index 6626641a181..082b3bc3c92 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBusV2.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBusV2.java @@ -141,16 +141,20 @@ public AddressState.ReadOnly mutate(final String address, consumer.accept(addressState); - // If this addressState has a listener, ensure that any waiting + return addressState.isEmpty() ? null : addressState; + }); + + if (result == null) { + return null; + } else { + // If the resulting addressState had a listener, ensure that any waiting // threads get notified so that they can resume immediately - final PipelineInput currentInput = addressState.getInput(); + final PipelineInput currentInput = result.getInput(); if (currentInput != null) { synchronized (currentInput) { currentInput.notifyAll(); } } - - return addressState.isEmpty() ? null : addressState; - }); - return result == null ? null : result.getReadOnlyView(); + return result.getReadOnlyView(); + } } private AddressState.ReadOnly get(final String address) { diff --git a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java index 78f7c22acf8..268ed8d0949 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -30,8 +31,16 @@ import org.logstash.ext.JrubyEventExtLibrary; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; @@ -307,6 +316,56 @@ public void whenInBlockingModeInputsShutdownLast() throws InterruptedException { assertThat(bus.getAddressState(address)).isNotPresent(); } + @Test + public void blockingShutdownDeadlock() throws InterruptedException { + final ExecutorService executor = Executors.newFixedThreadPool(10); + try { + for (int i = 0; i < 100; i++) { + bus.registerSender(output, addresses); + bus.listen(input, address); + bus.setBlockOnUnlisten(true); + + // we use a CountDownLatch to increase the likelihood + // of simultaneous execution + final CountDownLatch startLatch = new CountDownLatch(2); + final CompletableFuture unlistenFuture = CompletableFuture.runAsync(asRunnable(() -> { + startLatch.countDown(); + startLatch.await(); + bus.unlisten(input, address); + }), executor); + final CompletableFuture unregisterFuture = CompletableFuture.runAsync(asRunnable(() -> { + startLatch.countDown(); + startLatch.await(); + bus.unregisterSender(output, addresses); + }), executor); + + // ensure that our tasks all exit successfully, quickly + assertThatCode(() -> CompletableFuture.allOf(unlistenFuture, unregisterFuture).get(1, TimeUnit.SECONDS)) + .withThreadDumpOnError() + .withFailMessage("Expected unlisten and unregisterSender to not deadlock, but they did not return in a reasonable amount of time in the <%s>th iteration", i) + .doesNotThrowAnyException(); + } + } finally { + executor.shutdownNow(); + } + } + + @FunctionalInterface + interface ExceptionalRunnable { + void run() throws E; + } + + private Runnable asRunnable(final ExceptionalRunnable exceptionalRunnable) { + return () -> { + try { + exceptionalRunnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } + + @Test public void whenInputFailsOutputRetryOnlyNotYetDelivered() throws InterruptedException { bus.registerSender(output, addresses);