diff --git a/src/components/tl/ucp/bcast/bcast_dbt.c b/src/components/tl/ucp/bcast/bcast_dbt.c index 36394edc57..8f90b4137b 100644 --- a/src/components/tl/ucp/bcast/bcast_dbt.c +++ b/src/components/tl/ucp/bcast/bcast_dbt.c @@ -73,7 +73,7 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) ucc_derived_of(coll_task, ucc_tl_ucp_task_t); ucc_tl_ucp_team_t *team = TASK_TEAM(task); ucc_coll_args_t *args = &TASK_ARGS(task); - ucc_rank_t rank = UCC_TL_TEAM_RANK(team); + ucc_rank_t rank = task->subset.myrank; ucc_dbt_single_tree_t t1 = task->bcast_dbt.t1; ucc_dbt_single_tree_t t2 = task->bcast_dbt.t2; void *buffer = args->src.info.buffer; @@ -93,14 +93,19 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) if (rank != t1.root && rank != coll_root) { UCPCHECK_GOTO(ucc_tl_ucp_recv_cb(buffer, data_size_t1, mtype, - t1.parent, team, task, cb[0], + ucc_ep_map_eval(task->subset.map, + t1.parent), + team, task, cb[0], (void *)task), task, out); } if (rank != t2.root && rank != coll_root) { UCPCHECK_GOTO(ucc_tl_ucp_recv_cb(PTR_OFFSET(buffer, data_size_t1), - data_size_t2, mtype, t2.parent, team, + data_size_t2, mtype, + ucc_ep_map_eval(task->subset.map, + t2.parent), + team, task, cb[1], (void *)task), task, out); } @@ -114,7 +119,10 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) if ((t1.children[i] != UCC_RANK_INVALID) && (t1.children[i] != coll_root)) { UCPCHECK_GOTO(ucc_tl_ucp_send_nb(buffer, data_size_t1, mtype, - t1.children[i], team, task), + ucc_ep_map_eval( + task->subset.map, + t1.children[i]), + team, task), task, out); } } @@ -133,7 +141,10 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) UCPCHECK_GOTO(ucc_tl_ucp_send_nb(PTR_OFFSET(buffer, data_size_t1), data_size_t2, mtype, - t2.children[i], team, task), + ucc_ep_map_eval( + task->subset.map, + t2.children[i]), + team, task), task, out); } } @@ -161,7 +172,7 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_start(ucc_coll_task_t *coll_task) ucc_tl_ucp_team_t *team = TASK_TEAM(task); ucc_coll_args_t *args = &TASK_ARGS(task); ucc_status_t status = UCC_OK; - ucc_rank_t rank = UCC_TL_TEAM_RANK(team); + ucc_rank_t rank = task->subset.myrank; void *buffer = args->src.info.buffer; ucc_memory_type_t mtype = args->src.info.mem_type; ucc_datatype_t dt = args->src.info.datatype; @@ -181,8 +192,9 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_start(ucc_coll_task_t *coll_task) ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); if (rank == coll_root && coll_root != t1_root) { - status = ucc_tl_ucp_send_nb(buffer, data_size_t1, mtype, t1_root, team, - task); + status = ucc_tl_ucp_send_nb(buffer, data_size_t1, mtype, + ucc_ep_map_eval(task->subset.map, t1_root), + team, task); if (UCC_OK != status) { return status; } @@ -190,14 +202,18 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_start(ucc_coll_task_t *coll_task) if (rank == coll_root && coll_root != t2_root) { status = ucc_tl_ucp_send_nb(PTR_OFFSET(buffer, data_size_t1), - data_size_t2, mtype, t2_root, team, task); + data_size_t2, mtype, + ucc_ep_map_eval(task->subset.map, t2_root), + team, task); if (UCC_OK != status) { return status; } } if (rank != coll_root && rank == t1_root) { - status = ucc_tl_ucp_recv_cb(buffer, data_size_t1, mtype, coll_root, + status = ucc_tl_ucp_recv_cb(buffer, data_size_t1, mtype, + ucc_ep_map_eval(task->subset.map, + coll_root), team, task, cb[0], (void *)task); if (UCC_OK != status) { return status; @@ -206,8 +222,10 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_start(ucc_coll_task_t *coll_task) if (rank != coll_root && rank == t2_root) { status = ucc_tl_ucp_recv_cb(PTR_OFFSET(buffer, data_size_t1), - data_size_t2, mtype, coll_root, team, task, - cb[1], (void *)task); + data_size_t2, mtype, + ucc_ep_map_eval(task->subset.map, + coll_root), + team, task, cb[1], (void *)task); if (UCC_OK != status) { return status; } @@ -227,7 +245,6 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_init( ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, ucc_coll_task_t **task_h) { - ucc_tl_ucp_team_t *tl_team; ucc_tl_ucp_task_t *task; ucc_rank_t rank, size; @@ -236,9 +253,9 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_init( task->super.progress = ucc_tl_ucp_bcast_dbt_progress; task->super.finalize = ucc_tl_ucp_bcast_dbt_finalize; task->n_polls = ucc_max(1, task->n_polls); - tl_team = TASK_TEAM(task); - rank = UCC_TL_TEAM_RANK(tl_team); - size = UCC_TL_TEAM_SIZE(tl_team); + rank = task->subset.myrank; + size = (ucc_rank_t)task->subset.map.ep_num; + ucc_dbt_build_trees(rank, size, &task->bcast_dbt.t1, &task->bcast_dbt.t2); diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 0a8a340955..96b635faa9 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -340,7 +340,7 @@ ucc_tl_ucp_init_task(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team) ucc_coll_task_init(&task->super, coll_args, team); if (UCC_COLL_ARGS_ACTIVE_SET(&coll_args->args)) { - task->tagged.tag = (coll_args->mask & UCC_COLL_ARGS_FIELD_TAG) + task->tagged.tag = (coll_args->args.mask & UCC_COLL_ARGS_FIELD_TAG) ? coll_args->args.tag : UCC_TL_UCP_ACTIVE_SET_TAG; task->flags |= UCC_TL_UCP_TASK_FLAG_SUBSET; task->subset.map = ucc_active_set_to_ep_map(&coll_args->args); @@ -348,12 +348,8 @@ ucc_tl_ucp_init_task(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team) ucc_ep_map_local_rank(task->subset.map, UCC_TL_TEAM_RANK(tl_team)); ucc_assert(coll_args->args.coll_type == UCC_COLL_TYPE_BCAST); - /* root value in args corresponds to the original team ranks, - need to convert to subset local value */ - TASK_ARGS(task).root = ucc_ep_map_local_rank(task->subset.map, - coll_args->args.root); } else { - if (coll_args->mask & UCC_COLL_ARGS_FIELD_TAG) { + if (coll_args->args.mask & UCC_COLL_ARGS_FIELD_TAG) { task->tagged.tag = coll_args->args.tag; } else { tl_team->seq_num = (tl_team->seq_num + 1) % UCC_TL_UCP_MAX_COLL_TAG; diff --git a/src/core/ucc_coll.c b/src/core/ucc_coll.c index 6cb0426389..68993284fd 100644 --- a/src/core/ucc_coll.c +++ b/src/core/ucc_coll.c @@ -205,9 +205,8 @@ UCC_CORE_PROFILE_FUNC(ucc_status_t, ucc_collective_init, } if (UCC_COLL_ARGS_ACTIVE_SET(coll_args) && - ((UCC_COLL_TYPE_BCAST != coll_args->coll_type) || - coll_args->active_set.size != 2)) { - ucc_warn("Active Sets are only supported for bcast and set size = 2"); + (UCC_COLL_TYPE_BCAST != coll_args->coll_type)) { + ucc_warn("Active Sets are only supported for bcast"); return UCC_ERR_NOT_SUPPORTED; } diff --git a/test/gtest/active_set/test_active_set.cc b/test/gtest/active_set/test_active_set.cc index c9d166c0dd..b7f4f9518b 100644 --- a/test/gtest/active_set/test_active_set.cc +++ b/test/gtest/active_set/test_active_set.cc @@ -4,20 +4,20 @@ */ #include "common/test_ucc.h" +#include -typedef std::tuple, size_t, ucc_memory_type_t> op_t; -using param = std::vector; +typedef std::tuple op_t; +using Param = std::tuple, ucc_job_env_t, int>; -#define OP_T(_src, _dst, _size, _mt) ({ \ - op_t _op(std::pair(_src, _dst), _size, UCC_MEMORY_TYPE_ ## _mt); \ - _op; \ +#define OP_T(_root, _aset_start, _aset_stride, _aset_size, _msg_size, _mt) ({ \ + op_t _op(_root, _aset_start, _aset_stride, _aset_size, \ + _msg_size, UCC_MEMORY_TYPE_ ## _mt); \ + _op; \ }) -class test_active_set : public ucc::test -{}; - -class test_active_set_2 : public test_active_set, - public ::testing::WithParamInterface +class test_active_set : public ucc::test, + public ::testing::WithParamInterface { public: std::vector ctxs; @@ -25,44 +25,73 @@ class test_active_set_2 : public test_active_set, ucc_rank_t tsize = team->procs.size(); ctxs.resize(ops.size()); for (int i = 0; i < ops.size(); i++) { - ucc_rank_t src = std::get<0>(ops[i]).first; - ucc_rank_t dst = std::get<0>(ops[i]).second; - size_t msglen = std::get<1>(ops[i]); - ucc_memory_type_t mt = std::get<2>(ops[i]); + uint64_t aset_root = std::get<0>(ops[i]); + uint64_t aset_start = std::get<1>(ops[i]); + int64_t aset_stride = std::get<2>(ops[i]); + uint64_t aset_size = std::get<3>(ops[i]); + size_t msglen = std::get<4>(ops[i]); + ucc_memory_type_t mt = std::get<5>(ops[i]); + uint64_t root = (aset_start + aset_stride*aset_root) % tsize; + + // aset contains ranks of the active_set in terms of the original + // team + uint64_t to_add = aset_start; + std::unordered_set aset{to_add}; + + while(aset.size() != aset_size) { + to_add = (to_add + aset_stride) % tsize; + // the following assertion makes sure the active set + // start/stride/size combo doesnt select the same rank twice + EXPECT_EQ(aset.find(to_add), aset.end()); + aset.insert(to_add); + } + + EXPECT_NE(aset.find(root), aset.end()); + ctxs[i].resize(tsize); for (int j = 0; j < tsize; j++) { - if (j != src && j != dst) { + if (aset.find(j) == aset.end()) { ctxs[i][j] = NULL; continue; } - ctxs[i][j] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); + ctxs[i][j] = (gtest_ucc_coll_ctx_t*) + calloc(1, sizeof(gtest_ucc_coll_ctx_t)); ucc_coll_args_t *coll = (ucc_coll_args_t*) calloc(1, sizeof(ucc_coll_args_t)); ctxs[i][j]->args = coll; - coll->mask = UCC_COLL_ARGS_FIELD_ACTIVE_SET; + coll->mask = + UCC_COLL_ARGS_FIELD_ACTIVE_SET | UCC_COLL_ARGS_FIELD_TAG; coll->coll_type = UCC_COLL_TYPE_BCAST; coll->src.info.mem_type = mt; coll->src.info.count = (ucc_count_t)msglen; coll->src.info.datatype = UCC_DT_INT8; - coll->root = src; - coll->active_set.size = 2; - coll->active_set.start = src; - coll->active_set.stride = (int)dst - (int)src; + coll->root = aset_root; + coll->active_set.size = aset_size; + coll->active_set.start = aset_start; + coll->active_set.stride = aset_stride; + coll->tag = i; ctxs[i][j]->rbuf_size = msglen; - UCC_CHECK(ucc_mc_alloc(&ctxs[i][j]->src_mc_header, ctxs[i][j]->rbuf_size, - mt)); + UCC_CHECK(ucc_mc_alloc(&ctxs[i][j]->src_mc_header, + ctxs[i][j]->rbuf_size, mt)); coll->src.info.buffer = ctxs[i][j]->src_mc_header->addr; - if (j == src) { - ctxs[i][j]->init_buf = ucc_malloc(ctxs[i][j]->rbuf_size, "init buf"); + + for (int k = 0; k < ctxs[i][j]->rbuf_size; k++) { + ((uint8_t *)coll->src.info.buffer)[k] = (uint8_t) 0; + } + + if (j == root) { + ctxs[i][j]->init_buf = ucc_malloc(ctxs[i][j]->rbuf_size, + "init buf"); EXPECT_NE(ctxs[i][j]->init_buf, nullptr); uint8_t *sbuf = (uint8_t*)ctxs[i][j]->init_buf; for (int k = 0; k < ctxs[i][j]->rbuf_size; k++) { - sbuf[k] = (uint8_t)(src + k * dst); + sbuf[k] = (uint8_t) aset_root; } - UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, ctxs[i][j]->init_buf, + UCC_CHECK(ucc_mc_memcpy(coll->src.info.buffer, + ctxs[i][j]->init_buf, ctxs[i][j]->rbuf_size, mt, UCC_MEMORY_TYPE_HOST)); } @@ -71,8 +100,9 @@ class test_active_set_2 : public test_active_set, } } - ~test_active_set_2() + void data_fini(UccTeam_h team) { + ucc_rank_t tsize = team->procs.size(); for (int i = 0; i < ctxs.size(); i++) { for (int j = 0; j < ctxs[i].size(); j++ ) { gtest_ucc_coll_ctx_t *ctx = ctxs[i][j]; @@ -80,8 +110,11 @@ class test_active_set_2 : public test_active_set, continue; } ucc_coll_args_t* coll = ctx->args; + uint64_t aset_start = coll->active_set.start; + int64_t aset_stride = coll->active_set.stride; + uint64_t root = (aset_start + aset_stride*coll->root) % tsize; UCC_CHECK(ucc_mc_free(ctx->src_mc_header)); - if (j == coll->root) { + if (j == root) { ucc_free(ctx->init_buf); } free(coll); @@ -92,46 +125,48 @@ class test_active_set_2 : public test_active_set, ctxs.clear(); } - bool data_validate_one(UccCollCtxVec ctxs) + bool data_validate_one(UccCollCtxVec ctx, UccTeam_h team) { bool ret = true; - int root = 0; - ucc_rank_t src = 0, dst = 0; + int root = 0, aset_root = 0; ucc_memory_type_t mem_type; uint8_t *rst; + uint64_t aset_start, aset_stride; + ucc_rank_t tsize = team->procs.size(); - for (int i = 0; i < ctxs.size(); i++) { - if (!ctxs[i]) { + for (int i = 0; i < ctx.size(); i++) { + if (!ctx[i]) { continue; } - root = ctxs[i]->args->root; - mem_type = ctxs[i]->args->src.info.mem_type; - if (root == i) { - src = i; - } else { - dst = i; - } - } - rst = (uint8_t*) ucc_malloc(ctxs[root]->rbuf_size, "dsts buf"); - EXPECT_NE(rst, nullptr); - UCC_CHECK(ucc_mc_memcpy(rst, ctxs[dst]->args->src.info.buffer, - ctxs[root]->rbuf_size, - UCC_MEMORY_TYPE_HOST, mem_type)); + aset_root = ctx[i]->args->root; + aset_start = ctx[i]->args->active_set.start; + aset_stride = ctx[i]->args->active_set.stride; + root = (aset_start + aset_stride*aset_root) % tsize; + mem_type = ctx[i]->args->src.info.mem_type; + + rst = (uint8_t*) ucc_malloc(ctx[i]->rbuf_size, "dsts buf"); + EXPECT_NE(rst, nullptr); - for (int i = 0; i < ctxs[root]->rbuf_size; i++) { - if ((uint8_t)(src + i * dst) != rst[i]) { - ret = false; - break; + UCC_CHECK(ucc_mc_memcpy(rst, ctx[i]->args->src.info.buffer, + ctx[i]->rbuf_size, + UCC_MEMORY_TYPE_HOST, mem_type)); + + for (int j = 0; j < ctx[root]->rbuf_size; j++) { + if ((uint8_t) aset_root != rst[j]) { + ret = false; + break; + } } - } - ucc_free(rst); + ucc_free(rst); + } + return ret; } - bool data_validate() { + bool data_validate(UccTeam_h team) { for (auto &c : ctxs) { - if (true != data_validate_one(c)) { + if (true != data_validate_one(c, team)) { return false; } } @@ -139,11 +174,14 @@ class test_active_set_2 : public test_active_set, } }; -UCC_TEST_P(test_active_set_2, single) +UCC_TEST_P(test_active_set, single) { - auto ops = GetParam(); + auto ops = std::get<0>(GetParam()); + const ucc_job_env_t env = std::get<1>(GetParam()); + const int n_procs = std::get<2>(GetParam()); - UccTeam_h team = UccJob::getStaticTeams().back(); + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); + UccTeam_h team = job.create_team(n_procs); data_init(ops, team); for (auto &c : ctxs) { @@ -151,13 +189,19 @@ UCC_TEST_P(test_active_set_2, single) req.start(); req.wait(); } - EXPECT_EQ(true, data_validate()); + EXPECT_EQ(true, data_validate(team)); + data_fini(team); } -UCC_TEST_P(test_active_set_2, multiple) +UCC_TEST_P(test_active_set, multiple) { - auto ops = GetParam(); - UccTeam_h team = UccJob::getStaticTeams().back(); + auto ops = std::get<0>(GetParam()); + const ucc_job_env_t env = std::get<1>(GetParam()); + const int n_procs = std::get<2>(GetParam()); + + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env); + UccTeam_h team = job.create_team(n_procs); + std::vector reqs; data_init(ops, team); @@ -166,15 +210,44 @@ UCC_TEST_P(test_active_set_2, multiple) } UccReq::startall(reqs); UccReq::waitall(reqs); - EXPECT_EQ(true, data_validate()); + EXPECT_EQ(true, data_validate(team)); + data_fini(team); } -INSTANTIATE_TEST_CASE_P( - , test_active_set_2, - ::testing::Values( - std::vector({ - OP_T(0, 1, 8, HOST), - OP_T(3, 7, 1024, HOST), - OP_T(11, 3, 65530, HOST), - OP_T(7, 5, 123456, HOST) - }))); +ucc_job_env_t knomial_env = {{"UCC_TL_UCP_TUNE", "bcast:@knomial:0-inf:inf"}, + {"UCC_CLS", "basic"}}; +extern ucc_job_env_t dbt_env; // test_bcast.cc + +INSTANTIATE_TEST_CASE_P +( + , test_active_set, + ::testing::Combine + ( + ::testing::Values + ( + std::vector + ({ + // root, start, stride, size, msglen, mt + // root is in terms of the active_set + OP_T(0, 0, 1, 16, 8, HOST), // subset == full set + OP_T(0, 0, 2, 8, 6, HOST), // even ranks in full set + OP_T(0, 3, 8, 2, 65530, HOST), // pt2pt + OP_T(0, 1, 1, 2, 65531, HOST), // pt2pt + OP_T(0, 0, 1, 2, 8, HOST), // pt2pt + OP_T(0, 3, 4, 2, 1024, HOST), // pt2pt + OP_T(1, 3, 4, 2, 1023, HOST), // pt2pt + OP_T(0, 11, -8, 2, 65530, HOST), // pt2pt + OP_T(1, 11, -8, 2, 65531, HOST), // pt2pt + OP_T(0, 7, -2, 2, 123456, HOST), // pt2pt + OP_T(1, 7, -2, 2, 123455, HOST), // pt2pt + OP_T(0, 0, 1, 4, 1337, HOST), + OP_T(1, 0, 2, 4, 64, HOST), + OP_T(2, 0, 3, 3, 1335, HOST), + OP_T(4, 7, -1, 6, 18, HOST) + + }) + ), + ::testing::Values(knomial_env, dbt_env), // env + ::testing::Values(16) // n_procs + ) +);