Nacos Internals Explained at the Source-Code Level

Published 2023-09-17 22:04:40 Author: 上善若泪

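1 Nacos Principles

1.1 Nacos Architecture

[Figure: Nacos architecture diagram]

  • Provider APP: the service provider
  • Consumer APP: the service consumer
  • Name Server: routes requests to the highly available Nacos cluster via a VIP (Virtual IP) or DNS
  • Nacos Server: the provider of the Nacos services. Open API is the entry point for all functionality; Config Service and Naming Service are the configuration and naming modules that Nacos provides; Consistency Protocol keeps data synchronized across the nodes of a Nacos cluster. The protocol used here is the Raft algorithm (compare Etcd, and leader election in Redis Sentinel)
  • Nacos Console: the management console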

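1.2 How a Service Registry Works

A service registry works as follows:

  • A service instance registers itself in the service registry on startup and deregisters on shutdown
  • A service consumer queries the registry to obtain the available instances
  • The registry calls each instance's health-check API to verify that it can still handle requests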
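
The three responsibilities listed above can be sketched as a tiny in-memory registry. This is an illustration only, not Nacos code: the class ToyRegistry, its method names, and the probe callback (which stands in for a real health-check API call) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical toy registry; it only mirrors the three bullet points above.
class ToyRegistry {
    // service name -> (instance address -> healthy flag)
    private final Map<String, Map<String, Boolean>> table = new ConcurrentHashMap<>();

    // Called by a provider on startup.
    void register(String service, String address) {
        table.computeIfAbsent(service, k -> new ConcurrentHashMap<>()).put(address, true);
    }

    // Called by a provider on shutdown.
    void deregister(String service, String address) {
        Map<String, Boolean> instances = table.get(service);
        if (instances != null) {
            instances.remove(address);
        }
    }

    // The registry probes every instance; the probe is a placeholder for a health-check call.
    void checkHealth(Predicate<String> probe) {
        for (Map<String, Boolean> instances : table.values()) {
            instances.replaceAll((address, healthy) -> probe.test(address));
        }
    }

    // Called by a consumer: only instances currently marked healthy are returned.
    List<String> lookup(String service) {
        List<String> result = new ArrayList<>();
        table.getOrDefault(service, Map.of()).forEach((address, healthy) -> {
            if (healthy) {
                result.add(address);
            }
        });
        return result;
    }
}
```

A consumer would call lookup("orders") before each request, or cache the result and refresh it periodically.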

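1.3 Service Registration in Spring Cloud

The spring-cloud-commons package contains the interface org.springframework.cloud.client.serviceregistry.ServiceRegistry, which is the standard Spring Cloud defines for service registration. Any component that integrates with Spring Cloud to provide service registration implements this interface.
One of its implementations is NacosServiceRegistry.

How Spring Cloud integrates Nacos:
META-INF/spring.factories in the spring-cloud-commons package contains the following auto-configuration entries:

[Figure: auto-configuration entries in spring.factories]
Among them, AutoServiceRegistrationAutoConfiguration is the configuration class for service registration: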

@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {
	// optional: may be absent if no registry implementation is on the classpath
	@Autowired(required = false)
	private AutoServiceRegistration autoServiceRegistration;
	@Autowired
	private AutoServiceRegistrationProperties properties;
	@PostConstruct
	protected void init() {
		// fail fast only when explicitly requested and no implementation was found
		if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
			throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
		}
	}
}

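AutoServiceRegistrationAutoConfiguration injects an AutoServiceRegistration instance. The class hierarchy of that type is shown below.
[Figure: AutoServiceRegistration class hierarchy]

As the diagram shows, the abstract class AbstractAutoServiceRegistration implements this interface and, most importantly, NacosAutoServiceRegistration extends AbstractAutoServiceRegistration.

The presence of EventListener in the hierarchy tells us that Nacos plugs into Spring Cloud through Spring's event mechanism.

AbstractAutoServiceRegistration implements the abstract method onApplicationEvent and listens for WebServerInitializedEvent (published once the web server has finished initializing), at which point it calls this.bind(event).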

@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
	ApplicationContext context = event.getApplicationContext();
	if (context instanceof ConfigurableWebServerApplicationContext) {
		// ignore the event fired by the separate management web server
		if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
			return;
		}
	}
	this.port.compareAndSet(0, event.getWebServer().getPort());
	this.start();
}

Eventually, NacosServiceRegistry.register() is called to perform the registration.

public void start() {
	if (!isEnabled()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}
		return;
	}
	// only initialize if nonSecurePort is greater than 0 and it isn't already running
	// because of containerPortInitializer below
	if (!this.running.get()) {
		this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
		register();
		if (shouldRegisterManagement()) {
			registerManagement();
		}
		this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
		this.running.compareAndSet(false, true);
	}
}

protected void register(){
	this.serviceRegistry.register(getRegistration());
}
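
To make the two guards in bind() and start() easier to see, here is a minimal sketch without any Spring dependency. The class StartOnceRegistration, the method onWebServerReady, and the registerCalls counter are hypothetical names invented for this example; only the compareAndSet on the port and the running flag mirror the excerpt above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the start-once logic; no Spring involved.
class StartOnceRegistration {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    int registerCalls = 0; // counts how often the (simulated) registration ran

    // Plays the role of bind(event): record the port, then start.
    void onWebServerReady(int serverPort) {
        port.compareAndSet(0, serverPort); // only takes effect while the port is still 0
        start();
    }

    void start() {
        if (!running.get()) {
            registerCalls++; // stands in for serviceRegistry.register(...)
            running.compareAndSet(false, true);
        }
    }

    int getPort() {
        return port.get();
    }
}
```

Delivering the "web server ready" notification twice keeps the first port and registers only once, which is the effect of the running flag in start().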

1.4 The NacosServiceRegistry Implementation

The NacosServiceRegistry.register method calls namingService.registerInstance from the Nacos Client SDK to complete the registration.

@Override
public void register(Registration registration){
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();
	Instance instance = getNacosInstanceFromRegistration(registration);
	try {
		namingService.registerInstance(serviceId, instance);
		log.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort());
	}catch (Exception e) {
		log.error("nacos registry, {} register failed... {},",serviceId,registration.toString(),e);
	}
}

Tracing into the registerInstance() method of NacosNamingService:

@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
	registerInstance(serviceName,Constants.DEFAULT_GROUP,instance);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
	// ephemeral instances are kept alive by client heartbeats
	if (instance.isEphemeral()) {
		BeatInfo beatInfo = new BeatInfo();
		beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
		beatInfo.setIp(instance.getIp());
		beatInfo.setPort(instance.getPort());
		beatInfo.setCluster(instance.getClusterName());
		beatInfo.setWeight(instance.getWeight());
		beatInfo.setMetadata(instance.getMetadata());
		beatInfo.setScheduled(false);
		long instanceInterval = instance.getInstanceHeartBeatInterval();
		beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);

		beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
	}
	serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

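beatReactor.addBeatInfo() creates the heartbeat information used for health checking. Nacos Server must make sure that every registered instance is healthy, and heartbeats are how it checks.
serverProxy.registerService() performs the actual registration.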

1.4.1 Heartbeat Mechanism

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
	NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
	dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
	executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
	MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

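Judging from the code above, the heartbeat mechanism amounts to the client using schedule to send a packet to the server at regular intervals and then watching for the server's reply; if no reply arrives within the configured time, the server is considered to have failed. On the server side, Nacos keeps updating each service's status based on the heartbeat packets it receives from clients.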
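
A minimal sketch of a periodic heartbeat sender follows, assuming a single-threaded scheduler and a task that re-arms itself after every beat. ToyBeatReactor, its fields, and the sendBeat callback (a placeholder for the real network call) are hypothetical; the only detail taken from the excerpt above is that the first beat is scheduled with a delay of 0 milliseconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heartbeat sender; sendBeat stands in for the request to the server.
class ToyBeatReactor {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "toy-beat-sender");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });
    private final long periodMillis;
    private final Runnable sendBeat;
    final AtomicInteger beatsSent = new AtomicInteger();

    ToyBeatReactor(long periodMillis, Runnable sendBeat) {
        this.periodMillis = periodMillis;
        this.sendBeat = sendBeat;
    }

    // The first beat fires immediately.
    void start() {
        executor.schedule(this::beatOnce, 0, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        try {
            sendBeat.run();
            beatsSent.incrementAndGet();
        } finally {
            try {
                // re-arm: schedule the next beat one period from now
                executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ignored) {
                // the executor was shut down; stop beating
            }
        }
    }

    // Polls until at least n beats were sent or the timeout elapses.
    boolean awaitBeats(int n, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (beatsSent.get() < n) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

Re-scheduling from inside the task, rather than using a fixed-rate schedule, lets each beat choose its own next delay, which is useful if the interval can change at runtime.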

1.4.2 How Registration Works

Nacos offers two ways to register a service: the SDK and the Open API.

Open API:

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=192.168.13.1&port=8080'

SDK:

void registerInstance(String serviceName, String ip, int port) throws NacosException;

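The two forms are essentially the same: underneath, both complete the request over HTTP. Registering a service therefore comes down to sending an HTTP request: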

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

	NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
	// collect the instance attributes as request parameters
	final Map<String, String> params = new HashMap<>(9);
	params.put(CommonParams.NAMESPACE_ID, namespaceId);
	params.put(CommonParams.SERVICE_NAME, serviceName);
	params.put(CommonParams.GROUP_NAME, groupName);
	params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
	params.put("ip", instance.getIp());
	params.put("port", String.valueOf(instance.getPort()));
	params.put("weight", String.valueOf(instance.getWeight()));
	params.put("enable", String.valueOf(instance.isEnabled()));
	params.put("healthy", String.valueOf(instance.isHealthy()));
	params.put("ephemeral", String.valueOf(instance.isEphemeral()));
	params.put("metadata", JSON.toJSONString(instance.getMetadata()));

	// POST to the instance endpoint
	reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
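
To show that the SDK call and the curl command end up as the same kind of request, the sketch below builds the registration URL from a parameter map using only the JDK. RegisterRequestSketch and its methods are hypothetical helpers; the path and the parameter names serviceName, ip, port and ephemeral are the ones that appear in the curl example and in registerService above. Actually sending the POST is left out.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that assembles the registration URL; it does not send anything.
class RegisterRequestSketch {
    // Path as used in the curl example.
    static final String INSTANCE_PATH = "/nacos/v1/ns/instance";

    // A small subset of the parameters that registerService puts into its map.
    static Map<String, String> instanceParams(String serviceName, String ip, int port, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    // Joins the URL-encoded parameters into a query string.
    static String buildUrl(String serverAddr, Map<String, String> params) {
        String query = params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
        return "http://" + serverAddr + INSTANCE_PATH + "?" + query;
    }

    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

The resulting string can be passed to any HTTP client as the target of a POST request.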

On the Nacos server, the endpoint exposed for this request is nacos/v1/ns/instance, implemented in the InstanceController class of the nacos-naming module:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
	// some code omitted
	@CanDistro
	@PostMapping
	public String register(HttpServletRequest request) throws Exception {
		String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
		String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
		serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
		return "ok";
	}
	// some code omitted
}

  • Read serviceName (the service name) and namespaceId (the namespace ID) from the request parameters
  • Call registerInstance to register the instance

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	Service service = getService(namespaceId, serviceName);
	if (service == null) {
		throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
	}
	addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

  • createEmptyService creates an empty service (the service entry shown in the service list of the Nacos console); in effect it initializes serviceMap, which is a ConcurrentHashMap
  • getService looks up the service object in serviceMap by namespaceId and serviceName
  • addInstance adds the service instance

createEmptyService delegates to createServiceIfAbsent:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local,Cluster cluster) throws NacosException {
	Service service = getService(namespaceId,serviceName);
	if(service== null){
		service= new Service();
		service.setName(serviceName);
		service.setNamespaceId(namespaceId);
		service.setGroupName(NamingUtils.getGroupName(serviceName));
		service.setLastModifiedMillis(System.currentTimeMillis());
		service.recalculateChecksum();
		if(cluster != null){
			cluster.setService(service);
			service.getClusterMap().put(cluster.getName(),cluster);
		}
		service.validate();
		putServiceAndInit(service);
		if(!local){
			addOrReplaceService(service);
		}
	}
}

  • Look up the Service instance in the cache by namespaceId and serviceName
  • If no Service instance exists, create one and store it in the cache

private void putServiceAndInit(Service service) throws NacosException{
	putService(service);
	service.init();
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),true),service);
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),false),service);
	Loggers.SRV_LOG.info("[NEW-SERVICE]{}",service.toJSON());
}
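
  • putService() caches the service in memory
  • service.init() sets up the heartbeat check
  • consistencyService.listen registers listeners for data-consistency changes

The service.init() method is shown in the figure below. Its main job is to run a scheduled task that keeps checking when each instance of the service last sent a heartbeat. If an instance has timed out, its healthy flag is set to false, marking it unhealthy, and a service-change event is published.

A question worth thinking about here: who updates an instance's last-heartbeat time? As described earlier, the Nacos client sets up its heartbeat at the same time it registers the service.
[Figure: the service.init() method]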
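
Since the figure is not available, here is a minimal sketch of the kind of timeout check described above. It is not the Nacos implementation: ToyHealthChecker, its methods, and the timeout value used in the usage note are assumptions for illustration. Time is passed in explicitly so that the logic can be followed (and tested) without real waiting; in a server, check would be invoked by a scheduled task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical server-side check: marks instances unhealthy when beats stop arriving.
class ToyHealthChecker {
    private static final class State {
        long lastBeatMillis;
        boolean healthy = true;
    }

    private final Map<String, State> instances = new HashMap<>();
    private final long timeoutMillis;

    ToyHealthChecker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a client heartbeat arrives: refresh the timestamp.
    synchronized void onBeat(String address, long nowMillis) {
        State s = instances.computeIfAbsent(address, k -> new State());
        s.lastBeatMillis = nowMillis;
        s.healthy = true;
    }

    // One pass of the periodic check; returns the addresses that just became unhealthy,
    // which is where a service-change event would be published.
    synchronized List<String> check(long nowMillis) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, State> e : instances.entrySet()) {
            State s = e.getValue();
            if (s.healthy && nowMillis - s.lastBeatMillis > timeoutMillis) {
                s.healthy = false;
                changed.add(e.getKey());
            }
        }
        return changed;
    }

    synchronized boolean isHealthy(String address) {
        State s = instances.get(address);
        return s != null && s.healthy;
    }
}
```

With an illustrative timeout of 15000 ms, an instance whose last beat was at time 0 is reported as changed by check(16000), while one that beat at 10000 stays healthy.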

The putService method stores the Service in serviceMap:

public void putService(Service service) {
	// double-checked locking: create the per-namespace map only once
	if (!serviceMap.containsKey(service.getNamespaceId())) {
		synchronized (putServiceLock) {
			if (!serviceMap.containsKey(service.getNamespaceId())) {
				serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
			}
		}
	}
	serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
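
The two-level layout of serviceMap (namespace ID, then service name) can be sketched as follows. This is an alternative formulation rather than what Nacos does: it swaps the explicit lock for ConcurrentHashMap.computeIfAbsent, which creates the inner map atomically. ToyServiceMap is a hypothetical class, and the service value is a plain String purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level map: namespaceId -> (serviceName -> service).
class ToyServiceMap {
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    void putService(String namespaceId, String serviceName, String service) {
        // computeIfAbsent creates the inner map at most once per namespace
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>(16))
                  .put(serviceName, service);
    }

    // Returns null when either the namespace or the service is unknown.
    String getService(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }
}
```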

Next, addInstance is called to store the instance being registered in the Service:

addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

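1.4.3 Summary

  • The Nacos client sends the registration request through the Open API
  • On receiving the request, the Nacos server does three things:
    • Builds a Service object and stores it in a ConcurrentHashMap
    • Uses a scheduled task to set up heartbeat checking for all instances of the service
    • Synchronizes the service data through the consistency protocol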

1.5 Querying Service Provider Addresses

Open API:

curl -X GET '127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=example'

SDK:

List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;

The list method in InstanceController:

@GetMapping("/list")
public JSONObject list(HttpServletRequest request) throws Exception {
	String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
	String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
	String agent = WebUtils.getUserAgent(request);
	String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
	String clientIP = WebUtils.optional(request, "clientIp", StringUtils.EMPTY);
	Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
	String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
	boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
	String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
	String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
	boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
	return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
}

  • Parse the request parameters
  • Return the service list data via doSrvIPXT

public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
		throws Exception {
	// much non-essential code has been removed from this listing
	ClientInfo clientInfo = new ClientInfo(agent);
	JSONObject result = new JSONObject();
	Service service = serviceManager.getService(namespaceId, serviceName);
	List<Instance> srvedIPs;
	// fetch all instances of the given service
	srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
	// split the instances into healthy (TRUE) and unhealthy (FALSE)
	Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
	ipMap.put(Boolean.TRUE, new ArrayList<>());
	ipMap.put(Boolean.FALSE, new ArrayList<>());
	for (Instance ip : srvedIPs) {
		ipMap.get(ip.isHealthy()).add(ip);
	}
	// iterate and assemble the JSON result
	JSONArray hosts = new JSONArray();
	for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
		List<Instance> ips = entry.getValue();
		if (healthyOnly && !entry.getKey()){
			continue;
		}
		for (Instance instance : ips) {
			if (!instance.isEnabled()) {
				continue;
			}
			JSONObject ipObj = new JSONObject();
			ipObj.put("ip", instance.getIp());
			ipObj.put("port", instance.getPort());
			ipObj.put("valid", entry.getKey());
			ipObj.put("healthy", entry.getKey());
			ipObj.put("marked", instance.isMarked());
			ipObj.put("instanceId", instance.getInstanceId());
			ipObj.put("metadata", instance.getMetadata());
			ipObj.put("enabled", instance.isEnabled());
			ipObj.put("weight", instance.getWeight());
			ipObj.put("clusterName", instance.getClusterName());
			if (clientInfo.type == ClientInfo.ClientType.JAVA
				&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
				ipObj.put("serviceName", instance.getServiceName());
			} else {
				ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
			}
			ipObj.put("ephemeral", instance.isEphemeral());
			hosts.add(ipObj);
		}
	}
	result.put("hosts",hosts);
	result.put("name",serviceName);
	result.put("cacheMillis",cacheMillis);
	result.put("lastRefTime",System.currentTimeMillis());
	result.put("checksum",service.getChecksum());
	result.put("useSpecifiedURL",false);
	result.put("clusters",clusters);
	result.put("env",env);
	result.put("metadata",service.getMetadata());
	return result;
}

  • Get the Service instance by namespaceId and serviceName
  • Use srvIPs on the Service instance to get all provider instances
  • Iterate over them, assemble the JSON, and return it
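
The filtering at the heart of doSrvIPXT (split by health, skip the unhealthy group when healthyOnly is set, skip disabled instances) can be isolated in a few lines. ToyInstanceList and its nested Inst type are hypothetical and reduce an instance to three fields; the sketch uses Collectors.partitioningBy in place of the hand-built ipMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical reduction of the host-list filtering; not the Nacos classes.
class ToyInstanceList {
    static final class Inst {
        final String ip;
        final boolean healthy;
        final boolean enabled;

        Inst(String ip, boolean healthy, boolean enabled) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
        }
    }

    static List<String> hosts(List<Inst> all, boolean healthyOnly) {
        // partitioningBy always yields both a true and a false bucket
        Map<Boolean, List<Inst>> byHealth =
                all.stream().collect(Collectors.partitioningBy(i -> i.healthy));
        List<String> hosts = new ArrayList<>();
        for (boolean healthy : new boolean[] {true, false}) {
            if (healthyOnly && !healthy) {
                continue; // caller asked for healthy instances only
            }
            for (Inst i : byHealth.get(healthy)) {
                if (i.enabled) { // disabled instances are never returned
                    hosts.add(i.ip);
                }
            }
        }
        return hosts;
    }
}
```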

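1.6 How Nacos Detects Service Address Changes

Changes can be observed through the subscribe method, where serviceName is the service name and EventListener is the listener that receives the events: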

void subscribe(String serviceName, EventListener listener) throws NacosException;

It is called like this:

NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
naming.subscribe("example", event -> {
	if (event instanceof NamingEvent) {
		System.out.println(((NamingEvent) event).getServiceName());
		System.out.println(((NamingEvent) event).getInstances());
	}
});

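Alternatively, call selectInstances; when the subscribe parameter is true, a listener is registered automatically:

List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;

The Nacos client contains a HostReactor class whose job is to keep the service list up to date. The basic principle is:

  • After the client subscribes to events, an UpdateTask thread in HostReactor sends a Pull request every 10s to fetch the latest address list from the server
  • The server maintains heartbeat checks with the provider instances; as soon as a provider becomes abnormal, it sends a Push message to the Nacos client, that is, to the service consumer
  • On receiving the message, the consumer parses it with processServiceJSON from HostReactor and updates its local address list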
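
The point of the three steps above is that pull results and push messages feed the same local cache. A minimal sketch of that idea follows; ToyHostReactor and processUpdate are hypothetical names (the real client works on parsed JSON, which is skipped here), and listeners are modeled as plain callbacks.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical local address cache shared by the pull and push paths.
class ToyHostReactor {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, List<String>>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(BiConsumer<String, List<String>> listener) {
        listeners.add(listener);
    }

    // Both the periodic pull and a server push would end up calling this.
    // Returns true when the address list actually changed.
    boolean processUpdate(String serviceName, List<String> latest) {
        List<String> copy = List.copyOf(latest); // defensive, immutable copy
        List<String> previous = cache.put(serviceName, copy);
        if (copy.equals(previous)) {
            return false; // same list in the same order: nothing to announce
        }
        for (BiConsumer<String, List<String>> listener : listeners) {
            listener.accept(serviceName, copy);
        }
        return true;
    }

    List<String> getHosts(String serviceName) {
        return cache.getOrDefault(serviceName, List.of());
    }
}
```

Because unchanged lists are ignored, a pull that arrives right after a push with the same content does not notify subscribers a second time. The comparison here is order-sensitive, which is a simplification.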