Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(discovery): configure sharding every time MetricsHandler.Run runs #2478

Merged
22 changes: 2 additions & 20 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
) {
// The interval at which we will check the cache for updates.
t := time.NewTicker(Interval)
// Track previous context to allow refreshing cache.
olderContext, olderCancel := context.WithCancel(ctx)
// Prevent context leak (kill the last metric handler instance).
defer olderCancel()
generateMetrics := func() {
// Get families for discovered factories.
customFactories, err := factoryGenerator()
Expand Down Expand Up @@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates(
r.SafeWrite(func() {
r.WasUpdated = false
})
// Run the metrics handler with updated configs.
olderContext, olderCancel = context.WithCancel(ctx)
go func() {
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
err = m.Run(olderContext)
if err != nil {
// Check if context was cancelled.
select {
case <-olderContext.Done():
// Context cancelled, don't really need to log this though.
default:
klog.ErrorS(err, "failed to run metrics handler")
}
}
}()
// Update metric handler with the new configs.
m.ReconfigureSharding(ctx)
wallee94 marked this conversation as resolved.
Show resolved Hide resolved
}
go func() {
for range t.C {
Expand All @@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
olderCancel()
dgrisonnet marked this conversation as resolved.
Show resolved Hide resolved
generateMetrics()
klog.InfoS("discovery finished, cache updated")
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
opts.EnableGZIPEncoding,
)
// Run MetricsHandler
if config == nil {
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})
}
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})

tlsConfig := opts.TLSConfig

Expand Down
14 changes: 13 additions & 1 deletion pkg/metricshandler/metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,19 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks
}
}

// ConfigureSharding (re-)configures sharding. Re-configuration can be done
// ReconfigureSharding reconfigures sharding with the current shard and totalShards, and
// it's a no-op if both values are 0.
func (m *MetricsHandler) ReconfigureSharding(ctx context.Context) {
	// Snapshot both fields under the read lock: reading them again after
	// RUnlock (as a direct argument to ConfigureSharding) would race with
	// concurrent writers that mutate them under m.mtx.
	m.mtx.RLock()
	shard, totalShards := m.curShard, m.curTotalShards
	m.mtx.RUnlock()
	// Sharding was never configured; nothing to re-apply.
	if shard == 0 && totalShards == 0 {
		return
	}
	m.ConfigureSharding(ctx, shard, totalShards)
}

// ConfigureSharding configures sharding. Configuration can be used multiple times and
wallee94 marked this conversation as resolved.
Show resolved Hide resolved
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
m.mtx.Lock()
Expand Down