Skip to content

Commit

Permalink
remove cuda dep. simplify coll_init/start
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrol aderholdt committed Sep 22, 2023
1 parent f86fd47 commit d8c72d9
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 284 deletions.
7 changes: 3 additions & 4 deletions src/components/cl/urom/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ sources = \
cl_urom_context.c \
cl_urom_team.c \
cl_urom_coll.c \
cl_urom_memcpy.c \
$(alltoall)

module_LTLIBRARIES = libucc_cl_urom.la
libucc_cl_urom_la_SOURCES = $(sources)
libucc_cl_urom_la_CPPFLAGS = $(AM_CPPFLAGS) $(BASE_CPPFLAGS) $(UROM_CPPFLAGS) $(CUDA_CPPFLAGS)
libucc_cl_urom_la_CPPFLAGS = $(AM_CPPFLAGS) $(BASE_CPPFLAGS) $(UROM_CPPFLAGS)
libucc_cl_urom_la_CFLAGS = $(BASE_CFLAGS)
libucc_cl_urom_la_LDFLAGS = -version-info $(SOVERSION) --as-needed $(UROM_LDFLAGS) $(CUDA_LDFLAGS)
libucc_cl_urom_la_LIBADD = $(UROM_LIBADD) $(CUDA_LIBS) $(UCC_TOP_BUILDDIR)/src/libucc.la
libucc_cl_urom_la_LDFLAGS = -version-info $(SOVERSION) --as-needed $(UROM_LDFLAGS)
libucc_cl_urom_la_LIBADD = $(UROM_LIBADD) $(UCC_TOP_BUILDDIR)/src/libucc.la

include $(top_srcdir)/config/module.am
84 changes: 36 additions & 48 deletions src/components/cl/urom/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include "alltoall.h"
//#include "utils/arch/cuda_def.h"

ucc_base_coll_alg_info_t
ucc_cl_urom_alltoall_algs[UCC_CL_UROM_ALLTOALL_ALG_LAST + 1] = {
Expand Down Expand Up @@ -65,34 +64,41 @@ static ucc_status_t ucc_cl_urom_alltoall_full_start(ucc_coll_task_t *task)
.ucc.cmd_type = UROM_WORKER_CMD_UCC_COLL,
.ucc.coll_cmd.coll_args = coll_args,
.ucc.coll_cmd.team = cl_team->teams[0],
.ucc.coll_cmd.use_xgvmi = cl_lib->xgvmi_enabled,
.ucc.coll_cmd.use_xgvmi = ctx->xgvmi_enabled,
};
ucc_memory_type_t prev_src, prev_dst;

prev_src = coll_args->src.info.mem_type;
prev_dst = coll_args->dst.info.mem_type;
coll_args->src.info.mem_type = UCC_MEMORY_TYPE_HOST;
coll_args->dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
urom_status = urom_worker_push_cmdq(cl_lib->urom_ctx.urom_worker, 0, &coll_cmd);
if (UROM_OK != urom_status) {
cl_debug(&cl_lib->super, "failed to push collective to urom");
return UCC_ERR_NO_MESSAGE;
}
coll_args->src.info.mem_type = prev_src;
coll_args->dst.info.mem_type = prev_dst;

/*
if (coll_args->src.info.mem_type != UCC_MEMORY_TYPE_CUDA) {
urom_status = urom_worker_push_cmdq(cl_lib->urom_worker, 0, &coll_cmd);
urom_status = urom_worker_push_cmdq(cl_lib->urom_ctx.urom_worker, 0, &coll_cmd);
if (UROM_OK != urom_status) {
cl_debug(&cl_lib->super, "failed to push collective to urom");
return UCC_ERR_NO_MESSAGE;
}
} else {
#if HAVE_CUDA
// FIXME: a better way is to tweak args in urom
cudaStreamSynchronize(cl_lib->cuda_stream);

coll_args->src.info.mem_type = UCC_MEMORY_TYPE_HOST;
coll_args->dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
urom_status = urom_worker_push_cmdq(cl_lib->urom_worker, 0, &coll_cmd);
urom_status = urom_worker_push_cmdq(cl_lib->urom_ctx.urom_worker, 0, &coll_cmd);
if (UROM_OK != urom_status) {
cl_debug(&cl_lib->super, "failed to push collective to urom");
return UCC_ERR_NO_MESSAGE;
}
coll_args->src.info.mem_type = UCC_MEMORY_TYPE_CUDA;
coll_args->dst.info.mem_type = UCC_MEMORY_TYPE_CUDA;
#else
cl_error(&cl_lib->super, "attempting to use CUDA without CUDA support");
return UCC_ERR_NO_RESOURCE;
#endif
}
*/
task->status = UCC_INPROGRESS;
cl_debug(&cl_lib->super, "pushed the collective to urom");
return ucc_progress_queue_enqueue(ctx->super.super.ucc_context->pq, task);
Expand All @@ -117,7 +123,7 @@ static void ucc_cl_urom_alltoall_full_progress(ucc_coll_task_t *ctask)
urom_status_t urom_status = 0;
urom_worker_notify_t *notif;

urom_status = urom_worker_pop_notifyq(cl_lib->urom_worker, 0, &notif);
urom_status = urom_worker_pop_notifyq(cl_lib->urom_ctx.urom_worker, 0, &notif);
if (UROM_ERR_QUEUE_EMPTY == urom_status) {
return;
}
Expand All @@ -133,25 +139,14 @@ static void ucc_cl_urom_alltoall_full_progress(ucc_coll_task_t *ctask)
return;
}

if (cl_lib->xgvmi_enabled) {
if (ctx->req_mc) {
size_t size_mod = dt_size(ctask->bargs.args.dst.info.datatype);

if (cl_lib->req_mc) {
if (ctask->bargs.args.dst.info.mem_type != UCC_MEMORY_TYPE_CUDA) {
memcpy(cl_lib->old_dest, ctask->bargs.args.dst.info.buffer, ctask->bargs.args.src.info.count * size_mod);
} else {
#if HAVE_CUDA
cudaMemcpyAsync(cl_lib->old_dest, ctask->bargs.args.dst.info.buffer, ctask->bargs.args.src.info.count * size_mod , cudaMemcpyHostToDevice, cl_lib->cuda_stream);
cudaStreamSynchronize(cl_lib->cuda_stream);
#else
cl_error(&cl_lib->super, "attempting to use CUDA without CUDA support");
return UCC_ERR_NO_RESOURCE;
#endif
}
ctask->bargs.args.dst.info.buffer = cl_lib->old_dest;
ctask->bargs.args.src.info.buffer = cl_lib->old_src;
if ((ucc_status_t) notif->ucc.status == UCC_OK) {
ucc_mc_memcpy(ctx->old_dest, ctask->bargs.args.dst.info.buffer, ctask->bargs.args.dst.info.count * size_mod, ctask->bargs.args.dst.info.mem_type, UCC_MEMORY_TYPE_HOST);
ctask->bargs.args.dst.info.buffer = ctx->old_dest;
ctask->bargs.args.src.info.buffer = ctx->old_src;
}

}
cl_debug(&cl_lib->super, "completed the collective from urom");

Expand All @@ -165,6 +160,7 @@ ucc_status_t ucc_cl_urom_alltoall_full_init(
ucc_cl_urom_team_t *cl_team = ucc_derived_of(team, ucc_cl_urom_team_t);
ucc_cl_urom_context_t *ctx = UCC_CL_UROM_TEAM_CTX(cl_team);
ucc_cl_urom_lib_t *cl_lib = ucc_derived_of(ctx->super.super.lib, ucc_cl_urom_lib_t);

ucc_cl_urom_schedule_t *cl_schedule;
ucc_base_coll_args_t args;
ucc_schedule_t *schedule;
Expand All @@ -175,26 +171,17 @@ ucc_status_t ucc_cl_urom_alltoall_full_init(
return UCC_ERR_NO_MEMORY;
}
schedule = &cl_schedule->super.super;
if (cl_lib->xgvmi_enabled) {
if (ctx->req_mc) {
size_t size_mod = dt_size(coll_args->args.src.info.datatype);
if (cl_lib->req_mc) {
//memcpy args to xgvmi buffer
void * ptr = cl_lib->xgvmi_buffer + (cl_lib->cfg.xgvmi_buffer_size * (schedule->super.seq_num % cl_lib->cfg.num_buffers));
if (coll_args->args.src.info.mem_type != UCC_MEMORY_TYPE_CUDA) {
memcpy(ptr, coll_args->args.src.info.buffer, coll_args->args.src.info.count * size_mod);
} else {
#if HAVE_CUDA
cudaMemcpyAsync(ptr, coll_args->args.src.info.buffer, coll_args->args.src.info.count * size_mod, cudaMemcpyDeviceToHost, cl_lib->cuda_stream);
#else
cl_error(&cl_lib->super, "attempting to use CUDA without CUDA support");
return UCC_ERR_NO_RESOURCE;
#endif
}
cl_lib->old_src = coll_args->args.src.info.buffer;
coll_args->args.src.info.buffer = ptr;
cl_lib->old_dest = coll_args->args.dst.info.buffer;
coll_args->args.dst.info.buffer = ptr + coll_args->args.src.info.count * size_mod;
}
size_t count = coll_args->args.src.info.count * size_mod;
//memcpy args to xgvmi buffer
void * ptr = ctx->xgvmi.xgvmi_buffer + (cl_lib->cfg.xgvmi_buffer_size * (schedule->super.seq_num % cl_lib->cfg.num_buffers));
ucc_mc_memcpy(ptr, coll_args->args.src.info.buffer, count, UCC_MEMORY_TYPE_HOST, coll_args->args.src.info.mem_type);

ctx->old_src = coll_args->args.src.info.buffer;
coll_args->args.src.info.buffer = ptr;
ctx->old_dest = coll_args->args.dst.info.buffer;
coll_args->args.dst.info.buffer = ptr + count;
}
memcpy(&args, coll_args, sizeof(args));
status = ucc_schedule_init(schedule, &args, team);
Expand All @@ -211,5 +198,6 @@ ucc_status_t ucc_cl_urom_alltoall_full_init(
ucc_cl_urom_alltoall_triggered_post_setup;

*task = &schedule->super;
cl_debug(cl_lib, "urom coll init'd");
return UCC_OK;
}
36 changes: 0 additions & 36 deletions src/components/cl/urom/alltoall/tags

This file was deleted.

77 changes: 37 additions & 40 deletions src/components/cl/urom/cl_urom.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@
#include <urom/api/urom.h>
#include <urom/api/urom_ucc.h>

#include "utils/arch/cuda_def.h"

#ifndef UCC_CL_UROM_DEFAULT_SCORE
#define UCC_CL_UROM_DEFAULT_SCORE 20
#endif

#define OFFSET_SIZE (128*1024*1024)
#define NUM_OFFSETS 8

/* Interface descriptor for the UROM CL component; extends the generic
 * CL interface (ucc_cl_iface_t) with no additional fields. */
typedef struct ucc_cl_urom_iface {
    ucc_cl_iface_t super;
} ucc_cl_urom_iface_t;
Expand All @@ -47,54 +42,56 @@ typedef struct ucc_cl_urom_context_config {
ucc_cl_context_config_t super;
} ucc_cl_urom_context_config_t;

/* State for the UROM service/worker connection used by this CL.
 * Grouped into one struct so the lib/context objects carry a single
 * handle instead of individual fields. */
typedef struct urom_ctx {
    urom_service_h urom_service;     /* UROM service handle; presumably obtained at lib/context init — confirm against init path (not visible here) */
    urom_worker_h  urom_worker;      /* worker handle passed to urom_worker_push_cmdq()/urom_worker_pop_notifyq() for collective offload */
    void          *urom_worker_addr; /* NOTE(review): looks like the packed worker address blob; length below — confirm producer */
    size_t         urom_worker_len;  /* byte length of urom_worker_addr */
    uint64_t       worker_id;        /* assumes this identifies the DPU-side worker — TODO confirm (coll_init uses ctx->ctx_rank for dpu_worker_id) */
    int            pass_dc_exist;    /* set to 1 once the passive data channel has been created successfully; checked in coll_init to create it lazily */
} urom_ctx_t;

/* XGVMI (cross-GVMI) memory registration state: the staging buffer that
 * collective arguments are copied into, plus its UCP registration and the
 * packed keys exchanged so the remote (DPU) side can access it. */
typedef struct xgvmi_info {
    ucp_mem_h xgvmi_memh;         /* presumably the UCP memory handle registering xgvmi_buffer — confirm against context init (not visible here) */
    void     *packed_mkey;        /* NOTE(review): packed memory key blob for remote access; length below */
    uint64_t  packed_mkey_len;    /* byte length of packed_mkey */
    void     *packed_xgvmi_memh;  /* NOTE(review): packed XGVMI-specific memh blob; length below */
    uint64_t  packed_xgvmi_len;   /* byte length of packed_xgvmi_memh */
    void     *xgvmi_buffer;       /* host staging area; alltoall init copies src data to xgvmi_buffer + (xgvmi_buffer_size * (seq_num % num_buffers)) */
    size_t    xgvmi_size;         /* total size of xgvmi_buffer — assumes xgvmi_buffer_size * num_buffers; TODO confirm at allocation site */
} xgvmi_info_t;

typedef struct ucc_cl_urom_lib {
ucc_cl_lib_t super;
ucc_cl_urom_lib_config_t cfg;
urom_service_h urom_service;
urom_worker_h urom_worker;
void * urom_ucc_ctx_h;
void *urom_worker_addr;
size_t urom_worker_len;
uint64_t worker_id;
int pass_dc_exist;
int xgvmi_enabled;
ucp_mem_h xgvmi_memh;
void * packed_mkey;
uint64_t packed_mkey_len;
void * packed_xgvmi_memh;
uint64_t packed_xgvmi_len;
void * xgvmi_buffer;
size_t xgvmi_size;
int req_mc;
void * old_dest;
void * old_src;
int xgvmi_offsets[NUM_OFFSETS];
int seq_num;
urom_ctx_t urom_ctx;
int tl_ucp_index; //FIXME: make this better
//void * cuda_stream;
#if HAVE_CUDA
cudaStream_t cuda_stream;
#endif
ucc_rank_t ctx_rank; //FIXME: this is not right
} ucc_cl_urom_lib_t;
UCC_CLASS_DECLARE(ucc_cl_urom_lib_t, const ucc_base_lib_params_t *,
const ucc_base_config_t *);

typedef struct ucc_cl_urom_context {
ucc_cl_context_t super;
urom_domain_h urom_domain;
ucc_mpool_t sched_mp;
ucc_cl_context_t super;
urom_domain_h urom_domain;
void *urom_ucc_ctx_h;
ucc_mpool_t sched_mp;
xgvmi_info_t xgvmi;
int xgvmi_enabled;
int req_mc;
void *old_dest;
void *old_src;
ucc_rank_t ctx_rank; //FIXME: this is not right
} ucc_cl_urom_context_t;
UCC_CLASS_DECLARE(ucc_cl_urom_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_cl_urom_team {
ucc_cl_team_t super;
int team_posted;
ucc_team_h **teams;
unsigned n_teams;
ucc_coll_score_t *score;
ucc_score_map_t *score_map;
ucc_cl_team_t super;
int team_posted;
ucc_team_h **teams;
unsigned n_teams;
ucc_coll_score_t *score;
ucc_score_map_t *score_map;
} ucc_cl_urom_team_t;
UCC_CLASS_DECLARE(ucc_cl_urom_team_t, ucc_base_context_t *,
const ucc_base_team_params_t *);
Expand All @@ -103,7 +100,7 @@ ucc_status_t ucc_cl_urom_coll_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task);

#define UCC_CL_UROM_TEAM_CTX(_team) \
#define UCC_CL_UROM_TEAM_CTX(_team) \
(ucc_derived_of((_team)->super.super.context, ucc_cl_urom_context_t))

#endif
10 changes: 5 additions & 5 deletions src/components/cl/urom/cl_urom_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ ucc_status_t ucc_cl_urom_coll_init(ucc_base_coll_args_t *coll_args,
int ucp_index = urom_lib->tl_ucp_index;
ucc_tl_ucp_context_t *tl_ctx = ucc_derived_of(ctx->super.tl_ctxs[ucp_index], ucc_tl_ucp_context_t);
urom_status_t urom_status;
if (!urom_lib->pass_dc_exist) {
if (!urom_lib->urom_ctx.pass_dc_exist) {
urom_worker_cmd_t pass_dc_cmd = {
.cmd_type = UROM_WORKER_CMD_UCC,
.ucc.cmd_type = UROM_WORKER_CMD_CREATE_PASSIVE_DATA_CHANNEL,
.ucc.dpu_worker_id = urom_lib->ctx_rank,
.ucc.dpu_worker_id = ctx->ctx_rank,
.ucc.pass_dc_create_cmd.ucp_addr = tl_ctx->worker.worker_address,
.ucc.pass_dc_create_cmd.addr_len = tl_ctx->worker.ucp_addrlen,
};
urom_worker_notify_t *notif;

urom_worker_push_cmdq(urom_lib->urom_worker, 0, &pass_dc_cmd);
urom_worker_push_cmdq(urom_lib->urom_ctx.urom_worker, 0, &pass_dc_cmd);
while (UROM_ERR_QUEUE_EMPTY ==
(urom_status = urom_worker_pop_notifyq(urom_lib->urom_worker, 0, &notif))) {
(urom_status = urom_worker_pop_notifyq(urom_lib->urom_ctx.urom_worker, 0, &notif))) {
sched_yield();
}
if ((ucc_status_t) notif->ucc.status != UCC_OK) {
return (ucc_status_t) notif->ucc.status;
}
urom_lib->pass_dc_exist = 1;
urom_lib->urom_ctx.pass_dc_exist = 1;
}
switch (coll_args->args.coll_type) {
case UCC_COLL_TYPE_ALLTOALL:
Expand Down
Loading

0 comments on commit d8c72d9

Please sign in to comment.