diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index 6c4d8c38..7bfec8f0 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -107,6 +107,7 @@ func run() error { if err != nil { return fmt.Errorf("clientset: %w", err) } + ctx := ctrl.SetupSignalHandler() hostname, err := os.Hostname() if err != nil { @@ -134,6 +135,9 @@ func run() error { deploymentManager.Namespace = namespace deploymentManager.ScaleDownPeriod = time.Duration(scaleDownDelay) * time.Second deployments.NewMetricsCollector(deploymentManager).MustRegister(metricsRegistry) + if err := mgr.AddReadyzCheck("readyz", deploymentManager.ReadinessChecker); err != nil { + return fmt.Errorf("setup readiness handler: %w", err) + } autoscaler, err := autoscaler.New(mgr) if err != nil { @@ -157,8 +161,6 @@ func run() error { } statsServer := &http.Server{Addr: ":8083", Handler: statsHandler} - ctx := ctrl.SetupSignalHandler() - var wg sync.WaitGroup wg.Add(1) go func() { @@ -188,6 +190,13 @@ func run() error { setupLog.Info("manager stopped") }() + if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok { + return fmt.Errorf("client cache could not be synced") + } + if err := deploymentManager.Bootstrap(ctx); err != nil { + return fmt.Errorf("bootstrap deloyment manager: %w", err) + } + if err := proxyServer.ListenAndServe(); err != nil { return fmt.Errorf("listen and serve: %w", err) } diff --git a/deploy/lingo_deploy.yaml b/deploy/lingo_deploy.yaml index 593ca5c8..e76f4d10 100644 --- a/deploy/lingo_deploy.yaml +++ b/deploy/lingo_deploy.yaml @@ -24,3 +24,14 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + readinessProbe: + httpGet: + path: /readyz + port: 8081 + initialDelaySeconds: 1 + periodSeconds: 5 + livenessProbe: + tcpSocket: + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 20 diff --git a/go.mod b/go.mod index 9045ef29..1895ae6a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,10 @@ module github.com/substratusai/lingo go 1.21.0 require ( + github.com/go-logr/logr v1.2.4 + github.com/google/uuid v1.4.0 + github.com/prometheus/client_golang v1.16.0 + github.com/prometheus/client_model v0.4.0 github.com/stretchr/testify v1.8.4 k8s.io/api v0.28.3 k8s.io/apimachinery v0.28.3 @@ -16,9 +20,9 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect - github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/zapr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -29,7 +33,6 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.4.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -40,8 +43,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect - github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index c838eda3..4fea5e22 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= diff --git a/pkg/deployments/manager.go b/pkg/deployments/manager.go index feebfcc1..a008dd70 100644 --- a/pkg/deployments/manager.go +++ b/pkg/deployments/manager.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "log" + "net/http" "strconv" "strings" "sync" + "sync/atomic" "time" appsv1 "k8s.io/api/apps/v1" @@ -47,6 +49,8 @@ type Manager struct { // modelToDeployment maps model names to deployment names. A single deployment // can serve multiple models. modelToDeployment map[string]string + + bootstrapped atomic.Bool } func (r *Manager) SetupWithManager(mgr ctrl.Manager) error { @@ -73,33 +77,41 @@ func (r *Manager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, return ctrl.Result{}, fmt.Errorf("get: %w", err) } - if ann := d.GetAnnotations(); ann != nil { - modelCSV, ok := ann[lingoDomain+"/models"] - if !ok { - return ctrl.Result{}, nil - } - models := strings.Split(modelCSV, ",") - if len(models) == 0 { - return ctrl.Result{}, nil - } - for _, model := range models { - r.setModelMapping(strings.TrimSpace(model), d.Name) - } - } + return ctrl.Result{}, r.addDeployment(ctx, d) +} +func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error { + models := getModelsFromAnnotation(d.GetAnnotations()) + if len(models) == 0 { + return nil + } + for _, model := range models { + r.setModelMapping(strings.TrimSpace(model), d.Name) + } var scale autoscalingv1.Scale if err := r.SubResource("scale").Get(ctx, &d, &scale); err != nil { - return ctrl.Result{}, fmt.Errorf("get scale: %w", err) + return fmt.Errorf("get scale: %w", err) } - deploymentName := req.Name + deploymentName := d.Name r.getScaler(deploymentName).UpdateState( scale.Spec.Replicas, getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0), getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3), ) - return ctrl.Result{}, nil + return nil +} + +func getModelsFromAnnotation(ann map[string]string) []string { + if len(ann) == 0 { + return []string{} + } + modelCSV, ok := ann[lingoDomain+"/models"] + if !ok { + return []string{} + } + return strings.Split(modelCSV, ",") } func (r *Manager) removeDeployment(req ctrl.Request) { @@ -192,6 +204,31 @@ func (r *Manager) ResolveDeployment(model string) (string, bool) { return deploy, ok } +// Bootstrap initializes the Manager by retrieving a list of deployments from the k8s cluster and adding them to the Manager's internal state. +func (r *Manager) Bootstrap(ctx context.Context) error { + var sliceList appsv1.DeploymentList + if err := r.List(ctx, &sliceList, client.InNamespace(r.Namespace)); err != nil { + return fmt.Errorf("list deployments: %w", err) + } + for _, d := range sliceList.Items { + if err := r.addDeployment(ctx, d); err != nil { + return err + } + } + r.bootstrapped.Store(true) + return nil +} + +// ReadinessChecker checks if the Manager state is loaded and ready to handle requests. +// It returns an error if Manager is not bootstrapped yet. +// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck` +func (r *Manager) ReadinessChecker(_ *http.Request) error { + if !r.bootstrapped.Load() { + return fmt.Errorf("not boostrapped yet") + } + return nil +} + func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) int32 { if ann == nil { return defaultValue diff --git a/pkg/deployments/manager_test.go b/pkg/deployments/manager_test.go new file mode 100644 index 00000000..12cba718 --- /dev/null +++ b/pkg/deployments/manager_test.go @@ -0,0 +1,156 @@ +package deployments + +import ( + "context" + "reflect" + "testing" + + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + rtfake "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestReadinessChecker(t *testing.T) { + tests := map[string]struct { + bootstrapped bool + expectError bool + }{ + "not_bootstrapped": { + expectError: true, + }, + "bootstrapped": { + bootstrapped: true, + expectError: false, + }, + } + for name, spec := range tests { + t.Run(name, func(t *testing.T) { + mgr := &Manager{ + Client: rtfake.NewClientBuilder().Build(), + } + if spec.bootstrapped { + require.NoError(t, mgr.Bootstrap(context.TODO())) + } + // when + gotErr := mgr.ReadinessChecker(nil) + if spec.expectError { + + assert.Error(t, gotErr) + return + } + assert.NoError(t, gotErr) + }) + } +} + +func TestAddDeployment(t *testing.T) { + specs := map[string]struct { + deployment appsv1.Deployment + expScale scale + expectedError error + expModels []string + }{ + "single model - default replica settings": { + deployment: appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-deployment", + Annotations: map[string]string{ + lingoDomain + "/models": "my-model1", + }, + }, + }, + expModels: []string{"my-model1"}, + expScale: scale{Current: 3, Min: 0, Max: 3}, + }, + "single model - annotated": { + deployment: appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-deployment", + Annotations: map[string]string{ + lingoDomain + "/models": "my-model1", + lingoDomain + "/min-replicas": "2", + lingoDomain + "/max-replicas": "5", + }, + }, + }, + expModels: []string{"my-model1"}, + expScale: scale{Current: 3, Min: 2, Max: 5}, + }, + "multi model": { + deployment: appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-deployment", + Annotations: map[string]string{ + lingoDomain + "/models": "my-model1,my-model2", + }, + }, + }, + expModels: []string{"my-model1", "my-model2"}, + expScale: scale{Current: 3, Min: 0, Max: 3}, + }, + "no model - skipped": { + deployment: appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-deployment", + Annotations: map[string]string{}, + }, + }, + }, + } + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + depScale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: 3, + }, + } + + r := &Manager{ + Client: &partialFakeClient{subRes: depScale}, + Namespace: "default", + modelToDeployment: make(map[string]string), + scalers: map[string]*scaler{}, + } + + // when + gotErr := r.addDeployment(context.Background(), spec.deployment) + + // then + require.NoError(t, gotErr) + + for _, v := range spec.expModels { + dep, ok := r.ResolveDeployment(v) + require.True(t, ok) + assert.Equal(t, "my-deployment", dep) + } + assert.Len(t, r.modelToDeployment, len(spec.expModels)) + scales := r.getScalesSnapshot() + assert.Equal(t, spec.expScale, scales["my-deployment"]) + }) + } +} + +type partialFakeClient struct { + client.Client + subRes client.Object +} + +func (f *partialFakeClient) SubResource(subResource string) client.SubResourceClient { + return &partialSubResFakeClient{sourceSubRes: &f.subRes} +} + +type partialSubResFakeClient struct { + client.SubResourceClient + sourceSubRes *client.Object +} + +func (f *partialSubResFakeClient) Get(ctx context.Context, obj client.Object, target client.Object, opts ...client.SubResourceGetOption) error { + reflect.ValueOf(target).Elem().Set(reflect.ValueOf(*f.sourceSubRes).Elem()) + return nil +}