Skip to content

Commit

Permalink
fix(collector): Avoid race on upstream channel close, tidy sync points
Browse files Browse the repository at this point in the history
  • Loading branch information
pdf committed Sep 13, 2021
1 parent 20182da commit e6fbdf5
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions collector/zfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,15 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) {
wg.Add(len(c.Collectors))
// Synchonize after timeout event, ensuring no writers are still active when we return control.
timeout := make(chan struct{})
done := make(chan struct{})
timeoutMutex := sync.Mutex{}

// Upon exceeding deadline, send cached data for any metrics that have not already been reported.
go func() {
<-ctx.Done()
if err := ctx.Err(); err != nil && err != context.Canceled {
timeoutMutex.Lock()
c.cache.merge(cache)
cacheIndex := cache.index()
c.sendCached(ch, cacheIndex)
close(timeout) // assert timeout for flow control in other goroutines
timeoutMutex.Unlock()
finalized := make(chan struct{})
finalize := func() {
select {
case <-finalized:
default:
close(finalized)
}
}()

}

// Close the proxy channel upon collector completion.
go func() {
Expand All @@ -112,20 +106,17 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) {
// Cache metrics as they come in via the proxy channel, and ship them out if we've not exceeded the deadline.
go func() {
for metric := range proxy {
timeoutMutex.Lock()
cache.add(metric)
select {
case <-timeout:
timeoutMutex.Unlock()
continue
finalize()
default:
ch <- metric.prometheus
timeoutMutex.Unlock()
}
}
// Signal completion and update full cache.
c.cache.replace(cache)
close(done)
cancel()
// Notify next collection that we're ready to collect again
c.ready <- struct{}{}
}()
Expand All @@ -145,6 +136,7 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) {
collector, err := state.factory(c.logger, c.client, strings.Split(*state.Properties, `,`))
if err != nil {
_ = level.Error(c.logger).Log("Error instantiating collector", "collector", name, "err", err)
wg.Done()
continue
}
go func(name string, collector Collector) {
Expand All @@ -153,11 +145,20 @@ func (c *ZFS) Collect(ch chan<- prometheus.Metric) {
}(name, collector)
}

// Wait for either timeout or completion.
select {
case <-timeout:
case <-done:
// Wait for completion or timeout
<-ctx.Done()
err = ctx.Err()
if err == context.Canceled {
finalize()
} else if err != nil {
// Upon exceeding deadline, send cached data for any metrics that have not already been reported.
close(timeout) // assert timeout for flow control in other goroutines
c.cache.merge(cache)
cacheIndex := cache.index()
c.sendCached(ch, cacheIndex)
}
// Ensure there are no in-flight writes to the upstream channel
<-finalized
}

// sendCached values that do not appear in the current cacheIndex.
Expand Down

0 comments on commit e6fbdf5

Please sign in to comment.