Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Reduced object allocations in websocket gateway client (#193)
Browse files Browse the repository at this point in the history
* Reduced object allocations in websocket gateway client
* Removed Optional
  • Loading branch information
artem-v authored Nov 26, 2021
1 parent 54bd46e commit 99f7621
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,32 @@ private WebsocketGatewayClient(

@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
return Mono.defer(
() -> {
long sid = sidCounter.incrementAndGet();
return getOrConnect()
.flatMap(
session ->
session
.send(encodeRequest(request, sid))
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
.then(session.<ServiceMessage>newMonoProcessor(sid).asMono())
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
.doFinally(s -> session.removeProcessor(sid)));
});
return getOrConnect()
.flatMap(
session -> {
long sid = sidCounter.incrementAndGet();
return session
.send(encodeRequest(request, sid))
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
.then(session.<ServiceMessage>newMonoProcessor(sid).asMono())
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
.doFinally(s -> session.removeProcessor(sid));
});
}

@Override
public Flux<ServiceMessage> requestStream(ServiceMessage request) {
return Flux.defer(
() -> {
long sid = sidCounter.incrementAndGet();
return getOrConnect()
.flatMapMany(
session ->
session
.send(encodeRequest(request, sid))
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
.thenMany(session.<ServiceMessage>newUnicastProcessor(sid).asFlux())
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
.doFinally(s -> session.removeProcessor(sid)));
});
return getOrConnect()
.flatMapMany(
session -> {
long sid = sidCounter.incrementAndGet();
return session
.send(encodeRequest(request, sid))
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
.thenMany(session.<ServiceMessage>newUnicastProcessor(sid).asFlux())
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
.doFinally(s -> session.removeProcessor(sid));
});
}

@Override
Expand All @@ -161,7 +157,7 @@ private Mono<Void> doClose() {

private Mono<WebsocketGatewayClientSession> getOrConnect() {
// noinspection unchecked
return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
return websocketMonoUpdater.updateAndGet(this, this::getOrConnect0);
}

private Mono<WebsocketGatewayClientSession> getOrConnect0(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.netty.Connection;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
Expand Down Expand Up @@ -59,22 +60,26 @@ public final class WebsocketGatewayClientSession {
try {
message = codec.decode(byteBuf);
} catch (Exception ex) {
LOGGER.error("Response decoder failed: " + ex);
LOGGER.error("Response decoder failed:", ex);
return;
}

// ignore messages w/o sid
if (!message.headers().containsKey(STREAM_ID)) {
LOGGER.error("Ignore response: {} with null sid, session={}", message, id);
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
if (message.data() != null) {
ReferenceCountUtil.safestRelease(message.data());
}
return;
}

// processor?
long sid = Long.parseLong(message.header(STREAM_ID));
Object processor = inboundProcessors.get(sid);
if (processor == null) {
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
if (message.data() != null) {
ReferenceCountUtil.safestRelease(message.data());
}
return;
}

Expand All @@ -88,24 +93,22 @@ public final class WebsocketGatewayClientSession {

@SuppressWarnings({"rawtypes", "unchecked"})
<T> Sinks.One<T> newMonoProcessor(long sid) {
return (Sinks.One)
inboundProcessors.computeIfAbsent(
sid,
key -> {
LOGGER.debug("Put sid={}, session={}", sid, id);
return Sinks.one();
});
return (Sinks.One) inboundProcessors.computeIfAbsent(sid, this::newMonoProcessor0);
}

@SuppressWarnings({"rawtypes", "unchecked"})
<T> Sinks.Many<T> newUnicastProcessor(long sid) {
return (Sinks.Many)
inboundProcessors.computeIfAbsent(
sid,
key -> {
LOGGER.debug("Put sid={}, session={}", sid, id);
return Sinks.many().unicast().onBackpressureBuffer();
});
return (Sinks.Many) inboundProcessors.computeIfAbsent(sid, this::newUnicastProcessor0);
}

private One<Object> newMonoProcessor0(long sid) {
LOGGER.debug("Put sid={}, session={}", sid, id);
return Sinks.one();
}

private Many<Object> newUnicastProcessor0(long sid) {
LOGGER.debug("Put sid={}, session={}", sid, id);
return Sinks.many().unicast().onBackpressureBuffer();
}

void removeProcessor(long sid) {
Expand All @@ -115,14 +118,7 @@ void removeProcessor(long sid) {
}

Mono<Void> send(ByteBuf byteBuf) {
return Mono.defer(
() -> {
// send with publisher (defer buffer cleanup to netty)
return connection
.outbound()
.sendObject(Mono.just(byteBuf).map(TextWebSocketFrame::new), f -> true)
.then();
});
return connection.outbound().sendObject(new TextWebSocketFrame(byteBuf)).then();
}

void cancel(long sid, String qualifier) {
Expand Down Expand Up @@ -158,25 +154,29 @@ public Mono<Void> onClose() {
}

private void handleResponse(ServiceMessage response, Object processor) {
LOGGER.debug("Handle response: {}, session={}", response, id);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Handle response: {}, session={}", response, id);
}

try {
Optional<Signal> signalOptional =
Optional.ofNullable(response.header(SIGNAL)).map(Signal::from);
Signal signal = null;
final String header = response.header(SIGNAL);

if (header != null) {
signal = Signal.from(header);
}

if (!signalOptional.isPresent()) {
if (signal == null) {
// handle normal response
emitNext(processor, response);
} else {
// handle completion signal
Signal signal = signalOptional.get();
if (signal == Signal.COMPLETE) {
emitComplete(processor);
}
if (signal == Signal.ERROR) {
// decode error data to retrieve real error cause
ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class);
emitNext(processor, errorMessage);
emitNext(processor, codec.decodeData(response, ErrorData.class));
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -185,23 +184,22 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request,
final long sid = getSid(request);
final AtomicBoolean receivedError = new AtomicBoolean(false);

final Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);
Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);
final String limitRate = request.header(RATE_LIMIT_FIELD);
serviceStream =
limitRate != null ? serviceStream.limitRate(Integer.parseInt(limitRate)) : serviceStream;

Disposable disposable =
session
.send(
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
.map(Integer::valueOf)
.map(serviceStream::limitRate)
.orElse(serviceStream)
.map(
response -> {
boolean isErrorResponse = response.isError();
if (isErrorResponse) {
receivedError.set(true);
}
return newResponseMessage(sid, response, isErrorResponse);
}))
serviceStream.map(
response -> {
boolean isErrorResponse = response.isError();
if (isErrorResponse) {
receivedError.set(true);
}
return newResponseMessage(sid, response, isErrorResponse);
}))
.doOnError(
th -> {
ReferenceCountUtil.safestRelease(request.data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,6 +23,8 @@ public final class WebsocketGatewaySession implements GatewaySession {

private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewaySession.class);

private static final Predicate<Object> SEND_PREDICATE = f -> true;

private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024);

private final GatewaySessionHandler gatewayHandler;
Expand Down Expand Up @@ -118,7 +121,7 @@ public Mono<Void> send(Flux<ServiceMessage> messages) {
this, frame.content(), response, (Context) context);
return frame;
}),
f -> true)
SEND_PREDICATE)
.then()
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
});
Expand Down Expand Up @@ -171,7 +174,9 @@ public boolean dispose(Long streamId) {
Disposable disposable = subscriptions.remove(streamId);
result = disposable != null;
if (result) {
LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId);
}
disposable.dispose();
}
}
Expand All @@ -188,24 +193,28 @@ public boolean containsSid(Long streamId) {
*
* @param streamId stream id
* @param disposable service subscription
* @return true if disposable subscription was stored
*/
public boolean register(Long streamId, Disposable disposable) {
public void register(Long streamId, Disposable disposable) {
boolean result = false;
if (!disposable.isDisposed()) {
result = subscriptions.putIfAbsent(streamId, disposable) == null;
}
if (result) {
LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId);
}
}
return result;
}

private void clearSubscriptions() {
if (subscriptions.size() > 1) {
LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId);
}
} else if (subscriptions.size() == 1) {
LOGGER.debug("Clear 1 subscription on session={}", sessionId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Clear 1 subscription on session={}", sessionId);
}
}
subscriptions.forEach((sid, disposable) -> disposable.dispose());
subscriptions.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map.Entry;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,7 +99,9 @@ public ByteBuf encode(ServiceMessage message) throws MessageCodecException {
generator.writeEndObject();
} catch (Throwable ex) {
ReferenceCountUtil.safestRelease(byteBuf);
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
if (message.data() != null) {
ReferenceCountUtil.safestRelease(message.data());
}
LOGGER.error("Failed to encode gateway service message: {}", message, ex);
throw new MessageCodecException("Failed to encode gateway service message", ex);
}
Expand Down

0 comments on commit 99f7621

Please sign in to comment.