Skip to content

Commit

Permalink
Support to track the chunks by local objects for kubernetes workflow. (
Browse files Browse the repository at this point in the history
…#1606)

With it, we can schedule the workloads that only produce the localobjects.

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Oct 31, 2023
1 parent 21a76b1 commit 0eefec8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
37 changes: 34 additions & 3 deletions k8s/pkg/schedulers/scheduling_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,18 @@ func (b *BestEffortStrategy) TrackingChunksByCRD() *BestEffortStrategy {
_ = multierr.Append(errList, err)
}

// if there is no local objects, return error
if len(localObjects) == 0 && len(globalObjects) != 0 {
_ = multierr.Append(errList, errors.Errorf("No local chunks found"))
if len(localObjects) == 0 {
if len(globalObjects) == 0 {
localObjects, err = b.GetLocalObjectsByID(b.required)
// if there is no local objects, return error
if err != nil {
_ = multierr.Append(errList, err)
}
} else {
// if there is no local objects, return error
_ = multierr.Append(errList, errors.Errorf("Failed to get local objects"))
}

}

if errList != nil {
Expand Down Expand Up @@ -335,6 +344,28 @@ func (b *BestEffortStrategy) GetGlobalObjectsByID(
return objects, nil
}

// GetLocalObjectsByID returns the local objects by the given jobname.
func (b *BestEffortStrategy) GetLocalObjectsByID(
jobNames []string,
) ([]*v1alpha1.LocalObject, error) {
requiredJobs := make(map[string]bool)
for _, n := range jobNames {
requiredJobs[n] = true
}
objects := []*v1alpha1.LocalObject{}
localObjects := &v1alpha1.LocalObjectList{}
if err := b.List(context.TODO(), localObjects); err != nil {
return nil, err
}
for i, obj := range localObjects.Items {
if jobname, exist := obj.Labels[labels.VineyardObjectJobLabel]; exist && requiredJobs[jobname] {
objects = append(objects, &localObjects.Items[i])
}
}

return objects, nil
}

// CreateConfigmapForID creates a configmap for the object id and the nodes.
func (b *BestEffortStrategy) CreateConfigmapForID(
jobname []string,
Expand Down
8 changes: 4 additions & 4 deletions k8s/test/e2e/kind-with-local-registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ containerdConfigPatches:
endpoint = ["http://kind-registry:5000"]
nodes:
- role: control-plane
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
8 changes: 4 additions & 4 deletions k8s/test/e2e/kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11
- role: worker
image: kindest/node:v1.24.0
image: kindest/node:v1.25.11

0 comments on commit 0eefec8

Please sign in to comment.