Skip to content

Commit

Permalink
Use mt-broker-filter as the Audience of a Triggers Subscriptions Su…
Browse files Browse the repository at this point in the history
…bscriber (#7319)

Set Audience for Triggers Subscription Subscriber to "mt-broker-filter"
  • Loading branch information
creydr authored Oct 3, 2023
1 parent eec9a52 commit d1c3f15
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 4 deletions.
10 changes: 8 additions & 2 deletions pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
subscriptionCreateFailed = "SubscriptionCreateFailed"
subscriptionGetFailed = "SubscriptionGetFailed"

FilterAudience = "mt-broker-filter"
)

type Reconciler struct {
Expand Down Expand Up @@ -207,8 +209,8 @@ func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, b *eventingv1.Br
// subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels.
func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) {
var dest *duckv1.Destination
transportEncryptionFlags := feature.FromContext(ctx)
if transportEncryptionFlags.IsPermissiveTransportEncryption() || transportEncryptionFlags.IsStrictTransportEncryption() {
featureFlags := feature.FromContext(ctx)
if featureFlags.IsPermissiveTransportEncryption() || featureFlags.IsStrictTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return nil, fmt.Errorf("failed to get CA certs: %w", err)
Expand All @@ -232,6 +234,10 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1
}
}

if featureFlags.IsOIDCAuthentication() {
dest.Audience = pointer.String(FilterAudience)
}

// Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not
// filled in. So instead of b.TypeMeta.Kind and b.TypeMeta.APIVersion, we have to
// do it this way.
Expand Down
60 changes: 58 additions & 2 deletions pkg/reconciler/broker/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ func TestReconcile(t *testing.T) {
feature.OIDCAuthentication: feature.Enabled,
}),
Objects: allBrokerObjectsReadyPlus([]runtime.Object{
makeReadySubscription(testNS),
makeReadySubscriptionWithAudience(testNS),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
Expand Down Expand Up @@ -1527,7 +1527,7 @@ func TestReconcile(t *testing.T) {
feature.OIDCAuthentication: feature.Enabled,
}),
Objects: allBrokerObjectsReadyPlus([]runtime.Object{
makeReadySubscription(testNS),
makeReadySubscriptionWithAudience(testNS),
makeTriggerOIDCServiceAccountWithoutOwnerRef(),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
Expand Down Expand Up @@ -1556,6 +1556,49 @@ func TestReconcile(t *testing.T) {
Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf("service account %s not owned by Trigger %s", makeTriggerOIDCServiceAccountWithoutOwnerRef().Name, triggerName)),
},
},
{
Name: "OIDC: set Audience of broker-filter in Subscription",
Key: testKey,
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
Objects: allBrokerObjectsReadyPlus([]runtime.Object{
makeReadySubscription(testNS),
makeTriggerOIDCServiceAccount(),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithInitTriggerConditions,
)}...),
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberURI(subscriberURI),
WithTriggerBrokerReady(),
// The first reconciliation will initialize the status conditions.
WithInitTriggerConditions,
WithTriggerDependencyReady(),
WithTriggerSubscribed(),
WithTriggerStatusSubscriberURI(subscriberURI),
WithTriggerSubscriberResolvedSucceeded(),
WithTriggerDeadLetterSinkNotConfigured(),
WithTriggerSubscriptionNotConfigured(),
WithTriggerOIDCIdentityCreatedSucceeded(),
WithTriggerOIDCServiceAccountName(makeTriggerOIDCServiceAccount().Name),
),
}},
WantCreates: []runtime.Object{
resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURIWithAudience(), makeEmptyDelivery()),
},
WantDeletes: []clientgotesting.DeleteActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
Resource: eventingduckv1.SchemeGroupVersion.WithResource("subscriptions"),
},
Name: subscriptionName,
}},
},
}

logger := logtesting.TestLogger(t)
Expand Down Expand Up @@ -1696,6 +1739,13 @@ func makeServiceURI() *duckv1.Destination {
}
}

func makeServiceURIWithAudience() *duckv1.Destination {
dst := makeServiceURI()
dst.Audience = ptr.String(FilterAudience)

return dst
}

func makeServiceURIHTTPS() *duckv1.Destination {
return &duckv1.Destination{
URI: &apis.URL{
Expand Down Expand Up @@ -1807,6 +1857,12 @@ func makeReadySubscription(subscriberNamespace string) *messagingv1.Subscription
return s
}

func makeReadySubscriptionWithAudience(subscriberNamespace string) *messagingv1.Subscription {
s := makeReadySubscription(subscriberNamespace)
s.Spec.Subscriber.Audience = ptr.String(FilterAudience)
return s
}

func makeSubscriberAddressableAsUnstructured(subscriberNamespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down

0 comments on commit d1c3f15

Please sign in to comment.