Skip to content

Commit

Permalink
Merge pull request kubernetes#24789 from wojtek-t/use_proper_codec_in…
Browse files Browse the repository at this point in the history
…_client

Automatic merge from submit-queue

Use proper codec in client
  • Loading branch information
k8s-merge-robot committed May 4, 2016
2 parents bacb12a + 11849e2 commit 93e3df8
Show file tree
Hide file tree
Showing 41 changed files with 343 additions and 149 deletions.
9 changes: 3 additions & 6 deletions cmd/libs/go2idl/client-gen/generators/generator_for_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func setConfigDefaults(config *$.Config|raw$) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = $.codecs|raw$.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = $.codecs|raw$
if config.QPS == 0 {
config.QPS = 5
}
Expand Down Expand Up @@ -232,11 +233,7 @@ func setConfigDefaults(config *$.Config|raw$) error {
config.GroupVersion = &copyGroupVersion
//}
codec, ok := $.codecs|raw$.SerializerForFileExtension("json")
if !ok {
return $.Errorf|raw$("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = $.codecs|raw$
if config.QPS == 0 {
config.QPS = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
5 changes: 5 additions & 0 deletions contrib/mesos/pkg/scheduler/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux := http.NewServeMux()

podListHandler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
pods := mockPodListWatch.Pods()
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
Expand All @@ -106,6 +107,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
ts.stats[name] = ts.stats[name] + 1

p := mockPodListWatch.Pod(name)
w.Header().Set("Content-Type", "application/json")
if p != nil {
w.WriteHeader(http.StatusOK)
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
Expand All @@ -117,6 +119,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux.HandleFunc(
testapi.Default.ResourcePath("events", namespace, ""),
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
},
)
Expand All @@ -125,6 +128,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
testapi.Default.ResourcePath("nodes", "", ""),
func(w http.ResponseWriter, r *http.Request) {
var node api.Node
w.Header().Set("Content-Type", "application/json")
if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
Expand All @@ -144,6 +148,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis

mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusNotFound)
})

Expand Down
15 changes: 8 additions & 7 deletions pkg/api/testapi/testapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ import (
)

var (
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
NegotiatedSerializer = api.Codecs
)

type TestGroup struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package v1

import (
fmt "fmt"
api "k8s.io/kubernetes/pkg/api"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
Expand Down Expand Up @@ -150,11 +149,7 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

codec, ok := api.Codecs.SerializerForFileExtension("json")
if !ok {
return fmt.Errorf("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package v1beta1

import (
fmt "fmt"
api "k8s.io/kubernetes/pkg/api"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
Expand Down Expand Up @@ -115,11 +114,7 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}

codec, ok := api.Codecs.SerializerForFileExtension("json")
if !ok {
return fmt.Errorf("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = api.Codecs

if config.QPS == 0 {
config.QPS = 5
Expand Down
54 changes: 49 additions & 5 deletions pkg/client/restclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package restclient

import (
"fmt"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -53,17 +54,27 @@ type RESTClient struct {
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig

// serializers contain all serializers for undelying content type.
serializers Serializers

// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter

// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}

type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
}

// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient {
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
Expand All @@ -77,6 +88,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}

var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
Expand All @@ -88,9 +103,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
Throttle: throttle,
Client: client,
}
}, nil
}

// GetRateLimiter returns rate limier for a given client, or nil if it's called on a nil client
Expand Down Expand Up @@ -119,10 +135,38 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}

// createSerializers creates all necessary serializers for given contentType.
func createSerializers(config ContentConfig) (*Serializers, error) {
negotiated := config.NegotiatedSerializer
contentType := config.ContentType
serializer, ok := negotiated.SerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
}
if framer == nil {
return nil, fmt.Errorf("no framer for %s", contentType)
}
internalGV := unversioned.GroupVersion{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
}
return &Serializers{
Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(serializer, internalGV),
StreamingSerializer: streamingSerializer,
Framer: framer,
}, nil
}

// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c := NewRESTClient(url, codec)
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
// Path("pods").
// SelectorParam("labels", "area=staging").
Expand All @@ -135,9 +179,9 @@ func (c *RESTClient) Verb(verb string) *Request {
backoff := readExpBackoffConfig()

if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}

// Post begins a POST request. Short for c.Verb("POST").
Expand Down
12 changes: 6 additions & 6 deletions pkg/client/restclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestDoRequestSuccess(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestDoRequestFailed(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
})
if err != nil {
Expand Down Expand Up @@ -129,8 +129,8 @@ func TestDoRequestCreated(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",
Expand Down
23 changes: 14 additions & 9 deletions pkg/client/restclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,17 @@ type ContentConfig struct {
// a RESTClient directly. When initializing a Client, will be set with the default
// code version.
GroupVersion *unversioned.GroupVersion
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
// supported media types.
NegotiatedSerializer runtime.NegotiatedSerializer

// Codec specifies the encoding and decoding behavior for runtime.Objects passed
// to a RESTClient or Client. Required when initializing a RESTClient, optional
// when initializing a Client.
//
// DEPRECATED: Please use NegotiatedSerializer instead.
// Codec is currently used only in some tests and will be removed soon.
// All production setups should use NegotiatedSerializer.
Codec runtime.Codec
}

Expand All @@ -144,8 +152,8 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.Codec == nil {
return nil, fmt.Errorf("Codec is required when initializing a RESTClient")
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}

baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
Expand All @@ -163,16 +171,14 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
httpClient = &http.Client{Transport: transport}
}

client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)

return client, nil
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
}

// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
// the config.Version to be empty.
func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
if config.Codec == nil {
return nil, fmt.Errorf("Codec is required when initializing a RESTClient")
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NeogitatedSerializer is required when initializing a RESTClient")
}

baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
Expand All @@ -196,8 +202,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
versionConfig.GroupVersion = &v
}

client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
}

// SetKubernetesDefaults sets default values on the provided client config for accessing the
Expand Down
Loading

0 comments on commit 93e3df8

Please sign in to comment.