From 93cec0f6a8c750bb63010b2baa385146c46381fb Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 12 Sep 2023 01:47:39 -0400 Subject: [PATCH] Fixed eventtype create-delete loop on built in sources (#7245) * Fixed eventtype create-delete loop on built in sources Signed-off-by: Calum Murray * Fix unit tests Signed-off-by: Calum Murray * Fixed source finalizers Signed-off-by: Calum Murray * Fixed tests Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- .../apiserversource/apiserversource.go | 7 ++ .../apiserversource/apiserversource_test.go | 71 +++++++++++++++++++ pkg/reconciler/pingsource/pingsource.go | 7 ++ pkg/reconciler/pingsource/pingsource_test.go | 49 +++++++++++++ pkg/reconciler/source/duck/duck.go | 7 ++ pkg/reconciler/source/duck/duck_test.go | 3 + .../features/apiserversource/data_plane.go | 2 +- 7 files changed, 145 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index f01419950b5..e9c144d6b91 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -134,6 +134,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour return nil } +func (r *Reconciler) FinalizeKind(ctx context.Context, source *v1.ApiServerSource) pkgreconciler.Event { + logging.FromContext(ctx).Info("Deleting source") + // Allow for eventtypes to be cleaned up + source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{} + return nil +} + func (r *Reconciler) namespacesFromSelector(src *v1.ApiServerSource) ([]string, error) { if src.Spec.NamespaceSelector == nil { return []string{src.Namespace}, nil diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go index d7219559b61..64c756b890e 100644 --- a/pkg/reconciler/apiserversource/apiserversource_test.go +++ b/pkg/reconciler/apiserversource/apiserversource_test.go @@ -142,8 +142,12 @@ func TestReconcile(t *testing.T) { }, WantErr: true, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "InternalError", `insufficient permissions: User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group "" in Namespace "testnamespace"`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(false)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -193,6 +197,12 @@ func TestReconcile(t *testing.T) { makeSubjectAccessReview("namespaces", "list", "default"), makeSubjectAccessReview("namespaces", "watch", "default"), }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -254,8 +264,12 @@ func TestReconcile(t *testing.T) { Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b"}, false), }}, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -320,8 +334,12 @@ func TestReconcile(t *testing.T) { Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b", "test-c"}, true), }}, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -373,6 +391,12 @@ func TestReconcile(t *testing.T) { makeSubjectAccessReview("namespaces", "list", "default"), makeSubjectAccessReview("namespaces", "watch", "default"), }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -422,6 +446,12 @@ func TestReconcile(t *testing.T) { makeSubjectAccessReview("namespaces", "list", "default"), makeSubjectAccessReview("namespaces", "watch", "default"), }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -441,9 +471,13 @@ func TestReconcile(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SinkNotFound", `Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rttestingv1.NewApiServerSource(sourceName, testNS, rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ @@ -485,11 +519,15 @@ func TestReconcile(t *testing.T) { Key: testNS + "/" + sourceName, WantErr: true, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, apiserversourceDeploymentCreated, "Deployment created, error:inducing failure for create deployments"), Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create deployments"), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rttestingv1.NewApiServerSource(sourceName, testNS, rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ @@ -578,6 +616,12 @@ func TestReconcile(t *testing.T) { makeSubjectAccessReview("namespaces", "list", "default"), makeSubjectAccessReview("namespaces", "watch", "default"), }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { @@ -602,8 +646,12 @@ func TestReconcile(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rttestingv1.NewApiServerSource(sourceName, testNS, rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ @@ -660,8 +708,12 @@ func TestReconcile(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rttestingv1.NewApiServerSource(sourceName, testNS, rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ @@ -718,8 +770,12 @@ func TestReconcile(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rttestingv1.NewApiServerSource(sourceName, testNS, rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ @@ -793,6 +849,12 @@ func TestReconcile(t *testing.T) { rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}), ), }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantCreates: []runtime.Object{ makeSubjectAccessReview("namespaces", "get", "default"), makeSubjectAccessReview("namespaces", "list", "default"), @@ -1018,3 +1080,12 @@ func subjectAccessReviewCreateReactor(allowed bool) clientgotesting.ReactionFunc return false, nil, nil } } + +func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl { + action := clientgotesting.PatchActionImpl{} + action.Name = name + action.Namespace = namespace + patch := `{"metadata":{"finalizers":["apiserversources.sources.knative.dev"],"resourceVersion":""}}` + action.Patch = []byte(patch) + return action +} diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index 9dc73e529a2..ea1afae131e 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -135,6 +135,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *sourcesv1.PingSo return nil } +func (r *Reconciler) FinalizeKind(ctx context.Context, source *sourcesv1.PingSource) pkgreconciler.Event { + logging.FromContext(ctx).Info("Deleting source") + // Allow for eventtypes to be cleaned up + source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{} + return nil +} + func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *sourcesv1.PingSource) (*appsv1.Deployment, error) { args := resources.Args{ ConfigEnvVars: r.configAcc.ToEnvVars(), diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index ab8aaaeda68..d2c099983f6 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -144,9 +144,13 @@ func TestAllCases(t *testing.T) { ), }}, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SinkNotFound", `Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, { Name: "sink ref has no namespace", Objects: []runtime.Object{ @@ -183,9 +187,13 @@ func TestAllCases(t *testing.T) { ), }}, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SinkNotFound", `Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, { Name: "error creating deployment", Objects: []runtime.Object{ @@ -208,8 +216,12 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "InternalError", "deployments.apps \"pingsource-mt-adapter\" not found"), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rtv1.NewPingSource(sourceName, testNS, @@ -271,6 +283,12 @@ func TestAllCases(t *testing.T) { rtv1.WithPingSourceStatusObservedGeneration(generation), ), }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, { Name: "Propagate CA certs", Objects: []runtime.Object{ @@ -321,6 +339,12 @@ func TestAllCases(t *testing.T) { rtv1.WithPingSourceStatusObservedGeneration(generation), ), }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, { Name: "deployment update due to env", Objects: []runtime.Object{ @@ -344,8 +368,12 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + sourceName, WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeNormal, pingSourceDeploymentUpdated, `PingSource adapter deployment updated`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: rtv1.NewPingSource(sourceName, testNS, rtv1.WithPingSourceSpec(sourcesv1.PingSourceSpec{ @@ -411,6 +439,12 @@ func TestAllCases(t *testing.T) { rtv1.WithPingSourceStatusObservedGeneration(generation), ), }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, { Name: "valid with dataBase64", Objects: []runtime.Object{ @@ -453,6 +487,12 @@ func TestAllCases(t *testing.T) { rtv1.WithPingSourceStatusObservedGeneration(generation), ), }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, }, } @@ -512,3 +552,12 @@ func makeAvailableMTAdapterWithDifferentEnv() *appsv1.Deployment { WithDeploymentAvailable()(ma) return ma } + +func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl { + action := clientgotesting.PatchActionImpl{} + action.Name = name + action.Namespace = namespace + patch := `{"metadata":{"finalizers":["pingsources.sources.knative.dev"],"resourceVersion":""}}` + action.Patch = []byte(patch) + return action +} diff --git a/pkg/reconciler/source/duck/duck.go b/pkg/reconciler/source/duck/duck.go index 932647d3bfa..7ca55ec48c0 100644 --- a/pkg/reconciler/source/duck/duck.go +++ b/pkg/reconciler/source/duck/duck.go @@ -41,6 +41,10 @@ import ( "knative.dev/eventing/pkg/reconciler/source/duck/resources" ) +const ( + defaultNamespace = "default" +) + type Reconciler struct { // eventingClientSet allows us to configure Eventing objects eventingClientSet clientset.Interface @@ -211,6 +215,9 @@ func (r *Reconciler) makeEventTypes(ctx context.Context, src *duckv1.Source) []v CeSchema: schemaURL, Description: description, }) + if eventType.Spec.Reference.Namespace == "" { + eventType.Spec.Reference.Namespace = defaultNamespace + } eventTypes = append(eventTypes, *eventType) } return eventTypes diff --git a/pkg/reconciler/source/duck/duck_test.go b/pkg/reconciler/source/duck/duck_test.go index 2b8f58ef144..1bd6dc44458 100644 --- a/pkg/reconciler/source/duck/duck_test.go +++ b/pkg/reconciler/source/duck/duck_test.go @@ -345,6 +345,9 @@ func makeEventType(ceType, ceSource string) *v1beta2.EventType { func makeEventTypeWithReference(ceType, ceSource string, ref *duckv1.KReference) *v1beta2.EventType { ceSourceURL, _ := apis.ParseURL(ceSource) + if ref.Namespace == "" { + ref.Namespace = "default" + } return &v1beta2.EventType{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%x", md5.Sum([]byte(ceType+ceSource+sourceUID))), diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index a00e47693cf..4a9005c1672 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -256,7 +256,7 @@ func SendsEventsWithEventTypes() *feature.Feature { }) f.Requirement("ApiServerSource goes ready", apiserversource.IsReady(source)) - expectedCeTypes := sets.New(sources.ApiServerSourceEventReferenceModeTypes...) + expectedCeTypes := sets.New(sources.ApiServerSourceEventResourceModeTypes...) f.Stable("ApiServerSource as event source"). Must("delivers events on broker with URI",