diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index fbb5ada36a1..188c941e4c4 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -71,10 +71,10 @@ func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { sendUpdates: sendUpdates, updateChannel: updateChannel, writer: &UpdateWriter{ - log: log, - enabled: sendUpdates, - updateChannel: updateChannel, - missedEvents: make(chan Update, 1000), + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + eventsBeforeEnabled: make(chan Update, 1000), }, } } @@ -141,7 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Enable Updaters to start sending updates to this handler. close(u.sendUpdates) - u.writer.start() + u.writer.handleEventsReceivedBeforeEnabled() for { select { @@ -168,10 +168,12 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { - log logr.Logger - enabled <-chan struct{} - updateChannel chan<- Update - missedEvents chan Update + log logr.Logger + enabled <-chan struct{} + updateChannel chan<- Update + // a temporary buffer to store events received before the Updater is enabled. + // These events will be sent to the update channel once the Updater is enabled. + eventsBeforeEnabled chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. @@ -181,23 +183,23 @@ func (u *UpdateWriter) Send(update Update) { case <-u.enabled: u.updateChannel <- update default: - if len(u.missedEvents) < cap(u.missedEvents) { + if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) - u.missedEvents <- update + u.eventsBeforeEnabled <- update } else { u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) } } } -// start runs the goroutine to send the events received before the Updater was enabled to the update channel. -func (u *UpdateWriter) start() { +// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { go func() { - for e := range u.missedEvents { + for e := range u.eventsBeforeEnabled { u.log.Info("sending stored status update", "event", e.NamespacedName) u.updateChannel <- e } - close(u.missedEvents) + close(u.eventsBeforeEnabled) }() }