diff --git a/src/schedule/ucc_schedule.h b/src/schedule/ucc_schedule.h index bead6ee819..6e99047bb2 100644 --- a/src/schedule/ucc_schedule.h +++ b/src/schedule/ucc_schedule.h @@ -166,6 +166,50 @@ ucc_status_t ucc_dependency_handler(ucc_coll_task_t *parent, ucc_status_t ucc_triggered_post(ucc_ee_h ee, ucc_ev_t *ev, ucc_coll_task_t *task); +static inline ucc_status_t ucc_copy_asymmetric_buffer_out(ucc_asymmetric_save_info_t *save) +{ + ucc_status_t status = UCC_OK; + status = ucc_mc_memcpy(save->old_asymmetric_buffer.info.buffer, + save->scratch->addr, + ucc_dt_size(save->old_asymmetric_buffer.info.datatype) * + save->old_asymmetric_buffer.info.count, + save->old_asymmetric_buffer.info.mem_type, + save->scratch->mt); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error copying back to old asymmetric buffer: %s", + ucc_status_string(status)); + } + status = ucc_mc_free(save->scratch); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error freeing scratch asymmetric buffer: %s", + ucc_status_string(status)); + } + return status; +} + +static inline ucc_status_t ucc_copy_asymmetric_buffer_v_out(ucc_coll_task_t *task) +{ + ucc_status_t status = UCC_OK; + ucc_coll_args_t *coll_args = &task->bargs.args; + ucc_asymmetric_save_info_t *save = &task->bargs.asymmetric_save_info; + ucc_rank_t size = task->team->params.size; + status = ucc_mc_memcpy(save->old_asymmetric_buffer.info_v.buffer, + save->scratch->addr, + ucc_coll_args_get_total_count(coll_args, coll_args->dst.info_v.counts, size), + save->old_asymmetric_buffer.info_v.mem_type, + save->scratch->mt); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error copying back to old asymmetric buffer: %s", + ucc_status_string(status)); + } + status = ucc_mc_free(save->scratch); + if (ucc_unlikely(status != UCC_OK)) { + ucc_error("error freeing scratch asymmetric buffer: %s", + ucc_status_string(status)); + } + return status; +} + static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task) { ucc_status_t status = task->status; @@ -188,20 +232,13 @@ static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task) if (ucc_likely(status == UCC_OK)) { ucc_asymmetric_save_info_t *save = &task->bargs.asymmetric_save_info; if (save->scratch != NULL) { - status = ucc_mc_memcpy(save->old_asymmetric_buffer.info.buffer, - save->scratch->addr, - ucc_dt_size(save->old_asymmetric_buffer.info.datatype) * - save->old_asymmetric_buffer.info.count, - save->old_asymmetric_buffer.info.mem_type, - save->scratch->mt); - if (ucc_unlikely(status != UCC_OK)) { - ucc_error("error copying back to old asymmetric buffer: %s", - ucc_status_string(status)); - } - status = ucc_mc_free(save->scratch); - if (ucc_unlikely(status != UCC_OK)) { - ucc_error("error freeing scratch asymmetric buffer: %s", - ucc_status_string(status)); + status = (task->bargs.args.coll_type == UCC_COLL_TYPE_GATHERV || + task->bargs.args.coll_type == UCC_COLL_TYPE_SCATTERV) ? + ucc_copy_asymmetric_buffer_v_out(task) : + ucc_copy_asymmetric_buffer_out(save); + if (status != UCC_OK) { + ucc_error("failure copying out asymmetric buffer: %s", + ucc_status_string(status)); } save->scratch = NULL; } diff --git a/src/utils/ucc_coll_utils.c b/src/utils/ucc_coll_utils.c index 14da4ff8d8..407db801da 100644 --- a/src/utils/ucc_coll_utils.c +++ b/src/utils/ucc_coll_utils.c @@ -102,85 +102,12 @@ ucc_coll_args_update_asymmetric_buffer(ucc_coll_args_t *args, ucc_team_h team, ucc_asymmetric_save_info_t *save_info) { - //ucc_rank_t root = args->root; - //ucc_rank_t rank = team->rank; ucc_status_t status = UCC_OK; if (UCC_IS_INPLACE(*args)) { return UCC_ERR_INVALID_PARAM; } switch (args->coll_type) { - case UCC_COLL_TYPE_BARRIER: - case UCC_COLL_TYPE_BCAST: - case UCC_COLL_TYPE_FANIN: - case UCC_COLL_TYPE_FANOUT: - return UCC_ERR_INVALID_PARAM; - case UCC_COLL_TYPE_ALLTOALL: - case UCC_COLL_TYPE_ALLREDUCE: - case UCC_COLL_TYPE_ALLGATHER: - case UCC_COLL_TYPE_REDUCE_SCATTER: - /*{ - if (args->dst.info.mem_type == mem_type) { - *old_buffer = args->src.info.buffer; - status = ucc_mc_alloc(args->src.info.buffer, ucc_dt_size(args->src.info.datatype) * args->src.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info.buffer; - status = ucc_mc_alloc(args->dst.info.buffer, ucc_dt_size(args->dst.info.datatype) * args->dst.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ - case UCC_COLL_TYPE_ALLGATHERV: - case UCC_COLL_TYPE_REDUCE_SCATTERV: - /*{ - if (args->dst.info_v.mem_type == mem_type) { - *old_buffer = args->src.info.buffer; - status = ucc_mc_alloc(args->src.info.buffer, ucc_dt_size(args->src.info.datatype) * args->src.info.count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info_v.buffer; - status = ucc_mc_alloc(args->dst.info_v.buffer, ucc_dt_size(args->dst.info_v.datatype) * args->dst.info_v.counts[rank], mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ - case UCC_COLL_TYPE_ALLTOALLV: - /*{ - ucc_count_t sum_count = 0; - ucc_rank_t i; - if (args->dst.info_v.mem_type == mem_type) { - *old_buffer = args->src.info_v.buffer; - for(i = 0; i < team->size; i++) { - sum_count += args->src.info_v.counts[i]; - } - status = ucc_mc_alloc(args->src.info_v.buffer, ucc_dt_size(args->src.info_v.datatype) * sum_count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } else { - *old_buffer = args->dst.info_v.buffer; - for(i = 0; i < team->size; i++) { - sum_count += args->dst.info_v.counts[i]; - } - status = ucc_mc_alloc(args->dst.info_v.buffer, ucc_dt_size(args->dst.info_v.datatype) * sum_count, mem_type); - if (ucc_unlikely(UCC_OK != status)) { - ucc_error("failed to allocate replacement memory for asymmetric buffer"); - return status; - } - } - }*/ case UCC_COLL_TYPE_REDUCE: case UCC_COLL_TYPE_GATHER: case UCC_COLL_TYPE_SCATTER: @@ -200,10 +127,22 @@ ucc_coll_args_update_asymmetric_buffer(ucc_coll_args_t *args, args->dst.info.mem_type = args->src.info.mem_type; return UCC_OK; } - case UCC_COLL_TYPE_GATHERV: /* - return (root != rank ? NULL : ( - args->dst.info_v.mem_type == mem_type ? args->src.info.buffer : args->dst.info_v.buffer; - )) */ + case UCC_COLL_TYPE_GATHERV: + { + memcpy(&save_info->old_asymmetric_buffer.info_v, + &args->dst.info_v, sizeof(ucc_coll_buffer_info_v_t)); + status = ucc_mc_alloc(&save_info->scratch, + ucc_coll_args_get_total_count(args, args->dst.info_v.counts, team->size), + args->src.info.mem_type); + if (ucc_unlikely(UCC_OK != status)) { + ucc_error("failed to allocate replacement " + "memory for asymmetric buffer"); + return status; + } + args->dst.info_v.buffer = save_info->scratch->addr; + args->dst.info_v.mem_type = args->src.info.mem_type; + return UCC_OK; + } case UCC_COLL_TYPE_SCATTERV: /* return (root != rank ? NULL : ( args->dst.info.mem_type == mem_type ? args->src.info_v.buffer : args->dst.info.buffer; diff --git a/test/gtest/asym_mem/test_asymmetric_memory.cc b/test/gtest/asym_mem/test_asymmetric_memory.cc index 788928b7c6..e06c70f659 100644 --- a/test/gtest/asym_mem/test_asymmetric_memory.cc +++ b/test/gtest/asym_mem/test_asymmetric_memory.cc @@ -8,7 +8,7 @@ #ifdef HAVE_CUDA using Param = std::tuple; + ucc_memory_type_t, int>; class test_asymmetric_memory : public ucc::test, public ::testing::WithParamInterface @@ -20,7 +20,16 @@ class test_asymmetric_memory : public ucc::test, ucc_rank_t tsize = team->procs.size(); int root = 0; size_t msglen = 2048; + size_t src_modifier = 1; + size_t dst_modifier = 1; ctxs.resize(tsize); + + if (coll_type == UCC_COLL_TYPE_GATHER) { + dst_modifier = tsize; + } else if (coll_type == UCC_COLL_TYPE_SCATTER) { + src_modifier = tsize; + } + for (int i = 0; i < tsize; i++) { ctxs[i] = (gtest_ucc_coll_ctx_t*) calloc(1, sizeof(gtest_ucc_coll_ctx_t)); @@ -31,41 +40,41 @@ class test_asymmetric_memory : public ucc::test, coll->coll_type = coll_type; coll->src.info.mem_type = src_mem_type; - coll->src.info.count = (ucc_count_t)msglen; + coll->src.info.count = (ucc_count_t)msglen * src_modifier; coll->src.info.datatype = UCC_DT_INT8; coll->root = root; - ctxs[i]->rbuf_size = msglen; UCC_CHECK(ucc_mc_alloc(&ctxs[i]->src_mc_header, - ctxs[i]->rbuf_size, src_mem_type)); + msglen * src_modifier, src_mem_type)); coll->src.info.buffer = ctxs[i]->src_mc_header->addr; - ctxs[i]->init_buf = ucc_malloc(ctxs[i]->rbuf_size, + ctxs[i]->init_buf = ucc_malloc(msglen * src_modifier, "init buf"); EXPECT_NE(ctxs[i]->init_buf, nullptr); uint8_t *sbuf = (uint8_t*)ctxs[i]->init_buf; - for (int j = 0; j < ctxs[i]->rbuf_size; j++) { + for (int j = 0; j < msglen * src_modifier; j++) { sbuf[j] = (uint8_t) 1; } UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, ctxs[i]->init_buf, - ctxs[i]->rbuf_size, src_mem_type, + msglen * src_modifier, src_mem_type, UCC_MEMORY_TYPE_HOST)); + ctxs[i]->rbuf_size = msglen * dst_modifier; if (i == root) { UCC_CHECK(ucc_mc_alloc(&ctxs[i]->dst_mc_header, ctxs[i]->rbuf_size, dst_mem_type)); - coll->dst.info.buffer = ctxs[i]->dst_mc_header->addr; - coll->dst.info.count = (ucc_count_t)msglen; + coll->dst.info.buffer = ctxs[i]->dst_mc_header->addr; + coll->dst.info.count = (ucc_count_t)ctxs[i]->rbuf_size; coll->dst.info.datatype = UCC_DT_INT8; coll->dst.info.mem_type = dst_mem_type; } } } - void data_fini(UccTeam_h team) + void data_fini() { - for (int i = 0; i < team->procs.size(); i++) { + for (int i = 0; i < ctxs.size(); i++) { gtest_ucc_coll_ctx_t *ctx = ctxs[i]; if (!ctx) { continue; @@ -82,14 +91,18 @@ class test_asymmetric_memory : public ucc::test, ctxs.clear(); } - // nick: for reduce this func doesnt need a loop, but i will change this test to work with other colls in the future - bool data_validate(UccTeam_h team) + bool data_validate() { bool ret = true; int root = 0; + uint8_t result = 1; ucc_memory_type_t dst_mem_type; uint8_t *rst; + if (ctxs[0]->args->coll_type == UCC_COLL_TYPE_REDUCE) { + result = (uint8_t) ctxs.size(); + } + for (int i = 0; i < ctxs.size(); i++) { if (!ctxs[i]) { continue; @@ -100,7 +113,7 @@ class test_asymmetric_memory : public ucc::test, if (i == root) { dst_mem_type = ctxs[root]->args->dst.info.mem_type; - rst = (uint8_t*) ucc_malloc(ctxs[root]->rbuf_size, "dsts buf"); + rst = (uint8_t*) ucc_malloc(ctxs[root]->rbuf_size, "validation buf"); EXPECT_NE(rst, nullptr); UCC_CHECK(ucc_mc_memcpy(rst, ctxs[root]->args->dst.info.buffer, @@ -108,7 +121,7 @@ class test_asymmetric_memory : public ucc::test, UCC_MEMORY_TYPE_HOST, dst_mem_type)); for (int j = 0; j < ctxs[root]->rbuf_size; j++) { - if ((uint8_t) team->procs.size() != rst[j]) { + if (result != rst[j]) { ret = false; break; } @@ -122,42 +135,187 @@ class test_asymmetric_memory : public ucc::test, } }; -UCC_TEST_P(test_asymmetric_memory, single) + +class test_asymmetric_memory_v : public ucc::test, + public ::testing::WithParamInterface { - const ucc_coll_type_t coll_type = std::get<0>(GetParam()); - const ucc_memory_type_t src_mem_type = std::get<1>(GetParam()); - const ucc_memory_type_t dst_mem_type = std::get<2>(GetParam()); - const ucc_job_env_t env = std::get<3>(GetParam()); - const int n_procs = std::get<4>(GetParam()); - - UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); - UccTeam_h team = job.create_team(n_procs); - - data_init(coll_type, src_mem_type, dst_mem_type, team); - UccReq req(team, ctxs); - if (req.status != UCC_OK) { - data_fini(team); - GTEST_SKIP() << "ucc_collective_init returned " - << ucc_status_string(req.status); +public: + UccCollCtxVec ctxs; + void data_init(ucc_coll_type_t coll_type, ucc_memory_type_t src_mem_type, + ucc_memory_type_t dst_mem_type, UccTeam_h team) { + int nprocs = team->n_procs; + size_t count = 2048; + ucc_rank_t root = 0; + ucc_coll_args_t *coll; + int *counts, *displs; + size_t my_count, all_counts; + + ctxs.resize(nprocs); + for (auto r = 0; r < nprocs; r++) { + coll = (ucc_coll_args_t *)calloc(1, sizeof(ucc_coll_args_t)); + my_count = (nprocs - r) * count; + ctxs[r] = + (gtest_ucc_coll_ctx_t *)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); + ctxs[r]->args = coll; + coll->mask = 0; + coll->flags = 0; + coll->coll_type = coll_type; + coll->root = root; + coll->src.info.mem_type = src_mem_type; + coll->src.info.count = (ucc_count_t)my_count; + coll->src.info.datatype = UCC_DT_INT8; + + ctxs[r]->init_buf = + ucc_malloc(ucc_dt_size(UCC_DT_INT8) * my_count, "init buf"); + ASSERT_NE(ctxs[r]->init_buf, nullptr); + for (int i = 0; i < my_count * ucc_dt_size(UCC_DT_INT8); i++) { + uint8_t *sbuf = (uint8_t *)ctxs[r]->init_buf; + sbuf[i] = ((i + r) % 256); + } + + if (r == root) { + all_counts = 0; + counts = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(counts, nullptr); + displs = (int*)malloc(sizeof(int) * nprocs); + ASSERT_NE(displs, nullptr); + + for (int i = 0; i < nprocs; i++) { + counts[i] = (nprocs - i) * count; + displs[i] = all_counts; + all_counts += counts[i]; + } + + coll->dst.info_v.mem_type = dst_mem_type; + coll->dst.info_v.counts = (ucc_count_t *)counts; + coll->dst.info_v.displacements = (ucc_aint_t *)displs; + coll->dst.info_v.datatype = UCC_DT_INT8; + + ctxs[r]->rbuf_size = ucc_dt_size(UCC_DT_INT8) * all_counts; + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->dst_mc_header, + ctxs[r]->rbuf_size, dst_mem_type)); + coll->dst.info_v.buffer = ctxs[r]->dst_mc_header->addr; + } + + UCC_CHECK(ucc_mc_alloc(&ctxs[r]->src_mc_header, + ucc_dt_size(UCC_DT_INT8) * my_count, + src_mem_type)); + coll->src.info.buffer = ctxs[r]->src_mc_header->addr; + UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, + ctxs[r]->init_buf, + ucc_dt_size(UCC_DT_INT8) * my_count, + src_mem_type, UCC_MEMORY_TYPE_HOST)); + } + } + + bool data_validate() + { + bool ret = true; + int root = ctxs[0]->args->root; + int *displs = (int*)ctxs[root]->args->dst.info_v.displacements; + size_t dt_size = ucc_dt_size(ctxs[root]->args->src.info.datatype); + ucc_memory_type_t dst_mem_type = ctxs[root]->args->dst.info_v.mem_type; + ucc_count_t my_count; + uint8_t *dsts; + + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + dsts = (uint8_t *)ucc_malloc(ctxs[root]->rbuf_size, "dsts buf"); + ucc_assert(dsts != nullptr); + UCC_CHECK(ucc_mc_memcpy(dsts, ctxs[root]->args->dst.info_v.buffer, + ctxs[root]->rbuf_size, + UCC_MEMORY_TYPE_HOST, dst_mem_type)); + } else { + dsts = (uint8_t *)ctxs[root]->args->dst.info_v.buffer; + } + + for (int r = 0; r < ctxs.size(); r++) { + my_count = ctxs[r]->args->src.info.count; + for (int i = 0; i < my_count * dt_size; i++) { + if ((uint8_t)((i + r) % 256) != + dsts[(displs[r] * dt_size + i)]) { + ret = false; + break; + } + } + } + + if (UCC_MEMORY_TYPE_HOST != dst_mem_type) { + ucc_free(dsts); + } + return ret; } - req.start(); - req.wait(); - EXPECT_EQ(true, data_validate(team)); - data_fini(team); -} -ucc_job_env_t asymmetric_env = {}; + void data_fini() + { + int root = ctxs[0]->args->root; + for (auto r = 0; r < ctxs.size(); r++) { + ucc_coll_args_t *coll = ctxs[r]->args; + if (r == root) { + UCC_CHECK(ucc_mc_free(ctxs[r]->dst_mc_header)); + free(coll->dst.info_v.counts); + free(coll->dst.info_v.displacements); + } + UCC_CHECK(ucc_mc_free(ctxs[r]->src_mc_header)); + ucc_free(ctxs[r]->init_buf); + free(coll); + free(ctxs[r]); + } + ctxs.clear(); + } +}; + +#define TEST_ASYM_DECLARE \ + const ucc_coll_type_t coll_type = std::get<0>(GetParam()); \ + const ucc_memory_type_t src_mem_type = std::get<1>(GetParam()); \ + const ucc_memory_type_t dst_mem_type = std::get<2>(GetParam()); \ + const int n_procs = std::get<3>(GetParam()); \ + \ + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL); \ + UccTeam_h team = job.create_team(n_procs); \ + \ + data_init(coll_type, src_mem_type, dst_mem_type, team); \ + UccReq req(team, ctxs); \ + if (req.status != UCC_OK) { \ + data_fini(); \ + GTEST_SKIP() << "ucc_collective_init returned " \ + << ucc_status_string(req.status); \ + } \ + req.start(); \ + req.wait(); \ + EXPECT_EQ(true, data_validate()); \ + data_fini(); + +UCC_TEST_P(test_asymmetric_memory, single) +{ + TEST_ASYM_DECLARE +} INSTANTIATE_TEST_CASE_P ( , test_asymmetric_memory, ::testing::Combine ( - ::testing::Values(UCC_COLL_TYPE_REDUCE), // coll type - ::testing::Values(UCC_MEMORY_TYPE_HOST), // src mem type - ::testing::Values(UCC_MEMORY_TYPE_CUDA), // dst mem type - ::testing::Values(asymmetric_env), // env - ::testing::Values(16) // n_procs + ::testing::Values(UCC_COLL_TYPE_REDUCE, UCC_COLL_TYPE_GATHER, UCC_COLL_TYPE_SCATTER), // coll type (scatter may be skipped because tl/ucp does not support scatter) + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // src mem type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // dst mem type + ::testing::Values(8) // n_procs + ) +); + +UCC_TEST_P(test_asymmetric_memory_v, single_v) +{ + TEST_ASYM_DECLARE +} + +INSTANTIATE_TEST_CASE_P +( + , test_asymmetric_memory_v, + ::testing::Combine + ( + ::testing::Values(UCC_COLL_TYPE_GATHERV, UCC_COLL_TYPE_SCATTERV), // coll type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // src mem type + ::testing::Values(UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_CUDA), // dst mem type + ::testing::Values(8) // n_procs ) );