【Netty】使用Netty搭建简易Sokect客户端

发布时间 2023-11-02 10:41:48作者: D:

直接上代码

创建客户端,连接到服务端,并发送消息:

  
/** 发送一条消息到socket服务端*/
private void sendOne(String rawMessage) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast(new MyDecoder())
                                    .addLast(new MyEncoder())
                                    .addLast(new MyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("192.168.1.101", 4000).sync();
            //发送消息
            future.channel().writeAndFlush(rawMessage);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("发送消息异常", e);
        } finally {
            group.shutdownGracefully();
        }
    }
//解码  服务端返回信息解码工具
public class MyDecoder extends DelimiterBasedFrameDecoder {
    private static final int MAX_FRAME_LENGTH = 4096;
    
    //和服务端约定的起始字符和中止字符
    private static char startByte = (char)11;
    private static char endByte1 = (char)28;
    private static char endByte2 = (char)13;
    
    public MyDecoder() {
        super(MAX_FRAME_LENGTH, true, Unpooled.copiedBuffer(
                new char[]{endByte1, endByte2}, StandardCharsets.UTF_8));
    }
    
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        ByteBuf buf = (ByteBuf) super.decode(ctx, buffer);
        if (buf != null) {
            try {
                int pos = buf.bytesBefore((byte) startByte);
                if (pos >= 0) {
                    ByteBuf msg = buf.readerIndex(pos + 1).slice();
                    return asString(msg);
                } else {
                    throw new DecoderException("找不到开始符号:" + Integer.toHexString(startByte));
                }
            } finally {
                buf.release();
            }
        }
        return null;
    }
    
    private String asString(ByteBuf msg) {
        String text = msg.toString(StandardCharsets.UTF_8);
        return text;
    }
}

  

//编码 对发送的消息进行编码
public class MyEncoder extends MessageToByteEncoder<Object> {
    private static char startByte = (char)11;
    private static char endByte1 = (char)28;
    private static char endByte2 = (char)13;
    
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object message, ByteBuf byteBuf) throws Exception {
        byte[] body;
        if (message instanceof String) {
            body = ((String) message).getBytes(StandardCharsets.UTF_8);
        } else if (message instanceof byte[]) {
            body = (byte[]) message;
        } else {
            throw new IllegalArgumentException("不支持的类型:"
                    + message.getClass().getCanonicalName());
        }
        byteBuf.writeByte(startByte);
        byteBuf.writeBytes(body);
        byteBuf.writeByte(endByte1);
        byteBuf.writeByte(endByte2);
    }
}

  

//收到服务端返回消息后的处理,此处逻辑为直接关闭通道,即结束通讯
@ChannelHandler.Sharable
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //收到服务器返回信息后,关闭通道。
        ctx.channel().close();
    }
}

  

以上方式为通过各方信息整合后,得到的符合自身需求的一个Netty客户端发送消息的编码,经过了测试和验证,满足发送一条消息到服务端,收到服务端返回信息后关闭通道的需要。