diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c index b072927d17..8074df24c1 100644 --- a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c @@ -19,29 +19,27 @@ ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf *buf) buf->ucp_req = NULL; } -static inline void -ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( - ucc_tl_ucp_allreduce_sw_pipeline *pipe, - ucc_rank_t rank, - size_t put_window_size) +static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( + ucc_tl_ucp_allreduce_sw_pipeline *pipe, ucc_rank_t rank, + size_t put_window_size) { int i; - pipe->avail_buffs = pipe->num_buffers; - pipe->src_rank = pipe->dst_rank = rank; - pipe->get_idx = pipe->red_idx = 0; - pipe->done_get = pipe->done_red = 0; - pipe->done_put = pipe->posted_put = 0; - pipe->count_issued = pipe->count_received = 0; + pipe->avail_buffs = pipe->num_buffers; + pipe->src_rank = pipe->dst_rank = rank; + pipe->get_idx = pipe->red_idx = 0; + pipe->done_get = pipe->done_red = 0; + pipe->done_put = pipe->posted_put = 0; + pipe->count_issued = pipe->count_received = 0; pipe->count_reduced = pipe->count_serviced = 0; - pipe->my_count = pipe->my_offset = 0; + pipe->my_count = pipe->my_offset = 0; ucc_tl_ucp_allreduce_sliding_window_reset_buf(&pipe->accbuf); for (i = 0; i < pipe->num_buffers; i++) { ucc_tl_ucp_allreduce_sliding_window_reset_buf(&pipe->getbuf[i]); } - for(i = 0; i < put_window_size; i++) { + for (i = 0; i < put_window_size; i++) { pipe->put_requests[i] = NULL; } } @@ -49,25 +47,23 @@ ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( ucc_status_t ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task) { - ucc_status_t status = UCC_OK; - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, - ucc_tl_ucp_task_t); - ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t rank = UCC_TL_TEAM_RANK(team); + ucc_status_t status = UCC_OK; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t rank = UCC_TL_TEAM_RANK(team); uint32_t count_total = coll_task->bargs.args.dst.info.count; ucc_rank_t size = coll_task->team->params.size; ucc_datatype_t dtype = TASK_ARGS(task).dst.info.datatype; size_t dt_size = ucc_dt_size(dtype); - ucc_tl_ucp_allreduce_sw_pipeline *pipe - = task->allreduce_sliding_window.pipe; - ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data - = task->allreduce_sliding_window.allgather_data; - size_t allgather_size - = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); + ucc_tl_ucp_allreduce_sw_pipeline *pipe = + task->allreduce_sliding_window.pipe; + ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data = + task->allreduce_sliding_window.allgather_data; + size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); ucc_service_coll_req_t *scoll_req; - ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(pipe, rank, - task->allreduce_sliding_window.put_window_size); + ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( + pipe, rank, task->allreduce_sliding_window.put_window_size); pipe->my_count = count_total / size; pipe->my_offset = pipe->my_count * dt_size * rank; @@ -78,18 +74,17 @@ ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task) ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); task->allreduce_sliding_window.reduce_in_progress = 0; - task->allreduce_sliding_window.barrier_task = NULL; + 
task->allreduce_sliding_window.barrier_task = NULL; UCC_CHECK_GOTO( - ucc_service_allgather(UCC_TL_CORE_TEAM(team), allgather_data, - PTR_OFFSET(allgather_data, allgather_size), - allgather_size, - ucc_sbgp_to_subset(ucc_topo_get_sbgp( - team->topo, UCC_SBGP_FULL)), - &scoll_req), + ucc_service_allgather( + UCC_TL_CORE_TEAM(team), allgather_data, + PTR_OFFSET(allgather_data, allgather_size), allgather_size, + ucc_sbgp_to_subset(ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL)), + &scoll_req), out, status); - scoll_req->data = allgather_data; + scoll_req->data = allgather_data; task->allreduce_sliding_window.allgather_scoll_req = scoll_req; return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); @@ -101,14 +96,14 @@ ucc_status_t ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *coll_task) { ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_status_t st = ucc_ee_executor_finalize( - task->allreduce_sliding_window.executor); + ucc_status_t st = + ucc_ee_executor_finalize(task->allreduce_sliding_window.executor); if (ucc_unlikely(st != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to finalize executor"); } - if(ucc_tl_ucp_allreduce_sliding_window_free_gwbi(coll_task) != UCC_OK) { + if (ucc_tl_ucp_allreduce_sliding_window_free_gwbi(coll_task) != UCC_OK) { printf("error freeing resources\n"); } @@ -121,25 +116,19 @@ ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *coll_task) return st; } -static inline void -ucc_tl_ucp_allreduce_sliding_window_reduction(ucc_coll_task_t *coll_task, - ucc_tl_ucp_allreduce_sw_buf - *accbuf, - ucc_tl_ucp_allreduce_sw_buf - *getbuf) +static inline void ucc_tl_ucp_allreduce_sliding_window_reduction( + ucc_coll_task_t *coll_task, ucc_tl_ucp_allreduce_sw_buf *accbuf, + ucc_tl_ucp_allreduce_sw_buf *getbuf) { ucc_status_t status = UCC_OK; ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_coll_args_t *args = &TASK_ARGS(task); + ucc_coll_args_t * args = &TASK_ARGS(task); ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; - status = ucc_dt_reduce(accbuf->buf, - getbuf->buf, - accbuf->buf, - accbuf->count, dt, - args, 0, - 0, task->allreduce_sliding_window.executor, - &task->allreduce_sliding_window.etask); + status = + ucc_dt_reduce(accbuf->buf, getbuf->buf, accbuf->buf, accbuf->count, dt, + args, 0, 0, task->allreduce_sliding_window.executor, + &task->allreduce_sliding_window.etask); task->allreduce_sliding_window.reduce_in_progress = 1; @@ -155,47 +144,42 @@ ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t *task) { ucc_status_t status; - #define SAVE_STATE(_reduce_in_progress) _reduce_in_progress = 1 +#define SAVE_STATE(_reduce_in_progress) _reduce_in_progress = 1 - EXEC_TASK_TEST( - task->allreduce_sliding_window.reduce_in_progress, - "failed to perform dt reduction", - task->allreduce_sliding_window.etask); + EXEC_TASK_TEST(task->allreduce_sliding_window.reduce_in_progress, + "failed to perform dt reduction", + task->allreduce_sliding_window.etask); // If it didn't complete, we would have returned by now. 
So, clear the flag task->allreduce_sliding_window.reduce_in_progress = 0; } -static inline ucc_status_t -ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t request, +static inline ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t request, ucc_tl_ucp_task_t *task) { if (request == NULL) { return UCC_OK; } else if (UCS_PTR_IS_ERR(request)) { tl_error(UCC_TASK_LIB(task), "unable to complete UCX request=%p: %d\n", - request, - UCS_PTR_STATUS(request)); + request, UCS_PTR_STATUS(request)); return ucs_status_to_ucc_status(UCS_PTR_STATUS(request)); } else { return ucs_status_to_ucc_status(ucp_request_check_status(request)); } } -static inline void -ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(ucc_coll_task_t *coll_task) +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test( + ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of( - coll_task, - ucc_tl_ucp_task_t); - ucc_service_coll_req_t *allgather_scoll_req = - task->allreduce_sliding_window.allgather_scoll_req; - ucc_status_t status = - ucc_service_coll_test(allgather_scoll_req); + ucc_tl_ucp_task_t * task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_service_coll_req_t *allgather_scoll_req = + task->allreduce_sliding_window.allgather_scoll_req; + ucc_status_t status = ucc_service_coll_test(allgather_scoll_req); if (status < 0) { tl_error(coll_task->team->context->lib, - "failure during service coll exchange: %s", - ucc_status_string(status)); + "failure during service coll exchange: %s", + ucc_status_string(status)); ucc_service_coll_finalize(allgather_scoll_req); task->super.status = status; return; @@ -207,23 +191,22 @@ ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(ucc_coll_task_t *coll_ta // copy from allgather recvbuf into gwbi ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( - allgather_scoll_req, - task); + allgather_scoll_req, task); - ucc_service_coll_finalize(task->allreduce_sliding_window.allgather_scoll_req); + ucc_service_coll_finalize( + task->allreduce_sliding_window.allgather_scoll_req); task->allreduce_sliding_window.allgather_scoll_req = NULL; } -static inline void -ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(ucc_coll_task_t *coll_task) +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys( + ucc_coll_task_t *coll_task) { - int i; - ucc_base_team_t *team = coll_task->team; - ucc_rank_t team_size = (ucc_rank_t)team->params.size; - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, - ucc_tl_ucp_task_t); + int i; + ucc_base_team_t * team = coll_task->team; + ucc_rank_t team_size = (ucc_rank_t)team->params.size; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - for(i = 0; i < team_size; i++) { + for (i = 0; i < team_size; i++) { if (!task->allreduce_sliding_window.inplace) ucp_rkey_destroy(task->allreduce_sliding_window.src_rkeys[i]); ucp_rkey_destroy(task->allreduce_sliding_window.dst_rkeys[i]); @@ -233,81 +216,78 @@ ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(ucc_coll_task_t *coll_t static inline void ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, - ucc_tl_ucp_task_t); - ucc_base_team_t *team = coll_task->team; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_base_team_t * team = coll_task->team; ucc_status_t status; ucc_base_coll_args_t coll_args = { - .team = coll_task->team->params.team, + .team = 
coll_task->team->params.team, .args.coll_type = UCC_COLL_TYPE_BARRIER, }; - status = ucc_tl_ucp_coll_init(&coll_args, - team, + status = ucc_tl_ucp_coll_init(&coll_args, team, &task->allreduce_sliding_window.barrier_task); if (status < 0) { tl_error(coll_task->team->context->lib, - "failure during sliding window barrier init: %s", - ucc_status_string(status)); + "failure during sliding window barrier init: %s", + ucc_status_string(status)); task->super.status = status; return; } status = ucc_tl_ucp_barrier_knomial_start( - task->allreduce_sliding_window.barrier_task); + task->allreduce_sliding_window.barrier_task); if (status < 0) { tl_error(coll_task->team->context->lib, - "failure during sliding window barrier start: %s", - ucc_status_string(status)); + "failure during sliding window barrier start: %s", + ucc_status_string(status)); task->super.status = status; } } void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_rank_t size = (ucc_rank_t)task->subset.map.ep_num; - ucc_datatype_t dtype = TASK_ARGS(task).dst.info.datatype; - size_t dt_size = ucc_dt_size(dtype); - uint32_t host_team_size = size; - ucc_base_team_t *base_team = coll_task->team; - ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, - ucc_tl_ucp_team_t); - ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); - ucc_tl_ucp_allreduce_sw_pipeline *pipe = task->allreduce_sliding_window.pipe; - ucc_tl_ucp_allreduce_sw_buf *accbuf = &pipe->accbuf; - ucp_request_param_t req_param = {0}; - int i = 0; - ucc_service_coll_req_t *allgather_scoll_req = - task->allreduce_sliding_window.allgather_scoll_req; - ucc_coll_task_t *barrier_task = - task->allreduce_sliding_window.barrier_task; - size_t remaining_elems; - size_t get_idx; - size_t count; - size_t get_offset; - size_t data_size; - ucc_rank_t src_rank; - ucc_rank_t dst_rank; - void *src_addr; - void *dst_addr; - ucs_status_ptr_t request; - size_t red_idx; - ucc_tl_ucp_allreduce_sw_buf *redbuf; - ucc_tl_ucp_allreduce_sw_buf *getbuf; - size_t put_offset; - int window; - int put_idx; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_rank_t size = (ucc_rank_t)task->subset.map.ep_num; + ucc_datatype_t dtype = TASK_ARGS(task).dst.info.datatype; + size_t dt_size = ucc_dt_size(dtype); + uint32_t host_team_size = size; + ucc_base_team_t * base_team = coll_task->team; + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, ucc_tl_ucp_team_t); + ucc_tl_ucp_context_t * tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + ucc_tl_ucp_allreduce_sw_pipeline *pipe = + task->allreduce_sliding_window.pipe; + ucc_tl_ucp_allreduce_sw_buf *accbuf = &pipe->accbuf; + ucp_request_param_t req_param = {0}; + int i = 0; + ucc_service_coll_req_t * allgather_scoll_req = + task->allreduce_sliding_window.allgather_scoll_req; + ucc_coll_task_t *barrier_task = task->allreduce_sliding_window.barrier_task; + size_t remaining_elems; + size_t get_idx; + size_t count; + size_t get_offset; + size_t data_size; + ucc_rank_t src_rank; + ucc_rank_t dst_rank; + void * src_addr; + void * dst_addr; + ucs_status_ptr_t request; + size_t red_idx; + ucc_tl_ucp_allreduce_sw_buf *redbuf; + ucc_tl_ucp_allreduce_sw_buf *getbuf; + size_t put_offset; + int window; + int put_idx; if (barrier_task != NULL) { // mark sliding window task complete once barrier finishes if (barrier_task->super.status == UCC_OK) { ucc_tl_ucp_put_task( ucc_derived_of(task->allreduce_sliding_window.barrier_task, - 
ucc_tl_ucp_task_t)); + ucc_tl_ucp_task_t)); task->allreduce_sliding_window.barrier_task = NULL; - task->super.status = UCC_OK; + task->super.status = UCC_OK; } ucc_assert(barrier_task->super.status >= 0); @@ -330,35 +310,29 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) } if (pipe->count_serviced < pipe->my_count) { - if ((pipe->count_received < pipe->my_count) - && (pipe->done_get < host_team_size) - && (pipe->avail_buffs > 0) - && (accbuf->state != REDUCED && accbuf->state != SENDING)) - { + if ((pipe->count_received < pipe->my_count) && + (pipe->done_get < host_team_size) && (pipe->avail_buffs > 0) && + (accbuf->state != REDUCED && accbuf->state != SENDING)) { remaining_elems = pipe->my_count - pipe->count_received; get_idx = pipe->get_idx % pipe->num_buffers; - count = ucc_min(pipe->buffer_size/dt_size, - remaining_elems); - get_offset = pipe->count_received * dt_size + pipe->my_offset; - data_size = count * dt_size; - src_rank = pipe->src_rank; - getbuf = accbuf->state == FREE ? - accbuf : &pipe->getbuf[get_idx]; - src_addr = task->allreduce_sliding_window.sbufs[src_rank] - + get_offset; - dst_addr = getbuf->buf; + count = ucc_min(pipe->buffer_size / dt_size, remaining_elems); + get_offset = pipe->count_received * dt_size + pipe->my_offset; + data_size = count * dt_size; + src_rank = pipe->src_rank; + getbuf = accbuf->state == FREE ? accbuf : &pipe->getbuf[get_idx]; + src_addr = + task->allreduce_sliding_window.sbufs[src_rank] + get_offset; + dst_addr = getbuf->buf; ucc_assert(getbuf->state == FREE); - getbuf->state = RECVING; - getbuf->count = count; - getbuf->bytes = data_size; - getbuf->ucp_req = - ucp_get_nbx( - task->allreduce_sliding_window.eps[src_rank], - dst_addr, data_size, (uint64_t)src_addr, - task->allreduce_sliding_window.src_rkeys[src_rank], - &req_param); + getbuf->state = RECVING; + getbuf->count = count; + getbuf->bytes = data_size; + getbuf->ucp_req = ucp_get_nbx( + task->allreduce_sliding_window.eps[src_rank], dst_addr, + data_size, (uint64_t)src_addr, + task->allreduce_sliding_window.src_rkeys[src_rank], &req_param); pipe->src_rank = (src_rank + 1) % host_team_size; @@ -375,26 +349,28 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) if (accbuf->state == RECVING) { request = accbuf->ucp_req; - if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) - == UCC_OK) { - if (request) ucp_request_free(request); - accbuf->state = REDUCING; + if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) == + UCC_OK) { + if (request) + ucp_request_free(request); + accbuf->state = REDUCING; accbuf->ucp_req = NULL; } } red_idx = pipe->red_idx % pipe->num_buffers; - redbuf = &pipe->getbuf[red_idx]; + redbuf = &pipe->getbuf[red_idx]; if (accbuf->state == REDUCING && redbuf->state == RECVING) { request = redbuf->ucp_req; - if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) - == UCC_OK) { - if (request) ucp_request_free(request); - redbuf->state = REDUCING; + if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) == + UCC_OK) { + if (request) + ucp_request_free(request); + redbuf->state = REDUCING; redbuf->ucp_req = NULL; - ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, - accbuf, redbuf); + ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf, + redbuf); ucc_tl_ucp_allreduce_sliding_window_test_reduction(task); @@ -407,32 +383,31 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) pipe->red_idx++; pipe->done_red++; - if (pipe->done_red 
== host_team_size-1) { + if (pipe->done_red == host_team_size - 1) { accbuf->state = REDUCED; pipe->count_reduced += accbuf->count; } } } - if ((pipe->count_serviced < pipe->count_reduced) - && (accbuf->state == REDUCED)) - { - data_size = accbuf->bytes; + if ((pipe->count_serviced < pipe->count_reduced) && + (accbuf->state == REDUCED)) { + data_size = accbuf->bytes; put_offset = pipe->count_serviced * dt_size + pipe->my_offset; window = ucc_min(task->allreduce_sliding_window.put_window_size, - host_team_size - pipe->posted_put); + host_team_size - pipe->posted_put); for (i = 0; i < window; i++) { dst_rank = pipe->dst_rank; src_addr = accbuf->buf; - dst_addr = task->allreduce_sliding_window.rbufs[dst_rank] - + put_offset; - put_idx = pipe->posted_put - % task->allreduce_sliding_window.put_window_size; + dst_addr = + task->allreduce_sliding_window.rbufs[dst_rank] + put_offset; + put_idx = pipe->posted_put % + task->allreduce_sliding_window.put_window_size; - if(task->allreduce_sliding_window.put_requests[put_idx] - != NULL) { + if (task->allreduce_sliding_window.put_requests[put_idx] != + NULL) { // We've already posted a put at this index that didn't yet // complete, left this function and came back. Skip to check // whether this request finished instead of overwriting it @@ -443,8 +418,8 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) ucp_worker_fence(tl_ctx->worker.ucp_worker); task->allreduce_sliding_window.put_requests[put_idx] = ucp_put_nbx( - task->allreduce_sliding_window.eps[dst_rank], - src_addr, data_size, (uint64_t)dst_addr, + task->allreduce_sliding_window.eps[dst_rank], src_addr, + data_size, (uint64_t)dst_addr, task->allreduce_sliding_window.dst_rkeys[dst_rank], &req_param); @@ -458,11 +433,12 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) // These are fenced, so if the first fails, the proceding will // too - if (ucc_tl_ucp_allreduce_sliding_window_req_test(request, task) - != UCC_OK) + if (ucc_tl_ucp_allreduce_sliding_window_req_test( + request, task) != UCC_OK) break; - if (request != NULL) ucp_request_free(request); + if (request != NULL) + ucp_request_free(request); task->allreduce_sliding_window.put_requests[put_idx] = NULL; pipe->done_put++; } @@ -470,17 +446,17 @@ void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) if (pipe->done_put == host_team_size) { ucc_assert(pipe->avail_buffs == pipe->num_buffers); ucc_assert(pipe->done_get == host_team_size); - ucc_assert(pipe->done_red == host_team_size-1); + ucc_assert(pipe->done_red == host_team_size - 1); ucc_assert(pipe->done_put == host_team_size); pipe->count_serviced += accbuf->count; ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf); - pipe->done_get = 0; - pipe->done_red = pipe->done_put = pipe->posted_put = 0; + pipe->done_get = 0; + pipe->done_red = pipe->done_put = pipe->posted_put = 0; for (i = 0; i < task->allreduce_sliding_window.put_window_size; - i++) { + i++) { task->allreduce_sliding_window.put_requests[i] = NULL; } } diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c index a546288287..f7b2c12b39 100644 --- a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c @@ -9,27 +9,23 @@ #include "utils/ucc_dt_reduce.h" #include "tl_ucp_ep.h" -static int -ucc_tl_ucp_allreduce_sliding_window_register( - ucp_context_h ucp_context, - 
ucc_tl_ucp_team_t *tl_team, - struct ucc_tl_ucp_allreduce_sw_export_buf *ebuf, - void *packed_memh) +static int ucc_tl_ucp_allreduce_sliding_window_register( + ucp_context_h ucp_context, ucc_tl_ucp_team_t *tl_team, + struct ucc_tl_ucp_allreduce_sw_export_buf *ebuf, void *packed_memh) { ucs_status_t ucs_status; ucp_mem_map_params_t params = {0}; ebuf->ucp_context = ucp_context; - params.field_mask = - UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; + params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; params.exported_memh_buffer = packed_memh; ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); if (UCS_OK != ucs_status) { tl_error(UCC_TL_TEAM_LIB(tl_team), - "import using ucp_mem_map() returned error: %s\n", - ucs_status_string(ucs_status)); + "import using ucp_mem_map() returned error: %s\n", + ucs_status_string(ucs_status)); return 0; } @@ -37,8 +33,8 @@ ucc_tl_ucp_allreduce_sliding_window_register( &ebuf->packed_key_len); if (UCS_OK != ucs_status) { tl_error(UCC_TL_TEAM_LIB(tl_team), - "ucp_rkey_pack() returned error: %s\n", - ucs_status_string(ucs_status)); + "ucp_rkey_pack() returned error: %s\n", + ucs_status_string(ucs_status)); return 0; } @@ -47,65 +43,61 @@ ucc_tl_ucp_allreduce_sliding_window_register( ucc_status_t ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_tl_ucp_task_t *task) + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) { - ucc_status_t status = UCC_OK; - void *src_buf = coll_args->args.src.info.buffer; - void *dst_buf = coll_args->args.dst.info.buffer; - ucc_rank_t team_size = (ucc_rank_t)team->params.size; - ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); - ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); - ucc_tl_ucp_allreduce_sw_global_work_buf_info - *gwbi_p = NULL; - size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); + ucc_status_t status = UCC_OK; + void * src_buf = coll_args->args.src.info.buffer; + void * dst_buf = coll_args->args.dst.info.buffer; + ucc_rank_t team_size = (ucc_rank_t)team->params.size; + ucc_tl_ucp_team_t * tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi_p = NULL; + size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data; - tl_debug(UCC_TL_TEAM_LIB(tl_team), - "allocating pipe\n"); + tl_debug(UCC_TL_TEAM_LIB(tl_team), "allocating pipe\n"); if ((status = ucc_tl_ucp_allreduce_sliding_window_alloc_pipe( - coll_args, team, task)) != UCC_OK) { - tl_error(UCC_TL_TEAM_LIB(tl_team), - "error while allocating pipe"); + coll_args, team, task)) != UCC_OK) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "error while allocating pipe"); goto out; } allgather_data = ucc_malloc(allgather_size * (team_size + 1)); gwbi_p = coll_args->args.global_work_buffer; - task->super.bargs.args.global_work_buffer = gwbi_p; + task->super.bargs.args.global_work_buffer = gwbi_p; - task->allreduce_sliding_window.inplace = - UCC_IS_INPLACE(coll_args->args); + task->allreduce_sliding_window.inplace = UCC_IS_INPLACE(coll_args->args); - task->allreduce_sliding_window.barrier_task = NULL; + task->allreduce_sliding_window.barrier_task = NULL; if (!task->allreduce_sliding_window.inplace) { - task->allreduce_sliding_window.sbufs = ucc_malloc(sizeof(void*) * - team_size); - task->allreduce_sliding_window.src_rkeys = ucc_malloc(sizeof(ucp_rkey_h) - * team_size); + 
task->allreduce_sliding_window.sbufs = + ucc_malloc(sizeof(void *) * team_size); + task->allreduce_sliding_window.src_rkeys = + ucc_malloc(sizeof(ucp_rkey_h) * team_size); } - task->allreduce_sliding_window.rbufs = ucc_malloc(sizeof(void*) - * team_size); - task->allreduce_sliding_window.dst_rkeys = ucc_malloc(sizeof(ucp_rkey_h) - * team_size); - task->allreduce_sliding_window.eps = ucc_malloc(sizeof(ucp_ep_h) - * team_size); + task->allreduce_sliding_window.rbufs = + ucc_malloc(sizeof(void *) * team_size); + task->allreduce_sliding_window.dst_rkeys = + ucc_malloc(sizeof(ucp_rkey_h) * team_size); + task->allreduce_sliding_window.eps = + ucc_malloc(sizeof(ucp_ep_h) * team_size); - task->allreduce_sliding_window.put_requests = - task->allreduce_sliding_window.pipe->put_requests; + task->allreduce_sliding_window.put_requests = + task->allreduce_sliding_window.pipe->put_requests; if (!task->allreduce_sliding_window.inplace) { - task->allreduce_sliding_window.src_ebuf = ucc_malloc( - sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); + task->allreduce_sliding_window.src_ebuf = + ucc_malloc(sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); } else { task->allreduce_sliding_window.src_ebuf = NULL; } - task->allreduce_sliding_window.dst_ebuf = ucc_malloc( - sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); + task->allreduce_sliding_window.dst_ebuf = + ucc_malloc(sizeof(struct ucc_tl_ucp_allreduce_sw_export_buf)); if (!task->allreduce_sliding_window.inplace) allgather_data->src_buf = src_buf; @@ -115,23 +107,21 @@ ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, // Register the src and dst bufs if (!task->allreduce_sliding_window.inplace) { ucc_tl_ucp_allreduce_sliding_window_register( - tl_ctx->worker.ucp_context, - tl_team, task->allreduce_sliding_window.src_ebuf, - gwbi_p->packed_src_memh); + tl_ctx->worker.ucp_context, tl_team, + task->allreduce_sliding_window.src_ebuf, gwbi_p->packed_src_memh); memcpy(allgather_data->packed_src_key, - task->allreduce_sliding_window.src_ebuf->packed_key, - task->allreduce_sliding_window.src_ebuf->packed_key_len); + task->allreduce_sliding_window.src_ebuf->packed_key, + task->allreduce_sliding_window.src_ebuf->packed_key_len); } ucc_tl_ucp_allreduce_sliding_window_register( - tl_ctx->worker.ucp_context, - tl_team, task->allreduce_sliding_window.dst_ebuf, - gwbi_p->packed_dst_memh); + tl_ctx->worker.ucp_context, tl_team, + task->allreduce_sliding_window.dst_ebuf, gwbi_p->packed_dst_memh); memcpy(allgather_data->packed_dst_key, - task->allreduce_sliding_window.dst_ebuf->packed_key, - task->allreduce_sliding_window.dst_ebuf->packed_key_len); + task->allreduce_sliding_window.dst_ebuf->packed_key, + task->allreduce_sliding_window.dst_ebuf->packed_key_len); - task->allreduce_sliding_window.allgather_data = allgather_data; + task->allreduce_sliding_window.allgather_data = allgather_data; task->allreduce_sliding_window.allgather_scoll_req = NULL; return UCC_OK; @@ -139,24 +129,20 @@ ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, return status; } -ucc_status_t -ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(ucc_service_coll_req_t *scoll_req, - ucc_tl_ucp_task_t *sw_task) +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task) { - ucc_status_t status; - ucc_rank_t i; - ucc_base_team_t *base_team = sw_task->super.team; - ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, - ucc_tl_ucp_team_t); - ucc_rank_t 
team_size = base_team->params.size; - - size_t allgather_size = sizeof( - ucc_tl_ucp_allreduce_sw_host_allgather); - ucc_tl_ucp_allreduce_sw_host_allgather - *all_host_allgather = PTR_OFFSET(scoll_req->data, - allgather_size); - - for(i = 0; i < team_size; i++) { + ucc_status_t status; + ucc_rank_t i; + ucc_base_team_t * base_team = sw_task->super.team; + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(base_team, ucc_tl_ucp_team_t); + ucc_rank_t team_size = base_team->params.size; + + size_t allgather_size = sizeof(ucc_tl_ucp_allreduce_sw_host_allgather); + ucc_tl_ucp_allreduce_sw_host_allgather *all_host_allgather = + PTR_OFFSET(scoll_req->data, allgather_size); + + for (i = 0; i < team_size; i++) { ucs_status_t ucs_status = UCS_OK; ucp_rkey_h src_unpacked, dst_unpacked; ucp_ep_h ep; @@ -166,31 +152,30 @@ ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(ucc_service_coll_req return status; } - ucs_status = ucp_ep_rkey_unpack(ep, - all_host_allgather[i].packed_dst_key, - &dst_unpacked); + ucs_status = ucp_ep_rkey_unpack( + ep, all_host_allgather[i].packed_dst_key, &dst_unpacked); if (UCS_OK != ucs_status) { tl_error(UCC_TL_TEAM_LIB(tl_team), "dst rkey unpack failed\n"); return UCC_ERR_NO_RESOURCE; } - sw_task->allreduce_sliding_window.rbufs[i] = + sw_task->allreduce_sliding_window.rbufs[i] = all_host_allgather[i].dst_buf; sw_task->allreduce_sliding_window.dst_rkeys[i] = dst_unpacked; if (!sw_task->allreduce_sliding_window.inplace) { - ucs_status = ucp_ep_rkey_unpack(ep, - all_host_allgather[i].packed_src_key, - &src_unpacked); + ucs_status = ucp_ep_rkey_unpack( + ep, all_host_allgather[i].packed_src_key, &src_unpacked); if (UCS_OK != ucs_status) { tl_error(UCC_TL_TEAM_LIB(tl_team), "src rkey unpack failed\n"); return UCC_ERR_NO_RESOURCE; } - sw_task->allreduce_sliding_window.sbufs[i] = all_host_allgather[i].src_buf; + sw_task->allreduce_sliding_window.sbufs[i] = + all_host_allgather[i].src_buf; sw_task->allreduce_sliding_window.src_rkeys[i] = src_unpacked; } else { - sw_task->allreduce_sliding_window.sbufs = + sw_task->allreduce_sliding_window.sbufs = sw_task->allreduce_sliding_window.rbufs; sw_task->allreduce_sliding_window.src_rkeys = sw_task->allreduce_sliding_window.dst_rkeys; @@ -204,50 +189,52 @@ ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(ucc_service_coll_req ucc_status_t ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_tl_ucp_task_t *task) + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) { - int i; - ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); - ucc_rank_t team_size = (ucc_rank_t)team->params.size; - - const size_t buf_size = UCC_TL_UCP_TEAM_LIB(tl_team)->cfg. - allreduce_sliding_window_buf_size; - int put_window_size = UCC_TL_UCP_TEAM_LIB(tl_team)->cfg. - allreduce_sliding_window_put_window_size; - int num_get_bufs = UCC_TL_UCP_TEAM_LIB(tl_team)->cfg. 
- allreduce_sliding_window_num_get_bufs; - - ucc_tl_ucp_allreduce_sw_pipeline *pipe = (ucc_tl_ucp_allreduce_sw_pipeline *) - ucc_malloc(sizeof(ucc_tl_ucp_allreduce_sw_pipeline)); - if(pipe == NULL) { - tl_error(UCC_TL_TEAM_LIB(tl_team), - "error allocating dpu pipe\n"); + int i; + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_rank_t team_size = (ucc_rank_t)team->params.size; + + const size_t buf_size = + UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_buf_size; + int put_window_size = UCC_TL_UCP_TEAM_LIB(tl_team) + ->cfg.allreduce_sliding_window_put_window_size; + int num_get_bufs = + UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allreduce_sliding_window_num_get_bufs; + + ucc_tl_ucp_allreduce_sw_pipeline *pipe = + (ucc_tl_ucp_allreduce_sw_pipeline *)ucc_malloc( + sizeof(ucc_tl_ucp_allreduce_sw_pipeline)); + if (pipe == NULL) { + tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating dpu pipe\n"); return UCC_ERR_NO_RESOURCE; } - if (put_window_size <= 0) put_window_size = team_size; - if (num_get_bufs <= 0) num_get_bufs = team_size; + if (put_window_size <= 0) + put_window_size = team_size; + if (num_get_bufs <= 0) + num_get_bufs = team_size; pipe->accbuf.buf = ucc_malloc(buf_size); - if(pipe->accbuf.buf == NULL) { + if (pipe->accbuf.buf == NULL) { tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating accbuf\n"); return UCC_ERR_NO_RESOURCE; } - pipe->getbuf = (ucc_tl_ucp_allreduce_sw_buf*) - ucc_malloc(num_get_bufs * sizeof(ucc_tl_ucp_allreduce_sw_buf)); - if(pipe->getbuf == NULL) { + pipe->getbuf = (ucc_tl_ucp_allreduce_sw_buf *)ucc_malloc( + num_get_bufs * sizeof(ucc_tl_ucp_allreduce_sw_buf)); + if (pipe->getbuf == NULL) { tl_error(UCC_TL_TEAM_LIB(tl_team), "error allocating getbuf array\n"); return UCC_ERR_NO_RESOURCE; } - for(i = 0; i < num_get_bufs; i++) { + for (i = 0; i < num_get_bufs; i++) { pipe->getbuf[i].buf = ucc_malloc(buf_size); } pipe->buffer_size = buf_size; pipe->num_buffers = num_get_bufs; - pipe->put_requests = (ucs_status_ptr_t*) - ucc_malloc(put_window_size * sizeof(ucs_status_ptr_t)); + pipe->put_requests = (ucs_status_ptr_t *)ucc_malloc( + put_window_size * sizeof(ucs_status_ptr_t)); task->allreduce_sliding_window.put_window_size = put_window_size; task->allreduce_sliding_window.num_get_bufs = num_get_bufs; @@ -261,13 +248,12 @@ ucc_status_t ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task) { int i; - ucc_base_team_t *team = coll_task->team; - ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, - ucc_tl_ucp_task_t); - ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); - ucc_tl_ucp_allreduce_sw_pipeline - *pipe = task->allreduce_sliding_window.pipe; + ucc_base_team_t * team = coll_task->team; + ucc_tl_ucp_team_t * tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_tl_ucp_task_t * task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + ucc_tl_ucp_allreduce_sw_pipeline *pipe = + task->allreduce_sliding_window.pipe; if (!task->allreduce_sliding_window.inplace) ucc_free(task->allreduce_sliding_window.sbufs); @@ -278,18 +264,18 @@ ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task) if (!task->allreduce_sliding_window.inplace) { ucp_mem_unmap(tl_ctx->worker.ucp_context, - task->allreduce_sliding_window.src_ebuf->memh); + task->allreduce_sliding_window.src_ebuf->memh); ucc_free(task->allreduce_sliding_window.src_ebuf); 
ucc_free(task->allreduce_sliding_window.src_rkeys); } - + ucp_mem_unmap(tl_ctx->worker.ucp_context, - task->allreduce_sliding_window.dst_ebuf->memh); + task->allreduce_sliding_window.dst_ebuf->memh); ucc_free(task->allreduce_sliding_window.dst_ebuf); ucc_free(task->allreduce_sliding_window.dst_rkeys); ucc_free(pipe->accbuf.buf); - for(i = 0; i < task->allreduce_sliding_window.num_get_bufs; i++) { + for (i = 0; i < task->allreduce_sliding_window.num_get_bufs; i++) { ucc_free(pipe->getbuf[i].buf); } ucc_free(pipe->getbuf); diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 0efd285db7..f6234866bc 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -26,57 +26,40 @@ const ucc_tl_ucp_default_alg_desc_t ucc_tl_ucp_default_alg_descs[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR] = { - { - .select_str = NULL, - .str_get_fn = ucc_tl_ucp_allgather_score_str_get - }, - { - .select_str = NULL, - .str_get_fn = ucc_tl_ucp_alltoall_score_str_get - }, - { - .select_str = UCC_TL_UCP_ALLREDUCE_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - }, - { - .select_str = UCC_TL_UCP_BCAST_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - }, - { - .select_str = UCC_TL_UCP_REDUCE_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - }, - { - .select_str = UCC_TL_UCP_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - }, - { - .select_str = UCC_TL_UCP_REDUCE_SCATTERV_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - }, - { - .select_str = UCC_TL_UCP_ALLTOALLV_DEFAULT_ALG_SELECT_STR, - .str_get_fn = NULL - } -}; + {.select_str = NULL, .str_get_fn = ucc_tl_ucp_allgather_score_str_get}, + {.select_str = NULL, .str_get_fn = ucc_tl_ucp_alltoall_score_str_get}, + {.select_str = UCC_TL_UCP_ALLREDUCE_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}, + {.select_str = UCC_TL_UCP_BCAST_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}, + {.select_str = UCC_TL_UCP_REDUCE_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}, + {.select_str = UCC_TL_UCP_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}, + {.select_str = UCC_TL_UCP_REDUCE_SCATTERV_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}, + {.select_str = UCC_TL_UCP_ALLTOALLV_DEFAULT_ALG_SELECT_STR, + .str_get_fn = NULL}}; -ucc_status_t ucc_tl_ucp_team_default_score_str_alloc(ucc_tl_ucp_team_t *team, - char *default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]) +ucc_status_t ucc_tl_ucp_team_default_score_str_alloc( + ucc_tl_ucp_team_t *team, + char * default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]) { ucc_status_t st = UCC_OK; - int i; + int i; for (i = 0; i < UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR; i++) { if (ucc_tl_ucp_default_alg_descs[i].select_str) { - default_select_str[i] = strdup(ucc_tl_ucp_default_alg_descs[i].select_str); + default_select_str[i] = + strdup(ucc_tl_ucp_default_alg_descs[i].select_str); } else { - default_select_str[i] = ucc_tl_ucp_default_alg_descs[i].str_get_fn(team); + default_select_str[i] = + ucc_tl_ucp_default_alg_descs[i].str_get_fn(team); } if (!default_select_str[i]) { st = UCC_ERR_NO_MEMORY; goto exit; } - } exit: @@ -134,7 +117,7 @@ void ucc_tl_ucp_get_completion_cb(void *request, ucs_status_t status, void ucc_tl_ucp_recv_completion_cb(void *request, ucs_status_t status, const ucp_tag_recv_info_t *info, /* NOLINT */ - void *user_data) + void * user_data) { ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data; if (ucc_unlikely(UCS_OK != status)) { @@ -156,11 +139,11 @@ ucc_status_t ucc_tl_ucp_coll_finalize(ucc_coll_task_t *coll_task) } ucc_status_t 
ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h) + ucc_base_team_t * team, + ucc_coll_task_t ** task_h) { - ucc_tl_ucp_task_t *task = ucc_tl_ucp_init_task(coll_args, team); - ucc_status_t status; + ucc_tl_ucp_task_t *task = ucc_tl_ucp_init_task(coll_args, team); + ucc_status_t status; switch (coll_args->args.coll_type) { case UCC_COLL_TYPE_BARRIER: @@ -298,8 +281,8 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, *init = ucc_tl_ucp_bcast_dbt_init; break; default: - status = UCC_ERR_INVALID_PARAM; - break; + status = UCC_ERR_INVALID_PARAM; + break; }; break; case UCC_COLL_TYPE_ALLTOALL: @@ -343,8 +326,8 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, *init = ucc_tl_ucp_reduce_dbt_init; break; default: - status = UCC_ERR_INVALID_PARAM; - break; + status = UCC_ERR_INVALID_PARAM; + break; }; break; case UCC_COLL_TYPE_REDUCE_SCATTER: diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index b10fcc2e9b..60e6669d87 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -16,46 +16,47 @@ #include "components/ec/ucc_ec.h" #include "tl_ucp_tag.h" -#define UCC_UUNITS_AUTO_RADIX 4 +#define UCC_UUNITS_AUTO_RADIX 4 #define UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR 8 -ucc_status_t ucc_tl_ucp_team_default_score_str_alloc(ucc_tl_ucp_team_t *team, - char *default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]); +ucc_status_t ucc_tl_ucp_team_default_score_str_alloc( + ucc_tl_ucp_team_t *team, + char * default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]); void ucc_tl_ucp_team_default_score_str_free( char *default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]); -#define CALC_KN_TREE_DIST(_size, _radix, _dist) \ - do { \ - _dist = 1; \ - while (_dist * _radix < _size) { \ - _dist *= _radix; \ - } \ +#define CALC_KN_TREE_DIST(_size, _radix, _dist) \ + do { \ + _dist = 1; \ + while (_dist * _radix < _size) { \ + _dist *= _radix; \ + } \ } while (0) -#define VRANK(_rank, _root, _team_size) \ +#define VRANK(_rank, _root, _team_size) \ (((_rank) - (_root) + (_team_size)) % (_team_size)) -#define INV_VRANK(_rank, _root, _team_size) \ - (((_rank) + (_root)) % (_team_size)) +#define INV_VRANK(_rank, _root, _team_size) (((_rank) + (_root)) % (_team_size)) -#define EXEC_TASK_TEST(_phase, _errmsg, _etask) do { \ - if (_etask != NULL) { \ - status = ucc_ee_executor_task_test(_etask); \ - if (status > 0) { \ - task->super.status = UCC_INPROGRESS; \ - SAVE_STATE(_phase); \ - return; \ - } \ - ucc_ee_executor_task_finalize(_etask); \ - _etask = NULL; \ - if (ucc_unlikely(status < 0)) { \ - tl_error(UCC_TASK_LIB(task), _errmsg); \ - task->super.status = status; \ - return; \ +#define EXEC_TASK_TEST(_phase, _errmsg, _etask) \ + do { \ + if (_etask != NULL) { \ + status = ucc_ee_executor_task_test(_etask); \ + if (status > 0) { \ + task->super.status = UCC_INPROGRESS; \ + SAVE_STATE(_phase); \ + return; \ + } \ + ucc_ee_executor_task_finalize(_etask); \ + _etask = NULL; \ + if (ucc_unlikely(status < 0)) { \ + tl_error(UCC_TASK_LIB(task), _errmsg); \ + task->super.status = status; \ + return; \ + } \ } \ - } \ -} while(0) + } while (0) #define EXEC_TASK_WAIT(_etask, ...) 
\ do { \ @@ -77,114 +78,117 @@ void ucc_tl_ucp_team_default_score_str_free( } \ } while (0) -typedef char* (*ucc_tl_ucp_score_str_get_fn_t)(ucc_tl_ucp_team_t *team); +typedef char *(*ucc_tl_ucp_score_str_get_fn_t)(ucc_tl_ucp_team_t *team); typedef struct ucc_tl_ucp_default_alg_desc { - char *select_str; - ucc_tl_ucp_score_str_get_fn_t str_get_fn; + char * select_str; + ucc_tl_ucp_score_str_get_fn_t str_get_fn; } ucc_tl_ucp_default_alg_desc_t; -enum ucc_tl_ucp_task_flags { +enum ucc_tl_ucp_task_flags +{ /*indicates whether subset field of tl_ucp_task is set*/ UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0), }; -typedef struct ucc_tl_ucp_allreduce_sw_pipeline ucc_tl_ucp_allreduce_sw_pipeline; -typedef struct ucc_tl_ucp_allreduce_sw_host_allgather ucc_tl_ucp_allreduce_sw_host_allgather; +typedef struct ucc_tl_ucp_allreduce_sw_pipeline + ucc_tl_ucp_allreduce_sw_pipeline; +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather + ucc_tl_ucp_allreduce_sw_host_allgather; typedef struct ucc_tl_ucp_task { ucc_coll_task_t super; uint32_t flags; union { struct { - uint32_t send_posted; - uint32_t send_completed; - uint32_t recv_posted; - uint32_t recv_completed; - uint32_t tag; + uint32_t send_posted; + uint32_t send_completed; + uint32_t recv_posted; + uint32_t recv_completed; + uint32_t tag; } tagged; struct { - uint32_t put_posted; - uint32_t put_completed; - uint32_t get_posted; - uint32_t get_completed; + uint32_t put_posted; + uint32_t put_completed; + uint32_t get_posted; + uint32_t get_completed; } onesided; }; - uint32_t n_polls; - ucc_subset_t subset; + uint32_t n_polls; + ucc_subset_t subset; union { struct { - int phase; - ucc_knomial_pattern_t p; + int phase; + ucc_knomial_pattern_t p; } barrier; struct { int phase; ucc_knomial_pattern_t p; - void *scratch; + void * scratch; ucc_mc_buffer_header_t *scratch_mc_header; ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } allreduce_kn; struct { - int reduce_in_progress; - ucp_rkey_h *src_rkeys; //unpacked - ucp_rkey_h *dst_rkeys; //unpacked - ucp_ep_h *eps; - void **sbufs; - void **rbufs; - ucc_coll_task_t *allreduce_task_h; - ucc_tl_ucp_allreduce_sw_pipeline *pipe; - ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; - int put_window_size; - int num_get_bufs; - ucs_status_ptr_t *put_requests; - ucc_service_coll_req_t *allgather_scoll_req; - ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data; - ucc_coll_task_t *barrier_task; + int reduce_in_progress; + ucp_rkey_h * src_rkeys; //unpacked + ucp_rkey_h * dst_rkeys; //unpacked + ucp_ep_h * eps; + void ** sbufs; + void ** rbufs; + ucc_coll_task_t * allreduce_task_h; + ucc_tl_ucp_allreduce_sw_pipeline * pipe; + ucc_ee_executor_task_t * etask; + ucc_ee_executor_t * executor; + int put_window_size; + int num_get_bufs; + ucs_status_ptr_t * put_requests; + ucc_service_coll_req_t * allgather_scoll_req; + ucc_tl_ucp_allreduce_sw_host_allgather * allgather_data; + ucc_coll_task_t * barrier_task; struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf; struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf; - int inplace; + int inplace; } allreduce_sliding_window; struct { int phase; ucc_knomial_pattern_t p; - void *scratch; + void * scratch; ucc_mc_buffer_header_t *scratch_mc_header; ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } reduce_scatter_kn; struct { - void *scratch; + void * scratch; size_t max_block_count; ucc_ep_map_t inv_map; int n_frags; int frag; char s_scratch_busy[2]; ucc_ee_executor_task_t *etask; 
- ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } reduce_scatter_ring; struct { - void *scratch; + void * scratch; size_t max_block_count; ucc_ep_map_t inv_map; int n_frags; int frag; char s_scratch_busy[2]; ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } reduce_scatterv_ring; struct { - int phase; - ucc_knomial_pattern_t p; - ucc_rank_t recv_dist; - ptrdiff_t send_offset; - ptrdiff_t recv_offset; - size_t recv_size; + int phase; + ucc_knomial_pattern_t p; + ucc_rank_t recv_dist; + ptrdiff_t send_offset; + ptrdiff_t recv_offset; + size_t recv_size; } scatter_kn; struct { int phase; ucc_knomial_pattern_t p; - void *sbuf; + void * sbuf; ucc_ee_executor_task_t *etask; ucc_rank_t recv_dist; } allgather_kn; @@ -196,23 +200,19 @@ typedef struct ucc_tl_ucp_task { * For regular allgather with rank reordering both endpoints * and blocks permutation are necessary. */ - ucc_rank_t (*get_send_block)(ucc_subset_t *subset, - ucc_rank_t trank, - ucc_rank_t tsize, - int step); - ucc_rank_t (*get_recv_block)(ucc_subset_t *subset, - ucc_rank_t trank, - ucc_rank_t tsize, - int step); + ucc_rank_t (*get_send_block)(ucc_subset_t *subset, ucc_rank_t trank, + ucc_rank_t tsize, int step); + ucc_rank_t (*get_recv_block)(ucc_subset_t *subset, ucc_rank_t trank, + ucc_rank_t tsize, int step); } allgather_ring; struct { - ucc_rank_t dist; - uint32_t radix; + ucc_rank_t dist; + uint32_t radix; } bcast_kn; struct { - ucc_dbt_single_tree_t t1; - ucc_dbt_single_tree_t t2; - int state; + ucc_dbt_single_tree_t t1; + ucc_dbt_single_tree_t t2; + int state; } bcast_dbt; struct { ucc_rank_t dist; @@ -220,20 +220,20 @@ typedef struct ucc_tl_ucp_task { int children_per_cycle; uint32_t radix; int phase; - void *scratch; + void * scratch; ucc_mc_buffer_header_t *scratch_mc_header; ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } reduce_kn; struct { int state; ucc_dbt_single_tree_t trees[2]; int reduction_comp[2]; int send_comp[2]; - void *scratch; + void * scratch; ucc_mc_buffer_header_t *scratch_mc_header; ucc_ee_executor_task_t *etask; - ucc_ee_executor_t *executor; + ucc_ee_executor_t * executor; } reduce_dbt; struct { ucc_rank_t dist; @@ -261,8 +261,8 @@ typedef struct ucc_tl_ucp_task { struct { ucc_mc_buffer_header_t *scratch_mc_header; ucc_ee_executor_task_t *etask; - void *src; - void *dst; + void * src; + void * dst; ucc_rank_t iteration; int phase; } alltoall_bruck; @@ -271,7 +271,7 @@ typedef struct ucc_tl_ucp_task { typedef struct ucc_tl_ucp_schedule { ucc_schedule_pipelined_t super; - ucc_mc_buffer_header_t *scratch_mc_header; + ucc_mc_buffer_header_t * scratch_mc_header; } ucc_tl_ucp_schedule_t; #define TASK_TEAM(_task) \ @@ -285,7 +285,7 @@ typedef struct ucc_tl_ucp_schedule { #define AVG_ALPHA(_task) (1.0 / (double)UCC_TL_TEAM_SIZE(TASK_TEAM(_task))) static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task, - ucc_status_t status) + ucc_status_t status) { task->tagged.send_posted = 0; task->tagged.send_completed = 0; @@ -296,8 +296,8 @@ static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task, static inline ucc_tl_ucp_task_t *ucc_tl_ucp_get_task(ucc_tl_ucp_team_t *team) { - ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); - ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t*) ucc_mpool_get(&ctx->req_mp); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); + ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)ucc_mpool_get(&ctx->req_mp); UCC_TL_UCP_PROFILE_REQUEST_NEW(task, "tl_ucp_task", 0); 
task->super.flags = 0; @@ -318,13 +318,12 @@ static inline void ucc_tl_ucp_put_task(ucc_tl_ucp_task_t *task) } static inline ucc_status_t -ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team, - ucc_base_coll_args_t *args, +ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team, ucc_base_coll_args_t *args, ucc_tl_ucp_schedule_t **schedule) { - ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); - *schedule = (ucc_tl_ucp_schedule_t*) ucc_mpool_get(&ctx->req_mp); + *schedule = (ucc_tl_ucp_schedule_t *)ucc_mpool_get(&ctx->req_mp); if (ucc_unlikely(!(*schedule))) { return UCC_ERR_NO_MEMORY; @@ -340,7 +339,6 @@ static inline void ucc_tl_ucp_put_schedule(ucc_schedule_t *schedule) ucc_mpool_put(schedule); } - ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t * team, ucc_coll_task_t ** task_h); @@ -361,17 +359,17 @@ ucc_tl_ucp_init_task(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team) if (UCC_COLL_ARGS_ACTIVE_SET(&coll_args->args)) { task->tagged.tag = (coll_args->mask & UCC_COLL_ARGS_FIELD_TAG) - ? coll_args->args.tag : UCC_TL_UCP_ACTIVE_SET_TAG; - task->flags |= UCC_TL_UCP_TASK_FLAG_SUBSET; - task->subset.map = ucc_active_set_to_ep_map(&coll_args->args); + ? coll_args->args.tag + : UCC_TL_UCP_ACTIVE_SET_TAG; + task->flags |= UCC_TL_UCP_TASK_FLAG_SUBSET; + task->subset.map = ucc_active_set_to_ep_map(&coll_args->args); task->subset.myrank = - ucc_ep_map_local_rank(task->subset.map, - UCC_TL_TEAM_RANK(tl_team)); + ucc_ep_map_local_rank(task->subset.map, UCC_TL_TEAM_RANK(tl_team)); ucc_assert(coll_args->args.coll_type == UCC_COLL_TYPE_BCAST); /* root value in args corresponds to the original team ranks, need to convert to subset local value */ - TASK_ARGS(task).root = ucc_ep_map_local_rank(task->subset.map, - coll_args->args.root); + TASK_ARGS(task).root = + ucc_ep_map_local_rank(task->subset.map, coll_args->args.root); } else { if (coll_args->mask & UCC_COLL_ARGS_FIELD_TAG) { task->tagged.tag = coll_args->args.tag; @@ -381,7 +379,7 @@ ucc_tl_ucp_init_task(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team) } } - task->super.finalize = ucc_tl_ucp_coll_finalize; + task->super.finalize = ucc_tl_ucp_coll_finalize; return task; } @@ -495,11 +493,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, ucc_base_coll_init_fn_t *init); static inline unsigned -ucc_tl_ucp_get_radix_from_range(ucc_tl_ucp_team_t *team, - size_t msgsize, +ucc_tl_ucp_get_radix_from_range(ucc_tl_ucp_team_t *team, size_t msgsize, ucc_memory_type_t mem_type, - ucc_mrange_uint_t *p, - ucc_rank_t default_value) + ucc_mrange_uint_t *p, ucc_rank_t default_value) { unsigned radix; diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc index 2c38a5fe0e..46530fbaba 100644 --- a/test/gtest/coll/test_allreduce.cc +++ b/test/gtest/coll/test_allreduce.cc @@ -12,7 +12,7 @@ #include -template +template class test_allreduce : public UccCollArgs, public testing::Test { public: void data_init(int nprocs, ucc_datatype_t dt, size_t count, @@ -20,10 +20,11 @@ class test_allreduce : public UccCollArgs, public testing::Test { { ctxs.resize(nprocs); for (int r = 0; r < nprocs; r++) { - ucc_coll_args_t *coll = (ucc_coll_args_t*) - calloc(1, sizeof(ucc_coll_args_t)); + ucc_coll_args_t *coll = + (ucc_coll_args_t *)calloc(1, sizeof(ucc_coll_args_t)); - ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); + ctxs[r] = + (gtest_ucc_coll_ctx_t *)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); ctxs[r]->args 
= coll; coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; @@ -33,7 +34,7 @@ class test_allreduce : public UccCollArgs, public testing::Test { ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf"); EXPECT_NE(ctxs[r]->init_buf, nullptr); for (int i = 0; i < count; i++) { - typename T::type * ptr; + typename T::type *ptr; ptr = (typename T::type *)ctxs[r]->init_buf; /* need to limit the init value so that "prod" operation would not grow too large. We have teams up to 16 procs @@ -47,11 +48,11 @@ class test_allreduce : public UccCollArgs, public testing::Test { ucc_dt_size(dt) * count, mem_type)); coll->dst.info.buffer = ctxs[r]->dst_mc_header->addr; if (TEST_INPLACE == inplace) { - coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; coll->flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; - UCC_CHECK(ucc_mc_memcpy(coll->dst.info.buffer, ctxs[r]->init_buf, - ucc_dt_size(dt) * count, mem_type, - UCC_MEMORY_TYPE_HOST)); + UCC_CHECK(ucc_mc_memcpy( + coll->dst.info.buffer, ctxs[r]->init_buf, + ucc_dt_size(dt) * count, mem_type, UCC_MEMORY_TYPE_HOST)); coll->src.info.mem_type = UCC_MEMORY_TYPE_UNKNOWN; coll->src.info.count = SIZE_MAX; coll->src.info.datatype = (ucc_datatype_t)-1; @@ -59,9 +60,9 @@ class test_allreduce : public UccCollArgs, public testing::Test { UCC_CHECK(ucc_mc_alloc(&ctxs[r]->src_mc_header, ucc_dt_size(dt) * count, mem_type)); coll->src.info.buffer = ctxs[r]->src_mc_header->addr; - UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, ctxs[r]->init_buf, - ucc_dt_size(dt) * count, mem_type, - UCC_MEMORY_TYPE_HOST)); + UCC_CHECK(ucc_mc_memcpy( + coll->src.info.buffer, ctxs[r]->init_buf, + ucc_dt_size(dt) * count, mem_type, UCC_MEMORY_TYPE_HOST)); coll->src.info.mem_type = mem_type; coll->src.info.count = (ucc_count_t)count; coll->src.info.datatype = dt; @@ -70,14 +71,15 @@ class test_allreduce : public UccCollArgs, public testing::Test { coll->dst.info.count = (ucc_count_t)count; coll->dst.info.datatype = dt; if (persistent) { - coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll->mask |= UCC_COLL_ARGS_FIELD_FLAGS; coll->flags |= UCC_COLL_ARGS_FLAG_PERSISTENT; } } } - void data_fini(UccCollCtxVec ctxs) { - for (gtest_ucc_coll_ctx_t* ctx : ctxs) { - ucc_coll_args_t* coll = ctx->args; + void data_fini(UccCollCtxVec ctxs) + { + for (gtest_ucc_coll_ctx_t *ctx : ctxs) { + ucc_coll_args_t *coll = ctx->args; if (coll->src.info.buffer) { /* no inplace */ UCC_CHECK(ucc_mc_free(ctx->src_mc_header)); } @@ -107,16 +109,17 @@ class test_allreduce : public UccCollArgs, public testing::Test { } bool data_validate(UccCollCtxVec ctxs) { - size_t count = (ctxs[0])->args->dst.info.count; + size_t count = (ctxs[0])->args->dst.info.count; std::vector dsts(ctxs.size()); if (UCC_MEMORY_TYPE_HOST != mem_type) { for (int r = 0; r < ctxs.size(); r++) { - dsts[r] = (typename T::type *) ucc_malloc(count * sizeof(typename T::type), "dsts buf"); + dsts[r] = (typename T::type *)ucc_malloc( + count * sizeof(typename T::type), "dsts buf"); EXPECT_NE(dsts[r], nullptr); UCC_CHECK(ucc_mc_memcpy(dsts[r], ctxs[r]->args->dst.info.buffer, - count * sizeof(typename T::type), UCC_MEMORY_TYPE_HOST, - mem_type)); + count * sizeof(typename T::type), + UCC_MEMORY_TYPE_HOST, mem_type)); } } else { for (int r = 0; r < ctxs.size(); r++) { @@ -125,14 +128,15 @@ class test_allreduce : public UccCollArgs, public testing::Test { } for (int i = 0; i < count; i++) { typename T::type res = - ((typename T::type *)((ctxs[0])->init_buf))[i]; + ((typename T::type *)((ctxs[0])->init_buf))[i]; for (int r = 1; r < ctxs.size(); 
r++) { - res = T::do_op(res, ((typename T::type *)((ctxs[r])->init_buf))[i]); + res = T::do_op(res, + ((typename T::type *)((ctxs[r])->init_buf))[i]); } if (T::redop == UCC_OP_AVG) { - if (T::dt == UCC_DT_BFLOAT16){ - float32tobfloat16(bfloat16tofloat32(&res) / (float)ctxs.size(), - &res); + if (T::dt == UCC_DT_BFLOAT16) { + float32tobfloat16( + bfloat16tofloat32(&res) / (float)ctxs.size(), &res); } else { res = res / (typename T::type)ctxs.size(); } @@ -149,11 +153,11 @@ class test_allreduce : public UccCollArgs, public testing::Test { return true; } }; -template -class test_allreduce_host : public test_allreduce {}; +template class test_allreduce_host : public test_allreduce { +}; -template -class test_allreduce_cuda : public test_allreduce {}; +template class test_allreduce_cuda : public test_allreduce { +}; TYPED_TEST_CASE(test_allreduce_host, CollReduceTypeOpsHost); TYPED_TEST_CASE(test_allreduce_cuda, CollReduceTypeOpsCuda); @@ -168,7 +172,8 @@ TYPED_TEST_CASE(test_allreduce_cuda, CollReduceTypeOpsCuda); UccCollCtxVec ctxs; \ SET_MEM_TYPE(_mem_type); \ this->set_inplace(_inplace); \ - this->data_init(size, TypeParam::dt, count, ctxs, _persistent);\ + this->data_init(size, TypeParam::dt, count, ctxs, \ + _persistent); \ UccReq req(team, ctxs); \ for (auto i = 0; i < _repeat; i++) { \ req.start(); \ @@ -181,7 +186,8 @@ TYPED_TEST_CASE(test_allreduce_cuda, CollReduceTypeOpsCuda); } \ } -TYPED_TEST(test_allreduce_host, single) { +TYPED_TEST(test_allreduce_host, single) +{ TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 1, 0); } @@ -190,7 +196,8 @@ TYPED_TEST(test_allreduce_host, single_persistent) TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 3, 1); } -TYPED_TEST(test_allreduce_host, single_inplace) { +TYPED_TEST(test_allreduce_host, single_inplace) +{ TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE, 1, 0); } @@ -200,7 +207,8 @@ TYPED_TEST(test_allreduce_host, single_persistent_inplace) } #ifdef HAVE_CUDA -TYPED_TEST(test_allreduce_cuda, single) { +TYPED_TEST(test_allreduce_cuda, single) +{ TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 1, 0); } @@ -209,7 +217,8 @@ TYPED_TEST(test_allreduce_cuda, single_persistent) TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 3, 1); } -TYPED_TEST(test_allreduce_cuda, single_inplace) { +TYPED_TEST(test_allreduce_cuda, single_inplace) +{ TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_INPLACE, 1, 0); } @@ -217,22 +226,24 @@ TYPED_TEST(test_allreduce_cuda, single_persistent_inplace) { TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_INPLACE, 3, 1); } -TYPED_TEST(test_allreduce_cuda, single_managed) { - TEST_DECLARE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 1, 0); +TYPED_TEST(test_allreduce_cuda, single_managed) +{ + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 1, 0); } TYPED_TEST(test_allreduce_cuda, single_persistent_managed) { - TEST_DECLARE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 3, 1); + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 3, 1); } -TYPED_TEST(test_allreduce_cuda, single_inplace_managed) { - TEST_DECLARE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 1, 0); +TYPED_TEST(test_allreduce_cuda, single_inplace_managed) +{ + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 1, 0); } TYPED_TEST(test_allreduce_cuda, single_persistent_inplace_managed) { - TEST_DECLARE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 3, 1); + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 3, 1); } #endif @@ -261,43 +272,50 @@ TYPED_TEST(test_allreduce_cuda, single_persistent_inplace_managed) } \ } 
-TYPED_TEST(test_allreduce_host, multiple) { +TYPED_TEST(test_allreduce_host, multiple) +{ TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE); } -TYPED_TEST(test_allreduce_host, multiple_inplace) { +TYPED_TEST(test_allreduce_host, multiple_inplace) +{ TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE); } #ifdef HAVE_CUDA -TYPED_TEST(test_allreduce_cuda, multiple) { +TYPED_TEST(test_allreduce_cuda, multiple) +{ TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE); } -TYPED_TEST(test_allreduce_cuda, multiple_inplace) { +TYPED_TEST(test_allreduce_cuda, multiple_inplace) +{ TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_CUDA, TEST_INPLACE); } -TYPED_TEST(test_allreduce_cuda, multiple_managed) { - TEST_DECLARE_MULTIPLE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE); +TYPED_TEST(test_allreduce_cuda, multiple_managed) +{ + TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE); } -TYPED_TEST(test_allreduce_cuda, multiple_inplace_managed) { - TEST_DECLARE_MULTIPLE( UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE); +TYPED_TEST(test_allreduce_cuda, multiple_inplace_managed) +{ + TEST_DECLARE_MULTIPLE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE); } #endif -template -class test_allreduce_alg : public test_allreduce -{}; +template class test_allreduce_alg : public test_allreduce { +}; using test_allreduce_alg_type = ::testing::Types>; TYPED_TEST_CASE(test_allreduce_alg, test_allreduce_alg_type); -TYPED_TEST(test_allreduce_alg, sra_knomial_pipelined) { +TYPED_TEST(test_allreduce_alg, sra_knomial_pipelined) +{ int n_procs = 15; - ucc_job_env_t env = {{"UCC_CL_BASIC_TUNE", "inf"}, - {"UCC_TL_UCP_TUNE", "allreduce:@sra_knomial:inf"}, - {"UCC_TL_UCP_ALLREDUCE_SRA_KN_PIPELINE", "thresh=1024:nfrags=11"}}; + ucc_job_env_t env = { + {"UCC_CL_BASIC_TUNE", "inf"}, + {"UCC_TL_UCP_TUNE", "allreduce:@sra_knomial:inf"}, + {"UCC_TL_UCP_ALLREDUCE_SRA_KN_PIPELINE", "thresh=1024:nfrags=11"}}; UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); UccTeam_h team = job.create_team(n_procs); int repeat = 3; @@ -307,8 +325,8 @@ TYPED_TEST(test_allreduce_alg, sra_knomial_pipelined) { if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { mt.push_back(UCC_MEMORY_TYPE_CUDA); } - if (UCC_OK == ucc_mc_available( UCC_MEMORY_TYPE_CUDA_MANAGED)) { - mt.push_back( UCC_MEMORY_TYPE_CUDA_MANAGED); + if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA_MANAGED)) { + mt.push_back(UCC_MEMORY_TYPE_CUDA_MANAGED); } for (auto count : {65536, 123567}) { @@ -331,10 +349,11 @@ TYPED_TEST(test_allreduce_alg, sra_knomial_pipelined) { } } -TYPED_TEST(test_allreduce_alg, dbt) { +TYPED_TEST(test_allreduce_alg, dbt) +{ int n_procs = 15; ucc_job_env_t env = {{"UCC_CL_BASIC_TUNE", "inf"}, - {"UCC_TL_UCP_TUNE", "allreduce:@dbt:inf"}}; + {"UCC_TL_UCP_TUNE", "allreduce:@dbt:inf"}}; UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); UccTeam_h team = job.create_team(n_procs); int repeat = 3; @@ -344,8 +363,8 @@ TYPED_TEST(test_allreduce_alg, dbt) { if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { mt.push_back(UCC_MEMORY_TYPE_CUDA); } - if (UCC_OK == ucc_mc_available( UCC_MEMORY_TYPE_CUDA_MANAGED)) { - mt.push_back( UCC_MEMORY_TYPE_CUDA_MANAGED); + if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA_MANAGED)) { + mt.push_back(UCC_MEMORY_TYPE_CUDA_MANAGED); } for (auto count : {65536, 123567}) { @@ -368,17 +387,19 @@ TYPED_TEST(test_allreduce_alg, dbt) { } } -TYPED_TEST(test_allreduce_alg, rab) { +TYPED_TEST(test_allreduce_alg, rab) +{ int n_procs = 15; ucc_job_env_t env = {{"UCC_CL_HIER_TUNE", "allreduce:@rab:0-inf:inf"}, - 
{"UCC_CLS", "all"}}; + {"UCC_CLS", "all"}}; UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); UccTeam_h team = job.create_team(n_procs); int repeat = 3; UccCollCtxVec ctxs; std::vector mt = {UCC_MEMORY_TYPE_HOST}; - if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? mt.push_back(UCC_MEMORY_TYPE_CUDA); } @@ -402,18 +423,21 @@ TYPED_TEST(test_allreduce_alg, rab) { } } -TYPED_TEST(test_allreduce_alg, rab_pipelined) { +TYPED_TEST(test_allreduce_alg, rab_pipelined) +{ int n_procs = 15; - ucc_job_env_t env = {{"UCC_CL_HIER_TUNE", "allreduce:@rab:0-inf:inf"}, - {"UCC_CL_HIER_ALLREDUCE_RAB_PIPELINE", "thresh=1024:nfrags=11"}, - {"UCC_CLS", "all"}}; + ucc_job_env_t env = { + {"UCC_CL_HIER_TUNE", "allreduce:@rab:0-inf:inf"}, + {"UCC_CL_HIER_ALLREDUCE_RAB_PIPELINE", "thresh=1024:nfrags=11"}, + {"UCC_CLS", "all"}}; UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); UccTeam_h team = job.create_team(n_procs); int repeat = 3; UccCollCtxVec ctxs; std::vector mt = {UCC_MEMORY_TYPE_HOST}; - if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? mt.push_back(UCC_MEMORY_TYPE_CUDA); } @@ -437,18 +461,20 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) { } } -TYPED_TEST(test_allreduce_alg, sliding_window) { +TYPED_TEST(test_allreduce_alg, sliding_window) +{ int n_procs = 8; ucc_job_env_t env = {{"UCC_TL_UCP_TUNE", "allreduce:@2"}, - {"UCC_CLS", "all"}}; + {"UCC_CLS", "all"}}; UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env); UccTeam_h team = job.create_team(n_procs); int repeat = 3; UccCollCtxVec ctxs; - std::vector mt = {UCC_MEMORY_TYPE_HOST}; - ucp_info_t *ucp_infos = NULL; + std::vector mt = {UCC_MEMORY_TYPE_HOST}; + ucp_info_t * ucp_infos = NULL; - if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? 
mt.push_back(UCC_MEMORY_TYPE_CUDA); } @@ -504,8 +530,8 @@ TYPED_TEST(test_allreduce_avg_order, avg_post_op) if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { mt.push_back(UCC_MEMORY_TYPE_CUDA); } - if (UCC_OK == ucc_mc_available( UCC_MEMORY_TYPE_CUDA_MANAGED)) { - mt.push_back( UCC_MEMORY_TYPE_CUDA_MANAGED); + if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA_MANAGED)) { + mt.push_back(UCC_MEMORY_TYPE_CUDA_MANAGED); } for (auto count : {4, 256, 65536}) { diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc index 31172536e0..de1304ca99 100644 --- a/test/gtest/coll/test_allreduce_sliding_window.cc +++ b/test/gtest/coll/test_allreduce_sliding_window.cc @@ -20,10 +20,10 @@ int ucp_init_ex(ucp_context_h *ucp_ctx) { - ucs_status_t ucs_status; - ucp_config_t *ucp_config; - ucp_params_t ucp_params; - ucp_context_h ucp_context; + ucs_status_t ucs_status; + ucp_config_t *ucp_config; + ucp_params_t ucp_params; + ucp_context_h ucp_context; ucs_status = ucp_config_read(NULL, NULL, &ucp_config); assert(ucs_status == UCS_OK); @@ -46,7 +46,7 @@ int ucp_init_ex(ucp_context_h *ucp_ctx) void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status) { printf("Endpoint error detected, status: %s\n", - ucs_status_string(ucs_status)); + ucs_status_string(ucs_status)); } int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, @@ -86,15 +86,16 @@ void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, { int i; - ucp_info_t *ucp_infos = (ucp_info_t*) ucc_malloc(sizeof(ucp_info_t) * n_procs); + ucp_info_t *ucp_infos = + (ucp_info_t *)ucc_malloc(sizeof(ucp_info_t) * n_procs); *ucp_infos_p = ucp_infos; // allocate gwbi for (auto ctx : ctxs) { ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = - (ucc_tl_ucp_allreduce_sw_global_work_buf_info*) - ucc_malloc(sizeof(ucc_tl_ucp_allreduce_sw_global_work_buf_info), - "global work buf info"); + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ucc_malloc( + sizeof(ucc_tl_ucp_allreduce_sw_global_work_buf_info), + "global work buf info"); ctx->args->global_work_buffer = gwbi; } @@ -110,12 +111,12 @@ void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, for (i = 0; i < n_procs; i++) { // my proc's gwbi ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = - (ucc_tl_ucp_allreduce_sw_global_work_buf_info *) - ctxs[i]->args->global_work_buffer; + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[i] + ->args->global_work_buffer; // my proc's ucp_info - ucp_info_t *ucp_info = &ucp_infos[i]; + ucp_info_t * ucp_info = &ucp_infos[i]; struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; - size_t dst_len = ctxs[i]->args->dst.info.count * + size_t dst_len = ctxs[i]->args->dst.info.count * ucc_dt_size(ctxs[i]->args->dst.info.datatype); buffer_export_ucc(ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer, @@ -136,8 +137,8 @@ void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, // set the flag that indicates the global work buffer was passed for (auto ctx : ctxs) { - ctx->args->mask |= UCC_COLL_ARGS_FIELD_FLAGS | - UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; + ctx->args->mask |= + UCC_COLL_ARGS_FIELD_FLAGS | UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; ctx->args->flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS; } } @@ -169,8 +170,8 @@ void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos, // free gwbi and each gwbi's set of pipes for (k = 0; k < n_procs; k++) { ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = - (ucc_tl_ucp_allreduce_sw_global_work_buf_info *) - ctxs[k]->args->global_work_buffer; + 
(ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[k] + ->args->global_work_buffer; ucc_free(gwbi); } diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h index 5ed75c606a..734e58e6c9 100644 --- a/test/gtest/coll/test_allreduce_sliding_window.h +++ b/test/gtest/coll/test_allreduce_sliding_window.h @@ -6,23 +6,23 @@ struct export_buf { ucp_context_h ucp_context; ucp_mem_h memh; - void *packed_memh; + void * packed_memh; size_t packed_memh_len; uint64_t memh_id; }; typedef struct ucp_info { - ucp_context_h ucp_ctx; - struct export_buf src_ebuf; - struct export_buf dst_ebuf; + ucp_context_h ucp_ctx; + struct export_buf src_ebuf; + struct export_buf dst_ebuf; } ucp_info_t; void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos, bool inplace); void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t **ucp_infos_p /* out */, bool inplace); -int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, - struct export_buf *ebuf); +int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf); void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status); int ucp_init_ex(ucp_context_h *ucp_ctx);
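Aside (illustrative sketch, not part of this patch): test_allreduce_sliding_window.h above declares the helpers the sliding-window test uses to emulate an application that owns its own UCP context and pre-registered buffers. A minimal usage sketch built only on those declarations; the example_export_one_buffer() name and the teardown order are assumptions, and the test itself drives this through setup_gwbi()/free_gwbi() over a UccCollCtxVec rather than directly:

    // Create a standalone UCP context, export one destination buffer, then
    // release everything. Assumes <ucp/api/ucp.h>, <stdlib.h> and this header
    // are included, and that buffer_export_ucc() maps the buffer with
    // ucp_mem_map() so ucp_mem_unmap() is the matching teardown.
    static void example_export_one_buffer(size_t len)
    {
        ucp_context_h     ucp_ctx;
        struct export_buf dst_ebuf;
        void             *dst = malloc(len);

        if (dst == NULL || ucp_init_ex(&ucp_ctx) != 0) {
            free(dst);
            return;
        }
        buffer_export_ucc(ucp_ctx, dst, len, &dst_ebuf);

        /* dst_ebuf.memh, packed_memh and memh_id now describe the exported
         * region; this is the per-buffer state setup_gwbi() collects for each
         * rank before wiring up the global work buffer info. */

        ucp_mem_unmap(ucp_ctx, dst_ebuf.memh);
        ucp_cleanup(ucp_ctx);
        free(dst);
    }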