Skip to content

Commit

Permalink
Update dependencies (#3485)
Browse files Browse the repository at this point in the history
* Run update-deps.sh

* Fix build issue

* Add Subscribers update verification

* Add call to CheckSubscribersChangeAllowed()

* Use eventing controller SA name
  • Loading branch information
creydr authored Nov 29, 2023
1 parent 846f546 commit 9a4a161
Show file tree
Hide file tree
Showing 20 changed files with 228 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"knative.dev/pkg/kmp"
)

const eventingControllerSAName = "system:serviceaccount:knative-eventing:eventing-controller"

func (kc *KafkaChannel) Validate(ctx context.Context) *apis.FieldError {
errs := kc.Spec.Validate(ctx).ViaField("spec")

Expand All @@ -45,6 +47,7 @@ func (kc *KafkaChannel) Validate(ctx context.Context) *apis.FieldError {
if apis.IsInUpdate(ctx) {
original := apis.GetBaseline(ctx).(*KafkaChannel)
errs = errs.Also(kc.CheckImmutableFields(ctx, original))
errs = errs.Also(kc.CheckSubscribersChangeAllowed(ctx, original))
}

return errs
Expand Down Expand Up @@ -113,3 +116,44 @@ func (kc *KafkaChannel) CheckImmutableFields(_ context.Context, original *KafkaC

return nil
}

func (kc *KafkaChannel) CheckSubscribersChangeAllowed(ctx context.Context, original *KafkaChannel) *apis.FieldError {
if original == nil {
return nil
}

if !canChangeChannelSpecAuth(ctx) {
return kc.checkSubsciberSpecAuthChanged(ctx, original)
}
return nil
}

func (kc *KafkaChannel) checkSubsciberSpecAuthChanged(ctx context.Context, original *KafkaChannel) *apis.FieldError {
if diff, err := kmp.ShortDiff(original.Spec.Subscribers, kc.Spec.Subscribers); err != nil {
return &apis.FieldError{
Message: "Failed to diff Channel.Spec.Subscribers",
Paths: []string{"spec.subscribers"},
Details: err.Error(),
}
} else if diff != "" {
user := apis.GetUserInfo(ctx)
userName := ""
if user != nil {
userName = user.Username
}
return &apis.FieldError{
Message: fmt.Sprintf("Channel.Spec.Subscribers changed by user %s which was not the %s service account", userName, eventingControllerSAName),
Paths: []string{"spec.subscribers"},
Details: diff,
}
}
return nil
}

func canChangeChannelSpecAuth(ctx context.Context) bool {
user := apis.GetUserInfo(ctx)
if user == nil {
return false
}
return user.Username == eventingControllerSAName
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ require (
k8s.io/apiserver v0.27.6
k8s.io/client-go v0.27.6
k8s.io/utils v0.0.0-20230209194617-a36077c30491
knative.dev/eventing v0.39.1-0.20231123120018-116abe23fcf8
knative.dev/eventing v0.39.1-0.20231124172449-d989ca732fad
knative.dev/hack v0.0.0-20231122182901-eb352426ecc1
knative.dev/pkg v0.0.0-20231122190403-23f3ee2ee35b
knative.dev/reconciler-test v0.0.0-20231123130922-55110f3262c7
knative.dev/pkg v0.0.0-20231123185329-ea6ea8440341
knative.dev/reconciler-test v0.0.0-20231124074205-94985a9f3833
sigs.k8s.io/controller-runtime v0.12.3
sigs.k8s.io/yaml v1.4.0
)
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1252,14 +1252,14 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.39.1-0.20231123120018-116abe23fcf8 h1:tyjld32c922UQxSBPeg4mux+VZdNgiOoCYkIww32R/Q=
knative.dev/eventing v0.39.1-0.20231123120018-116abe23fcf8/go.mod h1:mpkcqSdX6s/b1WtmtWUUFzwDv6UNqmyqYtVuVS6u67A=
knative.dev/eventing v0.39.1-0.20231124172449-d989ca732fad h1:KNtNoC4ojhvdmFTKM7rcbylKua7SmOxwajeuWDPAFFs=
knative.dev/eventing v0.39.1-0.20231124172449-d989ca732fad/go.mod h1:hibqweYk29xszfCrMZLYllRqsJJUl4K6pLBB1P/s5Bo=
knative.dev/hack v0.0.0-20231122182901-eb352426ecc1 h1:ZUkFAaq5gEls7bJ2ADLg+ZJVtN3KUcHEWx5ngLfacuQ=
knative.dev/hack v0.0.0-20231122182901-eb352426ecc1/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20231122190403-23f3ee2ee35b h1:ZDodmDWTYRhSQJMir3Q7P5GyRM+w5W1J2fMBto3pfJ8=
knative.dev/pkg v0.0.0-20231122190403-23f3ee2ee35b/go.mod h1:tLmRcSDWCzcsoitR+xfghXDxFYdOMIMdBXiazmdmb2Y=
knative.dev/reconciler-test v0.0.0-20231123130922-55110f3262c7 h1:oFHCVnuCCgpkv7d9wdXAwVIzmA1GyOXelAbyE5QEsBo=
knative.dev/reconciler-test v0.0.0-20231123130922-55110f3262c7/go.mod h1:V5dY5ZYfAwVe2JzJ6+WSwg1v9uzTrDR/vb0EHC01C+g=
knative.dev/pkg v0.0.0-20231123185329-ea6ea8440341 h1:GVSTPofS7DbTTxoJw4TwfxNWCzH4QcSI5jm4F7tgDPE=
knative.dev/pkg v0.0.0-20231123185329-ea6ea8440341/go.mod h1:uOiSmQ4t36/4qxaY+hrrgrNNNkDqj6BGZVjtV+cQ+V4=
knative.dev/reconciler-test v0.0.0-20231124074205-94985a9f3833 h1:nKY+GdskcfjfraLFSFnPF3VbdO5m74pJwLjNPQvB4Ew=
knative.dev/reconciler-test v0.0.0-20231124074205-94985a9f3833/go.mod h1:cuMM/h2jDwNju8rhdqoqFQHq5xT0BlMvYeZHKWg1vSA=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand Down
10 changes: 5 additions & 5 deletions test/e2e_new_channel/kafka_channel_upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestSmoke_ChannelWithSubscription(t *testing.T) {
for _, name := range names {
env.Test(ctx, t, channel.SubscriptionGoesReady(name,
subscription.WithChannel(chRef),
subscription.WithSubscriber(nil, "http://example.com")),
subscription.WithSubscriber(nil, "http://example.com", "")),
)
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestSmoke_ChannelImplWithSubscription(t *testing.T) {
for _, name := range names {
env.Test(ctx, t, channel.SubscriptionGoesReady(name,
subscription.WithChannel(chRef),
subscription.WithSubscriber(nil, "http://example.com")),
subscription.WithSubscriber(nil, "http://example.com", "")),
)
}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestChannelChain(t *testing.T) {
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri)
return subscription.WithSubscriber(ref, uri, "")
}
env.Test(ctx, t, channel.ChannelChain(10, createSubscriberFn))
}
Expand All @@ -223,7 +223,7 @@ func TestChannelDeadLetterSink(t *testing.T) {
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri)
return subscription.WithSubscriber(ref, uri, "")
}
env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn))
}
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestChannelDeadLetterSinkGenericChannel(t *testing.T) {
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri)
return subscription.WithSubscriber(ref, uri, "")
}
env.Test(ctx, t, channel.DeadLetterSinkGenericChannel(createSubscriberFn))
}
2 changes: 1 addition & 1 deletion test/e2e_new_channel/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func eventWithTraceExported() *feature.Feature {
f.Setup("install channel", channel_impl.Install(channelName))
f.Setup("install subscription", subscription.Install(subName,
subscription.WithChannel(channel_impl.AsRef(channelName)),
subscription.WithSubscriber(service.AsKReference(sinkName), ""),
subscription.WithSubscriber(service.AsKReference(sinkName), "", ""),
))

f.Setup("subscription is ready", subscription.IsReady(subName))
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/kafkachannel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func setupSubscription(f *feature.Feature, name string, receiverName string) {
Kind: "Service",
Name: receiverName,
APIVersion: "v1",
}, ""),
}, "", ""),
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type SubscriberSpec struct {
// DeliverySpec contains options controlling the event delivery
// +optional
Delivery *DeliverySpec `json:"delivery,omitempty"`
// Auth contains the service account name for the subscription
// +optional
Auth *duckv1.AuthStatus `json:"auth,omitempty"`
}

// SubscriberStatus defines the status of a single subscriber to a Channel.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions vendor/knative.dev/eventing/pkg/apis/eventing/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
// https://www.rfc-editor.org/rfc/rfc7468
BrokerChannelCACertsStatusAnnotationKey = "knative.dev/channelCACerts"

// BrokerChannelAudienceStatusAnnotationKey is the broker status annotation
// key used to specify the channels OIDC audience.
BrokerChannelAudienceStatusAnnotationKey = "knative.dev/channelAudience"

// BrokerChannelAPIVersionStatusAnnotationKey is the broker status
// annotation key used to specify the APIVersion of the channel for
// the triggers to subscribe to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"fmt"

"knative.dev/pkg/apis"
"knative.dev/pkg/kmp"

"knative.dev/eventing/pkg/apis/eventing"
)

const eventingControllerSAName = "system:serviceaccount:knative-eventing:eventing-controller"

func (imc *InMemoryChannel) Validate(ctx context.Context) *apis.FieldError {
errs := imc.Spec.Validate(ctx).ViaField("spec")

Expand All @@ -39,6 +42,12 @@ func (imc *InMemoryChannel) Validate(ctx context.Context) *apis.FieldError {
}
}

if apis.IsInUpdate(ctx) {
// Validate that if any changes were made to spec.subscribers, they were made by the eventing-controller
original := apis.GetBaseline(ctx).(*InMemoryChannel)
errs = errs.Also(imc.CheckSubscribersChangeAllowed(ctx, original))
}

return errs
}

Expand All @@ -54,3 +63,44 @@ func (imcs *InMemoryChannelSpec) Validate(ctx context.Context) *apis.FieldError

return errs
}

func (imc *InMemoryChannel) CheckSubscribersChangeAllowed(ctx context.Context, original *InMemoryChannel) *apis.FieldError {
if original == nil {
return nil
}

if !canChangeChannelSpecAuth(ctx) {
return imc.checkSubsciberSpecAuthChanged(original, ctx)
}
return nil
}

func (imc *InMemoryChannel) checkSubsciberSpecAuthChanged(original *InMemoryChannel, ctx context.Context) *apis.FieldError {
if diff, err := kmp.ShortDiff(original.Spec.Subscribers, imc.Spec.Subscribers); err != nil {
return &apis.FieldError{
Message: "Failed to diff Channel.Spec.Subscribers",
Paths: []string{"spec.subscribers"},
Details: err.Error(),
}
} else if diff != "" {
user := apis.GetUserInfo(ctx)
userName := ""
if user != nil {
userName = user.Username
}
return &apis.FieldError{
Message: fmt.Sprintf("Channel.Spec.Subscribers changed by user %s which was not the %s service account", userName, eventingControllerSAName),
Paths: []string{"spec.subscribers"},
Details: diff,
}
}
return nil
}

func canChangeChannelSpecAuth(ctx context.Context) bool {
user := apis.GetUserInfo(ctx)
if user == nil {
return false
}
return user.Username == eventingControllerSAName
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ func WithChannelCACertsAnnotation(caCerts string) BrokerOption {
}
}

func WithChannelAudienceAnnotation(audience string) BrokerOption {
return func(b *v1.Broker) {
if b.Status.Annotations == nil {
b.Status.Annotations = make(map[string]string, 1)
}
b.Status.Annotations[eventing.BrokerChannelAudienceStatusAnnotationKey] = audience
}
}

func WithBrokerStatusDLS(dls duckv1.Addressable) BrokerOption {
return func(b *v1.Broker) {
b.Status.MarkDeadLetterSinkResolvedSucceeded(eventingv1.NewDeliveryStatusFromAddressable(&dls))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package helpers

import (
"context"
"fmt"
"testing"

"encoding/json"
Expand All @@ -30,6 +31,7 @@ import (

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
"knative.dev/pkg/apis"
)

Expand Down Expand Up @@ -98,13 +100,26 @@ func channelSpecAllowsSubscribersArray(t *testing.T, client *testlib.Client, cha
t.Fatalf("Error unmarshaling %s: %s", u, err)
}

// the admission webhook should deny attempts to modify spec.subscribers directly from anything other than eventing-controller
err = client.RetryWebhookErrors(func(attempt int) error {
_, e := client.Dynamic.Resource(gvr).Namespace(client.Namespace).Update(context.Background(), u, metav1.UpdateOptions{})
if e != nil {
t.Logf("Failed to update channel spec at attempt %d %q %q: %v", attempt, channel.Kind, channelName, e)
}
return e
})
if err == nil {
return fmt.Errorf("channel validation should prevent direct updates to spec.subcribers except when made by eventing-controller")
}
subscriptionName := names.SimpleNameGenerator.GenerateName("channel-spec-subscribers-")
subscriberUrl, _ := apis.ParseURL("http://localhost")
client.CreateSubscriptionOrFail(subscriptionName, channelName, &channel, resources.WithURIForSubscription(subscriberUrl))
client.WaitForResourceReadyOrFail(subscriptionName, testlib.SubscriptionTypeMeta)
client.WaitForResourceReadyOrFail(channelName, &channel)
channelable, err = getChannelAsChannelable(channelName, client, channel)
if len(channelable.Spec.Subscribers) != 1 {
return fmt.Errorf("channel.Spec.Subscribers was not updated by the subscription reconciler")
}
return err
})
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions vendor/knative.dev/eventing/test/lib/resources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ func WithSubscriberForSubscription(name string) SubscriptionOption {
}
}

// WithURIForSubscription returns an option that adds a URI for the given v1 Subscription
func WithURIForSubscription(uri *apis.URL) SubscriptionOption {
return func(s *messagingv1.Subscription) {
if uri != nil {
s.Spec.Subscriber = &duckv1.Destination{
URI: uri,
}
}
}
}

// WithReplyForSubscription returns an options that adds a ReplyStrategy for the given v1 Subscription.
func WithReplyForSubscription(name string, typemeta *metav1.TypeMeta) SubscriptionOption {
return func(s *messagingv1.Subscription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func ManyTriggers() *feature.FeatureSet {

func BrokerWorkFlowWithTransformation() *feature.FeatureSet {
createSubscriberFn := func(ref *v1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri)
return subscription.WithSubscriber(ref, uri, "")
}
fs := &feature.FeatureSet{
Name: "Knative Broker - Transformation - Channel flow and Trigger event flow",
Expand Down
Loading

0 comments on commit 9a4a161

Please sign in to comment.