直接上代码
创建客户端,连接到服务端,并发送消息:
/** 发送一条消息到socket服务端*/ private void sendOne(String rawMessage) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new MyDecoder()) .addLast(new MyEncoder()) .addLast(new MyClientHandler()); } }); ChannelFuture future = bootstrap.connect("192.168.1.101", 4000).sync(); //发送消息 future.channel().writeAndFlush(rawMessage); future.channel().closeFuture().sync(); } catch (Exception e) { log.error("发送消息异常", e); } finally { group.shutdownGracefully(); } }
//解码 服务端返回信息解码工具 public class MyDecoder extends DelimiterBasedFrameDecoder { private static final int MAX_FRAME_LENGTH = 4096; //和服务端约定的起始字符和中止字符 private static char startByte = (char)11; private static char endByte1 = (char)28; private static char endByte2 = (char)13; public MyDecoder() { super(MAX_FRAME_LENGTH, true, Unpooled.copiedBuffer( new char[]{endByte1, endByte2}, StandardCharsets.UTF_8)); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { ByteBuf buf = (ByteBuf) super.decode(ctx, buffer); if (buf != null) { try { int pos = buf.bytesBefore((byte) startByte); if (pos >= 0) { ByteBuf msg = buf.readerIndex(pos + 1).slice(); return asString(msg); } else { throw new DecoderException("找不到开始符号:" + Integer.toHexString(startByte)); } } finally { buf.release(); } } return null; } private String asString(ByteBuf msg) { String text = msg.toString(StandardCharsets.UTF_8); return text; } }
//编码 对发送的消息进行编码 public class MyEncoder extends MessageToByteEncoder<Object> { private static char startByte = (char)11; private static char endByte1 = (char)28; private static char endByte2 = (char)13; @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object message, ByteBuf byteBuf) throws Exception { byte[] body; if (message instanceof String) { body = ((String) message).getBytes(StandardCharsets.UTF_8); } else if (message instanceof byte[]) { body = (byte[]) message; } else { throw new IllegalArgumentException("不支持的类型:" + message.getClass().getCanonicalName()); } byteBuf.writeByte(startByte); byteBuf.writeBytes(body); byteBuf.writeByte(endByte1); byteBuf.writeByte(endByte2); } }
//收到服务端返回消息后的处理,此处逻辑为直接关闭通道,即结束通讯 @ChannelHandler.Sharable public class MyClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //收到服务器返回信息后,关闭通道。 ctx.channel().close(); } }
以上方式为通过各方信息整合后,得到的符合自身需求的一个Netty客户端发送消息的编码,经过了测试和验证,满足发送一条消息到服务端,收到服务端返回信息后关闭通道的需要。