31. 干货系列从零用Rust编写正反向代理,HTTP限流的实现(limit_req)

发布时间 2023-11-30 09:10:17作者: 问蒙服务框架

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

国内: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

HTTP限流

HTTP限流是在HTTP请求处理过程中,对请求进行限制的一种技术手段。其目的是防止系统过载,保护系统的稳定性和可用性。HTTP限流可以基于不同的策略和方法,例如基于时间窗口、令牌桶、漏桶等。

常见的HTTP限流方法包括:

  • 基于时间窗口:这种方法将一段时间划分为若干个时间窗口,每个时间窗口内只允许一定数量的请求通过。例如,每秒只允许20个请求通过。
  • 令牌桶:令牌桶算法允许突发流量,只要有令牌就可以处理请求,当没有令牌时,请求就被拒绝。这种方法适用于处理突发流量的情况。和时间窗口结合时,如果当前时间段已经有20个请求,此时触发令牌桶brust,将当前的流量进行延时处理。
  • 漏桶:漏桶算法不允许突发流量,无论何时都只能按照一定的速率处理请求。这种方法适用于处理稳定流量的情况。

在进行HTTP限流时,需要考虑系统的实际情况和需求,选择合适的限流策略和方法。同时,还需要对系统的性能和负载进行充分的测试和评估,以确保系统的稳定性和可用性。

方案选择

在此项目中,选择的是基于时间窗口及令牌桶做组合使用进行限制,以下做个例子,配置

limit="rate=10r/s brust=10"

效果将是每秒钟限制10条请求,可以允许突发的10个令牌桶做一秒的延时,在下一秒允许通行。

sequenceDiagram participant C participant S C->>S: 第一秒请求数据10条 Note right of S: 当前记录请求10条 S->>C: 返回成功 C->>S: 第一秒继续请求数据1条 Note right of S: 当前记录请求10条+1条令牌桶 S->>C: 延时一秒再进行后续处理返回成功 C->>S: 第一秒继续请求数据9条 Note right of S: 当前记录请求10条+10条令牌桶 S->>C: 延时一秒再进行后续处理返回成功 C->>S: 第一秒继续请求数据(1条-N条) Note right of S: 当前记录当前秒已满,返回拒绝 S->>C: 直接返回错误,频率过快,已超时 C-->>S: 第二秒请求数据1条 Note right of S: 清除第一秒的记录10条<br>10条令牌桶转化第二秒10条记录<br>故当前为请求数10条+1条令牌桶 S->>C: 延时一秒过时行后续处理返回成功 C-->>S: 第三秒请求数据1条 Note right of S: 清除第二秒的记录10条<br>1条令牌桶转化第三秒1条记录<br>故当前为请求数1条 S->>C: 直接返回成功

以上是时序加令牌的请求数据和返回情况

限流配置

类似于nginx中的limit_req配置,分为limit_req_zonelimit_req两部分,可分为两个类,一个为zone,一个为关联到zone名称的具体项目

#[derive(Debug, Clone)]
pub struct LimitReqZone {
    /// 键值的匹配方式
    pub key: String,
    /// IP个数
    pub limit: u64,
    /// 周期内可以通行的数据
    pub nums: u64,
    /// 每个周期的时间
    pub per: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LimitReq {
    zone: String,
    burst: u64,
}

然后在http的根目录下配置当前的zone空间,为一个HashMap结构,可以配置多种zone结构

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
    // ...
    #[serde_as(as = "HashMap<_, DisplayFromStr>")]
    #[serde(default = "HashMap::new")]
    pub limit_req_zone: HashMap<String, LimitReqZone>,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommonConfig {
    // ...
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub limit_req: Option<LimitReq>,
}

因为并不是任何的请求都要进行限流,所以此处为Option,如果子级未配置,父级有配置,子级将会应用父级的配置。

以下展示在toml格式的配置

# 反向代理相关,七层协议为http及https
[http]

[http.limit_req_zone]
limit = "{client_ip} limit=10m rate=10r/s"
less = "{client_ip} limit=10m rate=10r/min"


# 反向代理中的具体服务,可配置多个多组
[[http.server]]
bind_addr = "0.0.0.0:82"
server_name = "soft.wm-proxy.com"
limit_req = "zone=limit brust=10"

# 按请求路径进行rule匹配,可匹配method,看具体的处理的内容如文件服务或者负载均衡
[[http.server.location]]
limit_req = "zone=less brust=1"
rule = "/root"
file_server = { browse = true }

[[http.server.location]]
rule = "/api"
file_server = { browse = true }

这样子就可以实现api不同的进行不同的限速方案,可以实现更好的通用效果。

配置解析

  • LimitReqZone解析
    需要将"{client_ip} limit=10m rate=10r/s"转成LimitReqZone结构,此处我们用的是FromStr接口,用空格分割,第一个字段为key,后续用=做分割,得取相应的值
impl FromStr for LimitReqZone {
    type Err = ProxyError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let v = s.split(" ").collect::<Vec<&str>>();
        let key = v[0].to_string();
        let mut limit = 0;
        let mut nums = 0;
        let mut per = Duration::new(0, 0);
        for idx in 1..v.len() {
            let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();
            if key_value.len() <= 1 {
                return Err(ProxyError::Extension("未知的LimitReq"));
            }
            match key_value[0] {
                "limit" => {
                    let s = ConfigSize::from_str(key_value[1])?;
                    limit = s.0;
                }
                "rate" => {
                    let rate_key = key_value[1]
                        .split("/")
                        .map(|k| k.trim())
                        .collect::<Vec<&str>>();
                    if rate_key.len() == 1 {
                        return Err(ProxyError::Extension("未知的LimitReq"));
                    }

                    let rate = rate_key[0].trim_end_matches("r");
                    nums = rate
                        .parse::<u64>()
                        .map_err(|_e| ProxyError::Extension("parse error"))?;
                    
                    let s = ConfigDuration::from_str(rate_key[1])?;
                    per = s.0;
                }
                _ => {
                    return Err(ProxyError::Extension("未知的LimitReq"));
                }
            }
        }

        Ok(LimitReqZone::new(key, limit, nums, per))
    }
}
  • LimitReq解析

需要将"zone=less brust=1"转成LimitReq结构,此处我们用的是FromStr接口,用空格分割,将每个值用=做分割,得取相应的值

impl FromStr for LimitReq {
    type Err = ProxyError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let v = s.split(" ").collect::<Vec<&str>>();
        let mut zone = String::new();
        let mut brust = 0;
        for idx in 0..v.len() {
            let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();
            if key_value.len() <= 1 {
                return Err(ProxyError::Extension("未知的LimitReq"));
            }
            match key_value[0] {
                "zone" => {
                    zone = key_value[1].to_string();
                }
                "brust" => {
                    brust = key_value[1]
                        .parse::<u64>()
                        .map_err(|_e| ProxyError::Extension("parse error"))?;
                }
                _ => {
                    return Err(ProxyError::Extension("未知的LimitReq"));
                }
            }
        }

        Ok(LimitReq::new(zone, brust))
    }
}

限制实现

首先我们配置一个静态可访问的全局变量,因为所有的线程操作都需要汇总到此时判定是否合格

每个命名空间里,都将存储不超过规格数据的IP,如果超过将直接返回失败

pub struct LimitReqData {
    /// 记录所有的ip数据的限制情况
    ips: HashMap<String, InnerLimit>,
    /// IP个数
    limit: u64,
    /// 周期内可以通行的数据
    nums: u64,
    /// 每个周期的时间
    per: Duration,
    /// 最后清理IP的时间
    last_remove: Instant,
}

全局静态数据

lazy_static! {
    static ref GLOABL_LIMIT_REQ: RwLock<HashMap<&'static str, LimitReqData>> =
        RwLock::new(HashMap::new());
}

返回结果

#[derive(Debug)]
pub enum LimitResult {
    Ok,
    Refuse,
    Delay(Duration),
}

所以的判断是否通过,我们将通过以下函数返回相应的结果,从而使外部的函数可以进行相应的处理。

impl LimitReqData {
    pub fn recv_new_req(key: &str, ip: &String, burst: u64) -> ProtResult<LimitResult> {
        let mut write = GLOBAL_LIMIT_REQ
            .write()
            .map_err(|_| ProtError::Extension("unlock error"))?;
        if !write.contains_key(&*key) {
            return Ok(LimitResult::Ok);
        }
        write.get_mut(key).unwrap().inner_recv_new_req(ip, burst)
    }
}

小结

我们通过全局共享数据,需要加锁获取该数据,来判定整体的KEY的流量情况,可能是IP,可能是IP+Cookie等,来灵活的针对用户限流还是针对IP限流或者其它的业务情况进行合理的安排。

点击 [关注][在看][点赞] 是对作者最大的支持