Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Feb 13, 2024
1 parent 0aa92a5 commit b21571a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 135 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/linter.yaml

This file was deleted.

46 changes: 0 additions & 46 deletions .golangci.yaml

This file was deleted.

29 changes: 0 additions & 29 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,3 @@ envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest

GOLANGCI ?= $(LOCALBIN)/golangci-lint

.PHONY: golangci
golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary.
$(GOLANGCI): $(LOCALBIN)
test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/[email protected]


.PHONY: lint
lint: golangci
golangci-lint run ./... --timeout 5m

GOLANGCI ?= $(LOCALBIN)/golangci-lint

.PHONY: golangci
golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary.
$(GOLANGCI): $(LOCALBIN)
test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/[email protected]

GCI ?= $(LOCALBIN)/gci

.PHONY: formatter
formatter: $(GCI) ## Download gci locally if necessary.
$(GCI): $(LOCALBIN)
test -s $(LOCALBIN)gci || GOBIN=$(LOCALBIN) go install github.com/daixiang0/[email protected]

.PHONY: format
format: formatter
find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" | xargs gci write --skip-generated -s standard -s default -s "prefix(*.k8s.io)" -s "prefix(github.com/substratusai/lingo)" --custom-order
10 changes: 6 additions & 4 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ func run() error {
concurrency := getEnvInt("CONCURRENCY", 100)
scaleDownDelay := getEnvInt("SCALE_DOWN_DELAY", 30)

var metricsAddr string
var probeAddr string
var concurrencyPerReplica int
var requestHeaderTimeout time.Duration
var (
metricsAddr string
probeAddr string
concurrencyPerReplica int
requestHeaderTimeout time.Duration // setting to prevent slowloris attack on http server
)

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand Down
40 changes: 20 additions & 20 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,36 @@ type Autoscaler struct {
movingAvgQueueSize map[string]*movingaverage.Simple
}

func (r *Autoscaler) SetupWithManager(mgr ctrl.Manager) error {
func (a *Autoscaler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).
Complete(r)
Complete(a)
}

func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (a *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var cm corev1.ConfigMap
if err := r.Get(ctx, req.NamespacedName, &cm); err != nil {
if err := a.Get(ctx, req.NamespacedName, &cm); err != nil {
return ctrl.Result{}, fmt.Errorf("get: %w", err)
}

return ctrl.Result{}, nil
}

func (r *Autoscaler) Start() {
for range time.Tick(r.Interval) {
if !r.LeaderElection.IsLeader.Load() {
func (a *Autoscaler) Start() {
for range time.Tick(a.Interval) {
if !a.LeaderElection.IsLeader.Load() {
log.Println("Not leader, doing nothing")
continue
}

log.Println("Calculating scales for all")

// TODO: Remove hardcoded Service lookup by name "lingo".
otherLingoEndpoints := r.Endpoints.GetAllHosts("lingo", "stats")
otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats")

stats, errs := aggregateStats(stats.Stats{
ActiveRequests: r.Queues.TotalCounts(),
}, r.HTTPClient, otherLingoEndpoints)
ActiveRequests: a.Queues.TotalCounts(),
}, a.HTTPClient, otherLingoEndpoints)
if len(errs) != 0 {
for _, err := range errs {
log.Printf("Failed to aggregate stats: %v", err)
Expand All @@ -95,26 +95,26 @@ func (r *Autoscaler) Start() {

for deploymentName, waitCount := range stats.ActiveRequests {
log.Println("Is leader, autoscaling")
avg := r.getMovingAvgQueueSize(deploymentName)
avg := a.getMovingAvgQueueSize(deploymentName)
avg.Next(float64(waitCount))
flt := avg.Calculate()
normalized := flt / float64(r.ConcurrencyPerReplica)
normalized := flt / float64(a.ConcurrencyPerReplica)
ceil := math.Ceil(normalized)
log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
r.Deployments.SetDesiredScale(deploymentName, int32(ceil))
a.Deployments.SetDesiredScale(deploymentName, int32(ceil))
}
}
}

func (r *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingaverage.Simple {
r.movingAvgQueueSizeMtx.Lock()
a, ok := r.movingAvgQueueSize[deploymentName]
func (a *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingaverage.Simple {
a.movingAvgQueueSizeMtx.Lock()
avg, ok := a.movingAvgQueueSize[deploymentName]
if !ok {
a = movingaverage.NewSimple(make([]float64, r.AverageCount))
r.movingAvgQueueSize[deploymentName] = a
avg = movingaverage.NewSimple(make([]float64, a.AverageCount))
a.movingAvgQueueSize[deploymentName] = avg
}
r.movingAvgQueueSizeMtx.Unlock()
return a
a.movingAvgQueueSizeMtx.Unlock()
return avg
}

func aggregateStats(s stats.Stats, httpc *http.Client, endpoints []string) (stats.Stats, []error) {
Expand Down

0 comments on commit b21571a

Please sign in to comment.