Nacos源码(七):客户端实例变更事件机制源码分析

发布时间 2023-12-08 17:23:02作者: 无虑的小猪

  在给出的NamingExample示例中,给出客户端订阅的代码,详情如下:

0

  客户端的订阅机制是通过事件完成的, NacosNamingService#subscribe() 详情如下:

0

客户端订阅主要步骤:

  1、注册事件监听器

  2、客户端订阅

  客户端订阅在Nacos源码(六):客户端服务发现源码分析中已经做了简要的分析,得知在Nacos客户端订阅时,通过UpdateTask定时任务从注册中心拉取实例列表,当实例发生变化时,更新本地缓存中的服务实例信息。具体是如何完成本地缓存更新未做详细介绍,下面重点来看看这一步是如何完成的。

1、本地缓存的处理回顾

  先来看看获取到实例列表后,如何处理本地缓存的,从定时同步任务UpdateTask的run()方法中,很容易看出哪一步是处理服务信息的。

0

  ServiceInfoHolder#processServiceInfo() 详情如下:

 1 // 本地缓存服务信息,内存中
 2 private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
 3 
 4 // 缓存磁盘文件路径
 5 private String cacheDir;
 6 
 7 public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
 8     String serviceKey = serviceInfo.getKey();
 9     if (serviceKey == null) {
10         return null;
11     }
12     // 未能从Nacos服务获取到信息,则返回本地缓存的服务信息
13     ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
14     if (isEmptyOrErrorPush(serviceInfo)) {
15         //empty or error push, just ignore
16         return oldService;
17     }
18     // 更新本地缓存服务信息
19     serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
20     // 判断注册的实例信息是否已变更
21     boolean changed = isChangedServiceInfo(oldService, serviceInfo);
22     if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
23         serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
24     }
25     // 监控服务监控缓存Map的大小
26     MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
27     // 服务实例已变更
28     if (changed) {
29         NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
30                 JacksonUtils.toJson(serviceInfo.getHosts()));
31         // 添加实例变更事件,通知订阅者执行业务处理
32         NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
33                 serviceInfo.getClusters(), serviceInfo.getHosts()));
34         // 记录Service本地文件
35         DiskCache.write(serviceInfo, cacheDir);
36     }
37     return serviceInfo;
38 }

  本地缓存由两部分,一是内存,存储在Map类型的ServiceInfoHolder#serviceInfoMap里;一是磁盘文件中,存储路径在 ServiceInfoHolder#cacheDir,存储在磁盘文件中的服务信息用于故障转移处理。

  当服务实例发生变更,更新内存、磁盘文件的实例列表信息,同时发布InstancesChangeEvent事件,通知订阅者进行业务处理。

2、发布实例变更事件

  发布实例变更事件最终执行到 NotifyCenter#publishEvent 详情如下:

 1 // INSTANCE是NotifyCenter的单例对象
 2 private static final NotifyCenter INSTANCE = new NotifyCenter();
 3 
 4 /**
 5  * 事件发布者管理器
 6  * Publisher management container.
 7  */
 8 private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
 9 
10 // 发布事件
11 private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
12    // ...
13     // 根据事件类型获取canonicalName
14     final String topic = ClassUtils.getCanonicalName(eventType);
15 
16     // 根据 canonicalName 获取 事件发布者
17     EventPublisher publisher = INSTANCE.publisherMap.get(topic);
18     if (publisher != null) {
19         // 发布事件
20         return publisher.publish(event);
21     }
22     // ...
23     return false;
24 }

  根据InstancesChangeEvent事件类型,获取CanonicalName;将CanonicalName作为key,从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher);EventPublisher将InstancesChangeEvent事件进行发布。

2.1、eventType 与 EventPublisher 映射关系的建立

  CanonicalName与EventPublisher是怎么建立起联系的呢?

  在Nacos源码(二):客户端服务注册源码分析中,NamingService进行实例化时,NacosNamingService构造函数中调用了init方法,NacosNamingService#init() 详情如下:

0

  在init方法中,注册了InstancesChangeEvent的事件发布者,同时也注册了实例列表变化事件的订阅者InstancesChangeNotifier。

2.1.1、注册事件发布器

  NotifyCenter#registerToPublisher利用 NotifyCenter#publisherMap 维护了 ChangeEvent 与 Publisher 的映射关系。,并启动了 EventPublisher 处理事件线程,等待订阅者启动成功后,将事件通知到订阅者处理。

  NotifyCenter#registerToPublisher 详情如下:

 1 // 事件发布工厂
 2 private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
 3 
 4 /**
 5  * 注册事件发布者
 6  * eventType 为  InstancesChangeEvent.class
 7  */
 8 public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
 9     // 根据 DEFAULT_PUBLISHER_FACTORY 获取 EventPublisher
10     return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
11 }
12 
13 /**
14  * 注册发布者
15  */
16 public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
17         final EventPublisherFactory factory, final int queueMaxSize) {
18     if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
19         return INSTANCE.sharePublisher;
20     }
21     
22     // 根据eventType获取CanonicalName
23     final String topic = ClassUtils.getCanonicalName(eventType);
24     synchronized (NotifyCenter.class) {
25         // 维护 INSTANCE.publisherMap 中  CanonicalName 与 EventPublisher 的关系
26         MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
27     }
28     return INSTANCE.publisherMap.get(topic);
29 }

  EventPublisher 是通过 EventPublisherFactory 创建的,在 NotifyCenter 的静态代码块中利用反射机制进行实例化,默认使用的是 DefaultPublisher。

0

2.1.2、注册事件订阅者

  NotifyCenter#addSubscriber 详情如下:

0

  将事件订阅者添加到DefaultPublisher的 subscribers 属性中。

1 // 添加到订阅者的集合
2 public void addSubscriber(Subscriber subscriber) {
3     subscribers.add(subscriber);
4 }

  NotifyCenter#addSubscriber确定了事件、事件发布者、事件订阅者三者的关系,关联的关键是事件发布者 EventPublisher,

  通过事件类型EventType获取DefaultPublisher发布事件,再遍历DefaultPublisher中维护的subscribers订阅者集合,通知订阅者处理事件。

2.2、发布实例列表变更事件

  DefaultPublisher#publish 详情如下:

 1 /**
 2  * 发布事件任务
 3  */
 4 @Override
 5 public boolean publish(Event event) {
 6     checkIsStart();
 7     // 将事件添加进任务队列,等待执行
 8     boolean success = this.queue.offer(event);
 9     // 添加失败,直接处理事件
10     if (!success) {
11         // 通知订阅者处理事件
12         receiveEvent(event);
13         return true;
14     }
15     return true;
16 }

  实例列表变更事件添加到任务队列中,若添加失败,直接通知订阅者处理该事件。

3、通知订阅处理事件

  发布实例变更事件,实际上是将事件添加到任务队列queue中,队列在 NotifyCenter 的静态代码块中利用反射机制对 EventPublisherFactory 进行实例化时,执行 DefaultPublisher#init() 方法,详情如下:

0

  DefaultPublisher继承 Thread 类,在 init 方法中初始化了存放事件的任务队列,同时启动了事件发布者线程,有事件发布,则通知订阅者处理事件。

  接下来我们来看看是如何通知订阅者处理的,DefaultPublisher的 run 方法:

 1 // 开启事件处理
 2 public void run() {
 3     openEventHandler();
 4 }
 5 
 6 // 开始时间处理
 7 void openEventHandler() {
 8     try {
 9         // 等待次数,1次等待 1s
10         int waitTimes = 60;
11         // 死循环延迟,线程启动最大延时60秒,为了解决消息积压的问题。
12         while (!shutdown && !hasSubscriber() && waitTimes > 0) {
13             ThreadUtils.sleep(1000L);
14             waitTimes--;
15         }
16 
17         //  死循环不断的从任务队列中取出Event,并通知订阅者Subscriber执行Event
18         while (!shutdown) {
19             final Event event = queue.take();
20             // 从队列中取出Event并处理
21             receiveEvent(event);
22             UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
23         }
24     } catch (Throwable ex) {
25         LOGGER.error("Event listener exception : ", ex);
26     }
27 }

  第一个循环会等待60s,当无订阅者处理事件时,结束;

  第二个循环会不断从任务队列中获取事件处理,若无任务,阻塞等待,直到有事件发布后执行。

4、通知订阅者处理事件

  在上面已经分析到,订阅者是在构建NamingService实例完成的注册,实际上是将订阅者注册到DefaultPublisher中维护的订阅者集合 subscribers 属性中。

DefaultPublisher#receiveEvent() 详情如下:

 1 /**
 2  * 接收并通知订阅者处理事件
 3  */
 4 void receiveEvent(Event event) {
 5     final long currentEventSequence = event.sequence();
 6     if (!hasSubscriber()) {
 7         LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
 8         return;
 9     }
10     // 遍历订阅者
11     for (Subscriber subscriber : subscribers) {
12         if (!subscriber.scopeMatches(event)) {
13             continue;
14         }
15         // 忽略过期事件
16         if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
17             continue;
18         }
19         // 通知订阅者处理事件
20         notifySubscriber(subscriber, event);
21     }
22 }
23 
24 /**
25  * 通知订阅者处理事件
26  */
27 @Override
28 public void notifySubscriber(final Subscriber subscriber, final Event event) {
29 
30     LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
31 
32     // 执行订阅者事件
33     final Runnable job = () -> subscriber.onEvent(event);
34     final Executor executor = subscriber.executor();
35 
36     if (executor != null) {
37         executor.execute(job);
38     } else {
39         try {
40             job.run();
41         } catch (Throwable e) {
42             LOGGER.error("Event callback exception: ", e);
43         }
44     }
45 }

  遍历注册的事件处理器subscribers,获取指定事件的订阅者,通知订阅者执行onEvent方法,完成事件的处理,至此整个事件便处理完成了。

5、整体流程

5.1、InstanceChangeEvent事件发布

  在订阅后,从注册中心拉取到的实例列表有更新的情况时,通过NotifyCenter发布了InstancesChangeEvent事件,发布事件要流程:根据InstancesChangeEvent事件类型,获得对应的CanonicalName,再将CanonicalName作为Key,从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher),EventPublisher将InstancesChangeEvent事件进行发布。

  通过EventPublisher的实现类DefaultPublisher进行InstancesChangeEvent事件发布,DefaultPublisher本身以守护线程的方式运作,在执行业务逻辑前,先判断该线程是否启动,如果启动,则将事件添加到BlockingQueue中,队列默认大小为16384,添加到BlockingQueue成功,则整个发布过程完成。

  如果添加失败,则直接调用DefaultPublisher.receiveEvent方法,接收事件并通知订阅者,通知订阅者时创建一个Runnable对象,执行订阅者的Event。

5.2、InstanceChangeEvent事件消费

  如果添加到BlockingQueue成功,DefaultPublisher初始化时会创建一个阻塞(BlockingQueue)队列,并标记线程启动 ,DefaultPublisher本身是一个Thread,当执行super.start方法时,会调用它的run方法 ,run方法的核心业务逻辑是通过openEventHandler方法处理的 ,openEventHandler方法通过两个for循环,从阻塞队列中获取时间信息 ,第一个for循环用于让线程启动时在60s内检查执行条件 ,第二个for循环为死循环,从阻塞队列中获取Event,并调用DefaultPublisher#receiveEvent方法,接收事件并通知订阅者。