
Prepare delayed downscale #131

Merged (10 commits) on Feb 7, 2024
2 changes: 1 addition & 1 deletion cmd/rollout-operator/main.go
@@ -171,7 +171,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
-c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, cfg.reconcileInterval, reg, logger)
+c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
6 changes: 6 additions & 0 deletions pkg/config/config.go
@@ -35,4 +35,10 @@ const (
RolloutMirrorReplicasFromResourceNameAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-name"
RolloutMirrorReplicasFromResourceKindAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-kind"
RolloutMirrorReplicasFromResourceAPIVersionAnnotationKey = rolloutMirrorReplicasFromResourceAnnotationKeyPrefix + "-api-version" // optional

// RolloutDelayedDownscaleAnnotationKey configures the delay for downscaling. The prepare-URL annotation must be configured as well, and the endpoint must support GET, POST and DELETE methods.
RolloutDelayedDownscaleAnnotationKey = "grafana.com/rollout-delayed-downscale"

// RolloutDelayedDownscalePrepareUrlAnnotationKey is the full URL of the prepare-downscale endpoint. The hostname is replaced with each pod's fully qualified domain name.
RolloutDelayedDownscalePrepareUrlAnnotationKey = "grafana.com/rollout-prepare-delayed-downscale-url"
)
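
The two annotations are meant to be used together. A minimal sketch of how they might be set and parsed; the values are hypothetical, and parseDelayedDownscaleAnnotations is introduced in pkg/controller/delay.go below:

// Sketch: hypothetical annotation values on a StatefulSet.
annotations := map[string]string{
config.RolloutDelayedDownscaleAnnotationKey:           "4h",
config.RolloutDelayedDownscalePrepareUrlAnnotationKey: "http://pod/ingester/prepare-shutdown",
}
// delay == 4 hours; the URL's host is later replaced with each pod's FQDN.
delay, prepareURL, err := parseDelayedDownscaleAnnotations(annotations)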
11 changes: 9 additions & 2 deletions pkg/controller/controller.go
@@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"net/http"
"strings"
"time"

@@ -39,6 +40,10 @@ const (
informerSyncInterval = 5 * time.Minute
)

// httpClient is the subset of *http.Client used by the controller; any client with a matching Do method (for example, a plain *http.Client with a timeout) satisfies it.
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}

type RolloutController struct {
kubeClient kubernetes.Interface
namespace string
@@ -52,6 +57,7 @@ type RolloutController struct {
restMapper meta.RESTMapper
scaleClient scale.ScalesGetter
dynamicClient dynamic.Interface
httpClient httpClient
logger log.Logger

// This bool is true if we should trigger a reconcile.
@@ -71,7 +77,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -99,6 +105,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
restMapper: restMapper,
scaleClient: scaleClient,
dynamicClient: dynamic,
httpClient: client,
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
@@ -323,7 +330,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicas(ctx context.Context,
return updated, err
}

-return c.adjustStatefulSetsGroupReplicasToMirrorResource(ctx, groupName, sets)
+return c.adjustStatefulSetsGroupReplicasToMirrorResource(ctx, groupName, sets, c.httpClient)
}

// adjustStatefulSetsGroupReplicasToFollowLeader examines each StatefulSet and adjusts the number of replicas if desired,
439 changes: 384 additions & 55 deletions pkg/controller/controller_test.go

Large diffs are not rendered by default.
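
The test diff isn't rendered here, but the new httpClient seam is what makes the prepare/cancel calls testable without a real server. A minimal sketch of a stub (hypothetical, not the PR's actual test code; assumes imports io, net/http, strings):

// fakeHttpClient records requests and returns a canned prepare-downscale response.
type fakeHttpClient struct {
requests []string
}

func (f *fakeHttpClient) Do(req *http.Request) (*http.Response, error) {
f.requests = append(f.requests, req.Method+" "+req.URL.String())
return &http.Response{
StatusCode: 200,
Body:       io.NopCloser(strings.NewReader(`{"timestamp": 1707307200}`)),
}, nil
}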

14 changes: 13 additions & 1 deletion pkg/controller/custom_resource_replicas.go
@@ -18,7 +18,7 @@ import (
"github.com/grafana/rollout-operator/pkg/config"
)

-func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet) (bool, error) {
+func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
// works with up-to-date models.
for _, sts := range sets {
@@ -37,10 +37,22 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
desiredReplicas := scaleObj.Spec.Replicas
if currentReplicas == desiredReplicas {
updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, desiredReplicas)
cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, desiredReplicas)
// No change in the number of replicas: don't log because this will be the result most of the time.
continue
}

// We're going to change the number of replicas on the statefulset.
// If delayed downscale is configured on the statefulset, we first handle the delay part, and only if that
// succeeds do we continue with downscaling or upscaling.
if err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, desiredReplicas); err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check", "group", groupName, "name", sts.GetName(), "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas, "err", err)

updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, currentReplicas)
// If the delay has not been reached, move on to the next statefulset.
continue
}

direction := ""
if desiredReplicas > currentReplicas {
direction = "up"
253 changes: 253 additions & 0 deletions pkg/controller/delay.go
@@ -0,0 +1,253 @@
package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/apps/v1"

"github.com/grafana/rollout-operator/pkg/config"
)

func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, replicas int32) {
delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if err != nil {
level.Warn(logger).Log("msg", "failed to cancel possible downscale due to error", "name", sts.GetName(), "err", err)
return
}

if delay == 0 || prepareURL == nil {
return
}

endpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(replicas), prepareURL)

callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) error {
if currentReplicas == desiredReplicas {
// Should not happen: the caller only invokes this when replica counts differ.
return nil
}

delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if delay == 0 || prepareURL == nil || err != nil {
return err
}

if desiredReplicas >= currentReplicas {
callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL))
// Proceed even if cancelling the delayed downscale fails. We call cancellation repeatedly, so it will be retried during the next reconcile.
return nil
}

{
// Pods in the [0, desired) range should have any pending delayed downscale cancelled.
cancelEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(desiredReplicas), prepareURL)
callCancelDelayedDownscale(ctx, logger, httpClient, cancelEndpoints)
}

// Pods in the [desired, current) range are going to be stopped.
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL)
maxPrepareTime, err := callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx, logger, httpClient, downscaleEndpoints)
if err != nil {
return fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
}

elapsedSinceMaxTime := time.Since(maxPrepareTime)
if elapsedSinceMaxTime < delay {
return fmt.Errorf("configured downscale delay %v has not been reached for all pods. elapsed time: %v", delay, elapsedSinceMaxTime)
}

// We can proceed with downscale!
level.Info(logger).Log("msg", "downscale delay has been reached on all downscaled pods", "name", sts.GetName(), "delay", delay, "elapsed", elapsedSinceMaxTime)
return nil
}
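
// Worked example (sketch): with currentReplicas=5 and desiredReplicas=3, cancellation (DELETE)
// is sent to pods 0-2, while prepare (POST) is sent to pods 3 and 4. The downscale from 5 to 3
// proceeds only once the newest prepare timestamp among pods 3 and 4 is older than the configured delay.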

func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]

if delayStr == "" || urlStr == "" {
return 0, nil, nil
}

d, err := model.ParseDuration(delayStr)
if err != nil {
return 0, nil, fmt.Errorf("failed to parse %s annotation value as duration: %v", config.RolloutDelayedDownscaleAnnotationKey, err)
}
if d < 0 {
return 0, nil, fmt.Errorf("negative value of %s annotation: %v", config.RolloutDelayedDownscaleAnnotationKey, delayStr)
}

delay := time.Duration(d)

u, err := url.Parse(urlStr)
if err != nil {
return 0, nil, fmt.Errorf("failed to parse %s annotation value as URL: %v", config.RolloutDelayedDownscalePrepareUrlAnnotationKey, err)
}

return delay, u, nil
}

type endpoint struct {
namespace string
podName string
url url.URL
index int
}

// Create prepare-downscale endpoints for pods with index in the [from, to) range. The URL is reused in full, except for the host, which is replaced with the pod's FQDN.
func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int, url *url.URL) []endpoint {
eps := make([]endpoint, 0, to-from)

// The DNS entry for a pod of a stateful set is
// ingester-zone-a-0.$(servicename).$(namespace).svc.cluster.local.
// The service in this case is ingester-zone-a as well.
// https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id

for index := from; index < to; index++ {
ep := endpoint{
namespace: namespace,
podName: fmt.Sprintf("%v-%v", serviceName, index),
index: index,
}

ep.url = *url
ep.url.Host = fmt.Sprintf("%s.%v.%v.svc.cluster.local.", ep.podName, serviceName, ep.namespace)

eps = append(eps, ep)
}

return eps
}
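
// Example (sketch, hypothetical values): namespace "mimir", serviceName "ingester-zone-a", from=3, to=5
// and URL http://host/prepare-shutdown yield the targets
// http://ingester-zone-a-3.ingester-zone-a.mimir.svc.cluster.local./prepare-shutdown and
// http://ingester-zone-a-4.ingester-zone-a.mimir.svc.cluster.local./prepare-shutdown.
// Note that replacing the host also drops any port from the original URL.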

func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (time.Time, error) {
if len(endpoints) == 0 {
return time.Time{}, fmt.Errorf("no endpoints")
}

var (
maxTimeMu sync.Mutex
maxTime time.Time
)

// expectedResponse models the prepare endpoint's reply, e.g. {"timestamp": 1707307200}:
// the unix timestamp (in seconds) of when prepare was first called on the pod.
type expectedResponse struct {
Timestamp int64 `json:"timestamp"`
}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(32)
for ix := range endpoints {
ep := endpoints[ix]
g.Go(func() error {
target := ep.url.String()

epLogger := log.With(logger, "pod", ep.podName, "url", target)

// POST: prepare the pod for delayed downscale (if not yet prepared) and return the timestamp when prepare was first called.
req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("error sending HTTP POST request to endpoint", "err", err)
return err
}

defer resp.Body.Close()

body, readError := io.ReadAll(resp.Body)
if readError != nil {
level.Error(epLogger).Log("msg", "error reading response from HTTP POST request to endpoint", "err", readError)
return readError
}

if resp.StatusCode/100 != 2 {
level.Error(epLogger).Log("msg", "unexpected status code returned when calling DELETE on endpoint", "status", resp.StatusCode, "response_body", string(body))
return fmt.Errorf("HTTP DELETE request returned non-2xx status code: %v", resp.StatusCode)
}

r := expectedResponse{}
if err := json.Unmarshal(body, &r); err != nil {
level.Error(epLogger).Log("msg", "error decoding response from HTTP POST request to endpoint", "err", err)
return err
}

if r.Timestamp == 0 {
level.Error(epLogger).Log("msg", "invalid response from HTTP POST request to endpoint: no timestamp")
return fmt.Errorf("no timestamp in response")
}

t := time.Unix(r.Timestamp, 0)

maxTimeMu.Lock()
if t.After(maxTime) {
maxTime = t
}
maxTimeMu.Unlock()

level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeded", "timestamp", t.UTC().Format(time.RFC3339))
return nil
})
}
err := g.Wait()
return maxTime, err
}
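
// Note (sketch): g.Wait() returns the first non-nil error from the POST calls; when it does,
// the partially computed maxTime is still returned, but the caller treats the whole prepare step as failed.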

func callCancelDelayedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) {
if len(endpoints) == 0 {
return
}

g, _ := errgroup.WithContext(ctx)
g.SetLimit(32)
for ix := range endpoints {
ep := endpoints[ix]
g.Go(func() error {
target := ep.url.String()

epLogger := log.With(logger, "pod", ep.podName, "url", target)

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, target, nil)
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP DELETE request to endpoint", "err", err)
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP DELETE request to endpoint", "err", err)
return err
}

defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := errors.New("HTTP DELETE request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
level.Error(epLogger).Log("msg", "unexpected status code returned when calling DELETE on endpoint", "status", resp.StatusCode, "response_body", string(body))
return errors.Join(err, readError)
}
level.Debug(epLogger).Log("msg", "HTTP DELETE request to endpoint succeeded")
return nil
})
}
// We ignore returned error here, since all errors are already logged, and callers don't need the error.
_ = g.Wait()
}
18 changes: 13 additions & 5 deletions pkg/instrumentation/kubernetes_api_client.go
@@ -31,13 +31,17 @@ func InstrumentKubernetesAPIClient(cfg *rest.Config, reg prometheus.Registerer)
)

cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &kubernetesAPIClientInstrumentation{
next: &nethttp.Transport{RoundTripper: rt},
hist: hist,
}
return newInstrumentation(rt, hist)
})
}

func newInstrumentation(rt http.RoundTripper, hist *prometheus.HistogramVec) *kubernetesAPIClientInstrumentation {
return &kubernetesAPIClientInstrumentation{
next: &nethttp.Transport{RoundTripper: rt},
hist: hist,
}
}

func (k *kubernetesAPIClientInstrumentation) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()

@@ -46,7 +50,11 @@ func (k *kubernetesAPIClientInstrumentation) RoundTrip(req *http.Request) (*http

resp, err := k.next.RoundTrip(req)
duration := time.Since(start)
-instrument.ObserveWithExemplar(req.Context(), k.hist.WithLabelValues(urlToResourceDescription(req.URL.EscapedPath()), req.Method, strconv.Itoa(resp.StatusCode)), duration.Seconds())
+statusCode := "error"
+if resp != nil {
+statusCode = strconv.Itoa(resp.StatusCode)
+}
+instrument.ObserveWithExemplar(req.Context(), k.hist.WithLabelValues(urlToResourceDescription(req.URL.EscapedPath()), req.Method, statusCode), duration.Seconds())

return resp, err
}