【MSA】Spring Cloud Hystrix

发布时间 2023-06-25 22:57:41作者: lihewei

1. 前言

1.1 什么是服务雪崩?

在微服务中,会经常出线链式调用,例如:用户服务 --> 订单服务 --> 商品服务

  1. 当用户访问A的某个接口时,tomcat会给用户分配一个现成,支持用户访问
  2. 服务A需要调用服务B ,B的tomcat会给A分配一个现成,支持A的访问
  3. 服务B需要调用服务C,但C挂了,但B在访问之前不知道,直到超时,才发现C无法访问

结果:因为C不可用,导致B的现成不能及时回收,从而导致A的现成也无法及时回收,导致整个服务链的线程池里没有线程可用了。从此在有用户访问A,那么tomcat直接报503,服务不可用!!

服务雪崩的本质:线程没有及时回收。

不管线程调用成功还是失败,只要线程及时回收,就可以解决服务雪崩问题

1.2 怎么解决服务雪崩

  • 修改调用的超时时长(不推荐):将服务间的调用超时时长改小,这样就可以让线程及时回收,保证服务可用
  • 设置拦截器:例如 B —> C ,在拦截器直到C的状态,B调用C的时候,从拦截器里判断C的状态,若C挂了,直接return,若C正常,继续调用

2. Hystrix 快速入门

当有服务调用的时候,才会出现服务雪崩,所以 Hystrix 常和 OpenFeign,Ribbon 一起出现

2.1 Hystrix + OpenFeign

2.1.1 服务提供者

  1. 创建 provider-order-service 服务(勾选web、Eureka Discovery Client 启动依赖), OpenFegin默认集成了Hystrix。

    注意:boot版本:2.3.12.RELEASE、cloud版本:Hoxton.SR12

  2. provider-order-service 修改配置文件

    server:
    	port: 8082
    spring:
    	application:
    		name: provider-order-service
    eureka:
    	client:
    		service-url:
    			defaultZone: http://localhost:8761/eureka
    instance:
    	instance-id: ${spring.application.name}:${server.port}
    	prefer-ip-address: true
    
  3. provider-order-service 修改启动类 @EnableEurekaClient

  4. 增加一个访问接口

    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class OrderController {
    	/**
    	* 订单服务下单接口
    	* @return
    	*/
    	@GetMapping("doOrder")
    	public String doOrder() {
    	System.out.println("有用户来下单了");
    	return "下单成功";
    	}
    }
    
  5. 测试

2.1.2 服务消费者

  1. 创建 consumer-user-service

  2. 创建 OrderServiceHystrix 实现 OrderServiceFeign(代替方案)

    package com.lihw.fegin.hystrix;
    import com.lihw.fegin.OrderServiceFeign;
    
    @Component//要将此类加入IOC
    public class OrderServiceHystrix implements OrderServiceFeign {
    	@Override
    	public String doOrder() {
    		System.out.println("调用下单服务失败,我走 hystrix 了");
    		return "我是 hystrix 的 doOrder,说明下单失败了";
    	}
    }
    
  3. 修改 OrderServiceFeign 增加一个 fallback

    @FeignClient(value = "provider-order-service", fallback = OrderServiceHystrix.class)
    
  4. 修改 yml 配置文件 (使用断路器)

    feign:
    	hystrix:
    		enabled: true #开启断路器的使用
    

2.1.3 测试

服务提供者宕机后,调用doOrder()方法会走到OrderServiceHystrix.java 中的方法上

2.2 Hystrix + Ribbon

修改 consumer-user-service

  1. 添加 Hystrix 的依赖,ribbon 没有集成 hystrix

    <dependency>
    	<groupId>org.springframework.cloud</groupId>
    	<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    
  2. 修改启动类

    @SpringBootApplication
    @EnableEurekaClient
    @EnableFeignClients
    @EnableCircuitBreaker //开启断路器
    public class ConsumerUserServiceApplication {
    	public static void main(String[] args) {
    		SpringApplication.run(ConsumerUserServiceApplication.class, args);
    	}
    }
    
  3. 修改 controller

    /**
    * 用户下单方法 ribbon 的熔断
    * @return
    * @HystrixCommand(fallbackMethod = "ribbonHystrix")
    * 指定熔断的方法
    */
    @GetMapping("userDoOrderRibbon")
    @HystrixCommand(fallbackMethod = "ribbonHystrix")
    public String testRibbonHystrix(String serviceId) {
    	String result = restTemplate.getForObject("http:" + serviceId + "/doOrder", String.class);
    	System.out.println(result);
    	return "成功";
    }
    
    //方法签名要和原来的方法一致(备选方法)
    public String ribbonHystrix(String serviceId) {
    	return "我是 ribbon 的备选方案";
    }
    

3. Hystrix 的常用配置?

server:
  port: 8081
spring:
  application:
    name: consumer-user-service
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
    fetch-registry: true
    register-with-eureka: true
  instance:
    instance-id: ${spring.application.name}:${server.port}
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
hystrix: #hystrix 的全局控制
  command:
    default: #default 是全局控制,也可以换成单个方法控制,把 default 换成方法名即可
      fallback:
        isolation:
          semaphore:
            maxConcurrentRequests: 1000 #信号量隔离级别最大并发数
      circuitBreaker:
        enabled: true #开启断路器
        requestVolumeThreshold: 3 #失败次数(阀值)
        sleepWindowInMilliseconds: 20000 #窗口时间
        errorThresholdPercentage: 60 #失败率
      execution:
        isolation:
          Strategy: thread #隔离方式 thread 线程隔离集合和 SEMAPHORE 信号量隔离级别
            thread:
              timeoutInMilliseconds: 3000 #调用超时时长
ribbon:
  ReadTimeout: 5000 #要结合 feign 的底层 ribbon 调用的时长
  ConnectTimeout: 5000

隔离方式:两种隔离方式

  • thread 线程池: 按照 group(10 个线程)划分服务提供者,用户请求的线程和做远程的线程不一样

    优点:当 B 服务调用失败了 或者请求 B 服务的量太大了 不会对 C 服务造成影响 用户访问比较大的情况下使用比较好 异步的方式

    缺点:线程间切换开销大,对机器性能影响

    场景:调用第三方服务 并发量大的情况下

  • SEMAPHORE 信号量隔离: 每次请进来 有一个原子计数器 做请求次数的++ 当请求完成以后 --

    优点:对 cpu 开销小

    缺点:并发请求不易太多 当请求过多 就会拒绝请求 做一个保护机制

    场景:使用内部调用 ,并发小的情况下

4. Hystrix 设计断路器模型

手写断路器?

时间窗口滑动模型

3.1 断路器的状态说明

  • 关:服务正常调用 A---》B

  • 开:在一段时间内,调用失败次数达到阀值(5s 内失败 3 次)则断路器打开,直接 return

  • 半开:断路器打开后,过一段时间,让少许流量尝试调用 B 服务,如果成功则断路器关闭,使服务正常调用,如果失败,则继续半开

3.2 设计MyHystrix

1. 创建项目选择依赖:

  • Spring Boot DevTools
  • Lombok
  • Spring Configuration Processor
  • Spring Web

2. 创建断路器状态模型

/**
 * @explain: 自定义断路器状态枚举类
 * @author: lihewei
*/
public enum HystrixStatus {
    CLOSE,//关闭
    OPEN,//打开
    HALF_OPEN//半开
}

3. 创建断路器 Hystrix

package com.lihw.model;

import lombok.Data;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @explain: 自定义断路器
 * @author: lihewei
*/
@Data
public class Hystrix {

    /**
     * 窗口时间
     */
    public static final Integer WINDOW_TIME= 20;

    /**
     * 最大失败次数
     */
    public static final Integer MAX_FAIL_COUNT= 3;

    /**
     * 断路器状态
     */
    private HystrixStatus status = HystrixStatus.CLOSE;

    /**
     * 判断当前断路器失败次数的容器 AtomicInteger(线程安全的容器)
     */
    private AtomicInteger currentFailCount = new AtomicInteger(0);

    /**
     * 线程池
     */
    private ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            4,
            8,
            30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    private Object lock = new Object();

    {
        poolExecutor.execute(() -> {
            while(true){
                try {
                    TimeUnit.SECONDS.sleep(WINDOW_TIME);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 如果断路器是开的 那么 不会去调用  就不会有失败  就不会记录次数 没有必要清零  这个线程可以不执行
                if (this.status.equals(HystrixStatus.CLOSE)) {
                    // 20s 清零一次
                    this.currentFailCount.set(0);
                } else {
                    // 半开或者开 不需要去记录次数 这个线程可以不工作
                    // 学过生产者 消费者模型  wait notifyAll  condition singleAll await   它们只能随机唤醒某一个线程
                    synchronized (lock) {
                        try {
                            lock.wait();
                            System.out.println("我被唤醒了,开始工作");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
    }

    /**
     * 记录失败次数
     */
    public void addFailCount() {
        int i = currentFailCount.incrementAndGet();  // ++i
        if (i >= MAX_FAIL_COUNT) {
            // 说失败次数已经到了阈值了
            // 修改当前状态为 open
            this.setStatus(HystrixStatus.OPEN);
            System.out.println("修改状态为开放状态 open");
            // 当断路器打开以后  就不能去访问了  需要将他变成半开
            // 等待一个时间窗口  让断路器变成半开
            poolExecutor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(WINDOW_TIME);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.setStatus(HystrixStatus.HALF_OPEN);
                System.out.println("变成半开状态 HALF_OPEN");
                // 重置失败次数  不然下次进来直接就会打开断路器
                this.currentFailCount.set(0);
            });
        }
    }

}

4. 引入切面类比拦截器

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

5. 自定义注解

package com.lihw.anno;
import java.lang.annotation.*;

/**
 * 熔断器切面注解,加到需要增加熔断器的方法上
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface LihwHystrix {
}

6. 创建 HystrixAspect

类似方法拦截器

package com.lihw.aspect;

import com.lihw.model.Hystrix;
import com.lihw.model.HystrixStatus;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

@Component
@Aspect
public class HystrixAspect {

    // 因为一个消费者可以去调用多个提供者  每个提供者都有自己的断路器
    // 在消费者里面去创建一个断路器的容器
    public static Map<String, Hystrix> fishMap = new HashMap<>();

    static {
        // 假设 是需要去调用order-service的服务
        fishMap.put("order-service", new Hystrix());
    }

    Random random = new Random();

    /**
     * 这个就类比拦截器
     * 就是要判断 当前断路器的状态 从而决定是否发起调用(执行目标方法)
     * @param joinPoint
     * @return
     */
    @Around(value = "@annotation(com.lihw.anno.LihwHystrix)")
    public Object fishAround(ProceedingJoinPoint joinPoint) {
        Object result = null;
        // 获取到当前提供者的断路器
        Hystrix hystrix = fishMap.get("order-service");
        HystrixStatus status = hystrix.getStatus();
        switch (status) {
            case CLOSE:
                // 正常  去调用 执行目标方法
                try {
                    result = joinPoint.proceed();
                    return result;
                } catch (Throwable throwable) {
                    // 说明调用失败  记录次数
                    hystrix.addFailCount();
                    return "我是备胎-当前断路器状态:CLOSE";
                }
            case OPEN:
                // 不能调用
                return "我是备胎-当前断路器状态:OPEN";
            case HALF_OPEN:
                // 可以用少许流量去调用
                int i = random.nextInt(5);
                System.out.println(i);
                if (i == 1) {
                    // 去调用
                    try {
                        result = joinPoint.proceed();
                        // 说明成功了 断路器关闭
                        hystrix.setStatus(HystrixStatus.CLOSE);
                        synchronized (hystrix.getLock()) {
                            hystrix.getLock().notifyAll();
                        }
                        return result;
                    } catch (Throwable throwable) {
                        return "我是备胎-当前断路器状态:HALF_OPEN";
                    }
                }
            default:
                return "我是备胎--default";
        }
    }

}

7. 创建测试类

package com.lihw.controller;

import com.lihw.anno.LihwHystrix;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class lihwController {

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("doRpc")
    @LihwHystrix
    public String doRpc(){
        String result = restTemplate.getForObject("http://localhost:8989/abc", String.class);//路径不存在
        return result;
    }
}