Prepare delayed downscale #131

Merged · 10 commits merged on Feb 7, 2024
2 changes: 1 addition & 1 deletion cmd/rollout-operator/main.go
@@ -171,7 +171,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
-c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, cfg.reconcileInterval, reg, logger)
+c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
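Note: the httpClient variable passed in the new call above is constructed earlier in main(), outside the hunk shown in this diff. As a rough, hypothetical sketch (the actual construction and timeout are not shown here), any value with a Do(*http.Request) (*http.Response, error) method works, for example a plain *http.Client:

```go
package main

import (
	"net/http"
	"time"
)

// buildPrepareDownscaleClient sketches one plausible way to build the HTTP client
// that is passed to controller.NewRolloutController. The timeout value is an
// assumption; the real wiring in main.go is not part of the hunks shown above.
func buildPrepareDownscaleClient() *http.Client {
	return &http.Client{
		Timeout: 30 * time.Second, // assumed bound for each prepare/cancel call
	}
}
```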
6 changes: 6 additions & 0 deletions pkg/config/config.go
@@ -35,4 +35,10 @@ const (
RolloutMirrorReplicasFromResourceNameAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-name"
RolloutMirrorReplicasFromResourceKindAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-kind"
RolloutMirrorReplicasFromResourceAPIVersionAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-api-version" // optional

// RolloutDelayedDownscaleAnnotationKey configures the delay for downscaling. The prepare-URL annotation must be configured as well, and its endpoint must support GET, POST and DELETE methods.
RolloutDelayedDownscaleAnnotationKey = "grafana.com/rollout-delayed-downscale"

// RolloutDelayedDownscalePrepareUrlAnnotationKey is the full URL of the prepare-downscale endpoint. The hostname will be replaced with each pod's fully qualified domain name.
RolloutDelayedDownscalePrepareUrlAnnotationKey = "grafana.com/rollout-prepare-delayed-downscale-url"
)
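For illustration only (not part of this diff), the two annotations might be set together like this; the delay value, URL path and object names below are hypothetical:

```go
package main

import (
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/grafana/rollout-operator/pkg/config"
)

// exampleStatefulSet shows a StatefulSet annotated for delayed downscale.
// The controller replaces the hostname in the prepare URL with each pod's FQDN,
// so "pod" below is just a placeholder; the path is an assumed example.
func exampleStatefulSet() *appsv1.StatefulSet {
	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "ingester-zone-a",
			Namespace: "mimir",
			Annotations: map[string]string{
				config.RolloutDelayedDownscaleAnnotationKey:           "4h",
				config.RolloutDelayedDownscalePrepareUrlAnnotationKey: "http://pod/prepare-delayed-downscale",
			},
		},
	}
}
```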
11 changes: 9 additions & 2 deletions pkg/controller/controller.go
@@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -39,6 +40,10 @@ const (
informerSyncInterval = 5 * time.Minute
)

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}

type RolloutController struct {
kubeClient kubernetes.Interface
namespace string
@@ -52,6 +57,7 @@ type RolloutController struct {
restMapper meta.RESTMapper
scaleClient scale.ScalesGetter
dynamicClient dynamic.Interface
httpClient httpClient
logger log.Logger

// This bool is true if we should trigger a reconcile.
@@ -71,7 +77,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -99,6 +105,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
restMapper: restMapper,
scaleClient: scaleClient,
dynamicClient: dynamic,
httpClient: client,
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
@@ -323,7 +330,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicas(ctx context.Context,
return updated, err
}

-return c.adjustStatefulSetsGroupReplicasToMirrorResource(ctx, groupName, sets)
+return c.adjustStatefulSetsGroupReplicasToMirrorResource(ctx, groupName, sets, c.httpClient)
}

// adjustStatefulSetsGroupReplicasToFollowLeader examines each StatefulSet and adjusts the number of replicas if desired,
439 changes: 384 additions & 55 deletions pkg/controller/controller_test.go

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion pkg/controller/custom_resource_replicas.go
@@ -18,7 +18,7 @@ import (
"github.com/grafana/rollout-operator/pkg/config"
)

-func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet) (bool, error) {
+func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
// works with up-to-date models.
for _, sts := range sets {
@@ -37,10 +37,22 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
desiredReplicas := scaleObj.Spec.Replicas
if currentReplicas == desiredReplicas {
updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, desiredReplicas)
cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, desiredReplicas)
// No change in the number of replicas: don't log because this will be the result most of the time.
continue
}

// We're going to change the number of replicas on the statefulset.
// If delayed downscale is configured on the statefulset, we first handle the delay, and only if that
// succeeds do we continue with the downscale or upscale.
if err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, desiredReplicas); err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check", "group", groupName, "name", sts.GetName(), "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas, "err", err)

// If the delay has not been reached, we can check the next statefulset.

@pracucci (Collaborator) commented on Feb 5, 2024:

We can end up in a situation where different zone statefulsets are scaled down at different times. When this happens, what's the impact on the INACTIVE partitions? We're going to lose some replicas for a partition which is still INACTIVE in the ring and potentially queried 🤔

Maybe the solution is to simply configure a scale-down delay greater than the ring lookback, so that by the time replicas begin to scale down, their partitions haven't been queried for "some time" already.

Makes sense?

@pstibrany (Member, Author) replied:

> We can end up in a situation where different zone statefulsets are scaled down at different times.

For project sigyn I imagined that we would configure delayed scale-down on zone-a, and then configure other zones to follow replicas from zone-a. That way only zone-a would control the state of partitions (marking them inactive, or deleting them).

> Maybe the solution is to simply configure a scale-down delay greater than the ring lookback, so that by the time replicas begin to scale down, their partitions haven't been queried for "some time" already.
>
> Makes sense?

Yes, this was my plan too.

updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, currentReplicas)
continue
}

direction := ""
if desiredReplicas > currentReplicas {
direction = "up"
252 changes: 252 additions & 0 deletions pkg/controller/delay.go
@@ -0,0 +1,252 @@
package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/apps/v1"

"github.com/grafana/rollout-operator/pkg/config"
)

func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, replicas int32) {
delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if err != nil {
level.Warn(logger).Log("msg", "failed to cancel possible downscale due to error", "name", sts.GetName(), "err", err)
return
}

if delay == 0 || prepareURL == nil {
return
}

endpoints := createEndpoints(sts.Namespace, sts.GetName(), 0, int(replicas), prepareURL)

callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) error {
if currentReplicas == desiredReplicas {
// should not happen
return nil
}

delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if delay == 0 || prepareURL == nil || err != nil {
return err
}

if desiredReplicas > currentReplicas {
callCancelDelayedDownscale(ctx, logger, httpClient, createEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL))
// Proceed even if cancelling the delayed downscale fails. We call cancellation repeatedly, so it will happen during the next reconcile.
return nil
}

{
// Replicas in [0, desired) interval should cancel any delayed downscale, if they have any.
cancelEndpoints := createEndpoints(sts.Namespace, sts.GetName(), 0, int(desiredReplicas), prepareURL)
callCancelDelayedDownscale(ctx, logger, httpClient, cancelEndpoints)
}

// Replicas in [desired, current) interval are going to be stopped.
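// For example, when scaling from currentReplicas=10 to desiredReplicas=6, pods 0-5 have any previous
// downscale preparation cancelled (above), while pods 6-9 are asked to prepare for the delayed downscale.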
downscaleEndpoints := createEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL)
maxPrepareTime, err := callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx, logger, httpClient, downscaleEndpoints)
if err != nil {
return fmt.Errorf("failed to prepare pods for delayed downscale: %v", err)
}

elapsedSinceMaxTime := time.Since(maxPrepareTime)
if elapsedSinceMaxTime < delay {
return fmt.Errorf("configured downscale delay %v has not been reached for all pods. elapsed time: %v", delay, elapsedSinceMaxTime)
}

// We can proceed with downscale!
level.Info(logger).Log("msg", "downscale delay has been reached on all downscaled pods, proceeding with downscale", "name", sts.GetName(), "delay", delay, "elapsed", elapsedSinceMaxTime)
return nil
}

func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]

if delayStr == "" || urlStr == "" {
return 0, nil, nil
}

d, err := model.ParseDuration(delayStr)
if err != nil {
return 0, nil, fmt.Errorf("failed to parse %s annotation value as duration: %v", config.RolloutDelayedDownscaleAnnotationKey, err)
}
if d < 0 {
return 0, nil, fmt.Errorf("negative value of %s annotation: %v", config.RolloutDelayedDownscaleAnnotationKey, delayStr)
}

delay := time.Duration(d)

u, err := url.Parse(urlStr)
if err != nil {
return 0, nil, fmt.Errorf("failed to parse %s annotation value as URL: %v", config.RolloutDelayedDownscalePrepareUrlAnnotationKey, err)
}

return delay, u, nil
}

type endpoint struct {
namespace string
podName string
url url.URL
index int
}

// createEndpoints creates prepare-downscale endpoints for pods with indexes in the [from, to) range.
func createEndpoints(namespace, serviceName string, from, to int, url *url.URL) []endpoint {
eps := make([]endpoint, 0, to-from)

// The DNS entry for a pod of a stateful set is
// ingester-zone-a-0.$(servicename).$(namespace).svc.cluster.local
// The service in this case is ingester-zone-a as well.
// https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id

for index := from; index < to; index++ {
ep := endpoint{
namespace: namespace,
podName: fmt.Sprintf("%v-%v", serviceName, index),
index: index,
}

ep.url = *url
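// Only the host is replaced with the pod's FQDN; the scheme, path and query from the annotation URL are kept.
// E.g. pod "ingester-zone-a-1" in namespace "mimir" gets host "ingester-zone-a-1.ingester-zone-a.mimir.svc.cluster.local".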
ep.url.Host = fmt.Sprintf("%s.%v.%v.svc.cluster.local", ep.podName, serviceName, ep.namespace)

eps = append(eps, ep)
}

return eps
}

func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (time.Time, error) {
if len(endpoints) == 0 {
return time.Now(), nil
}

var (
maxTimeMu sync.Mutex
maxTime time.Time
)

type expectedResponse struct {
Timestamp int64 `json:"timestamp"`
}

g, ctx := errgroup.WithContext(ctx)
for ix := range endpoints {
ep := endpoints[ix]
g.Go(func() error {
target := ep.url.String()

epLogger := log.With(logger, "pod", ep.podName, "url", target)

// POST -- prepare for delayed downscale, if not yet prepared, and return timestamp when prepare was called.
req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
return err
}

defer resp.Body.Close()

body, readError := io.ReadAll(resp.Body)
if readError != nil {
level.Error(epLogger).Log("msg", "error reading response from HTTP POST request to endpoint", "err", readError)
return readError
}

if resp.StatusCode/100 != 2 {
err := errors.New("HTTP POST request returned non-2xx status code")
level.Error(epLogger).Log("msg", "unexpected status code returned when calling POST on endpoint", "status", resp.StatusCode, "response_body", string(body))
return err
}

r := expectedResponse{}
if err := json.Unmarshal(body, &r); err != nil {
level.Error(epLogger).Log("msg", "error decoding response from HTTP POST request to endpoint", "err", err)
return err
}

if r.Timestamp == 0 {
level.Error(epLogger).Log("msg", "invalid response from HTTP POST request to endpoint: no timestamp")
return fmt.Errorf("no timestamp in response")
}

t := time.Unix(r.Timestamp, 0)

maxTimeMu.Lock()
if t.After(maxTime) {
maxTime = t
}
maxTimeMu.Unlock()

level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeeded", "timestamp", t.UTC().Format(time.RFC3339))
return nil
})
}
err := g.Wait()
return maxTime, err
}

func callCancelDelayedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) {
if len(endpoints) == 0 {
return
}

g, _ := errgroup.WithContext(ctx)
for ix := range endpoints {
ep := endpoints[ix]
g.Go(func() error {
target := ep.url.String()

epLogger := log.With(logger, "pod", ep.podName, "url", target)

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, target, nil)
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP DELETE request to endpoint", "err", err)
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP DELETE request to endpoint", "err", err)
return err
}

defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := errors.New("HTTP DELETE request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
level.Error(epLogger).Log("msg", "unexpected status code returned when calling DELETE on endpoint", "status", resp.StatusCode, "response_body", string(body))
return errors.Join(err, readError)
}
level.Debug(epLogger).Log("msg", "HTTP DELETE request to endpoint succeeded")
return nil
})
}
// We ignore errors, since all errors are already logged, and callers don't need it.
_ = g.Wait()
}
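The prepare-downscale endpoint itself is not part of this PR; it lives in the service being scaled. For context, here is a minimal, hypothetical sketch of a handler that would satisfy the contract exercised by delay.go: POST records the first preparation time (idempotently) and returns it as {"timestamp": <unix seconds>}, and DELETE cancels the preparation. The route, port and in-memory state are assumptions, and per the annotation docs a real endpoint must also support GET (not shown here):

```go
package main

import (
	"encoding/json"
	"net/http"
	"sync"
	"time"
)

// delayedDownscaleHandler is a hypothetical implementation of the endpoint
// contract used by delay.go: POST marks this pod as prepared for a delayed
// downscale (keeping the timestamp of the first call) and returns it as JSON;
// DELETE cancels the preparation.
type delayedDownscaleHandler struct {
	mu       sync.Mutex
	prepared time.Time // zero value means "not prepared"
}

func (h *delayedDownscaleHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.mu.Lock()
	defer h.mu.Unlock()

	switch r.Method {
	case http.MethodPost:
		if h.prepared.IsZero() {
			h.prepared = time.Now()
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]int64{"timestamp": h.prepared.Unix()})

	case http.MethodDelete:
		h.prepared = time.Time{}
		w.WriteHeader(http.StatusNoContent)

	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

func main() {
	// The path is an assumption; it must match the path used in the
	// grafana.com/rollout-prepare-delayed-downscale-url annotation.
	http.Handle("/prepare-delayed-downscale", &delayedDownscaleHandler{})
	_ = http.ListenAndServe(":8080", nil)
}
```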
18 changes: 13 additions & 5 deletions pkg/instrumentation/kubernetes_api_client.go
@@ -31,13 +31,17 @@ func InstrumentKubernetesAPIClient(cfg *rest.Config, reg prometheus.Registerer)
)

cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
-return &kubernetesAPIClientInstrumentation{
-next: &nethttp.Transport{RoundTripper: rt},
-hist: hist,
-}
+return newInstrumentation(rt, hist)
})
}

func newInstrumentation(rt http.RoundTripper, hist *prometheus.HistogramVec) *kubernetesAPIClientInstrumentation {
return &kubernetesAPIClientInstrumentation{
next: &nethttp.Transport{RoundTripper: rt},
hist: hist,
}
}

func (k *kubernetesAPIClientInstrumentation) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()

@@ -46,7 +50,11 @@ func (k *kubernetesAPIClientInstrumentation) RoundTrip(req *http.Request) (*http

resp, err := k.next.RoundTrip(req)
duration := time.Since(start)
-instrument.ObserveWithExemplar(req.Context(), k.hist.WithLabelValues(urlToResourceDescription(req.URL.EscapedPath()), req.Method, strconv.Itoa(resp.StatusCode)), duration.Seconds())
+statusCode := 0
+if resp != nil {
+statusCode = resp.StatusCode
+}
+instrument.ObserveWithExemplar(req.Context(), k.hist.WithLabelValues(urlToResourceDescription(req.URL.EscapedPath()), req.Method, strconv.Itoa(statusCode)), duration.Seconds())

return resp, err
}