diff --git a/.github/workflows/linter.yaml b/.github/workflows/linter.yaml new file mode 100644 index 00000000..72655638 --- /dev/null +++ b/.github/workflows/linter.yaml @@ -0,0 +1,36 @@ +name: lint + +on: + push: + branches: + - main + paths-ignore: + - '**.md' + pull_request: + branches: + - main + paths-ignore: + - '**.md' + +permissions: read-all + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Harden Runner + uses: step-security/harden-runner@eb238b55efaa70779f274895e782ed17c84f2895 # v2.6.1 + with: + egress-policy: audit + + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + + - uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0 + with: + go-version: "1.21" + check-latest: true + + - name: lint + uses: golangci/golangci-lint-action@3a919529898de77ec3da873e3063ca4b10e7f5cc # v3.7.0 + with: + version: v1.55.2 \ No newline at end of file diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 00000000..8c48f26f --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,43 @@ +run: + timeout: 5m + +linters-settings: + gci: + custom-order: true + sections: + - standard # Standard section: captures all standard packages. + - default # Default section: contains all imports that could not be matched to another section type. + - prefix(*.k8s.io) + - prefix(github.com/substratusai/lingo) + lll: + line-length: 200 + misspell: + locale: US + staticcheck: + go: "1.21" + +linters: + disable-all: true + enable: + - errcheck + - errorlint + - exportloopref + - forcetypeassert + - gci + - gocritic + - goconst + - godot + - gofmt + - gofumpt + - goimports + - gosec + - gosimple + - govet + - ineffassign + - misspell + - revive + - staticcheck + - typecheck + - unconvert + - unused + - whitespace \ No newline at end of file diff --git a/Makefile b/Makefile index 1826ea5b..89024eb2 100644 --- a/Makefile +++ b/Makefile @@ -26,3 +26,32 @@ envtest: $(ENVTEST) ## Download envtest-setup locally if necessary. $(ENVTEST): $(LOCALBIN) test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest +GOLANGCI ?= $(LOCALBIN)/golangci-lint + +.PHONY: golangci +golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. +$(GOLANGCI): $(LOCALBIN) + test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 + + +.PHONY: lint +lint: golangci + golangci-lint run --tests=false ./... --timeout 5m + +GOLANGCI ?= $(LOCALBIN)/golangci-lint + +.PHONY: golangci +golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. +$(GOLANGCI): $(LOCALBIN) + test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 + +GCI ?= $(LOCALBIN)/gci + +.PHONY: formatter +formatter: $(GCI) ## Download gci locally if necessary. +$(GCI): $(LOCALBIN) + test -s $(LOCALBIN)gci || GOBIN=$(LOCALBIN) go install github.com/daixiang0/gci@v0.12.1 + +.PHONY: format +format: formatter + find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" | xargs gci write --skip-generated -s standard -s default -s "prefix(*.k8s.io)" -s "prefix(github.com/substratusai/lingo)" --custom-order \ No newline at end of file diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index c7582a6c..8d7af897 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -12,12 +12,14 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - - "sigs.k8s.io/controller-runtime/pkg/metrics" - + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/substratusai/lingo/pkg/autoscaler" @@ -27,10 +29,6 @@ import ( "github.com/substratusai/lingo/pkg/proxy" "github.com/substratusai/lingo/pkg/queue" "github.com/substratusai/lingo/pkg/stats" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" ) var ( @@ -113,7 +111,7 @@ func run() error { hostname, err := os.Hostname() if err != nil { - return fmt.Errorf("getting hostname: %v", err) + return fmt.Errorf("getting hostname: %w", err) } le := leader.NewElection(clientset, hostname, namespace) diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index fa6c07b2..c903c505 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -10,15 +10,16 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/substratusai/lingo/pkg/deployments" "github.com/substratusai/lingo/pkg/endpoints" "github.com/substratusai/lingo/pkg/leader" "github.com/substratusai/lingo/pkg/movingaverage" "github.com/substratusai/lingo/pkg/queue" "github.com/substratusai/lingo/pkg/stats" - corev1 "k8s.io/api/core/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) func New(mgr ctrl.Manager) (*Autoscaler, error) { @@ -70,9 +71,9 @@ func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } -func (a *Autoscaler) Start() { - for range time.Tick(a.Interval) { - if !a.LeaderElection.IsLeader.Load() { +func (r *Autoscaler) Start() { + for range time.Tick(r.Interval) { + if !r.LeaderElection.IsLeader.Load() { log.Println("Not leader, doing nothing") continue } @@ -80,11 +81,11 @@ func (a *Autoscaler) Start() { log.Println("Calculating scales for all") // TODO: Remove hardcoded Service lookup by name "lingo". - otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats") + otherLingoEndpoints := r.Endpoints.GetAllHosts("lingo", "stats") stats, errs := aggregateStats(stats.Stats{ - ActiveRequests: a.Queues.TotalCounts(), - }, a.HTTPClient, otherLingoEndpoints) + ActiveRequests: r.Queues.TotalCounts(), + }, r.HTTPClient, otherLingoEndpoints) if len(errs) != 0 { for _, err := range errs { log.Printf("Failed to aggregate stats: %v", err) @@ -94,13 +95,13 @@ func (a *Autoscaler) Start() { for deploymentName, waitCount := range stats.ActiveRequests { log.Println("Is leader, autoscaling") - avg := a.getMovingAvgQueueSize(deploymentName) + avg := r.getMovingAvgQueueSize(deploymentName) avg.Next(float64(waitCount)) flt := avg.Calculate() - normalized := flt / float64(a.ConcurrencyPerReplica) + normalized := flt / float64(r.ConcurrencyPerReplica) ceil := math.Ceil(normalized) log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount) - a.Deployments.SetDesiredScale(deploymentName, int32(ceil)) + r.Deployments.SetDesiredScale(deploymentName, int32(ceil)) } } } @@ -126,7 +127,7 @@ func aggregateStats(s stats.Stats, httpc *http.Client, endpoints []string) (stat for _, endpoint := range endpoints { fetched, err := getStats(httpc, "http://"+endpoint) if err != nil { - errs = append(errs, fmt.Errorf("getting stats: %v: %v", endpoint, err)) + errs = append(errs, fmt.Errorf("getting stats: %v: %w", endpoint, err)) continue } for k, v := range fetched.ActiveRequests { diff --git a/pkg/deployments/manager.go b/pkg/deployments/manager.go index 3bea6710..32aaf5a9 100644 --- a/pkg/deployments/manager.go +++ b/pkg/deployments/manager.go @@ -141,7 +141,7 @@ func (r *Manager) getScaler(deploymentName string) *scaler { } // getScalesSnapshot returns a snapshot of the stats for all scalers managed by the Manager. -// The scales are returned as a map, where the keys are the model names +// The scales are returned as a map, where the keys are the model names. func (r *Manager) getScalesSnapshot() map[string]scale { r.scalersMtx.Lock() defer r.scalersMtx.Unlock() @@ -222,7 +222,7 @@ func (r *Manager) Bootstrap(ctx context.Context) error { // ReadinessChecker checks if the Manager state is loaded and ready to handle requests. // It returns an error if Manager is not bootstrapped yet. -// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck` +// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck`. func (r *Manager) ReadinessChecker(_ *http.Request) error { if !r.bootstrapped.Load() { return fmt.Errorf("not boostrapped yet") diff --git a/pkg/deployments/manager_test.go b/pkg/deployments/manager_test.go index 12cba718..2a0a9f5b 100644 --- a/pkg/deployments/manager_test.go +++ b/pkg/deployments/manager_test.go @@ -5,14 +5,12 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" rtfake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) diff --git a/pkg/deployments/metrics.go b/pkg/deployments/metrics.go index 1bab7252..8e206c4a 100644 --- a/pkg/deployments/metrics.go +++ b/pkg/deployments/metrics.go @@ -9,7 +9,7 @@ type MetricsCollector struct { manager *Manager } -// NewMetricsCollector constructor +// NewMetricsCollector constructor. func NewMetricsCollector(m *Manager) *MetricsCollector { if m == nil { panic("manager required") @@ -22,12 +22,12 @@ func NewMetricsCollector(m *Manager) *MetricsCollector { } } -// MustRegister registers all metrics +// MustRegister registers all metrics. func (p *MetricsCollector) MustRegister(r prometheus.Registerer) { r.MustRegister(p) } -// Describe sends the super-set of all possible descriptors of metrics +// Describe sends the super-set of all possible descriptors of metrics. func (p *MetricsCollector) Describe(descs chan<- *prometheus.Desc) { descs <- p.currentScaleDescr descs <- p.minScaleDescr diff --git a/pkg/deployments/scaler.go b/pkg/deployments/scaler.go index 5e5b49bc..799ae501 100644 --- a/pkg/deployments/scaler.go +++ b/pkg/deployments/scaler.go @@ -86,9 +86,9 @@ func (s *scaler) compareScales(current, desired int32) { log.Printf("Comparing scales, current: %v, desired: %v", s.currentScale, s.desiredScale) - if s.desiredScale > s.currentScale { + if s.desiredScale > s.currentScale { //nolint:gocritic // Scale up immediately. - go s.scaleFunc(s.desiredScale, false) + go s.scaleFunc(s.desiredScale, false) //nolint:errcheck s.scaleDownStarted = false } else if s.desiredScale == s.currentScale { // Do nothing, schedule nothing. diff --git a/pkg/endpoints/endpoints.go b/pkg/endpoints/endpoints.go index 0e51083a..d4ffee26 100644 --- a/pkg/endpoints/endpoints.go +++ b/pkg/endpoints/endpoints.go @@ -93,37 +93,37 @@ func (e *endpointGroup) getPort(portName string) int32 { return e.ports[portName] } -func (g *endpointGroup) lenIPs() int { - g.mtx.RLock() - defer g.mtx.RUnlock() - return len(g.endpoints) +func (e *endpointGroup) lenIPs() int { + e.mtx.RLock() + defer e.mtx.RUnlock() + return len(e.endpoints) } -func (g *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) { - g.mtx.Lock() - g.ports = ports +func (e *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) { + e.mtx.Lock() + e.ports = ports for ip := range ips { - if _, ok := g.endpoints[ip]; !ok { - g.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}} + if _, ok := e.endpoints[ip]; !ok { + e.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}} } } - for ip := range g.endpoints { + for ip := range e.endpoints { if _, ok := ips[ip]; !ok { - delete(g.endpoints, ip) + delete(e.endpoints, ip) } } - g.mtx.Unlock() + e.mtx.Unlock() // notify waiting requests if len(ips) > 0 { - g.broadcastEndpoints() + e.broadcastEndpoints() } } -func (g *endpointGroup) broadcastEndpoints() { - g.bmtx.Lock() - defer g.bmtx.Unlock() +func (e *endpointGroup) broadcastEndpoints() { + e.bmtx.Lock() + defer e.bmtx.Unlock() - close(g.bcast) - g.bcast = make(chan struct{}) + close(e.bcast) + e.bcast = make(chan struct{}) } diff --git a/pkg/endpoints/endpoints_test.go b/pkg/endpoints/endpoints_test.go index 8c2cc1f3..8282c367 100644 --- a/pkg/endpoints/endpoints_test.go +++ b/pkg/endpoints/endpoints_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/util/rand" ) diff --git a/pkg/endpoints/manager.go b/pkg/endpoints/manager.go index bd2d3e39..a9df17fe 100644 --- a/pkg/endpoints/manager.go +++ b/pkg/endpoints/manager.go @@ -7,7 +7,6 @@ import ( "sync" disv1 "k8s.io/api/discovery/v1" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -113,7 +112,7 @@ func (r *Manager) getEndpoints(service string) *endpointGroup { // AwaitHostAddress returns the host address with the lowest number of in-flight requests. It will block until the host address // becomes available or the context times out. // -// It returns a string in the format "host:port" or error on timeout +// It returns a string in the format "host:port" or error on timeout. func (r *Manager) AwaitHostAddress(ctx context.Context, service, portName string) (string, error) { return r.getEndpoints(service).getBestHost(ctx, portName) } diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index 3a54c94b..8782841c 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -91,7 +91,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case errors.Is(err, context.Canceled): w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte("Request cancelled")) + _, _ = w.Write([]byte("Request canceled")) return case errors.Is(err, context.DeadlineExceeded): w.WriteHeader(http.StatusGatewayTimeout) @@ -112,7 +112,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // parseModel parses the model name from the request -// returns empty string when none found or an error for failures on the proxy request object +// returns empty string when none found or an error for failures on the proxy request object. func parseModel(r *http.Request) (string, *http.Request, error) { if model := r.Header.Get("X-Model"); model != "" { return model, r, nil diff --git a/pkg/proxy/metrics_test.go b/pkg/proxy/metrics_test.go index 5940e445..ed38c892 100644 --- a/pkg/proxy/metrics_test.go +++ b/pkg/proxy/metrics_test.go @@ -12,16 +12,16 @@ import ( "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/substratusai/lingo/pkg/deployments" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/substratusai/lingo/pkg/deployments" ) func TestMetrics(t *testing.T) { diff --git a/pkg/queue/metrics.go b/pkg/queue/metrics.go index e2f0adee..d302eee8 100644 --- a/pkg/queue/metrics.go +++ b/pkg/queue/metrics.go @@ -8,7 +8,7 @@ type MetricsCollector struct { manager *Manager } -// NewMetricsCollector constructor +// NewMetricsCollector constructor. func NewMetricsCollector(m *Manager) *MetricsCollector { if m == nil { panic("manager required") @@ -20,12 +20,12 @@ func NewMetricsCollector(m *Manager) *MetricsCollector { } } -// MustRegister registers all metrics +// MustRegister registers all metrics. func (p *MetricsCollector) MustRegister(r prometheus.Registerer) { r.MustRegister(p) } -// Describe sends the super-set of all possible descriptors of metrics +// Describe sends the super-set of all possible descriptors of metrics. func (p *MetricsCollector) Describe(descs chan<- *prometheus.Desc) { descs <- p.inFlightDescr descs <- p.queuedDescr diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index eb9ac5bc..9a77728b 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "container/list" "context" + "fmt" "log" "sync" "sync/atomic" @@ -152,7 +153,10 @@ func (q *Queue) Start() { continue } - itm := e.Value.(*item) + itm, ok := e.Value.(*item) + if !ok { + panic(fmt.Sprintf("invalid type: %T", e.Value)) + } q.dequeue(itm, true) log.Println("Dequeued: ", itm.id) diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 10fb5e45..de0e1ccb 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -13,8 +13,6 @@ import ( "testing" "time" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -24,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestScaleUpAndDown(t *testing.T) { diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index 74697559..ab530f67 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -11,12 +11,6 @@ import ( "testing" "time" - "github.com/substratusai/lingo/pkg/autoscaler" - "github.com/substratusai/lingo/pkg/deployments" - "github.com/substratusai/lingo/pkg/endpoints" - "github.com/substratusai/lingo/pkg/leader" - "github.com/substratusai/lingo/pkg/proxy" - "github.com/substratusai/lingo/pkg/queue" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,6 +21,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "github.com/substratusai/lingo/pkg/autoscaler" + "github.com/substratusai/lingo/pkg/deployments" + "github.com/substratusai/lingo/pkg/endpoints" + "github.com/substratusai/lingo/pkg/leader" + "github.com/substratusai/lingo/pkg/proxy" + "github.com/substratusai/lingo/pkg/queue" ) var (