Skip to content

Commit

Permalink
chore: Try to fix Netty leak with explicit read.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 28, 2024
1 parent 5334ea4 commit b8716a9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler }

/**
Expand All @@ -36,7 +35,7 @@ private[netty] trait NettyHelpers {

protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = ()
protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: Array[Byte]): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = ()

Expand All @@ -53,9 +52,9 @@ private[netty] trait NettyHelpers {
/**
* INTERNAL API
*/
private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf]
private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[Array[Byte]]
with NettyHelpers {
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
final override def channelRead0(ctx: ChannelHandlerContext, msg: Array[Byte]): Unit = {
onMessage(ctx, msg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatcher
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
import io.netty.handler.codec.bytes.ByteArrayDecoder
import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import io.netty.handler.flush.FlushConsolidationHandler
import io.netty.handler.ssl.SslHandler
Expand Down Expand Up @@ -381,7 +382,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
FrameLengthFieldLength, // Strip the header
true))
pipeline.addLast("FrameEncoder", new LengthFieldPrepender(FrameLengthFieldLength))

pipeline.addLast("bytesDecoder", new ByteArrayDecoder)
pipeline
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, Ha
import pekko.remote.transport.Transport.AssociationEventListener
import pekko.util.ByteString

import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled }
import io.netty.buffer.Unpooled
import io.netty.channel.{ Channel, ChannelHandlerContext }
import io.netty.util.AttributeKey

Expand Down Expand Up @@ -55,8 +55,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
log.debug("Remote connection to [{}] was disconnected.", ctx.channel().remoteAddress())
}

override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
val bytes: Array[Byte] = ByteBufUtil.getBytes(msg)
override def onMessage(ctx: ChannelHandlerContext, bytes: Array[Byte]): Unit = {
if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes)))
}

Expand Down

0 comments on commit b8716a9

Please sign in to comment.