Skip to content

Commit

Permalink
Add back generation changed filter. Move finalizer add to after delet…
Browse files Browse the repository at this point in the history
…ion check.
  • Loading branch information
samuelattwood committed Jan 16, 2025
1 parent 9993084 commit 6c2b9df
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 66 deletions.
28 changes: 15 additions & 13 deletions internal/controller/account_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -89,19 +90,6 @@ func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(account, accountFinalizer) {
log.Info("Adding Account finalizer.")
if ok := controllerutil.AddFinalizer(account, accountFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to account resource")
}

if err := r.Update(ctx, account); err != nil {
return ctrl.Result{}, fmt.Errorf("update account resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := account.GetDeletionTimestamp() != nil
if markedForDeletion {
Expand Down Expand Up @@ -135,6 +123,19 @@ func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(account, accountFinalizer) {
log.Info("Adding Account finalizer.")
if ok := controllerutil.AddFinalizer(account, accountFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to account resource")
}

if err := r.Update(ctx, account); err != nil {
return ctrl.Result{}, fmt.Errorf("update account resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Update ready status for non-deleted accounts
account.Status.ObservedGeneration = account.Generation
account.Status.Conditions = updateReadyCondition(
Expand Down Expand Up @@ -218,6 +219,7 @@ func (r *AccountReconciler) findDependentResources(ctx context.Context, log logr
func (r *AccountReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.Account{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Expand Down
38 changes: 24 additions & 14 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -82,19 +84,6 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
log.Info("Adding consumer finalizer.")
if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource")
}

if err := r.Update(ctx, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := consumer.GetDeletionTimestamp() != nil
if markedForDeletion {
Expand All @@ -110,6 +99,19 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
log.Info("Adding consumer finalizer.")
if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource")
}

if err := r.Update(ctx, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

if consumer.Spec.FlowControl || consumer.Spec.DeliverSubject != "" || consumer.Spec.DeliverGroup != "" || consumer.Spec.HeartbeatInterval != "" {
log.Info("FlowControl, DeliverSubject, DeliverGroup, and HeartbeatInterval are Push Consumer options, which are not supported. Skipping consumer creation or update.")
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, "Push Consumer options are not supported.")
Expand All @@ -121,6 +123,9 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

// Create or update stream
if err := r.createOrUpdate(ctx, log, consumer); err != nil {
if err := r.Get(ctx, client.ObjectKeyFromObject(consumer), consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("get consumer resource: %w", err)
}
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error())
if err := r.Status().Update(ctx, consumer); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
Expand Down Expand Up @@ -160,7 +165,11 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
case errors.Is(err, jetstream.ErrStreamNotFound):
log.Info("Stream of consumer does not exist. Unable to delete.")
case err != nil:
return fmt.Errorf("delete jetstream consumer: %w", err)
if storedState == nil {
log.Info("Consumer not reconciled and no state received from server. Removing finalizer.")
} else {
return fmt.Errorf("delete jetstream consumer: %w", err)
}
default:
log.Info("Consumer deleted.")
}
Expand Down Expand Up @@ -407,6 +416,7 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.Consumer{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Expand Down
30 changes: 17 additions & 13 deletions internal/controller/keyvalue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
Expand Down Expand Up @@ -87,19 +88,6 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) {
log.Info("Adding KeyValue finalizer.")
if ok := controllerutil.AddFinalizer(keyValue, keyValueFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to keyvalue resource")
}

if err := r.Update(ctx, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("update keyvalue resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := keyValue.GetDeletionTimestamp() != nil
if markedForDeletion {
Expand All @@ -115,6 +103,19 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) {
log.Info("Adding KeyValue finalizer.")
if ok := controllerutil.AddFinalizer(keyValue, keyValueFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to keyvalue resource")
}

if err := r.Update(ctx, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("update keyvalue resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Create or update KeyValue
if err := r.createOrUpdate(ctx, log, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
Expand Down Expand Up @@ -149,6 +150,8 @@ func (r *KeyValueReconciler) deleteKeyValue(ctx context.Context, log logr.Logger
})
if errors.Is(err, jetstream.ErrBucketNotFound) {
log.Info("KeyValue does not exist, unable to delete.", "keyValueName", keyValue.Spec.Bucket)
} else if err != nil && storedState == nil {
log.Info("KeyValue not reconciled and no state received from server. Removing finalizer.")
} else if err != nil {
return fmt.Errorf("delete keyvalue during finalization: %w", err)
}
Expand Down Expand Up @@ -379,6 +382,7 @@ func keyValueSpecToConfig(spec *api.KeyValueSpec) (jetstream.KeyValueConfig, err
func (r *KeyValueReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.KeyValue{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Expand Down
30 changes: 17 additions & 13 deletions internal/controller/objectstore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
Expand Down Expand Up @@ -88,19 +89,6 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(objectStore, objectStoreFinalizer) {
log.Info("Adding ObjectStore finalizer.")
if ok := controllerutil.AddFinalizer(objectStore, objectStoreFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to objectstore resource")
}

if err := r.Update(ctx, objectStore); err != nil {
return ctrl.Result{}, fmt.Errorf("update objectstore resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := objectStore.GetDeletionTimestamp() != nil
if markedForDeletion {
Expand All @@ -116,6 +104,19 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(objectStore, objectStoreFinalizer) {
log.Info("Adding ObjectStore finalizer.")
if ok := controllerutil.AddFinalizer(objectStore, objectStoreFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to objectstore resource")
}

if err := r.Update(ctx, objectStore); err != nil {
return ctrl.Result{}, fmt.Errorf("update objectstore resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Create or update ObjectStore
if err := r.createOrUpdate(ctx, log, objectStore); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
Expand Down Expand Up @@ -150,6 +151,8 @@ func (r *ObjectStoreReconciler) deleteObjectStore(ctx context.Context, log logr.
})
if errors.Is(err, jetstream.ErrStreamNotFound) || errors.Is(err, jetstream.ErrBucketNotFound) {
log.Info("ObjectStore does not exist, unable to delete.", "objectStoreName", objectStore.Spec.Bucket)
} else if err != nil && storedState == nil {
log.Info("ObjectStore not reconciled and no state received from server. Removing finalizer.")
} else if err != nil {
return fmt.Errorf("delete objectstore during finalization: %w", err)
}
Expand Down Expand Up @@ -349,6 +352,7 @@ func objectStoreSpecToConfig(spec *api.ObjectStoreSpec) (jetstream.ObjectStoreCo
func (r *ObjectStoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.ObjectStore{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Expand Down
30 changes: 17 additions & 13 deletions internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StreamReconciler reconciles a Stream object
Expand Down Expand Up @@ -84,19 +85,6 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(stream, streamFinalizer) {
log.Info("Adding stream finalizer.")
if ok := controllerutil.AddFinalizer(stream, streamFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to stream resource")
}

if err := r.Update(ctx, stream); err != nil {
return ctrl.Result{}, fmt.Errorf("update stream resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := stream.GetDeletionTimestamp() != nil
if markedForDeletion {
Expand All @@ -112,6 +100,19 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(stream, streamFinalizer) {
log.Info("Adding stream finalizer.")
if ok := controllerutil.AddFinalizer(stream, streamFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to stream resource")
}

if err := r.Update(ctx, stream); err != nil {
return ctrl.Result{}, fmt.Errorf("update stream resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Create or update stream
if err := r.createOrUpdate(ctx, log, stream); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
Expand Down Expand Up @@ -146,6 +147,8 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st
})
if errors.Is(err, jetstream.ErrStreamNotFound) {
log.Info("Stream does not exist, unable to delete.", "streamName", stream.Spec.Name)
} else if err != nil && storedState == nil {
log.Info("Stream not reconciled and no state received from server. Removing finalizer.")
} else if err != nil {
return fmt.Errorf("delete stream during finalization: %w", err)
}
Expand Down Expand Up @@ -472,6 +475,7 @@ func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) {
func (r *StreamReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.Stream{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 1,
}).
Expand Down

0 comments on commit 6c2b9df

Please sign in to comment.