diff --git a/go.mod b/go.mod index bba639b0905..1109082d944 100644 --- a/go.mod +++ b/go.mod @@ -12,14 +12,14 @@ require ( github.com/google/go-cmp v0.5.9 github.com/google/gofuzz v1.2.0 github.com/google/mako v0.0.0-20190821191249-122f8dcef9e3 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.3.1 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-cleanhttp v0.5.1 github.com/hashicorp/go-retryablehttp v0.6.7 github.com/hashicorp/golang-lru v1.0.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/openzipkin/zipkin-go v0.4.1 + github.com/openzipkin/zipkin-go v0.4.2 github.com/pelletier/go-toml/v2 v2.0.5 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 @@ -46,8 +46,8 @@ require ( k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0 knative.dev/hack/schema v0.0.0-20230818155117-9cc05a31e8c0 - knative.dev/pkg v0.0.0-20230821102121-81e4ee140363 - knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 + knative.dev/pkg v0.0.0-20230901225035-211243a92d2f + knative.dev/reconciler-test v0.0.0-20230901013135-51e7751247b7 sigs.k8s.io/yaml v1.3.0 ) @@ -73,7 +73,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.15 // indirect @@ -120,7 +120,7 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.12.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect + gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/api v0.138.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect diff --git a/go.sum b/go.sum index 0204afa7285..9a89bc45ad3 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= @@ -157,6 +157,7 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.15 h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM= github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 
h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/gobuffalo/flect v1.0.2 h1:eqjPGSo2WmjgY2XlpGwo2NXgL3RucAKo4k4qQMNA5sA= github.com/gobuffalo/flect v1.0.2/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -236,12 +237,13 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.5 h1:8IYp3w9nysqv3JH+NJgXJzGbDHzLOTj43BmSkp+O7qg= github.com/google/s2a-go v0.1.5/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.2.5 h1:UR4rDjcgpgEnqpIEvkiqTYKBCKLNmlge2eVjoZfySzM= github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -334,15 +336,15 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= +github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= +github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= -github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= -github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= +github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA= +github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml/v2 v2.0.5 
h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwbMiyQg= github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= @@ -708,8 +710,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gomodules.xyz/jsonpatch/v2 v2.3.0 h1:8NFhfS6gzxNqjLIYnZxg319wZ5Qjnx4m/CcX+Klzazc= -gomodules.xyz/jsonpatch/v2 v2.3.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= +gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= +gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= @@ -873,10 +875,10 @@ knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0 h1:n9YEGYuoj31pAkhGlNL+xTQAe knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= knative.dev/hack/schema v0.0.0-20230818155117-9cc05a31e8c0 h1:eowpUm7e3BZFdzeNybqxmjKvcVJOdsYtIzSiYEYyczc= knative.dev/hack/schema v0.0.0-20230818155117-9cc05a31e8c0/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs= -knative.dev/pkg v0.0.0-20230821102121-81e4ee140363 h1:TI2hMwTM5Bl+yaWu1gN5bXAHSvc+FtH9cqm3NzmDBtY= -knative.dev/pkg v0.0.0-20230821102121-81e4ee140363/go.mod h1:dA3TdhFTRm4KmmpvfknpGV43SbGNFkLHySjC8/+NczM= -knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 h1:xgUvk/bKVq0wDgahE/wxmg3sD6j2mjCAimJGtxaQeiY= -knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM= +knative.dev/pkg v0.0.0-20230901225035-211243a92d2f h1:I60WBu0TRBhQ1ke8s3xfhn3fXo2OOLv+ebCoTwUdddU= +knative.dev/pkg v0.0.0-20230901225035-211243a92d2f/go.mod h1:KOCW7iby+PL0aSDG2Ta7Vf1kCn+VAqL7QaTyK0c4fuk= +knative.dev/reconciler-test v0.0.0-20230901013135-51e7751247b7 h1:ckW3I6nCIOCpmk3u7+GKKEPfBbYZn5Mr+uZ1Mgzllcg= +knative.dev/reconciler-test v0.0.0-20230901013135-51e7751247b7/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM= pgregory.net/rapid v1.0.0 h1:iQaM2w5PZ6xvt6x7hbd7tiDS+nk7YPp5uCaEba+T/F4= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go index fcc758cd394..2d5460cf801 100644 --- a/pkg/scheduler/state/state.go +++ b/pkg/scheduler/state/state.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "errors" + "math" "strconv" "time" @@ -95,6 +96,13 @@ type State struct { // Stores for each vpod, a map of zonename to total number of vreplicas placed on all pods located in that zone currently ZoneSpread map[types.NamespacedName]map[string]int32 + + // Pending tracks the number of virtual replicas that haven't been scheduled yet + // because there wasn't enough free capacity. 
+ Pending map[types.NamespacedName]int32 + + // ExpectedVReplicaByVPod is the expected virtual replicas for each vpod key + ExpectedVReplicaByVPod map[types.NamespacedName]int32 } // Free safely returns the free capacity at the given ordinal @@ -190,6 +198,8 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } free := make([]int32, 0) + pending := make(map[types.NamespacedName]int32, 4) + expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods)) schedulablePods := sets.NewInt32() last := int32(-1) @@ -255,10 +265,17 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } + for _, p := range schedulablePods.List() { + free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + } + // Getting current state from existing placements for all vpods for _, vpod := range vpods { ps := vpod.GetPlacements() + pending[vpod.GetKey()] = pendingFromVPod(vpod) + expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas() + withPlacement[vpod.GetKey()] = make(map[string]bool) podSpread[vpod.GetKey()] = make(map[string]int32) nodeSpread[vpod.GetKey()] = make(map[string]int32) @@ -321,13 +338,20 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)), SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, - PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread} + PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } +func pendingFromVPod(vpod scheduler.VPod) int32 { + expected := vpod.GetVReplicas() + scheduled := scheduler.GetTotalVReplicas(vpod.GetPlacements()) + + return int32(math.Max(float64(0), float64(expected-scheduled))) +} + func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -340,13 +364,29 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } - if ordinal > last && free[ordinal] != s.capacity { + if ordinal > last { last = ordinal } return free, last } +func (s *State) TotalPending() int32 { + t := int32(0) + for _, p := range s.Pending { + t += p + } + return t +} + +func (s *State) TotalExpectedVReplicas() int32 { + t := int32(0) + for _, v := range s.ExpectedVReplicaByVPod { + t += v + } + return t +} + func grow(slice []int32, ordinal int32, def int32) []int32 { l := int32(len(slice)) diff := ordinal - l + 1 @@ -435,6 +475,7 @@ func (s *State) MarshalJSON() ([]byte, error) { SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"` + Pending map[string]int32 `json:"pending"` } sj := S{ @@ -453,6 +494,7 @@ func (s *State) MarshalJSON() ([]byte, error) { SchedulerPolicy: 
s.SchedulerPolicy, SchedPolicy: s.SchedPolicy, DeschedPolicy: s.DeschedPolicy, + Pending: toJSONablePending(s.Pending), } return json.Marshal(sj) @@ -465,3 +507,12 @@ func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[str } return r } + +func toJSONablePending(pending map[types.NamespacedName]int32) map[string]int32 { + r := make(map[string]int32, len(pending)) + for k, v := range pending { + r[k.String()] = v + } + return r + +} diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go index dce2bb5ef59..7244eea2bf9 100644 --- a/pkg/scheduler/state/state_test.go +++ b/pkg/scheduler/state/state_test.go @@ -62,7 +62,7 @@ func TestStateBuilder(t *testing.T) { name: "no vpods", replicas: int32(0), vpods: [][]duckv1alpha1.Placement{}, - expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName}, + expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}}, freec: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -87,6 +87,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicyType: scheduler.MAXFILLUP, @@ -141,6 +147,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 3, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(18), schedulerPolicyType: scheduler.MAXFILLUP, @@ -190,6 +206,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 3, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(10), schedulerPolicyType: scheduler.MAXFILLUP, @@ -203,7 +229,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: 
[]int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"}, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { @@ -244,6 +270,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(34), schedulerPolicyType: scheduler.MAXFILLUP, @@ -257,7 +293,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"}, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { @@ -298,6 +334,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(28), reserved: map[types.NamespacedName]map[string]int32{ @@ -361,6 +407,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(26), reserved: map[types.NamespacedName]map[string]int32{ @@ -427,6 +483,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: 
"vpod-name-0", Namespace: "vpod-ns-0"}: 0, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + {Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + {Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(28), reserved: map[types.NamespacedName]map[string]int32{ @@ -462,6 +528,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicyType: scheduler.MAXFILLUP, @@ -488,6 +560,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicy: &scheduler.SchedulerPolicy{ diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index 5641502e080..53b184e90c2 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -52,9 +52,8 @@ type Autoscaler interface { // Start runs the autoscaler until cancelled. Start(ctx context.Context) - // Autoscale is used to immediately trigger the autoscaler with the hint - // that pending number of vreplicas couldn't be scheduled. - Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) + // Autoscale is used to immediately trigger the autoscaler. + Autoscale(ctx context.Context) } type autoscaler struct { @@ -63,7 +62,7 @@ type autoscaler struct { vpodLister scheduler.VPodLister logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan int32 + trigger chan struct{} evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -77,6 +76,11 @@ type autoscaler struct { // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a // bucket where we've been promoted. isLeader atomic.Bool + + // getReserved returns reserved replicas. + getReserved GetReserved + + lastCompactAttempt time.Time } var ( @@ -108,53 +112,65 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan int32, 1), + trigger: make(chan struct{}, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, + getReserved: cfg.getReserved, + // Anything that is less than now() - refreshPeriod, so that we will try to compact + // as soon as we start. + lastCompactAttempt: time.Now(). + Add(-cfg.RefreshPeriod). 
+ Add(-time.Minute), } } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false - pending := int32(0) for { select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): attemptScaleDown = true - case pending = <-a.trigger: + case <-a.trigger: attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown, pending) - pending = int32(0) + a.syncAutoscale(ctx, attemptScaleDown) } } -func (a *autoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) { - a.syncAutoscale(ctx, attemptScaleDown, pending) +func (a *autoscaler) Autoscale(ctx context.Context) { + // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh + // period is reset. + a.trigger <- struct{}{} } -func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool, pending int32) { +func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) error { a.lock.Lock() defer a.lock.Unlock() + var lastErr error wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown, pending) + err := a.doautoscale(ctx, attemptScaleDown) + if err != nil { + logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) + } + lastErr = err return err == nil, nil }) + return lastErr } -func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error { +func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(nil) + state, err := a.stateAccessor.State(a.getReserved()) if err != nil { a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err @@ -168,9 +184,8 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen } a.logger.Debugw("checking adapter capacity", - zap.Int32("pending", pending), zap.Int32("replicas", scale.Spec.Replicas), - zap.Int32("last ordinal", state.LastOrdinal)) + zap.Any("state", state)) var scaleUpFactor, newreplicas, minNumPods int32 scaleUpFactor = 1 // Non-HA scaling @@ -183,21 +198,26 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen newreplicas = state.LastOrdinal + 1 // Ideal number - // Take into account pending replicas and pods that are already filled (for even pod spread) - if pending > 0 { - // Make sure to allocate enough pods for holding all pending replicas. - if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods - leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap) - minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity))) - } else { - minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity))) + if state.SchedulerPolicy == scheduler.MAXFILLUP { + newreplicas = int32(math.Ceil(float64(state.TotalExpectedVReplicas()) / float64(state.Capacity))) + } else { + // Take into account pending replicas and pods that are already filled (for even pod spread) + pending := state.TotalPending() + if pending > 0 { + // Make sure to allocate enough pods for holding all pending replicas. 
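
To make the new sizing rule above concrete: under MAXFILLUP the autoscaler no longer relies on a pending hint passed through Autoscale(); it derives the replica count from the state's TotalExpectedVReplicas() and per-pod Capacity, while the other policies still size from TotalPending(). The standalone sketch below reproduces just that arithmetic; the helper name and the sample numbers are illustrative only (they mirror the updated test expectations further down) and are not part of this change.

```go
// Editorial sketch of the MAXFILLUP sizing rule introduced above; the real code
// reads TotalExpectedVReplicas() and Capacity from the scheduler State.
package main

import (
	"fmt"
	"math"
)

// replicasForMaxFillUp mirrors: newreplicas = ceil(totalExpected / capacity).
func replicasForMaxFillUp(totalExpected, capacity int32) int32 {
	return int32(math.Ceil(float64(totalExpected) / float64(capacity)))
}

func main() {
	// 45 expected vreplicas at a pod capacity of 10 -> 5 pods,
	// matching the updated "scale up" test expectation below.
	fmt.Println(replicasForMaxFillUp(45, 10)) // 5
	// 18 expected vreplicas -> 2 pods ("enough capacity" cases).
	fmt.Println(replicasForMaxFillUp(18, 10)) // 2
	// 23 expected vreplicas -> 3 pods ("not enough capacity" cases).
	fmt.Println(replicasForMaxFillUp(23, 10)) // 3
}
```
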
+ if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods + leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap) + minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity))) + } else { + minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity))) + } + newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor)) } - newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor)) - } - // Make sure to never scale down past the last ordinal - if newreplicas <= state.LastOrdinal { - newreplicas = state.LastOrdinal + scaleUpFactor + if newreplicas <= state.LastOrdinal { + // Make sure to never scale down past the last ordinal + newreplicas = state.LastOrdinal + scaleUpFactor + } } // Only scale down if permitted @@ -223,6 +243,24 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen } func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { + + // This avoids a too aggressive scale down by adding a "grace period" based on the refresh + // period + nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) + if time.Now().Before(nextAttempt) { + a.logger.Debugw("Compact was retried before refresh period", + zap.Time("lastCompactAttempt", a.lastCompactAttempt), + zap.Time("nextAttempt", nextAttempt), + zap.String("refreshPeriod", a.refreshPeriod.String()), + ) + return + } + + a.logger.Debugw("Trying to compact and scale down", + zap.Int32("scaleUpFactor", scaleUpFactor), + zap.Any("state", s), + ) + // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { return @@ -235,6 +273,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) if freeCapacity >= usedInLastPod { + a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) @@ -254,6 +293,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { if (freeCapacity >= usedInLastXPods) && //remaining pods can hold all vreps from evicted pods (s.Replicas-scaleUpFactor >= scaleUpFactor) { //remaining # of pods is enough for HA scaling + a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 976a2e1cf8a..48c56379e28 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -53,12 +53,12 @@ func TestAutoscaler(t *testing.T) { name string replicas int32 vpods []scheduler.VPod - pendings int32 scaleDown bool wantReplicas int32 schedulerPolicyType scheduler.SchedulerPolicyType schedulerPolicy *scheduler.SchedulerPolicy deschedulerPolicy *scheduler.SchedulerPolicy + reserved map[types.NamespacedName]map[string]int32 }{ { name: "no replicas, no placements, no pending", @@ -66,7 +66,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 0, nil), }, - pendings: int32(0), wantReplicas: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -76,7 +75,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 
5, nil), }, - pendings: int32(5), wantReplicas: int32(1), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -88,7 +86,6 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -96,23 +93,21 @@ func TestAutoscaler(t *testing.T) { name: "no replicas, with placements, with pending, enough capacity", replicas: int32(0), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), - wantReplicas: int32(3), + wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, { name: "no replicas, with placements, with pending, not enough capacity", replicas: int32(0), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 23, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(8), wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -122,7 +117,14 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 0, nil), }, - pendings: int32(0), + scaleDown: true, + wantReplicas: int32(0), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, no placements, no pending, scale down (no vpods)", + replicas: int32(3), + vpods: []scheduler.VPod{}, scaleDown: true, wantReplicas: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, @@ -133,7 +135,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 5, nil), }, - pendings: int32(5), scaleDown: true, wantReplicas: int32(1), schedulerPolicyType: scheduler.MAXFILLUP, @@ -144,7 +145,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 5, nil), }, - pendings: int32(5), scaleDown: false, wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, @@ -153,10 +153,18 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, no placements, with pending, scale up", replicas: int32(3), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 5, nil), + tscheduler.NewVPod(testNs, "vpod-1", 45, nil), }, - pendings: int32(40), - wantReplicas: int32(4), + wantReplicas: int32(5), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, no placements, with pending, no change", + replicas: int32(3), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 25, nil), + }, + wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, { @@ -167,10 +175,92 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, + { + name: "with replicas, with placements, with reserved", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 12, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(2), + schedulerPolicyType: 
scheduler.MAXFILLUP, + reserved: map[types.NamespacedName]map[string]int32{ + {Namespace: testNs, Name: "vpod-1"}: { + "statefulset-name-0": 8, + }, + }, + }, + { + name: "with replicas, with placements, with reserved (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 22, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(2)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + schedulerPolicyType: scheduler.MAXFILLUP, + reserved: map[types.NamespacedName]map[string]int32{ + {Namespace: testNs, Name: "vpod-1"}: { + "statefulset-name-0": 9, + }, + }, + }, + { + name: "with replicas, with placements, with pending (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + tscheduler.NewVPod(testNs, "vpod-2", 19, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(4), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending (scale up), 1 over capacity", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + tscheduler.NewVPod(testNs, "vpod-2", 20, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(5), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending, attempt scale down", + replicas: int32(3), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + scaleDown: true, + schedulerPolicyType: scheduler.MAXFILLUP, + }, { name: "with replicas, with placements, no pending, scale down", replicas: int32(5), @@ -179,7 +269,6 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), scaleDown: true, wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, @@ -188,23 +277,21 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), - wantReplicas: int32(3), + wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, { name: "with replicas, with 
placements, with pending, not enough capacity", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 23, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(8), wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -212,14 +299,13 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, no pending, round up capacity", replicas: int32(5), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 20, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}, {PodName: "statefulset-name-2", VReplicas: int32(1)}, {PodName: "statefulset-name-3", VReplicas: int32(1)}, {PodName: "statefulset-name-4", VReplicas: int32(1)}}), }, - pendings: int32(0), wantReplicas: int32(5), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -227,11 +313,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Predicates and Zone Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(5), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -247,11 +332,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Predicates and Node Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(8), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -267,11 +351,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(4), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -287,11 +370,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Zone Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(5), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -308,11 +390,10 @@ func TestAutoscaler(t *testing.T) { 
name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Node Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(8), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -387,6 +468,9 @@ func TestAutoscaler(t *testing.T) { Evictor: noopEvictor, RefreshPeriod: 10 * time.Second, PodCapacity: 10, + getReserved: func() map[types.NamespacedName]map[string]int32 { + return tc.reserved + }, } autoscaler := newAutoscaler(ctx, cfg, stateAccessor) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) @@ -395,7 +479,7 @@ func TestAutoscaler(t *testing.T) { vpodClient.Append(vpod) } - err = autoscaler.doautoscale(ctx, tc.scaleDown, tc.pendings) + err = autoscaler.syncAutoscale(ctx, tc.scaleDown) if err != nil { t.Fatal("unexpected error", err) } @@ -444,6 +528,9 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { Evictor: noopEvictor, RefreshPeriod: 2 * time.Second, PodCapacity: 10, + getReserved: func() map[types.NamespacedName]map[string]int32 { + return nil + }, } autoscaler := newAutoscaler(ctx, cfg, stateAccessor) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index da8db64f623..ff4be28682a 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -58,6 +58,8 @@ import ( _ "knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount" ) +type GetReserved func() map[types.NamespacedName]map[string]int32 + type Config struct { StatefulSetNamespace string `json:"statefulSetNamespace"` StatefulSetName string `json:"statefulSetName"` @@ -75,6 +77,9 @@ type Config struct { VPodLister scheduler.VPodLister `json:"-"` NodeLister corev1listers.NodeLister `json:"-"` + + // getReserved returns reserved replicas + getReserved GetReserved } func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { @@ -83,41 +88,36 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister) + + var getReserved GetReserved + cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { + return getReserved() + } + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) - go autoscaler.Start(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Wait() + autoscaler.Start(ctx) + }() + + s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister) + getReserved = s.Reserved + wg.Done() - return newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister), nil + return s, nil } -// NewScheduler creates a new scheduler with pod autoscaling enabled. 
-// Deprecated: Use New -func NewScheduler(ctx context.Context, - namespace, name string, - lister scheduler.VPodLister, - refreshPeriod time.Duration, - capacity int32, - schedulerPolicy scheduler.SchedulerPolicyType, - nodeLister corev1listers.NodeLister, - evictor scheduler.Evictor, - schedPolicy *scheduler.SchedulerPolicy, - deschedPolicy *scheduler.SchedulerPolicy) scheduler.Scheduler { - - cfg := &Config{ - StatefulSetNamespace: namespace, - StatefulSetName: name, - PodCapacity: capacity, - RefreshPeriod: refreshPeriod, - SchedulerPolicy: schedulerPolicy, - SchedPolicy: schedPolicy, - DeschedPolicy: deschedPolicy, - Evictor: evictor, - VPodLister: lister, - NodeLister: nodeLister, - } - - s, _ := New(ctx, cfg) - return s +type Pending map[types.NamespacedName]int32 + +func (p Pending) Total() int32 { + t := int32(0) + for _, vr := range p { + t += vr + } + return t } // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods @@ -136,14 +136,10 @@ type StatefulSetScheduler struct { // replicas is the (cached) number of statefulset replicas. replicas int32 - // pending tracks the number of virtual replicas that haven't been scheduled yet - // because there wasn't enough free capacity. - // The autoscaler uses - pending map[types.NamespacedName]int32 - // reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been // committed yet (ie. not appearing in vpodLister) - reserved map[types.NamespacedName]map[string]int32 + reserved map[types.NamespacedName]map[string]int32 + reservedMu sync.Mutex } var ( @@ -180,7 +176,6 @@ func newStatefulSetScheduler(ctx context.Context, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), podLister: podlister, vpodLister: cfg.VPodLister, - pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), stateAccessor: stateAccessor, reserved: make(map[types.NamespacedName]map[string]int32), @@ -200,6 +195,8 @@ func newStatefulSetScheduler(ctx context.Context, func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() + s.reservedMu.Lock() + defer s.reservedMu.Unlock() vpods, err := s.vpodLister() if err != nil { @@ -227,8 +224,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { logger := s.logger.With("key", vpod.GetKey()) - logger.Debugw("scheduling", zap.Any("pending", toJSONable(s.pending))) - // Get the current placements state // Quite an expensive operation but safe and simple. state, err := s.stateAccessor.State(s.reserved) @@ -237,6 +232,8 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 return nil, err } + logger.Debugw("scheduling", zap.Any("state", state)) + existingPlacements := vpod.GetPlacements() var left int32 @@ -260,7 +257,6 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 tr := scheduler.GetTotalVReplicas(placements) if tr == vpod.GetVReplicas() { logger.Debug("scheduling succeeded (already scheduled)") - delete(s.pending, vpod.GetKey()) // Fully placed. Nothing to do return placements, nil @@ -308,17 +304,14 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Give time for the autoscaler to do its job logger.Info("not enough pod replicas to schedule. 
Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.pending[vpod.GetKey()] = left - // Trigger the autoscaler if s.autoscaler != nil { - s.autoscaler.Autoscale(s.ctx, false, s.pendingVReplicas()) + s.autoscaler.Autoscale(s.ctx) } if state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed - delete(s.pending, vpod.GetKey()) // rebalancing doesn't care about pending since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job } @@ -326,7 +319,6 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } logger.Infow("scheduling successful", zap.Any("placement", placements)) - delete(s.pending, vpod.GetKey()) return placements, nil } @@ -735,16 +727,6 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -// pendingReplicas returns the total number of vreplicas -// that haven't been scheduled yet -func (s *StatefulSetScheduler) pendingVReplicas() int32 { - t := int32(0) - for _, v := range s.pending { - t += v - } - return t -} - func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { @@ -800,3 +782,18 @@ func (s *StatefulSetScheduler) notEnoughPodReplicas(left int32) error { controller.NewRequeueAfter(5*time.Second), ) } + +func (s *StatefulSetScheduler) Reserved() map[types.NamespacedName]map[string]int32 { + s.reservedMu.Lock() + defer s.reservedMu.Unlock() + + r := make(map[types.NamespacedName]map[string]int32, len(s.reserved)) + for k1, v1 := range s.reserved { + r[k1] = make(map[string]int32, len(v1)) + for k2, v2 := range v1 { + r[k1][k2] = v2 + } + } + + return r +} diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index f9ac3a25669..456c9d4e56c 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -784,9 +784,6 @@ func TestStatefulsetScheduler(t *testing.T) { VPodLister: vpodClient.List, } s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)) - if tc.pending != nil { - s.pending = tc.pending - } // Give some time for the informer to notify the scheduler and set the number of replicas err = wait.PollImmediate(200*time.Millisecond, time.Second, func() (bool, error) { @@ -907,7 +904,7 @@ type fakeAutoscaler struct { func (f *fakeAutoscaler) Start(ctx context.Context) { } -func (f *fakeAutoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) { +func (f *fakeAutoscaler) Autoscale(ctx context.Context) { } func newFakeAutoscaler() *fakeAutoscaler { diff --git a/vendor/github.com/go-logr/logr/.golangci.yaml b/vendor/github.com/go-logr/logr/.golangci.yaml index 94ff801df1a..0cffafa7bf9 100644 --- a/vendor/github.com/go-logr/logr/.golangci.yaml +++ b/vendor/github.com/go-logr/logr/.golangci.yaml @@ -6,7 +6,6 @@ linters: disable-all: true enable: - asciicheck - - deadcode - errcheck - forcetypeassert - gocritic @@ -18,10 +17,8 @@ linters: - misspell - revive - staticcheck - - structcheck - typecheck - unused - - varcheck issues: exclude-use-default: false diff --git a/vendor/github.com/go-logr/logr/discard.go b/vendor/github.com/go-logr/logr/discard.go index 9d92a38f1d7..99fe8be93c1 100644 --- 
a/vendor/github.com/go-logr/logr/discard.go +++ b/vendor/github.com/go-logr/logr/discard.go @@ -20,35 +20,5 @@ package logr // used whenever the caller is not interested in the logs. Logger instances // produced by this function always compare as equal. func Discard() Logger { - return Logger{ - level: 0, - sink: discardLogSink{}, - } -} - -// discardLogSink is a LogSink that discards all messages. -type discardLogSink struct{} - -// Verify that it actually implements the interface -var _ LogSink = discardLogSink{} - -func (l discardLogSink) Init(RuntimeInfo) { -} - -func (l discardLogSink) Enabled(int) bool { - return false -} - -func (l discardLogSink) Info(int, string, ...interface{}) { -} - -func (l discardLogSink) Error(error, string, ...interface{}) { -} - -func (l discardLogSink) WithValues(...interface{}) LogSink { - return l -} - -func (l discardLogSink) WithName(string) LogSink { - return l + return New(nil) } diff --git a/vendor/github.com/go-logr/logr/logr.go b/vendor/github.com/go-logr/logr/logr.go index c3b56b3d2c5..e027aea3fd3 100644 --- a/vendor/github.com/go-logr/logr/logr.go +++ b/vendor/github.com/go-logr/logr/logr.go @@ -21,7 +21,7 @@ limitations under the License. // to back that API. Packages in the Go ecosystem can depend on this package, // while callers can implement logging with whatever backend is appropriate. // -// Usage +// # Usage // // Logging is done using a Logger instance. Logger is a concrete type with // methods, which defers the actual logging to a LogSink interface. The main @@ -30,16 +30,20 @@ limitations under the License. // "structured logging". // // With Go's standard log package, we might write: -// log.Printf("setting target value %s", targetValue) +// +// log.Printf("setting target value %s", targetValue) // // With logr's structured logging, we'd write: -// logger.Info("setting target", "value", targetValue) +// +// logger.Info("setting target", "value", targetValue) // // Errors are much the same. Instead of: -// log.Printf("failed to open the pod bay door for user %s: %v", user, err) +// +// log.Printf("failed to open the pod bay door for user %s: %v", user, err) // // We'd write: -// logger.Error(err, "failed to open the pod bay door", "user", user) +// +// logger.Error(err, "failed to open the pod bay door", "user", user) // // Info() and Error() are very similar, but they are separate methods so that // LogSink implementations can choose to do things like attach additional @@ -47,7 +51,7 @@ limitations under the License. // always logged, regardless of the current verbosity. If there is no error // instance available, passing nil is valid. // -// Verbosity +// # Verbosity // // Often we want to log information only when the application in "verbose // mode". To write log lines that are more verbose, Logger has a V() method. @@ -58,20 +62,22 @@ limitations under the License. // Error messages do not have a verbosity level and are always logged. // // Where we might have written: -// if flVerbose >= 2 { -// log.Printf("an unusual thing happened") -// } +// +// if flVerbose >= 2 { +// log.Printf("an unusual thing happened") +// } // // We can write: -// logger.V(2).Info("an unusual thing happened") // -// Logger Names +// logger.V(2).Info("an unusual thing happened") +// +// # Logger Names // // Logger instances can have name strings so that all messages logged through // that instance have additional context. 
For example, you might want to add // a subsystem name: // -// logger.WithName("compactor").Info("started", "time", time.Now()) +// logger.WithName("compactor").Info("started", "time", time.Now()) // // The WithName() method returns a new Logger, which can be passed to // constructors or other functions for further use. Repeated use of WithName() @@ -82,25 +88,27 @@ limitations under the License. // joining operation (e.g. whitespace, commas, periods, slashes, brackets, // quotes, etc). // -// Saved Values +// # Saved Values // // Logger instances can store any number of key/value pairs, which will be // logged alongside all messages logged through that instance. For example, // you might want to create a Logger instance per managed object: // // With the standard log package, we might write: -// log.Printf("decided to set field foo to value %q for object %s/%s", -// targetValue, object.Namespace, object.Name) +// +// log.Printf("decided to set field foo to value %q for object %s/%s", +// targetValue, object.Namespace, object.Name) // // With logr we'd write: -// // Elsewhere: set up the logger to log the object name. -// obj.logger = mainLogger.WithValues( -// "name", obj.name, "namespace", obj.namespace) // -// // later on... -// obj.logger.Info("setting foo", "value", targetValue) +// // Elsewhere: set up the logger to log the object name. +// obj.logger = mainLogger.WithValues( +// "name", obj.name, "namespace", obj.namespace) +// +// // later on... +// obj.logger.Info("setting foo", "value", targetValue) // -// Best Practices +// # Best Practices // // Logger has very few hard rules, with the goal that LogSink implementations // might have a lot of freedom to differentiate. There are, however, some @@ -124,15 +132,15 @@ limitations under the License. // around. For cases where passing a logger is optional, a pointer to Logger // should be used. // -// Key Naming Conventions +// # Key Naming Conventions // // Keys are not strictly required to conform to any specification or regex, but // it is recommended that they: -// * be human-readable and meaningful (not auto-generated or simple ordinals) -// * be constant (not dependent on input data) -// * contain only printable characters -// * not contain whitespace or punctuation -// * use lower case for simple keys and lowerCamelCase for more complex ones +// - be human-readable and meaningful (not auto-generated or simple ordinals) +// - be constant (not dependent on input data) +// - contain only printable characters +// - not contain whitespace or punctuation +// - use lower case for simple keys and lowerCamelCase for more complex ones // // These guidelines help ensure that log data is processed properly regardless // of the log implementation. For example, log implementations will try to @@ -141,51 +149,54 @@ limitations under the License. 
// While users are generally free to use key names of their choice, it's // generally best to avoid using the following keys, as they're frequently used // by implementations: -// * "caller": the calling information (file/line) of a particular log line -// * "error": the underlying error value in the `Error` method -// * "level": the log level -// * "logger": the name of the associated logger -// * "msg": the log message -// * "stacktrace": the stack trace associated with a particular log line or -// error (often from the `Error` message) -// * "ts": the timestamp for a log line +// - "caller": the calling information (file/line) of a particular log line +// - "error": the underlying error value in the `Error` method +// - "level": the log level +// - "logger": the name of the associated logger +// - "msg": the log message +// - "stacktrace": the stack trace associated with a particular log line or +// error (often from the `Error` message) +// - "ts": the timestamp for a log line // // Implementations are encouraged to make use of these keys to represent the // above concepts, when necessary (for example, in a pure-JSON output form, it // would be necessary to represent at least message and timestamp as ordinary // named values). // -// Break Glass +// # Break Glass // // Implementations may choose to give callers access to the underlying // logging implementation. The recommended pattern for this is: -// // Underlier exposes access to the underlying logging implementation. -// // Since callers only have a logr.Logger, they have to know which -// // implementation is in use, so this interface is less of an abstraction -// // and more of way to test type conversion. -// type Underlier interface { -// GetUnderlying() -// } +// +// // Underlier exposes access to the underlying logging implementation. +// // Since callers only have a logr.Logger, they have to know which +// // implementation is in use, so this interface is less of an abstraction +// // and more of way to test type conversion. +// type Underlier interface { +// GetUnderlying() +// } // // Logger grants access to the sink to enable type assertions like this: -// func DoSomethingWithImpl(log logr.Logger) { -// if underlier, ok := log.GetSink()(impl.Underlier) { -// implLogger := underlier.GetUnderlying() -// ... -// } -// } +// +// func DoSomethingWithImpl(log logr.Logger) { +// if underlier, ok := log.GetSink().(impl.Underlier); ok { +// implLogger := underlier.GetUnderlying() +// ... +// } +// } // // Custom `With*` functions can be implemented by copying the complete // Logger struct and replacing the sink in the copy: -// // WithFooBar changes the foobar parameter in the log sink and returns a -// // new logger with that modified sink. It does nothing for loggers where -// // the sink doesn't support that parameter. -// func WithFoobar(log logr.Logger, foobar int) logr.Logger { -// if foobarLogSink, ok := log.GetSink()(FoobarSink); ok { -// log = log.WithSink(foobarLogSink.WithFooBar(foobar)) -// } -// return log -// } +// +// // WithFooBar changes the foobar parameter in the log sink and returns a +// // new logger with that modified sink. It does nothing for loggers where +// // the sink doesn't support that parameter. +// func WithFoobar(log logr.Logger, foobar int) logr.Logger { +// if foobarLogSink, ok := log.GetSink().(FoobarSink); ok { +// log = log.WithSink(foobarLogSink.WithFooBar(foobar)) +// } +// return log +// } // // Don't use New to construct a new Logger with a LogSink retrieved from an // existing Logger. 
Source code attribution might not work correctly and @@ -201,11 +212,14 @@ import ( ) // New returns a new Logger instance. This is primarily used by libraries -// implementing LogSink, rather than end users. +// implementing LogSink, rather than end users. Passing a nil sink will create +// a Logger which discards all log lines. func New(sink LogSink) Logger { logger := Logger{} logger.setSink(sink) - sink.Init(runtimeInfo) + if sink != nil { + sink.Init(runtimeInfo) + } return logger } @@ -244,7 +258,7 @@ type Logger struct { // Enabled tests whether this Logger is enabled. For example, commandline // flags might be used to set the logging verbosity and disable some info logs. func (l Logger) Enabled() bool { - return l.sink.Enabled(l.level) + return l.sink != nil && l.sink.Enabled(l.level) } // Info logs a non-error message with the given key/value pairs as context. @@ -254,6 +268,9 @@ func (l Logger) Enabled() bool { // information. The key/value pairs must alternate string keys and arbitrary // values. func (l Logger) Info(msg string, keysAndValues ...interface{}) { + if l.sink == nil { + return + } if l.Enabled() { if withHelper, ok := l.sink.(CallStackHelperLogSink); ok { withHelper.GetCallStackHelper()() @@ -273,6 +290,9 @@ func (l Logger) Info(msg string, keysAndValues ...interface{}) { // triggered this log line, if present. The err parameter is optional // and nil may be passed instead of an error instance. func (l Logger) Error(err error, msg string, keysAndValues ...interface{}) { + if l.sink == nil { + return + } if withHelper, ok := l.sink.(CallStackHelperLogSink); ok { withHelper.GetCallStackHelper()() } @@ -284,6 +304,9 @@ func (l Logger) Error(err error, msg string, keysAndValues ...interface{}) { // level means a log message is less important. Negative V-levels are treated // as 0. func (l Logger) V(level int) Logger { + if l.sink == nil { + return l + } if level < 0 { level = 0 } @@ -294,6 +317,9 @@ func (l Logger) V(level int) Logger { // WithValues returns a new Logger instance with additional key/value pairs. // See Info for documentation on how key/value pairs work. func (l Logger) WithValues(keysAndValues ...interface{}) Logger { + if l.sink == nil { + return l + } l.setSink(l.sink.WithValues(keysAndValues...)) return l } @@ -304,6 +330,9 @@ func (l Logger) WithValues(keysAndValues ...interface{}) Logger { // contain only letters, digits, and hyphens (see the package documentation for // more information). func (l Logger) WithName(name string) Logger { + if l.sink == nil { + return l + } l.setSink(l.sink.WithName(name)) return l } @@ -324,6 +353,9 @@ func (l Logger) WithName(name string) Logger { // WithCallDepth(1) because it works with implementions that support the // CallDepthLogSink and/or CallStackHelperLogSink interfaces. func (l Logger) WithCallDepth(depth int) Logger { + if l.sink == nil { + return l + } if withCallDepth, ok := l.sink.(CallDepthLogSink); ok { l.setSink(withCallDepth.WithCallDepth(depth)) } @@ -345,6 +377,9 @@ func (l Logger) WithCallDepth(depth int) Logger { // implementation does not support either of these, the original Logger will be // returned. 
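Editor's note (not part of the diff): the logr doc comment above describes the call-depth / call-stack-helper pattern for logging wrappers. A minimal, hypothetical sketch of such a helper, using only the public logr API shown in these hunks (package and function names are illustrative):

```go
// Hypothetical helper, not from the diff: it forwards to a logr.Logger one
// frame above the real call site, so WithCallDepth(1) keeps file/line
// attribution pointing at the caller, as the doc comment above describes.
// With the v1.2.4 nil-sink guards, this is also a safe no-op on a zero Logger.
package loghelpers // illustrative package name

import "github.com/go-logr/logr"

// InfoViaHelper wraps Logger.Info from a helper function.
func InfoViaHelper(log logr.Logger, msg string, keysAndValues ...interface{}) {
	log.WithCallDepth(1).Info(msg, keysAndValues...)
}
```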
func (l Logger) WithCallStackHelper() (func(), Logger) { + if l.sink == nil { + return func() {}, l + } var helper func() if withCallDepth, ok := l.sink.(CallDepthLogSink); ok { l.setSink(withCallDepth.WithCallDepth(1)) @@ -357,6 +392,11 @@ func (l Logger) WithCallStackHelper() (func(), Logger) { return helper, l } +// IsZero returns true if this logger is an uninitialized zero value +func (l Logger) IsZero() bool { + return l.sink == nil +} + // contextKey is how we find Loggers in a context.Context. type contextKey struct{} @@ -442,7 +482,7 @@ type LogSink interface { WithName(name string) LogSink } -// CallDepthLogSink represents a Logger that knows how to climb the call stack +// CallDepthLogSink represents a LogSink that knows how to climb the call stack // to identify the original call site and can offset the depth by a specified // number of frames. This is useful for users who have helper functions // between the "real" call site and the actual calls to Logger methods. @@ -467,7 +507,7 @@ type CallDepthLogSink interface { WithCallDepth(depth int) LogSink } -// CallStackHelperLogSink represents a Logger that knows how to climb +// CallStackHelperLogSink represents a LogSink that knows how to climb // the call stack to identify the original call site and can skip // intermediate helper functions if they mark themselves as // helper. Go's testing package uses that approach. diff --git a/vendor/github.com/google/uuid/.travis.yml b/vendor/github.com/google/uuid/.travis.yml deleted file mode 100644 index d8156a60ba9..00000000000 --- a/vendor/github.com/google/uuid/.travis.yml +++ /dev/null @@ -1,9 +0,0 @@ -language: go - -go: - - 1.4.3 - - 1.5.3 - - tip - -script: - - go test -v ./... diff --git a/vendor/github.com/google/uuid/CHANGELOG.md b/vendor/github.com/google/uuid/CHANGELOG.md new file mode 100644 index 00000000000..2bd78667afb --- /dev/null +++ b/vendor/github.com/google/uuid/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +## [1.3.1](https://github.com/google/uuid/compare/v1.3.0...v1.3.1) (2023-08-18) + + +### Bug Fixes + +* Use .EqualFold() to parse urn prefixed UUIDs ([#118](https://github.com/google/uuid/issues/118)) ([574e687](https://github.com/google/uuid/commit/574e6874943741fb99d41764c705173ada5293f0)) + +## Changelog diff --git a/vendor/github.com/google/uuid/CONTRIBUTING.md b/vendor/github.com/google/uuid/CONTRIBUTING.md index 04fdf09f136..5566888726d 100644 --- a/vendor/github.com/google/uuid/CONTRIBUTING.md +++ b/vendor/github.com/google/uuid/CONTRIBUTING.md @@ -2,6 +2,22 @@ We definitely welcome patches and contribution to this project! +### Tips + +Commits must be formatted according to the [Conventional Commits Specification](https://www.conventionalcommits.org). + +Always try to include a test case! If it is not possible or not necessary, +please explain why in the pull request description. + +### Releasing + +Commits that would precipitate a SemVer change, as desrcibed in the Conventional +Commits Specification, will trigger [`release-please`](https://github.com/google-github-actions/release-please-action) +to create a release candidate pull request. Once submitted, `release-please` +will create a release. + +For tips on how to work with `release-please`, see its documentation. 
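Editor's note (not part of the diff): the go-logr/logr v1.2.4 hunks earlier in this patch make Discard() return New(nil) and add nil-sink guards plus IsZero(). A minimal sketch of the resulting behavior, assuming only the API visible in those hunks:

```go
// Sketch of the nil-sink behavior added in go-logr/logr v1.2.4: the zero
// Logger and logr.Discard() (now logr.New(nil)) are safe no-ops.
package main

import (
	"fmt"

	"github.com/go-logr/logr"
)

func main() {
	var zero logr.Logger      // zero value: sink is nil
	discard := logr.Discard() // equivalent to logr.New(nil) after this change

	// With the nil-sink guards these calls no longer dereference a nil sink.
	zero.Info("ignored", "key", "value")
	discard.Error(nil, "also ignored")

	// IsZero reports whether the Logger was left uninitialized.
	fmt.Println(zero.IsZero(), discard.IsZero()) // true true
	fmt.Println(zero.Enabled())                  // false
}
```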
+ ### Legal requirements In order to protect both you and ourselves, you will need to sign the diff --git a/vendor/github.com/google/uuid/README.md b/vendor/github.com/google/uuid/README.md index f765a46f915..3e9a61889de 100644 --- a/vendor/github.com/google/uuid/README.md +++ b/vendor/github.com/google/uuid/README.md @@ -1,6 +1,6 @@ -# uuid ![build status](https://travis-ci.org/google/uuid.svg?branch=master) +# uuid The uuid package generates and inspects UUIDs based on -[RFC 4122](http://tools.ietf.org/html/rfc4122) +[RFC 4122](https://datatracker.ietf.org/doc/html/rfc4122) and DCE 1.1: Authentication and Security Services. This package is based on the github.com/pborman/uuid package (previously named @@ -9,10 +9,12 @@ a UUID is a 16 byte array rather than a byte slice. One loss due to this change is the ability to represent an invalid UUID (vs a NIL UUID). ###### Install -`go get github.com/google/uuid` +```sh +go get github.com/google/uuid +``` ###### Documentation -[![GoDoc](https://godoc.org/github.com/google/uuid?status.svg)](http://godoc.org/github.com/google/uuid) +[![Go Reference](https://pkg.go.dev/badge/github.com/google/uuid.svg)](https://pkg.go.dev/github.com/google/uuid) Full `go doc` style documentation for the package can be viewed online without installing this package by using the GoDoc site here: diff --git a/vendor/github.com/google/uuid/node_js.go b/vendor/github.com/google/uuid/node_js.go index 24b78edc907..b2a0bc8711b 100644 --- a/vendor/github.com/google/uuid/node_js.go +++ b/vendor/github.com/google/uuid/node_js.go @@ -7,6 +7,6 @@ package uuid // getHardwareInterface returns nil values for the JS version of the code. -// This remvoves the "net" dependency, because it is not used in the browser. +// This removes the "net" dependency, because it is not used in the browser. // Using the "net" library inflates the size of the transpiled JS code by 673k bytes. 
func getHardwareInterface(name string) (string, []byte) { return "", nil } diff --git a/vendor/github.com/google/uuid/uuid.go b/vendor/github.com/google/uuid/uuid.go index a57207aeb6f..a56138cc4bd 100644 --- a/vendor/github.com/google/uuid/uuid.go +++ b/vendor/github.com/google/uuid/uuid.go @@ -69,7 +69,7 @@ func Parse(s string) (UUID, error) { // urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx case 36 + 9: - if strings.ToLower(s[:9]) != "urn:uuid:" { + if !strings.EqualFold(s[:9], "urn:uuid:") { return uuid, fmt.Errorf("invalid urn prefix: %q", s[:9]) } s = s[9:] @@ -101,7 +101,8 @@ func Parse(s string) (UUID, error) { 9, 11, 14, 16, 19, 21, - 24, 26, 28, 30, 32, 34} { + 24, 26, 28, 30, 32, 34, + } { v, ok := xtob(s[x], s[x+1]) if !ok { return uuid, errors.New("invalid UUID format") @@ -117,7 +118,7 @@ func ParseBytes(b []byte) (UUID, error) { switch len(b) { case 36: // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx case 36 + 9: // urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx - if !bytes.Equal(bytes.ToLower(b[:9]), []byte("urn:uuid:")) { + if !bytes.EqualFold(b[:9], []byte("urn:uuid:")) { return uuid, fmt.Errorf("invalid urn prefix: %q", b[:9]) } b = b[9:] @@ -145,7 +146,8 @@ func ParseBytes(b []byte) (UUID, error) { 9, 11, 14, 16, 19, 21, - 24, 26, 28, 30, 32, 34} { + 24, 26, 28, 30, 32, 34, + } { v, ok := xtob(b[x], b[x+1]) if !ok { return uuid, errors.New("invalid UUID format") diff --git a/vendor/github.com/openzipkin/zipkin-go/.golangci.yml b/vendor/github.com/openzipkin/zipkin-go/.golangci.yml index 3d615b8b0a4..e990f027f11 100644 --- a/vendor/github.com/openzipkin/zipkin-go/.golangci.yml +++ b/vendor/github.com/openzipkin/zipkin-go/.golangci.yml @@ -16,9 +16,8 @@ linters: - lll - misspell - nakedret - - structcheck - unparam - - varcheck + - unused linters-settings: dupl: diff --git a/vendor/github.com/openzipkin/zipkin-go/README.md b/vendor/github.com/openzipkin/zipkin-go/README.md index 68e7081e134..05000f80a08 100644 --- a/vendor/github.com/openzipkin/zipkin-go/README.md +++ b/vendor/github.com/openzipkin/zipkin-go/README.md @@ -109,7 +109,7 @@ backend asynchronously. #### Kafka Reporter High performance Reporter transporting Spans to the Zipkin server using a Kafka Producer digesting JSON V2 Spans. The reporter uses the -[Sarama async producer](https://godoc.org/github.com/Shopify/sarama#AsyncProducer) +[Sarama async producer](https://pkg.go.dev/github.com/IBM/sarama#AsyncProducer) underneath. ## usage and examples diff --git a/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go b/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go index a411d542c68..0d7823b3cd6 100644 --- a/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go +++ b/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go @@ -1,6 +1,7 @@ package jsonpatch import ( + "bytes" "encoding/json" "fmt" "reflect" @@ -64,6 +65,9 @@ func NewOperation(op, path string, value interface{}) Operation { // // An error will be returned if any of the two documents are invalid. 
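Editor's note (not part of the diff): the google/uuid v1.3.1 hunks above switch the `urn:uuid:` prefix check in Parse/ParseBytes from ToLower comparisons to EqualFold; the prefix remains case-insensitive, but no lowercased copy is made. A small sketch of the parsing path those hunks touch (the example UUID value is arbitrary):

```go
// Sketch of the urn-prefixed parsing exercised by the uuid v1.3.1 change:
// the "urn:uuid:" prefix is matched case-insensitively via EqualFold.
package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	// A mixed-case urn prefix is accepted; only the 36-char UUID body follows it.
	id, err := uuid.Parse("URN:UUID:f47ac10b-58cc-0372-8567-0e02b2c3d479")
	fmt.Println(id, err)
}
```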
func CreatePatch(a, b []byte) ([]Operation, error) { + if bytes.Equal(a, b) { + return []Operation{}, nil + } var aI interface{} var bI interface{} err := json.Unmarshal(a, &aI) diff --git a/vendor/knative.dev/pkg/hack/update-codegen.sh b/vendor/knative.dev/pkg/hack/update-codegen.sh index d432f659cb3..3d0945344fc 100644 --- a/vendor/knative.dev/pkg/hack/update-codegen.sh +++ b/vendor/knative.dev/pkg/hack/update-codegen.sh @@ -51,7 +51,7 @@ EXTERNAL_INFORMER_PKG="k8s.io/client-go/informers" \ k8s.io/api \ "${K8S_TYPES}" \ --go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt \ - --force-genreconciler-kinds "Namespace,ConfigMap,Deployment,Secret,Pod,CronJob,NetworkPolicy,Node,ValidatingWebhookConfiguration,MutatingWebhookConfiguration" + --force-genreconciler-kinds "Namespace,ConfigMap,Deployment,Secret,Pod,CronJob,NetworkPolicy,Node,ValidatingWebhookConfiguration,MutatingWebhookConfiguration,ServiceAccount" OUTPUT_PKG="knative.dev/pkg/client/injection/apiextensions" \ VERSIONED_CLIENTSET_PKG="k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" \ diff --git a/vendor/modules.txt b/vendor/modules.txt index bb74b8ade8a..a56b826864b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -116,7 +116,7 @@ github.com/go-kit/log/level # github.com/go-logfmt/logfmt v0.5.1 ## explicit; go 1.17 github.com/go-logfmt/logfmt -# github.com/go-logr/logr v1.2.3 +# github.com/go-logr/logr v1.2.4 ## explicit; go 1.16 github.com/go-logr/logr # github.com/go-openapi/jsonpointer v0.19.5 @@ -210,7 +210,7 @@ github.com/google/s2a-go/internal/v2/remotesigner github.com/google/s2a-go/internal/v2/tlsconfigstore github.com/google/s2a-go/retry github.com/google/s2a-go/stream -# github.com/google/uuid v1.3.0 +# github.com/google/uuid v1.3.1 ## explicit github.com/google/uuid # github.com/googleapis/enterprise-certificate-proxy v0.2.5 @@ -281,7 +281,7 @@ github.com/modern-go/reflect2 # github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 ## explicit github.com/munnerz/goautoneg -# github.com/openzipkin/zipkin-go v0.4.1 +# github.com/openzipkin/zipkin-go v0.4.2 ## explicit; go 1.18 github.com/openzipkin/zipkin-go github.com/openzipkin/zipkin-go/idgenerator @@ -500,7 +500,7 @@ golang.org/x/tools/internal/typeparams ## explicit; go 1.17 golang.org/x/xerrors golang.org/x/xerrors/internal -# gomodules.xyz/jsonpatch/v2 v2.3.0 +# gomodules.xyz/jsonpatch/v2 v2.4.0 ## explicit; go 1.20 gomodules.xyz/jsonpatch/v2 # google.golang.org/api v0.138.0 @@ -1189,7 +1189,7 @@ knative.dev/hack/schema/commands knative.dev/hack/schema/docs knative.dev/hack/schema/registry knative.dev/hack/schema/schema -# knative.dev/pkg v0.0.0-20230821102121-81e4ee140363 +# knative.dev/pkg v0.0.0-20230901225035-211243a92d2f ## explicit; go 1.18 knative.dev/pkg/apiextensions/storageversion knative.dev/pkg/apiextensions/storageversion/cmd/migrate @@ -1327,7 +1327,7 @@ knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/conversion knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 +# knative.dev/reconciler-test v0.0.0-20230901013135-51e7751247b7 ## explicit; go 1.18 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment
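Editor's note (not part of the diff): the gomodules.xyz/jsonpatch/v2 v2.4.0 hunk above makes CreatePatch short-circuit when the two inputs are byte-identical, returning an empty operation list before any JSON decoding. A minimal sketch of that behavior, using the package's public CreatePatch API (document contents are illustrative):

```go
// Sketch of the v2.4.0 short-circuit: byte-identical documents yield an empty
// patch without unmarshalling either input; differing documents are diffed as before.
package main

import (
	"fmt"

	jsonpatch "gomodules.xyz/jsonpatch/v2"
)

func main() {
	a := []byte(`{"spec":{"replicas":1}}`)
	b := []byte(`{"spec":{"replicas":2}}`)

	same, _ := jsonpatch.CreatePatch(a, a) // identical bytes: []Operation{}, no decoding
	diff, _ := jsonpatch.CreatePatch(a, b) // computed the usual way: one replace op
	fmt.Println(len(same), len(diff))      // 0 1
}
```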