Skip to content

Commit

Permalink
TL/UCP: add knomial allgatherv (openucx#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev authored Oct 7, 2024
1 parent fb041fd commit e5cb294
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/coll_patterns/recursive_knomial.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ typedef struct ucc_knomial_pattern {
ucc_rank_t n_extra; /* number of "extra" ranks to be served by "proxies" */
size_t block_size_counts;
size_t count; /* collective buffer size */
ucc_count_t *counts;
ucc_rank_t block_size;
ptrdiff_t block_offset;
int is64;
} ucc_knomial_pattern_t;

/**
Expand Down
22 changes: 20 additions & 2 deletions src/coll_patterns/sra_knomial.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ ucc_kn_agx_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix,
}
}

static inline void
ucc_kn_agv_pattern_init(ucc_rank_t size, ucc_rank_t rank, ucc_kn_radix_t radix,
ucc_count_t *counts, int is64,
ucc_knomial_pattern_t *p)
{
ucc_knomial_pattern_init(size, rank, radix, p);
p->type = KN_PATTERN_ALLGATHERV;
p->counts = counts;
p->is64 = is64;
p->block_size = p->radix_pow * radix;
p->block_offset = ucc_knomial_pattern_loop_rank(p, rank) / p->block_size *
p->block_size;
}

static inline void
ucc_kn_ag_pattern_peer_seg(ucc_rank_t peer, ucc_knomial_pattern_t *p,
size_t *seg_count, ptrdiff_t *seg_offset)
Expand All @@ -236,8 +250,12 @@ ucc_kn_ag_pattern_peer_seg(ucc_rank_t peer, ucc_knomial_pattern_t *p,
*seg_offset;
return;
case KN_PATTERN_ALLGATHERV:
/* not implemented */
ucc_assert(0);
ucc_kn_seg_desc_compute(p, &s, peer);
*seg_offset = ucc_buffer_vector_block_offset(p->counts, p->is64,
s.seg_start);
*seg_count = ucc_buffer_vector_block_offset(p->counts, p->is64,
s.seg_end) - *seg_offset;
return;
default:
ucc_assert(0);
}
Expand Down
9 changes: 5 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ allgather = \
allgather/allgather_sparbit.c \
allgather/allgather_knomial.c

allgatherv = \
allgatherv/allgatherv.h \
allgatherv/allgatherv.c \
allgatherv/allgatherv_ring.c
allgatherv = \
allgatherv/allgatherv.h \
allgatherv/allgatherv.c \
allgatherv/allgatherv_ring.c \
allgatherv/allgatherv_knomial.c

alltoall = \
alltoall/alltoall.h \
Expand Down
74 changes: 63 additions & 11 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -25,6 +25,27 @@
(_rank)) \
: (_args)->dst.info.count / (_size)

#define GET_TOTAL_COUNT(_args, _size) \
((_args)->coll_type == UCC_COLL_TYPE_ALLGATHERV) \
? ucc_coll_args_get_total_count((_args), (_args)->dst.info_v.counts, \
(_size)) \
: (_args)->dst.info.count

#define GET_DT(_args) \
((_args)->coll_type == UCC_COLL_TYPE_ALLGATHERV) \
? (_args)->dst.info_v.datatype \
: (_args)->dst.info.datatype

#define GET_DST(_args) \
((_args)->coll_type == UCC_COLL_TYPE_ALLGATHERV) \
? (_args)->dst.info_v.buffer \
: (_args)->dst.info.buffer

#define GET_MT(_args) \
((_args)->coll_type == UCC_COLL_TYPE_ALLGATHERV) \
? (_args)->dst.info_v.mem_type \
: (_args)->dst.info.mem_type

/* Bcast will first call scatter and then allgather.
* In case of non-full tree with "extra" ranks, scatter will give each rank
* a new virtual rank number - "vrank".
Expand All @@ -40,12 +61,11 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_kn_radix_t radix = task->allgather_kn.p.radix;
uint8_t node_type = task->allgather_kn.p.node_type;
ucc_knomial_pattern_t *p = &task->allgather_kn.p;
void *rbuf = args->dst.info.buffer;
ucc_memory_type_t mem_type = args->dst.info.mem_type;
size_t count = args->dst.info.count;
size_t dt_size = ucc_dt_size(args->dst.info.datatype);
size_t data_size = count * dt_size;
void *rbuf = GET_DST(args);
ucc_memory_type_t mem_type = GET_MT(args);
size_t dt_size = ucc_dt_size(GET_DT(args));
ucc_rank_t size = task->subset.map.ep_num;
size_t data_size = GET_TOTAL_COUNT(args, size);
ucc_rank_t broot = args->coll_type == UCC_COLL_TYPE_BCAST ?
args->root : 0;
ucc_rank_t rank = VRANK(task->subset.myrank, broot, size);
Expand All @@ -72,15 +92,18 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
team, task),
task, out);
}
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, mem_type,
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size * dt_size, mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer,broot,size)),
team, task),
task, out);
}
if ((p->type != KN_PATTERN_ALLGATHERX) && (node_type == KN_NODE_PROXY)) {
peer = ucc_knomial_pattern_get_extra(p, rank);
extra_count = GET_LOCAL_COUNT(args, size, peer);
extra_count =
coll_task->bargs.args.coll_type == UCC_COLL_TYPE_ALLGATHER
? local
: ucc_coll_args_get_count(args, args->dst.info_v.counts, peer);
peer = ucc_ep_map_eval(task->subset.map, peer);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(task->allgather_kn.sbuf,
local * dt_size), extra_count * dt_size,
Expand Down Expand Up @@ -154,8 +177,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)

if (KN_NODE_PROXY == node_type) {
peer = ucc_knomial_pattern_get_extra(p, rank);
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->dst.info.buffer, data_size,
mem_type,
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(rbuf, data_size * dt_size, mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer, broot, size)),
team, task),
Expand Down Expand Up @@ -190,6 +212,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_status_t status;
ptrdiff_t offset;
ucc_ee_executor_t *exec;
void *rbuf;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
Expand All @@ -200,6 +223,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
&task->allgather_kn.p);
offset = ucc_buffer_block_offset(args->dst.info.count, size, rank) *
ucc_dt_size(args->dst.info.datatype);
rbuf = args->dst.info.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
Expand All @@ -218,18 +242,46 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
return status;
}
}
} else if (ct == UCC_COLL_TYPE_ALLGATHERV) {
ucc_kn_agv_pattern_init(size, rank, radix, args->dst.info_v.counts,
UCC_COLL_ARGS_COUNT64(args),
&task->allgather_kn.p);
offset = ucc_buffer_vector_block_offset(args->dst.info_v.counts,
UCC_COLL_ARGS_COUNT64(args),
rank) *
ucc_dt_size(args->dst.info_v.datatype);
rbuf = args->dst.info_v.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info_v.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
}
} else {
ucc_kn_agx_pattern_init(size, rank, radix, args->dst.info.count,
&task->allgather_kn.p);
offset = ucc_sra_kn_get_offset(args->dst.info.count,
ucc_dt_size(args->dst.info.datatype), rank,
size, radix);
rbuf = args->dst.info.buffer;
task->allgather_kn.recv_dist = ucc_knomial_calc_recv_dist(
size - p->n_extra,
ucc_knomial_pattern_loop_rank(p, rank),
p->radix, 0);
}
task->allgather_kn.sbuf = PTR_OFFSET(args->dst.info.buffer, offset);
task->allgather_kn.sbuf = PTR_OFFSET(rbuf, offset);

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
Expand Down
6 changes: 5 additions & 1 deletion src/components/tl/ucp/allgatherv/allgatherv.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -15,6 +15,10 @@ ucc_base_coll_alg_info_t
{.id = UCC_TL_UCP_ALLGATHERV_ALG_RING,
.name = "ring",
.desc = "O(N) Ring"},
[UCC_TL_UCP_ALLGATHERV_ALG_KNOMIAL] =
{.id = UCC_TL_UCP_ALLGATHERV_ALG_KNOMIAL,
.name = "knomial",
.desc = "recursive k-ing with arbitrary radix"},
[UCC_TL_UCP_ALLGATHERV_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand Down
27 changes: 26 additions & 1 deletion src/components/tl/ucp/allgatherv/allgatherv.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -12,13 +12,38 @@

enum {
UCC_TL_UCP_ALLGATHERV_ALG_RING,
UCC_TL_UCP_ALLGATHERV_ALG_KNOMIAL,
UCC_TL_UCP_ALLGATHERV_ALG_LAST
};

extern ucc_base_coll_alg_info_t
ucc_tl_ucp_allgatherv_algs[UCC_TL_UCP_ALLGATHERV_ALG_LAST + 1];

#define UCC_TL_UCP_ALLGATHERV_DEFAULT_ALG_SELECT_STR \
"allgatherv:@0"

char *ucc_tl_ucp_allgatherv_score_str_get(ucc_tl_ucp_team_t *team);

static inline int ucc_tl_ucp_allgatherv_alg_from_str(const char *str)
{
int i;
for (i = 0; i < UCC_TL_UCP_ALLGATHERV_ALG_LAST; i++) {
if (0 == strcasecmp(str, ucc_tl_ucp_allgatherv_algs[i].name)) {
break;
}
}
return i;
}

ucc_status_t ucc_tl_ucp_allgatherv_ring_init_common(ucc_tl_ucp_task_t *task);

ucc_status_t ucc_tl_ucp_allgatherv_ring_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

ucc_status_t ucc_tl_ucp_allgatherv_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

ucc_status_t ucc_tl_ucp_allgatherv_init(ucc_tl_ucp_task_t *task);
#endif
18 changes: 18 additions & 0 deletions src/components/tl/ucp/allgatherv/allgatherv_knomial.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "allgatherv/allgatherv.h"
#include "allgather/allgather.h"

ucc_status_t ucc_tl_ucp_allgatherv_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
if (!UCC_COLL_IS_DST_CONTIG(&coll_args->args)) {
return ucc_tl_ucp_allgatherv_ring_init(coll_args, team, task_h);
}
return ucc_tl_ucp_allgather_knomial_init(coll_args, team, task_h);
}
19 changes: 18 additions & 1 deletion src/components/tl/ucp/allgatherv/allgatherv_ring.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -128,3 +128,20 @@ ucc_status_t ucc_tl_ucp_allgatherv_ring_init_common(ucc_tl_ucp_task_t *task)

return UCC_OK;
}

ucc_status_t ucc_tl_ucp_allgatherv_ring_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_task_t *task;
ucc_status_t status;

task = ucc_tl_ucp_init_task(coll_args, team);
status = ucc_tl_ucp_allgatherv_ring_init_common(task);
if (status != UCC_OK) {
ucc_tl_ucp_put_task(task);
return status;
}
*task_h = &task->super;
return UCC_OK;
}
19 changes: 19 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const ucc_tl_ucp_default_alg_desc_t
.select_str = NULL,
.str_get_fn = ucc_tl_ucp_allgather_score_str_get
},
{
.select_str = UCC_TL_UCP_ALLGATHERV_DEFAULT_ALG_SELECT_STR,
.str_get_fn = NULL
},
{
.select_str = NULL,
.str_get_fn = ucc_tl_ucp_alltoall_score_str_get
Expand Down Expand Up @@ -219,6 +223,8 @@ static inline int alg_id_from_str(ucc_coll_type_t coll_type, const char *str)
switch (coll_type) {
case UCC_COLL_TYPE_ALLGATHER:
return ucc_tl_ucp_allgather_alg_from_str(str);
case UCC_COLL_TYPE_ALLGATHERV:
return ucc_tl_ucp_allgatherv_alg_from_str(str);
case UCC_COLL_TYPE_ALLREDUCE:
return ucc_tl_ucp_allreduce_alg_from_str(str);
case UCC_COLL_TYPE_ALLTOALL:
Expand Down Expand Up @@ -273,6 +279,19 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
break;
};
break;
case UCC_COLL_TYPE_ALLGATHERV:
switch (alg_id) {
case UCC_TL_UCP_ALLGATHERV_ALG_KNOMIAL:
*init = ucc_tl_ucp_allgatherv_knomial_init;
break;
case UCC_TL_UCP_ALLGATHERV_ALG_RING:
*init = ucc_tl_ucp_allgatherv_ring_init;
break;
default:
status = UCC_ERR_INVALID_PARAM;
break;
};
break;
case UCC_COLL_TYPE_ALLREDUCE:
switch (alg_id) {
case UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL:
Expand Down
2 changes: 1 addition & 1 deletion src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "tl_ucp_tag.h"

#define UCC_UUNITS_AUTO_RADIX 4
#define UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR 8
#define UCC_TL_UCP_TASK_PLUGIN_MAX_DATA 128
#define UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR 9

ucc_status_t ucc_tl_ucp_team_default_score_str_alloc(ucc_tl_ucp_team_t *team,
char *default_select_str[UCC_TL_UCP_N_DEFAULT_ALG_SELECT_STR]);
Expand Down
19 changes: 19 additions & 0 deletions src/utils/ucc_coll_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,25 @@ static inline size_t ucc_buffer_block_offset(size_t total_count,
return (block < left) ? offset - (left - block) : offset;
}

static inline size_t ucc_buffer_vector_block_offset(ucc_count_t *counts,
int is64,
ucc_rank_t rank)
{
size_t offset = 0;
ucc_rank_t i;

if (is64) {
for (i = 0; i < rank; i++) {
offset += ((uint64_t *)counts)[i];
}
} else {
for (i = 0; i < rank; i++) {
offset += ((uint32_t *)counts)[i];
}
}
return offset;
}

/* Given the rank space A (e.g. core ucc team), a subset B (e.g. active set
within the core team), the ep_map that maps ranks from the subset B to A,
and the rank of a process within A. The function below computes the local
Expand Down
Loading

0 comments on commit e5cb294

Please sign in to comment.