From 89e96433c6cd4db335fa612984ccb17738552ff2 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 19 Jan 2024 10:24:12 +0100 Subject: [PATCH 1/5] Fix issues reported by gosec --- cmd/lingo/main.go | 14 ++++++++++---- pkg/deployments/manager.go | 8 ++++++-- pkg/proxy/handler.go | 4 ++-- pkg/stats/handler.go | 2 +- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index 2f9ae6f4..580b375b 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -71,11 +71,13 @@ func run() error { var metricsAddr string var probeAddr string var concurrencyPerReplica int + var requestHeaderTimeout time.Duration flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.IntVar(&concurrencyPerReplica, "concurrency", concurrency, "the number of simultaneous requests that can be processed by each replica") flag.IntVar(&scaleDownDelay, "scale-down-delay", scaleDownDelay, "seconds to wait before scaling down") + flag.DurationVar(&requestHeaderTimeout, "request-header-timeout", 10*time.Second, "amount of time for the client to send headers before a timeout error will occur") opts := zap.Options{ Development: true, } @@ -154,19 +156,23 @@ func run() error { proxy.MustRegister(metricsRegistry) proxyHandler := proxy.NewHandler(deploymentManager, endpointManager, queueManager) - proxyServer := &http.Server{Addr: ":8080", Handler: proxyHandler} + proxyServer := &http.Server{Addr: ":8080", Handler: proxyHandler, ReadHeaderTimeout: requestHeaderTimeout} statsHandler := &stats.Handler{ Queues: queueManager, } - statsServer := &http.Server{Addr: ":8083", Handler: statsHandler} + statsServer := &http.Server{Addr: ":8083", Handler: statsHandler, ReadHeaderTimeout: requestHeaderTimeout} var wg sync.WaitGroup wg.Add(1) go func() { defer func() { - statsServer.Shutdown(context.Background()) - proxyServer.Shutdown(context.Background()) + if err := statsServer.Shutdown(context.Background()); err != nil { + setupLog.Error(err, "shutdown stats server") + } + if err := proxyServer.Shutdown(context.Background()); err != nil { + setupLog.Error(err, "shutdown proxy server") + } wg.Done() }() if err := mgr.Start(ctx); err != nil { diff --git a/pkg/deployments/manager.go b/pkg/deployments/manager.go index 13b542d9..37ee9686 100644 --- a/pkg/deployments/manager.go +++ b/pkg/deployments/manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "math" "net/http" "strconv" "strings" @@ -258,6 +259,9 @@ func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) i log.Printf("parsing annotation as int: %v", err) return defaultValue } - - return int32(value) + if value > math.MaxInt32 { + log.Printf("invalid value that exceeds max int32: %d", value) + return defaultValue + } + return int32(value) // #nosec G109 : checked before } diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index a51d9d3b..3a54c94b 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -56,7 +56,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { modelName = "unknown" log.Printf("error reading model from request body: %v", err) w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("Bad request: unable to parse .model from JSON payload")) + _, _ = w.Write([]byte("Bad request: unable to parse .model from JSON payload")) return } log.Println("model:", modelName) @@ -65,7 +65,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !found { log.Printf("deployment not found for model: %v", err) w.WriteHeader(http.StatusNotFound) - w.Write([]byte(fmt.Sprintf("Deployment for model not found: %v", modelName))) + _, _ = w.Write([]byte(fmt.Sprintf("Deployment for model not found: %v", modelName))) return } diff --git a/pkg/stats/handler.go b/pkg/stats/handler.go index 30113ad2..8ec656d4 100644 --- a/pkg/stats/handler.go +++ b/pkg/stats/handler.go @@ -12,7 +12,7 @@ type Handler struct { Queues *queue.Manager } -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *Handler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { if err := json.NewEncoder(w).Encode(Stats{ ActiveRequests: h.Queues.TotalCounts(), }); err != nil { From 1dfb0c052e5dab5c6c73257acc4936fbbf35fa40 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 19 Jan 2024 16:02:31 +0100 Subject: [PATCH 2/5] Formatter + Linter --- .github/workflows/linter.yaml | 36 ++++++++++++++++++++++ .golangci.yaml | 43 +++++++++++++++++++++++++++ Makefile | 29 ++++++++++++++++++ cmd/lingo/main.go | 14 ++++----- pkg/autoscaler/autoscaler.go | 27 +++++++++-------- pkg/deployments/manager.go | 4 +-- pkg/deployments/metrics.go | 6 ++-- pkg/endpoints/endpoints.go | 36 +++++++++++----------- pkg/endpoints/endpoints_test.go | 1 - pkg/endpoints/manager.go | 3 +- pkg/proxy/handler.go | 4 +-- pkg/proxy/metrics_test.go | 4 +-- pkg/queue/metrics.go | 6 ++-- pkg/queue/queue.go | 6 +++- tests/integration/integration_test.go | 3 +- tests/integration/main_test.go | 13 ++++---- 16 files changed, 172 insertions(+), 63 deletions(-) create mode 100644 .github/workflows/linter.yaml create mode 100644 .golangci.yaml diff --git a/.github/workflows/linter.yaml b/.github/workflows/linter.yaml new file mode 100644 index 00000000..72655638 --- /dev/null +++ b/.github/workflows/linter.yaml @@ -0,0 +1,36 @@ +name: lint + +on: + push: + branches: + - main + paths-ignore: + - '**.md' + pull_request: + branches: + - main + paths-ignore: + - '**.md' + +permissions: read-all + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Harden Runner + uses: step-security/harden-runner@eb238b55efaa70779f274895e782ed17c84f2895 # v2.6.1 + with: + egress-policy: audit + + - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + + - uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0 + with: + go-version: "1.21" + check-latest: true + + - name: lint + uses: golangci/golangci-lint-action@3a919529898de77ec3da873e3063ca4b10e7f5cc # v3.7.0 + with: + version: v1.55.2 \ No newline at end of file diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 00000000..8c48f26f --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,43 @@ +run: + timeout: 5m + +linters-settings: + gci: + custom-order: true + sections: + - standard # Standard section: captures all standard packages. + - default # Default section: contains all imports that could not be matched to another section type. + - prefix(*.k8s.io) + - prefix(github.com/substratusai/lingo) + lll: + line-length: 200 + misspell: + locale: US + staticcheck: + go: "1.21" + +linters: + disable-all: true + enable: + - errcheck + - errorlint + - exportloopref + - forcetypeassert + - gci + - gocritic + - goconst + - godot + - gofmt + - gofumpt + - goimports + - gosec + - gosimple + - govet + - ineffassign + - misspell + - revive + - staticcheck + - typecheck + - unconvert + - unused + - whitespace \ No newline at end of file diff --git a/Makefile b/Makefile index 639c369a..d5d8d703 100644 --- a/Makefile +++ b/Makefile @@ -26,3 +26,32 @@ envtest: $(ENVTEST) ## Download envtest-setup locally if necessary. $(ENVTEST): $(LOCALBIN) test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest +GOLANGCI ?= $(LOCALBIN)/golangci-lint + +.PHONY: golangci +golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. +$(GOLANGCI): $(LOCALBIN) + test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 + + +.PHONY: lint +lint: golangci + golangci-lint run --tests=false ./... --timeout 5m + +GOLANGCI ?= $(LOCALBIN)/golangci-lint + +.PHONY: golangci +golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. +$(GOLANGCI): $(LOCALBIN) + test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 + +GCI ?= $(LOCALBIN)/gci + +.PHONY: formatter +formatter: $(GCI) ## Download gci locally if necessary. +$(GCI): $(LOCALBIN) + test -s $(LOCALBIN)gci || GOBIN=$(LOCALBIN) go install github.com/daixiang0/gci@v0.12.1 + +.PHONY: format +format: formatter + find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" | xargs gci write --skip-generated -s standard -s default -s "prefix(*.k8s.io)" -s "prefix(github.com/substratusai/lingo)" --custom-order \ No newline at end of file diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index 580b375b..5ec3f00e 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -12,12 +12,14 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - - "sigs.k8s.io/controller-runtime/pkg/metrics" - + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/substratusai/lingo/pkg/autoscaler" @@ -27,10 +29,6 @@ import ( "github.com/substratusai/lingo/pkg/proxy" "github.com/substratusai/lingo/pkg/queue" "github.com/substratusai/lingo/pkg/stats" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" ) var ( @@ -113,7 +111,7 @@ func run() error { hostname, err := os.Hostname() if err != nil { - return fmt.Errorf("getting hostname: %v", err) + return fmt.Errorf("getting hostname: %w", err) } le := leader.NewElection(clientset, hostname, namespace) diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index fa6c07b2..c903c505 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -10,15 +10,16 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/substratusai/lingo/pkg/deployments" "github.com/substratusai/lingo/pkg/endpoints" "github.com/substratusai/lingo/pkg/leader" "github.com/substratusai/lingo/pkg/movingaverage" "github.com/substratusai/lingo/pkg/queue" "github.com/substratusai/lingo/pkg/stats" - corev1 "k8s.io/api/core/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) func New(mgr ctrl.Manager) (*Autoscaler, error) { @@ -70,9 +71,9 @@ func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } -func (a *Autoscaler) Start() { - for range time.Tick(a.Interval) { - if !a.LeaderElection.IsLeader.Load() { +func (r *Autoscaler) Start() { + for range time.Tick(r.Interval) { + if !r.LeaderElection.IsLeader.Load() { log.Println("Not leader, doing nothing") continue } @@ -80,11 +81,11 @@ func (a *Autoscaler) Start() { log.Println("Calculating scales for all") // TODO: Remove hardcoded Service lookup by name "lingo". - otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats") + otherLingoEndpoints := r.Endpoints.GetAllHosts("lingo", "stats") stats, errs := aggregateStats(stats.Stats{ - ActiveRequests: a.Queues.TotalCounts(), - }, a.HTTPClient, otherLingoEndpoints) + ActiveRequests: r.Queues.TotalCounts(), + }, r.HTTPClient, otherLingoEndpoints) if len(errs) != 0 { for _, err := range errs { log.Printf("Failed to aggregate stats: %v", err) @@ -94,13 +95,13 @@ func (a *Autoscaler) Start() { for deploymentName, waitCount := range stats.ActiveRequests { log.Println("Is leader, autoscaling") - avg := a.getMovingAvgQueueSize(deploymentName) + avg := r.getMovingAvgQueueSize(deploymentName) avg.Next(float64(waitCount)) flt := avg.Calculate() - normalized := flt / float64(a.ConcurrencyPerReplica) + normalized := flt / float64(r.ConcurrencyPerReplica) ceil := math.Ceil(normalized) log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount) - a.Deployments.SetDesiredScale(deploymentName, int32(ceil)) + r.Deployments.SetDesiredScale(deploymentName, int32(ceil)) } } } @@ -126,7 +127,7 @@ func aggregateStats(s stats.Stats, httpc *http.Client, endpoints []string) (stat for _, endpoint := range endpoints { fetched, err := getStats(httpc, "http://"+endpoint) if err != nil { - errs = append(errs, fmt.Errorf("getting stats: %v: %v", endpoint, err)) + errs = append(errs, fmt.Errorf("getting stats: %v: %w", endpoint, err)) continue } for k, v := range fetched.ActiveRequests { diff --git a/pkg/deployments/manager.go b/pkg/deployments/manager.go index 37ee9686..01875397 100644 --- a/pkg/deployments/manager.go +++ b/pkg/deployments/manager.go @@ -151,7 +151,7 @@ func (r *Manager) getScaler(deploymentName string) *scaler { } // getScalesSnapshot returns a snapshot of the stats for all scalers managed by the Manager. -// The scales are returned as a map, where the keys are the model names +// The scales are returned as a map, where the keys are the model names. func (r *Manager) getScalesSnapshot() map[string]scale { r.scalersMtx.Lock() defer r.scalersMtx.Unlock() @@ -236,7 +236,7 @@ func (r *Manager) Bootstrap(ctx context.Context) error { // ReadinessChecker checks if the Manager state is loaded and ready to handle requests. // It returns an error if Manager is not bootstrapped yet. -// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck` +// To be used with sigs.k8s.io/controller-runtime manager `AddReadyzCheck`. func (r *Manager) ReadinessChecker(_ *http.Request) error { if !r.bootstrapped.Load() { return fmt.Errorf("not boostrapped yet") diff --git a/pkg/deployments/metrics.go b/pkg/deployments/metrics.go index 1bab7252..8e206c4a 100644 --- a/pkg/deployments/metrics.go +++ b/pkg/deployments/metrics.go @@ -9,7 +9,7 @@ type MetricsCollector struct { manager *Manager } -// NewMetricsCollector constructor +// NewMetricsCollector constructor. func NewMetricsCollector(m *Manager) *MetricsCollector { if m == nil { panic("manager required") @@ -22,12 +22,12 @@ func NewMetricsCollector(m *Manager) *MetricsCollector { } } -// MustRegister registers all metrics +// MustRegister registers all metrics. func (p *MetricsCollector) MustRegister(r prometheus.Registerer) { r.MustRegister(p) } -// Describe sends the super-set of all possible descriptors of metrics +// Describe sends the super-set of all possible descriptors of metrics. func (p *MetricsCollector) Describe(descs chan<- *prometheus.Desc) { descs <- p.currentScaleDescr descs <- p.minScaleDescr diff --git a/pkg/endpoints/endpoints.go b/pkg/endpoints/endpoints.go index 0e51083a..d4ffee26 100644 --- a/pkg/endpoints/endpoints.go +++ b/pkg/endpoints/endpoints.go @@ -93,37 +93,37 @@ func (e *endpointGroup) getPort(portName string) int32 { return e.ports[portName] } -func (g *endpointGroup) lenIPs() int { - g.mtx.RLock() - defer g.mtx.RUnlock() - return len(g.endpoints) +func (e *endpointGroup) lenIPs() int { + e.mtx.RLock() + defer e.mtx.RUnlock() + return len(e.endpoints) } -func (g *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) { - g.mtx.Lock() - g.ports = ports +func (e *endpointGroup) setIPs(ips map[string]struct{}, ports map[string]int32) { + e.mtx.Lock() + e.ports = ports for ip := range ips { - if _, ok := g.endpoints[ip]; !ok { - g.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}} + if _, ok := e.endpoints[ip]; !ok { + e.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}} } } - for ip := range g.endpoints { + for ip := range e.endpoints { if _, ok := ips[ip]; !ok { - delete(g.endpoints, ip) + delete(e.endpoints, ip) } } - g.mtx.Unlock() + e.mtx.Unlock() // notify waiting requests if len(ips) > 0 { - g.broadcastEndpoints() + e.broadcastEndpoints() } } -func (g *endpointGroup) broadcastEndpoints() { - g.bmtx.Lock() - defer g.bmtx.Unlock() +func (e *endpointGroup) broadcastEndpoints() { + e.bmtx.Lock() + defer e.bmtx.Unlock() - close(g.bcast) - g.bcast = make(chan struct{}) + close(e.bcast) + e.bcast = make(chan struct{}) } diff --git a/pkg/endpoints/endpoints_test.go b/pkg/endpoints/endpoints_test.go index 8c2cc1f3..8282c367 100644 --- a/pkg/endpoints/endpoints_test.go +++ b/pkg/endpoints/endpoints_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/util/rand" ) diff --git a/pkg/endpoints/manager.go b/pkg/endpoints/manager.go index bd2d3e39..a9df17fe 100644 --- a/pkg/endpoints/manager.go +++ b/pkg/endpoints/manager.go @@ -7,7 +7,6 @@ import ( "sync" disv1 "k8s.io/api/discovery/v1" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -113,7 +112,7 @@ func (r *Manager) getEndpoints(service string) *endpointGroup { // AwaitHostAddress returns the host address with the lowest number of in-flight requests. It will block until the host address // becomes available or the context times out. // -// It returns a string in the format "host:port" or error on timeout +// It returns a string in the format "host:port" or error on timeout. func (r *Manager) AwaitHostAddress(ctx context.Context, service, portName string) (string, error) { return r.getEndpoints(service).getBestHost(ctx, portName) } diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index 3a54c94b..8782841c 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -91,7 +91,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case errors.Is(err, context.Canceled): w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte("Request cancelled")) + _, _ = w.Write([]byte("Request canceled")) return case errors.Is(err, context.DeadlineExceeded): w.WriteHeader(http.StatusGatewayTimeout) @@ -112,7 +112,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // parseModel parses the model name from the request -// returns empty string when none found or an error for failures on the proxy request object +// returns empty string when none found or an error for failures on the proxy request object. func parseModel(r *http.Request) (string, *http.Request, error) { if model := r.Header.Get("X-Model"); model != "" { return model, r, nil diff --git a/pkg/proxy/metrics_test.go b/pkg/proxy/metrics_test.go index 5940e445..ed38c892 100644 --- a/pkg/proxy/metrics_test.go +++ b/pkg/proxy/metrics_test.go @@ -12,16 +12,16 @@ import ( "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/substratusai/lingo/pkg/deployments" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/substratusai/lingo/pkg/deployments" ) func TestMetrics(t *testing.T) { diff --git a/pkg/queue/metrics.go b/pkg/queue/metrics.go index e2f0adee..d302eee8 100644 --- a/pkg/queue/metrics.go +++ b/pkg/queue/metrics.go @@ -8,7 +8,7 @@ type MetricsCollector struct { manager *Manager } -// NewMetricsCollector constructor +// NewMetricsCollector constructor. func NewMetricsCollector(m *Manager) *MetricsCollector { if m == nil { panic("manager required") @@ -20,12 +20,12 @@ func NewMetricsCollector(m *Manager) *MetricsCollector { } } -// MustRegister registers all metrics +// MustRegister registers all metrics. func (p *MetricsCollector) MustRegister(r prometheus.Registerer) { r.MustRegister(p) } -// Describe sends the super-set of all possible descriptors of metrics +// Describe sends the super-set of all possible descriptors of metrics. func (p *MetricsCollector) Describe(descs chan<- *prometheus.Desc) { descs <- p.inFlightDescr descs <- p.queuedDescr diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index eb9ac5bc..9a77728b 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "container/list" "context" + "fmt" "log" "sync" "sync/atomic" @@ -152,7 +153,10 @@ func (q *Queue) Start() { continue } - itm := e.Value.(*item) + itm, ok := e.Value.(*item) + if !ok { + panic(fmt.Sprintf("invalid type: %T", e.Value)) + } q.dequeue(itm, true) log.Println("Dequeued: ", itm.id) diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 1917a58a..c8fb3856 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -13,8 +13,6 @@ import ( "testing" "time" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -24,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestScaleUpAndDown(t *testing.T) { diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index 74697559..ab530f67 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -11,12 +11,6 @@ import ( "testing" "time" - "github.com/substratusai/lingo/pkg/autoscaler" - "github.com/substratusai/lingo/pkg/deployments" - "github.com/substratusai/lingo/pkg/endpoints" - "github.com/substratusai/lingo/pkg/leader" - "github.com/substratusai/lingo/pkg/proxy" - "github.com/substratusai/lingo/pkg/queue" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,6 +21,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "github.com/substratusai/lingo/pkg/autoscaler" + "github.com/substratusai/lingo/pkg/deployments" + "github.com/substratusai/lingo/pkg/endpoints" + "github.com/substratusai/lingo/pkg/leader" + "github.com/substratusai/lingo/pkg/proxy" + "github.com/substratusai/lingo/pkg/queue" ) var ( From 10da194ef64cafdc310ea5aaed76042e401576ed Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 19 Jan 2024 16:07:24 +0100 Subject: [PATCH 3/5] Do not lint tests --- .golangci.yaml | 3 +++ Makefile | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.golangci.yaml b/.golangci.yaml index 8c48f26f..8ffedeea 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,5 +1,8 @@ run: timeout: 5m + tests: false + sort-results: true + allow-parallel-runners: true linters-settings: gci: diff --git a/Makefile b/Makefile index d5d8d703..5e46a4d9 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ $(GOLANGCI): $(LOCALBIN) .PHONY: lint lint: golangci - golangci-lint run --tests=false ./... --timeout 5m + golangci-lint run ./... --timeout 5m GOLANGCI ?= $(LOCALBIN)/golangci-lint From 0aa92a52489e990647126ac1301ba8e839833d83 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Mon, 22 Jan 2024 08:39:37 +0100 Subject: [PATCH 4/5] Review feedback --- cmd/lingo/main.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index 5ec3f00e..c366f02f 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -165,12 +165,8 @@ func run() error { wg.Add(1) go func() { defer func() { - if err := statsServer.Shutdown(context.Background()); err != nil { - setupLog.Error(err, "shutdown stats server") - } - if err := proxyServer.Shutdown(context.Background()); err != nil { - setupLog.Error(err, "shutdown proxy server") - } + _ = statsServer.Shutdown(context.Background()) + _ = proxyServer.Shutdown(context.Background()) wg.Done() }() if err := mgr.Start(ctx); err != nil { From b21571aae12e2f43d95ff56c77bfb1ae309d18da Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 13 Feb 2024 10:25:09 +0100 Subject: [PATCH 5/5] Review feedback --- .github/workflows/linter.yaml | 36 --------------------------- .golangci.yaml | 46 ----------------------------------- Makefile | 29 ---------------------- cmd/lingo/main.go | 10 +++++--- pkg/autoscaler/autoscaler.go | 40 +++++++++++++++--------------- 5 files changed, 26 insertions(+), 135 deletions(-) delete mode 100644 .github/workflows/linter.yaml delete mode 100644 .golangci.yaml diff --git a/.github/workflows/linter.yaml b/.github/workflows/linter.yaml deleted file mode 100644 index 72655638..00000000 --- a/.github/workflows/linter.yaml +++ /dev/null @@ -1,36 +0,0 @@ -name: lint - -on: - push: - branches: - - main - paths-ignore: - - '**.md' - pull_request: - branches: - - main - paths-ignore: - - '**.md' - -permissions: read-all - -jobs: - lint: - runs-on: ubuntu-latest - steps: - - name: Harden Runner - uses: step-security/harden-runner@eb238b55efaa70779f274895e782ed17c84f2895 # v2.6.1 - with: - egress-policy: audit - - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - - uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0 - with: - go-version: "1.21" - check-latest: true - - - name: lint - uses: golangci/golangci-lint-action@3a919529898de77ec3da873e3063ca4b10e7f5cc # v3.7.0 - with: - version: v1.55.2 \ No newline at end of file diff --git a/.golangci.yaml b/.golangci.yaml deleted file mode 100644 index 8ffedeea..00000000 --- a/.golangci.yaml +++ /dev/null @@ -1,46 +0,0 @@ -run: - timeout: 5m - tests: false - sort-results: true - allow-parallel-runners: true - -linters-settings: - gci: - custom-order: true - sections: - - standard # Standard section: captures all standard packages. - - default # Default section: contains all imports that could not be matched to another section type. - - prefix(*.k8s.io) - - prefix(github.com/substratusai/lingo) - lll: - line-length: 200 - misspell: - locale: US - staticcheck: - go: "1.21" - -linters: - disable-all: true - enable: - - errcheck - - errorlint - - exportloopref - - forcetypeassert - - gci - - gocritic - - goconst - - godot - - gofmt - - gofumpt - - goimports - - gosec - - gosimple - - govet - - ineffassign - - misspell - - revive - - staticcheck - - typecheck - - unconvert - - unused - - whitespace \ No newline at end of file diff --git a/Makefile b/Makefile index 5e46a4d9..639c369a 100644 --- a/Makefile +++ b/Makefile @@ -26,32 +26,3 @@ envtest: $(ENVTEST) ## Download envtest-setup locally if necessary. $(ENVTEST): $(LOCALBIN) test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest -GOLANGCI ?= $(LOCALBIN)/golangci-lint - -.PHONY: golangci -golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. -$(GOLANGCI): $(LOCALBIN) - test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 - - -.PHONY: lint -lint: golangci - golangci-lint run ./... --timeout 5m - -GOLANGCI ?= $(LOCALBIN)/golangci-lint - -.PHONY: golangci -golangci: $(GOLANGCI) ## Download golangci-lint locally if necessary. -$(GOLANGCI): $(LOCALBIN) - test -s $(LOCALBIN)golangci-lint || GOBIN=$(LOCALBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 - -GCI ?= $(LOCALBIN)/gci - -.PHONY: formatter -formatter: $(GCI) ## Download gci locally if necessary. -$(GCI): $(LOCALBIN) - test -s $(LOCALBIN)gci || GOBIN=$(LOCALBIN) go install github.com/daixiang0/gci@v0.12.1 - -.PHONY: format -format: formatter - find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" | xargs gci write --skip-generated -s standard -s default -s "prefix(*.k8s.io)" -s "prefix(github.com/substratusai/lingo)" --custom-order \ No newline at end of file diff --git a/cmd/lingo/main.go b/cmd/lingo/main.go index c366f02f..59d9e2b2 100644 --- a/cmd/lingo/main.go +++ b/cmd/lingo/main.go @@ -66,10 +66,12 @@ func run() error { concurrency := getEnvInt("CONCURRENCY", 100) scaleDownDelay := getEnvInt("SCALE_DOWN_DELAY", 30) - var metricsAddr string - var probeAddr string - var concurrencyPerReplica int - var requestHeaderTimeout time.Duration + var ( + metricsAddr string + probeAddr string + concurrencyPerReplica int + requestHeaderTimeout time.Duration // setting to prevent slowloris attack on http server + ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index c903c505..f04f7c0d 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -56,24 +56,24 @@ type Autoscaler struct { movingAvgQueueSize map[string]*movingaverage.Simple } -func (r *Autoscaler) SetupWithManager(mgr ctrl.Manager) error { +func (a *Autoscaler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.ConfigMap{}). - Complete(r) + Complete(a) } -func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (a *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var cm corev1.ConfigMap - if err := r.Get(ctx, req.NamespacedName, &cm); err != nil { + if err := a.Get(ctx, req.NamespacedName, &cm); err != nil { return ctrl.Result{}, fmt.Errorf("get: %w", err) } return ctrl.Result{}, nil } -func (r *Autoscaler) Start() { - for range time.Tick(r.Interval) { - if !r.LeaderElection.IsLeader.Load() { +func (a *Autoscaler) Start() { + for range time.Tick(a.Interval) { + if !a.LeaderElection.IsLeader.Load() { log.Println("Not leader, doing nothing") continue } @@ -81,11 +81,11 @@ func (r *Autoscaler) Start() { log.Println("Calculating scales for all") // TODO: Remove hardcoded Service lookup by name "lingo". - otherLingoEndpoints := r.Endpoints.GetAllHosts("lingo", "stats") + otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats") stats, errs := aggregateStats(stats.Stats{ - ActiveRequests: r.Queues.TotalCounts(), - }, r.HTTPClient, otherLingoEndpoints) + ActiveRequests: a.Queues.TotalCounts(), + }, a.HTTPClient, otherLingoEndpoints) if len(errs) != 0 { for _, err := range errs { log.Printf("Failed to aggregate stats: %v", err) @@ -95,26 +95,26 @@ func (r *Autoscaler) Start() { for deploymentName, waitCount := range stats.ActiveRequests { log.Println("Is leader, autoscaling") - avg := r.getMovingAvgQueueSize(deploymentName) + avg := a.getMovingAvgQueueSize(deploymentName) avg.Next(float64(waitCount)) flt := avg.Calculate() - normalized := flt / float64(r.ConcurrencyPerReplica) + normalized := flt / float64(a.ConcurrencyPerReplica) ceil := math.Ceil(normalized) log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount) - r.Deployments.SetDesiredScale(deploymentName, int32(ceil)) + a.Deployments.SetDesiredScale(deploymentName, int32(ceil)) } } } -func (r *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingaverage.Simple { - r.movingAvgQueueSizeMtx.Lock() - a, ok := r.movingAvgQueueSize[deploymentName] +func (a *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingaverage.Simple { + a.movingAvgQueueSizeMtx.Lock() + avg, ok := a.movingAvgQueueSize[deploymentName] if !ok { - a = movingaverage.NewSimple(make([]float64, r.AverageCount)) - r.movingAvgQueueSize[deploymentName] = a + avg = movingaverage.NewSimple(make([]float64, a.AverageCount)) + a.movingAvgQueueSize[deploymentName] = avg } - r.movingAvgQueueSizeMtx.Unlock() - return a + a.movingAvgQueueSizeMtx.Unlock() + return avg } func aggregateStats(s stats.Stats, httpc *http.Client, endpoints []string) (stats.Stats, []error) {