了解asp.net core限流中间件

发布时间 2023-11-15 21:56:44作者: 果小天

Asp.net Core限流中间件

当我们做后台应用的开发的时候,如果流量大了,应用很可能扛不住,所有为了避免应用挂了,导致更大的问题,需要我们对应用进行限流设置。即牺牲掉一些请求,避免整个服务挂掉。

介绍

在.net 7中就出了 Microsoft.AspNetCore.RateLimiting 中间件提供速率限制中间件。核心原理是应用可配置速率限制策略,然后将策略附加到终结点。即还是将属性附加在中间点上。

限流算法

  1. 固定窗口

    即固定时间请求的次数,超过次数就会限流,下一个窗口时间将次数重置

    //新增代码
    builder.Services.AddRateLimiter(_ =>
    {
        _.AddFixedWindowLimiter("FixWindows", options =>
        {
            //options.AutoReplenishment = true;
            options.QueueLimit = 1;
            options.PermitLimit = 5;
            options.Window=TimeSpan.FromSeconds(5);
            options.QueueProcessingOrder= QueueProcessingOrder.OldestFirst;
        });
    });
    //省略代码 新增代码
     app.UseRateLimiter();
    app.MapGet("/weatherforecast", (HttpContext httpContext) =>
    {
        var forecast = Enumerable.Range(1, 5).Select(index =>
            new WeatherForecast
            {
                Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
                TemperatureC = Random.Shared.Next(-20, 55),
                Summary = summaries[Random.Shared.Next(summaries.Length)]
            })
            .ToArray();
        return forecast;
    })
    .WithName("GetWeatherForecast")
    .WithOpenApi()
     //新增代码
    .RequireRateLimiting("FixWindows");
    

经过测试,多余的请求还是会等待。奇了怪了

  1. 滑动窗口

与固定窗口类似,只不过将固定窗口的时间切分的更细,可以称之为段,即以段为时间计算请求的次数,每过一个时间段,我们的计数窗口向右滑动一段,计算这个时间窗口的总次数。

滑动窗口可以解决固定窗口的临界问题,即一个固定窗口60s,限流100次。前面59秒的时候都没有访问,但是59秒的时候来了100个请求,然后60秒的时候又来了100个请求。这种时候固定窗口是没有作用的,因为固定时间窗口更新了。而滑动窗口的解决方案是60秒可以划分为了6段,每段10秒,59秒的时候来了100次请求,这个段的计数器+100、到了60s滑动窗口向右滑了一个时间段,此时如果又来了100个请求,那么会拒绝,因为从10-70秒这个窗口里面已经有了59秒来的100次请求了。所有会拒绝。

代码示例省略相同的代码

_.AddSlidingWindowLimiter("SlidingWindows", options =>
{
    options.QueueLimit = 2;
    options.PermitLimit = 4;
    options.Window= TimeSpan.FromSeconds(5);
    options.SegmentsPerWindow = 2;
    options.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
});
.RequireRateLimiting("SlidingWindows");
  1. 令牌桶

即向一个桶里面放令牌,如果请求过来的时候需要拿到令牌才能被处理,否则丢弃或者重试,如果桶里面的令牌满了则会被丢弃,间隔一段时间向桶里放若干个令牌。

 _.AddTokenBucketLimiter("TokenLimit", options =>
 {
     options.QueueLimit = 2;
     options.TokenLimit = 4; //令牌桶容量 多余会被丢弃 
     options.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
     options.ReplenishmentPeriod = TimeSpan.FromSeconds(10);//间隔多长时间放一次令牌
     options.TokensPerPeriod = 2;//每次向令牌桶放入的令牌数量
     options.AutoReplenishment = true;
 });
.RequireRateLimiting("TokenLimit");
  1. 并发

即当有请求过来的时候,并发量+1,请求处理完了并发-1,对于前面三种来说并不限制请求的总量,只限制最大速率。

_.AddConcurrencyLimiter("ConLimit", options =>
{
    options.QueueLimit = 10; //达到并发限制进入队列排队的数量,多余会被丢弃
    options.PermitLimit = 20;//并发请求最大数量
    options.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
   
});
.RequireRateLimiting("ConLimit");

源码逻辑

核心限流中间件是 :RateLimitingMiddleware

public static IApplicationBuilder UseRateLimiter(this IApplicationBuilder app)
    {
        ArgumentNullException.ThrowIfNull(app);
 
        VerifyServicesAreRegistered(app);
 
        return app.UseMiddleware<RateLimitingMiddleware>();
    }

看看具体逻辑吧 省略较多代码

internal sealed partial class RateLimitingMiddleware
{    
    //省略一堆代码 
    public Task Invoke(HttpContext context)
    {
        //获取终结点
        var endpoint = context.GetEndpoint();
        // If this endpoint has a DisableRateLimitingAttribute, don't apply any rate limits.  如果终结点有 DisableRateLimitingAttribute 不限流
        if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
        {
            return _next(context);
        }
        var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
        // If this endpoint has no EnableRateLimitingAttribute & there's no global limiter, don't apply any rate limits.
         如果没有EnableRateLimitingAttribute,而且也没有全局限流器也不要限流
        if (enableRateLimitingAttribute is null && _globalLimiter is null)
        {
            return _next(context);
        }
        //具体调用逻辑 
        return InvokeInternal(context, enableRateLimitingAttribute);
    }
 
    private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
    {
        //限流策略
        var policyName = enableRateLimitingAttribute?.PolicyName;
        var metricsContext = _metrics.CreateContext(policyName);
        //获取处理许可
        using var leaseContext = await TryAcquireAsync(context, metricsContext);
        
        if (leaseContext.Lease?.IsAcquired == true)//获取到了
        {
            var startTimestamp = Stopwatch.GetTimestamp();
            var currentLeaseStart = metricsContext.CurrentLeasedRequestsCounterEnabled;
            try
            {
                //继续下一个中间件
                _metrics.LeaseStart(metricsContext);
                await _next(context);
            }
            finally
            {
                _metrics.LeaseEnd(metricsContext, startTimestamp, Stopwatch.GetTimestamp());
            }
        }
        else
        {
            _metrics.LeaseFailed(metricsContext, leaseContext.RequestRejectionReason!.Value);
            //请求终止 不要调用拒绝策略
            // If the request was canceled, do not call OnRejected, just return.
            if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
            {
                return;
            }
            var thisRequestOnRejected = _defaultOnRejected;
            RateLimiterLog.RequestRejectedLimitsExceeded(_logger);
            // OnRejected "wins" over DefaultRejectionStatusCode - we set DefaultRejectionStatusCode first,
            // then call OnRejected in case it wants to do any further modification of the status code.
            context.Response.StatusCode = _rejectionStatusCode;
             //如果请求被终结点限流器拒绝使用它自己的拒绝策略如果可以的话
            // If this request was rejected by the endpoint limiter, use its OnRejected if available.
            if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
            {
                DefaultRateLimiterPolicy? policy;
                // Use custom policy OnRejected if available, else use OnRejected from the Options if available.
                policy = enableRateLimitingAttribute?.Policy;
                if (policy is not null)
                {
                    thisRequestOnRejected = policy.OnRejected;
                }
                else
                {
                    if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
                    {
                        thisRequestOnRejected = policy.OnRejected;
                    }
                }
            }
            if (thisRequestOnRejected is not null)
            {
                // leaseContext.Lease will only be null when the request was canceled.
                await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
            }
        }
    }
 

可以看出来逻辑:首先决定是否要限流,根据attrubure标识做具体逻辑分析 ,然后如果要限流 则获取处理请求的许可,如果获取到了就下一个中间件,不能则执行拒绝策略

那么看看如果获取处理请求的许可呢 方法:TryAcquireAsync

逻辑是 调用 CombinedAcquire 获取许可,先从全局限流获取许可,再从终结点限流器获取许可,如果全局没有获取就直接返回。最后再次异步调用 CombinedWaitAsync 从逻辑上讲 和CombinedAcquire差不多。

    private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
    {    //组合获取许可
        var leaseContext = CombinedAcquire(context);
        if (leaseContext.Lease?.IsAcquired == true)
        {
            return leaseContext;
        }//异步再次等待获取许可
        var waitTask = CombinedWaitAsync(context, context.RequestAborted);
        // If the task returns immediately then the request wasn't queued.
        if (waitTask.IsCompleted)
        {
            return await waitTask;
        }
 
        var startTimestamp = Stopwatch.GetTimestamp();
        try
        {
            _metrics.QueueStart(metricsContext);
            leaseContext = await waitTask;
            return leaseContext;
        }
        finally
        {
            _metrics.QueueEnd(metricsContext, leaseContext.RequestRejectionReason, startTimestamp, Stopwatch.GetTimestamp());
        }
    }
 
    private LeaseContext CombinedAcquire(HttpContext context)
    {
        RateLimitLease? globalLease = null;
        RateLimitLease? endpointLease = null;
 
        try
        {
            if (_globalLimiter is not null)
            {   //全局限流器获取
                globalLease = _globalLimiter.AttemptAcquire(context);
                if (!globalLease.IsAcquired)
                {   //没有直接返回
                    return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
                }
            }
            //终结点限流器获取
            endpointLease = _endpointLimiter.AttemptAcquire(context);
            if (!endpointLease.IsAcquired)
            {
                globalLease?.Dispose();
                return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
            }
        }
        catch (Exception)
        {
            endpointLease?.Dispose();
            globalLease?.Dispose();
            throw;
        }
        return globalLease is null ? new LeaseContext() { Lease = endpointLease } : new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
    }
 
    private async ValueTask<LeaseContext> CombinedWaitAsync(HttpContext context, CancellationToken cancellationToken)
    {
        RateLimitLease? globalLease = null;
        RateLimitLease? endpointLease = null;
 
        try
        {
            if (_globalLimiter is not null)
            {
                globalLease = await _globalLimiter.AcquireAsync(context, cancellationToken: cancellationToken);
                if (!globalLease.IsAcquired)
                {
                    return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
                }
            }
            endpointLease = await _endpointLimiter.AcquireAsync(context, cancellationToken: cancellationToken);
            if (!endpointLease.IsAcquired)
            {
                globalLease?.Dispose();
                return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
            }
        }
        catch (Exception ex)
        {
            endpointLease?.Dispose();
            globalLease?.Dispose();
            // Don't throw if the request was canceled - instead log. 
            if (ex is OperationCanceledException && context.RequestAborted.IsCancellationRequested)
            {
                RateLimiterLog.RequestCanceled(_logger);
                return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.RequestCanceled };
            }
            else
            {
                throw;
            }
        }
 
        return globalLease is null ? new LeaseContext() { Lease = endpointLease } : new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
    }
 

看到了上面的代码有些疑问,全局限流器哪来的?答案是可以设置的 :在AddRateLimiter上进行设置。

builder.Services.AddRateLimiter(_ =>
                                
_.GlobalLimiter = PartitionedRateLimiter.CreateChained(
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
    var userAgent = httpContext.Request.Headers.UserAgent.ToString();

    return RateLimitPartition.GetFixedWindowLimiter
    (userAgent, _ =>
        new FixedWindowRateLimiterOptions
        {
            AutoReplenishment = true,
            PermitLimit = 4,
            Window = TimeSpan.FromSeconds(2)
        });
}),
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
    var userAgent = httpContext.Request.Headers.UserAgent.ToString();

    return RateLimitPartition.GetFixedWindowLimiter
    (userAgent, _ =>
        new FixedWindowRateLimiterOptions
        {
            AutoReplenishment = true,
            PermitLimit = 20,
            Window = TimeSpan.FromSeconds(30)
        });
}));

链式限流器

这里加入了全局限流器,就会进行全局限流,通过 CreateChained API 可传入多个 PartitionedRateLimiter,这些限制器组合成一个 PartitionedRateLimiter。 组合的限制器按顺序运行所有输入限制器。(来自微软官方)

如何自定义限流策略

  1. addpolicy
builder.Services.AddRateLimiter(limiterOptions =>
{
   
    limiterOptions.AddPolicy(policyName: jwtPolicyName, partitioner: httpContext =>
    {
        var accessToken = httpContext.Features.Get<IAuthenticateResultFeature>()?
                              .AuthenticateResult?.Properties?.GetTokenValue("access_token")?.ToString()
                          ?? string.Empty;

        if (!StringValues.IsNullOrEmpty(accessToken))
        {
            return RateLimitPartition.GetTokenBucketLimiter(accessToken, _ =>
                new TokenBucketRateLimiterOptions
                {
                    TokenLimit = myOptions.TokenLimit2,
                    QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                    QueueLimit = myOptions.QueueLimit,
                    ReplenishmentPeriod = TimeSpan.FromSeconds(myOptions.ReplenishmentPeriod),
                    TokensPerPeriod = myOptions.TokensPerPeriod,
                    AutoReplenishment = myOptions.AutoReplenishment
                });
        }

        return RateLimitPartition.GetTokenBucketLimiter("Anon", _ =>
            new TokenBucketRateLimiterOptions
            {
                TokenLimit = myOptions.TokenLimit,
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = myOptions.QueueLimit,
                ReplenishmentPeriod = TimeSpan.FromSeconds(myOptions.ReplenishmentPeriod),
                TokensPerPeriod = myOptions.TokensPerPeriod,
                AutoReplenishment = true
            });
    });
});

2.实现IRateLimiterPolicy<>接口

public class SampleRateLimiterPolicy : IRateLimiterPolicy<string>
{
    private Func<OnRejectedContext, CancellationToken, ValueTask>? _onRejected;
    private readonly MyRateLimitOptions _options;

    public SampleRateLimiterPolicy(ILogger<SampleRateLimiterPolicy> logger,
                                   IOptions<MyRateLimitOptions> options)
    {
        _onRejected = (ctx, token) =>
        {
            ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
            logger.LogWarning($"Request rejected by {nameof(SampleRateLimiterPolicy)}");
            return ValueTask.CompletedTask;
        };
        _options = options.Value;
    }

    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected => _onRejected;

    public RateLimitPartition<string> GetPartition(HttpContext httpContext)
    {
        return RateLimitPartition.GetSlidingWindowLimiter(string.Empty,
            _ => new SlidingWindowRateLimiterOptions
            {
                PermitLimit = _options.PermitLimit,
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = _options.QueueLimit,
                Window = TimeSpan.FromSeconds(_options.Window),
                SegmentsPerWindow = _options.SegmentsPerWindow
            });
    }
}
//还要注册 
builder.Services.AddRateLimiter(Options =>
{
    Options.AddPolicy<string, SampleRateLimiterPolicy>(policyName: "myPolicy");
}

总结

  1. 4种限流方式 固定,滑动 ,令牌桶 ,并发
  2. 如何自定义限流策略,以及处理方法
  3. 原理还是通过attribute标记决定是否限流,采用哪个限流器。

参考文章 https://learn.microsoft.com/zh-cn/aspnet/core/performance/rate-limit?view=aspnetcore-7.0
大佬写的更详细 :https://www.cnblogs.com/xiaoxiaotank/p/17560251.html