From d6b0a91d80d381b6f58695f4c7a62cff5b797607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=88=E9=93=AD?= Date: Mon, 18 Mar 2024 15:16:41 +0800 Subject: [PATCH] support netty io_uring --- bom/pom.xml | 22 ++++++++ .../alipay/sofa/rpc/common/RpcOptions.java | 6 +++ remoting/remoting-http/pom.xml | 16 ++++++ .../http/AbstractHttp2ClientTransport.java | 4 +- .../sofa/rpc/transport/netty/NettyHelper.java | 35 ++++++++++-- .../rpc/transport/netty/NettyHelperTest.java | 53 +++++++++++++++++++ 6 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyHelperTest.java diff --git a/bom/pom.xml b/bom/pom.xml index a97d8cc8f..54c9c207a 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -60,6 +60,9 @@ true true + + + 0.0.21.Final @@ -598,6 +601,25 @@ 0.16.0 test + + + + io.netty.incubator + netty-incubator-transport-classes-io_uring + ${netty-iouring.version} + + + io.netty.incubator + netty-incubator-transport-native-io_uring + ${netty-iouring.version} + linux-x86_64 + + + io.netty.incubator + netty-incubator-transport-native-io_uring + ${netty-iouring.version} + linux-aarch_64 + diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcOptions.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcOptions.java index c083625cc..c9318bc1d 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcOptions.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcOptions.java @@ -396,6 +396,12 @@ public class RpcOptions { * 默认开启epoll? */ public static final String TRANSPORT_USE_EPOLL = "transport.use.epoll"; + + /** + * 是否开始io_uring + */ + public static final String TRANSPORT_USE_IO_URING = "transport.use.ioUring"; + /** * 默认服务端 数据包限制 */ diff --git a/remoting/remoting-http/pom.xml b/remoting/remoting-http/pom.xml index 42a4ceab1..a66646853 100644 --- a/remoting/remoting-http/pom.xml +++ b/remoting/remoting-http/pom.xml @@ -46,6 +46,22 @@ netty-tcnative-boringssl-static ${os.detected.classifier} + + + + io.netty.incubator + netty-incubator-transport-classes-io_uring + + + io.netty.incubator + netty-incubator-transport-native-io_uring + linux-x86_64 + + + io.netty.incubator + netty-incubator-transport-native-io_uring + linux-aarch_64 + diff --git a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java index b0fc1d8f7..20b2130b6 100644 --- a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java +++ b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java @@ -51,8 +51,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; @@ -145,7 +143,7 @@ public void connect() { int port = providerInfo.getPort(); Bootstrap b = new Bootstrap(); b.group(workerGroup); - b.channel(transportConfig.isUseEpoll() ? EpollSocketChannel.class : NioSocketChannel.class); + b.channel(NettyHelper.socketChannel()); b.option(ChannelOption.SO_KEEPALIVE, true); b.remoteAddress(host, port); b.handler(initializer); diff --git a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyHelper.java b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyHelper.java index 14e5c8f50..9c938c3b4 100644 --- a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyHelper.java +++ b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyHelper.java @@ -29,7 +29,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.incubator.channel.uring.IOUring; +import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.incubator.channel.uring.IOUringSocketChannel; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -42,6 +48,7 @@ import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue; import static com.alipay.sofa.rpc.common.RpcOptions.TRANSPORT_CLIENT_IO_THREADS; import static com.alipay.sofa.rpc.common.RpcOptions.TRANSPORT_USE_EPOLL; +import static com.alipay.sofa.rpc.common.RpcOptions.TRANSPORT_USE_IO_URING; /** * @author GengZhang @@ -188,9 +195,7 @@ public synchronized static EventLoopGroup getClientIOEventLoopGroup() { clientIoThreads : // 用户配置 Math.max(4, SystemInfo.getCpuCores() + 1); // 默认cpu+1,至少4个 NamedThreadFactory threadName = new NamedThreadFactory("CLI-IO", true); - boolean useEpoll = getBooleanValue(TRANSPORT_USE_EPOLL); - clientIOEventLoopGroup = useEpoll ? new EpollEventLoopGroup(threads, threadName) - : new NioEventLoopGroup(threads, threadName); + clientIOEventLoopGroup = eventLoopGroup(threads, threadName); refCounter.putIfAbsent(clientIOEventLoopGroup, new AtomicInteger(0)); // SelectStrategyFactory 未设置 } @@ -198,6 +203,30 @@ public synchronized static EventLoopGroup getClientIOEventLoopGroup() { return clientIOEventLoopGroup; } + public synchronized static EventLoopGroup eventLoopGroup(int threads, NamedThreadFactory threadName) { + boolean useEpoll = getBooleanValue(TRANSPORT_USE_EPOLL); + boolean useIoUring = getBooleanValue(TRANSPORT_USE_IO_URING); + if (useEpoll) { + return new EpollEventLoopGroup(threads, threadName); + } else if (useIoUring && SystemInfo.isLinux() && IOUring.isAvailable()) { + return new IOUringEventLoopGroup(threads, threadName); + } else { + return new NioEventLoopGroup(threads, threadName); + } + } + + public synchronized static Class socketChannel() { + boolean useEpoll = getBooleanValue(TRANSPORT_USE_EPOLL); + boolean useIoUring = getBooleanValue(TRANSPORT_USE_IO_URING); + if (useEpoll) { + return EpollSocketChannel.class; + } else if (useIoUring && SystemInfo.isLinux() && IOUring.isAvailable()) { + return IOUringSocketChannel.class; + } else { + return NioSocketChannel.class; + } + } + /** * 关闭客户端IO线程池 */ diff --git a/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyHelperTest.java b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyHelperTest.java new file mode 100644 index 000000000..51efbacd1 --- /dev/null +++ b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyHelperTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.netty; + +import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.incubator.channel.uring.IOUring; +import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.incubator.channel.uring.IOUringSocketChannel; +import org.junit.Assert; +import org.junit.Test; + +import static com.alipay.sofa.rpc.common.RpcOptions.TRANSPORT_USE_IO_URING; + +/** + * @author chengming + * @version NettyHelperTest.java, v 0.1 2024年03月18日 2:35 PM chengming + */ +public class NettyHelperTest { + + @Test + public void testEventLoopGroup() { + System.setProperty("os.name", "linux111"); + System.setProperty(TRANSPORT_USE_IO_URING, "true"); + + EventLoopGroup eventLoopGroup = NettyHelper.eventLoopGroup(1, new NamedThreadFactory("test", true)); + Class socketChannel = NettyHelper.socketChannel(); + if (IOUring.isAvailable()) { + Assert.assertTrue(eventLoopGroup instanceof IOUringEventLoopGroup); + Assert.assertEquals(IOUringSocketChannel.class, socketChannel); + } else { + Assert.assertTrue(eventLoopGroup instanceof NioEventLoopGroup); + Assert.assertEquals(NioSocketChannel.class, socketChannel); + } + } +}