From 862cad6695ccedd44620cfa5ce41497b133e19d9 Mon Sep 17 00:00:00 2001 From: Nick Sarkauskas Date: Wed, 27 Nov 2024 15:41:18 -0800 Subject: [PATCH] CL/HIER: Add allgatherv --- src/components/cl/hier/Makefile.am | 91 +++--- .../cl/hier/allgatherv/allgatherv.c | 304 ++++++++++++++++++ .../cl/hier/allgatherv/allgatherv.h | 35 ++ src/components/cl/hier/allgatherv/unpack.c | 149 +++++++++ src/components/cl/hier/allgatherv/unpack.h | 15 + src/components/cl/hier/cl_hier.c | 4 +- src/components/cl/hier/cl_hier.h | 7 + src/components/cl/hier/cl_hier_coll.h | 3 +- src/components/cl/hier/cl_hier_team.c | 10 + src/components/topo/ucc_sbgp.c | 1 + src/components/topo/ucc_sbgp.h | 2 +- src/utils/ucc_coll_utils.h | 23 ++ test/gtest/coll/test_allgatherv.cc | 59 +++- test/gtest/common/test_ucc.cc | 5 + test/gtest/common/test_ucc.h | 2 + 15 files changed, 650 insertions(+), 60 deletions(-) create mode 100755 src/components/cl/hier/allgatherv/allgatherv.c create mode 100755 src/components/cl/hier/allgatherv/allgatherv.h create mode 100644 src/components/cl/hier/allgatherv/unpack.c create mode 100644 src/components/cl/hier/allgatherv/unpack.h diff --git a/src/components/cl/hier/Makefile.am b/src/components/cl/hier/Makefile.am index 243f5811e8..ed86a4e395 100644 --- a/src/components/cl/hier/Makefile.am +++ b/src/components/cl/hier/Makefile.am @@ -1,49 +1,56 @@ # -# Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # +allgatherv = \ + allgatherv/unpack.h \ + allgatherv/unpack.c \ + allgatherv/allgatherv.h \ + allgatherv/allgatherv.c + allreduce = \ - allreduce/allreduce.h \ - allreduce/allreduce.c \ - allreduce/allreduce_rab.c \ - allreduce/allreduce_split_rail.c - -alltoallv = \ - alltoallv/alltoallv.h \ - alltoallv/alltoallv.c - -alltoall = \ - alltoall/alltoall.h \ - alltoall/alltoall.c - -barrier = \ - barrier/barrier.h \ - barrier/barrier.c - -bcast = \ - bcast/bcast.h \ - bcast/bcast.c \ - bcast/bcast_2step.c - -reduce = \ - reduce/reduce.h \ - reduce/reduce.c \ - reduce/reduce_2step.c - -sources = \ - cl_hier.h \ - cl_hier.c \ - cl_hier_lib.c \ - cl_hier_context.c \ - cl_hier_team.c \ - cl_hier_coll.c \ - cl_hier_coll.h \ - $(allreduce) \ - $(alltoallv) \ - $(alltoall) \ - $(barrier) \ - $(bcast) \ - $(reduce) + allreduce/allreduce.h \ + allreduce/allreduce.c \ + allreduce/allreduce_rab.c \ + allreduce/allreduce_split_rail.c + +alltoallv = \ + alltoallv/alltoallv.h \ + alltoallv/alltoallv.c + +alltoall = \ + alltoall/alltoall.h \ + alltoall/alltoall.c + +barrier = \ + barrier/barrier.h \ + barrier/barrier.c + +bcast = \ + bcast/bcast.h \ + bcast/bcast.c \ + bcast/bcast_2step.c + +reduce = \ + reduce/reduce.h \ + reduce/reduce.c \ + reduce/reduce_2step.c + +sources = \ + cl_hier.h \ + cl_hier.c \ + cl_hier_lib.c \ + cl_hier_context.c \ + cl_hier_team.c \ + cl_hier_coll.c \ + cl_hier_coll.h \ + $(allgatherv) \ + $(allreduce) \ + $(alltoallv) \ + $(alltoall) \ + $(barrier) \ + $(bcast) \ + $(reduce) module_LTLIBRARIES = libucc_cl_hier.la libucc_cl_hier_la_SOURCES = $(sources) diff --git a/src/components/cl/hier/allgatherv/allgatherv.c b/src/components/cl/hier/allgatherv/allgatherv.c new file mode 100755 index 0000000000..0df210218d --- /dev/null +++ b/src/components/cl/hier/allgatherv/allgatherv.c @@ -0,0 +1,304 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allgatherv.h" +#include "unpack.h" +#include "../cl_hier_coll.h" +#include "core/ucc_team.h" + +#define MAX_ALLGATHERV_TASKS 4 + +ucc_base_coll_alg_info_t + ucc_cl_hier_allgatherv_algs[UCC_CL_HIER_ALLGATHERV_ALG_LAST + 1] = { + [UCC_CL_HIER_ALLGATHERV_ALG_GAB] = + {.id = UCC_CL_HIER_ALLGATHERV_ALG_GAB, + .name = "gab", + .desc = "gatherv + allgatherv + bcast"}, + [UCC_CL_HIER_ALLGATHERV_ALG_LAST] = { + .id = 0, .name = NULL, .desc = NULL}}; + +static ucc_status_t ucc_cl_hier_allgatherv_start(ucc_coll_task_t *task) +{ + UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_allgatherv_start", 0); + return ucc_schedule_start(task); +} + +static ucc_status_t ucc_cl_hier_allgatherv_finalize(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t); + ucc_cl_hier_schedule_t *cl_schedule = ucc_derived_of(task, + ucc_cl_hier_schedule_t); + ucc_status_t status; + + ucc_mc_free(cl_schedule->scratch); + + UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_allgatherv_finalize", 0); + status = ucc_schedule_finalize(task); + ucc_cl_hier_put_schedule(schedule); + return status; +} + +static inline int is_leader(ucc_base_team_t *team, ucc_rank_t rank) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t); + ucc_rank_t ldr_sbgp_size = SBGP_SIZE(cl_team, NODE_LEADERS); + ucc_rank_t i; + for (i = 0; i < ldr_sbgp_size; i++) { + if (ucc_ep_map_eval(SBGP_MAP(cl_team, NODE_LEADERS), i) == rank) { + return 1; + } + } + return 0; +} + +/* TODO: is there a better way to do this? */ +static inline ucc_rank_t find_leader_rank(ucc_base_team_t *team, ucc_rank_t team_rank) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t); + ucc_team_t *core_team = team->params.team; + ucc_rank_t i; + + for (i = 0; i < UCC_CL_TEAM_SIZE(cl_team); i++) { + if (ucc_team_ranks_on_same_node(i, team_rank, core_team) && + is_leader(team, i)) { + return i; + } + } + + return UCC_RANK_INVALID; +} + +UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_allgatherv_init, + (coll_args, team, task), + ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, + ucc_coll_task_t **task) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, + ucc_cl_hier_team_t); + ucc_coll_task_t *tasks[MAX_ALLGATHERV_TASKS] + = {NULL}; + ucc_rank_t rank = UCC_CL_TEAM_RANK(cl_team); + ucc_rank_t node_sbgp_size = SBGP_SIZE(cl_team, NODE); + ucc_rank_t leader_sbgp_size = SBGP_SIZE(cl_team, NODE_LEADERS); + ucc_rank_t team_size = UCC_CL_TEAM_SIZE(cl_team); + ucc_aint_t *node_disps = NULL; + ucc_count_t *node_counts = NULL; + ucc_aint_t *leader_disps = NULL; + ucc_count_t *leader_counts = NULL; + size_t dt_size = ucc_dt_size(coll_args->args. + dst.info_v.datatype); + int in_place = 0; + int is_contig = 1; + ucc_schedule_t *schedule; + ucc_cl_hier_schedule_t *cl_schedule; + ucc_status_t status; + ucc_base_coll_args_t args, args_old; + int n_tasks, i; + size_t scratch_size; + size_t node_counts_size; + size_t node_disps_size; + size_t leader_counts_size; + size_t leader_disps_size; + size_t total_count; + void *node_gathered_data; + + schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super; + if (ucc_unlikely(!schedule)) { + return UCC_ERR_NO_MEMORY; + } + cl_schedule = ucc_derived_of(schedule, ucc_cl_hier_schedule_t); + + memcpy(&args, coll_args, sizeof(args)); + memcpy(&args_old, coll_args, sizeof(args)); + in_place = UCC_IS_INPLACE(args.args); + is_contig = UCC_COLL_IS_DST_CONTIG(&args.args); + n_tasks = 0; + UCC_CHECK_GOTO(ucc_schedule_init(schedule, &args, team), free_sched, status); + + node_counts_size = node_sbgp_size * sizeof(ucc_count_t); + node_disps_size = node_sbgp_size * sizeof(ucc_aint_t); + leader_counts_size = leader_sbgp_size * sizeof(ucc_count_t); + leader_disps_size = leader_sbgp_size * sizeof(ucc_aint_t); + total_count = ucc_coll_args_get_total_count(&args.args, + args.args.dst.info_v.counts, team_size); + scratch_size = node_counts_size + node_disps_size + leader_counts_size + + leader_disps_size + (total_count * dt_size); + + UCC_CHECK_GOTO( + ucc_mc_alloc(&cl_schedule->scratch, scratch_size, UCC_MEMORY_TYPE_HOST), + free_sched, status); + memset(cl_schedule->scratch->addr, 0, scratch_size); + + node_counts = PTR_OFFSET(cl_schedule->scratch->addr, 0); + node_disps = PTR_OFFSET(node_counts, node_counts_size); + leader_counts = PTR_OFFSET(node_disps, node_disps_size); + leader_disps = PTR_OFFSET(leader_counts, leader_counts_size); + node_gathered_data = PTR_OFFSET(leader_disps, leader_disps_size); + + if (SBGP_ENABLED(cl_team, NODE)) { + ucc_assert(n_tasks == 0); + if (cl_team->top_sbgp == UCC_HIER_SBGP_NODE) { + args.args.coll_type = UCC_COLL_TYPE_ALLGATHERV; + } else { + size_t disp_counter = 0; + for (i = 0; i < node_sbgp_size; i++) { + ucc_rank_t team_rank = + ucc_ep_map_eval(SBGP_MAP(cl_team, NODE), i); + ucc_coll_args_set_count( + &args.args, node_counts, i, + ucc_coll_args_get_count(&args.args, + args.args.dst.info_v.counts, + team_rank)); + ucc_coll_args_set_displacement(&args.args, node_disps, + i, disp_counter); + disp_counter += ucc_coll_args_get_count(&args.args, + node_counts, i); + } + + if (in_place) { + args.args.src.info.buffer = + PTR_OFFSET(args.args.dst.info_v.buffer, + dt_size * ucc_coll_args_get_displacement( + &args.args, + args.args.dst.info_v.displacements, + rank)); + args.args.src.info.count = + ucc_coll_args_get_count(&args.args, + args.args.dst.info_v.counts, + rank); + args.args.src.info.datatype = args.args.dst.info_v.datatype; + args.args.src.info.mem_type = args.args.dst.info_v.mem_type; + } + + args.args.coll_type = UCC_COLL_TYPE_GATHERV; + args.args.root = 0; + args.args.flags &= ~UCC_COLL_ARGS_FLAG_IN_PLACE; + args.args.dst.info_v.displacements = node_disps; + args.args.dst.info_v.counts = node_counts; + args.args.dst.info_v.buffer = node_gathered_data; + } + UCC_CHECK_GOTO( + ucc_coll_init(SCORE_MAP(cl_team, NODE), &args, &tasks[n_tasks]), + free_scratch, status); + n_tasks++; + } + + args = args_old; + + if (SBGP_ENABLED(cl_team, NODE_LEADERS)) { + ucc_assert(cl_team->top_sbgp == UCC_HIER_SBGP_NODE_LEADERS); + size_t disp_counter = 0; + + /* Sum up the counts on each node to get the count for each node leader */ + for (i = 0; i < team_size; i++) { + ucc_rank_t leader_team_rank = find_leader_rank(team, i); + size_t leader_old_count = + ucc_coll_args_get_count(&args.args, leader_counts, + ucc_ep_map_local_rank( + SBGP_MAP(cl_team, NODE_LEADERS), + leader_team_rank)); + size_t add_count = + ucc_coll_args_get_count(&args.args, + args.args.dst.info_v.counts, i); + size_t new_count = add_count + leader_old_count; + ucc_coll_args_set_count(&args.args, leader_counts, + ucc_ep_map_local_rank( + SBGP_MAP(cl_team, NODE_LEADERS), + leader_team_rank), + new_count); + } + + /* + Need to order leader displacements by their team rank, not their leader sbgp rank. + The reason is leaders are not always in the same order as they are in the team + e.g., 2n2ppn + team ranks = 0 1 2 3 with 0 and 2 as leaders + leader sbgp ranks can be 2 0 wrt their team ranks + */ + for (i = 0; i < team_size; i++) { + if (is_leader(team, i)) { + ucc_rank_t leader_sgbp_rank = + ucc_ep_map_local_rank(SBGP_MAP(cl_team, NODE_LEADERS), i); + ucc_coll_args_set_displacement(&args.args, leader_disps, + leader_sgbp_rank, disp_counter); + disp_counter += ucc_coll_args_get_count(&args.args, + leader_counts, + leader_sgbp_rank); + } + } + args.args.coll_type = UCC_COLL_TYPE_ALLGATHERV; + args.args.flags &= ~UCC_COLL_ARGS_FLAG_IN_PLACE; + args.args.src.info.buffer = node_gathered_data; + args.args.src.info.count = ucc_coll_args_get_total_count( + &args.args, + node_counts, + node_sbgp_size); + args.args.src.info.datatype = args.args.dst.info_v.datatype; + args.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + args.args.dst.info_v.displacements = leader_disps; + args.args.dst.info_v.counts = leader_counts; + args.args.dst.info_v.buffer = args_old.args.dst.info_v.buffer; + UCC_CHECK_GOTO(ucc_coll_init(SCORE_MAP(cl_team, NODE_LEADERS), &args, + &tasks[n_tasks]), + free_scratch, status); + n_tasks++; + } + + if (SBGP_ENABLED(cl_team, NODE) && + cl_team->top_sbgp != UCC_HIER_SBGP_NODE) { + args = args_old; + args.args.coll_type = UCC_COLL_TYPE_BCAST; + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + args.args.root = 0; + args.args.src.info.buffer = args_old.args.dst.info_v.buffer; + args.args.src.info.count = total_count; + args.args.src.info.datatype = args_old.args.dst.info_v.datatype; + args.args.src.info.mem_type = args_old.args.dst.info_v.mem_type; + UCC_CHECK_GOTO( + ucc_coll_init(SCORE_MAP(cl_team, NODE), &args, &tasks[n_tasks]), + free_scratch, status); + n_tasks++; + + if (!is_contig) { + args = args_old; + UCC_CHECK_GOTO( + ucc_cl_hier_allgatherv_unpack_init(&args, team, &tasks[n_tasks]), + free_scratch, status); + n_tasks++; + } + } + + UCC_CHECK_GOTO(ucc_event_manager_subscribe( + &schedule->super, UCC_EVENT_SCHEDULE_STARTED, tasks[0], + ucc_task_start_handler), + free_scratch, status); + UCC_CHECK_GOTO( + ucc_schedule_add_task(schedule, tasks[0]), free_scratch, status); + for (i = 1; i < n_tasks; i++) { + UCC_CHECK_GOTO( + ucc_event_manager_subscribe(tasks[i - 1], UCC_EVENT_COMPLETED, + tasks[i], ucc_task_start_handler), + free_scratch, status); + UCC_CHECK_GOTO( + ucc_schedule_add_task(schedule, tasks[i]), free_scratch, status); + } + + schedule->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR; + schedule->super.post = ucc_cl_hier_allgatherv_start; + schedule->super.finalize = ucc_cl_hier_allgatherv_finalize; + *task = &schedule->super; + return UCC_OK; + +free_scratch: + ucc_mc_free(cl_schedule->scratch); +free_sched: + for (i = 0; i < n_tasks; i++) { + tasks[i]->finalize(tasks[i]); + } + ucc_cl_hier_put_schedule(schedule); + return status; +} diff --git a/src/components/cl/hier/allgatherv/allgatherv.h b/src/components/cl/hier/allgatherv/allgatherv.h new file mode 100755 index 0000000000..9cf211b6dd --- /dev/null +++ b/src/components/cl/hier/allgatherv/allgatherv.h @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#ifndef ALLGATHERV_H_ +#define ALLGATHERV_H_ +#include "../cl_hier.h" + +enum +{ + UCC_CL_HIER_ALLGATHERV_ALG_GAB, + UCC_CL_HIER_ALLGATHERV_ALG_LAST, +}; + +extern ucc_base_coll_alg_info_t + ucc_cl_hier_allgatherv_algs[UCC_CL_HIER_ALLGATHERV_ALG_LAST + 1]; + +ucc_status_t ucc_cl_hier_allgatherv_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task); + +static inline int ucc_cl_hier_allgatherv_alg_from_str(const char *str) +{ + int i; + for (i = 0; i < UCC_CL_HIER_ALLGATHERV_ALG_LAST; i++) { + if (0 == strcasecmp(str, ucc_cl_hier_allgatherv_algs[i].name)) { + break; + } + } + return i; +} + +#endif diff --git a/src/components/cl/hier/allgatherv/unpack.c b/src/components/cl/hier/allgatherv/unpack.c new file mode 100644 index 0000000000..3281cc3314 --- /dev/null +++ b/src/components/cl/hier/allgatherv/unpack.c @@ -0,0 +1,149 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "unpack.h" + +ucc_status_t ucc_cl_hier_allgatherv_unpack_finalize(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t); + ucc_cl_hier_schedule_t *cl_schedule = ucc_derived_of(schedule, + ucc_cl_hier_schedule_t); + + ucc_mc_free(cl_schedule->scratch); + + return UCC_OK; +} + +void ucc_cl_hier_allgatherv_unpack_progress(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t); + ucc_cl_hier_team_t *cl_team = ucc_derived_of(task->team, + ucc_cl_hier_team_t); + ucc_rank_t team_size = UCC_CL_TEAM_SIZE(cl_team); + ucc_cl_hier_schedule_t *cl_schedule = ucc_derived_of(schedule, + ucc_cl_hier_schedule_t); + ucc_ee_executor_task_t **tasks = cl_schedule->scratch->addr; + ucc_status_t st = UCC_OK; + ucc_rank_t i; + + for (i = 0; i < team_size; i++) { + ucc_ee_executor_task_t *etask = tasks[i]; + if (etask != NULL) { + st = ucc_ee_executor_task_test(etask); + if (st == UCC_OK) { + ucc_ee_executor_task_finalize(etask); + tasks[i] = NULL; + } else { + if (ucc_likely(st > 0)) { + st = UCC_INPROGRESS; + } + goto out; + } + } + } + +out: + schedule->super.status = st; + schedule->super.super.status = st; +} + +ucc_status_t ucc_cl_hier_allgatherv_unpack_start(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, + ucc_schedule_t); + ucc_cl_hier_team_t *cl_team = ucc_derived_of(task->team, + ucc_cl_hier_team_t); + ucc_rank_t team_size = UCC_CL_TEAM_SIZE(cl_team); + ucc_coll_args_t *args = &task->bargs.args; + ucc_ee_executor_task_args_t eargs = {0}; + ucc_cl_hier_schedule_t *cl_schedule = ucc_derived_of(schedule, + ucc_cl_hier_schedule_t); + ucc_ee_executor_task_t **tasks = cl_schedule->scratch->addr; + ucc_rank_t n_tasks = 0; + size_t dt_size = ucc_dt_size( + args->dst.info_v.datatype); + ucc_ee_executor_t *exec; + ucc_status_t status; + int i; + size_t disp_counter; + size_t this_rank_count; + + UCC_CHECK_GOTO( + ucc_coll_task_get_executor(&schedule->super, &exec), + out, status); + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY; + + disp_counter = ucc_coll_args_get_total_count(args, + args->dst.info_v.counts, + team_size); + + for (i = team_size - 1; i >= 0; i--) { + this_rank_count = ucc_coll_args_get_count(args, args->dst.info_v.counts, + i); + disp_counter -= this_rank_count; + eargs.copy.src = PTR_OFFSET( + args->dst.info_v.buffer, disp_counter * dt_size); + eargs.copy.dst = PTR_OFFSET( + args->dst.info_v.buffer, + ucc_coll_args_get_displacement( + args, args->dst.info_v.displacements, i) * + dt_size); + eargs.copy.len = this_rank_count * dt_size; + UCC_CHECK_GOTO( + ucc_ee_executor_task_post(exec, &eargs, &tasks[n_tasks]), + out, status); + n_tasks++; + } + + schedule->super.status = UCC_INPROGRESS; + schedule->super.super.status = UCC_INPROGRESS; + + ucc_progress_queue_enqueue(cl_team->super.super.context->ucc_context->pq, + task); + + return UCC_OK; +out: + return status; +} + +ucc_status_t ucc_cl_hier_allgatherv_unpack_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t); + ucc_rank_t team_size = UCC_CL_TEAM_SIZE(cl_team); + ucc_status_t status; + ucc_schedule_t *schedule; + ucc_cl_hier_schedule_t *cl_schedule; + size_t scratch_size; + + schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super; + if (ucc_unlikely(!schedule)) { + return UCC_ERR_NO_MEMORY; + } + cl_schedule = ucc_derived_of(schedule, ucc_cl_hier_schedule_t); + + UCC_CHECK_GOTO( + ucc_schedule_init(schedule, coll_args, team), free_schedule, status); + + scratch_size = team_size * sizeof(ucc_ee_executor_task_t*); + UCC_CHECK_GOTO( + ucc_mc_alloc(&cl_schedule->scratch, scratch_size, UCC_MEMORY_TYPE_HOST), + free_schedule, status); + + schedule->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR; + schedule->super.post = ucc_cl_hier_allgatherv_unpack_start; + schedule->super.progress = ucc_cl_hier_allgatherv_unpack_progress; + schedule->super.finalize = ucc_cl_hier_allgatherv_unpack_finalize; + + *task_h = &schedule->super; + + return UCC_OK; + +free_schedule: + ucc_cl_hier_put_schedule(schedule); + return status; +} diff --git a/src/components/cl/hier/allgatherv/unpack.h b/src/components/cl/hier/allgatherv/unpack.h new file mode 100644 index 0000000000..8e59774dc2 --- /dev/null +++ b/src/components/cl/hier/allgatherv/unpack.h @@ -0,0 +1,15 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "../cl_hier_coll.h" +#include "core/ucc_team.h" + +ucc_status_t ucc_cl_hier_allgatherv_unpack_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h); +ucc_status_t ucc_cl_hier_allgatherv_unpack_start(ucc_coll_task_t *task); +void ucc_cl_hier_allgatherv_unpack_progress(ucc_coll_task_t *task); +ucc_status_t ucc_cl_hier_allgatherv_unpack_finalize(ucc_coll_task_t *task); diff --git a/src/components/cl/hier/cl_hier.c b/src/components/cl/hier/cl_hier.c index edbb469d78..1e67522c6d 100644 --- a/src/components/cl/hier/cl_hier.c +++ b/src/components/cl/hier/cl_hier.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -117,4 +117,6 @@ __attribute__((constructor)) static void cl_hier_iface_init(void) ucc_cl_hier_alltoallv_algs; ucc_cl_hier.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_BCAST)] = ucc_cl_hier_bcast_algs; + ucc_cl_hier.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_ALLGATHERV)] = + ucc_cl_hier_allgatherv_algs; } diff --git a/src/components/cl/hier/cl_hier.h b/src/components/cl/hier/cl_hier.h index 82f082123d..489ec41578 100644 --- a/src/components/cl/hier/cl_hier.h +++ b/src/components/cl/hier/cl_hier.h @@ -113,6 +113,7 @@ UCC_CLASS_DECLARE(ucc_cl_hier_team_t, ucc_base_context_t *, #define UCC_CL_HIER_SUPPORTED_COLLS \ (UCC_COLL_TYPE_ALLTOALL | \ UCC_COLL_TYPE_ALLTOALLV | \ + UCC_COLL_TYPE_ALLGATHERV | \ UCC_COLL_TYPE_ALLREDUCE | \ UCC_COLL_TYPE_BARRIER | \ UCC_COLL_TYPE_BCAST | \ @@ -134,6 +135,12 @@ ucc_status_t ucc_cl_hier_coll_init(ucc_base_coll_args_t *coll_args, #define SBGP_RANK(_team, _sbgp) \ ((_team)->sbgps[UCC_HIER_SBGP_##_sbgp].sbgp->group_rank) +#define SBGP_SIZE(_team, _sbgp) \ + ((_team)->sbgps[UCC_HIER_SBGP_##_sbgp].sbgp->group_size) + +#define SBGP_MAP(_team, _sbgp) \ + ((_team)->sbgps[UCC_HIER_SBGP_##_sbgp].sbgp->map) + #define SBGP_EXISTS(_team, _sbgp) \ ((NULL != (_team)->sbgps[UCC_HIER_SBGP_##_sbgp].sbgp) && \ ((_team)->sbgps[UCC_HIER_SBGP_##_sbgp].sbgp->status != \ diff --git a/src/components/cl/hier/cl_hier_coll.h b/src/components/cl/hier/cl_hier_coll.h index 5a1e294afe..33c5e3377b 100644 --- a/src/components/cl/hier/cl_hier_coll.h +++ b/src/components/cl/hier/cl_hier_coll.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -15,6 +15,7 @@ #include "barrier/barrier.h" #include "bcast/bcast.h" #include "reduce/reduce.h" +#include "allgatherv/allgatherv.h" #define UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR 3 diff --git a/src/components/cl/hier/cl_hier_team.c b/src/components/cl/hier/cl_hier_team.c index 32ef7e2f93..dd671e14f5 100644 --- a/src/components/cl/hier/cl_hier_team.c +++ b/src/components/cl/hier/cl_hier_team.c @@ -402,6 +402,16 @@ ucc_status_t ucc_cl_hier_team_get_scores(ucc_base_team_t *cl_team, } + status = ucc_coll_score_add_range( + score, UCC_COLL_TYPE_ALLGATHERV, UCC_MEMORY_TYPE_HOST, + 0, UCC_MSG_MAX, UCC_CL_HIER_DEFAULT_SCORE, + ucc_cl_hier_allgatherv_init, cl_team); + if (UCC_OK != status) { + cl_error(lib, "faild to add range to score_t"); + return status; + + } + for (i = 0; i < UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR; i++) { status = ucc_coll_score_update_from_str( ucc_cl_hier_default_alg_select_str[i], &team_info, diff --git a/src/components/topo/ucc_sbgp.c b/src/components/topo/ucc_sbgp.c index 9dcfbe5239..74d685ea54 100644 --- a/src/components/topo/ucc_sbgp.c +++ b/src/components/topo/ucc_sbgp.c @@ -244,6 +244,7 @@ static ucc_status_t sbgp_create_node_leaders(ucc_topo_t *topo, ucc_sbgp_t *sbgp, nl_array_3[sbgp_id + host_id * max_ctx_sbgp_size]++; } + /* Find the first rank that maps to this node, store in nl_array_2 */ if (nl_array_1[host_id] == 0 || nl_array_1[host_id] == ctx_nlr) { nl_array_2[host_id] = i; } diff --git a/src/components/topo/ucc_sbgp.h b/src/components/topo/ucc_sbgp.h index 63697fd02f..de1ca57b84 100644 --- a/src/components/topo/ucc_sbgp.h +++ b/src/components/topo/ucc_sbgp.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * See file LICENSE for terms. */ #ifndef UCC_SBGP_H_ diff --git a/src/utils/ucc_coll_utils.h b/src/utils/ucc_coll_utils.h index 2cb563b24a..1dc995ba33 100644 --- a/src/utils/ucc_coll_utils.h +++ b/src/utils/ucc_coll_utils.h @@ -97,6 +97,17 @@ ucc_coll_args_get_count(const ucc_coll_args_t *args, const ucc_count_t *counts, return ((uint32_t *)counts)[idx]; } +static inline void +ucc_coll_args_set_count(const ucc_coll_args_t *args, const ucc_count_t *counts, + ucc_rank_t idx, size_t val) +{ + if (UCC_COLL_ARGS_COUNT64(args)) { + ((uint64_t *)counts)[idx] = (uint64_t)val; + } else { + ((uint32_t *)counts)[idx] = (uint32_t)val; + } +} + static inline size_t ucc_coll_args_get_max_count(const ucc_coll_args_t *args, const ucc_count_t * counts, ucc_rank_t size) @@ -123,6 +134,18 @@ ucc_coll_args_get_displacement(const ucc_coll_args_t *args, return ((uint32_t *)displacements)[idx]; } +static inline void +ucc_coll_args_set_displacement(const ucc_coll_args_t *args, + const ucc_aint_t *displacements, ucc_rank_t idx, + size_t val) +{ + if (UCC_COLL_ARGS_DISPL64(args)) { + ((uint64_t *)displacements)[idx] = (uint64_t)val; + } else { + ((uint32_t *)displacements)[idx] = (uint32_t)val; + } +} + static inline size_t ucc_coll_args_get_total_count(const ucc_coll_args_t *args, const ucc_count_t *counts, ucc_rank_t size) diff --git a/test/gtest/coll/test_allgatherv.cc b/test/gtest/coll/test_allgatherv.cc index 70bc325732..9ecc164f71 100644 --- a/test/gtest/coll/test_allgatherv.cc +++ b/test/gtest/coll/test_allgatherv.cc @@ -7,9 +7,11 @@ #include "common/test_ucc.h" #include "utils/ucc_math.h" -using Param_0 = std::tuple; -using Param_1 = std::tuple; -using Param_2 = std::tuple; +using Param_0 = std::tuple; +using Param_1 = std::tuple; +using Param_2 = std::tuple; + +size_t noncontig_padding = 1; // # elements worth of space in between each rank's contribution to the dst buf class test_allgatherv : public UccCollArgs, public ucc::test { @@ -21,7 +23,7 @@ class test_allgatherv : public UccCollArgs, public ucc::test int *counts; int *displs; size_t my_count = (nprocs - r) * count; - size_t all_counts = 0; + size_t disp_counter = 0; ucc_coll_args_t *coll = (ucc_coll_args_t*)calloc(1, sizeof(ucc_coll_args_t)); ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); @@ -30,13 +32,21 @@ class test_allgatherv : public UccCollArgs, public ucc::test counts = (int*)malloc(sizeof(int) * nprocs); displs = (int*)malloc(sizeof(int) * nprocs); - for (int i = 0; i < nprocs; i++) { - counts[i] = (nprocs - i) * count; - displs[i] = all_counts; - all_counts += counts[i]; + if (is_contig) { + for (int i = 0; i < nprocs; i++) { + counts[i] = (nprocs - i) * count; + displs[i] = disp_counter; + disp_counter += counts[i]; + } + coll->flags = UCC_COLL_ARGS_FLAG_CONTIG_DST_BUFFER; + } else { + for (int i = 0; i < nprocs; i++) { + counts[i] = (nprocs - i) * count; + displs[i] = disp_counter; + disp_counter += counts[i] + noncontig_padding; // Add noncontig_padding elemnts of space between the bufs + } } coll->mask = UCC_COLL_ARGS_FIELD_FLAGS; - coll->flags = UCC_COLL_ARGS_FLAG_CONTIG_DST_BUFFER; coll->coll_type = UCC_COLL_TYPE_ALLGATHERV; coll->src.info.mem_type = mem_type; @@ -55,7 +65,7 @@ class test_allgatherv : public UccCollArgs, public ucc::test sbuf[i] = r; } - ctxs[r]->rbuf_size = ucc_dt_size(dtype) * all_counts; + ctxs[r]->rbuf_size = ucc_dt_size(dtype) * disp_counter; UCC_CHECK(ucc_mc_alloc(&ctxs[r]->dst_mc_header, ctxs[r]->rbuf_size, mem_type)); coll->dst.info_v.buffer = ctxs[r]->dst_mc_header->addr; @@ -138,6 +148,7 @@ class test_allgatherv : public UccCollArgs, public ucc::test for (int i = 0; i < ctxs.size(); i++) { size_t rank_size = 0; uint8_t *rbuf = dsts[i]; + int is_contig = UCC_COLL_IS_DST_CONTIG(ctxs[i]->args); for (int r = 0; r < ctxs.size(); r++) { rbuf += rank_size; rank_size = ucc_dt_size((ctxs[r])->args->src.info.datatype) * @@ -148,6 +159,9 @@ class test_allgatherv : public UccCollArgs, public ucc::test break; } } + if (!is_contig) { + rbuf += noncontig_padding * ucc_dt_size((ctxs[r])->args->src.info.datatype); + } } } if (UCC_MEMORY_TYPE_HOST != mem_type) { @@ -169,11 +183,13 @@ UCC_TEST_P(test_allgatherv_0, single) const ucc_memory_type_t mem_type = std::get<2>(GetParam()); const int count = std::get<3>(GetParam()); const gtest_ucc_inplace_t inplace = std::get<4>(GetParam()); + const bool contig = std::get<5>(GetParam()); UccTeam_h team = UccJob::getStaticTeams()[team_id]; int size = team->procs.size(); UccCollCtxVec ctxs; set_inplace(inplace); + set_contig(contig); SET_MEM_TYPE(mem_type); data_init(size, dtype, count, ctxs, false); @@ -191,12 +207,14 @@ UCC_TEST_P(test_allgatherv_0, single_persistent) const ucc_memory_type_t mem_type = std::get<2>(GetParam()); const int count = std::get<3>(GetParam()); const gtest_ucc_inplace_t inplace = std::get<4>(GetParam()); + const bool contig = std::get<5>(GetParam()); UccTeam_h team = UccJob::getStaticTeams()[team_id]; int size = team->procs.size(); const int n_calls = 3; UccCollCtxVec ctxs; set_inplace(inplace); + set_contig(contig); SET_MEM_TYPE(mem_type); data_init(size, dtype, count, ctxs, true); @@ -223,7 +241,9 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(UCC_MEMORY_TYPE_HOST), #endif ::testing::Values(1,3,8192), // count - ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE))); // inplace + ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE), // inplace + ::testing::Values(false, true) // contig dst buf displacements + )); class test_allgatherv_1 : public test_allgatherv, public ::testing::WithParamInterface {}; @@ -234,6 +254,7 @@ UCC_TEST_P(test_allgatherv_1, multiple) const ucc_memory_type_t mem_type = std::get<1>(GetParam()); const int count = std::get<2>(GetParam()); const gtest_ucc_inplace_t inplace = std::get<3>(GetParam()); + const bool contig = std::get<4>(GetParam()); std::vector reqs; std::vector ctxs; @@ -243,6 +264,7 @@ UCC_TEST_P(test_allgatherv_1, multiple) UccCollCtxVec ctx; this->set_inplace(inplace); + this->set_contig(contig); SET_MEM_TYPE(mem_type); data_init(size, dtype, count, ctx, false); @@ -269,7 +291,9 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(UCC_MEMORY_TYPE_HOST), #endif ::testing::Values(1,3,8192), // count - ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE))); + ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE), + ::testing::Values(false, true)) // dst buf contig + ); class test_allgatherv_alg : public test_allgatherv, public ::testing::WithParamInterface {}; @@ -280,6 +304,7 @@ UCC_TEST_P(test_allgatherv_alg, alg) const ucc_memory_type_t mem_type = std::get<1>(GetParam()); const int count = std::get<2>(GetParam()); const gtest_ucc_inplace_t inplace = std::get<3>(GetParam()); + const bool contig = std::get<5>(GetParam()); int n_procs = 5; char tune[32]; @@ -291,13 +316,14 @@ UCC_TEST_P(test_allgatherv_alg, alg) UccCollCtxVec ctxs; set_inplace(inplace); + set_contig(contig); SET_MEM_TYPE(mem_type); data_init(n_procs, dtype, count, ctxs, false); UccReq req(team, ctxs); req.start(); req.wait(); - EXPECT_EQ(true, data_validate(ctxs));; + EXPECT_EQ(true, data_validate(ctxs)); data_fini(ctxs); } @@ -313,13 +339,16 @@ INSTANTIATE_TEST_CASE_P( #endif ::testing::Values(1,3,8192), // count ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE), - ::testing::Values("knomial", "ring")), + ::testing::Values("knomial", "ring"), + ::testing::Values(false, true)), // dst buf contig [](const testing::TestParamInfo& info) { std::string name; name += ucc_datatype_str(std::get<0>(info.param)); name += std::string("_") + std::string(ucc_mem_type_str(std::get<1>(info.param))); name += std::string("_count_")+std::to_string(std::get<2>(info.param)); name += std::string("_inplace_")+std::to_string(std::get<3>(info.param)); + name += std::string("_contig_")+std::to_string(std::get<5>(info.param)); name += std::string("_")+std::get<4>(info.param); return name; - }); + } + ); diff --git a/test/gtest/common/test_ucc.cc b/test/gtest/common/test_ucc.cc index 40b51c1f56..4728f2119a 100644 --- a/test/gtest/common/test_ucc.cc +++ b/test/gtest/common/test_ucc.cc @@ -707,6 +707,11 @@ void UccCollArgs::set_inplace(gtest_ucc_inplace_t _inplace) inplace = _inplace; } +void UccCollArgs::set_contig(bool _is_contig) +{ + is_contig = _is_contig; +} + void clear_buffer(void *_buf, size_t size, ucc_memory_type_t mt, uint8_t value) { void *buf = _buf; diff --git a/test/gtest/common/test_ucc.h b/test/gtest/common/test_ucc.h index f16e014b54..60597d81d9 100644 --- a/test/gtest/common/test_ucc.h +++ b/test/gtest/common/test_ucc.h @@ -40,6 +40,7 @@ class UccCollArgs { protected: ucc_memory_type_t mem_type; gtest_ucc_inplace_t inplace; + bool is_contig; void alltoallx_init_buf(int src_rank, int dst_rank, uint8_t *buf, size_t len) { for (int i = 0; i < len; i++) { @@ -74,6 +75,7 @@ class UccCollArgs { virtual bool data_validate(UccCollCtxVec args) = 0; void set_mem_type(ucc_memory_type_t _mt); void set_inplace(gtest_ucc_inplace_t _inplace); + void set_contig(bool _contig); }; #define SET_MEM_TYPE(_mt) do { \