-
Notifications
You must be signed in to change notification settings - Fork 600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow configuring (opt-in) IMC async handler #8311
Changes from all commits
55a7c61
cd2b5e9
c200cff
66375bb
b187b70
32bf295
acc4557
35def44
b1e6f3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
|
||
"go.uber.org/zap" | ||
corev1 "k8s.io/api/core/v1" | ||
|
@@ -108,10 +109,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk | |
return err | ||
} | ||
|
||
var tmpChannelableSpec duckv1.ChannelableSpec = duckv1.ChannelableSpec{ | ||
var tmpChannelableSpec = duckv1.ChannelableSpec{ | ||
Delivery: b.Spec.Delivery, | ||
} | ||
|
||
metadata := b.ObjectMeta.DeepCopy() | ||
channelAnnotations := map[string]string{ | ||
eventing.ScopeAnnotationKey: eventing.ScopeCluster, | ||
} | ||
for k, v := range metadata.GetAnnotations() { | ||
if strings.HasPrefix(k, messagingv1.SchemeGroupVersion.Group) { | ||
channelAnnotations[k] = v | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this always a string? |
||
} | ||
} | ||
|
||
logging.FromContext(ctx).Infow("Reconciling the trigger channel") | ||
c, err := ducklib.NewPhysicalChannel( | ||
chanMan.template.TypeMeta, | ||
|
@@ -122,7 +133,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk | |
*kmeta.NewControllerRef(b), | ||
}, | ||
Labels: TriggerChannelLabels(b.Name, b.Namespace), | ||
Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster}, | ||
Annotations: channelAnnotations, | ||
}, | ||
ducklib.WithChannelableSpec(tmpChannelableSpec), | ||
ducklib.WithPhysicalChannelSpec(chanMan.template.Spec), | ||
|
@@ -392,7 +403,9 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf | |
return nil, fmt.Errorf("failed to convert %s/%s into Channelable: %w", channelObjRef.Namespace, channelObjRef.Name, err) | ||
} | ||
|
||
if equality.Semantic.DeepEqual(desired.Spec.Delivery, channelable.Spec.Delivery) { | ||
if equality.Semantic.DeepDerivative(desired.Spec.Delivery, channelable.Spec.Delivery) && | ||
equality.Semantic.DeepDerivative(desired.Annotations, channelable.Annotations) && | ||
equality.Semantic.DeepDerivative(desired.Labels, channelable.Labels) { | ||
// If propagated/mutable properties match return the Channel. | ||
return channelable, nil | ||
} | ||
|
@@ -402,12 +415,20 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf | |
jsonPatch, err := duckapis.CreatePatch( | ||
// Existing Channel properties | ||
duckv1.Channelable{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Annotations: channelable.Annotations, | ||
Labels: channelable.Labels, | ||
}, | ||
Spec: duckv1.ChannelableSpec{ | ||
Delivery: channelable.Spec.Delivery, | ||
}, | ||
}, | ||
// Desired Channel properties | ||
duckv1.Channelable{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Annotations: desired.Annotations, | ||
Labels: desired.Labels, | ||
}, | ||
Spec: duckv1.ChannelableSpec{ | ||
Delivery: desired.Spec.Delivery, | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package dispatcher | |
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
|
||
listers "knative.dev/eventing/pkg/client/listers/messaging/v1" | ||
|
||
|
@@ -250,13 +251,21 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) ( | |
subs[i] = *conf | ||
} | ||
|
||
async := false | ||
if v, ok := imc.Annotations[v1.AsyncHandlerAnnotation]; ok { | ||
b, err := strconv.ParseBool(v) | ||
if err == nil { | ||
async = b | ||
} | ||
} | ||
|
||
return &multichannelfanout.ChannelConfig{ | ||
Namespace: imc.Namespace, | ||
Name: imc.Name, | ||
HostName: imc.Status.Address.URL.Host, | ||
Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), | ||
FanoutConfig: fanout.Config{ | ||
AsyncHandler: false, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems like a good idea to pass in the bool via configuration/annotation in case folks want to have it "async", and do not care about the impact. |
||
AsyncHandler: async, | ||
Subscriptions: subs, | ||
}, | ||
}, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we release note that this is not
deprecated
anymore?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only at the code level / library and we usually don't release note it since pretty much all users don't care