WorkQueue称为工作队列,Kubernetes的WorkQueue队列与普通FIFO(先进先出,First-In,First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。
● 有序:按照添加顺序处理元素(item)。
● 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
● 并发性:多生产者和多消费者。
● 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
● 通知机制:ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出。
● 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
● 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
● Metric:支持metric监控指标,可用于Prometheus监控。
WorkQueue支持3种队列,并提供了3种接口,不同队列实现可应对不同的使用场景,分别介绍如下。
● Interface:FIFO队列接口,先进先出队列,并支持去重机制。
● DelayingInterface:延迟队列接口,基于Interface接口封装,延迟一段时间后再将元素存入队列。
● RateLimitingInterface:限速队列接口,基于DelayingInterface接口封装,支持元素存入队列时进行速率限制。
Interface
Interface是FIFO队列,是最基础的队列,限速和延迟队列都是基于它来实现的。其提供一下方法(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go)
type Interface interface {Add(item interface{})Len() intGet() (item interface{}, shutdown bool)Done(item interface{})ShutDown()ShuttingDown() bool}
其中:
- Add():向队列中添加元素
- Len():统计队列的长度
- Get():从队列中取第一个元素
- Done():标记已被处理的元素
- ShutDown():关闭队列
- ShuttingDown():查询队列是否关闭
FIFO的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go):
// Type is a work queue (see the package comment).type Type struct {queue []tdirty setprocessing setcond *sync.CondshuttingDown boolmetrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock clock.Clock}type empty struct{}type t interface{}type set map[t]empty
其中最主要的字段是queue、dirty、processing。它们的作用分别是:
- queue:存放数据的队列
- dirty:除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次
- processing:标记一个元素是否正在被处理
从上面可以看到queue是slice结构,可以存放任何数据。
其主要流程如下:
(1)使用Add()方法将元素添加进队列,源码如下:
func (q *Type) Add(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()if q.shuttingDown {return}if q.dirty.has(item) {return}q.metrics.add(item)q.dirty.insert(item)if q.processing.has(item) {return}q.queue = append(q.queue, item)q.cond.Signal()}
首先判断这个队列是否存在,其次判断元素是否在dirty中,如果在就不添加了,如果不在就先加入dirty中,然后再判断在processing中是否有相同元素在进行处理,如果有就不添加进queue,如果没有再添加进queue。
(2)使用Get()方法从队列中取第一个元素,源码如下:
func (q *Type) Get() (item interface{}, shutdown bool) {q.cond.L.Lock()defer q.cond.L.Unlock()for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()}if len(q.queue) == 0 {// We must be shutting down.return nil, true}item, q.queue = q.queue[0], q.queue[1:]q.metrics.get(item)q.processing.insert(item)q.dirty.delete(item)return item, false}
首先判断队列中是否有元素并且队列是存在的,其次从queue中取第一个元素并更新queue,然后将取出来的元素插入processing中,最后再从dirty中删除这个元素。
(3)使用Done()方法移除已经被处理的元素,源码如下:
func (q *Type) Done(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()q.metrics.done(item)q.processing.delete(item)if q.dirty.has(item) {q.queue = append(q.queue, item)q.cond.Signal()}}
如果该元素被处理完了,直接从processing中删除,不过这里还有一个动作,就是我们第一步Add中提到了,如果一个新加入的元素是正在processing中处理的元素会被暂放到dirty中而不直接放如queue,那么在这里如果这个元素被处理完了,并且dirty中仍有这个元素,则将这个元素加入到queue中。
(4)使用ShutDown()方法关闭队列,源码如下:
func (q *Type) ShutDown() {q.cond.L.Lock()defer q.cond.L.Unlock()q.shuttingDown = trueq.cond.Broadcast()}
关闭很简单,就是将这个队列标记一下。
DelayingInterface
DelayingInterface就是延迟队列,主要基于Interface进行了封装,在其基础上新增了延迟处理方法AddAfter(),定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go):
type DelayingInterface interface {InterfaceAddAfter(item interface{}, duration time.Duration)}
AddAfter包含了两个参数,一个是待加入的元素,一个时间,这个时间就是延迟时间。其方法实现的代码如下:
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// don't add if we're already shutting downif q.ShuttingDown() {return}q.metrics.retry()// immediately add things with no delayif duration <= 0 {q.Add(item)return}select {case <-q.stopCh:// unblock if ShutDown() is calledcase q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:}}
首先判断队列是否存在,然后判断duration时间是否小于等于0,如果是则直接加入队列,如果不是这将其发送给waitingForAddCh。
其数据结构定义如下:
type delayingType struct {Interfaceclock clock.ClockstopCh chan struct{}stopOnce sync.Onceheartbeat clock.TickerwaitingForAddCh chan *waitFormetrics retryMetrics}
其中最主要的字段是waitingForAddCh,其默认值为1000,初始化代码如下:
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {ret := &delayingType{Interface: q,clock: clock,heartbeat: clock.NewTicker(maxWait),stopCh: make(chan struct{}),waitingForAddCh: make(chan *waitFor, 1000),metrics: newRetryMetrics(name),}go ret.waitingLoop()return ret}
这个默认值1000代表的是只有当插入的元素大于1000的时候才阻塞。waitingForAddCh字段中的数据通过goroutine运行的waitingLoop函数持久运行,在初始化的时候可以看到其运行它的程序go ret.waitingLoop()。waitingLoop()的代码如下:
func (q *delayingType) waitingLoop() {defer utilruntime.HandleCrash()// Make a placeholder channel to use when there are no items in our listnever := make(<-chan time.Time)// Make a timer that expires when the item at the head of the waiting queue is readyvar nextReadyAtTimer clock.TimerwaitingForQueue := &waitForPriorityQueue{}heap.Init(waitingForQueue)waitingEntryByData := map[t]*waitFor{}for {if q.Interface.ShuttingDown() {return}now := q.clock.Now()// Add ready entriesfor waitingForQueue.Len() > 0 {entry := waitingForQueue.Peek().(*waitFor)if entry.readyAt.After(now) {break}entry = heap.Pop(waitingForQueue).(*waitFor)q.Add(entry.data)delete(waitingEntryByData, entry.data)}// Set up a wait for the first item's readyAt (if one exists)nextReadyAt := neverif waitingForQueue.Len() > 0 {if nextReadyAtTimer != nil {nextReadyAtTimer.Stop()}entry := waitingForQueue.Peek().(*waitFor)nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))nextReadyAt = nextReadyAtTimer.C()}select {case <-q.stopCh:returncase <-q.heartbeat.C():// continue the loop, which will add ready itemscase <-nextReadyAt:// continue the loop, which will add ready itemscase waitEntry := <-q.waitingForAddCh:if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)} else {q.Add(waitEntry.data)}drained := falsefor !drained {select {case waitEntry := <-q.waitingForAddCh:if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)} else {q.Add(waitEntry.data)}default:drained = true}}}}}
使用一个for循环来处理延迟数据,从队列中取出数据以及时间参数,当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入FIFO队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入FIFO队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。
RateLimitingInterface
RateLimitingInterface是限速队列,基于延迟队列和FIFO队列接口封装,在原有功能上增加了AddRateLimited、Forget、NumRequeues方法。限速队列的重点不在于RateLimitingInterface接口,而在于它提供的4种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。
RateLimitingInterface的方法接口如下(源码:staging\src\k8s.io\client-go\util\workqueue\rate_limiting_queue.go):
type RateLimitingInterface interface {DelayingInterfaceAddRateLimited(item interface{})Forget(item interface{})NumRequeues(item interface{}) int}
数据结构定义如下:
type rateLimitingType struct {DelayingInterfacerateLimiter RateLimiter}
rateLimitingType就定义了两个,一个是继承了延迟队列接口,另一个是RateLimiter具体的限速算法。
AddRateLimited、Forget、NumRequeues方法的实现代码如下:
func (q *rateLimitingType) AddRateLimited(item interface{}) {q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))}func (q *rateLimitingType) NumRequeues(item interface{}) int {return q.rateLimiter.NumRequeues(item)}func (q *rateLimitingType) Forget(item interface{}) {q.rateLimiter.Forget(item)}
AddRateLimited是通过DelayingInterface.AddAfter来实现的,NumRequeues和Forget都是调用了rateLimiter的方法。
RateLimiter接口的定义如下:
type RateLimiter interface {When(item interface{}) time.DurationForget(item interface{})NumRequeues(item interface{}) int}
其中:
- When:获取元素的等待时间
- Forget:释放指定元素
- NumRequeues:获取指定元素的排队数
注意:这里有一个非常重要的概念——限速周期,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数。
RateLimiter中有四种限速算法,分别是:
- 令牌桶算法(BucketRateLimiter)。
- 排队指数算法(ItemExponentialFailureRateLimiter)。
- 计数器算法(ItemFastSlowRateLimiter)。
- 混合模式(MaxOfRateLimiter),将多种限速算法混合使用。
令牌桶算法(BucketRateLimiter)
令牌桶算法内部实现了一个存放token(令牌)的“桶”,初始时“桶”是空的,token会以固定速率往“桶”里填充,直到将其填满为止,多余的token会被丢弃。每个元素都会从令牌桶得到一个token,只有得到token的元素才允许通过(accept),而没有得到token的元素处于等待状态。令牌桶算法通过控制发放token来达到限速目的。
在初始化的时候会给令牌桶设置一个默认值,代码如下:
func DefaultControllerRateLimiter() RateLimiter {return NewMaxOfRateLimiter(NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},)}
通过rate.NewLimiter(rate.Limit(10), 100)来实例化令牌桶的,实例化的时候传递了两个参数,NewLimiter的代码如下:
func NewLimiter(r Limit, b int) *Limiter {return &Limiter{limit: r,burst: b,}}
其中:
- r表示qps。每秒往桶里放token的数量
- b表示令牌桶的大小,也就是总共可以存放多少token
在工作中会通过r.Limiter.Reserve().Delay()来返回指定元素的等待时间。假设在一个限速周期内插入了1000个元素,通过r.Limiter.Reserve().Delay函数返回指定元素应该等待的时间,那么前b(即100)个元素会被立刻处理,而后面元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。
排队指数算法(ItemExponentialFailureRateLimiter)
排队指数算法是将相同元素得排队数,相同元素越多,排队数越大,相应的速率也会增长,但是不会超过maxDelay。
ItemExponentialFailureRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):
type ItemExponentialFailureRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]intbaseDelay time.DurationmaxDelay time.Duration}
其中:
- failures:用于统计排队数
- baseDelay:最初的限速单位
- maxDelay:最大的限速单位
限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。
在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过maxDelay。
核心实现代码块如下:
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()exp := r.failures[item]r.failures[item] = r.failures[item] + 1// The backoff is capped such that 'calculated' value never overflows.backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))if backoff > math.MaxInt64 {return r.maxDelay}calculated := time.Duration(backoff)if calculated > r.maxDelay {return r.maxDelay}return calculated}
每当通过AddAfter添加一个元素,排队数failures通过r.failures[item] = r.failures[item] + 1进行加1操作,然后计算延迟时间,不得超出maxDelay。
我们假定baseDelay是1time.Millisecond,maxDelay是1000time.Second。假设在一个限速周期内通过AddRateLimited方法插入10个相同元素,那么第1个元素会通过延迟队列的AddAfter方法插入并设置延迟时间为1ms(即baseDelay),第2个相同元素的延迟时间为2ms,第3个相同元素的延迟时间为4ms,第4个相同元素的延迟时间为8ms,第5个相同元素的延迟时间为16ms……第10个相同元素的延迟时间为512ms,最长延迟时间不超过1000s(即maxDelay)。
计数器算法(ItemFastSlowRateLimiter)
计数器算法限制一段时间内允许通过的元素数量,例如在1分钟内只允许通过100个元素,每插入一个元素,计数器自增1,当计数器数到100的阈值且还在限速周期内时,则不允许元素再通过。但WorkQueue在此基础上扩展了fast和slow速率。也就是超出了指定的元素数量不是丢弃,而是指定slow速率。
ItemFastSlowRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):
type ItemFastSlowRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]intmaxFastAttempts intfastDelay time.DurationslowDelay time.Duration}
其中:
- failures:用于统计排队数,每新增一个,则加1
- maxFastAttempts:控制速率是fast的数量,超出该数速率则变为slow
- fastDelay:fast的速率
- slowDelay:slow的速率
核心代码如下:
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()r.failures[item] = r.failures[item] + 1if r.failures[item] <= r.maxFastAttempts {return r.fastDelay}return r.slowDelay}
就是新进一个元素,排队数加1,然后排队数和maxFastAttempts进行对比,如果小于则返回fastDelay,如果大于则返回slowDelay。
假设fastDelay是5time.Millisecond,slowDelay是10time.Second,maxFastAttempts是3。在一个限速周期内通过AddRateLimited方法插入4个相同的元素,那么前3个元素使用fastDelay定义的fast速率,当触发maxFastAttempts字段时,第4个元素使用slowDelay定义的slow速率。
混合模式(MaxOfRateLimiter)
混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法。比如默认就同时时候排队算法和令牌桶算法,如下:
func DefaultControllerRateLimiter() RateLimiter {return NewMaxOfRateLimiter(NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},)}
其数据结构定义如下:
type MaxOfRateLimiter struct {limiters []RateLimiter}
是一个RateLimiter的slice类型,可以存放多种算法。然后在获取元素延迟时间的时候也是循环这个slice进行获取,如下:
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {ret := time.Duration(0)for _, limiter := range r.limiters {curr := limiter.When(item)if curr > ret {ret = curr}}return ret}
