Netty (4): Optimization

Published: 2023-06-28 14:54:24  Author: 啊噢1231

1. Extending the serialization algorithm

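Serialization and deserialization are used mainly to convert the message body:

  • when serializing, the Java object is turned into the data to be transmitted (a byte[], JSON, and so on; in the end everything must become a byte[]);
  • when deserializing, the incoming body data is restored into a Java object so that it can be processed.

The current code supports only Java's built-in serialization and deserialization mechanism. The core code is: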

// Deserialization
byte[] body = new byte[bodyLength];
bytebuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// Serialization
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

To support more serialization algorithms, abstract a Serializer interface.

public interface Serializer {
    // Deserialization method
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // Serialization method
    <T> byte[] serialize(T object);
}

Code demo:

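import com.google.gson.Gson;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public interface Serializer {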

    <T> T deserialize(Class<T> clazz, byte[] bytes);

    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {
        Java {
            @Override
            @SuppressWarnings("unchecked")
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try {
                    ObjectInputStream oiStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return ((T) oiStream.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("Deserialization failed", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
                    ObjectOutputStream ooStream = new ObjectOutputStream(baoStream);
                    ooStream.writeObject(object);
                    return baoStream.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("Serialization failed", e);
                }
            }
        },

        JSON {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                String json = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(json, clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                String json = new Gson().toJson(object);
                return json.getBytes(StandardCharsets.UTF_8);
            }
        }
    }

}
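
One possible way to use the enum is to write the chosen algorithm's ordinal into the message header and look the implementation up again on the receiving side. The sketch below assumes a one-byte algorithm field, which the text above does not specify, and uses a trimmed-down stand-in (MiniSerializer, a hypothetical name) with only the JDK variant so that it runs without Gson.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for the Serializer.Algorithm enum above (JDK variant only).
enum MiniSerializer {
    JAVA;

    byte[] serialize(Object object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new RuntimeException("serialization failed", e);
        }
        return bytes.toByteArray();
    }

    <T> T deserialize(Class<T> clazz, byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return clazz.cast(in.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }

    // Sender side: the value to store in the (assumed) one-byte header field.
    byte headerByte() {
        return (byte) ordinal();
    }

    // Receiver side: map the header byte back to an algorithm, rejecting unknown ids.
    static MiniSerializer fromHeaderByte(byte b) {
        MiniSerializer[] all = values();
        if (b < 0 || b >= all.length) {
            throw new IllegalArgumentException("unknown serializer id: " + b);
        }
        return all[b];
    }
}
```

Because the lookup relies on ordinal(), reordering the enum constants would change the wire format; an explicit id per constant would avoid that at the cost of a little more code.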

2. Parameter tuning

2.1 CONNECT_TIMEOUT_MILLIS

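  • a SocketChannel parameter;
  • used when the client establishes a connection: if the connection cannot be made within the given number of milliseconds, the connect fails with a timeout exception (ConnectTimeoutException);
  • not to be confused with SO_TIMEOUT, which is mainly for blocking I/O: there, accept and read wait indefinitely by default, and SO_TIMEOUT caps that wait if you do not want to block forever.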
/**
 * The client configures parameters through .option().
 * The server uses .option() to configure the ServerSocketChannel
 * and .childOption() to configure the accepted SocketChannels.
 */
@Slf4j
public class TestConnectionTimeout {
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
            future.sync().channel().closeFuture().sync();
        } catch (Exception ex) {
            ex.printStackTrace();
            log.debug("timeout");
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
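
The SO_TIMEOUT bullet above concerns blocking I/O rather than Netty. As a minimal illustration using only the JDK, the sketch below sets SO_TIMEOUT on a blocking ServerSocket so that accept() gives up instead of waiting forever. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

class SoTimeoutDemo {
    // Returns true if accept() gave up after timeoutMillis because no client connected.
    static boolean acceptTimesOut(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: let the OS pick a free port
            server.setSoTimeout(timeoutMillis);           // SO_TIMEOUT for the blocking accept()
            server.accept();                               // nobody connects in this demo
            return false;
        } catch (SocketTimeoutException e) {
            return true;                                   // the wait was capped by SO_TIMEOUT
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SoTimeoutDemo.acceptTimesOut(200) blocks for roughly 200 ms and then reports the timeout, whereas without setSoTimeout the same accept() would block until a client arrived.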

2.2 SO_BACKLOG

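A ServerSocketChannel parameter.

  • First handshake: the client sends SYN to the server and moves to SYN_SENT; the server receives it, moves to SYN_RCVD, and puts the request into the SYN queue;
  • Second handshake: the server replies with SYN + ACK; the client receives it, moves to ESTABLISHED, and sends ACK to the server;
  • Third handshake: the server receives the ACK, moves to ESTABLISHED, and moves the request from the SYN queue to the accept queue.

Before Linux 2.2, the backlog size covered both queues. Since 2.2, they are controlled by two separate parameters:

  • SYN queue (half-open connection queue): its size is set by /proc/sys/net/ipv4/tcp_max_syn_backlog; when syncookies are enabled there is logically no upper limit and this setting is ignored;
  • accept queue (fully established connection queue): its size is set by /proc/sys/net/core/somaxconn; when listen is called, the kernel takes the smaller of the backlog argument and this system parameter. If the accept queue is full, the server rejects further connection attempts, which the client sees as a connection error.

In Netty, the size can be set with option(ChannelOption.SO_BACKLOG, value).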
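
To make the "smaller of the two" rule concrete, here is a small JDK-only sketch that reads somaxconn on Linux and computes the backlog the kernel would actually apply. The class and method names are hypothetical, and the sketch only models the min() rule described above, not any other kernel behaviour.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class BacklogSketch {
    // The kernel caps the backlog passed to listen() at somaxconn.
    static int effectiveBacklog(int requested, int somaxconn) {
        return Math.min(requested, somaxconn);
    }

    // Reads /proc/sys/net/core/somaxconn; returns -1 when it is unavailable (e.g. not Linux).
    static int readSomaxconn() {
        Path path = Paths.get("/proc/sys/net/core/somaxconn");
        try {
            return Integer.parseInt(new String(Files.readAllBytes(path)).trim());
        } catch (IOException | NumberFormatException e) {
            return -1;
        }
    }
}
```

For example, requesting a backlog of 1024 on a system whose somaxconn is 128 leaves an accept queue of 128, so raising only the Netty option would have no effect there.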

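2.3 ulimit -n

An operating-system parameter that limits the maximum number of file descriptors (FDs) a single process can have open at the same time. It is best set in the startup script.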
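
To check from inside the JVM which limit actually took effect, one option is the JDK's com.sun.management.UnixOperatingSystemMXBean. The sketch below is a minimal example; the class name FdLimitCheck is hypothetical, and on non-Unix platforms it simply returns -1.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

class FdLimitCheck {
    // Returns the maximum number of file descriptors for this process, or -1 if unknown.
    static long maxFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getMaxFileDescriptorCount();
        }
        return -1; // not a Unix-like JVM, so the limit is not exposed this way
    }
}
```

Logging this value at startup makes it easy to spot a server that was launched without the intended ulimit -n setting.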

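2.4 TCP_NODELAY

A SocketChannel parameter. With the operating-system default (false), Nagle's algorithm is on and small writes may be delayed; set it to true if that delay is unwanted. Note that the Netty 4 Javadoc for SocketChannelConfig states that Netty already defaults this option to true on most platforms (Android is the documented exception), so check the version you run.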
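
The same socket option exists on a plain JDK Socket, which makes it easy to see what is being toggled. This is a minimal sketch with a hypothetical class name; it uses an unconnected socket, so no network traffic is involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

class NoDelayDemo {
    // Disables Nagle's algorithm on a fresh, unconnected socket and reports the resulting setting.
    static boolean enableNoDelay() {
        try (Socket socket = new Socket()) {
            socket.setTcpNoDelay(true);     // TCP_NODELAY = true turns Nagle's algorithm off
            return socket.getTcpNoDelay();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In Netty the equivalent setting is ChannelOption.TCP_NODELAY, applied with option() on the client or childOption() on the server.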

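2.5 SO_SNDBUF & SO_RCVBUF

These set the upper bound of the sliding window. They generally do not need tuning, because the operating system adjusts them automatically.

  • SO_SNDBUF is a SocketChannel parameter;
  • SO_RCVBUF can be used as either a SocketChannel or a ServerSocketChannel parameter (setting it on the ServerSocketChannel is recommended).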
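
Before overriding these values it can help to look at what the operating system chose on its own. The following JDK-only sketch (hypothetical class name) reads both buffer sizes from a fresh socket; the numbers depend on the machine, so none are shown here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

class SocketBufferDemo {
    // Returns {SO_SNDBUF, SO_RCVBUF} as chosen by the operating system for a fresh socket.
    static int[] defaultBufferSizes() {
        try (Socket socket = new Socket()) {
            return new int[]{socket.getSendBufferSize(), socket.getReceiveBufferSize()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```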

2.6 ALLOCATOR

  • a SocketChannel parameter;
  • used to allocate ByteBuf instances; it is the allocator returned by ctx.alloc().

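2.7 RCVBUF_ALLOCATOR

  • a SocketChannel parameter;
  • controls the size of Netty's receive buffer;
  • responsible for allocating buffers for inbound data: it decides the size of the inbound buffer (and can adjust it dynamically) and always uses direct memory, while pooled versus unpooled is decided by the allocator.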
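
The idea of "dynamic adjustment" can be illustrated with a deliberately simplified sizing rule: grow the next buffer when a read fills the current one, and shrink it after two consecutive small reads. This is only a sketch of the concept under those assumed rules; it is not Netty's actual algorithm (Netty ships AdaptiveRecvByteBufAllocator for this purpose), and the class name is hypothetical.

```java
class AdaptiveSizeSketch {
    private final int min;
    private final int max;
    private int next;              // size to use for the next read
    private boolean smallReadSeen; // true after one small read; a second one triggers shrinking

    AdaptiveSizeSketch(int min, int initial, int max) {
        this.min = min;
        this.max = max;
        this.next = initial;
    }

    int nextBufferSize() {
        return next;
    }

    // Record how many bytes the last read actually produced and adapt the next size.
    void record(int bytesRead) {
        if (bytesRead >= next) {
            next = Math.min(max, next * 2);      // buffer was filled: grow immediately
            smallReadSeen = false;
        } else if (bytesRead <= next / 2) {
            if (smallReadSeen) {
                next = Math.max(min, next / 2);  // second small read in a row: shrink
                smallReadSeen = false;
            } else {
                smallReadSeen = true;            // first small read: wait for confirmation
            }
        } else {
            smallReadSeen = false;               // medium-sized read: keep the current size
        }
    }
}
```

Growing eagerly and shrinking cautiously keeps a single short message from immediately shrinking a buffer that is otherwise well sized for the traffic.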

3. RPC framework

To keep things simple, RPC request and response messages are added on top of the earlier chat project.

@Data
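public abstract class Message implements Serializable {

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    // Correlates a response with its request; the subclasses and handlers below rely on it
    private int sequenceId;

    protected Message() {
    }

    protected Message(int sequenceId) {
        this.sequenceId = sequenceId;
    }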
    
    private static final Map<Integer, Class<? extends Message>> MESSAGE_CLASSES = new ConcurrentHashMap<>();
    
    static {
        MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        MESSAGE_CLASSES.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

    public static Class<? extends Message> getMessageClass(int messageType) {
        return MESSAGE_CLASSES.get(messageType);
    }

    public abstract int getMessageType();
    
}

Request message:

@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
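    private String interfaceName; // fully qualified name of the interface to call; the server uses it to find the implementation
    private String methodName; // name of the method to call on the interface
    private Class<?> returnType; // return type of the method
    private Class<?>[] parameterTypes; // array of the method's parameter types
    private Object[] parameterValues; // array of the method's argument values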

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType,
                             Class<?>[] parameterTypes, Object[] parameterValues) {
        super(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValues = parameterValues;
    }


    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

Response message:

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    private Object returnValue;
    private Exception exceptionValue;
    
    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

Server skeleton:

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage responseMsg = new RpcResponseMessage();
        responseMsg.setSequenceId(message.getSequenceId());
        try {
            HelloService helloService = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            Method method = helloService.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            Object result = method.invoke(helloService, message.getParameterValues());

            responseMsg.setReturnValue(result);

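        } catch (Exception e) {
            e.printStackTrace();
            // InvocationTargetException wraps the real error, but other exceptions may have no cause
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            responseMsg.setExceptionValue(new Exception("Remote call failed: " + cause.getMessage()));
        }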
        ctx.writeAndFlush(responseMsg);
    }
}

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boos = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodec MESSAGE_CODEC = new MessageCodec();
        // RPC request message handler
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boos, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boos.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
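
The handler above relies on HelloService and ServicesFactory, which are not shown. A minimal sketch of what they might look like follows; the implementations here are assumptions made for illustration (a fixed in-memory registry and a trivial greeting), and ReflectiveCall is a hypothetical helper that repeats the handler's reflection steps outside Netty.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface HelloService {
    String sayHello(String name);
}

// Assumed implementation, for illustration only.
class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String name) {
        return "Hello, " + name;
    }
}

// Assumed registry: maps an interface to the single instance that implements it.
class ServicesFactory {
    private static final Map<Class<?>, Object> SERVICES = new ConcurrentHashMap<>();

    static {
        SERVICES.put(HelloService.class, new HelloServiceImpl());
    }

    static <T> T getService(Class<T> interfaceClass) {
        return interfaceClass.cast(SERVICES.get(interfaceClass));
    }
}

class ReflectiveCall {
    // Mirrors the handler: resolve the interface by name, find the method, invoke it.
    static Object invoke(String interfaceName, String methodName,
                         Class<?>[] parameterTypes, Object[] args) {
        try {
            Class<?> iface = Class.forName(interfaceName);
            Object service = ServicesFactory.getService(iface);
            // Looking the method up on the interface avoids depending on the implementation class
            Method method = iface.getMethod(methodName, parameterTypes);
            return method.invoke(service, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("remote call failed", e);
        }
    }
}
```

Resolving the service through its interface rather than casting to HelloService would let the same handler serve any registered interface.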

Client skeleton:

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(new MessageCodec());
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "com.clp.test3.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        Throwable cause = future.cause();
                        log.error("error: {}", cause.getMessage());
                    }
                }
            });

            channel.closeFuture().sync().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    group.shutdownGracefully();
                }
            });

        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

RPC example:

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    // Map<sequenceId, Promise that will hold the result>, used to receive results
    public static final Map<Integer, Promise<Object>> PROMISE_MAP = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        // Take the pending (still empty) promise
        Promise<Object> promise = PROMISE_MAP.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }

        log.debug("{}", msg);
    }
}
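
The correlation pattern used by PROMISE_MAP does not depend on Netty. As a rough, JDK-only analogue, the sketch below swaps Netty's Promise for CompletableFuture; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingCalls {
    private static final Map<Integer, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    // Caller side: register BEFORE sending so that a fast response cannot be missed.
    static CompletableFuture<Object> register(int sequenceId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        PENDING.put(sequenceId, future);
        return future;
    }

    // Receiver side: remove and complete the matching entry; returns false for unknown ids.
    static boolean complete(int sequenceId, Object returnValue, Exception error) {
        CompletableFuture<Object> future = PENDING.remove(sequenceId);
        if (future == null) {
            return false;
        }
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(returnValue);
        }
        return true;
    }
}
```

Removing the entry when the response arrives, as the handler above also does, keeps the map from growing with every call.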

@Slf4j
public class RpcClientManager {
    public static void main(String[] args) {
        HelloService helloService = getProxyService(HelloService.class);
        helloService.sayHello("张三");
        helloService.sayHello("李四");
    }

    // Create a proxy that implements the interface to be called remotely
    @SuppressWarnings("unchecked")
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        Object proxy = Proxy.newProxyInstance(loader, interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 1. Convert the method call into a message object
                int sequenceId = SequenceIdGenerator.nextId();
                RpcRequestMessage message = new RpcRequestMessage(
                        sequenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args
                );
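                // 2. Prepare an empty Promise to receive the result; the event loop passed in is the thread
                //    on which the promise's listeners are called back.
                //    Register it before sending so that a fast response cannot arrive before the promise is in the map
                DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
                RpcResponseMessageHandler.PROMISE_MAP.put(sequenceId, promise);
                // 3. Send the message object
                getChannel().writeAndFlush(message);
                // Wait for the promise's result (sync() throws on failure, await() does not)
                promise.await();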
                if (promise.isSuccess()) {
                    // The call succeeded
                    return promise.getNow();
                } else {
                    // The call failed
                    throw new RuntimeException(promise.cause());
                }
            }
        });
        return ((T) proxy);
    }

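    // volatile so that the double-checked locking in getChannel() publishes the channel safely
    private static volatile Channel channel = null;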
    private static final Object LOCK = new Object();

    // Get the single shared channel object
    public static Channel getChannel() {
        if (channel != null) return channel;
        synchronized (LOCK) {
            if (channel != null) return channel;
            initChannel();
            return channel;
        }
    }

    // Initialize the channel
    static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProtocolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(new MessageCodec());
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
            channel.closeFuture().addListener((ChannelFutureListener) future -> group.shutdownGracefully());
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}
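
SequenceIdGenerator is used above but not shown. A minimal sketch, assuming all that is needed is a thread-safe counter that is unique within one client process, could look like this:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Assumed implementation: a process-wide, thread-safe counter.
class SequenceIdGenerator {
    private static final AtomicInteger ID = new AtomicInteger();

    // Returns the next id; safe to call from several threads at once.
    static int nextId() {
        return ID.incrementAndGet();
    }
}
```

An atomic counter is enough here because the id only has to distinguish requests that are in flight on the same connection at the same time.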