Skip to content

Commit

Permalink
Fixed eventtype create-delete loop on built in sources (knative#7245) (
Browse files Browse the repository at this point in the history
…knative#7250) (#491)

* Fixed eventtype create-delete loop on built in sources



* Fix unit tests



* Fixed source finalizers



* Fixed tests



---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
  • Loading branch information
dsimansk and Cali0707 authored Jan 12, 2024
1 parent 428995a commit 62d36d2
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *v1.ApiServerSource) pkgreconciler.Event {
logging.FromContext(ctx).Info("Deleting source")
// Allow for eventtypes to be cleaned up
source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{}
return nil
}

func (r *Reconciler) namespacesFromSelector(src *v1.ApiServerSource) ([]string, error) {
if src.Spec.NamespaceSelector == nil {
return []string{src.Namespace}, nil
Expand Down
71 changes: 71 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ func TestReconcile(t *testing.T) {
},
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "InternalError", `insufficient permissions: User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group "" in Namespace "testnamespace"`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(false)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -193,6 +197,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -254,8 +264,12 @@ func TestReconcile(t *testing.T) {
Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b"}, false),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -320,8 +334,12 @@ func TestReconcile(t *testing.T) {
Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b", "test-c"}, true),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -373,6 +391,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand Down Expand Up @@ -422,6 +446,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand All @@ -441,9 +471,13 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -485,11 +519,15 @@ func TestReconcile(t *testing.T) {
Key: testNS + "/" + sourceName,
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, apiserversourceDeploymentCreated,
"Deployment created, error:inducing failure for create deployments"),
Eventf(corev1.EventTypeWarning, "InternalError",
"inducing failure for create deployments"),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -578,6 +616,12 @@ func TestReconcile(t *testing.T) {
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Expand All @@ -602,8 +646,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -660,8 +708,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -718,8 +770,12 @@ func TestReconcile(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Expand Down Expand Up @@ -793,6 +849,12 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantCreates: []runtime.Object{
makeSubjectAccessReview("namespaces", "get", "default"),
makeSubjectAccessReview("namespaces", "list", "default"),
Expand Down Expand Up @@ -1018,3 +1080,12 @@ func subjectAccessReviewCreateReactor(allowed bool) clientgotesting.ReactionFunc
return false, nil, nil
}
}

func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
action.Namespace = namespace
patch := `{"metadata":{"finalizers":["apiserversources.sources.knative.dev"],"resourceVersion":""}}`
action.Patch = []byte(patch)
return action
}
7 changes: 7 additions & 0 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *sourcesv1.PingSo
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *sourcesv1.PingSource) pkgreconciler.Event {
logging.FromContext(ctx).Info("Deleting source")
// Allow for eventtypes to be cleaned up
source.Status.CloudEventAttributes = []duckv1.CloudEventAttributes{}
return nil
}

func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *sourcesv1.PingSource) (*appsv1.Deployment, error) {
args := resources.Args{
ConfigEnvVars: r.configAcc.ToEnvVars(),
Expand Down
49 changes: 49 additions & 0 deletions pkg/reconciler/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,13 @@ func TestAllCases(t *testing.T) {
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "sink ref has no namespace",
Objects: []runtime.Object{
Expand Down Expand Up @@ -183,9 +187,13 @@ func TestAllCases(t *testing.T) {
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "SinkNotFound",
`Sink not found: {"ref":{"kind":"Channel","namespace":"testnamespace","name":"testsink","apiVersion":"messaging.knative.dev/v1"}}`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "error creating deployment",
Objects: []runtime.Object{
Expand All @@ -208,8 +216,12 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeWarning, "InternalError", "deployments.apps \"pingsource-mt-adapter\" not found"),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantErr: true,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rtv1.NewPingSource(sourceName, testNS,
Expand Down Expand Up @@ -271,6 +283,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "Propagate CA certs",
Objects: []runtime.Object{
Expand Down Expand Up @@ -321,6 +339,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "deployment update due to env",
Objects: []runtime.Object{
Expand All @@ -344,8 +368,12 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, pingSourceDeploymentUpdated, `PingSource adapter deployment updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rtv1.NewPingSource(sourceName, testNS,
rtv1.WithPingSourceSpec(sourcesv1.PingSourceSpec{
Expand Down Expand Up @@ -411,6 +439,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
}, {
Name: "valid with dataBase64",
Objects: []runtime.Object{
Expand Down Expand Up @@ -453,6 +487,12 @@ func TestAllCases(t *testing.T) {
rtv1.WithPingSourceStatusObservedGeneration(generation),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
},
}

Expand Down Expand Up @@ -512,3 +552,12 @@ func makeAvailableMTAdapterWithDifferentEnv() *appsv1.Deployment {
WithDeploymentAvailable()(ma)
return ma
}

func patchFinalizers(name, namespace string) clientgotesting.PatchActionImpl {
action := clientgotesting.PatchActionImpl{}
action.Name = name
action.Namespace = namespace
patch := `{"metadata":{"finalizers":["pingsources.sources.knative.dev"],"resourceVersion":""}}`
action.Patch = []byte(patch)
return action
}
7 changes: 7 additions & 0 deletions pkg/reconciler/source/duck/duck.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import (
"knative.dev/eventing/pkg/reconciler/source/duck/resources"
)

const (
defaultNamespace = "default"
)

type Reconciler struct {
// eventingClientSet allows us to configure Eventing objects
eventingClientSet clientset.Interface
Expand Down Expand Up @@ -211,6 +215,9 @@ func (r *Reconciler) makeEventTypes(ctx context.Context, src *duckv1.Source) []v
CeSchema: schemaURL,
Description: description,
})
if eventType.Spec.Reference.Namespace == "" {
eventType.Spec.Reference.Namespace = defaultNamespace
}
eventTypes = append(eventTypes, *eventType)
}
return eventTypes
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/source/duck/duck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func makeEventType(ceType, ceSource string) *v1beta2.EventType {

func makeEventTypeWithReference(ceType, ceSource string, ref *duckv1.KReference) *v1beta2.EventType {
ceSourceURL, _ := apis.ParseURL(ceSource)
if ref.Namespace == "" {
ref.Namespace = "default"
}
return &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%x", md5.Sum([]byte(ceType+ceSource+sourceUID))),
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/apiserversource/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func SendsEventsWithEventTypes() *feature.Feature {
})
f.Requirement("ApiServerSource goes ready", apiserversource.IsReady(source))

expectedCeTypes := sets.NewString(sources.ApiServerSourceEventReferenceModeTypes...)
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventResourceModeTypes...)

f.Stable("ApiServerSource as event source").
Must("delivers events on broker with URI",
Expand Down

0 comments on commit 62d36d2

Please sign in to comment.