diff --git a/src/components/ec/cuda/ec_cuda.c b/src/components/ec/cuda/ec_cuda.c index 4027099be0..1221f0d3ba 100644 --- a/src/components/ec/cuda/ec_cuda.c +++ b/src/components/ec/cuda/ec_cuda.c @@ -105,8 +105,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) struct cudaDeviceProp prop; ucc_status_t status; - ucc_ec_cuda.stream = NULL; - ucc_ec_cuda.stream_initialized = 0; ucc_ec_cuda.exec_streams_initialized = 0; ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name, ucc_ec_cuda.super.super.name, @@ -141,14 +139,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) cfg->exec_num_streams = 1; } - ucc_ec_cuda.exec_streams = ucc_calloc(cfg->exec_num_streams, - sizeof(cudaStream_t), - "ec cuda streams"); - if (!ucc_ec_cuda.exec_streams) { - ec_error(&ucc_ec_cuda.super, "failed to allocate streams array"); - return UCC_ERR_NO_MEMORY; - } - if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) { ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL; } else { @@ -197,6 +187,7 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash); status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, + EC_CUDA_CONFIG->exec_num_streams, &ucc_ec_cuda.resources); if (status != UCC_OK) { ec_warn(&ucc_ec_cuda.super, "failed to initilize CUDA resources"); @@ -215,54 +206,6 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr) return UCC_OK; } -static ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources) -{ - CUcontext cu_ctx; - unsigned long long int cu_ctx_id; - ucc_status_t status; - - status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx)); - if (ucc_unlikely(status != UCC_OK)) { - ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context"); - return status; - } - - status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id)); - if (ucc_unlikely(status != UCC_OK)) { - ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID"); - } - - *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, - cu_ctx_id); - if (ucc_unlikely(*resources == NULL)) { - ucc_spin_lock(&ucc_ec_cuda.init_spinlock); - *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, - cu_ctx_id); - if (*resources == NULL) { - *resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t), - "ec cuda resources"); - if (*resources == NULL) { - ec_error(&ucc_ec_cuda.super, - "failed to allocate %zd bytes for resources", - sizeof(ucc_ec_cuda_resources_t)); - ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); - return UCC_ERR_NO_MEMORY; - } - status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, - *resources); - if (status != UCC_OK) { - ucc_free(*resources); - ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); - return status; - } - ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id, - *resources); - } - ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); - } - return UCC_OK; -} - ucc_status_t ucc_ec_cuda_event_create(void **event) { ucc_ec_cuda_event_t *cuda_event; @@ -316,22 +259,57 @@ ucc_status_t ucc_ec_cuda_event_test(void *event) static ucc_status_t ucc_ec_cuda_finalize() { - int i; + ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources); + + return UCC_OK; +} + +ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources) +{ + CUcontext cu_ctx; + unsigned long long int cu_ctx_id; + ucc_status_t status; - if (ucc_ec_cuda.stream_initialized) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.stream)); - ucc_ec_cuda.stream_initialized = 0; + status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context"); + return status; } - if (ucc_ec_cuda.exec_streams_initialized) { - for (i = 0; i < EC_CUDA_CONFIG->exec_num_streams; i++) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[i])); - } - ucc_ec_cuda.exec_streams_initialized = 0; + status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID"); } - ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources); - ucc_free(ucc_ec_cuda.exec_streams); + *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, + cu_ctx_id); + if (ucc_unlikely(*resources == NULL)) { + ucc_spin_lock(&ucc_ec_cuda.init_spinlock); + *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, + cu_ctx_id); + if (*resources == NULL) { + *resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t), + "ec cuda resources"); + if (*resources == NULL) { + ec_error(&ucc_ec_cuda.super, + "failed to allocate %zd bytes for resources", + sizeof(ucc_ec_cuda_resources_t)); + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + return UCC_ERR_NO_MEMORY; + } + status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, + EC_CUDA_CONFIG->exec_num_streams, + *resources); + if (status != UCC_OK) { + ucc_free(*resources); + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + return status; + } + ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id, + *resources); + } + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + } return UCC_OK; } diff --git a/src/components/ec/cuda/ec_cuda.h b/src/components/ec/cuda/ec_cuda.h index c174e85788..47b1447e90 100644 --- a/src/components/ec/cuda/ec_cuda.h +++ b/src/components/ec/cuda/ec_cuda.h @@ -43,10 +43,7 @@ typedef struct ucc_ec_cuda_config { typedef struct ucc_ec_cuda { ucc_ec_base_t super; - int stream_initialized; - cudaStream_t stream; int exec_streams_initialized; - cudaStream_t *exec_streams; ucc_ec_cuda_resources_t resources; ucc_ec_cuda_resources_hash_t *resources_hash; ucc_thread_mode_t thread_mode; @@ -68,6 +65,8 @@ ucc_status_t ucc_ec_cuda_event_post(void *ee_context, void *event); ucc_status_t ucc_ec_cuda_event_test(void *event); +ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources); + extern ucc_ec_cuda_t ucc_ec_cuda; #define EC_CUDA_CONFIG \ diff --git a/src/components/ec/cuda/ec_cuda_executor_interruptible.c b/src/components/ec/cuda/ec_cuda_executor_interruptible.c index 3f1e091b64..2d3247dd7c 100644 --- a/src/components/ec/cuda/ec_cuda_executor_interruptible.c +++ b/src/components/ec/cuda/ec_cuda_executor_interruptible.c @@ -9,37 +9,43 @@ ucc_status_t ucc_cuda_executor_interruptible_get_stream(cudaStream_t *stream) { - static uint32_t last_used = 0; - int num_streams = EC_CUDA_CONFIG->exec_num_streams; - ucc_status_t st; - int i, j; - uint32_t id; + static uint32_t last_used = 0; + int num_streams = EC_CUDA_CONFIG->exec_num_streams; + ucc_ec_cuda_resources_t *resources; + ucc_status_t st; + int i, j; + uint32_t id; ucc_assert(num_streams > 0); - if (ucc_unlikely(!ucc_ec_cuda.exec_streams_initialized)) { + st = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(st != UCC_OK)) { + return st; + } + + if (ucc_unlikely(!resources->streams_initialized)) { ucc_spin_lock(&ucc_ec_cuda.init_spinlock); - if (ucc_ec_cuda.exec_streams_initialized) { + if (resources->streams_initialized) { goto unlock; } for(i = 0; i < num_streams; i++) { - st = CUDA_FUNC(cudaStreamCreateWithFlags(&ucc_ec_cuda.exec_streams[i], + st = CUDA_FUNC(cudaStreamCreateWithFlags(&(resources->exec_streams[i]), cudaStreamNonBlocking)); if (st != UCC_OK) { for (j = 0; j < i; j++) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[j])); + CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[j])); } ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); return st; } } - ucc_ec_cuda.exec_streams_initialized = 1; + resources->streams_initialized = 1; unlock: ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); } id = ucc_atomic_fadd32(&last_used, 1); - *stream = ucc_ec_cuda.exec_streams[id % num_streams]; + *stream = resources->exec_streams[id % num_streams]; return UCC_OK; } @@ -52,14 +58,13 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor, const ucc_ee_executor_task_args_t *task_args, ucc_ee_executor_task_t **task) { - cudaStream_t stream = NULL; + cudaStream_t stream = NULL; + size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; ucc_ec_cuda_executor_interruptible_task_t *ee_task; ucc_status_t status; cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS]; - size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; int i; - status = ucc_cuda_executor_interruptible_get_stream(&stream); if (ucc_unlikely(status != UCC_OK)) { return status; diff --git a/src/components/ec/cuda/ec_cuda_resources.c b/src/components/ec/cuda/ec_cuda_resources.c index 1bc674adda..9bfb119ec0 100644 --- a/src/components/ec/cuda/ec_cuda_resources.c +++ b/src/components/ec/cuda/ec_cuda_resources.c @@ -1,5 +1,6 @@ #include "ec_cuda_resources.h" #include "components/ec/ucc_ec_log.h" +#include "utils/ucc_malloc.h" #define EXEC_MAX_TASKS 128 @@ -111,6 +112,7 @@ static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = { }; ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, + int num_streams, ucc_ec_cuda_resources_t *resources) { ucc_status_t status; @@ -153,8 +155,20 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, goto free_interruptible_tasks_mpool; } + resources->num_streams = num_streams; + resources->exec_streams = ucc_calloc(num_streams, sizeof(cudaStream_t), + "ec cuda streams"); + if (!resources->exec_streams) { + ec_error(ec, "failed to allocate %zd bytes for executor streams", + sizeof(cudaStream_t) * num_streams); + status = UCC_ERR_NO_MEMORY; + goto free_persistent_tasks_mpool; + } + return UCC_OK; +free_persistent_tasks_mpool: + ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0); free_interruptible_tasks_mpool: ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0); free_executors_mpool: @@ -167,8 +181,17 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources) { + int i; + + for (i = 0; i < resources->num_streams; i++) { + if (resources->exec_streams[i] != NULL) { + CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[i])); + } + } ucc_mpool_cleanup(&resources->events, 1); ucc_mpool_cleanup(&resources->executors, 1); ucc_mpool_cleanup(&resources->executor_interruptible_tasks, 1); ucc_mpool_cleanup(&resources->executor_persistent_tasks, 1); + + ucc_free(resources->exec_streams); } diff --git a/src/components/ec/cuda/ec_cuda_resources.h b/src/components/ec/cuda/ec_cuda_resources.h index 6626f3e2ea..379457462e 100644 --- a/src/components/ec/cuda/ec_cuda_resources.h +++ b/src/components/ec/cuda/ec_cuda_resources.h @@ -67,13 +67,17 @@ typedef struct ucc_ec_cuda_executor_persistent_task { } ucc_ec_cuda_executor_persistent_task_t; typedef struct ucc_ec_cuda_resources { - ucc_mpool_t events; - ucc_mpool_t executors; - ucc_mpool_t executor_interruptible_tasks; - ucc_mpool_t executor_persistent_tasks; + ucc_mpool_t events; + ucc_mpool_t executors; + ucc_mpool_t executor_interruptible_tasks; + ucc_mpool_t executor_persistent_tasks; + int streams_initialized; + int num_streams; + cudaStream_t *exec_streams; } ucc_ec_cuda_resources_t; ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, + int num_streams, ucc_ec_cuda_resources_t *resources); void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources); diff --git a/src/components/mc/cuda/Makefile.am b/src/components/mc/cuda/Makefile.am index 1e25e2109a..d8e1dbe55e 100644 --- a/src/components/mc/cuda/Makefile.am +++ b/src/components/mc/cuda/Makefile.am @@ -5,8 +5,10 @@ if HAVE_CUDA sources = \ - mc_cuda.h \ - mc_cuda.c + mc_cuda.h \ + mc_cuda.c \ + mc_cuda_resources.c \ + mc_cuda_resources.h module_LTLIBRARIES = libucc_mc_cuda.la libucc_mc_cuda_la_SOURCES = $(sources) diff --git a/src/components/mc/cuda/mc_cuda.h b/src/components/mc/cuda/mc_cuda.h index abc82312c2..df566726e6 100644 --- a/src/components/mc/cuda/mc_cuda.h +++ b/src/components/mc/cuda/mc_cuda.h @@ -24,8 +24,6 @@ typedef struct ucc_mc_cuda { ucc_mc_base_t super; int stream_initialized; cudaStream_t stream; - ucc_mpool_t events; - ucc_mpool_t strm_reqs; ucc_mpool_t mpool; int mpool_init_flag; ucc_spinlock_t init_spinlock;