Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(discovery): configure sharding every time MetricsHandler.Run runs #2478

Merged
22 changes: 2 additions & 20 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
) {
// The interval at which we will check the cache for updates.
t := time.NewTicker(Interval)
// Track previous context to allow refreshing cache.
olderContext, olderCancel := context.WithCancel(ctx)
// Prevent context leak (kill the last metric handler instance).
defer olderCancel()
generateMetrics := func() {
// Get families for discovered factories.
customFactories, err := factoryGenerator()
Expand Down Expand Up @@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates(
r.SafeWrite(func() {
r.WasUpdated = false
})
// Run the metrics handler with updated configs.
olderContext, olderCancel = context.WithCancel(ctx)
go func() {
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
err = m.Run(olderContext)
if err != nil {
// Check if context was cancelled.
select {
case <-olderContext.Done():
// Context cancelled, don't really need to log this though.
default:
klog.ErrorS(err, "failed to run metrics handler")
}
}
}()
// Update metric handler with the new configs.
m.BuildWriters(ctx)
}
go func() {
for range t.C {
Expand All @@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
olderCancel()
dgrisonnet marked this conversation as resolved.
Show resolved Hide resolved
generateMetrics()
klog.InfoS("discovery finished, cache updated")
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
opts.EnableGZIPEncoding,
)
// Run MetricsHandler
if config == nil {
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})
}
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})

tlsConfig := opts.TLSConfig

Expand Down
25 changes: 18 additions & 7 deletions pkg/metricshandler/metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,35 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks
}
}

// ConfigureSharding (re-)configures sharding. Re-configuration can be done
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
// BuildWriters builds the metrics writers, cancelling any previous context and passing a new one on every build.
// Build can be used multiple times and concurrently.
func (m *MetricsHandler) BuildWriters(ctx context.Context) {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.cancel != nil {
m.cancel()
}
if totalShards != 1 {
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
}
ctx, m.cancel = context.WithCancel(ctx)
m.storeBuilder.WithSharding(shard, totalShards)
m.storeBuilder.WithContext(ctx)
m.metricsWriters = m.storeBuilder.Build()
}

// ConfigureSharding configures sharding. Configuration can be used multiple times and
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
m.mtx.Lock()

if totalShards != 1 {
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
}
m.curShard = shard
m.curTotalShards = totalShards
m.storeBuilder.WithSharding(shard, totalShards)

// unlock because BuildWriters will hold a lock again
m.mtx.Unlock()
m.BuildWriters(ctx)
}

// Run configures the MetricsHandler's sharding and if autosharding is enabled
Expand Down
Loading