Skip to content

Commit

Permalink
cluster fix (#105)
Browse files Browse the repository at this point in the history
* cluster fix

* fix proxy issue

* move storeage to cluster
  • Loading branch information
ganisback authored Aug 15, 2024
1 parent e9ea547 commit 16b2151
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 44 deletions.
4 changes: 4 additions & 0 deletions api/handler/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (r *RProxyHandler) Proxy(ctx *gin.Context) {
if allow {
apiname := ctx.Param("api")
target := fmt.Sprintf("http://%s.%s", appSrvName, r.SpaceRootDomain)
if deploy.Endpoint != "" {
//support multi-cluster
target = deploy.Endpoint
}
rp, _ := proxy.NewReverseProxy(target)
rp.ServeHTTP(ctx.Writer, ctx.Request, apiname)
} else {
Expand Down
16 changes: 10 additions & 6 deletions builder/deploy/cluster/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Cluster struct {
ConfigPath string // Path to the kubeconfig file
Client *kubernetes.Clientset // Kubernetes client
KnativeClient *knative.Clientset // Knative client
StorageClass string
}

// ClusterPool is a resource pool of cluster information
Expand Down Expand Up @@ -99,12 +100,15 @@ func (p *ClusterPool) GetCluster() (*Cluster, error) {
// GetClusterByID retrieves a cluster from the pool given its unique ID
func (p *ClusterPool) GetClusterByID(ctx context.Context, id string) (*Cluster, error) {
cfId := "config"
storageClass := ""
if len(id) != 0 {
cInfo, _ := p.ClusterStore.ByClusterID(ctx, id)
cfId = cInfo.ClusterConfig
storageClass = cInfo.StorageClass
}
for _, Cluster := range p.Clusters {
if Cluster.ID == cfId {
Cluster.StorageClass = storageClass
return &Cluster, nil
}
}
Expand Down Expand Up @@ -134,10 +138,10 @@ func GetNodeResources(clientset *kubernetes.Clientset, config *config.Config) (m
}
totalMem := getMem(memQuantity)
totalCPU := node.Status.Capacity.Cpu().MilliValue()
totalGPU := resource.Quantity{}
totalXPU := resource.Quantity{}
xpuCapacityLabel, xpuTypeLabel := getXPULabel(node, config)
if xpuCapacityLabel != "" {
totalGPU = node.Status.Capacity[v1.ResourceName(xpuCapacityLabel)]
totalXPU = node.Status.Capacity[v1.ResourceName(xpuCapacityLabel)]
}

gpuModelVendor := strings.Split(node.Labels[xpuTypeLabel], "-")
Expand All @@ -149,10 +153,10 @@ func GetNodeResources(clientset *kubernetes.Clientset, config *config.Config) (m
NodeName: node.Name,
TotalCPU: millicoresToCores(totalCPU),
AvailableCPU: millicoresToCores(totalCPU),
GPUModel: gpuModel,
XPUModel: gpuModel,
GPUVendor: gpuModelVendor[0],
TotalGPU: parseQuantityToInt64(totalGPU),
AvailableGPU: parseQuantityToInt64(totalGPU),
TotalXPU: parseQuantityToInt64(totalXPU),
AvailableXPU: parseQuantityToInt64(totalXPU),
AvailableMem: totalMem,
TotalMem: totalMem,
XPUCapacityLabel: xpuCapacityLabel,
Expand All @@ -166,7 +170,7 @@ func GetNodeResources(clientset *kubernetes.Clientset, config *config.Config) (m
nodeResource := nodeResourcesMap[pod.Spec.NodeName]
for _, container := range pod.Spec.Containers {
if requestedGPU, hasGPU := container.Resources.Requests[v1.ResourceName(nodeResource.XPUCapacityLabel)]; hasGPU {
nodeResource.AvailableGPU -= parseQuantityToInt64(requestedGPU)
nodeResource.AvailableXPU -= parseQuantityToInt64(requestedGPU)
}
if memoryRequest, hasMemory := container.Resources.Requests[v1.ResourceMemory]; hasMemory {
nodeResource.AvailableMem -= getMem(parseQuantityToInt64(memoryRequest))
Expand Down
9 changes: 4 additions & 5 deletions builder/deploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,7 @@ func CheckResource(clusterResources *types.ClusterRes, hardware *types.HardWare)
}
for _, node := range clusterResources.Resources {
if float32(mem) <= node.AvailableMem {
if hardware.Gpu.Num == "" {
return true
} else {
if hardware.Gpu.Num != "" {
gpu, err := strconv.Atoi(hardware.Gpu.Num)
if err != nil {
slog.Error("failed to parse hardware gpu ", slog.Any("error", err))
Expand All @@ -746,10 +744,11 @@ func CheckResource(clusterResources *types.ClusterRes, hardware *types.HardWare)
return false

}
if gpu <= int(node.AvailableGPU) && hardware.Gpu.Type == node.GPUModel && cpu <= int(node.AvailableCPU) {
if gpu <= int(node.AvailableXPU) && hardware.Gpu.Type == node.XPUModel && cpu <= int(node.AvailableCPU) {
return true
}

} else {
return true
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion builder/deploy/scheduler/deploy_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -247,6 +248,9 @@ func (t *DeployRunner) makeDeployRequest() (*types.RunRequest, error) {
envMap["ACCESS_TOKEN"] = token.Token
envMap["REPO_ID"] = t.repo.Path // "namespace/name"
envMap["REVISION"] = deploy.GitBranch // branch
if hardware.Gpu.Num != "" {
envMap["GPU_NUM"] = hardware.Gpu.Num
}

if deploy.SpaceID > 0 {
// sdk port for space
Expand All @@ -269,7 +273,7 @@ func (t *DeployRunner) makeDeployRequest() (*types.RunRequest, error) {

if deploy.Type == types.FinetuneType {
envMap["port"] = strconv.Itoa(deploy.ContainerPort)
envMap["HF_ENDPOINT"] = t.modelDownloadEndpoint + "/hf"
envMap["HF_ENDPOINT"], _ = url.JoinPath(t.modelDownloadEndpoint, "hf")
envMap["HF_TOKEN"] = token.Token
}

Expand Down
1 change: 1 addition & 0 deletions builder/store/database/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func NewClusterInfoStore() *ClusterInfoStore {
type ClusterInfo struct {
ClusterID string `bun:",pk" json:"cluster_id"`
ClusterConfig string `bun:",notnull" json:"cluster_config"`
StorageClass string `bun:",notnull" json:"storage_class"`
Region string `bun:",notnull" json:"region"`
Zone string `bun:",notnull" json:"zone"` //cn-beijing
Provider string `bun:",notnull" json:"provider"` //ali
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SET statement_timeout = 0;

--bun:split

ALTER TABLE cluster_infos DROP COLUMN IF EXISTS storage_class;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SET statement_timeout = 0;

--bun:split

ALTER TABLE cluster_infos ADD COLUMN IF NOT EXISTS storage_class VARCHAR;
4 changes: 1 addition & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,11 @@ type Config struct {
DeployTimeoutInMin int `envconfig:"STARHUB_SERVER_SPACE_DEPLOY_TIMEOUT_IN_MINUTES" default:"30"`
// gpu model label
GPUModelLabel string `envconfig:"STARHUB_SERVER_GPU_MODEL_LABEL" default:"aliyun.accelerator/nvidia_name"`
// Persist volume storage class
StorageClass string `envconfig:"STARHUB_SERVER_STORAGE_CLASS"`
}

Model struct {
DeployTimeoutInMin int `envconfig:"STARHUB_SERVER_MODEL_DEPLOY_TIMEOUT_IN_MINUTES" default:"30"`
DownloadEndpoint string `envconfig:"STARHUB_SERVER_MODEL_DOWNLOAD_ENDPOINT" default:"https://hub.opencsg.com/"`
DownloadEndpoint string `envconfig:"STARHUB_SERVER_MODEL_DOWNLOAD_ENDPOINT" default:"https://hub.opencsg.com"`
DockerRegBase string `envconfig:"STARHUB_SERVER_MODEL_DOCKER_REG_BASE" default:"opencsg-registry.cn-beijing.cr.aliyuncs.com/public/"`
}
// send events
Expand Down
7 changes: 4 additions & 3 deletions common/types/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ClusterRequest struct {
Zone string `json:"zone"` //cn-beijing
Provider string `json:"provider"` //ali
Enable bool `json:"enable"`
StorageClass string `json:"storage_class"`
}
type ClusterRes struct {
ClusterID string `json:"cluster_id"`
Expand All @@ -26,11 +27,11 @@ type ClusterRes struct {

type NodeResourceInfo struct {
NodeName string `json:"node_name"`
GPUModel string `json:"gpu_model"`
XPUModel string `json:"xpu_model"`
TotalCPU float64 `json:"total_cpu"`
AvailableCPU float64 `json:"available_cpu"`
TotalGPU int64 `json:"total_gpu"`
AvailableGPU int64 `json:"available_gpu"`
TotalXPU int64 `json:"total_xpu"`
AvailableXPU int64 `json:"available_xpu"`
GPUVendor string `json:"gpu_vendor"`
TotalMem float32 `json:"total_mem"` //in GB
AvailableMem float32 `json:"available_mem"` //in GB
Expand Down
22 changes: 11 additions & 11 deletions common/types/hardware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package types

type (
GPU struct {
Type string `json:"type,omitempty" protobuf:"bytes,1,opt,name=type"`
Num string `json:"num,omitempty" protobuf:"varint,2,opt,name=num"`
ResourceName string `json:"resource_name,omitempty" protobuf:"bytes,3,opt,name=resource_name"`
Labels map[string]string `json:"labels,omitempty" protobuf:"bytes,4,rep,name=labels"`
Type string `json:"type,omitempty"`
Num string `json:"num,omitempty"`
ResourceName string `json:"resource_name,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}

CPU struct {
Type string `json:"type,omitempty" protobuf:"bytes,1,opt,name=type"`
Num string `json:"num,omitempty" protobuf:"varint,2,opt,name=num"`
Labels map[string]string `json:"labels,omitempty" protobuf:"bytes,3,rep,name=labels"`
Type string `json:"type,omitempty"`
Num string `json:"num,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}

HardWare struct {
Gpu GPU `json:"gpu,omitempty" protobuf:"bytes,1,opt,name=gpu"`
Cpu CPU `json:"cpu,omitempty" protobuf:"bytes,2,opt,name=cpu"`
Memory string `json:"memory,omitempty" protobuf:"bytes,3,opt,name=memory"`
EphemeralStorage string `json:"ephemeral_storage,omitempty" protobuf:"bytes,4,opt,name=ephemeral_storage"`
Gpu GPU `json:"gpu,omitempty"`
Cpu CPU `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
EphemeralStorage string `json:"ephemeral_storage,omitempty"`
}
)
4 changes: 0 additions & 4 deletions component/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type RepoComponent struct {
tokenStore *database.AccessTokenStore
rtfm *database.RuntimeFrameworksStore
rrtfms *database.RepositoriesRuntimeFrameworkStore
needPurge bool
syncVersion *database.SyncVersionStore
syncClientSetting *database.SyncClientSettingStore
file *database.FileStore
Expand Down Expand Up @@ -117,9 +116,6 @@ func NewRepoComponent(config *config.Config) (*RepoComponent, error) {
c.cluster = database.NewClusterInfoStore()
c.rtfm = database.NewRuntimeFrameworksStore()
c.rrtfms = database.NewRepositoriesRuntimeFramework()
if config.Space.StorageClass != "" {
c.needPurge = true
}
c.ac, err = NewAccountingComponent(config)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion docker/finetune/Dockerfile.llamafactory
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RUN pip install --no-cache-dir jupyterlab numpy==1.26.4 \
jupyter-server-proxy==4.2.0
# Create a working directory
WORKDIR /etc/csghub
RUN git clone https://github.com/hiyouga/LLaMA-Factory.git --branch v0.8.1 --single-branch
RUN git clone https://github.com/hiyouga/LLaMA-Factory.git --branch v0.8.3 --single-branch
RUN cd LLaMA-Factory && pip install --no-cache-dir -e ".[metrics,deepspeed]"
# setup supervisord
RUN mkdir -p /var/log/supervisord
Expand All @@ -47,6 +47,8 @@ RUN mkdir -p /root/.jupyter/lab/user-settings/@jupyterlab/apputils-extension &&
echo '{"theme":"JupyterLab Dark"}' > /root/.jupyter/lab/user-settings/@jupyterlab/apputils-extension/themes.jupyterlab-settings && \
mkdir -p /root/.jupyter/lab/user-settings/@jupyterlab/notebook-extension && \
echo '{"codeCellConfig":{"lineNumbers":true }}' > /root/.jupyter/lab/user-settings/@jupyterlab/notebook-extension/tracker.jupyterlab-settings
#fix gradio proxy issue
RUN pip uninstall gradio && pip install https://git-devops.opencsg.com/opensource/gradio/-/raw/3a207a08755b4820541915e9ea63e6abc1b4b424/gradio-4.41.0-py3-none-any.whl
# Create a working directory
WORKDIR /workspace/
ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
Expand Down
4 changes: 2 additions & 2 deletions docker/finetune/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ docker push xxx
## latest images
```
#for llama-factory image
opencsg-registry.cn-beijing.cr.aliyuncs.com/public/llama-factory:1.11-cuda12.1-devel-ubuntu22.04-py310-torch2.1.2
opencsg-registry.cn-beijing.cr.aliyuncs.com/public/llama-factory:1.15-cuda12.1-devel-ubuntu22.04-py310-torch2.1.2
```
## Run image locally
```
docker run -d -e ACCESS_TOKEN=xxx -e REPO_ID="OpenCSG/csg-wukong-1B" -e HF_ENDPOINT=https://hub.opencsg.com/hf opencsg-registry.cn-beijing.cr.aliyuncs.com/public/llama-factory:1.11-cuda12.1-devel-ubuntu22.04-py310-torch2.1.2
docker run -d -e ACCESS_TOKEN=xxx -e REPO_ID="OpenCSG/csg-wukong-1B" -e HF_ENDPOINT=https://hub.opencsg.com/hf opencsg-registry.cn-beijing.cr.aliyuncs.com/public/llama-factory:1.15-cuda12.1-devel-ubuntu22.04-py310-torch2.1.2
```
Note: HF_ENDPOINT should be use the real csghub address
Expand Down
2 changes: 1 addition & 1 deletion docker/inference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ opencsg-registry.cn-beijing.cr.aliyuncs.com/public/tgi-local:1.6
```
## Run image locally
```
docker run -d -e ACCESS_TOKEN=9a5f8d256720e960cf573bb09dafa90328c0a477 -e REPO_ID="xzgan001/csg-wukong-1B" -e HF_ENDPOINT=https://hub-stg.opencsg.com/ --gpus device=1 opencsg-registry.cn-beijing.cr.aliyuncs.com/public/vllm-local:2.7
docker run -d -e ACCESS_TOKEN=xxx -e REPO_ID="xzgan001/csg-wukong-1B" -e HF_ENDPOINT=https://hub-stg.opencsg.com/ --gpus device=1 opencsg-registry.cn-beijing.cr.aliyuncs.com/public/vllm-local:2.7
docker run -d -v llm:/data -e ACCESS_TOKEN=xxx -e REPO_ID="xzgan001/csg-wukong-1B" -e HF_ENDPOINT=https://hub-stg.opencsg.com/hf --gpus device=7 opencsg-registry.cn-beijing.cr.aliyuncs.com/public/tgi-local:1.6
Expand Down
2 changes: 1 addition & 1 deletion servicerunner/component/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *ServiceComponent) NewPersistentVolumeClaim(name string, ctx context.Con
corev1.ResourceStorage: storage,
},
},
StorageClassName: &s.env.Space.StorageClass,
StorageClassName: &cluster.StorageClass,
},
}
_, err = cluster.Client.CoreV1().PersistentVolumeClaims(s.k8sNameSpace).Create(ctx, &pvc, metav1.CreateOptions{})
Expand Down
16 changes: 10 additions & 6 deletions servicerunner/handler/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func (s *K8sHander) RunService(c *gin.Context) {
return
}
// add pvc if possible
if s.env.Space.StorageClass != "" && request.DeployType != types.SpaceType {
// space image was built from user's code, model cache dir is hard to control
// so no PV cache for space case so far
if cluster.StorageClass != "" && request.DeployType != types.SpaceType {
err = s.s.NewPersistentVolumeClaim(srvName, c, *cluster, request.Hardware)
if err != nil {
slog.Error("Failed to create persist volume", "error", err)
Expand Down Expand Up @@ -827,11 +829,13 @@ func (s *K8sHander) PurgeService(c *gin.Context) {
}

// 2 clean up pvc
err = cluster.Client.CoreV1().PersistentVolumeClaims(s.k8sNameSpace).Delete(c, srvName, metav1.DeleteOptions{})
if err != nil {
slog.Error("fail to delete pvc", slog.Any("error", err))
c.JSON(http.StatusInternalServerError, gin.H{"error": "fail to delete pvc"})
return
if cluster.StorageClass != "" {
err = cluster.Client.CoreV1().PersistentVolumeClaims(s.k8sNameSpace).Delete(c, srvName, metav1.DeleteOptions{})
if err != nil {
slog.Error("fail to delete pvc", slog.Any("error", err))
c.JSON(http.StatusInternalServerError, gin.H{"error": "fail to delete pvc"})
return
}
}
slog.Info("service deleted, PVC deleted.", slog.String("srv_name", srvName))
resp.Code = 0
Expand Down

0 comments on commit 16b2151

Please sign in to comment.