diff --git a/cmd/libs/go2idl/client-gen/generators/generator_for_group.go b/cmd/libs/go2idl/client-gen/generators/generator_for_group.go index afc4064efe46e..21437a4e8b999 100644 --- a/cmd/libs/go2idl/client-gen/generators/generator_for_group.go +++ b/cmd/libs/go2idl/client-gen/generators/generator_for_group.go @@ -204,7 +204,8 @@ func setConfigDefaults(config *$.Config|raw$) error { config.GroupVersion = ©GroupVersion //} - config.Codec = $.codecs|raw$.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = $.codecs|raw$ + if config.QPS == 0 { config.QPS = 5 } @@ -232,11 +233,7 @@ func setConfigDefaults(config *$.Config|raw$) error { config.GroupVersion = ©GroupVersion //} - codec, ok := $.codecs|raw$.SerializerForFileExtension("json") - if !ok { - return $.Errorf|raw$("unable to find serializer for JSON") - } - config.Codec = codec + config.NegotiatedSerializer = $.codecs|raw$ if config.QPS == 0 { config.QPS = 5 diff --git a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go index 56d08722b3e1b..f870a64d889ea 100644 --- a/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go +++ b/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned/testgroup_client.go @@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/contrib/mesos/pkg/scheduler/integration/integration_test.go b/contrib/mesos/pkg/scheduler/integration/integration_test.go index c56bef6fa7b7a..cd99680a6f962 100644 --- a/contrib/mesos/pkg/scheduler/integration/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration/integration_test.go @@ -89,6 +89,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis mux := http.NewServeMux() podListHandler := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) pods := mockPodListWatch.Pods() w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods))) @@ -106,6 +107,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis ts.stats[name] = ts.stats[name] + 1 p := mockPodListWatch.Pod(name) + w.Header().Set("Content-Type", "application/json") if p != nil { w.WriteHeader(http.StatusOK) w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p))) @@ -117,6 +119,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis mux.HandleFunc( testapi.Default.ResourcePath("events", namespace, ""), func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) }, ) @@ -125,6 +128,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis testapi.Default.ResourcePath("nodes", "", ""), func(w http.ResponseWriter, r *http.Request) { var node api.Node + w.Header().Set("Content-Type", "application/json") if err := json.NewDecoder(r.Body).Decode(&node); err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -144,6 +148,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) + res.Header().Set("Content-Type", "application/json") res.WriteHeader(http.StatusNotFound) }) diff --git a/pkg/api/testapi/testapi.go b/pkg/api/testapi/testapi.go index bbbe0367e9865..8bec17e33e3b1 100644 --- a/pkg/api/testapi/testapi.go +++ b/pkg/api/testapi/testapi.go @@ -45,13 +45,14 @@ import ( ) var ( - Groups = make(map[string]TestGroup) - Default TestGroup - Autoscaling TestGroup - Batch TestGroup - Extensions TestGroup - Apps TestGroup - Federation TestGroup + Groups = make(map[string]TestGroup) + Default TestGroup + Autoscaling TestGroup + Batch TestGroup + Extensions TestGroup + Apps TestGroup + Federation TestGroup + NegotiatedSerializer = api.Codecs ) type TestGroup struct { diff --git a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go index e42c2d84a6e4b..58b67ceda073a 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go @@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go index 9250a2261b501..41aee4cf65434 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/core/unversioned/core_client.go @@ -155,7 +155,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go index d8b97a05ed98a..5c979161256c0 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned/extensions_client.go @@ -110,7 +110,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/clientset_generated/release_1_2/typed/core/v1/core_client.go b/pkg/client/clientset_generated/release_1_2/typed/core/v1/core_client.go index 886d556b5ddd6..6c2874e386d0e 100644 --- a/pkg/client/clientset_generated/release_1_2/typed/core/v1/core_client.go +++ b/pkg/client/clientset_generated/release_1_2/typed/core/v1/core_client.go @@ -149,7 +149,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/clientset_generated/release_1_2/typed/extensions/v1beta1/extensions_client.go b/pkg/client/clientset_generated/release_1_2/typed/extensions/v1beta1/extensions_client.go index af3348a3357e5..86cc6a9ce277f 100644 --- a/pkg/client/clientset_generated/release_1_2/typed/extensions/v1beta1/extensions_client.go +++ b/pkg/client/clientset_generated/release_1_2/typed/extensions/v1beta1/extensions_client.go @@ -114,7 +114,8 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs + if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/clientset_generated/release_1_3/typed/core/v1/core_client.go b/pkg/client/clientset_generated/release_1_3/typed/core/v1/core_client.go index c6b0cfbfe226a..6c2874e386d0e 100644 --- a/pkg/client/clientset_generated/release_1_3/typed/core/v1/core_client.go +++ b/pkg/client/clientset_generated/release_1_3/typed/core/v1/core_client.go @@ -17,7 +17,6 @@ limitations under the License. package v1 import ( - fmt "fmt" api "k8s.io/kubernetes/pkg/api" registered "k8s.io/kubernetes/pkg/apimachinery/registered" restclient "k8s.io/kubernetes/pkg/client/restclient" @@ -150,11 +149,7 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - codec, ok := api.Codecs.SerializerForFileExtension("json") - if !ok { - return fmt.Errorf("unable to find serializer for JSON") - } - config.Codec = codec + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 diff --git a/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1/extensions_client.go b/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1/extensions_client.go index ebbe0a2f8b507..86cc6a9ce277f 100644 --- a/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1/extensions_client.go +++ b/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1/extensions_client.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - fmt "fmt" api "k8s.io/kubernetes/pkg/api" registered "k8s.io/kubernetes/pkg/apimachinery/registered" restclient "k8s.io/kubernetes/pkg/client/restclient" @@ -115,11 +114,7 @@ func setConfigDefaults(config *restclient.Config) error { config.GroupVersion = ©GroupVersion //} - codec, ok := api.Codecs.SerializerForFileExtension("json") - if !ok { - return fmt.Errorf("unable to find serializer for JSON") - } - config.Codec = codec + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index b66884f91ab5b..d14347a0b4cc8 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -17,6 +17,7 @@ limitations under the License. package restclient import ( + "fmt" "net/http" "net/url" "os" @@ -53,6 +54,9 @@ type RESTClient struct { // contentConfig is the information used to communicate with the server. contentConfig ContentConfig + // serializers contain all serializers for undelying content type. + serializers Serializers + // TODO extract this into a wrapper interface via the RESTClient interface in kubectl. Throttle flowcontrol.RateLimiter @@ -60,10 +64,17 @@ type RESTClient struct { Client *http.Client } +type Serializers struct { + Encoder runtime.Encoder + Decoder runtime.Decoder + StreamingSerializer runtime.Serializer + Framer runtime.Framer +} + // NewRESTClient creates a new RESTClient. This client performs generic REST functions // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // decoding of responses from the server. -func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient { +func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) { base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" @@ -77,6 +88,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf if len(config.ContentType) == 0 { config.ContentType = "application/json" } + serializers, err := createSerializers(config) + if err != nil { + return nil, err + } var throttle flowcontrol.RateLimiter if maxQPS > 0 && rateLimiter == nil { @@ -88,9 +103,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf base: &base, versionedAPIPath: versionedAPIPath, contentConfig: config, + serializers: *serializers, Throttle: throttle, Client: client, - } + }, nil } // GetRateLimiter returns rate limier for a given client, or nil if it's called on a nil client @@ -119,10 +135,38 @@ func readExpBackoffConfig() BackoffManager { time.Duration(backoffDurationInt)*time.Second)} } +// createSerializers creates all necessary serializers for given contentType. +func createSerializers(config ContentConfig) (*Serializers, error) { + negotiated := config.NegotiatedSerializer + contentType := config.ContentType + serializer, ok := negotiated.SerializerForMediaType(contentType, nil) + if !ok { + return nil, fmt.Errorf("serializer for %s not registered", contentType) + } + streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil) + if !ok { + return nil, fmt.Errorf("streaming serializer for %s not registered", contentType) + } + if framer == nil { + return nil, fmt.Errorf("no framer for %s", contentType) + } + internalGV := unversioned.GroupVersion{ + Group: config.GroupVersion.Group, + Version: runtime.APIVersionInternal, + } + return &Serializers{ + Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion), + Decoder: negotiated.DecoderToVersion(serializer, internalGV), + StreamingSerializer: streamingSerializer, + Framer: framer, + }, nil +} + // Verb begins a request with a verb (GET, POST, PUT, DELETE). // // Example usage of RESTClient's request building interface: -// c := NewRESTClient(url, codec) +// c, err := NewRESTClient(...) +// if err != nil { ... } // resp, err := c.Verb("GET"). // Path("pods"). // SelectorParam("labels", "area=staging"). @@ -135,9 +179,9 @@ func (c *RESTClient) Verb(verb string) *Request { backoff := readExpBackoffConfig() if c.Client == nil { - return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle) + return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle) } - return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle) + return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle) } // Post begins a POST request. Short for c.Verb("POST"). diff --git a/pkg/client/restclient/client_test.go b/pkg/client/restclient/client_test.go index 6bd9596a7eaaf..1fd2c603c95bd 100644 --- a/pkg/client/restclient/client_test.go +++ b/pkg/client/restclient/client_test.go @@ -46,8 +46,8 @@ func TestDoRequestSuccess(t *testing.T) { c, err := RESTClientFor(&Config{ Host: testServer.URL, ContentConfig: ContentConfig{ - GroupVersion: testapi.Default.GroupVersion(), - Codec: testapi.Default.Codec(), + GroupVersion: testapi.Default.GroupVersion(), + NegotiatedSerializer: testapi.NegotiatedSerializer, }, Username: "user", Password: "pass", @@ -91,8 +91,8 @@ func TestDoRequestFailed(t *testing.T) { c, err := RESTClientFor(&Config{ Host: testServer.URL, ContentConfig: ContentConfig{ - GroupVersion: testapi.Default.GroupVersion(), - Codec: testapi.Default.Codec(), + GroupVersion: testapi.Default.GroupVersion(), + NegotiatedSerializer: testapi.NegotiatedSerializer, }, }) if err != nil { @@ -129,8 +129,8 @@ func TestDoRequestCreated(t *testing.T) { c, err := RESTClientFor(&Config{ Host: testServer.URL, ContentConfig: ContentConfig{ - GroupVersion: testapi.Default.GroupVersion(), - Codec: testapi.Default.Codec(), + GroupVersion: testapi.Default.GroupVersion(), + NegotiatedSerializer: testapi.NegotiatedSerializer, }, Username: "user", Password: "pass", diff --git a/pkg/client/restclient/config.go b/pkg/client/restclient/config.go index c1928de081889..6e7494b123043 100644 --- a/pkg/client/restclient/config.go +++ b/pkg/client/restclient/config.go @@ -130,9 +130,17 @@ type ContentConfig struct { // a RESTClient directly. When initializing a Client, will be set with the default // code version. GroupVersion *unversioned.GroupVersion + // NegotiatedSerializer is used for obtaining encoders and decoders for multiple + // supported media types. + NegotiatedSerializer runtime.NegotiatedSerializer + // Codec specifies the encoding and decoding behavior for runtime.Objects passed // to a RESTClient or Client. Required when initializing a RESTClient, optional // when initializing a Client. + // + // DEPRECATED: Please use NegotiatedSerializer instead. + // Codec is currently used only in some tests and will be removed soon. + // All production setups should use NegotiatedSerializer. Codec runtime.Codec } @@ -144,8 +152,8 @@ func RESTClientFor(config *Config) (*RESTClient, error) { if config.GroupVersion == nil { return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient") } - if config.Codec == nil { - return nil, fmt.Errorf("Codec is required when initializing a RESTClient") + if config.NegotiatedSerializer == nil { + return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient") } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) @@ -163,16 +171,14 @@ func RESTClientFor(config *Config) (*RESTClient, error) { httpClient = &http.Client{Transport: transport} } - client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) - - return client, nil + return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) } // UnversionedRESTClientFor is the same as RESTClientFor, except that it allows // the config.Version to be empty. func UnversionedRESTClientFor(config *Config) (*RESTClient, error) { - if config.Codec == nil { - return nil, fmt.Errorf("Codec is required when initializing a RESTClient") + if config.NegotiatedSerializer == nil { + return nil, fmt.Errorf("NeogitatedSerializer is required when initializing a RESTClient") } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) @@ -196,8 +202,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) { versionConfig.GroupVersion = &v } - client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) - return client, nil + return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) } // SetKubernetesDefaults sets default values on the provided client config for accessing the diff --git a/pkg/client/restclient/config_test.go b/pkg/client/restclient/config_test.go index 1a1dd422b4f82..029e64253ba87 100644 --- a/pkg/client/restclient/config_test.go +++ b/pkg/client/restclient/config_test.go @@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) { } func TestRESTClientRequires(t *testing.T) { - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{Codec: testapi.Default.Codec()}}); err == nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil { t.Errorf("unexpected non-error") } if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil { t.Errorf("unexpected non-error") } - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}}); err != nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil { t.Errorf("unexpected error: %v", err) } } diff --git a/pkg/client/restclient/request.go b/pkg/client/restclient/request.go index 959318d7f6a57..c29f6e4eedd63 100644 --- a/pkg/client/restclient/request.go +++ b/pkg/client/restclient/request.go @@ -39,11 +39,12 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" - watchjson "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" ) var ( @@ -90,8 +91,9 @@ type Request struct { client HTTPClient verb string - baseURL *url.URL - content ContentConfig + baseURL *url.URL + content ContentConfig + serializers Serializers // generic components accessible via method setters pathPrefix string @@ -121,7 +123,7 @@ type Request struct { } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. -func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request { +func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request { if backoff == nil { glog.V(2).Infof("Not implementing request backoff strategy.") backoff = &NoBackoff{} @@ -132,13 +134,14 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa pathPrefix = path.Join(pathPrefix, baseURL.Path) } r := &Request{ - client: client, - verb: verb, - baseURL: baseURL, - pathPrefix: path.Join(pathPrefix, versionedAPIPath), - content: content, - backoffMgr: backoff, - throttle: throttle, + client: client, + verb: verb, + baseURL: baseURL, + pathPrefix: path.Join(pathPrefix, versionedAPIPath), + content: content, + serializers: serializers, + backoffMgr: backoff, + throttle: throttle, } if len(content.ContentType) > 0 { r.SetHeader("Accept", content.ContentType+", */*") @@ -547,7 +550,7 @@ func (r *Request) Body(obj interface{}) *Request { if reflect.ValueOf(t).IsNil() { return r } - data, err := runtime.Encode(r.content.Codec, t) + data, err := runtime.Encode(r.serializers.Encoder, t) if err != nil { r.err = err return r @@ -670,7 +673,9 @@ func (r *Request) Watch() (watch.Interface, error) { } return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) } - return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.content.Codec)), nil + framer := r.serializers.Framer.NewFrameReader(resp.Body) + decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer) + return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil } // updateURLMetrics is a convenience function for pushing metrics. @@ -738,7 +743,8 @@ func (r *Request) Stream() (io.ReadCloser, error) { return nil, fmt.Errorf("%v while accessing %v", resp.Status, url) } - if runtimeObject, err := runtime.Decode(r.content.Codec, bodyBytes); err == nil { + // TODO: Check ContentType. + if runtimeObject, err := runtime.Decode(r.serializers.Decoder, bodyBytes); err == nil { statusError := errors.FromObject(runtimeObject) if _, ok := statusError.(errors.APIStatus); ok { @@ -876,7 +882,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu // default groupVersion, otherwise a status response won't be correctly // decoded. status := &unversioned.Status{} - err := runtime.DecodeInto(r.content.Codec, body, status) + err := runtime.DecodeInto(r.serializers.Decoder, body, status) if err == nil && len(status.Status) > 0 { isStatusResponse = true } @@ -898,11 +904,12 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu return Result{err: errors.FromObject(status)} } + // TODO: Check ContentType. return Result{ body: body, contentType: resp.Header.Get("Content-Type"), statusCode: resp.StatusCode, - decoder: r.content.Codec, + decoder: r.serializers.Decoder, } } diff --git a/pkg/client/restclient/request_test.go b/pkg/client/restclient/request_test.go index 31f2f9fcf5e43..3b1a67a0cfedd 100644 --- a/pkg/client/restclient/request_test.go +++ b/pkg/client/restclient/request_test.go @@ -37,21 +37,22 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/intstr" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/watch" - watchjson "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" ) func TestNewRequestSetsAccept(t *testing.T) { - r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil, nil) + r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, Serializers{}, nil, nil) if r.headers.Get("Accept") != "" { t.Errorf("unexpected headers: %#v", r.headers) } - r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil, nil) + r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, Serializers{}, nil, nil) if r.headers.Get("Accept") != "application/other, */*" { t.Errorf("unexpected headers: %#v", r.headers) } @@ -242,6 +243,23 @@ type NotAnAPIObject struct{} func (obj NotAnAPIObject) GroupVersionKind() *unversioned.GroupVersionKind { return nil } func (obj NotAnAPIObject) SetGroupVersionKind(gvk *unversioned.GroupVersionKind) {} +func defaultContentConfig() ContentConfig { + return ContentConfig{ + GroupVersion: testapi.Default.GroupVersion(), + Codec: testapi.Default.Codec(), + NegotiatedSerializer: testapi.NegotiatedSerializer, + } +} + +func defaultSerializers() Serializers { + return Serializers{ + Encoder: testapi.Default.Codec(), + Decoder: testapi.Default.Codec(), + StreamingSerializer: testapi.Default.Codec(), + Framer: runtime.DefaultFramer, + } +} + func TestRequestBody(t *testing.T) { // test unknown type r := (&Request{}).Body([]string{"test"}) @@ -262,7 +280,7 @@ func TestRequestBody(t *testing.T) { } // test unencodable api object - r = (&Request{content: ContentConfig{Codec: testapi.Default.Codec()}}).Body(&NotAnAPIObject{}) + r = (&Request{content: defaultContentConfig()}).Body(&NotAnAPIObject{}) if r.err == nil || r.body != nil { t.Errorf("should have set err and left body nil: %#v", r) } @@ -277,7 +295,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) { func TestURLTemplate(t *testing.T) { uri, _ := url.Parse("http://localhost") - r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil) + r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil) r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0") full := r.URL() if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" { @@ -338,7 +356,7 @@ func TestTransformResponse(t *testing.T) { {Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, } for i, test := range testCases { - r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil, nil) + r := NewRequest(nil, "", uri, "", defaultContentConfig(), defaultSerializers(), nil, nil) if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } @@ -425,7 +443,8 @@ func TestTransformUnstructuredError(t *testing.T) { for _, testCase := range testCases { r := &Request{ - content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, + content: defaultContentConfig(), + serializers: defaultSerializers(), resourceName: testCase.Name, resource: testCase.Resource, } @@ -476,7 +495,8 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, + content: defaultContentConfig(), + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusForbidden, @@ -492,7 +512,8 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, + content: defaultContentConfig(), + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnauthorized, @@ -508,7 +529,8 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, + content: defaultContentConfig(), + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusUnauthorized, @@ -620,8 +642,9 @@ func TestRequestStream(t *testing.T) { })))), }, nil }), - content: ContentConfig{Codec: testapi.Default.Codec()}, - baseURL: &url.URL{}, + content: defaultContentConfig(), + serializers: defaultSerializers(), + baseURL: &url.URL{}, }, Err: true, }, @@ -1107,7 +1130,7 @@ func TestAbsPath(t *testing.T) { {"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"}, } { u, _ := url.Parse("http://localhost:123" + tc.configPrefix) - r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath) + r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath) if r.pathPrefix != tc.wantsAbsPath { t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath) } @@ -1127,7 +1150,7 @@ func TestUintParam(t *testing.T) { for _, item := range table { u, _ := url.Parse("http://localhost") - r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).AbsPath("").UintParam(item.name, item.testVal) + r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).AbsPath("").UintParam(item.name, item.testVal) if e, a := item.expectStr, r.URL().String(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -1233,7 +1256,7 @@ func TestWatch(t *testing.T) { w.WriteHeader(http.StatusOK) flusher.Flush() - encoder := watchjson.NewEncoder(w, testapi.Default.Codec()) + encoder := versioned.NewEncoder(streaming.NewEncoder(w, testapi.Default.Codec()), testapi.Default.Codec()) for _, item := range table { if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil { panic(err) @@ -1308,5 +1331,9 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient { } } versionedAPIPath := testapi.Default.ResourcePath("", "", "") - return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil) + client, err := NewRESTClient(baseURL, versionedAPIPath, defaultContentConfig(), 0, 0, nil, nil) + if err != nil { + t.Fatalf("failed to create a client: %v", err) + } + return client } diff --git a/pkg/client/typed/discovery/discovery_client.go b/pkg/client/typed/discovery/discovery_client.go index 8e130ebc1eeb4..6c0a4039b3411 100644 --- a/pkg/client/typed/discovery/discovery_client.go +++ b/pkg/client/typed/discovery/discovery_client.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer" "k8s.io/kubernetes/pkg/version" ) @@ -213,7 +214,8 @@ func (d *DiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swag func setDiscoveryDefaults(config *restclient.Config) error { config.APIPath = "" config.GroupVersion = nil - config.Codec = runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()} + codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()} + config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer) if len(config.UserAgent) == 0 { config.UserAgent = restclient.DefaultKubernetesUserAgent() } diff --git a/pkg/client/typed/dynamic/client.go b/pkg/client/typed/dynamic/client.go index 355a2be357e7c..f542cef726cec 100644 --- a/pkg/client/typed/dynamic/client.go +++ b/pkg/client/typed/dynamic/client.go @@ -26,11 +26,14 @@ import ( "net/url" "strings" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer" + serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json" "k8s.io/kubernetes/pkg/watch" ) @@ -47,7 +50,9 @@ func NewClient(conf *restclient.Config) (*Client, error) { confCopy := *conf conf = &confCopy - conf.Codec = dynamicCodec{} + codec := dynamicCodec{} + legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion) + conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer) if conf.APIPath == "" { conf.APIPath = "/api" diff --git a/pkg/client/typed/dynamic/client_test.go b/pkg/client/typed/dynamic/client_test.go index 0e1b477732655..b2e7a4236511b 100644 --- a/pkg/client/typed/dynamic/client_test.go +++ b/pkg/client/typed/dynamic/client_test.go @@ -29,8 +29,9 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/watch" - watchjson "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" ) func getJSON(version, kind, name string) []byte { @@ -449,7 +450,7 @@ func TestWatch(t *testing.T) { t.Errorf("Watch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path) } - enc := watchjson.NewEncoder(w, dynamicCodec{}) + enc := versioned.NewEncoder(streaming.NewEncoder(w, dynamicCodec{}), dynamicCodec{}) for _, e := range tc.events { enc.Encode(&e) } diff --git a/pkg/client/unversioned/apps.go b/pkg/client/unversioned/apps.go index a6612d75a9389..c81c53d84ee2a 100644 --- a/pkg/client/unversioned/apps.go +++ b/pkg/client/unversioned/apps.go @@ -72,6 +72,7 @@ func setAppsDefaults(config *restclient.Config) error { //} config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/unversioned/autoscaling.go b/pkg/client/unversioned/autoscaling.go index c3ec198103013..7566042544f12 100644 --- a/pkg/client/unversioned/autoscaling.go +++ b/pkg/client/unversioned/autoscaling.go @@ -73,6 +73,7 @@ func setAutoscalingDefaults(config *restclient.Config) error { //} config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/unversioned/batch.go b/pkg/client/unversioned/batch.go index a432e4c789c73..b9f30fa8d06d4 100644 --- a/pkg/client/unversioned/batch.go +++ b/pkg/client/unversioned/batch.go @@ -73,6 +73,7 @@ func setBatchDefaults(config *restclient.Config) error { //} config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/unversioned/extensions.go b/pkg/client/unversioned/extensions.go index 5db86dbb8384f..c741390a46204 100644 --- a/pkg/client/unversioned/extensions.go +++ b/pkg/client/unversioned/extensions.go @@ -127,6 +127,7 @@ func setExtensionsDefaults(config *restclient.Config) error { //} config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) + config.NegotiatedSerializer = api.Codecs if config.QPS == 0 { config.QPS = 5 } diff --git a/pkg/client/unversioned/fake/fake.go b/pkg/client/unversioned/fake/fake.go index 09f1f027452b0..df7824251f94d 100644 --- a/pkg/client/unversioned/fake/fake.go +++ b/pkg/client/unversioned/fake/fake.go @@ -70,7 +70,17 @@ func (c *RESTClient) Delete() *restclient.Request { } func (c *RESTClient) request(verb string) *restclient.Request { - return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil, nil) + config := restclient.ContentConfig{ + GroupVersion: testapi.Default.GroupVersion(), + Codec: c.Codec, + } + serializers := restclient.Serializers{ + Encoder: c.Codec, + Decoder: c.Codec, + StreamingSerializer: c.Codec, + Framer: runtime.DefaultFramer, + } + return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", config, serializers, nil, nil) } func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { diff --git a/pkg/client/unversioned/helper.go b/pkg/client/unversioned/helper.go index a81307c34c92a..0f2d6694597ab 100644 --- a/pkg/client/unversioned/helper.go +++ b/pkg/client/unversioned/helper.go @@ -231,6 +231,9 @@ func SetKubernetesDefaults(config *restclient.Config) error { // TODO: Unconditionally set the config.Version, until we fix the config. copyGroupVersion := g.GroupVersion config.GroupVersion = ©GroupVersion + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = api.Codecs + } if config.Codec == nil { config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion) } diff --git a/pkg/client/unversioned/helper_test.go b/pkg/client/unversioned/helper_test.go index 1d506ff9763ad..80660a6d05695 100644 --- a/pkg/client/unversioned/helper_test.go +++ b/pkg/client/unversioned/helper_test.go @@ -40,8 +40,9 @@ func TestSetKubernetesDefaults(t *testing.T) { restclient.Config{ APIPath: "/api", ContentConfig: restclient.ContentConfig{ - GroupVersion: testapi.Default.GroupVersion(), - Codec: testapi.Default.Codec(), + GroupVersion: testapi.Default.GroupVersion(), + Codec: testapi.Default.Codec(), + NegotiatedSerializer: testapi.NegotiatedSerializer, }, QPS: 5, Burst: 10, @@ -125,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) { w.Write(output) })) defer server.Close() - got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, Codec: testapi.Default.Codec()}}) + got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}}) if err != nil { t.Fatalf("unexpected encoding error: %v", err) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 84b16219d9ea7..3c68fe5d9f6de 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -30,6 +30,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" @@ -212,7 +213,14 @@ func TestStream(t *testing.T) { server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols)) url, _ := url.ParseRequestURI(server.URL) - c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil) + config := restclient.ContentConfig{ + GroupVersion: &unversioned.GroupVersion{Group: "x"}, + NegotiatedSerializer: testapi.NegotiatedSerializer, + } + c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil) + if err != nil { + t.Fatalf("failed to create a client: %v", err) + } req := c.Post().Resource("testing") if exec { diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 05a71f0247587..0dd8d64b211f8 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -224,7 +224,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, - ResponseBody: "", + ResponseBody: "{}", } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() @@ -266,7 +266,7 @@ func TestControllerUpdateReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, - ResponseBody: "", + ResponseBody: "{}", } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() @@ -311,7 +311,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // Setup a test server so we can lie about the current state of pods fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, - ResponseBody: "", + ResponseBody: "{}", } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() @@ -574,7 +574,7 @@ func TestControllerUpdateRequeue(t *testing.T) { // This server should force a requeue of the controller because it fails to update status.Replicas. fakeHandler := utiltesting.FakeHandler{ StatusCode: 500, - ResponseBody: "", + ResponseBody: "{}", } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() diff --git a/pkg/kubectl/cmd/get_test.go b/pkg/kubectl/cmd/get_test.go index 653d5f0eb32b3..8b0981233cc7b 100644 --- a/pkg/kubectl/cmd/get_test.go +++ b/pkg/kubectl/cmd/get_test.go @@ -35,9 +35,11 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/fake" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/json" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/watch" - "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" ) func testData() (*api.PodList, *api.ServiceList, *api.ReplicationControllerList) { @@ -859,9 +861,9 @@ func TestWatchOnlyResource(t *testing.T) { func watchBody(codec runtime.Codec, events []watch.Event) io.ReadCloser { buf := bytes.NewBuffer([]byte{}) - enc := json.NewEncoder(buf, codec) + enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec) for i := range events { enc.Encode(&events[i]) } - return ioutil.NopCloser(buf) + return json.Framer.NewFrameReader(ioutil.NopCloser(buf)) } diff --git a/pkg/kubectl/resource/builder_test.go b/pkg/kubectl/resource/builder_test.go index d99ba8e747c62..15abe6fe82194 100644 --- a/pkg/kubectl/resource/builder_test.go +++ b/pkg/kubectl/resource/builder_test.go @@ -39,10 +39,11 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/unversioned/fake" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" utilerrors "k8s.io/kubernetes/pkg/util/errors" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/watch" - watchjson "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" ) func stringBody(body string) io.ReadCloser { @@ -51,7 +52,8 @@ func stringBody(body string) io.ReadCloser { func watchBody(events ...watch.Event) string { buf := &bytes.Buffer{} - enc := watchjson.NewEncoder(buf, testapi.Default.Codec()) + codec := testapi.Default.Codec() + enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec) for _, e := range events { enc.Encode(&e) } diff --git a/pkg/runtime/serializer/negotiated_codec.go b/pkg/runtime/serializer/negotiated_codec.go new file mode 100644 index 0000000000000..0f0f369118401 --- /dev/null +++ b/pkg/runtime/serializer/negotiated_codec.go @@ -0,0 +1,58 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" +) + +// TODO: We should figure out what happens when someone asks +// encoder for version and it conflicts with the raw serializer. +type negotiatedSerializerWrapper struct { + serializer runtime.Serializer + streamingSerializer runtime.Serializer + framer runtime.Framer +} + +func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer { + return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer} +} + +func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string { + return []string{} +} + +func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { + return n.serializer, true +} + +func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string { + return []string{} +} + +func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { + return n.streamingSerializer, n.framer, "", true +} + +func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { + return n.serializer +} + +func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { + return n.serializer +} diff --git a/pkg/util/framer/framer.go b/pkg/util/framer/framer.go index 1e886e818f6a7..615d07de1ccdc 100644 --- a/pkg/util/framer/framer.go +++ b/pkg/util/framer/framer.go @@ -145,6 +145,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) { // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see // data written to data, or be larger than data and a different array. + n := len(data) m := json.RawMessage(data[:0]) if err := r.decoder.Decode(&m); err != nil { return 0, err @@ -153,7 +154,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) { // If capacity of data is less than length of the message, decoder will allocate a new slice // and set m to it, which means we need to copy the partial result back into data and preserve // the remaining result for subsequent reads. - if n := cap(data); len(m) > n { + if len(m) > n { data = append(data[0:0], m[:n]...) r.remaining = m[n:] return n, io.ErrShortBuffer diff --git a/pkg/util/testing/fake_handler.go b/pkg/util/testing/fake_handler.go index 3fc9e3ed0aa4a..d25b2c25c72c4 100644 --- a/pkg/util/testing/fake_handler.go +++ b/pkg/util/testing/fake_handler.go @@ -63,6 +63,7 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ } f.RequestReceived = request + response.Header().Set("Content-Type", "application/json") response.WriteHeader(f.StatusCode) response.Write([]byte(f.ResponseBody)) diff --git a/pkg/watch/iowatcher.go b/pkg/watch/streamwatcher.go similarity index 98% rename from pkg/watch/iowatcher.go rename to pkg/watch/streamwatcher.go index 505e6bfcabf87..2802a9e01f4f0 100644 --- a/pkg/watch/iowatcher.go +++ b/pkg/watch/streamwatcher.go @@ -42,9 +42,9 @@ type Decoder interface { // StreamWatcher turns any stream for which you can write a Decoder interface // into a watch.Interface. type StreamWatcher struct { - source Decoder - result chan Event sync.Mutex + source Decoder + result chan Event stopped bool } diff --git a/pkg/watch/iowatcher_test.go b/pkg/watch/streamwatcher_test.go similarity index 100% rename from pkg/watch/iowatcher_test.go rename to pkg/watch/streamwatcher_test.go diff --git a/pkg/watch/json/decoder.go b/pkg/watch/versioned/decoder.go similarity index 58% rename from pkg/watch/json/decoder.go rename to pkg/watch/versioned/decoder.go index 45bf06d2f2867..2d13ca809ff19 100644 --- a/pkg/watch/json/decoder.go +++ b/pkg/watch/versioned/decoder.go @@ -14,56 +14,58 @@ See the License for the specific language governing permissions and limitations under the License. */ -package json +package versioned import ( - "encoding/json" "fmt" - "io" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/watch" ) // Decoder implements the watch.Decoder interface for io.ReadClosers that -// have contents which consist of a series of watchEvent objects encoded via JSON. -// It will decode any object registered in the supplied codec. +// have contents which consist of a series of watchEvent objects encoded +// with the given streaming decoder. The internal objects will be then +// decoded by the embedded decoder. type Decoder struct { - r io.ReadCloser - decoder *json.Decoder - codec runtime.Codec + decoder streaming.Decoder + embeddedDecoder runtime.Decoder } // NewDecoder creates an Decoder for the given writer and codec. -func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder { +func NewDecoder(decoder streaming.Decoder, embeddedDecoder runtime.Decoder) *Decoder { return &Decoder{ - r: r, - decoder: json.NewDecoder(r), - codec: codec, + decoder: decoder, + embeddedDecoder: embeddedDecoder, } } -// Decode blocks until it can return the next object in the writer. Returns an error -// if the writer is closed or an object can't be decoded. +// Decode blocks until it can return the next object in the reader. Returns an error +// if the reader is closed or an object can't be decoded. func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { - var got WatchEvent - if err := d.decoder.Decode(&got); err != nil { + var got Event + res, _, err := d.decoder.Decode(nil, &got) + if err != nil { return "", nil, err } + if res != &got { + return "", nil, fmt.Errorf("unable to decode to versioned.Event") + } switch got.Type { - case watch.Added, watch.Modified, watch.Deleted, watch.Error: + case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error): default: return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } - obj, err := runtime.Decode(d.codec, got.Object.Raw) + obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw) if err != nil { return "", nil, fmt.Errorf("unable to decode watch event: %v", err) } - return got.Type, obj, nil + return watch.EventType(got.Type), obj, nil } // Close closes the underlying r. func (d *Decoder) Close() { - d.r.Close() + d.decoder.Close() } diff --git a/pkg/watch/json/decoder_test.go b/pkg/watch/versioned/decoder_test.go similarity index 81% rename from pkg/watch/json/decoder_test.go rename to pkg/watch/versioned/decoder_test.go index e8ad4e91c7afc..1ccb38108b25d 100644 --- a/pkg/watch/json/decoder_test.go +++ b/pkg/watch/versioned/decoder_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package json +package versioned_test import ( "encoding/json" @@ -25,8 +25,10 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/watch/versioned" ) func TestDecoder(t *testing.T) { @@ -34,7 +36,8 @@ func TestDecoder(t *testing.T) { for _, eventType := range table { out, in := io.Pipe() - decoder := NewDecoder(out, testapi.Default.Codec()) + codec := testapi.Default.Codec() + decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec) expect := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} encoder := json.NewEncoder(in) @@ -43,7 +46,11 @@ func TestDecoder(t *testing.T) { if err != nil { t.Fatalf("Unexpected error %v", err) } - if err := encoder.Encode(&WatchEvent{eventType, runtime.RawExtension{Raw: json.RawMessage(data)}}); err != nil { + event := versioned.Event{ + Type: string(eventType), + Object: runtime.RawExtension{Raw: json.RawMessage(data)}, + } + if err := encoder.Encode(&event); err != nil { t.Errorf("Unexpected error %v", err) } in.Close() @@ -82,7 +89,8 @@ func TestDecoder(t *testing.T) { func TestDecoder_SourceClose(t *testing.T) { out, in := io.Pipe() - decoder := NewDecoder(out, testapi.Default.Codec()) + codec := testapi.Default.Codec() + decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec) done := make(chan struct{}) diff --git a/pkg/watch/json/encoder.go b/pkg/watch/versioned/encoder.go similarity index 58% rename from pkg/watch/json/encoder.go rename to pkg/watch/versioned/encoder.go index 1eaf6a8a09cb1..8438ee984f384 100644 --- a/pkg/watch/json/encoder.go +++ b/pkg/watch/versioned/encoder.go @@ -14,40 +14,38 @@ See the License for the specific language governing permissions and limitations under the License. */ -package json +package versioned import ( "encoding/json" - "io" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/watch" ) -// Encoder implements the json.Encoder interface for io.Writers that -// should serialize WatchEvent objects into JSON. It will encode any object -// registered in the supplied codec and return an error otherwies. +// Encoder serializes watch.Events into io.Writer. The internal objects +// are encoded using embedded encoder, and the outer Event is serialized +// using encoder. type Encoder struct { - w io.Writer - encoder *json.Encoder - codec runtime.Encoder + encoder streaming.Encoder + embeddedEncoder runtime.Encoder } -// NewEncoder creates an Encoder for the given writer and codec -func NewEncoder(w io.Writer, codec runtime.Encoder) *Encoder { +func NewEncoder(encoder streaming.Encoder, embeddedEncoder runtime.Encoder) *Encoder { return &Encoder{ - w: w, - encoder: json.NewEncoder(w), - codec: codec, + encoder: encoder, + embeddedEncoder: embeddedEncoder, } } // Encode writes an event to the writer. Returns an error // if the writer is closed or an object can't be encoded. func (e *Encoder) Encode(event *watch.Event) error { - obj, err := Object(e.codec, event) + data, err := runtime.Encode(e.embeddedEncoder, event.Object) if err != nil { return err } - return e.encoder.Encode(obj) + // FIXME: get rid of json.RawMessage. + return e.encoder.Encode(&Event{string(event.Type), runtime.RawExtension{Raw: json.RawMessage(data)}}) } diff --git a/pkg/watch/json/encoder_test.go b/pkg/watch/versioned/encoder_test.go similarity index 84% rename from pkg/watch/json/encoder_test.go rename to pkg/watch/versioned/encoder_test.go index 92550692fdda8..cc0e615019de2 100644 --- a/pkg/watch/json/encoder_test.go +++ b/pkg/watch/versioned/encoder_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package json +package versioned_test import ( "bytes" @@ -24,7 +24,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/watch/versioned" ) func TestEncodeDecodeRoundTrip(t *testing.T) { @@ -52,13 +54,15 @@ func TestEncodeDecodeRoundTrip(t *testing.T) { for i, testCase := range testCases { buf := &bytes.Buffer{} - encoder := NewEncoder(buf, testCase.Codec) + codec := testCase.Codec + encoder := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec) if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil { t.Errorf("%d: unexpected error: %v", i, err) continue } - decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec) + rc := ioutil.NopCloser(buf) + decoder := versioned.NewDecoder(streaming.NewDecoder(rc, codec), codec) event, obj, err := decoder.Decode() if err != nil { t.Errorf("%d: unexpected error: %v", i, err) diff --git a/plugin/pkg/auth/authorizer/webhook/webhook.go b/plugin/pkg/auth/authorizer/webhook/webhook.go index 6cce9ffe47b52..65b88865e05a6 100644 --- a/plugin/pkg/auth/authorizer/webhook/webhook.go +++ b/plugin/pkg/auth/authorizer/webhook/webhook.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/runtime" + runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer" "k8s.io/kubernetes/pkg/runtime/serializer/json" "k8s.io/kubernetes/pkg/runtime/serializer/versioning" @@ -86,7 +87,8 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) { return nil, err } serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false) - clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions) + codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions) + clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer) restClient, err := restclient.UnversionedRESTClientFor(clientConfig) if err != nil {