netty_1、介绍

发布时间 2023-08-09 15:34:16作者: Stitches

1、NIO存在的问题

1.1 客户端关闭导致服务端轮询

在关闭客户端时,服务端 Selector.select() 操作不会阻塞,会直接通过并且认为是 READ 状态,而此时的数据长度为0,就会导致空轮询操作。

image-20220906203301768

针对这点可以在服务端读取数据时发现为0,就断开通道连接。

1.2 粘包和拆包问题

通过TCP协议发送数据时会优先将数据放在缓冲区,由TCP协议去决定何时发送这些数据。假设此时有 A、B两个数据包需要发送,那么TCP发送时可能将A、B当成一个数据包发送过去(造成粘包问题);或者发送A、B的一部分造成拆包问题。

img

针对该问题有以下常见解决方案:

  • 消息定长,发送方和接收方规定固定消息大小长度,只有当接收到指定长度数据后才作为一个完成数据包进行处理。
  • 在每个包尾设置固定分隔符,例如 “\r\n” 作为分隔符;
  • 将消息划分为 包=包内容长度(4byte)+包内容 ,这样对于粘包问题先读出包内容长度 n,然后再读取长度为 n 的包内容,这样数据包之间的边界就清楚了;对于拆包问题,先读出包内容长度 n,由于此次缓存区数据长度小于 n,因此需要先缓存这部分内容,等待下次 read事件时来拼接形成完整数据包。

粘包/拆包问题参考:https://www.jianshu.com/p/5c13ed1c709c

2、Netty框架

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.76.Final</version>
    </dependency>
</dependencies>

2.1 ByteBuf

Netty 中使用 ByteBuf 作为缓冲区,ByteBuf 在写操作完成后无需进行 flip() 翻转,并且具有比 NIO中 ByteBuffer 更快的响应速度,而且能够实现自动扩容。

有两种方式生成 ByteBuf,一种是池化缓冲区、一种是非池化缓冲区。池化缓冲区类似于线程池,能够对内存块进行复用处理,这样就不用进行频繁的内存申请和释放了。

// ByteBuf 写入和读取
ByteBuf buf = Unpooled.buffer(10);
System.out.println("初始状态:"+ Arrays.toString(buf.array()));
buf.writeInt(-8888888);
System.out.println("写入Int后:"+ Arrays.toString(buf.array()));
buf.readShort();
System.out.println("读取short后;"+ Arrays.toString(buf.array()));
buf.discardReadBytes();  //丢弃后续可读部分到最前面,并将读写指针向前移动丢弃的距离
System.out.println("丢弃之后:"+Arrays.toString(buf.array()));
buf.clear(); //清空后读写指针都为零
System.out.println("清空之后:"+Arrays.toString(buf.array()));
// ByteBuf 通过 wrappedBuffer创建,缓冲区仅可读
ByteBuf buf = Unpooled.wrappedBuffer("abcdefg".getBytes());
buf.readByte();     //读取一个字节
ByteBuf slice = buf.slice();   //划分,此时读指针位于1
System.out.println(slice.arrayOffset());
System.out.println(Arrays.toString(slice.array()));
// 创建一个复合缓冲区,两个缓冲区的组合
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.copiedBuffer("abc".getBytes()));
buf.addComponent(Unpooled.copiedBuffer("def".getBytes()));
for (int i = 0; i < buf.capacity(); i++) {
    System.out.println((char) buf.getByte(i));
}
// 池化缓冲区的使用,演示了内存块的回收利用
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = allocator.directBuffer(10);   //申请一个容量为10的直接缓冲区
buf.writeChar('T');    //随便操作操作
System.out.println(buf.readChar());
buf.release();    //释放此缓冲区

ByteBuf buf2 = allocator.directBuffer(10);   //重新再申请一个同样大小的直接缓冲区
System.out.println(buf2 == buf);

2.2 零拷贝

零拷贝是指将数据从一个区域拷贝到另一个区域,零拷贝减少了不必要的CPU数据拷贝;减少了用户空间和内核空间的上下文切换。

内核空间和用户空间:

image-20220907095322213
  • 内核空间:Linux 自身使用的空间,主要提供供进程调度、内存分配、连接硬件资源等功能;
  • 用户空间:提供给各个程序进程的空间,用户空间不具备访问内核空间资源的权限,如果应用程序需要使用到内核空间的资源,则需要通过系统调用来完成;从用户空间切换到内核空间,然后完成相关操作后再回到用户空间。

缓冲区和虚拟内存:

image-20220907095702617
  • 直接内存访问(DMA):DMA允许外设设备和内存存储器之间直接进行IO数据传输,其过程不需要CPU的参与;

  • 缓冲区:进程如果发起 read 请求,内核首先检查内核空间缓冲区是否有进程所需的数据,如果已经存在则 copy 到进程的内存区;如果没有,系统向磁盘请求数据,通过 DMA 写入到内核的 read 缓冲区,再从内核缓冲区 copy 到进程的内存区;

    ​ 进程如果发起 write 请求,把进程内存区的数据 copy 到内核write缓冲区,然后再通过 DMA 把内核缓冲区数据刷回磁盘或者网卡。

  • 虚拟内存:多个虚拟地址可以对应同一个物理地址,虚拟内存空间可大于实际可用的物理地址。

2.2.1 传统 I/O 操作

image-20220907100445674
  • 用户发出 read 系统调用,由用户态转化为内核态,接着通过 DMA 将数据从磁盘中读取到内核空间的内核缓冲区中;
  • 接着将内核空间缓冲区的数据拷贝到用户空间进程内存,然后 read 系统调用返回。read 系统调用返回又会导致一次内核空间和用户空间的上下文切换;
  • write 系统调用,再次导致用户空间到内核空间的上下文切换,将用户空间的进程的内存数据复制到内核空间的 socket 缓冲区,然后write 系统调用返回,再次触发上下文切换。
  • socket缓冲区数据通过 DMA 传输到网卡是独立异步的过程。

传统 I/O 操作涉及到 4次上下文切换、2次CPU数据复制、2次DMA数据复制

2.2.2 mmap、write 实现零拷贝

mmap 文件内存映射函数可以将磁盘上的物理页映射到虚拟内存的虚拟页。 参照这一特性,mmap可以实现进程间通信。

image-20220907104148253
  • 用户发起 mmap 系统调用,由用户态切换为内核态,然后通过 DMA 将磁盘文件的数据复制到内核缓冲区中;
  • mmap 系统调用返回,导致内核空间到用户空间的上下文切换,同时内核空间和用户空间共享同一缓冲区;
  • 用户发起 write 系统调用,由用户态切换为内核态,将内核缓冲区的数据复制到socket缓冲区中,write系统调用返回,内核态切换为用户态;
  • 异步地进行DMA将数据从 socket 缓冲区复制到网卡。
#include <sys/mman.h>
void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset)

mmap、write操作涉及到4次上下文切换、1次CPU复制、2次DMA复制

2.2.3 sendfile 实现零拷贝

image-20220907104724967
  • 用户发起 sendfile 系统调用,由用户态切换为内核态,然后通过DMA将磁盘文件复制到内核缓冲区中,再将数据从内核缓冲区复制到socket缓冲区中;
  • sendfile 系统调用返回,由内核态切换为用户态,并由DMA异步地将数据从socket缓冲区传递到网卡。
#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

sendfile 系统调用涉及到 2次用户空间与内核空间的上下文切换,1次CPU复制,2次DMA复制。

2.3 Netty 工作模型

image-20220907105157321

BossGroup 对应主Reactor、 WorkerGroup对应从Reactor。两种均采用了事件循环机制 EventLoopGroup,不断进行事件监听并通知。

BossGroup之后,将SocketChannel 绑定到 WorkerGroup 中的一个 EventLoopGroup上,进行后续的读写操作监听。

// Netty 服务器端
public class Server {
    public static void main(String[] args) {
        //1.创建 BossGroup 和 WorkerGroup,底层采用 NioEventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
        //2.创建服务端启动引导类
        ServerBootstrap bootstrap = new ServerBootstrap();
        //3.链接指定事件处理
        bootstrap
                .group(bossGroup, workerGroup)             //指定事件循环组
                .channel(NioServerSocketChannel.class)     //指定NIO为ServerSocketChannel
                .childHandler(new ChannelInitializer<SocketChannel>() {  //可以循环链接上多个处理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //获取流水线,以流水线方式处理客户端数据
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {  //添加一个Handler
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //ctx为上下文、msg为收到的消息
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                //通过上下文将数据返回回去
                                ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                            }
                        });
                    }
                });
        //4.绑定端口启动
        bootstrap.bind(8080);
    }
}
// Netty 客户端
public class Client {
    public static void main(String[] args) {
        //创建一个新的SocketChannel,一会通过通道进行通信
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
             Scanner scanner = new Scanner(System.in)){
            System.out.println("已连接到服务端!");
            while (true) {   //咱给它套个无限循环,这样就能一直发消息了
                System.out.println("请输入要发送给服务端的内容:");
                String text = scanner.nextLine();
                if(text.isEmpty()) continue;
                //直接向通道中写入数据,真舒服
                channel.write(ByteBuffer.wrap(text.getBytes()));
                System.out.println("已发送!");
                ByteBuffer buffer = ByteBuffer.allocate(128);
                channel.read(buffer);   //直接从通道中读取数据
                buffer.flip();
                System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

2.4 Netty 中的Channel

Netty 中 Channel 涉及到的 IO 操作均是异步的,并不是在当前线程同步运行,方法调用之后就返回了。

image-20220907113126205 image-20220907113145529

Channel 接口的父接口 ChannelOutboundInvoker,内部方法均返回 ChannelFuture 而不是具体的结果,因此为异步。

ChannelHandler 是对通道内的数据的具体操作处理,类似于NIO 中的Handler,多个Handler的处理是放在流水线(pipeline) 上进行处理的,流水线类似于一条链表,每次通过 addLast() 操作往链表尾添加 Handler 处理器。ChannelInboundHanderAdapter 继承于父类 ChannelInboundHandler,该接口中包括事件处理的各个阶段会触发的函数,比如 channelRegistered、channelActive、channelRead、channelReadComplete等等。

image-20220907113613267 image-20220907113721696

自定义实现 ChannelInboundHandler 处理器,测试事件处理的各个阶段:

public class TestChannelHandler extends ChannelInboundHandlerAdapter {
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel Registered");
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel UnRegistered");
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel active");
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel Inactive");
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(Thread.currentThread().getName()+" >> 接收客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeCharSequence("已收到!",StandardCharsets.UTF_8);
        ctx.writeAndFlush(buffer);
        System.out.println("channelRead");
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel ReadComplete");
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("user Trigger");
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel writechange");
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught"+cause);
    }
}

image-20220907191309338

ChannelPipeline 流水线上能够容纳多个 Handler,其中入站操作为 ChannelInboundHandler、出站操作为 ChannelOutboundHandler。入站操作的按照流水线正向传播,比如如下两个 Handler 进行消息的传递。

bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                .addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println("1接收客户端数据:"+buf.toString(StandardCharsets.UTF_8));
                        ctx.fireChannelRead(msg);   //传播数据到下一个Handler
                    }
                })
                .addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println("2接收客户端数据:"+buf.toString(StandardCharsets.UTF_8));
                    }
                });
        }
    });

image-20220907192352750

出站操作按照流水线反方向进行处理,整个流水线操作如下:

image-20220907192455852

2.5 EventLoop 任务调度

image-20220907201713913

每建立一个连接就将绑定到一个 EventLoop 上,之后 EventLoop 就开始监听这个连接,一个 EventLoop 可以同时监听多个 Channel,类似于 Selector。

我们现在编写的服务端,类似于主从 Reactor模型,但是 Handler 中处理读和写操作都是在一起的,而为了更高的效率应该将Handler 处理与读写隔离开,由单独的多线程去处理。

//单线程模式,多个客户端连接会依次卡住
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);  //线程数先限制一下
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)   //指定事件循环组
            .channel(NioServerSocketChannel.class)   //指定为NIO的ServerSocketChannel
            .childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline()
                            .addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                    Thread.sleep(10000);   //这里我们直接卡10秒假装在处理任务
                                    ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                                }
                            });
                }
            });
    bootstrap.bind(8080);
}

有两种方式将 Handler 和 读写操作分隔开,①单独创建一个 EventLoop 去处理读写之外的操作,类似于从 Reactor 的 selector集合;②以流水线的方式加入对应的 EventLoop 和 Handler:

public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);  //线程数先限制一下
    EventLoopGroup handlerGroup = new DefaultEventLoopGroup();  //使用DefaultEventLoop来处理其他任务
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                channel.pipeline()
                    .addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                            ctx.fireChannelRead(msg);
                        }
                    })
                    .addLast(handlerGroup, new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            Thread.sleep(10000);
                            ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                        }
                    });
            }
        });
    bootstrap.bind(8080);
}

2.6 Netty 解码器

https://blog.csdn.net/saienenen/article/details/112757958

2.7 Netty 的一些问题

1、netty的@ChannelHandler.Sharable

https://blog.csdn.net/m0_37055174/article/details/99957301

https://blog.csdn.net/dyingstraw/article/details/97303124

2、netty服务端多端口绑定

https://blog.csdn.net/weixin_41863745/article/details/108637573

https://blog.csdn.net/usagoole/article/details/88025354

3、netty接收超过1024字节

https://blog.csdn.net/qq_36665310/article/details/122281267

4、netty ByteBuff 详解

http://t.zoukankan.com/kuluo-p-12624889.html

零拷贝:

https://blog.csdn.net/weixin_46507406/article/details/111479124

https://zhuanlan.zhihu.com/p/485632980

虚拟内存:

https://blog.csdn.net/qq_45335146/article/details/125350776

https://blog.csdn.net/baidu_38310096/article/details/78225020

https://blog.csdn.net/qq_33369979/article/details/109074372

写时复制:

https://blog.csdn.net/qq_38341582/article/details/108123971