前言 本文参考了码友 Nyima 的学习笔记 https:// nyimac.gitee.io/2021/ 04 /25/ Netty%E5%9 F%BA%E7%A1%80 /
一、概述 1. 什么是Netty 1 2 3 4 Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端 Netty is an asynchronous event-driven network application frameworkfor rapid development of maintainable high performance protocol servers & clients.
2. 注意 1 netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
3. 优势 传统 NIO
工作量大
bug 多
需要自己构建协议
解决了 TCP 传输问题,如粘包、半包
因为 bug 的存在,会导致 epoll 空轮询导致 CPU 占用 100%
Netty
API 增强,易于使用,不需要做大量重复事情
增强类,如:
1 2 FastThreadLocal => ThreadLocalByteBuf => ByteBuffer
4. 为什么Netty大量使用了异步? 1 2 简而言之就是,以响应速度换取了吞吐量 详情看 https:// www.bilibili.com/video/ BV1py4y1E7oA?p=70 &spm_id_from=pageDriver
5. 读与写 我最初在认识上有这样的误区,认为只有在netty,nio这样的多路复用I0模型时,读写才不会相互阻塞才可以实现高效的双向通信,
但实际上,Java Socket是全双工的:在任意时刻,线路上存在A到B和B到A的双向信号传输。 即使是阻塞lO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
二、案例 服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class HelloServer { public static void main (String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()); nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } }).bind(8080 ); } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class HelloClient { public static void main (String[] args) throws InterruptedException { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel (Channel channel) throws Exception { channel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost" , 8080 )) .sync() .channel() .writeAndFlush("hello world" ); } }
执行流程
三、重要组件 1. EventLoop 和 EventLoopGroup ①. EventLoopGroup 1 2 3 4 5 6 事件循环组对象 本质上是一组EventLoop对象Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop, 后续这个 Channel 上的 IO 事件都会由此 EventLoop 来处理 目的是为了保证 IO 事件处理时的线程安全
Ⅰ. 继承关系
EventExecutorGroup 1 2 实现了 Iterable 接口提供遍历 EventLoop 的能力 另有 next 方法获取集合中下一个 EventLoop
②. EventLoop 1 2 3 4 事件循环对象 本质上是一个单线程执行器(同时维护了一个 Selector), 里面有 run 方法处理 Channel 上源源不断的 IO 事件
Ⅰ. 继承关系
ScheduledExecutorService
OrderedEventExecutor 1 2 提供了 boolean inEventLoop (Thread thread) 方法判断一个线程是否属于 EventLoop提供了 parent 方法来看看自己属于哪个 EventLoopGroup
③. 普通任务 Ⅰ. 代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class DefaultTaskTest { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(2 ); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); group.next().submit(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("ok" ); }); group.next().submit(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("ok" ); }); log.debug("main" ); } }
④. 定时任务 Ⅰ. 代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class TimingTaskTest { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(2 ); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); group.next().scheduleAtFixedRate(() -> { log.debug("ok" ); }, 0 , 1 , TimeUnit.SECONDS); log.debug("main" ); } }
⑤. IO任务 Ⅰ. 客户端代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class IOTestClientTest { public static void main (String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost" , 8080 )) .sync() .channel(); System.out.println(channel); System.out.println("" ); } }
Ⅱ. 服务端代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public class IOTaskServerTest { public static void main (String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080 ); } }
⑥. 分工细化 Ⅰ. 服务端代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j public class DivisionOfLaborTest { public static void main (String[] args) { DefaultEventLoopGroup group = new DefaultEventLoopGroup(); new ServerBootstrap() .group(new NioEventLoopGroup(), new NioEventLoopGroup(2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast("handler1" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); ctx.fireChannelRead(msg); } }).addLast(group, "handler2" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080 ); } }
⑦. 切换线程的原理 1 在 ⑥. 分工细化中, 出现了 handler1 线程转换为 handler2 线程,其原理如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run () { next.invokeChannelRead(m); } }); } }
1 2 简而言之就是,如果两个 handler 绑定的是同一个线程,那么就直接调用 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来执行
2. Channel Channel 的常用方法
close() 可以用来关闭Channel
closeFuture() 用来处理 Channel 的关闭
sync 方法作用是同步等待 Channel 关闭
而 addListener 方法是异步等待 Channel 关闭
pipeline() 方法用于添加处理器
write() 方法将数据写入
因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
只有当缓冲满了或者调用了flush()方法后 ,才会将数据通过 Channel 发送出去
writeAndFlush() 方法将数据写入并立即发送(刷出)
①. ChannelFuture Ⅰ. 同步连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class SynConnectionChannelFutureTest { public static void main (String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost" , 8080 )); channelFuture.sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world" ); System.in.read(); } }
如果我们去掉channelFuture.sync()
方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞 的,若不通过sync()
方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel ,也就没法将信息正确的传输给服务器端
所以需要通过channelFuture.sync()
方法,阻塞主线程,同步处理结果 ,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
Ⅱ. 异步连接 下面还有一种方法,用于异步 获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class AsyncConnectionChannelFutureTest { public static void main (String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost" , 8080 )); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world" ); } }); System.in.read(); } }
Ⅲ. 同步关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class SynCloseChannelFutureClientTest { public static void main (String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder()); } }).connect(new InetSocketAddress("localhost" , 8080 )); Channel channel = channelFuture.sync().channel(); new Thread(() -> { Scanner scanner = new Scanner(System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); break ; } channel.writeAndFlush(line); } }, "input" ).start(); ChannelFuture closeFuture = channel.closeFuture(); System.out.println("等待关闭" ); closeFuture.sync(); System.out.println("处理关闭之后的操作" ); group.shutdownGracefully(); } }
Ⅳ. 异步关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class AsyncCloseChannelFutureClientTest { public static void main (String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder()); } }).connect(new InetSocketAddress("localhost" , 8080 )); Channel channel = channelFuture.sync().channel(); new Thread(() -> { Scanner scanner = new Scanner(System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); break ; } channel.writeAndFlush(line); } }, "input" ).start(); ChannelFuture closeFuture = channel.closeFuture(); System.out.println("等待关闭" ); closeFuture.addListener((ChannelFutureListener) channelFuture1 -> { System.out.println("处理关闭之后的操作" ); group.shutdownGracefully(); }); } }
Ⅴ. 优雅地关闭 在上述两种关闭期间 输入 q
后并不会使服务真正地关闭
是因为 NioEventLoopGroup 仍存在线程未被关闭
所以可以使用 shutdownGracefully()
方法进行关闭
3. Future 和 Promise netty 中的 Future 与 jdk 中的 Future 同名 ,但是是两个接口
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称
jdk Future
netty Future
Promise
cancel
取消任务
-
-
isCanceled
任务是否取消
-
-
isDone
任务是否完成,不能区分成功失败
-
-
get
获取任务结果,阻塞等待
-
-
getNow
-
获取任务结果,非阻塞,还未产生结果时返回 null
-
await
-
等待任务结束,如果任务失败,不会抛异常 ,而是通过 isSuccess 判断
-
sync
-
等待任务结束,如果任务失败,抛出异常
-
isSuccess
-
判断任务是否成功
-
cause
-
获取失败信息,非阻塞,如果没有失败,返回null
-
addLinstener
-
添加回调,异步接收结果
-
setSuccess
-
-
设置成功结果
setFailure
-
-
设置失败结果
①. JDK 的 Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class JDKFutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2 ); Future<Integer> future = executorService.submit(() -> { System.out.println("执行计算" ); Thread.sleep(1000 ); return 50 ; }); System.out.println("等待结果" ); System.out.println("结果是:" + future.get()); } }
②. Netty 的 Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j public class NettyFutureTest { public static void main (String[] args) { async(); } public static void sync () throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(() -> { System.out.println("执行计算" ); Thread.sleep(1000 ); return 70 ; }); System.out.println("等待结果" ); System.out.println("结果是:" + future.get()); } public static void async () { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(() -> { log.debug("执行计算" ); Thread.sleep(1000 ); return 70 ; }); future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete (Future<? super Integer> future) throws Exception { log.debug("结果是:" + future.get()); } }); } }
③. Netty 的 Promise 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class NettyPromiseTest { public static void main (String[] args) throws ExecutionException, InterruptedException { EventLoop eventLoop = new NioEventLoopGroup().next(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(() -> { log.debug("开始计算" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } promise.setSuccess(80 ); }).start(); log.debug("等待结果" ); log.debug("结果是:{}" , promise.get()); } }
4. PipeLine 和 Handler ChannelHandler
用来处理 Channel
上的各种事件,分为入站、出站 两种。
所有 ChannelHandler
被连成—串,就是 Pipeline
入站处理器通常是 ChannellnboundHandlerAdapter
的子类,主要用来读取客户端数据,写回结果
出站处理器通常是 ChannelOutboundHandlerAdapter
的子类,主要对写回结果进行加工
打个比喻,每个 Channel
是一个产品的加工车间,管道是车间中的流水线,ChanneHandler
就是流水线上的各道工序,而后面要讲的 ByteBuf
是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
①. Pipeline 通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字 。
这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Slf4j public class PipelineTest { public static void main (String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("h1" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1" ); super .channelRead(ctx, msg); } }); pipeline.addLast("h2" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2" ); super .channelRead(ctx, msg); } }); pipeline.addLast("h3" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("3" ); super .channelRead(ctx, msg); ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server" .getBytes())); } }); pipeline.addLast("h4" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("4" ); super .write(ctx, msg, promise); } }); pipeline.addLast("h5" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("5" ); super .write(ctx, msg, promise); } }); pipeline.addLast("h6" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("6" ); super .write(ctx, msg, promise); } }); } }) .bind(8080 ); } }
Ⅰ. 结构
pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
当有入站 (Inbound)操作时,会从head开始向后 调用handler,直到handler不是处理Inbound操作为止
当有出站 (Outbound)操作时,会从tail开始向前 调用handler,直到handler不是处理Outbound操作为止
Ⅱ. 调用顺序
②. Handler Ⅰ. OutboundHandler socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler
ctx.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
Ⅱ. EmbeddedChannel EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class EmbeddedChannelTest { public static void main (String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1" ); super .channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2" ); super .channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("3" ); super .write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("4" ); super .write(ctx, msg, promise); } }; EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4); channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello" .getBytes(StandardCharsets.UTF_8))); channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello" .getBytes(StandardCharsets.UTF_8))); } }
5. ByteBuf ①. 创建 ButyBuf空间日志打印工具类
1 该方法可以帮助我们更为详细地查看ByteBuf中的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ByteBufLogUtil { public static void log (ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1 ) + 4 ; StringBuilder buf = new StringBuilder(rows * 80 * 2 ) .append("read index:" ).append(buffer.readerIndex()) .append(" write index:" ).append(buffer.writerIndex()) .append(" capacity:" ).append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); } }
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 public class ByteBufAllocatorTest { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); log(buffer); StringBuilder stringBuilder = new StringBuilder(); for (int i = 0 ; i < 32 ; i++) { stringBuilder.append("a" ); } buffer.writeBytes(stringBuilder.toString().getBytes()); log(buffer); } }
②. 直接内存和堆内存 通过该方法创建的ByteBuf,使用的是基于直接内存 的ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 );
可以使用下面的代码来创建池化基于堆 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16 );
也可以使用下面的代码来创建池化基于直接内存 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16 );
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
测试代码
1 2 3 4 5 6 7 8 9 10 11 @Test public void HeapAndDirectTest () { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 ); System.out.println(buffer.getClass()); buffer = ByteBufAllocator.DEFAULT.heapBuffer(16 ); System.out.println(buffer.getClass()); buffer = ByteBufAllocator.DEFAULT.directBuffer(16 ); System.out.println(buffer.getClass()); }
结果
1 2 3 4 5 6 7 8 class io .netty .buffer .PooledUnsafeDirectByteBuf // 使用池化的堆内存 class io .netty .buffer .PooledUnsafeHeapByteBuf // 使用池化的直接内存 class io .netty .buffer .PooledUnsafeDirectByteBuf
③. 池化与非池化 池化的最大意义在于可以重用 ByteBuf,优点有
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
1 -Dio .netty.allocator.type ={unpooled|pooled}
4.1 以后,非 Android 平台默认启用池化实现 ,Android 平台启用非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
④. 组成 ByteBuf 主要有以下几个组成部分
⑤. 写入 常用方法如下
方法签名
含义
备注
writeBoolean(boolean value)
写入 boolean 值
用一字节 01|00 代表 true|false
writeByte(int value)
写入 byte 值
writeShort(int value)
写入 short 值
writeInt(int value)
写入 int 值
Big Endian(大端写入) ,即 0x250,写入后 00 00 02 50
writeIntLE(int value)
写入 int 值
Little Endian(小端写入) ,即 0x250,写入后 50 02 00 00
writeLong(long value)
写入 long 值
writeChar(int value)
写入 char 值
writeFloat(float value)
写入 float 值
writeDouble(double value)
写入 double 值
writeBytes(ByteBuf src)
写入 netty 的 ByteBuf
writeBytes(byte[] src)
写入 byte[]
writeBytes(ByteBuffer src)
写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)
写入字符串
CharSequence为字符串类的父类,第二个参数为对应的字符集
注意
这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
网络传输中,默认习惯是 Big Endian ,使用 writeInt(int value)
使用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ByteBufStudy { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 , 20 ); ByteBufUtil.log(buffer); buffer.writeBytes(new byte []{1 , 2 , 3 , 4 }); ByteBufUtil.log(buffer); buffer.writeInt(5 ); ByteBufUtil.log(buffer); buffer.writeIntLE(6 ); ByteBufUtil.log(buffer); buffer.writeLong(7 ); ByteBufUtil.log(buffer); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 read index:0 write index:0 capacity:16 read index:0 write index:4 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 |.... | +--------+-------------------------------------------------+----------------+ read index:0 write index:8 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 00 00 00 05 |........ | +--------+-------------------------------------------------+----------------+ read index:0 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 00 00 00 05 06 00 00 00 |............ | +--------+-------------------------------------------------+----------------+ read index:0 write index:20 capacity:20 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................| |00000010 | 00 00 00 07 |.... | +--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头 的一系列方法,也可以写入数据,但不会改变写指针位置
⑥. 扩容 当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 buffer.writeLong(7 ); ByteBufUtil.log(buffer); read index:0 write index:12 capacity:16 ... read index:0 write index:20 capacity:20 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................| |00000010 | 00 00 00 07 |.... | +--------+-------------------------------------------------+----------------+
扩容规则
如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
如果写入后数据大小超过 512 字节,则选择下一个 2^n
例如写入后大小为 513 字节,则扩容后 capacity 是 2^10=1024 字节(2^9=512 已经不够了)
扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException
异常
1 2 Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20) : PooledUnsafeDirectByteBuf(ridx : 0, widx : 20, cap : 20/ 20) ...
⑦. 读取 读取主要是通过一系列 read 方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取 ,需要调用buffer.markReaderIndex()
对读指针进行标记,并通过buffer.resetReaderIndex()
将读指针恢复到mark标记的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ByteBufStudy { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 , 20 ); buffer.writeBytes(new byte []{1 , 2 , 3 , 4 }); buffer.writeInt(5 ); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); ByteBufUtil.log(buffer); buffer.markReaderIndex(); System.out.println(buffer.readInt()); ByteBufUtil.log(buffer); buffer.resetReaderIndex(); ByteBufUtil.log(buffer); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 1 2 3 4 read index:4 write index:8 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 00 00 05 |.... | +--------+-------------------------------------------------+----------------+5 read index:8 write index:8 capacity:16 read index:4 write index:8 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 00 00 05 |.... | +--------+-------------------------------------------------+----------------+
还有以 get 开头的一系列方法,这些方法不会改变读指针的位置
⑧. 释放 由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放 ,而不是等 GC 垃圾回收。
UnpooledHeap ByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirect ByteBuf 使用的就是直接内存 了,需要特殊的方法来回收内存
Pooled ByteBuf 和它的子类使用了池化机制 ,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则 因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性 (如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
当ByteBuf被传到了pipeline的head与tail时 ,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
TailConext中释放ByteBuf的源码
1 2 3 4 5 6 7 8 protected void onUnhandledInboundMessage (Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } }
判断传过来的是否为ByteBuf,是的话才需要释放
1 2 3 public static boolean release (Object msg) { return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false ; }
⑨. 切片 Ⅰ. slice ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存 ,切片后的 ByteBuf 维护独立的 read,write 指针
得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ByteBufSliceTest { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16 , 20 ); buffer.writeBytes(new byte []{'a' , 'b' , 'c' , 'd' , 'e' , 'f' , 'g' , 'h' , 'i' , 'j' }); ByteBuf slice1 = buffer.slice(0 , 5 ); ByteBuf slice2 = buffer.slice(5 , 5 ); slice1.retain(); slice2.retain(); log(slice1); log(slice2); System.out.println("===========修改原buffer中的值===========" ); buffer.setByte(0 ,'b' ); System.out.println("===========打印slice1===========" ); log(slice1); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 62 63 64 65 |abcde | +--------+-------------------------------------------------+----------------+ read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 66 67 68 69 6a |fghij | +--------+-------------------------------------------------+----------------+ ===========修改原buffer中的值=========== ===========打印slice1=========== read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 62 62 63 64 65 |bbcde | +--------+-------------------------------------------------+----------------+ Process finished with exit code 0
Ⅱ. duplicate 【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
Ⅲ. copy 会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
⑩. 合成 与切片是相反操作,是将多个 ByteBuf 碎片,合成为一个ByteBuf ,同时在合成过程中也不会发生数据的拷贝
Ⅰ. composite 1 2 3 4 5 6 7 8 9 10 11 12 public class ByteBufCompositeTest { public static void main (String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(); buf1.writeBytes(new byte []{'a' , 'b' , 'c' , 'd' , 'e' }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(); buf1.writeBytes(new byte []{'f' , 'g' , 'h' , 'i' , 'j' }); CompositeByteBuf buffs = ByteBufAllocator.DEFAULT.compositeBuffer(); buffs.addComponents(true , buf1, buf2); log(buffs); } }
运行结果
1 2 3 4 5 6 read index:0 write index:10 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 62 63 64 65 66 67 68 69 6a |abcdefghij | +--------+-------------------------------------------------+----------------+
⑪. 优势
池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
读写指针分离 ,不需要像 ByteBuffer 一样切换读写模式
可以自动扩容
支持链式调用,使用更流畅
很多地方体现零拷贝,例如
slice、duplicate、CompositeByteBuf
6. 总结 可以将各大重要组件分为三大类:
网络通信层 BootStrap 负责客户端启动,并且去连接远程的Netty Server
ServerBootStrap 负责服务端的监听,用来监听指定的一个端口
Channel 负责网络通信的一个载体
事件调度层 EventLoopGroup 本质上是一个线程池,主要去负责接收IO请求,并分配线程去执行处理请求
EventLoop 是EventLoopGroup
线程池中的一个具体线程
服务编排层 ChannelPipeLine 负责处理多个ChannelHandler
,将多个ChannelHandler
构成一个执行链,从而实现流水线的效果
ChannelHandler 针对 IO 数据的一个处理器,数据处理接收后通过指定的 Handler
进行处理
ChannelHandlerContext 用来保存ChannelHandler
上下文信息