Skip to content

Commit

Permalink
Add serviceaccount in parallel (#7373)
Browse files Browse the repository at this point in the history
* added serviceaccount in parallel

Signed-off-by: Griffin <[email protected]>

* Update controller.go

added changes

* Update controller.go

added `globalResync`

* added Marker functions and tests

Signed-off-by: Griffin <[email protected]>

* added tests

Signed-off-by: Griffin <[email protected]>

* added function in test and fixed lint

Signed-off-by: Griffin <[email protected]>

* fixing test in parallel_test

Signed-off-by: Griffin <[email protected]>

* fixed lint

Signed-off-by: Griffin <[email protected]>

* modified parallel_test

Signed-off-by: Griffin <[email protected]>

---------

Signed-off-by: Griffin <[email protected]>
  • Loading branch information
prakrit55 authored Nov 17, 2023
1 parent 32d7dd8 commit dc96522
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 81 deletions.
21 changes: 19 additions & 2 deletions pkg/apis/flows/v1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -41,7 +41,8 @@ const (

// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -195,6 +196,22 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string,
pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
ps.Address = address
if address == nil {
Expand Down
173 changes: 107 additions & 66 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -115,6 +118,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -142,6 +148,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -327,89 +336,121 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
want bool
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
markOIDCServiceAccountCreated bool
want bool
}{{
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress false, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress true, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress true, one channelable not ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
name: "ingress true, one channelable ready, one subscription not ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress false, one channelable ready, one subscription ready,serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
name: "ingress true, one channelable ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: true,
}, {
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, one channelable ready, one subscription ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: false,
want: false,
}, {
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
name: "ingress true, one channelable ready, one not, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, two channelables ready, one subscription ready, one not, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
name: "ingress false, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: false,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: true,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := ParallelStatus{}
ps.PropagateChannelStatuses(test.ichannel, test.channels)
ps.PropagateSubscriptionStatuses(test.fsubs, test.subs)
if test.markOIDCServiceAccountCreated {
ps.MarkOIDCIdentityCreatedSucceeded()
} else {
ps.MarkOIDCIdentityCreatedFailed("Unable to create serviceaccount", "")
}
got := ps.IsReady()
want := test.want
if want != got {
Expand Down
38 changes: 33 additions & 5 deletions pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand All @@ -42,14 +46,33 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
serviceAccountLister: serviceaccountInformer.Lister(),
kubeclient: kubeclient.Get(ctx),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
}
impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(parallelInformer.Informer())
}
impl := parallelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -60,6 +83,11 @@ func NewController(
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
// Reconcile Parallel when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
12 changes: 11 additions & 1 deletion pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ package parallel
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
Loading

0 comments on commit dc96522

Please sign in to comment.