Netty、Kafka中的零拷贝技术到底有多牛?
零拷贝,从字面意思理解就是数据不需要来回的拷贝,大大提升了系统的性能。

图片来自 Pexels
我们也经常在 Java NIO,Netty,Kafka,RocketMQ 等框架中听到零拷贝,它经常作为其提升性能的一大亮点;下面从 I/O 的几个概念开始,进而再分析零拷贝。
I/O 概念
缓冲区
缓冲区是所有 I/O 的基础,I/O 讲的无非就是把数据移进或移出缓冲区;进程执行 I/O 操作,就是向操作系统发出请求,让它要么把缓冲区的数据排干(写),要么填充缓冲区(读)。

进程发起 Read 请求之后,内核接收到 Read 请求之后,会先检查内核空间中是否已经存在进程所需要的数据,如果已经存在,则直接把数据 Copy 给进程的缓冲区。
如果没有内核随即向磁盘控制器发出命令,要求从磁盘读取数据,磁盘控制器把数据直接写入内核 Read 缓冲区,这一步通过 DMA 完成。
接下来就是内核将数据 Copy 到进程的缓冲区;如果进程发起 Write 请求,同样需要把用户缓冲区里面的数据 Copy 到内核的 Socket 缓冲区里面,然后再通过 DMA 把数据 Copy 到网卡中,发送出去。
你可能觉得这样挺浪费空间的,每次都需要把内核空间的数据拷贝到用户空间中,所以零拷贝的出现就是为了解决这种问题的。
关于零拷贝提供了两种方式分别是:
mmap+write
Sendfile
虚拟内存
所有现代操作系统都使用虚拟内存,使用虚拟的地址取代物理地址,这样做的好处是:
一个以上的虚拟地址可以指向同一个物理内存地址。
虚拟内存空间可大于实际可用的物理地址。
利用第一条特性可以把内核空间地址和用户空间的虚拟地址映射到同一个物理地址,这样 DMA 就可以填充对内核和用户空间进程同时可见的缓冲区了。

mmap+write 方式

Sendfile?方式

Java 零拷贝
MappedByteBuffer
下面看一个简单的读取实例,然后再对 MappedByteBuffer 进行分析:
public?class?MappedByteBufferTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????File?file?=?new?File("D://db.txt");
????????long?len?=?file.length();
????????byte[]?ds?=?new?byte[(int)?len];
????????MappedByteBuffer?mappedByteBuffer?=?new?FileInputStream(file).getChannel().map(FileChannel.MapMode.READ_ONLY,?0,
????????????????len);
????????for?(int?offset?=?0;?offset?????????????byte?b?=?mappedByteBuffer.get();
????????????ds[offset]?=?b;
????????}
????????Scanner?scan?=?new?Scanner(new?ByteArrayInputStream(ds)).useDelimiter("?");
????????while?(scan.hasNext())?{
????????????System.out.print(scan.next()?+?"?");
????????}
????}
}
????public?abstract?MappedByteBuffer?map(MapMode?mode,
?????????????????????????????????????????long?position,?long?size)
????????throws?IOException;
MapMode:映射的模式,可选项包括:READ_ONLY,READ_WRITE,PRIVATE。
Position:从哪个位置开始映射,字节数的位置。
Size:从 Position 开始向后多少个字节。
大致浏览一下 map() 方法的源码:
????public?MappedByteBuffer?map(MapMode?mode,?long?position,?long?size)
????????throws?IOException
????{
????????????...省略...
????????????int?pagePosition?=?(int)(position?%?allocationGranularity);
????????????long?mapPosition?=?position?-?pagePosition;
????????????long?mapSize?=?size?+?pagePosition;
????????????try?{
????????????????//?If?no?exception?was?thrown?from?map0,?the?address?is?valid
????????????????addr?=?map0(imode,?mapPosition,?mapSize);
????????????}?catch?(OutOfMemoryError?x)?{
????????????????//?An?OutOfMemoryError?may?indicate?that?we've?exhausted?memory
????????????????//?so?force?gc?and?re-attempt?map
????????????????System.gc();
????????????????try?{
????????????????????Thread.sleep(100);
????????????????}?catch?(InterruptedException?y)?{
????????????????????Thread.currentThread().interrupt();
????????????????}
????????????????try?{
????????????????????addr?=?map0(imode,?mapPosition,?mapSize);
????????????????}?catch?(OutOfMemoryError?y)?{
????????????????????//?After?a?second?OOME,?fail
????????????????????throw?new?IOException("Map?failed",?y);
????????????????}
????????????}
????????????//?On?Windows,?and?potentially?other?platforms,?we?need?an?open
????????????//?file?descriptor?for?some?mapping?operations.
????????????FileDescriptor?mfd;
????????????try?{
????????????????mfd?=?nd.duplicateForMapping(fd);
????????????}?catch?(IOException?ioe)?{
????????????????unmap0(addr,?mapSize);
????????????????throw?ioe;
????????????}
????????????assert?(IOStatus.checkAll(addr));
????????????assert?(addr?%?allocationGranularity?==?0);
????????????int?isize?=?(int)size;
????????????Unmapper?um?=?new?Unmapper(addr,?mapSize,?isize,?mfd);
????????????if?((!writable)?||?(imode?==?MAP_RO))?{
????????????????return?Util.newMappedByteBufferR(isize,
?????????????????????????????????????????????????addr?+?pagePosition,
?????????????????????????????????????????????????mfd,
?????????????????????????????????????????????????um);
????????????}?else?{
????????????????return?Util.newMappedByteBuffer(isize,
????????????????????????????????????????????????addr?+?pagePosition,
????????????????????????????????????????????????mfd,
????????????????????????????????????????????????um);
????????????}
?????}
DirectByteBuffer
上一节中通过 Filechannel 映射出的 MappedByteBuffer 其实际也是 DirectByteBuffer,当然除了这种方式,也可以手动开辟一段空间:
ByteBuffer?directByteBuffer?=?ByteBuffer.allocateDirect(100);
Channel-to-Channel 传输
经常需要从一个位置将文件传输到另外一个位置,FileChannel 提供了 transferTo() 方法用来提高传输的效率,首先看一个简单的实例:
public?class?ChannelTransfer?{
????public?static?void?main(String[]?argv)?throws?Exception?{
????????String?files[]=new?String[1];
????????files[0]="D://db.txt";
????????catFiles(Channels.newChannel(System.out),?files);
????}
????private?static?void?catFiles(WritableByteChannel?target,?String[]?files)
????????????throws?Exception?{
????????for?(int?i?=?0;?i?????????????FileInputStream?fis?=?new?FileInputStream(files[i]);
????????????FileChannel?channel?=?fis.getChannel();
????????????channel.transferTo(0,?channel.size(),?target);
????????????channel.close();
????????????fis.close();
????????}
????}
}
通过 FileChannel 的 transferTo() 方法将文件数据传输到 System.out 通道,接口定义如下:
????public?abstract?long?transferTo(long?position,?long?count,
????????????????????????????????????WritableByteChannel?target)
????????throws?IOException;
Netty 零拷贝
看下面一张图会比较清晰:

可以看一下 Netty 提供的 CompositeChannelBuffer 源码:
public?class?CompositeChannelBuffer?extends?AbstractChannelBuffer?{
????private?final?ByteOrder?order;
????private?ChannelBuffer[]?components;
????private?int[]?indices;
????private?int?lastAccessedComponentId;
????private?final?boolean?gathering;
????public?byte?getByte(int?index)?{
????????int?componentId?=?componentId(index);
????????return?components[componentId].getByte(index?-?indices[componentId]);
????}
????...省略...
其他零拷贝
总结
作者:ksfzhaohui
编辑:陶家龙、孙淑娟
出处:juejin.im/post/5cad6f1ef265da039f0ef5df

精彩文章推荐:
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
关注网络尖刀微信公众号随时掌握互联网精彩
- 1 习近平听取岑浩辉述职报告 7904229
- 2 哈尔滨大雪人原来是挖出来的 7809746
- 3 央视曝光走私孕妇血样黑色产业链 7712436
- 4 2025年度文化记忆 重温感动瞬间 7617100
- 5 女子毛衣粘走3000元翡翠耳环主动归还 7523626
- 6 中央财办:扩大内需是明年首位任务 7425141
- 7 一家四口5年在存钱罐存下8万多 7330166
- 8 学校通报“宿管摔死学生小猫” 7233438
- 9 女子罕见被控迷信罪 “供奉”2.1亿 7141621
- 10 用漫画方式了解海南自贸港封关 7045621







51CTO技术栈
