Skip to content

Commit

Permalink
MC/CUDA: support multiple contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev committed Oct 3, 2023
1 parent 5269a66 commit 106a67d
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 203 deletions.
28 changes: 17 additions & 11 deletions src/components/ec/cuda/ec_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
int device, num_devices;
cudaError_t cuda_st;
struct cudaDeviceProp prop;
ucc_status_t status;

ucc_ec_cuda_config = ucc_derived_of(ucc_ec_cuda.super.config,
ucc_ec_cuda_config_t);
ucc_ec_cuda.exec_streams_initialized = 0;
ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name,
ucc_ec_cuda.super.super.name,
Expand Down Expand Up @@ -186,14 +187,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
}

ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash);
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
EC_CUDA_CONFIG->exec_num_streams,
&ucc_ec_cuda.resources);
if (status != UCC_OK) {
ec_warn(&ucc_ec_cuda.super, "failed to initilize CUDA resources");
return UCC_ERR_NOT_SUPPORTED;
}

ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0);
return UCC_OK;
}
Expand Down Expand Up @@ -259,7 +252,15 @@ ucc_status_t ucc_ec_cuda_event_test(void *event)

static ucc_status_t ucc_ec_cuda_finalize()
{
ucc_ec_cuda_resources_cleanup(&ucc_ec_cuda.resources);
ucc_ec_cuda_resources_t *resources;

resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash);
while (resources) {
ucc_ec_cuda_resources_cleanup(resources);
resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash);
}

ucc_spinlock_destroy(&ucc_ec_cuda.init_spinlock);

return UCC_OK;
}
Expand All @@ -276,10 +277,14 @@ ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources)
return status;
}

#if CUDA_VERSION < 12000
cu_ctx_id = 1;
#else
status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID");
}
#endif

*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
Expand All @@ -298,7 +303,6 @@ ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources)
return UCC_ERR_NO_MEMORY;
}
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
EC_CUDA_CONFIG->exec_num_streams,
*resources);
if (status != UCC_OK) {
ucc_free(*resources);
Expand Down Expand Up @@ -341,5 +345,7 @@ ucc_ec_cuda_t ucc_ec_cuda = {
.super.executor_ops.finalize = ucc_cuda_executor_finalize,
};

/*
 * Global pointer to the EC CUDA component configuration.
 * Assigned in ucc_ec_cuda_init() from ucc_ec_cuda.super.config (via
 * ucc_derived_of) and read by the resources code for exec_max_tasks and
 * exec_num_streams.
 */
ucc_ec_cuda_config_t *ucc_ec_cuda_config;

UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_ec_cuda.super.config_table,
&ucc_config_global_list);
22 changes: 0 additions & 22 deletions src/components/ec/cuda/ec_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,13 @@

#define WARP_SIZE 32

/*
 * Stream task mode selector; stored in the strm_task_mode field of
 * ucc_ec_cuda_config_t and ucc_ec_cuda_t.
 * NOTE(review): the per-value meanings below are inferred from the
 * enumerator names only - confirm against the code that consumes them.
 */
typedef enum ucc_ec_cuda_strm_task_mode {
/* presumably: stream task is implemented with a CUDA kernel - TODO confirm */
UCC_EC_CUDA_TASK_KERNEL,
/* presumably: stream task is implemented with stream memory operations - TODO confirm */
UCC_EC_CUDA_TASK_MEM_OPS,
/* presumably: implementation is selected automatically - TODO confirm */
UCC_EC_CUDA_TASK_AUTO,
/* last enumerator; presumably a sentinel/count rather than a real mode - TODO confirm */
UCC_EC_CUDA_TASK_LAST,
} ucc_ec_cuda_strm_task_mode_t;


/*
 * Function pointer type for posting a stream task.
 * Takes a pointer to a 32-bit status word, a blocking_wait flag and the CUDA
 * stream, and returns a ucc_status_t.
 * NOTE(review): dev_status is presumably a device-accessible status location
 * and blocking_wait a boolean - confirm against the implementations.
 */
typedef ucc_status_t (*ucc_ec_cuda_task_post_fn) (uint32_t *dev_status,
int blocking_wait,
cudaStream_t stream);

/*
 * Configuration of the EC CUDA component.
 * ucc_ec_cuda_init() obtains a pointer to this structure from
 * ucc_ec_cuda.super.config via ucc_derived_of, so the base config must stay
 * the first member.
 * NOTE(review): fields marked "presumably" are described from their names
 * only - confirm against the config table and the code that reads them.
 */
typedef struct ucc_ec_cuda_config {
/* base EC configuration (holds log_component, see ucc_ec_cuda_init) */
ucc_ec_config_t super;
/* stream task mode, one of ucc_ec_cuda_strm_task_mode_t */
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
/* presumably number of executor workers - TODO confirm */
unsigned long exec_num_workers;
/* presumably number of threads used by the executor - TODO confirm */
unsigned long exec_num_threads;
/* read as max_tasks when an executor mpool object is initialized */
unsigned long exec_max_tasks;
/* number of entries allocated for (and destroyed from) exec_streams */
unsigned long exec_num_streams;
/* presumably number of blocks for reduction kernels - TODO confirm */
unsigned long reduce_num_blocks;
/* presumably number of threads for reduction kernels - TODO confirm */
int reduce_num_threads;
/* presumably a boolean enabling cooperative kernel launch - TODO confirm */
int use_cooperative_launch;
/* presumably a size threshold for executor copy tasks - TODO confirm */
unsigned long exec_copy_thresh;
} ucc_ec_cuda_config_t;

typedef struct ucc_ec_cuda {
ucc_ec_base_t super;
int exec_streams_initialized;
ucc_ec_cuda_resources_t resources;
ucc_ec_cuda_resources_hash_t *resources_hash;
ucc_thread_mode_t thread_mode;
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
Expand Down
10 changes: 9 additions & 1 deletion src/components/ec/cuda/ec_cuda_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,16 @@ ucc_status_t ucc_cuda_executor_persistent_wait_stop(ucc_ee_executor_t *executor)
ucc_status_t ucc_cuda_executor_init(const ucc_ee_executor_params_t *params,
ucc_ee_executor_t **executor)
{
ucc_ec_cuda_executor_t *eee = ucc_mpool_get(&ucc_ec_cuda.resources.executors);
ucc_ec_cuda_executor_t *eee;
ucc_ec_cuda_resources_t *resources;
ucc_status_t status;

status = ucc_ec_cuda_get_resources(&resources);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

eee = ucc_mpool_get(&resources->executors);
if (ucc_unlikely(!eee)) {
ec_error(&ucc_ec_cuda.super, "failed to allocate executor");
return UCC_ERR_NO_MEMORY;
Expand Down
8 changes: 7 additions & 1 deletion src/components/ec/cuda/ec_cuda_executor_interruptible.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor,
ucc_ec_cuda_executor_interruptible_task_t *ee_task;
ucc_status_t status;
cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS];
ucc_ec_cuda_resources_t *resources;
int i;

status = ucc_ec_cuda_get_resources(&resources);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

status = ucc_cuda_executor_interruptible_get_stream(&stream);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

ee_task = ucc_mpool_get(&ucc_ec_cuda.resources.executor_interruptible_tasks);
ee_task = ucc_mpool_get(&resources->executor_interruptible_tasks);
if (ucc_unlikely(!ee_task)) {
return UCC_ERR_NO_MEMORY;
}
Expand Down
9 changes: 8 additions & 1 deletion src/components/ec/cuda/ec_cuda_executor_persistent.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ ucc_cuda_executor_persistent_task_post(ucc_ee_executor_t *executor,
ucc_ee_executor_task_args_t *subtask_args;
ucc_ec_cuda_executor_persistent_task_t *ee_task;
int i;
ucc_ec_cuda_resources_t *resources;
ucc_status_t status;

status = ucc_ec_cuda_get_resources(&resources);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) {
ucc_spin_lock(&eee->tasks_lock);
}

ee_task = ucc_mpool_get(&ucc_ec_cuda.resources.executor_persistent_tasks);
ee_task = ucc_mpool_get(&resources->executor_persistent_tasks);
if (ucc_unlikely(!ee_task)) {
return UCC_ERR_NO_MEMORY;
}
Expand Down
16 changes: 8 additions & 8 deletions src/components/ec/cuda/ec_cuda_resources.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
#include "components/ec/ucc_ec_log.h"
#include "utils/ucc_malloc.h"

#define EXEC_MAX_TASKS 128

static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj;
Expand Down Expand Up @@ -43,9 +41,7 @@ static void ucc_ec_cuda_executor_chunk_init(ucc_mpool_t *mp, void *obj, //NOLINT
void *chunk) //NOLINT: chunk is unused
{
ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj;
// int max_tasks = EC_CUDA_CONFIG->exec_max_tasks;
//TODO: add config
int max_tasks = EXEC_MAX_TASKS;
int max_tasks = ucc_ec_cuda_config->exec_max_tasks;

CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&eee->dev_state), (void *)&eee->state, 0));
Expand Down Expand Up @@ -112,11 +108,12 @@ static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = {
};

ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
int num_streams,
ucc_ec_cuda_resources_t *resources)
{
ucc_status_t status;
int num_streams;

CUDADRV_CHECK(cuCtxGetCurrent(&resources->cu_ctx));
status = ucc_mpool_init(&resources->events, 0, sizeof(ucc_ec_cuda_event_t),
0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX,
&ucc_ec_cuda_event_mpool_ops, UCC_THREAD_MULTIPLE,
Expand Down Expand Up @@ -155,7 +152,7 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
goto free_interruptible_tasks_mpool;
}

resources->num_streams = num_streams;
num_streams = ucc_ec_cuda_config->exec_num_streams;
resources->exec_streams = ucc_calloc(num_streams, sizeof(cudaStream_t),
"ec cuda streams");
if (!resources->exec_streams) {
Expand All @@ -182,8 +179,10 @@ ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources)
{
int i;
CUcontext tmp_context;

for (i = 0; i < resources->num_streams; i++) {
cuCtxPushCurrent(resources->cu_ctx);
for (i = 0; i < ucc_ec_cuda_config->exec_num_streams; i++) {
if (resources->exec_streams[i] != NULL) {
CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[i]));
}
Expand All @@ -194,4 +193,5 @@ void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources)
ucc_mpool_cleanup(&resources->executor_persistent_tasks, 1);

ucc_free(resources->exec_streams);
cuCtxPopCurrent(&tmp_context);
}
44 changes: 43 additions & 1 deletion src/components/ec/cuda/ec_cuda_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ typedef struct ucc_ec_cuda_executor_persistent_task {
} ucc_ec_cuda_executor_persistent_task_t;

typedef struct ucc_ec_cuda_resources {
CUcontext cu_ctx;
ucc_mpool_t events;
ucc_mpool_t executors;
ucc_mpool_t executor_interruptible_tasks;
Expand All @@ -76,8 +77,29 @@ typedef struct ucc_ec_cuda_resources {
cudaStream_t *exec_streams;
} ucc_ec_cuda_resources_t;

/*
 * Stream task mode selector; stored in the strm_task_mode field of
 * ucc_ec_cuda_config_t.
 * NOTE(review): the per-value meanings below are inferred from the
 * enumerator names only - confirm against the code that consumes them.
 */
typedef enum ucc_ec_cuda_strm_task_mode {
/* presumably: stream task is implemented with a CUDA kernel - TODO confirm */
UCC_EC_CUDA_TASK_KERNEL,
/* presumably: stream task is implemented with stream memory operations - TODO confirm */
UCC_EC_CUDA_TASK_MEM_OPS,
/* presumably: implementation is selected automatically - TODO confirm */
UCC_EC_CUDA_TASK_AUTO,
/* last enumerator; presumably a sentinel/count rather than a real mode - TODO confirm */
UCC_EC_CUDA_TASK_LAST,
} ucc_ec_cuda_strm_task_mode_t;

/*
 * Configuration of the EC CUDA component.
 * ucc_ec_cuda_init() obtains a pointer to this structure from
 * ucc_ec_cuda.super.config via ucc_derived_of, so the base config must stay
 * the first member.
 * NOTE(review): fields marked "presumably" are described from their names
 * only - confirm against the config table and the code that reads them.
 */
typedef struct ucc_ec_cuda_config {
/* base EC configuration (holds log_component, see ucc_ec_cuda_init) */
ucc_ec_config_t super;
/* stream task mode, one of ucc_ec_cuda_strm_task_mode_t */
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
/* presumably number of executor workers - TODO confirm */
unsigned long exec_num_workers;
/* presumably number of threads used by the executor - TODO confirm */
unsigned long exec_num_threads;
/* read as max_tasks when an executor mpool object is initialized */
unsigned long exec_max_tasks;
/* number of entries allocated for (and destroyed from) exec_streams */
unsigned long exec_num_streams;
/* presumably number of blocks for reduction kernels - TODO confirm */
unsigned long reduce_num_blocks;
/* presumably number of threads for reduction kernels - TODO confirm */
int reduce_num_threads;
/* presumably a boolean enabling cooperative kernel launch - TODO confirm */
int use_cooperative_launch;
/* presumably a size threshold for executor copy tasks - TODO confirm */
unsigned long exec_copy_thresh;
} ucc_ec_cuda_config_t;

extern ucc_ec_cuda_config_t *ucc_ec_cuda_config;

ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec,
int num_streams,
ucc_ec_cuda_resources_t *resources);

void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources);
Expand Down Expand Up @@ -112,5 +134,25 @@ void ec_cuda_resources_hash_put(ucc_ec_cuda_resources_hash_t *h,
kh_value(h, k) = value;
}

/*
 * Remove one entry from the resources hash and return its value.
 *
 * Scans the buckets from kh_begin() to kh_end() and stops at the first
 * occupied one. Its value is returned; the entry is deleted from the table
 * only when that value is non-NULL. Returns NULL when the table has no
 * occupied bucket (or when the first occupied bucket stores NULL, in which
 * case the entry is left in place).
 *
 * Used by ucc_ec_cuda_finalize() to drain the table one element at a time.
 */
static inline
void* ec_cuda_resources_hash_pop(ucc_ec_cuda_resources_hash_t *h)
{
    khiter_t  it;
    void     *value;

    for (it = kh_begin(h); it != kh_end(h); it++) {
        if (!kh_exist(h, it)) {
            /* empty or deleted bucket, keep looking */
            continue;
        }

        value = kh_value(h, it);
        if (value != NULL) {
            kh_del(ucc_ec_cuda_resources_hash, h, it);
        }
        return value;
    }

    return NULL;
}

#endif
6 changes: 6 additions & 0 deletions src/components/ec/ucc_ec.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* See file LICENSE for terms.
*/

#include <pthread.h>
#include "config.h"
#include "base/ucc_ec_base.h"
#include "ucc_ec.h"
Expand All @@ -13,6 +14,7 @@

static const ucc_ec_ops_t *ec_ops[UCC_EE_LAST];
static const ucc_ee_executor_ops_t *executor_ops[UCC_EE_LAST];
static pthread_mutex_t ucc_ec_mutex = PTHREAD_MUTEX_INITIALIZER;

#define UCC_CHECK_EC_AVAILABLE(ee) \
do { \
Expand All @@ -28,6 +30,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params)
ucc_status_t status;
ucc_ec_attr_t attr;

pthread_mutex_lock(&ucc_ec_mutex);
memset(ec_ops, 0, UCC_EE_LAST * sizeof(ucc_ec_ops_t *));
n_ecs = ucc_global_config.ec_framework.n_components;
for (i = 0; i < n_ecs; i++) {
Expand Down Expand Up @@ -75,6 +78,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params)
ec_ops[ec->type] = &ec->ops;
executor_ops[ec->type] = &ec->executor_ops;
}
pthread_mutex_unlock(&ucc_ec_mutex);

return UCC_OK;
}
Expand Down Expand Up @@ -102,6 +106,7 @@ ucc_status_t ucc_ec_finalize()
ucc_ee_type_t et;
ucc_ec_base_t *ec;

pthread_mutex_lock(&ucc_ec_mutex);
for (et = UCC_EE_FIRST; et < UCC_EE_LAST; et++) {
if (NULL != ec_ops[et]) {
ec = ucc_container_of(ec_ops[et], ucc_ec_base_t, ops);
Expand All @@ -115,6 +120,7 @@ ucc_status_t ucc_ec_finalize()
}
}
}
pthread_mutex_unlock(&ucc_ec_mutex);

return UCC_OK;
}
Expand Down
Loading

0 comments on commit 106a67d

Please sign in to comment.