Skip to content

Commit

Permalink
ISTIO support for notebook controller (kubeflow#3104)
Browse files Browse the repository at this point in the history
* virtual service func init

* create virtualservice

* fix

* fix

* add cluster role

* fix unstructured format

* updates

* fix

* reconcile virtual service

* fix

* revert quote changes

* add virtualservice update

* comment

* copy if spec is not found in toSpec

* add watch event
  • Loading branch information
gabrielwen authored and k8s-ci-robot committed Apr 29, 2019
1 parent d099dfb commit 5367d99
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package notebook

import (
"context"
"fmt"
"os"
"strings"

Expand All @@ -27,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -45,6 +47,7 @@ import (
var log = logf.Log.WithName("controller")

const DefaultContainerPort = 8888
const DefaultServingPort = 80

// The default fsGroup of PodSecurityContext.
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#podsecuritycontext-v1-core
Expand Down Expand Up @@ -91,6 +94,18 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for changes to Notebook virtualservices.
virtualService := &unstructured.Unstructured{}
virtualService.SetAPIVersion("networking.istio.io/v1alpha3")
virtualService.SetKind("VirtualService")
err = c.Watch(&source.Kind{Type: virtualService}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &v1alpha1.Notebook{},
})
if err != nil {
return err
}

// Watch underlying pod.
// mapFn defines the mapping from object in event to reconcile request
mapFn := handler.ToRequestsFunc(
Expand Down Expand Up @@ -216,6 +231,39 @@ func (r *ReconcileNotebook) Reconcile(request reconcile.Request) (reconcile.Resu
}
}

// Reconcile virtual service
virtualService, err := generateVirtualService(instance)
if err := controllerutil.SetControllerReference(instance, virtualService, r.scheme); err != nil {
return reconcile.Result{}, err
}
// Check if the virtual service already exists.
foundVirtual := &unstructured.Unstructured{}
justCreated = false
foundVirtual.SetAPIVersion("networking.istio.io/v1alpha3")
foundVirtual.SetKind("VirtualService")
err = r.Get(context.TODO(), types.NamespacedName{Name: virtualServiceName(instance.Name,
instance.Namespace), Namespace: instance.Namespace}, foundVirtual)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
err = r.Create(context.TODO(), virtualService)
justCreated = true
if err != nil {
return reconcile.Result{}, err
}
} else if err != nil {
return reconcile.Result{}, err
}

if !justCreated && util.CopyVirtualService(virtualService, foundVirtual) {
log.Info("Updating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
err = r.Update(context.TODO(), foundVirtual)
if err != nil {
return reconcile.Result{}, err
}
}

// Update the status if previous condition is not "Ready"
oldConditions := instance.Status.Conditions
if len(oldConditions) == 0 || oldConditions[0].Type != "Ready" {
Expand Down Expand Up @@ -360,7 +408,7 @@ func generateService(instance *v1alpha1.Notebook) *corev1.Service {
Selector: map[string]string{"statefulset": instance.Name},
Ports: []corev1.ServicePort{
corev1.ServicePort{
Port: 80,
Port: DefaultServingPort,
TargetPort: intstr.FromInt(port),
Protocol: "TCP",
},
Expand All @@ -369,3 +417,61 @@ func generateService(instance *v1alpha1.Notebook) *corev1.Service {
}
return svc
}

func virtualServiceName(kfName string, namespace string) string {
return fmt.Sprintf("notebook-%s-%s", namespace, kfName)
}

func generateVirtualService(instance *v1alpha1.Notebook) (*unstructured.Unstructured, error) {
name := instance.Name
namespace := instance.Namespace
prefix := fmt.Sprintf("/notebook/%s/%s", namespace, name)
rewrite := fmt.Sprintf("/notebook/%s/%s", namespace, name)
// TODO(gabrielwen): Make clusterDomain an option.
service := fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace)

vsvc := &unstructured.Unstructured{}
vsvc.SetAPIVersion("networking.istio.io/v1alpha3")
vsvc.SetKind("VirtualService")
vsvc.SetName(virtualServiceName(name, namespace))
vsvc.SetNamespace(namespace)
if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{"*"}, "spec", "hosts"); err != nil {
return nil, fmt.Errorf("Set .spec.hosts error: %v", err)
}
if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{"kubeflow-gateway"},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("Set .spec.gateways error: %v", err)
}

http := []interface{}{
map[string]interface{}{
"match": []interface{}{
map[string]interface{}{
"uri": map[string]interface{}{
"prefix": prefix,
},
},
},
"rewrite": map[string]interface{}{
"uri": rewrite,
},
"route": []interface{}{
map[string]interface{}{
"destination": map[string]interface{}{
"host": service,
"port": map[string]interface{}{
"number": int64(DefaultServingPort),
},
},
},
},
"timeout": "300s",
},
}
if err := unstructured.SetNestedSlice(vsvc.Object, http, "spec", "http"); err != nil {
return nil, fmt.Errorf("Set .spec.http error: %v", err)
}

return vsvc, nil

}
25 changes: 25 additions & 0 deletions components/notebook-controller/pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Reference: https://github.com/pwittrock/kubebuilder-workshop/blob/master/pkg/util/util.go
Expand Down Expand Up @@ -79,3 +80,27 @@ func CopyServiceFields(from, to *corev1.Service) bool {

return requireUpdate
}

// Copy configuration related fields to another instance and returns true if there
// is a diff and thus needs to update.
func CopyVirtualService(from, to *unstructured.Unstructured) bool {
fromSpec, found, err := unstructured.NestedMap(from.Object, "spec")
if !found {
return false
}
if err != nil {
return false
}

toSpec, found, err := unstructured.NestedMap(to.Object, "spec")
if !found || err != nil {
unstructured.SetNestedMap(to.Object, fromSpec, "spec")
return true
}

requiresUpdate := !reflect.DeepEqual(fromSpec, toSpec)
if requiresUpdate {
unstructured.SetNestedMap(to.Object, fromSpec, "spec")
}
return requiresUpdate
}
11 changes: 11 additions & 0 deletions kubeflow/jupyter/notebook_controller.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@
"*",
],
},
{
apiGroups: [
"networking.istio.io",
],
resources: [
"virtualservices",
],
verbs: [
"*",
],
},
],
},
role:: role,
Expand Down

0 comments on commit 5367d99

Please sign in to comment.