diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index ed4a8b645801..5f32de514af7 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -311,6 +311,8 @@ type AutoscalingOptions struct {
 	ForceDeleteLongUnregisteredNodes bool
 	// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
 	DynamicResourceAllocationEnabled bool
+	// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
+	ClusterSnapshotParallelism int
 }
 
 // KubeClientOptions specify options for kube client
diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go
index c8510cb0ac17..61303ed3a8d1 100644
--- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go
@@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
 			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
 		},
 		"delta": func() clustersnapshot.ClusterSnapshot {
-			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
+			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
 		},
 	}
 	for snapshotName, snapshotFactory := range snapshots {
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index c828774e992b..3395b26ac6e7 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -283,6 +283,7 @@ var (
 	checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
 	forceDeleteLongUnregisteredNodes             = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
 	enableDynamicResourceAllocation              = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
+	clusterSnapshotParallelism                   = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
 )
 
 func isFlagPassed(name string) bool {
@@ -463,6 +464,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
 		ForceDeleteLongUnregisteredNodes:             *forceDeleteLongUnregisteredNodes,
 		DynamicResourceAllocationEnabled:             *enableDynamicResourceAllocation,
+		ClusterSnapshotParallelism:                   *clusterSnapshotParallelism,
 	}
 }
 
@@ -505,7 +507,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
 
-	var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore()
+	var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism)
 	if autoscalingOptions.DynamicResourceAllocationEnabled {
 		// TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA.
 		klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.")
diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go
index 5ef275fd3dcf..4ce6b895b15f 100644
--- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go
+++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go
@@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
-			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore())
+			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
 			err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
 			assert.NoError(t, err)
 			ctx := context.AutoscalingContext{
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go
index 615f3e5f91e8..b56a67e9e729 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go
@@ -56,7 +56,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
 		if err != nil {
 			return nil, err
 		}
-		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil
+		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
 	},
 }
 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go
index 40154047491a..705bc655893e 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go
@@ -17,11 +17,13 @@ limitations under the License.
 package store
 
 import (
+	"context"
 	"fmt"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -44,7 +46,8 @@ import (
 // pod affinity - causes scheduler framework to list pods with non-empty selector,
 // so basic caching doesn't help.
 type DeltaSnapshotStore struct {
-	data *internalDeltaSnapshotData
+	data        *internalDeltaSnapshotData
+	parallelism int
 }
 
 type deltaSnapshotStoreNodeLister DeltaSnapshotStore
@@ -137,10 +140,14 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework
 	return nodeInfoList
 }
 
-func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
+func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) {
 	nodeInfo := schedulerframework.NewNodeInfo()
 	nodeInfo.SetNode(node)
-	return data.addNodeInfo(nodeInfo)
+	err := data.addNodeInfo(nodeInfo)
+	if err != nil {
+		return nil, err
+	}
+	return nodeInfo, nil
 }
 
 func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
@@ -241,6 +248,24 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
 	return nil
 }
 
+func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
+	ni.AddPod(pod)
+
+	// Maybe consider deleting from the list in the future. Maybe not.
+	data.clearCaches()
+	return nil
+}
+
+func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
+	for _, pod := range pods {
+		ni.AddPod(pod)
+	}
+
+	// Maybe consider deleting from the list in the future. Maybe not.
+	data.clearCaches()
+	return nil
+}
+
 func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
 	// This always clones node info, even if the pod is actually missing.
 	// Not sure if we mind, since removing non-existent pod
@@ -403,8 +428,10 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla
 }
 
 // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
-func NewDeltaSnapshotStore() *DeltaSnapshotStore {
-	snapshot := &DeltaSnapshotStore{}
+func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore {
+	snapshot := &DeltaSnapshotStore{
+		parallelism: parallelism,
+	}
 	snapshot.clear()
 	return snapshot
 }
@@ -417,7 +444,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
 
 // AddSchedulerNodeInfo adds a NodeInfo.
 func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
-	if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
+	if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
 		return err
 	}
 	for _, podInfo := range nodeInfo.Pods {
@@ -428,24 +455,71 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
 	return nil
 }
 
+// setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+	for _, pod := range scheduledPods {
+		if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
+			if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+	podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
+	for _, pod := range scheduledPods {
+		nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
+		if !ok {
+			continue
+		}
+		podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod)
+	}
+
+	ctx := context.Background()
+	ctx, cancel := context.WithCancelCause(ctx)
+
+	workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
+		err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
+		if err != nil {
+			cancel(err)
+		}
+	})
+
+	return context.Cause(ctx)
+}
+
 // SetClusterState sets the cluster state.
 func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
 	snapshot.clear()
 
-	knownNodes := make(map[string]bool)
-	for _, node := range nodes {
-		if err := snapshot.data.addNode(node); err != nil {
+	nodeNameToIdx := make(map[string]int, len(nodes))
+	nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes))
+	for i, node := range nodes {
+		nodeInfo, err := snapshot.data.addNode(node)
+		if err != nil {
 			return err
 		}
-		knownNodes[node.Name] = true
+		nodeNameToIdx[node.Name] = i
+		nodeInfos[i] = nodeInfo
 	}
-	for _, pod := range scheduledPods {
-		if knownNodes[pod.Spec.NodeName] {
-			if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
-				return err
-			}
-		}
+
+	if snapshot.parallelism > 1 {
+		err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
+		if err != nil {
+			return err
+		}
+	} else {
+		// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
+		// after making sure the implementation is always correct in CA 1.33.
+		err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
+		if err != nil {
+			return err
+		}
 	}
+
 	// TODO(DRA): Save DRA snapshot.
 	return nil
 }
diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go
index c10776cc9de8..5f618befd180 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go
@@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
-			deltaStore := NewDeltaSnapshotStore()
+			deltaStore := NewDeltaSnapshotStore(16)
 			if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}
@@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
-			deltaStore := NewDeltaSnapshotStore()
+			deltaStore := NewDeltaSnapshotStore(16)
 			if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}
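
The core of the change is the fan-out in `setClusterStatePodsParallelized`: pods are bucketed per node first, so no two workers ever touch the same `NodeInfo`, and the whole batch is aborted on the first error via `context.WithCancelCause` plus `context.Cause`. Below is a standalone sketch of that pattern; the `pod`/`nodeInfo` types and the `addPods` helper are simplified stand-ins for the real scheduler framework types, not part of this PR, and the parallelism of 16 mirrors the flag default.

```go
// Sketch of the bucket-then-parallelize pattern used by
// setClusterStatePodsParallelized: group work per node first so each
// worker owns its node's state, then cancel the batch on the first error.
package main

import (
	"context"
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

type pod struct{ name, nodeName string }

type nodeInfo struct {
	name string
	pods []string
}

// addPods mimics internalDeltaSnapshotData.addPodsToNode: it mutates a single
// nodeInfo, which is safe because each nodeInfo is owned by exactly one worker.
func (ni *nodeInfo) addPods(pods []pod) error {
	for _, p := range pods {
		if p.name == "" {
			return errors.New("pod without a name")
		}
		ni.pods = append(ni.pods, p.name)
	}
	return nil
}

func main() {
	nodes := []*nodeInfo{{name: "n1"}, {name: "n2"}, {name: "n3"}}
	nodeNameToIdx := map[string]int{"n1": 0, "n2": 1, "n3": 2}
	scheduledPods := []pod{{"a", "n1"}, {"b", "n2"}, {"c", "n1"}, {"d", "unknown"}}

	// Bucket pods by target node up front; pods on unknown nodes are skipped,
	// mirroring the knownNodes check of the old sequential implementation.
	podsForNode := make([][]pod, len(nodes))
	for _, p := range scheduledPods {
		if idx, ok := nodeNameToIdx[p.nodeName]; ok {
			podsForNode[idx] = append(podsForNode[idx], p)
		}
	}

	// WithCancelCause lets any worker abort the batch; ParallelizeUntil stops
	// handing out new pieces once the context is done, and context.Cause then
	// surfaces the first error to the caller.
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)
	workqueue.ParallelizeUntil(ctx, 16 /* parallelism */, len(nodes), func(i int) {
		if err := nodes[i].addPods(podsForNode[i]); err != nil {
			cancel(err)
		}
	})
	if err := context.Cause(ctx); err != nil {
		fmt.Println("batch failed:", err)
		return
	}
	for _, ni := range nodes {
		fmt.Println(ni.name, ni.pods)
	}
}
```

The per-node bucketing is what keeps the workers lock-free: worker `i` is the only goroutine that reads `podsForNode[i]` or writes `nodes[i]`, so the only shared write is the `cancel` call itself.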
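Since `NewDeltaSnapshotStore` now takes the parallelism explicitly, a caller picks the code path at construction time: `parallelism > 1` selects `setClusterStatePodsParallelized`, anything else falls back to the sequential path. A minimal test-style sketch of exercising both paths, assuming the helpers visible in the diff (`clustersnapshot.CreateTestNodes`, `drasnapshot.Snapshot`); the test itself is hypothetical and not part of the PR:

```go
package store_test

import (
	"testing"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// TestSetClusterStateParallelism drives SetClusterState through both the
// sequential path (parallelism == 1) and the parallelized path (parallelism > 1).
func TestSetClusterStateParallelism(t *testing.T) {
	nodes := clustersnapshot.CreateTestNodes(1000)
	for _, parallelism := range []int{1, 16} {
		deltaStore := store.NewDeltaSnapshotStore(parallelism)
		// nil scheduled pods, as in the benchmarks above; pod placement is
		// exactly the part that gets farmed out to workers when parallelism > 1.
		if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
			t.Fatalf("SetClusterState(parallelism=%d): %v", parallelism, err)
		}
	}
}
```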