From df217dc70ec516b963aeea3226369c3453e0be89 Mon Sep 17 00:00:00 2001 From: Nicholas Sarkauskas Date: Tue, 9 Jul 2024 23:35:17 +0300 Subject: [PATCH] CORE: Implement weak asymmetric mem with gtests --- src/coll_score/ucc_coll_score_map.c | 2 + src/components/base/ucc_base_iface.h | 4 +- src/core/ucc_coll.c | 5 +- src/schedule/ucc_schedule.h | 67 ++- src/utils/ucc_coll_utils.c | 114 ++--- test/gtest/Makefile.am | 3 +- test/gtest/asym_mem/test_asymmetric_memory.cc | 417 ++++++++++++++++++ test/gtest/coll/test_reduce.cc | 128 ++---- test/gtest/common/test_ucc.cc | 6 - test/gtest/common/test_ucc.h | 19 - 10 files changed, 538 insertions(+), 227 deletions(-) create mode 100644 test/gtest/asym_mem/test_asymmetric_memory.cc diff --git a/src/coll_score/ucc_coll_score_map.c b/src/coll_score/ucc_coll_score_map.c index 26c53f935a..0d5efe186b 100644 --- a/src/coll_score/ucc_coll_score_map.c +++ b/src/coll_score/ucc_coll_score_map.c @@ -93,6 +93,8 @@ static ucc_status_t ucc_coll_score_map_lookup(ucc_score_map_t *map, mt = UCC_MEMORY_TYPE_HOST; } if (!ucc_coll_args_is_mem_symmetric(&bargs->args, map->team_rank)) { + ucc_debug("Mem was asymmetric even though asymmetric memory should " + "be handled prior to finding coll score"); return UCC_ERR_INVALID_PARAM; } if (msgsize == UCC_MSG_SIZE_INVALID || msgsize == UCC_MSG_SIZE_ASYMMETRIC) { diff --git a/src/components/base/ucc_base_iface.h b/src/components/base/ucc_base_iface.h index d177e877dd..449ce5c7f0 100644 --- a/src/components/base/ucc_base_iface.h +++ b/src/components/base/ucc_base_iface.h @@ -161,8 +161,8 @@ enum { typedef struct ucc_asymmetric_save_info { union { - ucc_coll_buffer_info_t info; /*!< Buffer info for the collective */ - ucc_coll_buffer_info_v_t info_v; /*!< Buffer info for the collective */ + ucc_coll_buffer_info_t info; + ucc_coll_buffer_info_v_t info_v; } old_asymmetric_buffer; ucc_mc_buffer_header_t *scratch; } ucc_asymmetric_save_info_t; diff --git a/src/core/ucc_coll.c b/src/core/ucc_coll.c index cf64ab1cc3..2b58b375c7 100644 --- a/src/core/ucc_coll.c +++ b/src/core/ucc_coll.c @@ -178,8 +178,6 @@ UCC_CORE_PROFILE_FUNC(ucc_status_t, ucc_collective_init, ucc_ee_type_t coll_ee_type; size_t coll_size; - printf("nick in ucc_collective_init: dst buffer=%p\n", coll_args->dst.info.buffer); - if (ucc_unlikely(team->state != UCC_TEAM_ACTIVE)) { ucc_error("team %p is used before team create is completed", team); return UCC_ERR_INVALID_PARAM; @@ -250,11 +248,10 @@ UCC_CORE_PROFILE_FUNC(ucc_status_t, ucc_collective_init, goto free_scratch; } - coll_mem_type = ucc_coll_args_mem_type(&op_args.args, team->rank); - task->flags |= UCC_COLL_TASK_FLAG_TOP_LEVEL; if (task->flags & UCC_COLL_TASK_FLAG_EXECUTOR) { task->flags |= UCC_COLL_TASK_FLAG_EXECUTOR_STOP; + coll_mem_type = ucc_coll_args_mem_type(&op_args.args, team->rank); switch(coll_mem_type) { case UCC_MEMORY_TYPE_CUDA: case UCC_MEMORY_TYPE_CUDA_MANAGED: diff --git a/src/schedule/ucc_schedule.h b/src/schedule/ucc_schedule.h index bead6ee819..63af525de4 100644 --- a/src/schedule/ucc_schedule.h +++ b/src/schedule/ucc_schedule.h @@ -166,6 +166,53 @@ ucc_status_t ucc_dependency_handler(ucc_coll_task_t *parent, ucc_status_t ucc_triggered_post(ucc_ee_h ee, ucc_ev_t *ev, ucc_coll_task_t *task); +static inline +ucc_status_t ucc_copy_asymmetric_buffer_out(ucc_asymmetric_save_info_t *save) +{ + ucc_status_t status = UCC_OK; + status = ucc_mc_memcpy(save->old_asymmetric_buffer.info.buffer, + save->scratch->addr, + ucc_dt_size(save->old_asymmetric_buffer.info.datatype) * + save->old_asymmetric_buffer.info.count, + save->old_asymmetric_buffer.info.mem_type, + save->scratch->mt); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error copying back to old asymmetric buffer: %s", + ucc_status_string(status)); + } + status = ucc_mc_free(save->scratch); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error freeing scratch asymmetric buffer: %s", + ucc_status_string(status)); + } + return status; +} + +static inline +ucc_status_t ucc_copy_asymmetric_buffer_v_out(ucc_coll_task_t *task) +{ + ucc_status_t status = UCC_OK; + ucc_coll_args_t *coll_args = &task->bargs.args; + ucc_asymmetric_save_info_t *save = &task->bargs.asymmetric_save_info; + ucc_rank_t size = task->team->params.size; + status = ucc_mc_memcpy(save->old_asymmetric_buffer.info_v.buffer, + save->scratch->addr, + ucc_coll_args_get_total_count(coll_args, + coll_args->dst.info_v.counts, size), + save->old_asymmetric_buffer.info_v.mem_type, + save->scratch->mt); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error copying back to old asymmetric buffer: %s", + ucc_status_string(status)); + } + status = ucc_mc_free(save->scratch); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error freeing scratch asymmetric buffer: %s", + ucc_status_string(status)); + } + return status; +} + static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task) { ucc_status_t status = task->status; @@ -188,20 +235,12 @@ static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task) if (ucc_likely(status == UCC_OK)) { ucc_asymmetric_save_info_t *save = &task->bargs.asymmetric_save_info; if (save->scratch != NULL) { - status = ucc_mc_memcpy(save->old_asymmetric_buffer.info.buffer, - save->scratch->addr, - ucc_dt_size(save->old_asymmetric_buffer.info.datatype) * - save->old_asymmetric_buffer.info.count, - save->old_asymmetric_buffer.info.mem_type, - save->scratch->mt); - if (ucc_unlikely(status != UCC_OK)) { - ucc_error("error copying back to old asymmetric buffer: %s", - ucc_status_string(status)); - } - status = ucc_mc_free(save->scratch); - if (ucc_unlikely(status != UCC_OK)) { - ucc_error("error freeing scratch asymmetric buffer: %s", - ucc_status_string(status)); + status = (task->bargs.args.coll_type == UCC_COLL_TYPE_GATHERV) ? + ucc_copy_asymmetric_buffer_v_out(task) : + ucc_copy_asymmetric_buffer_out(save); + if (status != UCC_OK) { + ucc_error("failure copying out asymmetric buffer: %s", + ucc_status_string(status)); } save->scratch = NULL; } diff --git a/src/utils/ucc_coll_utils.c b/src/utils/ucc_coll_utils.c index 139df127ac..fa54d606d3 100644 --- a/src/utils/ucc_coll_utils.c +++ b/src/utils/ucc_coll_utils.c @@ -95,119 +95,61 @@ ucc_coll_args_is_mem_symmetric(const ucc_coll_args_t *args, } -/* If the src/dst buffers are asymmetric, one of them needs to have a new - allocation */ +/* If this is the root and the src/dst buffers are asymmetric, the dst needs + to have a new allocation to make the mem types match. On task completion, + copy the result back into the old dst */ ucc_status_t ucc_coll_args_update_asymmetric_buffer(ucc_coll_args_t *args, ucc_team_h team, ucc_asymmetric_save_info_t *save_info) { - //ucc_rank_t root = args->root; - //ucc_rank_t rank = team->rank; ucc_status_t status = UCC_OK; if (UCC_IS_INPLACE(*args)) { return UCC_ERR_INVALID_PARAM; } switch (args->coll_type) { - case UCC_COLL_TYPE_BARRIER: - case UCC_COLL_TYPE_BCAST: - case UCC_COLL_TYPE_FANIN: - case UCC_COLL_TYPE_FANOUT: - return UCC_ERR_INVALID_PARAM; - case UCC_COLL_TYPE_ALLTOALL: - case UCC_COLL_TYPE_ALLREDUCE: - case UCC_COLL_TYPE_ALLGATHER: - case UCC_COLL_TYPE_REDUCE_SCATTER: - /*{ - if (args->dst.info.mem_type == mem_type) { - *old_buffer = args->src.info.buffer; - status = ucc_mc_alloc(args->src.info.buffer, ucc_dt_size(args->src.info.datatype) * args->src.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info.buffer; - status = ucc_mc_alloc(args->dst.info.buffer, ucc_dt_size(args->dst.info.datatype) * args->dst.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ - case UCC_COLL_TYPE_ALLGATHERV: - case UCC_COLL_TYPE_REDUCE_SCATTERV: - /*{ - if (args->dst.info_v.mem_type == mem_type) { - *old_buffer = args->src.info.buffer; - status = ucc_mc_alloc(args->src.info.buffer, ucc_dt_size(args->src.info.datatype) * args->src.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info_v.buffer; - status = ucc_mc_alloc(args->dst.info_v.buffer, ucc_dt_size(args->dst.info_v.datatype) * args->dst.info_v.counts[rank], mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ - case UCC_COLL_TYPE_ALLTOALLV: - /*{ - ucc_count_t sum_count = 0; - ucc_rank_t i; - if (args->dst.info_v.mem_type == mem_type) { - *old_buffer = args->src.info_v.buffer; - for(i = 0; i < team->size; i++) { - sum_count += args->src.info_v.counts[i]; - } - status = ucc_mc_alloc(args->src.info_v.buffer, ucc_dt_size(args->src.info_v.datatype) * sum_count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info_v.buffer; - for(i = 0; i < team->size; i++) { - sum_count += args->dst.info_v.counts[i]; - } - status = ucc_mc_alloc(args->dst.info_v.buffer, ucc_dt_size(args->dst.info_v.datatype) * sum_count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ case UCC_COLL_TYPE_REDUCE: case UCC_COLL_TYPE_GATHER: case UCC_COLL_TYPE_SCATTER: + case UCC_COLL_TYPE_SCATTERV: { + ucc_memory_type_t mem_type = args->src.info.mem_type; + if (args->coll_type == UCC_COLL_TYPE_SCATTERV) { + mem_type = args->src.info_v.mem_type; + } memcpy(&save_info->old_asymmetric_buffer.info, &args->dst.info, sizeof(ucc_coll_buffer_info_t)); status = ucc_mc_alloc(&save_info->scratch, ucc_dt_size(args->dst.info.datatype) * args->dst.info.count, - args->src.info.mem_type); + mem_type); if (ucc_unlikely(UCC_OK != status)) { ucc_error("failed to allocate replacement " "memory for asymmetric buffer"); return status; } args->dst.info.buffer = save_info->scratch->addr; - args->dst.info.mem_type = args->src.info.mem_type; + args->dst.info.mem_type = mem_type; + return UCC_OK; + } + case UCC_COLL_TYPE_GATHERV: + { + memcpy(&save_info->old_asymmetric_buffer.info_v, + &args->dst.info_v, sizeof(ucc_coll_buffer_info_v_t)); + status = ucc_mc_alloc(&save_info->scratch, + ucc_coll_args_get_total_count(args, + args->dst.info_v.counts, team->size), + args->src.info.mem_type); + if (ucc_unlikely(UCC_OK != status)) { + ucc_error("failed to allocate replacement " + "memory for asymmetric buffer"); + return status; + } + args->dst.info_v.buffer = save_info->scratch->addr; + args->dst.info_v.mem_type = args->src.info.mem_type; return UCC_OK; } - case UCC_COLL_TYPE_GATHERV: /* - return (root != rank ? NULL : ( - args->dst.info_v.mem_type == mem_type ? args->src.info.buffer : args->dst.info_v.buffer; - )) */ - case UCC_COLL_TYPE_SCATTERV: /* - return (root != rank ? NULL : ( - args->dst.info.mem_type == mem_type ? args->src.info_v.buffer : args->dst.info.buffer; - ))*/ default: break; } @@ -290,8 +232,8 @@ ucc_memory_type_t ucc_coll_args_mem_type(const ucc_coll_args_t *args, return UCC_MEMORY_TYPE_NOT_APPLY; case UCC_COLL_TYPE_BCAST: return args->src.info.mem_type; - case UCC_COLL_TYPE_ALLREDUCE: case UCC_COLL_TYPE_ALLTOALL: + case UCC_COLL_TYPE_ALLREDUCE: case UCC_COLL_TYPE_ALLGATHER: case UCC_COLL_TYPE_REDUCE_SCATTER: return args->dst.info.mem_type; diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 49c60fe16b..2bad1bd09a 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -106,7 +106,8 @@ gtest_SOURCES = \ coll_score/test_score.cc \ coll_score/test_score_str.cc \ coll_score/test_score_update.cc \ - active_set/test_active_set.cc + active_set/test_active_set.cc \ + asym_mem/test_asymmetric_memory.cc if TL_MLX5_ENABLED gtest_SOURCES += tl/mlx5/test_tl_mlx5.cc \ diff --git a/test/gtest/asym_mem/test_asymmetric_memory.cc b/test/gtest/asym_mem/test_asymmetric_memory.cc new file mode 100644 index 0000000000..6d5f59dcd4 --- /dev/null +++ b/test/gtest/asym_mem/test_asymmetric_memory.cc @@ -0,0 +1,417 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * See file LICENSE for terms. + */ + +#include "common/test_ucc.h" + +#ifdef HAVE_CUDA + +using Param = std::tuple; + +class test_asymmetric_memory : public ucc::test, + public ::testing::WithParamInterface +{ +public: + UccCollCtxVec ctxs; + void data_init(ucc_coll_type_t coll_type, ucc_memory_type_t src_mem_type, + ucc_memory_type_t dst_mem_type, UccTeam_h team) { + ucc_rank_t tsize = team->procs.size(); + int root = 0; + size_t msglen = 2048; + size_t src_modifier = 1; + size_t dst_modifier = 1; + ctxs.resize(tsize); + + if (coll_type == UCC_COLL_TYPE_GATHER) { + dst_modifier = tsize; + } else if (coll_type == UCC_COLL_TYPE_SCATTER) { + src_modifier = tsize; + } + + for (int i = 0; i < tsize; i++) { + ctxs[i] = (gtest_ucc_coll_ctx_t*) + calloc(1, sizeof(gtest_ucc_coll_ctx_t)); + ucc_coll_args_t *coll = (ucc_coll_args_t*) + calloc(1, sizeof(ucc_coll_args_t)); + + ctxs[i]->args = coll; + + coll->coll_type = coll_type; + coll->src.info.mem_type = src_mem_type; + coll->src.info.count = (ucc_count_t)msglen * src_modifier; + coll->src.info.datatype = UCC_DT_INT8; + coll->root = root; + + UCC_CHECK(ucc_mc_alloc(&ctxs[i]->src_mc_header, + msglen * src_modifier, src_mem_type)); + coll->src.info.buffer = ctxs[i]->src_mc_header->addr; + + ctxs[i]->init_buf = ucc_malloc(msglen * src_modifier, + "init buf"); + EXPECT_NE(ctxs[i]->init_buf, nullptr); + uint8_t *sbuf = (uint8_t*)ctxs[i]->init_buf; + for (int j = 0; j < msglen * src_modifier; j++) { + sbuf[j] = (uint8_t) 1; + } + UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, + ctxs[i]->init_buf, + msglen * src_modifier, src_mem_type, + UCC_MEMORY_TYPE_HOST)); + + ctxs[i]->rbuf_size = msglen * dst_modifier; + if (i == root) { + UCC_CHECK(ucc_mc_alloc(&ctxs[i]->dst_mc_header, + ctxs[i]->rbuf_size, dst_mem_type)); + coll->dst.info.buffer = ctxs[i]->dst_mc_header->addr; + coll->dst.info.count = (ucc_count_t)ctxs[i]->rbuf_size; + coll->dst.info.datatype = UCC_DT_INT8; + coll->dst.info.mem_type = dst_mem_type; + } + } + } + + void data_fini() + { + for (int i = 0; i < ctxs.size(); i++) { + gtest_ucc_coll_ctx_t *ctx = ctxs[i]; + if (!ctx) { + continue; + } + ucc_coll_args_t* coll = ctx->args; + UCC_CHECK(ucc_mc_free(ctx->src_mc_header)); + if (i == coll->root) { + UCC_CHECK(ucc_mc_free(ctx->dst_mc_header)); + } + ucc_free(ctx->init_buf); + free(coll); + free(ctx); + } + ctxs.clear(); + } + + bool data_validate() + { + bool ret = true; + int root = 0; + uint8_t result = 1; + ucc_memory_type_t dst_mem_type; + uint8_t *rst; + + if (ctxs[0]->args->coll_type == UCC_COLL_TYPE_REDUCE) { + result = (uint8_t) ctxs.size(); + } + + for (int i = 0; i < ctxs.size(); i++) { + if (!ctxs[i]) { + continue; + } + + root = ctxs[i]->args->root; + + if (i == root) { + dst_mem_type = ctxs[root]->args->dst.info.mem_type; + + rst = (uint8_t*) ucc_malloc(ctxs[root]->rbuf_size, "validation buf"); + EXPECT_NE(rst, nullptr); + + UCC_CHECK(ucc_mc_memcpy(rst, ctxs[root]->args->dst.info.buffer, + ctxs[root]->rbuf_size, + UCC_MEMORY_TYPE_HOST, dst_mem_type)); + + for (int j = 0; j < ctxs[root]->rbuf_size; j++) { + if (result != rst[j]) { + ret = false; + break; + } + } + + ucc_free(rst); + } + } + + return ret; + } +}; + + +class test_asymmetric_memory_v : public ucc::test, + public ::testing::WithParamInterface +{ +public: + UccCollCtxVec ctxs; + void data_init(ucc_coll_type_t coll_type, ucc_memory_type_t src_mem_type, + ucc_memory_type_t dst_mem_type, UccTeam_h team) { + int nprocs = team->n_procs; + size_t count = 2048; + ucc_rank_t root = 0; + ucc_coll_args_t *coll; + int *counts, *displs; + size_t my_count, all_counts; + + ctxs.resize(nprocs); + for (auto r = 0; r < nprocs; r++) { + coll = (ucc_coll_args_t *)calloc(1, sizeof(ucc_coll_args_t)); + my_count = (nprocs - r) * count; + ctxs[r] = + (gtest_ucc_coll_ctx_t *)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); + ctxs[r]->args = coll; + coll->mask = 0; + coll->flags = 0; + coll->coll_type = coll_type; + coll->root = root; + if (coll_type == UCC_COLL_TYPE_GATHERV) { + coll->src.info.mem_type = src_mem_type; + coll->src.info.count = (ucc_count_t)my_count; + coll->src.info.datatype = UCC_DT_INT8; + + ctxs[r]->init_buf = + ucc_malloc(ucc_dt_size(UCC_DT_INT8) * my_count, "init buf"); + ASSERT_NE(ctxs[r]->init_buf, nullptr); + for (int i = 0; i < my_count * ucc_dt_size(UCC_DT_INT8); i++) { + uint8_t *sbuf = (uint8_t *)ctxs[r]->init_buf; + sbuf[i] = ((i + r) % 256); + } + + if (r == root) { + all_counts = 0; + counts = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(counts, nullptr); + displs = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(displs, nullptr); + + for (int i = 0; i < nprocs; i++) { + counts[i] = (nprocs - i) * count; + displs[i] = all_counts; + all_counts += counts[i]; + } + + coll->dst.info_v.mem_type = dst_mem_type; + coll->dst.info_v.counts = (ucc_count_t *)counts; + coll->dst.info_v.displacements = (ucc_aint_t *)displs; + coll->dst.info_v.datatype = UCC_DT_INT8; + + ctxs[r]->rbuf_size = ucc_dt_size(UCC_DT_INT8) * all_counts; + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->dst_mc_header, + ctxs[r]->rbuf_size, dst_mem_type)); + coll->dst.info_v.buffer = ctxs[r]->dst_mc_header->addr; + } + + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->src_mc_header, + ucc_dt_size(UCC_DT_INT8) * my_count, + src_mem_type)); + coll->src.info.buffer = ctxs[r]->src_mc_header->addr; + UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, + ctxs[r]->init_buf, + ucc_dt_size(UCC_DT_INT8) * my_count, + src_mem_type, UCC_MEMORY_TYPE_HOST)); + } else { + // scatterv + coll->dst.info.mem_type = dst_mem_type; + coll->dst.info.count = (ucc_count_t)my_count; + coll->dst.info.datatype = UCC_DT_INT8; + + if (r == root) { + all_counts = 0; + counts = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(counts, nullptr); + displs = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(displs, nullptr); + + for (int i = 0; i < nprocs; i++) { + counts[i] = (nprocs - i) * count; + displs[i] = all_counts; + all_counts += counts[i]; + } + + ctxs[r]->init_buf = + ucc_malloc(ucc_dt_size(UCC_DT_INT8) * all_counts, "init buf"); + ASSERT_NE(ctxs[r]->init_buf, nullptr); + uint8_t *sbuf = (uint8_t*)ctxs[r]->init_buf; + for (int p = 0; p < nprocs; p++) { + for (int i = 0; i < ucc_dt_size(UCC_DT_INT8) * counts[p]; i++) { + sbuf[(displs[p] * ucc_dt_size(UCC_DT_INT8) + i)] = + (uint8_t)((i + p) % 256); + } + } + + coll->src.info_v.mem_type = src_mem_type; + coll->src.info_v.counts = (ucc_count_t *)counts; + coll->src.info_v.displacements = (ucc_aint_t *)displs; + coll->src.info_v.datatype = UCC_DT_INT8; + + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->src_mc_header, + ucc_dt_size(UCC_DT_INT8) * all_counts, + src_mem_type)); + coll->src.info_v.buffer = ctxs[r]->src_mc_header->addr; + UCC_CHECK(ucc_mc_memcpy(coll->src.info_v.buffer, + ctxs[r]->init_buf, ucc_dt_size(UCC_DT_INT8) * all_counts, + src_mem_type, UCC_MEMORY_TYPE_HOST)); + } + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->dst_mc_header, + ucc_dt_size(UCC_DT_INT8) * my_count, + dst_mem_type)); + coll->dst.info.buffer = ctxs[r]->dst_mc_header->addr; + } + } + } + + bool data_validate() + { + bool ret = true; + int root = ctxs[0]->args->root; + int *displs = (int*)ctxs[root]->args->dst.info_v.displacements; + size_t dt_size; + ucc_memory_type_t dst_mem_type; + ucc_count_t my_count; + uint8_t *dsts; + + if (ctxs[root]->args->coll_type == UCC_COLL_TYPE_GATHERV) { + dt_size = ucc_dt_size(ctxs[root]->args->src.info.datatype); + dst_mem_type = ctxs[root]->args->dst.info_v.mem_type; + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + dsts = (uint8_t *)ucc_malloc(ctxs[root]->rbuf_size, "dsts buf"); + ucc_assert(dsts != nullptr); + UCC_CHECK(ucc_mc_memcpy(dsts, ctxs[root]->args->dst.info_v.buffer, + ctxs[root]->rbuf_size, + UCC_MEMORY_TYPE_HOST, dst_mem_type)); + } else { + dsts = (uint8_t *)ctxs[root]->args->dst.info_v.buffer; + } + + for (int r = 0; r < ctxs.size(); r++) { + my_count = ctxs[r]->args->src.info.count; + for (int i = 0; i < my_count * dt_size; i++) { + if ((uint8_t)((i + r) % 256) != + dsts[(displs[r] * dt_size + i)]) { + ret = false; + break; + } + } + } + + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + ucc_free(dsts); + } + } else { + // scatterv + dst_mem_type = ctxs[root]->args->dst.info.mem_type; + for (auto r = 0; r < ctxs.size(); r++) { + dt_size = ucc_dt_size((ctxs[r])->args->dst.info.datatype); + my_count = (ctxs[r])->args->dst.info.count; + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + dsts = (uint8_t *)ucc_malloc(my_count * dt_size, "dsts buf"); + ucc_assert(dsts != nullptr); + UCC_CHECK(ucc_mc_memcpy(dsts, ctxs[r]->args->dst.info.buffer, + my_count * dt_size, + UCC_MEMORY_TYPE_HOST, dst_mem_type)); + } else { + dsts = (uint8_t *)ctxs[r]->args->dst.info.buffer; + } + + for (int i = 0; i < my_count * dt_size; i++) { + if ((uint8_t)((i + r) % 256) != + dsts[i]) { + ret = false; + break; + } + } + + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + ucc_free(dsts); + if (!ret) { + break; + } + } + } + } + + return ret; + } + + void data_fini() + { + int root = ctxs[0]->args->root; + for (auto r = 0; r < ctxs.size(); r++) { + ucc_coll_args_t *coll = ctxs[r]->args; + if (coll->coll_type == UCC_COLL_TYPE_GATHERV) { + if (r == root) { + UCC_CHECK(ucc_mc_free(ctxs[r]->dst_mc_header)); + free(coll->dst.info_v.counts); + free(coll->dst.info_v.displacements); + } + UCC_CHECK(ucc_mc_free(ctxs[r]->src_mc_header)); + } else { + // scatterv + if (r == root) { + UCC_CHECK(ucc_mc_free(ctxs[r]->src_mc_header)); + free(coll->src.info_v.counts); + free(coll->src.info_v.displacements); + } + UCC_CHECK(ucc_mc_free(ctxs[r]->dst_mc_header)); + } + ucc_free(ctxs[r]->init_buf); + free(coll); + free(ctxs[r]); + } + ctxs.clear(); + } +}; + +#define TEST_ASYM_DECLARE \ + const ucc_coll_type_t coll_type = std::get<0>(GetParam()); \ + const ucc_memory_type_t src_mem_type = std::get<1>(GetParam()); \ + const ucc_memory_type_t dst_mem_type = std::get<2>(GetParam()); \ + const int n_procs = std::get<3>(GetParam()); \ + \ + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL); \ + UccTeam_h team = job.create_team(n_procs); \ + \ + data_init(coll_type, src_mem_type, dst_mem_type, team); \ + UccReq req(team, ctxs); \ + if (req.status != UCC_OK) { \ + data_fini(); \ + GTEST_SKIP() << "ucc_collective_init returned " \ + << ucc_status_string(req.status); \ + } \ + req.start(); \ + req.wait(); \ + EXPECT_EQ(true, data_validate()); \ + data_fini(); + +UCC_TEST_P(test_asymmetric_memory, single) +{ + TEST_ASYM_DECLARE +} + +INSTANTIATE_TEST_CASE_P +( + , test_asymmetric_memory, + ::testing::Combine + ( + ::testing::Values(UCC_COLL_TYPE_REDUCE, UCC_COLL_TYPE_GATHER, UCC_COLL_TYPE_SCATTER), // coll type (scatter may be skipped because tl/ucp does not support scatter) + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // src mem type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // dst mem type + ::testing::Values(8) // n_procs + ) +); + +UCC_TEST_P(test_asymmetric_memory_v, single_v) +{ + TEST_ASYM_DECLARE +} + +INSTANTIATE_TEST_CASE_P +( + , test_asymmetric_memory_v, + ::testing::Combine + ( + ::testing::Values(UCC_COLL_TYPE_GATHERV, UCC_COLL_TYPE_SCATTERV), // coll type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // src mem type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // dst mem type + ::testing::Values(8) // n_procs + ) +); + +#endif diff --git a/test/gtest/coll/test_reduce.cc b/test/gtest/coll/test_reduce.cc index d75e180cfa..2fb1cbc963 100644 --- a/test/gtest/coll/test_reduce.cc +++ b/test/gtest/coll/test_reduce.cc @@ -43,46 +43,29 @@ class test_reduce : public UccCollArgs, public testing::Test { coll->coll_type = UCC_COLL_TYPE_REDUCE; coll->op = T::redop; coll->root = root; - if (r != root || !inplace) { - ucc_memory_type_t src_mem_type = mem_type; - -#ifdef HAVE_CUDA - if (mem_symmetry == TEST_MEM_ASYMMETRIC_SRC_MISMATCH) { - src_mem_type = ((mem_type == UCC_MEMORY_TYPE_CUDA) ? - UCC_MEMORY_TYPE_HOST : UCC_MEMORY_TYPE_CUDA); - } -#endif - coll->src.info.mem_type = src_mem_type; + coll->src.info.mem_type = mem_type; coll->src.info.count = (ucc_count_t)count; coll->src.info.datatype = dt; UCC_CHECK(ucc_mc_alloc(&ctxs[r]->src_mc_header, - ucc_dt_size(dt) * count, src_mem_type)); + ucc_dt_size(dt) * count, mem_type)); coll->src.info.buffer = ctxs[r]->src_mc_header->addr; UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, ctxs[r]->init_buf, - ucc_dt_size(dt) * count, src_mem_type, + ucc_dt_size(dt) * count, mem_type, UCC_MEMORY_TYPE_HOST)); } if (r == root) { - ucc_memory_type_t dst_mem_type = mem_type; - -#ifdef HAVE_CUDA - if (mem_symmetry == TEST_MEM_ASYMMETRIC_DST_MISMATCH) { - dst_mem_type = ((mem_type == UCC_MEMORY_TYPE_CUDA) ? - UCC_MEMORY_TYPE_HOST : UCC_MEMORY_TYPE_CUDA); - } -#endif - coll->dst.info.mem_type = dst_mem_type; + coll->dst.info.mem_type = mem_type; coll->dst.info.count = (ucc_count_t)count; coll->dst.info.datatype = dt; UCC_CHECK(ucc_mc_alloc(&ctxs[r]->dst_mc_header, - ucc_dt_size(dt) * count, dst_mem_type)); + ucc_dt_size(dt) * count, mem_type)); coll->dst.info.buffer = ctxs[r]->dst_mc_header->addr; if (inplace) { UCC_CHECK(ucc_mc_memcpy(coll->dst.info.buffer, ctxs[r]->init_buf, ucc_dt_size(dt) * count, - dst_mem_type, UCC_MEMORY_TYPE_HOST)); + mem_type, UCC_MEMORY_TYPE_HOST)); } } if (inplace) { @@ -116,7 +99,7 @@ class test_reduce : public UccCollArgs, public testing::Test { size_t count = coll->dst.info.count; ucc_datatype_t dtype = coll->dst.info.datatype; clear_buffer(coll->dst.info.buffer, count * ucc_dt_size(dtype), - coll->dst.info.mem_type, 0); + mem_type, 0); if (TEST_INPLACE == inplace) { UCC_CHECK(ucc_mc_memcpy(coll->dst.info.buffer, ctxs[root]->init_buf, @@ -128,13 +111,13 @@ class test_reduce : public UccCollArgs, public testing::Test { size_t count = (ctxs[0])->args->src.info.count; typename T::type * dsts; - if (UCC_MEMORY_TYPE_HOST != mem_type || ctxs[root]->args->dst.info.mem_type != UCC_MEMORY_TYPE_HOST) { + if (UCC_MEMORY_TYPE_HOST != mem_type) { dsts = (typename T::type *) ucc_malloc(count * sizeof(typename T::type), "dsts buf"); EXPECT_NE(dsts, nullptr); UCC_CHECK(ucc_mc_memcpy(dsts, ctxs[root]->args->dst.info.buffer, count * sizeof(typename T::type), - UCC_MEMORY_TYPE_HOST, ctxs[root]->args->dst.info.mem_type)); + UCC_MEMORY_TYPE_HOST, mem_type)); } else { dsts = (typename T::type *)ctxs[root]->args->dst.info.buffer; } @@ -171,7 +154,7 @@ class test_reduce_cuda : public test_reduce {}; TYPED_TEST_CASE(test_reduce_host, CollReduceTypeOpsHost); TYPED_TEST_CASE(test_reduce_cuda, CollReduceTypeOpsCuda); -#define TEST_DECLARE(_mem_type, _inplace, _repeat, _persistent, _mem_sym) \ +#define TEST_DECLARE(_mem_type, _inplace, _repeat, _persistent) \ { \ std::array counts{4, 256, 65536}; \ CHECK_TYPE_OP_SKIP(TypeParam::dt, TypeParam::redop, _mem_type); \ @@ -181,7 +164,6 @@ TYPED_TEST_CASE(test_reduce_cuda, CollReduceTypeOpsCuda); int size = team->procs.size(); \ UccCollCtxVec ctxs; \ SET_MEM_TYPE(_mem_type); \ - SET_MEM_SYMMETRY(_mem_sym); \ this->set_inplace(_inplace); \ this->data_init(size, TypeParam::dt, count, ctxs, _persistent);\ UccReq req(team, ctxs); \ @@ -198,76 +180,50 @@ TYPED_TEST_CASE(test_reduce_cuda, CollReduceTypeOpsCuda); } TYPED_TEST(test_reduce_host, single) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 1, 0, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 1, 0); } TYPED_TEST(test_reduce_host, single_persistent) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 3, 1, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 3, 1); } TYPED_TEST(test_reduce_host, single_inplace) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE, 1, 0, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE, 1, 0); } TYPED_TEST(test_reduce_host, single_persistent_inplace) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE, 3, 1, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_INPLACE, 3, 1); } #ifdef HAVE_CUDA - -// Symmetric TYPED_TEST(test_reduce_cuda, single) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 1, 0, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 1, 0); } TYPED_TEST(test_reduce_cuda, single_persistent) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 3, 1, TEST_MEM_SYMMETRIC); -} - -TYPED_TEST(test_reduce_cuda, single_managed) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 1, 0, TEST_MEM_SYMMETRIC); + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 3, 1); } - -TYPED_TEST(test_reduce_cuda, single_persistent_managed) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 3, 1, TEST_MEM_SYMMETRIC); -} - -// Asymmetric src mismatch CUDA -TYPED_TEST(test_reduce_cuda, single_asymmetric_src_mismatch_cuda) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 1, 0, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_inplace) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_INPLACE, 1, 0); } -TYPED_TEST(test_reduce_cuda, single_persistent_asymmetric_src_mismatch_cuda) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 3, 1, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_persistent_inplace) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_INPLACE, 3, 1); } - -// Asymmetric dst mismatch CUDA -TYPED_TEST(test_reduce_cuda, single_asymmetric_dst_mismatch_cuda) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 1, 0, TEST_MEM_ASYMMETRIC_DST_MISMATCH); -} - -TYPED_TEST(test_reduce_cuda, single_persistent_asymmetric_dst_mismatch_cuda) { - TEST_DECLARE(UCC_MEMORY_TYPE_CUDA, TEST_NO_INPLACE, 3, 1, TEST_MEM_ASYMMETRIC_DST_MISMATCH); -} - -// Asymmetric src mismatch HOST -TYPED_TEST(test_reduce_cuda, single_asymmetric_src_mismatch_host) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 1, 0, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_managed) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 1, 0); } -TYPED_TEST(test_reduce_cuda, single_persistent_asymmetric_src_mismatch_host) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 3, 1, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_persistent_managed) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_NO_INPLACE, 3, 1); } - -// Asymmetric dst mismatch HOST -TYPED_TEST(test_reduce_cuda, single_asymmetric_dst_mismatch_host) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 1, 0, TEST_MEM_ASYMMETRIC_DST_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_inplace_managed) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 1, 0); } -TYPED_TEST(test_reduce_cuda, single_persistent_asymmetric_dst_mismatch_host) { - TEST_DECLARE(UCC_MEMORY_TYPE_HOST, TEST_NO_INPLACE, 3, 1, TEST_MEM_ASYMMETRIC_DST_MISMATCH); +TYPED_TEST(test_reduce_cuda, single_persistent_inplace_managed) { + TEST_DECLARE(UCC_MEMORY_TYPE_CUDA_MANAGED, TEST_INPLACE, 3, 1); } - #endif #define TEST_DECLARE_MULTIPLE(_mem_type, _inplace) \ @@ -330,7 +286,7 @@ template class test_reduce_dbt : public test_reduce { template class test_reduce_2step : public test_reduce { }; -#define TEST_DECLARE_WITH_ENV(_env, _n_procs, _persistent, _mem_sym) \ +#define TEST_DECLARE_WITH_ENV(_env, _n_procs, _persistent) \ { \ UccJob job(_n_procs, UccJob::UCC_JOB_CTX_GLOBAL, _env); \ UccTeam_h team = job.create_team(_n_procs); \ @@ -348,7 +304,6 @@ template class test_reduce_2step : public test_reduce { for (auto m : mt) { \ CHECK_TYPE_OP_SKIP(TypeParam::dt, TypeParam::redop, m); \ SET_MEM_TYPE(m); \ - SET_MEM_SYMMETRY(_mem_sym); \ this->set_inplace(inplace); \ this->data_init(_n_procs, TypeParam::dt, count, ctxs, \ _persistent); \ @@ -377,34 +332,17 @@ ucc_job_env_t reduce_2step_env = {{"UCC_CL_HIER_TUNE", "reduce:@2step:0-inf:inf" {"UCC_CLS", "all"}}; TYPED_TEST(test_reduce_avg_order, avg_post_op) { - TEST_DECLARE_WITH_ENV(post_op_env, 15, true, TEST_MEM_SYMMETRIC); + TEST_DECLARE_WITH_ENV(post_op_env, 15, true); } TYPED_TEST(test_reduce_dbt, reduce_dbt_shift) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 15, true, TEST_MEM_SYMMETRIC); + TEST_DECLARE_WITH_ENV(reduce_dbt_env, 15, true); } TYPED_TEST(test_reduce_dbt, reduce_dbt_mirror) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 16, true, TEST_MEM_SYMMETRIC); + TEST_DECLARE_WITH_ENV(reduce_dbt_env, 16, true); } TYPED_TEST(test_reduce_2step, 2step) { - TEST_DECLARE_WITH_ENV(reduce_2step_env, 16, false, TEST_MEM_SYMMETRIC); -} - -// Asymmetric memory -TYPED_TEST(test_reduce_avg_order, avg_post_op_asymmetric) { - TEST_DECLARE_WITH_ENV(post_op_env, 15, true, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); -} - -TYPED_TEST(test_reduce_dbt, reduce_dbt_shift_asymmetric) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 15, true, TEST_MEM_ASYMMETRIC_SRC_MISMATCH); -} - -TYPED_TEST(test_reduce_dbt, reduce_dbt_mirror_asymmetric) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 16, true, TEST_MEM_ASYMMETRIC_DST_MISMATCH); -} - -TYPED_TEST(test_reduce_2step, 2step_asymmetric) { - TEST_DECLARE_WITH_ENV(reduce_2step_env, 16, false, TEST_MEM_ASYMMETRIC_DST_MISMATCH); + TEST_DECLARE_WITH_ENV(reduce_2step_env, 16, false); } diff --git a/test/gtest/common/test_ucc.cc b/test/gtest/common/test_ucc.cc index bf5232bff1..40b51c1f56 100644 --- a/test/gtest/common/test_ucc.cc +++ b/test/gtest/common/test_ucc.cc @@ -707,12 +707,6 @@ void UccCollArgs::set_inplace(gtest_ucc_inplace_t _inplace) inplace = _inplace; } -void UccCollArgs::set_mem_symmetry(gtest_ucc_mem_symmetry_t _mem_symmetry) -{ - ucc_assert(!inplace); - mem_symmetry = _mem_symmetry; -} - void clear_buffer(void *_buf, size_t size, ucc_memory_type_t mt, uint8_t value) { void *buf = _buf; diff --git a/test/gtest/common/test_ucc.h b/test/gtest/common/test_ucc.h index bc760dad5e..f16e014b54 100644 --- a/test/gtest/common/test_ucc.h +++ b/test/gtest/common/test_ucc.h @@ -36,17 +36,10 @@ typedef enum { TEST_INPLACE } gtest_ucc_inplace_t; -typedef enum { - TEST_MEM_SYMMETRIC, /* src/dst mem types match */ - TEST_MEM_ASYMMETRIC_SRC_MISMATCH, /* src != mem_type */ - TEST_MEM_ASYMMETRIC_DST_MISMATCH, /* dst != mem_type */ -} gtest_ucc_mem_symmetry_t; - class UccCollArgs { protected: ucc_memory_type_t mem_type; gtest_ucc_inplace_t inplace; - gtest_ucc_mem_symmetry_t mem_symmetry; void alltoallx_init_buf(int src_rank, int dst_rank, uint8_t *buf, size_t len) { for (int i = 0; i < len; i++) { @@ -72,7 +65,6 @@ class UccCollArgs { // defaults mem_type = UCC_MEMORY_TYPE_HOST; inplace = TEST_NO_INPLACE; - mem_symmetry = TEST_MEM_SYMMETRIC; } virtual ~UccCollArgs() {} virtual void data_init(int nprocs, ucc_datatype_t dtype, @@ -82,7 +74,6 @@ class UccCollArgs { virtual bool data_validate(UccCollCtxVec args) = 0; void set_mem_type(ucc_memory_type_t _mt); void set_inplace(gtest_ucc_inplace_t _inplace); - void set_mem_symmetry(gtest_ucc_mem_symmetry_t mem_symmetry); }; #define SET_MEM_TYPE(_mt) do { \ @@ -92,16 +83,6 @@ class UccCollArgs { this->mem_type = _mt; \ } while (0) -#define SET_MEM_SYMMETRY(_sym) do { \ - if (_sym != TEST_MEM_SYMMETRIC && \ - (UCC_OK != ucc_mc_available(UCC_MEMORY_TYPE_CUDA) || \ - UCC_OK != ucc_mc_available(UCC_MEMORY_TYPE_HOST) || \ - this->inplace)) { \ - GTEST_SKIP(); \ - } \ - this->mem_symmetry = _sym; \ - } while (0) - class ThreadAllgather; class ThreadAllgatherReq { public: