diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala index 3b17a57730..8a02a9c4e3 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala @@ -22,7 +22,6 @@ import org.apache.pekko import pekko.PekkoException import pekko.util.unused -import io.netty.buffer.ByteBuf import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler } /** @@ -36,7 +35,7 @@ private[netty] trait NettyHelpers { protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = () - protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = () + protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: Array[Byte]): Unit = () protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = () @@ -53,9 +52,9 @@ private[netty] trait NettyHelpers { /** * INTERNAL API */ -private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf] +private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[Array[Byte]] with NettyHelpers { - final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { + final override def channelRead0(ctx: ChannelHandlerContext, msg: Array[Byte]): Unit = { onMessage(ctx, msg) } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 0956548fdf..3c57186934 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -51,6 +51,7 @@ import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatcher import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } +import io.netty.handler.codec.bytes.ByteArrayDecoder import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import io.netty.handler.flush.FlushConsolidationHandler import io.netty.handler.ssl.SslHandler @@ -381,7 +382,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA FrameLengthFieldLength, // Strip the header true)) pipeline.addLast("FrameEncoder", new LengthFieldPrepender(FrameLengthFieldLength)) - + pipeline.addLast("bytesDecoder", new ByteArrayDecoder) pipeline } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index 8d8fab344a..1870bf056a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -26,7 +26,7 @@ import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, Ha import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString -import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled } +import io.netty.buffer.Unpooled import io.netty.channel.{ Channel, ChannelHandlerContext } import io.netty.util.AttributeKey @@ -55,8 +55,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { log.debug("Remote connection to [{}] was disconnected.", ctx.channel().remoteAddress()) } - override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { - val bytes: Array[Byte] = ByteBufUtil.getBytes(msg) + override def onMessage(ctx: ChannelHandlerContext, bytes: Array[Byte]): Unit = { if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes))) }