diff --git a/charts/distributed-serving/.helmignore b/charts/distributed-serving/.helmignore new file mode 100644 index 000000000..f0c131944 --- /dev/null +++ b/charts/distributed-serving/.helmignore @@ -0,0 +1,21 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*~ +# Various IDEs +.project +.idea/ +*.tmproj diff --git a/charts/distributed-serving/CHANGELOG.md b/charts/distributed-serving/CHANGELOG.md new file mode 100644 index 000000000..005d6d965 --- /dev/null +++ b/charts/distributed-serving/CHANGELOG.md @@ -0,0 +1,3 @@ +### 0.1.0 +
+* init distributed-serving chart
\ No newline at end of file
diff --git a/charts/distributed-serving/Chart.yaml b/charts/distributed-serving/Chart.yaml
new file mode 100644
index 000000000..b4ea74551
--- /dev/null
+++ b/charts/distributed-serving/Chart.yaml
@@ -0,0 +1,5 @@
+apiVersion: v1
+appVersion: "1.0"
+description: A Helm chart for distributed-serving
+name: distributed-serving
+version: 0.1.0
\ No newline at end of file
diff --git a/charts/distributed-serving/templates/_helpers.tpl b/charts/distributed-serving/templates/_helpers.tpl
new file mode 100644
index 000000000..a682efd60
--- /dev/null
+++ b/charts/distributed-serving/templates/_helpers.tpl
@@ -0,0 +1,32 @@
+{{/* vim: set filetype=mustache: */}}
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "distributed-serving.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If the release name contains the chart name, it will be used as the full name.
+*/}}
+{{- define "distributed-serving.fullname" -}}
+{{- if .Values.fullnameOverride -}}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
+{{- else -}}
+{{- $name := default .Chart.Name .Values.nameOverride -}}
+{{- if contains $name .Release.Name -}}
+{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
+{{- else -}}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+{{- end -}}
+{{- end -}}
+
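+{{/*
+Example (illustrative, not part of the rendered output): release "my-release"
+with this chart yields the fullname "my-release-distributed-serving"; anything
+longer than 63 characters is truncated and a trailing "-" is trimmed.
+*/}}
+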
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "distributed-serving.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
diff --git a/charts/distributed-serving/templates/configmap.yaml b/charts/distributed-serving/templates/configmap.yaml
new file mode 100644
index 000000000..bf2837b48
--- /dev/null
+++ b/charts/distributed-serving/templates/configmap.yaml
@@ -0,0 +1,63 @@
+{{- $releaseName := .Release.Name }}
+{{- $namespace := .Release.Namespace }}
+{{- $workerNum := .Values.workers -}}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: {{ $releaseName }}-cm
+  labels:
+    app: {{ template "distributed-serving.name" $ }}
+    chart: {{ template "distributed-serving.chart" $ }}
+    release: {{ $releaseName }}
+    heritage: {{ .Release.Service }}
+    createdBy: "DistributedServing"
+data:
+  {{- range $replica := until (int .Values.replicas) }}
+  hostfile-{{ $replica }}: |-
+    {{ $releaseName }}.{{ $releaseName }}-{{ $replica }}.{{ $namespace }}
+    {{- range $i := until (int $workerNum) }}
+    {{ $releaseName }}.{{ $releaseName }}-{{ $replica }}-{{ $i }}.{{ $namespace }}
+    {{- end }}
+  {{- end }}
+  master.rayInit: |-
+    #!/bin/bash
+
+    ray_port=6379
+    ray_init_timeout=300
+    ray_cluster_size=$WORLD_SIZE
+    master_command=$1
+
+    ray start --head --port=$ray_port
+
+    for (( i=0; i < $ray_init_timeout; i+=5 )); do
+      active_nodes=`python3 -c 'import ray; ray.init(); print(sum(node["Alive"] for node in ray.nodes()))'`
+      if [ $active_nodes -eq $ray_cluster_size ]; then
+        echo "All ray workers are active and the ray cluster is initialized successfully."
+        $master_command
+        exit 0
+      fi
+      echo "Waiting for all ray workers to become active ($active_nodes/$ray_cluster_size active)."
+      sleep 5s;
+    done
+    echo "Timed out waiting for all ray workers to become active."
+    exit 1
+  worker.rayInit: |-
+    #!/bin/bash
+
+    ray_port=6379
+    ray_init_timeout=300
+    ray_address=$MASTER_ADDR
+    worker_command=$1
+
+    for (( i=0; i < $ray_init_timeout; i+=5 )); do
+      ray start --address=$ray_address:$ray_port
+      if [ $? -eq 0 ]; then
+        echo "Worker: Ray runtime started with head address $ray_address:$ray_port"
+        $worker_command
+        exit 0
+      fi
+      echo "Ray head at $ray_address:$ray_port is not reachable yet, retrying..."
+      sleep 5s;
+    done
+    echo "Timed out starting the ray worker; head address: $ray_address:$ray_port"
+    exit 1
\ No newline at end of file
diff --git a/charts/distributed-serving/templates/distributed.yaml b/charts/distributed-serving/templates/distributed.yaml
new file mode 100644
index 000000000..4600b145b
--- /dev/null
+++ b/charts/distributed-serving/templates/distributed.yaml
@@ -0,0 +1,546 @@
+{{- $podNum := add (int .Values.masters) (int .Values.workers) -}}
+{{- $masterGpuCount := .Values.masterGpus -}}
+{{- $workerGpuCount := .Values.workerGpus -}}
+{{- $masterGpuMemory := .Values.masterGPUMemory -}}
+{{- $workerGpuMemory := .Values.workerGPUMemory -}}
+{{- $masterGpuCore := .Values.masterGPUCore -}}
+{{- $workerGpuCore := .Values.workerGPUCore -}}
+{{- $dataDirs := .Values.dataDirs -}}
+apiVersion: leaderworkerset.x-k8s.io/v1
+kind: LeaderWorkerSet
+metadata:
+  name: {{ template "distributed-serving.fullname" . }}
+  labels:
+    heritage: {{ .Release.Service | quote }}
+    release: {{ .Release.Name | quote }}
+    chart: {{ template "distributed-serving.chart" . }}
+    app: {{ template "distributed-serving.name" .
}} + servingName: "{{ .Values.servingName }}" + servingType: "distributed-serving" + serviceName: "{{ .Values.servingName }}" + servingVersion: "{{ .Values.servingVersion }}" + {{- range $key, $value := .Values.labels }} + {{ $key }}: {{ $value | quote }} + {{- end }} + annotations: + "helm.sh/created": {{ now | unixEpoch | quote }} +spec: + replicas: {{ .Values.replicas }} + rolloutStrategy: + rollingUpdateConfiguration: + {{- if .Values.maxSurge }} + maxSurge: {{ .Values.maxSurge }} + {{- end }} + {{- if .Values.maxUnavailable }} + maxUnavailable: {{ .Values.maxUnavailable }} + {{- end }} + leaderWorkerTemplate: + size: {{ $podNum }} + restartPolicy: RecreateGroupOnPodRestart + leaderTemplate: + metadata: + annotations: + {{- if eq .Values.enableIstio true }} + sidecar.istio.io/inject: "true" + {{- end }} + {{- range $key, $value := .Values.annotations }} + {{ $key }}: {{ $value | quote }} + {{- end }} + labels: + heritage: {{ .Release.Service | quote }} + release: {{ .Release.Name | quote }} + chart: {{ template "distributed-serving.chart" . }} + app: {{ template "distributed-serving.name" . }} + serviceName: "{{ .Values.servingName }}" + servingType: "distributed-serving" + servingName: "{{ .Values.servingName }}" + servingVersion: "{{ .Values.servingVersion }}" + role: "master" + {{- range $key, $value := .Values.labels }} + {{ $key }}: {{ $value | quote }} + {{- end }} + spec: + {{- if ne (len .Values.nodeSelectors) 0 }} + nodeSelector: + {{- range $nodeKey,$nodeVal := .Values.nodeSelectors }} + {{ $nodeKey }}: "{{ $nodeVal }}" + {{- end }} + {{- end }} + {{- if .Values.schedulerName }} + schedulerName: {{ .Values.schedulerName }} + {{- end }} + {{- if ne (len .Values.tolerations) 0 }} + tolerations: + {{- range $tolerationKey := .Values.tolerations }} + - {{- if $tolerationKey.key }} + key: "{{ $tolerationKey.key }}" + {{- end }} + {{- if $tolerationKey.value }} + value: "{{ $tolerationKey.value }}" + {{- end }} + {{- if $tolerationKey.effect }} + effect: "{{ $tolerationKey.effect }}" + {{- end }} + {{- if $tolerationKey.operator }} + operator: "{{ $tolerationKey.operator }}" + {{- end }} + {{- end }} + {{- end }} + {{- if ne (len .Values.imagePullSecrets) 0 }} + imagePullSecrets: + {{- range $imagePullSecret := .Values.imagePullSecrets }} + - name: "{{ $imagePullSecret }}" + {{- end }} + {{- end }} + containers: + - name: distributed-serving-master + image: {{ .Values.image }} + {{- if .Values.imagePullPolicy }} + imagePullPolicy: "{{ .Values.imagePullPolicy }}" + {{- end }} + env: + {{- if .Values.envs }} + {{- range $key, $value := .Values.envs }} + - name: "{{ $key }}" + value: "{{ $value }}" + {{- end }} + {{- end }} + {{- if .Values.envsFromSecret }} + {{- range $envName, $secretName := .Values.envsFromSecret }} + - name: "{{ $envName }}" + valueFrom: + secretKeyRef: + key: "{{ $envName }}" + name: "{{ $secretName }}" + {{- end }} + {{- end }} + - name: MASTER_ADDR + value: $(LWS_LEADER_ADDRESS) + - name: WORLD_SIZE + valueFrom: + fieldRef: + fieldPath: metadata.annotations['leaderworkerset.sigs.k8s.io/size'] + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_INDEX + valueFrom: + fieldRef: + fieldPath: metadata.labels['leaderworkerset.sigs.k8s.io/worker-index'] + - name: GROUP_INDEX + valueFrom: + fieldRef: + fieldPath: metadata.labels['leaderworkerset.sigs.k8s.io/group-index'] + - name: GPU_COUNT + value: "{{ .Values.masterGpus }}" + - name: HOSTFILE + value: /etc/hostfile + - name: ROLE + value: master + {{- if and (eq (int $masterGpuCount) 
0) (eq (int $masterGpuMemory) 0) (eq (int $masterGpuCore) 0) }} + - name: NVIDIA_VISIBLE_DEVICES + value: void + {{- end }} + command: + {{- if eq .Values.initBackend "ray" }} + - "/etc/ray_init.sh" + {{- else }} + - {{ .Values.shell }} + - -c + {{- end }} + - {{ .Values.masterCommand }} + ports: + {{- if ne (int .Values.port) 0 }} + - containerPort: {{ .Values.port }} + name: grpc + protocol: TCP + {{- end }} + {{- if ne (int .Values.restApiPort) 0 }} + - containerPort: {{ .Values.restApiPort }} + name: restful + protocol: TCP + {{- end }} + {{- if ne (int .Values.metricsPort) 0 }} + - containerPort: {{ .Values.metricsPort }} + name: metrics + protocol: TCP + {{- end }} + {{- if .Values.livenessProbeAction }} + livenessProbe: + {{ .Values.livenessProbeAction }}: + {{- range $key := .Values.livenessProbeActionOption }} + {{ $key }} + {{- end }} + {{- range $key := .Values.livenessProbeOption }} + {{ $key }} + {{- end }} + {{- end }} + {{- if .Values.readinessProbeAction }} + readinessProbe: + {{ .Values.readinessProbeAction }}: + {{- range $key := .Values.readinessProbeActionOption }} + {{ $key }} + {{- end }} + {{- range $key := .Values.readinessProbeOption }} + {{ $key }} + {{- end }} + {{- end }} + {{- if .Values.startupProbeAction }} + startupProbe: + {{ .Values.startupProbeAction }}: + {{- range $key := .Values.startupProbeActionOption }} + {{ $key }} + {{- end }} + {{- range $key := .Values.startupProbeOption }} + {{ $key }} + {{- end }} + {{- end }} + resources: + limits: + {{- if .Values.masterCpus }} + cpu: {{ .Values.masterCpus }} + {{- end }} + {{- if .Values.masterMemory }} + memory: {{ .Values.masterMemory }} + {{- end }} + {{- if gt (int $masterGpuCount) 0}} + nvidia.com/gpu: {{ .Values.masterGpus }} + {{- end }} + {{- range $key, $value := .Values.devices }} + {{ $key }}: {{ $value }} + {{- end }} + {{- if gt (int $masterGpuMemory) 0}} + aliyun.com/gpu-mem: {{ .Values.masterGPUMemory }} + {{- end }} + {{- if gt (int $masterGpuCore) 0 }} + aliyun.com/gpu-core.percentage: {{ .Values.masterGPUCore }} + {{- end }} + volumeMounts: + {{- if .Values.shareMemory }} + - name: dshm + mountPath: /dev/shm + {{- end }} + {{- if .Values.modelDirs }} + {{- range $pvcName, $destPath := .Values.modelDirs}} + - name: "{{ $pvcName }}" + mountPath: "{{ $destPath }}" + {{- if hasKey $.Values.dataSubPathExprs $pvcName }} + subPathExpr: {{ get $.Values.dataSubPathExprs $pvcName }} + {{- end }} + {{- end }} + {{- end }} + {{- if .Values.tempDirs }} + {{- range $name, $destPath := .Values.tempDirs}} + - name: "{{ $name }}" + mountPath: "{{ $destPath }}" + {{- if hasKey $.Values.tempDirSubPathExprs $name }} + subPathExpr: {{ get $.Values.tempDirSubPathExprs $name }} + {{- end }} + {{- end }} + {{- end }} + {{- if ne (len .Values.configFiles) 0 }} + {{- $releaseName := .Release.Name }} + {{- range $containerPathKey,$configFileInfos := .Values.configFiles }} + {{- $visit := "false" }} + {{- range $cofigFileKey,$configFileInfo := $configFileInfos }} + {{- if eq "false" $visit }} + - name: {{ $containerPathKey }} + mountPath: {{ $configFileInfo.containerFilePath }} + {{- $visit = "true" }} + {{- end }} + {{- end }} + {{- end }} + {{- end }} + {{- if $dataDirs }} + {{- range $dataDirs }} + - name: {{ .name }} + mountPath: {{ .containerPath }} + {{- end }} + {{- end }} + - name: {{ $.Release.Name }}-cm + mountPath: /etc/hostfile + subPathExpr: hostfile-$(GROUP_INDEX) + {{- if eq .Values.initBackend "ray" }} + - name: {{ $.Release.Name }}-cm + mountPath: /etc/ray_init.sh + subPathExpr: ray_init.sh + {{- end 
}} + volumes: + - name: {{ $.Release.Name }}-cm + configMap: + name: {{ $.Release.Name }}-cm + items: + {{- range $i := until (int .Values.replicas) }} + - key: hostfile-{{ $i }} + path: hostfile-{{ $i }} + mode: 438 + {{- end }} + {{- if eq .Values.initBackend "ray" }} + - key: master.rayInit + path: ray_init.sh + mode: 365 + {{- end }} + {{- if .Values.shareMemory }} + - name: dshm + emptyDir: + medium: Memory + sizeLimit: {{ .Values.shareMemory }} + {{- end }} + {{- if .Values.modelDirs }} + {{- range $pvcName, $destPath := .Values.modelDirs}} + - name: "{{ $pvcName }}" + persistentVolumeClaim: + claimName: "{{ $pvcName }}" + {{- end }} + {{- end }} + {{- if .Values.tempDirs }} + {{- range $name, $destPath := .Values.tempDirs}} + - name: "{{ $name }}" + emptyDir: {} + {{- end }} + {{- end }} + {{- if ne (len .Values.configFiles) 0 }} + {{- $releaseName := .Release.Name }} + {{- range $containerPathKey,$configFileInfos := .Values.configFiles }} + - name: {{ $containerPathKey }} + configMap: + name: {{ $releaseName }}-{{ $containerPathKey }} + {{- end }} + {{- end }} + {{- if $dataDirs }} + {{- range $dataDirs }} + - name: {{ .name }} + hostPath: + path: {{ .hostPath }} + {{- end }} + {{- end }} + workerTemplate: + metadata: + annotations: + {{- if eq .Values.enableIstio true }} + sidecar.istio.io/inject: "true" + {{- end }} + {{- range $key, $value := .Values.annotations }} + {{ $key }}: {{ $value | quote }} + {{- end }} + labels: + heritage: {{ .Release.Service | quote }} + release: {{ .Release.Name | quote }} + chart: {{ template "distributed-serving.chart" . }} + app: {{ template "distributed-serving.name" . }} + serviceName: "{{ .Values.servingName }}" + servingType: "distributed-serving" + servingName: "{{ .Values.servingName }}" + servingVersion: "{{ .Values.servingVersion }}" + role: "worker" + spec: + {{- if ne (len .Values.nodeSelectors) 0 }} + nodeSelector: + {{- range $nodeKey,$nodeVal := .Values.nodeSelectors }} + {{ $nodeKey }}: "{{ $nodeVal }}" + {{- end }} + {{- end }} + {{- if .Values.schedulerName }} + schedulerName: {{ .Values.schedulerName }} + {{- end }} + {{- if ne (len .Values.tolerations) 0 }} + tolerations: + {{- range $tolerationKey := .Values.tolerations }} + - {{- if $tolerationKey.key }} + key: "{{ $tolerationKey.key }}" + {{- end }} + {{- if $tolerationKey.value }} + value: "{{ $tolerationKey.value }}" + {{- end }} + {{- if $tolerationKey.effect }} + effect: "{{ $tolerationKey.effect }}" + {{- end }} + {{- if $tolerationKey.operator }} + operator: "{{ $tolerationKey.operator }}" + {{- end }} + {{- end }} + {{- end }} + {{- if ne (len .Values.imagePullSecrets) 0 }} + imagePullSecrets: + {{- range $imagePullSecret := .Values.imagePullSecrets }} + - name: "{{ $imagePullSecret }}" + {{- end }} + {{- end }} + containers: + - name: distributed-serving-worker + image: {{ .Values.image }} + {{- if .Values.imagePullPolicy }} + imagePullPolicy: "{{ .Values.imagePullPolicy }}" + {{- end }} + env: + {{- if .Values.envs }} + {{- range $key, $value := .Values.envs }} + - name: "{{ $key }}" + value: "{{ $value }}" + {{- end }} + {{- end }} + {{- if .Values.envsFromSecret }} + {{- range $envName, $secretName := .Values.envsFromSecret }} + - name: "{{ $envName }}" + valueFrom: + secretKeyRef: + key: "{{ $envName }}" + name: "{{ $secretName }}" + {{- end }} + {{- end }} + - name: MASTER_ADDR + value: $(LWS_LEADER_ADDRESS) + - name: WORLD_SIZE + valueFrom: + fieldRef: + fieldPath: metadata.annotations['leaderworkerset.sigs.k8s.io/size'] + - name: POD_NAME + valueFrom: + 
fieldRef: + fieldPath: metadata.name + - name: POD_INDEX + valueFrom: + fieldRef: + fieldPath: metadata.labels['leaderworkerset.sigs.k8s.io/worker-index'] + - name: GROUP_INDEX + valueFrom: + fieldRef: + fieldPath: metadata.labels['leaderworkerset.sigs.k8s.io/group-index'] + - name: GPU_COUNT + value: "{{ .Values.workerGpus }}" + - name: HOSTFILE + value: /etc/hostfile + - name: ROLE + value: worker + {{- if and (eq (int $workerGpuCount) 0) (eq (int $workerGpuMemory) 0) (eq (int $workerGpuCore) 0) }} + - name: NVIDIA_VISIBLE_DEVICES + value: void + {{- end }} + command: + {{- if eq .Values.initBackend "ray" }} + - "/etc/ray_init.sh" + {{- else }} + - {{ .Values.shell }} + - -c + {{- end }} + - {{ .Values.workerCommand }} + resources: + limits: + {{- if .Values.workerCpus }} + cpu: {{ .Values.workerCpus }} + {{- end }} + {{- if .Values.workerMemory }} + memory: {{ .Values.workerMemory }} + {{- end }} + {{- if gt (int $workerGpuCount) 0}} + nvidia.com/gpu: {{ .Values.workerGpus }} + {{- end }} + {{- range $key, $value := .Values.devices }} + {{ $key }}: {{ $value }} + {{- end }} + {{- if gt (int $workerGpuMemory) 0}} + aliyun.com/gpu-mem: {{ .Values.workerGPUMemory }} + {{- end }} + {{- if gt (int $workerGpuCore) 0 }} + aliyun.com/gpu-core.percentage: {{ .Values.workerGPUCore }} + {{- end }} + volumeMounts: + {{- if .Values.shareMemory }} + - name: dshm + mountPath: /dev/shm + {{- end }} + {{- if .Values.modelDirs }} + {{- range $pvcName, $destPath := .Values.modelDirs}} + - name: "{{ $pvcName }}" + mountPath: "{{ $destPath }}" + {{- if hasKey $.Values.dataSubPathExprs $pvcName }} + subPathExpr: {{ get $.Values.dataSubPathExprs $pvcName }} + {{- end }} + {{- end }} + {{- end }} + {{- if .Values.tempDirs }} + {{- range $name, $destPath := .Values.tempDirs}} + - name: "{{ $name }}" + mountPath: "{{ $destPath }}" + {{- if hasKey $.Values.tempDirSubPathExprs $name }} + subPathExpr: {{ get $.Values.tempDirSubPathExprs $name }} + {{- end }} + {{- end }} + {{- end }} + {{- if ne (len .Values.configFiles) 0 }} + {{- $releaseName := .Release.Name }} + {{- range $containerPathKey,$configFileInfos := .Values.configFiles }} + {{- $visit := "false" }} + {{- range $cofigFileKey,$configFileInfo := $configFileInfos }} + {{- if eq "false" $visit }} + - name: {{ $containerPathKey }} + mountPath: {{ $configFileInfo.containerFilePath }} + {{- $visit = "true" }} + {{- end }} + {{- end }} + {{- end }} + {{- end }} + {{- if $dataDirs }} + {{- range $dataDirs }} + - name: {{ .name }} + mountPath: {{ .containerPath }} + {{- end }} + {{- end }} + - name: {{ $.Release.Name }}-cm + mountPath: /etc/hostfile + subPathExpr: hostfile-$(GROUP_INDEX) + {{- if eq .Values.initBackend "ray" }} + - name: {{ $.Release.Name }}-cm + mountPath: /etc/ray_init.sh + subPathExpr: ray_init.sh + {{- end }} + volumes: + - name: {{ $.Release.Name }}-cm + configMap: + name: {{ $.Release.Name }}-cm + items: + {{- range $i := until (int .Values.replicas) }} + - key: hostfile-{{ $i }} + path: hostfile-{{ $i }} + mode: 438 + {{- end }} + {{- if eq .Values.initBackend "ray" }} + - key: worker.rayInit + path: ray_init.sh + mode: 365 + {{- end }} + {{- if .Values.shareMemory }} + - name: dshm + emptyDir: + medium: Memory + sizeLimit: {{ .Values.shareMemory }} + {{- end }} + {{- if .Values.modelDirs }} + {{- range $pvcName, $destPath := .Values.modelDirs}} + - name: "{{ $pvcName }}" + persistentVolumeClaim: + claimName: "{{ $pvcName }}" + {{- end }} + {{- end }} + {{- if .Values.tempDirs }} + {{- range $name, $destPath := .Values.tempDirs}} + - 
name: "{{ $name }}" + emptyDir: {} + {{- end }} + {{- end }} + {{- if ne (len .Values.configFiles) 0 }} + {{- $releaseName := .Release.Name }} + {{- range $containerPathKey,$configFileInfos := .Values.configFiles }} + - name: {{ $containerPathKey }} + configMap: + name: {{ $releaseName }}-{{ $containerPathKey }} + {{- end }} + {{- end }} + {{- if $dataDirs }} + {{- range $dataDirs }} + - name: {{ .name }} + hostPath: + path: {{ .hostPath }} + {{- end }} + {{- end }} \ No newline at end of file diff --git a/charts/distributed-serving/templates/service.yaml b/charts/distributed-serving/templates/service.yaml new file mode 100644 index 000000000..583cb365f --- /dev/null +++ b/charts/distributed-serving/templates/service.yaml @@ -0,0 +1,39 @@ +{{- if .Values.servingVersion }} +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.servingName }}-{{ .Values.servingVersion }} + labels: + heritage: {{ .Release.Service | quote }} + release: {{ .Release.Name | quote }} + chart: {{ template "distributed-serving.chart" . }} + app: {{ template "distributed-serving.name" . }} + servingName: {{ .Values.servingName }} + servingVersion: "{{ .Values.servingVersion }}" + servingType: "distributed-serving" + {{- range $key, $value := .Values.labels }} + {{ $key }}: {{ $value | quote }} + {{- end }} +spec: + type: {{ .Values.serviceType }} + ports: + {{- if ne (int .Values.port) 0 }} + - name: grpc-serving + port: {{ .Values.port }} + targetPort: {{ .Values.port }} + {{- end }} + {{- if ne (int .Values.restApiPort) 0 }} + - name: http-serving + port: {{ .Values.restApiPort }} + targetPort: {{ .Values.restApiPort }} + {{- end }} + {{- if ne (int .Values.metricsPort) 0 }} + - name: http-metrics + port: {{ .Values.metricsPort }} + targetPort: {{ .Values.metricsPort }} + {{- end }} + selector: + app: {{ template "distributed-serving.name" . }} + release: {{ .Release.Name | quote }} + role: master +{{- end }} diff --git a/charts/distributed-serving/values.yaml b/charts/distributed-serving/values.yaml new file mode 100644 index 000000000..f76ed9ba5 --- /dev/null +++ b/charts/distributed-serving/values.yaml @@ -0,0 +1,32 @@ +# Default values for distributed-serving. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +serviceType: ClusterIP + +## serving name and version +# servingName: +# servingVersion: + +## expose the service to the grpc client +port: 8500 +restApiPort: 8501 +metricsPort: 0 +replicas: 1 + +# device resources +#devices: amd.com/gpu=1 + +# repository: "cheyang/tf-model-server-gpu" +image: "tensorflow/serving:latest" + +imagePullPolicy: "IfNotPresent" + +nodeSelector: {} + +tolerations: [] + +affinity: {} + +dataSubPathExprs: + a: b \ No newline at end of file diff --git a/docs/cli/arena_serve_distributed.md b/docs/cli/arena_serve_distributed.md new file mode 100644 index 000000000..a72e42215 --- /dev/null +++ b/docs/cli/arena_serve_distributed.md @@ -0,0 +1,87 @@ +## arena serve distributed + +Submit distributed server job to deploy and serve machine learning models. + +### Synopsis + +Submit distributed server job to deploy and serve machine learning models. 
+
+```
+  arena serve distributed [flags]
+```
+
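+### Examples
+
+A minimal sketch submitting a job with one master and one worker; the image and the two command scripts are placeholders, see the [distributed serving guide](../serving/distributedserving/serving.md) for a complete, working example:
+
+```
+arena serve distributed \
+    --name=demo \
+    --image=vllm/vllm-openai:latest \
+    --masters=1 --master-gpus=1 \
+    --master-command="sh /workspace/start-master.sh" \
+    --workers=1 --worker-gpus=1 \
+    --worker-command="sh /workspace/start-worker.sh"
+```
+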
+### Options
+
+```
+  -a, --annotation stringArray                     specify the annotations, usage: "--annotation=key=value" or "--annotation key=value"
+      --command string                             specify the container command
+      --config-file stringArray                    giving configuration files when serving model, usage:"--config-file <host path file>:<container path file>"
+  -d, --data stringArray                           specify the trained models datasource to mount for serving, like <name of pvc>:<container path>
+      --data-dir stringArray                       specify the trained models datasource on host to mount for serving, like <host path>:<container path>
+      --data-subpath-expr stringArray              specify the datasource subpath to mount to the job by expression, like <name of pvc>:<subpath expression>
+      --device stringArray                         the chip vendors and count used for resources, such as amd.com/gpu=1 gpu.intel.com/i915=1.
+      --enable-istio                               enable Istio for serving or not (disable Istio by default)
+  -e, --env stringArray                            the environment variables, usage: "--env envName=envValue"
+      --env-from-secret stringArray                the environment variables using Secret data, usage: "--env-from-secret envName=secretName"
+      --expose-service                             expose service using Istio gateway for external access or not (not expose by default)
+  -h, --help                                       help for distributed
+      --image string                               the docker image name of serving job
+      --image-pull-policy string                   the policy to pull the image, and the default policy is IfNotPresent (default "IfNotPresent")
+      --image-pull-secret stringArray              giving names of imagePullSecret when you want to use a private registry, usage:"--image-pull-secret <secret name>"
+      --init-backend string                        specify the init backend for the distributed serving job, currently only ray is supported. support: ray
+  -l, --label stringArray                          specify the labels
+      --liveness-probe-action string               the liveness probe action, support httpGet,exec,grpc,tcpSocket
+      --liveness-probe-action-option stringArray   the liveness probe action option, usage: --liveness-probe-action-option="path: /healthz" or --liveness-probe-action-option="command=cat /tmp/healthy"
+      --liveness-probe-option stringArray          the liveness probe option, usage: --liveness-probe-option="initialDelaySeconds: 3" or --liveness-probe-option="periodSeconds: 3"
+      --master-command string                      the command to run for the master pod
+      --master-cpu string                          the cpu resource to use for the master pod, like 1 for 1 core
+      --master-gpucore int                         the limit GPU core of master pod to run the serve
+      --master-gpumemory int                       the limit GPU memory of master pod to run the serve
+      --master-gpus int                            the gpu resource to use for the master pod, like 1 for 1 gpu
+      --master-memory string                       the memory resource to use for the master pod, like 1Gi
+      --masters int                                the number of the master pods (p.s. only support 1 master currently) (default 1)
+      --max-surge string                           the maximum number of pods that can be created over the desired number of pods
+      --max-unavailable string                     the maximum number of Pods that can be unavailable during the update process
+      --metrics-port int                           the port of metrics; the default is 0, which means no service is created listening on this port
+      --model-name string                          model name
+      --model-version string                       model version
+      --name string                                the serving name
+      --port int                                   the gRPC listening port; the default is 0, which means no service is created listening on this port
+      --readiness-probe-action string              the readiness probe action, support httpGet,exec,grpc,tcpSocket
+      --readiness-probe-action-option stringArray  the readiness probe action option, usage: --readiness-probe-action-option="path: /healthz" or --readiness-probe-action-option="command=cat /tmp/healthy"
+      --readiness-probe-option stringArray         the readiness probe option, usage: --readiness-probe-option="initialDelaySeconds: 3" or --readiness-probe-option="periodSeconds: 3"
+      --replicas int                               the replicas number of the serve job. (default 1)
+      --restful-port int                           the RESTful listening port; the default is 0, which means no service is created listening on this port
+      --selector stringArray                       assign jobs to particular k8s nodes, usage: "--selector=key=value" or "--selector key=value"
+      --share-memory string                        the share memory request of each replica to run the serve.
+      --shell string                               specify the linux shell, usage: bash or sh (default "sh")
+      --startup-probe-action string                the startup probe action, support httpGet,exec,grpc,tcpSocket
+      --startup-probe-action-option stringArray    the startup probe action option, usage: --startup-probe-action-option="path: /healthz" or --startup-probe-action-option="command=cat /tmp/healthy"
+      --startup-probe-option stringArray           the startup probe option, usage: --startup-probe-option="initialDelaySeconds: 3" or --startup-probe-option="periodSeconds: 3"
+      --temp-dir stringArray                       specify the deployment empty dir, like <name>:<container path>
+      --temp-dir-subpath-expr stringArray          specify the datasource subpath to mount to the pod by expression, like <name>:<subpath expression>
+      --toleration stringArray                     tolerate some k8s nodes with taints, usage: "--toleration key=value:effect,operator" or "--toleration all"
+      --version string                             the serving version
+      --worker-command string                      the command to run on each worker pod
+      --worker-cpu string                          the cpu resource to use for each worker pod, like 1 for 1 core
+      --worker-gpucore int                         the limit GPU core of each worker pod to run the serve
+      --worker-gpumemory int                       the limit GPU memory of each worker pod to run the serve
+      --worker-gpus int                            the gpu resource to use for each worker pod, like 1 for 1 gpu
+      --worker-memory string                       the memory resource to use for each worker pod, like 1Gi
+      --workers int                                the number of the worker pods
+```
+
+### Options inherited from parent commands
+
+```
+      --arena-namespace string   The namespace of arena system service, like tf-operator (default "arena-system")
+      --config string            Path to a kube config. Only required if out-of-cluster
+      --loglevel string          Set the logging level. One of: debug|info|warn|error (default "info")
+  -n, --namespace string         the namespace of the job
+      --pprof                    enable cpu profile
+      --trace                    enable trace
+```
+
+### SEE ALSO
+
+* [arena serve](arena_serve.md)	 - Serve a job.
\ No newline at end of file
diff --git a/docs/serving/distributedserving/serving.md b/docs/serving/distributedserving/serving.md
new file mode 100644
index 000000000..16ac7fdec
--- /dev/null
+++ b/docs/serving/distributedserving/serving.md
@@ -0,0 +1,100 @@
+# Submit a distributed serving job
+
+This guide walks through the steps to deploy and serve a model on two nodes, each with one GPU. To illustrate usage, we use the [Qwen2-1.5B](https://modelscope.cn/models/Qwen/Qwen2-1.5B) model downloaded from ModelScope and deploy it with vLLM, setting pipeline-parallel-size to 2.
+
+## Prerequisites
+
+- Install the LeaderWorkerSet API in your k8s cluster following this [guide](https://github.com/kubernetes-sigs/lws/blob/main/docs/setup/install.md) (required)
+- Create a pvc named `test-pvc` containing the models to deploy
+
+## Steps
+
+1\. Submit the vllm distributed serving job with:
+
+    $ arena serve distributed \
+        --name=vllm \
+        --version=alpha \
+        --restful-port=5000 \
+        --image=vllm/vllm-openai:latest \
+        --data=test-pvc:/mnt/models \
+        --masters=1 \
+        --master-gpus=1 \
+        --master-command="ray start --head --port=6379; vllm serve /mnt/models/Qwen2-1.5B \
+        --port 5000 \
+        --dtype half \
+        --pipeline-parallel-size 2" \
+        --workers=1 \
+        --worker-gpus=1 \
+        --worker-command="ray start --address=\$(MASTER_ADDR):6379 --block" \
+        --share-memory=4Gi \
+        --startup-probe-action=httpGet \
+        --startup-probe-action-option="path: /health" \
+        --startup-probe-action-option="port: 5000" \
+        --startup-probe-option="periodSeconds: 60" \
+        --startup-probe-option="failureThreshold: 5"
+    configmap/vllm-alpha-cm created
+    service/vllm-alpha created
+    leaderworkerset.leaderworkerset.x-k8s.io/vllm-alpha-distributed-serving created
+    INFO[0002] The Job vllm has been submitted successfully
+    INFO[0002] You can run `arena serve get vllm --type distributed-serving -n default` to check the job status
+
+In this example, we use `MASTER_ADDR` in the worker command to get the address of the master pod. Arena automatically injects this environment variable into all pods when it creates the job. Besides it, Arena creates several other environment variables to help users deploy distributed serving jobs:
+
+- `WORLD_SIZE`: The number of pods in one replica, equal to `#masters + #workers`.
+- `POD_NAME`: The name of the current pod.
+- `POD_INDEX`: The index of the current pod in the replica, starting from 0; index 0 is always the master pod.
+- `GROUP_INDEX`: The index of the current replica, starting from 0.
+- `HOSTFILE`: The hostfile path of the current replica, which contains the hostnames of all pods in the replica.
+- `GPU_COUNT`: The number of GPUs of the current pod.
+- `ROLE`: The role of the current pod, either master or worker.
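+
+For example, a single launch script can branch on `ROLE` instead of hard-coding separate master and worker logic. The sketch below is illustrative only (the script name and the vllm/ray flags are assumptions, not generated by Arena):
+
+    #!/bin/bash
+    # launch.sh - passed as both --master-command and --worker-command;
+    # Arena injects ROLE and MASTER_ADDR into every pod of the replica.
+    if [ "$ROLE" = "master" ]; then
+        ray start --head --port=6379
+        vllm serve /mnt/models/Qwen2-1.5B --port 5000 --pipeline-parallel-size 2
+    else
+        # Workers only need to join the ray cluster started by the master.
+        ray start --address="$MASTER_ADDR:6379" --block
+    fi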
+
+!!! note
+
+    To run a distributed serving job, you need to specify:
+
+    - `--workers`: The number of workers (default is 0).
+    - `--master-gpus`: GPUs of the master (default is 0).
+    - `--worker-gpus`: GPUs of each worker (default is 0).
+    - `--master-command`: The command to run on the master (required).
+    - `--worker-command`: The command to run on each worker (required).
+
+    If you do not explicitly specify the commands for the master and worker but instead run the command in the following format:
+
+        $ arena serve distributed --name=test ... "command"
+
+    arena will automatically run this command on both the master and the worker.
+
+!!! warning
+    To avoid ambiguity, the distributed serving job exposes the `--masters` parameter to the user, but arena does not currently support changing the number of masters: one replica always has exactly one master pod.
+
+2\. List the serving job you just submitted
+
+    $ arena serve list --type=distributed
+    NAME  TYPE         VERSION  DESIRED  AVAILABLE  ADDRESS      PORTS         GPU
+    vllm  Distributed  alpha    1        1          172.21.5.50  RESTFUL:5000  2
+
+3\. Test the model service
+
+    $ kubectl get svc | grep vllm
+    vllm-alpha                       ClusterIP   172.21.13.60   <none>   5000/TCP   3m24s
+    vllm-alpha-distributed-serving   ClusterIP   None           <none>   <none>     3m24s
+
+    $ kubectl port-forward svc/vllm-alpha 5000:5000
+    Forwarding from 127.0.0.1:5000 -> 5000
+    Forwarding from [::1]:5000 -> 5000
+
+    # check model service
+    $ curl -X POST http://localhost:5000/v1/completions \
+        -H "Content-Type: application/json" \
+        -d '{
+              "model": "/mnt/models/Qwen2-1.5B",
+              "prompt": "Please count from 1 to 10: 1, 2",
+              "max_tokens": 32
+            }'
+    {"id":"cmpl-9f6c8d3be9ae476ca0cc3e393ec370a6","object":"text_completion","created":1730190958,"model":"/mnt/models/Qwen2-1.5B","choices":[{"index":0,"text":", 3, 4, 5, 6, 7, 8, 9, 10. These observations boiling point. According","logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":15,"total_tokens":47,"completion_tokens":32}}
+
+4\. Delete the inference service
+
+    $ arena serve delete vllm
+    INFO[0001] The serving job vllm with version alpha has been deleted successfully
\ No newline at end of file
diff --git a/docs/serving/index.md b/docs/serving/index.md
index 07b9ed83d..e7882e378 100644
--- a/docs/serving/index.md
+++ b/docs/serving/index.md
@@ -45,3 +45,6 @@ If you want to use arena to manage serving jobs, this guide is for you. we have
 * I want to [submit a kserve job with supported serving runtime](kserve/sklearn.md)
 * I want to [submit a kserve job with custom serving runtime](kserve/custom.md)
+
+## Distributed Serving Job Guide
+* I want to [submit a distributed serving job](distributedserving/serving.md).
\ No newline at end of file diff --git a/go.mod b/go.mod index 9315c316a..54736ed23 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( k8s.io/client-go v0.29.6 k8s.io/kubectl v0.26.4 sigs.k8s.io/controller-runtime v0.17.5 + sigs.k8s.io/lws v0.3.0 ) require ( diff --git a/go.sum b/go.sum index 6c3243d24..766173347 100644 --- a/go.sum +++ b/go.sum @@ -220,8 +220,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= -github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -505,6 +505,8 @@ sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 h1:XX3Ajgzov2RKU sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3/go.mod h1:9n16EZKMhXBNSiUC5kSdFQJkdH3zbxS/JoO619G1VAY= sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 h1:W6cLQc5pnqM7vh3b7HvGNfXrJ/xL6BDMS0v1V/HHg5U= sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3/go.mod h1:JWP1Fj0VWGHyw3YUPjXSQnRnrwezrZSrApfX5S0nIag= +sigs.k8s.io/lws v0.3.0 h1:PtjiDHZWCxAeMyrsmPNN0i7KAVf6ocVEQFcojPWeA+k= +sigs.k8s.io/lws v0.3.0/go.mod h1:/R1Q2LB2eg6t9mX5M6V4HLkeucxBFgOyaKkSGh/FGAY= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/pkg/apis/arenaclient/serving_client.go b/pkg/apis/arenaclient/serving_client.go index fee6a5cbc..09f4e83c9 100644 --- a/pkg/apis/arenaclient/serving_client.go +++ b/pkg/apis/arenaclient/serving_client.go @@ -73,6 +73,9 @@ func (t *ServingJobClient) Submit(job *apiserving.Job) error { case types.TritonServingJob: args := job.Args().(*types.TritonServingArgs) return serving.SubmitTritonServingJob(args.Namespace, args) + case types.DistributedServingJob: + args := job.Args().(*types.DistributedServingArgs) + return serving.SubmitDistributedServingJob(args.Namespace, args) } return nil } @@ -192,6 +195,9 @@ func (t *ServingJobClient) Update(job *apiserving.Job) error { case types.KServeJob: args := job.Args().(*types.UpdateKServeArgs) return serving.UpdateKServe(args) + case types.DistributedServingJob: + args := job.Args().(*types.UpdateDistributedServingArgs) + return serving.UpdateDistributedServing(args) } return nil } diff --git a/pkg/apis/serving/distributed_builder.go b/pkg/apis/serving/distributed_builder.go new file mode 100644 index 000000000..bdc6555dc --- /dev/null +++ b/pkg/apis/serving/distributed_builder.go @@ -0,0 +1,445 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package serving + +import ( + "fmt" + "strings" + + "github.com/kubeflow/arena/pkg/apis/types" + "github.com/kubeflow/arena/pkg/argsbuilder" +) + +type DistributedServingJobBuilder struct { + args *types.DistributedServingArgs + argValues map[string]interface{} + argsbuilder.ArgsBuilder +} + +func NewDistributedServingJobBuilder() *DistributedServingJobBuilder { + args := &types.DistributedServingArgs{ + CustomServingArgs: types.CustomServingArgs{ + CommonServingArgs: types.CommonServingArgs{ + ImagePullPolicy: "IfNotPresent", + Replicas: 1, + Shell: "sh", + Namespace: "default", + }, + }, + } + return &DistributedServingJobBuilder{ + args: args, + argValues: map[string]interface{}{}, + ArgsBuilder: argsbuilder.NewDistributedServingArgsBuilder(args), + } +} + +// Name is used to set job name,match option --name +func (b *DistributedServingJobBuilder) Name(name string) *DistributedServingJobBuilder { + if name != "" { + b.args.Name = name + } + return b +} + +// Namespace is used to set job namespace,match option --namespace +func (b *DistributedServingJobBuilder) Namespace(namespace string) *DistributedServingJobBuilder { + if namespace != "" { + b.args.Namespace = namespace + } + return b +} + +// Shell is used to set bash or sh +func (b *DistributedServingJobBuilder) Shell(shell string) *DistributedServingJobBuilder { + if shell != "" { + b.args.Shell = shell + } + return b +} + +// Command is used to set job command +func (b *DistributedServingJobBuilder) Command(args []string) *DistributedServingJobBuilder { + if b.args.Command == "" { + b.args.Command = strings.Join(args, " ") + } + return b +} + +// GPUCount is used to set count of gpu for the job,match the option --gpus +func (b *DistributedServingJobBuilder) GPUCount(count int) *DistributedServingJobBuilder { + if count > 0 { + b.args.GPUCount = count + } + return b +} + +// GPUMemory is used to set gpu memory for the job,match the option --gpumemory +func (b *DistributedServingJobBuilder) GPUMemory(memory int) *DistributedServingJobBuilder { + if memory > 0 { + b.args.GPUMemory = memory + } + return b +} + +// GPUCore is used to set gpu core for the job, match the option --gpucore +func (b *DistributedServingJobBuilder) GPUCore(core int) *DistributedServingJobBuilder { + if core > 0 { + b.args.GPUCore = core + } + return b +} + +// Image is used to set job image,match the option --image +func (b *DistributedServingJobBuilder) Image(image string) *DistributedServingJobBuilder { + if image != "" { + b.args.Image = image + } + return b +} + +// ImagePullPolicy is used to set image pull policy,match the option --image-pull-policy +func (b *DistributedServingJobBuilder) ImagePullPolicy(policy string) *DistributedServingJobBuilder { + if policy != "" { + b.args.ImagePullPolicy = policy + } + return b +} + +// CPU assign cpu limits,match the option --cpu +func (b *DistributedServingJobBuilder) CPU(cpu string) *DistributedServingJobBuilder { + if cpu != "" { + b.args.Cpu = cpu + } + return b +} + +// Memory assign memory limits,match 
option --memory +func (b *DistributedServingJobBuilder) Memory(memory string) *DistributedServingJobBuilder { + if memory != "" { + b.args.Memory = memory + } + return b +} + +// Envs is used to set env of job containers,match option --env +func (b *DistributedServingJobBuilder) Envs(envs map[string]string) *DistributedServingJobBuilder { + if len(envs) != 0 { + envSlice := []string{} + for key, value := range envs { + envSlice = append(envSlice, fmt.Sprintf("%v=%v", key, value)) + } + b.argValues["env"] = &envSlice + } + return b +} + +// EnvsFromSecret is used to set env of job containers,match option --env-from-secret +func (b *DistributedServingJobBuilder) EnvsFromSecret(envs map[string]string) *DistributedServingJobBuilder { + if len(envs) != 0 { + envSlice := []string{} + for key, value := range envs { + envSlice = append(envSlice, fmt.Sprintf("%v=%v", key, value)) + } + b.argValues["env-from-secret"] = &envSlice + } + return b +} + +// Replicas is used to set serving job replicas,match the option --replicas +func (b *DistributedServingJobBuilder) Replicas(count int) *DistributedServingJobBuilder { + if count > 0 { + b.args.Replicas = count + } + return b +} + +// EnableIstio is used to enable istio,match the option --enable-istio +func (b *DistributedServingJobBuilder) EnableIstio() *DistributedServingJobBuilder { + b.args.EnableIstio = true + return b +} + +// ExposeService is used to expose service,match the option --expose-service +func (b *DistributedServingJobBuilder) ExposeService() *DistributedServingJobBuilder { + b.args.ExposeService = true + return b +} + +// Version is used to set serving job version,match the option --version +func (b *DistributedServingJobBuilder) Version(version string) *DistributedServingJobBuilder { + if version != "" { + b.args.Version = version + } + return b +} + +// Tolerations is used to set tolerations for tolerate nodes,match option --toleration +func (b *DistributedServingJobBuilder) Tolerations(tolerations []string) *DistributedServingJobBuilder { + b.argValues["toleration"] = &tolerations + return b +} + +// NodeSelectors is used to set node selectors for scheduling job,match option --selector +func (b *DistributedServingJobBuilder) NodeSelectors(selectors map[string]string) *DistributedServingJobBuilder { + if len(selectors) != 0 { + selectorsSlice := []string{} + for key, value := range selectors { + selectorsSlice = append(selectorsSlice, fmt.Sprintf("%v=%v", key, value)) + } + b.argValues["selector"] = &selectorsSlice + } + return b +} + +// Annotations is used to add annotations for job pods,match option --annotation +func (b *DistributedServingJobBuilder) Annotations(annotations map[string]string) *DistributedServingJobBuilder { + if len(annotations) != 0 { + s := []string{} + for key, value := range annotations { + s = append(s, fmt.Sprintf("%v=%v", key, value)) + } + b.argValues["annotation"] = &s + } + return b +} + +// Labels is used to add labels for job +func (b *DistributedServingJobBuilder) Labels(labels map[string]string) *DistributedServingJobBuilder { + if len(labels) != 0 { + s := []string{} + for key, value := range labels { + s = append(s, fmt.Sprintf("%v=%v", key, value)) + } + b.argValues["label"] = &s + } + return b +} + +// Datas is used to mount k8s pvc to job pods,match option --data +func (b *DistributedServingJobBuilder) Datas(volumes map[string]string) *DistributedServingJobBuilder { + if len(volumes) != 0 { + s := []string{} + for key, value := range volumes { + s = append(s, fmt.Sprintf("%v:%v", key, value)) + 
} + b.argValues["data"] = &s + } + return b +} + +// DataSubPathExprs is used to mount k8s pvc subpath to job pods,match option data-subpath-expr +func (b *DistributedServingJobBuilder) DataSubPathExprs(exprs map[string]string) *DistributedServingJobBuilder { + if len(exprs) != 0 { + s := []string{} + for key, value := range exprs { + s = append(s, fmt.Sprintf("%v:%v", key, value)) + } + b.argValues["data-subpath-expr"] = &s + } + return b +} + +func (b *DistributedServingJobBuilder) TempDirs(volumes map[string]string) *DistributedServingJobBuilder { + if len(volumes) != 0 { + s := []string{} + for key, value := range volumes { + s = append(s, fmt.Sprintf("%v:%v", key, value)) + } + b.argValues["temp-dir"] = &s + } + return b +} + +func (b *DistributedServingJobBuilder) EmptyDirSubPathExprs(exprs map[string]string) *DistributedServingJobBuilder { + if len(exprs) != 0 { + s := []string{} + for key, value := range exprs { + s = append(s, fmt.Sprintf("%v:%v", key, value)) + } + b.argValues["temp-dir-subpath-expr"] = &s + } + return b +} + +// DataDirs is used to mount host files to job containers,match option --data-dir +func (b *DistributedServingJobBuilder) DataDirs(volumes map[string]string) *DistributedServingJobBuilder { + if len(volumes) != 0 { + s := []string{} + for key, value := range volumes { + s = append(s, fmt.Sprintf("%v:%v", key, value)) + } + b.argValues["data-dir"] = &s + } + return b +} + +// Port is used to set port,match the option --port +func (b *DistributedServingJobBuilder) Port(port int) *DistributedServingJobBuilder { + if port > 0 { + b.args.Port = port + } + return b +} + +// RestfulPort is used to set restful port,match the option --restful-port +func (b *DistributedServingJobBuilder) RestfulPort(port int) *DistributedServingJobBuilder { + if port > 0 { + b.args.RestfulPort = port + } + return b +} + +// MetricsPort is used to set metrics port,match the option --metrics-port +func (b *DistributedServingJobBuilder) MetricsPort(port int) *DistributedServingJobBuilder { + if port > 0 { + b.args.MetricsPort = port + } + return b +} + +// Masters is used to set master pods number,match the option --masters +func (b *DistributedServingJobBuilder) Masters(masters int) *DistributedServingJobBuilder { + if masters > 0 { + b.args.Masters = masters + } + return b +} + +// Workers is used to set worker pods number,match the option --workers +func (b *DistributedServingJobBuilder) Workers(workers int) *DistributedServingJobBuilder { + if workers > 0 { + b.args.Workers = workers + } + return b +} + +// MasterCpu is used to set master pods cpu,match the option --master-cpu +func (b *DistributedServingJobBuilder) MasterCpu(cpu string) *DistributedServingJobBuilder { + if cpu != "" { + b.args.MasterCpu = cpu + } + return b +} + +// WorkerCpu is used to set worker pods cpu,match the option --worker-cpu +func (b *DistributedServingJobBuilder) WorkerCpu(cpu string) *DistributedServingJobBuilder { + if cpu != "" { + b.args.WorkerCpu = cpu + } + return b +} + +// MasterGpus is used to set master pods gpus,match the option --master-gpus +func (b *DistributedServingJobBuilder) MasterGpus(gpus int) *DistributedServingJobBuilder { + if gpus > 0 { + b.args.MasterGPUCount = gpus + } + return b +} + +// WorkerGpus is used to set worker pods gpus,match the option --worker-gpus +func (b *DistributedServingJobBuilder) WorkerGpus(gpus int) *DistributedServingJobBuilder { + if gpus > 0 { + b.args.WorkerGPUCount = gpus + } + return b +} + +// MasterMemory is used to set master pods memory,match the 
option --master-memory +func (b *DistributedServingJobBuilder) MasterMemory(memory string) *DistributedServingJobBuilder { + if memory != "" { + b.args.MasterMemory = memory + } + return b +} + +// WorkerMemory is used to set worker pods memory,match the option --worker-memory +func (b *DistributedServingJobBuilder) WorkerMemory(memory string) *DistributedServingJobBuilder { + if memory != "" { + b.args.WorkerMemory = memory + } + return b +} + +// MasterGPUMemory is used to set master pods memory,match the option --master-gpumemory +func (b *DistributedServingJobBuilder) MasterGPUMemory(gpuMemory int) *DistributedServingJobBuilder { + if gpuMemory > 0 { + b.args.MasterGPUMemory = gpuMemory + } + return b +} + +// WorkerGPUMemory is used to set worker pods memory,match the option --worker-gpumemory +func (b *DistributedServingJobBuilder) WorkerGPUMemory(gpuMemory int) *DistributedServingJobBuilder { + if gpuMemory > 0 { + b.args.WorkerGPUMemory = gpuMemory + } + return b +} + +// MasterGPUCore is used to set master pods gpucore,match the option --master-gpucore +func (b *DistributedServingJobBuilder) MasterGPUCore(gpucore int) *DistributedServingJobBuilder { + if gpucore > 0 { + b.args.MasterGPUCore = gpucore + } + return b +} + +// WorkerGPUCore is used to set worker pods gpucore,match the option --worker-gpucore +func (b *DistributedServingJobBuilder) WorkerGPUCore(gpucore int) *DistributedServingJobBuilder { + if gpucore > 0 { + b.args.WorkerGPUCore = gpucore + } + return b +} + +// MasterCommand is used to set master pods command,match the option --master-command +func (b *DistributedServingJobBuilder) MasterCommand(command string) *DistributedServingJobBuilder { + if command != "" { + b.args.MasterCommand = command + } + return b +} + +// WorkerCommand is used to set worker pods command,match the option --worker-command +func (b *DistributedServingJobBuilder) WorkerCommand(command string) *DistributedServingJobBuilder { + if command != "" { + b.args.WorkerCommand = command + } + return b +} + +// InitBackend is used to set init backend,match the option --init-backend +func (b *DistributedServingJobBuilder) InitBackend(backend string) *DistributedServingJobBuilder { + if backend != "" { + b.args.InitBackend = backend + } + return b +} + +// Build is used to build the job +func (b *DistributedServingJobBuilder) Build() (*Job, error) { + for key, value := range b.argValues { + b.AddArgValue(key, value) + } + if err := b.PreBuild(); err != nil { + return nil, err + } + if err := b.ArgsBuilder.Build(); err != nil { + return nil, err + } + return NewJob(b.args.Name, types.DistributedServingJob, b.args), nil +} diff --git a/pkg/apis/serving/update_distributed_builder.go b/pkg/apis/serving/update_distributed_builder.go new file mode 100644 index 000000000..b8b1015f2 --- /dev/null +++ b/pkg/apis/serving/update_distributed_builder.go @@ -0,0 +1,262 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package serving
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/kubeflow/arena/pkg/apis/types"
+	"github.com/kubeflow/arena/pkg/argsbuilder"
+)
+
+type UpdateDistributedServingJobBuilder struct {
+	args      *types.UpdateDistributedServingArgs
+	argValues map[string]interface{}
+	argsbuilder.ArgsBuilder
+}
+
+func NewUpdateDistributedServingJobBuilder() *UpdateDistributedServingJobBuilder {
+	args := &types.UpdateDistributedServingArgs{
+		CommonUpdateServingArgs: types.CommonUpdateServingArgs{
+			Replicas: 1,
+		},
+	}
+	return &UpdateDistributedServingJobBuilder{
+		args:        args,
+		argValues:   map[string]interface{}{},
+		ArgsBuilder: argsbuilder.NewUpdateDistributedServingArgsBuilder(args),
+	}
+}
+
+// Name is used to set the job name, match the option --name
+func (b *UpdateDistributedServingJobBuilder) Name(name string) *UpdateDistributedServingJobBuilder {
+	if name != "" {
+		b.args.Name = name
+	}
+	return b
+}
+
+// Namespace is used to set the job namespace, match the option --namespace
+func (b *UpdateDistributedServingJobBuilder) Namespace(namespace string) *UpdateDistributedServingJobBuilder {
+	if namespace != "" {
+		b.args.Namespace = namespace
+	}
+	return b
+}
+
+// Version is used to set the serving job version, match the option --version
+func (b *UpdateDistributedServingJobBuilder) Version(version string) *UpdateDistributedServingJobBuilder {
+	if version != "" {
+		b.args.Version = version
+	}
+	return b
+}
+
+// Command is used to set the job command
+func (b *UpdateDistributedServingJobBuilder) Command(args []string) *UpdateDistributedServingJobBuilder {
+	if b.args.Command == "" {
+		b.args.Command = strings.Join(args, " ")
+	}
+	return b
+}
+
+// Image is used to set the job image, match the option --image
+func (b *UpdateDistributedServingJobBuilder) Image(image string) *UpdateDistributedServingJobBuilder {
+	if image != "" {
+		b.args.Image = image
+	}
+	return b
+}
+
+// Envs is used to set the envs of job containers, match the option --env
+func (b *UpdateDistributedServingJobBuilder) Envs(envs map[string]string) *UpdateDistributedServingJobBuilder {
+	if len(envs) != 0 {
+		envSlice := []string{}
+		for key, value := range envs {
+			envSlice = append(envSlice, fmt.Sprintf("%v=%v", key, value))
+		}
+		b.argValues["env"] = &envSlice
+	}
+	return b
+}
+
+// Tolerations is used to set tolerations for the job pods, match the option --toleration
+func (b *UpdateDistributedServingJobBuilder) Tolerations(tolerations []string) *UpdateDistributedServingJobBuilder {
+	b.argValues["toleration"] = &tolerations
+	return b
+}
+
+// NodeSelectors is used to set node selectors for scheduling the job, match the option --selector
+func (b *UpdateDistributedServingJobBuilder) NodeSelectors(selectors map[string]string) *UpdateDistributedServingJobBuilder {
+	if len(selectors) != 0 {
+		selectorsSlice := []string{}
+		for key, value := range selectors {
+			selectorsSlice = append(selectorsSlice, fmt.Sprintf("%v=%v", key, value))
+		}
+		b.argValues["selector"] = &selectorsSlice
+	}
+	return b
+}
+
+// Annotations is used to add annotations for the job pods, match the option --annotation
+func (b *UpdateDistributedServingJobBuilder) Annotations(annotations map[string]string) *UpdateDistributedServingJobBuilder {
+	if len(annotations) != 0 {
+		s := []string{}
+		for key, value := range annotations {
+			s = append(s, fmt.Sprintf("%v=%v", key, value))
+		}
+		b.argValues["annotation"] = &s
+	}
+	return b
+}
+
+// Labels is used to add labels for the job, match the option --label
+func (b *UpdateDistributedServingJobBuilder) Labels(labels map[string]string) *UpdateDistributedServingJobBuilder {
+	if len(labels) != 0 {
+		s := []string{}
+		for key, value := range labels {
+			s = append(s, fmt.Sprintf("%v=%v", key, value))
+		}
+		b.argValues["label"] = &s
+	}
+	return b
+}
+
+// Replicas is used to set the number of serving job replicas, match the option --replicas
+func (b *UpdateDistributedServingJobBuilder) Replicas(count int) *UpdateDistributedServingJobBuilder {
+	if count > 0 {
+		b.args.Replicas = count
+	}
+	return b
+}
+
+// Workers is used to set the number of worker pods, match the option --workers
+func (b *UpdateDistributedServingJobBuilder) Workers(workers int) *UpdateDistributedServingJobBuilder {
+	if workers > 0 {
+		b.args.Workers = workers
+	}
+	return b
+}
+
+// MasterCpu is used to set the cpu of the master pods, match the option --master-cpu
+func (b *UpdateDistributedServingJobBuilder) MasterCpu(cpu string) *UpdateDistributedServingJobBuilder {
+	if cpu != "" {
+		b.args.MasterCpu = cpu
+	}
+	return b
+}
+
+// WorkerCpu is used to set the cpu of the worker pods, match the option --worker-cpu
+func (b *UpdateDistributedServingJobBuilder) WorkerCpu(cpu string) *UpdateDistributedServingJobBuilder {
+	if cpu != "" {
+		b.args.WorkerCpu = cpu
+	}
+	return b
+}
+
+// MasterGpus is used to set the gpus of the master pods, match the option --master-gpus
+func (b *UpdateDistributedServingJobBuilder) MasterGpus(gpus int) *UpdateDistributedServingJobBuilder {
+	if gpus > 0 {
+		b.args.MasterGPUCount = gpus
+	}
+	return b
+}
+
+// WorkerGpus is used to set the gpus of the worker pods, match the option --worker-gpus
+func (b *UpdateDistributedServingJobBuilder) WorkerGpus(gpus int) *UpdateDistributedServingJobBuilder {
+	if gpus > 0 {
+		b.args.WorkerGPUCount = gpus
+	}
+	return b
+}
+
+// MasterGPUMemory is used to set the gpu memory of the master pods, match the option --master-gpumemory
+func (b *UpdateDistributedServingJobBuilder) MasterGPUMemory(gpuMemory int) *UpdateDistributedServingJobBuilder {
+	if gpuMemory > 0 {
+		b.args.MasterGPUMemory = gpuMemory
+	}
+	return b
+}
+
+// WorkerGPUMemory is used to set the gpu memory of the worker pods, match the option --worker-gpumemory
+func (b *UpdateDistributedServingJobBuilder) WorkerGPUMemory(gpuMemory int) *UpdateDistributedServingJobBuilder {
+	if gpuMemory > 0 {
+		b.args.WorkerGPUMemory = gpuMemory
+	}
+	return b
+}
+
+// MasterGPUCore is used to set the gpu cores of the master pods, match the option --master-gpucore
+func (b *UpdateDistributedServingJobBuilder) MasterGPUCore(gpucore int) *UpdateDistributedServingJobBuilder {
+	if gpucore > 0 {
+		b.args.MasterGPUCore = gpucore
+	}
+	return b
+}
+
+// WorkerGPUCore is used to set the gpu cores of the worker pods, match the option --worker-gpucore
+func (b *UpdateDistributedServingJobBuilder) WorkerGPUCore(gpucore int) *UpdateDistributedServingJobBuilder {
+	if gpucore > 0 {
+		b.args.WorkerGPUCore = gpucore
+	}
+	return b
+}
+
+// MasterMemory is used to set the memory of the master pods, match the option --master-memory
+func (b *UpdateDistributedServingJobBuilder) MasterMemory(memory string) *UpdateDistributedServingJobBuilder {
+	if memory != "" {
+		b.args.MasterMemory = memory
+	}
+	return b
+}
+
+// WorkerMemory is used to set the memory of the worker pods, match the option --worker-memory
+func (b *UpdateDistributedServingJobBuilder) WorkerMemory(memory string) *UpdateDistributedServingJobBuilder {
+	if memory != "" {
+		b.args.WorkerMemory = memory
+	}
+	return b
+}
+
+// MasterCommand is used to set the command of the master pods, match the option --master-command
+func (b *UpdateDistributedServingJobBuilder) MasterCommand(command string) *UpdateDistributedServingJobBuilder {
+	if command != "" {
+		b.args.MasterCommand = command
+	}
+	return b
+}
+
+// WorkerCommand is used to set the command of the worker pods, match the option --worker-command
+func (b *UpdateDistributedServingJobBuilder) WorkerCommand(command string) *UpdateDistributedServingJobBuilder {
+	if command != "" {
+		b.args.WorkerCommand = command
+	}
+	return b
+}
+
+// Build is used to build the job
+func (b *UpdateDistributedServingJobBuilder) Build() (*Job, error) {
+	for key, value := range b.argValues {
+		b.AddArgValue(key, value)
+	}
+	if err := b.PreBuild(); err != nil {
+		return nil, err
+	}
+	if err := b.ArgsBuilder.Build(); err != nil {
+		return nil, err
+	}
+	return NewJob(b.args.Name, types.DistributedServingJob, b.args), nil
+}
diff --git a/pkg/apis/types/serving.go b/pkg/apis/types/serving.go
index 2a47c24b4..43d7dcec6 100644
--- a/pkg/apis/types/serving.go
+++ b/pkg/apis/types/serving.go
@@ -33,6 +33,8 @@ const (
 	TritonServingJob ServingJobType = "triton-serving"
 	// CustomServingJob defines the custom serving job
 	CustomServingJob ServingJobType = "custom-serving"
+	// DistributedServingJob defines the distributed serving job
+	DistributedServingJob ServingJobType = "distributed-serving"
 	// AllServingJob represents all serving job type
 	AllServingJob ServingJobType = ""
 	// UnknownServingJob defines the unknown serving job
@@ -82,6 +84,11 @@ var ServingTypeMap = map[ServingJobType]ServingTypeInfo{
 		Alias:     "Seldon",
 		Shorthand: "seldon",
 	},
+	DistributedServingJob: {
+		Name:      DistributedServingJob,
+		Alias:     "Distributed",
+		Shorthand: "distributed",
+	},
 }
 
 // ServingJobInfo display serving job information
@@ -285,6 +292,25 @@ type TritonServingArgs struct {
 	CommonServingArgs `yaml:",inline"`
 }
 
+type DistributedServingArgs struct {
+	Masters         int    `yaml:"masters"`         // --masters
+	Workers         int    `yaml:"workers"`         // --workers
+	MasterCpu       string `yaml:"masterCpus"`      // --master-cpu
+	WorkerCpu       string `yaml:"workerCpus"`      // --worker-cpu
+	MasterGPUCount  int    `yaml:"masterGpus"`      // --master-gpus
+	WorkerGPUCount  int    `yaml:"workerGpus"`      // --worker-gpus
+	MasterMemory    string `yaml:"masterMemory"`    // --master-memory
+	WorkerMemory    string `yaml:"workerMemory"`    // --worker-memory
+	MasterGPUMemory int    `yaml:"masterGPUMemory"` // --master-gpumemory
+	WorkerGPUMemory int    `yaml:"workerGPUMemory"` // --worker-gpumemory
+	MasterGPUCore   int    `yaml:"masterGPUCore"`   // --master-gpucore
+	WorkerGPUCore   int    `yaml:"workerGPUCore"`   // --worker-gpucore
+	MasterCommand   string `yaml:"masterCommand"`   // --master-command
+	WorkerCommand   string `yaml:"workerCommand"`   // --worker-command
+	InitBackend     string `yaml:"initBackend"`     // --init-backend
+	CustomServingArgs `yaml:",inline"`
+}
+
 type ModelFormat struct {
 	// Name of the model format.
 	// +required
diff --git a/pkg/apis/types/update_serving.go b/pkg/apis/types/update_serving.go
index 06f6a8224..429e863a7 100644
--- a/pkg/apis/types/update_serving.go
+++ b/pkg/apis/types/update_serving.go
@@ -70,3 +70,20 @@ type UpdateKServeArgs struct {
 	Port int `yaml:"port"` // --port
 	CommonUpdateServingArgs `yaml:",inline"`
 }
+
+type UpdateDistributedServingArgs struct {
+	Workers         int    `yaml:"workers"`         // --workers
+	MasterCpu       string `yaml:"masterCPU"`       // --master-cpu
+	WorkerCpu       string `yaml:"workerCPU"`       // --worker-cpu
+	MasterGPUCount  int    `yaml:"masterGPUCount"`  // --master-gpus
+	WorkerGPUCount  int    `yaml:"workerGPUCount"`  // --worker-gpus
+	MasterMemory    string `yaml:"masterMemory"`    // --master-memory
+	WorkerMemory    string `yaml:"workerMemory"`    // --worker-memory
+	MasterGPUMemory int    `yaml:"masterGPUMemory"` // --master-gpumemory
+	WorkerGPUMemory int    `yaml:"workerGPUMemory"` // --worker-gpumemory
+	MasterGPUCore   int    `yaml:"masterGPUCore"`   // --master-gpucore
+	WorkerGPUCore   int    `yaml:"workerGPUCore"`   // --worker-gpucore
+	MasterCommand   string `yaml:"masterCommand"`   // --master-command
+	WorkerCommand   string `yaml:"workerCommand"`   // --worker-command
+	CommonUpdateServingArgs `yaml:",inline"`
+}
diff --git a/pkg/argsbuilder/serving_distributed.go b/pkg/argsbuilder/serving_distributed.go
new file mode 100644
index 000000000..eba02174f
--- /dev/null
+++ b/pkg/argsbuilder/serving_distributed.go
@@ -0,0 +1,174 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package argsbuilder
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+
+	"github.com/kubeflow/arena/pkg/apis/types"
+	"github.com/spf13/cobra"
+)
+
+type DistributedServingArgsBuilder struct {
+	args        *types.DistributedServingArgs
+	argValues   map[string]interface{}
+	subBuilders map[string]ArgsBuilder
+}
+
+func NewDistributedServingArgsBuilder(args *types.DistributedServingArgs) ArgsBuilder {
+	args.Type = types.DistributedServingJob
+	s := &DistributedServingArgsBuilder{
+		args:        args,
+		argValues:   map[string]interface{}{},
+		subBuilders: map[string]ArgsBuilder{},
+	}
+	s.AddSubBuilder(
+		NewCustomServingArgsBuilder(&s.args.CustomServingArgs),
+	)
+	return s
+}
+
+func (s *DistributedServingArgsBuilder) GetName() string {
+	items := strings.Split(fmt.Sprintf("%v", reflect.TypeOf(*s)), ".")
+	return items[len(items)-1]
+}
+
+func (s *DistributedServingArgsBuilder) AddSubBuilder(builders ...ArgsBuilder) ArgsBuilder {
+	for _, b := range builders {
+		s.subBuilders[b.GetName()] = b
+	}
+	return s
+}
+
+func (s *DistributedServingArgsBuilder) AddArgValue(key string, value interface{}) ArgsBuilder {
+	for name := range s.subBuilders {
+		s.subBuilders[name].AddArgValue(key, value)
+	}
+	s.argValues[key] = value
+	return s
+}
+
+func (s *DistributedServingArgsBuilder) AddCommandFlags(command *cobra.Command) {
+	for name := range s.subBuilders {
+		s.subBuilders[name].AddCommandFlags(command)
+	}
+	command.Flags().IntVar(&s.args.Masters, "masters", 1, "the number of master pods (currently only 1 master is supported)")
+	command.Flags().IntVar(&s.args.Workers, "workers", 0, "the number of worker pods")
+	command.Flags().StringVar(&s.args.MasterCpu, "master-cpu", "", "the cpu resource to use for the master pod, like 1 for 1 core")
+	command.Flags().StringVar(&s.args.WorkerCpu, "worker-cpu", "", "the cpu resource to use for each worker pod, like 1 for 1 core")
+	command.Flags().IntVar(&s.args.MasterGPUCount, "master-gpus", 0, "the gpu resource to use for the master pod, like 1 for 1 gpu")
+	command.Flags().IntVar(&s.args.WorkerGPUCount, "worker-gpus", 0, "the gpu resource to use for each worker pod, like 1 for 1 gpu")
+	command.Flags().StringVar(&s.args.MasterMemory, "master-memory", "", "the memory resource to use for the master pod, like 1Gi")
+	command.Flags().StringVar(&s.args.WorkerMemory, "worker-memory", "", "the memory resource to use for each worker pod, like 1Gi")
+	command.Flags().IntVar(&s.args.MasterGPUMemory, "master-gpumemory", 0, "the GPU memory limit of the master pod when running the serve")
+	command.Flags().IntVar(&s.args.WorkerGPUMemory, "worker-gpumemory", 0, "the GPU memory limit of each worker pod when running the serve")
+	command.Flags().IntVar(&s.args.MasterGPUCore, "master-gpucore", 0, "the GPU core limit of the master pod when running the serve")
+	command.Flags().IntVar(&s.args.WorkerGPUCore, "worker-gpucore", 0, "the GPU core limit of each worker pod when running the serve")
+	command.Flags().StringVar(&s.args.MasterCommand, "master-command", "", "the command to run for the master pod")
+	command.Flags().StringVar(&s.args.WorkerCommand, "worker-command", "", "the command to run for each worker pod")
+	command.Flags().StringVar(&s.args.InitBackend, "init-backend", "", "specify the init backend for the distributed serving job (currently only ray is supported)")
+
+	_ = command.Flags().MarkHidden("cpu")
+	_ = command.Flags().MarkHidden("memory")
+	_ = command.Flags().MarkHidden("gpus")
+	_ = command.Flags().MarkHidden("gpumemory")
+	_ = command.Flags().MarkHidden("gpucore")
+}
+
+func (s *DistributedServingArgsBuilder) PreBuild() error {
+	for name := range s.subBuilders {
+		if err := s.subBuilders[name].PreBuild(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *DistributedServingArgsBuilder) Build() error {
+	for name := range s.subBuilders {
+		if err := s.subBuilders[name].Build(); err != nil {
+			return err
+		}
+	}
+	if err := s.check(); err != nil {
+		return err
+	}
+	if err := s.setType(); err != nil {
+		return err
+	}
+	if err := s.setNvidiaENV(); err != nil {
+		return err
+	}
+	if err := s.setCommand(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *DistributedServingArgsBuilder) setType() error {
+	s.args.Type = types.DistributedServingJob
+	return nil
+}
+
+func (s *DistributedServingArgsBuilder) setCommand() error {
+	if s.args.Command != "" {
+		s.args.MasterCommand = s.args.Command
+		s.args.WorkerCommand = s.args.Command
+	}
+	return nil
+}
+
+func (s *DistributedServingArgsBuilder) setNvidiaENV() error {
+	if s.args.Envs == nil {
+		s.args.Envs = map[string]string{}
+	}
+	// The master and the workers share the same envs but may request
+	// different gpu resources, so we delete the NVIDIA_VISIBLE_DEVICES
+	// env here and set it manually in the helm chart.
+	delete(s.args.Envs, "NVIDIA_VISIBLE_DEVICES")
+	return nil
+}
+
+func (s *DistributedServingArgsBuilder) check() error {
+	if s.args.Masters != 1 {
+		return fmt.Errorf("cannot change the number of masters, only 1 master is currently supported")
+	}
+	if s.args.Command != "" {
+		if s.args.MasterCommand != "" || s.args.WorkerCommand != "" {
+			return fmt.Errorf("--command and --master-command/--worker-command cannot be set at the same time")
+		}
+	} else {
+		if s.args.MasterCommand == "" || s.args.WorkerCommand == "" {
+			return fmt.Errorf("--command or --master-command/--worker-command must be set")
+		}
+	}
+	if s.args.MasterGPUCount < 0 || s.args.WorkerGPUCount < 0 {
+		return fmt.Errorf("--master-gpus/--worker-gpus is invalid")
+	}
+	if s.args.MasterGPUMemory < 0 || s.args.WorkerGPUMemory < 0 {
+		return fmt.Errorf("--master-gpumemory/--worker-gpumemory is invalid")
+	}
+	if s.args.MasterGPUCore < 0 || s.args.WorkerGPUCore < 0 {
+		return fmt.Errorf("--master-gpucore/--worker-gpucore is invalid")
+	}
+	if s.args.InitBackend != "" {
+		if s.args.InitBackend != "ray" {
+			return fmt.Errorf("invalid init-backend value: %s, only ray is supported", s.args.InitBackend)
+		}
+	}
+	return nil
+}
diff --git a/pkg/argsbuilder/update_serving_distributed.go b/pkg/argsbuilder/update_serving_distributed.go
new file mode 100644
index 000000000..1f5847cc2
--- /dev/null
+++ b/pkg/argsbuilder/update_serving_distributed.go
@@ -0,0 +1,135 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package argsbuilder
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+
+	"github.com/kubeflow/arena/pkg/apis/types"
+	"github.com/spf13/cobra"
+)
+
+type UpdateDistributedServingArgsBuilder struct {
+	args        *types.UpdateDistributedServingArgs
+	argValues   map[string]interface{}
+	subBuilders map[string]ArgsBuilder
+}
+
+func NewUpdateDistributedServingArgsBuilder(args *types.UpdateDistributedServingArgs) ArgsBuilder {
+	args.Type = types.DistributedServingJob
+	s := &UpdateDistributedServingArgsBuilder{
+		args:        args,
+		argValues:   map[string]interface{}{},
+		subBuilders: map[string]ArgsBuilder{},
+	}
+	s.AddSubBuilder(
+		NewUpdateServingArgsBuilder(&s.args.CommonUpdateServingArgs),
+	)
+	return s
+}
+
+func (s *UpdateDistributedServingArgsBuilder) GetName() string {
+	items := strings.Split(fmt.Sprintf("%v", reflect.TypeOf(*s)), ".")
+	return items[len(items)-1]
+}
+
+func (s *UpdateDistributedServingArgsBuilder) AddSubBuilder(builders ...ArgsBuilder) ArgsBuilder {
+	for _, b := range builders {
+		s.subBuilders[b.GetName()] = b
+	}
+	return s
+}
+
+func (s *UpdateDistributedServingArgsBuilder) AddArgValue(key string, value interface{}) ArgsBuilder {
+	for name := range s.subBuilders {
+		s.subBuilders[name].AddArgValue(key, value)
+	}
+	s.argValues[key] = value
+	return s
+}
+
+func (s *UpdateDistributedServingArgsBuilder) AddCommandFlags(command *cobra.Command) {
+	for name := range s.subBuilders {
+		s.subBuilders[name].AddCommandFlags(command)
+	}
+
+	command.Flags().IntVar(&s.args.Workers, "workers", 0, "the number of worker pods")
+	command.Flags().StringVar(&s.args.MasterCpu, "master-cpu", "", "the cpu resource to use for the master pod, like 1 for 1 core")
+	command.Flags().StringVar(&s.args.WorkerCpu, "worker-cpu", "", "the cpu resource to use for each worker pod, like 1 for 1 core")
+	command.Flags().IntVar(&s.args.MasterGPUCount, "master-gpus", 0, "the gpu resource to use for the master pod, like 1 for 1 gpu")
+	command.Flags().IntVar(&s.args.WorkerGPUCount, "worker-gpus", 0, "the gpu resource to use for each worker pod, like 1 for 1 gpu")
+	command.Flags().IntVar(&s.args.MasterGPUMemory, "master-gpumemory", 0, "the GPU memory limit of the master pod when running the serve")
+	command.Flags().IntVar(&s.args.WorkerGPUMemory, "worker-gpumemory", 0, "the GPU memory limit of each worker pod when running the serve")
+	command.Flags().IntVar(&s.args.MasterGPUCore, "master-gpucore", 0, "the GPU core limit of the master pod when running the serve")
+	command.Flags().IntVar(&s.args.WorkerGPUCore, "worker-gpucore", 0, "the GPU core limit of each worker pod when running the serve")
+	command.Flags().StringVar(&s.args.MasterMemory, "master-memory", "", "the memory resource to use for the master pod, like 1Gi")
+	command.Flags().StringVar(&s.args.WorkerMemory, "worker-memory", "", "the memory resource to use for each worker pod, like 1Gi")
+	command.Flags().StringVar(&s.args.MasterCommand, "master-command", "", "the command to run for the master pod")
+	command.Flags().StringVar(&s.args.WorkerCommand, "worker-command", "", "the command to run for each worker pod")
+
+	_ = command.Flags().MarkHidden("cpu")
+	_ = command.Flags().MarkHidden("memory")
+	_ = command.Flags().MarkHidden("gpus")
+	_ = command.Flags().MarkHidden("gpumemory")
+	_ = command.Flags().MarkHidden("gpucore")
+}
+
+func (s *UpdateDistributedServingArgsBuilder) PreBuild() error {
+	for name := range s.subBuilders {
+		if err := s.subBuilders[name].PreBuild(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *UpdateDistributedServingArgsBuilder) Build() error {
+	for name := range s.subBuilders {
+		if err := s.subBuilders[name].Build(); err != nil {
+			return err
+		}
+	}
+	if err := s.check(); err != nil {
+		return err
+	}
+	if err := s.setCommand(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *UpdateDistributedServingArgsBuilder) setCommand() error {
+	if s.args.Command != "" {
+		s.args.MasterCommand = s.args.Command
+		s.args.WorkerCommand = s.args.Command
+	}
+	return nil
+}
+
+func (s *UpdateDistributedServingArgsBuilder) check() error {
+	if s.args.Workers < 0 {
+		return fmt.Errorf("--workers cannot be negative")
+	}
+	if s.args.Command != "" {
+		if s.args.MasterCommand != "" || s.args.WorkerCommand != "" {
+			return fmt.Errorf("--command and --master-command/--worker-command cannot be set at the same time")
+		}
+	}
+	return nil
+}
diff --git a/pkg/commands/serving/serving.go b/pkg/commands/serving/serving.go
index 9ee2d8976..418887093 100644
--- a/pkg/commands/serving/serving.go
+++ b/pkg/commands/serving/serving.go
@@ -25,7 +25,8 @@ Available Commands:
   custom              Submit a Custom Serving Job
   kfserving,kfs       Submit a kubeflow Serving Job
   kserve              Submit a KServe Serving Job
-  seldon              Submit a Seldon Serving Job`
+  seldon              Submit a Seldon Serving Job
+  distributed         Submit a Distributed Serving Job`
 )
 
 func NewServeCommand() *cobra.Command {
@@ -44,6 +45,7 @@ func NewServeCommand() *cobra.Command {
 	command.AddCommand(NewSubmitKServeJobCommand())
 	command.AddCommand(NewSubmitSeldonServingJobCommand())
 	command.AddCommand(NewSubmitTritonServingJobCommand())
+	command.AddCommand(NewSubmitDistributedServingJobCommand())
 	command.AddCommand(NewListCommand())
 	command.AddCommand(NewDeleteCommand())
 	command.AddCommand(NewGetCommand())
diff --git a/pkg/commands/serving/serving_distributed.go b/pkg/commands/serving/serving_distributed.go
new file mode 100644
index 000000000..22cf72a2c
--- /dev/null
+++ b/pkg/commands/serving/serving_distributed.go
@@ -0,0 +1,57 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package serving
+
+import (
+	"fmt"
+
+	"github.com/kubeflow/arena/pkg/apis/arenaclient"
+	"github.com/kubeflow/arena/pkg/apis/config"
+	"github.com/kubeflow/arena/pkg/apis/serving"
+	"github.com/kubeflow/arena/pkg/apis/types"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+)
+
+func NewSubmitDistributedServingJobCommand() *cobra.Command {
+	builder := serving.NewDistributedServingJobBuilder()
+	var command = &cobra.Command{
+		Use:     "distributed",
+		Short:   "Submit a distributed serving job to deploy and serve machine learning models",
+		Aliases: []string{"distributed"},
+		PreRun: func(cmd *cobra.Command, args []string) {
+			_ = viper.BindPFlags(cmd.Flags())
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			client, err := arenaclient.NewArenaClient(types.ArenaClientArgs{
+				Kubeconfig:     viper.GetString("config"),
+				LogLevel:       viper.GetString("loglevel"),
+				Namespace:      viper.GetString("namespace"),
+				ArenaNamespace: viper.GetString("arena-namespace"),
+				IsDaemonMode:   false,
+			})
+			if err != nil {
+				return fmt.Errorf("failed to create arena client: %v", err)
+			}
+			job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
+			if err != nil {
+				return fmt.Errorf("failed to validate command args: %v", err)
+			}
+			return client.Serving().Submit(job)
+		},
+	}
+	builder.AddCommandFlags(command)
+	return command
+}
diff --git a/pkg/commands/serving/update.go b/pkg/commands/serving/update.go
index 0676464bc..82016ad68 100644
--- a/pkg/commands/serving/update.go
+++ b/pkg/commands/serving/update.go
@@ -23,7 +23,8 @@ Available Commands:
   tensorflow,tf       Update a TensorFlow Serving Job
   triton              Update a Nvidia Triton Serving Job
   custom              Update a Custom Serving Job
-  kserve              Update a KServe Serving Job`
+  kserve              Update a KServe Serving Job
+  distributed         Update a Distributed Serving Job`
 )
 
 func NewUpdateCommand() *cobra.Command {
@@ -39,6 +40,7 @@ func NewUpdateCommand() *cobra.Command {
 	command.AddCommand(NewUpdateTritonCommand())
 	command.AddCommand(NewUpdateCustomCommand())
 	command.AddCommand(NewUpdateKServeCommand())
+	command.AddCommand(NewUpdateDistributedCommand())
 	return command
 }
diff --git a/pkg/commands/serving/update_distributed.go b/pkg/commands/serving/update_distributed.go
new file mode 100644
index 000000000..d366ec1b5
--- /dev/null
+++ b/pkg/commands/serving/update_distributed.go
@@ -0,0 +1,60 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package serving
+
+import (
+	"fmt"
+
+	"github.com/kubeflow/arena/pkg/apis/config"
+	"github.com/kubeflow/arena/pkg/apis/serving"
+
+	"github.com/kubeflow/arena/pkg/apis/arenaclient"
+	"github.com/kubeflow/arena/pkg/apis/types"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+)
+
+// NewUpdateDistributedCommand creates the command for updating a distributed serving job
+func NewUpdateDistributedCommand() *cobra.Command {
+	builder := serving.NewUpdateDistributedServingJobBuilder()
+	var command = &cobra.Command{
+		Use:   "distributed",
+		Short: "Update a distributed serving job and its associated instances",
+		PreRun: func(cmd *cobra.Command, args []string) {
+			_ = viper.BindPFlags(cmd.Flags())
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			client, err := arenaclient.NewArenaClient(types.ArenaClientArgs{
+				Kubeconfig:     viper.GetString("config"),
+				LogLevel:       viper.GetString("loglevel"),
+				Namespace:      viper.GetString("namespace"),
+				ArenaNamespace: viper.GetString("arena-namespace"),
+				IsDaemonMode:   false,
+			})
+			if err != nil {
+				return err
+			}
+
+			job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
+			if err != nil {
+				return fmt.Errorf("failed to validate command args: %v", err)
+			}
+			return client.Serving().Update(job)
+		},
+	}
+
+	builder.AddCommandFlags(command)
+	return command
+}
diff --git a/pkg/k8saccesser/const.go b/pkg/k8saccesser/const.go
index e9e982527..51978e69f 100644
--- a/pkg/k8saccesser/const.go
+++ b/pkg/k8saccesser/const.go
@@ -35,4 +35,7 @@ const (
 
 	RayJobCRDName             = "rayjobs.ray.io"
 	RayJobCRDNameInDaemonMode = "RayJob.ray.io"
+
+	LWSCRDName             = "leaderworkersets.leaderworkerset.x-k8s.io"
+	LWSCRDNameInDaemonMode = "LeaderWorkerSet.leaderworkerset.x-k8s.io"
 )
diff --git a/pkg/k8saccesser/k8s_accesser.go b/pkg/k8saccesser/k8s_accesser.go
index c8077335a..799c4849b 100644
--- a/pkg/k8saccesser/k8s_accesser.go
+++ b/pkg/k8saccesser/k8s_accesser.go
@@ -54,6 +54,8 @@ import (
 	volcanovesioned "github.com/kubeflow/arena/pkg/operators/volcano-operator/client/clientset/versioned"
 	ray_v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	rayversioned "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
+	lws_v1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
+	lwsversioned "sigs.k8s.io/lws/client-go/clientset/versioned"
 )
 
 var accesser *k8sResourceAccesser
@@ -651,6 +653,37 @@ func (k *k8sResourceAccesser) ListSparkJobs(sparkjobClient *sparkversioned.Clien
 	return jobs, nil
 }
 
+func (k *k8sResourceAccesser) ListLWSJobs(lwsClient *lwsversioned.Clientset, namespace string, labels string) ([]*lws_v1.LeaderWorkerSet, error) {
+	jobs := []*lws_v1.LeaderWorkerSet{}
+	jobList := &lws_v1.LeaderWorkerSetList{}
+	var err error
+	labelSelector, err := parseLabelSelector(labels)
+	if err != nil {
+		return nil, err
+	}
+	if k.cacheEnabled {
+		err = k.cacheClient.List(
+			context.Background(),
+			jobList,
+			client.InNamespace(namespace),
+			&client.ListOptions{
+				LabelSelector: labelSelector,
+			})
+	} else {
+		jobList, err = lwsClient.LeaderworkersetV1().LeaderWorkerSets(namespace).List(
+			context.Background(),
+			metav1.ListOptions{LabelSelector: labelSelector.String()},
+		)
+	}
+	if err != nil {
+		return nil, err
+	}
+	for _, job := range jobList.Items {
+		jobs = append(jobs, job.DeepCopy())
+	}
+	return jobs, nil
+}
+
 func (k *k8sResourceAccesser) GetCron(cronClient *cronversioned.Clientset, namespace string, name string) (*cron_v1alpha1.Cron, error) {
 	cron := &cron_v1alpha1.Cron{}
 	var err error
@@ -832,6 +865,29 @@ func (k *k8sResourceAccesser) GetSparkJob(sparkjobClient *sparkversioned.Clients
 	return sparkJob, err
 }
 
+func (k *k8sResourceAccesser) GetLWSJob(lwsClient *lwsversioned.Clientset, namespace string, name string) (*lws_v1.LeaderWorkerSet, error) {
+	lwsJob := &lws_v1.LeaderWorkerSet{}
+	var err error
+	if k.cacheEnabled {
+		err = k.cacheClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: name}, lwsJob)
+		if err != nil {
+			if strings.Contains(err.Error(), fmt.Sprintf(`%v "%v" not found`, LWSCRDNameInDaemonMode, name)) {
+				return nil, types.ErrTrainingJobNotFound
+			}
+			return nil, fmt.Errorf("failed to find lwsjob %v from cache, reason: %v", name, err)
+		}
+	} else {
+		lwsJob, err = lwsClient.LeaderworkersetV1().LeaderWorkerSets(namespace).Get(context.Background(), name, metav1.GetOptions{})
+		if err != nil {
+			if strings.Contains(err.Error(), fmt.Sprintf(`%v "%v" not found`, LWSCRDName, name)) {
+				return nil, types.ErrTrainingJobNotFound
+			}
+			return nil, fmt.Errorf("failed to find lwsjob %v from api server, reason: %v", name, err)
+		}
+	}
+	return lwsJob, err
+}
+
 func (k *k8sResourceAccesser) GetService(namespace, name string) (*v1.Service, error) {
 	service := &v1.Service{}
 	var err error
diff --git a/pkg/serving/get.go b/pkg/serving/get.go
index 5cca97f69..77db4c1fe 100644
--- a/pkg/serving/get.go
+++ b/pkg/serving/get.go
@@ -96,6 +96,8 @@ func validateJobs(jobs []ServingJob, name string) error {
 		var labels map[string]string
 		if ksjob, ok := s.(*kserveJob); ok {
 			labels = ksjob.inferenceService.Labels
+		} else if lwsjob, ok := s.(*lwsJob); ok {
+			labels = lwsjob.lws.Labels
 		} else {
 			labels = s.Deployment().Labels
 		}
diff --git a/pkg/serving/serving.go b/pkg/serving/serving.go
index f5f4ba6af..20ededb4a 100644
--- a/pkg/serving/serving.go
+++ b/pkg/serving/serving.go
@@ -64,6 +64,7 @@ func GetAllProcesser() map[types.ServingJobType]Processer {
 		NewTensorrtServingProcesser,
 		NewSeldonServingProcesser,
 		NewTritonServingProcesser,
+		NewDistributedServingProcesser,
 	}
 	var wg sync.WaitGroup
 	for _, initFunc := range processerInits {
diff --git a/pkg/serving/serving_distributed.go b/pkg/serving/serving_distributed.go
new file mode 100644
index 000000000..ffd8b4714
--- /dev/null
+++ b/pkg/serving/serving_distributed.go
@@ -0,0 +1,306 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package serving + +import ( + "fmt" + "strings" + "time" + + "github.com/kubeflow/arena/pkg/apis/config" + "github.com/kubeflow/arena/pkg/apis/types" + "github.com/kubeflow/arena/pkg/k8saccesser" + "github.com/kubeflow/arena/pkg/util" + "github.com/kubeflow/arena/pkg/workflow" + log "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + lws_v1 "sigs.k8s.io/lws/api/leaderworkerset/v1" + lws_client "sigs.k8s.io/lws/client-go/clientset/versioned" +) + +type DistributedServingProcesser struct { + lwsClient *lws_client.Clientset + *processer +} + +type lwsJob struct { + lws *lws_v1.LeaderWorkerSet + *servingJob +} + +func NewDistributedServingProcesser() Processer { + p := &processer{ + processerType: types.DistributedServingJob, + client: config.GetArenaConfiger().GetClientSet(), + enable: true, + useIstioGateway: false, + } + + lwsClient := lws_client.NewForConfigOrDie(config.GetArenaConfiger().GetRestConfig()) + return &DistributedServingProcesser{ + lwsClient: lwsClient, + processer: p, + } +} + +func SubmitDistributedServingJob(namespace string, args *types.DistributedServingArgs) (err error) { + nameWithVersion := fmt.Sprintf("%v-%v", args.Name, args.Version) + args.Namespace = namespace + processers := GetAllProcesser() + processer, ok := processers[args.Type] + if !ok { + return fmt.Errorf("the processer of %v is not found", args.Type) + } + jobs, err := processer.GetServingJobs(args.Namespace, args.Name, args.Version) + if err != nil { + return err + } + if err := ValidateJobsBeforeSubmiting(jobs, args.Name); err != nil { + return err + } + chart := util.GetChartsFolder() + "/distributed-serving" + err = workflow.SubmitJob(nameWithVersion, string(types.DistributedServingJob), namespace, args, chart, args.HelmOptions...) 
+	if err != nil {
+		return err
+	}
+	log.Infof("The job %s has been submitted successfully", args.Name)
+	log.Infof("You can run `arena serve get %s --type %s -n %s` to check the job status", args.Name, args.Type, args.Namespace)
+	return nil
+}
+
+func (p *DistributedServingProcesser) ListServingJobs(namespace string, allNamespace bool) ([]ServingJob, error) {
+	selector := fmt.Sprintf("%v=%v", servingTypeLabelKey, p.processerType)
+	arenaConfiger := config.GetArenaConfiger()
+	if arenaConfiger.IsIsolateUserInNamespace() {
+		selector = fmt.Sprintf("%v,%v=%v", selector, types.UserNameIdLabel, arenaConfiger.GetUser().GetId())
+	}
+	log.Debugf("filter jobs by labels: %v", selector)
+	return p.FilterServingJobs(namespace, allNamespace, selector)
+}
+
+func (p *DistributedServingProcesser) GetServingJobs(namespace, name, version string) ([]ServingJob, error) {
+	selector := []string{
+		fmt.Sprintf("%v=%v", servingNameLabelKey, name),
+		fmt.Sprintf("%v=%v", servingTypeLabelKey, p.processerType),
+	}
+	log.Debugf("processer %v, filter jobs by labels: %v", p.processerType, selector)
+	return p.FilterServingJobs(namespace, false, strings.Join(selector, ","))
+}
+
+func (p *DistributedServingProcesser) FilterServingJobs(namespace string, allNamespace bool, label string) ([]ServingJob, error) {
+	if allNamespace {
+		namespace = metav1.NamespaceAll
+	}
+
+	// get leaderworkersets
+	lwsList, err := k8saccesser.GetK8sResourceAccesser().ListLWSJobs(p.lwsClient, namespace, label)
+	if err != nil {
+		return nil, err
+	}
+
+	// get pods
+	pods, err := k8saccesser.GetK8sResourceAccesser().ListPods(namespace, label, "", nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// get services
+	services, err := k8saccesser.GetK8sResourceAccesser().ListServices(namespace, label)
+	if err != nil {
+		return nil, err
+	}
+
+	servingJobs := []ServingJob{}
+	for _, lws := range lwsList {
+		filterPods := []*v1.Pod{}
+		for _, pod := range pods {
+			if lws.Labels[servingNameLabelKey] == pod.Labels[servingNameLabelKey] &&
+				lws.Labels[servingTypeLabelKey] == pod.Labels[servingTypeLabelKey] {
+				filterPods = append(filterPods, pod)
+			}
+		}
+		version := lws.Labels[servingVersionLabelKey]
+		servingJobs = append(servingJobs, &lwsJob{
+			lws: lws,
+			servingJob: &servingJob{
+				name:          lws.Labels[servingNameLabelKey],
+				namespace:     lws.Namespace,
+				servingType:   p.processerType,
+				version:       version,
+				deployment:    nil,
+				pods:          filterPods,
+				services:      services,
+				istioServices: nil,
+			},
+		})
+	}
+
+	return servingJobs, nil
+}
+
+func (s *lwsJob) Uid() string {
+	return string(s.lws.UID)
+}
+
+func (s *lwsJob) Age() time.Duration {
+	return time.Since(s.lws.ObjectMeta.CreationTimestamp.Time)
+}
+
+func (s *lwsJob) StartTime() *metav1.Time {
+	return &s.lws.ObjectMeta.CreationTimestamp
+}
+
+func (s *lwsJob) RequestCPUs() float64 {
+	replicas := s.lws.Spec.Replicas
+	size := s.lws.Spec.LeaderWorkerTemplate.Size
+	masterCPUs := 0.0
+	for _, c := range s.lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers {
+		if val, ok := c.Resources.Limits[v1.ResourceName(types.CPUResourceName)]; ok {
+			masterCPUs += float64(val.Value())
+		}
+	}
+	result := masterCPUs * float64(*replicas)
+	if size != nil && *size > 1 {
+		workerCPUs := 0.0
+		for _, c := range s.lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers {
+			if val, ok := c.Resources.Limits[v1.ResourceName(types.CPUResourceName)]; ok {
+				workerCPUs += float64(val.Value())
+			}
+		}
+		workerCPUs *= float64((*size) - 1)
+		result += float64(*replicas) * workerCPUs
+	}
+	return result
+}
+
+func (s
*lwsJob) RequestGPUs() float64 { + replicas := s.lws.Spec.Replicas + size := s.lws.Spec.LeaderWorkerTemplate.Size + masterGPUs := 0.0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.NvidiaGPUResourceName)]; ok { + masterGPUs += float64(val.Value()) + } + if val, ok := c.Resources.Limits[v1.ResourceName(types.AliyunGPUResourceName)]; ok { + masterGPUs += float64(val.Value()) + } + } + result := masterGPUs * float64(*replicas) + if size != nil && *size > 1 { + workerGPUs := 0.0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.NvidiaGPUResourceName)]; ok { + workerGPUs += float64(val.Value()) + } + } + for _, c := range s.lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.AliyunGPUResourceName)]; ok { + workerGPUs += float64(val.Value()) + } + } + workerGPUs *= float64((*size) - 1) + result += float64(*replicas) * workerGPUs + } + return result +} + +func (s *lwsJob) RequestGPUMemory() int { + replicas := s.lws.Spec.Replicas + size := s.lws.Spec.LeaderWorkerTemplate.Size + + masterGpuMemory := 0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.GPUShareResourceName)]; ok { + masterGpuMemory += int(val.Value()) + } + } + + result := masterGpuMemory * int(*replicas) + if size != nil && *size > 1 { + workerGpuMemory := 0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.GPUShareResourceName)]; ok { + workerGpuMemory += int(val.Value()) + } + } + workerGpuMemory *= int((*size) - 1) + result += int(*replicas) * workerGpuMemory + } + + return result +} + +func (s *lwsJob) RequestGPUCore() int { + replicas := s.lws.Spec.Replicas + size := s.lws.Spec.LeaderWorkerTemplate.Size + + masterGpuCore := 0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.GPUCoreShareResourceName)]; ok { + masterGpuCore += int(val.Value()) + } + } + + result := masterGpuCore * int(*replicas) + if size != nil && *size > 1 { + workerGpuCore := 0 + for _, c := range s.lws.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers { + if val, ok := c.Resources.Limits[v1.ResourceName(types.GPUCoreShareResourceName)]; ok { + workerGpuCore += int(val.Value()) + } + } + workerGpuCore *= int((*size) - 1) + result += int(*replicas) * workerGpuCore + } + + return result +} + +func (s *lwsJob) DesiredInstances() int { + return int(s.lws.Status.Replicas) +} + +func (s *lwsJob) AvailableInstances() int { + return int(s.lws.Status.ReadyReplicas) +} + +func (s *lwsJob) GetLabels() map[string]string { + return s.lws.Labels +} + +func (s *lwsJob) Convert2JobInfo() types.ServingJobInfo { + servingType := types.ServingTypeMap[s.servingType].Alias + servingJobInfo := types.ServingJobInfo{ + UUID: s.Uid(), + Name: s.name, + Namespace: s.namespace, + Version: s.version, + Type: servingType, + Age: util.ShortHumanDuration(s.Age()), + Desired: s.DesiredInstances(), + IPAddress: s.IPAddress(), + Available: s.AvailableInstances(), + RequestCPUs: s.RequestCPUs(), + RequestGPUs: s.RequestGPUs(), + RequestGPUMemory: s.RequestGPUMemory(), + RequestGPUCore: s.RequestGPUCore(), + Endpoints: s.Endpoints(), + Instances: s.Instances(), + 
CreationTimestamp: s.StartTime().Unix(),
+	}
+	return servingJobInfo
+}
diff --git a/pkg/serving/update.go b/pkg/serving/update.go
index 0d24da2f8..23468f2ed 100644
--- a/pkg/serving/update.go
+++ b/pkg/serving/update.go
@@ -25,6 +25,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	lwsv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
 
 	"github.com/kubeflow/arena/pkg/apis/types"
 	"github.com/kubeflow/arena/pkg/util/kubectl"
@@ -328,6 +329,77 @@ func UpdateKServe(args *types.UpdateKServeArgs) error {
 	return updateInferenceService(args.Name, args.Version, inferenceService)
 }
 
+func UpdateDistributedServing(args *types.UpdateDistributedServingArgs) error {
+	lwsJob, err := findAndBuildLWSJob(args)
+	if err != nil {
+		return err
+	}
+
+	if len(args.Annotations) > 0 {
+		for k, v := range args.Annotations {
+			lwsJob.Annotations[k] = v
+			lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Annotations[k] = v
+			lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Annotations[k] = v
+		}
+	}
+
+	if len(args.Labels) > 0 {
+		for k, v := range args.Labels {
+			lwsJob.Labels[k] = v
+			lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Labels[k] = v
+			lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Labels[k] = v
+		}
+	}
+
+	if len(args.NodeSelectors) > 0 {
+		if lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.NodeSelector == nil {
+			lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.NodeSelector = map[string]string{}
+		}
+		if lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.NodeSelector == nil {
+			lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.NodeSelector = map[string]string{}
+		}
+		for k, v := range args.NodeSelectors {
+			lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.NodeSelector[k] = v
+			lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.NodeSelector[k] = v
+		}
+	}
+
+	if len(args.Tolerations) > 0 {
+		if lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Tolerations == nil {
+			lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Tolerations = []v1.Toleration{}
+		}
+		if lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Tolerations == nil {
+			lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Tolerations = []v1.Toleration{}
+		}
+		exist := map[string]bool{}
+		var tolerations []v1.Toleration
+		for _, toleration := range args.Tolerations {
+			tolerations = append(tolerations, v1.Toleration{
+				Key:      toleration.Key,
+				Value:    toleration.Value,
+				Effect:   v1.TaintEffect(toleration.Effect),
+				Operator: v1.TolerationOperator(toleration.Operator),
+			})
+			exist[toleration.Key+toleration.Value] = true
+		}
+
+		for _, preToleration := range lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Tolerations {
+			if !exist[preToleration.Key+preToleration.Value] {
+				tolerations = append(tolerations, preToleration)
+			}
+		}
+		for _, preToleration := range lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Tolerations {
+			if !exist[preToleration.Key+preToleration.Value] {
+				tolerations = append(tolerations, preToleration)
+			}
+		}
+		lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Tolerations = tolerations
+		lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Tolerations = tolerations
+	}
+
+	return updateLWSJob(args.Name, args.Version, lwsJob)
+}
+
 func findAndBuildDeployment(args *types.CommonUpdateServingArgs) (*appsv1.Deployment, error) {
 	job, err := SearchServingJob(args.Namespace, args.Name, args.Version, args.Type)
 	if err != nil {
@@ -470,6 +542,133
@@ func findAndBuildInferenceService(args *types.UpdateKServeArgs) (*kservev1beta1. return inferenceService, nil } +func findAndBuildLWSJob(args *types.UpdateDistributedServingArgs) (*lwsv1.LeaderWorkerSet, error) { + job, err := SearchServingJob(args.Namespace, args.Name, args.Version, args.Type) + if err != nil { + return nil, err + } + if args.Version == "" { + jobInfo := job.Convert2JobInfo() + args.Version = jobInfo.Version + } + + lwsName := fmt.Sprintf("%s-%s-%s", args.Name, args.Version, "distributed-serving") + lwsJob, err := kubectl.GetLWSJob(lwsName, args.Namespace) + if err != nil { + return nil, err + } + + if args.Image != "" { + lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Image = args.Image + lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Image = args.Image + } + + if args.Replicas > 0 { + replicas := int32(args.Replicas) + lwsJob.Spec.Replicas = &replicas + } + + // update resource + masterResourceLimits := lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Resources.Limits + workerResourceLimits := lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Resources.Limits + if masterResourceLimits == nil { + masterResourceLimits = make(map[v1.ResourceName]resource.Quantity) + } + if workerResourceLimits == nil { + workerResourceLimits = make(map[v1.ResourceName]resource.Quantity) + } + if args.MasterCpu != "" { + masterResourceLimits[v1.ResourceCPU] = resource.MustParse(args.MasterCpu) + } + if args.WorkerCpu != "" { + workerResourceLimits[v1.ResourceCPU] = resource.MustParse(args.WorkerCpu) + } + if args.MasterGPUCount > 0 { + masterResourceLimits[ResourceGPU] = resource.MustParse(strconv.Itoa(args.MasterGPUCount)) + delete(masterResourceLimits, ResourceGPUMemory) + } + if args.WorkerGPUCount > 0 { + workerResourceLimits[ResourceGPU] = resource.MustParse(strconv.Itoa(args.WorkerGPUCount)) + delete(workerResourceLimits, ResourceGPUMemory) + } + if args.MasterGPUMemory > 0 { + masterResourceLimits[ResourceGPUMemory] = resource.MustParse(strconv.Itoa(args.MasterGPUMemory)) + delete(masterResourceLimits, ResourceGPU) + } + if args.WorkerGPUMemory > 0 { + workerResourceLimits[ResourceGPUMemory] = resource.MustParse(strconv.Itoa(args.WorkerGPUMemory)) + delete(workerResourceLimits, ResourceGPU) + } + if args.MasterGPUCore > 0 { + masterResourceLimits[ResourceGPUCore] = resource.MustParse(strconv.Itoa(args.MasterGPUCore)) + delete(masterResourceLimits, ResourceGPU) + } + if args.WorkerGPUCore > 0 { + workerResourceLimits[ResourceGPUCore] = resource.MustParse(strconv.Itoa(args.WorkerGPUCore)) + delete(workerResourceLimits, ResourceGPU) + } + if args.MasterMemory != "" { + masterResourceLimits[v1.ResourceMemory] = resource.MustParse(args.MasterMemory) + } + if args.WorkerMemory != "" { + workerResourceLimits[v1.ResourceMemory] = resource.MustParse(args.WorkerMemory) + } + lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Resources.Limits = masterResourceLimits + lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Resources.Limits = workerResourceLimits + + // update env + var masterEnvs []v1.EnvVar + var workerEnvs []v1.EnvVar + masterExist := map[string]bool{} + workerExist := map[string]bool{} + if args.Envs != nil { + for k, v := range args.Envs { + envVar := v1.EnvVar{ + Name: k, + Value: v, + } + masterEnvs = append(masterEnvs, envVar) + workerEnvs = append(workerEnvs, envVar) + masterExist[k] = true + workerExist[k] = true + } + } + for _, env := range 
lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Env {
+		if !masterExist[env.Name] {
+			masterEnvs = append(masterEnvs, env)
+		}
+	}
+	for _, env := range lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Env {
+		if !workerExist[env.Name] {
+			workerEnvs = append(workerEnvs, env)
+		}
+	}
+	lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Env = masterEnvs
+	lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Env = workerEnvs
+
+	// update command
+	if args.MasterCommand != "" {
+		// the command has the form `sh -c <command>`, so only replace the last element
+		masterCommand := lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Command
+		newMasterCommand := make([]string, len(masterCommand))
+		copy(newMasterCommand, masterCommand)
+		newMasterCommand[len(newMasterCommand)-1] = args.MasterCommand
+		lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Command = newMasterCommand
+		lwsJob.Spec.LeaderWorkerTemplate.LeaderTemplate.Spec.Containers[0].Args = []string{}
+	}
+	if args.WorkerCommand != "" {
+		// the command has the form `sh -c <command>`, so only replace the last element
+		workerCommand := lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Command
+		newWorkerCommand := make([]string, len(workerCommand))
+		copy(newWorkerCommand, workerCommand)
+		newWorkerCommand[len(newWorkerCommand)-1] = args.WorkerCommand
+		lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Command = newWorkerCommand
+		lwsJob.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Args = []string{}
+	}
+
+	return lwsJob, nil
+}
+
 func updateDeployment(name, version string, deploy *appsv1.Deployment) error {
 	err := kubectl.UpdateDeployment(deploy)
 	if err == nil {
@@ -491,6 +690,17 @@ func updateInferenceService(name, version string, inferenceService *kservev1beta
 	return nil
 }
 
+func updateLWSJob(name, version string, lwsJob *lwsv1.LeaderWorkerSet) error {
+	err := kubectl.UpdateLWSJob(lwsJob)
+	if err != nil {
+		log.Errorf("Failed to update the serving job %s with version %s", name, version)
+		return err
+	}
+
+	log.Infof("The serving job %s with version %s has been updated successfully", name, version)
+	return nil
+}
+
 func setInferenceServiceForFrameworkModel(args *types.UpdateKServeArgs, inferenceService *kservev1beta1.InferenceService) {
 	if args.ModelFormat != nil {
 		inferenceService.Spec.Predictor.Model.ModelFormat.Name = args.ModelFormat.Name
diff --git a/pkg/serving/util.go b/pkg/serving/util.go
index 8fe6a9824..dfdab989b 100644
--- a/pkg/serving/util.go
+++ b/pkg/serving/util.go
@@ -77,6 +77,8 @@ func ValidateJobsBeforeSubmiting(jobs []ServingJob, name string) error {
 		var labels map[string]string
 		if ksjob, ok := s.(*kserveJob); ok {
 			labels = ksjob.inferenceService.Labels
+		} else if lwsjob, ok := s.(*lwsJob); ok {
+			labels = lwsjob.lws.Labels
 		} else {
 			labels = s.Deployment().Labels
 		}
diff --git a/pkg/util/kubectl/kubectl.go b/pkg/util/kubectl/kubectl.go
index f51ebd2dc..3c844cc30 100644
--- a/pkg/util/kubectl/kubectl.go
+++ b/pkg/util/kubectl/kubectl.go
@@ -29,6 +29,8 @@ import (
 	v1 "k8s.io/api/apps/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	lwsv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
+	lwsClient "sigs.k8s.io/lws/client-go/clientset/versioned"
 
 	"github.com/kubeflow/arena/pkg/apis/config"
 )
@@ -356,6 +358,11 @@ func GetInferenceService(name, namespace string) (*kservev1beta1.InferenceServic
 	return client.ServingV1beta1().InferenceServices(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 }
 
+func GetLWSJob(name, namespace string)
(*lwsv1.LeaderWorkerSet, error) { + client := lwsClient.NewForConfigOrDie(config.GetArenaConfiger().GetRestConfig()) + return client.LeaderworkersetV1().LeaderWorkerSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) +} + func UpdateDeployment(deploy *v1.Deployment) error { arenaConfiger := config.GetArenaConfiger() client := arenaConfiger.GetClientSet() @@ -371,6 +378,13 @@ func UpdateInferenceService(inferenceService *kservev1beta1.InferenceService) er return err } +func UpdateLWSJob(lwsJob *lwsv1.LeaderWorkerSet) error { + client := lwsClient.NewForConfigOrDie(config.GetArenaConfiger().GetRestConfig()) + + _, err := client.LeaderworkersetV1().LeaderWorkerSets(lwsJob.Namespace).Update(context.TODO(), lwsJob, metav1.UpdateOptions{}) + return err +} + // PatchOwnerReferenceWithAppInfoFile patch tfjob / pytorchjob ownerReference func PatchOwnerReferenceWithAppInfoFile(name, trainingType, appInfoFile, namespace string) error { data, err := os.ReadFile(appInfoFile)
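A minimal usage sketch of the update builder introduced in this change, assuming an initialized `*arenaclient.ArenaClient` named `client` (created as in the RunE handlers above); the job name, namespace, and resource values are hypothetical:

	// Sketch: scale an existing distributed serving job to 4 workers with 1 GPU each.
	job, err := serving.NewUpdateDistributedServingJobBuilder().
		Name("my-llm").       // hypothetical job name, matches --name
		Namespace("default"). // matches --namespace
		Workers(4).           // matches --workers
		WorkerGpus(1).        // matches --worker-gpus
		Build()
	if err != nil {
		return fmt.Errorf("failed to validate command args: %v", err)
	}
	return client.Serving().Update(job)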