Skip to content

Commit

Permalink
MC/CUDA: add support for multiple contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev committed Sep 29, 2023
1 parent 8be0a78 commit cb3dd8b
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 94 deletions.
116 changes: 47 additions & 69 deletions src/components/ec/cuda/ec_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
struct cudaDeviceProp prop;
ucc_status_t status;

ucc_ec_cuda.stream = NULL;
ucc_ec_cuda.stream_initialized = 0;
ucc_ec_cuda.exec_streams_initialized = 0;
ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name,
ucc_ec_cuda.super.super.name,
Expand Down Expand Up @@ -141,14 +139,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
cfg->exec_num_streams = 1;
}

ucc_ec_cuda.exec_streams = ucc_calloc(cfg->exec_num_streams,
sizeof(cudaStream_t),
"ec cuda streams");
if (!ucc_ec_cuda.exec_streams) {
ec_error(&ucc_ec_cuda.super, "failed to allocate streams array");
return UCC_ERR_NO_MEMORY;
}

if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
} else {
Expand Down Expand Up @@ -197,6 +187,7 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)

ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash);
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
EC_CUDA_CONFIG->exec_num_streams,
&ucc_ec_cuda.resources);
if (status != UCC_OK) {
ec_warn(&ucc_ec_cuda.super, "failed to initilize CUDA resources");
Expand All @@ -215,54 +206,6 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr)
return UCC_OK;
}

static ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources)
{
CUcontext cu_ctx;
unsigned long long int cu_ctx_id;
ucc_status_t status;

status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context");
return status;
}

status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID");
}

*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (ucc_unlikely(*resources == NULL)) {
ucc_spin_lock(&ucc_ec_cuda.init_spinlock);
*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (*resources == NULL) {
*resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t),
"ec cuda resources");
if (*resources == NULL) {
ec_error(&ucc_ec_cuda.super,
"failed to allocate %zd bytes for resources",
sizeof(ucc_ec_cuda_resources_t));
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return UCC_ERR_NO_MEMORY;
}
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
*resources);
if (status != UCC_OK) {
ucc_free(*resources);
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return status;
}
ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id,
*resources);
}
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
}
return UCC_OK;
}

ucc_status_t ucc_ec_cuda_event_create(void **event)
{
ucc_ec_cuda_event_t *cuda_event;
Expand Down Expand Up @@ -316,22 +259,57 @@ ucc_status_t ucc_ec_cuda_event_test(void *event)

static ucc_status_t ucc_ec_cuda_finalize()
{
int i;
ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources);

return UCC_OK;
}

ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources)
{
CUcontext cu_ctx;
unsigned long long int cu_ctx_id;
ucc_status_t status;

if (ucc_ec_cuda.stream_initialized) {
CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.stream));
ucc_ec_cuda.stream_initialized = 0;
status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context");
return status;
}

if (ucc_ec_cuda.exec_streams_initialized) {
for (i = 0; i < EC_CUDA_CONFIG->exec_num_streams; i++) {
CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[i]));
}
ucc_ec_cuda.exec_streams_initialized = 0;
status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID");
}
ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources);
ucc_free(ucc_ec_cuda.exec_streams);

*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (ucc_unlikely(*resources == NULL)) {
ucc_spin_lock(&ucc_ec_cuda.init_spinlock);
*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (*resources == NULL) {
*resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t),
"ec cuda resources");
if (*resources == NULL) {
ec_error(&ucc_ec_cuda.super,
"failed to allocate %zd bytes for resources",
sizeof(ucc_ec_cuda_resources_t));
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return UCC_ERR_NO_MEMORY;
}
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
EC_CUDA_CONFIG->exec_num_streams,
*resources);
if (status != UCC_OK) {
ucc_free(*resources);
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return status;
}
ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id,
*resources);
}
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
}
return UCC_OK;
}

Expand Down
5 changes: 2 additions & 3 deletions src/components/ec/cuda/ec_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ typedef struct ucc_ec_cuda_config {

typedef struct ucc_ec_cuda {
ucc_ec_base_t super;
int stream_initialized;
cudaStream_t stream;
int exec_streams_initialized;
cudaStream_t *exec_streams;
ucc_ec_cuda_resources_t resources;
ucc_ec_cuda_resources_hash_t *resources_hash;
ucc_thread_mode_t thread_mode;
Expand All @@ -68,6 +65,8 @@ ucc_status_t ucc_ec_cuda_event_post(void *ee_context, void *event);

ucc_status_t ucc_ec_cuda_event_test(void *event);

ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources);

extern ucc_ec_cuda_t ucc_ec_cuda;

#define EC_CUDA_CONFIG \
Expand Down
33 changes: 19 additions & 14 deletions src/components/ec/cuda/ec_cuda_executor_interruptible.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,43 @@

ucc_status_t ucc_cuda_executor_interruptible_get_stream(cudaStream_t *stream)
{
static uint32_t last_used = 0;
int num_streams = EC_CUDA_CONFIG->exec_num_streams;
ucc_status_t st;
int i, j;
uint32_t id;
static uint32_t last_used = 0;
int num_streams = EC_CUDA_CONFIG->exec_num_streams;
ucc_ec_cuda_resources_t *resources;
ucc_status_t st;
int i, j;
uint32_t id;

ucc_assert(num_streams > 0);
if (ucc_unlikely(!ucc_ec_cuda.exec_streams_initialized)) {
st = ucc_ec_cuda_get_resources(&resources);
if (ucc_unlikely(st != UCC_OK)) {
return st;
}

if (ucc_unlikely(!resources->streams_initialized)) {
ucc_spin_lock(&ucc_ec_cuda.init_spinlock);
if (ucc_ec_cuda.exec_streams_initialized) {
if (resources->streams_initialized) {
goto unlock;
}

for(i = 0; i < num_streams; i++) {
st = CUDA_FUNC(cudaStreamCreateWithFlags(&ucc_ec_cuda.exec_streams[i],
st = CUDA_FUNC(cudaStreamCreateWithFlags(&(resources->exec_streams[i]),
cudaStreamNonBlocking));
if (st != UCC_OK) {
for (j = 0; j < i; j++) {
CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[j]));
CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[j]));
}
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return st;
}
}
ucc_ec_cuda.exec_streams_initialized = 1;
resources->streams_initialized = 1;
unlock:
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
}

id = ucc_atomic_fadd32(&last_used, 1);
*stream = ucc_ec_cuda.exec_streams[id % num_streams];
*stream = resources->exec_streams[id % num_streams];
return UCC_OK;
}

Expand All @@ -52,14 +58,13 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor,
const ucc_ee_executor_task_args_t *task_args,
ucc_ee_executor_task_t **task)
{
cudaStream_t stream = NULL;
cudaStream_t stream = NULL;
size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS;
ucc_ec_cuda_executor_interruptible_task_t *ee_task;
ucc_status_t status;
cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS];
size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS;
int i;


status = ucc_cuda_executor_interruptible_get_stream(&stream);
if (ucc_unlikely(status != UCC_OK)) {
return status;
Expand Down
23 changes: 23 additions & 0 deletions src/components/ec/cuda/ec_cuda_resources.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "ec_cuda_resources.h"
#include "components/ec/ucc_ec_log.h"
#include "utils/ucc_malloc.h"

#define EXEC_MAX_TASKS 128

Expand Down Expand Up @@ -111,6 +112,7 @@ static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = {
};

ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
int num_streams,
ucc_ec_cuda_resources_t *resources)
{
ucc_status_t status;
Expand Down Expand Up @@ -153,8 +155,20 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
goto free_interruptible_tasks_mpool;
}

resources->num_streams = num_streams;
resources->exec_streams = ucc_calloc(num_streams, sizeof(cudaStream_t),
"ec cuda streams");
if (!resources->exec_streams) {
ec_error(ec, "failed to allocate %zd bytes for executor streams",
sizeof(cudaStream_t) * num_streams);
status = UCC_ERR_NO_MEMORY;
goto free_persistent_tasks_mpool;
}

return UCC_OK;

free_persistent_tasks_mpool:
ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0);
free_interruptible_tasks_mpool:
ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0);
free_executors_mpool:
Expand All @@ -167,8 +181,17 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,

void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources)
{
int i;

for (i = 0; i < resources->num_streams; i++) {
if (resources->exec_streams[i] != NULL) {
CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[i]));
}
}
ucc_mpool_cleanup(&resources->events, 1);
ucc_mpool_cleanup(&resources->executors, 1);
ucc_mpool_cleanup(&resources->executor_interruptible_tasks, 1);
ucc_mpool_cleanup(&resources->executor_persistent_tasks, 1);

ucc_free(resources->exec_streams);
}
12 changes: 8 additions & 4 deletions src/components/ec/cuda/ec_cuda_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ typedef struct ucc_ec_cuda_executor_persistent_task {
} ucc_ec_cuda_executor_persistent_task_t;

typedef struct ucc_ec_cuda_resources {
ucc_mpool_t events;
ucc_mpool_t executors;
ucc_mpool_t executor_interruptible_tasks;
ucc_mpool_t executor_persistent_tasks;
ucc_mpool_t events;
ucc_mpool_t executors;
ucc_mpool_t executor_interruptible_tasks;
ucc_mpool_t executor_persistent_tasks;
int streams_initialized;
int num_streams;
cudaStream_t *exec_streams;
} ucc_ec_cuda_resources_t;

ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
int num_streams,
ucc_ec_cuda_resources_t *resources);

void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources);
Expand Down
6 changes: 4 additions & 2 deletions src/components/mc/cuda/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
if HAVE_CUDA

sources = \
mc_cuda.h \
mc_cuda.c
mc_cuda.h \
mc_cuda.c \
mc_cuda_resources.c \
mc_cuda_resources.h

module_LTLIBRARIES = libucc_mc_cuda.la
libucc_mc_cuda_la_SOURCES = $(sources)
Expand Down
2 changes: 0 additions & 2 deletions src/components/mc/cuda/mc_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ typedef struct ucc_mc_cuda {
ucc_mc_base_t super;
int stream_initialized;
cudaStream_t stream;
ucc_mpool_t events;
ucc_mpool_t strm_reqs;
ucc_mpool_t mpool;
int mpool_init_flag;
ucc_spinlock_t init_spinlock;
Expand Down

0 comments on commit cb3dd8b

Please sign in to comment.