Skip to content

Commit

Permalink
fix: Fix ByteBuf leak in remoting transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 28, 2024
1 parent 5334ea4 commit b9b3760
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ private[pekko] class ProtobufEncoder extends MessageToMessageEncoder[Message] {
private[pekko] class ProtobufDecoder(prototype: Message) extends MessageToMessageDecoder[ByteBuf] {

override def decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: java.util.List[AnyRef]): Unit = {
val bytes = ByteBufUtil.getBytes(msg)
out.add(prototype.getParserForType.parseFrom(bytes))
try {
val bytes: Array[Byte] = new Array[Byte](msg.readableBytes())
msg.readBytes(bytes)
out.add(prototype.getParserForType.parseFrom(bytes))
} catch {
case NonFatal(e) => ctx.pipeline().fireExceptionCaught(e)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import io.netty.handler.flush.FlushConsolidationHandler
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.GlobalEventExecutor

Expand Down Expand Up @@ -369,8 +368,6 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

private def newPipeline(channel: Channel): ChannelPipeline = {
val pipeline = channel.pipeline()
pipeline.addFirst("FlushConsolidationHandler",
new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true))
pipeline.addLast(
"FrameDecoder",
new LengthFieldBasedFrameDecoder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@
package org.apache.pekko.remote.transport.netty

import java.net.InetSocketAddress

import scala.annotation.nowarn
import scala.concurrent.{ Future, Promise }

import org.apache.pekko
import pekko.actor.Address
import pekko.event.LoggingAdapter
import pekko.remote.transport.AssociationHandle
import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, HandleEventListener, InboundPayload }
import pekko.remote.transport.Transport.AssociationEventListener
import pekko.util.ByteString

import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled }
import io.netty.buffer.{ ByteBuf, Unpooled }
import io.netty.channel.{ Channel, ChannelHandlerContext }
import io.netty.util.AttributeKey

import scala.util.control.NonFatal

private[remote] object TcpHandlers {
private val LISTENER = AttributeKey.valueOf[HandleEventListener]("listener")
}
Expand All @@ -56,8 +55,15 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}

override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
val bytes: Array[Byte] = ByteBufUtil.getBytes(msg)
if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes)))
try {
val bytes: Array[Byte] = new Array[Byte](msg.readableBytes())
msg.readBytes(bytes)
if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes)))
} catch {
case NonFatal(e) =>
log.warning("Error while handling message from [{}]: {}", ctx.channel().remoteAddress(), e)
onException(ctx, e)
}
}

override def onException(ctx: ChannelHandlerContext, e: Throwable): Unit = {
Expand Down

0 comments on commit b9b3760

Please sign in to comment.