Netty使用实例

发布时间 2023-05-30 17:38:38作者: 田野与天

下面是三个使用Java实现的Netty示例代码,用于演示Netty的基本用法和通信模式:

示例1:简单的Echo服务器和客户端

在此示例中,我们将创建一个简单的Echo服务器和客户端,客户端向服务器发送消息,并接收服务器返回的相同消息。

服务器实现类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Entry point of the echo server used in example 1.
 *
 * <p>Binds to port 8080 and gives every accepted connection a pipeline made of a
 * {@link StringDecoder}, a {@link StringEncoder} and an {@code EchoServerHandler}. The server
 * channel itself is logged at INFO level through a {@link LoggingHandler}.
 */
public class EchoServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args ignored
     * @throws Exception if binding or waiting on the channel is interrupted or fails
     */
    public static void main(String[] args) throws Exception {
        // One thread accepts connections; the default-sized group serves the accepted channels.
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            // Built once and invoked by Netty for each newly accepted connection.
            ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());
                    p.addLast(new EchoServerHandler());
                }
            };

            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.handler(new LoggingHandler(LogLevel.INFO));
            server.childHandler(connectionSetup);

            // Wait for the bind to finish, then park the main thread until shutdown.
            ChannelFuture bound = server.bind(8080).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

服务器处理器类

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side handler of the echo example: every decoded text message is written straight back
 * to the peer that sent it.
 */
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Echoes the received text back on the same channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        ctx.writeAndFlush(msg);
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端实现类

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Entry point of the echo client used in example 1.
 *
 * <p>Connects to {@code localhost:8080} with a pipeline of {@link StringDecoder},
 * {@link StringEncoder} and {@code EchoClientHandler}, then waits until the connection closes.
 */
public class EchoClient {
    /**
     * Connects to the echo server and blocks until the channel is closed.
     *
     * @param args ignored
     * @throws Exception if connecting or waiting on the channel is interrupted or fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            // Pipeline layout for the single outgoing connection.
            ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());
                    p.addLast(new EchoClientHandler());
                }
            };

            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(connectionSetup);

            ChannelFuture connected = client.connect("localhost", 8080).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}

客户端处理器类

import io.netty.buffer.ByteBuf

;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side handler of the echo example: sends one greeting when the connection becomes active
 * and prints whatever the server sends back.
 *
 * <p>Inbound messages are accepted both as raw {@link ByteBuf}s and as already-decoded objects.
 * {@code EchoClient} installs a {@code StringDecoder} in front of this handler, so in that
 * pipeline the message arrives as a {@code String}; casting it unconditionally to {@link ByteBuf}
 * would fail with a {@code ClassCastException}.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends the greeting as soon as the connection is established.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        String message = "Hello, Netty!";
        // Encode with an explicit charset so the byte count always matches the payload,
        // independent of the platform default encoding.
        ByteBuf buffer = Unpooled.copiedBuffer(message, java.nio.charset.StandardCharsets.UTF_8);
        ctx.writeAndFlush(buffer);
    }

    /**
     * Prints the server's reply.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message: a {@link ByteBuf} when no decoder precedes this handler,
     *            otherwise the decoded object (a {@code String} behind a {@code StringDecoder})
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String response;
        if (msg instanceof ByteBuf) {
            ByteBuf buffer = (ByteBuf) msg;
            try {
                response = buffer.toString(java.nio.charset.StandardCharsets.UTF_8);
            } finally {
                // This handler is the last consumer of the buffer, so it must release it.
                buffer.release();
            }
        } else {
            response = String.valueOf(msg);
        }
        System.out.println("Received: " + response);
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在示例1中,我们创建了一个简单的Echo服务器和客户端。服务器通过接收客户端发送的消息,并将其原样返回给客户端来实现Echo功能。

服务器实现类EchoServer创建了两个EventLoopGroup,一个用于接收连接(bossGroup),一个用于处理连接的I/O操作(workerGroup)。然后,我们使用ServerBootstrap配置服务器,并设置处理器(LoggingHandler)和管道初始化程序(ChannelInitializer)。管道初始化程序添加了一个字符串解码器、一个字符串编码器和EchoServerHandler作为最后一个处理器。

客户端实现类EchoClient也创建了一个EventLoopGroup。然后,我们使用Bootstrap配置客户端,并设置处理器(ChannelInitializer)。管道初始化程序添加了一个字符串解码器、一个字符串编码器和EchoClientHandler作为最后一个处理器。

运行服务器和客户端代码后,您将看到客户端成功连接到服务器,并发送消息。服务器接收到消息后,将其原样返回给客户端,客户端接收到服务器返回的消息并将其打印出来。

示例2:WebSocket服务器和客户端

在此示例中,我们将创建一个基于WebSocket的服务器和客户端,用于实现双向通信。

服务器实现类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Entry point of the WebSocket server used in example 2.
 *
 * <p>Listens on port 8080. Each connection gets an HTTP codec, a chunked-write handler, an HTTP
 * aggregator and a WebSocket frame aggregator (both created with a limit argument of 8192), the
 * WebSocket protocol handler for the {@code /websocket} path, and finally
 * {@code WebSocketServerHandler}.
 */
public class WebSocketServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args ignored
     * @throws Exception if binding or waiting on the channel is interrupted or fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            // Handlers are added in the same order in which inbound data traverses them.
            ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpServerCodec());
                    p.addLast(new ChunkedWriteHandler());
                    p.addLast(new HttpObjectAggregator(8192));
                    p.addLast(new WebSocketFrameAggregator(8192));
                    p.addLast(new WebSocketServerProtocolHandler("/websocket"));
                    p.addLast(new WebSocketServerHandler());
                }
            };

            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(connectionSetup);

            ChannelFuture bound = server.bind(8080).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

服务器处理器类

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;

/**
 * Server-side WebSocket handler of example 2: logs every text frame it receives and answers with
 * a fixed greeting. Any other frame type that reaches this handler is rejected.
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /**
     * Handles one inbound WebSocket frame.
     *
     * @param ctx   context of the channel the frame arrived on
     * @param frame the received frame
     * @throws UnsupportedOperationException if the frame is not a {@link TextWebSocketFrame}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Guard clause: only text frames are supported by this example.
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("Unsupported frame type: " + frame.getClass().getName());
        }

        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("Received: " + request);

        TextWebSocketFrame reply = new TextWebSocketFrame("Hello, WebSocket!");
        ctx.writeAndFlush(reply);
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端实现类

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.net.URI;

/**
 * Entry point of the WebSocket client used in example 2.
 *
 * <p>Creates a version-13 handshaker for {@code ws://localhost:8080/websocket}, connects to the
 * host and port taken from that URI, and hands the handshaker to {@code WebSocketClientHandler},
 * which is the last handler in the pipeline.
 */
public class WebSocketClient {
    /**
     * Connects to the WebSocket server and blocks until the channel is closed.
     *
     * @param args ignored
     * @throws Exception if the URI is invalid, or connecting/waiting is interrupted or fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            URI endpoint = new URI("ws://localhost:8080/websocket");
            // No sub-protocol, no extensions, no custom headers.
            final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    endpoint, WebSocketVersion.V13, null, false, null);

            ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpClientCodec());
                    p.addLast(new ChunkedWriteHandler());
                    p.addLast(new HttpObjectAggregator(8192));
                    p.addLast(new WebSocketFrameAggregator(8192));
                    p.addLast(new WebSocketClientHandler(handshaker));
                }
            };

            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(connectionSetup);

            ChannelFuture connected = client.connect(endpoint.getHost(), endpoint.getPort()).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}

客户端处理器类

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;

/**
 * Client-side WebSocket handler of example 2.
 *
 * <p>Starts the opening handshake when the connection becomes active, completes it when the HTTP
 * upgrade response arrives, sends one text frame to the server and prints every text frame
 * received afterwards.
 *
 * <p>The handler is typed on {@code Object} because it has to see two unrelated message kinds: the
 * {@link FullHttpResponse} that finishes the handshake and the {@link WebSocketFrame}s that
 * follow. Implementing {@code channelRead0} (rather than overriding {@code channelRead}) satisfies
 * the abstract method of {@link SimpleChannelInboundHandler} and lets the base class release each
 * message after it has been handled.
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private final WebSocketClientHandshaker handshaker;

    /**
     * @param handshaker handshaker configured for the target WebSocket URI
     */
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * Sends the HTTP upgrade request once the TCP connection is established.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        handshaker.handshake(ctx.channel());
    }

    /**
     * Completes the handshake on the first response, then prints incoming text frames.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the handshake response or a WebSocket frame
     * @throws IllegalStateException         if something other than a full HTTP response arrives
     *                                       before the handshake is complete
     * @throws UnsupportedOperationException if a non-text message arrives after the handshake
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        if (!handshaker.isHandshakeComplete()) {
            if (!(msg instanceof FullHttpResponse)) {
                throw new IllegalStateException(
                        "Unexpected message before handshake completion: " + msg.getClass().getName());
            }
            handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
            System.out.println("WebSocket handshake completed");

            // The server only replies to text frames it receives, so start the exchange here.
            ctx.writeAndFlush(new TextWebSocketFrame("Hello, Server!"));
            return;
        }

        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) msg;
            String response = textFrame.text();
            System.out.println("Received: " + response);
        } else {
            throw new UnsupportedOperationException("Unsupported frame type: " + msg.getClass().getName());
        }
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在示例2中,我们创建了一个基于WebSocket的服务器和客户端。服务器使用HttpServerCodec处理HTTP请求和响应,使用WebSocketServerProtocolHandler处理WebSocket升级握手和握手完成后的WebSocket帧,然后使用自定义的WebSocketServerHandler处理接收到的WebSocket消息。

客户端使用HttpClientCodec处理HTTP请求和响应,使用WebSocketClientHandshakerFactory创建WebSocketClientHandshaker来进行握手,使用自定义的WebSocketClientHandler处理握手完成后接收到的WebSocket消息。

运行服务器和客户端代码后,您将看到客户端成功连接到服务器,并可以通过WebSocket进行双向通信。客户端发送消息给服务器,服务器在控制台上打印接收到的消息,并向客户端返回固定的响应"Hello, WebSocket!",客户端随后在控制台上打印收到的响应。

示例3:使用Netty进行文件传输

在此示例中,我们将创建一个文件传输服务器和客户端,用于将文件从客户端传输到服务器。

服务器实现类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;

public class FileServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<Socket

Channel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ChannelPipeline pipeline = ch.pipeline();
                             pipeline.addLast(new LengthFieldPrepender(4));
                             pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                             pipeline.addLast(new ChunkedWriteHandler());
                             pipeline.addLast(new FileServerHandler());
                         }
                     });

            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务器处理器类

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
 * Server-side handler of the file-transfer example.
 *
 * <p>An {@link HttpRequest} carrying a {@code FilePath} header opens the destination file, every
 * following {@link HttpContent} chunk is appended to it, and the {@link LastHttpContent} closes
 * the file and triggers a {@code 200 OK} response.
 *
 * <p>Only the last path segment of the {@code FilePath} header is used, and the file is always
 * created inside {@link #UPLOAD_DIR}. The header comes from the remote peer, so using it verbatim
 * would let a client overwrite any file the server process can write. When client and server run
 * on the same machine, that includes the very file being uploaded.
 *
 * <p>File writes happen on the channel's event loop, which keeps the example short but blocks
 * that loop while writing.
 */
public class FileServerHandler extends SimpleChannelInboundHandler<Object> {
    /** Directory that receives all uploads, relative to the server's working directory. */
    private static final File UPLOAD_DIR = new File("uploads");

    private FileChannel fileChannel;
    private FileOutputStream fileOutputStream;
    private File file;
    private boolean fileStarted;

    /**
     * Dispatches one decoded HTTP message part.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg an {@link HttpRequest}, an {@link HttpContent}, or both at once when the request
     *            was aggregated upstream
     * @throws Exception if the destination file cannot be opened or written; the failure is then
     *                   reported through {@link #exceptionCaught}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            startUpload(ctx, (HttpRequest) msg);
        }

        // Not an else-branch: an aggregated request is both the request and its last content.
        if (msg instanceof HttpContent && fileStarted) {
            appendContent(ctx, (HttpContent) msg);
        }
    }

    /** Closes a half-written file when the peer disconnects before sending the last chunk. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        closeFileQuietly();
        super.channelInactive(ctx);
    }

    /**
     * Prints the failure, releases the file handle and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeFileQuietly();
        ctx.close();
    }

    /** Answers {@code Expect: 100-continue} and opens the destination file named by the request. */
    private void startUpload(ChannelHandlerContext ctx, HttpRequest request) throws IOException {
        if (HttpUtil.is100ContinueExpected(request)) {
            ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
        }

        if (fileStarted) {
            // A transfer is already in progress on this connection; keep writing to that file.
            return;
        }

        HttpHeaders headers = request.headers();
        String filePath = headers.get("FilePath");
        String fileName = filePath == null ? null : safeFileName(filePath);
        if (fileName == null) {
            // Without a usable name there is nowhere to store the body: reject instead of hanging.
            sendFinalResponse(ctx, HttpResponseStatus.BAD_REQUEST);
            ctx.close();
            return;
        }

        if (!UPLOAD_DIR.isDirectory() && !UPLOAD_DIR.mkdirs()) {
            throw new IOException("Cannot create upload directory: " + UPLOAD_DIR.getAbsolutePath());
        }
        file = new File(UPLOAD_DIR, fileName);
        fileOutputStream = new FileOutputStream(file);
        fileChannel = fileOutputStream.getChannel();
        fileStarted = true;
    }

    /** Appends one body chunk to the open file and finishes the upload on the last chunk. */
    private void appendContent(ChannelHandlerContext ctx, HttpContent chunk) throws IOException {
        ByteBuf data = chunk.content();
        // A single call may write fewer bytes than requested, so drain the buffer in a loop.
        while (data.isReadable()) {
            data.readBytes(fileChannel, data.readableBytes());
        }

        if (chunk instanceof LastHttpContent) {
            fileChannel.force(true);
            closeFile();
            sendFinalResponse(ctx, HttpResponseStatus.OK);
        }
    }

    /** Writes a body-less HTTP response with the given status. */
    private static void sendFinalResponse(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
        // Lets a keep-alive client detect the end of the response.
        HttpUtil.setContentLength(response, 0);
        ctx.writeAndFlush(response);
    }

    /**
     * Reduces a client-supplied path to its last segment.
     *
     * @return the file name, or {@code null} when no usable name remains
     */
    private static String safeFileName(String clientPath) {
        // Handle both separators: the client may run on a different operating system.
        int lastSeparator = Math.max(clientPath.lastIndexOf('/'), clientPath.lastIndexOf('\\'));
        String name = clientPath.substring(lastSeparator + 1);
        if (name.isEmpty() || name.equals(".") || name.equals("..")) {
            return null;
        }
        return name;
    }

    /** Closes the destination file and resets the transfer state. */
    private void closeFile() throws IOException {
        fileStarted = false;
        try {
            if (fileChannel != null) {
                fileChannel.close();
            }
            if (fileOutputStream != null) {
                fileOutputStream.close();
            }
        } finally {
            fileChannel = null;
            fileOutputStream = null;
        }
    }

    /** Same as {@link #closeFile()} for cleanup paths where a close failure can only be logged. */
    private void closeFileQuietly() {
        try {
            closeFile();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端实现类

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.io.File;
import java.io.FileInputStream;
import java.nio.channels.FileChannel;

public class FileClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ChannelPipeline pipeline = ch.pipeline();
                             pipeline.addLast(new HttpClientCodec());
                             pipeline.addLast(new HttpObjectAggregator(819

2));
                             pipeline.addLast(new FileClientHandler());
                         }
                     });

            ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端处理器类

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;

import java.io.File;
import java.io.FileInputStream;
import java.nio.channels.FileChannel;

/**
 * Client-side handler of the file-transfer example.
 *
 * <p>As soon as the connection is active it sends a chunked HTTP POST whose {@code FilePath}
 * header names the file, streams the file content as HTTP chunks and terminates the body with a
 * last-content marker. The server's response is printed and the connection is then closed.
 *
 * <p>The body is sent right after the request head rather than after a {@code 200 OK}: a server
 * that answers only once the upload is complete would otherwise wait for the file while this
 * client waits for the response.
 *
 * <p>The file is read on the channel's event loop and chunks are written without checking
 * channel writability; that is acceptable for a small demo file only.
 */
public class FileClientHandler extends SimpleChannelInboundHandler<HttpResponse> {
    /** Number of file bytes sent per HTTP chunk. */
    private static final int CHUNK_SIZE = 8192;

    private FileChannel fileChannel;
    private FileInputStream fileInputStream;
    private File file;

    /**
     * Sends the request head followed by the complete file content.
     *
     * @param ctx context of the newly active channel
     * @throws Exception if the file cannot be opened or read
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        file = new File("path/to/file"); // Replace with the path of the file to transfer.
        fileInputStream = new FileInputStream(file);
        fileChannel = fileInputStream.getChannel();

        try {
            DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
            request.headers().set(HttpHeaderNames.HOST, "localhost");
            request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            request.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
            // Send only the file name: the server decides where to store it, and the local
            // directory layout is not disclosed to the peer.
            request.headers().set("FilePath", file.getName());
            ctx.write(request);

            while (fileChannel.position() < fileChannel.size()) {
                DefaultHttpContent chunk = new DefaultHttpContent(ctx.alloc().buffer(CHUNK_SIZE));
                // Fills the buffer's writable region and advances both the writer index and the
                // file position.
                int bytesRead = chunk.content().writeBytes(fileChannel, CHUNK_SIZE);
                if (bytesRead <= 0) {
                    // Nothing was read (file truncated meanwhile): drop the empty chunk and stop.
                    chunk.release();
                    break;
                }
                ctx.writeAndFlush(chunk);
            }

            ctx.writeAndFlush(new DefaultLastHttpContent());
        } finally {
            fileChannel.close();
            fileInputStream.close();
        }
    }

    /**
     * Prints the server's answer to the upload and closes the connection.
     *
     * @param ctx context of the channel the response arrived on
     * @param msg the HTTP response sent by the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) throws Exception {
        System.out.println("Server responded with status: " + msg.status());
        ctx.close();
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在示例3中,我们创建了一个文件传输服务器和客户端。服务器使用LengthFieldPrepender和LengthFieldBasedFrameDecoder处理文件的分块传输,并使用ChunkedWriteHandler实现文件的传输。服务器接收到客户端发送的文件,并将其保存到指定路径。

客户端使用HttpClientCodec处理HTTP请求和响应,创建一个包含文件路径的HttpRequest,并将文件内容以块的方式写入到ChannelHandlerContext中。客户端在发送完文件后关闭连接。

注意:在示例3中,请将文件路径替换为要传输的实际文件路径。

运行服务器和客户端代码后,客户端将文件传输给服务器,并将其保存到指定路径。在服务器和客户端的控制台上都将打印出传输完成的状态信息。