diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am
index cc0a118c60..4d684adfb5 100644
--- a/src/components/tl/ucp/Makefile.am
+++ b/src/components/tl/ucp/Makefile.am
@@ -34,10 +34,12 @@ alltoallv =                  \
 	alltoallv/alltoallv_pairwise.c \
 	alltoallv/alltoallv_hybrid.c
 
-allreduce =                       \
-	allreduce/allreduce.h         \
-	allreduce/allreduce.c         \
-	allreduce/allreduce_knomial.c \
+allreduce =                                    \
+	allreduce/allreduce.h                      \
+	allreduce/allreduce.c                      \
+	allreduce/allreduce_knomial.c              \
+	allreduce/allreduce_sliding_window.c       \
+	allreduce/allreduce_sliding_window_setup.c \
 	allreduce/allreduce_sra_knomial.c
 
 barrier =          \
diff --git a/src/components/tl/ucp/allreduce/allreduce.c b/src/components/tl/ucp/allreduce/allreduce.c
index 90e9d6bf48..1b01cb5455 100644
--- a/src/components/tl/ucp/allreduce/allreduce.c
+++ b/src/components/tl/ucp/allreduce/allreduce.c
@@ -13,13 +13,17 @@ ucc_base_coll_alg_info_t
         [UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL] =
             {.id   = UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
              .name = "knomial",
-             .desc =
-                 "recursive knomial with arbitrary radix (optimized for latency)"},
+             .desc = "recursive knomial with arbitrary radix (optimized for "
+                     "latency)"},
         [UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL] =
             {.id   = UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
              .name = "sra_knomial",
              .desc = "recursive knomial scatter-reduce followed by knomial "
                      "allgather (optimized for BW)"},
+        [UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
+            {.id   = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
+             .name = "sliding_window",
+             .desc = "sliding window allreduce (optimized for running on DPU)"},
         [UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
             .id = 0, .name = NULL, .desc = NULL}};
 
@@ -46,3 +50,40 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
 out:
     return status;
 }
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
+                                         ucc_base_team_t *     team,
+                                         ucc_coll_task_t **    task_h)
+{
+    ucc_status_t             status  = UCC_OK;
+    ucc_tl_ucp_team_t *      tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
+    ucc_tl_ucp_task_t *      task;
+    ucc_ee_executor_params_t params;
+
+    ALLREDUCE_TASK_CHECK(coll_args->args, tl_team);
+
+    task = ucc_tl_ucp_init_task(coll_args, team);
+    if (ucc_unlikely(!task)) {
+        ucc_error("could not allocate task");
+        return UCC_ERR_NO_MEMORY;
+    }
+    *task_h              = &task->super;
+    task->super.post     = ucc_tl_ucp_allreduce_sliding_window_start;
+    task->super.progress = ucc_tl_ucp_allreduce_sliding_window_progress;
+    task->super.finalize = ucc_tl_ucp_allreduce_sliding_window_finalize;
+
+    status = ucc_tl_ucp_allreduce_sliding_window_task_init(coll_args, team,
+                                                           task);
+    if (UCC_OK != status) {
+        ucc_error("failed to init task: %s", ucc_status_string(status));
+        goto out;
+    }
+
+    params.mask    = UCC_EE_EXECUTOR_PARAM_FIELD_TYPE;
+    params.ee_type = UCC_EE_CPU_THREAD;
+    status =
+        ucc_ee_executor_init(&params, &task->allreduce_sliding_window.executor);
+
+    if (UCC_OK != status) {
+        ucc_error("failed to init executor: %s", ucc_status_string(status));
+    }
+
+out:
+    return status;
+}
diff --git a/src/components/tl/ucp/allreduce/allreduce.h b/src/components/tl/ucp/allreduce/allreduce.h
index 250bebc981..5e545b6135 100644
--- a/src/components/tl/ucp/allreduce/allreduce.h
+++ b/src/components/tl/ucp/allreduce/allreduce.h
@@ -11,6 +11,7 @@
 enum {
     UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
     UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
+    UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
     UCC_TL_UCP_ALLREDUCE_ALG_LAST
 };
 
@@ -35,21 +36,116 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
 #define ALLREDUCE_TASK_CHECK(_args, _team)                                     \
     CHECK_SAME_MEMTYPE((_args), (_team));
 
+#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024
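+
+/* Packed, exported memory handles for the source and destination buffers,
+ * passed in by the caller through ucc_coll_args_t.global_work_buffer */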
+typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
+    void *packed_src_memh;
+    void *packed_dst_memh;
+} ucc_tl_ucp_allreduce_sw_global_work_buf_info;
+
+typedef enum ucc_tl_ucp_allreduce_sw_buf_state
+{
+    FREE,
+    RECVING,
+    REDUCING,
+    REDUCED,
+    SENDING,
+    IDLE,
+} ucc_tl_ucp_allreduce_sw_buf_state;
+
+typedef struct ucc_tl_ucp_allreduce_sw_buf {
+    void *                            buf;
+    ucc_tl_ucp_allreduce_sw_buf_state state;
+    ucs_status_ptr_t                  ucp_req;
+    size_t                            count;
+    size_t                            bytes;
+} ucc_tl_ucp_allreduce_sw_buf;
+
+typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
+    ucc_tl_ucp_allreduce_sw_buf  accbuf;
+    ucc_tl_ucp_allreduce_sw_buf *getbuf;
+    ucs_status_ptr_t *           put_requests;
+    size_t                       buffer_size;
+    size_t                       num_buffers;
+    size_t                       avail_buffs;
+    size_t                       my_count;
+    size_t                       my_offset;
+    size_t                       count_issued;
+    size_t                       count_received;
+    size_t                       count_reduced;
+    size_t                       count_serviced;
+    size_t                       get_idx;
+    size_t                       red_idx;
+    ucc_rank_t                   src_rank;
+    ucc_rank_t                   dst_rank;
+    int                          done_get;
+    int                          done_red;
+    int                          done_put;
+    int                          posted_put;
+} ucc_tl_ucp_allreduce_sw_pipeline;
+
+struct ucc_tl_ucp_allreduce_sw_export_buf {
+    ucp_context_h ucp_context;
+    ucp_mem_h     memh;
+    void *        packed_memh;
+    size_t        packed_memh_len;
+    void *        packed_key;
+    size_t        packed_key_len;
+    uint64_t      memh_id;
+};
+
+typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
+    void *src_buf;
+    void *dst_buf;
+    char  packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
+    char  packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
+} ucc_tl_ucp_allreduce_sw_host_allgather;
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
                                                ucc_base_team_t *     team,
                                                ucc_coll_task_t **    task_h);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
+                                         ucc_base_team_t *     team,
+                                         ucc_coll_task_t **    task_h);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args,
+                                               ucc_base_team_t *     team,
+                                               ucc_tl_ucp_task_t *   task);
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
+                                              ucc_base_team_t *     team,
+                                              ucc_tl_ucp_task_t *   task);
+
+ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
+    ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);
 
 void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);
+
+void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);
 
-ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
-                                                   ucc_base_team_t *     team,
-                                                   ucc_coll_task_t **    task_h);
+ucc_status_t
+ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
+                                      ucc_base_team_t *     team,
+                                      ucc_coll_task_t **    task_h);
 
 ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_start(ucc_coll_task_t *task);
diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c
new file mode 100644
index 0000000000..8074df24c1
--- /dev/null
+++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c
@@ -0,0 +1,472 @@
+/**
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */
+
+#include "allreduce.h"
+#include "../allgather/allgather.h"
+#include "../barrier/barrier.h"
+#include "utils/ucc_dt_reduce.h"
+#include "tl_ucp_ep.h"
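+
+/*
+ * Sliding window allreduce: each rank owns a contiguous 1/team_size slice of
+ * the destination buffer. For that slice it GETs the corresponding chunk from
+ * every rank (itself included) into an accumulation buffer plus a ring of
+ * staging buffers, reduces them into the accumulation buffer, and PUTs the
+ * reduced chunk back to every rank's destination buffer. Buffer addresses and
+ * rkeys are exchanged up front with a service allgather, and a knomial
+ * barrier closes the collective.
+ */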
+
+static inline void
+ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf *buf)
+{
+    buf->state   = FREE;
+    buf->count   = 0;
+    buf->bytes   = 0;
+    buf->ucp_req = NULL;
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(
+    ucc_tl_ucp_allreduce_sw_pipeline *pipe, ucc_rank_t rank,
+    size_t put_window_size)
+{
+    int i;
+
+    pipe->avail_buffs   = pipe->num_buffers;
+    pipe->src_rank      = pipe->dst_rank       = rank;
+    pipe->get_idx       = pipe->red_idx        = 0;
+    pipe->done_get      = pipe->done_red       = 0;
+    pipe->done_put      = pipe->posted_put     = 0;
+    pipe->count_issued  = pipe->count_received = 0;
+    pipe->count_reduced = pipe->count_serviced = 0;
+    pipe->my_count      = pipe->my_offset      = 0;
+
+    ucc_tl_ucp_allreduce_sliding_window_reset_buf(&pipe->accbuf);
+    for (i = 0; i < pipe->num_buffers; i++) {
+        ucc_tl_ucp_allreduce_sliding_window_reset_buf(&pipe->getbuf[i]);
+    }
+
+    for (i = 0; i < put_window_size; i++) {
+        pipe->put_requests[i] = NULL;
+    }
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task)
+{
+    ucc_status_t       status = UCC_OK;
+    ucc_tl_ucp_task_t *task   = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_tl_ucp_team_t *team   = TASK_TEAM(task);
+    ucc_rank_t         rank   = UCC_TL_TEAM_RANK(team);
+    size_t             count_total = coll_task->bargs.args.dst.info.count;
+    ucc_rank_t         size    = coll_task->team->params.size;
+    ucc_datatype_t     dtype   = TASK_ARGS(task).dst.info.datatype;
+    size_t             dt_size = ucc_dt_size(dtype);
+    ucc_tl_ucp_allreduce_sw_pipeline *pipe =
+        task->allreduce_sliding_window.pipe;
+    ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data =
+        task->allreduce_sliding_window.allgather_data;
+    size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather);
+    ucc_service_coll_req_t *scoll_req;
+
+    ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(
+        pipe, rank, task->allreduce_sliding_window.put_window_size);
+
+    pipe->my_count  = count_total / size;
+    pipe->my_offset = pipe->my_count * dt_size * rank;
+    if (rank == size - 1) {
+        pipe->my_count += count_total % size;
+    }
+
+    ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
+
+    task->allreduce_sliding_window.reduce_in_progress = 0;
+    task->allreduce_sliding_window.barrier_task       = NULL;
+
+    UCC_CHECK_GOTO(
+        ucc_service_allgather(
+            UCC_TL_CORE_TEAM(team), allgather_data,
+            PTR_OFFSET(allgather_data, allgather_size), allgather_size,
+            ucc_sbgp_to_subset(ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL)),
+            &scoll_req),
+        out, status);
+
+    scoll_req->data = allgather_data;
+    task->allreduce_sliding_window.allgather_scoll_req = scoll_req;
+
+    return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
+out:
+    return status;
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *coll_task)
+{
+    ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_status_t       st =
+        ucc_ee_executor_finalize(task->allreduce_sliding_window.executor);
+
+    if (ucc_unlikely(st != UCC_OK)) {
+        tl_error(UCC_TASK_LIB(task), "failed to finalize executor");
+    }
+
+    if (ucc_tl_ucp_allreduce_sliding_window_free_gwbi(coll_task) != UCC_OK) {
+        tl_error(UCC_TASK_LIB(task), "error freeing resources");
+    }
+
+    st = ucc_tl_ucp_coll_finalize(&task->super);
+
+    if (ucc_unlikely(st != UCC_OK)) {
+        tl_error(UCC_TASK_LIB(task), "failed to finalize collective");
+    }
+
+    return st;
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_reduction(
+    ucc_coll_task_t *coll_task, ucc_tl_ucp_allreduce_sw_buf *accbuf,
+    ucc_tl_ucp_allreduce_sw_buf *getbuf)
+{
+    ucc_status_t       status = UCC_OK;
+    ucc_tl_ucp_task_t *task   = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_coll_args_t *  args   = &TASK_ARGS(task);
+    ucc_datatype_t     dt     = TASK_ARGS(task).dst.info.datatype;
+
+    status =
+        ucc_dt_reduce(accbuf->buf, getbuf->buf, accbuf->buf, accbuf->count, dt,
+                      args, 0, 0, task->allreduce_sliding_window.executor,
+                      &task->allreduce_sliding_window.etask);
+
+    task->allreduce_sliding_window.reduce_in_progress = 1;
+
+    if (ucc_unlikely(UCC_OK != status)) {
+        tl_error(UCC_TASK_LIB(task), "failed to perform dt reduction");
+        task->super.status = status;
+        return;
+    }
+}
+
+static inline void
+ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t *task)
+{
+    ucc_status_t status;
+
+#define SAVE_STATE(_reduce_in_progress) _reduce_in_progress = 1
+
+    EXEC_TASK_TEST(task->allreduce_sliding_window.reduce_in_progress,
+                   "failed to perform dt reduction",
+                   task->allreduce_sliding_window.etask);
+
+    // If it didn't complete, we would have returned by now. So, clear the flag
+    task->allreduce_sliding_window.reduce_in_progress = 0;
+}
+
+static inline ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t   request,
+                                             ucc_tl_ucp_task_t *task)
+{
+    if (request == NULL) {
+        return UCC_OK;
+    } else if (UCS_PTR_IS_ERR(request)) {
+        tl_error(UCC_TASK_LIB(task), "unable to complete UCX request=%p: %d",
+                 request, UCS_PTR_STATUS(request));
+        return ucs_status_to_ucc_status(UCS_PTR_STATUS(request));
+    } else {
+        return ucs_status_to_ucc_status(ucp_request_check_status(request));
+    }
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(
+    ucc_coll_task_t *coll_task)
+{
+    ucc_tl_ucp_task_t *     task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_service_coll_req_t *allgather_scoll_req =
+        task->allreduce_sliding_window.allgather_scoll_req;
+    ucc_status_t status = ucc_service_coll_test(allgather_scoll_req);
+    if (status < 0) {
+        tl_error(coll_task->team->context->lib,
+                 "failure during service coll exchange: %s",
+                 ucc_status_string(status));
+        ucc_service_coll_finalize(allgather_scoll_req);
+        task->super.status = status;
+        return;
+    }
+    if (UCC_INPROGRESS == status) {
+        return;
+    }
+    ucc_assert(status == UCC_OK);
+
+    // copy from allgather recvbuf into gwbi
+    ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
+        allgather_scoll_req, task);
+
+    ucc_service_coll_finalize(
+        task->allreduce_sliding_window.allgather_scoll_req);
+    task->allreduce_sliding_window.allgather_scoll_req = NULL;
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(
+    ucc_coll_task_t *coll_task)
+{
+    int                i;
+    ucc_base_team_t *  team      = coll_task->team;
+    ucc_rank_t         team_size = (ucc_rank_t)team->params.size;
+    ucc_tl_ucp_task_t *task      = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+
+    for (i = 0; i < team_size; i++) {
+        if (!task->allreduce_sliding_window.inplace)
+            ucp_rkey_destroy(task->allreduce_sliding_window.src_rkeys[i]);
+        ucp_rkey_destroy(task->allreduce_sliding_window.dst_rkeys[i]);
+    }
+}
+
+static inline void
+ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t *coll_task)
+{
+    ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_base_team_t *  team = coll_task->team;
+    ucc_status_t       status;
+
+    ucc_base_coll_args_t coll_args = {
+        .team           = coll_task->team->params.team,
+        .args.coll_type = UCC_COLL_TYPE_BARRIER,
+    };
+
+    status = ucc_tl_ucp_coll_init(&coll_args, team,
+                                  &task->allreduce_sliding_window.barrier_task);
+    if (status < 0) {
+        tl_error(coll_task->team->context->lib,
+                 "failure during sliding window barrier init: %s",
+                 ucc_status_string(status));
+        task->super.status = status;
+        return;
+    }
+
+    status = ucc_tl_ucp_barrier_knomial_start(
+        task->allreduce_sliding_window.barrier_task);
+    if (status < 0) {
+        tl_error(coll_task->team->context->lib,
+                 "failure during sliding window barrier start: %s",
+                 ucc_status_string(status));
+        task->super.status = status;
+    }
+}
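+
+/*
+ * Progress order: (1) wait for the rkey/address allgather to complete,
+ * (2) drive the get -> reduce -> put pipeline over this rank's slice,
+ * (3) run a closing barrier before marking the task complete. Only one
+ * reduction is outstanding at a time; puts are fenced and tracked in a
+ * bounded window of requests.
+ */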
+void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task)
+{
+    ucc_tl_ucp_task_t *task  = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_rank_t         size  = (ucc_rank_t)task->subset.map.ep_num;
+    ucc_datatype_t     dtype = TASK_ARGS(task).dst.info.datatype;
+    size_t             dt_size        = ucc_dt_size(dtype);
+    uint32_t           host_team_size = size;
+    ucc_base_team_t *  base_team      = coll_task->team;
+    ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, ucc_tl_ucp_team_t);
+    ucc_tl_ucp_context_t *            tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team);
+    ucc_tl_ucp_allreduce_sw_pipeline *pipe =
+        task->allreduce_sliding_window.pipe;
+    ucc_tl_ucp_allreduce_sw_buf *accbuf    = &pipe->accbuf;
+    ucp_request_param_t          req_param = {0};
+    int                          i         = 0;
+    ucc_service_coll_req_t *     allgather_scoll_req =
+        task->allreduce_sliding_window.allgather_scoll_req;
+    ucc_coll_task_t *barrier_task = task->allreduce_sliding_window.barrier_task;
+    size_t           remaining_elems;
+    size_t           get_idx;
+    size_t           count;
+    size_t           get_offset;
+    size_t           data_size;
+    ucc_rank_t       src_rank;
+    ucc_rank_t       dst_rank;
+    void *           src_addr;
+    void *           dst_addr;
+    ucs_status_ptr_t request;
+    size_t           red_idx;
+    ucc_tl_ucp_allreduce_sw_buf *redbuf;
+    ucc_tl_ucp_allreduce_sw_buf *getbuf;
+    size_t                       put_offset;
+    int                          window;
+    int                          put_idx;
+
+    if (barrier_task != NULL) {
+        // mark sliding window task complete once barrier finishes
+        if (barrier_task->super.status == UCC_OK) {
+            ucc_tl_ucp_put_task(
+                ucc_derived_of(task->allreduce_sliding_window.barrier_task,
+                               ucc_tl_ucp_task_t));
+            task->allreduce_sliding_window.barrier_task = NULL;
+            task->super.status = UCC_OK;
+        }
+
+        ucc_assert(barrier_task->super.status >= 0);
+        return;
+    }
+
+    if (allgather_scoll_req != NULL) {
+        ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(coll_task);
+        return;
+    }
+
+    if (task->allreduce_sliding_window.reduce_in_progress) {
+        // We've previously started a reduction on the accbuf that hasn't yet
+        // completed.
+        ucc_tl_ucp_allreduce_sliding_window_test_reduction(task);
+
+        if (task->allreduce_sliding_window.reduce_in_progress) {
+            return;
+        }
+    }
+
+    if (pipe->count_serviced < pipe->my_count) {
+        if ((pipe->count_received < pipe->my_count) &&
+            (pipe->done_get < host_team_size) && (pipe->avail_buffs > 0) &&
+            (accbuf->state != REDUCED && accbuf->state != SENDING)) {
+            remaining_elems = pipe->my_count - pipe->count_received;
+            get_idx         = pipe->get_idx % pipe->num_buffers;
+            count      = ucc_min(pipe->buffer_size / dt_size, remaining_elems);
+            get_offset = pipe->count_received * dt_size + pipe->my_offset;
+            data_size  = count * dt_size;
+            src_rank   = pipe->src_rank;
+            getbuf   = accbuf->state == FREE ? accbuf : &pipe->getbuf[get_idx];
+            src_addr = PTR_OFFSET(
+                task->allreduce_sliding_window.sbufs[src_rank], get_offset);
+            dst_addr = getbuf->buf;
+
+            ucc_assert(getbuf->state == FREE);
+
+            getbuf->state   = RECVING;
+            getbuf->count   = count;
+            getbuf->bytes   = data_size;
+            getbuf->ucp_req = ucp_get_nbx(
+                task->allreduce_sliding_window.eps[src_rank], dst_addr,
+                data_size, (uint64_t)src_addr,
+                task->allreduce_sliding_window.src_rkeys[src_rank], &req_param);
+
+            pipe->src_rank = (src_rank + 1) % host_team_size;
+
+            if (getbuf != accbuf) {
+                pipe->avail_buffs--;
+                pipe->get_idx++;
+            }
+
+            pipe->done_get++;
+            if (pipe->done_get == host_team_size) {
+                pipe->count_received += count;
+            }
+        }
+
+        if (accbuf->state == RECVING) {
+            request = accbuf->ucp_req;
+            if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) ==
+                UCC_OK) {
+                if (request)
+                    ucp_request_free(request);
+                accbuf->state   = REDUCING;
+                accbuf->ucp_req = NULL;
+            }
+        }
+
+        red_idx = pipe->red_idx % pipe->num_buffers;
+        redbuf  = &pipe->getbuf[red_idx];
+        if (accbuf->state == REDUCING && redbuf->state == RECVING) {
+            request = redbuf->ucp_req;
+            if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) ==
+                UCC_OK) {
+                if (request)
+                    ucp_request_free(request);
+                redbuf->state   = REDUCING;
+                redbuf->ucp_req = NULL;
+
+                ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf,
+                                                              redbuf);
+
+                ucc_tl_ucp_allreduce_sliding_window_test_reduction(task);
+
+                if (task->allreduce_sliding_window.reduce_in_progress) {
+                    return;
+                }
+
+                redbuf->state = FREE;
+                pipe->avail_buffs++;
+                pipe->red_idx++;
+                pipe->done_red++;
+
+                if (pipe->done_red == host_team_size - 1) {
+                    accbuf->state = REDUCED;
+                    pipe->count_reduced += accbuf->count;
+                }
+            }
+        }
+
+        if ((pipe->count_serviced < pipe->count_reduced) &&
+            (accbuf->state == REDUCED)) {
+            data_size  = accbuf->bytes;
+            put_offset = pipe->count_serviced * dt_size + pipe->my_offset;
+
+            window = ucc_min(task->allreduce_sliding_window.put_window_size,
+                             host_team_size - pipe->posted_put);
+
+            for (i = 0; i < window; i++) {
+                dst_rank = pipe->dst_rank;
+                src_addr = accbuf->buf;
+                dst_addr = PTR_OFFSET(
+                    task->allreduce_sliding_window.rbufs[dst_rank], put_offset);
+                put_idx = pipe->posted_put %
+                          task->allreduce_sliding_window.put_window_size;
+
+                if (task->allreduce_sliding_window.put_requests[put_idx] !=
+                    NULL) {
+                    // We've already posted a put at this index that didn't yet
+                    // complete, left this function and came back. Skip to check
+                    // whether this request finished instead of overwriting it
+                    // with another put
+                    break;
+                }
+
+                ucp_worker_fence(tl_ctx->worker.ucp_worker);
+                task->allreduce_sliding_window.put_requests[put_idx] =
+                    ucp_put_nbx(
+                        task->allreduce_sliding_window.eps[dst_rank], src_addr,
+                        data_size, (uint64_t)dst_addr,
+                        task->allreduce_sliding_window.dst_rkeys[dst_rank],
+                        &req_param);
+
+                pipe->posted_put++;
+                pipe->dst_rank = (dst_rank + 1) % host_team_size;
+            }
+
+            for (i = pipe->done_put; i < pipe->posted_put; i++) {
+                put_idx = i % task->allreduce_sliding_window.put_window_size;
+                request = task->allreduce_sliding_window.put_requests[put_idx];
+
+                // These puts are fenced, so if the first one fails, the
+                // following ones will too
+                if (ucc_tl_ucp_allreduce_sliding_window_req_test(
+                        request, task) != UCC_OK)
+                    break;
+
+                if (request != NULL)
+                    ucp_request_free(request);
+                task->allreduce_sliding_window.put_requests[put_idx] = NULL;
+                pipe->done_put++;
+            }
+
+            if (pipe->done_put == host_team_size) {
+                ucc_assert(pipe->avail_buffs == pipe->num_buffers);
+                ucc_assert(pipe->done_get == host_team_size);
+                ucc_assert(pipe->done_red == host_team_size - 1);
+                ucc_assert(pipe->done_put == host_team_size);
+
+                pipe->count_serviced += accbuf->count;
+
+                ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf);
+                pipe->done_get = 0;
+                pipe->done_red = pipe->done_put = pipe->posted_put = 0;
+
+                for (i = 0; i < task->allreduce_sliding_window.put_window_size;
+                     i++) {
+                    task->allreduce_sliding_window.put_requests[i] = NULL;
+                }
+            }
+        }
+
+        ucp_worker_progress(tl_ctx->worker.ucp_worker);
+    }
+
+    if (pipe->count_serviced == pipe->my_count) {
+        ucc_tl_ucp_allreduce_sliding_window_barrier(coll_task);
+        ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(coll_task);
+    }
+}
diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
new file mode 100644
index 0000000000..9c5baed836
--- /dev/null
+++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
@@ -0,0 +1,287 @@
+/**
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */
+
+#include "allreduce.h"
+#include "../allgather/allgather.h"
+#include "utils/ucc_dt_reduce.h"
+#include "tl_ucp_ep.h"
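+
+/*
+ * Import the packed, exported memh passed in through the global work buffer
+ * into the TL/UCP context (UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER) and
+ * pack an rkey for it; the packed rkeys are later exchanged over the service
+ * allgather.
+ */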
+ */ + +#include "allreduce.h" +#include "../allgather/allgather.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +static int ucc_tl_ucp_allreduce_sliding_window_register( + ucp_context_h ucp_context, ucc_tl_ucp_team_t *tl_team, + struct ucc_tl_ucp_allreduce_sw_export_buf *ebuf, void *packed_memh) +{ + ucs_status_t ucs_status; + ucp_mem_map_params_t params = {0}; + + ebuf->ucp_context = ucp_context; + + params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; + params.exported_memh_buffer = packed_memh; + + ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); + if (UCS_OK != ucs_status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), + "import using ucp_mem_map() returned error: %s\n", + ucs_status_string(ucs_status)); + return 0; + } + + ucs_status = ucp_rkey_pack(ucp_context, ebuf->memh, &ebuf->packed_key, + &ebuf->packed_key_len); + if (UCS_OK != ucs_status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), + "ucp_rkey_pack() returned error: %s\n", + ucs_status_string(ucs_status)); + return 0; + } + + return 0; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + ucc_status_t status = UCC_OK; + void * src_buf = coll_args->args.src.info.buffer; + void * dst_buf = coll_args->args.dst.info.buffer; + ucc_rank_t team_size = (ucc_rank_t)team->params.size; + ucc_tl_ucp_team_t * tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi_p = NULL; + size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); + ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data; + + tl_debug(UCC_TL_TEAM_LIB(tl_team), "allocating pipe\n"); + if ((status = ucc_tl_ucp_allreduce_sliding_window_alloc_pipe( + coll_args, team, task)) != UCC_OK) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "error while allocating pipe"); + goto out; + } + + allgather_data = ucc_malloc(allgather_size * (team_size + 1)); + + gwbi_p = coll_args->args.global_work_buffer; + task->super.bargs.args.global_work_buffer = gwbi_p; + + task->allreduce_sliding_window.inplace = UCC_IS_INPLACE(coll_args->args); + + task->allreduce_sliding_window.barrier_task = NULL; + + if (!task->allreduce_sliding_window.inplace) { + task->allreduce_sliding_window.sbufs = + ucc_malloc(sizeof(void *) * team_size); + task->allreduce_sliding_window.src_rkeys = + ucc_malloc(sizeof(ucp_rkey_h) * team_size); + } + + task->allreduce_sliding_window.rbufs = + ucc_malloc(sizeof(void *) * team_size); + task->allreduce_sliding_window.dst_rkeys = + ucc_malloc(sizeof(ucp_rkey_h) * team_size); + task->allreduce_sliding_window.eps = + ucc_malloc(sizeof(ucp_ep_h) * team_size); + + task->allreduce_sliding_window.put_requests = + task->allreduce_sliding_window.pipe->put_requests; + + if (!task->allreduce_sliding_window.inplace) { + task->allreduce_sliding_window.src_ebuf = + ucc_malloc(sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); + } else { + task->allreduce_sliding_window.src_ebuf = NULL; + } + + task->allreduce_sliding_window.dst_ebuf = + ucc_malloc(sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); + + if (!task->allreduce_sliding_window.inplace) + allgather_data->src_buf = src_buf; + + allgather_data->dst_buf = dst_buf; + + // Register the src and dst bufs + if (!task->allreduce_sliding_window.inplace) { + ucc_tl_ucp_allreduce_sliding_window_register( + tl_ctx->worker.ucp_context, tl_team, + task->allreduce_sliding_window.src_ebuf, 
gwbi_p->packed_src_memh); + memcpy(allgather_data->packed_src_key, + task->allreduce_sliding_window.src_ebuf->packed_key, + task->allreduce_sliding_window.src_ebuf->packed_key_len); + } + + ucc_tl_ucp_allreduce_sliding_window_register( + tl_ctx->worker.ucp_context, tl_team, + task->allreduce_sliding_window.dst_ebuf, gwbi_p->packed_dst_memh); + memcpy(allgather_data->packed_dst_key, + task->allreduce_sliding_window.dst_ebuf->packed_key, + task->allreduce_sliding_window.dst_ebuf->packed_key_len); + + task->allreduce_sliding_window.allgather_data = allgather_data; + task->allreduce_sliding_window.allgather_scoll_req = NULL; + + return UCC_OK; +out: + return status; +} + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task) +{ + ucc_status_t status; + ucc_rank_t i; + ucc_base_team_t * base_team = sw_task->super.team; + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, ucc_tl_ucp_team_t); + ucc_rank_t team_size = base_team->params.size; + + size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); + ucc_tl_ucp_allreduce_sw_host_allgather *all_host_allgather = + PTR_OFFSET(scoll_req->data, allgather_size); + + for (i = 0; i < team_size; i++) { + ucs_status_t ucs_status = UCS_OK; + ucp_rkey_h src_unpacked, dst_unpacked; + ucp_ep_h ep; + + status = ucc_tl_ucp_get_ep(tl_team, i, &ep); + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + + ucs_status = ucp_ep_rkey_unpack( + ep, all_host_allgather[i].packed_dst_key, &dst_unpacked); + if (UCS_OK != ucs_status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "dst rkey unpack failed\n"); + return UCC_ERR_NO_RESOURCE; + } + + sw_task->allreduce_sliding_window.rbufs[i] = + all_host_allgather[i].dst_buf; + sw_task->allreduce_sliding_window.dst_rkeys[i] = dst_unpacked; + + if (!sw_task->allreduce_sliding_window.inplace) { + ucs_status = ucp_ep_rkey_unpack( + ep, all_host_allgather[i].packed_src_key, &src_unpacked); + if (UCS_OK != ucs_status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "src rkey unpack failed\n"); + return UCC_ERR_NO_RESOURCE; + } + + sw_task->allreduce_sliding_window.sbufs[i] = + all_host_allgather[i].src_buf; + sw_task->allreduce_sliding_window.src_rkeys[i] = src_unpacked; + } else { + sw_task->allreduce_sliding_window.sbufs = + sw_task->allreduce_sliding_window.rbufs; + sw_task->allreduce_sliding_window.src_rkeys = + sw_task->allreduce_sliding_window.dst_rkeys; + } + + sw_task->allreduce_sliding_window.eps[i] = ep; + } + + return status; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + int i; + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_rank_t team_size = (ucc_rank_t)team->params.size; + + const size_t buf_size = + UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_buf_size; + int put_window_size = UCC_TL_UCP_TEAM_LIB(tl_team) + ->cfg.allreduce_sliding_window_put_window_size; + int num_get_bufs = + UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_num_get_bufs; + + ucc_tl_ucp_allreduce_sw_pipeline *pipe = + (ucc_tl_ucp_allreduce_sw_pipeline *)ucc_malloc( + sizeof(ucc_tl_ucp_allreduce_sw_pipeline)); + if (pipe == NULL) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating dpu pipe\n"); + return UCC_ERR_NO_RESOURCE; + } + + if (put_window_size <= 0) + put_window_size = team_size; + if (num_get_bufs <= 0) + num_get_bufs = team_size; + + pipe->accbuf.buf = ucc_malloc(buf_size); + if 
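+
+/*
+ * Allocate the pipeline: one accumulation buffer plus a ring of get buffers
+ * and an array of put request handles, sized from the lib config (values
+ * <= 0 default to the team size).
+ */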
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args,
+                                               ucc_base_team_t *     team,
+                                               ucc_tl_ucp_task_t *   task)
+{
+    int                i;
+    ucc_tl_ucp_team_t *tl_team   = ucc_derived_of(team, ucc_tl_ucp_team_t);
+    ucc_rank_t         team_size = (ucc_rank_t)team->params.size;
+
+    const size_t buf_size =
+        UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_buf_size;
+    int put_window_size = UCC_TL_UCP_TEAM_LIB(tl_team)
+                              ->cfg.allreduce_sliding_window_put_window_size;
+    int num_get_bufs =
+        UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_num_get_bufs;
+
+    ucc_tl_ucp_allreduce_sw_pipeline *pipe =
+        (ucc_tl_ucp_allreduce_sw_pipeline *)ucc_malloc(
+            sizeof(ucc_tl_ucp_allreduce_sw_pipeline));
+    if (pipe == NULL) {
+        tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating dpu pipe");
+        return UCC_ERR_NO_RESOURCE;
+    }
+
+    if (put_window_size <= 0)
+        put_window_size = team_size;
+    if (num_get_bufs <= 0)
+        num_get_bufs = team_size;
+
+    pipe->accbuf.buf = ucc_malloc(buf_size);
+    if (pipe->accbuf.buf == NULL) {
+        tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating accbuf");
+        return UCC_ERR_NO_RESOURCE;
+    }
+    pipe->getbuf = (ucc_tl_ucp_allreduce_sw_buf *)ucc_malloc(
+        num_get_bufs * sizeof(ucc_tl_ucp_allreduce_sw_buf));
+    if (pipe->getbuf == NULL) {
+        tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating getbuf array");
+        return UCC_ERR_NO_RESOURCE;
+    }
+    for (i = 0; i < num_get_bufs; i++) {
+        pipe->getbuf[i].buf = ucc_malloc(buf_size);
+    }
+
+    pipe->buffer_size  = buf_size;
+    pipe->num_buffers  = num_get_bufs;
+    pipe->put_requests = (ucs_status_ptr_t *)ucc_malloc(
+        put_window_size * sizeof(ucs_status_ptr_t));
+
+    task->allreduce_sliding_window.put_window_size = put_window_size;
+    task->allreduce_sliding_window.num_get_bufs    = num_get_bufs;
+
+    task->allreduce_sliding_window.pipe = pipe;
+
+    return UCC_OK;
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task)
+{
+    int                   i;
+    ucc_base_team_t *     team    = coll_task->team;
+    ucc_tl_ucp_team_t *   tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
+    ucc_tl_ucp_task_t *   task    = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_tl_ucp_context_t *tl_ctx  = UCC_TL_UCP_TEAM_CTX(tl_team);
+    ucc_tl_ucp_allreduce_sw_pipeline *pipe =
+        task->allreduce_sliding_window.pipe;
+
+    if (!task->allreduce_sliding_window.inplace)
+        ucc_free(task->allreduce_sliding_window.sbufs);
+
+    ucc_free(task->allreduce_sliding_window.rbufs);
+    ucc_free(task->allreduce_sliding_window.eps);
+    ucc_free(task->allreduce_sliding_window.allgather_data);
+
+    if (!task->allreduce_sliding_window.inplace) {
+        ucp_mem_unmap(tl_ctx->worker.ucp_context,
+                      task->allreduce_sliding_window.src_ebuf->memh);
+        ucc_free(task->allreduce_sliding_window.src_ebuf);
+        ucc_free(task->allreduce_sliding_window.src_rkeys);
+    }
+
+    ucp_mem_unmap(tl_ctx->worker.ucp_context,
+                  task->allreduce_sliding_window.dst_ebuf->memh);
+    ucc_free(task->allreduce_sliding_window.dst_ebuf);
+    ucc_free(task->allreduce_sliding_window.dst_rkeys);
+
+    ucc_free(pipe->accbuf.buf);
+    for (i = 0; i < task->allreduce_sliding_window.num_get_bufs; i++) {
+        ucc_free(pipe->getbuf[i].buf);
+    }
+    ucc_free(pipe->getbuf);
+    ucc_free(pipe->put_requests);
+    ucc_free(pipe);
+
+    return UCC_OK;
+}
diff --git a/src/components/tl/ucp/barrier/barrier.c b/src/components/tl/ucp/barrier/barrier.c
index 84170dce7b..19f82d6021 100644
--- a/src/components/tl/ucp/barrier/barrier.c
+++ b/src/components/tl/ucp/barrier/barrier.c
@@ -7,9 +7,6 @@
 #include "tl_ucp.h"
 #include "barrier.h"
 
-ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task);
-void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task);
-
 ucc_base_coll_alg_info_t
     ucc_tl_ucp_barrier_algs[UCC_TL_UCP_BARRIER_ALG_LAST + 1] = {
         [UCC_TL_UCP_BARRIER_ALG_KNOMIAL] =
diff --git a/src/components/tl/ucp/barrier/barrier.h b/src/components/tl/ucp/barrier/barrier.h
index de16490621..a6c7c65336 100644
--- a/src/components/tl/ucp/barrier/barrier.h
+++ b/src/components/tl/ucp/barrier/barrier.h
@@ -18,4 +18,7 @@ extern ucc_base_coll_alg_info_t
 
 ucc_status_t ucc_tl_ucp_barrier_init(ucc_tl_ucp_task_t *task);
 
+ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task);
+void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task);
+
 #endif
diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c
index 3a07c81f65..83fa7dceeb 100644
--- a/src/components/tl/ucp/tl_ucp.c
+++ b/src/components/tl/ucp/tl_ucp.c
@@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
      ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix),
      UCC_CONFIG_TYPE_UINT_RANGED},
 
+    {"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536",
+     "Buffer size of the sliding window allreduce algorithm",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size),
+     UCC_CONFIG_TYPE_MEMUNITS},
+
+    {"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0",
+     "Max number of concurrent puts posted by the sliding window allreduce. "
+     "<= 0 means set to team size",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t,
+                  allreduce_sliding_window_put_window_size),
+     UCC_CONFIG_TYPE_UINT},
+
+    {"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0",
+     "Number of get buffers used by the sliding window allreduce. "
+     "<= 0 means set to team size",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t,
+                  allreduce_sliding_window_num_get_bufs),
+     UCC_CONFIG_TYPE_UINT},
+
     {"ALLREDUCE_SRA_KN_RADIX", "auto",
      "Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm",
      ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix),
diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h
index 75fdc76de5..6d51d37bbe 100644
--- a/src/components/tl/ucp/tl_ucp.h
+++ b/src/components/tl/ucp/tl_ucp.h
@@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config {
     uint32_t                 fanin_kn_radix;
     uint32_t                 fanout_kn_radix;
     uint32_t                 barrier_kn_radix;
+    size_t                   allreduce_sliding_window_buf_size;
+    uint32_t                 allreduce_sliding_window_put_window_size;
+    uint32_t                 allreduce_sliding_window_num_get_bufs;
     ucc_mrange_uint_t        allreduce_kn_radix;
     ucc_mrange_uint_t        allreduce_sra_kn_radix;
     uint32_t                 reduce_scatter_kn_radix;
diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c
index bbbac03fc7..e3dd1782af 100644
--- a/src/components/tl/ucp/tl_ucp_coll.c
+++ b/src/components/tl/ucp/tl_ucp_coll.c
@@ -268,6 +268,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
         case UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL:
             *init = ucc_tl_ucp_allreduce_sra_knomial_init;
             break;
+        case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW:
+            *init = ucc_tl_ucp_allreduce_sliding_window_init;
+            break;
         default:
             status = UCC_ERR_INVALID_PARAM;
             break;
diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h
index c2a260b103..a4def89286 100644
--- a/src/components/tl/ucp/tl_ucp_coll.h
+++ b/src/components/tl/ucp/tl_ucp_coll.h
@@ -86,6 +86,11 @@ enum ucc_tl_ucp_task_flags {
     UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0),
 };
 
+typedef struct ucc_tl_ucp_allreduce_sw_pipeline
+    ucc_tl_ucp_allreduce_sw_pipeline;
+typedef struct ucc_tl_ucp_allreduce_sw_host_allgather
+    ucc_tl_ucp_allreduce_sw_host_allgather;
+
 typedef struct ucc_tl_ucp_task {
     ucc_coll_task_t super;
     uint32_t        flags;
@@ -119,6 +124,27 @@ typedef struct ucc_tl_ucp_task {
             ucc_ee_executor_task_t *etask;
             ucc_ee_executor_t      *executor;
         } allreduce_kn;
+        struct {
+            int                                     reduce_in_progress;
+            ucp_rkey_h *                            src_rkeys; //unpacked
+            ucp_rkey_h *                            dst_rkeys; //unpacked
+            ucp_ep_h *                              eps;
+            void **                                 sbufs;
+            void **                                 rbufs;
+            ucc_coll_task_t *                       allreduce_task_h;
+            ucc_tl_ucp_allreduce_sw_pipeline *      pipe;
+            ucc_ee_executor_task_t *                etask;
+            ucc_ee_executor_t *                     executor;
+            int                                     put_window_size;
+            int                                     num_get_bufs;
+            ucs_status_ptr_t *                      put_requests;
+            ucc_service_coll_req_t *                allgather_scoll_req;
+            ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data;
+            ucc_coll_task_t *                       barrier_task;
+            struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf;
+            struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf;
+            int                                     inplace;
+        } allreduce_sliding_window;
         struct {
             int                   phase;
             ucc_knomial_pattern_t p;
@@ -253,7 +279,8 @@ static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task,
 static inline ucc_tl_ucp_task_t *ucc_tl_ucp_get_task(ucc_tl_ucp_team_t *team)
 {
     ucc_tl_ucp_context_t *ctx  = UCC_TL_UCP_TEAM_CTX(team);
-    ucc_tl_ucp_task_t    *task = ucc_mpool_get(&ctx->req_mp);;
+    ucc_tl_ucp_task_t    *task = (ucc_tl_ucp_task_t *)
+                                     ucc_mpool_get(&ctx->req_mp);
 
     UCC_TL_UCP_PROFILE_REQUEST_NEW(task, "tl_ucp_task", 0);
     task->super.flags = 0;
@@ -280,7 +307,7 @@ ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team,
 {
     ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
 
-    *schedule = ucc_mpool_get(&ctx->req_mp);
+    *schedule = (ucc_tl_ucp_schedule_t *)ucc_mpool_get(&ctx->req_mp);
 
     if (ucc_unlikely(!(*schedule))) {
         return UCC_ERR_NO_MEMORY;
diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am
index 591b2cf005..08cb7159b6 100644
--- a/test/gtest/Makefile.am
+++ b/test/gtest/Makefile.am
@@ -53,6 +53,7 @@ gtest_CPPFLAGS = \
 	-I$(top_srcdir)/test \
 	-I$(top_builddir)/src \
 	-I$(top_srcdir)/test/gtest \
+	-I$(top_srcdir)/src/components/tl/ucp \
 	$(GTEST_CPPFLAGS)
 
 gtest_LDFLAGS = $(GTEST_LDFLAGS) -pthread -no-install -Wl,-dynamic-list-data \
@@ -64,47 +65,48 @@ gtest_CXXFLAGS = -std=gnu++11 \
 	-DGTEST_UCM_HOOK_LIB_DIR="\"${abs_builddir}/ucm/test_dlopen/.libs\"" \
 	-DGTEST_UCC_TOP_SRCDIR="\"${UCC_TOP_SRCDIR}\""
 
-gtest_SOURCES = \
-	common/gtest-all.cc \
-	common/test_obj_size.cc \
-	common/main.cc \
-	common/test_ucc.cc \
-	tl/tl_test.cc \
-	core/test_lib_config.cc \
-	core/test_lib.cc \
-	core/test_context_config.cc \
-	core/test_context.cc \
-	core/test_mc.cc \
-	core/test_mc_reduce.cc \
-	core/test_team.cc \
-	core/test_schedule.cc \
-	core/test_topo.cc \
-	core/test_service_coll.cc \
-	core/test_timeout.cc \
-	core/test_utils.cc \
-	coll/test_barrier.cc \
-	coll/test_alltoall.cc \
-	coll/test_alltoallv.cc \
-	coll/test_allgather.cc \
-	coll/test_allgatherv.cc \
-	coll/test_gather.cc \
-	coll/test_gatherv.cc \
-	coll/test_bcast.cc \
-	coll/test_reduce.cc \
-	coll/test_allreduce.cc \
-	coll/test_reduce_scatter.cc \
-	coll/test_reduce_scatterv.cc \
-	coll/test_scatter.cc \
-	coll/test_scatterv.cc \
-	utils/test_string.cc \
-	utils/test_ep_map.cc \
-	utils/test_lock_free_queue.cc \
-	utils/test_math.cc \
-	utils/test_cfg_file.cc \
-	utils/test_parser.cc \
-	coll_score/test_score.cc \
-	coll_score/test_score_str.cc \
-	coll_score/test_score_update.cc \
+gtest_SOURCES =                                \
+	common/gtest-all.cc                        \
+	common/test_obj_size.cc                    \
+	common/main.cc                             \
+	common/test_ucc.cc                         \
+	tl/tl_test.cc                              \
+	core/test_lib_config.cc                    \
+	core/test_lib.cc                           \
+	core/test_context_config.cc                \
+	core/test_context.cc                       \
+	core/test_mc.cc                            \
+	core/test_mc_reduce.cc                     \
+	core/test_team.cc                          \
+	core/test_schedule.cc                      \
+	core/test_topo.cc                          \
+	core/test_service_coll.cc                  \
+	core/test_timeout.cc                       \
+	core/test_utils.cc                         \
+	coll/test_barrier.cc                       \
+	coll/test_alltoall.cc                      \
+	coll/test_alltoallv.cc                     \
+	coll/test_allgather.cc                     \
+	coll/test_allgatherv.cc                    \
+	coll/test_gather.cc                        \
+	coll/test_gatherv.cc                       \
+	coll/test_bcast.cc                         \
+	coll/test_reduce.cc                        \
+	coll/test_allreduce_sliding_window.cc      \
+	coll/test_allreduce.cc                     \
+	coll/test_reduce_scatter.cc                \
+	coll/test_reduce_scatterv.cc               \
+	coll/test_scatter.cc                       \
+	coll/test_scatterv.cc                      \
+	utils/test_string.cc                       \
+	utils/test_ep_map.cc                       \
+	utils/test_lock_free_queue.cc              \
+	utils/test_math.cc                         \
+	utils/test_cfg_file.cc                     \
+	utils/test_parser.cc                       \
+	coll_score/test_score.cc                   \
+	coll_score/test_score_str.cc               \
+	coll_score/test_score_update.cc            \
 	active_set/test_active_set.cc
 
 if TL_MLX5_ENABLED
@@ -134,13 +136,18 @@ gtest_LDFLAGS += $(HIP_LDFLAGS)
 gtest_LDADD += $(HIP_LIBS)
 endif
 
-
-noinst_HEADERS = \
-	common/gtest.h \
-	common/test.h \
-	common/test_ucc.h \
-	core/test_context.h \
-	core/test_mc_reduce.h \
+gtest_CXXFLAGS += $(UCX_CXXFLAGS)
+gtest_CPPFLAGS += $(UCX_CPPFLAGS)
+gtest_LDFLAGS  += $(UCX_LDFLAGS)
+gtest_LDADD    += $(UCX_LIBS) $(UCX_LIBADD)
+
+noinst_HEADERS =                             \
+	common/gtest.h                           \
+	common/test.h                            \
+	common/test_ucc.h                        \
+	core/test_context.h                      \
+	core/test_mc_reduce.h                    \
+	coll/test_allreduce_sliding_window.h     \
 	coll_score/test_score.h
 
 .PHONY: test test gdb valgrind fix_rpath ucc
diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc
index dba5ecc8b6..3384f997e7 100644
--- a/test/gtest/coll/test_allreduce.cc
+++ b/test/gtest/coll/test_allreduce.cc
@@ -7,6 +7,9 @@
 #include "common/test_ucc.h"
 #include "utils/ucc_math.h"
 
+// For sliding window allreduce
+#include "test_allreduce_sliding_window.h"
+
 #include <array>
 
 template <typename T>
@@ -23,8 +26,9 @@ class test_allreduce : public UccCollArgs, public testing::Test {
             ctxs[r] =
                 (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t));
             ctxs[r]->args = coll;
 
-            coll->coll_type = UCC_COLL_TYPE_ALLREDUCE;
-            coll->op        = T::redop;
+            coll->coll_type          = UCC_COLL_TYPE_ALLREDUCE;
+            coll->op                 = T::redop;
+            coll->global_work_buffer = NULL;
 
             ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf");
             EXPECT_NE(ctxs[r]->init_buf, nullptr);
@@ -396,6 +400,50 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) {
     }
 }
 
+TYPED_TEST(test_allreduce_alg, sliding_window)
+{
+    int           n_procs = 8;
+    ucc_job_env_t env     = {{"UCC_TL_UCP_TUNE", "allreduce:@2"},
+                             {"UCC_CLS", "all"}};
+    UccJob        job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env);
+    UccTeam_h     team   = job.create_team(n_procs);
+    int           repeat = 3;
+    UccCollCtxVec ctxs;
+    std::vector<ucc_memory_type_t> mt = {UCC_MEMORY_TYPE_HOST};
+    ucp_info_t *  ucp_infos = NULL;
+
+    if (UCC_OK == ucc_mc_available(
+                      UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier?
+        mt.push_back(UCC_MEMORY_TYPE_CUDA);
+    }
+
+    for (auto count : {65536, 123567}) {
+        for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) {
+            for (auto m : mt) {
+                SET_MEM_TYPE(m);
+                this->set_inplace(inplace);
+                this->data_init(n_procs, TypeParam::dt, count, ctxs, true);
+
+                // set args->global_work_buffer on each ctx
+                setup_gwbi(n_procs, ctxs, &ucp_infos, inplace == TEST_INPLACE);
+
+                UccReq req(team, ctxs);
+
+                for (auto i = 0; i < repeat; i++) {
+                    req.start();
+                    req.wait();
+                    EXPECT_EQ(true, this->data_validate(ctxs));
+                    this->reset(ctxs);
+                }
+
+                free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE);
+                ucp_infos = NULL;
+                this->data_fini(ctxs);
+            }
+        }
+    }
+}
+
 template <typename T>
 class test_allreduce_avg_order : public test_allreduce<T> {
 };
diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc
new file mode 100644
index 0000000000..de1304ca99
--- /dev/null
+++ b/test/gtest/coll/test_allreduce_sliding_window.cc
@@ -0,0 +1,180 @@
+/**
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * See file LICENSE for terms.
+ */
+
+/*
+  This file sets up the global work buffer for the sliding window
+  allreduce. This entails creating UCP contexts, registering and
+  exporting the src/dst buffers, and passing the packed memory handles
+  to UCC through each collective's global work buffer.
+*/
+
+#include "core/test_mc_reduce.h"
+#include "common/test_ucc.h"
+#include "utils/ucc_math.h"
+
+#include <array>
+
+#include "test_allreduce_sliding_window.h"
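+
+/* Create a standalone UCP context per test rank with the EXPORTED_MEMH
+ * feature enabled, standing in for an application that exports its buffers
+ * to UCC through the global work buffer. */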
+*/ + +#include "core/test_mc_reduce.h" +#include "common/test_ucc.h" +#include "utils/ucc_math.h" + +#include + +#include "test_allreduce_sliding_window.h" + +int ucp_init_ex(ucp_context_h *ucp_ctx) +{ + ucs_status_t ucs_status; + ucp_config_t *ucp_config; + ucp_params_t ucp_params; + ucp_context_h ucp_context; + + ucs_status = ucp_config_read(NULL, NULL, &ucp_config); + assert(ucs_status == UCS_OK); + + ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | + UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; + + ucs_status = ucp_init(&ucp_params, ucp_config, &ucp_context); + if (ucs_status != UCS_OK) { + printf("error on ucp init\n"); + return -1; + } + + *ucp_ctx = ucp_context; + + return 0; +} + +void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status) +{ + printf("Endpoint error detected, status: %s\n", + ucs_status_string(ucs_status)); +} + +int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf) +{ + ucs_status_t ucs_status; + ucp_mem_map_params_t params; + ucp_memh_pack_params_t pack_params; + + ebuf->ucp_context = ucp_context; + + params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; + params.address = buf; + params.length = len; + + ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); + assert(ucs_status == UCS_OK); + + pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS; + pack_params.flags = UCP_MEMH_PACK_FLAG_EXPORT; + + ucs_status = ucp_memh_pack(ebuf->memh, &pack_params, &ebuf->packed_memh, + &ebuf->packed_memh_len); + if (ucs_status != UCS_OK) { + printf("ucp_memh_pack() returned error: %s\n", + ucs_status_string(ucs_status)); + ebuf->packed_memh = NULL; + ebuf->packed_memh_len = 0; + } + + return 0; +} + +void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + ucp_info_t **ucp_infos_p /* out */, bool inplace) +{ + int i; + + ucp_info_t *ucp_infos = + (ucp_info_t *)ucc_malloc(sizeof(ucp_info_t) * n_procs); + *ucp_infos_p = ucp_infos; + + // allocate gwbi + for (auto ctx : ctxs) { + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ucc_malloc( + sizeof(ucc_tl_ucp_allreduce_sw_global_work_buf_info), + "global work buf info"); + + ctx->args->global_work_buffer = gwbi; + } + + // setup ucp contexts and workers + for (i = 0; i < n_procs; i++) { + ucp_info_t ucp_info; + ucp_init_ex(&ucp_info.ucp_ctx); + memcpy(&ucp_infos[i], &ucp_info, sizeof(ucp_info_t)); + } + + // set up packed src/dst memh + for (i = 0; i < n_procs; i++) { + // my proc's gwbi + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[i] + ->args->global_work_buffer; + // my proc's ucp_info + ucp_info_t * ucp_info = &ucp_infos[i]; + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + size_t dst_len = ctxs[i]->args->dst.info.count * + ucc_dt_size(ctxs[i]->args->dst.info.datatype); + + buffer_export_ucc(ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer, + dst_len, dst_ebuf); + + gwbi->packed_dst_memh = dst_ebuf->packed_memh; + + if (!inplace) { + size_t src_len = ctxs[i]->args->src.info.count * + ucc_dt_size(ctxs[i]->args->src.info.datatype); + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + buffer_export_ucc(ucp_info->ucp_ctx, ctxs[i]->args->src.info.buffer, + src_len, src_ebuf); + + gwbi->packed_src_memh = src_ebuf->packed_memh; + } + } + + // set the flag that indicates the global work buffer was passed + for (auto ctx : ctxs) { + ctx->args->mask |= + 
+void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos,
+               bool inplace)
+{
+    int i, k;
+
+    // unmap the src and dst buffers exported in setup_gwbi()
+    for (i = 0; i < n_procs; i++) {
+        // my proc's ucp_info
+        ucp_info_t *ucp_info = &ucp_infos[i];
+
+        if (!inplace) {
+            struct export_buf *src_ebuf = &ucp_info->src_ebuf;
+            ucp_mem_unmap(ucp_info->ucp_ctx, src_ebuf->memh);
+        }
+
+        struct export_buf *dst_ebuf = &ucp_info->dst_ebuf;
+        ucp_mem_unmap(ucp_info->ucp_ctx, dst_ebuf->memh);
+    }
+
+    // free ucp contexts
+    for (i = 0; i < n_procs; i++) {
+        ucp_cleanup(ucp_infos[i].ucp_ctx);
+    }
+
+    // free each ctx's gwbi
+    for (k = 0; k < n_procs; k++) {
+        ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi =
+            (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[k]
+                ->args->global_work_buffer;
+
+        ucc_free(gwbi);
+    }
+
+    ucc_free(ucp_infos);
+}
diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h
new file mode 100644
index 0000000000..734e58e6c9
--- /dev/null
+++ b/test/gtest/coll/test_allreduce_sliding_window.h
@@ -0,0 +1,30 @@
+#ifndef TEST_ALLREDUCE_SW_H
+#define TEST_ALLREDUCE_SW_H
+
+#include "components/tl/ucp/allreduce/allreduce.h"
+
+struct export_buf {
+    ucp_context_h ucp_context;
+    ucp_mem_h     memh;
+    void *        packed_memh;
+    size_t        packed_memh_len;
+    uint64_t      memh_id;
+};
+
+typedef struct ucp_info {
+    ucp_context_h     ucp_ctx;
+    struct export_buf src_ebuf;
+    struct export_buf dst_ebuf;
+} ucp_info_t;
+
+void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos,
+               bool inplace);
+void setup_gwbi(int n_procs, UccCollCtxVec &ctxs,
+                ucp_info_t **ucp_infos_p /* out */, bool inplace);
+int  buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len,
+                       struct export_buf *ebuf);
+void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status);
+
+int ucp_init_ex(ucp_context_h *ucp_ctx);
+
+#endif