Skip to content

Commit

Permalink
Replace deprecated reactor processors
Browse files Browse the repository at this point in the history
See https://projectreactor.io/docs/core/release/reference/#processors

"Since 3.4.0, sinks become the first class citizen and Processor are being phased out entirely"

Signed-off-by: Jakub Zalas <[email protected]>
  • Loading branch information
jakzal committed Sep 3, 2021
1 parent 4557ecd commit e2c146d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
import io.vlingo.xoom.wire.channel.ResponseSenderChannel;
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.ConsumerByteBufferPool;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;

@SuppressWarnings("deprecation")
class RSocketChannelContext implements RequestResponseContext<FluxSink<ConsumerByteBuffer>> {
private final RequestChannelConsumer consumer;
private final Logger logger;
private final ConsumerByteBufferPool readBufferPool;
private final UnicastProcessor<Payload> processor;
private final Sinks.Many<Payload> sink;
private Object closingData;
private Object consumerData;

Expand All @@ -34,11 +34,11 @@ class RSocketChannelContext implements RequestResponseContext<FluxSink<ConsumerB
this.readBufferPool = new ConsumerByteBufferPool(
ElasticResourcePool.Config.of(maxBufferPoolSize), maxMessageSize);

processor = UnicastProcessor.create();
sink = Sinks.many().unicast().onBackpressureBuffer();
}

UnicastProcessor<Payload> processor() {
return processor;
Flux<Payload> flux() {
return sink.asFlux();
}

@Override
Expand Down Expand Up @@ -97,11 +97,10 @@ public void consume(Payload request) {
@Override
public void abandon() {
close();
processor.dispose();
}

@Override
public void respondWith(final ConsumerByteBuffer buffer) {
processor.onNext(ByteBufPayload.create(buffer.asByteBuffer()));
sink.emitNext(ByteBufPayload.create(buffer.asByteBuffer()), Sinks.EmitFailureHandler.FAIL_FAST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.ConsumerByteBufferPool;
import io.vlingo.xoom.wire.node.Address;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;

@SuppressWarnings("deprecation")
public class RSocketClientChannel implements ClientRequestResponseChannel {
private final EmitterProcessor<Payload> publisher;
private final Sinks.Many<Payload> sink;
private final Logger logger;
private final ChannelResponseHandler responseHandler;
private final Address address;
Expand All @@ -44,7 +43,7 @@ public RSocketClientChannel(final ClientTransport clientTransport, final Address

public RSocketClientChannel(final ClientTransport clientTransport, final Address address, final ResponseChannelConsumer consumer, final int maxBufferPoolSize,
final int maxMessageSize, final Logger logger, final Duration connectionTimeout) {
this.publisher = EmitterProcessor.create();
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.logger = logger;
this.address = address;
this.connectionTimeout = connectionTimeout;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void requestWith(final ByteBuffer buffer) {
data.put(buffer);
data.flip();

this.publisher.onNext(ByteBufPayload.create(data));
this.sink.emitNext(ByteBufPayload.create(data), Sinks.EmitFailureHandler.FAIL_FAST);
} else {
logger.debug("RSocket client channel for {} not ready. Message dropped", this.address);
}
Expand All @@ -92,23 +91,17 @@ private void prepareChannel() {
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(transport)
.timeout(this.connectionTimeout)
.doOnError(throwable -> {
logger.error("Failed to create RSocket client channel for address {}", this.address, throwable);
})
.doOnError(throwable -> logger.error("Failed to create RSocket client channel for address {}", this.address, throwable))
.block();

if (this.channelSocket != null) {
this.channelSocket.requestChannel(this.publisher)
this.channelSocket.requestChannel(this.sink.asFlux())
.retryWhen(Retry.indefinitely()
.filter(throwable -> throwable instanceof ApplicationErrorException)
.doBeforeRetry(retrySignal -> {
logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure());
})
.doBeforeRetry(retrySignal -> logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure()))
)
.subscribe(responseHandler::handle, //process server response
throwable -> {
logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable);
});
throwable -> logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable));

logger.info("RSocket client channel opened for address {}", this.address);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public RSocketServerChannelActor(final RequestChannelConsumerProvider provider,
.doOnError((throwable) -> logger().error("Unexpected error when consuming channel request", throwable))
.subscribe();

return Flux.from(context.processor());
return context.flux();
}))
.bind(serverTransport)
.block();
Expand Down

0 comments on commit e2c146d

Please sign in to comment.