diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index cc0a118c60..4d684adfb5 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -34,10 +34,12 @@ alltoallv = \ alltoallv/alltoallv_pairwise.c \ alltoallv/alltoallv_hybrid.c -allreduce = \ - allreduce/allreduce.h \ - allreduce/allreduce.c \ - allreduce/allreduce_knomial.c \ +allreduce = \ + allreduce/allreduce.h \ + allreduce/allreduce.c \ + allreduce/allreduce_knomial.c \ + allreduce/allreduce_sliding_window.c \ + allreduce/allreduce_sliding_window_setup.c \ allreduce/allreduce_sra_knomial.c barrier = \ diff --git a/src/components/tl/ucp/allreduce/allreduce.c b/src/components/tl/ucp/allreduce/allreduce.c index 90e9d6bf48..97d35d4076 100644 --- a/src/components/tl/ucp/allreduce/allreduce.c +++ b/src/components/tl/ucp/allreduce/allreduce.c @@ -13,13 +13,17 @@ ucc_base_coll_alg_info_t [UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL] = {.id = UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL, .name = "knomial", - .desc = - "recursive knomial with arbitrary radix (optimized for latency)"}, + .desc = "recursive knomial with arbitrary radix (optimized for " + "latency)"}, [UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL] = {.id = UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL, .name = "sra_knomial", .desc = "recursive knomial scatter-reduce followed by knomial " "allgather (optimized for BW)"}, + [UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] = + {.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, + .name = "sliding_window", + .desc = "sliding window allreduce (optimized for running on DPU)"}, [UCC_TL_UCP_ALLREDUCE_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; @@ -46,3 +50,11 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, out: return status; } + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h) +{ + return UCC_OK; +} diff --git a/src/components/tl/ucp/allreduce/allreduce.h b/src/components/tl/ucp/allreduce/allreduce.h index 250bebc981..5e545b6135 100644 --- a/src/components/tl/ucp/allreduce/allreduce.h +++ b/src/components/tl/ucp/allreduce/allreduce.h @@ -11,6 +11,7 @@ enum { UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL, UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL, + UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, UCC_TL_UCP_ALLREDUCE_ALG_LAST }; @@ -35,21 +36,116 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task); #define ALLREDUCE_TASK_CHECK(_args, _team) \ CHECK_SAME_MEMTYPE((_args), (_team)); +#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024 + +typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info { + void *packed_src_memh; + void *packed_dst_memh; +} ucc_tl_ucp_allreduce_sw_global_work_buf_info; + +typedef enum ucc_tl_ucp_allreduce_sw_buf_state +{ + FREE, + RECVING, + REDUCING, + REDUCED, + SENDING, + IDLE, +} ucc_tl_ucp_allreduce_sw_buf_state; + +typedef struct ucc_tl_ucp_allreduce_sw_buf { + void * buf; + ucc_tl_ucp_allreduce_sw_buf_state state; + ucs_status_ptr_t ucp_req; + size_t count; + size_t bytes; +} ucc_tl_ucp_allreduce_sw_buf; + +typedef struct ucc_tl_ucp_allreduce_sw_pipeline { + ucc_tl_ucp_allreduce_sw_buf accbuf; + ucc_tl_ucp_allreduce_sw_buf *getbuf; + ucs_status_ptr_t * put_requests; + size_t buffer_size; + size_t num_buffers; + size_t avail_buffs; + size_t my_count; + size_t my_offset; + size_t count_issued; + size_t count_received; + size_t count_reduced; + size_t count_serviced; + size_t get_idx; + size_t red_idx; + ucc_rank_t src_rank; + ucc_rank_t dst_rank; + int done_get; + int done_red; + int done_put; + int posted_put; +} ucc_tl_ucp_allreduce_sw_pipeline; + +struct ucc_tl_ucp_allreduce_sw_export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void * packed_memh; + size_t packed_memh_len; + void * packed_key; + size_t packed_key_len; + uint64_t memh_id; +}; + +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather { + void *src_buf; + void *dst_buf; + char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; + char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; +} ucc_tl_ucp_allreduce_sw_host_allgather; + ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t * team, ucc_coll_task_t ** task_h); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h); + ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task); + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task); void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task); + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task); -ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t * team, - ucc_coll_task_t ** task_h); +ucc_status_t +ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h); ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_start(ucc_coll_task_t *task); diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c new file mode 100644 index 0000000000..cacd8ee720 --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c @@ -0,0 +1,71 @@ +/** + * Copyright(c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "../allgather/allgather.h" +#include "../barrier/barrier.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +static inline void +ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf *buf) +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( + ucc_tl_ucp_allreduce_sw_pipeline *pipe, ucc_rank_t rank, + size_t put_window_size) +{ +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reduction( + ucc_coll_task_t *coll_task, ucc_tl_ucp_allreduce_sw_buf *accbuf, + ucc_tl_ucp_allreduce_sw_buf *getbuf) +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t *task) +{ +} + +static inline ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t request, + ucc_tl_ucp_task_t *task) +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test( + ucc_coll_task_t *coll_task) +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys( + ucc_coll_task_t *coll_task) +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t *coll_task) +{ +} + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) +{ +} diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c new file mode 100644 index 0000000000..3033f5f444 --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c @@ -0,0 +1,38 @@ +/** + * Copyright(c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "../allgather/allgather.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} diff --git a/src/components/tl/ucp/barrier/barrier.c b/src/components/tl/ucp/barrier/barrier.c index 84170dce7b..19f82d6021 100644 --- a/src/components/tl/ucp/barrier/barrier.c +++ b/src/components/tl/ucp/barrier/barrier.c @@ -7,9 +7,6 @@ #include "tl_ucp.h" #include "barrier.h" -ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task); -void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task); - ucc_base_coll_alg_info_t ucc_tl_ucp_barrier_algs[UCC_TL_UCP_BARRIER_ALG_LAST + 1] = { [UCC_TL_UCP_BARRIER_ALG_KNOMIAL] = diff --git a/src/components/tl/ucp/barrier/barrier.h b/src/components/tl/ucp/barrier/barrier.h index de16490621..a6c7c65336 100644 --- a/src/components/tl/ucp/barrier/barrier.h +++ b/src/components/tl/ucp/barrier/barrier.h @@ -18,4 +18,7 @@ extern ucc_base_coll_alg_info_t ucc_status_t ucc_tl_ucp_barrier_init(ucc_tl_ucp_task_t *task); +ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task); +void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task); + #endif diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 3a07c81f65..83fa7dceeb 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix), UCC_CONFIG_TYPE_UINT_RANGED}, + {"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536", + "Buffer size of the sliding window allreduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size), + UCC_CONFIG_TYPE_MEMUNITS}, + + {"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0", + "Buffer size for sliding window allreduce. <= 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_put_window_size), + UCC_CONFIG_TYPE_UINT}, + + {"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0", + "Buffer size for sliding window allreduce. <= 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_num_get_bufs), + UCC_CONFIG_TYPE_UINT}, + {"ALLREDUCE_SRA_KN_RADIX", "auto", "Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix), diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 75fdc76de5..6d51d37bbe 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t fanin_kn_radix; uint32_t fanout_kn_radix; uint32_t barrier_kn_radix; + size_t allreduce_sliding_window_buf_size; + uint32_t allreduce_sliding_window_put_window_size; + uint32_t allreduce_sliding_window_num_get_bufs; ucc_mrange_uint_t allreduce_kn_radix; ucc_mrange_uint_t allreduce_sra_kn_radix; uint32_t reduce_scatter_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index bbbac03fc7..e3dd1782af 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -268,6 +268,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL: *init = ucc_tl_ucp_allreduce_sra_knomial_init; break; + case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW: + *init = ucc_tl_ucp_allreduce_sliding_window_init; + break; default: status = UCC_ERR_INVALID_PARAM; break; diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index c2a260b103..a4def89286 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -86,6 +86,11 @@ enum ucc_tl_ucp_task_flags { UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0), }; +typedef struct ucc_tl_ucp_allreduce_sw_pipeline + ucc_tl_ucp_allreduce_sw_pipeline; +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather + ucc_tl_ucp_allreduce_sw_host_allgather; + typedef struct ucc_tl_ucp_task { ucc_coll_task_t super; uint32_t flags; @@ -119,6 +124,27 @@ typedef struct ucc_tl_ucp_task { ucc_ee_executor_task_t *etask; ucc_ee_executor_t *executor; } allreduce_kn; + struct { + int reduce_in_progress; + ucp_rkey_h * src_rkeys; //unpacked + ucp_rkey_h * dst_rkeys; //unpacked + ucp_ep_h * eps; + void ** sbufs; + void ** rbufs; + ucc_coll_task_t * allreduce_task_h; + ucc_tl_ucp_allreduce_sw_pipeline * pipe; + ucc_ee_executor_task_t * etask; + ucc_ee_executor_t * executor; + int put_window_size; + int num_get_bufs; + ucs_status_ptr_t * put_requests; + ucc_service_coll_req_t * allgather_scoll_req; + ucc_tl_ucp_allreduce_sw_host_allgather * allgather_data; + ucc_coll_task_t * barrier_task; + struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf; + struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf; + int inplace; + } allreduce_sliding_window; struct { int phase; ucc_knomial_pattern_t p; @@ -253,7 +279,8 @@ static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task, static inline ucc_tl_ucp_task_t *ucc_tl_ucp_get_task(ucc_tl_ucp_team_t *team) { ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); - ucc_tl_ucp_task_t *task = ucc_mpool_get(&ctx->req_mp);; + ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t*) + ucc_mpool_get(&ctx->req_mp); UCC_TL_UCP_PROFILE_REQUEST_NEW(task, "tl_ucp_task", 0); task->super.flags = 0; @@ -280,7 +307,7 @@ ucc_tl_ucp_get_schedule(ucc_tl_ucp_team_t *team, { ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); - *schedule = ucc_mpool_get(&ctx->req_mp); + *schedule = (ucc_tl_ucp_schedule_t*) ucc_mpool_get(&ctx->req_mp); if (ucc_unlikely(!(*schedule))) { return UCC_ERR_NO_MEMORY; diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 591b2cf005..08cb7159b6 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -53,6 +53,7 @@ gtest_CPPFLAGS = \ -I$(top_srcdir)/test \ -I$(top_builddir)/src \ -I$(top_srcdir)/test/gtest \ + -I$(top_srcdir)/src/components/tl/ucp \ $(GTEST_CPPFLAGS) gtest_LDFLAGS = $(GTEST_LDFLAGS) -pthread -no-install -Wl,-dynamic-list-data \ @@ -64,47 +65,48 @@ gtest_CXXFLAGS = -std=gnu++11 \ -DGTEST_UCM_HOOK_LIB_DIR="\"${abs_builddir}/ucm/test_dlopen/.libs\"" \ -DGTEST_UCC_TOP_SRCDIR="\"${UCC_TOP_SRCDIR}\"" -gtest_SOURCES = \ - common/gtest-all.cc \ - common/test_obj_size.cc \ - common/main.cc \ - common/test_ucc.cc \ - tl/tl_test.cc \ - core/test_lib_config.cc \ - core/test_lib.cc \ - core/test_context_config.cc \ - core/test_context.cc \ - core/test_mc.cc \ - core/test_mc_reduce.cc \ - core/test_team.cc \ - core/test_schedule.cc \ - core/test_topo.cc \ - core/test_service_coll.cc \ - core/test_timeout.cc \ - core/test_utils.cc \ - coll/test_barrier.cc \ - coll/test_alltoall.cc \ - coll/test_alltoallv.cc \ - coll/test_allgather.cc \ - coll/test_allgatherv.cc \ - coll/test_gather.cc \ - coll/test_gatherv.cc \ - coll/test_bcast.cc \ - coll/test_reduce.cc \ - coll/test_allreduce.cc \ - coll/test_reduce_scatter.cc \ - coll/test_reduce_scatterv.cc \ - coll/test_scatter.cc \ - coll/test_scatterv.cc \ - utils/test_string.cc \ - utils/test_ep_map.cc \ - utils/test_lock_free_queue.cc \ - utils/test_math.cc \ - utils/test_cfg_file.cc \ - utils/test_parser.cc \ - coll_score/test_score.cc \ - coll_score/test_score_str.cc \ - coll_score/test_score_update.cc \ +gtest_SOURCES = \ + common/gtest-all.cc \ + common/test_obj_size.cc \ + common/main.cc \ + common/test_ucc.cc \ + tl/tl_test.cc \ + core/test_lib_config.cc \ + core/test_lib.cc \ + core/test_context_config.cc \ + core/test_context.cc \ + core/test_mc.cc \ + core/test_mc_reduce.cc \ + core/test_team.cc \ + core/test_schedule.cc \ + core/test_topo.cc \ + core/test_service_coll.cc \ + core/test_timeout.cc \ + core/test_utils.cc \ + coll/test_barrier.cc \ + coll/test_alltoall.cc \ + coll/test_alltoallv.cc \ + coll/test_allgather.cc \ + coll/test_allgatherv.cc \ + coll/test_gather.cc \ + coll/test_gatherv.cc \ + coll/test_bcast.cc \ + coll/test_reduce.cc \ + coll/test_allreduce_sliding_window.cc \ + coll/test_allreduce.cc \ + coll/test_reduce_scatter.cc \ + coll/test_reduce_scatterv.cc \ + coll/test_scatter.cc \ + coll/test_scatterv.cc \ + utils/test_string.cc \ + utils/test_ep_map.cc \ + utils/test_lock_free_queue.cc \ + utils/test_math.cc \ + utils/test_cfg_file.cc \ + utils/test_parser.cc \ + coll_score/test_score.cc \ + coll_score/test_score_str.cc \ + coll_score/test_score_update.cc \ active_set/test_active_set.cc if TL_MLX5_ENABLED @@ -134,13 +136,18 @@ gtest_LDFLAGS += $(HIP_LDFLAGS) gtest_LDADD += $(HIP_LIBS) endif - -noinst_HEADERS = \ - common/gtest.h \ - common/test.h \ - common/test_ucc.h \ - core/test_context.h \ - core/test_mc_reduce.h \ +gtest_CXXFLAGS += $(UCX_CXXFLAGS) +gtest_CPPFLAGS += $(UCX_CPPFLAGS) +gtest_LDFLAGS += $(UCX_LDFLAGS) +gtest_LDADD += $(UCX_LIBS) $(UCX_LIBADD) + +noinst_HEADERS = \ + common/gtest.h \ + common/test.h \ + common/test_ucc.h \ + core/test_context.h \ + core/test_mc_reduce.h \ + coll/test_allreduce_sliding_window.h \ coll_score/test_score.h .PHONY: test test gdb valgrind fix_rpath ucc diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc index dba5ecc8b6..3384f997e7 100644 --- a/test/gtest/coll/test_allreduce.cc +++ b/test/gtest/coll/test_allreduce.cc @@ -7,6 +7,9 @@ #include "common/test_ucc.h" #include "utils/ucc_math.h" +// For sliding window allreduce +#include "test_allreduce_sliding_window.h" + #include template @@ -23,8 +26,9 @@ class test_allreduce : public UccCollArgs, public testing::Test { ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); ctxs[r]->args = coll; - coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; - coll->op = T::redop; + coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; + coll->op = T::redop; + coll->global_work_buffer = NULL; ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf"); EXPECT_NE(ctxs[r]->init_buf, nullptr); @@ -396,6 +400,50 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) { } } +TYPED_TEST(test_allreduce_alg, sliding_window) +{ + int n_procs = 8; + ucc_job_env_t env = {{"UCC_TL_UCP_TUNE", "allreduce:@2"}, + {"UCC_CLS", "all"}}; + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env); + UccTeam_h team = job.create_team(n_procs); + int repeat = 3; + UccCollCtxVec ctxs; + std::vector mt = {UCC_MEMORY_TYPE_HOST}; + ucp_info_t * ucp_infos = NULL; + + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + mt.push_back(UCC_MEMORY_TYPE_CUDA); + } + + for (auto count : {65536, 123567}) { + for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) { + for (auto m : mt) { + SET_MEM_TYPE(m); + this->set_inplace(inplace); + this->data_init(n_procs, TypeParam::dt, count, ctxs, true); + + // set args->global_work_buffer on each ctx + setup_gwbi(n_procs, ctxs, &ucp_infos, inplace == TEST_INPLACE); + + UccReq req(team, ctxs); + + for (auto i = 0; i < repeat; i++) { + req.start(); + req.wait(); + EXPECT_EQ(true, this->data_validate(ctxs)); + this->reset(ctxs); + } + + free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE); + ucp_infos = NULL; + this->data_fini(ctxs); + } + } + } +} + template class test_allreduce_avg_order : public test_allreduce { }; diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc new file mode 100644 index 0000000000..de1304ca99 --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.cc @@ -0,0 +1,180 @@ +/** + * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * See file LICENSE for terms. + */ + +/* + This file is for setting up the global work buffer for sliding window + allreduce. This entails allocating ucp workers, registering memory, + exchanging rkeys, and allocating the pipeline datastructure the + algorithm uses. +*/ + +#include "core/test_mc_reduce.h" +#include "common/test_ucc.h" +#include "utils/ucc_math.h" + +#include + +#include "test_allreduce_sliding_window.h" + +int ucp_init_ex(ucp_context_h *ucp_ctx) +{ + ucs_status_t ucs_status; + ucp_config_t *ucp_config; + ucp_params_t ucp_params; + ucp_context_h ucp_context; + + ucs_status = ucp_config_read(NULL, NULL, &ucp_config); + assert(ucs_status == UCS_OK); + + ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | + UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; + + ucs_status = ucp_init(&ucp_params, ucp_config, &ucp_context); + if (ucs_status != UCS_OK) { + printf("error on ucp init\n"); + return -1; + } + + *ucp_ctx = ucp_context; + + return 0; +} + +void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status) +{ + printf("Endpoint error detected, status: %s\n", + ucs_status_string(ucs_status)); +} + +int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf) +{ + ucs_status_t ucs_status; + ucp_mem_map_params_t params; + ucp_memh_pack_params_t pack_params; + + ebuf->ucp_context = ucp_context; + + params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; + params.address = buf; + params.length = len; + + ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); + assert(ucs_status == UCS_OK); + + pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS; + pack_params.flags = UCP_MEMH_PACK_FLAG_EXPORT; + + ucs_status = ucp_memh_pack(ebuf->memh, &pack_params, &ebuf->packed_memh, + &ebuf->packed_memh_len); + if (ucs_status != UCS_OK) { + printf("ucp_memh_pack() returned error: %s\n", + ucs_status_string(ucs_status)); + ebuf->packed_memh = NULL; + ebuf->packed_memh_len = 0; + } + + return 0; +} + +void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + ucp_info_t **ucp_infos_p /* out */, bool inplace) +{ + int i; + + ucp_info_t *ucp_infos = + (ucp_info_t *)ucc_malloc(sizeof(ucp_info_t) * n_procs); + *ucp_infos_p = ucp_infos; + + // allocate gwbi + for (auto ctx : ctxs) { + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ucc_malloc( + sizeof(ucc_tl_ucp_allreduce_sw_global_work_buf_info), + "global work buf info"); + + ctx->args->global_work_buffer = gwbi; + } + + // setup ucp contexts and workers + for (i = 0; i < n_procs; i++) { + ucp_info_t ucp_info; + ucp_init_ex(&ucp_info.ucp_ctx); + memcpy(&ucp_infos[i], &ucp_info, sizeof(ucp_info_t)); + } + + // set up packed src/dst memh + for (i = 0; i < n_procs; i++) { + // my proc's gwbi + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[i] + ->args->global_work_buffer; + // my proc's ucp_info + ucp_info_t * ucp_info = &ucp_infos[i]; + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + size_t dst_len = ctxs[i]->args->dst.info.count * + ucc_dt_size(ctxs[i]->args->dst.info.datatype); + + buffer_export_ucc(ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer, + dst_len, dst_ebuf); + + gwbi->packed_dst_memh = dst_ebuf->packed_memh; + + if (!inplace) { + size_t src_len = ctxs[i]->args->src.info.count * + ucc_dt_size(ctxs[i]->args->src.info.datatype); + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + buffer_export_ucc(ucp_info->ucp_ctx, ctxs[i]->args->src.info.buffer, + src_len, src_ebuf); + + gwbi->packed_src_memh = src_ebuf->packed_memh; + } + } + + // set the flag that indicates the global work buffer was passed + for (auto ctx : ctxs) { + ctx->args->mask |= + UCC_COLL_ARGS_FIELD_FLAGS | UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; + ctx->args->flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS; + } +} + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos, + bool inplace) +{ + int i, k; + + // free sbufs, rbufs, src_rkeys, and dst_rkeys + for (i = 0; i < n_procs; i++) { + // my proc's ucp_info + ucp_info_t *ucp_info = &ucp_infos[i]; + + if (!inplace) { + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + ucp_mem_unmap(ucp_info->ucp_ctx, src_ebuf->memh); + } + + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + ucp_mem_unmap(ucp_info->ucp_ctx, dst_ebuf->memh); + } + + // free ucp contexts + for (i = 0; i < n_procs; i++) { + ucp_cleanup(ucp_infos[i].ucp_ctx); + } + + // free gwbi and each gwbi's set of pipes + for (k = 0; k < n_procs; k++) { + ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi = + (ucc_tl_ucp_allreduce_sw_global_work_buf_info *)ctxs[k] + ->args->global_work_buffer; + + ucc_free(gwbi); + } + + ucc_free(ucp_infos); +} diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h new file mode 100644 index 0000000000..734e58e6c9 --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.h @@ -0,0 +1,30 @@ +#ifndef TEST_ALLREDUCE_SW_H +#define TEST_ALLREDUCE_SW_H + +#include "components/tl/ucp/allreduce/allreduce.h" + +struct export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void * packed_memh; + size_t packed_memh_len; + uint64_t memh_id; +}; + +typedef struct ucp_info { + ucp_context_h ucp_ctx; + struct export_buf src_ebuf; + struct export_buf dst_ebuf; +} ucp_info_t; + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, ucp_info_t *ucp_infos, + bool inplace); +void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + ucp_info_t **ucp_infos_p /* out */, bool inplace); +int buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf); +void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status); + +int ucp_init_ex(ucp_context_h *ucp_ctx); + +#endif