Netty实践 -- echo

发布时间 2023-10-19 21:20:04作者: 乐之者v

Netty实践

学习netty,可以从netty源码的 netty-example 模块开始。

netty-example 有一个例子 echo,非常适合入门学习。

这里稍微改造一下,用作示例学习。

引入依赖包:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.29.Final</version>
        </dependency>

服务端

服务端收到客户端的消息后,会进行响应。

  • EchoServer:
/**
 * 服务端收到客户端的消息后,会进行响应。
 */
public final class EchoServer {
    /**
     * 端口
     */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // 配置 EventLoopGroup
        // 主从 Reactor 多线程模式,bossGroup是 主 Reactor,workerGroup是 从Reactor
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //初始化服务器的引导类 ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //指定 EventLoopGroup
            serverBootstrap.group(bossGroup, workerGroup)
                //指定 channel
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
                //指定 ChannelHandler,用于处理 channel
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     //ChannelPipeline,基于责任链模式,可以添加多个 ChannelHandler
                     ChannelPipeline channelPipeline = ch.pipeline();
                     //channelPipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                     channelPipeline.addLast(new EchoServerHandler());
                 }
             });

            // 开启服务器,将服务器绑定到它要监听连接请求的端口上
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();

            // 等待直到服务器socket关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭所有 eventLoop,终止线程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

  • EchoServerHandler:

ChannelHandler,用于处理 channel,实现业务逻辑。

/**
 * 服务端的 ChannelHandler.
 *
 * ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
 * 继承 ChannelInboundHandlerAdapter,用来定义响应入站事件的方法。
 *
 */
@Sharable
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * channelRead() :读取 channel 传入的消息
     *
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf= (ByteBuf) msg;
        log.info("客户端发来的消息:"+ buf.toString(CharsetUtil.UTF_8) +"\n");
    }

    /**
     * channelReadComplete() :表示当前 ChannelHandler 读取完毕.
     * 执行后会自动跳转到 ChannelPipeline 中的下一个 ChannelHandler.
     *
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //向客户端返回数据,writeAndFlush() 也可以拆分成 write(msg) 和 flush()
        ctx.writeAndFlush(Unpooled.copiedBuffer("见到你,我也很高兴^_^",CharsetUtil.UTF_8));
    }

    /**
     * exceptionCaught(): 在读取操作期间,有异常抛出时会调用。
     *
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 发生异常时关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

在以下示例中,客户端会向服务端发送消息。

  • EchoClient:
/**
 * 客户端。发送数据给服务端,并接收服务端的响应。
 *
 */
public final class EchoClient {

    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel socketChannel) throws Exception {
                     ChannelPipeline channelPipeline = socketChannel.pipeline();
                     //channelPipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                     channelPipeline.addLast(new EchoClientHandler());
                 }
             });

            // 开启客户端,连接服务端的端口
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();

            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

  • EchoClientHandler:
/**
 * 客户端 的 ChannelHandler.
 */
@Slf4j
public class EchoClientHandler extends ChannelInboundHandlerAdapter {


    /**
     * channelActive() 客户端跟服务器的连接建立之后将被调用.
     *
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf firstMessage = Unpooled.buffer(EchoClient.SIZE);
        byte[] bytes = "见到你很高兴^_^\n".getBytes(CharsetUtil.UTF_8);
        firstMessage.writeBytes(bytes);
        //向服务器发送数据
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        log.info("服务器发来的消息:" + buf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

执行:

先启动服务端,然后再启动客户端。

服务端收到客户端的信息:

11:25:03.081 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:见到你很高兴^_^

客户端收到服务端的回复:

11:25:03.091 [nioEventLoopGroup-2-1] INFO com.example.demo.netty.echo.EchoClientHandler - 服务器发来的消息:见到你,我也很高兴^_^

Netty常用类

以上的这个示例,用到了 Netty常用的类,

详情见:https://blog.csdn.net/sinat_32502451/article/details/133934402

参考资料:

https://zhuanlan.zhihu.com/p/415450910