Netty Codecs, TCP Sticky/Split Packets, and Zero-Copy Explained

Netty Codecs

The Netty components involved in encoding and decoding are Channel, ChannelHandler, and ChannelPipeline. Let's first get a rough idea of what each one does.
ChannelHandler

A ChannelHandler acts as the container for the application logic that processes inbound and outbound data. For example, by implementing the ChannelInboundHandler interface (or extending ChannelInboundHandlerAdapter) you can receive inbound events and data, which are then processed by your application's business logic. When you want to send a response back to the connected client, you can also flush data from a ChannelInboundHandler. Your business logic usually lives in one or more ChannelInboundHandlers. ChannelOutboundHandler works the same way, except that it processes outbound data.
There are two kinds: ChannelInboundHandler and ChannelOutboundHandler.
ChannelPipeline

A ChannelPipeline is the container for a chain of ChannelHandlers. Taking a client application as an example: if events flow from the client to the server, we call them outbound. Data the client sends to the server passes through the pipeline's chain of ChannelOutboundHandlers (invoked one by one from tail to head) and is processed by each of them. Events flowing the other way are inbound; inbound data only triggers the pipeline's ChannelInboundHandlers (invoked one by one from head to tail).
In short, for server and client alike: sending a message is outbound (the pipeline is traversed tail -> head), and receiving a message is inbound (head -> tail).
Encoders and decoders

Whenever you send or receive a message through Netty, a data conversion takes place. Inbound messages are decoded, i.e. converted from bytes into another format (such as a Java object); outbound messages are encoded into bytes. Netty provides a range of practical codecs, all of which implement ChannelInboundHandler or ChannelOutboundHandler. In these classes, channelRead has already been overridden. Taking the inbound side as an example: channelRead is called for every message read from an inbound Channel; it then calls the decode() method provided by the concrete decoder and forwards the decoded bytes to the next ChannelInboundHandler in the ChannelPipeline. Netty ships many codecs, for example StringEncoder/StringDecoder for strings and ObjectEncoder/ObjectDecoder for objects.
Of course, you can also build custom codecs by extending ByteToMessageDecoder and MessageToByteEncoder.
Netty and TCP Sticky/Split Packets

What it is

TCP "sticky" and "split" packets mean that several packets sent by the sender arrive glued together as one, or that a single packet arrives split into pieces. For example, if the client sends two packets D1 and D2, the server may receive the data in any of the forms listed below.
Why it happens

TCP is connection-oriented, stream-oriented, and provides a highly reliable service. Sender and receiver (client and server) each hold one socket of the pair. To deliver multiple packets to the receiver more efficiently, the sender applies an optimization (Nagle's algorithm) that merges several small, closely spaced writes into one larger block before packetizing it. This improves efficiency, but it makes it hard for the receiver to distinguish complete messages, because stream-oriented communication has no message boundaries.
A simple way to think about it: TCP has a transmit buffer, and data is sent in buffer-sized units.
- If D1 and D2 are sent within a short interval and both are small, they are merged into one segment (sticky packet).
- If D2 is too large, it is split and transported in several segments (split packet).
- If D1 and D2 each happen to fill the TCP buffer, or the TCP send timer has expired, they are sent as two independent packets.
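The common thread in all three cases is that TCP delivers a byte stream with no message boundaries. A minimal pure-Java sketch (no Netty; the class and variable names are illustrative) shows how two logical messages dissolve into one contiguous byte run:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class StickyPacketDemo {
    public static void main(String[] args) throws Exception {
        // Sender writes two logical messages D1 and D2 into the TCP send buffer
        ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
        sendBuffer.write("D1-hello".getBytes(StandardCharsets.UTF_8));
        sendBuffer.write("D2-world".getBytes(StandardCharsets.UTF_8));

        // The receiver sees one contiguous byte run; the D1/D2 boundary is gone
        String received = new String(sendBuffer.toByteArray(), StandardCharsets.UTF_8);
        System.out.println(received); // D1-helloD2-world
    }
}
```

Without extra information in the data itself (a delimiter or a length prefix), the receiver has no way to recover where D1 ends and D2 begins, which is exactly what the solutions below add.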
How to solve it

Delimited data: give every message a fixed format (a start marker and/or an end marker). This is simple to implement, but when choosing the markers you must make sure they can never appear inside the message body.
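The delimiter approach can be sketched in pure Java as follows (illustrative only; in a real Netty pipeline you would use the built-in DelimiterBasedFrameDecoder or LineBasedFrameDecoder instead of hand-rolling this):

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterFraming {
    // Splits the accumulated byte stream on '\n'; `carry` holds any trailing
    // incomplete message until the next chunk arrives
    static List<String> decode(StringBuilder carry, String chunk) {
        carry.append(chunk);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = carry.indexOf("\n")) >= 0) {
            frames.add(carry.substring(0, idx));
            carry.delete(0, idx + 1);
        }
        return frames;
    }

    public static void main(String[] args) {
        StringBuilder carry = new StringBuilder();
        // msg2 arrives split across two TCP reads; the decoder reassembles it
        System.out.println(decode(carry, "msg1\nms"));   // [msg1]
        System.out.println(decode(carry, "g2\nmsg3\n")); // [msg2, msg3]
    }
}
```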
Length-prefixed data: send each message's length along with the message, for example in the first 4 bytes of every message. The application layer can then use the length to determine where each message starts and ends.
Code example: solving sticky/split packets with a length prefix.
Key code:
```java
package netty.splitpackage;

// Custom protocol unit: a 4-byte length followed by the content bytes
public class MyMessageProtocol {
    private int len;
    private byte[] content;

    public void setLen(int len) {
        this.len = len;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    public int getLen() {
        return len;
    }

    public byte[] getContent() {
        return content;
    }
}
```
```java
public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode called");
        // Write the 4-byte length header first, then the content
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
```
```java
public class MyMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode called");
        System.out.println(in);
        // Wait until at least the 4-byte length header is readable
        while (in.readableBytes() >= 4) {
            in.markReaderIndex();
            int length = in.readInt();
            if (in.readableBytes() < length) {
                System.out.println("Not enough readable bytes yet, waiting for more...");
                // Rewind so the length header is re-read on the next call,
                // once more data has arrived
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[length];
            in.readBytes(data);

            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(data);
            out.add(messageProtocol);
        }
    }
}
```
```java
public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Fire 200 small messages in quick succession to provoke merging/splitting
        for (int i = 0; i < 200; i++) {
            String msg = "Hello, this is Zhang San!";
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
            messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(messageProtocol);
        }
    }
}
```
```java
public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        System.out.println("==== message received by server ====");
        System.out.println("length=" + msg.getLen());
        System.out.println("content=" + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("messages received so far=" + (++this.count));
    }
}
```
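Length-prefix framing is common enough that Netty automates it with the built-in LengthFieldBasedFrameDecoder; for the protocol above, `new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)` placed before a simple handler would replace the manual decoder. As a dependency-free sketch of the same idea, the protocol can be round-tripped with a plain ByteBuffer (class and method names here are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixRoundTrip {
    // Decode as many complete [4-byte length][body] frames as the buffer holds
    static List<String> decode(ByteBuffer wire) {
        List<String> decoded = new ArrayList<>();
        while (wire.remaining() >= 4) {
            wire.mark();
            int len = wire.getInt();
            if (wire.remaining() < len) { // incomplete body: rewind and wait
                wire.reset();
                break;
            }
            byte[] body = new byte[len];
            wire.get(body);
            decoded.add(new String(body, StandardCharsets.UTF_8));
        }
        return decoded;
    }

    public static void main(String[] args) {
        // Encode two messages back-to-back, simulating TCP merging them
        ByteBuffer wire = ByteBuffer.allocate(1024);
        for (String msg : new String[]{"hello", "world!"}) {
            byte[] body = msg.getBytes(StandardCharsets.UTF_8);
            wire.putInt(body.length);
            wire.put(body);
        }
        wire.flip();
        System.out.println(decode(wire)); // [hello, world!]
    }
}
```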
Netty Heartbeat Mechanism

A heartbeat is a special packet that client and server exchange periodically over a long-lived TCP connection to tell each other they are still online, ensuring the TCP connection remains valid.
In Netty, the key to implementing heartbeats is IdleStateHandler. Look at its constructor:
```java
public IdleStateHandler(
        long readerIdleTime, long writerIdleTime, long allIdleTime,
        TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
```
- readerIdleTimeSeconds: read timeout. If no data is read from the Channel within the given interval, a READER_IDLE IdleStateEvent is fired.
- writerIdleTimeSeconds: write timeout. If no data is written to the Channel within the given interval, a WRITER_IDLE IdleStateEvent is fired.
- allIdleTimeSeconds: read/write timeout. If neither a read nor a write happens within the given interval, an ALL_IDLE IdleStateEvent is fired.
Note: these three parameters default to seconds. If you need a different time unit, use the constructor shown above, which takes an explicit TimeUnit.
IdleStateHandler source (excerpt):

```java
public class IdleStateHandler extends ChannelDuplexHandler {
    // ...
    private final long readerIdleTimeNanos;
    private final long writerIdleTimeNanos;
    private final long allIdleTimeNanos;

    private ScheduledFuture<?> readerIdleTimeout;
    private long lastReadTime;
    private boolean firstReaderIdleEvent = true;

    private ScheduledFuture<?> writerIdleTimeout;
    private long lastWriteTime;
    private boolean firstWriterIdleEvent = true;

    private ScheduledFuture<?> allIdleTimeout;
    private boolean firstAllIdleEvent = true;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        initialize(ctx);
        super.channelActive(ctx);
    }

    private void initialize(ChannelHandlerContext ctx) {
        switch (state) {
        case 1:
        case 2:
            return;
        }
        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        // Schedule one timeout task per configured idle dimension
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader has been idle: reschedule and fire a READER_IDLE event
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Data was read in the meantime: reschedule for the remaining delay
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }
}
```
Example code:

```java
public class HeartBeatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new HeartBeatClientHandler());
                        }
                    });

            System.out.println("netty client start...");
            Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while (channel.isActive()) {
                // Send a heartbeat after a random pause of 0-9 seconds
                int num = random.nextInt(10);
                Thread.sleep(num * 1000);
                channel.writeAndFlush(text);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received: " + msg);
            if ("idle close".equals(msg)) {
                System.out.println(" server closed the connection, client closing too");
                // Note: the original sample called closeFuture(), which only returns
                // a future and closes nothing; close() actually closes the channel
                ctx.channel().close();
            }
        }
    }
}
```
```java
public class HeartBeatServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // Fire READER_IDLE if nothing is read for 3 seconds
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start...");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
```
```java
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println(" handling other message ... ");
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Guard the cast: other user events can pass through here as well
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleStateEvent event = (IdleStateEvent) evt;
        boolean first = event.isFirst();
        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "read idle";
                readIdleTimes++;
                break;
            case WRITER_IDLE:
                eventType = "write idle";
                break;
            case ALL_IDLE:
                eventType = "read/write idle";
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType
                + ", consecutive: " + !first);
        if (readIdleTimes > 3) {
            System.out.println(" [server] read idle more than 3 times, closing connection to free resources");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }
}
```
Results:
Client loop (sleeping 20 seconds each round, so the server's 3-second read timeout keeps firing):

```java
while (channel.isActive()) {
    int num = random.nextInt(10);
    Thread.sleep(20 * 1000);
    channel.writeAndFlush(text);
}
```

Client output:

```
netty client start...
 server closed the connection, client closing too
```

Server output:

```
=== /127.0.0.1:62678 is active ===
/127.0.0.1:62678 timeout event: read idle, consecutive: false
/127.0.0.1:62678 timeout event: read idle, consecutive: true
/127.0.0.1:62678 timeout event: read idle, consecutive: true
/127.0.0.1:62678 timeout event: read idle, consecutive: true
 [server] read idle more than 3 times, closing connection to free resources
```
Client loop (sleeping a random 0-9 seconds, so some heartbeats arrive in time and some trigger the timeout):

```java
while (channel.isActive()) {
    int num = random.nextInt(10);
    Thread.sleep(num * 1000);
    channel.writeAndFlush(text);
}
```

Client output:

```
netty client start...
 client received: ok
 client received: ok
 client received: ok
 client received: ok
 client received: ok
 client received: idle close
 server closed the connection, client closing too
```

Server output:

```
=== /127.0.0.1:62921 is active ===
/127.0.0.1:62921 timeout event: read idle, consecutive: false
 ====== > [server] message received : Heartbeat Packet
/127.0.0.1:62921 timeout event: read idle, consecutive: false
/127.0.0.1:62921 timeout event: read idle, consecutive: true
 ====== > [server] message received : Heartbeat Packet
 ====== > [server] message received : Heartbeat Packet
 ====== > [server] message received : Heartbeat Packet
 ====== > [server] message received : Heartbeat Packet
/127.0.0.1:62921 timeout event: read idle, consecutive: false
 [server] read idle more than 3 times, closing connection to free resources
```
Netty Zero-Copy

What it is

Netty's receive and send ByteBuffers use DIRECT BUFFERS, i.e. off-heap direct memory for socket reads and writes, so no second copy of the byte buffer is needed. (In one sentence: when Netty uses direct memory for socket I/O, zero copies occur between physical memory and the JVM heap.) If socket I/O used conventional JVM heap buffers (HEAP BUFFERS) instead, the JVM would first copy the heap buffer into direct memory and only then write it to the socket; data in the JVM heap cannot be written to a socket directly. Compared with off-heap direct memory, sending a message then costs one extra buffer copy. You can see this in Netty's read/write source, for example the read path in NioByteUnsafe.read():
NettyClientHandler.channelActive ->
ctx.writeAndFlush(buf) ->
NioByteUnsafe.read() ->
AbstractByteBufAllocator.ioBuffer(int) ->
AbstractByteBufAllocator.directBuffer(initialCapacity)
```java
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }
}
```

```java
protected class NioByteUnsafe extends AbstractNioUnsafe {
    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // Allocates the (direct) buffer that receives the incoming bytes
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}
```
Direct memory
Direct memory is not part of the JVM runtime data areas, nor is it a memory region defined by the Java Virtual Machine Specification, yet in some situations it is used heavily and can itself cause OutOfMemoryError. In Java you can allocate a block of direct (off-heap) memory with DirectByteBuffer; the memory backing the metaspace is also called direct memory. Both map to the machine's physical memory.
Comparing direct memory with heap memory:

```java
public class DirectMemoryTest {

    public static void heapAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate an on-heap buffer
        ByteBuffer buffer = ByteBuffer.allocate(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap access: " + (endTime - startTime));
    }

    public static void directAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate a direct (off-heap) buffer
        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct access: " + (endTime - startTime));
    }

    public static void heapAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocate(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap allocate: " + (endTime - startTime));
    }

    public static void directAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocateDirect(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct allocate: " + (endTime - startTime));
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            heapAccess();
            directAccess();
        }
        System.out.println();
        for (int i = 0; i < 5; i++) {
            heapAllocate();
            directAllocate();
        }
    }
}
```
The program's results show that direct memory is slower to allocate but faster to access. In the JVM implementation, native I/O operates on direct memory directly (direct memory => system call => disk/NIC), whereas non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC).
Pros and cons of direct memory

Advantages:
1. It does not consume heap space, reducing the likelihood of GC.
2. It runs faster for I/O: in the JVM implementation, native I/O operates on direct memory, whereas I/O on heap memory needs an extra step of copying from heap to direct memory, or writing back from direct memory to the heap.

Disadvantages:
1. Initial allocation is slow.
2. The JVM does not manage this memory directly, so memory overflow is easier to hit.
If a full GC never runs, direct memory can eventually exhaust physical memory. To prevent this, cap it with -XX:MaxDirectMemorySize (just as the JVM heap size is set with -Xmx, the total capacity available to direct ByteBuffers is set with -XX:MaxDirectMemorySize). When the threshold is reached, the JVM calls System.gc() to trigger a full GC and indirectly reclaim direct memory that is no longer in use.
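A minimal sketch of the cap in action (the 64m value and class name are illustrative): run with -XX:MaxDirectMemorySize=64m and the oversized allocation below fails with "OutOfMemoryError: Direct buffer memory"; without the flag it will usually succeed, since the default cap roughly tracks the maximum heap size.

```java
import java.nio.ByteBuffer;

public class DirectMemoryCap {
    public static void main(String[] args) {
        // Intended to run as: java -XX:MaxDirectMemorySize=64m DirectMemoryCap
        ByteBuffer small = ByteBuffer.allocateDirect(16 * 1024 * 1024); // within the cap
        System.out.println("allocated " + small.capacity() + " direct bytes, isDirect=" + small.isDirect());
        try {
            ByteBuffer.allocateDirect(128 * 1024 * 1024); // exceeds a 64m cap
            System.out.println("big allocation succeeded (no cap set, or cap large enough)");
        } catch (OutOfMemoryError e) {
            // With the flag set, the cap is enforced here
            System.out.println("direct memory cap hit: " + e.getMessage());
        }
    }
}
```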