Skip to content

Commit

Permalink
[release-1.16] MT-Broker: return retriable status code based on the s…
Browse files Browse the repository at this point in the history
…tate to leverage retries (#8367)

* MT-Broker: return appropriate status code based on the state to leverage retries

The ingress or filter deployments were returning 400 even in the case
where a given resource (like trigger, broker, subscription) wasn't
found, however, this is a common case where the lister cache
hasn't caught up with the latest state.

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix unit tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Dec 3, 2024
1 parent 96ab579 commit 9740b12
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
39 changes: 32 additions & 7 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net/http"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
messaginginformers "knative.dev/eventing/pkg/client/informers/externalversions/messaging/v1"
"knative.dev/eventing/pkg/reconciler/broker/resources"
Expand Down Expand Up @@ -178,16 +180,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

trigger, err := h.getTrigger(triggerRef)
if err != nil {
h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusBadRequest)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to find the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusNotFound)
return
}

subscription, err := h.getSubscription(features, trigger)
if err != nil {
h.logger.Info("Unable to get the Subscription of the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusInternalServerError)
h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusBadRequest)
return
}

Expand Down Expand Up @@ -216,6 +216,18 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")

subscription, err := h.getSubscription(features, trigger)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to find the Subscription for trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Subscription of the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusInternalServerError)
return
}

audience := FilterAudience

if subscription.Status.Auth == nil || subscription.Status.Auth.ServiceAccountName == nil {
Expand Down Expand Up @@ -266,6 +278,11 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -311,6 +328,11 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
brokerNamespace = trigger.Namespace
}
broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -331,6 +353,9 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
Audience: broker.Status.DeadLetterSinkAudience,
}
}
if target == nil {
return
}

reportArgs := &ReportArgs{
ns: trigger.Namespace,
Expand Down
12 changes: 7 additions & 5 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import (
"testing"
"time"

"knative.dev/eventing/pkg/eventingtls"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/system"

"knative.dev/eventing/pkg/eventingtls"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/reconciler/broker/resources"

Expand Down Expand Up @@ -64,10 +65,11 @@ import (
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
subscriptioninformerfake "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
)

const (
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestReceiver(t *testing.T) {
expectedStatus: http.StatusBadRequest,
},
"Path too long": {
request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/extra", nil),
request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/uuid/extra/extra", nil),
expectedStatus: http.StatusBadRequest,
},
"Path without prefix": {
Expand All @@ -130,7 +132,7 @@ func TestReceiver(t *testing.T) {
},
"Trigger.Get fails": {
// No trigger exists, so the Get will fail.
expectedStatus: http.StatusBadRequest,
expectedStatus: http.StatusNotFound,
},
"Trigger doesn't have SubscriberURI": {
triggers: []*eventingv1.Trigger{
Expand Down
8 changes: 7 additions & 1 deletion pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/ptr"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
Expand Down Expand Up @@ -226,6 +227,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

broker, err := h.getBroker(brokerName, brokerNamespace)
if apierrors.IsNotFound(err) {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -315,7 +321,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
channelAddress, err := h.getChannelAddress(brokerObj)
if err != nil {
h.Logger.Warn("could not get channel address from broker", zap.Error(err))
return http.StatusBadRequest, kncloudevents.NoDuration
return http.StatusInternalServerError, kncloudevents.NoDuration
}

opts := []kncloudevents.SendOption{
Expand Down
12 changes: 7 additions & 5 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"testing"
"time"

"knative.dev/eventing/pkg/eventingtls"
filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/system"

"knative.dev/eventing/pkg/eventingtls"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
Expand All @@ -54,10 +55,11 @@ import (
brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
)

const (
Expand Down Expand Up @@ -223,9 +225,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
method: nethttp.MethodPost,
uri: "/ns/name",
body: getValidEvent(),
statusCode: nethttp.StatusBadRequest,
statusCode: nethttp.StatusInternalServerError,
handler: handler(),
reporter: &mockReporter{StatusCode: nethttp.StatusBadRequest, EventDispatchTimeReported: false},
reporter: &mockReporter{StatusCode: nethttp.StatusInternalServerError, EventDispatchTimeReported: false},
defaulter: broker.TTLDefaulter(logger, 100),
brokers: []*eventingv1.Broker{
withUninitializedAnnotations(makeBroker("name", "ns")),
Expand Down

0 comments on commit 9740b12

Please sign in to comment.