build source
This commit is contained in:
commit
ee1fec43ed
4171 changed files with 1351288 additions and 0 deletions
295
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
Normal file
295
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
Normal file
|
|
@ -0,0 +1,295 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Deprecated: RateLimiter is deprecated, use TypedRateLimiter instead.
|
||||
type RateLimiter TypedRateLimiter[any]
|
||||
|
||||
type TypedRateLimiter[T comparable] interface {
|
||||
// When gets an item and gets to decide how long that item should wait
|
||||
When(item T) time.Duration
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
|
||||
// or for success, we'll stop tracking it
|
||||
Forget(item T)
|
||||
// NumRequeues returns back how many failures the item has had
|
||||
NumRequeues(item T) int
|
||||
}
|
||||
|
||||
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
|
||||
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
|
||||
//
|
||||
// Deprecated: Use DefaultTypedControllerRateLimiter instead.
|
||||
func DefaultControllerRateLimiter() RateLimiter {
|
||||
return DefaultTypedControllerRateLimiter[any]()
|
||||
}
|
||||
|
||||
// DefaultTypedControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
|
||||
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
|
||||
func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T] {
|
||||
return NewTypedMaxOfRateLimiter(
|
||||
NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second),
|
||||
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
|
||||
&TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
|
||||
)
|
||||
}
|
||||
|
||||
// Deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead.
|
||||
type BucketRateLimiter = TypedBucketRateLimiter[any]
|
||||
|
||||
// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
|
||||
type TypedBucketRateLimiter[T comparable] struct {
|
||||
*rate.Limiter
|
||||
}
|
||||
|
||||
var _ RateLimiter = &BucketRateLimiter{}
|
||||
|
||||
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
|
||||
return r.Limiter.Reserve().Delay()
|
||||
}
|
||||
|
||||
func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *TypedBucketRateLimiter[T]) Forget(item T) {
|
||||
}
|
||||
|
||||
// Deprecated: ItemExponentialFailureRateLimiter is deprecated, use TypedItemExponentialFailureRateLimiter instead.
|
||||
type ItemExponentialFailureRateLimiter = TypedItemExponentialFailureRateLimiter[any]
|
||||
|
||||
// TypedItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
|
||||
// dealing with max failures and expiration are up to the caller
|
||||
type TypedItemExponentialFailureRateLimiter[T comparable] struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[T]int
|
||||
|
||||
baseDelay time.Duration
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
|
||||
|
||||
// Deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead.
|
||||
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
|
||||
return NewTypedItemExponentialFailureRateLimiter[any](baseDelay, maxDelay)
|
||||
}
|
||||
|
||||
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
|
||||
return &TypedItemExponentialFailureRateLimiter[T]{
|
||||
failures: map[T]int{},
|
||||
baseDelay: baseDelay,
|
||||
maxDelay: maxDelay,
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated: DefaultItemBasedRateLimiter is deprecated, use DefaultTypedItemBasedRateLimiter instead.
|
||||
func DefaultItemBasedRateLimiter() RateLimiter {
|
||||
return DefaultTypedItemBasedRateLimiter[any]()
|
||||
}
|
||||
|
||||
func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T] {
|
||||
return NewTypedItemExponentialFailureRateLimiter[T](time.Millisecond, 1000*time.Second)
|
||||
}
|
||||
|
||||
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
exp := r.failures[item]
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
// The backoff is capped such that 'calculated' value never overflows.
|
||||
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
|
||||
if backoff > math.MaxInt64 {
|
||||
return r.maxDelay
|
||||
}
|
||||
|
||||
calculated := time.Duration(backoff)
|
||||
if calculated > r.maxDelay {
|
||||
return r.maxDelay
|
||||
}
|
||||
|
||||
return calculated
|
||||
}
|
||||
|
||||
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||
// Deprecated: Use TypedItemFastSlowRateLimiter instead.
|
||||
type ItemFastSlowRateLimiter = TypedItemFastSlowRateLimiter[any]
|
||||
|
||||
// TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||
type TypedItemFastSlowRateLimiter[T comparable] struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[T]int
|
||||
|
||||
maxFastAttempts int
|
||||
fastDelay time.Duration
|
||||
slowDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemFastSlowRateLimiter{}
|
||||
|
||||
// Deprecated: NewItemFastSlowRateLimiter is deprecated, use NewTypedItemFastSlowRateLimiter instead.
|
||||
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
|
||||
return NewTypedItemFastSlowRateLimiter[any](fastDelay, slowDelay, maxFastAttempts)
|
||||
}
|
||||
|
||||
func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T] {
|
||||
return &TypedItemFastSlowRateLimiter[T]{
|
||||
failures: map[T]int{},
|
||||
fastDelay: fastDelay,
|
||||
slowDelay: slowDelay,
|
||||
maxFastAttempts: maxFastAttempts,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
if r.failures[item] <= r.maxFastAttempts {
|
||||
return r.fastDelay
|
||||
}
|
||||
|
||||
return r.slowDelay
|
||||
}
|
||||
|
||||
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||
// were separately delayed a longer time.
|
||||
//
|
||||
// Deprecated: Use TypedMaxOfRateLimiter instead.
|
||||
type MaxOfRateLimiter = TypedMaxOfRateLimiter[any]
|
||||
|
||||
// TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||
// were separately delayed a longer time.
|
||||
type TypedMaxOfRateLimiter[T comparable] struct {
|
||||
limiters []TypedRateLimiter[T]
|
||||
}
|
||||
|
||||
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
|
||||
ret := time.Duration(0)
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.When(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// Deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead.
|
||||
func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter {
|
||||
return NewTypedMaxOfRateLimiter(limiters...)
|
||||
}
|
||||
|
||||
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
|
||||
return &TypedMaxOfRateLimiter[T]{limiters: limiters}
|
||||
}
|
||||
|
||||
func (r *TypedMaxOfRateLimiter[T]) NumRequeues(item T) int {
|
||||
ret := 0
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.NumRequeues(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *TypedMaxOfRateLimiter[T]) Forget(item T) {
|
||||
for _, limiter := range r.limiters {
|
||||
limiter.Forget(item)
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
||||
// Deprecated: Use TypedWithMaxWaitRateLimiter instead.
|
||||
type WithMaxWaitRateLimiter = TypedWithMaxWaitRateLimiter[any]
|
||||
|
||||
// TypedWithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
||||
type TypedWithMaxWaitRateLimiter[T comparable] struct {
|
||||
limiter TypedRateLimiter[T]
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
// Deprecated: NewWithMaxWaitRateLimiter is deprecated, use NewTypedWithMaxWaitRateLimiter instead.
|
||||
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
|
||||
return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
|
||||
}
|
||||
|
||||
func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {
|
||||
return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
|
||||
}
|
||||
|
||||
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
|
||||
delay := w.limiter.When(item)
|
||||
if delay > w.maxDelay {
|
||||
return w.maxDelay
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {
|
||||
w.limiter.Forget(item)
|
||||
}
|
||||
|
||||
func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
|
||||
return w.limiter.NumRequeues(item)
|
||||
}
|
||||
369
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
Normal file
369
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
Normal file
|
|
@ -0,0 +1,369 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
|
||||
// requeue items after failures without ending up in a hot-loop.
|
||||
//
|
||||
// Deprecated: use TypedDelayingInterface instead.
|
||||
type DelayingInterface TypedDelayingInterface[any]
|
||||
|
||||
// TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
|
||||
// requeue items after failures without ending up in a hot-loop.
|
||||
type TypedDelayingInterface[T comparable] interface {
|
||||
TypedInterface[T]
|
||||
// AddAfter adds an item to the workqueue after the indicated duration has passed
|
||||
AddAfter(item T, duration time.Duration)
|
||||
}
|
||||
|
||||
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
|
||||
//
|
||||
// Deprecated: use TypedDelayingQueueConfig instead.
|
||||
type DelayingQueueConfig = TypedDelayingQueueConfig[any]
|
||||
|
||||
// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
|
||||
type TypedDelayingQueueConfig[T comparable] struct {
|
||||
// An optional logger. The name of the queue does *not* get added to it, this should
|
||||
// be done by the caller if desired.
|
||||
Logger *klog.Logger
|
||||
|
||||
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||
Name string
|
||||
|
||||
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||
// instead of the global provider.
|
||||
MetricsProvider MetricsProvider
|
||||
|
||||
// Clock optionally allows injecting a real or fake clock for testing purposes.
|
||||
Clock clock.WithTicker
|
||||
|
||||
// Queue optionally allows injecting custom queue Interface instead of the default one.
|
||||
Queue TypedInterface[T]
|
||||
}
|
||||
|
||||
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
||||
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||
// NewDelayingQueueWithConfig instead and specify a name.
|
||||
//
|
||||
// Deprecated: use NewTypedDelayingQueue instead.
|
||||
func NewDelayingQueue() DelayingInterface {
|
||||
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
|
||||
}
|
||||
|
||||
// NewTypedDelayingQueue constructs a new workqueue with delayed queuing ability.
|
||||
// NewTypedDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||
// NewTypedDelayingQueueWithConfig instead and specify a name.
|
||||
func NewTypedDelayingQueue[T comparable]() TypedDelayingInterface[T] {
|
||||
return NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{})
|
||||
}
|
||||
|
||||
// NewDelayingQueueWithConfig constructs a new workqueue with options to
|
||||
// customize different properties.
|
||||
//
|
||||
// Deprecated: use NewTypedDelayingQueueWithConfig instead.
|
||||
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
|
||||
return NewTypedDelayingQueueWithConfig[any](config)
|
||||
}
|
||||
|
||||
// TypedNewDelayingQueue exists for backwards compatibility only.
|
||||
//
|
||||
// Deprecated: use NewTypedDelayingQueueWithConfig instead.
|
||||
func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
|
||||
return NewTypedDelayingQueue[T]()
|
||||
}
|
||||
|
||||
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
|
||||
// customize different properties.
|
||||
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
|
||||
logger := klog.Background()
|
||||
if config.Logger != nil {
|
||||
logger = *config.Logger
|
||||
}
|
||||
if config.Clock == nil {
|
||||
config.Clock = clock.RealClock{}
|
||||
}
|
||||
|
||||
if config.Queue == nil {
|
||||
config.Queue = NewTypedWithConfig[T](TypedQueueConfig[T]{
|
||||
Name: config.Name,
|
||||
MetricsProvider: config.MetricsProvider,
|
||||
Clock: config.Clock,
|
||||
})
|
||||
}
|
||||
|
||||
return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider)
|
||||
}
|
||||
|
||||
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
|
||||
// inject custom queue Interface instead of the default one
|
||||
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
|
||||
return NewDelayingQueueWithConfig(DelayingQueueConfig{
|
||||
Name: name,
|
||||
Queue: q,
|
||||
})
|
||||
}
|
||||
|
||||
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
|
||||
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||
func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||
return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
|
||||
}
|
||||
|
||||
// NewDelayingQueueWithCustomClock constructs a new named workqueue
|
||||
// with ability to inject real or fake clock for testing purposes.
|
||||
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
|
||||
return NewDelayingQueueWithConfig(DelayingQueueConfig{
|
||||
Name: name,
|
||||
Clock: clock,
|
||||
})
|
||||
}
|
||||
|
||||
func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
|
||||
ret := &delayingType[T]{
|
||||
TypedInterface: q,
|
||||
clock: clock,
|
||||
heartbeat: clock.NewTicker(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan *waitFor[T], 1000),
|
||||
metrics: newRetryMetrics(name, provider),
|
||||
}
|
||||
|
||||
go ret.waitingLoop(logger)
|
||||
return ret
|
||||
}
|
||||
|
||||
// delayingType wraps an Interface and provides delayed re-enquing
|
||||
type delayingType[T comparable] struct {
|
||||
TypedInterface[T]
|
||||
|
||||
// clock tracks time for delayed firing
|
||||
clock clock.Clock
|
||||
|
||||
// stopCh lets us signal a shutdown to the waiting loop
|
||||
stopCh chan struct{}
|
||||
// stopOnce guarantees we only signal shutdown a single time
|
||||
stopOnce sync.Once
|
||||
|
||||
// heartbeat ensures we wait no more than maxWait before firing
|
||||
heartbeat clock.Ticker
|
||||
|
||||
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
||||
waitingForAddCh chan *waitFor[T]
|
||||
|
||||
// metrics counts the number of retries
|
||||
metrics retryMetrics
|
||||
}
|
||||
|
||||
// waitFor holds the data to add and the time it should be added
|
||||
type waitFor[T any] struct {
|
||||
data T
|
||||
readyAt time.Time
|
||||
// index in the priority queue (heap)
|
||||
index int
|
||||
}
|
||||
|
||||
// waitForPriorityQueue implements a priority queue for waitFor items.
|
||||
//
|
||||
// waitForPriorityQueue implements heap.Interface. The item occurring next in
|
||||
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
|
||||
// Peek returns this minimum item at index 0. Pop returns the minimum item after
|
||||
// it has been removed from the queue and placed at index Len()-1 by
|
||||
// container/heap. Push adds an item at index Len(), and container/heap
|
||||
// percolates it into the correct location.
|
||||
type waitForPriorityQueue[T any] []*waitFor[T]
|
||||
|
||||
func (pq waitForPriorityQueue[T]) Len() int {
|
||||
return len(pq)
|
||||
}
|
||||
func (pq waitForPriorityQueue[T]) Less(i, j int) bool {
|
||||
return pq[i].readyAt.Before(pq[j].readyAt)
|
||||
}
|
||||
func (pq waitForPriorityQueue[T]) Swap(i, j int) {
|
||||
pq[i], pq[j] = pq[j], pq[i]
|
||||
pq[i].index = i
|
||||
pq[j].index = j
|
||||
}
|
||||
|
||||
// Push adds an item to the queue. Push should not be called directly; instead,
|
||||
// use `heap.Push`.
|
||||
func (pq *waitForPriorityQueue[T]) Push(x interface{}) {
|
||||
n := len(*pq)
|
||||
item := x.(*waitFor[T])
|
||||
item.index = n
|
||||
*pq = append(*pq, item)
|
||||
}
|
||||
|
||||
// Pop removes an item from the queue. Pop should not be called directly;
|
||||
// instead, use `heap.Pop`.
|
||||
func (pq *waitForPriorityQueue[T]) Pop() interface{} {
|
||||
n := len(*pq)
|
||||
item := (*pq)[n-1]
|
||||
item.index = -1
|
||||
*pq = (*pq)[0:(n - 1)]
|
||||
return item
|
||||
}
|
||||
|
||||
// Peek returns the item at the beginning of the queue, without removing the
|
||||
// item or otherwise mutating the queue. It is safe to call directly.
|
||||
func (pq waitForPriorityQueue[T]) Peek() interface{} {
|
||||
return pq[0]
|
||||
}
|
||||
|
||||
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||
// on Get() will be true. This method may be invoked more than once.
|
||||
func (q *delayingType[T]) ShutDown() {
|
||||
q.stopOnce.Do(func() {
|
||||
q.TypedInterface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// AddAfter adds the given item to the work queue after the given delay
|
||||
func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
|
||||
// don't add if we're already shutting down
|
||||
if q.ShuttingDown() {
|
||||
return
|
||||
}
|
||||
|
||||
q.metrics.retry()
|
||||
|
||||
// immediately add things with no delay
|
||||
if duration <= 0 {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-q.stopCh:
|
||||
// unblock if ShutDown() is called
|
||||
case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}:
|
||||
}
|
||||
}
|
||||
|
||||
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
|
||||
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
|
||||
// expired item sitting for more than 10 seconds.
|
||||
const maxWait = 10 * time.Second
|
||||
|
||||
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
||||
func (q *delayingType[T]) waitingLoop(logger klog.Logger) {
|
||||
defer utilruntime.HandleCrashWithLogger(logger)
|
||||
|
||||
// Make a placeholder channel to use when there are no items in our list
|
||||
never := make(<-chan time.Time)
|
||||
|
||||
// Make a timer that expires when the item at the head of the waiting queue is ready
|
||||
var nextReadyAtTimer clock.Timer
|
||||
|
||||
waitingForQueue := &waitForPriorityQueue[T]{}
|
||||
heap.Init(waitingForQueue)
|
||||
|
||||
waitingEntryByData := map[T]*waitFor[T]{}
|
||||
|
||||
for {
|
||||
if q.TypedInterface.ShuttingDown() {
|
||||
return
|
||||
}
|
||||
|
||||
now := q.clock.Now()
|
||||
|
||||
// Add ready entries
|
||||
for waitingForQueue.Len() > 0 {
|
||||
entry := waitingForQueue.Peek().(*waitFor[T])
|
||||
if entry.readyAt.After(now) {
|
||||
break
|
||||
}
|
||||
|
||||
entry = heap.Pop(waitingForQueue).(*waitFor[T])
|
||||
q.Add(entry.data)
|
||||
delete(waitingEntryByData, entry.data)
|
||||
}
|
||||
|
||||
// Set up a wait for the first item's readyAt (if one exists)
|
||||
nextReadyAt := never
|
||||
if waitingForQueue.Len() > 0 {
|
||||
if nextReadyAtTimer != nil {
|
||||
nextReadyAtTimer.Stop()
|
||||
}
|
||||
entry := waitingForQueue.Peek().(*waitFor[T])
|
||||
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
|
||||
nextReadyAt = nextReadyAtTimer.C()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-q.stopCh:
|
||||
return
|
||||
|
||||
case <-q.heartbeat.C():
|
||||
// continue the loop, which will add ready items
|
||||
|
||||
case <-nextReadyAt:
|
||||
// continue the loop, which will add ready items
|
||||
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
|
||||
drained := false
|
||||
for !drained {
|
||||
select {
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
default:
|
||||
drained = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
|
||||
func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) {
|
||||
// if the entry already exists, update the time only if it would cause the item to be queued sooner
|
||||
existing, exists := knownEntries[entry.data]
|
||||
if exists {
|
||||
if existing.readyAt.After(entry.readyAt) {
|
||||
existing.readyAt = entry.readyAt
|
||||
heap.Fix(q, existing.index)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
heap.Push(q, entry)
|
||||
knownEntries[entry.data] = entry
|
||||
}
|
||||
27
vendor/k8s.io/client-go/util/workqueue/doc.go
generated
vendored
Normal file
27
vendor/k8s.io/client-go/util/workqueue/doc.go
generated
vendored
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package workqueue provides a simple queue that supports the following
|
||||
// features:
|
||||
// - Fair: items processed in the order in which they are added.
|
||||
// - Stingy: a single item will not be processed multiple times concurrently,
|
||||
// and if an item is added multiple times before it can be processed, it
|
||||
// will only be processed once.
|
||||
// - Multiple consumers and producers. In particular, it is allowed for an
|
||||
// item to be reenqueued while it is being processed.
|
||||
// In this case it will be processed again.
|
||||
// - Shutdown notifications.
|
||||
package workqueue
|
||||
255
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
Normal file
255
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
Normal file
|
|
@ -0,0 +1,255 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||
// of metrics.
|
||||
|
||||
type queueMetrics[T comparable] interface {
|
||||
add(item T)
|
||||
get(item T)
|
||||
done(item T)
|
||||
updateUnfinishedWork()
|
||||
}
|
||||
|
||||
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down.
|
||||
type GaugeMetric interface {
|
||||
Inc()
|
||||
Dec()
|
||||
}
|
||||
|
||||
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
|
||||
type SettableGaugeMetric interface {
|
||||
Set(float64)
|
||||
}
|
||||
|
||||
// CounterMetric represents a single numerical value that only ever
|
||||
// goes up.
|
||||
type CounterMetric interface {
|
||||
Inc()
|
||||
}
|
||||
|
||||
// SummaryMetric captures individual observations.
|
||||
type SummaryMetric interface {
|
||||
Observe(float64)
|
||||
}
|
||||
|
||||
// HistogramMetric counts individual observations.
|
||||
type HistogramMetric interface {
|
||||
Observe(float64)
|
||||
}
|
||||
|
||||
type noopMetric struct{}
|
||||
|
||||
func (noopMetric) Inc() {}
|
||||
func (noopMetric) Dec() {}
|
||||
func (noopMetric) Set(float64) {}
|
||||
func (noopMetric) Observe(float64) {}
|
||||
|
||||
// defaultQueueMetrics expects the caller to lock before setting any metrics.
|
||||
type defaultQueueMetrics[T comparable] struct {
|
||||
clock clock.Clock
|
||||
|
||||
// current depth of a workqueue
|
||||
depth GaugeMetric
|
||||
// total number of adds handled by a workqueue
|
||||
adds CounterMetric
|
||||
// how long an item stays in a workqueue
|
||||
latency HistogramMetric
|
||||
// how long processing an item from a workqueue takes
|
||||
workDuration HistogramMetric
|
||||
addTimes map[T]time.Time
|
||||
processingStartTimes map[T]time.Time
|
||||
|
||||
// how long have current threads been working?
|
||||
unfinishedWorkSeconds SettableGaugeMetric
|
||||
longestRunningProcessor SettableGaugeMetric
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics[T]) add(item T) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.adds.Inc()
|
||||
m.depth.Inc()
|
||||
if _, exists := m.addTimes[item]; !exists {
|
||||
m.addTimes[item] = m.clock.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics[T]) get(item T) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.depth.Dec()
|
||||
m.processingStartTimes[item] = m.clock.Now()
|
||||
if startTime, exists := m.addTimes[item]; exists {
|
||||
m.latency.Observe(m.sinceInSeconds(startTime))
|
||||
delete(m.addTimes, item)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics[T]) done(item T) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if startTime, exists := m.processingStartTimes[item]; exists {
|
||||
m.workDuration.Observe(m.sinceInSeconds(startTime))
|
||||
delete(m.processingStartTimes, item)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics[T]) updateUnfinishedWork() {
|
||||
// Note that a summary metric would be better for this, but prometheus
|
||||
// doesn't seem to have non-hacky ways to reset the summary metrics.
|
||||
var total float64
|
||||
var oldest float64
|
||||
for _, t := range m.processingStartTimes {
|
||||
age := m.sinceInSeconds(t)
|
||||
total += age
|
||||
if age > oldest {
|
||||
oldest = age
|
||||
}
|
||||
}
|
||||
m.unfinishedWorkSeconds.Set(total)
|
||||
m.longestRunningProcessor.Set(oldest)
|
||||
}
|
||||
|
||||
type noMetrics[T any] struct{}
|
||||
|
||||
func (noMetrics[T]) add(item T) {}
|
||||
func (noMetrics[T]) get(item T) {}
|
||||
func (noMetrics[T]) done(item T) {}
|
||||
func (noMetrics[T]) updateUnfinishedWork() {}
|
||||
|
||||
// Gets the time since the specified start in seconds.
|
||||
func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 {
|
||||
return m.clock.Since(start).Seconds()
|
||||
}
|
||||
|
||||
type retryMetrics interface {
|
||||
retry()
|
||||
}
|
||||
|
||||
type defaultRetryMetrics struct {
|
||||
retries CounterMetric
|
||||
}
|
||||
|
||||
func (m *defaultRetryMetrics) retry() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.retries.Inc()
|
||||
}
|
||||
|
||||
// MetricsProvider generates various metrics used by the queue.
|
||||
type MetricsProvider interface {
|
||||
NewDepthMetric(name string) GaugeMetric
|
||||
NewAddsMetric(name string) CounterMetric
|
||||
NewLatencyMetric(name string) HistogramMetric
|
||||
NewWorkDurationMetric(name string) HistogramMetric
|
||||
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
|
||||
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
|
||||
NewRetriesMetric(name string) CounterMetric
|
||||
}
|
||||
|
||||
type noopMetricsProvider struct{}
|
||||
|
||||
func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
var globalMetricsProvider MetricsProvider = noopMetricsProvider{}
|
||||
|
||||
var setGlobalMetricsProviderOnce sync.Once
|
||||
|
||||
func newQueueMetrics[T comparable](mp MetricsProvider, name string, clock clock.Clock) queueMetrics[T] {
|
||||
if len(name) == 0 || mp == (noopMetricsProvider{}) {
|
||||
return noMetrics[T]{}
|
||||
}
|
||||
return &defaultQueueMetrics[T]{
|
||||
clock: clock,
|
||||
depth: mp.NewDepthMetric(name),
|
||||
adds: mp.NewAddsMetric(name),
|
||||
latency: mp.NewLatencyMetric(name),
|
||||
workDuration: mp.NewWorkDurationMetric(name),
|
||||
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
|
||||
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
|
||||
addTimes: map[T]time.Time{},
|
||||
processingStartTimes: map[T]time.Time{},
|
||||
}
|
||||
}
|
||||
|
||||
func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
|
||||
var ret *defaultRetryMetrics
|
||||
if len(name) == 0 {
|
||||
return ret
|
||||
}
|
||||
|
||||
if provider == nil {
|
||||
provider = globalMetricsProvider
|
||||
}
|
||||
|
||||
return &defaultRetryMetrics{
|
||||
retries: provider.NewRetriesMetric(name),
|
||||
}
|
||||
}
|
||||
|
||||
// SetProvider sets the metrics provider for all subsequently created work
|
||||
// queues. Only the first call has an effect.
|
||||
func SetProvider(metricsProvider MetricsProvider) {
|
||||
setGlobalMetricsProviderOnce.Do(func() {
|
||||
globalMetricsProvider = metricsProvider
|
||||
})
|
||||
}
|
||||
101
vendor/k8s.io/client-go/util/workqueue/parallelizer.go
generated
vendored
Normal file
101
vendor/k8s.io/client-go/util/workqueue/parallelizer.go
generated
vendored
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
type DoWorkPieceFunc func(piece int)
|
||||
|
||||
type options struct {
|
||||
chunkSize int
|
||||
}
|
||||
|
||||
type Options func(*options)
|
||||
|
||||
// WithChunkSize allows to set chunks of work items to the workers, rather than
|
||||
// processing one by one.
|
||||
// It is recommended to use this option if the number of pieces significantly
|
||||
// higher than the number of workers and the work done for each item is small.
|
||||
func WithChunkSize(c int) func(*options) {
|
||||
return func(o *options) {
|
||||
o.chunkSize = c
|
||||
}
|
||||
}
|
||||
|
||||
// ParallelizeUntil is a framework that allows for parallelizing N
|
||||
// independent pieces of work until done or the context is canceled.
|
||||
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
|
||||
if pieces == 0 {
|
||||
return
|
||||
}
|
||||
o := options{}
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
chunkSize := o.chunkSize
|
||||
if chunkSize < 1 {
|
||||
chunkSize = 1
|
||||
}
|
||||
|
||||
chunks := ceilDiv(pieces, chunkSize)
|
||||
toProcess := make(chan int, chunks)
|
||||
for i := 0; i < chunks; i++ {
|
||||
toProcess <- i
|
||||
}
|
||||
close(toProcess)
|
||||
|
||||
var stop <-chan struct{}
|
||||
if ctx != nil {
|
||||
stop = ctx.Done()
|
||||
}
|
||||
if chunks < workers {
|
||||
workers = chunks
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
defer utilruntime.HandleCrashWithContext(ctx)
|
||||
defer wg.Done()
|
||||
for chunk := range toProcess {
|
||||
start := chunk * chunkSize
|
||||
end := start + chunkSize
|
||||
if end > pieces {
|
||||
end = pieces
|
||||
}
|
||||
for p := start; p < end; p++ {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
doWorkPiece(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func ceilDiv(a, b int) int {
|
||||
return (a + b - 1) / b
|
||||
}
|
||||
370
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
Normal file
370
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
Normal file
|
|
@ -0,0 +1,370 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// Deprecated: Interface is deprecated, use TypedInterface instead.
|
||||
type Interface TypedInterface[any]
|
||||
|
||||
type TypedInterface[T comparable] interface {
|
||||
Add(item T)
|
||||
Len() int
|
||||
Get() (item T, shutdown bool)
|
||||
Done(item T)
|
||||
ShutDown()
|
||||
ShutDownWithDrain()
|
||||
ShuttingDown() bool
|
||||
}
|
||||
|
||||
// Queue is the underlying storage for items. The functions below are always
|
||||
// called from the same goroutine.
|
||||
type Queue[T comparable] interface {
|
||||
// Touch can be hooked when an existing item is added again. This may be
|
||||
// useful if the implementation allows priority change for the given item.
|
||||
Touch(item T)
|
||||
// Push adds a new item.
|
||||
Push(item T)
|
||||
// Len tells the total number of items.
|
||||
Len() int
|
||||
// Pop retrieves an item.
|
||||
Pop() (item T)
|
||||
}
|
||||
|
||||
// DefaultQueue is a slice based FIFO queue.
|
||||
func DefaultQueue[T comparable]() Queue[T] {
|
||||
return new(queue[T])
|
||||
}
|
||||
|
||||
// queue is a slice which implements Queue.
|
||||
type queue[T comparable] []T
|
||||
|
||||
func (q *queue[T]) Touch(item T) {}
|
||||
|
||||
func (q *queue[T]) Push(item T) {
|
||||
*q = append(*q, item)
|
||||
}
|
||||
|
||||
func (q *queue[T]) Len() int {
|
||||
return len(*q)
|
||||
}
|
||||
|
||||
func (q *queue[T]) Pop() (item T) {
|
||||
item = (*q)[0]
|
||||
|
||||
// The underlying array still exists and reference this object, so the object will not be garbage collected.
|
||||
(*q)[0] = *new(T)
|
||||
*q = (*q)[1:]
|
||||
|
||||
return item
|
||||
}
|
||||
|
||||
// QueueConfig specifies optional configurations to customize an Interface.
|
||||
// Deprecated: use TypedQueueConfig instead.
|
||||
type QueueConfig = TypedQueueConfig[any]
|
||||
|
||||
type TypedQueueConfig[T comparable] struct {
|
||||
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||
Name string
|
||||
|
||||
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||
// instead of the global provider.
|
||||
MetricsProvider MetricsProvider
|
||||
|
||||
// Clock ability to inject real or fake clock for testing purposes.
|
||||
Clock clock.WithTicker
|
||||
|
||||
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
|
||||
Queue Queue[T]
|
||||
}
|
||||
|
||||
// New constructs a new work queue (see the package comment).
|
||||
//
|
||||
// Deprecated: use NewTyped instead.
|
||||
func New() *Type {
|
||||
return NewWithConfig(QueueConfig{
|
||||
Name: "",
|
||||
})
|
||||
}
|
||||
|
||||
// NewTyped constructs a new work queue (see the package comment).
|
||||
func NewTyped[T comparable]() *Typed[T] {
|
||||
return NewTypedWithConfig(TypedQueueConfig[T]{
|
||||
Name: "",
|
||||
})
|
||||
}
|
||||
|
||||
// NewWithConfig constructs a new workqueue with ability to
|
||||
// customize different properties.
|
||||
//
|
||||
// Deprecated: use NewTypedWithConfig instead.
|
||||
func NewWithConfig(config QueueConfig) *Type {
|
||||
return NewTypedWithConfig(config)
|
||||
}
|
||||
|
||||
// NewTypedWithConfig constructs a new workqueue with ability to
|
||||
// customize different properties.
|
||||
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
|
||||
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
|
||||
}
|
||||
|
||||
// NewNamed creates a new named queue.
|
||||
// Deprecated: Use NewWithConfig instead.
|
||||
func NewNamed(name string) *Type {
|
||||
return NewWithConfig(QueueConfig{
|
||||
Name: name,
|
||||
})
|
||||
}
|
||||
|
||||
// newQueueWithConfig constructs a new named workqueue
|
||||
// with the ability to customize different properties for testing purposes
|
||||
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
|
||||
metricsProvider := globalMetricsProvider
|
||||
if config.MetricsProvider != nil {
|
||||
metricsProvider = config.MetricsProvider
|
||||
}
|
||||
|
||||
if config.Clock == nil {
|
||||
config.Clock = clock.RealClock{}
|
||||
}
|
||||
|
||||
if config.Queue == nil {
|
||||
config.Queue = DefaultQueue[T]()
|
||||
}
|
||||
|
||||
return newQueue(
|
||||
config.Clock,
|
||||
config.Queue,
|
||||
newQueueMetrics[T](metricsProvider, config.Name, config.Clock),
|
||||
updatePeriod,
|
||||
)
|
||||
}
|
||||
|
||||
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] {
|
||||
t := &Typed[T]{
|
||||
clock: c,
|
||||
queue: queue,
|
||||
dirty: sets.Set[T]{},
|
||||
processing: sets.Set[T]{},
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
metrics: metrics,
|
||||
unfinishedWorkUpdatePeriod: updatePeriod,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Don't start the goroutine for a type of noMetrics so we don't consume
|
||||
// resources unnecessarily
|
||||
if _, ok := metrics.(noMetrics[T]); !ok {
|
||||
t.wg.Go(t.updateUnfinishedWorkLoop)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
|
||||
|
||||
// Type is a work queue (see the package comment).
|
||||
// Deprecated: Use Typed instead.
|
||||
type Type = Typed[any]
|
||||
|
||||
type Typed[t comparable] struct {
|
||||
// queue defines the order in which we will work on items. Every
|
||||
// element of queue should be in the dirty set and not in the
|
||||
// processing set.
|
||||
queue Queue[t]
|
||||
|
||||
// dirty defines all of the items that need to be processed.
|
||||
dirty sets.Set[t]
|
||||
|
||||
// Things that are currently being processed are in the processing set.
|
||||
// These things may be simultaneously in the dirty set. When we finish
|
||||
// processing something and remove it from this set, we'll check if
|
||||
// it's in the dirty set, and if so, add it to the queue.
|
||||
processing sets.Set[t]
|
||||
|
||||
cond *sync.Cond
|
||||
|
||||
shuttingDown bool
|
||||
drain bool
|
||||
|
||||
metrics queueMetrics[t]
|
||||
|
||||
unfinishedWorkUpdatePeriod time.Duration
|
||||
clock clock.WithTicker
|
||||
|
||||
// wg manages goroutines started by the queue to allow graceful shutdown
|
||||
// ShutDown() will wait for goroutines to exit before returning.
|
||||
wg sync.WaitGroup
|
||||
|
||||
stopCh chan struct{}
|
||||
// stopOnce guarantees we only signal shutdown a single time
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
// Add marks item as needing processing. When the queue is shutdown new
|
||||
// items will silently be ignored and not queued or marked as dirty for
|
||||
// reprocessing.
|
||||
func (q *Typed[T]) Add(item T) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
if q.shuttingDown {
|
||||
return
|
||||
}
|
||||
if q.dirty.Has(item) {
|
||||
// the same item is added again before it is processed, call the Touch
|
||||
// function if the queue cares about it (for e.g, reset its priority)
|
||||
if !q.processing.Has(item) {
|
||||
q.queue.Touch(item)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
q.metrics.add(item)
|
||||
|
||||
q.dirty.Insert(item)
|
||||
if q.processing.Has(item) {
|
||||
return
|
||||
}
|
||||
|
||||
q.queue.Push(item)
|
||||
q.cond.Signal()
|
||||
}
|
||||
|
||||
// Len returns the current queue length, for informational purposes only. You
|
||||
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
|
||||
// value, that can't be synchronized properly.
|
||||
func (q *Typed[T]) Len() int {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return q.queue.Len()
|
||||
}
|
||||
|
||||
// Get blocks until it can return an item to be processed. If shutdown = true,
|
||||
// the caller should end their goroutine. You must call Done with item when you
|
||||
// have finished processing it.
|
||||
func (q *Typed[T]) Get() (item T, shutdown bool) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
for q.queue.Len() == 0 && !q.shuttingDown {
|
||||
q.cond.Wait()
|
||||
}
|
||||
if q.queue.Len() == 0 {
|
||||
// We must be shutting down.
|
||||
return *new(T), true
|
||||
}
|
||||
|
||||
item = q.queue.Pop()
|
||||
|
||||
q.metrics.get(item)
|
||||
|
||||
q.processing.Insert(item)
|
||||
q.dirty.Delete(item)
|
||||
|
||||
return item, false
|
||||
}
|
||||
|
||||
// Done marks item as done processing, and if it has been marked as dirty again
|
||||
// while it was being processed, it will be re-added to the queue for
|
||||
// re-processing.
|
||||
func (q *Typed[T]) Done(item T) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
q.metrics.done(item)
|
||||
|
||||
q.processing.Delete(item)
|
||||
if q.dirty.Has(item) {
|
||||
q.queue.Push(item)
|
||||
q.cond.Signal()
|
||||
} else if q.processing.Len() == 0 {
|
||||
q.cond.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
// ShutDown will cause q to ignore all new items added to it. Worker
|
||||
// goroutines will continue processing items in the queue until it is
|
||||
// empty and then receive the shutdown signal.
|
||||
func (q *Typed[T]) ShutDown() {
|
||||
defer q.wg.Wait()
|
||||
q.stopOnce.Do(func() {
|
||||
defer close(q.stopCh)
|
||||
})
|
||||
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
q.drain = false
|
||||
q.shuttingDown = true
|
||||
q.cond.Broadcast()
|
||||
}
|
||||
|
||||
// ShutDownWithDrain is equivalent to ShutDown but waits until all items
|
||||
// in the queue have been processed.
|
||||
// ShutDown can be called after ShutDownWithDrain to force
|
||||
// ShutDownWithDrain to stop waiting.
|
||||
// Workers must call Done on an item after processing it, otherwise
|
||||
// ShutDownWithDrain will block indefinitely.
|
||||
func (q *Typed[T]) ShutDownWithDrain() {
|
||||
defer q.wg.Wait()
|
||||
q.stopOnce.Do(func() {
|
||||
defer close(q.stopCh)
|
||||
})
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
q.drain = true
|
||||
q.shuttingDown = true
|
||||
q.cond.Broadcast()
|
||||
|
||||
for q.processing.Len() != 0 && q.drain {
|
||||
q.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Typed[T]) ShuttingDown() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
return q.shuttingDown
|
||||
}
|
||||
|
||||
func (q *Typed[T]) updateUnfinishedWork() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
if !q.shuttingDown {
|
||||
q.metrics.updateUnfinishedWork()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Typed[T]) updateUnfinishedWorkLoop() {
|
||||
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C():
|
||||
q.updateUnfinishedWork()
|
||||
case <-q.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
147
vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
generated
vendored
Normal file
147
vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
generated
vendored
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import "k8s.io/utils/clock"
|
||||
|
||||
// RateLimitingInterface is an interface that rate limits items being added to the queue.
|
||||
//
|
||||
// Deprecated: Use TypedRateLimitingInterface instead.
|
||||
type RateLimitingInterface TypedRateLimitingInterface[any]
|
||||
|
||||
// TypedRateLimitingInterface is an interface that rate limits items being added to the queue.
|
||||
type TypedRateLimitingInterface[T comparable] interface {
|
||||
TypedDelayingInterface[T]
|
||||
|
||||
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
|
||||
AddRateLimited(item T)
|
||||
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
|
||||
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
|
||||
// still have to call `Done` on the queue.
|
||||
Forget(item T)
|
||||
|
||||
// NumRequeues returns back how many times the item was requeued
|
||||
NumRequeues(item T) int
|
||||
}
|
||||
|
||||
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
|
||||
//
|
||||
// Deprecated: Use TypedRateLimitingQueueConfig instead.
|
||||
type RateLimitingQueueConfig = TypedRateLimitingQueueConfig[any]
|
||||
|
||||
// TypedRateLimitingQueueConfig specifies optional configurations to customize a TypedRateLimitingInterface.
|
||||
type TypedRateLimitingQueueConfig[T comparable] struct {
|
||||
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||
Name string
|
||||
|
||||
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||
// instead of the global provider.
|
||||
MetricsProvider MetricsProvider
|
||||
|
||||
// Clock optionally allows injecting a real or fake clock for testing purposes.
|
||||
Clock clock.WithTicker
|
||||
|
||||
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
|
||||
DelayingQueue TypedDelayingInterface[T]
|
||||
}
|
||||
|
||||
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||
// NewRateLimitingQueueWithConfig instead and specify a name.
|
||||
//
|
||||
// Deprecated: Use NewTypedRateLimitingQueue instead.
|
||||
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
||||
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
|
||||
}
|
||||
|
||||
// NewTypedRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
// NewTypedRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||
// NewTypedRateLimitingQueueWithConfig instead and specify a name.
|
||||
func NewTypedRateLimitingQueue[T comparable](rateLimiter TypedRateLimiter[T]) TypedRateLimitingInterface[T] {
|
||||
return NewTypedRateLimitingQueueWithConfig(rateLimiter, TypedRateLimitingQueueConfig[T]{})
|
||||
}
|
||||
|
||||
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
||||
// with options to customize different properties.
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
//
|
||||
// Deprecated: Use NewTypedRateLimitingQueueWithConfig instead.
|
||||
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
|
||||
return NewTypedRateLimitingQueueWithConfig(rateLimiter, config)
|
||||
}
|
||||
|
||||
// NewTypedRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
||||
// with options to customize different properties.
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
func NewTypedRateLimitingQueueWithConfig[T comparable](rateLimiter TypedRateLimiter[T], config TypedRateLimitingQueueConfig[T]) TypedRateLimitingInterface[T] {
|
||||
if config.Clock == nil {
|
||||
config.Clock = clock.RealClock{}
|
||||
}
|
||||
|
||||
if config.DelayingQueue == nil {
|
||||
config.DelayingQueue = NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{
|
||||
Name: config.Name,
|
||||
MetricsProvider: config.MetricsProvider,
|
||||
Clock: config.Clock,
|
||||
})
|
||||
}
|
||||
|
||||
return &rateLimitingType[T]{
|
||||
TypedDelayingInterface: config.DelayingQueue,
|
||||
rateLimiter: rateLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
|
||||
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
|
||||
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
|
||||
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
|
||||
Name: name,
|
||||
})
|
||||
}
|
||||
|
||||
// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability
|
||||
// with the option to inject a custom delaying queue instead of the default one.
|
||||
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
|
||||
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
|
||||
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
|
||||
DelayingQueue: di,
|
||||
})
|
||||
}
|
||||
|
||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||
type rateLimitingType[T comparable] struct {
|
||||
TypedDelayingInterface[T]
|
||||
|
||||
rateLimiter TypedRateLimiter[T]
|
||||
}
|
||||
|
||||
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
|
||||
func (q *rateLimitingType[T]) AddRateLimited(item T) {
|
||||
q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
||||
}
|
||||
|
||||
func (q *rateLimitingType[T]) NumRequeues(item T) int {
|
||||
return q.rateLimiter.NumRequeues(item)
|
||||
}
|
||||
|
||||
func (q *rateLimitingType[T]) Forget(item T) {
|
||||
q.rateLimiter.Forget(item)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue