点击量:3737
在阅读了netty-4.0.0-final的源码以及参考了《Netty in Action v5 MEAP》这本书之后,决定写一篇文章记录这几天的心得体会,介绍下netty的相关知识。但并不会面面俱到地阐述每一个细节,只会记录一些个人认为非常重要的部分,比如其线程模型,关键组件,以及使用方法等。
一.Netty是什么?
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
以上这个定义来自百度百科,简单来说Netty就是一个服务器端的网络通信框架,用于处理网络请求,和你自己写一个简单的服务器没什么本质上的区别,只不过Netty做的更强大。因为它是一个基于网络IO的通信框架,不受限于网络协议,所以他的应用范围就很广。比如你如果要开发一个分布式应用,需要RPC(Remote Procedure Call Protocol)进行应用间数据交互。一般情况下你需要设计一套自己的私有协议,基于TCP或者UDP。再比如说游戏行业,玩家长时间在线和服务器进行交互,你可以使用现有的协议比如HTTP来完成数据交互。但是你要知道HTTP可是为浏览器而生的,包含了很多不必要的信息,从节省流量的角度来说肯定是不合理的。当你需要使用私有的网络协议时,Netty这种纯粹基于网络IO的高性能、高可靠性的通信框架就派上用场了,因为你可以进行高度的定制化。当然Netty也对现有的网络协议支持的也很好,比如Http/Https/Websocket等。
二.高效的Reactor线程模型
在介绍Netty的线程模型之前先了解下什么是Reactor线程模型,这个模型最大的特色是基于IO多路复用技术,从而能极大地提高线程处理IO的能力。
Reactor线程模型一般分为以下三种:
Reactor单线程模型
如下图所示,Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成。
很显然这种单线程的reactor模型是不能满足某些高负载、大并发场景的,因为即使是异步非阻塞IO,所有的IO操作都不会导致阻塞,但说到底也就只有一个线程,处理能力毕竟有限。
Reactor多线程模型
在此基础上又演化出了Reactor多线程模型,示意图如下:
主要有这几个特点:
1.有一个专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求
2.网络IO操作-读、写等由一个NIO线程池负责,由这些NIO线程负责消息的读取、解码、编码和发送;
3.一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题。
主从Reactor多线程模型
多线程Reactor模型已经能够满足绝大多数的场景,但是当客户端连接数过多的时候,比如百万级的并发量,那么只用一个线程来接收请求就会显得力不从心了。于是第三种主从Reactor多线程模型应运而生,最大的特点就是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。模型图如下:
看着高大上,说白了其实就是两个线程池嘛。
Netty就是采用了这种经典的Reactor线程模型设计,但是具体采用了哪一种呢?答案是这三种都支持,通过在启动类中创建不同的EventLoopGroup实例并通过适当的参数配置就可以实现,具体怎么操作下文再说。
三.Netty组件设计
EventLoopGroup&EventLoop
上一部分介绍了高大上的Reactor线程模型,接下来我们会解释Netty的组件设计,以及他们处于线程模型的哪一部分。
先来看一张非常概括的图:
这张简易的线程流程图就对应了上一部分所说的主从多线程Reactor模型,分别有两个线程池:EventLoopGroupA和EventLoopGroupB。一个用于接收请求,另一个用于处理IO操作。要实现这种线程模型,代码如下:
1 2 3 4 5 |
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workerGroup); |
如果只使用一个线程池的话,可以这样写:
1 2 |
ServerBootstrap sb = new ServerBootstrap(); sb.group(new NioEventLoopGroup()); |
其实这个方法最终还是调用了sb.group(bossGroup, workerGroup),只不过bossGroup和workerGroup是同一个对象。
那如果使用单线程模型呢?很简单,通过配置参数:io.netty.eventLoopThreads来指定线程的数量,如果不配置则默认是cpu数量的两倍。配置为0或者1则只使用一个线程。
1 2 3 4 5 6 |
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object[] args) { super((nThreads == 0) ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } |
上面这个图实在是太概括了,并不能帮助我们很好的理解Netty各组件之间的关系,我们再来看另一张:
(EventLoopGroup接受和分配请求)
从上图中可以看出,一个EventLoopGroup就相当于一个线程池,而每一个EventLoop就是一个线程,当新的Channel被创建时(有新的请求进来),就会在EventLoopGroup里注册一下,同时会分配一个EventLoop给这个Channel,从此开始直到这个Channel被销毁,这个Channel只能被它绑定的这个EventLoop执行,这也就是为什么Netty可以不用考虑并发的原因。由此可见,会存在多个Channel去竞争同一个EventLoop的情况,那此时执行线程的处理逻辑是怎样的呢?也很简单:如果Channel所绑定的那个线程是空闲的,则执行任务,否则去排队。一个EventLoopGroup里面可以包含多个EventLoop,顶层都是都基于Java的执行器Executor的,他们的继承关系如下:
(EventLoopGroup和EventLoop的继承关系)
ChannelPipeline&ChannelHander
关于接收、分配请求的流程已经说了,接下来就是数据处理了,这是Netty最关键的一部分!Netty采用了一种叫做数据流(data flow)的处理机制,类似于Unix中的管道。即每一个Channel都有一个自己的ChannelPipeline,每一个pipeline里会有多个ChannelHandler。数据会像击鼓传花一样依次通过每一个handler被逐一处理。流程如下图所示:
(ChannelPipeline数据处理)
值得一提的是,这个流处理是双向混合的,分为Inbound和Outbound(他们在Netty3中被称为upstream和downstream),分别对应request和response。这个handler被分成两类:ChannelOutboundHandler和ChannelInboundHandler。当服务器处理进来的请求时,则只会调用实现了ChannelInboundHandler的handler;当服务器返回信息给客户端时,则只会调用实现了ChannelOutboundHandler的handler。
(ChannelHander和它的派生类的关系)
Handler的使用非常方便,你只需要继承一个ChannelInboundHandlerAdapter,重写一些方法,比如channelRead(),channelReadComplete()和exceptionCaught()等,在channelRead方法里面对进来的数据进行处理,也就是我们处理业务逻辑的地方。或者更简单的继承SimpleChannelInboundHandler,只要重写channelRead0()就可以了。
Encoders&Decoders
我们在解析处理请求时通常需要对数据格式进行转换,比如把字节变成对象,或者把对象转换为字节。针对这种常见的场景,Netty提供了编码和解码的接口:MessageToByteEncoder和ByteToMessageEncoder。其实两个抽象类分别继承了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,说白了,使用起来和普通的handler没什么区别。自己写的类只要重写decode()或者encode()方法对数据进行处理即可。
Bootstrap/ServerBootstrap
顾名思义就是启动类,分别负责启动客户端和服务器端。这个类用来配置相关参数,比如设置EventLoopGroup,IO类型,handler等等,Netty的一切都从这里开始。
四.代码示例
Demo1:Echo服务器
上面的几个部分把Netty介绍的差不读,接下来我们写一个echo服务器来进一步理解Netty。所谓echo服务器就是收到客户端发送的消息,并且把收到的内容返回给客户端即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
package com.fan3cn.netty; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class EchoServer { //端口号 private static int port = 7777; public static void main(String[] args) { // TODO Auto-generated method stub EchoServer server = new EchoServer(); server.startup(); } public void startup(){ ServerBootstrap sb = new ServerBootstrap(); //主从reactor模式 sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); //指定IO类型 sb.channel(NioServerSocketChannel.class); //设置handler sb.childHandler(new EchoServerChannelPipeline()); ChannelFuture future = sb.bind(new InetSocketAddress(port)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { // TODO Auto-generated method stub if(f.isSuccess()){ System.out.println("bind success,listening on "+port); }else{ System.out.println("bind failed"); } } }); } /** * * @author Eric * */ public class EchoServerChannelPipeline extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); //添加InboundHandler pipeline.addLast(new MySimpleChannelInboundHandler()); } } /** * handler * @author Eric * */ public class MySimpleChannelInboundHandler extends ChannelInboundHandlerAdapter{ /** * 处理业务逻辑 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf){ ByteBuf data = (ByteBuf) msg; //拷贝一份数据 ByteBuf readCpoy = data.copy(); int idx = readCpoy.readableBytes(); byte[] bytes = readCpoy.readBytes(idx).array(); System.out.println("read data:\n"+new String(bytes)); } //写回去 ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //读完后flush空数据 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 异常 System.out.println("exception happend from indound:"+cause.getMessage()); ctx.close(); } } } |
浏览器输入:Http:127.0.0.1:7777就会显示自己发送的内容:
1 2 3 4 5 6 7 8 9 10 |
GET / HTTP/1.1 Host: 127.0.0.1:7777 Connection: keep-alive Pragma: no-cache Cache-Control: no-cache Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 Accept-Encoding: gzip, deflate, sdch, br Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4 |
这段代码很好理解,但是如果你细心一点就会发现一个问题:ChannelPipeline里并没有添加往回写的handler,那是如何把数据写回去的呢?这是因为在绑定一个channel时,会对这个channel的相关属性进行初始化,此时在构造这个channel的channelpipeline时会默认添加两个handler:HeadHandler(implements ChannelOutboundHandler)和TailHandler(implements ChannelInboundHandler),分别用于写回和读取数据。而之后再添加的handler则通过addLast方法插入到他们之间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public DefaultChannelPipeline(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; TailHandler tailHandler = new TailHandler(); tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); HeadHandler headHandler = new HeadHandler(channel.unsafe()); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head.next = tail; tail.prev = head; } |
(DefaultChannelPipeline的构造函数)
Demo2:稍微复杂的服务器
需求:按照既定协议解析来自客户端的请求,并且把byte数据封装成java对象供后续使用。返回时按照协议格式化数据或者对数据进行压缩等操作。
这个例子比上面这个复杂一点,更接近于真实的使用情况,主要展示codec和handler的混合使用。有兴趣的可以去我的github上看一下,不再贴代码了。
参考:
Netty系列之Netty高性能之道
《Netty in Action》中文版—第七章 EventLoop和线程模型
Netty-in-Action
非常好!
谢谢!
代码有问题啊
有何问题呀?