用springBoot、netty写TCP客户端/服务端,并用TCP工具测试

发布时间 2023-10-27 14:54:27作者: 山茶花llia

1.启动客户端和连接服务端

package com.pkx.cloud.test.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


/**
 * 客户端
 */
@Component
@Slf4j
//@RequiredArgsConstructor
public class client implements InitializingBean {
    public void connect(int port, String host) throws Exception{

        /**
         * 客户端的NIO线程组
         *
         */
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            /**
             * Bootstrap 是一个启动NIO服务的辅助启动类 客户端的
             */
            Bootstrap bootstrap = new Bootstrap();
            /**
             * 设置group
             */
            bootstrap = bootstrap.group(group);
            /**
             * 关联客户端通道
             */
            bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
            /**
             * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
             */
            bootstrap = bootstrap.handler(new ServerHandlerInit());

            System.out.println("netty client start success!");

            /**
             * 连接服务端
             */
            ChannelFuture f = bootstrap.connect(host, port).sync();
            //通常需要写不断重连服务端
            f.addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                   //重连交给后端线程执行
                    future.channel().eventLoop().schedule(() -> {
                        log.info("重连服务端...");
                        try {
                            connect();
                        } catch (Exception e) {
    //                        e.printStackTrace();
                            log.error("连接失败。。。");
                        }
                    }, 3000, TimeUnit.MILLISECONDS);
                } else {
                  log.info("服务端连接成功...");
              }
            });
            /**
             * 等待连接端口关闭
             */
            f.channel().closeFuture().sync();

        } finally {
            /**
             * 退出,释放资源
             */
            group.shutdownGracefully();
        }
    }

    /**
     * 通道初始化
     */
    static class ServerHandlerInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 解码出具体的数据类型
            //该篇对解码器和编码器进行了详细说明,推荐阅读https://blog.csdn.net/tang_huan_11/article/details/133853786
            pipeline.addLast("decoder", new StringDecoder());//解码和编码可以自己定义
            pipeline.addLast(new handler());//handler类是自己写的处理类
            pipeline.addLast("encoder", new StringEncoder());
        }
    }

    @Override
    @Async //异步多线程注解
    //这个方法在服务启动时就会执行,即服务启动后则客户端就启动了
    public void afterPropertiesSet() throws Exception {
        connect(8888,"127.0.0.1");//TCP工具做服务端进行测试的时候的端口和ip
    }
}

2.自定义处理类

package com.pkx.cloud.test.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * 处理类
 */
@Service
public class handler extends ChannelInboundHandlerAdapter {
    /**
     * 从服务端收到新的数据时,这个方法会在收到消息时被调用
     * 这里写收到服务端的数据之后要做的处理,通常有数据类型转换,数据解析
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException
    {
        System.out.println("channelRead:read msg:"+msg.toString());
        //回应服务端
        ctx.write("接收到了数据----------------------!");
    }

    /**
     * 从服务端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException
    {
        System.out.println("channelReadComplete");
        //回应服务端
        ctx.write("接收到了新数据----------------------!");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException
    {
        System.out.println("exceptionCaught");
        cause.printStackTrace();
        ctx.close();//抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException
    {
        super.channelActive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        System.out.println("channelActive:"+clientIp+ctx.name());
        ByteBuf message = null;
        byte[] req = ("I am client once").getBytes();
        for(int i = 0; i < 5; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            Thread.sleep(5000);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException
    {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费
        System.out.println("channelInactive:"+clientIp);
    }
}

推荐一篇文章对客户端和服务端写法上的区别说明:https://juejin.cn/post/7290740945073668131

3.工具测试

  • 首先协议类型选择TCP服务端,表示该工具做服务端(因为我们写的是客户端,如果写的是服务端则这里选择 TCP client),ip和端口可以自定义,和我们上面代码里写的连接服务端的ip和端口保持一致就行,然后点击打开,可以看到当前连接对象数量为0

  • 然后我们启动我们的客户端服务

  • 可以看到已经有一个可连接的对象,并且已经自动连接上

  • 向客户端发送数据,可以看到客户端已经收到并作出了反应

至此,一个TCP客户端如何编写与测试就结束了,具体的代码解释已经标注