Nacos源码(五):服务端健康检查源码分析

发布时间 2023-12-08 16:58:10作者: 无虑的小猪

  服务注册到Nacos后,其他服务就可以获取该服务的实例信息,调用此服务;当服务宕机,Nacos会将该服务信息从维护的服务实例列表中删除,此时,其他服务获取不到该服务的实例信息,无法调用该服务。该服务是否应该被删除,取决于该服务是否健康,Nacos提供健康检查机制,判断服务是否有问题,将不健康的服务剔除下线。

  在Nacos1.x版本中临时实例通过Distro协议内存进行存储,客户端向注册中心发送心跳来维持自身healthy状态;永久实例通过Raft协议进行永久化存储,服务端定时与客户端建立tcp连接做健康检查。

  在Nacos2.x版本后临时实例不再使用心跳,而是通过GRPC长连接是否存活来判断实例是否健康,但仍然保留对Nacos1.x的http客户端的支持。

1、客户端健康检查

  在Nacos2.x临时节点的探活通过GRPC长连接实现,长连接是指在一个连接上可以连续发送多个数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包。

  只要Client与Server的连接存在,就表明客户端是healthy状态。下面来看看长连接是怎样的建立起来的。

  在 Nacos源码(二):客户端服务注册源码分析 中提到,客户端临时节点通过代理NamingClientProxyDelegate调用 NamingGrpcClientProxy 中的方法完成实例注册功能,NamingGrpcClientProxy对象是在 NamingClientProxyDelegate 的构造方法中创建。

0

  NamingGrpcClientProxy构造方法详情如下:

0

  RpcClient#start() 的主要步骤如下:

0

1.1、连接Nacos服务

  连接Nacos服务并设置客户端状态,将连接事件添加到阻塞队列中,便于回调处理:

0

1.2、Nacos连接事件回调

  客户端连接、断开连接事件回调基于阻塞队列 eventLinkedBlockingQueue 完成的,在连接Nacos服务后,会将连接事件添加到 eventLinkedBlockingQueue 队列中,等待线程池的回调处理。

0

1.3、探活处理

  永真循环检测长连接,若Naocs服务端无法连接,连接集群中的其他Nacos服务节点。

0

  至此,临时节点健康检查,涉及GRPC长连接启动,及探活处理部分分析完成,下面来看看永久节点的健康检查是如何处理的。

2、服务端节点健康检查

  在Nacos源码(四):服务端服务注册源码分析中提到,客户端管理器 ClientManagerDelegate 实现 ClientManager 接口,持有 EphemeralIpPortClientManager、 PersistentIpPortClientManage 客户端管理器。PersistentIpPortClientManage对应的是基于Ip、端口的永久节点客户端管理器。

  在注册实例,根据客户端Id获取连接时,会设置心跳检测的定时任务:

0

  最终执行到 PersistentIpPortClientManager 的 clientConnected 方法:

 1 public boolean clientConnected(final Client client) {
 2     clients.computeIfAbsent(client.getClientId(), s -> {
 3         Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
 4         IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
 5         // 设置心跳检查定时任务
 6         ipPortBasedClient.init();
 7         return ipPortBasedClient;
 8     });
 9     return true;
10 }

  IpPortBasedClient#init() 心跳检查定时任务如下:

 1 /**
 2  * 初始化client,设置心跳检测定时任务
 3  * Init client.
 4  */
 5 public void init() {
 6     // 临时节点
 7     if (ephemeral) {
 8         // ClientBeatCheckTaskV2 作为心跳检测的任务
 9         beatCheckTask = new ClientBeatCheckTaskV2(this);
10         HealthCheckReactor.scheduleCheck(beatCheckTask);
11     // 永久节点
12     } else {
13         // HealthCheckTaskV2 作为心跳检测的任务
14         healthCheckTaskV2 = new HealthCheckTaskV2(this);
15         HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
16     }
17 }

  临时实例:使用 ClientBeatCheckTaskV2 处理健康检查。

  永久实例:使用 HealthCheckTaskV2 处理健康检查。

2.1、永久实例健康检查

  永久节点的健康检查是由服务端定时与客户端建立tcp连接做健康检查,是服务端主动发起的探测,服定时请求客户端判断是否健康。

  永久实例使用 HealthCheckTaskV2 处理健康检查,HealthCheckTaskV2类图如下:

0

  通过实现的run()方法得知,执行 HealthCheckTaskV2#doHealthCheck() 完成发起永久实例的健康检测。

0

  最终执行 HealthCheckProcessorV2Delegate#process(),HealthCheckProcessorV2Delegate是一个代理类,具体的健康检查由内部维护的healthCheckProcessorMap中的具体实现类完成。

 1 /**
 2  * 健康检查处理代理类
 3  */
 4 public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
 5 
 6     // 健康检查实现类 容器
 7     private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
 8 
 9     @Override
10     public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
11         // 从元数据中获取当前客户端的健康检查类型,默认TCP (HTTP、TCP、MYSQL、NONE) => HealthCheckType 枚举类
12         String type = metadata.getHealthyCheckType();
13         // 根据类型从缓存中获取 健康检查具体处理类
14         HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
15         // 找不到处理类 使用 NoneHealthCheckProcessor 做健康检查,及什么都不做
16         if (processor == null) {
17             processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
18         }
19         processor.process(task, service, metadata);
20     }
21 }

  HealthCheckProcessorV2类图如下:

0

  子类实现与 HealthCheckType 枚举中的健康检查类型一一对应,下面来看下具体实现:

2.1.1、HttpHealthCheckProcessor

  HttpHealthCheckProcessor的process是通过 RestTemplate 完成健康检测请求。

0

2.1.2、MysqlHealthCheckProcessor

  MysqlHealthCheckProcessor的process是执行配置中的sql,完成健康检测。

0

  健康检测任务 MysqlCheckTask#run(),通过执行SQL来完成健康检测,执行过程不报异常,证明实例健康。

0

2.1.3、TcpHealthCheckProcessor

  TcpHealthCheckProcessor是通过构建Socket,然后对连接或读入事件进行监听,完成健康检测。

0

  需要进行健康检测的实例添加到队列后,是在哪里执行的呢?

  TcpHealthCheckProcessor实现了Runables接口,在TcpHealthCheckProcessor构造函数中启动了该任务。

0

  在run方法中,执行了 processTask处理任务:

0

  TcpHealthCheckProcessor#processTask() 详情如下:

 1 /**
 2  * 处理任务
 3  * @throws Exception
 4  */
 5 private void processTask() throws Exception {
 6     Collection<Callable<Void>> tasks = new LinkedList<>();
 7     do {
 8         // 获取队列中需要进行心跳检测的实例
 9         Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
10         if (beat == null) {
11             return;
12         }
13 
14         // 添加到任务队列
15         tasks.add(new TaskProcessor(beat));
16     } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
17 
18     // 调用队列中任务,并获取执行结果
19     for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
20         f.get();
21     }
22 }

2.1.4、NoneHealthCheckProcessor

  NoneHealthCheckProcessor什么都不做,不对示例进行健康检测。

2.2、临时实例健康检查

  临时实例的 ClientBeatCheckTaskV2 处理健康检查在,是通过责任链完成的健康检测。

0

  AbstractNamingInterceptorChain#doInterceptor 详情如下:

0

  最终会调用传入的参数 InstanceBeatCheckTask#passIntercept()

0

  passIntercept方法实际上会遍历 statics 代码块中的 UnhealthyInstanceChecker、ExpiredInstanceChecker 和 用户自定义实现了InstanceBeatChecker接口的检测。

2.2.1、UnhealthyInstanceChecker

  UnhealthyInstanceChecker 是不健康实例的检查器,详情如下:

 1 // 健康检测
 2 public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
 3     // 调整实例健康状态
 4     if (instance.isHealthy() && isUnhealthy(service, instance)) {
 5         // 将当前不健康实例的状态调整为为 不健康
 6         changeHealthyStatus(client, service, instance);
 7     }
 8 }
 9 
10 /**
11  * 判断实例是否健康
12  * @param service
13  * @param instance
14  * @return
15  */
16 private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
17     // 获取超时时间 默认 15 秒;可通过 preserved.heart.beat.timeout 配置更改
18     long beatTimeout = getTimeout(service, instance);
19     // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点不健康了
20     return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
21 }
22 
23 /**
24  * 调整实例健康状态
25  */
26 private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
27     // 改为false
28     instance.setHealthy(false);
29     Loggers.EVT_LOG
30             .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),
31                     instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,
32                     instance.getLastHeartBeatTime());
33     NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
34     NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
35     NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
36             service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(), instance.getPort(),
37             false, "client_beat"));
38 }

2.2.2、ExpiredInstanceChecker

  UnhealthyInstanceChecker 是过期实例的检查器,详情如下:

 

 1 public class ExpiredInstanceChecker implements InstanceBeatChecker {
 2 
 3     @Override
 4     public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
 5         // 帕努单当前实例是否过期
 6         boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
 7         // 若实例过期,直接剔除实例
 8         if (expireInstance && isExpireInstance(service, instance)) {
 9             deleteIp(client, service, instance);
10         }
11     }
12 
13     private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
14         // 获取超时时间 默认 30 秒;可通过 preserved.ip.delete.timeout 配置更改。
15         long deleteTimeout = getTimeout(service, instance);
16         // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点过期了,需要进行节点剔除操作
17         return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
18     }
19 
20     /**
21      * 剔除服务
22      * @param client
23      * @param service
24      * @param instance
25      */
26     private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
27         Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance));
28         client.removeServiceInstance(service);
29         // 注销客户端
30         NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
31         // 修改实例元数据
32         NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
33         // 注销实例
34         NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
35                 false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),
36                 service.getName(), instance.getIp(), instance.getPort()));
37     }
38 }