From 820794df5d755004064ec1d326855065c49bdc01 Mon Sep 17 00:00:00 2001 From: Yael Yacobovich Date: Fri, 13 Dec 2024 17:31:34 +0200 Subject: [PATCH] add loopback option for allgather --- src/components/tl/ucp/allgather/allgather.h | 3 + .../tl/ucp/allgather/allgather_bruck.c | 2 +- .../tl/ucp/allgather/allgather_knomial.c | 64 +++++++++++++------ .../tl/ucp/allgather/allgather_neighbor.c | 18 ++++-- .../tl/ucp/allgather/allgather_ring.c | 37 ++++++++--- .../tl/ucp/allgather/allgather_sparbit.c | 17 +++-- src/components/tl/ucp/tl_ucp.c | 4 ++ src/components/tl/ucp/tl_ucp.h | 1 + src/components/tl/ucp/tl_ucp_coll.h | 1 + src/components/tl/ucp/tl_ucp_lib.c | 1 + src/components/tl/ucp/tl_ucp_service_coll.c | 3 + src/ucc/api/ucc.h | 2 + 12 files changed, 116 insertions(+), 37 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather.h b/src/components/tl/ucp/allgather/allgather.h index 61733a4ab7..f6b7d54bfd 100644 --- a/src/components/tl/ucp/allgather/allgather.h +++ b/src/components/tl/ucp/allgather/allgather.h @@ -7,6 +7,8 @@ #define ALLGATHER_H_ #include "../tl_ucp.h" #include "../tl_ucp_coll.h" + #include "tl_ucp_sendrecv.h" + enum { UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL, @@ -36,6 +38,7 @@ static inline int ucc_tl_ucp_allgather_alg_from_str(const char *str) return i; } + ucc_status_t ucc_tl_ucp_allgather_init(ucc_tl_ucp_task_t *task); /* Ring */ diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 507136805f..959e687a9b 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -254,4 +254,4 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task) } return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); -} +} \ No newline at end of file diff --git a/src/components/tl/ucp/allgather/allgather_knomial.c b/src/components/tl/ucp/allgather/allgather_knomial.c index 1fbcf773cc..ecee9134f5 100644 --- a/src/components/tl/ucp/allgather/allgather_knomial.c +++ b/src/components/tl/ucp/allgather/allgather_knomial.c @@ -13,6 +13,9 @@ #include "coll_patterns/sra_knomial.h" #include "utils/ucc_math.h" #include "utils/ucc_coll_utils.h" +#include "allgather.h" + + #define SAVE_STATE(_phase) \ do { \ @@ -52,6 +55,7 @@ * As such allgather must keep to this ranking to be aligned with scatter. */ + void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task) { ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, @@ -78,9 +82,17 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task) ucc_status_t status; size_t extra_count; - EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", - task->allgather_kn.etask); + int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + + if (!use_cuda) { + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)){ + return; + } + } + + if (use_cuda) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask); task->allgather_kn.etask = NULL; + UCC_KN_GOTO_PHASE(task->allgather_kn.phase); if (KN_NODE_EXTRA == node_type) { peer = ucc_knomial_pattern_get_proxy(p, rank); @@ -197,6 +209,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task) ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) { + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); ucc_coll_args_t *args = &TASK_ARGS(task); @@ -214,6 +227,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) ucc_ee_executor_t *exec; void *rbuf; + int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); task->allgather_kn.etask = NULL; @@ -225,21 +240,29 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) ucc_dt_size(args->dst.info.datatype); rbuf = args->dst.info.buffer; if (!UCC_IS_INPLACE(*args)) { - status = ucc_coll_task_get_executor(&task->super, &exec); - if (ucc_unlikely(status != UCC_OK)) { - task->super.status = status; - return status; - } - eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY; - eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset); - eargs.copy.src = args->src.info.buffer; - eargs.copy.len = args->src.info.count * - ucc_dt_size(args->src.info.datatype); - status = ucc_ee_executor_task_post(exec, &eargs, - &task->allgather_kn.etask); - if (ucc_unlikely(status != UCC_OK)) { - task->super.status = status; - return status; + if (use_cuda) { + status = ucc_coll_task_get_executor(&task->super, &exec); + if (ucc_unlikely(status != UCC_OK)) { + task->super.status = status; + return status; + } + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY; + eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset); + eargs.copy.src = args->src.info.buffer; + eargs.copy.len = args->src.info.count * + ucc_dt_size(args->src.info.datatype); + status = ucc_ee_executor_task_post(exec, &eargs, + &task->allgather_kn.etask); + if (ucc_unlikely(status != UCC_OK)) { + task->super.status = status; + return status; + } + } else { + /*Loopback*/ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype), + args->src.info.mem_type, rank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype), + args->dst.info.mem_type, rank, team, task),task, out); } } } else if (ct == UCC_COLL_TYPE_ALLGATHERV) { @@ -284,6 +307,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) task->allgather_kn.sbuf = PTR_OFFSET(rbuf, offset); return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } ucc_status_t ucc_tl_ucp_allgather_knomial_init_r( @@ -293,8 +318,9 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r( ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); ucc_tl_ucp_task_t *task; ucc_sbgp_t *sbgp; - task = ucc_tl_ucp_init_task(coll_args, team); + //ucc_coll_args_t *args = &coll_args->args; + if (tl_team->cfg.use_reordering && coll_args->args.coll_type == UCC_COLL_TYPE_ALLREDUCE) { sbgp = ucc_topo_get_sbgp(tl_team->topo, UCC_SBGP_FULL_HOST_ORDERED); @@ -306,6 +332,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r( task->super.post = ucc_tl_ucp_allgather_knomial_start; task->super.progress = ucc_tl_ucp_allgather_knomial_progress; *task_h = &task->super; + return UCC_OK; } @@ -316,7 +343,6 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args, ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); ucc_rank_t size = UCC_TL_TEAM_SIZE(tl_team); ucc_kn_radix_t radix; - radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allgather_kn_radix, size); return ucc_tl_ucp_allgather_knomial_init_r(coll_args, team, task_h, radix); } diff --git a/src/components/tl/ucp/allgather/allgather_neighbor.c b/src/components/tl/ucp/allgather/allgather_neighbor.c index 534c197e4e..6e8461540a 100644 --- a/src/components/tl/ucp/allgather/allgather_neighbor.c +++ b/src/components/tl/ucp/allgather/allgather_neighbor.c @@ -145,18 +145,26 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task) ucc_rank_t neighbor; void *tmprecv, *tmpsend; + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, + if (use_cuda) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, err); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, err); } } - if (trank % 2) { neighbor = (trank - 1 + tsize) % tsize; } else { @@ -175,4 +183,6 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task) task, out); out: return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +err: + return task->super.status; } diff --git a/src/components/tl/ucp/allgather/allgather_ring.c b/src/components/tl/ucp/allgather/allgather_ring.c index 07178aea25..6816a58d48 100644 --- a/src/components/tl/ucp/allgather/allgather_ring.c +++ b/src/components/tl/ucp/allgather/allgather_ring.c @@ -47,16 +47,24 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task) if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { return; } + sendto = ucc_ep_map_eval(task->subset.map, (trank + 1) % tsize); recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize); + + + uint32_t use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + int iter = use_cuda ? tsize - 1 : tsize; + + while (task->tagged.send_posted < iter) { + + step = use_cuda ? task->tagged.send_posted : task->tagged.send_posted - 1; - while (task->tagged.send_posted < tsize - 1) { - step = task->tagged.send_posted; sblock = task->allgather_ring.get_send_block(&task->subset, trank, tsize, step); rblock = task->allgather_ring.get_recv_block(&task->subset, trank, tsize, step); buf = PTR_OFFSET(rbuf, sblock * data_size); + UCPCHECK_GOTO( ucc_tl_ucp_send_nb(buf, data_size, rmem, sendto, team, task), task, out); @@ -64,6 +72,7 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task) UCPCHECK_GOTO( ucc_tl_ucp_recv_nb(buf, data_size, rmem, recvfrom, team, task), task, out); + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { return; } @@ -93,17 +102,26 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task) UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_ring_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, - 0); - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block), + block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0); + if (use_cuda) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + ucc_rank_t rank = ucc_ep_map_eval(task->subset.map, trank); + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, rank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, rank, team, task),task, out); } } - return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task) @@ -119,6 +137,7 @@ ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task) if (!(task->flags & UCC_TL_UCP_TASK_FLAG_SUBSET)) { if (team->cfg.use_reordering) { sbgp = ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL_HOST_ORDERED); + //printf("reordering: rank %d chenged to %d\n", (int)task->subset.myrank, (int)sbgp->group_rank); task->subset.myrank = sbgp->group_rank; task->subset.map = sbgp->map; } @@ -147,4 +166,4 @@ ucc_status_t ucc_tl_ucp_allgather_ring_init(ucc_base_coll_args_t *coll_args, } *task_h = &task->super; return UCC_OK; -} +} \ No newline at end of file diff --git a/src/components/tl/ucp/allgather/allgather_sparbit.c b/src/components/tl/ucp/allgather/allgather_sparbit.c index 0edfc4d4a3..20c60e8c32 100644 --- a/src/components/tl/ucp/allgather/allgather_sparbit.c +++ b/src/components/tl/ucp/allgather/allgather_sparbit.c @@ -130,13 +130,22 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task) task->allgather_sparbit.i = 0; // setup iteration task->allgather_sparbit.data_expected = 1; + int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda; + if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, + if (use_cuda) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, out); } } - return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 7db99bdaf2..b1be17e1f8 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -140,6 +140,10 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_kn_radix), UCC_CONFIG_TYPE_UINT}, + {"ALLGATHER_USE_CUDA", "1", "Flag weather to use mc cuda copy", + ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_use_cuda), + UCC_CONFIG_TYPE_UINT}, + {"BCAST_KN_RADIX", "4", "Radix of the recursive-knomial bcast algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_kn_radix), UCC_CONFIG_TYPE_UINT}, diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 3c439f4ae5..517d4dba3b 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -45,6 +45,7 @@ extern ucc_tl_ucp_iface_t ucc_tl_ucp; typedef struct ucc_tl_ucp_lib_config { ucc_tl_lib_config_t super; uint32_t kn_radix; + uint32_t allgather_use_cuda; uint32_t fanin_kn_radix; uint32_t fanout_kn_radix; uint32_t barrier_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 2769244d39..5f7a16e49f 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -270,6 +270,7 @@ typedef struct ucc_tl_ucp_task { } alltoall_bruck; char plugin_data[UCC_TL_UCP_TASK_PLUGIN_MAX_DATA]; }; + //int count; } ucc_tl_ucp_task_t; typedef struct ucc_tl_ucp_schedule { diff --git a/src/components/tl/ucp/tl_ucp_lib.c b/src/components/tl/ucp/tl_ucp_lib.c index 9a724abff9..7902ddf913 100644 --- a/src/components/tl/ucp/tl_ucp_lib.c +++ b/src/components/tl/ucp/tl_ucp_lib.c @@ -26,6 +26,7 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *params, if (UCC_OK != status) { return status; } + //printf("\n ------- use_cuda tl_ucp_lib.c:29 = %u ----------\n", self->cfg.allgather_use_cuda); if (tl_ucp_config->kn_radix > 0) { self->cfg.barrier_kn_radix = tl_ucp_config->kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_service_coll.c b/src/components/tl/ucp/tl_ucp_service_coll.c index bf16cf00d7..6176a0f643 100644 --- a/src/components/tl/ucp/tl_ucp_service_coll.c +++ b/src/components/tl/ucp/tl_ucp_service_coll.c @@ -31,6 +31,7 @@ static ucc_rank_t ucc_tl_ucp_service_ring_get_recv_block(ucc_subset_t *subset, static ucc_status_t ucc_tl_ucp_service_coll_start_executor(ucc_coll_task_t *task) { + ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:34 start executer ====================>"); ucc_ee_executor_params_t eparams; ucc_status_t status; @@ -143,6 +144,7 @@ ucc_status_t ucc_tl_ucp_service_allgather(ucc_base_team_t *team, void *sbuf, ucc_subset_t subset, ucc_coll_task_t **task_p) { + ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:146 ====================>"); ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); ucc_tl_ucp_task_t *task = ucc_tl_ucp_get_task(tl_team); uint32_t npolls = @@ -167,6 +169,7 @@ ucc_status_t ucc_tl_ucp_service_allgather(ucc_base_team_t *team, void *sbuf, ucc_status_t status; status = ucc_coll_task_init(&task->super, &bargs, team); + ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:171 ====================>"); if (status != UCC_OK) { goto free_task; } diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 3257c79896..6b0c13ade5 100644 --- a/src/ucc/api/ucc.h +++ b/src/ucc/api/ucc.h @@ -1880,6 +1880,8 @@ typedef struct ucc_coll_args { int64_t stride; uint64_t size; } active_set; + + } ucc_coll_args_t; /**