Netty-TCP 03.服务端

发布时间 2023-06-30 20:01:21作者: 米虫2022

本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【3】部分,主要介绍服务端的实现。

模块划分

跟客户端类似,服务端也是主要分为三个部分:

  1. 心跳检测处理
  2. 消息消费处理
  3. TCP服务实现

心跳检测

服务端需要定时检测客户端是否在线(即是否发送心跳),如果没有,那么将客户端连接断开,同样通过注册Netty的IdleStateHandler监听IdleState事件来实现:

/**
 * @author michong
 */
public class PingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

消息消费

这个部分实现跟客户端基本是完全一致的,只不过这里额外处理了一下,在收到的数据包不是心跳包时,给客户端发送一个"OK."字符串:

/**
 * @author michong
 */
public class PacketHandler extends SimpleChannelInboundHandler<Packet> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) {
        if (msg.getType() == Pkt.PING) {
            return;
        }
        System.out.printf("收到消息:类型=%d,内容=%s\n", msg.getType(), new String(msg.getContent()));
        ctx.channel().writeAndFlush(new Packet(Pkt.TEXT, "OK.".getBytes()));
    }
}

TCP服务

跟客户端不同的是,服务端使用的ServerBootstrap,采用NIO的方式启动TCP服务:

/**
 * @author michong
 */
public class TCPServerBootstrap {

    private final String host;
    private final int port;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    public TCPServerBootstrap(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() {
        bossGroup = new NioEventLoopGroup(1);
        workGroup = new NioEventLoopGroup(256);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                ChannelPipeline pipe = socketChannel.pipeline();
                pipe.addLast(
                        new IdleStateHandler(TCPConst.IDLE_TIME_OUT_MILLISECONDS * 3, 0, 0, TimeUnit.MILLISECONDS));
                pipe.addLast(new PingHandler());
                pipe.addLast(new PacketEncoder());
                pipe.addLast(new PacketDecoder());
                pipe.addLast(new PacketHandler());
            }
        });

        bootstrap.bind(host, port).addListener((ChannelFutureListener) cf -> {
            if (cf.isSuccess()) {
                System.out.printf("服务端启动成功 => host=%s, port=%d\n", host, port);
            } else {
                System.out.printf("服务端启动失败 => %s", cf.cause());
                System.exit(-1);
            }
        });
    }
}