Netty实践 -- Netty处理粘包拆包

发布时间 2023-10-23 23:28:46作者: 乐之者v

TCP 粘包/拆包

TCP是以流的方式来处理数据,

拆包:一个完整的数据包可能会被TCP拆分成多个包进行发送。

粘包:TCP 可能把多个小的包粘成一个大的数据包。

粘包拆包示例:

  • 服务端 EchoServer:
/**
 * 服务端收到客户端的消息后,会进行响应。
 */
public final class EchoServer {
    /**
     * 端口
     */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // 配置 EventLoopGroup
        // 主从 Reactor 多线程模式,bossGroup是 主 Reactor,workerGroup是 从Reactor
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //初始化服务器的引导类 ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //指定 EventLoopGroup
            serverBootstrap.group(bossGroup, workerGroup)
                //指定 channel
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
                //指定 ChannelHandler,用于处理 Channel
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     //ChannelPipeline,基于责任链模式,可以添加多个 ChannelHandler
                     ChannelPipeline channelPipeline = ch.pipeline();
                     //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                     //固定长度的拆包器 FixedLengthFrameDecoder
//                     channelPipeline.addLast(new FixedLengthFrameDecoder(19));
                     channelPipeline.addLast(new EchoServerHandler());
                 }
             });

            // 开启服务器,将服务器绑定到它要监听连接请求的端口上
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();

            // 等待直到服务器socket关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭所有 eventLoop,终止线程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • 服务端 EchoServerHandler:

/**
 * 服务端的 ChannelHandler.
 *
 * ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
 * 继承 ChannelInboundHandlerAdapter,用来定义响应入站事件的方法。
 *
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * channelRead() :读取 channel 传入的消息
     *
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf= (ByteBuf) msg;
        log.info("客户端发来的消息:"+ buf.toString(CharsetUtil.UTF_8) +"\n");
    }

    /**
     * channelReadComplete() :表示当前 ChannelHandler 读取完毕.
     * 执行后会自动跳转到 ChannelPipeline 中的下一个 ChannelHandler.
     *
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //向客户端返回数据,writeAndFlush() 也可以拆分成 write(msg) 和 flush()
        ctx.writeAndFlush(Unpooled.copiedBuffer("见到你,我也很高兴^_^",CharsetUtil.UTF_8));
    }

    /**
     * exceptionCaught(): 在读取操作期间,有异常抛出时会调用。
     *
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 发生异常时关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

  • 客户端 EchoClient
/**
 * 客户端。发送消息给服务端,并接收服务端的响应。
 *
 */
public final class EchoClient {

    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel socketChannel) throws Exception {
                     ChannelPipeline channelPipeline = socketChannel.pipeline();
                     //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                     //固定长度的拆包器 FixedLengthFrameDecoder
//                     channelPipeline.addLast(new FixedLengthFrameDecoder(19));
                     channelPipeline.addLast(new ClientNoDecoderHandler());
                 }
             });

            // 开启客户端,连接服务端的端口
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();

            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • 客户端 ClientNoDecoderHandler:
public class ClientNoDecoderHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 1000; i++) {
            ByteBuf buffer = getByteBuf(ctx);
            ctx.channel().writeAndFlush(buffer);
        }
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "粘包拆包测试.".getBytes(StandardCharsets.UTF_8);
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }
}

测试结果:

客户端发送了多个字节,服务端在接收时可能会发生粘包拆包。

比如以下客户端发送了多个 "粘包拆包测试." ,结果有好几条数据粘在了一起,有些又被拆开了。

21:32:18.793 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.�

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆�

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.

21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.

Netty解决粘包拆包

Netty 可以使用多个拆包器处理粘包拆包。

  • FixedLengthFrameDecoder:

固定长度拆包器,Netty 在消息长度固定的场景下,对固定长度的流数据进行解码。

  • LineBasedFrameDecoder:

行拆包器,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过LineBasedFrameDecoder将粘过的ByteBuf拆分成一个个完整的应用层数据包。

  • DelimiterBasedFrameDecoder:

分隔符拆包器。DelimiterBasedFrameDecoder是行拆包器的通用版本。

  • LengthFieldBasedFrameDecoder:

基于长度域拆包器。最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。

测试:

使用 FixedLengthFrameDecoder 进行拆包。

将示例的 EchoClient 、 EchoServer 这两个类注释掉的以下代码放开。

channelPipeline.addLast(new FixedLengthFrameDecoder(19));

可以看到,数据流已经正常展示了,没有粘包拆包。

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.

22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.

参考资料:

https://zhuanlan.zhihu.com/p/415450910