From 9002d95e5321d73609c552d160cc9361c5279075 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 24 Mar 2025 21:06:14 +0000 Subject: [PATCH] rebase: bump sigs.k8s.io/controller-runtime Bumps the k8s-dependencies group with 1 update: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime). Updates `sigs.k8s.io/controller-runtime` from 0.20.3 to 0.20.4 - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.20.3...v0.20.4) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-patch dependency-group: k8s-dependencies ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- vendor/modules.txt | 2 +- .../pkg/client/apiutil/restmapper.go | 20 ++- .../controller-runtime/pkg/controller/name.go | 2 +- .../controller-runtime/pkg/handler/enqueue.go | 19 ++- .../pkg/handler/enqueue_mapped.go | 45 ++++-- .../pkg/handler/enqueue_owner.go | 2 +- .../pkg/handler/eventhandler.go | 132 +++++++++++++----- .../pkg/internal/controller/controller.go | 44 ++++-- 10 files changed, 199 insertions(+), 73 deletions(-) diff --git a/go.mod b/go.mod index 0be9a3967..744df0f9a 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( require ( // sigs.k8s.io/controller-runtime wants this version, it gets replaced below k8s.io/client-go v12.0.0+incompatible - sigs.k8s.io/controller-runtime v0.20.3 + sigs.k8s.io/controller-runtime v0.20.4 ) replace k8s.io/client-go => k8s.io/client-go v0.32.2 diff --git a/go.sum b/go.sum index 33f926f64..28bf1ff85 100644 --- a/go.sum +++ b/go.sum @@ -1394,8 +1394,8 @@ k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.20.3 h1:I6Ln8JfQjHH7JbtCD2HCYHoIzajoRxPNuvhvcDbZgkI= -sigs.k8s.io/controller-runtime v0.20.3/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= +sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= diff --git a/vendor/modules.txt b/vendor/modules.txt index b0005d566..986c787e9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1190,7 +1190,7 @@ k8s.io/utils/path k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/trace -# sigs.k8s.io/controller-runtime v0.20.3 +# sigs.k8s.io/controller-runtime v0.20.4 ## explicit; go 1.23.0 sigs.k8s.io/controller-runtime/pkg/cache sigs.k8s.io/controller-runtime/pkg/cache/internal diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go index ad898617f..7a7a0d114 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go @@ -246,10 +246,18 @@ func (m *mapper) addGroupVersionResourcesToCacheAndReloadLocked(gvr map[schema.G } if !found { - groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ + gv := metav1.GroupVersionForDiscovery{ GroupVersion: metav1.GroupVersion{Group: groupVersion.Group, Version: version}.String(), Version: version, - }) + } + + // Prepend if preferred version, else append. The upstream DiscoveryRestMappper assumes + // the first version is the preferred one: https://github.com/kubernetes/kubernetes/blob/ef54ac803b712137871c1a1f8d635d50e69ffa6c/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go#L458-L461 + if group, ok := m.apiGroups[groupVersion.Group]; ok && group.PreferredVersion.Version == version { + groupResources.Group.Versions = append([]metav1.GroupVersionForDiscovery{gv}, groupResources.Group.Versions...) + } else { + groupResources.Group.Versions = append(groupResources.Group.Versions, gv) + } } // Update data in the cache. @@ -284,14 +292,14 @@ func (m *mapper) findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName s } m.initialDiscoveryDone = true - if len(maybeResources) > 0 { - didAggregatedDiscovery = true - m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources) - } for i := range apiGroups.Groups { group := &apiGroups.Groups[i] m.apiGroups[group.Name] = group } + if len(maybeResources) > 0 { + didAggregatedDiscovery = true + m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources) + } // Looking in the cache again. // Don't return an error here if the API group is not present. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go index 0e71a01c6..00ca65512 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/name.go @@ -34,7 +34,7 @@ func checkName(name string) error { } if usedNames.Has(name) { - return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting to the same metric", name) + return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting the same metric. This validation can be disabled via the SkipNameValidation option", name) } usedNames.Insert(name) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go index 1a1d1ab2f..64cbe8a4d 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go @@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event. enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), - }}) + }} + + addToQueueCreate(q, evt, item) } // Update implements EventHandler. func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { switch { case !isNil(evt.ObjectNew): - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), - }}) + }} + + addToQueueUpdate(q, evt, item) case !isNil(evt.ObjectOld): - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), - }}) + }} + + addToQueueUpdate(q, evt, item) default: enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go index 491bc40c4..be97fa378 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -63,7 +64,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { // TypedEnqueueRequestsFromMapFunc is experimental and subject to future change. func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] { return &enqueueRequestsFromMapFunc[object, request]{ - toRequests: fn, + toRequests: fn, + objectImplementsClientObject: implementsClientObject[object](), } } @@ -71,7 +73,8 @@ var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Reques type enqueueRequestsFromMapFunc[object any, request comparable] struct { // Mapper transforms the argument into a slice of keys to be reconciled - toRequests TypedMapFunc[object, request] + toRequests TypedMapFunc[object, request] + objectImplementsClientObject bool } // Create implements EventHandler. @@ -81,7 +84,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + + var lowPriority bool + if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) { + clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)} + if isObjectUnchanged(clientObjectEvent) { + lowPriority = true + } + } + e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority) } // Update implements EventHandler. @@ -90,9 +101,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update( evt event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request], ) { + var lowPriority bool + if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) { + lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion() + } reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) - e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) + e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority) + e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority) } // Delete implements EventHandler. @@ -102,7 +117,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs, false) } // Generic implements EventHandler. @@ -112,14 +127,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs, false) } -func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) { +func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue( + ctx context.Context, + q workqueue.TypedRateLimitingInterface[request], + o object, + reqs map[request]empty, + lowPriority bool, +) { for _, req := range e.toRequests(ctx, o) { _, ok := reqs[req] if !ok { - q.Add(req) + if lowPriority { + q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{ + Priority: LowPriority, + }, req) + } else { + q.Add(req) + } reqs[req] = empty{} } } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go index 1680043b4..e8fc8eb46 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_owner.go @@ -72,7 +72,7 @@ func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, m for _, opt := range opts { opt(e) } - return e + return WithLowPriorityWhenUnchanged(e) } // OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go index 57107f20e..84a10ac07 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go @@ -18,6 +18,7 @@ package handler import ( "context" + "reflect" "time" "k8s.io/client-go/util/workqueue" @@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct { GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request]) } +var typeForClientObject = reflect.TypeFor[client.Object]() + +func implementsClientObject[object any]() bool { + return reflect.TypeFor[object]().Implements(typeForClientObject) +} + +func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool { + _, ok := q.(priorityqueue.PriorityQueue[request]) + return ok +} + // Create implements EventHandler. func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.CreateFunc != nil { - h.CreateFunc(ctx, e, q) + if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) { + h.CreateFunc(ctx, e, q) + return + } + wq := workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: q, + // We already know that we have a priority queue, that event.Object implements + // client.Object and that its not nil + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + // We construct a new event typed to client.Object because isObjectUnchanged + // is a generic and hence has to know at compile time the type of the event + // it gets. We only figure that out at runtime though, but we know for sure + // that it implements client.Object at this point so we can hardcode the event + // type to that. + evt := event.CreateEvent{Object: any(e.Object).(client.Object)} + var priority int + if isObjectUnchanged(evt) { + priority = LowPriority + } + q.(priorityqueue.PriorityQueue[request]).AddWithOpts( + priorityqueue.AddOpts{Priority: priority}, + item, + ) + }, + } + h.CreateFunc(ctx, e, wq) } } @@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe // Update implements EventHandler. func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.UpdateFunc != nil { - h.UpdateFunc(ctx, e, q) + if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) { + h.UpdateFunc(ctx, e, q) + return + } + + wq := workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: q, + // We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement + // client.Object and that they are not nil + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + var priority int + if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { + priority = LowPriority + } + q.(priorityqueue.PriorityQueue[request]).AddWithOpts( + priorityqueue.AddOpts{Priority: priority}, + item, + ) + }, + } + h.UpdateFunc(ctx, e, wq) } } @@ -142,43 +199,10 @@ const LowPriority = -100 // WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if // and only if a priorityqueue.PriorityQueue is used. If not, it does nothing. func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { + // TypedFuncs already implements this so just wrap return TypedFuncs[object, request]{ - CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { - // Due to how the handlers are factored, we have to wrap the workqueue to be able - // to inject custom behavior. - u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: trli, - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) - if !isPriorityQueue { - q.Add(item) - return - } - var priority int - if isObjectUnchanged(tce) { - priority = LowPriority - } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) - }, - }) - }, - UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { - u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: trli, - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) - if !isPriorityQueue { - q.Add(item) - return - } - var priority int - if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() { - priority = LowPriority - } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) - }, - }) - }, + CreateFunc: u.Create, + UpdateFunc: u.Update, DeleteFunc: u.Delete, GenericFunc: u.Generic, } @@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) { func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) } + +// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler +// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] +func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + + var priority int + if isObjectUnchanged(evt) { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) +} + +// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler +// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] +func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + + var priority int + if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index cc734dfb6..3f8cfdaa0 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -60,7 +61,7 @@ type Controller[request comparable] struct { // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing - Queue workqueue.TypedRateLimitingInterface[request] + Queue priorityqueue.PriorityQueue[request] // mu is used to synchronize Controller setup mu sync.Mutex @@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - c.Queue = c.NewQueue(c.Name, c.RateLimiter) + queue := c.NewQueue(c.Name, c.RateLimiter) + if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { + c.Queue = priorityQueue + } else { + c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} + } go func() { <-ctx.Done() c.Queue.ShutDown() @@ -268,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.Queue.Get() + obj, priority, shutdown := c.Queue.GetWithPriority() if shutdown { // Stop working return false @@ -285,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) - c.reconcileHandler(ctx, obj) + c.reconcileHandler(ctx, obj, priority) return true } @@ -308,7 +314,7 @@ func (c *Controller[request]) initMetrics() { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0) } -func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) { +func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { @@ -331,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -346,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddAfter(req, result.RequeueAfter) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") @@ -388,3 +394,25 @@ type reconcileIDKey struct{} func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context { return context.WithValue(ctx, reconcileIDKey{}, reconcileID) } + +type priorityQueueWrapper[request comparable] struct { + workqueue.TypedRateLimitingInterface[request] +} + +func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) { + for _, item := range items { + switch { + case opts.RateLimited: + p.TypedRateLimitingInterface.AddRateLimited(item) + case opts.After > 0: + p.TypedRateLimitingInterface.AddAfter(item, opts.After) + default: + p.TypedRateLimitingInterface.Add(item) + } + } +} + +func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) { + item, shutdown := p.TypedRateLimitingInterface.Get() + return item, 0, shutdown +}