Netty

发布时间 2023-11-12 23:46:25作者: RainbowMagic

IO模型

阻塞IO

客户端与服务端通过IP与断开进行流传输,客户端与服务端传输数据是以阻塞的方式进行传输的,在传输的过程中,其他客户端请求将会阻塞调,无法处理其他客户端的连接,可以利用多线程来尽量的处理多个客户端请求,但是线程消耗的资源是有限的,所有这种IO模型无法适用于大并发从场景下。服务端需要一直循环判断客户端是否有数据传输,如果有数据传输才进行处理,在一直循环客户端获取数据时,如果客户端没有发送数据,那么有很多请求都是无意义的,这无疑是浪费系统资源。
image

单线程单Reactor模型

在Java NIO编程中就实现了这种IO模型,服务端将客户端的连接事件注册到多路复用器中,也就是下图的事件转发器中,服务端通过遍历这个多路复用器来判断客户端是否有事件产生,如果有事件产生则执行具体的业务代码。这种方式比阻塞IO好一些,只有客户端有具体的事件发生才进行业务处理,少了许多无意义的服务端轮询请求。如下图所示,事件转发器轮询判断客户端是否有事件发生,如果有事件发生再进行具体的业务处理。
image

多线程单reactor模型

由于单线程单reactor模型只能单独处理一个客户端连接的客户端事件,别的客户端额别的事件将会被阻塞调,这样也不利用高并发场景下的使用,所有需要再多路复用器中添加多线程来提高系统的并发处理。
image

主从reactor模型

在多线程单reactor模式中,如果所有线程都在处理具体的业务功能,没有空余线程获取连接,如果有新连接到来没办法为新连接提供服务了,这样并发量的话也会下降,主从reactor可以解决这个问题,主reactor用于接收连接,接收到连接之后再转交给从reactor进行业务处理,如下图所示,mainRactor接收客户端连接,接收到客户端连接之后转交给SubReactor进行具体的read write处理。
image

Netty

netty为开发者实现了许多客户端协议支持、线程安全性以及优化了在NIO中不太还用的对象的API。
image
Netty支持以下几种IO模型,以及IO模型具体的实现类如下图所示。
image
reactor实现了reactor模型,单线程单reactor、多线程单reactor、主从reactor模型都有很好的支持。

BoosGroupEventLoop为主reactor,WorkerGroupEventLoop为从reactor。

客户端与服务端通过Pipline来处理接收到的数据,Pipline分为出站和入站,从对端获取到的数据需要处理则是入站handler,将数据发送到对端处理则是出站handler,入站handler是从左往右进行处理的,出站handler是从右往左进行处理的。
image

服务端

serverBootstrap.group(new NioEventLoopGroup(10), new NioEventLoopGroup(10));这里定义了主从reactor操作,主reactor 10个线程,从reactor 10个线程。
并在服务端channel中添加一个日志输出handler

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(new NioEventLoopGroup(10), new NioEventLoopGroup(10));
		// io 模型类型
        serverBootstrap.channel(NioServerSocketChannel.class);
        // 用于监听服务端channel发生改变时触发
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // 服务端获取到的客户端发生连接时触发
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            /**
             * 当有客户端连接完毕时 将由从reactor处理时会调用这个方法
             */
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
			// 通过piplie来操作通道中的流的
                ChannelPipeline pipeline = ch.pipeline();
                // inboundHandler从前往后执行 outboundHandler从后往前执行
                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                pipeline.addLast(new HttpRequestDecoder());
                pipeline.addLast(new ServerInboundHandler());
                pipeline.addLast(new ServerInboundHandler1());
            }
        });


        // future提供了异步操作 可以添加监听器监听对指定操作进行回调
        ChannelFuture bind = serverBootstrap.bind(7979);

        try {
		// 服务端channel同步,同步完成才继续往下执行
            bind.sync();

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

入站handler,可以通过上下文来获取客户端channel,获取到channel之后就可以对客户端连接进行读写。
如果想要将事件向下传递到其他的入站事件时可以调用上下文.fireChannelActive()来向下传递。
通过构建ByteBuf来向对端传输二进制数据

public class ServerInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端发生连接0");
        // 将channel Active向下传递
        ctx.fireChannelActive();
    }

	// 有数据发送过来时执行
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf1 = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf1.readableBytes()];

        byteBuf1.readBytes(bytes);

        System.out.println("接收到客户端发送过来的数据:" + new String(bytes));

        ByteBuf byteBuf = ctx.alloc().buffer();;
        byteBuf.writeBytes("Hello world".getBytes());

        Channel channel = ctx.channel();

        // 使用channel写数据和使用ctx上下文写数据是不同的,使用上下文写数据始终是从Pipline尾部向前传递,可以经过所有的OutBoundHandler,
        // 而使用channel写数据则是指挥通过前面的outboundHandler,后面的outboundhandle则会被忽略
        channel.writeAndFlush(byteBuf);

        // 将channel read向下从换掉 传递数据为msg
            ctx.fireChannelRead(bytes);
    }
}

出站事件, 如果有向对端写数据时则会触发出站事件,在出站事件中可以对将要发送到对端的数据进行操作,操作完成后再发送到对端,和入站事件一样,也可以调ctx.fireChannelActive();来将出站事件传递到下一个出站事件中。通常一个handler只能添加到pipilie一次,添加ChannelHandler.Sharable注解可以使handler添加到pipline多次。

// 共享handler如果定义全局变量的话 netty无法保证线程安全 线程安全由开发者来保证
@ChannelHandler.Sharable
public class ServerOutboundHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println(msg);
        for (int i = 0; i < 100; i++) {
            Map<String, Object> resultMap = new HashMap<>();
            resultMap.put("test", "123123");
            resultMap.put("sadasd", 123);
            ctx.writeAndFlush("test123");
        }
        ctx.fireChannelActive();
    }
}

需要注意的是,netty本身是可以保证线程安全的,但是它没办法保证handler对象中的成员共享变量的线程安全。如下所示,如果多个线程操作成员变量a,那么无法保证线程安全,需要加锁才可以保证线程安全。
image

客户端

客户端的大多数操作和服务端也差不多,出站入站都是通用,不同在于创建客户端对象的时候与服务端更简单了,客户端只用添加一个event loop即可,如下所示:

public class Client {
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup(10);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(worker);
		// io 模型类型
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 添加日志解析器
                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                pipeline.addLast(new ClientInboundHandler());

            }
        });

        try {
            bootstrap.connect("127.0.0.1", 7979).sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

解码器

粘包 拆包问题,客户端发送了多次数据 但是客户端接收数据会丢失一部分数据
原因: tcp会根据缓冲区剩余空间大小来发送数据,如果缓冲区没满则不会发送到对端,等到缓冲区满了才会将数据一次性发送到对端,
样就会有一个问题,多次数据全粘在一个缓冲区里,客户端会接收到多个数据
解决: 根据协议格式来将整个数据包发送到对端而不是缓冲区满了再发送,例如根据分隔符结尾或协议位来解决。
netty提供了以下几种常用的handler来解决粘包拆包问题。
image
固定长度,客户端服务端每次发送的长度都是固定的,这样就可以使用第一个handler来解决粘包拆包问题。

              // 基于长度域的编解码器 参数1表示length的长度
                pipeline.addLast(new FixedLengthFrameDecoder(4));

可以采用分隔符来分割每次发送的结尾,如果遇到指定分隔符的话,那么表示数据发送完成了。参数一表示一请全体最长长度,参数2表示分隔符。

                pipeline.addLast(new DelimiterBasedFrameDecoder(65535, ch.alloc().buffer().writeBytes("\n".getBytes())));

最常用的应该就是第三章基于固定字段的消息方式来解决
编码: 参数1表示lenght长度占的字节数

pipeline.addLast(new FixedLengthFrameDecoder(4));

解码:基于长度的域解码器 参数1为最大协议帧长度 参数2为跳过字节数 参数3为header最大长度 参数4为header与body填充数据 参数5为解码后跳过的字节长度

                pipeline.addFirst(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));