Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring (opt-in) IMC async handler #8311

Merged
11 changes: 10 additions & 1 deletion pkg/apis/messaging/v1/in_memory_channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

// +genclient
Expand All @@ -44,6 +45,14 @@ type InMemoryChannel struct {
Status InMemoryChannelStatus `json:"status,omitempty"`
}

var (
// AsyncHandlerAnnotation controls whether InMemoryChannel uses the async handler.
//
// Async handler is subject to event loss since it responds with 200 before forwarding the event
// to all subscriptions.
AsyncHandlerAnnotation = SchemeGroupVersion.Group + "/async-handler"
)

var (
// Check that InMemoryChannel can be validated and defaulted.
_ apis.Validatable = (*InMemoryChannel)(nil)
Expand Down
5 changes: 4 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ type Subscription struct {
// Config for a fanout.EventHandler.
type Config struct {
Subscriptions []Subscription `json:"subscriptions"`
// Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we release note that this is not deprecated anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only at the code level / library and we usually don't release note it since pretty much all users don't care

// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
//
// Async handler is subject to event loss since it responds with 200 before forwarding the event
// to all subscriptions.
AsyncHandler bool `json:"asyncHandler,omitempty"`
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,10 +109,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
return err
}

var tmpChannelableSpec duckv1.ChannelableSpec = duckv1.ChannelableSpec{
var tmpChannelableSpec = duckv1.ChannelableSpec{
Delivery: b.Spec.Delivery,
}

metadata := b.ObjectMeta.DeepCopy()
channelAnnotations := map[string]string{
eventing.ScopeAnnotationKey: eventing.ScopeCluster,
}
for k, v := range metadata.GetAnnotations() {
if strings.HasPrefix(k, messagingv1.SchemeGroupVersion.Group) {
channelAnnotations[k] = v
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this always a string?

}
}

logging.FromContext(ctx).Infow("Reconciling the trigger channel")
c, err := ducklib.NewPhysicalChannel(
chanMan.template.TypeMeta,
Expand All @@ -122,7 +133,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
*kmeta.NewControllerRef(b),
},
Labels: TriggerChannelLabels(b.Name, b.Namespace),
Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster},
Annotations: channelAnnotations,
},
ducklib.WithChannelableSpec(tmpChannelableSpec),
ducklib.WithPhysicalChannelSpec(chanMan.template.Spec),
Expand Down Expand Up @@ -392,7 +403,9 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, fmt.Errorf("failed to convert %s/%s into Channelable: %w", channelObjRef.Namespace, channelObjRef.Name, err)
}

if equality.Semantic.DeepEqual(desired.Spec.Delivery, channelable.Spec.Delivery) {
if equality.Semantic.DeepDerivative(desired.Spec.Delivery, channelable.Spec.Delivery) &&
equality.Semantic.DeepDerivative(desired.Annotations, channelable.Annotations) &&
equality.Semantic.DeepDerivative(desired.Labels, channelable.Labels) {
// If propagated/mutable properties match return the Channel.
return channelable, nil
}
Expand All @@ -402,12 +415,20 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
jsonPatch, err := duckapis.CreatePatch(
// Existing Channel properties
duckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Annotations: channelable.Annotations,
Labels: channelable.Labels,
},
Spec: duckv1.ChannelableSpec{
Delivery: channelable.Spec.Delivery,
},
},
// Desired Channel properties
duckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Annotations: desired.Annotations,
Labels: desired.Labels,
},
Spec: duckv1.ChannelableSpec{
Delivery: desired.Spec.Delivery,
},
Expand Down
43 changes: 43 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand Down Expand Up @@ -405,6 +406,48 @@ func TestReconcile(t *testing.T) {
WithDLSNotConfigured(),
WithBrokerEventPoliciesReadyBecauseOIDCDisabled()),
}},
}, {
Name: "Propagate annotations",
Key: testKey,
Objects: []runtime.Object{
makeDLSServiceAsUnstructured(),
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions,
WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true")),
createChannel(withChannelReady),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true"),
WithBrokerReady,
WithDLSNotConfigured(),
WithBrokerAddressURI(brokerAddress),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithBrokerEventPoliciesReadyBecauseOIDCDisabled()),
}},
WantPatches: []clientgotesting.PatchActionImpl{
{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
},
Name: fmt.Sprintf("%s-kne-trigger", brokerName),
Patch: []byte(`[{"op":"add","path":"/metadata/annotations/messaging.knative.dev~1async-handler","value":"` + "true" + `"}]`),
},
},
}, {
Name: "Successful Reconciliation with a Channel with CA certs",
Key: testKey,
Expand Down
11 changes: 10 additions & 1 deletion pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dispatcher
import (
"context"
"fmt"
"strconv"

listers "knative.dev/eventing/pkg/client/listers/messaging/v1"

Expand Down Expand Up @@ -250,13 +251,21 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (
subs[i] = *conf
}

async := false
if v, ok := imc.Annotations[v1.AsyncHandlerAnnotation]; ok {
b, err := strconv.ParseBool(v)
if err == nil {
async = b
}
}

return &multichannelfanout.ChannelConfig{
Namespace: imc.Namespace,
Name: imc.Name,
HostName: imc.Status.Address.URL.Host,
Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name),
FanoutConfig: fanout.Config{
AsyncHandler: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a good idea to pass in the bool via configuration/annotation in case folks want to have it "async", and do not care about the impact.

AsyncHandler: async,
Subscriptions: subs,
},
}, nil
Expand Down
105 changes: 105 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package dispatcher
import (
"context"
"net/http"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -662,3 +664,106 @@ func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.EventHan
func (f *fakeMultiChannelHandler) CountChannelHandlers() int {
return len(f.handlers)
}

func Test_newConfigForInMemoryChannelAsyncHandler(t *testing.T) {
ctx, _ := SetupFakeContext(t, SetUpInformerSelector)

type args struct {
ctx context.Context
imc *v1.InMemoryChannel
}
tests := []struct {
name string
args args
wantAsync bool
wantErr bool
}{
{
name: "async handler",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
Annotations: map[string]string{
v1.AsyncHandlerAnnotation: "true",
},
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: true,
wantErr: false,
},
{
name: "sync handler, default",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: false,
wantErr: false,
},
{
name: "sync handler, explicit",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
Annotations: map[string]string{
v1.AsyncHandlerAnnotation: "false",
},
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newConfigForInMemoryChannel(tt.args.ctx, tt.args.imc)
if (err != nil) != tt.wantErr {
t.Errorf("newConfigForInMemoryChannel() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got.FanoutConfig.AsyncHandler, tt.wantAsync) {
t.Errorf("newConfigForInMemoryChannel() got = %v, want %v", got, tt.wantAsync)
}
})
}
}
9 changes: 9 additions & 0 deletions pkg/reconciler/testing/v1/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func WithBrokerFinalizers(finalizers ...string) BrokerOption {
}
}

func WithBrokerAnnotation(key, value string) BrokerOption {
return func(b *v1.Broker) {
if b.Annotations == nil {
b.Annotations = map[string]string{}
}
b.Annotations[key] = value
}
}

func WithBrokerResourceVersion(rv string) BrokerOption {
return func(b *v1.Broker) {
b.ResourceVersion = rv
Expand Down
16 changes: 16 additions & 0 deletions test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ func TestBrokerRedelivery(t *testing.T) {
env.TestSet(ctx, t, broker.BrokerRedelivery())
}

// TestBrokerPropagatesMetadata test Broker reconciler propagates metadata to channel.
func TestBrokerPropagatesMetadata(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.ParallelTest(ctx, t, broker.PropagatesMetadata())
}

func TestBrokerDeadLetterSinkExtensions(t *testing.T) {
t.Parallel()

Expand Down
24 changes: 23 additions & 1 deletion test/rekt/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"testing"
"time"

"knative.dev/eventing/test/rekt/features/authz"
"knative.dev/reconciler-test/pkg/feature"

"knative.dev/eventing/test/rekt/features/authz"

"github.com/cloudevents/sdk-go/v2/binding"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -205,6 +206,27 @@ func TestChannelDeadLetterSink(t *testing.T) {
env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn))
}

/*
TestChannelAsyncHandler tests if the async handler can be configured on the channel.
*/
func TestChannelAsyncHandler(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri, "")
}
env.ParallelTest(ctx, t, channel.AsyncHandler(createSubscriberFn))
env.ParallelTest(ctx, t, channel.AsyncHandlerUpdate(createSubscriberFn))
}

// TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in
// the dead letter sink.
func TestGenericChannelDeadLetterSink(t *testing.T) {
Expand Down
Loading
Loading