Skip to content

Commit

Permalink
Add readiness endpoint (#43)
Browse files Browse the repository at this point in the history
Resolves #5
  • Loading branch information
alpe authored Jan 9, 2024
1 parent bc4ba0d commit e51569a
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 24 deletions.
13 changes: 11 additions & 2 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func run() error {
if err != nil {
return fmt.Errorf("clientset: %w", err)
}
ctx := ctrl.SetupSignalHandler()

hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -134,6 +135,9 @@ func run() error {
deploymentManager.Namespace = namespace
deploymentManager.ScaleDownPeriod = time.Duration(scaleDownDelay) * time.Second
deployments.NewMetricsCollector(deploymentManager).MustRegister(metricsRegistry)
if err := mgr.AddReadyzCheck("readyz", deploymentManager.ReadinessChecker); err != nil {
return fmt.Errorf("setup readiness handler: %w", err)
}

autoscaler, err := autoscaler.New(mgr)
if err != nil {
Expand All @@ -157,8 +161,6 @@ func run() error {
}
statsServer := &http.Server{Addr: ":8083", Handler: statsHandler}

ctx := ctrl.SetupSignalHandler()

var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand Down Expand Up @@ -188,6 +190,13 @@ func run() error {
setupLog.Info("manager stopped")
}()

if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok {
return fmt.Errorf("client cache could not be synced")
}
if err := deploymentManager.Bootstrap(ctx); err != nil {
return fmt.Errorf("bootstrap deloyment manager: %w", err)
}

if err := proxyServer.ListenAndServe(); err != nil {
return fmt.Errorf("listen and serve: %w", err)
}
Expand Down
11 changes: 11 additions & 0 deletions deploy/lingo_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 1
periodSeconds: 5
livenessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module github.com/substratusai/lingo
go 1.21.0

require (
github.com/go-logr/logr v1.2.4
github.com/google/uuid v1.4.0
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
Expand All @@ -16,9 +20,9 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/zapr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand All @@ -29,7 +33,6 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -40,8 +43,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
Expand Down
69 changes: 53 additions & 16 deletions pkg/deployments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -47,6 +49,8 @@ type Manager struct {
// modelToDeployment maps model names to deployment names. A single deployment
// can serve multiple models.
modelToDeployment map[string]string

bootstrapped atomic.Bool
}

func (r *Manager) SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -73,33 +77,41 @@ func (r *Manager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result,
return ctrl.Result{}, fmt.Errorf("get: %w", err)
}

if ann := d.GetAnnotations(); ann != nil {
modelCSV, ok := ann[lingoDomain+"/models"]
if !ok {
return ctrl.Result{}, nil
}
models := strings.Split(modelCSV, ",")
if len(models) == 0 {
return ctrl.Result{}, nil
}
for _, model := range models {
r.setModelMapping(strings.TrimSpace(model), d.Name)
}
}
return ctrl.Result{}, r.addDeployment(ctx, d)
}

func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error {
models := getModelsFromAnnotation(d.GetAnnotations())
if len(models) == 0 {
return nil
}
for _, model := range models {
r.setModelMapping(strings.TrimSpace(model), d.Name)
}
var scale autoscalingv1.Scale
if err := r.SubResource("scale").Get(ctx, &d, &scale); err != nil {
return ctrl.Result{}, fmt.Errorf("get scale: %w", err)
return fmt.Errorf("get scale: %w", err)
}

deploymentName := req.Name
deploymentName := d.Name
r.getScaler(deploymentName).UpdateState(
scale.Spec.Replicas,
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3),
)

return ctrl.Result{}, nil
return nil
}

func getModelsFromAnnotation(ann map[string]string) []string {
if len(ann) == 0 {
return []string{}
}
modelCSV, ok := ann[lingoDomain+"/models"]
if !ok {
return []string{}
}
return strings.Split(modelCSV, ",")
}

func (r *Manager) removeDeployment(req ctrl.Request) {
Expand Down Expand Up @@ -192,6 +204,31 @@ func (r *Manager) ResolveDeployment(model string) (string, bool) {
return deploy, ok
}

// Bootstrap initializes the Manager by retrieving a list of deployments from the k8s cluster and adding them to the Manager's internal state.
func (r *Manager) Bootstrap(ctx context.Context) error {
var sliceList appsv1.DeploymentList
if err := r.List(ctx, &sliceList, client.InNamespace(r.Namespace)); err != nil {
return fmt.Errorf("list deployments: %w", err)
}
for _, d := range sliceList.Items {
if err := r.addDeployment(ctx, d); err != nil {
return err
}
}
r.bootstrapped.Store(true)
return nil
}

// ReadinessChecker checks if the Manager state is loaded and ready to handle requests.
// It returns an error if Manager is not bootstrapped yet.
// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck`
func (r *Manager) ReadinessChecker(_ *http.Request) error {
if !r.bootstrapped.Load() {
return fmt.Errorf("not boostrapped yet")
}
return nil
}

func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) int32 {
if ann == nil {
return defaultValue
Expand Down
156 changes: 156 additions & 0 deletions pkg/deployments/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package deployments

import (
"context"
"reflect"
"testing"

appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
rtfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestReadinessChecker(t *testing.T) {
tests := map[string]struct {
bootstrapped bool
expectError bool
}{
"not_bootstrapped": {
expectError: true,
},
"bootstrapped": {
bootstrapped: true,
expectError: false,
},
}
for name, spec := range tests {
t.Run(name, func(t *testing.T) {
mgr := &Manager{
Client: rtfake.NewClientBuilder().Build(),
}
if spec.bootstrapped {
require.NoError(t, mgr.Bootstrap(context.TODO()))
}
// when
gotErr := mgr.ReadinessChecker(nil)
if spec.expectError {

assert.Error(t, gotErr)
return
}
assert.NoError(t, gotErr)
})
}
}

func TestAddDeployment(t *testing.T) {
specs := map[string]struct {
deployment appsv1.Deployment
expScale scale
expectedError error
expModels []string
}{
"single model - default replica settings": {
deployment: appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
Annotations: map[string]string{
lingoDomain + "/models": "my-model1",
},
},
},
expModels: []string{"my-model1"},
expScale: scale{Current: 3, Min: 0, Max: 3},
},
"single model - annotated": {
deployment: appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
Annotations: map[string]string{
lingoDomain + "/models": "my-model1",
lingoDomain + "/min-replicas": "2",
lingoDomain + "/max-replicas": "5",
},
},
},
expModels: []string{"my-model1"},
expScale: scale{Current: 3, Min: 2, Max: 5},
},
"multi model": {
deployment: appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
Annotations: map[string]string{
lingoDomain + "/models": "my-model1,my-model2",
},
},
},
expModels: []string{"my-model1", "my-model2"},
expScale: scale{Current: 3, Min: 0, Max: 3},
},
"no model - skipped": {
deployment: appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "my-deployment",
Annotations: map[string]string{},
},
},
},
}
for name, spec := range specs {
t.Run(name, func(t *testing.T) {
depScale := &autoscalingv1.Scale{
Spec: autoscalingv1.ScaleSpec{
Replicas: 3,
},
}

r := &Manager{
Client: &partialFakeClient{subRes: depScale},
Namespace: "default",
modelToDeployment: make(map[string]string),
scalers: map[string]*scaler{},
}

// when
gotErr := r.addDeployment(context.Background(), spec.deployment)

// then
require.NoError(t, gotErr)

for _, v := range spec.expModels {
dep, ok := r.ResolveDeployment(v)
require.True(t, ok)
assert.Equal(t, "my-deployment", dep)
}
assert.Len(t, r.modelToDeployment, len(spec.expModels))
scales := r.getScalesSnapshot()
assert.Equal(t, spec.expScale, scales["my-deployment"])
})
}
}

type partialFakeClient struct {
client.Client
subRes client.Object
}

func (f *partialFakeClient) SubResource(subResource string) client.SubResourceClient {
return &partialSubResFakeClient{sourceSubRes: &f.subRes}
}

type partialSubResFakeClient struct {
client.SubResourceClient
sourceSubRes *client.Object
}

func (f *partialSubResFakeClient) Get(ctx context.Context, obj client.Object, target client.Object, opts ...client.SubResourceGetOption) error {
reflect.ValueOf(target).Elem().Set(reflect.ValueOf(*f.sourceSubRes).Elem())
return nil
}

0 comments on commit e51569a

Please sign in to comment.