diff --git a/src/components/ec/cuda/ec_cuda.c b/src/components/ec/cuda/ec_cuda.c index 1221f0d3ba..a3c3b5c2ba 100644 --- a/src/components/ec/cuda/ec_cuda.c +++ b/src/components/ec/cuda/ec_cuda.c @@ -103,8 +103,9 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) int device, num_devices; cudaError_t cuda_st; struct cudaDeviceProp prop; - ucc_status_t status; + ucc_ec_cuda_config = ucc_derived_of(ucc_ec_cuda.super.config, + ucc_ec_cuda_config_t); ucc_ec_cuda.exec_streams_initialized = 0; ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name, ucc_ec_cuda.super.super.name, @@ -186,14 +187,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) } ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash); - status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, - EC_CUDA_CONFIG->exec_num_streams, - &ucc_ec_cuda.resources); - if (status != UCC_OK) { - ec_warn(&ucc_ec_cuda.super, "failed to initilize CUDA resources"); - return UCC_ERR_NOT_SUPPORTED; - } - ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0); return UCC_OK; } @@ -259,7 +252,15 @@ ucc_status_t ucc_ec_cuda_event_test(void *event) static ucc_status_t ucc_ec_cuda_finalize() { - ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources); + ucc_ec_cuda_resources_t *resources; + + resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash); + while (resources) { + ucc_ec_cuda_resources_cleanup(resources); + resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash); + } + + ucc_spinlock_destroy(&ucc_ec_cuda.init_spinlock); return UCC_OK; } @@ -298,7 +299,6 @@ ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources) return UCC_ERR_NO_MEMORY; } status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, - EC_CUDA_CONFIG->exec_num_streams, *resources); if (status != UCC_OK) { ucc_free(*resources); @@ -341,5 +341,7 @@ ucc_ec_cuda_t ucc_ec_cuda = { .super.executor_ops.finalize = ucc_cuda_executor_finalize, }; +ucc_ec_cuda_config_t *ucc_ec_cuda_config; + UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_ec_cuda.super.config_table, &ucc_config_global_list); diff --git a/src/components/ec/cuda/ec_cuda.h b/src/components/ec/cuda/ec_cuda.h index 47b1447e90..84b8588605 100644 --- a/src/components/ec/cuda/ec_cuda.h +++ b/src/components/ec/cuda/ec_cuda.h @@ -16,35 +16,13 @@ #define WARP_SIZE 32 -typedef enum ucc_ec_cuda_strm_task_mode { - UCC_EC_CUDA_TASK_KERNEL, - UCC_EC_CUDA_TASK_MEM_OPS, - UCC_EC_CUDA_TASK_AUTO, - UCC_EC_CUDA_TASK_LAST, -} ucc_ec_cuda_strm_task_mode_t; - - typedef ucc_status_t (*ucc_ec_cuda_task_post_fn) (uint32_t *dev_status, int blocking_wait, cudaStream_t stream); -typedef struct ucc_ec_cuda_config { - ucc_ec_config_t super; - ucc_ec_cuda_strm_task_mode_t strm_task_mode; - unsigned long exec_num_workers; - unsigned long exec_num_threads; - unsigned long exec_max_tasks; - unsigned long exec_num_streams; - unsigned long reduce_num_blocks; - int reduce_num_threads; - int use_cooperative_launch; - unsigned long exec_copy_thresh; -} ucc_ec_cuda_config_t; - typedef struct ucc_ec_cuda { ucc_ec_base_t super; int exec_streams_initialized; - ucc_ec_cuda_resources_t resources; ucc_ec_cuda_resources_hash_t *resources_hash; ucc_thread_mode_t thread_mode; ucc_ec_cuda_strm_task_mode_t strm_task_mode; diff --git a/src/components/ec/cuda/ec_cuda_executor.c b/src/components/ec/cuda/ec_cuda_executor.c index c97e33766e..1349187b71 100644 --- a/src/components/ec/cuda/ec_cuda_executor.c +++ b/src/components/ec/cuda/ec_cuda_executor.c @@ -23,8 +23,16 @@ ucc_status_t ucc_cuda_executor_persistent_wait_stop(ucc_ee_executor_t *executor) ucc_status_t ucc_cuda_executor_init(const ucc_ee_executor_params_t *params, ucc_ee_executor_t **executor) { - ucc_ec_cuda_executor_t *eee = ucc_mpool_get(&ucc_ec_cuda.resources.executors); + ucc_ec_cuda_executor_t *eee; + ucc_ec_cuda_resources_t *resources; + ucc_status_t status; + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + + eee = ucc_mpool_get(&resources->executors); if (ucc_unlikely(!eee)) { ec_error(&ucc_ec_cuda.super, "failed to allocate executor"); return UCC_ERR_NO_MEMORY; diff --git a/src/components/ec/cuda/ec_cuda_executor_interruptible.c b/src/components/ec/cuda/ec_cuda_executor_interruptible.c index 2d3247dd7c..0d567216dd 100644 --- a/src/components/ec/cuda/ec_cuda_executor_interruptible.c +++ b/src/components/ec/cuda/ec_cuda_executor_interruptible.c @@ -63,14 +63,20 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor, ucc_ec_cuda_executor_interruptible_task_t *ee_task; ucc_status_t status; cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS]; + ucc_ec_cuda_resources_t *resources; int i; + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + status = ucc_cuda_executor_interruptible_get_stream(&stream); if (ucc_unlikely(status != UCC_OK)) { return status; } - ee_task = ucc_mpool_get(&ucc_ec_cuda.resources.executor_interruptible_tasks); + ee_task = ucc_mpool_get(&resources->executor_interruptible_tasks); if (ucc_unlikely(!ee_task)) { return UCC_ERR_NO_MEMORY; } diff --git a/src/components/ec/cuda/ec_cuda_executor_persistent.c b/src/components/ec/cuda/ec_cuda_executor_persistent.c index 5ba24035ec..c43b132e12 100644 --- a/src/components/ec/cuda/ec_cuda_executor_persistent.c +++ b/src/components/ec/cuda/ec_cuda_executor_persistent.c @@ -18,12 +18,19 @@ ucc_cuda_executor_persistent_task_post(ucc_ee_executor_t *executor, ucc_ee_executor_task_args_t *subtask_args; ucc_ec_cuda_executor_persistent_task_t *ee_task; int i; + ucc_ec_cuda_resources_t *resources; + ucc_status_t status; + + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) { ucc_spin_lock(&eee->tasks_lock); } - ee_task = ucc_mpool_get(&ucc_ec_cuda.resources.executor_persistent_tasks); + ee_task = ucc_mpool_get(&resources->executor_persistent_tasks); if (ucc_unlikely(!ee_task)) { return UCC_ERR_NO_MEMORY; } diff --git a/src/components/ec/cuda/ec_cuda_resources.c b/src/components/ec/cuda/ec_cuda_resources.c index 9bfb119ec0..5bc0043f1f 100644 --- a/src/components/ec/cuda/ec_cuda_resources.c +++ b/src/components/ec/cuda/ec_cuda_resources.c @@ -2,8 +2,6 @@ #include "components/ec/ucc_ec_log.h" #include "utils/ucc_malloc.h" -#define EXEC_MAX_TASKS 128 - static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused { ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj; @@ -43,9 +41,7 @@ static void ucc_ec_cuda_executor_chunk_init(ucc_mpool_t *mp, void *obj, //NOLINT void *chunk) //NOLINT: chunk is unused { ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj; - // int max_tasks = EC_CUDA_CONFIG->exec_max_tasks; - //TODO: add config - int max_tasks = EXEC_MAX_TASKS; + int max_tasks = ucc_ec_cuda_config->exec_max_tasks; CUDA_FUNC(cudaHostGetDevicePointer( (void**)(&eee->dev_state), (void *)&eee->state, 0)); @@ -112,11 +108,12 @@ static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = { }; ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, - int num_streams, ucc_ec_cuda_resources_t *resources) { ucc_status_t status; + int num_streams; + CUDADRV_CHECK(cuCtxGetCurrent(&resources->cu_ctx)); status = ucc_mpool_init(&resources->events, 0, sizeof(ucc_ec_cuda_event_t), 0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_event_mpool_ops, UCC_THREAD_MULTIPLE, @@ -155,7 +152,7 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, goto free_interruptible_tasks_mpool; } - resources->num_streams = num_streams; + num_streams = ucc_ec_cuda_config->exec_num_streams; resources->exec_streams = ucc_calloc(num_streams, sizeof(cudaStream_t), "ec cuda streams"); if (!resources->exec_streams) { @@ -182,8 +179,10 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources) { int i; + CUcontext tmp_context; - for (i = 0; i < resources->num_streams; i++) { + cuCtxPushCurrent(resources->cu_ctx); + for (i = 0; i < ucc_ec_cuda_config->exec_num_streams; i++) { if (resources->exec_streams[i] != NULL) { CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[i])); } @@ -194,4 +193,5 @@ void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources) ucc_mpool_cleanup(&resources->executor_persistent_tasks, 1); ucc_free(resources->exec_streams); + cuCtxPopCurrent(&tmp_context); } diff --git a/src/components/ec/cuda/ec_cuda_resources.h b/src/components/ec/cuda/ec_cuda_resources.h index 379457462e..1390f76cdd 100644 --- a/src/components/ec/cuda/ec_cuda_resources.h +++ b/src/components/ec/cuda/ec_cuda_resources.h @@ -67,6 +67,7 @@ typedef struct ucc_ec_cuda_executor_persistent_task { } ucc_ec_cuda_executor_persistent_task_t; typedef struct ucc_ec_cuda_resources { + CUcontext cu_ctx; ucc_mpool_t events; ucc_mpool_t executors; ucc_mpool_t executor_interruptible_tasks; @@ -76,8 +77,29 @@ typedef struct ucc_ec_cuda_resources { cudaStream_t *exec_streams; } ucc_ec_cuda_resources_t; +typedef enum ucc_ec_cuda_strm_task_mode { + UCC_EC_CUDA_TASK_KERNEL, + UCC_EC_CUDA_TASK_MEM_OPS, + UCC_EC_CUDA_TASK_AUTO, + UCC_EC_CUDA_TASK_LAST, +} ucc_ec_cuda_strm_task_mode_t; + +typedef struct ucc_ec_cuda_config { + ucc_ec_config_t super; + ucc_ec_cuda_strm_task_mode_t strm_task_mode; + unsigned long exec_num_workers; + unsigned long exec_num_threads; + unsigned long exec_max_tasks; + unsigned long exec_num_streams; + unsigned long reduce_num_blocks; + int reduce_num_threads; + int use_cooperative_launch; + unsigned long exec_copy_thresh; +} ucc_ec_cuda_config_t; + +extern ucc_ec_cuda_config_t *ucc_ec_cuda_config; + ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, - int num_streams, ucc_ec_cuda_resources_t *resources); void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources); @@ -112,5 +134,25 @@ void ec_cuda_resources_hash_put(ucc_ec_cuda_resources_hash_t *h, kh_value(h, k) = value; } +static inline +void* ec_cuda_resources_hash_pop(ucc_ec_cuda_resources_hash_t *h) +{ + void *resources = NULL; + khiter_t k; + + k = kh_begin(h); + while (k != kh_end(h)) { + if (kh_exist(h, k)) { + resources = kh_value(h, k); + break; + } + k++; + } + + if (resources) { + kh_del(ucc_ec_cuda_resources_hash, h, k); + } + return resources; +} #endif diff --git a/src/components/ec/ucc_ec.c b/src/components/ec/ucc_ec.c index af83e301b4..4f691a01ec 100644 --- a/src/components/ec/ucc_ec.c +++ b/src/components/ec/ucc_ec.c @@ -4,6 +4,7 @@ * See file LICENSE for terms. */ +#include #include "config.h" #include "base/ucc_ec_base.h" #include "ucc_ec.h" @@ -13,6 +14,7 @@ static const ucc_ec_ops_t *ec_ops[UCC_EE_LAST]; static const ucc_ee_executor_ops_t *executor_ops[UCC_EE_LAST]; +static pthread_mutex_t ucc_ec_mutex = PTHREAD_MUTEX_INITIALIZER; #define UCC_CHECK_EC_AVAILABLE(ee) \ do { \ @@ -28,6 +30,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params) ucc_status_t status; ucc_ec_attr_t attr; + pthread_mutex_lock(&ucc_ec_mutex); memset(ec_ops, 0, UCC_EE_LAST * sizeof(ucc_ec_ops_t *)); n_ecs = ucc_global_config.ec_framework.n_components; for (i = 0; i < n_ecs; i++) { @@ -75,6 +78,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params) ec_ops[ec->type] = &ec->ops; executor_ops[ec->type] = &ec->executor_ops; } + pthread_mutex_unlock(&ucc_ec_mutex); return UCC_OK; } @@ -102,6 +106,7 @@ ucc_status_t ucc_ec_finalize() ucc_ee_type_t et; ucc_ec_base_t *ec; + pthread_mutex_lock(&ucc_ec_mutex); for (et = UCC_EE_FIRST; et < UCC_EE_LAST; et++) { if (NULL != ec_ops[et]) { ec = ucc_container_of(ec_ops[et], ucc_ec_base_t, ops); @@ -115,6 +120,7 @@ ucc_status_t ucc_ec_finalize() } } } + pthread_mutex_unlock(&ucc_ec_mutex); return UCC_OK; } diff --git a/src/components/mc/cuda/mc_cuda.c b/src/components/mc/cuda/mc_cuda.c index 5c820bd768..663ddbdfe9 100644 --- a/src/components/mc/cuda/mc_cuda.c +++ b/src/components/mc/cuda/mc_cuda.c @@ -50,8 +50,8 @@ static ucc_status_t ucc_mc_cuda_init(const ucc_mc_params_t *mc_params) int num_devices, driver_ver; cudaError_t cuda_st; - ucc_mc_cuda.stream = NULL; - ucc_mc_cuda.stream_initialized = 0; + ucc_mc_cuda_config = ucc_derived_of(ucc_mc_cuda.super.config, + ucc_mc_cuda_config_t); ucc_strncpy_safe(ucc_mc_cuda.super.config->log_component.name, ucc_mc_cuda.super.super.name, sizeof(ucc_mc_cuda.super.config->log_component.name)); @@ -100,6 +100,7 @@ static ucc_status_t ucc_mc_cuda_init(const ucc_mc_params_t *mc_params) "with driver version %d", driver_ver); } #endif + ucc_mc_cuda.resources_hash = kh_init(ucc_mc_cuda_resources_hash); // lock assures single mpool initiation when multiple threads concurrently execute // different collective operations thus concurrently entering init function. ucc_spinlock_init(&ucc_mc_cuda.init_spinlock, 0); @@ -120,8 +121,9 @@ static ucc_status_t ucc_mc_cuda_mem_alloc(ucc_mc_buffer_header_t **h_ptr, ucc_memory_type_t mt) { cudaError_t st; - ucc_mc_buffer_header_t *h = - ucc_malloc(sizeof(ucc_mc_buffer_header_t), "mc cuda"); + ucc_mc_buffer_header_t *h; + + h = ucc_malloc(sizeof(ucc_mc_buffer_header_t), "mc cuda"); if (ucc_unlikely(!h)) { mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes", sizeof(ucc_mc_buffer_header_t)); @@ -132,13 +134,13 @@ static ucc_status_t ucc_mc_cuda_mem_alloc(ucc_mc_buffer_header_t **h_ptr, cudaMemAttachGlobal); if (ucc_unlikely(st != cudaSuccess)) { cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to allocate %zd bytes, " + mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes, " "cuda error %d(%s)", size, st, cudaGetErrorString(st)); ucc_free(h); return UCC_ERR_NO_MEMORY; } + h->from_pool = 0; h->mt = UCC_MEMORY_TYPE_CUDA; *h_ptr = h; @@ -151,15 +153,25 @@ static ucc_status_t ucc_mc_cuda_mem_pool_alloc(ucc_mc_buffer_header_t **h_ptr, size_t size, ucc_memory_type_t mt) { - ucc_mc_buffer_header_t *h = NULL; - if (size <= MC_CUDA_CONFIG->mpool_elem_size && - mt != UCC_MEMORY_TYPE_CUDA_MANAGED) { - h = (ucc_mc_buffer_header_t *)ucc_mpool_get(&ucc_mc_cuda.mpool); + ucc_mc_buffer_header_t *h = NULL; + ucc_mc_cuda_resources_t *resources; + ucc_status_t status; + + if ((size <= MC_CUDA_CONFIG->mpool_elem_size) && + (mt != UCC_MEMORY_TYPE_CUDA_MANAGED)) { + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + + h = (ucc_mc_buffer_header_t *)ucc_mpool_get(&resources->scratch_mpool); } + if (!h) { // Slow path return ucc_mc_cuda_mem_alloc(h_ptr, size, mt); } + if (ucc_unlikely(!h->addr)){ return UCC_ERR_NO_MEMORY; } @@ -168,61 +180,6 @@ static ucc_status_t ucc_mc_cuda_mem_pool_alloc(ucc_mc_buffer_header_t **h_ptr, return UCC_OK; } -static ucc_status_t ucc_mc_cuda_chunk_alloc(ucc_mpool_t *mp, //NOLINT - size_t *size_p, - void **chunk_p) -{ - *chunk_p = ucc_malloc(*size_p, "mc cuda"); - if (!*chunk_p) { - mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes", *size_p); - return UCC_ERR_NO_MEMORY; - } - - return UCC_OK; -} - -static void ucc_mc_cuda_chunk_init(ucc_mpool_t *mp, //NOLINT - void *obj, void *chunk) //NOLINT -{ - ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; - cudaError_t st = cudaMalloc(&h->addr, MC_CUDA_CONFIG->mpool_elem_size); - if (st != cudaSuccess) { - // h->addr will be 0 so ucc_mc_cuda_mem_alloc_pool function will - // return UCC_ERR_NO_MEMORY. As such mc_error message is suffice. - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to allocate %zd bytes, " - "cuda error %d(%s)", - MC_CUDA_CONFIG->mpool_elem_size, st, cudaGetErrorString(st)); - } - h->from_pool = 1; - h->mt = UCC_MEMORY_TYPE_CUDA; -} - -static void ucc_mc_cuda_chunk_release(ucc_mpool_t *mp, void *chunk) //NOLINT: mp is unused -{ - ucc_free(chunk); -} - -static void ucc_mc_cuda_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused -{ - ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; - cudaError_t st; - st = cudaFree(h->addr); - if (st != cudaSuccess) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to free mem at %p, " - "cuda error %d(%s)", - obj, st, cudaGetErrorString(st)); - } -} - -static ucc_mpool_ops_t ucc_mc_ops = {.chunk_alloc = ucc_mc_cuda_chunk_alloc, - .chunk_release = ucc_mc_cuda_chunk_release, - .obj_init = ucc_mc_cuda_chunk_init, - .obj_cleanup = ucc_mc_cuda_chunk_cleanup}; - static ucc_status_t ucc_mc_cuda_mem_free(ucc_mc_buffer_header_t *h_ptr) { cudaError_t st; @@ -250,92 +207,72 @@ static ucc_status_t ucc_mc_cuda_mem_pool_free(ucc_mc_buffer_header_t *h_ptr) static ucc_status_t ucc_mc_cuda_mem_pool_alloc_with_init(ucc_mc_buffer_header_t **h_ptr, - size_t size, - ucc_memory_type_t mt) + size_t size, + ucc_memory_type_t mt) { - // lock assures single mpool initiation when multiple threads concurrently execute - // different collective operations thus concurrently entering init function. - ucc_spin_lock(&ucc_mc_cuda.init_spinlock); - if (MC_CUDA_CONFIG->mpool_max_elems == 0) { ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_alloc; ucc_mc_cuda.super.ops.mem_free = ucc_mc_cuda_mem_free; - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); return ucc_mc_cuda_mem_alloc(h_ptr, size, mt); - } - - if (!ucc_mc_cuda.mpool_init_flag) { - ucc_status_t status = ucc_mpool_init( - &ucc_mc_cuda.mpool, 0, sizeof(ucc_mc_buffer_header_t), 0, - UCC_CACHE_LINE_SIZE, 1, MC_CUDA_CONFIG->mpool_max_elems, - &ucc_mc_ops, ucc_mc_cuda.thread_mode, "mc cuda mpool buffers"); - if (status != UCC_OK) { - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); - return status; - } + } else { ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc; - ucc_mc_cuda.mpool_init_flag = 1; + ucc_mc_cuda.super.ops.mem_free = ucc_mc_cuda_mem_pool_free; + return ucc_mc_cuda_mem_pool_alloc(h_ptr, size, mt); } - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); - return ucc_mc_cuda_mem_pool_alloc(h_ptr, size, mt); } static ucc_status_t ucc_mc_cuda_memcpy(void *dst, const void *src, size_t len, ucc_memory_type_t dst_mem, ucc_memory_type_t src_mem) { - cudaError_t st; + ucc_status_t status; + ucc_mc_cuda_resources_t *resources; + ucc_assert(dst_mem == UCC_MEMORY_TYPE_CUDA || src_mem == UCC_MEMORY_TYPE_CUDA || dst_mem == UCC_MEMORY_TYPE_CUDA_MANAGED || src_mem == UCC_MEMORY_TYPE_CUDA_MANAGED); - UCC_MC_CUDA_INIT_STREAM(); - st = cudaMemcpyAsync(dst, src, len, cudaMemcpyDefault, ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to launch cudaMemcpyAsync, dst %p, src %p, len %zd " - "cuda error %d(%s)", - dst, src, len, st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status) != UCC_OK) { + return status; } - st = cudaStreamSynchronize(ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); + + status = CUDA_FUNC(cudaMemcpyAsync(dst, src, len, cudaMemcpyDefault, + resources->stream)); + if (ucc_unlikely(status != UCC_OK)) { mc_error(&ucc_mc_cuda.super, - "failed to synchronize mc_cuda.stream " - "cuda error %d(%s)", - st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + "failed to launch cudaMemcpyAsync, dst %p, src %p, len %zd", + dst, src, len); + return status; } - return UCC_OK; + + status = CUDA_FUNC(cudaStreamSynchronize(resources->stream)); + + return status; } ucc_status_t ucc_mc_cuda_memset(void *ptr, int val, size_t len) { - cudaError_t st; + ucc_status_t status; + ucc_mc_cuda_resources_t *resources; - UCC_MC_CUDA_INIT_STREAM(); - st = cudaMemsetAsync(ptr, val, len, ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to launch cudaMemsetAsync, dst %p, len %zd " - "cuda error %d(%s)", - ptr, len, st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status) != UCC_OK) { + return status; } - st = cudaStreamSynchronize(ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); + + status = CUDA_FUNC(cudaMemsetAsync(ptr, val, len, resources->stream)); + if (ucc_unlikely(status != UCC_OK)) { mc_error(&ucc_mc_cuda.super, - "failed to synchronize mc_cuda.stream " - "cuda error %d(%s)", - st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + "failed to launch cudaMemsetAsync, dst %p, len %zd", + ptr, len); + return status; } - return UCC_OK; + + status = CUDA_FUNC(cudaStreamSynchronize(resources->stream)); + + return status; } static ucc_status_t ucc_mc_cuda_mem_query(const void *ptr, @@ -407,17 +344,65 @@ static ucc_status_t ucc_mc_cuda_mem_query(const void *ptr, return UCC_OK; } -static ucc_status_t ucc_mc_cuda_finalize() +ucc_status_t ucc_mc_cuda_get_resources(ucc_mc_cuda_resources_t **resources) { - if (ucc_mc_cuda.stream != NULL) { - CUDA_CHECK(cudaStreamDestroy(ucc_mc_cuda.stream)); - ucc_mc_cuda.stream = NULL; + CUcontext cu_ctx; + unsigned long long int cu_ctx_id; + ucc_status_t status; + + status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx)); + if (ucc_unlikely(status != UCC_OK)) { + mc_error(&ucc_mc_cuda.super, "failed to get current CUDA context"); + return status; + } + + status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id)); + if (ucc_unlikely(status != UCC_OK)) { + mc_error(&ucc_mc_cuda.super, "failed to get currect CUDA context ID"); + } + + *resources = mc_cuda_resources_hash_get(ucc_mc_cuda.resources_hash, + cu_ctx_id); + if (ucc_unlikely(*resources == NULL)) { + ucc_spin_lock(&ucc_mc_cuda.init_spinlock); + *resources = mc_cuda_resources_hash_get(ucc_mc_cuda.resources_hash, + cu_ctx_id); + if (*resources == NULL) { + *resources = ucc_malloc(sizeof(ucc_mc_cuda_resources_t), + "mc cuda resources"); + if (*resources == NULL) { + mc_error(&ucc_mc_cuda.super, + "failed to allocate %zd bytes for resources", + sizeof(ucc_mc_cuda_resources_t)); + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); + return UCC_ERR_NO_MEMORY; + } + status = ucc_mc_cuda_resources_init(&ucc_mc_cuda.super, + *resources); + if (status != UCC_OK) { + ucc_free(*resources); + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); + return status; + } + mc_cuda_resources_hash_put(ucc_mc_cuda.resources_hash, cu_ctx_id, + *resources); + } + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); } - if (ucc_mc_cuda.mpool_init_flag) { - ucc_mpool_cleanup(&ucc_mc_cuda.mpool, 1); - ucc_mc_cuda.mpool_init_flag = 0; - ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc_with_init; + return UCC_OK; +} + +static ucc_status_t ucc_mc_cuda_finalize() +{ + ucc_mc_cuda_resources_t *resources; + + resources = mc_cuda_resources_hash_pop(ucc_mc_cuda.resources_hash); + while (resources) { + ucc_mc_cuda_resources_cleanup(resources); + resources = mc_cuda_resources_hash_pop(ucc_mc_cuda.resources_hash); } + + ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc_with_init; ucc_spinlock_destroy(&ucc_mc_cuda.init_spinlock); return UCC_OK; } @@ -443,8 +428,9 @@ ucc_mc_cuda_t ucc_mc_cuda = { .table = ucc_mc_cuda_config_table, .size = sizeof(ucc_mc_cuda_config_t), }, - .mpool_init_flag = 0, }; +ucc_mc_cuda_config_t *ucc_mc_cuda_config; + UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_mc_cuda.super.config_table, &ucc_config_global_list); diff --git a/src/components/mc/cuda/mc_cuda.h b/src/components/mc/cuda/mc_cuda.h index df566726e6..90075504d9 100644 --- a/src/components/mc/cuda/mc_cuda.h +++ b/src/components/mc/cuda/mc_cuda.h @@ -7,27 +7,18 @@ #ifndef UCC_MC_CUDA_H_ #define UCC_MC_CUDA_H_ -#include +#include #include "components/mc/base/ucc_mc_base.h" #include "components/mc/ucc_mc_log.h" #include "utils/ucc_mpool.h" #include "utils/arch/cuda_def.h" -#include - -typedef struct ucc_mc_cuda_config { - ucc_mc_config_t super; - size_t mpool_elem_size; - int mpool_max_elems; -} ucc_mc_cuda_config_t; +#include "mc_cuda_resources.h" typedef struct ucc_mc_cuda { ucc_mc_base_t super; - int stream_initialized; - cudaStream_t stream; - ucc_mpool_t mpool; - int mpool_init_flag; ucc_spinlock_t init_spinlock; ucc_thread_mode_t thread_mode; + ucc_mc_cuda_resources_hash_t *resources_hash; } ucc_mc_cuda_t; extern ucc_mc_cuda_t ucc_mc_cuda; @@ -35,21 +26,7 @@ extern ucc_mc_cuda_t ucc_mc_cuda; #define MC_CUDA_CONFIG \ (ucc_derived_of(ucc_mc_cuda.super.config, ucc_mc_cuda_config_t)) -#define UCC_MC_CUDA_INIT_STREAM() do { \ - if (!ucc_mc_cuda.stream_initialized) { \ - cudaError_t cuda_st = cudaSuccess; \ - ucc_spin_lock(&ucc_mc_cuda.init_spinlock); \ - if (!ucc_mc_cuda.stream_initialized) { \ - cuda_st = cudaStreamCreateWithFlags(&ucc_mc_cuda.stream, \ - cudaStreamNonBlocking); \ - ucc_mc_cuda.stream_initialized = 1; \ - } \ - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); \ - if (ucc_unlikely(cudaSuccess != cuda_st)) { \ - return cuda_error_to_ucc_status(cuda_st); \ - } \ - } \ -} while(0) +ucc_status_t ucc_mc_cuda_get_resources(ucc_mc_cuda_resources_t **resources); ucc_status_t ucc_mc_cuda_memset(void *ptr, int val, size_t len); diff --git a/src/utils/arch/cuda_def.h b/src/utils/arch/cuda_def.h index 7f690531e2..d758846c9d 100644 --- a/src/utils/arch/cuda_def.h +++ b/src/utils/arch/cuda_def.h @@ -74,6 +74,15 @@ static inline ucc_status_t cuda_error_to_ucc_status(cudaError_t cuda_status) } \ } while(0) +#define CUDADRV_CHECK(_cmd) \ + /* coverity[dead_error_line] */ \ + do { \ + ucc_status_t _cuda_status = CUDADRV_FUNC(_cmd); \ + if (ucc_unlikely(_cuda_status != UCC_OK)) { \ + return _cuda_status; \ + } \ + } while(0) + #define CUDA_CHECK_GOTO(_cmd, _label, _cuda_status) \ do { \ _cuda_status = CUDA_FUNC(_cmd); \