Nacos Source Code (5): gRPC Server and Client

Published: 2023-09-04 10:56:48 Author: 用户不存在!

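Nacos 2.x added gRPC communication between the server and the client. This article walks through the 2.0.2 source code and briefly analyzes how that communication works:

  • Server startup
  • Client connection
  • Client heartbeat
  • Server health check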

Server

The proto file

The file api/src/main/proto/nacos_grpc_service.proto:

syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";

message Metadata {
  string type = 3; // actual type of the request/response
  string clientIp = 8;
  map<string, string> headers = 7;
}

// Request/response body at the gRPC transport layer
message Payload {
  Metadata metadata = 2;
  // Business-layer request/response body; it must be deserialized using type
  google.protobuf.Any body = 3;
}

service RequestStream {
  // build a streamRequest
  rpc requestStream (Payload) returns (stream Payload) {
  }
}

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}

service BiRequestStream {
  // Sends a commonRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}

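The file defines the service and message structures of the transport layer. Serialization and deserialization of business-layer requests and responses are done by Nacos with utility classes inside RequestAcceptor/Connection, and business-layer requests are dispatched to their handlers in RequestAcceptor.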
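
The idea of a transport envelope that carries a type name next to an opaque body can be sketched in plain Java. This is only an illustration under stated assumptions: Envelope, EchoRequest, and the DECODERS map are hypothetical names, the body is encoded as UTF-8 text for simplicity, and none of this is the Nacos utility code itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the Payload idea: a type name plus opaque body bytes. */
final class PayloadSketch {

    /** Transport-level envelope: "type" plays the role of Metadata.type, "body" that of Payload.body. */
    static final class Envelope {
        final String type;
        final byte[] body;

        Envelope(String type, byte[] body) {
            this.type = type;
            this.body = body;
        }
    }

    /** A made-up business request used only for illustration. */
    static final class EchoRequest {
        final String text;

        EchoRequest(String text) {
            this.text = text;
        }
    }

    /** Maps a type name to the function that rebuilds the business object from bytes. */
    private static final Map<String, Function<byte[], Object>> DECODERS = new HashMap<>();

    static {
        DECODERS.put(EchoRequest.class.getSimpleName(),
                bytes -> new EchoRequest(new String(bytes, StandardCharsets.UTF_8)));
    }

    /** Wraps a business object: records its simple class name and serializes the body. */
    static Envelope wrap(EchoRequest request) {
        return new Envelope(EchoRequest.class.getSimpleName(),
                request.text.getBytes(StandardCharsets.UTF_8));
    }

    /** Unwraps an envelope by looking up the decoder registered for its type name. */
    static Object unwrap(Envelope envelope) {
        Function<byte[], Object> decoder = DECODERS.get(envelope.type);
        if (decoder == null) {
            throw new IllegalArgumentException("unknown payload type: " + envelope.type);
        }
        return decoder.apply(envelope.body);
    }
}
```

Calling PayloadSketch.unwrap(PayloadSketch.wrap(new PayloadSketch.EchoRequest("hello"))) goes through the type lookup and yields an EchoRequest again. The receiver never needs a separate service method per business type, which is why the proto file can stay this small.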

Server startup

Server class hierarchy

BaseRpcServer
  |-- BaseGrpcServer
     |-- GrpcSdkServer
     |-- GrpcClusterServer

This section looks at the GrpcSdkServer implementation.

The GrpcSdkServer class

@Service
public class GrpcSdkServer extends BaseGrpcServer {

    // With the default main port 8848, the SDK server therefore listens on port 9848
    private static final int PORT_OFFSET = 1000;

    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }

    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}
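
The port arithmetic can be checked with a tiny sketch. It assumes the main port is the default 8848; RpcPortSketch and rpcPort are hypothetical names used only for illustration, and the range check is an addition of this sketch.

```java
/** Illustrative only: derives the gRPC listening port from a main port and an offset. */
final class RpcPortSketch {

    /** Offset used by the SDK-facing server in the snippet above. */
    static final int SDK_PORT_OFFSET = 1000;

    /** Returns mainPort + offset, rejecting results outside the valid TCP port range. */
    static int rpcPort(int mainPort, int offset) {
        int port = mainPort + offset;
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return port;
    }
}
```

With these assumptions, RpcPortSketch.rpcPort(8848, RpcPortSketch.SDK_PORT_OFFSET) evaluates to 9848.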

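Most of the startup logic lives in BaseGrpcServer.

The BaseGrpcServer class

Most of the gRPC server startup logic is in this class's startServer method.

  1. Register the RequestAcceptor instances that handle requests with the HandlerRegistry
    • GrpcRequestAcceptor handles ordinary business requests
    • GrpcBiStreamRequestAcceptor handles connection setup requests: it obtains the Channel, creates a GrpcConnection, and registers it with ConnectionManager. All later messages to the client are sent through this GrpcConnection
  2. Create the gRPC Server object
    • Set the port and executor
    • Set the HandlerRegistry
    • Add a ServerTransportFilter to run business logic when a connection is established or terminated
  3. Start the Server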

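The GrpcRequestAcceptor class

This class extends the gRPC service and overrides the request method:

  1. Parse the Payload to get the data type of the request body
  2. Get the matching RequestHandler from RequestHandlerRegistry
  3. Deserialize the request body into an object of that request type
  4. Call handleRequest to process the request and return the response

Request handling code: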

Request request = (Request) parseObj;
try {
    // Look up the Connection
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // Refresh the active time; the health check described later relies on this timestamp
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // Process the request with the RequestHandler
    Response response = requestHandler.handleRequest(request, requestMeta);
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
            (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
            e.getMessage()));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}

RequestHandler

RequestHandler is the abstract class Nacos uses to handle gRPC requests at the business layer:

public abstract class RequestHandler<T extends Request, S extends Response> {

    @Autowired
    private RequestFilters requestFilters;

    /**
     * Handler request.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
        }
        return handle(request, meta);
    }

    /**
     * Handler request.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
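
The control flow of handleRequest (filters first, handler last) can be sketched without Spring or Nacos types. FilterChainSketch and Result are hypothetical names, requests are plain strings, and filters are modeled as functions. The sketch reproduces two details from the code above: a non-null, unsuccessful filter result short-circuits the handler, while a filter that throws is only logged and skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FilterChainSketch {

    /** Simplified response: only carries a success flag and a message. */
    static final class Result {
        final boolean success;
        final String message;

        Result(boolean success, String message) {
            this.success = success;
            this.message = message;
        }
    }

    /** Filters may return null (no opinion), a successful result, or a failed result. */
    private final List<Function<String, Result>> filters = new ArrayList<>();

    void addFilter(Function<String, Result> filter) {
        filters.add(filter);
    }

    /** Runs the filters in order, then the handler, mirroring the shape of handleRequest. */
    Result handleRequest(String request, Function<String, Result> handler) {
        for (Function<String, Result> filter : filters) {
            try {
                Result filterResult = filter.apply(request);
                if (filterResult != null && !filterResult.success) {
                    return filterResult; // a rejecting filter short-circuits the handler
                }
            } catch (RuntimeException e) {
                // a throwing filter is logged and skipped; it does not reject the request
                System.err.println("filter error: " + e);
            }
        }
        return handler.apply(request);
    }
}
```

One consequence worth noticing: because exceptions in a filter are swallowed, a filter that is meant to block a request has to return a failed result rather than throw.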

Nacos manages all RequestHandler instances with RequestHandlerRegistry, which is backed by a Map:

// key: simple name of the Request type
// value: the RequestHandler implementation instance
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();

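RequestHandlerRegistry scans the Spring container for all RequestHandler beans, resolves the simple name of the Request type each implementation handles, and registers the handler in registryHandlers under that name.

GrpcRequestAcceptor looks up the matching RequestHandler through RequestHandlerRegistry's getByRequestType method: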

public RequestHandler getByRequestType(String requestType) {
    return registryHandlers.get(requestType);
}
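
A minimal sketch of a registry keyed by the simple class name of the request follows. It is a simplification under stated assumptions: handlers are passed in as a list instead of being discovered in a Spring container, each handler reports its request class through a hypothetical requestType() method, and PingRequest and PingHandler are made-up types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HandlerRegistrySketch {

    /** Marker type for business requests; PingRequest is a made-up example. */
    interface Request { }

    static final class PingRequest implements Request { }

    /** A handler states which request class it serves (an assumption of this sketch). */
    interface Handler {
        Class<? extends Request> requestType();

        String handle(Request request);
    }

    static final class PingHandler implements Handler {
        @Override
        public Class<? extends Request> requestType() {
            return PingRequest.class;
        }

        @Override
        public String handle(Request request) {
            return "pong";
        }
    }

    private final Map<String, Handler> registryHandlers = new HashMap<>();

    /** Registers each handler under the simple name of its request class. */
    HandlerRegistrySketch(List<Handler> handlers) {
        for (Handler handler : handlers) {
            registryHandlers.put(handler.requestType().getSimpleName(), handler);
        }
    }

    /** Returns the handler for the type name, or null when none is registered. */
    Handler getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }
}
```

A lookup with "PingRequest" returns the PingHandler instance, and an unknown type name returns null, so the caller has to deal with the missing-handler case itself.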

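Establishing a connection

When the Server is initialized, Nacos registers a ServerInterceptor and a ServerTransportFilter. When a connection is established, these components bind conn_id, remote_ip, remote_port, local_port, ctx_channel, and so on to the Context.

Creating the GrpcConnection

After the connection is established, the client sends a ConnectionSetupRequest, which the server handles with GrpcBiStreamRequestAcceptor:

  1. Get conn_id, remote_ip, remote_port, local_port, and so on
  2. Parse the request to get clientIp
  3. Build a GrpcConnection object holding the basic information (conn_id, remote_ip, remote_port, local_port, clientIp, client version, and so on) together with the StreamObserver and Channel
  4. Register the GrpcConnection with ConnectionManager

Creating the Client

Registration in ConnectionManager triggers ConnectionBasedClientManager's clientConnected method, which creates the Client object: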

public void clientConnected(Connection connect) {
    // The connection type, grpc here
    String type = connect.getMetaInfo().getConnectType();
    // This returns a ConnectionBasedClientFactory
    ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
    // This creates a ConnectionBasedClient
    clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
}

public boolean clientConnected(Client client) {
    if (!clients.containsKey(client.getClientId())) {
        // Register in the client set,
        // a Map that maintains clientId -> client
        clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
    }
    return true;
}

Health check

ConnectionManager

This class manages client connections and provides operations such as registering and removing connections:

// client IP -> connection count, used to implement ConnectionLimitRule
private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);
// connectionId -> Connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

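The Connection abstract class implements the Requester interface, so it can send requests to the client and manage connection state.

GrpcConnection extends the Connection abstract class.

As described above, after the connection is established the client sends a ConnectionSetupRequest. On receiving it, the server extracts the connectionId, client IP, client port, client version, Channel, and so on, wraps them in a GrpcConnection object, and registers it with ConnectionManager.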
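
How the two maps cooperate can be sketched as follows. ConnectionTableSketch is a hypothetical, simplified model: it stores only the client IP per connection id instead of a Connection object and leaves out limit rules and listeners.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ConnectionTableSketch {

    // connectionId -> client IP (stands in for the Connection object)
    private final Map<String, String> connections = new ConcurrentHashMap<>();
    // client IP -> number of live connections from that IP
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();

    /** Registers a connection; returns false if the id is already present. */
    synchronized boolean register(String connectionId, String clientIp) {
        if (connections.putIfAbsent(connectionId, clientIp) != null) {
            return false;
        }
        connectionForClientIp.computeIfAbsent(clientIp, ip -> new AtomicInteger()).incrementAndGet();
        return true;
    }

    /** Removes a connection and drops the per-IP counter once it reaches zero. */
    synchronized void unregister(String connectionId) {
        String clientIp = connections.remove(connectionId);
        if (clientIp == null) {
            return;
        }
        AtomicInteger counter = connectionForClientIp.get(clientIp);
        if (counter != null && counter.decrementAndGet() <= 0) {
            connectionForClientIp.remove(clientIp);
        }
    }

    /** Current number of connections from the given IP (0 when unknown). */
    int countForIp(String clientIp) {
        AtomicInteger counter = connectionForClientIp.get(clientIp);
        return counter == null ? 0 : counter.get();
    }
}
```

Keeping a separate per-IP counter means a limit rule can be evaluated with a single map lookup instead of scanning every connection.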

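Periodic health check task

During startup, ConnectionManager schedules a periodic task that checks the per-IP connection count and whether each connection is still active. It runs every 3 seconds:

  1. Iterate over the connections, use connectionLimitRule to find those that need to be reset, and send a reset request to those clients
  2. Read each connection's last active time (updated on every client request). If the connection has been inactive for more than 20 seconds, send a probe request to the client, and close the connection if the probe fails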
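
The idle-detection part of this task (step 2) can be sketched as a pure function. IdleCheckSketch and findIdle are hypothetical names. The 20-second threshold comes from the text, and the strict "more than" comparison is an assumption of this sketch, as is representing connections by a map of id to last active time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IdleCheckSketch {

    /** Idle threshold from the text: connections silent for more than 20 seconds get probed. */
    static final long KEEP_ALIVE_MILLIS = 20_000L;

    /** Returns the ids whose last active time is more than KEEP_ALIVE_MILLIS before nowMillis. */
    static List<String> findIdle(Map<String, Long> lastActiveMillis, long nowMillis) {
        List<String> idle = new ArrayList<>();
        // the TreeMap copy is only there to make the result order deterministic
        for (Map.Entry<String, Long> entry : new TreeMap<>(lastActiveMillis).entrySet()) {
            if (nowMillis - entry.getValue() > KEEP_ALIVE_MILLIS) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }
}
```

The ids returned by findIdle are the connections that would receive a probe request; only a failed probe leads to the connection being closed.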

Disconnection

Processing flow

When the gRPC transport layer detects that a connection has been closed, it triggers GrpcServer's transportTerminated event:

public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // Remove the connection through ConnectionManager
        connectionManager.unregister(connectionId);
    }
}

ConnectionManager removes the connection:

public synchronized void unregister(String connectionId) {
    // Remove the connection from the connection map
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // Decrement the connection count for this IP
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // Notify the ClientManager layer to remove the client object
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

The clientDisconnected method of ConnectionBasedClientManager:

public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // Publish a ClientDisconnectEvent
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

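Event handling flow

ClientDisconnectEvent: Client disconnect event. Happened when Client disconnect with server.

The event is handled by:

  • ClientServiceIndexesManager - maintains registration and subscription relationships
  • DistroClientDataProcessor - synchronizes client data to all server nodes
  • NamingMetadataManager - maintains the service and instance metadata registered by clients

Client

Establishing a connection

The ServerListFactory interface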

Server list factory. Use to inner client to connected and switch servers.

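It manages the set of server addresses; RpcClient uses this interface to pick an available server address.

public interface ServerListFactory {

    // Select an available server address, in ip:port format
    String genNextServer();

    // Return the server address currently in use, in ip:port format
    String getCurrentServer();

    // Return the list of server addresses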
    List<String> getServerList();
}

The ServerListManager class

It parses the Properties parameters and builds the list of server addresses.
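
One possible way to satisfy the three ServerListFactory methods is a round-robin list. The sketch below is not the ServerListManager implementation: RoundRobinServerListSketch is a hypothetical class, the comma-separated input format is an assumption, and the real class may pick servers differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinServerListSketch {

    private final List<String> serverList;
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    /** Parses a comma-separated "ip:port" list, e.g. "10.0.0.1:8848,10.0.0.2:8848". */
    RoundRobinServerListSketch(String serverAddr) {
        List<String> servers = new ArrayList<>();
        for (String server : serverAddr.split(",")) {
            String trimmed = server.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server address configured");
        }
        this.serverList = Collections.unmodifiableList(servers);
    }

    /** Advances to the next address, wrapping around at the end of the list. */
    String genNextServer() {
        int index = currentIndex.updateAndGet(i -> (i + 1) % serverList.size());
        return serverList.get(index);
    }

    /** Returns the address currently selected, without advancing. */
    String getCurrentServer() {
        return serverList.get(currentIndex.get());
    }

    /** Returns the full, read-only address list. */
    List<String> getServerList() {
        return serverList;
    }
}
```

A client that fails to reach getCurrentServer() would call genNextServer() and retry with the returned address, eventually cycling through the whole list.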

Creating the RpcClient

RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);

The createClient method:

public static RpcClient createClient(String clientName,
                                     ConnectionType connectionType,
                                     Map<String, String> labels) {
    return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
        if (client == null) {
            if (ConnectionType.GRPC.equals(connectionType)) {
                // The object created is a GrpcSdkClient
                client = new GrpcSdkClient(clientNameInner);
            }
            if (client == null) {
                throw new UnsupportedOperationException(
                    "unsupported connection type :" + connectionType.getType());
            }
            client.labels(labels);
        }
        return client;
    });
}
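
The create-once-per-name behaviour of createClient rests on ConcurrentHashMap.compute, which runs the remapping function atomically for a given key. A minimal sketch follows, with a hypothetical Client class and a plain string in place of ConnectionType:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ClientCacheSketch {

    /** Minimal stand-in for an RPC client; it only remembers its name. */
    static final class Client {
        final String name;

        Client(String name) {
            this.name = name;
        }
    }

    private static final Map<String, Client> CLIENT_MAP = new ConcurrentHashMap<>();

    /** Creates the client on first use and returns the cached instance afterwards. */
    static Client createClient(String clientName, String connectionType) {
        return CLIENT_MAP.compute(clientName, (name, existing) -> {
            if (existing != null) {
                return existing;
            }
            if (!"GRPC".equals(connectionType)) {
                throw new UnsupportedOperationException(
                        "unsupported connection type: " + connectionType);
            }
            return new Client(name);
        });
    }
}
```

Two calls with the same name return the same instance. If the remapping function throws, compute rethrows the exception and leaves the map unchanged, so a failed creation does not leave a stale entry behind.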

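The Client then needs to be initialized:

  1. Set the ServerListFactory, used to select a server address

  2. Register ServerRequestHandler instances, which handle requests sent by the server, such as service subscription callbacks and configuration change notifications

  3. Register a ConnectionEventListener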

    rpcClient.serverListFactory(serverListFactory);
    rpcClient.start();
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
    
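  4. Start the Client (in the snippet above, start() is called right after the ServerListFactory is set)

    • Start the ConnectionEvent handling thread
    • Start the health check (heartbeat) thread
    • Create the GrpcConnection

Creating the GrpcConnection

  1. Create the gRPC RequestFutureStub and BiRequestStreamStub
  2. Send a ServerCheckRequest to verify that the server is available
  3. Create the GrpcConnection object, which wraps serverInfo, executor, connectionId, channel, and so on
  4. Bind request handling logic to the BiRequestStreamStub: requests sent by the server are processed by the ServerRequestHandler instances
  5. Send a ConnectionSetupRequest so that the server creates and registers its GrpcConnection
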
if (grpcExecutor == null) {
    int threadNumber = ThreadUtils.getSuitableThreadCount(8);
    grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                    .build());
    grpcExecutor.allowCoreThreadTimeOut(true);
}
// 8848+1000
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
// Send a ServerCheckRequest to verify that the server is available
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
    return null;
}

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
        .newStub(newChannelStubTemp.getChannel());
// Create the GrpcConnection object, which wraps serverInfo, executor, connectionId, channel, and so on
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

// create stream request and bind connection event to this connection
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// send a setup request
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);

Sending requests

The Requester interface

This interface defines the methods for sending requests:

public interface Requester {

    /**
     * send request.
     *
     * @param request      request.
     * @param timeoutMills mills of timeouts.
     * @return response  response returned.
     * @throws NacosException exception throw.
     */
    Response request(Request request, long timeoutMills) throws NacosException;

    /**
     * send request.
     *
     * @param request request.
     * @return request future.
     * @throws NacosException exception throw.
     */
    RequestFuture requestFuture(Request request) throws NacosException;

    /**
     * send async request.
     *
     * @param request         request.
     * @param requestCallBack callback of request.
     * @throws NacosException exception throw.
     */
    void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;

    /**
     * close connection.
     */
    void close();
}

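GrpcConnection implementation

GrpcConnection implements the three request methods of the Requester interface by sending requests through the gRPC stub. Take the request method as an example: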

public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        // request is synchronous, so block here until the response arrives
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    return (Response) GrpcUtils.parse(grpcResponse);
}

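The other two methods:

  • requestFuture: after sending the request with grpcFutureServiceStub.request(grpcRequest), it creates and returns a RequestFuture
  • asyncRequest: after sending the request with grpcFutureServiceStub.request(grpcRequest), it attaches a listener callback to requestFuture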
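
The three calling styles differ only in what they do with the pending reply. The sketch below shows that relationship with the JDK's CompletableFuture instead of the gRPC stub and Guava's ListenableFuture. RequestStylesSketch and send are hypothetical, and the "transport" simply echoes the request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class RequestStylesSketch {

    /** Hypothetical transport: "sends" a request and returns a future for the reply. */
    static CompletableFuture<String> send(String request) {
        return CompletableFuture.supplyAsync(() -> "reply to " + request);
    }

    /** Synchronous style: block until the reply arrives or the timeout expires. */
    static String request(String request, long timeoutMillis) {
        try {
            return send(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request failed", e);
        }
    }

    /** Future style: hand the pending reply back to the caller. */
    static CompletableFuture<String> requestFuture(String request) {
        return send(request);
    }

    /** Callback style: register a listener that runs when the reply is ready. */
    static void asyncRequest(String request, Consumer<String> onResponse) {
        send(request).thenAccept(onResponse);
    }
}
```

All three methods start from the same send call; only the synchronous variant ties up the calling thread, which is why it needs a timeout.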

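Heartbeat (healthCheck)

As mentioned earlier, starting the RpcClient also starts a health check task. It runs every 5 seconds and checks the health of the connection held by the client:

// keepAliveTime defaults to 5000L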
ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
    // check alive time.
    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
        // Health check
        boolean isHealthy = healthCheck();
        if (!isHealthy) {
            if (currentConnection == null) {
                continue;
            }

            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                break;
            }

            // Prepare to reconnect
            boolean success = RpcClient.this.rpcClientStatus
                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (success) {
                reconnectContext = new ReconnectContext(null, false);
            } else {
                continue;
            }
        } else {
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
        }
    } else {
        continue;
    }
}

if (reconnectContext.serverInfo != null) {
    // clear recommend server if server is not in server list.
    boolean serverExist = false;
    for (String server : getServerListFactory().getServerList()) {
        ServerInfo serverInfo = resolveServerInfo(server);
        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
        }
    }
    if (!serverExist) {
        reconnectContext.serverInfo = null;
    }
}
// Reconnect
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);

The healthCheck method:

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok ,also check connection is register.
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}
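
Putting the loop and healthCheck together, one pass of the client-side keep-alive logic can be sketched as below. This is a simplified model under stated assumptions: KeepAliveSketch, Action, and signalReconnect are hypothetical names, the clock and the health probe are injected so the behaviour can be exercised deterministically, and the SHUTDOWN and null-connection branches of the real loop are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class KeepAliveSketch {

    enum Action { RECONNECT, SKIP }

    private final BlockingQueue<String> reconnectionSignal = new LinkedBlockingQueue<>();
    private final long keepAliveMillis;
    private final LongSupplier clock;
    private final BooleanSupplier healthCheck;
    private long lastActiveTimeStamp;

    KeepAliveSketch(long keepAliveMillis, LongSupplier clock, BooleanSupplier healthCheck) {
        this.keepAliveMillis = keepAliveMillis;
        this.clock = clock;
        this.healthCheck = healthCheck;
        this.lastActiveTimeStamp = clock.getAsLong();
    }

    /** Asks the loop to reconnect without waiting for the next health check. */
    void signalReconnect(String reason) {
        reconnectionSignal.offer(reason);
    }

    /** One pass: wait for a signal, otherwise probe the connection if it has been idle. */
    Action runOnce() throws InterruptedException {
        String signal = reconnectionSignal.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        if (signal != null) {
            return Action.RECONNECT; // explicit reconnect request
        }
        if (clock.getAsLong() - lastActiveTimeStamp < keepAliveMillis) {
            return Action.SKIP; // recently active, no probe needed
        }
        if (healthCheck.getAsBoolean()) {
            lastActiveTimeStamp = clock.getAsLong();
            return Action.SKIP; // probe succeeded
        }
        return Action.RECONNECT; // probe failed
    }
}
```

The blocking poll doubles as the timer: the loop wakes up either when someone requests a reconnect or when keepAliveMillis has elapsed, so no separate scheduler is needed.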