《优化接口设计的思路》系列:第七篇—接口限流策略

发布时间 2024-01-10 23:47:14作者: sum墨

一、前言

大家好!我是sum墨,一个一线的底层码农,平时喜欢研究和思考一些技术相关的问题并整理成文,限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。

作为一名从业已达六年的老码农,我的工作主要是开发后端Java业务系统,包括各种管理后台和小程序等。在这些项目中,我设计过单/多租户体系系统,对接过许多开放平台,也搞过消息中心这类较为复杂的应用,但幸运的是,我至今还没有遇到过线上系统由于代码崩溃导致资损的情况。这其中的原因有三点:一是业务系统本身并不复杂;二是我一直遵循某大厂代码规约,在开发过程中尽可能按规约编写代码;三是经过多年的开发经验积累,我成为了一名熟练工,掌握了一些实用的技巧。

好像一提到防抖,接下来就会提到限流,我在第六篇文章写了一些接口防抖的策略,那么这篇正好讲讲接口如何限流。不知道从哪里看到的,“防抖是回城,限流是攻击”,感觉真的很形象,我来简要描述一下:

王者荣耀大家都玩过吧,里面的英雄都有一个攻击间隔,当我们连续的点击普通攻击的时候,英雄的攻速并不会随着我们点击的越快而更快的攻击。这个就是限流,英雄会按照自身攻速的系数执行攻击,我们点的再快也没用。

而防抖在王者荣耀中就是回城,在游戏中经常会遇到连续回城嘲讽对手的玩家,它们每点击一次回城,后一次的回城都会打断前一次的回城,只有最后一次点击的回城会被触发,从而保证回城只执行一次,这就是防抖的概念。

本文参考项目源码地址:summo-springboot-interface-demo

二、业务场景

1. API速率限制

对外提供的API接口可能需要限制每个用户或每个IP地址在单位时间内的访问次数,以防止滥用或过载。
2. 网站流量控制

对于高流量的网站,为了防止瞬时访问量过大导致服务器压力过重,可以通过限流保护系统稳定运行。
3. 秒杀活动

电商平台在进行秒杀活动时,可能会遭遇大量用户同时抢购,通过计数器限流算法可以有效地控制访问量,避免系统崩溃。
4. 微服务架构

在微服务架构中,限流可以防止某个服务因为突发的高流量而成为瓶颈,进而影响到整个系统的稳定性。
5. 分布式系统的互斥操作

在进行诸如分布式锁的操作时,限流算法可以避免过多的请求同时竞争资源,保证系统的公平性和效率。
6. 基础设施保护

对于数据库、缓存等基础设施服务,通过计数器限流可以避免过多的并发请求导致服务不可用。
7. 网络带宽控制

对于带宽有限的网络服务,限流算法可以用来确保带宽的合理分配,防止网络拥堵。

三、限流策略

1. 计数器

(1)代码

CounterRateLimit.java
package com.summo.demo.config.limitstrategy.counter;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CounterRateLimit {
    /**
     * 请求的数量
     *
     * @return
     */
    int requests();

    /**
     * 时间窗口,单位为秒
     *
     * @return
     */
    int timeWindow();
}

CounterRateLimitAspect.java

package com.summo.demo.config.limitstrategy.counter;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Slf4j
@Aspect
@Component
@Order(5)
public class CounterRateLimitAspect {
    /**
     * 用来存储每个方法请求计数的映射
     */
    private final ConcurrentHashMap<String, AtomicInteger> REQUEST_COUNT = new ConcurrentHashMap<>();
    /**
     * 来存储每个方法的时间戳的映射
     */
    private final ConcurrentHashMap<String, Long> TIMESTAMP = new ConcurrentHashMap<>();

    @Around("@annotation(com.summo.demo.config.limitstrategy.counter.CounterRateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
        //获取注解信息
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        CounterRateLimit counterRateLimit = method.getAnnotation(CounterRateLimit.class);

        //获取注解上配置的参数
        int maxRequests = counterRateLimit.requests();
        long windowSizeInMillis = TimeUnit.SECONDS.toMillis(counterRateLimit.timeWindow());

        // 获取方法的字符串表示,用作键值
        String methodName = method.toString();

        // 初始化计数器和时间戳
        AtomicInteger count = REQUEST_COUNT.computeIfAbsent(methodName, k -> new AtomicInteger(0));
        long startTime = TIMESTAMP.computeIfAbsent(methodName, k -> System.currentTimeMillis());

        // 获取当前时间
        long currentTimeMillis = System.currentTimeMillis();
        // 如果当前时间超出时间窗口,则重置计数器和时间戳
        if (currentTimeMillis - startTime > windowSizeInMillis) {
            // 原子地重置时间戳和计数器
            TIMESTAMP.put(methodName, currentTimeMillis);
            count.set(0);
        }

        // 原子地增加计数器并检查其值
        if (count.incrementAndGet() > maxRequests) {
            // 如果超出最大请求次数,递减计数器,并报错
            count.decrementAndGet();
            throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
        }
        // 执行原方法
        return joinPoint.proceed();
    }
}

使用方式

/**
  * 计数器算法限流
  *
  * @return
*/
@GetMapping("/counter")
@CounterRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity counter() {
    return ResponseEntity.ok("counter test ok!");
}

(3)原理说明

在示例中,有一个使用了 @CounterRateLimit 注解的 counter 方法。根据注解的参数,这个方法在2秒钟的时间窗口内只能被调用2次。 如果在 2 秒内有更多的调用尝试,那么这些额外的调用将被限流,并返回错误信息。

(4)流程图

(5)缺点

无法处理“临界问题”。

(6)临界问题

假设1min一个时间段,每个时间段内最多100个请求。有一种极端情况,当10:00:58这个时刻100个请求一起过来,到达阈值;当10:01:02这个时刻100个请求又一起过来,到达阈值。这种情况就会导致在短短的4s内已经处理完了200个请求,而其他所有的时间都在限流中。

2. 滑动窗口

(1)代码

SlidingWindowRateLimit.java

package com.summo.demo.config.limitstrategy.slidingwindow;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SlidingWindowRateLimit {
    /**
     * 请求的数量
     *
     * @return
     */
    int requests();

    /**
     * 时间窗口,单位为秒
     *
     * @return
     */
    int timeWindow();
}

SlidingWindowRateLimitAspect.java

package com.summo.demo.config.limitstrategy.slidingwindow;

import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
@Aspect
@Component
@Order(5)
public class SlidingWindowRateLimitAspect {
    /**
     * 使用 ConcurrentHashMap 保存每个方法的请求时间戳队列
     */
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>> REQUEST_TIMES_MAP = new ConcurrentHashMap<>();

    @Around("@annotation(com.summo.demo.config.limitstrategy.slidingwindow.SlidingWindowRateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        SlidingWindowRateLimit rateLimit = method.getAnnotation(SlidingWindowRateLimit.class);
        // 允许的最大请求数
        int requests = rateLimit.requests();
        // 滑动窗口的大小(秒)
        int timeWindow = rateLimit.timeWindow();

        // 获取方法名称字符串
        String methodName = method.toString();
        // 如果不存在当前方法的请求时间戳队列,则初始化一个新的队列
        ConcurrentLinkedQueue<Long> requestTimes = REQUEST_TIMES_MAP.computeIfAbsent(methodName,
            k -> new ConcurrentLinkedQueue<>());

        // 当前时间
        long currentTime = System.currentTimeMillis();
        // 计算时间窗口的开始时间戳
        long thresholdTime = currentTime - TimeUnit.SECONDS.toMillis(timeWindow);

        // 这一段代码是滑动窗口限流算法中的关键部分,其功能是移除当前滑动窗口之前的请求时间戳。这样做是为了确保窗口内只保留最近时间段内的请求记录。
        // requestTimes.isEmpty() 是检查队列是否为空的条件。如果队列为空,则意味着没有任何请求记录,不需要进行移除操作。
        // requestTimes.peek() < thresholdTime 是检查队列头部的时间戳是否早于滑动窗口的开始时间。如果是,说明这个时间戳已经不在当前的时间窗口内,应当被移除。
        while (!requestTimes.isEmpty() && requestTimes.peek() < thresholdTime) {
            // 移除队列头部的过期时间戳
            requestTimes.poll();
        }

        // 检查当前时间窗口内的请求次数是否超过限制
        if (requestTimes.size() < requests) {
            // 未超过限制,记录当前请求时间
            requestTimes.add(currentTime);
            return joinPoint.proceed();
        } else {
            // 超过限制,抛出限流异常
            throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
        }
    }
}

使用方式

/**
   * 滑动窗口算法限流
   *
   * @return
   */
@GetMapping("/slidingWindow")
@SlidingWindowRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity slidingWindow() {
        return ResponseEntity.ok("slidingWindow test ok!");
}

(3)原理说明

从图上可以看到时间创建是一种滑动的方式前进, 滑动窗口限流策略能够显著减少临界问题的影响,但并不能完全消除它。滑动窗口通过跟踪和限制在一个连续的时间窗口内的请求来工作。与简单的计数器方法不同,它不是在窗口结束时突然重置计数器,而是根据时间的推移逐渐地移除窗口中的旧请求,添加新的请求。
举个例子:假设时间窗口为10s,请求限制为3,第一次请求在10:00:00发起,第二次在10:00:05发起,第三次10:00:11发起,那么计数器策略的下一个窗口开始时间是10:00:11,而滑动窗口是10:00:05。所以这也是滑动窗口为什么可以减少临界问题的影响,但并不能完全消除它的原因。

(4)流程图

3. 令牌桶

(1)代码

该算法我就不造轮子了,直接使用Guava自带的RateLimiter实现。

pom.xml

<!-- guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.1-jre</version>
</dependency>

TokenBucketRateLimit.java

package com.summo.demo.config.limitstrategy.tokenbucket;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketRateLimit {
    /**
     * 产生令牌的速率(xx 个/秒)
     *
     * @return
     */
    double permitsPerSecond();
}

TokenBucketRateLimitAspect.java

package com.summo.demo.config.limitstrategy.tokenbucket;

import com.google.common.util.concurrent.RateLimiter;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Aspect
@Component
@Order(5)
public class TokenBucketRateLimitAspect {
    private final ConcurrentHashMap<String, RateLimiter> limiters = new ConcurrentHashMap<>();

    @Around("@annotation(com.summo.demo.config.limitstrategy.tokenbucket.TokenBucketRateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        TokenBucketRateLimit rateLimit = method.getAnnotation(TokenBucketRateLimit.class);
        double permitsPerSecond = rateLimit.permitsPerSecond();

        String methodName = method.toString();
        RateLimiter rateLimiter = limiters.computeIfAbsent(methodName, k -> RateLimiter.create(permitsPerSecond));

        if (rateLimiter.tryAcquire()) {
            return joinPoint.proceed();
        } else {
            throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
        }
    }
}

使用方式

/**
   * 令牌桶算法限流
   *
   * @return
   */
@GetMapping("/tokenBucket")
@TokenBucketRateLimit(permitsPerSecond = 0.5)
public ResponseEntity tokenBucket() {
    return ResponseEntity.ok("tokenBucket test ok!");
}

(3)原理说明

令牌桶算法是一种流量控制机制,非常适合于处理突发流量,同时保证一定程度的平滑流动。它的工作原理类似于一个实际的水桶,其中水桶代表令牌桶,水流代表令牌。令牌以恒定的速率填充到桶中,直到达到桶的容量上限,多余的> 令牌会被丢弃。当请求(比如网络数据包或者游戏中的攻击动作)到达时,它需要消耗一个令牌才能被处理。如果桶中没有令牌,请求就会被延迟或丢弃,直到桶中再次有令牌为止。

以王者荣耀和LOL中的英雄攻速为例,英雄的攻击动作可以类比为请求,而英雄的攻速属性则确定了令牌生成的速度,即攻击的最大频率。如果英雄的攻击动作必须消耗一个令牌才能执行,那么即使玩家手速再快,也不能超过攻速设定> 的最大限制。这样,英雄的攻击将会保持一个恒定和平滑的节奏,而不会出现一会儿快速连续攻击,一会儿又突然停止的现象。

这种算法的优势在于其能够限制请求的峰值速率,同时允许一定程度的突发请求。在实际应用中,令牌桶算法可以平滑流量,减少拥塞,确保系统的稳定性。相较于仅仅依靠计数器或者滑动窗口算法,令牌桶算法提供了更加灵活和平滑> 的流量控制方式,非常适合需要处理突发性高流量和保持服务质量的场景。

(4)流程图

4. 漏桶

(1)代码

LeakyBucketRateLimit.java

package com.summo.demo.config.limitstrategy.leakybucket;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LeakyBucketRateLimit {
    /**
     * 桶的容量
     *
     * @return
     */
    int capacity();

    /**
     * 漏斗的速率,单位通常是秒
     *
     * @return
     */
    int leakRate();
}

LeakyBucketLimiter.java

package com.summo.demo.config.limitstrategy.leakybucket;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class LeakyBucketLimiter {
    /**
     * 桶的容量
     */
    private final int capacity;
    /**
     * 漏桶的漏出速率,单位时间内漏出水的数量
     */
    private final int leakRate;
    /**
     * 当前桶中的水量
     */
    private volatile int water = 0;
    /**
     * 上次漏水的时间
     */
    private volatile long lastLeakTime = System.currentTimeMillis();

    /**
     * 漏桶容器
     */
    private static final ConcurrentHashMap<String, LeakyBucketLimiter> LIMITER_MAP = new ConcurrentHashMap<>();

    /**
     * 静态工厂方法,确保相同的方法使用相同的漏桶实例
     *
     * @param methodKey 方法名
     * @param capacity
     * @param leakRate
     * @return
     */
    public static LeakyBucketLimiter createLimiter(String methodKey, int capacity, int leakRate) {
        return LIMITER_MAP.computeIfAbsent(methodKey, k -> new LeakyBucketLimiter(capacity, leakRate));
    }

    private LeakyBucketLimiter(int capacity, int leakRate) {
        this.capacity = capacity;
        this.leakRate = leakRate;
    }

    /**
     * 尝试获取许可(try to acquire a permit),如果获取成功返回true,否则返回false
     *
     * @return
     */
    public boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        synchronized (this) {
            // 计算上次漏水到当前时间的时间间隔
            long leakDuration = currentTime - lastLeakTime;
            // 如果时间间隔大于等于1秒,表示漏桶已经漏出一定数量的水
            if (leakDuration >= TimeUnit.SECONDS.toMillis(1)) {
                // 计算漏出的水量
                long leakQuantity = leakDuration / TimeUnit.SECONDS.toMillis(1) * leakRate;
                // 漏桶漏出水后,更新桶中的水量,但不能低于0
                water = (int)Math.max(0, water - leakQuantity);
                lastLeakTime = currentTime;
            }
            // 判断桶中的水量是否小于容量,如果是则可以继续添加水(相当于获取到令牌)
            if (water < capacity) {
                water++;
                return true;
            }
        }
        // 如果桶满,则获取令牌失败
        return false;
    }

}

LeakyBucketRateLimitAspect.java

package com.summo.demo.config.limitstrategy.leakybucket;

import java.lang.reflect.Method;

import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Slf4j
@Aspect
@Component
@Order(5)
public class LeakyBucketRateLimitAspect {

    @Around("@annotation(com.summo.demo.config.limitstrategy.leakybucket.LeakyBucketRateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        LeakyBucketRateLimit leakyBucketRateLimit = method.getAnnotation(LeakyBucketRateLimit.class);

        int capacity = leakyBucketRateLimit.capacity();
        int leakRate = leakyBucketRateLimit.leakRate();

        // 方法签名作为唯一标识
        String methodKey = method.toString();

        LeakyBucketLimiter limiter = LeakyBucketLimiter.createLimiter(methodKey, capacity, leakRate);
        if (!limiter.tryAcquire()) {
            // 超过限制,抛出限流异常
            throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
        }

        return joinPoint.proceed();
    }
}

使用方式

/**
   * 漏桶算法限流
   *
   * @return
   */
@GetMapping("/leakyBucket")
@LeakyBucketRateLimit(capacity = 100, leakRate = 20)
public ResponseEntity leakyBucket() {
    return ResponseEntity.ok("leakyBucket test ok!");
}

(3)原理说明

在Leaky Bucket算法中,容器有一个固定的容量,类似于漏桶的容量。数据以固定的速率进入容器,如果容器满了,则多余的数据会溢出。容器中的数据会以恒定的速率从底部流出,类似于漏桶中的水滴。如果容器中的数据不足以满足流出速率,则会等待直到有足够的数据可供流出。这样就实现了对数据流的平滑控制。

(4)流程图

四、小结一下

如果面试中被问到如何进行接口优化,大家第一印象会想到什么?放到3年前的我来看,第一反应肯定是优化接口性能,然后说一个本来耗时好几秒的接口优化到毫秒的案例。而现在的我会说:上下文、权限控制、防抖/限流等等,当然也会说性能优化。感谢一直在学习的自己!

在写这篇文章之前,我看了很多大佬写的相关文章,才开始动笔,不是看不懂也不是写不来,只是因为我没怎么在真实业务中用到限流策略,我不敢乱写。怕给人埋坑。

回顾前面的6篇,加上这一篇一共7篇,共计耗时3个月,但第6篇到第7篇则整整花了一个月,不是在偷懒,理论型的东西好写,但没有实际业务支持那就是高谈阔论。不过沉淀了一个月,不写点东西我怕马上就忘记了,还是写一篇,拙文共赏吧!如果有哪位同学在真实业务中使用过类似的限流策略,可以和我们分享一下经验哈,谢谢啦!