Netty-TCP 02.客户端

发布时间 2023-06-30 20:01:21作者: 米虫2022

本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【2】部分,主要介绍客户端的实现。

模块划分

TCP简单TCP通讯(聊天)应用程序客户端主要分为三个部分:

  1. 心跳保活处理
  2. 消息消费处理
  3. TCP连接实现

心跳保活

心跳保活是目的是告诉服务端客户端是在线的,当客户端空闲时,定时给服务端发一个ping数据包(客户端-服务端双方协定)。

通过注册Netty的IdleStateHandler监听IdleState事件来实现:

/**
 * @author michong
 */
public class PingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                Packet packet = new Packet(Pkt.PING, new byte[] {0});
                ctx.channel().writeAndFlush(packet);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

消息消费

消息消费是整个客户端业务处理的重心,可以根据小心的类型进行不用的业务处理,这里的实现是简单地将收到的消息打印处理:

/**
 * @author michong
 */
public class PacketHandler extends SimpleChannelInboundHandler<Packet> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) {
        System.out.printf("收到消息:类型=%d,内容=%s\n", msg.getType(), new String(msg.getContent()));
    }
}

TCP连接

TCP连接,是客户端连接服务端的重要步骤,这里除了上面的IdleStateHandler和PingHandler、PacketHandler之外加入编解码处理器:

/**
 * @author michong
 */
public class TCPClientBootstrap {

    private final String host;
    private final int port;

    private EventLoopGroup workGroup;
    private Channel channel;

    public TCPClientBootstrap(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Channel start() throws InterruptedException {
        if (Objects.nonNull(channel)) {
            return channel;
        }

        workGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) {
                ChannelPipeline pipe = channel.pipeline();
                pipe.addLast(new IdleStateHandler(0, TCPConst.IDLE_TIME_OUT_MILLISECONDS, 0, TimeUnit.MILLISECONDS));
                pipe.addLast(new PingHandler());
                pipe.addLast(new PacketDecoder());
                pipe.addLast(new PacketEncoder());
                pipe.addLast(new PacketHandler());
            }
        });
        ChannelFuture future = bootstrap.connect(host, port);
        future.sync();
        System.out.printf("客户端启动成功 => host=%s, port=%d\n", host, port);
        channel = future.channel();
        return channel;
    }

    public void stop() throws InterruptedException {
        if (Objects.nonNull(workGroup)) {
            workGroup.shutdownGracefully().sync();
        }
    }
}