From 3285447a73b0e58f667cb14445923ef9e7ac3ff2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 23 Feb 2024 12:15:50 +0200 Subject: [PATCH 1/4] bump versions Signed-off-by: Oleh Dokuka --- build.gradle | 8 ++++---- rsocket-core/build.gradle | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index 62ee687da..ae8f05d06 100644 --- a/build.gradle +++ b/build.gradle @@ -33,10 +33,10 @@ subprojects { apply plugin: 'com.github.sherter.google-java-format' apply plugin: 'com.github.vlsi.gradle-extensions' - ext['reactor-bom.version'] = '2020.0.32' - ext['logback.version'] = '1.2.10' - ext['netty-bom.version'] = '4.1.93.Final' - ext['netty-boringssl.version'] = '2.0.61.Final' + ext['reactor-bom.version'] = '2020.0.39' + ext['logback.version'] = '1.3.14' + ext['netty-bom.version'] = '4.1.106.Final' + ext['netty-boringssl.version'] = '2.0.62.Final' ext['hdrhistogram.version'] = '2.1.12' ext['mockito.version'] = '4.11.0' ext['slf4j.version'] = '1.7.36' diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle index 6f2056da0..d0aeca2b6 100644 --- a/rsocket-core/build.gradle +++ b/rsocket-core/build.gradle @@ -41,13 +41,14 @@ dependencies { testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' jcstressImplementation(project(":rsocket-test")) + jcstressImplementation 'org.slf4j:slf4j-api' jcstressImplementation "ch.qos.logback:logback-classic" jcstressImplementation 'io.projectreactor:reactor-test' } jcstress { mode = 'quick' //quick, default, tough - jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.15" + jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.16" } jar { From 748effe78386dccb267d057f43dfc5854af1bf21 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 23 Feb 2024 12:38:00 +0200 Subject: [PATCH 2/4] fixes memleaks Signed-off-by: Oleh Dokuka --- .../UnboundedProcessorStressTest.java | 53 ++++- .../java/io/rsocket/utils/FastLogger.java | 137 +++++++++++ .../rsocket/internal/UnboundedProcessor.java | 220 ++++++++++++++---- 3 files changed, 364 insertions(+), 46 deletions(-) create mode 100644 rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java diff --git a/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java b/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java index 2f5e51f0e..c55d8d8cf 100644 --- a/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java +++ b/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java @@ -3,6 +3,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.rsocket.core.StressSubscriber; +import io.rsocket.utils.FastLogger; + +import java.util.Arrays; +import java.util.ConcurrentModificationException; import org.openjdk.jcstress.annotations.Actor; import org.openjdk.jcstress.annotations.Arbiter; import org.openjdk.jcstress.annotations.Expect; @@ -14,6 +18,7 @@ import org.openjdk.jcstress.infra.results.L_Result; import reactor.core.Fuseable; import reactor.core.publisher.Hooks; +import reactor.util.Logger; public abstract class UnboundedProcessorStressTest { @@ -21,7 +26,9 @@ public abstract class UnboundedProcessorStressTest { Hooks.onErrorDropped(t -> {}); } - final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(); + final Logger logger = new FastLogger(getClass().getName()); + + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(logger); @JCStressTest @Outcome( @@ -145,6 +152,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -270,6 +279,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -375,6 +386,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -476,6 +489,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -578,6 +593,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -701,6 +718,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -781,6 +800,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -837,9 +858,15 @@ public void arbiter(LLL_Result r) { + stressSubscriber.onErrorCalls * 2 + stressSubscriber.droppedErrors.size() * 3; + if (stressSubscriber.concurrentOnNext || stressSubscriber.concurrentOnComplete) { + throw new ConcurrentModificationException("boo"); + } + stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -892,6 +919,8 @@ public void arbiter(LLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1107,6 +1136,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1238,6 +1269,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1390,6 +1423,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1522,6 +1557,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1587,6 +1624,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1652,6 +1691,8 @@ public void arbiter(LLLL_Result r) { stressSubscriber.values.forEach(ByteBuf::release); r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt(); + + checkOutcomes(this, r.toString(), logger); } } @@ -1678,6 +1719,16 @@ public void subscribe2() { @Arbiter public void arbiter(L_Result r) { r.r1 = stressSubscriber1.onErrorCalls + stressSubscriber2.onErrorCalls; + + checkOutcomes(this, r.toString(), logger); + } + } + + static void checkOutcomes(Object instance, String result, Logger logger) { + if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class)) + .flatMap(o -> Arrays.stream(o.id())) + .noneMatch(s -> s.equalsIgnoreCase(result))) { + throw new RuntimeException(result + " " + logger); } } } diff --git a/rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java b/rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java new file mode 100644 index 000000000..c301d87cf --- /dev/null +++ b/rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java @@ -0,0 +1,137 @@ +package io.rsocket.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import reactor.util.Logger; + +/** + * Implementation of {@link Logger} which is based on the {@link ThreadLocal} based queue which + * collects all the events on the per-thread basis.
Such logger is designed to have all events + * stored during the stress-test run and then sorted and printed out once all the Threads completed + * execution (inside the {@link org.openjdk.jcstress.annotations.Arbiter} annotated method.
+ * Note, this implementation only supports trace-level logs and ignores all others, it is intended + * to be used by {@link reactor.core.publisher.StateLogger}. + */ +public class FastLogger implements Logger { + + final Map> queues = new ConcurrentHashMap<>(); + + final ThreadLocal> logsQueueLocal = + ThreadLocal.withInitial( + () -> { + final ArrayList logs = new ArrayList<>(100); + queues.put(Thread.currentThread(), logs); + return logs; + }); + + private final String name; + + public FastLogger(String name) { + this.name = name; + } + + @Override + public String toString() { + return queues + .values() + .stream() + .flatMap(List::stream) + .sorted( + Comparator.comparingLong( + s -> { + Pattern pattern = Pattern.compile("\\[(.*?)]"); + Matcher matcher = pattern.matcher(s); + matcher.find(); + return Long.parseLong(matcher.group(1)); + })) + .collect(Collectors.joining("\n")); + } + + @Override + public String getName() { + return this.name; + } + + @Override + public boolean isTraceEnabled() { + return true; + } + + @Override + public void trace(String msg) { + logsQueueLocal.get().add(String.format("[%s] %s", System.nanoTime(), msg)); + } + + @Override + public void trace(String format, Object... arguments) { + trace(String.format(format, arguments)); + } + + @Override + public void trace(String msg, Throwable t) { + trace(String.format("%s, %s", msg, Arrays.toString(t.getStackTrace()))); + } + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public void debug(String msg) {} + + @Override + public void debug(String format, Object... arguments) {} + + @Override + public void debug(String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled() { + return false; + } + + @Override + public void info(String msg) {} + + @Override + public void info(String format, Object... arguments) {} + + @Override + public void info(String msg, Throwable t) {} + + @Override + public boolean isWarnEnabled() { + return false; + } + + @Override + public void warn(String msg) {} + + @Override + public void warn(String format, Object... arguments) {} + + @Override + public void warn(String msg, Throwable t) {} + + @Override + public boolean isErrorEnabled() { + return false; + } + + @Override + public void error(String msg) {} + + @Override + public void error(String format, Object... arguments) {} + + @Override + public void error(String msg, Throwable t) {} +} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 23ada95fe..c96a7aed2 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -32,6 +32,7 @@ import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; +import reactor.util.Logger; import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import reactor.util.context.Context; @@ -51,6 +52,7 @@ public final class UnboundedProcessor extends Flux final Queue queue; final Queue priorityQueue; final Runnable onFinalizedHook; + @Nullable final Logger logger; boolean cancelled; boolean done; @@ -99,10 +101,19 @@ public UnboundedProcessor() { this(() -> {}); } + UnboundedProcessor(Logger logger) { + this(() -> {}, logger); + } + public UnboundedProcessor(Runnable onFinalizedHook) { + this(onFinalizedHook, null); + } + + UnboundedProcessor(Runnable onFinalizedHook, @Nullable Logger logger) { this.onFinalizedHook = onFinalizedHook; this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); + this.logger = logger; } @Override @@ -153,7 +164,7 @@ public boolean tryEmitPrioritized(ByteBuf t) { } if (hasRequest(previousState)) { - drainRegular(previousState); + drainRegular((previousState | FLAG_HAS_VALUE) + 1); } } return true; @@ -189,7 +200,7 @@ public boolean tryEmitNormal(ByteBuf t) { } if (hasRequest(previousState)) { - drainRegular(previousState); + drainRegular((previousState | FLAG_HAS_VALUE) + 1); } } @@ -223,9 +234,7 @@ public boolean tryEmitFinal(ByteBuf t) { return true; } - if (hasRequest(previousState)) { - drainRegular(previousState); - } + drainRegular((previousState | FLAG_TERMINATED | FLAG_HAS_VALUE) + 1); } return true; @@ -279,9 +288,7 @@ public void onError(Throwable t) { return; } - if (hasRequest(previousState)) { - drainRegular(previousState); - } + drainRegular((previousState | FLAG_TERMINATED) + 1); } } @@ -318,18 +325,15 @@ public void onComplete() { return; } - if (hasRequest(previousState)) { - drainRegular(previousState); - } + drainRegular((previousState | FLAG_TERMINATED) + 1); } } - void drainRegular(long previousState) { + void drainRegular(long expectedState) { final CoreSubscriber a = this.actual; final Queue q = this.queue; final Queue pq = this.priorityQueue; - long expectedState = previousState + 1; for (; ; ) { long r = this.requested; @@ -351,7 +355,7 @@ void drainRegular(long previousState) { empty = t == null; } - if (checkTerminated(done, empty, a)) { + if (checkTerminated(done, empty, true, a)) { if (!empty) { release(t); } @@ -374,7 +378,7 @@ void drainRegular(long previousState) { done = this.done; empty = q.isEmpty() && pq.isEmpty(); - if (checkTerminated(done, empty, a)) { + if (checkTerminated(done, empty, false, a)) { return; } } @@ -401,7 +405,8 @@ void drainRegular(long previousState) { } } - boolean checkTerminated(boolean done, boolean empty, CoreSubscriber a) { + boolean checkTerminated( + boolean done, boolean empty, boolean hasDemand, CoreSubscriber a) { final long state = this.state; if (isCancelled(state)) { clearAndFinalize(this); @@ -415,8 +420,15 @@ boolean checkTerminated(boolean done, boolean empty, CoreSubscriber actual) { previousState = markSubscriberReady(this); + if (isSubscriberReady(previousState)) { + return; + } + if (this.outputFused) { if (isCancelled(previousState)) { return; @@ -523,7 +539,7 @@ public void subscribe(CoreSubscriber actual) { } if (hasRequest(previousState)) { - drainRegular(previousState); + drainRegular((previousState | FLAG_SUBSCRIBER_READY) + 1); } } @@ -549,7 +565,7 @@ public void request(long n) { } if (isSubscriberReady(previousState) && hasValue(previousState)) { - drainRegular(previousState); + drainRegular((previousState | FLAG_HAS_REQUEST) + 1); } } } @@ -727,7 +743,9 @@ static long markSubscribedOnce(UnboundedProcessor instance) { return state; } - if (STATE.compareAndSet(instance, state, state | FLAG_SUBSCRIBED_ONCE)) { + final long nextState = state | FLAG_SUBSCRIBED_ONCE; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mso", state, nextState); return state; } } @@ -743,7 +761,10 @@ static long markSubscriberReady(UnboundedProcessor instance) { for (; ; ) { long state = instance.state; - if (isFinalized(state) || isCancelled(state) || isDisposed(state)) { + if (isFinalized(state) + || isCancelled(state) + || isDisposed(state) + || isSubscriberReady(state)) { return state; } @@ -754,7 +775,9 @@ static long markSubscriberReady(UnboundedProcessor instance) { } } - if (STATE.compareAndSet(instance, state, nextState | FLAG_SUBSCRIBER_READY)) { + nextState = nextState | FLAG_SUBSCRIBER_READY; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " msr", state, nextState); return state; } } @@ -776,11 +799,13 @@ static long markRequestAdded(UnboundedProcessor instance) { } long nextState = state; - if (isSubscriberReady(state) && hasValue(state)) { + if (isWorkInProgress(state) || (isSubscriberReady(state) && hasValue(state))) { nextState = addWork(state); } - if (STATE.compareAndSet(instance, state, nextState | FLAG_HAS_REQUEST)) { + nextState = nextState | FLAG_HAS_REQUEST; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mra", state, nextState); return state; } } @@ -815,7 +840,9 @@ static long markValueAdded(UnboundedProcessor instance) { } } - if (STATE.compareAndSet(instance, state, nextState | FLAG_HAS_VALUE)) { + nextState = nextState | FLAG_HAS_VALUE; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mva", state, nextState); return state; } } @@ -840,12 +867,12 @@ static long markValueAddedAndTerminated(UnboundedProcessor instance) { if (isWorkInProgress(state)) { nextState = addWork(state); } else if (isSubscriberReady(state) && !instance.outputFused) { - if (hasRequest(state)) { - nextState = addWork(state); - } + nextState = addWork(state); } - if (STATE.compareAndSet(instance, state, nextState | FLAG_HAS_VALUE | FLAG_TERMINATED)) { + nextState = nextState | FLAG_HAS_VALUE | FLAG_TERMINATED; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, "mva&t", state, nextState); return state; } } @@ -867,16 +894,20 @@ static long markTerminatedOrFinalized(UnboundedProcessor instance) { } long nextState = state; - if (isSubscriberReady(state) && !instance.outputFused) { + if (isWorkInProgress(state)) { + nextState = addWork(state); + } else if (isSubscriberReady(state) && !instance.outputFused) { if (!hasValue(state)) { // fast path for no values and no work in progress nextState = FLAG_FINALIZED; - } else if (hasRequest(state)) { + } else { nextState = addWork(state); } } - if (STATE.compareAndSet(instance, state, nextState | FLAG_TERMINATED)) { + nextState = nextState | FLAG_TERMINATED; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mt|f", state, nextState); if (isFinalized(nextState)) { instance.onFinalizedHook.run(); } @@ -899,8 +930,9 @@ static long markCancelled(UnboundedProcessor instance) { return state; } - final long nextState = addWork(state); - if (STATE.compareAndSet(instance, state, nextState | FLAG_CANCELLED)) { + final long nextState = addWork(state) | FLAG_CANCELLED; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mc", state, nextState); return state; } } @@ -921,8 +953,9 @@ static long markDisposed(UnboundedProcessor instance) { return state; } - final long nextState = addWork(state); - if (STATE.compareAndSet(instance, state, nextState | FLAG_DISPOSED)) { + final long nextState = addWork(state) | FLAG_DISPOSED; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " md", state, nextState); return state; } } @@ -945,12 +978,10 @@ static long addWork(long state) { */ static long markWorkDone( UnboundedProcessor instance, long expectedState, boolean hasRequest, boolean hasValue) { - final long expectedMissed = expectedState & MAX_WIP_VALUE; for (; ; ) { final long state = instance.state; - final long missed = state & MAX_WIP_VALUE; - if (missed != expectedMissed) { + if (state != expectedState) { return state; } @@ -958,11 +989,12 @@ static long markWorkDone( return state; } - final long nextState = state - expectedMissed; - if (STATE.compareAndSet( - instance, - state, - nextState ^ (hasRequest ? 0 : FLAG_HAS_REQUEST) ^ (hasValue ? 0 : FLAG_HAS_VALUE))) { + final long nextState = + (state - (expectedState & MAX_WIP_VALUE)) + ^ (hasRequest ? 0 : FLAG_HAS_REQUEST) + ^ (hasValue ? 0 : FLAG_HAS_VALUE); + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " mwd", state, nextState); return nextState; } } @@ -991,8 +1023,9 @@ static void clearAndFinalize(UnboundedProcessor instance) { instance.clearUnsafely(); } - if (STATE.compareAndSet( - instance, state, (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED)) { + long nextState = (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED; + if (STATE.compareAndSet(instance, state, nextState)) { + log(instance, " c&f", state, nextState); instance.onFinalizedHook.run(); break; } @@ -1034,4 +1067,101 @@ static boolean isSubscriberReady(long state) { static boolean isSubscribedOnce(long state) { return (state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE; } + + static void log( + UnboundedProcessor instance, String action, long initialState, long committedState) { + log(instance, action, initialState, committedState, false); + } + + static void log( + UnboundedProcessor instance, + String action, + long initialState, + long committedState, + boolean logStackTrace) { + Logger logger = instance.logger; + if (logger == null || !logger.isTraceEnabled()) { + return; + } + + if (logStackTrace) { + logger.trace( + String.format( + "[%s][%s][%s][%s-%s]", + instance, + action, + action, + Thread.currentThread().getId(), + formatState(initialState, 64), + formatState(committedState, 64)), + new RuntimeException()); + } else { + logger.trace( + String.format( + "[%s][%s][%s][%s-%s]", + instance, + action, + Thread.currentThread().getId(), + formatState(initialState, 64), + formatState(committedState, 64))); + } + } + + static void log( + UnboundedProcessor instance, String action, int initialState, int committedState) { + log(instance, action, initialState, committedState, false); + } + + static void log( + UnboundedProcessor instance, + String action, + int initialState, + int committedState, + boolean logStackTrace) { + Logger logger = instance.logger; + if (logger == null || !logger.isTraceEnabled()) { + return; + } + + if (logStackTrace) { + logger.trace( + String.format( + "[%s][%s][%s][%s-%s]", + instance, + action, + action, + Thread.currentThread().getId(), + formatState(initialState, 32), + formatState(committedState, 32)), + new RuntimeException()); + } else { + logger.trace( + String.format( + "[%s][%s][%s][%s-%s]", + instance, + action, + Thread.currentThread().getId(), + formatState(initialState, 32), + formatState(committedState, 32))); + } + } + + static String formatState(long state, int size) { + final String defaultFormat = Long.toBinaryString(state); + final StringBuilder formatted = new StringBuilder(); + final int toPrepend = size - defaultFormat.length(); + for (int i = 0; i < size; i++) { + if (i != 0 && i % 4 == 0) { + formatted.append("_"); + } + if (i < toPrepend) { + formatted.append("0"); + } else { + formatted.append(defaultFormat.charAt(i - toPrepend)); + } + } + + formatted.insert(0, "0b"); + return formatted.toString(); + } } From a0eee7bdbbe2dd0730d56639f3225d071d40f189 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 23 Feb 2024 12:38:00 +0200 Subject: [PATCH 3/4] fix gjf Signed-off-by: Oleh Dokuka --- .../io/rsocket/internal/UnboundedProcessorStressTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java b/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java index c55d8d8cf..a2d9fcf4d 100644 --- a/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java +++ b/rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java @@ -4,7 +4,6 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.rsocket.core.StressSubscriber; import io.rsocket.utils.FastLogger; - import java.util.Arrays; import java.util.ConcurrentModificationException; import org.openjdk.jcstress.annotations.Actor; @@ -1726,8 +1725,8 @@ public void arbiter(L_Result r) { static void checkOutcomes(Object instance, String result, Logger logger) { if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class)) - .flatMap(o -> Arrays.stream(o.id())) - .noneMatch(s -> s.equalsIgnoreCase(result))) { + .flatMap(o -> Arrays.stream(o.id())) + .noneMatch(s -> s.equalsIgnoreCase(result))) { throw new RuntimeException(result + " " + logger); } } From bcd94e9149a1c6ad66caf5ba6cd05411cd5bee4d Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 23 Feb 2024 13:01:59 +0200 Subject: [PATCH 4/4] change stress mode to sanity Signed-off-by: Oleh Dokuka --- rsocket-core/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle index d0aeca2b6..da5b69b14 100644 --- a/rsocket-core/build.gradle +++ b/rsocket-core/build.gradle @@ -47,7 +47,7 @@ dependencies { } jcstress { - mode = 'quick' //quick, default, tough + mode = 'sanity' //sanity, quick, default, tough jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.16" }