Skip to content

Commit

Permalink
rename method
Browse files Browse the repository at this point in the history
Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Nov 26, 2024
1 parent bce5a67 commit a2d7838
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
sendUpdates: sendUpdates,
updateChannel: updateChannel,
writer: &UpdateWriter{
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
missedEvents: make(chan Update, 1000),
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
eventsBeforeEnabled: make(chan Update, 1000),
},
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Enable Updaters to start sending updates to this handler.
close(u.sendUpdates)
u.writer.start()
u.writer.handleEventsReceivedBeforeEnabled()

for {
select {
Expand All @@ -168,10 +168,12 @@ type Updater interface {

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
missedEvents chan Update
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
// a temporary buffer to store events received before the Updater is enabled.
// These events will be sent to the update channel once the Updater is enabled.
eventsBeforeEnabled chan Update
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
Expand All @@ -181,23 +183,23 @@ func (u *UpdateWriter) Send(update Update) {
case <-u.enabled:
u.updateChannel <- update
default:
if len(u.missedEvents) < cap(u.missedEvents) {
if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) {
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
u.missedEvents <- update
u.eventsBeforeEnabled <- update
} else {
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
}
}
}

// start runs the goroutine to send the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) start() {
// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() {
go func() {
for e := range u.missedEvents {
for e := range u.eventsBeforeEnabled {
u.log.Info("sending stored status update", "event", e.NamespacedName)
u.updateChannel <- e
}
close(u.missedEvents)
close(u.eventsBeforeEnabled)
}()
}

Expand Down

0 comments on commit a2d7838

Please sign in to comment.