Skip to content

Commit

Permalink
Fix: frequent 503 errors when connecting to a Service experiencing hi…
Browse files Browse the repository at this point in the history
…gh Pod churn (#4754)

* Revert "fix: some status updates are discarded by the status updater (#4337)"

This reverts commit 14830c7.

Signed-off-by: Huabing Zhao <[email protected]>

* store update events and process it later

Signed-off-by: Huabing Zhao <[email protected]>

* rename method

Signed-off-by: Huabing Zhao <[email protected]>

* add release note

Signed-off-by: Huabing Zhao <[email protected]>

---------

Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing authored Nov 27, 2024
1 parent cda2dcb commit 8ec3095
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
51 changes: 46 additions & 5 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kubernetes

import (
"context"
"errors"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object {
type UpdateHandler struct {
log logr.Logger
client client.Client
sendUpdates chan struct{}
updateChannel chan Update
writer *UpdateWriter
}

func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
sendUpdates := make(chan struct{})
updateChannel := make(chan Update, 100)
return &UpdateHandler{
log: log,
client: client,
updateChannel: make(chan Update, 100),
sendUpdates: sendUpdates,
updateChannel: updateChannel,
writer: &UpdateWriter{
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
eventsBeforeEnabled: make(chan Update, 1000),
},
}
}

Expand Down Expand Up @@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
u.log.Info("started status update handler")
defer u.log.Info("stopped status update handler")

// Enable Updaters to start sending updates to this handler.
close(u.sendUpdates)
u.writer.handleEventsReceivedBeforeEnabled()

for {
select {
case <-ctx.Done():
Expand All @@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Writer retrieves the interface that should be used to write to the UpdateHandler.
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
updateChannel: u.updateChannel,
}
return u.writer
}

// Updater describes an interface to send status updates somewhere.
Expand All @@ -154,13 +168,40 @@ type Updater interface {

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
// a temporary buffer to store events received before the Updater is enabled.
// These events will be sent to the update channel once the Updater is enabled.
eventsBeforeEnabled chan Update
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
func (u *UpdateWriter) Send(update Update) {
// Non-blocking receive to see if we should pass along update.
u.updateChannel <- update
select {
case <-u.enabled:
u.updateChannel <- update
default:
if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) {
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
u.eventsBeforeEnabled <- update
} else {
// If the buffer is full, drop the event to avoid blocking the sender.
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
}
}
}

// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() {
go func() {
for e := range u.eventsBeforeEnabled {
u.log.Info("sending stored status update", "event", e.NamespacedName)
u.updateChannel <- e
}
close(u.eventsBeforeEnabled)
}()
}

// isStatusEqual checks if two objects have equivalent status.
Expand Down
1 change: 1 addition & 0 deletions release-notes/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bug fixes: |
Fixed failed to update SecurityPolicy resources with the `backendRef` field specified
Fixed Envoy rejecting TCP Listeners that have no attached TCPRoutes
Fixed xDS translation failed when oidc tokenEndpoint and jwt remoteJWKS are specified in the same SecurityPolicy and using the same hostname
Fixed frequent 503 errors when connecting to a Service experiencing high Pod churn
# Enhancements that improve performance.
performance improvements: |
Expand Down

0 comments on commit 8ec3095

Please sign in to comment.