-
Notifications
You must be signed in to change notification settings - Fork 360
Netty实战二之自己的Netty应用程序
接下来我们将展示如何构建一个基于Netty的客户端和服务器,程序很简单:客户端将消息发送给服务器,而服务器再将消息回送给客户端,这将是一个对你而言很重要的第一个netty的实践经验。
编译和运行,我们需要准备JDK和Apache Maven工具,这里建议大家使用Java的集成开发环境(IDE)。
如果你已经安装了JDK,那么可以略过此步。
否则,请从http://java.com/en/download/manual.jsp 处获取JDK第8版,请下载JDK,而不是Java运行环境(JRE),其仅仅可以运行Java应用程序,但不够编译它们。
有关安装说明:
-
将环境变量JAVA_HOME设置为你的JDK安装位置
-
将%JAVA_HOME%\bin添加到你的执行路径
下面是使用最广泛的Java IDE,都可以免费获取
-
Eclipse——www.eclipse.org
-
NetBeans——https://netbeans.org
-
Intellij IDEA Community Edition——www.jetbrains.com
有关MAVEN的安装也与Java JDK安装类似
图2-1展示了我们将要编写的Echo客户端和服务器应用程序,即使可能我们要编写基于Web的用于被浏览器访问的应用程序,但是通过同时实现客户端和服务器,你一定能更加全面地理解Netty的API。
虽然图中也展示了我们一开始所说的多个客户端,所能够支持的客户端数量,在理论上,仅受限于系统的可用资源(以及所使用的JDK版本可能会施加的限制)。
Echo客户端和服务器之间的交互非常简单,其本身也充分体现了客户端/服务器系统中典型的请求-响应交互模式。
所有的Netty服务器都需要以下两个部分:
-
至少一个ChannelHandler——该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
-
引导——配置服务器的启动代码,将服务器绑定到它要监听连接请求的端口上。
我们的服务器会响应传入的消息,需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法,对于此应用而言只需要用到少量的这些方法,所以继承ChannelInboundHandlerAdapter类就足够了,它提供了ChannelInboundHandler的默认实现。
-
channelRead():对于每个传入的消息都要调用
-
channelReadComplete():通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的最后一条消息
-
exceptionCaught():在读取操作期间,有异常跑出会调用
代码清单2-1,展示Echo服务器的ChannelHandler实现EchoServerHandler。
@ChannelHandler.Sharable //标示一个ChannelHandler可以被多个Channel安全地共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));//将消息记录到控制台
ctx.write(in);//将接受到的消息写给发送者,而不冲刷出站消息·
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将未决消息冲刷到远程节点,并且关闭该Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印异常栈跟踪
ctx.close();//关闭该Channel
}
}
ChannelInboundHandlerAdapter有一个直观的API,并且它的每个方法都可以被重写以挂钩到事件生命周期的恰当点上。因为需要处理所有接收到的数据,所以重写channelRead()方法
重写exceptionCaught()方法允许你对Throwable的任何子类做出反应,我们代码中记录了异常并关闭了连接。
如果不捕获异常,会发生什么呢?
每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链,在默认情况下,ChannelHandler会把对它的方法的调用转发给链中的下一个ChannelHandler,因此,如果exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被传递到ChannelPipeline的尾端并被记录,为此,你的应用程序应该提供至少有一个实现了exceptionCaught()方法的ChannelHandler。
除了ChannelInboundHandlerAdapter之外,还有很多需要学习的ChannelHandler的子类型和实现。
-
针对不同类型的事件调用ChannelHandler
-
应用程序通过实现或者扩展ChannelHandler来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑
-
在架构上,ChannelHandler有助于保持业务逻辑与网络处理代码的分离,这简化了开发过程,因为代码必须不断地演化以响应不断变化的需求
在讨论过EchoServerHandler实现的核心业务逻辑之后,我们现在可以讨论引导服务器本身的过程:
-
绑定到服务器将在其上监听并接受请求的端口
-
配置Channel,以将有关的入站消息通知给EchoServerHandler实例
传输:
在网络协议的标准多层视图中,传输层提供了端到端的或者主机到主机的通信服务。
因特网通信是建立在TCP传输之上的,除了一些由Java NIO实现提供的服务器端性能增强之外,NIO传输大多数时候指的就是TCP传输。
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception{
if (args.length != 1){
System.out.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
return;
}
int port = Integer.parseInt(args[0]);//设置端口值(如果端口参数的格式不正确,则抛出一个NumberFormatException)
new EchoServer(port).start();//调用服务器的start()方法
}
public void start() throws Exception{
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();//创建EventLoopGroup
try {
ServerBootstrap b = new ServerBootstrap();//创建ServerBootstrap
b.group(group)
.channel(NioServerSocketChannel.class)//指定所使用的NIO传输Channel
.localAddress(new InetSocketAddress(port))//使用指定的端口设置套接字地址
.childHandler(new ChannelInitializer<SocketChannel>() {//添加一个EchoServerHandler到子Channel的ChannelPipeline
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
socketChannel.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();//异步地绑定服务器,调用sync()方法阻塞等待直到绑定完成
f.channel().closeFuture().sync();//获取Channel的closeFuture,并且阻塞当前线程直到它完成
}finally {
group.shutdownGracefully().sync();//关闭EventLoopGroup释放所有的资源
}
}
}
我们创建了一个ServerBootstrap实例,因为正在使用NIO传输,指定NioEventLoopGroup来接收和处理新的连接,并且将Channel的类型指定为NioServerSocketChannel。在此之后,将本地地址设置为一个具有选定端口的InetSocketAddress,服务器将绑定到这个地址以监听新的连接请求。
使用一个特殊的类——ChannelInitializer。当一个新的连接被接受时,一个新的子Channel将会被创建,而ChannelInitializer将会把一个你的EchoServerHandler的实例添加到该Channel的ChannelPipeline中,即这个ChannelHandler将会收到有关入站消息的通知。
虽然NIO是可伸缩的,但是其关于多线程处理的配置并不简单。Netty的设计封装了大部分的复杂性。
绑定服务器,并等待绑定完成。(对sync()方法的调用将导致当前Thread阻塞,一直到绑定操作完成为止)该应用程序将会阻塞等待直到服务器的Channel关闭(因为我的Channel的CloseFuture上调用sync()方法),之后我们可以关闭EventLoopGroup,并释放所有的资源,包括所有被创建的线程。
使用了NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传输,可以使用一个不同的传输实现,当然如果你想要在自己的服务器中使用OIO传输,将需要指定OioServerSocketChanne和OioEventLoopGroup。
让我们回顾一下服务器中的重要步骤:
-
EchoServerHandler实现了业务逻辑
-
main()方法引导了服务器
引导过程中所需的步骤:
-
创建一个ServerBootstrap的实例以引导和绑定服务器
-
创建并分配一个NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读/写数据
-
指定服务器绑定的本地的InetSocketAddress
-
使用一个EchoServerHandler的实例初始化每一个新的Channel
-
调用ServerBootstrap.bing()方法以绑定服务器
1、连接到服务器 2、发送一个或者多个消息 3、对于每个消息,等待并接收从服务器发回的相同的消息 4、关闭连接
编写客户端所涉及的两个主要代码部分也是业务逻辑和引导
客户端将拥有一个用来处理数据的ChannelInboundHandler,在这个场景下,将扩展SimpleChannelInboundHandler类以处理所有必须的任务。如代码清单2-3,要求重写下面的方法:
-
channelActive():在到服务器的连接已经建立之后将被调用
-
channelRead():当从服务器接收到一条消息时被调用
-
exceptionCaught():在处理过程中引发异常时被调用
@ChannelHandler.Sharable //标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当被通知Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//记录已接收消息的转储
System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 在发生异常时,记录错误并关闭Channel
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
重写了channelActive()方法,其将在一个连接建立时被调用,这确保了数据将会被尽可能快地写入服务器,其在这个场景下是一个编码了字符串“Netty rocks!”的字符串缓存区。
重写了channelRead0()方法,每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。如果服务器发送了5字节,那么不能保证这5字节会被一次性接收。
即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次。作为一个面向流的协议,TCP保证了字节数组将会按照服务器发送它们的顺序被接收
重写了exceptionCaught()。如同在EchoServerHandler(见代码清单2-2)中所示,记录Throwable,关闭Channel,在这个场景下,终止到服务器的连接。
SimpleChannelInboundHandler与ChannelLnboundHandler
为什么我们在客户端使用的是SimpleChannelInboundHandler,而不是在EchoServerHandler中所使用的ChannelInboundHandlerAdapter呢?这两个因素的相互作用有关:业务逻辑如何处理消息以及Netty如何管理资源
在客户端,当channelRead()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用。
在EchoServerHandler中,你仍然需要将传入消息回送给发送者,而write()操作时异步的,直到channelRead()方法返回后可能仍然没有完成,为此,EchoServerHandler扩展了ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。
消息在EchoServerHandler的channelReadComplete()方法中,当writeAndFlush()方法被调用时被释放。
引导客户端类似于引导服务器,不同的是,客户端是使用主机和端口参数来连接远程地址,也就是这里的Echo服务器的地址,而不是绑定到一个一直被监听的端口。
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();//创建Bootstrap
b.group(group)//指定EventLoopGroup以处理客户端事件,需要适用于NIO的实现
.channel(NioSocketChannel.class)//适用于NIO传输的Channel类型
.remoteAddress(new InetSocketAddress(host,port))//设置服务器的InetSocketAddress
.handler(new ChannelInitializer<SocketChannel>() {//在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();//连接到远程节点,阻塞等待直到连接完成
f.channel().closeFuture().sync();//阻塞,直到Channel关闭
}finally {
group.shutdownGracefully().sync();//关闭线程池并且释放所有的资源
}
}
public static void main(String[] args) throws Exception{
if (args.length != 2){
System.out.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host,port).start();
}
}
注意,你可以在客户端和服务器上分别使用不同的传输。在服务器端使用NIO传输,而在客户端使用OIO传输。
-
为初始化客户端,创建了一个Bootstrap实例
-
为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据
-
为服务器连接创建了一个InetSocketAddress实例
-
当连接被建立时,一个EchoClientHandler实例会被安装到(该Channel的)ChannelPipeline中
-
在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点
在本节中虽然只是一个简单的应用程序, 但是它可以伸缩到支持数千并发连接——每秒可以比普通的基于套接字的Java应用程序处理多得多的消息。
深入地了解Netty对于关注点分离的架构原则的支持,通过提供正确的抽象来解耦业务逻辑和网络编程逻辑。
公众号:UncleCatMySelf