mirror of
https://git.mirrors.martin98.com/https://github.com/ceph/ceph-csi.git
synced 2025-04-18 11:49:57 +08:00
rebase: bump sigs.k8s.io/controller-runtime
Bumps the k8s-dependencies group with 1 update: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime). Updates `sigs.k8s.io/controller-runtime` from 0.20.3 to 0.20.4 - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.20.3...v0.20.4) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-patch dependency-group: k8s-dependencies ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
parent
c1564a135f
commit
9002d95e53
2
go.mod
2
go.mod
@ -46,7 +46,7 @@ require (
|
||||
require (
|
||||
// sigs.k8s.io/controller-runtime wants this version, it gets replaced below
|
||||
k8s.io/client-go v12.0.0+incompatible
|
||||
sigs.k8s.io/controller-runtime v0.20.3
|
||||
sigs.k8s.io/controller-runtime v0.20.4
|
||||
)
|
||||
|
||||
replace k8s.io/client-go => k8s.io/client-go v0.32.2
|
||||
|
4
go.sum
4
go.sum
@ -1394,8 +1394,8 @@ k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||
sigs.k8s.io/controller-runtime v0.20.3 h1:I6Ln8JfQjHH7JbtCD2HCYHoIzajoRxPNuvhvcDbZgkI=
|
||||
sigs.k8s.io/controller-runtime v0.20.3/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY=
|
||||
sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU=
|
||||
sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY=
|
||||
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
|
||||
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo=
|
||||
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@ -1190,7 +1190,7 @@ k8s.io/utils/path
|
||||
k8s.io/utils/pointer
|
||||
k8s.io/utils/ptr
|
||||
k8s.io/utils/trace
|
||||
# sigs.k8s.io/controller-runtime v0.20.3
|
||||
# sigs.k8s.io/controller-runtime v0.20.4
|
||||
## explicit; go 1.23.0
|
||||
sigs.k8s.io/controller-runtime/pkg/cache
|
||||
sigs.k8s.io/controller-runtime/pkg/cache/internal
|
||||
|
20
vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go
generated
vendored
20
vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go
generated
vendored
@ -246,10 +246,18 @@ func (m *mapper) addGroupVersionResourcesToCacheAndReloadLocked(gvr map[schema.G
|
||||
}
|
||||
|
||||
if !found {
|
||||
groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
|
||||
gv := metav1.GroupVersionForDiscovery{
|
||||
GroupVersion: metav1.GroupVersion{Group: groupVersion.Group, Version: version}.String(),
|
||||
Version: version,
|
||||
})
|
||||
}
|
||||
|
||||
// Prepend if preferred version, else append. The upstream DiscoveryRestMappper assumes
|
||||
// the first version is the preferred one: https://github.com/kubernetes/kubernetes/blob/ef54ac803b712137871c1a1f8d635d50e69ffa6c/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go#L458-L461
|
||||
if group, ok := m.apiGroups[groupVersion.Group]; ok && group.PreferredVersion.Version == version {
|
||||
groupResources.Group.Versions = append([]metav1.GroupVersionForDiscovery{gv}, groupResources.Group.Versions...)
|
||||
} else {
|
||||
groupResources.Group.Versions = append(groupResources.Group.Versions, gv)
|
||||
}
|
||||
}
|
||||
|
||||
// Update data in the cache.
|
||||
@ -284,14 +292,14 @@ func (m *mapper) findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName s
|
||||
}
|
||||
|
||||
m.initialDiscoveryDone = true
|
||||
if len(maybeResources) > 0 {
|
||||
didAggregatedDiscovery = true
|
||||
m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources)
|
||||
}
|
||||
for i := range apiGroups.Groups {
|
||||
group := &apiGroups.Groups[i]
|
||||
m.apiGroups[group.Name] = group
|
||||
}
|
||||
if len(maybeResources) > 0 {
|
||||
didAggregatedDiscovery = true
|
||||
m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources)
|
||||
}
|
||||
|
||||
// Looking in the cache again.
|
||||
// Don't return an error here if the API group is not present.
|
||||
|
2
vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go
generated
vendored
2
vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go
generated
vendored
@ -34,7 +34,7 @@ func checkName(name string) error {
|
||||
}
|
||||
|
||||
if usedNames.Has(name) {
|
||||
return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting to the same metric", name)
|
||||
return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting the same metric. This validation can be disabled via the SkipNameValidation option", name)
|
||||
}
|
||||
|
||||
usedNames.Insert(name)
|
||||
|
19
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go
generated
vendored
19
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go
generated
vendored
@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
|
||||
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
|
||||
return
|
||||
}
|
||||
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
|
||||
item := reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
Name: evt.Object.GetName(),
|
||||
Namespace: evt.Object.GetNamespace(),
|
||||
}})
|
||||
}}
|
||||
|
||||
addToQueueCreate(q, evt, item)
|
||||
}
|
||||
|
||||
// Update implements EventHandler.
|
||||
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
switch {
|
||||
case !isNil(evt.ObjectNew):
|
||||
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
item := reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
Name: evt.ObjectNew.GetName(),
|
||||
Namespace: evt.ObjectNew.GetNamespace(),
|
||||
}})
|
||||
}}
|
||||
|
||||
addToQueueUpdate(q, evt, item)
|
||||
case !isNil(evt.ObjectOld):
|
||||
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
item := reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
Name: evt.ObjectOld.GetName(),
|
||||
Namespace: evt.ObjectOld.GetNamespace(),
|
||||
}})
|
||||
}}
|
||||
|
||||
addToQueueUpdate(q, evt, item)
|
||||
default:
|
||||
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
|
||||
}
|
||||
|
45
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go
generated
vendored
45
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go
generated
vendored
@ -21,6 +21,7 @@ import (
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
@ -63,7 +64,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
|
||||
// TypedEnqueueRequestsFromMapFunc is experimental and subject to future change.
|
||||
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] {
|
||||
return &enqueueRequestsFromMapFunc[object, request]{
|
||||
toRequests: fn,
|
||||
toRequests: fn,
|
||||
objectImplementsClientObject: implementsClientObject[object](),
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +73,8 @@ var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Reques
|
||||
|
||||
type enqueueRequestsFromMapFunc[object any, request comparable] struct {
|
||||
// Mapper transforms the argument into a slice of keys to be reconciled
|
||||
toRequests TypedMapFunc[object, request]
|
||||
toRequests TypedMapFunc[object, request]
|
||||
objectImplementsClientObject bool
|
||||
}
|
||||
|
||||
// Create implements EventHandler.
|
||||
@ -81,7 +84,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
|
||||
q workqueue.TypedRateLimitingInterface[request],
|
||||
) {
|
||||
reqs := map[request]empty{}
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
|
||||
|
||||
var lowPriority bool
|
||||
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) {
|
||||
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
|
||||
if isObjectUnchanged(clientObjectEvent) {
|
||||
lowPriority = true
|
||||
}
|
||||
}
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority)
|
||||
}
|
||||
|
||||
// Update implements EventHandler.
|
||||
@ -90,9 +101,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
|
||||
evt event.TypedUpdateEvent[object],
|
||||
q workqueue.TypedRateLimitingInterface[request],
|
||||
) {
|
||||
var lowPriority bool
|
||||
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
|
||||
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
|
||||
}
|
||||
reqs := map[request]empty{}
|
||||
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
|
||||
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
|
||||
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority)
|
||||
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority)
|
||||
}
|
||||
|
||||
// Delete implements EventHandler.
|
||||
@ -102,7 +117,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
|
||||
q workqueue.TypedRateLimitingInterface[request],
|
||||
) {
|
||||
reqs := map[request]empty{}
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
|
||||
}
|
||||
|
||||
// Generic implements EventHandler.
|
||||
@ -112,14 +127,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
|
||||
q workqueue.TypedRateLimitingInterface[request],
|
||||
) {
|
||||
reqs := map[request]empty{}
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
|
||||
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
|
||||
}
|
||||
|
||||
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
|
||||
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
|
||||
ctx context.Context,
|
||||
q workqueue.TypedRateLimitingInterface[request],
|
||||
o object,
|
||||
reqs map[request]empty,
|
||||
lowPriority bool,
|
||||
) {
|
||||
for _, req := range e.toRequests(ctx, o) {
|
||||
_, ok := reqs[req]
|
||||
if !ok {
|
||||
q.Add(req)
|
||||
if lowPriority {
|
||||
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
|
||||
Priority: LowPriority,
|
||||
}, req)
|
||||
} else {
|
||||
q.Add(req)
|
||||
}
|
||||
reqs[req] = empty{}
|
||||
}
|
||||
}
|
||||
|
2
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go
generated
vendored
2
vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go
generated
vendored
@ -72,7 +72,7 @@ func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, m
|
||||
for _, opt := range opts {
|
||||
opt(e)
|
||||
}
|
||||
return e
|
||||
return WithLowPriorityWhenUnchanged(e)
|
||||
}
|
||||
|
||||
// OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true.
|
||||
|
132
vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go
generated
vendored
132
vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go
generated
vendored
@ -18,6 +18,7 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct {
|
||||
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
|
||||
}
|
||||
|
||||
var typeForClientObject = reflect.TypeFor[client.Object]()
|
||||
|
||||
func implementsClientObject[object any]() bool {
|
||||
return reflect.TypeFor[object]().Implements(typeForClientObject)
|
||||
}
|
||||
|
||||
func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
|
||||
_, ok := q.(priorityqueue.PriorityQueue[request])
|
||||
return ok
|
||||
}
|
||||
|
||||
// Create implements EventHandler.
|
||||
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
|
||||
if h.CreateFunc != nil {
|
||||
h.CreateFunc(ctx, e, q)
|
||||
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) {
|
||||
h.CreateFunc(ctx, e, q)
|
||||
return
|
||||
}
|
||||
wq := workqueueWithCustomAddFunc[request]{
|
||||
TypedRateLimitingInterface: q,
|
||||
// We already know that we have a priority queue, that event.Object implements
|
||||
// client.Object and that its not nil
|
||||
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
|
||||
// We construct a new event typed to client.Object because isObjectUnchanged
|
||||
// is a generic and hence has to know at compile time the type of the event
|
||||
// it gets. We only figure that out at runtime though, but we know for sure
|
||||
// that it implements client.Object at this point so we can hardcode the event
|
||||
// type to that.
|
||||
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
|
||||
var priority int
|
||||
if isObjectUnchanged(evt) {
|
||||
priority = LowPriority
|
||||
}
|
||||
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
|
||||
priorityqueue.AddOpts{Priority: priority},
|
||||
item,
|
||||
)
|
||||
},
|
||||
}
|
||||
h.CreateFunc(ctx, e, wq)
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
|
||||
// Update implements EventHandler.
|
||||
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
|
||||
if h.UpdateFunc != nil {
|
||||
h.UpdateFunc(ctx, e, q)
|
||||
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
|
||||
h.UpdateFunc(ctx, e, q)
|
||||
return
|
||||
}
|
||||
|
||||
wq := workqueueWithCustomAddFunc[request]{
|
||||
TypedRateLimitingInterface: q,
|
||||
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
|
||||
// client.Object and that they are not nil
|
||||
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
|
||||
var priority int
|
||||
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
|
||||
priority = LowPriority
|
||||
}
|
||||
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
|
||||
priorityqueue.AddOpts{Priority: priority},
|
||||
item,
|
||||
)
|
||||
},
|
||||
}
|
||||
h.UpdateFunc(ctx, e, wq)
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,43 +199,10 @@ const LowPriority = -100
|
||||
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
|
||||
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
|
||||
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
|
||||
// TypedFuncs already implements this so just wrap
|
||||
return TypedFuncs[object, request]{
|
||||
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
|
||||
// Due to how the handlers are factored, we have to wrap the workqueue to be able
|
||||
// to inject custom behavior.
|
||||
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
|
||||
TypedRateLimitingInterface: trli,
|
||||
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
|
||||
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
|
||||
if !isPriorityQueue {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
var priority int
|
||||
if isObjectUnchanged(tce) {
|
||||
priority = LowPriority
|
||||
}
|
||||
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
|
||||
},
|
||||
})
|
||||
},
|
||||
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
|
||||
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
|
||||
TypedRateLimitingInterface: trli,
|
||||
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
|
||||
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
|
||||
if !isPriorityQueue {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
var priority int
|
||||
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
|
||||
priority = LowPriority
|
||||
}
|
||||
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
|
||||
},
|
||||
})
|
||||
},
|
||||
CreateFunc: u.Create,
|
||||
UpdateFunc: u.Update,
|
||||
DeleteFunc: u.Delete,
|
||||
GenericFunc: u.Generic,
|
||||
}
|
||||
@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
|
||||
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
|
||||
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
|
||||
}
|
||||
|
||||
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
|
||||
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
|
||||
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
|
||||
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
|
||||
if !isPriorityQueue {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
|
||||
var priority int
|
||||
if isObjectUnchanged(evt) {
|
||||
priority = LowPriority
|
||||
}
|
||||
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
|
||||
}
|
||||
|
||||
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
|
||||
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
|
||||
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
|
||||
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
|
||||
if !isPriorityQueue {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
|
||||
var priority int
|
||||
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
|
||||
priority = LowPriority
|
||||
}
|
||||
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
|
||||
}
|
||||
|
44
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
generated
vendored
44
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
generated
vendored
@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
|
||||
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
@ -60,7 +61,7 @@ type Controller[request comparable] struct {
|
||||
|
||||
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
|
||||
// the Queue for processing
|
||||
Queue workqueue.TypedRateLimitingInterface[request]
|
||||
Queue priorityqueue.PriorityQueue[request]
|
||||
|
||||
// mu is used to synchronize Controller setup
|
||||
mu sync.Mutex
|
||||
@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error {
|
||||
// Set the internal context.
|
||||
c.ctx = ctx
|
||||
|
||||
c.Queue = c.NewQueue(c.Name, c.RateLimiter)
|
||||
queue := c.NewQueue(c.Name, c.RateLimiter)
|
||||
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
|
||||
c.Queue = priorityQueue
|
||||
} else {
|
||||
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
c.Queue.ShutDown()
|
||||
@ -268,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
|
||||
// processNextWorkItem will read a single work item off the workqueue and
|
||||
// attempt to process it, by calling the reconcileHandler.
|
||||
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
|
||||
obj, shutdown := c.Queue.Get()
|
||||
obj, priority, shutdown := c.Queue.GetWithPriority()
|
||||
if shutdown {
|
||||
// Stop working
|
||||
return false
|
||||
@ -285,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
|
||||
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
|
||||
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
|
||||
|
||||
c.reconcileHandler(ctx, obj)
|
||||
c.reconcileHandler(ctx, obj, priority)
|
||||
return true
|
||||
}
|
||||
|
||||
@ -308,7 +314,7 @@ func (c *Controller[request]) initMetrics() {
|
||||
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
|
||||
}
|
||||
|
||||
func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) {
|
||||
func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) {
|
||||
// Update metrics after processing each item
|
||||
reconcileStartTS := time.Now()
|
||||
defer func() {
|
||||
@ -331,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
|
||||
if errors.Is(err, reconcile.TerminalError(nil)) {
|
||||
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
|
||||
} else {
|
||||
c.Queue.AddRateLimited(req)
|
||||
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
|
||||
}
|
||||
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
|
||||
@ -346,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
|
||||
// We need to drive to stable reconcile loops before queuing due
|
||||
// to result.RequestAfter
|
||||
c.Queue.Forget(req)
|
||||
c.Queue.AddAfter(req, result.RequeueAfter)
|
||||
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req)
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
|
||||
case result.Requeue:
|
||||
log.V(5).Info("Reconcile done, requeueing")
|
||||
c.Queue.AddRateLimited(req)
|
||||
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
|
||||
default:
|
||||
log.V(5).Info("Reconcile successful")
|
||||
@ -388,3 +394,25 @@ type reconcileIDKey struct{}
|
||||
func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
|
||||
return context.WithValue(ctx, reconcileIDKey{}, reconcileID)
|
||||
}
|
||||
|
||||
type priorityQueueWrapper[request comparable] struct {
|
||||
workqueue.TypedRateLimitingInterface[request]
|
||||
}
|
||||
|
||||
func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) {
|
||||
for _, item := range items {
|
||||
switch {
|
||||
case opts.RateLimited:
|
||||
p.TypedRateLimitingInterface.AddRateLimited(item)
|
||||
case opts.After > 0:
|
||||
p.TypedRateLimitingInterface.AddAfter(item, opts.After)
|
||||
default:
|
||||
p.TypedRateLimitingInterface.Add(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) {
|
||||
item, shutdown := p.TypedRateLimitingInterface.Get()
|
||||
return item, 0, shutdown
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user