From dc965225f63560e79d11d6729a2c720c254b0db7 Mon Sep 17 00:00:00 2001 From: Prakriti Mandal <98270250+prakrit55@users.noreply.github.com> Date: Fri, 17 Nov 2023 13:29:09 +0530 Subject: [PATCH] Add serviceaccount in parallel (#7373) * added serviceaccount in parallel Signed-off-by: Griffin * Update controller.go added changes * Update controller.go added `globalResync` * added Marker functions and tests Signed-off-by: Griffin * added tests Signed-off-by: Griffin * added function in test and fixed lint Signed-off-by: Griffin * fixing test in parallel_test Signed-off-by: Griffin * fixed lint Signed-off-by: Griffin * modified parallel_test Signed-off-by: Griffin --------- Signed-off-by: Griffin --- pkg/apis/flows/v1/parallel_lifecycle.go | 21 ++- pkg/apis/flows/v1/parallel_lifecycle_test.go | 173 ++++++++++++------- pkg/reconciler/parallel/controller.go | 38 +++- pkg/reconciler/parallel/controller_test.go | 12 +- pkg/reconciler/parallel/parallel.go | 26 ++- pkg/reconciler/parallel/parallel_test.go | 121 ++++++++++++- pkg/reconciler/testing/v1/parallel.go | 30 ++++ 7 files changed, 340 insertions(+), 81 deletions(-) diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index b02363ebfb8..34467c058de 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -25,7 +25,7 @@ import ( pkgduckv1 "knative.dev/pkg/apis/duck/v1" ) -var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable) +var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated) const ( // ParallelConditionReady has status True when all subconditions below have been set to True. @@ -41,7 +41,8 @@ const ( // ParallelConditionAddressable has status true when this Parallel meets // the Addressable contract and has a non-empty hostname. - ParallelConditionAddressable apis.ConditionType = "Addressable" + ParallelConditionAddressable apis.ConditionType = "Addressable" + ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -195,6 +196,22 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string, pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...) } +func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() { + pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated) +} + +func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + +func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + +func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) { ps.Address = address if address == nil { diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index d87f3026be4..d193951325f 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -88,6 +88,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, + }, { + Type: ParallelConditionOIDCIdentityCreated, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -115,6 +118,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionFalse, + }, { + Type: ParallelConditionOIDCIdentityCreated, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -142,6 +148,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, + }, { + Type: ParallelConditionOIDCIdentityCreated, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -327,82 +336,109 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) { func TestParallelReady(t *testing.T) { tests := []struct { - name string - fsubs []*messagingv1.Subscription - subs []*messagingv1.Subscription - ichannel *eventingduckv1.Channelable - channels []*eventingduckv1.Channelable - want bool + name string + fsubs []*messagingv1.Subscription + subs []*messagingv1.Subscription + ichannel *eventingduckv1.Channelable + channels []*eventingduckv1.Channelable + markOIDCServiceAccountCreated bool + want bool }{{ - name: "ingress false, empty", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{}, - want: false, + name: "ingress false, empty, serviceAccount ready", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress true, empty", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{}, - want: false, + name: "ingress true, empty, serviceAccount ready", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress true, one channelable not ready, one subscription ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: false, + name: "ingress true, one channelable not ready, one subscription ready, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress true, one channelable ready, one subscription not ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, - want: false, + name: "ingress true, one channelable ready, one subscription not ready, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress false, one channelable ready, one subscription ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: false, + name: "ingress false, one channelable ready, one subscription ready,serviceAccount ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress true, one channelable ready, one subscription ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: true, + name: "ingress true, one channelable ready, one subscription ready, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + markOIDCServiceAccountCreated: true, + want: true, }, { - name: "ingress true, one channelable ready, one not, two subscriptions ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: false, + name: "ingress true, one channelable ready, one subscription ready, serviceAccount not ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + markOIDCServiceAccountCreated: false, + want: false, }, { - name: "ingress true, two channelables ready, one subscription ready, one not", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, - want: false, + name: "ingress true, one channelable ready, one not, two subscriptions ready, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress false, two channelables ready, two subscriptions ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: false, + name: "ingress true, two channelables ready, one subscription ready, one not, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + markOIDCServiceAccountCreated: true, + want: false, }, { - name: "ingress true, two channelables ready, two subscriptions ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: true, + name: "ingress false, two channelables ready, two subscriptions ready, serviceAccount ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + markOIDCServiceAccountCreated: true, + want: false, + }, { + name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount not ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + markOIDCServiceAccountCreated: false, + want: false, + }, { + name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + markOIDCServiceAccountCreated: true, + want: true, }} for _, test := range tests { @@ -410,6 +446,11 @@ func TestParallelReady(t *testing.T) { ps := ParallelStatus{} ps.PropagateChannelStatuses(test.ichannel, test.channels) ps.PropagateSubscriptionStatuses(test.fsubs, test.subs) + if test.markOIDCServiceAccountCreated { + ps.MarkOIDCIdentityCreatedSucceeded() + } else { + ps.MarkOIDCIdentityCreatedFailed("Unable to create serviceaccount", "") + } got := ps.IsReady() want := test.want if want != got { diff --git a/pkg/reconciler/parallel/controller.go b/pkg/reconciler/parallel/controller.go index 71121995dee..86522c21244 100644 --- a/pkg/reconciler/parallel/controller.go +++ b/pkg/reconciler/parallel/controller.go @@ -20,11 +20,15 @@ import ( "context" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" "knative.dev/eventing/pkg/duck" + kubeclient "knative.dev/pkg/client/injection/kube/client" + serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/logging" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" @@ -42,14 +46,33 @@ func NewController( parallelInformer := parallel.Get(ctx) subscriptionInformer := subscription.Get(ctx) + serviceaccountInformer := serviceaccountinformer.Get(ctx) + + var globalResync func(obj interface{}) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + featureStore.WatchConfigs(cmw) r := &Reconciler{ - parallelLister: parallelInformer.Lister(), - subscriptionLister: subscriptionInformer.Lister(), - dynamicClientSet: dynamicclient.Get(ctx), - eventingClientSet: eventingclient.Get(ctx), + parallelLister: parallelInformer.Lister(), + subscriptionLister: subscriptionInformer.Lister(), + serviceAccountLister: serviceaccountInformer.Lister(), + kubeclient: kubeclient.Get(ctx), + dynamicClientSet: dynamicclient.Get(ctx), + eventingClientSet: eventingclient.Get(ctx), + } + impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) + + globalResync = func(_ interface{}) { + impl.GlobalResync(parallelInformer.Informer()) } - impl := parallelreconciler.NewImpl(ctx, r) r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) @@ -60,6 +83,11 @@ func NewController( FilterFunc: controller.FilterController(&v1.Parallel{}), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + // Reconcile Parallel when the OIDC service account changes + serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterController(&v1.Parallel{}), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) return impl } diff --git a/pkg/reconciler/parallel/controller_test.go b/pkg/reconciler/parallel/controller_test.go index acf3b8c5258..57f214a68ae 100644 --- a/pkg/reconciler/parallel/controller_test.go +++ b/pkg/reconciler/parallel/controller_test.go @@ -19,19 +19,29 @@ package parallel import ( "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" // Fake injection informers + "knative.dev/eventing/pkg/apis/feature" _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" ) func TestNew(t *testing.T) { ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + }, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 26f3521b03c..fda2313dd70 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -30,24 +30,30 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" duckapis "knative.dev/pkg/apis/duck" "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" + corev1listers "k8s.io/client-go/listers/core/v1" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/auth" clientset "knative.dev/eventing/pkg/client/clientset/versioned" parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/parallel/resources" + duckv1knative "knative.dev/pkg/apis/duck/v1" ) type Reconciler struct { + kubeclient kubernetes.Interface // listers index properties about resources parallelLister listers.ParallelLister channelableTracker ducklib.ListableTracker @@ -57,7 +63,8 @@ type Reconciler struct { eventingClientSet clientset.Interface // dynamicClientSet allows us to configure pluggable Build objects - dynamicClientSet dynamic.Interface + dynamicClientSet dynamic.Interface + serviceAccountLister corev1listers.ServiceAccountLister } // Check that our Reconciler implements parallelreconciler.Interface @@ -71,6 +78,23 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon // 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to // either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply. // 3. Rinse and repeat step #2 above for each branch in the list + // OIDC authentication + featureFlags := feature.FromContext(ctx) + if featureFlags.IsOIDCAuthentication() { + saName := auth.GetOIDCServiceAccountNameForResource(v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) + p.Status.Auth = &duckv1knative.AuthStatus{ + ServiceAccountName: &saName, + } + if err := auth.EnsureOIDCServiceAccountExistsForResource(ctx, r.serviceAccountLister, r.kubeclient, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta); err != nil { + p.Status.MarkOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", "%v", err) + return err + } + p.Status.MarkOIDCIdentityCreatedSucceeded() + } else { + p.Status.Auth = nil + p.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") + } + if p.Status.BranchStatuses == nil { p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0) } diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 54dbec0d76f..9dabaeb1656 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" "knative.dev/pkg/tracker" @@ -47,9 +48,11 @@ import ( v1 "knative.dev/eventing/pkg/apis/flows/v1" + "knative.dev/eventing/pkg/apis/feature" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/reconciler/parallel/resources" . "knative.dev/eventing/pkg/reconciler/testing/v1" + fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -137,6 +140,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -173,6 +177,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -209,6 +214,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -249,6 +255,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -289,6 +296,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -339,6 +347,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -398,6 +407,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -453,6 +463,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -474,6 +485,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -550,24 +562,104 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), }})), }}, - }, + }, { + Name: "OIDC: creates OIDC service account", + Key: pKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + })), + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }))), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCServiceAccountName(makeFlowParallelOIDCServiceAccount().Name), + WithFlowsParallelOIDCIdentityCreatedSucceeded(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + WantCreates: []runtime.Object{ + makeFlowParallelOIDCServiceAccount(), + }, + }, { + Name: "OIDC: Parallel not ready on invalid OIDC service account", + Key: pKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + Objects: []runtime.Object{ + makeFlowParallelOIDCServiceAccountWithoutOwnerRef(), + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }))}, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(0)}}), + // WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + // WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + // WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + // WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", fmt.Sprintf("service account %s not owned by Parallel %s", makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name, parallelName)), + WithFlowsParallelOIDCServiceAccountName(makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf("service account %s not owned by Parallel %s", makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name, parallelName)), + }}, } logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) r := &Reconciler{ - parallelLister: listers.GetParallelLister(), - channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), - subscriptionLister: listers.GetSubscriptionLister(), - eventingClientSet: fakeeventingclient.Get(ctx), - dynamicClientSet: fakedynamicclient.Get(ctx), + parallelLister: listers.GetParallelLister(), + channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), + subscriptionLister: listers.GetSubscriptionLister(), + eventingClientSet: fakeeventingclient.Get(ctx), + kubeclient: fakekubeclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), + serviceAccountLister: listers.GetServiceAccountLister(), } return parallel.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetParallelLister(), @@ -737,3 +829,20 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi }, } } + +func makeFlowParallelOIDCServiceAccount() *corev1.ServiceAccount { + return auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Parallel"), metav1.ObjectMeta{ + Name: parallelName, + Namespace: testNS, + }) +} + +func makeFlowParallelOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccount { + sa := auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Parallel"), metav1.ObjectMeta{ + Name: parallelName, + Namespace: testNS, + }) + sa.OwnerReferences = nil + + return sa +} diff --git a/pkg/reconciler/testing/v1/parallel.go b/pkg/reconciler/testing/v1/parallel.go index a9d50c0743c..dd8463a14d1 100644 --- a/pkg/reconciler/testing/v1/parallel.go +++ b/pkg/reconciler/testing/v1/parallel.go @@ -18,9 +18,11 @@ package testing import ( "context" + "fmt" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/feature" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -113,3 +115,31 @@ func WithFlowsParallelAddressableNotReady(reason, message string) FlowsParallelO p.Status.MarkAddressableNotReady(reason, message) } } + +func WithFlowsParallelOIDCIdentityCreatedSucceeded() FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkOIDCIdentityCreatedSucceeded() + } +} + +func WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") + } +} + +func WithFlowsParallelOIDCIdentityCreatedFailed(reason, message string) FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkOIDCIdentityCreatedFailed(reason, message) + } +} + +func WithFlowsParallelOIDCServiceAccountName(name string) FlowsParallelOption { + return func(p *flowsv1.Parallel) { + if p.Status.Auth == nil { + p.Status.Auth = &duckv1.AuthStatus{} + } + + p.Status.Auth.ServiceAccountName = &name + } +}