From b9b376070f21e1f0f962f3ca4421ddc67120273f Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 28 Dec 2024 22:29:50 +0800 Subject: [PATCH] fix: Fix ByteBuf leak in remoting transport. --- .../testconductor/RemoteConnection.scala | 9 +++++++-- .../transport/netty/NettyTransport.scala | 3 --- .../remote/transport/netty/TcpSupport.scala | 18 ++++++++++++------ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index 99801878bf9..ddb17a7afe5 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -56,8 +56,13 @@ private[pekko] class ProtobufEncoder extends MessageToMessageEncoder[Message] { private[pekko] class ProtobufDecoder(prototype: Message) extends MessageToMessageDecoder[ByteBuf] { override def decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: java.util.List[AnyRef]): Unit = { - val bytes = ByteBufUtil.getBytes(msg) - out.add(prototype.getParserForType.parseFrom(bytes)) + try { + val bytes: Array[Byte] = new Array[Byte](msg.readableBytes()) + msg.readBytes(bytes) + out.add(prototype.getParserForType.parseFrom(bytes)) + } catch { + case NonFatal(e) => ctx.pipeline().fireExceptionCaught(e) + } } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 0956548fdfe..f52d4540e3b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -52,7 +52,6 @@ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import io.netty.handler.flush.FlushConsolidationHandler import io.netty.handler.ssl.SslHandler import io.netty.util.concurrent.GlobalEventExecutor @@ -369,8 +368,6 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private def newPipeline(channel: Channel): ChannelPipeline = { val pipeline = channel.pipeline() - pipeline.addFirst("FlushConsolidationHandler", - new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true)) pipeline.addLast( "FrameDecoder", new LengthFieldBasedFrameDecoder( diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index 8d8fab344a1..ba308d92457 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -14,10 +14,8 @@ package org.apache.pekko.remote.transport.netty import java.net.InetSocketAddress - import scala.annotation.nowarn import scala.concurrent.{ Future, Promise } - import org.apache.pekko import pekko.actor.Address import pekko.event.LoggingAdapter @@ -25,11 +23,12 @@ import pekko.remote.transport.AssociationHandle import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, HandleEventListener, InboundPayload } import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString - -import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled } +import io.netty.buffer.{ ByteBuf, Unpooled } import io.netty.channel.{ Channel, ChannelHandlerContext } import io.netty.util.AttributeKey +import scala.util.control.NonFatal + private[remote] object TcpHandlers { private val LISTENER = AttributeKey.valueOf[HandleEventListener]("listener") } @@ -56,8 +55,15 @@ private[remote] trait TcpHandlers extends CommonHandlers { } override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { - val bytes: Array[Byte] = ByteBufUtil.getBytes(msg) - if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes))) + try { + val bytes: Array[Byte] = new Array[Byte](msg.readableBytes()) + msg.readBytes(bytes) + if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes))) + } catch { + case NonFatal(e) => + log.warning("Error while handling message from [{}]: {}", ctx.channel().remoteAddress(), e) + onException(ctx, e) + } } override def onException(ctx: ChannelHandlerContext, e: Throwable): Unit = {