记录socket-io注册到nacos后,nacos默认轮询策略会导致服务请求失败

发布时间 2024-01-08 17:54:23作者: XSWClevo

问题:

socket-io服务注册到nacos, 出现多个socket-io实例。此时前端发送请求,nginx请求gateway地址,gateway根据url寻找注册到nacos上的服务名,服务名有多个实例,nacos默认的轮询策略会出现服务连接失败。因为有的socket-io服务里面没有需要的namespace。所以报错。并且出现连接成功后中间断开请求。因为是上一次的请求没有断开导致超时然后出现的蝴蝶效应。

 

解决方案:

1. 重写LoadRalancer策略

代码如下, 请供参考:

pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    <version>4.0.3</version>
</dependency>

实现代码

import com.alibaba.cloud.commons.lang.StringUtils;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.loadbalancer.NacosLoadBalancer;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.core.Balancer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// 自定义负载均衡实现需要实现 ReactorServiceInstanceLoadBalancer 接口 以及重写choose方法
public class NacosSocketCustomLoadBalancer extends NacosLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String SERVICE_id = "socket-io-service";

    // 注入当前服务的nacos的配置信息
    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    // loadbalancer 提供的访问当前服务的名称
    final String serviceId;

    // loadbalancer 提供的访问的服务列表
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    Request request;

    public NacosSocketCustomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, NacosDiscoveryProperties nacosDiscoveryProperties) {
        super(serviceInstanceListSupplierProvider, serviceId, nacosDiscoveryProperties);
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }

    /**
     * 服务器调用负载均衡时调的放啊
     * 此处代码内容与 RandomLoadBalancer 一致
     */
    public Mono<Response<ServiceInstance>> choose(Request request) {
        this.request = request;
        System.out.println("choose.request:" + request);
        ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        if (SERVICE_id.equals(this.serviceId)) {
            Flux<List<ServiceInstance>> listFlux = supplier.get(request);
            return listFlux.next().map((serviceInstances) -> this.processInstanceResponse(supplier, serviceInstances));
        }
        // 不是我们需要的服务ID走默认的Nacos轮询策略
        return super.choose(request);
    }


    /**
     * 对负载均衡的服务进行筛选的方法
     * 此处代码内容与 RandomLoadBalancer 一致
     */
    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
        System.out.println("processInstanceResponse.serviceInstances:" + serviceInstances);
        Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
        System.out.println("serviceInstanceResponse ==> " + serviceInstanceResponse);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }

        return serviceInstanceResponse;
    }

    /**
     * 对负载均衡的服务进行筛选的方法
     * 自定义
     * 此处的 instances 实例列表  只会提供健康的实例  所以不需要担心如果实例无法访问的情况
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        // 获取当前服务所在的集群名称
        String currentServiceName = nacosDiscoveryProperties.getService();
        System.out.println("currentServiceName:" + currentServiceName);
        // 过滤在同一集群下注册的服务 根据集群名称筛选的集合
        List<ServiceInstance> sameClusterNameInstList = instances.stream().filter(i -> StringUtils.equals(i.getMetadata().get("nacos.service"), currentServiceName)).collect(Collectors.toList());
        ServiceInstance sameClusterNameInst;
        if (sameClusterNameInstList.isEmpty()) {
            // 如果为空,则根据权重直接过滤所有服务列表
            sameClusterNameInst = getHostByRandomWeight(instances);
        } else {
            // 如果不为空,则根据权重直接过滤所在集群下的服务列表
            sameClusterNameInst = getHostByRandomWeight(sameClusterNameInstList);
        }
        return new DefaultResponse(sameClusterNameInst);
    }

    private ServiceInstance getHostByRandomWeight(List<ServiceInstance> sameClusterNameInstList) {

        List<Instance> list = new ArrayList<>();
        Map<String, ServiceInstance> dataMap = new HashMap<>();
        // 此处将 ServiceInstance 转化为 Instance 是为了接下来调用nacos中的权重算法,由于入参不同,所以需要转换,此处建议打断电进行参数调试,以下是我目前为止所用到的参数,转化为map是为了最终方便获取取值到的服务对象
        sameClusterNameInstList.forEach(i -> {
            Instance ins = new Instance();
            Map<String, String> metadata = i.getMetadata();

            ins.setInstanceId(metadata.get("nacos.instanceId"));
            ins.setWeight(new BigDecimal(metadata.get("nacos.weight")).doubleValue());
            ins.setClusterName(metadata.get("nacos.cluster"));
            ins.setEphemeral(Boolean.parseBoolean(metadata.get("nacos.ephemeral")));
            ins.setHealthy(Boolean.parseBoolean(metadata.get("nacos.healthy")));
            ins.setPort(i.getPort());
            ins.setIp(i.getHost());
            ins.setServiceName(i.getServiceId());

            ins.setMetadata(metadata);

            list.add(ins);
            // key为服务ID,值为服务对象
            dataMap.put(metadata.get("nacos.instanceId"), i);
        });
        // 调用自定义选择socket实例的算法
        System.out.println("main list:{  }" + list);
        Instance hostByRandomWeightCopy = ExtendBalancer.getHostBySocketResponse(list, this.request);

        // 根据最终ID获取需要返回的实例对象
        return dataMap.get(hostByRandomWeightCopy.getInstanceId());
    }

}

class ExtendBalancer extends Balancer {

    public static Instance getHostBySocketResponse(List<Instance> hosts, Request request) {
        DefaultRequest defaultRequest = (DefaultRequest) request;

        Instance availableInstances = getAvailableInstances(hosts, defaultRequest);
        if (availableInstances != null) {
            return availableInstances;
        }
        // 如果全部未响应,使用默认算法
        return getHostByRandomWeight(hosts);
    }

    public static Instance getAvailableInstances(List<Instance> hosts, DefaultRequest defaultRequest) {
        RequestDataContext context = (RequestDataContext) defaultRequest.getContext();
        RequestData clientRequest = context.getClientRequest();
        HttpHeaders headers = clientRequest.getHeaders();

        Instance instance = new Instance();

        String rawUrl = clientRequest.getUrl().toString();
        // 创建HttpClient
        HttpClient httpClient = HttpClient.newBuilder()
                .build();

        // 构建HttpRequest
        Map<String, String> headerMap = new HashMap<>();
        headers.forEach((k, v) -> {
            if (k.equals("Host")) {
                return;
            }
            headerMap.put(k, v.get(0));
        });
        // 网关url转为服务url
        String targetUrl = replaceServerUrl(rawUrl);

        for (Instance host : hosts) {
            try {
                String hostName = host.getIp() + ":" + host.getPort();
                headerMap.put("Host", hostName);
                String url = targetUrl.formatted(hostName);
                HttpRequest.Builder builder = HttpRequest.newBuilder()
                        .uri(new URI(url));
                headerMap.forEach(builder::header);
                HttpRequest request = builder.GET()
                        .timeout(Duration.ofMillis(100))
                        .build();
                // 发送请求,只需要保证是这个机器,不需要实际连接,以免还要写端口逻辑。
                // 对应的服务会出现错误日志,received a frame that is not masked as expected。无需理会
                httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            } catch (HttpConnectTimeoutException | InterruptedException | URISyntaxException e) {
                System.err.println(e.getMessage());
            } catch (IOException e) {
                instance = host;
            }
        }

        return instance;
    }

    /**
     * 网关的URL转为服务的URL
     * @param rawUrl 网关的URL
     * @return 获取到的服务的URL
     */
    private static String replaceServerUrl(String rawUrl) {
        // 匹配IP地址和端口的正则表达式
        String regex = "http://(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}):(\\d+)/.*";

        // 创建Pattern对象
        Pattern pattern = Pattern.compile(regex);

        // 创建Matcher对象
        Matcher matcher = pattern.matcher(rawUrl);

        // 进行匹配和替换
        if (matcher.matches()) {
            String ipAddress = matcher.group(1);
            String port = matcher.group(2);
            String replacedString = rawUrl.replace(ipAddress + ":" + port, "%s");

            // 输出替换后的字符串
            System.out.println("Replaced String: " + replacedString);
            return replacedString;
        }
        return "";
    }

    static {
        // jdk 屏蔽了部分请求头,需要手动开启
        System.setProperty("jdk.httpclient.allowRestrictedHeaders", "connection,content-length,expect,host,upgrade");
    }
}

配置类注册

@Configuration
@LoadBalancerClients(defaultConfiguration = SpringBeanConfig.class)
public class SpringBeanConfig {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }


    @Bean
    public ReactorLoadBalancer<ServiceInstance> nacosLoadBalancer(Environment environment,
                                                                  LoadBalancerClientFactory loadBalancerClientFactory,
                                                                  NacosDiscoveryProperties nacosDiscoveryProperties) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new NacosSocketCustomLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name,
                        ServiceInstanceListSupplier.class),
                name, nacosDiscoveryProperties);
    }
}

socket-io注册到nacos文章:
https://www.cnblogs.com/xxsdnol/p/17903542.html