Skip to content

Commit

Permalink
Merge pull request kubernetes#25871 from smarterclayton/retry_on_error
Browse files Browse the repository at this point in the history
Fix the Retry-After code path to work for clients, and send correct bodies
  • Loading branch information
wojtek-t committed May 19, 2016
2 parents 044d55e + eeb04e6 commit 9784dff
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 33 deletions.
30 changes: 15 additions & 15 deletions pkg/api/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func FromObject(obj runtime.Object) error {
}

// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFound(qualifiedResource unversioned.GroupResource, name string) error {
func NewNotFound(qualifiedResource unversioned.GroupResource, name string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusNotFound,
Expand All @@ -108,7 +108,7 @@ func NewNotFound(qualifiedResource unversioned.GroupResource, name string) error
}

// NewAlreadyExists returns an error indicating the item requested exists by that identifier.
func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string) error {
func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusConflict,
Expand All @@ -124,7 +124,7 @@ func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string)

// NewUnauthorized returns an error indicating the client is not authorized to perform the requested
// action.
func NewUnauthorized(reason string) error {
func NewUnauthorized(reason string) *StatusError {
message := reason
if len(message) == 0 {
message = "not authorized"
Expand All @@ -138,7 +138,7 @@ func NewUnauthorized(reason string) error {
}

// NewForbidden returns an error indicating the requested action was forbidden
func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err error) error {
func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusForbidden,
Expand All @@ -153,7 +153,7 @@ func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err
}

// NewConflict returns an error indicating the item can't be updated as provided.
func NewConflict(qualifiedResource unversioned.GroupResource, name string, err error) error {
func NewConflict(qualifiedResource unversioned.GroupResource, name string, err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusConflict,
Expand All @@ -168,7 +168,7 @@ func NewConflict(qualifiedResource unversioned.GroupResource, name string, err e
}

// NewGone returns an error indicating the item no longer available at the server and no forwarding address is known.
func NewGone(message string) error {
func NewGone(message string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusGone,
Expand All @@ -178,7 +178,7 @@ func NewGone(message string) error {
}

// NewInvalid returns an error indicating the item is invalid and cannot be processed.
func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.ErrorList) error {
func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.ErrorList) *StatusError {
causes := make([]unversioned.StatusCause, 0, len(errs))
for i := range errs {
err := errs[i]
Expand All @@ -203,7 +203,7 @@ func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.Err
}

// NewBadRequest creates an error that indicates that the request is invalid and can not be processed.
func NewBadRequest(reason string) error {
func NewBadRequest(reason string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusBadRequest,
Expand All @@ -213,7 +213,7 @@ func NewBadRequest(reason string) error {
}

// NewServiceUnavailable creates an error that indicates that the requested service is unavailable.
func NewServiceUnavailable(reason string) error {
func NewServiceUnavailable(reason string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusServiceUnavailable,
Expand All @@ -223,7 +223,7 @@ func NewServiceUnavailable(reason string) error {
}

// NewMethodNotSupported returns an error indicating the requested action is not supported on this kind.
func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action string) error {
func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusMethodNotAllowed,
Expand All @@ -238,7 +238,7 @@ func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action s

// NewServerTimeout returns an error indicating the requested action could not be completed due to a
// transient error, and the client should try again.
func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation string, retryAfterSeconds int) error {
func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation string, retryAfterSeconds int) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusInternalServerError,
Expand All @@ -255,12 +255,12 @@ func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation str

// NewServerTimeoutForKind should not exist. Server timeouts happen when accessing resources, the Kind is just what we
// happened to be looking at when the request failed. This delegates to keep code sane, but we should work towards removing this.
func NewServerTimeoutForKind(qualifiedKind unversioned.GroupKind, operation string, retryAfterSeconds int) error {
func NewServerTimeoutForKind(qualifiedKind unversioned.GroupKind, operation string, retryAfterSeconds int) *StatusError {
return NewServerTimeout(unversioned.GroupResource{Group: qualifiedKind.Group, Resource: qualifiedKind.Kind}, operation, retryAfterSeconds)
}

// NewInternalError returns an error indicating the item is invalid and cannot be processed.
func NewInternalError(err error) error {
func NewInternalError(err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusInternalServerError,
Expand All @@ -274,7 +274,7 @@ func NewInternalError(err error) error {

// NewTimeoutError returns an error indicating that a timeout occurred before the request
// could be completed. Clients may retry, but the operation may still complete.
func NewTimeoutError(message string, retryAfterSeconds int) error {
func NewTimeoutError(message string, retryAfterSeconds int) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: StatusServerTimeout,
Expand All @@ -287,7 +287,7 @@ func NewTimeoutError(message string, retryAfterSeconds int) error {
}

// NewGenericServerResponse returns a new error for server responses that are not in a recognizable form.
func NewGenericServerResponse(code int, verb string, qualifiedResource unversioned.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) error {
func NewGenericServerResponse(code int, verb string, qualifiedResource unversioned.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) *StatusError {
reason := unversioned.StatusReasonUnknown
message := fmt.Sprintf("the server responded with the status code %d but did not return more information", code)
switch code {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestNewInvalid(t *testing.T) {
vErr, expected := testCase.Err, testCase.Details
expected.Causes[0].Message = vErr.ErrorBody()
err := NewInvalid(api.Kind("Kind"), "name", field.ErrorList{vErr})
status := err.(*StatusError).ErrStatus
status := err.ErrStatus
if status.Code != 422 || status.Reason != unversioned.StatusReasonInvalid {
t.Errorf("%d: unexpected status: %#v", i, status)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"path"
rt "runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -449,6 +450,11 @@ func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion
func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := errToAPIStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
if status.Details != nil && status.Details.RetryAfterSeconds > 0 {
delay := strconv.Itoa(int(status.Details.RetryAfterSeconds))
w.Header().Set("Retry-After", delay)
}
writeNegotiated(s, gv, w, req, code, status)
return code
}
Expand Down
25 changes: 24 additions & 1 deletion pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,7 @@ func TestGetNamespaceSelfLink(t *testing.T) {
t.Errorf("Never set self link")
}
}

func TestGetMissing(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
Expand All @@ -1572,7 +1573,7 @@ func TestGetMissing(t *testing.T) {
server := httptest.NewServer(handler)
defer server.Close()

resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/simple/id")
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -1582,6 +1583,28 @@ func TestGetMissing(t *testing.T) {
}
}

func TestGetRetryAfter(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
errors: map[string]error{"get": apierrs.NewServerTimeout(api.Resource("simples"), "id", 2)},
}
storage["simple"] = &simpleStorage
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()

resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("Unexpected response %#v", resp)
}
if resp.Header.Get("Retry-After") != "2" {
t.Errorf("Unexpected Retry-After header: %v", resp.Header)
}
}

func TestConnect(t *testing.T) {
responseText := "Hello World"
itemID := "theID"
Expand Down
27 changes: 23 additions & 4 deletions pkg/apiserver/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiserver

import (
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
Expand Down Expand Up @@ -376,7 +377,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
trace.Step("About to convert to expected version")
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
if err != nil {
err = transformDecodeError(typer, err, original, gvk)
err = transformDecodeError(typer, err, original, gvk, body)
scope.err(err, res.ResponseWriter, req.Request)
return
}
Expand Down Expand Up @@ -650,7 +651,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
trace.Step("About to convert to expected version")
obj, gvk, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, original)
if err != nil {
err = transformDecodeError(typer, err, original, gvk)
err = transformDecodeError(typer, err, original, gvk, body)
scope.err(err, res.ResponseWriter, req.Request)
return
}
Expand Down Expand Up @@ -938,15 +939,16 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,
}

// transformDecodeError adds additional information when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind) error {
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind, body []byte) error {
objGVK, err := typer.ObjectKind(into)
if err != nil {
return err
}
if gvk != nil && len(gvk.Kind) > 0 {
return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr))
}
return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v", objGVK.Kind, baseErr))
summary := summarizeData(body, 30)
return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v (%s)", objGVK.Kind, baseErr, summary))
}

// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request
Expand Down Expand Up @@ -1038,3 +1040,20 @@ func getPatchedJS(patchType api.PatchType, originalJS, patchJS []byte, obj runti
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
}
}

func summarizeData(data []byte, maxLength int) string {
switch {
case len(data) == 0:
return "<empty>"
case data[0] == '{':
if len(data) > maxLength {
return string(data[:maxLength]) + " ..."
}
return string(data)
default:
if len(data) > maxLength {
return hex.EncodeToString(data[:maxLength]) + " ..."
}
return hex.EncodeToString(data)
}
}
15 changes: 12 additions & 3 deletions pkg/client/restclient/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,10 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glog.V(8).Infof("Request Body: %s", string(data))
r.body = bytes.NewBuffer(data)
r.body = bytes.NewReader(data)
case []byte:
glog.V(8).Infof("Request Body: %s", string(t))
r.body = bytes.NewBuffer(t)
r.body = bytes.NewReader(t)
case io.Reader:
r.body = t
case runtime.Object:
Expand All @@ -556,7 +556,7 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glog.V(8).Infof("Request Body: %s", string(data))
r.body = bytes.NewBuffer(data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
Expand Down Expand Up @@ -823,6 +823,15 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {

retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}

glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
return false
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/restclient/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,13 @@ func TestCheckRetryHandles429And5xx(t *testing.T) {
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("unable to read request body: %v", err)
}
if !bytes.Equal(data, []byte(strings.Repeat("abcd", 1000))) {
t.Fatalf("retry did not send a complete body: %s", data)
}
t.Logf("attempt %d", count)
if count >= 4 {
w.WriteHeader(http.StatusOK)
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/unversioned/testclient/testclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func TestErrors(t *testing.T) {
o.Add(&api.List{
Items: []runtime.Object{
// This first call to List will return this error
&(errors.NewNotFound(api.Resource("ServiceList"), "").(*errors.StatusError).ErrStatus),
&(errors.NewNotFound(api.Resource("ServiceList"), "").ErrStatus),
// The second call to List will return this error
&(errors.NewForbidden(api.Resource("ServiceList"), "", nil).(*errors.StatusError).ErrStatus),
&(errors.NewForbidden(api.Resource("ServiceList"), "", nil).ErrStatus),
},
})
client := &Fake{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubectl/cmd/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestDeleteAllNotFound(t *testing.T) {

// Add an item to the list which will result in a 404 on delete
svc.Items = append(svc.Items, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").(*errors.StatusError).ErrStatus
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").ErrStatus

tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestDeleteAllIgnoreNotFound(t *testing.T) {

// Add an item to the list which will result in a 404 on delete
svc.Items = append(svc.Items, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").(*errors.StatusError).ErrStatus
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").ErrStatus

tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestWatch(t *testing.T) {
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error
expectedGoneError := errors.NewGone("").(*errors.StatusError).ErrStatus
expectedGoneError := errors.NewGone("").ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)

initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
Expand Down
9 changes: 7 additions & 2 deletions plugin/pkg/admission/serviceaccount/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
kubelet "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -199,6 +200,9 @@ func (s *serviceAccount) Admit(a admission.Attributes) (err error) {

if s.MountServiceAccountToken {
if err := s.mountServiceAccountToken(serviceAccount, pod); err != nil {
if _, ok := err.(errors.APIStatus); ok {
return err
}
return admission.NewForbidden(a, err)
}
}
Expand Down Expand Up @@ -357,8 +361,9 @@ func (s *serviceAccount) mountServiceAccountToken(serviceAccount *api.ServiceAcc
// We don't have an API token to mount, so return
if s.RequireAPIToken {
// If a token is required, this is considered an error
// TODO: convert to a ServerTimeout error (or other error that sends a Retry-After header)
return fmt.Errorf("no API token found for service account %s/%s, retry after the token is automatically created and added to the service account", serviceAccount.Namespace, serviceAccount.Name)
err := errors.NewServerTimeout(unversioned.GroupResource{Resource: "serviceaccounts"}, "create pod", 1)
err.ErrStatus.Message = fmt.Sprintf("No API token found for service account %q, retry after the token is automatically created and added to the service account", serviceAccount.Name)
return err
}
return nil
}
Expand Down
Loading

0 comments on commit 9784dff

Please sign in to comment.