Nacos源码(四):服务端服务注册源码分析

发布时间 2023-12-08 16:47:48作者: 无虑的小猪

1、服务端服务注册源码入口分析

  客户端在注册服务是调用NamingService的registerInstance方法,使用HTTP协议注册服务至Nacos服务端时,调用服务端提供的 "/v1/ns/instance" 接口,优先在服务端找到该入口。

  搜索关键字 "/instance",排除测试案例后,最终可以找到 UtilsAndCommons、InitUtils、UtilAndComs 中包含 /instance"。

0

  InitUtils、UtilAndComs是 nacos-client模块下的类,这里不做考虑。

0

  下面我们重点关注 UtilsAndCommons 中的 "/instance" 是在哪里调用的,找到所有使用 NACOS_NAMING_INSTANCE_CONTEXT 常量的地方,详情如下:

0

  锁定 nacos-naming 模块的controllers包下的 InstanceController、InstanceControllerV2。根据 UtilsAndCommons 中的常量可知,请求 InstanceControllerV2 的url为/v2/ns/instance,请求InstanceController的url为 /v1/ns/instance。

// nacos服务版本
public static final String NACOS_SERVER_VERSION = "/v1";
public static final String NACOS_SERVER_VERSION_2 = "/v2";
// nacos默认命名空间地址
public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";
public static final String DEFAULT_NACOS_NAMING_CONTEXT_V2 = NACOS_SERVER_VERSION_2 + "/ns";

  综上服务端服务注册源码分析入口为 nacos-naming 的 InstanceController。

2、服务端服务注册

0

  在InstanceController的所有方法中,register方法是服务端注册服务的核心,InstanceController#register 详情如下:

 1 /**
 2  * 注册一个新实例
 3  * Register new instance.
 4  */
 5 @CanDistro
 6 @PostMapping
 7 @Secured(action = ActionTypes.WRITE)
 8 public String register(HttpServletRequest request) throws Exception {
 9     // 获取namespaceId,默认的为 public
10     final String namespaceId = WebUtils
11             .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
12     final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
13     // serviceName合法性检查
14     NamingUtils.checkServiceNameFormat(serviceName);
15 
16     // 构造者模式,从request中获取客户端的实例信息,并填充到实例构造器 InstanceBuilder 对象, 通过 InstanceBuilder 构建实例
17     final Instance instance = HttpRequestInstanceBuilder.newBuilder()
18             .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
19 
20     // 注册实例, 调用 InstanceOperatorClientImpl#registerInstance
21     getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
22 
23     // 发布注册实例跟踪事件
24     NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "", false, namespaceId,
25             NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName), instance.getIp(),
26             instance.getPort()));
27     return "ok";
28 }

服务端服务注册总体分为两步:

  1、从request中解析出客户端需要注册的instance实例;

  2、服务端注册实例。

2.1、Instance实例解析

  Instance实例解析的核心代码如下:

final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

  采用构造者模式,从请求request中获取客户端的实例信息,并填充到实例构造器 InstanceBuilder 对象, 通过 InstanceBuilder 构建实例。

2.2、服务端注册实例

  服务端注册是核心代码如下:

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

   实际调用 InstanceOperatorClientImpl#registerInstance

 1 /**
 2  * 服务端注册实例
 3  */
 4 @Override
 5 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
 6     // 实例合法性检查
 7     NamingUtils.checkInstanceIsLegal(instance);
 8     // 判断实例是否为瞬时对象,默认为true
 9     boolean ephemeral = instance.isEphemeral();
10     // 获取客户端id, 一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的id(clientId)
11     String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
12     // 通过客户端Id获取连接
13     createIpPortClientIfAbsent(clientId);
14     // 获取服务
15     Service service = getService(namespaceId, serviceName, ephemeral);
16     // 实际干活的,注册实例
17     clientOperationService.registerInstance(service, instance, clientId);
18 }

2.2.1、根据ClientId获取Client

  Nacos在2.0版本之后新增了Client模型,每个Client都有自身唯一的ClientId,一个gRPC长连接对应一个Client。Client负责管理一个客户端的服务实例注册Publish和服务订阅Subscribe。Client接口详情如下:

 1 /**
 2  * Client接口
 3  */
 4 public interface Client {
 5     // 为当前客户端获取一个唯一的Id
 6     String getClientId();
 7     // 是否是临时的客户端
 8     boolean isEphemeral();
 9     //设置最后更新时间
10     void setLastUpdatedTime();
11     // 获取最后更新时间
12     long getLastUpdatedTime();
13     // 为当前客户端注册服务实例
14     boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
15     // 注销服务实例
16     InstancePublishInfo removeServiceInstance(Service service);
17     // 服务实例查询
18     InstancePublishInfo getInstancePublishInfo(Service service);
19     // 获取当前客户端所有已注册的服务
20     Collection<Service> getAllPublishedService();
21     // 订阅服务
22     boolean addServiceSubscriber(Service service, Subscriber subscriber);
23     // 取消服务订阅
24     boolean removeServiceSubscriber(Service service);
25     // 从客户端获取服务订阅信息
26     Subscriber getSubscriber(Service service);
27     // 当前客户端获取所有已订阅服务
28     Collection<Service> getAllSubscribeService();
29     // 当前客户端是否过期
30     boolean isExpire(long currentTime);
31     // 生成同步给其他节点的client数据
32     ClientSyncData generateSyncData();
33     // 释放当前客户端并释放资源
34     void release();
35 }

  Nacos服务端使用 ClientManager 接口管理 Client的创建、获取、释放。

0

  ClientManagerDelegate 实现 ClientManager 接口,有不同连接的Client管理器。

1 // 基于Tcp连接的Client管理器,负责管理长连接clientId与Client模型的映射关系
2 private final ConnectionBasedClientManager connectionBasedClientManager;
3 // 基于IP和PORT的临时Client管理器
4 private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
5 // 基于IP和PORT的持久Client管理器
6 private final PersistentIpPortClientManager persistentIpPortClientManager;

  Client由ClientFactory创建,默认使用EphemeralIpPortClientFactory创建IpPortBasedClient对象,并将ClientId与IpPortBasedClient的映射关系存储到客户端管理器EphemeralIpPortClientManager的 clients 属性中缓存起来。

private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();

  在EphemeralIpPortClientManager管理器中,初始化IpPortBasedClient时,完成了心跳检测定时任务的设置。

0

  IpPortBasedClient#init() 详情如下:

 1 /**
 2  * 初始化client,设置心跳检测定时任务
 3  */
 4 public void init() {
 5     // 临时的client
 6     if (ephemeral) {
 7         // ClientBeatCheckTaskV2 作为心跳检测的任务
 8         beatCheckTask = new ClientBeatCheckTaskV2(this);
 9         HealthCheckReactor.scheduleCheck(beatCheckTask);
10     // 非临时的client
11     } else {
12         // HealthCheckTaskV2 作为心跳检测的任务
13         healthCheckTaskV2 = new HealthCheckTaskV2(this);
14         HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
15     }
16 }

2.2.2、创建Service

  根据客户端请求的groupName、serviceName创建Service对象。Service#newService 详情如下:

public static Service newService(String namespace, String group, String name, boolean ephemeral) {
    // 创建服务Service
    return new Service(namespace, group, name, ephemeral);
}

  Service对象包含了 命名空间、组名、服务名,临时客户端标识等信息。

2.2.3、注册服务

  注册服务实例由 ClientOperationService 接口的 registerInstance 方法完成,ClientOperationService的实现类如下:

   这里我们来看默认的注册实现,EphemeralClientOperationServiceImpl # registerInstance 详情如下:

 1 public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
 2     // 实例合法性校验
 3     NamingUtils.checkInstanceIsLegal(instance);
 4 
 5     // 获取单例Service
 6     Service singleton = ServiceManager.getInstance().getSingleton(service);
 7     if (!singleton.isEphemeral()) {
 8         throw new NacosRuntimeException(NacosException.INVALID_PARAM,
 9                 String.format("Current service %s is persistent service, can't register ephemeral instance.",
10                         singleton.getGroupedServiceName()));
11     }
12     // 根据客户端id,获取客户端
13     Client client = clientManager.getClient(clientId);
14     if (!clientIsLegal(client, clientId)) {
15         return;
16     }
17     InstancePublishInfo instanceInfo = getPublishInfo(instance);
18     // 服务注册实例,负责存储当前客户端的服务注册表,即Service与Instance的关系。
19     client.addServiceInstance(singleton, instanceInfo);
20     client.setLastUpdatedTime();
21     client.recalculateRevision();
22     // 建立Service与ClientId的关系
23     NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
24     NotifyCenter
25             .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
26 }

1、ServiceManager

  ServiceManager是 Nacos的服务管理器,内部维护了两个 ConcurrentHashMap 类型的成员变量,singletonRepository 用来保证Service的单例;namespaceSingletonMaps 用来存储namespace下的所有 Service。

0

  获取单例Service详情:

 1 //保证单例Service
 2 private final ConcurrentHashMap<Service, Service> singletonRepository;
 3 //namespace下的所有service,存储Service的容器
 4 private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
 5 
 6 /**
 7  * 获取单例 Service
 8  */
 9 public Service getSingleton(Service service) {
10     // Service在singletonRepository中不存在,发布ServiceMetadataEvent事件,将Service设置到singletonRepository中
11     singletonRepository.computeIfAbsent(service, key -> {
12         NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
13         return service;
14     });
15     // 获取单例 Service
16     Service result = singletonRepository.get(service);
17     // 容器,存储 namespace 与 Service集 的映射关系
18     namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
19     namespaceSingletonMaps.get(result.getNamespace()).add(result);
20     return result;
21 }

2、ClientManager

  ClientManager是Nacos的客户端管理器,负责管理clientId与Client模型的映射关系。

public Client getClient(String clientId) {
    return clients.get(clientId);
}

  clients是 ConcurrentMap<string, <t="" extends="" abstractclient="">> 类型的容器,获取继承了AbstractClient处理不同的连接的Client对象。

2.3、实际的注册

  AbstractClient负责存储当前客户端的服务注册表,即Service与Instance的映射关系。对于单个客户端来说,同一个服务只能注册一个实例。

  实际的注册,AbstractClient#addServiceInstance 详情如下:

 1 // 服务与实例的映射关系
 2 protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
 3 
 4 // 注册服务与实例的映射关系
 5 public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
 6     // 注册服务与实例的映射关系
 7     if (null == publishers.put(service, instancePublishInfo)) {
 8         // 批量注册
 9         if (instancePublishInfo instanceof BatchInstancePublishInfo) {
10             MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
11         } else {
12             MetricsMonitor.incrementInstanceCount();
13         }
14     }
15     // 发布服务注册事件
16     NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
17     Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
18     return true;
19 }

  Service与Instance的映射关系建立之后,发布服务注册事件。

2.4、服务注册事件的处理

  发布服务注册事件,最终调用 NamingEventPublisher 将事件发布出去。NamingEventPublisher类继承结构如下:

0

  NamingEventPublisher继承Thread类,下面我们看下重写 run 方法,NamingEventPublisher#run,详情如下:

 1 public void run() {
 2     try {
 3         // 锁等待,等待订阅者初始化完成
 4         waitSubscriberForInit();
 5         // 处理事件
 6         handleEvents();
 7     } catch (Exception e) {
 8         Loggers.EVT_LOG.error("Naming Event Publisher {}, stop to handle event due to unexpected exception: ",
 9                 this.publisherName, e);
10     }
11 }
12 
13 /**
14  * 处理事件
15  */
16 private void handleEvent(Event event) {
17     // 获取事件类型
18     Class<? extends Event> eventType = event.getClass();
19     // 获取订阅该事件类型的订阅者
20     Set<Subscriber<? extends Event>> subscribers = subscribes.get(eventType);
21     if (null == subscribers) {
22         return;
23     }
24     // 通知订阅者
25     for (Subscriber subscriber : subscribers) {
26         notifySubscriber(subscriber, event);
27     }
28 }

  将服务注册事件通知订阅者,并调用 onEvent 事件。

0

  通过查看Subscriber监听接口子类情况,发现 ClientServiceIndexesManager。

0

  通过 ClientServiceIndexesManager#subscribeTypes 方法,查看 ClientServiceIndexesManager 支持处理的事件类型详情如下:

0

  ClientServiceIndexesManager构造函数中,维护了事件订阅者、事件类型、事件发布者的关系,在创建 ClientServiceIndexesManager实例时完成了订阅者的注册。

0

  根据事件机制,ClientServiceIndexesManager#onEvent() 的事件处理详情如下:

 1 // 发布索引  service 与 clientId列表 映射关系
 2 private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
 3 // 订阅索引  service 与 clientId列表 映射关系
 4 private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
 5 
 6 // 事件处理
 7 public void onEvent(Event event) {
 8     // 客户端断开连接事件
 9     if (event instanceof ClientEvent.ClientDisconnectEvent) {
10         handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
11     // 客户端操作事件
12     } else if (event instanceof ClientOperationEvent) {
13         handleClientOperation((ClientOperationEvent) event);
14     }
15 }
16 
17 // 处理客户端操作
18 private void handleClientOperation(ClientOperationEvent event) {
19     Service service = event.getService();
20     String clientId = event.getClientId();
21     // 注册事件
22     if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
23         addPublisherIndexes(service, clientId);
24     // 注销事件
25     } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
26         removePublisherIndexes(service, clientId);
27     // 订阅事件
28     } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
29         addSubscriberIndexes(service, clientId);
30     // 取消订阅事件
31     } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
32         removeSubscriberIndexes(service, clientId);
33     }
34 }
35 
36 // 添加发布索引
37 private void addPublisherIndexes(Service service, String clientId) {
38     // 建立发布Service与clientId列表的映射关系
39     publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
40     publisherIndexes.get(service).add(clientId);
41     NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
42 }

  ClientServiceIndexesManager维护了两个索引:publisherIndexes、subscriberIndexes。

  publisherIndexes维护Service与发布clientId列表的映射关系,当有新的clientId注册,将clientId添加到clientId列表中。

  subscriberIndexes维护Service与订阅clientId列表的映射关系,当有clientId断开连接或取消订阅,将clientId从clientId列表中移除。

  索引关系建立以后,还会触发服务注册表变更ServiceChangedEvent事件。

  NamingSubscriberServiceV2Impl订阅了 ServiceChangedEvent 事件,NamingSubscriberServiceV2Impl#scubsribeTypes 详情如下:

0

  构造函数中注册 ServiceChangedEvent 事件订阅者。

0

  ServiceChangedEvent事件处理详情,NamingSubscriberServiceV2Impl#onEvent:

 1 // 事件处理
 2 public void onEvent(Event event) {
 3     // 服务改变事件
 4     if (event instanceof ServiceEvent.ServiceChangedEvent) {
 5         ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
 6         Service service = serviceChangedEvent.getService();
 7         delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
 8         MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
 9     // 服务订阅事件
10     } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
11         ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
12         Service service = subscribedEvent.getService();
13         delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
14                 subscribedEvent.getClientId()));
15     }
16 }

  注册表变更后还需要完成两个事情:1.通知订阅客户端 2.Nacos集群数据同步。

3、服务注册整体流程