Skip to content

Commit

Permalink
Formatter + Linter
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Jan 19, 2024
1 parent f2f2915 commit 07ac95b
Show file tree
Hide file tree
Showing 18 changed files with 176 additions and 69 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/linter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: lint

on:
push:
branches:
- main
paths-ignore:
- '**.md'
pull_request:
branches:
- main
paths-ignore:
- '**.md'

permissions: read-all

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Harden Runner
uses: step-security/harden-runner@eb238b55efaa70779f274895e782ed17c84f2895 # v2.6.1
with:
egress-policy: audit

- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1

- uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
with:
go-version: "1.21"
check-latest: true

- name: lint
uses: golangci/golangci-lint-action@3a919529898de77ec3da873e3063ca4b10e7f5cc # v3.7.0
with:
version: v1.55.2
43 changes: 43 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
run:
timeout: 5m

linters-settings:
gci:
custom-order: true
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- prefix(*.k8s.io)
- prefix(github.com/substratusai/lingo)
lll:
line-length: 200
misspell:
locale: US
staticcheck:
go: "1.21"

linters:
disable-all: true
enable:
- errcheck
- errorlint
- exportloopref
- forcetypeassert
- gci
- gocritic
- goconst
- godot
- gofmt
- gofumpt
- goimports
- gosec
- gosimple
- govet
- ineffassign
- misspell
- revive
- staticcheck
- typecheck
- unconvert
- unused
- whitespace
29 changes: 29 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,32 @@ envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest

GOLANGCI ?= $(LOCALBIN)/golangci-lint

.PHONY: golangci
golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary.
$(GOLANGCI): $(LOCALBIN)
test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/[email protected]


.PHONY: lint
lint: golangci
golangci-lint run --tests=false ./... --timeout 5m

GOLANGCI ?= $(LOCALBIN)/golangci-lint

.PHONY: golangci
golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary.
$(GOLANGCI): $(LOCALBIN)
test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/[email protected]

GCI ?= $(LOCALBIN)/gci

.PHONY: formatter
formatter: $(GCI) ## Download gci locally if necessary.
$(GCI): $(LOCALBIN)
test -s $(LOCALBIN)gci || GOBIN=$(LOCALBIN) go install github.com/daixiang0/[email protected]

.PHONY: format
format: formatter
find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" | xargs gci write --skip-generated -s standard -s default -s "prefix(*.k8s.io)" -s "prefix(github.com/substratusai/lingo)" --custom-order
14 changes: 6 additions & 8 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"

"sigs.k8s.io/controller-runtime/pkg/metrics"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/substratusai/lingo/pkg/autoscaler"
Expand All @@ -27,10 +29,6 @@ import (
"github.com/substratusai/lingo/pkg/proxy"
"github.com/substratusai/lingo/pkg/queue"
"github.com/substratusai/lingo/pkg/stats"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
)

var (
Expand Down Expand Up @@ -113,7 +111,7 @@ func run() error {

hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("getting hostname: %v", err)
return fmt.Errorf("getting hostname: %w", err)
}
le := leader.NewElection(clientset, hostname, namespace)

Expand Down
27 changes: 14 additions & 13 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/substratusai/lingo/pkg/deployments"
"github.com/substratusai/lingo/pkg/endpoints"
"github.com/substratusai/lingo/pkg/leader"
"github.com/substratusai/lingo/pkg/movingaverage"
"github.com/substratusai/lingo/pkg/queue"
"github.com/substratusai/lingo/pkg/stats"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func New(mgr ctrl.Manager) (*Autoscaler, error) {
Expand Down Expand Up @@ -70,21 +71,21 @@ func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

func (a *Autoscaler) Start() {
for range time.Tick(a.Interval) {
if !a.LeaderElection.IsLeader.Load() {
func (r *Autoscaler) Start() {
for range time.Tick(r.Interval) {
if !r.LeaderElection.IsLeader.Load() {
log.Println("Not leader, doing nothing")
continue
}

log.Println("Calculating scales for all")

// TODO: Remove hardcoded Service lookup by name "lingo".
otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats")
otherLingoEndpoints := r.Endpoints.GetAllHosts("lingo", "stats")

stats, errs := aggregateStats(stats.Stats{
ActiveRequests: a.Queues.TotalCounts(),
}, a.HTTPClient, otherLingoEndpoints)
ActiveRequests: r.Queues.TotalCounts(),
}, r.HTTPClient, otherLingoEndpoints)
if len(errs) != 0 {
for _, err := range errs {
log.Printf("Failed to aggregate stats: %v", err)
Expand All @@ -94,13 +95,13 @@ func (a *Autoscaler) Start() {

for deploymentName, waitCount := range stats.ActiveRequests {
log.Println("Is leader, autoscaling")
avg := a.getMovingAvgQueueSize(deploymentName)
avg := r.getMovingAvgQueueSize(deploymentName)
avg.Next(float64(waitCount))
flt := avg.Calculate()
normalized := flt / float64(a.ConcurrencyPerReplica)
normalized := flt / float64(r.ConcurrencyPerReplica)
ceil := math.Ceil(normalized)
log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
a.Deployments.SetDesiredScale(deploymentName, int32(ceil))
r.Deployments.SetDesiredScale(deploymentName, int32(ceil))
}
}
}
Expand All @@ -126,7 +127,7 @@ func aggregateStats(s stats.Stats, httpc *http.Client, endpoints []string) (stat
for _, endpoint := range endpoints {
fetched, err := getStats(httpc, "http://"+endpoint)
if err != nil {
errs = append(errs, fmt.Errorf("getting stats: %v: %v", endpoint, err))
errs = append(errs, fmt.Errorf("getting stats: %v: %w", endpoint, err))
continue
}
for k, v := range fetched.ActiveRequests {
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r *Manager) getScaler(deploymentName string) *scaler {
}

// getScalesSnapshot returns a snapshot of the stats for all scalers managed by the Manager.
// The scales are returned as a map, where the keys are the model names
// The scales are returned as a map, where the keys are the model names.
func (r *Manager) getScalesSnapshot() map[string]scale {
r.scalersMtx.Lock()
defer r.scalersMtx.Unlock()
Expand Down Expand Up @@ -222,7 +222,7 @@ func (r *Manager) Bootstrap(ctx context.Context) error {

// ReadinessChecker checks if the Manager state is loaded and ready to handle requests.
// It returns an error if Manager is not bootstrapped yet.
// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck`
// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck`.
func (r *Manager) ReadinessChecker(_ *http.Request) error {
if !r.bootstrapped.Load() {
return fmt.Errorf("not boostrapped yet")
Expand Down
6 changes: 2 additions & 4 deletions pkg/deployments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
rtfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/deployments/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type MetricsCollector struct {
manager *Manager
}

// NewMetricsCollector constructor
// NewMetricsCollector constructor.
func NewMetricsCollector(m *Manager) *MetricsCollector {
if m == nil {
panic("manager required")
Expand All @@ -22,12 +22,12 @@ func NewMetricsCollector(m *Manager) *MetricsCollector {
}
}

// MustRegister registers all metrics
// MustRegister registers all metrics.
func (p *MetricsCollector) MustRegister(r prometheus.Registerer) {
r.MustRegister(p)
}

// Describe sends the super-set of all possible descriptors of metrics
// Describe sends the super-set of all possible descriptors of metrics.
func (p *MetricsCollector) Describe(descs chan<- *prometheus.Desc) {
descs <- p.currentScaleDescr
descs <- p.minScaleDescr
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployments/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *scaler) compareScales(current, desired int32) {

log.Printf("Comparing scales, current: %v, desired: %v", s.currentScale, s.desiredScale)

if s.desiredScale > s.currentScale {
if s.desiredScale > s.currentScale { //nolint:gocritic
// Scale up immediately.
go s.scaleFunc(s.desiredScale, false)
go s.scaleFunc(s.desiredScale, false) //nolint:errcheck
s.scaleDownStarted = false
} else if s.desiredScale == s.currentScale {
// Do nothing, schedule nothing.
Expand Down
36 changes: 18 additions & 18 deletions pkg/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,37 +93,37 @@ func (e *endpointGroup) getPort(portName string) int32 {
return e.ports[portName]
}

func (g *endpointGroup) lenIPs() int {
g.mtx.RLock()
defer g.mtx.RUnlock()
return len(g.endpoints)
func (e *endpointGroup) lenIPs() int {
e.mtx.RLock()
defer e.mtx.RUnlock()
return len(e.endpoints)
}

func (g *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) {
g.mtx.Lock()
g.ports = ports
func (e *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) {
e.mtx.Lock()
e.ports = ports
for ip := range ips {
if _, ok := g.endpoints[ip]; !ok {
g.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}}
if _, ok := e.endpoints[ip]; !ok {
e.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}}
}
}
for ip := range g.endpoints {
for ip := range e.endpoints {
if _, ok := ips[ip]; !ok {
delete(g.endpoints, ip)
delete(e.endpoints, ip)
}
}
g.mtx.Unlock()
e.mtx.Unlock()

// notify waiting requests
if len(ips) > 0 {
g.broadcastEndpoints()
e.broadcastEndpoints()
}
}

func (g *endpointGroup) broadcastEndpoints() {
g.bmtx.Lock()
defer g.bmtx.Unlock()
func (e *endpointGroup) broadcastEndpoints() {
e.bmtx.Lock()
defer e.bmtx.Unlock()

close(g.bcast)
g.bcast = make(chan struct{})
close(e.bcast)
e.bcast = make(chan struct{})
}
1 change: 0 additions & 1 deletion pkg/endpoints/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"k8s.io/apimachinery/pkg/util/rand"
)

Expand Down
3 changes: 1 addition & 2 deletions pkg/endpoints/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"

disv1 "k8s.io/api/discovery/v1"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -113,7 +112,7 @@ func (r *Manager) getEndpoints(service string) *endpointGroup {
// AwaitHostAddress returns the host address with the lowest number of in-flight requests. It will block until the host address
// becomes available or the context times out.
//
// It returns a string in the format "host:port" or error on timeout
// It returns a string in the format "host:port" or error on timeout.
func (r *Manager) AwaitHostAddress(ctx context.Context, service, portName string) (string, error) {
return r.getEndpoints(service).getBestHost(ctx, portName)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case errors.Is(err, context.Canceled):
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("Request cancelled"))
_, _ = w.Write([]byte("Request canceled"))
return
case errors.Is(err, context.DeadlineExceeded):
w.WriteHeader(http.StatusGatewayTimeout)
Expand All @@ -112,7 +112,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// parseModel parses the model name from the request
// returns empty string when none found or an error for failures on the proxy request object
// returns empty string when none found or an error for failures on the proxy request object.
func parseModel(r *http.Request) (string, *http.Request, error) {
if model := r.Header.Get("X-Model"); model != "" {
return model, r, nil
Expand Down
Loading

0 comments on commit 07ac95b

Please sign in to comment.