Nacos源码(二):客户端服务注册源码分析

发布时间 2023-11-29 10:02:49作者: 无虑的小猪

  当生产者启动时,会自动注册到Nacos,如下图的service-provider:

0

  客户端的服务注册的都做了哪些事情。

1、服务注册源码分析入口及整体步骤解析

1.1、整体步骤

  从nacos-2.2.0源码包中提供的nacos-example模板作为切入点,NamingExample详情如下:

在示例中,服务注册分为3步:

  1、服务注册信息封装;

  2、通过服务注册信息创建注册发现服务 - NamingService;

  3、通过注册发现服务 - NamingService创建实例Instance,并完成实例的注册。

1.2、注册需要信息

  构建NamingService需要NacosServer的连接信息,注册需要服务实例Instance。

1.2.1、NacosServer的相关信息信息

  在示例中,Nacos Server的连接信息存储在Properties中,包含 Server地址 - serverAddr,命名空间 namespace。在NamingService实例化时,加载Nacos Server信息,

0

  SearchableProperties#derive 详情如下:

// 加载Properties和nacos-client的配置
public NacosClientProperties derive(Properties properties) {
    // 加载nacos-client的配置,若无自定义配置,使用默认的配置nacos_default_setting.properties
    final NacosClientProperties nacosClientProperties = this.derive();
    // 加载Nacos Server的地址和命名空间
    nacosClientProperties.addProperties(properties);
    return nacosClientProperties;
}

  nacos的默认配置为nacos-client模块的 src/main/resources/nacos_default_setting.properties,详情如下:

contextPath=/nacos
nacos.cache.data.init.snapshot=true
configLongPollTimeout=30000
configRetryTime=2000
enableRemoteSyncConfig=false
maxRetry=3
limitTime=5
isUseEndpointParsingRule=true
isMultiInstance=false
com.alibaba.nacos.naming.log.filename=naming.log
namingRequestTimeout=-1
com.alibaba.nacos.client.naming.rtimeout=50000
com.alibaba.nacos.client.naming.ctimeout=3000
tls.enable=false
namingRequestDomainMaxRetryCount=3
nacos.use.cloud.namespace.parsing=true
nacos.use.endpoint.parsing.rule=true
nacos.client.contextPath=nacos
nacos.server.port=8848
NACOS.CONNECT.TIMEOUT=1000
PER_TASK_CONFIG_SIZE=3000

  包含了Nacos Server的端口、通讯超时时间、重试次数、日志存放路径等信息。

1.2.2、服务实例Instance

  Instance中包含了实例的基础信息,还包含类型为HashMap用户拓展属性 metadata,用户可以将实例的心跳间隔时间、实例状态不健康时间阈值、IP移除时间阈值设置在实例的元数据 metadata 中。
/**********************************   属性  *****************************************/
// 唯一的实例id
private String instanceId;
// 实例的ip
private String ip;
// 实例的端口
private int port;
// 权重,当前实例的权限,默认1.0D
private double weight = 1.0D;
// 实例的健康状态,默认true
private boolean healthy = true;
// 实例是否准备好接收请求,默认true
private boolean enabled = true;
// 默认示例为瞬时对象,用来判断注册服务使用的代理
private boolean ephemeral = true;
// 实例所属的集群名称
private String clusterName;
// 实例的服务信息
private String serviceName;
// 实例的元数据,用户拓展属性
private Map<String, String> metadata = new HashMap<>();

/**********************************   方法  *****************************************/
/**
 * 获取心跳间隔时间
 * @return
 */
public long getInstanceHeartBeatInterval() {
    // 优先从 Instance#metadata元数据中获取用户定义的key为 preserved.heart.beat.interval 心跳间隔时间,
    // 若用户未定义,使用系统默认间隔时间 5s
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
            Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
/**
 * 获取心跳超时时间
 * @return
 */
public long getInstanceHeartBeatTimeOut() {
    // 优先从 Instance#metadata元数据中获取用户定义的key为 preserved.heart.beat.timeout 心跳超时时间,
    // 若用户未定义,使用系统默认间隔时间 15s
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,
            Constants.DEFAULT_HEART_BEAT_TIMEOUT);
}
/**
 * 获取移除IP超时时间
 * @return
 */
public long getIpDeleteTimeout() {
    // 优先从 Instance#metadata元数据中获取用户定义的key为 preserved.ip.delete.timeout 的值 移除IP超时时间,
    // 若用户未定义,使用系统默认间隔时间 30s
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
            Constants.DEFAULT_IP_DELETE_TIMEOUT);
}
/**
 * 获取实例Id生成器
 * @return
 */
public String getInstanceIdGenerator() {
    // 优先从 Instance#metadata元数据中获取用户定义的key为 preserved.instance.id.generator 的值 实例Id生成器,
    // 若用户未定义,使用系统默认的实例ID生成器 simple
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.INSTANCE_ID_GENERATOR,
            Constants.DEFAULT_INSTANCE_ID_GENERATOR);
}

  Constants 部分属性如下:

/**
 * 默认的心跳超时时间15s,即超过15s收不到心跳,服务会被标记为不健康
 */
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
/**
 * 默认的IP移除超时时间30s,即30秒收不到心跳,实例将会被移除
 */
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
/**
 * 默认的心跳间隔时间5s,即5s进行一次心跳
 */
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
/**
 * 默认的实例Id生成器
 */
public static final String DEFAULT_INSTANCE_ID_GENERATOR = "simple";

  Nacos对必要的信息提供默认值,当用户未设置相关属性时,Nacos使用默认值在注册实例是与Nacos Server进行交互,这样Nacos Server便知道了该实例的心跳间隔时间、心跳超时时间,进而判断该实例是否健康。

2、服务注册源码详解

  服务注册过程的核心: NamingService - 注册发现服务。

2.1、NamingService

  NamingService接口在nacos-api模块中,是Nacos命名服务对外提供的一个统一接口,提供了操作实例的的相关方法:

// 注册服务
void registerInstance(...) throws NacosException;
// 注销服务
void deregisterInstance(...) throws NacosException;
// 获取服务实例列表
List<Instance> getAllInstances(...) throws NacosException;
// 获取健康的服务实例列表
List<Instance> selectInstances(...) throws NacosException;
// 获取一个健康的实例,通常使用负载均衡策略
Instance selectOneHealthyInstance(...) throws NacosException;
// 订阅服务,监听服务事件
void subscribe(...) throws NacosException;
// 取消订阅
void unsubscribe(...) throws NacosException;
// 获取服务名称
ListView<String> getServicesOfServer(...) throws NacosException;
// 获取所有订阅的服务
List<ServiceInfo> getSubscribeServices() throws NacosException;
// 获取Nacos服务健康状态
String getServerStatus();
// 关闭服务
void shutDown() throws NacosException;

  NamingService为满足不同的场景,有很多重载的方法,用户可以根据自己的实际情况调用合适的方法。

2.2、NacosNamingService

2.2.1、NacosNamingService实例化

  在NamingExample中,通过NamingFactory和NacosService连接信息完成了NamingService的创建。

  NamingFactory#createNamingService 详情如下:

// 创建 NamingService 实例
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        // 反射机制,实现类为 NacosNamingService
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        // 通过 Properties 的有参构造器实例化
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

  NamingService通过反射机制完成实例化,实现类为其子类NacosNamingService。NacosNamingService的Properties参数构造器详情:

// 实例化需要的构造器,properties中包含了NacosServer的连接信息
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

// 实际初始化的方法
private void init(Properties properties) throws NacosException {
    final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
    
    // 参数合法性检查
    ValidatorUtils.checkInitParam(nacosClientProperties);
    // 获取当前nacos客户端的命令空间
    this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(nacosClientProperties);
    // 初始化日志名称
    initLogName(nacosClientProperties);

    this.notifierEventScope = UUID.randomUUID().toString();
    this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
    // 注册事件
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
    // 初始化,注册服务 使用NamingClientProxyDelegate代理来完成
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
}

2.2.2、NacosNamingService注册实例

  在nacos-example的服务注册步骤中,注册服务调用的是NacosNamingService#registerInstance 详情如下:

// 注册服务实例
public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}

// 注册服务实例
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    // 创建服务实例 Instance
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    registerInstance(serviceName, groupName, instance);
}

// 注册服实例
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查心跳、服务实例所在的集群名称是否合法
    NamingUtils.checkInstanceIsLegal(instance);
    // 使用 NamingClientProxyDelegate代理来完成服务注册
    clientProxy.registerService(serviceName, groupName, instance);
}

  在实例化NacosNamingService时,clientProxy被赋值为NamingClientProxyDelegate实例,实际是通过NamingClientProxyDelegate代理来完成注册功能。

  NamingClientProxyDelegate#registerSevice 详情如下:

// 注册服务
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 获取客户端代理,默认NamingGrpcClientProxy,并注册实例
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

  判断当前实例是否为瞬时对象,若为瞬时对象,则采用gRPC协议(NamingGrpcClientProxy)进行请求;否则采用http协议(NamingHttpClientProxy)进行请求。

2.1、获取客户端代理

  NamingClientProxyDelegate的 getExecuteClientProxy 方法及关键属性。

// 默认实例为瞬时对象
private boolean ephemeral = true;
// http协议请求的客户端代理
private final NamingHttpClientProxy httpClientProxy;
// grpc协议请求的客户端代理
private final NamingGrpcClientProxy grpcClientProxy;

// 获取注册服务的代理示例
private NamingClientProxy getExecuteClientProxy(Instance instance) {
    // 当前实例是否为瞬时对象(2.2.0版本默认为瞬时对象),若为瞬时对象,客户端代理使用grpcClientProxy
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}

  2.2.0版本的实例默认为瞬时对象,即默认采用了gRPC协议进行与Nacos服务进行交互。

2.2、NamingGrpcClientProxy客户端注册服务

  默认的注册服务实现,NamingGrpcClientProxy#registerService,详情如下:
// 注册服务
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 缓存注册的实例,便于注册成功,更新注册状态
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    //  注册服务,注册成功更新注册缓存实例中的注册状态
    doRegisterService(serviceName, groupName, instance);
}

  缓存实例数据,NamingGrpcRedoService # cacheInstanceForRedo详情如下:

// 注册的实例缓存
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();

// 缓存数据
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    // 获取key,key=serviceName@@groupName
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    // 构建实例包装对象,包含服务名、组名、实例,默认注册状态为 false
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        // 缓存实例包装对象
        registeredInstances.put(key, redoData);
    }
}

  注册服务,实际干活的,NamingGrpcClientProxy#doRegisterService,详情如下:

// 实际干活的,注册服务无
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 构建注册服务请求,请求类型:registerInstance
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // 注册请求,基于rpc进行服务调用
    requestToServer(request, Response.class);
    // 服务注册成功,修改缓存数据中的注册成功标识为true
    redoService.instanceRegistered(serviceName, groupName);
}

2.3、NamingHttpClientProxy客户端注册服务

  NamingHttpClientProxy使用HTTP协议向服务端发起请求,使用 HttpClientRequest 使用post请求, 调用 /v1/ns/instance 服务端接口。

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // 获取 serviceName 格式为 serviceName@@groupName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 实例是否为瞬时对象校验
    if (instance.isEphemeral()) {
        throw new UnsupportedOperationException(
                "Do not support register ephemeral instances by HTTP, please use gRPC replaced.");
    }
    // 组装参数
    final Map<String, String> params = new HashMap<>(32);
    // 命名空间id
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    // 服务名
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    // 分组名称
    params.put(CommonParams.GROUP_NAME, groupName);
    // 集群名称
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    // 实例信息 IP、端口、健康状态等信息
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
    // HttpClientRequest 使用post请求, 调用 /v1/ns/instance 服务端接口
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

  UtilAndComs常量如下:

0

  通过NamingHttpClientProxy的方式,可以知晓Nacos服务端提供了 /v1/ns/instance 的接口,供客户端完成服务注册。

2.4、小结

NamingGrpcClientProxy的registerService服务注册,实际做了两件事:

  1、缓存当前注册的实例信息,缓存的数据结构为ConcurrentMap<string, instance="">,key为“serviceName@@groupName”,value是封装的实例信息InstanceRedoData。

   2、封装请求参数,请求类型为 registerInstance,基于gPRC进行服务调用和结果处理,并更新当前缓存的注册实例信息的注册状态。

3、服务注册流程图

0