From 55a7c6182280e065db9979df371930b33347188e Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:16:12 +0100 Subject: [PATCH 1/9] Allow configuring (opt-in) IMC async handler We switched to use the sync handler by default, however, it was reported that in some cases, this is not wanted as it slows down the source event senders since it needs to wait for all subscribers to receive events. While this is the best default behavior since reduces lost events in InMemoryChannel, we want to allow configuring this behavior, while documenting the downsides (follow up to docs repo) Signed-off-by: Pierangelo Di Pilato --- .../messaging/v1/in_memory_channel_types.go | 11 +- .../dispatcher/inmemorychannel.go | 11 +- .../dispatcher/inmemorychannel_test.go | 105 ++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/pkg/apis/messaging/v1/in_memory_channel_types.go b/pkg/apis/messaging/v1/in_memory_channel_types.go index d45d1a971b4..b2fbdb56c94 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1/in_memory_channel_types.go @@ -19,10 +19,11 @@ package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" ) // +genclient @@ -44,6 +45,14 @@ type InMemoryChannel struct { Status InMemoryChannelStatus `json:"status,omitempty"` } +const ( + // AsyncHandlerAnnotation controls whether InMemoryChannel uses the async handler. + // + // Async handler is subject to event loss since it responds with 200 before forwarding the event + // to all subscriptions. + AsyncHandlerAnnotation = "eventing.knative.dev/async-handler" +) + var ( // Check that InMemoryChannel can be validated and defaulted. _ apis.Validatable = (*InMemoryChannel)(nil) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 8f663491d65..7dec76f9363 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -19,6 +19,7 @@ package dispatcher import ( "context" "fmt" + "strconv" listers "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -250,13 +251,21 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) ( subs[i] = *conf } + async := false + if v, ok := imc.Annotations[v1.AsyncHandlerAnnotation]; ok { + b, err := strconv.ParseBool(v) + if err == nil { + async = b + } + } + return &multichannelfanout.ChannelConfig{ Namespace: imc.Namespace, Name: imc.Name, HostName: imc.Status.Address.URL.Host, Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), FanoutConfig: fanout.Config{ - AsyncHandler: false, + AsyncHandler: async, Subscriptions: subs, }, }, nil diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index ea85a6fc9a6..ac83622b395 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -19,11 +19,13 @@ package dispatcher import ( "context" "net/http" + "reflect" "testing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" @@ -662,3 +664,106 @@ func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.EventHan func (f *fakeMultiChannelHandler) CountChannelHandlers() int { return len(f.handlers) } + +func Test_newConfigForInMemoryChannelAsyncHandler(t *testing.T) { + ctx, _ := SetupFakeContext(t, SetUpInformerSelector) + + type args struct { + ctx context.Context + imc *v1.InMemoryChannel + } + tests := []struct { + name string + args args + wantAsync bool + wantErr bool + }{ + { + name: "async handler", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + Annotations: map[string]string{ + v1.AsyncHandlerAnnotation: "true", + }, + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: true, + wantErr: false, + }, + { + name: "sync handler, default", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: false, + wantErr: false, + }, + { + name: "sync handler, explicit", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + Annotations: map[string]string{ + v1.AsyncHandlerAnnotation: "false", + }, + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: false, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newConfigForInMemoryChannel(tt.args.ctx, tt.args.imc) + if (err != nil) != tt.wantErr { + t.Errorf("newConfigForInMemoryChannel() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got.FanoutConfig.AsyncHandler, tt.wantAsync) { + t.Errorf("newConfigForInMemoryChannel() got = %v, want %v", got, tt.wantAsync) + } + }) + } +} From cd2b5e95bfca7428ded9cd4d8fdb1d7c1176885b Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:19:59 +0100 Subject: [PATCH 2/9] Propagate annotations and labels to channel Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/broker/broker.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 46e2bb32b0d..78e7b7e1686 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -108,10 +108,14 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk return err } - var tmpChannelableSpec duckv1.ChannelableSpec = duckv1.ChannelableSpec{ + var tmpChannelableSpec = duckv1.ChannelableSpec{ Delivery: b.Spec.Delivery, } + metadata := b.ObjectMeta.DeepCopy() + channelAnnotations := metadata.GetAnnotations() + channelAnnotations[eventing.ScopeAnnotationKey] = eventing.ScopeCluster + logging.FromContext(ctx).Infow("Reconciling the trigger channel") c, err := ducklib.NewPhysicalChannel( chanMan.template.TypeMeta, @@ -122,7 +126,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk *kmeta.NewControllerRef(b), }, Labels: TriggerChannelLabels(b.Name, b.Namespace), - Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster}, + Annotations: channelAnnotations, }, ducklib.WithChannelableSpec(tmpChannelableSpec), ducklib.WithPhysicalChannelSpec(chanMan.template.Spec), @@ -392,7 +396,9 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf return nil, fmt.Errorf("failed to convert %s/%s into Channelable: %w", channelObjRef.Namespace, channelObjRef.Name, err) } - if equality.Semantic.DeepEqual(desired.Spec.Delivery, channelable.Spec.Delivery) { + if equality.Semantic.DeepDerivative(desired.Spec.Delivery, channelable.Spec.Delivery) && + equality.Semantic.DeepDerivative(desired.Annotations, channelable.Annotations) && + equality.Semantic.DeepDerivative(desired.Labels, channelable.Labels) { // If propagated/mutable properties match return the Channel. return channelable, nil } @@ -402,12 +408,20 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf jsonPatch, err := duckapis.CreatePatch( // Existing Channel properties duckv1.Channelable{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: channelable.Annotations, + Labels: channelable.Labels, + }, Spec: duckv1.ChannelableSpec{ Delivery: channelable.Spec.Delivery, }, }, // Desired Channel properties duckv1.Channelable{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: desired.Annotations, + Labels: desired.Labels, + }, Spec: duckv1.ChannelableSpec{ Delivery: desired.Spec.Delivery, }, From c200cffa5f9b7289fc0c7e4b747cd5164e80fcb9 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:20:29 +0100 Subject: [PATCH 3/9] Add E2E tests Signed-off-by: Pierangelo Di Pilato --- pkg/channel/fanout/fanout_event_handler.go | 5 +- test/rekt/broker_test.go | 16 ++ test/rekt/channel_test.go | 24 ++- test/rekt/features/broker/feature.go | 164 ++++++++++++++---- test/rekt/features/channel/features.go | 75 ++++++++ test/rekt/resources/broker/broker.go | 4 + .../resources/channel_impl/channel_impl.go | 2 + 7 files changed, 258 insertions(+), 32 deletions(-) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 7aa80acd0c3..d307b79d99a 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -60,8 +60,11 @@ type Subscription struct { // Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` - // Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. + // + // Async handler is subject to event loss since it responds with 200 before forwarding the event + // to all subscriptions. AsyncHandler bool `json:"asyncHandler,omitempty"` } diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index f0c8d866ad4..343553086b2 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -192,6 +192,22 @@ func TestBrokerRedelivery(t *testing.T) { env.TestSet(ctx, t, broker.BrokerRedelivery()) } +// TestBrokerPropagatesMetadata test Broker reconciler propagates metadata to channel. +func TestBrokerPropagatesMetadata(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.ParallelTest(ctx, t, broker.PropagatesMetadata()) +} + func TestBrokerDeadLetterSinkExtensions(t *testing.T) { t.Parallel() diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index c0c32358e39..aad5e42b21d 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -23,9 +23,10 @@ import ( "testing" "time" - "knative.dev/eventing/test/rekt/features/authz" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/eventing/test/rekt/features/authz" + "github.com/cloudevents/sdk-go/v2/binding" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/system" @@ -205,6 +206,27 @@ func TestChannelDeadLetterSink(t *testing.T) { env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn)) } +/* +TestChannelAsyncHandler tests if the async handler can be configured on the channel. +*/ +func TestChannelAsyncHandler(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn { + return subscription.WithSubscriber(ref, uri, "") + } + env.ParallelTest(ctx, t, channel.AsyncHandler(createSubscriberFn)) + env.ParallelTest(ctx, t, channel.AsyncHandlerUpdate(createSubscriberFn)) +} + // TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in // the dead letter sink. func TestGenericChannelDeadLetterSink(t *testing.T) { diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 3704da705b7..cb41bd45122 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -19,6 +19,7 @@ package broker import ( "context" "encoding/base64" + "encoding/json" "fmt" "strings" @@ -26,6 +27,8 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/state" @@ -34,6 +37,7 @@ import ( "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/subscription" "knative.dev/eventing/test/rekt/resources/trigger" @@ -42,7 +46,7 @@ import ( "knative.dev/pkg/ptr" "knative.dev/reconciler-test/pkg/eventshub" - eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + eventassert "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" @@ -173,7 +177,7 @@ func ManyTriggers() *feature.FeatureSet { eventshub.InputEvent(eventToSend), )) - f.Assert("source sent event", eventasssert.OnStore(source). + f.Assert("source sent event", eventassert.OnStore(source). MatchSentEvent(test.HasId(eventToSend.ID())). AtLeast(1), ) @@ -185,7 +189,7 @@ func ManyTriggers() *feature.FeatureSet { // Check on every dumper whether we should expect this event or not if eventFilter.toEventMatcher()(eventToSend) == nil { f.Assert(fmt.Sprintf("%s receive event %s", sink, eventToSend.ID()), func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). MatchReceivedEvent(test.HasId(eventToSend.ID())). MatchReceivedEvent(matcher). @@ -335,12 +339,12 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere eventshub.InputEvent(eventToSend), )) - eventMatcher := eventasssert.MatchEvent( + eventMatcher := eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), ) - transformEventMatcher := eventasssert.MatchEvent( + transformEventMatcher := eventassert.MatchEvent( test.HasSource(transformedEventSource), test.HasType(transformedEventType), test.HasData([]byte(transformedBody)), @@ -348,19 +352,19 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere f.Stable("(Trigger1 point to) sink1 has all the events"). Must("delivers original events", - eventasssert.OnStore(sink1).Match(eventMatcher).AtLeast(1)) + eventassert.OnStore(sink1).Match(eventMatcher).AtLeast(1)) f.Stable("(Trigger2 point to) sink2 has all the events"). Must("delivers original events", - eventasssert.OnStore(sink2).Match(eventMatcher).AtLeast(1)). + eventassert.OnStore(sink2).Match(eventMatcher).AtLeast(1)). Must("delivers transformation events", - eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) + eventassert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) f.Stable("(Trigger3 point to) Channel's subscriber just has events after transformation"). Must("delivers transformation events", - eventasssert.OnStore(sink3).Match(transformEventMatcher).AtLeast(1)). + eventassert.OnStore(sink3).Match(transformEventMatcher).AtLeast(1)). Must("delivers original events", - eventasssert.OnStore(sink3).Match(eventMatcher).Not()) + eventassert.OnStore(sink3).Match(eventMatcher).Not()) return f } @@ -482,13 +486,13 @@ func BrokerEventTransformationForTriggerAssert(f *feature.Feature, eventshub.InputEvent(cfg.EventToSend), )) - eventMatcher := eventasssert.MatchEvent( + eventMatcher := eventassert.MatchEvent( test.HasId(cfg.EventToSend.ID()), test.HasSource(cfg.EventToSend.Source()), test.HasType(cfg.EventToSend.Type()), test.HasData(cfg.EventToSend.Data()), ) - transformEventMatcher := eventasssert.MatchEvent( + transformEventMatcher := eventassert.MatchEvent( test.HasSource(cfg.TransformedEvent.Source()), test.HasType(cfg.TransformedEvent.Type()), test.HasData(cfg.TransformedEvent.Data()), @@ -496,13 +500,13 @@ func BrokerEventTransformationForTriggerAssert(f *feature.Feature, f.Stable("Trigger has filtered all transformed events"). Must("trigger 1 delivers original events", - eventasssert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)). + eventassert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)). Must("trigger 1 does not deliver transformed events", - eventasssert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()). + eventassert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()). Must("trigger 2 delivers transformed events", - eventasssert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)). + eventassert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)). Must("trigger 2 does not deliver original events", - eventasssert.OnStore(cfg.Sink2).Match(eventMatcher).Not()) + eventassert.OnStore(cfg.Sink2).Match(eventMatcher).Not()) } func BrokerPreferHeaderCheck() *feature.Feature { @@ -544,13 +548,113 @@ func BrokerPreferHeaderCheck() *feature.Feature { f.Stable("test message without explicit prefer header should have the header"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.HasAdditionalHeader("Prefer", "reply"), + eventassert.OnStore(sink).Match( + eventassert.HasAdditionalHeader("Prefer", "reply"), ).AtLeast(1)) return f } +func PropagatesMetadata() *feature.Feature { + f := feature.NewFeatureNamed("Broker PreferHeader Check") + + if !broker.EnvCfg.IsMTChannelBasedBroker() { + f.Assert("class is not MTChannelBasedBroker, skipping", func(ctx context.Context, t feature.T) {}) + return f + } + + source := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + via := feature.MakeRandomK8sName("via") + + key := "eventing.knative.dev/async-handler" + value := "false" + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, append(broker.WithEnvConfig(), broker.WithAnnotations( + map[string]interface{}{key: value}, + ))...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + // Point the Trigger subscriber to the sink svc. + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} + + // Install the trigger + f.Setup("install trigger", trigger.Install(via, cfg...)) + f.Setup("trigger goes ready", trigger.IsReady(via)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("channel has annotations and labels", func(ctx context.Context, t feature.T) { + d := dynamicclient.Get(ctx) + channelsImpls, err := d.Resource(channel_impl.GVR()). + Namespace(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list channels (%v): %v", channel_impl.GVR(), err) + return + } + + channels, err := d.Resource(channel.GVR()). + Namespace(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list channels (%v): %v", channel.GVR(), err) + return + } + + channels.Items = append(channels.Items, channelsImpls.Items...) + + if len(channels.Items) <= 0 { + t.Errorf("No channels found for resources: %#v or %#v", channel_impl.GVR(), channel.GVR()) + } + + found := false + for _, ch := range channels.Items { + for _, or := range ch.GetOwnerReferences() { + if or.Kind == "Broker" && or.Name == brokerName { + v, ok := ch.GetAnnotations()[key] + if !ok { + t.Errorf("Failed to find async handler annotation:\n%#v", ch) + return + } + if v != value { + t.Errorf("Failed to find expected '%s' value for annotation '%s':\n%#v", value, key, ch) + return + } + found = true + break + } + } + } + if !found { + bytes, _ := json.MarshalIndent(channels, "", " ") + t.Errorf("No channel found associated with broker %q\n%#v", brokerName, string(bytes)) + } + }) + f.Assert("event sent", eventassert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("event received", eventassert.OnStore(sink). + MatchReceivedEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + func BrokerRedelivery() *feature.FeatureSet { fs := &feature.FeatureSet{ Name: "Knative Broker - Redelivery - with different sequences", @@ -606,9 +710,9 @@ func brokerRedeliveryFibonacci(retryNum int32) *feature.Feature { f.Stable("Broker Redelivery following the fibonacci sequence"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( + eventassert.OnStore(sink).Match( + eventassert.MatchKind(eventassert.EventReceived), + eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), @@ -662,11 +766,11 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { f.Stable("Broker Redelivery failed the first n events"). Must("delivers events", func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( + eventassert.MatchKind(eventassert.EventReceived), + eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), @@ -734,7 +838,7 @@ func brokerSubscriberUnreachable() *feature.Feature { f.Assert("Receives dls extensions when subscriber is unreachable", func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). MatchEvent( test.HasExtension("knativeerrordest", subscriberUri), @@ -880,8 +984,8 @@ func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...fun ctx, t, 1, - eventasssert.MatchKind(eventshub.EventReceived), - eventasssert.MatchEvent(matchers...), + eventassert.MatchKind(eventshub.EventReceived), + eventassert.MatchEvent(matchers...), ) } } @@ -936,7 +1040,7 @@ func brokerSubscriberLongMessage() *feature.Feature { )) f.Assert("receive long event on sink exactly once", - eventasssert.OnStore(sink). + eventassert.OnStore(sink). MatchEvent(test.HasData([]byte(eventBody))). Exact(1), ) @@ -1021,13 +1125,13 @@ func brokerSubscriberLongResponseMessage() *feature.Feature { )) f.Assert("receive long event on sink1 exactly once", - eventasssert.OnStore(sink1). + eventassert.OnStore(sink1). MatchEvent(test.HasData([]byte(eventBody))). Exact(1), ) f.Assert("receive long event on sink2 exactly once", - eventasssert.OnStore(sink2). + eventassert.OnStore(sink2). MatchEvent(test.HasData([]byte(transformedEventBody))). Exact(1), ) diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index da8a6e9bf5f..5dcf9207cae 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -134,6 +134,81 @@ func DeadLetterSink(createSubscriberFn func(ref *duckv1.KReference, uri string) return f } +func AsyncHandler(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + name := feature.MakeRandomK8sName("channel") + sub := feature.MakeRandomK8sName("subscription") + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ + "eventing.knative.dev/async-handler": "true", + }))) + f.Setup("install subscription", subscription.Install(sub, + subscription.WithChannel(channel_impl.AsRef(name)), + createSubscriberFn(service.AsKReference(sink), ""), + )) + f.Setup("channel is ready", channel_impl.IsReady(name)) + f.Setup("subscription is ready", subscription.IsReady(sub)) + + f.Requirement("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("sink receives event", assert.OnStore(sink). + MatchEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + +func AsyncHandlerUpdate(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + name := feature.MakeRandomK8sName("channel") + sub := feature.MakeRandomK8sName("subscription") + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ + "eventing.knative.dev/async-handler": "true", + }))) + f.Setup("install subscription", subscription.Install(sub, + subscription.WithChannel(channel_impl.AsRef(name)), + createSubscriberFn(service.AsKReference(sink), ""), + )) + f.Setup("channel is ready", channel_impl.IsReady(name)) + f.Setup("subscription is ready", subscription.IsReady(sub)) + + f.Requirement("update channel async handler", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ + "eventing.knative.dev/async-handler": "false", + }))) + f.Requirement("channel is ready", channel_impl.IsReady(name)) + + f.Assert("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("sink receives event", assert.OnStore(sink). + MatchEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { f := feature.NewFeature() sink := feature.MakeRandomK8sName("sink") diff --git a/test/rekt/resources/broker/broker.go b/test/rekt/resources/broker/broker.go index 81c170ad0d4..7bd1e4ce34c 100644 --- a/test/rekt/resources/broker/broker.go +++ b/test/rekt/resources/broker/broker.go @@ -51,6 +51,10 @@ type EnvConfig struct { BrokerTemplatesDir string `envconfig:"BROKER_TEMPLATES"` } +func (cfg EnvConfig) IsMTChannelBasedBroker() bool { + return cfg.BrokerClass == "" || cfg.BrokerClass == "MTChannelBasedBroker" +} + func init() { // Process EventingGlobal. if err := envconfig.Process("", &EnvCfg); err != nil { diff --git a/test/rekt/resources/channel_impl/channel_impl.go b/test/rekt/resources/channel_impl/channel_impl.go index 93d51230a6a..7766cd03f6b 100644 --- a/test/rekt/resources/channel_impl/channel_impl.go +++ b/test/rekt/resources/channel_impl/channel_impl.go @@ -173,6 +173,8 @@ func AsDestinationRef(name string) *duckv1.Destination { // WithDeadLetterSink adds the dead letter sink related config to a Subscription spec. var WithDeadLetterSink = delivery.WithDeadLetterSink +var WithAnnotations = manifest.WithAnnotations + // ValidateAddress validates the address retured by Address func ValidateAddress(name string, validate addressable.ValidateAddressFn, timings ...time.Duration) feature.StepFn { return addressable.ValidateAddress(GVR(), name, validate, timings...) From 66375bb4502f2f01fefe92d94bb1a71e1e5a74be Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:25:16 +0100 Subject: [PATCH 4/9] Use constant in tests Signed-off-by: Pierangelo Di Pilato --- pkg/apis/messaging/v1/in_memory_channel_types.go | 2 +- test/rekt/features/broker/feature.go | 3 ++- test/rekt/features/channel/features.go | 7 ++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/apis/messaging/v1/in_memory_channel_types.go b/pkg/apis/messaging/v1/in_memory_channel_types.go index b2fbdb56c94..f7e168ad49a 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1/in_memory_channel_types.go @@ -50,7 +50,7 @@ const ( // // Async handler is subject to event loss since it responds with 200 before forwarding the event // to all subscriptions. - AsyncHandlerAnnotation = "eventing.knative.dev/async-handler" + AsyncHandlerAnnotation = "messaging.knative.dev/async-handler" ) var ( diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index cb41bd45122..9d7f4f09a16 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -34,6 +34,7 @@ import ( duckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel" @@ -567,7 +568,7 @@ func PropagatesMetadata() *feature.Feature { sink := feature.MakeRandomK8sName("sink") via := feature.MakeRandomK8sName("via") - key := "eventing.knative.dev/async-handler" + key := messagingv1.AsyncHandlerAnnotation value := "false" event := test.FullEvent() diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index 5dcf9207cae..61213dd1076 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -37,6 +37,7 @@ import ( eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" @@ -146,7 +147,7 @@ func AsyncHandler(createSubscriberFn func(ref *duckv1.KReference, uri string) ma f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ - "eventing.knative.dev/async-handler": "true", + v1.AsyncHandlerAnnotation: "true", }))) f.Setup("install subscription", subscription.Install(sub, subscription.WithChannel(channel_impl.AsRef(name)), @@ -181,7 +182,7 @@ func AsyncHandlerUpdate(createSubscriberFn func(ref *duckv1.KReference, uri stri f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ - "eventing.knative.dev/async-handler": "true", + v1.AsyncHandlerAnnotation: "true", }))) f.Setup("install subscription", subscription.Install(sub, subscription.WithChannel(channel_impl.AsRef(name)), @@ -191,7 +192,7 @@ func AsyncHandlerUpdate(createSubscriberFn func(ref *duckv1.KReference, uri stri f.Setup("subscription is ready", subscription.IsReady(sub)) f.Requirement("update channel async handler", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ - "eventing.knative.dev/async-handler": "false", + v1.AsyncHandlerAnnotation: "false", }))) f.Requirement("channel is ready", channel_impl.IsReady(name)) From b187b702ed749c434133d0b657c06edecde86f73 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:35:42 +0100 Subject: [PATCH 5/9] Only propagates messaging.knative.dev annotations Signed-off-by: Pierangelo Di Pilato --- pkg/apis/messaging/v1/in_memory_channel_types.go | 4 ++-- pkg/reconciler/broker/broker.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/apis/messaging/v1/in_memory_channel_types.go b/pkg/apis/messaging/v1/in_memory_channel_types.go index f7e168ad49a..35ef3988aed 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1/in_memory_channel_types.go @@ -45,12 +45,12 @@ type InMemoryChannel struct { Status InMemoryChannelStatus `json:"status,omitempty"` } -const ( +var ( // AsyncHandlerAnnotation controls whether InMemoryChannel uses the async handler. // // Async handler is subject to event loss since it responds with 200 before forwarding the event // to all subscriptions. - AsyncHandlerAnnotation = "messaging.knative.dev/async-handler" + AsyncHandlerAnnotation = SchemeGroupVersion.Group + "/async-handler" ) var ( diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 78e7b7e1686..3550f73ab93 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strings" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -113,8 +114,14 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk } metadata := b.ObjectMeta.DeepCopy() - channelAnnotations := metadata.GetAnnotations() - channelAnnotations[eventing.ScopeAnnotationKey] = eventing.ScopeCluster + channelAnnotations := map[string]string{ + eventing.ScopeAnnotationKey: eventing.ScopeCluster, + } + for k, v := range metadata.GetAnnotations() { + if strings.HasPrefix(k, messagingv1.SchemeGroupVersion.Group) { + channelAnnotations[k] = v + } + } logging.FromContext(ctx).Infow("Reconciling the trigger channel") c, err := ducklib.NewPhysicalChannel( From 32bf295e390e9a4ae99428e340bdfe570ffddaf1 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 7 Nov 2024 09:45:34 +0100 Subject: [PATCH 6/9] Add unit test Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/broker/broker_test.go | 43 ++++++++++++++++++++++++++++ pkg/reconciler/testing/v1/broker.go | 9 ++++++ 2 files changed, 52 insertions(+) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 3081a50c299..01ed141ea68 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -48,6 +48,7 @@ import ( eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" @@ -405,6 +406,48 @@ func TestReconcile(t *testing.T) { WithDLSNotConfigured(), WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, + }, { + Name: "Propagate annotations", + Key: testKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true")), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true"), + WithBrokerReady, + WithDLSNotConfigured(), + WithBrokerAddressURI(brokerAddress), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + }, + Name: fmt.Sprintf("%s-kne-trigger", brokerName), + Patch: []byte(`[{"op":"add","path":"/metadata/annotations/messaging.knative.dev~1async-handler","value":"` + "true" + `"}]`), + }, + }, }, { Name: "Successful Reconciliation with a Channel with CA certs", Key: testKey, diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index e4bfa7141e4..b04ed910d96 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -61,6 +61,15 @@ func WithBrokerFinalizers(finalizers ...string) BrokerOption { } } +func WithBrokerAnnotation(key, value string) BrokerOption { + return func(b *v1.Broker) { + if b.Annotations == nil { + b.Annotations = map[string]string{} + } + b.Annotations[key] = value + } +} + func WithBrokerResourceVersion(rv string) BrokerOption { return func(b *v1.Broker) { b.ResourceVersion = rv From acc455749f21a7da0dc463aed648f7e528f93a22 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 11 Nov 2024 17:15:44 +0100 Subject: [PATCH 7/9] Make annotation val explicit string....... Signed-off-by: Matthias Wessendorf --- test/rekt/resources/broker/broker.yaml | 2 +- test/rekt/resources/broker/broker_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/rekt/resources/broker/broker.yaml b/test/rekt/resources/broker/broker.yaml index ec73900e5bb..ba28e55715d 100644 --- a/test/rekt/resources/broker/broker.yaml +++ b/test/rekt/resources/broker/broker.yaml @@ -24,7 +24,7 @@ metadata: {{ end }} {{ if .annotations }} {{ range $key, $value := .annotations }} - {{ $key }}: {{ $value }} + {{ $key }}: "{{ $value }}" {{ end }} {{ end }} {{ end }} diff --git a/test/rekt/resources/broker/broker_test.go b/test/rekt/resources/broker/broker_test.go index 2b3b7f2e418..bc88aa4ec85 100644 --- a/test/rekt/resources/broker/broker_test.go +++ b/test/rekt/resources/broker/broker_test.go @@ -168,7 +168,7 @@ func ExampleWithAnnotations() { // name: foo // namespace: bar // annotations: - // eventing.knative.dev/foo: bar + // eventing.knative.dev/foo: "bar" // spec: } From 35def4439a5e8cf48699dcc3987386e3f118e68d Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 12 Nov 2024 16:12:27 +0100 Subject: [PATCH 8/9] channel impl did not support setting annotations before........ Signed-off-by: Matthias Wessendorf --- test/rekt/resources/channel_impl/channel_impl.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/rekt/resources/channel_impl/channel_impl.yaml b/test/rekt/resources/channel_impl/channel_impl.yaml index 34eb667ca97..d75bc854c06 100644 --- a/test/rekt/resources/channel_impl/channel_impl.yaml +++ b/test/rekt/resources/channel_impl/channel_impl.yaml @@ -17,6 +17,12 @@ kind: {{ .kind }} metadata: name: {{ .name }} namespace: {{ .namespace }} + annotations: + {{ if .annotations }} + {{ range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} spec: {{ if .delivery }} delivery: From b1e6f3dead26d9e36dd62cdae5e65713cc905c27 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 12 Nov 2024 16:15:33 +0100 Subject: [PATCH 9/9] Instead of re-installing the channel_impl, lets update the annotations. Otherwise we loose the channel's Spec.Subscribers Signed-off-by: Matthias Wessendorf --- test/rekt/features/channel/features.go | 23 +++++++++++++++---- .../resources/channel_impl/channel_impl.yaml | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index 61213dd1076..22178a0720c 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -25,6 +25,7 @@ import ( "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" "knative.dev/reconciler-test/pkg/environment" @@ -191,12 +192,24 @@ func AsyncHandlerUpdate(createSubscriberFn func(ref *duckv1.KReference, uri stri f.Setup("channel is ready", channel_impl.IsReady(name)) f.Setup("subscription is ready", subscription.IsReady(sub)) - f.Requirement("update channel async handler", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ - v1.AsyncHandlerAnnotation: "false", - }))) - f.Requirement("channel is ready", channel_impl.IsReady(name)) + f.Requirement("update channel async handler", func(ctx context.Context, t feature.T) { + dc := Client(ctx) - f.Assert("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) + imc, err := dc.ChannelImpl.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve InMemoryChannel: %v", err) + } + // swap and update it to false + imc.SetAnnotations(map[string]string{ + v1.AsyncHandlerAnnotation: "true", + }) + if _, err := dc.ChannelImpl.Update(ctx, imc, metav1.UpdateOptions{}); err != nil { + t.Fatalf("Failed to update async handler annotation: %v", err) + } + }) + + f.Requirement("channel is ready", channel_impl.IsReady(name)) + f.Requirement("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) f.Assert("Event sent", assert.OnStore(source). MatchSentEvent(test.HasId(event.ID())). diff --git a/test/rekt/resources/channel_impl/channel_impl.yaml b/test/rekt/resources/channel_impl/channel_impl.yaml index d75bc854c06..78a042beed7 100644 --- a/test/rekt/resources/channel_impl/channel_impl.yaml +++ b/test/rekt/resources/channel_impl/channel_impl.yaml @@ -17,8 +17,8 @@ kind: {{ .kind }} metadata: name: {{ .name }} namespace: {{ .namespace }} - annotations: {{ if .annotations }} + annotations: {{ range $key, $value := .annotations }} {{ $key }}: "{{ $value }}" {{ end }}