Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
*: allow watches on individual resources
Browse files Browse the repository at this point in the history
  • Loading branch information
ericchiang committed Dec 12, 2018
1 parent db7ff61 commit 76f5275
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 28 deletions.
16 changes: 15 additions & 1 deletion resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,25 @@ func resourceWatchURL(endpoint, namespace string, r Resource, options ...Option)
return "", fmt.Errorf("unregistered type %T", r)
}

// Hack to let watch work on individual resources
name := ""
if meta := r.GetMetadata(); meta != nil && meta.Name != nil {
name = *meta.Name
if meta.Namespace != nil {
// Ensure that namespaces aren't different.
ns := *meta.Namespace
if namespace != "" && ns != namespace {
return "", fmt.Errorf("different namespace provided on resource than to watch call")
}
namespace = ns
}
}

if !t.namespaced && namespace != "" {
return "", fmt.Errorf("type not namespaced")
}

url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, "", options...)
url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, name, options...)
if strings.Contains(url, "?") {
url = url + "&watch=true"
} else {
Expand Down
110 changes: 110 additions & 0 deletions resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,113 @@ func TestResourceURL(t *testing.T) {
})
}
}

func TestResourceWatchURL(t *testing.T) {
tests := []struct {
name string
endpoint string
namespace string
resource Resource
options []Option
want string
wantErr bool
}{
{
name: "watch_pods",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Pod{},
want: "https://k8s.example.com/foo/api/v1/namespaces/my-namespace/pods?watch=true",
},
{
name: "watch_all_pods",
endpoint: "https://k8s.example.com/foo/",
resource: &Pod{},
want: "https://k8s.example.com/foo/api/v1/pods?watch=true",
},
{
name: "watch_deployments",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?watch=true",
},
{
name: "watch_with_options",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{},
options: []Option{
Timeout(time.Minute),
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?timeoutSeconds=60&watch=true",
},
{
name: "watch_non_namespaced",
endpoint: "https://k8s.example.com/foo/",
resource: &ClusterRole{},
want: "https://k8s.example.com/foo/apis/rbac.authorization.k8s.io/v1/clusterroles?watch=true",
},
{
name: "watch_non_namespaced_with_namespace",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &ClusterRole{},
wantErr: true, // can't provide a namespace for a non-namespaced resource
},
{
name: "watch_deployment",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Namespace: String("my-namespace"),
Name: String("my-deployment"),
},
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
},
{
name: "watch_deployment_ns_in_call",
endpoint: "https://k8s.example.com/foo/",
namespace: "my-namespace",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Name: String("my-deployment"),
},
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
},
{
name: "watch_deployment_mismatched_ns",
endpoint: "https://k8s.example.com/foo/",
namespace: "my-other-namespace",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Namespace: String("my-namespace"),
Name: String("my-deployment"),
},
},
wantErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := resourceWatchURL(
test.endpoint,
test.namespace,
test.resource,
test.options...,
)
if err != nil {
if !test.wantErr {
t.Fatalf("resourceWatchURL: %v", err)
}
return
}
if got != test.want {
t.Errorf("want: %q", test.want)
t.Errorf("got : %q", got)
}
})
}
}
16 changes: 16 additions & 0 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ func parseUnknown(b []byte) (*runtime.Unknown, error) {
// fmt.Println(eventType, *cm.Metadata.Name)
// }
//
// To watch an individual resource, provide a resource with pre-populated
// metadata:
//
// // Watch "my-configmap" in "my-namespace"
// configMap := corev1.ConfigMap{
// Metadata: &metav1.ObjectMeta{
// Namespace: String("my-namespace"),
// Name: String("my-configmap"),
// },
// }
// watcher, err := client.Watch(ctx, "", &configMap)
// if err != nil {
// // handle error
// }
// defer watcher.Close() // Always close the returned watcher.
//
func (c *Client) Watch(ctx context.Context, namespace string, r Resource, options ...Option) (*Watcher, error) {
url, err := resourceWatchURL(c.Endpoint, namespace, r, options...)
if err != nil {
Expand Down
82 changes: 55 additions & 27 deletions watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,52 +24,62 @@ func init() {
k8s.Register("", "v1", "configmaps", true, &configMapJSON{})
}

func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
w, err := client.Watch(context.TODO(), namespace, newCM())
func wantEvent(t *testing.T, w *k8s.Watcher, eventType string, got, want k8s.Resource) {
t.Helper()
eT, err := w.Next(got)
if err != nil {
t.Errorf("watch configmaps: %v", err)
t.Errorf("decode watch event: %v", err)
return
}
defer w.Close()
if eT != eventType {
t.Errorf("expected event type %q got %q", eventType, eT)
}
want.GetMetadata().ResourceVersion = k8s.String("")
got.GetMetadata().ResourceVersion = k8s.String("")
if !reflect.DeepEqual(got, want) {
t.Errorf("configmaps didn't match")
t.Errorf("want: %#v", want)
t.Errorf(" got: %#v", got)
}
}

func testWatch(t *testing.T, client *k8s.Client, namespace string, r k8s.Resource, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
cm := newCM()
want := func(eventType string) {
got := newCM()
eT, err := w.Next(got)
if err != nil {
t.Errorf("decode watch event: %v", err)

if r.GetMetadata() != nil {
// Individual watch must created beforehand
if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
}
if eT != eventType {
t.Errorf("expected event type %q got %q", eventType, eT)
}
cm.GetMetadata().ResourceVersion = k8s.String("")
got.GetMetadata().ResourceVersion = k8s.String("")
if !reflect.DeepEqual(got, cm) {
t.Errorf("configmaps didn't match")
t.Errorf("want: %#v", cm)
t.Errorf(" got: %#v", got)
}
}
w, err := client.Watch(context.TODO(), namespace, r)
if err != nil {
t.Fatalf("watch configmaps: %v", err)
}
defer w.Close()

if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
if r.GetMetadata() == nil {
if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
}
wantEvent(t, w, k8s.EventAdded, newCM(), cm)
}
want(k8s.EventAdded)

update(cm)

if err := client.Update(context.TODO(), cm); err != nil {
t.Errorf("update configmap: %v", err)
return
}
want(k8s.EventModified)
wantEvent(t, w, k8s.EventModified, newCM(), cm)

if err := client.Delete(context.TODO(), cm); err != nil {
t.Errorf("Delete configmap: %v", err)
return
}
want(k8s.EventDeleted)
wantEvent(t, w, k8s.EventDeleted, newCM(), cm)
}

func TestWatchConfigMapJSON(t *testing.T) {
Expand All @@ -86,7 +96,7 @@ func TestWatchConfigMapJSON(t *testing.T) {
updateCM := func(cm k8s.Resource) {
(cm.(*configMapJSON)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM, updateCM)
testWatch(t, client, namespace, &configMapJSON{}, newCM, updateCM)
})
}

Expand All @@ -104,6 +114,24 @@ func TestWatchConfigMapProto(t *testing.T) {
updateCM := func(cm k8s.Resource) {
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM, updateCM)
testWatch(t, client, namespace, &corev1.ConfigMap{}, newCM, updateCM)
})
}

func TestWatchIndividualConfigMap(t *testing.T) {
withNamespace(t, func(client *k8s.Client, namespace string) {
newCM := func() k8s.Resource {
return &corev1.ConfigMap{
Metadata: &metav1.ObjectMeta{
Name: k8s.String("my-configmap"),
Namespace: &namespace,
},
}
}

updateCM := func(cm k8s.Resource) {
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM(), newCM, updateCM)
})
}

0 comments on commit 76f5275

Please sign in to comment.