用Netty实现一个简单全双工通信

发布时间 2023-11-20 22:30:40作者: Tod4

用Netty实现一个简单全双工通信


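如题,需要注意 ByteBuf 的引用计数:入站消息传到 tail handler 时会被自动 release,引用计数归零之后再访问 buf 会抛出 IllegalReferenceCountException。同一个 Channel 的 pipeline 上的 handler 都是由同一个 NIO 线程顺序执行的,因此只要在把消息继续向后传递(或 release)之前同步读完 buf,就不需要调用 retain();只有当 buf 需要在 channelRead 返回之后继续使用(例如交给其他线程或异步任务处理)时,才需要调用 buf.retain() 保证引用计数大于 0。另外每一次 retain() 都必须有一次对应的 release(),否则 buf 永远不会被回收,造成内存泄漏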

​ EchoServer.class

/**
 * @description: some desc
 * @author: admin
 * @email: 819574539@qq.com
 * @date: 2023/11/20 21:16
 */
@Slf4j
public class EchoServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        if (msg instanceof ByteBuf) {
                                            var buf = (ByteBuf) msg;
                                            var str = buf.toString(Charset.defaultCharset());
                                            log.debug(str);
                                            buf.retain();
                                            ctx.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(("Hello, I'm Server:" + str).getBytes()));
                                        }
                                        super.channelRead(ctx, msg);
                                    }
                                });
                    }
                }).bind(8080);
    }
}

​ EchoClient.class

/**
 * Minimal Netty echo client: connects to port 8080 on the local host, sends each
 * whitespace-delimited token read from standard input to the server, and logs
 * every reply. Entering {@code q} (or reaching end of input) closes the
 * connection and stops the client.
 *
 * <p>Reference counting: the inbound buffer is consumed by the logging handler,
 * so it is released there exactly once. No {@code retain()} is needed because
 * the buffer is read synchronously before it is released.
 *
 * @author: admin
 * @email: 819574539@qq.com
 * @date: 2023/11/20 21:16
 */
@Slf4j
public class EchoClient {
    /**
     * Runs the interactive client until {@code q} is entered or stdin is exhausted.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting or closing
     */
    public static void main(String[] args) throws InterruptedException {
        var group = new NioEventLoopGroup();
        try {
            var channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {

                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder())
                                    .addLast(new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            if (!(msg instanceof ByteBuf)) {
                                                // Not ours to handle: pass it down the pipeline untouched.
                                                super.channelRead(ctx, msg);
                                                return;
                                            }
                                            var buf = (ByteBuf) msg;
                                            try {
                                                log.debug(buf.toString(Charset.defaultCharset()));
                                            } finally {
                                                // This handler is the last consumer of the inbound buffer.
                                                buf.release();
                                            }
                                        }
                                    });
                        }
                    }).connect(new InetSocketAddress(8080));

            var channel = channelFuture.sync()
                    .channel();

            // One Scanner for the whole session: creating a new one per iteration can
            // discard input already buffered by the previous instance. It is not
            // closed on purpose, since closing it would also close System.in.
            var sc = new Scanner(System.in);
            while (sc.hasNext()) {
                var input = sc.next();
                if ("q".equals(input)) {
                    break;
                }
                channel.writeAndFlush(input);
            }

            channel.close().sync();
        } finally {
            // Event-loop threads are non-daemon; without this the JVM never exits.
            group.shutdownGracefully();
        }
    }
}