Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add readiness endpoint #43

Merged
merged 7 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/healthz"

"github.com/prometheus/client_golang/prometheus"

"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -103,10 +105,15 @@ func run() error {
return fmt.Errorf("starting manager: %w", err)
}

if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the always 200 handler

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this start serving after mgr.Start()? If so, I think we need to move .Bootstrap() above mgr.Start().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a chicken/ egg problem. The mgr.Start() call also starts the cache syncs that we need for the bootstrap call. So with this implementation, the container may be "ready" but not alive, yet.
This can be solved with a custom handler that checks bootstrap was called once.
Good 👁️

return fmt.Errorf("setup readiness handler: %w", err)
}

clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("clientset: %w", err)
}
ctx := ctrl.SetupSignalHandler()

hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -157,8 +164,6 @@ func run() error {
}
statsServer := &http.Server{Addr: ":8083", Handler: statsHandler}

ctx := ctrl.SetupSignalHandler()

var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand Down Expand Up @@ -188,6 +193,11 @@ func run() error {
setupLog.Info("manager stopped")
}()

_ = mgr.GetCache().WaitForCacheSync(ctx)
alpe marked this conversation as resolved.
Show resolved Hide resolved
if err := deploymentManager.Bootstrap(ctx, mgr.GetClient()); err != nil {
return fmt.Errorf("bootstrap deloyment manager: %w", err)
}

if err := proxyServer.ListenAndServe(); err != nil {
return fmt.Errorf("listen and serve: %w", err)
}
Expand Down
11 changes: 11 additions & 0 deletions deploy/lingo_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
readinessProbe:
httpGet:
path: /readyz
port: 8081
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The healthz endpoint was setup already in main. This adds port/path to make use of it. Not sure about the time values here

initialDelaySeconds: 1
periodSeconds: 5
livenessProbe:
tcpSocket:
alpe marked this conversation as resolved.
Show resolved Hide resolved
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
59 changes: 43 additions & 16 deletions pkg/deployments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,41 @@ func (r *Manager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result,
return ctrl.Result{}, fmt.Errorf("get: %w", err)
}

if ann := d.GetAnnotations(); ann != nil {
modelCSV, ok := ann[lingoDomain+"/models"]
if !ok {
return ctrl.Result{}, nil
}
models := strings.Split(modelCSV, ",")
if len(models) == 0 {
return ctrl.Result{}, nil
}
for _, model := range models {
r.setModelMapping(strings.TrimSpace(model), d.Name)
}
}
return ctrl.Result{}, r.addDeployment(ctx, d)
}

func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error {
models := getModelsFromAnnotation(d.GetAnnotations())
alpe marked this conversation as resolved.
Show resolved Hide resolved
if len(models) == 0 {
alpe marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
for _, model := range models {
r.setModelMapping(strings.TrimSpace(model), d.Name)
}
var scale autoscalingv1.Scale
if err := r.SubResource("scale").Get(ctx, &d, &scale); err != nil {
return ctrl.Result{}, fmt.Errorf("get scale: %w", err)
return fmt.Errorf("get scale: %w", err)
}

deploymentName := req.Name
deploymentName := d.Name
r.getScaler(deploymentName).UpdateState(
scale.Spec.Replicas,
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3),
)

return ctrl.Result{}, nil
return nil
}

func getModelsFromAnnotation(ann map[string]string) []string {
alpe marked this conversation as resolved.
Show resolved Hide resolved
if len(ann) == 0 {
return []string{}
}
modelCSV, ok := ann[lingoDomain+"/models"]
if !ok {
return []string{}
}
return strings.Split(modelCSV, ",")
}

func (r *Manager) getScaler(deploymentName string) *scaler {
Expand Down Expand Up @@ -174,6 +182,25 @@ func (r *Manager) ResolveDeployment(model string) (string, bool) {
return deploy, ok
}

type k8sClient interface {
List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error
}

// Bootstrap initializes the Manager by retrieving a list of deployments from the k8s cluster and adding them to the Manager's internal state.
// The k8s client passed should either be non caching or have the cache synced first.
func (r *Manager) Bootstrap(ctx context.Context, k8sClient k8sClient) error {
alpe marked this conversation as resolved.
Show resolved Hide resolved
var sliceList appsv1.DeploymentList
if err := k8sClient.List(context.TODO(), &sliceList, client.InNamespace(r.Namespace)); err != nil {
alpe marked this conversation as resolved.
Show resolved Hide resolved
alpe marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("list deployments: %w", err)
}
for _, d := range sliceList.Items {
if err := r.addDeployment(ctx, d); err != nil {
return err
}
}
return nil
}

func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) int32 {
if ann == nil {
return defaultValue
Expand Down
Loading