From 76f5275dbcbcc85f6235e01a91c32e5f576821ef Mon Sep 17 00:00:00 2001 From: Eric Chiang Date: Wed, 12 Dec 2018 09:09:37 -0800 Subject: [PATCH] *: allow watches on individual resources --- resource.go | 16 ++++++- resource_test.go | 110 +++++++++++++++++++++++++++++++++++++++++++++++ watch.go | 16 +++++++ watch_test.go | 82 +++++++++++++++++++++++------------ 4 files changed, 196 insertions(+), 28 deletions(-) diff --git a/resource.go b/resource.go index 3cf32f5..fdb95c0 100644 --- a/resource.go +++ b/resource.go @@ -260,11 +260,25 @@ func resourceWatchURL(endpoint, namespace string, r Resource, options ...Option) return "", fmt.Errorf("unregistered type %T", r) } + // Hack to let watch work on individual resources + name := "" + if meta := r.GetMetadata(); meta != nil && meta.Name != nil { + name = *meta.Name + if meta.Namespace != nil { + // Ensure that namespaces aren't different. + ns := *meta.Namespace + if namespace != "" && ns != namespace { + return "", fmt.Errorf("different namespace provided on resource than to watch call") + } + namespace = ns + } + } + if !t.namespaced && namespace != "" { return "", fmt.Errorf("type not namespaced") } - url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, "", options...) + url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, name, options...) if strings.Contains(url, "?") { url = url + "&watch=true" } else { diff --git a/resource_test.go b/resource_test.go index 8da7cf8..d6435dd 100644 --- a/resource_test.go +++ b/resource_test.go @@ -161,3 +161,113 @@ func TestResourceURL(t *testing.T) { }) } } + +func TestResourceWatchURL(t *testing.T) { + tests := []struct { + name string + endpoint string + namespace string + resource Resource + options []Option + want string + wantErr bool + }{ + { + name: "watch_pods", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Pod{}, + want: "https://k8s.example.com/foo/api/v1/namespaces/my-namespace/pods?watch=true", + }, + { + name: "watch_all_pods", + endpoint: "https://k8s.example.com/foo/", + resource: &Pod{}, + want: "https://k8s.example.com/foo/api/v1/pods?watch=true", + }, + { + name: "watch_deployments", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{}, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?watch=true", + }, + { + name: "watch_with_options", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{}, + options: []Option{ + Timeout(time.Minute), + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?timeoutSeconds=60&watch=true", + }, + { + name: "watch_non_namespaced", + endpoint: "https://k8s.example.com/foo/", + resource: &ClusterRole{}, + want: "https://k8s.example.com/foo/apis/rbac.authorization.k8s.io/v1/clusterroles?watch=true", + }, + { + name: "watch_non_namespaced_with_namespace", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &ClusterRole{}, + wantErr: true, // can't provide a namespace for a non-namespaced resource + }, + { + name: "watch_deployment", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Namespace: String("my-namespace"), + Name: String("my-deployment"), + }, + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true", + }, + { + name: "watch_deployment_ns_in_call", + endpoint: "https://k8s.example.com/foo/", + namespace: "my-namespace", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Name: String("my-deployment"), + }, + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true", + }, + { + name: "watch_deployment_mismatched_ns", + endpoint: "https://k8s.example.com/foo/", + namespace: "my-other-namespace", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Namespace: String("my-namespace"), + Name: String("my-deployment"), + }, + }, + wantErr: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := resourceWatchURL( + test.endpoint, + test.namespace, + test.resource, + test.options..., + ) + if err != nil { + if !test.wantErr { + t.Fatalf("resourceWatchURL: %v", err) + } + return + } + if got != test.want { + t.Errorf("want: %q", test.want) + t.Errorf("got : %q", got) + } + }) + } +} diff --git a/watch.go b/watch.go index 91c82dc..8b145c0 100644 --- a/watch.go +++ b/watch.go @@ -160,6 +160,22 @@ func parseUnknown(b []byte) (*runtime.Unknown, error) { // fmt.Println(eventType, *cm.Metadata.Name) // } // +// To watch an individual resource, provide a resource with pre-populated +// metadata: +// +// // Watch "my-configmap" in "my-namespace" +// configMap := corev1.ConfigMap{ +// Metadata: &metav1.ObjectMeta{ +// Namespace: String("my-namespace"), +// Name: String("my-configmap"), +// }, +// } +// watcher, err := client.Watch(ctx, "", &configMap) +// if err != nil { +// // handle error +// } +// defer watcher.Close() // Always close the returned watcher. +// func (c *Client) Watch(ctx context.Context, namespace string, r Resource, options ...Option) (*Watcher, error) { url, err := resourceWatchURL(c.Endpoint, namespace, r, options...) if err != nil { diff --git a/watch_test.go b/watch_test.go index f1e7dc5..c62fcd9 100644 --- a/watch_test.go +++ b/watch_test.go @@ -24,38 +24,48 @@ func init() { k8s.Register("", "v1", "configmaps", true, &configMapJSON{}) } -func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() k8s.Resource, update func(cm k8s.Resource)) { - w, err := client.Watch(context.TODO(), namespace, newCM()) +func wantEvent(t *testing.T, w *k8s.Watcher, eventType string, got, want k8s.Resource) { + t.Helper() + eT, err := w.Next(got) if err != nil { - t.Errorf("watch configmaps: %v", err) + t.Errorf("decode watch event: %v", err) + return } - defer w.Close() + if eT != eventType { + t.Errorf("expected event type %q got %q", eventType, eT) + } + want.GetMetadata().ResourceVersion = k8s.String("") + got.GetMetadata().ResourceVersion = k8s.String("") + if !reflect.DeepEqual(got, want) { + t.Errorf("configmaps didn't match") + t.Errorf("want: %#v", want) + t.Errorf(" got: %#v", got) + } +} +func testWatch(t *testing.T, client *k8s.Client, namespace string, r k8s.Resource, newCM func() k8s.Resource, update func(cm k8s.Resource)) { cm := newCM() - want := func(eventType string) { - got := newCM() - eT, err := w.Next(got) - if err != nil { - t.Errorf("decode watch event: %v", err) + + if r.GetMetadata() != nil { + // Individual watch must created beforehand + if err := client.Create(context.TODO(), cm); err != nil { + t.Errorf("create configmap: %v", err) return } - if eT != eventType { - t.Errorf("expected event type %q got %q", eventType, eT) - } - cm.GetMetadata().ResourceVersion = k8s.String("") - got.GetMetadata().ResourceVersion = k8s.String("") - if !reflect.DeepEqual(got, cm) { - t.Errorf("configmaps didn't match") - t.Errorf("want: %#v", cm) - t.Errorf(" got: %#v", got) - } } + w, err := client.Watch(context.TODO(), namespace, r) + if err != nil { + t.Fatalf("watch configmaps: %v", err) + } + defer w.Close() - if err := client.Create(context.TODO(), cm); err != nil { - t.Errorf("create configmap: %v", err) - return + if r.GetMetadata() == nil { + if err := client.Create(context.TODO(), cm); err != nil { + t.Errorf("create configmap: %v", err) + return + } + wantEvent(t, w, k8s.EventAdded, newCM(), cm) } - want(k8s.EventAdded) update(cm) @@ -63,13 +73,13 @@ func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() t.Errorf("update configmap: %v", err) return } - want(k8s.EventModified) + wantEvent(t, w, k8s.EventModified, newCM(), cm) if err := client.Delete(context.TODO(), cm); err != nil { t.Errorf("Delete configmap: %v", err) return } - want(k8s.EventDeleted) + wantEvent(t, w, k8s.EventDeleted, newCM(), cm) } func TestWatchConfigMapJSON(t *testing.T) { @@ -86,7 +96,7 @@ func TestWatchConfigMapJSON(t *testing.T) { updateCM := func(cm k8s.Resource) { (cm.(*configMapJSON)).Data = map[string]string{"hello": "world"} } - testWatch(t, client, namespace, newCM, updateCM) + testWatch(t, client, namespace, &configMapJSON{}, newCM, updateCM) }) } @@ -104,6 +114,24 @@ func TestWatchConfigMapProto(t *testing.T) { updateCM := func(cm k8s.Resource) { (cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"} } - testWatch(t, client, namespace, newCM, updateCM) + testWatch(t, client, namespace, &corev1.ConfigMap{}, newCM, updateCM) + }) +} + +func TestWatchIndividualConfigMap(t *testing.T) { + withNamespace(t, func(client *k8s.Client, namespace string) { + newCM := func() k8s.Resource { + return &corev1.ConfigMap{ + Metadata: &metav1.ObjectMeta{ + Name: k8s.String("my-configmap"), + Namespace: &namespace, + }, + } + } + + updateCM := func(cm k8s.Resource) { + (cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"} + } + testWatch(t, client, namespace, newCM(), newCM, updateCM) }) }