k8s限速队列

发布时间 2023-09-16 16:14:19作者: 王景迁

channel问题

channel是go协程间通信的主要方式。
channel预设容量,很难评估,不支持动态扩容。
k8s的client-go提供了基于切片的线程安全的并发队列,解耦生产者与消费者,提供了去重、限速、重试加入队列等功能。

k8s controller处理事件官方例子

生产者

// 创建一个work queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 将一个待处理的对象添加work queue
queue.Add(key)

消费者

// workers代表处理并发处work queue中任务的协程数
for i := 0; i < workers; i++ {
    go wait.Until(c.runWorker, time.Second, stopCh)
}
// runWorker是一个死循环,通过processNextItem从队列中取出key进行处理,然后取出next key继续处理
func (c *Controller) runWorker() {
    for c.processNextItem() {
    }
}
// processNextItem就是真正的处理逻辑了,
func (c *Controller) processNextItem() bool {
    // quit为true代表队列已关闭
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    
    // 处理完后将key从队列中移除,并且该操作是并发安全的,后续会进行详细分析
    defer c.queue.Done(key)
    
    // 调用实际业务代码对每个key进行处理
    err := c.syncToStdout(key.(string))
    // 如果处理异常则进行错误处理
    c.handleErr(err, key)
    return true
}
// 错误处理代码,对上一函数中可能处理失败的key进行重试等操作
func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        // 如果处理没有错误,调用Forget方法将key从限速队列中移除
        c.queue.Forget(key)
        return
    }
    
    // NumRequeues方法返回了一个key重入队列的次数,若超过设定的阈值,则从限速队列中移除,不再处理
    if c.queue.NumRequeues(key) < 5 {
        klog.Infof("Error syncing pod %v: %v", key, err)
    
        // 若重入队列次数没超过阈值,则添加到限速队列
        c.queue.AddRateLimited(key)
        return
    }
    
    c.queue.Forget(key)
    // 错误上报
    runtime.HandleError(err)
}

限速队列由多个队列一起完成。

基本队列work queue

接口

type Interface interface {
    // 添加元素
    Add(item interface{})
    // 队列长度
    Len() int
    // 返回队首元素,并返队列队列是否关闭
    Get() (item interface{}, shutdown bool)
    // 处理完一个元素后从队列中删除
    Done(item interface{})
    // 关闭队列
    ShutDown()
    // 返回队列是否关闭
    ShuttingDown() bool
}

实现

type Type struct {
    // interface{}类型的切片,存储具体的key
    queue []t
    // 存储需要被处理的元素
    dirty set
    // 存储正在处理的元素
    processing set
    // 用于唤醒其他协程队列已满足条件可以继续处理,如队列由空变为非空
    cond *sync.Cond
    // 标记队列是否关闭
    shuttingDown bool
    
    // 监控指标相关字段
    metrics queueMetrics
    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}

shutdown标记通知消费者、监控组件等队列是否关闭。

Add方法

func (q *Type) Add(item interface{}) {
    // 加锁互斥保证线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    // 去重
    if q.dirty.has(item) {
        return
    }
    
    // 记录监控指标
    q.metrics.add(item)
    
    q.dirty.insert(item)
    // 当有相同元素正在处理时,同样进行去重操作,不予入队
    if q.processing.has(item) {
        return
    }
    
    // 最终添加到队列
    q.queue = append(q.queue, item)
    q.cond.Signal()
}

Add方法会先把key放入dirty和queue中,去重;若该key正在被处理,则不会加入,防止业务错误。

Get

func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // 如果队列元素为空且没有关闭则等待其他goroutine唤醒
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    // 若已经关闭则应该返回
    if len(q.queue) == 0 {
        // We must be shutting down.
        return nil, true
    }
    
    // 从queue字段取出一个元素
    item, q.queue = q.queue[0], q.queue[1:]
    
    // 监控相关
    q.metrics.get(item)
    
    // 把key加入processing集合并从dirty删除,这样相同的key可以在当前key处理完后继续入队处理
    q.processing.insert(item)
    q.dirty.delete(item)
    
    return item, false
}

key从dirty集合和queue中移除,放入processing。

Done & ShutDown

func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    q.metrics.done(item)
    
    q.processing.delete(item)
    // 若key在处理的过程中,又再次被加入到队列,由Add方法可知,当key在processing中时,Add操作只是把key放到了dirty集合,并没有放入queue中,因此
    // 相同的key处理完从processing中移除后,需要把key再放入到queue中,防止key被遗漏
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

func (q *Type) ShutDown() {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.shuttingDown = true
    q.cond.Broadcast()
}

通过dirty和processing两个集合,work queue实现了去重,防止了相同key被同时处理的错误。

延迟队列DelayingQueue

接口

type DelayingInterface interface {
    Interface
    // 经过duration时间后item被重新加入队列
    AddAfter(item interface{}, duration time.Duration)
}

延迟队列在work queue基础上实现,继承了Interface接口,多了AddAfter方法,通过设置指定的duration来达到限速的目的。

实现

func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
    ret := &delayingType{
        Interface:       q,
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }
    
    go ret.waitingLoop()
    return ret
}

newDelayingQueue返回一个delayingType类型的限速队列,启动一个waitingLoop协程处理被添加的key。

AddAfter

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    if q.ShuttingDown() {
        return
    }
    
    q.metrics.retry()
    
    // 没有延迟直接加入queue
    if duration <= 0 {
        q.Add(item)
        return
    }
    
    select {
    case <-q.stopCh:
        // 将封装后的key放入waitingForAddCh channel
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}

将key和延迟时间封装成一个waitFor struct,readyAt是key应该加入到队列的时间,放入到waitingForAddCh中,waitingLoop协程会异步处理。默认waitingForAddCh的大小为1000,当channel满时添加key会被block。

waitingLoop

func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()
    
    never := make(<-chan time.Time)
    
    // 记录等待队列中第一个key需要的等待的时间
    var nextReadyAtTimer clock.Timer
    
    //  基于堆实现的优先级队列,需要最早被加入到队列的key放在最前面
    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)
    
    waitingEntryByData := map[t]*waitFor{}
    
    for {
        if q.Interface.ShuttingDown() {
            return
        }
    
        now := q.clock.Now()
    
        // 判断堆顶元素是否到期需要加入队列
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            // 如果还没到期,继续等待或者继续监听后续key加入事件
            if entry.readyAt.After(now) {
                break
            }
    
            // 从堆顶弹出元素添加到队列
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }
    
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            nextReadyAt = nextReadyAtTimer.C()
        }
    
        select {
        case <-q.stopCh:
            return
        // 等待心跳时间过期
        case <-q.heartbeat.C():
            
        // 等待对顶元素时间过期
        case <-nextReadyAt:
            
        // 从waitingForAddCh中取元素,若已经到期直接加入到队列,否则加入堆中等待处理
        case waitEntry := <-q.waitingForAddCh:
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }
    
            // 尝试再取一个
            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}

监听waitingForAddCh channel,取出待添加到队列的key,如果已经到期则直接加入,否则将key放入到堆中,然后每次从堆中取出最先过期的key进行判断处理。

限速队列RateLimitingQueue

接口

type RateLimitingInterface interface {
    DelayingInterface
    
    // 限速器rate limiter指定的时间到期后将item加入队列
    AddRateLimited(item interface{})
    
    // 将item从限速器删除,不再进行重试加入,但还是需要调用Done方法删除item
    Forget(item interface{})
    
    // 返回item被重新入队列的次数
    NumRequeues(item interface{}) int
}

RateLimitingInterface是在DelayingInterface的基础上多了三个方法,调用NewNamedRateLimitingQueue方法传入RateLimiter,调用时可以传入不同的限速器ratelimiter实现,官方提供了四种rate Limiter实现,分别是BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter和MaxOfRateLimiter。
k8s中默认的控制器限速器初始化使用混合限速器。

RateLimiter需要实现三个方法

type RateLimiter interface {
   // 返回key需要等待加入队列的时间
   When(item interface{}) time.Duration
   // 取消key的重试
   Forget(item interface{})
   // 记录一个key被重试了多少次
   NumRequeues(item interface{}) int
}

BucketRateLimiter

基于token令牌桶的限速方法,通过三方库golang.org/x/time/rate实现。
rate.NewLimiter(rate.Limit(10), 100)
10代表每秒往桶中放入token的数量,100代表初始化token数量,前100个元素直接通过,第101个元素等待100ms,第102个元素等待200ms。

ItemExponentialFailureRateLimiter

指数退避算法,有两个主要参数baseDelay和maxDelay。baseDelay表示推迟的基数,每次添加相同的key对应的延迟加入时间会指数递增;maxDelay表示延迟时间的上限。

// 每次进来exp指数加1
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
   return r.maxDelay
}

baseDelay是10,maxDelay是1000,同一个key第一次进行需要等待的时间为10*2^1,第二次为10*2^2,以此类推。

ItemFastSlowRateLimiter

定义了两个时间fastDelay、lowDelay以及达到fastDelay的阈值maxFastAttempts。

r.failures[item] = r.failures[item] + 1

if r.failures[item] <= r.maxFastAttempts {
   return r.fastDelay
}
return r.slowDelay

当重新加入队列的次数小于阈值maxFastAttempts,需要等待的时间为fastDelay,超过阈值则需要等待更长的时间slowDelay。

MaxOfRateLimiter

MaxOfRateLimiter则是多个RateLimiter的组合,需要延迟的时间为各个RateLimiter的时间最大值。

参考资料

https://monokaix.github.io/2021/09/12/workqueue/