From cc9ec3854502d8d09277dd6fd8b270b43a88407b Mon Sep 17 00:00:00 2001 From: Luis Davim Date: Tue, 8 Oct 2024 17:41:53 +0100 Subject: [PATCH] fix: handle "leader changed" errors Fixes fluxcd/flux2/#4804 by copying the solution used in helm/helm#11426 --- internal/kube/client.go | 5 +++ internal/kube/roundtripper.go | 82 +++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 internal/kube/roundtripper.go diff --git a/internal/kube/client.go b/internal/kube/client.go index 6b8335472..788f0b27e 100644 --- a/internal/kube/client.go +++ b/internal/kube/client.go @@ -18,6 +18,7 @@ package kube import ( "fmt" + "net/http" "sync" "k8s.io/apimachinery/pkg/api/meta" @@ -130,6 +131,10 @@ func (c *MemoryRESTClientGetter) ToRESTConfig() (*rest.Config, error) { if c.cfg == nil { return nil, fmt.Errorf("MemoryRESTClientGetter has no REST config") } + // add retries to fix temporary "etcdserver: leader changed" errors from kube-apiserver + c.cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &retryingRoundTripper{wrapped: rt} + }) return c.cfg, nil } diff --git a/internal/kube/roundtripper.go b/internal/kube/roundtripper.go new file mode 100644 index 000000000..613d9df27 --- /dev/null +++ b/internal/kube/roundtripper.go @@ -0,0 +1,82 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// TODO: this file was taken from https://github.com/helm/helm/blob/main/pkg/cli/roundtripper.go +// We should be able to get rid of it once https://github.com/helm/helm/issues/13052 is addressed +package kube + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "strings" +) + +type retryingRoundTripper struct { + wrapped http.RoundTripper +} + +func (rt *retryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return rt.roundTrip(req, 1, nil) +} + +func (rt *retryingRoundTripper) roundTrip(req *http.Request, retry int, prevResp *http.Response) (*http.Response, error) { + if retry < 0 { + return prevResp, nil + } + resp, rtErr := rt.wrapped.RoundTrip(req) + if rtErr != nil { + return resp, rtErr + } + if resp.StatusCode < 500 { + return resp, rtErr + } + if resp.Header.Get("content-type") != "application/json" { + return resp, rtErr + } + b, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return resp, rtErr + } + + var ke kubernetesError + r := bytes.NewReader(b) + err = json.NewDecoder(r).Decode(&ke) + r.Seek(0, io.SeekStart) + resp.Body = io.NopCloser(r) + if err != nil { + return resp, rtErr + } + if ke.Code < 500 { + return resp, rtErr + } + // Matches messages like "etcdserver: leader changed" + if strings.HasSuffix(ke.Message, "etcdserver: leader changed") { + return rt.roundTrip(req, retry-1, resp) + } + // Matches messages like "rpc error: code = Unknown desc = raft proposal dropped" + if strings.HasSuffix(ke.Message, "raft proposal dropped") { + return rt.roundTrip(req, retry-1, resp) + } + return resp, rtErr +} + +type kubernetesError struct { + Message string `json:"message"` + Code int `json:"code"` +}